Skip to content

Commit bb696f5

Browse files
committed
Parse v2 record.
pick 4d2a5a7 # Fix API surface test (#35028)
1 parent dc213a9 commit bb696f5

File tree

6 files changed

+691
-3
lines changed

6 files changed

+691
-3
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,7 @@ class BeamModulePlugin implements Plugin<Project> {
653653
def arrow_version = "15.0.2"
654654
def jmh_version = "1.34"
655655
def jupiter_version = "5.7.0"
656+
def spanner_grpc_proto_version = "6.95.1"
656657

657658
// Export Spark versions, so they are defined in a single place only
658659
project.ext.spark3_version = spark3_version
@@ -862,7 +863,7 @@ class BeamModulePlugin implements Plugin<Project> {
862863
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
863864
proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
864865
proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version
865-
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
866+
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1:$spanner_grpc_proto_version", // google_cloud_platform_libraries_bom sets version
866867
proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
867868
proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
868869
qpid_jms_client : "org.apache.qpid:qpid-jms-client:$qpid_jms_client_version",

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.cloud.Timestamp;
2121
import com.google.cloud.spanner.ResultSet;
2222
import com.google.cloud.spanner.Struct;
23+
import com.google.cloud.spanner.Type;
2324
import org.joda.time.Duration;
2425

2526
/**
@@ -108,6 +109,48 @@ public Struct getCurrentRowAsStruct() {
108109
return resultSet.getCurrentRowAsStruct();
109110
}
110111

112+
/**
113+
* Returns the change stream record proto at the current pointer of the result set. It also
114+
* updates the timestamp at which the record was read. This function enhances the getProtoMessage
115+
* function but only focus on the ChangeStreamRecord type.
116+
*
117+
* @param columnIndex Index of the column.
118+
* @return a change stream record as a proto or null
119+
*/
120+
public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecord(int columnIndex) {
121+
recordReadAt = Timestamp.now();
122+
return resultSet.getProtoMessage(
123+
columnIndex, com.google.spanner.v1.ChangeStreamRecord.getDefaultInstance());
124+
}
125+
126+
/**
127+
* Get the column count at the current pointer of the result set.
128+
*
129+
* @return the number of columns in the underlying data
130+
*/
131+
public int getColumnCount() {
132+
return resultSet.getColumnCount();
133+
}
134+
135+
/**
136+
* If the column at the current pointer of the result set indexed by the columnIndex is null.
137+
*
138+
* @return true if a column contains a NULL value
139+
*/
140+
public boolean isNull(int columnIndex) {
141+
return resultSet.isNull(columnIndex);
142+
}
143+
144+
/**
145+
* Returns the column type at the current pointer of the result set indexed by the columnIndex.
146+
*
147+
* @param columnIndex Index of the column
148+
* @return the type of a column
149+
*/
150+
public Type getColumnType(int columnIndex) {
151+
return resultSet.getColumnType(columnIndex);
152+
}
153+
111154
/**
112155
* Returns the record at the current pointer as {@link JsonB}. It also updates the timestamp at
113156
* which the record was read.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java

Lines changed: 231 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.protobuf.InvalidProtocolBufferException;
2424
import com.google.protobuf.Value;
2525
import com.google.protobuf.util.JsonFormat;
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
2628
import java.util.Collections;
2729
import java.util.HashSet;
2830
import java.util.List;
@@ -42,7 +44,12 @@
4244
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
4345
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
4446
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
47+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveInEvent;
48+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveOutEvent;
49+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
50+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
4551
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
52+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord;
4653
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
4754
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
4855
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -223,12 +230,235 @@ public List<ChangeStreamRecord> toChangeStreamRecords(
223230
return Collections.singletonList(
224231
toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata));
225232
}
226-
// In GoogleSQL, change stream records are returned as an array of structs.
233+
234+
// In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records are returned as Protos.
235+
if (isProtoChangeRecord(resultSet)) {
236+
return Arrays.asList(
237+
fromProtoChangeStreamRecord(
238+
partition, resultSetMetadata, resultSet.getProtoChangeStreamRecord(0)));
239+
}
240+
241+
// In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as an array
242+
// of structs.
227243
return resultSet.getCurrentRowAsStruct().getStructList(0).stream()
228244
.flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
229245
.collect(Collectors.toList());
230246
}
231247

248+
boolean isProtoChangeRecord(ChangeStreamResultSet currentRow) {
249+
return currentRow.getColumnCount() == 1
250+
&& !currentRow.isNull(0)
251+
&& currentRow.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO;
252+
}
253+
254+
ChangeStreamRecord fromProtoChangeStreamRecord(
255+
PartitionMetadata partition,
256+
ChangeStreamResultSetMetadata resultSetMetadata,
257+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto) {
258+
if (changeStreamRecordProto.hasPartitionStartRecord()) {
259+
return parseProtoPartitionStartRecord(
260+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionStartRecord());
261+
} else if (changeStreamRecordProto.hasPartitionEndRecord()) {
262+
return parseProtoPartitionEndRecord(
263+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEndRecord());
264+
} else if (changeStreamRecordProto.hasPartitionEventRecord()) {
265+
return parseProtoPartitionEventRecord(
266+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEventRecord());
267+
} else if (changeStreamRecordProto.hasHeartbeatRecord()) {
268+
return parseProtoHeartbeatRecord(
269+
partition, resultSetMetadata, changeStreamRecordProto.getHeartbeatRecord());
270+
} else if (changeStreamRecordProto.hasDataChangeRecord()) {
271+
return parseProtoDataChangeRecord(
272+
partition, resultSetMetadata, changeStreamRecordProto.getDataChangeRecord());
273+
} else {
274+
throw new IllegalArgumentException(
275+
"Unknown change stream record type " + changeStreamRecordProto.toString());
276+
}
277+
}
278+
279+
ChangeStreamRecord parseProtoPartitionStartRecord(
280+
PartitionMetadata partition,
281+
ChangeStreamResultSetMetadata resultSetMetadata,
282+
com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord partitionStartRecordProto) {
283+
final Timestamp startTimestamp =
284+
Timestamp.fromProto(partitionStartRecordProto.getStartTimestamp());
285+
return new PartitionStartRecord(
286+
startTimestamp,
287+
partitionStartRecordProto.getRecordSequence(),
288+
partitionStartRecordProto.getPartitionTokensList(),
289+
changeStreamRecordMetadataFrom(partition, startTimestamp, resultSetMetadata));
290+
}
291+
292+
ChangeStreamRecord parseProtoPartitionEndRecord(
293+
PartitionMetadata partition,
294+
ChangeStreamResultSetMetadata resultSetMetadata,
295+
com.google.spanner.v1.ChangeStreamRecord.PartitionEndRecord partitionEndRecordProto) {
296+
final Timestamp endTimestamp = Timestamp.fromProto(partitionEndRecordProto.getEndTimestamp());
297+
return new PartitionEndRecord(
298+
endTimestamp,
299+
partitionEndRecordProto.getRecordSequence(),
300+
changeStreamRecordMetadataFrom(partition, endTimestamp, resultSetMetadata));
301+
}
302+
303+
ChangeStreamRecord parseProtoPartitionEventRecord(
304+
PartitionMetadata partition,
305+
ChangeStreamResultSetMetadata resultSetMetadata,
306+
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord partitionEventRecordProto) {
307+
final Timestamp commitTimestamp =
308+
Timestamp.fromProto(partitionEventRecordProto.getCommitTimestamp());
309+
List<MoveInEvent> moveInEvents = new ArrayList<>();
310+
for (com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveInEvent
311+
moveInEventProto : partitionEventRecordProto.getMoveInEventsList()) {
312+
moveInEvents.add(new MoveInEvent(moveInEventProto.getSourcePartitionToken()));
313+
}
314+
List<MoveOutEvent> moveOutEvents = new ArrayList<>();
315+
for (com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveOutEvent
316+
moveOutEventProto : partitionEventRecordProto.getMoveOutEventsList()) {
317+
moveOutEvents.add(new MoveOutEvent(moveOutEventProto.getDestinationPartitionToken()));
318+
}
319+
return new PartitionEventRecord(
320+
commitTimestamp,
321+
partitionEventRecordProto.getRecordSequence(),
322+
moveInEvents,
323+
moveOutEvents,
324+
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
325+
}
326+
327+
ChangeStreamRecord parseProtoHeartbeatRecord(
328+
PartitionMetadata partition,
329+
ChangeStreamResultSetMetadata resultSetMetadata,
330+
com.google.spanner.v1.ChangeStreamRecord.HeartbeatRecord heartbeatRecordProto) {
331+
final Timestamp heartbeatTimestamp = Timestamp.fromProto(heartbeatRecordProto.getTimestamp());
332+
return new HeartbeatRecord(
333+
heartbeatTimestamp,
334+
changeStreamRecordMetadataFrom(partition, heartbeatTimestamp, resultSetMetadata));
335+
}
336+
337+
ChangeStreamRecord parseProtoDataChangeRecord(
338+
PartitionMetadata partition,
339+
ChangeStreamResultSetMetadata resultSetMetadata,
340+
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord dataChangeRecordProto) {
341+
final Timestamp commitTimestamp =
342+
Timestamp.fromProto(dataChangeRecordProto.getCommitTimestamp());
343+
return new DataChangeRecord(
344+
partition.getPartitionToken(),
345+
commitTimestamp,
346+
dataChangeRecordProto.getServerTransactionId(),
347+
dataChangeRecordProto.getIsLastRecordInTransactionInPartition(),
348+
dataChangeRecordProto.getRecordSequence(),
349+
dataChangeRecordProto.getTable(),
350+
parseProtoColumnMetadata(dataChangeRecordProto.getColumnMetadataList()),
351+
parseProtoMod(
352+
dataChangeRecordProto.getModsList(), dataChangeRecordProto.getColumnMetadataList()),
353+
parseProtoModType(dataChangeRecordProto.getModType()),
354+
parseProtoValueCaptureType(dataChangeRecordProto.getValueCaptureType()),
355+
dataChangeRecordProto.getNumberOfRecordsInTransaction(),
356+
dataChangeRecordProto.getNumberOfPartitionsInTransaction(),
357+
dataChangeRecordProto.getTransactionTag(),
358+
dataChangeRecordProto.getIsSystemTransaction(),
359+
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
360+
}
361+
362+
List<ColumnType> parseProtoColumnMetadata(
363+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
364+
columnMetadataProtos) {
365+
List<ColumnType> columnTypes = new ArrayList<>();
366+
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata
367+
columnMetadataProto : columnMetadataProtos) {
368+
// TypeCode class takes json format argument in its constructor, e.g. `{\"code\":\"INT64\"}`.
369+
String typeCodeJson;
370+
try {
371+
typeCodeJson = this.printer.print(columnMetadataProto.getType());
372+
} catch (InvalidProtocolBufferException exc) {
373+
throw new IllegalArgumentException(
374+
"Failed to print type: " + columnMetadataProto.getType().toString());
375+
}
376+
ColumnType columnType =
377+
new ColumnType(
378+
columnMetadataProto.getName(),
379+
new TypeCode(typeCodeJson),
380+
columnMetadataProto.getIsPrimaryKey(),
381+
columnMetadataProto.getOrdinalPosition());
382+
columnTypes.add(columnType);
383+
}
384+
return columnTypes;
385+
}
386+
387+
String ConvertModValueProtosToJson(
388+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue> modValueProtos,
389+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
390+
columnMetadataProtos) {
391+
com.google.protobuf.Struct.Builder modStructValueBuilder =
392+
com.google.protobuf.Struct.newBuilder();
393+
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue modValueProto :
394+
modValueProtos) {
395+
final String columnName =
396+
columnMetadataProtos.get(modValueProto.getColumnMetadataIndex()).getName();
397+
final Value columnValue = modValueProto.getValue();
398+
modStructValueBuilder.putFields(columnName, columnValue);
399+
}
400+
Value modStructValue = Value.newBuilder().setStructValue(modStructValueBuilder.build()).build();
401+
String modValueJson;
402+
try {
403+
modValueJson = this.printer.print(modStructValue);
404+
} catch (InvalidProtocolBufferException exc) {
405+
throw new IllegalArgumentException("Failed to print type: " + modStructValue);
406+
}
407+
return modValueJson;
408+
}
409+
410+
List<Mod> parseProtoMod(
411+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod> modProtos,
412+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
413+
columnMetadataProtos) {
414+
List<Mod> mods = new ArrayList<>();
415+
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod modProto : modProtos) {
416+
final String keysJson = ConvertModValueProtosToJson(modProto.getKeysList(), columnMetadataProtos);
417+
final String oldValuesJson =
418+
ConvertModValueProtosToJson(modProto.getOldValuesList(), columnMetadataProtos);
419+
final String newValuesJson =
420+
ConvertModValueProtosToJson(modProto.getNewValuesList(), columnMetadataProtos);
421+
Mod mod = new Mod(keysJson, oldValuesJson, newValuesJson);
422+
mods.add(mod);
423+
}
424+
return mods;
425+
}
426+
427+
ModType parseProtoModType(
428+
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType modTypeProto) {
429+
if (modTypeProto == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.INSERT) {
430+
return ModType.INSERT;
431+
} else if (modTypeProto
432+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.UPDATE) {
433+
return ModType.UPDATE;
434+
} else if (modTypeProto
435+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.DELETE) {
436+
return ModType.DELETE;
437+
}
438+
return ModType.UNKNOWN;
439+
}
440+
441+
ValueCaptureType parseProtoValueCaptureType(
442+
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
443+
valueCaptureTypeProto) {
444+
if (valueCaptureTypeProto
445+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_ROW) {
446+
return ValueCaptureType.NEW_ROW;
447+
} else if (valueCaptureTypeProto
448+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_VALUES) {
449+
return ValueCaptureType.NEW_VALUES;
450+
} else if (valueCaptureTypeProto
451+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
452+
.OLD_AND_NEW_VALUES) {
453+
return ValueCaptureType.OLD_AND_NEW_VALUES;
454+
} else if (valueCaptureTypeProto
455+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
456+
.NEW_ROW_AND_OLD_VALUES) {
457+
return ValueCaptureType.NEW_ROW_AND_OLD_VALUES;
458+
}
459+
return ValueCaptureType.UNKNOWN;
460+
}
461+
232462
Stream<ChangeStreamRecord> toChangeStreamRecord(
233463
PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
234464

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/DataChangeRecord.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ public String toString() {
288288
+ '\''
289289
+ ", isSystemTransaction="
290290
+ isSystemTransaction
291-
+ ", metadata"
291+
+ ", metadata="
292292
+ metadata
293293
+ '}';
294294
}

0 commit comments

Comments
 (0)