From 6f8d0268d854cfab8ab9f1753d60c0f16615b809 Mon Sep 17 00:00:00 2001 From: Anurag chavan <118217089+anuragchvn-blip@users.noreply.github.com> Date: Fri, 19 Dec 2025 16:46:05 +0530 Subject: [PATCH] refactor: async pending tx cache processing with auto-flush - TxActor now automatically flushes pending tx cache in background - Configurable flush interval via --cache-flush-interval flag (default: 5 blocks) - Spamming continues uninterrupted during receipt collection - Improved error handling with automatic retry and recovery detection - Replace --loops flag with --indefinite for clearer semantics Closes #395 --- CHANGELOG.md | 29 ++ crates/cli/src/commands/campaign.rs | 3 +- crates/cli/src/commands/common.rs | 15 +- crates/cli/src/commands/spam.rs | 10 +- crates/cli/src/commands/spamd.rs | 2 +- crates/cli/src/main.rs | 12 +- crates/core/src/spammer/spammer_trait.rs | 32 +++ crates/core/src/spammer/tx_actor.rs | 347 ++++++++++++++++++++--- crates/core/src/test_scenario.rs | 3 + docs/cli.md | 4 +- 10 files changed, 392 insertions(+), 65 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c0aa209..a45ce331 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,35 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 --- +## [Unreleased] + +### Changed + +- **BREAKING**: Replaced `--loops` flag with `--indefinite` flag for spam command ([#395](https://github.com/flashbots/contender/issues/395)) + - Use `--indefinite` to run spam continuously until manually stopped + - Default behavior (without `--indefinite`) runs spam once + - Previous `--loops N` functionality replaced with running spamd N times via script + - Migration: If you used `--loops` without a value for infinite loops, use `--indefinite`. If you used `--loops N` for a specific count, run the command N times or use a wrapper script. + +### Added + +- **Auto-flush configuration**: New `--cache-flush-interval` flag to control how often (in blocks) the pending transaction cache is flushed to the database ([#395](https://github.com/flashbots/contender/issues/395)) + - Default: 5 blocks + - Lower values reduce memory usage but increase DB writes + - Higher values batch more efficiently but use more memory + +### Improved + +- **Spammer now processes receipts in background**: TxActor automatically flushes pending transaction cache in the background while spamming continues ([#395](https://github.com/flashbots/contender/issues/395)) + - Spamming no longer pauses to collect receipts + - Cache is automatically flushed at configurable intervals (default: every 5 blocks) + - More realistic RPC traffic patterns + - Better performance for long-running spam operations + - Improved error handling with automatic retry on transient failures + - Intelligent logging to avoid spam during extended RPC issues + +--- + ## [0.6.0](https://github.com/flashbots/contender/releases/tag/v0.6.0) - 2025-11-25 Features: diff --git a/crates/cli/src/commands/campaign.rs b/crates/cli/src/commands/campaign.rs index f097a015..796d4813 100644 --- a/crates/cli/src/commands/campaign.rs +++ b/crates/cli/src/commands/campaign.rs @@ -308,7 +308,8 @@ fn create_spam_cli_args( }, duration: spam_duration, pending_timeout: args.pending_timeout, - loops: Some(Some(1)), + indefinite: false, + cache_flush_interval: 5, // Use default for campaigns accounts_per_agent: args.accounts_per_agent, }, ignore_receipts: args.ignore_receipts, diff --git a/crates/cli/src/commands/common.rs b/crates/cli/src/commands/common.rs index 7aeebad4..8e09933e 100644 --- a/crates/cli/src/commands/common.rs +++ b/crates/cli/src/commands/common.rs @@ -294,13 +294,18 @@ Requires --priv-key to be set for each 'from' address in the given testfile.", /// If set without a value, the spam run will be repeated indefinitely. /// If not set, the spam run will be executed once. #[arg( - short, long, - num_args = 0..=1, - long_help = "The number of times to repeat the spam run. If set with a value, the spam run will be repeated this many times. If set without a value, the spam run will be repeated indefinitely. If not set, the spam run will be repeated once." + long_help = "Run spam indefinitely until manually stopped. If not set, the spam run will be executed once." )] - pub loops: Option>, - + pub indefinite: bool, + /// How often (in blocks) to flush the pending transaction cache to the database. + /// Lower values reduce memory usage but increase DB writes. Higher values batch more efficiently. + #[arg( + long, + default_value_t = 5, + long_help = "Number of blocks between automatic cache flushes. Controls memory usage vs DB write frequency." + )] + pub cache_flush_interval: u64, /// The number of accounts to generate for each agent (`from_pool` in scenario files) #[arg( short, diff --git a/crates/cli/src/commands/spam.rs b/crates/cli/src/commands/spam.rs index adbf341b..f815f82e 100644 --- a/crates/cli/src/commands/spam.rs +++ b/crates/cli/src/commands/spam.rs @@ -205,7 +205,8 @@ impl SpamCommandArgs { txs_per_block, duration, pending_timeout, - loops, + indefinite, + cache_flush_interval, accounts_per_agent, } = self.spam_args.spam_args.clone(); let SendTxsCliArgsInner { @@ -478,7 +479,7 @@ impl SpamCommandArgs { } done_fcu.store(true, std::sync::atomic::Ordering::SeqCst); - if loops.is_some_and(|inner_loops| inner_loops.is_none()) { + if indefinite { warn!("Spammer agents will eventually run out of funds."); println!( "Make sure you add plenty of funds with {} (set your pre-funded account with {}).", @@ -487,7 +488,7 @@ impl SpamCommandArgs { ); } - let total_cost = U256::from(duration * loops.flatten().unwrap_or(1)) + let total_cost = U256::from(duration) * test_scenario.get_max_spam_cost(&user_signers).await?; if min_balance < U256::from(total_cost) { return Err(ArgsError::MinBalanceInsufficient { @@ -497,6 +498,9 @@ impl SpamCommandArgs { .into()); } + // Set the cache flush interval from CLI args + test_scenario.cache_flush_interval_blocks = cache_flush_interval; + Ok(test_scenario) } diff --git a/crates/cli/src/commands/spamd.rs b/crates/cli/src/commands/spamd.rs index 6c44136b..c8bf188a 100644 --- a/crates/cli/src/commands/spamd.rs +++ b/crates/cli/src/commands/spamd.rs @@ -16,7 +16,7 @@ use tracing::{error, info, warn}; /// Runs spam in a loop, potentially executing multiple spam runs. /// -/// If `limit_loops` is `None`, it will run indefinitely. +/// If `limit_loops` is `None`, it will run indefinitely (--indefinite flag). /// /// If `limit_loops` is `Some(n)`, it will run `n` times. /// diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index b33f7b9d..c5aa4bb0 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -94,7 +94,7 @@ async fn run() -> Result<(), CliError> { .. } = *args.to_owned(); - let SendSpamCliArgs { loops, .. } = spam_args.to_owned(); + let SendSpamCliArgs { indefinite, .. } = spam_args.to_owned(); let client = ClientBuilder::default() .http(Url::from_str(&rpc_args.rpc_url).map_err(ArgsError::UrlParse)?); @@ -119,15 +119,15 @@ async fn run() -> Result<(), CliError> { ) }; - let real_loops = if let Some(loops) = loops { - // loops flag is set; spamd will interpret a None value as infinite - loops + let limit_loops = if indefinite { + // indefinite flag is set; spamd will interpret None as infinite + None } else { - // loops flag is not set, so only loop once + // indefinite flag is not set, so only loop once Some(1) }; let spamd_args = SpamCommandArgs::new(scenario, *args)?; - commands::spamd(&db, spamd_args, gen_report, real_loops).await?; + commands::spamd(&db, spamd_args, gen_report, limit_loops).await?; } ContenderSubcommand::Replay { args } => { diff --git a/crates/core/src/spammer/spammer_trait.rs b/crates/core/src/spammer/spammer_trait.rs index 4503293d..f32de480 100644 --- a/crates/core/src/spammer/spammer_trait.rs +++ b/crates/core/src/spammer/spammer_trait.rs @@ -117,6 +117,22 @@ where scenario.sync_nonces().await?; } + // ═══════════════════════════════════════════════════════════════ + // START AUTO-FLUSH: Begin background receipt processing + // ═══════════════════════════════════════════════════════════════ + // From this point forward, pending transactions will be automatically + // flushed to the database every N blocks (configurable via --cache-flush-interval). + // This allows spamming to continue uninterrupted while receipts are collected. + // + // The auto-flush process: + // 1. TxActor monitors block numbers via periodic checks + // 2. When interval blocks have passed, it fetches receipts for those blocks + // 3. Confirmed transactions are saved to DB, unconfirmed remain in cache + // 4. Process repeats automatically until auto-flush is stopped + for msg_handle in scenario.msg_handles.values() { + msg_handle.start_auto_flush(run_id, scenario.cache_flush_interval_blocks).await?; + } + // calling cancel() on cancel_token should stop all running tasks // (as long as each task checks for it) let cancel_token = self.context().do_quit.clone(); @@ -143,6 +159,22 @@ where .done_sending .store(true, std::sync::atomic::Ordering::SeqCst); + // ═══════════════════════════════════════════════════════════════ + // STOP AUTO-FLUSH: Transition to final manual flush + // ═══════════════════════════════════════════════════════════════ + // Now that spamming is complete, we stop the background auto-flush + // and do a final comprehensive manual flush to ensure all remaining + // transactions are processed. + // + // Why stop auto-flush before final flush: + // 1. Prevents interference between auto-flush and manual flush + // 2. Manual flush has more sophisticated timeout/stall detection + // 3. Ensures we wait for all blocks to be available + // 4. Cleaner separation of concerns + for msg_handle in scenario.msg_handles.values() { + let _ = msg_handle.stop_auto_flush().await; + } + // collect results from cached pending txs let flush_finished: bool = tokio::select! { _ = tokio::signal::ctrl_c() => { diff --git a/crates/core/src/spammer/tx_actor.rs b/crates/core/src/spammer/tx_actor.rs index 91a268bc..9a5d4936 100644 --- a/crates/core/src/spammer/tx_actor.rs +++ b/crates/core/src/spammer/tx_actor.rs @@ -32,19 +32,55 @@ pub enum TxActorMessage { run_id: u64, on_dump_cache: oneshot::Sender>, }, + StartAutoFlush { + run_id: u64, + flush_interval_blocks: u64, + on_start: oneshot::Sender<()>, + }, + StopAutoFlush { + on_stop_auto: oneshot::Sender<()>, + }, Stop { on_stop: oneshot::Sender<()>, }, } +/// Actor responsible for managing pending transaction cache and processing receipts. +/// +/// The TxActor runs in a background task and handles: +/// - Caching transaction hashes as they're sent +/// - Automatically flushing confirmed transactions to the database at regular intervals +/// - Manual flush operations on demand +/// - Graceful shutdown +/// +/// Auto-flush mechanism: +/// - Monitors the blockchain via periodic block number checks +/// - When enough blocks have passed (configurable interval), processes pending transactions +/// - Fetches receipts for transactions in those blocks +/// - Saves confirmed transactions to the database +/// - Retains unconfirmed transactions in cache for next flush +/// +/// This design allows spamming to continue uninterrupted while receipt processing +/// happens in the background, improving performance and creating more realistic traffic patterns. struct TxActor where D: DbOps, { receiver: mpsc::Receiver, db: Arc, + /// In-memory cache of pending transactions awaiting confirmation cache: Vec, rpc: Arc, + /// Whether auto-flush is currently enabled + auto_flush_enabled: bool, + /// The run_id to associate with auto-flushed transactions + auto_flush_run_id: Option, + /// Number of blocks between auto-flush operations + auto_flush_interval_blocks: u64, + /// Last block number that was successfully flushed + last_flushed_block: u64, + /// Track consecutive auto-flush failures to prevent log spam and detect persistent issues + consecutive_flush_failures: u32, } #[derive(Debug, Clone, PartialEq)] @@ -85,6 +121,11 @@ where db, cache: Vec::new(), rpc, + auto_flush_enabled: false, + auto_flush_run_id: None, + auto_flush_interval_blocks: 10, // default interval + last_flushed_block: 0, + consecutive_flush_failures: 0, } } @@ -227,6 +268,9 @@ where db: &Arc, rpc: &Arc, message: TxActorMessage, + auto_flush_enabled: &mut bool, + auto_flush_run_id: &mut Option, + auto_flush_interval_blocks: &mut u64, ) -> Result<()> { match message { TxActorMessage::Stop { on_stop } => { @@ -267,58 +311,155 @@ where let res = Self::dump_cache(cache, db, run_id).await?; on_dump_cache.send(res).map_err(CallbackError::DumpCache)?; } + TxActorMessage::StartAutoFlush { + run_id, + flush_interval_blocks, + on_start, + } => { + *auto_flush_enabled = true; + *auto_flush_run_id = Some(run_id); + *auto_flush_interval_blocks = flush_interval_blocks; + info!("Auto-flush enabled with interval: {} blocks", flush_interval_blocks); + on_start.send(()).map_err(CallbackError::OneshotSend)?; + } + TxActorMessage::StopAutoFlush { on_stop_auto } => { + *auto_flush_enabled = false; + *auto_flush_run_id = None; + info!("Auto-flush disabled"); + on_stop_auto.send(()).map_err(CallbackError::OneshotSend)?; + } } Ok(()) } pub async fn run(&mut self) -> Result<()> { - while let Some(msg) = self.receiver.recv().await { - match &msg { - TxActorMessage::DumpCache { - on_dump_cache: _, - run_id: _, - } => { - tokio::select! { - _ = Self::handle_message(&mut self.cache, &self.db, &self.rpc, - msg - ) => {}, - Some(TxActorMessage::Stop{on_stop: _}) = self.receiver.recv() => { - // exits early if a stop message is received - }, - }; - } - TxActorMessage::FlushCache { - run_id: _, - on_flush: _, - target_block_num: _, - } => { - tokio::select! { - _ = Self::handle_message(&mut self.cache, &self.db, &self.rpc, - msg - ) => {}, - Some(TxActorMessage::Stop{on_stop: _}) = self.receiver.recv() => { - // exits early if a stop message is received - }, - }; - } - TxActorMessage::SentRunTx { - tx_hash: _, - start_timestamp_ms: _, - kind: _, - on_receive: _, - error: _, - } => { - Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg).await?; - } - TxActorMessage::RemovedRunTx { - tx_hash: _, - on_remove: _, - } => { - Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg).await?; - } - TxActorMessage::Stop { on_stop: _ } => { - // do nothing here; stop is a signal to interrupt other message handlers - } + let mut block_check_interval = tokio::time::interval(Duration::from_secs(1)); + + loop { + tokio::select! { + Some(msg) = self.receiver.recv() => { + match &msg { + TxActorMessage::DumpCache { + on_dump_cache: _, + run_id: _, + } => { + tokio::select! { + _ = Self::handle_message(&mut self.cache, &self.db, &self.rpc, + msg, &mut self.auto_flush_enabled, &mut self.auto_flush_run_id, + &mut self.auto_flush_interval_blocks + ) => {}, + Some(TxActorMessage::Stop{on_stop: _}) = self.receiver.recv() => { + // exits early if a stop message is received + break; + }, + }; + } + TxActorMessage::FlushCache { + run_id: _, + on_flush: _, + target_block_num: _, + } => { + tokio::select! { + _ = Self::handle_message(&mut self.cache, &self.db, &self.rpc, + msg, &mut self.auto_flush_enabled, &mut self.auto_flush_run_id, + &mut self.auto_flush_interval_blocks + ) => {}, + Some(TxActorMessage::Stop{on_stop: _}) = self.receiver.recv() => { + // exits early if a stop message is received + break; + }, + }; + } + TxActorMessage::SentRunTx { + tx_hash: _, + start_timestamp_ms: _, + kind: _, + on_receive: _, + error: _, + } => { + Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg, + &mut self.auto_flush_enabled, &mut self.auto_flush_run_id, + &mut self.auto_flush_interval_blocks).await?; + } + TxActorMessage::RemovedRunTx { + tx_hash: _, + on_remove: _, + } => { + Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg, + &mut self.auto_flush_enabled, &mut self.auto_flush_run_id, + &mut self.auto_flush_interval_blocks).await?; + } + TxActorMessage::StartAutoFlush { .. } | TxActorMessage::StopAutoFlush { .. } => { + Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg, + &mut self.auto_flush_enabled, &mut self.auto_flush_run_id, + &mut self.auto_flush_interval_blocks).await?; + } + TxActorMessage::Stop { on_stop: _ } => { + // do nothing here; stop is a signal to interrupt other message handlers + break; + } + } + }, + _ = block_check_interval.tick() => { + if self.auto_flush_enabled && !self.cache.is_empty() { + if let Some(run_id) = self.auto_flush_run_id { + // Check current block number + match self.rpc.get_block_number().await { + Ok(current_block) => { + // Initialize last_flushed_block if this is the first check + if self.last_flushed_block == 0 { + self.last_flushed_block = current_block.saturating_sub(1); + } + + // Flush if enough blocks have passed + if current_block >= self.last_flushed_block + self.auto_flush_interval_blocks { + let target_block = self.last_flushed_block + 1; + + // Only log periodically to avoid spam during failures + if self.consecutive_flush_failures == 0 { + debug!("Auto-flushing cache for block {} (cache size: {})", target_block, self.cache.len()); + } + + // Create a dummy oneshot channel since we don't need the response + let (tx, _rx) = oneshot::channel(); + match Self::flush_cache( + &mut self.cache, + &self.db, + &self.rpc, + run_id, + tx, + target_block, + ).await { + Ok(_) => { + self.last_flushed_block = target_block; + // Reset failure counter on success + if self.consecutive_flush_failures > 0 { + info!("Auto-flush recovered after {} failures", self.consecutive_flush_failures); + self.consecutive_flush_failures = 0; + } + }, + Err(e) => { + self.consecutive_flush_failures += 1; + // Log every 10 failures to avoid spam + if self.consecutive_flush_failures == 1 || self.consecutive_flush_failures % 10 == 0 { + warn!( + "Auto-flush failed (attempt {}): {:?}. Cache size: {}. Will retry.", + self.consecutive_flush_failures, e, self.cache.len() + ); + } + } + } + } + }, + Err(e) => { + if self.consecutive_flush_failures == 0 { + warn!("Failed to get block number for auto-flush: {:?}. Will retry.", e); + } + } + } + } + } + }, } } Ok(()) @@ -423,6 +564,65 @@ impl TxActorHandle { Ok(receiver.await.map_err(CallbackError::OneshotReceive)?) } + /// Starts automatic cache flushing in the background. + /// + /// Once started, the TxActor will automatically flush pending transactions + /// every `flush_interval_blocks` blocks. This allows spam operations to continue + /// uninterrupted while receipts are collected and saved to the database. + /// + /// # Arguments + /// * `run_id` - The database run ID to associate with flushed transactions + /// * `flush_interval_blocks` - Number of blocks between flush operations + /// + /// # Returns + /// Ok(()) if auto-flush was successfully started + /// + /// # Notes + /// - Auto-flush will continue until explicitly stopped via `stop_auto_flush()` + /// - Failures are automatically retried on the next interval + /// - Multiple calls to start_auto_flush will update the configuration + pub async fn start_auto_flush( + &self, + run_id: u64, + flush_interval_blocks: u64, + ) -> Result<()> { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(TxActorMessage::StartAutoFlush { + run_id, + flush_interval_blocks, + on_start: sender, + }) + .await + .map_err(Box::new) + .map_err(CallbackError::from)?; + Ok(receiver.await.map_err(CallbackError::OneshotReceive)?) + } + + /// Stops automatic cache flushing. + /// + /// After calling this, the TxActor will no longer automatically flush + /// pending transactions. Any remaining transactions in the cache can be + /// flushed manually via `flush_cache()` or dumped via `dump_cache()`. + /// + /// # Returns + /// Ok(()) if auto-flush was successfully stopped + /// + /// # Notes + /// - Typically called before final manual flush at end of spam run + /// - Does not affect transactions already in the cache + pub async fn stop_auto_flush(&self) -> Result<()> { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(TxActorMessage::StopAutoFlush { + on_stop_auto: sender, + }) + .await + .map_err(Box::new) + .map_err(CallbackError::from)?; + Ok(receiver.await.map_err(CallbackError::OneshotReceive)?) + } + /// Stops the actor, terminating any pending tasks. pub async fn stop(&self) -> Result<()> { let (sender, receiver) = oneshot::channel(); @@ -434,3 +634,56 @@ impl TxActorHandle { Ok(receiver.await.map_err(CallbackError::OneshotReceive)?) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_pending_run_tx_creation() { + let tx_hash = TxHash::default(); + let timestamp = 1234567890u128; + let kind = Some("test"); + let error = Some("error message"); + + let pending_tx = PendingRunTx::new(tx_hash, timestamp, kind, error); + + assert_eq!(pending_tx.tx_hash, tx_hash); + assert_eq!(pending_tx.start_timestamp_ms, timestamp); + assert_eq!(pending_tx.kind, Some("test".to_string())); + assert_eq!(pending_tx.error, Some("error message".to_string())); + } + + #[test] + fn test_pending_run_tx_no_kind_or_error() { + let tx_hash = TxHash::default(); + let timestamp = 1234567890u128; + + let pending_tx = PendingRunTx::new(tx_hash, timestamp, None, None); + + assert_eq!(pending_tx.tx_hash, tx_hash); + assert_eq!(pending_tx.start_timestamp_ms, timestamp); + assert_eq!(pending_tx.kind, None); + assert_eq!(pending_tx.error, None); + } + + #[test] + fn test_cache_tx_creation() { + let tx_hash = TxHash::default(); + let timestamp = 9876543210u128; + let kind = Some("transfer".to_string()); + let error = None; + + let cache_tx = CacheTx { + tx_hash, + start_timestamp_ms: timestamp, + kind: kind.clone(), + error: error.clone(), + }; + + assert_eq!(cache_tx.tx_hash, tx_hash); + assert_eq!(cache_tx.start_timestamp_ms, timestamp); + assert_eq!(cache_tx.kind, kind); + assert_eq!(cache_tx.error, error); + } +} diff --git a/crates/core/src/test_scenario.rs b/crates/core/src/test_scenario.rs index 093094d7..c22e20c9 100644 --- a/crates/core/src/test_scenario.rs +++ b/crates/core/src/test_scenario.rs @@ -122,6 +122,8 @@ where /// Max num of eth_sendRawTransaction calls per json-rpc batch; 0 disables batching. pub rpc_batch_size: u64, pub num_rpc_batches_sent: u64, + /// Number of blocks between automatic cache flushes during spam runs (default: 5) + pub cache_flush_interval_blocks: u64, } pub struct TestScenarioParams { @@ -319,6 +321,7 @@ where should_sync_nonces: sync_nonces_after_batch, rpc_batch_size, num_rpc_batches_sent: 0, + cache_flush_interval_blocks: 5, }) } diff --git a/docs/cli.md b/docs/cli.md index 81488a73..a0903706 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -28,7 +28,7 @@ contender --help - `--tps ` txs/second (drives agent count) - `--tpb ` txs/block - `-d, --duration ` batches to send before receipt collection -- `-l, --loops [N]` run indefinitely or N times +- `--indefinite` run spam indefinitely until manually stopped - `--report` auto-generate a report after spam - `-e KEY=VALUE` override/insert `[env]` values in a scenario config @@ -71,7 +71,7 @@ contender spam --help - **stress** — a composite scenario mixing several patterns. > Tip: All built‑ins accept the same timing/loop flags as file‑based scenarios: -> `--tps` (per‑second), `--tpb` (per‑block), `-l/--loops`, `-d/--duration`, and env overrides via `-e KEY=VALUE`. +> `--tps` (per‑second), `--tpb` (per‑block), `--indefinite`, `-d/--duration`, and env overrides via `-e KEY=VALUE`. ### Usage examples