Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a1e5cb4
fix: add maxStalls getters/setters in PeerGroup
HashEngineering Oct 29, 2025
c7cb6f8
fix: prevent concurrent modification in WalletEx.countInputsWithAmount
HashEngineering Oct 29, 2025
29bcd74
feat: add forceStop to PeerGroup
HashEngineering Oct 31, 2025
25b8bfc
fix: improve CoinJoinExtension serialization speed
HashEngineering Nov 7, 2025
16aa2c8
fix: improve FlatDB write performance
HashEngineering Nov 7, 2025
284447a
tests: add LargeCoinJoinWalletTests for saving wallet
HashEngineering Nov 7, 2025
dbc3cf2
fix: revert Peer changes for downloading headers, blocks
HashEngineering Nov 13, 2025
da1612b
feat: add TimeoutErrorListener to PeerGroup
HashEngineering Nov 18, 2025
7556925
fix: narrow the focus to peekByteArray for blockstore memory timeout
HashEngineering Nov 21, 2025
80f2a6a
fix: add handling if blocks cannot be found when verifying quorums
HashEngineering Nov 21, 2025
5c79026
fix: remove unnecessary logs
HashEngineering Nov 24, 2025
a7e30cf
tests: add coinjoin-cache.wallet file
HashEngineering Nov 25, 2025
272a076
tests: update LargeCoinJoinWalletTest to work with new coinjoin-cache…
HashEngineering Nov 25, 2025
d7d8b42
tests: update PeerGroupTest.testForceStop
HashEngineering Nov 25, 2025
12b105c
fix: make hardSaveOnNextBlock optional
HashEngineering Nov 25, 2025
ad7d588
tests: update key count with wallet
HashEngineering Nov 25, 2025
dd73c93
chore: remove comments in PeerGroup
HashEngineering Nov 25, 2025
c9cb4de
fix: update close in SimplifiedMasternodeListManager
HashEngineering Nov 25, 2025
f3726fa
tests: improve walletSavePerformanceTest
HashEngineering Nov 25, 2025
2ca30b2
fix: SimplifiedMasternodeListManager.close fix
HashEngineering Dec 4, 2025
dcf885f
fix: improve isConsistentOrThrow speed
HashEngineering Dec 4, 2025
5c23bb9
fix: improve speed of Wallet.findDoubleSpendsAgainst
HashEngineering Dec 4, 2025
8589b35
fix: add logging for DashSystem.close()
HashEngineering Dec 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 171 additions & 7 deletions core/src/main/java/org/bitcoinj/core/PeerGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ public class PeerGroup implements TransactionBroadcaster, GovernanceVoteBroadcas
= new CopyOnWriteArrayList<>();
protected final CopyOnWriteArrayList<ListenerRegistration<MasternodeListDownloadedListener>> masternodeListDownloadListeners
= new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<ListenerRegistration<TimeoutErrorListener>> timeoutErrorListeners
= new CopyOnWriteArrayList<>();
// Peer discovery sources, will be polled occasionally if there aren't enough inactives.
private final CopyOnWriteArraySet<PeerDiscovery> peerDiscoverers;
// The version message to use for new connections.
Expand Down Expand Up @@ -902,7 +904,6 @@ public void addPreBlocksDownloadListener(Executor executor, PreBlocksDownloadLis
preBlocksDownloadListeners.add(new ListenerRegistration<>(checkNotNull(listener), executor));
}

