Skip to content

Commit 684c368

Browse files
committed
Parse v2 record.
pick 4d2a5a7 # Fix API surface test (#35028) Deserialize change stream proto based (mutable key range) return types.
1 parent dc213a9 commit 684c368

File tree

6 files changed

+621
-3
lines changed

6 files changed

+621
-3
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,7 @@ class BeamModulePlugin implements Plugin<Project> {
653653
def arrow_version = "15.0.2"
654654
def jmh_version = "1.34"
655655
def jupiter_version = "5.7.0"
656+
def spanner_grpc_proto_version = "6.95.1"
656657

657658
// Export Spark versions, so they are defined in a single place only
658659
project.ext.spark3_version = spark3_version
@@ -862,7 +863,7 @@ class BeamModulePlugin implements Plugin<Project> {
862863
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
863864
proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
864865
proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version
865-
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
866+
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1:$spanner_grpc_proto_version", // google_cloud_platform_libraries_bom sets version
866867
proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
867868
proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
868869
qpid_jms_client : "org.apache.qpid:qpid-jms-client:$qpid_jms_client_version",

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,26 @@ public Struct getCurrentRowAsStruct() {
108108
return resultSet.getCurrentRowAsStruct();
109109
}
110110

111+
/**
112+
* Returns the only change stream record proto at the current pointer of the result set. It also
113+
* updates the timestamp at which the record was read. This function enhances the getProtoMessage
114+
* function but only focus on the ChangeStreamRecord type.
115+
*
116+
* @return a change stream record as a proto or null
117+
*/
118+
public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecord() {
119+
recordReadAt = Timestamp.now();
120+
return resultSet.getProtoMessage(
121+
0, com.google.spanner.v1.ChangeStreamRecord.getDefaultInstance());
122+
}
123+
124+
/** Returns true if the result set at the current pointer contain only one proto change record. */
125+
boolean isProtoChangeRecord() {
126+
return resultSet.getColumnCount() == 1
127+
&& !resultSet.isNull(0)
128+
&& resultSet.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO;
129+
}
130+
111131
/**
112132
* Returns the record at the current pointer as {@link JsonB}. It also updates the timestamp at
113133
* which the record was read.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java

Lines changed: 212 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.protobuf.InvalidProtocolBufferException;
2424
import com.google.protobuf.Value;
2525
import com.google.protobuf.util.JsonFormat;
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
2628
import java.util.Collections;
2729
import java.util.HashSet;
2830
import java.util.List;
@@ -42,7 +44,10 @@
4244
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
4345
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
4446
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
47+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
48+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
4549
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
50+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord;
4651
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
4752
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
4853
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -223,12 +228,218 @@ public List<ChangeStreamRecord> toChangeStreamRecords(
223228
return Collections.singletonList(
224229
toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata));
225230
}
226-
// In GoogleSQL, change stream records are returned as an array of structs.
231+
232+
// In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as Protos.
233+
if (resultSet.isProtoChangeRecord()) {
234+
return Arrays.asList(
235+
toChangeStreamRecord(
236+
partition, resultSet.getProtoChangeStreamRecord(), resultSetMetadata));
237+
}
238+
239+
// In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records are returned as an
240+
// array of structs.
227241
return resultSet.getCurrentRowAsStruct().getStructList(0).stream()
228242
.flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
229243
.collect(Collectors.toList());
230244
}
231245

246+
ChangeStreamRecord toChangeStreamRecord(
247+
PartitionMetadata partition,
248+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto,
249+
ChangeStreamResultSetMetadata resultSetMetadata) {
250+
if (changeStreamRecordProto.hasPartitionStartRecord()) {
251+
return parseProtoPartitionStartRecord(
252+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionStartRecord());
253+
} else if (changeStreamRecordProto.hasPartitionEndRecord()) {
254+
return parseProtoPartitionEndRecord(
255+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEndRecord());
256+
} else if (changeStreamRecordProto.hasPartitionEventRecord()) {
257+
return parseProtoPartitionEventRecord(
258+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEventRecord());
259+
} else if (changeStreamRecordProto.hasHeartbeatRecord()) {
260+
return parseProtoHeartbeatRecord(
261+
partition, resultSetMetadata, changeStreamRecordProto.getHeartbeatRecord());
262+
} else if (changeStreamRecordProto.hasDataChangeRecord()) {
263+
return parseProtoDataChangeRecord(
264+
partition, resultSetMetadata, changeStreamRecordProto.getDataChangeRecord());
265+
} else {
266+
throw new IllegalArgumentException(
267+
"Unknown change stream record type " + changeStreamRecordProto.toString());
268+
}
269+
}
270+
271+
ChangeStreamRecord parseProtoPartitionStartRecord(
272+
PartitionMetadata partition,
273+
ChangeStreamResultSetMetadata resultSetMetadata,
274+
com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord partitionStartRecordProto) {
275+
final Timestamp startTimestamp =
276+
Timestamp.fromProto(partitionStartRecordProto.getStartTimestamp());
277+
return new PartitionStartRecord(
278+
startTimestamp,
279+
partitionStartRecordProto.getRecordSequence(),
280+
partitionStartRecordProto.getPartitionTokensList(),
281+
changeStreamRecordMetadataFrom(partition, startTimestamp, resultSetMetadata));
282+
}
283+
284+
ChangeStreamRecord parseProtoPartitionEndRecord(
285+
PartitionMetadata partition,
286+
ChangeStreamResultSetMetadata resultSetMetadata,
287+
com.google.spanner.v1.ChangeStreamRecord.PartitionEndRecord partitionEndRecordProto) {
288+
final Timestamp endTimestamp = Timestamp.fromProto(partitionEndRecordProto.getEndTimestamp());
289+
return new PartitionEndRecord(
290+
endTimestamp,
291+
partitionEndRecordProto.getRecordSequence(),
292+
changeStreamRecordMetadataFrom(partition, endTimestamp, resultSetMetadata));
293+
}
294+
295+
ChangeStreamRecord parseProtoPartitionEventRecord(
296+
PartitionMetadata partition,
297+
ChangeStreamResultSetMetadata resultSetMetadata,
298+
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord partitionEventRecordProto) {
299+
final Timestamp commitTimestamp =
300+
Timestamp.fromProto(partitionEventRecordProto.getCommitTimestamp());
301+
return new PartitionEventRecord(
302+
commitTimestamp,
303+
partitionEventRecordProto.getRecordSequence(),
304+
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
305+
}
306+
307+
ChangeStreamRecord parseProtoHeartbeatRecord(
308+
PartitionMetadata partition,
309+
ChangeStreamResultSetMetadata resultSetMetadata,
310+
com.google.spanner.v1.ChangeStreamRecord.HeartbeatRecord heartbeatRecordProto) {
311+
final Timestamp heartbeatTimestamp = Timestamp.fromProto(heartbeatRecordProto.getTimestamp());
312+
return new HeartbeatRecord(
313+
heartbeatTimestamp,
314+
changeStreamRecordMetadataFrom(partition, heartbeatTimestamp, resultSetMetadata));
315+
}
316+
317+
ChangeStreamRecord parseProtoDataChangeRecord(
318+
PartitionMetadata partition,
319+
ChangeStreamResultSetMetadata resultSetMetadata,
320+
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord dataChangeRecordProto) {
321+
final Timestamp commitTimestamp =
322+
Timestamp.fromProto(dataChangeRecordProto.getCommitTimestamp());
323+
return new DataChangeRecord(
324+
partition.getPartitionToken(),
325+
commitTimestamp,
326+
dataChangeRecordProto.getServerTransactionId(),
327+
dataChangeRecordProto.getIsLastRecordInTransactionInPartition(),
328+
dataChangeRecordProto.getRecordSequence(),
329+
dataChangeRecordProto.getTable(),
330+
parseProtoColumnMetadata(dataChangeRecordProto.getColumnMetadataList()),
331+
parseProtoMod(
332+
dataChangeRecordProto.getModsList(), dataChangeRecordProto.getColumnMetadataList()),
333+
parseProtoModType(dataChangeRecordProto.getModType()),
334+
parseProtoValueCaptureType(dataChangeRecordProto.getValueCaptureType()),
335+
dataChangeRecordProto.getNumberOfRecordsInTransaction(),
336+
dataChangeRecordProto.getNumberOfPartitionsInTransaction(),
337+
dataChangeRecordProto.getTransactionTag(),
338+
dataChangeRecordProto.getIsSystemTransaction(),
339+
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
340+
}
341+
342+
List<ColumnType> parseProtoColumnMetadata(
343+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
344+
columnMetadataProtos) {
345+
List<ColumnType> columnTypes = new ArrayList<>();
346+
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata
347+
columnMetadataProto : columnMetadataProtos) {
348+
// TypeCode class takes json format argument in its constructor, e.g. `{\"code\":\"INT64\"}`.
349+
String typeCodeJson;
350+
try {
351+
typeCodeJson = this.printer.print(columnMetadataProto.getType());
352+
} catch (InvalidProtocolBufferException exc) {
353+
throw new IllegalArgumentException(
354+
"Failed to print type: " + columnMetadataProto.getType().toString());
355+
}
356+
ColumnType columnType =
357+
new ColumnType(
358+
columnMetadataProto.getName(),
359+
new TypeCode(typeCodeJson),
360+
columnMetadataProto.getIsPrimaryKey(),
361+
columnMetadataProto.getOrdinalPosition());
362+
columnTypes.add(columnType);
363+
}
364+
return columnTypes;
365+
}
366+
367+
String convertModValueProtosToJson(
368+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue> modValueProtos,
369+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
370+
columnMetadataProtos) {
371+
com.google.protobuf.Struct.Builder modStructValueBuilder =
372+
com.google.protobuf.Struct.newBuilder();
373+
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue modValueProto :
374+
modValueProtos) {
375+
final String columnName =
376+
columnMetadataProtos.get(modValueProto.getColumnMetadataIndex()).getName();
377+
final Value columnValue = modValueProto.getValue();
378+
modStructValueBuilder.putFields(columnName, columnValue);
379+
}
380+
Value modStructValue = Value.newBuilder().setStructValue(modStructValueBuilder.build()).build();
381+
String modValueJson;
382+
try {
383+
modValueJson = this.printer.print(modStructValue);
384+
} catch (InvalidProtocolBufferException exc) {
385+
throw new IllegalArgumentException("Failed to print type: " + modStructValue);
386+
}
387+
return modValueJson;
388+
}
389+
390+
List<Mod> parseProtoMod(
391+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod> modProtos,
392+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
393+
columnMetadataProtos) {
394+
List<Mod> mods = new ArrayList<>();
395+
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod modProto : modProtos) {
396+
final String keysJson =
397+
convertModValueProtosToJson(modProto.getKeysList(), columnMetadataProtos);
398+
final String oldValuesJson =
399+
convertModValueProtosToJson(modProto.getOldValuesList(), columnMetadataProtos);
400+
final String newValuesJson =
401+
convertModValueProtosToJson(modProto.getNewValuesList(), columnMetadataProtos);
402+
Mod mod = new Mod(keysJson, oldValuesJson, newValuesJson);
403+
mods.add(mod);
404+
}
405+
return mods;
406+
}
407+
408+
ModType parseProtoModType(
409+
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType modTypeProto) {
410+
if (modTypeProto == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.INSERT) {
411+
return ModType.INSERT;
412+
} else if (modTypeProto
413+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.UPDATE) {
414+
return ModType.UPDATE;
415+
} else if (modTypeProto
416+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.DELETE) {
417+
return ModType.DELETE;
418+
}
419+
return ModType.UNKNOWN;
420+
}
421+
422+
ValueCaptureType parseProtoValueCaptureType(
423+
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
424+
valueCaptureTypeProto) {
425+
if (valueCaptureTypeProto
426+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_ROW) {
427+
return ValueCaptureType.NEW_ROW;
428+
} else if (valueCaptureTypeProto
429+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_VALUES) {
430+
return ValueCaptureType.NEW_VALUES;
431+
} else if (valueCaptureTypeProto
432+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
433+
.OLD_AND_NEW_VALUES) {
434+
return ValueCaptureType.OLD_AND_NEW_VALUES;
435+
} else if (valueCaptureTypeProto
436+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
437+
.NEW_ROW_AND_OLD_VALUES) {
438+
return ValueCaptureType.NEW_ROW_AND_OLD_VALUES;
439+
}
440+
return ValueCaptureType.UNKNOWN;
441+
}
442+
232443
Stream<ChangeStreamRecord> toChangeStreamRecord(
233444
PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
234445

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/DataChangeRecord.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ public String toString() {
288288
+ '\''
289289
+ ", isSystemTransaction="
290290
+ isSystemTransaction
291-
+ ", metadata"
291+
+ ", metadata="
292292
+ metadata
293293
+ '}';
294294
}

0 commit comments

Comments
 (0)