Skip to content

Commit 2fdb381

Browse files
committed
Protobased connector integration test.
1 parent 4422e14 commit 2fdb381

File tree

9 files changed

+469
-18
lines changed

9 files changed

+469
-18
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public ProcessContinuation run(
9999
RestrictionTracker<TimestampRange, Timestamp> tracker,
100100
OutputReceiver<PartitionMetadata> receiver,
101101
ManualWatermarkEstimator<Instant> watermarkEstimator) {
102-
102+
System.err.println("changliiu DetectNewPartitionsAction");
103103
final Timestamp readTimestamp = tracker.currentRestriction().getFrom();
104104
// Updates the current watermark as the min of the watermarks from all existing partitions
105105
final Timestamp minWatermark = cache.getUnfinishedMinWatermark();

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionEndRecordAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public Optional<ProcessContinuation> run(
8383
ManualWatermarkEstimator<Instant> watermarkEstimator) {
8484

8585
final String token = partition.getPartitionToken();
86+
System.err.println("changliiu PartitionEndRecordAction");
8687
LOG.debug("[{}] Processing partition end record {}", token, record);
8788

8889
final Timestamp timestamp = record.getEndTimestamp();

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionEventRecordAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public Optional<ProcessContinuation> run(
8383
ManualWatermarkEstimator<Instant> watermarkEstimator) {
8484

8585
final String token = partition.getPartitionToken();
86+
System.err.println("changliiu PartitionEventRecordAction");
8687
LOG.debug("[{}] Processing partition event record {}", token, record);
8788

8889
final Timestamp timestamp = record.getCommitTimestamp();

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionStartRecordAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public Optional<ProcessContinuation> run(
102102
RestrictionInterrupter<Timestamp> interrupter,
103103
ManualWatermarkEstimator<Instant> watermarkEstimator) {
104104
final String token = partition.getPartitionToken();
105-
105+
System.err.println("changliiu PartitionStartRecordAction");
106106
LOG.debug("[{}] Processing partition start record {}", token, record);
107107

108108
final Timestamp startTimestamp = record.getStartTimestamp();

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ public ProcessContinuation run(
176176
OutputReceiver<DataChangeRecord> receiver,
177177
ManualWatermarkEstimator<Instant> watermarkEstimator,
178178
BundleFinalizer bundleFinalizer) {
179+
System.err.println("changliiu QueryChangeStreamAction");
179180
final String token = partition.getPartitionToken();
180181
final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
181182
final Timestamp changeStreamQueryEndTimestamp =
@@ -208,6 +209,8 @@ public ProcessContinuation run(
208209
updatedPartition, resultSet, resultSet.getMetadata());
209210
Optional<ProcessContinuation> maybeContinuation;
210211
for (final ChangeStreamRecord record : records) {
212+
System.err.println(
213+
"changliiu finish-parsing:\n" + record.toString() + "\n, size: " + records.size());
211214
if (record instanceof DataChangeRecord) {
212215
maybeContinuation =
213216
dataChangeRecordAction.run(

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,13 +231,15 @@ public List<ChangeStreamRecord> toChangeStreamRecords(
231231

232232
// In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records are returned as Protos.
233233
if (resultSet.isProtoChangeRecord()) {
234+
System.err.println("changliiu v2222 change stream recort");
234235
return Arrays.asList(
235236
toChangeStreamRecord(
236237
partition, resultSet.getProtoChangeStreamRecord(), resultSetMetadata));
237238
}
238239

239240
// In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as an array
240241
// of structs.
242+
System.err.println("changliiu v111 change stream recort");
241243
return resultSet.getCurrentRowAsStruct().getStructList(0).stream()
242244
.flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
243245
.collect(Collectors.toList());
@@ -247,6 +249,8 @@ ChangeStreamRecord toChangeStreamRecord(
247249
PartitionMetadata partition,
248250
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto,
249251
ChangeStreamResultSetMetadata resultSetMetadata) {
252+
System.err.println(
253+
"changliiu proto-toChangeStreamRecord:\n" + changeStreamRecordProto.toString());
250254
if (changeStreamRecordProto.hasPartitionStartRecord()) {
251255
return parseProtoPartitionStartRecord(
252256
partition, resultSetMetadata, changeStreamRecordProto.getPartitionStartRecord());

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,16 @@ public class IntegrationTestEnv extends ExternalResource {
5959
private String metadataDatabaseId;
6060
private String metadataTableName;
6161
private Spanner spanner;
62-
private final String host = "https://spanner.googleapis.com";
62+
private String host = "https://spanner.googleapis.com";
6363
private DatabaseAdminClient databaseAdminClient;
6464
private DatabaseClient databaseClient;
6565
private boolean isPostgres;
66+
private boolean isPlacementTableBasedChangeStream;
6667
public boolean useSeparateMetadataDb;
6768

6869
@Override
6970
protected void before() throws Throwable {
71+
System.err.println("changliiu IntegrationTestEnv before ");
7072
final ChangeStreamTestPipelineOptions options =
7173
IOITHelper.readIOTestPipelineOptions(ChangeStreamTestPipelineOptions.class);
7274

@@ -89,10 +91,17 @@ protected void before() throws Throwable {
8991

9092
IntegrationTestEnv() {
9193
this.isPostgres = false;
94+
this.isPlacementTableBasedChangeStream = false;
9295
}
9396

94-
IntegrationTestEnv(boolean isPostgres) {
95-
this.isPostgres = true;
97+
IntegrationTestEnv(
98+
boolean isPostgres, boolean isPlacementTableBasedChangeStream, Optional<String> host) {
99+
this.isPostgres = isPostgres;
100+
this.isPlacementTableBasedChangeStream = isPlacementTableBasedChangeStream;
101+
if (host.isPresent()) {
102+
this.host = host.get();
103+
}
104+
System.err.println("changliiu host: " + this.host);
96105
}
97106

98107
@Override
@@ -154,6 +163,7 @@ protected void after() {
154163
}
155164

156165
void createMetadataDatabase() throws ExecutionException, InterruptedException, TimeoutException {
166+
System.err.println("changliiu IntegrationTestEnv createMetadataDatabase ");
157167
recreateDatabase(databaseAdminClient, instanceId, metadataDatabaseId, isPostgres);
158168
useSeparateMetadataDb = true;
159169
}
@@ -183,27 +193,42 @@ String createSingersTable() throws InterruptedException, ExecutionException, Tim
183193
.updateDatabaseDdl(
184194
instanceId,
185195
databaseId,
186-
Collections.singletonList(
187-
"CREATE TABLE "
188-
+ tableName
189-
+ " ("
190-
+ " SingerId INT64 NOT NULL,"
191-
+ " FirstName STRING(1024),"
192-
+ " LastName STRING(1024),"
193-
+ " SingerInfo BYTES(MAX)"
194-
+ " ) PRIMARY KEY (SingerId)"),
196+
Collections.singletonList(createGSQLTableDDL(tableName)),
195197
null)
196198
.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
197199
}
198200
tables.add(tableName);
199201
return tableName;
200202
}
201203

204+
String createGSQLTableDDL(String tableName) {
205+
if (this.isPlacementTableBasedChangeStream) {
206+
// create a placement table.
207+
return "CREATE TABLE "
208+
+ tableName
209+
+ " ("
210+
+ " SingerId INT64 NOT NULL,"
211+
+ " FirstName STRING(1024),"
212+
+ " LastName STRING(1024),"
213+
+ " SingerInfo BYTES(MAX),"
214+
+ " Location STRING(MAX) NOT NULL PLACEMENT KEY"
215+
+ " ) PRIMARY KEY (SingerId)";
216+
}
217+
return "CREATE TABLE "
218+
+ tableName
219+
+ " ("
220+
+ " SingerId INT64 NOT NULL,"
221+
+ " FirstName STRING(1024),"
222+
+ " LastName STRING(1024),"
223+
+ " SingerInfo BYTES(MAX)"
224+
+ " ) PRIMARY KEY (SingerId)";
225+
}
226+
202227
String createChangeStreamFor(String tableName)
203228
throws InterruptedException, ExecutionException, TimeoutException {
204229
final String changeStreamName = generateChangeStreamName();
230+
LOG.info("CREATE CHANGE STREAM \"" + changeStreamName + "\" FOR \"" + tableName + "\"");
205231
if (this.isPostgres) {
206-
LOG.info("CREATE CHANGE STREAM \"" + changeStreamName + "\" FOR \"" + tableName + "\"");
207232
databaseAdminClient
208233
.updateDatabaseDdl(
209234
instanceId,
@@ -217,15 +242,28 @@ String createChangeStreamFor(String tableName)
217242
.updateDatabaseDdl(
218243
instanceId,
219244
databaseId,
220-
Collections.singletonList(
221-
"CREATE CHANGE STREAM " + changeStreamName + " FOR " + tableName),
245+
Collections.singletonList(createGSQLChangeStreamDDL(changeStreamName, tableName)),
222246
null)
223247
.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
224248
}
225249
changeStreams.add(changeStreamName);
226250
return changeStreamName;
227251
}
228252

253+
String createGSQLChangeStreamDDL(String changeStreamName, String tableName) {
254+
if (this.isPlacementTableBasedChangeStream) {
255+
System.err.println(
256+
"changliiu create v222 change stream: " + this.isPlacementTableBasedChangeStream);
257+
// Create a MUTABLE_KEY_RANGE change stream.
258+
return "CREATE CHANGE STREAM "
259+
+ changeStreamName
260+
+ " FOR "
261+
+ tableName
262+
+ "SET OPTIONS (partition_mode = 'MUTABLE_KEY_RANGE')";
263+
}
264+
return "CREATE CHANGE STREAM " + changeStreamName + " FOR " + tableName;
265+
}
266+
229267
void createRoleAndGrantPrivileges(String table, String changeStream)
230268
throws InterruptedException, ExecutionException, TimeoutException {
231269
if (this.isPostgres) {
@@ -296,13 +334,23 @@ private void recreateDatabase(
296334
Collections.emptyList())
297335
.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
298336
} else {
337+
System.err.println(
338+
"changliiu recreateDatabase 1. Host: "
339+
+ host
340+
+ ", "
341+
+ projectId
342+
+ ", "
343+
+ instanceId
344+
+ ", "
345+
+ databaseId);
299346
databaseAdminClient
300347
.createDatabase(
301348
databaseAdminClient
302349
.newDatabaseBuilder(DatabaseId.of(this.projectId, instanceId, databaseId))
303350
.build(),
304351
Collections.emptyList())
305352
.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
353+
System.err.println("changliiu recreateDatabase end");
306354
}
307355
}
308356

0 commit comments

Comments
 (0)