Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/core/mempool/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Mempool } from "@/core/service/mempool/mempool.process.ts";
import { MEMPOOL_SLOT_CAPACITY } from "@/config/env.ts";

/**
* Singleton instance of the Mempool
* Will be initialized during application startup
*/
export let mempool: Mempool;

/**
* Initializes the mempool singleton instance
* Should be called during application startup
*/
export function initializeMempool(): void {
if (mempool) {
throw new Error("Mempool already initialized");
}
mempool = new Mempool(MEMPOOL_SLOT_CAPACITY);
}

/**
* Gets the mempool instance
* Throws if not initialized
*/
export function getMempool(): Mempool {
if (!mempool) {
throw new Error("Mempool not initialized. Call initializeMempool() first.");
}
return mempool;
}

182 changes: 38 additions & 144 deletions src/core/service/bundle/add-bundle.process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,48 @@ import { Buffer } from "buffer";
import type { JwtSessionData } from "@/http/middleware/auth/index.ts";
import { BundleStatus } from "@/persistence/drizzle/entity/operations-bundle.entity.ts";
import { drizzleClient } from "@/persistence/drizzle/config.ts";
import { CHANNEL_CLIENT } from "@/core/channel-client/index.ts";
import { TX_CONFIG, NETWORK_RPC_SERVER, OPEX_SK } from "@/config/env.ts";
import {
ChannelInvokeMethods,
MoonlightOperation,
MoonlightTransactionBuilder,
type OperationTypes,
UtxoBasedStellarAccount,
UTXOStatus,
} from "@moonlight/moonlight-sdk";
import { LOG } from "@/config/logger.ts";
import type { requestSchema } from "@/http/v1/bundle/post.ts";
import type { PostEndpointInput } from "@/http/pipelines/types.ts";
import type { SIM_ERRORS } from "@colibri/core";
import type { OperationTypes } from "@moonlight/moonlight-sdk";
import { MoonlightOperation } from "@moonlight/moonlight-sdk";
import {
classifyOperations,
calculateOperationAmounts,
calculateFee,
generateBundleId,
calculateBundleTtl,
calculateBundleWeight,
calculatePriorityScore,
} from "@/core/service/bundle/bundle.service.ts";
import type { SlotBundle, WeightConfig } from "@/core/service/bundle/bundle.types.ts";
import { MEMPOOL_EXPENSIVE_OP_WEIGHT, MEMPOOL_CHEAP_OP_WEIGHT } from "@/config/env.ts";
import { getMempool } from "@/core/mempool/index.ts";
import * as E from "@/core/service/bundle/bundle.errors.ts";
import type { ClassifiedOperations } from "@/core/service/bundle/bundle.types.ts";
import { logAndThrow } from "@/utils/error/log-and-throw.ts";
import type { OperationsBundle } from "@/persistence/drizzle/entity/operations-bundle.entity.ts";
import { TransactionStatus } from "@/persistence/drizzle/entity/transaction.entity.ts";
import {
OperationsBundleRepository,
SessionRepository,
UtxoRepository,
TransactionRepository,
BundleTransactionRepository,
} from "@/persistence/drizzle/repository/index.ts";

// Configuration constants
const BUNDLE_CONFIG = {
TTL_HOURS: 24,
OPEX_UTXO_BATCH_SIZE: 200,
REQUIRED_OPEX_UTXOS: 1,
TRANSACTION_EXPIRATION_OFFSET: 1000,
} as const;

// Repositories
const sessionRepository = new SessionRepository(drizzleClient);
const utxoRepository = new UtxoRepository(drizzleClient);
const operationsBundleRepository = new OperationsBundleRepository(drizzleClient);
const transactionRepository = new TransactionRepository();
const bundleTransactionRepository = new BundleTransactionRepository();

// Mempool configuration
const MEMPOOL_WEIGHT_CONFIG: WeightConfig = {
expensiveOpWeight: MEMPOOL_EXPENSIVE_OP_WEIGHT,
cheapOpWeight: MEMPOOL_CHEAP_OP_WEIGHT,
} as const;

