Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
- Replaced NTX Builder's in-memory state management with SQLite-backed persistence; account states, notes, and transaction effects are now stored in the database and inflight state is purged on startup ([#1662](https://github.com/0xMiden/node/pull/1662)).
- [BREAKING] Reworked `miden-remote-prover`, removing the `worker`/`proxy` distinction and simplifying to a `worker` with a request queue ([#1688](https://github.com/0xMiden/node/pull/1688)).
- [BREAKING] Renamed `NoteRoot` protobuf message used in `GetNoteScriptByRoot` gRPC endpoints into `NoteScriptRoot` ([#1722](https://github.com/0xMiden/node/pull/1722)).
- NTX Builder actors now deactivate after being idle for a configurable idle timeout (`--ntx-builder.idle-timeout`, default 5 min) and are re-activated when new notes target their account ([#1705](https://github.com/0xMiden/node/pull/1705)).
- [BREAKING] Modified `TransactionHeader` serialization to allow converting back into the native type after serialization ([#1759](https://github.com/0xMiden/node/issues/1759)).
- Removed `chain_tip` requirement from mempool subscription request ([#1771](https://github.com/0xMiden/node/pull/1771)).
- Moved bootstrap procedure to `miden-node validator bootstrap` command ([#1764](https://github.com/0xMiden/node/pull/1764)).
Expand Down
14 changes: 14 additions & 0 deletions bin/node/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const ENV_VALIDATOR_KMS_KEY_ID: &str = "MIDEN_NODE_VALIDATOR_KMS_KEY_ID";
const ENV_NTX_DATA_DIRECTORY: &str = "MIDEN_NODE_NTX_DATA_DIRECTORY";

const DEFAULT_NTX_TICKER_INTERVAL: Duration = Duration::from_millis(200);
const DEFAULT_NTX_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);
const DEFAULT_NTX_SCRIPT_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).unwrap();

/// Configuration for the Validator key used to sign blocks.
Expand Down Expand Up @@ -171,6 +172,18 @@ pub struct NtxBuilderConfig {
)]
pub script_cache_size: NonZeroUsize,

/// Duration after which an idle network account will deactivate.
///
/// An account is considered idle once it has no viable notes to consume.
/// A deactivated account will reactivate if targeted with new notes.
#[arg(
long = "ntx-builder.idle-timeout",
default_value = &duration_to_human_readable_string(DEFAULT_NTX_IDLE_TIMEOUT),
value_parser = humantime::parse_duration,
value_name = "DURATION"
)]
pub idle_timeout: Duration,

/// Directory for the ntx-builder's persistent database.
///
/// If not set, defaults to the node's data directory.
Expand Down Expand Up @@ -201,6 +214,7 @@ impl NtxBuilderConfig {
)
.with_tx_prover_url(self.tx_prover_url)
.with_script_cache_size(self.script_cache_size)
.with_idle_timeout(self.idle_timeout)
}
}

Expand Down
3 changes: 1 addition & 2 deletions bin/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ pub struct Cli {
pub command: Command,
}

#[expect(clippy::large_enum_variant)]
#[derive(Subcommand)]
pub enum Command {
/// Commands related to the node's store component.
Expand All @@ -40,7 +39,7 @@ pub enum Command {
///
/// This is the recommended way to run the node at the moment.
#[command(subcommand)]
Bundled(commands::bundled::BundledCommand),
Bundled(Box<commands::bundled::BundledCommand>),
}

impl Command {
Expand Down
20 changes: 20 additions & 0 deletions crates/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ pub enum ActorShutdownReason {
/// Occurs when an account actor detects that its account has been removed from the database
/// (e.g. due to a reverted account creation).
AccountRemoved(NetworkAccountId),
/// Occurs when the actor has had no viable notes for longer than the idle timeout.
/// The coordinator then checks for a pending notification and may respawn the actor.
IdleTimeout(NetworkAccountId),
}

// ACCOUNT ACTOR CONFIG
Expand Down Expand Up @@ -87,6 +90,8 @@ pub struct AccountActorContext {
pub max_notes_per_tx: NonZeroUsize,
/// Maximum number of note execution attempts before dropping a note.
pub max_note_attempts: usize,
/// Duration after which an idle actor will deactivate.
pub idle_timeout: Duration,
/// Database for persistent state.
pub db: Db,
/// Channel for sending requests to the coordinator (via the builder event loop).
Expand Down Expand Up @@ -192,6 +197,8 @@ pub struct AccountActor {
max_notes_per_tx: NonZeroUsize,
/// Maximum number of note execution attempts before dropping a note.
max_note_attempts: usize,
/// Duration after which an idle actor will deactivate.
idle_timeout: Duration,
/// Channel for sending requests to the coordinator.
request_tx: mpsc::Sender<ActorRequest>,
}
Expand Down Expand Up @@ -227,6 +234,7 @@ impl AccountActor {
script_cache: actor_context.script_cache.clone(),
max_notes_per_tx: actor_context.max_notes_per_tx,
max_note_attempts: actor_context.max_note_attempts,
idle_timeout: actor_context.idle_timeout,
request_tx: actor_context.request_tx.clone(),
}
}
Expand Down Expand Up @@ -261,6 +269,14 @@ impl AccountActor {
// Enable transaction execution.
ActorMode::NotesAvailable => semaphore.acquire().boxed(),
};

// Idle timeout timer: only ticks when in NoViableNotes mode.
// Mode changes cause the next loop iteration to create a fresh sleep or pending.
let idle_timeout_sleep = match self.mode {
ActorMode::NoViableNotes => tokio::time::sleep(self.idle_timeout).boxed(),
_ => std::future::pending().boxed(),
};

tokio::select! {
_ = self.cancel_token.cancelled() => {
return Err(ActorShutdownReason::Cancelled(account_id));
Expand Down Expand Up @@ -309,6 +325,10 @@ impl AccountActor {
self.mode = ActorMode::NoViableNotes;
}
}
// Idle timeout: actor has been idle too long, deactivate account.
_ = idle_timeout_sleep => {
return Err(ActorShutdownReason::IdleTimeout(account_id));
}
}
}
}
Expand Down
13 changes: 10 additions & 3 deletions crates/ntx-builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,12 @@ impl NetworkTransactionBuilder {
// Main event loop.
loop {
tokio::select! {
// Handle actor result.
// Handle actor result. If a timed-out actor needs respawning, do so.
result = self.coordinator.next() => {
result?;
if let Some(account_id) = result? {
self.coordinator
.spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
}
},
// Handle mempool events.
event = self.mempool_events.next() => {
Expand Down Expand Up @@ -203,7 +206,11 @@ impl NetworkTransactionBuilder {
}
}
}
self.coordinator.send_targeted(&event);
let inactive_targets = self.coordinator.send_targeted(&event);
for account_id in inactive_targets {
self.coordinator
.spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
}
Ok(())
},
// Update chain state and notify affected actors.
Expand Down
101 changes: 94 additions & 7 deletions crates/ntx-builder/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ impl ActorHandle {
/// - Controls transaction concurrency across all network accounts using a semaphore.
/// - Prevents resource exhaustion by limiting simultaneous transaction processing.
///
/// ## Actor Lifecycle
/// - Actors that have been idle for longer than the idle timeout deactivate themselves.
/// - When an actor deactivates, the coordinator checks if a notification arrived just as the actor
/// timed out. If so, the actor is respawned immediately.
/// - Deactivated actors are re-spawned when [`Coordinator::send_targeted`] detects notes targeting
/// an account without an active actor.
///
/// The coordinator operates in an event-driven manner:
/// 1. Network accounts are registered and actors spawned as needed.
/// 2. Mempool events are written to DB, then actors are notified.
Expand Down Expand Up @@ -165,31 +172,48 @@ impl Coordinator {
///
/// If no actors are currently running, this method will wait indefinitely until
/// new actors are spawned. This prevents busy-waiting when the coordinator is idle.
pub async fn next(&mut self) -> anyhow::Result<()> {
///
/// Returns `Some(account_id)` if a timed-out actor should be respawned (because a
/// notification arrived just as it timed out), or `None` otherwise.
pub async fn next(&mut self) -> anyhow::Result<Option<NetworkAccountId>> {
let actor_result = self.actor_join_set.join_next().await;
match actor_result {
Some(Ok(shutdown_reason)) => match shutdown_reason {
ActorShutdownReason::Cancelled(account_id) => {
// Do not remove the actor from the registry, as it may be re-spawned.
// The coordinator should always remove actors immediately after cancellation.
tracing::info!(account_id = %account_id, "Account actor cancelled");
Ok(())
Ok(None)
},
ActorShutdownReason::SemaphoreFailed(err) => Err(err).context("semaphore failed"),
ActorShutdownReason::DbError(account_id, err) => {
self.actor_registry.remove(&account_id);
tracing::error!(account_id = %account_id, err = err.as_report(), "Account actor shut down due to DB error");
Ok(())
self.actor_registry.remove(&account_id);
Ok(None)
},
ActorShutdownReason::AccountRemoved(account_id) => {
self.actor_registry.remove(&account_id);
tracing::info!(account_id = %account_id, "Account actor shut down: account removed");
Ok(())
Ok(None)
},
ActorShutdownReason::IdleTimeout(account_id) => {
tracing::info!(account_id = %account_id, "Account actor shut down due to idle timeout");

// Remove the actor from the registry, but check if a notification arrived
// just as the actor timed out. If so, the caller should respawn it.
let should_respawn =
self.actor_registry.remove(&account_id).is_some_and(|handle| {
let notified = handle.notify.notified();
tokio::pin!(notified);
notified.enable()
});

Ok(should_respawn.then_some(account_id))
Comment on lines +199 to +211
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works, but I really still think we shouldn't need ActorShutdownReason. The actor return value can just be an anyhow::Result<()>. You'll also notice that next returns a result, but we never actually return a result.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use the ActorShutdownReason on the coordinator side to determine whether we should re-spawn it.

Copy link
Collaborator

@Mirko-von-Leipzig Mirko-von-Leipzig Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but there is only one reason to respawn it (currently, and for the foreseeable future imo)

  • if the actor itself decides to shutdown, and
    • if the actor had unhandled notifications
    • then we should restart it

An actor currently shuts down (on purpose) if:

  • it idles for too long
  • its creation tx was reverted

Everything else is a crash/error/bug.

We can therefore in theory say that all Ok(()) means check notify and possibly respawn, and all Err(_) means increment crash counter and respawn.

I'm okay with merging as is, and we can litigate/discuss after. Basically I want to reduce the number of branches and conditionals because these lead to edge cases and bugs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh ok, I misunderstood before. It should be a simple change. To avoid generating conflicts with PR #1712, I will address this there, since it is related to that PR too.

},
},
Some(Err(err)) => {
tracing::error!(err = %err, "actor task failed");
Ok(())
Ok(None)
},
None => {
// There are no actors to wait for. Wait indefinitely until actors are spawned.
Expand All @@ -203,8 +227,14 @@ impl Coordinator {
/// Only actors that are currently active are notified. Since event effects are already
/// persisted in the DB by `write_event()`, actors that spawn later read their state from the
/// DB and do not need predating events.
pub fn send_targeted(&self, event: &MempoolEvent) {
///
/// Returns account IDs of note targets that do not have active actors (e.g. previously
/// deactivated after an idle timeout). The caller can use this to re-activate actors for
/// those accounts.
pub fn send_targeted(&self, event: &MempoolEvent) -> Vec<NetworkAccountId> {
let mut target_account_ids = HashSet::new();
let mut inactive_targets = Vec::new();

if let MempoolEvent::TransactionAdded { network_notes, account_delta, .. } = event {
// We need to inform the account if it was updated. This lets it know that its own
// transaction has been applied, and in the future also resolves race conditions with
Expand All @@ -228,6 +258,8 @@ impl Coordinator {

if self.actor_registry.contains_key(&account) {
target_account_ids.insert(account);
} else {
inactive_targets.push(account);
}
}
}
Expand All @@ -237,6 +269,8 @@ impl Coordinator {
handle.notify.notify_one();
}
}

inactive_targets
}

/// Writes mempool event effects to the database.
Expand Down Expand Up @@ -283,3 +317,56 @@ impl Coordinator {
}
}
}

#[cfg(test)]
mod tests {
    use miden_node_proto::domain::mempool::MempoolEvent;

    use super::*;
    use crate::db::Db;
    use crate::test_utils::*;

    /// Builds a [`Coordinator`] on top of a freshly created temporary database.
    ///
    /// The `TempDir` is returned alongside the coordinator so the caller can keep it alive
    /// for as long as the coordinator is in use.
    async fn test_coordinator() -> (Coordinator, tempfile::TempDir) {
        let (db, temp_dir) = Db::test_setup().await;
        let coordinator = Coordinator::new(4, db);
        (coordinator, temp_dir)
    }

    /// Inserts a handle for `account_id` into the coordinator's registry without spawning
    /// an actor task behind it.
    fn register_dummy_actor(coordinator: &mut Coordinator, account_id: NetworkAccountId) {
        let handle = ActorHandle::new(Arc::new(Notify::new()), CancellationToken::new());
        coordinator.actor_registry.insert(account_id, handle);
    }

    // SEND TARGETED TESTS
    // ============================================================================================

    /// A note aimed at an account with no registered actor must be reported back by
    /// `send_targeted`, while a note aimed at a registered account must not.
    #[tokio::test]
    async fn send_targeted_returns_inactive_targets() {
        let (mut coordinator, _dir) = test_coordinator().await;

        let registered_id = mock_network_account_id();
        let unregistered_id = mock_network_account_id_seeded(42);

        // Only the first account gets a handle in the registry.
        register_dummy_actor(&mut coordinator, registered_id);

        let network_notes = vec![
            mock_single_target_note(registered_id, 10),
            mock_single_target_note(unregistered_id, 20),
        ];

        let event = MempoolEvent::TransactionAdded {
            id: mock_tx_id(1),
            nullifiers: vec![],
            network_notes,
            account_delta: None,
        };

        let reported = coordinator.send_targeted(&event);

        // Exactly one target is reported, and it is the unregistered account.
        assert_eq!(reported, vec![unregistered_id]);
    }
}
11 changes: 11 additions & 0 deletions crates/ntx-builder/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,15 @@ impl Db {
apply_migrations(&mut conn).expect("migrations should apply on empty database");
(conn, dir)
}

/// Builds an async `Db` stored in a file inside a new temporary directory, for tests.
///
/// The database file is named `test.sqlite3`. The returned `TempDir` is handed back with the
/// `Db` and must be kept alive for the DB's lifetime.
///
/// # Panics
///
/// Panics if the temporary directory cannot be created or if `Db::setup` fails.
#[cfg(test)]
pub async fn test_setup() -> (Db, tempfile::TempDir) {
    let temp_dir = tempfile::tempdir().expect("failed to create temp directory");
    let db = Db::setup(temp_dir.path().join("test.sqlite3"))
        .await
        .expect("test DB setup should succeed");
    (db, temp_dir)
}
}
Loading
Loading