@@ -55,6 +55,7 @@ public class PubSubDirectDStream<T> extends InputDStream<T> implements Streaming
55
55
56
56
private static final Logger LOG = LoggerFactory .getLogger (PubSubDirectDStream .class );
57
57
private static final String CDAP_PIPELINE = "cdap_pipeline" ;
58
+ private static final int MAX_SNAPSHOT_ATTEMPTS = 3 ;
58
59
59
60
private final Credentials credentials ;
60
61
private final PubSubSubscriberConfig config ;
@@ -64,6 +65,7 @@ public class PubSubDirectDStream<T> extends InputDStream<T> implements Streaming
64
65
private final SerializableFunction <PubSubMessage , T > mappingFn ;
65
66
private final StreamingContext streamingContext ;
66
67
private final String pipeline ;
68
+ private final BackoffConfig backoffConfig ;
67
69
68
70
private SubscriptionAdminClient subscriptionAdminClient ;
69
71
private ProjectSnapshotName currentSnapshotName ;
@@ -82,6 +84,7 @@ public PubSubDirectDStream(io.cdap.cdap.etl.api.streaming.StreamingContext conte
82
84
this .pipeline = context .getPipelineName ();
83
85
this .credentials = PubSubSubscriberUtil .createCredentials (config .getServiceAccount (),
84
86
config .isServiceAccountFilePath ());
87
+ backoffConfig = BackoffConfig .defaultInstance ();
85
88
}
86
89
87
90
@ Override
@@ -195,7 +198,7 @@ private String generateName(String subscription) {
195
198
}
196
199
197
200
private void createSubscriptionIfNotPresent () throws IOException , InterruptedException {
198
- PubSubSubscriberUtil .createSubscription (() -> true , BackoffConfig . defaultInstance () ,
201
+ PubSubSubscriberUtil .createSubscription (() -> true , backoffConfig ,
199
202
ProjectSubscriptionName .format (config .getProject (),
200
203
config .getSubscription ()),
201
204
TopicName .format (config .getProject (), config .getTopic ()),
@@ -243,15 +246,21 @@ private ProjectSnapshotName fetchSnapShot(String subscriptionId,
243
246
244
247
private Snapshot createSnapshot (ProjectSnapshotName projectSnapshotName ,
245
248
ProjectSubscriptionName projectSubscriptionName ) {
246
- // Creation takes around 3.5 s .
247
249
LOG .debug ("Creating snapshot {} for subscription {} in Pub/Sub ." , projectSnapshotName .toString (),
248
250
projectSubscriptionName .toString ());
249
- CreateSnapshotRequest request = CreateSnapshotRequest .newBuilder ()
250
- .setName (projectSnapshotName .toString ())
251
- .setSubscription (projectSubscriptionName .toString ())
252
- .putAllLabels (Collections .singletonMap (CDAP_PIPELINE , getLabelValue (pipeline )))
253
- .build ();
254
- return subscriptionAdminClient .createSnapshot (request );
251
+ try {
252
+ return PubSubSubscriberUtil .callWithRetry (() -> {
253
+ // Creation takes around 3.5 s .
254
+ CreateSnapshotRequest request = CreateSnapshotRequest .newBuilder ()
255
+ .setName (projectSnapshotName .toString ())
256
+ .setSubscription (projectSubscriptionName .toString ())
257
+ .putAllLabels (Collections .singletonMap (CDAP_PIPELINE , getLabelValue (pipeline )))
258
+ .build ();
259
+ return subscriptionAdminClient .createSnapshot (request );
260
+ }, backoffConfig , MAX_SNAPSHOT_ATTEMPTS );
261
+ } catch (Exception e ) {
262
+ throw new RuntimeException (e );
263
+ }
255
264
}
256
265
257
266
@ VisibleForTesting
@@ -268,9 +277,16 @@ static String getLabelValue(String pipeline) {
268
277
}
269
278
270
279
private void deleteSnapshot (ProjectSnapshotName projectSnapshotName ) {
271
- // Deletion takes around 2.5 s .
272
- // TODO - Consider making this asynchronous
273
- subscriptionAdminClient .deleteSnapshot (projectSnapshotName );
280
+ try {
281
+ PubSubSubscriberUtil .callWithRetry (() -> {
282
+ // Deletion takes around 2.5 s .
283
+ // TODO - Consider making this asynchronous
284
+ subscriptionAdminClient .deleteSnapshot (projectSnapshotName );
285
+ return null ;
286
+ }, backoffConfig , MAX_SNAPSHOT_ATTEMPTS );
287
+ } catch (Exception e ) {
288
+ throw new RuntimeException (e );
289
+ }
274
290
}
275
291
276
292
private void saveSnapshotAsState (Snapshot snapshot , String subscription ,
0 commit comments