From 2561171380036a28bf7f0c45826d9a464e2b22b6 Mon Sep 17 00:00:00 2001
From: "Andrew J. Stone"
Date: Wed, 29 Oct 2025 20:00:22 +0000
Subject: [PATCH 1/6] TQ: Support persisting state to ledger

Builds on https://github.com/oxidecomputer/omicron/pull/9296

This commit persists state to a ledger, following the pattern used in the
bootstore. It's done this way because the `PersistentState` itself is
contained in the sans-io layer, but we must save it in the async task layer.
The sans-io layer shouldn't know how the state is persisted, just that it is,
and so we recreate the ledger every time we write it.

A follow-up PR will deal with the early networking information saved by the
bootstore, and will be very similar.
---
 Cargo.lock                  |   1 +
 trust-quorum/Cargo.toml     |   1 +
 trust-quorum/src/ledgers.rs |  75 +++++++++++++
 trust-quorum/src/lib.rs     |   1 +
 trust-quorum/src/task.rs    | 210 ++++++++++++++++++++++++++++++++++--
 5 files changed, 281 insertions(+), 7 deletions(-)
 create mode 100644 trust-quorum/src/ledgers.rs

diff --git a/Cargo.lock b/Cargo.lock
index 38813d7bd65..83fdd88f60a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -14827,6 +14827,7 @@ dependencies = [
  "hex",
  "hkdf",
  "iddqd",
+ "omicron-common",
  "omicron-test-utils",
  "omicron-uuid-kinds",
  "omicron-workspace-hack",
diff --git a/trust-quorum/Cargo.toml b/trust-quorum/Cargo.toml
index 306764dc6f1..a778dbe6daa 100644
--- a/trust-quorum/Cargo.toml
+++ b/trust-quorum/Cargo.toml
@@ -22,6 +22,7 @@ gfss.workspace = true
 hex.workspace = true
 hkdf.workspace = true
 iddqd.workspace = true
+omicron-common.workspace = true
 omicron-uuid-kinds.workspace = true
 rand = { workspace = true, features = ["os_rng"] }
 secrecy.workspace = true
diff --git a/trust-quorum/src/ledgers.rs b/trust-quorum/src/ledgers.rs
new file mode 100644
index 00000000000..39099aa1f7a
--- /dev/null
+++ b/trust-quorum/src/ledgers.rs
@@ -0,0 +1,75 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Persistent storage for the trust quorum task
+//!
+//! We write two pieces of data to M.2 devices in production via
+//! [`omicron_common::ledger::Ledger`]:
+//!
+//! 1. [`trust_quorum_protocol::PersistentState`] for trust quorum state
+//! 2. A network config blob required for pre-rack-unlock configuration
+
+use camino::Utf8PathBuf;
+use omicron_common::ledger::{Ledger, Ledgerable};
+use serde::{Deserialize, Serialize};
+use slog::{Logger, info};
+use trust_quorum_protocol::PersistentState;
+
+/// A wrapper type around [`PersistentState`] for use as a [`Ledger`]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct PersistentStateLedger {
+    pub generation: u64,
+    pub state: PersistentState,
+}
+
+impl Ledgerable for PersistentStateLedger {
+    fn is_newer_than(&self, other: &Self) -> bool {
+        self.generation > other.generation
+    }
+
+    fn generation_bump(&mut self) {
+        self.generation += 1;
+    }
+}
+
+impl PersistentStateLedger {
+    /// Save the persistent state to a ledger and return the new generation
+    /// number.
+    ///
+    /// Panics if the ledger cannot be saved.
+    pub async fn save(
+        log: &Logger,
+        paths: Vec<Utf8PathBuf>,
+        generation: u64,
+        state: PersistentState,
+    ) -> u64 {
+        let persistent_state = PersistentStateLedger { generation, state };
+        let mut ledger = Ledger::new_with(log, paths, persistent_state);
+        ledger
+            .commit()
+            .await
+            .expect("Critical: Failed to save bootstore ledger for Fsm::State");
+        ledger.data().generation
+    }
+
+    /// Return Some(`PersistentStateLedger`) if it exists on disk, otherwise
+    /// return `None`.
+    pub async fn load(
+        log: &Logger,
+        paths: Vec<Utf8PathBuf>,
+    ) -> Option<PersistentStateLedger> {
+        let Some(ledger) =
+            Ledger::<PersistentStateLedger>::new(&log, paths).await
+        else {
+            return None;
+        };
+        let persistent_state = ledger.into_inner();
+        info!(
+            log,
+            "Loaded persistent state from ledger with generation {}",
+            persistent_state.generation
+        );
+        Some(persistent_state)
+    }
+}
diff --git a/trust-quorum/src/lib.rs b/trust-quorum/src/lib.rs
index f508647c889..dec9b3608ed 100644
--- a/trust-quorum/src/lib.rs
+++ b/trust-quorum/src/lib.rs
@@ -6,6 +6,7 @@
 
 mod connection_manager;
 pub(crate) mod established_conn;
+mod ledgers;
 mod task;
 
 pub(crate) use connection_manager::{
diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs
index 179caf26a22..e91733f317a 100644
--- a/trust-quorum/src/task.rs
+++ b/trust-quorum/src/task.rs
@@ -8,6 +8,8 @@
 use crate::connection_manager::{
     ConnMgr, ConnMgrStatus, ConnToMainMsg, ConnToMainMsgInner,
 };
+use crate::ledgers::PersistentStateLedger;
+use camino::Utf8PathBuf;
 use omicron_uuid_kinds::RackUuid;
 use serde::{Deserialize, Serialize};
 use slog::{Logger, debug, error, info, o};
@@ -47,8 +49,8 @@ const CONN_TO_MAIN_CHANNEL_BOUND: usize = 1024;
 
 pub struct Config {
     pub baseboard_id: BaseboardId,
     pub listen_addr: SocketAddrV6,
-    // pub tq_state_ledger_paths: Vec<Utf8PathBuf>,
-    // pub network_config_ledger_paths: Vec<Utf8PathBuf>,
+    pub tq_ledger_paths: Vec<Utf8PathBuf>,
+    pub network_config_ledger_paths: Vec<Utf8PathBuf>,
     pub sprockets: SprocketsConfig,
 }
@@ -323,8 +325,8 @@ impl NodeTaskHandle {
 pub struct NodeTask {
     shutdown: bool,
     log: Logger,
-    #[expect(unused)]
     config: Config,
+    tq_ledger_generation: u64,
     node: Node,
     ctx: NodeCtx,
     conn_mgr: ConnMgr,
@@ -351,8 +353,20 @@ impl NodeTask {
 
         let baseboard_id = config.baseboard_id.clone();
 
-        // TODO: Load persistent state from ledger
-        let mut ctx = NodeCtx::new(config.baseboard_id.clone());
+        let (mut ctx, tq_ledger_generation) = if let Some(ps_ledger) =
+            PersistentStateLedger::load(&log, config.tq_ledger_paths.clone())
+                .await
+        {
+            (
+                NodeCtx::new_with_persistent_state(
+                    config.baseboard_id.clone(),
+                    ps_ledger.state,
+                ),
+                ps_ledger.generation,
+            )
+        } else {
+            (NodeCtx::new(config.baseboard_id.clone()), 0)
+        };
         let node = Node::new(&log, &mut ctx);
         let conn_mgr = ConnMgr::new(
             &log,
@@ -367,6 +381,7 @@ impl NodeTask {
             shutdown: false,
             log,
             config,
+            tq_ledger_generation,
             node,
             ctx,
             conn_mgr,
@@ -406,6 +421,10 @@ impl NodeTask {
     }
 
     // Handle messages from connection management tasks
+    //
+    // We persist state at the end of this method, which always occurs before
+    // we send any outgoing messages in the `run` loop in response to handling
+    // this message.
     async fn on_conn_msg(&mut self, msg: ConnToMainMsg) {
         let task_id = msg.task_id;
         match msg.msg {
@@ -435,9 +454,14 @@ impl NodeTask {
                 todo!();
             }
         }
+        self.save_persistent_state().await;
     }
 
-    // TODO: Process `ctx`: save persistent state
+    // Handle API requests from sled-agent
+    //
+    // NOTE: We persist state where necessary before responding to clients. Any
+    // resulting output messages will also be sent in the `run` loop after we
+    // persist state.
     async fn on_api_request(&mut self, request: NodeApiRequest) {
         match request {
             NodeApiRequest::BootstrapAddresses(addrs) => {
@@ -467,6 +491,7 @@ impl NodeTask {
                         CommitStatus::Pending
                     }
                 });
+                self.save_persistent_state().await;
                 let _ = tx.send(res);
             }
             NodeApiRequest::ConnMgrStatus { tx } => {
@@ -489,6 +514,7 @@ impl NodeTask {
             NodeApiRequest::LrtqUpgrade { msg, tx } => {
                 let res =
                     self.node.coordinate_upgrade_from_lrtq(&mut self.ctx, msg);
+                self.save_persistent_state().await;
                 let _ = tx.send(res);
             }
             NodeApiRequest::NodeStatus { tx } => {
@@ -511,11 +537,13 @@ impl NodeTask {
                         CommitStatus::Pending
                     }
                 });
+                self.save_persistent_state().await;
                 let _ = tx.send(res);
             }
             NodeApiRequest::Reconfigure { msg, tx } => {
                 let res =
                     self.node.coordinate_reconfiguration(&mut self.ctx, msg);
+                self.save_persistent_state().await;
                 let _ = tx.send(res);
             }
             NodeApiRequest::Shutdown => {
@@ -524,6 +552,19 @@ impl NodeTask {
             }
         }
     }
+
+    /// Save `PersistentState` to storage if necessary
+    pub async fn save_persistent_state(&mut self) {
+        if self.ctx.persistent_state_change_check_and_reset() {
+            self.tq_ledger_generation = PersistentStateLedger::save(
+                &self.log,
+                self.config.tq_ledger_paths.clone(),
+                self.tq_ledger_generation,
+                self.ctx.persistent_state().clone(),
+            )
+            .await;
+        }
+    }
 }
 
 #[cfg(test)]
 mod tests {
@@ -580,7 +621,15 @@ mod tests {
                 },
                 roots: vec![cert_path(dir.clone(), &root_prefix())],
             };
-            Config { baseboard_id, listen_addr, sprockets }
+            let tq_ledger_paths =
+                vec![dir.join(format!("test-tq-ledger-[{i}]"))];
+            Config {
+                baseboard_id,
+                listen_addr,
+                sprockets,
+                tq_ledger_paths,
+                network_config_ledger_paths: vec![],
+            }
         })
         .collect()
     }
@@ -1472,4 +1521,151 @@ mod tests {
 
         setup.cleanup_successful();
     }
+
+    /// Ensure state is persisted as we expect
+    #[tokio::test]
+    pub async fn tq_persistent_state() {
+        let num_nodes = 4;
+        let mut setup =
+            TestSetup::spawn_nodes("tq_initial_config", num_nodes).await;
+        let rack_id = RackUuid::new_v4();
+
+        // Trigger an initial configuration by using the first node as a
+        // coordinator. We're pretending to be the sled-agent with instruction
+        // from Nexus here.
+        let initial_config = ReconfigureMsg {
+            rack_id,
+            epoch: Epoch(1),
+            last_committed_epoch: None,
+            members: setup.members().cloned().collect(),
+            threshold: trust_quorum_protocol::Threshold(3),
+        };
+
+        // Tell nodes how to reach each other
+        for h in &setup.node_handles {
+            h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect())
+                .await
+                .unwrap();
+        }
+
+        let coordinator = setup.node_handles.first().unwrap();
+        coordinator.reconfigure(initial_config).await.unwrap();
+
+        let poll_interval = Duration::from_millis(10);
+        let poll_max = Duration::from_secs(10);
+
+        // Wait for the coordinator to see `PrepareAck`s from all nodes
+        wait_for_condition(
+            async || {
+                let Ok(Some(s)) = coordinator.coordinator_status().await else {
+                    return Err(CondCheckError::<()>::NotYet);
+                };
+                if s.acked_prepares.len() == num_nodes {
+                    Ok(())
+                } else {
+                    Err(CondCheckError::<()>::NotYet)
+                }
+            },
+            &poll_interval,
+            &poll_max,
+        )
+        .await
+        .unwrap();
+
+        // Simulate a crash of the last node.
+        let join_handle = setup.join_handles.pop().unwrap();
+        let node_handle = setup.node_handles.pop().unwrap();
+        node_handle.shutdown().await.unwrap();
+        join_handle.await.unwrap();
+        let _ = setup.listen_addrs.pop().unwrap();
+
+        // Now bring it back up with the same persistent state, which contains
+        // the initial config and prepare. Commit should work and everything
+        // should pick up as expected.
+        let (mut task, handle) = NodeTask::new(
+            setup.configs.last().unwrap().clone(),
+            &setup.logctx.log,
+        )
+        .await;
+        let listen_addr = handle.listen_addr();
+        setup.node_handles.push(handle);
+        setup.join_handles.push(tokio::spawn(async move { task.run().await }));
+        setup.listen_addrs.push(listen_addr);
+
+        // Tell nodes how to reach each other
+        for h in &setup.node_handles {
+            h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect())
+                .await
+                .unwrap();
+        }
+
+        // Commit at each node
+        //
+        // Nexus retries this idempotent command until each node acks. So we
+        // simulate that here.
+        wait_for_condition(
+            async || {
+                let mut acked = 0;
+                for h in &setup.node_handles {
+                    if h.commit(rack_id, Epoch(1)).await.unwrap() {
+                        acked += 1;
+                    }
+                }
+                if acked == num_nodes {
+                    Ok(())
+                } else {
+                    Err(CondCheckError::<()>::NotYet)
+                }
+            },
+            &poll_interval,
+            &poll_max,
+        )
+        .await
+        .unwrap();
+
+        // Now load the rack secret at all nodes
+        let mut secret = None;
+        for h in &setup.node_handles {
+            let rs = h.load_rack_secret(Epoch(1)).await.unwrap();
+            if secret.is_none() {
+                secret = Some(rs.clone());
+            }
+            assert_eq!(&rs, secret.as_ref().unwrap());
+        }
+
+        // Simulate crash and restart again
+        let join_handle = setup.join_handles.pop().unwrap();
+        let node_handle = setup.node_handles.pop().unwrap();
+        node_handle.shutdown().await.unwrap();
+        join_handle.await.unwrap();
+        let _ = setup.listen_addrs.pop().unwrap();
+        let (mut task, handle) = NodeTask::new(
+            setup.configs.last().unwrap().clone(),
+            &setup.logctx.log,
+        )
+        .await;
+        let listen_addr = handle.listen_addr();
+        setup.node_handles.push(handle);
+        setup.join_handles.push(tokio::spawn(async move { task.run().await }));
+        setup.listen_addrs.push(listen_addr);
+
+        // Tell nodes how to reach each other
+        for h in &setup.node_handles {
+            h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect())
+                .await
+                .unwrap();
+        }
+
+        // Now load the rack secret at all nodes
+        let mut secret = None;
+        for h in &setup.node_handles {
+            let rs = h.load_rack_secret(Epoch(1)).await.unwrap();
+            if secret.is_none() {
+                secret = Some(rs.clone());
+            }
+            assert_eq!(&rs, secret.as_ref().unwrap());
+        }
+
+        setup.cleanup_successful();
+    }
 }

From 299bd7163369e467a8bb36dd5a27243ce51430d2 Mon Sep 17 00:00:00 2001
From: "Andrew J. Stone"
Date: Fri, 31 Oct 2025 21:06:45 +0000
Subject: [PATCH 2/6] fix comment

---
 trust-quorum/src/ledgers.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/trust-quorum/src/ledgers.rs b/trust-quorum/src/ledgers.rs
index 39099aa1f7a..598ac1b78ea 100644
--- a/trust-quorum/src/ledgers.rs
+++ b/trust-quorum/src/ledgers.rs
@@ -49,7 +49,7 @@ impl PersistentStateLedger {
         ledger
             .commit()
             .await
-            .expect("Critical: Failed to save bootstore ledger for Fsm::State");
+            .expect("Critical: Failed to save ledger for persistent state");
         ledger.data().generation
     }
 

From 2630d11fe7a34c68c7fbca7fb21d6f0614e292a8 Mon Sep 17 00:00:00 2001
From: "Andrew J. Stone"
Date: Mon, 3 Nov 2025 16:06:44 +0000
Subject: [PATCH 3/6] Document panic rationale for ledgers

---
 trust-quorum/src/ledgers.rs | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/trust-quorum/src/ledgers.rs b/trust-quorum/src/ledgers.rs
index 598ac1b78ea..b8830141d59 100644
--- a/trust-quorum/src/ledgers.rs
+++ b/trust-quorum/src/ledgers.rs
@@ -38,6 +38,22 @@ impl PersistentStateLedger {
     /// number.
     ///
     /// Panics if the ledger cannot be saved.
+    ///
+    /// The trust quorum protocol relies on persisting state to disk, such
+    /// as whether a node has prepared or committed a configuration, before
+    /// responding to a coordinator node or Nexus. This is necessary in order
+    /// to ensure that enough nodes have actually performed an operation and
+    /// that the overall state of the protocol does not go backward in the
+    /// case of a crash and restart of a node. In this manner, trust quorum
+    /// is similar to consensus protocols like Raft and Paxos.
+    ///
+    /// If for any reason we cannot persist trust quorum state to the ledger,
+    /// we must panic to ensure that the node does not take any further
+    /// action incorrectly, like acknowledging a `Prepare` to a coordinator.
+    /// Panicking is the simplest mechanism to ensure that a given node will
+    /// not violate the invariants of the trust quorum protocol in the case
+    /// of internal disk failures. It also ensures a very obvious failure that
+    /// will allow support to get involved and replace internal disks.
     pub async fn save(
         log: &Logger,
         paths: Vec<Utf8PathBuf>,

From ca8a905703af458d057eda91c42ab4667c55279d Mon Sep 17 00:00:00 2001
From: "Andrew J. Stone"
Date: Mon, 3 Nov 2025 18:56:35 +0000
Subject: [PATCH 4/6] Fix test for new load rack secret api and clean up comments

---
 trust-quorum/src/task.rs | 37 +++++++++++++------------------------
 1 file changed, 13 insertions(+), 24 deletions(-)

diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs
index e91733f317a..7f63b72563d 100644
--- a/trust-quorum/src/task.rs
+++ b/trust-quorum/src/task.rs
@@ -234,9 +234,6 @@ impl NodeTaskHandle {
     }
 
     /// Load the rack secret for the given epoch
-    ///
-    /// This can block for an indefinite period of time before returning
-    /// and depends on availability of the trust quorum.
     pub async fn load_rack_secret(
         &self,
         epoch: Epoch,
@@ -247,12 +244,7 @@ impl NodeTaskHandle {
         Ok(rs)
     }
 
-    /// Return `Ok(true)` if the configuration has committed, `Ok(false)` if
-    /// it hasn't committed yet, or an error otherwise.
-    ///
-    /// Nexus will retry this operation and so we should only try once here.
-    /// This is in contrast to operations like `load_rack_secret` that are
-    /// called directly from sled agent.
+    /// Attempt to prepare and commit the given configuration
     pub async fn prepare_and_commit(
         &self,
         config: Configuration,
@@ -263,12 +255,7 @@ impl NodeTaskHandle {
         Ok(res)
     }
 
-    /// Return `Ok(true)` if the configuration has committed, `Ok(false)` if
-    /// it hasn't committed yet, or an error otherwise.
-    ///
-    /// Nexus will retry this operation and so we should only try once here.
-    /// This is in contrast to operations like `load_rack_secret` that are
-    /// called directly from sled agent.
+    /// Attempt to commit the configuration at epoch `epoch`
     pub async fn commit(
         &self,
         rack_id: RackUuid,
@@ -1607,7 +1594,10 @@ mod tests {
             async || {
                 let mut acked = 0;
                 for h in &setup.node_handles {
-                    if h.commit(rack_id, Epoch(1)).await.unwrap() {
+                    if matches!(
+                        h.commit(rack_id, Epoch(1)).await.unwrap(),
+                        CommitStatus::Committed
+                    ) {
                         acked += 1;
                     }
                 }
@@ -1657,14 +1647,13 @@ mod tests {
         }
 
         // Now load the rack secret at all nodes
-        let mut secret = None;
-        for h in &setup.node_handles {
-            let rs = h.load_rack_secret(Epoch(1)).await.unwrap();
-            if secret.is_none() {
-                secret = Some(rs.clone());
-            }
-            assert_eq!(&rs, secret.as_ref().unwrap());
-        }
+        setup
+            .wait_for_rack_secrets_and_assert_equality(
+                (0..num_nodes).collect(),
+                Epoch(1),
+            )
+            .await
+            .unwrap();
 
         setup.cleanup_successful();
     }

From 4d1d0321df037e9a35114f9a08a78779c7dd3c46 Mon Sep 17 00:00:00 2001
From: "Andrew J. Stone"
Date: Tue, 4 Nov 2025 00:09:36 +0000
Subject: [PATCH 5/6] Better duplicate connection management

---
 trust-quorum/src/connection_manager.rs | 82 +++++++++++++++++++++++---
 trust-quorum/src/task.rs               |  2 +-
 2 files changed, 74 insertions(+), 10 deletions(-)

diff --git a/trust-quorum/src/connection_manager.rs b/trust-quorum/src/connection_manager.rs
index 5adcc44d747..c9ef935b998 100644
--- a/trust-quorum/src/connection_manager.rs
+++ b/trust-quorum/src/connection_manager.rs
@@ -404,7 +404,7 @@ impl ConnMgr {
                 self.on_task_exit(task_id).await;
             }
             Err(err) => {
-                error!(self.log, "Connection task panic: {err}");
+                warn!(self.log, "Connection task panic: {err}");
                 self.on_task_exit(err.id()).await;
             }
 
@@ -482,7 +482,19 @@ impl ConnMgr {
             tx,
             conn_type: ConnectionType::Accepted(addr),
         };
-        assert!(self.accepting.insert_unique(task_handle).is_ok());
+        let replaced = self.accepting.insert_overwrite(task_handle);
+        for h in replaced {
+            // We accepted a connection from the same `SocketAddrV6` before the
+            // old one was torn down. This should be rare, if not impossible.
+            warn!(
+                self.log,
+                "Accepted connection replaced. Aborting old task.";
+                "task_id" => ?h.task_id(),
+                "peer_addr" => %h.addr(),
+            );
+            h.abort();
+        }
+
         Ok(())
     }
 
@@ -501,12 +513,38 @@ impl ConnMgr {
                 "peer_id" => %peer_id
             );
 
-            let already_established = self.established.insert_unique(
+            let replaced = self.established.insert_overwrite(
                 EstablishedTaskHandle::new(peer_id, task_handle),
             );
-            assert!(already_established.is_ok());
+
+            // The only reason for established connections to be replaced
+            // like this is when the IP address for a peer changes, but the
+            // previous connection has not yet been torn down.
+            //
+            // Tear down usually happens quickly due to TCP reset or missed
+            // pings. However, if the new IP address is fed into the task via
+            // `load_peer_addresses` and the peer at that address connects
+            // before the old connection is torn down, you end up in this
+            // situation.
+            //
+            // This isn't really possible, except in tests where we change port
+            // numbers when simulating crash and restart of nodes, and do this
+            // very quickly. We change port numbers because `NodeTask`s listen
+            // on port 0 and use ephemeral ports to prevent collisions in tests
+            // where the IP address is localhost.
+            for h in replaced {
+                warn!(
+                    self.log,
+                    "Established connection replaced. Aborting old task.";
+                    "task_id" => ?h.task_id(),
+                    "peer_addr" => %h.addr(),
+                    "peer_id" => %h.baseboard_id
+                );
+                h.abort();
+            }
         } else {
-            error!(self.log, "Server handshake completed, but no server addr in map";
+            warn!(self.log,
+                "Server handshake completed, but no server addr in map";
                 "task_id" => ?task_id,
                 "peer_addr" => %addr,
                 "peer_id" => %peer_id
@@ -528,12 +566,38 @@ impl ConnMgr {
                 "peer_addr" => %addr,
                 "peer_id" => %peer_id
             );
-            let already_established = self.established.insert_unique(
+            let replaced = self.established.insert_overwrite(
                 EstablishedTaskHandle::new(peer_id, task_handle),
             );
-            assert!(already_established.is_ok());
+
+            // The only reason for established connections to be replaced
+            // like this is when the IP address for a peer changes, but the
+            // previous connection has not yet been torn down.
+            //
+            // Tear down usually happens quickly due to TCP reset or missed
+            // pings. However, if the new IP address is fed into the task via
+            // `load_peer_addresses` and the peer at that address connects
+            // before the old connection is torn down, you end up in this
+            // situation.
+            //
+            // This isn't really possible, except in tests where we change port
+            // numbers when simulating crash and restart of nodes, and do this
+            // very quickly. We change port numbers because `NodeTask`s listen
+            // on port 0 and use ephemeral ports to prevent collisions in tests
+            // where the IP address is localhost.
+            for h in replaced {
+                warn!(
+                    self.log,
+                    "Established connection replaced. Aborting old task.";
+                    "task_id" => ?h.task_id(),
+                    "peer_addr" => %h.addr(),
+                    "peer_id" => %h.baseboard_id
+                );
+                h.abort();
+            }
         } else {
-            error!(self.log, "Client handshake completed, but no client addr in map";
+            warn!(self.log,
+                "Client handshake completed, but no client addr in map";
                 "task_id" => ?task_id,
                 "peer_addr" => %addr,
                 "peer_id" => %peer_id
@@ -634,7 +698,7 @@ impl ConnMgr {
         disconnected_peers
     }
 
-    /// Spawn a task to estalbish a sprockets connection for the given address
+    /// Spawn a task to establish a sprockets connection for the given address
     async fn connect_client(
         &mut self,
         corpus: Vec,
diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs
index 7f63b72563d..e2a1024d652 100644
--- a/trust-quorum/src/task.rs
+++ b/trust-quorum/src/task.rs
@@ -1655,6 +1655,6 @@ mod tests {
             .await
             .unwrap();
 
-        setup.cleanup_successful();
+        // setup.cleanup_successful();
     }
 }

From 6bb101566b031305cb6f5a616cc4597e4e465440 Mon Sep 17 00:00:00 2001
From: "Andrew J. Stone"
Date: Tue, 4 Nov 2025 00:10:52 +0000
Subject: [PATCH 6/6] whoops

---
 trust-quorum/src/task.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs
index e2a1024d652..7f63b72563d 100644
--- a/trust-quorum/src/task.rs
+++ b/trust-quorum/src/task.rs
@@ -1655,6 +1655,6 @@ mod tests {
             .await
             .unwrap();
 
-        // setup.cleanup_successful();
+        setup.cleanup_successful();
     }
 }
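
---

Editor's note: the sketch below illustrates the "persist before acknowledging"
invariant that the PATCH 3 doc comment describes. It is not code from this
series; `NodeState`, `persist`, `prepare_and_ack`, and the single on-disk file
are hypothetical stand-ins for `PersistentState`, the `Ledger` commit to both
M.2 devices, and the `PrepareAck` returned to a coordinator.

// Illustrative sketch only -- not part of this patch series.
use std::fs::{self, File};
use std::io::Write;
use std::path::Path;

#[derive(Debug)]
struct NodeState {
    generation: u64,
    prepared_epoch: Option<u64>,
}

fn persist(state: &NodeState, path: &Path) -> std::io::Result<()> {
    // Write to a temporary file and rename it into place so a crash cannot
    // leave a torn copy, and fsync so the update is durable before any
    // acknowledgement is sent.
    let tmp = path.with_extension("tmp");
    let mut f = File::create(&tmp)?;
    writeln!(f, "{} {:?}", state.generation, state.prepared_epoch)?;
    f.sync_all()?;
    fs::rename(&tmp, path)?;
    Ok(())
}

fn prepare_and_ack(state: &mut NodeState, epoch: u64, path: &Path) -> bool {
    state.prepared_epoch = Some(epoch);
    state.generation += 1;
    // Mirror the rationale documented in PATCH 3: if the state cannot be made
    // durable, panic rather than ack, so the protocol's state never appears
    // to go backward after a crash and restart.
    persist(state, path).expect("failed to persist trust quorum state");
    // Only after the write has succeeded is the "ack" returned to the caller.
    true
}

fn main() {
    let mut state = NodeState { generation: 0, prepared_epoch: None };
    let acked = prepare_and_ack(&mut state, 1, Path::new("tq-state.txt"));
    assert!(acked && state.generation == 1);
}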