diff --git a/Cargo.lock b/Cargo.lock index ec85172..73a452e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1465,6 +1465,7 @@ dependencies = [ "alloy-rpc-types-eth", "alloy-rpc-types-trace", "alloy-serde", + "alloy-signer", "alloy-signer-local", "alloy-sol-macro", "alloy-sol-types", @@ -1477,6 +1478,7 @@ dependencies = [ "jsonrpsee-core", "jsonrpsee-proc-macros", "modular-bitfield", + "rand 0.8.5", "reth", "reth-basic-payload-builder", "reth-chainspec", @@ -1491,12 +1493,16 @@ dependencies = [ "reth-engine-local", "reth-engine-primitives", "reth-errors", + "reth-eth-wire-types", "reth-ethereum-cli", "reth-ethereum-engine-primitives", "reth-ethereum-payload-builder", "reth-ethereum-primitives", "reth-evm", "reth-evm-ethereum", + "reth-metrics", + "reth-network", + "reth-network-api", "reth-network-peers", "reth-node-api", "reth-node-builder", @@ -1513,9 +1519,11 @@ dependencies = [ "reth-rpc-eth-types", "reth-transaction-pool", "revm-inspectors", + "rusqlite", "serde", "serde_json", "sha2", + "tempfile", "test-fuzz", "thiserror 2.0.18", "tokio", @@ -1934,9 +1942,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" dependencies = [ "serde", ] @@ -2677,7 +2685,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 2.0.114", + "syn 1.0.109", ] [[package]] @@ -3203,6 +3211,18 @@ dependencies = [ "once_cell", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fast-float2" version = "0.2.3" @@ -4710,6 +4730,17 @@ dependencies = [ "redox_syscall 0.7.0", ] +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libz-sys" version = "1.1.23" @@ -9457,6 +9488,20 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48fd7bd8a6377e15ad9d42a8ec25371b94ddc67abe7c8b9127bec79bebaaae18" +[[package]] +name = "rusqlite" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +dependencies = [ + "bitflags 2.10.0", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustc-hash" version = "2.1.1" @@ -10505,9 +10550,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.46" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", "itoa", @@ -10529,9 +10574,9 @@ checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.26" +version = "0.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78cc610bac2dcee56805c99642447d4c5dbde4d01f752ffea0199aee1f601dc4" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" dependencies = [ "num-conv", "time-core", diff --git a/Cargo.toml b/Cargo.toml index fefd8b9..5e6db65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ alloy-rlp = "0.3.10" alloy-rpc-types = "1.0.41" alloy-rpc-types-eth = "1.0.41" alloy-serde = "1.0.41" +alloy-signer = "1.0.41" alloy-signer-local = "1.0.41" alloy-sol-macro = "1.4.1" alloy-sol-types = "1.4.1" @@ -21,6 +22,8 @@ bytes = "1.10.1" clap = { version = "4.5.40", features = ["derive"] } derive_more = "2.0.1" eyre = "0.6.12" +rand = "0.8" +rusqlite = { version = "0.32", features = ["bundled"] } sha2 = "0.10" # rpc @@ -43,12 +46,16 @@ reth-db-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-engine-local = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-engine-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-errors = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } +reth-eth-wire-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-ethereum-cli = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-ethereum-engine-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-ethereum-payload-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-ethereum-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-evm-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } +reth-metrics = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } +reth-network = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } +reth-network-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-network-peers = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-node-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-node-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } @@ -77,6 +84,7 @@ reth-e2e-test-utils = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9 reth-rpc-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } revm-inspectors = "0.32.0" serde_json = "1.0" +tempfile = "3" test-fuzz = "7" [build-dependencies] diff --git a/src/args.rs b/src/args.rs new file mode 100644 index 0000000..954dfb5 --- /dev/null +++ b/src/args.rs @@ -0,0 +1,96 @@ +use clap::Args; +use std::path::PathBuf; + +const DEFAULT_POG_TIMEOUT_SECS: u64 = 120; +const DEFAULT_POG_REPUTATION_PENALTY: i32 = -25600; + +#[derive(Debug, Clone, Default, Args)] +#[command(next_help_heading = "Proof of Gossip")] +pub struct BerachainArgs { + #[arg(long = "pog.private-key-file")] + pub pog_private_key_file: Option, + + #[arg(long = "pog.timeout", default_value_t = DEFAULT_POG_TIMEOUT_SECS)] + pub pog_timeout: u64, + + #[arg( + long = "pog.reputation-penalty", + default_value_t = DEFAULT_POG_REPUTATION_PENALTY, + allow_hyphen_values = true + )] + pub pog_reputation_penalty: i32, +} + +#[cfg(test)] +mod tests { + use super::*; + use clap::{Parser, error::ErrorKind}; + use std::path::PathBuf; + + #[derive(Parser)] + struct TestCli { + #[command(flatten)] + args: BerachainArgs, + } + + #[test] + fn test_defaults() { + let cli = TestCli::parse_from(["test"]); + assert_eq!(cli.args.pog_private_key_file, None); + assert_eq!(cli.args.pog_timeout, DEFAULT_POG_TIMEOUT_SECS); + assert_eq!(cli.args.pog_reputation_penalty, DEFAULT_POG_REPUTATION_PENALTY); + } + + #[test] + fn test_with_private_key_file() { + let key_file = "/tmp/pog.key"; + let cli = TestCli::parse_from(["test", "--pog.private-key-file", key_file]); + assert_eq!( + cli.args.pog_private_key_file.as_deref(), + Some(PathBuf::from(key_file).as_path()) + ); + assert_eq!(cli.args.pog_timeout, DEFAULT_POG_TIMEOUT_SECS); + } + + #[test] + fn test_with_all_flags() { + let key_file = "/tmp/pog.key"; + let cli = TestCli::parse_from([ + "test", + "--pog.private-key-file", + key_file, + "--pog.timeout", + "300", + "--pog.reputation-penalty", + "-50000", + ]); + assert_eq!( + cli.args.pog_private_key_file.as_deref(), + Some(PathBuf::from(key_file).as_path()) + ); + assert_eq!(cli.args.pog_timeout, 300); + assert_eq!(cli.args.pog_reputation_penalty, -50000); + } + + #[test] + fn test_with_explicit_reputation_penalty() { + let cli = TestCli::parse_from(["test", "--pog.reputation-penalty", "-10000"]); + assert_eq!(cli.args.pog_reputation_penalty, -10000); + } + + #[test] + fn test_feature_off_when_flag_absent() { + let cli = TestCli::parse_from(["test", "--pog.timeout", "60"]); + assert_eq!(cli.args.pog_private_key_file, None); + assert_eq!(cli.args.pog_timeout, 60); + } + + #[test] + fn test_rejects_old_private_key_flag() { + let key = format!("0x{:064x}", 1u64); + let parsed = TestCli::try_parse_from(["test", "--pog.private-key", &key]); + assert!(parsed.is_err()); + let err = parsed.err().expect("old flag should be rejected"); + assert_eq!(err.kind(), ErrorKind::UnknownArgument); + } +} diff --git a/src/lib.rs b/src/lib.rs index 30b4118..a918e94 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ //! //! Built on Reth SDK with Ethereum compatibility plus Prague1 hardfork for minimum base fee. +pub mod args; pub mod chainspec; pub mod consensus; pub mod engine; @@ -11,6 +12,7 @@ pub mod hardforks; pub mod node; pub mod pool; pub mod primitives; +pub mod proof_of_gossip; pub mod rpc; pub mod transaction; pub mod version; diff --git a/src/main.rs b/src/main.rs index 1785785..4991844 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,15 +4,19 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator(); use bera_reth::{ + args::BerachainArgs, chainspec::{BerachainChainSpec, BerachainChainSpecParser}, consensus::BerachainBeaconConsensus, evm::BerachainEvmFactory, node::{BerachainNode, evm::config::BerachainEvmConfig}, + proof_of_gossip::new_pog_service, version::init_bera_version, }; use clap::Parser; -use reth::CliRunner; -use reth_cli_commands::node::NoArgs; +use reth::{ + CliRunner, + chainspec::{ChainSpecProvider, EthChainSpec}, +}; use reth_ethereum_cli::Cli; use reth_node_builder::NodeHandle; use std::sync::Arc; @@ -37,15 +41,26 @@ fn main() { ) }; - if let Err(err) = Cli::::parse() + if let Err(err) = Cli::::parse() .with_runner_and_components::( CliRunner::try_default_runtime().expect("Failed to create default runtime"), cli_components_builder, - async move |builder, _| { + async move |builder, args| { info!(target: "reth::cli", "Launching Berachain node"); - let NodeHandle { node: _node, node_exit_future } = + let NodeHandle { node, node_exit_future } = builder.node(BerachainNode::default()).launch_with_debug_capabilities().await?; + if let Some(service) = new_pog_service( + node.network.clone(), + node.provider.clone(), + node.provider.chain_spec().chain().id(), + node.config.datadir().data_dir().to_path_buf(), + &args, + )? { + node.task_executor + .spawn_with_graceful_shutdown_signal(|shutdown| service.run(shutdown)); + } + node_exit_future.await }, ) diff --git a/src/proof_of_gossip.rs b/src/proof_of_gossip.rs new file mode 100644 index 0000000..a444285 --- /dev/null +++ b/src/proof_of_gossip.rs @@ -0,0 +1,1330 @@ +use crate::args::BerachainArgs; +use alloy_consensus::{EthereumTxEnvelope, SignableTransaction, TxEip1559}; +use alloy_primitives::{Address, Bytes, TxHash, U256}; +use alloy_signer::SignerSync; +use alloy_signer_local::PrivateKeySigner; +use rand::{Rng, seq::SliceRandom}; +use reth::providers::{BlockReaderIdExt, StateProviderFactory}; +use reth_eth_wire_types::NetworkPrimitives; +use reth_metrics::{ + Metrics, metrics, + metrics::{Counter, Gauge}, +}; +use reth_network::NetworkHandle; +use reth_network_api::{NetworkInfo, PeerInfo, Peers, ReputationChangeKind}; +use reth_network_peers::PeerId; +use rusqlite::{Connection, params}; +use std::{ + collections::{HashMap, HashSet}, + fs, + future::Future, + path::{Path, PathBuf}, + sync::{Arc, OnceLock}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; +use thiserror::Error; +use tokio::time::sleep; +use tracing::{info, warn}; + +const CANARY_GAS_LIMIT: u64 = 21000; +const MAX_FEE_BUFFER_MULTIPLIER: u128 = 2; +const CANARY_PRIORITY_FEE_WEI: u128 = 1_000_000_000; +const MIN_CANARY_VALUE: u64 = 1; +const MAX_CANARY_VALUE: u64 = 1000; +const LOOP_TICK_INTERVAL_SECS: u64 = 10; +const LATE_CONFIRMATION_TRACK_WINDOW_SECS: u64 = 900; +const MIN_FUNDING_BACKOFF_SECS: u64 = 30; +const MAX_FUNDING_BACKOFF_SECS: u64 = 86400; + +#[derive(Debug, Error)] +pub enum PogError { + #[error(transparent)] + Report(#[from] eyre::Report), +} + +pub type PogResult = std::result::Result; + +trait IntoPogResult { + fn into_pog(self) -> PogResult; +} + +impl IntoPogResult for std::result::Result +where + E: Into, +{ + fn into_pog(self) -> PogResult { + self.map_err(|err| PogError::from(err.into())) + } +} + +pub trait NetworkOps: Send + Sync { + fn is_syncing(&self) -> bool; + fn get_all_peers(&self) -> impl Future>> + Send; + fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind); + fn disconnect_peer(&self, peer: PeerId); + fn send_canary(&self, peer_id: PeerId, tx: crate::transaction::BerachainTxEnvelope); +} + +impl> + NetworkOps for NetworkHandle +{ + fn is_syncing(&self) -> bool { + NetworkInfo::is_syncing(self) + } + + async fn get_all_peers(&self) -> PogResult> { + Peers::get_all_peers(self).await.into_pog() + } + + fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) { + Peers::reputation_change(self, peer_id, kind) + } + + fn disconnect_peer(&self, peer: PeerId) { + Peers::disconnect_peer(self, peer) + } + + fn send_canary(&self, peer_id: PeerId, tx: crate::transaction::BerachainTxEnvelope) { + NetworkHandle::send_transactions(self, peer_id, vec![Arc::new(tx)]) + } +} + +pub trait PogProvider: Send + Sync { + fn receipt_exists(&self, hash: TxHash) -> PogResult; + fn account_nonce(&self, address: &Address) -> PogResult>; + fn account_balance(&self, address: &Address) -> PogResult>; + fn latest_base_fee(&self) -> PogResult; +} + +impl

PogProvider for P +where + P: StateProviderFactory + + BlockReaderIdExt

+ + Send + + Sync, +{ + fn receipt_exists(&self, hash: TxHash) -> PogResult { + Ok(self.receipt_by_hash(hash).into_pog()?.is_some()) + } + + fn account_nonce(&self, address: &Address) -> PogResult> { + self.latest().into_pog()?.account_nonce(address).into_pog() + } + + fn account_balance(&self, address: &Address) -> PogResult> { + self.latest().into_pog()?.account_balance(address).into_pog() + } + + fn latest_base_fee(&self) -> PogResult { + let header = self + .latest_header() + .into_pog()? + .ok_or_else(|| eyre::eyre!("Failed to fetch latest block header"))? + .into_header(); + let base_fee = header + .base_fee_per_gas + .ok_or_else(|| eyre::eyre!("Latest block has no base fee - pre-EIP-1559 chain?"))?; + Ok(base_fee as u128) + } +} + +struct ActiveCanary { + tx_hash: TxHash, + peer_id: PeerId, + nonce: u64, + sent_at: Instant, +} + +struct TimedOutCanary { + peer_id: PeerId, + timed_out_at: Instant, +} + +#[derive(Metrics)] +#[metrics(scope = "bera_reth.pog")] +struct PoGMetrics { + #[metric(describe = "Canary transactions sent")] + canaries_sent_total: Counter, + #[metric(describe = "Canaries confirmed before timeout")] + canary_confirmed_total: Counter, + #[metric(describe = "Canaries timed out")] + canary_timeout_total: Counter, + #[metric(describe = "Timed-out canaries later confirmed")] + canary_late_confirmed_total: Counter, + #[metric(describe = "Reputation penalties applied")] + penalties_total: Counter, + #[metric(describe = "Peer bans/disconnects")] + bans_total: Counter, + #[metric(describe = "Active canaries")] + inflight_canaries: Gauge, +} + +fn pog_metrics() -> &'static PoGMetrics { + static METRICS: OnceLock = OnceLock::new(); + METRICS.get_or_init(PoGMetrics::default) +} + +fn load_pog_signer(private_key_path: &Path) -> PogResult { + let private_key = fs::read_to_string(private_key_path).map_err(|err| { + eyre::eyre!("Failed to read PoG private key file {}: {err}", private_key_path.display()) + })?; + let private_key = private_key.trim(); + + if private_key.is_empty() { + return Err(PogError::from(eyre::eyre!( + "PoG private key file {} is empty", + private_key_path.display() + ))); + } + + private_key.parse::().map_err(|err| { + PogError::from(eyre::eyre!( + "Invalid PoG private key in {}: {err}", + private_key_path.display() + )) + }) +} + +const fn normalize_reputation_penalty(input: i32) -> i32 { + if input > 0 { -input } else { input } +} + +pub struct ProofOfGossipService { + network: Network, + provider: Provider, + signer: PrivateKeySigner, + chain_id: u64, + db: Connection, + confirmed_peers: HashSet, + failure_counts: HashMap, + reputation_penalty: i32, + active: Option, + timed_out_canaries: HashMap, + nonce: u64, + timeout: Duration, + warned_syncing: bool, + funding_backoff: Option, + funding_backoff_secs: u64, +} + +impl ProofOfGossipService +where + Network: NetworkOps + 'static, + Provider: PogProvider + 'static, +{ + pub fn new( + network: Network, + provider: Provider, + chain_id: u64, + datadir: PathBuf, + args: &BerachainArgs, + ) -> PogResult> { + let Some(private_key_path) = &args.pog_private_key_file else { + return Ok(None); + }; + + let signer = load_pog_signer(private_key_path)?; + let address = signer.address(); + + let db_path = datadir.join("proof_of_gossip.db"); + let db = Connection::open(&db_path).into_pog()?; + + db.execute( + "CREATE TABLE IF NOT EXISTS peer_tests ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + peer_id TEXT NOT NULL, + tx_hash TEXT NOT NULL, + result TEXT NOT NULL, + tested_at INTEGER NOT NULL + )", + [], + ) + .into_pog()?; + + db.execute("CREATE INDEX IF NOT EXISTS idx_peer_tests_peer_id ON peer_tests(peer_id)", []) + .into_pog()?; + + db.pragma_update(None, "journal_mode", "WAL").into_pog()?; + + let confirmed_peers: HashSet = { + let mut stmt = db + .prepare( + "SELECT DISTINCT peer_id FROM peer_tests WHERE result IN ('confirmed', 'late_confirmed')", + ) + .into_pog()?; + stmt.query_map([], |row| { + let peer_id_str: String = row.get(0)?; + peer_id_str.parse::().map_err(|e| { + rusqlite::Error::FromSqlConversionFailure( + 0, + rusqlite::types::Type::Text, + Box::new(e), + ) + }) + }) + .into_pog()? + .collect::>() + .into_pog()? + }; + + let failure_counts: HashMap = { + let mut stmt = db + .prepare( + "SELECT peer_id, COUNT(*) FROM peer_tests WHERE result = 'timeout' GROUP BY peer_id", + ) + .into_pog()?; + stmt.query_map([], |row| { + let peer_id_str: String = row.get(0)?; + let count: u32 = row.get(1)?; + Ok(( + peer_id_str.parse::().map_err(|e| { + rusqlite::Error::FromSqlConversionFailure( + 0, + rusqlite::types::Type::Text, + Box::new(e), + ) + })?, + count, + )) + }) + .into_pog()? + .collect::>() + .into_pog()? + }; + + info!( + target: "bera_reth::pog", + address = %address, + confirmed_peers = confirmed_peers.len(), + failed_peers = failure_counts.len(), + "Proof of Gossip service initialized" + ); + pog_metrics().inflight_canaries.set(0.0); + + Ok(Some(Self { + network, + provider, + signer, + chain_id, + db, + confirmed_peers, + failure_counts, + reputation_penalty: normalize_reputation_penalty(args.pog_reputation_penalty), + active: None, + timed_out_canaries: HashMap::new(), + nonce: 0, + timeout: Duration::from_secs(args.pog_timeout), + warned_syncing: false, + funding_backoff: None, + funding_backoff_secs: 0, + })) + } + + pub async fn run(mut self, mut shutdown: reth::tasks::shutdown::GracefulShutdown) { + info!(target: "bera_reth::pog", "PoG service started"); + + loop { + tokio::select! { + guard = &mut shutdown => { + info!(target: "bera_reth::pog", "PoG service shutting down"); + pog_metrics().inflight_canaries.set(0.0); + drop(guard); + return; + } + _ = sleep(Duration::from_secs(LOOP_TICK_INTERVAL_SECS)) => { + if let Err(e) = self.tick().await { + warn!(target: "bera_reth::pog", error = %e, "Error in PoG service tick"); + } + } + } + } + } + + async fn tick(&mut self) -> PogResult<()> { + self.reconcile_late_confirmations()?; + self.drop_invalidated_active_canary()?; + + if self.network.is_syncing() { + if !self.warned_syncing { + info!(target: "bera_reth::pog", "PoG paused while node is syncing"); + self.warned_syncing = true; + } + + if let Some(active) = self.active.as_mut() { + active.sent_at = Instant::now(); + } + + return Ok(()); + } + + if self.warned_syncing { + info!(target: "bera_reth::pog", "PoG resumed after sync"); + self.warned_syncing = false; + } + + if let Some(deadline) = self.funding_backoff { + if Instant::now() < deadline { + return Ok(()); + } + self.funding_backoff = None; + } + + if let Some(canary) = &self.active { + let tx_hash = canary.tx_hash; + let peer_id = canary.peer_id; + let elapsed = canary.sent_at.elapsed(); + + if self.provider.receipt_exists(tx_hash)? { + info!( + target: "bera_reth::pog", + peer_id = %peer_id, + tx_hash = %tx_hash, + "Canary transaction confirmed" + ); + + self.active = None; + self.persist_result(&peer_id, tx_hash, "confirmed")?; + self.confirmed_peers.insert(peer_id); + pog_metrics().canary_confirmed_total.increment(1); + pog_metrics().inflight_canaries.set(0.0); + self.refresh_nonce()?; + } else if elapsed > self.timeout { + warn!( + target: "bera_reth::pog", + peer_id = %peer_id, + tx_hash = %tx_hash, + elapsed_secs = elapsed.as_secs(), + "Canary transaction timed out" + ); + + self.active = None; + self.persist_result(&peer_id, tx_hash, "timeout")?; + pog_metrics().canary_timeout_total.increment(1); + pog_metrics().inflight_canaries.set(0.0); + + let failure_count = + self.failure_counts.entry(peer_id).and_modify(|c| *c += 1).or_insert(1); + let failure_count = *failure_count; + + self.network.reputation_change( + peer_id, + ReputationChangeKind::Other(self.reputation_penalty), + ); + self.network.disconnect_peer(peer_id); + pog_metrics().penalties_total.increment(1); + pog_metrics().bans_total.increment(1); + + self.timed_out_canaries + .insert(tx_hash, TimedOutCanary { peer_id, timed_out_at: Instant::now() }); + + self.refresh_nonce()?; + + info!( + target: "bera_reth::pog", + nonce = self.nonce, + failure_count = failure_count, + "Re-queried on-chain nonce after timeout" + ); + } + } else if self.check_funding()? { + let all_peers = self.network.get_all_peers().await?; + + let eligible: Vec<_> = + all_peers.iter().filter(|p| !self.confirmed_peers.contains(&p.remote_id)).collect(); + + if let Some(peer) = eligible.choose(&mut rand::thread_rng()) { + let peer_id = peer.remote_id; + self.refresh_nonce()?; + let base_fee = match self.provider.latest_base_fee() { + Ok(base_fee) => base_fee, + Err(err) => { + warn!( + target: "bera_reth::pog", + peer_id = %peer_id, + error = %err, + fallback_base_fee = CANARY_PRIORITY_FEE_WEI, + "Failed to fetch base fee, using fallback" + ); + CANARY_PRIORITY_FEE_WEI + } + }; + let canary_tx = + create_canary_tx(&self.signer, self.nonce, self.chain_id, base_fee)?; + let tx_hash = *canary_tx.hash(); + let canary_nonce = self.nonce; + + self.network.send_canary(peer_id, canary_tx); + self.nonce = self.nonce.saturating_add(1); + pog_metrics().canaries_sent_total.increment(1); + pog_metrics().inflight_canaries.set(1.0); + + info!( + target: "bera_reth::pog", + peer_id = %peer_id, + tx_hash = %tx_hash, + nonce = canary_nonce, + "Sent canary transaction to peer" + ); + + self.active = Some(ActiveCanary { + tx_hash, + peer_id, + nonce: canary_nonce, + sent_at: Instant::now(), + }); + } + } + + Ok(()) + } + + fn check_funding(&mut self) -> PogResult { + let address = self.signer.address(); + let balance = self.provider.account_balance(&address)?; + let base_fee = match self.provider.latest_base_fee() { + Ok(base_fee) => base_fee, + Err(err) => { + warn!( + target: "bera_reth::pog", + address = %address, + error = %err, + fallback_base_fee = CANARY_PRIORITY_FEE_WEI, + "Failed to fetch base fee for funding check, using fallback" + ); + CANARY_PRIORITY_FEE_WEI + } + }; + let max_fee = (base_fee * MAX_FEE_BUFFER_MULTIPLIER).max(CANARY_PRIORITY_FEE_WEI + 1); + let min_balance = + U256::from(CANARY_GAS_LIMIT) * U256::from(max_fee) + U256::from(MAX_CANARY_VALUE); + + match balance { + Some(b) if b >= min_balance => { + self.funding_backoff_secs = 0; + Ok(true) + } + _ => { + self.funding_backoff_secs = if self.funding_backoff_secs == 0 { + MIN_FUNDING_BACKOFF_SECS + } else { + (self.funding_backoff_secs * 2).min(MAX_FUNDING_BACKOFF_SECS) + }; + self.funding_backoff = + Some(Instant::now() + Duration::from_secs(self.funding_backoff_secs)); + + warn!( + target: "bera_reth::pog", + address = %address, + balance = ?balance, + backoff_secs = self.funding_backoff_secs, + "PoG wallet underfunded, backing off" + ); + Ok(false) + } + } + } + + fn drop_invalidated_active_canary(&mut self) -> PogResult<()> { + let Some(active) = self.active.as_ref() else { + return Ok(()); + }; + + let Some(on_chain_nonce) = self.provider.account_nonce(&self.signer.address())? else { + return Ok(()); + }; + + if on_chain_nonce > active.nonce { + if self.provider.receipt_exists(active.tx_hash)? { + return Ok(()); + } + info!( + target: "bera_reth::pog", + tx_hash = %active.tx_hash, + active_nonce = active.nonce, + on_chain_nonce = on_chain_nonce, + "Discarding in-flight canary invalidated by nonce advance" + ); + self.active = None; + self.nonce = on_chain_nonce; + pog_metrics().inflight_canaries.set(0.0); + } + + Ok(()) + } + + fn refresh_nonce(&mut self) -> PogResult<()> { + let address = self.signer.address(); + self.nonce = self.provider.account_nonce(&address)?.ok_or_else(|| { + eyre::eyre!("PoG wallet {address} not found in state - is it funded?") + })?; + Ok(()) + } + + fn reconcile_late_confirmations(&mut self) -> PogResult<()> { + if self.timed_out_canaries.is_empty() { + return Ok(()); + } + + let mut confirmed_late = Vec::new(); + for (&tx_hash, timed_out) in &self.timed_out_canaries { + if self.provider.receipt_exists(tx_hash)? { + confirmed_late.push((tx_hash, timed_out.peer_id)); + } + } + + for (tx_hash, peer_id) in confirmed_late { + self.persist_result(&peer_id, tx_hash, "late_confirmed")?; + self.confirmed_peers.insert(peer_id); + self.timed_out_canaries.remove(&tx_hash); + pog_metrics().canary_late_confirmed_total.increment(1); + info!( + target: "bera_reth::pog", + peer_id = %peer_id, + tx_hash = %tx_hash, + "Timed-out canary confirmed later; marked peer as confirmed" + ); + } + + let window = Duration::from_secs(LATE_CONFIRMATION_TRACK_WINDOW_SECS); + self.timed_out_canaries.retain(|_, timed_out| timed_out.timed_out_at.elapsed() <= window); + + Ok(()) + } + + fn persist_result(&mut self, peer_id: &PeerId, tx_hash: TxHash, result: &str) -> PogResult<()> { + let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).into_pog()?.as_secs() as i64; + + self.db.execute( + "INSERT INTO peer_tests (peer_id, tx_hash, result, tested_at) VALUES (?1, ?2, ?3, ?4)", + params![peer_id.to_string(), tx_hash.to_string(), result, timestamp], + ) + .into_pog()?; + + Ok(()) + } +} + +pub fn new_pog_service( + network: Network, + provider: Provider, + chain_id: u64, + datadir: PathBuf, + args: &BerachainArgs, +) -> PogResult>> +where + Network: NetworkOps + 'static, + Provider: PogProvider + 'static, +{ + ProofOfGossipService::new(network, provider, chain_id, datadir, args) +} + +pub fn create_canary_tx( + signer: &PrivateKeySigner, + nonce: u64, + chain_id: u64, + base_fee: u128, +) -> PogResult { + let to = signer.address(); + let value = rand::thread_rng().gen_range(MIN_CANARY_VALUE..=MAX_CANARY_VALUE); + let max_priority_fee_per_gas = CANARY_PRIORITY_FEE_WEI; + let max_fee_per_gas = (base_fee * MAX_FEE_BUFFER_MULTIPLIER).max(max_priority_fee_per_gas + 1); + + let tx = TxEip1559 { + chain_id, + nonce, + gas_limit: CANARY_GAS_LIMIT, + max_fee_per_gas, + max_priority_fee_per_gas, + to: alloy_primitives::TxKind::Call(to), + value: U256::from(value), + access_list: Default::default(), + input: Bytes::default(), + }; + + let signature = signer.sign_hash_sync(&tx.signature_hash()).into_pog()?; + let signed = tx.into_signed(signature); + let eth_envelope = EthereumTxEnvelope::Eip1559(signed); + + Ok(crate::transaction::BerachainTxEnvelope::Ethereum(eth_envelope)) +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_consensus::Transaction; + use alloy_primitives::B256; + use std::{fs, sync::Mutex}; + use tempfile::NamedTempFile; + + const ONE_BERA: U256 = U256::from_limbs([1_000_000_000_000_000_000, 0, 0, 0]); + const TEST_CHAIN_ID: u64 = 80094; + const TEST_TIMEOUT_SECS: u64 = 120; + const TEST_REPUTATION_PENALTY: i32 = -25600; + + fn test_signer() -> PrivateKeySigner { + format!("0x{:064x}", 1u64).parse().unwrap() + } + + fn write_key_file(contents: &str) -> NamedTempFile { + let key_file = NamedTempFile::new().unwrap(); + fs::write(key_file.path(), contents).unwrap(); + key_file + } + + #[test] + fn test_load_pog_signer_from_file_with_0x_prefix() { + let key_file = write_key_file(&format!("0x{:064x}", 1u64)); + let signer = load_pog_signer(key_file.path()).unwrap(); + assert_eq!(signer.address(), test_signer().address()); + } + + #[test] + fn test_load_pog_signer_from_file_without_0x_prefix() { + let key_file = write_key_file(&format!("{:064x}", 1u64)); + let signer = load_pog_signer(key_file.path()).unwrap(); + assert_eq!(signer.address(), test_signer().address()); + } + + #[test] + fn test_load_pog_signer_trims_whitespace() { + let key_file = write_key_file(&format!("\n\t 0x{:064x} \n", 1u64)); + let signer = load_pog_signer(key_file.path()).unwrap(); + assert_eq!(signer.address(), test_signer().address()); + } + + #[test] + fn test_load_pog_signer_missing_file_errors() { + let missing_path = std::env::temp_dir().join(format!("pog-missing-{}", B256::random())); + let err = load_pog_signer(&missing_path).unwrap_err(); + assert!(err.to_string().contains("Failed to read PoG private key file")); + } + + #[test] + fn test_load_pog_signer_empty_file_errors() { + let key_file = write_key_file(" \n\t "); + let err = load_pog_signer(key_file.path()).unwrap_err(); + assert!(err.to_string().contains("is empty")); + } + + #[test] + fn test_load_pog_signer_malformed_key_errors() { + let key_file = write_key_file("not-a-private-key"); + let err = load_pog_signer(key_file.path()).unwrap_err(); + assert!(err.to_string().contains("Invalid PoG private key in")); + } + + #[test] + fn test_normalize_reputation_penalty_handles_i32_min() { + assert_eq!(normalize_reputation_penalty(100), -100); + assert_eq!(normalize_reputation_penalty(-100), -100); + assert_eq!(normalize_reputation_penalty(0), 0); + assert_eq!(normalize_reputation_penalty(i32::MIN), i32::MIN); + } + + #[test] + fn test_canary_tx_construction() { + let signer = test_signer(); + let nonce = 42; + let chain_id = TEST_CHAIN_ID; + let base_fee = 1_000_000_000; + + let tx = create_canary_tx(&signer, nonce, chain_id, base_fee).unwrap(); + + let eth_envelope = match &tx { + crate::transaction::BerachainTxEnvelope::Ethereum(eth) => eth, + _ => panic!("Expected Ethereum transaction"), + }; + + let inner = match eth_envelope { + EthereumTxEnvelope::Eip1559(signed) => signed, + _ => panic!("Expected EIP-1559 transaction"), + }; + + assert_eq!(inner.to(), Some(signer.address())); + assert!(inner.value() >= U256::from(MIN_CANARY_VALUE)); + assert!(inner.value() <= U256::from(MAX_CANARY_VALUE)); + assert_eq!(inner.gas_limit(), CANARY_GAS_LIMIT); + assert_eq!(inner.nonce(), nonce); + assert_eq!(inner.chain_id(), Some(chain_id)); + + let recovered = inner.recover_signer().unwrap(); + assert_eq!(recovered, signer.address()); + } + + #[test] + fn test_sqlite_persistence() { + let temp_file = NamedTempFile::new().unwrap(); + let db = create_test_db(temp_file.path()); + + let peer_id = PeerId::random(); + let tx_hash = B256::random(); + let timestamp = 1234567890i64; + + db.execute( + "INSERT INTO peer_tests (peer_id, tx_hash, result, tested_at) VALUES (?1, ?2, ?3, ?4)", + params![peer_id.to_string(), tx_hash.to_string(), "confirmed", timestamp], + ) + .unwrap(); + + let mut stmt = + db.prepare("SELECT peer_id, tx_hash, result, tested_at FROM peer_tests").unwrap(); + let mut rows = stmt.query([]).unwrap(); + + let row = rows.next().unwrap().unwrap(); + let loaded_peer_id: String = row.get(0).unwrap(); + let loaded_tx_hash: String = row.get(1).unwrap(); + let loaded_result: String = row.get(2).unwrap(); + let loaded_timestamp: i64 = row.get(3).unwrap(); + + assert_eq!(loaded_peer_id, peer_id.to_string()); + assert_eq!(loaded_tx_hash, tx_hash.to_string()); + assert_eq!(loaded_result, "confirmed"); + assert_eq!(loaded_timestamp, timestamp); + } + + #[test] + fn test_sqlite_reload() { + let temp_file = NamedTempFile::new().unwrap(); + + { + let db = create_test_db(temp_file.path()); + db.execute( + "INSERT INTO peer_tests (peer_id, tx_hash, result, tested_at) VALUES (?1, ?2, ?3, ?4)", + params![PeerId::random().to_string(), B256::random().to_string(), "timeout", 9999999], + ) + .unwrap(); + } + + let db = Connection::open(temp_file.path()).unwrap(); + let mut stmt = db.prepare("SELECT peer_id FROM peer_tests").unwrap(); + let count = stmt.query_map([], |_| Ok(())).unwrap().count(); + + assert_eq!(count, 1); + } + + fn create_test_db(path: &std::path::Path) -> Connection { + let db = Connection::open(path).unwrap(); + db.execute( + "CREATE TABLE IF NOT EXISTS peer_tests ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + peer_id TEXT NOT NULL, + tx_hash TEXT NOT NULL, + result TEXT NOT NULL, + tested_at INTEGER NOT NULL + )", + [], + ) + .unwrap(); + db + } + + #[test] + fn test_confirmed_excludes_from_eligible() { + let temp_file = NamedTempFile::new().unwrap(); + let db = create_test_db(temp_file.path()); + + let confirmed = PeerId::random(); + let timed_out = PeerId::random(); + + db.execute( + "INSERT INTO peer_tests (peer_id, tx_hash, result, tested_at) VALUES (?1, ?2, ?3, ?4)", + params![confirmed.to_string(), B256::random().to_string(), "confirmed", 1000], + ) + .unwrap(); + db.execute( + "INSERT INTO peer_tests (peer_id, tx_hash, result, tested_at) VALUES (?1, ?2, ?3, ?4)", + params![timed_out.to_string(), B256::random().to_string(), "timeout", 2000], + ) + .unwrap(); + + let confirmed_peers: HashSet = { + let mut stmt = db + .prepare("SELECT DISTINCT peer_id FROM peer_tests WHERE result IN ('confirmed', 'late_confirmed')") + .unwrap(); + stmt.query_map([], |row| { + let s: String = row.get(0)?; + Ok(s.parse::().unwrap()) + }) + .unwrap() + .collect::>() + .unwrap() + }; + + assert!(confirmed_peers.contains(&confirmed)); + assert!(!confirmed_peers.contains(&timed_out)); + } + + #[test] + fn test_failure_count_reload() { + let temp_file = NamedTempFile::new().unwrap(); + let db = create_test_db(temp_file.path()); + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + + for _ in 0..3 { + db.execute( + "INSERT INTO peer_tests (peer_id, tx_hash, result, tested_at) VALUES (?1, ?2, ?3, ?4)", + params![peer_a.to_string(), B256::random().to_string(), "timeout", 1000], + ) + .unwrap(); + } + + db.execute( + "INSERT INTO peer_tests (peer_id, tx_hash, result, tested_at) VALUES (?1, ?2, ?3, ?4)", + params![peer_b.to_string(), B256::random().to_string(), "timeout", 2000], + ) + .unwrap(); + db.execute( + "INSERT INTO peer_tests (peer_id, tx_hash, result, tested_at) VALUES (?1, ?2, ?3, ?4)", + params![peer_b.to_string(), B256::random().to_string(), "confirmed", 3000], + ) + .unwrap(); + + db.execute( + "INSERT INTO peer_tests (peer_id, tx_hash, result, tested_at) VALUES (?1, ?2, ?3, ?4)", + params![peer_c.to_string(), B256::random().to_string(), "confirmed", 4000], + ) + .unwrap(); + + let failure_counts: HashMap = { + let mut stmt = db + .prepare( + "SELECT peer_id, COUNT(*) FROM peer_tests WHERE result = 'timeout' GROUP BY peer_id", + ) + .unwrap(); + stmt.query_map([], |row| { + let s: String = row.get(0)?; + let count: u32 = row.get(1)?; + Ok((s.parse::().unwrap(), count)) + }) + .unwrap() + .collect::>() + .unwrap() + }; + + let confirmed_peers: HashSet = { + let mut stmt = db + .prepare("SELECT DISTINCT peer_id FROM peer_tests WHERE result IN ('confirmed', 'late_confirmed')") + .unwrap(); + stmt.query_map([], |row| { + let s: String = row.get(0)?; + Ok(s.parse::().unwrap()) + }) + .unwrap() + .collect::>() + .unwrap() + }; + + assert_eq!(failure_counts.get(&peer_a), Some(&3)); + assert_eq!(failure_counts.get(&peer_b), Some(&1)); + assert_eq!(failure_counts.get(&peer_c), None); + + assert!(!confirmed_peers.contains(&peer_a)); + assert!(confirmed_peers.contains(&peer_b)); + assert!(confirmed_peers.contains(&peer_c)); + } + + #[test] + fn test_sqlite_multiple_results_per_peer() { + let temp_file = NamedTempFile::new().unwrap(); + let db = create_test_db(temp_file.path()); + + let peer_id = PeerId::random(); + let tx_hash1 = B256::random(); + let tx_hash2 = B256::random(); + + db.execute( + "INSERT INTO peer_tests (peer_id, tx_hash, result, tested_at) VALUES (?1, ?2, ?3, ?4)", + params![peer_id.to_string(), tx_hash1.to_string(), "timeout", 1111111], + ) + .unwrap(); + + db.execute( + "INSERT INTO peer_tests (peer_id, tx_hash, result, tested_at) VALUES (?1, ?2, ?3, ?4)", + params![peer_id.to_string(), tx_hash2.to_string(), "confirmed", 2222222], + ) + .unwrap(); + + let mut stmt = db.prepare("SELECT COUNT(*) FROM peer_tests WHERE peer_id = ?1").unwrap(); + let count: i64 = stmt.query_row(params![peer_id.to_string()], |row| row.get(0)).unwrap(); + assert_eq!(count, 2); + } + + #[derive(Default)] + struct MockProviderState { + receipts: HashSet, + nonce: Option, + balance: Option, + base_fee: u128, + fail_base_fee: bool, + } + + struct MockProvider { + state: Mutex, + } + + impl MockProvider { + fn new(nonce: u64, balance: U256, base_fee: u128) -> Self { + Self { + state: Mutex::new(MockProviderState { + receipts: HashSet::new(), + nonce: Some(nonce), + balance: Some(balance), + base_fee, + fail_base_fee: false, + }), + } + } + + fn add_receipt(&self, hash: TxHash) { + self.state.lock().unwrap().receipts.insert(hash); + } + + fn set_balance(&self, balance: U256) { + self.state.lock().unwrap().balance = Some(balance); + } + + fn set_nonce(&self, nonce: u64) { + self.state.lock().unwrap().nonce = Some(nonce); + } + + fn set_base_fee_error(&self, fail: bool) { + self.state.lock().unwrap().fail_base_fee = fail; + } + } + + impl PogProvider for MockProvider { + fn receipt_exists(&self, hash: TxHash) -> PogResult { + Ok(self.state.lock().unwrap().receipts.contains(&hash)) + } + + fn account_nonce(&self, _address: &Address) -> PogResult> { + Ok(self.state.lock().unwrap().nonce) + } + + fn account_balance(&self, _address: &Address) -> PogResult> { + Ok(self.state.lock().unwrap().balance) + } + + fn latest_base_fee(&self) -> PogResult { + let state = self.state.lock().unwrap(); + if state.fail_base_fee { + return Err(eyre::eyre!("mock base fee error").into()); + } + Ok(state.base_fee) + } + } + + struct MockNetworkState { + syncing: bool, + peers: Vec, + sent_canaries: Vec<(PeerId, TxHash)>, + reputation_changes: Vec<(PeerId, i32)>, + disconnected: Vec, + } + + struct MockNetwork { + state: Mutex, + } + + impl MockNetwork { + fn new(peer_ids: Vec) -> Self { + let peers = peer_ids.into_iter().map(make_peer_info).collect(); + + Self { + state: Mutex::new(MockNetworkState { + syncing: false, + peers, + sent_canaries: Vec::new(), + reputation_changes: Vec::new(), + disconnected: Vec::new(), + }), + } + } + + fn set_syncing(&self, syncing: bool) { + self.state.lock().unwrap().syncing = syncing; + } + + fn sent_canaries(&self) -> Vec<(PeerId, TxHash)> { + self.state.lock().unwrap().sent_canaries.clone() + } + + fn reputation_changes(&self) -> Vec<(PeerId, i32)> { + self.state.lock().unwrap().reputation_changes.clone() + } + + fn disconnected_peers(&self) -> Vec { + self.state.lock().unwrap().disconnected.clone() + } + } + + impl NetworkOps for MockNetwork { + fn is_syncing(&self) -> bool { + self.state.lock().unwrap().syncing + } + + async fn get_all_peers(&self) -> PogResult> { + Ok(self.state.lock().unwrap().peers.clone()) + } + + fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) { + if let ReputationChangeKind::Other(val) = kind { + self.state.lock().unwrap().reputation_changes.push((peer_id, val)); + } + } + + fn disconnect_peer(&self, peer: PeerId) { + self.state.lock().unwrap().disconnected.push(peer); + } + + fn send_canary(&self, peer_id: PeerId, tx: crate::transaction::BerachainTxEnvelope) { + let tx_hash = *tx.hash(); + self.state.lock().unwrap().sent_canaries.push((peer_id, tx_hash)); + } + } + + fn make_peer_info(id: PeerId) -> PeerInfo { + use reth_eth_wire_types::{ + Capability, EthVersion, UnifiedStatus, capability::Capabilities, + }; + PeerInfo { + capabilities: Arc::new(Capabilities::from(vec![Capability::eth(EthVersion::Eth68)])), + remote_id: id, + client_version: Arc::from("test/1.0"), + enode: String::new(), + enr: None, + remote_addr: "127.0.0.1:30303".parse().unwrap(), + local_addr: None, + direction: reth_network_api::Direction::Incoming, + eth_version: EthVersion::Eth68, + status: Arc::new(UnifiedStatus::default()), + session_established: Instant::now(), + kind: reth_network_api::PeerKind::Basic, + } + } + + fn make_service( + network: MockNetwork, + provider: MockProvider, + db_path: &std::path::Path, + ) -> ProofOfGossipService { + let db = create_test_db(db_path); + let signer = test_signer(); + + ProofOfGossipService { + network, + provider, + signer, + chain_id: TEST_CHAIN_ID, + db, + confirmed_peers: HashSet::new(), + failure_counts: HashMap::new(), + reputation_penalty: TEST_REPUTATION_PENALTY, + active: None, + timed_out_canaries: HashMap::new(), + nonce: 0, + timeout: Duration::from_secs(TEST_TIMEOUT_SECS), + warned_syncing: false, + funding_backoff: None, + funding_backoff_secs: 0, + } + } + + #[tokio::test] + async fn test_tick_skips_when_syncing() { + let temp_file = NamedTempFile::new().unwrap(); + let peer = PeerId::random(); + let network = MockNetwork::new(vec![peer]); + network.set_syncing(true); + + let provider = MockProvider::new(0, ONE_BERA, 1_000_000_000); + let mut service = make_service(network, provider, temp_file.path()); + + service.tick().await.unwrap(); + + assert!(service.network.sent_canaries().is_empty()); + assert!(service.warned_syncing); + } + + #[tokio::test] + async fn test_tick_sends_canary_to_untested_peer() { + let temp_file = NamedTempFile::new().unwrap(); + let peer = PeerId::random(); + let network = MockNetwork::new(vec![peer]); + let provider = MockProvider::new(0, ONE_BERA, 1_000_000_000); + let mut service = make_service(network, provider, temp_file.path()); + + service.tick().await.unwrap(); + + let canaries = service.network.sent_canaries(); + assert_eq!(canaries.len(), 1); + assert_eq!(canaries[0].0, peer); + assert!(service.active.is_some()); + } + + #[tokio::test] + async fn test_tick_discards_invalidated_active_canary_without_penalty() { + let temp_file = NamedTempFile::new().unwrap(); + let peer = PeerId::random(); + let network = MockNetwork::new(vec![peer]); + let provider = MockProvider::new(7, ONE_BERA, 1_000_000_000); + let mut service = make_service(network, provider, temp_file.path()); + + service.tick().await.unwrap(); + let active = service.active.as_ref().unwrap(); + assert_eq!(active.nonce, 7); + assert_eq!(service.nonce, 8); + + service.provider.set_nonce(9); + service.tick().await.unwrap(); + + let active = service.active.as_ref().unwrap(); + assert_eq!(active.nonce, 9); + assert_eq!(service.nonce, 10); + assert_eq!(service.network.sent_canaries().len(), 2); + assert!(service.network.reputation_changes().is_empty()); + assert!(service.network.disconnected_peers().is_empty()); + } + + #[tokio::test] + async fn test_tick_uses_base_fee_fallback_when_provider_errors() { + let temp_file = NamedTempFile::new().unwrap(); + let peer = PeerId::random(); + let network = MockNetwork::new(vec![peer]); + let provider = MockProvider::new(0, ONE_BERA, 1_000_000_000); + let mut service = make_service(network, provider, temp_file.path()); + service.provider.set_base_fee_error(true); + + service.tick().await.unwrap(); + + let canaries = service.network.sent_canaries(); + assert_eq!(canaries.len(), 1); + assert_eq!(canaries[0].0, peer); + } + + #[tokio::test] + async fn test_tick_confirms_canary() { + let temp_file = NamedTempFile::new().unwrap(); + let peer = PeerId::random(); + let network = MockNetwork::new(vec![peer]); + let provider = MockProvider::new(0, ONE_BERA, 1_000_000_000); + let mut service = make_service(network, provider, temp_file.path()); + + service.tick().await.unwrap(); + let tx_hash = service.active.as_ref().unwrap().tx_hash; + + service.provider.add_receipt(tx_hash); + service.provider.set_nonce(1); + service.tick().await.unwrap(); + + assert!(service.active.is_none()); + assert!(service.confirmed_peers.contains(&peer)); + } + + #[tokio::test] + async fn test_tick_times_out_and_penalizes() { + let temp_file = NamedTempFile::new().unwrap(); + let peer = PeerId::random(); + let network = MockNetwork::new(vec![peer]); + let provider = MockProvider::new(0, ONE_BERA, 1_000_000_000); + let mut service = make_service(network, provider, temp_file.path()); + service.timeout = Duration::from_millis(0); + + service.tick().await.unwrap(); + assert!(service.active.is_some()); + + sleep(Duration::from_millis(10)).await; + service.tick().await.unwrap(); + + assert!(service.active.is_none()); + assert!(!service.confirmed_peers.contains(&peer)); + + let penalties = service.network.reputation_changes(); + assert_eq!(penalties.len(), 1); + assert_eq!(penalties[0].0, peer); + assert_eq!(penalties[0].1, TEST_REPUTATION_PENALTY); + + let disconnected = service.network.disconnected_peers(); + assert_eq!(disconnected, vec![peer]); + } + + #[tokio::test] + async fn test_tick_late_confirmation_reconciles() { + let temp_file = NamedTempFile::new().unwrap(); + let peer = PeerId::random(); + let network = MockNetwork::new(vec![peer]); + let provider = MockProvider::new(0, ONE_BERA, 1_000_000_000); + let mut service = make_service(network, provider, temp_file.path()); + service.timeout = Duration::from_millis(0); + + service.tick().await.unwrap(); + let tx_hash = service.active.as_ref().unwrap().tx_hash; + + sleep(Duration::from_millis(10)).await; + service.tick().await.unwrap(); + assert!(service.timed_out_canaries.contains_key(&tx_hash)); + assert!(!service.confirmed_peers.contains(&peer)); + + service.provider.add_receipt(tx_hash); + service.tick().await.unwrap(); + + assert!(!service.timed_out_canaries.contains_key(&tx_hash)); + assert!(service.confirmed_peers.contains(&peer)); + } + + #[tokio::test] + async fn test_tick_skips_confirmed_peers() { + let temp_file = NamedTempFile::new().unwrap(); + let peer = PeerId::random(); + let network = MockNetwork::new(vec![peer]); + let provider = MockProvider::new(0, ONE_BERA, 1_000_000_000); + let mut service = make_service(network, provider, temp_file.path()); + + service.confirmed_peers.insert(peer); + service.tick().await.unwrap(); + + assert!(service.network.sent_canaries().is_empty()); + } + + #[tokio::test] + async fn test_tick_backs_off_when_underfunded() { + let temp_file = NamedTempFile::new().unwrap(); + let peer = PeerId::random(); + let network = MockNetwork::new(vec![peer]); + let provider = MockProvider::new(0, U256::ZERO, 1_000_000_000); + let mut service = make_service(network, provider, temp_file.path()); + + service.tick().await.unwrap(); + + assert!(service.network.sent_canaries().is_empty()); + assert!(service.funding_backoff.is_some()); + assert_eq!(service.funding_backoff_secs, MIN_FUNDING_BACKOFF_SECS); + + service.funding_backoff = Some(Instant::now() - Duration::from_secs(1)); + service.tick().await.unwrap(); + + assert_eq!(service.funding_backoff_secs, MIN_FUNDING_BACKOFF_SECS * 2); + } + + #[tokio::test] + async fn test_funding_backoff_resets_on_fund() { + let temp_file = NamedTempFile::new().unwrap(); + let peer = PeerId::random(); + let network = MockNetwork::new(vec![peer]); + let provider = MockProvider::new(0, U256::ZERO, 1_000_000_000); + let mut service = make_service(network, provider, temp_file.path()); + + service.tick().await.unwrap(); + assert_eq!(service.funding_backoff_secs, MIN_FUNDING_BACKOFF_SECS); + + service.provider.set_balance(ONE_BERA); + service.funding_backoff = Some(Instant::now() - Duration::from_secs(1)); + service.tick().await.unwrap(); + + assert_eq!(service.funding_backoff_secs, 0); + assert!(!service.network.sent_canaries().is_empty()); + } +}