Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ convert_case = "0.8.0"
crossbeam = "0.8.4"
crossbeam-channel = { version = "0.5.15", default-features = false }
crossterm = { version = "0.28.1", default-features = false }
ctrlc = "3.4.5"
ctrlc = {version = "3.4.5", features = ["termination"]}
dialoguer = { version = "0.11.0", default-features = false }
diesel = { version = "2.2.11", default-features = false }
diesel-dynamic-schema = { version = "0.2.3", default-features = false }
Expand Down
18 changes: 18 additions & 0 deletions crates/cli/src/tui/simnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{
env,
error::Error,
io,
sync::{Arc, RwLock, atomic::AtomicBool},
time::{Duration, Instant},
};

Expand Down Expand Up @@ -448,8 +449,25 @@ fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut app: App) -> io::Result<(
}
});

let ctrlc_simnet_cmd_tx_clone = app.simnet_commands_tx.clone();

let should_exit = Arc::new(AtomicBool::new(false));
let should_exit_read = Arc::clone(&should_exit);
let should_exit_write = Arc::clone(&should_exit);

ctrlc::set_handler(move || {
// Send terminate command to allow graceful shutdown (Drop to run)
let _ = ctrlc_simnet_cmd_tx_clone.send(SimnetCommand::Terminate(None));
should_exit_write.store(true, std::sync::atomic::Ordering::Release);
})
.expect("Error setting Ctrl-C handler");

let mut deployment_completed = false;
loop {
if should_exit_read.load(std::sync::atomic::Ordering::Acquire) {
return Ok(());
}

let mut selector = Select::new();
let mut handles = vec![];
let mut new_events = vec![];
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ impl SurfpoolError {
Self(error)
}

pub fn invalid_bundle_id() -> Self {
let mut error = Error::invalid_request();
error.data = Some(json!(format!("Solana RPC client error: Invalid bundle id")));
Self(error)
}

pub fn missing_context() -> Self {
let mut error = Error::internal_error();
error.data = Some(json!("Failed to access internal Surfnet context"));
Expand Down
210 changes: 206 additions & 4 deletions crates/core/src/rpc/jito.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::str::FromStr;

use jsonrpc_core::{Error, Result};
use jsonrpc_core::{BoxFuture, Error, Result};
use jsonrpc_derive::rpc;
use sha2::{Digest, Sha256};
use solana_client::{rpc_config::RpcSendTransactionConfig, rpc_custom_error::RpcCustomError};
use solana_rpc_client_api::response::Response as RpcResponse;
use solana_signature::Signature;
use solana_transaction_status::TransactionStatus;

use super::{
RunloopContext,
Expand Down Expand Up @@ -59,6 +61,47 @@ pub trait Jito {
transactions: Vec<String>,
config: Option<RpcSendTransactionConfig>,
) -> Result<String>;

/// Retrieves the statuses of all transactions in a previously submitted bundle.
///
/// This RPC method looks up a bundle by its `bundle_id` (the SHA-256 hash returned by
/// [`sendBundle`](#method.send_bundle)) and returns the signature statuses for the bundle's
/// transactions in the same order they were recorded.
///
/// ## Parameters
/// - `bundle_id`: The bundle identifier returned by `sendBundle`.
///
/// ## Returns
/// A contextualized response containing:
/// - `value`: A list of optional transaction statuses corresponding to the bundle signatures.
/// Each entry can be:
/// - `null` if the signature is unknown or not sufficiently confirmed for status reporting
/// - a `TransactionStatus` object if the transaction is found and its status can be returned
///
/// ## Example Request (JSON-RPC)
/// ```json
/// {
/// "jsonrpc": "2.0",
/// "id": 1,
/// "method": "getBundleStatuses",
/// "params": [
/// "bundleIdHere"
/// ]
/// }
/// ```
///
/// ## Notes
/// - Bundles are stored locally as a mapping from `bundle_id` to a list of base-58 signatures.
/// - If the bundle ID is not known locally, an error is returned.
/// - Status resolution is delegated to the same logic used by `getSignatureStatuses`:
/// statuses are computed from locally stored transactions (and may fall back to a remote
/// datasource, if configured).
#[rpc(meta, name = "getBundleStatuses")]
fn get_bundle_statuses(
&self,
meta: Self::Metadata,
bundle_id: String,
) -> BoxFuture<Result<RpcResponse<Vec<Option<TransactionStatus>>>>>;
}

#[derive(Clone)]
Expand All @@ -83,7 +126,7 @@ impl Jito for SurfpoolJitoRpc {
)));
}

