Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st

statement.execute(
String.format(
"ALTER TABLE `%s`.`products` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;",
"ALTER TABLE `%s`.`products` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `WEIGHT`;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AddColumnEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.annotation.Nullable;

import java.util.Iterator;
import java.util.concurrent.Future;

/**
* Fetcher to fetch data of a table split, the split is either snapshot split {@link SnapshotSplit}
Expand All @@ -33,7 +34,7 @@
public interface Fetcher<T, Split> {

/** Add to task to fetch, this should call only when the reader is idle. */
void submitTask(FetchTask<Split> fetchTask);
Future<?> submitTask(FetchTask<Split> fetchTask);

/**
* Fetched records from data source. The method should return null when reaching the end of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -86,18 +88,21 @@ public IncrementalSourceScanFetcher(FetchTask.Context taskContext, int subtaskId
}

@Override
public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
public Future<?> submitTask(FetchTask<SourceSplitBase> fetchTask) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();

this.snapshotSplitReadTask = fetchTask;
this.currentSnapshotSplit = fetchTask.getSplit().asSnapshotSplit();
taskContext.configure(currentSnapshotSplit);
this.queue = taskContext.getQueue();
this.hasNextElement.set(true);
this.reachEnd.set(false);

executorService.execute(
return executorService.submit(
() -> {
try {
snapshotSplitReadTask.execute(taskContext);
completableFuture.complete(null);
} catch (Exception e) {
setReadException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isEndWatermarkEvent;

/** Fetcher to fetch data from table split, the split is the stream split {@link StreamSplit}. */
public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceStreamFetcher.class);
Expand Down Expand Up @@ -79,13 +82,13 @@ public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int subTask
}

