From eff89050c17849d76e9121af933fa4cf27e5b75a Mon Sep 17 00:00:00 2001 From: changliiu Date: Fri, 13 Jun 2025 16:41:15 +0000 Subject: [PATCH] Parse v2 record. pick 4d2a5a7d0c # Fix API surface test (#35028) Deserialize change stream proto based (mutable key range) return types. --- .../beam/gradle/BeamModulePlugin.groovy | 3 +- .../dao/ChangeStreamResultSet.java | 20 ++ .../mapper/ChangeStreamRecordMapper.java | 213 ++++++++++++++- .../changestreams/model/DataChangeRecord.java | 2 +- .../mapper/ChangeStreamRecordMapperTest.java | 109 ++++++++ .../changestreams/util/TestProtoMapper.java | 257 ++++++++++++++++++ 6 files changed, 601 insertions(+), 3 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestProtoMapper.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 703f07ab3af9..68472386c0f1 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -653,6 +653,7 @@ class BeamModulePlugin implements Plugin { def arrow_version = "15.0.2" def jmh_version = "1.34" def jupiter_version = "5.7.0" + def spanner_grpc_proto_version = "6.95.1" // Export Spark versions, so they are defined in a single place only project.ext.spark3_version = spark3_version @@ -862,7 +863,7 @@ class BeamModulePlugin implements Plugin { proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version - proto_google_cloud_spanner_v1 : 
"com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version + proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1:$spanner_grpc_proto_version", // google_cloud_platform_libraries_bom sets version proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version qpid_jms_client : "org.apache.qpid:qpid-jms-client:$qpid_jms_client_version", diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java index f4ffba598a4b..1268c739164f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java @@ -108,6 +108,26 @@ public Struct getCurrentRowAsStruct() { return resultSet.getCurrentRowAsStruct(); } + /** + * Returns the only change stream record proto at the current pointer of the result set. It also + * updates the timestamp at which the record was read. This function enhances the getProtoMessage + * function but only focus on the ChangeStreamRecord type. + * + * @return a change stream record as a proto or null + */ + public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecord() { + recordReadAt = Timestamp.now(); + return resultSet.getProtoMessage( + 0, com.google.spanner.v1.ChangeStreamRecord.getDefaultInstance()); + } + + /** Returns true if the result set at the current pointer contain only one proto change record. 
*/ + public boolean isProtoChangeRecord() { + return resultSet.getColumnCount() == 1 + && !resultSet.isNull(0) + && resultSet.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO; + } + /** * Returns the record at the current pointer as {@link JsonB}. It also updates the timestamp at * which the record was read. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java index 20314566dcc7..018f8ed55247 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java @@ -23,6 +23,8 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Value; import com.google.protobuf.util.JsonFormat; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -42,7 +44,10 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType; import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; @@ -223,12 +228,218 @@ public List toChangeStreamRecords( return Collections.singletonList( toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata)); } - // In GoogleSQL, change stream records are returned as an array of structs. + + // In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records are returned as Protos. + if (resultSet.isProtoChangeRecord()) { + return Arrays.asList( + toChangeStreamRecord( + partition, resultSet.getProtoChangeStreamRecord(), resultSetMetadata)); + } + + // In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as an array + // of structs. return resultSet.getCurrentRowAsStruct().getStructList(0).stream() .flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata)) .collect(Collectors.toList()); } + ChangeStreamRecord toChangeStreamRecord( + PartitionMetadata partition, + com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto, + ChangeStreamResultSetMetadata resultSetMetadata) { + if (changeStreamRecordProto.hasPartitionStartRecord()) { + return parseProtoPartitionStartRecord( + partition, resultSetMetadata, changeStreamRecordProto.getPartitionStartRecord()); + } else if (changeStreamRecordProto.hasPartitionEndRecord()) { + return parseProtoPartitionEndRecord( + partition, resultSetMetadata, changeStreamRecordProto.getPartitionEndRecord()); + } else if (changeStreamRecordProto.hasPartitionEventRecord()) { + return parseProtoPartitionEventRecord( + partition, resultSetMetadata, changeStreamRecordProto.getPartitionEventRecord()); + } else if (changeStreamRecordProto.hasHeartbeatRecord()) { + return parseProtoHeartbeatRecord( + partition, resultSetMetadata, changeStreamRecordProto.getHeartbeatRecord()); + } else if (changeStreamRecordProto.hasDataChangeRecord()) { + return parseProtoDataChangeRecord( + partition, resultSetMetadata, 
changeStreamRecordProto.getDataChangeRecord()); + } else { + throw new IllegalArgumentException( + "Unknown change stream record type " + changeStreamRecordProto.toString()); + } + } + + ChangeStreamRecord parseProtoPartitionStartRecord( + PartitionMetadata partition, + ChangeStreamResultSetMetadata resultSetMetadata, + com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord partitionStartRecordProto) { + final Timestamp startTimestamp = + Timestamp.fromProto(partitionStartRecordProto.getStartTimestamp()); + return new PartitionStartRecord( + startTimestamp, + partitionStartRecordProto.getRecordSequence(), + partitionStartRecordProto.getPartitionTokensList(), + changeStreamRecordMetadataFrom(partition, startTimestamp, resultSetMetadata)); + } + + ChangeStreamRecord parseProtoPartitionEndRecord( + PartitionMetadata partition, + ChangeStreamResultSetMetadata resultSetMetadata, + com.google.spanner.v1.ChangeStreamRecord.PartitionEndRecord partitionEndRecordProto) { + final Timestamp endTimestamp = Timestamp.fromProto(partitionEndRecordProto.getEndTimestamp()); + return new PartitionEndRecord( + endTimestamp, + partitionEndRecordProto.getRecordSequence(), + changeStreamRecordMetadataFrom(partition, endTimestamp, resultSetMetadata)); + } + + ChangeStreamRecord parseProtoPartitionEventRecord( + PartitionMetadata partition, + ChangeStreamResultSetMetadata resultSetMetadata, + com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord partitionEventRecordProto) { + final Timestamp commitTimestamp = + Timestamp.fromProto(partitionEventRecordProto.getCommitTimestamp()); + return new PartitionEventRecord( + commitTimestamp, + partitionEventRecordProto.getRecordSequence(), + changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata)); + } + + ChangeStreamRecord parseProtoHeartbeatRecord( + PartitionMetadata partition, + ChangeStreamResultSetMetadata resultSetMetadata, + com.google.spanner.v1.ChangeStreamRecord.HeartbeatRecord heartbeatRecordProto) { 
+ final Timestamp heartbeatTimestamp = Timestamp.fromProto(heartbeatRecordProto.getTimestamp()); + return new HeartbeatRecord( + heartbeatTimestamp, + changeStreamRecordMetadataFrom(partition, heartbeatTimestamp, resultSetMetadata)); + } + + ChangeStreamRecord parseProtoDataChangeRecord( + PartitionMetadata partition, + ChangeStreamResultSetMetadata resultSetMetadata, + com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord dataChangeRecordProto) { + final Timestamp commitTimestamp = + Timestamp.fromProto(dataChangeRecordProto.getCommitTimestamp()); + return new DataChangeRecord( + partition.getPartitionToken(), + commitTimestamp, + dataChangeRecordProto.getServerTransactionId(), + dataChangeRecordProto.getIsLastRecordInTransactionInPartition(), + dataChangeRecordProto.getRecordSequence(), + dataChangeRecordProto.getTable(), + parseProtoColumnMetadata(dataChangeRecordProto.getColumnMetadataList()), + parseProtoMod( + dataChangeRecordProto.getModsList(), dataChangeRecordProto.getColumnMetadataList()), + parseProtoModType(dataChangeRecordProto.getModType()), + parseProtoValueCaptureType(dataChangeRecordProto.getValueCaptureType()), + dataChangeRecordProto.getNumberOfRecordsInTransaction(), + dataChangeRecordProto.getNumberOfPartitionsInTransaction(), + dataChangeRecordProto.getTransactionTag(), + dataChangeRecordProto.getIsSystemTransaction(), + changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata)); + } + + List parseProtoColumnMetadata( + List + columnMetadataProtos) { + List columnTypes = new ArrayList<>(); + for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata + columnMetadataProto : columnMetadataProtos) { + // TypeCode class takes json format argument in its constructor, e.g. `{\"code\":\"INT64\"}`. 
+ String typeCodeJson; + try { + typeCodeJson = this.printer.print(columnMetadataProto.getType()); + } catch (InvalidProtocolBufferException exc) { + throw new IllegalArgumentException( + "Failed to print type: " + columnMetadataProto.getType().toString()); + } + ColumnType columnType = + new ColumnType( + columnMetadataProto.getName(), + new TypeCode(typeCodeJson), + columnMetadataProto.getIsPrimaryKey(), + columnMetadataProto.getOrdinalPosition()); + columnTypes.add(columnType); + } + return columnTypes; + } + + String convertModValueProtosToJson( + List modValueProtos, + List + columnMetadataProtos) { + com.google.protobuf.Struct.Builder modStructValueBuilder = + com.google.protobuf.Struct.newBuilder(); + for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue modValueProto : + modValueProtos) { + final String columnName = + columnMetadataProtos.get(modValueProto.getColumnMetadataIndex()).getName(); + final Value columnValue = modValueProto.getValue(); + modStructValueBuilder.putFields(columnName, columnValue); + } + Value modStructValue = Value.newBuilder().setStructValue(modStructValueBuilder.build()).build(); + String modValueJson; + try { + modValueJson = this.printer.print(modStructValue); + } catch (InvalidProtocolBufferException exc) { + throw new IllegalArgumentException("Failed to print type: " + modStructValue); + } + return modValueJson; + } + + List parseProtoMod( + List modProtos, + List + columnMetadataProtos) { + List mods = new ArrayList<>(); + for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod modProto : modProtos) { + final String keysJson = + convertModValueProtosToJson(modProto.getKeysList(), columnMetadataProtos); + final String oldValuesJson = + convertModValueProtosToJson(modProto.getOldValuesList(), columnMetadataProtos); + final String newValuesJson = + convertModValueProtosToJson(modProto.getNewValuesList(), columnMetadataProtos); + Mod mod = new Mod(keysJson, oldValuesJson, newValuesJson); + 
mods.add(mod); + } + return mods; + } + + ModType parseProtoModType( + com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType modTypeProto) { + if (modTypeProto == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.INSERT) { + return ModType.INSERT; + } else if (modTypeProto + == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.UPDATE) { + return ModType.UPDATE; + } else if (modTypeProto + == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.DELETE) { + return ModType.DELETE; + } + return ModType.UNKNOWN; + } + + ValueCaptureType parseProtoValueCaptureType( + com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType + valueCaptureTypeProto) { + if (valueCaptureTypeProto + == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_ROW) { + return ValueCaptureType.NEW_ROW; + } else if (valueCaptureTypeProto + == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_VALUES) { + return ValueCaptureType.NEW_VALUES; + } else if (valueCaptureTypeProto + == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType + .OLD_AND_NEW_VALUES) { + return ValueCaptureType.OLD_AND_NEW_VALUES; + } else if (valueCaptureTypeProto + == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType + .NEW_ROW_AND_OLD_VALUES) { + return ValueCaptureType.NEW_ROW_AND_OLD_VALUES; + } + return ValueCaptureType.UNKNOWN; + } + Stream toChangeStreamRecord( PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/DataChangeRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/DataChangeRecord.java index e00ef9c08ca1..ffba4d10e3a2 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/DataChangeRecord.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/DataChangeRecord.java @@ -288,7 +288,7 @@ public String toString() { + '\'' + ", isSystemTransaction=" + isSystemTransaction - + ", metadata" + + ", metadata=" + metadata + '}'; } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java index a06fb074e637..b3dd1bef049f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestJsonMapper.recordToJson; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestProtoMapper.recordToProto; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithJson; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithStrings; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsWithUnknownModTypeAndValueCaptureType; @@ -41,8 +42,11 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord; 
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; @@ -930,4 +934,109 @@ public void testMappingJsonRowToChildPartitionRecord() { Collections.singletonList(childPartitionsRecord), mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata)); } + + @Test + public void testMappingProtoRowToPartitionStartRecord() { + final PartitionStartRecord partitionStartRecord = + new PartitionStartRecord( + Timestamp.MIN_VALUE, + "fakeRecordSequence", + Arrays.asList("partitionToken1", "partitionToken2"), + null); + com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto = + recordToProto(partitionStartRecord); + assertNotNull(changeStreamRecordProto); + ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + + when(resultSet.isProtoChangeRecord()).thenReturn(true); + when(resultSet.getProtoChangeStreamRecord()).thenReturn(changeStreamRecordProto); + assertEquals( + Collections.singletonList(partitionStartRecord), + mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)); + } + + @Test + public void testMappingProtoRowToPartitionEndRecord() { + final PartitionEndRecord partitionEndChange = + new PartitionEndRecord(Timestamp.MIN_VALUE, "fakeRecordSequence", null); + com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto = + recordToProto(partitionEndChange); + assertNotNull(changeStreamRecordProto); + ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + + 
when(resultSet.isProtoChangeRecord()).thenReturn(true); + when(resultSet.getProtoChangeStreamRecord()).thenReturn(changeStreamRecordProto); + assertEquals( + Collections.singletonList(partitionEndChange), + mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)); + } + + @Test + public void testMappingProtoRowToPartitionEventRecord() { + final PartitionEventRecord partitionEventRecord = + new PartitionEventRecord(Timestamp.MIN_VALUE, "fakeRecordSequence", null); + com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto = + recordToProto(partitionEventRecord); + assertNotNull(changeStreamRecordProto); + ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + + when(resultSet.isProtoChangeRecord()).thenReturn(true); + when(resultSet.getProtoChangeStreamRecord()).thenReturn(changeStreamRecordProto); + assertEquals( + Collections.singletonList(partitionEventRecord), + mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)); + } + + @Test + public void testMappingProtoRowToHeartbeatRecord() { + final HeartbeatRecord heartbeatRecord = + new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null); + com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto = + recordToProto(heartbeatRecord); + assertNotNull(changeStreamRecordProto); + ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + + when(resultSet.isProtoChangeRecord()).thenReturn(true); + when(resultSet.getProtoChangeStreamRecord()).thenReturn(changeStreamRecordProto); + assertEquals( + Collections.singletonList(heartbeatRecord), + mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)); + } + + @Test + public void testMappingProtoRowToDataChangeRecord() { + final DataChangeRecord dataChangeRecord = + new DataChangeRecord( + "partitionToken", + Timestamp.ofTimeSecondsAndNanos(10L, 20), + "serverTransactionId", + true, + "1", + "tableName", + Arrays.asList( + new ColumnType("column1", new 
TypeCode("{\"code\":\"INT64\"}"), true, 1L), + new ColumnType("column2", new TypeCode("{\"code\":\"BYTES\"}"), false, 2L)), + Collections.singletonList( + new Mod( + "{\"column1\":\"value1\"}", + "{\"column2\":\"oldValue2\"}", + "{\"column2\":\"newValue2\"}")), + ModType.UPDATE, + ValueCaptureType.OLD_AND_NEW_VALUES, + 10L, + 2L, + "transactionTag", + true, + null); + com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto = + recordToProto(dataChangeRecord); + assertNotNull(changeStreamRecordProto); + ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + + when(resultSet.isProtoChangeRecord()).thenReturn(true); + when(resultSet.getProtoChangeStreamRecord()).thenReturn(changeStreamRecordProto); + assertEquals( + Collections.singletonList(dataChangeRecord), + mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestProtoMapper.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestProtoMapper.java new file mode 100644 index 000000000000..6cf5958f03e9 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestProtoMapper.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.util; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Value; +import com.google.protobuf.util.JsonFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ColumnType; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType; + +// Test util class to convert ChangeStreamRecord class to proto representation. Similar to +// TestJsonMapper and TestStructMapper. 
+public class TestProtoMapper { + + public static com.google.spanner.v1.ChangeStreamRecord recordToProto(ChangeStreamRecord record) { + if (record instanceof PartitionStartRecord) { + return convertPartitionStartRecordToProto((PartitionStartRecord) record); + } else if (record instanceof PartitionEndRecord) { + return convertPartitionEndRecordToProto((PartitionEndRecord) record); + } else if (record instanceof PartitionEventRecord) { + return convertPartitionEventRecordToProto((PartitionEventRecord) record); + } else if (record instanceof HeartbeatRecord) { + return convertHeartbeatRecordToProto((HeartbeatRecord) record); + } else if (record instanceof DataChangeRecord) { + return convertDataChangeRecordToProto((DataChangeRecord) record); + } else { + throw new UnsupportedOperationException("Unimplemented mapping for " + record.getClass()); + } + } + + private static com.google.spanner.v1.ChangeStreamRecord convertPartitionStartRecordToProto( + PartitionStartRecord partitionStartRecord) { + com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord partitionStartRecordProto = + com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord.newBuilder() + .setStartTimestamp(partitionStartRecord.getStartTimestamp().toProto()) + .setRecordSequence(partitionStartRecord.getRecordSequence()) + .addAllPartitionTokens(partitionStartRecord.getPartitionTokens()) + .build(); + com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto = + com.google.spanner.v1.ChangeStreamRecord.newBuilder() + .setPartitionStartRecord(partitionStartRecordProto) + .build(); + return changeStreamRecordProto; + } + + private static com.google.spanner.v1.ChangeStreamRecord convertPartitionEndRecordToProto( + PartitionEndRecord partitionEndRecord) { + com.google.spanner.v1.ChangeStreamRecord.PartitionEndRecord partitionEndRecordProto = + com.google.spanner.v1.ChangeStreamRecord.PartitionEndRecord.newBuilder() + .setEndTimestamp(partitionEndRecord.getEndTimestamp().toProto()) + 
.setRecordSequence(partitionEndRecord.getRecordSequence()) + .build(); + com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto = + com.google.spanner.v1.ChangeStreamRecord.newBuilder() + .setPartitionEndRecord(partitionEndRecordProto) + .build(); + return changeStreamRecordProto; + } + + private static com.google.spanner.v1.ChangeStreamRecord convertPartitionEventRecordToProto( + PartitionEventRecord partitionEventRecord) { + com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord partitionEventRecordProto = + com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.newBuilder() + .setCommitTimestamp(partitionEventRecord.getCommitTimestamp().toProto()) + .setRecordSequence(partitionEventRecord.getRecordSequence()) + .build(); + com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto = + com.google.spanner.v1.ChangeStreamRecord.newBuilder() + .setPartitionEventRecord(partitionEventRecordProto) + .build(); + return changeStreamRecordProto; + } + + private static com.google.spanner.v1.ChangeStreamRecord convertHeartbeatRecordToProto( + HeartbeatRecord heartbeatRecord) { + com.google.spanner.v1.ChangeStreamRecord.HeartbeatRecord heartbeatRecordProto = + com.google.spanner.v1.ChangeStreamRecord.HeartbeatRecord.newBuilder() + .setTimestamp(heartbeatRecord.getTimestamp().toProto()) + .build(); + com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto = + com.google.spanner.v1.ChangeStreamRecord.newBuilder() + .setHeartbeatRecord(heartbeatRecordProto) + .build(); + return changeStreamRecordProto; + } + + private static com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType getProtoModType( + ModType modType) { + if (modType == ModType.INSERT) { + return com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.INSERT; + } else if (modType == ModType.UPDATE) { + return com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.UPDATE; + } else if (modType == ModType.DELETE) { + return 
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.DELETE; + } + return com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.MOD_TYPE_UNSPECIFIED; + } + + private static com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType + getProtoValueCaptureTypeProto(ValueCaptureType valueCaptureType) { + if (valueCaptureType == ValueCaptureType.NEW_ROW) { + return com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_ROW; + } + if (valueCaptureType == ValueCaptureType.NEW_VALUES) { + return com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_VALUES; + } + if (valueCaptureType == ValueCaptureType.OLD_AND_NEW_VALUES) { + return com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType + .OLD_AND_NEW_VALUES; + } + if (valueCaptureType == ValueCaptureType.NEW_ROW_AND_OLD_VALUES) { + return com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType + .NEW_ROW_AND_OLD_VALUES; + } + return com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType + .VALUE_CAPTURE_TYPE_UNSPECIFIED; + } + + private static List + getProtoColumnMetadata(List columnTypes) { + JsonFormat.Parser jsonParser = JsonFormat.parser().ignoringUnknownFields(); + + List + columnMetaDataProtos = new ArrayList<>(); + for (ColumnType columnType : columnTypes) { + // TypeCode class contains json format type code, e.g. {\"code\":\"INT64\"}. We need to + // extract "INT64" type code. 
+ Value.Builder typeCodeJson = Value.newBuilder(); + try { + jsonParser.merge(columnType.getType().getCode(), typeCodeJson); + } catch (InvalidProtocolBufferException exc) { + throw new IllegalArgumentException( + "Failed to parse json type code into proto: " + columnType.getType().getCode()); + } + Value typeCode = + Optional.ofNullable(typeCodeJson.build().getStructValue().getFieldsMap().get("code")) + .orElseThrow(IllegalArgumentException::new); + + com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata columnMetadataProto = + com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata.newBuilder() + .setName(columnType.getName()) + .setType( + com.google.spanner.v1.Type.newBuilder() + .setCode(com.google.spanner.v1.TypeCode.valueOf(typeCode.getStringValue()))) + .setIsPrimaryKey(columnType.isPrimaryKey()) + .setOrdinalPosition(columnType.getOrdinalPosition()) + .build(); + columnMetaDataProtos.add(columnMetadataProto); + } + return columnMetaDataProtos; + } + + private static List + columnsJsonToProtos(String columnsJson, Map columnNameToIndex) { + List modValueProtos = + new ArrayList<>(); + JsonFormat.Parser jsonParser = JsonFormat.parser().ignoringUnknownFields(); + Value.Builder columnsJsonValue = Value.newBuilder(); + try { + jsonParser.merge(columnsJson, columnsJsonValue); + } catch (InvalidProtocolBufferException exc) { + throw new IllegalArgumentException( + "Failed to parse json type columns into proto: " + columnsJson); + } + Map columns = columnsJsonValue.build().getStructValue().getFieldsMap(); + for (Map.Entry entry : columns.entrySet()) { + final String columnName = entry.getKey(); + final String columnValue = entry.getValue().getStringValue(); + final Integer columnIndex = columnNameToIndex.get(columnName); + + com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue modValueProto = + com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue.newBuilder() + .setColumnMetadataIndex(columnIndex) + 
.setValue(Value.newBuilder().setStringValue(columnValue).build()) + .build(); + modValueProtos.add(modValueProto); + } + return modValueProtos; + } + + private static List getProtoMods( + List mods, List columnTypes) { + Map columnNameToIndex = new HashMap<>(); + for (int i = 0; i < columnTypes.size(); ++i) { + columnNameToIndex.put(columnTypes.get(i).getName(), i); + } + List modProtos = + new ArrayList<>(); + for (Mod mod : mods) { + com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod modProto = + com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod.newBuilder() + .addAllKeys(columnsJsonToProtos(mod.getKeysJson(), columnNameToIndex)) + .addAllOldValues(columnsJsonToProtos(mod.getOldValuesJson(), columnNameToIndex)) + .addAllNewValues(columnsJsonToProtos(mod.getNewValuesJson(), columnNameToIndex)) + .build(); + modProtos.add(modProto); + } + return modProtos; + } + + private static com.google.spanner.v1.ChangeStreamRecord convertDataChangeRecordToProto( + DataChangeRecord dataChangeRecord) { + com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord dataChangeRecordProto = + com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.newBuilder() + .setCommitTimestamp(dataChangeRecord.getCommitTimestamp().toProto()) + .setRecordSequence(dataChangeRecord.getRecordSequence()) + .setServerTransactionId(dataChangeRecord.getServerTransactionId()) + .setIsLastRecordInTransactionInPartition( + dataChangeRecord.isLastRecordInTransactionInPartition()) + .setTable(dataChangeRecord.getTableName()) + .addAllColumnMetadata(getProtoColumnMetadata(dataChangeRecord.getRowType())) + .addAllMods(getProtoMods(dataChangeRecord.getMods(), dataChangeRecord.getRowType())) + .setModType(getProtoModType(dataChangeRecord.getModType())) + .setValueCaptureType( + getProtoValueCaptureTypeProto(dataChangeRecord.getValueCaptureType())) + .setNumberOfRecordsInTransaction( + (int) dataChangeRecord.getNumberOfRecordsInTransaction()) + .setNumberOfPartitionsInTransaction( + (int) 
dataChangeRecord.getNumberOfPartitionsInTransaction()) + .setTransactionTag(dataChangeRecord.getTransactionTag()) + .setIsSystemTransaction(dataChangeRecord.isSystemTransaction()) + .build(); + com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto = + com.google.spanner.v1.ChangeStreamRecord.newBuilder() + .setDataChangeRecord(dataChangeRecordProto) + .build(); + return changeStreamRecordProto; + } +}