diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3c50b2a0041..2658ff454e9 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -62,52 +62,35 @@ jobs: - name: Set RUSTFLAGS to deny warnings if: "matrix.toolchain == '1.75.0'" run: echo "RUSTFLAGS=-D warnings" >> "$GITHUB_ENV" - - name: Run CI script - shell: bash # Default on Winblows is powershell - run: CI_ENV=1 CI_MINIMIZE_DISK_USAGE=1 ./ci/ci-tests.sh - - build-tx-sync: - strategy: - fail-fast: false - matrix: - platform: [ ubuntu-latest, macos-latest ] - toolchain: [ stable, beta, 1.75.0 ] - runs-on: ${{ matrix.platform }} - steps: - - name: Checkout source code - uses: actions/checkout@v4 - - name: Install Rust ${{ matrix.toolchain }} toolchain - run: | - curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --profile=minimal --default-toolchain ${{ matrix.toolchain }} - - name: Set RUSTFLAGS to deny warnings - if: "matrix.toolchain == '1.75.0'" - run: echo "RUSTFLAGS=-D warnings" >> "$GITHUB_ENV" - name: Enable caching for bitcoind + if: matrix.platform != 'windows-latest' id: cache-bitcoind uses: actions/cache@v4 with: path: bin/bitcoind-${{ runner.os }}-${{ runner.arch }} key: bitcoind-${{ runner.os }}-${{ runner.arch }} - name: Enable caching for electrs + if: matrix.platform != 'windows-latest' id: cache-electrs uses: actions/cache@v4 with: path: bin/electrs-${{ runner.os }}-${{ runner.arch }} key: electrs-${{ runner.os }}-${{ runner.arch }} - name: Download bitcoind/electrs - if: "steps.cache-bitcoind.outputs.cache-hit != 'true' || steps.cache-electrs.outputs.cache-hit != 'true'" + if: "matrix.platform != 'windows-latest' && (steps.cache-bitcoind.outputs.cache-hit != 'true' || steps.cache-electrs.outputs.cache-hit != 'true')" run: | source ./contrib/download_bitcoind_electrs.sh mkdir bin mv "$BITCOIND_EXE" bin/bitcoind-${{ runner.os }}-${{ runner.arch }} mv "$ELECTRS_EXE" bin/electrs-${{ runner.os }}-${{ runner.arch }} - name: Set bitcoind/electrs environment variables + if: matrix.platform != 'windows-latest' run: | echo "BITCOIND_EXE=$( pwd )/bin/bitcoind-${{ runner.os }}-${{ runner.arch }}" >> "$GITHUB_ENV" echo "ELECTRS_EXE=$( pwd )/bin/electrs-${{ runner.os }}-${{ runner.arch }}" >> "$GITHUB_ENV" - name: Run CI script shell: bash # Default on Winblows is powershell - run: CI_ENV=1 CI_MINIMIZE_DISK_USAGE=1 ./ci/ci-tx-sync-tests.sh + run: CI_ENV=1 CI_MINIMIZE_DISK_USAGE=1 ./ci/ci-tests.sh coverage: needs: fuzz diff --git a/Cargo.toml b/Cargo.toml index f9f7406339e..a0895fe1641 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,11 +16,11 @@ members = [ "lightning-macros", "lightning-dns-resolver", "lightning-liquidity", + "lightning-transaction-sync", "possiblyrandom", ] exclude = [ - "lightning-transaction-sync", "lightning-tests", "ext-functional-test-demo", "no-std-check", diff --git a/ci/check-lint.sh b/ci/check-lint.sh index 39c10692310..c1f1b08a1e1 100755 --- a/ci/check-lint.sh +++ b/ci/check-lint.sh @@ -107,7 +107,8 @@ CLIPPY() { -A clippy::useless_conversion \ -A clippy::manual_repeat_n `# to be removed once we hit MSRV 1.86` \ -A clippy::manual_is_multiple_of `# to be removed once we hit MSRV 1.87` \ - -A clippy::uninlined-format-args + -A clippy::uninlined-format-args \ + -A clippy::manual-async-fn # Not really sure why this is even a warning when there's a Send bound } CLIPPY diff --git a/ci/ci-tests.sh b/ci/ci-tests.sh index 91ead9903cb..488c5ac4826 100755 --- a/ci/ci-tests.sh +++ b/ci/ci-tests.sh @@ -2,7 +2,6 @@ #shellcheck disable=SC2002,SC2207 set -eox pipefail -# Currently unused as we don't have to pin anything for MSRV: RUSTC_MINOR_VERSION=$(rustc --version | awk '{ split($2,a,"."); print a[2] }') # Some crates require pinning to meet our MSRV even for our downstream users, @@ -20,6 +19,9 @@ PIN_RELEASE_DEPS # pin the release dependencies in our main workspace # proptest 1.9.0 requires rustc 1.82.0 [ "$RUSTC_MINOR_VERSION" -lt 82 ] && cargo update -p proptest --precise "1.8.0" --verbose +# Starting with version 1.2.0, the `idna_adapter` crate has an MSRV of rustc 1.81.0. +[ "$RUSTC_MINOR_VERSION" -lt 81 ] && cargo update -p idna_adapter --precise "1.1.0" --verbose + export RUST_BACKTRACE=1 echo -e "\n\nChecking the workspace, except lightning-transaction-sync." @@ -57,6 +59,23 @@ cargo check -p lightning-block-sync --verbose --color always --features rpc-clie cargo test -p lightning-block-sync --verbose --color always --features rpc-client,rest-client,tokio cargo check -p lightning-block-sync --verbose --color always --features rpc-client,rest-client,tokio +echo -e "\n\nChecking Transaction Sync Clients with features." +cargo check -p lightning-transaction-sync --verbose --color always --features esplora-blocking +cargo check -p lightning-transaction-sync --verbose --color always --features esplora-async +cargo check -p lightning-transaction-sync --verbose --color always --features esplora-async-https +cargo check -p lightning-transaction-sync --verbose --color always --features electrum + +if [ -z "$CI_ENV" ] && [[ -z "$BITCOIND_EXE" || -z "$ELECTRS_EXE" ]]; then + echo -e "\n\nSkipping testing Transaction Sync Clients due to BITCOIND_EXE or ELECTRS_EXE being unset." + cargo check -p lightning-transaction-sync --tests +else + echo -e "\n\nTesting Transaction Sync Clients with features." + cargo test -p lightning-transaction-sync --verbose --color always --features esplora-blocking + cargo test -p lightning-transaction-sync --verbose --color always --features esplora-async + cargo test -p lightning-transaction-sync --verbose --color always --features esplora-async-https + cargo test -p lightning-transaction-sync --verbose --color always --features electrum +fi + echo -e "\n\nChecking and testing lightning-persister with features" cargo test -p lightning-persister --verbose --color always --features tokio cargo check -p lightning-persister --verbose --color always --features tokio diff --git a/ci/ci-tx-sync-tests.sh b/ci/ci-tx-sync-tests.sh deleted file mode 100755 index 0839e2ced3d..00000000000 --- a/ci/ci-tx-sync-tests.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash -set -eox pipefail - -RUSTC_MINOR_VERSION=$(rustc --version | awk '{ split($2,a,"."); print a[2] }') - -pushd lightning-transaction-sync - -# Some crates require pinning to meet our MSRV even for our downstream users, -# which we do here. -# Further crates which appear only as dev-dependencies are pinned further down. -function PIN_RELEASE_DEPS { - return 0 # Don't fail the script if our rustc is higher than the last check -} - -PIN_RELEASE_DEPS # pin the release dependencies - -# Starting with version 1.2.0, the `idna_adapter` crate has an MSRV of rustc 1.81.0. -[ "$RUSTC_MINOR_VERSION" -lt 81 ] && cargo update -p idna_adapter --precise "1.1.0" --verbose - -export RUST_BACKTRACE=1 - -echo -e "\n\nChecking Transaction Sync Clients with features." -cargo check --verbose --color always --features esplora-blocking -cargo check --verbose --color always --features esplora-async -cargo check --verbose --color always --features esplora-async-https -cargo check --verbose --color always --features electrum - -if [ -z "$CI_ENV" ] && [[ -z "$BITCOIND_EXE" || -z "$ELECTRS_EXE" ]]; then - echo -e "\n\nSkipping testing Transaction Sync Clients due to BITCOIND_EXE or ELECTRS_EXE being unset." - cargo check --tests -else - echo -e "\n\nTesting Transaction Sync Clients with features." - cargo test --verbose --color always --features esplora-blocking - cargo test --verbose --color always --features esplora-async - cargo test --verbose --color always --features esplora-async-https - cargo test --verbose --color always --features electrum -fi - -popd diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 19333c5823a..bc0d42ac191 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -41,6 +41,8 @@ use lightning::events::ReplayEvent; use lightning::events::{Event, PathFailure}; use lightning::util::ser::Writeable; +#[cfg(not(c_bindings))] +use lightning::io::Error; use lightning::ln::channelmanager::AChannelManager; use lightning::ln::msgs::OnionMessageHandler; use lightning::ln::peer_handler::APeerManager; @@ -51,6 +53,8 @@ use lightning::routing::utxo::UtxoLookup; use lightning::sign::{ ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender, }; +#[cfg(not(c_bindings))] +use lightning::util::async_poll::MaybeSend; use lightning::util::logger::Logger; use lightning::util::persist::{ KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY, @@ -83,7 +87,11 @@ use std::time::Instant; #[cfg(not(feature = "std"))] use alloc::boxed::Box; #[cfg(all(not(c_bindings), not(feature = "std")))] +use alloc::string::String; +#[cfg(all(not(c_bindings), not(feature = "std")))] use alloc::sync::Arc; +#[cfg(all(not(c_bindings), not(feature = "std")))] +use alloc::vec::Vec; /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its @@ -416,6 +424,37 @@ pub const NO_ONION_MESSENGER: Option< >, > = None; +#[cfg(not(c_bindings))] +/// A panicking implementation of [`KVStore`] that is used in [`NO_LIQUIDITY_MANAGER`]. +pub struct DummyKVStore; + +#[cfg(not(c_bindings))] +impl KVStore for DummyKVStore { + fn read( + &self, _: &str, _: &str, _: &str, + ) -> impl core::future::Future, Error>> + MaybeSend + 'static { + async { unimplemented!() } + } + + fn write( + &self, _: &str, _: &str, _: &str, _: Vec, + ) -> impl core::future::Future> + MaybeSend + 'static { + async { unimplemented!() } + } + + fn remove( + &self, _: &str, _: &str, _: &str, _: bool, + ) -> impl core::future::Future> + MaybeSend + 'static { + async { unimplemented!() } + } + + fn list( + &self, _: &str, _: &str, + ) -> impl core::future::Future, Error>> + MaybeSend + 'static { + async { unimplemented!() } + } +} + /// When initializing a background processor without a liquidity manager, this can be used to avoid /// specifying a concrete `LiquidityManager` type. #[cfg(not(c_bindings))] @@ -430,8 +469,8 @@ pub const NO_LIQUIDITY_MANAGER: Option< CM = &DynChannelManager, Filter = dyn chain::Filter + Send + Sync, C = &(dyn chain::Filter + Send + Sync), - KVStore = dyn lightning::util::persist::KVStore + Send + Sync, - K = &(dyn lightning::util::persist::KVStore + Send + Sync), + KVStore = DummyKVStore, + K = &DummyKVStore, TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync, TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync), BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface diff --git a/lightning-block-sync/src/gossip.rs b/lightning-block-sync/src/gossip.rs index 0fe221b9231..596098350c7 100644 --- a/lightning-block-sync/src/gossip.rs +++ b/lightning-block-sync/src/gossip.rs @@ -2,7 +2,7 @@ //! current UTXO set. This module defines an implementation of the LDK API required to do so //! against a [`BlockSource`] which implements a few additional methods for accessing the UTXO set. -use crate::{AsyncBlockSourceResult, BlockData, BlockSource, BlockSourceError}; +use crate::{BlockData, BlockSource, BlockSourceError, BlockSourceResult}; use bitcoin::block::Block; use bitcoin::constants::ChainHash; @@ -18,7 +18,7 @@ use lightning::util::native_async::FutureSpawner; use std::collections::VecDeque; use std::future::Future; use std::ops::Deref; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::{Arc, Mutex}; use std::task::Poll; @@ -35,11 +35,13 @@ pub trait UtxoSource: BlockSource + 'static { /// for gossip validation. fn get_block_hash_by_height<'a>( &'a self, block_height: u32, - ) -> AsyncBlockSourceResult<'a, BlockHash>; + ) -> impl Future> + Send + 'a; /// Returns true if the given output has *not* been spent, i.e. is a member of the current UTXO /// set. - fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool>; + fn is_output_unspent<'a>( + &'a self, outpoint: OutPoint, + ) -> impl Future> + Send + 'a; } #[cfg(feature = "tokio")] @@ -55,34 +57,37 @@ impl FutureSpawner for TokioSpawner { /// A trivial future which joins two other futures and polls them at the same time, returning only /// once both complete. pub(crate) struct Joiner< - A: Future), BlockSourceError>> + Unpin, - B: Future> + Unpin, + 'a, + A: Future), BlockSourceError>>, + B: Future>, > { - pub a: A, - pub b: B, + pub a: Pin<&'a mut A>, + pub b: Pin<&'a mut B>, a_res: Option<(BlockHash, Option)>, b_res: Option, } impl< - A: Future), BlockSourceError>> + Unpin, - B: Future> + Unpin, - > Joiner + 'a, + A: Future), BlockSourceError>>, + B: Future>, + > Joiner<'a, A, B> { - fn new(a: A, b: B) -> Self { + fn new(a: Pin<&'a mut A>, b: Pin<&'a mut B>) -> Self { Self { a, b, a_res: None, b_res: None } } } impl< - A: Future), BlockSourceError>> + Unpin, - B: Future> + Unpin, - > Future for Joiner + 'a, + A: Future), BlockSourceError>>, + B: Future>, + > Future for Joiner<'a, A, B> { type Output = Result<((BlockHash, Option), BlockHash), BlockSourceError>; fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll { if self.a_res.is_none() { - match Pin::new(&mut self.a).poll(ctx) { + match self.a.as_mut().poll(ctx) { Poll::Ready(res) => { if let Ok(ok) = res { self.a_res = Some(ok); @@ -94,7 +99,7 @@ impl< } } if self.b_res.is_none() { - match Pin::new(&mut self.b).poll(ctx) { + match self.b.as_mut().poll(ctx) { Poll::Ready(res) => { if let Ok(ok) = res { self.b_res = Some(ok); @@ -200,10 +205,12 @@ where } } - let ((_, tip_height_opt), block_hash) = - Joiner::new(source.get_best_block(), source.get_block_hash_by_height(block_height)) - .await - .map_err(|_| UtxoLookupError::UnknownTx)?; + let ((_, tip_height_opt), block_hash) = Joiner::new( + pin!(source.get_best_block()), + pin!(source.get_block_hash_by_height(block_height)), + ) + .await + .map_err(|_| UtxoLookupError::UnknownTx)?; if let Some(tip_height) = tip_height_opt { // If the block doesn't yet have five confirmations, error out. // diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 8656ba6ec6b..02593047658 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -53,7 +53,6 @@ use lightning::chain::{BestBlock, Listen}; use std::future::Future; use std::ops::Deref; -use std::pin::Pin; /// Abstract type for retrieving block headers and data. pub trait BlockSource: Sync + Send { @@ -65,12 +64,13 @@ pub trait BlockSource: Sync + Send { /// when `height_hint` is `None`. fn get_header<'a>( &'a self, header_hash: &'a BlockHash, height_hint: Option, - ) -> AsyncBlockSourceResult<'a, BlockHeaderData>; + ) -> impl Future> + Send + 'a; /// Returns the block for a given hash. A headers-only block source should return a `Transient` /// error. - fn get_block<'a>(&'a self, header_hash: &'a BlockHash) - -> AsyncBlockSourceResult<'a, BlockData>; + fn get_block<'a>( + &'a self, header_hash: &'a BlockHash, + ) -> impl Future> + Send + 'a; /// Returns the hash of the best block and, optionally, its height. /// @@ -78,18 +78,14 @@ pub trait BlockSource: Sync + Send { /// to allow for a more efficient lookup. /// /// [`get_header`]: Self::get_header - fn get_best_block(&self) -> AsyncBlockSourceResult<'_, (BlockHash, Option)>; + fn get_best_block<'a>( + &'a self, + ) -> impl Future)>> + Send + 'a; } /// Result type for `BlockSource` requests. pub type BlockSourceResult = Result; -// TODO: Replace with BlockSourceResult once `async` trait functions are supported. For details, -// see: https://areweasyncyet.rs. -/// Result type for asynchronous `BlockSource` requests. -pub type AsyncBlockSourceResult<'a, T> = - Pin> + 'a + Send>>; - /// Error type for `BlockSource` requests. /// /// Transient errors may be resolved when re-polling, but no attempt will be made to re-poll on diff --git a/lightning-block-sync/src/poll.rs b/lightning-block-sync/src/poll.rs index 843cc961899..13e0403c3b6 100644 --- a/lightning-block-sync/src/poll.rs +++ b/lightning-block-sync/src/poll.rs @@ -1,14 +1,12 @@ //! Adapters that make one or more [`BlockSource`]s simpler to poll for new chain tip transitions. -use crate::{ - AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceError, - BlockSourceResult, -}; +use crate::{BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult}; use bitcoin::hash_types::BlockHash; use bitcoin::network::Network; use lightning::chain::BestBlock; +use std::future::Future; use std::ops::Deref; /// The `Poll` trait defines behavior for polling block sources for a chain tip and retrieving @@ -22,17 +20,17 @@ pub trait Poll { /// Returns a chain tip in terms of its relationship to the provided chain tip. fn poll_chain_tip<'a>( &'a self, best_known_chain_tip: ValidatedBlockHeader, - ) -> AsyncBlockSourceResult<'a, ChainTip>; + ) -> impl Future> + Send + 'a; /// Returns the header that preceded the given header in the chain. fn look_up_previous_header<'a>( &'a self, header: &'a ValidatedBlockHeader, - ) -> AsyncBlockSourceResult<'a, ValidatedBlockHeader>; + ) -> impl Future> + Send + 'a; /// Returns the block associated with the given header. fn fetch_block<'a>( &'a self, header: &'a ValidatedBlockHeader, - ) -> AsyncBlockSourceResult<'a, ValidatedBlock>; + ) -> impl Future> + Send + 'a; } /// A chain tip relative to another chain tip in terms of block hash and chainwork. @@ -217,8 +215,8 @@ impl + Sized + Send + Sync, T: BlockSource + ?Sized> Poll { fn poll_chain_tip<'a>( &'a self, best_known_chain_tip: ValidatedBlockHeader, - ) -> AsyncBlockSourceResult<'a, ChainTip> { - Box::pin(async move { + ) -> impl Future> + Send + 'a { + async move { let (block_hash, height) = self.block_source.get_best_block().await?; if block_hash == best_known_chain_tip.header.block_hash() { return Ok(ChainTip::Common); @@ -231,13 +229,13 @@ impl + Sized + Send + Sync, T: BlockSource + ?Sized> Poll } else { Ok(ChainTip::Worse(chain_tip)) } - }) + } } fn look_up_previous_header<'a>( &'a self, header: &'a ValidatedBlockHeader, - ) -> AsyncBlockSourceResult<'a, ValidatedBlockHeader> { - Box::pin(async move { + ) -> impl Future> + Send + 'a { + async move { if header.height == 0 { return Err(BlockSourceError::persistent("genesis block reached")); } @@ -252,15 +250,13 @@ impl + Sized + Send + Sync, T: BlockSource + ?Sized> Poll header.check_builds_on(&previous_header, self.network)?; Ok(previous_header) - }) + } } fn fetch_block<'a>( &'a self, header: &'a ValidatedBlockHeader, - ) -> AsyncBlockSourceResult<'a, ValidatedBlock> { - Box::pin(async move { - self.block_source.get_block(&header.block_hash).await?.validate(header.block_hash) - }) + ) -> impl Future> + Send + 'a { + async move { self.block_source.get_block(&header.block_hash).await?.validate(header.block_hash) } } } diff --git a/lightning-block-sync/src/rest.rs b/lightning-block-sync/src/rest.rs index 1f79ab4a0b0..619981bb4d0 100644 --- a/lightning-block-sync/src/rest.rs +++ b/lightning-block-sync/src/rest.rs @@ -4,13 +4,14 @@ use crate::convert::GetUtxosResponse; use crate::gossip::UtxoSource; use crate::http::{BinaryResponse, HttpClient, HttpEndpoint, JsonResponse}; -use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource}; +use crate::{BlockData, BlockHeaderData, BlockSource, BlockSourceResult}; use bitcoin::hash_types::BlockHash; use bitcoin::OutPoint; use std::convert::TryFrom; use std::convert::TryInto; +use std::future::Future; use std::sync::Mutex; /// A simple REST client for requesting resources using HTTP `GET`. @@ -49,49 +50,51 @@ impl RestClient { impl BlockSource for RestClient { fn get_header<'a>( &'a self, header_hash: &'a BlockHash, _height: Option, - ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { - Box::pin(async move { + ) -> impl Future> + Send + 'a { + async move { let resource_path = format!("headers/1/{}.json", header_hash.to_string()); Ok(self.request_resource::(&resource_path).await?) - }) + } } fn get_block<'a>( &'a self, header_hash: &'a BlockHash, - ) -> AsyncBlockSourceResult<'a, BlockData> { - Box::pin(async move { + ) -> impl Future> + Send + 'a { + async move { let resource_path = format!("block/{}.bin", header_hash.to_string()); Ok(BlockData::FullBlock( self.request_resource::(&resource_path).await?, )) - }) + } } - fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { - Box::pin( - async move { Ok(self.request_resource::("chaininfo.json").await?) }, - ) + fn get_best_block<'a>( + &'a self, + ) -> impl Future)>> + Send + 'a { + async move { Ok(self.request_resource::("chaininfo.json").await?) } } } impl UtxoSource for RestClient { fn get_block_hash_by_height<'a>( &'a self, block_height: u32, - ) -> AsyncBlockSourceResult<'a, BlockHash> { - Box::pin(async move { + ) -> impl Future> + Send + 'a { + async move { let resource_path = format!("blockhashbyheight/{}.bin", block_height); Ok(self.request_resource::(&resource_path).await?) - }) + } } - fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool> { - Box::pin(async move { + fn is_output_unspent<'a>( + &'a self, outpoint: OutPoint, + ) -> impl Future> + Send + 'a { + async move { let resource_path = format!("getutxos/{}-{}.json", outpoint.txid.to_string(), outpoint.vout); let utxo_result = self.request_resource::(&resource_path).await?; Ok(utxo_result.hit_bitmap_nonempty) - }) + } } } diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs index 3df50a2267b..d851ba2ccf0 100644 --- a/lightning-block-sync/src/rpc.rs +++ b/lightning-block-sync/src/rpc.rs @@ -3,7 +3,7 @@ use crate::gossip::UtxoSource; use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse}; -use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource}; +use crate::{BlockData, BlockHeaderData, BlockSource, BlockSourceResult}; use bitcoin::hash_types::BlockHash; use bitcoin::OutPoint; @@ -16,6 +16,7 @@ use std::convert::TryFrom; use std::convert::TryInto; use std::error::Error; use std::fmt; +use std::future::Future; use std::sync::atomic::{AtomicUsize, Ordering}; /// An error returned by the RPC server. @@ -135,47 +136,51 @@ impl RpcClient { impl BlockSource for RpcClient { fn get_header<'a>( &'a self, header_hash: &'a BlockHash, _height: Option, - ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { - Box::pin(async move { + ) -> impl Future> + Send + 'a { + async move { let header_hash = serde_json::json!(header_hash.to_string()); Ok(self.call_method("getblockheader", &[header_hash]).await?) - }) + } } fn get_block<'a>( &'a self, header_hash: &'a BlockHash, - ) -> AsyncBlockSourceResult<'a, BlockData> { - Box::pin(async move { + ) -> impl Future> + Send + 'a { + async move { let header_hash = serde_json::json!(header_hash.to_string()); let verbosity = serde_json::json!(0); Ok(BlockData::FullBlock(self.call_method("getblock", &[header_hash, verbosity]).await?)) - }) + } } - fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { - Box::pin(async move { Ok(self.call_method("getblockchaininfo", &[]).await?) }) + fn get_best_block<'a>( + &'a self, + ) -> impl Future)>> + Send + 'a { + async move { Ok(self.call_method("getblockchaininfo", &[]).await?) } } } impl UtxoSource for RpcClient { fn get_block_hash_by_height<'a>( &'a self, block_height: u32, - ) -> AsyncBlockSourceResult<'a, BlockHash> { - Box::pin(async move { + ) -> impl Future> + Send + 'a { + async move { let height_param = serde_json::json!(block_height); Ok(self.call_method("getblockhash", &[height_param]).await?) - }) + } } - fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool> { - Box::pin(async move { + fn is_output_unspent<'a>( + &'a self, outpoint: OutPoint, + ) -> impl Future> + Send + 'a { + async move { let txid_param = serde_json::json!(outpoint.txid.to_string()); let vout_param = serde_json::json!(outpoint.vout); let include_mempool = serde_json::json!(false); let utxo_opt: serde_json::Value = self.call_method("gettxout", &[txid_param, vout_param, include_mempool]).await?; Ok(!utxo_opt.is_null()) - }) + } } } diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index d307c4506eb..40788e4d08c 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -1,7 +1,6 @@ use crate::poll::{Validate, ValidatedBlockHeader}; use crate::{ - AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceError, - UnboundedCache, + BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult, UnboundedCache, }; use bitcoin::block::{Block, Header, Version}; @@ -17,6 +16,7 @@ use lightning::chain::BestBlock; use std::cell::RefCell; use std::collections::VecDeque; +use std::future::Future; #[derive(Default)] pub struct Blockchain { @@ -141,8 +141,8 @@ impl Blockchain { impl BlockSource for Blockchain { fn get_header<'a>( &'a self, header_hash: &'a BlockHash, _height_hint: Option, - ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { - Box::pin(async move { + ) -> impl Future> + Send + 'a { + async move { if self.without_headers { return Err(BlockSourceError::persistent("header not found")); } @@ -158,13 +158,13 @@ impl BlockSource for Blockchain { } } Err(BlockSourceError::transient("header not found")) - }) + } } fn get_block<'a>( &'a self, header_hash: &'a BlockHash, - ) -> AsyncBlockSourceResult<'a, BlockData> { - Box::pin(async move { + ) -> impl Future> + Send + 'a { + async move { for (height, block) in self.blocks.iter().enumerate() { if block.header.block_hash() == *header_hash { if let Some(without_blocks) = &self.without_blocks { @@ -181,11 +181,13 @@ impl BlockSource for Blockchain { } } Err(BlockSourceError::transient("block not found")) - }) + } } - fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { - Box::pin(async move { + fn get_best_block<'a>( + &'a self, + ) -> impl Future)>> + Send + 'a { + async move { match self.blocks.last() { None => Err(BlockSourceError::transient("empty chain")), Some(block) => { @@ -193,7 +195,7 @@ impl BlockSource for Blockchain { Ok((block.block_hash(), Some(height))) }, } - }) + } } } diff --git a/lightning-invoice/src/lib.rs b/lightning-invoice/src/lib.rs index 47f929377de..60d413cf76a 100644 --- a/lightning-invoice/src/lib.rs +++ b/lightning-invoice/src/lib.rs @@ -40,7 +40,6 @@ use bitcoin::secp256k1::ecdsa::RecoverableSignature; use bitcoin::secp256k1::PublicKey; use bitcoin::secp256k1::{Message, Secp256k1}; -use alloc::boxed::Box; use alloc::string; use core::cmp::Ordering; use core::fmt::{self, Display, Formatter}; @@ -1081,8 +1080,8 @@ macro_rules! find_all_extract { #[allow(missing_docs)] impl RawBolt11Invoice { /// Hash the HRP (as bytes) and signatureless data part (as Fe32 iterator) - fn hash_from_parts<'s>( - hrp_bytes: &[u8], data_without_signature: Box + 's>, + fn hash_from_parts<'s, I: Iterator + 's>( + hrp_bytes: &[u8], data_without_signature: I, ) -> [u8; 32] { use crate::bech32::Fe32IterExt; use bitcoin::hashes::HashEngine; diff --git a/lightning-invoice/src/ser.rs b/lightning-invoice/src/ser.rs index 5c93fa84ae0..853accdd3ca 100644 --- a/lightning-invoice/src/ser.rs +++ b/lightning-invoice/src/ser.rs @@ -1,4 +1,3 @@ -use alloc::boxed::Box; use core::fmt; use core::fmt::{Display, Formatter}; use core::{array, iter}; @@ -13,14 +12,28 @@ use super::{ SignedRawBolt11Invoice, TaggedField, }; +macro_rules! define_iterator_enum { + ($name: ident, $($n: ident),*) => { + enum $name<$($n: Iterator,)*> { + $($n($n),)* + } + impl<$($n: Iterator,)*> Iterator for $name<$($n,)*> { + type Item = Fe32; + fn next(&mut self) -> Option { + match self { + $(Self::$n(iter) => iter.next(),)* + } + } + } + } +} + /// Objects that can be encoded to base32 (bech32). /// -/// Private to this crate to avoid polluting the API. +/// Private to this crate (except in fuzzing) to avoid polluting the API. pub trait Base32Iterable { - /// apoelstra: In future we want to replace this Box with an explicit - /// associated type, to avoid the allocation. But we cannot do this until - /// Rust 1.65 and GATs since the iterator may contain a reference to self. - fn fe_iter<'s>(&'s self) -> Box + 's>; + /// Serialize this object, returning an iterator over bech32 field elements. + fn fe_iter<'s>(&'s self) -> impl Iterator + 's; } /// Interface to calculate the length of the base32 representation before actually serializing @@ -32,7 +45,7 @@ pub(crate) trait Base32Len: Base32Iterable { // Base32Iterable & Base32Len implementations are here, because the traits are in this module. impl Base32Iterable for [u8; N] { - fn fe_iter<'s>(&'s self) -> Box + 's> { + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { self[..].fe_iter() } } @@ -45,8 +58,8 @@ impl Base32Len for [u8; N] { } impl Base32Iterable for [u8] { - fn fe_iter<'s>(&'s self) -> Box + 's> { - Box::new(self.iter().copied().bytes_to_fes()) + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { + self.iter().copied().bytes_to_fes() } } @@ -58,8 +71,8 @@ impl Base32Len for [u8] { } impl Base32Iterable for Vec { - fn fe_iter<'s>(&'s self) -> Box + 's> { - Box::new(self.iter().copied().bytes_to_fes()) + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { + self.iter().copied().bytes_to_fes() } } @@ -71,8 +84,8 @@ impl Base32Len for Vec { } impl Base32Iterable for PaymentSecret { - fn fe_iter<'s>(&'s self) -> Box + 's> { - Box::new(self.0[..].fe_iter()) + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { + self.0[..].fe_iter() } } @@ -88,7 +101,7 @@ impl Base32Iterable for Bolt11InvoiceFeatures { /// starting from the rightmost bit, /// and taking the resulting 5-bit values in reverse (left-to-right), /// with the leading 0's skipped. - fn fe_iter<'s>(&'s self) -> Box + 's> { + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { // Fe32 conversion cannot be used, because this packs from right, right-to-left let mut input_iter = self.le_flags().iter(); // Carry bits, 0..7 bits @@ -126,7 +139,7 @@ impl Base32Iterable for Bolt11InvoiceFeatures { output.push(Fe32::try_from(next_out8 & 31u8).expect("<32")) } // Take result in reverse order, and skip leading 0s - Box::new(output.into_iter().rev().skip_while(|e| *e == Fe32::Q)) + output.into_iter().rev().skip_while(|e| *e == Fe32::Q) } } @@ -241,36 +254,35 @@ fn encoded_int_be_base32_size(int: u64) -> usize { } impl Base32Iterable for RawDataPart { - fn fe_iter<'s>(&'s self) -> Box + 's> { + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { let ts_iter = self.timestamp.fe_iter(); let fields_iter = self.tagged_fields.iter().map(RawTaggedField::fe_iter).flatten(); - Box::new(ts_iter.chain(fields_iter)) + ts_iter.chain(fields_iter) } } impl Base32Iterable for PositiveTimestamp { - fn fe_iter<'s>(&'s self) -> Box + 's> { + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { let fes = encode_int_be_base32(self.as_unix_timestamp()); debug_assert!(fes.len() <= 7, "Invalid timestamp length"); let to_pad = 7 - fes.len(); - Box::new(core::iter::repeat(Fe32::Q).take(to_pad).chain(fes)) + core::iter::repeat(Fe32::Q).take(to_pad).chain(fes) } } impl Base32Iterable for RawTaggedField { - fn fe_iter<'s>(&'s self) -> Box + 's> { - // Annoyingly, when we move to explicit types, we will need an - // explicit enum holding the two iterator variants. + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { + define_iterator_enum!(TwoIters, A, B); match *self { - RawTaggedField::UnknownSemantics(ref content) => Box::new(content.iter().copied()), - RawTaggedField::KnownSemantics(ref tagged_field) => tagged_field.fe_iter(), + RawTaggedField::UnknownSemantics(ref content) => TwoIters::A(content.iter().copied()), + RawTaggedField::KnownSemantics(ref tagged_field) => TwoIters::B(tagged_field.fe_iter()), } } } impl Base32Iterable for Sha256 { - fn fe_iter<'s>(&'s self) -> Box + 's> { - Box::new(self.0[..].fe_iter()) + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { + self.0[..].fe_iter() } } @@ -281,8 +293,8 @@ impl Base32Len for Sha256 { } impl Base32Iterable for Description { - fn fe_iter<'s>(&'s self) -> Box + 's> { - Box::new(self.0 .0.as_bytes().fe_iter()) + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { + self.0 .0.as_bytes().fe_iter() } } @@ -293,8 +305,8 @@ impl Base32Len for Description { } impl Base32Iterable for PayeePubKey { - fn fe_iter<'s>(&'s self) -> Box + 's> { - Box::new(self.serialize().into_iter().bytes_to_fes()) + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { + self.serialize().into_iter().bytes_to_fes() } } @@ -305,8 +317,8 @@ impl Base32Len for PayeePubKey { } impl Base32Iterable for ExpiryTime { - fn fe_iter<'s>(&'s self) -> Box + 's> { - Box::new(encode_int_be_base32(self.as_seconds())) + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { + encode_int_be_base32(self.as_seconds()) } } @@ -317,8 +329,8 @@ impl Base32Len for ExpiryTime { } impl Base32Iterable for MinFinalCltvExpiryDelta { - fn fe_iter<'s>(&'s self) -> Box + 's> { - Box::new(encode_int_be_base32(self.0)) + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { + encode_int_be_base32(self.0) } } @@ -329,8 +341,8 @@ impl Base32Len for MinFinalCltvExpiryDelta { } impl Base32Iterable for Fallback { - fn fe_iter<'s>(&'s self) -> Box + 's> { - Box::new(match *self { + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { + match *self { Fallback::SegWitProgram { version: v, program: ref p } => { let v = Fe32::try_from(v.to_num()).expect("valid version"); core::iter::once(v).chain(p[..].fe_iter()) @@ -343,7 +355,7 @@ impl Base32Iterable for Fallback { // 18 'J' core::iter::once(Fe32::J).chain(hash[..].fe_iter()) }, - }) + } } } @@ -371,7 +383,7 @@ type RouteHintHopIter = iter::Chain< >; impl Base32Iterable for PrivateRoute { - fn fe_iter<'s>(&'s self) -> Box + 's> { + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { fn serialize_to_iter(hop: &RouteHintHop) -> RouteHintHopIter { let i1 = hop.src_node_id.serialize().into_iter(); let i2 = u64::to_be_bytes(hop.short_channel_id).into_iter(); @@ -381,7 +393,7 @@ impl Base32Iterable for PrivateRoute { i1.chain(i2).chain(i3).chain(i4).chain(i5) } - Box::new(self.0 .0.iter().map(serialize_to_iter).flatten().bytes_to_fes()) + self.0 .0.iter().map(serialize_to_iter).flatten().bytes_to_fes() } } @@ -391,16 +403,11 @@ impl Base32Len for PrivateRoute { } } -// Shorthand type -type TaggedFieldIter = core::iter::Chain, I>; - impl Base32Iterable for TaggedField { - fn fe_iter<'s>(&'s self) -> Box + 's> { + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { /// Writes a tagged field: tag, length and data. `tag` should be in `0..32` otherwise the /// function will panic. - fn write_tagged_field<'s, P>( - tag: u8, payload: &'s P, - ) -> TaggedFieldIter + 's>> + fn write_tagged_field<'s, P>(tag: u8, payload: &'s P) -> impl Iterator + 's where P: Base32Iterable + Base32Len + ?Sized, { @@ -416,54 +423,49 @@ impl Base32Iterable for TaggedField { .chain(payload.fe_iter()) } - // we will also need a giant enum for this - Box::new(match *self { + define_iterator_enum!(ManyIters, A, B, C, D, E, F, G, H, I, J, K); + match *self { TaggedField::PaymentHash(ref hash) => { - write_tagged_field(constants::TAG_PAYMENT_HASH, hash) + ManyIters::A(write_tagged_field(constants::TAG_PAYMENT_HASH, hash)) }, TaggedField::Description(ref description) => { - write_tagged_field(constants::TAG_DESCRIPTION, description) + ManyIters::B(write_tagged_field(constants::TAG_DESCRIPTION, description)) }, TaggedField::PayeePubKey(ref pub_key) => { - write_tagged_field(constants::TAG_PAYEE_PUB_KEY, pub_key) + ManyIters::C(write_tagged_field(constants::TAG_PAYEE_PUB_KEY, pub_key)) }, TaggedField::DescriptionHash(ref hash) => { - write_tagged_field(constants::TAG_DESCRIPTION_HASH, hash) + ManyIters::D(write_tagged_field(constants::TAG_DESCRIPTION_HASH, hash)) }, TaggedField::ExpiryTime(ref duration) => { - write_tagged_field(constants::TAG_EXPIRY_TIME, duration) + ManyIters::E(write_tagged_field(constants::TAG_EXPIRY_TIME, duration)) }, TaggedField::MinFinalCltvExpiryDelta(ref expiry) => { - write_tagged_field(constants::TAG_MIN_FINAL_CLTV_EXPIRY_DELTA, expiry) + ManyIters::F(write_tagged_field(constants::TAG_MIN_FINAL_CLTV_EXPIRY_DELTA, expiry)) }, TaggedField::Fallback(ref fallback_address) => { - write_tagged_field(constants::TAG_FALLBACK, fallback_address) + ManyIters::G(write_tagged_field(constants::TAG_FALLBACK, fallback_address)) }, TaggedField::PrivateRoute(ref route_hops) => { - write_tagged_field(constants::TAG_PRIVATE_ROUTE, route_hops) + ManyIters::H(write_tagged_field(constants::TAG_PRIVATE_ROUTE, route_hops)) }, TaggedField::PaymentSecret(ref payment_secret) => { - write_tagged_field(constants::TAG_PAYMENT_SECRET, payment_secret) + ManyIters::I(write_tagged_field(constants::TAG_PAYMENT_SECRET, payment_secret)) }, TaggedField::PaymentMetadata(ref payment_metadata) => { - write_tagged_field(constants::TAG_PAYMENT_METADATA, payment_metadata) + ManyIters::J(write_tagged_field(constants::TAG_PAYMENT_METADATA, payment_metadata)) }, TaggedField::Features(ref features) => { - write_tagged_field(constants::TAG_FEATURES, features) + ManyIters::K(write_tagged_field(constants::TAG_FEATURES, features)) }, - }) + } } } impl Base32Iterable for Bolt11InvoiceSignature { - fn fe_iter<'s>(&'s self) -> Box + 's> { + fn fe_iter<'s>(&'s self) -> impl Iterator + 's { let (recovery_id, signature) = self.0.serialize_compact(); - Box::new( - signature - .into_iter() - .chain(core::iter::once(recovery_id.to_i32() as u8)) - .bytes_to_fes(), - ) + signature.into_iter().chain(core::iter::once(recovery_id.to_i32() as u8)).bytes_to_fes() } } diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index a6736e63eef..5c4bd63bc48 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -9,7 +9,6 @@ //! Contains the main bLIP-52 / LSPS2 server-side object, [`LSPS2ServiceHandler`]. -use alloc::boxed::Box; use alloc::string::{String, ToString}; use alloc::vec::Vec; use lightning::util::persist::KVStore; @@ -17,6 +16,7 @@ use lightning::util::persist::KVStore; use core::cmp::Ordering as CmpOrdering; use core::future::Future as StdFuture; use core::ops::Deref; +use core::pin::pin; use core::sync::atomic::{AtomicUsize, Ordering}; use core::task; @@ -2173,7 +2173,7 @@ where &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, intercept_scid: u64, cltv_expiry_delta: u32, client_trusts_lsp: bool, user_channel_id: u128, ) -> Result<(), APIError> { - let mut fut = Box::pin(self.inner.invoice_parameters_generated( + let mut fut = pin!(self.inner.invoice_parameters_generated( counterparty_node_id, request_id, intercept_scid, @@ -2202,7 +2202,7 @@ where &self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64, payment_hash: PaymentHash, ) -> Result<(), APIError> { - let mut fut = Box::pin(self.inner.htlc_intercepted( + let mut fut = pin!(self.inner.htlc_intercepted( intercept_scid, intercept_id, expected_outbound_amount_msat, @@ -2228,7 +2228,7 @@ where pub fn htlc_handling_failed( &self, failure_type: HTLCHandlingFailureType, ) -> Result<(), APIError> { - let mut fut = Box::pin(self.inner.htlc_handling_failed(failure_type)); + let mut fut = pin!(self.inner.htlc_handling_failed(failure_type)); let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); @@ -2249,7 +2249,7 @@ where pub fn payment_forwarded( &self, next_channel_id: ChannelId, skimmed_fee_msat: u64, ) -> Result<(), APIError> { - let mut fut = Box::pin(self.inner.payment_forwarded(next_channel_id, skimmed_fee_msat)); + let mut fut = pin!(self.inner.payment_forwarded(next_channel_id, skimmed_fee_msat)); let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); @@ -2290,7 +2290,7 @@ where &self, counterparty_node_id: &PublicKey, user_channel_id: u128, ) -> Result<(), APIError> { let mut fut = - Box::pin(self.inner.channel_open_abandoned(counterparty_node_id, user_channel_id)); + pin!(self.inner.channel_open_abandoned(counterparty_node_id, user_channel_id)); let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); @@ -2309,8 +2309,7 @@ where pub fn channel_open_failed( &self, counterparty_node_id: &PublicKey, user_channel_id: u128, ) -> Result<(), APIError> { - let mut fut = - Box::pin(self.inner.channel_open_failed(counterparty_node_id, user_channel_id)); + let mut fut = pin!(self.inner.channel_open_failed(counterparty_node_id, user_channel_id)); let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); @@ -2332,7 +2331,7 @@ where &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey, ) -> Result<(), APIError> { let mut fut = - Box::pin(self.inner.channel_ready(user_channel_id, channel_id, counterparty_node_id)); + pin!(self.inner.channel_ready(user_channel_id, channel_id, counterparty_node_id)); let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index f0143fc624f..d3822715b8d 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -7,7 +7,6 @@ // You may not use this file except in accordance with one or both of these // licenses. -use alloc::boxed::Box; use alloc::string::ToString; use alloc::vec::Vec; @@ -61,6 +60,7 @@ use bitcoin::secp256k1::PublicKey; use core::future::Future as StdFuture; use core::ops::Deref; +use core::pin::pin; use core::task; const LSPS_FEATURE_BIT: usize = 729; @@ -1106,7 +1106,7 @@ where ) -> Result { let kv_store = KVStoreSyncWrapper(kv_store_sync); - let mut fut = Box::pin(LiquidityManager::new( + let mut fut = pin!(LiquidityManager::new( entropy_source, node_signer, channel_manager, @@ -1159,7 +1159,7 @@ where client_config: Option, time_provider: TP, ) -> Result { let kv_store = KVStoreSyncWrapper(kv_store_sync); - let mut fut = Box::pin(LiquidityManager::new_with_custom_time_provider( + let mut fut = pin!(LiquidityManager::new_with_custom_time_provider( entropy_source, node_signer, channel_manager, @@ -1289,7 +1289,7 @@ where pub fn persist(&self) -> Result<(), lightning::io::Error> { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); - match Box::pin(self.inner.persist()).as_mut().poll(&mut ctx) { + match pin!(self.inner.persist()).as_mut().poll(&mut ctx) { task::Poll::Ready(result) => result, task::Poll::Pending => { // In a sync context, we can't wait for the future to complete. diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 068f77a84bb..c6fbd3dc3c5 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -43,7 +43,7 @@ use std::hash::Hash; use std::net::SocketAddr; use std::net::TcpStream as StdTcpStream; use std::ops::Deref; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::task::{self, Poll}; @@ -205,18 +205,17 @@ impl Connection { } us_lock.read_paused }; - // TODO: Drop the Box'ing of the futures once Rust has pin-on-stack support. let select_result = if read_paused { TwoSelector { - a: Box::pin(write_avail_receiver.recv()), - b: Box::pin(read_wake_receiver.recv()), + a: pin!(write_avail_receiver.recv()), + b: pin!(read_wake_receiver.recv()), } .await } else { ThreeSelector { - a: Box::pin(write_avail_receiver.recv()), - b: Box::pin(read_wake_receiver.recv()), - c: Box::pin(reader.readable()), + a: pin!(write_avail_receiver.recv()), + b: pin!(read_wake_receiver.recv()), + c: pin!(reader.readable()), } .await }; diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index 9b15398d4d1..b2d327f6bc1 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -14,8 +14,6 @@ use std::sync::{Arc, Mutex, RwLock}; #[cfg(feature = "tokio")] use core::future::Future; #[cfg(feature = "tokio")] -use core::pin::Pin; -#[cfg(feature = "tokio")] use lightning::util::persist::KVStore; #[cfg(target_os = "windows")] @@ -464,93 +462,85 @@ impl FilesystemStoreInner { impl KVStore for FilesystemStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Pin, lightning::io::Error>> + 'static + Send>> { + ) -> impl Future, lightning::io::Error>> + 'static + Send { let this = Arc::clone(&self.inner); - let path = match this.get_checked_dest_file_path( + let path = this.get_checked_dest_file_path( primary_namespace, secondary_namespace, Some(key), "read", - ) { - Ok(path) => path, - Err(e) => return Box::pin(async move { Err(e) }), - }; + ); - Box::pin(async move { + async move { + let path = match path { + Ok(path) => path, + Err(e) => return Err(e), + }; tokio::task::spawn_blocking(move || this.read(path)).await.unwrap_or_else(|e| { Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) }) - }) + } } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> Pin> + 'static + Send>> { + ) -> impl Future> + 'static + Send { let this = Arc::clone(&self.inner); - let path = match this.get_checked_dest_file_path( - primary_namespace, - secondary_namespace, - Some(key), - "write", - ) { - Ok(path) => path, - Err(e) => return Box::pin(async move { Err(e) }), - }; - - let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); - Box::pin(async move { + let path = this + .get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "write") + .map(|path| (self.get_new_version_and_lock_ref(path.clone()), path)); + + async move { + let ((inner_lock_ref, version), path) = match path { + Ok(res) => res, + Err(e) => return Err(e), + }; tokio::task::spawn_blocking(move || { this.write_version(inner_lock_ref, path, buf, version) }) .await .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) - }) + } } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> Pin> + 'static + Send>> { + ) -> impl Future> + 'static + Send { let this = Arc::clone(&self.inner); - let path = match this.get_checked_dest_file_path( - primary_namespace, - secondary_namespace, - Some(key), - "remove", - ) { - Ok(path) => path, - Err(e) => return Box::pin(async move { Err(e) }), - }; - - let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); - Box::pin(async move { + let path = this + .get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "remove") + .map(|path| (self.get_new_version_and_lock_ref(path.clone()), path)); + + async move { + let ((inner_lock_ref, version), path) = match path { + Ok(res) => res, + Err(e) => return Err(e), + }; tokio::task::spawn_blocking(move || { this.remove_version(inner_lock_ref, path, lazy, version) }) .await .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) - }) + } } fn list( &self, primary_namespace: &str, secondary_namespace: &str, - ) -> Pin, lightning::io::Error>> + 'static + Send>> { + ) -> impl Future, lightning::io::Error>> + 'static + Send { let this = Arc::clone(&self.inner); - let path = match this.get_checked_dest_file_path( - primary_namespace, - secondary_namespace, - None, - "list", - ) { - Ok(path) => path, - Err(e) => return Box::pin(async move { Err(e) }), - }; + let path = + this.get_checked_dest_file_path(primary_namespace, secondary_namespace, None, "list"); - Box::pin(async move { + async move { + let path = match path { + Ok(path) => path, + Err(e) => return Err(e), + }; tokio::task::spawn_blocking(move || this.list(path)).await.unwrap_or_else(|e| { Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) }) - }) + } } } @@ -758,24 +748,24 @@ mod tests { let fs_store = Arc::new(FilesystemStore::new(temp_path)); assert_eq!(fs_store.state_size(), 0); - let async_fs_store: Arc = fs_store.clone(); + let async_fs_store = Arc::clone(&fs_store); let data1 = vec![42u8; 32]; let data2 = vec![43u8; 32]; - let primary_namespace = "testspace"; - let secondary_namespace = "testsubspace"; + let primary = "testspace"; + let secondary = "testsubspace"; let key = "testkey"; // Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure // that eventual consistency works. - let fut1 = async_fs_store.write(primary_namespace, secondary_namespace, key, data1); + let fut1 = KVStore::write(&*async_fs_store, primary, secondary, key, data1); assert_eq!(fs_store.state_size(), 1); - let fut2 = async_fs_store.remove(primary_namespace, secondary_namespace, key, false); + let fut2 = KVStore::remove(&*async_fs_store, primary, secondary, key, false); assert_eq!(fs_store.state_size(), 1); - let fut3 = async_fs_store.write(primary_namespace, secondary_namespace, key, data2.clone()); + let fut3 = KVStore::write(&*async_fs_store, primary, secondary, key, data2.clone()); assert_eq!(fs_store.state_size(), 1); fut3.await.unwrap(); @@ -788,21 +778,18 @@ mod tests { assert_eq!(fs_store.state_size(), 0); // Test list. - let listed_keys = - async_fs_store.list(primary_namespace, secondary_namespace).await.unwrap(); + let listed_keys = KVStore::list(&*async_fs_store, primary, secondary).await.unwrap(); assert_eq!(listed_keys.len(), 1); assert_eq!(listed_keys[0], key); // Test read. We expect to read data2, as the write call was initiated later. - let read_data = - async_fs_store.read(primary_namespace, secondary_namespace, key).await.unwrap(); + let read_data = KVStore::read(&*async_fs_store, primary, secondary, key).await.unwrap(); assert_eq!(data2, &*read_data); // Test remove. - async_fs_store.remove(primary_namespace, secondary_namespace, key, false).await.unwrap(); + KVStore::remove(&*async_fs_store, primary, secondary, key, false).await.unwrap(); - let listed_keys = - async_fs_store.list(primary_namespace, secondary_namespace).await.unwrap(); + let listed_keys = KVStore::list(&*async_fs_store, primary, secondary).await.unwrap(); assert_eq!(listed_keys.len(), 0); } diff --git a/lightning/src/events/bump_transaction/mod.rs b/lightning/src/events/bump_transaction/mod.rs index 3d9beb82c07..e141d9b8abc 100644 --- a/lightning/src/events/bump_transaction/mod.rs +++ b/lightning/src/events/bump_transaction/mod.rs @@ -14,6 +14,7 @@ pub mod sync; use alloc::collections::BTreeMap; +use core::future::Future; use core::ops::Deref; use crate::chain::chaininterface::{ @@ -36,7 +37,7 @@ use crate::sign::{ ChannelDerivationParameters, HTLCDescriptor, SignerProvider, P2WPKH_WITNESS_WEIGHT, }; use crate::sync::Mutex; -use crate::util::async_poll::{AsyncResult, MaybeSend, MaybeSync}; +use crate::util::async_poll::{MaybeSend, MaybeSync}; use crate::util::logger::Logger; use bitcoin::amount::Amount; @@ -394,13 +395,15 @@ pub trait CoinSelectionSource { fn select_confirmed_utxos<'a>( &'a self, claim_id: ClaimId, must_spend: Vec, must_pay_to: &'a [TxOut], target_feerate_sat_per_1000_weight: u32, max_tx_weight: u64, - ) -> AsyncResult<'a, CoinSelection, ()>; + ) -> impl Future> + MaybeSend + 'a; /// Signs and provides the full witness for all inputs within the transaction known to the /// trait (i.e., any provided via [`CoinSelectionSource::select_confirmed_utxos`]). /// /// If your wallet does not support signing PSBTs you can call `psbt.extract_tx()` to get the /// unsigned transaction and then sign it with your wallet. - fn sign_psbt<'a>(&'a self, psbt: Psbt) -> AsyncResult<'a, Transaction, ()>; + fn sign_psbt<'a>( + &'a self, psbt: Psbt, + ) -> impl Future> + MaybeSend + 'a; } /// An alternative to [`CoinSelectionSource`] that can be implemented and used along [`Wallet`] to @@ -412,17 +415,23 @@ pub trait CoinSelectionSource { // Note that updates to documentation on this trait should be copied to the synchronous version. pub trait WalletSource { /// Returns all UTXOs, with at least 1 confirmation each, that are available to spend. - fn list_confirmed_utxos<'a>(&'a self) -> AsyncResult<'a, Vec, ()>; + fn list_confirmed_utxos<'a>( + &'a self, + ) -> impl Future, ()>> + MaybeSend + 'a; /// Returns a script to use for change above dust resulting from a successful coin selection /// attempt. - fn get_change_script<'a>(&'a self) -> AsyncResult<'a, ScriptBuf, ()>; + fn get_change_script<'a>( + &'a self, + ) -> impl Future> + MaybeSend + 'a; /// Signs and provides the full [`TxIn::script_sig`] and [`TxIn::witness`] for all inputs within /// the transaction known to the wallet (i.e., any provided via /// [`WalletSource::list_confirmed_utxos`]). /// /// If your wallet does not support signing PSBTs you can call `psbt.extract_tx()` to get the /// unsigned transaction and then sign it with your wallet. - fn sign_psbt<'a>(&'a self, psbt: Psbt) -> AsyncResult<'a, Transaction, ()>; + fn sign_psbt<'a>( + &'a self, psbt: Psbt, + ) -> impl Future> + MaybeSend + 'a; } /// A wrapper over [`WalletSource`] that implements [`CoinSelectionSource`] by preferring UTXOs @@ -617,8 +626,8 @@ where fn select_confirmed_utxos<'a>( &'a self, claim_id: ClaimId, must_spend: Vec, must_pay_to: &'a [TxOut], target_feerate_sat_per_1000_weight: u32, max_tx_weight: u64, - ) -> AsyncResult<'a, CoinSelection, ()> { - Box::pin(async move { + ) -> impl Future> + MaybeSend + 'a { + async move { let utxos = self.source.list_confirmed_utxos().await?; // TODO: Use fee estimation utils when we upgrade to bitcoin v0.30.0. let total_output_size: u64 = must_pay_to @@ -665,10 +674,12 @@ where } } Err(()) - }) + } } - fn sign_psbt<'a>(&'a self, psbt: Psbt) -> AsyncResult<'a, Transaction, ()> { + fn sign_psbt<'a>( + &'a self, psbt: Psbt, + ) -> impl Future> + MaybeSend + 'a { self.source.sign_psbt(psbt) } } diff --git a/lightning/src/events/bump_transaction/sync.rs b/lightning/src/events/bump_transaction/sync.rs index 653710a3358..1328c2c1b3a 100644 --- a/lightning/src/events/bump_transaction/sync.rs +++ b/lightning/src/events/bump_transaction/sync.rs @@ -11,13 +11,14 @@ use core::future::Future; use core::ops::Deref; +use core::pin::pin; use core::task; use crate::chain::chaininterface::BroadcasterInterface; use crate::chain::ClaimId; use crate::prelude::*; use crate::sign::SignerProvider; -use crate::util::async_poll::{dummy_waker, AsyncResult, MaybeSend, MaybeSync}; +use crate::util::async_poll::{dummy_waker, MaybeSend, MaybeSync}; use crate::util::logger::Logger; use bitcoin::{Psbt, ScriptBuf, Transaction, TxOut}; @@ -71,19 +72,25 @@ impl WalletSource for WalletSourceSyncWrapper where T::Target: WalletSourceSync, { - fn list_confirmed_utxos<'a>(&'a self) -> AsyncResult<'a, Vec, ()> { + fn list_confirmed_utxos<'a>( + &'a self, + ) -> impl Future, ()>> + MaybeSend + 'a { let utxos = self.0.list_confirmed_utxos(); - Box::pin(async move { utxos }) + async move { utxos } } - fn get_change_script<'a>(&'a self) -> AsyncResult<'a, ScriptBuf, ()> { + fn get_change_script<'a>( + &'a self, + ) -> impl Future> + MaybeSend + 'a { let script = self.0.get_change_script(); - Box::pin(async move { script }) + async move { script } } - fn sign_psbt<'a>(&'a self, psbt: Psbt) -> AsyncResult<'a, Transaction, ()> { + fn sign_psbt<'a>( + &'a self, psbt: Psbt, + ) -> impl Future> + MaybeSend + 'a { let signed_psbt = self.0.sign_psbt(psbt); - Box::pin(async move { signed_psbt }) + async move { signed_psbt } } } @@ -122,7 +129,7 @@ where &self, claim_id: ClaimId, must_spend: Vec, must_pay_to: &[TxOut], target_feerate_sat_per_1000_weight: u32, max_tx_weight: u64, ) -> Result { - let mut fut = self.wallet.select_confirmed_utxos( + let fut = self.wallet.select_confirmed_utxos( claim_id, must_spend, must_pay_to, @@ -131,7 +138,7 @@ where ); let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); - match fut.as_mut().poll(&mut ctx) { + match pin!(fut).poll(&mut ctx) { task::Poll::Ready(result) => result, task::Poll::Pending => { unreachable!( @@ -142,10 +149,10 @@ where } fn sign_psbt(&self, psbt: Psbt) -> Result { - let mut fut = self.wallet.sign_psbt(psbt); + let fut = self.wallet.sign_psbt(psbt); let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); - match fut.as_mut().poll(&mut ctx) { + match pin!(fut).poll(&mut ctx) { task::Poll::Ready(result) => result, task::Poll::Pending => { unreachable!("Wallet::sign_psbt should not be pending in a sync context"); @@ -233,7 +240,7 @@ where fn select_confirmed_utxos<'a>( &'a self, claim_id: ClaimId, must_spend: Vec, must_pay_to: &'a [TxOut], target_feerate_sat_per_1000_weight: u32, max_tx_weight: u64, - ) -> AsyncResult<'a, CoinSelection, ()> { + ) -> impl Future> + MaybeSend + 'a { let coins = self.0.select_confirmed_utxos( claim_id, must_spend, @@ -241,12 +248,14 @@ where target_feerate_sat_per_1000_weight, max_tx_weight, ); - Box::pin(async move { coins }) + async move { coins } } - fn sign_psbt<'a>(&'a self, psbt: Psbt) -> AsyncResult<'a, Transaction, ()> { + fn sign_psbt<'a>( + &'a self, psbt: Psbt, + ) -> impl Future> + MaybeSend + 'a { let psbt = self.0.sign_psbt(psbt); - Box::pin(async move { psbt }) + async move { psbt } } } @@ -289,7 +298,7 @@ where /// Handles all variants of [`BumpTransactionEvent`]. pub fn handle_event(&self, event: &BumpTransactionEvent) { - let mut fut = Box::pin(self.bump_transaction_event_handler.handle_event(event)); + let mut fut = pin!(self.bump_transaction_event_handler.handle_event(event)); let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match fut.as_mut().poll(&mut ctx) { diff --git a/lightning/src/offers/offer.rs b/lightning/src/offers/offer.rs index cbea8e34d08..59589f4e72e 100644 --- a/lightning/src/offers/offer.rs +++ b/lightning/src/offers/offer.rs @@ -258,6 +258,7 @@ macro_rules! offer_explicit_metadata_builder_methods { /// Sets the [`Offer::metadata`] to the given bytes. /// /// Successive calls to this method will override the previous setting. + #[allow(unused_mut)] pub fn metadata( mut $self: $self_type, metadata: Vec, ) -> Result<$return_type, Bolt12SemanticError> { diff --git a/lightning/src/sign/mod.rs b/lightning/src/sign/mod.rs index 1d771d22783..6d0d5bf405a 100644 --- a/lightning/src/sign/mod.rs +++ b/lightning/src/sign/mod.rs @@ -58,7 +58,7 @@ use crate::ln::script::ShutdownScript; use crate::offers::invoice::UnsignedBolt12Invoice; use crate::types::features::ChannelTypeFeatures; use crate::types::payment::PaymentPreimage; -use crate::util::async_poll::AsyncResult; +use crate::util::async_poll::MaybeSend; use crate::util::ser::{ReadableArgs, Writeable}; use crate::util::transaction_utils; @@ -68,7 +68,9 @@ use crate::sign::ecdsa::EcdsaChannelSigner; #[cfg(taproot)] use crate::sign::taproot::TaprootChannelSigner; use crate::util::atomic_counter::AtomicCounter; + use core::convert::TryInto; +use core::future::Future; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; #[cfg(taproot)] @@ -1066,7 +1068,9 @@ pub trait ChangeDestinationSource { /// /// This method should return a different value each time it is called, to avoid linking /// on-chain funds controlled to the same user. - fn get_change_destination_script<'a>(&'a self) -> AsyncResult<'a, ScriptBuf, ()>; + fn get_change_destination_script<'a>( + &'a self, + ) -> impl Future> + MaybeSend + 'a; } /// A synchronous helper trait that describes an on-chain wallet capable of returning a (change) destination script. @@ -1101,9 +1105,11 @@ impl ChangeDestinationSource for ChangeDestinationSourceSyncWrapper where T::Target: ChangeDestinationSourceSync, { - fn get_change_destination_script<'a>(&'a self) -> AsyncResult<'a, ScriptBuf, ()> { + fn get_change_destination_script<'a>( + &'a self, + ) -> impl Future> + MaybeSend + 'a { let script = self.0.get_change_destination_script(); - Box::pin(async move { script }) + async move { script } } } diff --git a/lightning/src/sync/debug_sync.rs b/lightning/src/sync/debug_sync.rs index c8fb212a58b..4448a36a9b3 100644 --- a/lightning/src/sync/debug_sync.rs +++ b/lightning/src/sync/debug_sync.rs @@ -16,6 +16,7 @@ use parking_lot::Condvar as StdCondvar; use parking_lot::Mutex as StdMutex; use parking_lot::MutexGuard as StdMutexGuard; +#[cfg(feature = "std")] pub use parking_lot::WaitTimeoutResult; use crate::prelude::*; @@ -56,7 +57,6 @@ impl Condvar { Ok(MutexGuard { mutex, lock: Some(lock) }) } - #[allow(unused)] pub fn wait_timeout_while<'a, T, F: FnMut(&mut T) -> bool>( &'a self, guard: MutexGuard<'a, T>, dur: Duration, condition: F, ) -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> { diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs index eefa40d1055..9c2ca4c247f 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -9,7 +9,6 @@ //! Some utilities to make working with the standard library's [`Future`]s easier -use alloc::boxed::Box; use alloc::vec::Vec; use core::future::Future; use core::marker::Unpin; @@ -92,17 +91,6 @@ pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } } -#[cfg(feature = "std")] -/// A type alias for a future that returns a result of type `T` or error `E`. -/// -/// This is not exported to bindings users as async is only supported in Rust. -pub type AsyncResult<'a, T, E> = Pin> + 'a + Send>>; -#[cfg(not(feature = "std"))] -/// A type alias for a future that returns a result of type `T` or error `E`. -/// -/// This is not exported to bindings users as async is only supported in Rust. -pub type AsyncResult<'a, T, E> = Pin> + 'a>>; - /// Marker trait to optionally implement `Sync` under std. /// /// This is not exported to bindings users as async is only supported in Rust. diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index d00e29e686a..7feb781a57a 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -19,7 +19,7 @@ use bitcoin::{BlockHash, Txid}; use core::future::Future; use core::mem; use core::ops::Deref; -use core::pin::Pin; +use core::pin::{pin, Pin}; use core::str::FromStr; use core::task; @@ -34,7 +34,7 @@ use crate::chain::transaction::OutPoint; use crate::ln::types::ChannelId; use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; use crate::sync::Mutex; -use crate::util::async_poll::{dummy_waker, AsyncResult, MaybeSend, MaybeSync}; +use crate::util::async_poll::{dummy_waker, MaybeSend, MaybeSync}; use crate::util::logger::Logger; use crate::util::native_async::FutureSpawner; use crate::util::ser::{Readable, ReadableArgs, Writeable}; @@ -216,34 +216,34 @@ where { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> AsyncResult<'static, Vec, io::Error> { + ) -> impl Future, io::Error>> + 'static + MaybeSend { let res = self.0.read(primary_namespace, secondary_namespace, key); - Box::pin(async move { res }) + async move { res } } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> AsyncResult<'static, (), io::Error> { + ) -> impl Future> + 'static + MaybeSend { let res = self.0.write(primary_namespace, secondary_namespace, key, buf); - Box::pin(async move { res }) + async move { res } } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> AsyncResult<'static, (), io::Error> { + ) -> impl Future> + 'static + MaybeSend { let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy); - Box::pin(async move { res }) + async move { res } } fn list( &self, primary_namespace: &str, secondary_namespace: &str, - ) -> AsyncResult<'static, Vec, io::Error> { + ) -> impl Future, io::Error>> + 'static + MaybeSend { let res = self.0.list(primary_namespace, secondary_namespace); - Box::pin(async move { res }) + async move { res } } } @@ -283,16 +283,18 @@ pub trait KVStore { /// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> AsyncResult<'static, Vec, io::Error>; + ) -> impl Future, io::Error>> + 'static + MaybeSend; /// Persists the given data under the given `key`. /// - /// The order of multiple writes to the same key needs to be retained while persisting - /// asynchronously. In other words, if two writes to the same key occur, the state (as seen by - /// [`Self::read`]) must either see the first write then the second, or only ever the second, - /// no matter when the futures complete (and must always contain the second write once the - /// second future completes). The state should never contain the first write after the second - /// write's future completes, nor should it contain the second write, then contain the first - /// write at any point thereafter (even if the second write's future hasn't yet completed). + /// Note that this is *not* an `async fn`. Rather, the order of multiple writes to the same key + /// (as defined by the order of the synchronous function calls) needs to be retained while + /// persisting asynchronously. In other words, if two writes to the same key occur, the state + /// (as seen by [`Self::read`]) must either see the first write then the second, or only ever + /// the second, no matter when the futures complete (and must always contain the second write + /// once the second future completes). The state should never contain the first write after the + /// second write's future completes, nor should it contain the second write, then contain the + /// first write at any point thereafter (even if the second write's future hasn't yet + /// completed). /// /// One way to ensure this requirement is met is by assigning a version number to each write /// before returning the future, and then during asynchronous execution, ensuring that the @@ -303,7 +305,7 @@ pub trait KVStore { /// Will create the given `primary_namespace` and `secondary_namespace` if not already present in the store. fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> AsyncResult<'static, (), io::Error>; + ) -> impl Future> + 'static + MaybeSend; /// Removes any data that had previously been persisted under the given `key`. /// /// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily @@ -311,6 +313,10 @@ pub trait KVStore { /// eventual batch deletion of multiple keys. As a consequence, subsequent calls to /// [`KVStoreSync::list`] might include the removed key until the changes are actually persisted. /// + /// Note that similar to [`Self::write`] this is *not* an `async fn`, but rather a sync fn + /// which defines the order of writes to a given key, but which may complete its operation + /// asynchronously. + /// /// Note that while setting the `lazy` flag reduces the I/O burden of multiple subsequent /// `remove` calls, it also influences the atomicity guarantees as lazy `remove`s could /// potentially get lost on crash after the method returns. Therefore, this flag should only be @@ -321,12 +327,13 @@ pub trait KVStore { /// to the same key which occur before a removal completes must cancel/overwrite the pending /// removal. /// + /// /// Returns successfully if no data will be stored for the given `primary_namespace`, /// `secondary_namespace`, and `key`, independently of whether it was present before its /// invokation or not. fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> AsyncResult<'static, (), io::Error>; + ) -> impl Future> + 'static + MaybeSend; /// Returns a list of keys that are stored under the given `secondary_namespace` in /// `primary_namespace`. /// @@ -334,7 +341,7 @@ pub trait KVStore { /// returned keys. Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown. fn list( &self, primary_namespace: &str, secondary_namespace: &str, - ) -> AsyncResult<'static, Vec, io::Error>; + ) -> impl Future, io::Error>> + 'static + MaybeSend; } /// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`] @@ -490,8 +497,7 @@ impl FutureSpawner for PanicingSpawner { fn poll_sync_future(future: F) -> F::Output { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); - // TODO A future MSRV bump to 1.68 should allow for the pin macro - match Pin::new(&mut Box::pin(future)).poll(&mut ctx) { + match pin!(future).poll(&mut ctx) { task::Poll::Ready(result) => result, task::Poll::Pending => { // In a sync context, we can't wait for the future to complete. @@ -1006,6 +1012,9 @@ where } } +trait MaybeSendableFuture: Future> + MaybeSend {} +impl> + MaybeSend> MaybeSendableFuture for F {} + impl MonitorUpdatingPersisterAsyncInner where @@ -1179,9 +1188,9 @@ where Ok(()) } - fn persist_new_channel( - &self, monitor_name: MonitorName, monitor: &ChannelMonitor, - ) -> impl Future> { + fn persist_new_channel<'a, ChannelSigner: EcdsaChannelSigner>( + &'a self, monitor_name: MonitorName, monitor: &'a ChannelMonitor, + ) -> Pin> + 'static>> { // Determine the proper key for this monitor let monitor_key = monitor_name.to_string(); // Serialize and write the new monitor @@ -1200,7 +1209,10 @@ where // completion of the write. This ensures monitor persistence ordering is preserved. let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; - self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes) + // There's no real reason why this needs to be boxed, but dropping it rams into the "hidden + // type for impl... captures lifetime that does not appear in bounds" issue. This can + // trivially be dropped once we upgrade to edition 2024/MSRV 1.85. + Box::pin(self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes)) } fn update_persisted_channel<'a, ChannelSigner: EcdsaChannelSigner + 'a>( @@ -1226,12 +1238,10 @@ where // write method, allowing it to do its queueing immediately, and then return a // future for the completion of the write. This ensures monitor persistence // ordering is preserved. - res_a = Some(self.kv_store.write( - primary, - &monitor_key, - update_name.as_str(), - update.encode(), - )); + let encoded = update.encode(); + res_a = Some(async move { + self.kv_store.write(primary, &monitor_key, update_name.as_str(), encoded).await + }); } else { // We could write this update, but it meets criteria of our design that calls for a full monitor write. // Note that this is NOT an async function, but rather calls the *sync* KVStore diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index 5a1ffad3e04..bf048efdae1 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -35,10 +35,11 @@ use bitcoin::{BlockHash, ScriptBuf, Transaction, Txid}; use core::future::Future; use core::ops::Deref; +use core::pin::{pin, Pin}; use core::sync::atomic::{AtomicBool, Ordering}; use core::task; -use super::async_poll::{dummy_waker, AsyncResult}; +use super::async_poll::dummy_waker; /// The number of blocks we wait before we prune the tracked spendable outputs. pub const PRUNE_DELAY_BLOCKS: u32 = ARCHIVAL_DELAY_BLOCKS + ANTI_REORG_DELAY; @@ -609,15 +610,32 @@ where sweeper_state.dirty = true; } - fn persist_state<'a>(&self, sweeper_state: &SweeperState) -> AsyncResult<'a, (), io::Error> { + #[cfg(feature = "std")] + fn persist_state<'a>( + &'a self, sweeper_state: &SweeperState, + ) -> Pin> + Send + 'static>> { let encoded = sweeper_state.encode(); - self.kv_store.write( + Box::pin(self.kv_store.write( OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, encoded, - ) + )) + } + + #[cfg(not(feature = "std"))] + fn persist_state<'a>( + &'a self, sweeper_state: &SweeperState, + ) -> Pin> + 'static>> { + let encoded = sweeper_state.encode(); + + Box::pin(self.kv_store.write( + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_KEY, + encoded, + )) } /// Updates the sweeper state by executing the given callback. Persists the state afterwards if it is marked dirty, @@ -970,7 +988,7 @@ where &self, output_descriptors: Vec, channel_id: Option, exclude_static_outputs: bool, delay_until_height: Option, ) -> Result<(), ()> { - let mut fut = Box::pin(self.sweeper.track_spendable_outputs( + let mut fut = pin!(self.sweeper.track_spendable_outputs( output_descriptors, channel_id, exclude_static_outputs, @@ -1005,7 +1023,7 @@ where /// /// Wraps [`OutputSweeper::regenerate_and_broadcast_spend_if_necessary`]. pub fn regenerate_and_broadcast_spend_if_necessary(&self) -> Result<(), ()> { - let mut fut = Box::pin(self.sweeper.regenerate_and_broadcast_spend_if_necessary()); + let mut fut = pin!(self.sweeper.regenerate_and_broadcast_spend_if_necessary()); let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match fut.as_mut().poll(&mut ctx) { diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index ad8ea224205..b4db17bee20 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -50,7 +50,7 @@ use crate::sign::{self, ReceiveAuthKey}; use crate::sign::{ChannelSigner, PeerStorageKey}; use crate::sync::RwLock; use crate::types::features::{ChannelFeatures, InitFeatures, NodeFeatures}; -use crate::util::async_poll::AsyncResult; +use crate::util::async_poll::MaybeSend; use crate::util::config::UserConfig; use crate::util::dyn_signer::{ DynKeysInterface, DynKeysInterfaceTrait, DynPhantomKeysInterface, DynSigner, @@ -1012,13 +1012,13 @@ impl TestStore { impl KVStore for TestStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> AsyncResult<'static, Vec, io::Error> { + ) -> impl Future, io::Error>> + 'static + MaybeSend { let res = self.read_internal(&primary_namespace, &secondary_namespace, &key); - Box::pin(async move { res }) + async move { res } } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> AsyncResult<'static, (), io::Error> { + ) -> impl Future> + 'static + MaybeSend { let path = format!("{primary_namespace}/{secondary_namespace}/{key}"); let future = Arc::new(Mutex::new((None, None))); @@ -1027,19 +1027,19 @@ impl KVStore for TestStore { let new_id = pending_writes.last().map(|(id, _, _)| id + 1).unwrap_or(0); pending_writes.push((new_id, Arc::clone(&future), buf)); - Box::pin(OneShotChannel(future)) + OneShotChannel(future) } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> AsyncResult<'static, (), io::Error> { + ) -> impl Future> + 'static + MaybeSend { let res = self.remove_internal(&primary_namespace, &secondary_namespace, &key, lazy); - Box::pin(async move { res }) + async move { res } } fn list( &self, primary_namespace: &str, secondary_namespace: &str, - ) -> AsyncResult<'static, Vec, io::Error> { + ) -> impl Future, io::Error>> + 'static + MaybeSend { let res = self.list_internal(primary_namespace, secondary_namespace); - Box::pin(async move { res }) + async move { res } } }