Skip to content

Commit 10e07a1

Browse files
committed
Deserialize change stream proto based (mutable key range) return types.
1 parent bb696f5 commit 10e07a1

File tree

3 files changed

+0
-40
lines changed

3 files changed

+0
-40
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@
4444
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
4545
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
4646
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
47-
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveInEvent;
48-
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveOutEvent;
4947
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
5048
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
5149
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
@@ -306,21 +304,9 @@ ChangeStreamRecord parseProtoPartitionEventRecord(
306304
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord partitionEventRecordProto) {
307305
final Timestamp commitTimestamp =
308306
Timestamp.fromProto(partitionEventRecordProto.getCommitTimestamp());
309-
List<MoveInEvent> moveInEvents = new ArrayList<>();
310-
for (com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveInEvent
311-
moveInEventProto : partitionEventRecordProto.getMoveInEventsList()) {
312-
moveInEvents.add(new MoveInEvent(moveInEventProto.getSourcePartitionToken()));
313-
}
314-
List<MoveOutEvent> moveOutEvents = new ArrayList<>();
315-
for (com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveOutEvent
316-
moveOutEventProto : partitionEventRecordProto.getMoveOutEventsList()) {
317-
moveOutEvents.add(new MoveOutEvent(moveOutEventProto.getDestinationPartitionToken()));
318-
}
319307
return new PartitionEventRecord(
320308
commitTimestamp,
321309
partitionEventRecordProto.getRecordSequence(),
322-
moveInEvents,
323-
moveOutEvents,
324310
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
325311
}
326312

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
4343
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
4444
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
45-
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveInEvent;
46-
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveOutEvent;
4745
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
4846
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
4947
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
@@ -987,8 +985,6 @@ public void testMappingProtoRowToPartitionEventRecord() {
987985
new PartitionEventRecord(
988986
Timestamp.MIN_VALUE,
989987
"fakeRecordSequence",
990-
Arrays.asList(new MoveInEvent("token1"), new MoveInEvent("token2")),
991-
Arrays.asList(new MoveOutEvent("token1"), new MoveOutEvent("token2")),
992988
null);
993989
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
994990
recordToProto(partitionEventRecord);

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestProtoMapper.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@
3131
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
3232
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
3333
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
34-
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveInEvent;
35-
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveOutEvent;
3634
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
3735
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
3836
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord;
@@ -88,30 +86,10 @@ private static com.google.spanner.v1.ChangeStreamRecord convertPartitionEndRecor
8886

8987
private static com.google.spanner.v1.ChangeStreamRecord convertPartitionEventRecordToProto(
9088
PartitionEventRecord partitionEventRecord) {
91-
List<com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveInEvent>
92-
moveInEventsProto = new ArrayList<>();
93-
for (MoveInEvent moveInEvent : partitionEventRecord.getMoveInEvents()) {
94-
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveInEvent moveInEventProto =
95-
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveInEvent.newBuilder()
96-
.setSourcePartitionToken(moveInEvent.getSourcePartitionToken())
97-
.build();
98-
moveInEventsProto.add(moveInEventProto);
99-
}
100-
List<com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveOutEvent>
101-
moveOutEventsProto = new ArrayList<>();
102-
for (MoveOutEvent moveOutEvent : partitionEventRecord.getMoveOutEvents()) {
103-
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveOutEvent moveOutEventProto =
104-
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveOutEvent.newBuilder()
105-
.setDestinationPartitionToken(moveOutEvent.getDestinationPartitionToken())
106-
.build();
107-
moveOutEventsProto.add(moveOutEventProto);
108-
}
10989
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord partitionEventRecordProto =
11090
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.newBuilder()
11191
.setCommitTimestamp(partitionEventRecord.getCommitTimestamp().toProto())
11292
.setRecordSequence(partitionEventRecord.getRecordSequence())
113-
.addAllMoveInEvents(moveInEventsProto)
114-
.addAllMoveOutEvents(moveOutEventsProto)
11593
.build();
11694
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
11795
com.google.spanner.v1.ChangeStreamRecord.newBuilder()

0 commit comments

Comments
 (0)