@Override
public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
public Future<?> submitTask(FetchTask<SourceSplitBase> fetchTask) {
this.streamFetchTask = fetchTask;
this.currentStreamSplit = fetchTask.getSplit().asStreamSplit();
configureFilter();
taskContext.configure(currentStreamSplit);
this.queue = taskContext.getQueue();
executorService.submit(
return executorService.submit(
() -> {
try {
streamFetchTask.execute(taskContext);
Expand All @@ -96,12 +99,6 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
currentStreamSplit),
e);
readException = e;
} finally {
try {
stopReadTask();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
}
Expand All @@ -116,10 +113,19 @@ public boolean isFinished() {
public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
checkReadException();
final List<SourceRecord> sourceRecords = new ArrayList<>();
// what happens if currentTaskRunning
if (currentTaskRunning) {
List<DataChangeEvent> batch = queue.poll();
for (DataChangeEvent event : batch) {
if (shouldEmit(event.getRecord())) {
if (isEndWatermarkEvent(event.getRecord())) {
LOG.info("Read split {} end watermark event", currentStreamSplit);
try {
stopReadTask();
} catch (Exception e) {
throw new RuntimeException(e);
}
break;
} else if (shouldEmit(event.getRecord())) {
sourceRecords.add(event.getRecord());
} else {
LOG.debug("{} data change event should not emit", event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient;
import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isEndWatermarkEvent;

/**
* A Debezium binlog reader implementation that also support reads binlog and filter overlapping
Expand Down Expand Up @@ -114,7 +116,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId)
this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill();
}

public void submitSplit(MySqlSplit mySqlSplit) {
public Future<?> submitSplit(MySqlSplit mySqlSplit) {
this.currentBinlogSplit = mySqlSplit.asBinlogSplit();
configureFilter();
statefulTaskContext.configure(currentBinlogSplit);
Expand All @@ -134,7 +136,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
currentBinlogSplit,
createEventFilter());

executorService.submit(
return executorService.submit(
() -> {
try {
binlogSplitReadTask.execute(
Expand All @@ -148,8 +150,6 @@ public void submitSplit(MySqlSplit mySqlSplit) {
currentBinlogSplit),
t);
readException = t;
} finally {
stopBinlogReadTask();
}
});
}
Expand All @@ -167,6 +167,16 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
if (currentTaskRunning) {
List<DataChangeEvent> batch = queue.poll();
for (DataChangeEvent event : batch) {
if (isEndWatermarkEvent(event.getRecord())) {
LOG.info("Read split {} end watermark event", currentBinlogSplit);
try {
stopBinlogReadTask();
} catch (Exception e) {
throw new RuntimeException(e);
}
break;
}

if (isParsingOnLineSchemaChanges) {
Optional<SourceRecord> oscRecord =
parseOnLineSchemaChangeEvent(event.getRecord());
Expand Down Expand Up @@ -398,4 +408,9 @@ public ExecutorService getExecutorService() {
MySqlBinlogSplitReadTask getBinlogSplitReadTask() {
return binlogSplitReadTask;
}

@VisibleForTesting
public StoppableChangeEventSourceContext getChangeEventSourceContext() {
return changeEventSourceContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import javax.annotation.Nullable;

import java.util.Iterator;
import java.util.concurrent.Future;

/** Reader to read split of table, the split is either snapshot split or binlog split. */
public interface DebeziumReader<T, Split> {
Expand All @@ -32,7 +33,7 @@ public interface DebeziumReader<T, Split> {
*
* @param splitToRead
*/
void submitSplit(Split splitToRead);
Future<?> submitSplit(Split splitToRead);

/** Close the reader and releases all resources. */
void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -129,7 +130,7 @@ public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskI
}

@Override
public void submitSplit(MySqlSplit mySqlSplit) {
public Future<?> submitSplit(MySqlSplit mySqlSplit) {
this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
statefulTaskContext.configure(currentSnapshotSplit);
this.queue = statefulTaskContext.getQueue();
Expand All @@ -150,7 +151,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
currentSnapshotSplit,
hooks,
statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill());
executorService.execute(
return executorService.submit(
() -> {
try {
currentTaskRunning = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -90,6 +91,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader}. */
class BinlogSplitReaderTest extends MySqlSourceTestBase {
Expand Down Expand Up @@ -1119,6 +1121,37 @@ public void testBinlogOffsetCompareWithSnapshotAndBinlogPhase() throws Exception
Assertions.assertThat(sourceRecords).isEmpty();
}

@Test
void testReadBinlogWithException() throws Exception {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig =
getConfig(StartupOptions.latest(), new String[] {"customers"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);

// Create reader and submit splits
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
BinlogSplitReader reader = new BinlogSplitReader(statefulTaskContext, 0);

// Mock an exception occurring during stream split reading by setting the error handler
// and stopping the change event source to test exception handling
Future<?> future = reader.submitSplit(split);
statefulTaskContext
.getErrorHandler()
.setProducerThrowable(new RuntimeException("Test read with exception"));
reader.getChangeEventSourceContext().stopChangeEventSource();
// wait until executor is finished.
future.get();

assertThatThrownBy(() -> pollRecordsFromReader(reader, RecordUtils::isDataChangeRecord))
.rootCause()
.isExactlyInstanceOf(RuntimeException.class)
.hasMessage("Test read with exception");
reader.close();
}

private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) {
return createBinlogReader(sourceConfig, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.postgres.source.fetch;

import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
private static final Logger LOG = LoggerFactory.getLogger(PostgresStreamFetchTask.class);

private final StreamSplit split;
private final StoppableChangeEventSourceContext changeEventSourceContext;
private volatile boolean taskRunning = false;
private volatile boolean stopped = false;

Expand All @@ -63,6 +65,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {

public PostgresStreamFetchTask(StreamSplit streamSplit) {
this.split = streamSplit;
this.changeEventSourceContext = new StoppableChangeEventSourceContext();
}

@Override
Expand Down Expand Up @@ -92,8 +95,7 @@ public void execute(Context context) throws Exception {
sourceFetchContext.getTaskContext(),
sourceFetchContext.getReplicationConnection(),
split);
StoppableChangeEventSourceContext changeEventSourceContext =
new StoppableChangeEventSourceContext();

streamSplitReadTask.execute(
changeEventSourceContext,
sourceFetchContext.getPartition(),
Expand Down Expand Up @@ -162,6 +164,11 @@ public void commitCurrentOffset(@Nullable Offset offsetToCommit) {
}
}

@VisibleForTesting
StoppableChangeEventSourceContext getChangeEventSourceContext() {
return changeEventSourceContext;
}

/** A {@link ChangeEventSource} implementation for Postgres to read streaming changes. */
public static class StreamSplitReadTask extends PostgresStreamingChangeEventSource {
private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
Expand Down
Loading