Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion crates/core/src/runloops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::{
admin::AdminRpc, bank_data::BankData, full::Full, jito::Jito, minimal::Minimal,
surfnet_cheatcodes::SurfnetCheatcodes, ws::Rpc,
},
surfnet::{GeyserEvent, PluginCommand, locker::SurfnetSvmLocker, remote::SurfnetRemoteClient},
surfnet::{GeyserEvent, PluginCommand, ProfilingJob, locker::SurfnetSvmLocker, remote::SurfnetRemoteClient},
};

const BLOCKHASH_SLOT_TTL: u64 = 75;
Expand Down Expand Up @@ -247,6 +247,9 @@ pub async fn start_local_surfnet_runloop(
simnet_events_tx_cc.send(SimnetEvent::error(format!("Geyser plugin failed: {e}")));
}
};

setup_profiling(&svm_locker);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's wrap this in an:

if simnet_config.instruction_profiling_enabled {
    setup_profiling(&svm_locker);
}



let (clock_event_rx, clock_command_tx) =
start_clock_runloop(simnet_config.slot_time, Some(simnet_events_tx_cc.clone()));
Expand Down Expand Up @@ -1138,3 +1141,57 @@ async fn start_ws_rpc_server_runloop(
.map_err(|e| format!("Failed to spawn WebSocket RPC Handler thread: {:?}", e))?;
Ok(_ws_handle)
}


/// Wires asynchronous instruction profiling into `svm_locker`.
///
/// Creates a job channel bounded to 128 entries, starts the profiling runloop on its
/// receiving end, and stores the sending end in the SVM's `profiling_job_tx` field.
pub fn setup_profiling(svm_locker: &SurfnetSvmLocker) {
    let events_tx = svm_locker.simnet_events_tx();
    let (job_tx, job_rx) = crossbeam_channel::bounded(128);

    // Start the consumer first, then publish the producer handle on the SVM.
    start_profiling_runloop(job_rx, events_tx);
    svm_locker.with_svm_writer(|svm| svm.profiling_job_tx = Some(job_tx));
}

