Skip to content

Commit 8278918

Browse files
committed
Get shard iterator at sequence number for last shard iterators on ending sequence numbers
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 2b5daf0 commit 8278918

File tree

5 files changed

+26
-6
lines changed

5 files changed

+26
-6
lines changed

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/LeaderProgressState.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@ public List<String> getStreamArns() {
2727
public void setStreamArns(List<String> streamArns) {
2828
this.streamArns = streamArns;
2929
}
30+
3031
}

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,5 +362,4 @@ private void createExportPartition(String tableArn, Instant exportTime, String b
362362
ExportPartition exportPartition = new ExportPartition(tableArn, exportTime, Optional.of(exportProgressState));
363363
coordinator.createPartition(exportPartition);
364364
}
365-
366365
}

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,11 @@ public Runnable createConsumer(final StreamPartition streamPartition,
8686
// If ending sequence number is present, get the shardIterator for last record
8787
String endingSequenceNumber = progressState.get().getEndingSequenceNumber();
8888
if (endingSequenceNumber != null && !endingSequenceNumber.isEmpty()) {
89-
lastShardIterator = getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), endingSequenceNumber);
89+
lastShardIterator = getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), endingSequenceNumber, ShardIteratorType.AT_SEQUENCE_NUMBER);
9090
}
9191
}
9292

93-
String shardIterator = getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), sequenceNumber);
93+
String shardIterator = getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), sequenceNumber, ShardIteratorType.AFTER_SEQUENCE_NUMBER);
9494
if (shardIterator == null) {
9595
LOG.error("Failed to start consuming shard '{}'. Unable to get a shard iterator for this shard, this shard may have expired", streamPartition.getShardId());
9696
return null;
@@ -136,7 +136,7 @@ private TableInfo getTableInfo(String tableArn) {
136136
* @param sequenceNumber The last Sequence Number processed if any
137137
* @return A shard iterator.
138138
*/
139-
public String getShardIterator(String streamArn, String shardId, String sequenceNumber) {
139+
public String getShardIterator(String streamArn, String shardId, String sequenceNumber, ShardIteratorType shardIteratorType) {
140140
LOG.debug("Get Initial Shard Iter for {}", shardId);
141141
GetShardIteratorRequest getShardIteratorRequest;
142142

@@ -145,7 +145,7 @@ public String getShardIterator(String streamArn, String shardId, String sequence
145145
getShardIteratorRequest = GetShardIteratorRequest.builder()
146146
.shardId(shardId)
147147
.streamArn(streamArn)
148-
.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
148+
.shardIteratorType(shardIteratorType)
149149
.sequenceNumber(sequenceNumber)
150150
.build();
151151
} else {

data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderSchedulerTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,14 @@ void test_shardDiscovery_should_create_children_partitions() throws InterruptedE
189189
leaderPartition = new LeaderPartition();
190190
leaderPartition.getProgressState().get().setInitialized(true);
191191
leaderPartition.getProgressState().get().setStreamArns(List.of(streamArn));
192+
193+
final String expectedEndingSequenceNumber = UUID.randomUUID().toString();
194+
when(shardManager.getEndingSequenceNumber(anyString())).thenReturn(expectedEndingSequenceNumber);
192195
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
193196

197+
final ArgumentCaptor<StreamPartition> createdPartitionCaptor = ArgumentCaptor.forClass(StreamPartition.class);
198+
when(coordinator.createPartition(createdPartitionCaptor.capture())).thenReturn(true);
199+
194200
ExecutorService executorService = Executors.newSingleThreadExecutor();
195201
executorService.submit(() -> leaderScheduler.run());
196202

@@ -207,6 +213,10 @@ void test_shardDiscovery_should_create_children_partitions() throws InterruptedE
207213

208214
verify(coordinator).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
209215

216+
final List<StreamPartition> createdPartitions = createdPartitionCaptor.getAllValues();
217+
createdPartitions.forEach(streamPartition -> {
218+
assertThat(streamPartition.getProgressState().get().getEndingSequenceNumber(), equalTo(expectedEndingSequenceNumber));
219+
});
210220
}
211221

212222
@Test

data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import java.time.Duration;
3434
import java.time.Instant;
35+
import java.util.List;
3536
import java.util.Optional;
3637
import java.util.UUID;
3738

@@ -193,7 +194,16 @@ public void test_create_shardConsumer_for_closedShards() {
193194
Runnable consumer = consumerFactory.createConsumer(streamPartition, Duration.ofMinutes(1), shardAcknowledgementManager);
194195
assertThat(consumer, notNullValue());
195196
// Should get iterators twice
196-
verify(dynamoDbStreamsClient, times(2)).getShardIterator(any(GetShardIteratorRequest.class));
197+
198+
final ArgumentCaptor<GetShardIteratorRequest> captor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
199+
verify(dynamoDbStreamsClient, times(2)).getShardIterator(captor.capture());
200+
final List<GetShardIteratorRequest> getShardIteratorRequests = captor.getAllValues();
201+
202+
final GetShardIteratorRequest lastShardIteratorRequest = getShardIteratorRequests.get(0);
203+
assertThat(lastShardIteratorRequest.shardIteratorType(), equalTo(ShardIteratorType.AT_SEQUENCE_NUMBER));
204+
205+
final GetShardIteratorRequest getShardIteratorRequest = getShardIteratorRequests.get(1);
206+
assertThat(getShardIteratorRequest.shardIteratorType(), equalTo(ShardIteratorType.TRIM_HORIZON));
197207

198208
verify(streamApiInvocations, times(2)).increment();
199209

0 commit comments

Comments
 (0)