diff --git a/Cargo.toml b/Cargo.toml
index 2b4dbf3..a03826b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -82,3 +82,4 @@ rust_decimal = "1.36"
# Pin wasm-bindgen/js-sys to avoid version mismatch with rusty-kaspa transitive deps (ENG-746)
wasm-bindgen = "=0.2.100"
js-sys = "=0.3.70"
+criterion = "0.5.1"
diff --git a/common/src/addresses.rs b/common/src/addresses.rs
index 8802567..28b1e42 100644
--- a/common/src/addresses.rs
+++ b/common/src/addresses.rs
@@ -28,7 +28,20 @@ pub fn multisig_address(
) -> WalletResult
{
let mut sorted_extended_public_keys = extended_public_keys.as_ref().clone();
sorted_extended_public_keys.sort();
+ multisig_address_from_sorted_keys(
+ &sorted_extended_public_keys,
+ minimum_signatures,
+ prefix,
+ derivation_path,
+ )
+}
+pub fn multisig_address_from_sorted_keys(
+ sorted_extended_public_keys: &[ExtendedPublicKey],
+ minimum_signatures: usize,
+ prefix: Prefix,
+ derivation_path: &DerivationPath,
+) -> WalletResult {
let mut signing_public_keys = Vec::with_capacity(sorted_extended_public_keys.len());
for x_public_key in sorted_extended_public_keys.iter() {
let derived_key = x_public_key
@@ -46,3 +59,46 @@ pub fn multisig_address(
.to_wallet_result_internal()?;
Ok(address)
}
+
+#[cfg(test)]
+mod tests {
+ use super::{multisig_address, multisig_address_from_sorted_keys};
+ use crate::keys::master_key_path;
+ use kaspa_addresses::Prefix;
+ use kaspa_bip32::secp256k1::SecretKey;
+ use kaspa_bip32::{DerivationPath, ExtendedPrivateKey, Language, Mnemonic};
+ use std::sync::Arc;
+
+ fn xpub_from_mnemonic(
+ phrase: &str,
+ ) -> kaspa_bip32::ExtendedPublicKey {
+ let mnemonic = Mnemonic::new(phrase, Language::English).unwrap();
+ let seed = mnemonic.to_seed("");
+ let xprv = ExtendedPrivateKey::::new(seed).unwrap();
+ let xprv = xprv.derive_path(&master_key_path(true)).unwrap();
+ xprv.public_key()
+ }
+
+ #[test]
+ fn multisig_address_sorted_helper_matches_existing_function() {
+ let xpub1 = xpub_from_mnemonic(
+ "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
+ );
+ let xpub2 = xpub_from_mnemonic(
+ "legal winner thank year wave sausage worth useful legal winner thank yellow",
+ );
+
+ let derivation_path: DerivationPath = "m/0/0/0".parse().unwrap();
+ let unsorted = Arc::new(vec![xpub2.clone(), xpub1.clone()]);
+
+ let expected = multisig_address(unsorted, 2, Prefix::Devnet, &derivation_path).unwrap();
+
+ let mut sorted = vec![xpub2, xpub1];
+ sorted.sort();
+ let actual =
+ multisig_address_from_sorted_keys(&sorted, 2, Prefix::Devnet, &derivation_path)
+ .unwrap();
+
+ assert_eq!(expected.to_string(), actual.to_string());
+ }
+}
diff --git a/common/src/model.rs b/common/src/model.rs
index 3d91999..64e3fbb 100644
--- a/common/src/model.rs
+++ b/common/src/model.rs
@@ -37,7 +37,7 @@ impl WalletAddress {
}
}
-#[derive(Clone, Debug, Hash, PartialEq, Eq)]
+#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct WalletOutpoint {
pub transaction_id: Hash,
pub index: u32,
diff --git a/common/src/proto_convert.rs b/common/src/proto_convert.rs
index 1b1442b..9fb0ecb 100644
--- a/common/src/proto_convert.rs
+++ b/common/src/proto_convert.rs
@@ -128,6 +128,20 @@ impl From for ProtoUtxoEntry {
}
}
+impl From<&WalletUtxoEntry> for ProtoUtxoEntry {
+ fn from(value: &WalletUtxoEntry) -> ProtoUtxoEntry {
+ ProtoUtxoEntry {
+ amount: value.amount,
+ script_public_key: Some(ProtoScriptPublicKey {
+ version: value.script_public_key.version as u32,
+ script_public_key: hex::encode(value.script_public_key.script()),
+ }),
+ block_daa_score: value.block_daa_score,
+ is_coinbase: value.is_coinbase,
+ }
+ }
+}
+
pub fn utxo_entry_to_proto(value: UtxoEntry) -> ProtoUtxoEntry {
ProtoUtxoEntry {
amount: value.amount,
@@ -148,6 +162,15 @@ pub fn utxo_entry_from_proto(value: ProtoUtxoEntry) -> UtxoEntry {
}
impl WalletUtxo {
+ pub fn to_proto(&self, is_pending: bool, is_dust: bool) -> ProtoUtxo {
+ ProtoUtxo {
+ outpoint: Some(self.outpoint.clone().into()),
+ utxo_entry: Some(ProtoUtxoEntry::from(&self.utxo_entry)),
+ is_pending,
+ is_dust,
+ }
+ }
+
pub fn into_proto(self, is_pending: bool, is_dust: bool) -> ProtoUtxo {
ProtoUtxo {
outpoint: Some(self.outpoint.into()),
diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml
index bda45e2..b319f99 100644
--- a/daemon/Cargo.toml
+++ b/daemon/Cargo.toml
@@ -12,6 +12,11 @@ repository.workspace = true
name = "kaswallet-daemon"
path = "src/main.rs"
+[[bin]]
+name = "kaswallet-stress-bench"
+path = "src/bin/kaswallet_stress_bench.rs"
+required-features = ["bench"]
+
[dependencies]
kaswallet-common.workspace = true
kaswallet-proto.workspace = true
@@ -36,7 +41,31 @@ thiserror.workspace = true
wasm-bindgen.workspace = true
js-sys.workspace = true
+[features]
+bench = []
+
[dev-dependencies]
kaspa-consensus.workspace = true
kaspa-hashes.workspace = true
-rstest.workspace = true
\ No newline at end of file
+rstest.workspace = true
+criterion.workspace = true
+
+[[bench]]
+name = "address_scaling"
+harness = false
+required-features = ["bench"]
+
+[[bench]]
+name = "utxo_scaling"
+harness = false
+required-features = ["bench"]
+
+[[bench]]
+name = "from_addresses_filter"
+harness = false
+required-features = ["bench"]
+
+[[bench]]
+name = "utxo_contention"
+harness = false
+required-features = ["bench"]
diff --git a/daemon/benches/address_scaling.rs b/daemon/benches/address_scaling.rs
new file mode 100644
index 0000000..521120b
--- /dev/null
+++ b/daemon/benches/address_scaling.rs
@@ -0,0 +1,246 @@
+use common::addresses::{multisig_address, multisig_address_from_sorted_keys};
+use common::keys::{Keys, master_key_path};
+use common::model::{Keychain, WalletAddress};
+use criterion::{BenchmarkId, Criterion, black_box, criterion_group, criterion_main};
+use kaspa_addresses::{Address, Prefix as AddressPrefix, Version};
+use kaspa_bip32::secp256k1::SecretKey;
+use kaspa_bip32::{
+ DerivationPath, ExtendedPrivateKey, ExtendedPublicKey, Language, Mnemonic, Prefix as XPubPrefix,
+};
+use kaswallet_daemon::address_manager::AddressManager;
+use std::sync::Arc;
+use tokio::runtime::Runtime;
+
+fn xpub_from_mnemonic(
+ phrase: &str,
+ is_multisig: bool,
+) -> ExtendedPublicKey {
+ let mnemonic = Mnemonic::new(phrase, Language::English).unwrap();
+ let seed = mnemonic.to_seed("");
+ let xprv = ExtendedPrivateKey::::new(seed).unwrap();
+ let xprv = xprv.derive_path(&master_key_path(is_multisig)).unwrap();
+ xprv.public_key()
+}
+
+fn make_keys(
+ public_keys: Vec>,
+ minimum_signatures: u16,
+) -> Arc {
+ Arc::new(Keys::new(
+ "bench-unused-keys.json".to_string(),
+ 1,
+ vec![],
+ XPubPrefix::XPUB,
+ public_keys,
+ 0,
+ 0,
+ minimum_signatures,
+ 0,
+ ))
+}
+
+fn baseline_calculate_address_path(
+ wallet_address: &WalletAddress,
+ is_multisig: bool,
+) -> DerivationPath {
+ let keychain_number = wallet_address.keychain.clone() as u32;
+ let path_string = if is_multisig {
+ format!(
+ "m/{}/{}/{}",
+ wallet_address.cosigner_index, keychain_number, wallet_address.index
+ )
+ } else {
+ format!("m/{}/{}", keychain_number, wallet_address.index)
+ };
+ path_string.parse().unwrap()
+}
+
+fn bench_calculate_address_path(c: &mut Criterion) {
+ let keys = make_keys(
+ vec![xpub_from_mnemonic(
+ "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
+ false,
+ )],
+ 1,
+ );
+ let manager = AddressManager::new(keys, AddressPrefix::Mainnet);
+
+ let wallet_addresses: Vec = (0..256)
+ .map(|i| WalletAddress::new(i, 0, Keychain::External))
+ .collect();
+
+ c.bench_function("calculate_address_path/new (singlesig)", |b| {
+ b.iter(|| {
+ for wa in &wallet_addresses {
+ black_box(manager.calculate_address_path(wa).unwrap());
+ }
+ })
+ });
+
+ c.bench_function("calculate_address_path/baseline parse (singlesig)", |b| {
+ b.iter(|| {
+ for wa in &wallet_addresses {
+ black_box(baseline_calculate_address_path(wa, false));
+ }
+ })
+ });
+}
+
+fn bench_multisig_sorted_vs_unsorted(c: &mut Criterion) {
+ let xpub1 = xpub_from_mnemonic(
+ "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
+ true,
+ );
+ let xpub2 = xpub_from_mnemonic(
+ "legal winner thank year wave sausage worth useful legal winner thank yellow",
+ true,
+ );
+ let xpub3 = xpub_from_mnemonic(
+ "letter advice cage absurd amount doctor acoustic avoid letter advice cage above",
+ true,
+ );
+
+ let unsorted = Arc::new(vec![xpub2.clone(), xpub3.clone(), xpub1.clone()]);
+ let mut sorted = vec![xpub2, xpub3, xpub1];
+ sorted.sort();
+
+ let derivation_path: DerivationPath = "m/0/0/0".parse().unwrap();
+
+ c.bench_function("multisig_address (sorts each call)", |b| {
+ b.iter(|| {
+ black_box(
+ multisig_address(unsorted.clone(), 2, AddressPrefix::Devnet, &derivation_path)
+ .unwrap(),
+ );
+ })
+ });
+
+ c.bench_function(
+ "multisig_address_from_sorted_keys (no per-call sort)",
+ |b| {
+ b.iter(|| {
+ black_box(
+ multisig_address_from_sorted_keys(
+ &sorted,
+ 2,
+ AddressPrefix::Devnet,
+ &derivation_path,
+ )
+ .unwrap(),
+ );
+ })
+ },
+ );
+}
+
+fn bench_monitored_addresses_cache(c: &mut Criterion) {
+ let rt = Runtime::new().unwrap();
+
+ let keys = make_keys(
+ vec![xpub_from_mnemonic(
+ "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
+ false,
+ )],
+ 1,
+ );
+ let manager = AddressManager::new(keys, AddressPrefix::Mainnet);
+
+ // Seed a large monitored set without disk IO (bench-only helpers).
+ let address_count: u32 = 10_000;
+ rt.block_on(async {
+ for i in 0..address_count {
+ let mut payload = [0u8; 32];
+ payload[..4].copy_from_slice(&i.to_le_bytes());
+ let address = Address::new(AddressPrefix::Mainnet, Version::PubKey, &payload);
+ let wa = WalletAddress::new(i, 0, Keychain::External);
+ manager.insert_address_for_bench(address, wa).await;
+ }
+ // Warm cache once.
+ manager.monitored_addresses().await.unwrap();
+ });
+
+ c.bench_function("monitored_addresses/cached (10k)", |b| {
+ b.iter(|| {
+ let addresses = rt.block_on(manager.monitored_addresses()).unwrap();
+ black_box(addresses.len());
+ })
+ });
+
+ c.bench_function("monitored_addresses/rebuild (10k)", |b| {
+ b.iter(|| {
+ manager.bump_address_set_version_for_bench();
+ let addresses = rt.block_on(manager.monitored_addresses()).unwrap();
+ black_box(addresses.len());
+ })
+ });
+}
+
+fn bench_addresses_to_query(c: &mut Criterion) {
+ let rt = Runtime::new().unwrap();
+
+ let single_keys = make_keys(
+ vec![xpub_from_mnemonic(
+ "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
+ false,
+ )],
+ 1,
+ );
+ let single = AddressManager::new(single_keys, AddressPrefix::Mainnet);
+
+ let multi_keys = make_keys(
+ vec![
+ xpub_from_mnemonic(
+ "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
+ true,
+ ),
+ xpub_from_mnemonic(
+ "legal winner thank year wave sausage worth useful legal winner thank yellow",
+ true,
+ ),
+ xpub_from_mnemonic(
+ "letter advice cage absurd amount doctor acoustic avoid letter advice cage above",
+ true,
+ ),
+ ],
+ 2,
+ );
+ let multi = AddressManager::new(multi_keys, AddressPrefix::Mainnet);
+
+ let mut group = c.benchmark_group("addresses_to_query");
+ for &indexes in &[10u32, 100, 1_000] {
+ group.bench_with_input(
+ BenchmarkId::new("singlesig", indexes),
+ &indexes,
+ |b, &indexes| {
+ b.iter(|| {
+ let set = rt.block_on(single.addresses_to_query(0, indexes)).unwrap();
+ black_box(set.len());
+ })
+ },
+ );
+
+ // Multisig derivation is significantly more expensive; keep the upper bound smaller.
+ if indexes <= 100 {
+ group.bench_with_input(
+ BenchmarkId::new("multisig (2-of-3)", indexes),
+ &indexes,
+ |b, &indexes| {
+ b.iter(|| {
+ let set = rt.block_on(multi.addresses_to_query(0, indexes)).unwrap();
+ black_box(set.len());
+ })
+ },
+ );
+ }
+ }
+ group.finish();
+}
+
+criterion_group!(
+ benches,
+ bench_calculate_address_path,
+ bench_multisig_sorted_vs_unsorted,
+ bench_monitored_addresses_cache,
+ bench_addresses_to_query
+);
+criterion_main!(benches);
diff --git a/daemon/benches/from_addresses_filter.rs b/daemon/benches/from_addresses_filter.rs
new file mode 100644
index 0000000..b458f19
--- /dev/null
+++ b/daemon/benches/from_addresses_filter.rs
@@ -0,0 +1,77 @@
+use common::model::{Keychain, WalletAddress};
+use criterion::{BenchmarkId, Criterion, black_box, criterion_group, criterion_main};
+use std::collections::HashSet;
+use std::time::Duration;
+
+fn bench_from_addresses_filter(c: &mut Criterion) {
+ let mut group = c.benchmark_group("from_addresses_filter");
+ group.sample_size(20);
+ group.measurement_time(Duration::from_secs(3));
+
+ let address_pool_size: u32 = 10_000;
+ let address_pool: Vec = (0..address_pool_size)
+ .map(|i| WalletAddress::new(i, 0, Keychain::External))
+ .collect();
+
+ let utxo_counts: [usize; 2] = [200_000, 1_000_000];
+ let filter_lens: [usize; 4] = [0, 1, 10, 100];
+
+ for &utxo_count in &utxo_counts {
+ let utxo_addresses: Vec = (0..utxo_count as u32)
+ .map(|i| WalletAddress::new(i % address_pool_size, 0, Keychain::External))
+ .collect();
+
+ for &filter_len in &filter_lens {
+ let from_addresses: Vec<&WalletAddress> =
+ address_pool.iter().take(filter_len).collect();
+
+ group.bench_with_input(
+ BenchmarkId::new(format!("linear/utxos_{utxo_count}"), filter_len),
+ &filter_len,
+ |b, _| {
+ b.iter(|| {
+ let mut kept = 0usize;
+ for wa in &utxo_addresses {
+ if !from_addresses.is_empty() && !from_addresses.contains(&wa) {
+ continue;
+ }
+ kept += 1;
+ }
+ black_box(kept);
+ })
+ },
+ );
+
+ group.bench_with_input(
+ BenchmarkId::new(format!("hashset/utxos_{utxo_count}"), filter_len),
+ &filter_len,
+ |b, _| {
+ b.iter(|| {
+ let from_set: Option> = if from_addresses.is_empty()
+ {
+ None
+ } else {
+ Some(from_addresses.iter().map(|wa| (*wa).clone()).collect())
+ };
+
+ let mut kept = 0usize;
+ for wa in &utxo_addresses {
+ if let Some(ref set) = from_set {
+ if !set.contains(wa) {
+ continue;
+ }
+ }
+ kept += 1;
+ }
+ black_box(kept);
+ })
+ },
+ );
+ }
+ }
+
+ group.finish();
+}
+
+criterion_group!(benches, bench_from_addresses_filter);
+criterion_main!(benches);
diff --git a/daemon/benches/utxo_contention.rs b/daemon/benches/utxo_contention.rs
new file mode 100644
index 0000000..fbc4ab5
--- /dev/null
+++ b/daemon/benches/utxo_contention.rs
@@ -0,0 +1,191 @@
+use common::keys::Keys;
+use common::model::{Keychain, WalletAddress, WalletSignableTransaction, WalletUtxoEntry};
+use criterion::{BenchmarkId, Criterion, black_box, criterion_group, criterion_main};
+use kaspa_addresses::{Address, Prefix as AddressPrefix, Version};
+use kaspa_bip32::Prefix as XPubPrefix;
+use kaspa_consensus_core::tx::{
+ ScriptPublicKey, SignableTransaction, Transaction, TransactionInput, TransactionOutpoint,
+ TransactionOutput, UtxoEntry,
+};
+use kaspa_hashes::Hash;
+use kaspa_rpc_core::{RpcTransactionOutpoint, RpcUtxoEntry, RpcUtxosByAddressesEntry};
+use kaswallet_daemon::address_manager::AddressManager;
+use kaswallet_daemon::utxo_manager::UtxoManager;
+use std::collections::HashSet;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
+use std::time::Duration;
+use tokio::runtime::Runtime;
+use tokio::sync::Mutex;
+
+fn txid(i: u32) -> Hash {
+ let mut bytes = [0u8; 32];
+ bytes[..4].copy_from_slice(&i.to_le_bytes());
+ Hash::from_bytes(bytes)
+}
+
+fn make_outpoint(i: u32) -> RpcTransactionOutpoint {
+ RpcTransactionOutpoint {
+ transaction_id: txid(i),
+ index: i,
+ }
+}
+
+fn make_rpc_utxo_entry(amount: u64) -> RpcUtxoEntry {
+ RpcUtxoEntry::new(amount, ScriptPublicKey::from_vec(0, vec![]), 0, false)
+}
+
+fn make_address(prefix: AddressPrefix, i: u32) -> Address {
+ let mut payload = [0u8; 32];
+ payload[..4].copy_from_slice(&i.to_le_bytes());
+ Address::new(prefix, Version::PubKey, &payload)
+}
+
+fn make_rpc_utxo(i: u32, address: Address) -> RpcUtxosByAddressesEntry {
+ let amount = ((i % 10_000) + 1) as u64;
+ RpcUtxosByAddressesEntry {
+ address: Some(address),
+ outpoint: make_outpoint(i),
+ utxo_entry: make_rpc_utxo_entry(amount),
+ }
+}
+
+fn bench_utxo_state_reads_while_updating(c: &mut Criterion) {
+ let rt = Runtime::new().unwrap();
+
+ let keys = Arc::new(Keys::new(
+ "bench-unused-keys.json".to_string(),
+ 1,
+ vec![],
+ XPubPrefix::XPUB,
+ vec![],
+ 0,
+ 0,
+ 1,
+ 0,
+ ));
+
+ let prefix = AddressPrefix::Mainnet;
+ let address_count: u32 = 10_000;
+
+ // Seed address manager with a large set of monitored addresses.
+ let address_manager = AddressManager::new(keys, prefix);
+ let mut addresses = Vec::with_capacity(address_count as usize);
+ rt.block_on(async {
+ for i in 0..address_count {
+ let address = make_address(prefix, i);
+ let wa = WalletAddress::new(i, 0, Keychain::External);
+ address_manager
+ .insert_address_for_bench(address.clone(), wa)
+ .await;
+ addresses.push(address);
+ }
+ });
+
+ let address_manager = Arc::new(Mutex::new(address_manager));
+ rt.block_on(async {
+ let guard = address_manager.lock().await;
+ guard.monitored_address_map().await.unwrap();
+ });
+
+ let utxo_manager = Arc::new(UtxoManager::new_for_bench(address_manager.clone()));
+
+ let utxo_count: u32 = 10_000;
+ let base_entries: Vec = (0..utxo_count)
+ .map(|i| make_rpc_utxo(i, addresses[(i % address_count) as usize].clone()))
+ .collect();
+
+ // Establish initial state and keep one wallet-local pending tx in the overlay.
+ rt.block_on(async {
+ utxo_manager
+ .update_utxo_set(base_entries.clone(), vec![])
+ .await
+ .unwrap();
+
+ // Spend one known outpoint and create one output to our address (wallet-local mempool overlay).
+ let input_outpoint = make_outpoint(0);
+ let wallet_utxo_entry: WalletUtxoEntry = make_rpc_utxo_entry(1).into();
+ let input_entry: UtxoEntry = wallet_utxo_entry.clone().into();
+
+ let input = TransactionInput::new(
+ TransactionOutpoint::new(input_outpoint.transaction_id, input_outpoint.index),
+ vec![],
+ 0,
+ 1,
+ );
+ let output = TransactionOutput::new(1, ScriptPublicKey::from_vec(0, vec![]));
+ let tx = Transaction::new(
+ 0,
+ vec![input],
+ vec![output],
+ 0,
+ Default::default(),
+ 0,
+ vec![],
+ );
+ let signable = SignableTransaction::with_entries(tx, vec![input_entry]);
+
+ let wa0 = WalletAddress::new(0, 0, Keychain::External);
+ let a0 = addresses[0].clone();
+ let wallet_tx: WalletSignableTransaction = WalletSignableTransaction::new_from_unsigned(
+ signable,
+ HashSet::new(),
+ vec![wa0],
+ vec![a0],
+ );
+ utxo_manager.add_mempool_transaction(&wallet_tx).await;
+ });
+
+ // Background refresh loop to exercise write-lock swaps while measuring reads.
+ let stop = Arc::new(AtomicBool::new(false));
+ let stop_clone = Arc::clone(&stop);
+ let utxo_manager_clone = Arc::clone(&utxo_manager);
+ let refresh_entries = base_entries.clone();
+ let refresh_task = rt.spawn(async move {
+ while !stop_clone.load(Relaxed) {
+ utxo_manager_clone
+ .update_utxo_set(refresh_entries.clone(), vec![])
+ .await
+ .unwrap();
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ }
+ });
+
+ let mut group = c.benchmark_group("utxo_contention");
+ group.bench_function(BenchmarkId::new("state", "utxo_count"), |b| {
+ b.iter(|| {
+ let count = rt.block_on(async { utxo_manager.state().await.utxo_count() });
+ black_box(count);
+ });
+ });
+ group.bench_function(BenchmarkId::new("state", "sorted_take_10_sum"), |b| {
+ b.iter(|| {
+ let sum = rt.block_on(async {
+ let state = utxo_manager.state().await;
+ state
+ .utxos_sorted_by_amount()
+ .take(10)
+ .map(|utxo| utxo.utxo_entry.amount)
+ .sum::()
+ });
+ black_box(sum);
+ });
+ });
+ group.bench_function(BenchmarkId::new("state_with_mempool", "utxo_count"), |b| {
+ b.iter(|| {
+ let count = rt.block_on(async {
+ let view = utxo_manager.state_with_mempool().await.unwrap();
+ view.utxo_count()
+ });
+ black_box(count);
+ });
+ });
+ group.finish();
+
+ stop.store(true, Relaxed);
+ refresh_task.abort();
+ let _ = rt.block_on(refresh_task);
+}
+
+criterion_group!(benches, bench_utxo_state_reads_while_updating);
+criterion_main!(benches);
diff --git a/daemon/benches/utxo_scaling.rs b/daemon/benches/utxo_scaling.rs
new file mode 100644
index 0000000..31c9e37
--- /dev/null
+++ b/daemon/benches/utxo_scaling.rs
@@ -0,0 +1,108 @@
+use common::keys::Keys;
+use common::model::{Keychain, WalletAddress};
+use criterion::{BatchSize, BenchmarkId, Criterion, black_box, criterion_group, criterion_main};
+use kaspa_addresses::Prefix as AddressPrefix;
+use kaspa_bip32::Prefix as XPubPrefix;
+use kaspa_consensus_core::tx::ScriptPublicKey;
+use kaspa_hashes::Hash;
+use kaspa_rpc_core::{RpcTransactionOutpoint, RpcUtxoEntry, RpcUtxosByAddressesEntry};
+use kaswallet_daemon::address_manager::AddressManager;
+use kaswallet_daemon::utxo_manager::UtxoManager;
+use std::sync::Arc;
+use tokio::runtime::Runtime;
+use tokio::sync::Mutex;
+
+fn txid(i: u32) -> Hash {
+ let mut bytes = [0u8; 32];
+ bytes[..4].copy_from_slice(&i.to_le_bytes());
+ Hash::from_bytes(bytes)
+}
+
+fn make_outpoint(i: u32) -> RpcTransactionOutpoint {
+ RpcTransactionOutpoint {
+ transaction_id: txid(i),
+ index: i,
+ }
+}
+
+fn make_rpc_utxo_entry(amount: u64) -> RpcUtxoEntry {
+ RpcUtxoEntry::new(amount, ScriptPublicKey::from_vec(0, vec![]), 0, false)
+}
+
+fn make_rpc_utxo(i: u32, address: kaspa_addresses::Address) -> RpcUtxosByAddressesEntry {
+ let amount = ((i % 10_000) + 1) as u64;
+ RpcUtxosByAddressesEntry {
+ address: Some(address),
+ outpoint: make_outpoint(i),
+ utxo_entry: make_rpc_utxo_entry(amount),
+ }
+}
+
+fn bench_update_utxo_set(c: &mut Criterion) {
+ let rt = Runtime::new().unwrap();
+
+ let keys = Arc::new(Keys::new(
+ "bench-unused-keys.json".to_string(),
+ 1,
+ vec![],
+ XPubPrefix::XPUB,
+ vec![],
+ 0,
+ 0,
+ 1,
+ 0,
+ ));
+
+ // Seed address_manager with a realistic number of monitored addresses.
+ let address_manager = AddressManager::new(keys, AddressPrefix::Mainnet);
+ let address_count: u32 = 20_000;
+ let mut addresses = Vec::with_capacity(address_count as usize);
+ rt.block_on(async {
+ for i in 0..address_count {
+ let mut payload = [0u8; 32];
+ payload[..4].copy_from_slice(&i.to_le_bytes());
+ let address = kaspa_addresses::Address::new(
+ AddressPrefix::Mainnet,
+ kaspa_addresses::Version::PubKey,
+ &payload,
+ );
+ let wa = WalletAddress::new(i, 0, Keychain::External);
+ address_manager
+ .insert_address_for_bench(address.clone(), wa)
+ .await;
+ addresses.push(address);
+ }
+ });
+
+ let address_manager = Arc::new(Mutex::new(address_manager));
+ let utxo_manager = UtxoManager::new_for_bench(address_manager);
+
+ let mut group = c.benchmark_group("update_utxo_set");
+ for &utxo_count in &[1_000u32, 10_000, 50_000] {
+ let base_entries: Vec = (0..utxo_count)
+ .map(|i| make_rpc_utxo(i, addresses[(i % address_count) as usize].clone()))
+ .collect();
+
+ group.bench_with_input(
+ BenchmarkId::from_parameter(utxo_count),
+ &utxo_count,
+ |b, _| {
+ // Clone inputs outside the measured section so we measure `update_utxo_set` itself.
+ b.iter_batched(
+ || base_entries.clone(),
+ |entries| {
+ rt.block_on(utxo_manager.update_utxo_set(entries, vec![]))
+ .unwrap();
+ let count = rt.block_on(async { utxo_manager.state().await.utxo_count() });
+ black_box(count);
+ },
+ BatchSize::LargeInput,
+ )
+ },
+ );
+ }
+ group.finish();
+}
+
+criterion_group!(benches, bench_update_utxo_set);
+criterion_main!(benches);
diff --git a/daemon/src/address_manager.rs b/daemon/src/address_manager.rs
index 7b86243..5a1ca9b 100644
--- a/daemon/src/address_manager.rs
+++ b/daemon/src/address_manager.rs
@@ -1,19 +1,28 @@
-use common::addresses::{multisig_address, p2pk_address};
-use common::errors::{ResultExt, WalletResult};
+use common::addresses::{multisig_address_from_sorted_keys, p2pk_address};
+use common::errors::WalletResult;
use common::keys::Keys;
use common::model::{KEYCHAINS, Keychain, WalletAddress};
use kaspa_addresses::{Address, Prefix as AddressPrefix};
use kaspa_bip32::secp256k1::PublicKey;
-use kaspa_bip32::{DerivationPath, ExtendedPublicKey};
+use kaspa_bip32::{ChildNumber, DerivationPath, ExtendedPublicKey};
use kaspa_rpc_core::RpcBalancesByAddressesEntry;
use std::collections::HashMap;
use std::error::Error;
-use std::str::FromStr;
use std::sync::Arc;
+use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use tokio::sync::Mutex;
pub type AddressSet = HashMap;
+pub type AddressQuerySet = HashMap;
+
+#[derive(Debug, Clone)]
+struct MonitoredAddressesCache {
+ version: u64,
+ addresses: Arc>,
+ by_address: Arc>,
+}
+
#[derive(Debug)]
pub struct AddressManager {
keys_file: Arc,
@@ -23,19 +32,30 @@ pub struct AddressManager {
prefix: AddressPrefix,
address_cache: Mutex>,
+
+ address_set_version: AtomicU64,
+ monitored_addresses_cache: Mutex,
}
impl AddressManager {
pub fn new(keys: Arc, prefix: AddressPrefix) -> Self {
let is_multisig = keys.public_keys.len() > 1;
+ let mut sorted_public_keys = keys.public_keys.clone();
+ sorted_public_keys.sort();
Self {
keys_file: keys.clone(),
- extended_public_keys: Arc::new(keys.public_keys.clone()),
+ extended_public_keys: Arc::new(sorted_public_keys),
addresses: Mutex::new(HashMap::new()),
is_multisig,
prefix,
address_cache: Mutex::new(HashMap::new()),
+ address_set_version: AtomicU64::new(0),
+ monitored_addresses_cache: Mutex::new(MonitoredAddressesCache {
+ version: 0,
+ addresses: Arc::new(Vec::new()),
+ by_address: Arc::new(HashMap::new()),
+ }),
}
}
@@ -60,6 +80,58 @@ impl AddressManager {
Ok(strings)
}
+ pub async fn monitored_addresses(
+ &self,
+ ) -> Result>, Box> {
+ let current_version = self.address_set_version.load(Relaxed);
+ {
+ let cache = self.monitored_addresses_cache.lock().await;
+ if cache.version == current_version {
+ return Ok(cache.addresses.clone());
+ }
+ }
+
+ let (addresses_vec, by_address): (Vec, HashMap) = {
+ let addresses = self.addresses.lock().await;
+ let mut parsed = Vec::with_capacity(addresses.len());
+ let mut by_address = HashMap::with_capacity(addresses.len());
+ for (address_string, wallet_address) in addresses.iter() {
+ let address = Address::try_from(address_string.as_str()).map_err(|err| {
+ format!("invalid address in wallet address_set ({address_string}): {err}")
+ })?;
+ parsed.push(address.clone());
+ by_address.insert(address, wallet_address.clone());
+ }
+ (parsed, by_address)
+ };
+
+ let addresses_arc = Arc::new(addresses_vec);
+ let by_address_arc = Arc::new(by_address);
+ let mut cache = self.monitored_addresses_cache.lock().await;
+ cache.version = current_version;
+ cache.addresses = addresses_arc.clone();
+ cache.by_address = by_address_arc;
+ Ok(addresses_arc)
+ }
+
+ pub async fn monitored_address_map(
+ &self,
+ ) -> Result>, Box> {
+ let current_version = self.address_set_version.load(Relaxed);
+ {
+ let cache = self.monitored_addresses_cache.lock().await;
+ if cache.version == current_version {
+ return Ok(cache.by_address.clone());
+ }
+ }
+
+ // Rebuild once (also refreshes the map).
+ let _ = self.monitored_addresses().await?;
+
+ let cache = self.monitored_addresses_cache.lock().await;
+ Ok(cache.by_address.clone())
+ }
+
pub async fn new_address(&self) -> WalletResult<(String, WalletAddress)> {
let last_used_external_index_previous_value = self
.keys_file
@@ -77,22 +149,26 @@ impl AddressManager {
.kaspa_address_from_wallet_address(&wallet_address, true)
.await?;
+ let address_string = address.to_string();
{
- self.addresses
- .lock()
- .await
- .insert(address.to_string(), wallet_address.clone());
+ let mut addresses = self.addresses.lock().await;
+ addresses.insert(address_string.clone(), wallet_address.clone());
+ self.address_set_version.fetch_add(1, Relaxed);
}
- Ok((address.to_string(), wallet_address))
+ Ok((address_string, wallet_address))
}
pub async fn addresses_to_query(
&self,
start: u32,
end: u32,
- ) -> Result> {
- let mut addresses = HashMap::new();
+ ) -> Result> {
+ let mut addresses = HashMap::with_capacity(
+ (end.saturating_sub(start) as usize)
+ * self.extended_public_keys.len()
+ * KEYCHAINS.len(),
+ );
for index in start..end {
for cosigner_index in 0..self.extended_public_keys.len() as u16 {
@@ -101,7 +177,7 @@ impl AddressManager {
let address = self
.kaspa_address_from_wallet_address(&wallet_address, false)
.await?;
- addresses.insert(address.to_string(), wallet_address);
+ addresses.insert(address, wallet_address);
}
}
}
@@ -111,18 +187,20 @@ impl AddressManager {
pub async fn update_addresses_and_last_used_indexes(
&self,
- mut address_set: AddressSet,
+ mut address_set: AddressQuerySet,
get_balances_by_addresses_response: Vec,
) -> Result<(), Box> {
// create scope to release last_used_internal/external_index before keys_file.save() is called
{
+ let mut addresses_guard = self.addresses.lock().await;
+ let mut inserted_any = false;
for entry in get_balances_by_addresses_response {
if entry.balance == Some(0) {
continue;
}
let address_string = entry.address.to_string();
- let wallet_address = address_set.remove(&address_string).unwrap();
+ let wallet_address = address_set.remove(&entry.address).unwrap();
if wallet_address.keychain == Keychain::External {
if wallet_address.index > self.keys_file.last_used_external_index.load(Relaxed)
@@ -139,10 +217,11 @@ impl AddressManager {
.store(wallet_address.index, Relaxed);
}
- self.addresses
- .lock()
- .await
- .insert(address_string, wallet_address);
+ addresses_guard.insert(address_string, wallet_address);
+ inserted_any = true;
+ }
+ if inserted_any {
+ self.address_set_version.fetch_add(1, Relaxed);
}
}
@@ -194,17 +273,15 @@ impl AddressManager {
&self,
wallet_address: &WalletAddress,
) -> WalletResult {
+ // Avoid string formatting + parsing in the hot address-derivation path.
let keychain_number = wallet_address.keychain.clone() as u32;
- let path_string = if self.is_multisig {
- format!(
- "m/{}/{}/{}",
- wallet_address.cosigner_index, keychain_number, wallet_address.index
- )
- } else {
- format!("m/{}/{}", keychain_number, wallet_address.index)
- };
- let path = DerivationPath::from_str(&path_string).to_wallet_result_internal()?;
+ let mut path = DerivationPath::default();
+ if self.is_multisig {
+ path.push(ChildNumber(wallet_address.cosigner_index as u32));
+ }
+ path.push(ChildNumber(keychain_number));
+ path.push(ChildNumber(wallet_address.index));
Ok(path)
}
@@ -217,8 +294,8 @@ impl AddressManager {
}
fn multisig_address(&self, derivation_path: &DerivationPath) -> WalletResult {
- multisig_address(
- self.extended_public_keys.clone(),
+ multisig_address_from_sorted_keys(
+ self.extended_public_keys.as_ref(),
self.keys_file.minimum_signatures as usize,
self.prefix,
derivation_path,
@@ -254,13 +331,150 @@ impl AddressManager {
.kaspa_address_from_wallet_address(&wallet_address, true)
.await?;
+ let address_string = address.to_string();
{
- self.addresses
- .lock()
- .await
- .insert(address.to_string(), wallet_address.clone());
+ let mut addresses = self.addresses.lock().await;
+ addresses.insert(address_string, wallet_address.clone());
+ self.address_set_version.fetch_add(1, Relaxed);
}
Ok((address, wallet_address))
}
+
+ #[cfg(any(test, feature = "bench"))]
+ pub async fn insert_address_for_bench(&self, address: Address, wallet_address: WalletAddress) {
+ let mut addresses = self.addresses.lock().await;
+ addresses.insert(address.to_string(), wallet_address);
+ self.address_set_version.fetch_add(1, Relaxed);
+ }
+
+ #[cfg(any(test, feature = "bench"))]
+ pub fn bump_address_set_version_for_bench(&self) {
+ self.address_set_version.fetch_add(1, Relaxed);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::{AddressManager, AddressQuerySet, Keychain, WalletAddress};
+ use common::keys::Keys;
+ use kaspa_addresses::{Address, Prefix, Version};
+ use kaspa_bip32::secp256k1::SecretKey;
+ use kaspa_bip32::{ExtendedPrivateKey, Language, Mnemonic, Prefix as XPubPrefix};
+ use std::collections::HashMap;
+ use std::sync::Arc;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+
+ static TEST_ID: AtomicUsize = AtomicUsize::new(0);
+
+ fn unique_keys_path() -> String {
+ let id = TEST_ID.fetch_add(1, Ordering::Relaxed);
+ std::env::temp_dir()
+ .join(format!("kaswallet-address-manager-test-{id}.json"))
+ .to_string_lossy()
+ .to_string()
+ }
+
+ fn keys_with_no_pubkeys() -> Arc {
+ Arc::new(Keys::new(
+ unique_keys_path(),
+ 1,
+ vec![],
+ XPubPrefix::XPUB,
+ vec![],
+ 0,
+ 0,
+ 1,
+ 0,
+ ))
+ }
+
+ fn keys_with_single_pubkey() -> Arc {
+ let phrase = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about";
+ let mnemonic = Mnemonic::new(phrase, Language::English).unwrap();
+ let seed = mnemonic.to_seed("");
+ let xprv = ExtendedPrivateKey::::new(seed).unwrap();
+ let xprv = xprv
+ .derive_path(&common::keys::master_key_path(false))
+ .unwrap();
+ let xpub = xprv.public_key();
+
+ Arc::new(Keys::new(
+ unique_keys_path(),
+ 1,
+ vec![],
+ XPubPrefix::XPUB,
+ vec![xpub],
+ 0,
+ 0,
+ 1,
+ 0,
+ ))
+ }
+
+ #[test]
+ fn calculate_address_path_singlesig_matches_expected_format() {
+ let manager = AddressManager::new(keys_with_no_pubkeys(), Prefix::Mainnet);
+ let wallet_address = WalletAddress::new(7, 0, Keychain::External);
+
+ let path = manager.calculate_address_path(&wallet_address).unwrap();
+ assert_eq!(path.to_string(), "m/0/7");
+ }
+
+ #[tokio::test]
+ async fn addresses_to_query_uses_address_keys_and_expected_count() {
+ let manager = AddressManager::new(keys_with_single_pubkey(), Prefix::Devnet);
+ let set = manager.addresses_to_query(0, 2).await.unwrap();
+
+ // (end-start)=2 indexes, 1 cosigner, 2 keychains => 4 addresses.
+ assert_eq!(set.len(), 4);
+ }
+
+ #[tokio::test]
+ async fn monitored_addresses_cache_is_reused_and_invalidated_on_change() {
+ let keys = keys_with_no_pubkeys();
+ let manager = AddressManager::new(keys, Prefix::Mainnet);
+
+ let address1 = Address::new(Prefix::Mainnet, Version::PubKey, &[1u8; 32]);
+ let wallet_address1 = WalletAddress::new(1, 0, Keychain::External);
+
+ let mut query_set: AddressQuerySet = HashMap::new();
+ query_set.insert(address1.clone(), wallet_address1);
+
+ manager
+ .update_addresses_and_last_used_indexes(
+ query_set,
+ vec![kaspa_rpc_core::RpcBalancesByAddressesEntry {
+ address: address1.clone(),
+ balance: Some(1),
+ }],
+ )
+ .await
+ .unwrap();
+
+ let first = manager.monitored_addresses().await.unwrap();
+ let second = manager.monitored_addresses().await.unwrap();
+ assert!(Arc::ptr_eq(&first, &second));
+ assert_eq!(first.len(), 1);
+
+ let address2 = Address::new(Prefix::Mainnet, Version::PubKey, &[2u8; 32]);
+ let wallet_address2 = WalletAddress::new(2, 0, Keychain::External);
+ let mut query_set: AddressQuerySet = HashMap::new();
+ query_set.insert(address2.clone(), wallet_address2);
+
+ manager
+ .update_addresses_and_last_used_indexes(
+ query_set,
+ vec![kaspa_rpc_core::RpcBalancesByAddressesEntry {
+ address: address2.clone(),
+ balance: Some(1),
+ }],
+ )
+ .await
+ .unwrap();
+
+ let third = manager.monitored_addresses().await.unwrap();
+ assert!(!Arc::ptr_eq(&second, &third));
+ assert_eq!(third.len(), 2);
+ }
}
diff --git a/daemon/src/args.rs b/daemon/src/args.rs
index 85fa6dd..5e8b779 100644
--- a/daemon/src/args.rs
+++ b/daemon/src/args.rs
@@ -1,112 +1,135 @@
-use clap::{Parser, ValueEnum};
-use common::args::parse_network_type;
-use kaspa_consensus_core::network::NetworkId;
-use log::LevelFilter;
-
-#[derive(Parser, Debug, Clone)]
-#[command(name = "kaswallet-daemon")]
-pub struct Args {
- #[arg(long, help = "Use the test network")]
- pub testnet: bool,
-
- #[arg(long, default_value = "10", help = "Testnet network suffix number")]
- pub testnet_suffix: u32,
-
- #[arg(long, help = "Use the development test network")]
- pub devnet: bool,
-
- #[arg(long, help = "Use the simulation test network")]
- pub simnet: bool,
-
- // TODO: Remove when wallet is more stable
- #[arg(long = "enable-mainnet-pre-launch", hide = true)]
- pub enable_mainnet_pre_launch: bool,
-
- #[arg(long = "keys", short = 'k', help = "Path to keys file")]
- pub keys_file_path: Option,
-
- #[arg(long, help = "Path to logs directory")]
- pub logs_path: Option,
-
- #[arg(long, short = 'v', default_value = "info", help = "Log level")]
- pub logs_level: LogsLevel,
-
- #[arg(long, short = 's', help = "Kaspa node RPC server to connect to")]
- pub server: Option,
-
- #[arg(
- long,
- short = 'l',
- default_value = "127.0.0.1:8082",
- help = "Address to listen on"
- )]
- pub listen: String,
-
- #[arg(long, help = "Enable tokio console")]
- #[cfg(debug_assertions)]
- pub enable_tokio_console: bool,
-
- #[arg(
- long,
- default_value = "10000",
- help = "Sync interval in milliseconds",
- hide = true
- )]
- pub sync_interval_millis: u64,
-}
-
-impl Default for Args {
- fn default() -> Self {
- Self {
- testnet: false,
- testnet_suffix: 10,
- devnet: false,
- simnet: false,
- enable_mainnet_pre_launch: false,
- keys_file_path: None,
- logs_path: None,
- logs_level: Default::default(),
- server: None,
- listen: "".to_string(),
- #[cfg(debug_assertions)]
- enable_tokio_console: false,
- sync_interval_millis: 10,
- }
- }
-}
-
-#[derive(Debug, Clone, ValueEnum, Default)]
-pub enum LogsLevel {
- Off,
- Trace,
- #[default]
- Debug,
- Info,
- Warn,
- Error,
-}
-
-impl From for LevelFilter {
- fn from(value: LogsLevel) -> LevelFilter {
- match value {
- LogsLevel::Off => LevelFilter::Off,
- LogsLevel::Trace => LevelFilter::Trace,
- LogsLevel::Debug => LevelFilter::Debug,
- LogsLevel::Info => LevelFilter::Info,
- LogsLevel::Warn => LevelFilter::Warn,
- LogsLevel::Error => LevelFilter::Error,
- }
- }
-}
-
-impl Args {
- pub fn network_id(&self) -> NetworkId {
- parse_network_type(
- self.testnet,
- self.devnet,
- self.simnet,
- self.testnet_suffix,
- self.enable_mainnet_pre_launch,
- )
- }
-}
+use clap::{Parser, ValueEnum};
+use common::args::parse_network_type;
+use kaspa_consensus_core::network::NetworkId;
+use log::LevelFilter;
+
+const DEFAULT_SYNC_INTERVAL_MILLIS: u64 = 10_000;
+
+#[derive(Parser, Debug, Clone)]
+#[command(name = "kaswallet-daemon")]
+pub struct Args {
+ #[arg(long, help = "Use the test network")]
+ pub testnet: bool,
+
+ #[arg(long, default_value = "10", help = "Testnet network suffix number")]
+ pub testnet_suffix: u32,
+
+ #[arg(long, help = "Use the development test network")]
+ pub devnet: bool,
+
+ #[arg(long, help = "Use the simulation test network")]
+ pub simnet: bool,
+
+ // TODO: Remove when wallet is more stable
+ #[arg(long = "enable-mainnet-pre-launch", hide = true)]
+ pub enable_mainnet_pre_launch: bool,
+
+ #[arg(long = "keys", short = 'k', help = "Path to keys file")]
+ pub keys_file_path: Option,
+
+ #[arg(long, help = "Path to logs directory")]
+ pub logs_path: Option,
+
+ #[arg(long, short = 'v', default_value = "info", help = "Log level")]
+ pub logs_level: LogsLevel,
+
+ #[arg(long, short = 's', help = "Kaspa node RPC server to connect to")]
+ pub server: Option,
+
+ #[arg(
+ long,
+ short = 'l',
+ default_value = "127.0.0.1:8082",
+ help = "Address to listen on"
+ )]
+ pub listen: String,
+
+ #[arg(long, help = "Enable tokio console")]
+ #[cfg(debug_assertions)]
+ pub enable_tokio_console: bool,
+
+ #[arg(
+ long,
+ default_value_t = DEFAULT_SYNC_INTERVAL_MILLIS,
+ help = "Sync interval in milliseconds",
+ hide = true
+ )]
+ pub sync_interval_millis: u64,
+}
+
+impl Default for Args {
+ fn default() -> Self {
+ Self {
+ testnet: false,
+ testnet_suffix: 10,
+ devnet: false,
+ simnet: false,
+ enable_mainnet_pre_launch: false,
+ keys_file_path: None,
+ logs_path: None,
+ logs_level: Default::default(),
+ server: None,
+ listen: "".to_string(),
+ #[cfg(debug_assertions)]
+ enable_tokio_console: false,
+ sync_interval_millis: DEFAULT_SYNC_INTERVAL_MILLIS,
+ }
+ }
+}
+
+#[derive(Debug, Clone, ValueEnum, Default)]
+pub enum LogsLevel {
+ Off,
+ Trace,
+ #[default]
+ Debug,
+ Info,
+ Warn,
+ Error,
+}
+
+impl From for LevelFilter {
+ fn from(value: LogsLevel) -> LevelFilter {
+ match value {
+ LogsLevel::Off => LevelFilter::Off,
+ LogsLevel::Trace => LevelFilter::Trace,
+ LogsLevel::Debug => LevelFilter::Debug,
+ LogsLevel::Info => LevelFilter::Info,
+ LogsLevel::Warn => LevelFilter::Warn,
+ LogsLevel::Error => LevelFilter::Error,
+ }
+ }
+}
+
+impl Args {
+ pub fn network_id(&self) -> NetworkId {
+ parse_network_type(
+ self.testnet,
+ self.devnet,
+ self.simnet,
+ self.testnet_suffix,
+ self.enable_mainnet_pre_launch,
+ )
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::Args;
+ use clap::Parser;
+
+ #[test]
+ fn sync_interval_default_matches_clap_default() {
+ let args_from_struct = Args::default();
+ let args_from_clap = Args::parse_from(["kaswallet-daemon"]);
+
+ assert_eq!(
+ args_from_struct.sync_interval_millis,
+ args_from_clap.sync_interval_millis
+ );
+ assert_eq!(
+ args_from_struct.sync_interval_millis,
+ super::DEFAULT_SYNC_INTERVAL_MILLIS
+ );
+ }
+}
diff --git a/daemon/src/bin/kaswallet_stress_bench.rs b/daemon/src/bin/kaswallet_stress_bench.rs
new file mode 100644
index 0000000..4961950
--- /dev/null
+++ b/daemon/src/bin/kaswallet_stress_bench.rs
@@ -0,0 +1,385 @@
+use clap::Parser;
+use common::keys::Keys;
+use common::model::{Keychain, WalletAddress, WalletSignableTransaction, WalletUtxoEntry};
+use kaspa_addresses::{Address, Prefix as AddressPrefix, Version};
+use kaspa_bip32::Prefix as XPubPrefix;
+use kaspa_consensus_core::tx::ScriptPublicKey;
+use kaspa_consensus_core::tx::TransactionId;
+use kaspa_consensus_core::tx::{
+ SignableTransaction, Transaction, TransactionInput, TransactionOutpoint, TransactionOutput,
+ UtxoEntry,
+};
+use kaspa_rpc_core::{RpcTransactionOutpoint, RpcUtxoEntry, RpcUtxosByAddressesEntry};
+use kaswallet_daemon::address_manager::AddressManager;
+use kaswallet_daemon::utxo_manager::UtxoManager;
+use std::collections::HashSet;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
+use std::time::Instant;
+use tokio::runtime::Runtime;
+use tokio::sync::Mutex;
+use tokio::time::MissedTickBehavior;
+
+#[derive(Parser, Debug)]
+#[command(about = "Synthetic stress benchmark for huge wallets (no RPC/network).")]
+struct Args {
+ /// Number of wallet addresses to seed into AddressManager.
+ #[arg(long, default_value_t = 1_000_000)]
+ addresses: u32,
+
+ /// Number of UTXOs to generate and feed into UtxoManager::update_utxo_set.
+ #[arg(long, default_value_t = 10_000_000)]
+ utxos: u32,
+
+ /// Progress log cadence (0 disables progress logs).
+ #[arg(long, default_value_t = 100_000)]
+ progress_every: u32,
+
+ /// Required safety flag because this benchmark can use MANY GiB of RAM.
+ #[arg(long)]
+ i_understand: bool,
+
+ /// Run a contention scenario: measure read latencies while a second `update_utxo_set` runs.
+ #[arg(long)]
+ contend: bool,
+
+ /// Number of concurrent reader tasks when running `--contend`.
+ #[arg(long, default_value_t = 4)]
+ contend_readers: u32,
+
+ /// Sampling interval (microseconds) for read latency measurements when running `--contend`.
+ #[arg(long, default_value_t = 100)]
+ contend_sample_interval_micros: u64,
+}
+
+fn address_for_index(prefix: AddressPrefix, i: u32) -> Address {
+ let mut payload = [0u8; 32];
+ payload[..4].copy_from_slice(&i.to_le_bytes());
+ Address::new(prefix, Version::PubKey, &payload)
+}
+
+fn txid(i: u32) -> TransactionId {
+ let mut bytes = [0u8; 32];
+ bytes[..4].copy_from_slice(&i.to_le_bytes());
+ TransactionId::from_bytes(bytes)
+}
+
+fn summarize_latencies(name: &str, mut samples_ns: Vec) {
+ if samples_ns.is_empty() {
+ println!(" {name}: no samples");
+ return;
+ }
+ samples_ns.sort_unstable();
+ let n = samples_ns.len();
+ let p99 = samples_ns[((n - 1) * 99) / 100];
+ let p999 = samples_ns[((n - 1) * 999) / 1000];
+ let max = *samples_ns.last().unwrap();
+
+ let p99_us = (p99 as f64) / 1_000.0;
+ let p999_us = (p999 as f64) / 1_000.0;
+ let max_us = (max as f64) / 1_000.0;
+
+ println!(" {name}: samples={n} p99={p99_us:.3}µs p999={p999_us:.3}µs max={max_us:.3}µs");
+}
+
+fn main() {
+ let args = Args::parse();
+ if !args.i_understand {
+ eprintln!(
+ "Refusing to run without --i-understand (this can use MANY GiB of RAM). \
+Example:\n RUSTC_WRAPPER= CARGO_TARGET_DIR=target cargo run -p kaswallet-daemon --features bench --release --bin kaswallet-stress-bench -- --i-understand --addresses 1000000 --utxos 10000000"
+ );
+ std::process::exit(2);
+ }
+ if args.addresses == 0 {
+ eprintln!("--addresses must be > 0");
+ std::process::exit(2);
+ }
+
+ let rt = Runtime::new().expect("tokio runtime");
+ let prefix = AddressPrefix::Mainnet;
+
+ // We don't derive addresses here; we seed AddressManager directly.
+ let keys = Arc::new(Keys::new(
+ "bench-unused-keys.json".to_string(),
+ 1,
+ vec![],
+ XPubPrefix::XPUB,
+ vec![],
+ 0,
+ 0,
+ 1,
+ 0,
+ ));
+
+ let address_manager = AddressManager::new(keys, prefix);
+
+ println!(
+ "kaswallet-stress-bench: seeding addresses={} utxos={}",
+ args.addresses, args.utxos
+ );
+
+ let start = Instant::now();
+ rt.block_on(async {
+ for i in 0..args.addresses {
+ let address = address_for_index(prefix, i);
+ let wa = WalletAddress::new(i, 0, Keychain::External);
+ address_manager.insert_address_for_bench(address, wa).await;
+
+ if args.progress_every > 0 && (i + 1) % args.progress_every == 0 {
+ println!(" seeded {} addresses", i + 1);
+ }
+ }
+ });
+ println!("Seeded addresses in {:?}", start.elapsed());
+
+ // Build and warm the monitored-address caches (both Vec and HashMap).
+ let start = Instant::now();
+ let monitored = rt
+ .block_on(address_manager.monitored_addresses())
+ .expect("monitored_addresses");
+ println!(
+ "monitored_addresses first build: {:?} (len={})",
+ start.elapsed(),
+ monitored.len()
+ );
+
+ let start = Instant::now();
+ let monitored2 = rt
+ .block_on(address_manager.monitored_addresses())
+ .expect("monitored_addresses cached");
+ println!(
+ "monitored_addresses cached: {:?} (same_arc={})",
+ start.elapsed(),
+ Arc::ptr_eq(&monitored, &monitored2)
+ );
+
+ let start = Instant::now();
+ let by_address = rt
+ .block_on(address_manager.monitored_address_map())
+ .expect("monitored_address_map");
+ println!(
+ "monitored_address_map cached: {:?} (len={})",
+ start.elapsed(),
+ by_address.len()
+ );
+
+ let address_manager = Arc::new(Mutex::new(address_manager));
+ let utxo_manager = Arc::new(UtxoManager::new_for_bench(address_manager));
+
+ let empty_spk = ScriptPublicKey::from_vec(0, vec![]);
+
+ println!("Generating {} UTXO entries...", args.utxos);
+ let start = Instant::now();
+ let mut entries: Vec = Vec::with_capacity(args.utxos as usize);
+ for i in 0..args.utxos {
+ let address_index = i % args.addresses;
+ let address = address_for_index(prefix, address_index);
+
+ let outpoint = RpcTransactionOutpoint {
+ transaction_id: txid(i),
+ index: i,
+ };
+ let amount = ((i % 10_000) + 1) as u64;
+ let utxo_entry = RpcUtxoEntry::new(amount, empty_spk.clone(), 0, false);
+
+ entries.push(RpcUtxosByAddressesEntry {
+ address: Some(address),
+ outpoint,
+ utxo_entry,
+ });
+
+ if args.progress_every > 0 && (i + 1) % args.progress_every == 0 {
+ println!(" generated {} utxos", i + 1);
+ }
+ }
+ println!("Generated UTXO entries in {:?}", start.elapsed());
+
+ println!("Running update_utxo_set...");
+ let start = Instant::now();
+ rt.block_on(utxo_manager.update_utxo_set(entries, vec![]))
+ .expect("update_utxo_set");
+ let state = rt.block_on(utxo_manager.state());
+ println!(
+ "update_utxo_set: {:?} (utxos_by_outpoint={})",
+ start.elapsed(),
+ state.utxos_by_outpoint().len()
+ );
+
+ // Minimal sanity check to keep the compiler honest and confirm the sorted index exists.
+ let mut sum = 0u64;
+ for utxo in state.utxos_sorted_by_amount().take(1000) {
+ sum = sum.wrapping_add(utxo.utxo_entry.amount);
+ }
+ println!("sanity: sum(first 1000 amounts)={sum}");
+
+ if !args.contend {
+ return;
+ }
+
+ if args.contend_readers == 0 {
+ eprintln!("--contend-readers must be > 0");
+ std::process::exit(2);
+ }
+
+ println!(
+ "Starting contention run: readers={} sample_interval={}µs",
+ args.contend_readers, args.contend_sample_interval_micros
+ );
+
+ let stop = Arc::new(AtomicBool::new(false));
+ let utxo_manager_clone = Arc::clone(&utxo_manager);
+
+ // Keep one wallet-local pending tx so `state_with_mempool()` includes the overlay path.
+ rt.block_on(async {
+ let input_outpoint = RpcTransactionOutpoint {
+ transaction_id: txid(0),
+ index: 0,
+ };
+
+ let input = TransactionInput::new(
+ TransactionOutpoint::new(input_outpoint.transaction_id, input_outpoint.index),
+ vec![],
+ 0,
+ 1,
+ );
+ let output = TransactionOutput::new(1, empty_spk.clone());
+ let tx = Transaction::new(
+ 0,
+ vec![input],
+ vec![output],
+ 0,
+ Default::default(),
+ 0,
+ vec![],
+ );
+
+ let wallet_utxo_entry = WalletUtxoEntry::new(1, empty_spk.clone(), 0, false);
+ let input_entry: UtxoEntry = wallet_utxo_entry.into();
+ let signable = SignableTransaction::with_entries(tx, vec![input_entry]);
+
+ let wa0 = WalletAddress::new(0, 0, Keychain::External);
+ let a0 = address_for_index(prefix, 0);
+ let wallet_tx = WalletSignableTransaction::new_from_unsigned(
+ signable,
+ HashSet::new(),
+ vec![wa0],
+ vec![a0],
+ );
+ utxo_manager_clone.add_mempool_transaction(&wallet_tx).await;
+ });
+
+ let contend_sample_interval = if args.contend_sample_interval_micros == 0 {
+ None
+ } else {
+ Some(core::time::Duration::from_micros(
+ args.contend_sample_interval_micros,
+ ))
+ };
+
+ let utxo_manager_for_update = Arc::clone(&utxo_manager);
+ let contend_address_count = args.addresses;
+ let contend_utxo_count = args.utxos;
+ let contend_prefix = prefix;
+
+ let stop_clone = Arc::clone(&stop);
+ let update_handle = rt.spawn(async move {
+ println!("Contention: generating UTXOs for refresh...");
+ let start = Instant::now();
+ let empty_spk = ScriptPublicKey::from_vec(0, vec![]);
+ let mut entries: Vec =
+ Vec::with_capacity(contend_utxo_count as usize);
+ for i in 0..contend_utxo_count {
+ let address_index = i % contend_address_count;
+ let address = address_for_index(contend_prefix, address_index);
+ let outpoint = RpcTransactionOutpoint {
+ transaction_id: txid(i),
+ index: i,
+ };
+ let amount = ((i % 10_000) + 1) as u64;
+ let utxo_entry = RpcUtxoEntry::new(amount, empty_spk.clone(), 0, false);
+ entries.push(RpcUtxosByAddressesEntry {
+ address: Some(address),
+ outpoint,
+ utxo_entry,
+ });
+ }
+ println!("Contention: generated UTXOs in {:?}", start.elapsed());
+
+ println!("Contention: running update_utxo_set...");
+ let start = Instant::now();
+ utxo_manager_for_update
+ .update_utxo_set(entries, vec![])
+ .await
+ .expect("contention update_utxo_set");
+ let elapsed = start.elapsed();
+ println!("Contention: update_utxo_set done in {elapsed:?}");
+
+ stop_clone.store(true, Relaxed);
+ elapsed
+ });
+
+ let mut reader_handles = Vec::new();
+ for _ in 0..args.contend_readers {
+ let utxo_manager = Arc::clone(&utxo_manager);
+ let stop = Arc::clone(&stop);
+ let sample_interval = contend_sample_interval;
+ reader_handles.push(rt.spawn(async move {
+ let mut state_samples_ns: Vec = Vec::new();
+ let mut mempool_samples_ns: Vec = Vec::new();
+
+ if let Some(interval_duration) = sample_interval {
+ let mut interval = tokio::time::interval(interval_duration);
+ interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
+ loop {
+ interval.tick().await;
+ if stop.load(Relaxed) {
+ break;
+ }
+ let t0 = Instant::now();
+ let state = utxo_manager.state().await;
+ std::hint::black_box(state.utxo_count());
+ state_samples_ns.push(t0.elapsed().as_nanos() as u64);
+
+ let t0 = Instant::now();
+ let view = utxo_manager.state_with_mempool().await.unwrap();
+ std::hint::black_box(view.utxo_count());
+ mempool_samples_ns.push(t0.elapsed().as_nanos() as u64);
+ }
+ } else {
+ while !stop.load(Relaxed) {
+ let t0 = Instant::now();
+ let state = utxo_manager.state().await;
+ std::hint::black_box(state.utxo_count());
+ state_samples_ns.push(t0.elapsed().as_nanos() as u64);
+
+ let t0 = Instant::now();
+ let view = utxo_manager.state_with_mempool().await.unwrap();
+ std::hint::black_box(view.utxo_count());
+ mempool_samples_ns.push(t0.elapsed().as_nanos() as u64);
+ }
+ }
+
+ (state_samples_ns, mempool_samples_ns)
+ }));
+ }
+
+ let update_elapsed = rt.block_on(async { update_handle.await.expect("update task panicked") });
+ println!("Contention: update_utxo_set elapsed = {update_elapsed:?}");
+
+ // Ensure readers stop even if the update task completed before they started.
+ stop.store(true, Relaxed);
+
+ let mut merged_state_ns = Vec::new();
+ let mut merged_mempool_ns = Vec::new();
+ for handle in reader_handles {
+ let (state_ns, mempool_ns) =
+ rt.block_on(async { handle.await.expect("reader task panicked") });
+ merged_state_ns.extend(state_ns);
+ merged_mempool_ns.extend(mempool_ns);
+ }
+
+ println!("Read latency while update_utxo_set was running:");
+ summarize_latencies("state().await + utxo_count", merged_state_ns);
+ summarize_latencies("state_with_mempool().await + utxo_count", merged_mempool_ns);
+}
diff --git a/daemon/src/daemon.rs b/daemon/src/daemon.rs
index 8e0af00..b44ab01 100644
--- a/daemon/src/daemon.rs
+++ b/daemon/src/daemon.rs
@@ -81,11 +81,11 @@ impl Daemon {
keys.clone(),
address_prefix,
)));
- let utxo_manager = Arc::new(Mutex::new(utxo_manager::UtxoManager::new(
+ let utxo_manager = Arc::new(utxo_manager::UtxoManager::new(
address_manager.clone(),
consensus_params,
block_dag_info,
- )));
+ ));
let transaction_generator = Arc::new(Mutex::new(TransactionGenerator::new(
kaspa_rpc_client.clone(),
keys.clone(),
diff --git a/daemon/src/service/broadcast.rs b/daemon/src/service/broadcast.rs
index ed8fff2..8846f71 100644
--- a/daemon/src/service/broadcast.rs
+++ b/daemon/src/service/broadcast.rs
@@ -10,10 +10,7 @@ impl KasWalletService {
let signed_transactions: Vec<_> =
request.transactions.into_iter().map(Into::into).collect();
- let mut utxo_manager = self.utxo_manager.lock().await;
- let transaction_ids = self
- .submit_transactions(&mut utxo_manager, &signed_transactions)
- .await?;
+ let transaction_ids = self.submit_transactions(&signed_transactions).await?;
Ok(BroadcastResponse { transaction_ids })
}
diff --git a/daemon/src/service/common.rs b/daemon/src/service/common.rs
index f040e67..cdb8a45 100644
--- a/daemon/src/service/common.rs
+++ b/daemon/src/service/common.rs
@@ -1,11 +1,9 @@
use crate::service::kaswallet_service::KasWalletService;
-use crate::utxo_manager::UtxoManager;
use common::errors::WalletError::UserInputError;
use common::errors::{ResultExt, WalletResult};
use common::model::WalletSignableTransaction;
use kaspa_consensus_core::sign::Signed::{Fully, Partially};
use kaspa_wallet_core::rpc::RpcApi;
-use tokio::sync::MutexGuard;
impl KasWalletService {
pub(crate) async fn get_virtual_daa_score(&self) -> WalletResult {
@@ -30,8 +28,7 @@ impl KasWalletService {
pub(crate) async fn submit_transactions(
&self,
- utxo_manager: &mut MutexGuard<'_, UtxoManager>,
- signed_transactions: &Vec,
+ signed_transactions: &[WalletSignableTransaction],
) -> WalletResult> {
let _ = self.submit_transaction_mutex.lock().await;
@@ -57,7 +54,7 @@ impl KasWalletService {
transaction_ids.push(rpc_transaction_id.to_string());
- utxo_manager
+ self.utxo_manager
.add_mempool_transaction(signed_transaction)
.await;
}
diff --git a/daemon/src/service/create_unsigned_transaction.rs b/daemon/src/service/create_unsigned_transaction.rs
index a7ef719..346d5a5 100644
--- a/daemon/src/service/create_unsigned_transaction.rs
+++ b/daemon/src/service/create_unsigned_transaction.rs
@@ -1,12 +1,11 @@
use crate::service::kaswallet_service::KasWalletService;
-use crate::utxo_manager::UtxoManager;
-use common::errors::WalletError::UserInputError;
+use crate::utxo_manager::UtxoStateView;
+use common::errors::WalletError::{InternalServerError, UserInputError};
use common::errors::WalletResult;
use common::model::WalletSignableTransaction;
use proto::kaswallet_proto::{
CreateUnsignedTransactionsRequest, CreateUnsignedTransactionsResponse, TransactionDescription,
};
-use tokio::sync::MutexGuard;
impl KasWalletService {
pub(crate) async fn create_unsigned_transactions(
@@ -21,12 +20,13 @@ impl KasWalletService {
let transaction_description = request.transaction_description.unwrap();
let unsigned_transactions: Vec;
{
- let utxo_manager = self.utxo_manager.lock().await;
+ let utxo_state = self
+ .utxo_manager
+ .state_with_mempool()
+ .await
+ .map_err(|e| InternalServerError(e.to_string()))?;
unsigned_transactions = self
- .create_unsigned_transactions_from_description(
- transaction_description,
- &utxo_manager,
- )
+ .create_unsigned_transactions_from_description(transaction_description, &utxo_state)
.await?;
}
@@ -38,13 +38,17 @@ impl KasWalletService {
pub(crate) async fn create_unsigned_transactions_from_description(
&self,
transaction_description: TransactionDescription,
- utxo_manager: &MutexGuard<'_, UtxoManager>,
+ utxo_state: &UtxoStateView,
) -> WalletResult> {
self.check_is_synced().await?;
let mut transaction_generator = self.transaction_generator.lock().await;
transaction_generator
- .create_unsigned_transactions(utxo_manager, transaction_description)
+ .create_unsigned_transactions(
+ self.utxo_manager.as_ref(),
+ utxo_state,
+ transaction_description,
+ )
.await
}
}
diff --git a/daemon/src/service/get_balance.rs b/daemon/src/service/get_balance.rs
index 926810a..0ef63a4 100644
--- a/daemon/src/service/get_balance.rs
+++ b/daemon/src/service/get_balance.rs
@@ -1,6 +1,5 @@
use crate::service::kaswallet_service::KasWalletService;
-use common::errors::{ResultExt, WalletResult};
-use common::model::WalletUtxo;
+use common::errors::{ResultExt, WalletError::InternalServerError, WalletResult};
use log::info;
use proto::kaswallet_proto::{AddressBalances, GetBalanceRequest, GetBalanceResponse};
use std::collections::HashMap;
@@ -15,43 +14,56 @@ impl KasWalletService {
let virtual_daa_score = self.get_virtual_daa_score().await?;
let mut balances_map = HashMap::new();
- let utxos_sorted_by_amount: Vec;
- let utxos_count: usize;
- {
- let utxo_manager = self.utxo_manager.lock().await;
- utxos_sorted_by_amount = utxo_manager.utxos_sorted_by_amount();
+ let utxo_state = self
+ .utxo_manager
+ .state_with_mempool()
+ .await
+ .map_err(|e| InternalServerError(e.to_string()))?;
- utxos_count = utxos_sorted_by_amount.len();
- for entry in utxos_sorted_by_amount {
- let amount = entry.utxo_entry.amount;
- let balances = balances_map
- .entry(entry.address.clone())
- .or_insert_with(BalancesEntry::new);
- if utxo_manager.is_utxo_pending(&entry, virtual_daa_score) {
- balances.add_pending(amount);
- } else {
- balances.add_available(amount);
- }
+ let utxos_count = utxo_state.utxo_count();
+ if utxos_count == 0 {
+ info!("GetBalance request scanned 0 UTXOs overall over 0 addresses");
+ return Ok(GetBalanceResponse {
+ available: 0,
+ pending: 0,
+ address_balances: vec![],
+ });
+ }
+
+ for utxo in utxo_state.utxos_iter() {
+ let amount = utxo.utxo_entry.amount;
+ let balances = balances_map
+ .entry(utxo.address.clone())
+ .or_insert_with(BalancesEntry::new);
+ if self.utxo_manager.is_utxo_pending(utxo, virtual_daa_score) {
+ balances.add_pending(amount);
+ } else {
+ balances.add_available(amount);
}
}
let mut address_balances = vec![];
let mut total_balances = BalancesEntry::new();
- let address_manager = self.address_manager.lock().await;
- for (wallet_address, balances) in &balances_map {
- let address = address_manager
- .kaspa_address_from_wallet_address(wallet_address, true)
- .await
- .to_wallet_result_internal()?;
+ if request.include_balance_per_address {
+ let address_manager = self.address_manager.lock().await;
+ address_balances.reserve(balances_map.len());
+ for (wallet_address, balances) in &balances_map {
+ let address = address_manager
+ .kaspa_address_from_wallet_address(wallet_address, true)
+ .await
+ .to_wallet_result_internal()?;
- if request.include_balance_per_address {
address_balances.push(AddressBalances {
address: address.to_string(),
available: balances.available,
pending: balances.pending,
});
+ total_balances.add(balances);
+ }
+ } else {
+ for balances in balances_map.values() {
+ total_balances.add(balances);
}
- total_balances.add(balances);
}
info!(
diff --git a/daemon/src/service/get_utxos.rs b/daemon/src/service/get_utxos.rs
index 8cbfe37..063ca70 100644
--- a/daemon/src/service/get_utxos.rs
+++ b/daemon/src/service/get_utxos.rs
@@ -1,14 +1,14 @@
use crate::address_manager::AddressSet;
use crate::service::kaswallet_service::KasWalletService;
-use common::errors::WalletError::UserInputError;
+use common::errors::WalletError::{InternalServerError, UserInputError};
use common::errors::{ResultExt, WalletResult};
-use common::model::WalletUtxo;
+use common::model::WalletAddress;
use kaspa_addresses::Address;
use kaspa_wallet_core::rpc::RpcApi;
use proto::kaswallet_proto::{
AddressToUtxos, GetUtxosRequest, GetUtxosResponse, Utxo as ProtoUtxo,
};
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
impl KasWalletService {
pub(crate) async fn get_utxos(
@@ -24,8 +24,8 @@ impl KasWalletService {
let address_manager = self.address_manager.lock().await;
address_set = address_manager.address_set().await;
}
- let address_strings: Vec = if request.addresses.is_empty() {
- address_set.keys().cloned().collect()
+ let allowed_addresses: Option> = if request.addresses.is_empty() {
+ None
} else {
for address in &request.addresses {
if !address_set.contains_key(address) {
@@ -35,8 +35,14 @@ impl KasWalletService {
)));
}
}
- request.addresses
+ Some(request.addresses.iter().cloned().collect())
};
+ let wallet_address_to_string: HashMap = address_set
+ .iter()
+ .map(|(address_string, wallet_address)| {
+ (wallet_address.clone(), address_string.clone())
+ })
+ .collect();
let fee_estimate = self
.kaspa_client
@@ -48,85 +54,70 @@ impl KasWalletService {
let virtual_daa_score = self.get_virtual_daa_score().await?;
- let utxos = {
- let utxo_manager = self.utxo_manager.lock().await;
- utxo_manager.utxos_sorted_by_amount()
- };
- let filtered_bucketed_utxos = self
- .filter_utxos_and_bucket_by_address(
- &utxos,
- fee_rate,
- virtual_daa_score,
- &address_strings,
- request.include_pending,
- request.include_dust,
- )
- .await?;
-
- let addresses_to_utxos = filtered_bucketed_utxos
- .iter()
- .map(|(address_string, utxos)| AddressToUtxos {
- address: address_string.clone(),
- utxos: utxos.clone(),
- })
- .collect();
+ let utxo_state = self
+ .utxo_manager
+ .state_with_mempool()
+ .await
+ .map_err(|e| InternalServerError(e.to_string()))?;
- Ok(GetUtxosResponse { addresses_to_utxos })
- }
+ let dust_fee_for_single_utxo = if request.include_dust {
+ None
+ } else {
+ let sample_utxo = { utxo_state.utxos_sorted_by_amount().next().cloned() };
+ if let Some(sample_utxo) = sample_utxo {
+ let transaction_generator = self.transaction_generator.lock().await;
+ let mass = transaction_generator
+ .estimate_mass(
+ &vec![sample_utxo.clone()],
+ sample_utxo.utxo_entry.amount,
+ &[] as &[u8],
+ )
+ .await?;
+ Some(((mass as f64) * fee_rate).ceil() as u64)
+ } else {
+ None
+ }
+ };
- async fn filter_utxos_and_bucket_by_address(
- &self,
- utxos: &Vec,
- fee_rate: f64,
- virtual_daa_score: u64,
- address_strings: &[String],
- include_pending: bool,
- include_dust: bool,
- ) -> WalletResult>> {
- let mut filtered_bucketed_utxos = HashMap::new();
- for utxo in utxos {
- let is_pending = {
- let utxo_manager = self.utxo_manager.lock().await;
- utxo_manager.is_utxo_pending(utxo, virtual_daa_score)
- };
- if !include_pending && is_pending {
+ let mut filtered_bucketed_utxos: HashMap> = HashMap::new();
+ for utxo in utxo_state.utxos_sorted_by_amount() {
+ let is_pending = self.utxo_manager.is_utxo_pending(utxo, virtual_daa_score);
+ if !request.include_pending && is_pending {
continue;
}
- let is_dust = self.is_utxo_dust(utxo, fee_rate).await?;
- if !include_dust && is_dust {
+
+ let is_dust =
+ dust_fee_for_single_utxo.is_some_and(|dust_fee| dust_fee >= utxo.utxo_entry.amount);
+ if !request.include_dust && is_dust {
continue;
}
- let address: String;
- {
- let address_manager = self.address_manager.lock().await;
- address = address_manager
- .kaspa_address_from_wallet_address(&utxo.address, true)
- .await?
- .address_to_string();
+ let address = wallet_address_to_string.get(&utxo.address).ok_or_else(|| {
+ InternalServerError(format!(
+ "wallet address missing from address_set: {:?}",
+ utxo.address
+ ))
+ })?;
+ if let Some(allowed_addresses) = &allowed_addresses {
+ if !allowed_addresses.contains(address) {
+ continue;
+ }
}
- if !address_strings.is_empty() && !address_strings.contains(&address) {
- continue;
+ match filtered_bucketed_utxos.get_mut(address) {
+ Some(bucket) => bucket.push(utxo.to_proto(is_pending, is_dust)),
+ None => {
+ filtered_bucketed_utxos
+ .insert(address.clone(), vec![utxo.to_proto(is_pending, is_dust)]);
+ }
}
-
- let entry = filtered_bucketed_utxos
- .entry(address)
- .or_insert_with(Vec::new);
- entry.push(utxo.to_owned().into_proto(is_pending, is_dust));
}
- Ok(filtered_bucketed_utxos)
- }
-
- async fn is_utxo_dust(&self, utxo: &WalletUtxo, fee_rate: f64) -> WalletResult {
- let transaction_generator = self.transaction_generator.lock().await;
- let mass = transaction_generator
- .estimate_mass(&vec![utxo.clone()], utxo.utxo_entry.amount, &[])
- .await?;
-
- let fee = ((mass as f64) * fee_rate).ceil() as u64;
+ let addresses_to_utxos = filtered_bucketed_utxos
+ .into_iter()
+ .map(|(address, utxos)| AddressToUtxos { address, utxos })
+ .collect();
- Ok(fee >= utxo.utxo_entry.amount)
+ Ok(GetUtxosResponse { addresses_to_utxos })
}
}
diff --git a/daemon/src/service/kaswallet_service.rs b/daemon/src/service/kaswallet_service.rs
index f6d8e7b..1225508 100644
--- a/daemon/src/service/kaswallet_service.rs
+++ b/daemon/src/service/kaswallet_service.rs
@@ -22,7 +22,7 @@ pub struct KasWalletService {
pub(crate) kaspa_client: Arc,
pub(crate) keys: Arc,
pub(crate) address_manager: Arc>,
- pub(crate) utxo_manager: Arc>,
+ pub(crate) utxo_manager: Arc,
pub(crate) transaction_generator: Arc>,
pub(crate) sync_manager: Arc,
pub(crate) submit_transaction_mutex: Mutex<()>,
@@ -33,7 +33,7 @@ impl KasWalletService {
kaspa_client: Arc,
keys: Arc,
address_manager: Arc>,
- utxo_manager: Arc>,
+ utxo_manager: Arc,
transaction_generator: Arc>,
sync_manager: Arc,
) -> Self {
diff --git a/daemon/src/service/send.rs b/daemon/src/service/send.rs
index 85bd291..3869cc7 100644
--- a/daemon/src/service/send.rs
+++ b/daemon/src/service/send.rs
@@ -1,5 +1,5 @@
use crate::service::kaswallet_service::KasWalletService;
-use common::errors::WalletError::UserInputError;
+use common::errors::WalletError::{InternalServerError, UserInputError};
use common::errors::WalletResult;
use log::{debug, error, info};
use proto::kaswallet_proto::{SendRequest, SendResponse};
@@ -7,10 +7,6 @@ use std::time::Instant;
impl KasWalletService {
pub(crate) async fn send(&self, request: SendRequest) -> WalletResult {
- // lock utxo_manager at this point, so that if sync happens in the middle - it doesn't
- // interfere with apply_transaction
- let mut utxo_manager = self.utxo_manager.lock().await;
-
let send_start = Instant::now();
let transaction_description = match request.transaction_description {
Some(description) => description,
@@ -27,8 +23,13 @@ impl KasWalletService {
debug!("Creating unsigned transactions...");
+ let utxo_state = self
+ .utxo_manager
+ .state_with_mempool()
+ .await
+ .map_err(|e| InternalServerError(e.to_string()))?;
let unsigned_transactions = self
- .create_unsigned_transactions_from_description(transaction_description, &utxo_manager)
+ .create_unsigned_transactions_from_description(transaction_description, &utxo_state)
.await?;
debug!("Created {} transactions", unsigned_transactions.len());
@@ -39,9 +40,7 @@ impl KasWalletService {
debug!("Transactions got signed!");
debug!("Submitting transactions...");
- let submit_transactions_result = self
- .submit_transactions(&mut utxo_manager, &signed_transactions)
- .await;
+ let submit_transactions_result = self.submit_transactions(&signed_transactions).await;
if let Err(e) = submit_transactions_result {
error!("Failed to submit transactions: {}", e);
return Err(e);
diff --git a/daemon/src/sync_manager.rs b/daemon/src/sync_manager.rs
index 9e4c9c4..ba15462 100644
--- a/daemon/src/sync_manager.rs
+++ b/daemon/src/sync_manager.rs
@@ -1,4 +1,4 @@
-use crate::address_manager::{AddressManager, AddressSet};
+use crate::address_manager::{AddressManager, AddressQuerySet};
use crate::utxo_manager::UtxoManager;
use common::keys::Keys;
use kaspa_addresses::Address;
@@ -21,11 +21,12 @@ pub struct SyncManager {
kaspa_client: Arc,
keys_file: Arc,
address_manager: Arc>,
- utxo_manager: Arc>,
+ utxo_manager: Arc,
sync_interval_millis: u64,
first_sync_done: AtomicBool,
next_sync_start_index: AtomicU32,
+ recent_scan_next_index: AtomicU32,
is_log_final_progress_line_shown: AtomicBool,
max_used_addresses_for_log: AtomicU32,
max_processed_addresses_for_log: AtomicU32,
@@ -36,7 +37,7 @@ impl SyncManager {
kaspa_rpc_client: Arc,
keys_file: Arc,
address_manager: Arc>,
- utxo_manager: Arc>,
+ utxo_manager: Arc,
sync_interval: u64,
) -> Self {
Self {
@@ -47,6 +48,7 @@ impl SyncManager {
sync_interval_millis: sync_interval,
first_sync_done: AtomicBool::new(false),
next_sync_start_index: 0.into(),
+ recent_scan_next_index: 0.into(),
is_log_final_progress_line_shown: false.into(),
max_used_addresses_for_log: 0.into(),
max_processed_addresses_for_log: 0.into(),
@@ -94,19 +96,17 @@ impl SyncManager {
async fn refresh_utxos(&self) -> Result<(), Box> {
debug!("Refreshing UTXOs...");
- let address_strings: Vec;
+ let monitored_addresses: Arc>;
{
let address_manager = self.address_manager.lock().await;
- address_strings = address_manager.address_strings().await?;
+ monitored_addresses = address_manager.monitored_addresses().await?;
}
- let addresses: Vec = address_strings
- .iter()
- .map(|address_string| Address::constructor(address_string))
- .collect();
+ let addresses: Vec = monitored_addresses.as_ref().clone();
- // Lock utxo_manager at this stage, so that nobody tries to generate transactions while
- // we update the utxo set
- let mut utxo_manager = self.utxo_manager.lock().await;
+ if addresses.is_empty() {
+ self.utxo_manager.update_utxo_set(vec![], vec![]).await?;
+ return Ok(());
+ }
debug!("Getting mempool entries for addresses: {:?}...", addresses);
// It's important to check the mempool before calling `GetUTXOsByAddresses`:
@@ -135,7 +135,9 @@ impl SyncManager {
self.kaspa_client.get_utxos_by_addresses(addresses).await?;
debug!("Got {} utxo entries", get_utxo_by_addresses_response.len());
- utxo_manager
+ // `update_utxo_set` builds a new snapshot without holding any read locks and
+ // swaps the Arc pointer under a brief lock.
+ self.utxo_manager
.update_utxo_set(get_utxo_by_addresses_response, mempool_entries_by_addresses)
.await?;
@@ -158,26 +160,73 @@ impl SyncManager {
pub async fn collect_recent_addresses(&self) -> Result<(), Box> {
debug!("Collecting recent addresses");
+ if !self.first_sync_done.load(Relaxed) {
+ return self.collect_recent_addresses_full_scan().await;
+ }
+ self.collect_recent_addresses_incremental().await
+ }
+
+ async fn collect_recent_addresses_full_scan(&self) -> Result<(), Box> {
let mut index: u32 = 0;
let mut max_used_index: u32 = 0;
- while index < max_used_index + NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES {
+ while index < max_used_index.saturating_add(NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES) {
self.collect_addresses(index, index + NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES)
.await?;
- index += NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES;
+ index = index.saturating_add(NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES);
max_used_index = self.last_used_index().await;
self.update_address_collection_progress_log(index, max_used_index);
}
- let next_sync_start_index = self.next_sync_start_index.load(Relaxed);
- if index > next_sync_start_index {
- self.next_sync_start_index.store(index, Relaxed);
+ self.bump_next_sync_start_index(index);
+ Ok(())
+ }
+
+ async fn collect_recent_addresses_incremental(
+ &self,
+ ) -> Result<(), Box> {
+ // After the initial full scan, we avoid rescanning from index 0 on every tick. Instead we
+ // scan a fixed-size chunk per cycle and advance a cursor, eventually covering the full
+ // range [0, last_used_index + LOOKAHEAD).
+ let last_used_index = self.last_used_index().await;
+ let frontier = Self::recent_scan_frontier(last_used_index);
+
+ // Keep "synced" semantics stable as last_used_index grows.
+ self.bump_next_sync_start_index(frontier);
+
+ let cursor = self.recent_scan_next_index.load(Relaxed);
+ let (start, end, next_cursor) = Self::recent_scan_step(frontier, cursor);
+
+ debug!(
+ "Incremental recent-address scan: [{}, {}) (cursor={}, frontier={}, last_used_index={})",
+ start, end, cursor, frontier, last_used_index
+ );
+
+ if start < end {
+ self.collect_addresses(start, end).await?;
}
+ self.recent_scan_next_index.store(next_cursor, Relaxed);
Ok(())
}
+ fn recent_scan_frontier(last_used_index: u32) -> u32 {
+ last_used_index.saturating_add(NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES)
+ }
+
+ fn recent_scan_step(frontier: u32, cursor: u32) -> (u32, u32, u32) {
+ if frontier == 0 {
+ return (0, 0, 0);
+ }
+ let start = if cursor >= frontier { 0 } else { cursor };
+ let end = start
+ .saturating_add(NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES)
+ .min(frontier);
+ let next_cursor = if end >= frontier { 0 } else { end };
+ (start, end, next_cursor)
+ }
+
pub async fn collect_far_addresses(&self) -> Result<(), Box> {
debug!("Collecting far addresses");
@@ -195,6 +244,13 @@ impl SyncManager {
Ok(())
}
+ fn bump_next_sync_start_index(&self, candidate: u32) {
+ let current = self.next_sync_start_index.load(Relaxed);
+ if candidate > current {
+ self.next_sync_start_index.store(candidate, Relaxed);
+ }
+ }
+
async fn collect_addresses(
&self,
start: u32,
@@ -202,7 +258,7 @@ impl SyncManager {
) -> Result<(), Box> {
debug!("Collecting addresses from {} to {}", start, end);
- let addresses: AddressSet;
+ let addresses: AddressQuerySet;
{
let address_manager = self.address_manager.lock().await;
addresses = address_manager.addresses_to_query(start, end).await?;
@@ -211,12 +267,7 @@ impl SyncManager {
let get_balances_by_addresses_response = self
.kaspa_client
- .get_balances_by_addresses(
- addresses
- .keys()
- .map(|address_string| Address::constructor(address_string))
- .collect(),
- )
+ .get_balances_by_addresses(addresses.keys().cloned().collect())
.await?;
let address_manager = self.address_manager.lock().await;
@@ -260,7 +311,7 @@ impl SyncManager {
* 100.0;
info!(
- "{} addressed of {} of processed ({:.2}%)",
+ "{} addresses of {} processed ({:.2}%)",
self.max_processed_addresses_for_log.load(Relaxed),
self.max_used_addresses_for_log.load(Relaxed),
percent_processed
@@ -268,3 +319,63 @@ impl SyncManager {
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::SyncManager;
+
+ #[test]
+ fn recent_scan_frontier_saturates_at_u32_max() {
+ let lookahead = super::NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES;
+ assert_eq!(SyncManager::recent_scan_frontier(u32::MAX), u32::MAX);
+ assert_eq!(
+ SyncManager::recent_scan_frontier(u32::MAX - lookahead + 1),
+ u32::MAX
+ );
+ }
+
+ #[test]
+ fn recent_scan_step_clamps_end_to_frontier_and_wraps() {
+ let lookahead = super::NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES;
+ let frontier = lookahead / 2;
+ let (start, end, next_cursor) = SyncManager::recent_scan_step(frontier, 0);
+ assert_eq!(start, 0);
+ assert_eq!(end, frontier);
+ assert_eq!(next_cursor, 0);
+ }
+
+ #[test]
+ fn recent_scan_step_advances_cursor_in_chunks_and_wraps() {
+ let lookahead = super::NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES;
+ let frontier = lookahead * 2 + 500;
+
+ let (start1, end1, cursor1) = SyncManager::recent_scan_step(frontier, 0);
+ assert_eq!((start1, end1, cursor1), (0, lookahead, lookahead));
+
+ let (start2, end2, cursor2) = SyncManager::recent_scan_step(frontier, cursor1);
+ assert_eq!(
+ (start2, end2, cursor2),
+ (lookahead, lookahead * 2, lookahead * 2)
+ );
+
+ let (start3, end3, cursor3) = SyncManager::recent_scan_step(frontier, cursor2);
+ assert_eq!((start3, end3, cursor3), (lookahead * 2, frontier, 0));
+
+ let (start4, end4, cursor4) = SyncManager::recent_scan_step(frontier, cursor3);
+ assert_eq!((start4, end4, cursor4), (0, lookahead, lookahead));
+ }
+
+ #[test]
+ fn recent_scan_step_resets_stale_cursor_to_zero() {
+ let lookahead = super::NUM_INDEXES_TO_QUERY_FOR_RECENT_ADDRESSES;
+ let frontier = lookahead;
+ let (start, end, next_cursor) = SyncManager::recent_scan_step(frontier, lookahead * 2);
+ assert_eq!((start, end, next_cursor), (0, frontier, 0));
+ }
+
+ #[test]
+ fn recent_scan_step_zero_frontier_is_empty() {
+ assert_eq!(SyncManager::recent_scan_step(0, 0), (0, 0, 0));
+ assert_eq!(SyncManager::recent_scan_step(0, 123), (0, 0, 0));
+ }
+}
diff --git a/daemon/src/transaction_generator.rs b/daemon/src/transaction_generator.rs
index a96ab3d..f7ce1a1 100644
--- a/daemon/src/transaction_generator.rs
+++ b/daemon/src/transaction_generator.rs
@@ -1,5 +1,5 @@
use crate::address_manager::AddressManager;
-use crate::utxo_manager::UtxoManager;
+use crate::utxo_manager::{UtxoManager, UtxoStateView};
use common::errors::WalletError::{SanityCheckFailed, UserInputError};
use common::errors::{ResultExt, WalletError, WalletResult};
use common::keys::Keys;
@@ -24,7 +24,7 @@ use proto::kaswallet_proto::{FeePolicy, Outpoint, TransactionDescription, fee_po
use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
-use tokio::sync::{Mutex, MutexGuard};
+use tokio::sync::Mutex;
// The current minimal fee rate according to mempool standards
const MIN_FEE_RATE: f64 = 1.0;
@@ -69,7 +69,8 @@ impl TransactionGenerator {
pub async fn create_unsigned_transactions(
&mut self,
- utxo_manager: &MutexGuard<'_, UtxoManager>,
+ utxo_manager: &UtxoManager,
+ utxo_state: &UtxoStateView,
transaction_description: TransactionDescription,
) -> WalletResult> {
let validate_address = |address_string, name| -> WalletResult {
@@ -113,9 +114,9 @@ impl TransactionGenerator {
HashMap::new()
} else {
let mut preselected_utxos = HashMap::new();
- let utxos_by_outpoint = utxo_manager.utxos_by_outpoint();
for preselected_outpoint in &transaction_description.utxos {
- if let Some(utxo) = utxos_by_outpoint.get(&preselected_outpoint.clone().into()) {
+ let outpoint: WalletOutpoint = preselected_outpoint.clone().into();
+ if let Some(utxo) = utxo_state.get_utxo_by_outpoint(&outpoint) {
preselected_utxos.insert(utxo.outpoint.clone(), utxo.clone());
} else {
return Err(UserInputError(format!(
@@ -145,6 +146,7 @@ impl TransactionGenerator {
(selected_utxos, amount_sent_to_recipient, change_sompi) = self
.select_utxos(
utxo_manager,
+ utxo_state,
&preselected_utxos,
transaction_description.amount,
transaction_description.is_send_all,
@@ -178,6 +180,7 @@ impl TransactionGenerator {
let unsigned_transactions = self
.maybe_auto_compound_transaction(
utxo_manager,
+ utxo_state,
unsigned_transaction,
&selected_utxos,
from_addresses,
@@ -198,7 +201,8 @@ impl TransactionGenerator {
#[allow(clippy::too_many_arguments)]
async fn maybe_auto_compound_transaction(
&self,
- utxo_manager: &MutexGuard<'_, UtxoManager>,
+ utxo_manager: &UtxoManager,
+ utxo_state: &UtxoStateView,
original_wallet_transaction: WalletSignableTransaction,
original_selected_utxos: &Vec,
from_addresses: Vec<&WalletAddress>,
@@ -267,6 +271,7 @@ impl TransactionGenerator {
let merge_transaction = self
.merge_transaction(
utxo_manager,
+ utxo_state,
&split_transactions,
&original_consensus_transaction.tx,
original_selected_utxos,
@@ -285,6 +290,7 @@ impl TransactionGenerator {
// Recursion will be 2-3 iterations deep even in the rarest cases, so considered safe...
let split_merge_transaction = Box::pin(self.maybe_auto_compound_transaction(
utxo_manager,
+ utxo_state,
merge_transaction,
original_selected_utxos,
from_addresses,
@@ -309,7 +315,8 @@ impl TransactionGenerator {
#[allow(clippy::too_many_arguments)]
async fn merge_transaction(
&self,
- utxo_manager: &MutexGuard<'_, UtxoManager>,
+ utxo_manager: &UtxoManager,
+ utxo_state: &UtxoStateView,
split_transactions: &[WalletSignableTransaction],
original_consensus_transaction: &Transaction,
original_selected_utxos: &[WalletUtxo],
@@ -412,6 +419,7 @@ impl TransactionGenerator {
let (additional_utxos, total_value_added) = self
.more_utxos_for_merge_transaction(
utxo_manager,
+ utxo_state,
original_consensus_transaction,
original_selected_utxos,
from_addresses,
@@ -458,9 +466,11 @@ impl TransactionGenerator {
}
// Returns: (additional_utxos, total_Value_added)
+ #[allow(clippy::too_many_arguments)]
async fn more_utxos_for_merge_transaction(
&self,
- utxo_manager: &MutexGuard<'_, UtxoManager>,
+ utxo_manager: &UtxoManager,
+ utxo_state: &UtxoStateView,
original_consensus_transaction: &Transaction,
original_selected_utxos: &[WalletUtxo],
from_addresses: &[&WalletAddress],
@@ -478,20 +488,30 @@ impl TransactionGenerator {
.await;
let fee_per_input = (mass_per_input as f64 * fee_rate).ceil() as u64;
- let utxos_sorted_by_amount = utxo_manager.utxos_sorted_by_amount();
- let already_selected_utxos =
- HashSet::::from_iter(original_selected_utxos.iter().cloned());
+ let already_selected_outpoints = HashSet::::from_iter(
+ original_selected_utxos
+ .iter()
+ .map(|utxo| utxo.outpoint.clone()),
+ );
+
+ let from_addresses_set: Option> = if from_addresses.is_empty() {
+ None
+ } else {
+ Some(from_addresses.iter().map(|wa| (*wa).clone()).collect())
+ };
let mut additional_utxos = vec![];
let mut total_value_added = 0;
- for utxo in utxos_sorted_by_amount {
- if already_selected_utxos.contains(&utxo)
- || utxo_manager.is_utxo_pending(&utxo, dag_info.virtual_daa_score)
+ for utxo in utxo_state.utxos_sorted_by_amount() {
+ if already_selected_outpoints.contains(&utxo.outpoint)
+ || utxo_manager.is_utxo_pending(utxo, dag_info.virtual_daa_score)
{
continue;
}
- if !from_addresses.is_empty() && !from_addresses.contains(&&utxo.address) {
- continue;
+ if let Some(ref allowed_set) = from_addresses_set {
+ if !allowed_set.contains(&utxo.address) {
+ continue;
+ }
}
additional_utxos.push(utxo.clone());
@@ -784,7 +804,8 @@ impl TransactionGenerator {
#[allow(clippy::too_many_arguments)]
pub async fn select_utxos(
&mut self,
- utxo_manager: &MutexGuard<'_, UtxoManager>,
+ utxo_manager: &UtxoManager,
+ utxo_state: &UtxoStateView,
preselected_utxos: &HashMap,
amount: u64,
is_send_all: bool,
@@ -804,6 +825,12 @@ impl TransactionGenerator {
let mut total_value = 0;
let mut selected_utxos = vec![];
+ let from_addresses_set: Option> = if from_addresses.is_empty() {
+ None
+ } else {
+ Some(from_addresses.iter().map(|wa| (*wa).clone()).collect())
+ };
+
let dag_info = self
.kaspa_client
.get_block_dag_info()
@@ -813,11 +840,12 @@ impl TransactionGenerator {
let mut fee = 0;
let mut fee_per_utxo = None;
let mut iteration = async |transaction_generator: &mut TransactionGenerator,
- utxo_manager: &MutexGuard,
utxo: &WalletUtxo|
-> WalletResult {
- if !from_addresses.is_empty() && !from_addresses.contains(&&utxo.address) {
- return Ok(true);
+ if let Some(ref allowed_set) = from_addresses_set {
+ if !allowed_set.contains(&utxo.address) {
+ return Ok(true);
+ }
}
if utxo_manager.is_utxo_pending(utxo, dag_info.virtual_daa_score) {
return Ok(true);
@@ -858,16 +886,19 @@ impl TransactionGenerator {
}
Ok(true)
};
- let owned_utxos = utxo_manager.utxos_sorted_by_amount();
- let available_utxos: Vec<_> = if !preselected_utxos.is_empty() {
- preselected_utxos.values().collect()
+ if !preselected_utxos.is_empty() {
+ for utxo in preselected_utxos.values() {
+ let should_continue = iteration(self, utxo).await?;
+ if !should_continue {
+ break;
+ }
+ }
} else {
- owned_utxos.iter().collect()
- };
- for utxo in available_utxos {
- let should_continue = iteration(self, utxo_manager, utxo).await?;
- if !should_continue {
- break;
+ for utxo in utxo_state.utxos_sorted_by_amount() {
+ let should_continue = iteration(self, utxo).await?;
+ if !should_continue {
+ break;
+ }
}
}
diff --git a/daemon/src/utxo_manager.rs b/daemon/src/utxo_manager.rs
index 5d43d64..bbbfd98 100644
--- a/daemon/src/utxo_manager.rs
+++ b/daemon/src/utxo_manager.rs
@@ -1,24 +1,250 @@
-use crate::address_manager::{AddressManager, AddressSet};
-use common::model::{WalletOutpoint, WalletSignableTransaction, WalletUtxo, WalletUtxoEntry};
-use itertools::Itertools;
+use crate::address_manager::AddressManager;
+use common::model::{
+ WalletAddress, WalletOutpoint, WalletSignableTransaction, WalletUtxo, WalletUtxoEntry,
+};
+use kaspa_addresses::Address;
use kaspa_consensus_core::config::params::Params;
use kaspa_rpc_core::{GetBlockDagInfoResponse, RpcMempoolEntryByAddress, RpcUtxosByAddressesEntry};
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::sync::Arc;
-use tokio::sync::Mutex;
+use tokio::sync::{Mutex, RwLock};
+
+#[derive(Debug, Default)]
+pub struct UtxoState {
+ pub(crate) utxos_by_outpoint: HashMap,
+ // Sorted by (amount, outpoint) so the outpoint can be used as a deterministic tiebreaker.
+ pub(crate) utxo_keys_sorted_by_amount: Vec<(u64, WalletOutpoint)>,
+}
+
+impl UtxoState {
+ pub fn new_empty() -> Self {
+ Self::default()
+ }
+
+ pub fn utxo_count(&self) -> usize {
+ self.utxos_by_outpoint.len()
+ }
+
+ pub fn utxos_by_outpoint(&self) -> &HashMap {
+ &self.utxos_by_outpoint
+ }
+
+ pub fn get_utxo_by_outpoint(&self, outpoint: &WalletOutpoint) -> Option<&WalletUtxo> {
+ self.utxos_by_outpoint.get(outpoint)
+ }
+
+ pub fn utxos_sorted_by_amount(&self) -> impl Iterator- + '_ {
+ self.utxo_keys_sorted_by_amount.iter().map(|(_, outpoint)| {
+ self.utxos_by_outpoint
+ .get(outpoint)
+ .expect("utxo_keys_sorted_by_amount contains unknown outpoint")
+ })
+ }
+}
+
+pub struct UtxoStateView {
+ base_state: Arc,
+ removed_utxos: HashSet,
+ added_utxos: HashMap,
+ added_keys_sorted_by_amount: Vec<(u64, WalletOutpoint)>,
+}
+
+impl UtxoStateView {
+ pub fn new(base_state: Arc) -> Self {
+ Self {
+ base_state,
+ removed_utxos: HashSet::new(),
+ added_utxos: HashMap::new(),
+ added_keys_sorted_by_amount: Vec::new(),
+ }
+ }
+
+ pub fn from_mempool_overlay(
+ base_state: Arc,
+ mempool_txs: &[WalletSignableTransaction],
+ address_map: &HashMap,
+ ) -> Self {
+ let mut removed_utxos = HashSet::new();
+ let mut added_utxos = HashMap::new();
+ let mut added_keys_sorted_by_amount = Vec::new();
+
+ for tx in mempool_txs {
+ let consensus_tx = &tx.transaction.unwrap_ref().tx;
+
+ // Inputs are removed from the effective UTXO set.
+ for input in &consensus_tx.inputs {
+ removed_utxos.insert(input.previous_outpoint.into());
+ }
+
+ // Outputs are added if they pay to one of our addresses.
+ for (i, output) in consensus_tx.outputs.iter().enumerate() {
+ let kaspa_address = &tx.address_by_output_index[i];
+ let Some(wallet_address) = address_map.get(kaspa_address) else {
+ continue;
+ };
+
+ let outpoint = WalletOutpoint {
+ transaction_id: consensus_tx.id(),
+ index: i as u32,
+ };
+ let utxo_entry = WalletUtxoEntry {
+ amount: output.value,
+ script_public_key: output.script_public_key.clone(),
+ block_daa_score: 0,
+ is_coinbase: false,
+ };
+ let utxo = WalletUtxo::new(outpoint.clone(), utxo_entry, wallet_address.clone());
+
+ let previous = added_utxos.insert(outpoint.clone(), utxo);
+ debug_assert!(
+ previous.is_none(),
+ "mempool overlay inserted outpoint twice"
+ );
+ added_keys_sorted_by_amount.push((output.value, outpoint));
+ }
+ }
+
+ added_keys_sorted_by_amount.sort_unstable();
+
+ Self {
+ base_state,
+ removed_utxos,
+ added_utxos,
+ added_keys_sorted_by_amount,
+ }
+ }
+
+ pub fn base_state(&self) -> &Arc {
+ &self.base_state
+ }
+
+ pub fn utxo_count(&self) -> usize {
+ let removed_in_base = self
+ .removed_utxos
+ .iter()
+ .filter(|outpoint| self.base_state.utxos_by_outpoint.contains_key(*outpoint))
+ .count();
+ self.base_state.utxos_by_outpoint.len() - removed_in_base + self.added_utxos.len()
+ }
+
+ pub fn get_utxo_by_outpoint(&self, outpoint: &WalletOutpoint) -> Option<&WalletUtxo> {
+ if self.removed_utxos.contains(outpoint) {
+ return None;
+ }
+ if let Some(utxo) = self.added_utxos.get(outpoint) {
+ return Some(utxo);
+ }
+ self.base_state.utxos_by_outpoint.get(outpoint)
+ }
+
+ pub fn utxos_iter(&self) -> impl Iterator
- + '_ {
+ let base_iter = self
+ .base_state
+ .utxos_by_outpoint
+ .values()
+ .filter(|utxo| !self.removed_utxos.contains(&utxo.outpoint));
+ base_iter.chain(self.added_utxos.values())
+ }
+
+ pub fn utxos_sorted_by_amount(&self) -> UtxosSortedByAmountIter<'_> {
+ UtxosSortedByAmountIter {
+ view: self,
+ base_index: 0,
+ added_index: 0,
+ }
+ }
+}
+
+pub struct UtxosSortedByAmountIter<'a> {
+ view: &'a UtxoStateView,
+ base_index: usize,
+ added_index: usize,
+}
+
+impl<'a> Iterator for UtxosSortedByAmountIter<'a> {
+ type Item = &'a WalletUtxo;
+
+ fn next(&mut self) -> Option {
+ let base_keys = self.view.base_state.utxo_keys_sorted_by_amount.as_slice();
+ let added_keys = self.view.added_keys_sorted_by_amount.as_slice();
+
+ let next_base = loop {
+ if self.base_index >= base_keys.len() {
+ break None;
+ }
+ let (amount, outpoint) = &base_keys[self.base_index];
+ if self.view.removed_utxos.contains(outpoint) {
+ self.base_index += 1;
+ continue;
+ }
+ break Some((amount, outpoint));
+ };
+
+ let next_added = if self.added_index < added_keys.len() {
+ let (amount, outpoint) = &added_keys[self.added_index];
+ Some((amount, outpoint))
+ } else {
+ None
+ };
+
+ match (next_base, next_added) {
+ (None, None) => None,
+ (Some((_amount, outpoint)), None) => {
+ self.base_index += 1;
+ Some(
+ self.view
+ .base_state
+ .utxos_by_outpoint
+ .get(outpoint)
+ .expect("utxo_keys_sorted_by_amount contains unknown outpoint"),
+ )
+ }
+ (None, Some((_amount, outpoint))) => {
+ self.added_index += 1;
+ Some(
+ self.view
+ .added_utxos
+ .get(outpoint)
+ .expect("added_keys_sorted_by_amount contains unknown outpoint"),
+ )
+ }
+ (Some((base_amount, base_outpoint)), Some((added_amount, added_outpoint))) => {
+ let base_key = (*base_amount, (*base_outpoint).clone());
+ let added_key = (*added_amount, (*added_outpoint).clone());
+ if base_key <= added_key {
+ self.base_index += 1;
+ Some(
+ self.view
+ .base_state
+ .utxos_by_outpoint
+ .get(base_outpoint)
+ .expect("utxo_keys_sorted_by_amount contains unknown outpoint"),
+ )
+ } else {
+ self.added_index += 1;
+ Some(
+ self.view
+ .added_utxos
+ .get(added_outpoint)
+ .expect("added_keys_sorted_by_amount contains unknown outpoint"),
+ )
+ }
+ }
+ }
+ }
+}
pub struct UtxoManager {
address_manager: Arc>,
coinbase_maturity: u64, // Is different in testnet
- utxos_sorted_by_amount: Vec,
- utxos_by_outpoint: HashMap,
- // Since the sync stage of address collection only picks up on addresses that have accepted
- // (non-mempool) balance, we might miss some mempool outputs that are not yet accepted.
- // To mitigate this we maintain a list of mempool transactions generated by this wallet
- // that should be accepted soon, but are not yet accepted by consensus.
- mempool_transactions: Vec,
+ // Consensus snapshot (already includes node mempool effects from refresh).
+ state: RwLock>,
+
+ // Wallet-generated, not-yet-accepted transactions. Applied as a lightweight overlay.
+ // Stored separately because cloning the whole UTXO set per mempool tx is not viable at scale.
+ mempool_transactions: Mutex>,
}
impl UtxoManager {
@@ -34,96 +260,55 @@ impl UtxoManager {
Self {
address_manager,
coinbase_maturity,
- utxos_sorted_by_amount: Vec::new(),
- utxos_by_outpoint: HashMap::new(),
- mempool_transactions: Vec::new(),
+ state: RwLock::new(Arc::new(UtxoState::new_empty())),
+ mempool_transactions: Mutex::new(Vec::new()),
}
}
- pub fn utxos_sorted_by_amount(&self) -> Vec {
- self.utxos_sorted_by_amount.clone()
- }
-
- pub fn utxos_by_outpoint(&self) -> HashMap {
- self.utxos_by_outpoint.clone()
+ #[cfg(any(test, feature = "bench"))]
+ pub fn new_for_bench(address_manager: Arc>) -> Self {
+ Self {
+ address_manager,
+ coinbase_maturity: 0,
+ state: RwLock::new(Arc::new(UtxoState::new_empty())),
+ mempool_transactions: Mutex::new(Vec::new()),
+ }
}
- pub async fn add_mempool_transaction(&mut self, transaction: &WalletSignableTransaction) {
- self.mempool_transactions.push(transaction.clone());
- self.apply_mempool_transaction(transaction).await;
+ pub async fn state(&self) -> Arc {
+ let guard = self.state.read().await;
+ Arc::clone(&*guard)
}
- async fn apply_mempool_transaction(&mut self, transaction: &WalletSignableTransaction) {
- let tx = &transaction.transaction.unwrap_ref().tx;
+ pub async fn state_with_mempool(&self) -> Result> {
+ let base_state = self.state().await;
- for input in &tx.inputs {
- let outpoint = input.previous_outpoint;
- self.remove_utxo(&outpoint.into());
- }
-
- for (i, output) in tx.outputs.iter().enumerate() {
- let address_string = transaction.address_by_output_index[i].to_string();
- let wallet_address = {
- let address_manager = self.address_manager.lock().await;
- address_manager
- .wallet_address_from_string(&address_string)
- .await
- };
- if wallet_address.is_none() {
- // this means payment is not to this wallet
- continue;
+ let mempool_txs = {
+ let guard = self.mempool_transactions.lock().await;
+ if guard.is_empty() {
+ return Ok(UtxoStateView::new(base_state));
}
- let wallet_address = wallet_address.unwrap();
- let outpoint = WalletOutpoint {
- transaction_id: tx.id(),
- index: i as u32,
- };
- let utxo = WalletUtxo::new(
- outpoint.clone(),
- WalletUtxoEntry {
- amount: output.value,
- script_public_key: output.script_public_key.clone(),
- block_daa_score: 0,
- is_coinbase: false,
- },
- wallet_address,
- );
- self.insert_utxo(outpoint, utxo);
- }
- }
-
- fn insert_utxo(&mut self, outpoint: WalletOutpoint, utxo: WalletUtxo) {
- self.utxos_by_outpoint.insert(outpoint, utxo.clone());
- let position = self
- .utxos_sorted_by_amount
- .binary_search_by(|existing_utxo| {
- existing_utxo.utxo_entry.amount.cmp(&utxo.utxo_entry.amount)
- });
- let position = position.unwrap_or_else(|position| position);
- self.utxos_sorted_by_amount.insert(position, utxo);
- }
+ guard.clone()
+ };
- fn contains_utxo(&self, outpoint: &WalletOutpoint) -> bool {
- self.utxos_by_outpoint.contains_key(outpoint)
- }
-
- fn remove_utxo(&mut self, outpoint: &WalletOutpoint) {
- self.utxos_by_outpoint.remove(outpoint).unwrap();
- let (position, _) = self
- .utxos_sorted_by_amount
- .iter()
- .find_position(|existing_utxo| existing_utxo.outpoint.eq(outpoint))
- .unwrap();
- self.utxos_sorted_by_amount.remove(position);
+ // Map Address -> WalletAddress using the cached map (no per-output string parsing).
+ let address_map: Arc> = {
+ let address_manager = self.address_manager.lock().await;
+ address_manager.monitored_address_map().await?
+ };
+
+ Ok(UtxoStateView::from_mempool_overlay(
+ base_state,
+ &mempool_txs,
+ &address_map,
+ ))
}
pub async fn update_utxo_set(
- &mut self,
+ &self,
rpc_utxo_entries: Vec,
rpc_mempool_utxo_entries: Vec,
) -> Result<(), Box> {
- let mut wallet_utxos: Vec = vec![];
-
let mut exclude: HashSet = HashSet::new();
for rpc_mempool_entries_by_address in &rpc_mempool_utxo_entries {
for sending_rpc_mempool_entry in &rpc_mempool_entries_by_address.sending {
@@ -133,27 +318,41 @@ impl UtxoManager {
}
}
- let address_set: AddressSet;
- {
+ let address_map: Arc> = {
let address_manager = self.address_manager.lock().await;
- address_set = address_manager.address_set().await;
- }
+ address_manager.monitored_address_map().await?
+ };
- for rpc_utxo_entry in &rpc_utxo_entries {
+ // Build new state without holding any UTXO locks.
+ let mut utxos_by_outpoint: HashMap =
+ HashMap::with_capacity(rpc_utxo_entries.len());
+ let mut utxo_keys_sorted_by_amount: Vec<(u64, WalletOutpoint)> =
+ Vec::with_capacity(rpc_utxo_entries.len());
+
+ for rpc_utxo_entry in rpc_utxo_entries {
let wallet_outpoint: WalletOutpoint = rpc_utxo_entry.outpoint.into();
if exclude.contains(&wallet_outpoint) {
continue;
}
- let wallet_utxo_entry: WalletUtxoEntry = rpc_utxo_entry.utxo_entry.clone().into();
+ let wallet_utxo_entry: WalletUtxoEntry = rpc_utxo_entry.utxo_entry.into();
+ let amount = wallet_utxo_entry.amount;
+
+ let Some(address) = rpc_utxo_entry.address else {
+ continue;
+ };
+ let wallet_address = address_map
+ .get(&address)
+ .ok_or_else(|| format!("UTXO address {} not found in wallet address_set", address))?
+ .clone();
- let address = address_set
- .get(&rpc_utxo_entry.address.as_ref().unwrap().to_string())
- .unwrap();
+ let wallet_utxo =
+ WalletUtxo::new(wallet_outpoint.clone(), wallet_utxo_entry, wallet_address);
- let wallet_utxo = WalletUtxo::new(wallet_outpoint, wallet_utxo_entry, address.clone());
+ let previous = utxos_by_outpoint.insert(wallet_outpoint.clone(), wallet_utxo);
+ debug_assert!(previous.is_none(), "UTXO outpoint inserted twice");
- wallet_utxos.push(wallet_utxo);
+ utxo_keys_sorted_by_amount.push((amount, wallet_outpoint));
}
for rpc_mempool_entry in rpc_mempool_utxo_entries {
@@ -166,18 +365,16 @@ impl UtxoManager {
let Some(output_verbose_data) = &output.verbose_data else {
panic!("output verbose data missing")
};
- let address_manager = self.address_manager.lock().await;
- let address = address_manager
- .wallet_address_from_string(
- &output_verbose_data
- .script_public_key_address
- .address_to_string(),
- )
- .await;
- if address.is_none() {
+ let address_string = output_verbose_data
+ .script_public_key_address
+ .address_to_string();
+ let address = Address::try_from(address_string.as_str()).map_err(|err| {
+ format!("invalid address in mempool output ({address_string}): {err}")
+ })?;
+ let Some(wallet_address) = address_map.get(&address) else {
// this means this output is not to this wallet
continue;
- }
+ };
let wallet_outpoint =
WalletOutpoint::new(transaction_verbose_data.transaction_id, i as u32);
@@ -192,53 +389,274 @@ impl UtxoManager {
false,
);
- let utxo = WalletUtxo::new(wallet_outpoint, utxo_entry, address.unwrap());
+ let utxo = WalletUtxo::new(
+ wallet_outpoint.clone(),
+ utxo_entry,
+ wallet_address.clone(),
+ );
- wallet_utxos.push(utxo);
+ let previous = utxos_by_outpoint.insert(wallet_outpoint.clone(), utxo);
+ debug_assert!(previous.is_none(), "mempool outpoint inserted twice");
+ utxo_keys_sorted_by_amount.push((output.value, wallet_outpoint));
}
}
}
- self.update_utxos_sorted_by_amount(wallet_utxos.clone());
- self.update_utxos_by_outpoint(wallet_utxos);
+ utxo_keys_sorted_by_amount.sort_unstable();
+ let new_state = Arc::new(UtxoState {
+ utxos_by_outpoint,
+ utxo_keys_sorted_by_amount,
+ });
- self.apply_mempool_transactions_after_update().await;
+ // Swap the Arc pointer under a brief write lock.
+ {
+ let mut guard = self.state.write().await;
+ *guard = new_state.clone();
+ }
+
+ self.prune_mempool_transactions_after_update(&new_state)
+ .await;
Ok(())
}
- async fn apply_mempool_transactions_after_update(&mut self) {
- let previous_mempool_transactions = std::mem::take(&mut self.mempool_transactions);
- self.mempool_transactions = vec![];
- 'outer: for transaction in &previous_mempool_transactions {
+ pub async fn add_mempool_transaction(&self, transaction: &WalletSignableTransaction) {
+ let mut mempool = self.mempool_transactions.lock().await;
+ mempool.push(transaction.clone());
+ }
+
+ async fn prune_mempool_transactions_after_update(&self, new_state: &UtxoState) {
+ let mut mempool = self.mempool_transactions.lock().await;
+ mempool.retain(|transaction| {
for input in transaction.transaction.unwrap_ref().tx.inputs.iter() {
let outpoint = input.previous_outpoint;
- if !self.contains_utxo(&outpoint.into()) {
- // this means this transaction was either accepted or double-spent
- continue 'outer;
+ if !new_state.utxos_by_outpoint.contains_key(&outpoint.into()) {
+ // Transaction is either accepted (now covered by RPC mempool snapshot) or double-spent.
+ return false;
}
}
- self.add_mempool_transaction(transaction).await;
+ true
+ });
+ }
+
+ pub fn is_utxo_pending(&self, utxo: &WalletUtxo, virtual_daa_score: u64) -> bool {
+ if !utxo.utxo_entry.is_coinbase {
+ return false;
}
+
+ utxo.utxo_entry.block_daa_score + self.coinbase_maturity > virtual_daa_score
}
+}
- fn update_utxos_sorted_by_amount(&mut self, mut wallet_utxos: Vec) {
- wallet_utxos.sort_by(|a, b| a.utxo_entry.amount.cmp(&b.utxo_entry.amount));
- self.utxos_sorted_by_amount = wallet_utxos;
+#[cfg(test)]
+mod tests {
+ use super::UtxoManager;
+ use crate::address_manager::AddressManager;
+ use common::keys::Keys;
+ use common::model::{Keychain, WalletAddress, WalletOutpoint, WalletSignableTransaction};
+ use kaspa_addresses::{Address, Prefix, Version};
+ use kaspa_bip32::Prefix as XPubPrefix;
+ use kaspa_consensus_core::tx::{
+ ScriptPublicKey, SignableTransaction, Transaction, TransactionInput, TransactionOutpoint,
+ TransactionOutput, UtxoEntry,
+ };
+ use kaspa_hashes::Hash;
+ use kaspa_rpc_core::{RpcTransactionOutpoint, RpcUtxoEntry, RpcUtxosByAddressesEntry};
+ use std::collections::HashSet;
+ use std::sync::Arc;
+ use tokio::sync::Mutex;
+
+ fn txid(i: u32) -> Hash {
+ let mut bytes = [0u8; 32];
+ bytes[..4].copy_from_slice(&i.to_le_bytes());
+ Hash::from_bytes(bytes)
}
- fn update_utxos_by_outpoint(&mut self, wallet_utxos: Vec) {
- self.utxos_by_outpoint.clear();
- for wallet_utxo in wallet_utxos {
- self.utxos_by_outpoint
- .insert(wallet_utxo.outpoint.clone(), wallet_utxo);
+ fn make_address(prefix: Prefix, i: u32) -> Address {
+ let mut payload = [0u8; 32];
+ payload[..4].copy_from_slice(&i.to_le_bytes());
+ Address::new(prefix, Version::PubKey, &payload)
+ }
+
+ fn make_outpoint(i: u32) -> RpcTransactionOutpoint {
+ RpcTransactionOutpoint {
+ transaction_id: txid(i),
+ index: i,
}
}
- pub fn is_utxo_pending(&self, utxo: &WalletUtxo, virtual_daa_score: u64) -> bool {
- if !utxo.utxo_entry.is_coinbase {
- return false;
+ fn make_rpc_utxo_entry(amount: u64) -> RpcUtxoEntry {
+ RpcUtxoEntry::new(amount, ScriptPublicKey::from_vec(0, vec![]), 0, false)
+ }
+
+ fn make_rpc_utxo(i: u32, address: Address) -> RpcUtxosByAddressesEntry {
+ let amount = ((i % 10_000) + 1) as u64;
+ RpcUtxosByAddressesEntry {
+ address: Some(address),
+ outpoint: make_outpoint(i),
+ utxo_entry: make_rpc_utxo_entry(amount),
}
+ }
- utxo.utxo_entry.block_daa_score + self.coinbase_maturity > virtual_daa_score
+ #[tokio::test]
+ async fn update_utxo_set_produces_sorted_index() {
+ let keys = Arc::new(Keys::new(
+ "unused".to_string(),
+ 1,
+ vec![],
+ XPubPrefix::XPUB,
+ vec![],
+ 0,
+ 0,
+ 1,
+ 0,
+ ));
+ let address_manager = AddressManager::new(keys, Prefix::Mainnet);
+ let address = make_address(Prefix::Mainnet, 1);
+ let wa = WalletAddress::new(1, 0, Keychain::External);
+ address_manager
+ .insert_address_for_bench(address.clone(), wa)
+ .await;
+
+ let address_manager = Arc::new(Mutex::new(address_manager));
+ let utxo_manager = UtxoManager::new_for_bench(address_manager);
+
+ let entries = vec![
+ make_rpc_utxo(1, address.clone()), // amount 2
+ make_rpc_utxo(2, address.clone()), // amount 3
+ make_rpc_utxo(10_000, address.clone()), // amount 1
+ ];
+
+ utxo_manager.update_utxo_set(entries, vec![]).await.unwrap();
+
+ let state = utxo_manager.state().await;
+ let amounts: Vec = state
+ .utxos_sorted_by_amount()
+ .map(|utxo| utxo.utxo_entry.amount)
+ .collect();
+ assert_eq!(amounts, vec![1, 2, 3]);
+ }
+
+ #[tokio::test]
+ async fn state_snapshots_remain_valid_after_update() {
+ let keys = Arc::new(Keys::new(
+ "unused".to_string(),
+ 1,
+ vec![],
+ XPubPrefix::XPUB,
+ vec![],
+ 0,
+ 0,
+ 1,
+ 0,
+ ));
+ let address_manager = AddressManager::new(keys, Prefix::Mainnet);
+ let address = make_address(Prefix::Mainnet, 1);
+ let wa = WalletAddress::new(1, 0, Keychain::External);
+ address_manager
+ .insert_address_for_bench(address.clone(), wa)
+ .await;
+
+ let address_manager = Arc::new(Mutex::new(address_manager));
+ let utxo_manager = UtxoManager::new_for_bench(address_manager);
+
+ utxo_manager
+ .update_utxo_set(vec![make_rpc_utxo(1, address.clone())], vec![])
+ .await
+ .unwrap();
+ let old_state = utxo_manager.state().await;
+ assert_eq!(old_state.utxo_count(), 1);
+
+ utxo_manager
+ .update_utxo_set(
+ vec![
+ make_rpc_utxo(1, address.clone()),
+ make_rpc_utxo(2, address.clone()),
+ ],
+ vec![],
+ )
+ .await
+ .unwrap();
+ let new_state = utxo_manager.state().await;
+ assert_eq!(new_state.utxo_count(), 2);
+
+ // Old snapshot remains valid and unchanged.
+ assert_eq!(old_state.utxo_count(), 1);
+ }
+
+ #[tokio::test]
+ async fn state_with_mempool_overlays_wallet_transactions() {
+ let keys = Arc::new(Keys::new(
+ "unused".to_string(),
+ 1,
+ vec![],
+ XPubPrefix::XPUB,
+ vec![],
+ 0,
+ 0,
+ 1,
+ 0,
+ ));
+ let address_manager = AddressManager::new(keys, Prefix::Mainnet);
+ let address = make_address(Prefix::Mainnet, 1);
+ let wa = WalletAddress::new(1, 0, Keychain::External);
+ address_manager
+ .insert_address_for_bench(address.clone(), wa.clone())
+ .await;
+
+ let address_manager = Arc::new(Mutex::new(address_manager));
+ let utxo_manager = UtxoManager::new_for_bench(address_manager);
+
+ // Base state: one confirmed UTXO.
+ utxo_manager
+ .update_utxo_set(vec![make_rpc_utxo(1, address.clone())], vec![])
+ .await
+ .unwrap();
+ let base_state = utxo_manager.state().await;
+ let base_outpoint: WalletOutpoint = make_outpoint(1).into();
+ let base_utxo = base_state
+ .get_utxo_by_outpoint(&base_outpoint)
+ .expect("base utxo missing");
+
+ // Local wallet tx spends that UTXO and creates one output to our address.
+ let input = TransactionInput::new(
+ TransactionOutpoint::new(base_outpoint.transaction_id, base_outpoint.index),
+ vec![],
+ 0,
+ 1,
+ );
+ let output = TransactionOutput::new(1, ScriptPublicKey::from_vec(0, vec![]));
+ let tx = Transaction::new(
+ 0,
+ vec![input],
+ vec![output],
+ 0,
+ Default::default(),
+ 0,
+ vec![],
+ );
+ let input_entry: UtxoEntry = base_utxo.utxo_entry.clone().into();
+ let signable = SignableTransaction::with_entries(tx, vec![input_entry]);
+ let wallet_tx = WalletSignableTransaction::new_from_unsigned(
+ signable,
+ HashSet::new(),
+ vec![wa],
+ vec![address.clone()],
+ );
+
+ utxo_manager.add_mempool_transaction(&wallet_tx).await;
+
+ let view = utxo_manager.state_with_mempool().await.unwrap();
+
+ // View hides the spent outpoint but base snapshot remains unchanged.
+ assert!(view.get_utxo_by_outpoint(&base_outpoint).is_none());
+ assert!(base_state.get_utxo_by_outpoint(&base_outpoint).is_some());
+
+ // View includes the newly created outpoint.
+ let created_outpoint = WalletOutpoint {
+ transaction_id: wallet_tx.transaction.unwrap_ref().tx.id(),
+ index: 0,
+ };
+ assert!(view.get_utxo_by_outpoint(&created_outpoint).is_some());
+ assert!(base_state.get_utxo_by_outpoint(&created_outpoint).is_none());
}
}