diff --git a/codex/codex.nim b/codex/codex.nim index 8135746410..928305c149 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -56,7 +56,7 @@ type codexNode: CodexNodeRef repoStore: RepoStore maintenance: BlockMaintainer - taskpool: Taskpool + taskPool: Taskpool CodexPrivateKey* = libp2p.PrivateKey # alias EthWallet = ethers.Wallet @@ -194,8 +194,8 @@ proc stop*(s: CodexServer) {.async.} = error "Failed to stop codex node", failures = res.failure.len raiseAssert "Failed to stop codex node" - if not s.taskpool.isNil: - s.taskpool.shutdown() + if not s.taskPool.isNil: + s.taskPool.shutdown() proc new*( T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey @@ -216,16 +216,16 @@ proc new*( var cache: CacheStore = nil - taskpool: Taskpool + taskPool: Taskpool try: if config.numThreads == ThreadCount(0): - taskpool = Taskpool.new(numThreads = min(countProcessors(), 16)) + taskPool = Taskpool.new(numThreads = min(countProcessors(), 16)) else: - taskpool = Taskpool.new(numThreads = int(config.numThreads)) - info "Threadpool started", numThreads = taskpool.numThreads + taskPool = Taskpool.new(numThreads = int(config.numThreads)) + info "Threadpool started", numThreads = taskPool.numThreads except CatchableError as exc: - raiseAssert("Failure in taskpool initialization:" & exc.msg) + raiseAssert("Failure in taskPool initialization:" & exc.msg) if config.cacheSize > 0'nb: cache = CacheStore.new(cacheSize = config.cacheSize) @@ -307,7 +307,7 @@ proc new*( if config.prover: let backend = config.initializeBackend().expect("Unable to create prover backend.") - some Prover.new(store, backend, config.numProofSamples) + some Prover.new(store, backend, config.numProofSamples, taskPool) else: none Prover @@ -317,7 +317,7 @@ proc new*( engine = engine, discovery = discovery, prover = prover, - taskPool = taskpool, + taskPool = taskPool, ) restServer = RestServerRef @@ -337,5 +337,5 @@ proc new*( restServer: restServer, repoStore: repoStore, maintenance: maintenance, - taskpool: taskpool, + taskPool: taskPool, ) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 95516500f9..d378b3c38e 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -428,7 +428,7 @@ proc encodeData( return failure("Unable to store block!") idx.inc(params.steps) - without tree =? CodexTree.init(cids[]), err: + without tree =? (await CodexTree.init(self.taskPool, cids[])), err: return failure(err) without treeCid =? tree.rootCid, err: @@ -649,7 +649,8 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = without (cids, recoveredIndices) =? (await self.decodeInternal(encoded)), err: return failure(err) - without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err: + without tree =? + (await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err: return failure(err) without treeCid =? tree.rootCid, err: @@ -680,7 +681,8 @@ proc repair*(self: Erasure, encoded: Manifest): Future[?!void] {.async.} = without (cids, _) =? (await self.decodeInternal(encoded)), err: return failure(err) - without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err: + without tree =? + (await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err: return failure(err) without treeCid =? tree.rootCid, err: diff --git a/codex/merkletree/codex/codex.nim b/codex/merkletree/codex/codex.nim index 0eec92e4c2..dc4544c046 100644 --- a/codex/merkletree/codex/codex.nim +++ b/codex/merkletree/codex/codex.nim @@ -10,12 +10,14 @@ {.push raises: [].} import std/bitops -import std/sequtils +import std/[atomics, sequtils] import pkg/questionable import pkg/questionable/results import pkg/libp2p/[cid, multicodec, multihash] import pkg/constantine/hashes +import pkg/taskpools +import pkg/chronos/threadsync import ../../utils import ../../rng import ../../errors @@ -23,6 +25,8 @@ import ../../blocktype from ../../utils/digest import digestBytes +import ../../utils/uniqueptr + import ../merkletree export merkletree @@ -41,6 +45,8 @@ type ByteTree* = MerkleTree[ByteHash, ByteTreeKey] ByteProof* = MerkleProof[ByteHash, ByteTreeKey] + CodexTreeTask* = MerkleTask[ByteHash, ByteTreeKey] + CodexTree* = ref object of ByteTree mcodec*: MultiCodec @@ -48,7 +54,7 @@ type mcodec*: MultiCodec # CodeHashes is not exported from libp2p -# So we need to recreate it instead of +# So we need to recreate it instead of proc initMultiHashCodeTable(): Table[MultiCodec, MHash] {.compileTime.} = for item in HashesList: result[item.mcodec] = item @@ -160,6 +166,54 @@ func init*( self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true) success self +proc init*( + _: type CodexTree, + tp: Taskpool, + mcodec: MultiCodec = Sha256HashCodec, + leaves: seq[ByteHash], +): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = + if leaves.len == 0: + return failure "Empty leaves" + + let + mhash = ?mcodec.mhash() + compressor = proc(x, y: seq[byte], key: ByteTreeKey): ?!ByteHash {.noSideEffect.} = + compress(x, y, key, mhash) + Zero: ByteHash = newSeq[byte](mhash.size) + + if mhash.size != leaves[0].len: + return failure "Invalid hash length" + + without signal =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + signal.close().expect("closing once works") + + var tree = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec) + + var task = + CodexTreeTask(tree: cast[ptr ByteTree](addr tree), leaves: leaves, signal: signal) + + doAssert tp.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + + tp.spawn merkleTreeWorker(addr task) + + let threadFut = signal.wait() + + if err =? catch(await threadFut.join()).errorOption: + ?catch(await noCancel threadFut) + if err of CancelledError: + raise (ref CancelledError) err + + if not task.success.load(): + return failure("merkle tree task failed") + + tree.layers = extractValue(task.layers) + + success tree + func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree = if leaves.len == 0: return failure "Empty leaves" @@ -170,6 +224,18 @@ func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree = CodexTree.init(mcodec, leaves) +proc init*( + _: type CodexTree, tp: Taskpool, leaves: seq[MultiHash] +): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = + if leaves.len == 0: + return failure "Empty leaves" + + let + mcodec = leaves[0].mcodec + leaves = leaves.mapIt(it.digestBytes) + + await CodexTree.init(tp, mcodec, leaves) + func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree = if leaves.len == 0: return failure "Empty leaves" @@ -180,6 +246,18 @@ func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree = CodexTree.init(mcodec, leaves) +proc init*( + _: type CodexTree, tp: Taskpool, leaves: seq[Cid] +): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = + if leaves.len == 0: + return failure("Empty leaves") + + let + mcodec = (?leaves[0].mhash.mapFailure).mcodec + leaves = leaves.mapIt((?it.mhash.mapFailure).digestBytes) + + await CodexTree.init(tp, mcodec, leaves) + proc fromNodes*( _: type CodexTree, mcodec: MultiCodec = Sha256HashCodec, diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index f1905becbd..5c12f19870 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -9,11 +9,14 @@ {.push raises: [].} -import std/bitops +import std/[bitops, atomics] import pkg/questionable/results +import pkg/taskpools +import pkg/chronos/threadsync import ../errors +import ../utils/uniqueptr type CompressFn*[H, K] = proc(x, y: H, key: K): ?!H {.noSideEffect, raises: [].} @@ -30,6 +33,13 @@ type compress*: CompressFn[H, K] # compress function zero*: H # zero value + MerkleTask*[H, K] = object + tree*: ptr MerkleTree[H, K] + leaves*: seq[H] + signal*: ThreadSignalPtr + layers*: UniquePtr[seq[seq[H]]] + success*: Atomic[bool] + func depth*[H, K](self: MerkleTree[H, K]): int = return self.layers.len - 1 @@ -151,3 +161,22 @@ func merkleTreeWorker*[H, K]( ys[halfn] = ?self.compress(xs[n], self.zero, key = key) success @[@xs] & ?self.merkleTreeWorker(ys, isBottomLayer = false) + +proc merkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} = + defer: + discard task[].signal.fireSync() + + let res = merkleTreeWorker(task[].tree[], task[].leaves, isBottomLayer = true) + + if res.isErr: + task[].success.store(false) + return + + var layers = res.get() + var newOuterSeq = newSeq[seq[H]](layers.len) + for i in 0 ..< layers.len: + var isoInner = isolate(layers[i]) + newOuterSeq[i] = extract(isoInner) + + task[].layers = newUniquePtr(newOuterSeq) + task[].success.store(true) diff --git a/codex/merkletree/poseidon2.nim b/codex/merkletree/poseidon2.nim index 56ad1e4d46..6feb9df44e 100644 --- a/codex/merkletree/poseidon2.nim +++ b/codex/merkletree/poseidon2.nim @@ -9,14 +9,17 @@ {.push raises: [].} -import std/sequtils +import std/[sequtils, atomics] import pkg/poseidon2 +import pkg/taskpools +import pkg/chronos/threadsync import pkg/constantine/math/io/io_fields import pkg/constantine/platforms/abstractions import pkg/questionable/results import ../utils +import ../utils/uniqueptr import ../rng import ./merkletree @@ -44,6 +47,8 @@ type Poseidon2Tree* = MerkleTree[Poseidon2Hash, PoseidonKeysEnum] Poseidon2Proof* = MerkleProof[Poseidon2Hash, PoseidonKeysEnum] + Poseidon2TreeTask* = MerkleTask[Poseidon2Hash, PoseidonKeysEnum] + proc `$`*(self: Poseidon2Tree): string = let root = if self.root.isOk: self.root.get.toHex else: "none" "Poseidon2Tree(" & " root: " & root & ", leavesCount: " & $self.leavesCount & @@ -77,9 +82,55 @@ func init*(_: type Poseidon2Tree, leaves: openArray[Poseidon2Hash]): ?!Poseidon2 self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true) success self +proc init*( + _: type Poseidon2Tree, tp: Taskpool, leaves: seq[Poseidon2Hash] +): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} = + if leaves.len == 0: + return failure "Empty leaves" + + let compressor = proc( + x, y: Poseidon2Hash, key: PoseidonKeysEnum + ): ?!Poseidon2Hash {.noSideEffect.} = + success compress(x, y, key.toKey) + + without signal =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + signal.close().expect("closing once works") + + var tree = Poseidon2Tree(compress: compressor, zero: Poseidon2Zero) + var task = Poseidon2TreeTask( + tree: cast[ptr Poseidon2Tree](addr tree), leaves: leaves, signal: signal + ) + + doAssert tp.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + + tp.spawn merkleTreeWorker(addr task) + + let threadFut = signal.wait() + + if err =? catch(await threadFut.join()).errorOption: + ?catch(await noCancel threadFut) + if err of CancelledError: + raise (ref CancelledError) err + + if not task.success.load(): + return failure("merkle tree task failed") + + tree.layers = extractValue(task.layers) + + success tree + func init*(_: type Poseidon2Tree, leaves: openArray[array[31, byte]]): ?!Poseidon2Tree = Poseidon2Tree.init(leaves.mapIt(Poseidon2Hash.fromBytes(it))) +proc init*( + _: type Poseidon2Tree, tp: Taskpool, leaves: seq[array[31, byte]] +): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} = + await Poseidon2Tree.init(tp, leaves.mapIt(Poseidon2Hash.fromBytes(it))) + proc fromNodes*( _: type Poseidon2Tree, nodes: openArray[Poseidon2Hash], nleaves: int ): ?!Poseidon2Tree = diff --git a/codex/node.nim b/codex/node.nim index e010b08540..724f8bf372 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -72,7 +72,7 @@ type contracts*: Contracts clock*: Clock storage*: Contracts - taskpool: Taskpool + taskPool: Taskpool trackedFutures: TrackedFutures CodexNodeRef* = ref CodexNode @@ -294,7 +294,7 @@ proc streamEntireDataset( try: # Spawn an erasure decoding job let erasure = Erasure.new( - self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskPool ) without _ =? (await erasure.decode(manifest)), error: error "Unable to erasure decode manifest", manifestCid, exc = error.msg @@ -439,7 +439,7 @@ proc store*( finally: await stream.close() - without tree =? CodexTree.init(cids), err: + without tree =? (await CodexTree.init(self.taskPool, cids)), err: return failure(err) without treeCid =? tree.rootCid(CIDv1, dataCodec), err: @@ -533,14 +533,15 @@ proc setupRequest( # Erasure code the dataset according to provided parameters let erasure = Erasure.new( - self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskPool ) without encoded =? (await erasure.encode(manifest, ecK, ecM)), error: trace "Unable to erasure code dataset" return failure(error) - without builder =? Poseidon2Builder.new(self.networkStore.localStore, encoded), err: + without builder =? + Poseidon2Builder.new(self.networkStore.localStore, encoded, self.taskPool), err: trace "Unable to create slot builder" return failure(err) @@ -644,7 +645,9 @@ proc onStore( return failure(err) without builder =? - Poseidon2Builder.new(self.networkStore, manifest, manifest.verifiableStrategy), err: + Poseidon2Builder.new( + self.networkStore, manifest, self.taskPool, manifest.verifiableStrategy + ), err: trace "Unable to create slots builder", err = err.msg return failure(err) @@ -679,7 +682,7 @@ proc onStore( trace "start repairing slot", slotIdx try: let erasure = Erasure.new( - self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskPool ) if err =? (await erasure.repair(manifest)).errorOption: error "Unable to erasure decode repairing manifest", @@ -880,7 +883,7 @@ proc new*( networkStore: NetworkStore, engine: BlockExcEngine, discovery: Discovery, - taskpool: Taskpool, + taskPool: Taskpool, prover = Prover.none, contracts = Contracts.default, ): CodexNodeRef = @@ -893,7 +896,7 @@ proc new*( engine: engine, prover: prover, discovery: discovery, - taskPool: taskpool, + taskPool: taskPool, contracts: contracts, trackedFutures: TrackedFutures(), ) diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 5fbb0fe191..a2abb80be3 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -18,18 +18,20 @@ import pkg/chronos import pkg/questionable import pkg/questionable/results import pkg/constantine/math/io/io_fields +import pkg/taskpools import ../../logutils import ../../utils import ../../stores import ../../manifest import ../../merkletree +import ../../utils/poseidon2digest import ../../utils/asynciter import ../../indexingstrategy import ../converters -export converters, asynciter +export converters, asynciter, poseidon2digest logScope: topics = "codex slotsbuilder" @@ -45,6 +47,7 @@ type SlotsBuilder*[T, H] = ref object of RootObj emptyBlock: seq[byte] # empty block verifiableTree: ?T # verification tree (dataset tree) emptyDigestTree: T # empty digest tree for empty blocks + taskPool: Taskpool func verifiable*[T, H](self: SlotsBuilder[T, H]): bool {.inline.} = ## Returns true if the slots are verifiable. @@ -165,6 +168,35 @@ proc buildBlockTree*[T, H]( success (blk.data, tree) +proc getBlockDigest*[T, H]( + self: SlotsBuilder[T, H], blkIdx: Natural, slotPos: Natural +): Future[?!H] {.async: (raises: [CancelledError]).} = + logScope: + blkIdx = blkIdx + slotPos = slotPos + numSlotBlocks = self.manifest.numSlotBlocks + cellSize = self.cellSize + + trace "Building block tree" + + if slotPos > (self.manifest.numSlotBlocks - 1): + # pad blocks are 0 byte blocks + trace "Returning empty digest tree for pad block" + return self.emptyDigestTree.root + + without blk =? await self.store.getBlock(self.manifest.treeCid, blkIdx), err: + error "Failed to get block CID for tree at index", err = err.msg + return failure(err) + + if blk.isEmpty: + return self.emptyDigestTree.root + + without dg =? (await T.digest(self.taskPool, blk.data, self.cellSize.int)), err: + error "Failed to create digest for block", err = err.msg + return failure(err) + + return success dg + proc getCellHashes*[T, H]( self: SlotsBuilder[T, H], slotIndex: Natural ): Future[?!seq[H]] {.async: (raises: [CancelledError, IndexingError]).} = @@ -190,8 +222,7 @@ proc getCellHashes*[T, H]( pos = i trace "Getting block CID for tree at index" - without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root, - err: + without digest =? (await self.getBlockDigest(blkIdx, i)), err: error "Failed to get block CID for tree at index", err = err.msg return failure(err) @@ -310,6 +341,7 @@ proc new*[T, H]( _: type SlotsBuilder[T, H], store: BlockStore, manifest: Manifest, + taskPool: Taskpool, strategy = LinearStrategy, cellSize = DefaultCellSize, ): ?!SlotsBuilder[T, H] = @@ -383,6 +415,7 @@ proc new*[T, H]( emptyBlock: emptyBlock, numSlotBlocks: numSlotBlocksTotal, emptyDigestTree: emptyDigestTree, + taskPool: taskPool, ) if manifest.verifiable: diff --git a/codex/slots/proofs/prover.nim b/codex/slots/proofs/prover.nim index 1afcd06841..bba39e8c84 100644 --- a/codex/slots/proofs/prover.nim +++ b/codex/slots/proofs/prover.nim @@ -13,6 +13,7 @@ import pkg/chronicles import pkg/circomcompat import pkg/poseidon2 import pkg/questionable/results +import pkg/taskpools import pkg/libp2p/cid @@ -47,6 +48,7 @@ type backend: AnyBackend store: BlockStore nSamples: int + taskPool: Taskpool proc prove*( self: Prover, slotIdx: int, manifest: Manifest, challenge: ProofChallenge @@ -61,7 +63,7 @@ proc prove*( trace "Received proof challenge" - without builder =? AnyBuilder.new(self.store, manifest), err: + without builder =? AnyBuilder.new(self.store, manifest, self.taskPool), err: error "Unable to create slots builder", err = err.msg return failure(err) @@ -88,6 +90,6 @@ proc verify*( self.backend.verify(proof, inputs) proc new*( - _: type Prover, store: BlockStore, backend: AnyBackend, nSamples: int + _: type Prover, store: BlockStore, backend: AnyBackend, nSamples: int, tp: Taskpool ): Prover = - Prover(store: store, backend: backend, nSamples: nSamples) + Prover(store: store, backend: backend, nSamples: nSamples, taskPool: tp) diff --git a/codex/utils/poseidon2digest.nim b/codex/utils/poseidon2digest.nim index 6eaf21e98a..c023928e2c 100644 --- a/codex/utils/poseidon2digest.nim +++ b/codex/utils/poseidon2digest.nim @@ -7,13 +7,27 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import std/[atomics] import pkg/poseidon2 import pkg/questionable/results import pkg/libp2p/multihash import pkg/stew/byteutils +import pkg/taskpools +import pkg/chronos +import pkg/chronos/threadsync +import ./uniqueptr import ../merkletree +type DigestTask* = object + signal: ThreadSignalPtr + bytes: seq[byte] + chunkSize: int + success: Atomic[bool] + digest: UniquePtr[Poseidon2Hash] + +export DigestTask + func spongeDigest*( _: type Poseidon2Hash, bytes: openArray[byte], rate: static int = 2 ): ?!Poseidon2Hash = @@ -30,7 +44,7 @@ func spongeDigest*( success Sponge.digest(bytes, rate) -func digestTree*( +proc digestTree*( _: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int ): ?!Poseidon2Tree = ## Hashes chunks of data with a sponge of rate 2, and combines the @@ -44,6 +58,7 @@ func digestTree*( var index = 0 var leaves: seq[Poseidon2Hash] + while index < bytes.len: let start = index let finish = min(index + chunkSize, bytes.len) @@ -61,6 +76,46 @@ func digest*( (?Poseidon2Tree.digestTree(bytes, chunkSize)).root +proc digestWorker(tp: Taskpool, task: ptr DigestTask) {.gcsafe.} = + defer: + discard task[].signal.fireSync() + + var res = Poseidon2Tree.digest(task[].bytes, task[].chunkSize) + + if res.isErr: + task[].success.store(false) + return + + task[].digest = newUniquePtr(res.get()) + task[].success.store(true) + +proc digest*( + _: type Poseidon2Tree, tp: Taskpool, bytes: seq[byte], chunkSize: int +): Future[?!Poseidon2Hash] {.async: (raises: [CancelledError]).} = + without signal =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + defer: + signal.close().expect("closing once works") + + doAssert tp.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + + var task = DigestTask(signal: signal, bytes: bytes, chunkSize: chunkSize) + + tp.spawn digestWorker(tp, addr task) + + let signalFut = signal.wait() + + if err =? catch(await signalFut.join()).errorOption: + ?catch(await noCancel signalFut) + if err of CancelledError: + raise (ref CancelledError) err + + if not task.success.load(): + return failure("digest task failed") + + success extractValue(task.digest) + func digestMhash*( _: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int ): ?!MultiHash = diff --git a/codex/utils/uniqueptr.nim b/codex/utils/uniqueptr.nim new file mode 100644 index 0000000000..2aec0d3846 --- /dev/null +++ b/codex/utils/uniqueptr.nim @@ -0,0 +1,58 @@ +import std/isolation +type UniquePtr*[T] = object + ## A unique pointer to a seq[seq[T]] in shared memory + ## Can only be moved, not copied + data: ptr T + +proc newUniquePtr*[T](data: sink Isolated[T]): UniquePtr[T] = + ## Creates a new unique sequence in shared memory + ## The memory is automatically freed when the object is destroyed + result.data = cast[ptr T](allocShared0(sizeof(T))) + result.data[] = extract(data) + +template newUniquePtr*[T](data: T): UniquePtr[T] = + newUniquePtr(isolate(data)) + +proc `=destroy`*[T](p: var UniquePtr[T]) = + ## Destructor for UniquePtr + if p.data != nil: + deallocShared(p.data) + p.data = nil + +proc `=copy`*[T]( + dest: var UniquePtr[T], src: UniquePtr[T] +) {.error: "UniquePtr cannot be copied, only moved".} + +proc `=sink`*[T](dest: var UniquePtr[T], src: UniquePtr[T]) = + if dest.data != nil: + `=destroy`(dest) + dest.data = src.data + # We need to nil out the source data to prevent double-free + # This is handled by Nim's destructive move semantics + +proc `[]`*[T](p: UniquePtr[T]): lent T = + ## Access the data (read-only) + if p.data == nil: + raise newException(NilAccessDefect, "accessing nil UniquePtr") + p.data[] + +# proc `[]`*[T](p: var UniquePtr[T]): var T = +# ## Access the data (mutable) +# if p.data == nil: +# raise newException(NilAccessDefect, "accessing nil UniquePtr") +# p.data[] + +proc isNil*[T](p: UniquePtr[T]): bool = + ## Check if the UniquePtr is nil + p.data == nil + +proc extractValue*[T](p: var UniquePtr[T]): T = + ## Extract the value from the UniquePtr and release the memory + if p.data == nil: + raise newException(NilAccessDefect, "extracting from nil UniquePtr") + # Move the value out + var isolated = isolate(p.data[]) + result = extract(isolated) + # Free the shared memory + deallocShared(p.data) + p.data = nil diff --git a/tests/codex/merkletree/testcodextree.nim b/tests/codex/merkletree/testcodextree.nim index 29390c168b..9ab680cebd 100644 --- a/tests/codex/merkletree/testcodextree.nim +++ b/tests/codex/merkletree/testcodextree.nim @@ -1,6 +1,6 @@ import std/sequtils +import std/times -import pkg/unittest2 import pkg/questionable/results import pkg/stew/byteutils import pkg/libp2p @@ -9,8 +9,11 @@ import pkg/codex/codextypes import pkg/codex/merkletree import pkg/codex/utils/digest +import pkg/taskpools + import ./helpers import ./generictreetests +import ../../asynctest # TODO: Generalize to other hashes @@ -43,9 +46,23 @@ suite "Test CodexTree": CodexTree.init(sha256, leaves = newSeq[ByteHash]()).isErr test "Should build tree from multihash leaves": - var expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) + var + expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) + tree = CodexTree.init(leaves = expectedLeaves) + + check: + tree.isOk + tree.get().leaves == expectedLeaves.mapIt(it.digestBytes) + tree.get().mcodec == sha256 - var tree = CodexTree.init(leaves = expectedLeaves) + test "Should build tree from multihash leaves asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) + + let tree = (await CodexTree.init(tp, leaves = expectedLeaves)) check: tree.isOk tree.get().leaves == expectedLeaves.mapIt(it.digestBytes) @@ -63,6 +80,22 @@ suite "Test CodexTree": tree.get().leaves == expectedLeaves.mapIt(it.mhash.tryGet.digestBytes) tree.get().mcodec == sha256 + test "Should build tree from cid leaves asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let expectedLeaves = data.mapIt( + Cid.init(CidVersion.CIDv1, BlockCodec, MultiHash.digest($sha256, it).tryGet).tryGet + ) + + let tree = (await CodexTree.init(tp, leaves = expectedLeaves)) + + check: + tree.isOk + tree.get().leaves == expectedLeaves.mapIt(it.mhash.tryGet.digestBytes) + tree.get().mcodec == sha256 + test "Should build from raw digestbytes (should not hash leaves)": let tree = CodexTree.init(sha256, leaves = data).tryGet @@ -70,6 +103,18 @@ suite "Test CodexTree": tree.mcodec == sha256 tree.leaves == data + test "Should build from raw digestbytes (should not hash leaves) asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let tree = (await CodexTree.init(tp, sha256, leaves = @data)) + + check: + tree.isOk + tree.get().mcodec == sha256 + tree.get().leaves == data + test "Should build from nodes": let tree = CodexTree.init(sha256, leaves = data).tryGet diff --git a/tests/codex/merkletree/testposeidon2tree.nim b/tests/codex/merkletree/testposeidon2tree.nim index e12751b713..45a727e5b4 100644 --- a/tests/codex/merkletree/testposeidon2tree.nim +++ b/tests/codex/merkletree/testposeidon2tree.nim @@ -1,6 +1,5 @@ import std/sequtils -import pkg/unittest2 import pkg/poseidon2 import pkg/poseidon2/io import pkg/questionable/results @@ -9,9 +8,11 @@ import pkg/stew/byteutils import pkg/stew/arrayops import pkg/codex/merkletree +import pkg/taskpools import ./generictreetests import ./helpers +import ../../asynctest const data = [ "0000000000000000000000000000001".toBytes, @@ -36,13 +37,14 @@ suite "Test Poseidon2Tree": check: Poseidon2Tree.init(leaves = newSeq[Poseidon2Hash](0)).isErr - test "Init tree from poseidon2 leaves": - let tree = Poseidon2Tree.init(leaves = expectedLeaves).tryGet + test "Build tree from poseidon2 leaves": + var taskpool = Taskpool.new(numThreads = 2) + let tree = (await Poseidon2Tree.init(taskpool, leaves = expectedLeaves)).tryGet() check: tree.leaves == expectedLeaves - test "Init tree from byte leaves": + test "Build tree from byte leaves": let tree = Poseidon2Tree.init( leaves = expectedLeaves.mapIt(array[31, byte].initCopyFrom(it.toBytes)) ).tryGet @@ -50,7 +52,7 @@ suite "Test Poseidon2Tree": check: tree.leaves == expectedLeaves - test "Should build from nodes": + test "Build tree from nodes": let tree = Poseidon2Tree.init(leaves = expectedLeaves).tryGet fromNodes = Poseidon2Tree.fromNodes( @@ -60,6 +62,30 @@ suite "Test Poseidon2Tree": check: tree == fromNodes + test "Build poseidon2 tree from poseidon2 leaves asynchronously": + var tp = Taskpool.new() + defer: + tp.shutdown() + + let tree = (await Poseidon2Tree.init(tp, leaves = expectedLeaves)).tryGet() + check: + tree.leaves == expectedLeaves + + test "Build poseidon2 tree from byte leaves asynchronously": + echo "Build poseidon2 tree from byte leaves asynchronously" + var tp = Taskpool.new() + defer: + tp.shutdown() + + let tree = ( + await Poseidon2Tree.init( + tp, leaves = expectedLeaves.mapIt(array[31, byte].initCopyFrom(it.toBytes)) + ) + ).tryGet() + + check: + tree.leaves == expectedLeaves + let compressor = proc( x, y: Poseidon2Hash, key: PoseidonKeysEnum diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index e8d9c743e4..fcb91e8f70 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -56,12 +56,13 @@ asyncchecksuite "Test Node - Host contracts": verifiable: Manifest verifiableBlock: bt.Block protected: Manifest + taskPool: Taskpool setup: # Setup Host Contracts and dependencies market = MockMarket.new() sales = Sales.new(market, clock, localStore) - + taskPool = Taskpool.new() node.contracts = ( none ClientInteractions, some HostInteractions.new(clock, sales), @@ -75,20 +76,23 @@ asyncchecksuite "Test Node - Host contracts": let manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new) + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool) manifestCid = manifestBlock.cid (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, 3, 2)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() (await localStore.putBlock(verifiableBlock)).tryGet() + teardown: + taskPool.shutdown() + test "onExpiryUpdate callback is set": check sales.onExpiryUpdate.isSome diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index 78298ad758..d182927d69 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -47,10 +47,15 @@ privateAccess(CodexNodeRef) # enable access to private fields asyncchecksuite "Test Node - Basic": setupAndTearDown() + var taskPool: Taskpool setup: + taskPool = Taskpool.new() await node.start() + teardown: + taskPool.shutdown() + test "Fetch Manifest": let manifest = await storeDataGetManifest(localStore, chunker) @@ -173,14 +178,15 @@ asyncchecksuite "Test Node - Basic": check string.fromBytes(data) == testString test "Setup purchase request": + echo "Here the tedt" let - erasure = - Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new()) + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool) manifest = await storeDataGetManifest(localStore, chunker) manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() protected = (await erasure.encode(manifest, 3, 2)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + let + builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() diff --git a/tests/codex/node/testslotrepair.nim b/tests/codex/node/testslotrepair.nim index d96078d292..d074efca67 100644 --- a/tests/codex/node/testslotrepair.nim +++ b/tests/codex/node/testslotrepair.nim @@ -13,6 +13,7 @@ import pkg/codex/contracts import pkg/codex/slots import pkg/codex/manifest import pkg/codex/erasure +import pkg/taskpools import pkg/codex/blocktype as bt import pkg/chronos/transports/stream @@ -100,7 +101,7 @@ asyncchecksuite "Test Node - Slot Repair": (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, cluster.taskpool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() @@ -118,6 +119,7 @@ asyncchecksuite "Test Node - Slot Repair": await nodes[1].switch.stop() # slot 0 missing now # repair missing slot + (await nodes[4].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() await nodes[2].switch.stop() # slot 1 missing now @@ -131,16 +133,19 @@ asyncchecksuite "Test Node - Slot Repair": await nodes[4].switch.stop() # slot 0 missing now # repair missing slot from repaired slots + (await nodes[7].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() await nodes[5].switch.stop() # slot 1 missing now # repair missing slot from repaired slots + (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet() await nodes[6].switch.stop() # slot 2 missing now # repair missing slot from repaired slots + (await nodes[9].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet() let @@ -179,7 +184,7 @@ asyncchecksuite "Test Node - Slot Repair": (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, cluster.taskpool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() @@ -198,19 +203,24 @@ asyncchecksuite "Test Node - Slot Repair": await nodes[3].switch.stop() # slot 2 missing now # repair missing slots + (await nodes[6].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() + (await nodes[7].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet() await nodes[2].switch.stop() # slot 1 missing now await nodes[4].switch.stop() # slot 3 missing now # repair missing slots from repaired slots + (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet() + (await nodes[9].onStore(request, expiry, 3.uint64, nil, isRepairing = true)).tryGet() await nodes[5].switch.stop() # slot 4 missing now # repair missing slot from repaired slots + (await nodes[10].onStore(request, expiry, 4.uint64, nil, isRepairing = true)).tryGet() let diff --git a/tests/codex/slots/backends/testcircomcompat.nim b/tests/codex/slots/backends/testcircomcompat.nim index b61d4f188c..637ee36b18 100644 --- a/tests/codex/slots/backends/testcircomcompat.nim +++ b/tests/codex/slots/backends/testcircomcompat.nim @@ -3,6 +3,7 @@ import std/options import ../../../asynctest import pkg/chronos +import pkg/taskpools import pkg/poseidon2 import pkg/serde/json @@ -77,6 +78,7 @@ suite "Test Circom Compat Backend": challenge: array[32, byte] builder: Poseidon2Builder sampler: Poseidon2Sampler + taskPool: Taskpool setup: let @@ -85,11 +87,13 @@ suite "Test Circom Compat Backend": store = RepoStore.new(repoDs, metaDs) + taskPool = Taskpool.new() + (manifest, protected, verifiable) = await createVerifiableManifest( - store, numDatasetBlocks, ecK, ecM, blockSize, cellSize + store, numDatasetBlocks, ecK, ecM, blockSize, cellSize, taskPool ) - builder = Poseidon2Builder.new(store, verifiable).tryGet + builder = Poseidon2Builder.new(store, verifiable, taskPool).tryGet sampler = Poseidon2Sampler.new(slotId, store, builder).tryGet circom = CircomCompat.init(r1cs, wasm, zkey) @@ -101,6 +105,7 @@ suite "Test Circom Compat Backend": circom.release() # this comes from the rust FFI await repoTmp.destroyDb() await metaTmp.destroyDb() + taskPool.shutdown() test "Should verify with correct input": var proof = circom.prove(proofInputs).tryGet diff --git a/tests/codex/slots/helpers.nim b/tests/codex/slots/helpers.nim index fced1f1c40..01159c217d 100644 --- a/tests/codex/slots/helpers.nim +++ b/tests/codex/slots/helpers.nim @@ -12,6 +12,7 @@ import pkg/codex/chunker import pkg/codex/indexingstrategy import pkg/codex/slots import pkg/codex/rng +import pkg/taskpools import ../helpers @@ -145,6 +146,7 @@ proc createVerifiableManifest*( ecM: int, blockSize: NBytes, cellSize: NBytes, + taskPool: Taskpool, ): Future[tuple[manifest: Manifest, protected: Manifest, verifiable: Manifest]] {. async .} = @@ -165,7 +167,9 @@ proc createVerifiableManifest*( totalDatasetSize, ) - builder = Poseidon2Builder.new(store, protectedManifest, cellSize = cellSize).tryGet + builder = Poseidon2Builder.new( + store, protectedManifest, cellSize = cellSize, taskPool = taskPool + ).tryGet verifiableManifest = (await builder.buildManifest()).tryGet # build the slots and manifest diff --git a/tests/codex/slots/sampler/testsampler.nim b/tests/codex/slots/sampler/testsampler.nim index 78b245a349..bf7277a37e 100644 --- a/tests/codex/slots/sampler/testsampler.nim +++ b/tests/codex/slots/sampler/testsampler.nim @@ -5,6 +5,7 @@ import ../../../asynctest import pkg/questionable/results +import pkg/taskpools import pkg/codex/stores import pkg/codex/merkletree import pkg/codex/utils/json @@ -26,11 +27,16 @@ suite "Test Sampler - control samples": inputData: string inputJson: JsonNode proofInput: ProofInputs[Poseidon2Hash] + taskpool: Taskpool setup: inputData = readFile("tests/circuits/fixtures/input.json") inputJson = !JsonNode.parse(inputData) proofInput = Poseidon2Hash.jsonToProofInput(inputJson) + taskpool = Taskpool.new() + + teardown: + taskpool.shutdown() test "Should verify control samples": let @@ -87,25 +93,29 @@ suite "Test Sampler": manifest: Manifest protected: Manifest verifiable: Manifest + taskpool: Taskpool setup: let repoDs = repoTmp.newDb() metaDs = metaTmp.newDb() + taskpool = Taskpool.new() + store = RepoStore.new(repoDs, metaDs) (manifest, protected, verifiable) = await createVerifiableManifest( - store, datasetBlocks, ecK, ecM, blockSize, cellSize + store, datasetBlocks, ecK, ecM, blockSize, cellSize, taskpool ) # create sampler - builder = Poseidon2Builder.new(store, verifiable).tryGet + builder = Poseidon2Builder.new(store, verifiable, taskpool).tryGet teardown: await store.close() await repoTmp.destroyDb() await metaTmp.destroyDb() + taskpool.shutdown() test "Should fail instantiating for invalid slot index": let sampler = Poseidon2Sampler.new(builder.slotRoots.len, store, builder) @@ -114,7 +124,7 @@ suite "Test Sampler": test "Should fail instantiating for non verifiable builder": let - nonVerifiableBuilder = Poseidon2Builder.new(store, protected).tryGet + nonVerifiableBuilder = Poseidon2Builder.new(store, protected, taskpool).tryGet sampler = Poseidon2Sampler.new(slotIndex, store, nonVerifiableBuilder) check sampler.isErr diff --git a/tests/codex/slots/testprover.nim b/tests/codex/slots/testprover.nim index c567db55dd..34ff96bad3 100644 --- a/tests/codex/slots/testprover.nim +++ b/tests/codex/slots/testprover.nim @@ -4,6 +4,7 @@ import pkg/chronos import pkg/libp2p/cid import pkg/codex/merkletree +import pkg/taskpools import pkg/codex/chunker import pkg/codex/blocktype as bt import pkg/codex/slots @@ -29,6 +30,7 @@ suite "Test Prover": var store: BlockStore prover: Prover + taskPool: Taskpool setup: let @@ -45,13 +47,14 @@ suite "Test Prover": numProofSamples: samples, ) backend = config.initializeBackend().tryGet() - + taskPool = Taskpool.new() store = RepoStore.new(repoDs, metaDs) - prover = Prover.new(store, backend, config.numProofSamples) + prover = Prover.new(store, backend, config.numProofSamples, taskPool) teardown: await repoTmp.destroyDb() await metaTmp.destroyDb() + taskPool.shutdown() test "Should sample and prove a slot": let (_, _, verifiable) = await createVerifiableManifest( @@ -61,6 +64,7 @@ suite "Test Prover": 3, # ecM blockSize, cellSize, + taskPool, ) let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet @@ -80,6 +84,7 @@ suite "Test Prover": 1, # ecM blockSize, cellSize, + taskPool, ) let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet diff --git a/tests/codex/slots/testslotbuilder.nim b/tests/codex/slots/testslotbuilder.nim index fc3c7bd55b..55f917ef4d 100644 --- a/tests/codex/slots/testslotbuilder.nim +++ b/tests/codex/slots/testslotbuilder.nim @@ -15,6 +15,7 @@ import pkg/codex/utils import pkg/codex/utils/digest import pkg/poseidon2 import pkg/poseidon2/io +import pkg/taskpools import ./helpers import ../helpers @@ -72,12 +73,13 @@ suite "Slot builder": protectedManifest: Manifest builder: Poseidon2Builder chunker: Chunker + taskPool: Taskpool setup: let repoDs = repoTmp.newDb() metaDs = metaTmp.newDb() - + taskPool = Taskpool.new() localStore = RepoStore.new(repoDs, metaDs) chunker = RandomChunker.new(Rng.instance(), size = totalDatasetSize, chunkSize = blockSize) @@ -92,6 +94,7 @@ suite "Slot builder": await localStore.close() await repoTmp.destroyDb() await metaTmp.destroyDb() + taskPool.shutdown() # TODO: THIS IS A BUG IN asynctest, because it doesn't release the # objects after the test is done, so we need to do it manually @@ -113,8 +116,9 @@ suite "Slot builder": ) check: - Poseidon2Builder.new(localStore, unprotectedManifest, cellSize = cellSize).error.msg == - "Manifest is not protected." + Poseidon2Builder.new( + localStore, unprotectedManifest, taskPool, cellSize = cellSize + ).error.msg == "Manifest is not protected." test "Number of blocks must be devisable by number of slots": let mismatchManifest = Manifest.new( @@ -131,7 +135,7 @@ suite "Slot builder": ) check: - Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg == + Poseidon2Builder.new(localStore, mismatchManifest, taskPool, cellSize = cellSize).error.msg == "Number of blocks must be divisible by number of slots." test "Block size must be divisable by cell size": @@ -149,12 +153,13 @@ suite "Slot builder": ) check: - Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg == + Poseidon2Builder.new(localStore, mismatchManifest, taskPool, cellSize = cellSize).error.msg == "Block size must be divisible by cell size." test "Should build correct slot builder": - builder = - Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() + builder = Poseidon2Builder + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) + .tryGet() check: builder.cellSize == cellSize @@ -171,7 +176,7 @@ suite "Slot builder": ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() for i in 0 ..< numSlots: @@ -196,7 +201,7 @@ suite "Slot builder": ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() for i in 0 ..< numSlots: @@ -215,8 +220,9 @@ suite "Slot builder": slotTree.root().tryGet() == expectedRoot test "Should persist trees for all slots": - let builder = - Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() + let builder = Poseidon2Builder + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) + .tryGet() for i in 0 ..< numSlots: let @@ -242,7 +248,7 @@ suite "Slot builder": 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() (await builder.buildSlots()).tryGet @@ -270,7 +276,7 @@ suite "Slot builder": 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() slotsHashes = collect(newSeq): @@ -296,45 +302,53 @@ suite "Slot builder": test "Should not build from verifiable manifest with 0 slots": var builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() verifyManifest = (await builder.buildManifest()).tryGet() verifyManifest.slotRoots = @[] - check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr + check Poseidon2Builder.new( + localStore, verifyManifest, taskPool, cellSize = cellSize + ).isErr test "Should not build from verifiable manifest with incorrect number of slots": var builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() verifyManifest = (await builder.buildManifest()).tryGet() verifyManifest.slotRoots.del(verifyManifest.slotRoots.len - 1) - check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr + check Poseidon2Builder.new( + localStore, verifyManifest, taskPool, cellSize = cellSize + ).isErr test "Should not build from verifiable manifest with invalid verify root": - let builder = - Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() + let builder = Poseidon2Builder + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) + .tryGet() var verifyManifest = (await builder.buildManifest()).tryGet() rng.shuffle(Rng.instance, verifyManifest.verifyRoot.data.buffer) - check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr + check Poseidon2Builder.new( + localStore, verifyManifest, taskPool, cellSize = cellSize + ).isErr test "Should build from verifiable manifest": let builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() verifyManifest = (await builder.buildManifest()).tryGet() - verificationBuilder = - Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).tryGet() + verificationBuilder = Poseidon2Builder + .new(localStore, verifyManifest, taskPool, cellSize = cellSize) + .tryGet() check: builder.slotRoots == verificationBuilder.slotRoots diff --git a/tests/codex/utils/testPoseidon.nim b/tests/codex/utils/testPoseidon.nim new file mode 100644 index 0000000000..aedf5fcf5f --- /dev/null +++ b/tests/codex/utils/testPoseidon.nim @@ -0,0 +1,40 @@ +{.push raises: [].} + +import std/[times, strformat, random] +import pkg/questionable/results + +import pkg/codex/merkletree/poseidon2 + +import pkg/codex/utils/poseidon2digest +import ../../asynctest + +test "Test poseidon2 digestTree": + randomize(42) + const + dataSize = 64 * 1024 # 64KB + chunkSize = 2 * 1024 # 2KB + iterations = 10 # Number of iterations + + echo &"Benchmarking digestTree with data size: {dataSize} bytes, chunk size: {chunkSize} bytes" + + # Generate random data + var data = newSeq[byte](dataSize) + for i in 0 ..< dataSize: + data[i] = byte(rand(255)) + + # Actual benchmark + let startTime = cpuTime() + + for i in 1 .. iterations: + let treeResult = Poseidon2Tree.digestTree(data, chunkSize).tryGet() + + # Optionally print info about each iteration + + let endTime = cpuTime() + let totalTime = endTime - startTime + let avgTime = totalTime / iterations.float + + echo &"Results:" + echo &" Total time for {iterations} iterations: {totalTime:.6f} seconds" + echo &" Average time per iteration: {avgTime:.6f} seconds" + echo &" Iterations per second: {iterations.float / totalTime:.2f}"