From e3e7968d87d7f60b39e944055fdbe196087c83a4 Mon Sep 17 00:00:00 2001 From: munna0908 Date: Wed, 2 Jul 2025 18:26:59 +0530 Subject: [PATCH 1/7] add mutlithreading support for codex tree --- codex/erasure/erasure.nim | 8 ++- codex/merkletree/codex/codex.nim | 75 ++++++++++++++++++++++-- codex/merkletree/merkletree.nim | 25 +++++++- codex/node.nim | 2 +- tests/codex/merkletree/testasynctree.nim | 51 ++++++++++++++++ 5 files changed, 152 insertions(+), 9 deletions(-) create mode 100644 tests/codex/merkletree/testasynctree.nim diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 95516500f9..d378b3c38e 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -428,7 +428,7 @@ proc encodeData( return failure("Unable to store block!") idx.inc(params.steps) - without tree =? CodexTree.init(cids[]), err: + without tree =? (await CodexTree.init(self.taskPool, cids[])), err: return failure(err) without treeCid =? tree.rootCid, err: @@ -649,7 +649,8 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = without (cids, recoveredIndices) =? (await self.decodeInternal(encoded)), err: return failure(err) - without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err: + without tree =? + (await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err: return failure(err) without treeCid =? tree.rootCid, err: @@ -680,7 +681,8 @@ proc repair*(self: Erasure, encoded: Manifest): Future[?!void] {.async.} = without (cids, _) =? (await self.decodeInternal(encoded)), err: return failure(err) - without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err: + without tree =? + (await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err: return failure(err) without treeCid =? tree.rootCid, err: diff --git a/codex/merkletree/codex/codex.nim b/codex/merkletree/codex/codex.nim index 0eec92e4c2..b35de0b24a 100644 --- a/codex/merkletree/codex/codex.nim +++ b/codex/merkletree/codex/codex.nim @@ -10,12 +10,14 @@ {.push raises: [].} import std/bitops -import std/sequtils +import std/[atomics, sequtils] import pkg/questionable import pkg/questionable/results import pkg/libp2p/[cid, multicodec, multihash] import pkg/constantine/hashes +import pkg/taskpools +import pkg/chronos/threadsync import ../../utils import ../../rng import ../../errors @@ -48,7 +50,7 @@ type mcodec*: MultiCodec # CodeHashes is not exported from libp2p -# So we need to recreate it instead of +# So we need to recreate it instead of proc initMultiHashCodeTable(): Table[MultiCodec, MHash] {.compileTime.} = for item in HashesList: result[item.mcodec] = item @@ -160,7 +162,60 @@ func init*( self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true) success self -func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree = +proc initCodexTreeAsync*( + tp: Taskpool, mcodec: MultiCodec = Sha256HashCodec, leaves: seq[ByteHash] +): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = + if leaves.len == 0: + return failure "Empty leaves" + + let + mhash = ?mcodec.mhash() + compressor = proc(x, y: seq[byte], key: ByteTreeKey): ?!ByteHash {.noSideEffect.} = + compress(x, y, key, mhash) + Zero: ByteHash = newSeq[byte](mhash.size) + + if mhash.size != leaves[0].len: + return failure "Invalid hash length" + + without threadPtr =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + threadPtr.close().expect("closing once works") + + var tree = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec) + + var task = MerkleTask[ByteHash, ByteTreeKey]( + tree: cast[ptr MerkleTree[ByteHash, ByteTreeKey]](addr tree), + leaves: @leaves, + signal: threadPtr, + ) + + doAssert tp.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + + tp.spawn asyncMerkleTreeWorker(addr task) + + let threadFut = threadPtr.wait() + + if err =? catch(await threadFut.join()).errorOption: + ?catch(await noCancel threadFut) + if err of CancelledError: + raise (ref CancelledError) err + + if not task.success.load(): + return failure("merkle tree task failed") + + defer: + task.layers = default(Isolated[seq[seq[ByteHash]]]) + + tree.layers = task.layers.extract + + success tree + +proc init*( + _: type CodexTree, tp: Taskpool, leaves: openArray[MultiHash] +): Future[?!CodexTree] = if leaves.len == 0: return failure "Empty leaves" @@ -168,7 +223,19 @@ func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree = mcodec = leaves[0].mcodec leaves = leaves.mapIt(it.digestBytes) - CodexTree.init(mcodec, leaves) + return initCodexTreeAsync(tp, mcodec, leaves) + +proc init*( + _: type CodexTree, tp: Taskpool, leaves: seq[Cid] +): Future[?!CodexTree] {.async.} = + if leaves.len == 0: + return failure("Empty leaves") + + let + mcodec = (?leaves[0].mhash.mapFailure).mcodec + leaves = leaves.mapIt((?it.mhash.mapFailure).digestBytes) + + ?catch(await initCodexTreeAsync(tp, mcodec, leaves)) func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree = if leaves.len == 0: diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index f1905becbd..1b420863db 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -9,9 +9,11 @@ {.push raises: [].} -import std/bitops +import std/[bitops, atomics] import pkg/questionable/results +import pkg/taskpools +import pkg/chronos/threadsync import ../errors @@ -30,6 +32,13 @@ type compress*: CompressFn[H, K] # compress function zero*: H # zero value + MerkleTask*[H, K] = object + tree*: ptr MerkleTree[H, K] + leaves*: seq[H] + signal*: ThreadSignalPtr + layers*: Isolated[seq[seq[H]]] + success*: Atomic[bool] + func depth*[H, K](self: MerkleTree[H, K]): int = return self.layers.len - 1 @@ -151,3 +160,17 @@ func merkleTreeWorker*[H, K]( ys[halfn] = ?self.compress(xs[n], self.zero, key = key) success @[@xs] & ?self.merkleTreeWorker(ys, isBottomLayer = false) + +proc asyncMerkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} = + defer: + discard task[].signal.fireSync() + + let res = merkleTreeWorker(task[].tree[], task[].leaves, isBottomLayer = true) + + if res.isErr: + task[].success.store(false) + return + + var isolatedLayers = isolate(res.get()) + task[].layers = isolatedLayers + task[].success.store(true) diff --git a/codex/node.nim b/codex/node.nim index e010b08540..e7a3b5558c 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -439,7 +439,7 @@ proc store*( finally: await stream.close() - without tree =? CodexTree.init(cids), err: + without tree =? (await CodexTree.init(self.taskpool, cids)), err: return failure(err) without treeCid =? tree.rootCid(CIDv1, dataCodec), err: diff --git a/tests/codex/merkletree/testasynctree.nim b/tests/codex/merkletree/testasynctree.nim new file mode 100644 index 0000000000..604663dfdf --- /dev/null +++ b/tests/codex/merkletree/testasynctree.nim @@ -0,0 +1,51 @@ +import std/sequtils + +import pkg/questionable/results +import pkg/stew/byteutils +import pkg/libp2p +import pkg/taskpools +import pkg/chronos + +import pkg/asynctest/chronos/unittest2 + +export unittest2 + +import pkg/codex/codextypes +import pkg/codex/merkletree +import pkg/codex/utils/digest + +import ./helpers + +# TODO: Generalize to other hashes + +const + data = [ + "00000000000000000000000000000001".toBytes, + "00000000000000000000000000000002".toBytes, + "00000000000000000000000000000003".toBytes, + "00000000000000000000000000000004".toBytes, + "00000000000000000000000000000005".toBytes, + "00000000000000000000000000000006".toBytes, + "00000000000000000000000000000007".toBytes, + "00000000000000000000000000000008".toBytes, + "00000000000000000000000000000009".toBytes, + "00000000000000000000000000000010".toBytes, + ] + sha256 = Sha256HashCodec + +suite "Test CodexTree": + var taskpool: Taskpool + + setup: + taskpool = Taskpool.new() + + teardown: + taskpool.shutdown() + + test "Should build tree from multihash leaves asyncronosly": + var t = await CodexTree.init(taskpool, sha256, leaves = data) + var tree = t.tryGet() + check: + tree.isOk + tree.get().leaves == data + tree.get().mcodec == sha256 From 26d17de4622ff29ae0adc21c0ef32beb92d891d4 Mon Sep 17 00:00:00 2001 From: munna0908 Date: Fri, 4 Jul 2025 19:35:25 +0530 Subject: [PATCH 2/7] threading support for tree building --- codex/merkletree/codex/codex.nim | 54 +++++++++++-------- codex/merkletree/merkletree.nim | 2 +- codex/merkletree/poseidon2.nim | 55 +++++++++++++++++++- tests/codex/merkletree/testasynctree.nim | 51 ------------------ tests/codex/merkletree/testcodextree.nim | 49 +++++++++++++++-- tests/codex/merkletree/testposeidon2tree.nim | 36 +++++++++++-- 6 files changed, 165 insertions(+), 82 deletions(-) delete mode 100644 tests/codex/merkletree/testasynctree.nim diff --git a/codex/merkletree/codex/codex.nim b/codex/merkletree/codex/codex.nim index b35de0b24a..b0892dda9b 100644 --- a/codex/merkletree/codex/codex.nim +++ b/codex/merkletree/codex/codex.nim @@ -43,6 +43,8 @@ type ByteTree* = MerkleTree[ByteHash, ByteTreeKey] ByteProof* = MerkleProof[ByteHash, ByteTreeKey] + CodexTreeTask* = MerkleTask[ByteHash, ByteTreeKey] + CodexTree* = ref object of ByteTree mcodec*: MultiCodec @@ -162,8 +164,11 @@ func init*( self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true) success self -proc initCodexTreeAsync*( - tp: Taskpool, mcodec: MultiCodec = Sha256HashCodec, leaves: seq[ByteHash] +proc init*( + _: type CodexTree, + tp: Taskpool, + mcodec: MultiCodec = Sha256HashCodec, + leaves: seq[ByteHash], ): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = if leaves.len == 0: return failure "Empty leaves" @@ -177,26 +182,23 @@ proc initCodexTreeAsync*( if mhash.size != leaves[0].len: return failure "Invalid hash length" - without threadPtr =? ThreadSignalPtr.new(): + without signal =? ThreadSignalPtr.new(): return failure("Unable to create thread signal") defer: - threadPtr.close().expect("closing once works") + signal.close().expect("closing once works") var tree = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec) - var task = MerkleTask[ByteHash, ByteTreeKey]( - tree: cast[ptr MerkleTree[ByteHash, ByteTreeKey]](addr tree), - leaves: @leaves, - signal: threadPtr, - ) + var task = + CodexTreeTask(tree: cast[ptr ByteTree](addr tree), leaves: @leaves, signal: signal) doAssert tp.numThreads > 1, "Must have at least one separate thread or signal will never be fired" - tp.spawn asyncMerkleTreeWorker(addr task) + tp.spawn merkleTreeWorker(addr task) - let threadFut = threadPtr.wait() + let threadFut = signal.wait() if err =? catch(await threadFut.join()).errorOption: ?catch(await noCancel threadFut) @@ -213,9 +215,7 @@ proc initCodexTreeAsync*( success tree -proc init*( - _: type CodexTree, tp: Taskpool, leaves: openArray[MultiHash] -): Future[?!CodexTree] = +func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree = if leaves.len == 0: return failure "Empty leaves" @@ -223,19 +223,19 @@ proc init*( mcodec = leaves[0].mcodec leaves = leaves.mapIt(it.digestBytes) - return initCodexTreeAsync(tp, mcodec, leaves) + CodexTree.init(mcodec, leaves) proc init*( - _: type CodexTree, tp: Taskpool, leaves: seq[Cid] -): Future[?!CodexTree] {.async.} = + _: type CodexTree, tp: Taskpool, leaves: seq[MultiHash] +): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = if leaves.len == 0: - return failure("Empty leaves") + return failure "Empty leaves" let - mcodec = (?leaves[0].mhash.mapFailure).mcodec - leaves = leaves.mapIt((?it.mhash.mapFailure).digestBytes) + mcodec = leaves[0].mcodec + leaves = leaves.mapIt(it.digestBytes) - ?catch(await initCodexTreeAsync(tp, mcodec, leaves)) + await CodexTree.init(tp, mcodec, leaves) func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree = if leaves.len == 0: @@ -247,6 +247,18 @@ func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree = CodexTree.init(mcodec, leaves) +proc init*( + _: type CodexTree, tp: Taskpool, leaves: seq[Cid] +): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = + if leaves.len == 0: + return failure("Empty leaves") + + let + mcodec = (?leaves[0].mhash.mapFailure).mcodec + leaves = leaves.mapIt((?it.mhash.mapFailure).digestBytes) + + await CodexTree.init(tp, mcodec, leaves) + proc fromNodes*( _: type CodexTree, mcodec: MultiCodec = Sha256HashCodec, diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index 1b420863db..c973a7e1a8 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -161,7 +161,7 @@ func merkleTreeWorker*[H, K]( success @[@xs] & ?self.merkleTreeWorker(ys, isBottomLayer = false) -proc asyncMerkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} = +proc merkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} = defer: discard task[].signal.fireSync() diff --git a/codex/merkletree/poseidon2.nim b/codex/merkletree/poseidon2.nim index 56ad1e4d46..0b6fdd77a3 100644 --- a/codex/merkletree/poseidon2.nim +++ b/codex/merkletree/poseidon2.nim @@ -9,9 +9,11 @@ {.push raises: [].} -import std/sequtils +import std/[sequtils, atomics] import pkg/poseidon2 +import pkg/taskpools +import pkg/chronos/threadsync import pkg/constantine/math/io/io_fields import pkg/constantine/platforms/abstractions import pkg/questionable/results @@ -44,6 +46,8 @@ type Poseidon2Tree* = MerkleTree[Poseidon2Hash, PoseidonKeysEnum] Poseidon2Proof* = MerkleProof[Poseidon2Hash, PoseidonKeysEnum] + Poseidon2TreeTask* = MerkleTask[Poseidon2Hash, PoseidonKeysEnum] + proc `$`*(self: Poseidon2Tree): string = let root = if self.root.isOk: self.root.get.toHex else: "none" "Poseidon2Tree(" & " root: " & root & ", leavesCount: " & $self.leavesCount & @@ -77,9 +81,58 @@ func init*(_: type Poseidon2Tree, leaves: openArray[Poseidon2Hash]): ?!Poseidon2 self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true) success self +proc init*( + _: type Poseidon2Tree, tp: Taskpool, leaves: seq[Poseidon2Hash] +): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} = + if leaves.len == 0: + return failure "Empty leaves" + + let compressor = proc( + x, y: Poseidon2Hash, key: PoseidonKeysEnum + ): ?!Poseidon2Hash {.noSideEffect.} = + success compress(x, y, key.toKey) + + without signal =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + signal.close().expect("closing once works") + + var tree = Poseidon2Tree(compress: compressor, zero: Poseidon2Zero) + var task = Poseidon2TreeTask( + tree: cast[ptr Poseidon2Tree](addr tree), leaves: @leaves, signal: signal + ) + + doAssert tp.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + + tp.spawn merkleTreeWorker(addr task) + + let threadFut = signal.wait() + + if err =? catch(await threadFut.join()).errorOption: + ?catch(await noCancel threadFut) + if err of CancelledError: + raise (ref CancelledError) err + + if not task.success.load(): + return failure("merkle tree task failed") + + defer: + task.layers = default(Isolated[seq[seq[Poseidon2Hash]]]) + + tree.layers = task.layers.extract + + success tree + func init*(_: type Poseidon2Tree, leaves: openArray[array[31, byte]]): ?!Poseidon2Tree = Poseidon2Tree.init(leaves.mapIt(Poseidon2Hash.fromBytes(it))) +proc init*( + _: type Poseidon2Tree, tp: Taskpool, leaves: seq[array[31, byte]] +): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} = + await Poseidon2Tree.init(tp, leaves.mapIt(Poseidon2Hash.fromBytes(it))) + proc fromNodes*( _: type Poseidon2Tree, nodes: openArray[Poseidon2Hash], nleaves: int ): ?!Poseidon2Tree = diff --git a/tests/codex/merkletree/testasynctree.nim b/tests/codex/merkletree/testasynctree.nim deleted file mode 100644 index 604663dfdf..0000000000 --- a/tests/codex/merkletree/testasynctree.nim +++ /dev/null @@ -1,51 +0,0 @@ -import std/sequtils - -import pkg/questionable/results -import pkg/stew/byteutils -import pkg/libp2p -import pkg/taskpools -import pkg/chronos - -import pkg/asynctest/chronos/unittest2 - -export unittest2 - -import pkg/codex/codextypes -import pkg/codex/merkletree -import pkg/codex/utils/digest - -import ./helpers - -# TODO: Generalize to other hashes - -const - data = [ - "00000000000000000000000000000001".toBytes, - "00000000000000000000000000000002".toBytes, - "00000000000000000000000000000003".toBytes, - "00000000000000000000000000000004".toBytes, - "00000000000000000000000000000005".toBytes, - "00000000000000000000000000000006".toBytes, - "00000000000000000000000000000007".toBytes, - "00000000000000000000000000000008".toBytes, - "00000000000000000000000000000009".toBytes, - "00000000000000000000000000000010".toBytes, - ] - sha256 = Sha256HashCodec - -suite "Test CodexTree": - var taskpool: Taskpool - - setup: - taskpool = Taskpool.new() - - teardown: - taskpool.shutdown() - - test "Should build tree from multihash leaves asyncronosly": - var t = await CodexTree.init(taskpool, sha256, leaves = data) - var tree = t.tryGet() - check: - tree.isOk - tree.get().leaves == data - tree.get().mcodec == sha256 diff --git a/tests/codex/merkletree/testcodextree.nim b/tests/codex/merkletree/testcodextree.nim index 29390c168b..e8cc537ab0 100644 --- a/tests/codex/merkletree/testcodextree.nim +++ b/tests/codex/merkletree/testcodextree.nim @@ -1,6 +1,5 @@ import std/sequtils -import pkg/unittest2 import pkg/questionable/results import pkg/stew/byteutils import pkg/libp2p @@ -9,8 +8,11 @@ import pkg/codex/codextypes import pkg/codex/merkletree import pkg/codex/utils/digest +import pkg/taskpools + import ./helpers import ./generictreetests +import ../../asynctest # TODO: Generalize to other hashes @@ -43,9 +45,22 @@ suite "Test CodexTree": CodexTree.init(sha256, leaves = newSeq[ByteHash]()).isErr test "Should build tree from multihash leaves": - var expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) + var + expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) + tree = CodexTree.init(leaves = expectedLeaves) + check: + tree.isOk + tree.get().leaves == expectedLeaves.mapIt(it.digestBytes) + tree.get().mcodec == sha256 + + test "Should build tree from multihash leaves asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() - var tree = CodexTree.init(leaves = expectedLeaves) + let expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) + + let tree = (await CodexTree.init(tp, leaves = expectedLeaves)) check: tree.isOk tree.get().leaves == expectedLeaves.mapIt(it.digestBytes) @@ -63,6 +78,22 @@ suite "Test CodexTree": tree.get().leaves == expectedLeaves.mapIt(it.mhash.tryGet.digestBytes) tree.get().mcodec == sha256 + test "Should build tree from cid leaves asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let expectedLeaves = data.mapIt( + Cid.init(CidVersion.CIDv1, BlockCodec, MultiHash.digest($sha256, it).tryGet).tryGet + ) + + let tree = (await CodexTree.init(tp, leaves = expectedLeaves)) + + check: + tree.isOk + tree.get().leaves == expectedLeaves.mapIt(it.mhash.tryGet.digestBytes) + tree.get().mcodec == sha256 + test "Should build from raw digestbytes (should not hash leaves)": let tree = CodexTree.init(sha256, leaves = data).tryGet @@ -70,6 +101,18 @@ suite "Test CodexTree": tree.mcodec == sha256 tree.leaves == data + test "Should build from raw digestbytes (should not hash leaves) asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let tree = (await CodexTree.init(tp, sha256, leaves = @data)) + + check: + tree.isOk + tree.get().mcodec == sha256 + tree.get().leaves == data + test "Should build from nodes": let tree = CodexTree.init(sha256, leaves = data).tryGet diff --git a/tests/codex/merkletree/testposeidon2tree.nim b/tests/codex/merkletree/testposeidon2tree.nim index e12751b713..0a3e48ac51 100644 --- a/tests/codex/merkletree/testposeidon2tree.nim +++ b/tests/codex/merkletree/testposeidon2tree.nim @@ -1,6 +1,5 @@ import std/sequtils -import pkg/unittest2 import pkg/poseidon2 import pkg/poseidon2/io import pkg/questionable/results @@ -9,9 +8,11 @@ import pkg/stew/byteutils import pkg/stew/arrayops import pkg/codex/merkletree +import pkg/taskpools import ./generictreetests import ./helpers +import ../../asynctest const data = [ "0000000000000000000000000000001".toBytes, @@ -36,13 +37,14 @@ suite "Test Poseidon2Tree": check: Poseidon2Tree.init(leaves = newSeq[Poseidon2Hash](0)).isErr - test "Init tree from poseidon2 leaves": - let tree = Poseidon2Tree.init(leaves = expectedLeaves).tryGet + test "Build tree from poseidon2 leaves": + var taskpool = Taskpool.new(numThreads = 2) + let tree = (await Poseidon2Tree.init(taskpool, leaves = expectedLeaves)).tryGet() check: tree.leaves == expectedLeaves - test "Init tree from byte leaves": + test "Build tree from byte leaves": let tree = Poseidon2Tree.init( leaves = expectedLeaves.mapIt(array[31, byte].initCopyFrom(it.toBytes)) ).tryGet @@ -50,7 +52,7 @@ suite "Test Poseidon2Tree": check: tree.leaves == expectedLeaves - test "Should build from nodes": + test "Build tree from nodes": let tree = Poseidon2Tree.init(leaves = expectedLeaves).tryGet fromNodes = Poseidon2Tree.fromNodes( @@ -60,6 +62,30 @@ suite "Test Poseidon2Tree": check: tree == fromNodes + test "Build poseidon2 tree from poseidon2 leaves asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let tree = (await Poseidon2Tree.init(tp, leaves = expectedLeaves)).tryGet() + + check: + tree.leaves == expectedLeaves + + test "Build poseidon2 tree from byte leaves asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let tree = ( + await Poseidon2Tree.init( + tp, leaves = expectedLeaves.mapIt(array[31, byte].initCopyFrom(it.toBytes)) + ) + ).tryGet() + + check: + tree.leaves == expectedLeaves + let compressor = proc( x, y: Poseidon2Hash, key: PoseidonKeysEnum From 0c1b6822ccd26576d58a656d75e3d2bde477025f Mon Sep 17 00:00:00 2001 From: munna0908 Date: Mon, 7 Jul 2025 23:44:52 +0530 Subject: [PATCH 3/7] test --- codex/codex.nim | 22 +++---- codex/merkletree/poseidon2.nim | 2 +- codex/node.nim | 21 ++++--- codex/slots/builder/builder.nim | 39 +++++++++++- codex/slots/proofs/prover.nim | 8 ++- codex/utils/poseidon2digest.nim | 62 ++++++++++++++++++- tests/codex/merkletree/testcodextree.nim | 2 + tests/codex/merkletree/testposeidon2tree.nim | 8 ++- tests/codex/node/testcontracts.nim | 10 ++- tests/codex/node/testnode.nim | 10 ++- tests/codex/node/testslotrepair.nim | 15 ++++- .../codex/slots/backends/testcircomcompat.nim | 9 ++- tests/codex/slots/helpers.nim | 6 +- tests/codex/slots/sampler/testsampler.nim | 16 ++++- tests/codex/slots/testprover.nim | 9 ++- tests/codex/slots/testslotbuilder.nim | 60 +++++++++++------- tests/codex/utils/testPoseidon.nim | 40 ++++++++++++ 17 files changed, 269 insertions(+), 70 deletions(-) create mode 100644 tests/codex/utils/testPoseidon.nim diff --git a/codex/codex.nim b/codex/codex.nim index 8135746410..928305c149 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -56,7 +56,7 @@ type codexNode: CodexNodeRef repoStore: RepoStore maintenance: BlockMaintainer - taskpool: Taskpool + taskPool: Taskpool CodexPrivateKey* = libp2p.PrivateKey # alias EthWallet = ethers.Wallet @@ -194,8 +194,8 @@ proc stop*(s: CodexServer) {.async.} = error "Failed to stop codex node", failures = res.failure.len raiseAssert "Failed to stop codex node" - if not s.taskpool.isNil: - s.taskpool.shutdown() + if not s.taskPool.isNil: + s.taskPool.shutdown() proc new*( T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey @@ -216,16 +216,16 @@ proc new*( var cache: CacheStore = nil - taskpool: Taskpool + taskPool: Taskpool try: if config.numThreads == ThreadCount(0): - taskpool = Taskpool.new(numThreads = min(countProcessors(), 16)) + taskPool = Taskpool.new(numThreads = min(countProcessors(), 16)) else: - taskpool = Taskpool.new(numThreads = int(config.numThreads)) - info "Threadpool started", numThreads = taskpool.numThreads + taskPool = Taskpool.new(numThreads = int(config.numThreads)) + info "Threadpool started", numThreads = taskPool.numThreads except CatchableError as exc: - raiseAssert("Failure in taskpool initialization:" & exc.msg) + raiseAssert("Failure in taskPool initialization:" & exc.msg) if config.cacheSize > 0'nb: cache = CacheStore.new(cacheSize = config.cacheSize) @@ -307,7 +307,7 @@ proc new*( if config.prover: let backend = config.initializeBackend().expect("Unable to create prover backend.") - some Prover.new(store, backend, config.numProofSamples) + some Prover.new(store, backend, config.numProofSamples, taskPool) else: none Prover @@ -317,7 +317,7 @@ proc new*( engine = engine, discovery = discovery, prover = prover, - taskPool = taskpool, + taskPool = taskPool, ) restServer = RestServerRef @@ -337,5 +337,5 @@ proc new*( restServer: restServer, repoStore: repoStore, maintenance: maintenance, - taskpool: taskpool, + taskPool: taskPool, ) diff --git a/codex/merkletree/poseidon2.nim b/codex/merkletree/poseidon2.nim index 0b6fdd77a3..64f9bc01af 100644 --- a/codex/merkletree/poseidon2.nim +++ b/codex/merkletree/poseidon2.nim @@ -100,7 +100,7 @@ proc init*( var tree = Poseidon2Tree(compress: compressor, zero: Poseidon2Zero) var task = Poseidon2TreeTask( - tree: cast[ptr Poseidon2Tree](addr tree), leaves: @leaves, signal: signal + tree: cast[ptr Poseidon2Tree](addr tree), leaves: leaves, signal: signal ) doAssert tp.numThreads > 1, diff --git a/codex/node.nim b/codex/node.nim index e7a3b5558c..724f8bf372 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -72,7 +72,7 @@ type contracts*: Contracts clock*: Clock storage*: Contracts - taskpool: Taskpool + taskPool: Taskpool trackedFutures: TrackedFutures CodexNodeRef* = ref CodexNode @@ -294,7 +294,7 @@ proc streamEntireDataset( try: # Spawn an erasure decoding job let erasure = Erasure.new( - self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskPool ) without _ =? (await erasure.decode(manifest)), error: error "Unable to erasure decode manifest", manifestCid, exc = error.msg @@ -439,7 +439,7 @@ proc store*( finally: await stream.close() - without tree =? (await CodexTree.init(self.taskpool, cids)), err: + without tree =? (await CodexTree.init(self.taskPool, cids)), err: return failure(err) without treeCid =? tree.rootCid(CIDv1, dataCodec), err: @@ -533,14 +533,15 @@ proc setupRequest( # Erasure code the dataset according to provided parameters let erasure = Erasure.new( - self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskPool ) without encoded =? (await erasure.encode(manifest, ecK, ecM)), error: trace "Unable to erasure code dataset" return failure(error) - without builder =? Poseidon2Builder.new(self.networkStore.localStore, encoded), err: + without builder =? + Poseidon2Builder.new(self.networkStore.localStore, encoded, self.taskPool), err: trace "Unable to create slot builder" return failure(err) @@ -644,7 +645,9 @@ proc onStore( return failure(err) without builder =? - Poseidon2Builder.new(self.networkStore, manifest, manifest.verifiableStrategy), err: + Poseidon2Builder.new( + self.networkStore, manifest, self.taskPool, manifest.verifiableStrategy + ), err: trace "Unable to create slots builder", err = err.msg return failure(err) @@ -679,7 +682,7 @@ proc onStore( trace "start repairing slot", slotIdx try: let erasure = Erasure.new( - self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskPool ) if err =? (await erasure.repair(manifest)).errorOption: error "Unable to erasure decode repairing manifest", @@ -880,7 +883,7 @@ proc new*( networkStore: NetworkStore, engine: BlockExcEngine, discovery: Discovery, - taskpool: Taskpool, + taskPool: Taskpool, prover = Prover.none, contracts = Contracts.default, ): CodexNodeRef = @@ -893,7 +896,7 @@ proc new*( engine: engine, prover: prover, discovery: discovery, - taskPool: taskpool, + taskPool: taskPool, contracts: contracts, trackedFutures: TrackedFutures(), ) diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 5fbb0fe191..34c3ed9aed 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -18,18 +18,20 @@ import pkg/chronos import pkg/questionable import pkg/questionable/results import pkg/constantine/math/io/io_fields +import pkg/taskpools import ../../logutils import ../../utils import ../../stores import ../../manifest import ../../merkletree +import ../../utils/poseidon2digest import ../../utils/asynciter import ../../indexingstrategy import ../converters -export converters, asynciter +export converters, asynciter, poseidon2digest logScope: topics = "codex slotsbuilder" @@ -45,6 +47,7 @@ type SlotsBuilder*[T, H] = ref object of RootObj emptyBlock: seq[byte] # empty block verifiableTree: ?T # verification tree (dataset tree) emptyDigestTree: T # empty digest tree for empty blocks + taskPool: Taskpool func verifiable*[T, H](self: SlotsBuilder[T, H]): bool {.inline.} = ## Returns true if the slots are verifiable. @@ -165,6 +168,35 @@ proc buildBlockTree*[T, H]( success (blk.data, tree) +proc getBlockDigest*[T, H]( + self: SlotsBuilder[T, H], blkIdx: Natural, slotPos: Natural +): Future[?!H] {.async: (raises: [CancelledError]).} = + logScope: + blkIdx = blkIdx + slotPos = slotPos + numSlotBlocks = self.manifest.numSlotBlocks + cellSize = self.cellSize + + trace "Building block tree" + + if slotPos > (self.manifest.numSlotBlocks - 1): + # pad blocks are 0 byte blocks + trace "Returning empty digest tree for pad block" + return self.emptyDigestTree.root + + without blk =? await self.store.getBlock(self.manifest.treeCid, blkIdx), err: + error "Failed to get block CID for tree at index", err = err.msg + return failure(err) + + if blk.isEmpty: + return self.emptyDigestTree.root + + without digest =? (await T.digest(self.taskPool, blk.data, self.cellSize.int)), err: + error "Failed to create digest for block", err = err.msg + return failure(err) + + return success digest + proc getCellHashes*[T, H]( self: SlotsBuilder[T, H], slotIndex: Natural ): Future[?!seq[H]] {.async: (raises: [CancelledError, IndexingError]).} = @@ -190,8 +222,7 @@ proc getCellHashes*[T, H]( pos = i trace "Getting block CID for tree at index" - without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root, - err: + without digest =? (await self.getBlockDigest(blkIdx, i)), err: error "Failed to get block CID for tree at index", err = err.msg return failure(err) @@ -310,6 +341,7 @@ proc new*[T, H]( _: type SlotsBuilder[T, H], store: BlockStore, manifest: Manifest, + taskPool: Taskpool, strategy = LinearStrategy, cellSize = DefaultCellSize, ): ?!SlotsBuilder[T, H] = @@ -383,6 +415,7 @@ proc new*[T, H]( emptyBlock: emptyBlock, numSlotBlocks: numSlotBlocksTotal, emptyDigestTree: emptyDigestTree, + taskPool: taskPool, ) if manifest.verifiable: diff --git a/codex/slots/proofs/prover.nim b/codex/slots/proofs/prover.nim index 1afcd06841..bba39e8c84 100644 --- a/codex/slots/proofs/prover.nim +++ b/codex/slots/proofs/prover.nim @@ -13,6 +13,7 @@ import pkg/chronicles import pkg/circomcompat import pkg/poseidon2 import pkg/questionable/results +import pkg/taskpools import pkg/libp2p/cid @@ -47,6 +48,7 @@ type backend: AnyBackend store: BlockStore nSamples: int + taskPool: Taskpool proc prove*( self: Prover, slotIdx: int, manifest: Manifest, challenge: ProofChallenge @@ -61,7 +63,7 @@ proc prove*( trace "Received proof challenge" - without builder =? AnyBuilder.new(self.store, manifest), err: + without builder =? AnyBuilder.new(self.store, manifest, self.taskPool), err: error "Unable to create slots builder", err = err.msg return failure(err) @@ -88,6 +90,6 @@ proc verify*( self.backend.verify(proof, inputs) proc new*( - _: type Prover, store: BlockStore, backend: AnyBackend, nSamples: int + _: type Prover, store: BlockStore, backend: AnyBackend, nSamples: int, tp: Taskpool ): Prover = - Prover(store: store, backend: backend, nSamples: nSamples) + Prover(store: store, backend: backend, nSamples: nSamples, taskPool: tp) diff --git a/codex/utils/poseidon2digest.nim b/codex/utils/poseidon2digest.nim index 6eaf21e98a..7607aee2b3 100644 --- a/codex/utils/poseidon2digest.nim +++ b/codex/utils/poseidon2digest.nim @@ -7,13 +7,26 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import std/[atomics] import pkg/poseidon2 import pkg/questionable/results import pkg/libp2p/multihash import pkg/stew/byteutils +import pkg/taskpools +import pkg/chronos +import pkg/chronos/threadsync import ../merkletree +type DigestTask* = object + signal: ThreadSignalPtr + bytes: seq[byte] + chunkSize: int + success: Atomic[bool] + digest: Isolated[Poseidon2Hash] + +export DigestTask + func spongeDigest*( _: type Poseidon2Hash, bytes: openArray[byte], rate: static int = 2 ): ?!Poseidon2Hash = @@ -30,7 +43,7 @@ func spongeDigest*( success Sponge.digest(bytes, rate) -func digestTree*( +proc digestTree*( _: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int ): ?!Poseidon2Tree = ## Hashes chunks of data with a sponge of rate 2, and combines the @@ -44,6 +57,7 @@ func digestTree*( var index = 0 var leaves: seq[Poseidon2Hash] + while index < bytes.len: let start = index let finish = min(index + chunkSize, bytes.len) @@ -61,6 +75,52 @@ func digest*( (?Poseidon2Tree.digestTree(bytes, chunkSize)).root +proc digestWorker(tp: Taskpool, task: ptr DigestTask) {.gcsafe.} = + defer: + discard task[].signal.fireSync() + + var res = Poseidon2Tree.digest(task[].bytes, task[].chunkSize) + + if res.isErr: + task[].success.store(false) + return + + task[].digest = isolate(res.get()) + task[].success.store(true) + +proc digest*( + _: type Poseidon2Tree, tp: Taskpool, bytes: seq[byte], chunkSize: int +): Future[?!Poseidon2Hash] {.async: (raises: [CancelledError]).} = + without signal =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + signal.close().expect("closing once works") + + doAssert tp.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + + var task = DigestTask(signal: signal, bytes: bytes, chunkSize: chunkSize) + + tp.spawn digestWorker(tp, addr task) + + let signalFut = signal.wait() + + if err =? catch(await signalFut.join()).errorOption: + ?catch(await noCancel signalFut) + if err of CancelledError: + raise (ref CancelledError) err + + if not task.success.load(): + return failure("digest task failed") + + defer: + task.digest = default(Isolated[Poseidon2Hash]) + + var digest = task.digest.extract + + success digest + func digestMhash*( _: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int ): ?!MultiHash = diff --git a/tests/codex/merkletree/testcodextree.nim b/tests/codex/merkletree/testcodextree.nim index e8cc537ab0..9ab680cebd 100644 --- a/tests/codex/merkletree/testcodextree.nim +++ b/tests/codex/merkletree/testcodextree.nim @@ -1,4 +1,5 @@ import std/sequtils +import std/times import pkg/questionable/results import pkg/stew/byteutils @@ -48,6 +49,7 @@ suite "Test CodexTree": var expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) tree = CodexTree.init(leaves = expectedLeaves) + check: tree.isOk tree.get().leaves == expectedLeaves.mapIt(it.digestBytes) diff --git a/tests/codex/merkletree/testposeidon2tree.nim b/tests/codex/merkletree/testposeidon2tree.nim index 0a3e48ac51..f574d63775 100644 --- a/tests/codex/merkletree/testposeidon2tree.nim +++ b/tests/codex/merkletree/testposeidon2tree.nim @@ -63,17 +63,19 @@ suite "Test Poseidon2Tree": tree == fromNodes test "Build poseidon2 tree from poseidon2 leaves asynchronously": - var tp = Taskpool.new(numThreads = 2) + echo "Build poseidon2 tree from poseidon2 leaves asynchronously" + var tp = Taskpool.new() defer: tp.shutdown() + echo "@@@@@" let tree = (await Poseidon2Tree.init(tp, leaves = expectedLeaves)).tryGet() - check: tree.leaves == expectedLeaves test "Build poseidon2 tree from byte leaves asynchronously": - var tp = Taskpool.new(numThreads = 2) + echo "Build poseidon2 tree from byte leaves asynchronously" + var tp = Taskpool.new() defer: tp.shutdown() diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index e8d9c743e4..fcb91e8f70 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -56,12 +56,13 @@ asyncchecksuite "Test Node - Host contracts": verifiable: Manifest verifiableBlock: bt.Block protected: Manifest + taskPool: Taskpool setup: # Setup Host Contracts and dependencies market = MockMarket.new() sales = Sales.new(market, clock, localStore) - + taskPool = Taskpool.new() node.contracts = ( none ClientInteractions, some HostInteractions.new(clock, sales), @@ -75,20 +76,23 @@ asyncchecksuite "Test Node - Host contracts": let manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new) + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool) manifestCid = manifestBlock.cid (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, 3, 2)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() (await localStore.putBlock(verifiableBlock)).tryGet() + teardown: + taskPool.shutdown() + test "onExpiryUpdate callback is set": check sales.onExpiryUpdate.isSome diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index 78298ad758..46e6df3afd 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -47,10 +47,15 @@ privateAccess(CodexNodeRef) # enable access to private fields asyncchecksuite "Test Node - Basic": setupAndTearDown() + var taskPool: Taskpool setup: + taskPool = Taskpool.new() await node.start() + teardown: + taskPool.shutdown() + test "Fetch Manifest": let manifest = await storeDataGetManifest(localStore, chunker) @@ -174,13 +179,12 @@ asyncchecksuite "Test Node - Basic": test "Setup purchase request": let - erasure = - Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new()) + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool) manifest = await storeDataGetManifest(localStore, chunker) manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() protected = (await erasure.encode(manifest, 3, 2)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() diff --git a/tests/codex/node/testslotrepair.nim b/tests/codex/node/testslotrepair.nim index d96078d292..3d588a6d73 100644 --- a/tests/codex/node/testslotrepair.nim +++ b/tests/codex/node/testslotrepair.nim @@ -13,6 +13,7 @@ import pkg/codex/contracts import pkg/codex/slots import pkg/codex/manifest import pkg/codex/erasure +import pkg/taskpools import pkg/codex/blocktype as bt import pkg/chronos/transports/stream @@ -51,6 +52,7 @@ asyncchecksuite "Test Node - Slot Repair": ) var manifest: Manifest + taskPool: Taskpool builder: Poseidon2Builder verifiable: Manifest verifiableBlock: bt.Block @@ -100,7 +102,7 @@ asyncchecksuite "Test Node - Slot Repair": (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() @@ -118,6 +120,7 @@ asyncchecksuite "Test Node - Slot Repair": await nodes[1].switch.stop() # slot 0 missing now # repair missing slot + (await nodes[4].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() await nodes[2].switch.stop() # slot 1 missing now @@ -131,16 +134,19 @@ asyncchecksuite "Test Node - Slot Repair": await nodes[4].switch.stop() # slot 0 missing now # repair missing slot from repaired slots + (await nodes[7].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() await nodes[5].switch.stop() # slot 1 missing now # repair missing slot from repaired slots + (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet() await nodes[6].switch.stop() # slot 2 missing now # repair missing slot from repaired slots + (await nodes[9].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet() let @@ -179,7 +185,7 @@ asyncchecksuite "Test Node - Slot Repair": (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, cluster.taskpool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() @@ -198,19 +204,24 @@ asyncchecksuite "Test Node - Slot Repair": await nodes[3].switch.stop() # slot 2 missing now # repair missing slots + (await nodes[6].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() + (await nodes[7].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet() await nodes[2].switch.stop() # slot 1 missing now await nodes[4].switch.stop() # slot 3 missing now # repair missing slots from repaired slots + (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet() + (await nodes[9].onStore(request, expiry, 3.uint64, nil, isRepairing = true)).tryGet() await nodes[5].switch.stop() # slot 4 missing now # repair missing slot from repaired slots + (await nodes[10].onStore(request, expiry, 4.uint64, nil, isRepairing = true)).tryGet() let diff --git a/tests/codex/slots/backends/testcircomcompat.nim b/tests/codex/slots/backends/testcircomcompat.nim index b61d4f188c..637ee36b18 100644 --- a/tests/codex/slots/backends/testcircomcompat.nim +++ b/tests/codex/slots/backends/testcircomcompat.nim @@ -3,6 +3,7 @@ import std/options import ../../../asynctest import pkg/chronos +import pkg/taskpools import pkg/poseidon2 import pkg/serde/json @@ -77,6 +78,7 @@ suite "Test Circom Compat Backend": challenge: array[32, byte] builder: Poseidon2Builder sampler: Poseidon2Sampler + taskPool: Taskpool setup: let @@ -85,11 +87,13 @@ suite "Test Circom Compat Backend": store = RepoStore.new(repoDs, metaDs) + taskPool = Taskpool.new() + (manifest, protected, verifiable) = await createVerifiableManifest( - store, numDatasetBlocks, ecK, ecM, blockSize, cellSize + store, numDatasetBlocks, ecK, ecM, blockSize, cellSize, taskPool ) - builder = Poseidon2Builder.new(store, verifiable).tryGet + builder = Poseidon2Builder.new(store, verifiable, taskPool).tryGet sampler = Poseidon2Sampler.new(slotId, store, builder).tryGet circom = CircomCompat.init(r1cs, wasm, zkey) @@ -101,6 +105,7 @@ suite "Test Circom Compat Backend": circom.release() # this comes from the rust FFI await repoTmp.destroyDb() await metaTmp.destroyDb() + taskPool.shutdown() test "Should verify with correct input": var proof = circom.prove(proofInputs).tryGet diff --git a/tests/codex/slots/helpers.nim b/tests/codex/slots/helpers.nim index fced1f1c40..01159c217d 100644 --- a/tests/codex/slots/helpers.nim +++ b/tests/codex/slots/helpers.nim @@ -12,6 +12,7 @@ import pkg/codex/chunker import pkg/codex/indexingstrategy import pkg/codex/slots import pkg/codex/rng +import pkg/taskpools import ../helpers @@ -145,6 +146,7 @@ proc createVerifiableManifest*( ecM: int, blockSize: NBytes, cellSize: NBytes, + taskPool: Taskpool, ): Future[tuple[manifest: Manifest, protected: Manifest, verifiable: Manifest]] {. async .} = @@ -165,7 +167,9 @@ proc createVerifiableManifest*( totalDatasetSize, ) - builder = Poseidon2Builder.new(store, protectedManifest, cellSize = cellSize).tryGet + builder = Poseidon2Builder.new( + store, protectedManifest, cellSize = cellSize, taskPool = taskPool + ).tryGet verifiableManifest = (await builder.buildManifest()).tryGet # build the slots and manifest diff --git a/tests/codex/slots/sampler/testsampler.nim b/tests/codex/slots/sampler/testsampler.nim index 78b245a349..bf7277a37e 100644 --- a/tests/codex/slots/sampler/testsampler.nim +++ b/tests/codex/slots/sampler/testsampler.nim @@ -5,6 +5,7 @@ import ../../../asynctest import pkg/questionable/results +import pkg/taskpools import pkg/codex/stores import pkg/codex/merkletree import pkg/codex/utils/json @@ -26,11 +27,16 @@ suite "Test Sampler - control samples": inputData: string inputJson: JsonNode proofInput: ProofInputs[Poseidon2Hash] + taskpool: Taskpool setup: inputData = readFile("tests/circuits/fixtures/input.json") inputJson = !JsonNode.parse(inputData) proofInput = Poseidon2Hash.jsonToProofInput(inputJson) + taskpool = Taskpool.new() + + teardown: + taskpool.shutdown() test "Should verify control samples": let @@ -87,25 +93,29 @@ suite "Test Sampler": manifest: Manifest protected: Manifest verifiable: Manifest + taskpool: Taskpool setup: let repoDs = repoTmp.newDb() metaDs = metaTmp.newDb() + taskpool = Taskpool.new() + store = RepoStore.new(repoDs, metaDs) (manifest, protected, verifiable) = await createVerifiableManifest( - store, datasetBlocks, ecK, ecM, blockSize, cellSize + store, datasetBlocks, ecK, ecM, blockSize, cellSize, taskpool ) # create sampler - builder = Poseidon2Builder.new(store, verifiable).tryGet + builder = Poseidon2Builder.new(store, verifiable, taskpool).tryGet teardown: await store.close() await repoTmp.destroyDb() await metaTmp.destroyDb() + taskpool.shutdown() test "Should fail instantiating for invalid slot index": let sampler = Poseidon2Sampler.new(builder.slotRoots.len, store, builder) @@ -114,7 +124,7 @@ suite "Test Sampler": test "Should fail instantiating for non verifiable builder": let - nonVerifiableBuilder = Poseidon2Builder.new(store, protected).tryGet + nonVerifiableBuilder = Poseidon2Builder.new(store, protected, taskpool).tryGet sampler = Poseidon2Sampler.new(slotIndex, store, nonVerifiableBuilder) check sampler.isErr diff --git a/tests/codex/slots/testprover.nim b/tests/codex/slots/testprover.nim index c567db55dd..34ff96bad3 100644 --- a/tests/codex/slots/testprover.nim +++ b/tests/codex/slots/testprover.nim @@ -4,6 +4,7 @@ import pkg/chronos import pkg/libp2p/cid import pkg/codex/merkletree +import pkg/taskpools import pkg/codex/chunker import pkg/codex/blocktype as bt import pkg/codex/slots @@ -29,6 +30,7 @@ suite "Test Prover": var store: BlockStore prover: Prover + taskPool: Taskpool setup: let @@ -45,13 +47,14 @@ suite "Test Prover": numProofSamples: samples, ) backend = config.initializeBackend().tryGet() - + taskPool = Taskpool.new() store = RepoStore.new(repoDs, metaDs) - prover = Prover.new(store, backend, config.numProofSamples) + prover = Prover.new(store, backend, config.numProofSamples, taskPool) teardown: await repoTmp.destroyDb() await metaTmp.destroyDb() + taskPool.shutdown() test "Should sample and prove a slot": let (_, _, verifiable) = await createVerifiableManifest( @@ -61,6 +64,7 @@ suite "Test Prover": 3, # ecM blockSize, cellSize, + taskPool, ) let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet @@ -80,6 +84,7 @@ suite "Test Prover": 1, # ecM blockSize, cellSize, + taskPool, ) let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet diff --git a/tests/codex/slots/testslotbuilder.nim b/tests/codex/slots/testslotbuilder.nim index fc3c7bd55b..55f917ef4d 100644 --- a/tests/codex/slots/testslotbuilder.nim +++ b/tests/codex/slots/testslotbuilder.nim @@ -15,6 +15,7 @@ import pkg/codex/utils import pkg/codex/utils/digest import pkg/poseidon2 import pkg/poseidon2/io +import pkg/taskpools import ./helpers import ../helpers @@ -72,12 +73,13 @@ suite "Slot builder": protectedManifest: Manifest builder: Poseidon2Builder chunker: Chunker + taskPool: Taskpool setup: let repoDs = repoTmp.newDb() metaDs = metaTmp.newDb() - + taskPool = Taskpool.new() localStore = RepoStore.new(repoDs, metaDs) chunker = RandomChunker.new(Rng.instance(), size = totalDatasetSize, chunkSize = blockSize) @@ -92,6 +94,7 @@ suite "Slot builder": await localStore.close() await repoTmp.destroyDb() await metaTmp.destroyDb() + taskPool.shutdown() # TODO: THIS IS A BUG IN asynctest, because it doesn't release the # objects after the test is done, so we need to do it manually @@ -113,8 +116,9 @@ suite "Slot builder": ) check: - Poseidon2Builder.new(localStore, unprotectedManifest, cellSize = cellSize).error.msg == - "Manifest is not protected." + Poseidon2Builder.new( + localStore, unprotectedManifest, taskPool, cellSize = cellSize + ).error.msg == "Manifest is not protected." test "Number of blocks must be devisable by number of slots": let mismatchManifest = Manifest.new( @@ -131,7 +135,7 @@ suite "Slot builder": ) check: - Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg == + Poseidon2Builder.new(localStore, mismatchManifest, taskPool, cellSize = cellSize).error.msg == "Number of blocks must be divisible by number of slots." test "Block size must be divisable by cell size": @@ -149,12 +153,13 @@ suite "Slot builder": ) check: - Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg == + Poseidon2Builder.new(localStore, mismatchManifest, taskPool, cellSize = cellSize).error.msg == "Block size must be divisible by cell size." test "Should build correct slot builder": - builder = - Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() + builder = Poseidon2Builder + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) + .tryGet() check: builder.cellSize == cellSize @@ -171,7 +176,7 @@ suite "Slot builder": ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() for i in 0 ..< numSlots: @@ -196,7 +201,7 @@ suite "Slot builder": ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() for i in 0 ..< numSlots: @@ -215,8 +220,9 @@ suite "Slot builder": slotTree.root().tryGet() == expectedRoot test "Should persist trees for all slots": - let builder = - Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() + let builder = Poseidon2Builder + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) + .tryGet() for i in 0 ..< numSlots: let @@ -242,7 +248,7 @@ suite "Slot builder": 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() (await builder.buildSlots()).tryGet @@ -270,7 +276,7 @@ suite "Slot builder": 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() slotsHashes = collect(newSeq): @@ -296,45 +302,53 @@ suite "Slot builder": test "Should not build from verifiable manifest with 0 slots": var builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() verifyManifest = (await builder.buildManifest()).tryGet() verifyManifest.slotRoots = @[] - check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr + check Poseidon2Builder.new( + localStore, verifyManifest, taskPool, cellSize = cellSize + ).isErr test "Should not build from verifiable manifest with incorrect number of slots": var builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() verifyManifest = (await builder.buildManifest()).tryGet() verifyManifest.slotRoots.del(verifyManifest.slotRoots.len - 1) - check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr + check Poseidon2Builder.new( + localStore, verifyManifest, taskPool, cellSize = cellSize + ).isErr test "Should not build from verifiable manifest with invalid verify root": - let builder = - Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() + let builder = Poseidon2Builder + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) + .tryGet() var verifyManifest = (await builder.buildManifest()).tryGet() rng.shuffle(Rng.instance, verifyManifest.verifyRoot.data.buffer) - check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr + check Poseidon2Builder.new( + localStore, verifyManifest, taskPool, cellSize = cellSize + ).isErr test "Should build from verifiable manifest": let builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() verifyManifest = (await builder.buildManifest()).tryGet() - verificationBuilder = - Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).tryGet() + verificationBuilder = Poseidon2Builder + .new(localStore, verifyManifest, taskPool, cellSize = cellSize) + .tryGet() check: builder.slotRoots == verificationBuilder.slotRoots diff --git a/tests/codex/utils/testPoseidon.nim b/tests/codex/utils/testPoseidon.nim new file mode 100644 index 0000000000..aedf5fcf5f --- /dev/null +++ b/tests/codex/utils/testPoseidon.nim @@ -0,0 +1,40 @@ +{.push raises: [].} + +import std/[times, strformat, random] +import pkg/questionable/results + +import pkg/codex/merkletree/poseidon2 + +import pkg/codex/utils/poseidon2digest +import ../../asynctest + +test "Test poseidon2 digestTree": + randomize(42) + const + dataSize = 64 * 1024 # 64KB + chunkSize = 2 * 1024 # 2KB + iterations = 10 # Number of iterations + + echo &"Benchmarking digestTree with data size: {dataSize} bytes, chunk size: {chunkSize} bytes" + + # Generate random data + var data = newSeq[byte](dataSize) + for i in 0 ..< dataSize: + data[i] = byte(rand(255)) + + # Actual benchmark + let startTime = cpuTime() + + for i in 1 .. iterations: + let treeResult = Poseidon2Tree.digestTree(data, chunkSize).tryGet() + + # Optionally print info about each iteration + + let endTime = cpuTime() + let totalTime = endTime - startTime + let avgTime = totalTime / iterations.float + + echo &"Results:" + echo &" Total time for {iterations} iterations: {totalTime:.6f} seconds" + echo &" Average time per iteration: {avgTime:.6f} seconds" + echo &" Iterations per second: {iterations.float / totalTime:.2f}" From 93b2df10b712b9b0afe08227af59ffc168dadcd9 Mon Sep 17 00:00:00 2001 From: munna0908 Date: Wed, 9 Jul 2025 20:40:55 +0530 Subject: [PATCH 4/7] fix segfault issues --- codex/merkletree/codex/codex.nim | 8 ++-- codex/merkletree/merkletree.nim | 74 +++++++++++++++++++++++++++-- codex/merkletree/poseidon2.nim | 6 +-- codex/slots/builder/builder.nim | 4 +- codex/utils/poseidon2digest.nim | 20 ++++---- tests/codex/node/testnode.nim | 2 + tests/codex/node/testslotrepair.nim | 3 +- 7 files changed, 95 insertions(+), 22 deletions(-) diff --git a/codex/merkletree/codex/codex.nim b/codex/merkletree/codex/codex.nim index b0892dda9b..67e1507e7d 100644 --- a/codex/merkletree/codex/codex.nim +++ b/codex/merkletree/codex/codex.nim @@ -191,7 +191,7 @@ proc init*( var tree = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec) var task = - CodexTreeTask(tree: cast[ptr ByteTree](addr tree), leaves: @leaves, signal: signal) + CodexTreeTask(tree: cast[ptr ByteTree](addr tree), leaves: leaves, signal: signal) doAssert tp.numThreads > 1, "Must have at least one separate thread or signal will never be fired" @@ -208,10 +208,10 @@ proc init*( if not task.success.load(): return failure("merkle tree task failed") - defer: - task.layers = default(Isolated[seq[seq[ByteHash]]]) + # defer: + # task.layers = default(Isolated[seq[seq[ByteHash]]]) - tree.layers = task.layers.extract + tree.layers = extractValue(task.layers) success tree diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index c973a7e1a8..bf9d4c921e 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -17,6 +17,68 @@ import pkg/chronos/threadsync import ../errors +type UniqueSeq*[T] = object + ## A unique pointer to a seq[seq[T]] in shared memory + ## Can only be moved, not copied + data: ptr seq[seq[T]] + +proc newUniqueSeq*[T](data: sink Isolated[seq[seq[T]]]): UniqueSeq[T] = + ## Creates a new unique sequence in shared memory + ## The memory is automatically freed when the object is destroyed + result.data = cast[ptr seq[seq[T]]](allocShared0(sizeof(seq[seq[T]]))) + + result.data[] = extract(data) + +proc `=destroy`*[T](p: var UniqueSeq[T]) = + ## Destructor for UniqueSeq + if p.data != nil: + # Clear the sequence to release inner sequences + p.data[].setLen(0) + echo "destroying unique seq" + deallocShared(p.data) + p.data = nil + +proc `=copy`*[T]( + dest: var UniqueSeq[T], src: UniqueSeq[T] +) {.error: "UniqueSeq cannot be copied, only moved".} + +proc `=sink`*[T](dest: var UniqueSeq[T], src: UniqueSeq[T]) = + ## Move constructor for UniqueSeq + if dest.data != nil: + `=destroy`(dest) + dest.data = src.data + # We need to nil out the source data to prevent double-free + # This is handled by Nim's destructive move semantics + +proc `[]`*[T](p: UniqueSeq[T]): lent seq[seq[T]] = + ## Access the data (read-only) + if p.data == nil: + raise newException(NilAccessDefect, "accessing nil UniqueSeq") + p.data[] + +proc `[]`*[T](p: var UniqueSeq[T]): var seq[seq[T]] = + ## Access the data (mutable) + if p.data == nil: + raise newException(NilAccessDefect, "accessing nil UniqueSeq") + p.data[] + +proc isNil*[T](p: UniqueSeq[T]): bool = + ## Check if the UniqueSeq is nil + p.data == nil + +proc extractValue*[T](p: var UniqueSeq[T]): seq[seq[T]] = + ## Extract the value from the UniqueSeq and release the memory + if p.data == nil: + raise newException(NilAccessDefect, "extracting from nil UniqueSeq") + + # Move the value out + var isolated = isolate(p.data[]) + result = extract(isolated) + + # Free the shared memory + deallocShared(p.data) + p.data = nil + type CompressFn*[H, K] = proc(x, y: H, key: K): ?!H {.noSideEffect, raises: [].} @@ -36,7 +98,7 @@ type tree*: ptr MerkleTree[H, K] leaves*: seq[H] signal*: ThreadSignalPtr - layers*: Isolated[seq[seq[H]]] + layers*: UniqueSeq[H] success*: Atomic[bool] func depth*[H, K](self: MerkleTree[H, K]): int = @@ -171,6 +233,12 @@ proc merkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} = task[].success.store(false) return - var isolatedLayers = isolate(res.get()) - task[].layers = isolatedLayers + var l = res.get() + var newOuterSeq = newSeq[seq[H]](l.len) + for i in 0 ..< l.len: + var isoInner = isolate(l[i]) + newOuterSeq[i] = extract(isoInner) + + var isolatedLayers = isolate(newOuterSeq) + task[].layers = newUniqueSeq(isolatedLayers) task[].success.store(true) diff --git a/codex/merkletree/poseidon2.nim b/codex/merkletree/poseidon2.nim index 64f9bc01af..0917276c0a 100644 --- a/codex/merkletree/poseidon2.nim +++ b/codex/merkletree/poseidon2.nim @@ -118,10 +118,10 @@ proc init*( if not task.success.load(): return failure("merkle tree task failed") - defer: - task.layers = default(Isolated[seq[seq[Poseidon2Hash]]]) + # defer: + # task.layers = default(Isolated[seq[seq[Poseidon2Hash]]]) - tree.layers = task.layers.extract + tree.layers = extractValue(task.layers) success tree diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 34c3ed9aed..a2abb80be3 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -191,11 +191,11 @@ proc getBlockDigest*[T, H]( if blk.isEmpty: return self.emptyDigestTree.root - without digest =? (await T.digest(self.taskPool, blk.data, self.cellSize.int)), err: + without dg =? (await T.digest(self.taskPool, blk.data, self.cellSize.int)), err: error "Failed to create digest for block", err = err.msg return failure(err) - return success digest + return success dg proc getCellHashes*[T, H]( self: SlotsBuilder[T, H], slotIndex: Natural diff --git a/codex/utils/poseidon2digest.nim b/codex/utils/poseidon2digest.nim index 7607aee2b3..7e7c37f28b 100644 --- a/codex/utils/poseidon2digest.nim +++ b/codex/utils/poseidon2digest.nim @@ -23,7 +23,7 @@ type DigestTask* = object bytes: seq[byte] chunkSize: int success: Atomic[bool] - digest: Isolated[Poseidon2Hash] + digest: ptr Poseidon2Hash export DigestTask @@ -85,7 +85,8 @@ proc digestWorker(tp: Taskpool, task: ptr DigestTask) {.gcsafe.} = task[].success.store(false) return - task[].digest = isolate(res.get()) + var isolatedDigest = isolate(res.get()) + task[].digest[] = extract(isolatedDigest) task[].success.store(true) proc digest*( @@ -93,14 +94,18 @@ proc digest*( ): Future[?!Poseidon2Hash] {.async: (raises: [CancelledError]).} = without signal =? ThreadSignalPtr.new(): return failure("Unable to create thread signal") - defer: signal.close().expect("closing once works") doAssert tp.numThreads > 1, "Must have at least one separate thread or signal will never be fired" - var task = DigestTask(signal: signal, bytes: bytes, chunkSize: chunkSize) + var task = DigestTask( + signal: signal, + bytes: bytes, + chunkSize: chunkSize, + digest: cast[ptr Poseidon2Hash](allocShared(sizeof(Poseidon2Hash))), + ) tp.spawn digestWorker(tp, addr task) @@ -114,11 +119,10 @@ proc digest*( if not task.success.load(): return failure("digest task failed") + var isolatedDigest = isolate(task.digest[]) + var digest = extract(isolatedDigest) defer: - task.digest = default(Isolated[Poseidon2Hash]) - - var digest = task.digest.extract - + deallocShared(task.digest) success digest func digestMhash*( diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index 46e6df3afd..d182927d69 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -178,12 +178,14 @@ asyncchecksuite "Test Node - Basic": check string.fromBytes(data) == testString test "Setup purchase request": + echo "Here the tedt" let erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool) manifest = await storeDataGetManifest(localStore, chunker) manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() protected = (await erasure.encode(manifest, 3, 2)).tryGet() + let builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = diff --git a/tests/codex/node/testslotrepair.nim b/tests/codex/node/testslotrepair.nim index 3d588a6d73..d074efca67 100644 --- a/tests/codex/node/testslotrepair.nim +++ b/tests/codex/node/testslotrepair.nim @@ -52,7 +52,6 @@ asyncchecksuite "Test Node - Slot Repair": ) var manifest: Manifest - taskPool: Taskpool builder: Poseidon2Builder verifiable: Manifest verifiableBlock: bt.Block @@ -102,7 +101,7 @@ asyncchecksuite "Test Node - Slot Repair": (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() - builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() + builder = Poseidon2Builder.new(localStore, protected, cluster.taskpool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() From 7c2662d7fe260daa0298386af1c7a806371ac794 Mon Sep 17 00:00:00 2001 From: munna0908 Date: Thu, 10 Jul 2025 18:55:43 +0530 Subject: [PATCH 5/7] use uniqueptr for safe memory managment --- codex/merkletree/codex/codex.nim | 5 +- codex/merkletree/merkletree.nim | 76 ++------------------ codex/merkletree/poseidon2.nim | 4 +- codex/nat.nim | 2 +- codex/utils/poseidon2digest.nim | 19 ++--- codex/utils/uniqueptr.nim | 58 +++++++++++++++ tests/codex/merkletree/testposeidon2tree.nim | 2 - 7 files changed, 74 insertions(+), 92 deletions(-) create mode 100644 codex/utils/uniqueptr.nim diff --git a/codex/merkletree/codex/codex.nim b/codex/merkletree/codex/codex.nim index 67e1507e7d..dc4544c046 100644 --- a/codex/merkletree/codex/codex.nim +++ b/codex/merkletree/codex/codex.nim @@ -25,6 +25,8 @@ import ../../blocktype from ../../utils/digest import digestBytes +import ../../utils/uniqueptr + import ../merkletree export merkletree @@ -208,9 +210,6 @@ proc init*( if not task.success.load(): return failure("merkle tree task failed") - # defer: - # task.layers = default(Isolated[seq[seq[ByteHash]]]) - tree.layers = extractValue(task.layers) success tree diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index bf9d4c921e..5c12f19870 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -16,68 +16,7 @@ import pkg/taskpools import pkg/chronos/threadsync import ../errors - -type UniqueSeq*[T] = object - ## A unique pointer to a seq[seq[T]] in shared memory - ## Can only be moved, not copied - data: ptr seq[seq[T]] - -proc newUniqueSeq*[T](data: sink Isolated[seq[seq[T]]]): UniqueSeq[T] = - ## Creates a new unique sequence in shared memory - ## The memory is automatically freed when the object is destroyed - result.data = cast[ptr seq[seq[T]]](allocShared0(sizeof(seq[seq[T]]))) - - result.data[] = extract(data) - -proc `=destroy`*[T](p: var UniqueSeq[T]) = - ## Destructor for UniqueSeq - if p.data != nil: - # Clear the sequence to release inner sequences - p.data[].setLen(0) - echo "destroying unique seq" - deallocShared(p.data) - p.data = nil - -proc `=copy`*[T]( - dest: var UniqueSeq[T], src: UniqueSeq[T] -) {.error: "UniqueSeq cannot be copied, only moved".} - -proc `=sink`*[T](dest: var UniqueSeq[T], src: UniqueSeq[T]) = - ## Move constructor for UniqueSeq - if dest.data != nil: - `=destroy`(dest) - dest.data = src.data - # We need to nil out the source data to prevent double-free - # This is handled by Nim's destructive move semantics - -proc `[]`*[T](p: UniqueSeq[T]): lent seq[seq[T]] = - ## Access the data (read-only) - if p.data == nil: - raise newException(NilAccessDefect, "accessing nil UniqueSeq") - p.data[] - -proc `[]`*[T](p: var UniqueSeq[T]): var seq[seq[T]] = - ## Access the data (mutable) - if p.data == nil: - raise newException(NilAccessDefect, "accessing nil UniqueSeq") - p.data[] - -proc isNil*[T](p: UniqueSeq[T]): bool = - ## Check if the UniqueSeq is nil - p.data == nil - -proc extractValue*[T](p: var UniqueSeq[T]): seq[seq[T]] = - ## Extract the value from the UniqueSeq and release the memory - if p.data == nil: - raise newException(NilAccessDefect, "extracting from nil UniqueSeq") - - # Move the value out - var isolated = isolate(p.data[]) - result = extract(isolated) - - # Free the shared memory - deallocShared(p.data) - p.data = nil +import ../utils/uniqueptr type CompressFn*[H, K] = proc(x, y: H, key: K): ?!H {.noSideEffect, raises: [].} @@ -98,7 +37,7 @@ type tree*: ptr MerkleTree[H, K] leaves*: seq[H] signal*: ThreadSignalPtr - layers*: UniqueSeq[H] + layers*: UniquePtr[seq[seq[H]]] success*: Atomic[bool] func depth*[H, K](self: MerkleTree[H, K]): int = @@ -233,12 +172,11 @@ proc merkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} = task[].success.store(false) return - var l = res.get() - var newOuterSeq = newSeq[seq[H]](l.len) - for i in 0 ..< l.len: - var isoInner = isolate(l[i]) + var layers = res.get() + var newOuterSeq = newSeq[seq[H]](layers.len) + for i in 0 ..< layers.len: + var isoInner = isolate(layers[i]) newOuterSeq[i] = extract(isoInner) - var isolatedLayers = isolate(newOuterSeq) - task[].layers = newUniqueSeq(isolatedLayers) + task[].layers = newUniquePtr(newOuterSeq) task[].success.store(true) diff --git a/codex/merkletree/poseidon2.nim b/codex/merkletree/poseidon2.nim index 0917276c0a..6feb9df44e 100644 --- a/codex/merkletree/poseidon2.nim +++ b/codex/merkletree/poseidon2.nim @@ -19,6 +19,7 @@ import pkg/constantine/platforms/abstractions import pkg/questionable/results import ../utils +import ../utils/uniqueptr import ../rng import ./merkletree @@ -118,9 +119,6 @@ proc init*( if not task.success.load(): return failure("merkle tree task failed") - # defer: - # task.layers = default(Isolated[seq[seq[Poseidon2Hash]]]) - tree.layers = extractValue(task.layers) success tree diff --git a/codex/nat.nim b/codex/nat.nim index d022dad6cb..275367df09 100644 --- a/codex/nat.nim +++ b/codex/nat.nim @@ -423,7 +423,7 @@ proc nattedAddress*( it.remapAddr(ip = newIP, port = tcp) else: # NAT mapping failed - use original address - echo "Failed to get external IP, using original address", it + error "Failed to get external IP, using original address", it discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(ipPart.get, udpPort)) it else: diff --git a/codex/utils/poseidon2digest.nim b/codex/utils/poseidon2digest.nim index 7e7c37f28b..c023928e2c 100644 --- a/codex/utils/poseidon2digest.nim +++ b/codex/utils/poseidon2digest.nim @@ -15,6 +15,7 @@ import pkg/stew/byteutils import pkg/taskpools import pkg/chronos import pkg/chronos/threadsync +import ./uniqueptr import ../merkletree @@ -23,7 +24,7 @@ type DigestTask* = object bytes: seq[byte] chunkSize: int success: Atomic[bool] - digest: ptr Poseidon2Hash + digest: UniquePtr[Poseidon2Hash] export DigestTask @@ -85,8 +86,7 @@ proc digestWorker(tp: Taskpool, task: ptr DigestTask) {.gcsafe.} = task[].success.store(false) return - var isolatedDigest = isolate(res.get()) - task[].digest[] = extract(isolatedDigest) + task[].digest = newUniquePtr(res.get()) task[].success.store(true) proc digest*( @@ -100,12 +100,7 @@ proc digest*( doAssert tp.numThreads > 1, "Must have at least one separate thread or signal will never be fired" - var task = DigestTask( - signal: signal, - bytes: bytes, - chunkSize: chunkSize, - digest: cast[ptr Poseidon2Hash](allocShared(sizeof(Poseidon2Hash))), - ) + var task = DigestTask(signal: signal, bytes: bytes, chunkSize: chunkSize) tp.spawn digestWorker(tp, addr task) @@ -119,11 +114,7 @@ proc digest*( if not task.success.load(): return failure("digest task failed") - var isolatedDigest = isolate(task.digest[]) - var digest = extract(isolatedDigest) - defer: - deallocShared(task.digest) - success digest + success extractValue(task.digest) func digestMhash*( _: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int diff --git a/codex/utils/uniqueptr.nim b/codex/utils/uniqueptr.nim new file mode 100644 index 0000000000..2aec0d3846 --- /dev/null +++ b/codex/utils/uniqueptr.nim @@ -0,0 +1,58 @@ +import std/isolation +type UniquePtr*[T] = object + ## A unique pointer to a seq[seq[T]] in shared memory + ## Can only be moved, not copied + data: ptr T + +proc newUniquePtr*[T](data: sink Isolated[T]): UniquePtr[T] = + ## Creates a new unique sequence in shared memory + ## The memory is automatically freed when the object is destroyed + result.data = cast[ptr T](allocShared0(sizeof(T))) + result.data[] = extract(data) + +template newUniquePtr*[T](data: T): UniquePtr[T] = + newUniquePtr(isolate(data)) + +proc `=destroy`*[T](p: var UniquePtr[T]) = + ## Destructor for UniquePtr + if p.data != nil: + deallocShared(p.data) + p.data = nil + +proc `=copy`*[T]( + dest: var UniquePtr[T], src: UniquePtr[T] +) {.error: "UniquePtr cannot be copied, only moved".} + +proc `=sink`*[T](dest: var UniquePtr[T], src: UniquePtr[T]) = + if dest.data != nil: + `=destroy`(dest) + dest.data = src.data + # We need to nil out the source data to prevent double-free + # This is handled by Nim's destructive move semantics + +proc `[]`*[T](p: UniquePtr[T]): lent T = + ## Access the data (read-only) + if p.data == nil: + raise newException(NilAccessDefect, "accessing nil UniquePtr") + p.data[] + +# proc `[]`*[T](p: var UniquePtr[T]): var T = +# ## Access the data (mutable) +# if p.data == nil: +# raise newException(NilAccessDefect, "accessing nil UniquePtr") +# p.data[] + +proc isNil*[T](p: UniquePtr[T]): bool = + ## Check if the UniquePtr is nil + p.data == nil + +proc extractValue*[T](p: var UniquePtr[T]): T = + ## Extract the value from the UniquePtr and release the memory + if p.data == nil: + raise newException(NilAccessDefect, "extracting from nil UniquePtr") + # Move the value out + var isolated = isolate(p.data[]) + result = extract(isolated) + # Free the shared memory + deallocShared(p.data) + p.data = nil diff --git a/tests/codex/merkletree/testposeidon2tree.nim b/tests/codex/merkletree/testposeidon2tree.nim index f574d63775..45a727e5b4 100644 --- a/tests/codex/merkletree/testposeidon2tree.nim +++ b/tests/codex/merkletree/testposeidon2tree.nim @@ -63,11 +63,9 @@ suite "Test Poseidon2Tree": tree == fromNodes test "Build poseidon2 tree from poseidon2 leaves asynchronously": - echo "Build poseidon2 tree from poseidon2 leaves asynchronously" var tp = Taskpool.new() defer: tp.shutdown() - echo "@@@@@" let tree = (await Poseidon2Tree.init(tp, leaves = expectedLeaves)).tryGet() check: From 43150bdbed54c4769133e15529160cb2fce33c04 Mon Sep 17 00:00:00 2001 From: munna0908 Date: Thu, 10 Jul 2025 19:16:08 +0530 Subject: [PATCH 6/7] fix log stmt --- codex/nat.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex/nat.nim b/codex/nat.nim index 275367df09..19ec76e8d8 100644 --- a/codex/nat.nim +++ b/codex/nat.nim @@ -423,7 +423,7 @@ proc nattedAddress*( it.remapAddr(ip = newIP, port = tcp) else: # NAT mapping failed - use original address - error "Failed to get external IP, using original address", it + error "Failed to get external IP, using original address" discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(ipPart.get, udpPort)) it else: From 20b6d76b4203c1e76a1c10f05425dea7112b9af8 Mon Sep 17 00:00:00 2001 From: munna0908 Date: Thu, 10 Jul 2025 19:48:04 +0530 Subject: [PATCH 7/7] code cleanup --- codex/nat.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex/nat.nim b/codex/nat.nim index 19ec76e8d8..d022dad6cb 100644 --- a/codex/nat.nim +++ b/codex/nat.nim @@ -423,7 +423,7 @@ proc nattedAddress*( it.remapAddr(ip = newIP, port = tcp) else: # NAT mapping failed - use original address - error "Failed to get external IP, using original address" + echo "Failed to get external IP, using original address", it discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(ipPart.get, udpPort)) it else: