diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/LeaderProgressState.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/LeaderProgressState.java index 888705f22a..ffaa1c83cb 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/LeaderProgressState.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/LeaderProgressState.java @@ -12,6 +12,9 @@ public class LeaderProgressState { @JsonProperty("streamArns") private List streamArns; + @JsonProperty("completed_first_discovery") + private boolean completedFirstDiscovery = false; + public boolean isInitialized() { return initialized; } @@ -27,4 +30,13 @@ public List getStreamArns() { public void setStreamArns(List streamArns) { this.streamArns = streamArns; } + + public boolean isCompletedFirstDiscovery() { + return completedFirstDiscovery; + } + + public void setCompletedFirstDiscovery(boolean completedFirstDiscovery) { + this.completedFirstDiscovery = completedFirstDiscovery; + } + } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java index 464c20d0ec..2e3f48fabf 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java @@ -57,6 +57,7 @@ public class LeaderScheduler implements Runnable { private LeaderPartition leaderPartition; private List streamArns; + private Map isExportDoneForStreamArns; public LeaderScheduler(EnhancedSourceCoordinator coordinator, DynamoDbClient dynamoDbClient, ShardManager shardManager, List tableConfigs) { this(coordinator, dynamoDbClient, shardManager, tableConfigs, DEFAULT_LEASE_INTERVAL); @@ -116,6 +117,7 @@ public void run() { // Step 3: Find and create children partitions. compareAndCreateChildrenPartitions(sourcePartitions); + leaderPartition.getProgressState().get().setCompletedFirstDiscovery(true); } } @@ -337,7 +339,11 @@ private void createChildStreamPartition(StreamPartition streamPartition, String StreamProgressState parentStreamProgressState = streamPartition.getProgressState().get(); StreamProgressState streamProgressState = new StreamProgressState(); streamProgressState.setStartTime(parentStreamProgressState.getStartTime()); - streamProgressState.setEndingSequenceNumber(shardManager.getEndingSequenceNumber(childShardId)); + if (shouldSetEndingSequenceNumber()) { + streamProgressState.setEndingSequenceNumber(shardManager.getEndingSequenceNumber(childShardId)); + } else { + streamProgressState.setEndingSequenceNumber(null); + } streamProgressState.setWaitForExport(parentStreamProgressState.shouldWaitForExport()); StreamPartition partition = new StreamPartition(streamPartition.getStreamArn(), childShardId, Optional.of(streamProgressState)); coordinator.createPartition(partition); @@ -363,4 +369,7 @@ private void createExportPartition(String tableArn, Instant exportTime, String b coordinator.createPartition(exportPartition); } + private boolean shouldSetEndingSequenceNumber() { + return !leaderPartition.getProgressState().get().isCompletedFirstDiscovery(); + } } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderSchedulerTest.java index ac523a29d9..4c6b1d27b6 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderSchedulerTest.java @@ -189,9 +189,53 @@ void test_shardDiscovery_should_create_children_partitions() throws InterruptedE leaderPartition = new LeaderPartition(); leaderPartition.getProgressState().get().setInitialized(true); leaderPartition.getProgressState().get().setStreamArns(List.of(streamArn)); + leaderPartition.getProgressState().get().setCompletedFirstDiscovery(false); + + final String expectedEndingSequenceNumber = UUID.randomUUID().toString(); + when(shardManager.getEndingSequenceNumber(anyString())).thenReturn(expectedEndingSequenceNumber); + given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); + + final ArgumentCaptor createdPartitionCaptor = ArgumentCaptor.forClass(StreamPartition.class); + when(coordinator.createPartition(createdPartitionCaptor.capture())).thenReturn(true); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> leaderScheduler.run()); + + Thread.sleep(100); + executorService.shutdownNow(); + // Already init + verifyNoInteractions(dynamoDbClient); + + // Should check the completed partitions + verify(coordinator).queryCompletedPartitions(eq(StreamPartition.PARTITION_TYPE), any(Instant.class)); + + // Should create 3 stream partitions for child shards found + verify(coordinator, times(3)).createPartition(any(EnhancedSourcePartition.class)); + + verify(coordinator).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class)); + + final List createdPartitions = createdPartitionCaptor.getAllValues(); + createdPartitions.forEach(streamPartition -> { + assertThat(streamPartition.getProgressState().get().getEndingSequenceNumber(), equalTo(expectedEndingSequenceNumber)); + }); + + assertThat(leaderPartition.getProgressState().get().isCompletedFirstDiscovery(), equalTo(true)); + + } + + @Test + void test_shardDiscovery_should_create_children_partitions_with_null_ending_sequence_number_when_already_first_discovery_completed() throws InterruptedException { + leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, List.of(tableConfig)); + leaderPartition = new LeaderPartition(); + leaderPartition.getProgressState().get().setInitialized(true); + leaderPartition.getProgressState().get().setStreamArns(List.of(streamArn)); + leaderPartition.getProgressState().get().setCompletedFirstDiscovery(true); given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); ExecutorService executorService = Executors.newSingleThreadExecutor(); + final ArgumentCaptor createdPartitionCaptor = ArgumentCaptor.forClass(StreamPartition.class); + when(coordinator.createPartition(createdPartitionCaptor.capture())).thenReturn(true); + executorService.submit(() -> leaderScheduler.run()); Thread.sleep(100); @@ -207,6 +251,10 @@ void test_shardDiscovery_should_create_children_partitions() throws InterruptedE verify(coordinator).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class)); + final List createdPartitions = createdPartitionCaptor.getAllValues(); + createdPartitions.forEach(streamPartition -> { + assertThat(streamPartition.getProgressState().get().getEndingSequenceNumber(), equalTo(null)); + }); } @Test