From 3f450dcdb2d6b5c451f4c2f169ce6a4b45b12ee6 Mon Sep 17 00:00:00 2001 From: Ro Ma Date: Tue, 3 Mar 2026 11:18:38 +0100 Subject: [PATCH] [ENG-756] feat(daemon): UTXO performance optimizations for large-scale wallets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses critical scaling bottlenecks for wallets with 10M+ UTXOs: - Snapshot double-buffering (RwLock>) eliminates 23% reader unavailability caused by exclusive Mutex lock during sort operations - Mempool overlay view (UtxoStateView) replaces full UTXO set cloning - O(N×M) to O(N+M) transaction filtering via HashSet pre-build - Incremental round-robin address scanning instead of O(N) full rescan - Address derivation caching with direct ChildNumber paths and pre-sorted multisig keys - Versioned monitored address cache avoids rebuild/parse per sync tick - Fix sync interval default mismatch (struct Default was 10ms vs clap 10s) - Add benchmarks for address scaling, UTXO scaling, lock contention, and stress testing at production scale --- Cargo.toml | 1 + common/src/addresses.rs | 56 ++ common/src/model.rs | 2 +- common/src/proto_convert.rs | 23 + daemon/Cargo.toml | 31 +- daemon/benches/address_scaling.rs | 246 +++++++ daemon/benches/from_addresses_filter.rs | 77 ++ daemon/benches/utxo_contention.rs | 191 +++++ daemon/benches/utxo_scaling.rs | 108 +++ daemon/src/address_manager.rs | 282 +++++++- daemon/src/args.rs | 247 ++++--- daemon/src/bin/kaswallet_stress_bench.rs | 385 ++++++++++ daemon/src/daemon.rs | 4 +- daemon/src/service/broadcast.rs | 5 +- daemon/src/service/common.rs | 7 +- .../service/create_unsigned_transaction.rs | 24 +- daemon/src/service/get_balance.rs | 64 +- daemon/src/service/get_utxos.rs | 137 ++-- daemon/src/service/kaswallet_service.rs | 4 +- daemon/src/service/send.rs | 17 +- daemon/src/sync_manager.rs | 163 ++++- daemon/src/transaction_generator.rs | 89 ++- daemon/src/utxo_manager.rs | 680 ++++++++++++++---- 23 files changed, 2378 insertions(+), 465 deletions(-) create mode 100644 daemon/benches/address_scaling.rs create mode 100644 daemon/benches/from_addresses_filter.rs create mode 100644 daemon/benches/utxo_contention.rs create mode 100644 daemon/benches/utxo_scaling.rs create mode 100644 daemon/src/bin/kaswallet_stress_bench.rs diff --git a/Cargo.toml b/Cargo.toml index 2b4dbf3..a03826b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,3 +82,4 @@ rust_decimal = "1.36" # Pin wasm-bindgen/js-sys to avoid version mismatch with rusty-kaspa transitive deps (ENG-746) wasm-bindgen = "=0.2.100" js-sys = "=0.3.70" +criterion = "0.5.1" diff --git a/common/src/addresses.rs b/common/src/addresses.rs index 8802567..28b1e42 100644 --- a/common/src/addresses.rs +++ b/common/src/addresses.rs @@ -28,7 +28,20 @@ pub fn multisig_address( ) -> WalletResult
{ let mut sorted_extended_public_keys = extended_public_keys.as_ref().clone(); sorted_extended_public_keys.sort(); + multisig_address_from_sorted_keys( + &sorted_extended_public_keys, + minimum_signatures, + prefix, + derivation_path, + ) +} +pub fn multisig_address_from_sorted_keys( + sorted_extended_public_keys: &[ExtendedPublicKey], + minimum_signatures: usize, + prefix: Prefix, + derivation_path: &DerivationPath, +) -> WalletResult
{ let mut signing_public_keys = Vec::with_capacity(sorted_extended_public_keys.len()); for x_public_key in sorted_extended_public_keys.iter() { let derived_key = x_public_key @@ -46,3 +59,46 @@ pub fn multisig_address( .to_wallet_result_internal()?; Ok(address) } + +#[cfg(test)] +mod tests { + use super::{multisig_address, multisig_address_from_sorted_keys}; + use crate::keys::master_key_path; + use kaspa_addresses::Prefix; + use kaspa_bip32::secp256k1::SecretKey; + use kaspa_bip32::{DerivationPath, ExtendedPrivateKey, Language, Mnemonic}; + use std::sync::Arc; + + fn xpub_from_mnemonic( + phrase: &str, + ) -> kaspa_bip32::ExtendedPublicKey { + let mnemonic = Mnemonic::new(phrase, Language::English).unwrap(); + let seed = mnemonic.to_seed(""); + let xprv = ExtendedPrivateKey::::new(seed).unwrap(); + let xprv = xprv.derive_path(&master_key_path(true)).unwrap(); + xprv.public_key() + } + + #[test] + fn multisig_address_sorted_helper_matches_existing_function() { + let xpub1 = xpub_from_mnemonic( + "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about", + ); + let xpub2 = xpub_from_mnemonic( + "legal winner thank year wave sausage worth useful legal winner thank yellow", + ); + + let derivation_path: DerivationPath = "m/0/0/0".parse().unwrap(); + let unsorted = Arc::new(vec![xpub2.clone(), xpub1.clone()]); + + let expected = multisig_address(unsorted, 2, Prefix::Devnet, &derivation_path).unwrap(); + + let mut sorted = vec![xpub2, xpub1]; + sorted.sort(); + let actual = + multisig_address_from_sorted_keys(&sorted, 2, Prefix::Devnet, &derivation_path) + .unwrap(); + + assert_eq!(expected.to_string(), actual.to_string()); + } +} diff --git a/common/src/model.rs b/common/src/model.rs index 3d91999..64e3fbb 100644 --- a/common/src/model.rs +++ b/common/src/model.rs @@ -37,7 +37,7 @@ impl WalletAddress { } } -#[derive(Clone, Debug, Hash, PartialEq, Eq)] +#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct WalletOutpoint { pub transaction_id: Hash, pub index: u32, diff --git a/common/src/proto_convert.rs b/common/src/proto_convert.rs index 1b1442b..9fb0ecb 100644 --- a/common/src/proto_convert.rs +++ b/common/src/proto_convert.rs @@ -128,6 +128,20 @@ impl From for ProtoUtxoEntry { } } +impl From<&WalletUtxoEntry> for ProtoUtxoEntry { + fn from(value: &WalletUtxoEntry) -> ProtoUtxoEntry { + ProtoUtxoEntry { + amount: value.amount, + script_public_key: Some(ProtoScriptPublicKey { + version: value.script_public_key.version as u32, + script_public_key: hex::encode(value.script_public_key.script()), + }), + block_daa_score: value.block_daa_score, + is_coinbase: value.is_coinbase, + } + } +} + pub fn utxo_entry_to_proto(value: UtxoEntry) -> ProtoUtxoEntry { ProtoUtxoEntry { amount: value.amount, @@ -148,6 +162,15 @@ pub fn utxo_entry_from_proto(value: ProtoUtxoEntry) -> UtxoEntry { } impl WalletUtxo { + pub fn to_proto(&self, is_pending: bool, is_dust: bool) -> ProtoUtxo { + ProtoUtxo { + outpoint: Some(self.outpoint.clone().into()), + utxo_entry: Some(ProtoUtxoEntry::from(&self.utxo_entry)), + is_pending, + is_dust, + } + } + pub fn into_proto(self, is_pending: bool, is_dust: bool) -> ProtoUtxo { ProtoUtxo { outpoint: Some(self.outpoint.into()), diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index bda45e2..b319f99 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -12,6 +12,11 @@ repository.workspace = true name = "kaswallet-daemon" path = "src/main.rs" +[[bin]] +name = "kaswallet-stress-bench" +path = "src/bin/kaswallet_stress_bench.rs" +required-features = ["bench"] + [dependencies] kaswallet-common.workspace = true kaswallet-proto.workspace = true @@ -36,7 +41,31 @@ thiserror.workspace = true wasm-bindgen.workspace = true js-sys.workspace = true +[features] +bench = [] + [dev-dependencies] kaspa-consensus.workspace = true kaspa-hashes.workspace = true -rstest.workspace = true \ No newline at end of file +rstest.workspace = true +criterion.workspace = true + +[[bench]] +name = "address_scaling" +harness = false +required-features = ["bench"] + +[[bench]] +name = "utxo_scaling" +harness = false +required-features = ["bench"] + +[[bench]] +name = "from_addresses_filter" +harness = false +required-features = ["bench"] + +[[bench]] +name = "utxo_contention" +harness = false +required-features = ["bench"] diff --git a/daemon/benches/address_scaling.rs b/daemon/benches/address_scaling.rs new file mode 100644 index 0000000..521120b --- /dev/null +++ b/daemon/benches/address_scaling.rs @@ -0,0 +1,246 @@ +use common::addresses::{multisig_address, multisig_address_from_sorted_keys}; +use common::keys::{Keys, master_key_path}; +use common::model::{Keychain, WalletAddress}; +use criterion::{BenchmarkId, Criterion, black_box, criterion_group, criterion_main}; +use kaspa_addresses::{Address, Prefix as AddressPrefix, Version}; +use kaspa_bip32::secp256k1::SecretKey; +use kaspa_bip32::{ + DerivationPath, ExtendedPrivateKey, ExtendedPublicKey, Language, Mnemonic, Prefix as XPubPrefix, +}; +use kaswallet_daemon::address_manager::AddressManager; +use std::sync::Arc; +use tokio::runtime::Runtime; + +fn xpub_from_mnemonic( + phrase: &str, + is_multisig: bool, +) -> ExtendedPublicKey { + let mnemonic = Mnemonic::new(phrase, Language::English).unwrap(); + let seed = mnemonic.to_seed(""); + let xprv = ExtendedPrivateKey::::new(seed).unwrap(); + let xprv = xprv.derive_path(&master_key_path(is_multisig)).unwrap(); + xprv.public_key() +} + +fn make_keys( + public_keys: Vec>, + minimum_signatures: u16, +) -> Arc { + Arc::new(Keys::new( + "bench-unused-keys.json".to_string(), + 1, + vec![], + XPubPrefix::XPUB, + public_keys, + 0, + 0, + minimum_signatures, + 0, + )) +} + +fn baseline_calculate_address_path( + wallet_address: &WalletAddress, + is_multisig: bool, +) -> DerivationPath { + let keychain_number = wallet_address.keychain.clone() as u32; + let path_string = if is_multisig { + format!( + "m/{}/{}/{}", + wallet_address.cosigner_index, keychain_number, wallet_address.index + ) + } else { + format!("m/{}/{}", keychain_number, wallet_address.index) + }; + path_string.parse().unwrap() +} + +fn bench_calculate_address_path(c: &mut Criterion) { + let keys = make_keys( + vec![xpub_from_mnemonic( + "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about", + false, + )], + 1, + ); + let manager = AddressManager::new(keys, AddressPrefix::Mainnet); + + let wallet_addresses: Vec = (0..256) + .map(|i| WalletAddress::new(i, 0, Keychain::External)) + .collect(); + + c.bench_function("calculate_address_path/new (singlesig)", |b| { + b.iter(|| { + for wa in &wallet_addresses { + black_box(manager.calculate_address_path(wa).unwrap()); + } + }) + }); + + c.bench_function("calculate_address_path/baseline parse (singlesig)", |b| { + b.iter(|| { + for wa in &wallet_addresses { + black_box(baseline_calculate_address_path(wa, false)); + } + }) + }); +} + +fn bench_multisig_sorted_vs_unsorted(c: &mut Criterion) { + let xpub1 = xpub_from_mnemonic( + "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about", + true, + ); + let xpub2 = xpub_from_mnemonic( + "legal winner thank year wave sausage worth useful legal winner thank yellow", + true, + ); + let xpub3 = xpub_from_mnemonic( + "letter advice cage absurd amount doctor acoustic avoid letter advice cage above", + true, + ); + + let unsorted = Arc::new(vec![xpub2.clone(), xpub3.clone(), xpub1.clone()]); + let mut sorted = vec![xpub2, xpub3, xpub1]; + sorted.sort(); + + let derivation_path: DerivationPath = "m/0/0/0".parse().unwrap(); + + c.bench_function("multisig_address (sorts each call)", |b| { + b.iter(|| { + black_box( + multisig_address(unsorted.clone(), 2, AddressPrefix::Devnet, &derivation_path) + .unwrap(), + ); + }) + }); + + c.bench_function( + "multisig_address_from_sorted_keys (no per-call sort)", + |b| { + b.iter(|| { + black_box( + multisig_address_from_sorted_keys( + &sorted, + 2, + AddressPrefix::Devnet, + &derivation_path, + ) + .unwrap(), + ); + }) + }, + ); +} + +fn bench_monitored_addresses_cache(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + let keys = make_keys( + vec![xpub_from_mnemonic( + "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about", + false, + )], + 1, + ); + let manager = AddressManager::new(keys, AddressPrefix::Mainnet); + + // Seed a large monitored set without disk IO (bench-only helpers). + let address_count: u32 = 10_000; + rt.block_on(async { + for i in 0..address_count { + let mut payload = [0u8; 32]; + payload[..4].copy_from_slice(&i.to_le_bytes()); + let address = Address::new(AddressPrefix::Mainnet, Version::PubKey, &payload); + let wa = WalletAddress::new(i, 0, Keychain::External); + manager.insert_address_for_bench(address, wa).await; + } + // Warm cache once. + manager.monitored_addresses().await.unwrap(); + }); + + c.bench_function("monitored_addresses/cached (10k)", |b| { + b.iter(|| { + let addresses = rt.block_on(manager.monitored_addresses()).unwrap(); + black_box(addresses.len()); + }) + }); + + c.bench_function("monitored_addresses/rebuild (10k)", |b| { + b.iter(|| { + manager.bump_address_set_version_for_bench(); + let addresses = rt.block_on(manager.monitored_addresses()).unwrap(); + black_box(addresses.len()); + }) + }); +} + +fn bench_addresses_to_query(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + let single_keys = make_keys( + vec![xpub_from_mnemonic( + "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about", + false, + )], + 1, + ); + let single = AddressManager::new(single_keys, AddressPrefix::Mainnet); + + let multi_keys = make_keys( + vec![ + xpub_from_mnemonic( + "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about", + true, + ), + xpub_from_mnemonic( + "legal winner thank year wave sausage worth useful legal winner thank yellow", + true, + ), + xpub_from_mnemonic( + "letter advice cage absurd amount doctor acoustic avoid letter advice cage above", + true, + ), + ], + 2, + ); + let multi = AddressManager::new(multi_keys, AddressPrefix::Mainnet); + + let mut group = c.benchmark_group("addresses_to_query"); + for &indexes in &[10u32, 100, 1_000] { + group.bench_with_input( + BenchmarkId::new("singlesig", indexes), + &indexes, + |b, &indexes| { + b.iter(|| { + let set = rt.block_on(single.addresses_to_query(0, indexes)).unwrap(); + black_box(set.len()); + }) + }, + ); + + // Multisig derivation is significantly more expensive; keep the upper bound smaller. + if indexes <= 100 { + group.bench_with_input( + BenchmarkId::new("multisig (2-of-3)", indexes), + &indexes, + |b, &indexes| { + b.iter(|| { + let set = rt.block_on(multi.addresses_to_query(0, indexes)).unwrap(); + black_box(set.len()); + }) + }, + ); + } + } + group.finish(); +} + +criterion_group!( + benches, + bench_calculate_address_path, + bench_multisig_sorted_vs_unsorted, + bench_monitored_addresses_cache, + bench_addresses_to_query +); +criterion_main!(benches); diff --git a/daemon/benches/from_addresses_filter.rs b/daemon/benches/from_addresses_filter.rs new file mode 100644 index 0000000..b458f19 --- /dev/null +++ b/daemon/benches/from_addresses_filter.rs @@ -0,0 +1,77 @@ +use common::model::{Keychain, WalletAddress}; +use criterion::{BenchmarkId, Criterion, black_box, criterion_group, criterion_main}; +use std::collections::HashSet; +use std::time::Duration; + +fn bench_from_addresses_filter(c: &mut Criterion) { + let mut group = c.benchmark_group("from_addresses_filter"); + group.sample_size(20); + group.measurement_time(Duration::from_secs(3)); + + let address_pool_size: u32 = 10_000; + let address_pool: Vec = (0..address_pool_size) + .map(|i| WalletAddress::new(i, 0, Keychain::External)) + .collect(); + + let utxo_counts: [usize; 2] = [200_000, 1_000_000]; + let filter_lens: [usize; 4] = [0, 1, 10, 100]; + + for &utxo_count in &utxo_counts { + let utxo_addresses: Vec = (0..utxo_count as u32) + .map(|i| WalletAddress::new(i % address_pool_size, 0, Keychain::External)) + .collect(); + + for &filter_len in &filter_lens { + let from_addresses: Vec<&WalletAddress> = + address_pool.iter().take(filter_len).collect(); + + group.bench_with_input( + BenchmarkId::new(format!("linear/utxos_{utxo_count}"), filter_len), + &filter_len, + |b, _| { + b.iter(|| { + let mut kept = 0usize; + for wa in &utxo_addresses { + if !from_addresses.is_empty() && !from_addresses.contains(&wa) { + continue; + } + kept += 1; + } + black_box(kept); + }) + }, + ); + + group.bench_with_input( + BenchmarkId::new(format!("hashset/utxos_{utxo_count}"), filter_len), + &filter_len, + |b, _| { + b.iter(|| { + let from_set: Option> = if from_addresses.is_empty() + { + None + } else { + Some(from_addresses.iter().map(|wa| (*wa).clone()).collect()) + }; + + let mut kept = 0usize; + for wa in &utxo_addresses { + if let Some(ref set) = from_set { + if !set.contains(wa) { + continue; + } + } + kept += 1; + } + black_box(kept); + }) + }, + ); + } + } + + group.finish(); +} + +criterion_group!(benches, bench_from_addresses_filter); +criterion_main!(benches); diff --git a/daemon/benches/utxo_contention.rs b/daemon/benches/utxo_contention.rs new file mode 100644 index 0000000..fbc4ab5 --- /dev/null +++ b/daemon/benches/utxo_contention.rs @@ -0,0 +1,191 @@ +use common::keys::Keys; +use common::model::{Keychain, WalletAddress, WalletSignableTransaction, WalletUtxoEntry}; +use criterion::{BenchmarkId, Criterion, black_box, criterion_group, criterion_main}; +use kaspa_addresses::{Address, Prefix as AddressPrefix, Version}; +use kaspa_bip32::Prefix as XPubPrefix; +use kaspa_consensus_core::tx::{ + ScriptPublicKey, SignableTransaction, Transaction, TransactionInput, TransactionOutpoint, + TransactionOutput, UtxoEntry, +}; +use kaspa_hashes::Hash; +use kaspa_rpc_core::{RpcTransactionOutpoint, RpcUtxoEntry, RpcUtxosByAddressesEntry}; +use kaswallet_daemon::address_manager::AddressManager; +use kaswallet_daemon::utxo_manager::UtxoManager; +use std::collections::HashSet; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::time::Duration; +use tokio::runtime::Runtime; +use tokio::sync::Mutex; + +fn txid(i: u32) -> Hash { + let mut bytes = [0u8; 32]; + bytes[..4].copy_from_slice(&i.to_le_bytes()); + Hash::from_bytes(bytes) +} + +fn make_outpoint(i: u32) -> RpcTransactionOutpoint { + RpcTransactionOutpoint { + transaction_id: txid(i), + index: i, + } +} + +fn make_rpc_utxo_entry(amount: u64) -> RpcUtxoEntry { + RpcUtxoEntry::new(amount, ScriptPublicKey::from_vec(0, vec![]), 0, false) +} + +fn make_address(prefix: AddressPrefix, i: u32) -> Address { + let mut payload = [0u8; 32]; + payload[..4].copy_from_slice(&i.to_le_bytes()); + Address::new(prefix, Version::PubKey, &payload) +} + +fn make_rpc_utxo(i: u32, address: Address) -> RpcUtxosByAddressesEntry { + let amount = ((i % 10_000) + 1) as u64; + RpcUtxosByAddressesEntry { + address: Some(address), + outpoint: make_outpoint(i), + utxo_entry: make_rpc_utxo_entry(amount), + } +} + +fn bench_utxo_state_reads_while_updating(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + let keys = Arc::new(Keys::new( + "bench-unused-keys.json".to_string(), + 1, + vec![], + XPubPrefix::XPUB, + vec![], + 0, + 0, + 1, + 0, + )); + + let prefix = AddressPrefix::Mainnet; + let address_count: u32 = 10_000; + + // Seed address manager with a large set of monitored addresses. + let address_manager = AddressManager::new(keys, prefix); + let mut addresses = Vec::with_capacity(address_count as usize); + rt.block_on(async { + for i in 0..address_count { + let address = make_address(prefix, i); + let wa = WalletAddress::new(i, 0, Keychain::External); + address_manager + .insert_address_for_bench(address.clone(), wa) + .await; + addresses.push(address); + } + }); + + let address_manager = Arc::new(Mutex::new(address_manager)); + rt.block_on(async { + let guard = address_manager.lock().await; + guard.monitored_address_map().await.unwrap(); + }); + + let utxo_manager = Arc::new(UtxoManager::new_for_bench(address_manager.clone())); + + let utxo_count: u32 = 10_000; + let base_entries: Vec = (0..utxo_count) + .map(|i| make_rpc_utxo(i, addresses[(i % address_count) as usize].clone())) + .collect(); + + // Establish initial state and keep one wallet-local pending tx in the overlay. + rt.block_on(async { + utxo_manager + .update_utxo_set(base_entries.clone(), vec![]) + .await + .unwrap(); + + // Spend one known outpoint and create one output to our address (wallet-local mempool overlay). + let input_outpoint = make_outpoint(0); + let wallet_utxo_entry: WalletUtxoEntry = make_rpc_utxo_entry(1).into(); + let input_entry: UtxoEntry = wallet_utxo_entry.clone().into(); + + let input = TransactionInput::new( + TransactionOutpoint::new(input_outpoint.transaction_id, input_outpoint.index), + vec![], + 0, + 1, + ); + let output = TransactionOutput::new(1, ScriptPublicKey::from_vec(0, vec![])); + let tx = Transaction::new( + 0, + vec![input], + vec![output], + 0, + Default::default(), + 0, + vec![], + ); + let signable = SignableTransaction::with_entries(tx, vec![input_entry]); + + let wa0 = WalletAddress::new(0, 0, Keychain::External); + let a0 = addresses[0].clone(); + let wallet_tx: WalletSignableTransaction = WalletSignableTransaction::new_from_unsigned( + signable, + HashSet::new(), + vec![wa0], + vec![a0], + ); + utxo_manager.add_mempool_transaction(&wallet_tx).await; + }); + + // Background refresh loop to exercise write-lock swaps while measuring reads. + let stop = Arc::new(AtomicBool::new(false)); + let stop_clone = Arc::clone(&stop); + let utxo_manager_clone = Arc::clone(&utxo_manager); + let refresh_entries = base_entries.clone(); + let refresh_task = rt.spawn(async move { + while !stop_clone.load(Relaxed) { + utxo_manager_clone + .update_utxo_set(refresh_entries.clone(), vec![]) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(10)).await; + } + }); + + let mut group = c.benchmark_group("utxo_contention"); + group.bench_function(BenchmarkId::new("state", "utxo_count"), |b| { + b.iter(|| { + let count = rt.block_on(async { utxo_manager.state().await.utxo_count() }); + black_box(count); + }); + }); + group.bench_function(BenchmarkId::new("state", "sorted_take_10_sum"), |b| { + b.iter(|| { + let sum = rt.block_on(async { + let state = utxo_manager.state().await; + state + .utxos_sorted_by_amount() + .take(10) + .map(|utxo| utxo.utxo_entry.amount) + .sum::() + }); + black_box(sum); + }); + }); + group.bench_function(BenchmarkId::new("state_with_mempool", "utxo_count"), |b| { + b.iter(|| { + let count = rt.block_on(async { + let view = utxo_manager.state_with_mempool().await.unwrap(); + view.utxo_count() + }); + black_box(count); + }); + }); + group.finish(); + + stop.store(true, Relaxed); + refresh_task.abort(); + let _ = rt.block_on(refresh_task); +} + +criterion_group!(benches, bench_utxo_state_reads_while_updating); +criterion_main!(benches); diff --git a/daemon/benches/utxo_scaling.rs b/daemon/benches/utxo_scaling.rs new file mode 100644 index 0000000..31c9e37 --- /dev/null +++ b/daemon/benches/utxo_scaling.rs @@ -0,0 +1,108 @@ +use common::keys::Keys; +use common::model::{Keychain, WalletAddress}; +use criterion::{BatchSize, BenchmarkId, Criterion, black_box, criterion_group, criterion_main}; +use kaspa_addresses::Prefix as AddressPrefix; +use kaspa_bip32::Prefix as XPubPrefix; +use kaspa_consensus_core::tx::ScriptPublicKey; +use kaspa_hashes::Hash; +use kaspa_rpc_core::{RpcTransactionOutpoint, RpcUtxoEntry, RpcUtxosByAddressesEntry}; +use kaswallet_daemon::address_manager::AddressManager; +use kaswallet_daemon::utxo_manager::UtxoManager; +use std::sync::Arc; +use tokio::runtime::Runtime; +use tokio::sync::Mutex; + +fn txid(i: u32) -> Hash { + let mut bytes = [0u8; 32]; + bytes[..4].copy_from_slice(&i.to_le_bytes()); + Hash::from_bytes(bytes) +} + +fn make_outpoint(i: u32) -> RpcTransactionOutpoint { + RpcTransactionOutpoint { + transaction_id: txid(i), + index: i, + } +} + +fn make_rpc_utxo_entry(amount: u64) -> RpcUtxoEntry { + RpcUtxoEntry::new(amount, ScriptPublicKey::from_vec(0, vec![]), 0, false) +} + +fn make_rpc_utxo(i: u32, address: kaspa_addresses::Address) -> RpcUtxosByAddressesEntry { + let amount = ((i % 10_000) + 1) as u64; + RpcUtxosByAddressesEntry { + address: Some(address), + outpoint: make_outpoint(i), + utxo_entry: make_rpc_utxo_entry(amount), + } +} + +fn bench_update_utxo_set(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + let keys = Arc::new(Keys::new( + "bench-unused-keys.json".to_string(), + 1, + vec![], + XPubPrefix::XPUB, + vec![], + 0, + 0, + 1, + 0, + )); + + // Seed address_manager with a realistic number of monitored addresses. + let address_manager = AddressManager::new(keys, AddressPrefix::Mainnet); + let address_count: u32 = 20_000; + let mut addresses = Vec::with_capacity(address_count as usize); + rt.block_on(async { + for i in 0..address_count { + let mut payload = [0u8; 32]; + payload[..4].copy_from_slice(&i.to_le_bytes()); + let address = kaspa_addresses::Address::new( + AddressPrefix::Mainnet, + kaspa_addresses::Version::PubKey, + &payload, + ); + let wa = WalletAddress::new(i, 0, Keychain::External); + address_manager + .insert_address_for_bench(address.clone(), wa) + .await; + addresses.push(address); + } + }); + + let address_manager = Arc::new(Mutex::new(address_manager)); + let utxo_manager = UtxoManager::new_for_bench(address_manager); + + let mut group = c.benchmark_group("update_utxo_set"); + for &utxo_count in &[1_000u32, 10_000, 50_000] { + let base_entries: Vec = (0..utxo_count) + .map(|i| make_rpc_utxo(i, addresses[(i % address_count) as usize].clone())) + .collect(); + + group.bench_with_input( + BenchmarkId::from_parameter(utxo_count), + &utxo_count, + |b, _| { + // Clone inputs outside the measured section so we measure `update_utxo_set` itself. + b.iter_batched( + || base_entries.clone(), + |entries| { + rt.block_on(utxo_manager.update_utxo_set(entries, vec![])) + .unwrap(); + let count = rt.block_on(async { utxo_manager.state().await.utxo_count() }); + black_box(count); + }, + BatchSize::LargeInput, + ) + }, + ); + } + group.finish(); +} + +criterion_group!(benches, bench_update_utxo_set); +criterion_main!(benches); diff --git a/daemon/src/address_manager.rs b/daemon/src/address_manager.rs index 7b86243..5a1ca9b 100644 --- a/daemon/src/address_manager.rs +++ b/daemon/src/address_manager.rs @@ -1,19 +1,28 @@ -use common::addresses::{multisig_address, p2pk_address}; -use common::errors::{ResultExt, WalletResult}; +use common::addresses::{multisig_address_from_sorted_keys, p2pk_address}; +use common::errors::WalletResult; use common::keys::Keys; use common::model::{KEYCHAINS, Keychain, WalletAddress}; use kaspa_addresses::{Address, Prefix as AddressPrefix}; use kaspa_bip32::secp256k1::PublicKey; -use kaspa_bip32::{DerivationPath, ExtendedPublicKey}; +use kaspa_bip32::{ChildNumber, DerivationPath, ExtendedPublicKey}; use kaspa_rpc_core::RpcBalancesByAddressesEntry; use std::collections::HashMap; use std::error::Error; -use std::str::FromStr; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering::Relaxed; use tokio::sync::Mutex; pub type AddressSet = HashMap; +pub type AddressQuerySet = HashMap; + +#[derive(Debug, Clone)] +struct MonitoredAddressesCache { + version: u64, + addresses: Arc>, + by_address: Arc>, +} + #[derive(Debug)] pub struct AddressManager { keys_file: Arc, @@ -23,19 +32,30 @@ pub struct AddressManager { prefix: AddressPrefix, address_cache: Mutex>, + + address_set_version: AtomicU64, + monitored_addresses_cache: Mutex, } impl AddressManager { pub fn new(keys: Arc, prefix: AddressPrefix) -> Self { let is_multisig = keys.public_keys.len() > 1; + let mut sorted_public_keys = keys.public_keys.clone(); + sorted_public_keys.sort(); Self { keys_file: keys.clone(), - extended_public_keys: Arc::new(keys.public_keys.clone()), + extended_public_keys: Arc::new(sorted_public_keys), addresses: Mutex::new(HashMap::new()), is_multisig, prefix, address_cache: Mutex::new(HashMap::new()), + address_set_version: AtomicU64::new(0), + monitored_addresses_cache: Mutex::new(MonitoredAddressesCache { + version: 0, + addresses: Arc::new(Vec::new()), + by_address: Arc::new(HashMap::new()), + }), } } @@ -60,6 +80,58 @@ impl AddressManager { Ok(strings) } + pub async fn monitored_addresses( + &self, + ) -> Result>, Box> { + let current_version = self.address_set_version.load(Relaxed); + { + let cache = self.monitored_addresses_cache.lock().await; + if cache.version == current_version { + return Ok(cache.addresses.clone()); + } + } + + let (addresses_vec, by_address): (Vec
, HashMap) = { + let addresses = self.addresses.lock().await; + let mut parsed = Vec::with_capacity(addresses.len()); + let mut by_address = HashMap::with_capacity(addresses.len()); + for (address_string, wallet_address) in addresses.iter() { + let address = Address::try_from(address_string.as_str()).map_err(|err| { + format!("invalid address in wallet address_set ({address_string}): {err}") + })?; + parsed.push(address.clone()); + by_address.insert(address, wallet_address.clone()); + } + (parsed, by_address) + }; + + let addresses_arc = Arc::new(addresses_vec); + let by_address_arc = Arc::new(by_address); + let mut cache = self.monitored_addresses_cache.lock().await; + cache.version = current_version; + cache.addresses = addresses_arc.clone(); + cache.by_address = by_address_arc; + Ok(addresses_arc) + } + + pub async fn monitored_address_map( + &self, + ) -> Result>, Box> { + let current_version = self.address_set_version.load(Relaxed); + { + let cache = self.monitored_addresses_cache.lock().await; + if cache.version == current_version { + return Ok(cache.by_address.clone()); + } + } + + // Rebuild once (also refreshes the map). + let _ = self.monitored_addresses().await?; + + let cache = self.monitored_addresses_cache.lock().await; + Ok(cache.by_address.clone()) + } + pub async fn new_address(&self) -> WalletResult<(String, WalletAddress)> { let last_used_external_index_previous_value = self .keys_file @@ -77,22 +149,26 @@ impl AddressManager { .kaspa_address_from_wallet_address(&wallet_address, true) .await?; + let address_string = address.to_string(); { - self.addresses - .lock() - .await - .insert(address.to_string(), wallet_address.clone()); + let mut addresses = self.addresses.lock().await; + addresses.insert(address_string.clone(), wallet_address.clone()); + self.address_set_version.fetch_add(1, Relaxed); } - Ok((address.to_string(), wallet_address)) + Ok((address_string, wallet_address)) } pub async fn addresses_to_query( &self, start: u32, end: u32, - ) -> Result> { - let mut addresses = HashMap::new(); + ) -> Result> { + let mut addresses = HashMap::with_capacity( + (end.saturating_sub(start) as usize) + * self.extended_public_keys.len() + * KEYCHAINS.len(), + ); for index in start..end { for cosigner_index in 0..self.extended_public_keys.len() as u16 { @@ -101,7 +177,7 @@ impl AddressManager { let address = self .kaspa_address_from_wallet_address(&wallet_address, false) .await?; - addresses.insert(address.to_string(), wallet_address); + addresses.insert(address, wallet_address); } } } @@ -111,18 +187,20 @@ impl AddressManager { pub async fn update_addresses_and_last_used_indexes( &self, - mut address_set: AddressSet, + mut address_set: AddressQuerySet, get_balances_by_addresses_response: Vec, ) -> Result<(), Box> { // create scope to release last_used_internal/external_index before keys_file.save() is called { + let mut addresses_guard = self.addresses.lock().await; + let mut inserted_any = false; for entry in get_balances_by_addresses_response { if entry.balance == Some(0) { continue; } let address_string = entry.address.to_string(); - let wallet_address = address_set.remove(&address_string).unwrap(); + let wallet_address = address_set.remove(&entry.address).unwrap(); if wallet_address.keychain == Keychain::External { if wallet_address.index > self.keys_file.last_used_external_index.load(Relaxed) @@ -139,10 +217,11 @@ impl AddressManager { .store(wallet_address.index, Relaxed); } - self.addresses - .lock() - .await - .insert(address_string, wallet_address); + addresses_guard.insert(address_string, wallet_address); + inserted_any = true; + } + if inserted_any { + self.address_set_version.fetch_add(1, Relaxed); } } @@ -194,17 +273,15 @@ impl AddressManager { &self, wallet_address: &WalletAddress, ) -> WalletResult { + // Avoid string formatting + parsing in the hot address-derivation path. let keychain_number = wallet_address.keychain.clone() as u32; - let path_string = if self.is_multisig { - format!( - "m/{}/{}/{}", - wallet_address.cosigner_index, keychain_number, wallet_address.index - ) - } else { - format!("m/{}/{}", keychain_number, wallet_address.index) - }; - let path = DerivationPath::from_str(&path_string).to_wallet_result_internal()?; + let mut path = DerivationPath::default(); + if self.is_multisig { + path.push(ChildNumber(wallet_address.cosigner_index as u32)); + } + path.push(ChildNumber(keychain_number)); + path.push(ChildNumber(wallet_address.index)); Ok(path) } @@ -217,8 +294,8 @@ impl AddressManager { } fn multisig_address(&self, derivation_path: &DerivationPath) -> WalletResult
{ - multisig_address( - self.extended_public_keys.clone(), + multisig_address_from_sorted_keys( + self.extended_public_keys.as_ref(), self.keys_file.minimum_signatures as usize, self.prefix, derivation_path, @@ -254,13 +331,150 @@ impl AddressManager { .kaspa_address_from_wallet_address(&wallet_address, true) .await?; + let address_string = address.to_string(); { - self.addresses - .lock() - .await - .insert(address.to_string(), wallet_address.clone()); + let mut addresses = self.addresses.lock().await; + addresses.insert(address_string, wallet_address.clone()); + self.address_set_version.fetch_add(1, Relaxed); } Ok((address, wallet_address)) } + + #[cfg(any(test, feature = "bench"))] + pub async fn insert_address_for_bench(&self, address: Address, wallet_address: WalletAddress) { + let mut addresses = self.addresses.lock().await; + addresses.insert(address.to_string(), wallet_address); + self.address_set_version.fetch_add(1, Relaxed); + } + + #[cfg(any(test, feature = "bench"))] + pub fn bump_address_set_version_for_bench(&self) { + self.address_set_version.fetch_add(1, Relaxed); + } +} + +#[cfg(test)] +mod tests { + use super::{AddressManager, AddressQuerySet, Keychain, WalletAddress}; + use common::keys::Keys; + use kaspa_addresses::{Address, Prefix, Version}; + use kaspa_bip32::secp256k1::SecretKey; + use kaspa_bip32::{ExtendedPrivateKey, Language, Mnemonic, Prefix as XPubPrefix}; + use std::collections::HashMap; + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + + static TEST_ID: AtomicUsize = AtomicUsize::new(0); + + fn unique_keys_path() -> String { + let id = TEST_ID.fetch_add(1, Ordering::Relaxed); + std::env::temp_dir() + .join(format!("kaswallet-address-manager-test-{id}.json")) + .to_string_lossy() + .to_string() + } + + fn keys_with_no_pubkeys() -> Arc { + Arc::new(Keys::new( + unique_keys_path(), + 1, + vec![], + XPubPrefix::XPUB, + vec![], + 0, + 0, + 1, + 0, + )) + } + + fn keys_with_single_pubkey() -> Arc { + let phrase = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"; + let mnemonic = Mnemonic::new(phrase, Language::English).unwrap(); + let seed = mnemonic.to_seed(""); + let xprv = ExtendedPrivateKey::::new(seed).unwrap(); + let xprv = xprv + .derive_path(&common::keys::master_key_path(false)) + .unwrap(); + let xpub = xprv.public_key(); + + Arc::new(Keys::new( + unique_keys_path(), + 1, + vec![], + XPubPrefix::XPUB, + vec![xpub], + 0, + 0, + 1, + 0, + )) + } + + #[test] + fn calculate_address_path_singlesig_matches_expected_format() { + let manager = AddressManager::new(keys_with_no_pubkeys(), Prefix::Mainnet); + let wallet_address = WalletAddress::new(7, 0, Keychain::External); + + let path = manager.calculate_address_path(&wallet_address).unwrap(); + assert_eq!(path.to_string(), "m/0/7"); + } + + #[tokio::test] + async fn addresses_to_query_uses_address_keys_and_expected_count() { + let manager = AddressManager::new(keys_with_single_pubkey(), Prefix::Devnet); + let set = manager.addresses_to_query(0, 2).await.unwrap(); + + // (end-start)=2 indexes, 1 cosigner, 2 keychains => 4 addresses. + assert_eq!(set.len(), 4); + } + + #[tokio::test] + async fn monitored_addresses_cache_is_reused_and_invalidated_on_change() { + let keys = keys_with_no_pubkeys(); + let manager = AddressManager::new(keys, Prefix::Mainnet); + + let address1 = Address::new(Prefix::Mainnet, Version::PubKey, &[1u8; 32]); + let wallet_address1 = WalletAddress::new(1, 0, Keychain::External); + + let mut query_set: AddressQuerySet = HashMap::new(); + query_set.insert(address1.clone(), wallet_address1); + + manager + .update_addresses_and_last_used_indexes( + query_set, + vec![kaspa_rpc_core::RpcBalancesByAddressesEntry { + address: address1.clone(), + balance: Some(1), + }], + ) + .await + .unwrap(); + + let first = manager.monitored_addresses().await.unwrap(); + let second = manager.monitored_addresses().await.unwrap(); + assert!(Arc::ptr_eq(&first, &second)); + assert_eq!(first.len(), 1); + + let address2 = Address::new(Prefix::Mainnet, Version::PubKey, &[2u8; 32]); + let wallet_address2 = WalletAddress::new(2, 0, Keychain::External); + let mut query_set: AddressQuerySet = HashMap::new(); + query_set.insert(address2.clone(), wallet_address2); + + manager + .update_addresses_and_last_used_indexes( + query_set, + vec![kaspa_rpc_core::RpcBalancesByAddressesEntry { + address: address2.clone(), + balance: Some(1), + }], + ) + .await + .unwrap(); + + let third = manager.monitored_addresses().await.unwrap(); + assert!(!Arc::ptr_eq(&second, &third)); + assert_eq!(third.len(), 2); + } } diff --git a/daemon/src/args.rs b/daemon/src/args.rs index 85fa6dd..5e8b779 100644 --- a/daemon/src/args.rs +++ b/daemon/src/args.rs @@ -1,112 +1,135 @@ -use clap::{Parser, ValueEnum}; -use common::args::parse_network_type; -use kaspa_consensus_core::network::NetworkId; -use log::LevelFilter; - -#[derive(Parser, Debug, Clone)] -#[command(name = "kaswallet-daemon")] -pub struct Args { - #[arg(long, help = "Use the test network")] - pub testnet: bool, - - #[arg(long, default_value = "10", help = "Testnet network suffix number")] - pub testnet_suffix: u32, - - #[arg(long, help = "Use the development test network")] - pub devnet: bool, - - #[arg(long, help = "Use the simulation test network")] - pub simnet: bool, - - // TODO: Remove when wallet is more stable - #[arg(long = "enable-mainnet-pre-launch", hide = true)] - pub enable_mainnet_pre_launch: bool, - - #[arg(long = "keys", short = 'k', help = "Path to keys file")] - pub keys_file_path: Option, - - #[arg(long, help = "Path to logs directory")] - pub logs_path: Option, - - #[arg(long, short = 'v', default_value = "info", help = "Log level")] - pub logs_level: LogsLevel, - - #[arg(long, short = 's', help = "Kaspa node RPC server to connect to")] - pub server: Option, - - #[arg( - long, - short = 'l', - default_value = "127.0.0.1:8082", - help = "Address to listen on" - )] - pub listen: String, - - #[arg(long, help = "Enable tokio console")] - #[cfg(debug_assertions)] - pub enable_tokio_console: bool, - - #[arg( - long, - default_value = "10000", - help = "Sync interval in milliseconds", - hide = true - )] - pub sync_interval_millis: u64, -} - -impl Default for Args { - fn default() -> Self { - Self { - testnet: false, - testnet_suffix: 10, - devnet: false, - simnet: false, - enable_mainnet_pre_launch: false, - keys_file_path: None, - logs_path: None, - logs_level: Default::default(), - server: None, - listen: "".to_string(), - #[cfg(debug_assertions)] - enable_tokio_console: false, - sync_interval_millis: 10, - } - } -} - -#[derive(Debug, Clone, ValueEnum, Default)] -pub enum LogsLevel { - Off, - Trace, - #[default] - Debug, - Info, - Warn, - Error, -} - -impl From for LevelFilter { - fn from(value: LogsLevel) -> LevelFilter { - match value { - LogsLevel::Off => LevelFilter::Off, - LogsLevel::Trace => LevelFilter::Trace, - LogsLevel::Debug => LevelFilter::Debug, - LogsLevel::Info => LevelFilter::Info, - LogsLevel::Warn => LevelFilter::Warn, - LogsLevel::Error => LevelFilter::Error, - } - } -} - -impl Args { - pub fn network_id(&self) -> NetworkId { - parse_network_type( - self.testnet, - self.devnet, - self.simnet, - self.testnet_suffix, - self.enable_mainnet_pre_launch, - ) - } -} +use clap::{Parser, ValueEnum}; +use common::args::parse_network_type; +use kaspa_consensus_core::network::NetworkId; +use log::LevelFilter; + +const DEFAULT_SYNC_INTERVAL_MILLIS: u64 = 10_000; + +#[derive(Parser, Debug, Clone)] +#[command(name = "kaswallet-daemon")] +pub struct Args { + #[arg(long, help = "Use the test network")] + pub testnet: bool, + + #[arg(long, default_value = "10", help = "Testnet network suffix number")] + pub testnet_suffix: u32, + + #[arg(long, help = "Use the development test network")] + pub devnet: bool, + + #[arg(long, help = "Use the simulation test network")] + pub simnet: bool, + + // TODO: Remove when wallet is more stable + #[arg(long = "enable-mainnet-pre-launch", hide = true)] + pub enable_mainnet_pre_launch: bool, + + #[arg(long = "keys", short = 'k', help = "Path to keys file")] + pub keys_file_path: Option, + + #[arg(long, help = "Path to logs directory")] + pub logs_path: Option, + + #[arg(long, short = 'v', default_value = "info", help = "Log level")] + pub logs_level: LogsLevel, + + #[arg(long, short = 's', help = "Kaspa node RPC server to connect to")] + pub server: Option, + + #[arg( + long, + short = 'l', + default_value = "127.0.0.1:8082", + help = "Address to listen on" + )] + pub listen: String, + + #[arg(long, help = "Enable tokio console")] + #[cfg(debug_assertions)] + pub enable_tokio_console: bool, + + #[arg( + long, + default_value_t = DEFAULT_SYNC_INTERVAL_MILLIS, + help = "Sync interval in milliseconds", + hide = true + )] + pub sync_interval_millis: u64, +} + +impl Default for Args { + fn default() -> Self { + Self { + testnet: false, + testnet_suffix: 10, + devnet: false, + simnet: false, + enable_mainnet_pre_launch: false, + keys_file_path: None, + logs_path: None, + logs_level: Default::default(), + server: None, + listen: "".to_string(), + #[cfg(debug_assertions)] + enable_tokio_console: false, + sync_interval_millis: DEFAULT_SYNC_INTERVAL_MILLIS, + } + } +} + +#[derive(Debug, Clone, ValueEnum, Default)] +pub enum LogsLevel { + Off, + Trace, + #[default] + Debug, + Info, + Warn, + Error, +} + +impl From for LevelFilter { + fn from(value: LogsLevel) -> LevelFilter { + match value { + LogsLevel::Off => LevelFilter::Off, + LogsLevel::Trace => LevelFilter::Trace, + LogsLevel::Debug => LevelFilter::Debug, + LogsLevel::Info => LevelFilter::Info, + LogsLevel::Warn => LevelFilter::Warn, + LogsLevel::Error => LevelFilter::Error, + } + } +} + +impl Args { + pub fn network_id(&self) -> NetworkId { + parse_network_type( + self.testnet, + self.devnet, + self.simnet, + self.testnet_suffix, + self.enable_mainnet_pre_launch, + ) + } +} + +#[cfg(test)] +mod tests { + use super::Args; + use clap::Parser; + + #[test] + fn sync_interval_default_matches_clap_default() { + let args_from_struct = Args::default(); + let args_from_clap = Args::parse_from(["kaswallet-daemon"]); + + assert_eq!( + args_from_struct.sync_interval_millis, + args_from_clap.sync_interval_millis + ); + assert_eq!( + args_from_struct.sync_interval_millis, + super::DEFAULT_SYNC_INTERVAL_MILLIS + ); + } +} diff --git a/daemon/src/bin/kaswallet_stress_bench.rs b/daemon/src/bin/kaswallet_stress_bench.rs new file mode 100644 index 0000000..4961950 --- /dev/null +++ b/daemon/src/bin/kaswallet_stress_bench.rs @@ -0,0 +1,385 @@ +use clap::Parser; +use common::keys::Keys; +use common::model::{Keychain, WalletAddress, WalletSignableTransaction, WalletUtxoEntry}; +use kaspa_addresses::{Address, Prefix as AddressPrefix, Version}; +use kaspa_bip32::Prefix as XPubPrefix; +use kaspa_consensus_core::tx::ScriptPublicKey; +use kaspa_consensus_core::tx::TransactionId; +use kaspa_consensus_core::tx::{ + SignableTransaction, Transaction, TransactionInput, TransactionOutpoint, TransactionOutput, + UtxoEntry, +}; +use kaspa_rpc_core::{RpcTransactionOutpoint, RpcUtxoEntry, RpcUtxosByAddressesEntry}; +use kaswallet_daemon::address_manager::AddressManager; +use kaswallet_daemon::utxo_manager::UtxoManager; +use std::collections::HashSet; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::time::Instant; +use tokio::runtime::Runtime; +use tokio::sync::Mutex; +use tokio::time::MissedTickBehavior; + +#[derive(Parser, Debug)] +#[command(about = "Synthetic stress benchmark for huge wallets (no RPC/network).")] +struct Args { + /// Number of wallet addresses to seed into AddressManager. + #[arg(long, default_value_t = 1_000_000)] + addresses: u32, + + /// Number of UTXOs to generate and feed into UtxoManager::update_utxo_set. + #[arg(long, default_value_t = 10_000_000)] + utxos: u32, + + /// Progress log cadence (0 disables progress logs). + #[arg(long, default_value_t = 100_000)] + progress_every: u32, + + /// Required safety flag because this benchmark can use MANY GiB of RAM. + #[arg(long)] + i_understand: bool, + + /// Run a contention scenario: measure read latencies while a second `update_utxo_set` runs. + #[arg(long)] + contend: bool, + + /// Number of concurrent reader tasks when running `--contend`. + #[arg(long, default_value_t = 4)] + contend_readers: u32, + + /// Sampling interval (microseconds) for read latency measurements when running `--contend`. + #[arg(long, default_value_t = 100)] + contend_sample_interval_micros: u64, +} + +fn address_for_index(prefix: AddressPrefix, i: u32) -> Address { + let mut payload = [0u8; 32]; + payload[..4].copy_from_slice(&i.to_le_bytes()); + Address::new(prefix, Version::PubKey, &payload) +} + +fn txid(i: u32) -> TransactionId { + let mut bytes = [0u8; 32]; + bytes[..4].copy_from_slice(&i.to_le_bytes()); + TransactionId::from_bytes(bytes) +} + +fn summarize_latencies(name: &str, mut samples_ns: Vec) { + if samples_ns.is_empty() { + println!(" {name}: no samples"); + return; + } + samples_ns.sort_unstable(); + let n = samples_ns.len(); + let p99 = samples_ns[((n - 1) * 99) / 100]; + let p999 = samples_ns[((n - 1) * 999) / 1000]; + let max = *samples_ns.last().unwrap(); + + let p99_us = (p99 as f64) / 1_000.0; + let p999_us = (p999 as f64) / 1_000.0; + let max_us = (max as f64) / 1_000.0; + + println!(" {name}: samples={n} p99={p99_us:.3}µs p999={p999_us:.3}µs max={max_us:.3}µs"); +} + +fn main() { + let args = Args::parse(); + if !args.i_understand { + eprintln!( + "Refusing to run without --i-understand (this can use MANY GiB of RAM). \ +Example:\n RUSTC_WRAPPER= CARGO_TARGET_DIR=target cargo run -p kaswallet-daemon --features bench --release --bin kaswallet-stress-bench -- --i-understand --addresses 1000000 --utxos 10000000" + ); + std::process::exit(2); + } + if args.addresses == 0 { + eprintln!("--addresses must be > 0"); + std::process::exit(2); + } + + let rt = Runtime::new().expect("tokio runtime"); + let prefix = AddressPrefix::Mainnet; + + // We don't derive addresses here; we seed AddressManager directly. + let keys = Arc::new(Keys::new( + "bench-unused-keys.json".to_string(), + 1, + vec![], + XPubPrefix::XPUB, + vec![], + 0, + 0, + 1, + 0, + )); + + let address_manager = AddressManager::new(keys, prefix); + + println!( + "kaswallet-stress-bench: seeding addresses={} utxos={}", + args.addresses, args.utxos + ); + + let start = Instant::now(); + rt.block_on(async { + for i in 0..args.addresses { + let address = address_for_index(prefix, i); + let wa = WalletAddress::new(i, 0, Keychain::External); + address_manager.insert_address_for_bench(address, wa).await; + + if args.progress_every > 0 && (i + 1) % args.progress_every == 0 { + println!(" seeded {} addresses", i + 1); + } + } + }); + println!("Seeded addresses in {:?}", start.elapsed()); + + // Build and warm the monitored-address caches (both Vec
and HashMap). + let start = Instant::now(); + let monitored = rt + .block_on(address_manager.monitored_addresses()) + .expect("monitored_addresses"); + println!( + "monitored_addresses first build: {:?} (len={})", + start.elapsed(), + monitored.len() + ); + + let start = Instant::now(); + let monitored2 = rt + .block_on(address_manager.monitored_addresses()) + .expect("monitored_addresses cached"); + println!( + "monitored_addresses cached: {:?} (same_arc={})", + start.elapsed(), + Arc::ptr_eq(&monitored, &monitored2) + ); + + let start = Instant::now(); + let by_address = rt + .block_on(address_manager.monitored_address_map()) + .expect("monitored_address_map"); + println!( + "monitored_address_map cached: {:?} (len={})", + start.elapsed(), + by_address.len() + ); + + let address_manager = Arc::new(Mutex::new(address_manager)); + let utxo_manager = Arc::new(UtxoManager::new_for_bench(address_manager)); + + let empty_spk = ScriptPublicKey::from_vec(0, vec![]); + + println!("Generating {} UTXO entries...", args.utxos); + let start = Instant::now(); + let mut entries: Vec = Vec::with_capacity(args.utxos as usize); + for i in 0..args.utxos { + let address_index = i % args.addresses; + let address = address_for_index(prefix, address_index); + + let outpoint = RpcTransactionOutpoint { + transaction_id: txid(i), + index: i, + }; + let amount = ((i % 10_000) + 1) as u64; + let utxo_entry = RpcUtxoEntry::new(amount, empty_spk.clone(), 0, false); + + entries.push(RpcUtxosByAddressesEntry { + address: Some(address), + outpoint, + utxo_entry, + }); + + if args.progress_every > 0 && (i + 1) % args.progress_every == 0 { + println!(" generated {} utxos", i + 1); + } + } + println!("Generated UTXO entries in {:?}", start.elapsed()); + + println!("Running update_utxo_set..."); + let start = Instant::now(); + rt.block_on(utxo_manager.update_utxo_set(entries, vec![])) + .expect("update_utxo_set"); + let state = rt.block_on(utxo_manager.state()); + println!( + "update_utxo_set: {:?} (utxos_by_outpoint={})", + start.elapsed(), + state.utxos_by_outpoint().len() + ); + + // Minimal sanity check to keep the compiler honest and confirm the sorted index exists. + let mut sum = 0u64; + for utxo in state.utxos_sorted_by_amount().take(1000) { + sum = sum.wrapping_add(utxo.utxo_entry.amount); + } + println!("sanity: sum(first 1000 amounts)={sum}"); + + if !args.contend { + return; + } + + if args.contend_readers == 0 { + eprintln!("--contend-readers must be > 0"); + std::process::exit(2); + } + + println!( + "Starting contention run: readers={} sample_interval={}µs", + args.contend_readers, args.contend_sample_interval_micros + ); + + let stop = Arc::new(AtomicBool::new(false)); + let utxo_manager_clone = Arc::clone(&utxo_manager); + + // Keep one wallet-local pending tx so `state_with_mempool()` includes the overlay path. + rt.block_on(async { + let input_outpoint = RpcTransactionOutpoint { + transaction_id: txid(0), + index: 0, + }; + + let input = TransactionInput::new( + TransactionOutpoint::new(input_outpoint.transaction_id, input_outpoint.index), + vec![], + 0, + 1, + ); + let output = TransactionOutput::new(1, empty_spk.clone()); + let tx = Transaction::new( + 0, + vec![input], + vec![output], + 0, + Default::default(), + 0, + vec![], + ); + + let wallet_utxo_entry = WalletUtxoEntry::new(1, empty_spk.clone(), 0, false); + let input_entry: UtxoEntry = wallet_utxo_entry.into(); + let signable = SignableTransaction::with_entries(tx, vec![input_entry]); + + let wa0 = WalletAddress::new(0, 0, Keychain::External); + let a0 = address_for_index(prefix, 0); + let wallet_tx = WalletSignableTransaction::new_from_unsigned( + signable, + HashSet::new(), + vec![wa0], + vec![a0], + ); + utxo_manager_clone.add_mempool_transaction(&wallet_tx).await; + }); + + let contend_sample_interval = if args.contend_sample_interval_micros == 0 { + None + } else { + Some(core::time::Duration::from_micros( + args.contend_sample_interval_micros, + )) + }; + + let utxo_manager_for_update = Arc::clone(&utxo_manager); + let contend_address_count = args.addresses; + let contend_utxo_count = args.utxos; + let contend_prefix = prefix; + + let stop_clone = Arc::clone(&stop); + let update_handle = rt.spawn(async move { + println!("Contention: generating UTXOs for refresh..."); + let start = Instant::now(); + let empty_spk = ScriptPublicKey::from_vec(0, vec![]); + let mut entries: Vec = + Vec::with_capacity(contend_utxo_count as usize); + for i in 0..contend_utxo_count { + let address_index = i % contend_address_count; + let address = address_for_index(contend_prefix, address_index); + let outpoint = RpcTransactionOutpoint { + transaction_id: txid(i), + index: i, + }; + let amount = ((i % 10_000) + 1) as u64; + let utxo_entry = RpcUtxoEntry::new(amount, empty_spk.clone(), 0, false); + entries.push(RpcUtxosByAddressesEntry { + address: Some(address), + outpoint, + utxo_entry, + }); + } + println!("Contention: generated UTXOs in {:?}", start.elapsed()); + + println!("Contention: running update_utxo_set..."); + let start = Instant::now(); + utxo_manager_for_update + .update_utxo_set(entries, vec![]) + .await + .expect("contention update_utxo_set"); + let elapsed = start.elapsed(); + println!("Contention: update_utxo_set done in {elapsed:?}"); + + stop_clone.store(true, Relaxed); + elapsed + }); + + let mut reader_handles = Vec::new(); + for _ in 0..args.contend_readers { + let utxo_manager = Arc::clone(&utxo_manager); + let stop = Arc::clone(&stop); + let sample_interval = contend_sample_interval; + reader_handles.push(rt.spawn(async move { + let mut state_samples_ns: Vec = Vec::new(); + let mut mempool_samples_ns: Vec = Vec::new(); + + if let Some(interval_duration) = sample_interval { + let mut interval = tokio::time::interval(interval_duration); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { + interval.tick().await; + if stop.load(Relaxed) { + break; + } + let t0 = Instant::now(); + let state = utxo_manager.state().await; + std::hint::black_box(state.utxo_count()); + state_samples_ns.push(t0.elapsed().as_nanos() as u64); + + let t0 = Instant::now(); + let view = utxo_manager.state_with_mempool().await.unwrap(); + std::hint::black_box(view.utxo_count()); + mempool_samples_ns.push(t0.elapsed().as_nanos() as u64); + } + } else { + while !stop.load(Relaxed) { + let t0 = Instant::now(); + let state = utxo_manager.state().await; + std::hint::black_box(state.utxo_count()); + state_samples_ns.push(t0.elapsed().as_nanos() as u64); + + let t0 = Instant::now(); + let view = utxo_manager.state_with_mempool().await.unwrap(); + std::hint::black_box(view.utxo_count()); + mempool_samples_ns.push(t0.elapsed().as_nanos() as u64); + } + } + + (state_samples_ns, mempool_samples_ns) + })); + } + + let update_elapsed = rt.block_on(async { update_handle.await.expect("update task panicked") }); + println!("Contention: update_utxo_set elapsed = {update_elapsed:?}"); + + // Ensure readers stop even if the update task completed before they started. + stop.store(true, Relaxed); + + let mut merged_state_ns = Vec::new(); + let mut merged_mempool_ns = Vec::new(); + for handle in reader_handles { + let (state_ns, mempool_ns) = + rt.block_on(async { handle.await.expect("reader task panicked") }); + merged_state_ns.extend(state_ns); + merged_mempool_ns.extend(mempool_ns); + } + + println!("Read latency while update_utxo_set was running:"); + summarize_latencies("state().await + utxo_count", merged_state_ns); + summarize_latencies("state_with_mempool().await + utxo_count", merged_mempool_ns); +} diff --git a/daemon/src/daemon.rs b/daemon/src/daemon.rs index 8e0af00..b44ab01 100644 --- a/daemon/src/daemon.rs +++ b/daemon/src/daemon.rs @@ -81,11 +81,11 @@ impl Daemon { keys.clone(), address_prefix, ))); - let utxo_manager = Arc::new(Mutex::new(utxo_manager::UtxoManager::new( + let utxo_manager = Arc::new(utxo_manager::UtxoManager::new( address_manager.clone(), consensus_params, block_dag_info, - ))); + )); let transaction_generator = Arc::new(Mutex::new(TransactionGenerator::new( kaspa_rpc_client.clone(), keys.clone(), diff --git a/daemon/src/service/broadcast.rs b/daemon/src/service/broadcast.rs index ed8fff2..8846f71 100644 --- a/daemon/src/service/broadcast.rs +++ b/daemon/src/service/broadcast.rs @@ -10,10 +10,7 @@ impl KasWalletService { let signed_transactions: Vec<_> = request.transactions.into_iter().map(Into::into).collect(); - let mut utxo_manager = self.utxo_manager.lock().await; - let transaction_ids = self - .submit_transactions(&mut utxo_manager, &signed_transactions) - .await?; + let transaction_ids = self.submit_transactions(&signed_transactions).await?; Ok(BroadcastResponse { transaction_ids }) } diff --git a/daemon/src/service/common.rs b/daemon/src/service/common.rs index f040e67..cdb8a45 100644 --- a/daemon/src/service/common.rs +++ b/daemon/src/service/common.rs @@ -1,11 +1,9 @@ use crate::service::kaswallet_service::KasWalletService; -use crate::utxo_manager::UtxoManager; use common::errors::WalletError::UserInputError; use common::errors::{ResultExt, WalletResult}; use common::model::WalletSignableTransaction; use kaspa_consensus_core::sign::Signed::{Fully, Partially}; use kaspa_wallet_core::rpc::RpcApi; -use tokio::sync::MutexGuard; impl KasWalletService { pub(crate) async fn get_virtual_daa_score(&self) -> WalletResult { @@ -30,8 +28,7 @@ impl KasWalletService { pub(crate) async fn submit_transactions( &self, - utxo_manager: &mut MutexGuard<'_, UtxoManager>, - signed_transactions: &Vec, + signed_transactions: &[WalletSignableTransaction], ) -> WalletResult> { let _ = self.submit_transaction_mutex.lock().await; @@ -57,7 +54,7 @@ impl KasWalletService { transaction_ids.push(rpc_transaction_id.to_string()); - utxo_manager + self.utxo_manager .add_mempool_transaction(signed_transaction) .await; } diff --git a/daemon/src/service/create_unsigned_transaction.rs b/daemon/src/service/create_unsigned_transaction.rs index a7ef719..346d5a5 100644 --- a/daemon/src/service/create_unsigned_transaction.rs +++ b/daemon/src/service/create_unsigned_transaction.rs @@ -1,12 +1,11 @@ use crate::service::kaswallet_service::KasWalletService; -use crate::utxo_manager::UtxoManager; -use common::errors::WalletError::UserInputError; +use crate::utxo_manager::UtxoStateView; +use common::errors::WalletError::{InternalServerError, UserInputError}; use common::errors::WalletResult; use common::model::WalletSignableTransaction; use proto::kaswallet_proto::{ CreateUnsignedTransactionsRequest, CreateUnsignedTransactionsResponse, TransactionDescription, }; -use tokio::sync::MutexGuard; impl KasWalletService { pub(crate) async fn create_unsigned_transactions( @@ -21,12 +20,13 @@ impl KasWalletService { let transaction_description = request.transaction_description.unwrap(); let unsigned_transactions: Vec; { - let utxo_manager = self.utxo_manager.lock().await; + let utxo_state = self + .utxo_manager + .state_with_mempool() + .await + .map_err(|e| InternalServerError(e.to_string()))?; unsigned_transactions = self - .create_unsigned_transactions_from_description( - transaction_description, - &utxo_manager, - ) + .create_unsigned_transactions_from_description(transaction_description, &utxo_state) .await?; } @@ -38,13 +38,17 @@ impl KasWalletService { pub(crate) async fn create_unsigned_transactions_from_description( &self, transaction_description: TransactionDescription, - utxo_manager: &MutexGuard<'_, UtxoManager>, + utxo_state: &UtxoStateView, ) -> WalletResult> { self.check_is_synced().await?; let mut transaction_generator = self.transaction_generator.lock().await; transaction_generator - .create_unsigned_transactions(utxo_manager, transaction_description) + .create_unsigned_transactions( + self.utxo_manager.as_ref(), + utxo_state, + transaction_description, + ) .await } } diff --git a/daemon/src/service/get_balance.rs b/daemon/src/service/get_balance.rs index 926810a..0ef63a4 100644 --- a/daemon/src/service/get_balance.rs +++ b/daemon/src/service/get_balance.rs @@ -1,6 +1,5 @@ use crate::service::kaswallet_service::KasWalletService; -use common::errors::{ResultExt, WalletResult}; -use common::model::WalletUtxo; +use common::errors::{ResultExt, WalletError::InternalServerError, WalletResult}; use log::info; use proto::kaswallet_proto::{AddressBalances, GetBalanceRequest, GetBalanceResponse}; use std::collections::HashMap; @@ -15,43 +14,56 @@ impl KasWalletService { let virtual_daa_score = self.get_virtual_daa_score().await?; let mut balances_map = HashMap::new(); - let utxos_sorted_by_amount: Vec; - let utxos_count: usize; - { - let utxo_manager = self.utxo_manager.lock().await; - utxos_sorted_by_amount = utxo_manager.utxos_sorted_by_amount(); + let utxo_state = self + .utxo_manager + .state_with_mempool() + .await + .map_err(|e| InternalServerError(e.to_string()))?; - utxos_count = utxos_sorted_by_amount.len(); - for entry in utxos_sorted_by_amount { - let amount = entry.utxo_entry.amount; - let balances = balances_map - .entry(entry.address.clone()) - .or_insert_with(BalancesEntry::new); - if utxo_manager.is_utxo_pending(&entry, virtual_daa_score) { - balances.add_pending(amount); - } else { - balances.add_available(amount); - } + let utxos_count = utxo_state.utxo_count(); + if utxos_count == 0 { + info!("GetBalance request scanned 0 UTXOs overall over 0 addresses"); + return Ok(GetBalanceResponse { + available: 0, + pending: 0, + address_balances: vec![], + }); + } + + for utxo in utxo_state.utxos_iter() { + let amount = utxo.utxo_entry.amount; + let balances = balances_map + .entry(utxo.address.clone()) + .or_insert_with(BalancesEntry::new); + if self.utxo_manager.is_utxo_pending(utxo, virtual_daa_score) { + balances.add_pending(amount); + } else { + balances.add_available(amount); } } let mut address_balances = vec![]; let mut total_balances = BalancesEntry::new(); - let address_manager = self.address_manager.lock().await; - for (wallet_address, balances) in &balances_map { - let address = address_manager - .kaspa_address_from_wallet_address(wallet_address, true) - .await - .to_wallet_result_internal()?; + if request.include_balance_per_address { + let address_manager = self.address_manager.lock().await; + address_balances.reserve(balances_map.len()); + for (wallet_address, balances) in &balances_map { + let address = address_manager + .kaspa_address_from_wallet_address(wallet_address, true) + .await + .to_wallet_result_internal()?; - if request.include_balance_per_address { address_balances.push(AddressBalances { address: address.to_string(), available: balances.available, pending: balances.pending, }); + total_balances.add(balances); + } + } else { + for balances in balances_map.values() { + total_balances.add(balances); } - total_balances.add(balances); } info!( diff --git a/daemon/src/service/get_utxos.rs b/daemon/src/service/get_utxos.rs index 8cbfe37..063ca70 100644 --- a/daemon/src/service/get_utxos.rs +++ b/daemon/src/service/get_utxos.rs @@ -1,14 +1,14 @@ use crate::address_manager::AddressSet; use crate::service::kaswallet_service::KasWalletService; -use common::errors::WalletError::UserInputError; +use common::errors::WalletError::{InternalServerError, UserInputError}; use common::errors::{ResultExt, WalletResult}; -use common::model::WalletUtxo; +use common::model::WalletAddress; use kaspa_addresses::Address; use kaspa_wallet_core::rpc::RpcApi; use proto::kaswallet_proto::{ AddressToUtxos, GetUtxosRequest, GetUtxosResponse, Utxo as ProtoUtxo, }; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; impl KasWalletService { pub(crate) async fn get_utxos( @@ -24,8 +24,8 @@ impl KasWalletService { let address_manager = self.address_manager.lock().await; address_set = address_manager.address_set().await; } - let address_strings: Vec = if request.addresses.is_empty() { - address_set.keys().cloned().collect() + let allowed_addresses: Option> = if request.addresses.is_empty() { + None } else { for address in &request.addresses { if !address_set.contains_key(address) { @@ -35,8 +35,14 @@ impl KasWalletService { ))); } } - request.addresses + Some(request.addresses.iter().cloned().collect()) }; + let wallet_address_to_string: HashMap = address_set + .iter() + .map(|(address_string, wallet_address)| { + (wallet_address.clone(), address_string.clone()) + }) + .collect(); let fee_estimate = self .kaspa_client @@ -48,85 +54,70 @@ impl KasWalletService { let virtual_daa_score = self.get_virtual_daa_score().await?; - let utxos = { - let utxo_manager = self.utxo_manager.lock().await; - utxo_manager.utxos_sorted_by_amount() - }; - let filtered_bucketed_utxos = self - .filter_utxos_and_bucket_by_address( - &utxos, - fee_rate, - virtual_daa_score, - &address_strings, - request.include_pending, - request.include_dust, - ) - .await?; - - let addresses_to_utxos = filtered_bucketed_utxos - .iter() - .map(|(address_string, utxos)| AddressToUtxos { - address: address_string.clone(), - utxos: utxos.clone(), - }) - .collect(); + let utxo_state = self + .utxo_manager + .state_with_mempool() + .await + .map_err(|e| InternalServerError(e.to_string()))?; - Ok(GetUtxosResponse { addresses_to_utxos }) - } + let dust_fee_for_single_utxo = if request.include_dust { + None + } else { + let sample_utxo = { utxo_state.utxos_sorted_by_amount().next().cloned() }; + if let Some(sample_utxo) = sample_utxo { + let transaction_generator = self.transaction_generator.lock().await; + let mass = transaction_generator + .estimate_mass( + &vec![sample_utxo.clone()], + sample_utxo.utxo_entry.amount, + &[] as &[u8], + ) + .await?; + Some(((mass as f64) * fee_rate).ceil() as u64) + } else { + None + } + }; - async fn filter_utxos_and_bucket_by_address( - &self, - utxos: &Vec, - fee_rate: f64, - virtual_daa_score: u64, - address_strings: &[String], - include_pending: bool, - include_dust: bool, - ) -> WalletResult>> { - let mut filtered_bucketed_utxos = HashMap::new(); - for utxo in utxos { - let is_pending = { - let utxo_manager = self.utxo_manager.lock().await; - utxo_manager.is_utxo_pending(utxo, virtual_daa_score) - }; - if !include_pending && is_pending { + let mut filtered_bucketed_utxos: HashMap> = HashMap::new(); + for utxo in utxo_state.utxos_sorted_by_amount() { + let is_pending = self.utxo_manager.is_utxo_pending(utxo, virtual_daa_score); + if !request.include_pending && is_pending { continue; } - let is_dust = self.is_utxo_dust(utxo, fee_rate).await?; - if !include_dust && is_dust { + + let is_dust = + dust_fee_for_single_utxo.is_some_and(|dust_fee| dust_fee >= utxo.utxo_entry.amount); + if !request.include_dust && is_dust { continue; } - let address: String; - { - let address_manager = self.address_manager.lock().await; - address = address_manager - .kaspa_address_from_wallet_address(&utxo.address, true) - .await? - .address_to_string(); + let address = wallet_address_to_string.get(&utxo.address).ok_or_else(|| { + InternalServerError(format!( + "wallet address missing from address_set: {:?}", + utxo.address + )) + })?; + if let Some(allowed_addresses) = &allowed_addresses { + if !allowed_addresses.contains(address) { + continue; + } } - if !address_strings.is_empty() && !address_strings.contains(&address) { - continue; + match filtered_bucketed_utxos.get_mut(address) { + Some(bucket) => bucket.push(utxo.to_proto(is_pending, is_dust)), + None => { + filtered_bucketed_utxos + .insert(address.clone(), vec![utxo.to_proto(is_pending, is_dust)]); + } } - - let entry = filtered_bucketed_utxos - .entry(address) - .or_insert_with(Vec::new); - entry.push(utxo.to_owned().into_proto(is_pending, is_dust)); } - Ok(filtered_bucketed_utxos) - } - - async fn is_utxo_dust(&self, utxo: &WalletUtxo, fee_rate: f64) -> WalletResult { - let transaction_generator = self.transaction_generator.lock().await; - let mass = transaction_generator - .estimate_mass(&vec![utxo.clone()], utxo.utxo_entry.amount, &[]) - .await?; - - let fee = ((mass as f64) * fee_rate).ceil() as u64; + let addresses_to_utxos = filtered_bucketed_utxos + .into_iter() + .map(|(address, utxos)| AddressToUtxos { address, utxos }) + .collect(); - Ok(fee >= utxo.utxo_entry.amount) + Ok(GetUtxosResponse { addresses_to_utxos }) } } diff --git a/daemon/src/service/kaswallet_service.rs b/daemon/src/service/kaswallet_service.rs index f6d8e7b..1225508 100644 --- a/daemon/src/service/kaswallet_service.rs +++ b/daemon/src/service/kaswallet_service.rs @@ -22,7 +22,7 @@ pub struct KasWalletService { pub(crate) kaspa_client: Arc, pub(crate) keys: Arc, pub(crate) address_manager: Arc>, - pub(crate) utxo_manager: Arc>, + pub(crate) utxo_manager: Arc, pub(crate) transaction_generator: Arc>, pub(crate) sync_manager: Arc, pub(crate) submit_transaction_mutex: Mutex<()>, @@ -33,7 +33,7 @@ impl KasWalletService { kaspa_client: Arc, keys: Arc, address_manager: Arc>, - utxo_manager: Arc>, + utxo_manager: Arc, transaction_generator: Arc>, sync_manager: Arc, ) -> Self { diff --git a/daemon/src/service/send.rs b/daemon/src/service/send.rs index 85bd291..3869cc7 100644 --- a/daemon/src/service/send.rs +++ b/daemon/src/service/send.rs @@ -1,5 +1,5 @@ use crate::service::kaswallet_service::KasWalletService; -use common::errors::WalletError::UserInputError; +use common::errors::WalletError::{InternalServerError, UserInputError}; use common::errors::WalletResult; use log::{debug, error, info}; use proto::kaswallet_proto::{SendRequest, SendResponse}; @@ -7,10 +7,6 @@ use std::time::Instant; impl KasWalletService { pub(crate) async fn send(&self, request: SendRequest) -> WalletResult { - // lock utxo_manager at this point, so that if sync happens in the middle - it doesn't - // interfere with apply_transaction - let mut utxo_manager = self.utxo_manager.lock().await; - let send_start = Instant::now(); let transaction_description = match request.transaction_description { Some(description) => description, @@ -27,8 +23,13 @@ impl KasWalletService { debug!("Creating unsigned transactions..."); + let utxo_state = self + .utxo_manager + .state_with_mempool() + .await + .map_err(|e| InternalServerError(e.to_string()))?; let unsigned_transactions = self - .create_unsigned_transactions_from_description(transaction_description, &utxo_manager) + .create_unsigned_transactions_from_description(transaction_description, &utxo_state) .await?; debug!("Created {} transactions", unsigned_transactions.len()); @@ -39,9 +40,7 @@ impl KasWalletService { debug!("Transactions got signed!"); debug!("Submitting transactions..."); - let submit_transactions_result = self - .submit_transactions(&mut utxo_manager, &signed_transactions) - .await; + let submit_transactions_result = self.submit_transactions(&signed_transactions).await; if let Err(e) = submit_transactions_result { error!("Failed to submit transactions: {}", e); return Err(e); diff --git a/daemon/src/sync_manager.rs b/daemon/src/sync_manager.rs index 9e4c9c4..ba15462 100644 --- a/daemon/src/sync_manager.rs +++ b/daemon/src/sync_manager.rs @@ -1,4 +1,4 @@ -use crate::address_manager::{AddressManager, AddressSet}; +use crate::address_manager::{AddressManager, AddressQuerySet}; use crate::utxo_manager::UtxoManager; use common::keys::Keys; use kaspa_addresses::Address; @@ -21,11 +21,12 @@ pub struct SyncManager { kaspa_client: Arc, keys_file: Arc, address_manager: Arc>, - utxo_manager: Arc>, + utxo_manager: Arc, sync_interval_millis: u64, first_sync_done: AtomicBool, next_sync_start_index: AtomicU32, + recent_scan_next_index: AtomicU32, is_log_final_progress_line_shown: AtomicBool, max_used_addresses_for_log: AtomicU32, max_processed_addresses_for_log: AtomicU32, @@ -36,7 +37,7 @@ impl SyncManager { kaspa_rpc_client: Arc, keys_file: Arc, address_manager: Arc>, - utxo_manager: Arc>, + utxo_manager: Arc, sync_interval: u64, ) -> Self { Self { @@ -47,6 +48,7 @@ impl SyncManager { sync_interval_millis: sync_interval, first_sync_done: AtomicBool::new(false), next_sync_start_index: 0.into(), + recent_scan_next_index: 0.into(), is_log_final_progress_line_shown: false.into(), max_used_addresses_for_log: 0.into(), max_processed_addresses_for_log: 0.into(), @@ -94,19 +96,17 @@ impl SyncManager { async fn refresh_utxos(&self) -> Result<(), Box> { debug!("Refreshing UTXOs..."); - let address_strings: Vec; + let monitored_addresses: Arc>; { let address_manager = self.address_manager.lock().await; - address_strings = address_manager.address_strings().await?; + monitored_addresses = address_manager.monitored_addresses().await?; } - let addresses: Vec
= address_strings - .iter() - .map(|address_string| Address::constructor(address_string)) - .collect(); + let addresses: Vec
= monitored_addresses.as_ref().clone(); - // Lock utxo_manager at this stage, so that nobody tries to generate transactions while - // we update the utxo set - let mut utxo_manager = self.utxo_manager.lock().await; + if addresses.is_empty() { + self.utxo_manager.update_utxo_set(vec![], vec![]).await?; + return Ok(()); + } debug!("Getting mempool entries for addresses: {:?}...", addresses); // It's important to check the mempool before calling `GetUTXOsByAddresses`: @@ -135,7 +135,9 @@ impl SyncManager { self.kaspa_client.get_utxos_by_addresses(addresses).await?; debug!("Got {} utxo entries", get_utxo_by_addresses_response.len()); - utxo_manager + // `update_utxo_set` builds a new snapshot without holding any read locks and + // swaps the Arc pointer under a brief lock. + self.utxo_manager .update_utxo_set(get_utxo_by_addresses_response, mempool_entries_by_addresses) .await?; @@ -158,26 +160,73 @@ impl SyncManager { pub async fn collect_recent_addresses(&self) -> Result<(), Box> { debug!("Collecting recent addresses"); + if !self.first_sync_done.load(Relaxed) { + return self.collect_recent_addresses_full_scan().await; + } + self.collect_recent_addresses_incremental().await + } + + async fn collect_recent_addresses_full_scan(&self) -> Result<(), Box> { let mut index: u32 = 0; let mut max_used_index: u32 = 0; - while index < max_used_index + NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES { + while index < max_used_index.saturating_add(NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES) { self.collect_addresses(index, index + NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES) .await?; - index += NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES; + index = index.saturating_add(NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES); max_used_index = self.last_used_index().await; self.update_address_collection_progress_log(index, max_used_index); } - let next_sync_start_index = self.next_sync_start_index.load(Relaxed); - if index > next_sync_start_index { - self.next_sync_start_index.store(index, Relaxed); + self.bump_next_sync_start_index(index); + Ok(()) + } + + async fn collect_recent_addresses_incremental( + &self, + ) -> Result<(), Box> { + // After the initial full scan, we avoid rescanning from index 0 on every tick. Instead we + // scan a fixed-size chunk per cycle and advance a cursor, eventually covering the full + // range [0, last_used_index + LOOKAHEAD). + let last_used_index = self.last_used_index().await; + let frontier = Self::recent_scan_frontier(last_used_index); + + // Keep "synced" semantics stable as last_used_index grows. + self.bump_next_sync_start_index(frontier); + + let cursor = self.recent_scan_next_index.load(Relaxed); + let (start, end, next_cursor) = Self::recent_scan_step(frontier, cursor); + + debug!( + "Incremental recent-address scan: [{}, {}) (cursor={}, frontier={}, last_used_index={})", + start, end, cursor, frontier, last_used_index + ); + + if start < end { + self.collect_addresses(start, end).await?; } + self.recent_scan_next_index.store(next_cursor, Relaxed); Ok(()) } + fn recent_scan_frontier(last_used_index: u32) -> u32 { + last_used_index.saturating_add(NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES) + } + + fn recent_scan_step(frontier: u32, cursor: u32) -> (u32, u32, u32) { + if frontier == 0 { + return (0, 0, 0); + } + let start = if cursor >= frontier { 0 } else { cursor }; + let end = start + .saturating_add(NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES) + .min(frontier); + let next_cursor = if end >= frontier { 0 } else { end }; + (start, end, next_cursor) + } + pub async fn collect_far_addresses(&self) -> Result<(), Box> { debug!("Collecting far addresses"); @@ -195,6 +244,13 @@ impl SyncManager { Ok(()) } + fn bump_next_sync_start_index(&self, candidate: u32) { + let current = self.next_sync_start_index.load(Relaxed); + if candidate > current { + self.next_sync_start_index.store(candidate, Relaxed); + } + } + async fn collect_addresses( &self, start: u32, @@ -202,7 +258,7 @@ impl SyncManager { ) -> Result<(), Box> { debug!("Collecting addresses from {} to {}", start, end); - let addresses: AddressSet; + let addresses: AddressQuerySet; { let address_manager = self.address_manager.lock().await; addresses = address_manager.addresses_to_query(start, end).await?; @@ -211,12 +267,7 @@ impl SyncManager { let get_balances_by_addresses_response = self .kaspa_client - .get_balances_by_addresses( - addresses - .keys() - .map(|address_string| Address::constructor(address_string)) - .collect(), - ) + .get_balances_by_addresses(addresses.keys().cloned().collect()) .await?; let address_manager = self.address_manager.lock().await; @@ -260,7 +311,7 @@ impl SyncManager { * 100.0; info!( - "{} addressed of {} of processed ({:.2}%)", + "{} addresses of {} processed ({:.2}%)", self.max_processed_addresses_for_log.load(Relaxed), self.max_used_addresses_for_log.load(Relaxed), percent_processed @@ -268,3 +319,63 @@ impl SyncManager { } } } + +#[cfg(test)] +mod tests { + use super::SyncManager; + + #[test] + fn recent_scan_frontier_saturates_at_u32_max() { + let lookahead = super::NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES; + assert_eq!(SyncManager::recent_scan_frontier(u32::MAX), u32::MAX); + assert_eq!( + SyncManager::recent_scan_frontier(u32::MAX - lookahead + 1), + u32::MAX + ); + } + + #[test] + fn recent_scan_step_clamps_end_to_frontier_and_wraps() { + let lookahead = super::NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES; + let frontier = lookahead / 2; + let (start, end, next_cursor) = SyncManager::recent_scan_step(frontier, 0); + assert_eq!(start, 0); + assert_eq!(end, frontier); + assert_eq!(next_cursor, 0); + } + + #[test] + fn recent_scan_step_advances_cursor_in_chunks_and_wraps() { + let lookahead = super::NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES; + let frontier = lookahead * 2 + 500; + + let (start1, end1, cursor1) = SyncManager::recent_scan_step(frontier, 0); + assert_eq!((start1, end1, cursor1), (0, lookahead, lookahead)); + + let (start2, end2, cursor2) = SyncManager::recent_scan_step(frontier, cursor1); + assert_eq!( + (start2, end2, cursor2), + (lookahead, lookahead * 2, lookahead * 2) + ); + + let (start3, end3, cursor3) = SyncManager::recent_scan_step(frontier, cursor2); + assert_eq!((start3, end3, cursor3), (lookahead * 2, frontier, 0)); + + let (start4, end4, cursor4) = SyncManager::recent_scan_step(frontier, cursor3); + assert_eq!((start4, end4, cursor4), (0, lookahead, lookahead)); + } + + #[test] + fn recent_scan_step_resets_stale_cursor_to_zero() { + let lookahead = super::NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES; + let frontier = lookahead; + let (start, end, next_cursor) = SyncManager::recent_scan_step(frontier, lookahead * 2); + assert_eq!((start, end, next_cursor), (0, frontier, 0)); + } + + #[test] + fn recent_scan_step_zero_frontier_is_empty() { + assert_eq!(SyncManager::recent_scan_step(0, 0), (0, 0, 0)); + assert_eq!(SyncManager::recent_scan_step(0, 123), (0, 0, 0)); + } +} diff --git a/daemon/src/transaction_generator.rs b/daemon/src/transaction_generator.rs index a96ab3d..f7ce1a1 100644 --- a/daemon/src/transaction_generator.rs +++ b/daemon/src/transaction_generator.rs @@ -1,5 +1,5 @@ use crate::address_manager::AddressManager; -use crate::utxo_manager::UtxoManager; +use crate::utxo_manager::{UtxoManager, UtxoStateView}; use common::errors::WalletError::{SanityCheckFailed, UserInputError}; use common::errors::{ResultExt, WalletError, WalletResult}; use common::keys::Keys; @@ -24,7 +24,7 @@ use proto::kaswallet_proto::{FeePolicy, Outpoint, TransactionDescription, fee_po use std::cmp::min; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use tokio::sync::{Mutex, MutexGuard}; +use tokio::sync::Mutex; // The current minimal fee rate according to mempool standards const MIN_FEE_RATE: f64 = 1.0; @@ -69,7 +69,8 @@ impl TransactionGenerator { pub async fn create_unsigned_transactions( &mut self, - utxo_manager: &MutexGuard<'_, UtxoManager>, + utxo_manager: &UtxoManager, + utxo_state: &UtxoStateView, transaction_description: TransactionDescription, ) -> WalletResult> { let validate_address = |address_string, name| -> WalletResult
{ @@ -113,9 +114,9 @@ impl TransactionGenerator { HashMap::new() } else { let mut preselected_utxos = HashMap::new(); - let utxos_by_outpoint = utxo_manager.utxos_by_outpoint(); for preselected_outpoint in &transaction_description.utxos { - if let Some(utxo) = utxos_by_outpoint.get(&preselected_outpoint.clone().into()) { + let outpoint: WalletOutpoint = preselected_outpoint.clone().into(); + if let Some(utxo) = utxo_state.get_utxo_by_outpoint(&outpoint) { preselected_utxos.insert(utxo.outpoint.clone(), utxo.clone()); } else { return Err(UserInputError(format!( @@ -145,6 +146,7 @@ impl TransactionGenerator { (selected_utxos, amount_sent_to_recipient, change_sompi) = self .select_utxos( utxo_manager, + utxo_state, &preselected_utxos, transaction_description.amount, transaction_description.is_send_all, @@ -178,6 +180,7 @@ impl TransactionGenerator { let unsigned_transactions = self .maybe_auto_compound_transaction( utxo_manager, + utxo_state, unsigned_transaction, &selected_utxos, from_addresses, @@ -198,7 +201,8 @@ impl TransactionGenerator { #[allow(clippy::too_many_arguments)] async fn maybe_auto_compound_transaction( &self, - utxo_manager: &MutexGuard<'_, UtxoManager>, + utxo_manager: &UtxoManager, + utxo_state: &UtxoStateView, original_wallet_transaction: WalletSignableTransaction, original_selected_utxos: &Vec, from_addresses: Vec<&WalletAddress>, @@ -267,6 +271,7 @@ impl TransactionGenerator { let merge_transaction = self .merge_transaction( utxo_manager, + utxo_state, &split_transactions, &original_consensus_transaction.tx, original_selected_utxos, @@ -285,6 +290,7 @@ impl TransactionGenerator { // Recursion will be 2-3 iterations deep even in the rarest cases, so considered safe... let split_merge_transaction = Box::pin(self.maybe_auto_compound_transaction( utxo_manager, + utxo_state, merge_transaction, original_selected_utxos, from_addresses, @@ -309,7 +315,8 @@ impl TransactionGenerator { #[allow(clippy::too_many_arguments)] async fn merge_transaction( &self, - utxo_manager: &MutexGuard<'_, UtxoManager>, + utxo_manager: &UtxoManager, + utxo_state: &UtxoStateView, split_transactions: &[WalletSignableTransaction], original_consensus_transaction: &Transaction, original_selected_utxos: &[WalletUtxo], @@ -412,6 +419,7 @@ impl TransactionGenerator { let (additional_utxos, total_value_added) = self .more_utxos_for_merge_transaction( utxo_manager, + utxo_state, original_consensus_transaction, original_selected_utxos, from_addresses, @@ -458,9 +466,11 @@ impl TransactionGenerator { } // Returns: (additional_utxos, total_Value_added) + #[allow(clippy::too_many_arguments)] async fn more_utxos_for_merge_transaction( &self, - utxo_manager: &MutexGuard<'_, UtxoManager>, + utxo_manager: &UtxoManager, + utxo_state: &UtxoStateView, original_consensus_transaction: &Transaction, original_selected_utxos: &[WalletUtxo], from_addresses: &[&WalletAddress], @@ -478,20 +488,30 @@ impl TransactionGenerator { .await; let fee_per_input = (mass_per_input as f64 * fee_rate).ceil() as u64; - let utxos_sorted_by_amount = utxo_manager.utxos_sorted_by_amount(); - let already_selected_utxos = - HashSet::::from_iter(original_selected_utxos.iter().cloned()); + let already_selected_outpoints = HashSet::::from_iter( + original_selected_utxos + .iter() + .map(|utxo| utxo.outpoint.clone()), + ); + + let from_addresses_set: Option> = if from_addresses.is_empty() { + None + } else { + Some(from_addresses.iter().map(|wa| (*wa).clone()).collect()) + }; let mut additional_utxos = vec![]; let mut total_value_added = 0; - for utxo in utxos_sorted_by_amount { - if already_selected_utxos.contains(&utxo) - || utxo_manager.is_utxo_pending(&utxo, dag_info.virtual_daa_score) + for utxo in utxo_state.utxos_sorted_by_amount() { + if already_selected_outpoints.contains(&utxo.outpoint) + || utxo_manager.is_utxo_pending(utxo, dag_info.virtual_daa_score) { continue; } - if !from_addresses.is_empty() && !from_addresses.contains(&&utxo.address) { - continue; + if let Some(ref allowed_set) = from_addresses_set { + if !allowed_set.contains(&utxo.address) { + continue; + } } additional_utxos.push(utxo.clone()); @@ -784,7 +804,8 @@ impl TransactionGenerator { #[allow(clippy::too_many_arguments)] pub async fn select_utxos( &mut self, - utxo_manager: &MutexGuard<'_, UtxoManager>, + utxo_manager: &UtxoManager, + utxo_state: &UtxoStateView, preselected_utxos: &HashMap, amount: u64, is_send_all: bool, @@ -804,6 +825,12 @@ impl TransactionGenerator { let mut total_value = 0; let mut selected_utxos = vec![]; + let from_addresses_set: Option> = if from_addresses.is_empty() { + None + } else { + Some(from_addresses.iter().map(|wa| (*wa).clone()).collect()) + }; + let dag_info = self .kaspa_client .get_block_dag_info() @@ -813,11 +840,12 @@ impl TransactionGenerator { let mut fee = 0; let mut fee_per_utxo = None; let mut iteration = async |transaction_generator: &mut TransactionGenerator, - utxo_manager: &MutexGuard, utxo: &WalletUtxo| -> WalletResult { - if !from_addresses.is_empty() && !from_addresses.contains(&&utxo.address) { - return Ok(true); + if let Some(ref allowed_set) = from_addresses_set { + if !allowed_set.contains(&utxo.address) { + return Ok(true); + } } if utxo_manager.is_utxo_pending(utxo, dag_info.virtual_daa_score) { return Ok(true); @@ -858,16 +886,19 @@ impl TransactionGenerator { } Ok(true) }; - let owned_utxos = utxo_manager.utxos_sorted_by_amount(); - let available_utxos: Vec<_> = if !preselected_utxos.is_empty() { - preselected_utxos.values().collect() + if !preselected_utxos.is_empty() { + for utxo in preselected_utxos.values() { + let should_continue = iteration(self, utxo).await?; + if !should_continue { + break; + } + } } else { - owned_utxos.iter().collect() - }; - for utxo in available_utxos { - let should_continue = iteration(self, utxo_manager, utxo).await?; - if !should_continue { - break; + for utxo in utxo_state.utxos_sorted_by_amount() { + let should_continue = iteration(self, utxo).await?; + if !should_continue { + break; + } } } diff --git a/daemon/src/utxo_manager.rs b/daemon/src/utxo_manager.rs index 5d43d64..bbbfd98 100644 --- a/daemon/src/utxo_manager.rs +++ b/daemon/src/utxo_manager.rs @@ -1,24 +1,250 @@ -use crate::address_manager::{AddressManager, AddressSet}; -use common::model::{WalletOutpoint, WalletSignableTransaction, WalletUtxo, WalletUtxoEntry}; -use itertools::Itertools; +use crate::address_manager::AddressManager; +use common::model::{ + WalletAddress, WalletOutpoint, WalletSignableTransaction, WalletUtxo, WalletUtxoEntry, +}; +use kaspa_addresses::Address; use kaspa_consensus_core::config::params::Params; use kaspa_rpc_core::{GetBlockDagInfoResponse, RpcMempoolEntryByAddress, RpcUtxosByAddressesEntry}; use std::collections::{HashMap, HashSet}; use std::error::Error; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, RwLock}; + +#[derive(Debug, Default)] +pub struct UtxoState { + pub(crate) utxos_by_outpoint: HashMap, + // Sorted by (amount, outpoint) so the outpoint can be used as a deterministic tiebreaker. + pub(crate) utxo_keys_sorted_by_amount: Vec<(u64, WalletOutpoint)>, +} + +impl UtxoState { + pub fn new_empty() -> Self { + Self::default() + } + + pub fn utxo_count(&self) -> usize { + self.utxos_by_outpoint.len() + } + + pub fn utxos_by_outpoint(&self) -> &HashMap { + &self.utxos_by_outpoint + } + + pub fn get_utxo_by_outpoint(&self, outpoint: &WalletOutpoint) -> Option<&WalletUtxo> { + self.utxos_by_outpoint.get(outpoint) + } + + pub fn utxos_sorted_by_amount(&self) -> impl Iterator + '_ { + self.utxo_keys_sorted_by_amount.iter().map(|(_, outpoint)| { + self.utxos_by_outpoint + .get(outpoint) + .expect("utxo_keys_sorted_by_amount contains unknown outpoint") + }) + } +} + +pub struct UtxoStateView { + base_state: Arc, + removed_utxos: HashSet, + added_utxos: HashMap, + added_keys_sorted_by_amount: Vec<(u64, WalletOutpoint)>, +} + +impl UtxoStateView { + pub fn new(base_state: Arc) -> Self { + Self { + base_state, + removed_utxos: HashSet::new(), + added_utxos: HashMap::new(), + added_keys_sorted_by_amount: Vec::new(), + } + } + + pub fn from_mempool_overlay( + base_state: Arc, + mempool_txs: &[WalletSignableTransaction], + address_map: &HashMap, + ) -> Self { + let mut removed_utxos = HashSet::new(); + let mut added_utxos = HashMap::new(); + let mut added_keys_sorted_by_amount = Vec::new(); + + for tx in mempool_txs { + let consensus_tx = &tx.transaction.unwrap_ref().tx; + + // Inputs are removed from the effective UTXO set. + for input in &consensus_tx.inputs { + removed_utxos.insert(input.previous_outpoint.into()); + } + + // Outputs are added if they pay to one of our addresses. + for (i, output) in consensus_tx.outputs.iter().enumerate() { + let kaspa_address = &tx.address_by_output_index[i]; + let Some(wallet_address) = address_map.get(kaspa_address) else { + continue; + }; + + let outpoint = WalletOutpoint { + transaction_id: consensus_tx.id(), + index: i as u32, + }; + let utxo_entry = WalletUtxoEntry { + amount: output.value, + script_public_key: output.script_public_key.clone(), + block_daa_score: 0, + is_coinbase: false, + }; + let utxo = WalletUtxo::new(outpoint.clone(), utxo_entry, wallet_address.clone()); + + let previous = added_utxos.insert(outpoint.clone(), utxo); + debug_assert!( + previous.is_none(), + "mempool overlay inserted outpoint twice" + ); + added_keys_sorted_by_amount.push((output.value, outpoint)); + } + } + + added_keys_sorted_by_amount.sort_unstable(); + + Self { + base_state, + removed_utxos, + added_utxos, + added_keys_sorted_by_amount, + } + } + + pub fn base_state(&self) -> &Arc { + &self.base_state + } + + pub fn utxo_count(&self) -> usize { + let removed_in_base = self + .removed_utxos + .iter() + .filter(|outpoint| self.base_state.utxos_by_outpoint.contains_key(*outpoint)) + .count(); + self.base_state.utxos_by_outpoint.len() - removed_in_base + self.added_utxos.len() + } + + pub fn get_utxo_by_outpoint(&self, outpoint: &WalletOutpoint) -> Option<&WalletUtxo> { + if self.removed_utxos.contains(outpoint) { + return None; + } + if let Some(utxo) = self.added_utxos.get(outpoint) { + return Some(utxo); + } + self.base_state.utxos_by_outpoint.get(outpoint) + } + + pub fn utxos_iter(&self) -> impl Iterator + '_ { + let base_iter = self + .base_state + .utxos_by_outpoint + .values() + .filter(|utxo| !self.removed_utxos.contains(&utxo.outpoint)); + base_iter.chain(self.added_utxos.values()) + } + + pub fn utxos_sorted_by_amount(&self) -> UtxosSortedByAmountIter<'_> { + UtxosSortedByAmountIter { + view: self, + base_index: 0, + added_index: 0, + } + } +} + +pub struct UtxosSortedByAmountIter<'a> { + view: &'a UtxoStateView, + base_index: usize, + added_index: usize, +} + +impl<'a> Iterator for UtxosSortedByAmountIter<'a> { + type Item = &'a WalletUtxo; + + fn next(&mut self) -> Option { + let base_keys = self.view.base_state.utxo_keys_sorted_by_amount.as_slice(); + let added_keys = self.view.added_keys_sorted_by_amount.as_slice(); + + let next_base = loop { + if self.base_index >= base_keys.len() { + break None; + } + let (amount, outpoint) = &base_keys[self.base_index]; + if self.view.removed_utxos.contains(outpoint) { + self.base_index += 1; + continue; + } + break Some((amount, outpoint)); + }; + + let next_added = if self.added_index < added_keys.len() { + let (amount, outpoint) = &added_keys[self.added_index]; + Some((amount, outpoint)) + } else { + None + }; + + match (next_base, next_added) { + (None, None) => None, + (Some((_amount, outpoint)), None) => { + self.base_index += 1; + Some( + self.view + .base_state + .utxos_by_outpoint + .get(outpoint) + .expect("utxo_keys_sorted_by_amount contains unknown outpoint"), + ) + } + (None, Some((_amount, outpoint))) => { + self.added_index += 1; + Some( + self.view + .added_utxos + .get(outpoint) + .expect("added_keys_sorted_by_amount contains unknown outpoint"), + ) + } + (Some((base_amount, base_outpoint)), Some((added_amount, added_outpoint))) => { + let base_key = (*base_amount, (*base_outpoint).clone()); + let added_key = (*added_amount, (*added_outpoint).clone()); + if base_key <= added_key { + self.base_index += 1; + Some( + self.view + .base_state + .utxos_by_outpoint + .get(base_outpoint) + .expect("utxo_keys_sorted_by_amount contains unknown outpoint"), + ) + } else { + self.added_index += 1; + Some( + self.view + .added_utxos + .get(added_outpoint) + .expect("added_keys_sorted_by_amount contains unknown outpoint"), + ) + } + } + } + } +} pub struct UtxoManager { address_manager: Arc>, coinbase_maturity: u64, // Is different in testnet - utxos_sorted_by_amount: Vec, - utxos_by_outpoint: HashMap, - // Since the sync stage of address collection only picks up on addresses that have accepted - // (non-mempool) balance, we might miss some mempool outputs that are not yet accepted. - // To mitigate this we maintain a list of mempool transactions generated by this wallet - // that should be accepted soon, but are not yet accepted by consensus. - mempool_transactions: Vec, + // Consensus snapshot (already includes node mempool effects from refresh). + state: RwLock>, + + // Wallet-generated, not-yet-accepted transactions. Applied as a lightweight overlay. + // Stored separately because cloning the whole UTXO set per mempool tx is not viable at scale. + mempool_transactions: Mutex>, } impl UtxoManager { @@ -34,96 +260,55 @@ impl UtxoManager { Self { address_manager, coinbase_maturity, - utxos_sorted_by_amount: Vec::new(), - utxos_by_outpoint: HashMap::new(), - mempool_transactions: Vec::new(), + state: RwLock::new(Arc::new(UtxoState::new_empty())), + mempool_transactions: Mutex::new(Vec::new()), } } - pub fn utxos_sorted_by_amount(&self) -> Vec { - self.utxos_sorted_by_amount.clone() - } - - pub fn utxos_by_outpoint(&self) -> HashMap { - self.utxos_by_outpoint.clone() + #[cfg(any(test, feature = "bench"))] + pub fn new_for_bench(address_manager: Arc>) -> Self { + Self { + address_manager, + coinbase_maturity: 0, + state: RwLock::new(Arc::new(UtxoState::new_empty())), + mempool_transactions: Mutex::new(Vec::new()), + } } - pub async fn add_mempool_transaction(&mut self, transaction: &WalletSignableTransaction) { - self.mempool_transactions.push(transaction.clone()); - self.apply_mempool_transaction(transaction).await; + pub async fn state(&self) -> Arc { + let guard = self.state.read().await; + Arc::clone(&*guard) } - async fn apply_mempool_transaction(&mut self, transaction: &WalletSignableTransaction) { - let tx = &transaction.transaction.unwrap_ref().tx; + pub async fn state_with_mempool(&self) -> Result> { + let base_state = self.state().await; - for input in &tx.inputs { - let outpoint = input.previous_outpoint; - self.remove_utxo(&outpoint.into()); - } - - for (i, output) in tx.outputs.iter().enumerate() { - let address_string = transaction.address_by_output_index[i].to_string(); - let wallet_address = { - let address_manager = self.address_manager.lock().await; - address_manager - .wallet_address_from_string(&address_string) - .await - }; - if wallet_address.is_none() { - // this means payment is not to this wallet - continue; + let mempool_txs = { + let guard = self.mempool_transactions.lock().await; + if guard.is_empty() { + return Ok(UtxoStateView::new(base_state)); } - let wallet_address = wallet_address.unwrap(); - let outpoint = WalletOutpoint { - transaction_id: tx.id(), - index: i as u32, - }; - let utxo = WalletUtxo::new( - outpoint.clone(), - WalletUtxoEntry { - amount: output.value, - script_public_key: output.script_public_key.clone(), - block_daa_score: 0, - is_coinbase: false, - }, - wallet_address, - ); - self.insert_utxo(outpoint, utxo); - } - } - - fn insert_utxo(&mut self, outpoint: WalletOutpoint, utxo: WalletUtxo) { - self.utxos_by_outpoint.insert(outpoint, utxo.clone()); - let position = self - .utxos_sorted_by_amount - .binary_search_by(|existing_utxo| { - existing_utxo.utxo_entry.amount.cmp(&utxo.utxo_entry.amount) - }); - let position = position.unwrap_or_else(|position| position); - self.utxos_sorted_by_amount.insert(position, utxo); - } + guard.clone() + }; - fn contains_utxo(&self, outpoint: &WalletOutpoint) -> bool { - self.utxos_by_outpoint.contains_key(outpoint) - } - - fn remove_utxo(&mut self, outpoint: &WalletOutpoint) { - self.utxos_by_outpoint.remove(outpoint).unwrap(); - let (position, _) = self - .utxos_sorted_by_amount - .iter() - .find_position(|existing_utxo| existing_utxo.outpoint.eq(outpoint)) - .unwrap(); - self.utxos_sorted_by_amount.remove(position); + // Map Address -> WalletAddress using the cached map (no per-output string parsing). + let address_map: Arc> = { + let address_manager = self.address_manager.lock().await; + address_manager.monitored_address_map().await? + }; + + Ok(UtxoStateView::from_mempool_overlay( + base_state, + &mempool_txs, + &address_map, + )) } pub async fn update_utxo_set( - &mut self, + &self, rpc_utxo_entries: Vec, rpc_mempool_utxo_entries: Vec, ) -> Result<(), Box> { - let mut wallet_utxos: Vec = vec![]; - let mut exclude: HashSet = HashSet::new(); for rpc_mempool_entries_by_address in &rpc_mempool_utxo_entries { for sending_rpc_mempool_entry in &rpc_mempool_entries_by_address.sending { @@ -133,27 +318,41 @@ impl UtxoManager { } } - let address_set: AddressSet; - { + let address_map: Arc> = { let address_manager = self.address_manager.lock().await; - address_set = address_manager.address_set().await; - } + address_manager.monitored_address_map().await? + }; - for rpc_utxo_entry in &rpc_utxo_entries { + // Build new state without holding any UTXO locks. + let mut utxos_by_outpoint: HashMap = + HashMap::with_capacity(rpc_utxo_entries.len()); + let mut utxo_keys_sorted_by_amount: Vec<(u64, WalletOutpoint)> = + Vec::with_capacity(rpc_utxo_entries.len()); + + for rpc_utxo_entry in rpc_utxo_entries { let wallet_outpoint: WalletOutpoint = rpc_utxo_entry.outpoint.into(); if exclude.contains(&wallet_outpoint) { continue; } - let wallet_utxo_entry: WalletUtxoEntry = rpc_utxo_entry.utxo_entry.clone().into(); + let wallet_utxo_entry: WalletUtxoEntry = rpc_utxo_entry.utxo_entry.into(); + let amount = wallet_utxo_entry.amount; + + let Some(address) = rpc_utxo_entry.address else { + continue; + }; + let wallet_address = address_map + .get(&address) + .ok_or_else(|| format!("UTXO address {} not found in wallet address_set", address))? + .clone(); - let address = address_set - .get(&rpc_utxo_entry.address.as_ref().unwrap().to_string()) - .unwrap(); + let wallet_utxo = + WalletUtxo::new(wallet_outpoint.clone(), wallet_utxo_entry, wallet_address); - let wallet_utxo = WalletUtxo::new(wallet_outpoint, wallet_utxo_entry, address.clone()); + let previous = utxos_by_outpoint.insert(wallet_outpoint.clone(), wallet_utxo); + debug_assert!(previous.is_none(), "UTXO outpoint inserted twice"); - wallet_utxos.push(wallet_utxo); + utxo_keys_sorted_by_amount.push((amount, wallet_outpoint)); } for rpc_mempool_entry in rpc_mempool_utxo_entries { @@ -166,18 +365,16 @@ impl UtxoManager { let Some(output_verbose_data) = &output.verbose_data else { panic!("output verbose data missing") }; - let address_manager = self.address_manager.lock().await; - let address = address_manager - .wallet_address_from_string( - &output_verbose_data - .script_public_key_address - .address_to_string(), - ) - .await; - if address.is_none() { + let address_string = output_verbose_data + .script_public_key_address + .address_to_string(); + let address = Address::try_from(address_string.as_str()).map_err(|err| { + format!("invalid address in mempool output ({address_string}): {err}") + })?; + let Some(wallet_address) = address_map.get(&address) else { // this means this output is not to this wallet continue; - } + }; let wallet_outpoint = WalletOutpoint::new(transaction_verbose_data.transaction_id, i as u32); @@ -192,53 +389,274 @@ impl UtxoManager { false, ); - let utxo = WalletUtxo::new(wallet_outpoint, utxo_entry, address.unwrap()); + let utxo = WalletUtxo::new( + wallet_outpoint.clone(), + utxo_entry, + wallet_address.clone(), + ); - wallet_utxos.push(utxo); + let previous = utxos_by_outpoint.insert(wallet_outpoint.clone(), utxo); + debug_assert!(previous.is_none(), "mempool outpoint inserted twice"); + utxo_keys_sorted_by_amount.push((output.value, wallet_outpoint)); } } } - self.update_utxos_sorted_by_amount(wallet_utxos.clone()); - self.update_utxos_by_outpoint(wallet_utxos); + utxo_keys_sorted_by_amount.sort_unstable(); + let new_state = Arc::new(UtxoState { + utxos_by_outpoint, + utxo_keys_sorted_by_amount, + }); - self.apply_mempool_transactions_after_update().await; + // Swap the Arc pointer under a brief write lock. + { + let mut guard = self.state.write().await; + *guard = new_state.clone(); + } + + self.prune_mempool_transactions_after_update(&new_state) + .await; Ok(()) } - async fn apply_mempool_transactions_after_update(&mut self) { - let previous_mempool_transactions = std::mem::take(&mut self.mempool_transactions); - self.mempool_transactions = vec![]; - 'outer: for transaction in &previous_mempool_transactions { + pub async fn add_mempool_transaction(&self, transaction: &WalletSignableTransaction) { + let mut mempool = self.mempool_transactions.lock().await; + mempool.push(transaction.clone()); + } + + async fn prune_mempool_transactions_after_update(&self, new_state: &UtxoState) { + let mut mempool = self.mempool_transactions.lock().await; + mempool.retain(|transaction| { for input in transaction.transaction.unwrap_ref().tx.inputs.iter() { let outpoint = input.previous_outpoint; - if !self.contains_utxo(&outpoint.into()) { - // this means this transaction was either accepted or double-spent - continue 'outer; + if !new_state.utxos_by_outpoint.contains_key(&outpoint.into()) { + // Transaction is either accepted (now covered by RPC mempool snapshot) or double-spent. + return false; } } - self.add_mempool_transaction(transaction).await; + true + }); + } + + pub fn is_utxo_pending(&self, utxo: &WalletUtxo, virtual_daa_score: u64) -> bool { + if !utxo.utxo_entry.is_coinbase { + return false; } + + utxo.utxo_entry.block_daa_score + self.coinbase_maturity > virtual_daa_score } +} - fn update_utxos_sorted_by_amount(&mut self, mut wallet_utxos: Vec) { - wallet_utxos.sort_by(|a, b| a.utxo_entry.amount.cmp(&b.utxo_entry.amount)); - self.utxos_sorted_by_amount = wallet_utxos; +#[cfg(test)] +mod tests { + use super::UtxoManager; + use crate::address_manager::AddressManager; + use common::keys::Keys; + use common::model::{Keychain, WalletAddress, WalletOutpoint, WalletSignableTransaction}; + use kaspa_addresses::{Address, Prefix, Version}; + use kaspa_bip32::Prefix as XPubPrefix; + use kaspa_consensus_core::tx::{ + ScriptPublicKey, SignableTransaction, Transaction, TransactionInput, TransactionOutpoint, + TransactionOutput, UtxoEntry, + }; + use kaspa_hashes::Hash; + use kaspa_rpc_core::{RpcTransactionOutpoint, RpcUtxoEntry, RpcUtxosByAddressesEntry}; + use std::collections::HashSet; + use std::sync::Arc; + use tokio::sync::Mutex; + + fn txid(i: u32) -> Hash { + let mut bytes = [0u8; 32]; + bytes[..4].copy_from_slice(&i.to_le_bytes()); + Hash::from_bytes(bytes) } - fn update_utxos_by_outpoint(&mut self, wallet_utxos: Vec) { - self.utxos_by_outpoint.clear(); - for wallet_utxo in wallet_utxos { - self.utxos_by_outpoint - .insert(wallet_utxo.outpoint.clone(), wallet_utxo); + fn make_address(prefix: Prefix, i: u32) -> Address { + let mut payload = [0u8; 32]; + payload[..4].copy_from_slice(&i.to_le_bytes()); + Address::new(prefix, Version::PubKey, &payload) + } + + fn make_outpoint(i: u32) -> RpcTransactionOutpoint { + RpcTransactionOutpoint { + transaction_id: txid(i), + index: i, } } - pub fn is_utxo_pending(&self, utxo: &WalletUtxo, virtual_daa_score: u64) -> bool { - if !utxo.utxo_entry.is_coinbase { - return false; + fn make_rpc_utxo_entry(amount: u64) -> RpcUtxoEntry { + RpcUtxoEntry::new(amount, ScriptPublicKey::from_vec(0, vec![]), 0, false) + } + + fn make_rpc_utxo(i: u32, address: Address) -> RpcUtxosByAddressesEntry { + let amount = ((i % 10_000) + 1) as u64; + RpcUtxosByAddressesEntry { + address: Some(address), + outpoint: make_outpoint(i), + utxo_entry: make_rpc_utxo_entry(amount), } + } - utxo.utxo_entry.block_daa_score + self.coinbase_maturity > virtual_daa_score + #[tokio::test] + async fn update_utxo_set_produces_sorted_index() { + let keys = Arc::new(Keys::new( + "unused".to_string(), + 1, + vec![], + XPubPrefix::XPUB, + vec![], + 0, + 0, + 1, + 0, + )); + let address_manager = AddressManager::new(keys, Prefix::Mainnet); + let address = make_address(Prefix::Mainnet, 1); + let wa = WalletAddress::new(1, 0, Keychain::External); + address_manager + .insert_address_for_bench(address.clone(), wa) + .await; + + let address_manager = Arc::new(Mutex::new(address_manager)); + let utxo_manager = UtxoManager::new_for_bench(address_manager); + + let entries = vec![ + make_rpc_utxo(1, address.clone()), // amount 2 + make_rpc_utxo(2, address.clone()), // amount 3 + make_rpc_utxo(10_000, address.clone()), // amount 1 + ]; + + utxo_manager.update_utxo_set(entries, vec![]).await.unwrap(); + + let state = utxo_manager.state().await; + let amounts: Vec = state + .utxos_sorted_by_amount() + .map(|utxo| utxo.utxo_entry.amount) + .collect(); + assert_eq!(amounts, vec![1, 2, 3]); + } + + #[tokio::test] + async fn state_snapshots_remain_valid_after_update() { + let keys = Arc::new(Keys::new( + "unused".to_string(), + 1, + vec![], + XPubPrefix::XPUB, + vec![], + 0, + 0, + 1, + 0, + )); + let address_manager = AddressManager::new(keys, Prefix::Mainnet); + let address = make_address(Prefix::Mainnet, 1); + let wa = WalletAddress::new(1, 0, Keychain::External); + address_manager + .insert_address_for_bench(address.clone(), wa) + .await; + + let address_manager = Arc::new(Mutex::new(address_manager)); + let utxo_manager = UtxoManager::new_for_bench(address_manager); + + utxo_manager + .update_utxo_set(vec![make_rpc_utxo(1, address.clone())], vec![]) + .await + .unwrap(); + let old_state = utxo_manager.state().await; + assert_eq!(old_state.utxo_count(), 1); + + utxo_manager + .update_utxo_set( + vec![ + make_rpc_utxo(1, address.clone()), + make_rpc_utxo(2, address.clone()), + ], + vec![], + ) + .await + .unwrap(); + let new_state = utxo_manager.state().await; + assert_eq!(new_state.utxo_count(), 2); + + // Old snapshot remains valid and unchanged. + assert_eq!(old_state.utxo_count(), 1); + } + + #[tokio::test] + async fn state_with_mempool_overlays_wallet_transactions() { + let keys = Arc::new(Keys::new( + "unused".to_string(), + 1, + vec![], + XPubPrefix::XPUB, + vec![], + 0, + 0, + 1, + 0, + )); + let address_manager = AddressManager::new(keys, Prefix::Mainnet); + let address = make_address(Prefix::Mainnet, 1); + let wa = WalletAddress::new(1, 0, Keychain::External); + address_manager + .insert_address_for_bench(address.clone(), wa.clone()) + .await; + + let address_manager = Arc::new(Mutex::new(address_manager)); + let utxo_manager = UtxoManager::new_for_bench(address_manager); + + // Base state: one confirmed UTXO. + utxo_manager + .update_utxo_set(vec![make_rpc_utxo(1, address.clone())], vec![]) + .await + .unwrap(); + let base_state = utxo_manager.state().await; + let base_outpoint: WalletOutpoint = make_outpoint(1).into(); + let base_utxo = base_state + .get_utxo_by_outpoint(&base_outpoint) + .expect("base utxo missing"); + + // Local wallet tx spends that UTXO and creates one output to our address. + let input = TransactionInput::new( + TransactionOutpoint::new(base_outpoint.transaction_id, base_outpoint.index), + vec![], + 0, + 1, + ); + let output = TransactionOutput::new(1, ScriptPublicKey::from_vec(0, vec![])); + let tx = Transaction::new( + 0, + vec![input], + vec![output], + 0, + Default::default(), + 0, + vec![], + ); + let input_entry: UtxoEntry = base_utxo.utxo_entry.clone().into(); + let signable = SignableTransaction::with_entries(tx, vec![input_entry]); + let wallet_tx = WalletSignableTransaction::new_from_unsigned( + signable, + HashSet::new(), + vec![wa], + vec![address.clone()], + ); + + utxo_manager.add_mempool_transaction(&wallet_tx).await; + + let view = utxo_manager.state_with_mempool().await.unwrap(); + + // View hides the spent outpoint but base snapshot remains unchanged. + assert!(view.get_utxo_by_outpoint(&base_outpoint).is_none()); + assert!(base_state.get_utxo_by_outpoint(&base_outpoint).is_some()); + + // View includes the newly created outpoint. + let created_outpoint = WalletOutpoint { + transaction_id: wallet_tx.transaction.unwrap_ref().tx.id(), + index: 0, + }; + assert!(view.get_utxo_by_outpoint(&created_outpoint).is_some()); + assert!(base_state.get_utxo_by_outpoint(&created_outpoint).is_none()); } }