From 789f0e4b6075a8a1dd56e12d0cfb61a2abd79468 Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Mon, 29 Dec 2025 20:12:37 -0300 Subject: [PATCH 1/2] experiment: Build only archive on mithril bootstrap --- crates/cardano/src/lib.rs | 6 +++++ crates/cardano/src/roll/mod.rs | 18 +++++++------ crates/core/src/archive.rs | 19 ++++++++++++-- crates/core/src/batch.rs | 29 ++++++++++++++++++++- crates/core/src/facade.rs | 42 +++++++++++++++++++++++++++--- crates/core/src/lib.rs | 4 ++- src/bin/dolos/bootstrap/mithril.rs | 17 +++++++++--- 7 files changed, 117 insertions(+), 18 deletions(-) diff --git a/crates/cardano/src/lib.rs b/crates/cardano/src/lib.rs index 448027a03..e7e8d6c9c 100644 --- a/crates/cardano/src/lib.rs +++ b/crates/cardano/src/lib.rs @@ -376,6 +376,12 @@ impl dolos_core::ChainLogic for CardanoLogic { Ok(out) } + fn decode_block(&self, raw: Arc) -> Result { + let out = OwnedMultiEraBlock::decode(raw)?; + + Ok(out) + } + fn mutable_slots(domain: &impl Domain) -> BlockSlot { utils::mutable_slots(&domain.genesis()) } diff --git a/crates/cardano/src/roll/mod.rs b/crates/cardano/src/roll/mod.rs index d49a78695..e24232584 100644 --- a/crates/cardano/src/roll/mod.rs +++ b/crates/cardano/src/roll/mod.rs @@ -183,13 +183,13 @@ macro_rules! maybe_visit { macro_rules! visit_all { ($self:ident, $deltas:expr, $method:ident, $($args:tt)*) => { - maybe_visit!($self, $deltas, account_state, $method, $($args)*); - maybe_visit!($self, $deltas, asset_state, $method, $($args)*); - maybe_visit!($self, $deltas, drep_state, $method, $($args)*); - maybe_visit!($self, $deltas, epoch_state, $method, $($args)*); - maybe_visit!($self, $deltas, pool_state, $method, $($args)*); + // maybe_visit!($self, $deltas, account_state, $method, $($args)*); + // maybe_visit!($self, $deltas, asset_state, $method, $($args)*); + // maybe_visit!($self, $deltas, drep_state, $method, $($args)*); + // maybe_visit!($self, $deltas, epoch_state, $method, $($args)*); + // maybe_visit!($self, $deltas, pool_state, $method, $($args)*); maybe_visit!($self, $deltas, tx_logs, $method, $($args)*); - maybe_visit!($self, $deltas, proposal_logs, $method, $($args)*); + // maybe_visit!($self, $deltas, proposal_logs, $method, $($args)*); }; } @@ -332,9 +332,11 @@ pub fn compute_delta( state: &D::State, batch: &mut WorkBatch, ) -> Result<(), ChainError> { - let (epoch, _) = cache.eras.slot_epoch(batch.first_slot()); + // let (epoch, _) = cache.eras.slot_epoch(batch.first_slot()); + let epoch = 1; - let (protocol, _) = cache.eras.protocol_and_era_for_epoch(epoch); + // let (protocol, _) = cache.eras.protocol_and_era_for_epoch(epoch); + let protocol = &1; debug!( from = batch.first_slot(), diff --git a/crates/core/src/archive.rs b/crates/core/src/archive.rs index 9fee53e06..557c4f7f9 100644 --- a/crates/core/src/archive.rs +++ b/crates/core/src/archive.rs @@ -1,12 +1,15 @@ use std::{marker::PhantomData, ops::Range}; -use pallas::{crypto::hash::Hash, ledger::primitives::PlutusData}; +use pallas::{ + crypto::hash::Hash, + ledger::{primitives::PlutusData, traverse::MultiEraTx}, +}; use serde::{Deserialize, Serialize}; use thiserror::Error; use crate::{ state::KEY_SIZE, BlockBody, BlockSlot, BrokenInvariant, ChainPoint, Entity, EntityKey, - EntityValue, EraCbor, Namespace, RawBlock, TxHash, TxOrder, + EntityValue, EraCbor, Namespace, RawBlock, TxHash, TxOrder, TxoRef, }; const TEMPORAL_KEY_SIZE: usize = 8; @@ -295,6 +298,18 @@ pub trait ArchiveStore: Clone + Send + Sync + 'static { fn get_tx(&self, tx_hash: &[u8]) -> Result, ArchiveError>; + fn get_utxo(&self, txo_ref: &TxoRef) -> Result, ArchiveError> { + let Some(tx) = self.get_tx(txo_ref.0.as_ref())? else { + return Ok(None); + }; + let era = tx.era(); + let decoded = MultiEraTx::decode_for_era(tx.era().try_into().unwrap(), tx.cbor())?; + let Some(output) = decoded.output_at(txo_ref.1 as usize) else { + return Ok(None); + }; + Ok(Some(EraCbor(era, output.encode()))) + } + fn get_plutus_data(&self, datum_hash: &Hash<32>) -> Result, ArchiveError>; fn get_slot_for_tx(&self, tx_hash: &[u8]) -> Result, ArchiveError>; diff --git a/crates/core/src/batch.rs b/crates/core/src/batch.rs index 78f7c1850..eae523662 100644 --- a/crates/core/src/batch.rs +++ b/crates/core/src/batch.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, ops::RangeInclusive}; +use std::{collections::HashMap, ops::RangeInclusive, sync::Arc}; use itertools::Itertools as _; use rayon::prelude::*; @@ -187,6 +187,33 @@ impl WorkBatch { Ok(()) } + pub fn load_utxos_using_archive(&mut self, domain: &D) -> Result<(), DomainError> + where + D: Domain, + { + // TODO: paralelize in chunks + + let all_refs: Vec<_> = self + .blocks + .iter() + .flat_map(|x| x.depends_on(&mut self.utxos)) + .unique() + .collect(); + + let inputs: HashMap<_, _> = all_refs + .into_iter() + .flat_map(|txoref| match domain.archive().get_utxo(&txoref) { + Ok(Some(eracbor)) => Some(Ok((txoref, Arc::new(eracbor)))), + Ok(None) => None, + Err(err) => Some(Err(DomainError::from(err))), + }) + .collect::>()?; + + self.utxos.extend(inputs); + + Ok(()) + } + pub fn decode_utxos(&mut self, chain: &C) -> Result<(), DomainError> { let pairs: Vec<_> = self .utxos diff --git a/crates/core/src/facade.rs b/crates/core/src/facade.rs index 439c944d6..0a3b4948b 100644 --- a/crates/core/src/facade.rs +++ b/crates/core/src/facade.rs @@ -1,9 +1,10 @@ use tracing::{error, info, warn}; use crate::{ - batch::WorkBatch, ArchiveStore, Block as _, BlockSlot, ChainLogic, ChainPoint, Domain, - DomainError, MempoolAwareUtxoStore, MempoolStore, MempoolTx, RawBlock, StateStore, TipEvent, - TxHash, WalStore, WorkUnit, + batch::{WorkBatch, WorkBlock}, + ArchiveStore, Block as _, BlockSlot, ChainLogic, ChainPoint, Domain, DomainError, + MempoolAwareUtxoStore, MempoolStore, MempoolTx, RawBlock, StateStore, TipEvent, TxHash, + WalStore, WorkUnit, }; /// Process a batch of blocks during bulk import operations, skipping the WAL @@ -36,6 +37,22 @@ fn execute_batch( Ok(batch.last_slot()) } +/// Process a batch of blocks during bulk import operations into archive. +/// +/// This will resolve the utxo batch using the same archive and skip all state related steps. +fn execute_batch_into_archive( + chain: &D::Chain, + domain: &D, + batch: &mut WorkBatch, +) -> Result { + batch.load_utxos_using_archive(domain)?; + batch.decode_utxos(chain)?; + chain.compute_delta::(domain.state(), domain.genesis(), batch)?; + batch.commit_archive(domain)?; + + Ok(batch.last_slot()) +} + fn notify_work(domain: &D, work: &WorkUnit) { let WorkUnit::Blocks(batch) = work else { return; @@ -117,6 +134,25 @@ pub async fn import_blocks( Ok(last) } +pub async fn import_blocks_to_archive( + domain: &D, + raw: Vec, +) -> Result { + let mut last = 0; + let mut chain = domain.write_chain().await; + let mut batch = WorkBatch::default(); + + for block in raw { + let decoded = chain.decode_block(block)?; + last = decoded.slot(); + batch.add_work(WorkBlock::new(decoded)); + } + + execute_batch_into_archive(&mut *chain, domain, &mut batch)?; + + Ok(last) +} + pub async fn roll_forward( domain: &D, block: RawBlock, diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 18cc98f8a..2b0b27dc1 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -70,7 +70,7 @@ pub use point::*; pub use state::*; pub use wal::*; -use crate::batch::WorkBatch; +use crate::batch::{WorkBatch, WorkBlock}; #[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] pub struct EraCbor(pub Era, pub Cbor); @@ -544,6 +544,8 @@ pub trait ChainLogic: Sized + Send + Sync { // TODO: remove from the interface fn decode_utxo(&self, utxo: Arc) -> Result; + fn decode_block(&self, raw: Arc) -> Result; + // TODO: remove from the interface fn mutable_slots(domain: &impl Domain) -> BlockSlot; diff --git a/src/bin/dolos/bootstrap/mithril.rs b/src/bin/dolos/bootstrap/mithril.rs index 0d3718b6e..d4029e09f 100644 --- a/src/bin/dolos/bootstrap/mithril.rs +++ b/src/bin/dolos/bootstrap/mithril.rs @@ -40,6 +40,10 @@ pub struct Args { #[arg(long)] start_from: Option, + + /// Only build using archive. + #[arg(long, action)] + only_archive: bool, } impl Default for Args { @@ -53,6 +57,7 @@ impl Default for Args { verbose: Default::default(), chunk_size: 500, start_from: None, + only_archive: false, } } } @@ -222,9 +227,15 @@ async fn import_hardano_into_domain( // around throughout the pipeline let batch: Vec<_> = batch.into_iter().map(Arc::new).collect(); - let last = dolos_core::facade::import_blocks(&domain, batch) - .await - .map_err(|e| miette::miette!(e.to_string()))?; + let last = if args.only_archive { + dolos_core::facade::import_blocks_to_archive(&domain, batch) + .await + .map_err(|e| miette::miette!(e.to_string()))? + } else { + dolos_core::facade::import_blocks(&domain, batch) + .await + .map_err(|e| miette::miette!(e.to_string()))? + }; progress.set_position(last); } From a19560a12a26be843f2c0761c650bd4423ed114d Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Tue, 30 Dec 2025 10:38:33 -0300 Subject: [PATCH 2/2] Consider produced utxos in batch for resolution --- crates/cardano/src/owned.rs | 20 ++++++++++++++++++- crates/cardano/src/roll/mod.rs | 36 +++++++++++++++++++--------------- crates/core/src/batch.rs | 12 ++++++++---- crates/core/src/lib.rs | 3 ++- 4 files changed, 49 insertions(+), 22 deletions(-) diff --git a/crates/cardano/src/owned.rs b/crates/cardano/src/owned.rs index 6c9e926cc..f3d191b23 100644 --- a/crates/cardano/src/owned.rs +++ b/crates/cardano/src/owned.rs @@ -1,4 +1,4 @@ -use dolos_core::{BlockBody, BlockHash, BlockSlot, EraCbor, RawBlock, RawUtxoMap, TxoRef}; +use dolos_core::{BlockBody, BlockHash, BlockSlot, EraCbor, RawBlock, RawUtxoMap, TxoRef, UtxoMap}; use pallas::ledger::traverse::{MultiEraBlock, MultiEraOutput}; use self_cell::self_cell; use std::sync::Arc; @@ -27,6 +27,24 @@ impl dolos_core::Block for OwnedMultiEraBlock { crate::utxoset::compute_block_dependencies(self.view(), loaded) } + fn produces(&self) -> UtxoMap { + self.view() + .txs() + .iter() + .flat_map(|tx| { + tx.produces() + .iter() + .map(|(at, output)| { + ( + TxoRef(tx.hash(), *at as u32), + Arc::new(EraCbor(tx.era().into(), output.encode())), + ) + }) + .collect::)>>() + }) + .collect() + } + fn slot(&self) -> BlockSlot { self.view().slot() } diff --git a/crates/cardano/src/roll/mod.rs b/crates/cardano/src/roll/mod.rs index e24232584..56e94a203 100644 --- a/crates/cardano/src/roll/mod.rs +++ b/crates/cardano/src/roll/mod.rs @@ -15,7 +15,7 @@ use pallas::{ }, }, }; -use tracing::{debug, instrument}; +use tracing::{debug, instrument, warn}; use crate::{ load_effective_pparams, owned::OwnedMultiEraOutput, roll::proposals::ProposalVisitor, utxoset, @@ -261,14 +261,17 @@ impl<'a> DeltaBuilder<'a> { for input in tx.consumes() { let txoref = TxoRef::from(&input); - let resolved = self.utxos.get(&txoref).ok_or_else(|| { - StateError::InvariantViolation(InvariantViolation::InputNotFound(txoref)) - })?; - - resolved.with_dependent(|_, resolved| { - visit_all!(self, deltas, visit_input, block, &tx, &input, &resolved); - Result::<_, ChainError>::Ok(()) - })?; + match self.utxos.get(&txoref) { + Some(resolved) => { + resolved.with_dependent(|_, resolved| { + visit_all!(self, deltas, visit_input, block, &tx, &input, &resolved); + Result::<_, ChainError>::Ok(()) + })?; + } + None => { + warn!(txoref =? txoref, "failed to resolve input"); + } + } } for (index, output) in tx.produces() { @@ -345,7 +348,8 @@ pub fn compute_delta( "computing delta" ); - let active_params = load_effective_pparams::(state)?; + //let active_params = load_effective_pparams::(state)?; + let active_params = PParamsSet::default(); for block in batch.blocks.iter_mut() { let mut builder = DeltaBuilder::new( @@ -360,12 +364,12 @@ pub fn compute_delta( builder.crawl()?; - // TODO: we treat the UTxO set differently due to tech-debt. We should migrate - // this into the entity system. - let blockd = block.decoded(); - let blockd = blockd.view(); - let utxos = utxoset::compute_apply_delta(blockd, &batch.utxos_decoded)?; - block.utxo_delta = Some(utxos); + // // TODO: we treat the UTxO set differently due to tech-debt. We should migrate + // // this into the entity system. + // let blockd = block.decoded(); + // let blockd = blockd.view(); + // let utxos = utxoset::compute_apply_delta(blockd, &batch.utxos_decoded)?; + // block.utxo_delta = Some(utxos); } Ok(()) diff --git a/crates/core/src/batch.rs b/crates/core/src/batch.rs index eae523662..f67598de4 100644 --- a/crates/core/src/batch.rs +++ b/crates/core/src/batch.rs @@ -6,7 +6,7 @@ use rayon::prelude::*; use crate::{ ArchiveStore, ArchiveWriter as _, Block as _, BlockSlot, ChainLogic, ChainPoint, Domain, DomainError, EntityDelta, EntityMap, LogValue, NsKey, RawBlock, RawUtxoMap, SlotTags, - StateError, StateStore as _, StateWriter as _, TxoRef, UtxoSetDelta, WalStore as _, + StateError, StateStore as _, StateWriter as _, TxoRef, UtxoMap, UtxoSetDelta, WalStore as _, }; #[derive(Debug)] @@ -67,6 +67,10 @@ impl WorkBlock { self.block.depends_on(loaded) } + pub fn produces(&self) -> UtxoMap { + self.block.produces() + } + pub fn point(&self) -> ChainPoint { let decoded = self.decoded(); let slot = decoded.slot(); @@ -191,8 +195,6 @@ impl WorkBatch { where D: Domain, { - // TODO: paralelize in chunks - let all_refs: Vec<_> = self .blocks .iter() @@ -200,11 +202,13 @@ impl WorkBatch { .unique() .collect(); + let produced: UtxoMap = self.blocks.iter().flat_map(|x| x.produces()).collect(); + let inputs: HashMap<_, _> = all_refs .into_iter() .flat_map(|txoref| match domain.archive().get_utxo(&txoref) { Ok(Some(eracbor)) => Some(Ok((txoref, Arc::new(eracbor)))), - Ok(None) => None, + Ok(None) => Ok(produced.get(&txoref).cloned().map(|x| (txoref, x))).transpose(), Err(err) => Some(Err(DomainError::from(err))), }) .collect::>()?; diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 2b0b27dc1..a333403fe 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -70,7 +70,7 @@ pub use point::*; pub use state::*; pub use wal::*; -use crate::batch::{WorkBatch, WorkBlock}; +use crate::batch::WorkBatch; #[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] pub struct EraCbor(pub Era, pub Cbor); @@ -379,6 +379,7 @@ pub trait MempoolStore: Clone + Send + Sync + 'static { pub trait Block: Sized + Send + Sync { fn depends_on(&self, loaded: &mut RawUtxoMap) -> Vec; + fn produces(&self) -> UtxoMap; fn slot(&self) -> BlockSlot; fn hash(&self) -> BlockHash; fn raw(&self) -> RawBlock;