From 2ea298398e856fb0667cc565f83f69cea5e1ba88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C4=B1z=C4=B1r=20Sefa=20=C4=B0rken?= Date: Mon, 23 Mar 2026 16:49:21 +0300 Subject: [PATCH] feat: lazy chain config init via ChainConfigWatch with epoch-boundary refresh --- Cargo.lock | 2 + crates/common/src/chain_config.rs | 27 ++- crates/common/src/errors.rs | 11 + crates/node/src/chain_config.rs | 9 +- crates/node/src/chain_config_watch.rs | 283 ++++++++++++++++++++++ crates/node/src/ledger.rs | 27 --- crates/node/src/lib.rs | 1 + crates/platform/src/main.rs | 2 +- crates/platform/src/server.rs | 14 +- crates/platform/tests/common/mod.rs | 10 +- crates/platform/tests/data_node_test.rs | 6 +- crates/platform/tests/icebreakers_test.rs | 2 +- crates/platform/tests/metrics_test.rs | 2 +- crates/platform/tests/root_test.rs | 2 +- crates/platform/tests/submit_test.rs | 8 +- crates/tx_evaluator/Cargo.toml | 2 + crates/tx_evaluator/src/external.rs | 183 +++++++++----- crates/tx_evaluator/src/native.rs | 5 + 18 files changed, 484 insertions(+), 112 deletions(-) create mode 100644 crates/node/src/chain_config_watch.rs diff --git a/Cargo.lock b/Cargo.lock index 82294e18..ac515f75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -763,6 +763,8 @@ dependencies = [ "pallas-validate", "serde", "serde_json", + "tokio", + "tracing", ] [[package]] diff --git a/crates/common/src/chain_config.rs b/crates/common/src/chain_config.rs index e17c0ce6..e677e70e 100644 --- a/crates/common/src/chain_config.rs +++ b/crates/common/src/chain_config.rs @@ -2,15 +2,31 @@ use crate::helpers::system_start_to_epoch_millis; use pallas_network::miniprotocols::localstate::queries_v16::{CurrentProtocolParam, GenesisConfig}; use serde::Serialize; -/// This structure is used to share server-wide cachables +/// Cached chain configuration queried from the Cardano node at startup and +/// refreshed at epoch boundaries by [`ChainConfigWatch`]. pub struct ChainConfigCache { + /// Shelley genesis configuration (network magic, system start, epoch length, etc.). pub genesis_config: GenesisConfig, + /// Current protocol parameters (fees, execution unit prices, cost models, etc.). + /// May change at epoch boundaries via governance actions. pub protocol_params: CurrentProtocolParam, + /// Slot timing derived from genesis. pub slot_config: SlotConfig, + /// Current Cardano era index (see [`Self::CONWAY_ERA`]). pub era: u16, } impl ChainConfigCache { + /// Conway era index in the Cardano era sequence used by Ouroboros: + /// Byron=0, Shelley=1, Allegra=2, Mary=3, Alonzo=4, Babbage=5, Conway=6. + /// + /// This is an application policy constant, not a runtime-discoverable value. + /// The actual era is queried from the node via `get_current_era()` — this + /// constant defines the minimum era we require. A future era bump will + /// require code changes well beyond this constant (CBOR codecs, protocol + /// params, pallas, testgen-hs), so hardcoding is intentional. + pub const CONWAY_ERA: u16 = 6; + pub fn new( genesis_config: GenesisConfig, protocol_params: CurrentProtocolParam, @@ -21,10 +37,7 @@ impl ChainConfigCache { genesis_config, protocol_params, slot_config, - // The era number in the Cardano era sequence used by Ogmios/testgen-hs: - // Byron=0, Shelley=1, Allegra=2, Mary=3, Alonzo=4, Babbage=5, Conway=6. - // Hardcoded to Conway (6) since that is the only era we currently support. - era: 6, + era: Self::CONWAY_ERA, }) } } @@ -32,9 +45,13 @@ impl ChainConfigCache { #[derive(Debug, PartialEq, Eq, Clone, Serialize)] #[serde(rename_all = "camelCase")] pub struct SlotConfig { + /// Duration of a single slot in milliseconds (e.g. 1000 for all known networks). pub slot_length: u64, + /// Absolute slot number at the start of the Shelley era (Byron/Shelley transition point). pub zero_slot: u64, + /// Unix timestamp in milliseconds corresponding to `zero_slot`. pub zero_time: u64, + /// Number of slots per epoch. pub epoch_length: u64, } diff --git a/crates/common/src/errors.rs b/crates/common/src/errors.rs index 65d01919..ff6833ab 100644 --- a/crates/common/src/errors.rs +++ b/crates/common/src/errors.rs @@ -228,6 +228,16 @@ impl BlockfrostError { } } + /// error for 503 Service Unavailable + /// returned when a feature is not ready or available + pub fn service_unavailable(message: String) -> Self { + Self { + error: "Service Unavailable".to_string(), + message, + status_code: 503, + } + } + /// This error is converted in middleware to internal_server_error_user pub fn internal_server_error(error: String) -> Self { Self { @@ -269,6 +279,7 @@ impl IntoResponse for BlockfrostError { 404 => StatusCode::NOT_FOUND, 405 => StatusCode::METHOD_NOT_ALLOWED, 500 => StatusCode::INTERNAL_SERVER_ERROR, + 503 => StatusCode::SERVICE_UNAVAILABLE, _ => StatusCode::INTERNAL_SERVER_ERROR, }; diff --git a/crates/node/src/chain_config.rs b/crates/node/src/chain_config.rs index 204e6574..cc253943 100644 --- a/crates/node/src/chain_config.rs +++ b/crates/node/src/chain_config.rs @@ -13,10 +13,9 @@ async fn init_genesis_config( node_pool: NodePool, ) -> Result<(GenesisConfig, CurrentProtocolParam), AppError> { let mut node = node_pool.get().await?; - match node.genesis_config_and_pp().await { - Ok((genesis_config, protocol_params)) => Ok((genesis_config, protocol_params)), - Err(e) => Err(AppError::Server(format!( + node.genesis_config_and_pp().await.map_err(|e| { + AppError::Server(format!( "Could not fetch genesis and protocol parameters. Is the Cardano node running? {e}" - ))), - } + )) + }) } diff --git a/crates/node/src/chain_config_watch.rs b/crates/node/src/chain_config_watch.rs new file mode 100644 index 00000000..c8f0c442 --- /dev/null +++ b/crates/node/src/chain_config_watch.rs @@ -0,0 +1,283 @@ +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use bf_common::chain_config::ChainConfigCache; +use bf_common::errors::BlockfrostError; +use tokio::sync::watch; +use tokio::time; + +use crate::chain_config::init_caches; +use crate::pool::NodePool; + +const CONWAY_ERA: u16 = ChainConfigCache::CONWAY_ERA; +const SYNC_THRESHOLD: f64 = 99.9; +const SYNC_POLL_INTERVAL: Duration = Duration::from_secs(60); +const RETRY_INTERVAL: Duration = Duration::from_secs(60); +/// Buffer after computed epoch boundary before re-querying protocol params. +const EPOCH_BOUNDARY_BUFFER: Duration = Duration::from_secs(30); +/// Error substring returned by `sync_progress()` for non-well-known networks. +const UNSUPPORTED_NETWORK_ERROR: &str = "Only well-known networks"; + +/// Watches the Cardano node for sync readiness and epoch-boundary protocol +/// parameter changes, publishing [`ChainConfigCache`] updates via a +/// `tokio::sync::watch` channel. +#[derive(Clone)] +pub struct ChainConfigWatch { + rx: watch::Receiver>>, +} + +impl ChainConfigWatch { + /// Spawn the background monitor and return immediately. + /// + /// The watch value starts as `None` and is set to `Some(…)` once the node + /// reaches the Conway era with sufficient sync progress. + pub fn spawn(node_pool: NodePool) -> Self { + let (tx, rx) = watch::channel(None); + + tokio::spawn(async move { + monitor_loop(node_pool, tx).await; + }); + + Self { rx } + } + + /// Returns the current config or a 503 if the node is not yet synced. + pub fn get(&self) -> Result, BlockfrostError> { + self.rx.borrow().clone().ok_or_else(|| { + BlockfrostError::service_unavailable( + "Chain configuration is not yet available. The Cardano node may still be syncing." + .to_string(), + ) + }) + } + + /// Wait until the first config is available (node synced and init complete). + pub async fn wait_ready(&mut self) { + while self.rx.borrow().is_none() { + if self.rx.changed().await.is_err() { + break; + } + } + } +} + +/// The main background loop: wait for sync, init, then watch for epoch changes. +async fn monitor_loop(node_pool: NodePool, tx: watch::Sender>>) { + // Phase 1 – wait until the node is synced (or ready enough). + wait_for_sync(&node_pool).await; + + // Phase 2 – first successful init. + let config = init_until_success(&node_pool).await; + let slot_config = config.slot_config.clone(); + let _ = tx.send(Some(Arc::new(config))); + tracing::info!("ChainConfigWatch: chain configuration loaded successfully"); + + // Phase 3 – watch for epoch changes. + loop { + let sleep_dur = duration_until_next_epoch(&slot_config); + tracing::info!( + sleep_secs = sleep_dur.as_secs(), + "ChainConfigWatch: next epoch boundary check scheduled" + ); + time::sleep(sleep_dur).await; + + let new_config = loop { + match init_caches(node_pool.clone()).await { + Ok(c) => break c, + Err(e) => { + tracing::error!( + "ChainConfigWatch: failed to refresh chain config at epoch boundary: {e}. \ + Current config remains active, retrying in {}s. \ + If this persists, check your Cardano node connectivity.", + RETRY_INTERVAL.as_secs() + ); + time::sleep(RETRY_INTERVAL).await; + }, + } + }; + + let params_changed = tx + .borrow() + .as_ref() + .is_none_or(|old| old.protocol_params != new_config.protocol_params); + + if params_changed { + tracing::info!( + "ChainConfigWatch: protocol parameters changed at epoch boundary — reloading chain config" + ); + let _ = tx.send(Some(Arc::new(new_config))); + } else { + tracing::debug!( + "ChainConfigWatch: epoch boundary reached, protocol parameters unchanged — no action needed" + ); + } + } +} + +/// Block until the node reports Conway era and sync progress ≥ threshold. +/// +/// For custom (non-well-known) networks, `sync_progress()` is not available, so +/// we skip the sync check and rely on `init_caches` succeeding. +async fn wait_for_sync(node_pool: &NodePool) { + loop { + match check_sync(node_pool).await { + SyncStatus::Ready => return, + SyncStatus::NotReady(reason) => { + tracing::info!( + "ChainConfigWatch: {reason}, retrying in {}s", + SYNC_POLL_INTERVAL.as_secs() + ); + time::sleep(SYNC_POLL_INTERVAL).await; + }, + SyncStatus::CustomNetwork => { + tracing::info!( + "ChainConfigWatch: custom network detected — skipping sync check, will attempt init directly" + ); + return; + }, + } + } +} + +enum SyncStatus { + Ready, + NotReady(String), + CustomNetwork, +} + +async fn check_sync(node_pool: &NodePool) -> SyncStatus { + let mut node = match node_pool.get().await { + Ok(n) => n, + Err(e) => { + return SyncStatus::NotReady(format!( + "Cannot connect to Cardano node: {e}. \ + Ensure node is running and socket path is correct" + )); + }, + }; + + match node.sync_progress().await { + Ok(info) => { + if info.era != CONWAY_ERA { + SyncStatus::NotReady(format!( + "Node is in era {} (need Conway era {CONWAY_ERA})", + info.era + )) + } else if info.sync_progress < SYNC_THRESHOLD { + SyncStatus::NotReady(format!( + "Node sync progress {:.2}% < {SYNC_THRESHOLD}% threshold", + info.sync_progress + )) + } else { + SyncStatus::Ready + } + }, + Err(e) => { + let msg = e.to_string(); + if msg.contains(UNSUPPORTED_NETWORK_ERROR) { + SyncStatus::CustomNetwork + } else { + SyncStatus::NotReady(format!( + "Cannot query node sync status: {e}. \ + Ensure the Cardano node is running" + )) + } + }, + } +} + +/// Retry `init_caches` until it succeeds. +async fn init_until_success(node_pool: &NodePool) -> ChainConfigCache { + loop { + match init_caches(node_pool.clone()).await { + Ok(config) => return config, + Err(e) => { + tracing::warn!( + "ChainConfigWatch: failed to load chain config from node: {e}. \ + Retrying in {}s.", + RETRY_INTERVAL.as_secs() + ); + time::sleep(RETRY_INTERVAL).await; + }, + } + } +} + +/// Compute how long to sleep until the next epoch boundary (plus buffer). +/// +/// Returns [`RETRY_INTERVAL`] as a fallback if `slot_config` contains zero +/// values that would cause a division-by-zero. +fn duration_until_next_epoch(slot_config: &bf_common::chain_config::SlotConfig) -> Duration { + if slot_config.slot_length == 0 || slot_config.epoch_length == 0 { + return RETRY_INTERVAL; + } + + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + // Current slot (may be approximate but close enough for scheduling). + let elapsed_ms = now_ms.saturating_sub(slot_config.zero_time); + let current_slot = slot_config.zero_slot + elapsed_ms / slot_config.slot_length; + + // Slots into the current epoch. + let slot_in_epoch = (current_slot - slot_config.zero_slot) % slot_config.epoch_length; + let slots_until_next = slot_config.epoch_length - slot_in_epoch; + + let ms_until_next = slots_until_next * slot_config.slot_length; + Duration::from_millis(ms_until_next) + EPOCH_BOUNDARY_BUFFER +} + +#[cfg(test)] +mod tests { + use super::*; + use bf_common::chain_config::SlotConfig; + + #[test] + fn test_zero_slot_length_returns_retry_interval() { + let config = SlotConfig { + slot_length: 0, + zero_slot: 0, + zero_time: 0, + epoch_length: 432000, + }; + assert_eq!(duration_until_next_epoch(&config), RETRY_INTERVAL); + } + + #[test] + fn test_zero_epoch_length_returns_retry_interval() { + let config = SlotConfig { + slot_length: 1000, + zero_slot: 0, + zero_time: 0, + epoch_length: 0, + }; + assert_eq!(duration_until_next_epoch(&config), RETRY_INTERVAL); + } + + #[test] + fn test_result_includes_buffer() { + let result = duration_until_next_epoch(&SlotConfig::preview()); + assert!(result > EPOCH_BOUNDARY_BUFFER); + } + + #[test] + fn test_result_at_most_one_epoch() { + let config = SlotConfig::preview(); + let max = + Duration::from_millis(config.epoch_length * config.slot_length) + EPOCH_BOUNDARY_BUFFER; + let result = duration_until_next_epoch(&config); + assert!(result <= max, "result {result:?} exceeds max {max:?}"); + } + + #[test] + fn test_mainnet_config() { + let config = SlotConfig::mainnet(); + let max = + Duration::from_millis(config.epoch_length * config.slot_length) + EPOCH_BOUNDARY_BUFFER; + let result = duration_until_next_epoch(&config); + assert!(result > EPOCH_BOUNDARY_BUFFER); + assert!(result <= max); + } +} diff --git a/crates/node/src/ledger.rs b/crates/node/src/ledger.rs index 8e428f90..e2d5d72b 100644 --- a/crates/node/src/ledger.rs +++ b/crates/node/src/ledger.rs @@ -8,33 +8,6 @@ use super::connection::NodeClient; use bf_common::errors::BlockfrostError; impl NodeClient { - /// Fetches the current protocol parameters from the connected Cardano node. - /// @TODO These values can be cached - pub async fn protocol_params(&mut self) -> Result { - self.with_statequery(|generic_client: &mut localstate::GenericClient| { - Box::pin(async { - let era = localstate::queries_v16::get_current_era(generic_client).await?; - let params = - localstate::queries_v16::get_current_pparams(generic_client, era).await?; - Ok(params) - }) - }) - .await - } - - /// @TODO These values can be cached or read from a genesis file - pub async fn genesis_config(&mut self) -> Result { - self.with_statequery(|generic_client: &mut localstate::GenericClient| { - Box::pin(async { - let era = localstate::queries_v16::get_current_era(generic_client).await?; - let genesis = - localstate::queries_v16::get_genesis_config(generic_client, era).await?; - Ok(genesis) - }) - }) - .await - } - pub async fn genesis_config_and_pp( &mut self, ) -> Result<(GenesisConfig, CurrentProtocolParam), BlockfrostError> { diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index d499d72f..cafe7cb5 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -1,5 +1,6 @@ pub mod cbor; pub mod chain_config; +pub mod chain_config_watch; pub mod connection; pub mod ledger; pub mod monitoring; diff --git a/crates/platform/src/main.rs b/crates/platform/src/main.rs index c90226ea..eade00fc 100644 --- a/crates/platform/src/main.rs +++ b/crates/platform/src/main.rs @@ -26,7 +26,7 @@ async fn main() -> Result<(), AppError> { env!("GIT_REVISION") ); - let (app, _, health_monitor, icebreakers_api, api_prefix) = + let (app, _, health_monitor, icebreakers_api, api_prefix, _) = build(config.clone().into()).await?; let address = std::net::SocketAddr::new(config.server_address, config.server_port); diff --git a/crates/platform/src/server.rs b/crates/platform/src/server.rs index ff8be2f4..4ecf7bb0 100644 --- a/crates/platform/src/server.rs +++ b/crates/platform/src/server.rs @@ -11,7 +11,7 @@ use bf_common::{ errors::{AppError, BlockfrostError}, }; use bf_data_node::client::DataNode; -use bf_node::{chain_config::init_caches, pool::NodePool}; +use bf_node::{chain_config_watch::ChainConfigWatch, pool::NodePool}; use bf_tx_evaluator::external::ExternalEvaluator; use metrics::{setup_metrics_recorder, spawn_process_collector}; use routes::{hidden::get_hidden_api_routes, nest_routes, regular::get_regular_api_routes}; @@ -32,6 +32,7 @@ pub async fn build( health_monitor::HealthMonitor, Option>, ApiPrefix, + ChainConfigWatch, ), AppError, > { @@ -62,11 +63,11 @@ pub async fn build( // Set up optional Icebreakers API (solitary option in CLI) let icebreakers_api = IcebreakersAPI::new(&config, api_prefix.clone()).await?; - // Initialize chain configurations - let chain_config_cache = init_caches(node_conn_pool.clone()).await?; + // Initialize chain config watcher (lazy — waits for node to sync) + let chain_config_watch = ChainConfigWatch::spawn(node_conn_pool.clone()); - // Initialize the Haskell-based tx evaluator - let tx_evaluator = ExternalEvaluator::spawn(chain_config_cache).await?; + // Initialize the Haskell-based tx evaluator (pull-based — reads config on demand from the cache) + let tx_evaluator = ExternalEvaluator::new(chain_config_watch.clone()); // API routes that are always under / (and also under the UUID prefix, if we use it) let regular_api_routes = get_regular_api_routes(!config.no_metrics); @@ -90,7 +91,7 @@ pub async fn build( .with_state(app_state.clone()) .layer(Extension(health_monitor.clone())) .layer(Extension(node_conn_pool.clone())) - .layer(Extension(tx_evaluator)) + .layer(Extension(tx_evaluator.clone())) .layer(from_fn(error_middleware)) .fallback(BlockfrostError::not_found()); @@ -113,5 +114,6 @@ pub async fn build( health_monitor, icebreakers_api, api_prefix, + chain_config_watch, )) } diff --git a/crates/platform/tests/common/mod.rs b/crates/platform/tests/common/mod.rs index c18dcf1b..334a0eb2 100644 --- a/crates/platform/tests/common/mod.rs +++ b/crates/platform/tests/common/mod.rs @@ -10,6 +10,7 @@ use bf_common::{ config::{Config, DataNodeConfig, IcebreakersConfig, Mode}, types::{LogLevel, Network}, }; +use bf_node::chain_config_watch::ChainConfigWatch; use bf_node::pool::NodePool; use blockfrost::{BlockFrostSettings, BlockfrostAPI}; use blockfrost_platform::{ @@ -31,9 +32,13 @@ pub fn initialize_logging() { let _ = INIT_LOGGING; } +/// Build the app and wait for chain config to be ready (node synced). +/// Use this for tests that need the evaluator or chain config. pub async fn initialize_app() -> Router { initialize_logging(); - let (app, _, _, _, _) = build_app().await.expect("Failed to build the application"); + let (app, _, _, _, _, mut config_watch) = + build_app().await.expect("Failed to build the application"); + config_watch.wait_ready().await; app } @@ -75,6 +80,7 @@ pub async fn build_app() -> Result< health_monitor::HealthMonitor, Option>, ApiPrefix, + ChainConfigWatch, ), AppError, > { @@ -90,6 +96,7 @@ pub async fn build_app_non_solitary() -> Result< health_monitor::HealthMonitor, Option>, ApiPrefix, + ChainConfigWatch, ), AppError, > { @@ -143,6 +150,7 @@ pub async fn build_app_with_data_node( health_monitor::HealthMonitor, Option>, ApiPrefix, + ChainConfigWatch, ), AppError, > { diff --git a/crates/platform/tests/data_node_test.rs b/crates/platform/tests/data_node_test.rs index 192457b5..64c10c75 100644 --- a/crates/platform/tests/data_node_test.rs +++ b/crates/platform/tests/data_node_test.rs @@ -36,7 +36,7 @@ mod tests { initialize_logging(); let mock = MockDataNode::healthy().await; - let (app, _, _, _, _) = build_app_with_data_node(mock.url) + let (app, _, _, _, _, _) = build_app_with_data_node(mock.url) .await .expect("Failed to build the application"); @@ -61,7 +61,7 @@ mod tests { initialize_logging(); let mock = MockDataNode::unhealthy().await; - let (app, _, _, _, _) = build_app_with_data_node(mock.url) + let (app, _, _, _, _, _) = build_app_with_data_node(mock.url) .await .expect("Failed to build the application"); @@ -83,7 +83,7 @@ mod tests { initialize_logging(); let mock = MockDataNode::unreachable(); - let (app, _, _, _, _) = build_app_with_data_node(mock.url) + let (app, _, _, _, _, _) = build_app_with_data_node(mock.url) .await .expect("Failed to build the application"); diff --git a/crates/platform/tests/icebreakers_test.rs b/crates/platform/tests/icebreakers_test.rs index 4e15f1df..c256b2f8 100644 --- a/crates/platform/tests/icebreakers_test.rs +++ b/crates/platform/tests/icebreakers_test.rs @@ -19,7 +19,7 @@ mod tests { async fn test_icebreakers_registrations() -> Result<(), BlockfrostError> { initialize_logging(); - let (app, _, _, icebreakers_api, api_prefix) = build_app_non_solitary() + let (app, _, _, icebreakers_api, api_prefix, _) = build_app_non_solitary() .await .expect("Failed to build the application"); diff --git a/crates/platform/tests/metrics_test.rs b/crates/platform/tests/metrics_test.rs index 87f5d9bc..038889e1 100644 --- a/crates/platform/tests/metrics_test.rs +++ b/crates/platform/tests/metrics_test.rs @@ -15,7 +15,7 @@ mod tests { async fn test_route_metrics() { initialize_logging(); - let (app, _, _, _, _) = build_app().await.expect("Failed to build the application"); + let (app, _, _, _, _, _) = build_app().await.expect("Failed to build the application"); // Test without trailing slash let response = app diff --git a/crates/platform/tests/root_test.rs b/crates/platform/tests/root_test.rs index 31f621f3..70e1fb29 100644 --- a/crates/platform/tests/root_test.rs +++ b/crates/platform/tests/root_test.rs @@ -17,7 +17,7 @@ mod tests { async fn test_route_root() { initialize_logging(); - let (app, _, _, _, _) = build_app().await.expect("Failed to build the application"); + let (app, _, _, _, _, _) = build_app().await.expect("Failed to build the application"); let response = app .oneshot(Request::builder().uri("/").body(Body::empty()).unwrap()) diff --git a/crates/platform/tests/submit_test.rs b/crates/platform/tests/submit_test.rs index 584eca82..0a00d62c 100644 --- a/crates/platform/tests/submit_test.rs +++ b/crates/platform/tests/submit_test.rs @@ -19,7 +19,7 @@ mod tests { #[ntest::timeout(120_000)] async fn test_route_submit_cbor_error() { initialize_logging(); - let (app, _, _, _, _) = build_app().await.expect("Failed to build the application"); + let (app, _, _, _, _, _) = build_app().await.expect("Failed to build the application"); let tx = "AAAAAA"; @@ -52,7 +52,7 @@ mod tests { #[ntest::timeout(120_000)] async fn test_route_submit_error() { initialize_logging(); - let (app, _, _, _, _) = build_app().await.expect("Failed to build the application"); + let (app, _, _, _, _, _) = build_app().await.expect("Failed to build the application"); let tx = "84a300d90102818258205176274bef11d575edd6aa72392aaf993a07f736e70239c1fb22d4b1426b22bc01018282583900ddf1eb9ce2a1561e8f156991486b97873fb6969190cbc99ddcb3816621dcb03574152623414ed354d2d8f50e310f3f2e7d167cb20e5754271a003d09008258390099a5cb0fa8f19aba38cacf8a243d632149129f882df3a8e67f6bd512bcb0cde66a545e9fbc7ca4492f39bca1f4f265cc1503b4f7d6ff205c1b000000024f127a7c021a0002a2ada100d90102818258208b83e59abc9d7a66a77be5e0825525546a595174f8b929f164fcf5052d7aab7b5840709c64556c946abf267edd90b8027343d065193ef816529d8fa7aa2243f1fd2ec27036a677974199e2264cb582d01925134b9a20997d5a734da298df957eb002f5f6"; @@ -96,7 +96,7 @@ mod tests { #[ntest::timeout(120_000)] async fn test_route_submit_agent_dequeu() { initialize_logging(); - let (app, _, _, _, _) = build_app().await.expect("Failed to build the application"); + let (app, _, _, _, _, _) = build_app().await.expect("Failed to build the application"); let tx = "84a800848258204c16d304e6d531c59afd87a9199b7bb4175bc131b3d6746917901046b662963c00825820893c3f630c0b2db16d041c388aa0d58746ccbbc44133b2d7a3127a72c79722f1018258200998adb591c872a241776e39fe855e04b2d7c361008e94c582f59b6b6ccc452c028258208380ce7240ba59187f6450911f74a70cf3d2749228badb2e7cd10fb6499355f503018482581d61e15900a9a62a8fb01f936a25bf54af209c7ed1248c4e5abd05ec4e76821a0023ba63a1581ca0028f350aaabe0545fdcb56b039bfb08e4bb4d8c4d7c3c7d481c235a145484f534b5900a300581d71cba5c6770fe7b30ebc1fa32f01938c150513211360ded23ac76e36b301821a006336d5a3581c239075b83c03c2333eacd0b0beac6b8314f11ce3dc0c047012b0cad4a144706f6f6c01581c3547b4325e495d529619335603ababde10025dceafa9ed34b1fb6611a158208b284793d3bd4967244a2ddd68410d56d06d36ac8d201429b937096a2e8234bc1b7ffffffffffade6b581ca0028f350aaabe0545fdcb56b039bfb08e4bb4d8c4d7c3c7d481c235a145484f534b59195e99028201d818583ad8799fd8799f4040ffd8799f581ca0028f350aaabe0545fdcb56b039bfb08e4bb4d8c4d7c3c7d481c23545484f534b59ff1a006336d5195e99ff825839016d06090559d8ed2988aa5b2fff265d668cf552f4f62278c0128f816c0a48432e080280d0d9b15edb65563995f97ce236035afea568e660d1821a00118f32a1581c2f8b2d1f384485896f38406173fa11df2a4ce53b4b0886138b76597aa1476261746368657201825839016d06090559d8ed2988aa5b2fff265d668cf552f4f62278c0128f816c0a48432e080280d0d9b15edb65563995f97ce236035afea568e660d11a06d9f713021a000ab9e00b582027f17979d848d6472896266dd8bf39f7251ca23798713464bc407bf637286c230d81825820cf5de9189b958f8ad64c1f1837c2fa4711d073494598467a1c1a59589393eae20310825839016d06090559d8ed2988aa5b2fff265d668cf552f4f62278c0128f816c0a48432e080280d0d9b15edb65563995f97ce236035afea568e660d11a08666c75111a001016d01282825820bf93dc59c10c19c35210c2414779d7391ca19128cc7b13794ea85af5ff835f59008258201c37df764f8261edce8678b197767668a91d544b2b203fb5d0cf9acc10366e7600a200818258200eabfa083d7969681d2fc8e825a5f79e1c40f03aeac46ecd94bf5c5790db1bc058409a029ddd3cdde65598bb712c640ea63eeebfee526ce49bd0983b4d1fdca858481ddf931bf0354552cc0a7d3365e2f03fdb457c0466cea8b371b645f9b6d0c2010582840001d8799fd8799f011a006336d5195e991b7ffffffffffade6bd8799f1a000539e7ff01ffff821a000b46e41a0a7f3ca4840003d87d80821a002dccfe1a28868be8f5f6"; @@ -131,7 +131,7 @@ mod tests { #[ntest::timeout(120_000)] async fn test_route_submit_success() { initialize_logging(); - let (app, _, _, _, _) = build_app().await.expect("Failed to build the application"); + let (app, _, _, _, _, _) = build_app().await.expect("Failed to build the application"); let blockfrost_client = get_blockfrost_client(); let tx = build_tx(&blockfrost_client).await.unwrap(); diff --git a/crates/tx_evaluator/Cargo.toml b/crates/tx_evaluator/Cargo.toml index 6749b3c8..344821af 100644 --- a/crates/tx_evaluator/Cargo.toml +++ b/crates/tx_evaluator/Cargo.toml @@ -19,6 +19,8 @@ pallas-primitives.workspace = true pallas-addresses.workspace = true pallas-validate.workspace = true chrono.workspace = true +tokio.workspace = true +tracing.workspace = true [features] tarpaulin = [] diff --git a/crates/tx_evaluator/src/external.rs b/crates/tx_evaluator/src/external.rs index 025eba3a..b3232622 100644 --- a/crates/tx_evaluator/src/external.rs +++ b/crates/tx_evaluator/src/external.rs @@ -1,11 +1,17 @@ use std::str::FromStr; +use std::sync::Arc; +use bf_common::{ + chain_config::{ChainConfigCache, SlotConfig}, + errors::{AppError, BlockfrostError}, +}; +use bf_node::chain_config_watch::ChainConfigWatch; use bf_node::pool::NodePool; +use bf_testgen::testgen::{Testgen, TestgenResponse}; use pallas_codec::{ minicbor::to_vec, utils::{AnyUInt, CborWrap}, }; - use pallas_network::miniprotocols::{ localstate::queries_v16::{ DatumOption, PostAlonsoTransactionOutput, TransactionOutput, UTxO, Value as NetworkValue, @@ -16,12 +22,7 @@ use pallas_primitives::Bytes; use pallas_primitives::KeyValuePairs; use pallas_traverse::MultiEraTx; use serde::Serialize; - -use bf_common::{ - chain_config::{ChainConfigCache, SlotConfig}, - errors::{AppError, BlockfrostError}, -}; -use bf_testgen::testgen::{Testgen, TestgenResponse}; +use tokio::sync::Mutex; use crate::{ model::api::{AdditionalUtxoSet, AdditionalUtxoV6}, @@ -32,81 +33,105 @@ use crate::{ wrapper::{wrap_response_v5, wrap_response_v6}, }; +/// Evaluates transactions using the external testgen-hs Haskell binary. +/// +/// The evaluator is pull-based: on each request it reads the current +/// [`ChainConfigCache`] from [`ChainConfigWatch`]. If the config is not yet +/// available (node still syncing), it returns 503. If the config has changed +/// since the last init (e.g. protocol parameter update at an epoch boundary), +/// it re-spawns testgen-hs with the new parameters. +/// +/// testgen-hs does not support reinit, so a process restart is required on +/// config change. #[derive(Clone)] pub struct ExternalEvaluator { - testgen: Testgen, + /// Source of chain configuration (lazy, refreshed at epoch boundaries). + config_watch: ChainConfigWatch, + /// Shared mutable state holding the current testgen-hs process. + state: Arc>, } +struct EvaluatorState { + /// Running testgen-hs process, if initialized. + testgen: Option, + /// The config that was used to init the current testgen process. + /// `Arc::ptr_eq` is used to detect config changes from `ChainConfigWatch`. + last_config: Option>, +} + +/// JSON payload sent to testgen-hs on first line to initialize the evaluator. #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct InitPayload { + /// CBOR-encoded Shelley genesis `system_start`, hex string. system_start: String, + /// CBOR-encoded current protocol parameters, hex string. protocol_params: String, + /// Slot timing configuration for epoch/slot calculations. slot_config: SlotConfig, + /// Cardano era index (see [`ChainConfigCache::CONWAY_ERA`]). era: u16, } +/// JSON payload sent to testgen-hs for each evaluation request. #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct EvalPayload { + /// CBOR-encoded transaction, hex string. tx: String, + /// CBOR-encoded UTxO set (node UTxOs merged with user-provided additional UTxOs), hex string. utxos: String, } -/// Evaluates the given tx with utxos using the external testgen exe, which is a Haskell binary. impl ExternalEvaluator { - /// Spawn testgen with specific command 'evaluate-stream' - pub async fn spawn(config: ChainConfigCache) -> Result { - let testgen = Testgen::spawn("evaluate-stream") - .map_err(|err| AppError::Server(format!("Failed to spawn ExternalEvaluator: {err}")))?; - - let evaluator = Self { testgen }; - evaluator.init(config).await?; - - Ok(evaluator) + pub fn new(config_watch: ChainConfigWatch) -> Self { + Self { + config_watch, + state: Arc::new(Mutex::new(EvaluatorState { + testgen: None, + last_config: None, + })), + } } - /// Sends repetitive data as the first communication so we don't need to send every time. - /// Also makes sure the child process behaves as expected. - async fn init(&self, config: ChainConfigCache) -> Result { - use pallas_codec::minicbor::to_vec; - - let system_start = to_vec(config.genesis_config.system_start).map_err(|err| { - AppError::Server(format!( - "ExternalEvaluator: failed to serialize genesis config: {err}" - )) - })?; + /// Get or init testgen, re-spawning if config changed. Returns 503 if + /// chain config is not yet available. + async fn ensure_testgen(&self) -> Result { + let current_config = self.config_watch.get()?; - let protocol_params = to_vec(config.protocol_params).map_err(|err| { - AppError::Server(format!( - "ExternalEvaluator: failed to serialize protocol params: {err}" - )) - })?; - - let init_payload = InitPayload { - system_start: hex::encode(system_start), - protocol_params: hex::encode(protocol_params), - slot_config: config.slot_config, - era: config.era, - }; + // Fast path: config unchanged, testgen already running. + { + let state = self.state.lock().await; + if let (Some(ref last), Some(ref testgen)) = (&state.last_config, &state.testgen) { + if Arc::ptr_eq(last, ¤t_config) { + return Ok(testgen.clone()); + } + } + } + // Lock released — spawn outside the lock to avoid head-of-line blocking. + tracing::info!("ExternalEvaluator: spawning testgen-hs"); - let payload = serde_json::to_string(&init_payload).map_err(|err| { - AppError::Server(format!( - "ExternalEvaluator: failed to serialize initial payload: {err}" + let testgen = spawn_and_init_testgen(¤t_config).await.map_err(|e| { + tracing::error!("ExternalEvaluator: failed to initialize testgen-hs: {e}"); + BlockfrostError::internal_server_error(format!( + "Failed to initialize ExternalEvaluator: {e}" )) })?; - match self.testgen.send(payload).await { - Ok(response) => match response { - TestgenResponse::Ok(value) => Ok(value), - TestgenResponse::Err(err) => Err(AppError::Server(format!( - "ExternalEvaluator: Failed to initialize: {err}" - ))), - }, - Err(err) => Err(AppError::Server(format!( - "ExternalEvaluator: Failed to initialize: {err}" - ))), + let mut state = self.state.lock().await; + // Another request may have already initialized while we were spawning. + if let (Some(ref last), Some(ref existing)) = (&state.last_config, &state.testgen) { + if Arc::ptr_eq(last, ¤t_config) { + tracing::debug!( + "ExternalEvaluator: testgen-hs already initialized by another request, reusing" + ); + return Ok(existing.clone()); + } } + state.testgen = Some(testgen.clone()); + state.last_config = Some(current_config); + tracing::info!("ExternalEvaluator: testgen-hs initialized successfully"); + Ok(testgen) } pub async fn evaluate_binary_tx( @@ -115,6 +140,7 @@ impl ExternalEvaluator { tx_cbor_binary: &[u8], additional_utxos: Vec<(UTxO, TransactionOutput)>, ) -> Result { + let testgen = self.ensure_testgen().await?; let node = node_pool.get(); /* @@ -125,9 +151,9 @@ impl ExternalEvaluator { Err(err) => // handle pallas decoding error as if it's coming from external binary. { - return Ok(TestgenResponse::Err( - serde_json::to_value(err.to_string()).unwrap(), - )); + return Ok(TestgenResponse::Err(serde_json::Value::String( + err.to_string(), + ))); }, }; @@ -159,7 +185,7 @@ impl ExternalEvaluator { )) })?; - let response = self.testgen.send(json).await.map_err(|err| { + let response = testgen.send(json).await.map_err(|err| { BlockfrostError::internal_server_error(format!( "ExternalEvaluator: Failed to send payload: {err}" )) @@ -266,3 +292,46 @@ impl ExternalEvaluator { Ok(wrap_response_v6(response, serde_json::Value::Null)) } } + +/// Spawn a fresh testgen-hs process and send the init payload. +async fn spawn_and_init_testgen(config: &ChainConfigCache) -> Result { + let testgen = Testgen::spawn("evaluate-stream") + .map_err(|err| AppError::Server(format!("Failed to spawn ExternalEvaluator: {err}")))?; + + let system_start = to_vec(&config.genesis_config.system_start).map_err(|err| { + AppError::Server(format!( + "ExternalEvaluator: failed to serialize genesis config: {err}" + )) + })?; + + let protocol_params = to_vec(&config.protocol_params).map_err(|err| { + AppError::Server(format!( + "ExternalEvaluator: failed to serialize protocol params: {err}" + )) + })?; + + let init_payload = InitPayload { + system_start: hex::encode(system_start), + protocol_params: hex::encode(protocol_params), + slot_config: config.slot_config.clone(), + era: config.era, + }; + + let payload = serde_json::to_string(&init_payload).map_err(|err| { + AppError::Server(format!( + "ExternalEvaluator: failed to serialize initial payload: {err}" + )) + })?; + + match testgen.send(payload).await { + Ok(response) => match response { + TestgenResponse::Ok(_) => Ok(testgen), + TestgenResponse::Err(err) => Err(AppError::Server(format!( + "ExternalEvaluator: Failed to initialize: {err}" + ))), + }, + Err(err) => Err(AppError::Server(format!( + "ExternalEvaluator: Failed to initialize: {err}" + ))), + } +} diff --git a/crates/tx_evaluator/src/native.rs b/crates/tx_evaluator/src/native.rs index 54c449a6..749996b3 100644 --- a/crates/tx_evaluator/src/native.rs +++ b/crates/tx_evaluator/src/native.rs @@ -48,6 +48,11 @@ use crate::model::api::AdditionalUtxoSet; use crate::model::api::Value; use crate::model::api::ValueV6; +// TODO: This evaluator fetches genesis_config_and_pp() from the node on every +// request. It should use ChainConfigWatch to read cached chain config instead. +// Deferred because this native evaluator is currently unused (pallas-validate +// results differ from the Haskell ledger / Ogmios). + //* This implementation uses pallas validate. // Since pallas validate behaves differently from the ogmios validation (which uses ledger) // this implementation is not used at the moment */