loserwang1024 commented Aug 19, 2025

As described in https://issues.apache.org/jira/browse/FLINK-38265, while I was reading the Postgres CDC WAL log, the stream suddenly stopped producing data. The log shows that the stream split was marked as finished after a connection exception occurred:

2025-08-16 08:15:18,939 ERROR io.debezium.pipeline.ErrorHandler [] - Producer failure
org.postgresql.util.PSQLException: FATAL: terminating connection due to administrator command
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:502) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:419) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:137) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:709) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.probeConnectionIfNeeded(PostgresStreamingChangeEventSource.java:416) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:353) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:212) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(PostgresStreamFetchTask.java:216) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:97) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_372]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_372]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_372]
    at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]
2025-08-16 08:15:18,953 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
2025-08-16 08:15:18,963 ERROR io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Unexpected error while closing Postgres connection
org.postgresql.util.PSQLException: Unable to close connection properly
    at org.postgresql.jdbc.PgConnection.close(PgConnection.java:870) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at io.debezium.jdbc.JdbcConnection.lambda$doClose$4(JdbcConnection.java:961) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_372]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_372]
    at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]
Caused by: java.net.SocketException: Broken pipe (Write failed)
    at java.net.SocketOutputStream.socketWrite0(Native Method) ~[?:1.8.0_372]
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) ~[?:1.8.0_372]
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155) ~[?:1.8.0_372]
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_372]
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_372]
    at org.postgresql.core.PGStream.flush(PGStream.java:724) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.postgresql.core.QueryExecutorCloseAction.close(QueryExecutorCloseAction.java:73) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.postgresql.jdbc.PgConnectionCleaningAction.onClean(PgConnectionCleaningAction.java:89) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.postgresql.util.LazyCleaner$Node.onClean(LazyCleaner.java:219) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.postgresql.util.LazyCleaner$Node.clean(LazyCleaner.java:210) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    at org.postgresql.jdbc.PgConnection.close(PgConnection.java:867) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
    ... 5 more


2025-08-16 08:15:35,931 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [stream-split]
2025-08-16 08:15:35,932 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [stream-split]

The reason is as follows:

  1. When PostgresStreamingChangeEventSource hits a connection exception, it puts the exception into the event queue rather than throwing it to the caller.
  2. IncrementalSourceStreamFetcher then sees the streamFetchTask as finished and stops it.
  3. When polling for data, the fetcher ignores whatever is still in the event queue, including the remaining data and the exception, and returns null.

Thus, we should not stop the task until the End Watermark is received.
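
The three steps above can be illustrated with a small, self-contained Java sketch. It is not the actual Flink CDC code: `StreamFetcherSketch`, `Event`, `pollBuggy`, and `pollFixed` are hypothetical names invented for illustration, and the real `IncrementalSourceStreamFetcher` and Debezium queue are considerably more involved. The sketch only shows why gating the poll on "task still running" silently drops queued data and the queued exception, while gating on the End Watermark does not.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of a stream fetcher; not the real Flink CDC classes.
class StreamFetcherSketch {

    /** Hypothetical queue element: a data record, a queued failure, or the end marker. */
    static final class Event {
        final String payload;       // non-null for data records
        final Throwable failure;    // non-null when the producer queued an error
        final boolean endWatermark; // true for the end marker

        Event(String payload, Throwable failure, boolean endWatermark) {
            this.payload = payload;
            this.failure = failure;
            this.endWatermark = endWatermark;
        }

        static Event data(String payload) { return new Event(payload, null, false); }
        static Event failure(Throwable t) { return new Event(null, t, false); }
        static Event endWatermark() { return new Event(null, null, true); }
    }

    private final Queue<Event> queue = new ArrayDeque<>();
    private boolean taskRunning = true;
    private boolean reachedEnd = false;

    void enqueue(Event e) { queue.add(e); }

    /** Models step 2: the fetcher decides the task is finished and stops it. */
    void markTaskStopped() { taskRunning = false; }

    /** Models step 3: once the task is stopped, the queue contents are ignored. */
    List<String> pollBuggy() {
        if (!taskRunning) {
            return null; // remaining data and the queued exception are lost
        }
        return drain();
    }

    /** Proposed behaviour: keep draining until the End Watermark has been seen. */
    List<String> pollFixed() {
        if (reachedEnd) {
            return null;
        }
        return drain();
    }

    private List<String> drain() {
        List<String> out = new ArrayList<>();
        Event e;
        while ((e = queue.poll()) != null) {
            if (e.failure != null) {
                // Surface the queued producer failure instead of swallowing it.
                // For brevity, records drained earlier in this call are discarded.
                throw new IllegalStateException("Producer failure", e.failure);
            }
            if (e.endWatermark) {
                reachedEnd = true;
                break;
            }
            out.add(e.payload);
        }
        return out;
    }

    public static void main(String[] args) {
        StreamFetcherSketch fetcher = new StreamFetcherSketch();
        fetcher.enqueue(Event.data("row-1"));
        fetcher.enqueue(Event.failure(new RuntimeException("terminating connection")));
        fetcher.markTaskStopped();

        System.out.println("buggy poll: " + fetcher.pollBuggy());
        try {
            fetcher.pollFixed();
        } catch (IllegalStateException ex) {
            System.out.println("fixed poll rethrows: " + ex.getCause().getMessage());
        }
    }
}
```

In this model the buggy poll returns null even though a record and an exception are still queued, which matches the "Finished reading from splits [stream-split]" symptom in the log. The fixed poll reaches the queued failure and rethrows it, so the job can fail over instead of going quiet.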