/// Spawns the "Instruction Profiler" thread, which drains `profiling_job_rx`.
///
/// Every received [`ProfilingJob`] is run to completion on a private Tokio runtime
/// (one worker thread) by wrapping the job's `profiling_svm` in a fresh
/// `SurfnetSvmLocker` and calling `generate_instruction_profiles` on it. The outcome
/// is sent back on the job's `result_tx`; a profiling error is reported on
/// `simnet_events_tx` and delivered to the requester as `None`.
///
/// The loop ends when `profiling_job_rx.recv()` returns an error.
///
/// If the Tokio runtime cannot be built, the failure is reported on
/// `simnet_events_tx` and the worker thread exits without processing any job.
///
/// # Panics
///
/// Panics if the profiler thread itself cannot be spawned.
pub fn start_profiling_runloop(
    profiling_job_rx: Receiver<ProfilingJob>,
    simnet_events_tx: Sender<SimnetEvent>,
) {
    // `generate_instruction_profiles` requires a status sender, but nothing consumes
    // status updates during profiling, so the receiving half is dropped right away.
    let (temp_status_tx, _) = crossbeam_channel::unbounded();
    hiro_system_kit::thread_named("Instruction Profiler")
        .spawn(move || {
            // Report a runtime build failure through the event channel instead of
            // panicking: nobody joins this thread, so a panic would go unnoticed.
            let rt = match tokio::runtime::Builder::new_multi_thread()
                .worker_threads(1)
                .enable_all()
                .build()
            {
                Ok(rt) => rt,
                Err(e) => {
                    let _ = simnet_events_tx.try_send(SimnetEvent::error(format!(
                        "Failed to build profiling runtime: {}",
                        e
                    )));
                    return;
                }
            };

            while let Ok(job) = profiling_job_rx.recv() {
                let result = rt.block_on(async {
                    let profiling_locker = SurfnetSvmLocker::new(job.profiling_svm);
                    profiling_locker
                        .generate_instruction_profiles(
                            &job.transaction,
                            &job.transaction_accounts,
                            &job.loaded_addresses,
                            &job.accounts_before,
                            &job.token_accounts_before,
                            &job.token_programs,
                            job.pre_execution_capture,
                            &temp_status_tx,
                        )
                        .await
                });

                let profiles = match result {
                    Ok(profiles) => profiles,
                    Err(e) => {
                        let _ = simnet_events_tx.try_send(SimnetEvent::error(format!(
                            "Instruction profiling failed: {}",
                            e
                        )));
                        None
                    }
                };
                // The requester may have stopped waiting; a failed send is not an error.
                let _ = job.result_tx.send(profiles);
            }
        })
        .expect("Failed to spawn Instruction Profiler thread");
}
122 changes: 89 additions & 33 deletions crates/core/src/surfnet/locker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use crate::{
error::{SurfpoolError, SurfpoolResult},
helpers::time_travel::calculate_time_travel_clock,
rpc::utils::{convert_transaction_metadata_from_canonical, verify_pubkey},
surfnet::{FINALIZATION_SLOT_THRESHOLD, SLOTS_PER_EPOCH},
surfnet::{FINALIZATION_SLOT_THRESHOLD, ProfilingJob, SLOTS_PER_EPOCH},
types::{
GeyserAccountUpdate, OfflineAccountConfig, RemoteRpcResult, SurfnetTransactionStatus,
TimeTravelConfig, TokenAccount, TransactionLoadedAddresses, TransactionWithStatusMeta,
Expand Down Expand Up @@ -1096,7 +1096,7 @@ impl SurfnetSvmLocker {
) -> SurfpoolResult<()> {
let do_propagate_status_updates = true;
let signature = transaction.signatures[0];
let profile_result = match self
let (profile_result, ix_profile_rx) = match self
.fetch_all_tx_accounts_then_process_tx_returning_profile_res(
remote_ctx,
transaction,
Expand Down Expand Up @@ -1133,6 +1133,32 @@ impl SurfnetSvmLocker {
self.with_svm_writer(|svm_writer| {
svm_writer.write_executed_profile_result(signature, profile_result)
})?;

// Spawn an async task to receive profiling results and append them to the
// stored profiles
if let Some(rx) = ix_profile_rx {
let locker = self.clone();
tokio::spawn(async move {
let ix_profiles = tokio::task::spawn_blocking(move || rx.recv().ok().flatten())
.await
.ok()
.flatten();
if let Some(profiles) = ix_profiles {
locker.with_svm_writer(|svm_writer| {
if let Ok(Some(mut keyed_profile)) = svm_writer
.executed_transaction_profiles
.get(&signature.to_string())
{
keyed_profile.instruction_profiles = Some(profiles);
let _ = svm_writer
.executed_transaction_profiles
.store(signature.to_string(), keyed_profile);
}
});
}
});
}

Ok(())
}

Expand All @@ -1153,7 +1179,7 @@ impl SurfnetSvmLocker {
let skip_preflight = true; // skip preflight checks during transaction profiling
let sigverify = true; // do verify signatures during transaction profiling
let do_propagate_status_updates = false; // don't propagate status updates during transaction profiling
let mut profile_result = svm_locker
let (mut profile_result, ix_profile_rx) = svm_locker
.fetch_all_tx_accounts_then_process_tx_returning_profile_res(
remote_ctx,
transaction,
Expand All @@ -1171,9 +1197,37 @@ impl SurfnetSvmLocker {
svm_writer.write_simulated_profile_result(uuid, tag, profile_result)
})?;

if let Some(rx) = ix_profile_rx {
let locker = self.clone();
tokio::spawn(async move {
let ix_profiles = tokio::task::spawn_blocking(move || rx.recv().ok().flatten())
.await
.ok()
.flatten();
if let Some(profiles) = ix_profiles {
locker.with_svm_writer(|svm_writer| {
if let Ok(Some(mut keyed_profile)) = svm_writer
.simulated_transaction_profiles
.get(&uuid.to_string())
{
keyed_profile.instruction_profiles = Some(profiles);
let _ = svm_writer
.simulated_transaction_profiles
.store(uuid.to_string(), keyed_profile);
}
});
}
});
}

Ok(self.with_contextualized_svm_reader(|_| uuid))
}


/// Returns a clone of the profiling job sender stored on the SVM, or `None` when
/// no sender has been set.
pub fn profiling_job_tx(&self) -> Option<Sender<ProfilingJob>> {
    self.with_svm_reader(|reader| reader.profiling_job_tx.as_ref().cloned())
}

async fn fetch_all_tx_accounts_then_process_tx_returning_profile_res(
&self,
remote_ctx: &Option<(SurfnetRemoteClient, CommitmentConfig)>,
Expand All @@ -1182,7 +1236,7 @@ impl SurfnetSvmLocker {
skip_preflight: bool,
sigverify: bool,
do_propagate: bool,
) -> SurfpoolResult<KeyedProfileResult> {
) -> SurfpoolResult<(KeyedProfileResult, Option<crossbeam_channel::Receiver<Option<Vec<ProfileResult>>>>)> {
let signature = transaction.signatures[0];

// Sigverify the transaction upfront before doing any account fetching or other pre-processing.
Expand Down Expand Up @@ -1338,29 +1392,28 @@ impl SurfnetSvmLocker {

let loaded_addresses = tx_loaded_addresses.as_ref().map(|l| l.loaded_addresses());

let ix_profiles = if self.is_instruction_profiling_enabled() {
match self
.generate_instruction_profiles(
&transaction,
&transaction_accounts,
&tx_loaded_addresses,
&accounts_before,
&token_accounts_before,
&token_programs,
pre_execution_capture.clone(),
&status_tx,
)
.await
{
Ok(profiles) => profiles,
Err(e) => {
let _ = self.simnet_events_tx().try_send(SimnetEvent::error(format!(
"Failed to generate instruction profiles: {}",
e
)));
None
}
let ix_profile_rx = if self.is_instruction_profiling_enabled() {
if let Some(profiling_job_tx) = self.profiling_job_tx() {
let profiling_svm = self.with_svm_reader(|r| r.clone_for_profiling());
let (result_tx, result_rx) = crossbeam_channel::bounded(1);
let job = ProfilingJob {
profiling_svm,
transaction: transaction.clone(),
transaction_accounts: transaction_accounts.to_vec(),
loaded_addresses: tx_loaded_addresses.clone(),
accounts_before: accounts_before.to_vec(),
token_accounts_before: token_accounts_before.to_vec(),
token_programs: token_programs.to_vec(),
pre_execution_capture: pre_execution_capture.clone(),
result_tx,
};
let _ = profiling_job_tx.send(job);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this .send is blocking. If our rx end of this gets filled up (bounded to 128 messages in the queue), this send will wait for the queue to free up a spot. This means the actual tx processing (which happens below) is being delayed.

Instead, let's use try_send and handle errors. Something like:

match profiling_job_tx.try_send(job) {
    Ok(()) => Some(result_rx),
    Err(crossbeam_channel::TrySendError::Full(job)) => {
        // log warning or emit event saying this transaction will not be profiled
        None
    }
    Err(crossbeam_channel::TrySendError::Disconnected(job)) => {
        // worker is gone; log an error and just disable ix profiling so we don't log again and again
        None
    }
}

Some(result_rx)
}
else {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If instruction profiling is enabled, and there is no profiling_job_tx, I assume this means there was a configuration error (setup_profiling wasn't called). Can we log a warning in this case?

The runloop file does call setup_profiling, so in the default case this won't ever be an issue, but if someone is trying to directly consume surfpool as an SDK and isn't using the runloop functions, it could be a helpful warning.

None
}

} else {
None
};
Expand All @@ -1381,17 +1434,20 @@ impl SurfnetSvmLocker {
)
.await?;

Ok(KeyedProfileResult::new(
latest_absolute_slot,
UuidOrSignature::Signature(signature),
ix_profiles,
profile_result,
readonly_account_states,
Ok((
KeyedProfileResult::new(
latest_absolute_slot,
UuidOrSignature::Signature(signature),
None,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So previously we would immediately return ix profiles, now that obviously isn't possible. But this None could maybe be misleading - like there is no ix profile at all, or profiling is disabled. What if the profile results were stored in an enum:

enum InstructionProfiles {
    // ix profiling disabled, serializes to just `null` like the `None` variant of our option would
    Unavailable, 
    // what we would return in this line if profiling is enabled - indicates to the user that results will be available later
    Pending,
    // what is filled in upon completion
    Ready(Vec<ProfileResult>),
    // what is filled in if profiling fails
    Failed(String),
}

We'd then have to have this serialized as such:

{
  "instructionProfiles": null,
  "instructionProfilesStatus": "unavailable"
}
{
  "instructionProfiles": null,
  "instructionProfilesStatus": "pending"
}
{
  "instructionProfiles": [ ... ],
  "instructionProfilesStatus": "ready"
}
{
  "instructionProfiles": null,
  "instructionProfilesStatus": "failed: {error}"
}

profile_result,
readonly_account_states,
),
ix_profile_rx,
))
}

#[allow(clippy::too_many_arguments)]
async fn generate_instruction_profiles(
pub async fn generate_instruction_profiles(
&self,
transaction: &VersionedTransaction,
transaction_accounts: &[Pubkey],
Expand Down
15 changes: 14 additions & 1 deletion crates/core/src/surfnet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ use solana_signature::Signature;
use solana_transaction::versioned::VersionedTransaction;
use solana_transaction_error::TransactionError;
use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, TransactionStatus};
use surfpool_types::{ExecutionCapture, ProfileResult};
use svm::SurfnetSvm;

use crate::{
PluginInfo,
error::{SurfpoolError, SurfpoolResult},
types::{GeyserAccountUpdate, TransactionWithStatusMeta},
types::{GeyserAccountUpdate, TokenAccount, TransactionLoadedAddresses, TransactionWithStatusMeta},
};

pub mod locker;
Expand Down Expand Up @@ -195,6 +196,18 @@ pub enum SnapshotImportStatus {
Failed,
}

/// A unit of work handed to the instruction profiling runloop.
///
/// Bundles the SVM copy and the per-transaction inputs that the runloop forwards to
/// `generate_instruction_profiles`, plus the channel used to return the outcome.
pub struct ProfilingJob {
/// SVM copy produced by `clone_for_profiling()`; the runloop wraps it in its own locker.
pub profiling_svm: SurfnetSvm,
/// The transaction whose instructions are profiled.
pub transaction: VersionedTransaction,
/// Forwarded as the `transaction_accounts` argument of `generate_instruction_profiles`.
pub transaction_accounts: Vec<Pubkey>,
/// Forwarded as the loaded-addresses argument; `None` when the transaction has none.
pub loaded_addresses: Option<TransactionLoadedAddresses>,
/// Forwarded as the `accounts_before` argument.
/// NOTE(review): presumably the pre-execution account states — confirm against the caller.
pub accounts_before: Vec<Option<Account>>,
/// Forwarded as the `token_accounts_before` argument.
/// NOTE(review): the `usize` looks like an account index — confirm against the caller.
pub token_accounts_before: Vec<(usize, TokenAccount)>,
/// Forwarded as the `token_programs` argument.
pub token_programs: Vec<Pubkey>,
/// Forwarded by value as the `pre_execution_capture` argument.
pub pre_execution_capture: ExecutionCapture,
/// Channel on which the runloop sends the result; it sends `None` when profiling fails.
pub result_tx: Sender<Option<Vec<ProfileResult>>>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum SignatureSubscriptionType {
Received,
Expand Down
5 changes: 4 additions & 1 deletion crates/core/src/surfnet/svm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use crate::{
scenarios::TemplateRegistry,
storage::{OverlayStorage, Storage, new_kv_store, new_kv_store_with_default},
surfnet::{
LogsSubscriptionData, locker::is_supported_token_program, surfnet_lite_svm::SurfnetLiteSvm,
LogsSubscriptionData, ProfilingJob, locker::is_supported_token_program, surfnet_lite_svm::SurfnetLiteSvm
},
types::{
GeyserAccountUpdate, MintAccount, OfflineAccountConfig, SerializableAccountAdditionalData,
Expand Down Expand Up @@ -235,6 +235,7 @@ pub struct SurfnetSvm {
Sender<TransactionStatusEvent>,
Option<TransactionError>,
)>,
pub profiling_job_tx: Option<Sender<ProfilingJob>>,
pub perf_samples: VecDeque<RpcPerfSample>,
pub transactions_processed: u64,
pub latest_epoch_info: EpochInfo,
Expand Down Expand Up @@ -344,6 +345,7 @@ impl SurfnetSvm {
inner: self.inner.clone_for_profiling(),
remote_rpc_url: self.remote_rpc_url.clone(),
chain_tip: self.chain_tip.clone(),
profiling_job_tx: self.profiling_job_tx.clone(),

// Wrap all storage fields with OverlayStorage
blocks: OverlayStorage::wrap(self.blocks.clone_box()),
Expand Down Expand Up @@ -557,6 +559,7 @@ impl SurfnetSvm {
chain_tip,
blocks: blocks_db,
transactions: transactions_db,
profiling_job_tx: None,
perf_samples: VecDeque::new(),
transactions_processed,
simnet_events_tx,
Expand Down
23 changes: 22 additions & 1 deletion crates/core/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,34 @@ use crossbeam_channel::Sender;
use solana_clock::Clock;
use solana_epoch_info::EpochInfo;
use solana_transaction::versioned::VersionedTransaction;
use surfpool_types::{CheatcodeConfig, RpcConfig, SimnetCommand};
use surfpool_types::{CheatcodeConfig, RpcConfig, RpcProfileResultConfig, SimnetCommand, UiKeyedProfileResult, types::UuidOrSignature};

use crate::{
rpc::RunloopContext,
surfnet::{PluginCommand, locker::SurfnetSvmLocker, svm::SurfnetSvm},
};

/// Polls `get_profile_result` until `instruction_profiles` is populated or the timeout is reached.
/// Instruction profiles are appended asynchronously, so tests need to wait for them.
pub async fn poll_for_instruction_profiles(
    svm_locker: &SurfnetSvmLocker,
    key: UuidOrSignature,
    config: &RpcProfileResultConfig,
    timeout: std::time::Duration,
) -> UiKeyedProfileResult {
    // Pause between two consecutive lookups.
    let poll_interval = std::time::Duration::from_millis(50);
    let started_at = std::time::Instant::now();

    loop {
        let snapshot = svm_locker
            .get_profile_result(key.clone(), config)
            .unwrap()
            .expect("Profile result should exist");

        // Done as soon as the asynchronously appended profiles show up.
        if snapshot.instruction_profiles.is_some() {
            return snapshot;
        }
        // Out of time: hand back the latest snapshot, profiles still missing.
        if started_at.elapsed() >= timeout {
            return snapshot;
        }

        tokio::time::sleep(poll_interval).await;
    }
}

pub fn get_free_port() -> Result<u16, String> {
let listener =
TcpListener::bind("127.0.0.1:0").map_err(|e| format!("Failed to bind to port 0: {}", e))?;
Expand Down
Loading