Skip to content
Draft
77 changes: 66 additions & 11 deletions packages/indexer-database/src/utils/BlockchainEventRepository.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { DataSource, EntityTarget, ObjectLiteral } from "typeorm";
import {
DataSource,
EntityManager,
EntityTarget,
ObjectLiteral,
} from "typeorm";
import winston from "winston";
import { createHash } from "crypto";

import { SaveQueryResultType, SaveQueryResult } from "../model";

Expand All @@ -13,6 +19,16 @@ export function filterSaveQueryResults<Entity extends ObjectLiteral>(
.filter((data) => data !== undefined);
}

// Helper to convert a string (e.g. uniquess key) into a 64-bit integer for Postgres
function generateAdvisoryLockId(uniqueId: string): [number, number] {
const hash = createHash("sha256").update(uniqueId).digest("hex");
// Take first 8 bytes (16 hex chars) to create two 32-bit integers
const part1 = parseInt(hash.substring(0, 8), 16);
const part2 = parseInt(hash.substring(8, 16), 16);
// Postgres pg_advisory_xact_lock accepts two 32-bit ints to form one 64-bit key
return [part1, part2];
}

export class BlockchainEventRepository {
constructor(
protected postgres: DataSource,
Expand All @@ -33,16 +49,19 @@ export class BlockchainEventRepository {
uniqueKeys: (keyof Entity)[],
comparisonKeys: (keyof Entity)[],
): Promise<SaveQueryResult<Entity>[]> {
return Promise.all(
data.map((dataItem) =>
this.saveAndHandleFinalisation(
entity,
dataItem,
uniqueKeys,
comparisonKeys,
return this.postgres.transaction(async (manager) => {
return Promise.all(
data.map((dataItem) =>
this.saveAndHandleFinalisation(
entity,
dataItem,
uniqueKeys,
comparisonKeys,
manager,
),
),
),
);
);
});
}

/**
Expand All @@ -58,6 +77,7 @@ export class BlockchainEventRepository {
data: Partial<Entity>,
uniqueKeys: (keyof Entity)[],
comparisonKeys: (keyof Entity)[],
transactionalManager: EntityManager,
): Promise<SaveQueryResult<Entity>> {
const where = uniqueKeys.reduce(
(acc, key) => {
Expand All @@ -66,7 +86,42 @@ export class BlockchainEventRepository {
},
{} as Record<keyof Entity, any>,
);
const repository = this.postgres.getRepository(entity);

// Generate a unique string ID for this event
// e.g., if uniqueKeys is ['transactionHash'], idString is "0x123..."
const uniqueString = uniqueKeys.map((k) => data[k]).join("-");

// ACQUIRE ADVISORY LOCK (Virtual Mutex)
// We generate a deterministic 64-bit hash from the unique keys of the event.
// `pg_advisory_xact_lock` acquires an exclusive lock on this specific number number
// for the duration of the transaction.
//
// How this solves Race Conditions:
//
// 1. Solves "Insert Race" (The Gap Problem):
// - In standard Postgres (Read Committed), locking a non-existent row does nothing.
// - By locking the *hash* of the ID instead of the row itself, we create a
// "Virtual Gap Lock."
// - If Process A and Process B both try to process the same new event, Process A
// gets the lock. Process B MUST wait at this line, even if the row doesn't exist yet.
//
// 2. Solves "Update Race":
// - Identical to `SELECT FOR UPDATE`, this ensures that once the row is created,
// only one process can read/modify it at a time.
//
// 3. Guarantees Atomicity:
// - Process B will only unfreeze after Process A commits. When Process B finally
// runs `findOne`, it is guaranteed to see the record inserted by Process A,
// preventing a "Duplicate Key" error.
const [key1, key2] = generateAdvisoryLockId(uniqueString);

await transactionalManager.query(`SELECT pg_advisory_xact_lock($1, $2)`, [
key1,
key2,
]);

const repository = transactionalManager.getRepository(entity);
// the Advisory Lock guarantees we are the only one working on this ID.
const dbEntity = await repository.findOne({ where });

if (!dbEntity) {
Expand Down
Loading