Skip to content

Commit b63e49b

Browse files
committed
[FLINK-38265] Stream Split shouldn't finish when an exception occurs but the END Watermark is met.
1 parent f6197d2 commit b63e49b

File tree

10 files changed

+230
-22
lines changed

10 files changed

+230
-22
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
196196

197197
statement.execute(
198198
String.format(
199-
"ALTER TABLE `%s`.`products` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;",
199+
"ALTER TABLE `%s`.`products` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `WEIGHT`;",
200200
inventoryDatabase.getDatabaseName()));
201201
expected.add(
202202
new AddColumnEvent(

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/Fetcher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import javax.annotation.Nullable;
2525

2626
import java.util.Iterator;
27+
import java.util.concurrent.Future;
2728

2829
/**
2930
* Fetcher to fetch data of a table split, the split is either snapshot split {@link SnapshotSplit}
@@ -33,7 +34,7 @@
3334
public interface Fetcher<T, Split> {
3435

3536
/** Add to task to fetch, this should call only when the reader is idle. */
36-
void submitTask(FetchTask<Split> fetchTask);
37+
Future<?> submitTask(FetchTask<Split> fetchTask);
3738

3839
/**
3940
* Fetched records from data source. The method should return null when reaching the end of the

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@
4040
import java.util.Iterator;
4141
import java.util.List;
4242
import java.util.Map;
43+
import java.util.concurrent.CompletableFuture;
4344
import java.util.concurrent.ExecutorService;
4445
import java.util.concurrent.Executors;
46+
import java.util.concurrent.Future;
4547
import java.util.concurrent.ThreadFactory;
4648
import java.util.concurrent.TimeUnit;
4749
import java.util.concurrent.atomic.AtomicBoolean;
@@ -86,18 +88,21 @@ public IncrementalSourceScanFetcher(FetchTask.Context taskContext, int subtaskId
8688
}
8789

8890
@Override
89-
public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
91+
public Future<?> submitTask(FetchTask<SourceSplitBase> fetchTask) {
92+
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
93+
9094
this.snapshotSplitReadTask = fetchTask;
9195
this.currentSnapshotSplit = fetchTask.getSplit().asSnapshotSplit();
9296
taskContext.configure(currentSnapshotSplit);
9397
this.queue = taskContext.getQueue();
9498
this.hasNextElement.set(true);
9599
this.reachEnd.set(false);
96100

97-
executorService.execute(
101+
return executorService.submit(
98102
() -> {
99103
try {
100104
snapshotSplitReadTask.execute(taskContext);
105+
completableFuture.complete(null);
101106
} catch (Exception e) {
102107
setReadException(e);
103108
}

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,12 @@
4444
import java.util.Set;
4545
import java.util.concurrent.ExecutorService;
4646
import java.util.concurrent.Executors;
47+
import java.util.concurrent.Future;
4748
import java.util.concurrent.ThreadFactory;
4849
import java.util.concurrent.TimeUnit;
4950

51+
import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isEndWatermarkEvent;
52+
5053
/** Fetcher to fetch data from table split, the split is the stream split {@link StreamSplit}. */
5154
public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
5255
private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceStreamFetcher.class);
@@ -79,13 +82,13 @@ public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int subTask
7982
}
8083

