From 22f6b30c43d9cbd510713e6f0712a2d48708f446 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Thu, 26 Jun 2025 12:19:35 +0200 Subject: [PATCH 1/2] Remove unused IOException from Backoff and BackoffUtils. Update some now unnecessary call-site handling. --- .../worker/DataflowBatchWorkerHarness.java | 2 -- ...reamingEngineComputationConfigFetcher.java | 3 --- .../client/grpc/GrpcWindmillServer.java | 6 ++---- .../org/apache/beam/sdk/util/BackOff.java | 21 ++++++++----------- .../apache/beam/sdk/util/BackOffUtils.java | 12 ++++------- .../aws2/kinesis/RateLimitPolicyFactory.java | 20 ++++++------------ .../io/aws2/kinesis/ShardListingUtils.java | 8 +++---- .../beam/sdk/io/gcp/datastore/V1TestUtil.java | 2 +- .../beam/sdk/io/influxdb/InfluxDbIOIT.java | 2 +- .../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 3 +-- .../RequestResponseIOTest.java | 4 ++-- 11 files changed, 30 insertions(+), 53 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java index a1c93cdc5782..0afae3ef2da7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java @@ -133,8 +133,6 @@ public Boolean call() { } // Sleeping a while if there is a problem with the work, then go on with the next work. } while (success || BackOffUtils.next(sleeper, backOff)); - } catch (IOException e) { // Failure of BackOff. - LOG.error("Already tried several attempts at working on tasks. Aborting.", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.error("Interrupted during thread execution or sleep.", e); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index 5235679f9122..73f2afc04223 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -146,9 +146,6 @@ private static Optional fetchConfigWithRetry( if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) { return Optional.empty(); } - } catch (IOException ioe) { - LOG.warn("Error backing off, will not retry: ", ioe); - return Optional.empty(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return Optional.empty(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java index 62a8f53e7ef6..bd3971599c71 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java @@ -291,10 +291,8 @@ private ResponseT callWithBackoff(Supplier function) { if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) { throw new WindmillRpcException(e); } - } catch (IOException | InterruptedException i) { - if (i instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } + } catch (InterruptedException i) { + Thread.currentThread().interrupt(); WindmillRpcException rpcException = new WindmillRpcException(e); rpcException.addSuppressed(i); throw rpcException; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java index a3ea30bbb810..130dc15952b9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java @@ -17,20 +17,17 @@ */ package org.apache.beam.sdk.util; -import java.io.IOException; +import org.apache.beam.sdk.annotations.Internal; -/** - * Back-off policy when retrying an operation. - * - *

