Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
277fdba
Check reviewers are assigned before opening review phase
jmgasper Sep 23, 2025
2014f79
Deploy branch
jmgasper Sep 23, 2025
7d62954
Ignore challenges not active
jmgasper Sep 23, 2025
012c67d
Better handling of phases with the same start and end dates
jmgasper Sep 23, 2025
1309ac1
Add winners at the end of the challenge
jmgasper Sep 23, 2025
18d952e
Logging to DB for easier queries and tracking
jmgasper Sep 23, 2025
52d511b
Fix JSONb usage
jmgasper Sep 24, 2025
71e6def
Tweaks to not complete challenge too early
jmgasper Sep 24, 2025
182f976
Better copmletion tracking
jmgasper Sep 24, 2025
4160579
Completed status check tweaks
jmgasper Sep 24, 2025
c9a887c
Better Iterative Review handling
jmgasper Sep 25, 2025
37df54a
Use bullmq for scheduling instead of native, due to overflow and lack…
jmgasper Sep 25, 2025
7289d3d
Switch to prismatic kafka client and fix an error with bullmq
jmgasper Sep 26, 2025
3968c80
Build fix
jmgasper Sep 26, 2025
becc408
Build fix
jmgasper Sep 26, 2025
1f5d81d
Build fix
jmgasper Sep 26, 2025
09c8163
Minor cleanup and text tweaks
jmgasper Sep 26, 2025
4c44d55
Patch kafka library until https://github.com/platformatic/kafka/issue…
jmgasper Sep 26, 2025
335089a
Build fix
jmgasper Sep 26, 2025
e2bba84
Fix scheduler IDs so bullmq doesn't complain
jmgasper Sep 26, 2025
332ed0d
Pause review phase if reviews are still pending, instead of closing a…
jmgasper Sep 26, 2025
d46e214
Fix up problem where review closes early
jmgasper Sep 26, 2025
bfbdbda
Fix issue with duplicate pending reviews getting created
jmgasper Sep 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ workflows:
branches:
only:
- develop
- reviews

# Production builds are exectuted only on tagged commits to the
# master branch.
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ yarn-error.log*
lerna-debug.log*
.pnpm-debug.log*

# IDE
.vscode/

# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json

Expand Down Expand Up @@ -45,6 +48,7 @@ build/Release
# Dependency directories
node_modules/
jspm_packages/
src/autopilot/generated/

# Snowpack dependency directory (https://snowpack.dev/)
web_modules/
Expand Down
1 change: 1 addition & 0 deletions .nvmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
22
23 changes: 7 additions & 16 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,34 +1,25 @@
# ---- Base Stage ----
FROM node:20-bookworm AS base
FROM node:22-alpine AS base
WORKDIR /usr/src/app

