Skip to content

Commit f848470

Browse files
committed
Do not set ending sequence number for child shard partitions discovered after initial scan for shards
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 2b5daf0 commit f848470

File tree

2 files changed

+22
-1
lines changed

2 files changed

+22
-1
lines changed

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/LeaderProgressState.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ public class LeaderProgressState {
1212
@JsonProperty("streamArns")
1313
private List<String> streamArns;
1414

15+
@JsonProperty("completed_first_discovery")
16+
private boolean completedFirstDiscovery = false;
17+
1518
public boolean isInitialized() {
1619
return initialized;
1720
}
@@ -27,4 +30,13 @@ public List<String> getStreamArns() {
2730
public void setStreamArns(List<String> streamArns) {
2831
this.streamArns = streamArns;
2932
}
33+
34+
public boolean isCompletedFirstDiscovery() {
35+
return completedFirstDiscovery;
36+
}
37+
38+
public void setCompletedFirstDiscovery(boolean completedFirstDiscovery) {
39+
this.completedFirstDiscovery = completedFirstDiscovery;
40+
}
41+
3042
}

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public class LeaderScheduler implements Runnable {
5757
private LeaderPartition leaderPartition;
5858

5959
private List<String> streamArns;
60+
private Map<String, Boolean> isExportDoneForStreamArns;
6061

6162
public LeaderScheduler(EnhancedSourceCoordinator coordinator, DynamoDbClient dynamoDbClient, ShardManager shardManager, List<TableConfig> tableConfigs) {
6263
this(coordinator, dynamoDbClient, shardManager, tableConfigs, DEFAULT_LEASE_INTERVAL);
@@ -116,6 +117,7 @@ public void run() {
116117

117118
// Step 3: Find and create children partitions.
118119
compareAndCreateChildrenPartitions(sourcePartitions);
120+
leaderPartition.getProgressState().get().setCompletedFirstDiscovery(true);
119121
}
120122

121123
}
@@ -337,7 +339,11 @@ private void createChildStreamPartition(StreamPartition streamPartition, String
337339
StreamProgressState parentStreamProgressState = streamPartition.getProgressState().get();
338340
StreamProgressState streamProgressState = new StreamProgressState();
339341
streamProgressState.setStartTime(parentStreamProgressState.getStartTime());
340-
streamProgressState.setEndingSequenceNumber(shardManager.getEndingSequenceNumber(childShardId));
342+
if (shouldSetEndingSequenceNumber()) {
343+
streamProgressState.setEndingSequenceNumber(shardManager.getEndingSequenceNumber(childShardId));
344+
} else {
345+
streamProgressState.setEndingSequenceNumber(null);
346+
}
341347
streamProgressState.setWaitForExport(parentStreamProgressState.shouldWaitForExport());
342348
StreamPartition partition = new StreamPartition(streamPartition.getStreamArn(), childShardId, Optional.of(streamProgressState));
343349
coordinator.createPartition(partition);
@@ -363,4 +369,7 @@ private void createExportPartition(String tableArn, Instant exportTime, String b
363369
coordinator.createPartition(exportPartition);
364370
}
365371

372+
private boolean shouldSetEndingSequenceNumber() {
373+
return leaderPartition.getProgressState().get().isCompletedFirstDiscovery();
374+
}
366375
}

0 commit comments

Comments
 (0)