Skip to content

Commit aad2f7b

Browse files
authored
GH-10090: Add AmqpClientInboundGateway (#10447)
* GH-10090: Add `AmqpClientInboundGateway` Related to: #10090 * Add `AmqpClientInboundGateway` that is mostly a copy/paste of the `AmqpClientMessageProducer`, but adds a reply-producing logic * Cover with tests and document this new component * Fix a couple typos in the `amqp-1.0.adoc` * Fix docs typos * Extract the `IntegrationRabbitAmqpMessageListener` as a top-level (package private) class. Serves as a reusable unit of work for both `AmqpClientMessageProducer` & `AmqpClientInboundGateway`. The one-way and request-reply parts are handled as a `BiConsumer` action injection into this `IntegrationRabbitAmqpMessageListener` instance. Now both `AmqpClientMessageProducer` & `AmqpClientInboundGateway` are much simpler. * Improve `AmqpClientMessageProducer.processRequest` Javadoc mentioning that `requestMessage` param is out of use * Be more specific with the type for `UnsupportedOperationException` in the `IntegrationRabbitAmqpMessageListener`
1 parent 0a26563 commit aad2f7b

File tree

5 files changed

+669
-107
lines changed

5 files changed

+669
-107
lines changed
Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.inbound;
18+
19+
import java.time.Duration;
20+
import java.util.Arrays;
21+
import java.util.Collection;
22+
23+
import com.rabbitmq.client.amqp.Resource;
24+
import org.aopalliance.aop.Advice;
25+
import org.jspecify.annotations.Nullable;
26+
27+
import org.springframework.amqp.core.Address;
28+
import org.springframework.amqp.core.MessagePostProcessor;
29+
import org.springframework.amqp.core.MessageProperties;
30+
import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor;
31+
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
32+
import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate;
33+
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer;
34+
import org.springframework.amqp.support.converter.MessageConverter;
35+
import org.springframework.amqp.support.converter.SimpleMessageConverter;
36+
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
37+
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
38+
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
39+
import org.springframework.integration.core.Pausable;
40+
import org.springframework.integration.gateway.MessagingGatewaySupport;
41+
import org.springframework.messaging.Message;
42+
import org.springframework.scheduling.TaskScheduler;
43+
import org.springframework.util.Assert;
44+
import org.springframework.util.StringUtils;
45+
46+
/**
47+
* A {@link MessagingGatewaySupport} implementation for AMQP 1.0 client.
48+
* <p>
49+
* Based on the {@link RabbitAmqpListenerContainer} and requires an {@link AmqpConnectionFactory}.
50+
* An internal {@link RabbitAmqpTemplate} is used to send replies.
51+
*
52+
* @author Artem Bilan
53+
*
54+
* @since 7.0
55+
*
56+
* @see RabbitAmqpListenerContainer
57+
* @see RabbitAmqpTemplate
58+
* @see org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListenerAdapter
59+
*/
60+
public class AmqpClientInboundGateway extends MessagingGatewaySupport implements Pausable {
61+
62+
private final RabbitAmqpListenerContainer listenerContainer;
63+
64+
private final RabbitAmqpTemplate replyTemplate;
65+
66+
private @Nullable MessageConverter messageConverter = new SimpleMessageConverter();
67+
68+
private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.inboundMapper();
69+
70+
private @Nullable Collection<MessagePostProcessor> afterReceivePostProcessors;
71+
72+
private @Nullable ReplyPostProcessor replyPostProcessor;
73+
74+
private volatile boolean paused;
75+
76+
public AmqpClientInboundGateway(AmqpConnectionFactory connectionFactory, String... queueNames) {
77+
this.listenerContainer = new RabbitAmqpListenerContainer(connectionFactory);
78+
this.listenerContainer.setQueueNames(queueNames);
79+
this.replyTemplate = new RabbitAmqpTemplate(connectionFactory);
80+
}
81+
82+
public void setInitialCredits(int initialCredits) {
83+
this.listenerContainer.setInitialCredits(initialCredits);
84+
}
85+
86+
public void setPriority(int priority) {
87+
this.listenerContainer.setPriority(priority);
88+
}
89+
90+
public void setStateListeners(Resource.StateListener... stateListeners) {
91+
this.listenerContainer.setStateListeners(stateListeners);
92+
}
93+
94+
public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) {
95+
this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(afterReceivePostProcessors));
96+
}
97+
98+
@Override
99+
public void setTaskScheduler(TaskScheduler taskScheduler) {
100+
this.listenerContainer.setTaskScheduler(taskScheduler);
101+
}
102+
103+
public void setAdviceChain(Advice... advices) {
104+
this.listenerContainer.setAdviceChain(advices);
105+
}
106+
107+
public void setAutoSettle(boolean autoSettle) {
108+
this.listenerContainer.setAutoSettle(autoSettle);
109+
}
110+
111+
public void setDefaultRequeue(boolean defaultRequeue) {
112+
this.listenerContainer.setDefaultRequeue(defaultRequeue);
113+
}
114+
115+
public void setGracefulShutdownPeriod(Duration gracefulShutdownPeriod) {
116+
this.listenerContainer.setGracefulShutdownPeriod(gracefulShutdownPeriod);
117+
}
118+
119+
public void setConsumersPerQueue(int consumersPerQueue) {
120+
this.listenerContainer.setConsumersPerQueue(consumersPerQueue);
121+
}
122+
123+
/**
124+
* Set a {@link MessageConverter} to replace the default {@link SimpleMessageConverter}.
125+
* If set to null, an AMQP message is sent as is into a {@link Message} payload.
126+
* And a reply message has to return an AMQP message as its payload.
127+
* @param messageConverter the {@link MessageConverter} to use or null.
128+
*/
129+
public void setMessageConverter(@Nullable MessageConverter messageConverter) {
130+
this.messageConverter = messageConverter;
131+
}
132+
133+
public void setHeaderMapper(AmqpHeaderMapper headerMapper) {
134+
this.headerMapper = headerMapper;
135+
}
136+
137+
public void setReplyPostProcessor(ReplyPostProcessor replyPostProcessor) {
138+
this.replyPostProcessor = replyPostProcessor;
139+
}
140+
141+
/**
142+
* Set a default {@code exchange} for sending replies
143+
* if {@code replyTo} address is not provided in the request message.
144+
* Mutually exclusive with {@link #setReplyQueue(String)}.
145+
* @param exchange the default exchange for sending replies
146+
*/
147+
public void setReplyExchange(String exchange) {
148+
this.replyTemplate.setExchange(exchange);
149+
}
150+
151+
/**
152+
* Set a default {@code routingKey} for sending replies
153+
* if {@code replyTo} address is not provided in the request message.
154+
* Used only if {@link #setReplyExchange(String)} is provided.
155+
* @param routingKey the default routing key for sending replies
156+
*/
157+
public void setReplyRoutingKey(String routingKey) {
158+
this.replyTemplate.setRoutingKey(routingKey);
159+
}
160+
161+
/**
162+
* Set a default {@code queue} for sending replies
163+
* if {@code replyTo} address is not provided in the request message.
164+
* Mutually exclusive with {@link #setReplyExchange(String)}.
165+
* @param queue the default queue for sending replies
166+
*/
167+
public void setReplyQueue(String queue) {
168+
this.replyTemplate.setQueue(queue);
169+
}
170+
171+
@Override
172+
public String getComponentType() {
173+
return "amqp:inbound-gateway";
174+
}
175+
176+
@Override
177+
protected void onInit() {
178+
super.onInit();
179+
this.listenerContainer.setBeanName(getComponentName() + ".listenerContainer");
180+
IntegrationRabbitAmqpMessageListener messageListener =
181+
new IntegrationRabbitAmqpMessageListener(this, this::processRequest, this.headerMapper,
182+
this.messageConverter, this.afterReceivePostProcessors);
183+
this.listenerContainer.setupMessageListener(messageListener);
184+
this.listenerContainer.afterPropertiesSet();
185+
}
186+
187+
@Override
188+
protected void doStart() {
189+
super.doStart();
190+
this.listenerContainer.start();
191+
}
192+
193+
@Override
194+
protected void doStop() {
195+
super.doStop();
196+
this.listenerContainer.stop();
197+
}
198+
199+
@Override
200+
public void destroy() {
201+
super.destroy();
202+
this.listenerContainer.destroy();
203+
this.replyTemplate.destroy();
204+
}
205+
206+
@Override
207+
public void pause() {
208+
this.listenerContainer.pause();
209+
this.paused = true;
210+
}
211+
212+
@Override
213+
public void resume() {
214+
this.listenerContainer.resume();
215+
this.paused = false;
216+
}
217+
218+
@Override
219+
public boolean isPaused() {
220+
return this.paused;
221+
}
222+
223+
/**
224+
* Use as {@link java.util.function.BiConsumer} for the {@link IntegrationRabbitAmqpMessageListener}.
225+
* @param messageToSend the message to produce from this endpoint.
226+
* @param requestMessage the request AMQP message.
227+
*/
228+
private void processRequest(Message<?> messageToSend, org.springframework.amqp.core.Message requestMessage) {
229+
Message<?> receivedMessage = sendAndReceiveMessage(messageToSend);
230+
if (receivedMessage != null) {
231+
org.springframework.amqp.core.Message replyMessage = fromSpringMessage(receivedMessage, requestMessage);
232+
publishReply(requestMessage, replyMessage);
233+
}
234+
else {
235+
this.logger.warn(() -> "No reply received for message: " + requestMessage);
236+
}
237+
}
238+
239+
private org.springframework.amqp.core.Message fromSpringMessage(Message<?> receivedMessage,
240+
org.springframework.amqp.core.Message requestMessage) {
241+
242+
org.springframework.amqp.core.Message replyMessage;
243+
MessageProperties messageProperties = new MessageProperties();
244+
Object payload = receivedMessage.getPayload();
245+
if (payload instanceof org.springframework.amqp.core.Message amqpMessage) {
246+
replyMessage = amqpMessage;
247+
}
248+
else {
249+
Assert.state(this.messageConverter != null,
250+
"If reply payload is not an 'org.springframework.amqp.core.Message', " +
251+
"the 'messageConverter' must be provided.");
252+
253+
replyMessage = this.messageConverter.toMessage(payload, messageProperties);
254+
this.headerMapper.fromHeadersToReply(receivedMessage.getHeaders(),
255+
messageProperties);
256+
}
257+
258+
postProcessResponse(requestMessage, replyMessage);
259+
if (this.replyPostProcessor != null) {
260+
replyMessage = this.replyPostProcessor.apply(requestMessage, replyMessage);
261+
}
262+
263+
return replyMessage;
264+
}
265+
266+
private void publishReply(org.springframework.amqp.core.Message requestMessage,
267+
org.springframework.amqp.core.Message replyMessage) {
268+
269+
Address replyTo = requestMessage.getMessageProperties().getReplyToAddress();
270+
if (replyTo != null) {
271+
String exchangeName = replyTo.getExchangeName();
272+
String routingKey = replyTo.getRoutingKey();
273+
if (StringUtils.hasText(exchangeName)) {
274+
this.replyTemplate.send(exchangeName, routingKey, replyMessage).join();
275+
}
276+
else {
277+
Assert.hasText(routingKey, "A 'replyTo' property must be provided in the requestMessage.");
278+
String queue = routingKey.replaceFirst("queues/", "");
279+
this.replyTemplate.send(queue, replyMessage).join();
280+
}
281+
}
282+
else {
283+
this.replyTemplate.send(replyMessage).join();
284+
}
285+
}
286+
287+
/**
288+
* Post-process the given response message before it will be sent.
289+
* The default implementation sets the response's correlation id to the request message's correlation id, if any;
290+
* otherwise to the request message id.
291+
* @param request the original incoming Rabbit message
292+
* @param response the outgoing Rabbit message about to be sent
293+
*/
294+
private static void postProcessResponse(org.springframework.amqp.core.Message request,
295+
org.springframework.amqp.core.Message response) {
296+
297+
String correlation = request.getMessageProperties().getCorrelationId();
298+
299+
if (correlation == null) {
300+
String messageId = request.getMessageProperties().getMessageId();
301+
if (messageId != null) {
302+
correlation = messageId;
303+
}
304+
}
305+
response.getMessageProperties().setCorrelationId(correlation);
306+
}
307+
308+
}

0 commit comments

Comments
 (0)