8184
@Override
82-
public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
85+
public Future<?> submitTask(FetchTask<SourceSplitBase> fetchTask) {
8386
this.streamFetchTask = fetchTask;
8487
this.currentStreamSplit = fetchTask.getSplit().asStreamSplit();
8588
configureFilter();
8689
taskContext.configure(currentStreamSplit);
8790
this.queue = taskContext.getQueue();
88-
executorService.submit(
91+
return executorService.submit(
8992
() -> {
9093
try {
9194
streamFetchTask.execute(taskContext);
@@ -96,12 +99,6 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
9699
currentStreamSplit),
97100
e);
98101
readException = e;
99-
} finally {
100-
try {
101-
stopReadTask();
102-
} catch (Exception e) {
103-
throw new RuntimeException(e);
104-
}
105102
}
106103
});
107104
}
@@ -116,10 +113,19 @@ public boolean isFinished() {
116113
public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
117114
checkReadException();
118115
final List<SourceRecord> sourceRecords = new ArrayList<>();
116+
// what happens if currentTaskRunning
119117
if (currentTaskRunning) {
120118
List<DataChangeEvent> batch = queue.poll();
121119
for (DataChangeEvent event : batch) {
122-
if (shouldEmit(event.getRecord())) {
120+
if (isEndWatermarkEvent(event.getRecord())) {
121+
LOG.info("Read split {} end watermark event", currentStreamSplit);
122+
try {
123+
stopReadTask();
124+
} catch (Exception e) {
125+
throw new RuntimeException(e);
126+
}
127+
break;
128+
} else if (shouldEmit(event.getRecord())) {
123129
sourceRecords.add(event.getRecord());
124130
} else {
125131
LOG.debug("{} data change event should not emit", event);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,14 @@
5858
import java.util.Set;
5959
import java.util.concurrent.ExecutorService;
6060
import java.util.concurrent.Executors;
61+
import java.util.concurrent.Future;
6162
import java.util.concurrent.ThreadFactory;
6263
import java.util.concurrent.TimeUnit;
6364
import java.util.function.Predicate;
6465

6566
import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient;
6667
import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
68+
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isEndWatermarkEvent;
6769

6870
/**
6971
* A Debezium binlog reader implementation that also support reads binlog and filter overlapping
@@ -114,7 +116,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId)
114116
this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill();
115117
}
116118

117-
public void submitSplit(MySqlSplit mySqlSplit) {
119+
public Future<?> submitSplit(MySqlSplit mySqlSplit) {
118120
this.currentBinlogSplit = mySqlSplit.asBinlogSplit();
119121
configureFilter();
120122
statefulTaskContext.configure(currentBinlogSplit);
@@ -134,7 +136,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
134136
currentBinlogSplit,
135137
createEventFilter());
136138

137-
executorService.submit(
139+
return executorService.submit(
138140
() -> {
139141
try {
140142
binlogSplitReadTask.execute(
@@ -148,8 +150,6 @@ public void submitSplit(MySqlSplit mySqlSplit) {
148150
currentBinlogSplit),
149151
t);
150152
readException = t;
151-
} finally {
152-
stopBinlogReadTask();
153153
}
154154
});
155155
}
@@ -167,6 +167,16 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
167167
if (currentTaskRunning) {
168168
List<DataChangeEvent> batch = queue.poll();
169169
for (DataChangeEvent event : batch) {
170+
if (isEndWatermarkEvent(event.getRecord())) {
171+
LOG.info("Read split {} end watermark event", currentBinlogSplit);
172+
try {
173+
stopBinlogReadTask();
174+
} catch (Exception e) {
175+
throw new RuntimeException(e);
176+
}
177+
break;
178+
}
179+
170180
if (isParsingOnLineSchemaChanges) {
171181
Optional<SourceRecord> oscRecord =
172182
parseOnLineSchemaChangeEvent(event.getRecord());
@@ -398,4 +408,9 @@ public ExecutorService getExecutorService() {
398408
MySqlBinlogSplitReadTask getBinlogSplitReadTask() {
399409
return binlogSplitReadTask;
400410
}
411+
412+
@VisibleForTesting
413+
public StoppableChangeEventSourceContext getChangeEventSourceContext() {
414+
return changeEventSourceContext;
415+
}
401416
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/DebeziumReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import javax.annotation.Nullable;
2121

2222
import java.util.Iterator;
23+
import java.util.concurrent.Future;
2324

2425
/** Reader to read split of table, the split is either snapshot split or binlog split. */
2526
public interface DebeziumReader<T, Split> {
@@ -32,7 +33,7 @@ public interface DebeziumReader<T, Split> {
3233
*
3334
* @param splitToRead
3435
*/
35-
void submitSplit(Split splitToRead);
36+
Future<?> submitSplit(Split splitToRead);
3637

3738
/** Close the reader and releases all resources. */
3839
void close();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.Map;
6363
import java.util.concurrent.ExecutorService;
6464
import java.util.concurrent.Executors;
65+
import java.util.concurrent.Future;
6566
import java.util.concurrent.ThreadFactory;
6667
import java.util.concurrent.TimeUnit;
6768
import java.util.concurrent.atomic.AtomicBoolean;
@@ -129,7 +130,7 @@ public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskI
129130
}
130131

131132
@Override
132-
public void submitSplit(MySqlSplit mySqlSplit) {
133+
public Future<?> submitSplit(MySqlSplit mySqlSplit) {
133134
this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
134135
statefulTaskContext.configure(currentSnapshotSplit);
135136
this.queue = statefulTaskContext.getQueue();
@@ -150,7 +151,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
150151
currentSnapshotSplit,
151152
hooks,
152153
statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill());
153-
executorService.execute(
154+
return executorService.submit(
154155
() -> {
155156
try {
156157
currentTaskRunning = true;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import java.util.Map;
8181
import java.util.Optional;
8282
import java.util.Properties;
83+
import java.util.concurrent.Future;
8384
import java.util.function.Predicate;
8485
import java.util.stream.Collectors;
8586
import java.util.stream.Stream;
@@ -90,6 +91,7 @@
9091
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
9192
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
9293
import static org.assertj.core.api.Assertions.assertThat;
94+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
9395

9496
/** Tests for {@link org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader}. */
9597
class BinlogSplitReaderTest extends MySqlSourceTestBase {
@@ -1119,6 +1121,37 @@ public void testBinlogOffsetCompareWithSnapshotAndBinlogPhase() throws Exception
11191121
Assertions.assertThat(sourceRecords).isEmpty();
11201122
}
11211123

1124+
@Test
1125+
void testReadBinlogWithException() throws Exception {
1126+
customerDatabase.createAndInitialize();
1127+
MySqlSourceConfig sourceConfig =
1128+
getConfig(StartupOptions.latest(), new String[] {"customers"});
1129+
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
1130+
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
1131+
1132+
// Create reader and submit splits
1133+
StatefulTaskContext statefulTaskContext =
1134+
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
1135+
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
1136+
BinlogSplitReader reader = new BinlogSplitReader(statefulTaskContext, 0);
1137+
1138+
// Mock an exception occurring during stream split reading by setting the error handler
1139+
// and stopping the change event source to test exception handling
1140+
Future<?> future = reader.submitSplit(split);
1141+
statefulTaskContext
1142+
.getErrorHandler()
1143+
.setProducerThrowable(new RuntimeException("Test read with exception"));
1144+
reader.getChangeEventSourceContext().stopChangeEventSource();
1145+
// wait until executor is finished.
1146+
future.get();
1147+
1148+
assertThatThrownBy(() -> pollRecordsFromReader(reader, RecordUtils::isDataChangeRecord))
1149+
.rootCause()
1150+
.isExactlyInstanceOf(RuntimeException.class)
1151+
.hasMessage("Test read with exception");
1152+
reader.close();
1153+
}
1154+
11221155
private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) {
11231156
return createBinlogReader(sourceConfig, false);
11241157
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.cdc.connectors.postgres.source.fetch;
1919

20+
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
2021
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
2122
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
2223
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
@@ -54,6 +55,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
5455
private static final Logger LOG = LoggerFactory.getLogger(PostgresStreamFetchTask.class);
5556

5657
private final StreamSplit split;
58+
private final StoppableChangeEventSourceContext changeEventSourceContext;
5759
private volatile boolean taskRunning = false;
5860
private volatile boolean stopped = false;
5961

@@ -63,6 +65,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
6365

6466
public PostgresStreamFetchTask(StreamSplit streamSplit) {
6567
this.split = streamSplit;
68+
this.changeEventSourceContext = new StoppableChangeEventSourceContext();
6669
}
6770

6871
@Override
@@ -92,8 +95,7 @@ public void execute(Context context) throws Exception {
9295
sourceFetchContext.getTaskContext(),
9396
sourceFetchContext.getReplicationConnection(),
9497
split);
95-
StoppableChangeEventSourceContext changeEventSourceContext =
96-
new StoppableChangeEventSourceContext();
98+
9799
streamSplitReadTask.execute(
98100
changeEventSourceContext,
99101
sourceFetchContext.getPartition(),
@@ -162,6 +164,11 @@ public void commitCurrentOffset(@Nullable Offset offsetToCommit) {
162164
}
163165
}
164166

167+
@VisibleForTesting
168+
StoppableChangeEventSourceContext getChangeEventSourceContext() {
169+
return changeEventSourceContext;
170+
}
171+
165172
/** A {@link ChangeEventSource} implementation for Postgres to read streaming changes. */
166173
public static class StreamSplitReadTask extends PostgresStreamingChangeEventSource {
167174
private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);

0 commit comments

Comments
 (0)