Skip to content

Commit 56c7d57

Browse files
artembilancppwfs
authored andcommitted
GH-3189: Properly handle async in ScatterGatherHandler (#10469)
* GH-3189: Properly handle `async` in `ScatterGatherHandler` Fixes: #3189 Despite supporting `async = true`, the `ScatterGatherHandler` does blocking in its `handleRequestMessage()` on the `gatherResultChannel.receive()` call. * Fix `ScatterGatherHandler` to handle an `async` mode via internal `Mono` for the reply object * Use pattern variable expressions for `ifs` in the `ScatterGatherHandler.doInit()` for the better readability * Extract `ScatterGatherHandler.replyFromGatherResult()` method to avoid code duplication * Document a new (fixed) functionality * Mention `Mono` reply in the `ScatterGatherHandler` Javadocs **Auto-cherry-pick to `6.5.x`**
1 parent ef69d25 commit 56c7d57

File tree

3 files changed

+64
-20
lines changed

3 files changed

+64
-20
lines changed

spring-integration-core/src/main/java/org/springframework/integration/scattergather/ScatterGatherHandler.java

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616

1717
package org.springframework.integration.scattergather;
1818

19+
import java.time.Duration;
20+
21+
import reactor.core.publisher.Mono;
22+
import reactor.core.publisher.Sinks;
23+
1924
import org.springframework.aop.support.AopUtils;
2025
import org.springframework.beans.factory.BeanFactory;
2126
import org.springframework.beans.factory.BeanInitializationException;
@@ -30,6 +35,7 @@
3035
import org.springframework.integration.endpoint.PollingConsumer;
3136
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
3237
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
38+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3339
import org.springframework.integration.support.management.ManageableLifecycle;
3440
import org.springframework.messaging.Message;
3541
import org.springframework.messaging.MessageChannel;
@@ -46,6 +52,9 @@
4652
/**
4753
* The {@link MessageHandler} implementation for the
4854
* <a href="https://www.enterpriseintegrationpatterns.com/BroadcastAggregate.html">Scatter-Gather</a> EIP pattern.
55+
* <p>
56+
* When {@link #setAsync(boolean)} is {@code true}, the {@link ScatterGatherHandler} produces
57+
* a {@link Mono} as a reply based on the gather result.
4958
*
5059
* @author Artem Bilan
5160
* @author Abdul Zaheer
@@ -142,11 +151,11 @@ public Message<?> preSend(Message<?> message, MessageChannel channel) {
142151
}
143152

144153
});
145-
if (this.gatherChannel instanceof SubscribableChannel) {
146-
this.gatherEndpoint = new EventDrivenConsumer((SubscribableChannel) this.gatherChannel, this.gatherer);
154+
if (this.gatherChannel instanceof SubscribableChannel subscribableChannel) {
155+
this.gatherEndpoint = new EventDrivenConsumer(subscribableChannel, this.gatherer);
147156
}
148-
else if (this.gatherChannel instanceof PollableChannel) {
149-
this.gatherEndpoint = new PollingConsumer((PollableChannel) this.gatherChannel, this.gatherer);
157+
else if (this.gatherChannel instanceof PollableChannel pollableChannel) {
158+
this.gatherEndpoint = new PollingConsumer(pollableChannel, this.gatherer);
150159
((PollingConsumer) this.gatherEndpoint).setReceiveTimeout(this.gatherTimeout);
151160
}
152161
else if (this.gatherChannel instanceof ReactiveStreamsSubscribableChannel) {
@@ -187,7 +196,18 @@ private Message<?> enhanceScatterReplyMessage(Message<?> message) {
187196
@Override
188197
protected Object handleRequestMessage(Message<?> requestMessage) {
189198
MessageHeaders requestMessageHeaders = requestMessage.getHeaders();
190-
PollableChannel gatherResultChannel = new QueueChannel();
199+
boolean async = isAsync();
200+
MessageChannel gatherResultChannel;
201+
Sinks.One<Message<?>> replyMono;
202+
203+
if (async) {
204+
replyMono = Sinks.one();
205+
gatherResultChannel = (message, timeout) -> replyMono.tryEmitValue(message).isSuccess();
206+
}
207+
else {
208+
replyMono = null;
209+
gatherResultChannel = new QueueChannel();
210+
}
191211

192212
Message<?> scatterMessage =
193213
getMessageBuilderFactory()
@@ -200,17 +220,28 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
200220

201221
this.messagingTemplate.send(this.scatterChannel, scatterMessage);
202222

203-
Message<?> gatherResult = gatherResultChannel.receive(this.gatherTimeout);
204-
if (gatherResult != null) {
205-
return getMessageBuilderFactory()
206-
.fromMessage(gatherResult)
207-
.removeHeaders(GATHER_RESULT_CHANNEL, ORIGINAL_ERROR_CHANNEL,
208-
MessageHeaders.REPLY_CHANNEL, MessageHeaders.ERROR_CHANNEL);
223+
if (replyMono != null) {
224+
return replyMono.asMono()
225+
.map(this::replyFromGatherResult)
226+
.timeout(Duration.ofMillis(this.gatherTimeout), Mono.empty());
227+
}
228+
else {
229+
Message<?> gatherResult = ((PollableChannel) gatherResultChannel).receive(this.gatherTimeout);
230+
if (gatherResult != null) {
231+
return replyFromGatherResult(gatherResult);
232+
}
209233
}
210234

211235
return null;
212236
}
213237

238+
private AbstractIntegrationMessageBuilder<?> replyFromGatherResult(Message<?> gatherResult) {
239+
return getMessageBuilderFactory()
240+
.fromMessage(gatherResult)
241+
.removeHeaders(GATHER_RESULT_CHANNEL, ORIGINAL_ERROR_CHANNEL,
242+
MessageHeaders.REPLY_CHANNEL, MessageHeaders.ERROR_CHANNEL);
243+
}
244+
214245
@Override
215246
public void start() {
216247
if (this.gatherEndpoint != null) {
@@ -236,8 +267,8 @@ private static void checkClass(Class<?> gathererClass, String className, String
236267
Assert.isAssignable(clazz, gathererClass,
237268
() -> "the '" + type + "' must be an " + className + " " + "instance");
238269
}
239-
catch (ClassNotFoundException e) {
240-
throw new IllegalStateException("The class for '" + className + "' cannot be loaded", e);
270+
catch (ClassNotFoundException ex) {
271+
throw new IllegalStateException("The class for '" + className + "' cannot be loaded", ex);
241272
}
242273
}
243274

spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.dsl.routers;
1818

19+
import java.util.ArrayList;
1920
import java.util.Arrays;
2021
import java.util.List;
2122
import java.util.Map;
@@ -463,15 +464,21 @@ public void testRouterAsNonLastComponent() {
463464
@Test
464465
public void testScatterGather() {
465466
QueueChannel replyChannel = new QueueChannel();
466-
Message<String> request = MessageBuilder.withPayload("foo")
467+
Message<String> request = MessageBuilder.withPayload("test")
467468
.setReplyChannel(replyChannel)
468469
.build();
469470
this.scatterGatherFlowInput.send(request);
470471
Message<?> bestQuoteMessage = replyChannel.receive(10000);
471-
assertThat(bestQuoteMessage).isNotNull();
472-
Object payload = bestQuoteMessage.getPayload();
473-
assertThat(payload).isInstanceOf(List.class);
474-
assertThat(((List<?>) payload).size()).isGreaterThanOrEqualTo(1);
472+
assertThat(bestQuoteMessage)
473+
.extracting(Message::getPayload)
474+
.asInstanceOf(InstanceOfAssertFactories.LIST)
475+
.hasSizeGreaterThanOrEqualTo(1)
476+
.first()
477+
.asInstanceOf(InstanceOfAssertFactories.type(Message.class))
478+
.extracting(Message::getHeaders)
479+
.asInstanceOf(InstanceOfAssertFactories.MAP)
480+
.extractingByKey("gatherResultChannel")
481+
.isNotInstanceOf(PollableChannel.class);
475482
}
476483

477484
@Autowired
@@ -859,9 +866,11 @@ public IntegrationFlow scatterGatherFlow() {
859866
group.size() == 3 ||
860867
group.getMessages()
861868
.stream()
862-
.anyMatch(m -> (Double) m.getPayload() > 5)),
869+
.anyMatch(m -> (Double) m.getPayload() > 5))
870+
.outputProcessor(group -> new ArrayList<>(group.getMessages())),
863871
scatterGather -> scatterGather
864-
.gatherTimeout(10_000));
872+
.gatherTimeout(10_000)
873+
.async(true));
865874
}
866875

867876
@Bean

src/reference/antora/modules/ROOT/pages/scatter-gather.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ Mutually exclusive with `scatter-channel` attribute.
154154
<13> The `<aggregator>` options.
155155
Required.
156156

157+
NOTE: Starting with version `6.5.3`, when a `ScatterGatherHandler` is configured for the `async = true` option, the request message handling thread is not blocked anymore waiting for a gather result on an internal `((PollableChannel) gatherResultChannel).receive(this.gatherTimeout)` operation.
158+
Instead, a `reactor.core.publisher.Mono` is returned as a reply object based on a gather result eventually produced from the `gatherResultChannel`.
159+
Such a `Mono` is handled then according to the xref:reactive-streams.adoc#reactive-reply-payload[Reactive Streams support] in the framework.
160+
157161
[[scatter-gather-error-handling]]
158162
== Error Handling
159163

0 commit comments

Comments
 (0)