1026274: Worker Framework Shutdown hanging #226


Open
wants to merge 7 commits into base: develop
Changes from 5 commits

@@ -22,6 +22,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -36,6 +37,7 @@ final class BulkWorkerThreadPool implements WorkerThreadPool
private final StreamingWorkerThreadPool backupThreadPool;

private volatile boolean isActive;
+private volatile AtomicInteger activeThreads = new AtomicInteger(0);

public BulkWorkerThreadPool(
final WorkerFactory workerFactory,
@@ -86,10 +88,12 @@ private void execute()
= new BulkWorkerTaskProvider(task, workQueue);

try {
+activeThreads.addAndGet(1);
bulkWorker.processTasks(taskProvider);
} catch (final RuntimeException ex) {
LOG.warn("Bulk Worker threw unhandled exception", ex);
} finally {
+activeThreads.decrementAndGet();
// Re-submit the first task if it has not been consumed
// NB: It's really faulty Worker logic to not consume at least
// the one task.
@@ -150,6 +154,11 @@ public int getBacklogSize()
{
return workQueue.size() + backupThreadPool.getBacklogSize();
}

+@Override
+public int getApproxActiveCount() {
+return activeThreads.get() + backupThreadPool.getApproxActiveCount();
+}

@Override
public void submitWorkerTask(final WorkerTaskImpl workerTask)

@@ -76,6 +76,11 @@ public int getBacklogSize()
{
return workQueue.size();
}

+@Override
+public int getApproxActiveCount() {
+return threadPoolExecutor.getActiveCount();
+}

/**
* Execute the specified task at some point in the future

@@ -78,6 +78,8 @@ public final class WorkerApplication extends Application<WorkerConfiguration>
{
private final long startTime = System.currentTimeMillis();
private static final Logger LOG = LoggerFactory.getLogger(WorkerApplication.class);
+private static final long SHUTDOWN_DURATION = 5 * 60 * 1000; // 5 minutes in milliseconds
+private static final int SHUTDOWN_LOG_INTERVAL = 15_000; // 15 seconds in milliseconds

/**
* Entry point for the asynchronous micro-service worker framework.
@@ -143,25 +145,26 @@ public void start() {

@Override
public void stop() {
-LOG.info("Worker stop requested, allowing in-progress tasks to complete.");
+LOG.info("Worker stop requested.");

workerQueue.shutdownIncoming();
while (!wtp.isIdle()) {

final long startTime = System.currentTimeMillis();

while(wtp.getBacklogSize() > 0 && System.currentTimeMillis() - startTime < SHUTDOWN_DURATION) {

Member:

Are we sure about this strategy of trying to clear the backlog during shutdown? Trying to complete in-progress tasks makes sense, but if I'm understanding this right, it is intentionally allowing new tasks to begin that haven't been passed to the worker code yet. Wouldn't we be better to clear these backlog queues and let the next worker instance pick them up?

Member Author:

The prefetched messages have their delivery count increased, and this goes towards the poison message identification. I take your point that starting more messages increases the chances they get terminated after the timeout. Let's aim to talk it through at some stage.

try {
-//The grace period will expire and the process killed so no need for time limit here
-LOG.trace("Awaiting the Worker Thread Pool to become idle, {} tasks in the backlog.",
-wtp.getBacklogSize());
-Thread.sleep(1000);
+LOG.info("Allowing {} backlog tasks to complete, {} currently active.", wtp.getBacklogSize(), wtp.getApproxActiveCount());
+Thread.sleep(SHUTDOWN_LOG_INTERVAL);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
-throw new RuntimeException(e);
+break;
}
}
-LOG.trace("Worker Thread Pool is idle.");
wtp.shutdown();
try {
-wtp.awaitTermination(10_000, TimeUnit.MILLISECONDS);
-} catch (InterruptedException e) {
-LOG.warn("Shutdown interrupted", e);
+wtp.awaitTermination(5, TimeUnit.MINUTES);
+} catch (final InterruptedException e) {
+LOG.error("Worker stop interrupted, in-progress tasks may not have completed.", e);
+Thread.currentThread().interrupt();
}
workerQueue.shutdown();
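On the delivery-count point in the review thread above: the sketch below is a minimal illustration, not code from this pull request, of how a consumer can read a per-message delivery count for poison-message identification. It assumes RabbitMQ quorum queues, which stamp redeliveries with the x-delivery-count header; the class name and threshold are invented for the example, and the framework's actual mechanism may differ.

    import com.rabbitmq.client.AMQP;

    final class DeliveryCountCheck
    {
        // Illustrative limit; a real limit would come from configuration.
        private static final long POISON_THRESHOLD = 3;

        // Returns true when the broker reports the message has been redelivered
        // more times than the threshold allows.
        static boolean isLikelyPoison(final AMQP.BasicProperties properties)
        {
            final Object header = properties.getHeaders() == null
                ? null
                : properties.getHeaders().get("x-delivery-count");
            final long deliveryCount = header instanceof Number ? ((Number) header).longValue() : 0L;
            return deliveryCount >= POISON_THRESHOLD;
        }
    }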

@@ -61,6 +61,8 @@ void awaitTermination(long timeout, TimeUnit unit)
boolean isIdle();

int getBacklogSize();

+int getApproxActiveCount();

/**
* Execute the specified task at some point in the future

@@ -73,6 +73,7 @@ public final class RabbitWorkerQueue implements ManagedWorkerQueue
private final RabbitWorkerQueueConfiguration config;
private final int maxTasks;
private static final Logger LOG = LoggerFactory.getLogger(RabbitWorkerQueue.class);
+private boolean incomingShutdownPermanent = false;

/**
* Setup a new RabbitWorkerQueue.
@@ -214,6 +215,7 @@ public String getPausedQueue()
* {@inheritDoc}
*
* The incoming queues will all be cancelled so the consumer will fall back to idle.
+* This is permanent, and attempts to reconnectIncoming will fail.
*/
@Override
public void shutdownIncoming()
@@ -224,6 +226,7 @@ public void shutdownIncoming()
try {
incomingChannel.basicCancel(consumerTag);
consumerTag = null;
+incomingShutdownPermanent = true;
} catch (IOException e) {
metrics.incremementErrors();
LOG.warn("Failed to cancel consumer {}", consumerTag, e);
@@ -277,6 +280,9 @@ public void disconnectIncoming()
public void reconnectIncoming()
{
LOG.debug("Reconnecting incoming queues");
+if(incomingShutdownPermanent) {
+throw new IllegalStateException("Queue is permanently shutdown");
+}
synchronized (consumerLock) {
if (consumerTag == null && incomingChannel.isOpen()) {
try {
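To make the one-way behaviour of shutdownIncoming concrete, here is a minimal sketch of the call ordering it implies during worker stop. The ManagedWorkerQueue and WorkerThreadPool method names are taken from the diffs above; the wrapper class itself is hypothetical, and the framework types are assumed to be on the classpath.

    import java.util.concurrent.TimeUnit;

    final class ShutdownOrderingSketch
    {
        // Hypothetical helper, not part of this change.
        static void stopWorker(final ManagedWorkerQueue queue, final WorkerThreadPool wtp) throws InterruptedException
        {
            queue.shutdownIncoming();     // permanently cancels the consumer, so no new tasks arrive
            // ...wait for the backlog to drain, as WorkerApplication.stop() now does...
            wtp.shutdown();
            wtp.awaitTermination(5, TimeUnit.MINUTES);
            // queue.reconnectIncoming() would now throw IllegalStateException,
            // because the incoming shutdown is one-way once the consumer is cancelled.
            queue.shutdown();             // finally close the queue connection itself
        }
    }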

@@ -45,7 +45,7 @@ public class ShutdownDeveloperTest extends TestWorkerTestBase {
public void shutdownTest() throws IOException, TimeoutException, CodecException {

// Usage instructions
-// Comment out the iages for test worker 2 and 3 in this module's pom.xml
+// Comment out the images for worker-test-2 and worker-test-no-valid-cert in this module's pom.xml
// Use mvn docker:start to start test worker
// Remove the @Ignore and run the test to create 100 test messages
// From a terminal execute docker stop -t 300 CONTAINER_ID