Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2506,8 +2506,7 @@ proc run(node: BeaconNode) {.raises: [CatchableError].} =
asyncSpawn runQueueProcessingLoop(node.blockProcessor)
asyncSpawn runKeystoreCachePruningLoop(node.keystoreCache)

while not ProcessState.stopIt(notice("Shutting down", reason = it)):
poll()
waitFor ProcessState.waitStopSignals()

# time to say goodbye
node.stop()
Expand Down
129 changes: 102 additions & 27 deletions beacon_chain/process_state.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,14 @@
## * The main thread wakes up any threads it started and notifies them of the
## imminent shutdown then waits for them to terminate
##
## `chronos` has a `waitSignal` function that could be use to wake it when a
## signal arrives - at the time of writing, it only works in a single-threaded
## application when chronos is the only signal handler and requires using
## its own raising mechanism instead of the standard `raise`/`pthread_kill`
## functions which makes it difficult to use:
## https://github.com/status-im/nim-chronos/issues/581
##
## As such, polling `ProcessState.stopping` ends up being the more reliable
## cross-platform solution in spite of its downsides.
## In this way, the main thread is notified that _some_ thread or the user wants
## the process to shut down. The main thread stops whatever it's doing and
## notifies all threads it started that shutdown is imminent and then proceeds
## with the shutdown.

{.push raises: [].}

import std/atomics, results

import std/atomics, results, chronos, chronos/threadsync, chronicles
export results

type ProcessState* {.pure.} = enum
Expand All @@ -48,29 +42,69 @@ var shutdownSource: Atomic[pointer]

import system/ansi_c

when defined(posix):
import posix

when defined(linux):
var signalTarget = pthread_self()

proc ignoreStopSignalsInThread*(_: type ProcessState): bool =
# Block signals in the current thread and all threads created from it
# (hopefully)
var signalMask, oldSignalMask: Sigset

sigemptyset(signalMask) == 0 and sigaddset(signalMask, posix.SIGINT) == 0 and
sigaddset(signalMask, posix.SIGTERM) == 0 and
pthread_sigmask(SIG_BLOCK, signalMask, oldSignalMask) == 0

proc raiseStopSignal() =
# If the default signal handler is blocked and the app is polling, we still
# want the state updated - blocking signals on all threads is necessary for
# waitSignal to work, but the application might want to check _before_ the
# signal handler is invoked.
processState.store(ProcessState.Stopping)

when defined(linux):
# On linux, we want to direct the signal to the thread that is currently
# listening on `waitSignal` - when there's no such thread, it doesn't
# really matter which thread it goes to
discard pthread_kill(signalTarget, posix.SIGTERM)
else:
# kqueue listens only to process-directed signals - for waitSignal to
# work as expected, we use kill
discard kill(getpid(), posix.SIGTERM)

else:
proc ignoreStopSignalsInThread*(_: type ProcessState): bool =
true

import chronos/osutils

proc raiseStopSignal() =
discard c_raise(ansi_c.SIGINT)
# Chronos installs its own handlers that are incompatible with `raise` -
# when waitSignal is running we must also notify chronos
discard osutils.raiseSignal(chronos.SIGINT)

proc scheduleStop*(_: type ProcessState, source: cstring) =
## Schedule that the process should stop in a thread-safe way. This function
## can be used from non-nim threads as well.
##
# TODO in theory, we could use `raise`/`kill`/`etc` depending on the platform
# to set `processState` from within the signal handler - if we were
# a kqueue/epoll-based signal handler, this would be the way to go so
# as to provide a wakeup notification - there are platform-based
# differences to take into account however, ie on kqueue, only process-
# directed signals are woken up whereas on linux, the signal has to
# reach the correct thread that is doing the waiting which requires
# special care.
var nilptr: pointer
discard shutdownSource.compareExchange(nilptr, source, moRelaxed)
processState.store(ProcessState.Stopping)
raiseStopSignal()

proc notifyRunning*(_: type ProcessState) =
processState.store(ProcessState.Running, moRelaxed)

proc setupStopHandlers*(_: type ProcessState) =
## Install signal handlers for SIGINT/SIGTERM such that the application
## updates `processState` on CTRL-C and similar, allowing it to gracefully
## shut down by monitoring `ProcessState.stopping` at regular intervals.
## shut down by monitoring `ProcessState.running` at regular intervals.
##
## `async` applications should prefer to use
## `await ProcessState.waitStopsignals()` since the CTRL-C handling provided
## by `signal` does not wake the async polling loop and can therefore get
## stuck if no events are happening.
##
## This function should be called early on from the main thread to avoid the
## default Nim signal handlers from being used as these will crash or close
Expand Down Expand Up @@ -104,6 +138,46 @@ proc setupStopHandlers*(_: type ProcessState) =
when defined(posix):
c_signal(ansi_c.SIGTERM, controlCHandler)

proc waitStopSignals*(_: type ProcessState) {.async: (raises: [CancelledError]).} =
## Monitor stop signals via chronos' event loop, masking other handlers.
##
## This approach ensures that the event loop wakes up on signal delivery
## unlike `setupStopHandlers` which merely sets a flag that must be polled.
##
## Only one thread should ever listen for stop signals this way.

# Ensure other threads don't cause a crash, in case the application did not
# already call it
ProcessState.setupStopHandlers()

let
sigint = waitSignal(chronos.SIGINT)
sigterm = waitSignal(chronos.SIGTERM)

debug "Waiting for signal", chroniclesThreadIds = true
when defined(linux):
signalTarget = pthread_self()

try:
discard await race(sigint, sigterm)

var source = cast[cstring](shutdownSource.load())
if source == nil:
source = "Unknown"

notice "Shutting down", chroniclesThreadIds = true, source

processState.store(ProcessState.Stopping, moRelaxed)
finally:
# waitSignal sometimes overwrites signal handlers:
# https://github.com/status-im/nim-chronos/issues/581
ProcessState.setupStopHandlers()

# Might be finished already, which is fine..
await noCancel sigint.cancelAndWait()
await noCancel sigterm.cancelAndWait()


proc running*(_: type ProcessState): bool =
processState.load(moRelaxed) == ProcessState.Running

Expand All @@ -126,7 +200,7 @@ template stopIt*(_: type ProcessState, body: untyped): bool =
false

when isMainModule: # Test case
import os, chronos, chronos/threadsync
import os

proc threadWork() {.async.} =
var todo = 2
Expand Down Expand Up @@ -167,14 +241,15 @@ when isMainModule: # Test case
# set the same flag as `waitStopSignals` does.
ProcessState.setupStopHandlers()

# Wait for a stop signal - this can be either the user pressing ctrl-c or
# an out-of-band notification via kill/windows service command / some rest
# API etc
echo "main thread waiting"
while ProcessState.stopping.isNone:
os.sleep(100)

waitFor ProcessState.waitStopSignals()
echo "main thread firing stopper"

# Notify the thread should stop itself as well using a ThreadSignalPtr
# rather than an OS signal - this is more portable
# rather than an OS signal
waitFor stopper.fire()

workerThread.joinThread()
Expand Down
Loading