Skip to content

Commit 6c41b86

Browse files
committed
Revert changes to SemaphoreBackPressureHandler not to change default behavior (awspring#1251)
1 parent 011fde0 commit 6c41b86

File tree

4 files changed

+329
-21
lines changed

4 files changed

+329
-21
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

+4-20
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import io.awspring.cloud.sqs.listener.source.AcknowledgementProcessingMessageSource;
3636
import io.awspring.cloud.sqs.listener.source.MessageSource;
3737
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
38-
import java.time.Duration;
3938
import java.util.ArrayList;
4039
import java.util.Collection;
4140
import java.util.List;
@@ -230,25 +229,10 @@ protected BackPressureHandler createBackPressureHandler() {
230229
if (containerOptions.getBackPressureHandlerSupplier() != null) {
231230
return containerOptions.getBackPressureHandlerSupplier().get();
232231
}
233-
Duration acquireTimeout = containerOptions.getMaxDelayBetweenPolls();
234-
int batchSize = containerOptions.getMaxMessagesPerPoll();
235-
int maxConcurrentMessages = containerOptions.getMaxConcurrentMessages();
236-
var concurrencyLimiterBlockingBackPressureHandler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
237-
.batchSize(batchSize).totalPermits(maxConcurrentMessages).acquireTimeout(acquireTimeout)
238-
.throughputConfiguration(containerOptions.getBackPressureMode()).build();
239-
if (maxConcurrentMessages == batchSize) {
240-
return concurrencyLimiterBlockingBackPressureHandler;
241-
}
242-
return switch (containerOptions.getBackPressureMode()) {
243-
case FIXED_HIGH_THROUGHPUT -> concurrencyLimiterBlockingBackPressureHandler;
244-
case ALWAYS_POLL_MAX_MESSAGES,
245-
AUTO -> {
246-
var throughputBackPressureHandler = ThroughputBackPressureHandler.builder().batchSize(batchSize).build();
247-
yield new CompositeBackPressureHandler(
248-
List.of(concurrencyLimiterBlockingBackPressureHandler, throughputBackPressureHandler),
249-
batchSize, containerOptions.getStandbyLimitPollingInterval());
250-
}
251-
};
232+
return SemaphoreBackPressureHandler.builder().batchSize(getContainerOptions().getMaxMessagesPerPoll())
233+
.totalPermits(getContainerOptions().getMaxConcurrentMessages())
234+
.acquireTimeout(getContainerOptions().getMaxDelayBetweenPolls())
235+
.throughputConfiguration(getContainerOptions().getBackPressureMode()).build();
252236
}
253237

254238
protected TaskExecutor createSourcesTaskExecutor() {

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java

+55-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,61 @@ default B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
156156
B backPressureMode(BackPressureMode backPressureMode);
157157

158158
/**
159-
* Set the {@link Supplier} of {@link BackPressureHandler} for this container. Default is {@code null}.
159+
* Sets the {@link Supplier} of {@link BackPressureHandler} for this container. Default is {@code null} which
160+
* results in a default {@link SemaphoreBackPressureHandler} to be instantiated. In case a supplier is provided, the
161+
* {@link BackPressureHandler} will be instantiated by the supplier.
162+
* <p>
163+
* <strong>NOTE:</strong> <em>it is important for the supplier to always return a new instance as otherwise it might
164+
* result in a BackPressureHandler internal resources (counters, semaphores, ...) to be shared by multiple
165+
* containers which is very likely not the desired behavior.</em>
166+
* <p>
167+
* Spring Cloud AWS provides the following {@link BackPressureHandler} implementations:
168+
* <ul>
169+
* <li>{@link ConcurrencyLimiterBlockingBackPressureHandler}: Limits the maximum number of messages that can be
170+
* processed concurrently by the application.</li>
171+
* <li>{@link ThroughputBackPressureHandler}: Adapts the throughput dynamically between high and low modes in order
172+
* to reduce SQS pull costs when few messages are coming in.</li>
173+
* <li>{@link CompositeBackPressureHandler}: Allows combining multiple {@link BackPressureHandler} together and
174+
* ensures they cooperate.</li>
175+
* </ul>
176+
* <p>
177+
* Below are a few examples of how common use cases can be achieved. Keep in mind you can always create your own
178+
* {@link BackPressureHandler} implementation and if needed combine it with the provided ones thanks to the
179+
* {@link CompositeBackPressureHandler}.
180+
*
181+
* <h3>A BackPressureHandler limiting the max concurrency with high throughput</h3>
182+
*
183+
* <pre>{@code
184+
* containerOptionsBuilder.backPressureHandlerSupplier(() -> {
185+
* return ConcurrencyLimiterBlockingBackPressureHandler.builder()
186+
* .batchSize(batchSize)
187+
* .totalPermits(maxConcurrentMessages)
188+
* .acquireTimeout(acquireTimeout)
189+
* .throughputConfiguration(BackPressureMode.FIXED_HIGH_THROUGHPUT)
190+
* .build()
191+
* }}</pre>
192+
*
193+
* <h3>A BackPressureHandler limiting the max concurrency with dynamic throughput</h3>
194+
*
195+
* <pre>{@code
196+
* containerOptionsBuilder.backPressureHandlerSupplier(() -> {
197+
* var concurrencyLimiterBlockingBackPressureHandler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
198+
* .batchSize(batchSize)
199+
* .totalPermits(maxConcurrentMessages)
200+
* .acquireTimeout(acquireTimeout)
201+
* .throughputConfiguration(BackPressureMode.AUTO)
202+
* .build()
203+
* var throughputBackPressureHandler = ThroughputBackPressureHandler.builder()
204+
* .batchSize(batchSize)
205+
* .build();
206+
* return new CompositeBackPressureHandler(List.of(
207+
* concurrencyLimiterBlockingBackPressureHandler,
208+
* throughputBackPressureHandler
209+
* ),
210+
* batchSize,
211+
* standbyLimitPollingInterval
212+
* );
213+
* }}</pre>
160214
*
161215
* @param backPressureHandlerSupplier the BackPressureHandler supplier.
162216
* @return this instance.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/*
2+
* Copyright 2013-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
import java.time.Duration;
19+
import java.util.Arrays;
20+
import java.util.concurrent.Semaphore;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
import org.springframework.util.Assert;
26+
27+
/**
28+
* {@link BackPressureHandler} implementation that uses a {@link Semaphore} for handling backpressure.
29+
*
30+
* @author Tomaz Fernandes
31+
* @since 3.0
32+
* @see io.awspring.cloud.sqs.listener.source.PollingMessageSource
33+
*/
34+
public class SemaphoreBackPressureHandler implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
35+
36+
private static final Logger logger = LoggerFactory.getLogger(SemaphoreBackPressureHandler.class);
37+
38+
private final Semaphore semaphore;
39+
40+
private final int batchSize;
41+
42+
private final int totalPermits;
43+
44+
private final Duration acquireTimeout;
45+
46+
private final BackPressureMode backPressureConfiguration;
47+
48+
private volatile CurrentThroughputMode currentThroughputMode;
49+
50+
private final AtomicBoolean hasAcquiredFullPermits = new AtomicBoolean(false);
51+
52+
private String id;
53+
54+
private SemaphoreBackPressureHandler(Builder builder) {
55+
this.batchSize = builder.batchSize;
56+
this.totalPermits = builder.totalPermits;
57+
this.acquireTimeout = builder.acquireTimeout;
58+
this.backPressureConfiguration = builder.backPressureMode;
59+
this.semaphore = new Semaphore(totalPermits);
60+
this.currentThroughputMode = BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(backPressureConfiguration)
61+
? CurrentThroughputMode.HIGH
62+
: CurrentThroughputMode.LOW;
63+
logger.debug("SemaphoreBackPressureHandler created with configuration {} and {} total permits",
64+
backPressureConfiguration, totalPermits);
65+
}
66+
67+
public static Builder builder() {
68+
return new Builder();
69+
}
70+
71+
@Override
72+
public void setId(String id) {
73+
this.id = id;
74+
}
75+
76+
@Override
77+
public String getId() {
78+
return this.id;
79+
}
80+
81+
@Override
82+
public int request(int amount) throws InterruptedException {
83+
return tryAcquire(amount, this.currentThroughputMode) ? amount : 0;
84+
}
85+
86+
// @formatter:off
87+
@Override
88+
public int requestBatch() throws InterruptedException {
89+
return CurrentThroughputMode.LOW.equals(this.currentThroughputMode)
90+
? requestInLowThroughputMode()
91+
: requestInHighThroughputMode();
92+
}
93+
94+
private int requestInHighThroughputMode() throws InterruptedException {
95+
return tryAcquire(this.batchSize, CurrentThroughputMode.HIGH)
96+
? this.batchSize
97+
: tryAcquirePartial();
98+
}
99+
// @formatter:on
100+
101+
private int tryAcquirePartial() throws InterruptedException {
102+
int availablePermits = this.semaphore.availablePermits();
103+
if (availablePermits == 0 || BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(this.backPressureConfiguration)) {
104+
return 0;
105+
}
106+
int permitsToRequest = Math.min(availablePermits, this.batchSize);
107+
CurrentThroughputMode currentThroughputModeNow = this.currentThroughputMode;
108+
logger.trace("Trying to acquire partial batch of {} permits from {} available for {} in TM {}",
109+
permitsToRequest, availablePermits, this.id, currentThroughputModeNow);
110+
boolean hasAcquiredPartial = tryAcquire(permitsToRequest, currentThroughputModeNow);
111+
return hasAcquiredPartial ? permitsToRequest : 0;
112+
}
113+
114+
private int requestInLowThroughputMode() throws InterruptedException {
115+
// Although LTM can be set / unset by many processes, only the MessageSource thread gets here,
116+
// so no actual concurrency
117+
logger.debug("Trying to acquire full permits for {}. Permits left: {}", this.id,
118+
this.semaphore.availablePermits());
119+
boolean hasAcquired = tryAcquire(this.totalPermits, CurrentThroughputMode.LOW);
120+
if (hasAcquired) {
121+
logger.debug("Acquired full permits for {}. Permits left: {}", this.id, this.semaphore.availablePermits());
122+
// We've acquired all permits - there's no other process currently processing messages
123+
if (!this.hasAcquiredFullPermits.compareAndSet(false, true)) {
124+
logger.warn("hasAcquiredFullPermits was already true. Permits left: {}",
125+
this.semaphore.availablePermits());
126+
}
127+
return this.batchSize;
128+
}
129+
else {
130+
return 0;
131+
}
132+
}
133+
134+
private boolean tryAcquire(int amount, CurrentThroughputMode currentThroughputModeNow) throws InterruptedException {
135+
logger.trace("Acquiring {} permits for {} in TM {}", amount, this.id, this.currentThroughputMode);
136+
boolean hasAcquired = this.semaphore.tryAcquire(amount, this.acquireTimeout.toMillis(), TimeUnit.MILLISECONDS);
137+
if (hasAcquired) {
138+
logger.trace("{} permits acquired for {} in TM {}. Permits left: {}", amount, this.id,
139+
currentThroughputModeNow, this.semaphore.availablePermits());
140+
}
141+
else {
142+
logger.trace("Not able to acquire {} permits in {} milliseconds for {} in TM {}. Permits left: {}", amount,
143+
this.acquireTimeout.toMillis(), this.id, currentThroughputModeNow,
144+
this.semaphore.availablePermits());
145+
}
146+
return hasAcquired;
147+
}
148+
149+
@Override
150+
public void releaseBatch() {
151+
maybeSwitchToLowThroughputMode();
152+
int permitsToRelease = getPermitsToRelease(this.batchSize);
153+
this.semaphore.release(permitsToRelease);
154+
logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id,
155+
this.semaphore.availablePermits());
156+
}
157+
158+
@Override
159+
public int getBatchSize() {
160+
return this.batchSize;
161+
}
162+
163+
private void maybeSwitchToLowThroughputMode() {
164+
if (!BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(this.backPressureConfiguration)
165+
&& CurrentThroughputMode.HIGH.equals(this.currentThroughputMode)) {
166+
logger.debug("Entire batch of permits released for {}, setting TM LOW. Permits left: {}", this.id,
167+
this.semaphore.availablePermits());
168+
this.currentThroughputMode = CurrentThroughputMode.LOW;
169+
}
170+
}
171+
172+
@Override
173+
public void release(int amount) {
174+
logger.trace("Releasing {} permits for {}. Permits left: {}", amount, this.id,
175+
this.semaphore.availablePermits());
176+
maybeSwitchToHighThroughputMode(amount);
177+
int permitsToRelease = getPermitsToRelease(amount);
178+
this.semaphore.release(permitsToRelease);
179+
logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id,
180+
this.semaphore.availablePermits());
181+
}
182+
183+
@Override
184+
public void release(int amount, ReleaseReason reason) {
185+
if (amount == this.batchSize && reason == ReleaseReason.NONE_FETCHED) {
186+
releaseBatch();
187+
}
188+
else {
189+
release(amount);
190+
}
191+
}
192+
193+
private int getPermitsToRelease(int amount) {
194+
return this.hasAcquiredFullPermits.compareAndSet(true, false)
195+
// The first process that gets here should release all permits except for inflight messages
196+
// We can have only one batch of messages at this point since we have all permits
197+
? this.totalPermits - (this.batchSize - amount)
198+
: amount;
199+
}
200+
201+
private void maybeSwitchToHighThroughputMode(int amount) {
202+
if (CurrentThroughputMode.LOW.equals(this.currentThroughputMode)) {
203+
logger.debug("{} unused permit(s), setting TM HIGH for {}. Permits left: {}", amount, this.id,
204+
this.semaphore.availablePermits());
205+
this.currentThroughputMode = CurrentThroughputMode.HIGH;
206+
}
207+
}
208+
209+
@Override
210+
public boolean drain(Duration timeout) {
211+
logger.debug("Waiting for up to {} seconds for approx. {} permits to be released for {}", timeout.getSeconds(),
212+
this.totalPermits - this.semaphore.availablePermits(), this.id);
213+
try {
214+
return this.semaphore.tryAcquire(this.totalPermits, (int) timeout.getSeconds(), TimeUnit.SECONDS);
215+
}
216+
catch (InterruptedException e) {
217+
Thread.currentThread().interrupt();
218+
throw new IllegalStateException("Interrupted while waiting to acquire permits", e);
219+
}
220+
}
221+
222+
private enum CurrentThroughputMode {
223+
224+
HIGH,
225+
226+
LOW;
227+
228+
}
229+
230+
public static class Builder {
231+
232+
private int batchSize;
233+
234+
private int totalPermits;
235+
236+
private Duration acquireTimeout;
237+
238+
private BackPressureMode backPressureMode;
239+
240+
public Builder batchSize(int batchSize) {
241+
this.batchSize = batchSize;
242+
return this;
243+
}
244+
245+
public Builder totalPermits(int totalPermits) {
246+
this.totalPermits = totalPermits;
247+
return this;
248+
}
249+
250+
public Builder acquireTimeout(Duration acquireTimeout) {
251+
this.acquireTimeout = acquireTimeout;
252+
return this;
253+
}
254+
255+
public Builder throughputConfiguration(BackPressureMode backPressureConfiguration) {
256+
this.backPressureMode = backPressureConfiguration;
257+
return this;
258+
}
259+
260+
public SemaphoreBackPressureHandler build() {
261+
Assert.noNullElements(
262+
Arrays.asList(this.batchSize, this.totalPermits, this.acquireTimeout, this.backPressureMode),
263+
"Missing configuration");
264+
return new SemaphoreBackPressureHandler(this);
265+
}
266+
267+
}
268+
269+
}

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ else if (hasMadeSecondPoll.compareAndSet(false, true)) {
166166
}
167167
catch (Throwable t) {
168168
logger.error("Error (not expecting it)", t);
169+
errors.add(t);
169170
throw new RuntimeException(t);
170171
}
171172
}, threadPool).whenComplete((v, t) -> {

0 commit comments

Comments
 (0)