@vas3a (Collaborator) commented Oct 3, 2025

https://topcoder.atlassian.net/browse/PM-1551 - Integrate webhook event processing with tc workflow

NOTE: this is a draft PR. I'll update with more details after code cleanup.

status String @db.VarChar
usage Json? @db.JsonB
error String? @db.Text
scheduledJobId String? @db.Text

Consider renaming scheduledJobId to scheduledJobID for consistency with common naming conventions that capitalize acronyms.

constructor(
private readonly prisma: PrismaService,
private readonly prismaErrorService: PrismaErrorService,
private readonly scheduler: QueueSchedulerService,

The QueueSchedulerService is injected but not used in the current code. Ensure that it is necessary for future implementation or remove it to keep the code clean.

private readonly prisma: PrismaService,
private readonly prismaErrorService: PrismaErrorService,
private readonly scheduler: QueueSchedulerService,
private readonly giteaService: GiteaService,

The GiteaService is injected but not used in the current code. Ensure that it is necessary for future implementation or remove it to keep the code clean.


// Future extensibility: Add event-specific handlers here
- this.handleEventSpecificProcessing(
+ await this.handleEventSpecificProcessing(

Consider handling the promise rejection from await this.handleEventSpecificProcessing(...) to ensure that any errors are properly caught and managed. This can prevent unhandled promise rejections and improve error handling in the application.
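
A minimal sketch of what catching that rejection could look like. `processEvent`, `EventHandler`, and the `logError` callback are hypothetical stand-ins for the service method and its logger, not code from this PR.

```typescript
// Hypothetical stand-in for the service method under review.
type EventHandler = (event: string, payload: unknown) => Promise<void>;

interface ProcessResult {
  ok: boolean;
  error?: string;
}

// Awaits the handler inside try/catch so a rejection is logged and
// reported instead of surfacing as an unhandled promise rejection.
async function processEvent(
  handler: EventHandler,
  event: string,
  payload: unknown,
  logError: (message: string) => void,
): Promise<ProcessResult> {
  try {
    await handler(event, payload);
    return { ok: true };
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    logError(`Failed to process "${event}" event: ${message}`);
    return { ok: false, error: message };
  }
}
```

Whether the webhook endpoint should still acknowledge the delivery after a handler failure is a separate design decision that this sketch leaves open.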

* This method can be extended to handle different GitHub events differently
*/
- private handleEventSpecificProcessing(event: string, payload: any): void {
+ private async handleEventSpecificProcessing(

Consider specifying the type of the payload parameter instead of using any to improve type safety and clarity.
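
One way to replace `any`, sketched under the assumption that only a few fields of the `workflow_job` payload are read. The interface below is a hand-written subset rather than GitHub's full schema, and `isWorkflowJobPayload` is a hypothetical helper.

```typescript
// Hand-written subset of the fields this PR appears to read from the
// webhook payload; not the complete GitHub schema.
interface WorkflowJobPayload {
  action: string;
  workflow_job: {
    id: number;
    conclusion: string | null;
  };
}

// Runtime type guard: narrows `unknown` to WorkflowJobPayload.
function isWorkflowJobPayload(value: unknown): value is WorkflowJobPayload {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  if (typeof candidate['action'] !== 'string') return false;

  const job = candidate['workflow_job'];
  if (typeof job !== 'object' || job === null) return false;
  const jobFields = job as Record<string, unknown>;
  const conclusion = jobFields['conclusion'];

  return (
    typeof jobFields['id'] === 'number' &&
    (conclusion === null || typeof conclusion === 'string')
  );
}
```

With a guard like this the handler can accept `unknown` and return early on malformed deliveries, which also removes the need for casts such as `payload.workflow_job.id as number`.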

// }
switch (event) {
case 'workflow_job':
await this.handleWorkflowEvents(event, payload);

Consider renaming handleWorkflowEvents to handleWorkflowJobEvent for clarity, as it seems to handle the 'workflow_job' event specifically.

}
}

async handleWorkflowEvents(event: string, payload: any) {

Consider specifying the type of the payload parameter instead of using any to improve type safety and code readability.

return;
}

let [aiWorkflowRun]: (aiWorkflowRun | null)[] = aiWorkflowRuns;

The use of array destructuring with (aiWorkflowRun | null)[] is unnecessary here. You can directly assign aiWorkflowRun from aiWorkflowRuns[0] since you've already checked the length of aiWorkflowRuns.

payload.workflow_job.id as number,
)) ?? ({} as any);

if (!aiWorkflowRunId) {

Consider handling the case where jobsCount is undefined or null after destructuring from getAiWorkflowDataFromLogs to avoid potential runtime errors.

timestamp: new Date().toISOString(),
});
break;
case 'completed':

The comment about marking the run as completed only when the last job has been completed is helpful, but ensure that the logic correctly handles edge cases, such as when jobsCount is zero or undefined.
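
The completion logic itself is not visible in this hunk, so the following is only a sketch of how the edge cases could be made explicit. `isRunComplete` and its `completedJobs` parameter are hypothetical names.

```typescript
// Hypothetical helper: decides whether a run can be marked as finished.
// `jobsCount` may be missing or zero if it could not be read from the logs.
function isRunComplete(completedJobs: number, jobsCount?: number | null): boolean {
  // Treat a missing, non-finite, or non-positive count as "unknown" and
  // refuse to mark the run complete rather than guessing.
  if (jobsCount == null || !Number.isFinite(jobsCount) || jobsCount <= 0) {
    return false;
  }
  return completedJobs >= jobsCount;
}
```

Treating an unknown count as "not complete" is one possible policy. Falling back to a count of 1 is another, with the trade-off that a multi-job run could be closed after its first job.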

await this.prisma.aiWorkflowRun.update({
where: { id: aiWorkflowRun.id },
data: {
status: payload.workflow_job.conclusion.toUpperCase(),

Ensure that payload.workflow_job.conclusion is always defined and in the expected format before calling toUpperCase() to avoid potential runtime errors.
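
A small sketch of a guard for this, assuming `conclusion` can arrive as `null` or be absent for jobs that have not finished. `toRunStatus` and the `'UNKNOWN'` fallback are hypothetical; the fallback should be whatever value the `status` column is expected to hold.

```typescript
// Hypothetical helper: maps a webhook `conclusion` to an upper-case status.
// The fallback value is an assumption, not something defined in this PR.
function toRunStatus(
  conclusion: string | null | undefined,
  fallback = 'UNKNOWN',
): string {
  if (typeof conclusion !== 'string' || conclusion.trim() === '') {
    return fallback;
  }
  return conclusion.trim().toUpperCase();
}
```

The update call could then pass `status: toRunStatus(payload.workflow_job.conclusion)` instead of calling `toUpperCase()` on a possibly missing value.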

params: RequestParams = {},
) =>
- this.request<void, any>({
+ this.request<string, any>({

The return type has been changed from void to string. Ensure that this change is intentional and that the rest of the codebase correctly handles the new return type.

workflowId: string;
ref: string;
params: Record<string, string>;
id: string;

Consider renaming id to workflowId for clarity and consistency with previous naming conventions.

id: string;
name: string;
description: string;
llmId: string;

The property llmId might need a more descriptive name if it is not immediately clear what 'llm' stands for.

description: string;
llmId: string;
defUrl: string;
gitOwnerRepo: string;

The property gitOwnerRepo could be split into two separate properties, gitOwner and gitRepo, for better clarity and separation of concerns.
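
If the stored column stays as a single string, the split could also happen at the edge. This sketch assumes the value has the form `owner/repo`, which the diff does not confirm; `parseOwnerRepo` and `GitRepoRef` are hypothetical names.

```typescript
interface GitRepoRef {
  gitOwner: string;
  gitRepo: string;
}

// Splits an "owner/repo" string; returns null when the shape is unexpected.
function parseOwnerRepo(gitOwnerRepo: string): GitRepoRef | null {
  const parts = gitOwnerRepo.split('/');
  if (parts.length !== 2) return null;

  const [gitOwner, gitRepo] = parts;
  if (!gitOwner || !gitRepo) return null;

  return { gitOwner, gitRepo };
}
```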

WHERE "challengeId" = ${challengeId}
`;

const workflows = await this.challengePrisma.$queryRaw<WorkflowData[]>`

Consider using parameterized queries to prevent SQL injection vulnerabilities. For example, use Prisma's parameterized query syntax instead of embedding variables directly into the query string.
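
For context: Prisma's documentation describes the tagged-template form of `$queryRaw`, as used in the snippet above, as parameterized, while `$queryRawUnsafe` is the variant that takes a plain string. A tag function receives the literal SQL fragments and the interpolated values separately, which is what allows values to be sent as bound parameters. The sketch below is a simplified, hypothetical tag function (`sql`) with PostgreSQL-style placeholders. It is not Prisma's implementation, and the table name is made up.

```typescript
interface ParameterizedQuery {
  text: string;
  values: unknown[];
}

// Simplified tag function: joins the literal fragments with $1, $2, ...
// placeholders and returns the interpolated values separately.
function sql(strings: TemplateStringsArray, ...values: unknown[]): ParameterizedQuery {
  let text = '';
  strings.forEach((fragment, index) => {
    text += fragment;
    if (index < values.length) {
      text += `$${index + 1}`;
    }
  });
  return { text, values };
}

// A hostile value never becomes part of the SQL text.
const exampleId = "abc'; DROP TABLE workflows; --";
const exampleQuery = sql`SELECT * FROM "Workflow" WHERE "challengeId" = ${exampleId}`;
// exampleQuery.text   -> SELECT * FROM "Workflow" WHERE "challengeId" = $1
// exampleQuery.values -> [exampleId]
```

The thing to watch for is string concatenation or a plain (untagged) template literal passed to the unsafe variant, since that would embed the value in the SQL text.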


private mapChallenge(aggregate: ChallengeAggregate): ChallengeData {
- const { challenge, legacy, type, track, phases, metadata } = aggregate;
+ const { challenge, legacy, type, track, phases, workflows } = aggregate;

The variable workflows is extracted from aggregate but is not used in the mapChallenge function. Consider removing it if it's not needed or ensure it is utilized appropriately.

jobId: number,
retry = 0,
): Promise<{ aiWorkflowRunId: string; jobsCount: number } | null> {
// 120 re-tryies means ~60seconds (1/500ms)

The comment on line 134 mentions '120 re-tryies', which seems to have a typo. It should be 'retries'.
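
On the arithmetic in that code comment: 120 retries at one attempt per 500 ms gives 120 × 500 ms = 60,000 ms, so about 60 seconds of waiting, not counting the time each log fetch takes. A minimal sketch of a bounded retry loop with those numbers as defaults follows; `retryWithDelay` and `sleep` are hypothetical helpers, and the PR itself appears to pass a `retry` counter to the method instead.

```typescript
const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Calls `attempt` until it returns a non-null value or retries run out.
// With maxRetries = 120 and delayMs = 500 the waiting time is bounded by
// 120 * 500 ms = 60 s, plus however long each attempt itself takes.
async function retryWithDelay<T>(
  attempt: () => Promise<T | null>,
  maxRetries = 120,
  delayMs = 500,
): Promise<T | null> {
  for (let retry = 0; retry <= maxRetries; retry++) {
    const result = await attempt();
    if (result !== null) return result;
    // Sleep only between attempts, not after the final one.
    if (retry < maxRetries) await sleep(delayMs);
  }
  return null;
}
```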

)
).data;

const match = logs.match(/::AI_WORKFLOW_RUN_ID::\s*([a-z0-9-_]{9,})/i);

The regular expression used to match 'AI_WORKFLOW_RUN_ID' assumes a specific format for the ID. Consider adding validation or handling for unexpected formats to avoid potential runtime errors.

}
const aiWorkflowRunId = match[1];

const jobCountMatch = logs.match(/::JOB_COUNT::(\d+)/i);

The default value for 'jobsCount' is set to 1 if 'jobCountMatch' is not found. Ensure that this default value is appropriate for all cases, or consider logging a warning when the match is not found.
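
A sketch of the log parsing with the fallback made visible. The two regular expressions follow the ones in the diff (with the character class written as `[a-z0-9_-]`); `parseWorkflowLogs` and the `warn` callback are hypothetical.

```typescript
interface WorkflowLogData {
  aiWorkflowRunId: string;
  jobsCount: number;
}

// Extracts the run ID and job count markers from raw log text.
// Returns null when the run ID marker is absent; falls back to a job
// count of 1 (and reports it through `warn`) when ::JOB_COUNT:: is
// missing or not a positive integer.
function parseWorkflowLogs(
  logs: string,
  warn: (message: string) => void = () => {},
): WorkflowLogData | null {
  const idMatch = logs.match(/::AI_WORKFLOW_RUN_ID::\s*([a-z0-9_-]{9,})/i);
  const aiWorkflowRunId = idMatch?.[1];
  if (!aiWorkflowRunId) return null;

  const rawCount = logs.match(/::JOB_COUNT::(\d+)/i)?.[1];
  let jobsCount = rawCount ? Number.parseInt(rawCount, 10) : Number.NaN;
  if (!Number.isInteger(jobsCount) || jobsCount < 1) {
    warn('JOB_COUNT marker missing or invalid; defaulting to 1');
    jobsCount = 1;
  }

  return { aiWorkflowRunId, jobsCount };
}
```

Keeping the parsing in a pure function like this also makes the marker format easy to unit test without fetching real job logs.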

aiWorkflowRunId,
jobsCount,
};
} catch {

The catch block does not differentiate between different types of errors. Consider logging the error details to help with debugging and understanding the failure reasons.

import { SubmissionService } from 'src/api/submission/submission.service';
import { ChallengePrismaService } from './challenge-prisma.service';
import { MemberPrismaService } from './member-prisma.service';
import { QueueSchedulerService } from './queue-scheduler.service';

The QueueSchedulerService is imported but not used in this file. If it's not needed, consider removing the import to keep the code clean and avoid unnecessary dependencies.


constructor() {
this.logger.log('QueueSchedulerService initialized');
this.boss = new PgBoss(process.env.PG_BOSS_DB_URL!);

Consider handling the case where process.env.PG_BOSS_DB_URL is undefined to prevent potential runtime errors.
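
A sketch of a fail-fast check that would replace the non-null assertion (`!`). `requireEnv` is a hypothetical helper; it takes the environment as a parameter so it can be tested without touching `process.env`.

```typescript
// Hypothetical helper: reads a required variable from an env-like record
// and fails fast with a descriptive message when it is missing or empty.
function requireEnv(
  name: string,
  env: Record<string, string | undefined>,
): string {
  const value = env[name];
  if (value === undefined || value.trim() === '') {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}
```

In the constructor this would read `new PgBoss(requireEnv('PG_BOSS_DB_URL', process.env))`, so a misconfigured deployment fails at startup with a message that names the variable.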

this.logger.log(`Created queue with name "${queueName}"`);
}

async queueJob(queueName: string, jobId, payload?: any, options?: Queue) {

Specify the type for jobId parameter for better type safety and clarity.

this.tasksMap.get(jobId)?.call(null);
this.tasksMap.delete(jobId);
}
this.logger.log(`Job ${jobId} completed with result:`, result);

Consider using template literals for consistent logging format: this.logger.log(`Job ${jobId} completed with result: ${result}`);
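
One caveat with that suggestion: interpolating an object into a template literal yields `[object Object]`, so the result needs to be serialized first if it is not a string. A sketch, with `formatJobResult` as a hypothetical helper:

```typescript
// Hypothetical helper: renders a job result for a single-line log message.
function formatJobResult(jobId: string, result: unknown): string {
  let rendered: string;
  try {
    // JSON.stringify returns undefined at runtime for undefined input,
    // hence the ?? fallback to String().
    rendered =
      typeof result === 'string'
        ? result
        : JSON.stringify(result) ?? String(result);
  } catch {
    // Circular structures make JSON.stringify throw; fall back to String().
    rendered = String(result);
  }
  return `Job ${jobId} completed with result: ${rendered}`;
}
```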

* @param giteaService - Service to interact with Gitea.
*/
constructor(
private readonly prisma: PrismaService,

Consider adding error handling for the PrismaService initialization to ensure that any issues with database connectivity or configuration are caught early.

private readonly giteaService: GiteaService,
) {}

async onModuleInit() {

The onModuleInit method is asynchronous but does not handle potential errors. Consider wrapping the logic in a try-catch block to manage any exceptions that might occur during the initialization process.

})
).map((d) => d.gitWorkflowId);

await this.scheduler.handleWorkForQueues<{ data: any }>(

The handleWorkForQueues method is called with a generic type { data: any }. Using any can lead to runtime errors due to lack of type safety. Consider defining a more specific type for the data being handled.


await this.giteaService.checkAndCreateRepository(
process.env.GITEA_SUBMISSION_REVIEWS_ORG || 'TC-Reviews-Tests',
if (!Array.isArray(challenge?.workflows)) {

The check for challenge?.workflows being an array has been moved up, but it would be beneficial to log a message indicating that no AI workflow is defined for the challenge before returning. This can help in debugging and understanding the flow of the program.

return;
}

await this.queueWorkflowRuns(

The function queueWorkflowRuns is being called, but it is not clear from this diff if it handles errors internally. Ensure that any errors thrown by queueWorkflowRuns are properly caught and logged to maintain consistency with the previous implementation where errors were logged and accumulated.

challengeId: string,
submissionId: string,
) {
await this.prisma.$transaction(async (tx) => {

Consider adding error handling for the transaction to ensure that any failures during the transaction are properly managed and do not leave the database in an inconsistent state.

});

for (const run of workflowRuns) {
await this.scheduler.queueJob(run.workflow.gitWorkflowId, run.id, {

The await inside a loop can lead to performance issues as it waits for each job to be queued before proceeding to the next. Consider using Promise.all to queue all jobs concurrently.
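
A sketch of the concurrent variant. `queueAll`, `QueuedRun`, and `EnqueueFn` are hypothetical; `enqueue` stands in for `this.scheduler.queueJob` with a simplified signature.

```typescript
interface QueuedRun {
  id: string;
  queueName: string;
}

type EnqueueFn = (queueName: string, jobId: string) => Promise<void>;

// Starts every enqueue call first, then awaits them together, so total
// latency is roughly that of the slowest call rather than the sum.
// Promise.all rejects as soon as any single enqueue rejects.
async function queueAll(runs: QueuedRun[], enqueue: EnqueueFn): Promise<number> {
  await Promise.all(runs.map((run) => enqueue(run.queueName, run.id)));
  return runs.length;
}
```

The trade-off is that jobs are no longer enqueued in a guaranteed order, and a rejection does not cancel the calls already in flight. For a handful of runs per submission the sequential loop may be acceptable as written.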

});
}

async handleQueuedWorkflowRun([job]: [Job]) {

The handleQueuedWorkflowRun method does not seem to handle errors that might occur during the processing of the job. Consider adding error handling to ensure that any issues are logged and managed appropriately.
