Skip to content

Commit 4296756

Browse files
committed
Add support for multi-method listeners
1 parent 5fa23bc commit 4296756

15 files changed

+782
-8
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/AbstractListenerAnnotationBeanPostProcessor.java

+43-4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.HashSet;
3636
import java.util.List;
3737
import java.util.Map;
38+
import java.util.Set;
3839
import java.util.concurrent.atomic.AtomicInteger;
3940
import java.util.stream.Collectors;
4041
import java.util.stream.Stream;
@@ -120,16 +121,33 @@ protected void detectAnnotationsAndRegisterEndpoints(Object bean, Class<?> targe
120121
Map<Method, A> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
121122
(MethodIntrospector.MetadataLookup<A>) method -> AnnotatedElementUtils.findMergedAnnotation(method,
122123
getAnnotationClass()));
123-
if (annotatedMethods.isEmpty()) {
124+
125+
A classListener = AnnotatedElementUtils.findMergedAnnotation(targetClass, getAnnotationClass());
126+
boolean hasMethodLevelListeners = !annotatedMethods.isEmpty();
127+
boolean hasClassLevelListeners = classListener != null;
128+
129+
if (!hasMethodLevelListeners && !hasClassLevelListeners) {
124130
this.nonAnnotatedClasses.add(targetClass);
125131
}
126-
annotatedMethods.entrySet().stream()
127-
.map(entry -> createAndConfigureEndpoint(bean, entry.getKey(), entry.getValue()))
128-
.forEach(this.endpointRegistrar::registerEndpoint);
132+
else {
133+
if (hasMethodLevelListeners) {
134+
annotatedMethods.entrySet().stream()
135+
.map(entry -> createAndConfigureEndpoint(bean, entry.getKey(), entry.getValue()))
136+
.forEach(this.endpointRegistrar::registerEndpoint);
137+
}
138+
139+
if (hasClassLevelListeners) {
140+
Set<Method> handlerMethods = getHandlerMethods(targetClass);
141+
createAndConfigureMultiMethodEndpoint(bean, targetClass, classListener,
142+
new ArrayList<>(handlerMethods));
143+
}
144+
}
129145
}
130146

131147
protected abstract Class<A> getAnnotationClass();
132148