let Some(_ctx) = &meta else {
let Some(ctx) = &meta else {
return Err(RpcCustomError::NodeUnhealthy {
num_slots_behind: None,
}
Expand All @@ -102,7 +145,7 @@ impl Jito for SurfpoolJitoRpc {
let bundle_config = Some(SurfpoolRpcSendTransactionConfig {
base: RpcSendTransactionConfig {
skip_preflight: true,
..base_config.clone()
..base_config
},
skip_sig_verify: None,
});
Expand Down Expand Up @@ -136,7 +179,40 @@ impl Jito for SurfpoolJitoRpc {
let mut hasher = Sha256::new();
hasher.update(concatenated_signatures.as_bytes());
let bundle_id = hasher.finalize();
Ok(hex::encode(bundle_id))
let bundle_id = hex::encode(bundle_id);

let _ = ctx
.simnet_commands_tx
.send(surfpool_types::SimnetCommand::SendBundle((
bundle_id.clone(),
bundle_signatures
.iter()
.map(|sig| sig.to_string())
.collect(),
)));

Ok(bundle_id)
}

fn get_bundle_statuses(
&self,
meta: Self::Metadata,
bundle_id: String,
) -> BoxFuture<Result<RpcResponse<Vec<Option<TransactionStatus>>>>> {
Box::pin(async move {
let Some(ctx) = &meta else {
return Err(RpcCustomError::NodeUnhealthy {
num_slots_behind: None,
}
.into());
};

let signatures = ctx.svm_locker.get_bundle(bundle_id)?;

SurfpoolFullRpc
.get_signature_statuses(meta.clone(), signatures, None)
.await
})
}
}

Expand Down Expand Up @@ -421,4 +497,130 @@ mod tests {
"Bundle ID should match SHA-256 of comma-separated signatures"
);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_send_bundle_persists_bundle_signatures() {
let payer = Keypair::new();
let recipient = Pubkey::new_unique();
let (mempool_tx, mempool_rx) = crossbeam_channel::unbounded();
let setup = TestSetup::new_with_mempool(SurfpoolJitoRpc, mempool_tx);

let recent_blockhash = setup
.context
.svm_locker
.with_svm_reader(|svm_reader| svm_reader.latest_blockhash());

// Airdrop to payer so tx can succeed in our manual processing
let _ = setup
.context
.svm_locker
.0
.write()
.await
.airdrop(&payer.pubkey(), 2 * LAMPORTS_PER_SOL);

let tx = build_v0_transaction(
&payer.pubkey(),
&[&payer],
&[system_instruction::transfer(
&payer.pubkey(),
&recipient,
LAMPORTS_PER_SOL,
)],
&recent_blockhash,
);
let tx_encoded = bs58::encode(bincode::serialize(&tx).unwrap()).into_string();

// Build expected signatures locally (what we expect to be persisted under bundle_id)
let expected_sigs = vec![tx.signatures[0].to_string()];

let setup_clone = setup.clone();
let handle = hiro_system_kit::thread_named("send_bundle")
.spawn(move || {
setup_clone
.rpc
.send_bundle(Some(setup_clone.context), vec![tx_encoded], None)
})
.unwrap();

let mut processed_tx = false;
let mut processed_bundle = false;
let mut bundle_id_from_cmd: Option<String> = None;
let mut sigs_from_cmd: Option<Vec<String>> = None;

while !(processed_tx && processed_bundle) {
match mempool_rx.recv() {
Ok(SimnetCommand::ProcessTransaction(_, tx, status_tx, _, _)) => {
let mut writer = setup.context.svm_locker.0.write().await;
let slot = writer.get_latest_absolute_slot();
writer.transactions_queued_for_confirmation.push_back((
tx.clone(),
status_tx.clone(),
None,
));
let sig = tx.signatures[0];
let tx_with_status_meta = TransactionWithStatusMeta {
slot,
transaction: tx,
..Default::default()
};
writer
.transactions
.store(
sig.to_string(),
SurfnetTransactionStatus::processed(
tx_with_status_meta,
std::collections::HashSet::new(),
),
)
.unwrap();
status_tx
.send(TransactionStatusEvent::Success(
TransactionConfirmationStatus::Confirmed,
))
.unwrap();
processed_tx = true;
}
Ok(SimnetCommand::SendBundle((bundle_id, signatures))) => {
setup
.context
.svm_locker
.process_bundle(bundle_id.clone(), signatures.clone())
.unwrap();
bundle_id_from_cmd = Some(bundle_id);
sigs_from_cmd = Some(signatures);
processed_bundle = true;
}
Ok(SimnetCommand::AirdropProcessed) => continue,
other => panic!("unexpected simnet command: {:?}", other),
}
}

let result = handle.join().unwrap().expect("sendBundle should succeed");
let stored_bundle_id = bundle_id_from_cmd.expect("should have received SendBundle command");
assert_eq!(
result, stored_bundle_id,
"sendBundle result bundle id should match stored bundle id"
);

let persisted = setup
.context
.svm_locker
.get_bundle(stored_bundle_id.clone())
.expect("bundle should be persisted");
assert!(
!persisted.is_empty(),
"svm_locker.get_bundle(bundle_id) should not be empty"
);

let sigs_from_cmd = sigs_from_cmd.expect("should have captured signatures from SendBundle");
assert_eq!(
sigs_from_cmd, expected_sigs,
"Signatures in SendBundle command should match locally built signatures"
);
assert_eq!(
persisted, expected_sigs,
"Persisted bundle signatures should match locally built signatures"
);
}
}
5 changes: 5 additions & 0 deletions crates/core/src/runloops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,11 @@ pub async fn start_block_production_runloop(
do_produce_block = true;
}
}
SimnetCommand::SendBundle((bundle_id, signatures)) => {
if let Err(e) = svm_locker.process_bundle(bundle_id, signatures) {
let _ = svm_locker.simnet_events_tx().send(SimnetEvent::error(format!("Failed to send jito bundle: {}", e)));
}
}
SimnetCommand::Terminate(_) => {
// Explicitly shutdown storage to trigger WAL checkpoint before exiting
svm_locker.shutdown();
Expand Down
17 changes: 17 additions & 0 deletions crates/core/src/surfnet/locker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,18 @@ impl SurfnetSvmLocker {
}
}

pub fn get_bundle(&self, bundle_id: String) -> SurfpoolResult<Vec<String>> {
self.with_svm_reader(|svm_reader| {
if let Ok(get_signatures_result) = svm_reader.jito_bundles.get(&bundle_id)
&& let Some(signatures) = get_signatures_result
{
Ok(signatures)
} else {
Err(SurfpoolError::invalid_bundle_id())
}
})
}

/// Retrieves a transaction from local cache, returning a contextualized result.
pub fn get_transaction_local(
&self,
Expand Down Expand Up @@ -1108,6 +1120,11 @@ impl SurfnetSvmLocker {
Ok(())
}

pub fn process_bundle(&self, bundle_id: String, signatures: Vec<String>) -> SurfpoolResult<()> {
self.with_svm_writer(|svm_writer| svm_writer.jito_bundles.store(bundle_id, signatures))?;
Ok(())
}

pub async fn profile_transaction(
&self,
remote_ctx: &Option<(SurfnetRemoteClient, CommitmentConfig)>,
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/surfnet/svm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ pub struct SurfnetSvm {
pub chain_tip: BlockIdentifier,
pub blocks: Box<dyn Storage<u64, BlockHeader>>,
pub transactions: Box<dyn Storage<String, SurfnetTransactionStatus>>,
pub jito_bundles: Box<dyn Storage<String, Vec<String>>>,
pub transactions_queued_for_confirmation: VecDeque<(
VersionedTransaction,
Sender<TransactionStatusEvent>,
Expand Down Expand Up @@ -343,6 +344,7 @@ impl SurfnetSvm {
// Wrap all storage fields with OverlayStorage
blocks: OverlayStorage::wrap(self.blocks.clone_box()),
transactions: OverlayStorage::wrap(self.transactions.clone_box()),
jito_bundles: OverlayStorage::wrap(self.jito_bundles.clone_box()),
profile_tag_map: OverlayStorage::wrap(self.profile_tag_map.clone_box()),
simulated_transaction_profiles: OverlayStorage::wrap(
self.simulated_transaction_profiles.clone_box(),
Expand Down Expand Up @@ -456,6 +458,7 @@ impl SurfnetSvm {
)?;
let blocks_db = new_kv_store(&database_url, "blocks", surfnet_id)?;
let transactions_db = new_kv_store(&database_url, "transactions", surfnet_id)?;
let jito_bundles_db = new_kv_store(&database_url, "jito_bundles", surfnet_id)?;
let token_accounts_db = new_kv_store(&database_url, "token_accounts", surfnet_id)?;
let mut token_mints_db: Box<dyn Storage<String, MintAccount>> =
new_kv_store(&database_url, "token_mints", surfnet_id)?;
Expand Down Expand Up @@ -545,6 +548,7 @@ impl SurfnetSvm {
chain_tip,
blocks: blocks_db,
transactions: transactions_db,
jito_bundles: jito_bundles_db,
perf_samples: VecDeque::new(),
transactions_processed,
simnet_events_tx,
Expand Down
1 change: 1 addition & 0 deletions crates/types/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ pub enum SimnetCommand {
CompleteRunbookExecution(String, Option<Vec<String>>),
FetchRemoteAccounts(Vec<Pubkey>, String),
AirdropProcessed,
SendBundle((String, Vec<String>)),
}

#[derive(Debug)]
Expand Down
Loading