diff --git a/packages/indexer-database/src/entities/Deposit.ts b/packages/indexer-database/src/entities/Deposit.ts new file mode 100644 index 00000000..00333049 --- /dev/null +++ b/packages/indexer-database/src/entities/Deposit.ts @@ -0,0 +1,138 @@ +import { + Column, + CreateDateColumn, + Entity, + Index, + JoinColumn, + OneToOne, + PrimaryGeneratedColumn, + UpdateDateColumn, + Unique, +} from "typeorm"; + +// Import your existing entities +import { V3FundsDeposited } from "./evm/V3FundsDeposited"; +import { FilledV3Relay } from "./evm/FilledV3Relay"; +import { DepositForBurn } from "./evm/DepositForBurn"; +import { MintAndWithdraw } from "./evm/MintAndWithdraw"; +import { OFTSent } from "./evm/OftSent"; +import { OFTReceived } from "./evm/OftReceived"; + +export enum DepositType { + ACROSS = "across", + CCTP = "cctp", + OFT = "oft", +} + +export enum DepositStatus { + PENDING = "pending", + FILLED = "filled", +} + +@Entity({ schema: "public" }) +@Unique("UK_deposits_uniqueId", ["uniqueId"]) +// 1. Global Feed Index: Instant sorting by time +@Index("IX_deposits_blockTimestamp", ["blockTimestamp"]) +// 2. User History Indices: Instant filtering by user + sorting +@Index("IX_deposits_depositor_timestamp", ["depositor", "blockTimestamp"]) +@Index("IX_deposits_recipient_timestamp", ["recipient", "blockTimestamp"]) +// 3. Status Index: Fast "Unfilled" lookups +@Index("IX_deposits_status_timestamp", ["status", "blockTimestamp"]) +export class Deposit { + @PrimaryGeneratedColumn() + id: number; + + /** + * The ID which stitches together all the relevant events for a given transfer type. + * OFT: guid + * CCTP: nonce-destinationChainId + * Across: relayHash / internalHash + */ + @Column() + uniqueId: string; + + @Column({ type: "enum", enum: DepositType }) + type: DepositType; + + @Column({ type: "enum", enum: DepositStatus, default: DepositStatus.PENDING }) + status: DepositStatus; + + // --- Denormalized Search Fields --- + + /** + * The timestamp of the first event seen for a given uniqueId. + */ + @Column() + blockTimestamp: Date; + + @Column({ type: "bigint" }) + originChainId: string; + + @Column({ type: "bigint" }) + destinationChainId: string; + + /** + * Nullable because an Orphan Fill (e.g. OFTReceived) does not know the depositor. + * We update this when the source event arrives. 
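+ * For example, an OFTReceived event creates the row with only the recipient; the matching OFTSent later backfills depositor through the uniqueId (guid) upsert.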
+ */ + @Column({ nullable: true }) + depositor: string; + + @Column({ nullable: true }) + recipient: string; + + // --- Foreign Keys (Nullable for Orphan Support) --- + + // Across V3 + @Column({ nullable: true }) + v3FundsDepositedId: number | null; + + @OneToOne(() => V3FundsDeposited, { nullable: true }) + @JoinColumn({ name: "v3FundsDepositedId" }) + v3FundsDeposited: V3FundsDeposited; + + @Column({ nullable: true }) + filledV3RelayId: number | null; + + @OneToOne(() => FilledV3Relay, { nullable: true }) + @JoinColumn({ name: "filledV3RelayId" }) + filledV3Relay: FilledV3Relay; + + // CCTP + @Column({ nullable: true }) + depositForBurnId: number | null; + + @OneToOne(() => DepositForBurn, { nullable: true }) + @JoinColumn({ name: "depositForBurnId" }) + depositForBurn: DepositForBurn; + + @Column({ nullable: true }) + mintAndWithdrawId: number | null; + + @OneToOne(() => MintAndWithdraw, { nullable: true }) + @JoinColumn({ name: "mintAndWithdrawId" }) + mintAndWithdraw: MintAndWithdraw; + + // OFT + @Column({ nullable: true }) + oftSentId: number | null; + + @OneToOne(() => OFTSent, { nullable: true }) + @JoinColumn({ name: "oftSentId" }) + oftSent: OFTSent; + + @Column({ nullable: true }) + oftReceivedId: number | null; + + @OneToOne(() => OFTReceived, { nullable: true }) + @JoinColumn({ name: "oftReceivedId" }) + oftReceived: OFTReceived; + + // --- Metadata --- + + @CreateDateColumn() + createdAt: Date; + + @UpdateDateColumn() + updatedAt: Date; +} diff --git a/packages/indexer-database/src/entities/index.ts b/packages/indexer-database/src/entities/index.ts index 829d1662..156ba1c1 100644 --- a/packages/indexer-database/src/entities/index.ts +++ b/packages/indexer-database/src/entities/index.ts @@ -54,3 +54,6 @@ export * from "./evm/FallbackHyperEVMFlowCompleted"; export * from "./evm/SponsoredAccountActivation"; export * from "./evm/SwapFlowInitialized"; export * from "./evm/SwapFlowFinalized"; + +// Deposits +export * from "./Deposit"; diff --git a/packages/indexer-database/src/main.ts b/packages/indexer-database/src/main.ts index 5e96a412..f51d68c9 100644 --- a/packages/indexer-database/src/main.ts +++ b/packages/indexer-database/src/main.ts @@ -83,6 +83,8 @@ export const createDataSource = (config: DatabaseConfig): DataSource => { entities.SponsoredAccountActivation, entities.SwapFlowInitialized, entities.SwapFlowFinalized, + // Deposits + entities.Deposit, ], migrationsTableName: "_migrations", migrations: ["migrations/*.ts"], diff --git a/packages/indexer-database/src/migrations/1764868811392-Deposit.ts b/packages/indexer-database/src/migrations/1764868811392-Deposit.ts new file mode 100644 index 00000000..e0228c9f --- /dev/null +++ b/packages/indexer-database/src/migrations/1764868811392-Deposit.ts @@ -0,0 +1,123 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class CreateDepositTable1764868811392 implements MigrationInterface { + name = "CreateDepositTable1764868811392"; + + public async up(queryRunner: QueryRunner): Promise { + // Create Enums + await queryRunner.query( + `CREATE TYPE "public"."deposit_type_enum" AS ENUM('across', 'cctp', 'oft')`, + ); + await queryRunner.query( + `CREATE TYPE "public"."deposit_status_enum" AS ENUM('pending', 'filled')`, + ); + + // Create Table + await queryRunner.query( + `CREATE TABLE "public"."deposit" ( + "id" SERIAL NOT NULL, + "uniqueId" character varying NOT NULL, + "type" "public"."deposit_type_enum" NOT NULL, + "status" "public"."deposit_status_enum" NOT NULL DEFAULT 'pending', + "blockTimestamp" TIMESTAMP 
NOT NULL, + "originChainId" bigint NOT NULL, + "destinationChainId" bigint NOT NULL, + "depositor" character varying, + "recipient" character varying, + "v3FundsDepositedId" integer, + "filledV3RelayId" integer, + "depositForBurnId" integer, + "mintAndWithdrawId" integer, + "oftSentId" integer, + "oftReceivedId" integer, + "createdAt" TIMESTAMP NOT NULL DEFAULT now(), + "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), + CONSTRAINT "UK_deposits_uniqueId" UNIQUE ("uniqueId"), + CONSTRAINT "PK_deposit" PRIMARY KEY ("id") + )`, + ); + + // Create Indices + await queryRunner.query( + `CREATE INDEX "IX_deposits_blockTimestamp" ON "public"."deposit" ("blockTimestamp")`, + ); + // User history lookups + await queryRunner.query( + `CREATE INDEX "IX_deposits_depositor_timestamp" ON "public"."deposit" ("depositor", "blockTimestamp")`, + ); + await queryRunner.query( + `CREATE INDEX "IX_deposits_recipient_timestamp" ON "public"."deposit" ("recipient", "blockTimestamp")`, + ); + // Status lookups (for finding unfilled deposits) + await queryRunner.query( + `CREATE INDEX "IX_deposits_status_timestamp" ON "public"."deposit" ("status", "blockTimestamp")`, + ); + + // Add Foreign Keys + + // Across + await queryRunner.query( + `ALTER TABLE "public"."deposit" ADD CONSTRAINT "FK_deposit_v3FundsDeposited" FOREIGN KEY ("v3FundsDepositedId") REFERENCES "evm"."v3_funds_deposited"("id") ON DELETE CASCADE ON UPDATE NO ACTION`, + ); + await queryRunner.query( + `ALTER TABLE "public"."deposit" ADD CONSTRAINT "FK_deposit_filledV3Relay" FOREIGN KEY ("filledV3RelayId") REFERENCES "evm"."filled_v3_relay"("id") ON DELETE CASCADE ON UPDATE NO ACTION`, + ); + + // CCTP + await queryRunner.query( + `ALTER TABLE "public"."deposit" ADD CONSTRAINT "FK_deposit_depositForBurn" FOREIGN KEY ("depositForBurnId") REFERENCES "evm"."deposit_for_burn"("id") ON DELETE CASCADE ON UPDATE NO ACTION`, + ); + await queryRunner.query( + `ALTER TABLE "public"."deposit" ADD CONSTRAINT "FK_deposit_mintAndWithdraw" FOREIGN KEY ("mintAndWithdrawId") REFERENCES "evm"."mint_and_withdraw"("id") ON DELETE CASCADE ON UPDATE NO ACTION`, + ); + + // OFT + await queryRunner.query( + `ALTER TABLE "public"."deposit" ADD CONSTRAINT "FK_deposit_oftSent" FOREIGN KEY ("oftSentId") REFERENCES "evm"."oft_sent"("id") ON DELETE CASCADE ON UPDATE NO ACTION`, + ); + await queryRunner.query( + `ALTER TABLE "public"."deposit" ADD CONSTRAINT "FK_deposit_oftReceived" FOREIGN KEY ("oftReceivedId") REFERENCES "evm"."oft_received"("id") ON DELETE CASCADE ON UPDATE NO ACTION`, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + // Drop Foreign Keys + await queryRunner.query( + `ALTER TABLE "public"."deposit" DROP CONSTRAINT "FK_deposit_oftReceived"`, + ); + await queryRunner.query( + `ALTER TABLE "public"."deposit" DROP CONSTRAINT "FK_deposit_oftSent"`, + ); + await queryRunner.query( + `ALTER TABLE "public"."deposit" DROP CONSTRAINT "FK_deposit_mintAndWithdraw"`, + ); + await queryRunner.query( + `ALTER TABLE "public"."deposit" DROP CONSTRAINT "FK_deposit_depositForBurn"`, + ); + await queryRunner.query( + `ALTER TABLE "public"."deposit" DROP CONSTRAINT "FK_deposit_filledV3Relay"`, + ); + await queryRunner.query( + `ALTER TABLE "public"."deposit" DROP CONSTRAINT "FK_deposit_v3FundsDeposited"`, + ); + + // Drop Indices + await queryRunner.query( + `DROP INDEX "public"."IX_deposits_status_timestamp"`, + ); + await queryRunner.query( + `DROP INDEX "public"."IX_deposits_recipient_timestamp"`, + ); + await queryRunner.query( + `DROP INDEX 
"public"."IX_deposits_depositor_timestamp"`, + ); + await queryRunner.query(`DROP INDEX "public"."IX_deposits_blockTimestamp"`); + + // Drop Table + await queryRunner.query(`DROP TABLE "public"."deposit"`); + + // Drop Enums + await queryRunner.query(`DROP TYPE "public"."deposit_status_enum"`); + await queryRunner.query(`DROP TYPE "public"."deposit_type_enum"`); + } +} diff --git a/packages/indexer/src/data-indexing/service/CCTPIndexerDataHandler.ts b/packages/indexer/src/data-indexing/service/CCTPIndexerDataHandler.ts index 036dee6b..0f4da710 100644 --- a/packages/indexer/src/data-indexing/service/CCTPIndexerDataHandler.ts +++ b/packages/indexer/src/data-indexing/service/CCTPIndexerDataHandler.ts @@ -3,6 +3,7 @@ import { ethers, providers, Transaction } from "ethers"; import * as across from "@across-protocol/sdk"; import { CHAIN_IDs, TEST_NETWORKS } from "@across-protocol/constants"; import { formatFromAddressToChainFormat } from "../../utils"; +import { updateDeposits } from "../../database/Deposits"; import { BlockRange, SimpleTransferFlowCompletedLog, @@ -700,6 +701,36 @@ export class CCTPIndexerDataHandler implements IndexerDataHandler { ), ]); + // We update the deposits table if we see new burn or mint events + await Promise.all([ + ...savedBurnEvents.map(({ depositForBurnEvent, messageSentEvent }) => + updateDeposits({ + dataSource: (this.cctpRepository as any).postgres, + depositUpdate: { + cctp: { + burn: { + depositForBurn: depositForBurnEvent.data, + messageSent: messageSentEvent.data, + }, + }, + }, + }), + ), + ...savedMintEvents.map(({ mintAndWithdrawEvent, messageReceivedEvent }) => + updateDeposits({ + dataSource: (this.cctpRepository as any).postgres, + depositUpdate: { + cctp: { + mint: { + mintAndWithdraw: mintAndWithdrawEvent.data, + messageReceived: messageReceivedEvent.data, + }, + }, + }, + }), + ), + ]); + return { savedBurnEvents, savedMintEvents, diff --git a/packages/indexer/src/data-indexing/service/OFTIndexerDataHandler.ts b/packages/indexer/src/data-indexing/service/OFTIndexerDataHandler.ts index a5d4b230..d78630c6 100644 --- a/packages/indexer/src/data-indexing/service/OFTIndexerDataHandler.ts +++ b/packages/indexer/src/data-indexing/service/OFTIndexerDataHandler.ts @@ -3,7 +3,7 @@ import { ethers, providers, Transaction } from "ethers"; import * as across from "@across-protocol/sdk"; import { entities, SaveQueryResult } from "@repo/indexer-database"; - +import { updateDeposits } from "../../database/Deposits"; import { ArbitraryActionsExecutedLog, BlockRange, @@ -417,6 +417,30 @@ export class OFTIndexerDataHandler implements IndexerDataHandler { ), ]); + // We update the deposits table if we see new sent or received events + await Promise.all([ + ...savedOftSentEvents.map((oftSent) => + updateDeposits({ + dataSource: (this.oftRepository as any).postgres, + depositUpdate: { + oft: { + sent: oftSent.data, + }, + }, + }), + ), + ...savedOftReceivedEvents.map((oftReceived) => + updateDeposits({ + dataSource: (this.oftRepository as any).postgres, + depositUpdate: { + oft: { + received: oftReceived.data, + }, + }, + }), + ), + ]); + return { oftSentEvents: savedOftSentEvents, oftReceivedEvents: savedOftReceivedEvents, diff --git a/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts b/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts index a39fbdb5..33385b5e 100644 --- a/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts +++ 
b/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts @@ -12,7 +12,7 @@ import { } from "@repo/indexer-database"; import { BlockRange } from "../model"; import { IndexerDataHandler } from "./IndexerDataHandler"; - +import { updateDeposits } from "../../database/Deposits"; import * as utils from "../../utils"; import { SpokePoolRepository, @@ -803,6 +803,31 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler { lastFinalisedBlock, ), ]); + + // We update the deposits table if we see a new deposit or fill event + await Promise.all([ + ...savedV3FundsDepositedEvents.map((depositEvent) => { + return updateDeposits({ + dataSource: (this.spokePoolClientRepository as any).postgres, + depositUpdate: { + across: { + deposit: depositEvent.data, + }, + }, + }); + }), + ...savedFilledV3RelayEvents.map((fillEvent) => { + return updateDeposits({ + dataSource: (this.spokePoolClientRepository as any).postgres, + depositUpdate: { + across: { + fill: fillEvent.data, + }, + }, + }); + }), + ]); + return { deposits: savedV3FundsDepositedEvents, fills: savedFilledV3RelayEvents, diff --git a/packages/indexer/src/data-indexing/service/eventProcessing.ts b/packages/indexer/src/data-indexing/service/eventProcessing.ts index 11aa14ca..e315c864 100644 --- a/packages/indexer/src/data-indexing/service/eventProcessing.ts +++ b/packages/indexer/src/data-indexing/service/eventProcessing.ts @@ -43,7 +43,10 @@ export async function formatAndSaveEvents( return formatEvent(event, finalised, blockTimestamp, chainId); }); - const chunkedEvents = across.utils.chunk(formattedEvents, chunkSize); + const chunkedEvents: Partial[][] = across.utils.chunk( + formattedEvents, + chunkSize, + ); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => repository.saveAndHandleFinalisationBatch( diff --git a/packages/indexer/src/database/CctpRepository.ts b/packages/indexer/src/database/CctpRepository.ts index 9776277a..17b5d147 100644 --- a/packages/indexer/src/database/CctpRepository.ts +++ b/packages/indexer/src/database/CctpRepository.ts @@ -190,7 +190,10 @@ export class CCTPRepository extends dbUtils.BlockchainEventRepository { depositForBurnEvent: SaveQueryResult; messageSentEvent: SaveQueryResult; }[] = []; - const chunkedEvents = across.utils.chunk(burnEvents, this.chunkSize); + const chunkedEvents: BurnEventsPair[][] = across.utils.chunk( + burnEvents, + this.chunkSize, + ); for (const eventsChunk of chunkedEvents) { const savedEventsChunk = await Promise.all( eventsChunk.map(async (eventsPair) => { @@ -231,7 +234,8 @@ export class CCTPRepository extends dbUtils.BlockchainEventRepository { }; }); - const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const chunkedEvents: Partial[][] = + across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => this.saveAndHandleFinalisationBatch( @@ -313,7 +317,8 @@ export class CCTPRepository extends dbUtils.BlockchainEventRepository { }; }); - const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const chunkedEvents: Partial[][] = + across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => this.saveAndHandleFinalisationBatch( @@ -338,7 +343,10 @@ export class CCTPRepository extends dbUtils.BlockchainEventRepository { messageReceivedEvent: SaveQueryResult; mintAndWithdrawEvent: SaveQueryResult; }[] = []; - const chunkedEvents = 
across.utils.chunk(mintEvents, this.chunkSize); + const chunkedEvents: MintEventsPair[][] = across.utils.chunk( + mintEvents, + this.chunkSize, + ); for (const eventsChunk of chunkedEvents) { const savedEventsChunk = await Promise.all( eventsChunk.map(async (eventsPair) => { diff --git a/packages/indexer/src/database/Deposits.ts b/packages/indexer/src/database/Deposits.ts new file mode 100644 index 00000000..1ec0d369 --- /dev/null +++ b/packages/indexer/src/database/Deposits.ts @@ -0,0 +1,342 @@ +import { Repository, DataSource } from "typeorm"; +import { entities } from "@repo/indexer-database"; +import { getCctpDestinationChainFromDomain } from "../data-indexing/adapter/cctp-v2/service"; +import { getChainIdForEndpointId } from "../data-indexing/adapter/oft/service"; + +/** + * Enum to define the type of update being performed on the Deposit index. + */ +export enum DepositUpdateType { + DEPOSIT = "DEPOSIT", // Source event + FILL = "FILL", // Destination event +} + +// --- Input Types --- + +export type AcrossDepositUpdate = { + deposit?: entities.V3FundsDeposited; + fill?: entities.FilledV3Relay; +}; + +export type OftDepositUpdate = { + sent?: entities.OFTSent; + received?: entities.OFTReceived; +}; + +export type CctpDepositUpdate = { + burn?: { + depositForBurn?: entities.DepositForBurn; + messageSent: entities.MessageSent; + }; + mint?: { + mintAndWithdraw?: entities.MintAndWithdraw; + messageReceived: entities.MessageReceived; + }; +}; + +export type DepositUpdaterRequestType = { + dataSource: DataSource; + depositUpdate: { + across?: AcrossDepositUpdate; + cctp?: CctpDepositUpdate; + oft?: OftDepositUpdate; + }; +}; + +/** + * Main entry point to update the central Deposit index. + * This function orchestrates the update process by delegating to protocol-specific handlers. + * @param request - The request object containing the data source and the deposit update payload. + * @returns A promise that resolves when the update is complete. + */ +export async function updateDeposits( + request: DepositUpdaterRequestType, +): Promise { + const { dataSource, depositUpdate } = request; + const depositRepo = dataSource.getRepository(entities.Deposit); + + // --- ACROSS --- + if (depositUpdate.across) { + const { deposit, fill } = depositUpdate.across; + if (deposit) { + await handleAcrossDeposit(deposit, depositRepo); + } + if (fill) { + await handleAcrossFill(fill, depositRepo); + } + } + + // --- CCTP --- + else if (depositUpdate.cctp) { + const { burn, mint } = depositUpdate.cctp; + if (burn) { + await handleCctpBurn(burn, depositRepo); + } + if (mint) { + await handleCctpMint(mint, depositRepo); + } + } + + // --- OFT --- + else if (depositUpdate.oft) { + const { sent, received } = depositUpdate.oft; + if (sent) { + await handleOftSent(sent, depositRepo); + } + if (received) { + await handleOftReceived(received, depositRepo); + } + } +} + +// --- Protocol Handlers --- + +/** + * Handles the processing of an Across deposit event (V3FundsDeposited). + * It creates or updates a deposit record based on the event data. + * @param event - The V3FundsDeposited entity from the database. + * @param depositRepo - The TypeORM repository for the Deposit entity. 
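+ * A later FilledV3Relay event carrying the same internalHash upserts onto this row and flips its status to FILLED (see handleAcrossFill below).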
+ */ +async function handleAcrossDeposit( + event: entities.V3FundsDeposited, + depositRepo: Repository, +) { + // Across uses internalHash (or relayHash) as the unique identifier + const uniqueId = event.internalHash; + + await upsertDepositRecord( + depositRepo, + uniqueId, + entities.DepositType.ACROSS, + { + originChainId: event.originChainId, + destinationChainId: event.destinationChainId, + depositor: event.depositor, + recipient: event.recipient, + blockTimestamp: event.blockTimestamp, + v3FundsDepositedId: event.id, + }, + DepositUpdateType.DEPOSIT, + ); +} + +/** + * Handles the processing of an Across fill event (FilledV3Relay). + * It updates an existing deposit record with the fill information. + * @param event - The FilledV3Relay entity from the database. + * @param depositRepo - The TypeORM repository for the Deposit entity. + */ +async function handleAcrossFill( + event: entities.FilledV3Relay, + depositRepo: Repository, +) { + const uniqueId = event.internalHash; + + await upsertDepositRecord( + depositRepo, + uniqueId, + entities.DepositType.ACROSS, + { + originChainId: event.originChainId, + destinationChainId: event.destinationChainId, + filledV3RelayId: event.id, + recipient: event.recipient, + depositor: event.depositor, + blockTimestamp: event.blockTimestamp, // Fallback timestamp + }, + DepositUpdateType.FILL, + ); +} + +/** + * Handles the processing of a CCTP burn event, which signifies the start of a CCTP transfer. + * It combines data from `MessageSent` and optionally `DepositForBurn` to create a deposit record. + * @param data - An object containing the `MessageSent` and optional `DepositForBurn` entities. + * @param depositRepo - The TypeORM repository for the Deposit entity. + */ +async function handleCctpBurn( + data: { + depositForBurn?: entities.DepositForBurn; + messageSent: entities.MessageSent; + }, + depositRepo: Repository, +) { + const { depositForBurn, messageSent } = data; + const destinationChainId = getCctpDestinationChainFromDomain( + messageSent.destinationDomain, + ).toString(); + // CCTP's unique identifier for a transfer is the combination of the message nonce and the destination chain's domain. + const uniqueId = `${messageSent.nonce}-${destinationChainId}`; + + // Prepare updates + const updates: Partial = { + originChainId: getCctpDestinationChainFromDomain( + messageSent.sourceDomain, + ).toString(), + destinationChainId, + recipient: messageSent.recipient, + blockTimestamp: messageSent.blockTimestamp, + }; + + if (depositForBurn) { + updates.depositForBurnId = depositForBurn.id; + updates.depositor = depositForBurn.depositor; + } + + await upsertDepositRecord( + depositRepo, + uniqueId, + entities.DepositType.CCTP, + updates, + DepositUpdateType.DEPOSIT, + ); +} + +/** + * Handles the processing of a CCTP mint event, which signifies the completion of a CCTP transfer. + * It combines data from `MessageReceived` and optionally `MintAndWithdraw` to update a deposit record. + * @param data - An object containing the `MessageReceived` and optional `MintAndWithdraw` entities. + * @param depositRepo - The TypeORM repository for the Deposit entity. + */ +async function handleCctpMint( + data: { + mintAndWithdraw?: entities.MintAndWithdraw; + messageReceived: entities.MessageReceived; + }, + depositRepo: Repository, +) { + const { mintAndWithdraw, messageReceived } = data; + + // The unique identifier is derived from the nonce and the chain ID where the message was received. 
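+  // e.g. nonce "50" received on chainId "42161" yields uniqueId "50-42161", the same key the burn side builds from the nonce and destination domain in handleCctpBurn.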
+ const uniqueId = `${messageReceived.nonce}-${messageReceived.chainId}`; + + const updates: Partial = { + originChainId: getCctpDestinationChainFromDomain( + messageReceived.sourceDomain, + ).toString(), + destinationChainId: messageReceived.chainId, + blockTimestamp: messageReceived.blockTimestamp, + }; + + if (mintAndWithdraw) { + updates.mintAndWithdrawId = mintAndWithdraw.id; + updates.recipient = mintAndWithdraw.mintRecipient; + } + + await upsertDepositRecord( + depositRepo, + uniqueId, + entities.DepositType.CCTP, + updates, + DepositUpdateType.FILL, + ); +} + +/** + * Handles the processing of an OFT (Omnichain Fungible Token) sent event. + * This function creates or updates a deposit record when an OFT transfer is initiated. + * @param event - The OFTSent entity from the database. + * @param depositRepo - The TypeORM repository for the Deposit entity. + */ +async function handleOftSent( + event: entities.OFTSent, + depositRepo: Repository, +) { + await upsertDepositRecord( + depositRepo, + event.guid, + entities.DepositType.OFT, + { + originChainId: event.chainId, + destinationChainId: getChainIdForEndpointId(event.dstEid).toString(), + blockTimestamp: event.blockTimestamp, + depositor: event.fromAddress, + oftSentId: event.id, + }, + DepositUpdateType.DEPOSIT, + ); +} + +/** + * Handles the processing of an OFT (Omnichain Fungible Token) received event. + * This function updates a deposit record when an OFT transfer is completed. + * @param event - The OFTReceived entity from the database. + * @param depositRepo - The TypeORM repository for the Deposit entity. + */ +async function handleOftReceived( + event: entities.OFTReceived, + depositRepo: Repository, +) { + await upsertDepositRecord( + depositRepo, + event.guid, + entities.DepositType.OFT, + { + destinationChainId: event.chainId, + originChainId: getChainIdForEndpointId(event.srcEid).toString(), + recipient: event.toAddress, + oftReceivedId: event.id, + blockTimestamp: event.blockTimestamp, + }, + DepositUpdateType.FILL, + ); +} + +// --- Shared Core Logic --- + +/** + * Performs an "upsert" operation for a deposit record. It atomically inserts a new record + * or updates an existing one based on a unique identifier. This is optimized to reduce + * database round trips by using a single `INSERT ... ON CONFLICT` statement. + * @param depositRepo - The TypeORM repository for the Deposit entity. + * @param uniqueId - The unique identifier for the deposit (e.g., relayHash, CCTP nonce-chain, OFT guid). + * @param type - The type of the deposit (e.g., ACROSS, CCTP, OFT). + * @param updates - An object containing the fields to be inserted or updated. + * @param updateType - The type of event triggering the upsert (DEPOSIT or FILL), which determines status handling. + */ +async function upsertDepositRecord( + depositRepo: Repository, + uniqueId: string, + type: entities.DepositType, + updates: Partial, + updateType: DepositUpdateType, +): Promise { + // Prepare the full object to be inserted if the record does not exist. + // The initial status is determined by whether the first seen event is a deposit or a fill. + const insertData = { + uniqueId, + type, + status: + updateType === DepositUpdateType.FILL + ? entities.DepositStatus.FILLED + : entities.DepositStatus.PENDING, + ...updates, + }; + + // Define the conflict target for the upsert operation. + const conflictPaths = ["uniqueId"]; + + // Filter out any keys from the `updates` object that have an `undefined` value. 
+ // This is crucial to prevent `null`ing out columns in the database that already have data + // if the incoming update for that field is not present. + // If an entry already exists we omit the blockTimestamp from being updated. The blockTimestamp will be from whatever event was observed first. + const columnsToUpdate = Object.entries(updates) + .filter(([key, value]) => value !== undefined && key !== "blockTimestamp") + .map(([key]) => key); + + // If the event is a 'FILL', the status must be updated to 'FILLED'. + // If it's a 'DEPOSIT' event, the status is only set on insert and not on update. + // This prevents a late DEPOSIT event from overwriting a FILLED status. + if (updateType === DepositUpdateType.FILL) { + columnsToUpdate.push("status"); + } + + // Execute the upsert using the query builder for `INSERT ... ON CONFLICT` behavior. + await depositRepo + .createQueryBuilder() + .insert() + .into(entities.Deposit) + .values(insertData) + .orUpdate(columnsToUpdate, conflictPaths) + .execute(); +} diff --git a/packages/indexer/src/database/OftRepository.ts b/packages/indexer/src/database/OftRepository.ts index 56c4199f..d651e27a 100644 --- a/packages/indexer/src/database/OftRepository.ts +++ b/packages/indexer/src/database/OftRepository.ts @@ -115,7 +115,10 @@ export class OftRepository extends dbUtils.BlockchainEventRepository { }; }, ); - const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const chunkedEvents: Partial[][] = across.utils.chunk( + formattedEvents, + this.chunkSize, + ); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => this.saveAndHandleFinalisationBatch( @@ -183,7 +186,8 @@ export class OftRepository extends dbUtils.BlockchainEventRepository { }; }); - const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const chunkedEvents: Partial[][] = + across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => this.saveAndHandleFinalisationBatch( @@ -219,7 +223,10 @@ export class OftRepository extends dbUtils.BlockchainEventRepository { finalised: event.blockNumber <= lastFinalisedBlock, }; }); - const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const chunkedEvents: Partial[][] = across.utils.chunk( + formattedEvents, + this.chunkSize, + ); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => this.saveAndHandleFinalisationBatch( diff --git a/packages/indexer/src/database/SpokePoolRepository.ts b/packages/indexer/src/database/SpokePoolRepository.ts index 5ff4f323..ac129b03 100644 --- a/packages/indexer/src/database/SpokePoolRepository.ts +++ b/packages/indexer/src/database/SpokePoolRepository.ts @@ -120,7 +120,8 @@ export class SpokePoolRepository extends dbUtils.BlockchainEventRepository { blockTimestamp, }; }); - const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const chunkedEvents: Partial[][] = + across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => this.saveAndHandleFinalisationBatch( @@ -178,7 +179,8 @@ export class SpokePoolRepository extends dbUtils.BlockchainEventRepository { blockTimestamp, }; }); - const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const chunkedEvents: Partial[][] = + across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => this.saveAndHandleFinalisationBatch( @@ -230,7 
+232,8 @@ export class SpokePoolRepository extends dbUtils.BlockchainEventRepository { }; }) .filter((event) => event !== undefined); - const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const chunkedEvents: Partial[][] = + across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => this.saveAndHandleFinalisationBatch( @@ -272,7 +275,8 @@ export class SpokePoolRepository extends dbUtils.BlockchainEventRepository { }), ), ); - const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const chunkedEvents: Partial[][] = + across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => this.saveAndHandleFinalisationBatch( @@ -303,7 +307,8 @@ export class SpokePoolRepository extends dbUtils.BlockchainEventRepository { }; }); - const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const chunkedEvents: Partial[][] = + across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => this.saveAndHandleFinalisationBatch( @@ -347,7 +352,8 @@ export class SpokePoolRepository extends dbUtils.BlockchainEventRepository { finalised: event.blockNumber <= lastFinalisedBlock, }; }); - const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const chunkedEvents: Partial[][] = + across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => this.saveAndHandleFinalisationBatch( @@ -383,7 +389,8 @@ export class SpokePoolRepository extends dbUtils.BlockchainEventRepository { finalised: event.blockNumber <= lastFinalisedBlock, }; }); - const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const chunkedEvents: Partial[][] = + across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => this.saveAndHandleFinalisationBatch( diff --git a/packages/indexer/src/database/tests/updateDeposits.integration.test.ts b/packages/indexer/src/database/tests/updateDeposits.integration.test.ts new file mode 100644 index 00000000..03020fb4 --- /dev/null +++ b/packages/indexer/src/database/tests/updateDeposits.integration.test.ts @@ -0,0 +1,686 @@ +import { assert, expect } from "chai"; +import { DataSource, Repository } from "typeorm"; +import { entities, fixtures } from "@repo/indexer-database"; +import { getTestDataSource } from "../../tests/setup"; +import { updateDeposits, DepositUpdaterRequestType } from "../Deposits"; +import { getChainIdForEndpointId } from "../../data-indexing/adapter/oft/service"; +import { getCctpDestinationChainFromDomain } from "../../data-indexing/adapter/cctp-v2/service"; + +// --- Mock Data Generators (Defaults) --- + +const mockV3Deposit = (overrides: Partial = {}) => + ({ + relayHash: "0xRelayHash1", + internalHash: "0xInternalHash1", + depositId: "1", + originChainId: "10", + destinationChainId: "42161", + amount: "1000", + depositor: "0xAlice", + recipient: "0xBob", + inputToken: "0xTokenIn", + inputAmount: "1000", + outputToken: "0xTokenOut", + outputAmount: "990", + message: "0x", + exclusiveRelayer: "0xRelayer", + fillDeadline: new Date(), + quoteTimestamp: new Date(), + transactionHash: "0xTxHash1", + transactionIndex: 1, + logIndex: 0, + blockNumber: 100, + finalised: true, + fromLiteChain: false, + toLiteChain: false, + blockTimestamp: new 
Date("2023-01-01T10:00:00Z"), + ...overrides, + }) as entities.V3FundsDeposited; + +const mockV3Fill = (overrides: Partial = {}) => + ({ + internalHash: "0xInternalHash1", + depositId: "1", + originChainId: "10", + destinationChainId: "42161", + depositor: "0xAlice", + recipient: "0xBob", + inputToken: "0xTokenIn", + inputAmount: "1000", + outputToken: "0xTokenOut", + outputAmount: "990", + message: "0x", + exclusiveRelayer: "0xRelayer", + fillDeadline: new Date(), + updatedRecipient: "0xBob", + updatedMessage: "0x", + updatedOutputAmount: "990", + fillType: 0, + relayer: "0xRelayer", + repaymentChainId: 10, + transactionHash: "0xTxHash2", + transactionIndex: 1, + logIndex: 0, + blockNumber: 200, + finalised: true, + blockTimestamp: new Date("2023-01-01T10:05:00Z"), + ...overrides, + }) as entities.FilledV3Relay; + +const mockOftSent = (overrides: Partial = {}) => + ({ + guid: "0xGuid123", + dstEid: 30101, + fromAddress: "0xFrom", + amountSentLD: "100", + amountReceivedLD: "99", + token: "0xToken", + chainId: "1", + blockHash: "0xHash", + blockNumber: 100, + transactionHash: "0xTx1", + transactionIndex: 0, + logIndex: 0, + finalised: true, + blockTimestamp: new Date(), + ...overrides, + }) as entities.OFTSent; + +const mockOftReceived = (overrides: Partial = {}) => + ({ + guid: "0xGuid123", + srcEid: 30110, + toAddress: "0xTo", + amountReceivedLD: "99", + token: "0xToken", + chainId: "2", + blockHash: "0xHash2", + blockNumber: 200, + transactionHash: "0xTx2", + transactionIndex: 0, + logIndex: 0, + finalised: true, + blockTimestamp: new Date(), + ...overrides, + }) as entities.OFTReceived; + +const mockMessageSent = (overrides: Partial = {}) => + ({ + message: "0x", + version: 1, + sourceDomain: 0, + destinationDomain: 2, + nonce: "50", + sender: "0xSender", + recipient: "0xRecipient", + destinationCaller: "0x", + minFinalityThreshold: 1, + finalityThresholdExecuted: 1, + messageBody: "0x", + chainId: "1", + blockNumber: 100, + transactionHash: "0xTxCCTP1", + transactionIndex: 0, + logIndex: 0, + finalised: true, + blockTimestamp: new Date(), + ...overrides, + }) as entities.MessageSent; + +const mockDepositForBurn = (overrides: Partial = {}) => + ({ + amount: "1000000", + burnToken: "0xUSDC", + mintRecipient: "0xRecipient", + destinationTokenMessenger: "0xMessenger", + destinationCaller: "0xCaller", + destinationDomain: 2, + depositor: "0xDepositor", + hookData: "0x", + chainId: "1", + maxFee: "0", + minFinalityThreshold: "0", + feeCollected: "0", + blockNumber: 100, + transactionHash: "0xTxCCTP1", + transactionIndex: 0, + logIndex: 0, + finalised: true, + blockTimestamp: new Date(), + ...overrides, + }) as entities.DepositForBurn; + +const mockMessageReceived = ( + overrides: Partial = {}, +) => + ({ + caller: "0xCaller", + sourceDomain: 0, + nonce: "50", + sender: "0xSender", + finalityThresholdExecuted: 1, + messageBody: "0x", + chainId: "42161", + blockNumber: 200, + transactionHash: "0xTxCCTP2", + transactionIndex: 0, + logIndex: 0, + finalised: true, + blockTimestamp: new Date(), + ...overrides, + }) as entities.MessageReceived; + +const mockMintAndWithdraw = ( + overrides: Partial = {}, +) => + ({ + mintRecipient: "0xMintRecipient", + amount: "1000000", + mintToken: "0xUSDC", + feeCollected: "0", + chainId: "42161", + blockNumber: 200, + transactionHash: "0xTxCCTP2", + transactionIndex: 0, + logIndex: 0, + finalised: true, + blockTimestamp: new Date(), + ...overrides, + }) as entities.MintAndWithdraw; + +// --- Tests --- + +describe("DepositUpdater", () => { + let dataSource: 
DataSource; + let depositRepo: Repository; + + let v3FundsDepositedFixture: fixtures.GenericFixture; + let filledV3RelayFixture: fixtures.GenericFixture; + let oftSentFixture: fixtures.GenericFixture; + let oftReceivedFixture: fixtures.GenericFixture; + let messageSentFixture: fixtures.GenericFixture; + let depositForBurnFixture: fixtures.GenericFixture; + let messageReceivedFixture: fixtures.GenericFixture; + let mintAndWithdrawFixture: fixtures.GenericFixture; + + beforeEach(async () => { + dataSource = await getTestDataSource(); + depositRepo = dataSource.getRepository(entities.Deposit); + + // Initialize Fixtures + v3FundsDepositedFixture = new fixtures.GenericFixture( + dataSource, + entities.V3FundsDeposited, + ); + filledV3RelayFixture = new fixtures.GenericFixture( + dataSource, + entities.FilledV3Relay, + ); + oftSentFixture = new fixtures.GenericFixture(dataSource, entities.OFTSent); + oftReceivedFixture = new fixtures.GenericFixture( + dataSource, + entities.OFTReceived, + ); + messageSentFixture = new fixtures.GenericFixture( + dataSource, + entities.MessageSent, + ); + depositForBurnFixture = new fixtures.GenericFixture( + dataSource, + entities.DepositForBurn, + ); + messageReceivedFixture = new fixtures.GenericFixture( + dataSource, + entities.MessageReceived, + ); + mintAndWithdrawFixture = new fixtures.GenericFixture( + dataSource, + entities.MintAndWithdraw, + ); + }); + + afterEach(async () => { + if (dataSource && dataSource.isInitialized) { + await dataSource.destroy(); + } + }); + + describe("ACROSS Protocol Updates", () => { + it("should create a new PENDING deposit when only V3FundsDeposited (Source) is processed", async () => { + const [depositEvent] = await v3FundsDepositedFixture.insert([ + mockV3Deposit({ + internalHash: "0xInternalHash1", + depositId: "1", + originChainId: "10", + depositor: "0xAlice", + }), + ]); + assert(depositEvent); + + const request: DepositUpdaterRequestType = { + dataSource, + depositUpdate: { + across: { deposit: depositEvent }, + }, + }; + + await updateDeposits(request); + const savedDeposit = await depositRepo.findOne({ + where: { uniqueId: depositEvent.internalHash }, + }); + + expect(savedDeposit).to.exist; + expect(savedDeposit).to.deep.include({ + status: entities.DepositStatus.PENDING, + type: entities.DepositType.ACROSS, + originChainId: depositEvent.originChainId, + depositor: depositEvent.depositor, + v3FundsDepositedId: depositEvent.id, + filledV3RelayId: null, + }); + }); + + it("should create a new FILLED deposit when only FilledV3Relay (Destination) is processed (Orphan Fill)", async () => { + const [fillEvent] = await filledV3RelayFixture.insert([ + mockV3Fill({ + internalHash: "0xInternalHash2", + destinationChainId: "42161", + }), + ]); + assert(fillEvent); + + const request: DepositUpdaterRequestType = { + dataSource, + depositUpdate: { + across: { fill: fillEvent }, + }, + }; + + await updateDeposits(request); + + const savedDeposit = await depositRepo.findOne({ + where: { uniqueId: fillEvent.internalHash }, + }); + + expect(savedDeposit).to.exist; + expect(savedDeposit).to.deep.include({ + status: entities.DepositStatus.FILLED, + destinationChainId: fillEvent.destinationChainId, + filledV3RelayId: fillEvent.id, + v3FundsDepositedId: null, + type: entities.DepositType.ACROSS, + depositor: fillEvent.depositor, + recipient: fillEvent.recipient, + originChainId: fillEvent.originChainId, + blockTimestamp: fillEvent.blockTimestamp, + }); + }); + + it("should correctly merge: Deposit (Source) processed FIRST, then Fill 
(Dest)", async () => { + const internalHash = "0xSharedHash"; + + // Process Source + const [depositEvent] = await v3FundsDepositedFixture.insert([ + mockV3Deposit({ internalHash }), + ]); + assert(depositEvent); + + await updateDeposits({ + dataSource, + depositUpdate: { across: { deposit: depositEvent } }, + }); + + let savedDeposit = await depositRepo.findOne({ + where: { uniqueId: internalHash }, + }); + expect(savedDeposit).to.exist; + expect(savedDeposit).to.deep.include({ + status: entities.DepositStatus.PENDING, + }); + + // Process Fill + const [fillEvent] = await filledV3RelayFixture.insert([ + mockV3Fill({ internalHash }), + ]); + assert(fillEvent); + + await updateDeposits({ + dataSource, + depositUpdate: { across: { fill: fillEvent } }, + }); + + // Verify Final State + savedDeposit = await depositRepo.findOne({ + where: { uniqueId: internalHash }, + }); + + expect(savedDeposit).to.exist; + expect(savedDeposit).to.deep.include({ + status: entities.DepositStatus.FILLED, + v3FundsDepositedId: depositEvent.id, + filledV3RelayId: fillEvent.id, + destinationChainId: depositEvent.destinationChainId, + type: entities.DepositType.ACROSS, + depositor: fillEvent.depositor, + recipient: fillEvent.recipient, + originChainId: depositEvent.originChainId, + // The block timestamp is from whatever event was observed first + blockTimestamp: depositEvent.blockTimestamp, + }); + }); + + it("should correctly merge: Fill (Dest) processed FIRST, then Deposit (Source)", async () => { + const internalHash = "0xReverseHash"; + + // Process Fill + const [fillEvent] = await filledV3RelayFixture.insert([ + mockV3Fill({ internalHash }), + ]); + assert(fillEvent); + + await updateDeposits({ + dataSource, + depositUpdate: { across: { fill: fillEvent } }, + }); + + let savedDeposit = await depositRepo.findOne({ + where: { uniqueId: internalHash }, + }); + expect(savedDeposit).to.exist; + expect(savedDeposit).to.deep.include({ + status: entities.DepositStatus.FILLED, + }); + + // Process Source + const [depositEvent] = await v3FundsDepositedFixture.insert([ + mockV3Deposit({ + internalHash, + }), + ]); + assert(depositEvent); + + await updateDeposits({ + dataSource, + depositUpdate: { across: { deposit: depositEvent } }, + }); + + // Verify Final State + savedDeposit = await depositRepo.findOne({ + where: { uniqueId: internalHash }, + }); + + expect(savedDeposit).to.exist; + expect(savedDeposit).to.deep.include({ + status: entities.DepositStatus.FILLED, + v3FundsDepositedId: depositEvent.id, + filledV3RelayId: fillEvent.id, + destinationChainId: depositEvent.destinationChainId, + type: entities.DepositType.ACROSS, + depositor: fillEvent.depositor, + recipient: fillEvent.recipient, + originChainId: depositEvent.originChainId, + // The block timestamp is from whatever event was observed first + blockTimestamp: fillEvent.blockTimestamp, + }); + }); + }); + + describe("OFT Protocol Updates", () => { + const guid = "0xGuid123"; + + it("should merge OFT Sent and Received events correctly", async () => { + // Save OFT Sent + const originEndpointId = 30110; + const destinationEndpointId = 30101; + const [sentEvent] = await oftSentFixture.insert([ + mockOftSent({ + guid, + dstEid: destinationEndpointId, + chainId: getChainIdForEndpointId(originEndpointId).toString(), + }), + ]); + assert(sentEvent); + + await updateDeposits({ + dataSource, + depositUpdate: { oft: { sent: sentEvent } }, + }); + + let deposit = await depositRepo.findOne({ where: { uniqueId: guid } }); + expect(deposit).to.exist; + 
expect(deposit).to.deep.include({ + status: entities.DepositStatus.PENDING, + type: entities.DepositType.OFT, + depositor: sentEvent.fromAddress, + oftSentId: sentEvent.id, + destinationChainId: getChainIdForEndpointId(destinationEndpointId), + originChainId: sentEvent.chainId, + }); + + // Save OFT Received + const [receivedEvent] = await oftReceivedFixture.insert([ + mockOftReceived({ + guid, + srcEid: originEndpointId, + chainId: getChainIdForEndpointId(destinationEndpointId).toString(), + }), + ]); + assert(receivedEvent); + + await updateDeposits({ + dataSource, + depositUpdate: { oft: { received: receivedEvent } }, + }); + + deposit = await depositRepo.findOne({ where: { uniqueId: guid } }); + expect(deposit).to.exist; + expect(deposit).to.deep.include({ + status: entities.DepositStatus.FILLED, + oftSentId: sentEvent.id, + oftReceivedId: receivedEvent.id, + recipient: receivedEvent.toAddress, + destinationChainId: receivedEvent.chainId, + type: entities.DepositType.OFT, + depositor: sentEvent.fromAddress, + originChainId: sentEvent.chainId, + }); + }); + }); + + describe("CCTP Protocol Updates", () => { + // Note: We use real DB inserts here to respect potential FK constraints, + // even though the service receives objects. + + it("should insert CCTP Burn event", async () => { + const [messageSent] = await messageSentFixture.insert([ + mockMessageSent({ + nonce: "50", + destinationDomain: 2, + chainId: "1", + }), + ]); + assert(messageSent); + const [depositForBurn] = await depositForBurnFixture.insert([ + // DepositForBurn and MessageSent are in the same transaction + mockDepositForBurn({ + transactionHash: messageSent.transactionHash, + mintRecipient: messageSent.recipient, + }), + ]); + assert(depositForBurn); + + await updateDeposits({ + dataSource, + depositUpdate: { + cctp: { + burn: { + messageSent, + depositForBurn, + }, + }, + }, + }); + + // Expected ID logic from handler: nonce-destinationChainId + const destinationChainId = getCctpDestinationChainFromDomain( + messageSent.destinationDomain, + ); + const expectedId = `${messageSent.nonce}-${destinationChainId}`; + const deposit = await depositRepo.findOne({ + where: { uniqueId: expectedId }, + }); + + expect(deposit).to.exist; + expect(deposit).to.deep.include({ + type: entities.DepositType.CCTP, + status: entities.DepositStatus.PENDING, + depositForBurnId: depositForBurn.id, + blockTimestamp: messageSent.blockTimestamp, + depositor: depositForBurn.depositor, + destinationChainId: getCctpDestinationChainFromDomain( + depositForBurn.destinationDomain, + ), + originChainId: depositForBurn.chainId, + recipient: depositForBurn.mintRecipient, + }); + }); + + it("should insert CCTP Mint event and set status to FILLED", async () => { + const [messageReceived] = await messageReceivedFixture.insert([ + mockMessageReceived({ + nonce: "50", + sourceDomain: 0, + chainId: "42161", + }), + ]); + assert(messageReceived); + const [mintAndWithdraw] = await mintAndWithdrawFixture.insert([ + mockMintAndWithdraw({ + mintRecipient: messageReceived.sender, + transactionHash: messageReceived.transactionHash, + }), + ]); + assert(mintAndWithdraw); + + await updateDeposits({ + dataSource, + depositUpdate: { + cctp: { + mint: { + messageReceived, + mintAndWithdraw, + }, + }, + }, + }); + + // Expected ID logic from handler: nonce-destinationChainId + const expectedId = `${messageReceived.nonce}-${mintAndWithdraw.chainId}`; + const deposit = await depositRepo.findOne({ + where: { uniqueId: expectedId }, + }); + + expect(deposit).to.exist; + 
expect(deposit).to.deep.include({ + type: entities.DepositType.CCTP, + status: entities.DepositStatus.FILLED, + mintAndWithdrawId: mintAndWithdraw.id, + blockTimestamp: messageReceived.blockTimestamp, + // Without the MessageSent event we do not know who the depositor is + depositor: null, + destinationChainId: messageReceived.chainId, + originChainId: getCctpDestinationChainFromDomain( + messageReceived.sourceDomain, + ), + recipient: mintAndWithdraw.mintRecipient, + }); + }); + }); + it("should create a PENDING deposit when only CCTP MessageSent is processed (without DepositForBurn)", async () => { + const [messageSent] = await messageSentFixture.insert([ + mockMessageSent({ + nonce: "60", + destinationDomain: 2, + sourceDomain: 0, + sender: "0xSenderOnly", + }), + ]); + assert(messageSent); + + await updateDeposits({ + dataSource, + depositUpdate: { + cctp: { + burn: { + messageSent, + // depositForBurn is explicitly undefined/missing + }, + }, + }, + }); + + const destinationChainId = getCctpDestinationChainFromDomain( + messageSent.destinationDomain, + ); + const expectedId = `${messageSent.nonce}-${destinationChainId}`; + const deposit = await depositRepo.findOne({ + where: { uniqueId: expectedId }, + }); + + expect(deposit).to.exist; + expect(deposit).to.deep.include({ + type: entities.DepositType.CCTP, + status: entities.DepositStatus.PENDING, + depositForBurnId: null, + blockTimestamp: messageSent.blockTimestamp, + depositor: null, + destinationChainId: destinationChainId, + originChainId: messageSent.chainId, + recipient: messageSent.recipient, + }); + }); + + it("should create a FILLED deposit when only CCTP MessageReceived is processed (without MintAndWithdraw)", async () => { + const [messageReceived] = await messageReceivedFixture.insert([ + mockMessageReceived({ + nonce: "60", + sourceDomain: 0, + chainId: "42161", + sender: "0xSenderOnly", + }), + ]); + assert(messageReceived); + + await updateDeposits({ + dataSource, + depositUpdate: { + cctp: { + mint: { + messageReceived, + // mintAndWithdraw is explicitly undefined/missing + }, + }, + }, + }); + + // Expected ID logic from handler: nonce-destinationChainId + const expectedId = `${messageReceived.nonce}-${messageReceived.chainId}`; + const deposit = await depositRepo.findOne({ + where: { uniqueId: expectedId }, + }); + + expect(deposit).to.exist; + expect(deposit).to.deep.include({ + type: entities.DepositType.CCTP, + status: entities.DepositStatus.FILLED, + mintAndWithdrawId: null, + blockTimestamp: messageReceived.blockTimestamp, + depositor: null, + recipient: null, + originChainId: getCctpDestinationChainFromDomain( + messageReceived.sourceDomain, + ), + destinationChainId: messageReceived.chainId, + }); + }); +}); diff --git a/packages/indexer/src/index.ts b/packages/indexer/src/index.ts index be48cd95..3c9f9a25 100644 --- a/packages/indexer/src/index.ts +++ b/packages/indexer/src/index.ts @@ -5,3 +5,4 @@ export { getChainIdForEndpointId, getCorrespondingTokenAddress, } from "./data-indexing/adapter/oft/service"; +export * from "./database/Deposits";
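
For reference, a minimal sketch of how the new pieces are expected to be used together, assuming a TypeORM DataSource is already available (the handlers above obtain it via (repository as any).postgres). The function names indexAcrossDeposits and getRecentTransfersForUser and the import paths are illustrative only, not part of this diff:

import { DataSource } from "typeorm";
import { entities } from "@repo/indexer-database";
import { updateDeposits } from "../database/Deposits"; // path relative to packages/indexer/src; adjust as needed

// Write path: after persisting V3FundsDeposited rows, mirror them into the
// central deposit index (same wiring as SpokePoolIndexerDataHandler above).
async function indexAcrossDeposits(
  dataSource: DataSource,
  savedDeposits: entities.V3FundsDeposited[],
): Promise<void> {
  await Promise.all(
    savedDeposits.map((deposit) =>
      updateDeposits({
        dataSource,
        depositUpdate: { across: { deposit } },
      }),
    ),
  );
}

// Read path: the denormalized depositor/blockTimestamp columns plus the
// IX_deposits_depositor_timestamp index let this resolve as a single indexed scan.
async function getRecentTransfersForUser(
  dataSource: DataSource,
  depositor: string,
  limit = 20,
): Promise<entities.Deposit[]> {
  return dataSource.getRepository(entities.Deposit).find({
    where: { depositor },
    order: { blockTimestamp: "DESC" },
    take: limit,
  });
}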