Skip to content

Commit 1e806ae

Browse files
refs #151: Makes sure message broker executor service is shutdown. (#152)
* refs #151: Makes sure message broker executor service is shutdown. Also cleans up the logs for the CoreMessageListenerContainer to be more consistent, as well as providing debug logs for better debugging for each stage of the shutdown.
1 parent 961b289 commit 1e806ae

File tree

4 files changed

+124
-46
lines changed

4 files changed

+124
-46
lines changed

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/container/CoreMessageListenerContainer.java

Lines changed: 65 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -161,18 +161,30 @@ void runContainer() {
161161
final Queue<Message> extraMessages = new LinkedList<>();
162162

163163
final BlockingRunnable shutdownMessageRetriever = startupMessageRetriever(messageRetriever, extraMessages::addAll);
164-
164+
log.info("Container '{}' is beginning to process messages", identifier);
165165
processMessagesFromRetriever(messageBroker, messageRetriever, messageProcessor, messageResolver,
166166
messageBrokerExecutorService, messageProcessingExecutorService);
167167
log.info("Container '{}' is being shutdown", identifier);
168+
log.debug("Container '{}' is shutting down MessageRetriever", identifier);
168169
shutdownMessageRetriever.run();
169-
processExtraMessages(messageBroker, messageProcessor, messageResolver, messageBrokerExecutorService,
170-
messageProcessingExecutorService, extraMessages);
170+
log.debug("Container '{}' has stopped the MessageRetriever", identifier);
171+
if (!extraMessages.isEmpty() && shouldProcessAnyExtraRetrievedMessagesOnShutdown()) {
172+
log.info("Container '{}' is processing {} extra messages before shutdown", identifier, extraMessages.size());
173+
processExtraMessages(messageBroker, messageProcessor, messageResolver, messageBrokerExecutorService,
174+
messageProcessingExecutorService, extraMessages);
175+
}
176+
log.debug("Container '{}' is shutting down MessageProcessor threads", identifier);
171177
shutdownMessageProcessingThreads(messageProcessingExecutorService);
178+
log.debug("Container '{}' has shutdown the MessageProcessor threads", identifier);
179+
log.debug("Container '{}' is shutting down MessageResolver", identifier);
172180
shutdownMessageResolver.run();
181+
log.debug("Container '{}' has shutdown the MessageResolver", identifier);
182+
log.debug("Container '{}' is shutting down MessageBroker", identifier);
183+
shutdownMessageBroker(messageBrokerExecutorService);
184+
log.debug("Container '{}' has shutdown the MessageBroker", identifier);
173185
log.info("Container '{}' has stopped", identifier);
174186
} catch (final InterruptedException interruptedException) {
175-
log.error("Container '{}' was interrupted during the shutdown process. Doing a forceful shutdown that may eventually complete", identifier);
187+
log.error("Container '{}' was interrupted during the shutdown process.", identifier);
176188
} catch (RuntimeException runtimeException) {
177189
log.error("Unexpected error trying to start/stop the container", runtimeException);
178190
}
@@ -199,7 +211,6 @@ private void processMessagesFromRetriever(final MessageBroker messageBroker,
199211
final MessageResolver messageResolver,
200212
final ExecutorService brokerExecutorService,
201213
final ExecutorService messageProcessingExecutorService) throws InterruptedException {
202-
log.info("Container '{}' is beginning to process messages", identifier);
203214
try {
204215
runUntilInterruption(brokerExecutorService, () -> messageBroker.processMessages(
205216
messageProcessingExecutorService,
@@ -227,18 +238,15 @@ private void processExtraMessages(final MessageBroker messageBroker,
227238
final ExecutorService messageBrokerExecutorService,
228239
final ExecutorService executorService,
229240
final Queue<Message> messages) throws InterruptedException {
230-
if (!messages.isEmpty() && shouldProcessAnyExtraRetrievedMessagesOnShutdown()) {
231-
log.debug("Container '{}' is processing {} extra messages before shutdown", identifier, messages.size());
232-
try {
233-
runUntilInterruption(messageBrokerExecutorService, () -> messageBroker.processMessages(
234-
executorService,
235-
() -> !messages.isEmpty(),
236-
() -> CompletableFuture.completedFuture(messages.poll()),
237-
message -> messageProcessor.processMessage(message, () -> messageResolver.resolveMessage(message))
238-
));
239-
} catch (final ExecutionException executionException) {
240-
log.error("Exception thrown processing extra messages", executionException.getCause());
241-
}
241+
try {
242+
runUntilInterruption(messageBrokerExecutorService, () -> messageBroker.processMessages(
243+
executorService,
244+
() -> !messages.isEmpty(),
245+
() -> CompletableFuture.completedFuture(messages.poll()),
246+
message -> messageProcessor.processMessage(message, () -> messageResolver.resolveMessage(message))
247+
));
248+
} catch (final ExecutionException executionException) {
249+
log.error("Exception thrown processing extra messages", executionException.getCause());
242250
}
243251
}
244252

@@ -281,15 +289,35 @@ private void runUntilInterruption(final ExecutorService executorService, final B
281289
* @throws InterruptedException if the thread was interrupted during this process
282290
*/
283291
private void shutdownMessageProcessingThreads(final ExecutorService executorService) throws InterruptedException {
284-
log.debug("Container '{}' is waiting for all message processing threads to finish...", identifier);
285292
if (shouldInterruptMessageProcessingThreadsOnShutdown()) {
293+
log.debug("Container '{}' is interrupting and then waiting for all message processing threads to finish", identifier);
286294
executorService.shutdownNow();
287-
log.debug("Container '{}' interrupted the message processing threads", identifier);
288295
} else {
296+
log.debug("Container '{}' is waiting for all message processing threads to finish", identifier);
289297
executorService.shutdown();
290298
}
291299

292-
executorService.awaitTermination(getMessageProcessingShutdownTimeoutInSeconds(), SECONDS);
300+
final int shutdownTimeoutInSeconds = getMessageProcessingShutdownTimeoutInSeconds();
301+
final boolean messageProcessingTerminated = executorService.awaitTermination(shutdownTimeoutInSeconds, SECONDS);
302+
if (!messageProcessingTerminated) {
303+
log.error("Container '{}' did not shutdown MessageProcessor threads within {} seconds", identifier, shutdownTimeoutInSeconds);
304+
}
305+
}
306+
307+
/**
308+
* Stop the message broker thread and wait for the configured amount of time to complete.
309+
*
310+
* @param executorService the executor service for the message broker
311+
* @throws InterruptedException if the thread was interrupted while waiting for the service to stop
312+
*/
313+
private void shutdownMessageBroker(final ExecutorService executorService) throws InterruptedException {
314+
executorService.shutdownNow();
315+
316+
final int shutdownTimeoutInSeconds = getMessageBrokerShutdownTimeoutInSeconds();
317+
final boolean terminationResult = executorService.awaitTermination(shutdownTimeoutInSeconds, SECONDS);
318+
if (!terminationResult) {
319+
log.error("Container '{}' did not shutdown MessageBroker within {} seconds", getIdentifier(), shutdownTimeoutInSeconds);
320+
}
293321
}
294322

295323
/**
@@ -309,14 +337,12 @@ private BlockingRunnable startupMessageRetriever(final MessageRetriever messageR
309337
CompletableFuture.supplyAsync(messageRetriever::run, executorService)
310338
.thenAccept(extraMessagesConsumer);
311339
return () -> {
312-
log.info("Shutting down MessageRetriever");
313340
executorService.shutdownNow();
314341

315-
final boolean retrieverShutdown;
316342
final int retrieverShutdownTimeoutInSeconds = getMessageRetrieverShutdownTimeoutInSeconds();
317-
retrieverShutdown = executorService.awaitTermination(retrieverShutdownTimeoutInSeconds, SECONDS);
343+
final boolean retrieverShutdown = executorService.awaitTermination(retrieverShutdownTimeoutInSeconds, SECONDS);
318344
if (!retrieverShutdown) {
319-
log.error("MessageRetriever did not shutdown within {} seconds", retrieverShutdownTimeoutInSeconds);
345+
log.error("Container '{}' did not shutdown MessageRetriever within {} seconds", getIdentifier(), retrieverShutdownTimeoutInSeconds);
320346
}
321347
};
322348
}
@@ -332,13 +358,12 @@ private BlockingRunnable startupMessageResolver(final MessageResolver messageRes
332358
final ExecutorService executorService = Executors.newSingleThreadExecutor(threadFactory(getIdentifier() + "-message-resolver"));
333359
CompletableFuture.runAsync(messageResolver::run, executorService);
334360
return () -> {
335-
log.info("Shutting down MessageResolver");
336361
executorService.shutdownNow();
337362

338363
final long messageResolverShutdownTimeInSeconds = getMessageResolverShutdownTimeoutInSeconds();
339364
final boolean messageResolverShutdown = executorService.awaitTermination(getMessageResolverShutdownTimeoutInSeconds(), SECONDS);
340365
if (!messageResolverShutdown) {
341-
log.error("MessageResolver did not shutdown within {} seconds", messageResolverShutdownTimeInSeconds);
366+
log.error("Container '{}' did not shutdown MessageResolver within {} seconds", getIdentifier(), messageResolverShutdownTimeInSeconds);
342367
}
343368
};
344369
}
@@ -365,6 +390,19 @@ private int getMessageProcessingShutdownTimeoutInSeconds() {
365390
);
366391
}
367392

393+
/**
394+
* Get the amount of time in seconds that we should wait for the {@link MessageBroker} to shutdown when requested.
395+
*
396+
* @return the amount of time in seconds to wait for shutdown
397+
*/
398+
private int getMessageBrokerShutdownTimeoutInSeconds() {
399+
return PropertyUtils.safelyGetPositiveOrZeroIntegerValue(
400+
"messageBrokerShutdownTimeoutInSeconds",
401+
properties::getMessageBrokerShutdownTimeoutInSeconds,
402+
DEFAULT_SHUTDOWN_TIME_IN_SECONDS
403+
);
404+
}
405+
368406
/**
369407
* Get the amount of time in seconds that we should wait for the {@link MessageRetriever} to shutdown when requested.
370408
*
@@ -385,7 +423,7 @@ private int getMessageRetrieverShutdownTimeoutInSeconds() {
385423
*/
386424
private int getMessageResolverShutdownTimeoutInSeconds() {
387425
return PropertyUtils.safelyGetPositiveOrZeroIntegerValue(
388-
"messageRetrieverShutdownTimeoutInSeconds",
426+
"messageResolverShutdownTimeoutInSeconds",
389427
properties::getMessageResolverShutdownTimeoutInSeconds,
390428
DEFAULT_SHUTDOWN_TIME_IN_SECONDS
391429
);

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/container/CoreMessageListenerContainerProperties.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,22 @@ public interface CoreMessageListenerContainerProperties {
3333
Boolean shouldProcessAnyExtraRetrievedMessagesOnShutdown();
3434

3535
/**
36-
* Gets the amount of time that the broker should wait for the {@link MessageRetriever} to shutdown when the broker is being shutdown.
36+
* Gets the amount of time that the container should wait for the {@link com.jashmore.sqs.broker.MessageBroker} to shutdown when the broker is
37+
* being shutdown.
38+
*
39+
* <p>If this value is negative or null, then {@link CoreMessageListenerContainerConstants#DEFAULT_SHUTDOWN_TIME_IN_SECONDS} will be used for instead.
40+
*
41+
* @return the number of seconds to wait for the message retriever to shutdown
42+
*/
43+
@Nullable
44+
@PositiveOrZero
45+
default Integer getMessageBrokerShutdownTimeoutInSeconds() {
46+
// Added to not cause an API breaking change
47+
return null;
48+
}
49+
50+
/**
51+
* Gets the amount of time that the container should wait for the {@link MessageRetriever} to shutdown when the broker is being shutdown.
3752
*
3853
* <p>If this value is negative or null, then {@link CoreMessageListenerContainerConstants#DEFAULT_SHUTDOWN_TIME_IN_SECONDS} will be used for instead.
3954
*
@@ -44,7 +59,7 @@ public interface CoreMessageListenerContainerProperties {
4459
Integer getMessageRetrieverShutdownTimeoutInSeconds();
4560

4661
/**
47-
* The number of seconds that the broker should wait for the message processing threads to finish when a shutdown is initiated.
62+
* The number of seconds that the container should wait for the message processing threads to finish when a shutdown is initiated.
4863
*
4964
* <p>When {@link #shouldProcessAnyExtraRetrievedMessagesOnShutdown()} is true and there are extra messages to be processed, this field will try and
5065
* put as many messages onto threads to be processed before this limit is hit. If this time limit is reached some messages may not have been processed.
@@ -58,7 +73,7 @@ public interface CoreMessageListenerContainerProperties {
5873
Integer getMessageProcessingShutdownTimeoutInSeconds();
5974

6075
/**
61-
* Gets the amount of time that the broker should wait for the {@link MessageResolver} to shutdown when the broker is being shutdown.
76+
* Gets the amount of time that the container should wait for the {@link MessageResolver} to shutdown when the broker is being shutdown.
6277
*
6378
* <p>If this value is negative or null, then {@link CoreMessageListenerContainerConstants#DEFAULT_SHUTDOWN_TIME_IN_SECONDS} will be used for instead.
6479
*

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/container/StaticCoreMessageListenerContainerProperties.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@
99
@Value
1010
@Builder(toBuilder = true)
1111
public class StaticCoreMessageListenerContainerProperties implements CoreMessageListenerContainerProperties {
12-
private final String messageProcessingThreadNameFormat;
13-
private final Boolean shouldProcessAnyExtraRetrievedMessagesOnShutdown;
14-
private final Boolean shouldInterruptThreadsProcessingMessagesOnShutdown;
15-
private final Integer messageProcessingShutdownTimeoutInSeconds;
16-
private final Integer messageRetrieverShutdownTimeoutInSeconds;
17-
private final Integer messageResolverShutdownTimeoutInSeconds;
12+
String messageProcessingThreadNameFormat;
13+
Boolean shouldProcessAnyExtraRetrievedMessagesOnShutdown;
14+
Boolean shouldInterruptThreadsProcessingMessagesOnShutdown;
15+
Integer messageProcessingShutdownTimeoutInSeconds;
16+
Integer messageRetrieverShutdownTimeoutInSeconds;
17+
Integer messageResolverShutdownTimeoutInSeconds;
18+
Integer messageBrokerShutdownTimeoutInSeconds;
1819

1920
@Nullable
2021
@Override
@@ -28,6 +29,12 @@ public Boolean shouldProcessAnyExtraRetrievedMessagesOnShutdown() {
2829
return shouldProcessAnyExtraRetrievedMessagesOnShutdown;
2930
}
3031

32+
@Nullable
33+
@Override
34+
public @PositiveOrZero Integer getMessageBrokerShutdownTimeoutInSeconds() {
35+
return messageBrokerShutdownTimeoutInSeconds;
36+
}
37+
3138
@Nullable
3239
@Override
3340
public @PositiveOrZero Integer getMessageProcessingShutdownTimeoutInSeconds() {

java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/container/CoreMessageListenerContainerTest.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.jashmore.sqs.processor.MessageProcessor;
1515
import com.jashmore.sqs.resolver.MessageResolver;
1616
import com.jashmore.sqs.retriever.MessageRetriever;
17+
import com.jashmore.sqs.util.concurrent.CompletableFutureUtils;
1718
import lombok.extern.slf4j.Slf4j;
1819
import org.junit.jupiter.api.Test;
1920
import org.junit.jupiter.api.extension.ExtendWith;
@@ -35,12 +36,8 @@
3536
@Slf4j
3637
@ExtendWith(MockitoExtension.class)
3738
class CoreMessageListenerContainerTest {
38-
private static final CompletableFuture<Message> STUB_MESSAGE_BROKER_DONE;
39-
40-
static {
41-
STUB_MESSAGE_BROKER_DONE = new CompletableFuture<>();
42-
STUB_MESSAGE_BROKER_DONE.completeExceptionally(new RuntimeException("Expected Messages Done"));
43-
}
39+
private static final CompletableFuture<Message> STUB_MESSAGE_BROKER_DONE
40+
= CompletableFutureUtils.completedExceptionally(new RuntimeException("Expected Messages Done"));
4441

4542
private static final StaticCoreMessageListenerContainerProperties DEFAULT_PROPERTIES = StaticCoreMessageListenerContainerProperties.builder()
4643
.shouldInterruptThreadsProcessingMessagesOnShutdown(true)
@@ -49,6 +46,7 @@ class CoreMessageListenerContainerTest {
4946
.messageProcessingShutdownTimeoutInSeconds(5)
5047
.messageResolverShutdownTimeoutInSeconds(5)
5148
.messageRetrieverShutdownTimeoutInSeconds(5)
49+
.messageBrokerShutdownTimeoutInSeconds(5)
5250
.build();
5351

5452
@Mock
@@ -255,6 +253,26 @@ void anyExtraMessagesLeftoverByAsyncMessageRetrieverWillBeProcessedOnShutdownWhe
255253
verify(messageProcessor).processMessage(eq(secondExtraMessage), any(Runnable.class));
256254
}
257255

256+
@Test
257+
void allMessageListenerThreadsWillBeShutdownWhenContainerShutdown() throws InterruptedException {
258+
// arrange
259+
when(messageRetriever.retrieveMessage())
260+
.thenReturn(STUB_MESSAGE_BROKER_DONE);
261+
when(messageRetriever.run()).thenReturn(ImmutableList.of());
262+
final StaticCoreMessageListenerContainerProperties properties = DEFAULT_PROPERTIES.toBuilder()
263+
.shouldProcessAnyExtraRetrievedMessagesOnShutdown(true)
264+
.build();
265+
final CoreMessageListenerContainer container = buildContainer(
266+
"my-specific-container-id", new StubMessageBroker(), messageResolver, messageProcessor, messageRetriever, properties);
267+
268+
// act
269+
container.runContainer();
270+
Thread.sleep(1000); // let's just wait a little bit just to be sure that the thread is gone
271+
272+
// assert
273+
assertThat(Thread.getAllStackTraces().keySet()).noneMatch(thread -> thread.getName().startsWith("my-specific-container-id"));
274+
}
275+
258276
@Test
259277
void willInterruptMessagesProcessingDuringShutdownWhenPropertySetToTrue() {
260278
// arrange
@@ -286,7 +304,7 @@ void willInterruptMessagesProcessingDuringShutdownWhenPropertySetToTrue() {
286304
container.runContainer();
287305

288306
// assert
289-
assertThat(wasThreadInterrupted).isTrue();
307+
assertThat(wasThreadInterrupted.get()).isTrue();
290308
}
291309

292310
@Test
@@ -346,7 +364,7 @@ void whenContainerIsBeingStoppedAnyAsyncMessageRetrieverThreadWillBeInterrupted(
346364
container.runContainer();
347365

348366
// assert
349-
assertThat(messageRetrieverInterrupted).isTrue();
367+
assertThat(messageRetrieverInterrupted.get()).isTrue();
350368
}
351369

352370
@Test
@@ -371,7 +389,7 @@ void whenContainerIsBeingStoppedAnyAsyncMessageResolverThreadWillBeInterrupted()
371389
container.runContainer();
372390

373391
// assert
374-
assertThat(messageResolverInterrupted).isTrue();
392+
assertThat(messageResolverInterrupted.get()).isTrue();
375393
}
376394

377395
@Test
@@ -407,7 +425,7 @@ void whenMessageRetrieverExceedsShutdownLimitTheRestOfTheShutdownProcessIsTrigge
407425
container.runContainer();
408426

409427
// assert
410-
assertThat(messageResolverInterrupted).isTrue();
428+
assertThat(messageResolverInterrupted.get()).isTrue();
411429
}
412430

413431
@Test

0 commit comments

Comments
 (0)