Skip to content

Comments

feat: Implement the Mempool engine foundation#12

Open
toruguera wants to merge 1 commit intodevfrom
feat/mempool-engine
Open

feat: Implement the Mempool engine foundation#12
toruguera wants to merge 1 commit intodevfrom
feat/mempool-engine

Conversation

@toruguera
Copy link

Asynchronous Mempool System Implementation - Phases 1-3

Summary

Initial implementation of an asynchronous mempool system for processing bundles in a queue, with slot-based grouping and priority management.

Implemented Changes

Phase 1: Preparation and Base Structure

  • Added FAILED status to TransactionStatus enum
  • Created migration 0004_add_failed_status_to_transaction.sql
  • Updated database schema documentation
  • Added Mempool environment variables:
    • MEMPOOL_SLOT_CAPACITY
    • MEMPOOL_EXPENSIVE_OP_WEIGHT
    • MEMPOOL_CHEAP_OP_WEIGHT
    • MEMPOOL_EXECUTOR_INTERVAL_MS
    • MEMPOOL_VERIFIER_INTERVAL_MS
    • MEMPOOL_TTL_CHECK_INTERVAL_MS

Phase 2: Weight and Priority Calculation

  • Added calculateBundleWeight() function to calculate weight based on operation types
  • Added calculatePriorityScore() function with criteria:
    1. Fee (60%)
    2. Created at (30%)
    3. TTL (10%)
  • Created types: SlotBundle, WeightConfig, PriorityScore

Phase 3: Mempool Service

  • Implemented Slot class to manage bundles within a slot
    • Insertion logic with priority-based replacement
    • Capacity and weight verification
    • Bundle removal
  • Implemented Mempool class to manage transaction queue
    • Initialization from database (PENDING/PROCESSING bundles)
    • Bundle addition with automatic slot distribution
    • Automatic bundle expiration (TTL verification)
    • Queue statistics
  • Created pure functions in mempool.service.ts:
    • calculateSlotWeight()
    • canBundleFitInSlot()
    • compareBundlePriority()
    • isBundleExpired()
    • findLowestPriorityBundle()
  • Added findPendingOrProcessing() method to OperationsBundleRepository

Technical Notes

  • Mempool is ephemeral (in-memory) and rebuilt on initialization from the database
  • Slots are created dynamically as needed
  • Expired bundles are automatically removed and marked as EXPIRED
  • Priority system allows replacement of lower priority bundles in full slots

…acity to send transactions with the moonlight bundle operations to the Stellar network
@toruguera toruguera requested a review from fazzatti January 9, 2026 13:53
@toruguera toruguera self-assigned this Jan 9, 2026
const feeScore = Number(bundle.fee) / 1_000_000; // Normalize by dividing by 1M

// Age: older bundles get higher priority
// Calculate age in hours, normalize (older = higher score)
Copy link

@willemneal willemneal Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the decision to use hours? Won't this mean that a lot within the same hour will have equal priority?

/**
* Error thrown when a bundle is not found in the mempool
*/
export class BUNDLE_NOT_FOUND extends PlatformError<{ bundleId: string }> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised by all caps? Is that part of the project's style?

async function parseOperationsFromBundle(
operationsMLXDR: string[]
): Promise<Array<OperationTypes.CreateOperation | OperationTypes.SpendOperation | OperationTypes.DepositOperation | OperationTypes.WithdrawOperation>> {
const { MoonlightOperation } = await import("@moonlight/moonlight-sdk");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's been a while and I've totally forgotten all the different JS import types. Is there a reason that we need to import here vs at the top like all the rest of the imports?

const operations = await Promise.all(
operationsMLXDR.map((xdr) => MoonlightOperation.fromMLXDR(xdr))
);
return operations as Array<OperationTypes.CreateOperation | OperationTypes.SpendOperation | OperationTypes.DepositOperation | OperationTypes.WithdrawOperation>;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these all possible Operation types? Could this be Array<OperationTypes>? Otherwise seems like it should be a type alias for better readability.

Comment on lines +115 to +132
const lowestPriority = findLowestPriorityBundle({
bundles: this.bundles,
currentWeight: this.currentWeight,
capacity: this.capacity,
});

if (lowestPriority && bundle.priorityScore > lowestPriority.priorityScore) {
// Replace the lowest priority bundle
const removedIndex = this.bundles.indexOf(lowestPriority);
this.bundles.splice(removedIndex, 1);
this.currentWeight -= lowestPriority.weight;

this.bundles.push(bundle);
this.bundles.sort(compareBundlePriority);
this.currentWeight += bundle.weight;

return lowestPriority;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Given that the array is sorted by priority would this not always be the last element?
  2. Shouldn't you still check if this.currentWeight - lowestPriority.weight + bundle.weight <= this.capacity?

* Removes a specific bundle by bundleId
* Returns true if bundle was found and removed, false otherwise
*/
removeBundle(bundleId: string): boolean {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to still see this in the context where it is used but wouldn't you want to return the bundle instead of just deleting it?


// If bundle still needs to be added, create a new slot
if (bundleToAdd) {
const newSlot = new Slot(this.capacity);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm understanding this correctly there is no limit on the number of slots. So If the capacity of the mempool is greater than the new bundle it will be guaranteed to be added?

Also then shouldn't this check be done at the beginning of the method since that would mean that it couldn't fit in any of the current slots?

private removeBundleFromSlot(slot: Slot, bundleId: string): void {
const removed = slot.removeBundle(bundleId);
if (!removed) {
LOG.warn(`Bundle ${bundleId} not found in slot for removal`);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not an error?

* Mempool service for managing transaction queue with slots
*/
export class Mempool {
private slots: Slot[] = [];

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bundles within slots are sorted by priority. However, it seems to me that when adding a new bundle it is placed in the first slot with remaining capacity regardless of its priority. Is this the correct design or should the slots themselves be ordered by priority as well?

If it is the latter then I think it might be worth looking into a different datastructure where all bundles are ordered by priority and then when picking the next "slot" is just removing the first x bundles the weights of which add up to the capacity. This way if you have a new bundle with a high priority which would go into the first slot, you wouldn't need to potentially update all the multiple slots if one gets kick out of the first.

this.slots.push(newSlot);
LOG.debug(`Bundle ${bundleData.bundleId} added to new slot`);
} else {
// Even new slot can't fit (shouldn't happen if capacity is reasonable)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this capacity statically known? Could it be a constraint on bundles such that they could never have a capacity above the limit? Or is this what you are hoping to increase overtime and is currently limited by the contract execution costs?

} else {
// Even new slot can't fit (shouldn't happen if capacity is reasonable)
LOG.error(`Bundle ${bundleData.bundleId} cannot fit in any slot, weight: ${bundleData.weight}, capacity: ${this.capacity}`);
throw new E.SLOT_FULL(bundleData.weight, this.capacity);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like the incorrect error since really it is that the capacity of the bundle is bigger than the max capacity of any slot.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants