diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cedd8c0e..825614f17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,8 @@ - [BREAKING] Modified `TransactionHeader` serialization to allow converting back into the native type after serialization ([#1759](https://github.com/0xMiden/node/issues/1759)). - Removed `chain_tip` requirement from mempool subscription request ([#1771](https://github.com/0xMiden/node/pull/1771)). - Moved bootstrap procedure to `miden-node validator bootstrap` command ([#1764](https://github.com/0xMiden/node/pull/1764)). +- NTX Builder now deactivates network accounts which crash repeatedly (configurable via `--ntx-builder.max-account-crashes`, default 10) ([#1712](https://github.com/0xMiden/miden-node/pull/1712)). + ### Fixes diff --git a/bin/node/src/commands/mod.rs b/bin/node/src/commands/mod.rs index 25c0ddf23..a1b6f8be5 100644 --- a/bin/node/src/commands/mod.rs +++ b/bin/node/src/commands/mod.rs @@ -184,6 +184,16 @@ pub struct NtxBuilderConfig { )] pub idle_timeout: Duration, + /// Maximum number of crashes before an account is deactivated. + /// + /// Once this limit is reached, no new transactions will be created for this account. + #[arg( + long = "ntx-builder.max-account-crashes", + default_value_t = 10, + value_name = "NUM" + )] + pub max_account_crashes: usize, + /// Directory for the ntx-builder's persistent database. /// /// If not set, defaults to the node's data directory. 
@@ -215,6 +225,7 @@ impl NtxBuilderConfig { .with_tx_prover_url(self.tx_prover_url) .with_script_cache_size(self.script_cache_size) .with_idle_timeout(self.idle_timeout) + .with_max_account_crashes(self.max_account_crashes) } } diff --git a/crates/ntx-builder/src/actor/mod.rs b/crates/ntx-builder/src/actor/mod.rs index 46f090f3c..9dab9c8db 100644 --- a/crates/ntx-builder/src/actor/mod.rs +++ b/crates/ntx-builder/src/actor/mod.rs @@ -5,6 +5,7 @@ use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; +use anyhow::Context; use candidate::TransactionCandidate; use futures::FutureExt; use miden_node_proto::clients::{Builder, ValidatorClient}; @@ -17,7 +18,7 @@ use miden_protocol::block::BlockNumber; use miden_protocol::note::{NoteScript, Nullifier}; use miden_protocol::transaction::TransactionId; use miden_remote_prover_client::RemoteTransactionProver; -use tokio::sync::{AcquireError, Notify, RwLock, Semaphore, mpsc}; +use tokio::sync::{Notify, RwLock, Semaphore, mpsc}; use tokio_util::sync::CancellationToken; use url::Url; @@ -44,27 +45,6 @@ pub enum ActorRequest { CacheNoteScript { script_root: Word, script: NoteScript }, } -// ACTOR SHUTDOWN REASON -// ================================================================================================ - -/// The reason an actor has shut down. -pub enum ActorShutdownReason { - /// Occurs when an account actor detects failure in acquiring the rate-limiting semaphore. - SemaphoreFailed(AcquireError), - /// Occurs when an account actor detects its corresponding cancellation token has been triggered - /// by the coordinator. Cancellation tokens are triggered by the coordinator to initiate - /// graceful shutdown of actors. - Cancelled(NetworkAccountId), - /// Occurs when the actor encounters a database error it cannot recover from. - DbError(NetworkAccountId, miden_node_db::DatabaseError), - /// Occurs when an account actor detects that its account has been removed from the database - /// (e.g. 
due to a reverted account creation). - AccountRemoved(NetworkAccountId), - /// Occurs when the actor has been idle for longer than the idle timeout and the builder - /// has confirmed there are no available notes in the DB. - IdleTimeout(NetworkAccountId), -} - // ACCOUNT ACTOR CONFIG // ================================================================================================ @@ -98,6 +78,43 @@ pub struct AccountActorContext { pub request_tx: mpsc::Sender, } +#[cfg(test)] +impl AccountActorContext { + /// Creates a minimal `AccountActorContext` suitable for unit tests. + /// + /// The URLs are fake and actors spawned with this context will fail on their first gRPC call, + /// but this is sufficient for testing coordinator logic (registry, deactivation, etc.). + pub fn test(db: &crate::db::Db) -> Self { + use miden_protocol::crypto::merkle::mmr::{Forest, MmrPeaks, PartialMmr}; + use tokio::sync::RwLock; + use url::Url; + + use crate::chain_state::ChainState; + use crate::clients::StoreClient; + use crate::test_utils::mock_block_header; + + let url = Url::parse("http://127.0.0.1:1").unwrap(); + let block_header = mock_block_header(0_u32.into()); + let chain_mmr = PartialMmr::from_peaks(MmrPeaks::new(Forest::new(0), vec![]).unwrap()); + let chain_state = Arc::new(RwLock::new(ChainState::new(block_header, chain_mmr))); + let (request_tx, _request_rx) = mpsc::channel(1); + + Self { + block_producer_url: url.clone(), + validator_url: url.clone(), + tx_prover_url: None, + chain_state, + store: StoreClient::new(url), + script_cache: LruCache::new(NonZeroUsize::new(1).unwrap()), + max_notes_per_tx: NonZeroUsize::new(1).unwrap(), + max_note_attempts: 1, + idle_timeout: Duration::from_secs(60), + db: db.clone(), + request_tx, + } + } +} + // ACCOUNT ORIGIN // ================================================================================================ @@ -239,9 +256,13 @@ impl AccountActor { } } - /// Runs the account actor, processing events and managing state 
until a reason to shutdown is - /// encountered. - pub async fn run(mut self, semaphore: Arc) -> Result<(), ActorShutdownReason> { + /// Runs the account actor, processing events and managing state until shutdown. + /// + /// The return value signals the shutdown category to the coordinator: + /// + /// - `Ok(())`: intentional shutdown (idle timeout, cancellation, or account removal). + /// - `Err(_)`: crash (database error, semaphore failure, or any other bug). + pub async fn run(mut self, semaphore: Arc) -> anyhow::Result<()> { let account_id = self.origin.id(); // Determine initial mode by checking DB for available notes. @@ -250,10 +271,7 @@ impl AccountActor { .db .has_available_notes(account_id, block_num, self.max_note_attempts) .await - .map_err(|err| { - tracing::error!(err = err.as_report(), account_id = %account_id, "failed to check for available notes"); - ActorShutdownReason::DbError(account_id, err) - })?; + .context("failed to check for available notes")?; if has_notes { self.mode = ActorMode::NotesAvailable; @@ -279,7 +297,7 @@ impl AccountActor { tokio::select! { _ = self.cancel_token.cancelled() => { - return Err(ActorShutdownReason::Cancelled(account_id)); + return Ok(()); } // Handle coordinator notifications. On notification, re-evaluate state from DB. _ = self.notify.notified() => { @@ -290,12 +308,7 @@ impl AccountActor { .db .transaction_exists(awaited_id) .await - .inspect_err(|err| { - tracing::error!(err = err.as_report(), account_id = %account_id, "failed to check transaction status"); - }) - .map_err(|err| { - ActorShutdownReason::DbError(account_id, err) - })?; + .context("failed to check transaction status")?; if exists { self.mode = ActorMode::NotesAvailable; } @@ -307,7 +320,7 @@ impl AccountActor { }, // Execute transactions. permit = tx_permit_acquisition => { - let _permit = permit.map_err(ActorShutdownReason::SemaphoreFailed)?; + let _permit = permit.context("semaphore closed")?; // Read the chain state. 
let chain_state = self.chain_state.read().await.clone(); @@ -327,7 +340,8 @@ impl AccountActor { } // Idle timeout: actor has been idle too long, deactivate account. _ = idle_timeout_sleep => { - return Err(ActorShutdownReason::IdleTimeout(account_id)); + tracing::info!(%account_id, "Account actor deactivated due to idle timeout"); + return Ok(()); } } } @@ -338,7 +352,7 @@ impl AccountActor { &self, account_id: NetworkAccountId, chain_state: ChainState, - ) -> Result, ActorShutdownReason> { + ) -> anyhow::Result> { let block_num = chain_state.chain_tip_header.block_num(); let max_notes = self.max_notes_per_tx.get(); @@ -346,13 +360,11 @@ impl AccountActor { .db .select_candidate(account_id, block_num, self.max_note_attempts) .await - .map_err(|err| { - tracing::error!(err = err.as_report(), account_id = %account_id, "failed to query DB for transaction candidate"); - ActorShutdownReason::DbError(account_id, err) - })?; + .context("failed to query DB for transaction candidate")?; let Some(account) = latest_account else { - return Err(ActorShutdownReason::AccountRemoved(account_id)); + tracing::info!(account_id = %account_id, "Account no longer exists in DB"); + return Ok(None); }; let notes: Vec<_> = notes.into_iter().take(max_notes).collect(); diff --git a/crates/ntx-builder/src/coordinator.rs b/crates/ntx-builder/src/coordinator.rs index 0188db74e..87aa5edcd 100644 --- a/crates/ntx-builder/src/coordinator.rs +++ b/crates/ntx-builder/src/coordinator.rs @@ -1,17 +1,15 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use anyhow::Context; use miden_node_db::DatabaseError; use miden_node_proto::domain::account::NetworkAccountId; use miden_node_proto::domain::mempool::MempoolEvent; -use miden_node_utils::ErrorReport; use miden_protocol::account::delta::AccountUpdateDetails; use tokio::sync::{Notify, Semaphore}; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; -use crate::actor::{AccountActor, AccountActorContext, AccountOrigin, 
ActorShutdownReason}; +use crate::actor::{AccountActor, AccountActorContext, AccountOrigin}; use crate::db::Db; // WRITE EVENT RESULT @@ -91,7 +89,7 @@ pub struct Coordinator { /// This join set allows the coordinator to wait for actor task completion and handle /// different shutdown scenarios. When an actor task completes (either successfully or /// due to an error), the corresponding entry is removed from the actor registry. - actor_join_set: JoinSet, + actor_join_set: JoinSet<(NetworkAccountId, anyhow::Result<()>)>, /// Semaphore for controlling the maximum number of concurrent transactions across all network /// accounts. @@ -104,16 +102,29 @@ pub struct Coordinator { /// Database for persistent state. db: Db, + + /// Tracks the number of crashes per account actor. + /// + /// When an actor exits with an error (for example a DB error), its crash count is + /// incremented. Once the count reaches `max_account_crashes`, the account is deactivated + /// and no new actor will be spawned for it. + crash_counts: HashMap, + + /// Maximum number of crashes an account actor is allowed before being deactivated. + max_account_crashes: usize, } impl Coordinator { - /// Creates a new coordinator with the specified maximum number of inflight transactions. - pub fn new(max_inflight_transactions: usize, db: Db) -> Self { + /// Creates a new coordinator with the specified maximum number of inflight transactions + /// and the crash threshold for account deactivation. + pub fn new(max_inflight_transactions: usize, max_account_crashes: usize, db: Db) -> Self { Self { actor_registry: HashMap::new(), actor_join_set: JoinSet::new(), semaphore: Arc::new(Semaphore::new(max_inflight_transactions)), db, + crash_counts: HashMap::new(), + max_account_crashes, } } @@ -126,6 +137,18 @@ impl Coordinator { pub fn spawn_actor(&mut self, origin: AccountOrigin, actor_context: &AccountActorContext) { let account_id = origin.id(); + // Skip spawning if the account has been deactivated due to repeated crashes. 
+ if let Some(&count) = self.crash_counts.get(&account_id) { + if count >= self.max_account_crashes { + tracing::warn!( + account.id = %account_id, + crash_count = count, + "Account deactivated due to repeated crashes, skipping actor spawn" + ); + return; + } + } + // If an actor already exists for this account ID, something has gone wrong. if let Some(handle) = self.actor_registry.remove(&account_id) { tracing::error!( @@ -142,10 +165,8 @@ impl Coordinator { // Run the actor. Actor reads state from DB on startup. let semaphore = self.semaphore.clone(); - self.actor_join_set.spawn(Box::pin(async move { - // The actor loop runs indefinitely, it only exits via Err with a shutdown reason. - actor.run(semaphore).await.expect_err("actor loop runs indefinitely") - })); + self.actor_join_set + .spawn(Box::pin(async move { (account_id, actor.run(semaphore).await) })); self.actor_registry.insert(account_id, handle); tracing::info!(account_id = %account_id, "Created actor for account prefix"); @@ -164,7 +185,7 @@ impl Coordinator { } } - /// Waits for the next actor to complete and processes the shutdown reason. + /// Waits for the next actor to complete and handles the outcome. /// /// This method monitors the join set for actor task completion and handles /// different shutdown scenarios appropriately. It's designed to be called @@ -173,43 +194,34 @@ impl Coordinator { /// If no actors are currently running, this method will wait indefinitely until /// new actors are spawned. This prevents busy-waiting when the coordinator is idle. /// - /// Returns `Some(account_id)` if a timed-out actor should be respawned (because a - /// notification arrived just as it timed out), or `None` otherwise. + /// Returns `Some(account_id)` if an actor should be respawned (because a + /// notification arrived just as it shut down), or `None` otherwise. 
pub async fn next(&mut self) -> anyhow::Result> { let actor_result = self.actor_join_set.join_next().await; match actor_result { - Some(Ok(shutdown_reason)) => match shutdown_reason { - ActorShutdownReason::Cancelled(account_id) => { - // Do not remove the actor from the registry, as it may be re-spawned. - // The coordinator should always remove actors immediately after cancellation. - tracing::info!(account_id = %account_id, "Account actor cancelled"); - Ok(None) - }, - ActorShutdownReason::SemaphoreFailed(err) => Err(err).context("semaphore failed"), - ActorShutdownReason::DbError(account_id, err) => { - tracing::error!(account_id = %account_id, err = err.as_report(), "Account actor shut down due to DB error"); - self.actor_registry.remove(&account_id); - Ok(None) - }, - ActorShutdownReason::AccountRemoved(account_id) => { - self.actor_registry.remove(&account_id); - tracing::info!(account_id = %account_id, "Account actor shut down: account removed"); - Ok(None) - }, - ActorShutdownReason::IdleTimeout(account_id) => { - tracing::info!(account_id = %account_id, "Account actor shut down due to idle timeout"); - - // Remove the actor from the registry, but check if a notification arrived - // just as the actor timed out. If so, the caller should respawn it. - let should_respawn = - self.actor_registry.remove(&account_id).is_some_and(|handle| { - let notified = handle.notify.notified(); - tokio::pin!(notified); - notified.enable() - }); - - Ok(should_respawn.then_some(account_id)) - }, + Some(Ok((account_id, Ok(())))) => { + // Actor shut down intentionally (idle timeout, cancelled, account removed). + // Remove from registry and check if a notification arrived just as it shut + // down. If so, the caller should respawn it. 
+ let should_respawn = + self.actor_registry.remove(&account_id).is_some_and(|handle| { + let notified = handle.notify.notified(); + tokio::pin!(notified); + notified.enable() + }); + + Ok(should_respawn.then_some(account_id)) + }, + Some(Ok((account_id, Err(err)))) => { + // Actor crashed. Increment crash counter. + let count = self.crash_counts.entry(account_id).or_insert(0); + *count += 1; + tracing::error!( + account.id = %account_id, + "Account actor crashed: {err:#}" + ); + self.actor_registry.remove(&account_id); + Ok(None) }, Some(Err(err)) => { tracing::error!(err = %err, "actor task failed"); @@ -318,20 +330,26 @@ impl Coordinator { } } +#[cfg(test)] +impl Coordinator { + /// Creates a coordinator with default settings backed by a temp DB. + pub async fn test() -> (Self, tempfile::TempDir) { + let (db, dir) = Db::test_setup().await; + (Self::new(4, 10, db), dir) + } +} + #[cfg(test)] mod tests { + use std::sync::Arc; + use miden_node_proto::domain::mempool::MempoolEvent; use super::*; + use crate::actor::{AccountActorContext, AccountOrigin}; use crate::db::Db; use crate::test_utils::*; - /// Creates a coordinator with default settings backed by a temp DB. - async fn test_coordinator() -> (Coordinator, tempfile::TempDir) { - let (db, dir) = Db::test_setup().await; - (Coordinator::new(4, db), dir) - } - /// Registers a dummy actor handle (no real actor task) in the coordinator's registry. 
fn register_dummy_actor(coordinator: &mut Coordinator, account_id: NetworkAccountId) { let notify = Arc::new(Notify::new()); @@ -346,7 +364,7 @@ mod tests { #[tokio::test] async fn send_targeted_returns_inactive_targets() { - let (mut coordinator, _dir) = test_coordinator().await; + let (mut coordinator, _dir) = Coordinator::test().await; let active_id = mock_network_account_id(); let inactive_id = mock_network_account_id_seeded(42); @@ -369,4 +387,47 @@ mod tests { assert_eq!(inactive_targets.len(), 1); assert_eq!(inactive_targets[0], inactive_id); } + + // DEACTIVATED ACCOUNTS + // ============================================================================================ + + #[tokio::test] + async fn spawn_actor_skips_deactivated_account() { + let (db, _dir) = Db::test_setup().await; + let max_crashes = 3; + let mut coordinator = Coordinator::new(4, max_crashes, db.clone()); + let actor_context = AccountActorContext::test(&db); + + let account_id = mock_network_account_id(); + + // Simulate the account having reached the crash threshold. + coordinator.crash_counts.insert(account_id, max_crashes); + + coordinator.spawn_actor(AccountOrigin::Store(account_id), &actor_context); + + assert!( + !coordinator.actor_registry.contains_key(&account_id), + "Deactivated account should not have an actor in the registry" + ); + } + + #[tokio::test] + async fn spawn_actor_allows_below_threshold() { + let (db, _dir) = Db::test_setup().await; + let max_crashes = 3; + let mut coordinator = Coordinator::new(4, max_crashes, db.clone()); + let actor_context = AccountActorContext::test(&db); + + let account_id = mock_network_account_id(); + + // Set crash count below the threshold. 
+ coordinator.crash_counts.insert(account_id, max_crashes - 1); + + coordinator.spawn_actor(AccountOrigin::Store(account_id), &actor_context); + + assert!( + coordinator.actor_registry.contains_key(&account_id), + "Account below crash threshold should have an actor in the registry" + ); + } } diff --git a/crates/ntx-builder/src/lib.rs b/crates/ntx-builder/src/lib.rs index fb63bc5be..fc3ab0ae9 100644 --- a/crates/ntx-builder/src/lib.rs +++ b/crates/ntx-builder/src/lib.rs @@ -59,6 +59,9 @@ const DEFAULT_SCRIPT_CACHE_SIZE: NonZeroUsize = /// Default duration after which an idle network account actor will deactivate. const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60); +/// Default maximum number of crashes an account actor is allowed before being deactivated. +const DEFAULT_MAX_ACCOUNT_CRASHES: usize = 10; + // CONFIGURATION // ================================================================================================= @@ -106,6 +109,11 @@ pub struct NtxBuilderConfig { /// A deactivated account will reactivate if targeted with new notes. pub idle_timeout: Duration, + /// Maximum number of crashes before an account is deactivated. + /// + /// Once this limit is reached, no new transactions will be created for this account. + pub max_account_crashes: usize, + /// Path to the SQLite database file used for persistent state. pub database_filepath: PathBuf, } @@ -129,6 +137,7 @@ impl NtxBuilderConfig { max_block_count: DEFAULT_MAX_BLOCK_COUNT, account_channel_capacity: DEFAULT_ACCOUNT_CHANNEL_CAPACITY, idle_timeout: DEFAULT_IDLE_TIMEOUT, + max_account_crashes: DEFAULT_MAX_ACCOUNT_CRASHES, database_filepath, } } @@ -203,6 +212,13 @@ impl NtxBuilderConfig { self } + /// Sets the maximum number of crashes before an account actor is deactivated. + #[must_use] + pub fn with_max_account_crashes(mut self, max: usize) -> Self { + self.max_account_crashes = max; + self + } + /// Builds and initializes the network transaction builder. 
/// /// This method connects to the store and block producer services, fetches the current @@ -222,7 +238,8 @@ impl NtxBuilderConfig { db.purge_inflight().await.context("failed to purge inflight state")?; let script_cache = LruCache::new(self.script_cache_size); - let coordinator = Coordinator::new(self.max_concurrent_txs, db.clone()); + let coordinator = + Coordinator::new(self.max_concurrent_txs, self.max_account_crashes, db.clone()); let store = StoreClient::new(self.store_url.clone()); let block_producer = BlockProducerClient::new(self.block_producer_url.clone()); diff --git a/docs/external/src/operator/architecture.md b/docs/external/src/operator/architecture.md index 674507752..694ae66e7 100644 --- a/docs/external/src/operator/architecture.md +++ b/docs/external/src/operator/architecture.md @@ -58,3 +58,7 @@ Internally, the builder spawns a dedicated actor for each network account that h idle (no notes to consume) for a configurable duration are automatically deactivated to conserve resources, and are re-activated when new notes arrive. The idle timeout can be tuned with the `--ntx-builder.idle-timeout` CLI argument (default: 5 minutes). + +Accounts whose actors crash repeatedly (for example, due to database errors) are automatically deactivated after a configurable +number of failures, preventing resource exhaustion. The threshold can be set with +`--ntx-builder.max-account-crashes` (default: 10). diff --git a/docs/internal/src/ntx-builder.md b/docs/internal/src/ntx-builder.md index a662f7658..f15feb544 100644 --- a/docs/internal/src/ntx-builder.md +++ b/docs/internal/src/ntx-builder.md @@ -51,5 +51,11 @@ argument (default: 5 minutes). Deactivated actors are re-spawned when new notes targeting their account are detected by the coordinator (via the `send_targeted` path). +If an actor repeatedly crashes (shuts down with an error, such as a database error), its crash count is tracked by +the coordinator. 
Once the count reaches the configurable threshold, the account is **deactivated** +and no new actor will be spawned for it. This prevents resource exhaustion from a persistently +failing account. The threshold is configurable via the `--ntx-builder.max-account-crashes` CLI +argument (default: 10). + The block-producer remains blissfully unaware of network transactions. From its perspective a network transaction is simply the same as any other.