blockchain, netsync: implement a complete headers-first download during ibd #2428
Conversation
Pull Request Test Coverage Report for Build 19373443826
💛 - Coveralls
Force-pushed 242fa3b to e6fadc9
cc: @gijswijs for review
cc: @mohamedawnallah for review
Roasbeef left a comment:
Great set of changes! I found the commit structure very easy to follow as well.
Have performed an initial review pass, but will start to run some active tests on one of my nodes to get a better feel for the changes.
Force-pushed e6fadc9 to 2f4c749
Addressed all the review comments by roasbeef
Tested in the wild, and does what it says on the tin:

```
2025-12-17 18:32:10.885 [INF] SYNC: Downloading headers for blocks 136001 to 2401568 from peer PEER_A:18333
2025-12-17 18:32:10.885 [INF] SYNC: Lost peer PEER_B:18333 (outbound)
2025-12-17 18:32:16.309 [INF] SYNC: New valid peer PEER_C:18333 (outbound) (/Satoshi:27.1.0/)
2025-12-17 18:32:40.885 [INF] SYNC: Downloading headers for blocks 136001 to 4807558 from peer PEER_D:18333
2025-12-17 18:32:40.885 [INF] SYNC: Lost peer PEER_A:18333 (outbound)
2025-12-17 18:32:41.044 [INF] SYNC: New valid peer PEER_E:18333 (outbound) (/Satoshi:28.1.0/)
2025-12-17 18:32:41.135 [INF] SYNC: New valid peer PEER_F:18333 (outbound) (/Satoshi:27.0.0/)
2025-12-17 18:32:46.438 [INF] CHAN: Verified checkpoint at height 200000/block 0000000000287bffd321963ef05feab753ebe274e1d78b2fd4e2bfe9ad3aa6f2
2025-12-17 18:32:51.245 [INF] SYNC: New valid peer PEER_G:18333 (outbound) (/Satoshi:28.1.0/)
2025-12-17 18:32:55.171 [INF] CHAN: Verified checkpoint at height 300001/block 0000000000004829474748f3d1bc8fcf893c88be255e6d7f571c548aff57abf4
2025-12-17 18:33:03.103 [INF] CHAN: Verified checkpoint at height 400002/block 0000000005e2c73b8ecb82ae2dbc2e8274614ebad7172b53528aba7501f5a089
2025-12-17 18:33:10.885 [INF] SYNC: Downloading headers for blocks 490001 to 4807558 from peer PEER_G:18333
2025-12-17 18:33:10.885 [INF] SYNC: Lost peer PEER_D:18333 (outbound)
2025-12-17 18:33:13.135 [INF] CHAN: Verified checkpoint at height 500011/block 00000000000929f63977fbac92ff570a9bd9e7715401ee96f2848f7b07750b02
2025-12-17 18:33:25.990 [INF] CHAN: Verified checkpoint at height 600002/block 000000000001f471389afd6ee94dcace5ccc44adc18e8bff402443f034b07240
2025-12-17 18:33:38.331 [INF] CHAN: Verified checkpoint at height 700000/block 000000000000406178b12a4dea3b27e13b3c4fe4510994fd667d7c1e6a3f4dc1
2025-12-17 18:33:40.884 [INF] SYNC: Downloading headers for blocks 718001 to 4807558 from peer PEER_E:18333
2025-12-17 18:33:40.888 [INF] SYNC: Lost peer PEER_G:18333 (outbound)
2025-12-17 18:33:46.254 [INF] SYNC: New valid peer PEER_H:18333 (outbound) (/Satoshi:25.1.0/)
2025-12-17 18:33:50.353 [INF] CHAN: Verified checkpoint at height 800010/block 000000000017ed35296433190b6829db01e657d80631d43f5983fa403bfdb4c1
2025-12-17 18:34:01.433 [INF] CHAN: Verified checkpoint at height 900000/block 0000000000356f8d8924556e765b7a94aaebc6b5c8685dcfa2b1ee8b41acd89b
2025-12-17 18:34:10.884 [INF] SYNC: Downloading headers for blocks 986001 to 4807558 from peer PEER_E:18333
2025-12-17 18:34:10.884 [INF] SYNC: Lost peer PEER_E:18333 (outbound)
2025-12-17 18:34:10.884 [INF] SYNC: Downloading headers for blocks 986001 to 4807558 from peer PEER_H:18333
2025-12-17 18:34:11.269 [INF] SYNC: New valid peer PEER_I:18333 (outbound) (/Satoshi:25.1.0/)
2025-12-17 18:34:11.270 [INF] SYNC: New valid peer PEER_J:18333 (outbound) (/Satoshi:0.20.1/)
2025-12-17 18:34:13.213 [INF] CHAN: Verified checkpoint at height 1000007/block 00000000001ccb893d8a1f25b70ad173ce955e5f50124261bbbc50379a612ddf
2025-12-17 18:34:16.312 [INF] SYNC: New valid peer PEER_K:18333 (outbound) (/Satoshi:27.0.0/)
2025-12-17 18:34:25.081 [INF] CHAN: Verified checkpoint at height 1100007/block 00000000000abc7b2cd18768ab3dee20857326a818d1946ed6796f42d66dd1e8
```

One thing I noticed though is that the peers may D/C us along the way, so we rotate through a few peers to fetch all the headers. This is testnet3, and there are quite a lot of headers (over 4 million). Are we hitting something like a default per-peer upload limit? IIUC this is a stepping stone for multi-peer header download, so perhaps that'll be sorted out once we fetch blocks of headers from many peers in parallel.
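For a rough sense of scale, a back-of-envelope sketch of the header download above. It assumes an 81-byte per-header wire size inside a headers message (the 80-byte header plus the trailing tx-count byte) and a 2000-header cap per response (btcd's `wire.MaxBlockHeadersPerMsg`); the ~4.8M header count is read off the log above.

```go
package main

import "fmt"

func main() {
	// Assumptions: 81 bytes per header on the wire in a headers
	// message, at most 2000 headers per getheaders response, and
	// roughly 4.8M testnet3 headers as seen in the log above.
	const (
		headerWireSize  = 81
		headersPerMsg   = 2000
		testnet3Headers = 4800000
	)

	// Ceiling division: each round trip yields one headers message.
	roundTrips := (testnet3Headers + headersPerMsg - 1) / headersPerMsg
	totalBytes := testnet3Headers * headerWireSize

	fmt.Printf("round trips: %d\n", roundTrips)
	fmt.Printf("approx download: %d MB\n", totalBytes/(1024*1024))
}
```

So a single sync peer serves on the order of 2,400 sequential round trips and a few hundred megabytes of headers, which makes a mid-sync disconnect (and the peer rotation seen above) unsurprising.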
```
	return headers
}

func TestProcessBlockHeader(t *testing.T) {
```
I guess it's better to have comments on the tests too, but test files are exempt from go documentation unless they're example functions.
I'll skip them in this round to focus on other parts of the review.
```
	filename := filepath.Join("testdata/", testFile)

	file, err := os.Open(filename)
	if err != nil {
```
I'll skip the require package changes so it's easier to diff with other parts of the code change from the outstanding reviews.
```
	for scanner.Scan() {
		line := scanner.Text()
		b, err := hex.DecodeString(line)
		if err != nil {
```
nit: use the require package, and other places where t.Fatal was used.

I'll skip the require package changes so it's easier to diff with other parts of the code change from the outstanding reviews.
```
		t.Fatal(err)
	}

	if !isMainChain {
```
yeah we can use require.True

I'll skip the require package changes so it's easier to diff with other parts of the code change from the outstanding reviews.
netsync/manager_test.go (Outdated)
```
			header.BlockHash())
	}
}
if err != nil {
```
I think this check is redundant.

Removed the redundant if branch in the latest push
```
		require.Equal(t, test.behaviorFlags, gotFlags)
	}
}
```
Same as above. I'll skip these in this round.
```
	return 0
}

func TestIsInIBDMode(t *testing.T) {
```
Same as above. Since godoc skips tests, I'll skip these for now.
blockchain/accept.go (Outdated)
```
	// Orphan headers are not allowed and this function should never be called
	// with the genesis block.
	prevHash := &header.PrevBlock
	prevNode := b.index.LookupNode(prevHash)
```
Seems like a duplicate prevNode lookup; we just did it above in the if node != nil block. Maybe we should consolidate it?

Simplified the logic and removed the lookup in the branching in the latest push
netsync/manager.go (Outdated)
```
	_, height := sm.chain.BestHeader()
	higherPeers := sm.fetchHigherPeers(height)
	var bestPeer *peerpkg.Peer
	switch {
```
Do we expect other cases in the future? Otherwise we can just use if.

We can just use if. Applied in the latest push
```
		return
	bestHash, bestHeight := sm.chain.BestHeader()
	if sm.ibdMode {
		if bestHeight < sm.syncPeer.LastBlock() {
```
sm.syncPeer.LastBlock() here has no nil guard. syncPeer can already be nil when we enter handleHeadersMsg (e.g., if a peer disconnect was processed first), so this is a reachable panic. A quick nil check before this call would make it safe.

Will add a nil guard
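A minimal sketch of the guard discussed above. The types here are stand-ins, not netsync's actual ones; the point is only the shape of the nil check before dereferencing the sync peer.

```go
package main

import "fmt"

// peer is a stand-in for peerpkg.Peer with just the method we need.
type peer struct{ lastBlock int32 }

func (p *peer) LastBlock() int32 { return p.lastBlock }

// syncManager is a stand-in for netsync's SyncManager.
type syncManager struct {
	syncPeer   *peer
	bestHeight int32
	ibdMode    bool
}

func (sm *syncManager) handleHeaders() {
	if sm.ibdMode {
		// The nil guard: a peer disconnect may already have
		// cleared syncPeer, so bail out instead of panicking.
		if sm.syncPeer == nil {
			fmt.Println("no sync peer; ignoring headers")
			return
		}
		if sm.bestHeight < sm.syncPeer.LastBlock() {
			fmt.Println("still behind; requesting more headers")
		}
	}
}

func main() {
	sm := &syncManager{ibdMode: true} // syncPeer is nil here
	sm.handleHeaders()                // safe: logs and returns

	sm.syncPeer = &peer{lastBlock: 100}
	sm.handleHeaders()
}
```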
```
	return sm, tearDown
}

func TestCheckHeadersList(t *testing.T) {
```
The new tests cover the helper predicates well, but the risky control flow in handleHeadersMsg, fetchHeaderBlocks, startSync, and stall handling isn't exercised. Those are where the trickiest behavior changes live, so it would be great to have at least a basic end-to-end test that drives headers through to block download, so regressions in the sync state machine get caught.

Addressed as well now in the latest push. Covered mainly by TestSyncStateMachine
Force-pushed abd3307 to 9339e78
Cool lmk when this is ready for another look!
We add a chainview of bestHeaders so that we'll be able to keep track of headers separately from the bestChain. This is needed as we're getting headers for new block announcements instead of invs.
Since we may now have blockNodes with just the block header stored without the data, we add a new status to account for this.
Distinguish between a block that has itself failed validation versus one that is only invalid due to an ancestor.
maybeAcceptHeader performs checks to accept block headers into the header chain. This function allows for a true headers-first download where we only accept block headers for downloading new blocks.
checkHeadersList takes a block hash and returns whether it's a checkpointed block, along with the correct behavior flags for the verification of the block.
fetchHigherPeers provides a convenient function to get peers that are sync candidates and are at a higher advertised height than the passed in height.
isInIBDMode returns whether the SyncManager needs to download blocks and sync to the latest chain tip. It determines if it's in IBD mode by checking if the blockchain thinks we're current and if we don't have peers that are at higher advertised blocks.
fetchHeaders picks a random peer at a higher advertised block and requests headers from them.
Instead of the old headerList based header processing, we make use of the new ProcessBlockHeader function.
between the best chain and the best header chain
We introduce buildBlockRequest, which creates a getdata message based off of the block index instead of the headerList in SyncManager. The new fetchHeaderBlocks utilizes buildBlockRequest and now will create fetch requests based on the processed block headers.
Refactor fetchHeaderBlocks and buildBlockRequest to take an explicit peer parameter instead of implicitly using sm.syncPeer. This makes the caller responsible for choosing which peer to fetch from and adds a nil guard to prevent a panic if the sync peer has been cleared.
This tests that after two nodes diverge and reconnect, the shorter node downloads blocks starting from the fork point rather than its current height. It verifies the reorg completes, both nodes converge, and the orphaned fork appears as a side chain via getchaintips.
Since we now utilize ProcessBlockHeaders, we change the startSync function to utilize the block index for downloading blocks/headers instead of using the headerList in SyncManager.
handleBlockMsg is updated to not be based off of the headerList and sm.nextCheckpoint when doing operations related to checkpoints and headers.
Since we no longer utilize the headerList for doing headers-first download and checkpoint tracking, we remove related code.
ibdMode is a more fitting name than headersFirstMode since all blocks are downloaded headers-first during the initial block download.
Tests the full IBD state machine: startSync → fetchHeaders → handleHeadersMsg → fetchHeaderBlocks → handleBlockMsg → IBD complete. Three sub-cases exercise different paths:
- fresh IBD from genesis
- node restart with partially synced blocks
- stalled sync peer recovery via handleStallSample
Verify that startSync skips header download and directly requests blocks when the header chain is already caught up to the peer's height but the block chain lags behind.
Verify that startSync does not set syncPeer or ibdMode when the chain tip is recent and no peer advertises a height above ours.
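The sync flow described in the commit messages above can be modeled as a tiny decision function: fetch headers while the header chain trails the peer, fetch blocks while the block chain trails the headers, and otherwise consider IBD complete. The state names and the `step` helper below are illustrative stand-ins, not the actual netsync types.

```go
package main

import "fmt"

// chainState captures the three heights the sync manager compares.
type chainState struct {
	headerHeight int32 // best known header
	blockHeight  int32 // best validated block
	peerHeight   int32 // height advertised by the sync peer
}

// step returns the next action in the simplified IBD flow:
// headers first, then blocks, then done.
func step(c chainState) string {
	switch {
	case c.headerHeight < c.peerHeight:
		return "fetchHeaders" // header chain is behind the peer
	case c.blockHeight < c.headerHeight:
		return "fetchHeaderBlocks" // headers done, blocks lag behind
	default:
		return "ibdComplete" // caught up; listen for new blocks
	}
}

func main() {
	// Fresh IBD: nothing synced yet, peer advertises height 300.
	c := chainState{headerHeight: 0, blockHeight: 0, peerHeight: 300}
	fmt.Println(step(c)) // fetchHeaders

	// Restart with headers already caught up but blocks lagging.
	c.headerHeight = 300
	fmt.Println(step(c)) // fetchHeaderBlocks

	// Blocks caught up to the best header.
	c.blockHeight = 300
	fmt.Println(step(c)) // ibdComplete
}
```

The middle case mirrors the "skips header download and directly requests blocks" scenario above; the last mirrors leaving ibdMode once the block tip reaches the best header.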
Force-pushed c0201b1 to 2aae8a6
yyforyongyu left a comment:
One more question then I think it's good to go!
```
			continue
		}

		if peer.LastBlock() <= height {
```
request blocks from the peer that gave us the inv for the block, not the syncPeer.

Correct, but we do have this check before we process the inv, at L1137:

```
if peer != sm.syncPeer && !sm.current() {
```

If syncPeer is nil, the inv processing will be aborted.
Force-pushed 2aae8a6 to f9aad9a
yyforyongyu left a comment:
LGTM🥇
The only comment I have is this one. It's non-blocking, as I believe it's extremely unlikely to happen on mainnet, though plausible on testnet or regtest. While I still consider it an edge case, I think it's good to flag it here for future reference.
```
@@ -24,8 +23,8 @@ import (

const (
	// minInFlightBlocks is the minimum number of blocks that should be
	// in the request queue for headers-first mode before requesting
	// more.
	// in the request queue for the initial block download mode before
	// requesting more.
	minInFlightBlocks = 10

	// maxRejectedTxns is the maximum number of rejected transactions
```
```
@@ -198,32 +197,13 @@ type SyncManager struct {
	peerStates       map[*peerpkg.Peer]*peerSyncState
	lastProgressTime time.Time

	// The following fields are used for headers-first mode.
	headersFirstMode bool
	headerList       *list.List
	startHeader      *list.Element
	nextCheckpoint   *chaincfg.Checkpoint
	// The following fields are used for the initial block download mode.
	ibdMode bool

	// An optional fee estimator.
	feeEstimator *mempool.FeeEstimator
}

// resetHeaderState sets the headers-first mode state to values appropriate for
// syncing from a new peer.
func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) {
	sm.headersFirstMode = false
	sm.headerList.Init()
	sm.startHeader = nil

	// When there is a next checkpoint, add an entry for the latest known
	// block into the header pool. This allows the next downloaded header
	// to prove it links to the chain properly.
	if sm.nextCheckpoint != nil {
		node := headerNode{height: newestHeight, hash: newestHash}
		sm.headerList.PushBack(&node)
	}
}

// findNextHeaderCheckpoint returns the next checkpoint after the passed height.
// It returns nil when there is not one either because the height is already
// later than the final checkpoint or some other reason such as disabled
@@ -252,70 +232,122 @@ func (sm *SyncManager) findNextHeaderCheckpoint
	return nextCheckpoint
}
```
```
// startSync will choose the best peer among the available candidate peers to
// download/sync the blockchain from. When syncing is already running, it
// simply returns. It also examines the candidates for any which are no longer
// candidates and removes them as needed.
func (sm *SyncManager) startSync() {
	// Return now if we're already syncing.
	if sm.syncPeer != nil {
		return
	}

	// Once the segwit soft-fork package has activated, we only
	// want to sync from peers which are witness enabled to ensure
	// that we fully validate all blockchain data.
	segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
	if err != nil {
		log.Errorf("Unable to query for segwit soft-fork state: %v", err)
		return
	}

	best := sm.chain.BestSnapshot()
	var higherPeers, equalPeers []*peerpkg.Peer

// fetchHigherPeers returns all the peers that are at a higher block than the
// given height. The peers that are not sync candidates are omitted from the
// returned list.
func (sm *SyncManager) fetchHigherPeers(height int32) []*peerpkg.Peer {
	higherPeers := make([]*peerpkg.Peer, 0, len(sm.peerStates))
	for peer, state := range sm.peerStates {
		if !state.syncCandidate {
			continue
		}

		if segwitActive && !peer.IsWitnessEnabled() {
			log.Debugf("peer %v not witness enabled, skipping", peer)
		if peer.LastBlock() <= height {
			continue
		}

		// Remove sync candidate peers that are no longer candidates due
		// to passing their latest known block. NOTE: The < is
		// intentional as opposed to <=. While technically the peer
		// doesn't have a later block when it's equal, it will likely
		// have one soon so it is a reasonable choice. It also allows
		// the case where both are at 0 such as during regression test.
		if peer.LastBlock() < best.Height {
			state.syncCandidate = false
			continue
		}
		higherPeers = append(higherPeers, peer)
	}

		// If the peer is at the same height as us, we'll add it a set
		// of backup peers in case we do not find one with a higher
		// height. If we are synced up with all of our peers, all of
		// them will be in this set.
		if peer.LastBlock() == best.Height {
			equalPeers = append(equalPeers, peer)
	return higherPeers
}

// fetchSameHeightPeers returns all sync-candidate peers that advertise the
// given block height.
func (sm *SyncManager) fetchSameHeightPeers(height int32) []*peerpkg.Peer {
	sameHeightPeers := make([]*peerpkg.Peer, 0, len(sm.peerStates))
	for peer, state := range sm.peerStates {
		if !state.syncCandidate {
			continue
		}

		// This peer has a height greater than our own, we'll consider
		// it in the set of better peers from which we'll randomly
		// select.
		higherPeers = append(higherPeers, peer)
		if peer.LastBlock() == height {
			sameHeightPeers = append(sameHeightPeers, peer)
		}
	}

	return sameHeightPeers
}

// isInIBDMode returns true if there's more blocks needed to be downloaded to
// catch up to the latest chain tip.
func (sm *SyncManager) isInIBDMode() bool {
	best := sm.chain.BestSnapshot()
	higherPeers := sm.fetchHigherPeers(best.Height)
	if sm.chain.IsCurrent() && len(higherPeers) == 0 {
		log.Infof("Caught up to block %s(%d)", best.Hash.String(), best.Height)
		return false
	}

	return true
}
```
```
// fetchHeaders randomly picks a peer that has a higher advertised header
// and pushes a get headers message to it.
func (sm *SyncManager) fetchHeaders() {
	_, height := sm.chain.BestHeader()
	higherPeers := sm.fetchHigherPeers(height)
	if len(higherPeers) == 0 {
		log.Warnf("No sync peer candidates available")
		return
	}
	bestPeer := higherPeers[rand.Intn(len(higherPeers))]

	// Pick randomly from the set of peers greater than our block height,
	// falling back to a random peer of the same height if none are greater.
	locator, err := sm.chain.LatestBlockLocatorByHeader()
	if err != nil {
		log.Errorf("Failed to get block locator for the "+
			"latest block header: %v", err)
		return
	}

	log.Infof("Downloading headers for blocks %d to "+
		"%d from peer %s", height+1,
		bestPeer.LastBlock(), bestPeer.Addr())

	bestPeer.PushGetHeadersMsg(locator, &zeroHash)

	sm.ibdMode = true
	sm.syncPeer = bestPeer
}

// startSync will choose the best peer among the available candidate peers to
// download/sync the blockchain from. When syncing is already running, it
// simply returns. It also examines the candidates for any which are no longer
// candidates and removes them as needed.
func (sm *SyncManager) startSync() {
	// Return now if we're already syncing.
	if sm.syncPeer != nil {
		return
	}

	// Check to see if we're in the initial block download mode.
	if !sm.isInIBDMode() {
		return
	}

	// If we're in the initial block download mode, check if we have
	// peers that we can download headers from.
	_, bestHeaderHeight := sm.chain.BestHeader()
	higherHeaderPeers := sm.fetchHigherPeers(bestHeaderHeight)
	if len(higherHeaderPeers) != 0 {
		sm.fetchHeaders()

		// Reset the last progress time now that we have a
		// non-nil syncPeer to avoid the stall handler firing
		// before any headers have been received.
		if sm.syncPeer != nil {
			sm.lastProgressTime = time.Now()
		}
		return
	}

	// We don't have any more headers to download at this
	// point so start asking for blocks.
	best := sm.chain.BestSnapshot()
	higherPeers := sm.fetchHigherPeers(best.Height)

	// Pick randomly from the set of peers greater than our
	// block height, falling back to a random peer of the same
	// height if none are greater.
	//
	// TODO(conner): Use a better algorithm to ranking peers based on
	// observed metrics and/or sync in parallel.
@@ -324,65 +356,33 @@ func (sm *SyncManager) startSync() {
```
```
	case len(higherPeers) > 0:
		bestPeer = higherPeers[rand.Intn(len(higherPeers))]

	case len(equalPeers) > 0:
		bestPeer = equalPeers[rand.Intn(len(equalPeers))]
	default:
		// Fall back to a random peer at the same block height.
		// This can happen when we've fetched all headers but
		// still need blocks, and all remaining peers advertise
		// the same height as our block tip.
		sameHeightPeers := sm.fetchSameHeightPeers(best.Height)
		if len(sameHeightPeers) > 0 {
			bestPeer = sameHeightPeers[rand.Intn(len(sameHeightPeers))]
		}
	}

	// Start syncing from the best peer if one was selected.
	if bestPeer != nil {
		// Clear the requestedBlocks if the sync peer changes, otherwise
		// we may ignore blocks we need that the last sync peer failed
		// to send.
		sm.requestedBlocks = make(map[chainhash.Hash]struct{})
	if bestPeer == nil {
		log.Warnf("No sync peer candidates available")
		return
	}

	locator, err := sm.chain.LatestBlockLocator()
	if err != nil {
		log.Errorf("Failed to get block locator for the "+
			"latest block: %v", err)
		return
	}
	sm.syncPeer = bestPeer
	sm.ibdMode = true

	log.Infof("Syncing to block height %d from peer %v",
		bestPeer.LastBlock(), bestPeer.Addr())

	// When the current height is less than a known checkpoint we
	// can use block headers to learn about which blocks comprise
	// the chain up to the checkpoint and perform less validation
	// for them. This is possible since each header contains the
	// hash of the previous header and a merkle root. Therefore if
	// we validate all of the received headers link together
	// properly and the checkpoint hashes match, we can be sure the
	// hashes for the blocks in between are accurate. Further, once
	// the full blocks are downloaded, the merkle root is computed
	// and compared against the value in the header which proves the
	// full block hasn't been tampered with.
	//
	// Once we have passed the final checkpoint, or checkpoints are
	// disabled, use standard inv messages learn about the blocks
	// and fully validate them. Finally, regression test mode does
	// not support the headers-first approach so do normal block
	// downloads when in regression test mode.
	if sm.nextCheckpoint != nil &&
		best.Height < sm.nextCheckpoint.Height &&
		sm.chainParams != &chaincfg.RegressionNetParams {

		bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
		sm.headersFirstMode = true
		log.Infof("Downloading headers for blocks %d to "+
			"%d from peer %s", best.Height+1,
			sm.nextCheckpoint.Height, bestPeer.Addr())
	} else {
		bestPeer.PushGetBlocksMsg(locator, &zeroHash)
	}
	sm.syncPeer = bestPeer
	// Reset the last progress time now that we have a non-nil
	// syncPeer to avoid instantly detecting it as stalled in the
	// event the progress time hasn't been updated recently.
	sm.lastProgressTime = time.Now()

	// Reset the last progress time now that we have a non-nil
	// syncPeer to avoid instantly detecting it as stalled in the
	// event the progress time hasn't been updated recently.
	sm.lastProgressTime = time.Now()
	} else {
		log.Warnf("No sync peer candidates available")
	}
	log.Infof("Syncing to block height %d from peer %v",
		sm.syncPeer.LastBlock(), sm.syncPeer.Addr())
	sm.fetchHeaderBlocks(sm.syncPeer)
}

// isSyncCandidate returns whether or not the peer is a candidate to consider
@@ -592,12 +592,6 @@ func (sm *SyncManager) updateSyncPeer(dcSyncPeer bool) {
		sm.syncPeer.Disconnect()
	}

	// Reset any header state before we choose our next active sync peer.
	if sm.headersFirstMode {
		best := sm.chain.BestSnapshot()
		sm.resetHeaderState(&best.Hash, best.Height)
	}

	sm.syncPeer = nil
	sm.startSync()
}
```
```
@@ -690,6 +684,47 @@ func (sm *SyncManager) current() bool {
	return true
}

// checkHeadersList checks if the sync manager is in the initial block download
// mode and returns if the given block hash is a checkpointed block and the
// behavior flags for this block. If the block is still under the checkpoint,
// then it's given the fast-add flag.
func (sm *SyncManager) checkHeadersList(blockHash *chainhash.Hash) (
	bool, blockchain.BehaviorFlags) {

	// Always return false and BFNone if we're not in ibd mode.
	if !sm.ibdMode {
		return false, blockchain.BFNone
	}

	isCheckpointBlock := false
	behaviorFlags := blockchain.BFNone

	// If we don't already know this is a valid header, return false and
	// BFNone.
	if !sm.chain.IsValidHeader(blockHash) {
		return false, blockchain.BFNone
	}

	height, err := sm.chain.HeaderHeightByHash(*blockHash)
	if err != nil {
		return false, blockchain.BFNone
	}

	// Since findNextHeaderCheckpoint returns the next checkpoint after the
	// passed height, we do a -1 to include the current block.
	checkpoint := sm.findNextHeaderCheckpoint(height - 1)
	if checkpoint == nil {
		return false, blockchain.BFNone
	}

	behaviorFlags |= blockchain.BFFastAdd
	if blockHash.IsEqual(checkpoint.Hash) {
		isCheckpointBlock = true
	}

	return isCheckpointBlock, behaviorFlags
}

// handleBlockMsg handles block messages from all peers.
func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
	peer := bmsg.peer
@@ -715,29 +750,10 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
		}
	}

	// When in headers-first mode, if the block matches the hash of the
	// first header in the list of headers that are being fetched, it's
	// eligible for less validation since the headers have already been
	// verified to link together and are valid up to the next checkpoint.
	// Also, remove the list entry for all blocks except the checkpoint
	// since it is needed to verify the next round of headers links
	// properly.
	isCheckpointBlock := false
	behaviorFlags := blockchain.BFNone
	if sm.headersFirstMode {
		firstNodeEl := sm.headerList.Front()
		if firstNodeEl != nil {
			firstNode := firstNodeEl.Value.(*headerNode)
			if blockHash.IsEqual(firstNode.hash) {
				behaviorFlags |= blockchain.BFFastAdd
				if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) {
					isCheckpointBlock = true
				} else {
					sm.headerList.Remove(firstNodeEl)
				}
			}
		}
	}
	// Check if the block is eligible for less validation since the headers
	// have already been verified to link together and are valid up to the
	// next checkpoint.
	isCheckpointBlock, behaviorFlags := sm.checkHeadersList(blockHash)

	// Remove block from request maps. Either chain will know about it and
	// so we shouldn't have any more instances of trying to fetch it, or we
```
| @@ -845,119 +861,135 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { | |||
| } | |||
| } | |||
|
|
|||
| // If we are not in headers first mode, it's a good time to periodically | |||
| // flush the blockchain cache because we don't expect new blocks immediately. | |||
| // After that, there is nothing more to do. | |||
| if !sm.headersFirstMode { | |||
| // If we are not in the initial block download mode, it's a good time to | |||
| // periodically flush the blockchain cache because we don't expect new | |||
| // blocks immediately. After that, there is nothing more to do. | |||
| if !sm.ibdMode { | |||
| if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil { | |||
| log.Errorf("Error while flushing the blockchain cache: %v", err) | |||
| } | |||
| return | |||
| } | |||
|
|
|||
| // This is headers-first mode, so if the block is not a checkpoint | |||
-	// request more blocks using the header list when the request queue is
-	// getting short.
-	if !isCheckpointBlock {
-		if sm.startHeader != nil &&
-			len(state.requestedBlocks) < minInFlightBlocks {
-			sm.fetchHeaderBlocks()
+	// If we're on a checkpointed block, check if we still have checkpoints
+	// to let the user know if we're switching to normal mode.
+	if isCheckpointBlock {
+		log.Infof("Continuing IBD, on checkpoint block %v(%v)",
+			bmsg.block.Hash(), bmsg.block.Height())
+		nextCheckpoint := sm.findNextHeaderCheckpoint(bmsg.block.Height())
+		if nextCheckpoint == nil {
+			log.Infof("Reached the final checkpoint -- " +
+				"switching to normal mode")
 		}
 		return
 	}

-	// This is headers-first mode and the block is a checkpoint. When
-	// there is a next checkpoint, get the next round of headers by asking
-	// for headers starting from the block after this one up to the next
-	// checkpoint.
-	prevHeight := sm.nextCheckpoint.Height
-	prevHash := sm.nextCheckpoint.Hash
-	sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight)
-	if sm.nextCheckpoint != nil {
-		locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash})
-		err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
-		if err != nil {
-			log.Warnf("Failed to send getheaders message to "+
-				"peer %s: %v", peer.Addr(), err)
-			return
-		}
-		log.Infof("Downloading headers for blocks %d to %d from "+
-			"peer %s", prevHeight+1, sm.nextCheckpoint.Height,
-			sm.syncPeer.Addr())
+	// Fetch more blocks if we're still not caught up to the best header and
+	// the number of in-flight blocks has dropped below the minimum threshold.
+	_, lastHeight := sm.chain.BestHeader()
+	if bmsg.block.Height() < lastHeight &&
+		len(state.requestedBlocks) < minInFlightBlocks {
+		sm.fetchHeaderBlocks(sm.syncPeer)
 		return
 	}

-	// This is headers-first mode, the block is a checkpoint, and there are
-	// no more checkpoints, so switch to normal mode by requesting blocks
-	// from the block after this one up to the end of the chain (zero hash).
-	sm.headersFirstMode = false
-	sm.headerList.Init()
-	log.Infof("Reached the final checkpoint -- switching to normal mode")
-	locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash})
-	err = peer.PushGetBlocksMsg(locator, &zeroHash)
-	if err != nil {
-		log.Warnf("Failed to send getblocks message to peer %s: %v",
-			peer.Addr(), err)
-		return
+	if bmsg.block.Height() >= lastHeight {
+		log.Infof("Finished the initial block download and "+
+			"caught up to block %v(%v) -- now listening to blocks.",
+			bmsg.block.Hash(), bmsg.block.Height())
+		sm.ibdMode = false
 	}
 }
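The `nextCheckpoint == nil` branch above is what flips the node out of checkpoint territory. A minimal sketch of that lookup, using illustrative types rather than the actual `chaincfg.Checkpoint` wiring:

```go
package main

import "fmt"

// checkpoint is an illustrative stand-in for a height/hash checkpoint pair.
type checkpoint struct {
	height int32
	hash   string
}

// findNextCheckpoint returns the first checkpoint strictly above the given
// height, or nil once the final checkpoint has been passed -- the
// "switching to normal mode" case logged above.
func findNextCheckpoint(cps []checkpoint, height int32) *checkpoint {
	for i := range cps {
		if cps[i].height > height {
			return &cps[i]
		}
	}
	return nil
}

func main() {
	cps := []checkpoint{{100, "a"}, {200, "b"}}
	fmt.Println(findNextCheckpoint(cps, 150).height)
	fmt.Println(findNextCheckpoint(cps, 200))
}
```

The sorted-slice walk assumes checkpoints are stored in ascending height order, which is how the hard-coded checkpoint tables are laid out.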
-// fetchHeaderBlocks creates and sends a request to the syncPeer for the next
+// fetchHeaderBlocks creates and sends a request to the given peer for the next
 // list of blocks to be downloaded based on the current list of headers.
-func (sm *SyncManager) fetchHeaderBlocks() {
-	// Nothing to do if there is no start header.
-	if sm.startHeader == nil {
-		log.Warnf("fetchHeaderBlocks called with no start header")
+func (sm *SyncManager) fetchHeaderBlocks(peer *peerpkg.Peer) {
+	if peer == nil {
+		log.Warnf("fetchHeaderBlocks called with a nil peer")
 		return
 	}

-	// Build up a getdata request for the list of blocks the headers
-	// describe. The size hint will be limited to wire.MaxInvPerMsg by
-	// the function, so no need to double check it here.
-	gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len()))
+	gdmsg := sm.buildBlockRequest(peer)
+	if len(gdmsg.InvList) > 0 {
+		peer.QueueMessage(gdmsg, nil)
+	}
+}
+// buildBlockRequest builds a getdata message for blocks that need to be
+// downloaded based on the current list of headers.
+//
+// Start fetching from the fork point between the best chain and
+// the best header chain rather than from the best chain height.
+// When the best header chain has diverged (e.g. due to a reorg),
+// blocks between the fork point and the current height on the new
+// chain are different and must also be downloaded.
+func (sm *SyncManager) buildBlockRequest(peer *peerpkg.Peer) *wire.MsgGetData {
+	// Return early if the peer is nil.
+	if peer == nil {
+		return wire.NewMsgGetDataSizeHint(0)
+	}
+
+	_, bestHeaderHeight := sm.chain.BestHeader()
+	forkHeight := sm.chain.BestChainHeaderForkHeight()
+	if bestHeaderHeight < forkHeight {
+		// Should never happen but we're guarding against the uint cast
+		// that happens below.
+		return wire.NewMsgGetDataSizeHint(0)
+	}
+	length := bestHeaderHeight - forkHeight
+	gdmsg := wire.NewMsgGetDataSizeHint(uint(length))
 	numRequested := 0
-	for e := sm.startHeader; e != nil; e = e.Next() {
-		node, ok := e.Value.(*headerNode)
-		if !ok {
-			log.Warn("Header list node type is not a headerNode")
-			continue
+	for h := forkHeight + 1; h <= bestHeaderHeight; h++ {
+		hash, err := sm.chain.HeaderHashByHeight(h)
+		if err != nil {
+			log.Warnf("error while fetching the block hash for height %v -- %v",
+				h, err)
+			return gdmsg
 		}

-		iv := wire.NewInvVect(wire.InvTypeBlock, node.hash)
+		iv := wire.NewInvVect(wire.InvTypeBlock, hash)
 		haveInv, err := sm.haveInventory(iv)
 		if err != nil {
 			log.Warnf("Unexpected failure when checking for "+
 				"existing inventory during header block "+
 				"fetch: %v", err)
 		}
 		if !haveInv {
-			syncPeerState := sm.peerStates[sm.syncPeer]
+			// Skip blocks that are already in-flight to avoid
+			// sending duplicate getdata requests. Duplicates
+			// cause the peer to send the block twice; the second
+			// copy arrives after the first has been processed and
+			// removed from requestedBlocks, triggering an
+			// "unrequested block" disconnect.
+			if _, exists := sm.requestedBlocks[*hash]; exists {
+				continue
+			}
+
+			peerState := sm.peerStates[peer]

-			sm.requestedBlocks[*node.hash] = struct{}{}
-			syncPeerState.requestedBlocks[*node.hash] = struct{}{}
+			sm.requestedBlocks[*hash] = struct{}{}
+			peerState.requestedBlocks[*hash] = struct{}{}

 			// If we're fetching from a witness enabled peer
 			// post-fork, then ensure that we receive all the
 			// witness data in the blocks.
-			if sm.syncPeer.IsWitnessEnabled() {
+			if peer.IsWitnessEnabled() {
 				iv.Type = wire.InvTypeWitnessBlock
 			}

 			gdmsg.AddInvVect(iv)
 			numRequested++
 		}
-		sm.startHeader = e.Next()

 		if numRequested >= wire.MaxInvPerMsg {
 			break
 		}
 	}
-	if len(gdmsg.InvList) > 0 {
-		sm.syncPeer.QueueMessage(gdmsg, nil)
-	}
+	return gdmsg
 }
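The fork-point iteration in `buildBlockRequest` reduces to a small height walk: everything past the fork point up to the best header is a candidate, blocks already in flight are skipped, and the batch is capped. A standalone sketch under simplified assumptions (plain heights stand in for block hashes, and `maxPerMsg` stands in for `wire.MaxInvPerMsg`):

```go
package main

import "fmt"

// buildRequestHeights mirrors the loop above with heights standing in for
// hashes: request every block past the fork point up to the best known
// header, skip blocks already in flight, and cap the batch size.
func buildRequestHeights(forkHeight, bestHeaderHeight int32,
	inFlight map[int32]bool, maxPerMsg int) []int32 {

	var req []int32
	for h := forkHeight + 1; h <= bestHeaderHeight; h++ {
		if inFlight[h] {
			continue
		}
		req = append(req, h)
		if len(req) >= maxPerMsg {
			break
		}
	}
	return req
}

func main() {
	// Fork at 100, headers known to 105, block 101 already requested.
	fmt.Println(buildRequestHeights(100, 105, map[int32]bool{101: true}, 3))
}
```

Starting from the fork point rather than the best-chain tip is what makes the reorg case work: blocks on the losing branch above the fork are re-requested on the new branch.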
 // handleHeadersMsg handles block header messages from all peers. Headers are
-// requested when performing a headers-first sync.
+// requested when performing the ibd and are propagated by peers once the ibd
+// is complete.
 func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
 	peer := hmsg.peer
 	_, exists := sm.peerStates[peer]
@@ -966,102 +998,53 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
 		return
 	}

-	// The remote peer is misbehaving if we didn't request headers.
+	// Nothing to do for an empty headers message.
 	msg := hmsg.headers
 	numHeaders := len(msg.Headers)
-	if !sm.headersFirstMode {
-		log.Warnf("Got %d unrequested headers from %s -- "+
-			"disconnecting", numHeaders, peer.Addr())
-		peer.Disconnect()
-		return
-	}

-	// Nothing to do for an empty headers message.
 	if numHeaders == 0 {
 		return
 	}

 	// Process all of the received headers ensuring each one connects to the
 	// previous and that checkpoints match.
-	receivedCheckpoint := false
-	var finalHash *chainhash.Hash
 	for _, blockHeader := range msg.Headers {
-		blockHash := blockHeader.BlockHash()
-		finalHash = &blockHash

-		// Ensure there is a previous header to compare against.
-		prevNodeEl := sm.headerList.Back()
-		if prevNodeEl == nil {
-			log.Warnf("Header list does not contain a previous" +
-				"element as expected -- disconnecting peer")
+		_, err := sm.chain.ProcessBlockHeader(
+			blockHeader, blockchain.BFNone, false,
+		)
+		if err != nil {
+			log.Warnf("Received block header from peer %v "+
+				"failed header verification -- disconnecting",
+				peer.Addr())
 			peer.Disconnect()
 			return
 		}

-		// Ensure the header properly connects to the previous one and
-		// add it to the list of headers.
-		node := headerNode{hash: &blockHash}
-		prevNode := prevNodeEl.Value.(*headerNode)
-		if prevNode.hash.IsEqual(&blockHeader.PrevBlock) {
-			node.height = prevNode.height + 1
-			e := sm.headerList.PushBack(&node)
-			if sm.startHeader == nil {
-				sm.startHeader = e
-			}
-		} else {
-			log.Warnf("Received block header that does not "+
-				"properly connect to the chain from peer %s "+
-				"-- disconnecting", peer.Addr())
-			peer.Disconnect()
+		sm.progressLogger.SetLastLogTime(time.Now())
 	}

+	bestHash, bestHeight := sm.chain.BestHeader()
+	if sm.ibdMode {
+		if sm.syncPeer == nil {
+			// Return if we've disconnected from the syncPeer.
+			return
+		}

-		// Verify the header at the next checkpoint height matches.
-		if node.height == sm.nextCheckpoint.Height {
-			if node.hash.IsEqual(sm.nextCheckpoint.Hash) {
-				receivedCheckpoint = true
-				log.Infof("Verified downloaded block "+
-					"header against checkpoint at height "+
-					"%d/hash %s", node.height, node.hash)
-			} else {
-				log.Warnf("Block header at height %d/hash "+
-					"%s from peer %s does NOT match "+
-					"expected checkpoint hash of %s -- "+
-					"disconnecting", node.height,
-					node.hash, peer.Addr(),
-					sm.nextCheckpoint.Hash)
-				peer.Disconnect()
-				return
-			}
-			break
+		// Update the last progress time to prevent the stall handler
+		// from disconnecting the sync peer during header download.
+		if peer == sm.syncPeer {
+			sm.lastProgressTime = time.Now()
 		}
 	}

-	// When this header is a checkpoint, switch to fetching the blocks for
-	// all of the headers since the last checkpoint.
-	if receivedCheckpoint {
-		// Since the first entry of the list is always the final block
-		// that is already in the database and is only used to ensure
-		// the next header links properly, it must be removed before
-		// fetching the blocks.
-		sm.headerList.Remove(sm.headerList.Front())
-		log.Infof("Received %v block headers: Fetching blocks",
-			sm.headerList.Len())
-		sm.progressLogger.SetLastLogTime(time.Now())
-		sm.fetchHeaderBlocks()
-		return
+		if bestHeight < sm.syncPeer.LastBlock() {
+			locator := blockchain.BlockLocator([]*chainhash.Hash{&bestHash})
+			sm.syncPeer.PushGetHeadersMsg(locator, &zeroHash)
+			return
+		}
 	}

-	// This header is not a checkpoint, so request the next batch of
-	// headers starting from the latest known header and ending with the
-	// next checkpoint.
-	locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash})
-	err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
-	if err != nil {
-		log.Warnf("Failed to send getheaders message to "+
-			"peer %s: %v", peer.Addr(), err)
-		return
-	}
+	bestHeaderHash, bestHeaderHeight := sm.chain.BestHeader()
+	log.Infof("downloaded headers to %v(%v) from peer %v "+
+		"-- now fetching blocks",
+		bestHeaderHash, bestHeaderHeight, hmsg.peer.String())
+	sm.fetchHeaderBlocks(peer)
 }
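`ProcessBlockHeader` replaces the old manual `headerList` bookkeeping; conceptually, the per-batch check includes verifying that each header's `PrevBlock` commits to the header before it. A toy illustration of just that connectivity rule, under stated simplifications (a single sha256 stands in for the real double-SHA256 block hash, and the PoW and difficulty checks are omitted):

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// header is a pared-down stand-in for wire.BlockHeader.
type header struct {
	prev [32]byte
	data string
}

func (h header) hash() [32]byte {
	return sha256.Sum256(append(h.prev[:], h.data...))
}

// connects reports whether every header in the batch links to the hash of
// the one before it, starting from the current tip. A peer whose batch
// fails this check would be disconnected, as in the handler above.
func connects(tip [32]byte, batch []header) bool {
	for _, h := range batch {
		if h.prev != tip {
			return false
		}
		tip = h.hash()
	}
	return true
}

func main() {
	genesis := header{data: "genesis"}
	h1 := header{prev: genesis.hash(), data: "block 1"}
	h2 := header{prev: h1.hash(), data: "block 2"}
	fmt.Println(connects(genesis.hash(), []header{h1, h2}))
	fmt.Println(connects(genesis.hash(), []header{h2, h1}))
}
```

Because each header commits to its predecessor's hash, a whole batch can be rejected the moment one link fails, without touching block data.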
 // handleNotFoundMsg handles notfound messages from all peers.
@@ -1210,8 +1193,8 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
 		// for the peer.
 		peer.AddKnownInventory(iv)

-		// Ignore inventory when we're in headers-first mode.
-		if sm.headersFirstMode {
+		// Ignore inventory when we're in the initial block download mode.
+		if sm.ibdMode {
 			continue
 		}
@@ -1685,19 +1668,11 @@ func New(config *Config) (*SyncManager, error) {
 		peerStates:     make(map[*peerpkg.Peer]*peerSyncState),
 		progressLogger: newBlockProgressLogger("Processed", log),
 		msgChan:        make(chan interface{}, config.MaxPeers*3),
-		headerList:     list.New(),
 		quit:           make(chan struct{}),
 		feeEstimator:   config.FeeEstimator,
 	}

 	best := sm.chain.BestSnapshot()
-	if !config.DisableCheckpoints {
-		// Initialize the next checkpoint based on the current height.
-		sm.nextCheckpoint = sm.findNextHeaderCheckpoint(best.Height)
-		if sm.nextCheckpoint != nil {
-			sm.resetHeaderState(&best.Hash, best.Height)
-		}
-	} else {
+	if config.DisableCheckpoints {
 		log.Info("Checkpoints are disabled")
 	}
There is an edge case, which I believe will probably only happen on regtest/testnet: sm.chain.IsCurrent() is false if the tip block's timestamp is older than 24 hours, even if the node's height matches its peers'.

1. startSync() sees !IsCurrent() and enters the IBD logic at L322.
2. Since there are no higher-header peers, it falls back to same-height block peers at L359.
3. It sets sm.syncPeer and sm.ibdMode = true at L375.
4. But if bestHeader == bestBlock, buildBlockRequest() has nothing to ask for, so fetchHeaderBlocks() sends no getdata at L911.

And if that sync peer later announces a new block with a normal inv, we can still be stuck because of this line.
Wouldn't this be fixed if we only fetched from higher peers? I'll revert the last change.
f9aad9a to
2aae8a6
Compare
Roasbeef
left a comment
LGTM 🦂
(stray comment I forgot to post)
netsync/manager.go
Outdated
	higherPeers := sm.fetchHigherPeers(height)

	var bestPeer *peerpkg.Peer
	switch {
	case len(higherPeers) > 0:
I think we can simplify this a bit as:

if len(higherPeers) == 0 {
	log
	return
}

bestPeer = higherPeers[rand.Intn(len(higherPeers))]
Change Description
Right now the headers-first download is based only on the checkpoints and is thus limited to the last checkpoint.
The newly implemented headers-first download will always download headers first and will validate that the headers connect and have proper proof of work.
The block download is then based on the verified headers. This eliminates any potential downloading of txs or orphan blocks during ibd. It also lays the groundwork for parallel block download, since blocks can only be downloaded in parallel once we already have their headers.
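The overall shape of the new sync, stripped of the btcd types: headers are pulled in bounded batches anchored at our best known header until the peer has nothing newer, and only then are the corresponding blocks fetched. A hedged sketch of that rolling loop (all names are illustrative, and maxBatch stands in for the 2000-header limit of a headers message):

```go
package main

import "fmt"

// syncHeaders pulls headers in batches of at most maxBatch, each batch
// anchored at the last header we already have -- the rolling
// getheaders/headers loop described above. peerChain stands in for the
// remote peer's header chain.
func syncHeaders(local, peerChain []string, maxBatch int) []string {
	for len(local) < len(peerChain) {
		start := len(local)
		end := start + maxBatch
		if end > len(peerChain) {
			end = len(peerChain)
		}
		// One getheaders/headers round trip.
		local = append(local, peerChain[start:end]...)
	}
	return local
}

func main() {
	peer := []string{"h1", "h2", "h3", "h4", "h5"}
	local := syncHeaders([]string{"h1"}, peer, 2)
	fmt.Println(len(local))
}
```

In the real implementation each round trip is driven by a block locator rather than a height offset, and the batch contents are validated before being appended.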
It's not put into the code yet, but this also allows the node to receive block headers instead of invs during block propagation. I'll do that in a follow-up later.

Steps to Test
Pull Request Checklist
Testing
Code Style and Documentation
📝 Please see our Contribution Guidelines for further guidance.