// ========== HELPER FUNCTIONS ==========

Expand Down Expand Up @@ -108,26 +102,6 @@ function validateSpendOperations(operations: OperationTypes.SpendOperation[]): v
}
}

/**
* Ensures OPEX account has enough free UTXOs available
*/
async function ensureOpexUtxosAvailable(
opexHandler: UtxoBasedStellarAccount,
requiredCount: number
): Promise<void> {
while (opexHandler.getUTXOsByState(UTXOStatus.FREE).length < requiredCount + 1) {
LOG.trace("Deriving UTXOs batch for OPEX account");
await opexHandler.deriveBatch({});
LOG.trace("Loading UTXOs batch for OPEX account");
await opexHandler.batchLoad();

LOG.trace(`Derived UTXOS: ${opexHandler.getAllUTXOs().length}`);
LOG.trace(`Free UTXOS: ${opexHandler.getUTXOsByState(UTXOStatus.FREE).length}`);
LOG.trace(`SPENT: ${opexHandler.getUTXOsByState(UTXOStatus.SPENT).length}`);
LOG.trace(`UNSPENT: ${opexHandler.getUTXOsByState(UTXOStatus.UNSPENT).length}`);
LOG.trace(`UNLOADED: ${opexHandler.getUTXOsByState(UTXOStatus.UNLOADED).length}`);
}
}

