From 4e5c4e961ddb7e1503812a525200497a88cf151c Mon Sep 17 00:00:00 2001 From: Zeref Date: Mon, 23 Mar 2026 15:56:23 +0530 Subject: [PATCH 1/4] added sigterm pid termination --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 1137ae74..39d08480 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,7 @@ convert_case = "0.8.0" crossbeam = "0.8.4" crossbeam-channel = { version = "0.5.15", default-features = false } crossterm = { version = "0.28.1", default-features = false } -ctrlc = "3.4.5" +ctrlc = {version = "3.4.5", features = ["termination"]} dialoguer = { version = "0.11.0", default-features = false } diesel = { version = "2.2.11", default-features = false } diesel-dynamic-schema = { version = "0.2.3", default-features = false } From 2126cfc11ada5101aca93936f58be9a0df5f6427 Mon Sep 17 00:00:00 2001 From: Zeref Date: Tue, 31 Mar 2026 11:18:36 +0530 Subject: [PATCH 2/4] added changes to terminal simnet when tui enabled --- crates/cli/src/tui/simnet.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/crates/cli/src/tui/simnet.rs b/crates/cli/src/tui/simnet.rs index ee236f26..9145c901 100644 --- a/crates/cli/src/tui/simnet.rs +++ b/crates/cli/src/tui/simnet.rs @@ -2,6 +2,7 @@ use std::{ env, error::Error, io, + sync::{Arc, RwLock}, time::{Duration, Instant}, }; @@ -448,8 +449,28 @@ fn run_app(terminal: &mut Terminal, mut app: App) -> io::Result<( } }); + let ctrlc_simnet_cmd_tx_clone = app.simnet_commands_tx.clone(); + + let should_exit = Arc::new(RwLock::new(false)); + let should_exit_read = Arc::clone(&should_exit); + let should_exit_write = Arc::clone(&should_exit); + + ctrlc::set_handler(move || { + // Send terminate command to allow graceful shutdown (Drop to run) + let _ = ctrlc_simnet_cmd_tx_clone.send(SimnetCommand::Terminate(None)); + // .unwrap should be fine here since it's the only place that will receive + *should_exit_write.write().unwrap() = true; + }) + .expect("Error setting Ctrl-C handler"); + let mut deployment_completed = false; loop { + if let Ok(should_exit_loop) = should_exit_read.read() + && *should_exit_loop + { + return Ok(()); + } + let mut selector = Select::new(); let mut handles = vec![]; let mut new_events = vec![]; @@ -701,6 +722,7 @@ fn run_app(terminal: &mut Terminal, mut app: App) -> io::Result<( app.tail(); } + #[allow(clippy::collapsible_if)] if event::poll(Duration::from_millis(50))? { if let Event::Key(key_event) = event::read()? { if key_event.kind == KeyEventKind::Press { From 236c9ea357b6e35212ddbec7bbd914a68271ca12 Mon Sep 17 00:00:00 2001 From: Zeref Date: Thu, 2 Apr 2026 11:02:12 +0530 Subject: [PATCH 3/4] chore: shifted from RwLock to AtomicBool --- crates/cli/src/tui/simnet.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/crates/cli/src/tui/simnet.rs b/crates/cli/src/tui/simnet.rs index 9145c901..9694cefa 100644 --- a/crates/cli/src/tui/simnet.rs +++ b/crates/cli/src/tui/simnet.rs @@ -2,7 +2,7 @@ use std::{ env, error::Error, io, - sync::{Arc, RwLock}, + sync::{Arc, RwLock, atomic::AtomicBool}, time::{Duration, Instant}, }; @@ -451,23 +451,20 @@ fn run_app(terminal: &mut Terminal, mut app: App) -> io::Result<( let ctrlc_simnet_cmd_tx_clone = app.simnet_commands_tx.clone(); - let should_exit = Arc::new(RwLock::new(false)); + let should_exit = Arc::new(AtomicBool::new(false)); let should_exit_read = Arc::clone(&should_exit); let should_exit_write = Arc::clone(&should_exit); ctrlc::set_handler(move || { // Send terminate command to allow graceful shutdown (Drop to run) let _ = ctrlc_simnet_cmd_tx_clone.send(SimnetCommand::Terminate(None)); - // .unwrap should be fine here since it's the only place that will receive - *should_exit_write.write().unwrap() = true; + should_exit_write.store(true, std::sync::atomic::Ordering::Release); }) .expect("Error setting Ctrl-C handler"); let mut deployment_completed = false; loop { - if let Ok(should_exit_loop) = should_exit_read.read() - && *should_exit_loop - { + if should_exit_read.load(std::sync::atomic::Ordering::Acquire) { return Ok(()); } @@ -722,7 +719,6 @@ fn run_app(terminal: &mut Terminal, mut app: App) -> io::Result<( app.tail(); } - #[allow(clippy::collapsible_if)] if event::poll(Duration::from_millis(50))? { if let Event::Key(key_event) = event::read()? { if key_event.kind == KeyEventKind::Press { From 80c2f9ef5ccd152345a4644fc69a73d2114f0add Mon Sep 17 00:00:00 2001 From: Zeref Date: Thu, 2 Apr 2026 16:48:19 +0530 Subject: [PATCH 4/4] feature(rpc): Adds getBundleStatuses rpc method --- crates/core/src/error.rs | 6 + crates/core/src/rpc/jito.rs | 210 +++++++++++++++++++++++++++++- crates/core/src/runloops/mod.rs | 5 + crates/core/src/surfnet/locker.rs | 17 +++ crates/core/src/surfnet/svm.rs | 4 + crates/types/src/types.rs | 1 + 6 files changed, 239 insertions(+), 4 deletions(-) diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index d2544737..4f776b88 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -99,6 +99,12 @@ impl SurfpoolError { Self(error) } + pub fn invalid_bundle_id() -> Self { + let mut error = Error::invalid_request(); + error.data = Some(json!(format!("Solana RPC client error: Invalid bundle id"))); + Self(error) + } + pub fn missing_context() -> Self { let mut error = Error::internal_error(); error.data = Some(json!("Failed to access internal Surfnet context")); diff --git a/crates/core/src/rpc/jito.rs b/crates/core/src/rpc/jito.rs index 1111c107..486acef5 100644 --- a/crates/core/src/rpc/jito.rs +++ b/crates/core/src/rpc/jito.rs @@ -1,10 +1,12 @@ use std::str::FromStr; -use jsonrpc_core::{Error, Result}; +use jsonrpc_core::{BoxFuture, Error, Result}; use jsonrpc_derive::rpc; use sha2::{Digest, Sha256}; use solana_client::{rpc_config::RpcSendTransactionConfig, rpc_custom_error::RpcCustomError}; +use solana_rpc_client_api::response::Response as RpcResponse; use solana_signature::Signature; +use solana_transaction_status::TransactionStatus; use super::{ RunloopContext, @@ -59,6 +61,47 @@ pub trait Jito { transactions: Vec, config: Option, ) -> Result; + + /// Retrieves the statuses of all transactions in a previously submitted bundle. + /// + /// This RPC method looks up a bundle by its `bundle_id` (the SHA-256 hash returned by + /// [`sendBundle`](#method.send_bundle)) and returns the signature statuses for the bundle's + /// transactions in the same order they were recorded. + /// + /// ## Parameters + /// - `bundle_id`: The bundle identifier returned by `sendBundle`. + /// + /// ## Returns + /// A contextualized response containing: + /// - `value`: A list of optional transaction statuses corresponding to the bundle signatures. + /// Each entry can be: + /// - `null` if the signature is unknown or not sufficiently confirmed for status reporting + /// - a `TransactionStatus` object if the transaction is found and its status can be returned + /// + /// ## Example Request (JSON-RPC) + /// ```json + /// { + /// "jsonrpc": "2.0", + /// "id": 1, + /// "method": "getBundleStatuses", + /// "params": [ + /// "bundleIdHere" + /// ] + /// } + /// ``` + /// + /// ## Notes + /// - Bundles are stored locally as a mapping from `bundle_id` to a list of base-58 signatures. + /// - If the bundle ID is not known locally, an error is returned. + /// - Status resolution is delegated to the same logic used by `getSignatureStatuses`: + /// statuses are computed from locally stored transactions (and may fall back to a remote + /// datasource, if configured). + #[rpc(meta, name = "getBundleStatuses")] + fn get_bundle_statuses( + &self, + meta: Self::Metadata, + bundle_id: String, + ) -> BoxFuture>>>>; } #[derive(Clone)] @@ -83,7 +126,7 @@ impl Jito for SurfpoolJitoRpc { ))); } - let Some(_ctx) = &meta else { + let Some(ctx) = &meta else { return Err(RpcCustomError::NodeUnhealthy { num_slots_behind: None, } @@ -102,7 +145,7 @@ impl Jito for SurfpoolJitoRpc { let bundle_config = Some(SurfpoolRpcSendTransactionConfig { base: RpcSendTransactionConfig { skip_preflight: true, - ..base_config.clone() + ..base_config }, skip_sig_verify: None, }); @@ -136,7 +179,40 @@ impl Jito for SurfpoolJitoRpc { let mut hasher = Sha256::new(); hasher.update(concatenated_signatures.as_bytes()); let bundle_id = hasher.finalize(); - Ok(hex::encode(bundle_id)) + let bundle_id = hex::encode(bundle_id); + + let _ = ctx + .simnet_commands_tx + .send(surfpool_types::SimnetCommand::SendBundle(( + bundle_id.clone(), + bundle_signatures + .iter() + .map(|sig| sig.to_string()) + .collect(), + ))); + + Ok(bundle_id) + } + + fn get_bundle_statuses( + &self, + meta: Self::Metadata, + bundle_id: String, + ) -> BoxFuture>>>> { + Box::pin(async move { + let Some(ctx) = &meta else { + return Err(RpcCustomError::NodeUnhealthy { + num_slots_behind: None, + } + .into()); + }; + + let signatures = ctx.svm_locker.get_bundle(bundle_id)?; + + SurfpoolFullRpc + .get_signature_statuses(meta.clone(), signatures, None) + .await + }) } } @@ -421,4 +497,130 @@ mod tests { "Bundle ID should match SHA-256 of comma-separated signatures" ); } + + #[tokio::test(flavor = "multi_thread")] + async fn test_send_bundle_persists_bundle_signatures() { + let payer = Keypair::new(); + let recipient = Pubkey::new_unique(); + let (mempool_tx, mempool_rx) = crossbeam_channel::unbounded(); + let setup = TestSetup::new_with_mempool(SurfpoolJitoRpc, mempool_tx); + + let recent_blockhash = setup + .context + .svm_locker + .with_svm_reader(|svm_reader| svm_reader.latest_blockhash()); + + // Airdrop to payer so tx can succeed in our manual processing + let _ = setup + .context + .svm_locker + .0 + .write() + .await + .airdrop(&payer.pubkey(), 2 * LAMPORTS_PER_SOL); + + let tx = build_v0_transaction( + &payer.pubkey(), + &[&payer], + &[system_instruction::transfer( + &payer.pubkey(), + &recipient, + LAMPORTS_PER_SOL, + )], + &recent_blockhash, + ); + let tx_encoded = bs58::encode(bincode::serialize(&tx).unwrap()).into_string(); + + // Build expected signatures locally (what we expect to be persisted under bundle_id) + let expected_sigs = vec![tx.signatures[0].to_string()]; + + let setup_clone = setup.clone(); + let handle = hiro_system_kit::thread_named("send_bundle") + .spawn(move || { + setup_clone + .rpc + .send_bundle(Some(setup_clone.context), vec![tx_encoded], None) + }) + .unwrap(); + + let mut processed_tx = false; + let mut processed_bundle = false; + let mut bundle_id_from_cmd: Option = None; + let mut sigs_from_cmd: Option> = None; + + while !(processed_tx && processed_bundle) { + match mempool_rx.recv() { + Ok(SimnetCommand::ProcessTransaction(_, tx, status_tx, _, _)) => { + let mut writer = setup.context.svm_locker.0.write().await; + let slot = writer.get_latest_absolute_slot(); + writer.transactions_queued_for_confirmation.push_back(( + tx.clone(), + status_tx.clone(), + None, + )); + let sig = tx.signatures[0]; + let tx_with_status_meta = TransactionWithStatusMeta { + slot, + transaction: tx, + ..Default::default() + }; + writer + .transactions + .store( + sig.to_string(), + SurfnetTransactionStatus::processed( + tx_with_status_meta, + std::collections::HashSet::new(), + ), + ) + .unwrap(); + status_tx + .send(TransactionStatusEvent::Success( + TransactionConfirmationStatus::Confirmed, + )) + .unwrap(); + processed_tx = true; + } + Ok(SimnetCommand::SendBundle((bundle_id, signatures))) => { + setup + .context + .svm_locker + .process_bundle(bundle_id.clone(), signatures.clone()) + .unwrap(); + bundle_id_from_cmd = Some(bundle_id); + sigs_from_cmd = Some(signatures); + processed_bundle = true; + } + Ok(SimnetCommand::AirdropProcessed) => continue, + other => panic!("unexpected simnet command: {:?}", other), + } + } + + let result = handle.join().unwrap().expect("sendBundle should succeed"); + let stored_bundle_id = bundle_id_from_cmd.expect("should have received SendBundle command"); + assert_eq!( + result, stored_bundle_id, + "sendBundle result bundle id should match stored bundle id" + ); + + let persisted = setup + .context + .svm_locker + .get_bundle(stored_bundle_id.clone()) + .expect("bundle should be persisted"); + assert!( + !persisted.is_empty(), + "svm_locker.get_bundle(bundle_id) should not be empty" + ); + + let sigs_from_cmd = sigs_from_cmd.expect("should have captured signatures from SendBundle"); + assert_eq!( + sigs_from_cmd, expected_sigs, + "Signatures in SendBundle command should match locally built signatures" + ); + assert_eq!( + persisted, expected_sigs, + "Persisted bundle signatures should match locally built signatures" + ); + } } diff --git a/crates/core/src/runloops/mod.rs b/crates/core/src/runloops/mod.rs index 1a2b2437..b65ee344 100644 --- a/crates/core/src/runloops/mod.rs +++ b/crates/core/src/runloops/mod.rs @@ -491,6 +491,11 @@ pub async fn start_block_production_runloop( do_produce_block = true; } } + SimnetCommand::SendBundle((bundle_id, signatures)) => { + if let Err(e) = svm_locker.process_bundle(bundle_id, signatures) { + let _ = svm_locker.simnet_events_tx().send(SimnetEvent::error(format!("Failed to send jito bundle: {}", e))); + } + } SimnetCommand::Terminate(_) => { // Explicitly shutdown storage to trigger WAL checkpoint before exiting svm_locker.shutdown(); diff --git a/crates/core/src/surfnet/locker.rs b/crates/core/src/surfnet/locker.rs index 04c0f2bc..61ea8626 100644 --- a/crates/core/src/surfnet/locker.rs +++ b/crates/core/src/surfnet/locker.rs @@ -980,6 +980,18 @@ impl SurfnetSvmLocker { } } + pub fn get_bundle(&self, bundle_id: String) -> SurfpoolResult> { + self.with_svm_reader(|svm_reader| { + if let Ok(get_signatures_result) = svm_reader.jito_bundles.get(&bundle_id) + && let Some(signatures) = get_signatures_result + { + Ok(signatures) + } else { + Err(SurfpoolError::invalid_bundle_id()) + } + }) + } + /// Retrieves a transaction from local cache, returning a contextualized result. pub fn get_transaction_local( &self, @@ -1108,6 +1120,11 @@ impl SurfnetSvmLocker { Ok(()) } + pub fn process_bundle(&self, bundle_id: String, signatures: Vec) -> SurfpoolResult<()> { + self.with_svm_writer(|svm_writer| svm_writer.jito_bundles.store(bundle_id, signatures))?; + Ok(()) + } + pub async fn profile_transaction( &self, remote_ctx: &Option<(SurfnetRemoteClient, CommitmentConfig)>, diff --git a/crates/core/src/surfnet/svm.rs b/crates/core/src/surfnet/svm.rs index cf2ad003..101b44c0 100644 --- a/crates/core/src/surfnet/svm.rs +++ b/crates/core/src/surfnet/svm.rs @@ -223,6 +223,7 @@ pub struct SurfnetSvm { pub chain_tip: BlockIdentifier, pub blocks: Box>, pub transactions: Box>, + pub jito_bundles: Box>>, pub transactions_queued_for_confirmation: VecDeque<( VersionedTransaction, Sender, @@ -343,6 +344,7 @@ impl SurfnetSvm { // Wrap all storage fields with OverlayStorage blocks: OverlayStorage::wrap(self.blocks.clone_box()), transactions: OverlayStorage::wrap(self.transactions.clone_box()), + jito_bundles: OverlayStorage::wrap(self.jito_bundles.clone_box()), profile_tag_map: OverlayStorage::wrap(self.profile_tag_map.clone_box()), simulated_transaction_profiles: OverlayStorage::wrap( self.simulated_transaction_profiles.clone_box(), @@ -456,6 +458,7 @@ impl SurfnetSvm { )?; let blocks_db = new_kv_store(&database_url, "blocks", surfnet_id)?; let transactions_db = new_kv_store(&database_url, "transactions", surfnet_id)?; + let jito_bundles_db = new_kv_store(&database_url, "jito_bundles", surfnet_id)?; let token_accounts_db = new_kv_store(&database_url, "token_accounts", surfnet_id)?; let mut token_mints_db: Box> = new_kv_store(&database_url, "token_mints", surfnet_id)?; @@ -545,6 +548,7 @@ impl SurfnetSvm { chain_tip, blocks: blocks_db, transactions: transactions_db, + jito_bundles: jito_bundles_db, perf_samples: VecDeque::new(), transactions_processed, simnet_events_tx, diff --git a/crates/types/src/types.rs b/crates/types/src/types.rs index 8198b148..10c6cd91 100644 --- a/crates/types/src/types.rs +++ b/crates/types/src/types.rs @@ -557,6 +557,7 @@ pub enum SimnetCommand { CompleteRunbookExecution(String, Option>), FetchRemoteAccounts(Vec, String), AirdropProcessed, + SendBundle((String, Vec)), } #[derive(Debug)]