From 340342c184567d7326fce80e6332928cede8cce4 Mon Sep 17 00:00:00 2001 From: zeroXbrock <2791467+zeroXbrock@users.noreply.github.com> Date: Thu, 18 Dec 2025 15:17:52 -0800 Subject: [PATCH 1/8] (WIP) implement async cache processing - TxActor needs to be cleaned up; `handle_message` is pretty silly --- crates/core/src/spammer/spammer_trait.rs | 35 +---- crates/core/src/spammer/tx_actor.rs | 180 +++++++++++++++++------ crates/core/src/test_scenario.rs | 17 ++- 3 files changed, 159 insertions(+), 73 deletions(-) diff --git a/crates/core/src/spammer/spammer_trait.rs b/crates/core/src/spammer/spammer_trait.rs index 4503293d..e8574e20 100644 --- a/crates/core/src/spammer/spammer_trait.rs +++ b/crates/core/src/spammer/spammer_trait.rs @@ -9,6 +9,7 @@ use futures::StreamExt; use tracing::{info, warn}; use crate::db::SpamDuration; +use crate::spammer::tx_actor::ActorContext; use crate::spammer::CallbackError; use crate::{ db::DbOps, @@ -77,8 +78,11 @@ where let is_fcu_done = self.context().done_fcu.clone(); let is_sending_done = self.context().done_sending.clone(); let auth_provider = scenario.auth_provider.clone(); - // run loop in background to call fcu when spamming is done + let start_block = scenario.rpc_client.get_block_number().await.map_err(|e| { + CallbackError::ProviderCall(format!("failed to get block number: {e}")) + })?; + // run loop in background to call fcu when spamming is done let fcu_handle: tokio::task::JoinHandle> = tokio::task::spawn(async move { if let Some(auth_client) = &auth_provider { loop { @@ -108,9 +112,6 @@ where let tx_req_chunks = scenario .get_spam_tx_chunks(txs_per_period, num_periods) .await?; - let start_block = scenario.rpc_client.get_block_number().await.map_err(|e| { - CallbackError::ProviderCall(format!("failed to get block number: {e}")) - })?; let mut cursor = self.on_spam(scenario).await?.take(num_periods as usize); if scenario.should_sync_nonces { @@ -121,6 +122,9 @@ where // (as long as each task checks for it) let cancel_token = self.context().do_quit.clone(); + let actor_ctx = ActorContext::new(start_block, run_id); + scenario.tx_actor().init_ctx(actor_ctx).await?; + // run spammer within tokio::select! to allow for graceful shutdown let spam_finished: bool = tokio::select! { _ = tokio::signal::ctrl_c() => { @@ -143,29 +147,6 @@ where .done_sending .store(true, std::sync::atomic::Ordering::SeqCst); - // collect results from cached pending txs - let flush_finished: bool = tokio::select! { - _ = tokio::signal::ctrl_c() => { - warn!("CTRL-C received, stopping result collection..."); - for msg_handle in scenario.msg_handles.values() { - let _ = msg_handle.stop().await; - } - cancel_token.cancel(); - self.context().done_fcu.store(true, std::sync::atomic::Ordering::SeqCst); - false - }, - _ = scenario.flush_tx_cache(start_block, run_id) => { - true - } - }; - if !flush_finished { - warn!("Result collection terminated. Some pending txs may not have been saved to the database."); - } else { - self.context() - .done_fcu - .store(true, std::sync::atomic::Ordering::SeqCst); - } - fcu_handle.await.map_err(CallbackError::Join)??; // clear out unconfirmed txs from the cache diff --git a/crates/core/src/spammer/tx_actor.rs b/crates/core/src/spammer/tx_actor.rs index 91a268bc..e00195d6 100644 --- a/crates/core/src/spammer/tx_actor.rs +++ b/crates/core/src/spammer/tx_actor.rs @@ -12,6 +12,8 @@ use crate::{ }; pub enum TxActorMessage { + InitCtx(ActorContext), + GetCacheLen(oneshot::Sender), SentRunTx { tx_hash: TxHash, start_timestamp_ms: u128, @@ -45,6 +47,7 @@ where db: Arc, cache: Vec, rpc: Arc, + ctx: Option, } #[derive(Debug, Clone, PartialEq)] @@ -71,6 +74,21 @@ impl PendingRunTx { } } +#[derive(Clone, Debug)] +pub struct ActorContext { + pub run_id: u64, + pub target_block: u64, +} + +impl ActorContext { + pub fn new(target_block: u64, run_id: u64) -> Self { + Self { + run_id, + target_block, + } + } +} + impl TxActor where D: DbOps + Send + Sync + 'static, @@ -85,6 +103,7 @@ where db, cache: Vec::new(), rpc, + ctx: None, } } @@ -229,6 +248,15 @@ where message: TxActorMessage, ) -> Result<()> { match message { + TxActorMessage::GetCacheLen(on_len) => { + on_len + .send(cache.len()) + .map_err(|_| CallbackError::OneshotSend(()))?; + } + TxActorMessage::InitCtx(_) => { + // don't do anything here; this message's behavior is defined in `Self::run` + // handle_message can probably be deleted; its contents placed in `Self::run` + } TxActorMessage::Stop { on_stop } => { on_stop.send(()).map_err(|_| CallbackError::Stop)?; return Ok(()); @@ -271,56 +299,97 @@ where Ok(()) } + /// Receive & handle messages. pub async fn run(&mut self) -> Result<()> { - while let Some(msg) = self.receiver.recv().await { - match &msg { - TxActorMessage::DumpCache { - on_dump_cache: _, - run_id: _, - } => { - tokio::select! { - _ = Self::handle_message(&mut self.cache, &self.db, &self.rpc, - msg - ) => {}, - Some(TxActorMessage::Stop{on_stop: _}) = self.receiver.recv() => { - // exits early if a stop message is received - }, - }; - } - TxActorMessage::FlushCache { - run_id: _, - on_flush: _, - target_block_num: _, - } => { - tokio::select! { - _ = Self::handle_message(&mut self.cache, &self.db, &self.rpc, - msg - ) => {}, - Some(TxActorMessage::Stop{on_stop: _}) = self.receiver.recv() => { - // exits early if a stop message is received - }, - }; - } - TxActorMessage::SentRunTx { - tx_hash: _, - start_timestamp_ms: _, - kind: _, - on_receive: _, - error: _, - } => { - Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg).await?; - } - TxActorMessage::RemovedRunTx { - tx_hash: _, - on_remove: _, - } => { - Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg).await?; + let mut interval = + tokio::time::interval(/* self.cfg.poll_interval */ Duration::from_secs(1)); + let mut shutting_down = false; + let provider = self.rpc.clone(); + + loop { + tokio::select! { + // periodically flush cache in background while spammer runs + _ = interval.tick() => { + if let Some(ctx) = self.ctx.as_mut() { + let new_block = provider.get_block_number().await?; + for bn in ctx.target_block..new_block { + let (on_flush, _receiver) = oneshot::channel(); + Self::flush_cache(&mut self.cache, &self.db, &self.rpc, ctx.run_id, on_flush, bn).await?; + } + ctx.target_block = new_block; + } else { + debug!("TxActor context not initialized."); + } } - TxActorMessage::Stop { on_stop: _ } => { - // do nothing here; stop is a signal to interrupt other message handlers + + // handle messages (sent by test_scenario) + msg = self.receiver.recv() => { + if let Some(msg) = msg { + match &msg { + TxActorMessage::GetCacheLen(_) => { + Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg).await?; + } + TxActorMessage::InitCtx(ctx) => { + self.ctx = Some(ctx.clone()); + } + TxActorMessage::DumpCache { + on_dump_cache: _, + run_id: _, + } => { + tokio::select! { + _ = Self::handle_message(&mut self.cache, &self.db, &self.rpc, + msg + ) => {}, + Some(TxActorMessage::Stop{on_stop: _}) = self.receiver.recv() => { + // exits early if a stop message is received + shutting_down = true; + }, + }; + } + TxActorMessage::FlushCache { + run_id: _, + on_flush: _, + target_block_num: _, + } => { + tokio::select! { + _ = Self::handle_message(&mut self.cache, &self.db, &self.rpc, + msg + ) => {}, + Some(TxActorMessage::Stop{on_stop: _}) = self.receiver.recv() => { + // exits early if a stop message is received + shutting_down = true; + }, + }; + } + TxActorMessage::SentRunTx { + tx_hash: _, + start_timestamp_ms: _, + kind: _, + on_receive: _, + error: _, + } => { + Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg).await?; + } + TxActorMessage::RemovedRunTx { + tx_hash: _, + on_remove: _, + } => { + Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg).await?; + } + TxActorMessage::Stop { on_stop: _ } => { + shutting_down = true; + // do nothing here; stop is a signal to interrupt other message handlers + } + } + } } } + if shutting_down { + break; + } } + + // } Ok(()) } } @@ -408,6 +477,17 @@ impl TxActorHandle { Ok(receiver.await.map_err(CallbackError::OneshotReceive)?) } + pub async fn done_flushing(&self) -> Result { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(TxActorMessage::GetCacheLen(sender)) + .await + .map_err(Box::new) + .map_err(CallbackError::from)?; + let cache_len = receiver.await.map_err(CallbackError::from)?; + Ok(cache_len == 0) + } + /// Removes an existing tx in the cache. pub async fn remove_cached_tx(&self, tx_hash: TxHash) -> Result<()> { let (sender, receiver) = oneshot::channel(); @@ -433,4 +513,14 @@ impl TxActorHandle { .map_err(CallbackError::from)?; Ok(receiver.await.map_err(CallbackError::OneshotReceive)?) } + + pub async fn init_ctx(&self, ctx: ActorContext) -> Result<()> { + // let (sender, receiver) = oneshot::channel::<()>(); + self.sender + .send(TxActorMessage::InitCtx(ctx)) + .await + .map_err(Box::new) + .map_err(CallbackError::from)?; + Ok(()) + } } diff --git a/crates/core/src/test_scenario.rs b/crates/core/src/test_scenario.rs index 093094d7..c334f451 100644 --- a/crates/core/src/test_scenario.rs +++ b/crates/core/src/test_scenario.rs @@ -45,7 +45,7 @@ use std::{ pin::Pin, str::FromStr, sync::Arc, - time::Duration, + time::{Duration, Instant}, }; use tokio::sync::OnceCell; use tokio_util::sync::CancellationToken; @@ -332,6 +332,12 @@ where .await } + pub fn tx_actor(&self) -> &TxActorHandle { + self.msg_handles + .get("default") + .expect("default msg_handle uninitialized") + } + // Polls anvil to ensure its initialized and ready to accept RPC requests async fn wait_for_anvil_ready(endpoint_url: &Url, timeout: Duration) -> Result<()> { let start = std::time::Instant::now(); @@ -1464,10 +1470,19 @@ where Ok(()) } + /// Wait for `self.pending_tx_timeout_secs` for pending txs to confirm, then delete any remaining cache items. pub async fn dump_tx_cache(&self, run_id: u64) -> Result<()> { debug!("dumping tx cache..."); + let start_time = Instant::now(); for msg_handle in self.msg_handles.values() { + while !self.tx_actor().done_flushing().await? { + if start_time.elapsed().as_secs() > self.pending_tx_timeout_secs { + warn!("timed out waiting for pending transactions"); + break; + } + tokio::time::sleep(Duration::from_millis(500)).await; + } let failed_txs = msg_handle.dump_cache(run_id).await?; if !failed_txs.is_empty() { warn!( From 533487659d84f0bad738bfaa2b21b3c4923e7bfc Mon Sep 17 00:00:00 2001 From: zeroXbrock <2791467+zeroXbrock@users.noreply.github.com> Date: Thu, 18 Dec 2025 16:01:28 -0800 Subject: [PATCH 2/8] clean up TxActor::{run, handle_message} --- crates/core/src/spammer/tx_actor.rs | 123 +++++++++++----------------- 1 file changed, 47 insertions(+), 76 deletions(-) diff --git a/crates/core/src/spammer/tx_actor.rs b/crates/core/src/spammer/tx_actor.rs index e00195d6..5885ccd3 100644 --- a/crates/core/src/spammer/tx_actor.rs +++ b/crates/core/src/spammer/tx_actor.rs @@ -48,6 +48,7 @@ where cache: Vec, rpc: Arc, ctx: Option, + status: ActorStatus, } #[derive(Debug, Clone, PartialEq)] @@ -89,6 +90,18 @@ impl ActorContext { } } +#[derive(Clone, Debug, PartialEq)] +pub enum ActorStatus { + ShuttingDown, + Running, +} + +impl Default for ActorStatus { + fn default() -> Self { + ActorStatus::Running + } +} + impl TxActor where D: DbOps + Send + Sync + 'static, @@ -104,6 +117,7 @@ where cache: Vec::new(), rpc, ctx: None, + status: ActorStatus::default(), } } @@ -241,25 +255,23 @@ where Ok(()) } - async fn handle_message( - cache: &mut Vec, - db: &Arc, - rpc: &Arc, - message: TxActorMessage, - ) -> Result<()> { + async fn handle_message(&mut self, message: TxActorMessage) -> Result<()> { + if self.status == ActorStatus::ShuttingDown { + return Ok(()); + } + match message { TxActorMessage::GetCacheLen(on_len) => { on_len - .send(cache.len()) + .send(self.cache.len()) .map_err(|_| CallbackError::OneshotSend(()))?; } - TxActorMessage::InitCtx(_) => { - // don't do anything here; this message's behavior is defined in `Self::run` - // handle_message can probably be deleted; its contents placed in `Self::run` + TxActorMessage::InitCtx(ctx) => { + self.ctx = Some(ctx); } TxActorMessage::Stop { on_stop } => { on_stop.send(()).map_err(|_| CallbackError::Stop)?; - return Ok(()); + self.status = ActorStatus::ShuttingDown; } TxActorMessage::SentRunTx { tx_hash, @@ -274,11 +286,11 @@ where kind, error, }; - cache.push(run_tx.to_owned()); + self.cache.push(run_tx.to_owned()); on_receive.send(()).map_err(CallbackError::OneshotSend)?; } TxActorMessage::RemovedRunTx { tx_hash, on_remove } => { - Self::remove_cached_tx(cache, tx_hash).await?; + Self::remove_cached_tx(&mut self.cache, tx_hash).await?; on_remove.send(()).map_err(CallbackError::OneshotSend)?; } TxActorMessage::FlushCache { @@ -286,16 +298,25 @@ where run_id, target_block_num, } => { - Self::flush_cache(cache, db, rpc, run_id, on_flush, target_block_num).await?; + Self::flush_cache( + &mut self.cache, + &self.db, + &self.rpc, + run_id, + on_flush, + target_block_num, + ) + .await?; } TxActorMessage::DumpCache { on_dump_cache, run_id, } => { - let res = Self::dump_cache(cache, db, run_id).await?; + let res = Self::dump_cache(&mut self.cache, &self.db, run_id).await?; on_dump_cache.send(res).map_err(CallbackError::DumpCache)?; } } + Ok(()) } @@ -303,10 +324,13 @@ where pub async fn run(&mut self) -> Result<()> { let mut interval = tokio::time::interval(/* self.cfg.poll_interval */ Duration::from_secs(1)); - let mut shutting_down = false; let provider = self.rpc.clone(); loop { + if self.is_shutting_down() { + break; + } + tokio::select! { // periodically flush cache in background while spammer runs _ = interval.tick() => { @@ -325,75 +349,22 @@ where // handle messages (sent by test_scenario) msg = self.receiver.recv() => { if let Some(msg) = msg { - match &msg { - TxActorMessage::GetCacheLen(_) => { - Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg).await?; - } - TxActorMessage::InitCtx(ctx) => { - self.ctx = Some(ctx.clone()); - } - TxActorMessage::DumpCache { - on_dump_cache: _, - run_id: _, - } => { - tokio::select! { - _ = Self::handle_message(&mut self.cache, &self.db, &self.rpc, - msg - ) => {}, - Some(TxActorMessage::Stop{on_stop: _}) = self.receiver.recv() => { - // exits early if a stop message is received - shutting_down = true; - }, - }; - } - TxActorMessage::FlushCache { - run_id: _, - on_flush: _, - target_block_num: _, - } => { - tokio::select! { - _ = Self::handle_message(&mut self.cache, &self.db, &self.rpc, - msg - ) => {}, - Some(TxActorMessage::Stop{on_stop: _}) = self.receiver.recv() => { - // exits early if a stop message is received - shutting_down = true; - }, - }; - } - TxActorMessage::SentRunTx { - tx_hash: _, - start_timestamp_ms: _, - kind: _, - on_receive: _, - error: _, - } => { - Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg).await?; - } - TxActorMessage::RemovedRunTx { - tx_hash: _, - on_remove: _, - } => { - Self::handle_message(&mut self.cache, &self.db, &self.rpc, msg).await?; - } - TxActorMessage::Stop { on_stop: _ } => { - shutting_down = true; - // do nothing here; stop is a signal to interrupt other message handlers - } - } + self.handle_message(msg).await?; } } } - if shutting_down { - break; - } } - // } Ok(()) } } +impl TxActor { + pub fn is_shutting_down(&self) -> bool { + self.status == ActorStatus::ShuttingDown + } +} + #[derive(Debug)] pub struct TxActorHandle { sender: mpsc::Sender, From d6166def7968a631f9ae5278c87942604fc3819b Mon Sep 17 00:00:00 2001 From: zeroXbrock <2791467+zeroXbrock@users.noreply.github.com> Date: Thu, 18 Dec 2025 18:01:20 -0800 Subject: [PATCH 3/8] cleanup TxActor & related code - removed flush_cache from public TxActor API - tighten cancel_token & stop() logic; remove redundancies/noops - simpler internal APIs for TxActor functions --- crates/cli/src/commands/spamd.rs | 4 +- crates/core/src/spammer/error.rs | 3 ++ crates/core/src/spammer/spammer_trait.rs | 7 +-- crates/core/src/spammer/tx_actor.rs | 69 +++++++----------------- crates/core/src/test_scenario.rs | 66 ++++------------------- 5 files changed, 36 insertions(+), 113 deletions(-) diff --git a/crates/cli/src/commands/spamd.rs b/crates/cli/src/commands/spamd.rs index 6c44136b..9f410a6c 100644 --- a/crates/cli/src/commands/spamd.rs +++ b/crates/cli/src/commands/spamd.rs @@ -12,7 +12,7 @@ use std::{ }, time::Duration, }; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; /// Runs spam in a loop, potentially executing multiple spam runs. /// @@ -63,7 +63,7 @@ pub async fn spamd( do_finish = true; } if do_finish { - info!("Spam loop finished."); + debug!("spamd loop finished"); break; } diff --git a/crates/core/src/spammer/error.rs b/crates/core/src/spammer/error.rs index 35c86951..6329ce77 100644 --- a/crates/core/src/spammer/error.rs +++ b/crates/core/src/spammer/error.rs @@ -29,6 +29,9 @@ pub enum CallbackError { #[error("failed to flush cache. pending txs: {0:?}")] FlushCache(Vec), + #[error("ctx cannot be updated before it's initialized")] + UpdateRequiresCtx, + #[error("failed to send mpsc message: {0}")] TxActorSendMessage(#[from] Box>), diff --git a/crates/core/src/spammer/spammer_trait.rs b/crates/core/src/spammer/spammer_trait.rs index e8574e20..20a5f442 100644 --- a/crates/core/src/spammer/spammer_trait.rs +++ b/crates/core/src/spammer/spammer_trait.rs @@ -6,7 +6,7 @@ use alloy::providers::Provider; use contender_engine_provider::DEFAULT_BLOCK_TIME; use futures::Stream; use futures::StreamExt; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::db::SpamDuration; use crate::spammer::tx_actor::ActorContext; @@ -128,7 +128,7 @@ where // run spammer within tokio::select! to allow for graceful shutdown let spam_finished: bool = tokio::select! { _ = tokio::signal::ctrl_c() => { - warn!("CTRL-C received, stopping spamming..."); + debug!("CTRL-C received, dropping execute_spammer call..."); cancel_token.cancel(); false @@ -153,7 +153,8 @@ where let dump_finished: bool = tokio::select! { _ = tokio::signal::ctrl_c() => { warn!("CTRL-C received, stopping tx cache dump..."); - cancel_token.cancel(); + self.get_msg_handler(scenario.db.clone(), scenario.rpc_client.clone()).stop().await?; + false }, _ = scenario.dump_tx_cache(run_id) => { diff --git a/crates/core/src/spammer/tx_actor.rs b/crates/core/src/spammer/tx_actor.rs index 5885ccd3..7cce617c 100644 --- a/crates/core/src/spammer/tx_actor.rs +++ b/crates/core/src/spammer/tx_actor.rs @@ -11,6 +11,7 @@ use crate::{ Result, }; +#[derive(Debug)] pub enum TxActorMessage { InitCtx(ActorContext), GetCacheLen(oneshot::Sender), @@ -25,11 +26,6 @@ pub enum TxActorMessage { tx_hash: TxHash, on_remove: oneshot::Sender<()>, }, - FlushCache { - run_id: u64, - on_flush: oneshot::Sender>, // returns the number of txs remaining in cache - target_block_num: u64, - }, DumpCache { run_id: u64, on_dump_cache: oneshot::Sender>, @@ -121,17 +117,26 @@ where } } + pub fn update_ctx_target_block(&mut self, target_block_num: u64) -> Result<()> { + if let Some(ctx) = self.ctx.as_mut() { + ctx.target_block = target_block_num; + } else { + return Err(CallbackError::UpdateRequiresCtx.into()); + } + + Ok(()) + } + /// Waits for target block to appear onchain, /// gets block receipts for the target block, /// removes txs that were included in the block from cache, and saves them to the DB. async fn flush_cache( - cache: &mut Vec, - db: &Arc, - rpc: &Arc, + &mut self, run_id: u64, on_flush: oneshot::Sender>, // returns the number of txs remaining in cache target_block_num: u64, ) -> Result<()> { + let Self { cache, rpc, db, .. } = self; info!("unconfirmed txs: {}", cache.len()); if cache.is_empty() { @@ -255,11 +260,8 @@ where Ok(()) } + /// Parse message and execute appropriate methods. async fn handle_message(&mut self, message: TxActorMessage) -> Result<()> { - if self.status == ActorStatus::ShuttingDown { - return Ok(()); - } - match message { TxActorMessage::GetCacheLen(on_len) => { on_len @@ -293,21 +295,6 @@ where Self::remove_cached_tx(&mut self.cache, tx_hash).await?; on_remove.send(()).map_err(CallbackError::OneshotSend)?; } - TxActorMessage::FlushCache { - on_flush, - run_id, - target_block_num, - } => { - Self::flush_cache( - &mut self.cache, - &self.db, - &self.rpc, - run_id, - on_flush, - target_block_num, - ) - .await?; - } TxActorMessage::DumpCache { on_dump_cache, run_id, @@ -334,13 +321,13 @@ where tokio::select! { // periodically flush cache in background while spammer runs _ = interval.tick() => { - if let Some(ctx) = self.ctx.as_mut() { + if let Some(ctx) = self.ctx.to_owned() { let new_block = provider.get_block_number().await?; for bn in ctx.target_block..new_block { let (on_flush, _receiver) = oneshot::channel(); - Self::flush_cache(&mut self.cache, &self.db, &self.rpc, ctx.run_id, on_flush, bn).await?; + self.flush_cache(ctx.run_id, on_flush, bn).await?; } - ctx.target_block = new_block; + self.update_ctx_target_block(new_block)?; } else { debug!("TxActor context not initialized."); } @@ -429,25 +416,6 @@ impl TxActorHandle { Ok(receiver.await.map_err(CallbackError::OneshotReceive)?) } - /// Removes txs included onchain from the cache, saves them to the DB, and returns the number of txs remaining in the cache. - pub async fn flush_cache( - &self, - run_id: u64, - target_block_num: u64, - ) -> Result> { - let (sender, receiver) = oneshot::channel(); - self.sender - .send(TxActorMessage::FlushCache { - run_id, - on_flush: sender, - target_block_num, - }) - .await - .map_err(Box::new) - .map_err(CallbackError::from)?; - Ok(receiver.await.map_err(CallbackError::OneshotReceive)?) - } - pub async fn done_flushing(&self) -> Result { let (sender, receiver) = oneshot::channel(); self.sender @@ -474,7 +442,7 @@ impl TxActorHandle { Ok(receiver.await.map_err(CallbackError::OneshotReceive)?) } - /// Stops the actor, terminating any pending tasks. + /// Stops the actor, terminating all pending tasks. pub async fn stop(&self) -> Result<()> { let (sender, receiver) = oneshot::channel(); self.sender @@ -486,7 +454,6 @@ impl TxActorHandle { } pub async fn init_ctx(&self, ctx: ActorContext) -> Result<()> { - // let (sender, receiver) = oneshot::channel::<()>(); self.sender .send(TxActorMessage::InitCtx(ctx)) .await diff --git a/crates/core/src/test_scenario.rs b/crates/core/src/test_scenario.rs index c334f451..444ad4c5 100644 --- a/crates/core/src/test_scenario.rs +++ b/crates/core/src/test_scenario.rs @@ -1031,10 +1031,12 @@ where .await .unwrap_or_else(|e| trace!("failed to send error signal: {e:?}")); } else { - success_sender - .send(()) - .await - .unwrap_or_else(|e| trace!("failed to send success signal: {e:?}")); + if !cancel_token.is_cancelled() { + success_sender + .send(()) + .await + .unwrap_or_else(|e| trace!("failed to send success signal: {e:?}")); + } } let mut tx_handles = vec![]; @@ -1419,57 +1421,6 @@ where .collect::<_>()) } - pub async fn flush_tx_cache(&self, block_start: u64, run_id: u64) -> Result<()> { - let mut block_counter = 0; - // the number of blocks to check for stalled txs - let block_timeout = ((self.pending_tx_timeout_secs / self.ctx.block_time_secs) + 1) - // must be at least 2 blocks because otherwise we have nothing to compare - .max(2); - let mut cache_size_queue = vec![]; - cache_size_queue.resize(block_timeout as usize, 1); - for msg_handle in self.msg_handles.values() { - loop { - let pending_txs = msg_handle - .flush_cache(run_id, block_start + block_counter as u64) - .await?; - cache_size_queue.rotate_right(1); - cache_size_queue[0] = pending_txs.len(); - - if pending_txs.is_empty() { - break; - } - - let current_timestamp = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .expect("time went backwards") - .as_millis(); - - // remove cached txs if the size hasn't changed for the last N blocks - if cache_size_queue - .iter() - .all(|&size| size == cache_size_queue[0]) - { - warn!( - "Cache size has not changed for the last {block_timeout} blocks. Removing stalled txs...", - ); - - for tx in &pending_txs { - // only remove txs that have been waiting for > T seconds - if current_timestamp - > tx.start_timestamp_ms + (self.pending_tx_timeout_secs as u128 * 1000) - { - msg_handle.remove_cached_tx(tx.tx_hash).await?; - } - } - } - - block_counter += 1; - } - } - - Ok(()) - } - /// Wait for `self.pending_tx_timeout_secs` for pending txs to confirm, then delete any remaining cache items. pub async fn dump_tx_cache(&self, run_id: u64) -> Result<()> { debug!("dumping tx cache..."); @@ -1615,10 +1566,11 @@ async fn handle_tx_outcome<'a, F: SpamCallback + 'static>( } warn!("error from tx {tx_hash}: {msg}"); extra = extra.with_error(msg.to_string()); - } else { + } else if !ctx.cancel_token.is_cancelled() { // success path if let Err(e) = ctx.success_sender.send(()).await { - warn!( + // this error can safely be ignored; it just means the receiver was closed (e.g. by CTRL-C) + debug!( "failed to send success notification for tx {}: {:?}", tx_hash, e ); From 5cf0091f7247be77db5cab3f43e61f51e6e37e31 Mon Sep 17 00:00:00 2001 From: zeroXbrock <2791467+zeroXbrock@users.noreply.github.com> Date: Thu, 18 Dec 2025 18:08:26 -0800 Subject: [PATCH 4/8] more simple internal apis for TxActor --- crates/core/src/spammer/tx_actor.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/crates/core/src/spammer/tx_actor.rs b/crates/core/src/spammer/tx_actor.rs index 7cce617c..1e304276 100644 --- a/crates/core/src/spammer/tx_actor.rs +++ b/crates/core/src/spammer/tx_actor.rs @@ -229,12 +229,9 @@ where } /// Dumps all cached txs into the DB. Does not assign `end_timestamp`, `block_number`, or `gas_used`. - async fn dump_cache( - cache: &mut Vec, - db: &Arc, - run_id: u64, - ) -> Result> { - let run_txs = cache + async fn dump_cache(&mut self, run_id: u64) -> Result> { + let run_txs = self + .cache .iter() .map(|pending_tx| RunTx { tx_hash: pending_tx.tx_hash, @@ -246,17 +243,20 @@ where error: pending_tx.error.to_owned(), }) .collect::>(); - db.insert_run_txs(run_id, &run_txs).map_err(|e| e.into())?; - cache.clear(); + self.db + .insert_run_txs(run_id, &run_txs) + .map_err(|e| e.into())?; + self.cache.clear(); Ok(run_txs) } - async fn remove_cached_tx(cache: &mut Vec, old_tx_hash: TxHash) -> Result<()> { - let old_tx = cache + async fn remove_cached_tx(&mut self, old_tx_hash: TxHash) -> Result<()> { + let old_tx = self + .cache .iter() .position(|tx| tx.tx_hash == old_tx_hash) .ok_or(CallbackError::CacheRemoveTx(old_tx_hash))?; - cache.remove(old_tx); + self.cache.remove(old_tx); Ok(()) } @@ -292,14 +292,14 @@ where on_receive.send(()).map_err(CallbackError::OneshotSend)?; } TxActorMessage::RemovedRunTx { tx_hash, on_remove } => { - Self::remove_cached_tx(&mut self.cache, tx_hash).await?; + self.remove_cached_tx(tx_hash).await?; on_remove.send(()).map_err(CallbackError::OneshotSend)?; } TxActorMessage::DumpCache { on_dump_cache, run_id, } => { - let res = Self::dump_cache(&mut self.cache, &self.db, run_id).await?; + let res = self.dump_cache(run_id).await?; on_dump_cache.send(res).map_err(CallbackError::DumpCache)?; } } From 8271d1c878edec4a792d8afc54a15fbf66b4d4b6 Mon Sep 17 00:00:00 2001 From: zeroXbrock <2791467+zeroXbrock@users.noreply.github.com> Date: Thu, 18 Dec 2025 18:16:28 -0800 Subject: [PATCH 5/8] chore: clippy --- crates/core/src/spammer/tx_actor.rs | 7 ++----- crates/core/src/test_scenario.rs | 12 +++++------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/crates/core/src/spammer/tx_actor.rs b/crates/core/src/spammer/tx_actor.rs index 1e304276..52de0ace 100644 --- a/crates/core/src/spammer/tx_actor.rs +++ b/crates/core/src/spammer/tx_actor.rs @@ -87,16 +87,13 @@ impl ActorContext { } #[derive(Clone, Debug, PartialEq)] +#[derive(Default)] pub enum ActorStatus { ShuttingDown, + #[default] Running, } -impl Default for ActorStatus { - fn default() -> Self { - ActorStatus::Running - } -} impl TxActor where diff --git a/crates/core/src/test_scenario.rs b/crates/core/src/test_scenario.rs index 444ad4c5..2b68cb5d 100644 --- a/crates/core/src/test_scenario.rs +++ b/crates/core/src/test_scenario.rs @@ -1030,13 +1030,11 @@ where .send(e) .await .unwrap_or_else(|e| trace!("failed to send error signal: {e:?}")); - } else { - if !cancel_token.is_cancelled() { - success_sender - .send(()) - .await - .unwrap_or_else(|e| trace!("failed to send success signal: {e:?}")); - } + } else if !cancel_token.is_cancelled() { + success_sender + .send(()) + .await + .unwrap_or_else(|e| trace!("failed to send success signal: {e:?}")); } let mut tx_handles = vec![]; From 2710ddf456bbcb5adca65e3ac30c97ad4b80be9e Mon Sep 17 00:00:00 2001 From: zeroXbrock <2791467+zeroXbrock@users.noreply.github.com> Date: Thu, 18 Dec 2025 18:17:29 -0800 Subject: [PATCH 6/8] chore: fmt --- crates/core/src/spammer/tx_actor.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/core/src/spammer/tx_actor.rs b/crates/core/src/spammer/tx_actor.rs index 52de0ace..4eef257a 100644 --- a/crates/core/src/spammer/tx_actor.rs +++ b/crates/core/src/spammer/tx_actor.rs @@ -86,15 +86,13 @@ impl ActorContext { } } -#[derive(Clone, Debug, PartialEq)] -#[derive(Default)] +#[derive(Clone, Debug, PartialEq, Default)] pub enum ActorStatus { ShuttingDown, #[default] Running, } - impl TxActor where D: DbOps + Send + Sync + 'static, From 003e760d23c1618e3e951bca27ed616f52c1c391 Mon Sep 17 00:00:00 2001 From: zeroXbrock <2791467+zeroXbrock@users.noreply.github.com> Date: Fri, 19 Dec 2025 17:53:46 -0800 Subject: [PATCH 7/8] (WIP) support file paths for create defs in scenario files --- crates/cli/src/default_scenarios/blobs.rs | 7 +- .../src/default_scenarios/custom_contract.rs | 2 +- crates/cli/src/default_scenarios/erc20.rs | 75 +++---- .../eth_functions/command.rs | 11 +- .../cli/src/default_scenarios/fill_block.rs | 11 +- crates/cli/src/default_scenarios/revert.rs | 13 +- crates/cli/src/default_scenarios/storage.rs | 11 +- crates/cli/src/default_scenarios/stress.rs | 11 +- crates/cli/src/default_scenarios/transfers.rs | 7 +- crates/cli/src/default_scenarios/uni_v2.rs | 10 +- crates/cli/src/util/error.rs | 3 + crates/cli/src/util/utils.rs | 7 +- crates/core/src/generator/create_def.rs | 211 +++++++++++++++++- crates/core/src/generator/templater.rs | 13 +- crates/core/src/generator/trait.rs | 10 +- crates/core/src/test_scenario.rs | 9 +- crates/testfile/src/lib.rs | 50 ++--- crates/testfile/src/test_config.rs | 17 ++ 18 files changed, 345 insertions(+), 133 deletions(-) diff --git a/crates/cli/src/default_scenarios/blobs.rs b/crates/cli/src/default_scenarios/blobs.rs index e42b164e..1b4cca0f 100644 --- a/crates/cli/src/default_scenarios/blobs.rs +++ b/crates/cli/src/default_scenarios/blobs.rs @@ -37,11 +37,6 @@ fn blob_txs(blob_data: impl AsRef, recipient: Option) -> Vec contender_testfile::TestConfig { - TestConfig { - env: None, - create: None, - setup: None, - spam: Some(blob_txs(&self.blob_data, self.recipient.to_owned())), - } + TestConfig::new().with_spam(blob_txs(&self.blob_data, self.recipient.to_owned())) } } diff --git a/crates/cli/src/default_scenarios/custom_contract.rs b/crates/cli/src/default_scenarios/custom_contract.rs index d947919b..ec2e4b55 100644 --- a/crates/cli/src/default_scenarios/custom_contract.rs +++ b/crates/cli/src/default_scenarios/custom_contract.rs @@ -417,7 +417,7 @@ impl NameAndArgs { impl ToTestConfig for CustomContractArgs { fn to_testconfig(&self) -> contender_testfile::TestConfig { TestConfig::new() - .with_create(vec![CreateDefinition::new(&self.contract)]) + .with_create(vec![CreateDefinition::new(&self.contract.clone().into())]) .with_setup(self.setup.to_owned()) .with_spam(self.spam.iter().map(SpamRequest::new_tx).collect()) } diff --git a/crates/cli/src/default_scenarios/erc20.rs b/crates/cli/src/default_scenarios/erc20.rs index c9a3d08b..7af486be 100644 --- a/crates/cli/src/default_scenarios/erc20.rs +++ b/crates/cli/src/default_scenarios/erc20.rs @@ -69,48 +69,47 @@ impl ToTestConfig for Erc20Args { .with_args(&[recipient.to_string(), self.fund_amount.to_string()]) }) .collect(); + let spam_steps = vec![SpamRequest::new_tx(&{ + let mut func_def = FunctionCallDefinition::new(token.template_name()) + .with_from_pool("spammers") // Senders from limited pool + .with_signature("transfer(address guy, uint256 wad)") + .with_args(&[ + // Use token_recipient if provided (via --recipient flag), + // otherwise this is a placeholder for fuzzing + self.token_recipient + .as_ref() + .map(|addr| addr.to_string()) + .unwrap_or_else(|| { + "0x0000000000000000000000000000000000000000".to_string() + }), + self.send_amount.to_string(), + ]) + .with_gas_limit(55000); - TestConfig { - env: None, - create: Some(vec![CreateDefinition { - contract: token.to_owned(), + // Only add fuzzing if token_recipient is NOT provided + if self.token_recipient.is_none() { + func_def = func_def.with_fuzz(&[FuzzParam { + param: Some("guy".to_string()), + value: None, + min: Some(U256::from(1)), + max: Some( + U256::from_str("0x0000000000ffffffffffffffffffffffffffffffff").unwrap(), + ), + }]); + } + + func_def + })]; + + TestConfig::new() + .with_create(vec![CreateDefinition { + contract: token.to_owned().into(), signature: None, args: None, from: None, from_pool: Some("admin".to_owned()), - }]), - setup: Some(setup_steps), - spam: Some(vec![SpamRequest::new_tx(&{ - let mut func_def = FunctionCallDefinition::new(token.template_name()) - .with_from_pool("spammers") // Senders from limited pool - .with_signature("transfer(address guy, uint256 wad)") - .with_args(&[ - // Use token_recipient if provided (via --recipient flag), - // otherwise this is a placeholder for fuzzing - self.token_recipient - .as_ref() - .map(|addr| addr.to_string()) - .unwrap_or_else(|| { - "0x0000000000000000000000000000000000000000".to_string() - }), - self.send_amount.to_string(), - ]) - .with_gas_limit(55000); - - // Only add fuzzing if token_recipient is NOT provided - if self.token_recipient.is_none() { - func_def = func_def.with_fuzz(&[FuzzParam { - param: Some("guy".to_string()), - value: None, - min: Some(U256::from(1)), - max: Some( - U256::from_str("0x0000000000ffffffffffffffffffffffffffffffff").unwrap(), - ), - }]); - } - - func_def - })]), - } + }]) + .with_setup(setup_steps) + .with_spam(spam_steps) } } diff --git a/crates/cli/src/default_scenarios/eth_functions/command.rs b/crates/cli/src/default_scenarios/eth_functions/command.rs index 5cac9660..63e03ee3 100644 --- a/crates/cli/src/default_scenarios/eth_functions/command.rs +++ b/crates/cli/src/default_scenarios/eth_functions/command.rs @@ -73,17 +73,14 @@ impl ToTestConfig for EthFunctionsArgs { let opcode_txs = opcode_txs(opcodes, *num_iterations); let txs = [precompile_txs, opcode_txs].concat(); - TestConfig { - env: None, - create: Some(vec![CreateDefinition { + TestConfig::new() + .with_create(vec![CreateDefinition { contract: contracts::SPAM_ME.into(), signature: None, args: None, from: None, from_pool: Some("admin".to_owned()), - }]), - setup: None, - spam: Some(txs), - } + }]) + .with_spam(txs) } } diff --git a/crates/cli/src/default_scenarios/fill_block.rs b/crates/cli/src/default_scenarios/fill_block.rs index b5173e52..ed405f25 100644 --- a/crates/cli/src/default_scenarios/fill_block.rs +++ b/crates/cli/src/default_scenarios/fill_block.rs @@ -81,17 +81,14 @@ impl ToTestConfig for FillBlockArgs { }) .collect::>(); - TestConfig { - env: None, - create: Some(vec![CreateDefinition { + TestConfig::new() + .with_create(vec![CreateDefinition { contract: contracts::SPAM_ME.into(), signature: None, args: None, from: None, from_pool: Some("admin".to_owned()), - }]), - setup: None, - spam: Some(spam_txs), - } + }]) + .with_spam(spam_txs) } } diff --git a/crates/cli/src/default_scenarios/revert.rs b/crates/cli/src/default_scenarios/revert.rs index 13e1e00f..511f015f 100644 --- a/crates/cli/src/default_scenarios/revert.rs +++ b/crates/cli/src/default_scenarios/revert.rs @@ -23,22 +23,19 @@ impl Default for RevertCliArgs { impl ToTestConfig for RevertCliArgs { fn to_testconfig(&self) -> contender_testfile::TestConfig { - TestConfig { - env: None, - create: Some(vec![CreateDefinition { + TestConfig::new() + .with_create(vec![CreateDefinition { contract: SPAM_ME_6.into(), signature: None, args: None, from: None, from_pool: Some("admin".to_owned()), - }]), - setup: None, - spam: Some(vec![SpamRequest::new_tx( + }]) + .with_spam(vec![SpamRequest::new_tx( &FunctionCallDefinition::new(SPAM_ME_6.template_name()) .with_signature("consumeGasAndRevert(uint256 gas)") .with_args(&[self.gas_use.to_string()]) .with_gas_limit(self.gas_use + 35000), - )]), - } + )]) } } diff --git a/crates/cli/src/default_scenarios/storage.rs b/crates/cli/src/default_scenarios/storage.rs index 3d3f5f69..d57ba8af 100644 --- a/crates/cli/src/default_scenarios/storage.rs +++ b/crates/cli/src/default_scenarios/storage.rs @@ -62,17 +62,14 @@ impl ToTestConfig for StorageStressArgs { .map(SpamRequest::Tx) .collect::>(); - TestConfig { - env: None, - create: Some(vec![CreateDefinition { + TestConfig::new() + .with_create(vec![CreateDefinition { contract: contracts::SPAM_ME.into(), signature: None, args: None, from: None, from_pool: Some("admin".to_owned()), - }]), - setup: None, - spam: Some(txs), - } + }]) + .with_spam(txs) } } diff --git a/crates/cli/src/default_scenarios/stress.rs b/crates/cli/src/default_scenarios/stress.rs index 726e02af..5b18f9a0 100644 --- a/crates/cli/src/default_scenarios/stress.rs +++ b/crates/cli/src/default_scenarios/stress.rs @@ -206,17 +206,14 @@ impl ToTestConfig for StressCliArgs { .flat_map(|config| config.spam.unwrap_or_default()) .collect::>(); - TestConfig { - env: None, - create: Some(vec![CreateDefinition { + TestConfig::new() + .with_create(vec![CreateDefinition { contract: contracts::SPAM_ME.into(), signature: None, args: None, from: None, from_pool: Some("admin".to_owned()), - }]), - setup: None, - spam: Some(txs), - } + }]) + .with_spam(txs) } } diff --git a/crates/cli/src/default_scenarios/transfers.rs b/crates/cli/src/default_scenarios/transfers.rs index 8dd5b3e9..56e9d56f 100644 --- a/crates/cli/src/default_scenarios/transfers.rs +++ b/crates/cli/src/default_scenarios/transfers.rs @@ -59,11 +59,6 @@ impl ToTestConfig for TransferStressArgs { .map(Box::new) .map(SpamRequest::Tx) .collect::>(); - contender_testfile::TestConfig { - env: None, - create: None, - setup: None, - spam: Some(txs), - } + contender_testfile::TestConfig::new().with_spam(txs) } } diff --git a/crates/cli/src/default_scenarios/uni_v2.rs b/crates/cli/src/default_scenarios/uni_v2.rs index 2dc3c1ef..ea39ae00 100644 --- a/crates/cli/src/default_scenarios/uni_v2.rs +++ b/crates/cli/src/default_scenarios/uni_v2.rs @@ -135,7 +135,7 @@ impl ToTestConfig for UniV2Args { let mut add_create_steps = vec![]; for i in 0..self.num_tokens { let deployment = CreateDefinition { - contract: test_token(i + 1, self.initial_token_supply), + contract: test_token(i + 1, self.initial_token_supply).into(), signature: None, args: None, from: None, @@ -147,14 +147,18 @@ impl ToTestConfig for UniV2Args { // now that contract updates are done, we can update the config & use contracts in setup steps config.create = Some(create_steps.to_owned()); - let find_contract = |name: &str| { + let find_contract = |name: &str| -> Result { let contract = create_steps .iter() .find(|c| c.contract.name == name) .ok_or(UniV2Error::ContractNameNotFound(name.to_owned()))? .contract .to_owned(); - Ok::<_, UniV2Error>(contract) + Ok::<_, UniV2Error>( + contract + .to_compiled_contract(Default::default()) + .expect("contract is static, this can't fail"), + ) }; let weth = find_contract("weth").expect("contract"); diff --git a/crates/cli/src/util/error.rs b/crates/cli/src/util/error.rs index f8f5d751..9b00b8b6 100644 --- a/crates/cli/src/util/error.rs +++ b/crates/cli/src/util/error.rs @@ -13,6 +13,9 @@ pub enum UtilError { #[error("io error")] Io(#[from] std::io::Error), + #[error("invalid scenario path: {0}")] + InvalidScenarioPath(String), + #[error("failed to parse duration")] ParseDuration(#[from] ParseDurationError), diff --git a/crates/cli/src/util/utils.rs b/crates/cli/src/util/utils.rs index 2070b572..fa2d1d66 100644 --- a/crates/cli/src/util/utils.rs +++ b/crates/cli/src/util/utils.rs @@ -23,6 +23,7 @@ use contender_engine_provider::{ControlChain, DEFAULT_BLOCK_TIME}; use contender_testfile::TestConfig; use nu_ansi_term::{AnsiGenericString, Color, Style as ANSIStyle}; use rand::Rng; +use std::path::PathBuf; use std::{str::FromStr, sync::Arc, time::Duration}; use tracing::{debug, info, warn}; @@ -65,7 +66,11 @@ pub async fn load_testconfig(testfile: &str) -> Result = String> { pub name: S, } +pub enum ContractFileType { + Json { + /// Relative file path to the file containing bytecode. + path: PathBuf, + /// Determines where in the json the bytecode is located. Example: `.bytecode.object` (for forge builds) + bytecode_filter: String, + }, + Hex { + /// Relative file path to the file containing bytecode. + path: PathBuf, + }, +} + +struct MiniJq { + /// JSON file contents. + input: serde_json::Value, +} + +fn extract_string( + json: &serde_json::Value, + filter: &[String], +) -> Result> { + if filter.len() == 0 { + // desired element is in `json` + let contents: String = json.to_owned().to_string(); + return Ok(contents); + } + + let key = &filter[0]; + let val = json.get(key); + if let Some(val) = val { + // recurse w/ first element of filter removed + let new_filter = &filter[1..]; + return extract_string(val, new_filter); + } else { + return Err("invalid filter, key not found".into()); + } +} + +impl MiniJq { + pub fn new(input: &str) -> Result { + Ok(Self { + input: serde_json::from_str(input)?, + }) + } + + fn path(&self, filter: &str) -> Vec { + filter.split('.').map(|x| x.to_owned()).collect::>() + } + + pub fn value(&self, filter: &str) -> Result> { + extract_string(&self.input, &self.path(filter)) + } +} + +impl ContractFileType { + fn read_file( + scenario_path: &PathBuf, + relative_path: &PathBuf, + ) -> Result { + std::fs::read_to_string(scenario_path.join(relative_path)) + .map_err(|e| ContractError::ReadFile(e, relative_path.to_owned())) + } + + pub fn bytecode(&self, scenario_path: &PathBuf) -> Result { + use ContractFileType::*; + + match self { + Json { + path, + bytecode_filter, + } => { + // read file + let file_contents = Self::read_file(scenario_path, path)?; + // extract bytecode string from json + MiniJq::new(&file_contents) + .map_err(ContractError::JsonParse)? + .value(&bytecode_filter) + .map_err(|_| ContractError::InvalidJsonFilter(bytecode_filter.to_owned())) + } + Hex { path } => Self::read_file(scenario_path, path), + } + } +} + +#[derive(Debug, Error)] +pub enum ContractError { + #[error("invalid JSON")] + JsonParse(#[from] serde_json::Error), + + #[error("invalid contract bytecode configuration: {0}")] + InvalidConfig(&'static str), + + #[error("failed to read file at {1}: {0}")] + ReadFile(std::io::Error, PathBuf), + + #[error("'filter' containing location of raw bytecode is required for json files (example: \".bytecode.object\")")] + FilterRequiredForJson, + + #[error("invalid json filter: \"{0}\"")] + InvalidJsonFilter(String), + + #[error("unsupported file type, .hex and .json are supported")] + UnsupportedType, +} + +#[derive(Clone, Deserialize, Debug, Serialize)] +/// File spec for contracts. +pub struct ContractFile { + path: String, + filter: Option, +} + +impl ContractFile { + pub fn identify(&self) -> Result { + use ContractError::*; + let path = self.path.to_lowercase(); + if path.ends_with(".hex") { + return Ok(ContractFileType::Hex { path: path.into() }); + } + if path.ends_with(".json") { + if let Some(filter) = self.filter.clone() { + return Ok(ContractFileType::Json { + path: path.into(), + bytecode_filter: filter, + }); + } + return Err(FilterRequiredForJson); + } + return Err(UnsupportedType); + } +} + +#[derive(Clone, Deserialize, Debug, Serialize)] +/// Specification for config files to specify raw bytecode or a file containing raw bytecode. +/// ((Open to changes on the name)) +pub struct CompiledContractOrFile = String> { + pub bytecode: Option, + pub bytecode_file: Option, + pub name: S, +} + +impl> CompiledContractOrFile { + fn source(&self) -> Result { + if self.bytecode.is_some() && self.bytecode_file.is_some() { + return Err(ContractError::InvalidConfig( + "cannot specify both 'bytecode' & 'file'", + )); + } + if let Some(bytecode) = &self.bytecode { + return Ok(ContractSource::Bytecode(bytecode.as_ref().to_string())); + } + if let Some(file) = &self.bytecode_file { + return Ok(ContractSource::File(file.identify()?)); + } + return Err(ContractError::InvalidConfig( + "must specify 'bytecode' or 'file' in contract creation", + )); + } +} + +enum ContractSource { + Bytecode(String), + File(ContractFileType), +} + +impl CompiledContractOrFile { + pub fn to_compiled_contract( + &self, + scenario_path: PathBuf, + ) -> Result { + Ok(CompiledContract { + bytecode: match self.source()? { + ContractSource::Bytecode(bytecode) => bytecode, + ContractSource::File(file) => file.bytecode(&scenario_path).map(|s| s)?, + }, + name: self.name.clone(), + }) + } +} + impl> CompiledContract { pub fn new(bytecode: T, name: T) -> Self { CompiledContract { bytecode, name } @@ -30,6 +213,26 @@ impl<'a> From> for CompiledContract { } } +impl<'a> From> for CompiledContractOrFile { + fn from(value: CompiledContract<&'a str>) -> Self { + Self { + bytecode: Some(value.bytecode.to_owned()), + bytecode_file: None, + name: value.name.to_owned(), + } + } +} + +impl + Clone> From> for CompiledContractOrFile { + fn from(value: CompiledContract) -> Self { + Self { + bytecode: Some(value.bytecode.clone()), + bytecode_file: None, + name: value.name.clone(), + } + } +} + impl CompiledContract { pub fn with_constructor_args( mut self, @@ -61,10 +264,14 @@ impl CompiledContract { } } +pub struct CreateDefinitionUnresolved { + pub contract: CompiledContractOrFile, +} + #[derive(Clone, Deserialize, Debug, Serialize)] pub struct CreateDefinition { #[serde(flatten)] - pub contract: CompiledContract, + pub contract: CompiledContractOrFile, /// Constructor signature. Formats supported: "constructor(type1,type2,...)" or "(type1,type2,...)". pub signature: Option, /// Constructor arguments. May include placeholders. @@ -76,7 +283,7 @@ pub struct CreateDefinition { } impl CreateDefinition { - pub fn new(contract: &CompiledContract) -> Self { + pub fn new(contract: &CompiledContractOrFile) -> Self { CreateDefinition { contract: contract.to_owned(), signature: None, diff --git a/crates/core/src/generator/templater.rs b/crates/core/src/generator/templater.rs index adba5956..6e3aba3f 100644 --- a/crates/core/src/generator/templater.rs +++ b/crates/core/src/generator/templater.rs @@ -4,7 +4,7 @@ use crate::{ constants::{SENDER_KEY, SETCODE_KEY}, function_def::{FunctionCallDefinition, FunctionCallDefinitionStrict}, util::{encode_calldata, UtilError}, - CreateDefinition, + ContractError, CreateDefinition, }, }; use alloy::{ @@ -12,7 +12,7 @@ use alloy::{ primitives::{Address, Bytes, FixedBytes, TxKind, U256}, rpc::types::TransactionRequest, }; -use std::collections::HashMap; +use std::{collections::HashMap, path::PathBuf}; use thiserror::Error; use super::CreateDefinitionStrict; @@ -21,6 +21,9 @@ pub type Result = std::result::Result; #[derive(Debug, Error)] pub enum TemplaterError { + #[error("error from contract creation")] + CreateError(#[from] ContractError), + #[error("failed to find placeholder key '{0}'")] KeyNotFound(String), @@ -48,6 +51,7 @@ where fn copy_end(&self, input: &str, last_end: usize) -> String; fn num_placeholders(&self, input: &str) -> usize; fn find_key(&self, input: &str) -> Option<(K, usize)>; + fn scenario_parent_directory(&self) -> PathBuf; /// Looks for {placeholders} in `arg` and updates `env` with the values found by querying the DB. fn find_placeholder_values( @@ -145,8 +149,11 @@ where self.find_placeholder_values(from, placeholder_map, db, rpc_url, genesis_hash)?; } // also scan bytecode for placeholders + let compiled_contract = createdef + .contract + .to_compiled_contract(self.scenario_parent_directory())?; self.find_placeholder_values( - &createdef.contract.bytecode, + &compiled_contract.bytecode, placeholder_map, db, rpc_url, diff --git a/crates/core/src/generator/trait.rs b/crates/core/src/generator/trait.rs index 1ee2e2cd..14e64816 100644 --- a/crates/core/src/generator/trait.rs +++ b/crates/core/src/generator/trait.rs @@ -7,7 +7,7 @@ use crate::{ function_def::{FunctionCallDefinition, FunctionCallDefinitionStrict, FuzzParam}, named_txs::{ExecutionRequest, NamedTxRequest, NamedTxRequestBuilder}, seeder::{SeedValue, Seeder}, - templater::Templater, + templater::{Templater, TemplaterError}, types::{AnyProvider, AsyncCallbackResult, PlanType, SpamRequest}, util::{parse_value, UtilError}, CreateDefinition, CreateDefinitionStrict, RandSeed, @@ -185,13 +185,17 @@ where // handle direct variable injection // (backwards-compatible for bytecode defs that include placeholders, // rather than using `args` + `signature` in the `CreateDefinition`) - let bytecode = create_def.contract.bytecode.to_owned().replace( + let compiled_contract = create_def + .contract + .to_compiled_contract(self.get_templater().scenario_parent_directory()) + .map_err(|e| TemplaterError::CreateError(e))?; + let bytecode = compiled_contract.bytecode.to_owned().replace( "{_sender}", &format!("{}{}", "0".repeat(24), from_address.encode_hex()), ); // inject address WITHOUT 0x prefix, padded with 24 zeroes Ok(CreateDefinitionStrict { - name: create_def.contract.name.to_owned(), + name: compiled_contract.name.to_owned(), bytecode, from: from_address, signature: create_def.signature.to_owned(), diff --git a/crates/core/src/test_scenario.rs b/crates/core/src/test_scenario.rs index 2b68cb5d..09b92fb6 100644 --- a/crates/core/src/test_scenario.rs +++ b/crates/core/src/test_scenario.rs @@ -1754,7 +1754,8 @@ pub mod tests { contract: CompiledContract { bytecode: COUNTER_BYTECODE.to_string(), name: "test_counter2".to_string(), - }, + } + .into(), signature: None, args: None, from: None, @@ -1764,7 +1765,8 @@ pub mod tests { contract: CompiledContract { bytecode: UNI_V2_FACTORY_BYTECODE.to_string(), name: "univ2_factory".to_string(), - }, + } + .into(), signature: None, args: None, from: None, @@ -1868,6 +1870,9 @@ pub mod tests { } impl Templater for MockConfig { + fn scenario_parent_directory(&self) -> std::path::PathBuf { + Default::default() + } fn copy_end(&self, input: &str, _last_end: usize) -> String { input.to_owned() } diff --git a/crates/testfile/src/lib.rs b/crates/testfile/src/lib.rs index b36dcac5..40251805 100644 --- a/crates/testfile/src/lib.rs +++ b/crates/testfile/src/lib.rs @@ -69,12 +69,7 @@ pub mod tests { "0xdead".to_owned(), ]); - TestConfig { - env: None, - create: None, - setup: None, - spam: vec![SpamRequest::Tx(Box::new(fncall))].into(), - } + TestConfig::new().with_spam(vec![SpamRequest::Tx(Box::new(fncall))].into()) } pub fn get_fuzzy_testconfig() -> TestConfig { @@ -95,11 +90,8 @@ pub mod tests { max: None, }]) }; - TestConfig { - env: None, - create: None, - setup: None, - spam: vec![ + TestConfig::new().with_spam( + vec![ SpamRequest::Tx(Box::new(fn_call( "0xbeef", "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", @@ -121,15 +113,12 @@ pub mod tests { }), ] .into(), - } + ) } pub fn get_setup_testconfig() -> TestConfig { - TestConfig { - env: None, - create: None, - spam: None, - setup: vec![ + TestConfig::new().with_setup( + vec![ FunctionCallDefinition::new("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D") .with_from("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266") .with_value(U256::from(4096)) @@ -142,40 +131,37 @@ pub mod tests { .with_args(&["1", "2", &Address::repeat_byte(0x11).encode_hex(), "0xbeef"]), ] .into(), - } + ) } pub fn get_create_testconfig() -> TestConfig { let mut env = HashMap::new(); env.insert("test1".to_owned(), "0xbeef".to_owned()); env.insert("test2".to_owned(), "0x9001".to_owned()); - TestConfig { - env: Some(env), - create: Some(vec![CreateDefinition { + TestConfig::new() + .with_create(vec![CreateDefinition { contract: CompiledContract::new( COUNTER_BYTECODE.to_string(), "test_counter".to_string(), - ), + ) + .into(), signature: None, args: None, from: Some("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266".to_owned()), from_pool: None, - }]), - spam: None, - setup: None, - } + }]) + .with_env(env) } pub fn get_composite_testconfig() -> TestConfig { let tc_fuzz = get_fuzzy_testconfig(); let tc_setup = get_setup_testconfig(); let tc_create = get_create_testconfig(); - TestConfig { - env: tc_create.env, // TODO: add something here - create: tc_create.create, - spam: tc_fuzz.spam, - setup: tc_setup.setup, - } + TestConfig::new() + .with_create(tc_create.create.unwrap()) + .with_env(tc_create.env.unwrap()) + .with_setup(tc_setup.setup.unwrap()) + .with_spam(tc_fuzz.spam.unwrap()) } #[tokio::test] diff --git a/crates/testfile/src/test_config.rs b/crates/testfile/src/test_config.rs index 238e1d4d..b8e1b695 100644 --- a/crates/testfile/src/test_config.rs +++ b/crates/testfile/src/test_config.rs @@ -6,6 +6,7 @@ use contender_core::generator::{ FunctionCallDefinition, PlanConfig, }; use serde::{Deserialize, Serialize}; +use std::path::PathBuf; use std::{collections::HashMap, str::FromStr}; use std::{fs::read, ops::Deref}; @@ -24,6 +25,8 @@ pub struct TestConfig { /// Function to call in spam txs. pub spam: Option>, + + _scenario_directory: Option, } impl TestConfig { @@ -33,9 +36,15 @@ impl TestConfig { create: None, setup: None, spam: None, + _scenario_directory: None, } } + pub fn with_scenario_directory(mut self, dir: PathBuf) -> Self { + self._scenario_directory = Some(dir); + self + } + pub async fn from_remote_url(url: &str) -> Result { let file_contents = reqwest::get(url).await?.text().await?; let test_file: TestConfig = toml::from_str(&file_contents)?; @@ -220,6 +229,14 @@ impl PlanConfig for TestConfig { } impl Templater for TestConfig { + fn scenario_parent_directory(&self) -> std::path::PathBuf { + if let Some(dir) = &self._scenario_directory { + dir.to_owned() + } else { + Default::default() + } + } + /// Find values wrapped in brackets in a string and replace them with values from a hashmap whose key match the value in the brackets. /// example: "hello {world}" with hashmap {"world": "earth"} will return "hello earth" fn replace_placeholders(&self, input: &str, template_map: &HashMap) -> String { From 24fcbc850fd7abacb7fd368d3d272c63c65064b5 Mon Sep 17 00:00:00 2001 From: zeroXbrock <2791467+zeroXbrock@users.noreply.github.com> Date: Wed, 24 Dec 2025 17:02:46 -0800 Subject: [PATCH 8/8] fix merge --- crates/core/src/spammer/tx_actor.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/crates/core/src/spammer/tx_actor.rs b/crates/core/src/spammer/tx_actor.rs index f77bdf47..ea8dabf8 100644 --- a/crates/core/src/spammer/tx_actor.rs +++ b/crates/core/src/spammer/tx_actor.rs @@ -349,12 +349,6 @@ where } } -impl TxActor { - pub fn is_shutting_down(&self) -> bool { - self.status == ActorStatus::ShuttingDown - } -} - #[derive(Debug)] pub struct TxActorHandle { sender: mpsc::Sender,