@@ -32,13 +32,32 @@ import type {
32
32
PoolIdChainIdBody ,
33
33
} from './types' ;
34
34
import evaluationQuestionService from '@/service/EvaluationQuestionService' ;
35
+ import { rateLimit } from 'express-rate-limit' ;
35
36
36
37
const logger = createLogger ( ) ;
37
38
38
39
interface EvaluationBody extends CreateEvaluationParams {
39
40
signature : Hex ;
40
41
}
41
42
43
+ export const evaluationRateLimiter = rateLimit ( {
44
+ windowMs : 60 * 1000 ,
45
+ max : 20 , // 20 requests per minute
46
+ message : 'Too many evaluation requests' ,
47
+ } ) ;
48
+
49
+ export const recreateEvaluationQuestionsLimiter = rateLimit ( {
50
+ windowMs : 60 * 1000 ,
51
+ max : 5 , // More restrictive for question recreation
52
+ message : 'Too many question recreation requests' ,
53
+ } ) ;
54
+
55
+ export const triggerLLMEvaluationLimiter = rateLimit ( {
56
+ windowMs : 60 * 1000 ,
57
+ max : 10 , // 10 requests per minute
58
+ message : 'Too many LLM evaluation requests' ,
59
+ } ) ;
60
+
42
61
export const recreateEvaluationQuestions = async (
43
62
req : Request ,
44
63
res : Response
@@ -304,117 +323,131 @@ export const createLLMEvaluations = async (
304
323
const roundCache : Record < string , RoundWithApplications > = { } ;
305
324
const failedProjects : string [ ] = [ ] ;
306
325
307
- // Split the paramsArray into batches of 10
308
- const batchedParams = batchPromises ( paramsArray , 10 ) ;
326
+ // Increase batch size and add delay between batches
327
+ const BATCH_SIZE = 25 ;
328
+ const BATCH_DELAY = 2000 ; // 2 seconds between batches
329
+
330
+ // Deduplicate params array based on unique application IDs
331
+ const uniqueParams = paramsArray . filter (
332
+ ( param , index , self ) =>
333
+ index ===
334
+ self . findIndex (
335
+ p =>
336
+ p . alloApplicationId === param . alloApplicationId &&
337
+ p . chainId === param . chainId
338
+ )
339
+ ) ;
340
+
341
+ const batchedParams = batchPromises ( uniqueParams , BATCH_SIZE ) ;
309
342
310
343
for ( const batch of batchedParams ) {
311
344
try {
312
- // Process each batch of promises concurrently
313
- const evaluationPromises = batch . map ( async params => {
314
- try {
315
- const evaluationQuestions =
316
- params . questions === undefined || params . questions . length === 0
317
- ? await evaluationService . getQuestionsByChainAndAlloPoolId (
318
- params . chainId ,
319
- params . alloPoolId
320
- )
321
- : params . questions ;
322
-
323
- if (
324
- evaluationQuestions === null ||
325
- evaluationQuestions . length === 0
326
- ) {
327
- logger . error (
328
- 'createLLMEvaluations:Failed to get evaluation questions'
329
- ) ;
330
- throw new Error ( 'Failed to get evaluation questions' ) ;
331
- }
332
-
333
- let roundMetadata = params . roundMetadata ;
334
- let applicationMetadata = params . applicationMetadata ;
335
-
336
- // Check if the round is already in cache
337
- if ( roundMetadata == null || applicationMetadata == null ) {
338
- let round : RoundWithApplications | null ;
339
-
340
- // If the round is cached, use it
341
- if ( roundCache [ params . alloPoolId ] != null ) {
342
- round = roundCache [ params . alloPoolId ] ;
343
- logger . debug (
344
- `Using cached round data for roundId: ${ params . alloPoolId } `
345
- ) ;
346
- } else {
347
- // Fetch the round and store it in the cache
348
- const [ error , fetchedRound ] = await catchError (
349
- indexerClient . getRoundWithApplications ( {
350
- chainId : params . chainId ,
351
- roundId : params . alloPoolId ,
352
- } )
353
- ) ;
354
-
355
- if ( error !== undefined || fetchedRound == null ) {
356
- logger . error ( 'Failed to fetch round with applications' ) ;
357
- throw new Error ( 'Failed to fetch round with applications' ) ;
358
- }
359
-
360
- round = fetchedRound ;
361
- roundCache [ params . alloPoolId ] = round ;
362
- logger . info (
363
- `Fetched and cached round with ID: ${ round . id } , which includes ${ round . applications . length } applications`
364
- ) ;
365
- }
366
-
367
- const application = round . applications . find (
368
- app => app . id === params . alloApplicationId
369
- ) ;
370
- if ( application == null ) {
371
- logger . error (
372
- `Application with ID: ${ params . alloApplicationId } not found in round`
373
- ) ;
374
- throw new NotFoundError (
375
- `Application with ID: ${ params . alloApplicationId } not found in round`
376
- ) ;
377
- }
378
-
379
- roundMetadata = round . roundMetadata ;
380
- applicationMetadata = application . metadata ;
381
- }
382
-
383
- const evaluation = await requestEvaluation (
384
- roundMetadata ,
385
- applicationMetadata ,
386
- evaluationQuestions
387
- ) ;
388
-
389
- await createEvaluation ( {
390
- chainId : params . chainId ,
391
- alloPoolId : params . alloPoolId ,
392
- alloApplicationId : params . alloApplicationId ,
393
- cid : params . cid ,
394
- evaluator : params . evaluator ,
395
- summaryInput : evaluation ,
396
- evaluatorType : EVALUATOR_TYPE . LLM_GPT3 ,
397
- } ) ;
398
- } catch ( error ) {
399
- // If an error occurs, add the project ID to the failedProjects array
400
- failedProjects . push ( params . alloApplicationId ) ;
401
- throw error ;
402
- }
403
- } ) ;
404
-
405
- await Promise . all ( evaluationPromises ) ;
345
+ // Process batch with concurrency limit
346
+ await Promise . all (
347
+ batch . map ( async params => {
348
+ await processSingleEvaluation ( params , roundCache , failedProjects ) ;
349
+ } )
350
+ ) ;
406
351
407
- await new Promise ( resolve => setTimeout ( resolve , 1000 ) ) ;
352
+ // Add delay between batches to prevent overwhelming the system
353
+ if ( batchedParams . indexOf ( batch ) < batchedParams . length - 1 ) {
354
+ await new Promise ( resolve => setTimeout ( resolve , BATCH_DELAY ) ) ;
355
+ }
408
356
} catch ( batchError ) {
409
- // Handle any error within the batch (if any promise fails)
410
- logger . error (
411
- 'Error processing batch, skipping to the next one:' ,
412
- batchError
413
- ) ;
414
- // Continue to the next batch even if an error occurred
357
+ logger . error ( 'Error processing batch:' , batchError ) ;
415
358
continue ;
416
359
}
417
360
}
418
361
419
362
return failedProjects ;
420
363
} ;
364
+
365
+ async function processSingleEvaluation (
366
+ params : CreateLLMEvaluationParams ,
367
+ roundCache : Record < string , RoundWithApplications > ,
368
+ failedProjects : string [ ]
369
+ ) : Promise < void > {
370
+ try {
371
+ const evaluationQuestions =
372
+ params . questions === undefined || params . questions . length === 0
373
+ ? await evaluationService . getQuestionsByChainAndAlloPoolId (
374
+ params . chainId ,
375
+ params . alloPoolId
376
+ )
377
+ : params . questions ;
378
+
379
+ if ( evaluationQuestions === null || evaluationQuestions . length === 0 ) {
380
+ logger . error ( 'createLLMEvaluations:Failed to get evaluation questions' ) ;
381
+ throw new Error ( 'Failed to get evaluation questions' ) ;
382
+ }
383
+
384
+ let roundMetadata = params . roundMetadata ;
385
+ let applicationMetadata = params . applicationMetadata ;
386
+
387
+ // Check if the round is already in cache
388
+ if ( roundMetadata == null || applicationMetadata == null ) {
389
+ let round : RoundWithApplications | null ;
390
+
391
+ // If the round is cached, use it
392
+ if ( roundCache [ params . alloPoolId ] != null ) {
393
+ round = roundCache [ params . alloPoolId ] ;
394
+ logger . debug (
395
+ `Using cached round data for roundId: ${ params . alloPoolId } `
396
+ ) ;
397
+ } else {
398
+ // Fetch the round and store it in the cache
399
+ const [ error , fetchedRound ] = await catchError (
400
+ indexerClient . getRoundWithApplications ( {
401
+ chainId : params . chainId ,
402
+ roundId : params . alloPoolId ,
403
+ } )
404
+ ) ;
405
+
406
+ if ( error !== undefined || fetchedRound == null ) {
407
+ logger . error ( 'Failed to fetch round with applications' ) ;
408
+ throw new Error ( 'Failed to fetch round with applications' ) ;
409
+ }
410
+
411
+ round = fetchedRound ;
412
+ roundCache [ params . alloPoolId ] = round ;
413
+ logger . info (
414
+ `Fetched and cached round with ID: ${ round . id } , which includes ${ round . applications . length } applications`
415
+ ) ;
416
+ }
417
+
418
+ const application = round . applications . find (
419
+ app => app . id === params . alloApplicationId
420
+ ) ;
421
+ if ( application == null ) {
422
+ logger . error (
423
+ `Application with ID: ${ params . alloApplicationId } not found in round`
424
+ ) ;
425
+ throw new NotFoundError (
426
+ `Application with ID: ${ params . alloApplicationId } not found in round`
427
+ ) ;
428
+ }
429
+
430
+ roundMetadata = round . roundMetadata ;
431
+ applicationMetadata = application . metadata ;
432
+ }
433
+
434
+ const evaluation = await requestEvaluation (
435
+ roundMetadata ,
436
+ applicationMetadata ,
437
+ evaluationQuestions
438
+ ) ;
439
+
440
+ await createEvaluation ( {
441
+ chainId : params . chainId ,
442
+ alloPoolId : params . alloPoolId ,
443
+ alloApplicationId : params . alloApplicationId ,
444
+ cid : params . cid ,
445
+ evaluator : params . evaluator ,
446
+ summaryInput : evaluation ,
447
+ evaluatorType : EVALUATOR_TYPE . LLM_GPT3 ,
448
+ } ) ;
449
+ } catch ( error ) {
450
+ failedProjects . push ( params . alloApplicationId ) ;
451
+ throw error ;
452
+ }
453
+ }
0 commit comments