Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
406 changes: 203 additions & 203 deletions apps/indexer/config.yaml

Large diffs are not rendered by default.

158 changes: 139 additions & 19 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,32 @@ export class Orchestrator {

await this.retryHandler.execute(
async () => {
this.logger.debug("Starting event processing", {
className: Orchestrator.name,
chainId: this.chainId,
eventIdentifier: `${event!.blockNumber}:${event!.logIndex}`,
blockNumber: event!.blockNumber,
logIndex: event!.logIndex,
});

const changesets = await this.handleEvent(event!);

this.logger.debug("Event handling completed", {
className: Orchestrator.name,
chainId: this.chainId,
hasChangesets: !!changesets,
changesetsCount: changesets?.length ?? 0,
eventIdentifier: `${event!.blockNumber}:${event!.logIndex}`,
});

if (changesets) {
this.logger.debug("Applying changesets with processed event", {
className: Orchestrator.name,
chainId: this.chainId,
totalChangesets: changesets.length + 1,
eventIdentifier: `${event!.blockNumber}:${event!.logIndex}`,
});

await this.dataLoader.applyChanges([
...changesets,
{
Expand All @@ -179,6 +203,12 @@ export class Orchestrator {
},
]);
} else {
this.logger.debug("Applying only processed event record", {
className: Orchestrator.name,
chainId: this.chainId,
eventIdentifier: `${event!.blockNumber}:${event!.logIndex}`,
});

await this.dataLoader.applyChanges([
{
type: "InsertProcessedEvent",
Expand All @@ -192,21 +222,45 @@ export class Orchestrator {
},
]);
}

this.logger.debug("Changes applied successfully", {
className: Orchestrator.name,
chainId: this.chainId,
eventIdentifier: `${event!.blockNumber}:${event!.logIndex}`,
});
},
{ abortSignal: signal },
);
processedEvents++;
this.logger.info(`Processed events: ${processedEvents}/${totalEvents}`, {
className: Orchestrator.name,
chainId: this.chainId,
progress: `${((processedEvents / totalEvents) * 100).toFixed(2)}%`,
currentEventIdentifier: `${event!.blockNumber}:${event!.logIndex}`,
currentBlock: event!.blockNumber,
});
} catch (error: unknown) {
this.logger.warn("Error encountered during event processing", {
className: Orchestrator.name,
chainId: this.chainId,
eventIdentifier: event ? `${event.blockNumber}:${event.logIndex}` : undefined,
errorType: error instanceof Error ? error.constructor.name : typeof error,
});

if (event) {
this.logger.debug("Saving last processed event before error handling", {
className: Orchestrator.name,
chainId: this.chainId,
eventIdentifier: `${event.blockNumber}:${event.logIndex}`,
blockNumber: event.blockNumber,
});

await this.eventsRegistry.saveLastProcessedEvent(this.chainId, {
...event,
rawEvent: event,
});
}

if (
error instanceof UnsupportedStrategy ||
error instanceof InvalidEvent ||
Expand All @@ -218,6 +272,8 @@ export class Orchestrator {
className: Orchestrator.name,
chainId: this.chainId,
event,
errorType: error.constructor.name,
errorDetails: error.message,
},
);
} else {
Expand All @@ -227,6 +283,8 @@ export class Orchestrator {
event,
className: Orchestrator.name,
chainId: this.chainId,
lastRetryDelay: error.metadata?.retryAfterInMs,
reason: error.metadata?.failureReason,
});
void this.notifier.send(error.message, {
chainId: this.chainId,
Expand All @@ -238,11 +296,21 @@ export class Orchestrator {
error,
event!,
);

this.logger.debug("Checking if error should be ignored", {
className: Orchestrator.name,
chainId: this.chainId,
shouldIgnoreError,
errorType: error.constructor.name,
eventIdentifier: `${event!.blockNumber}:${event!.logIndex}`,
});

if (!shouldIgnoreError) {
this.logger.error(error, {
event,
className: Orchestrator.name,
chainId: this.chainId,
errorStack: error.stack,
});
void this.notifier.send(error.message, {
chainId: this.chainId,
Expand All @@ -251,20 +319,16 @@ export class Orchestrator {
});
}
} else {
this.logger.error(
new Error(`Error processing event: ${stringify(event)} ${error}`),
{
className: Orchestrator.name,
chainId: this.chainId,
},
);
void this.notifier.send(
`Error processing event: ${stringify(event)} ${error}`,
{
chainId: this.chainId,
event: event!,
},
);
const errorMessage = `Error processing event: ${stringify(event)} ${error}`;
this.logger.error(new Error(errorMessage), {
className: Orchestrator.name,
chainId: this.chainId,
unknownErrorType: typeof error,
});
void this.notifier.send(errorMessage, {
chainId: this.chainId,
event: event!,
});
}
}
}
Expand Down Expand Up @@ -405,23 +469,79 @@ export class Orchestrator {
* @returns The token prices
*/
private async bulkFetchTokens(tokens: TokenWithTimestamps[]): Promise<TokenPrice[]> {
this.logger.info(`Starting bulk fetch for ${tokens.length} token prices`, {
className: Orchestrator.name,
chainId: this.chainId,
tokens: tokens.map((t) => t.token.priceSourceCode),
timestampCount: tokens[0]?.timestamps.length ?? 0,
});

const results = await Promise.allSettled(
tokens.map(({ token, timestamps }) =>
this.retryHandler.execute(async () => {
const prices = await this.dependencies.pricingProvider.getTokenPrices(
token.priceSourceCode,
timestamps,
);
return prices;
this.logger.debug(`Fetching prices for token ${token.priceSourceCode}`, {
className: Orchestrator.name,
chainId: this.chainId,
timestampCount: timestamps.length,
});

try {
const prices = await this.dependencies.pricingProvider.getTokenPrices(
token.priceSourceCode,
timestamps,
);
this.logger.debug(
`Successfully fetched prices for ${token.priceSourceCode}`,
{
className: Orchestrator.name,
chainId: this.chainId,
priceCount: prices.length,
},
);
return prices;
} catch (error) {
this.logger.error(`Failed to fetch prices for ${token.priceSourceCode}`, {
className: Orchestrator.name,
chainId: this.chainId,
error: error instanceof Error ? error.message : String(error),
timestamps: timestamps.map((t) => new Date(t).toISOString()),
});
throw error;
}
}),
),
);

const tokenPrices: TokenPrice[] = [];
let fulfilledCount = 0;
let rejectedCount = 0;

for (const result of results) {
if (result.status === "fulfilled" && result.value) {
tokenPrices.push(...result.value);
fulfilledCount++;
} else if (result.status === "rejected") {
rejectedCount++;
this.logger.warn(`Token price fetch rejected`, {
className: Orchestrator.name,
chainId: this.chainId,
error:
result.reason instanceof Error
? result.reason.message
: String(result.reason),
});
}
}

this.logger.info(`Completed bulk token price fetch`, {
className: Orchestrator.name,
chainId: this.chainId,
totalRequests: results.length,
successful: fulfilledCount,
failed: rejectedCount,
totalPricesFetched: tokenPrices.length,
});

return tokenPrices;
}

Expand Down
Loading
Loading