diff --git a/crates/cli/CHANGELOG.md b/crates/cli/CHANGELOG.md
index fec80baa..bf593304 100644
--- a/crates/cli/CHANGELOG.md
+++ b/crates/cli/CHANGELOG.md
@@ -5,6 +5,21 @@
 All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## Unreleased
+
+- the CLI is now solely responsible for intercepting CTRL-C signals
+  - to shut down background tasks, we rely on [`CancellationToken`s](https://docs.rs/tokio-util/latest/tokio_util/sync/struct.CancellationToken.html)
+  - we no longer require two-phase cancellation (CTRL-C once to stop spamming, CTRL-C again to stop result collection)
+  - result collection happens asynchronously, so by the time the user cancels, most results have already been collected
+  - stopping quickly is better UX than two-phase cancellation
+
+### Breaking changes
+
+- `commands::spam::spam` drops the `&mut TestScenario` param and creates a `TestScenario` from `spam_args` instead
+- `SendSpamCliArgs` replaces `--loops [NUM_LOOPS]` (optional `usize`) with `--forever` (`bool`)
+- `spam_callback_default` and `TypedSpamCallback` moved from `util` to `commands::spam`
+- `commands::spamd` has been deleted (it was a redundant wrapper around `spam`)
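+
+  Migration example (hypothetical scenario file and rates): a run that was
+  previously bounded with `--loops` now either runs once (the default) or
+  runs until cancelled:
+
+  ```sh
+  # before: repeat the run 5 times
+  contender spam scenario.toml --tps 50 -d 10 -l 5
+  # after: one run by default, or spam indefinitely until CTRL-C
+  contender spam scenario.toml --tps 50 -d 10 --forever
+  ```
+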
 ## [0.6.0](https://github.com/flashbots/contender/releases/tag/v0.6.0) - 2025-11-25
 
 - support new ENV vars ([#376](https://github.com/flashbots/contender/pull/376))
diff --git a/crates/cli/src/commands/campaign.rs b/crates/cli/src/commands/campaign.rs
index f097a015..a1cd6c95 100644
--- a/crates/cli/src/commands/campaign.rs
+++ b/crates/cli/src/commands/campaign.rs
@@ -308,7 +308,7 @@ fn create_spam_cli_args(
         },
         duration: spam_duration,
         pending_timeout: args.pending_timeout,
-        loops: Some(Some(1)),
+        run_forever: false,
         accounts_per_agent: args.accounts_per_agent,
     },
     ignore_receipts: args.ignore_receipts,
@@ -374,7 +374,6 @@ async fn execute_stage(
     };
 
     let spam_args = SpamCommandArgs::new(spam_scenario, spam_cli_args)?;
-    let scenario = spam_args.init_scenario(db).await?;
     let duration = stage.duration;
     let db = db.clone();
     let campaign_id_owned = campaign_id.to_owned();
@@ -403,8 +402,7 @@ async fn execute_stage(
         // Wait for all parallel scenarios to be ready before starting
         barrier_clone.wait().await;
 
-        let mut scenario = scenario;
-        let run_res = commands::spam(&db, &spam_args, &mut scenario, ctx).await;
+        let run_res = commands::spam(&db, &spam_args, ctx).await;
         match run_res {
             Ok(Some(run_id)) => {
                 info!(
diff --git a/crates/cli/src/commands/common.rs b/crates/cli/src/commands/common.rs
index 7aeebad4..26b83644 100644
--- a/crates/cli/src/commands/common.rs
+++ b/crates/cli/src/commands/common.rs
@@ -294,12 +294,13 @@ Requires --priv-key to be set for each 'from' address in the given testfile.",
-    /// If set without a value, the spam run will be repeated indefinitely.
-    /// If not set, the spam run will be executed once.
+    /// If set, the spam run will be repeated indefinitely.
+    /// If not set, the spam run will be executed once.
     #[arg(
-        short,
-        long,
-        num_args = 0..=1,
-        long_help = "The number of times to repeat the spam run. If set with a value, the spam run will be repeated this many times. If set without a value, the spam run will be repeated indefinitely. If not set, the spam run will be repeated once."
+        global = true,
+        default_value_t = false, // explicit default (bool flags already default to false); kept for clarity
+        long = "forever",
+        long_help = "Run spammer indefinitely.",
+        visible_aliases = ["indefinite", "indefinitely", "infinite"]
     )]
-    pub loops: Option<Option<u64>>,
+    pub run_forever: bool,
 
     /// The number of accounts to generate for each agent (`from_pool` in scenario files)
     #[arg(
diff --git a/crates/cli/src/commands/mod.rs b/crates/cli/src/commands/mod.rs
index 58dd131f..15f60336 100644
--- a/crates/cli/src/commands/mod.rs
+++ b/crates/cli/src/commands/mod.rs
@@ -7,13 +7,11 @@ pub mod error;
 pub mod replay;
 mod setup;
 mod spam;
-mod spamd;
 
 use clap::Parser;
-pub use contender_subcommand::{ContenderSubcommand, DbCommand};
-pub use setup::{setup, SetupCommandArgs};
-pub use spam::{spam, EngineArgs, SpamCampaignContext, SpamCliArgs, SpamCommandArgs, SpamScenario};
-pub use spamd::spamd;
+pub use contender_subcommand::*;
+pub use setup::*;
+pub use spam::*;
 
 use crate::error::CliError;
diff --git a/crates/cli/src/commands/spam.rs b/crates/cli/src/commands/spam.rs
index adbf341b..a28036c6 100644
--- a/crates/cli/src/commands/spam.rs
+++ b/crates/cli/src/commands/spam.rs
@@ -9,17 +9,26 @@ use crate::{
     error::CliError,
     util::{
         bold, check_private_keys, fund_accounts, load_seedfile, load_testconfig, parse_duration,
-        provider::AuthClient, spam_callback_default, TypedSpamCallback,
+        provider::AuthClient,
     },
     LATENCY_HIST as HIST, PROM,
 };
-use alloy::{consensus::TxType, primitives::U256, transports::http::reqwest::Url};
+use alloy::{
+    consensus::TxType,
+    primitives::{utils::format_ether, U256},
+    transports::http::reqwest::Url,
+};
 use contender_core::{
     agent_controller::AgentStore,
     db::{DbOps, SpamDuration, SpamRunRequest},
     error::{RuntimeErrorKind, RuntimeParamErrorKind},
-    generator::{seeder::Seeder, templater::Templater, types::SpamRequest, PlanConfig, RandSeed},
-    spammer::{BlockwiseSpammer, Spammer, TimedSpammer},
+    generator::{
+        seeder::Seeder,
+        templater::Templater,
+        types::{AnyProvider, SpamRequest},
+        PlanConfig, RandSeed,
+    },
+    spammer::{BlockwiseSpammer, LogCallback, NilCallback, Spammer, TimedSpammer},
     test_scenario::{TestScenario, TestScenarioParams},
     util::get_block_time,
 };
@@ -205,7 +214,7 @@ impl SpamCommandArgs {
             txs_per_block,
             duration,
             pending_timeout,
-            loops,
+            run_forever,
             accounts_per_agent,
         } = self.spam_args.spam_args.clone();
         let SendTxsCliArgsInner {
@@ -478,17 +487,8 @@ impl SpamCommandArgs {
         }
         done_fcu.store(true, std::sync::atomic::Ordering::SeqCst);
 
-        if loops.is_some_and(|inner_loops| inner_loops.is_none()) {
-            warn!("Spammer agents will eventually run out of funds.");
-            println!(
-                "Make sure you add plenty of funds with {} (set your pre-funded account with {}).",
-                bold("spam --min-balance"),
-                bold("spam -p"),
-            );
-        }
-
-        let total_cost = U256::from(duration * loops.flatten().unwrap_or(1))
-            * test_scenario.get_max_spam_cost(&user_signers).await?;
+        let total_cost =
+            U256::from(duration) * test_scenario.get_max_spam_cost(&user_signers).await?;
         if min_balance < U256::from(total_cost) {
             return Err(ArgsError::MinBalanceInsufficient {
                 min_balance,
@@ -497,6 +497,26 @@ impl SpamCommandArgs {
             .into());
         }
 
+        let duration_unit = if txs_per_second.is_some() {
+            "second"
+        } else {
+            "block"
+        };
+        let duration_units = if duration > 1 {
+            format!("{duration_unit}s")
+        } else {
+            duration_unit.to_owned()
+        };
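+        // Worked example (hypothetical numbers): with `-d 10` and a max spam
+        // cost of 0.02 ETH per block/second, total_cost = 10 * 0.02 = 0.2 ETH,
+        // so `--min-balance` must be at least 0.2 ETH to pass the check above.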
+        if run_forever {
+            warn!("Spammer agents will eventually run out of funds. Each batch of spam (sent over {duration} {duration_units}) will cost {} ETH.", format_ether(total_cost));
+            // we use println! after warn! because warn! doesn't properly format bold strings
+            println!(
+                "Make sure you add plenty of funds with {} (set your pre-funded account with {}).",
+                bold("spam --min-balance"),
+                bold("spam -p"),
+            );
+        }
+
         Ok(test_scenario)
     }
 
@@ -509,6 +529,39 @@ impl SpamCommandArgs {
     }
 }
 
+pub fn spam_callback_default(
+    log_txs: bool,
+    send_fcu: bool,
+    rpc_client: Option<Arc<AnyProvider>>,
+    auth_client: Option<Arc<AuthClient>>,
+    cancel_token: tokio_util::sync::CancellationToken,
+) -> TypedSpamCallback {
+    if let Some(rpc_client) = rpc_client {
+        if log_txs {
+            let log_callback = LogCallback {
+                rpc_provider: rpc_client.clone(),
+                auth_provider: auth_client,
+                send_fcu,
+                cancel_token,
+            };
+            return TypedSpamCallback::Log(log_callback);
+        }
+    }
+    TypedSpamCallback::Nil(NilCallback)
+}
+
+#[derive(Clone)]
+pub enum TypedSpamCallback {
+    Log(LogCallback),
+    Nil(NilCallback),
+}
+
+impl TypedSpamCallback {
+    pub fn is_log(&self) -> bool {
+        matches!(self, TypedSpamCallback::Log(_))
+    }
+}
+
 enum TypedSpammer {
     Blockwise(BlockwiseSpammer),
     Timed(TimedSpammer),
@@ -564,16 +617,12 @@ impl TypedSpammer {
 }
 
 /// Runs spammer and returns run ID.
-pub async fn spam<
-    D: DbOps + Clone + Send + Sync + 'static,
-    S: Seeder + Send + Sync + Clone,
-    P: PlanConfig + Templater + Send + Sync + Clone,
->(
+pub async fn spam<D: DbOps + Clone + Send + Sync + 'static>(
     db: &D,
     args: &SpamCommandArgs,
-    test_scenario: &mut TestScenario<D, S, P>,
     run_context: SpamCampaignContext,
 ) -> Result<Option<u64>> {
+    let mut test_scenario = args.init_scenario(db).await?;
     let SpamCommandArgs {
         scenario,
         spam_args,
@@ -690,10 +739,29 @@ pub async fn spam<D: DbOps + Clone + Send + Sync + 'static>(
         }
     }
 
-    spammer
-        .spam_rpc(test_scenario, txs_per_batch, duration, run_id, callback)
-        .await
-        .map_err(err_parse)?;
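+    // Runs one spam cycle by default, or repeats indefinitely with `--forever`.
+    // Dropping the in-flight `spam_rpc` future on CTRL-C is safe: receipts are
+    // collected asynchronously by the TxActor, so most results have already
+    // been persisted by the time we shut down.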
+    loop {
+        tokio::select! {
+            res = spammer
+                .spam_rpc(
+                    &mut test_scenario,
+                    txs_per_batch,
+                    duration,
+                    run_id,
+                    callback.clone(),
+                ) => {
+                res.map_err(err_parse)?;
+            }
+
+            _ = tokio::signal::ctrl_c() => {
+                println!("\nCTRL-C received, shutting down spam run.");
+                test_scenario.shutdown().await;
+            }
+        }
+
+        if !args.spam_args.spam_args.run_forever || test_scenario.ctx.cancel_token.is_cancelled() {
+            break;
+        }
+    }
 
     Ok(run_id)
 }
diff --git a/crates/cli/src/commands/spamd.rs b/crates/cli/src/commands/spamd.rs
deleted file mode 100644
index 6c44136b..00000000
--- a/crates/cli/src/commands/spamd.rs
+++ /dev/null
@@ -1,137 +0,0 @@
-use super::{SpamCampaignContext, SpamCommandArgs};
-use crate::CliError;
-use crate::{
-    commands::{self},
-    util::data_dir,
-};
-use contender_core::db::DbOps;
-use std::{
-    sync::{
-        atomic::{AtomicBool, Ordering},
-        Arc,
-    },
-    time::Duration,
-};
-use tracing::{error, info, warn};
-
-/// Runs spam in a loop, potentially executing multiple spam runs.
-///
-/// If `limit_loops` is `None`, it will run indefinitely.
-///
-/// If `limit_loops` is `Some(n)`, it will run `n` times.
-///
-/// If `gen_report` is `true`, it will generate a report at the end.
-pub async fn spamd(
-    db: &(impl DbOps + Clone + Send + Sync + 'static),
-    args: SpamCommandArgs,
-    gen_report: bool,
-    limit_loops: Option<u64>,
-) -> Result<(), CliError> {
-    let is_done = Arc::new(AtomicBool::new(false));
-    let mut scenario = args.init_scenario(db).await?;
-
-    // collects run IDs from the spam command
-    let mut run_ids = vec![];
-
-    // if CTRL-C signal is received, set `is_done` to true
-    {
-        let is_done = is_done.clone();
-        tokio::task::spawn(async move {
-            tokio::signal::ctrl_c()
-                .await
-                .expect("Failed to listen for CTRL-C");
-            info!(
-                "CTRL-C received. Spam daemon will shut down as soon as current batch finishes..."
-            );
-            is_done.store(true, Ordering::SeqCst);
-        });
-    }
-
-    // runs spam command in a loop
-    let mut i = 0;
-    // this holds a Some value only when a timeout has been started.
-    let mut timeout_start = None;
-    loop {
-        let mut do_finish = false;
-        if let Some(loops) = &limit_loops {
-            if i >= *loops {
-                do_finish = true;
-            }
-            i += 1;
-        }
-        if is_done.load(Ordering::SeqCst) {
-            do_finish = true;
-        }
-        if do_finish {
-            info!("Spam loop finished.");
-            break;
-        }
-
-        let db = db.clone();
-        let spam_res =
-            commands::spam(&db, &args, &mut scenario, SpamCampaignContext::default()).await;
-        let wait_time = Duration::from_secs(3);
-
-        if let Err(e) = spam_res {
-            error!("spam run failed: {e:?}");
-
-            if timeout_start.is_none() {
-                let start_time = std::time::Instant::now();
-                timeout_start = Some(start_time);
-                warn!("retrying in {} seconds...", wait_time.as_secs());
-                tokio::time::sleep(wait_time).await;
-                continue;
-            }
-
-            if let Some(timeout_start) = timeout_start {
-                if std::time::Instant::now().duration_since(timeout_start)
-                    > args.spam_args.spam_timeout
-                {
-                    warn!("timeout reached, quitting spam loop...");
-                    scenario.ctx.cancel_token.cancel();
-                    break;
-                } else {
-                    tokio::time::sleep(wait_time).await;
-                }
-            } else {
-                scenario.ctx.cancel_token.cancel();
-                break;
-            }
-        } else {
-            timeout_start = None;
-            let run_id = spam_res.expect("spam");
-            if let Some(run_id) = run_id {
-                run_ids.push(run_id);
-            }
-        }
-    }
-
-    // generate a report if requested; in closure for tokio::select to handle CTRL-C
-    let run_report = || async move {
-        if gen_report {
-            if run_ids.is_empty() {
-                warn!("No runs found, exiting.");
-                return Ok::<_, CliError>(());
-            }
-            let first_run_id = run_ids.iter().min().expect("no run IDs found");
-            let last_run_id = *run_ids.iter().max().expect("no run IDs found");
-            contender_report::command::report(
-                Some(last_run_id),
-                last_run_id - first_run_id,
-                db,
-                &data_dir()?,
-            )
-            .await?;
-        }
-        Ok(())
-    };
-
-    tokio::select! {
-        _ = run_report() => {},
-        _ = tokio::signal::ctrl_c() => {
-            info!("CTRL-C received, cancelling report...");
-        }
-    }
-
-    Ok(())
-}
diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs
index b33f7b9d..202d5d5f 100644
--- a/crates/cli/src/main.rs
+++ b/crates/cli/src/main.rs
@@ -3,7 +3,7 @@ mod default_scenarios;
 mod error;
 mod util;
 
-use crate::commands::error::ArgsError;
+use crate::commands::{error::ArgsError, SpamCampaignContext};
 use alloy::{
     network::AnyNetwork,
     providers::{DynProvider, ProviderBuilder},
@@ -12,7 +12,7 @@ use alloy::{
 use commands::{
     admin::handle_admin_command,
-    common::{ScenarioSendTxsCliArgs, SendSpamCliArgs},
+    common::ScenarioSendTxsCliArgs,
     db::{drop_db, export_db, import_db, reset_db},
     replay::ReplayArgs,
     ContenderCli, ContenderSubcommand, DbCommand, SetupCommandArgs, SpamCliArgs, SpamCommandArgs,
@@ -26,7 +26,7 @@
 use std::{str::FromStr, sync::LazyLock};
 use tokio::sync::OnceCell;
 use tracing::{debug, info, warn};
 use tracing_subscriber::EnvFilter;
-use util::{bold, data_dir, db_file, init_reports_dir, prompt_continue};
+use util::{data_dir, db_file, init_reports_dir};
 
 static DB: LazyLock<SqliteDb> = std::sync::LazyLock::new(|| {
     let path = db_file().expect("failed to get DB file path");
@@ -77,9 +77,6 @@ async fn run() -> Result<(), CliError> {
             args,
             builtin_scenario_config,
         } => {
-            if !check_spam_args(&args)? {
-                return Ok(());
-            }
             if builtin_scenario_config.is_some() && args.eth_json_rpc_args.testfile.is_some() {
                 return Err(ArgsError::ScenarioFileBuiltinConflict.into());
             }
@@ -89,13 +86,9 @@
                 ScenarioSendTxsCliArgs {
                     testfile, rpc_args, ..
                 },
-                spam_args,
-                gen_report,
                 ..
             } = *args.to_owned();
 
-            let SendSpamCliArgs { loops, .. } = spam_args.to_owned();
-
             let client = ClientBuilder::default()
                 .http(Url::from_str(&rpc_args.rpc_url).map_err(ArgsError::UrlParse)?);
             let provider = DynProvider::new(
@@ -119,15 +112,8 @@
                 )
             };
 
-            let real_loops = if let Some(loops) = loops {
-                // loops flag is set; spamd will interpret a None value as infinite
-                loops
-            } else {
-                // loops flag is not set, so only loop once
-                Some(1)
-            };
-            let spamd_args = SpamCommandArgs::new(scenario, *args)?;
-            commands::spamd(&db, spamd_args, gen_report, real_loops).await?;
+            let spam_args = SpamCommandArgs::new(scenario, *args)?;
+            commands::spam(&db, &spam_args, SpamCampaignContext::default()).await?;
         }
 
         ContenderSubcommand::Replay { args } => {
@@ -247,37 +233,3 @@ fn init_tracing() {
         contender_core::util::init_core_tracing(filter);
     }
 }
-
-/// Check if spam arguments are typical and prompt the user to continue if they are not.
-/// Returns true if the user chooses to continue, false otherwise.
-fn check_spam_args(args: &SpamCliArgs) -> Result<bool, CliError> {
-    let (units, max_duration) = if args.spam_args.txs_per_block.is_some() {
-        ("blocks", 50)
-    } else if args.spam_args.txs_per_second.is_some() {
-        ("seconds", 100)
-    } else {
-        return Err(ArgsError::SpamRateNotFound.into());
-    };
-    let duration = args.spam_args.duration;
-    if duration > max_duration {
-        let time_limit = duration / max_duration;
-        let scenario = args
-            .eth_json_rpc_args
-            .testfile
-            .as_deref()
-            .unwrap_or_default();
-        let suggestion_cmd = bold(format!(
-            "contender spam {scenario} -d {max_duration} -l {time_limit} ..."
-        ));
-        println!(
-"Duration is set to {duration} {units}, which is quite high. Generating transactions and collecting results may take a long time.
-You may want to use {} with a lower spamming duration {} and a loop limit {}:\n
-\t{suggestion_cmd}\n",
-            bold("spam"),
-            bold("(-d)"),
-            bold("(-l)")
-        );
-        return Ok(prompt_continue(None));
-    }
-    Ok(true)
-}
diff --git a/crates/cli/src/util/utils.rs b/crates/cli/src/util/utils.rs
index 2070b572..95fcb47a 100644
--- a/crates/cli/src/util/utils.rs
+++ b/crates/cli/src/util/utils.rs
@@ -16,27 +16,15 @@ use contender_core::{
         types::{AnyProvider, FunctionCallDefinition, SpamRequest},
         util::complete_tx_request,
     },
-    spammer::{LogCallback, NilCallback},
     util::get_blob_fee_maybe,
 };
-use contender_engine_provider::{ControlChain, DEFAULT_BLOCK_TIME};
+use contender_engine_provider::DEFAULT_BLOCK_TIME;
 use contender_testfile::TestConfig;
-use nu_ansi_term::{AnsiGenericString, Color, Style as ANSIStyle};
+use nu_ansi_term::{AnsiGenericString, Style as ANSIStyle};
 use rand::Rng;
 use std::{str::FromStr, sync::Arc, time::Duration};
 use tracing::{debug, info, warn};
 
-pub enum TypedSpamCallback {
-    Log(LogCallback),
-    Nil(NilCallback),
-}
-
-impl TypedSpamCallback {
-    pub fn is_log(&self) -> bool {
-        matches!(self, TypedSpamCallback::Log(_))
-    }
-}
-
 pub const DEFAULT_PRV_KEYS: [&str; 10] = [
     "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80",
     "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d",
@@ -357,45 +345,6 @@ pub async fn find_insufficient_balances(
     Ok(insufficient_balances)
 }
 
-pub fn spam_callback_default(
-    log_txs: bool,
-    send_fcu: bool,
-    rpc_client: Option<Arc<AnyProvider>>,
-    auth_client: Option<Arc<AuthClient>>,
-    cancel_token: tokio_util::sync::CancellationToken,
-) -> TypedSpamCallback {
-    if let Some(rpc_client) = rpc_client {
-        if log_txs {
-            let log_callback = LogCallback {
-                rpc_provider: rpc_client.clone(),
-                auth_provider: auth_client,
-                send_fcu,
-                cancel_token,
-            };
-            return TypedSpamCallback::Log(log_callback);
-        }
-    }
-    TypedSpamCallback::Nil(NilCallback)
-}
-
-pub fn prompt_cli(msg: impl AsRef<str>) -> String {
-    println!("{}", Color::Rgb(252, 186, 3).paint(msg.as_ref()));
-
-    let mut input = String::new();
-    std::io::stdin()
-        .read_line(&mut input)
-        .expect("Failed to read line");
-    input.trim().to_owned()
-}
-
-/// Prompts the user for a yes/no answer.
-/// Returns true if the answer starts with 'y' or 'Y', false otherwise.
-pub fn prompt_continue(msg: Option<&str>) -> bool {
-    prompt_cli(msg.unwrap_or("Do you want to continue anyways? [y/N]"))
-        .to_lowercase()
-        .starts_with("y")
-}
-
 /// Returns the path to the data directory.
 /// The directory is created if it does not exist.
 pub fn data_dir() -> Result<String, CliError> {
diff --git a/crates/core/CHANGELOG.md b/crates/core/CHANGELOG.md
index e87b2f01..d7a9c561 100644
--- a/crates/core/CHANGELOG.md
+++ b/crates/core/CHANGELOG.md
@@ -5,6 +5,29 @@
 All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## Unreleased
+
+- core no longer processes CTRL-C signals ([#396](https://github.com/flashbots/contender/pull/396/files))
+  - instead, `TestScenario` uses a `cancel_token` to shut its processes down
+  - `cancel_token.cancel()` is triggered by the caller (e.g. the CLI); see the sketch below
+- pending txs are now processed asynchronously ([#396](https://github.com/flashbots/contender/pull/396/files))
+  - `cancel_token.cancelled()` terminates ALL background processes, making shutdown nearly immediate (one step instead of two, as before)
+- `TxActor` now runs receipt processing internally (automatically and asynchronously)
+- `TxActor` adds a new function `update_ctx_target_block` to internally track the target block to collect receipts from
+  - and `is_shutting_down` to report whether it will continue processing
+- `TxActorHandle` adds a new function `done_flushing` to report whether the internal cache has been fully emptied
+- `TestScenario` adds a new function `shutdown` to trigger cancellation on its `CancellationToken`
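+
+  A minimal sketch of the `CancellationToken` pattern (illustrative only; `do_work` is a hypothetical stand-in, not contender's API):
+
+  ```rust
+  use tokio_util::sync::CancellationToken;
+
+  async fn do_work() { /* stand-in for a background task */ }
+
+  #[tokio::main]
+  async fn main() {
+      let token = CancellationToken::new();
+      let child = token.child_token();
+      let task = tokio::spawn(async move {
+          tokio::select! {
+              _ = child.cancelled() => { /* shut down promptly */ }
+              _ = do_work() => {}
+          }
+      });
+      token.cancel(); // e.g. from the CLI's CTRL-C handler
+      task.await.unwrap();
+  }
+  ```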
+
+### Breaking changes
+
+- `spammer::error::CallbackError::OneshotSend` now requires a string parameter describing the failed send
+- `SpamRunContext` removed `do_quit` (it was a redundant copy of `TestScenario.ctx.cancel_token`)
+- the `Spammer` trait removed `get_msg_handler` (replaced by `TestScenario.tx_actor()`)
+- `TxActor` changes the signatures of `flush_cache`, `dump_cache`, `remove_cached_tx`, and `handle_message`
+- `TxActorHandle` adds a new function `init_ctx` which must be called before receipts can be processed
+- `flush_tx_cache` removed from `TestScenario` (the queue is now managed passively)
+
 ## [0.6.0](https://github.com/flashbots/contender/releases/tag/v0.6.0) - 2025-11-25
 
 - support groth16 proof verification in fuzzer ([#379](https://github.com/flashbots/contender/pull/379))
diff --git a/crates/core/src/spammer/error.rs b/crates/core/src/spammer/error.rs
index 35c86951..d53798b2 100644
--- a/crates/core/src/spammer/error.rs
+++ b/crates/core/src/spammer/error.rs
@@ -29,14 +29,17 @@ pub enum CallbackError {
     #[error("failed to flush cache. pending txs: {0:?}")]
pending txs: {0:?}")] FlushCache(Vec), + #[error("ctx cannot be updated before it's initialized")] + UpdateRequiresCtx, + #[error("failed to send mpsc message: {0}")] TxActorSendMessage(#[from] Box>), #[error("failed to send mpsc message: {0}")] MpscSendAddrNonce(#[from] mpsc::error::SendError<(Address, u64)>), - #[error("oneshot failed to send")] - OneshotSend(()), + #[error("oneshot failed to send: {0}")] + OneshotSend(String), #[error("oneshot receiver failed: {0}")] OneshotReceive(#[from] oneshot::error::RecvError), diff --git a/crates/core/src/spammer/spammer_trait.rs b/crates/core/src/spammer/spammer_trait.rs index 4503293d..3d8c9f20 100644 --- a/crates/core/src/spammer/spammer_trait.rs +++ b/crates/core/src/spammer/spammer_trait.rs @@ -6,26 +6,25 @@ use alloy::providers::Provider; use contender_engine_provider::DEFAULT_BLOCK_TIME; use futures::Stream; use futures::StreamExt; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; +use super::tx_callback::OnBatchSent; +use super::OnTxSent; +use super::SpamTrigger; use crate::db::SpamDuration; +use crate::spammer::tx_actor::ActorContext; use crate::spammer::CallbackError; use crate::{ db::DbOps, - generator::{seeder::Seeder, templater::Templater, types::AnyProvider, PlanConfig}, + generator::{seeder::Seeder, templater::Templater, PlanConfig}, test_scenario::TestScenario, Result, }; -use super::tx_callback::OnBatchSent; -use super::SpamTrigger; -use super::{tx_actor::TxActorHandle, OnTxSent}; - #[derive(Clone)] pub struct SpamRunContext { done_sending: Arc, done_fcu: Arc, - do_quit: tokio_util::sync::CancellationToken, } impl SpamRunContext { @@ -39,7 +38,6 @@ impl Default for SpamRunContext { Self { done_sending: Arc::new(AtomicBool::new(false)), done_fcu: Arc::new(AtomicBool::new(false)), - do_quit: tokio_util::sync::CancellationToken::new(), } } } @@ -51,10 +49,6 @@ where S: Seeder + Send + Sync + Clone, P: PlanConfig + Templater + Send + Sync + Clone, { - fn get_msg_handler(&self, db: Arc, rpc_client: Arc) -> TxActorHandle { - TxActorHandle::new(12, db.clone(), rpc_client.clone()) - } - fn context(&self) -> &SpamRunContext; fn on_spam( @@ -77,8 +71,11 @@ where let is_fcu_done = self.context().done_fcu.clone(); let is_sending_done = self.context().done_sending.clone(); let auth_provider = scenario.auth_provider.clone(); - // run loop in background to call fcu when spamming is done + let start_block = scenario.rpc_client.get_block_number().await.map_err(|e| { + CallbackError::ProviderCall(format!("failed to get block number: {e}")) + })?; + // run loop in background to call fcu when spamming is done let fcu_handle: tokio::task::JoinHandle> = tokio::task::spawn(async move { if let Some(auth_client) = &auth_provider { loop { @@ -108,24 +105,21 @@ where let tx_req_chunks = scenario .get_spam_tx_chunks(txs_per_period, num_periods) .await?; - let start_block = scenario.rpc_client.get_block_number().await.map_err(|e| { - CallbackError::ProviderCall(format!("failed to get block number: {e}")) - })?; let mut cursor = self.on_spam(scenario).await?.take(num_periods as usize); if scenario.should_sync_nonces { scenario.sync_nonces().await?; } - // calling cancel() on cancel_token should stop all running tasks - // (as long as each task checks for it) - let cancel_token = self.context().do_quit.clone(); + let actor_ctx = ActorContext::new(start_block, run_id); + scenario.tx_actor().init_ctx(actor_ctx).await?; // run spammer within tokio::select! 
+        let do_quit = scenario.ctx.cancel_token.clone();
         let spam_finished: bool = tokio::select! {
-            _ = tokio::signal::ctrl_c() => {
-                warn!("CTRL-C received, stopping spamming...");
-                cancel_token.cancel();
+            _ = do_quit.cancelled() => {
+                debug!("CTRL-C received, dropping execute_spammer call...");
                 false
             },
@@ -143,36 +137,13 @@
             .done_sending
             .store(true, std::sync::atomic::Ordering::SeqCst);
 
-        // collect results from cached pending txs
-        let flush_finished: bool = tokio::select! {
-            _ = tokio::signal::ctrl_c() => {
-                warn!("CTRL-C received, stopping result collection...");
-                for msg_handle in scenario.msg_handles.values() {
-                    let _ = msg_handle.stop().await;
-                }
-                cancel_token.cancel();
-                self.context().done_fcu.store(true, std::sync::atomic::Ordering::SeqCst);
-                false
-            },
-            _ = scenario.flush_tx_cache(start_block, run_id) => {
-                true
-            }
-        };
-        if !flush_finished {
-            warn!("Result collection terminated. Some pending txs may not have been saved to the database.");
-        } else {
-            self.context()
-                .done_fcu
-                .store(true, std::sync::atomic::Ordering::SeqCst);
-        }
-
         fcu_handle.await.map_err(CallbackError::Join)??;
 
         // clear out unconfirmed txs from the cache
         let dump_finished: bool = tokio::select! {
-            _ = tokio::signal::ctrl_c() => {
+            _ = scenario.ctx.cancel_token.cancelled() => {
                 warn!("CTRL-C received, stopping tx cache dump...");
-                cancel_token.cancel();
+                scenario.tx_actor().stop().await?;
                 false
             },
             _ = scenario.dump_tx_cache(run_id) => {
diff --git a/crates/core/src/spammer/tx_actor.rs b/crates/core/src/spammer/tx_actor.rs
index 91a268bc..ea8dabf8 100644
--- a/crates/core/src/spammer/tx_actor.rs
+++ b/crates/core/src/spammer/tx_actor.rs
@@ -11,7 +11,10 @@ use crate::{
     Result,
 };
 
+#[derive(Debug)]
 pub enum TxActorMessage {
+    InitCtx(ActorContext),
+    GetCacheLen(oneshot::Sender<usize>),
     SentRunTx {
         tx_hash: TxHash,
         start_timestamp_ms: u128,
@@ -23,11 +26,6 @@
         tx_hash: TxHash,
         on_remove: oneshot::Sender<()>,
     },
-    FlushCache {
-        run_id: u64,
-        on_flush: oneshot::Sender<Vec<PendingRunTx>>, // returns the number of txs remaining in cache
-        target_block_num: u64,
-    },
     DumpCache {
         run_id: u64,
         on_dump_cache: oneshot::Sender<Vec<RunTx>>,
@@ -45,6 +43,8 @@ where
     db: Arc<D>,
     cache: Vec<PendingRunTx>,
     rpc: Arc<AnyProvider>,
+    ctx: Option<ActorContext>,
+    status: ActorStatus,
 }
 
 #[derive(Debug, Clone, PartialEq)]
@@ -71,6 +71,28 @@ impl PendingRunTx {
     }
 }
 
+#[derive(Clone, Debug)]
+pub struct ActorContext {
+    pub run_id: u64,
+    pub target_block: u64,
+}
+
+impl ActorContext {
+    pub fn new(target_block: u64, run_id: u64) -> Self {
+        Self {
+            run_id,
+            target_block,
+        }
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, Default)]
+pub enum ActorStatus {
+    ShuttingDown,
+    #[default]
+    Running,
+}
+
 impl<D> TxActor<D>
 where
     D: DbOps + Send + Sync + 'static,
@@ -85,20 +107,31 @@
             db,
             cache: Vec::new(),
             rpc,
+            ctx: None,
+            status: ActorStatus::default(),
         }
     }
 
+    pub fn update_ctx_target_block(&mut self, target_block_num: u64) -> Result<()> {
+        if let Some(ctx) = self.ctx.as_mut() {
+            ctx.target_block = target_block_num;
+        } else {
+            return Err(CallbackError::UpdateRequiresCtx.into());
+        }
+
+        Ok(())
+    }
+
     /// Waits for target block to appear onchain,
     /// gets block receipts for the target block,
     /// removes txs that were included in the block from cache, and saves them to the DB.
     async fn flush_cache(
-        cache: &mut Vec<PendingRunTx>,
-        db: &Arc<D>,
-        rpc: &Arc<AnyProvider>,
+        &mut self,
         run_id: u64,
         on_flush: oneshot::Sender<Vec<PendingRunTx>>, // returns the number of txs remaining in cache
         target_block_num: u64,
     ) -> Result<()> {
+        let Self { cache, rpc, db, .. } = self;
         info!("unconfirmed txs: {}", cache.len());
 
         if cache.is_empty() {
@@ -191,12 +224,9 @@
     }
 
     /// Dumps all cached txs into the DB. Does not assign `end_timestamp`, `block_number`, or `gas_used`.
-    async fn dump_cache(
-        cache: &mut Vec<PendingRunTx>,
-        db: &Arc<D>,
-        run_id: u64,
-    ) -> Result<Vec<RunTx>> {
-        let run_txs = cache
+    async fn dump_cache(&mut self, run_id: u64) -> Result<Vec<RunTx>> {
+        let run_txs = self
+            .cache
             .iter()
             .map(|pending_tx| RunTx {
                 tx_hash: pending_tx.tx_hash,
@@ -208,30 +238,37 @@
                 error: pending_tx.error.to_owned(),
             })
             .collect::<Vec<_>>();
-        db.insert_run_txs(run_id, &run_txs).map_err(|e| e.into())?;
-        cache.clear();
+        self.db
+            .insert_run_txs(run_id, &run_txs)
+            .map_err(|e| e.into())?;
+        self.cache.clear();
         Ok(run_txs)
     }
 
-    async fn remove_cached_tx(cache: &mut Vec<PendingRunTx>, old_tx_hash: TxHash) -> Result<()> {
-        let old_tx = cache
+    async fn remove_cached_tx(&mut self, old_tx_hash: TxHash) -> Result<()> {
+        let old_tx = self
+            .cache
             .iter()
             .position(|tx| tx.tx_hash == old_tx_hash)
             .ok_or(CallbackError::CacheRemoveTx(old_tx_hash))?;
-        cache.remove(old_tx);
+        self.cache.remove(old_tx);
         Ok(())
     }
 
-    async fn handle_message(
-        cache: &mut Vec<PendingRunTx>,
-        db: &Arc<D>,
-        rpc: &Arc<AnyProvider>,
-        message: TxActorMessage,
-    ) -> Result<()> {
+    /// Parse message and execute appropriate methods.
+    async fn handle_message(&mut self, message: TxActorMessage) -> Result<()> {
         match message {
+            TxActorMessage::GetCacheLen(on_len) => {
+                on_len
+                    .send(self.cache.len())
+                    .map_err(|e| CallbackError::OneshotSend(format!("GetCacheLen: {:?}", e)))?;
+            }
+            TxActorMessage::InitCtx(ctx) => {
+                self.ctx = Some(ctx);
+            }
             TxActorMessage::Stop { on_stop } => {
+                self.status = ActorStatus::ShuttingDown;
                 on_stop.send(()).map_err(|_| CallbackError::Stop)?;
-                return Ok(());
             }
             TxActorMessage::SentRunTx {
                 tx_hash,
@@ -246,83 +283,70 @@
                     kind,
                     error,
                 };
-                cache.push(run_tx.to_owned());
-                on_receive.send(()).map_err(CallbackError::OneshotSend)?;
+                self.cache.push(run_tx.to_owned());
+                on_receive
+                    .send(())
+                    .map_err(|e| CallbackError::OneshotSend(format!("SentRunTx: {:?}", e)))?;
             }
             TxActorMessage::RemovedRunTx { tx_hash, on_remove } => {
-                Self::remove_cached_tx(cache, tx_hash).await?;
-                on_remove.send(()).map_err(CallbackError::OneshotSend)?;
-            }
-            TxActorMessage::FlushCache {
-                on_flush,
-                run_id,
-                target_block_num,
-            } => {
-                Self::flush_cache(cache, db, rpc, run_id, on_flush, target_block_num).await?;
+                self.remove_cached_tx(tx_hash).await?;
+                on_remove
+                    .send(())
+                    .map_err(|e| CallbackError::OneshotSend(format!("RemovedRunTx: {:?}", e)))?;
             }
             TxActorMessage::DumpCache {
                 on_dump_cache,
                 run_id,
             } => {
-                let res = Self::dump_cache(cache, db, run_id).await?;
+                let res = self.dump_cache(run_id).await?;
                 on_dump_cache.send(res).map_err(CallbackError::DumpCache)?;
             }
         }
+
         Ok(())
     }
 
+    /// Receive & handle messages.
     pub async fn run(&mut self) -> Result<()> {
-        while let Some(msg) = self.receiver.recv().await {
-            match &msg {
-                TxActorMessage::DumpCache {
-                    on_dump_cache: _,
-                    run_id: _,
-                } => {
-                    tokio::select! {
-                        _ = Self::handle_message(&mut self.cache, &self.db, &self.rpc,
-                            msg
-                        ) => {},
-                        Some(TxActorMessage::Stop{on_stop: _}) = self.receiver.recv() => {
-                            // exits early if a stop message is received
-                        },
-                    };
-                }
-                TxActorMessage::FlushCache {
-                    run_id: _,
-                    on_flush: _,
-                    target_block_num: _,
-                } => {
-                    tokio::select! {
-                        _ = Self::handle_message(&mut self.cache, &self.db, &self.rpc,
-                            msg
-                        ) => {},
-                        Some(TxActorMessage::Stop{on_stop: _}) = self.receiver.recv() => {
-                            // exits early if a stop message is received
-                        },
-                    };
-                }
-                TxActorMessage::SentRunTx {
-                    tx_hash: _,
-                    start_timestamp_ms: _,
-                    kind: _,
-                    on_receive: _,
-                    error: _,
-                } => {
-                    Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg).await?;
-                }
-                TxActorMessage::RemovedRunTx {
-                    tx_hash: _,
-                    on_remove: _,
-                } => {
-                    Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg).await?;
-                }
-                TxActorMessage::Stop { on_stop: _ } => {
-                    // do nothing here; stop is a signal to interrupt other message handlers
-                }
-            }
-        }
+        let mut interval =
+            tokio::time::interval(/* self.cfg.poll_interval */ Duration::from_secs(1));
+        let provider = self.rpc.clone();
+
+        loop {
+            if self.is_shutting_down() {
+                break;
+            }
+
+            tokio::select! {
+                // periodically flush cache in background while spammer runs
+                _ = interval.tick() => {
+                    if let Some(ctx) = self.ctx.to_owned() {
+                        let new_block = provider.get_block_number().await?;
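+                        // Flush once per block in [target_block, new_block): each
+                        // flush_cache call waits for that block, collects its
+                        // receipts, and prunes confirmed txs from the cache.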
+                        for bn in ctx.target_block..new_block {
+                            let (on_flush, _receiver) = oneshot::channel();
+                            self.flush_cache(ctx.run_id, on_flush, bn).await?;
+                        }
+                        self.update_ctx_target_block(new_block)?;
+                    } else {
+                        debug!("TxActor context not initialized.");
+                    }
                 }
+
+                // handle messages (sent by test_scenario)
+                msg = self.receiver.recv() => {
+                    if let Some(msg) = msg {
+                        self.handle_message(msg).await?;
+                    }
+                }
             }
         }
+
         Ok(())
     }
+
+    pub fn is_shutting_down(&self) -> bool {
+        self.status == ActorStatus::ShuttingDown
+    }
 }
 
 #[derive(Debug)]
@@ -347,7 +371,8 @@ impl TxActorHandle {
         let (sender, receiver) = mpsc::channel(bufsize);
         let mut actor = TxActor::new(receiver, db, rpc);
         tokio::task::spawn(async move {
-            actor.run().await.expect("tx actor massively failed");
+            actor.run().await?;
+            Ok::<_, crate::Error>(())
        });
         Self { sender }
     }
@@ -389,23 +414,15 @@
         Ok(receiver.await.map_err(CallbackError::OneshotReceive)?)
     }
 
-    /// Removes txs included onchain from the cache, saves them to the DB, and returns the number of txs remaining in the cache.
-    pub async fn flush_cache(
-        &self,
-        run_id: u64,
-        target_block_num: u64,
-    ) -> Result<Vec<PendingRunTx>> {
+    pub async fn done_flushing(&self) -> Result<bool> {
         let (sender, receiver) = oneshot::channel();
         self.sender
-            .send(TxActorMessage::FlushCache {
-                run_id,
-                on_flush: sender,
-                target_block_num,
-            })
+            .send(TxActorMessage::GetCacheLen(sender))
             .await
             .map_err(Box::new)
             .map_err(CallbackError::from)?;
-        Ok(receiver.await.map_err(CallbackError::OneshotReceive)?)
+        let cache_len = receiver.await.map_err(CallbackError::from)?;
+        Ok(cache_len == 0)
     }
 
     /// Removes an existing tx in the cache.
@@ -423,7 +440,7 @@
         Ok(receiver.await.map_err(CallbackError::OneshotReceive)?)
     }
 
-    /// Stops the actor, terminating any pending tasks.
+    /// Stops the actor, terminating all pending tasks.
     pub async fn stop(&self) -> Result<()> {
         let (sender, receiver) = oneshot::channel();
         self.sender
@@ -433,4 +450,13 @@
             .map_err(CallbackError::from)?;
         Ok(receiver.await.map_err(CallbackError::OneshotReceive)?)
     }
+
+    pub async fn init_ctx(&self, ctx: ActorContext) -> Result<()> {
+        self.sender
+            .send(TxActorMessage::InitCtx(ctx))
+            .await
+            .map_err(Box::new)
+            .map_err(CallbackError::from)?;
+        Ok(())
+    }
 }
diff --git a/crates/core/src/spammer/tx_callback.rs b/crates/core/src/spammer/tx_callback.rs
index 0b413432..119e0dbc 100644
--- a/crates/core/src/spammer/tx_callback.rs
+++ b/crates/core/src/spammer/tx_callback.rs
@@ -93,6 +93,7 @@ impl<T: OnTxSent + OnBatchSent> SpamCallback for T {}
 #[derive(Clone)]
 pub struct NilCallback;
 
+#[derive(Clone)]
 pub struct LogCallback {
     pub rpc_provider: Arc<AnyProvider>,
     pub auth_provider: Option<Arc<AuthClient>>,
diff --git a/crates/core/src/test_scenario.rs b/crates/core/src/test_scenario.rs
index 093094d7..957dec80 100644
--- a/crates/core/src/test_scenario.rs
+++ b/crates/core/src/test_scenario.rs
@@ -45,7 +45,7 @@ use std::{
     pin::Pin,
     str::FromStr,
     sync::Arc,
-    time::Duration,
+    time::{Duration, Instant},
 };
 use tokio::sync::OnceCell;
 use tokio_util::sync::CancellationToken;
@@ -166,10 +166,6 @@ impl ExecutionContext {
     pub fn add_to_gas_price(&mut self, amount: i128) {
         self.gas_price_adder += amount;
     }
-
-    pub fn cancel_run(&self) {
-        self.cancel_token.cancel();
-    }
 }
 
 struct DeployContractParams<'a, D: DbOps> {
@@ -282,7 +278,7 @@ where
         let cancel_token = CancellationToken::new();
 
         // default msg_handle to handle txs sent on rpc_url
-        let msg_handle = Arc::new(TxActorHandle::new(120, db.clone(), rpc_client.clone()));
+        let msg_handle = Arc::new(TxActorHandle::new(12000, db.clone(), rpc_client.clone()));
         let mut msg_handles = HashMap::new();
         msg_handles.insert("default".to_owned(), msg_handle);
         msg_handles.extend(extra_msg_handles.unwrap_or_default());
@@ -332,6 +328,12 @@
             .await
     }
 
+    pub fn tx_actor(&self) -> &TxActorHandle {
+        self.msg_handles
+            .get("default")
+            .expect("default msg_handle uninitialized")
+    }
+
     // Polls anvil to ensure its initialized and ready to accept RPC requests
     async fn wait_for_anvil_ready(endpoint_url: &Url, timeout: Duration) -> Result<()> {
         let start = std::time::Instant::now();
@@ -1413,67 +1415,32 @@
             .collect::<_>())
     }
 
-    pub async fn flush_tx_cache(&self, block_start: u64, run_id: u64) -> Result<()> {
-        let mut block_counter = 0;
-        // the number of blocks to check for stalled txs
-        let block_timeout = ((self.pending_tx_timeout_secs / self.ctx.block_time_secs) + 1)
-            // must be at least 2 blocks because otherwise we have nothing to compare
-            .max(2);
-        let mut cache_size_queue = vec![];
-        cache_size_queue.resize(block_timeout as usize, 1);
-        for msg_handle in self.msg_handles.values() {
-            loop {
-                let pending_txs = msg_handle
-                    .flush_cache(run_id, block_start + block_counter as u64)
-                    .await?;
-                cache_size_queue.rotate_right(1);
-                cache_size_queue[0] = pending_txs.len();
-
-                if pending_txs.is_empty() {
-                    break;
-                }
-
-                let current_timestamp = std::time::SystemTime::now()
-                    .duration_since(std::time::UNIX_EPOCH)
-                    .expect("time went backwards")
-                    .as_millis();
-
-                // remove cached txs if the size hasn't changed for the last N blocks
-                if cache_size_queue
-                    .iter()
-                    .all(|&size| size == cache_size_queue[0])
-                {
-                    warn!(
-                        "Cache size has not changed for the last {block_timeout} blocks. Removing stalled txs...",
-                    );
Removing stalled txs...", - ); - - for tx in &pending_txs { - // only remove txs that have been waiting for > T seconds - if current_timestamp - > tx.start_timestamp_ms + (self.pending_tx_timeout_secs as u128 * 1000) - { - msg_handle.remove_cached_tx(tx.tx_hash).await?; - } - } - } - - block_counter += 1; - } - } - - Ok(()) - } - + /// Wait for `self.pending_tx_timeout_secs` for pending txs to confirm, then delete any remaining cache items. pub async fn dump_tx_cache(&self, run_id: u64) -> Result<()> { debug!("dumping tx cache..."); + let start_time = Instant::now(); for msg_handle in self.msg_handles.values() { - let failed_txs = msg_handle.dump_cache(run_id).await?; - if !failed_txs.is_empty() { - warn!( - "Failed to collect receipts for {} txs. Any valid txs sent may still land.", - failed_txs.len() - ); + tokio::select! { + _ = self.ctx.cancel_token.cancelled() => { + println!("dump_tx_cache cancelled"); + } + _ = async { + while !self.tx_actor().done_flushing().await? { + if start_time.elapsed().as_secs() > self.pending_tx_timeout_secs { + warn!("timed out waiting for pending transactions"); + break; + } + } + let failed_txs = msg_handle.dump_cache(run_id).await?; + if !failed_txs.is_empty() { + warn!( + "Failed to collect receipts for {} txs. Any valid txs sent may still land.", + failed_txs.len() + ); + } + Ok::<_, Error>(()) + } => {} } } @@ -1552,6 +1519,10 @@ where } Ok(key) } + + pub async fn shutdown(&mut self) { + self.ctx.cancel_token.cancel(); + } } async fn handle_tx_outcome<'a, F: SpamCallback + 'static>( @@ -1603,7 +1574,8 @@ async fn handle_tx_outcome<'a, F: SpamCallback + 'static>( } else { // success path if let Err(e) = ctx.success_sender.send(()).await { - warn!( + // this error can safely be ignored; it just means the receiver was closed (e.g. by CTRL-C) + debug!( "failed to send success notification for tx {}: {:?}", tx_hash, e ); diff --git a/docs/cli.md b/docs/cli.md index 81488a73..6efcb0d8 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -28,7 +28,7 @@ contender --help - `--tps ` txs/second (drives agent count) - `--tpb ` txs/block - `-d, --duration ` batches to send before receipt collection -- `-l, --loops [N]` run indefinitely or N times +- `--forever` run spammer indefinitely - `--report` auto-generate a report after spam - `-e KEY=VALUE` override/insert `[env]` values in a scenario config @@ -71,7 +71,7 @@ contender spam --help - **stress** — a composite scenario mixing several patterns. > Tip: All built‑ins accept the same timing/loop flags as file‑based scenarios: -> `--tps` (per‑second), `--tpb` (per‑block), `-l/--loops`, `-d/--duration`, and env overrides via `-e KEY=VALUE`. +> `--tps` (per‑second), `--tpb` (per‑block), `--forever`, `-d/--duration`, and env overrides via `-e KEY=VALUE`. ### Usage examples @@ -80,7 +80,7 @@ contender spam --help contender spam fill-block -r $RPC_URL --tps 200 -d 5 # Storage write pressure -contender spam storage -r $RPC_URL --tpb 300 -l 50 +contender spam storage -r $RPC_URL --tpb 300 --forever # Targeted opcode/precompile stress contender spam eth-functions -r $RPC_URL --tps 100