diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java
index a1b9935aeac..f3826f03ab8 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java
@@ -196,7 +196,7 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st
         statement.execute(
                 String.format(
-                        "ALTER TABLE `%s`.`products` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;",
+                        "ALTER TABLE `%s`.`products` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `WEIGHT`;",
                         inventoryDatabase.getDatabaseName()));
         expected.add(
                 new AddColumnEvent(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/Fetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/Fetcher.java
index 5aef160cb70..98b72ea329c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/Fetcher.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/Fetcher.java
@@ -24,6 +24,7 @@
 import javax.annotation.Nullable;

 import java.util.Iterator;
+import java.util.concurrent.Future;

 /**
  * Fetcher to fetch data of a table split, the split is either snapshot split {@link SnapshotSplit}
@@ -33,7 +34,7 @@ public interface Fetcher {

     /** Add to task to fetch, this should call only when the reader is idle. */
-    void submitTask(FetchTask fetchTask);
+    Future submitTask(FetchTask fetchTask);

     /**
      * Fetched records from data source.
      * The method should return null when reaching the end of the
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
index 3136bb97d25..82d08b6c79f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -40,8 +40,10 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -86,7 +88,9 @@ public IncrementalSourceScanFetcher(FetchTask.Context taskContext, int subtaskId
     }

     @Override
-    public void submitTask(FetchTask fetchTask) {
+    public Future submitTask(FetchTask fetchTask) {
+        CompletableFuture completableFuture = new CompletableFuture<>();
+
         this.snapshotSplitReadTask = fetchTask;
         this.currentSnapshotSplit = fetchTask.getSplit().asSnapshotSplit();
         taskContext.configure(currentSnapshotSplit);
@@ -94,10 +98,11 @@ public void submitTask(FetchTask fetchTask) {
         this.hasNextElement.set(true);
         this.reachEnd.set(false);

-        executorService.execute(
+        return executorService.submit(
                 () -> {
                     try {
                         snapshotSplitReadTask.execute(taskContext);
+                        completableFuture.complete(null);
                     } catch (Exception e) {
                         setReadException(e);
                     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 61c2b9ffba3..244b920466d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -44,9 +44,12 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;

+import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isEndWatermarkEvent;
+
 /** Fetcher to fetch data from table split, the split is the stream split {@link StreamSplit}.
  */
 public class IncrementalSourceStreamFetcher implements Fetcher {
     private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceStreamFetcher.class);
@@ -79,13 +82,13 @@ public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int subTask
     }

     @Override
-    public void submitTask(FetchTask fetchTask) {
+    public Future submitTask(FetchTask fetchTask) {
         this.streamFetchTask = fetchTask;
         this.currentStreamSplit = fetchTask.getSplit().asStreamSplit();
         configureFilter();
         taskContext.configure(currentStreamSplit);
         this.queue = taskContext.getQueue();
-        executorService.submit(
+        return executorService.submit(
                 () -> {
                     try {
                         streamFetchTask.execute(taskContext);
                     } catch (Exception e) {
                         LOG.error(
                                 String.format(
                                         "Execute stream read task for stream split %s fail",
                                         currentStreamSplit),
                                 e);
                         readException = e;
-                    } finally {
-                        try {
-                            stopReadTask();
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
                     }
                 });
     }
@@ -116,10 +113,19 @@ public boolean isFinished() {
     public Iterator pollSplitRecords() throws InterruptedException {
         checkReadException();
         final List sourceRecords = new ArrayList<>();
+        // Keep polling the queue only while the read task is still running.
         if (currentTaskRunning) {
             List batch = queue.poll();
             for (DataChangeEvent event : batch) {
-                if (shouldEmit(event.getRecord())) {
+                if (isEndWatermarkEvent(event.getRecord())) {
+                    LOG.info("Read split {} end watermark event", currentStreamSplit);
+                    try {
+                        stopReadTask();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                    break;
+                } else if (shouldEmit(event.getRecord())) {
                     sourceRecords.add(event.getRecord());
                 } else {
                     LOG.debug("{} data change event should not emit", event);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index 87a435ff62b..05b85e8a4a9 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -58,12 +58,14 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;

 import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient;
 import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
+import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isEndWatermarkEvent;

 /**
  * A Debezium binlog reader implementation that also support reads binlog and filter overlapping
@@ -114,7 +116,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId)
         this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill();
     }

-    public void submitSplit(MySqlSplit mySqlSplit) {
+    public Future submitSplit(MySqlSplit mySqlSplit) {
         this.currentBinlogSplit = mySqlSplit.asBinlogSplit();
         configureFilter();
         statefulTaskContext.configure(currentBinlogSplit);
@@ -134,7 +136,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
                         currentBinlogSplit,
                         createEventFilter());
-        executorService.submit(
+        return executorService.submit(
                 () -> {
                     try {
                         binlogSplitReadTask.execute(
@@ -148,8 +150,6 @@
                                         currentBinlogSplit),
                                 t);
                         readException = t;
-                    } finally {
-                        stopBinlogReadTask();
                     }
                 });
     }
@@ -167,6 +167,16 @@ public Iterator pollSplitRecords() throws InterruptedException {
         if (currentTaskRunning) {
             List batch = queue.poll();
             for (DataChangeEvent event : batch) {
+                if (isEndWatermarkEvent(event.getRecord())) {
+                    LOG.info("Read split {} end watermark event", currentBinlogSplit);
+                    try {
+                        stopBinlogReadTask();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                    break;
+                }
+
                 if (isParsingOnLineSchemaChanges) {
                     Optional oscRecord =
                             parseOnLineSchemaChangeEvent(event.getRecord());
@@ -398,4 +408,9 @@ public ExecutorService getExecutorService() {
     MySqlBinlogSplitReadTask getBinlogSplitReadTask() {
         return binlogSplitReadTask;
     }
+
+    @VisibleForTesting
+    public StoppableChangeEventSourceContext getChangeEventSourceContext() {
+        return changeEventSourceContext;
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/DebeziumReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/DebeziumReader.java
index 3c3882e1296..c185960bca7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/DebeziumReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/DebeziumReader.java
@@ -20,6 +20,7 @@
 import javax.annotation.Nullable;

 import java.util.Iterator;
+import java.util.concurrent.Future;

 /** Reader to read split of table, the split is either snapshot split or binlog split. */
 public interface DebeziumReader {
@@ -32,7 +33,7 @@ public interface DebeziumReader {
      *
      * @param splitToRead
      */
-    void submitSplit(Split splitToRead);
+    Future submitSplit(Split splitToRead);

     /** Close the reader and releases all resources.
      */
     void close();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
index b7b23a18e0e..ee18cde42f5 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
@@ -62,6 +62,7 @@
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -129,7 +130,7 @@ public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskI
     }

     @Override
-    public void submitSplit(MySqlSplit mySqlSplit) {
+    public Future submitSplit(MySqlSplit mySqlSplit) {
         this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
         statefulTaskContext.configure(currentSnapshotSplit);
         this.queue = statefulTaskContext.getQueue();
@@ -150,7 +151,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
                         currentSnapshotSplit,
                         hooks,
                         statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill());
-        executorService.execute(
+        return executorService.submit(
                 () -> {
                     try {
                         currentTaskRunning = true;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
index e9205d2c5d5..72076bace42 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
@@ -80,6 +80,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.Future;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -90,6 +91,7 @@
 import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
 import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;

 /** Tests for {@link org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader}.
  */
 class BinlogSplitReaderTest extends MySqlSourceTestBase {
@@ -1119,6 +1121,37 @@ public void testBinlogOffsetCompareWithSnapshotAndBinlogPhase() throws Exception
         Assertions.assertThat(sourceRecords).isEmpty();
     }

+    @Test
+    void testReadBinlogWithException() throws Exception {
+        customerDatabase.createAndInitialize();
+        MySqlSourceConfig sourceConfig =
+                getConfig(StartupOptions.latest(), new String[] {"customers"});
+        binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
+        mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
+
+        // Create reader and submit splits
+        StatefulTaskContext statefulTaskContext =
+                new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
+        MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
+        BinlogSplitReader reader = new BinlogSplitReader(statefulTaskContext, 0);
+
+        // Mock an exception occurring during stream split reading by setting the error handler
+        // and stopping the change event source to test exception handling
+        Future future = reader.submitSplit(split);
+        statefulTaskContext
+                .getErrorHandler()
+                .setProducerThrowable(new RuntimeException("Test read with exception"));
+        reader.getChangeEventSourceContext().stopChangeEventSource();
+        // wait until executor is finished.
+        future.get();
+
+        assertThatThrownBy(() -> pollRecordsFromReader(reader, RecordUtils::isDataChangeRecord))
+                .rootCause()
+                .isExactlyInstanceOf(RuntimeException.class)
+                .hasMessage("Test read with exception");
+        reader.close();
+    }
+
     private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) {
         return createBinlogReader(sourceConfig, false);
     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java
index b34db92b140..a3d28a12c3f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java
@@ -17,6 +17,7 @@
 package org.apache.flink.cdc.connectors.postgres.source.fetch;

+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
@@ -54,6 +55,7 @@ public class PostgresStreamFetchTask implements FetchTask {
     private static final Logger LOG = LoggerFactory.getLogger(PostgresStreamFetchTask.class);

     private final StreamSplit split;
+    private final StoppableChangeEventSourceContext changeEventSourceContext;
     private volatile boolean taskRunning = false;
     private volatile boolean stopped = false;
@@ -63,6 +65,7 @@ public class PostgresStreamFetchTask implements FetchTask {
     public PostgresStreamFetchTask(StreamSplit streamSplit) {
         this.split = streamSplit;
+        this.changeEventSourceContext = new StoppableChangeEventSourceContext();
     }

     @Override
@@ -92,8 +95,7 @@ public void execute(Context context) throws Exception {
                         sourceFetchContext.getTaskContext(),
                         sourceFetchContext.getReplicationConnection(),
                         split);
-        StoppableChangeEventSourceContext changeEventSourceContext =
-                new StoppableChangeEventSourceContext();
+
         streamSplitReadTask.execute(
                 changeEventSourceContext,
                 sourceFetchContext.getPartition(),
@@ -162,6 +164,11 @@ public void commitCurrentOffset(@Nullable Offset offsetToCommit) {
         }
     }

+    @VisibleForTesting
+    StoppableChangeEventSourceContext getChangeEventSourceContext() {
+        return changeEventSourceContext;
+    }
+
     /** A {@link ChangeEventSource} implementation for Postgres to read streaming changes. */
     public static class StreamSplitReadTask extends PostgresStreamingChangeEventSource {
         private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
new file mode 100644
index 00000000000..0243676df52
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.fetch;
+
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.assigner.StreamSplitAssigner;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
+import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
+
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link IncrementalSourceStreamFetcher}. */
+public class IncrementalSourceStreamFetcherTest extends PostgresTestBase {
+
+    private static final String schemaName = "customer";
+    private static final String tableName = "Customers";
+
+    private final UniqueDatabase customDatabase =
+            new UniqueDatabase(
+                    POSTGRES_CONTAINER,
+                    "postgres",
+                    "customer",
+                    POSTGRES_CONTAINER.getUsername(),
+                    POSTGRES_CONTAINER.getPassword());
+
+    @Test
+    void testReadBinlogWithException() throws Exception {
+        customDatabase.createAndInitialize();
+        PostgresSourceConfigFactory sourceConfigFactory =
+                getMockPostgresSourceConfigFactory(customDatabase, schemaName, tableName, 10, true);
+        sourceConfigFactory.startupOptions(StartupOptions.latest());
+        PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        PostgresDialect dialect = new PostgresDialect(sourceConfigFactory.create(0));
+
+        // Create reader and submit splits
+        PostgresSourceFetchTaskContext taskContext =
+                new PostgresSourceFetchTaskContext(sourceConfig, dialect);
+        IncrementalSourceStreamFetcher fetcher = new IncrementalSourceStreamFetcher(taskContext, 0);
+        StreamSplit split = createStreamSplit(sourceConfig, dialect);
+        PostgresStreamFetchTask fetchTask =
+                (PostgresStreamFetchTask) dialect.createFetchTask(split);
+        StoppableChangeEventSourceContext changeEventSourceContext =
+                fetchTask.getChangeEventSourceContext();
+
+        Future future = fetcher.submitTask(fetchTask);
+        // Mock an exception occurring during stream split reading by setting the error handler
+        // and stopping the change event source to test exception handling
+        taskContext
+                .getErrorHandler()
+                .setProducerThrowable(new RuntimeException("Test read with exception"));
+        changeEventSourceContext.stopChangeEventSource();
+
+        // Wait for the task to complete
+        future.get();
+
+        assertThatThrownBy(
+                        () -> pollRecordsFromReader(fetcher, SourceRecordUtils::isDataChangeRecord))
+                .rootCause()
+                .isExactlyInstanceOf(RuntimeException.class)
+                .hasMessage("Test read with exception");
+        fetcher.close();
+    }
+
+    private StreamSplit createStreamSplit(
+            PostgresSourceConfig sourceConfig, PostgresDialect dialect) throws Exception {
+        StreamSplitAssigner streamSplitAssigner =
+                new StreamSplitAssigner(
+                        sourceConfig,
+                        dialect,
+                        new PostgresOffsetFactory(),
+                        new MockSplitEnumeratorContext<>(1));
+        streamSplitAssigner.open();
+
+        Map tableSchemas =
+                dialect.discoverDataCollectionSchemas(sourceConfig);
+        return StreamSplit.fillTableSchemas(streamSplitAssigner.createStreamSplit(), tableSchemas);
+    }
+
+    private List pollRecordsFromReader(
+            IncrementalSourceStreamFetcher fetcher, Predicate filter) {
+        List records = new ArrayList<>();
+        Iterator recordIterator;
+        try {
+            recordIterator = fetcher.pollSplitRecords();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Polling action was interrupted", e);
+        }
+        if (recordIterator == null) {
+            return records;
+        }
+        while (recordIterator.hasNext()) {
+            Iterator iterator = recordIterator.next().iterator();
+            while (iterator.hasNext()) {
+                SourceRecord record = iterator.next();
+                if (filter.test(record)) {
+                    records.add(record);
+                }
+            }
+        }
+        LOG.debug("Records polled: {}", records);
+        return records;
+    }
+}
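
How the pieces above fit together: submitTask/submitSplit now return the Future of the background read task instead of being fire-and-forget, the finally block that eagerly stopped the task is removed, and pollSplitRecords stops the task itself once it consumes the END watermark event. A minimal usage sketch follows; it is illustrative only — the generic type arguments (which the diff above renders without parameters) are assumptions, and the fetcher/fetchTask setup is elided, as in the tests above.

    // Submit the fetch task; the returned Future tracks the background read thread.
    Future<?> readTaskFuture = fetcher.submitTask(fetchTask);

    // A test can now deterministically wait for the read thread to exit (for example
    // after injecting an error and stopping the change event source), so any recorded
    // read exception is visible before records are polled.
    readTaskFuture.get();

    // pollSplitRecords() rethrows a recorded read exception; otherwise it emits records
    // until it consumes the END watermark event, at which point it stops the read task.
    Iterator<SourceRecords> records = fetcher.pollSplitRecords();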