Skip to content

Commit 6ec40dc

Browse files
committed
refactoring
1 parent d513f18 commit 6ec40dc

2 files changed

Lines changed: 100 additions & 138 deletions

File tree

mutiny-core/src/nodemanager.rs

Lines changed: 6 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ use crate::ldkstorage::CHANNEL_CLOSURE_PREFIX;
33
use crate::logging::LOGGING_KEY;
44
use crate::lsp::voltage;
55
use crate::messagehandler::{CommonLnEvent, CommonLnEventCallback};
6-
use crate::onchain::RESTORE_SYNC_STOP_GAP;
76
use crate::peermanager::PeerManager;
8-
use crate::utils::now;
97
use crate::utils::sleep;
108
use crate::MutinyInvoice;
119
use crate::MutinyWalletConfig;
@@ -26,15 +24,12 @@ use crate::{
2624
use crate::{gossip::*, scorer::HubPreferentialScorer};
2725
use crate::{
2826
node::NodeBuilder,
29-
storage::{
30-
IndexItem, MutinyStorage, DEVICE_ID_KEY, KEYCHAIN_STORE_KEY, NEED_FULL_SYNC_KEY,
31-
ONCHAIN_PREFIX,
32-
},
27+
storage::{MutinyStorage, DEVICE_ID_KEY, KEYCHAIN_STORE_KEY, NEED_FULL_SYNC_KEY},
3328
};
3429
use anyhow::anyhow;
3530
use async_lock::RwLock;
3631
use bdk_chain::{BlockId, ConfirmationTime};
37-
use bdk_wallet::{ChangeSet, KeychainKind, LocalOutput};
32+
use bdk_wallet::{KeychainKind, LocalOutput};
3833
use bitcoin::address::NetworkUnchecked;
3934
use bitcoin::bip32::Xpriv;
4035
use bitcoin::blockdata::script;
@@ -69,8 +64,6 @@ use url::Url;
6964
#[cfg(target_arch = "wasm32")]
7065
use web_time::Instant;
7166

72-
const KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES: usize = 128 * 1024; // 128KB
73-
7467
// This is the NodeStorage object saved to the DB
7568
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
7669
pub struct NodeStorage {
@@ -655,8 +648,6 @@ impl<S: MutinyStorage> NodeManager<S> {
655648
utils::spawn(async move {
656649
let mut synced = false;
657650
loop {
658-
let mut did_keychain_compact_this_round = false;
659-
660651
// If we are stopped, don't sync
661652
if nm.stop.load(Ordering::Relaxed) {
662653
return;
@@ -699,135 +690,13 @@ impl<S: MutinyStorage> NodeManager<S> {
699690
}
700691

701692
// check keychain size
702-
let start = Instant::now();
703-
let changes = match nm.storage.read_changes() {
704-
Ok(Some(c)) => c,
705-
Ok(None) => ChangeSet::default(),
693+
let did_keychain_compact_this_round = match nm.wallet.try_compact_keychain().await {
694+
Ok(did_keychain_compact_this_round) => did_keychain_compact_this_round,
706695
Err(e) => {
707-
log_error!(
708-
nm.logger,
709-
"Compaction check: Failed to read changes: {:?}",
710-
e
711-
);
712-
ChangeSet::default()
696+
log_error!(nm.logger, "Failed to compact keychain: {e}");
697+
false
713698
}
714699
};
715-
let total_size = serde_json::to_vec(&changes).unwrap_or_default().len();
716-
log_info!(nm.logger, "Keychain size: {} bytes", total_size);
717-
if total_size > KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES {
718-
log_info!(
719-
nm.logger,
720-
"Keychain size threshold exceeded {} Bytes, spawning simplified compaction task.",
721-
KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES
722-
);
723-
724-
let local_chain_size = serde_json::to_vec(&changes.local_chain)
725-
.map(|v| v.len())
726-
.unwrap_or(0);
727-
let tx_graph_size = serde_json::to_vec(&changes.tx_graph)
728-
.map(|v| v.len())
729-
.unwrap_or(0);
730-
let indexer_size = serde_json::to_vec(&changes.indexer)
731-
.map(|v| v.len())
732-
.unwrap_or(0);
733-
log_debug!(
734-
nm.logger,
735-
"PRE-COMPACTION size: {} bytes. Approx component sizes (bytes): LocalChain={}, TxGraph={}, Indexer={}",
736-
total_size,
737-
local_chain_size,
738-
tx_graph_size,
739-
indexer_size
740-
);
741-
742-
if let Ok(mut new_wallet) = nm.wallet.new_wallet() {
743-
if let Ok(update) = OnChainWallet::<S>::full_scan(
744-
&new_wallet,
745-
RESTORE_SYNC_STOP_GAP,
746-
nm.esplora.clone(),
747-
)
748-
.await
749-
{
750-
let total_size = serde_json::to_vec(&changes).unwrap_or_default().len();
751-
let local_chain_size = serde_json::to_vec(&changes.local_chain)
752-
.map(|v| v.len())
753-
.unwrap_or(0);
754-
let tx_graph_size = serde_json::to_vec(&changes.tx_graph)
755-
.map(|v| v.len())
756-
.unwrap_or(0);
757-
let indexer_size = serde_json::to_vec(&changes.indexer)
758-
.map(|v| v.len())
759-
.unwrap_or(0);
760-
log_debug!(nm.logger,
761-
"POST-COMPACTION size: {} bytes. Approx component sizes (bytes): LocalChain={}, TxGraph={}, Indexer={}",
762-
total_size,
763-
local_chain_size,
764-
tx_graph_size,
765-
indexer_size
766-
);
767-
768-
did_keychain_compact_this_round = true;
769-
if new_wallet
770-
.apply_update_at(update, Some(now().as_secs()))
771-
.is_ok()
772-
{
773-
// Strategy: Try acquiring main lock once.
774-
// - Failure indicates contention. Abort compaction this cycle to ensure we don't overwrite
775-
// changes from the contending operation (unlike a retry-until-success approach which *would* overwrite).
776-
// - Success indicates no contention detected now; proceed with replace/overwrite.
777-
if let Ok(mut wallet) = nm.wallet.wallet.try_write() {
778-
if let Some(changeset) = new_wallet.take_staged() {
779-
if nm.storage.restore_changes(&changeset).is_ok() {
780-
*wallet = new_wallet;
781-
log_info!(
782-
nm.logger,
783-
"Keychain compaction completed successfully."
784-
);
785-
}
786-
}
787-
drop(wallet); // drop so we can read from wallet
788-
789-
// update the activity index, just get the list of transactions
790-
// and insert them into the index, this is done in background so shouldn't
791-
// block the wallet update
792-
if let Ok(txs) = nm.wallet.list_transactions(false) {
793-
let index_items = txs
794-
.into_iter()
795-
.map(|t| IndexItem {
796-
timestamp: match t.confirmation_time {
797-
ConfirmationTime::Confirmed {
798-
time, ..
799-
} => Some(time),
800-
ConfirmationTime::Unconfirmed { .. } => None,
801-
},
802-
key: format!("{ONCHAIN_PREFIX}{}", t.internal_id),
803-
})
804-
.collect::<Vec<_>>();
805-
806-
if let Ok(mut index) =
807-
nm.storage.activity_index().try_write()
808-
{
809-
// remove old-onchain txs
810-
index.retain(|i| !i.key.starts_with(ONCHAIN_PREFIX));
811-
index.extend(index_items);
812-
}
813-
}
814-
} else {
815-
log_warn!(
816-
nm.logger,
817-
"Compaction: Failed to acquire main wallet lock due to contention. Aborting compaction attempt for this cycle."
818-
);
819-
}
820-
} else {
821-
log_error!(nm.logger, "Keychain compaction failed to apply update");
822-
}
823-
}
824-
}
825-
}
826-
log_info!(
827-
nm.logger,
828-
"Keychain compaction took {} seconds",
829-
start.elapsed().as_secs()
830-
);
831700

832701
// wait for next sync round, checking graceful shutdown check each second.
833702
if !did_keychain_compact_this_round {

mutiny-core/src/onchain.rs

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use bdk_wallet::bitcoin::FeeRate;
1313
use bdk_wallet::psbt::PsbtUtils;
1414
use bdk_wallet::template::DescriptorTemplateOut;
1515
use bdk_wallet::{
16-
CreateParams, KeychainKind, LoadParams, LocalOutput, SignOptions, Update, Wallet,
16+
ChangeSet, CreateParams, KeychainKind, LoadParams, LocalOutput, SignOptions, Update, Wallet,
1717
};
1818
use bitcoin::bip32::{ChildNumber, DerivationPath, Xpriv};
1919
use bitcoin::consensus::serialize;
@@ -47,6 +47,7 @@ use web_time::Instant;
4747
pub(crate) const FULL_SYNC_STOP_GAP: usize = 150;
4848
pub(crate) const RESTORE_SYNC_STOP_GAP: usize = 50;
4949
const PARALLEL_REQUESTS: usize = 10;
50+
const KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES: usize = 128 * 1024; // 128KB
5051

5152
#[derive(Clone)]
5253
pub struct OnChainWallet<S: MutinyStorage> {
@@ -891,6 +892,98 @@ impl<S: MutinyStorage> OnChainWallet<S> {
891892

892893
Ok(update)
893894
}
895+
896+
pub async fn try_compact_keychain(&self) -> Result<bool, MutinyError> {
897+
let start = Instant::now();
898+
let mut did_keychain_full_scan = false;
899+
900+
let changes = self.storage.read_changes()?.unwrap_or_default();
901+
let total_size = serde_json::to_vec(&changes).unwrap_or_default().len();
902+
if total_size < KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES {
903+
log_info!(
904+
self.logger,
905+
"Keychain size {}is below threshold {}, not compacting",
906+
total_size,
907+
KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES
908+
);
909+
return Ok(did_keychain_full_scan);
910+
}
911+
log_info!(
912+
self.logger,
913+
"Keychain size threshold exceeded {} Bytes, spawning simplified compaction task.",
914+
KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES
915+
);
916+
self.log_keychain_size(&changes);
917+
918+
let mut new_wallet = self.new_wallet()?;
919+
let update =
920+
Self::full_scan(&new_wallet, RESTORE_SYNC_STOP_GAP, self.blockchain.clone()).await?;
921+
did_keychain_full_scan = true;
922+
923+
new_wallet
924+
.apply_update_at(update, Some(now().as_secs()))
925+
.map_err(|e| {
926+
log_error!(self.logger, "Could not apply wallet update: {e}");
927+
MutinyError::Other(anyhow!("Could not apply update: {e}"))
928+
})?;
929+
let mut wallet = self.wallet.try_write()?;
930+
let index = self.storage.activity_index();
931+
let mut index = index.try_write()?;
932+
let new_changeset = new_wallet.take_staged().ok_or(MutinyError::Other(anyhow!(
933+
"Failed to take staged changeset from new wallet"
934+
)))?;
935+
self.log_keychain_size(&new_changeset);
936+
self.storage.restore_changes(&new_changeset)?;
937+
*wallet = new_wallet;
938+
drop(wallet); // drop so we can read from wallet
939+
940+
// update the activity index, just get the list of transactions
941+
// and insert them into the index
942+
let index_items = self
943+
.list_transactions(false)?
944+
.into_iter()
945+
.map(|t| IndexItem {
946+
timestamp: match t.confirmation_time {
947+
ConfirmationTime::Confirmed { time, .. } => Some(time),
948+
ConfirmationTime::Unconfirmed { .. } => None,
949+
},
950+
key: format!("{ONCHAIN_PREFIX}{}", t.internal_id),
951+
})
952+
.collect::<Vec<_>>();
953+
954+
// remove old-onchain txs
955+
index.retain(|i| !i.key.starts_with(ONCHAIN_PREFIX));
956+
index.extend(index_items);
957+
958+
log_info!(self.logger, "Keychain compaction completed successfully.");
959+
log_info!(
960+
self.logger,
961+
"Keychain compaction took {} seconds",
962+
start.elapsed().as_secs()
963+
);
964+
965+
Ok(did_keychain_full_scan)
966+
}
967+
968+
fn log_keychain_size(&self, keychain: &ChangeSet) {
969+
let total_size = serde_json::to_vec(&keychain).unwrap_or_default().len();
970+
let local_chain_size = serde_json::to_vec(&keychain.local_chain)
971+
.map(|v| v.len())
972+
.unwrap_or(0);
973+
let tx_graph_size = serde_json::to_vec(&keychain.tx_graph)
974+
.map(|v| v.len())
975+
.unwrap_or(0);
976+
let indexer_size = serde_json::to_vec(&keychain.indexer)
977+
.map(|v| v.len())
978+
.unwrap_or(0);
979+
log_debug!(self.logger,
980+
"PRE-COMPACTION size: {} bytes. Approx component sizes (bytes): LocalChain={}, TxGraph={}, Indexer={}",
981+
total_size,
982+
local_chain_size,
983+
tx_graph_size,
984+
indexer_size
985+
);
986+
}
894987
}
895988

896989
fn get_tr_descriptors_for_extended_key(

0 commit comments

Comments
 (0)