Note: This interface is copied from Google API client library to avoid its dependency. - */ +/** Back-off policy when retrying an operation. */ +@Internal public interface BackOff { /** Indicates that no more retries should be made for use in {@link #nextBackOffMillis()}. */ long STOP = -1L; /** Reset to initial state. */ - void reset() throws IOException; + void reset(); /** * Gets the number of milliseconds to wait before retrying the operation or {@link #STOP} to @@ -47,7 +44,7 @@ public interface BackOff { * } * */ - long nextBackOffMillis() throws IOException; + long nextBackOffMillis(); /** * Fixed back-off policy whose back-off time is always zero, meaning that the operation is retried @@ -57,10 +54,10 @@ public interface BackOff { new BackOff() { @Override - public void reset() throws IOException {} + public void reset() {} @Override - public long nextBackOffMillis() throws IOException { + public long nextBackOffMillis() { return 0; } }; @@ -73,10 +70,10 @@ public long nextBackOffMillis() throws IOException { new BackOff() { @Override - public void reset() throws IOException {} + public void reset() {} @Override - public long nextBackOffMillis() throws IOException { + public long nextBackOffMillis() { return STOP; } }; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java index 975cc744d0c1..1bce9d24c463 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java @@ -17,13 +17,10 @@ */ package org.apache.beam.sdk.util; -import java.io.IOException; +import org.apache.beam.sdk.annotations.Internal; -/** - * Utilities for {@link BackOff}. - * - *

Note: This is copied from Google API client library to avoid its dependency. - */ +/** Utilities for {@link BackOff}. */ +@Internal public final class BackOffUtils { /** @@ -39,8 +36,7 @@ public final class BackOffUtils { * BackOff#nextBackOffMillis()} did not return {@link BackOff#STOP} * @throws InterruptedException if any thread has interrupted the current thread */ - public static boolean next(Sleeper sleeper, BackOff backOff) - throws InterruptedException, IOException { + public static boolean next(Sleeper sleeper, BackOff backOff) throws InterruptedException { long backOffTime = backOff.nextBackOffMillis(); if (backOffTime == BackOff.STOP) { return false; diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java index 34d9562187ee..f64b76e9ba7b 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java @@ -122,25 +122,17 @@ public DefaultRateLimiter( @Override public void onSuccess(List records) throws InterruptedException { - try { - if (records.isEmpty()) { - BackOffUtils.next(sleeper, emptySuccess); - } else { - emptySuccess.reset(); - } - throttled.reset(); - } catch (IOException e) { - LOG.warn("Error applying onSuccess rate limit policy", e); + if (records.isEmpty()) { + BackOffUtils.next(sleeper, emptySuccess); + } else { + emptySuccess.reset(); } + throttled.reset(); } @Override public void onThrottle(KinesisClientThrottledException e) throws InterruptedException { - try { - BackOffUtils.next(sleeper, throttled); - } catch (IOException ioe) { - LOG.warn("Error applying onThrottle rate limit policy", e); - } + BackOffUtils.next(sleeper, throttled); } } } diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java index cb5f609b9ec7..3bf39813e2bf 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java @@ -60,7 +60,7 @@ static List listShardsAtPoint( static ShardFilter buildShardFilterForStartingPoint( KinesisClient kinesisClient, String streamName, StartingPoint startingPoint) - throws IOException, InterruptedException { + throws InterruptedException { InitialPositionInStream position = startingPoint.getPosition(); switch (position) { case LATEST: @@ -77,8 +77,8 @@ static ShardFilter buildShardFilterForStartingPoint( } private static ShardFilter buildShardFilterForTimestamp( - KinesisClient kinesisClient, String streamName, Instant startingPointTimestamp) - throws IOException, InterruptedException { + KinesisClient kinesisClient, String streamName, Instant startingPointTimestamp) throws + InterruptedException { StreamDescriptionSummary streamDescription = describeStreamSummary(kinesisClient, streamName); Instant streamCreationTimestamp = TimeUtil.toJoda(streamDescription.streamCreationTimestamp()); @@ -104,7 +104,7 @@ private static ShardFilter buildShardFilterForTimestamp( private static StreamDescriptionSummary describeStreamSummary( KinesisClient kinesisClient, final String streamName) - throws IOException, InterruptedException { + throws InterruptedException { // DescribeStreamSummary has limits that can be hit fairly easily if we are attempting // to configure multiple KinesisIO inputs in the same account. Retry up to // DESCRIBE_STREAM_SUMMARY_MAX_ATTEMPTS times if we end up hitting that limit. diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java index ae044ae2926e..dbe689c05759 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java @@ -293,7 +293,7 @@ void close() throws Exception { } // commit the list of entities to datastore - private void flushBatch() throws DatastoreException, IOException, InterruptedException { + private void flushBatch() throws DatastoreException, InterruptedException { LOG.info("Writing batch of {} entities", entities.size()); Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = diff --git a/sdks/java/io/influxdb/src/test/java/org/apache/beam/sdk/io/influxdb/InfluxDbIOIT.java b/sdks/java/io/influxdb/src/test/java/org/apache/beam/sdk/io/influxdb/InfluxDbIOIT.java index da725ab541b3..2d0513f0de23 100644 --- a/sdks/java/io/influxdb/src/test/java/org/apache/beam/sdk/io/influxdb/InfluxDbIOIT.java +++ b/sdks/java/io/influxdb/src/test/java/org/apache/beam/sdk/io/influxdb/InfluxDbIOIT.java @@ -95,7 +95,7 @@ public void clear() { } @Before - public void initTest() throws IOException, InterruptedException { + public void initTest() throws InterruptedException { BackOff backOff = FluentBackoff.DEFAULT.withMaxRetries(4).backoff(); Query createQuery = new Query(String.format("CREATE DATABASE %s", options.getDatabaseName())); try (InfluxDB connection = diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index abc0af81f568..2c0ad23c5638 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -26,7 +26,6 @@ import com.google.auto.value.AutoValue; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.io.IOException; import java.io.Serializable; import java.net.URLClassLoader; import java.sql.Connection; @@ -2844,7 +2843,7 @@ private void cleanUpStatementAndConnection() throws Exception { } private void executeBatch(ProcessContext context, Iterable records) - throws SQLException, IOException, InterruptedException { + throws SQLException, InterruptedException { Long startTimeNs = System.nanoTime(); Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = checkStateNotNull(retryBackOff).backoff(); diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java index 4cbadf237336..aaf28cb9ad92 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java @@ -472,10 +472,10 @@ private static class CustomBackOffSupplier implements SerializableSupplier Date: Thu, 26 Jun 2025 14:58:18 +0200 Subject: [PATCH 2/2] fix a few more sites --- .../windmill/client/AbstractWindmillStream.java | 11 +---------- .../sdk/io/aws2/kinesis/RateLimitPolicyFactory.java | 3 --- .../java/org/apache/beam/io/requestresponse/Call.java | 3 --- .../org/apache/beam/io/requestresponse/Repeater.java | 9 ++------- 4 files changed, 3 insertions(+), 23 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java index 44286bd85f00..968664a9b874 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.dataflow.worker.windmill.client; import com.google.errorprone.annotations.CanIgnoreReturnValue; -import java.io.IOException; import java.io.PrintWriter; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -211,8 +210,6 @@ private void startStream() { // Shutdown the stream to clean up any dangling resources and pending requests. shutdown(); break; - } catch (IOException ioe) { - // Keep trying to create the stream. } } } @@ -375,11 +372,7 @@ private class ResponseObserver implements StreamObserver { @Override public void onNext(ResponseT response) { - try { - backoff.reset(); - } catch (IOException e) { - // Ignore. - } + backoff.reset(); debugMetrics.recordResponse(); onResponse(response); } @@ -400,8 +393,6 @@ public void onError(Throwable t) { } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; - } catch (IOException e) { - // Ignore. } executeSafely(AbstractWindmillStream.this::startStream); diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java index f64b76e9ba7b..a2ba0cf666b6 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java @@ -27,8 +27,6 @@ import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Implement this interface to create a {@code RateLimitPolicy}. Used to create a rate limiter for @@ -91,7 +89,6 @@ public void onSuccess(List records) throws InterruptedException { * response is empty or if the consumer is throttled by AWS. */ class DefaultRateLimiter implements RateLimitPolicy { - private static final Logger LOG = LoggerFactory.getLogger(DefaultRateLimiter.class); private final Sleeper sleeper; private final BackOff emptySuccess; private final BackOff throttled; diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java index f9c1a23e64fe..ab73946534cd 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java @@ -21,7 +21,6 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; -import java.io.IOException; import java.io.Serializable; import java.util.Optional; import java.util.concurrent.Callable; @@ -353,8 +352,6 @@ private void backoffIfNeeded(BackOff backOff, Sleeper sleeper) { incIfPresent(sleeperCounter); sleeper.sleep(backOff.nextBackOffMillis()); } catch (InterruptedException ignored) { - } catch (IOException e) { - throw new RuntimeException(e); } } } diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java index 69364d85887e..e9a7666d2a1d 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java @@ -21,7 +21,6 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; -import java.io.IOException; import java.util.Optional; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.util.BackOff; @@ -119,12 +118,8 @@ OutputT apply(InputT input) throws UserCodeExecutionException { latestError = Optional.of(e); } catch (InterruptedException ignored) { } - try { - incIfPresent(getBackoffCounter()); - waitFor = getBackOff().nextBackOffMillis(); - } catch (IOException e) { - throw new UserCodeExecutionException(e); - } + incIfPresent(getBackoffCounter()); + waitFor = getBackOff().nextBackOffMillis(); } throw latestError.orElse( new UserCodeExecutionException("failed to process for input: " + input));