|
1 | 1 | # crates_io_worker
|
2 | 2 |
|
3 |
| -This package contains the background job runner for the crates.io application. |
| 3 | +A robust background job processing system for the crates.io application. |
| 4 | + |
| 5 | +## Overview |
| 6 | + |
| 7 | +This crate provides an async PostgreSQL-backed job queue system with support for: |
| 8 | + |
| 9 | +- **Prioritized job execution** with configurable priorities |
| 10 | +- **Job deduplication** to prevent duplicate work |
| 11 | +- **Multiple job queues** with independent worker pools |
| 12 | +- **Automatic retry** with exponential backoff for failed jobs |
| 13 | +- **Graceful shutdown** and queue management |
| 14 | +- **Error tracking** with Sentry integration |
| 15 | + |
| 16 | +## Architecture |
| 17 | + |
| 18 | +The system consists of three main components: |
| 19 | + |
| 20 | +- **`BackgroundJob`** trait - Define job types and their execution logic |
| 21 | +- **`Runner`** - High-level orchestrator that manages multiple queues and their worker pools |
| 22 | +- **`Worker`** - Low-level executor that polls for and processes individual jobs |
| 23 | + |
| 24 | +### Runner vs Worker |
| 25 | + |
| 26 | +- **`Runner`** is the entry point and orchestrator: |
| 27 | + - Manages multiple named queues (e.g., "default", "emails", "indexing") |
| 28 | + - Spawns and coordinates multiple `Worker` instances per queue |
| 29 | + - Handles job type registration and queue configuration |
| 30 | + - Provides graceful shutdown coordination across all workers |
| 31 | + |
| 32 | +- **`Worker`** is the actual job processor: |
| 33 | + - Polls the database for available jobs in a specific queue |
| 34 | + - Locks individual jobs to prevent concurrent execution |
| 35 | + - Executes job logic with error handling and retry logic |
| 36 | + - Reports job completion or failure back to the database |
| 37 | + |
| 38 | +Jobs are stored in the `background_jobs` PostgreSQL table and processed asynchronously by worker instances that poll for available work in their assigned queues. |
| 39 | + |
| 40 | +### Job Processing and Locking |
| 41 | + |
| 42 | +When a worker picks up a job from the database, the table row is immediately locked to prevent other workers from processing the same job concurrently. This ensures that: |
| 43 | + |
| 44 | +- Each job is processed exactly once, even with multiple workers running |
| 45 | +- Failed jobs can be safely retried without duplication |
| 46 | +- The system scales horizontally by adding more worker processes |
| 47 | + |
| 48 | +Once job execution completes successfully, the row is deleted from the table. If the job fails, the row remains with updated retry information for future processing attempts. |
| 49 | + |
| 50 | +## Database Schema |
| 51 | + |
| 52 | +```sql |
| 53 | +CREATE TABLE background_jobs ( |
| 54 | + id BIGSERIAL PRIMARY KEY, |
| 55 | + job_type TEXT NOT NULL, |
| 56 | + data JSONB NOT NULL, |
| 57 | + retries INTEGER NOT NULL DEFAULT 0, |
| 58 | + last_retry TIMESTAMP NOT NULL DEFAULT NOW(), |
| 59 | + created_at TIMESTAMP NOT NULL DEFAULT NOW(), |
| 60 | + priority SMALLINT NOT NULL DEFAULT 0 |
| 61 | +); |
| 62 | +``` |
| 63 | + |
| 64 | +## Usage |
| 65 | + |
| 66 | +### Defining a Job |
| 67 | + |
| 68 | +```rust |
| 69 | +use crates_io_worker::BackgroundJob; |
| 70 | +use serde::{Deserialize, Serialize}; |
| 71 | + |
| 72 | +#[derive(Serialize, Deserialize)] |
| 73 | +struct SendEmailJob { |
| 74 | + to: String, |
| 75 | + subject: String, |
| 76 | + body: String, |
| 77 | +} |
| 78 | + |
| 79 | +impl BackgroundJob for SendEmailJob { |
| 80 | + const JOB_NAME: &'static str = "send_email"; |
| 81 | + const PRIORITY: i16 = 10; |
| 82 | + const DEDUPLICATED: bool = false; |
| 83 | + const QUEUE: &'static str = "emails"; |
| 84 | + |
| 85 | + type Context = AppContext; |
| 86 | + |
| 87 | + async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> { |
| 88 | + // Job implementation |
| 89 | + ctx.email_service.send(&self.to, &self.subject, &self.body).await?; |
| 90 | + Ok(()) |
| 91 | + } |
| 92 | +} |
| 93 | +``` |
| 94 | + |
| 95 | +### Running the Worker |
| 96 | + |
| 97 | +```rust |
| 98 | +use crates_io_worker::Runner; |
| 99 | + |
| 100 | +let runner = Runner::new(connection_pool, app_context) |
| 101 | + .register_job_type::<SendEmailJob>() |
| 102 | + .configure_queue("emails", |queue| { |
| 103 | + queue.num_workers(2).poll_interval(Duration::from_secs(5)) |
| 104 | + }); |
| 105 | + |
| 106 | +runner.run().await; |
| 107 | +``` |
| 108 | + |
| 109 | +### Enqueuing Jobs |
| 110 | + |
| 111 | +```rust |
| 112 | +let job = SendEmailJob { |
| 113 | + to: "user@example.com".to_string(), |
| 114 | + subject: "Welcome!".to_string(), |
| 115 | + body: "Thanks for signing up!".to_string(), |
| 116 | +}; |
| 117 | + |
| 118 | +job.enqueue(&mut conn).await?; |
| 119 | +``` |
| 120 | + |
| 121 | +## Configuration |
| 122 | + |
| 123 | +### Job Properties |
| 124 | + |
| 125 | +- **`JOB_NAME`**: Unique identifier for the job type |
| 126 | +- **`PRIORITY`**: Execution priority (higher values = higher priority) |
| 127 | +- **`DEDUPLICATED`**: Whether to prevent duplicate jobs with identical data |
| 128 | +- **`QUEUE`**: Queue name for job execution (defaults to "default") |
| 129 | + |
| 130 | +### Queue Configuration |
| 131 | + |
| 132 | +- **Worker count**: Number of concurrent workers per queue |
| 133 | +- **Poll interval**: How often workers check for new jobs |
| 134 | +- **Shutdown behavior**: Whether to stop when queue is empty |
| 135 | + |
| 136 | +## Error Handling |
| 137 | + |
| 138 | +Failed jobs are automatically retried with exponential backoff. The retry count and last retry timestamp are tracked in the database. Jobs that continue to fail will eventually be abandoned after reaching the maximum retry limit. |
| 139 | + |
| 140 | +All job execution is instrumented with tracing and optionally reported to Sentry for error monitoring. |
| 141 | + |
| 142 | +## History |
4 | 143 |
|
5 | 144 | The implementation was originally extracted from crates.io into the separate
|
6 | 145 | [`swirl`](https://github.com/sgrif/swirl) project, but has since been
|
7 |
| -re-integrated and heavily modified. |
8 |
| - |
9 |
| -The background worker uses a `background_jobs` PostgreSQL table to store jobs |
10 |
| -that need to be run. Once a job is picked up by a worker, the table row is |
11 |
| -locked, and the job is run. If the job fails, it will be retried with |
12 |
| -exponential backoff. If the job succeeds, the row will be deleted. |
| 146 | +re-integrated and heavily modified to meet the specific needs of the crates.io platform. |
0 commit comments