diff --git a/apps/indexer/config.yaml b/apps/indexer/config.yaml index 10a8d02c..911aa93b 100644 --- a/apps/indexer/config.yaml +++ b/apps/indexer/config.yaml @@ -135,113 +135,113 @@ networks: ####################### # MAINNET # ####################### - - id: 1 # mainnet - start_block: 18486688 - rpc: ${ENVIO_MAINNET_FALLBACK_RPC:-https://eth.llamarpc.com} - contracts: - - name: Allo - address: - - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 - - name: Strategy - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - - id: 10 # optimism - rpc: ${ENVIO_OPTIMISM_FALLBACK_RPC:-https://optimism.llamarpc.com} - start_block: 111678968 - contracts: - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - name: Strategy - - name: Allo - address: - - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 - - - id: 42 # lukso-mainnet - start_block: 2400000 - rpc: http://34.91.99.187:854 - contracts: - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - name: Strategy - - name: Allo - address: - - 0xB087535DB0df98fC4327136e897A5985E5Cfbd66 - - - id: 100 # gnosis - rpc: ${ENVIO_GNOSIS_FALLBACK_RPC:-https://gnosis-rpc.publicnode.com} - start_block: 35900000 - contracts: - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - name: Strategy - - name: Allo - address: - - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 - - - id: 137 # polygon - rpc: ${ENVIO_POLYGON_FALLBACK_RPC:-https://polygon.llamarpc.com} - start_block: 49466006 - contracts: - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - name: Strategy - - name: Allo - address: - - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 - - - id: 250 # fantom - rpc: ${ENVIO_FANTOM_FALLBACK_RPC:-https://rpc.ankr.com/fantom} - start_block: 77624278 - contracts: - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - name: Strategy - - name: Allo - address: - - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 - - - id: 324 # zksync-era-mainnet - rpc: ${ENVIO_ZKSYNC_FALLBACK_RPC:-https://1rpc.io/zksync2-era} - start_block: 31154341 - contracts: - - name: Registry - address: - - 0xaa376Ef759c1f5A8b0B5a1e2FEC5C23f3bF30246 - - name: Strategy - - name: Allo - address: - - 0x9D1D1BF2835935C291C0f5228c86d5C4e235A249 - - - id: 1088 # metis - rpc: ${ENVIO_METIS_FALLBACK_RPC:-https://metis-rpc.publicnode.com} - start_block: 17860000 - contracts: - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - name: Strategy - - name: Allo - address: - - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 - - - id: 8453 # base - rpc: ${ENVIO_BASE_FALLBACK_RPC:-https://base-rpc.publicnode.com} - start_block: 6083365 - contracts: - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - name: Strategy - - name: Allo - address: - - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 + # - id: 1 # mainnet + # start_block: 18486688 + # rpc: ${ENVIO_MAINNET_FALLBACK_RPC:-https://eth.llamarpc.com} + # contracts: + # - name: Allo + # address: + # - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 + # - name: Strategy + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + + # - id: 10 # optimism + # rpc: ${ENVIO_OPTIMISM_FALLBACK_RPC:-https://optimism.llamarpc.com} + # start_block: 111678968 + # contracts: + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + # - name: Strategy + # - name: Allo + # address: + # - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 + + # - id: 42 # lukso-mainnet + # start_block: 2400000 + # rpc: http://34.91.99.187:854 + # contracts: + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + # - name: Strategy + # - name: Allo + # address: + # - 0xB087535DB0df98fC4327136e897A5985E5Cfbd66 + + # - id: 100 # gnosis + # rpc: ${ENVIO_GNOSIS_FALLBACK_RPC:-https://gnosis-rpc.publicnode.com} + # start_block: 35900000 + # contracts: + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + # - name: Strategy + # - name: Allo + # address: + # - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 + + # - id: 137 # polygon + # rpc: ${ENVIO_POLYGON_FALLBACK_RPC:-https://polygon.llamarpc.com} + # start_block: 49466006 + # contracts: + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + # - name: Strategy + # - name: Allo + # address: + # - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 + + # - id: 250 # fantom + # rpc: ${ENVIO_FANTOM_FALLBACK_RPC:-https://rpc.ankr.com/fantom} + # start_block: 77624278 + # contracts: + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + # - name: Strategy + # - name: Allo + # address: + # - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 + + # - id: 324 # zksync-era-mainnet + # rpc: ${ENVIO_ZKSYNC_FALLBACK_RPC:-https://1rpc.io/zksync2-era} + # start_block: 31154341 + # contracts: + # - name: Registry + # address: + # - 0xaa376Ef759c1f5A8b0B5a1e2FEC5C23f3bF30246 + # - name: Strategy + # - name: Allo + # address: + # - 0x9D1D1BF2835935C291C0f5228c86d5C4e235A249 + + # - id: 1088 # metis + # rpc: ${ENVIO_METIS_FALLBACK_RPC:-https://metis-rpc.publicnode.com} + # start_block: 17860000 + # contracts: + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + # - name: Strategy + # - name: Allo + # address: + # - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 + + # - id: 8453 # base + # rpc: ${ENVIO_BASE_FALLBACK_RPC:-https://base-rpc.publicnode.com} + # start_block: 6083365 + # contracts: + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + # - name: Strategy + # - name: Allo + # address: + # - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 - id: 42161 # arbitrum rpc: ${ENVIO_ARBITRUM_FALLBACK_RPC:-https://arbitrum.llamarpc.com} @@ -261,105 +261,105 @@ networks: address: - 0x2ce7E4cB5Edb140A9327e67De85463186E757C8f - - id: 43114 # avalanche - rpc: ${ENVIO_AVALANCHE_FALLBACK_RPC:-https://avalanche.public-rpc.com} - start_block: 34540051 - contracts: - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - name: Strategy - - name: Allo - address: - - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 - - - id: 42220 # celo-mainnet - rpc: ${ENVIO_CELO_FALLBACK_RPC:-https://1rpc.io/celo} - start_block: 22257475 - contracts: - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - name: Strategy - - name: Allo - address: - - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 - - - id: 534352 # scroll - rpc: ${ENVIO_SCROLL_FALLBACK_RPC:-https://1rpc.io/scroll} - start_block: 2683205 - contracts: - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - name: Strategy - - name: Allo - address: - - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 + # - id: 43114 # avalanche + # rpc: ${ENVIO_AVALANCHE_FALLBACK_RPC:-https://avalanche.public-rpc.com} + # start_block: 34540051 + # contracts: + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + # - name: Strategy + # - name: Allo + # address: + # - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 + + # - id: 42220 # celo-mainnet + # rpc: ${ENVIO_CELO_FALLBACK_RPC:-https://1rpc.io/celo} + # start_block: 22257475 + # contracts: + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + # - name: Strategy + # - name: Allo + # address: + # - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 + + # - id: 534352 # scroll + # rpc: ${ENVIO_SCROLL_FALLBACK_RPC:-https://1rpc.io/scroll} + # start_block: 2683205 + # contracts: + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + # - name: Strategy + # - name: Allo + # address: + # - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 ####################### # Custom RPC # ####################### - - id: 295 # hedera-mainnet - rpc_config: - url: ${ENVIO_HEDERA_RPC_URL:-https://mainnet.hashio.io/api} - # initial_block_interval: 9000 # Number of blocks to request initially - # backoff_multiplicative: 0 # Factor to reduce batch size on error - # acceleration_additive: 0 # Increase batch size if no errors - # interval_ceiling: 9000 # Maximum blocks per request - # backoff_millis: 1000 # Wait time before retrying in milliseconds - # query_timeout_millis: 20000 # Timeout for RPC requests in milliseconds - start_block: 75239000 - contracts: - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - name: Strategy - - name: Allo - address: - - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 - - - id: 1329 # sei-mainnet - rpc_config: - url: ${ENVIO_SEI_RPC_URL:-https://evm-rpc.sei-apis.com} - # initial_block_interval: 2000 # Number of blocks to request initially - # backoff_multiplicative: 0 # Factor to reduce batch size on error - # acceleration_additive: 0 # Increase batch size if no errors - interval_ceiling: 2000 # Maximum blocks per request - # backoff_millis: 1000 # Wait time before retrying in milliseconds - # query_timeout_millis: 20000 # Timeout for RPC requests in milliseconds - start_block: 78000000 - contracts: - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - name: Strategy - - name: Allo - address: - - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 + # - id: 295 # hedera-mainnet + # rpc_config: + # url: ${ENVIO_HEDERA_RPC_URL:-https://mainnet.hashio.io/api} + # # initial_block_interval: 9000 # Number of blocks to request initially + # # backoff_multiplicative: 0 # Factor to reduce batch size on error + # # acceleration_additive: 0 # Increase batch size if no errors + # # interval_ceiling: 9000 # Maximum blocks per request + # # backoff_millis: 1000 # Wait time before retrying in milliseconds + # # query_timeout_millis: 20000 # Timeout for RPC requests in milliseconds + # start_block: 75239000 + # contracts: + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + # - name: Strategy + # - name: Allo + # address: + # - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 + + # - id: 1329 # sei-mainnet + # rpc_config: + # url: ${ENVIO_SEI_RPC_URL:-https://evm-rpc.sei-apis.com} + # # initial_block_interval: 2000 # Number of blocks to request initially + # # backoff_multiplicative: 0 # Factor to reduce batch size on error + # # acceleration_additive: 0 # Increase batch size if no errors + # interval_ceiling: 2000 # Maximum blocks per request + # # backoff_millis: 1000 # Wait time before retrying in milliseconds + # # query_timeout_millis: 20000 # Timeout for RPC requests in milliseconds + # start_block: 78000000 + # contracts: + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + # - name: Strategy + # - name: Allo + # address: + # - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 ####################### # TESTNET # ####################### - - id: 11155111 # sepolia - rpc: ${ENVIO_SEPOLIA_FALLBACK_RPC:-https://1rpc.io/sepolia} - start_block: 4617051 - contracts: - - name: Registry - address: - - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 - - name: Strategy - - name: Allo - address: - - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 - - name: AlloV1ToV2ProfileMigration - address: - - 0xCd5AbD09ee34BA604795F7f69413caf20ee0Ab60 - - name: GitcoinAttestationNetwork - address: - - 0xBAa70bbAB3C4a7265f879bb3658336DE893b8582 + # - id: 11155111 # sepolia + # rpc: ${ENVIO_SEPOLIA_FALLBACK_RPC:-https://1rpc.io/sepolia} + # start_block: 4617051 + # contracts: + # - name: Registry + # address: + # - 0x4AAcca72145e1dF2aeC137E1f3C5E3D75DB8b5f3 + # - name: Strategy + # - name: Allo + # address: + # - 0x1133eA7Af70876e64665ecD07C0A0476d09465a1 + # - name: AlloV1ToV2ProfileMigration + # address: + # - 0xCd5AbD09ee34BA604795F7f69413caf20ee0Ab60 + # - name: GitcoinAttestationNetwork + # address: + # - 0xBAa70bbAB3C4a7265f879bb3658336DE893b8582 # - id: 80001 # polygon-mumbai # start_block: 41939383 # contracts: diff --git a/packages/data-flow/src/data-loader/handlers/application.handlers.ts b/packages/data-flow/src/data-loader/handlers/application.handlers.ts index 38447eae..39c05b6b 100644 --- a/packages/data-flow/src/data-loader/handlers/application.handlers.ts +++ b/packages/data-flow/src/data-loader/handlers/application.handlers.ts @@ -1,4 +1,5 @@ import { ApplicationChangeset, IApplicationRepository } from "@grants-stack-indexer/repository"; +import { performanceLogger } from "@grants-stack-indexer/shared"; import { ChangesetHandler } from "../types/index.js"; @@ -33,11 +34,35 @@ export const createApplicationHandlers = ( }) satisfies ChangesetHandler<"UpdateApplication">, IncrementApplicationDonationStats: (async (changeset, txConnection): Promise => { + const startTime = performance.now(); const { chainId, roundId, applicationId, amountInUsd } = changeset.args; await repository.incrementApplicationDonationStats( { chainId, roundId, id: applicationId }, amountInUsd, txConnection, ); + const endTime = performance.now(); + const duration = endTime - startTime; + + // Get current application stats for logging + const application = await repository.getApplicationById(applicationId, chainId, roundId); + + performanceLogger.logMetric({ + timestamp: new Date().toISOString(), + eventType: "Application", + operation: "IncrementApplicationDonationStats", + duration, + totalTime: duration, + chainId, + roundId, + applicationId, + amountInUsd, + uniqueDonorsCount: application?.uniqueDonorsCount, + totalDonationsCount: application?.totalDonationsCount, + details: { + totalAmountDonatedInUsd: application?.totalAmountDonatedInUsd, + status: application?.status, + }, + }); }) satisfies ChangesetHandler<"IncrementApplicationDonationStats">, }); diff --git a/packages/data-flow/src/data-loader/handlers/donation.handlers.ts b/packages/data-flow/src/data-loader/handlers/donation.handlers.ts index 841fbd31..c13fb7b6 100644 --- a/packages/data-flow/src/data-loader/handlers/donation.handlers.ts +++ b/packages/data-flow/src/data-loader/handlers/donation.handlers.ts @@ -1,4 +1,5 @@ import { DonationChangeset, IDonationRepository } from "@grants-stack-indexer/repository"; +import { performanceLogger } from "@grants-stack-indexer/shared"; import { ChangesetHandler } from "../types/index.js"; @@ -18,10 +19,58 @@ export type DonationHandlers = { */ export const createDonationHandlers = (repository: IDonationRepository): DonationHandlers => ({ InsertDonation: (async (changeset, txConnection): Promise => { + const startTime = performance.now(); await repository.insertDonation(changeset.args.donation, txConnection); + const endTime = performance.now(); + const duration = endTime - startTime; + + performanceLogger.logMetric({ + timestamp: new Date().toISOString(), + eventType: "Donation", + operation: "InsertDonation", + duration, + totalTime: duration, + blockNumber: Number(changeset.args.donation.blockNumber), + transactionHash: changeset.args.donation.transactionHash, + chainId: changeset.args.donation.chainId, + roundId: changeset.args.donation.roundId, + applicationId: changeset.args.donation.applicationId || undefined, + donorAddress: changeset.args.donation.donorAddress, + recipientAddress: changeset.args.donation.recipientAddress, + amount: changeset.args.donation.amount.toString(), + amountInUsd: changeset.args.donation.amountInUsd, + details: { + tokenAddress: changeset.args.donation.tokenAddress, + amountInRoundMatchToken: changeset.args.donation.amountInRoundMatchToken.toString(), + }, + }); }) satisfies ChangesetHandler<"InsertDonation">, InsertManyDonations: (async (changeset, txConnection): Promise => { + const startTime = performance.now(); await repository.insertManyDonations(changeset.args.donations, txConnection); + const endTime = performance.now(); + const duration = endTime - startTime; + + const firstDonation = changeset.args.donations[0]; + performanceLogger.logMetric({ + timestamp: new Date().toISOString(), + eventType: "Donation", + operation: "InsertManyDonations", + duration, + totalTime: duration, + blockNumber: firstDonation ? Number(firstDonation.blockNumber) : undefined, + transactionHash: firstDonation?.transactionHash, + chainId: firstDonation?.chainId, + roundId: firstDonation?.roundId, + amount: firstDonation ? firstDonation.amount.toString() : undefined, + amountInUsd: firstDonation?.amountInUsd, + details: { + count: changeset.args.donations.length, + totalAmountInUsd: changeset.args.donations + .reduce((sum, d) => sum + Number(d.amountInUsd), 0) + .toString(), + }, + }); }) satisfies ChangesetHandler<"InsertManyDonations">, }); diff --git a/packages/data-flow/src/data-loader/handlers/round.handlers.ts b/packages/data-flow/src/data-loader/handlers/round.handlers.ts index d0dd6cce..beaff624 100644 --- a/packages/data-flow/src/data-loader/handlers/round.handlers.ts +++ b/packages/data-flow/src/data-loader/handlers/round.handlers.ts @@ -1,4 +1,5 @@ import { IRoundRepository, RoundChangeset } from "@grants-stack-indexer/repository"; +import { performanceLogger } from "@grants-stack-indexer/shared"; import { ChangesetHandler } from "../types/index.js"; @@ -52,6 +53,7 @@ export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers }) satisfies ChangesetHandler<"IncrementRoundFundedAmount">, IncrementRoundDonationStats: (async (changeset, txConnection): Promise => { + const startTime = performance.now(); const { chainId, roundId, amountInUsd } = changeset.args; await repository.incrementRoundDonationStats( { @@ -61,6 +63,27 @@ export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers amountInUsd, txConnection, ); + const endTime = performance.now(); + const duration = endTime - startTime; + + // Get current round stats for logging + const round = await repository.getRoundById(chainId, roundId); + + performanceLogger.logMetric({ + timestamp: new Date().toISOString(), + eventType: "Round", + operation: "IncrementRoundDonationStats", + duration, + totalTime: duration, + chainId, + roundId, + amountInUsd, + uniqueDonorsCount: round?.uniqueDonorsCount, + totalDonationsCount: round?.totalDonationsCount, + details: { + totalAmountDonatedInUsd: round?.totalAmountDonatedInUsd, + }, + }); }) satisfies ChangesetHandler<"IncrementRoundDonationStats">, IncrementRoundTotalDistributed: (async (changeset, txConnection): Promise => { diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 580e3d9d..decb83a0 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -1,3 +1,5 @@ +import * as fs from "fs"; +import * as path from "path"; import { isNativeError } from "util/types"; import pMap from "p-map"; @@ -49,6 +51,19 @@ type TokenWithTimestamps = { timestamps: TimestampMs[]; }; +/** + * Performance tracking data for events + */ +type EventPerformanceData = { + eventName: string; + slowCount: number; + totalDuration: number; + maxDuration: number; + minDuration: number; + lastTimestamp: string; + eventCount: number; +}; + /** * The Orchestrator is the central coordinator of the data flow system, managing the interaction between * three main components: @@ -83,6 +98,9 @@ export class Orchestrator { private readonly strategyRegistry: IStrategyRegistry; private readonly dataLoader: DataLoader; private readonly retryHandler: RetryHandler; + private readonly performanceData: Map = new Map(); + private readonly performanceCsvPath: string; + private readonly slowEventThresholdMs: number = 0.5; // 0.5 milliseconds (changed from 500ms) /** * @param chainId - The chain id @@ -132,6 +150,158 @@ export class Orchestrator { this.eventsQueue = new Queue>(fetchLimit); this.eventsByBlockContext = new Map(); this.retryHandler = new RetryHandler(retryStrategy, this.logger); + + // Set up performance tracking + this.performanceCsvPath = path.join(process.cwd(), "performance.csv"); + this.initializePerformanceCsv(); + } + + /** + * Initialize the performance CSV file with headers if it doesn't exist + */ + private initializePerformanceCsv(): void { + if (!fs.existsSync(this.performanceCsvPath)) { + const headers = + "timestamp,eventName,slowCount,avgDuration,maxDuration,minDuration,chainId\n"; + fs.writeFileSync(this.performanceCsvPath, headers); + } + } + + /** + * Read the performance CSV file and return a map of event names to their data + * @returns A map of event names to their CSV data + */ + private readPerformanceCsv(): Map { + const eventMap = new Map(); + + if (!fs.existsSync(this.performanceCsvPath)) { + return eventMap; + } + + try { + const fileContent = fs.readFileSync(this.performanceCsvPath, "utf-8"); + const lines = fileContent.split("\n"); + + // Skip header + for (let i = 1; i < lines.length; i++) { + const line = lines[i]?.trim(); + if (!line) continue; + + const parts = line.split(","); + if (parts.length < 2) continue; + + const eventName = parts[1]; + if (eventName && typeof eventName === "string") { + eventMap.set(eventName, line); + } + } + } catch (error) { + this.logger.error(`Error reading performance CSV: ${error}`, { + className: Orchestrator.name, + chainId: this.chainId, + }); + } + + return eventMap; + } + + /** + * Write the combined performance data to the CSV file + * @param eventMap - Map of event names to their CSV data + */ + private writePerformanceCsv(eventMap: Map): void { + const headers = + "timestamp,eventName,slowCount,avgDuration,maxDuration,minDuration,chainId\n"; + const lines = [headers]; + + // Add all event data + for (const line of eventMap.values()) { + if (line) { + lines.push(line); + } + } + + fs.writeFileSync(this.performanceCsvPath, lines.join("\n")); + } + + /** + * Update performance data for an event and write to CSV + * @param eventName - The name of the event + * @param duration - The duration in milliseconds + */ + private updatePerformanceData(eventName: string, duration: number): void { + const now = new Date().toISOString(); + + // Update in-memory tracking + if (!this.performanceData.has(eventName)) { + this.performanceData.set(eventName, { + eventName, + slowCount: 0, + totalDuration: 0, + maxDuration: 0, + minDuration: Number.MAX_SAFE_INTEGER, + lastTimestamp: now, + eventCount: 0, + }); + this.logger.debug(`Created new performance data for ${eventName}`, { + className: Orchestrator.name, + chainId: this.chainId, + }); + } + + const data = this.performanceData.get(eventName)!; + data.totalDuration += duration; + data.maxDuration = Math.max(data.maxDuration, duration); + data.minDuration = Math.min(data.minDuration, duration); + data.lastTimestamp = now; + data.eventCount++; + + if (duration > this.slowEventThresholdMs) { + data.slowCount++; + } + + // Log summary every 10 events (changed from 100 for testing) + if (data.eventCount % 10 === 0 && data.eventCount > 0) { + this.logger.info( + `Writing performance summary for ${eventName} after ${data.eventCount} events`, + { + className: Orchestrator.name, + chainId: this.chainId, + }, + ); + + const avgDuration = (data.totalDuration / data.eventCount).toFixed(2); + const maxDuration = data.maxDuration.toFixed(2); + const minDuration = data.minDuration.toFixed(2); + + // Create CSV line for this event + const csvLine = `${now},${eventName},${data.slowCount},${avgDuration},${maxDuration},${minDuration},${this.chainId}`; + + // Read existing CSV data + const eventMap = this.readPerformanceCsv(); + + // Update or add this event's data + eventMap.set(eventName, csvLine); + + // Write back to CSV + this.writePerformanceCsv(eventMap); + + this.logger.info(`Performance summary for ${eventName}:`, { + className: Orchestrator.name, + chainId: this.chainId, + eventCount: data.eventCount, + slowCount: data.slowCount, + avgDuration, + maxDuration, + minDuration, + }); + } else if (data.eventCount % 5 === 0) { + // Log progress every 5 events (changed from 10 for testing) + this.logger.debug(`Processed ${data.eventCount} events for ${eventName}`, { + className: Orchestrator.name, + chainId: this.chainId, + }); + } } async run(signal: AbortSignal): Promise { @@ -430,28 +600,58 @@ export class Orchestrator { private async handleEvent( event: ProcessorEvent, ): Promise { - event = await this.enhanceStrategyId(event); - if (this.isPoolCreated(event)) { - const handleable = existsHandler(event.strategyId); - await this.strategyRegistry.saveStrategyId( - this.chainId, - event.params.strategy, - event.strategyId, - handleable, - ); - } else if (event.contractName === "Strategy" && "strategyId" in event) { - if (!existsHandler(event.strategyId)) { - this.logger.debug("Skipping event", { - event, - className: Orchestrator.name, - chainId: this.chainId, - }); - // we skip the event if the strategy id is not handled yet - return undefined; + const eventName = `${event.contractName}.${event.eventName}`; + const timerLabel = `Event: ${eventName}`; + + console.time(timerLabel); + const startTime = performance.now(); + + try { + event = await this.enhanceStrategyId(event); + if (this.isPoolCreated(event)) { + const handleable = existsHandler(event.strategyId); + await this.strategyRegistry.saveStrategyId( + this.chainId, + event.params.strategy, + event.strategyId, + handleable, + ); + } else if (event.contractName === "Strategy" && "strategyId" in event) { + if (!existsHandler(event.strategyId)) { + this.logger.debug("Skipping event", { + event, + className: Orchestrator.name, + chainId: this.chainId, + }); + // we skip the event if the strategy id is not handled yet + return undefined; + } } - } - return this.eventsProcessor.processEvent(event); + return this.eventsProcessor.processEvent(event); + } finally { + console.timeEnd(timerLabel); + + // Calculate duration + const endTime = performance.now(); + const duration = endTime - startTime; + + // Update performance tracking + this.updatePerformanceData(eventName, duration); + + // Log slow events + if (duration > this.slowEventThresholdMs) { + this.logger.warn( + `Slow event detected: ${eventName} took ${duration.toFixed(2)}ms`, + { + className: Orchestrator.name, + chainId: this.chainId, + eventName, + duration, + }, + ); + } + } } /** diff --git a/packages/processors/src/processors/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/allocated.handler.ts b/packages/processors/src/processors/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/allocated.handler.ts index d916ecfe..2feb041d 100644 --- a/packages/processors/src/processors/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/allocated.handler.ts +++ b/packages/processors/src/processors/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/allocated.handler.ts @@ -44,27 +44,45 @@ export class DVMDAllocatedHandler implements IEventHandler<"Strategy", "Allocate * @throws {MetadataParsingFailed} if the metadata is invalid */ async handle(): Promise { + const startTime = performance.now(); const { roundRepository, applicationRepository } = this.dependencies; const { srcAddress } = this.event; const { recipientId: _recipientId, amount: strAmount, token: _token } = this.event.params; const amount = BigInt(strAmount); + // Time round lookup + const roundLookupStart = performance.now(); const round = await roundRepository.getRoundByStrategyAddressOrThrow( this.chainId, getAddress(srcAddress), ); + const roundLookupTime = performance.now() - roundLookupStart; + console.log(`[AllocatedWithOrigin] Round lookup took ${roundLookupTime.toFixed(2)}ms`); + + // Time application lookup + const appLookupStart = performance.now(); const application = await applicationRepository.getApplicationByAnchorAddressOrThrow( this.chainId, round.id, getAddress(_recipientId), ); + const appLookupTime = performance.now() - appLookupStart; + console.log(`[AllocatedWithOrigin] Application lookup took ${appLookupTime.toFixed(2)}ms`); const donationId = getDonationId(this.event.blockNumber, this.event.logIndex); + // Time token validation + const tokenValidationStart = performance.now(); const token = getTokenOrThrow(this.chainId, _token); const matchToken = getTokenOrThrow(this.chainId, round.matchTokenAddress); + const tokenValidationTime = performance.now() - tokenValidationStart; + console.log( + `[AllocatedWithOrigin] Token validation took ${tokenValidationTime.toFixed(2)}ms`, + ); + // Time price calculations + const priceCalcStart = performance.now(); const { amountInUsd } = await getTokenAmountInUsd( this.dependencies.pricingProvider, token, @@ -83,8 +101,14 @@ export class DVMDAllocatedHandler implements IEventHandler<"Strategy", "Allocate this.event.blockTimestamp, ) ).amount; + const priceCalcTime = performance.now() - priceCalcStart; + console.log(`[AllocatedWithOrigin] Price calculations took ${priceCalcTime.toFixed(2)}ms`); + // Time metadata parsing + const metadataStart = performance.now(); const parsedMetadata = this.parseMetadataOrThrow(application.metadata); + const metadataTime = performance.now() - metadataStart; + console.log(`[AllocatedWithOrigin] Metadata parsing took ${metadataTime.toFixed(2)}ms`); const donation: Donation = { id: donationId, @@ -103,6 +127,9 @@ export class DVMDAllocatedHandler implements IEventHandler<"Strategy", "Allocate timestamp: new Date(this.event.blockTimestamp), }; + const totalTime = performance.now() - startTime; + console.log(`[AllocatedWithOrigin] Total processing time: ${totalTime.toFixed(2)}ms`); + return [ { type: "InsertDonation", diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index a5a2748f..3e87e0cd 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -1 +1,2 @@ export * from "./external.js"; +export * from "./utils/performance-logger.js"; diff --git a/packages/shared/src/utils/performance-logger.ts b/packages/shared/src/utils/performance-logger.ts new file mode 100644 index 00000000..7b26b0da --- /dev/null +++ b/packages/shared/src/utils/performance-logger.ts @@ -0,0 +1,99 @@ +import fs from "fs"; +import path from "path"; + +interface PerformanceMetric { + timestamp: string; + eventType: string; + operation: string; + duration: number; + totalTime: number; + blockNumber?: number; + transactionHash?: string; + chainId?: number; + roundId?: string; + applicationId?: string; + donorAddress?: string; + recipientAddress?: string; + amount?: string; + amountInUsd?: string; + uniqueDonorsCount?: number; + totalDonationsCount?: number; + details?: Record; +} + +export class PerformanceLogger { + private static instance: PerformanceLogger; + private csvPath: string; + private readonly THRESHOLD_MS = 500; // 0.5 seconds + + private constructor() { + const logsDir = path.join(process.cwd(), "logs"); + if (!fs.existsSync(logsDir)) { + fs.mkdirSync(logsDir); + } + this.csvPath = path.join(logsDir, "performance-metrics.csv"); + this.initializeCsvFile(); + } + + public static getInstance(): PerformanceLogger { + if (!PerformanceLogger.instance) { + PerformanceLogger.instance = new PerformanceLogger(); + } + return PerformanceLogger.instance; + } + + private initializeCsvFile(): void { + if (!fs.existsSync(this.csvPath)) { + const header = + [ + "timestamp", + "eventType", + "operation", + "duration", + "totalTime", + "blockNumber", + "transactionHash", + "chainId", + "roundId", + "applicationId", + "donorAddress", + "recipientAddress", + "amount", + "amountInUsd", + "uniqueDonorsCount", + "totalDonationsCount", + "details", + ].join(",") + "\n"; + fs.writeFileSync(this.csvPath, header); + } + } + + public logMetric(metric: PerformanceMetric): void { + if (metric.totalTime >= this.THRESHOLD_MS) { + const row = + [ + metric.timestamp, + metric.eventType, + metric.operation, + metric.duration.toFixed(2), + metric.totalTime.toFixed(2), + metric.blockNumber || "", + metric.transactionHash || "", + metric.chainId || "", + metric.roundId || "", + metric.applicationId || "", + metric.donorAddress || "", + metric.recipientAddress || "", + metric.amount || "", + metric.amountInUsd || "", + metric.uniqueDonorsCount || "", + metric.totalDonationsCount || "", + JSON.stringify(metric.details || {}), + ].join(",") + "\n"; + + fs.appendFileSync(this.csvPath, row); + } + } +} + +export const performanceLogger = PerformanceLogger.getInstance();