From 1faf3257c9b5aa836ee8a9e0e8b60c1ce954a902 Mon Sep 17 00:00:00 2001 From: Vasilica Olariu Date: Fri, 3 Oct 2025 13:39:56 +0300 Subject: [PATCH 1/9] PM-1551 - integrate the AI workflow events --- package.json | 5 +- pnpm-lock.yaml | 144 +++++++++++++ .../migration.sql | 2 + prisma/schema.prisma | 2 +- src/api/webhook/webhook.service.ts | 198 ++++++++++++++++-- src/shared/clients/gitea/gitea.client.ts | 4 +- .../modules/global/challenge.service.ts | 170 +++------------ src/shared/modules/global/gitea.service.ts | 85 +++++++- .../modules/global/globalProviders.module.ts | 3 + .../modules/global/queue-scheduler.service.ts | 89 ++++++++ .../submission-scan-complete.orchestrator.ts | 124 ++++++++--- 11 files changed, 620 insertions(+), 206 deletions(-) create mode 100644 prisma/migrations/20251003103752_rename_workflow_error_to_scheduledjobid/migration.sql create mode 100644 src/shared/modules/global/queue-scheduler.service.ts diff --git a/package.json b/package.json index d8857e4..0f6f9fa 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ "@platformatic/kafka": "^1.12.0", "@prisma/client": "^6.3.1", "@types/jsonwebtoken": "^9.0.9", + "archiver": "^6.0.2", "axios": "^1.9.0", "class-transformer": "^0.5.1", "class-validator": "^0.14.1", @@ -42,10 +43,10 @@ "lodash": "^4.17.21", "multer": "^2.0.1", "nanoid": "~5.1.2", + "pg-boss": "^11.0.2", "reflect-metadata": "^0.2.2", "rxjs": "^7.8.1", - "tc-core-library-js": "appirio-tech/tc-core-library-js.git#v3.0.1", - "archiver": "^6.0.2" + "tc-core-library-js": "appirio-tech/tc-core-library-js.git#v3.0.1" }, "devDependencies": { "@eslint/eslintrc": "^3.2.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8ff708c..508952d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -71,6 +71,9 @@ importers: nanoid: specifier: ~5.1.2 version: 5.1.2 + pg-boss: + specifier: ^11.0.2 + version: 11.0.2 reflect-metadata: specifier: ^0.2.2 version: 0.2.2 @@ -2563,6 +2566,10 @@ packages: create-require@1.1.1: resolution: {integrity: sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==} + cron-parser@4.9.0: + resolution: {integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==} + engines: {node: '>=12.0.0'} + cross-spawn@7.0.6: resolution: {integrity: sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==} engines: {node: '>= 8'} @@ -3647,6 +3654,10 @@ packages: lru-memoizer@2.3.0: resolution: {integrity: sha512-GXn7gyHAMhO13WSKrIiNfztwxodVsP8IoZ3XfrJV4yH2x0/OeTO/FIaAHTY5YekdGgW94njfuKmyyt1E0mR6Ug==} + luxon@3.7.2: + resolution: {integrity: sha512-vtEhXh/gNjI9Yg1u4jX/0YVPMvxzHuGgCm6tC5kZyb08yjGWGnqAjGJvcXbqQR2P3MyMEFnRbpcdFS6PBcLqew==} + engines: {node: '>=12'} + lz4-napi@2.9.0: resolution: {integrity: sha512-ZOWqxBMIK5768aD20tYn5B6Pp9WPM9UG/LHk8neG9p0gC1DtjdzhTtlkxhAjvTRpmJvMtnnqLKlT+COlqAt9cQ==} engines: {node: '>= 10'} @@ -4011,6 +4022,44 @@ packages: perfect-debounce@1.0.0: resolution: {integrity: sha512-xCy9V055GLEqoFaHoC1SoLIaLmWctgCUaBaWxDZ7/Zx4CTyX7cJQLJOok/orfjZAh9kEYpjJa4d0KcJmCbctZA==} + pg-boss@11.0.2: + resolution: {integrity: sha512-33KQtJpBvsF1C0zkMk2fGKZAptssrSxlban8CqSLJkoY5cwhiJwZL8uOv4T4OkZgbmlcy3nDdBdYOnlRFM+Qcw==} + engines: {node: '>=22'} + + pg-cloudflare@1.2.7: + resolution: {integrity: sha512-YgCtzMH0ptvZJslLM1ffsY4EuGaU0cx4XSdXLRFae8bPP4dS5xL1tNB3k2o/N64cHJpwU7dxKli/nZ2lUa5fLg==} + + pg-connection-string@2.9.1: + resolution: {integrity: sha512-nkc6NpDcvPVpZXxrreI/FOtX3XemeLl8E0qFr6F2Lrm/I8WOnaWNhIPK2Z7OHpw7gh5XJThi6j6ppgNoaT1w4w==} + + pg-int8@1.0.1: + resolution: {integrity: sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==} + engines: {node: '>=4.0.0'} + + pg-pool@3.10.1: + resolution: {integrity: sha512-Tu8jMlcX+9d8+QVzKIvM/uJtp07PKr82IUOYEphaWcoBhIYkoHpLXN3qO59nAI11ripznDsEzEv8nUxBVWajGg==} + peerDependencies: + pg: '>=8.0' + + pg-protocol@1.10.3: + resolution: {integrity: sha512-6DIBgBQaTKDJyxnXaLiLR8wBpQQcGWuAESkRBX/t6OwA8YsqP+iVSiond2EDy6Y/dsGk8rh/jtax3js5NeV7JQ==} + + pg-types@2.2.0: + resolution: {integrity: sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==} + engines: {node: '>=4'} + + pg@8.16.3: + resolution: {integrity: sha512-enxc1h0jA/aq5oSDMvqyW3q89ra6XIIDZgCX9vkMrnz5DFTw/Ny3Li2lFQ+pt3L6MCgm/5o2o8HW9hiJji+xvw==} + engines: {node: '>= 16.0.0'} + peerDependencies: + pg-native: '>=3.0.1' + peerDependenciesMeta: + pg-native: + optional: true + + pgpass@1.0.5: + resolution: {integrity: sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==} + picocolors@1.1.1: resolution: {integrity: sha512-xceH2snhtb5M9liqDsmEw56le376mTZkEX/jEb/RxNFyegNul7eNslCXP9FDj/Lcu0X8KEyMceP2ntpaHrDEVA==} @@ -4040,6 +4089,22 @@ packages: resolution: {integrity: sha512-Nc3IT5yHzflTfbjgqWcCPpo7DaKy4FnpB0l/zCAW0Tc7jxAiuqSxHasntB3D7887LSrA93kDJ9IXovxJYxyLCA==} engines: {node: '>=4'} + postgres-array@2.0.0: + resolution: {integrity: sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==} + engines: {node: '>=4'} + + postgres-bytea@1.0.0: + resolution: {integrity: sha512-xy3pmLuQqRBZBXDULy7KbaitYqLcmxigw14Q5sj8QBVLqEwXfeybIKVWiqAXTlcvdvb0+xkOtDbfQMOf4lST1w==} + engines: {node: '>=0.10.0'} + + postgres-date@1.0.7: + resolution: {integrity: sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==} + engines: {node: '>=0.10.0'} + + postgres-interval@1.2.0: + resolution: {integrity: sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==} + engines: {node: '>=0.10.0'} + precond@0.2.3: resolution: {integrity: sha512-QCYG84SgGyGzqJ/vlMsxeXd/pgL/I94ixdNFyh1PusWmTCyVfPJjZ1K1jvHtsbfnXQs2TSkEP2fR7QiMZAnKFQ==} engines: {node: '>= 0.6'} @@ -4285,6 +4350,10 @@ packages: resolution: {integrity: sha512-uaW0WwXKpL9blXE2o0bRhoL2EGXIrZxQ2ZQ4mgcfoBxdFmQold+qWsD2jLrfZ0trjKL6vOw0j//eAwcALFjKSw==} engines: {node: '>= 18'} + serialize-error@8.1.0: + resolution: {integrity: sha512-3NnuWfM6vBYoy5gZFvHiYsVbafvI9vZv/+jlIigFn4oP4zjNPK3LhcY0xSCgeb1a5L8jO71Mit9LlNoi2UfDDQ==} + engines: {node: '>=10'} + serialize-javascript@6.0.2: resolution: {integrity: sha512-Saa1xPByTTq2gdeFZYLLo+RFE35NHZkAbqZeWNd3BpzppeVisAqpDjcp8dyf6uIvEqJRd46jemmyA4iFIeVk8g==} @@ -4384,6 +4453,10 @@ packages: resolution: {integrity: sha512-l3BikUxvPOcn5E74dZiq5BGsTb5yEwhaTSzccU6t4sDOH8NWJCstKO5QT2CvtFoK6F0saL7p9xHAqHOlCPJygA==} engines: {node: '>= 8'} + split2@4.2.0: + resolution: {integrity: sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==} + engines: {node: '>= 10.x'} + sprintf-js@1.0.3: resolution: {integrity: sha512-D9cPgkvLlV3t3IzL0D0YLvGA9Ahk4PcvVwUbN0dSGr1aP0Nrt4AEnTUbuGvquEC0mA64Gqt1fzirlRs5ibXx8g==} @@ -4652,6 +4725,10 @@ packages: resolution: {integrity: sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==} engines: {node: '>=4'} + type-fest@0.20.2: + resolution: {integrity: sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==} + engines: {node: '>=10'} + type-fest@0.21.3: resolution: {integrity: sha512-t0rzBq87m3fVcduHDUFhKmyyX+9eo6WQjZvf51Ea/M0Q7+T374Jp1aUiyUl0GKxp8M/OETVHSDvmkyPgvX+X2w==} engines: {node: '>=10'} @@ -7828,6 +7905,10 @@ snapshots: create-require@1.1.1: {} + cron-parser@4.9.0: + dependencies: + luxon: 3.7.2 + cross-spawn@7.0.6: dependencies: path-key: 3.1.1 @@ -9184,6 +9265,8 @@ snapshots: lodash.clonedeep: 4.5.0 lru-cache: 6.0.0 + luxon@3.7.2: {} + lz4-napi@2.9.0: dependencies: '@node-rs/helper': 1.6.0 @@ -9528,6 +9611,49 @@ snapshots: perfect-debounce@1.0.0: {} + pg-boss@11.0.2: + dependencies: + cron-parser: 4.9.0 + pg: 8.16.3 + serialize-error: 8.1.0 + transitivePeerDependencies: + - pg-native + + pg-cloudflare@1.2.7: + optional: true + + pg-connection-string@2.9.1: {} + + pg-int8@1.0.1: {} + + pg-pool@3.10.1(pg@8.16.3): + dependencies: + pg: 8.16.3 + + pg-protocol@1.10.3: {} + + pg-types@2.2.0: + dependencies: + pg-int8: 1.0.1 + postgres-array: 2.0.0 + postgres-bytea: 1.0.0 + postgres-date: 1.0.7 + postgres-interval: 1.2.0 + + pg@8.16.3: + dependencies: + pg-connection-string: 2.9.1 + pg-pool: 3.10.1(pg@8.16.3) + pg-protocol: 1.10.3 + pg-types: 2.2.0 + pgpass: 1.0.5 + optionalDependencies: + pg-cloudflare: 1.2.7 + + pgpass@1.0.5: + dependencies: + split2: 4.2.0 + picocolors@1.1.1: {} picomatch@2.3.1: {} @@ -9552,6 +9678,16 @@ snapshots: pluralize@8.0.0: {} + postgres-array@2.0.0: {} + + postgres-bytea@1.0.0: {} + + postgres-date@1.0.7: {} + + postgres-interval@1.2.0: + dependencies: + xtend: 4.0.2 + precond@0.2.3: {} prelude-ls@1.2.1: {} @@ -9813,6 +9949,10 @@ snapshots: transitivePeerDependencies: - supports-color + serialize-error@8.1.0: + dependencies: + type-fest: 0.20.2 + serialize-javascript@6.0.2: dependencies: randombytes: 2.1.0 @@ -9953,6 +10093,8 @@ snapshots: source-map@0.7.4: {} + split2@4.2.0: {} + sprintf-js@1.0.3: {} stack-trace@0.0.10: {} @@ -10265,6 +10407,8 @@ snapshots: type-detect@4.0.8: {} + type-fest@0.20.2: {} + type-fest@0.21.3: {} type-is@1.6.18: diff --git a/prisma/migrations/20251003103752_rename_workflow_error_to_scheduledjobid/migration.sql b/prisma/migrations/20251003103752_rename_workflow_error_to_scheduledjobid/migration.sql new file mode 100644 index 0000000..8d1577d --- /dev/null +++ b/prisma/migrations/20251003103752_rename_workflow_error_to_scheduledjobid/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "aiWorkflowRun" RENAME COLUMN "error" TO "scheduledJobId"; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 2c101d7..af30372 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -629,7 +629,7 @@ model aiWorkflowRun { score Float? @db.DoublePrecision status String @db.VarChar usage Json? @db.JsonB - error String? @db.Text + scheduledJobId String? @db.Text completedJobs Int? @default(0) jobsCount Int? @default(0) diff --git a/src/api/webhook/webhook.service.ts b/src/api/webhook/webhook.service.ts index 257e8ba..afd7f5e 100644 --- a/src/api/webhook/webhook.service.ts +++ b/src/api/webhook/webhook.service.ts @@ -6,6 +6,9 @@ import { WebhookEventDto, WebhookResponseDto, } from '../../dto/webhook-event.dto'; +import { QueueSchedulerService } from 'src/shared/modules/global/queue-scheduler.service'; +import { GiteaService } from 'src/shared/modules/global/gitea.service'; +import { aiWorkflowRun } from '@prisma/client'; @Injectable() export class WebhookService { @@ -14,6 +17,8 @@ export class WebhookService { constructor( private readonly prisma: PrismaService, private readonly prismaErrorService: PrismaErrorService, + private readonly scheduler: QueueSchedulerService, + private readonly giteaService: GiteaService, ) {} async processWebhook( @@ -45,7 +50,7 @@ export class WebhookService { }); // Future extensibility: Add event-specific handlers here - this.handleEventSpecificProcessing( + await this.handleEventSpecificProcessing( webhookEvent.event, webhookEvent.eventPayload, ); @@ -76,7 +81,10 @@ export class WebhookService { * Placeholder for future event-specific processing logic * This method can be extended to handle different GitHub events differently */ - private handleEventSpecificProcessing(event: string, payload: any): void { + private async handleEventSpecificProcessing( + event: string, + payload: any, + ): Promise { this.logger.log({ message: 'Event-specific processing placeholder', event, @@ -84,19 +92,22 @@ export class WebhookService { }); // Future implementation examples: - // switch (event) { - // case 'push': - // await this.handlePushEvent(payload); - // break; - // case 'pull_request': - // await this.handlePullRequestEvent(payload); - // break; - // case 'issues': - // await this.handleIssuesEvent(payload); - // break; - // default: - // this.logger.log(`No specific handler for event type: ${event}`); - // } + switch (event) { + case 'workflow_job': + await this.handleWorkflowEvents(event, payload); + break; + // case 'push': + // await this.handlePushEvent(payload); + // break; + // case 'pull_request': + // await this.handlePullRequestEvent(payload); + // break; + // case 'issues': + // await this.handleIssuesEvent(payload); + // break; + default: + this.logger.log(`No specific handler for event type: ${event}`); + } } /** @@ -171,4 +182,161 @@ export class WebhookService { throw error; } } + + async handleWorkflowEvents(event: string, payload: any) { + const aiWorkflowRuns = await this.prisma.aiWorkflowRun.findMany({ + where: { + status: { in: ['DISPATCHED', 'IN_PROGRESS'] }, + gitRunId: `${payload.workflow_job.run_id}`, + }, + include: { + workflow: true, + }, + }); + + if (aiWorkflowRuns.length > 1) { + this.logger.error( + `ERROR! There are more than 1 workflow runs in DISPATCHED status and workflow.gitWorkflowId=${payload.workflow_job.name}!`, + ); + return; + } + + let [aiWorkflowRun]: (aiWorkflowRun | null)[] = aiWorkflowRuns; + + if ( + !aiWorkflowRun && + payload.action === 'in_progress' && + payload.workflow_job.name === 'dump-workflow-context' + ) { + const [owner, repo] = payload.repository.full_name.split('/'); + const { aiWorkflowRunId, jobsCount } = + (await this.giteaService.getAiWorkflowDataFromLogs( + owner, + repo, + payload.workflow_job.id as number, + )) ?? ({} as any); + + if (!aiWorkflowRunId) { + this.logger.error( + `Failed to find workflow run ID from logs for job with id ${payload.workflow_job.id}`, + ); + return; + } + + aiWorkflowRun = await this.prisma.aiWorkflowRun.findUnique({ + where: { + id: aiWorkflowRunId, + }, + include: { + workflow: true, + }, + }); + + if (!aiWorkflowRun || aiWorkflowRun.status !== 'DISPATCHED') { + this.logger.error( + `Workflow run with id ${aiWorkflowRunId} is not in DISPATCHED status or not found. Status: ${aiWorkflowRun?.status}`, + ); + return; + } + + await this.prisma.aiWorkflowRun.update({ + where: { id: aiWorkflowRunId }, + data: { + gitRunId: `${payload.workflow_job.run_id}`, + jobsCount, + completedJobs: { increment: 1 }, + }, + }); + + this.logger.log({ + message: 'Updated aiWorkflowRun with gitRunId after lookup', + aiWorkflowRunId, + gitRunId: payload.workflow_job.run_id, + jobId: payload.workflow_job.id, + }); + } + + if (!aiWorkflowRun) { + this.logger.error({ + message: 'No matching aiWorkflowRun found for workflow_job event', + event, + workflowJobId: payload.workflow_job.id, + gitRunId: payload.workflow_job.run_id, + gitJobStatus: payload.action, + }); + + return; + } + + if (payload.workflow_job.name === 'dump-workflow-context') { + // no further processing needed, this job is meant to sync our db run with the git run + return; + } + + switch (payload.action) { + case 'in_progress': + if (aiWorkflowRun.status !== 'DISPATCHED') { + break; + } + + await this.prisma.aiWorkflowRun.update({ + where: { id: aiWorkflowRun.id }, + data: { + status: 'IN_PROGRESS', + startedAt: new Date(), + }, + }); + this.logger.log({ + message: 'Workflow job is now in progress', + aiWorkflowRunId: aiWorkflowRun.id, + gitRunId: payload.workflow_job.run_id, + jobId: payload.workflow_job.id, + status: 'IN_PROGRESS', + timestamp: new Date().toISOString(), + }); + break; + case 'completed': + // we need to mark the run as completed only when the last job in the run has been completed + if ( + (aiWorkflowRun.completedJobs ?? 0) + 1 !== + aiWorkflowRun.jobsCount + ) { + await this.prisma.aiWorkflowRun.update({ + where: { id: aiWorkflowRun.id }, + data: { + completedJobs: { increment: 1 }, + }, + }); + this.logger.log( + `Workflow job ${(aiWorkflowRun.completedJobs ?? 0) + 1}/${aiWorkflowRun.jobsCount} completed.`, + ); + break; + } + + await this.prisma.aiWorkflowRun.update({ + where: { id: aiWorkflowRun.id }, + data: { + status: payload.workflow_job.conclusion.toUpperCase(), + completedAt: new Date(), + completedJobs: { increment: 1 }, + }, + }); + await this.scheduler.completeJob( + (aiWorkflowRun as any).workflow.gitWorkflowId, + aiWorkflowRun.scheduledJobId as string, + ); + + this.logger.log({ + message: 'Workflow job completed', + aiWorkflowRunId: aiWorkflowRun.id, + gitRunId: payload.workflow_job.run_id, + jobId: payload.workflow_job.id, + status: payload.workflow_job.conclusion.toUpperCase(), + timestamp: new Date().toISOString(), + }); + break; + default: + break; + } + } } diff --git a/src/shared/clients/gitea/gitea.client.ts b/src/shared/clients/gitea/gitea.client.ts index c433a94..75b868d 100644 --- a/src/shared/clients/gitea/gitea.client.ts +++ b/src/shared/clients/gitea/gitea.client.ts @@ -5734,9 +5734,9 @@ export class Api< jobId: number, params: RequestParams = {}, ) => - this.request({ + this.request({ path: `/repos/${owner}/${repo}/actions/jobs/${jobId}/logs`, - method: "GET", + method: 'GET', secure: true, ...params, }), diff --git a/src/shared/modules/global/challenge.service.ts b/src/shared/modules/global/challenge.service.ts index 9cc6061..cd37f7f 100644 --- a/src/shared/modules/global/challenge.service.ts +++ b/src/shared/modules/global/challenge.service.ts @@ -34,9 +34,13 @@ export class ChallengeData { } export class WorkflowData { - workflowId: string; - ref: string; - params: Record; + id: string; + name: string; + description: string; + llmId: string; + defUrl: string; + gitOwnerRepo: string; + scorecardId: string; } interface ChallengeRow { @@ -88,6 +92,7 @@ interface ChallengeAggregate { track?: ChallengeTrackRow; phases: ChallengePhaseRow[]; metadata: ChallengeMetadataRow[]; + workflows: WorkflowData[]; } @Injectable() @@ -169,6 +174,22 @@ export class ChallengeApiService { WHERE "challengeId" = ${challengeId} `; + const workflows = await this.challengePrisma.$queryRaw` + SELECT + id, + name, + description, + "llmId", + "defUrl", + "gitOwnerRepo", + "scorecardId" + FROM reviews."aiWorkflow" + WHERE id IN ( + SELECT "aiWorkflowId" FROM "ChallengeReviewer" + WHERE "isMemberReview"=false AND "challengeId" = ${challengeId} + ) + `; + const metadata = await this.challengePrisma.$queryRaw< ChallengeMetadataRow[] >` @@ -184,6 +205,7 @@ export class ChallengeApiService { track, phases, metadata, + workflows, }); } catch (error) { this.logger.error( @@ -195,7 +217,7 @@ export class ChallengeApiService { } private mapChallenge(aggregate: ChallengeAggregate): ChallengeData { - const { challenge, legacy, type, track, phases, metadata } = aggregate; + const { challenge, legacy, type, track, phases, workflows } = aggregate; const mappedPhases = phases?.map((phase) => ({ id: phase.id, @@ -214,7 +236,7 @@ export class ChallengeApiService { } : undefined; - const workflows = this.extractWorkflows(metadata, challenge.id); + // const workflows = this.extractWorkflows(metadata, challenge.id); const legacyId = challenge.legacyId ?? legacy?.legacySystemId; @@ -246,144 +268,6 @@ export class ChallengeApiService { }; } - private extractWorkflows( - metadata: ChallengeMetadataRow[], - challengeId: string, - ): WorkflowData[] | undefined { - if (!Array.isArray(metadata) || !metadata.length) { - return undefined; - } - - const workflowEntry = metadata.find( - (meta) => meta.name?.toLowerCase() === 'workflows', - ); - - if (!workflowEntry?.value) { - return undefined; - } - - try { - const parsed: unknown = JSON.parse(workflowEntry.value); - const workflowEntries = this.getWorkflowEntries(parsed); - - if (!workflowEntries?.length) { - return undefined; - } - - const workflows = this.parseWorkflowEntries(workflowEntries); - - if (!workflows.length) { - return undefined; - } - - return workflows; - } catch (error) { - this.logger.warn( - `Unable to parse workflows metadata for challenge ${challengeId}: ${error}`, - ); - return undefined; - } - } - - private getWorkflowEntries(parsed: unknown): unknown[] | undefined { - if (Array.isArray(parsed)) { - return parsed as unknown[]; - } - - if ( - parsed && - typeof parsed === 'object' && - Array.isArray((parsed as { workflows?: unknown }).workflows) - ) { - return (parsed as { workflows: unknown[] }).workflows; - } - - return undefined; - } - - private toWorkflowData(entry: unknown): WorkflowData | undefined { - if (!entry || typeof entry !== 'object') { - return undefined; - } - - const candidate = entry as { - workflowId?: unknown; - ref?: unknown; - params?: unknown; - }; - - if (candidate.workflowId == null || candidate.ref == null) { - return undefined; - } - - if ( - typeof candidate.workflowId !== 'string' && - typeof candidate.workflowId !== 'number' - ) { - return undefined; - } - - if ( - typeof candidate.ref !== 'string' && - typeof candidate.ref !== 'number' - ) { - return undefined; - } - - return { - workflowId: String(candidate.workflowId), - ref: String(candidate.ref), - params: this.coerceWorkflowParams(candidate.params), - }; - } - - private coerceWorkflowParams(input: unknown): Record { - if (!input || typeof input !== 'object' || Array.isArray(input)) { - return {}; - } - - const params: Record = {}; - for (const [key, value] of Object.entries( - input as Record, - )) { - if (value == null) { - continue; - } - - if (typeof value === 'object') { - this.logger.warn(`Skipping workflow parameter for key "${key}"`); - continue; - } - - if ( - typeof value !== 'string' && - typeof value !== 'number' && - typeof value !== 'boolean' - ) { - this.logger.warn( - `Skipping workflow parameter for key "${key}" due to unsupported type`, - ); - continue; - } - - params[key] = String(value); - } - - return params; - } - - private parseWorkflowEntries(entries: unknown[]): WorkflowData[] { - const workflows: WorkflowData[] = []; - for (const entry of entries) { - const parsedWorkflow = this.toWorkflowData(entry); - if (parsedWorkflow) { - workflows.push(parsedWorkflow); - } - } - - return workflows; - } - /** * Check if one of the specified phases is currently open for a challenge */ diff --git a/src/shared/modules/global/gitea.service.ts b/src/shared/modules/global/gitea.service.ts index 923681a..2ee5723 100644 --- a/src/shared/modules/global/gitea.service.ts +++ b/src/shared/modules/global/gitea.service.ts @@ -1,6 +1,6 @@ import { Injectable, Logger } from '@nestjs/common'; import { Api, Repository } from 'src/shared/clients/gitea/gitea.client'; -import { WorkflowData } from './challenge.service'; +import { aiWorkflow, aiWorkflowRun } from '@prisma/client'; /** * GiteaService handles interactions with the Gitea API, specifically for managing repositories. @@ -86,30 +86,93 @@ export class GiteaService { * @param challengeId The ID of the challenge (same as repo). */ async runDispatchWorkflow( - owner: string, - workflow: WorkflowData, - challengeId: string, + workflow: aiWorkflow, + workflowRun: aiWorkflowRun, + dispatchInputs: any, ): Promise { this.logger.log( - `Running workflow: ${workflow.workflowId} with ref: ${workflow.ref}`, + `Running workflow ${workflowRun.workflowId} for submission ${workflowRun.submissionId}`, ); + const [owner, repo] = workflow.gitOwnerRepo.split('/'); + this.logger.log(`Calling dispatch`, { + owner, + repo, + workflowId: workflow.gitWorkflowId, + inputs: dispatchInputs, + }); + try { const response = await this.giteaClient.repos.actionsDispatchWorkflow( owner, - challengeId, - workflow.workflowId, + repo, + workflow.gitWorkflowId, { - ref: workflow.ref, - inputs: workflow.params, + ref: 'refs/heads/main', + inputs: dispatchInputs, }, ); // successful execution of workflow dispatch actually just returns "204 No Content". So we only log status. - this.logger.log(`Workflow dispatched successfully: ${response.status}`); + this.logger.log( + `Workflow dispatched successfully: ${response.status} ${response.statusText}`, + JSON.stringify(response.data), + ); } catch (error) { this.logger.error( - `Error dispatching workflow ${workflow.workflowId}: ${error.message}`, + `Error dispatching workflow ${workflowRun.workflowId}: ${error.message}`, + error, ); throw error; } } + + async getAiWorkflowDataFromLogs( + owner: string, + repo: string, + jobId: number, + retry = 0, + ): Promise<{ aiWorkflowRunId: string; jobsCount: number } | null> { + // 120 re-tryies means ~60seconds (1/500ms) + if (retry >= 120) { + this.logger.error( + `Error retrieving logs for job ${jobId}. retry limit reached!`, + ); + return null; + } + + let logs: string; + try { + logs = ( + await this.giteaClient.repos.downloadActionsRunJobLogs( + owner, + repo, + jobId, + ) + ).data; + + const match = logs.match(/::AI_WORKFLOW_RUN_ID::\s*([a-z0-9-_]{9,})/i); + if (!match?.[1]) { + throw new Error('not found aiWorkflowRunId'); + } + const aiWorkflowRunId = match[1]; + + const jobCountMatch = logs.match(/::JOB_COUNT::(\d+)/i); + const jobsCount = parseInt(jobCountMatch?.[1] ?? '1'); + + this.logger.log('Fetched aiWorkflowRun data from logs:', { + jobsCount, + aiWorkflowRunId, + }); + + return { + aiWorkflowRunId, + jobsCount, + }; + } catch { + // not handling specific errors because API will throw 500 error before the job is queued + // and 404 after it started but no logs are available + // so, seems reasonable to treat it the same + await new Promise((resolve) => setTimeout(resolve, 500)); + return this.getAiWorkflowDataFromLogs(owner, repo, jobId, retry + 1); + } + } } diff --git a/src/shared/modules/global/globalProviders.module.ts b/src/shared/modules/global/globalProviders.module.ts index ae85c43..1c9f28b 100644 --- a/src/shared/modules/global/globalProviders.module.ts +++ b/src/shared/modules/global/globalProviders.module.ts @@ -20,6 +20,7 @@ import { ChallengeCatalogService } from './challenge-catalog.service'; import { SubmissionService } from 'src/api/submission/submission.service'; import { ChallengePrismaService } from './challenge-prisma.service'; import { MemberPrismaService } from './member-prisma.service'; +import { QueueSchedulerService } from './queue-scheduler.service'; // Global module for providing global providers // Add any provider you want to be global here @@ -53,6 +54,7 @@ import { MemberPrismaService } from './member-prisma.service'; GiteaService, SubmissionScanCompleteOrchestrator, SubmissionService, + QueueSchedulerService, ], exports: [ PrismaService, @@ -71,6 +73,7 @@ import { MemberPrismaService } from './member-prisma.service'; SubmissionBaseService, GiteaService, SubmissionScanCompleteOrchestrator, + QueueSchedulerService, ], }) export class GlobalProvidersModule {} diff --git a/src/shared/modules/global/queue-scheduler.service.ts b/src/shared/modules/global/queue-scheduler.service.ts new file mode 100644 index 0000000..649fc10 --- /dev/null +++ b/src/shared/modules/global/queue-scheduler.service.ts @@ -0,0 +1,89 @@ +import { + Injectable, + Logger, + OnModuleDestroy, + OnModuleInit, +} from '@nestjs/common'; +import * as PgBoss from 'pg-boss'; +import { policies, Queue } from 'pg-boss'; + +/** + * QueueSchedulerService + */ +@Injectable() +export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { + private readonly logger: Logger = new Logger(QueueSchedulerService.name); + private boss: PgBoss; + + private tasksMap = new Map void>(); + + constructor() { + this.logger.log('QueueSchedulerService initialized'); + this.boss = new PgBoss(process.env.PG_BOSS_DB_URL!); + this.boss.on('error', (err) => this.logger.error('pg-boss error:', err)); + } + + async onModuleInit() { + await this.boss.start(); + } + + async onModuleDestroy() { + await this.boss.stop(); + } + + async createQueue(queueName: string, options?: Partial) { + await this.boss.createQueue(queueName, { + name: queueName, + policy: policies.singleton, + ...options, + }); + + this.logger.log(`Created queue with name "${queueName}"`); + } + + async queueJob(queueName: string, jobId, payload?: any, options?: Queue) { + if (!(await this.boss.getQueue(queueName))) { + await this.createQueue(queueName, options); + } + + await this.boss.send(queueName, { + jobId, + ...payload, + }); + + this.logger.log(`Started job ${jobId}`); + } + + async completeJob(queueName: string, jobId: string, result?: any) { + await this.boss.complete(queueName, jobId, result); + if (this.tasksMap.has(jobId)) { + this.tasksMap.get(jobId)?.call(null); + this.tasksMap.delete(jobId); + } + this.logger.log(`Job ${jobId} completed with result:`, result); + } + + async handleWorkForQueues( + queuesNames: string[], + handlerFn: PgBoss.WorkHandler, + ) { + await this.boss.start(); + return Promise.all( + queuesNames.map(async (queueName) => { + const queue = await this.boss.getQueue(queueName); + + // if queue not found, create it so we can start the worker + if (!queue) { + this.logger.warn(`Queue ${queueName} not found!`); + await this.createQueue(queueName); + } + + return this.boss.work(queueName, handlerFn); + }), + ); + } + + trackTask(jobId: string, handler: () => void) { + this.tasksMap.set(jobId, handler); + } +} diff --git a/src/shared/modules/global/submission-scan-complete.orchestrator.ts b/src/shared/modules/global/submission-scan-complete.orchestrator.ts index 06cf9dd..9dc0c97 100644 --- a/src/shared/modules/global/submission-scan-complete.orchestrator.ts +++ b/src/shared/modules/global/submission-scan-complete.orchestrator.ts @@ -1,15 +1,18 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { SubmissionBaseService } from './submission-base.service'; import { ChallengeApiService, ChallengeData } from './challenge.service'; import { GiteaService } from './gitea.service'; import { SubmissionResponseDto } from 'src/dto/submission.dto'; +import { PrismaService } from './prisma.service'; +import { QueueSchedulerService } from './queue-scheduler.service'; +import { Job } from 'pg-boss'; /** * Orchestrator for handling submission scan completion events. * This service coordinates the actions to be taken when a submission scan is complete. */ @Injectable() -export class SubmissionScanCompleteOrchestrator { +export class SubmissionScanCompleteOrchestrator implements OnModuleInit { private readonly logger: Logger = new Logger( SubmissionScanCompleteOrchestrator.name, ); @@ -24,11 +27,26 @@ export class SubmissionScanCompleteOrchestrator { * @param giteaService - Service to interact with Gitea. */ constructor( + private readonly prisma: PrismaService, + private readonly scheduler: QueueSchedulerService, private readonly submissionBaseService: SubmissionBaseService, private readonly challengeApiService: ChallengeApiService, private readonly giteaService: GiteaService, ) {} + async onModuleInit() { + const queues = ( + await this.prisma.aiWorkflow.groupBy({ + by: ['gitWorkflowId'], + }) + ).map((d) => d.gitWorkflowId); + + await this.scheduler.handleWorkForQueues<{ data: any }>( + queues, + this.handleQueuedWorkflowRun.bind(this), + ); + } + async orchestrateScanComplete(submissionId: string): Promise { this.logger.log( `Orchestrating scan complete for submission ID: ${submissionId}`, @@ -44,38 +62,16 @@ export class SubmissionScanCompleteOrchestrator { ); this.logger.log(`Challenge details: ${JSON.stringify(challenge)}`); - await this.giteaService.checkAndCreateRepository( - process.env.GITEA_SUBMISSION_REVIEWS_ORG || 'TC-Reviews-Tests', + if (!Array.isArray(challenge?.workflows)) { + // no ai workflow defined for challenge, return + return; + } + + await this.queueWorkflowRuns( + challenge.workflows, challenge.id, + submissionId, ); - this.logger.log(`Retrieved or created repository`); - - // iterate available workflows for the challenge - if (Array.isArray(challenge?.workflows)) { - let allErrors = ''; - for (const workflow of challenge.workflows) { - try { - await this.giteaService.runDispatchWorkflow( - process.env.GITEA_SUBMISSION_REVIEWS_ORG || 'TC-Reviews-Tests', - workflow, - challenge.id, - ); - } catch (error) { - const errorMessage = `Error processing workflow: ${workflow.workflowId}. Error: ${error.message}.`; - this.logger.error(errorMessage, error); - // don't rethrow error as we want to continue processing other workflows - allErrors += `${errorMessage}. `; - } - } - if (allErrors !== '') { - this.logger.error( - `Errors occurred while processing workflows: ${allErrors}`, - ); - throw new Error(allErrors); - } else { - this.logger.log('All workflows processed successfully.'); - } - } } catch (error) { this.logger.error( `Error orchestrating scan complete for submission ID ${submissionId}`, @@ -84,4 +80,68 @@ export class SubmissionScanCompleteOrchestrator { throw error; } } + + async queueWorkflowRuns( + aiWorkflows: { id: string }[], + challengeId: string, + submissionId: string, + ) { + await this.prisma.$transaction(async (tx) => { + const workflowRuns = await tx.aiWorkflowRun.createManyAndReturn({ + data: aiWorkflows.map((workflow) => ({ + workflowId: workflow.id, + submissionId, + status: 'QUEUED', + gitRunId: '', + })), + include: { + workflow: { select: { gitWorkflowId: true } }, + }, + }); + + for (const run of workflowRuns) { + await this.scheduler.queueJob(run.workflow.gitWorkflowId, run.id, { + workflowId: run.workflowId, + params: { + challengeId, + submissionId, + aiWorkflowId: run.workflowId, + aiWorkflowRunId: run.id, + }, + }); + } + }); + } + + async handleQueuedWorkflowRun([job]: [Job]) { + this.logger.log(`Processing job ${job.id}`); + + const workflow = await this.prisma.aiWorkflow.findUniqueOrThrow({ + where: { id: (job.data as { workflowId: string })?.workflowId }, + }); + const workflowRun = await this.prisma.aiWorkflowRun.findUniqueOrThrow({ + where: { id: (job.data as { jobId: string })?.jobId }, + }); + + await this.giteaService.runDispatchWorkflow( + workflow, + workflowRun, + (job.data as { params: any })?.params, + ); + + await this.prisma.aiWorkflowRun.update({ + where: { id: workflowRun.id }, + data: { + status: 'DISPATCHED', + scheduledJobId: job.id, + }, + }); + + // return not-resolved promise, + // this will put a pause on the job + // until it is marked as completed via webhook call + return new Promise((resolve) => { + this.scheduler.trackTask(job.id, () => resolve()); + }); + } } From 43a3062524feaad4e9bd2316286011611e175a40 Mon Sep 17 00:00:00 2001 From: Vasilica Olariu Date: Sun, 5 Oct 2025 22:10:53 +0300 Subject: [PATCH 2/9] Refactor ai workflow handling into its own service --- src/api/webhook/webhook.service.ts | 166 +---------- .../modules/global/globalProviders.module.ts | 3 + .../modules/global/queue-scheduler.service.ts | 12 +- .../submission-scan-complete.orchestrator.ts | 92 +------ .../modules/global/workflow-queue.handler.ts | 259 ++++++++++++++++++ 5 files changed, 276 insertions(+), 256 deletions(-) create mode 100644 src/shared/modules/global/workflow-queue.handler.ts diff --git a/src/api/webhook/webhook.service.ts b/src/api/webhook/webhook.service.ts index afd7f5e..3f42e78 100644 --- a/src/api/webhook/webhook.service.ts +++ b/src/api/webhook/webhook.service.ts @@ -6,9 +6,7 @@ import { WebhookEventDto, WebhookResponseDto, } from '../../dto/webhook-event.dto'; -import { QueueSchedulerService } from 'src/shared/modules/global/queue-scheduler.service'; -import { GiteaService } from 'src/shared/modules/global/gitea.service'; -import { aiWorkflowRun } from '@prisma/client'; +import { WorkflowQueueHandler } from 'src/shared/modules/global/workflow-queue.handler'; @Injectable() export class WebhookService { @@ -17,8 +15,7 @@ export class WebhookService { constructor( private readonly prisma: PrismaService, private readonly prismaErrorService: PrismaErrorService, - private readonly scheduler: QueueSchedulerService, - private readonly giteaService: GiteaService, + private readonly workflowQueueHandler: WorkflowQueueHandler, ) {} async processWebhook( @@ -94,7 +91,7 @@ export class WebhookService { // Future implementation examples: switch (event) { case 'workflow_job': - await this.handleWorkflowEvents(event, payload); + await this.workflowQueueHandler.handleWorkflowRunEvents(payload); break; // case 'push': // await this.handlePushEvent(payload); @@ -182,161 +179,4 @@ export class WebhookService { throw error; } } - - async handleWorkflowEvents(event: string, payload: any) { - const aiWorkflowRuns = await this.prisma.aiWorkflowRun.findMany({ - where: { - status: { in: ['DISPATCHED', 'IN_PROGRESS'] }, - gitRunId: `${payload.workflow_job.run_id}`, - }, - include: { - workflow: true, - }, - }); - - if (aiWorkflowRuns.length > 1) { - this.logger.error( - `ERROR! There are more than 1 workflow runs in DISPATCHED status and workflow.gitWorkflowId=${payload.workflow_job.name}!`, - ); - return; - } - - let [aiWorkflowRun]: (aiWorkflowRun | null)[] = aiWorkflowRuns; - - if ( - !aiWorkflowRun && - payload.action === 'in_progress' && - payload.workflow_job.name === 'dump-workflow-context' - ) { - const [owner, repo] = payload.repository.full_name.split('/'); - const { aiWorkflowRunId, jobsCount } = - (await this.giteaService.getAiWorkflowDataFromLogs( - owner, - repo, - payload.workflow_job.id as number, - )) ?? ({} as any); - - if (!aiWorkflowRunId) { - this.logger.error( - `Failed to find workflow run ID from logs for job with id ${payload.workflow_job.id}`, - ); - return; - } - - aiWorkflowRun = await this.prisma.aiWorkflowRun.findUnique({ - where: { - id: aiWorkflowRunId, - }, - include: { - workflow: true, - }, - }); - - if (!aiWorkflowRun || aiWorkflowRun.status !== 'DISPATCHED') { - this.logger.error( - `Workflow run with id ${aiWorkflowRunId} is not in DISPATCHED status or not found. Status: ${aiWorkflowRun?.status}`, - ); - return; - } - - await this.prisma.aiWorkflowRun.update({ - where: { id: aiWorkflowRunId }, - data: { - gitRunId: `${payload.workflow_job.run_id}`, - jobsCount, - completedJobs: { increment: 1 }, - }, - }); - - this.logger.log({ - message: 'Updated aiWorkflowRun with gitRunId after lookup', - aiWorkflowRunId, - gitRunId: payload.workflow_job.run_id, - jobId: payload.workflow_job.id, - }); - } - - if (!aiWorkflowRun) { - this.logger.error({ - message: 'No matching aiWorkflowRun found for workflow_job event', - event, - workflowJobId: payload.workflow_job.id, - gitRunId: payload.workflow_job.run_id, - gitJobStatus: payload.action, - }); - - return; - } - - if (payload.workflow_job.name === 'dump-workflow-context') { - // no further processing needed, this job is meant to sync our db run with the git run - return; - } - - switch (payload.action) { - case 'in_progress': - if (aiWorkflowRun.status !== 'DISPATCHED') { - break; - } - - await this.prisma.aiWorkflowRun.update({ - where: { id: aiWorkflowRun.id }, - data: { - status: 'IN_PROGRESS', - startedAt: new Date(), - }, - }); - this.logger.log({ - message: 'Workflow job is now in progress', - aiWorkflowRunId: aiWorkflowRun.id, - gitRunId: payload.workflow_job.run_id, - jobId: payload.workflow_job.id, - status: 'IN_PROGRESS', - timestamp: new Date().toISOString(), - }); - break; - case 'completed': - // we need to mark the run as completed only when the last job in the run has been completed - if ( - (aiWorkflowRun.completedJobs ?? 0) + 1 !== - aiWorkflowRun.jobsCount - ) { - await this.prisma.aiWorkflowRun.update({ - where: { id: aiWorkflowRun.id }, - data: { - completedJobs: { increment: 1 }, - }, - }); - this.logger.log( - `Workflow job ${(aiWorkflowRun.completedJobs ?? 0) + 1}/${aiWorkflowRun.jobsCount} completed.`, - ); - break; - } - - await this.prisma.aiWorkflowRun.update({ - where: { id: aiWorkflowRun.id }, - data: { - status: payload.workflow_job.conclusion.toUpperCase(), - completedAt: new Date(), - completedJobs: { increment: 1 }, - }, - }); - await this.scheduler.completeJob( - (aiWorkflowRun as any).workflow.gitWorkflowId, - aiWorkflowRun.scheduledJobId as string, - ); - - this.logger.log({ - message: 'Workflow job completed', - aiWorkflowRunId: aiWorkflowRun.id, - gitRunId: payload.workflow_job.run_id, - jobId: payload.workflow_job.id, - status: payload.workflow_job.conclusion.toUpperCase(), - timestamp: new Date().toISOString(), - }); - break; - default: - break; - } - } } diff --git a/src/shared/modules/global/globalProviders.module.ts b/src/shared/modules/global/globalProviders.module.ts index 1c9f28b..930900b 100644 --- a/src/shared/modules/global/globalProviders.module.ts +++ b/src/shared/modules/global/globalProviders.module.ts @@ -21,6 +21,7 @@ import { SubmissionService } from 'src/api/submission/submission.service'; import { ChallengePrismaService } from './challenge-prisma.service'; import { MemberPrismaService } from './member-prisma.service'; import { QueueSchedulerService } from './queue-scheduler.service'; +import { WorkflowQueueHandler } from './workflow-queue.handler'; // Global module for providing global providers // Add any provider you want to be global here @@ -55,6 +56,7 @@ import { QueueSchedulerService } from './queue-scheduler.service'; SubmissionScanCompleteOrchestrator, SubmissionService, QueueSchedulerService, + WorkflowQueueHandler, ], exports: [ PrismaService, @@ -74,6 +76,7 @@ import { QueueSchedulerService } from './queue-scheduler.service'; GiteaService, SubmissionScanCompleteOrchestrator, QueueSchedulerService, + WorkflowQueueHandler, ], }) export class GlobalProvidersModule {} diff --git a/src/shared/modules/global/queue-scheduler.service.ts b/src/shared/modules/global/queue-scheduler.service.ts index 649fc10..e7d443c 100644 --- a/src/shared/modules/global/queue-scheduler.service.ts +++ b/src/shared/modules/global/queue-scheduler.service.ts @@ -15,7 +15,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { private readonly logger: Logger = new Logger(QueueSchedulerService.name); private boss: PgBoss; - private tasksMap = new Map void>(); + private jobsHandlersMap = new Map void>(); constructor() { this.logger.log('QueueSchedulerService initialized'); @@ -56,9 +56,9 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { async completeJob(queueName: string, jobId: string, result?: any) { await this.boss.complete(queueName, jobId, result); - if (this.tasksMap.has(jobId)) { - this.tasksMap.get(jobId)?.call(null); - this.tasksMap.delete(jobId); + if (this.jobsHandlersMap.has(jobId)) { + this.jobsHandlersMap.get(jobId)?.call(null); + this.jobsHandlersMap.delete(jobId); } this.logger.log(`Job ${jobId} completed with result:`, result); } @@ -83,7 +83,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { ); } - trackTask(jobId: string, handler: () => void) { - this.tasksMap.set(jobId, handler); + trackJobHandler(jobId: string, handler: () => void) { + this.jobsHandlersMap.set(jobId, handler); } } diff --git a/src/shared/modules/global/submission-scan-complete.orchestrator.ts b/src/shared/modules/global/submission-scan-complete.orchestrator.ts index 9dc0c97..9d0556e 100644 --- a/src/shared/modules/global/submission-scan-complete.orchestrator.ts +++ b/src/shared/modules/global/submission-scan-complete.orchestrator.ts @@ -1,18 +1,15 @@ -import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { SubmissionBaseService } from './submission-base.service'; import { ChallengeApiService, ChallengeData } from './challenge.service'; -import { GiteaService } from './gitea.service'; import { SubmissionResponseDto } from 'src/dto/submission.dto'; -import { PrismaService } from './prisma.service'; -import { QueueSchedulerService } from './queue-scheduler.service'; -import { Job } from 'pg-boss'; +import { WorkflowQueueHandler } from './workflow-queue.handler'; /** * Orchestrator for handling submission scan completion events. * This service coordinates the actions to be taken when a submission scan is complete. */ @Injectable() -export class SubmissionScanCompleteOrchestrator implements OnModuleInit { +export class SubmissionScanCompleteOrchestrator { private readonly logger: Logger = new Logger( SubmissionScanCompleteOrchestrator.name, ); @@ -27,26 +24,11 @@ export class SubmissionScanCompleteOrchestrator implements OnModuleInit { * @param giteaService - Service to interact with Gitea. */ constructor( - private readonly prisma: PrismaService, - private readonly scheduler: QueueSchedulerService, private readonly submissionBaseService: SubmissionBaseService, private readonly challengeApiService: ChallengeApiService, - private readonly giteaService: GiteaService, + private readonly workflowQueueHandler: WorkflowQueueHandler, ) {} - async onModuleInit() { - const queues = ( - await this.prisma.aiWorkflow.groupBy({ - by: ['gitWorkflowId'], - }) - ).map((d) => d.gitWorkflowId); - - await this.scheduler.handleWorkForQueues<{ data: any }>( - queues, - this.handleQueuedWorkflowRun.bind(this), - ); - } - async orchestrateScanComplete(submissionId: string): Promise { this.logger.log( `Orchestrating scan complete for submission ID: ${submissionId}`, @@ -67,7 +49,7 @@ export class SubmissionScanCompleteOrchestrator implements OnModuleInit { return; } - await this.queueWorkflowRuns( + await this.workflowQueueHandler.queueWorkflowRuns( challenge.workflows, challenge.id, submissionId, @@ -80,68 +62,4 @@ export class SubmissionScanCompleteOrchestrator implements OnModuleInit { throw error; } } - - async queueWorkflowRuns( - aiWorkflows: { id: string }[], - challengeId: string, - submissionId: string, - ) { - await this.prisma.$transaction(async (tx) => { - const workflowRuns = await tx.aiWorkflowRun.createManyAndReturn({ - data: aiWorkflows.map((workflow) => ({ - workflowId: workflow.id, - submissionId, - status: 'QUEUED', - gitRunId: '', - })), - include: { - workflow: { select: { gitWorkflowId: true } }, - }, - }); - - for (const run of workflowRuns) { - await this.scheduler.queueJob(run.workflow.gitWorkflowId, run.id, { - workflowId: run.workflowId, - params: { - challengeId, - submissionId, - aiWorkflowId: run.workflowId, - aiWorkflowRunId: run.id, - }, - }); - } - }); - } - - async handleQueuedWorkflowRun([job]: [Job]) { - this.logger.log(`Processing job ${job.id}`); - - const workflow = await this.prisma.aiWorkflow.findUniqueOrThrow({ - where: { id: (job.data as { workflowId: string })?.workflowId }, - }); - const workflowRun = await this.prisma.aiWorkflowRun.findUniqueOrThrow({ - where: { id: (job.data as { jobId: string })?.jobId }, - }); - - await this.giteaService.runDispatchWorkflow( - workflow, - workflowRun, - (job.data as { params: any })?.params, - ); - - await this.prisma.aiWorkflowRun.update({ - where: { id: workflowRun.id }, - data: { - status: 'DISPATCHED', - scheduledJobId: job.id, - }, - }); - - // return not-resolved promise, - // this will put a pause on the job - // until it is marked as completed via webhook call - return new Promise((resolve) => { - this.scheduler.trackTask(job.id, () => resolve()); - }); - } } diff --git a/src/shared/modules/global/workflow-queue.handler.ts b/src/shared/modules/global/workflow-queue.handler.ts new file mode 100644 index 0000000..04612af --- /dev/null +++ b/src/shared/modules/global/workflow-queue.handler.ts @@ -0,0 +1,259 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { GiteaService } from './gitea.service'; +import { PrismaService } from './prisma.service'; +import { QueueSchedulerService } from './queue-scheduler.service'; +import { Job } from 'pg-boss'; +import { aiWorkflowRun } from '@prisma/client'; + +@Injectable() +export class WorkflowQueueHandler implements OnModuleInit { + private readonly logger: Logger = new Logger(WorkflowQueueHandler.name); + + constructor( + private readonly prisma: PrismaService, + private readonly scheduler: QueueSchedulerService, + private readonly giteaService: GiteaService, + ) {} + + async onModuleInit() { + const queues = ( + await this.prisma.aiWorkflow.groupBy({ + by: ['gitWorkflowId'], + }) + ).map((d) => d.gitWorkflowId); + + await this.scheduler.handleWorkForQueues<{ data: any }>( + queues, + this.handleQueuedWorkflowRun.bind(this), + ); + } + + async queueWorkflowRuns( + aiWorkflows: { id: string }[], + challengeId: string, + submissionId: string, + ) { + await this.prisma.$transaction(async (tx) => { + const workflowRuns = await tx.aiWorkflowRun.createManyAndReturn({ + data: aiWorkflows.map((workflow) => ({ + workflowId: workflow.id, + submissionId, + status: 'QUEUED', + gitRunId: '', + })), + include: { + workflow: { select: { gitWorkflowId: true } }, + }, + }); + + for (const run of workflowRuns) { + await this.scheduler.queueJob(run.workflow.gitWorkflowId, run.id, { + workflowId: run.workflowId, + params: { + challengeId, + submissionId, + aiWorkflowId: run.workflowId, + aiWorkflowRunId: run.id, + }, + }); + } + }); + } + + async handleQueuedWorkflowRun([job]: [Job]) { + this.logger.log(`Processing job ${job.id}`); + + const workflow = await this.prisma.aiWorkflow.findUniqueOrThrow({ + where: { id: (job.data as { workflowId: string })?.workflowId }, + }); + const workflowRun = await this.prisma.aiWorkflowRun.findUniqueOrThrow({ + where: { id: (job.data as { jobId: string })?.jobId }, + }); + + await this.giteaService.runDispatchWorkflow( + workflow, + workflowRun, + (job.data as { params: any })?.params, + ); + + await this.prisma.aiWorkflowRun.update({ + where: { id: workflowRun.id }, + data: { + status: 'DISPATCHED', + scheduledJobId: job.id, + }, + }); + + // return not-resolved promise, + // this will put a pause on the job + // until it is marked as completed via webhook call + return new Promise((resolve) => { + this.scheduler.trackJobHandler(job.id, () => resolve()); + }); + } + + async handleWorkflowRunEvents(event: { + action: 'queued' | 'in_progress' | 'completed'; + workflow_job: { + id: number; + run_id: string; + name: string; + conclusion: string; + }; + repository: { full_name: string }; + }) { + const aiWorkflowRuns = await this.prisma.aiWorkflowRun.findMany({ + where: { + status: { in: ['DISPATCHED', 'IN_PROGRESS'] }, + gitRunId: `${event.workflow_job.run_id}`, + }, + include: { + workflow: true, + }, + }); + + if (aiWorkflowRuns.length > 1) { + this.logger.error( + `ERROR! There are more than 1 workflow runs in DISPATCHED status and workflow.gitWorkflowId=${event.workflow_job.name}!`, + ); + return; + } + + let [aiWorkflowRun]: (aiWorkflowRun | null)[] = aiWorkflowRuns; + + if ( + !aiWorkflowRun && + event.action === 'in_progress' && + event.workflow_job.name === 'dump-workflow-context' + ) { + const [owner, repo] = event.repository.full_name.split('/'); + const { aiWorkflowRunId, jobsCount } = + (await this.giteaService.getAiWorkflowDataFromLogs( + owner, + repo, + event.workflow_job.id, + )) ?? ({} as any); + + if (!aiWorkflowRunId) { + this.logger.error( + `Failed to find workflow run ID from logs for job with id ${event.workflow_job.id}`, + ); + return; + } + + aiWorkflowRun = await this.prisma.aiWorkflowRun.findUnique({ + where: { + id: aiWorkflowRunId, + }, + include: { + workflow: true, + }, + }); + + if (!aiWorkflowRun || aiWorkflowRun.status !== 'DISPATCHED') { + this.logger.error( + `Workflow run with id ${aiWorkflowRunId} is not in DISPATCHED status or not found. Status: ${aiWorkflowRun?.status}`, + ); + return; + } + + await this.prisma.aiWorkflowRun.update({ + where: { id: aiWorkflowRunId }, + data: { + gitRunId: `${event.workflow_job.run_id}`, + jobsCount, + completedJobs: { increment: 1 }, + }, + }); + + this.logger.log({ + message: 'Updated aiWorkflowRun with gitRunId after lookup', + aiWorkflowRunId, + gitRunId: event.workflow_job.run_id, + jobId: event.workflow_job.id, + }); + } + + if (!aiWorkflowRun) { + this.logger.error({ + message: 'No matching aiWorkflowRun found for workflow_job event', + workflowJobId: event.workflow_job.id, + gitRunId: event.workflow_job.run_id, + gitJobStatus: event.action, + }); + + return; + } + + if (event.workflow_job.name === 'dump-workflow-context') { + // no further processing needed, this job is meant to sync our db run with the git run + return; + } + + switch (event.action) { + case 'in_progress': + if (aiWorkflowRun.status !== 'DISPATCHED') { + break; + } + + await this.prisma.aiWorkflowRun.update({ + where: { id: aiWorkflowRun.id }, + data: { + status: 'IN_PROGRESS', + startedAt: new Date(), + }, + }); + this.logger.log({ + message: 'Workflow job is now in progress', + aiWorkflowRunId: aiWorkflowRun.id, + gitRunId: event.workflow_job.run_id, + jobId: event.workflow_job.id, + status: 'IN_PROGRESS', + timestamp: new Date().toISOString(), + }); + break; + case 'completed': + // we need to mark the run as completed only when the last job in the run has been completed + if ( + (aiWorkflowRun.completedJobs ?? 0) + 1 !== + aiWorkflowRun.jobsCount + ) { + await this.prisma.aiWorkflowRun.update({ + where: { id: aiWorkflowRun.id }, + data: { + completedJobs: { increment: 1 }, + }, + }); + this.logger.log( + `Workflow job ${(aiWorkflowRun.completedJobs ?? 0) + 1}/${aiWorkflowRun.jobsCount} completed.`, + ); + break; + } + + await this.prisma.aiWorkflowRun.update({ + where: { id: aiWorkflowRun.id }, + data: { + status: event.workflow_job.conclusion.toUpperCase(), + completedAt: new Date(), + completedJobs: { increment: 1 }, + }, + }); + await this.scheduler.completeJob( + (aiWorkflowRun as any).workflow.gitWorkflowId, + aiWorkflowRun.scheduledJobId as string, + ); + + this.logger.log({ + message: 'Workflow job completed', + aiWorkflowRunId: aiWorkflowRun.id, + gitRunId: event.workflow_job.run_id, + jobId: event.workflow_job.id, + status: event.workflow_job.conclusion.toUpperCase(), + timestamp: new Date().toISOString(), + }); + break; + default: + break; + } + } +} From 44d3163db78966840f55ce1a8322e52f46ab803d Mon Sep 17 00:00:00 2001 From: Vasilica Olariu Date: Sun, 5 Oct 2025 22:32:59 +0300 Subject: [PATCH 3/9] skip unnecessary events --- src/shared/modules/global/workflow-queue.handler.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/shared/modules/global/workflow-queue.handler.ts b/src/shared/modules/global/workflow-queue.handler.ts index 04612af..c3c49f6 100644 --- a/src/shared/modules/global/workflow-queue.handler.ts +++ b/src/shared/modules/global/workflow-queue.handler.ts @@ -93,7 +93,7 @@ export class WorkflowQueueHandler implements OnModuleInit { } async handleWorkflowRunEvents(event: { - action: 'queued' | 'in_progress' | 'completed'; + action: 'queued' | 'waiting' | 'in_progress' | 'completed'; workflow_job: { id: number; run_id: string; @@ -102,6 +102,13 @@ export class WorkflowQueueHandler implements OnModuleInit { }; repository: { full_name: string }; }) { + if (!['in_progress', 'completed'].includes(event.action)) { + this.logger.log( + `Skipping ${event.action} event for git workflow id ${event.workflow_job.id}.`, + ); + return; + } + const aiWorkflowRuns = await this.prisma.aiWorkflowRun.findMany({ where: { status: { in: ['DISPATCHED', 'IN_PROGRESS'] }, From 58d483f4b7d0956b1bf471a41c6bdcc667283ce9 Mon Sep 17 00:00:00 2001 From: Vasilica Olariu Date: Sun, 5 Oct 2025 22:57:31 +0300 Subject: [PATCH 4/9] PR feedback & add DISPATCH_AI_REVIEW_WORKFLOWS env flag --- src/shared/modules/global/gitea.service.ts | 4 +- .../modules/global/queue-scheduler.service.ts | 40 ++++++++++++++++++- .../modules/global/workflow-queue.handler.ts | 18 ++++++++- 3 files changed, 58 insertions(+), 4 deletions(-) diff --git a/src/shared/modules/global/gitea.service.ts b/src/shared/modules/global/gitea.service.ts index 2ee5723..8f7a051 100644 --- a/src/shared/modules/global/gitea.service.ts +++ b/src/shared/modules/global/gitea.service.ts @@ -131,7 +131,7 @@ export class GiteaService { jobId: number, retry = 0, ): Promise<{ aiWorkflowRunId: string; jobsCount: number } | null> { - // 120 re-tryies means ~60seconds (1/500ms) + // 120 re-tries means ~60seconds (1/500ms) if (retry >= 120) { this.logger.error( `Error retrieving logs for job ${jobId}. retry limit reached!`, @@ -156,7 +156,7 @@ export class GiteaService { const aiWorkflowRunId = match[1]; const jobCountMatch = logs.match(/::JOB_COUNT::(\d+)/i); - const jobsCount = parseInt(jobCountMatch?.[1] ?? '1'); + const jobsCount = parseInt(jobCountMatch?.[1] ?? ''); this.logger.log('Fetched aiWorkflowRun data from logs:', { jobsCount, diff --git a/src/shared/modules/global/queue-scheduler.service.ts b/src/shared/modules/global/queue-scheduler.service.ts index e7d443c..682239a 100644 --- a/src/shared/modules/global/queue-scheduler.service.ts +++ b/src/shared/modules/global/queue-scheduler.service.ts @@ -17,21 +17,47 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { private jobsHandlersMap = new Map void>(); + get isEnabled() { + return String(process.env.DISPATCH_AI_REVIEW_WORKFLOWS) === 'true'; + } + constructor() { + if (!this.isEnabled) { + this.logger.log( + 'env.DISPATCH_AI_REVIEW_WORKFLOWS is not true, pgboss is disabled.', + ); + return; + } + if (!process.env.PG_BOSS_DB_URL) { + throw new Error( + `Env var 'PG_BOSS_DB_URL' is missing! Please configure it or set 'DISPATCH_AI_REVIEW_WORKFLOWS' to false.`, + ); + } this.logger.log('QueueSchedulerService initialized'); - this.boss = new PgBoss(process.env.PG_BOSS_DB_URL!); + this.boss = new PgBoss(process.env.PG_BOSS_DB_URL); this.boss.on('error', (err) => this.logger.error('pg-boss error:', err)); } async onModuleInit() { + if (!this.isEnabled) { + return; + } + await this.boss.start(); } async onModuleDestroy() { + if (!this.isEnabled) { + return; + } + await this.boss.stop(); } async createQueue(queueName: string, options?: Partial) { + if (!this.isEnabled) { + return; + } await this.boss.createQueue(queueName, { name: queueName, policy: policies.singleton, @@ -42,6 +68,10 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { } async queueJob(queueName: string, jobId, payload?: any, options?: Queue) { + if (!this.isEnabled) { + return; + } + if (!(await this.boss.getQueue(queueName))) { await this.createQueue(queueName, options); } @@ -55,6 +85,10 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { } async completeJob(queueName: string, jobId: string, result?: any) { + if (!this.isEnabled) { + return; + } + await this.boss.complete(queueName, jobId, result); if (this.jobsHandlersMap.has(jobId)) { this.jobsHandlersMap.get(jobId)?.call(null); @@ -67,6 +101,10 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { queuesNames: string[], handlerFn: PgBoss.WorkHandler, ) { + if (!this.isEnabled) { + return; + } + await this.boss.start(); return Promise.all( queuesNames.map(async (queueName) => { diff --git a/src/shared/modules/global/workflow-queue.handler.ts b/src/shared/modules/global/workflow-queue.handler.ts index c3c49f6..81454f8 100644 --- a/src/shared/modules/global/workflow-queue.handler.ts +++ b/src/shared/modules/global/workflow-queue.handler.ts @@ -38,7 +38,7 @@ export class WorkflowQueueHandler implements OnModuleInit { data: aiWorkflows.map((workflow) => ({ workflowId: workflow.id, submissionId, - status: 'QUEUED', + status: 'INIT', gitRunId: '', })), include: { @@ -46,6 +46,10 @@ export class WorkflowQueueHandler implements OnModuleInit { }, }); + if (!this.scheduler.isEnabled) { + return; + } + for (const run of workflowRuns) { await this.scheduler.queueJob(run.workflow.gitWorkflowId, run.id, { workflowId: run.workflowId, @@ -56,6 +60,11 @@ export class WorkflowQueueHandler implements OnModuleInit { aiWorkflowRunId: run.id, }, }); + + await tx.aiWorkflowRun.update({ + where: { id: run.id }, + data: { status: 'QUEUED' }, + }); } }); } @@ -148,6 +157,13 @@ export class WorkflowQueueHandler implements OnModuleInit { return; } + if (!jobsCount) { + this.logger.error( + `Failed to find jobs count from logs for job with id ${event.workflow_job.id}`, + ); + return; + } + aiWorkflowRun = await this.prisma.aiWorkflowRun.findUnique({ where: { id: aiWorkflowRunId, From 4c999acec7c24c3b459a6059c1756c72bb10ec83 Mon Sep 17 00:00:00 2001 From: Vasilica Olariu Date: Sun, 5 Oct 2025 23:02:39 +0300 Subject: [PATCH 5/9] PR feedback --- src/shared/modules/global/queue-scheduler.service.ts | 9 +++++++-- src/shared/modules/global/workflow-queue.handler.ts | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/shared/modules/global/queue-scheduler.service.ts b/src/shared/modules/global/queue-scheduler.service.ts index 682239a..1bfa424 100644 --- a/src/shared/modules/global/queue-scheduler.service.ts +++ b/src/shared/modules/global/queue-scheduler.service.ts @@ -67,7 +67,12 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { this.logger.log(`Created queue with name "${queueName}"`); } - async queueJob(queueName: string, jobId, payload?: any, options?: Queue) { + async queueJob( + queueName: string, + jobId: string, + payload?: any, + options?: Queue, + ) { if (!this.isEnabled) { return; } @@ -121,7 +126,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { ); } - trackJobHandler(jobId: string, handler: () => void) { + registerJobHandler(jobId: string, handler: () => void) { this.jobsHandlersMap.set(jobId, handler); } } diff --git a/src/shared/modules/global/workflow-queue.handler.ts b/src/shared/modules/global/workflow-queue.handler.ts index 81454f8..949a737 100644 --- a/src/shared/modules/global/workflow-queue.handler.ts +++ b/src/shared/modules/global/workflow-queue.handler.ts @@ -97,7 +97,7 @@ export class WorkflowQueueHandler implements OnModuleInit { // this will put a pause on the job // until it is marked as completed via webhook call return new Promise((resolve) => { - this.scheduler.trackJobHandler(job.id, () => resolve()); + this.scheduler.registerJobHandler(job.id, () => resolve()); }); } From 1d15788255701935c75b3c142b1b315650bed01d Mon Sep 17 00:00:00 2001 From: Vasilica Olariu Date: Sun, 5 Oct 2025 23:07:20 +0300 Subject: [PATCH 6/9] add loggers in queue-scheduler --- .../modules/global/queue-scheduler.service.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/shared/modules/global/queue-scheduler.service.ts b/src/shared/modules/global/queue-scheduler.service.ts index 1bfa424..ac3d1c8 100644 --- a/src/shared/modules/global/queue-scheduler.service.ts +++ b/src/shared/modules/global/queue-scheduler.service.ts @@ -74,6 +74,10 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { options?: Queue, ) { if (!this.isEnabled) { + this.logger.log('PgBoss is disabled, skipping queueing job!', { + queueName, + jobId, + }); return; } @@ -91,6 +95,13 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { async completeJob(queueName: string, jobId: string, result?: any) { if (!this.isEnabled) { + this.logger.log( + 'PgBoss is disabled, skipping marking job as completed!', + { + queueName, + jobId, + }, + ); return; } @@ -107,6 +118,9 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { handlerFn: PgBoss.WorkHandler, ) { if (!this.isEnabled) { + this.logger.log('PgBoss is disabled, cannot register worker!', { + queuesNames, + }); return; } From c7efa27afca34ab08220a4df1c52d80e436c1fab Mon Sep 17 00:00:00 2001 From: Vasilica Olariu Date: Sun, 5 Oct 2025 23:09:17 +0300 Subject: [PATCH 7/9] more logging --- src/shared/modules/global/workflow-queue.handler.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/shared/modules/global/workflow-queue.handler.ts b/src/shared/modules/global/workflow-queue.handler.ts index 949a737..a522506 100644 --- a/src/shared/modules/global/workflow-queue.handler.ts +++ b/src/shared/modules/global/workflow-queue.handler.ts @@ -47,6 +47,9 @@ export class WorkflowQueueHandler implements OnModuleInit { }); if (!this.scheduler.isEnabled) { + this.logger.log( + 'Scheduler is disabled, skipping scheduling workflowRuns for now!', + ); return; } From 8e819680cdc2d4034977455b484a1d9dc7bf99e2 Mon Sep 17 00:00:00 2001 From: Kiril Kartunov Date: Mon, 6 Oct 2025 11:01:19 +0300 Subject: [PATCH 8/9] docs clean up --- .env.sample | 2 -- docs/GITEA_WEBHOOK_SETUP.md | 2 +- docs/GITEA_WORKFLOW_SETUP.md | 2 -- 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/.env.sample b/.env.sample index 4cbf963..f8446fd 100644 --- a/.env.sample +++ b/.env.sample @@ -9,8 +9,6 @@ VALID_ISSUERS='["https://testsachin.topcoder-dev.com/","https://test-sachin-rs25 GITEA_WEBHOOK_AUTH="your_webhook_secret_here" GITEA_TOKEN="your_gitea_token_here" GITEA_BASE_URL="https://git.topcoder-dev.com/api/v1" -GITEA_SUBMISSION_REVIEWS_ORG="TC-Reviews-Tests" -GITEA_SUBMISSION_REVIEW_NEW_REPO_DEF_BRANCH="develop" # AI Review Workflows DISPATCH_AI_REVIEW_WORKFLOWS=true|false|undefined diff --git a/docs/GITEA_WEBHOOK_SETUP.md b/docs/GITEA_WEBHOOK_SETUP.md index a68d2d0..84d8ef4 100644 --- a/docs/GITEA_WEBHOOK_SETUP.md +++ b/docs/GITEA_WEBHOOK_SETUP.md @@ -203,7 +203,7 @@ git push origin main - `Content-Type: application/json` - `X-Gitea-Event: {event_type}` - Gitea event type (push, pull_request, etc.) - `X-Gitea-Delivery: {delivery_id}` - Unique delivery identifier from Gitea -- `Authorization: Bearer {GITEA_WEBHOOK_AUTH}` - Token used to verify authorization +- `Authorization: SecretKey {GITEA_WEBHOOK_AUTH}` - Token used to verify authorization **Request Body:** diff --git a/docs/GITEA_WORKFLOW_SETUP.md b/docs/GITEA_WORKFLOW_SETUP.md index 4894aee..4554802 100644 --- a/docs/GITEA_WORKFLOW_SETUP.md +++ b/docs/GITEA_WORKFLOW_SETUP.md @@ -4,8 +4,6 @@ New environment properties were added to support Gitea Workflows. These new prop - GITEA_TOKEN - the gitea token - GITEA_BASE_URL - gitea base URL which will be used by generated gitea client -- GITEA_SUBMISSION_REVIEWS_ORG - the gitea Org name used for submission reviews -- GITEA_SUBMISSION_REVIEW_NEW_REPO_DEF_BRANCH - default branch name to use when creating new repositories for submission reviews via AI These new properties are added in .env.sample. From 963bb06ec409a7e861905438cd728e45756bd58b Mon Sep 17 00:00:00 2001 From: Kiril Kartunov Date: Mon, 6 Oct 2025 11:10:39 +0300 Subject: [PATCH 9/9] rename pg-boss url --- src/shared/modules/global/queue-scheduler.service.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/shared/modules/global/queue-scheduler.service.ts b/src/shared/modules/global/queue-scheduler.service.ts index ac3d1c8..a05a738 100644 --- a/src/shared/modules/global/queue-scheduler.service.ts +++ b/src/shared/modules/global/queue-scheduler.service.ts @@ -28,13 +28,13 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { ); return; } - if (!process.env.PG_BOSS_DB_URL) { + if (!process.env.PGBOSS_DATABASE_URL) { throw new Error( - `Env var 'PG_BOSS_DB_URL' is missing! Please configure it or set 'DISPATCH_AI_REVIEW_WORKFLOWS' to false.`, + `Env var 'PGBOSS_DATABASE_URL' is missing! Please configure it or set 'DISPATCH_AI_REVIEW_WORKFLOWS' to false.`, ); } this.logger.log('QueueSchedulerService initialized'); - this.boss = new PgBoss(process.env.PG_BOSS_DB_URL); + this.boss = new PgBoss(process.env.PGBOSS_DATABASE_URL); this.boss.on('error', (err) => this.logger.error('pg-boss error:', err)); }