Skip to content

Commit 78866d6

Browse files
committed
Use AbortSignal to interrupt jobs after shutdown timeout
- private AbortController created upon .work() - After stop() and shutdown timeout, in-progress jobs are sent an abort signal and subsequently FAILed on the server. - 3 seconds delay is added after abort signal is sent to allow cleanup - exit code when jobs are aborted at end of grace period is 1 A job may be interrupted when a worker shuts down. In this case there are two mechanisms to ensure graceful interruption: the shutdown timeout and the execution context `AbortSignal`. The shutdown timeout is configured in `WorkerOptions.timeout`. When a worker is instructed to stop (via process signal or server message), it will stop accepting new work (e.g. `quiet`) and wait the configured duration for any in-progress jobs to complete uninterrupted. If this duration elapses and jobs are still in progress, these jobs will receive an AbortSignal via `Context.signal`. All jobs will be `FAIL`ed on the Faktory server, allowing them to retry later. The abort signal can be used to interrupt asynchronous processes and perform some cleanup tasks before an abrupt exit (`process.exit`). After the abort signal is sent, a job will have 3 seconds to perform cleanup before the process is abruptly exited.
1 parent 7467256 commit 78866d6

File tree

9 files changed

+1324
-908
lines changed

9 files changed

+1324
-908
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## 4.7.0 | 2024-12-17
2+
3+
Features:
4+
5+
- Job context now includes property `signal: AbortSignal` for job functions to use for more graceful shutdowns during a hard shutdown. Thanks, @knpwrs (#409)
6+
17
## 4.6.0 | 2024-07-24
28

39
Features:

README.md

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
A node.js client and worker library for the [Faktory](https://github.com/contribsys/faktory) job server. The client allows you to push jobs and communicate with the Faktory server and the worker fetches background jobs from the Faktory server and processes them.
1010

11-
Faktory server compatibility: `~v1.6.1`
11+
Faktory server compatibility: `>v1.6.1`
1212

1313
## Installation
1414

@@ -127,6 +127,26 @@ worker.on("fail", ({ job, error }) => {
127127
});
128128
```
129129

130+
### Shutdown and AbortSignal
131+
132+
A job may be interrupted when a worker shuts down. In this case there are two mechanisms to ensure graceful interruption: the shutdown timeout and the execution context `AbortSignal`. The shutdown timeout is configured in `WorkerOptions.timeout`. When a worker is instructed to stop (via process signal or server message), it will stop accepting new work (e.g. `quiet`) and wait the configured duration for any in-progress jobs to complete uninterrupted. If this duration elapses and jobs are still in progress, these jobs will receive an AbortSignal via `Context.signal`. All jobs will be `FAIL`ed on the Faktory server, allowing them to retry later. The abort signal can be used to interrupt asynchronous processes and perform some cleanup tasks before an abrupt exit (`process.exit`). After the abort signal is sent, a job will have 3 seconds to perform cleanup before the process is abruptly exited.
133+
134+
Example - A long-running subprocess:
135+
136+
```js
137+
faktory.register("JobWithAbort", (...args) => async ({ signal }) => {
138+
try {
139+
await execa("ffmpeg", [/* arg1, arg2, ..., argN */], { cancelSignal: signal });
140+
} catch (e) {
141+
if (e.code === "ABORT_ERR") {
142+
// Remove some tempfiles or other type of cleanup...
143+
// Propagating the ABORT_ERR is not necessary, the job will be FAILed if it was in-progress
144+
// at the end of the shutdown timeout
145+
}
146+
}
147+
});
148+
```
149+
130150
### Middleware
131151

132152
Faktory middleware works just like [`koa`](https://github.com/koajs/koa) middleware. You can register a middleware function (async or sync) with `.use`. Middleware is called for every job that is performed. Always return a promise, `await next()`, or `return next();` to allow execution to continue down the middleware chain.
@@ -164,31 +184,40 @@ Here are the defaults:
164184

165185
```js
166186
await faktory.work({
187+
// The Faktory server host.
188+
// Will be extracted from FAKTORY_URL ENV variable if set.
167189
host: process.env.FAKTORY_URL || "127.0.0.1",
168190

169-
// default: 7419 -- can extracted from FAKTORY_URL env var
191+
// The port the Faktory server is listening on (default: 7419).
192+
// Will be extracted from FAKTORY_URL ENV variable if set.
170193
port: 7419,
171194

172-
// can extracted from FAKTORY_URL env var
195+
// The Faktory server password.
196+
// Will be extracted from FAKTORY_URL ENV variable if set.
173197
password: undefined,
174198

175-
// this is a max number of jobs the worker will have
176-
// in progress at any time
199+
// This sets the maximum number of jobs the worker may have in
200+
// progress at any time.
177201
concurrency: 20,
178202

179-
// the queues the worker will process—remember to preserve default if overriding this
180-
// default fetching behavior is **Strictly Ordered**
203+
// The list of queues the worker will fetch and process jobs from.
204+
// Remember to preserve `default` in this list if overriding.
205+
// Queues can be specified in two ways: **strictly ordered** and
206+
// **weighted random**. See documentation above.
181207
queues: ["default"],
182208

183-
// the number of milliseconds jobs have to complete after
184-
// receiving a graceful shutdown signal. After this timeout, in-progress jobs may be abruptly stopped.
185-
timeout: 8 * 1000,
209+
// When the worker is asked to stop, in-progress jobs have this many
210+
// seconds to finish their work before an AbortSignal is sent.
211+
// Jobs have 3 seconds after the AbortSignal to clean up before an
212+
// abrupt process exit occurs.
213+
timeout: 8,
186214

187-
// the worker id to use in the faktory-server connection
188-
// for this process. must be unique per process.
215+
// The worker id to use in the faktory-server connection
216+
// for this worker. Omit for a pseudo-randomly generated id.
217+
// Must be unique per process.
189218
wid: uuid().first(8),
190219

191-
// labels for the faktory worker process to see in the UI
220+
// Labels for this worker as can be seen in the UI.
192221
labels: [],
193222
});
194223
```
@@ -212,9 +241,8 @@ The function passed to `register` can be a thunk. The registered function will r
212241
```js
213242
faktory.register("JobWithHeaders", (...args) => async ({ job }) => {
214243
const [email] = args;
215-
I18n.locale = job.custom.locale;
216244
log(job.custom.txid);
217-
await sendEmail(email);
245+
await sendEmail(email, { locale: job.custom.locale });
218246
});
219247
```
220248

@@ -271,6 +299,7 @@ faktory.register("TouchRecord", (id) => async ({ db }) => {
271299
- [x] Fail jobs
272300
- [x] Add'l client commands API
273301
- [x] Labels
302+
- [x] AbortController
274303

275304
## Development
276305

0 commit comments

Comments
 (0)