Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
- [BREAKING] Modified `TransactionHeader` serialization to allow converting back into the native type after serialization ([#1759](https://github.com/0xMiden/node/issues/1759)).
- Removed `chain_tip` requirement from mempool subscription request ([#1771](https://github.com/0xMiden/node/pull/1771)).
- Moved bootstrap procedure to `miden-node validator bootstrap` command ([#1764](https://github.com/0xMiden/node/pull/1764)).
- NTX Builder now deactivates network accounts that crash repeatedly (configurable via `--ntx-builder.max-account-crashes`, default 10) ([#1712](https://github.com/0xMiden/node/pull/1712)).


### Fixes

Expand Down
11 changes: 11 additions & 0 deletions bin/node/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ pub struct NtxBuilderConfig {
)]
pub idle_timeout: Duration,

/// Maximum number of crashes before an account is deactivated.
///
/// Once this limit is reached, no new transactions will be created for this account.
#[arg(
long = "ntx-builder.max-account-crashes",
default_value_t = 10,
value_name = "NUM"
)]
pub max_account_crashes: usize,

/// Directory for the ntx-builder's persistent database.
///
/// If not set, defaults to the node's data directory.
Expand Down Expand Up @@ -215,6 +225,7 @@ impl NtxBuilderConfig {
.with_tx_prover_url(self.tx_prover_url)
.with_script_cache_size(self.script_cache_size)
.with_idle_timeout(self.idle_timeout)
.with_max_account_crashes(self.max_account_crashes)
}
}

Expand Down
100 changes: 56 additions & 44 deletions crates/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use candidate::TransactionCandidate;
use futures::FutureExt;
use miden_node_proto::clients::{Builder, ValidatorClient};
Expand All @@ -17,7 +18,7 @@ use miden_protocol::block::BlockNumber;
use miden_protocol::note::{NoteScript, Nullifier};
use miden_protocol::transaction::TransactionId;
use miden_remote_prover_client::RemoteTransactionProver;
use tokio::sync::{AcquireError, Notify, RwLock, Semaphore, mpsc};
use tokio::sync::{Notify, RwLock, Semaphore, mpsc};
use tokio_util::sync::CancellationToken;
use url::Url;

Expand All @@ -44,27 +45,6 @@ pub enum ActorRequest {
CacheNoteScript { script_root: Word, script: NoteScript },
}

// ACTOR SHUTDOWN REASON
// ================================================================================================

/// The reason an actor has shut down.
///
/// Each variant describes one distinct way an account actor's run loop can end. Most variants
/// carry the [`NetworkAccountId`] of the actor that stopped; `SemaphoreFailed` carries only the
/// underlying semaphore error.
pub enum ActorShutdownReason {
    /// Occurs when an account actor detects failure in acquiring the rate-limiting semaphore.
    ///
    /// Wraps the [`AcquireError`] produced by the failed permit acquisition.
    SemaphoreFailed(AcquireError),
    /// Occurs when an account actor detects its corresponding cancellation token has been triggered
    /// by the coordinator. Cancellation tokens are triggered by the coordinator to initiate
    /// graceful shutdown of actors.
    Cancelled(NetworkAccountId),
    /// Occurs when the actor encounters a database error it cannot recover from.
    ///
    /// Carries both the affected account and the database error that caused the shutdown.
    DbError(NetworkAccountId, miden_node_db::DatabaseError),
    /// Occurs when an account actor detects that its account has been removed from the database
    /// (e.g. due to a reverted account creation).
    AccountRemoved(NetworkAccountId),
    /// Occurs when the actor has been idle for longer than the idle timeout and the builder
    /// has confirmed there are no available notes in the DB.
    IdleTimeout(NetworkAccountId),
}

// ACCOUNT ACTOR CONFIG
// ================================================================================================

Expand Down Expand Up @@ -98,6 +78,43 @@ pub struct AccountActorContext {
pub request_tx: mpsc::Sender<ActorRequest>,
}

#[cfg(test)]
impl AccountActorContext {
    /// Builds a bare-bones `AccountActorContext` for use in unit tests.
    ///
    /// Every endpoint points at the same placeholder URL, so an actor spawned from this context
    /// is expected to fail on its first gRPC call. That is still enough to exercise coordinator
    /// logic (registry, deactivation, etc.), which does not need a working actor.
    pub fn test(db: &crate::db::Db) -> Self {
        use miden_protocol::crypto::merkle::mmr::{Forest, MmrPeaks, PartialMmr};
        use tokio::sync::RwLock;
        use url::Url;

        use crate::chain_state::ChainState;
        use crate::clients::StoreClient;
        use crate::test_utils::mock_block_header;

        // Placeholder endpoint shared by every client in the context.
        let fake_endpoint = Url::parse("http://127.0.0.1:1").unwrap();

        // Chain state: a mock header for block 0 paired with an MMR built from an empty peak set.
        let genesis_header = mock_block_header(0_u32.into());
        let empty_peaks = MmrPeaks::new(Forest::new(0), vec![]).unwrap();
        let shared_chain_state = Arc::new(RwLock::new(ChainState::new(
            genesis_header,
            PartialMmr::from_peaks(empty_peaks),
        )));

        // Only the sending half is kept in the context; the receiving half is a local that is
        // dropped when this function returns.
        let (request_tx, _request_rx) = mpsc::channel(1);

        // The smallest non-zero size, used for both the script cache and the per-tx note limit.
        let one = NonZeroUsize::new(1).unwrap();

        Self {
            block_producer_url: fake_endpoint.clone(),
            validator_url: fake_endpoint.clone(),
            tx_prover_url: None,
            chain_state: shared_chain_state,
            store: StoreClient::new(fake_endpoint),
            script_cache: LruCache::new(one),
            max_notes_per_tx: one,
            max_note_attempts: 1,
            idle_timeout: Duration::from_secs(60),
            db: db.clone(),
            request_tx,
        }
    }
}

// ACCOUNT ORIGIN
// ================================================================================================

Expand Down Expand Up @@ -239,9 +256,13 @@ impl AccountActor {
}
}

/// Runs the account actor, processing events and managing state until a reason to shutdown is
/// encountered.
pub async fn run(mut self, semaphore: Arc<Semaphore>) -> Result<(), ActorShutdownReason> {
/// Runs the account actor, processing events and managing state until shutdown.
///
/// The return value signals the shutdown category to the coordinator:
///
/// - `Ok(())`: intentional shutdown (idle timeout, cancellation, or account removal).
/// - `Err(_)`: crash (database error, semaphore failure, or any other bug).
pub async fn run(mut self, semaphore: Arc<Semaphore>) -> anyhow::Result<()> {
let account_id = self.origin.id();

// Determine initial mode by checking DB for available notes.
Expand All @@ -250,10 +271,7 @@ impl AccountActor {
.db
.has_available_notes(account_id, block_num, self.max_note_attempts)
.await
.map_err(|err| {
tracing::error!(err = err.as_report(), account_id = %account_id, "failed to check for available notes");
ActorShutdownReason::DbError(account_id, err)
})?;
.context("failed to check for available notes")?;

if has_notes {
self.mode = ActorMode::NotesAvailable;
Expand All @@ -279,7 +297,7 @@ impl AccountActor {

tokio::select! {
_ = self.cancel_token.cancelled() => {
return Err(ActorShutdownReason::Cancelled(account_id));
return Ok(());
}
// Handle coordinator notifications. On notification, re-evaluate state from DB.
_ = self.notify.notified() => {
Expand All @@ -290,12 +308,7 @@ impl AccountActor {
.db
.transaction_exists(awaited_id)
.await
.inspect_err(|err| {
tracing::error!(err = err.as_report(), account_id = %account_id, "failed to check transaction status");
})
.map_err(|err| {
ActorShutdownReason::DbError(account_id, err)
})?;
.context("failed to check transaction status")?;
if exists {
self.mode = ActorMode::NotesAvailable;
}
Expand All @@ -307,7 +320,7 @@ impl AccountActor {
},
// Execute transactions.
permit = tx_permit_acquisition => {
let _permit = permit.map_err(ActorShutdownReason::SemaphoreFailed)?;
let _permit = permit.context("semaphore closed")?;

// Read the chain state.
let chain_state = self.chain_state.read().await.clone();
Expand All @@ -327,7 +340,8 @@ impl AccountActor {
}
// Idle timeout: actor has been idle too long, deactivate account.
_ = idle_timeout_sleep => {
return Err(ActorShutdownReason::IdleTimeout(account_id));
tracing::info!(%account_id, "Account actor deactivated due to idle timeout");
return Ok(());
}
}
}
Expand All @@ -338,21 +352,19 @@ impl AccountActor {
&self,
account_id: NetworkAccountId,
chain_state: ChainState,
) -> Result<Option<TransactionCandidate>, ActorShutdownReason> {
) -> anyhow::Result<Option<TransactionCandidate>> {
let block_num = chain_state.chain_tip_header.block_num();
let max_notes = self.max_notes_per_tx.get();

let (latest_account, notes) = self
.db
.select_candidate(account_id, block_num, self.max_note_attempts)
.await
.map_err(|err| {
tracing::error!(err = err.as_report(), account_id = %account_id, "failed to query DB for transaction candidate");
ActorShutdownReason::DbError(account_id, err)
})?;
.context("failed to query DB for transaction candidate")?;

let Some(account) = latest_account else {
return Err(ActorShutdownReason::AccountRemoved(account_id));
tracing::info!(account_id = %account_id, "Account no longer exists in DB");
return Ok(None);
};

let notes: Vec<_> = notes.into_iter().take(max_notes).collect();
Expand Down
Loading
Loading