Streaming support for JdbcOutboundGateway #10061

Closed
wants to merge 1 commit into from

mjd507
Contributor

@mjd507 mjd507 commented May 31, 2025

Fixes: #3963

Introduce a queryForStream flag and a streamConsumer function in JdbcOutboundGateway, along with the underlying method JdbcPollingChannelAdapter.doPollForStream.

When producing reply messages in stream mode, the gateway always receives an empty list; the row-processing logic is moved to streamConsumer.

Signed-off-by: Jiandong Ma <jiandong.ma.cn@gmail.com>
Contributor Author

@mjd507 mjd507 left a comment

Regarding transactions, I think there is no impact?

@@ -53,6 +56,10 @@ public class JdbcOutboundGateway extends AbstractReplyProducingMessageHandler {

private Integer maxRows;

private boolean queryForStream = false;

private Consumer<Object> streamConsumer;
Contributor Author

Here the Consumer is typed as Consumer<Object>, so users have to cast each element. I am not satisfied with this, because they have already declared the return type when setting the RowMapper.
I don't have a good alternative. I tried a ? wildcard instead, but it looks like it cannot be applied to Stream<?>.
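
One possible way around the cast, sketched here only as an illustration and not as part of the PR, is to carry the row type as a type parameter so that the mapper and the consumer agree on `T`. Everything in this block is hypothetical: `TypedStreamingHandler` is an invented stand-in for the gateway, `Function<String, T>` stands in for `RowMapper<T>`, and a `Stream<String>` stands in for the raw query result. It uses only the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

public class TypedStreamSketch {

    // Hypothetical stand-in for a gateway that is generic in the mapped row type.
    static final class TypedStreamingHandler<T> {

        // Stand-in for RowMapper<T>: turns a raw row into a T.
        private final Function<String, T> rowMapper;

        // Accepts T or any supertype of T, so no cast is needed at the call site.
        private final Consumer<? super T> streamConsumer;

        TypedStreamingHandler(Function<String, T> rowMapper, Consumer<? super T> streamConsumer) {
            this.rowMapper = rowMapper;
            this.streamConsumer = streamConsumer;
        }

        void handle(Stream<String> rawRows) {
            // Closing the mapped stream also closes the underlying raw stream.
            try (Stream<T> rows = rawRows.map(this.rowMapper)) {
                rows.forEach(this.streamConsumer);
            }
        }
    }

    static List<Integer> demo() {
        List<Integer> lengths = new ArrayList<>();
        TypedStreamingHandler<Integer> handler =
                new TypedStreamingHandler<>(String::length, lengths::add);
        handler.handle(Stream.of("a", "bb", "ccc"));
        return lengths;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off is that the handler class itself becomes generic, which may not fit an existing non-generic class such as the one in this diff.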

try (Stream<?> stream = this.poller.doPollForStream(sqlQueryParameterSource)) {
    stream.forEach(this.streamConsumer);
}
return Collections.emptyList();
Member

Thank you for the effort, but this is not correct.
This is not the messaging way, and it is not the goal of a Spring Integration channel adapter.
If we go the way of a callback instead of sending a message to the channel, then there is no reason to use channel adapters.
A simple JdbcTemplate with its streaming API would do exactly the same without introducing an extra messaging layer for nothing.

Having a Stream as the message payload produced by the channel adapter is totally fine.
However, all of those questions are still open: the Stream has to be closed, and it is not clear how to update the entries of this stream to satisfy concurrent polling from the table. Nor is it clear how transactions behave with these open (and possibly updated) streams.

In general, I suggest closing this PR, since we are not going the way you suggest.
Sorry about that and thank you again for your effort!
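
To make the "the Stream has to be closed" concern concrete, here is a minimal JDK-only sketch. It is an illustration under stated assumptions, not Spring Integration code: `openRows` is a hypothetical stand-in for a cursor-backed result stream, and the `AtomicBoolean` stands in for the underlying JDBC resources being released. It shows that a terminal operation such as `forEach` does not close a stream by itself, so whoever receives a `Stream` payload downstream would have to close it explicitly.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class StreamPayloadCloseSketch {

    // Hypothetical stand-in for a cursor-backed stream: the onClose hook
    // represents releasing the statement and connection behind it.
    static Stream<String> openRows(AtomicBoolean released) {
        return Stream.of("row-1", "row-2", "row-3").onClose(() -> released.set(true));
    }

    // Consumes every element but never calls close().
    static boolean consumeWithoutClose() {
        AtomicBoolean released = new AtomicBoolean(false);
        Stream<String> payload = openRows(released);
        payload.forEach(row -> { });
        return released.get(); // the close hook has not run
    }

    // Consumes inside try-with-resources, which calls close() on exit.
    static boolean consumeAndClose() {
        AtomicBoolean released = new AtomicBoolean(false);
        try (Stream<String> payload = openRows(released)) {
            payload.forEach(row -> { });
        }
        return released.get(); // the close hook has run
    }

    public static void main(String[] args) {
        System.out.println("released without close: " + consumeWithoutClose());
        System.out.println("released with close: " + consumeAndClose());
    }
}
```

With a callback such as the proposed streamConsumer, the gateway can own the try-with-resources block. With a Stream payload sent to a channel, that responsibility moves to the downstream consumer, which is one of the open questions raised in the review.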

Contributor Author

Ohh, that makes sense. Thank you for making me think in the messaging way; I was wrong from the beginning.

@mjd507 mjd507 closed this Jun 2, 2025
@mjd507 mjd507 deleted the GH-3963 branch June 2, 2025 16:07
Development

Successfully merging this pull request may close these issues.

JdbcPollingChannelAdapter Streaming support
2 participants