149+
protected abstract Set<Method> getHandlerMethods(Class<?> targetClass);
150+
133151
private Endpoint createAndConfigureEndpoint(Object bean, Method method, A annotation) {
134152
Endpoint endpoint = createEndpoint(annotation);
135153
ConfigUtils.INSTANCE.acceptIfInstance(endpoint, HandlerMethodEndpoint.class, hme -> {
@@ -140,8 +158,29 @@ private Endpoint createAndConfigureEndpoint(Object bean, Method method, A annota
140158
return endpoint;
141159
}
142160

161+
private void createAndConfigureMultiMethodEndpoint(Object bean, Class<?> targetClass, A classListener,
162+
List<Method> handlerMethods) {
163+
Method defaultMethod = getDefaultHandlerMethod(targetClass, handlerMethods);
164+
165+
Endpoint endpoint = createMultiMethodEndpoint(classListener, handlerMethods, defaultMethod, bean);
166+
for (Method method : handlerMethods) {
167+
ConfigUtils.INSTANCE.acceptIfInstance(endpoint, HandlerMethodEndpoint.class, hme -> {
168+
hme.setBean(bean);
169+
hme.setMethod(method);
170+
hme.setHandlerMethodFactory(this.delegatingHandlerMethodFactory);
171+
});
172+
}
173+
174+
this.endpointRegistrar.registerEndpoint(endpoint);
175+
}
176+
177+
protected abstract Method getDefaultHandlerMethod(Class<?> targetClass, List<Method> handlerMethods);
178+
143179
protected abstract Endpoint createEndpoint(A sqsListenerAnnotation);
144180

181+
protected abstract Endpoint createMultiMethodEndpoint(A sqsListenerAnnotation, List<Method> methods,
182+
@Nullable Method defaultMethod, Object bean);
183+
145184
protected Collection<String> resolveEndpointNames(String[] endpointNames) {
146185
return Arrays.stream(endpointNames).map(this::resolveExpression)
147186
.flatMap(resolvedName -> resolveAsStrings(resolvedName).stream()).collect(Collectors.toList());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.annotation;
17+
18+
import java.lang.annotation.Documented;
19+
import java.lang.annotation.ElementType;
20+
import java.lang.annotation.Retention;
21+
import java.lang.annotation.RetentionPolicy;
22+
import java.lang.annotation.Target;
23+
import org.springframework.messaging.handler.annotation.MessageMapping;
24+
25+
/**
26+
* Annotation that masks a method to be the target of a SQS message. listener within a class that is annotated with
27+
* {@link SqsListener}.
28+
* <p>
29+
* The method selection depends on the payload type.
30+
* <p>
31+
* There must be exactly one method per payload type.
32+
*/
33+
@Target(ElementType.METHOD)
34+
@Retention(RetentionPolicy.RUNTIME)
35+
@Documented
36+
@MessageMapping
37+
public @interface SqsHandler {
38+
39+
/**
40+
* When true, designate this as the default fallback method if the payload type matches no other {@link SqsHandler}
41+
* method. Only one method can be designated as the default.
42+
*
43+
* @return true if this is the default method
44+
*/
45+
boolean isDefault() default false;
46+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
* @author Joao Calassio
7979
* @since 1.1
8080
*/
81-
@Target(ElementType.METHOD)
81+
@Target({ ElementType.METHOD, ElementType.TYPE })
8282
@Retention(RetentionPolicy.RUNTIME)
8383
@Documented
8484
@MessageMapping

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java

+40
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.fasterxml.jackson.databind.ObjectMapper;
1919
import io.awspring.cloud.sqs.config.Endpoint;
20+
import io.awspring.cloud.sqs.config.MultiMethodSqsEndpoint;
2021
import io.awspring.cloud.sqs.config.SqsBeanNames;
2122
import io.awspring.cloud.sqs.config.SqsEndpoint;
2223
import io.awspring.cloud.sqs.listener.SqsHeaders;
@@ -26,12 +27,18 @@
2627
import io.awspring.cloud.sqs.support.resolver.QueueAttributesMethodArgumentResolver;
2728
import io.awspring.cloud.sqs.support.resolver.SqsMessageMethodArgumentResolver;
2829
import io.awspring.cloud.sqs.support.resolver.VisibilityHandlerMethodArgumentResolver;
30+
import java.lang.reflect.Method;
2931
import java.util.ArrayList;
3032
import java.util.Arrays;
3133
import java.util.Collection;
3234
import java.util.List;
35+
import java.util.Set;
36+
import org.springframework.core.MethodIntrospector;
37+
import org.springframework.core.annotation.AnnotationUtils;
38+
import org.springframework.lang.Nullable;
3339
import org.springframework.messaging.converter.MessageConverter;
3440
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
41+
import org.springframework.util.ReflectionUtils;
3542

3643
/**
3744
* {@link AbstractListenerAnnotationBeanPostProcessor} implementation for {@link SqsListener @SqsListener}.
@@ -62,6 +69,22 @@ protected Endpoint createEndpoint(SqsListener sqsListenerAnnotation) {
6269
.acknowledgementMode(resolveAcknowledgement(sqsListenerAnnotation.acknowledgementMode())).build();
6370
}
6471

72+
@Override
73+
protected Endpoint createMultiMethodEndpoint(SqsListener sqsListenerAnnotation, List<Method> methods,
74+
@Nullable Method defaultMethod, Object bean) {
75+
return MultiMethodSqsEndpoint.builder()
76+
.factoryBeanName(resolveAsString(sqsListenerAnnotation.factory(), "factory"))
77+
.queueNames(resolveEndpointNames(sqsListenerAnnotation.value())).bean(bean).methods(methods)
78+
.defaultMethod(defaultMethod).id(getEndpointId(sqsListenerAnnotation.id()))
79+
.sqsEndpoint(createEndpoint(sqsListenerAnnotation)).build();
80+
}
81+
82+
@Override
83+
protected Set<Method> getHandlerMethods(Class<?> targetClass) {
84+
return MethodIntrospector.selectMethods(targetClass, (ReflectionUtils.MethodFilter) method -> AnnotationUtils
85+
.findAnnotation(method, SqsHandler.class) != null);
86+
}
87+
6588
@Override
6689
protected String getGeneratedIdPrefix() {
6790
return GENERATED_ID_PREFIX;
@@ -90,4 +113,21 @@ protected Collection<HandlerMethodArgumentResolver> createAdditionalArgumentReso
90113
return argumentResolvers;
91114
}
92115

116+
@Override
117+
protected Method getDefaultHandlerMethod(Class<?> targetClass, List<Method> handlerMethods) {
118+
Method defaultMethod = null;
119+
for (Method method : handlerMethods) {
120+
SqsHandler annotation = method.getAnnotation(SqsHandler.class);
121+
if (annotation.isDefault()) {
122+
if (defaultMethod != null) {
123+
throw new IllegalArgumentException(
124+
"There is more than one default method for the same listener in class: " + targetClass);
125+
}
126+
else {
127+
defaultMethod = method;
128+
}
129+
}
130+
}
131+
return defaultMethod;
132+
}
93133
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractEndpoint.java

+13
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,14 @@ public void setBean(Object bean) {
9090
this.bean = bean;
9191
}
9292

93+
/**
94+
* Get the bean instance to be used when handling a message for this endpoint.
95+
* @return the bean instance.
96+
*/
97+
public Object getBean() {
98+
return this.bean;
99+
}
100+
93101
/**
94102
* Set the method to be used when handling a message for this endpoint.
95103
* @param method the method.
@@ -109,6 +117,11 @@ public void setHandlerMethodFactory(MessageHandlerMethodFactory handlerMethodFac
109117
this.handlerMethodFactory = handlerMethodFactory;
110118
}
111119

120+
@Nullable
121+
public MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
122+
return this.handlerMethodFactory;
123+
}
124+
112125
@Override
113126
public void configureListenerMode(Consumer<ListenerMode> consumer) {
114127
List<MethodParameter> parameters = getMethodParameters();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright 2013-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.config;
17+
18+
import io.awspring.cloud.sqs.annotation.SqsHandler;
19+
import io.awspring.cloud.sqs.listener.MessageListener;
20+
import io.awspring.cloud.sqs.listener.MessageListenerContainer;
21+
import io.awspring.cloud.sqs.listener.adapter.DelegatingInvocableHandler;
22+
import io.awspring.cloud.sqs.listener.adapter.MessagingMessageListenerAdapter;
23+
import java.lang.reflect.Method;
24+
import java.util.ArrayList;
25+
import java.util.Collection;
26+
import java.util.List;
27+
import org.springframework.lang.Nullable;
28+
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
29+
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
30+
31+
/**
32+
* A {@link SqsEndpoint} extension for multiple methods using {@link SqsHandler}
33+
*
34+
* @see SqsHandler
35+
*/
36+
public class MultiMethodSqsEndpoint extends AbstractEndpoint {
37+
38+
private Endpoint endpoint;
39+
40+
private List<Method> methods;
41+
42+
private @Nullable Method defaultMethod;
43+
44+
protected MultiMethodSqsEndpoint(MultiMethodSqsEndpointBuilder builder) {
45+
super(builder.queueNames, builder.factoryName, builder.id);
46+
this.methods = builder.methods;
47+
this.defaultMethod = builder.defaultMethod;
48+
this.endpoint = builder.endpoint;
49+
this.setBean(builder.bean);
50+
}
51+
52+
public static MultiMethodSqsEndpointBuilder builder() {
53+
return new MultiMethodSqsEndpointBuilder();
54+
}
55+
56+
public Endpoint getEndpoint() {
57+
return endpoint;
58+
}
59+
60+
public static class MultiMethodSqsEndpointBuilder {
61+
62+
private List<Method> methods;
63+
64+
private @Nullable Method defaultMethod;
65+
66+
private Object bean;
67+
68+
private String id;
69+
70+
private Collection<String> queueNames;
71+
72+
private String factoryName;
73+
74+
private Endpoint endpoint;
75+
76+
public MultiMethodSqsEndpointBuilder methods(List<Method> methods) {
77+
this.methods = methods;
78+
return this;
79+
}
80+
81+
public MultiMethodSqsEndpointBuilder defaultMethod(@Nullable Method defaultMethod) {
82+
this.defaultMethod = defaultMethod;
83+
return this;
84+
}
85+
86+
public MultiMethodSqsEndpointBuilder bean(Object bean) {
87+
this.bean = bean;
88+
return this;
89+
}
90+
91+
public MultiMethodSqsEndpointBuilder queueNames(Collection<String> queueNames) {
92+
this.queueNames = queueNames;
93+
return this;
94+
}
95+
96+
public MultiMethodSqsEndpointBuilder factoryBeanName(String factoryName) {
97+
this.factoryName = factoryName;
98+
return this;
99+
}
100+
101+
public MultiMethodSqsEndpointBuilder sqsEndpoint(Endpoint sqsEndpoint) {
102+
this.endpoint = sqsEndpoint;
103+
return this;
104+
}
105+
106+
public MultiMethodSqsEndpointBuilder id(String id) {
107+
this.id = id;
108+
return this;
109+
}
110+
111+
public MultiMethodSqsEndpoint build() {
112+
return new MultiMethodSqsEndpoint(this);
113+
}
114+
}
115+
116+
public List<Method> getMethods() {
117+
return methods;
118+
}
119+
120+
public void setMethods(List<Method> methods) {
121+
this.methods = methods;
122+
}
123+
124+
@Override
125+
public <T> void setupContainer(MessageListenerContainer<T> container) {
126+
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<>();
127+
InvocableHandlerMethod defaultHandler = null;
128+
129+
for (Method method : methods) {
130+
MessageHandlerMethodFactory messageHandlerMethodFactory = getMessageHandlerMethodFactory();
131+
if (messageHandlerMethodFactory != null) {
132+
InvocableHandlerMethod invocableHandlerMethod = messageHandlerMethodFactory
133+
.createInvocableHandlerMethod(getBean(), method);
134+
invocableHandlerMethods.add(invocableHandlerMethod);
135+
if (method.equals(defaultMethod)) {
136+
defaultHandler = invocableHandlerMethod;
137+
}
138+
}
139+
}
140+
141+
DelegatingInvocableHandler delegatingInvocableHandler = new DelegatingInvocableHandler(invocableHandlerMethods,
142+
defaultHandler);
143+
144+
container.setMessageListener(createMessageListenerInstance(delegatingInvocableHandler));
145+
}
146+
147+
protected <T> MessageListener<T> createMessageListenerInstance(DelegatingInvocableHandler delegatingHandler) {
148+
return new MessagingMessageListenerAdapter<>(delegatingHandler);
149+
}
150+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java

+9
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,15 @@ protected SqsAsyncClient getSqsAsyncClientInstance() {
156156
protected void configureContainerOptions(Endpoint endpoint, SqsContainerOptionsBuilder options) {
157157
ConfigUtils.INSTANCE.acceptIfInstance(endpoint, SqsEndpoint.class,
158158
sqsEndpoint -> configureFromSqsEndpoint(sqsEndpoint, options));
159+
160+
ConfigUtils.INSTANCE.acceptIfInstance(endpoint, MultiMethodSqsEndpoint.class,
161+
sqsEndpoint -> configureFromMultiMethodSqsEndpoint(sqsEndpoint, options));
162+
}
163+
164+
private void configureFromMultiMethodSqsEndpoint(MultiMethodSqsEndpoint sqsEndpoint,
165+
SqsContainerOptionsBuilder options) {
166+
ConfigUtils.INSTANCE.acceptIfInstance(sqsEndpoint.getEndpoint(), SqsEndpoint.class,
167+
endpoint -> configureFromSqsEndpoint(endpoint, options));
159168
}
160169

161170
private void configureFromSqsEndpoint(SqsEndpoint sqsEndpoint, SqsContainerOptionsBuilder options) {

0 commit comments

Comments
 (0)