From cef319a149f4ab78690f2edb0dff7c80c3164e1f Mon Sep 17 00:00:00 2001 From: "codex gpt5.2codex" <1900017707@pku.edu.cn> Date: Fri, 26 Dec 2025 14:40:11 +0000 Subject: [PATCH 1/3] optimistic apply: preexec pool + batch commit --- src/bootstrap/rpcbind.d.ts | 22 + src/bootstrap/rpcbind.js | 263 ++++++++- src/preexec/preexec_worker.ts | 162 ++++++ src/service.ts | 987 +++++++++++++++++++++++++++++----- 4 files changed, 1295 insertions(+), 139 deletions(-) create mode 100644 src/bootstrap/rpcbind.d.ts create mode 100644 src/preexec/preexec_worker.ts diff --git a/src/bootstrap/rpcbind.d.ts b/src/bootstrap/rpcbind.d.ts new file mode 100644 index 0000000..7416fe7 --- /dev/null +++ b/src/bootstrap/rpcbind.d.ts @@ -0,0 +1,22 @@ +export function get_leaf(root: Uint8Array | number[], index: bigint): number[]; +export function update_leaf( + root: Uint8Array | number[], + index: bigint, + data: Uint8Array | number[], +): number[]; + +export function get_record(hash: Uint8Array | number[]): bigint[]; +export function update_record(hash: Uint8Array | number[], data: bigint[]): unknown; + +export function apply_txs( + root: Uint8Array | number[], + txs: Array<{ + writes: Array<{ index: string; data: Uint8Array | number[] }>; + updateRecords: Array<{ hash: Uint8Array | number[]; data: string[] }>; + }>, +): number[][]; + +export function begin_session(): string; +export function drop_session(session: string): boolean; +export function reset_session(session: string): boolean; +export function commit_session(session: string): { merkle_records: number; data_records: number }; diff --git a/src/bootstrap/rpcbind.js b/src/bootstrap/rpcbind.js index c870eb7..69ad4d3 100644 --- a/src/bootstrap/rpcbind.js +++ b/src/bootstrap/rpcbind.js @@ -1,7 +1,7 @@ -//import fetch from 'sync-fetch'; -import requestMerkleData from './syncrpc.cjs'; +import requestMerkleData from './syncrpc.cjs'; let url = 'http://127.0.0.1:3030'; import dotenv from 'dotenv'; +import { createRequire } from 'node:module'; dotenv.config(); // Load environment variables from .env file @@ -11,6 +11,87 @@ if (process.env.MERKLE_SERVER) { } console.log("rpc bind merkle server:", url); +const MERKLE_RPC_MODE = process.env.MERKLE_RPC_MODE ?? 'syncproc'; +const require = createRequire(import.meta.url); +let syncFetch; + +function requestMerkle(requestData) { + if (MERKLE_RPC_MODE === 'mock') { + let result; + switch (requestData?.method) { + case 'get_leaf': + result = { leaf: requestData?.params?.index }; + break; + case 'get_record': + result = ['7', '8']; + break; + default: + result = { ok: true }; + } + return JSON.stringify({ jsonrpc: '2.0', id: requestData?.id, result }); + } + if (MERKLE_RPC_MODE === 'http') { + if (!syncFetch) { + syncFetch = require('sync-fetch'); + } + const resp = syncFetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(requestData), + }); + if (!resp.ok) { + throw new Error(`merkle rpc failed: ${resp.status}`); + } + return resp.text(); + } + return requestMerkleData(requestData); +} + +function getMerkleTrace() { + const trace = globalThis.__MERKLE_TRACE; + if (!trace || typeof trace !== 'object') return null; + return trace; +} + +function getMerkleSession() { + const session = globalThis.__MERKLE_SESSION; + if (typeof session !== 'string' || session.length === 0) return null; + return session; +} + +function traceOnlyWritesEnabled() { + return globalThis.__MERKLE_TRACE_ONLY_WRITES === true; +} + +function getMerkleOverlay() { + const overlay = globalThis.__MERKLE_TRACE_OVERLAY; + if (!overlay || typeof overlay !== 'object') return null; + return overlay; +} + +function overlayGet(overlay, kind, key) { + if (!overlay) return undefined; + const store = overlay[kind]; + if (!store) return undefined; + if (store instanceof Map) return store.get(key); + return store[key]; +} + +function overlaySet(overlay, kind, key, value) { + if (!overlay) return; + const store = overlay[kind]; + if (store instanceof Map) { + store.set(key, value); + } else if (store && typeof store === 'object') { + store[key] = value; + } +} + +function withSession(params) { + const session = getMerkleSession(); + if (!session) return params; + return { ...params, session }; +} function hash2array(hash) { const hasharray = []; @@ -33,11 +114,11 @@ function async_get_leaf(root, index) { const requestData = { jsonrpc: '2.0', method: 'get_leaf', - params: {root: roothash, index: index.toString()}, + params: withSession({root: roothash, index: index.toString()}), id: 1 }; //console.log("get leaf", root); - let data = requestMerkleData(requestData); + let data = requestMerkle(requestData); const response = JSON.parse(data); if (response.error==undefined) { //console.log(jsonResponse); @@ -49,11 +130,27 @@ function async_get_leaf(root, index) { } export function get_leaf(root, index) { + const overlay = getMerkleOverlay(); + const indexKey = index.toString(); + const overlayLeaf = overlayGet(overlay, "leaves", indexKey); + if (overlayLeaf !== undefined) { + const trace = getMerkleTrace(); + if (trace) { + if (!Array.isArray(trace.reads)) trace.reads = []; + trace.reads.push(indexKey); + } + return overlayLeaf; + } const start = performance.now(); let data = async_get_leaf(root, index); const end = performance.now(); let lag = end - start; //console.log("bench-log: get_leaf", lag); + const trace = getMerkleTrace(); + if (trace) { + if (!Array.isArray(trace.reads)) trace.reads = []; + trace.reads.push(index.toString()); + } return data; } @@ -63,11 +160,11 @@ function async_update_leaf(root, index, data) { const requestData = { jsonrpc: '2.0', method: 'update_leaf', - params: {root: roothash, index: index.toString(), data: datahash}, + params: withSession({root: roothash, index: index.toString(), data: datahash}), id: 2 }; //console.log("get leaf", root); - let responseStr = requestMerkleData(requestData); + let responseStr = requestMerkle(requestData); const response = JSON.parse(responseStr); if (response.error==undefined) { //console.log(jsonResponse); @@ -80,10 +177,25 @@ function async_update_leaf(root, index, data) { } export function update_leaf(root, index, data) { const start = performance.now(); - let r = async_update_leaf(root, index, data); + let r; + if (traceOnlyWritesEnabled()) { + const overlay = getMerkleOverlay(); + overlaySet(overlay, "leaves", index.toString(), hash2array(data)); + r = hash2array(root); + } else { + r = async_update_leaf(root, index, data); + } const end = performance.now(); let lag = end - start; //console.log("bench-log: update_leaf", lag); + const trace = getMerkleTrace(); + if (trace) { + if (!Array.isArray(trace.writes)) trace.writes = []; + trace.writes.push({ + index: index.toString(), + data: hash2array(data), + }); + } return r; } @@ -93,10 +205,10 @@ function async_update_record(hash, data) { const requestData = { jsonrpc: '2.0', method: 'update_record', - params: {hash: roothash, data: datavec}, + params: withSession({hash: roothash, data: datavec}), id: 3 }; - let responseStr = requestMerkleData(requestData); + let responseStr = requestMerkle(requestData); const response = JSON.parse(responseStr); if (response.error==undefined) { return response.result; @@ -109,10 +221,52 @@ function async_update_record(hash, data) { export function update_record(hash, data) { const start = performance.now(); - let r = async_update_record(hash, data); + let r; + if (traceOnlyWritesEnabled()) { + const overlay = getMerkleOverlay(); + overlaySet(overlay, "records", hash2array(hash).join(","), data); + r = true; + } else { + r = async_update_record(hash, data); + } const end = performance.now(); let lag = end - start; //console.log("bench-log: update_record", lag); + const trace = getMerkleTrace(); + if (trace) { + if (!Array.isArray(trace.updateRecords)) trace.updateRecords = []; + trace.updateRecords.push({ + hash: hash2array(hash), + data: bigintArray2array(data), + }); + } + return r; +} + +function async_apply_txs(root, txs) { + let roothash = hash2array(root); + const requestData = { + jsonrpc: '2.0', + method: 'apply_txs', + params: withSession({ root: roothash, txs }), + id: 9 + }; + let responseStr = requestMerkle(requestData); + const response = JSON.parse(responseStr); + if (response.error==undefined) { + return response.result; + } else { + console.error('Failed to apply_txs:', response.error); + throw("Failed to apply_txs"); + } +} + +export function apply_txs(root, txs) { + const start = performance.now(); + let r = async_apply_txs(root, txs); + const end = performance.now(); + let lag = end - start; + //console.log("bench-log: apply_txs", lag); return r; } @@ -121,11 +275,11 @@ function async_get_record(hash) { const requestData = { jsonrpc: '2.0', method: 'get_record', - params: {hash: hasharray}, + params: withSession({hash: hasharray}), id: 4 }; - let responseStr = requestMerkleData(requestData); + let responseStr = requestMerkle(requestData); const response = JSON.parse(responseStr); if (response.error==undefined) { let result = response.result.map((x)=>{return BigInt(x)}); @@ -138,13 +292,98 @@ function async_get_record(hash) { } export function get_record(hash) { + const overlay = getMerkleOverlay(); + const overlayRecord = overlayGet(overlay, "records", hash2array(hash).join(",")); + if (overlayRecord !== undefined) { + const trace = getMerkleTrace(); + if (trace) { + if (!Array.isArray(trace.getRecords)) trace.getRecords = []; + trace.getRecords.push({ + hash: hash2array(hash), + }); + } + return overlayRecord; + } const start = performance.now(); let r = async_get_record(hash); const end = performance.now(); let lag = end - start; //console.log("bench-log: update_record", lag); + const trace = getMerkleTrace(); + if (trace) { + if (!Array.isArray(trace.getRecords)) trace.getRecords = []; + trace.getRecords.push({ + hash: hash2array(hash), + }); + } return r; } +export function begin_session() { + const requestData = { + jsonrpc: '2.0', + method: 'begin_session', + params: {}, + id: 5, + }; + const responseStr = requestMerkle(requestData); + const response = JSON.parse(responseStr); + if (response.error == undefined) { + return response.result; + } else { + console.error('Failed to begin_session:', response.error); + throw new Error('Failed to begin_session'); + } +} + +export function drop_session(session) { + const requestData = { + jsonrpc: '2.0', + method: 'drop_session', + params: { session }, + id: 6, + }; + const responseStr = requestMerkle(requestData); + const response = JSON.parse(responseStr); + if (response.error == undefined) { + return response.result; + } else { + console.error('Failed to drop_session:', response.error); + throw new Error('Failed to drop_session'); + } +} + +export function reset_session(session) { + const requestData = { + jsonrpc: '2.0', + method: 'reset_session', + params: { session }, + id: 7, + }; + const responseStr = requestMerkle(requestData); + const response = JSON.parse(responseStr); + if (response.error == undefined) { + return response.result; + } else { + console.error('Failed to reset_session:', response.error); + throw new Error('Failed to reset_session'); + } +} +export function commit_session(session) { + const requestData = { + jsonrpc: '2.0', + method: 'commit_session', + params: { session }, + id: 8, + }; + const responseStr = requestMerkle(requestData); + const response = JSON.parse(responseStr); + if (response.error == undefined) { + return response.result; + } else { + console.error('Failed to commit_session:', response.error); + throw new Error('Failed to commit_session'); + } +} diff --git a/src/preexec/preexec_worker.ts b/src/preexec/preexec_worker.ts new file mode 100644 index 0000000..35732d2 --- /dev/null +++ b/src/preexec/preexec_worker.ts @@ -0,0 +1,162 @@ +import { parentPort } from "node:worker_threads"; + +import initBootstrap, * as bootstrap from "../bootstrap/bootstrap.js"; +import initApplication, * as application from "../application/application.js"; +import { signature_to_u64array } from "../signature.js"; + +import type { TxWitness } from "../prover.js"; + +type PreexecRequest = { + id: number; + root: bigint[]; + signature: TxWitness; + skipVerify?: boolean; +}; + +type Trace = { + reads?: string[]; + writes?: Array<{ index: string; data: number[] }>; + getRecords?: Array<{ hash: number[] }>; + updateRecords?: Array<{ hash: number[]; data: string[] }>; +}; + +type PreexecResponse = + | PreexecOk + | PreexecErr; + +type PreexecOk = { + id: number; + ok: true; + result: bigint[]; + finalRoot: bigint[]; + trace: { + reads: string[]; + writes: Array<{ index: string; data: number[] }>; + getRecords: Array<{ hash: number[] }>; + updateRecords: Array<{ hash: number[]; data: string[] }>; + }; + timingMs: { + verify?: number; + handleTx: number; + }; +}; + +type PreexecErr = { + id: number; + ok: false; + error: string; +}; + +function makeEmptyTrace(): Trace { + return { reads: [], writes: [], getRecords: [], updateRecords: [] }; +} + +function normalizeTrace(trace: Trace): PreexecOk["trace"] { + const reads = Array.isArray(trace.reads) ? trace.reads : []; + const writes = Array.isArray(trace.writes) ? trace.writes : []; + const getRecords = Array.isArray(trace.getRecords) ? trace.getRecords : []; + const updateRecords = Array.isArray(trace.updateRecords) ? trace.updateRecords : []; + + const uniqReads = Array.from(new Set(reads)); + const lastWriteByIndex = new Map(); + for (const w of writes) { + if (!w || typeof w.index !== "string" || !Array.isArray(w.data)) continue; + lastWriteByIndex.set(w.index, w.data); + } + const uniqWrites = Array.from(lastWriteByIndex.entries()).map(([index, data]) => ({ + index, + data, + })); + + const uniqGetRecords = []; + const seenGetRecords = new Set(); + for (const r of getRecords) { + if (!r || !Array.isArray(r.hash)) continue; + const key = r.hash.join(","); + if (seenGetRecords.has(key)) continue; + seenGetRecords.add(key); + uniqGetRecords.push(r); + } + + const lastUpdateByHash = new Map(); + for (const r of updateRecords) { + if (!r || !Array.isArray(r.hash) || !Array.isArray(r.data)) continue; + const key = r.hash.join(","); + lastUpdateByHash.set(key, r); + } + const uniqUpdateRecords = Array.from(lastUpdateByHash.values()); + + return { + reads: uniqReads, + writes: uniqWrites, + getRecords: uniqGetRecords, + updateRecords: uniqUpdateRecords, + }; +} + +function setGlobalTrace(trace: Trace | null) { + (globalThis as any).__MERKLE_TRACE = trace; +} + +function setTraceOnlyWrites(enabled: boolean) { + (globalThis as any).__MERKLE_TRACE_ONLY_WRITES = enabled; +} + +function setOverlay(overlay: { leaves: Map; records: Map } | null) { + (globalThis as any).__MERKLE_TRACE_OVERLAY = overlay; +} + +await (initBootstrap as any)(); +await (initApplication as any)(bootstrap); + +if (!parentPort) { + throw new Error("preexec_worker must run as a Worker"); +} + +parentPort.on("message", (msg: PreexecRequest) => { + void (async () => { + try { + const root = new BigUint64Array(msg.root); + application.initialize(root); + + const u64array = signature_to_u64array(msg.signature); + const timingMs: PreexecOk["timingMs"] = { handleTx: 0 }; + + if (!msg.skipVerify) { + const verifyStart = performance.now(); + application.verify_tx_signature(u64array); + timingMs.verify = performance.now() - verifyStart; + } + + const trace = makeEmptyTrace(); + setGlobalTrace(trace); + setTraceOnlyWrites(true); + setOverlay({ leaves: new Map(), records: new Map() }); + const start = performance.now(); + const result = application.handle_tx(u64array); + timingMs.handleTx = performance.now() - start; + setTraceOnlyWrites(false); + setOverlay(null); + setGlobalTrace(null); + + const finalRoot = application.query_root(); + + const response: PreexecResponse = { + id: msg.id, + ok: true, + result: Array.from(result), + finalRoot: Array.from(finalRoot), + trace: normalizeTrace(trace), + timingMs, + }; + parentPort!.postMessage(response); + } catch (err) { + const response: PreexecResponse = { + id: msg.id, + ok: false, + error: err instanceof Error ? err.stack || err.message : String(err), + }; + parentPort!.postMessage(response); + } + })(); +}); diff --git a/src/service.ts b/src/service.ts index e24b2b4..2fea82e 100644 --- a/src/service.ts +++ b/src/service.ts @@ -1,15 +1,20 @@ //import initHostBind, * as hostbind from "./wasmbind/hostbind.js"; import initBootstrap, * as bootstrap from "./bootstrap/bootstrap.js"; +import { apply_txs, begin_session, commit_session, drop_session } from "./bootstrap/rpcbind.js"; import initApplication, * as application from "./application/application.js"; import { test_merkle_db_service } from "./test.js"; -import { verifySign, LeHexBN, sign, PlayerConvention, ZKWasmAppRpc, createCommand } from "zkwasm-minirollup-rpc"; +import { LeHexBN, sign, PlayerConvention, ZKWasmAppRpc, createCommand } from "zkwasm-minirollup-rpc"; +import { signature_to_u64array } from "./signature.js"; import { Queue, Worker, Job } from 'bullmq'; import IORedis from 'ioredis'; import express, {Express} from 'express'; +import { createHash } from 'node:crypto'; +import { cpus } from "node:os"; +import { Worker as NodeWorker } from "node:worker_threads"; import { submitProofWithRetry, has_uncomplete_task, TxWitness, get_latest_proof, has_task } from "./prover.js"; import { ensureIndexes } from "./commit.js"; import cors from "cors"; -import { get_mongoose_db, modelBundle, modelJob, modelRand, get_service_port, get_server_admin_key, modelTx, get_contract_addr, get_chain_id } from "./config.js"; +import { get_chain_id, get_contract_addr, get_mongoose_db, get_queue_name, get_shard_count, get_shard_id, get_server_admin_key, get_service_port, modelBundle, modelJob, modelRand, modelTx } from "./config.js"; import { getMerkleArray } from "./contract.js"; import { ZkWasmUtil } from "zkwasm-service-helper"; import dotenv from 'dotenv'; @@ -17,11 +22,448 @@ import mongoose from 'mongoose'; import {hexStringToMerkleRoot, merkleRootToBeHexString} from "./lib.js"; import {sha256} from "ethers"; import {TxStateManager} from "./commit.js"; -import {queryAccounts, storeAccount} from "./account.js"; +import {ensureAccountIndexes, queryAccounts, storeAccount} from "./account.js"; +import { MongoWriteBuffer } from "./mongo_write_buffer.js"; // Load environment variables from .env file dotenv.config(); +const LOG_TX = process.env.LOG_TX === '1'; +const LOG_BUNDLE = process.env.LOG_BUNDLE === '1'; +const LOG_QUEUE_STATS = process.env.LOG_QUEUE_STATS === '1'; +const LOG_AUTOJOB = process.env.LOG_AUTOJOB === '1'; +const DISABLE_AUTOTICK = process.env.DISABLE_AUTOTICK === '1'; +const AUTOJOB_FATAL = process.env.AUTOJOB_FATAL === '1'; +const DISABLE_SNAPSHOT = process.env.DISABLE_SNAPSHOT === '1'; +const DISABLE_MONGO_TX_STORE = process.env.DISABLE_MONGO_TX_STORE === '1'; +const DISABLE_MONGO_JOB_STORE = process.env.DISABLE_MONGO_JOB_STORE === '1'; +const ASYNC_MONGO_WRITES = process.env.ASYNC_MONGO_WRITES === '1'; +const BATCH_MONGO_WRITES = process.env.BATCH_MONGO_WRITES === '1'; +const MONGO_BATCH_SIZE = Number.parseInt(process.env.MONGO_BATCH_SIZE ?? "200", 10); +const MONGO_FLUSH_MS = Number.parseInt(process.env.MONGO_FLUSH_MS ?? "50", 10); +const MONGO_BATCH_FATAL = process.env.MONGO_BATCH_FATAL !== '0'; +const MONGO_JOB_BATCH_FATAL = process.env.MONGO_JOB_BATCH_FATAL === '1'; +const LIGHT_JOB_RESULT = process.env.LIGHT_JOB_RESULT === '1'; +const MERKLE_SESSION_OVERLAY = process.env.MERKLE_SESSION_OVERLAY === '1'; +const ENFORCE_SHARD = process.env.ENFORCE_SHARD !== '0'; + +// Experimental: single-root optimistic concurrency via parallel pre-exec + conflict filtering. +// - Off by default; enable with OPTIMISTIC_APPLY=1. +// - Designed for higher TPS; correctness depends on complete trace coverage of state reads/writes. +const OPTIMISTIC_APPLY = process.env.OPTIMISTIC_APPLY === '1'; +const OPTIMISTIC_WORKERS = (() => { + const raw = Number.parseInt(process.env.OPTIMISTIC_WORKERS ?? "", 10); + if (Number.isFinite(raw) && raw > 0) return raw; + return Math.max(1, cpus().length); +})(); +const OPTIMISTIC_BATCH = (() => { + const raw = Number.parseInt(process.env.OPTIMISTIC_BATCH ?? "32", 10); + if (Number.isFinite(raw) && raw > 0) return raw; + return 32; +})(); +const OPTIMISTIC_FLUSH_MS = (() => { + const raw = Number.parseInt(process.env.OPTIMISTIC_FLUSH_MS ?? "5", 10); + if (Number.isFinite(raw) && raw >= 0) return raw; + return 5; +})(); +const OPTIMISTIC_BUNDLE_SIZE = (() => { + const raw = Number.parseInt(process.env.OPTIMISTIC_BUNDLE_SIZE ?? "100", 10); + if (Number.isFinite(raw) && raw > 0) return raw; + return 100; +})(); + +type PreexecTrace = { + reads: string[]; + writes: Array<{ index: string; data: number[] }>; + getRecords: Array<{ hash: number[] }>; + updateRecords: Array<{ hash: number[]; data: string[] }>; +}; + +type PreexecOk = { + id: number; + ok: true; + result: bigint[]; + finalRoot: bigint[]; + trace: PreexecTrace; + timingMs: { + verify?: number; + handleTx: number; + }; +}; + +type PreexecErr = { + id: number; + ok: false; + error: string; +}; + +type PreexecResponse = PreexecOk | PreexecErr; + +type PreexecRequest = { + id: number; + root: bigint[]; + signature: TxWitness; + skipVerify?: boolean; +}; + +function u64ToLeBytes(value: bigint): number[] { + const out: number[] = []; + let v = value; + for (let i = 0; i < 8; i++) { + out.push(Number(v & 0xffn)); + v >>= 8n; + } + return out; +} + +function rootU64ToBytes(root: BigUint64Array): number[] { + const bytes: number[] = []; + for (const limb of root) { + bytes.push(...u64ToLeBytes(limb)); + } + if (bytes.length !== 32) { + throw new Error(`invalid root byte length: ${bytes.length}`); + } + return bytes; +} + +function bytes32ToRootU64(bytes: readonly number[]): BigUint64Array { + if (bytes.length !== 32) { + throw new Error(`expected 32 bytes, got ${bytes.length}`); + } + const limbs: bigint[] = []; + for (let i = 0; i < 4; i++) { + let limb = 0n; + for (let j = 0; j < 8; j++) { + limb |= BigInt(bytes[i * 8 + j] ?? 0) << (8n * BigInt(j)); + } + limbs.push(limb); + } + return new BigUint64Array(limbs); +} + +function recordKey(hash: readonly number[]): string { + return hash.join(","); +} + +function intersects(a: Set, b: Set): boolean { + for (const x of a) { + if (b.has(x)) return true; + } + return false; +} + +function buildRwSets(trace: PreexecTrace): { reads: Set; writes: Set } { + const reads = new Set(); + const writes = new Set(); + for (const idx of trace.reads ?? []) { + reads.add(`leaf:${idx}`); + } + for (const w of trace.writes ?? []) { + writes.add(`leaf:${w.index}`); + } + for (const rec of trace.getRecords ?? []) { + if (!rec || !Array.isArray(rec.hash)) continue; + reads.add(`record:${recordKey(rec.hash)}`); + } + for (const rec of trace.updateRecords ?? []) { + if (!rec || !Array.isArray(rec.hash)) continue; + writes.add(`record:${recordKey(rec.hash)}`); + } + return { reads, writes }; +} + +function withMerkleSessionDisabled(fn: () => T): T { + const g = globalThis as any; + const prev = g.__MERKLE_SESSION; + g.__MERKLE_SESSION = null; + try { + return fn(); + } finally { + g.__MERKLE_SESSION = prev; + } +} + +class PreexecPool { + private workers: NodeWorker[]; + private idle: NodeWorker[]; + private queue: Array<{ req: PreexecRequest; resolve: (resp: PreexecResponse) => void; reject: (err: Error) => void }> = []; + private inflight: Map void; reject: (err: Error) => void }> = new Map(); + + constructor(private workerUrl: URL, size: number) { + const n = Number.isFinite(size) && size > 0 ? Math.floor(size) : 1; + this.workers = []; + this.idle = []; + for (let i = 0; i < n; i++) { + const w = new NodeWorker(workerUrl, { type: "module" } as any); + w.on("message", (msg: PreexecResponse) => { + const pending = this.inflight.get(msg.id); + if (!pending) return; + this.inflight.delete(msg.id); + this.idle.push(w); + pending.resolve(msg); + this.drain(); + }); + w.on("error", (err) => { + // bubble errors to all inflight tasks + for (const [, pending] of this.inflight) { + pending.reject(err instanceof Error ? err : new Error(String(err))); + } + this.inflight.clear(); + }); + this.workers.push(w); + this.idle.push(w); + } + } + + exec(req: PreexecRequest): Promise { + return new Promise((resolve, reject) => { + this.queue.push({ req, resolve, reject }); + this.drain(); + }); + } + + private drain() { + while (this.idle.length > 0 && this.queue.length > 0) { + const w = this.idle.pop()!; + const task = this.queue.shift()!; + this.inflight.set(task.req.id, { resolve: task.resolve, reject: task.reject }); + w.postMessage(task.req); + } + } + + async close(): Promise { + await Promise.all(this.workers.map((w) => w.terminate())); + } +} + +type PendingJob = { + job: Job; + resolve: (value: unknown) => void; + reject: (err: unknown) => void; +}; + +class OptimisticSequencer { + private pool: PreexecPool; + private pending: PendingJob[] = []; + private flushing = false; + private flushTimer: ReturnType | null = null; + private nextId = 1; + private rootBytes: number[]; + private bundleTxCount = 0; + + constructor(private service: Service) { + this.pool = new PreexecPool(new URL("./preexec/preexec_worker.js", import.meta.url), OPTIMISTIC_WORKERS); + this.rootBytes = rootU64ToBytes(service.merkleRoot); + } + + enqueue(job: Job): Promise { + return new Promise((resolve, reject) => { + this.pending.push({ job, resolve, reject }); + this.scheduleFlush(); + }); + } + + private scheduleFlush() { + if (this.flushing) return; + if (this.pending.length >= OPTIMISTIC_BATCH) { + void this.flush(); + return; + } + if (this.flushTimer) return; + this.flushTimer = setTimeout(() => { + this.flushTimer = null; + void this.flush(); + }, OPTIMISTIC_FLUSH_MS); + } + + private async flush() { + if (this.flushing) return; + this.flushing = true; + try { + while (this.pending.length > 0) { + const batch = this.pending.splice(0, OPTIMISTIC_BATCH); + const snapshotRoot = bytes32ToRootU64(this.rootBytes); + const rootArr = Array.from(snapshotRoot); + + const preexecs = await Promise.all( + batch.map((p) => + this.pool.exec({ + id: this.nextId++, + root: rootArr, + signature: (p.job.data as any).value as TxWitness, + skipVerify: (p.job.data as any).verified === true, + }), + ), + ); + + const unionWrites = new Set(); + const unionReads = new Set(); + const deferred: PendingJob[] = []; + const chosen: Array<{ p: PendingJob; resp: PreexecOk; isReplay: boolean }> = []; + + for (let i = 0; i < batch.length; i++) { + const p = batch[i]!; + const resp = preexecs[i]!; + if (!resp.ok) { + p.reject(new Error(resp.error)); + continue; + } + + const { reads, writes } = buildRwSets(resp.trace); + const conflict = + intersects(writes, unionWrites) || + intersects(reads, unionWrites) || + intersects(writes, unionReads); + + const errCode = resp.result?.[0] ?? 0n; + const isOk = typeof errCode === "bigint" ? errCode === 0n : BigInt(errCode) === 0n; + + if (conflict) { + deferred.push(p); + continue; + } + + if (!isOk) { + // If it doesn't depend on prior writes (no conflict), treat it as permanent failure. + const errorMsg = application.decode_error(Number(errCode)); + p.reject(new Error(errorMsg)); + continue; + } + + chosen.push({ p, resp, isReplay: p.job.name === "replay" }); + for (const r of reads) unionReads.add(r); + for (const w of writes) unionWrites.add(w); + } + + // Put deferred jobs back to the head to preserve fairness. + this.pending = deferred.concat(this.pending); + + let cursor = 0; + while (cursor < chosen.length) { + const remainingInBundle = OPTIMISTIC_BUNDLE_SIZE - this.bundleTxCount; + const take = Math.min(remainingInBundle, chosen.length - cursor); + const segment = chosen.slice(cursor, cursor + take); + cursor += take; + + let roots: number[][]; + try { + const txs = segment.map((item) => ({ + writes: item.resp.trace.writes ?? [], + updateRecords: item.resp.trace.updateRecords ?? [], + })); + roots = withMerkleSessionDisabled(() => apply_txs(this.rootBytes, txs)); + if (!Array.isArray(roots) || roots.length !== segment.length) { + throw new Error(`apply_txs returned ${Array.isArray(roots) ? roots.length : typeof roots} roots, expected ${segment.length}`); + } + } catch (e) { + for (const item of segment) { + item.p.reject(e); + } + break; + } + + for (let i = 0; i < segment.length; i++) { + const item = segment[i]!; + const signature = (item.p.job.data as any).value as TxWitness; + const jobId = item.p.job.id; + try { + const events = new BigUint64Array(item.resp.result); + this.rootBytes = roots[i]!; + this.service.optimisticHeadRoot = bytes32ToRootU64(this.rootBytes); + await this.service.optimisticInstall(signature, jobId, events, item.isReplay); + item.p.resolve(await this.buildJobResult(item.p.job, signature)); + + this.bundleTxCount += 1; + if (this.bundleTxCount >= OPTIMISTIC_BUNDLE_SIZE) { + await this.flushBundle(this.rootBytes); + this.bundleTxCount = 0; + } + } catch (e) { + item.p.reject(e); + } + } + } + + if (chosen.length === 0) { + // All jobs in this batch were either failed or deferred; yield to allow more enqueues. + break; + } + } + } finally { + this.flushing = false; + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = null; + } + if (this.pending.length > 0) { + this.scheduleFlush(); + } + } + } + + private async flushBundle(postRootBytes: number[]) { + const postRootU64 = bytes32ToRootU64(postRootBytes); + + // Track bundle using pre-root (service.merkleRoot), then advance to post-root. + await this.service.trackBundle(""); + + const bundledTxs = transactions_witness; + this.service.preMerkleRoot = this.service.merkleRoot; + this.service.merkleRoot = postRootU64; + this.service.optimisticHeadRoot = this.service.merkleRoot; + + await this.service.txBatched( + bundledTxs, + merkleRootToBeHexString(this.service.preMerkleRoot), + merkleRootToBeHexString(this.service.merkleRoot), + ); + + transactions_witness = new Array(); + + if (process.env.RELOAD_WASM_ON_BUNDLE === "1") { + await (initApplication as any)(bootstrap); + } + application.initialize(this.service.merkleRoot); + await this.service.txManager.moveToCommit(merkleRootToBeHexString(this.service.merkleRoot)); + + if (this.service.mongoWriteBuffer) { + await this.service.mongoWriteBuffer.flush("bundle"); + } + } + + private async buildJobResult(job: Job, signature: TxWitness) { + if (LIGHT_JOB_RESULT) { + return { bundle: this.service.txManager.currentUncommitMerkleRoot }; + } + // Sync wasm instance to the latest root before serving get_state/snapshot. + application.initialize(bytes32ToRootU64(this.rootBytes)); + if (!DISABLE_SNAPSHOT) { + snapshot = JSON.parse(application.snapshot()); + } + + let player = null; + if (job.name !== "replay") { + const pkx = new LeHexBN(signature.pkx).toU64Array(); + player = JSON.parse(application.get_state(pkx)); + } + return { + player, + state: snapshot, + bundle: this.service.txManager.currentUncommitMerkleRoot, + }; + } +} + +function shardForPkx(pkx: string, shardCount: number): number { + if (shardCount <= 1) return 0; + const hex = pkx.startsWith("0x") ? pkx.slice(2) : pkx; + if (!/^[0-9a-fA-F]*$/.test(hex)) return 0; + const buf = Buffer.from(hex.length % 2 === 0 ? hex : `0${hex}`, "hex"); + const digest = createHash("sha256").update(buf).digest(); + return digest.readUInt32LE(0) % shardCount; +} + let deploymode = false; let remote = false; let migrate = false; @@ -104,6 +546,10 @@ export class Service { preMerkleRoot: BigUint64Array | null; txManager: TxStateManager; blocklist: Map; + mongoWriteBuffer: MongoWriteBuffer | null; + merkleSession: string | null; + optimisticSequencer: OptimisticSequencer | null; + optimisticHeadRoot: BigUint64Array | null; constructor( cb: (arg: TxWitness, events: BigUint64Array) => Promise = async (arg: TxWitness, events: BigUint64Array) => {}, @@ -127,6 +573,10 @@ export class Service { this.preMerkleRoot = null; this.txManager = new TxStateManager(merkleRootToBeHexString(this.merkleRoot)); this.blocklist = new Map(); + this.mongoWriteBuffer = null; + this.merkleSession = null; + this.optimisticSequencer = null; + this.optimisticHeadRoot = null; } async syncToLatestMerkelRoot() { @@ -179,11 +629,15 @@ export class Service { } async trackBundle(taskId: string) { - console.log("track bundle:", this.bundleIndex); + if (LOG_BUNDLE) { + console.log("track bundle:", this.bundleIndex); + } let preMerkleRootStr = ""; if (this.preMerkleRoot != null) { preMerkleRootStr = merkleRootToBeHexString(this.preMerkleRoot); - console.log("update merkle chain ...", preMerkleRootStr); + if (LOG_BUNDLE) { + console.log("update merkle chain ...", preMerkleRootStr); + } try { const prevBundle = await modelBundle.findOneAndUpdate({ merkleRoot: merkleRootToBeHexString(this.preMerkleRoot), @@ -194,14 +648,18 @@ export class Service { console.log(`fatal: bundleIndex does not match: ${this.bundleIndex}, ${prevBundle!.bundleIndex}`); throw Error(`Bundle Index does not match: current index is ${this.bundleIndex}, previous index is ${prevBundle!.bundleIndex}`); } - console.log("merkle chain prev is", prevBundle); + if (LOG_BUNDLE) { + console.log("merkle chain prev is", prevBundle); + } } catch (e) { console.log(`fatal: can not find bundle for previous MerkleRoot: ${merkleRootToBeHexString(this.preMerkleRoot)}`); throw Error(`fatal: can not find bundle for previous MerkleRoot: ${merkleRootToBeHexString(this.preMerkleRoot)}`); } } this.bundleIndex += 1; - console.log("add transaction bundle:", this.bundleIndex, merkleRootToBeHexString(this.merkleRoot)); + if (LOG_BUNDLE) { + console.log("add transaction bundle:", this.bundleIndex, merkleRootToBeHexString(this.merkleRoot)); + } const bundleRecord = new modelBundle({ merkleRoot: merkleRootToBeHexString(this.merkleRoot), preMerkleRoot: preMerkleRootStr, @@ -210,7 +668,9 @@ export class Service { }); try { await bundleRecord.save(); - console.log(`task recorded with key: ${merkleRootToBeHexString(this.merkleRoot)}`); + if (LOG_BUNDLE) { + console.log(`task recorded with key: ${merkleRootToBeHexString(this.merkleRoot)}`); + } } catch (e) { let record = await modelBundle.findOneAndUpdate({ @@ -223,20 +683,25 @@ export class Service { }, {}); console.log("fatal: conflict db merkle"); // TODO: do we need to trim the corrputed branch? - console.log(record); + if (LOG_BUNDLE) { + console.log(record); + } //throw e } return bundleRecord; } - async install_transactions(tx: TxWitness, jobid: string | undefined, events: BigUint64Array, isReplay = false) { + async install_transactions(tx: TxWitness, jobid: string | undefined, events: BigUint64Array, isReplay = false): Promise { // const installStartTime = performance.now(); - console.log("installing transaction into rollup ..."); + let bundled = false; + if (LOG_TX) { + console.log("installing transaction into rollup ..."); + } transactions_witness.push(tx); // if (!isReplay) { // const insertStart = performance.now(); - const handled = await this.txManager.insertTxIntoCommit(tx); + const handled = await this.txManager.insertTxIntoCommit(tx, isReplay); // const insertEnd = performance.now(); // console.log(`[${getTimestamp()}] insertTxIntoCommit took: ${insertEnd - insertStart}ms, handled: ${handled}`); if (handled == false) { @@ -246,24 +711,48 @@ export class Service { // console.log(`[${getTimestamp()}] txCallback took: ${callbackEnd - callbackStart}ms`); } // } - snapshot = JSON.parse(application.snapshot()); - console.log("transaction installed, rollup pool length is:", transactions_witness.length); - try { - // const saveStart = performance.now(); - const txRecord = new modelTx(tx); - await txRecord.save(); - // const saveEnd = performance.now(); - // console.log(`[${getTimestamp()}] txRecord.save took: ${saveEnd - saveStart}ms`); - } catch (e) { - console.log("fatal: store tx failed ... process will terminate"); + if (!DISABLE_SNAPSHOT) { + snapshot = JSON.parse(application.snapshot()); + } + if (LOG_TX) { + console.log("transaction installed, rollup pool length is:", transactions_witness.length); + } + if (!isReplay && !DISABLE_MONGO_TX_STORE) { + if (this.mongoWriteBuffer) { + this.mongoWriteBuffer.enqueueTx({ + msg: tx.msg, + pkx: tx.pkx, + sigx: tx.sigx, + }); + } else { + const txRecord = new modelTx(tx); + if (ASYNC_MONGO_WRITES) { + void txRecord.save().catch((e) => { + console.log("fatal: store tx failed ... process will terminate", e); + process.exit(1); + }); + } else { + try { + await txRecord.save(); + } catch (e) { + console.log("fatal: store tx failed ... process will terminate", e); + process.exit(1); + } + } + } } if (application.preempt()) { + bundled = true; // const preemptStart = performance.now(); - console.log("rollup reach its preemption point, generating proof:"); + if (LOG_BUNDLE) { + console.log("rollup reach its preemption point, generating proof:"); + } let txdata = application.finalize(); // const finalizeEnd = performance.now(); // console.log(`[${getTimestamp()}] application.finalize took: ${finalizeEnd - preemptStart}ms`); - console.log("txdata is:", txdata); + if (LOG_BUNDLE) { + console.log("txdata is:", txdata); + } let task_id = null; // TODO: store a bundle before we fail @@ -277,34 +766,51 @@ export class Service { } } try { - console.log("proving task submitted at:", task_id); - console.log("tracking task in db current ...", merkleRootToBeHexString(this.merkleRoot)); + if (LOG_BUNDLE) { + console.log("proving task submitted at:", task_id); + console.log("tracking task in db current ...", merkleRootToBeHexString(this.merkleRoot)); + } // const trackStart = performance.now(); await this.trackBundle(task_id); // const trackEnd = performance.now(); // console.log(`[${getTimestamp()}] trackBundle took: ${trackEnd - trackStart}ms`); - // clear witness queue and set preMerkleRoot - transactions_witness = new Array(); + // preserve witness for batch callback before clearing + const bundledTxs = transactions_witness; + + // set pre/post roots for next bundle this.preMerkleRoot = this.merkleRoot; // need to update merkle_root as the input of next proof this.merkleRoot = application.query_root(); - // record all the txs externally so that the external db can preserve a snap shot // const batchStart = performance.now(); - await this.txBatched(transactions_witness, merkleRootToBeHexString(this.preMerkleRoot), merkleRootToBeHexString(this.merkleRoot)); + await this.txBatched(bundledTxs, merkleRootToBeHexString(this.preMerkleRoot), merkleRootToBeHexString(this.merkleRoot)); // const batchEnd = performance.now(); // console.log(`[${getTimestamp()}] txBatched took: ${batchEnd - batchStart}ms`); + // clear witness queue for next bundle + transactions_witness = new Array(); + // reset application here - console.log("restore root:", this.merkleRoot); + if (LOG_BUNDLE) { + console.log("restore root:", this.merkleRoot); + } // const resetStart = performance.now(); - await (initApplication as any)(bootstrap); + if (process.env.RELOAD_WASM_ON_BUNDLE === '1') { + await (initApplication as any)(bootstrap); + } application.initialize(this.merkleRoot); await this.txManager.moveToCommit(merkleRootToBeHexString(this.merkleRoot)); + + if (MERKLE_SESSION_OVERLAY && this.merkleSession) { + const commitResult = commit_session(this.merkleSession); + if (LOG_BUNDLE) { + console.log("merkle overlay committed:", commitResult); + } + } // const resetEnd = performance.now(); // console.log(`[${getTimestamp()}] Application reset took: ${resetEnd - resetStart}ms`); } catch (e) { @@ -315,6 +821,44 @@ export class Service { // let current_merkle_root = application.query_root(); // const installEndTime = performance.now(); // console.log("transaction installed with last root:", current_merkle_root); + return bundled; + } + + // Optimistic apply path: records tx/commit metadata but does not execute WASM in this process. + // The actual state transition is applied from pre-exec traces (Merkle leaf/record writes). + async optimisticInstall(tx: TxWitness, jobid: string | undefined, events: BigUint64Array, isReplay = false): Promise { + transactions_witness.push(tx); + + const handled = await this.txManager.insertTxIntoCommit(tx, isReplay); + if (handled === false) { + await this.txCallback(tx, events); + } + + if (!isReplay && !DISABLE_MONGO_TX_STORE) { + if (this.mongoWriteBuffer) { + this.mongoWriteBuffer.enqueueTx({ + msg: tx.msg, + pkx: tx.pkx, + sigx: tx.sigx, + }); + } else { + const txRecord = new modelTx({ + msg: tx.msg, + pkx: tx.pkx, + sigx: tx.sigx, + }); + if (ASYNC_MONGO_WRITES) { + void txRecord.save().catch((e) => { + console.log("fatal: store tx failed ... process will terminate", e); + process.exit(1); + }); + } else { + await txRecord.save(); + } + } + } + + return false; } async initialize() { @@ -334,6 +878,18 @@ export class Service { // Call ensureIndexes after connection is established await ensureIndexes(); + await ensureAccountIndexes(); + + if (BATCH_MONGO_WRITES) { + this.mongoWriteBuffer = new MongoWriteBuffer({ + txCollection: modelTx.collection as any, + jobCollection: modelJob.collection as any, + batchSize: MONGO_BATCH_SIZE, + flushMs: MONGO_FLUSH_MS, + fatalTxError: MONGO_BATCH_FATAL, + fatalJobError: MONGO_JOB_BATCH_FATAL, + }); + } console.log("connecting redis server:", redisHost); const connection = new IORedis( @@ -366,6 +922,25 @@ export class Service { console.log("check merkel database connection ..."); test_merkle_db_service(); + if (MERKLE_SESSION_OVERLAY) { + try { + this.merkleSession = begin_session(); + (globalThis as any).__MERKLE_SESSION = this.merkleSession; + console.log("merkle session overlay enabled:", this.merkleSession); + process.once("exit", () => { + if (!this.merkleSession) return; + try { + drop_session(this.merkleSession); + } catch { + // ignore + } + }); + } catch (e) { + console.log("fatal: begin merkle session failed", e); + process.exit(1); + } + } + if (migrate) { if (remote) { throw Error("Can't migrate in remote mode"); @@ -410,8 +985,11 @@ export class Service { await this.syncToLatestMerkelRoot(); } - console.log("initialize sequener queue ..."); - const myQueue = new Queue('sequencer', {connection}); + const shardCount = get_shard_count(); + const shardId = get_shard_id(); + const queueName = get_queue_name(); + console.log("initialize sequener queue ...", { queueName, shardId, shardCount }); + const myQueue = new Queue(queueName, {connection}); const waitingCount = await myQueue.getWaitingCount(); console.log("waiting Count is:", waitingCount, " perform draining ..."); @@ -427,9 +1005,20 @@ export class Service { // update the merkle root variable this.merkleRoot = application.query_root(); + this.optimisticHeadRoot = this.merkleRoot; + + if (OPTIMISTIC_APPLY) { + console.log("optimistic apply enabled", { + workers: OPTIMISTIC_WORKERS, + batch: OPTIMISTIC_BATCH, + flushMs: OPTIMISTIC_FLUSH_MS, + bundleSize: OPTIMISTIC_BUNDLE_SIZE, + }); + this.optimisticSequencer = new OptimisticSequencer(this); + } // Automatically add a job to the queue every few seconds - if (application.autotick()) { + if (!DISABLE_AUTOTICK && application.autotick()) { setInterval(async () => { try { await myQueue.add('autoJob', {command:0}); @@ -444,37 +1033,72 @@ export class Service { this.blocklist.clear(); }, 30000); - // Monitor queue length every 2 seconds - setInterval(async () => { - try { - const waitingCount = await myQueue.getWaitingCount(); - const activeCount = await myQueue.getActiveCount(); - const delayedCount = await myQueue.getDelayedCount(); - const completedCount = await myQueue.getCompletedCount(); - const failedCount = await myQueue.getFailedCount(); - - console.log(`[${getTimestamp()}] Queue Stats - Waiting: ${waitingCount}, Active: ${activeCount}, Delayed: ${delayedCount}, Completed: ${completedCount}, Failed: ${failedCount}`); - } catch (error) { - console.error('Error getting queue stats:', error); - } - }, 2000); + // Monitor queue length every 2 seconds (disabled by default for TPS) + if (LOG_QUEUE_STATS) { + setInterval(async () => { + try { + const waitingCount = await myQueue.getWaitingCount(); + const activeCount = await myQueue.getActiveCount(); + const delayedCount = await myQueue.getDelayedCount(); + const completedCount = await myQueue.getCompletedCount(); + const failedCount = await myQueue.getFailedCount(); + + console.log(`[${getTimestamp()}] Queue Stats - Waiting: ${waitingCount}, Active: ${activeCount}, Delayed: ${delayedCount}, Completed: ${completedCount}, Failed: ${failedCount}`); + } catch (error) { + console.error('Error getting queue stats:', error); + } + }, 2000); + } - this.worker = new Worker('sequencer', async job => { + this.worker = new Worker(queueName, async job => { const jobStartTime = performance.now(); // console.log(`[${getTimestamp()}] Worker started processing job: ${job.name}, id: ${job.id}`); if (job.name == 'autoJob') { + if (OPTIMISTIC_APPLY) { + try { + let rand = await generateRandomSeed(); + if (this.optimisticHeadRoot) { + application.initialize(this.optimisticHeadRoot); + } + let oldSeed = application.randSeed(); + + let seed = 0n; + if (oldSeed != 0n) { + const randRecord = await modelRand.find({ + commitment: oldSeed.toString(), + }); + seed = randRecord[0].seed!.readBigInt64LE(); + } + + const signature = sign(createCommand(0n, 0n, [seed, rand, 0n, 0n]), get_server_admin_key()); + await myQueue.add('transaction', { value: signature, verified: true }); + return { enqueued: true }; + } catch (error) { + const jobEndTime = performance.now(); + console.log(`[${getTimestamp()}] AutoJob failed after ${jobEndTime - jobStartTime}ms:`, error); + if (AUTOJOB_FATAL) { + console.log("fatal: handling auto tick error, process will terminate.", error); + process.exit(1); + } + throw error; + } + } // console.log(`[${getTimestamp()}] AutoJob tick started`); try { const randStartTime = performance.now(); let rand = await generateRandomSeed(); const randEndTime = performance.now(); - console.log(`[${getTimestamp()}] AutoJob generateRandomSeed took: ${randEndTime - randStartTime}ms`); + if (LOG_AUTOJOB) { + console.log(`[${getTimestamp()}] AutoJob generateRandomSeed took: ${randEndTime - randStartTime}ms`); + } const oldSeedStartTime = performance.now(); let oldSeed = application.randSeed(); const oldSeedEndTime = performance.now(); - console.log(`[${getTimestamp()}] AutoJob get old seed took: ${oldSeedEndTime - oldSeedStartTime}ms`); + if (LOG_AUTOJOB) { + console.log(`[${getTimestamp()}] AutoJob get old seed took: ${oldSeedEndTime - oldSeedStartTime}ms`); + } const oldSeedFindStartTime = performance.now(); let seed = 0n; @@ -485,7 +1109,9 @@ export class Service { seed = randRecord[0].seed!.readBigInt64LE(); }; const oldSeedFindEndTime = performance.now(); - console.log(`[${getTimestamp()}] AutoJob find old seed took: ${oldSeedFindEndTime - oldSeedFindStartTime}ms`); + if (LOG_AUTOJOB) { + console.log(`[${getTimestamp()}] AutoJob find old seed took: ${oldSeedFindEndTime - oldSeedFindStartTime}ms`); + } let signature = sign(createCommand(0n, 0n, [seed, rand, 0n, 0n]), get_server_admin_key()); //console.log("signautre is", signature); @@ -494,73 +1120,157 @@ export class Service { const verifyTxSignatureStartTime = performance.now(); application.verify_tx_signature(u64array); const verifyTxSignatureEndTime = performance.now(); - console.log(`[${getTimestamp()}] AutoJob verify_tx_signature took: ${verifyTxSignatureEndTime - verifyTxSignatureStartTime}ms`); + if (LOG_AUTOJOB) { + console.log(`[${getTimestamp()}] AutoJob verify_tx_signature took: ${verifyTxSignatureEndTime - verifyTxSignatureStartTime}ms`); + } const handleTxStart = performance.now(); let txResult = application.handle_tx(u64array); const handleTxEnd = performance.now(); - console.log(`[${getTimestamp()}] AutoJob handle_tx took: ${handleTxEnd - handleTxStart}ms`); + if (LOG_AUTOJOB) { + console.log(`[${getTimestamp()}] AutoJob handle_tx took: ${handleTxEnd - handleTxStart}ms`); + } const installStart = performance.now(); - await this.install_transactions(signature, job.id, txResult); + const bundled = await this.install_transactions(signature, job.id, txResult); + if (bundled && this.mongoWriteBuffer) { + await this.mongoWriteBuffer.flush("bundle"); + } const installEnd = performance.now(); - console.log(`[${getTimestamp()}] AutoJob install_transactions took: ${installEnd - installStart}ms`); + if (LOG_AUTOJOB) { + console.log(`[${getTimestamp()}] AutoJob install_transactions took: ${installEnd - installStart}ms`); + } } catch (error) { const jobEndTime = performance.now(); console.log(`[${getTimestamp()}] AutoJob failed after ${jobEndTime - jobStartTime}ms:`, error); - console.log("fatal: handling auto tick error, process will terminate.", error); - process.exit(1); + if (AUTOJOB_FATAL) { + console.log("fatal: handling auto tick error, process will terminate.", error); + process.exit(1); + } + throw error; } const jobEndTime = performance.now(); - console.log(`[${getTimestamp()}] AutoJob completed in ${jobEndTime - jobStartTime}ms`); + if (LOG_AUTOJOB) { + console.log(`[${getTimestamp()}] AutoJob completed in ${jobEndTime - jobStartTime}ms`); + } } else if (job.name == 'transaction' || job.name == 'replay') { - console.log("handle transaction ..."); + if (OPTIMISTIC_APPLY) { + if (!this.optimisticSequencer) { + throw new Error("optimistic apply enabled but sequencer not initialized"); + } + try { + const signature = job.data.value; + const result = await this.optimisticSequencer.enqueue(job); + if (job.name != 'replay' && !DISABLE_MONGO_JOB_STORE) { + if (this.mongoWriteBuffer) { + this.mongoWriteBuffer.enqueueJob({ + jobId: (signature as any).hash + (signature as any).pkx, + message: (signature as any).message, + result: "succeed", + }); + } else { + const jobRecord = new modelJob({ + jobId: (signature as any).hash + (signature as any).pkx, + message: (signature as any).message, + result: "succeed", + }); + if (ASYNC_MONGO_WRITES) { + void jobRecord.save().catch((e) => { + console.log("Error: store transaction job error", e); + }); + } else { + try { + await jobRecord.save(); + } catch (e) { + console.log("Error: store transaction job error", e); + throw e; + } + } + } + } + return result; + } catch (e) { + const jobEndTime = performance.now(); + console.log(`[${getTimestamp()}] ${job.name} failed after ${jobEndTime - jobStartTime}ms:`, e); + const pkx = job.data.value.pkx; + const fc = this.blocklist.get(pkx) || 0; + this.blocklist.set(pkx, fc + 1); + console.log("error optimistic apply", e); + throw e; + } + } + if (LOG_TX) { + console.log("handle transaction ..."); + } try { let signature = job.data.value; const verifySignatureStart = performance.now(); let u64array = signature_to_u64array(signature); - application.verify_tx_signature(u64array); + if (!(job.name === 'transaction' && job.data.verified === true)) { + application.verify_tx_signature(u64array); + } const verifySignatureEnd = performance.now(); - console.log(`[${getTimestamp()}] ${job.name} verify_tx_signature took: ${verifySignatureEnd - verifySignatureStart}ms`); + if (LOG_TX) { + console.log(`[${getTimestamp()}] ${job.name} verify_tx_signature took: ${verifySignatureEnd - verifySignatureStart}ms`); + } const handleTxStart = performance.now(); let txResult = application.handle_tx(u64array); const handleTxEnd = performance.now(); - console.log(`[${getTimestamp()}] ${job.name} handle_tx took: ${handleTxEnd - handleTxStart}ms`); + if (LOG_TX) { + console.log(`[${getTimestamp()}] ${job.name} handle_tx took: ${handleTxEnd - handleTxStart}ms`); + } let errorCode = txResult[0]; if (errorCode == 0n) { // make sure install transaction will succeed const installStart = performance.now(); - await this.install_transactions(signature, job.id, txResult, job.name=='replay'); + const bundled = await this.install_transactions(signature, job.id, txResult, job.name=='replay'); const installEnd = performance.now(); - console.log(`[${getTimestamp()}] ${job.name} install_transactions took: ${installEnd - installStart}ms`); - try { - // If this is the first time of running this tx, the store should work. - // If the store does not work (jobId conflict) then either there is a jobid - // conflict error in transaction mode or this is the second time running this - // transcation thus should in replay mode. - if (job.name != 'replay') { - const jobRecord = new modelJob({ - jobId: signature.hash + signature.pkx, - message: signature.message, - result: "succeed", + if (LOG_TX) { + console.log(`[${getTimestamp()}] ${job.name} install_transactions took: ${installEnd - installStart}ms`); + } + if (job.name != 'replay' && !DISABLE_MONGO_JOB_STORE) { + if (this.mongoWriteBuffer) { + this.mongoWriteBuffer.enqueueJob({ + jobId: signature.hash + signature.pkx, + message: signature.message, + result: "succeed", + }); + } else { + const jobRecord = new modelJob({ + jobId: signature.hash + signature.pkx, + message: signature.message, + result: "succeed", + }); + if (ASYNC_MONGO_WRITES) { + void jobRecord.save().catch((e) => { + console.log("Error: store transaction job error", e); }); - await jobRecord.save(); - } - } catch (e) { - if (job.name != 'replay') { - // if in replay mode, the tx can not been stored twice thus the error is expected - console.log("Error: store transaction job error"); - throw e + } else { + try { + await jobRecord.save(); + } catch (e) { + console.log("Error: store transaction job error", e); + throw e; + } + } } } + if (bundled && this.mongoWriteBuffer) { + await this.mongoWriteBuffer.flush("bundle"); + } } else { let errorMsg = application.decode_error(Number(errorCode)); throw Error(errorMsg) } // const jobEndTime = performance.now(); - console.log("done"); + if (LOG_TX) { + console.log("done"); + } + if (LIGHT_JOB_RESULT) { + return { bundle: this.txManager.currentUncommitMerkleRoot }; + } let player = null; const getStateStartTime = performance.now(); if (job.name != "replay") { @@ -575,7 +1285,9 @@ export class Service { bundle: this.txManager.currentUncommitMerkleRoot, }; const getStateEndTime = performance.now(); - console.log(`[${getTimestamp()}] ${job.name} get_state took: ${getStateEndTime - getStateStartTime}ms`); + if (LOG_TX) { + console.log(`[${getTimestamp()}] ${job.name} get_state took: ${getStateEndTime - getStateStartTime}ms`); + } return result } catch (e) { const jobEndTime = performance.now(); @@ -588,7 +1300,9 @@ export class Service { } } const jobEndTime = performance.now(); - console.log(`[${getTimestamp()}] ${job.name} completed in ${jobEndTime - jobStartTime}ms`); + if (LOG_TX) { + console.log(`[${getTimestamp()}] ${job.name} completed in ${jobEndTime - jobStartTime}ms`); + } }, {connection}); } @@ -601,6 +1315,8 @@ export class Service { console.log("start express server"); const app = express(); const port = get_service_port(); + const shardCount = get_shard_count(); + const shardId = get_shard_id(); app.use(express.json()); app.use(cors()); @@ -613,27 +1329,39 @@ export class Service { } try { - const hash = new LeHexBN(value.hash); - const pkx = new LeHexBN(value.pkx); - const pky = new LeHexBN(value.pky); - const sigx = new LeHexBN(value.sigx); - const sigy = new LeHexBN(value.sigy); - const sigr = new LeHexBN(value.sigr); - if (verifySign(hash, pkx, pky, sigx, sigy, sigr) == false) { - console.error('Invalid signature:'); - res.status(500).send('Invalid signature'); - } else { - const fc = this.blocklist.get(value.pkx) || 0; - if (fc > 3) { - res.status(500).send('This account is blocked for 1 minutes for multiple incorrect arguments'); - } else { - const job = await this.queue!.add('transaction', { value }); - res.status(201).send({ - success: true, - jobid: job.id + try { + const u64array = signature_to_u64array(value); + application.verify_tx_signature(u64array); + } catch (err) { + console.error('Invalid signature:', err); + return res.status(500).send('Invalid signature'); + } + + const fc = this.blocklist.get(value.pkx) || 0; + if (fc > 3) { + return res + .status(500) + .send( + 'This account is blocked for 1 minutes for multiple incorrect arguments' + ); + } + + if (ENFORCE_SHARD && shardCount > 1) { + const target = shardForPkx(value.pkx, shardCount); + if (target !== shardId) { + return res.status(409).send({ + success: false, + error: "WrongShard", + shard: target, }); } } + + const job = await this.queue!.add('transaction', { value, verified: true }); + return res.status(201).send({ + success: true, + jobid: job.id + }); } catch (error) { console.error('Error adding job to the queue:', error); res.status(500).send('Failed to add job to the queue'); @@ -669,6 +1397,12 @@ export class Service { //console.log("receive query command on: ", value.pkx); try { + if (OPTIMISTIC_APPLY && this.optimisticHeadRoot) { + application.initialize(this.optimisticHeadRoot); + if (!DISABLE_SNAPSHOT) { + snapshot = JSON.parse(application.snapshot()); + } + } const pkx = new LeHexBN(value.pkx).toU64Array(); let u64array = new BigUint64Array(4); u64array.set(pkx); @@ -678,7 +1412,9 @@ export class Service { player: player, state: snapshot } - await storeAccount(value.pkx, player, this.playerIndexer); + void storeAccount(value.pkx, player, this.playerIndexer).catch((error) => { + console.error('storeAccount failed:', error); + }); res.status(201).send({ success: true, data: JSON.stringify(result), @@ -748,6 +1484,25 @@ export class Service { } }); + app.get('/job_status/:id', async (req, res) => { + try { + const jobId = req.params.id; + const job = await Job.fromId(this.queue!, jobId); + if (!job) { + return res.status(404).json({ message: 'Job not found' }); + } + return res.status(200).json({ + id: job.id, + name: job.name, + processedOn: job.processedOn, + finishedOn: job.finishedOn, + failedReason: job.failedReason, + }); + } catch (err) { + res.status(500).json({ message: (err as Error).toString() }); + } + }); + app.get('/job/:id', async (req, res) => { try { let jobId = req.params.id; @@ -805,25 +1560,3 @@ export class Service { } } - -function signature_to_u64array(value: any) { - const msg = new LeHexBN(value.msg).toU64Array(value.msg.length/16); - const pkx = new LeHexBN(value.pkx).toU64Array(); - const pky = new LeHexBN(value.pky).toU64Array(); - const sigx = new LeHexBN(value.sigx).toU64Array(); - const sigy = new LeHexBN(value.sigy).toU64Array(); - const sigr = new LeHexBN(value.sigr).toU64Array(); - - let u64array = new BigUint64Array(20 + value.msg.length/16); - u64array.set(pkx, 0); - u64array.set(pky, 4); - u64array.set(sigx, 8); - u64array.set(sigy, 12); - u64array.set(sigr, 16); - u64array.set(msg, 20); - let cmdLength = (msg[0] >> 8n) % 256n; - if (Number(cmdLength) != msg.length) { - throw Error("Wrong Command Size"); - } - return u64array; -} From 0fe37a78d14a66fbc7f3ce40d96f519f003200fe Mon Sep 17 00:00:00 2001 From: "codex gpt5.2codex" <1900017707@pku.edu.cn> Date: Fri, 26 Dec 2025 16:53:24 +0000 Subject: [PATCH 2/3] optimistic apply: async batch merkle commit --- src/bootstrap/rpcbind.d.ts | 24 ++++++ src/bootstrap/rpcbind.js | 76 ++++++++++++++++++ src/preexec/preexec_worker.ts | 11 ++- src/service.ts | 143 +++++++++++++++++++++++++++++++--- 4 files changed, 240 insertions(+), 14 deletions(-) diff --git a/src/bootstrap/rpcbind.d.ts b/src/bootstrap/rpcbind.d.ts index 7416fe7..2ddedb2 100644 --- a/src/bootstrap/rpcbind.d.ts +++ b/src/bootstrap/rpcbind.d.ts @@ -16,6 +16,30 @@ export function apply_txs( }>, ): number[][]; +export function apply_txs_final( + root: Uint8Array | number[], + txs: Array<{ + writes: Array<{ index: string; data: Uint8Array | number[] }>; + updateRecords: Array<{ hash: Uint8Array | number[]; data: string[] }>; + }>, +): number[]; + +export function apply_txs_async( + root: Uint8Array | number[], + txs: Array<{ + writes: Array<{ index: string; data: Uint8Array | number[] }>; + updateRecords: Array<{ hash: Uint8Array | number[]; data: string[] }>; + }>, +): Promise; + +export function apply_txs_final_async( + root: Uint8Array | number[], + txs: Array<{ + writes: Array<{ index: string; data: Uint8Array | number[] }>; + updateRecords: Array<{ hash: Uint8Array | number[]; data: string[] }>; + }>, +): Promise; + export function begin_session(): string; export function drop_session(session: string): boolean; export function reset_session(session: string): boolean; diff --git a/src/bootstrap/rpcbind.js b/src/bootstrap/rpcbind.js index 69ad4d3..a9cb2be 100644 --- a/src/bootstrap/rpcbind.js +++ b/src/bootstrap/rpcbind.js @@ -47,6 +47,21 @@ function requestMerkle(requestData) { return requestMerkleData(requestData); } +async function requestMerkleAsync(requestData) { + if (MERKLE_RPC_MODE === 'mock') { + return JSON.parse(requestMerkle(requestData)); + } + const resp = await fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(requestData), + }); + if (!resp.ok) { + throw new Error(`merkle rpc failed: ${resp.status}`); + } + return await resp.json(); +} + function getMerkleTrace() { const trace = globalThis.__MERKLE_TRACE; if (!trace || typeof trace !== 'object') return null; @@ -270,6 +285,67 @@ export function apply_txs(root, txs) { return r; } +function async_apply_txs_final(root, txs) { + let roothash = hash2array(root); + const requestData = { + jsonrpc: '2.0', + method: 'apply_txs_final', + params: withSession({ root: roothash, txs }), + id: 10, + }; + let responseStr = requestMerkle(requestData); + const response = JSON.parse(responseStr); + if (response.error==undefined) { + return response.result; + } else { + console.error('Failed to apply_txs_final:', response.error); + throw("Failed to apply_txs_final"); + } +} + +export function apply_txs_final(root, txs) { + const start = performance.now(); + let r = async_apply_txs_final(root, txs); + const end = performance.now(); + let lag = end - start; + //console.log("bench-log: apply_txs_final", lag); + return r; +} + +export async function apply_txs_async(root, txs) { + let roothash = hash2array(root); + const requestData = { + jsonrpc: '2.0', + method: 'apply_txs', + params: withSession({ root: roothash, txs }), + id: 11, + }; + const response = await requestMerkleAsync(requestData); + if (response.error == undefined) { + return response.result; + } else { + console.error('Failed to apply_txs_async:', response.error); + throw new Error('Failed to apply_txs_async'); + } +} + +export async function apply_txs_final_async(root, txs) { + let roothash = hash2array(root); + const requestData = { + jsonrpc: '2.0', + method: 'apply_txs_final', + params: withSession({ root: roothash, txs }), + id: 12, + }; + const response = await requestMerkleAsync(requestData); + if (response.error == undefined) { + return response.result; + } else { + console.error('Failed to apply_txs_final_async:', response.error); + throw new Error('Failed to apply_txs_final_async'); + } +} + function async_get_record(hash) { let hasharray = hash2array(hash); const requestData = { diff --git a/src/preexec/preexec_worker.ts b/src/preexec/preexec_worker.ts index 35732d2..34d5794 100644 --- a/src/preexec/preexec_worker.ts +++ b/src/preexec/preexec_worker.ts @@ -113,11 +113,18 @@ if (!parentPort) { throw new Error("preexec_worker must run as a Worker"); } +const PREEXEC_REUSE_INITIALIZE = process.env.PREEXEC_REUSE_INITIALIZE === "1"; +let lastRootKey: string | null = null; + parentPort.on("message", (msg: PreexecRequest) => { void (async () => { try { - const root = new BigUint64Array(msg.root); - application.initialize(root); + const rootKey = msg.root.join(","); + if (!PREEXEC_REUSE_INITIALIZE || lastRootKey !== rootKey) { + const root = new BigUint64Array(msg.root); + application.initialize(root); + lastRootKey = rootKey; + } const u64array = signature_to_u64array(msg.signature); const timingMs: PreexecOk["timingMs"] = { handleTx: 0 }; diff --git a/src/service.ts b/src/service.ts index 2fea82e..186b36d 100644 --- a/src/service.ts +++ b/src/service.ts @@ -1,6 +1,14 @@ //import initHostBind, * as hostbind from "./wasmbind/hostbind.js"; import initBootstrap, * as bootstrap from "./bootstrap/bootstrap.js"; -import { apply_txs, begin_session, commit_session, drop_session } from "./bootstrap/rpcbind.js"; +import { + apply_txs, + apply_txs_async, + apply_txs_final, + apply_txs_final_async, + begin_session, + commit_session, + drop_session, +} from "./bootstrap/rpcbind.js"; import initApplication, * as application from "./application/application.js"; import { test_merkle_db_service } from "./test.js"; import { LeHexBN, sign, PlayerConvention, ZKWasmAppRpc, createCommand } from "zkwasm-minirollup-rpc"; @@ -32,6 +40,7 @@ const LOG_TX = process.env.LOG_TX === '1'; const LOG_BUNDLE = process.env.LOG_BUNDLE === '1'; const LOG_QUEUE_STATS = process.env.LOG_QUEUE_STATS === '1'; const LOG_AUTOJOB = process.env.LOG_AUTOJOB === '1'; +const LOG_OPTIMISTIC = process.env.LOG_OPTIMISTIC === '1'; const DISABLE_AUTOTICK = process.env.DISABLE_AUTOTICK === '1'; const AUTOJOB_FATAL = process.env.AUTOJOB_FATAL === '1'; const DISABLE_SNAPSHOT = process.env.DISABLE_SNAPSHOT === '1'; @@ -71,6 +80,18 @@ const OPTIMISTIC_BUNDLE_SIZE = (() => { if (Number.isFinite(raw) && raw > 0) return raw; return 100; })(); +const OPTIMISTIC_SERIAL_THRESHOLD = (() => { + const raw = Number.parseFloat(process.env.OPTIMISTIC_SERIAL_THRESHOLD ?? "0.25"); + if (!Number.isFinite(raw)) return 0.25; + if (raw < 0) return 0; + if (raw > 1) return 1; + return raw; +})(); +const OPTIMISTIC_SERIAL_COOLDOWN_MS = (() => { + const raw = Number.parseInt(process.env.OPTIMISTIC_SERIAL_COOLDOWN_MS ?? "1000", 10); + if (Number.isFinite(raw) && raw >= 0) return raw; + return 1000; +})(); type PreexecTrace = { reads: string[]; @@ -184,6 +205,17 @@ function withMerkleSessionDisabled(fn: () => T): T { } } +async function withMerkleSessionDisabledAsync(fn: () => Promise): Promise { + const g = globalThis as any; + const prev = g.__MERKLE_SESSION; + g.__MERKLE_SESSION = null; + try { + return await fn(); + } finally { + g.__MERKLE_SESSION = prev; + } +} + class PreexecPool { private workers: NodeWorker[]; private idle: NodeWorker[]; @@ -251,6 +283,7 @@ class OptimisticSequencer { private nextId = 1; private rootBytes: number[]; private bundleTxCount = 0; + private serialUntilMs = 0; constructor(private service: Service) { this.pool = new PreexecPool(new URL("./preexec/preexec_worker.js", import.meta.url), OPTIMISTIC_WORKERS); @@ -282,6 +315,20 @@ class OptimisticSequencer { this.flushing = true; try { while (this.pending.length > 0) { + if (OPTIMISTIC_SERIAL_COOLDOWN_MS > 0 && Date.now() < this.serialUntilMs) { + const batch = this.pending.splice(0, OPTIMISTIC_BATCH); + for (const p of batch) { + try { + const result = await this.applySerial(p); + p.resolve(result); + } catch (e) { + p.reject(e); + } + } + if (batch.length === 0) break; + continue; + } + const batch = this.pending.splice(0, OPTIMISTIC_BATCH); const snapshotRoot = bytes32ToRootU64(this.rootBytes); const rootArr = Array.from(snapshotRoot); @@ -336,9 +383,6 @@ class OptimisticSequencer { for (const w of writes) unionWrites.add(w); } - // Put deferred jobs back to the head to preserve fairness. - this.pending = deferred.concat(this.pending); - let cursor = 0; while (cursor < chosen.length) { const remainingInBundle = OPTIMISTIC_BUNDLE_SIZE - this.bundleTxCount; @@ -346,15 +390,27 @@ class OptimisticSequencer { const segment = chosen.slice(cursor, cursor + take); cursor += take; - let roots: number[][]; + let roots: number[][] = []; + let finalRootBytes: number[] | null = null; try { const txs = segment.map((item) => ({ writes: item.resp.trace.writes ?? [], updateRecords: item.resp.trace.updateRecords ?? [], })); - roots = withMerkleSessionDisabled(() => apply_txs(this.rootBytes, txs)); - if (!Array.isArray(roots) || roots.length !== segment.length) { - throw new Error(`apply_txs returned ${Array.isArray(roots) ? roots.length : typeof roots} roots, expected ${segment.length}`); + if (LIGHT_JOB_RESULT) { + finalRootBytes = await withMerkleSessionDisabledAsync(() => apply_txs_final_async(this.rootBytes, txs)); + if (!Array.isArray(finalRootBytes) || finalRootBytes.length !== 32) { + throw new Error( + `apply_txs_final returned ${Array.isArray(finalRootBytes) ? finalRootBytes.length : typeof finalRootBytes} bytes, expected 32`, + ); + } + } else { + roots = await withMerkleSessionDisabledAsync(() => apply_txs_async(this.rootBytes, txs)); + if (!Array.isArray(roots) || roots.length !== segment.length) { + throw new Error( + `apply_txs returned ${Array.isArray(roots) ? roots.length : typeof roots} roots, expected ${segment.length}`, + ); + } } } catch (e) { for (const item of segment) { @@ -363,14 +419,21 @@ class OptimisticSequencer { break; } + if (finalRootBytes) { + this.rootBytes = finalRootBytes; + this.service.optimisticHeadRoot = bytes32ToRootU64(this.rootBytes); + } + for (let i = 0; i < segment.length; i++) { const item = segment[i]!; const signature = (item.p.job.data as any).value as TxWitness; const jobId = item.p.job.id; try { const events = new BigUint64Array(item.resp.result); - this.rootBytes = roots[i]!; - this.service.optimisticHeadRoot = bytes32ToRootU64(this.rootBytes); + if (!finalRootBytes) { + this.rootBytes = roots[i]!; + this.service.optimisticHeadRoot = bytes32ToRootU64(this.rootBytes); + } await this.service.optimisticInstall(signature, jobId, events, item.isReplay); item.p.resolve(await this.buildJobResult(item.p.job, signature)); @@ -385,8 +448,33 @@ class OptimisticSequencer { } } - if (chosen.length === 0) { - // All jobs in this batch were either failed or deferred; yield to allow more enqueues. + for (const p of deferred) { + try { + const result = await this.applySerial(p); + p.resolve(result); + } catch (e) { + p.reject(e); + } + } + + const considered = chosen.length + deferred.length; + if (considered > 0) { + const ratio = chosen.length / considered; + if (deferred.length > 0 && ratio < OPTIMISTIC_SERIAL_THRESHOLD) { + this.serialUntilMs = Date.now() + OPTIMISTIC_SERIAL_COOLDOWN_MS; + if (LOG_OPTIMISTIC) { + console.log("optimistic apply: high conflict, switching to serial fallback", { + ratio: ratio.toFixed(3), + chosen: chosen.length, + deferred: deferred.length, + cooldownMs: OPTIMISTIC_SERIAL_COOLDOWN_MS, + }); + } + } + } + + if (considered === 0) { + // All jobs in this batch failed during preexec; yield to allow more enqueues. break; } } @@ -453,6 +541,37 @@ class OptimisticSequencer { bundle: this.service.txManager.currentUncommitMerkleRoot, }; } + + private async applySerial(p: PendingJob): Promise { + const signature = (p.job.data as any).value as TxWitness; + const u64array = signature_to_u64array(signature); + + application.initialize(bytes32ToRootU64(this.rootBytes)); + + if (!(p.job.name === "transaction" && (p.job.data as any).verified === true)) { + application.verify_tx_signature(u64array); + } + + const txResult = withMerkleSessionDisabled(() => application.handle_tx(u64array)); + const errorCode = txResult[0]; + if (errorCode !== 0n) { + throw new Error(application.decode_error(Number(errorCode))); + } + + const postRoot = application.query_root(); + this.rootBytes = rootU64ToBytes(postRoot); + this.service.optimisticHeadRoot = postRoot; + + await this.service.optimisticInstall(signature, p.job.id, txResult, p.job.name === "replay"); + + this.bundleTxCount += 1; + if (this.bundleTxCount >= OPTIMISTIC_BUNDLE_SIZE) { + await this.flushBundle(this.rootBytes); + this.bundleTxCount = 0; + } + + return await this.buildJobResult(p.job, signature); + } } function shardForPkx(pkx: string, shardCount: number): number { From 5ca414e15ba63dbd1fdc69e4b80a171ff0010f09 Mon Sep 17 00:00:00 2001 From: "codex gpt5.2codex" <1900017707@pku.edu.cn> Date: Fri, 26 Dec 2025 17:36:59 +0000 Subject: [PATCH 3/3] optimistic apply: increase queue worker concurrency --- src/service.ts | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/service.ts b/src/service.ts index 186b36d..baddb0f 100644 --- a/src/service.ts +++ b/src/service.ts @@ -92,6 +92,15 @@ const OPTIMISTIC_SERIAL_COOLDOWN_MS = (() => { if (Number.isFinite(raw) && raw >= 0) return raw; return 1000; })(); +const OPTIMISTIC_QUEUE_CONCURRENCY = (() => { + const raw = Number.parseInt(process.env.OPTIMISTIC_QUEUE_CONCURRENCY ?? "", 10); + if (Number.isFinite(raw) && raw > 0) return raw; + if (OPTIMISTIC_APPLY) { + // Need >1 so multiple jobs can enqueue into OptimisticSequencer and form batches. + return Math.max(OPTIMISTIC_BATCH, OPTIMISTIC_WORKERS * 4); + } + return 1; +})(); type PreexecTrace = { reads: string[]; @@ -1422,7 +1431,10 @@ export class Service { if (LOG_TX) { console.log(`[${getTimestamp()}] ${job.name} completed in ${jobEndTime - jobStartTime}ms`); } - }, {connection}); + }, { + connection, + concurrency: OPTIMISTIC_QUEUE_CONCURRENCY, + }); } async serve() {