Skip to content

Remove unused IOException from Backoff and BackoffUtils. #35445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ public Boolean call() {
}
// Sleeping a while if there is a problem with the work, then go on with the next work.
} while (success || BackOffUtils.next(sleeper, backOff));
} catch (IOException e) { // Failure of BackOff.
LOG.error("Already tried several attempts at working on tasks. Aborting.", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Interrupted during thread execution or sleep.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,6 @@ private static Optional<StreamingConfigTask> fetchConfigWithRetry(
if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
return Optional.empty();
}
} catch (IOException ioe) {
LOG.warn("Error backing off, will not retry: ", ioe);
return Optional.empty();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.beam.runners.dataflow.worker.windmill.client;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -211,8 +210,6 @@ private void startStream() {
// Shutdown the stream to clean up any dangling resources and pending requests.
shutdown();
break;
} catch (IOException ioe) {
// Keep trying to create the stream.
}
}
}
Expand Down Expand Up @@ -375,11 +372,7 @@ private class ResponseObserver implements StreamObserver<ResponseT> {

@Override
public void onNext(ResponseT response) {
try {
backoff.reset();
} catch (IOException e) {
// Ignore.
}
backoff.reset();
debugMetrics.recordResponse();
onResponse(response);
}
Expand All @@ -400,8 +393,6 @@ public void onError(Throwable t) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (IOException e) {
// Ignore.
}

executeSafely(AbstractWindmillStream.this::startStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,8 @@ private <ResponseT> ResponseT callWithBackoff(Supplier<ResponseT> function) {
if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
throw new WindmillRpcException(e);
}
} catch (IOException | InterruptedException i) {
if (i instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
} catch (InterruptedException i) {
Thread.currentThread().interrupt();
WindmillRpcException rpcException = new WindmillRpcException(e);
rpcException.addSuppressed(i);
throw rpcException;
Expand Down
21 changes: 9 additions & 12 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@
*/
package org.apache.beam.sdk.util;

import java.io.IOException;
import org.apache.beam.sdk.annotations.Internal;

/**
* Back-off policy when retrying an operation.
*
* <p><b>Note</b>: This interface is copied from Google API client library to avoid its dependency.
*/
/** Back-off policy when retrying an operation. */
@Internal
public interface BackOff {

/** Indicates that no more retries should be made for use in {@link #nextBackOffMillis()}. */
long STOP = -1L;

/** Reset to initial state. */
void reset() throws IOException;
void reset();

/**
* Gets the number of milliseconds to wait before retrying the operation or {@link #STOP} to
Expand All @@ -47,7 +44,7 @@ public interface BackOff {
* }
* </pre>
*/
long nextBackOffMillis() throws IOException;
long nextBackOffMillis();

/**
* Fixed back-off policy whose back-off time is always zero, meaning that the operation is retried
Expand All @@ -57,10 +54,10 @@ public interface BackOff {
new BackOff() {

@Override
public void reset() throws IOException {}
public void reset() {}

@Override
public long nextBackOffMillis() throws IOException {
public long nextBackOffMillis() {
return 0;
}
};
Expand All @@ -73,10 +70,10 @@ public long nextBackOffMillis() throws IOException {
new BackOff() {

@Override
public void reset() throws IOException {}
public void reset() {}

@Override
public long nextBackOffMillis() throws IOException {
public long nextBackOffMillis() {
return STOP;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@
*/
package org.apache.beam.sdk.util;

import java.io.IOException;
import org.apache.beam.sdk.annotations.Internal;

/**
* Utilities for {@link BackOff}.
*
* <p><b>Note</b>: This is copied from Google API client library to avoid its dependency.
*/
/** Utilities for {@link BackOff}. */
@Internal
public final class BackOffUtils {

/**
Expand All @@ -39,8 +36,7 @@ public final class BackOffUtils {
* BackOff#nextBackOffMillis()} did not return {@link BackOff#STOP}
* @throws InterruptedException if any thread has interrupted the current thread
*/
public static boolean next(Sleeper sleeper, BackOff backOff)
throws InterruptedException, IOException {
public static boolean next(Sleeper sleeper, BackOff backOff) throws InterruptedException {
long backOffTime = backOff.nextBackOffMillis();
if (backOffTime == BackOff.STOP) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implement this interface to create a {@code RateLimitPolicy}. Used to create a rate limiter for
Expand Down Expand Up @@ -91,7 +89,6 @@ public void onSuccess(List<KinesisRecord> records) throws InterruptedException {
* response is empty or if the consumer is throttled by AWS.
*/
class DefaultRateLimiter implements RateLimitPolicy {
private static final Logger LOG = LoggerFactory.getLogger(DefaultRateLimiter.class);
private final Sleeper sleeper;
private final BackOff emptySuccess;
private final BackOff throttled;
Expand Down Expand Up @@ -122,25 +119,17 @@ public DefaultRateLimiter(

@Override
public void onSuccess(List<KinesisRecord> records) throws InterruptedException {
try {
if (records.isEmpty()) {
BackOffUtils.next(sleeper, emptySuccess);
} else {
emptySuccess.reset();
}
throttled.reset();
} catch (IOException e) {
LOG.warn("Error applying onSuccess rate limit policy", e);
if (records.isEmpty()) {
BackOffUtils.next(sleeper, emptySuccess);
} else {
emptySuccess.reset();
}
throttled.reset();
}

@Override
public void onThrottle(KinesisClientThrottledException e) throws InterruptedException {
try {
BackOffUtils.next(sleeper, throttled);
} catch (IOException ioe) {
LOG.warn("Error applying onThrottle rate limit policy", e);
}
BackOffUtils.next(sleeper, throttled);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ static List<Shard> listShardsAtPoint(

static ShardFilter buildShardFilterForStartingPoint(
KinesisClient kinesisClient, String streamName, StartingPoint startingPoint)
throws IOException, InterruptedException {
throws InterruptedException {
InitialPositionInStream position = startingPoint.getPosition();
switch (position) {
case LATEST:
Expand All @@ -77,8 +77,8 @@ static ShardFilter buildShardFilterForStartingPoint(
}

private static ShardFilter buildShardFilterForTimestamp(
KinesisClient kinesisClient, String streamName, Instant startingPointTimestamp)
throws IOException, InterruptedException {
KinesisClient kinesisClient, String streamName, Instant startingPointTimestamp) throws
InterruptedException {
StreamDescriptionSummary streamDescription = describeStreamSummary(kinesisClient, streamName);

Instant streamCreationTimestamp = TimeUtil.toJoda(streamDescription.streamCreationTimestamp());
Expand All @@ -104,7 +104,7 @@ private static ShardFilter buildShardFilterForTimestamp(

private static StreamDescriptionSummary describeStreamSummary(
KinesisClient kinesisClient, final String streamName)
throws IOException, InterruptedException {
throws InterruptedException {
// DescribeStreamSummary has limits that can be hit fairly easily if we are attempting
// to configure multiple KinesisIO inputs in the same account. Retry up to
// DESCRIBE_STREAM_SUMMARY_MAX_ATTEMPTS times if we end up hitting that limit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ void close() throws Exception {
}

// commit the list of entities to datastore
private void flushBatch() throws DatastoreException, IOException, InterruptedException {
private void flushBatch() throws DatastoreException, InterruptedException {
LOG.info("Writing batch of {} entities", entities.size());
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void clear() {
}

@Before
public void initTest() throws IOException, InterruptedException {
public void initTest() throws InterruptedException {
BackOff backOff = FluentBackoff.DEFAULT.withMaxRetries(4).backoff();
Query createQuery = new Query(String.format("CREATE DATABASE %s", options.getDatabaseName()));
try (InfluxDB connection =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import com.google.auto.value.AutoValue;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.Serializable;
import java.net.URLClassLoader;
import java.sql.Connection;
Expand Down Expand Up @@ -2844,7 +2843,7 @@ private void cleanUpStatementAndConnection() throws Exception {
}

private void executeBatch(ProcessContext context, Iterable<T> records)
throws SQLException, IOException, InterruptedException {
throws SQLException, InterruptedException {
Long startTimeNs = System.nanoTime();
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = checkStateNotNull(retryBackOff).backoff();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -353,8 +352,6 @@ private void backoffIfNeeded(BackOff backOff, Sleeper sleeper) {
incIfPresent(sleeperCounter);
sleeper.sleep(backOff.nextBackOffMillis());
} catch (InterruptedException ignored) {
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Optional;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.util.BackOff;
Expand Down Expand Up @@ -119,12 +118,8 @@ OutputT apply(InputT input) throws UserCodeExecutionException {
latestError = Optional.of(e);
} catch (InterruptedException ignored) {
}
try {
incIfPresent(getBackoffCounter());
waitFor = getBackOff().nextBackOffMillis();
} catch (IOException e) {
throw new UserCodeExecutionException(e);
}
incIfPresent(getBackoffCounter());
waitFor = getBackOff().nextBackOffMillis();
}
throw latestError.orElse(
new UserCodeExecutionException("failed to process for input: " + input));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,10 @@ private static class CustomBackOffSupplier implements SerializableSupplier<BackO
public BackOff get() {
return new BackOff() {
@Override
public void reset() throws IOException {}
public void reset() {}

@Override
public long nextBackOffMillis() throws IOException {
public long nextBackOffMillis() {
counter.inc();
return 0;
}
Expand Down
Loading