diff --git a/src/core/mempool/index.ts b/src/core/mempool/index.ts new file mode 100644 index 0000000..4378353 --- /dev/null +++ b/src/core/mempool/index.ts @@ -0,0 +1,31 @@ +import { Mempool } from "@/core/service/mempool/mempool.process.ts"; +import { MEMPOOL_SLOT_CAPACITY } from "@/config/env.ts"; + +/** + * Singleton instance of the Mempool + * Will be initialized during application startup + */ +export let mempool: Mempool; + +/** + * Initializes the mempool singleton instance + * Should be called during application startup + */ +export function initializeMempool(): void { + if (mempool) { + throw new Error("Mempool already initialized"); + } + mempool = new Mempool(MEMPOOL_SLOT_CAPACITY); +} + +/** + * Gets the mempool instance + * Throws if not initialized + */ +export function getMempool(): Mempool { + if (!mempool) { + throw new Error("Mempool not initialized. Call initializeMempool() first."); + } + return mempool; +} + diff --git a/src/core/service/bundle/add-bundle.process.ts b/src/core/service/bundle/add-bundle.process.ts index 31c321a..ba19149 100644 --- a/src/core/service/bundle/add-bundle.process.ts +++ b/src/core/service/bundle/add-bundle.process.ts @@ -3,54 +3,48 @@ import { Buffer } from "buffer"; import type { JwtSessionData } from "@/http/middleware/auth/index.ts"; import { BundleStatus } from "@/persistence/drizzle/entity/operations-bundle.entity.ts"; import { drizzleClient } from "@/persistence/drizzle/config.ts"; -import { CHANNEL_CLIENT } from "@/core/channel-client/index.ts"; -import { TX_CONFIG, NETWORK_RPC_SERVER, OPEX_SK } from "@/config/env.ts"; -import { - ChannelInvokeMethods, - MoonlightOperation, - MoonlightTransactionBuilder, - type OperationTypes, - UtxoBasedStellarAccount, - UTXOStatus, -} from "@moonlight/moonlight-sdk"; import { LOG } from "@/config/logger.ts"; import type { requestSchema } from "@/http/v1/bundle/post.ts"; import type { PostEndpointInput } from "@/http/pipelines/types.ts"; -import type { SIM_ERRORS } from "@colibri/core"; +import type { OperationTypes } from "@moonlight/moonlight-sdk"; +import { MoonlightOperation } from "@moonlight/moonlight-sdk"; import { classifyOperations, calculateOperationAmounts, calculateFee, generateBundleId, calculateBundleTtl, + calculateBundleWeight, + calculatePriorityScore, } from "@/core/service/bundle/bundle.service.ts"; +import type { SlotBundle, WeightConfig } from "@/core/service/bundle/bundle.types.ts"; +import { MEMPOOL_EXPENSIVE_OP_WEIGHT, MEMPOOL_CHEAP_OP_WEIGHT } from "@/config/env.ts"; +import { getMempool } from "@/core/mempool/index.ts"; import * as E from "@/core/service/bundle/bundle.errors.ts"; import type { ClassifiedOperations } from "@/core/service/bundle/bundle.types.ts"; import { logAndThrow } from "@/utils/error/log-and-throw.ts"; import type { OperationsBundle } from "@/persistence/drizzle/entity/operations-bundle.entity.ts"; -import { TransactionStatus } from "@/persistence/drizzle/entity/transaction.entity.ts"; import { OperationsBundleRepository, SessionRepository, UtxoRepository, - TransactionRepository, - BundleTransactionRepository, } from "@/persistence/drizzle/repository/index.ts"; // Configuration constants const BUNDLE_CONFIG = { TTL_HOURS: 24, - OPEX_UTXO_BATCH_SIZE: 200, - REQUIRED_OPEX_UTXOS: 1, - TRANSACTION_EXPIRATION_OFFSET: 1000, } as const; // Repositories const sessionRepository = new SessionRepository(drizzleClient); const utxoRepository = new UtxoRepository(drizzleClient); const operationsBundleRepository = new OperationsBundleRepository(drizzleClient); -const transactionRepository = new TransactionRepository(); -const bundleTransactionRepository = new BundleTransactionRepository(); + +// Mempool configuration +const MEMPOOL_WEIGHT_CONFIG: WeightConfig = { + expensiveOpWeight: MEMPOOL_EXPENSIVE_OP_WEIGHT, + cheapOpWeight: MEMPOOL_CHEAP_OP_WEIGHT, +} as const; // ========== HELPER FUNCTIONS ========== @@ -108,26 +102,6 @@ function validateSpendOperations(operations: OperationTypes.SpendOperation[]): v } } -/** - * Ensures OPEX account has enough free UTXOs available - */ -async function ensureOpexUtxosAvailable( - opexHandler: UtxoBasedStellarAccount, - requiredCount: number -): Promise { - while (opexHandler.getUTXOsByState(UTXOStatus.FREE).length < requiredCount + 1) { - LOG.trace("Deriving UTXOs batch for OPEX account"); - await opexHandler.deriveBatch({}); - LOG.trace("Loading UTXOs batch for OPEX account"); - await opexHandler.batchLoad(); - - LOG.trace(`Derived UTXOS: ${opexHandler.getAllUTXOs().length}`); - LOG.trace(`Free UTXOS: ${opexHandler.getUTXOsByState(UTXOStatus.FREE).length}`); - LOG.trace(`SPENT: ${opexHandler.getUTXOsByState(UTXOStatus.SPENT).length}`); - LOG.trace(`UNSPENT: ${opexHandler.getUTXOsByState(UTXOStatus.UNSPENT).length}`); - LOG.trace(`UNLOADED: ${opexHandler.getUTXOsByState(UTXOStatus.UNLOADED).length}`); - } -} /** * Persists UTXOs in the database from create operations @@ -182,57 +156,29 @@ async function persistSpendOperations( } /** - * Adds operations to the transaction builder + * Creates a SlotBundle from bundle data */ -function addOperationsToTransaction( - txBuilder: MoonlightTransactionBuilder, +function createSlotBundle( + bundleEntity: OperationsBundle, classified: ClassifiedOperations -): void { - classified.deposit.forEach((op) => { - txBuilder.addOperation(op); - }); - - classified.create.forEach((op) => { - txBuilder.addOperation(op); - }); - - classified.spend.forEach((op) => { - txBuilder.addOperation(op); +): SlotBundle { + const weight = calculateBundleWeight(classified, MEMPOOL_WEIGHT_CONFIG); + const priorityScore = calculatePriorityScore({ + fee: bundleEntity.fee, + ttl: bundleEntity.ttl, + createdAt: bundleEntity.createdAt, }); -} -/** - * Gets transaction expiration from latest ledger - */ -async function getTransactionExpiration(): Promise { - const latestLedger = await NETWORK_RPC_SERVER.getLatestLedger(); - return latestLedger.sequence + BUNDLE_CONFIG.TRANSACTION_EXPIRATION_OFFSET; -} - -/** - * Submits transaction to channel contract - */ -async function submitTransaction( - txBuilder: MoonlightTransactionBuilder, - expiration: number -): Promise { - await txBuilder.signWithProvider(TX_CONFIG.signers[1], expiration); - - try { - const { hash } = await CHANNEL_CLIENT.invokeRaw({ - operationArgs: { - function: ChannelInvokeMethods.transact, - args: [txBuilder.buildXDR()], - auth: [...txBuilder.getSignedAuthEntries()], - }, - config: TX_CONFIG, - }); - - return hash.toString(); - } catch (error) { - LOG.error("Simulation failed: ", (error as SIM_ERRORS.SIMULATION_FAILED).meta.data.input.transaction.toXDR()); - logAndThrow(new E.INVALID_OPERATIONS("Simulation failed")); - } + return { + bundleId: bundleEntity.id, + operationsMLXDR: bundleEntity.operationsMLXDR, + operations: classified, + fee: bundleEntity.fee, + weight, + ttl: bundleEntity.ttl, + createdAt: bundleEntity.createdAt, + priorityScore, + }; } // ========== MAIN PROCESS ========== @@ -294,72 +240,20 @@ export const P_AddOperationsBundle = ProcessEngine.create( LOG.warn("This bundle doesn't have any fee"); } - // 6. Setup transaction builder and OPEX handler - const txBuilder = MoonlightTransactionBuilder.fromPrivacyChannel(CHANNEL_CLIENT); - const opexHandler = UtxoBasedStellarAccount.fromPrivacyChannel({ - channelClient: CHANNEL_CLIENT, - root: OPEX_SK, - options: { - batchSize: BUNDLE_CONFIG.OPEX_UTXO_BATCH_SIZE, - }, - }); - - // 7. OPEX UTXO management - await ensureOpexUtxosAvailable(opexHandler, BUNDLE_CONFIG.REQUIRED_OPEX_UTXOS); - const reservedUtxos = opexHandler.reserveUTXOs(BUNDLE_CONFIG.REQUIRED_OPEX_UTXOS); - - if (!reservedUtxos) { - const availableCount = opexHandler.getUTXOsByState(UTXOStatus.FREE).length; - logAndThrow(new E.INSUFFICIENT_UTXOS(BUNDLE_CONFIG.REQUIRED_OPEX_UTXOS, availableCount)); - } - - // 8. Create fee operation - const feeOperation = MoonlightOperation.create( - reservedUtxos[0].publicKey, - feeCalculation.fee - ); - txBuilder.addOperation(feeOperation); - LOG.debug("Fee operation created", { mlxdr: feeOperation.toMLXDR() }); - - // 9. Get expiration and add operations to transaction - const expiration = await getTransactionExpiration(); - addOperationsToTransaction(txBuilder, classified); - - // 10. Persist UTXOs + // 6. Persist UTXOs await persistCreateOperations(classified.create, bundleEntity.id, userSession.accountId); await persistSpendOperations(classified.spend, bundleEntity.id, userSession.accountId); - // 11. Submit transaction - const transactionHash = await submitTransaction(txBuilder, expiration); + // 7. Create SlotBundle and add to Mempool + const slotBundle = createSlotBundle(bundleEntity, classified); + const mempool = getMempool(); + await mempool.addBundle(slotBundle); - // 12. Persist the transaction and vinculate it to the bundle - await transactionRepository.create({ - id: transactionHash, - status: TransactionStatus.VERIFIED, - timeout: new Date(Date.now() + BUNDLE_CONFIG.TRANSACTION_EXPIRATION_OFFSET), - ledgerSequence: (await (NETWORK_RPC_SERVER.getLatestLedger())).sequence.toString(), - createdAt: new Date(), - createdBy: userSession.accountId, - }); - - await bundleTransactionRepository.create({ - transactionId: transactionHash, - bundleId: bundleEntity.id, - createdAt: new Date(), - createdBy: userSession.accountId, - }); - - // 13. Update bundle status - await operationsBundleRepository.update(bundleEntity.id, { - status: BundleStatus.COMPLETED, - updatedAt: new Date(), - updatedBy: userSession.accountId, - }); + LOG.info(`Bundle ${bundleEntity.id} added to mempool for asynchronous processing`); return { ctx: input.ctx, operationsBundleId: bundleEntity.id, - transactionHash, }; }, { diff --git a/src/http/v1/bundle/post.ts b/src/http/v1/bundle/post.ts index ac5654e..00afc75 100644 --- a/src/http/v1/bundle/post.ts +++ b/src/http/v1/bundle/post.ts @@ -11,21 +11,20 @@ export const requestSchema = z.object({ export const responseSchema = z.object({ operationsBundleId: z.string(), - transactionHash: z.string(), + status: z.string(), }); type BundleProcessOutput = { ctx: Context; operationsBundleId: string; - transactionHash: string; }; const assembleResponse = ( input: BundleProcessOutput ): PostEndpointOutput => { - const message = "Bundle successfully processed"; + const message = "Bundle received and queued for processing"; - LOG.info(message); + LOG.info(message, { bundleId: input.operationsBundleId }); return { ctx: input.ctx, @@ -33,7 +32,7 @@ const assembleResponse = ( message, data: { operationsBundleId: input.operationsBundleId, - transactionHash: input.transactionHash, + status: "PENDING", }, }; };