Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ public class LeaderProgressState {
@JsonProperty("streamArns")
private List<String> streamArns;

@JsonProperty("completed_first_discovery")
private boolean completedFirstDiscovery = false;

public boolean isInitialized() {
return initialized;
}
Expand All @@ -27,4 +30,13 @@ public List<String> getStreamArns() {
public void setStreamArns(List<String> streamArns) {
this.streamArns = streamArns;
}

public boolean isCompletedFirstDiscovery() {
return completedFirstDiscovery;
}

public void setCompletedFirstDiscovery(boolean completedFirstDiscovery) {
this.completedFirstDiscovery = completedFirstDiscovery;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class LeaderScheduler implements Runnable {
private LeaderPartition leaderPartition;

private List<String> streamArns;
private Map<String, Boolean> isExportDoneForStreamArns;

public LeaderScheduler(EnhancedSourceCoordinator coordinator, DynamoDbClient dynamoDbClient, ShardManager shardManager, List<TableConfig> tableConfigs) {
this(coordinator, dynamoDbClient, shardManager, tableConfigs, DEFAULT_LEASE_INTERVAL);
Expand Down Expand Up @@ -116,6 +117,7 @@ public void run() {

// Step 3: Find and create children partitions.
compareAndCreateChildrenPartitions(sourcePartitions);
leaderPartition.getProgressState().get().setCompletedFirstDiscovery(true);
}

}
Expand Down Expand Up @@ -337,7 +339,11 @@ private void createChildStreamPartition(StreamPartition streamPartition, String
StreamProgressState parentStreamProgressState = streamPartition.getProgressState().get();
StreamProgressState streamProgressState = new StreamProgressState();
streamProgressState.setStartTime(parentStreamProgressState.getStartTime());
streamProgressState.setEndingSequenceNumber(shardManager.getEndingSequenceNumber(childShardId));
if (shouldSetEndingSequenceNumber()) {
streamProgressState.setEndingSequenceNumber(shardManager.getEndingSequenceNumber(childShardId));
} else {
streamProgressState.setEndingSequenceNumber(null);
}
streamProgressState.setWaitForExport(parentStreamProgressState.shouldWaitForExport());
StreamPartition partition = new StreamPartition(streamPartition.getStreamArn(), childShardId, Optional.of(streamProgressState));
coordinator.createPartition(partition);
Expand All @@ -363,4 +369,7 @@ private void createExportPartition(String tableArn, Instant exportTime, String b
coordinator.createPartition(exportPartition);
}

private boolean shouldSetEndingSequenceNumber() {
return !leaderPartition.getProgressState().get().isCompletedFirstDiscovery();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,53 @@ void test_shardDiscovery_should_create_children_partitions() throws InterruptedE
leaderPartition = new LeaderPartition();
leaderPartition.getProgressState().get().setInitialized(true);
leaderPartition.getProgressState().get().setStreamArns(List.of(streamArn));
leaderPartition.getProgressState().get().setCompletedFirstDiscovery(false);

final String expectedEndingSequenceNumber = UUID.randomUUID().toString();
when(shardManager.getEndingSequenceNumber(anyString())).thenReturn(expectedEndingSequenceNumber);
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));

final ArgumentCaptor<StreamPartition> createdPartitionCaptor = ArgumentCaptor.forClass(StreamPartition.class);
when(coordinator.createPartition(createdPartitionCaptor.capture())).thenReturn(true);

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> leaderScheduler.run());

Thread.sleep(100);
executorService.shutdownNow();
// Already init
verifyNoInteractions(dynamoDbClient);

// Should check the completed partitions
verify(coordinator).queryCompletedPartitions(eq(StreamPartition.PARTITION_TYPE), any(Instant.class));

// Should create 3 stream partitions for child shards found
verify(coordinator, times(3)).createPartition(any(EnhancedSourcePartition.class));

verify(coordinator).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));

final List<StreamPartition> createdPartitions = createdPartitionCaptor.getAllValues();
createdPartitions.forEach(streamPartition -> {
assertThat(streamPartition.getProgressState().get().getEndingSequenceNumber(), equalTo(expectedEndingSequenceNumber));
});

assertThat(leaderPartition.getProgressState().get().isCompletedFirstDiscovery(), equalTo(true));

}

@Test
void test_shardDiscovery_should_create_children_partitions_with_null_ending_sequence_number_when_already_first_discovery_completed() throws InterruptedException {
leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, List.of(tableConfig));
leaderPartition = new LeaderPartition();
leaderPartition.getProgressState().get().setInitialized(true);
leaderPartition.getProgressState().get().setStreamArns(List.of(streamArn));
leaderPartition.getProgressState().get().setCompletedFirstDiscovery(true);
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));

ExecutorService executorService = Executors.newSingleThreadExecutor();
final ArgumentCaptor<StreamPartition> createdPartitionCaptor = ArgumentCaptor.forClass(StreamPartition.class);
when(coordinator.createPartition(createdPartitionCaptor.capture())).thenReturn(true);

executorService.submit(() -> leaderScheduler.run());

Thread.sleep(100);
Expand All @@ -207,6 +251,10 @@ void test_shardDiscovery_should_create_children_partitions() throws InterruptedE

verify(coordinator).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));

final List<StreamPartition> createdPartitions = createdPartitionCaptor.getAllValues();
createdPartitions.forEach(streamPartition -> {
assertThat(streamPartition.getProgressState().get().getEndingSequenceNumber(), equalTo(null));
});
}

@Test
Expand Down
Loading