Skip to content

Commit 254762f

Browse files
committed
Parse v2 record.
pick 4d2a5a7 # Fix API surface test (#35028) Deserialize change stream proto based (mutable key range) return types.
1 parent dc213a9 commit 254762f

File tree

6 files changed

+650
-3
lines changed

6 files changed

+650
-3
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,7 @@ class BeamModulePlugin implements Plugin<Project> {
653653
def arrow_version = "15.0.2"
654654
def jmh_version = "1.34"
655655
def jupiter_version = "5.7.0"
656+
def spanner_grpc_proto_version = "6.95.1"
656657

657658
// Export Spark versions, so they are defined in a single place only
658659
project.ext.spark3_version = spark3_version
@@ -862,7 +863,7 @@ class BeamModulePlugin implements Plugin<Project> {
862863
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
863864
proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
864865
proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version
865-
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
866+
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1:$spanner_grpc_proto_version", // google_cloud_platform_libraries_bom sets version
866867
proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
867868
proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
868869
qpid_jms_client : "org.apache.qpid:qpid-jms-client:$qpid_jms_client_version",

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.cloud.Timestamp;
2121
import com.google.cloud.spanner.ResultSet;
2222
import com.google.cloud.spanner.Struct;
23+
import com.google.cloud.spanner.Type;
2324
import org.joda.time.Duration;
2425

2526
/**
@@ -108,6 +109,48 @@ public Struct getCurrentRowAsStruct() {
108109
return resultSet.getCurrentRowAsStruct();
109110
}
110111

112+
/**
113+
* Returns the change stream record proto at the current pointer of the result set. It also
114+
* updates the timestamp at which the record was read. This function enhances the getProtoMessage
115+
* function but only focus on the ChangeStreamRecord type.
116+
*
117+
* @param columnIndex Index of the column.
118+
* @return a change stream record as a proto or null
119+
*/
120+
public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecord(int columnIndex) {
121+
recordReadAt = Timestamp.now();
122+
return resultSet.getProtoMessage(
123+
columnIndex, com.google.spanner.v1.ChangeStreamRecord.getDefaultInstance());
124+
}
125+
126+
/**
127+
* Get the column count at the current pointer of the result set.
128+
*
129+
* @return the number of columns in the underlying data
130+
*/
131+
public int getColumnCount() {
132+
return resultSet.getColumnCount();
133+
}
134+
135+
/**
136+
* If the column at the current pointer of the result set indexed by the columnIndex is null.
137+
*
138+
* @return true if a column contains a NULL value
139+
*/
140+
public boolean isNull(int columnIndex) {
141+
return resultSet.isNull(columnIndex);
142+
}
143+
144+
/**
145+
* Returns the column type at the current pointer of the result set indexed by the columnIndex.
146+
*
147+
* @param columnIndex Index of the column
148+
* @return the type of a column
149+
*/
150+
public Type getColumnType(int columnIndex) {
151+
return resultSet.getColumnType(columnIndex);
152+
}
153+
111154
/**
112155
* Returns the record at the current pointer as {@link JsonB}. It also updates the timestamp at
113156
* which the record was read.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java

Lines changed: 218 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.protobuf.InvalidProtocolBufferException;
2424
import com.google.protobuf.Value;
2525
import com.google.protobuf.util.JsonFormat;
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
2628
import java.util.Collections;
2729
import java.util.HashSet;
2830
import java.util.List;
@@ -42,7 +44,10 @@
4244
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
4345
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
4446
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
47+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
48+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
4549
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
50+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord;
4651
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
4752
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
4853
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -223,12 +228,224 @@ public List<ChangeStreamRecord> toChangeStreamRecords(
223228
return Collections.singletonList(
224229
toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata));
225230
}
226-
// In GoogleSQL, change stream records are returned as an array of structs.
231+
232+
// In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records are returned as Protos.
233+
if (isProtoChangeRecord(resultSet)) {
234+
return Arrays.asList(
235+
fromProtoChangeStreamRecord(
236+
partition, resultSetMetadata, resultSet.getProtoChangeStreamRecord(0)));
237+
}
238+
239+
// In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as an array
240+
// of structs.
227241
return resultSet.getCurrentRowAsStruct().getStructList(0).stream()
228242
.flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
229243
.collect(Collectors.toList());
230244
}
231245

246+
boolean isProtoChangeRecord(ChangeStreamResultSet currentRow) {
247+
return currentRow.getColumnCount() == 1
248+
&& !currentRow.isNull(0)
249+
&& currentRow.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO;
250+
}
251+
252+
ChangeStreamRecord fromProtoChangeStreamRecord(
253+
PartitionMetadata partition,
254+
ChangeStreamResultSetMetadata resultSetMetadata,
255+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto) {
256+
if (changeStreamRecordProto.hasPartitionStartRecord()) {
257+
return parseProtoPartitionStartRecord(
258+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionStartRecord());
259+
} else if (changeStreamRecordProto.hasPartitionEndRecord()) {
260+
return parseProtoPartitionEndRecord(
261+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEndRecord());
262+
} else if (changeStreamRecordProto.hasPartitionEventRecord()) {
263+
return parseProtoPartitionEventRecord(
264+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEventRecord());
265+
} else if (changeStreamRecordProto.hasHeartbeatRecord()) {
266+
return parseProtoHeartbeatRecord(
267+
partition, resultSetMetadata, changeStreamRecordProto.getHeartbeatRecord());
268+
} else if (changeStreamRecordProto.hasDataChangeRecord()) {
269+
return parseProtoDataChangeRecord(
270+
partition, resultSetMetadata, changeStreamRecordProto.getDataChangeRecord());
271+
} else {
272+
throw new IllegalArgumentException(
273+
"Unknown change stream record type " + changeStreamRecordProto.toString());
274+
}
275+
}
276+
277+
ChangeStreamRecord parseProtoPartitionStartRecord(
278+
PartitionMetadata partition,
279+
ChangeStreamResultSetMetadata resultSetMetadata,
280+
com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord partitionStartRecordProto) {
281+
final Timestamp startTimestamp =
282+
Timestamp.fromProto(partitionStartRecordProto.getStartTimestamp());
283+
return new PartitionStartRecord(
284+
startTimestamp,
285+
partitionStartRecordProto.getRecordSequence(),
286+
partitionStartRecordProto.getPartitionTokensList(),
287+
changeStreamRecordMetadataFrom(partition, startTimestamp, resultSetMetadata));
288+
}
289+
290+
ChangeStreamRecord parseProtoPartitionEndRecord(
291+
PartitionMetadata partition,
292+
ChangeStreamResultSetMetadata resultSetMetadata,
293+
com.google.spanner.v1.ChangeStreamRecord.PartitionEndRecord partitionEndRecordProto) {
294+
final Timestamp endTimestamp = Timestamp.fromProto(partitionEndRecordProto.getEndTimestamp());
295+
return new PartitionEndRecord(
296+
endTimestamp,
297+
partitionEndRecordProto.getRecordSequence(),
298+
changeStreamRecordMetadataFrom(partition, endTimestamp, resultSetMetadata));
299+
}
300+
301+
ChangeStreamRecord parseProtoPartitionEventRecord(
302+
PartitionMetadata partition,
303+
ChangeStreamResultSetMetadata resultSetMetadata,
304+
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord partitionEventRecordProto) {
305+
final Timestamp commitTimestamp =
306+
Timestamp.fromProto(partitionEventRecordProto.getCommitTimestamp());
307+
return new PartitionEventRecord(
308+
commitTimestamp,
309+
partitionEventRecordProto.getRecordSequence(),
310+
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
311+
}
312+
313+
ChangeStreamRecord parseProtoHeartbeatRecord(
314+
PartitionMetadata partition,
315+
ChangeStreamResultSetMetadata resultSetMetadata,
316+
com.google.spanner.v1.ChangeStreamRecord.HeartbeatRecord heartbeatRecordProto) {
317+
final Timestamp heartbeatTimestamp = Timestamp.fromProto(heartbeatRecordProto.getTimestamp());
318+
return new HeartbeatRecord(
319+
heartbeatTimestamp,
320+
changeStreamRecordMetadataFrom(partition, heartbeatTimestamp, resultSetMetadata));
321+
}
322+
323+
ChangeStreamRecord parseProtoDataChangeRecord(
324+
PartitionMetadata partition,
325+
ChangeStreamResultSetMetadata resultSetMetadata,
326+
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord dataChangeRecordProto) {
327+
final Timestamp commitTimestamp =
328+
Timestamp.fromProto(dataChangeRecordProto.getCommitTimestamp());
329+
return new DataChangeRecord(
330+
partition.getPartitionToken(),
331+
commitTimestamp,
332+
dataChangeRecordProto.getServerTransactionId(),
333+
dataChangeRecordProto.getIsLastRecordInTransactionInPartition(),
334+
dataChangeRecordProto.getRecordSequence(),
335+
dataChangeRecordProto.getTable(),
336+
parseProtoColumnMetadata(dataChangeRecordProto.getColumnMetadataList()),
337+
parseProtoMod(
338+
dataChangeRecordProto.getModsList(), dataChangeRecordProto.getColumnMetadataList()),
339+
parseProtoModType(dataChangeRecordProto.getModType()),
340+
parseProtoValueCaptureType(dataChangeRecordProto.getValueCaptureType()),
341+
dataChangeRecordProto.getNumberOfRecordsInTransaction(),
342+
dataChangeRecordProto.getNumberOfPartitionsInTransaction(),
343+
dataChangeRecordProto.getTransactionTag(),
344+
dataChangeRecordProto.getIsSystemTransaction(),
345+
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
346+
}
347+
348+
List<ColumnType> parseProtoColumnMetadata(
349+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
350+
columnMetadataProtos) {
351+
List<ColumnType> columnTypes = new ArrayList<>();
352+
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata
353+
columnMetadataProto : columnMetadataProtos) {
354+
// TypeCode class takes json format argument in its constructor, e.g. `{\"code\":\"INT64\"}`.
355+
String typeCodeJson;
356+
try {
357+
typeCodeJson = this.printer.print(columnMetadataProto.getType());
358+
} catch (InvalidProtocolBufferException exc) {
359+
throw new IllegalArgumentException(
360+
"Failed to print type: " + columnMetadataProto.getType().toString());
361+
}
362+
ColumnType columnType =
363+
new ColumnType(
364+
columnMetadataProto.getName(),
365+
new TypeCode(typeCodeJson),
366+
columnMetadataProto.getIsPrimaryKey(),
367+
columnMetadataProto.getOrdinalPosition());
368+
columnTypes.add(columnType);
369+
}
370+
return columnTypes;
371+
}
372+
373+
String ConvertModValueProtosToJson(
374+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue> modValueProtos,
375+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
376+
columnMetadataProtos) {
377+
com.google.protobuf.Struct.Builder modStructValueBuilder =
378+
com.google.protobuf.Struct.newBuilder();
379+
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue modValueProto :
380+
modValueProtos) {
381+
final String columnName =
382+
columnMetadataProtos.get(modValueProto.getColumnMetadataIndex()).getName();
383+
final Value columnValue = modValueProto.getValue();
384+
modStructValueBuilder.putFields(columnName, columnValue);
385+
}
386+
Value modStructValue = Value.newBuilder().setStructValue(modStructValueBuilder.build()).build();
387+
String modValueJson;
388+
try {
389+
modValueJson = this.printer.print(modStructValue);
390+
} catch (InvalidProtocolBufferException exc) {
391+
throw new IllegalArgumentException("Failed to print type: " + modStructValue);
392+
}
393+
return modValueJson;
394+
}
395+
396+
List<Mod> parseProtoMod(
397+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod> modProtos,
398+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
399+
columnMetadataProtos) {
400+
List<Mod> mods = new ArrayList<>();
401+
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod modProto : modProtos) {
402+
final String keysJson =
403+
ConvertModValueProtosToJson(modProto.getKeysList(), columnMetadataProtos);
404+
final String oldValuesJson =
405+
ConvertModValueProtosToJson(modProto.getOldValuesList(), columnMetadataProtos);
406+
final String newValuesJson =
407+
ConvertModValueProtosToJson(modProto.getNewValuesList(), columnMetadataProtos);
408+
Mod mod = new Mod(keysJson, oldValuesJson, newValuesJson);
409+
mods.add(mod);
410+
}
411+
return mods;
412+
}
413+
414+
ModType parseProtoModType(
415+
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType modTypeProto) {
416+
if (modTypeProto == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.INSERT) {
417+
return ModType.INSERT;
418+
} else if (modTypeProto
419+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.UPDATE) {
420+
return ModType.UPDATE;
421+
} else if (modTypeProto
422+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.DELETE) {
423+
return ModType.DELETE;
424+
}
425+
return ModType.UNKNOWN;
426+
}
427+
428+
ValueCaptureType parseProtoValueCaptureType(
429+
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
430+
valueCaptureTypeProto) {
431+
if (valueCaptureTypeProto
432+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_ROW) {
433+
return ValueCaptureType.NEW_ROW;
434+
} else if (valueCaptureTypeProto
435+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_VALUES) {
436+
return ValueCaptureType.NEW_VALUES;
437+
} else if (valueCaptureTypeProto
438+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
439+
.OLD_AND_NEW_VALUES) {
440+
return ValueCaptureType.OLD_AND_NEW_VALUES;
441+
} else if (valueCaptureTypeProto
442+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
443+
.NEW_ROW_AND_OLD_VALUES) {
444+
return ValueCaptureType.NEW_ROW_AND_OLD_VALUES;
445+
}
446+
return ValueCaptureType.UNKNOWN;
447+
}
448+
232449
Stream<ChangeStreamRecord> toChangeStreamRecord(
233450
PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
234451

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/DataChangeRecord.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ public String toString() {
288288
+ '\''
289289
+ ", isSystemTransaction="
290290
+ isSystemTransaction
291-
+ ", metadata"
291+
+ ", metadata="
292292
+ metadata
293293
+ '}';
294294
}

0 commit comments

Comments
 (0)