/** See {@link Peer#addOnTransactionBroadcastListener(OnTransactionBroadcastListener)} */
public void addMasternodeListDownloadListener(Executor executor, MasternodeListDownloadedListener listener) {
masternodeListDownloadListeners.add(new ListenerRegistration<>(checkNotNull(listener), executor));
for (Peer peer : getConnectedPeers())
Expand All @@ -911,6 +912,14 @@ public void addMasternodeListDownloadListener(Executor executor, MasternodeListD
peer.addMasternodeListDownloadedListener(executor, listener);
}

public void addTimeoutErrorListener(Executor executor, TimeoutErrorListener listener) {
timeoutErrorListeners.add(new ListenerRegistration<>(checkNotNull(listener), executor));
for (Peer peer : getConnectedPeers())
peer.addTimeoutErrorListener(executor, listener);
for (Peer peer : getPendingPeers())
peer.addTimeoutErrorListener(executor, listener);
}

/** See {@link Peer#addPreMessageReceivedEventListener(PreMessageReceivedEventListener)} */
public void addPreMessageReceivedEventListener(PreMessageReceivedEventListener listener) {
addPreMessageReceivedEventListener(Threading.USER_THREAD, listener);
Expand Down Expand Up @@ -1022,6 +1031,16 @@ public boolean removeMasternodeListDownloadedListener(MasternodeListDownloadedLi
return result;
}

/** The given event listener will no longer be called with events. */
public boolean removeTimeoutErrorListener(TimeoutErrorListener listener) {
boolean result = ListenerRegistration.removeFromList(listener, timeoutErrorListeners);
for (Peer peer : getConnectedPeers())
peer.removeTimeoutErrorListener(listener);
for (Peer peer : getPendingPeers())
peer.removeTimeoutErrorListener(listener);
return result;
}

public boolean removePreMessageReceivedEventListener(PreMessageReceivedEventListener listener) {
boolean result = ListenerRegistration.removeFromList(listener, peersPreMessageReceivedEventListeners);
for (Peer peer : getConnectedPeers())
Expand Down Expand Up @@ -1279,6 +1298,56 @@ public void start() {
public ListenableFuture stopAsync() {
checkState(vRunning);
vRunning = false;

// Log executor state and remaining jobs
log.info("stopAsync() called - executor shutdown: {}, terminated: {}",
executor.isShutdown(), executor.isTerminated());

// The executor is wrapped by MoreExecutors.listeningDecorator, need to access the underlying executor
if (executor instanceof ListeningScheduledExecutorService) {
// Try to get the underlying ScheduledThreadPoolExecutor
try {
// Use reflection to access the delegate field in ListeningDecorator
java.lang.reflect.Field delegateField = executor.getClass().getDeclaredField("delegate");
delegateField.setAccessible(true);
Object delegate = delegateField.get(executor);

if (delegate instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) delegate;
log.info("Executor queue size: {}, active threads: {}",
stpe.getQueue().size(), stpe.getActiveCount());

// Log remaining jobs in queue
stpe.getQueue().forEach(job -> {
log.info("Remaining job: {} (class: {})", job.toString(), job.getClass().getSimpleName());
});

// Get call stacks of executor threads
ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
while (rootGroup.getParent() != null) {
rootGroup = rootGroup.getParent();
}

Thread[] threads = new Thread[rootGroup.activeCount()];
rootGroup.enumerate(threads);

for (Thread thread : threads) {
if (thread != null && thread.getName().contains("PeerGroup Thread")) {
log.info("Found PeerGroup thread: {} (state: {})", thread.getName(), thread.getState());
StackTraceElement[] stackTrace = thread.getStackTrace();
log.info("Stack trace for thread {}:", thread.getName());
for (int i = 0; i < Math.min(stackTrace.length, 15); i++) {
log.info(" at {}", stackTrace[i]);
}
}
}
}
} catch (Exception e) {
log.info("Could not access underlying executor details: {}", e.getMessage());
}
}

log.info("About to submit shutdown task to executor");
ListenableFuture future = executor.submit(new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -1320,6 +1389,52 @@ public void stop() {
}
}

public void forceStop(int waitMsBeforeShutdown) {
try {
Stopwatch watch = Stopwatch.createStarted();
// we may have already called this function
if (vRunning) {
stopAsync();
}
log.info("Awaiting PeerGroup shutdown ... (forcing after {} second(s))", waitMsBeforeShutdown/1000);
boolean result = executor.awaitTermination(waitMsBeforeShutdown, TimeUnit.MILLISECONDS);
log.info("... took {} (timed out: {})", watch, !result);
if (!result) {
log.info("PeerGroup shutdown timed out after {}, forcing shutdown now",
watch);
List<Runnable> remainingJobs = executor.shutdownNow();
// Optional: wait a brief moment for forced shutdown
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
remainingJobs.forEach(job -> {
String jobDescription = getJobDescription(job);
log.info("Remaining job: {} (class: {}) - {}", job.toString(), job.getClass().getSimpleName(), jobDescription);
});
}

log.info("PeerGroup shutdown completed in {}", watch);
} catch (SecurityException e) {
log.info("failure to force stop remaining jobs");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private String getJobDescription(Runnable job) {
if (job == triggerConnectionsJob) {
return "Peer connection management";
} else if (job instanceof ChainDownloadSpeedCalculator) {
return "Chain download speed monitoring";
} else if (job.toString().contains("ping")) {
return "Peer ping task";
} else if (job.toString().contains("recalculate")) {
return "Bloom filter recalculation";
} else if (job.toString().contains("listener")) {
return "Event listener task";
} else {
return "Unknown task";
}
}

/**
* Gracefully drops all connected peers.
*/
Expand Down Expand Up @@ -1607,6 +1722,9 @@ protected Peer connectTo(PeerAddress address, boolean incrementMaxConnections, i
peer.addConnectedEventListener(Threading.SAME_THREAD, startupListener);
peer.addDisconnectedEventListener(Threading.SAME_THREAD, startupListener);
peer.setMinProtocolVersion(vMinRequiredProtocolVersion);
// Add timeout error listeners early so they're available even if peer times out during connection
for (ListenerRegistration<TimeoutErrorListener> registration : timeoutErrorListeners)
peer.addTimeoutErrorListener(registration.executor, registration.listener);
pendingPeers.add(peer);

try {
Expand Down Expand Up @@ -1968,6 +2086,8 @@ protected void handlePeerDeath(final Peer peer, @Nullable Throwable exception) {
peer.removePreMessageReceivedEventListener(registration.listener);
for (ListenerRegistration<MasternodeListDownloadedListener> registration: masternodeListDownloadListeners)
peer.removeMasternodeListDownloadedListener(registration.listener);
for (ListenerRegistration<TimeoutErrorListener> registration: timeoutErrorListeners)
peer.removeTimeoutErrorListener(registration.listener);
for (ListenerRegistration<OnTransactionBroadcastListener> registration : peersTransactionBroadastEventListeners)
peer.removeOnTransactionBroadcastListener(registration.listener);
for (final ListenerRegistration<PeerDisconnectedEventListener> registration : peerDisconnectedEventListeners) {
Expand All @@ -1988,6 +2108,7 @@ public void run() {

@GuardedBy("lock") private int stallPeriodSeconds = 10;
@GuardedBy("lock") private int stallMinSpeedBytesSec = Block.HEADER_SIZE * 10;
@GuardedBy("lock") private int maxStalls = 3;

/**
* Configures the stall speed: the speed at which a peer is considered to be serving us the block chain
Expand All @@ -2010,6 +2131,36 @@ public void setStallThreshold(int periodSecs, int bytesPerSecond) {
}
}

/**
* Sets the maximum number of stalls allowed before giving up on stall disconnections.
* After this many stalls, the system assumes the network is terminally slow and stops
* disconnecting peers for stalls. Defaults to 3.
*
* @param maxStalls Maximum number of stalls allowed, or 0 to disable stall disconnections entirely
*/
public void setMaxStalls(int maxStalls) {
lock.lock();
try {
this.maxStalls = maxStalls;
} finally {
lock.unlock();
}
}

/**
* Gets the maximum number of stalls allowed before giving up on stall disconnections.
*
* @return Maximum number of stalls allowed
*/
public int getMaxStalls() {
lock.lock();
try {
return maxStalls;
} finally {
lock.unlock();
}
}

private class ChainDownloadSpeedCalculator implements BlocksDownloadedEventListener, Runnable,
HeadersDownloadedEventListener, PreBlocksDownloadListener, MasternodeListDownloadedListener {
private int blocksInLastSecond, txnsInLastSecond, origTxnsInLastSecond;
Expand All @@ -2020,7 +2171,7 @@ private class ChainDownloadSpeedCalculator implements BlocksDownloadedEventListe

// If we take more stalls than this, we assume we're on some kind of terminally slow network and the
// stall threshold just isn't set properly. We give up on stall disconnects after that.
private int maxStalls = 3;
// This now uses the configurable maxStalls from the outer PeerGroup class.

// How many seconds the peer has until we start measuring its speed.
private int warmupSeconds = -1;
Expand Down Expand Up @@ -2104,17 +2255,24 @@ private void calculate() {
bytesInLastSecond / 1024.0, chainHeight, mostCommonChainHeight);
String thresholdString = String.format(Locale.US, "(threshold <%.2f KB/sec for %d seconds)",
minSpeedBytesPerSec / 1024.0, samples.length);
if (maxStalls <= 0) {
int currentMaxStalls = getMaxStalls();
if (currentMaxStalls <= 0) {
log.info(statsString + ", stall disabled " + thresholdString);
} else if (warmupSeconds > 0) {
warmupSeconds--;
if (bytesInLastSecond > 0)
log.info(statsString
+ String.format(Locale.US, " (warming up %d more seconds)", warmupSeconds));
} else if (average < minSpeedBytesPerSec && !waitForPreBlockDownload) {
log.info(statsString + ", STALLED " + thresholdString);
maxStalls--;
if (maxStalls == 0) {
log.info("{}, STALLED {}, maxStalls: {}", statsString, thresholdString, currentMaxStalls);
lock.lock();
try {
PeerGroup.this.maxStalls--;
currentMaxStalls = PeerGroup.this.maxStalls;
} finally {
lock.unlock();
}
if (currentMaxStalls == 0) {
// We could consider starting to drop the Bloom filtering FP rate at this point, because
// we tried a bunch of peers and no matter what we don't seem to be able to go any faster.
// This implies we're bandwidth bottlenecked and might want to start using bandwidth
Expand All @@ -2127,7 +2285,7 @@ private void calculate() {
Peer peer = getDownloadPeer();
log.warn(String.format(Locale.US,
"Chain download stalled: received %.2f KB/sec for %d seconds, require average of %.2f KB/sec, disconnecting %s, %d stalls left",
average / 1024.0, samples.length, minSpeedBytesPerSec / 1024.0, peer, maxStalls));
average / 1024.0, samples.length, minSpeedBytesPerSec / 1024.0, peer, currentMaxStalls));
if (peer != null) {
peer.close();
}
Expand Down Expand Up @@ -2295,6 +2453,12 @@ public void run() {
}
}

public void queueTimeoutErrorListeners(TimeoutError error, PeerAddress peerAddress) {
for (final ListenerRegistration<TimeoutErrorListener> registration : timeoutErrorListeners) {
registration.executor.execute(() -> registration.listener.onTimeout(error, peerAddress));
}
}

@VisibleForTesting
void startBlockChainDownloadFromPeer(Peer peer) {
lock.lock();
Expand Down
Loading
Loading