From 5a7759c9a4926a2f57991154b733f57a1b32a68b Mon Sep 17 00:00:00 2001 From: Santiago Date: Wed, 18 Mar 2026 08:49:32 -0300 Subject: [PATCH 1/3] fix: catch-up stores during init --- crates/cardano/src/indexes/delta.rs | 95 +++++++++++++++++++++++ crates/cardano/src/lib.rs | 38 ++++++++++ crates/cardano/src/roll/batch.rs | 98 +----------------------- crates/core/src/bootstrap.rs | 114 +++++++++++++++++++++++++++- crates/core/src/lib.rs | 24 ++++++ 5 files changed, 270 insertions(+), 99 deletions(-) diff --git a/crates/cardano/src/indexes/delta.rs b/crates/cardano/src/indexes/delta.rs index babb98807..462ff8898 100644 --- a/crates/cardano/src/indexes/delta.rs +++ b/crates/cardano/src/indexes/delta.rs @@ -246,6 +246,101 @@ impl CardanoIndexDeltaBuilder { .push(Tag::new(archive::METADATA, label.to_be_bytes().to_vec())); } + /// Index all archive entries for a single block. + /// + /// Calls `start_block`, then iterates all transactions adding + /// tx hashes, metadata, inputs (with resolved UTxO lookups), + /// outputs (with script refs), witness scripts/datums, certs, and redeemers. + pub fn index_block( + &mut self, + block: &pallas::ledger::traverse::MultiEraBlock<'_>, + resolved_inputs: &std::collections::HashMap, + ) { + use pallas::ledger::{ + primitives::conway::ScriptRef, + traverse::{ComputeHash as _, OriginalHash as _}, + }; + + self.start_block(block.slot(), block.hash().to_vec(), Some(block.number())); + + for tx in block.txs() { + self.add_tx_hash(tx.hash().to_vec()); + + for (label, _) in tx.metadata().collect::>() { + self.add_metadata_label(label); + } + + for input in tx.inputs() { + self.add_spent_input(&input); + + let txo_ref: TxoRef = (&input).into(); + if let Some(resolved) = resolved_inputs.get(&txo_ref) { + resolved.with_dependent(|_, output| { + if let Ok(addr) = output.address() { + self.add_address(&addr); + } + self.add_assets(&output.value()); + if let Some(datum) = output.datum() { + self.add_datum(&datum); + } + }); + } + } + + for (_, output) in tx.produces() { + if let Ok(addr) = output.address() { + self.add_address(&addr); + } + self.add_assets(&output.value()); + if let Some(datum) = output.datum() { + self.add_datum(&datum); + } + + if let Some(script_ref) = output.script_ref() { + match script_ref { + ScriptRef::NativeScript(script) => { + self.add_script_hash(script.original_hash().to_vec()); + } + ScriptRef::PlutusV1Script(script) => { + self.add_script_hash(script.compute_hash().to_vec()); + } + ScriptRef::PlutusV2Script(script) => { + self.add_script_hash(script.compute_hash().to_vec()); + } + ScriptRef::PlutusV3Script(script) => { + self.add_script_hash(script.compute_hash().to_vec()); + } + } + } + } + + for script in tx.native_scripts() { + self.add_script_hash(script.original_hash().to_vec()); + } + for script in tx.plutus_v1_scripts() { + self.add_script_hash(script.compute_hash().to_vec()); + } + for script in tx.plutus_v2_scripts() { + self.add_script_hash(script.compute_hash().to_vec()); + } + for script in tx.plutus_v3_scripts() { + self.add_script_hash(script.compute_hash().to_vec()); + } + + for datum in tx.plutus_data() { + self.add_datum_hash(datum.original_hash().to_vec()); + } + + for cert in tx.certs() { + self.add_cert(&cert); + } + + for redeemer in tx.redeemers() { + self.add_datum_hash(redeemer.data().compute_hash().to_vec()); + } + } + } + /// Build the final `IndexDelta`. pub fn build(self) -> IndexDelta { self.delta diff --git a/crates/cardano/src/lib.rs b/crates/cardano/src/lib.rs index 133f54f32..d9fa3a72b 100644 --- a/crates/cardano/src/lib.rs +++ b/crates/cardano/src/lib.rs @@ -360,6 +360,44 @@ impl dolos_core::ChainLogic for CardanoLogic { }) } + fn compute_catchup( + block: &dolos_core::Cbor, + inputs: &std::collections::HashMap>, + point: ChainPoint, + ) -> Result { + let block_arc = Arc::new(block.clone()); + let blockd = OwnedMultiEraBlock::decode(block_arc)?; + let blockv = blockd.view(); + + let decoded_inputs: std::collections::HashMap<_, _> = inputs + .iter() + .map(|(k, v)| { + let out = (k.clone(), OwnedMultiEraOutput::decode(v.clone())?); + Result::<_, ChainError>::Ok(out) + }) + .collect::>()?; + + let utxo_delta = crate::utxoset::compute_apply_delta(blockv, &decoded_inputs) + .map_err(ChainError::from)?; + + let mut builder = crate::indexes::CardanoIndexDeltaBuilder::new(point); + + // UTxO filter changes + builder.add_produced_utxos_from_delta(&utxo_delta); + builder.add_consumed_utxos_from_delta(&utxo_delta); + + // Archive indexes (shared logic) + builder.index_block(&blockv, &decoded_inputs); + + let tx_hashes = blockv.txs().iter().map(|tx| tx.hash()).collect(); + + Ok(dolos_core::CatchUpBlockData { + utxo_delta, + index_delta: builder.build(), + tx_hashes, + }) + } + fn decode_utxo(&self, utxo: Arc) -> Result { let out = OwnedMultiEraOutput::decode(utxo)?; diff --git a/crates/cardano/src/roll/batch.rs b/crates/cardano/src/roll/batch.rs index ba96df8bd..0b48ba472 100644 --- a/crates/cardano/src/roll/batch.rs +++ b/crates/cardano/src/roll/batch.rs @@ -14,11 +14,6 @@ use dolos_core::{ NsKey, RawBlock, RawUtxoMap, StateError, StateStore as _, StateWriter as _, TxoRef, UtxoSetDelta, WalStore as _, }; -use pallas::ledger::{ - primitives::conway::ScriptRef, - traverse::{ComputeHash as _, OriginalHash as _}, -}; - use crate::indexes::CardanoIndexDeltaBuilder; use crate::{CardanoDelta, CardanoEntity, CardanoLogic, OwnedMultiEraBlock, OwnedMultiEraOutput}; @@ -325,7 +320,6 @@ impl WorkBatch { let mut builder = CardanoIndexDeltaBuilder::new(self.last_point()); for work_block in self.blocks.iter() { - let point = work_block.point(); let raw = work_block.raw(); // Decode block for tag extraction @@ -333,9 +327,6 @@ impl WorkBatch { continue; }; - // Start archive delta for this block - builder.start_block(point.slot(), block.hash().to_vec(), Some(block.number())); - // Process UTxO delta for filter indexes if let Some(utxo_delta) = &work_block.utxo_delta { // Produced UTxOs @@ -367,93 +358,8 @@ impl WorkBatch { } } - // Process transactions for archive indexes - for tx in block.txs() { - builder.add_tx_hash(tx.hash().to_vec()); - - // Metadata labels - for (label, _) in tx.metadata().collect::>() { - builder.add_metadata_label(label); - } - - // Inputs (spent UTxOs) - for input in tx.inputs() { - builder.add_spent_input(&input); - - // Try to get resolved input for address/asset tags - let txo_ref: TxoRef = (&input).into(); - if let Some(resolved) = self.utxos_decoded.get(&txo_ref) { - resolved.with_dependent(|_, output| { - if let Ok(addr) = output.address() { - builder.add_address(&addr); - } - builder.add_assets(&output.value()); - if let Some(datum) = output.datum() { - builder.add_datum(&datum); - } - }); - } - } - - // Outputs - for (_, output) in tx.produces() { - if let Ok(addr) = output.address() { - builder.add_address(&addr); - } - builder.add_assets(&output.value()); - if let Some(datum) = output.datum() { - builder.add_datum(&datum); - } - - if let Some(script_ref) = output.script_ref() { - match script_ref { - ScriptRef::NativeScript(script) => { - builder.add_script_hash(script.original_hash().to_vec()); - } - ScriptRef::PlutusV1Script(script) => { - builder.add_script_hash(script.compute_hash().to_vec()); - } - ScriptRef::PlutusV2Script(script) => { - builder.add_script_hash(script.compute_hash().to_vec()); - } - ScriptRef::PlutusV3Script(script) => { - builder.add_script_hash(script.compute_hash().to_vec()); - } - } - } - } - - // Witness scripts - { - for script in tx.native_scripts() { - builder.add_script_hash(script.original_hash().to_vec()); - } - for script in tx.plutus_v1_scripts() { - builder.add_script_hash(script.compute_hash().to_vec()); - } - for script in tx.plutus_v2_scripts() { - builder.add_script_hash(script.compute_hash().to_vec()); - } - for script in tx.plutus_v3_scripts() { - builder.add_script_hash(script.compute_hash().to_vec()); - } - } - - // Witness datums - for datum in tx.plutus_data() { - builder.add_datum_hash(datum.original_hash().to_vec()); - } - - // Certificates - for cert in tx.certs() { - builder.add_cert(&cert); - } - - // Redeemers - for redeemer in tx.redeemers() { - builder.add_datum_hash(redeemer.data().compute_hash().to_vec()); - } - } + // Archive indexes (shared logic) + builder.index_block(&block, &self.utxos_decoded); } builder.build() diff --git a/crates/core/src/bootstrap.rs b/crates/core/src/bootstrap.rs index 982cf7010..442082c37 100644 --- a/crates/core/src/bootstrap.rs +++ b/crates/core/src/bootstrap.rs @@ -7,7 +7,8 @@ use tracing::{error, info, warn}; use crate::{ - sync::drain_pending_work, ArchiveStore, ChainPoint, Domain, DomainError, StateStore, WalStore, + sync::drain_pending_work, ArchiveStore, ArchiveWriter as _, ChainLogic, ChainPoint, Domain, + DomainError, IndexStore, IndexWriter as _, StateStore, WalStore, }; /// Extension trait for domain bootstrapping operations. @@ -40,8 +41,7 @@ impl BootstrapExt for D { fn bootstrap(&self) -> Result<(), DomainError> { self.check_integrity()?; - // TODO: we should probably catch up stores here - // catch_up_stores(self)?; + catch_up_stores(self)?; // Drain any work that might have been defined by the initialization // using the sync lifecycle (full WAL + tip notifications) @@ -109,6 +109,114 @@ fn check_archive_in_sync_with_state(domain: &D) -> Result<(), DomainE Ok(()) } +/// Catch up archive and index stores by replaying WAL entries. +/// +/// After `ensure_wal_in_sync_with_state`, the WAL matches the state cursor. +/// If archive or index stores are behind (e.g., crash between state commit +/// and archive/index commit), this function replays the missing WAL entries +/// to bring them back in sync. +fn catch_up_stores(domain: &D) -> Result<(), DomainError> { + let state_cursor = match domain.state().read_cursor()? { + Some(cursor) => cursor, + None => return Ok(()), // nothing to catch up + }; + + catch_up_archive(domain, &state_cursor)?; + catch_up_indexes(domain, &state_cursor)?; + + Ok(()) +} + +/// Catch up archive store by replaying WAL blocks. +fn catch_up_archive( + domain: &D, + state_cursor: &ChainPoint, +) -> Result<(), DomainError> { + let archive_tip = domain.archive().get_tip()?.map(|(slot, _)| slot); + let state_slot = state_cursor.slot(); + + if archive_tip == Some(state_slot) { + return Ok(()); + } + + // Find the WAL start point: if archive has data, start from a point + // corresponding to the archive tip; otherwise start from the beginning. + let start = match archive_tip { + Some(slot) => domain.wal().locate_point(slot)?, + None => None, + }; + + let blocks = domain + .wal() + .iter_blocks(start, Some(state_cursor.clone()))?; + + let writer = domain.archive().start_writer()?; + let mut count = 0u64; + + for (point, block) in blocks { + // Skip the start point itself (already in archive) and anything at or before it + if Some(point.slot()) <= archive_tip { + continue; + } + + writer.apply(&point, &block)?; + count += 1; + } + + if count > 0 { + writer.commit()?; + info!(count, "archive caught up from WAL"); + } + + Ok(()) +} + +/// Catch up index store by replaying WAL log entries. +fn catch_up_indexes( + domain: &D, + state_cursor: &ChainPoint, +) -> Result<(), DomainError> { + let index_cursor = domain.indexes().cursor()?; + + if index_cursor.as_ref() == Some(state_cursor) { + return Ok(()); + } + + let index_slot = index_cursor.as_ref().map(|p| p.slot()); + + // Find the WAL start point from the index cursor + let start = match index_slot { + Some(slot) => domain.wal().locate_point(slot)?, + None => None, + }; + + let logs = domain + .wal() + .iter_logs(start, Some(state_cursor.clone()))?; + + let writer = domain.indexes().start_writer()?; + let mut count = 0u64; + + for (point, log) in logs { + // Skip entries at or before the current index cursor + if Some(point.slot()) <= index_slot { + continue; + } + + let catchup = D::Chain::compute_catchup(&log.block, &log.inputs, point)?; + + writer.apply(&catchup.index_delta)?; + count += 1; + } + + if count > 0 { + writer.commit()?; + info!(count, "indexes caught up from WAL"); + } + + Ok(()) +} + #[cfg(test)] mod tests { // Tests will be added once we have the full integration in place diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 308d99df5..c3637ea21 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -82,6 +82,18 @@ pub struct UndoBlockData { pub index_delta: IndexDelta, pub tx_hashes: Vec, } + +/// Data needed to catch up stores from a WAL entry during recovery. +/// +/// Chain-specific implementations compute this from the raw block CBOR +/// and the resolved inputs stored in the WAL. Used during bootstrap to +/// replay blocks that are in the WAL but not yet applied to indexes. +pub struct CatchUpBlockData { + pub utxo_delta: UtxoSetDelta, + pub index_delta: IndexDelta, + pub tx_hashes: Vec, +} + pub type OutputIdx = u64; pub type UtxoBody = (u16, Cbor); pub type ChainTip = pallas::network::miniprotocols::chainsync::Tip; @@ -504,6 +516,18 @@ pub trait ChainLogic: Sized + Send + Sync { point: ChainPoint, ) -> Result; + /// Compute catch-up data from a WAL entry for recovery. + /// + /// Given the raw block CBOR and the resolved inputs stored in the WAL, + /// computes the UTxO delta, index delta, and transaction hashes needed + /// to replay the block's effects. Used during bootstrap to catch up + /// stores that are behind the state store. + fn compute_catchup( + block: &Cbor, + inputs: &HashMap>, + point: ChainPoint, + ) -> Result; + // TODO: remove from the interface - this is Cardano-specific fn decode_utxo(&self, utxo: Arc) -> Result; From 6b109c51940351e97365ebfec92e15e1a298cffe Mon Sep 17 00:00:00 2001 From: Santiago Date: Wed, 18 Mar 2026 09:14:10 -0300 Subject: [PATCH 2/3] add test coverage --- Cargo.toml | 4 ++ crates/core/src/bootstrap.rs | 5 +- crates/redb3/src/wal/mod.rs | 8 +-- tests/bootstrap.rs | 107 +++++++++++++++++++++++++++++++++++ 4 files changed, 119 insertions(+), 5 deletions(-) create mode 100644 tests/bootstrap.rs diff --git a/Cargo.toml b/Cargo.toml index c97f8c1c9..dff71adee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -135,6 +135,10 @@ path = "tests/memory.rs" name = "mempool" path = "tests/mempool.rs" +[[test]] +name = "bootstrap" +path = "tests/bootstrap.rs" + [features] strict = ["dolos-cardano/strict"] mithril = ["mithril-client"] diff --git a/crates/core/src/bootstrap.rs b/crates/core/src/bootstrap.rs index 442082c37..46d8e080a 100644 --- a/crates/core/src/bootstrap.rs +++ b/crates/core/src/bootstrap.rs @@ -219,5 +219,8 @@ fn catch_up_indexes( #[cfg(test)] mod tests { - // Tests will be added once we have the full integration in place + // Tests for bootstrap catch-up live in `tests/bootstrap.rs` (workspace-level + // integration test) because they need `ToyDomain` from `dolos-testing`, + // which re-exports `dolos-core` and would create a duplicate-crate conflict + // inside lib-level `#[cfg(test)]`. } diff --git a/crates/redb3/src/wal/mod.rs b/crates/redb3/src/wal/mod.rs index 5e0aeb4c9..a7bb6aa11 100644 --- a/crates/redb3/src/wal/mod.rs +++ b/crates/redb3/src/wal/mod.rs @@ -454,7 +454,7 @@ where let deltas: Vec<_> = range .map_ok(|(k, _)| k.value()) .map_ok(ChainPoint::from) - .map_ok(|point| (target - point.slot(), point)) + .map_ok(|point| (target.abs_diff(point.slot()), point)) .try_collect()?; let point = deltas.into_iter().min_by_key(|(x, _)| *x).map(|(_, v)| v); @@ -590,7 +590,7 @@ where let range = match (start, end) { (Some(start), Some(end)) => table.range(start..=end)?, (Some(start), None) => table.range(start..)?, - (None, Some(end)) => table.range(..end)?, + (None, Some(end)) => table.range(..=end)?, (None, None) => table.range::(..)?, }; @@ -701,8 +701,8 @@ where fn locate_point(&self, around: BlockSlot) -> Result, WalError> { let search = |retry| { let delta = 20 * retry as u64; - let start = around - delta; - let end = around + delta; + let start = around.saturating_sub(delta); + let end = around.saturating_add(delta); start..=end }; diff --git a/tests/bootstrap.rs b/tests/bootstrap.rs new file mode 100644 index 000000000..4f74d7710 --- /dev/null +++ b/tests/bootstrap.rs @@ -0,0 +1,107 @@ +//! Integration test for bootstrap catch-up logic. +//! +//! Exercises the full Cardano pipeline: feeds blocks through the sync +//! lifecycle with partial commits (WAL + state only), then verifies that +//! `bootstrap()` recovers archive and index stores from WAL replay. + +use std::sync::Arc; + +use dolos_core::{ + BootstrapExt, ChainLogic, Domain, IndexStore, StateStore, WorkUnit, +}; +use dolos_testing::{ + synthetic::{build_synthetic_blocks, SyntheticBlockConfig}, + toy_domain::ToyDomain, +}; + +/// Helper: feed blocks into a domain with partial work-unit execution. +/// +/// Runs load → compute → commit_wal → commit_state but **skips** +/// commit_archive and commit_indexes, simulating a crash between +/// the state commit and the archive/index commits. +fn feed_blocks_partial(domain: &ToyDomain, blocks: &[dolos_core::RawBlock]) { + let mut chain = domain.write_chain(); + + for block in blocks { + if !chain.can_receive_block() { + drain_partial(&mut *chain, domain); + } + chain.receive_block(block.clone()).unwrap(); + } + + drain_partial(&mut *chain, domain); +} + +fn drain_partial(chain: &mut dolos_cardano::CardanoLogic, domain: &ToyDomain) { + while let Some(mut work) = + ::pop_work::(chain, domain) + { + WorkUnit::::load(&mut work, domain).unwrap(); + WorkUnit::::compute(&mut work).unwrap(); + WorkUnit::::commit_wal(&mut work, domain).unwrap(); + WorkUnit::::commit_state(&mut work, domain).unwrap(); + // Intentionally skip commit_archive and commit_indexes. + } +} + +#[test] +fn test_catchup_recovers_archive_and_indexes() { + let cfg = SyntheticBlockConfig::default(); + let (blocks, vectors, cardano_config) = build_synthetic_blocks(cfg); + + let genesis = Arc::new(dolos_cardano::include::devnet::load()); + let domain = + ToyDomain::new_with_genesis_and_config(genesis, cardano_config, None, None); + + // Record baseline cursors — all stores are in sync after initial bootstrap. + let baseline_state = domain.state().read_cursor().unwrap(); + let baseline_archive = domain.archive().get_tip().unwrap().map(|(s, _)| s); + let baseline_index = domain.indexes().cursor().unwrap(); + + // Feed synthetic blocks with partial execution (skip archive + indexes). + feed_blocks_partial(&domain, &blocks); + + // State should have advanced. + let state_cursor = domain.state().read_cursor().unwrap().unwrap(); + assert_ne!( + Some(&state_cursor), + baseline_state.as_ref(), + "state should have advanced after feeding blocks" + ); + + // Archive and indexes should still be at the baseline. + let archive_tip = domain.archive().get_tip().unwrap().map(|(s, _)| s); + let index_cursor = domain.indexes().cursor().unwrap(); + assert_eq!(archive_tip, baseline_archive, "archive should not have advanced"); + assert_eq!(index_cursor, baseline_index, "indexes should not have advanced"); + + // --- Run bootstrap (which calls catch_up_stores internally) --- + domain.bootstrap().unwrap(); + + // Archive tip should now match state cursor. + let archive_tip_after = domain.archive().get_tip().unwrap().map(|(s, _)| s); + assert_eq!( + archive_tip_after, + Some(state_cursor.slot()), + "archive tip should match state cursor after catch-up" + ); + + // Index cursor should now match state cursor. + let index_cursor_after = domain.indexes().cursor().unwrap(); + assert_eq!( + index_cursor_after.as_ref(), + Some(&state_cursor), + "index cursor should match state cursor after catch-up" + ); + + // Verify index content: look up a synthetic tx hash to confirm + // compute_catchup produced the correct index delta. + let tx_hash_hex = &vectors.blocks[0].tx_hashes[0]; + let tx_hash_bytes = hex::decode(tx_hash_hex).unwrap(); + let slot = domain.indexes().slot_by_tx_hash(&tx_hash_bytes).unwrap(); + assert!( + slot.is_some(), + "tx hash {} should be found in index after catch-up", + tx_hash_hex + ); +} From 52160edab7ce62114532862e1eb475b258d18a59 Mon Sep 17 00:00:00 2001 From: Santiago Date: Wed, 18 Mar 2026 09:19:10 -0300 Subject: [PATCH 3/3] fix lints --- crates/cardano/src/lib.rs | 2 +- tests/bootstrap.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/cardano/src/lib.rs b/crates/cardano/src/lib.rs index d9fa3a72b..edfab0093 100644 --- a/crates/cardano/src/lib.rs +++ b/crates/cardano/src/lib.rs @@ -387,7 +387,7 @@ impl dolos_core::ChainLogic for CardanoLogic { builder.add_consumed_utxos_from_delta(&utxo_delta); // Archive indexes (shared logic) - builder.index_block(&blockv, &decoded_inputs); + builder.index_block(blockv, &decoded_inputs); let tx_hashes = blockv.txs().iter().map(|tx| tx.hash()).collect(); diff --git a/tests/bootstrap.rs b/tests/bootstrap.rs index 4f74d7710..c0cee7257 100644 --- a/tests/bootstrap.rs +++ b/tests/bootstrap.rs @@ -24,12 +24,12 @@ fn feed_blocks_partial(domain: &ToyDomain, blocks: &[dolos_core::RawBlock]) { for block in blocks { if !chain.can_receive_block() { - drain_partial(&mut *chain, domain); + drain_partial(&mut chain, domain); } chain.receive_block(block.clone()).unwrap(); } - drain_partial(&mut *chain, domain); + drain_partial(&mut chain, domain); } fn drain_partial(chain: &mut dolos_cardano::CardanoLogic, domain: &ToyDomain) {