Skip to content

Deserialize proto based (mutable key range) change stream return records #35408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ class BeamModulePlugin implements Plugin<Project> {
def arrow_version = "15.0.2"
def jmh_version = "1.34"
def jupiter_version = "5.7.0"
def spanner_grpc_proto_version = "6.95.1"

// Export Spark versions, so they are defined in a single place only
project.ext.spark3_version = spark3_version
Expand Down Expand Up @@ -862,7 +863,7 @@ class BeamModulePlugin implements Plugin<Project> {
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1:$spanner_grpc_proto_version", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
qpid_jms_client : "org.apache.qpid:qpid-jms-client:$qpid_jms_client_version",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,26 @@ public Struct getCurrentRowAsStruct() {
return resultSet.getCurrentRowAsStruct();
}

/**
* Returns the only change stream record proto at the current pointer of the result set. It also
* updates the timestamp at which the record was read. This function enhances the getProtoMessage
* function but only focus on the ChangeStreamRecord type.
*
* @return a change stream record as a proto or null
*/
public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecord() {
recordReadAt = Timestamp.now();
return resultSet.getProtoMessage(
0, com.google.spanner.v1.ChangeStreamRecord.getDefaultInstance());
}

/** Returns true if the result set at the current pointer contain only one proto change record. */
public boolean isProtoChangeRecord() {
return resultSet.getColumnCount() == 1
&& !resultSet.isNull(0)
&& resultSet.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO;
}

/**
* Returns the record at the current pointer as {@link JsonB}. It also updates the timestamp at
* which the record was read.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Value;
import com.google.protobuf.util.JsonFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand All @@ -42,7 +44,10 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
Expand Down Expand Up @@ -223,12 +228,218 @@ public List<ChangeStreamRecord> toChangeStreamRecords(
return Collections.singletonList(
toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata));
}
// In GoogleSQL, change stream records are returned as an array of structs.

// In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records are returned as Protos.
if (resultSet.isProtoChangeRecord()) {
return Arrays.asList(
toChangeStreamRecord(
partition, resultSet.getProtoChangeStreamRecord(), resultSetMetadata));
}

// In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as an array
// of structs.
return resultSet.getCurrentRowAsStruct().getStructList(0).stream()
.flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
.collect(Collectors.toList());
}

ChangeStreamRecord toChangeStreamRecord(
PartitionMetadata partition,
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto,
ChangeStreamResultSetMetadata resultSetMetadata) {
if (changeStreamRecordProto.hasPartitionStartRecord()) {
return parseProtoPartitionStartRecord(
partition, resultSetMetadata, changeStreamRecordProto.getPartitionStartRecord());
} else if (changeStreamRecordProto.hasPartitionEndRecord()) {
return parseProtoPartitionEndRecord(
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEndRecord());
} else if (changeStreamRecordProto.hasPartitionEventRecord()) {
return parseProtoPartitionEventRecord(
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEventRecord());
} else if (changeStreamRecordProto.hasHeartbeatRecord()) {
return parseProtoHeartbeatRecord(
partition, resultSetMetadata, changeStreamRecordProto.getHeartbeatRecord());
} else if (changeStreamRecordProto.hasDataChangeRecord()) {
return parseProtoDataChangeRecord(
partition, resultSetMetadata, changeStreamRecordProto.getDataChangeRecord());
} else {
throw new IllegalArgumentException(
"Unknown change stream record type " + changeStreamRecordProto.toString());
}
}

ChangeStreamRecord parseProtoPartitionStartRecord(
PartitionMetadata partition,
ChangeStreamResultSetMetadata resultSetMetadata,
com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord partitionStartRecordProto) {
final Timestamp startTimestamp =
Timestamp.fromProto(partitionStartRecordProto.getStartTimestamp());
return new PartitionStartRecord(
startTimestamp,
partitionStartRecordProto.getRecordSequence(),
partitionStartRecordProto.getPartitionTokensList(),
changeStreamRecordMetadataFrom(partition, startTimestamp, resultSetMetadata));
}

ChangeStreamRecord parseProtoPartitionEndRecord(
PartitionMetadata partition,
ChangeStreamResultSetMetadata resultSetMetadata,
com.google.spanner.v1.ChangeStreamRecord.PartitionEndRecord partitionEndRecordProto) {
final Timestamp endTimestamp = Timestamp.fromProto(partitionEndRecordProto.getEndTimestamp());
return new PartitionEndRecord(
endTimestamp,
partitionEndRecordProto.getRecordSequence(),
changeStreamRecordMetadataFrom(partition, endTimestamp, resultSetMetadata));
}

ChangeStreamRecord parseProtoPartitionEventRecord(
PartitionMetadata partition,
ChangeStreamResultSetMetadata resultSetMetadata,
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord partitionEventRecordProto) {
final Timestamp commitTimestamp =
Timestamp.fromProto(partitionEventRecordProto.getCommitTimestamp());
return new PartitionEventRecord(
commitTimestamp,
partitionEventRecordProto.getRecordSequence(),
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
}

ChangeStreamRecord parseProtoHeartbeatRecord(
PartitionMetadata partition,
ChangeStreamResultSetMetadata resultSetMetadata,
com.google.spanner.v1.ChangeStreamRecord.HeartbeatRecord heartbeatRecordProto) {
final Timestamp heartbeatTimestamp = Timestamp.fromProto(heartbeatRecordProto.getTimestamp());
return new HeartbeatRecord(
heartbeatTimestamp,
changeStreamRecordMetadataFrom(partition, heartbeatTimestamp, resultSetMetadata));
}

ChangeStreamRecord parseProtoDataChangeRecord(
PartitionMetadata partition,
ChangeStreamResultSetMetadata resultSetMetadata,
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord dataChangeRecordProto) {
final Timestamp commitTimestamp =
Timestamp.fromProto(dataChangeRecordProto.getCommitTimestamp());
return new DataChangeRecord(
partition.getPartitionToken(),
commitTimestamp,
dataChangeRecordProto.getServerTransactionId(),
dataChangeRecordProto.getIsLastRecordInTransactionInPartition(),
dataChangeRecordProto.getRecordSequence(),
dataChangeRecordProto.getTable(),
parseProtoColumnMetadata(dataChangeRecordProto.getColumnMetadataList()),
parseProtoMod(
dataChangeRecordProto.getModsList(), dataChangeRecordProto.getColumnMetadataList()),
parseProtoModType(dataChangeRecordProto.getModType()),
parseProtoValueCaptureType(dataChangeRecordProto.getValueCaptureType()),
dataChangeRecordProto.getNumberOfRecordsInTransaction(),
dataChangeRecordProto.getNumberOfPartitionsInTransaction(),
dataChangeRecordProto.getTransactionTag(),
dataChangeRecordProto.getIsSystemTransaction(),
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
}

List<ColumnType> parseProtoColumnMetadata(
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
columnMetadataProtos) {
List<ColumnType> columnTypes = new ArrayList<>();
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata
columnMetadataProto : columnMetadataProtos) {
// TypeCode class takes json format argument in its constructor, e.g. `{\"code\":\"INT64\"}`.
String typeCodeJson;
try {
typeCodeJson = this.printer.print(columnMetadataProto.getType());
} catch (InvalidProtocolBufferException exc) {
throw new IllegalArgumentException(
"Failed to print type: " + columnMetadataProto.getType().toString());
}
ColumnType columnType =
new ColumnType(
columnMetadataProto.getName(),
new TypeCode(typeCodeJson),
columnMetadataProto.getIsPrimaryKey(),
columnMetadataProto.getOrdinalPosition());
columnTypes.add(columnType);
}
return columnTypes;
}

String convertModValueProtosToJson(
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue> modValueProtos,
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
columnMetadataProtos) {
com.google.protobuf.Struct.Builder modStructValueBuilder =
com.google.protobuf.Struct.newBuilder();
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue modValueProto :
modValueProtos) {
final String columnName =
columnMetadataProtos.get(modValueProto.getColumnMetadataIndex()).getName();
final Value columnValue = modValueProto.getValue();
modStructValueBuilder.putFields(columnName, columnValue);
}
Value modStructValue = Value.newBuilder().setStructValue(modStructValueBuilder.build()).build();
String modValueJson;
try {
modValueJson = this.printer.print(modStructValue);
} catch (InvalidProtocolBufferException exc) {
throw new IllegalArgumentException("Failed to print type: " + modStructValue);
}
return modValueJson;
}

List<Mod> parseProtoMod(
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod> modProtos,
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
columnMetadataProtos) {
List<Mod> mods = new ArrayList<>();
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod modProto : modProtos) {
final String keysJson =
convertModValueProtosToJson(modProto.getKeysList(), columnMetadataProtos);
final String oldValuesJson =
convertModValueProtosToJson(modProto.getOldValuesList(), columnMetadataProtos);
final String newValuesJson =
convertModValueProtosToJson(modProto.getNewValuesList(), columnMetadataProtos);
Mod mod = new Mod(keysJson, oldValuesJson, newValuesJson);
mods.add(mod);
}
return mods;
}

ModType parseProtoModType(
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType modTypeProto) {
if (modTypeProto == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.INSERT) {
return ModType.INSERT;
} else if (modTypeProto
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.UPDATE) {
return ModType.UPDATE;
} else if (modTypeProto
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.DELETE) {
return ModType.DELETE;
}
return ModType.UNKNOWN;
}

ValueCaptureType parseProtoValueCaptureType(
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
valueCaptureTypeProto) {
if (valueCaptureTypeProto
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_ROW) {
return ValueCaptureType.NEW_ROW;
} else if (valueCaptureTypeProto
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_VALUES) {
return ValueCaptureType.NEW_VALUES;
} else if (valueCaptureTypeProto
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
.OLD_AND_NEW_VALUES) {
return ValueCaptureType.OLD_AND_NEW_VALUES;
} else if (valueCaptureTypeProto
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
.NEW_ROW_AND_OLD_VALUES) {
return ValueCaptureType.NEW_ROW_AND_OLD_VALUES;
}
return ValueCaptureType.UNKNOWN;
}

Stream<ChangeStreamRecord> toChangeStreamRecord(
PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public String toString() {
+ '\''
+ ", isSystemTransaction="
+ isSystemTransaction
+ ", metadata"
+ ", metadata="
+ metadata
+ '}';
}
Expand Down
Loading
Loading