/**
* Persists UTXOs in the database from create operations
Expand Down Expand Up @@ -182,57 +156,29 @@ async function persistSpendOperations(
}

/**
* Adds operations to the transaction builder
* Creates a SlotBundle from bundle data
*/
function addOperationsToTransaction(
txBuilder: MoonlightTransactionBuilder,
function createSlotBundle(
bundleEntity: OperationsBundle,
classified: ClassifiedOperations
): void {
classified.deposit.forEach((op) => {
txBuilder.addOperation(op);
});

classified.create.forEach((op) => {
txBuilder.addOperation(op);
});

classified.spend.forEach((op) => {
txBuilder.addOperation(op);
): SlotBundle {
const weight = calculateBundleWeight(classified, MEMPOOL_WEIGHT_CONFIG);
const priorityScore = calculatePriorityScore({
fee: bundleEntity.fee,
ttl: bundleEntity.ttl,
createdAt: bundleEntity.createdAt,
});
}

/**
* Gets transaction expiration from latest ledger
*/
async function getTransactionExpiration(): Promise<number> {
const latestLedger = await NETWORK_RPC_SERVER.getLatestLedger();
return latestLedger.sequence + BUNDLE_CONFIG.TRANSACTION_EXPIRATION_OFFSET;
}

/**
* Submits transaction to channel contract
*/
async function submitTransaction(
txBuilder: MoonlightTransactionBuilder,
expiration: number
): Promise<string> {
await txBuilder.signWithProvider(TX_CONFIG.signers[1], expiration);

try {
const { hash } = await CHANNEL_CLIENT.invokeRaw({
operationArgs: {
function: ChannelInvokeMethods.transact,
args: [txBuilder.buildXDR()],
auth: [...txBuilder.getSignedAuthEntries()],
},
config: TX_CONFIG,
});

return hash.toString();
} catch (error) {
LOG.error("Simulation failed: ", (error as SIM_ERRORS.SIMULATION_FAILED).meta.data.input.transaction.toXDR());
logAndThrow(new E.INVALID_OPERATIONS("Simulation failed"));
}
return {
bundleId: bundleEntity.id,
operationsMLXDR: bundleEntity.operationsMLXDR,
operations: classified,
fee: bundleEntity.fee,
weight,
ttl: bundleEntity.ttl,
createdAt: bundleEntity.createdAt,
priorityScore,
};
}

// ========== MAIN PROCESS ==========
Expand Down Expand Up @@ -294,72 +240,20 @@ export const P_AddOperationsBundle = ProcessEngine.create(
LOG.warn("This bundle doesn't have any fee");
}

// 6. Setup transaction builder and OPEX handler
const txBuilder = MoonlightTransactionBuilder.fromPrivacyChannel(CHANNEL_CLIENT);
const opexHandler = UtxoBasedStellarAccount.fromPrivacyChannel({
channelClient: CHANNEL_CLIENT,
root: OPEX_SK,
options: {
batchSize: BUNDLE_CONFIG.OPEX_UTXO_BATCH_SIZE,
},
});

// 7. OPEX UTXO management
await ensureOpexUtxosAvailable(opexHandler, BUNDLE_CONFIG.REQUIRED_OPEX_UTXOS);
const reservedUtxos = opexHandler.reserveUTXOs(BUNDLE_CONFIG.REQUIRED_OPEX_UTXOS);

if (!reservedUtxos) {
const availableCount = opexHandler.getUTXOsByState(UTXOStatus.FREE).length;
logAndThrow(new E.INSUFFICIENT_UTXOS(BUNDLE_CONFIG.REQUIRED_OPEX_UTXOS, availableCount));
}

// 8. Create fee operation
const feeOperation = MoonlightOperation.create(
reservedUtxos[0].publicKey,
feeCalculation.fee
);
txBuilder.addOperation(feeOperation);
LOG.debug("Fee operation created", { mlxdr: feeOperation.toMLXDR() });

// 9. Get expiration and add operations to transaction
const expiration = await getTransactionExpiration();
addOperationsToTransaction(txBuilder, classified);

// 10. Persist UTXOs
// 6. Persist UTXOs
await persistCreateOperations(classified.create, bundleEntity.id, userSession.accountId);
await persistSpendOperations(classified.spend, bundleEntity.id, userSession.accountId);

// 11. Submit transaction
const transactionHash = await submitTransaction(txBuilder, expiration);
// 7. Create SlotBundle and add to Mempool
const slotBundle = createSlotBundle(bundleEntity, classified);
const mempool = getMempool();
await mempool.addBundle(slotBundle);

// 12. Persist the transaction and vinculate it to the bundle
await transactionRepository.create({
id: transactionHash,
status: TransactionStatus.VERIFIED,
timeout: new Date(Date.now() + BUNDLE_CONFIG.TRANSACTION_EXPIRATION_OFFSET),
ledgerSequence: (await (NETWORK_RPC_SERVER.getLatestLedger())).sequence.toString(),
createdAt: new Date(),
createdBy: userSession.accountId,
});

await bundleTransactionRepository.create({
transactionId: transactionHash,
bundleId: bundleEntity.id,
createdAt: new Date(),
createdBy: userSession.accountId,
});

// 13. Update bundle status
await operationsBundleRepository.update(bundleEntity.id, {
status: BundleStatus.COMPLETED,
updatedAt: new Date(),
updatedBy: userSession.accountId,
});
LOG.info(`Bundle ${bundleEntity.id} added to mempool for asynchronous processing`);

return {
ctx: input.ctx,
operationsBundleId: bundleEntity.id,
transactionHash,
};
},
{
Expand Down
9 changes: 4 additions & 5 deletions src/http/v1/bundle/post.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,28 @@ export const requestSchema = z.object({

export const responseSchema = z.object({
operationsBundleId: z.string(),
transactionHash: z.string(),
status: z.string(),
});

type BundleProcessOutput = {
ctx: Context;
operationsBundleId: string;
transactionHash: string;
};

const assembleResponse = (
input: BundleProcessOutput
): PostEndpointOutput<typeof responseSchema> => {
const message = "Bundle successfully processed";
const message = "Bundle received and queued for processing";

LOG.info(message);
LOG.info(message, { bundleId: input.operationsBundleId });

return {
ctx: input.ctx,
status: Status.OK,
message,
data: {
operationsBundleId: input.operationsBundleId,
transactionHash: input.transactionHash,
status: "PENDING",
},
};
};
Expand Down