Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,10 @@ cargotest.toml
!test_fixtures/*.db
report.csv


.DS_Store

# Local Foundry binaries
foundry_bin
foundry
foundry.zip
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ ark-ff = "0.5.0"

## sqlite
r2d2_sqlite = "0.25.0"
rusqlite = "0.32.1"
rusqlite = { version = "0.32.1", features = ["bundled"] }
r2d2 = "0.8.10"

## testfile
Expand Down
21 changes: 21 additions & 0 deletions crates/core/src/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,27 @@ impl DbOps for MockDb {
)))
}

fn get_named_txs(
&self,
_name: &str,
_rpc_url: &str,
_genesis_hash: FixedBytes<32>,
) -> Result<Vec<NamedTx>, Self::Error> {
Ok(vec![])
}

fn get_setup_progress(&self, _scenario_hash: &str) -> Result<Option<u64>, Self::Error> {
Ok(None)
}

fn update_setup_progress(
&self,
_scenario_hash: &str,
_step_index: u64,
) -> Result<(), Self::Error> {
Ok(())
}

fn get_named_tx_by_address(&self, address: &Address) -> Result<Option<NamedTx>, Self::Error> {
Ok(Some(NamedTx::new(
String::default(),
Expand Down
15 changes: 15 additions & 0 deletions crates/core/src/db/trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@ pub trait DbOps {
genesis_hash: FixedBytes<32>,
) -> Result<Option<NamedTx>, Self::Error>;

fn get_named_txs(
&self,
name: &str,
rpc_url: &str,
genesis_hash: FixedBytes<32>,
) -> Result<Vec<NamedTx>, Self::Error>;

fn get_setup_progress(&self, scenario_hash: &str) -> Result<Option<u64>, Self::Error>;

fn update_setup_progress(
&self,
scenario_hash: &str,
step_index: u64,
) -> Result<(), Self::Error>;

fn get_named_tx_by_address(&self, address: &Address) -> Result<Option<NamedTx>, Self::Error>;

fn get_run(&self, run_id: u64) -> Result<Option<SpamRun>, Self::Error>;
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ pub enum RuntimeErrorKind {

#[error("invalid runtime params")]
InvalidParams(#[from] RuntimeParamErrorKind),

#[error("setup step {0} failed")]
SetupStepFailed(u64),
}

impl From<alloy::node_bindings::NodeError> for Error {
Expand Down
82 changes: 82 additions & 0 deletions crates/core/src/test_scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,9 @@ where
// we have to do this to populate the database with new named transaction after each deployment
let redeploy = self.redeploy;
let genesis_hash = self.ctx.genesis_hash;

let name_counts = Arc::new(std::sync::Mutex::new(HashMap::<String, usize>::new()));

self.load_txs(PlanType::Create(|tx_req| {
let from =
tx_req
Expand All @@ -502,6 +505,39 @@ where
"deploying contract: {:?}",
tx_req.name.as_ref().unwrap_or(&"".to_string())
);

// Resumable logic: check if contract already exists
if let Some(name) = &tx_req.name {
let mut counts = name_counts.lock().expect("mutex poisoned");
let count = counts.entry(name.clone()).or_insert(0);
let current_index = *count;
*count += 1;
drop(counts); // release lock

// Check DB for existing deployments with this name
let existing_txs = self
.db
.get_named_txs(name, self.rpc_url.as_str(), genesis_hash)
.map_err(|e| crate::error::Error::Db(e.into()))?;
if current_index < existing_txs.len() && !redeploy {
// Check if the specific instance exists and has code
let existing = &existing_txs[current_index];
if let Some(addr) = existing.address {
// We can't easily check code here synchronously in the closure without blocking async runtime
// But since we have the record in DB, we assume it was successful.
// To be extra safe, we could check code in a spawned task, but that complicates "skipping"
// passed to the caller.
// For now, trust the DB record.
info!(
contract = %name,
address = %addr,
"skipping deploy; contract already exists in DB"
);
return Ok(None);
}
}
}

let rpc_url = self.rpc_url.to_owned();
let tx_type = self.tx_type;
let wallet = self
Expand Down Expand Up @@ -551,7 +587,13 @@ where
.network::<AnyNetwork>()
.connect_http(rpc_url.to_owned());

// Note: The outer loop handles skipping if the count matches.
// This inner check is redundant if we trust the loop logic, but kept for `redeploy` check safety
// if we decide to run it anyway.
if let Some(name) = tx_req.name.as_ref() {
// We might be redeploying, so we proceed.
// If we are here, it means either it's not in DB (new) or redeploy=true.
// However, let's double check code existence if we are here (e.g. if we forced run).
if let Some(existing) = db
.get_named_tx(name, rpc_url.as_str(), genesis_hash)
.map_err(|e| e.into())?
Expand Down Expand Up @@ -625,7 +667,28 @@ where
pub async fn run_setup(&mut self) -> Result<()> {
let chain_id = self.chain_id;
let genesis_hash = self.ctx.genesis_hash;

let setup_steps = self.config.get_setup_steps().unwrap_or_default();
let scenario_hash = format!("{:x}", keccak256(format!("{:?}", setup_steps)));
let progress = self
.db
.get_setup_progress(&scenario_hash)
.map_err(|e| crate::error::Error::Db(e.into()))?;
let last_step_index = progress.map(|p| p as isize).unwrap_or(-1);

let current_step_idx = Arc::new(std::sync::Mutex::new(0isize));

self.load_txs(PlanType::Setup(|tx_req| {
let mut idx_lock = current_step_idx.lock().expect("mutex poisoned");
let idx = *idx_lock;
*idx_lock += 1;
drop(idx_lock);

if idx <= last_step_index {
info!("skipping setup step {} (already completed)", idx);
return Ok(None);
}

/* callback */
info!("{}", self.format_setup_log(&tx_req));

