From 44be98436a43b711a3739d16cd086f16a9942b93 Mon Sep 17 00:00:00 2001 From: bhartnett Date: Fri, 10 Oct 2025 14:49:17 +0800 Subject: [PATCH 1/4] Implement snapshots queue in aristo database. --- execution_chain/db/aristo/aristo_desc.nim | 29 +++----- .../db/aristo/aristo_init/init_common.nim | 1 + execution_chain/db/aristo/aristo_tx_frame.nim | 69 +++++++++++++------ tests/test_aristo/test_tx_frame.nim | 5 +- 4 files changed, 61 insertions(+), 43 deletions(-) diff --git a/execution_chain/db/aristo/aristo_desc.nim b/execution_chain/db/aristo/aristo_desc.nim index d74338d9e3..c1d1926a53 100644 --- a/execution_chain/db/aristo/aristo_desc.nim +++ b/execution_chain/db/aristo/aristo_desc.nim @@ -22,7 +22,7 @@ {.push raises: [].} import - std/[hashes, sequtils, sets, tables], + std/[hashes, sequtils, sets, tables, heapqueue], eth/common/hashes, eth/trie/nibbles, results, ./aristo_constants, @@ -34,7 +34,7 @@ import # Not auto-exporting backend export tables, aristo_constants, desc_error, desc_identifiers, nibbles, - desc_structural, minilru, hashes, PutHdlRef + desc_structural, minilru, hashes, heapqueue, PutHdlRef type AristoTxRef* = ref object @@ -124,6 +124,12 @@ type ## MPT level where "most" leaves can be found, for static vid lookups lookups*: tuple[lower, hits, higher: int] + snapshots*: HeapQueue[AristoTxRef] + ## TODO + + maxSnapshots*: int + ## TODO + Leg* = object ## For constructing a `VertexPath` wp*: VidVtxPair ## Vertex ID and data ref @@ -139,9 +145,7 @@ const dbLevel* = -1 disposedLevel* = int.low -# ------------------------------------------------------------------------------ -# Public helpers -# ------------------------------------------------------------------------------ +proc `<`*(a, b: AristoTxRef): bool = a.level < b.level template mixUp*(accPath, stoPath: Hash32): Hash32 = # Insecure but fast way of mixing the values of two hashes, for the purpose @@ -170,8 +174,6 @@ func getOrVoid*[W](tab: Table[W,RootedVertexID]; w: W): RootedVertexID = func 
getOrVoid*[W](tab: Table[W,HashSet[RootedVertexID]]; w: W): HashSet[RootedVertexID] = tab.getOrDefault(w, default(HashSet[RootedVertexID])) -# -------- - func isValid*(vtx: VertexRef): bool = not isNil(vtx) @@ -197,20 +199,12 @@ func isValid*(rvid: RootedVertexID): bool = func isValid*(sqv: HashSet[RootedVertexID]): bool = sqv.len > 0 -# ------------------------------------------------------------------------------ -# Public functions, miscellaneous -# ------------------------------------------------------------------------------ - func hash*(db: AristoDbRef): Hash {.error.} func hash*(db: AristoTxRef): Hash {.error.} proc baseTxFrame*(db: AristoDbRef): AristoTxRef = db.txRef -# ------------------------------------------------------------------------------ -# Public helpers -# ------------------------------------------------------------------------------ - iterator rstack*(tx: AristoTxRef, stopAtSnapshot = false): AristoTxRef = # Stack in reverse order, ie going from tx to base var tx = tx @@ -254,8 +248,3 @@ func getStaticLevel*(db: AristoDbRef): int = db.staticLevel = 1 db.staticLevel - - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/execution_chain/db/aristo/aristo_init/init_common.nim b/execution_chain/db/aristo/aristo_init/init_common.nim index 73b0628d90..b943cbf0ab 100644 --- a/execution_chain/db/aristo/aristo_init/init_common.nim +++ b/execution_chain/db/aristo/aristo_init/init_common.nim @@ -86,6 +86,7 @@ proc initInstance*(db: AristoDbRef): Result[void, AristoError] = db.txRef = AristoTxRef(db: db, vTop: vTop, snapshot: Snapshot(level: Opt.some(0))) db.accLeaves = LruCache[Hash32, AccLeafRef].init(ACC_LRU_SIZE) db.stoLeaves = LruCache[Hash32, StoLeafRef].init(ACC_LRU_SIZE) + db.maxSnapshots = 10 ok() proc finish*(db: AristoDbRef; eradicate = false) = diff --git a/execution_chain/db/aristo/aristo_tx_frame.nim 
b/execution_chain/db/aristo/aristo_tx_frame.nim index 5f8f7dfecb..c025366888 100644 --- a/execution_chain/db/aristo/aristo_tx_frame.nim +++ b/execution_chain/db/aristo/aristo_tx_frame.nim @@ -25,6 +25,44 @@ template isDisposed(txFrame: AristoTxRef): bool = proc isKeyframe(txFrame: AristoTxRef): bool = txFrame == txFrame.db.txRef +proc clearSnapshot*(txFrame: AristoTxRef) {.raises: [], gcsafe.} + +proc removeSnapshotFrame(db: AristoDbRef, txFrame: AristoTxRef) = + let index = db.snapshots.find(txFrame) + if index != -1: + db.snapshots.del(index) + +proc addSnapshotFrame(db: AristoDbRef, txFrame: AristoTxRef) = + doAssert txFrame.snapshot.level.isSome() + + if db.snapshots.len() == db.maxSnapshots: + let frame = db.snapshots.pop() + frame.clearSnapshot() + + db.snapshots.push(txFrame) + +proc cleanSnapshots(db: AristoDbRef) = + let minLevel = db.txRef.level + + # Some of the snapshot content may have already been persisted to disk + # (since they were made based on the in-memory frames at the time of creation). + # Annoyingly, there's no way to remove items while iterating but even + # with the extra seq, move + remove turns out to be faster than + # creating a new table - especially when the ratio between old and + # current items favors current items. 
+ template delIfIt(tbl: var Table, body: untyped) = + var toRemove = newSeqOfCap[typeof(tbl).A](tbl.len div 2) + for k, it {.inject.} in tbl: + if body: + toRemove.add k + for k in toRemove: + tbl.del(k) + + for frame in db.snapshots: + frame.snapshot.vtx.delIfIt(it[2] < minLevel) + frame.snapshot.acc.delIfIt(it[1] < minLevel) + frame.snapshot.sto.delIfIt(it[1] < minLevel) + proc buildSnapshot(txFrame: AristoTxRef, minLevel: int) = # Starting from the previous snapshot, build a snapshot that includes all # ancestor changes as well as the changes in txFrame itself @@ -41,30 +79,11 @@ proc buildSnapshot(txFrame: AristoTxRef, minLevel: int) = if frame.snapshot.level.isSome() and not isKeyframe: # `frame` has a snapshot only in the first iteration of the for loop txFrame.snapshot = move(frame.snapshot) + txFrame.db.removeSnapshotFrame(frame) # Verify that https://github.com/nim-lang/Nim/issues/23759 is not present assert frame.snapshot.vtx.len == 0 and frame.snapshot.level.isNone() - if txFrame.snapshot.level != Opt.some(minLevel): - # When recycling an existing snapshot, some of its content may have - # already been persisted to disk (since it was made base on the - # in-memory frames at the time of its creation). - # Annoyingly, there's no way to remove items while iterating but even - # with the extra seq, move + remove turns out to be faster than - # creating a new table - specially when the ratio between old and - # and current items favors current items. 
- template delIfIt(tbl: var Table, body: untyped) = - var toRemove = newSeqOfCap[typeof(tbl).A](tbl.len div 2) - for k, it {.inject.} in tbl: - if body: - toRemove.add k - for k in toRemove: - tbl.del(k) - - txFrame.snapshot.vtx.delIfIt(it[2] < minLevel) - txFrame.snapshot.acc.delIfIt(it[1] < minLevel) - txFrame.snapshot.sto.delIfIt(it[1] < minLevel) - if frame.snapshot.level.isSome() and isKeyframe: txFrame.snapshot.vtx = initTable[RootedVertexID, VtxSnapshot]( max(1024, max(frame.sTab.len, frame.snapshot.vtx.len)) @@ -96,6 +115,9 @@ proc buildSnapshot(txFrame: AristoTxRef, minLevel: int) = txFrame.snapshot.level = Opt.some(minLevel) + if not txFrame.isKeyframe(): + txFrame.db.addSnapshotFrame(txFrame) + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -115,6 +137,8 @@ proc txFrameBegin*( level: parent.level + 1) proc dispose*(txFrame: AristoTxRef) = + if not txFrame.db.isNil(): + txFrame.db.removeSnapshotFrame(txFrame) txFrame[].reset() txFrame.level = disposedLevel @@ -246,6 +270,11 @@ proc persist*(db: AristoDbRef, batch: PutHdlRef, txFrame: AristoTxRef) = else: discard db.stoLeaves.update(mixPath, vtx) + # Remove snapshot data that has been persisted to disk to save memory. + # All snapshot records with a level lower than the current base level + # are deleted. + db.cleanSnapshots() + txFrame.snapshot.vtx.clear() txFrame.snapshot.acc.clear() txFrame.snapshot.sto.clear() diff --git a/tests/test_aristo/test_tx_frame.nim b/tests/test_aristo/test_tx_frame.nim index d0affc3ff2..d9253ba72c 100644 --- a/tests/test_aristo/test_tx_frame.nim +++ b/tests/test_aristo/test_tx_frame.nim @@ -315,8 +315,7 @@ suite "Aristo TxFrame": # Verify that getting the state root of the level 3 txFrame does not impact # the persisted state in the database. 
let stateRootBefore = tx3.fetchStateRoot().get() - expect(Defect): - discard tx4.fetchStateRoot() + discard tx4.fetchStateRoot() let stateRootAfter = tx3.fetchStateRoot().get() check stateRootBefore == stateRootAfter @@ -346,7 +345,7 @@ suite "Aristo TxFrame": for i in 1..<10: let acc = makeAccount(i.uint64) check tx1.mergeAccountRecord(acc[0], acc[1]).isOk() - + tx1.checkpoint(1, skipSnapshot = false) let From fe85e3627c2280cedf474d0317e59a110fb3ac9f Mon Sep 17 00:00:00 2001 From: bhartnett Date: Fri, 10 Oct 2025 15:12:55 +0800 Subject: [PATCH 2/4] Update comments. --- execution_chain/db/aristo/aristo_desc.nim | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/execution_chain/db/aristo/aristo_desc.nim b/execution_chain/db/aristo/aristo_desc.nim index c1d1926a53..b7e47b9296 100644 --- a/execution_chain/db/aristo/aristo_desc.nim +++ b/execution_chain/db/aristo/aristo_desc.nim @@ -125,10 +125,15 @@ type lookups*: tuple[lower, hits, higher: int] snapshots*: HeapQueue[AristoTxRef] - ## TODO + ## A priority queue of txFrames holding snapshots. Used to limit the number + ## of snapshots that can be taken and therefore limit memory usage and also + ## to support cleaning old values out of snapshots after persisting to the + ## database. txFrames in the queue are sorted by their level in ascending order. maxSnapshots*: int - ## TODO + ## The maximum number of snapshots to hold in the snapshots queue. When the queue + ## is full (queue.len == maxSnapshots) then the oldest snapshot is removed from + ## the queue and cleaned up. Leg* = object ## For constructing a `VertexPath` From a4e9a7ffebd0e15da80f9b4479a7efea5eedcb53 Mon Sep 17 00:00:00 2001 From: bhartnett Date: Thu, 23 Oct 2025 12:59:13 +0800 Subject: [PATCH 3/4] Prevent storing duplicate txFrames in cache. 
--- execution_chain/db/aristo/aristo_tx_frame.nim | 3 +++ 1 file changed, 3 insertions(+) diff --git a/execution_chain/db/aristo/aristo_tx_frame.nim b/execution_chain/db/aristo/aristo_tx_frame.nim index c025366888..d7d14cc5cd 100644 --- a/execution_chain/db/aristo/aristo_tx_frame.nim +++ b/execution_chain/db/aristo/aristo_tx_frame.nim @@ -34,6 +34,9 @@ proc removeSnapshotFrame(db: AristoDbRef, txFrame: AristoTxRef) = proc addSnapshotFrame(db: AristoDbRef, txFrame: AristoTxRef) = doAssert txFrame.snapshot.level.isSome() + if db.snapshots.contains(txFrame): + # No-op if the queue already contains the snapshot + return if db.snapshots.len() == db.maxSnapshots: let frame = db.snapshots.pop() From 05c930615da1baf4bfe9ce6e84bf684372fa4d24 Mon Sep 17 00:00:00 2001 From: bhartnett Date: Sat, 1 Nov 2025 00:39:10 +0800 Subject: [PATCH 4/4] Create snapshots after processing blocks. --- execution_chain/core/chain/forked_chain.nim | 13 ++++++------- .../core/chain/forked_chain/chain_serialize.nim | 13 +++++-------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/execution_chain/core/chain/forked_chain.nim b/execution_chain/core/chain/forked_chain.nim index 379d2c44d4..57eedb85fa 100644 --- a/execution_chain/core/chain/forked_chain.nim +++ b/execution_chain/core/chain/forked_chain.nim @@ -506,13 +506,7 @@ proc validateBlock(c: ForkedChainRef, parentTxFrame=cast[uint](parentFrame), txFrame=cast[uint](txFrame) - # Checkpoint creates a snapshot of ancestor changes in txFrame - it is an - # expensive operation, specially when creating a new branch (ie when blk - # is being applied to a block that is currently not a head). - # Create the snapshot before processing the block so that any vertexes in snapshots - # from lower levels than the baseTxFrame are removed from the snapshot before running - # the stateroot computation. 
- parentFrame.checkpoint(parent.blk.header.number, skipSnapshot = false) + var receipts = c.processBlock(parent, txFrame, blk, blkHash, finalized).valueOr: txFrame.dispose() @@ -520,6 +514,11 @@ proc validateBlock(c: ForkedChainRef, c.writeBaggage(blk, blkHash, txFrame, receipts) + # Checkpoint creates a snapshot of ancestor changes in txFrame - it is an + # expensive operation, especially when creating a new branch (ie when blk + # is being applied to a block that is currently not a head). + parentFrame.checkpoint(blk.header.number, skipSnapshot = false) + let newBlock = c.appendBlock(parent, blk, blkHash, txFrame, move(receipts)) for i, tx in blk.transactions: diff --git a/execution_chain/core/chain/forked_chain/chain_serialize.nim b/execution_chain/core/chain/forked_chain/chain_serialize.nim index e9f7fc9c76..1254ea0f81 100644 --- a/execution_chain/core/chain/forked_chain/chain_serialize.nim +++ b/execution_chain/core/chain/forked_chain/chain_serialize.nim @@ -126,14 +126,6 @@ proc replayBlock(fc: ForkedChainRef; parentFrame = parent.txFrame txFrame = parentFrame.txFrameBegin() - # Checkpoint creates a snapshot of ancestor changes in txFrame - it is an - # expensive operation, specially when creating a new branch (ie when blk - # is being applied to a block that is currently not a head). - # Create the snapshot before processing the block so that any vertexes in snapshots - # from lower levels than the baseTxFrame are removed from the snapshot before running - # the stateroot computation. - parentFrame.checkpoint(parent.blk.header.number, skipSnapshot = false) - # Set finalized to true in order to skip the stateroot check when replaying the # block because the blocks should have already been checked previously during # the initial block execution. 
@@ -143,6 +135,11 @@ proc replayBlock(fc: ForkedChainRef; fc.writeBaggage(blk.blk, blk.hash, txFrame, receipts) + # Checkpoint creates a snapshot of ancestor changes in txFrame - it is an + # expensive operation, especially when creating a new branch (ie when blk + # is being applied to a block that is currently not a head). + parentFrame.checkpoint(blk.header.number, skipSnapshot = false) + blk.txFrame = txFrame blk.receipts = move(receipts)