From 9733616ee914795708338c835aa3568f636c25d3 Mon Sep 17 00:00:00 2001 From: Sosthene Date: Thu, 20 Nov 2025 17:24:41 +0100 Subject: [PATCH 01/10] Migrating to workspace structure Move Updated to core Remove dummy implementation of get_input_hashes Remove dead code Heavy refactoring --- Cargo.toml | 28 +- backend-blindbit-native/Cargo.toml | 45 + backend-blindbit-native/src/backend.rs | 86 ++ backend-blindbit-native/src/backend_async.rs | 126 +++ .../src}/client/client.rs | 107 ++- .../src/client/http_trait.rs | 63 ++ backend-blindbit-native/src/client/mod.rs | 11 + .../src}/client/structs.rs | 2 +- .../src/client/ureq_impl.rs | 92 +++ backend-blindbit-native/src/lib.rs | 25 + spdk-core/Cargo.toml | 39 + spdk-core/src/backend/backend.rs | 33 + spdk-core/src/backend/backend_async.rs | 64 ++ spdk-core/src/backend/mod.rs | 10 + {src => spdk-core/src}/client/client.rs | 39 +- {src => spdk-core/src}/client/mod.rs | 0 {src => spdk-core/src}/client/spend.rs | 0 {src => spdk-core/src}/client/structs.rs | 0 {src => spdk-core/src}/constants.rs | 0 spdk-core/src/lib.rs | 26 + spdk-core/src/scanner/mod.rs | 779 ++++++++++++++++++ .../structs.rs => spdk-core/src/types.rs | 1 + spdk-core/src/updater/mod.rs | 5 + spdk-core/src/updater/updater.rs | 86 ++ src/backend/backend.rs | 24 - src/backend/blindbit/backend/backend.rs | 84 -- src/backend/blindbit/backend/mod.rs | 3 - src/backend/blindbit/client/mod.rs | 4 - src/backend/blindbit/mod.rs | 5 - src/backend/mod.rs | 13 - src/lib.rs | 14 - src/scanner/mod.rs | 3 - src/scanner/scanner.rs | 380 --------- src/updater/mod.rs | 3 - src/updater/updater.rs | 31 - 35 files changed, 1592 insertions(+), 639 deletions(-) create mode 100644 backend-blindbit-native/Cargo.toml create mode 100644 backend-blindbit-native/src/backend.rs create mode 100644 backend-blindbit-native/src/backend_async.rs rename {src/backend/blindbit => backend-blindbit-native/src}/client/client.rs (51%) create mode 100644 backend-blindbit-native/src/client/http_trait.rs create mode 100644 backend-blindbit-native/src/client/mod.rs rename {src/backend/blindbit => backend-blindbit-native/src}/client/structs.rs (97%) create mode 100644 backend-blindbit-native/src/client/ureq_impl.rs create mode 100644 backend-blindbit-native/src/lib.rs create mode 100644 spdk-core/Cargo.toml create mode 100644 spdk-core/src/backend/backend.rs create mode 100644 spdk-core/src/backend/backend_async.rs create mode 100644 spdk-core/src/backend/mod.rs rename {src => spdk-core/src}/client/client.rs (75%) rename {src => spdk-core/src}/client/mod.rs (100%) rename {src => spdk-core/src}/client/spend.rs (100%) rename {src => spdk-core/src}/client/structs.rs (100%) rename {src => spdk-core/src}/constants.rs (100%) create mode 100644 spdk-core/src/lib.rs create mode 100644 spdk-core/src/scanner/mod.rs rename src/backend/structs.rs => spdk-core/src/types.rs (97%) create mode 100644 spdk-core/src/updater/mod.rs create mode 100644 spdk-core/src/updater/updater.rs delete mode 100644 src/backend/backend.rs delete mode 100644 src/backend/blindbit/backend/backend.rs delete mode 100644 src/backend/blindbit/backend/mod.rs delete mode 100644 src/backend/blindbit/client/mod.rs delete mode 100644 src/backend/blindbit/mod.rs delete mode 100644 src/backend/mod.rs delete mode 100644 src/lib.rs delete mode 100644 src/scanner/mod.rs delete mode 100644 src/scanner/scanner.rs delete mode 100644 src/updater/mod.rs delete mode 100644 src/updater/updater.rs diff --git a/Cargo.toml b/Cargo.toml index 757a43a..390df4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,13 +1,15 @@ -[package] -name = "spdk" -version = "0.1.0" -edition = "2021" +[workspace] +members = [ + "spdk-core", + "backend-blindbit-native", +] +resolver = "2" -[lib] -name = "spdk" -crate-type = ["lib", "staticlib", "cdylib"] +[workspace.dependencies] +# Internal workspace crates +spdk-core = { path = "spdk-core", version = "0.1.0" } -[dependencies] +# Core dependencies - shared across crates silentpayments = "0.4" anyhow = "1.0" serde = { version = "1.0.188", features = ["derive"] } @@ -15,11 +17,11 @@ serde_json = "1.0.107" bitcoin = { version = "0.31.1", features = ["serde", "rand", "base64"] } rayon = "1.10.0" futures = "0.3" -log = "0.4" +futures-util = "0.3.31" async-trait = "0.1" -reqwest = { version = "0.12.4", features = ["rustls-tls", "gzip", "json"], default-features = false, optional = true } -hex = { version = "0.4.3", features = ["serde"], optional = true } +log = "0.4" +hex = { version = "0.4.3", features = ["serde"] } bdk_coin_select = "0.4.0" -[features] -blindbit-backend = ["reqwest", "hex"] +[workspace.package] +repository = "https://github.com/cygnet3/spdk" diff --git a/backend-blindbit-native/Cargo.toml b/backend-blindbit-native/Cargo.toml new file mode 100644 index 0000000..01ec857 --- /dev/null +++ b/backend-blindbit-native/Cargo.toml @@ -0,0 +1,45 @@ +[package] +name = "backend-blindbit-native" +version = "0.1.0" +edition = "2021" +repository.workspace = true + +[dependencies] +# Core client dependency +spdk-core.workspace = true + +# URL parsing (minimal: ~20KB) +url = "2.5" + +# Minimal HTTP client (optional, ~200KB + TLS adds ~600KB) +# TLS support via rustls is required for HTTPS connections to blindbit +ureq = { version = "2.10", default-features = false, features = ["json", "tls"], optional = true } + +# Logging +log.workspace = true + +# Re-export some core types for convenience +bitcoin.workspace = true +anyhow.workspace = true +silentpayments.workspace = true +serde.workspace = true +serde_json.workspace = true +hex.workspace = true + +# Async dependencies (optional) +futures = { workspace = true, optional = true } +futures-util = { workspace = true, optional = true } +async-trait = { workspace = true, optional = true } + +[features] +# Default: both sync and async APIs available +default = ["async"] + +# Enable async APIs +async = ["spdk-core/async", "dep:futures", "dep:futures-util", "dep:async-trait"] + +# Sync-only mode - excludes async code (backwards compatible) +sync = [] + +# Minimal HTTP client implementation using ureq +ureq-client = ["ureq"] diff --git a/backend-blindbit-native/src/backend.rs b/backend-blindbit-native/src/backend.rs new file mode 100644 index 0000000..acbd304 --- /dev/null +++ b/backend-blindbit-native/src/backend.rs @@ -0,0 +1,86 @@ +use std::ops::RangeInclusive; + +use bitcoin::{absolute::Height, Amount}; +use futures::executor::block_on; + +use anyhow::Result; + +use crate::client::{BlindbitClient, HttpClient}; +use spdk_core::{BlockData, BlockDataIterator, ChainBackend, SpentIndexData, UtxoData}; + +/// Synchronous Blindbit backend implementation, generic over the HTTP client. +/// +/// This uses `block_on` to convert async HTTP calls to synchronous operations. +/// For better performance, consider using `AsyncBlindbitBackend` with the `async` feature. +/// +/// Consumers must provide their own HTTP client implementation by implementing the `HttpClient` trait. +pub struct BlindbitBackend { + client: BlindbitClient, +} + +impl BlindbitBackend { + /// Create a new synchronous Blindbit backend with a custom HTTP client. + /// + /// # Arguments + /// * `blindbit_url` - Base URL of the Blindbit server + /// * `http_client` - HTTP client implementation + pub fn new(blindbit_url: String, http_client: H) -> Result { + Ok(Self { + client: BlindbitClient::new(blindbit_url, http_client)?, + }) + } +} + +impl ChainBackend for BlindbitBackend { + /// High-level function to get block data for a range of blocks. + /// Block data includes all the information needed to determine if a block is relevant for scanning, + /// but does not include utxos, or spent index. + /// These need to be fetched separately afterwards, if it is determined this block is relevant. + fn get_block_data_for_range( + &self, + range: RangeInclusive, + dust_limit: Amount, + with_cutthrough: bool, + ) -> BlockDataIterator { + let client = self.client.clone(); + + // Convert range to iterator that fetches block data synchronously + let iter = range.map(move |n| { + let client = client.clone(); + block_on(async move { + let blkheight = Height::from_consensus(n)?; + let tweaks = match with_cutthrough { + true => client.tweaks(blkheight, dust_limit).await?, + false => client.tweak_index(blkheight, dust_limit).await?, + }; + let new_utxo_filter = client.filter_new_utxos(blkheight).await?; + let spent_filter = client.filter_spent(blkheight).await?; + let blkhash = new_utxo_filter.block_hash; + Ok(BlockData { + blkheight, + blkhash, + tweaks, + new_utxo_filter: new_utxo_filter.into(), + spent_filter: spent_filter.into(), + }) + }) + }); + + Box::new(iter) + } + + fn spent_index(&self, block_height: Height) -> Result { + block_on(self.client.spent_index(block_height)).map(Into::into) + } + + fn utxos(&self, block_height: Height) -> Result> { + Ok(block_on(self.client.utxos(block_height))? + .into_iter() + .map(Into::into) + .collect()) + } + + fn block_height(&self) -> Result { + block_on(self.client.block_height()) + } +} diff --git a/backend-blindbit-native/src/backend_async.rs b/backend-blindbit-native/src/backend_async.rs new file mode 100644 index 0000000..756753c --- /dev/null +++ b/backend-blindbit-native/src/backend_async.rs @@ -0,0 +1,126 @@ +use std::{ops::RangeInclusive, pin::Pin, sync::Arc}; + +use bitcoin::{absolute::Height, Amount}; +use futures::{stream, Stream, StreamExt}; + +use anyhow::Result; + +use crate::client::{BlindbitClient, HttpClient}; +use spdk_core::{AsyncChainBackend, BlockData, BlockDataStream, SpentIndexData, UtxoData}; + +const CONCURRENT_FILTER_REQUESTS: usize = 200; + +/// Asynchronous Blindbit backend implementation, generic over the HTTP client. +/// +/// This provides high-performance async methods with concurrent request handling. +/// Enable with the `async` feature flag. +/// +/// Consumers must provide their own HTTP client implementation by implementing the `HttpClient` trait. +pub struct AsyncBlindbitBackend { + client: BlindbitClient, +} + +impl AsyncBlindbitBackend { + /// Create a new async Blindbit backend with a custom HTTP client. + /// + /// # Arguments + /// * `blindbit_url` - Base URL of the Blindbit server + /// * `http_client` - HTTP client implementation + pub fn new(blindbit_url: String, http_client: H) -> Result { + Ok(Self { + client: BlindbitClient::new(blindbit_url, http_client)?, + }) + } + + /// Get block data for a range of blocks as a Stream (async iterator). + /// + /// This fetches blocks concurrently for better performance. + /// + /// # Arguments + /// * `range` - Range of block heights to fetch + /// * `dust_limit` - Minimum amount to consider (dust outputs are ignored) + /// * `with_cutthrough` - Whether to use cutthrough optimization + /// + /// # Returns + /// A Stream of BlockData results + pub fn get_block_data_stream( + &self, + range: RangeInclusive, + dust_limit: Amount, + with_cutthrough: bool, + ) -> Pin> + Send + 'static>> { + let client = Arc::new(self.client.clone()); + + let res = stream::iter(range) + .map(move |n| { + let client = client.clone(); + + async move { + let blkheight = Height::from_consensus(n)?; + let tweaks = match with_cutthrough { + true => client.tweaks(blkheight, dust_limit).await?, + false => client.tweak_index(blkheight, dust_limit).await?, + }; + let new_utxo_filter = client.filter_new_utxos(blkheight).await?; + let spent_filter = client.filter_spent(blkheight).await?; + let blkhash = new_utxo_filter.block_hash; + Ok(BlockData { + blkheight, + blkhash, + tweaks, + new_utxo_filter: new_utxo_filter.into(), + spent_filter: spent_filter.into(), + }) + } + }) + .buffered(CONCURRENT_FILTER_REQUESTS); + + Box::pin(res) + } + + /// Get spent index data for a block height + pub async fn spent_index(&self, block_height: Height) -> Result { + self.client.spent_index(block_height).await.map(Into::into) + } + + /// Get UTXO data for a block height + pub async fn utxos(&self, block_height: Height) -> Result> { + Ok(self + .client + .utxos(block_height) + .await? + .into_iter() + .map(Into::into) + .collect()) + } + + /// Get the current block height from the server + pub async fn block_height(&self) -> Result { + self.client.block_height().await + } +} + +// Implement the AsyncChainBackend trait for AsyncBlindbitBackend +#[async_trait::async_trait] +impl AsyncChainBackend for AsyncBlindbitBackend { + fn get_block_data_stream( + &self, + range: RangeInclusive, + dust_limit: Amount, + with_cutthrough: bool, + ) -> BlockDataStream { + self.get_block_data_stream(range, dust_limit, with_cutthrough) + } + + async fn spent_index(&self, block_height: Height) -> Result { + self.spent_index(block_height).await + } + + async fn utxos(&self, block_height: Height) -> Result> { + self.utxos(block_height).await + } + + async fn block_height(&self) -> Result { + self.block_height().await + } +} diff --git a/src/backend/blindbit/client/client.rs b/backend-blindbit-native/src/client/client.rs similarity index 51% rename from src/backend/blindbit/client/client.rs rename to backend-blindbit-native/src/client/client.rs index 8b1e730..7b3371b 100644 --- a/src/backend/blindbit/client/client.rs +++ b/backend-blindbit-native/src/client/client.rs @@ -1,58 +1,62 @@ -use std::time::Duration; - use bitcoin::{absolute::Height, secp256k1::PublicKey, Amount, Txid}; -use reqwest::{Client, Url}; +use url::Url; use anyhow::Result; -use crate::backend::blindbit::client::structs::InfoResponse; +use crate::client::structs::InfoResponse; +use super::http_trait::HttpClient; use super::structs::{ BlockHeightResponse, FilterResponse, ForwardTxRequest, SpentIndexResponse, UtxoResponse, }; -#[derive(Clone, Debug)] -pub struct BlindbitClient { - client: Client, +/// Client for interacting with a Blindbit server. +/// +/// Generic over the HTTP client implementation, allowing consumers to provide +/// their own HTTP client by implementing the `HttpClient` trait. +#[derive(Clone)] +pub struct BlindbitClient { + http_client: H, host_url: Url, } -impl BlindbitClient { - pub fn new(host_url: String) -> Result { +impl BlindbitClient { + /// Create a new Blindbit client with a custom HTTP client implementation. + /// + /// # Arguments + /// * `host_url` - Base URL of the Blindbit server + /// * `http_client` - HTTP client implementation + pub fn new(host_url: String, http_client: H) -> Result { let mut host_url = Url::parse(&host_url)?; - let client = reqwest::Client::new(); // we need a trailing slash, if not present we append it if !host_url.path().ends_with('/') { host_url.set_path(&format!("{}/", host_url.path())); } - Ok(BlindbitClient { client, host_url }) + Ok(BlindbitClient { + http_client, + host_url, + }) } pub async fn block_height(&self) -> Result { let url = self.host_url.join("block-height")?; - - let res = self - .client - .get(url) - .timeout(Duration::from_secs(5)) - .send() - .await?; - let blkheight: BlockHeightResponse = serde_json::from_str(&res.text().await?)?; + let body = self.http_client.get(url.as_str(), &[]).await?; + let blkheight: BlockHeightResponse = serde_json::from_str(&body)?; Ok(blkheight.block_height) } pub async fn tweaks(&self, block_height: Height, dust_limit: Amount) -> Result> { let url = self.host_url.join(&format!("tweaks/{}", block_height))?; - - let res = self - .client - .get(url) - .query(&[("dustLimit", format!("{}", dust_limit.to_sat()))]) - .send() + let body = self + .http_client + .get( + url.as_str(), + &[("dustLimit", dust_limit.to_sat().to_string())], + ) .await?; - Ok(serde_json::from_str(&res.text().await?)?) + Ok(serde_json::from_str(&body)?) } pub async fn tweak_index( @@ -63,66 +67,57 @@ impl BlindbitClient { let url = self .host_url .join(&format!("tweak-index/{}", block_height))?; - - let res = self - .client - .get(url) - .query(&[("dustLimit", format!("{}", dust_limit.to_sat()))]) - .send() + let body = self + .http_client + .get( + url.as_str(), + &[("dustLimit", dust_limit.to_sat().to_string())], + ) .await?; - Ok(serde_json::from_str(&res.text().await?)?) + Ok(serde_json::from_str(&body)?) } pub async fn utxos(&self, block_height: Height) -> Result> { let url = self.host_url.join(&format!("utxos/{}", block_height))?; - let res = self.client.get(url).send().await?; - - Ok(serde_json::from_str(&res.text().await?)?) + let body = self.http_client.get(url.as_str(), &[]).await?; + Ok(serde_json::from_str(&body)?) } pub async fn spent_index(&self, block_height: Height) -> Result { let url = self .host_url .join(&format!("spent-index/{}", block_height))?; - let res = self.client.get(url).send().await?; - - Ok(serde_json::from_str(&res.text().await?)?) + let body = self.http_client.get(url.as_str(), &[]).await?; + Ok(serde_json::from_str(&body)?) } pub async fn filter_new_utxos(&self, block_height: Height) -> Result { let url = self .host_url .join(&format!("filter/new-utxos/{}", block_height))?; - - let res = self.client.get(url).send().await?; - - Ok(serde_json::from_str(&res.text().await?)?) + let body = self.http_client.get(url.as_str(), &[]).await?; + Ok(serde_json::from_str(&body)?) } pub async fn filter_spent(&self, block_height: Height) -> Result { let url = self .host_url .join(&format!("filter/spent/{}", block_height))?; - - let res = self.client.get(url).send().await?; - - Ok(serde_json::from_str(&res.text().await?)?) + let body = self.http_client.get(url.as_str(), &[]).await?; + Ok(serde_json::from_str(&body)?) } pub async fn forward_tx(&self, tx_hex: String) -> Result { let url = self.host_url.join("forward-tx")?; - - let body = ForwardTxRequest::new(tx_hex); - - let res = self.client.post(url).json(&body).send().await?; - - Ok(serde_json::from_str(&res.text().await?)?) + let request = ForwardTxRequest::new(tx_hex); + let json_body = serde_json::to_string(&request)?; + let body = self.http_client.post_json(url.as_str(), &json_body).await?; + Ok(serde_json::from_str(&body)?) } pub async fn info(&self) -> Result { let url = self.host_url.join("info")?; - - let res = self.client.get(url).send().await?; - Ok(serde_json::from_str(&res.text().await?)?) + let body = self.http_client.get(url.as_str(), &[]).await?; + Ok(serde_json::from_str(&body)?) } } diff --git a/backend-blindbit-native/src/client/http_trait.rs b/backend-blindbit-native/src/client/http_trait.rs new file mode 100644 index 0000000..b3af50d --- /dev/null +++ b/backend-blindbit-native/src/client/http_trait.rs @@ -0,0 +1,63 @@ +use anyhow::Result; +use async_trait::async_trait; + +/// Minimal async HTTP client trait that can be implemented with any HTTP library. +/// +/// This allows consumers to bring their own HTTP client implementation. +/// You can use any HTTP library you prefer: hyper, isahc, surf, ureq, +/// platform-specific APIs (NSURLSession, fetch, etc.), or any other HTTP client. +/// +/// The trait is async because HTTP I/O is inherently async and the library +/// benefits from concurrent requests (e.g., 200 parallel block fetches). +/// +/// # Implementing the trait +/// +/// Simply implement the two methods with your preferred HTTP library: +/// +/// ```ignore +/// use anyhow::Result; +/// use async_trait::async_trait; +/// use backend_blindbit_native::HttpClient; +/// +/// #[derive(Clone)] +/// struct MyHttpClient { +/// // Your HTTP client here +/// } +/// +/// #[async_trait] +/// impl HttpClient for MyHttpClient { +/// async fn get(&self, url: &str, query_params: &[(&str, String)]) -> Result { +/// // Implement GET request with your HTTP library +/// // Build URL with query params and return response body +/// Ok("response".to_string()) +/// } +/// +/// async fn post_json(&self, url: &str, json_body: &str) -> Result { +/// // Implement POST request with your HTTP library +/// // Send JSON body and return response body +/// Ok("response".to_string()) +/// } +/// } +/// ``` +#[async_trait] +pub trait HttpClient: Send + Sync + Clone { + /// Perform a GET request with optional query parameters. + /// + /// # Arguments + /// * `url` - The full URL to request + /// * `query_params` - Optional query parameters as key-value pairs + /// + /// # Returns + /// The response body as a string + async fn get(&self, url: &str, query_params: &[(&str, String)]) -> Result; + + /// Perform a POST request with a JSON body. + /// + /// # Arguments + /// * `url` - The full URL to request + /// * `json_body` - The JSON body as a string + /// + /// # Returns + /// The response body as a string + async fn post_json(&self, url: &str, json_body: &str) -> Result; +} diff --git a/backend-blindbit-native/src/client/mod.rs b/backend-blindbit-native/src/client/mod.rs new file mode 100644 index 0000000..df6352c --- /dev/null +++ b/backend-blindbit-native/src/client/mod.rs @@ -0,0 +1,11 @@ +mod client; +mod http_trait; +pub mod structs; +#[cfg(feature = "ureq-client")] +mod ureq_impl; + +pub use client::BlindbitClient; +pub use http_trait::HttpClient; + +#[cfg(feature = "ureq-client")] +pub use ureq_impl::UreqClient; diff --git a/src/backend/blindbit/client/structs.rs b/backend-blindbit-native/src/client/structs.rs similarity index 97% rename from src/backend/blindbit/client/structs.rs rename to backend-blindbit-native/src/client/structs.rs index e4c3f13..5bfaf58 100644 --- a/src/backend/blindbit/client/structs.rs +++ b/backend-blindbit-native/src/client/structs.rs @@ -2,7 +2,7 @@ use bitcoin::{absolute::Height, Amount, BlockHash, Network, ScriptBuf, Txid}; use serde::{Deserialize, Deserializer, Serialize}; -use crate::{FilterData, SpentIndexData, UtxoData}; +use spdk_core::{FilterData, SpentIndexData, UtxoData}; #[derive(Debug, Deserialize)] pub struct BlockHeightResponse { diff --git a/backend-blindbit-native/src/client/ureq_impl.rs b/backend-blindbit-native/src/client/ureq_impl.rs new file mode 100644 index 0000000..b238760 --- /dev/null +++ b/backend-blindbit-native/src/client/ureq_impl.rs @@ -0,0 +1,92 @@ +use anyhow::{anyhow, Result}; +use async_trait::async_trait; + +use super::http_trait::HttpClient; + +/// Minimal HTTP client implementation using ureq. +/// +/// This is a lightweight, blocking HTTP client that's perfect for basic needs. +/// It uses about ~200KB of binary size and has minimal dependencies. +/// +/// # Example +/// +/// ```ignore +/// use backend_blindbit_native::{UreqClient, BlindbitBackend}; +/// +/// let http_client = UreqClient::new(); +/// let backend = BlindbitBackend::new("https://blindbit.io".to_string(), http_client)?; +/// ``` +#[derive(Clone)] +pub struct UreqClient { + agent: ureq::Agent, +} + +impl UreqClient { + /// Create a new ureq HTTP client with default settings. + pub fn new() -> Self { + Self { + agent: ureq::AgentBuilder::new() + .timeout(std::time::Duration::from_secs(30)) + .build(), + } + } + + /// Create a new ureq HTTP client with a custom timeout. + pub fn with_timeout(timeout_secs: u64) -> Self { + Self { + agent: ureq::AgentBuilder::new() + .timeout(std::time::Duration::from_secs(timeout_secs)) + .build(), + } + } +} + +impl Default for UreqClient { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl HttpClient for UreqClient { + async fn get(&self, url: &str, query_params: &[(&str, String)]) -> Result { + // Build URL with query parameters + let mut full_url = url.to_string(); + if !query_params.is_empty() { + full_url.push('?'); + for (i, (key, value)) in query_params.iter().enumerate() { + if i > 0 { + full_url.push('&'); + } + full_url.push_str(key); + full_url.push('='); + full_url.push_str(value); + } + } + + // Perform blocking request (wrapped in async for trait compatibility) + let response = self + .agent + .get(&full_url) + .call() + .map_err(|e| anyhow!("HTTP GET request failed: {}", e))? + .into_string() + .map_err(|e| anyhow!("Failed to read response body: {}", e))?; + + Ok(response) + } + + async fn post_json(&self, url: &str, json_body: &str) -> Result { + // Perform blocking request (wrapped in async for trait compatibility) + let response = self + .agent + .post(url) + .set("Content-Type", "application/json") + .send_string(json_body) + .map_err(|e| anyhow!("HTTP POST request failed: {}", e))? + .into_string() + .map_err(|e| anyhow!("Failed to read response body: {}", e))?; + + Ok(response) + } +} diff --git a/backend-blindbit-native/src/lib.rs b/backend-blindbit-native/src/lib.rs new file mode 100644 index 0000000..b218148 --- /dev/null +++ b/backend-blindbit-native/src/lib.rs @@ -0,0 +1,25 @@ +mod backend; + +// Async backend - available with "async" feature +#[cfg(feature = "async")] +mod backend_async; + +mod client; + +// Re-export backend functionality +pub use backend::BlindbitBackend; + +#[cfg(feature = "async")] +pub use backend_async::AsyncBlindbitBackend; + +pub use client::{BlindbitClient, HttpClient}; + +#[cfg(feature = "ureq-client")] +pub use client::UreqClient; + +#[cfg(feature = "async")] +pub use async_trait; +#[cfg(feature = "async")] +pub use futures; +#[cfg(feature = "async")] +pub use futures_util; diff --git a/spdk-core/Cargo.toml b/spdk-core/Cargo.toml new file mode 100644 index 0000000..8118a7e --- /dev/null +++ b/spdk-core/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "spdk-core" +version = "0.1.0" +edition = "2021" +repository.workspace = true + +[lib] +crate-type = ["rlib"] + +[features] +# Default: both sync and async APIs available, with parallelization +default = ["parallel", "async"] + +# Sync-only mode - excludes async code and async dependencies +sync = [] + +# Enable async APIs (included by default, excluded with sync) +async = ["dep:futures", "dep:async-trait"] + +# Enable CPU parallelization with rayon (native only, recommended) +parallel = ["dep:rayon"] + +[dependencies] +# Core dependencies - always available +silentpayments.workspace = true +anyhow.workspace = true +serde.workspace = true +serde_json.workspace = true +bitcoin.workspace = true +hex.workspace = true +bdk_coin_select.workspace = true + +# Async dependencies - optional +futures = { workspace = true, optional = true } +async-trait = { workspace = true, optional = true } + +# Parallelization (not available on WASM) +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +rayon = { workspace = true, optional = true } diff --git a/spdk-core/src/backend/backend.rs b/spdk-core/src/backend/backend.rs new file mode 100644 index 0000000..f23b55c --- /dev/null +++ b/spdk-core/src/backend/backend.rs @@ -0,0 +1,33 @@ +use std::ops::RangeInclusive; + +use anyhow::Result; +use bitcoin::{absolute::Height, Amount}; + +use crate::{BlockData, SpentIndexData, UtxoData}; + +/// Iterator type for block data that conditionally includes `Send` bound. +/// +/// - For native targets: includes `Send` bound for thread safety +/// - For WASM targets: omits `Send` since WASM is single-threaded +// For native targets, we require Send +#[cfg(not(target_arch = "wasm32"))] +pub type BlockDataIterator = Box> + Send>; + +// For WASM targets, we don't require Send +#[cfg(target_arch = "wasm32")] +pub type BlockDataIterator = Box>>; + +pub trait ChainBackend { + fn get_block_data_for_range( + &self, + range: RangeInclusive, + dust_limit: Amount, + with_cutthrough: bool, + ) -> BlockDataIterator; + + fn spent_index(&self, block_height: Height) -> Result; + + fn utxos(&self, block_height: Height) -> Result>; + + fn block_height(&self) -> Result; +} diff --git a/spdk-core/src/backend/backend_async.rs b/spdk-core/src/backend/backend_async.rs new file mode 100644 index 0000000..55ef4f0 --- /dev/null +++ b/spdk-core/src/backend/backend_async.rs @@ -0,0 +1,64 @@ +use std::ops::RangeInclusive; +use std::pin::Pin; + +use anyhow::Result; +use bitcoin::{absolute::Height, Amount}; +use futures::Stream; + +use crate::{BlockData, SpentIndexData, UtxoData}; + +/// Async stream type for block data that conditionally includes `Send` bound. +/// +/// - For native targets: includes `Send` bound for thread safety +/// - For WASM targets: omits `Send` since WASM is single-threaded +// For native targets, we require Send +#[cfg(not(target_arch = "wasm32"))] +pub type BlockDataStream = Pin> + Send>>; + +// For WASM targets, we don't require Send +#[cfg(target_arch = "wasm32")] +pub type BlockDataStream = Pin>>>; + +/// Async version of ChainBackend for non-blocking I/O operations +#[async_trait::async_trait] +pub trait AsyncChainBackend: Send + Sync { + /// Get a stream of block data for a range of blocks + /// + /// # Arguments + /// * `range` - Range of block heights to fetch + /// * `dust_limit` - Minimum amount to consider (dust outputs are ignored) + /// * `with_cutthrough` - Whether to use cutthrough optimization + /// + /// # Returns + /// * Stream of block data results + fn get_block_data_stream( + &self, + range: RangeInclusive, + dust_limit: Amount, + with_cutthrough: bool, + ) -> BlockDataStream; + + /// Get spent index data for a specific block height + /// + /// # Arguments + /// * `block_height` - Block height to query + /// + /// # Returns + /// * Spent index data for the block + async fn spent_index(&self, block_height: Height) -> Result; + + /// Get UTXOs for a specific block height + /// + /// # Arguments + /// * `block_height` - Block height to query + /// + /// # Returns + /// * Vector of UTXO data + async fn utxos(&self, block_height: Height) -> Result>; + + /// Get the current blockchain tip height + /// + /// # Returns + /// * Current block height + async fn block_height(&self) -> Result; +} diff --git a/spdk-core/src/backend/mod.rs b/spdk-core/src/backend/mod.rs new file mode 100644 index 0000000..35616f7 --- /dev/null +++ b/spdk-core/src/backend/mod.rs @@ -0,0 +1,10 @@ +mod backend; + +// Async backend - available by default, excluded when "sync" feature is enabled +#[cfg(feature = "async")] +mod backend_async; + +pub use backend::{BlockDataIterator, ChainBackend}; + +#[cfg(feature = "async")] +pub use backend_async::{AsyncChainBackend, BlockDataStream}; diff --git a/src/client/client.rs b/spdk-core/src/client/client.rs similarity index 75% rename from src/client/client.rs rename to spdk-core/src/client/client.rs index 0a4c1d0..21ca242 100644 --- a/src/client/client.rs +++ b/spdk-core/src/client/client.rs @@ -1,14 +1,17 @@ -use std::{collections::HashMap, io::Write, str::FromStr}; +use std::{io::Write, str::FromStr}; use bitcoin::hashes::Hash; use bitcoin::{ key::constants::ONE, - secp256k1::{PublicKey, Scalar, Secp256k1, SecretKey}, + secp256k1::{Scalar, Secp256k1, SecretKey}, Network, XOnlyPublicKey, }; use serde::{Deserialize, Serialize}; +use bitcoin::secp256k1::PublicKey; use silentpayments::utils as sp_utils; +use std::collections::HashMap; + use silentpayments::Network as SpNetwork; use silentpayments::{ bitcoin_hashes::sha256, @@ -108,16 +111,42 @@ impl SpClient { &self, tweak_data_vec: Vec, ) -> Result> { - use rayon::prelude::*; let b_scan = &self.get_scan_key(); + // Use parallel iteration for CPU-intensive ECDH calculations + #[cfg(all(not(target_arch = "wasm32"), feature = "parallel"))] + let shared_secrets: Vec = { + use rayon::prelude::*; + tweak_data_vec + .into_par_iter() + .map(|tweak| sp_utils::receiving::calculate_ecdh_shared_secret(&tweak, b_scan)) + .collect() + }; + + // Sequential fallback (WASM or no parallel feature) + #[cfg(not(all(not(target_arch = "wasm32"), feature = "parallel")))] let shared_secrets: Vec = tweak_data_vec - .into_par_iter() + .into_iter() .map(|tweak| sp_utils::receiving::calculate_ecdh_shared_secret(&tweak, b_scan)) .collect(); + // Use parallel iteration for CPU-intensive SPK derivation + #[cfg(all(not(target_arch = "wasm32"), feature = "parallel"))] + let items: Result> = { + use rayon::prelude::*; + shared_secrets + .into_par_iter() + .map(|secret| { + let spks = self.sp_receiver.get_spks_from_shared_secret(&secret)?; + Ok((secret, spks.into_values())) + }) + .collect() + }; + + // Sequential fallback (WASM or no parallel feature) + #[cfg(not(all(not(target_arch = "wasm32"), feature = "parallel")))] let items: Result> = shared_secrets - .into_par_iter() + .into_iter() .map(|secret| { let spks = self.sp_receiver.get_spks_from_shared_secret(&secret)?; diff --git a/src/client/mod.rs b/spdk-core/src/client/mod.rs similarity index 100% rename from src/client/mod.rs rename to spdk-core/src/client/mod.rs diff --git a/src/client/spend.rs b/spdk-core/src/client/spend.rs similarity index 100% rename from src/client/spend.rs rename to spdk-core/src/client/spend.rs diff --git a/src/client/structs.rs b/spdk-core/src/client/structs.rs similarity index 100% rename from src/client/structs.rs rename to spdk-core/src/client/structs.rs diff --git a/src/constants.rs b/spdk-core/src/constants.rs similarity index 100% rename from src/constants.rs rename to spdk-core/src/constants.rs diff --git a/spdk-core/src/lib.rs b/spdk-core/src/lib.rs new file mode 100644 index 0000000..0ecfcae --- /dev/null +++ b/spdk-core/src/lib.rs @@ -0,0 +1,26 @@ +pub mod backend; +pub mod client; +pub mod constants; +pub mod scanner; +pub mod types; +pub mod updater; + +// Re-export core functionality +pub use backend::{BlockDataIterator, ChainBackend}; +pub use client::*; +pub use constants::*; +pub use scanner::SpScanner; +pub use types::*; +pub use updater::Updater; +// Re-export commonly used external types +pub use bdk_coin_select::FeeRate; +pub use bitcoin; +pub use silentpayments; + +// Async types available by default, excluded when "sync" feature is enabled +#[cfg(feature = "async")] +pub use backend::{AsyncChainBackend, BlockDataStream}; +#[cfg(feature = "async")] +pub use scanner::AsyncSpScanner; +#[cfg(feature = "async")] +pub use updater::AsyncUpdater; diff --git a/spdk-core/src/scanner/mod.rs b/spdk-core/src/scanner/mod.rs new file mode 100644 index 0000000..abc3a80 --- /dev/null +++ b/spdk-core/src/scanner/mod.rs @@ -0,0 +1,779 @@ +use std::collections::{HashMap, HashSet}; + +#[cfg(not(all(not(target_arch = "wasm32"), feature = "parallel")))] +use anyhow::Error; +use anyhow::Result; +use bitcoin::{ + absolute::Height, bip158::BlockFilter, Amount, BlockHash, OutPoint, Txid, XOnlyPublicKey, +}; +use silentpayments::receiving::Label; + +#[cfg(all(not(target_arch = "wasm32"), feature = "parallel"))] +use rayon::prelude::*; + +use crate::{BlockData, ChainBackend, FilterData, OwnedOutput, SpClient, Updater, UtxoData}; + +/// Trait for scanning silent payment blocks +/// +/// This trait abstracts the core scanning functionality, allowing consumers +/// to implement it with their own constraints and requirements. +pub trait SpScanner { + /// Scan a range of blocks for silent payment outputs and inputs + /// + /// # Arguments + /// * `start` - Starting block height (inclusive) + /// * `end` - Ending block height (inclusive) + /// * `dust_limit` - Minimum amount to consider (dust outputs are ignored) + /// * `with_cutthrough` - Whether to use cutthrough optimization + fn scan_blocks( + &mut self, + start: Height, + end: Height, + dust_limit: Amount, + with_cutthrough: bool, + ) -> Result<()>; + + /// Process a single block's data + /// + /// # Arguments + /// * `blockdata` - Block data containing tweaks and filters + /// + /// # Returns + /// * `(found_outputs, found_inputs)` - Tuple of found outputs and spent inputs + fn process_block( + &mut self, + blockdata: BlockData, + ) -> Result<(HashMap, HashSet)>; + + /// Process block outputs to find owned silent payment outputs + /// + /// # Arguments + /// * `blkheight` - Block height + /// * `tweaks` - List of tweak public keys + /// * `new_utxo_filter` - Filter data for new UTXOs + /// + /// # Returns + /// * Map of outpoints to owned outputs + fn process_block_outputs( + &self, + blkheight: Height, + tweaks: Vec, + new_utxo_filter: FilterData, + ) -> Result>; + + /// Process block inputs to find spent outputs + /// + /// # Arguments + /// * `blkheight` - Block height + /// * `spent_filter` - Filter data for spent outputs + /// + /// # Returns + /// * Set of spent outpoints + fn process_block_inputs( + &self, + blkheight: Height, + spent_filter: FilterData, + ) -> Result>; + + /// Get the block data iterator for a range of blocks + /// + /// # Arguments + /// * `range` - Range of block heights + /// * `dust_limit` - Minimum amount to consider + /// * `with_cutthrough` - Whether to use cutthrough optimization + /// + /// # Returns + /// * Iterator of block data results + fn get_block_data_iterator( + &self, + range: std::ops::RangeInclusive, + dust_limit: Amount, + with_cutthrough: bool, + ) -> crate::BlockDataIterator; + + /// Check if scanning should be interrupted + /// + /// # Returns + /// * `true` if scanning should stop, `false` otherwise + fn should_interrupt(&self) -> bool; + + /// Save current state to persistent storage + fn save_state(&mut self) -> Result<()>; + + /// Record found outputs for a block + /// + /// # Arguments + /// * `height` - Block height + /// * `block_hash` - Block hash + /// * `outputs` - Found outputs + fn record_outputs( + &mut self, + height: Height, + block_hash: BlockHash, + outputs: HashMap, + ) -> Result<()>; + + /// Record spent inputs for a block + /// + /// # Arguments + /// * `height` - Block height + /// * `block_hash` - Block hash + /// * `inputs` - Spent inputs + fn record_inputs( + &mut self, + height: Height, + block_hash: BlockHash, + inputs: HashSet, + ) -> Result<()>; + + /// Record scan progress + /// + /// # Arguments + /// * `start` - Start height + /// * `current` - Current height + /// * `end` - End height + fn record_progress(&mut self, start: Height, current: Height, end: Height) -> Result<()>; + + /// Get the silent payment client + fn client(&self) -> &SpClient; + + /// Get the chain backend + fn backend(&self) -> &dyn ChainBackend; + + /// Get the updater + fn updater(&mut self) -> &mut dyn Updater; + + // Helper methods with default implementations + + /// Process multiple blocks from an iterator + /// + /// This is a default implementation that can be overridden if needed + fn process_blocks(&mut self, start: Height, end: Height, block_data_iter: I) -> Result<()> + where + I: Iterator>, + { + use std::time::{Duration, Instant}; + + let mut update_time = Instant::now(); + + for blockdata_result in block_data_iter { + let blockdata = blockdata_result?; + let blkheight = blockdata.blkheight; + let blkhash = blockdata.blkhash; + + // stop scanning and return if interrupted + if self.should_interrupt() { + self.save_state()?; + return Ok(()); + } + + let mut save_to_storage = false; + + // always save on last block or after 30 seconds since last save + if blkheight == end || update_time.elapsed() > Duration::from_secs(30) { + save_to_storage = true; + } + + let (found_outputs, found_inputs) = self.process_block(blockdata)?; + + if !found_outputs.is_empty() { + save_to_storage = true; + self.record_outputs(blkheight, blkhash, found_outputs)?; + } + + if !found_inputs.is_empty() { + save_to_storage = true; + self.record_inputs(blkheight, blkhash, found_inputs)?; + } + + // tell the updater we scanned this block + self.record_progress(start, blkheight, end)?; + + if save_to_storage { + self.save_state()?; + update_time = Instant::now(); + } + } + + Ok(()) + } + + /// Helper method to process blocks sequentially + /// + /// # Arguments + /// * `start` - Start height + /// * `end` - End height + /// * `block_data_iter` - Iterator of block data + /// * `with_cutthrough` - Whether cutthrough is enabled (unused, kept for API compatibility) + /// + /// # Returns + /// * Result indicating success or failure + fn process_blocks_auto( + &mut self, + start: Height, + end: Height, + block_data_iter: I, + _with_cutthrough: bool, + ) -> Result<()> + where + I: Iterator>, + { + // Always use sequential processing + self.process_blocks(start, end, block_data_iter) + } + + /// Scan UTXOs for a given block and secrets map + /// + /// This is a default implementation that can be overridden if needed + fn scan_utxos( + &self, + blkheight: Height, + secrets_map: HashMap<[u8; 34], bitcoin::secp256k1::PublicKey>, + ) -> Result, UtxoData, bitcoin::secp256k1::Scalar)>> { + let utxos = self.backend().utxos(blkheight)?; + + // group utxos by the txid + let mut txmap: HashMap> = HashMap::new(); + for utxo in utxos { + txmap.entry(utxo.txid).or_default().push(utxo); + } + + let client = self.client(); + + // Parallel transaction scanning on native platforms with parallel feature + #[cfg(all(not(target_arch = "wasm32"), feature = "parallel"))] + let res: Vec<_> = txmap + .into_par_iter() + .filter_map(|(_, utxos)| { + // check if we know the secret to any of the spks + let secret = utxos.iter().find_map(|utxo| { + let spk = utxo.scriptpubkey.as_bytes(); + secrets_map.get(spk) + })?; + + let output_keys: Vec = utxos + .iter() + .filter_map(|x| { + if x.scriptpubkey.is_p2tr() { + XOnlyPublicKey::from_slice(&x.scriptpubkey.as_bytes()[2..]).ok() + } else { + None + } + }) + .collect(); + + // CPU-intensive cryptographic operation + let ours = client + .sp_receiver + .scan_transaction(secret, output_keys) + .ok()?; + + // Match UTXOs against our keys + let matched: Vec<_> = utxos + .into_iter() + .filter(|utxo| utxo.scriptpubkey.is_p2tr() && !utxo.spent) + .filter_map(|utxo| { + let xonly = + XOnlyPublicKey::from_slice(&utxo.scriptpubkey.as_bytes()[2..]).ok()?; + ours.iter().find_map(|(label, map)| { + map.get(&xonly) + .map(|scalar| (label.clone(), utxo.clone(), *scalar)) + }) + }) + .collect(); + + if matched.is_empty() { + None + } else { + Some(matched) + } + }) + .flatten() + .collect(); + + // Sequential fallback (WASM or no parallel feature) + #[cfg(not(all(not(target_arch = "wasm32"), feature = "parallel")))] + let res: Vec<_> = { + let mut result = Vec::new(); + for utxos in txmap.into_values() { + // check if we know the secret to any of the spks + let mut secret = None; + for utxo in utxos.iter() { + let spk = utxo.scriptpubkey.as_bytes(); + if let Some(s) = secrets_map.get(spk) { + secret = Some(s); + break; + } + } + + // skip this tx if no secret is found + let secret = match secret { + Some(secret) => secret, + None => continue, + }; + + let output_keys: Result> = utxos + .iter() + .filter_map(|x| { + if x.scriptpubkey.is_p2tr() { + Some( + XOnlyPublicKey::from_slice(&x.scriptpubkey.as_bytes()[2..]) + .map_err(Error::new), + ) + } else { + None + } + }) + .collect(); + + let ours = client.sp_receiver.scan_transaction(secret, output_keys?)?; + + for utxo in utxos { + if !utxo.scriptpubkey.is_p2tr() || utxo.spent { + continue; + } + + match XOnlyPublicKey::from_slice(&utxo.scriptpubkey.as_bytes()[2..]) { + Ok(xonly) => { + for (label, map) in ours.iter() { + if let Some(scalar) = map.get(&xonly) { + result.push((label.clone(), utxo, *scalar)); + break; + } + } + } + Err(_) => todo!(), + } + } + } + result + }; + + Ok(res) + } + + /// Check if block contains relevant output transactions + /// + /// This is a default implementation that can be overridden if needed + fn check_block_outputs( + created_utxo_filter: BlockFilter, + blkhash: BlockHash, + candidate_spks: Vec<&[u8; 34]>, + ) -> Result { + // check output scripts + let output_keys: Vec<_> = candidate_spks + .into_iter() + .map(|spk| spk[2..].as_ref()) + .collect(); + + // note: match will always return true for an empty query! + if !output_keys.is_empty() { + Ok(created_utxo_filter.match_any(&blkhash, &mut output_keys.into_iter())?) + } else { + Ok(false) + } + } + + /// Get input hashes for owned outpoints + /// + /// This is a default implementation that can be overridden if needed + fn get_input_hashes(&self, blkhash: BlockHash) -> Result>; + + /// Check if block contains relevant input transactions + /// + /// This is a default implementation that can be overridden if needed + fn check_block_inputs( + &self, + spent_filter: BlockFilter, + blkhash: BlockHash, + input_hashes: Vec<[u8; 8]>, + ) -> Result { + // note: match will always return true for an empty query! + if !input_hashes.is_empty() { + Ok(spent_filter.match_any(&blkhash, &mut input_hashes.into_iter())?) + } else { + Ok(false) + } + } +} + +/// Async version of SpScanner for non-blocking I/O operations +/// +/// This trait provides async methods for scanning silent payment blocks, +/// allowing for concurrent operations and better integration with async ecosystems. +/// Particularly useful for WASM targets and UI applications. +#[cfg(feature = "async")] +#[async_trait::async_trait] +pub trait AsyncSpScanner: Send + Sync { + /// Scan a range of blocks for silent payment outputs and inputs + /// + /// # Arguments + /// * `start` - Starting block height (inclusive) + /// * `end` - Ending block height (inclusive) + /// * `dust_limit` - Minimum amount to consider (dust outputs are ignored) + /// * `with_cutthrough` - Whether to use cutthrough optimization + async fn scan_blocks( + &mut self, + start: Height, + end: Height, + dust_limit: Amount, + with_cutthrough: bool, + ) -> Result<()>; + + /// Process a single block's data + /// + /// # Arguments + /// * `blockdata` - Block data containing tweaks and filters + /// + /// # Returns + /// * `(found_outputs, found_inputs)` - Tuple of found outputs and spent inputs + async fn process_block( + &mut self, + blockdata: BlockData, + ) -> Result<(HashMap, HashSet)>; + + /// Process block outputs to find owned silent payment outputs + /// + /// # Arguments + /// * `blkheight` - Block height + /// * `tweaks` - List of tweak public keys + /// * `new_utxo_filter` - Filter data for new UTXOs + /// + /// # Returns + /// * Map of outpoints to owned outputs + async fn process_block_outputs( + &self, + blkheight: Height, + tweaks: Vec, + new_utxo_filter: FilterData, + ) -> Result>; + + /// Process block inputs to find spent outputs + /// + /// # Arguments + /// * `blkheight` - Block height + /// * `spent_filter` - Filter data for spent outputs + /// + /// # Returns + /// * Set of spent outpoints + async fn process_block_inputs( + &self, + blkheight: Height, + spent_filter: FilterData, + ) -> Result>; + + /// Get the block data stream for a range of blocks + /// + /// # Arguments + /// * `range` - Range of block heights + /// * `dust_limit` - Minimum amount to consider + /// * `with_cutthrough` - Whether to use cutthrough optimization + /// + /// # Returns + /// * Stream of block data results + fn get_block_data_stream( + &self, + range: std::ops::RangeInclusive, + dust_limit: Amount, + with_cutthrough: bool, + ) -> crate::backend::BlockDataStream; + + /// Check if scanning should be interrupted + /// + /// # Returns + /// * `true` if scanning should stop, `false` otherwise + fn should_interrupt(&self) -> bool; + + /// Save current state to persistent storage + async fn save_state(&mut self) -> Result<()>; + + /// Record found outputs for a block + /// + /// # Arguments + /// * `height` - Block height + /// * `block_hash` - Block hash + /// * `outputs` - Found outputs + async fn record_outputs( + &mut self, + height: Height, + block_hash: BlockHash, + outputs: HashMap, + ) -> Result<()>; + + /// Record spent inputs for a block + /// + /// # Arguments + /// * `height` - Block height + /// * `block_hash` - Block hash + /// * `inputs` - Spent inputs + async fn record_inputs( + &mut self, + height: Height, + block_hash: BlockHash, + inputs: HashSet, + ) -> Result<()>; + + /// Record scan progress + /// + /// # Arguments + /// * `start` - Start height + /// * `current` - Current height + /// * `end` - End height + async fn record_progress(&mut self, start: Height, current: Height, end: Height) -> Result<()>; + + /// Get the silent payment client + fn client(&self) -> &SpClient; + + /// Get the async chain backend + fn backend(&self) -> &dyn crate::backend::AsyncChainBackend; + + /// Get the async updater + fn updater(&mut self) -> &mut dyn crate::updater::AsyncUpdater; + + // Helper methods with default implementations + + /// Process multiple blocks from a stream + /// + /// This is a default implementation that can be overridden if needed + async fn process_blocks( + &mut self, + start: Height, + end: Height, + mut block_data_stream: crate::backend::BlockDataStream, + ) -> Result<()> { + use futures::StreamExt; + use std::time::{Duration, Instant}; + + let mut update_time = Instant::now(); + + while let Some(blockdata_result) = block_data_stream.next().await { + let blockdata = blockdata_result?; + let blkheight = blockdata.blkheight; + let blkhash = blockdata.blkhash; + + // stop scanning and return if interrupted + if self.should_interrupt() { + self.save_state().await?; + return Ok(()); + } + + let mut save_to_storage = false; + + // always save on last block or after 30 seconds since last save + if blkheight == end || update_time.elapsed() > Duration::from_secs(30) { + save_to_storage = true; + } + + let (found_outputs, found_inputs) = self.process_block(blockdata).await?; + + if !found_outputs.is_empty() { + save_to_storage = true; + self.record_outputs(blkheight, blkhash, found_outputs) + .await?; + } + + if !found_inputs.is_empty() { + save_to_storage = true; + self.record_inputs(blkheight, blkhash, found_inputs).await?; + } + + // tell the updater we scanned this block + self.record_progress(start, blkheight, end).await?; + + if save_to_storage { + self.save_state().await?; + update_time = Instant::now(); + } + } + + Ok(()) + } + + /// Scan UTXOs for a given block and secrets map + /// + /// This is a default implementation that can be overridden if needed + async fn scan_utxos( + &self, + blkheight: Height, + secrets_map: HashMap<[u8; 34], bitcoin::secp256k1::PublicKey>, + ) -> Result, UtxoData, bitcoin::secp256k1::Scalar)>> { + let utxos = self.backend().utxos(blkheight).await?; + + // Group utxos by the txid + let mut txmap: HashMap> = HashMap::new(); + for utxo in utxos { + txmap.entry(utxo.txid).or_default().push(utxo); + } + + let client = self.client(); + + // Parallel transaction scanning on native platforms with parallel feature + // This uses Rayon for CPU parallelism. Rayon uses its own thread pool internally, + // so while this blocks the current async task, it doesn't block the entire runtime + // on multi-threaded executors. The CPU work benefits significantly from parallelism. + #[cfg(all(not(target_arch = "wasm32"), feature = "parallel"))] + let res = { + use rayon::prelude::*; + use std::sync::Arc; + + // Clone data needed for parallel processing + let secrets_map = Arc::new(secrets_map); + let client = Arc::new(client.clone()); + + // Run CPU-intensive Rayon work + // Rayon uses its own thread pool, so this parallelizes across CPU cores + txmap + .into_par_iter() + .filter_map(|(_, utxos)| { + // check if we know the secret to any of the spks + let secret = utxos.iter().find_map(|utxo| { + let spk = utxo.scriptpubkey.as_bytes(); + secrets_map.get(spk) + })?; + + let output_keys: Vec = utxos + .iter() + .filter_map(|x| { + if x.scriptpubkey.is_p2tr() { + XOnlyPublicKey::from_slice(&x.scriptpubkey.as_bytes()[2..]).ok() + } else { + None + } + }) + .collect(); + + // CPU-intensive cryptographic operation + let ours = client + .sp_receiver + .scan_transaction(secret, output_keys) + .ok()?; + + // Match UTXOs against our keys + let matched: Vec<_> = utxos + .into_iter() + .filter(|utxo| utxo.scriptpubkey.is_p2tr() && !utxo.spent) + .filter_map(|utxo| { + let xonly = + XOnlyPublicKey::from_slice(&utxo.scriptpubkey.as_bytes()[2..]).ok()?; + ours.iter().find_map(|(label, map)| { + map.get(&xonly) + .map(|scalar| (label.clone(), utxo.clone(), *scalar)) + }) + }) + .collect(); + + if matched.is_empty() { + None + } else { + Some(matched) + } + }) + .flatten() + .collect() + }; + + // Sequential fallback (WASM or no parallel feature) + #[cfg(not(all(not(target_arch = "wasm32"), feature = "parallel")))] + let res: Vec<_> = { + let mut result = Vec::new(); + for utxos in txmap.into_values() { + // check if we know the secret to any of the spks + let mut secret = None; + for utxo in utxos.iter() { + let spk = utxo.scriptpubkey.as_bytes(); + if let Some(s) = secrets_map.get(spk) { + secret = Some(s); + break; + } + } + + // skip this tx if no secret is found + let secret = match secret { + Some(secret) => secret, + None => continue, + }; + + let output_keys: Result> = utxos + .iter() + .filter_map(|x| { + if x.scriptpubkey.is_p2tr() { + Some( + XOnlyPublicKey::from_slice(&x.scriptpubkey.as_bytes()[2..]) + .map_err(|e| anyhow::Error::new(e)), + ) + } else { + None + } + }) + .collect(); + + let ours = client.sp_receiver.scan_transaction(secret, output_keys?)?; + + for utxo in utxos { + if !utxo.scriptpubkey.is_p2tr() || utxo.spent { + continue; + } + + match XOnlyPublicKey::from_slice(&utxo.scriptpubkey.as_bytes()[2..]) { + Ok(xonly) => { + for (label, map) in ours.iter() { + if let Some(scalar) = map.get(&xonly) { + result.push((label.clone(), utxo.clone(), *scalar)); + break; + } + } + } + Err(_) => todo!(), + } + } + } + result + }; + + Ok(res) + } + + /// Check if block contains relevant output transactions + /// + /// This is a default implementation that can be overridden if needed + fn check_block_outputs( + created_utxo_filter: BlockFilter, + blkhash: BlockHash, + candidate_spks: Vec<&[u8; 34]>, + ) -> Result { + // check output scripts + let output_keys: Vec<_> = candidate_spks + .into_iter() + .map(|spk| spk[2..].as_ref()) + .collect(); + + // note: match will always return true for an empty query! + if !output_keys.is_empty() { + Ok(created_utxo_filter.match_any(&blkhash, &mut output_keys.into_iter())?) + } else { + Ok(false) + } + } + + /// Get input hashes for owned outpoints + /// + /// This is a default implementation that can be overridden if needed + async fn get_input_hashes(&self, blkhash: BlockHash) -> Result>; + + /// Check if block contains relevant input transactions + /// + /// This is a default implementation that can be overridden if needed + fn check_block_inputs( + &self, + spent_filter: BlockFilter, + blkhash: BlockHash, + input_hashes: Vec<[u8; 8]>, + ) -> Result { + // note: match will always return true for an empty query! + if !input_hashes.is_empty() { + Ok(spent_filter.match_any(&blkhash, &mut input_hashes.into_iter())?) + } else { + Ok(false) + } + } +} diff --git a/src/backend/structs.rs b/spdk-core/src/types.rs similarity index 97% rename from src/backend/structs.rs rename to spdk-core/src/types.rs index b5ee039..3cd9ad4 100644 --- a/src/backend/structs.rs +++ b/spdk-core/src/types.rs @@ -8,6 +8,7 @@ pub struct BlockData { pub spent_filter: FilterData, } +#[derive(Clone)] pub struct UtxoData { pub txid: Txid, pub vout: u32, diff --git a/spdk-core/src/updater/mod.rs b/spdk-core/src/updater/mod.rs new file mode 100644 index 0000000..c82b597 --- /dev/null +++ b/spdk-core/src/updater/mod.rs @@ -0,0 +1,5 @@ +mod updater; + +#[cfg(feature = "async")] +pub use updater::AsyncUpdater; +pub use updater::Updater; diff --git a/spdk-core/src/updater/updater.rs b/spdk-core/src/updater/updater.rs new file mode 100644 index 0000000..bd13893 --- /dev/null +++ b/spdk-core/src/updater/updater.rs @@ -0,0 +1,86 @@ +use std::collections::{HashMap, HashSet}; + +use bitcoin::{absolute::Height, BlockHash, OutPoint}; + +use anyhow::Result; + +use crate::OwnedOutput; + +/// Trait for persisting scan results and progress +/// +/// This trait provides synchronous methods for recording scanning progress, +/// found outputs, and spent inputs. Implementations should handle persistence +/// to storage (database, file system, etc.). +/// +/// For async operations, see `AsyncUpdater`. +pub trait Updater { + /// Ask the updater to record the scanning progress. + fn record_scan_progress(&mut self, start: Height, current: Height, end: Height) -> Result<()>; + + /// Ask the updater to record the outputs found in a block. + fn record_block_outputs( + &mut self, + height: Height, + blkhash: BlockHash, + found_outputs: HashMap, + ) -> Result<()>; + + /// Ask the updater to record the inputs found in a block. + fn record_block_inputs( + &mut self, + blkheight: Height, + blkhash: BlockHash, + found_inputs: HashSet, + ) -> Result<()>; + + /// Ask the updater to save all recorded changes to persistent storage. + fn save_to_persistent_storage(&mut self) -> Result<()>; +} + +/// Async version of Updater for non-blocking I/O operations +/// Available by default, excluded when "sync" feature is enabled +#[cfg(feature = "async")] +#[async_trait::async_trait] +pub trait AsyncUpdater: Send + Sync { + /// Ask the updater to record the scanning progress. + /// + /// # Arguments + /// * `start` - Starting block height + /// * `current` - Current block height being processed + /// * `end` - Ending block height + async fn record_scan_progress( + &mut self, + start: Height, + current: Height, + end: Height, + ) -> Result<()>; + + /// Ask the updater to record the outputs found in a block. + /// + /// # Arguments + /// * `height` - Block height + /// * `blkhash` - Block hash + /// * `found_outputs` - Map of found outputs + async fn record_block_outputs( + &mut self, + height: Height, + blkhash: BlockHash, + found_outputs: HashMap, + ) -> Result<()>; + + /// Ask the updater to record the inputs found in a block. + /// + /// # Arguments + /// * `blkheight` - Block height + /// * `blkhash` - Block hash + /// * `found_inputs` - Set of spent inputs + async fn record_block_inputs( + &mut self, + blkheight: Height, + blkhash: BlockHash, + found_inputs: HashSet, + ) -> Result<()>; + + /// Ask the updater to save all recorded changes to persistent storage. + async fn save_to_persistent_storage(&mut self) -> Result<()>; +} diff --git a/src/backend/backend.rs b/src/backend/backend.rs deleted file mode 100644 index d1892f0..0000000 --- a/src/backend/backend.rs +++ /dev/null @@ -1,24 +0,0 @@ -use std::{ops::RangeInclusive, pin::Pin}; - -use anyhow::Result; -use async_trait::async_trait; -use bitcoin::{absolute::Height, Amount}; -use futures::Stream; - -use super::structs::{BlockData, SpentIndexData, UtxoData}; - -#[async_trait] -pub trait ChainBackend { - fn get_block_data_for_range( - &self, - range: RangeInclusive, - dust_limit: Amount, - with_cutthrough: bool, - ) -> Pin> + Send>>; - - async fn spent_index(&self, block_height: Height) -> Result; - - async fn utxos(&self, block_height: Height) -> Result>; - - async fn block_height(&self) -> Result; -} diff --git a/src/backend/blindbit/backend/backend.rs b/src/backend/blindbit/backend/backend.rs deleted file mode 100644 index ba47e70..0000000 --- a/src/backend/blindbit/backend/backend.rs +++ /dev/null @@ -1,84 +0,0 @@ -use std::{ops::RangeInclusive, pin::Pin, sync::Arc}; - -use async_trait::async_trait; -use bitcoin::{absolute::Height, Amount}; -use futures::{stream, Stream, StreamExt}; - -use anyhow::Result; - -use crate::{backend::blindbit::BlindbitClient, BlockData, ChainBackend, SpentIndexData, UtxoData}; - -const CONCURRENT_FILTER_REQUESTS: usize = 200; - -#[derive(Debug)] -pub struct BlindbitBackend { - client: BlindbitClient, -} - -impl BlindbitBackend { - pub fn new(blindbit_url: String) -> Result { - Ok(Self { - client: BlindbitClient::new(blindbit_url)?, - }) - } -} - -#[async_trait] -impl ChainBackend for BlindbitBackend { - /// High-level function to get block data for a range of blocks. - /// Block data includes all the information needed to determine if a block is relevant for scanning, - /// but does not include utxos, or spent index. - /// These need to be fetched separately afterwards, if it is determined this block is relevant. - fn get_block_data_for_range( - &self, - range: RangeInclusive, - dust_limit: Amount, - with_cutthrough: bool, - ) -> Pin> + Send>> { - let client = Arc::new(self.client.clone()); - - let res = stream::iter(range) - .map(move |n| { - let client = client.clone(); - - async move { - let blkheight = Height::from_consensus(n)?; - let tweaks = match with_cutthrough { - true => client.tweaks(blkheight, dust_limit).await?, - false => client.tweak_index(blkheight, dust_limit).await?, - }; - let new_utxo_filter = client.filter_new_utxos(blkheight).await?; - let spent_filter = client.filter_spent(blkheight).await?; - let blkhash = new_utxo_filter.block_hash; - Ok(BlockData { - blkheight, - blkhash, - tweaks, - new_utxo_filter: new_utxo_filter.into(), - spent_filter: spent_filter.into(), - }) - } - }) - .buffered(CONCURRENT_FILTER_REQUESTS); - - Box::pin(res) - } - - async fn spent_index(&self, block_height: Height) -> Result { - self.client.spent_index(block_height).await.map(Into::into) - } - - async fn utxos(&self, block_height: Height) -> Result> { - Ok(self - .client - .utxos(block_height) - .await? - .into_iter() - .map(Into::into) - .collect()) - } - - async fn block_height(&self) -> Result { - self.client.block_height().await - } -} diff --git a/src/backend/blindbit/backend/mod.rs b/src/backend/blindbit/backend/mod.rs deleted file mode 100644 index e48b365..0000000 --- a/src/backend/blindbit/backend/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod backend; - -pub use backend::BlindbitBackend; diff --git a/src/backend/blindbit/client/mod.rs b/src/backend/blindbit/client/mod.rs deleted file mode 100644 index 82f00d4..0000000 --- a/src/backend/blindbit/client/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod client; -pub mod structs; - -pub use client::BlindbitClient; diff --git a/src/backend/blindbit/mod.rs b/src/backend/blindbit/mod.rs deleted file mode 100644 index 4dc245c..0000000 --- a/src/backend/blindbit/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod backend; -mod client; - -pub use backend::BlindbitBackend; -pub use client::BlindbitClient; diff --git a/src/backend/mod.rs b/src/backend/mod.rs deleted file mode 100644 index d81e60c..0000000 --- a/src/backend/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -mod backend; -#[cfg(feature = "blindbit-backend")] -mod blindbit; -mod structs; - -pub use backend::ChainBackend; -pub use structs::*; - -#[cfg(feature = "blindbit-backend")] -pub use blindbit::BlindbitBackend; - -#[cfg(feature = "blindbit-backend")] -pub use blindbit::BlindbitClient; diff --git a/src/lib.rs b/src/lib.rs deleted file mode 100644 index c055981..0000000 --- a/src/lib.rs +++ /dev/null @@ -1,14 +0,0 @@ -mod backend; -mod client; -pub mod constants; -mod scanner; -mod updater; - -pub use bdk_coin_select::FeeRate; -pub use bitcoin; -pub use silentpayments; - -pub use backend::*; -pub use client::*; -pub use scanner::SpScanner; -pub use updater::Updater; diff --git a/src/scanner/mod.rs b/src/scanner/mod.rs deleted file mode 100644 index 2d6bea2..0000000 --- a/src/scanner/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod scanner; - -pub use scanner::SpScanner; diff --git a/src/scanner/scanner.rs b/src/scanner/scanner.rs deleted file mode 100644 index 5ecc765..0000000 --- a/src/scanner/scanner.rs +++ /dev/null @@ -1,380 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - sync::atomic::AtomicBool, - time::{Duration, Instant}, -}; - -use anyhow::{bail, Error, Result}; -use bitcoin::{ - absolute::Height, - bip158::BlockFilter, - hashes::{sha256, Hash}, - secp256k1::{PublicKey, Scalar}, - Amount, BlockHash, OutPoint, Txid, XOnlyPublicKey, -}; -use futures::{pin_mut, Stream, StreamExt}; -use log::info; -use silentpayments::receiving::Label; - -use crate::{ - backend::{BlockData, ChainBackend, FilterData, UtxoData}, - client::{OutputSpendStatus, OwnedOutput, SpClient}, - updater::Updater, -}; - -pub struct SpScanner<'a> { - updater: Box, - backend: Box, - client: SpClient, - keep_scanning: &'a AtomicBool, // used to interrupt scanning - owned_outpoints: HashSet, // used to scan block inputs -} - -impl<'a> SpScanner<'a> { - pub fn new( - client: SpClient, - updater: Box, - backend: Box, - owned_outpoints: HashSet, - keep_scanning: &'a AtomicBool, - ) -> Self { - Self { - client, - updater, - backend, - owned_outpoints, - keep_scanning, - } - } - - pub async fn scan_blocks( - &mut self, - start: Height, - end: Height, - dust_limit: Amount, - with_cutthrough: bool, - ) -> Result<()> { - if start > end { - bail!("bigger start than end: {} > {}", start, end); - } - - info!("start: {} end: {}", start, end); - let start_time: Instant = Instant::now(); - - // get block data stream - let range = start.to_consensus_u32()..=end.to_consensus_u32(); - let block_data_stream = - self.backend - .get_block_data_for_range(range, dust_limit, with_cutthrough); - - // process blocks using block data stream - self.process_blocks(start, end, block_data_stream).await?; - - // time elapsed for the scan - info!( - "Blindbit scan complete in {} seconds", - start_time.elapsed().as_secs() - ); - - Ok(()) - } - - async fn process_blocks( - &mut self, - start: Height, - end: Height, - block_data_stream: impl Stream>, - ) -> Result<()> { - pin_mut!(block_data_stream); - - let mut update_time: Instant = Instant::now(); - - while let Some(blockdata) = block_data_stream.next().await { - let blockdata = blockdata?; - let blkheight = blockdata.blkheight; - let blkhash = blockdata.blkhash; - - // stop scanning and return if interrupted - if self.interrupt_requested() { - self.updater.save_to_persistent_storage()?; - return Ok(()); - } - - let mut save_to_storage = false; - - // always save on last block or after 30 seconds since last save - if blkheight == end || update_time.elapsed() > Duration::from_secs(30) { - save_to_storage = true; - } - - let (found_outputs, found_inputs) = self.process_block(blockdata).await?; - - if !found_outputs.is_empty() { - save_to_storage = true; - self.updater - .record_block_outputs(blkheight, blkhash, found_outputs)?; - } - - if !found_inputs.is_empty() { - save_to_storage = true; - self.updater - .record_block_inputs(blkheight, blkhash, found_inputs)?; - } - - // tell the updater we scanned this block - self.updater.record_scan_progress(start, blkheight, end)?; - - if save_to_storage { - self.updater.save_to_persistent_storage()?; - update_time = Instant::now(); - } - } - - Ok(()) - } - - async fn process_block( - &mut self, - blockdata: BlockData, - ) -> Result<(HashMap, HashSet)> { - let BlockData { - blkheight, - tweaks, - new_utxo_filter, - spent_filter, - .. - } = blockdata; - - let outs = self - .process_block_outputs(blkheight, tweaks, new_utxo_filter) - .await?; - - // after processing outputs, we add the found outputs to our list - self.owned_outpoints.extend(outs.keys()); - - let ins = self.process_block_inputs(blkheight, spent_filter).await?; - - // after processing inputs, we remove the found inputs - self.owned_outpoints.retain(|item| !ins.contains(item)); - - Ok((outs, ins)) - } - - async fn process_block_outputs( - &self, - blkheight: Height, - tweaks: Vec, - new_utxo_filter: FilterData, - ) -> Result> { - let mut res = HashMap::new(); - - if !tweaks.is_empty() { - let secrets_map = self.client.get_script_to_secret_map(tweaks)?; - - //last_scan = last_scan.max(n as u32); - let candidate_spks: Vec<&[u8; 34]> = secrets_map.keys().collect(); - - //get block gcs & check match - let blkfilter = BlockFilter::new(&new_utxo_filter.data); - let blkhash = new_utxo_filter.block_hash; - - let matched_outputs = Self::check_block_outputs(blkfilter, blkhash, candidate_spks)?; - - //if match: fetch and scan utxos - if matched_outputs { - info!("matched outputs on: {}", blkheight); - let found = self.scan_utxos(blkheight, secrets_map).await?; - - if !found.is_empty() { - for (label, utxo, tweak) in found { - let outpoint = OutPoint { - txid: utxo.txid, - vout: utxo.vout, - }; - - let out = OwnedOutput { - blockheight: blkheight, - tweak: tweak.to_be_bytes(), - amount: utxo.value, - script: utxo.scriptpubkey, - label, - spend_status: OutputSpendStatus::Unspent, - }; - - res.insert(outpoint, out); - } - } - } - } - Ok(res) - } - - async fn process_block_inputs( - &self, - blkheight: Height, - spent_filter: FilterData, - ) -> Result> { - let mut res = HashSet::new(); - - let blkhash = spent_filter.block_hash; - - // first get the 8-byte hashes used to construct the input filter - let input_hashes_map = self.get_input_hashes(blkhash)?; - - // check against filter - let blkfilter = BlockFilter::new(&spent_filter.data); - let matched_inputs = self.check_block_inputs( - blkfilter, - blkhash, - input_hashes_map.keys().cloned().collect(), - )?; - - // if match: download spent data, collect the outpoints that are spent - if matched_inputs { - info!("matched inputs on: {}", blkheight); - let spent = self.backend.spent_index(blkheight).await?.data; - - for spent in spent { - let hex: &[u8] = spent.as_ref(); - - if let Some(outpoint) = input_hashes_map.get(hex) { - res.insert(*outpoint); - } - } - } - Ok(res) - } - - async fn scan_utxos( - &self, - blkheight: Height, - secrets_map: HashMap<[u8; 34], PublicKey>, - ) -> Result, UtxoData, Scalar)>> { - let utxos = self.backend.utxos(blkheight).await?; - - let mut res: Vec<(Option