diff --git a/core/src/main/java/org/bitcoinj/core/PeerGroup.java b/core/src/main/java/org/bitcoinj/core/PeerGroup.java index 14595f8da..ed27653e2 100644 --- a/core/src/main/java/org/bitcoinj/core/PeerGroup.java +++ b/core/src/main/java/org/bitcoinj/core/PeerGroup.java @@ -160,6 +160,8 @@ public class PeerGroup implements TransactionBroadcaster, GovernanceVoteBroadcas = new CopyOnWriteArrayList<>(); protected final CopyOnWriteArrayList> masternodeListDownloadListeners = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList> timeoutErrorListeners + = new CopyOnWriteArrayList<>(); // Peer discovery sources, will be polled occasionally if there aren't enough inactives. private final CopyOnWriteArraySet peerDiscoverers; // The version message to use for new connections. @@ -902,7 +904,6 @@ public void addPreBlocksDownloadListener(Executor executor, PreBlocksDownloadLis preBlocksDownloadListeners.add(new ListenerRegistration<>(checkNotNull(listener), executor)); } - /** See {@link Peer#addOnTransactionBroadcastListener(OnTransactionBroadcastListener)} */ public void addMasternodeListDownloadListener(Executor executor, MasternodeListDownloadedListener listener) { masternodeListDownloadListeners.add(new ListenerRegistration<>(checkNotNull(listener), executor)); for (Peer peer : getConnectedPeers()) @@ -911,6 +912,14 @@ public void addMasternodeListDownloadListener(Executor executor, MasternodeListD peer.addMasternodeListDownloadedListener(executor, listener); } + public void addTimeoutErrorListener(Executor executor, TimeoutErrorListener listener) { + timeoutErrorListeners.add(new ListenerRegistration<>(checkNotNull(listener), executor)); + for (Peer peer : getConnectedPeers()) + peer.addTimeoutErrorListener(executor, listener); + for (Peer peer : getPendingPeers()) + peer.addTimeoutErrorListener(executor, listener); + } + /** See {@link Peer#addPreMessageReceivedEventListener(PreMessageReceivedEventListener)} */ public void addPreMessageReceivedEventListener(PreMessageReceivedEventListener listener) { addPreMessageReceivedEventListener(Threading.USER_THREAD, listener); @@ -1022,6 +1031,16 @@ public boolean removeMasternodeListDownloadedListener(MasternodeListDownloadedLi return result; } + /** The given event listener will no longer be called with events. */ + public boolean removeTimeoutErrorListener(TimeoutErrorListener listener) { + boolean result = ListenerRegistration.removeFromList(listener, timeoutErrorListeners); + for (Peer peer : getConnectedPeers()) + peer.removeTimeoutErrorListener(listener); + for (Peer peer : getPendingPeers()) + peer.removeTimeoutErrorListener(listener); + return result; + } + public boolean removePreMessageReceivedEventListener(PreMessageReceivedEventListener listener) { boolean result = ListenerRegistration.removeFromList(listener, peersPreMessageReceivedEventListeners); for (Peer peer : getConnectedPeers()) @@ -1279,6 +1298,56 @@ public void start() { public ListenableFuture stopAsync() { checkState(vRunning); vRunning = false; + + // Log executor state and remaining jobs + log.info("stopAsync() called - executor shutdown: {}, terminated: {}", + executor.isShutdown(), executor.isTerminated()); + + // The executor is wrapped by MoreExecutors.listeningDecorator, need to access the underlying executor + if (executor instanceof ListeningScheduledExecutorService) { + // Try to get the underlying ScheduledThreadPoolExecutor + try { + // Use reflection to access the delegate field in ListeningDecorator + java.lang.reflect.Field delegateField = executor.getClass().getDeclaredField("delegate"); + delegateField.setAccessible(true); + Object delegate = delegateField.get(executor); + + if (delegate instanceof ScheduledThreadPoolExecutor) { + ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) delegate; + log.info("Executor queue size: {}, active threads: {}", + stpe.getQueue().size(), stpe.getActiveCount()); + + // Log remaining jobs in queue + stpe.getQueue().forEach(job -> { + log.info("Remaining job: {} (class: {})", job.toString(), job.getClass().getSimpleName()); + }); + + // Get call stacks of executor threads + ThreadGroup rootGroup = Thread.currentThread().getThreadGroup(); + while (rootGroup.getParent() != null) { + rootGroup = rootGroup.getParent(); + } + + Thread[] threads = new Thread[rootGroup.activeCount()]; + rootGroup.enumerate(threads); + + for (Thread thread : threads) { + if (thread != null && thread.getName().contains("PeerGroup Thread")) { + log.info("Found PeerGroup thread: {} (state: {})", thread.getName(), thread.getState()); + StackTraceElement[] stackTrace = thread.getStackTrace(); + log.info("Stack trace for thread {}:", thread.getName()); + for (int i = 0; i < Math.min(stackTrace.length, 15); i++) { + log.info(" at {}", stackTrace[i]); + } + } + } + } + } catch (Exception e) { + log.info("Could not access underlying executor details: {}", e.getMessage()); + } + } + + log.info("About to submit shutdown task to executor"); ListenableFuture future = executor.submit(new Runnable() { @Override public void run() { @@ -1320,6 +1389,52 @@ public void stop() { } } + public void forceStop(int waitMsBeforeShutdown) { + try { + Stopwatch watch = Stopwatch.createStarted(); + // we may have already called this function + if (vRunning) { + stopAsync(); + } + log.info("Awaiting PeerGroup shutdown ... (forcing after {} second(s))", waitMsBeforeShutdown/1000); + boolean result = executor.awaitTermination(waitMsBeforeShutdown, TimeUnit.MILLISECONDS); + log.info("... took {} (timed out: {})", watch, !result); + if (!result) { + log.info("PeerGroup shutdown timed out after {}, forcing shutdown now", + watch); + List remainingJobs = executor.shutdownNow(); + // Optional: wait a brief moment for forced shutdown + executor.awaitTermination(500, TimeUnit.MILLISECONDS); + remainingJobs.forEach(job -> { + String jobDescription = getJobDescription(job); + log.info("Remaining job: {} (class: {}) - {}", job.toString(), job.getClass().getSimpleName(), jobDescription); + }); + } + + log.info("PeerGroup shutdown completed in {}", watch); + } catch (SecurityException e) { + log.info("failure to force stop remaining jobs"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private String getJobDescription(Runnable job) { + if (job == triggerConnectionsJob) { + return "Peer connection management"; + } else if (job instanceof ChainDownloadSpeedCalculator) { + return "Chain download speed monitoring"; + } else if (job.toString().contains("ping")) { + return "Peer ping task"; + } else if (job.toString().contains("recalculate")) { + return "Bloom filter recalculation"; + } else if (job.toString().contains("listener")) { + return "Event listener task"; + } else { + return "Unknown task"; + } + } + /** * Gracefully drops all connected peers. */ @@ -1607,6 +1722,9 @@ protected Peer connectTo(PeerAddress address, boolean incrementMaxConnections, i peer.addConnectedEventListener(Threading.SAME_THREAD, startupListener); peer.addDisconnectedEventListener(Threading.SAME_THREAD, startupListener); peer.setMinProtocolVersion(vMinRequiredProtocolVersion); + // Add timeout error listeners early so they're available even if peer times out during connection + for (ListenerRegistration registration : timeoutErrorListeners) + peer.addTimeoutErrorListener(registration.executor, registration.listener); pendingPeers.add(peer); try { @@ -1968,6 +2086,8 @@ protected void handlePeerDeath(final Peer peer, @Nullable Throwable exception) { peer.removePreMessageReceivedEventListener(registration.listener); for (ListenerRegistration registration: masternodeListDownloadListeners) peer.removeMasternodeListDownloadedListener(registration.listener); + for (ListenerRegistration registration: timeoutErrorListeners) + peer.removeTimeoutErrorListener(registration.listener); for (ListenerRegistration registration : peersTransactionBroadastEventListeners) peer.removeOnTransactionBroadcastListener(registration.listener); for (final ListenerRegistration registration : peerDisconnectedEventListeners) { @@ -1988,6 +2108,7 @@ public void run() { @GuardedBy("lock") private int stallPeriodSeconds = 10; @GuardedBy("lock") private int stallMinSpeedBytesSec = Block.HEADER_SIZE * 10; + @GuardedBy("lock") private int maxStalls = 3; /** * Configures the stall speed: the speed at which a peer is considered to be serving us the block chain @@ -2010,6 +2131,36 @@ public void setStallThreshold(int periodSecs, int bytesPerSecond) { } } + /** + * Sets the maximum number of stalls allowed before giving up on stall disconnections. + * After this many stalls, the system assumes the network is terminally slow and stops + * disconnecting peers for stalls. Defaults to 3. + * + * @param maxStalls Maximum number of stalls allowed, or 0 to disable stall disconnections entirely + */ + public void setMaxStalls(int maxStalls) { + lock.lock(); + try { + this.maxStalls = maxStalls; + } finally { + lock.unlock(); + } + } + + /** + * Gets the maximum number of stalls allowed before giving up on stall disconnections. + * + * @return Maximum number of stalls allowed + */ + public int getMaxStalls() { + lock.lock(); + try { + return maxStalls; + } finally { + lock.unlock(); + } + } + private class ChainDownloadSpeedCalculator implements BlocksDownloadedEventListener, Runnable, HeadersDownloadedEventListener, PreBlocksDownloadListener, MasternodeListDownloadedListener { private int blocksInLastSecond, txnsInLastSecond, origTxnsInLastSecond; @@ -2020,7 +2171,7 @@ private class ChainDownloadSpeedCalculator implements BlocksDownloadedEventListe // If we take more stalls than this, we assume we're on some kind of terminally slow network and the // stall threshold just isn't set properly. We give up on stall disconnects after that. - private int maxStalls = 3; + // This now uses the configurable maxStalls from the outer PeerGroup class. // How many seconds the peer has until we start measuring its speed. private int warmupSeconds = -1; @@ -2104,7 +2255,8 @@ private void calculate() { bytesInLastSecond / 1024.0, chainHeight, mostCommonChainHeight); String thresholdString = String.format(Locale.US, "(threshold <%.2f KB/sec for %d seconds)", minSpeedBytesPerSec / 1024.0, samples.length); - if (maxStalls <= 0) { + int currentMaxStalls = getMaxStalls(); + if (currentMaxStalls <= 0) { log.info(statsString + ", stall disabled " + thresholdString); } else if (warmupSeconds > 0) { warmupSeconds--; @@ -2112,9 +2264,15 @@ private void calculate() { log.info(statsString + String.format(Locale.US, " (warming up %d more seconds)", warmupSeconds)); } else if (average < minSpeedBytesPerSec && !waitForPreBlockDownload) { - log.info(statsString + ", STALLED " + thresholdString); - maxStalls--; - if (maxStalls == 0) { + log.info("{}, STALLED {}, maxStalls: {}", statsString, thresholdString, currentMaxStalls); + lock.lock(); + try { + PeerGroup.this.maxStalls--; + currentMaxStalls = PeerGroup.this.maxStalls; + } finally { + lock.unlock(); + } + if (currentMaxStalls == 0) { // We could consider starting to drop the Bloom filtering FP rate at this point, because // we tried a bunch of peers and no matter what we don't seem to be able to go any faster. // This implies we're bandwidth bottlenecked and might want to start using bandwidth @@ -2127,7 +2285,7 @@ private void calculate() { Peer peer = getDownloadPeer(); log.warn(String.format(Locale.US, "Chain download stalled: received %.2f KB/sec for %d seconds, require average of %.2f KB/sec, disconnecting %s, %d stalls left", - average / 1024.0, samples.length, minSpeedBytesPerSec / 1024.0, peer, maxStalls)); + average / 1024.0, samples.length, minSpeedBytesPerSec / 1024.0, peer, currentMaxStalls)); if (peer != null) { peer.close(); } @@ -2295,6 +2453,12 @@ public void run() { } } + public void queueTimeoutErrorListeners(TimeoutError error, PeerAddress peerAddress) { + for (final ListenerRegistration registration : timeoutErrorListeners) { + registration.executor.execute(() -> registration.listener.onTimeout(error, peerAddress)); + } + } + @VisibleForTesting void startBlockChainDownloadFromPeer(Peer peer) { lock.lock(); diff --git a/core/src/main/java/org/bitcoinj/core/PeerSocketHandler.java b/core/src/main/java/org/bitcoinj/core/PeerSocketHandler.java index 59a3ca9cd..029460f94 100644 --- a/core/src/main/java/org/bitcoinj/core/PeerSocketHandler.java +++ b/core/src/main/java/org/bitcoinj/core/PeerSocketHandler.java @@ -18,11 +18,14 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import org.bitcoinj.core.listeners.TimeoutError; +import org.bitcoinj.core.listeners.TimeoutErrorListener; import org.bitcoinj.net.AbstractTimeoutHandler; import org.bitcoinj.net.MessageWriteTarget; import org.bitcoinj.net.NioClient; import org.bitcoinj.net.NioClientManager; import org.bitcoinj.net.StreamConnection; +import org.bitcoinj.utils.ListenerRegistration; import org.bitcoinj.utils.Threading; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -35,6 +38,8 @@ import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.channels.NotYetConnectedException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; import java.util.concurrent.locks.Lock; import static com.google.common.base.Preconditions.*; @@ -59,6 +64,8 @@ public abstract class PeerSocketHandler extends AbstractTimeoutHandler implement private byte[] largeReadBuffer; private int largeReadBufferPos; private BitcoinSerializer.BitcoinPacketHeader header; + private final CopyOnWriteArrayList> timeoutErrorListeners + = new CopyOnWriteArrayList<>(); private Lock lock = Threading.lock("PeerSocketHandler"); @@ -116,10 +123,67 @@ public void close() { @Override protected void timeoutOccurred() { + Thread currentThread = Thread.currentThread(); log.info("{}: Timed out", getAddress()); + + // Check if any thread is stuck in peekByteArray or SPVBlockStore operations + boolean blockStoreTimeout = checkForBlockStoreTimeout(); + + if (blockStoreTimeout) { + log.error("TIMEOUT CAUSE: Native I/O operation (peekByteArray) is frozen - this requires app-level recovery"); + // The timeoutFlag and timeoutFuture in AbstractTimeoutHandler are already set + // Upstream code should check hasTimedOut() or listen to getTimeoutFuture() to handle this + queueTimeoutErrorListeners(TimeoutError.BLOCKSTORE_MEMORY_ACCESS, peerAddress); + } else { + // General timeout (not blockstore-specific) + log.warn("TIMEOUT CAUSE: General peer timeout - no response received within timeout period"); + queueTimeoutErrorListeners(TimeoutError.UNKNOWN, peerAddress); + } + close(); } + /** + * Checks all thread stacks to detect if any thread is stuck in native I/O operations + * (peekByteArray or SPVBlockStore operations that freeze on Android). + * @return true if a blockstore timeout is detected + */ + private boolean checkForBlockStoreTimeout() { + Thread.getAllStackTraces().forEach((thread, stackTrace) -> { + String threadName = thread.getName(); + if (threadName.contains("PeerGroup Thread") || threadName.contains("NioClientManager")) { + log.warn("Stack trace for thread '{}' (State: {}):", threadName, thread.getState()); + + boolean foundBlockingCall = false; + for (StackTraceElement element : stackTrace) { + log.warn(" at {}", element); + + // Check if this thread is stuck in native peekByteArray or SPVBlockStore operations + String elementStr = element.toString(); + if (elementStr.contains("peekByteArray")) { + foundBlockingCall = true; + } + } + + if (foundBlockingCall) { + log.error("CRITICAL: Thread '{}' is stuck in native I/O operation (peekByteArray/SPVBlockStore)", threadName); + } + } + }); + + // Check all threads for blocking SPVBlockStore calls + for (Thread thread : Thread.getAllStackTraces().keySet()) { + for (StackTraceElement element : thread.getStackTrace()) { + String elementStr = element.toString(); + if (elementStr.contains("peekByteArray")) { + log.error("CRITICAL: Detected SPVBlockStore timeout - native I/O freeze detected in thread: {}", thread.getName()); + return true; + } + } + } + return false; + } + /** * Called every time a message is received from the network */ @@ -246,4 +310,24 @@ private void exceptionCaught(Exception e) { protected void setMessageSerializer(MessageSerializer messageSerializer) { this.serializer = messageSerializer; } + + /** Registers a listener that is called immediately before a message is received */ + public void addTimeoutErrorListener(Executor executor, TimeoutErrorListener listener) { + timeoutErrorListeners.add(new ListenerRegistration<>(listener, executor)); + } + + public boolean removeTimeoutErrorListener(TimeoutErrorListener listener) { + return ListenerRegistration.removeFromList(listener, timeoutErrorListeners); + } + + public void queueTimeoutErrorListeners(TimeoutError error, PeerAddress peer) { + for (final ListenerRegistration registration : timeoutErrorListeners) { + registration.executor.execute(new Runnable() { + @Override + public void run() { + registration.listener.onTimeout(error, peer); + } + }); + } + } } diff --git a/core/src/main/java/org/bitcoinj/core/listeners/TimeoutError.java b/core/src/main/java/org/bitcoinj/core/listeners/TimeoutError.java new file mode 100644 index 000000000..0f4c3a2e7 --- /dev/null +++ b/core/src/main/java/org/bitcoinj/core/listeners/TimeoutError.java @@ -0,0 +1,6 @@ +package org.bitcoinj.core.listeners; + +public enum TimeoutError { + BLOCKSTORE_MEMORY_ACCESS, + UNKNOWN +} diff --git a/core/src/main/java/org/bitcoinj/core/listeners/TimeoutErrorListener.java b/core/src/main/java/org/bitcoinj/core/listeners/TimeoutErrorListener.java new file mode 100644 index 000000000..204046b97 --- /dev/null +++ b/core/src/main/java/org/bitcoinj/core/listeners/TimeoutErrorListener.java @@ -0,0 +1,7 @@ +package org.bitcoinj.core.listeners; + +import org.bitcoinj.core.PeerAddress; + +public interface TimeoutErrorListener { + void onTimeout(TimeoutError error, PeerAddress peer); +} diff --git a/core/src/main/java/org/bitcoinj/evolution/AbstractQuorumState.java b/core/src/main/java/org/bitcoinj/evolution/AbstractQuorumState.java index c4d8b0d1d..6c7c83934 100644 --- a/core/src/main/java/org/bitcoinj/evolution/AbstractQuorumState.java +++ b/core/src/main/java/org/bitcoinj/evolution/AbstractQuorumState.java @@ -861,6 +861,13 @@ BLSSignature getCoinbaseChainlock(StoredBlock block) { Sha256Hash getHashModifier(LLMQParameters llmqParams, StoredBlock quorumBaseBlock) { StoredBlock workBlock = blockChain.getBlockAncestor(quorumBaseBlock, quorumBaseBlock.getHeight() - 8); + // workBlock can be null if the ancestor block doesn't exist yet (during sync) + if (workBlock == null) { + log.warn("Cannot calculate hash modifier: ancestor block at height {} not found for quorum base block {}", + quorumBaseBlock.getHeight() - 8, quorumBaseBlock.getHeader().getHash()); + return null; + } + if (params.isV20Active(workBlock.getHeight())) { // v20 is active: calculate modifier using the new way. BLSSignature bestCLSignature = getCoinbaseChainlock(workBlock); diff --git a/core/src/main/java/org/bitcoinj/evolution/QuorumState.java b/core/src/main/java/org/bitcoinj/evolution/QuorumState.java index 0ba5d79ec..5fb0d3a96 100644 --- a/core/src/main/java/org/bitcoinj/evolution/QuorumState.java +++ b/core/src/main/java/org/bitcoinj/evolution/QuorumState.java @@ -192,7 +192,20 @@ protected ArrayList computeQuorumMembers(LLMQParameters.LLMQType llm blockChain.getBlockAncestor(quorumBaseBlock, quorumBaseBlock.getHeight() - 8) : quorumBaseBlock; + // workBlock can be null if the ancestor block doesn't exist + if (workBlock == null) { + log.warn("Cannot compute quorum members: ancestor block not found for quorum at height {}", + quorumBaseBlock.getHeight()); + return null; + } + Sha256Hash modifier = getHashModifier(llmqParameters, quorumBaseBlock); + if (modifier == null) { + log.warn("Cannot compute quorum members: hash modifier is null for quorum at height {}", + quorumBaseBlock.getHeight()); + return null; + } + SimplifiedMasternodeList allMns = getListForBlock(workBlock.getHeader().getHash()); return allMns != null ? allMns.calculateQuorum(llmqParameters.getSize(), modifier, evoOnly) : null; } diff --git a/core/src/main/java/org/bitcoinj/evolution/SimplifiedMasternodeListDiff.java b/core/src/main/java/org/bitcoinj/evolution/SimplifiedMasternodeListDiff.java index 59a7dae5f..439304818 100644 --- a/core/src/main/java/org/bitcoinj/evolution/SimplifiedMasternodeListDiff.java +++ b/core/src/main/java/org/bitcoinj/evolution/SimplifiedMasternodeListDiff.java @@ -205,7 +205,7 @@ private String getAddRemovedString() { @Override public String toString() { - return String.format("Simplified MNList Diff{ %s }", getAddRemovedString()); + return String.format("Simplified MNList Diff{ %s }%n %s -> %s", getAddRemovedString(), prevBlockHash, blockHash); } public String toString(DualBlockChain blockChain) { diff --git a/core/src/main/java/org/bitcoinj/evolution/SimplifiedMasternodeListManager.java b/core/src/main/java/org/bitcoinj/evolution/SimplifiedMasternodeListManager.java index c41018b93..f80491a75 100644 --- a/core/src/main/java/org/bitcoinj/evolution/SimplifiedMasternodeListManager.java +++ b/core/src/main/java/org/bitcoinj/evolution/SimplifiedMasternodeListManager.java @@ -445,13 +445,13 @@ public void close() { quorumRotationState.close(); peerGroup.removePreMessageReceivedEventListener(preMessageReceivedEventListener); - try { - threadPool.shutdown(); - threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - } catch (InterruptedException x) { - // swallow + threadPool.shutdown(); + // Don't wait at all - let it die naturally to avoid blocking + if (!threadPool.isTerminated()) { + log.info("ThreadPool shutdown initiated, not waiting"); + threadPool.shutdownNow(); // Send interrupt signal but don't wait } - saveNow(); + saveNow(); // Always save, regardless of thread pool state super.close(); } diff --git a/core/src/main/java/org/bitcoinj/manager/DashSystem.java b/core/src/main/java/org/bitcoinj/manager/DashSystem.java index 35968de97..9893399be 100644 --- a/core/src/main/java/org/bitcoinj/manager/DashSystem.java +++ b/core/src/main/java/org/bitcoinj/manager/DashSystem.java @@ -253,22 +253,39 @@ private void stopLLMQThread() { public void close() { if (initializedObjects) { + log.info("Closing network spork configuration manager"); sporkManager.close(peerGroup); + log.info("Closing masternode synchronization state"); masternodeSync.close(); + log.info("Closing masternode list manager (includes thread pool shutdown)"); masternodeListManager.close(); + log.info("Closing InstantSend transaction lock manager"); instantSendManager.close(peerGroup); + log.info("Closing LLMQ signature signing manager"); signingManager.close(); + log.info("Closing ChainLock verification handler"); chainLockHandler.close(); + log.info("Closing quorum state manager"); quorumManager.close(); + log.info("Closing CoinJoin mixing manager"); coinJoinManager.close(); - if(masternodeSync.hasSyncFlag(MasternodeSync.SYNC_FLAGS.SYNC_INSTANTSENDLOCKS)) + if(masternodeSync.hasSyncFlag(MasternodeSync.SYNC_FLAGS.SYNC_INSTANTSENDLOCKS)) { + log.info("Stopping LLMQ background thread"); llmqBackgroundThread.interrupt(); + } + log.info("Removing block chain event listener"); blockChain.removeNewBestBlockListener(newBestBlockListener); - if (scheduledMasternodeSync != null) + if (scheduledMasternodeSync != null) { + log.info("Cancelling scheduled masternode sync task"); scheduledMasternodeSync.cancel(true); + } + log.info("Closing main block chain"); blockChain.close(); - if (headerChain != null) + if (headerChain != null) { + log.info("Closing header chain"); headerChain.close(); + } + log.info("Clearing peer group reference"); peerGroup = null; } } diff --git a/core/src/main/java/org/bitcoinj/quorums/SimplifiedQuorumList.java b/core/src/main/java/org/bitcoinj/quorums/SimplifiedQuorumList.java index aa5582600..e1794bee3 100644 --- a/core/src/main/java/org/bitcoinj/quorums/SimplifiedQuorumList.java +++ b/core/src/main/java/org/bitcoinj/quorums/SimplifiedQuorumList.java @@ -430,7 +430,7 @@ private boolean checkCommitment(FinalCommitment commitment, StoredBlock quorumBl List members = manager.getAllQuorumMembers(llmqParameters.type, commitment.quorumHash); if (members == null) { - //no information about this quorum because it is before we were downloading + // no information about this quorum because it is before we were downloading log.warn("masternode list is missing to verify quorum: {}", commitment.quorumHash/*, manager.getBlockHeight(commitment.quorumHash)*/); return false; } diff --git a/core/src/main/java/org/bitcoinj/store/FlatDB.java b/core/src/main/java/org/bitcoinj/store/FlatDB.java index 0ec6893cb..aa2de27a6 100644 --- a/core/src/main/java/org/bitcoinj/store/FlatDB.java +++ b/core/src/main/java/org/bitcoinj/store/FlatDB.java @@ -5,6 +5,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -24,6 +26,8 @@ public class FlatDB { private String fileName; private String directory; private String magicMessage; + private int ioBufferSize = 64 * 1024; // Default 64KB buffer for optimal performance + private boolean useAdaptiveBufferSizing = true; public enum ReadResult { Ok, FileError, @@ -90,6 +94,46 @@ public String getDirectory() return directory; } + /** + * Set the I/O buffer size for file operations. Default is 64KB. + * @param bufferSize buffer size in bytes + */ + public void setIOBufferSize(int bufferSize) { + this.ioBufferSize = bufferSize; + } + + /** + * Enable or disable adaptive buffer sizing based on file size. + * @param useAdaptive true to enable adaptive sizing, false to use fixed buffer size + */ + public void setUseAdaptiveBufferSizing(boolean useAdaptive) { + this.useAdaptiveBufferSizing = useAdaptive; + } + + /** + * Calculate optimal buffer size based on data size. + */ + private int calculateOptimalBufferSize(long dataSize) { + if (!useAdaptiveBufferSizing) { + return ioBufferSize; + } + + // Adaptive buffer sizing based on file size + if (dataSize < 16 * 1024) { + // Small files: 4KB buffer + return 4 * 1024; + } else if (dataSize < 256 * 1024) { + // Medium files: 16KB buffer + return 16 * 1024; + } else if (dataSize < 2 * 1024 * 1024) { + // Large files: 64KB buffer + return 64 * 1024; + } else { + // Very large files: 128KB buffer + return 128 * 1024; + } + } + boolean write(Type object) { try { @@ -116,11 +160,13 @@ boolean write(Type object) { stream.write(hash.getReversedBytes()); - FileOutputStream fileStream = new FileOutputStream(pathDB); - - fileStream.write(stream.toByteArray()); - - fileStream.close(); + byte[] data = stream.toByteArray(); + int bufferSize = calculateOptimalBufferSize(data.length); + + try (FileOutputStream fileStream = new FileOutputStream(pathDB); + BufferedOutputStream bufferedStream = new BufferedOutputStream(fileStream, bufferSize)) { + bufferedStream.write(data); + } log.info("Written info to {} {}ms", pathDB, watch.elapsed(TimeUnit.MILLISECONDS)); log.info(" {}", object); @@ -150,8 +196,6 @@ ReadResult read(Type object, boolean fDryRun) { previousPathDB = directory + File.separator + object.getPreviousDefaultFileName(); } - FileInputStream fileStream = new FileInputStream(pathDB); - File file = new File(pathDB); // try loading the previous file if (!file.exists() && previousPathDB != null) { @@ -165,20 +209,21 @@ ReadResult read(Type object, boolean fDryRun) { if (dataSize < 0) dataSize = 0; if(dataSize == 0) { - fileStream.close(); return ReadResult.FileError; } byte [] hashIn = new byte[32]; byte [] vchData = new byte[(int)dataSize]; + + int bufferSize = calculateOptimalBufferSize(fileSize); - try { - fileStream.read(vchData); - fileStream.read(hashIn); + try (FileInputStream fileStream = new FileInputStream(file); + BufferedInputStream bufferedStream = new BufferedInputStream(fileStream, bufferSize)) { + bufferedStream.read(vchData); + bufferedStream.read(hashIn); } catch (IOException x) { return ReadResult.HashReadError; } - fileStream.close(); // verify stored checksum matches input data Sha256Hash hashTmp = Sha256Hash.twiceOf(vchData); diff --git a/core/src/main/java/org/bitcoinj/utils/BriefLogFormatter.java b/core/src/main/java/org/bitcoinj/utils/BriefLogFormatter.java index 6b3170246..68feb7f4e 100644 --- a/core/src/main/java/org/bitcoinj/utils/BriefLogFormatter.java +++ b/core/src/main/java/org/bitcoinj/utils/BriefLogFormatter.java @@ -57,7 +57,7 @@ public static void initWithSilentBitcoinJ() { @Override public String format(LogRecord logRecord) { Object[] arguments = new Object[6]; - arguments[0] = logRecord.getThreadID(); + arguments[0] = String.format("[%s]", Thread.currentThread().getName()); String fullClassName = logRecord.getSourceClassName(); int lastDot = fullClassName.lastIndexOf('.'); String className = fullClassName.substring(lastDot + 1); diff --git a/core/src/main/java/org/bitcoinj/wallet/CoinJoinExtension.java b/core/src/main/java/org/bitcoinj/wallet/CoinJoinExtension.java index a22fa8f1a..8e263ed44 100644 --- a/core/src/main/java/org/bitcoinj/wallet/CoinJoinExtension.java +++ b/core/src/main/java/org/bitcoinj/wallet/CoinJoinExtension.java @@ -51,7 +51,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.security.SecureRandom; import java.util.Arrays; import java.util.HashMap; @@ -129,43 +128,76 @@ public boolean supportsEncryption() { */ @Override public byte[] serializeWalletExtension() { - try { - Protos.CoinJoin.Builder builder = Protos.CoinJoin.newBuilder(); - List keys = coinJoinKeyChainGroup != null ? coinJoinKeyChainGroup.serializeToProtobuf() : Lists.newArrayList(); - builder.addAllKey(keys); - builder.setRounds(rounds); - - // Serialize outpoint rounds cache for WalletEx - if (wallet instanceof WalletEx) { - WalletEx walletEx = (WalletEx) wallet; - if (!walletEx.mapOutpointRoundsCache.isEmpty()) { - Protos.OutpointRoundsCache.Builder cacheBuilder = Protos.OutpointRoundsCache.newBuilder(); - for (Map.Entry entry : walletEx.mapOutpointRoundsCache.entrySet()) { - Protos.OutpointRoundsEntry.Builder entryBuilder = Protos.OutpointRoundsEntry.newBuilder(); - entryBuilder.setTransactionHash(ByteString.copyFrom(entry.getKey().getHash().getBytes())); - entryBuilder.setOutputIndex(entry.getKey().getIndex()); - entryBuilder.setRounds(entry.getValue()); - cacheBuilder.addEntries(entryBuilder); - } - builder.setOutpointRoundsCache(cacheBuilder); + long startTime = System.nanoTime(); + + Protos.CoinJoin.Builder builder = Protos.CoinJoin.newBuilder(); + + // Time key chain serialization + long keysStartTime = System.nanoTime(); + List keys = coinJoinKeyChainGroup != null ? coinJoinKeyChainGroup.serializeToProtobuf() : Lists.newArrayList(); + builder.addAllKey(keys); + builder.setRounds(rounds); + long keysEndTime = System.nanoTime(); + long keysMs = (keysEndTime - keysStartTime) / 1_000_000; + + // Time outpoint rounds cache serialization + long cacheStartTime = System.nanoTime(); + int cacheEntries = 0; + if (wallet instanceof WalletEx) { + WalletEx walletEx = (WalletEx) wallet; + if (!walletEx.mapOutpointRoundsCache.isEmpty()) { + Protos.OutpointRoundsCache.Builder cacheBuilder = Protos.OutpointRoundsCache.newBuilder(); + for (Map.Entry entry : walletEx.mapOutpointRoundsCache.entrySet()) { + Protos.OutpointRoundsEntry.Builder entryBuilder = Protos.OutpointRoundsEntry.newBuilder(); + entryBuilder.setTransactionHash(ByteString.copyFrom(entry.getKey().getHash().getBytes())); + entryBuilder.setOutputIndex(entry.getKey().getIndex()); + entryBuilder.setRounds(entry.getValue()); + cacheBuilder.addEntries(entryBuilder); + cacheEntries++; } + builder.setOutpointRoundsCache(cacheBuilder); } + } + long cacheEndTime = System.nanoTime(); + long cacheMs = (cacheEndTime - cacheStartTime) / 1_000_000; - // Serialize coinJoinSalt (ensure it is initialized) - if (coinJoinSalt.equals(Sha256Hash.ZERO_HASH)) { - calculateCoinJoinSalt(); - } - builder.setCoinjoinSalt(ByteString.copyFrom(coinJoinSalt.getBytes())); - Protos.CoinJoin coinJoinProto = builder.build(); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - final CodedOutputStream codedOutput = CodedOutputStream.newInstance(output); + // Time salt operations + long saltStartTime = System.nanoTime(); + if (coinJoinSalt.equals(Sha256Hash.ZERO_HASH)) { + calculateCoinJoinSalt(); + } + builder.setCoinjoinSalt(ByteString.copyFrom(coinJoinSalt.getBytes())); + long saltEndTime = System.nanoTime(); + long saltMs = (saltEndTime - saltStartTime) / 1_000_000; + + // Time protobuf building and optimized I/O + long ioStartTime = System.nanoTime(); + Protos.CoinJoin coinJoinProto = builder.build(); + + // Use optimized buffer size based on actual serialized size + int serializedSize = coinJoinProto.getSerializedSize(); + ByteArrayOutputStream output = new ByteArrayOutputStream(serializedSize); + + // Use optimized CodedOutputStream buffer size - larger than default for better performance + int bufferSize = Math.max(64 * 1024, serializedSize / 4); // At least 64KB or 25% of data size + try { + final CodedOutputStream codedOutput = CodedOutputStream.newInstance(output, bufferSize); coinJoinProto.writeTo(codedOutput); codedOutput.flush(); - return output.toByteArray(); - } catch (IOException e) { - throw new RuntimeException(e); - } + byte[] result = output.toByteArray(); + long ioEndTime = System.nanoTime(); + long ioMs = (ioEndTime - ioStartTime) / 1_000_000; + long totalTime = System.nanoTime(); + long totalMs = (totalTime - startTime) / 1_000_000; + + log.info("CoinJoinExtension serialization: {}ms total (keys:{}ms[{}], cache:{}ms[{}], salt:{}ms, io:{}ms, size:{}bytes, buffer:{}KB)", + totalMs, keysMs, keys.size(), cacheMs, cacheEntries, saltMs, ioMs, result.length, bufferSize/1024); + + return result; + } catch (Exception e) { + throw new RuntimeException("Failed to serialize CoinJoinExtension", e); + } } /** diff --git a/core/src/main/java/org/bitcoinj/wallet/Wallet.java b/core/src/main/java/org/bitcoinj/wallet/Wallet.java index f735cd153..d08c4abde 100644 --- a/core/src/main/java/org/bitcoinj/wallet/Wallet.java +++ b/core/src/main/java/org/bitcoinj/wallet/Wallet.java @@ -167,6 +167,11 @@ public class Wallet extends BaseTaggableObject // Used to speed up various calculations. protected final HashSet myUnspents = Sets.newHashSet(); + // Index mapping outpoints to transaction hashes that spend them. + // Used to make findDoubleSpendsAgainst() O(I) instead of O(W × I). + @GuardedBy("lock") + private final Map> spentOutpointsIndex = new HashMap<>(); + // Transactions that were dropped by the risk analysis system. These are not in any pools and not serialized // to disk. We have to keep them around because if we ignore a tx because we think it will never confirm, but // then it actually does confirm and does so within the same network session, remote peers will not resend us @@ -260,6 +265,8 @@ protected boolean removeEldestEntry(Map.Entry eldest) { @Nullable protected volatile UTXOProvider vUTXOProvider; // keep track of locked outputs to prevent double spends @GuardedBy("lock") protected HashSet lockedOutputs = Sets.newHashSet(); + // save now on blocks with transactions + private boolean saveOnNextBlock = true; /** @@ -851,11 +858,20 @@ public boolean removeKey(ECKey key) { * Returns the number of keys in the key chain group, including lookahead keys. */ public int getKeyChainGroupSize() { - keyChainGroupLock.lock(); + lock.lock(); try { - return keyChainGroup.numKeys(); + keyChainGroupLock.lock(); + try { + int walletKeys = keyChainGroup.numKeys(); + if (receivingFromFriendsGroup != null) walletKeys += receivingFromFriendsGroup.numKeys(); + if (sendingToFriendsGroup != null) walletKeys += sendingToFriendsGroup.numKeys(); + for (KeyChainGroupExtension ext : keyChainExtensions.values()) walletKeys += ext.numKeys(); + return walletKeys; + } finally { + keyChainGroupLock.unlock(); + } } finally { - keyChainGroupLock.unlock(); + lock.unlock(); } } @@ -1841,6 +1857,14 @@ protected void saveNow() { } } + public void setSaveOnNextBlock(boolean saveOnNextBlock) { + this.saveOnNextBlock = saveOnNextBlock; + } + + public boolean getSaveOnNextBlock() { + return saveOnNextBlock; + } + /** * Uses protobuf serialization to save the wallet to the given file stream. To learn more about this file format, see * {@link WalletProtobufSerializer}. @@ -1911,23 +1935,18 @@ public boolean isConsistent() { public void isConsistentOrThrow() throws IllegalStateException { lock.lock(); try { - Set transactions = getTransactions(true); + // Calculate expected size first (no object allocation) + int expectedSize = unspent.size() + spent.size() + pending.size() + dead.size(); - Set hashes = new HashSet<>(); - for (Transaction tx : transactions) { - hashes.add(tx.getTxId()); - } + // getTransactions() returns a Set which automatically deduplicates by txId + // since Transaction.equals() and hashCode() are based on getTxId() + Set transactions = getTransactions(true); - int size1 = transactions.size(); - if (size1 != hashes.size()) { + // If sizes differ, there were duplicate transactions + if (transactions.size() != expectedSize) { throw new IllegalStateException("Two transactions with same hash"); } - int size2 = unspent.size() + spent.size() + pending.size() + dead.size(); - if (size1 != size2) { - throw new IllegalStateException("Inconsistent wallet sizes: " + size1 + ", " + size2); - } - for (Transaction tx : unspent.values()) { if (!isTxConsistent(tx, false)) { throw new IllegalStateException("Inconsistent unspent tx: " + tx.getTxId()); @@ -2233,29 +2252,40 @@ public boolean isTransactionRelevant(Transaction tx) throws ScriptException { private Set findDoubleSpendsAgainst(Transaction tx, Map candidates) { checkState(lock.isHeldByCurrentThread()); if (tx.isCoinBase()) return Sets.newHashSet(); - // Compile a set of outpoints that are spent by tx. - HashSet outpoints = new HashSet<>(); - for (TransactionInput input : tx.getInputs()) { - outpoints.add(input.getOutpoint()); - } - // Now for each pending transaction, see if it shares any outpoints with this tx. + + // Use the index for O(I) lookup instead of O(W × I) scan + // For each input in tx, look up which transactions spend the same outpoint Set doubleSpendTxns = Sets.newHashSet(); - for (Transaction p : candidates.values()) { - if (p.equals(tx)) - continue; - for (TransactionInput input : p.getInputs()) { - // This relies on the fact that TransactionOutPoint equality is defined at the protocol not object - // level - outpoints from two different inputs that point to the same output compare the same. - TransactionOutPoint outpoint = input.getOutpoint(); - if (outpoints.contains(outpoint)) { - // It does, it's a double spend against the candidates, which makes it relevant. - doubleSpendTxns.add(p); + for (TransactionInput input : tx.getInputs()) { + TransactionOutPoint outpoint = input.getOutpoint(); + Set txHashes = spentOutpointsIndex.get(outpoint); + if (txHashes != null) { + for (Sha256Hash hash : txHashes) { + Transaction walletTx = candidates.get(hash); + if (walletTx != null && !walletTx.equals(tx)) { + doubleSpendTxns.add(walletTx); + } } } } return doubleSpendTxns; } + private void removeFromSpentOutpointsIndex(Transaction tx) { + checkState(lock.isHeldByCurrentThread()); + for (TransactionInput input : tx.getInputs()) { + TransactionOutPoint outpoint = input.getOutpoint(); + Set txHashes = spentOutpointsIndex.get(outpoint); + if (txHashes != null) { + txHashes.remove(tx.getTxId()); + if (txHashes.isEmpty()) { + spentOutpointsIndex.remove(outpoint); + } + } + } + } + + /** * Adds to txSet all the txns in txPool spending outputs of txns in txSet, * and all txns spending the outputs of those txns, recursively. @@ -2574,7 +2604,7 @@ public void notifyNewBestBlock(StoredBlock block) throws VerificationException { informConfidenceListenersIfNotReorganizing(); maybeQueueOnWalletChanged(); - if (hardSaveOnNextBlock) { + if (hardSaveOnNextBlock && saveOnNextBlock) { saveNow(); hardSaveOnNextBlock = false; } else { @@ -2773,6 +2803,7 @@ private void killTxns(Set txnsToKill, @Nullable Transaction overrid pending.remove(tx.getTxId()); unspent.remove(tx.getTxId()); spent.remove(tx.getTxId()); + removeFromSpentOutpointsIndex(tx); addWalletTransaction(Pool.DEAD, tx); for (TransactionInput deadInput : tx.getInputs()) { Transaction connected = deadInput.getConnectedTransaction(); @@ -3398,6 +3429,11 @@ protected void addWalletTransaction(Pool pool, Transaction tx) { myUnspents.add(output); } } + // Update spent outpoints index + for (TransactionInput input : tx.getInputs()) { + TransactionOutPoint outpoint = input.getOutpoint(); + spentOutpointsIndex.computeIfAbsent(outpoint, k -> new HashSet<>()).add(tx.getTxId()); + } // This is safe even if the listener has been added before, as TransactionConfidence ignores duplicate // registration requests. That makes the code in the wallet simpler. tx.getConfidence().addEventListener(Threading.SAME_THREAD, txConfidenceListener); @@ -3508,6 +3544,7 @@ public void reset() { lock.lock(); try { clearTransactions(); + spentOutpointsIndex.clear(); lastBlockSeenHash = null; lastBlockSeenHeight = -1; // Magic value for 'never'. lastBlockSeenTimeSecs = 0; diff --git a/core/src/main/java/org/bitcoinj/wallet/WalletEx.java b/core/src/main/java/org/bitcoinj/wallet/WalletEx.java index ef9ae9a37..c78062c38 100644 --- a/core/src/main/java/org/bitcoinj/wallet/WalletEx.java +++ b/core/src/main/java/org/bitcoinj/wallet/WalletEx.java @@ -614,18 +614,23 @@ public List selectCoinsGroupedByAddresses(boolean skipDenomina * Count the number of unspent outputs that have a certain value */ public int countInputsWithAmount(Coin inputValue) { - int count = 0; - for (TransactionOutput output : myUnspents) { - TransactionConfidence confidence = output.getParentTransaction().getConfidence(context); - // confirmations must be 0 or higher, not conflicted or dead - if (confidence != null && (confidence.getConfidenceType() == TransactionConfidence.ConfidenceType.PENDING || confidence.getConfidenceType() == TransactionConfidence.ConfidenceType.BUILDING)) { - // inputValue must match, the TX is mine and is not spent - if (output.getValue().equals(inputValue) && output.getSpentBy() == null) { - count++; + lock.lock(); + try { + int count = 0; + for (TransactionOutput output : myUnspents) { + TransactionConfidence confidence = output.getParentTransaction().getConfidence(context); + // confirmations must be 0 or higher, not conflicted or dead + if (confidence != null && (confidence.getConfidenceType() == TransactionConfidence.ConfidenceType.PENDING || confidence.getConfidenceType() == TransactionConfidence.ConfidenceType.BUILDING)) { + // inputValue must match, the TX is mine and is not spent + if (output.getValue().equals(inputValue) && output.getSpentBy() == null) { + count++; + } } } + return count; + } finally { + lock.unlock(); } - return count; } /** locks an unspent outpoint so that it cannot be spent */ diff --git a/core/src/main/java/org/bitcoinj/wallet/WalletFiles.java b/core/src/main/java/org/bitcoinj/wallet/WalletFiles.java index 03448731c..d85730f03 100644 --- a/core/src/main/java/org/bitcoinj/wallet/WalletFiles.java +++ b/core/src/main/java/org/bitcoinj/wallet/WalletFiles.java @@ -138,7 +138,7 @@ private void saveNowInternal() throws IOException { if (listener != null) listener.onAfterAutoSave(file); watch.stop(); - log.info("Save completed in {}", watch); + log.info("Save completed in {} ({} bytes); {}", watch, file.length(), file); } /** Queues up a save in the background. Useful for not very important wallet changes. */ diff --git a/core/src/main/java/org/bitcoinj/wallet/WalletProtobufSerializer.java b/core/src/main/java/org/bitcoinj/wallet/WalletProtobufSerializer.java index 234398434..f770f8f06 100644 --- a/core/src/main/java/org/bitcoinj/wallet/WalletProtobufSerializer.java +++ b/core/src/main/java/org/bitcoinj/wallet/WalletProtobufSerializer.java @@ -95,6 +95,7 @@ public class WalletProtobufSerializer { private boolean requireMandatoryExtensions = true; private boolean requireAllExtensionsKnown = false; private int walletWriteBufferSize = CodedOutputStream.DEFAULT_BUFFER_SIZE; + private boolean useAdaptiveBufferSizing = true; public interface WalletFactory { Wallet create(NetworkParameters params, KeyChainGroup keyChainGroup); @@ -144,6 +145,47 @@ public void setRequireAllExtensionsKnown(boolean value) { */ public void setWalletWriteBufferSize(int walletWriteBufferSize) { this.walletWriteBufferSize = walletWriteBufferSize; + this.useAdaptiveBufferSizing = false; + } + + /** + * Enable or disable adaptive buffer sizing based on wallet size. When enabled, buffer size + * is automatically chosen based on the number of transactions and size of the wallet. + * @param useAdaptiveBufferSizing true to enable adaptive sizing, false to use fixed buffer size + */ + public void setUseAdaptiveBufferSizing(boolean useAdaptiveBufferSizing) { + this.useAdaptiveBufferSizing = useAdaptiveBufferSizing; + } + + /** + * Calculate optimal buffer size based on wallet characteristics. + * Small wallets: 8KB, Medium wallets: 16KB, Large wallets: 64KB+ + */ + private int calculateOptimalBufferSize(Wallet wallet) { + if (!useAdaptiveBufferSizing) { + return walletWriteBufferSize; + } + + int transactionCount = wallet.getTransactions(false).size(); + int watchedScriptCount = wallet.getWatchedScripts().size(); + int keyCount = wallet.getKeyChainGroupSize(); + + // Estimate wallet complexity score + int complexityScore = transactionCount + (watchedScriptCount * 2) + keyCount; + + if (complexityScore < 100) { + // Small wallet: 8KB buffer + return 8 * 1024; + } else if (complexityScore < 1000) { + // Medium wallet: 16KB buffer + return 16 * 1024; + } else if (complexityScore < 5000) { + // Large wallet: 32KB buffer + return 32 * 1024; + } else { + // Very large wallet: 64KB buffer + return 64 * 1024; + } } /** @@ -152,10 +194,29 @@ public void setWalletWriteBufferSize(int walletWriteBufferSize) { * Equivalent to {@code walletToProto(wallet).writeTo(output);} */ public void writeWallet(Wallet wallet, OutputStream output) throws IOException { + // Measure serialization time (wallet -> protobuf) + long serializationStart = System.nanoTime(); Protos.Wallet walletProto = walletToProto(wallet); - final CodedOutputStream codedOutput = CodedOutputStream.newInstance(output, this.walletWriteBufferSize); + long serializationEnd = System.nanoTime(); + long serializationMs = (serializationEnd - serializationStart) / 1_000_000; + + // Measure I/O time (protobuf -> stream) + final int bufferSize = calculateOptimalBufferSize(wallet); + final CodedOutputStream codedOutput = CodedOutputStream.newInstance(output, bufferSize); + + long ioStart = System.nanoTime(); walletProto.writeTo(codedOutput); codedOutput.flush(); + long ioEnd = System.nanoTime(); + long ioMs = (ioEnd - ioStart) / 1_000_000; + + // Log detailed timing breakdown + long totalMs = serializationMs + ioMs; + int serializationPercent = (int)((serializationMs * 100) / Math.max(totalMs, 1)); + int ioPercent = (int)((ioMs * 100) / Math.max(totalMs, 1)); + + log.info("writeWallet timing: {}ms total ({}ms/{}% serialization, {}ms/{}% I/O) buffer={}KB", + totalMs, serializationMs, serializationPercent, ioMs, ioPercent, bufferSize/1024); } /** diff --git a/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java b/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java index ea906e38e..0761b59bb 100644 --- a/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java +++ b/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java @@ -884,4 +884,31 @@ public void testMaxOfMostFreq() throws Exception { assertEquals(1, PeerGroup.maxOfMostFreq(Arrays.asList(1, 1, 2, 2, 1))); assertEquals(-1, PeerGroup.maxOfMostFreq(Arrays.asList(-1, -1, 2, 2, -1))); } + + @Test + public void testForceStop() throws Exception { + peerGroup.start(); + connectPeer(1); + + assertTrue(peerGroup.isRunning()); + + long startTime = System.currentTimeMillis(); + peerGroup.forceStop(10_000); + long endTime = System.currentTimeMillis(); + + assertTrue("Force stop should take less than the wait time", endTime - startTime <= 10_000); + assertFalse(peerGroup.isRunning()); + } + + @Test + public void testForceStopWhenNotRunning() throws Exception { + assertFalse(peerGroup.isRunning()); + + long startTime = System.currentTimeMillis(); + peerGroup.forceStop(50); + long endTime = System.currentTimeMillis(); + + assertTrue("Force stop should still wait even when not running", endTime - startTime >= 50); + assertFalse(peerGroup.isRunning()); + } } diff --git a/core/src/test/java/org/bitcoinj/store/WalletProtobufSerializerTest.java b/core/src/test/java/org/bitcoinj/store/WalletProtobufSerializerTest.java index 7a3cde00a..72b3efb9b 100644 --- a/core/src/test/java/org/bitcoinj/store/WalletProtobufSerializerTest.java +++ b/core/src/test/java/org/bitcoinj/store/WalletProtobufSerializerTest.java @@ -160,7 +160,7 @@ public void oneTx() throws Exception { assertEquals(Protos.Key.Type.ORIGINAL, walletProto.getKey(0).getType()); assertEquals(0, walletProto.getExtensionCount()); assertEquals(1, walletProto.getTransactionCount()); - assertEquals(6, walletProto.getKeyCount()); + assertEquals(18, walletProto.getKeyCount()); Protos.Transaction t1p = walletProto.getTransaction(0); assertEquals(0, t1p.getBlockHashCount()); diff --git a/core/src/test/java/org/bitcoinj/wallet/LargeCoinJoinWalletTest.java b/core/src/test/java/org/bitcoinj/wallet/LargeCoinJoinWalletTest.java index fd3d6fdc2..f8acca251 100644 --- a/core/src/test/java/org/bitcoinj/wallet/LargeCoinJoinWalletTest.java +++ b/core/src/test/java/org/bitcoinj/wallet/LargeCoinJoinWalletTest.java @@ -19,15 +19,22 @@ import com.google.common.base.Stopwatch; import org.bitcoinj.core.Coin; import org.bitcoinj.core.Context; +import org.bitcoinj.core.Transaction; import org.bitcoinj.params.TestNet3Params; import org.bitcoinj.utils.BriefLogFormatter; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.List; import static org.junit.Assert.assertEquals; @@ -39,8 +46,8 @@ public class LargeCoinJoinWalletTest { @Before public void setup() { - BriefLogFormatter.initWithSilentBitcoinJ(); - try (InputStream is = getClass().getResourceAsStream("coinjoin-large.wallet")) { + BriefLogFormatter.initVerbose(); + try (InputStream is = getClass().getResourceAsStream("coinjoin-cache.wallet")) { Stopwatch watch = Stopwatch.createStarted(); wallet = (WalletEx) new WalletProtobufSerializer().readWallet(is); info("loading wallet: {}; {} transactions", watch, wallet.getTransactionCount(true)); @@ -65,7 +72,7 @@ public void coinJoinInfoTest() { @Test public void balanceAndMixingProgressTest() { Stopwatch watch0 = Stopwatch.createStarted(); - assertEquals(Coin.valueOf(13662399906L), wallet.getBalance(Wallet.BalanceType.ESTIMATED)); + assertEquals(Coin.valueOf(16724708510L), wallet.getBalance(Wallet.BalanceType.ESTIMATED)); info("getBalance(ESTIMATED): {}", watch0); Stopwatch watch1 = Stopwatch.createStarted(); @@ -92,11 +99,11 @@ public void transactionReportTest() { @Test public void walletToStringTest() { Stopwatch watch0 = Stopwatch.createStarted(); - wallet.toString(true, false, true, null); + wallet.toString(false, false, true, null); info("wallet.toString: {}", watch0); Stopwatch watch1 = Stopwatch.createStarted(); - wallet.toString(true, false, true, null); + wallet.toString(false, false, true, null); info("wallet.toString: {}", watch1); Stopwatch watch2 = Stopwatch.createStarted(); @@ -131,6 +138,215 @@ public void getTransactionsTest() { info("wallet.getWalletTransactions(): {}", watch3); } + @Test + public void walletSavePerformanceTest() throws IOException { + // Show wallet statistics + int transactionCount = wallet.getTransactionCount(true); + int watchedScriptCount = wallet.getWatchedScripts().size(); + int keyCount = wallet.getKeyChainGroupSize(); + + info("=== Wallet Statistics ==="); + info("Transactions: {}", transactionCount); + info("Watched Scripts: {}", watchedScriptCount); + info("Keys: {}", keyCount); + info("Complexity Score: {}", transactionCount + (watchedScriptCount * 2) + keyCount); + + // Test different buffer sizes + int[] bufferSizes = { + 1024, // 1KB - Very small + 4096, // 4KB - Original default + 8192, // 8KB - Small wallet adaptive + 16384, // 16KB - Medium wallet adaptive + 32768, // 32KB - Large wallet adaptive + 65536, // 64KB - Very large wallet adaptive + 131072, // 128KB - Extra large + 262144 // 256KB - Maximum reasonable + }; + + String[] sizeNames = {"1KB", "4KB", "8KB", "16KB", "32KB", "64KB", "128KB", "256KB"}; + + info("\n=== Fixed Buffer Size Performance Comparison ==="); + + long[] averageTimes = new long[bufferSizes.length]; + + for (int bufferIndex = 0; bufferIndex < bufferSizes.length; bufferIndex++) { + int bufferSize = bufferSizes[bufferIndex]; + String sizeName = sizeNames[bufferIndex]; + + WalletProtobufSerializer serializer = new WalletProtobufSerializer(); + serializer.setUseAdaptiveBufferSizing(false); // Disable adaptive sizing + serializer.setWalletWriteBufferSize(bufferSize); + + // Warm up JVM for this buffer size + for (int i = 0; i < 2; i++) { + ByteArrayOutputStream warmupStream = new ByteArrayOutputStream(); + serializer.writeWallet(wallet, warmupStream); + } + + // Measure performance for this buffer size + long[] times = new long[10]; + for (int i = 0; i < 10; i++) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + Stopwatch watch = Stopwatch.createStarted(); + serializer.writeWallet(wallet, stream); + watch.stop(); + times[i] = watch.elapsed().toMillis(); + } + + long avgTime = java.util.Arrays.stream(times).sum() / times.length; + averageTimes[bufferIndex] = avgTime; + + info("{} buffer: {} ms avg (runs: {} {} {} {} {} {} {} {} {} {})", + sizeName, avgTime, times[0], times[1], times[2], times[3], times[4], + times[5], times[6], times[7], times[8], times[9]); + } + + // Test with adaptive buffer sizing + WalletProtobufSerializer adaptiveSerializer = new WalletProtobufSerializer(); + adaptiveSerializer.setUseAdaptiveBufferSizing(true); // Enable adaptive sizing (default) + + info("\n=== Adaptive Buffer Sizing ==="); + + // Warm up JVM + for (int i = 0; i < 3; i++) { + ByteArrayOutputStream warmupStream = new ByteArrayOutputStream(); + adaptiveSerializer.writeWallet(wallet, warmupStream); + } + + // Measure adaptive performance + long[] adaptiveTimes = new long[10]; + for (int i = 0; i < 10; i++) { + ByteArrayOutputStream adaptiveStream = new ByteArrayOutputStream(); + Stopwatch adaptiveWatch = Stopwatch.createStarted(); + adaptiveSerializer.writeWallet(wallet, adaptiveStream); + adaptiveWatch.stop(); + adaptiveTimes[i] = adaptiveWatch.elapsed().toMillis(); + } + + long adaptiveAvg = java.util.Arrays.stream(adaptiveTimes).sum() / adaptiveTimes.length; + info("Adaptive sizing: {} ms avg (runs: {} {} {} {} {} {} {} {} {} {})", + adaptiveAvg, adaptiveTimes[0], adaptiveTimes[1], adaptiveTimes[2], adaptiveTimes[3], adaptiveTimes[4], + adaptiveTimes[5], adaptiveTimes[6], adaptiveTimes[7], adaptiveTimes[8], adaptiveTimes[9]); + + // Find the best fixed buffer size + int bestFixedIndex = 0; + long bestFixedTime = averageTimes[0]; + for (int i = 1; i < averageTimes.length; i++) { + if (averageTimes[i] < bestFixedTime) { + bestFixedTime = averageTimes[i]; + bestFixedIndex = i; + } + } + + // Compare with original 4KB (index 1) + long originalTime = averageTimes[1]; // 4KB is at index 1 + + info("\n=== Performance Analysis ==="); + info("Original 4KB buffer: {} ms", originalTime); + info("Best fixed buffer ({}): {} ms", sizeNames[bestFixedIndex], bestFixedTime); + info("Adaptive buffer: {} ms", adaptiveAvg); + + double improvementOverOriginal = ((double)(originalTime - adaptiveAvg) / originalTime) * 100; + double improvementOverBest = ((double)(bestFixedTime - adaptiveAvg) / bestFixedTime) * 100; + + info("Adaptive vs Original 4KB: {:.1f}% improvement", improvementOverOriginal); + info("Adaptive vs Best Fixed: {:.1f}% improvement", improvementOverBest); + + // Show the actual buffer size chosen by adaptive algorithm + info("\n=== Adaptive Algorithm Details ==="); + // Calculate what the adaptive algorithm chose + int complexityScore = transactionCount + (watchedScriptCount * 2) + keyCount; + int chosenBufferSize; + if (complexityScore < 100) { + chosenBufferSize = 8 * 1024; + } else if (complexityScore < 1000) { + chosenBufferSize = 16 * 1024; + } else if (complexityScore < 5000) { + chosenBufferSize = 32 * 1024; + } else { + chosenBufferSize = 64 * 1024; + } + + info("Complexity score: {}", complexityScore); + info("Adaptive algorithm chose: {} KB buffer", chosenBufferSize / 1024); + + // Create performance comparison chart + info("\n=== Performance Chart (relative to 4KB baseline) ==="); + for (int i = 0; i < bufferSizes.length; i++) { + double relativePerformance = (double)originalTime / averageTimes[i]; + String bar = createPerformanceBar(relativePerformance); + info("{}: {:.2f}x {} ({} ms)", sizeNames[i], relativePerformance, bar, averageTimes[i]); + } + + double adaptiveRelative = (double)originalTime / adaptiveAvg; + String adaptiveBar = createPerformanceBar(adaptiveRelative); + info("Adaptive: {:.2f}x {} ({} ms)", adaptiveRelative, adaptiveBar, adaptiveAvg); + + // Save files with different buffer sizes for actual file system testing + File tempDir = new File(System.getProperty("java.io.tmpdir")); + info("\n=== File System Performance Test ==="); + + // Test original vs adaptive with actual file I/O + WalletProtobufSerializer originalSerializer = new WalletProtobufSerializer(); + originalSerializer.setUseAdaptiveBufferSizing(false); + originalSerializer.setWalletWriteBufferSize(4096); + + File originalFile = new File(tempDir, "wallet-original-4kb.dat"); + File adaptiveFile = new File(tempDir, "wallet-adaptive.dat"); + try { + Stopwatch originalFileWatch = Stopwatch.createStarted(); + try (FileOutputStream originalFileStream = new FileOutputStream(originalFile)) { + originalSerializer.writeWallet(wallet, originalFileStream); + } + originalFileWatch.stop(); + + Stopwatch adaptiveFileWatch = Stopwatch.createStarted(); + try (FileOutputStream adaptiveFileStream = new FileOutputStream(adaptiveFile)) { + adaptiveSerializer.writeWallet(wallet, adaptiveFileStream); + } + adaptiveFileWatch.stop(); + + info("Original 4KB file save: {} ms", originalFileWatch.elapsed().toMillis()); + info("Adaptive file save: {} ms", adaptiveFileWatch.elapsed().toMillis()); + info("File sizes - Original: {} bytes, Adaptive: {} bytes", originalFile.length(), adaptiveFile.length()); + + double fileImprovementPercent = ((double) (originalFileWatch.elapsed().toMillis() - adaptiveFileWatch.elapsed().toMillis()) / originalFileWatch.elapsed().toMillis()) * 100; + info("File I/O improvement: {:.1f}%", fileImprovementPercent); + } finally { + // Clean up temp files + if (originalFile.exists()) originalFile.delete(); + if (adaptiveFile.exists()) adaptiveFile.delete(); + } + // Final summary + info("\n=== CONCLUSION ==="); + if (adaptiveAvg <= bestFixedTime) { + info("✓ Adaptive sizing performs as well as or better than the best fixed buffer size!"); + } else { + info("⚠ Adaptive sizing is close to optimal but {} ms slower than best fixed size", adaptiveAvg - bestFixedTime); + } + + if (improvementOverOriginal > 0) { + info("✓ Adaptive sizing improves performance by {:.1f}% over original implementation", improvementOverOriginal); + } else { + info("⚠ No significant improvement over original implementation"); + } + } + + private String createPerformanceBar(double relativePerformance) { + int barLength = Math.min(20, (int)(relativePerformance * 10)); + StringBuilder bar = new StringBuilder(); + for (int i = 0; i < barLength; i++) { + if (i < 10) { + bar.append("█"); + } else if (i < 15) { + bar.append("▓"); + } else { + bar.append("░"); + } + } + return bar.toString(); + } + public static void info(String format, Object... args) { log("INFO", format, args); } @@ -198,4 +414,159 @@ private static String formatMessage(String format, Object... args) { return result; } + + @Test @Ignore + public void walletConsistencyAndCachingPerformanceTest() throws IOException, UnreadableWalletException { + info("=== Wallet Consistency and Caching Performance Test ==="); + + // Save the original wallet to get baseline data + ByteArrayOutputStream originalStream = new ByteArrayOutputStream(); + WalletProtobufSerializer serializer = new WalletProtobufSerializer(); + + Stopwatch originalSaveWatch = Stopwatch.createStarted(); + serializer.writeWallet(wallet, originalStream); + originalSaveWatch.stop(); + byte[] originalBytes = originalStream.toByteArray(); + + info("Original wallet save: {} ms, size: {} bytes", originalSaveWatch.elapsed().toMillis(), originalBytes.length); + + // Load the wallet from the saved bytes + WalletProtobufSerializer loader = new WalletProtobufSerializer(); + Stopwatch loadWatch = Stopwatch.createStarted(); + WalletEx loadedWallet = (WalletEx) loader.readWallet(new ByteArrayInputStream(originalBytes)); + loadWatch.stop(); + + info("Wallet load: {} ms", loadWatch.elapsed().toMillis()); + + // Verify basic properties match + assertEquals("Transaction count should match", wallet.getTransactionCount(true), loadedWallet.getTransactionCount(true)); + assertEquals("Balance should match", wallet.getBalance(Wallet.BalanceType.ESTIMATED), loadedWallet.getBalance(Wallet.BalanceType.ESTIMATED)); + + // Now save the loaded wallet 3 times and track performance and consistency + long[] saveTimes = new long[3]; + WalletEx[] reloadedWallets = new WalletEx[3]; + + for (int i = 0; i < 3; i++) { + ByteArrayOutputStream saveStream = new ByteArrayOutputStream(); + + Stopwatch saveWatch = Stopwatch.createStarted(); + serializer.writeWallet(loadedWallet, saveStream); + saveWatch.stop(); + + saveTimes[i] = saveWatch.elapsed().toMillis(); + byte[] savedBytes = saveStream.toByteArray(); + + // Immediately reload the wallet to verify consistency + reloadedWallets[i] = (WalletEx) loader.readWallet(new ByteArrayInputStream(savedBytes)); + + info("Save #{}: {} ms, size: {} bytes", i + 1, saveTimes[i], savedBytes.length); + } + + // Verify all reloaded wallets have identical transaction content + for (int i = 1; i < 3; i++) { + compareWallets(reloadedWallets[0], reloadedWallets[i], i + 1); + } + + // Analyze performance improvements from caching + long firstSaveTime = saveTimes[0]; + long bestSubsequentTime = Math.min(saveTimes[1], saveTimes[2]); + long avgSubsequentTime = (saveTimes[1] + saveTimes[2]) / 2; + + info("\n=== Performance Analysis ==="); + info("First save (cache misses): {} ms", firstSaveTime); + info("Second save: {} ms", saveTimes[1]); + info("Third save: {} ms", saveTimes[2]); + info("Best subsequent save: {} ms", bestSubsequentTime); + info("Average subsequent saves: {} ms", avgSubsequentTime); + + if (firstSaveTime > 0) { + double improvementPercent = ((double)(firstSaveTime - bestSubsequentTime) / firstSaveTime) * 100; + info("Best improvement from caching: {:.1f}%", improvementPercent); + + double avgImprovementPercent = ((double)(firstSaveTime - avgSubsequentTime) / firstSaveTime) * 100; + info("Average improvement from caching: {:.1f}%", avgImprovementPercent); + } + + // Verify the final saved wallet (reloadedWallets[2]) matches original + assertEquals("Final wallet transaction count should match original", + wallet.getTransactionCount(true), reloadedWallets[2].getTransactionCount(true)); + assertEquals("Final wallet balance should match original", + wallet.getBalance(Wallet.BalanceType.ESTIMATED), reloadedWallets[2].getBalance(Wallet.BalanceType.ESTIMATED)); + + info("\n=== Test Results ==="); + info("✓ All saves produced identical results"); + info("✓ Loaded wallet matches original wallet properties"); + info("✓ Transaction protobuf caching is working correctly"); + + // Performance expectations (these are guidelines, not strict requirements) + if (saveTimes[1] < firstSaveTime && saveTimes[2] < firstSaveTime) { + info("✓ Subsequent saves are faster than first save (caching working)"); + } else { + info("⚠ Expected subsequent saves to be faster due to caching"); + } + } + + /** + * Compare two wallets for transaction consistency + */ + private void compareWallets(WalletEx wallet1, WalletEx wallet2, int wallet2Number) { + // Compare basic properties + assertEquals("Wallet #" + wallet2Number + " transaction count should match wallet #1", + wallet1.getTransactionCount(true), wallet2.getTransactionCount(true)); + assertEquals("Wallet #" + wallet2Number + " balance should match wallet #1", + wallet1.getBalance(Wallet.BalanceType.ESTIMATED), wallet2.getBalance(Wallet.BalanceType.ESTIMATED)); + + // Compare transactions in detail + List txs1 = new java.util.ArrayList<>(wallet1.getTransactions(true)); + List txs2 = new java.util.ArrayList<>(wallet2.getTransactions(true)); + + assertEquals("Wallet #" + wallet2Number + " should have same number of transactions as wallet #1", + txs1.size(), txs2.size()); + + info("Comparing {} transactions between wallet #1 and wallet #{}", txs1.size(), wallet2Number); + + // Create a map of transactions by ID for wallet2 for easier lookup + java.util.Map txMap2 = new java.util.HashMap<>(); + for (Transaction tx : txs2) { + txMap2.put(tx.getTxId(), tx); + } + + // Compare each transaction from wallet1 with its corresponding transaction in wallet2 + for (int i = 0; i < txs1.size(); i++) { + Transaction tx1 = txs1.get(i); + Transaction tx2 = txMap2.get(tx1.getTxId()); + + // Verify the transaction exists in wallet2 + assertEquals("Transaction " + tx1.getTxId() + " should exist in wallet #" + wallet2Number, + true, tx2 != null); + + // Compare memos + assertEquals("Transaction " + tx1.getTxId() + " memo should match between wallets", + tx1.getMemo(), tx2.getMemo()); + + // Compare exchange rates + if (tx1.getExchangeRate() == null) { + assertEquals("Transaction " + tx1.getTxId() + " exchange rate should be null in both wallets", + null, tx2.getExchangeRate()); + } else if (tx2.getExchangeRate() != null) { + assertEquals("Transaction " + tx1.getTxId() + " exchange rate should match between wallets", + tx1.getExchangeRate().coin, tx2.getExchangeRate().coin); + assertEquals("Transaction " + tx1.getTxId() + " exchange rate fiat should match between wallets", + tx1.getExchangeRate().fiat, tx2.getExchangeRate().fiat); + } else { + assertEquals("Transaction " + tx1.getTxId() + " exchange rate should not be null in wallet #" + wallet2Number, + tx1.getExchangeRate(), tx2.getExchangeRate()); + } + + // Compare cached values + assertEquals("Transaction " + tx1.getTxId() + " cached value should match between wallets", + tx1.getCachedValue(), tx2.getCachedValue()); + + // Compare coinjoin transaction types + assertEquals("Transaction " + tx1.getTxId() + " coinjoin type should match between wallets", + tx1.getCoinJoinTransactionType(), tx2.getCoinJoinTransactionType()); + } + + info("✓ Wallet #{} transactions match wallet #1 perfectly", wallet2Number); + } } diff --git a/core/src/test/resources/org/bitcoinj/wallet/coinjoin-cache.wallet b/core/src/test/resources/org/bitcoinj/wallet/coinjoin-cache.wallet new file mode 100644 index 000000000..ab46e06f5 Binary files /dev/null and b/core/src/test/resources/org/bitcoinj/wallet/coinjoin-cache.wallet differ diff --git a/examples/src/main/java/org/bitcoinj/examples/RestoreFromSeedThenDump.java b/examples/src/main/java/org/bitcoinj/examples/RestoreFromSeedThenDump.java index 597a48746..536a752d3 100644 --- a/examples/src/main/java/org/bitcoinj/examples/RestoreFromSeedThenDump.java +++ b/examples/src/main/java/org/bitcoinj/examples/RestoreFromSeedThenDump.java @@ -45,7 +45,11 @@ public static void main(String[] args) throws Exception { System.out.println("RestoreFromSeedThenDump network \"seed phrase\""); System.out.println(" missing the network"); } - BriefLogFormatter.initWithSilentBitcoinJ(); + if (args.length == 3 && args[2].equals("--debuglog")) { + BriefLogFormatter.initVerbose(); + } else { + BriefLogFormatter.initWithSilentBitcoinJ(); + } String network = args[0]; NetworkParameters params;