-
Notifications
You must be signed in to change notification settings - Fork 8
[DRAFT] PM-1551 - integrate the AI workflow events #116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
status String @db.VarChar | ||
usage Json? @db.JsonB | ||
error String? @db.Text | ||
scheduledJobId String? @db.Text |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider renaming scheduledJobId
to scheduledJobID
for consistency with common naming conventions that capitalize acronyms.
constructor( | ||
private readonly prisma: PrismaService, | ||
private readonly prismaErrorService: PrismaErrorService, | ||
private readonly scheduler: QueueSchedulerService, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The QueueSchedulerService
is injected but not used in the current code. Ensure that it is necessary for future implementation or remove it to keep the code clean.
private readonly prisma: PrismaService, | ||
private readonly prismaErrorService: PrismaErrorService, | ||
private readonly scheduler: QueueSchedulerService, | ||
private readonly giteaService: GiteaService, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The GiteaService
is injected but not used in the current code. Ensure that it is necessary for future implementation or remove it to keep the code clean.
|
||
// Future extensibility: Add event-specific handlers here | ||
this.handleEventSpecificProcessing( | ||
await this.handleEventSpecificProcessing( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider handling the promise rejection from await this.handleEventSpecificProcessing(...)
to ensure that any errors are properly caught and managed. This can prevent unhandled promise rejections and improve error handling in the application.
* This method can be extended to handle different GitHub events differently | ||
*/ | ||
private handleEventSpecificProcessing(event: string, payload: any): void { | ||
private async handleEventSpecificProcessing( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider specifying the type of the payload
parameter instead of using any
to improve type safety and clarity.
// } | ||
switch (event) { | ||
case 'workflow_job': | ||
await this.handleWorkflowEvents(event, payload); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider renaming handleWorkflowEvents
to handleWorkflowJobEvent
for clarity, as it seems to handle the 'workflow_job' event specifically.
} | ||
} | ||
|
||
async handleWorkflowEvents(event: string, payload: any) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider specifying the type of the payload
parameter instead of using any
to improve type safety and code readability.
return; | ||
} | ||
|
||
let [aiWorkflowRun]: (aiWorkflowRun | null)[] = aiWorkflowRuns; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of array destructuring with (aiWorkflowRun | null)[]
is unnecessary here. You can directly assign aiWorkflowRun
from aiWorkflowRuns[0]
since you've already checked the length of aiWorkflowRuns
.
payload.workflow_job.id as number, | ||
)) ?? ({} as any); | ||
|
||
if (!aiWorkflowRunId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider handling the case where jobsCount
is undefined or null after destructuring from getAiWorkflowDataFromLogs
to avoid potential runtime errors.
timestamp: new Date().toISOString(), | ||
}); | ||
break; | ||
case 'completed': |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment about marking the run as completed only when the last job has been completed is helpful, but ensure that the logic correctly handles edge cases, such as when jobsCount
is zero or undefined.
await this.prisma.aiWorkflowRun.update({ | ||
where: { id: aiWorkflowRun.id }, | ||
data: { | ||
status: payload.workflow_job.conclusion.toUpperCase(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure that payload.workflow_job.conclusion
is always defined and in the expected format before calling toUpperCase()
to avoid potential runtime errors.
params: RequestParams = {}, | ||
) => | ||
this.request<void, any>({ | ||
this.request<string, any>({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The return type has been changed from void
to string
. Ensure that this change is intentional and that the rest of the codebase correctly handles the new return type.
workflowId: string; | ||
ref: string; | ||
params: Record<string, string>; | ||
id: string; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider renaming id
to workflowId
for clarity and consistency with previous naming conventions.
id: string; | ||
name: string; | ||
description: string; | ||
llmId: string; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The property llmId
might need a more descriptive name if it is not immediately clear what 'llm' stands for.
description: string; | ||
llmId: string; | ||
defUrl: string; | ||
gitOwnerRepo: string; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The property gitOwnerRepo
could be split into two separate properties, gitOwner
and gitRepo
, for better clarity and separation of concerns.
WHERE "challengeId" = ${challengeId} | ||
`; | ||
|
||
const workflows = await this.challengePrisma.$queryRaw<WorkflowData[]>` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using parameterized queries to prevent SQL injection vulnerabilities. For example, use Prisma's parameterized query syntax instead of embedding variables directly into the query string.
|
||
private mapChallenge(aggregate: ChallengeAggregate): ChallengeData { | ||
const { challenge, legacy, type, track, phases, metadata } = aggregate; | ||
const { challenge, legacy, type, track, phases, workflows } = aggregate; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The variable workflows
is extracted from aggregate
but is not used in the mapChallenge
function. Consider removing it if it's not needed or ensure it is utilized appropriately.
jobId: number, | ||
retry = 0, | ||
): Promise<{ aiWorkflowRunId: string; jobsCount: number } | null> { | ||
// 120 re-tryies means ~60seconds (1/500ms) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment on line 134 mentions '120 re-tryies', which seems to have a typo. It should be 'retries'.
) | ||
).data; | ||
|
||
const match = logs.match(/::AI_WORKFLOW_RUN_ID::\s*([a-z0-9-_]{9,})/i); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The regular expression used to match 'AI_WORKFLOW_RUN_ID' assumes a specific format for the ID. Consider adding validation or handling for unexpected formats to avoid potential runtime errors.
} | ||
const aiWorkflowRunId = match[1]; | ||
|
||
const jobCountMatch = logs.match(/::JOB_COUNT::(\d+)/i); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default value for 'jobsCount' is set to 1 if 'jobCountMatch' is not found. Ensure that this default value is appropriate for all cases, or consider logging a warning when the match is not found.
aiWorkflowRunId, | ||
jobsCount, | ||
}; | ||
} catch { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The catch block does not differentiate between different types of errors. Consider logging the error details to help with debugging and understanding the failure reasons.
import { SubmissionService } from 'src/api/submission/submission.service'; | ||
import { ChallengePrismaService } from './challenge-prisma.service'; | ||
import { MemberPrismaService } from './member-prisma.service'; | ||
import { QueueSchedulerService } from './queue-scheduler.service'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The QueueSchedulerService
is imported but not used in this file. If it's not needed, consider removing the import to keep the code clean and avoid unnecessary dependencies.
|
||
constructor() { | ||
this.logger.log('QueueSchedulerService initialized'); | ||
this.boss = new PgBoss(process.env.PG_BOSS_DB_URL!); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider handling the case where process.env.PG_BOSS_DB_URL
is undefined to prevent potential runtime errors.
this.logger.log(`Created queue with name "${queueName}"`); | ||
} | ||
|
||
async queueJob(queueName: string, jobId, payload?: any, options?: Queue) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Specify the type for jobId
parameter for better type safety and clarity.
this.tasksMap.get(jobId)?.call(null); | ||
this.tasksMap.delete(jobId); | ||
} | ||
this.logger.log(`Job ${jobId} completed with result:`, result); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using template literals for consistent logging format: this.logger.log(
Job ${jobId} completed with result: ${result});
* @param giteaService - Service to interact with Gitea. | ||
*/ | ||
constructor( | ||
private readonly prisma: PrismaService, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding error handling for the PrismaService initialization to ensure that any issues with database connectivity or configuration are caught early.
private readonly giteaService: GiteaService, | ||
) {} | ||
|
||
async onModuleInit() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The onModuleInit
method is asynchronous but does not handle potential errors. Consider wrapping the logic in a try-catch block to manage any exceptions that might occur during the initialization process.
}) | ||
).map((d) => d.gitWorkflowId); | ||
|
||
await this.scheduler.handleWorkForQueues<{ data: any }>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The handleWorkForQueues
method is called with a generic type { data: any }
. Using any
can lead to runtime errors due to lack of type safety. Consider defining a more specific type for the data being handled.
|
||
await this.giteaService.checkAndCreateRepository( | ||
process.env.GITEA_SUBMISSION_REVIEWS_ORG || 'TC-Reviews-Tests', | ||
if (!Array.isArray(challenge?.workflows)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The check for challenge?.workflows
being an array has been moved up, but it would be beneficial to log a message indicating that no AI workflow is defined for the challenge before returning. This can help in debugging and understanding the flow of the program.
return; | ||
} | ||
|
||
await this.queueWorkflowRuns( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function queueWorkflowRuns
is being called, but it is not clear from this diff if it handles errors internally. Ensure that any errors thrown by queueWorkflowRuns
are properly caught and logged to maintain consistency with the previous implementation where errors were logged and accumulated.
challengeId: string, | ||
submissionId: string, | ||
) { | ||
await this.prisma.$transaction(async (tx) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding error handling for the transaction to ensure that any failures during the transaction are properly managed and do not leave the database in an inconsistent state.
}); | ||
|
||
for (const run of workflowRuns) { | ||
await this.scheduler.queueJob(run.workflow.gitWorkflowId, run.id, { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The await
inside a loop can lead to performance issues as it waits for each job to be queued before proceeding to the next. Consider using Promise.all
to queue all jobs concurrently.
}); | ||
} | ||
|
||
async handleQueuedWorkflowRun([job]: [Job]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The handleQueuedWorkflowRun
method does not seem to handle errors that might occur during the processing of the job. Consider adding error handling to ensure that any issues are logged and managed appropriately.
https://topcoder.atlassian.net/browse/PM-1551 - Integrate webhook event processing with tc workflow
NOTE: this is a draft PR. I'll update with more details after code cleanup.