diff --git a/packages/indexer-database/src/utils/BlockchainEventRepository.ts b/packages/indexer-database/src/utils/BlockchainEventRepository.ts index 26745f24..2c06f651 100644 --- a/packages/indexer-database/src/utils/BlockchainEventRepository.ts +++ b/packages/indexer-database/src/utils/BlockchainEventRepository.ts @@ -1,5 +1,11 @@ -import { DataSource, EntityTarget, ObjectLiteral } from "typeorm"; +import { + DataSource, + EntityManager, + EntityTarget, + ObjectLiteral, +} from "typeorm"; import winston from "winston"; +import { createHash } from "crypto"; import { SaveQueryResultType, SaveQueryResult } from "../model"; @@ -13,6 +19,16 @@ export function filterSaveQueryResults( .filter((data) => data !== undefined); } +// Helper to convert a string (e.g. uniquess key) into a 64-bit integer for Postgres +function generateAdvisoryLockId(uniqueId: string): [number, number] { + const hash = createHash("sha256").update(uniqueId).digest("hex"); + // Take first 8 bytes (16 hex chars) to create two 32-bit integers + const part1 = parseInt(hash.substring(0, 8), 16); + const part2 = parseInt(hash.substring(8, 16), 16); + // Postgres pg_advisory_xact_lock accepts two 32-bit ints to form one 64-bit key + return [part1, part2]; +} + export class BlockchainEventRepository { constructor( protected postgres: DataSource, @@ -33,16 +49,19 @@ export class BlockchainEventRepository { uniqueKeys: (keyof Entity)[], comparisonKeys: (keyof Entity)[], ): Promise[]> { - return Promise.all( - data.map((dataItem) => - this.saveAndHandleFinalisation( - entity, - dataItem, - uniqueKeys, - comparisonKeys, + return this.postgres.transaction(async (manager) => { + return Promise.all( + data.map((dataItem) => + this.saveAndHandleFinalisation( + entity, + dataItem, + uniqueKeys, + comparisonKeys, + manager, + ), ), - ), - ); + ); + }); } /** @@ -58,6 +77,7 @@ export class BlockchainEventRepository { data: Partial, uniqueKeys: (keyof Entity)[], comparisonKeys: (keyof Entity)[], + transactionalManager: EntityManager, ): Promise> { const where = uniqueKeys.reduce( (acc, key) => { @@ -66,7 +86,42 @@ export class BlockchainEventRepository { }, {} as Record, ); - const repository = this.postgres.getRepository(entity); + + // Generate a unique string ID for this event + // e.g., if uniqueKeys is ['transactionHash'], idString is "0x123..." + const uniqueString = uniqueKeys.map((k) => data[k]).join("-"); + + // ACQUIRE ADVISORY LOCK (Virtual Mutex) + // We generate a deterministic 64-bit hash from the unique keys of the event. + // `pg_advisory_xact_lock` acquires an exclusive lock on this specific number number + // for the duration of the transaction. + // + // How this solves Race Conditions: + // + // 1. Solves "Insert Race" (The Gap Problem): + // - In standard Postgres (Read Committed), locking a non-existent row does nothing. + // - By locking the *hash* of the ID instead of the row itself, we create a + // "Virtual Gap Lock." + // - If Process A and Process B both try to process the same new event, Process A + // gets the lock. Process B MUST wait at this line, even if the row doesn't exist yet. + // + // 2. Solves "Update Race": + // - Identical to `SELECT FOR UPDATE`, this ensures that once the row is created, + // only one process can read/modify it at a time. + // + // 3. Guarantees Atomicity: + // - Process B will only unfreeze after Process A commits. When Process B finally + // runs `findOne`, it is guaranteed to see the record inserted by Process A, + // preventing a "Duplicate Key" error. + const [key1, key2] = generateAdvisoryLockId(uniqueString); + + await transactionalManager.query(`SELECT pg_advisory_xact_lock($1, $2)`, [ + key1, + key2, + ]); + + const repository = transactionalManager.getRepository(entity); + // the Advisory Lock guarantees we are the only one working on this ID. const dbEntity = await repository.findOne({ where }); if (!dbEntity) {