-
Notifications
You must be signed in to change notification settings - Fork 4
fix: prevent synced invoices from getting updated by outdated webhooks #68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,51 @@ export class PostgresClient { | |
return results.flatMap((it) => it.rows); | ||
} | ||
|
||
async upsertManyWithTimestampProtection< | ||
T extends { | ||
[Key: string]: any; // eslint-disable-line @typescript-eslint/no-explicit-any | ||
}, | ||
>( | ||
entries: T[], | ||
table: string, | ||
tableSchema: JsonSchema, | ||
syncTimestamp: string | ||
): Promise<T[]> { | ||
if (!entries.length) return []; | ||
|
||
// Max 5 in parallel to avoid exhausting connection pool | ||
const chunkSize = 5; | ||
const results: pg.QueryResult<T>[] = []; | ||
|
||
for (let i = 0; i < entries.length; i += chunkSize) { | ||
const chunk = entries.slice(i, i + chunkSize); | ||
|
||
const queries: Promise<pg.QueryResult<T>>[] = []; | ||
chunk.forEach((entry) => { | ||
// Inject the values | ||
const cleansed = this.cleanseArrayField(entry, tableSchema); | ||
// Add last_synced_at to the cleansed data for SQL parameter binding | ||
cleansed.last_synced_at = syncTimestamp; | ||
|
||
const upsertSql = this.constructUpsertWithTimestampProtectionSql( | ||
this.config.schema, | ||
table, | ||
tableSchema | ||
); | ||
|
||
const prepared = sql(upsertSql, { | ||
useNullForMissing: true, | ||
})(cleansed); | ||
|
||
queries.push(this.pool.query(prepared.text, prepared.values)); | ||
}); | ||
|
||
results.push(...(await Promise.all(queries))); | ||
} | ||
|
||
return results.flatMap((it) => it.rows); | ||
} | ||
|
||
private constructUpsertSql = (schema: string, table: string, tableSchema: JsonSchema): string => { | ||
const conflict = 'id'; | ||
const properties = tableSchema.properties; | ||
|
@@ -72,6 +117,38 @@ export class PostgresClient { | |
;`; | ||
}; | ||
|
||
private constructUpsertWithTimestampProtectionSql = ( | ||
schema: string, | ||
table: string, | ||
tableSchema: JsonSchema | ||
): string => { | ||
const conflict = 'id'; | ||
const properties = tableSchema.properties; | ||
|
||
// The WHERE clause in ON CONFLICT DO UPDATE only applies to the conflicting row | ||
// (the row being updated), not to all rows in the table. PostgreSQL ensures that | ||
// the condition is evaluated only for the specific row that conflicts with the INSERT. | ||
return ` | ||
INSERT INTO "${schema}"."${table}" ( | ||
${Object.keys(properties) | ||
.map((x) => `"${x}"`) | ||
.join(',')} | ||
) | ||
VALUES ( | ||
${Object.keys(properties) | ||
.map((x) => `:${x}`) | ||
.join(',')} | ||
) | ||
ON CONFLICT (${conflict}) DO UPDATE SET | ||
${Object.keys(properties) | ||
.filter((x) => x !== 'last_synced_at') | ||
.map((x) => `"${x}" = EXCLUDED."${x}"`) | ||
.join(',')}, | ||
last_synced_at = :last_synced_at | ||
WHERE "${table}"."last_synced_at" IS NULL | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this will upsert if ANY entry in the table matches this, but not necessarily this id, we can do something like
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The integration tests likely did not catch this (if true), given you test for a single entry There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great catch! You're right - I'll add a test to cover this too There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to do a deep dive into the Postgres docs, but it seems that Postgres actually scopes the Specifically:
So even though it looks like Source:
That said, I find this to be quite non-intuitive (even a bit misleading 😅). I've added a test to verify that this works and reviewed it thoroughly, and I believe it addresses this case. I've added a comment too. Let me know what you think :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sgtm, also tested this locally and it makes sense |
||
OR "${table}"."last_synced_at" < :last_synced_at;`; | ||
}; | ||
|
||
/** | ||
* Updates a subscription's billing cycle dates, provided that the current end date is in the past (i.e. the subscription | ||
* data in the database being stale). | ||
|
Uh oh!
There was an error while loading. Please reload this page.