# ---- Dependencies Stage ----
FROM base AS deps
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
python3 \
make \
g++ \
pkg-config \
librdkafka-dev \
&& rm -rf /var/lib/apt/lists/*
COPY package.json ./
COPY pnpm-lock.yaml ./
COPY patches ./patches
COPY prisma ./prisma
RUN yarn install
RUN npm install -g pnpm
RUN pnpm install

# ---- Build Stage ----
FROM deps AS build
COPY . .
RUN yarn prisma:generate
RUN yarn build
RUN pnpm prisma:generate
RUN pnpm build

# ---- Production Stage ----
FROM base AS production
ENV NODE_ENV production
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
librdkafka1 \
&& rm -rf /var/lib/apt/lists/*
COPY --from=build /usr/src/app/dist ./dist
COPY --from=deps /usr/src/app/node_modules ./node_modules
EXPOSE 3000
Expand Down
36 changes: 14 additions & 22 deletions docs/SCHEDULER.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,14 @@ The Autopilot Scheduler is an event-based scheduling system that automatically t

### 1. Event-Based Scheduling Mechanism

The system uses NestJS's `@nestjs/schedule` module with `SchedulerRegistry` for dynamic job management:

```typescript
// Key Dependencies Added
"@nestjs/schedule": "^6.0.0"

// Module Configuration
ScheduleModule.forRoot() // Added to AppModule
```
The system uses BullMQ (Redis-backed queues) for dynamic job management and durable delayed execution.

#### Core Scheduling Features

- **Dynamic Job Registration**: Jobs are created and registered at runtime based on phase end times
- **Unique Job Identification**: Each job uses format `{projectId}:{phaseId}`
- **Automatic Cleanup**: Jobs are automatically removed from the registry after execution or cancellation.
- **Timeout-Based Execution**: Uses `setTimeout` for precise, one-time execution, which is ideal for phase deadlines.
- **Dynamic Job Registration**: Jobs are queued at runtime based on phase end times
- **Unique Job Identification**: Each BullMQ job uses format `{challengeId}:{phaseId}`
- **Automatic Cleanup**: Jobs are automatically removed from the queue after execution or cancellation
- **Durable Delays**: Redis stores the delay, so jobs survive restarts and are not subject to Node.js timer limits

### 2. Event Generation

Expand Down Expand Up @@ -180,27 +172,27 @@ Cancels a scheduled phase transition.

- `projectId`: Challenge project ID
- `phaseId`: Phase ID to cancel
- **Returns:** `true` if cancelled successfully
- **Returns:** `Promise<boolean>` resolving to `true` if cancelled successfully (false when the job has already run)

#### reschedulePhaseTransition(projectId: number, newPhaseData: PhaseTransitionPayload): string
#### reschedulePhaseTransition(projectId: number, newPhaseData: PhaseTransitionPayload): Promise<string>

Updates an existing schedule with new timing information.

**Parameters:**

- `projectId`: Challenge project ID
- `newPhaseData`: Updated phase information
- **Returns:** New job ID
- **Returns:** `Promise<string>` resolving to the new BullMQ job ID

### SchedulerService

#### schedulePhaseTransition(phaseData: PhaseTransitionPayload)
#### schedulePhaseTransition(phaseData: PhaseTransitionPayload): Promise<string>

Low-level job scheduling using NestJS SchedulerRegistry.
Queues a delayed phase transition using BullMQ (backed by Redis) and returns the job ID once scheduled.

#### cancelScheduledTransition(jobId: string): boolean
#### cancelScheduledTransition(jobId: string): Promise<boolean>

Removes a scheduled job by ID.
Removes a scheduled BullMQ job by ID.

#### getAllScheduledTransitions(): string[]

Expand All @@ -226,7 +218,7 @@ const phaseData = {
date: '2025-06-20T23:59:59Z'
};

const jobId = autopilotService.schedulePhaseTransition(phaseData);
const jobId = await autopilotService.schedulePhaseTransition(phaseData);
// Job scheduled, will trigger automatically at specified time
```

Expand All @@ -239,7 +231,7 @@ const updatedData = {
date: '2025-06-21T23:59:59Z' // New end time
};

const newJobId = autopilotService.reschedulePhaseTransition(101, updatedData);
const newJobId = await autopilotService.reschedulePhaseTransition(101, updatedData);
// Old job cancelled, new job scheduled with updated time
```

Expand Down
9 changes: 6 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
"test:cov": "jest --coverage",
"test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand",
"test:e2e": "jest --config ./test/jest-e2e.json",
"prisma:generate": "prisma generate",
"postinstall": "prisma generate"
"prisma:generate": "prisma generate --schema prisma/challenge.schema.prisma && prisma generate --schema prisma/autopilot.schema.prisma",
"postinstall": "prisma generate --schema prisma/challenge.schema.prisma && prisma generate --schema prisma/autopilot.schema.prisma && patch-package",
"prisma:pushautopilot": "prisma db push --schema prisma/autopilot.schema.prisma"
},
"prisma": {
"schema": "prisma/challenge.schema.prisma"
Expand All @@ -37,13 +38,14 @@
"@nestjs/schedule": "^6.0.0",
"@nestjs/swagger": "^7.4.0",
"@nestjs/terminus": "^11.0.0",
"@platformatic/kafka": "^1.14.0",
"@prisma/client": "^6.4.1",
"@types/express": "^5.0.0",
"axios": "^1.9.0",
"bullmq": "^5.58.8",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.2",
"joi": "^17.13.3",
"node-rdkafka": "^2.16.0",
"nest-winston": "^1.10.2",
"passport": "^0.7.0",
"passport-jwt": "^4.0.1",
Expand Down Expand Up @@ -71,6 +73,7 @@
"globals": "^16.0.0",
"jest": "^29.7.0",
"nodemon": "^3.0.0",
"patch-package": "^8.0.0",
"prettier": "^3.4.2",
"source-map-support": "^0.5.21",
"supertest": "^7.0.0",
Expand Down
55 changes: 55 additions & 0 deletions patches/@platformatic+kafka+1.14.0.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
--- a/node_modules/.pnpm/@platformatic+kafka@1.14.0/node_modules/@platformatic/kafka/dist/clients/consumer/consumer.js
+++ b/node_modules/.pnpm/@platformatic+kafka@1.14.0/node_modules/@platformatic/kafka/dist/clients/consumer/consumer.js
@@ -842,14 +842,44 @@
.appendInt32(0).buffer; // No user data
}
#decodeProtocolAssignment(buffer) {
- const reader = Reader.from(buffer);
- reader.skip(2); // Ignore Version information
- return reader.readArray(r => {
- return {
- topic: r.readString(false),
- partitions: r.readArray(r => r.readInt32(), false, false)
- };
- }, false, false);
+ if (!buffer || typeof buffer.length !== 'number' || buffer.length === 0) {
+ return [];
+ }
+ const totalLength = typeof buffer.length === 'number' ? buffer.length : 0;
+ if (totalLength < 2) {
+ return [];
+ }
+ const decode = (compact) => {
+ const reader = Reader.from(buffer);
+ reader.skip(2); // Ignore Version information
+ return reader.readArray(r => {
+ return {
+ topic: r.readString(compact),
+ partitions: r.readArray(r => r.readInt32(), compact, compact)
+ };
+ }, compact, compact);
+ };
+ const shouldFallback = (error) => error?.code === 'PLT_KFK_USER' && error?.message === 'Out of bounds.';
+ const preferCompact = totalLength - 2 < 4;
+ if (!preferCompact) {
+ try {
+ return decode(false);
+ }
+ catch (error) {
+ if (!shouldFallback(error)) {
+ throw error;
+ }
+ }
+ }
+ try {
+ return decode(true);
+ }
+ catch (error) {
+ if (shouldFallback(error)) {
+ return [];
+ }
+ throw error;
+ }
}
#createAssignments(metadata) {
const partitionTracker = new Map();
Loading