Expand All @@ -646,6 +709,7 @@ where
let db = self.db.clone();
let rpc_url = self.rpc_url.clone();
let tx_type = self.tx_type;
let scenario_hash_clone = scenario_hash.clone();

let handle = tokio::task::spawn(async move {
let wallet = ProviderBuilder::new()
Expand All @@ -671,6 +735,10 @@ where
warn!(
"failed to estimate gas for setup step '{tx_label}', skipping step..."
);
// If we skip due to failure, do we mark as done?
// Probably no, but the user requirement says "If a step fails, stop execution but preserve progress."
// Here we are skipping inside the task. If we return Ok, it won't crash.
// But we won't update progress for this step.
return Ok(());
}
};
Expand Down Expand Up @@ -714,6 +782,20 @@ where
.map_err(|e| e.into())?;
}

// Update progress after successful step
if receipt.status() {
db.update_setup_progress(&scenario_hash_clone, idx as u64)
.map_err(|e| crate::error::Error::Db(e.into()))?;
} else {
warn!(
"Setup step {} failed (reverted). Progress not updated.",
idx
);
// Should we error out? The requirement says "If a step fails, stop execution"
// load_txs awaits handles. If this task returns Err, load_txs returns Err.
return Err(RuntimeErrorKind::SetupStepFailed(idx as u64).into());
}

Ok(())
});
Ok(Some(handle))
Expand Down
89 changes: 89 additions & 0 deletions crates/sqlite_db/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ impl DbOps for SqliteDb {
gas_per_second INTEGER NOT NULL,
gas_used INTEGER NOT NULL
)",
"CREATE TABLE setup_progress (
scenario_hash TEXT PRIMARY KEY,
last_step_index INTEGER NOT NULL
)",
];

for query in queries {
Expand Down Expand Up @@ -444,6 +448,49 @@ impl DbOps for SqliteDb {
Ok(res)
}

fn get_named_txs(
&self,
name: &str,
rpc_url: &str,
genesis_hash: FixedBytes<32>,
) -> Result<Vec<NamedTx>> {
let pool = self.get_pool()?;
let mut stmt = pool
.prepare(
"SELECT name, tx_hash, contract_address, rpc_url_id FROM named_txs WHERE name = ?1 AND rpc_url_id = (
SELECT id FROM rpc_urls WHERE url = ?2 AND genesis_hash = ?3
) ORDER BY id ASC",
)?;

let rows = stmt.query_map(
params![name, rpc_url, genesis_hash.to_string().to_lowercase()],
NamedTxRow::from_row,
)?;
let res = rows
.map(|r| r.map(|r| r.into()))
.map(|r| r.map_err(|e| e.into()))
.collect::<Result<Vec<NamedTx>>>()?;
Ok(res)
}

fn get_setup_progress(&self, scenario_hash: &str) -> Result<Option<u64>> {
self.query_row(
"SELECT last_step_index FROM setup_progress WHERE scenario_hash = ?1",
params![scenario_hash],
|row| row.get(0),
)
.ok()
.map(|res| Ok(Some(res)))
.unwrap_or(Ok(None))
}

fn update_setup_progress(&self, scenario_hash: &str, step_index: u64) -> Result<()> {
self.execute(
"INSERT INTO setup_progress (scenario_hash, last_step_index) VALUES (?1, ?2) ON CONFLICT(scenario_hash) DO UPDATE SET last_step_index = ?2",
params![scenario_hash, step_index],
)
}

fn get_named_tx_by_address(&self, address: &Address) -> Result<Option<NamedTx>> {
let pool = self.get_pool()?;
let mut stmt = pool
Expand Down Expand Up @@ -774,4 +821,46 @@ mod tests {
assert_eq!(fetched_report.gas_used(), req.gas_used);
assert_eq!(fetched_report.rpc_url_id(), req.rpc_url_id);
}

#[test]
fn verifies_resumable_methods() {
let db = SqliteDb::new_memory();
db.create_tables().unwrap();

// 1. Test get_named_txs ordering
let tx_hash = TxHash::from_slice(&[0u8; 32]);
let contract_address = Some(Address::from_slice(&[1u8; 20]));
let name = "multi_tx";
let rpc_url = "http://resumable:8545";

let tx1 = NamedTx::new(name.to_owned(), tx_hash, contract_address);
let tx2 = NamedTx::new(name.to_owned(), tx_hash, contract_address);

db.insert_named_txs(&[tx1], rpc_url, FixedBytes::default())
.unwrap();
std::thread::sleep(Duration::from_millis(10)); // Ensure ID order if relying on time, but mostly ID is auto-inc
db.insert_named_txs(&[tx2], rpc_url, FixedBytes::default())
.unwrap();

let txs = db
.get_named_txs(name, rpc_url, FixedBytes::default())
.unwrap();
assert_eq!(txs.len(), 2);
// IDs should be increasing. The returned struct doesn't have ID, but the order is ASC.
// If we assumed 1st inserted is 1st returned.
assert_eq!(txs[0].name, name);

// 2. Test setup_progress
let scenario_hash = "abc123hash";
let progress = db.get_setup_progress(scenario_hash).unwrap();
assert!(progress.is_none());

db.update_setup_progress(scenario_hash, 5).unwrap();
let progress = db.get_setup_progress(scenario_hash).unwrap();
assert_eq!(progress, Some(5));

db.update_setup_progress(scenario_hash, 6).unwrap();
let progress = db.get_setup_progress(scenario_hash).unwrap();
assert_eq!(progress, Some(6));
}
}
1 change: 1 addition & 0 deletions test_output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
^C