diff --git a/blockchain/accept.go b/blockchain/accept.go index 4adc2f6127..a409aacc77 100644 --- a/blockchain/accept.go +++ b/blockchain/accept.go @@ -9,6 +9,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/database" + "github.com/btcsuite/btcd/wire" ) // maybeAcceptBlock potentially accepts a block into the block chain and, if @@ -63,11 +64,20 @@ func (b *BlockChain) maybeAcceptBlock(block *btcutil.Block, flags BehaviorFlags) // Create a new block node for the block and add it to the node index. Even // if the block ultimately gets connected to the main chain, it starts out // on a side chain. - blockHeader := &block.MsgBlock().Header - newNode := newBlockNode(blockHeader, prevNode) - newNode.status = statusDataStored - - b.index.AddNode(newNode) + // + // If a header-only node already exists (from maybeAcceptBlockHeader), + // upgrade its status rather than creating a new node. Creating a new + // node would overwrite the index entry, orphaning the pointer held by + // bestHeader's chainView and breaking Contains checks. + newNode := b.index.LookupNode(block.Hash()) + if newNode != nil { + b.index.SetStatusFlags(newNode, statusDataStored) + } else { + blockHeader := &block.MsgBlock().Header + newNode = newBlockNode(blockHeader, prevNode) + newNode.status = statusDataStored | statusHeaderStored + b.index.AddNode(newNode) + } err = b.index.flushToDB() if err != nil { return false, err @@ -92,3 +102,135 @@ func (b *BlockChain) maybeAcceptBlock(block *btcutil.Block, flags BehaviorFlags) return isMainChain, nil } + +// maybeAcceptBlockHeader potentially accepts the header to the block index and, +// if accepted, returns a bool indicating if the header extended the best chain +// of headers. It also performs several context independent checks as well as +// those which depend on its position within the header chain. +// +// The flags are passed to CheckBlockHeaderSanity and CheckBlockHeaderContext +// which allow the skipping of PoW check or the check for the block difficulty, +// median time check, and the BIP94 check. +// +// The skipCheckpoint boolean allows skipping of the check for if the header is +// part of the existing checkpoints. +// +// In the case the block header is already known, the associated block node is +// examined to determine if the block is already known to be invalid, in which +// case an appropriate error will be returned. +// +// This function MUST be called with the chain lock held (for writes). +func (b *BlockChain) maybeAcceptBlockHeader(header *wire.BlockHeader, + flags BehaviorFlags, skipCheckpoint bool) (bool, error) { + + // Orphan headers are not allowed and this function should never be called + // with the genesis block. + prevHash := &header.PrevBlock + prevNode := b.index.LookupNode(prevHash) + if prevNode == nil { + str := fmt.Sprintf("previous block %s is not known", prevHash) + return false, ruleError(ErrPreviousBlockUnknown, str) + } + + // This header is invalid if its previous node is invalid. + if b.index.NodeStatus(prevNode).KnownInvalid() { + str := fmt.Sprintf( + "previous block %s is known to be invalid", prevHash) + return false, ruleError(ErrInvalidAncestorBlock, str) + } + + // Avoid validating the header again if its validation status is already + // known. Invalid headers are never added to the block index, so if there + // is an entry for the block hash, the header itself is known to be valid. + hash := header.BlockHash() + node := b.index.LookupNode(&hash) + if node != nil { + nodeStatus := b.index.NodeStatus(node) + if nodeStatus&statusValidateFailed != 0 { + str := fmt.Sprintf("block %s is known to be invalid", hash) + return false, ruleError(ErrKnownInvalidBlock, str) + } else if nodeStatus&statusInvalidAncestor != 0 { + str := fmt.Sprintf("block %s has an invalid ancestor", hash) + return false, ruleError(ErrInvalidAncestorBlock, str) + } + + // If the node is in the bestHeaders chainview, it's in the main chain. + // If it isn't, then we'll go through the verification process below. + if b.bestHeader.Contains(node) { + return true, nil + } + } + + // Perform context-free sanity checks on the block header. + err := CheckBlockHeaderSanity( + header, b.chainParams.PowLimit, b.timeSource, flags) + if err != nil { + return false, err + } + + // The block must pass all of the validation rules which depend on the + // position of the block within the block chain. + err = CheckBlockHeaderContext(header, prevNode, flags, b, skipCheckpoint) + if err != nil { + return false, err + } + + // Create a new block node for the block and add it to the block index. + // + // Note that the additional information for the actual transactions and + // witnesses in the block can't be populated until the full block data is + // known since that information is not available in the header. + if node == nil { + node = newBlockNode(header, prevNode) + node.status = statusHeaderStored + b.index.AddNode(node) + } + + // Flush the block index to database at this point since we added the + // node. + err = b.index.flushToDB() + if err != nil { + return false, err + } + + // Check if the header extends the best header tip. + isMainChain := false + parentHash := &header.PrevBlock + if parentHash.IsEqual(&b.bestHeader.Tip().hash) { + log.Debugf("accepted header %v as the new header tip", node.hash) + + // This header is now the end of the best headers. + b.bestHeader.SetTip(node) + isMainChain = true + return isMainChain, nil + } + + // We're extending (or creating) a side chain, but the cumulative + // work for this new side chain is not enough to make it the new chain. + if node.workSum.Cmp(b.bestHeader.Tip().workSum) <= 0 { + // Log information about how the header is forking the chain. + fork := b.bestHeader.FindFork(node) + if fork.hash.IsEqual(parentHash) { + log.Infof("FORK: BlockHeader %v(%v) forks the chain at block %v(%v) "+ + "but did not have enough work to be the "+ + "main chain", node.hash, node.height, fork.hash, fork.height) + } else { + log.Infof("EXTEND FORK: BlockHeader %v(%v) extends a side chain "+ + "which forks the chain at block %v(%v)", + node.hash, node.height, fork.hash, fork.height) + } + + return false, nil + } + + prevTip := b.bestHeader.Tip() + log.Infof("NEW BEST HEADER CHAIN: BlockHeader %v(%v) is now a longer "+ + "PoW chain than the previous header tip of %v(%v).", + node.hash, node.height, + prevTip.hash, prevTip.height) + + b.bestHeader.SetTip(node) + isMainChain = true + + return isMainChain, nil +} diff --git a/blockchain/accept_test.go b/blockchain/accept_test.go new file mode 100644 index 0000000000..28a1ff6997 --- /dev/null +++ b/blockchain/accept_test.go @@ -0,0 +1,75 @@ +// Copyright (c) 2013-2026 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package blockchain + +import ( + "testing" + + "github.com/btcsuite/btcd/blockchain/internal/testhelper" + "github.com/btcsuite/btcd/btcutil" +) + +// TestMaybeAcceptBlockReusesHeaderNode ensures that when a block header is +// processed first via ProcessBlockHeader and later the full block arrives via +// ProcessBlock, the existing blockNode pointer is reused rather than replaced. +// Replacing the pointer would orphan the entry held by bestHeader's chainView, +// causing bestHeader.Contains(index.LookupNode(hash)) to return false and +// breaking IsValidHeader and downstream netsync checks. +func TestMaybeAcceptBlockReusesHeaderNode(t *testing.T) { + chain, params, tearDown := utxoCacheTestChain( + "TestMaybeAcceptBlockReusesHeaderNode") + defer tearDown() + + // Build a base chain of 3 blocks. + // + // genesis -> 1 -> 2 -> 3 + tip := btcutil.NewBlock(params.GenesisBlock) + _, _, err := addBlocks(3, chain, tip, []*testhelper.SpendableOut{}) + if err != nil { + t.Fatalf("failed to build base chain: %v", err) + } + + // Create block 4 without processing it. + prevBlock, err := chain.BlockByHeight(3) + if err != nil { + t.Fatalf("failed to get block at height 3: %v", err) + } + block4, _, err := newBlock(chain, prevBlock, nil) + if err != nil { + t.Fatalf("failed to create block 4: %v", err) + } + + // Process block 4's header first. + block4Hash := block4.Hash() + _, err = chain.ProcessBlockHeader( + &block4.MsgBlock().Header, BFNone, false) + if err != nil { + t.Fatalf("ProcessBlockHeader fail: %v", err) + } + + // Capture the header-only node pointer from the index. + headerNode := chain.index.LookupNode(block4Hash) + if headerNode == nil { + t.Fatal("header node not found in block index") + } + + // Now process the full block. + _, _, err = chain.ProcessBlock(block4, BFNone) + if err != nil { + t.Fatalf("ProcessBlock fail: %v", err) + } + + // The index must still hold the same pointer that bestHeader has. + // Before the fix, maybeAcceptBlock would create a fresh node and + // overwrite the index entry, orphaning the pointer in bestHeader. + fullBlockNode := chain.index.LookupNode(block4Hash) + if fullBlockNode != headerNode { + t.Fatal("ProcessBlock replaced the header node pointer " + + "instead of reusing it") + } + if !chain.bestHeader.Contains(fullBlockNode) { + t.Fatal("node no longer in bestHeader after ProcessBlock") + } +} diff --git a/blockchain/blockindex.go b/blockchain/blockindex.go index 5273cb488b..ff04c5bab6 100644 --- a/blockchain/blockindex.go +++ b/blockchain/blockindex.go @@ -33,6 +33,9 @@ const ( // has failed validation, thus the block is also invalid. statusInvalidAncestor + // statusHeaderStored indicates that the block's header is stored on disk. + statusHeaderStored + // statusNone indicates that the block has no validation state flags set. // // NOTE: This must be defined last in order to avoid influencing iota. @@ -46,6 +49,11 @@ func (status blockStatus) HaveData() bool { return status&statusDataStored != 0 } +// HaveHeader returns whether the header data is stored in the database. +func (status blockStatus) HaveHeader() bool { + return status&statusHeaderStored != 0 +} + // KnownValid returns whether the block is known to be valid. This will return // false for a valid block that has not been fully validated yet. func (status blockStatus) KnownValid() bool { @@ -377,14 +385,16 @@ func newBlockIndex(db database.DB, chainParams *chaincfg.Params) *blockIndex { } } -// HaveBlock returns whether or not the block index contains the provided hash. +// HaveBlock returns whether or not the block index contains the provided hash +// and if the data exists on disk. // // This function is safe for concurrent access. func (bi *blockIndex) HaveBlock(hash *chainhash.Hash) bool { bi.RLock() - _, hasBlock := bi.index[*hash] + node, hasBlock := bi.index[*hash] + haveData := hasBlock && node.status.HaveData() bi.RUnlock() - return hasBlock + return haveData } // LookupNode returns the block node identified by the provided hash. It will @@ -497,8 +507,37 @@ func (bi *blockIndex) flushToDB() error { return nil } + // Check if any dirty node actually needs to be written. Header-only + // nodes are skipped for backwards compatibility (see NOTE below), so + // if every dirty node is header-only, we can avoid opening a write + // transaction entirely. This matters during header sync where every + // ProcessBlockHeader call would otherwise open a no-op write txn. + needsWrite := false + for node := range bi.dirty { + if node.status.HaveData() { + needsWrite = true + break + } + } + if !needsWrite { + bi.dirty = make(map[*blockNode]struct{}) + bi.Unlock() + return nil + } + err := bi.db.Update(func(dbTx database.Tx) error { for node := range bi.dirty { + // NOTE: we specifically don't flush the block indexes that + // we don't have the data for backwards compatibility. + // While flushing would save us the work of re-downloading + // the block headers upon restart, if the user were to start + // up a btcd node with an older version, it would result in + // an unrecoverable error as older versions would consider a + // blockNode being present as having the block data as well. + if node.status.HaveHeader() && + !node.status.HaveData() { + continue + } err := dbStoreBlockNode(dbTx, node) if err != nil { return err diff --git a/blockchain/blockindex_test.go b/blockchain/blockindex_test.go index cd08969f14..47a47e9238 100644 --- a/blockchain/blockindex_test.go +++ b/blockchain/blockindex_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2023 The utreexo developers +// Copyright (c) 2015-2026 The btcsuite developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -7,8 +7,146 @@ package blockchain import ( "math/rand" "testing" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/database" + "github.com/btcsuite/btcd/wire" ) +// countingDB wraps a database.DB and counts the number of Update calls. +type countingDB struct { + database.DB + updates int +} + +// Update increments the updates counter on a call. +func (c *countingDB) Update(fn func(tx database.Tx) error) error { + c.updates++ + return c.DB.Update(fn) +} + +// TestFlushToDB tests that flushToDB only opens a write transaction when at +// least one dirty node has block data and skips the transaction when all dirty +// nodes are header-only. +func TestFlushToDB(t *testing.T) { + tests := []struct { + name string + + // statuses defines the dirty nodes to create for this test + // case. Each entry's status determines whether the node is + // header-only or has block data. A nil slice means no nodes + // are added (empty dirty set). + statuses []blockStatus + + // wantUpdates is the expected number of DB Update calls. + wantUpdates int + }{ + { + name: "empty dirty set", + statuses: nil, + wantUpdates: 0, + }, + { + name: "single header-only node", + statuses: []blockStatus{statusHeaderStored}, + wantUpdates: 0, + }, + { + name: "multiple header-only nodes", + statuses: []blockStatus{ + statusHeaderStored, + statusHeaderStored, + statusHeaderStored, + }, + wantUpdates: 0, + }, + { + name: "single data node", + statuses: []blockStatus{statusDataStored | statusHeaderStored}, + wantUpdates: 1, + }, + { + name: "header-only and data nodes mixed", + statuses: []blockStatus{ + statusHeaderStored, + statusDataStored | statusHeaderStored, + }, + wantUpdates: 1, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + chain, teardown, err := chainSetup( + "flushtodbtest", &chaincfg.SimNetParams, + ) + if err != nil { + t.Fatalf("failed to setup chain: %v", err) + } + defer teardown() + + bi := chain.index + cdb := &countingDB{DB: bi.db} + bi.db = cdb + + // Create the dirty nodes for this test case, chaining + // each off the genesis tip. + tip := chain.bestChain.Tip() + var nodes []*blockNode + for i, status := range test.statuses { + node := newBlockNode(&wire.BlockHeader{ + PrevBlock: tip.hash, + Nonce: uint32(i), + }, tip) + node.status = status + bi.AddNode(node) + nodes = append(nodes, node) + tip = node + } + + err = bi.flushToDB() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if cdb.updates != test.wantUpdates { + t.Fatalf("expected %d Update calls, got %d", + test.wantUpdates, cdb.updates) + } + + bi.RLock() + dirtyLen := len(bi.dirty) + bi.RUnlock() + + if dirtyLen != 0 { + t.Fatalf("expected dirty set to be empty, got %d", + dirtyLen) + } + + // Nodes with block data should be in the DB; + // header-only nodes should not. + for i, node := range nodes { + var found bool + err := bi.db.View(func(dbTx database.Tx) error { + bucket := dbTx.Metadata().Bucket(blockIndexBucketName) + key := blockIndexKey(&node.hash, uint32(node.height)) + found = bucket.Get(key) != nil + return nil + }) + if err != nil { + t.Fatalf("node %d: View failed: %v", i, err) + } + + wantInDB := node.status.HaveData() + if found != wantInDB { + t.Fatalf("node %d: in database = %v, want %v", + i, found, wantInDB) + } + } + }) + } +} + func TestAncestor(t *testing.T) { height := 500_000 blockNodes := chainedNodes(nil, height) diff --git a/blockchain/chain.go b/blockchain/chain.go index 952d0bc279..fb624ef9c4 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -128,8 +128,12 @@ type BlockChain struct { // // bestChain tracks the current active chain by making use of an // efficient chain view into the block index. - index *blockIndex - bestChain *chainView + // + // bestHeader tracks the current active header chain. The tip is the last + // header we have on the block index. + index *blockIndex + bestChain *chainView + bestHeader *chainView // The UTXO state holds a cached view of the UTXO state of the chain. // It is protected by the chain lock. @@ -191,9 +195,10 @@ type BlockChain struct { notifications []NotificationCallback } -// HaveBlock returns whether or not the chain instance has the block represented -// by the passed hash. This includes checking the various places a block can -// be like part of the main chain, on a side chain, or in the orphan pool. +// HaveBlock returns whether or not the chain instance has the block data +// represented by the passed hash. This includes checking the various places a +// block can be like part of the main chain, on a side chain, or in the orphan +// pool. // // This function is safe for concurrent access. func (b *BlockChain) HaveBlock(hash *chainhash.Hash) (bool, error) { @@ -1339,6 +1344,30 @@ func (b *BlockChain) BestSnapshot() *BestState { return snapshot } +// BestHeader returns the hash and the height of the best header. +func (b *BlockChain) BestHeader() (chainhash.Hash, int32) { + b.chainLock.RLock() + defer b.chainLock.RUnlock() + + best := b.bestHeader.Tip() + return best.hash, best.height +} + +// BestChainHeaderForkHeight returns the height of the fork point between the +// best chain and the best header chain. This can be used to determine the +// starting height for block downloads when the best header chain has diverged +// from the best chain due to a reorg. +func (b *BlockChain) BestChainHeaderForkHeight() int32 { + b.chainLock.RLock() + defer b.chainLock.RUnlock() + + fork := b.bestChain.FindFork(b.bestHeader.Tip()) + if fork == nil { + return 0 + } + return fork.height +} + // TipStatus is the status of a chain tip. type TipStatus byte @@ -1530,6 +1559,54 @@ func (b *BlockChain) BlockHashByHeight(blockHeight int32) (*chainhash.Hash, erro return &node.hash, nil } +// IsValidHeader checks that we've already checked that this header connects to the +// chain of best headers and did not receive an invalid state. +func (b *BlockChain) IsValidHeader(blockHash *chainhash.Hash) bool { + node := b.index.LookupNode(blockHash) + if node == nil || !b.bestHeader.Contains(node) { + return false + } + + return !b.index.NodeStatus(node).KnownInvalid() +} + +// LatestBlockLocatorByHeader returns a block locator for the latest known tip of the +// header chain. +// +// This function is safe for concurrent access. +func (b *BlockChain) LatestBlockLocatorByHeader() (BlockLocator, error) { + b.chainLock.RLock() + locator := b.bestHeader.BlockLocator(nil) + b.chainLock.RUnlock() + return locator, nil +} + +// HeaderHashByHeight returns the block header's hash given its height. +// +// NOTE: If the blockNode at the given blockHeight is not included in the +// bestHeader chain, the function will return an error indicating that the +// blockHeight wasn't found. +func (b *BlockChain) HeaderHashByHeight(blockHeight int32) ( + *chainhash.Hash, error) { + + node := b.bestHeader.NodeByHeight(blockHeight) + if node == nil || !b.bestHeader.Contains(node) { + return nil, fmt.Errorf("blockheight %v not found", blockHeight) + } + + return &node.hash, nil +} + +// HeaderHeightByHash returns the height of the header given its hash. +func (b *BlockChain) HeaderHeightByHash(blockHash chainhash.Hash) (int32, error) { + node := b.index.LookupNode(&blockHash) + if node == nil || !b.bestHeader.Contains(node) { + return -1, fmt.Errorf("blockhash %v not found", blockHash) + } + + return node.height, nil +} + // HeightRange returns a range of block hashes for the given start and end // heights. It is inclusive of the start height and exclusive of the end // height. The end height will be limited to the current main chain height. @@ -2188,6 +2265,7 @@ func New(config *Config) (*BlockChain, error) { utxoCache: newUtxoCache(config.DB, config.UtxoCacheMaxSize), hashCache: config.HashCache, bestChain: newChainView(nil), + bestHeader: newChainView(nil), orphans: make(map[chainhash.Hash]*orphanBlock), prevOrphans: make(map[chainhash.Hash][]*orphanBlock), warningCaches: newThresholdCaches(vbNumBits), diff --git a/blockchain/chain_test.go b/blockchain/chain_test.go index b3bccf56f7..b0e07cf3f2 100644 --- a/blockchain/chain_test.go +++ b/blockchain/chain_test.go @@ -21,6 +21,7 @@ import ( // TestHaveBlock tests the HaveBlock API to ensure proper functionality. func TestHaveBlock(t *testing.T) { // Load up blocks such that there is a side chain. + // We'll only process the header for block 4. // (genesis block) -> 1 -> 2 -> 3 -> 4 // \-> 3a testFiles := []string{ @@ -51,7 +52,29 @@ func TestHaveBlock(t *testing.T) { // maturity to 1. chain.TstSetCoinbaseMaturity(1) + // We want to process just the header for block 4. + block4Hash := newHashFromStr("000000002f264d6504013e73b9c913de9098d4d771c1bb219af475d2a01b128e") + for i := 1; i < len(blocks); i++ { + // Add just the header for the block 4. + if blocks[i].Hash().IsEqual(block4Hash) { + + isMainChain, err := chain.ProcessBlockHeader( + &blocks[i].MsgBlock().Header, BFNone, false) + if err != nil { + t.Errorf("ProcessBlockHeader fail on block %v: %v\n", + i, err) + return + } + if !isMainChain { + t.Errorf("ProcessBlockHeader incorrectly returned "+ + "block %v is a side-chain\n", i) + return + } + + continue + } + _, isOrphan, err := chain.ProcessBlock(blocks[i], BFNone) if err != nil { t.Errorf("ProcessBlock fail on block %v: %v\n", i, err) @@ -87,6 +110,9 @@ func TestHaveBlock(t *testing.T) { // Block 3a should be present (on a side chain). {hash: "00000000474284d20067a4d33f6a02284e6ef70764a3a26d6a5b9df52ef663dd", want: true}, + // Block 4 shouldn't be present as we only have its header. + {hash: "000000002f264d6504013e73b9c913de9098d4d771c1bb219af475d2a01b128e", want: false}, + // Block 100000 should be present (as an orphan). {hash: "000000000003ba27aa200b1cecaad478d2b00432346c3f1f3986da1afd33e506", want: true}, diff --git a/blockchain/chainio.go b/blockchain/chainio.go index 27028eac90..e85f581351 100644 --- a/blockchain/chainio.go +++ b/blockchain/chainio.go @@ -1079,6 +1079,7 @@ func (b *BlockChain) createChainState() error { node := newBlockNode(header, nil) node.status = statusDataStored | statusValid b.bestChain.SetTip(node) + b.bestHeader.SetTip(node) // Add the new node to the index which is used for faster lookups. b.index.addNode(node) @@ -1262,13 +1263,14 @@ func (b *BlockChain) initChainState() error { i++ } - // Set the best chain view to the stored best state. + // Set the best chain view and the best header to the stored best state. tip := b.index.LookupNode(&state.hash) if tip == nil { return AssertError(fmt.Sprintf("initChainState: cannot find "+ "chain tip %s in block index", state.hash)) } b.bestChain.SetTip(tip) + b.bestHeader.SetTip(tip) // Load the raw block bytes for the best block. blockBytes, err := dbTx.FetchBlock(&state.hash) diff --git a/blockchain/error.go b/blockchain/error.go index 8a7d4a7dc7..6104e7ab17 100644 --- a/blockchain/error.go +++ b/blockchain/error.go @@ -224,6 +224,11 @@ const ( // ErrTimewarpAttack indicates a timewarp attack i.e. // when block's timestamp is too early on diff adjustment block. ErrTimewarpAttack + + // ErrKnownInvalidBlock indicates that the block itself has previously + // been found to violate a consensus rule, as opposed to having an + // invalid ancestor. + ErrKnownInvalidBlock ) // Map of ErrorCode values back to their constant names for pretty printing. @@ -271,6 +276,7 @@ var errorCodeStrings = map[ErrorCode]string{ ErrPreviousBlockUnknown: "ErrPreviousBlockUnknown", ErrInvalidAncestorBlock: "ErrInvalidAncestorBlock", ErrPrevBlockNotBest: "ErrPrevBlockNotBest", + ErrKnownInvalidBlock: "ErrKnownInvalidBlock", } // String returns the ErrorCode as a human-readable name. diff --git a/blockchain/error_test.go b/blockchain/error_test.go index c0e56ab897..94b5dafda1 100644 --- a/blockchain/error_test.go +++ b/blockchain/error_test.go @@ -58,6 +58,7 @@ func TestErrorCodeStringer(t *testing.T) { {ErrPreviousBlockUnknown, "ErrPreviousBlockUnknown"}, {ErrInvalidAncestorBlock, "ErrInvalidAncestorBlock"}, {ErrPrevBlockNotBest, "ErrPrevBlockNotBest"}, + {ErrKnownInvalidBlock, "ErrKnownInvalidBlock"}, {0xffff, "Unknown ErrorCode (65535)"}, } diff --git a/blockchain/process.go b/blockchain/process.go index 64d5c1e14f..0c114f4758 100644 --- a/blockchain/process.go +++ b/blockchain/process.go @@ -11,6 +11,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/database" + "github.com/btcsuite/btcd/wire" ) // BehaviorFlags is a bitmask defining tweaks to the normal behavior when @@ -242,3 +243,34 @@ func (b *BlockChain) ProcessBlock(block *btcutil.Block, flags BehaviorFlags) (bo return isMainChain, false, nil } + +// ProcessBlockHeader is the main workhorse for handling insertion of new block +// headers into the block chain using headers-first semantics. It includes +// functionality such as rejecting headers that do not connect to an existing +// known header, ensuring headers follow all rules and insertion into the block +// index. +// +// Block headers that have already been inserted are ignored, unless they have +// subsequently been marked invalid, in which case an appropriate error is +// returned. +// +// It should be noted that this function intentionally does not accept block +// headers that do not connect to an existing known header or to headers which +// are already known to be a part of an invalid branch. This means headers must +// be processed in order. +// +// The skipCheckpoint boolean allows skipping of the check for if the header is +// part of the existing checkpoints. +// +// The returned boolean indicates whether or not the header was in the main chain +// or not. +// +// This function is safe for concurrent access. +func (b *BlockChain) ProcessBlockHeader(header *wire.BlockHeader, + flags BehaviorFlags, skipCheckpoint bool) (bool, error) { + + b.chainLock.Lock() + defer b.chainLock.Unlock() + + return b.maybeAcceptBlockHeader(header, flags, skipCheckpoint) +} diff --git a/blockchain/process_test.go b/blockchain/process_test.go new file mode 100644 index 0000000000..bc5de726ee --- /dev/null +++ b/blockchain/process_test.go @@ -0,0 +1,182 @@ +package blockchain + +import ( + "crypto/rand" + "fmt" + "testing" + "time" + + "github.com/btcsuite/btcd/blockchain/internal/testhelper" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/stretchr/testify/require" +) + +// chainedHeaders returns desired amount of connected headers from the parentHeight. +func chainedHeaders(parent *wire.BlockHeader, chainParams *chaincfg.Params, + parentHeight int32, numHeaders int) []*wire.BlockHeader { + + headers := make([]*wire.BlockHeader, 0, numHeaders) + tip := parent + + blockHeight := parentHeight + for range numHeaders { + // Use a timestamp that is one second after the previous block unless + // this is the first block in which case the current time is used. + var ts time.Time + if blockHeight == 1 { + ts = time.Unix(time.Now().Unix(), 0) + } else { + ts = tip.Timestamp.Add(time.Second) + } + + var randBytes [4]byte + rand.Read(randBytes[:]) + merkle := chainhash.HashH(randBytes[:]) + + header := wire.BlockHeader{ + Version: 1, + PrevBlock: tip.BlockHash(), + MerkleRoot: merkle, + Bits: chainParams.PowLimitBits, + Timestamp: ts, + Nonce: 0, + } + if !testhelper.SolveBlock(&header) { + panic(fmt.Sprintf("Unable to solve block at height %d", + blockHeight)) + } + headers = append(headers, &header) + tip = &header + } + + return headers +} + +func TestProcessBlockHeader(t *testing.T) { + chain, params, tearDown := utxoCacheTestChain("TestProcessBlockHeader") + defer tearDown() + + // Generate and process the intial 10 block headers. + // + // genesis -> 1 -> 2 -> ... -> 10 (active) + headers := chainedHeaders(¶ms.GenesisBlock.Header, params, 0, 10) + + // Set checkpoint at block 4. + fourthHeader := headers[3] + fourthHeaderHash := fourthHeader.BlockHash() + checkpoint := chaincfg.Checkpoint{ + Height: 4, + Hash: &fourthHeaderHash, + } + chain.checkpoints = append(chain.checkpoints, checkpoint) + chain.checkpointsByHeight = make(map[int32]*chaincfg.Checkpoint) + chain.checkpointsByHeight[checkpoint.Height] = &checkpoint + + for _, header := range headers { + isMainChain, err := chain.ProcessBlockHeader(header, BFNone, false) + require.NoError(t, err) + require.True(t, isMainChain) + } + + // Check that the tip is correct. + lastHeader := headers[len(headers)-1] + lastHeaderHash := lastHeader.BlockHash() + tipNode := chain.bestHeader.Tip() + require.Equal(t, lastHeaderHash, tipNode.hash) + require.Equal(t, statusHeaderStored, tipNode.status) + require.Equal(t, int32(len(headers)), tipNode.height) + + // Create invalid header at the checkpoint. + thirdHeaderHash := headers[2].BlockHash() + thirdNode := chain.index.LookupNode(&thirdHeaderHash) + invalidForkHeight := thirdNode.height + invalidHeaders := chainedHeaders(headers[2], params, invalidForkHeight, 1) + + // Check that the header fails validation. + _, err := chain.ProcessBlockHeader(invalidHeaders[0], BFNone, false) + require.Errorf(t, err, + "invalidHeader %v passed verification but "+ + "should've failed verification "+ + "as the header doesn't match the checkpoint", + invalidHeaders[0].BlockHash().String(), + ) + + // Create sidechain block headers. + // + // genesis -> 1 -> 2 -> 3 -> 4 -> 5 -> ... -> 10 (active) + // \-> 6 -> ... -> 8 (valid-fork) + blockHash := headers[4].BlockHash() + node := chain.index.LookupNode(&blockHash) + forkHeight := node.height + sideChainHeaders := chainedHeaders(headers[4], params, node.height, 3) + sidechainTip := sideChainHeaders[len(sideChainHeaders)-1] + + // Test that the last block header fails as it's missing the previous block + // header. + _, err = chain.ProcessBlockHeader(sidechainTip, BFNone, false) + require.Errorf(t, err, + "sideChainHeader %v passed verification but "+ + "should've failed verification"+ + "as the previous header is not known", + sideChainHeaders[len(sideChainHeaders)-1].BlockHash().String(), + ) + + // Verify that the side-chain headers verify. + for _, header := range sideChainHeaders { + isMainChain, err := chain.ProcessBlockHeader(header, BFNone, false) + require.NoError(t, err) + require.False(t, isMainChain) + } + + // Check that the tip is still the same as before. + tipNode = chain.bestHeader.Tip() + require.Equal(t, lastHeaderHash, tipNode.hash) + require.Equal(t, statusHeaderStored, tipNode.status) + require.Equal(t, int32(len(headers)), tipNode.height) + + // Verify that the side-chain extending headers verify. + sidechainExtendingHeaders := chainedHeaders( + sidechainTip, params, forkHeight+int32(len(sideChainHeaders)), 10) + for _, header := range sidechainExtendingHeaders { + isMainChain, err := chain.ProcessBlockHeader(header, BFNone, false) + require.NoError(t, err) + + blockHash := header.BlockHash() + node := chain.index.LookupNode(&blockHash) + if node.height <= 10 { + require.False(t, isMainChain) + } else { + require.True(t, isMainChain) + } + } + + // Create more sidechain block headers so that it becomes the active chain. + // + // genesis -> 1 -> 2 -> 3 -> 4 -> 5 -> ... -> 10 (valid-fork) + // \-> 6 -> ... -> 18 (active) + lastSideChainHeaderIdx := len(sidechainExtendingHeaders) - 1 + lastSidechainHeader := sidechainExtendingHeaders[lastSideChainHeaderIdx] + lastSidechainHeaderHash := lastSidechainHeader.BlockHash() + + // Check that the tip is now different. + tipNode = chain.bestHeader.Tip() + require.Equal(t, lastSidechainHeaderHash, tipNode.hash) + require.Equal(t, statusHeaderStored, tipNode.status) + require.Equal(t, + int32(len(sideChainHeaders)+len(sidechainExtendingHeaders))+forkHeight, + tipNode.height) + + // Extend the original headers and check it still verifies. + extendedOrigHdrs := chainedHeaders(lastHeader, params, int32(len(headers)), 2) + for _, header := range extendedOrigHdrs { + isMainChain, err := chain.ProcessBlockHeader(header, BFNone, false) + require.NoError(t, err) + require.False(t, isMainChain) + } + + // Check that the tip didn't change. + tipNode = chain.bestHeader.Tip() + require.Equal(t, lastSidechainHeaderHash, tipNode.hash) +} diff --git a/integration/reorg_test.go b/integration/reorg_test.go new file mode 100644 index 0000000000..e9463d4911 --- /dev/null +++ b/integration/reorg_test.go @@ -0,0 +1,138 @@ +package integration + +import ( + "testing" + "time" + + "github.com/btcsuite/btcd/btcjson" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/integration/rpctest" + "github.com/btcsuite/btcd/rpcclient" + "github.com/stretchr/testify/require" +) + +// TestReorgFromForkPoint tests that when two nodes with a shared chain history +// diverge and reconnect, the shorter node correctly downloads blocks starting +// from the fork point. This is a regression test for a bug where +// fetchHeaderBlocks started from bestState.Height + 1 instead of the fork +// point between the best chain and the best header chain, causing blocks +// between the fork point and the current height on the new chain to never be +// downloaded. +func TestReorgFromForkPoint(t *testing.T) { + t.Parallel() + + const ( + sharedBlocks = 10 + shorterBlocks = 5 + longerBlocks = 20 + ) + var ( + sharedHeight = int32(sharedBlocks) + shorterHeight = sharedHeight + int32(shorterBlocks) + longerHeight = sharedHeight + int32(longerBlocks) + forkBranchLen = int32(shorterBlocks) + ) + + longer, err := rpctest.New(&chaincfg.SimNetParams, nil, []string{}, "") + require.NoError(t, err) + require.NoError(t, longer.SetUp(false, 0)) + t.Cleanup(func() { require.NoError(t, longer.TearDown()) }) + + shorter, err := rpctest.New(&chaincfg.SimNetParams, nil, []string{}, "") + require.NoError(t, err) + require.NoError(t, shorter.SetUp(false, 0)) + t.Cleanup(func() { require.NoError(t, shorter.TearDown()) }) + + // Mine the shared chain on the longer node before connecting so that + // it is "current" and can serve headers/blocks to the shorter node. + _, err = longer.Client.Generate(sharedBlocks) + require.NoError(t, err) + + // Connect and let the shorter node sync the shared chain. + require.NoError(t, rpctest.ConnectNode(shorter, longer)) + require.NoError(t, rpctest.JoinNodes( + []*rpctest.Harness{longer, shorter}, rpctest.Blocks, + )) + + // Disconnect so they can mine independently. + require.NoError(t, shorter.Client.AddNode( + longer.P2PAddress(), rpcclient.ANRemove, + )) + + // Wait for both sides to see the disconnect. + require.Eventually(t, + func() bool { + p1, err1 := longer.Client.GetPeerInfo() + p2, err2 := shorter.Client.GetPeerInfo() + + return err1 == nil && err2 == nil && + len(p1) == 0 && len(p2) == 0 + }, + 5*time.Second, 100*time.Millisecond, + ) + + // Mine divergent chains. Both chains diverge at the shared block at + // the shared height. + // + // Mine the shorter chain. + _, err = shorter.Client.Generate(shorterBlocks) + require.NoError(t, err) + _, gotShorterHeight, err := shorter.Client.GetBestBlock() + require.NoError(t, err) + require.Equal(t, shorterHeight, gotShorterHeight) + + // Now the longer chain. + _, err = longer.Client.Generate(longerBlocks) + require.NoError(t, err) + _, gotLongerHeight, err := longer.Client.GetBestBlock() + require.NoError(t, err) + require.Equal(t, longerHeight, gotLongerHeight) + + // Verify the chains actually diverged at the first block after the + // fork point. + forkDivergence := int64(sharedHeight + 1) + longerHashAtFork, err := longer.Client.GetBlockHash(forkDivergence) + require.NoError(t, err) + shorterHashAtFork, err := shorter.Client.GetBlockHash(forkDivergence) + require.NoError(t, err) + require.NotEqual(t, longerHashAtFork, shorterHashAtFork, + "chains should have diverged at height %d", forkDivergence) + + // Reconnect. The shorter node should reorg to the longer chain. + // This requires fetchHeaderBlocks to download blocks from the fork + // point rather than from the shorter node's height, since blocks + // after the fork point on the longer chain differ from the shorter's. + require.NoError(t, rpctest.ConnectNode(shorter, longer)) + require.NoError(t, rpctest.JoinNodes( + []*rpctest.Harness{longer, shorter}, rpctest.Blocks, + )) + + // Both nodes should be on the same chain at the longer height. + longerBestHash, gotLongerHeight, err := longer.Client.GetBestBlock() + require.NoError(t, err) + shorterBestHash, gotShorterHeight, err := shorter.Client.GetBestBlock() + require.NoError(t, err) + require.Equal(t, longerHeight, gotLongerHeight) + require.Equal(t, longerHeight, gotShorterHeight) + require.Equal(t, longerBestHash, shorterBestHash) + + // Verify the reorged node has the orphaned fork as a side chain. + tips, err := shorter.Client.GetChainTips() + require.NoError(t, err) + require.Equal(t, 2, len(tips)) + + var activeTip, forkTip *btcjson.GetChainTipsResult + for _, tip := range tips { + switch tip.Status { + case "active": + activeTip = tip + case "valid-fork": + forkTip = tip + } + } + require.NotNil(t, activeTip) + require.Equal(t, longerHeight, activeTip.Height) + require.NotNil(t, forkTip) + require.Equal(t, shorterHeight, forkTip.Height) + require.Equal(t, forkBranchLen, forkTip.BranchLen) +} diff --git a/netsync/manager.go b/netsync/manager.go index d10188d53f..44c5fac66b 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -5,7 +5,6 @@ package netsync import ( - "container/list" "math/rand" "net" "sync" @@ -24,8 +23,8 @@ import ( const ( // minInFlightBlocks is the minimum number of blocks that should be - // in the request queue for headers-first mode before requesting - // more. + // in the request queue for the initial block download mode before + // requesting more. minInFlightBlocks = 10 // maxRejectedTxns is the maximum number of rejected transactions @@ -198,32 +197,13 @@ type SyncManager struct { peerStates map[*peerpkg.Peer]*peerSyncState lastProgressTime time.Time - // The following fields are used for headers-first mode. - headersFirstMode bool - headerList *list.List - startHeader *list.Element - nextCheckpoint *chaincfg.Checkpoint + // The following fields are used for the initial block download mode. + ibdMode bool // An optional fee estimator. feeEstimator *mempool.FeeEstimator } -// resetHeaderState sets the headers-first mode state to values appropriate for -// syncing from a new peer. -func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) { - sm.headersFirstMode = false - sm.headerList.Init() - sm.startHeader = nil - - // When there is a next checkpoint, add an entry for the latest known - // block into the header pool. This allows the next downloaded header - // to prove it links to the chain properly. - if sm.nextCheckpoint != nil { - node := headerNode{height: newestHeight, hash: newestHash} - sm.headerList.PushBack(&node) - } -} - // findNextHeaderCheckpoint returns the next checkpoint after the passed height. // It returns nil when there is not one either because the height is already // later than the final checkpoint or some other reason such as disabled @@ -252,6 +232,66 @@ func (sm *SyncManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoi return nextCheckpoint } +// fetchHigherPeers returns all the peers that are at a higher block than the +// given height. The peers that are not sync candidates are omitted from the +// returned list. +func (sm *SyncManager) fetchHigherPeers(height int32) []*peerpkg.Peer { + higherPeers := make([]*peerpkg.Peer, 0, len(sm.peerStates)) + for peer, state := range sm.peerStates { + if !state.syncCandidate { + continue + } + + if peer.LastBlock() <= height { + continue + } + + higherPeers = append(higherPeers, peer) + } + + return higherPeers +} + +// isInIBDMode returns true if there's more blocks needed to be downloaded to +// catch up to the latest chain tip. +func (sm *SyncManager) isInIBDMode() bool { + best := sm.chain.BestSnapshot() + higherPeers := sm.fetchHigherPeers(best.Height) + if sm.chain.IsCurrent() && len(higherPeers) == 0 { + return false + } + + return true +} + +// fetchHeaders randomly picks a peer that has a higher advertised header +// and pushes a get headers message to it. +func (sm *SyncManager) fetchHeaders() { + _, height := sm.chain.BestHeader() + higherPeers := sm.fetchHigherPeers(height) + if len(higherPeers) == 0 { + log.Warnf("No sync peer candidates available") + return + } + bestPeer := higherPeers[rand.Intn(len(higherPeers))] + + locator, err := sm.chain.LatestBlockLocatorByHeader() + if err != nil { + log.Errorf("Failed to get block locator for the "+ + "latest block header: %v", err) + return + } + + log.Infof("Downloading headers for blocks %d to "+ + "%d from peer %s", height+1, + bestPeer.LastBlock(), bestPeer.Addr()) + + bestPeer.PushGetHeadersMsg(locator, &zeroHash) + + sm.ibdMode = true + sm.syncPeer = bestPeer +} + // startSync will choose the best peer among the available candidate peers to // download/sync the blockchain from. When syncing is already running, it // simply returns. It also examines the candidates for any which are no longer @@ -262,60 +302,35 @@ func (sm *SyncManager) startSync() { return } - // Once the segwit soft-fork package has activated, we only - // want to sync from peers which are witness enabled to ensure - // that we fully validate all blockchain data. - segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit) - if err != nil { - log.Errorf("Unable to query for segwit soft-fork state: %v", err) + // Check to see if we're in the initial block download mode. + if !sm.isInIBDMode() { return } - best := sm.chain.BestSnapshot() - var higherPeers, equalPeers []*peerpkg.Peer - for peer, state := range sm.peerStates { - if !state.syncCandidate { - continue - } + // If we're in the initial block download mode, check if we have + // peers that we can download headers from. + _, bestHeaderHeight := sm.chain.BestHeader() + higherHeaderPeers := sm.fetchHigherPeers(bestHeaderHeight) + if len(higherHeaderPeers) != 0 { + sm.fetchHeaders() - if segwitActive && !peer.IsWitnessEnabled() { - log.Debugf("peer %v not witness enabled, skipping", peer) - continue - } - - // Remove sync candidate peers that are no longer candidates due - // to passing their latest known block. NOTE: The < is - // intentional as opposed to <=. While technically the peer - // doesn't have a later block when it's equal, it will likely - // have one soon so it is a reasonable choice. It also allows - // the case where both are at 0 such as during regression test. - if peer.LastBlock() < best.Height { - state.syncCandidate = false - continue - } - - // If the peer is at the same height as us, we'll add it a set - // of backup peers in case we do not find one with a higher - // height. If we are synced up with all of our peers, all of - // them will be in this set. - if peer.LastBlock() == best.Height { - equalPeers = append(equalPeers, peer) - continue + // Reset the last progress time now that we have a + // non-nil syncPeer to avoid the stall handler firing + // before any headers have been received. + if sm.syncPeer != nil { + sm.lastProgressTime = time.Now() } - - // This peer has a height greater than our own, we'll consider - // it in the set of better peers from which we'll randomly - // select. - higherPeers = append(higherPeers, peer) - } - - if sm.chain.IsCurrent() && len(higherPeers) == 0 { - log.Infof("Caught up to block %s(%d)", best.Hash.String(), best.Height) return } - // Pick randomly from the set of peers greater than our block height, - // falling back to a random peer of the same height if none are greater. + // We don't have any more headers to download at this + // point so start asking for blocks. + best := sm.chain.BestSnapshot() + higherPeers := sm.fetchHigherPeers(best.Height) + + // Pick randomly from the set of peers greater than our + // block height, falling back to a random peer of the same + // height if none are greater. // // TODO(conner): Use a better algorithm to ranking peers based on // observed metrics and/or sync in parallel. @@ -323,66 +338,24 @@ func (sm *SyncManager) startSync() { switch { case len(higherPeers) > 0: bestPeer = higherPeers[rand.Intn(len(higherPeers))] - - case len(equalPeers) > 0: - bestPeer = equalPeers[rand.Intn(len(equalPeers))] } - // Start syncing from the best peer if one was selected. - if bestPeer != nil { - // Clear the requestedBlocks if the sync peer changes, otherwise - // we may ignore blocks we need that the last sync peer failed - // to send. - sm.requestedBlocks = make(map[chainhash.Hash]struct{}) + if bestPeer == nil { + log.Warnf("No sync peer candidates available") + return + } - locator, err := sm.chain.LatestBlockLocator() - if err != nil { - log.Errorf("Failed to get block locator for the "+ - "latest block: %v", err) - return - } + sm.syncPeer = bestPeer + sm.ibdMode = true - log.Infof("Syncing to block height %d from peer %v", - bestPeer.LastBlock(), bestPeer.Addr()) - - // When the current height is less than a known checkpoint we - // can use block headers to learn about which blocks comprise - // the chain up to the checkpoint and perform less validation - // for them. This is possible since each header contains the - // hash of the previous header and a merkle root. Therefore if - // we validate all of the received headers link together - // properly and the checkpoint hashes match, we can be sure the - // hashes for the blocks in between are accurate. Further, once - // the full blocks are downloaded, the merkle root is computed - // and compared against the value in the header which proves the - // full block hasn't been tampered with. - // - // Once we have passed the final checkpoint, or checkpoints are - // disabled, use standard inv messages learn about the blocks - // and fully validate them. Finally, regression test mode does - // not support the headers-first approach so do normal block - // downloads when in regression test mode. - if sm.nextCheckpoint != nil && - best.Height < sm.nextCheckpoint.Height && - sm.chainParams != &chaincfg.RegressionNetParams { - - bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) - sm.headersFirstMode = true - log.Infof("Downloading headers for blocks %d to "+ - "%d from peer %s", best.Height+1, - sm.nextCheckpoint.Height, bestPeer.Addr()) - } else { - bestPeer.PushGetBlocksMsg(locator, &zeroHash) - } - sm.syncPeer = bestPeer + // Reset the last progress time now that we have a non-nil + // syncPeer to avoid instantly detecting it as stalled in the + // event the progress time hasn't been updated recently. + sm.lastProgressTime = time.Now() - // Reset the last progress time now that we have a non-nil - // syncPeer to avoid instantly detecting it as stalled in the - // event the progress time hasn't been updated recently. - sm.lastProgressTime = time.Now() - } else { - log.Warnf("No sync peer candidates available") - } + log.Infof("Syncing to block height %d from peer %v", + sm.syncPeer.LastBlock(), sm.syncPeer.Addr()) + sm.fetchHeaderBlocks(sm.syncPeer) } // isSyncCandidate returns whether or not the peer is a candidate to consider @@ -592,12 +565,6 @@ func (sm *SyncManager) updateSyncPeer(dcSyncPeer bool) { sm.syncPeer.Disconnect() } - // Reset any header state before we choose our next active sync peer. - if sm.headersFirstMode { - best := sm.chain.BestSnapshot() - sm.resetHeaderState(&best.Hash, best.Height) - } - sm.syncPeer = nil sm.startSync() } @@ -690,6 +657,47 @@ func (sm *SyncManager) current() bool { return true } +// checkHeadersList checks if the sync manager is in the initial block download +// mode and returns if the given block hash is a checkpointed block and the +// behavior flags for this block. If the block is still under the checkpoint, +// then it's given the fast-add flag. +func (sm *SyncManager) checkHeadersList(blockHash *chainhash.Hash) ( + bool, blockchain.BehaviorFlags) { + + // Always return false and BFNone if we're not in ibd mode. + if !sm.ibdMode { + return false, blockchain.BFNone + } + + isCheckpointBlock := false + behaviorFlags := blockchain.BFNone + + // If we don't already know this is a valid header, return false and + // BFNone. + if !sm.chain.IsValidHeader(blockHash) { + return false, blockchain.BFNone + } + + height, err := sm.chain.HeaderHeightByHash(*blockHash) + if err != nil { + return false, blockchain.BFNone + } + + // Since findNextHeaderCheckpoint returns the next checkpoint after the + // passed height, we do a -1 to include the current block. + checkpoint := sm.findNextHeaderCheckpoint(height - 1) + if checkpoint == nil { + return false, blockchain.BFNone + } + + behaviorFlags |= blockchain.BFFastAdd + if blockHash.IsEqual(checkpoint.Hash) { + isCheckpointBlock = true + } + + return isCheckpointBlock, behaviorFlags +} + // handleBlockMsg handles block messages from all peers. func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { peer := bmsg.peer @@ -715,29 +723,10 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { } } - // When in headers-first mode, if the block matches the hash of the - // first header in the list of headers that are being fetched, it's - // eligible for less validation since the headers have already been - // verified to link together and are valid up to the next checkpoint. - // Also, remove the list entry for all blocks except the checkpoint - // since it is needed to verify the next round of headers links - // properly. - isCheckpointBlock := false - behaviorFlags := blockchain.BFNone - if sm.headersFirstMode { - firstNodeEl := sm.headerList.Front() - if firstNodeEl != nil { - firstNode := firstNodeEl.Value.(*headerNode) - if blockHash.IsEqual(firstNode.hash) { - behaviorFlags |= blockchain.BFFastAdd - if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) { - isCheckpointBlock = true - } else { - sm.headerList.Remove(firstNodeEl) - } - } - } - } + // Check if the block is eligible for less validation since the headers + // have already been verified to link together and are valid up to the + // next checkpoint. + isCheckpointBlock, behaviorFlags := sm.checkHeadersList(blockHash) // Remove block from request maps. Either chain will know about it and // so we shouldn't have any more instances of trying to fetch it, or we @@ -845,85 +834,92 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { } } - // If we are not in headers first mode, it's a good time to periodically - // flush the blockchain cache because we don't expect new blocks immediately. - // After that, there is nothing more to do. - if !sm.headersFirstMode { + // If we are not in the initial block download mode, it's a good time to + // periodically flush the blockchain cache because we don't expect new + // blocks immediately. After that, there is nothing more to do. + if !sm.ibdMode { if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil { log.Errorf("Error while flushing the blockchain cache: %v", err) } return } - // This is headers-first mode, so if the block is not a checkpoint - // request more blocks using the header list when the request queue is - // getting short. - if !isCheckpointBlock { - if sm.startHeader != nil && - len(state.requestedBlocks) < minInFlightBlocks { - sm.fetchHeaderBlocks() + // If we're on a checkpointed block, check if we still have checkpoints + // to let the user know if we're switching to normal mode. + if isCheckpointBlock { + log.Infof("Continuing IBD, on checkpoint block %v(%v)", + bmsg.block.Hash(), bmsg.block.Height()) + nextCheckpoint := sm.findNextHeaderCheckpoint(bmsg.block.Height()) + if nextCheckpoint == nil { + log.Infof("Reached the final checkpoint -- " + + "switching to normal mode") } - return } - // This is headers-first mode and the block is a checkpoint. When - // there is a next checkpoint, get the next round of headers by asking - // for headers starting from the block after this one up to the next - // checkpoint. - prevHeight := sm.nextCheckpoint.Height - prevHash := sm.nextCheckpoint.Hash - sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight) - if sm.nextCheckpoint != nil { - locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash}) - err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) - if err != nil { - log.Warnf("Failed to send getheaders message to "+ - "peer %s: %v", peer.Addr(), err) - return - } - log.Infof("Downloading headers for blocks %d to %d from "+ - "peer %s", prevHeight+1, sm.nextCheckpoint.Height, - sm.syncPeer.Addr()) + // Fetch more blocks if we're still not caught up to the best header and + // the number of in-flight blocks has dropped below the minimum threshold. + _, lastHeight := sm.chain.BestHeader() + if bmsg.block.Height() < lastHeight && + len(state.requestedBlocks) < minInFlightBlocks { + sm.fetchHeaderBlocks(sm.syncPeer) return } - // This is headers-first mode, the block is a checkpoint, and there are - // no more checkpoints, so switch to normal mode by requesting blocks - // from the block after this one up to the end of the chain (zero hash). - sm.headersFirstMode = false - sm.headerList.Init() - log.Infof("Reached the final checkpoint -- switching to normal mode") - locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash}) - err = peer.PushGetBlocksMsg(locator, &zeroHash) - if err != nil { - log.Warnf("Failed to send getblocks message to peer %s: %v", - peer.Addr(), err) - return + if bmsg.block.Height() >= lastHeight { + log.Infof("Finished the initial block download and "+ + "caught up to block %v(%v) -- now listening to blocks.", + bmsg.block.Hash(), bmsg.block.Height()) + sm.ibdMode = false } } -// fetchHeaderBlocks creates and sends a request to the syncPeer for the next +// fetchHeaderBlocks creates and sends a request to the given peer for the next // list of blocks to be downloaded based on the current list of headers. -func (sm *SyncManager) fetchHeaderBlocks() { - // Nothing to do if there is no start header. - if sm.startHeader == nil { - log.Warnf("fetchHeaderBlocks called with no start header") +func (sm *SyncManager) fetchHeaderBlocks(peer *peerpkg.Peer) { + if peer == nil { + log.Warnf("fetchHeaderBlocks called with a nil peer") return } - // Build up a getdata request for the list of blocks the headers - // describe. The size hint will be limited to wire.MaxInvPerMsg by - // the function, so no need to double check it here. - gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len())) + gdmsg := sm.buildBlockRequest(peer) + if len(gdmsg.InvList) > 0 { + peer.QueueMessage(gdmsg, nil) + } +} + +// buildBlockRequest builds a getdata message for blocks that need to be +// downloaded based on the current list of headers. +// +// Start fetching from the fork point between the best chain and +// the best header chain rather than from the best chain height. +// When the best header chain has diverged (e.g. due to a reorg), +// blocks between the fork point and the current height on the new +// chain are different and must also be downloaded. +func (sm *SyncManager) buildBlockRequest(peer *peerpkg.Peer) *wire.MsgGetData { + // Return early if the peer is nil. + if peer == nil { + return wire.NewMsgGetDataSizeHint(0) + } + + _, bestHeaderHeight := sm.chain.BestHeader() + forkHeight := sm.chain.BestChainHeaderForkHeight() + if bestHeaderHeight < forkHeight { + // Should never happen but we're guarding against the uint cast + // that happens below. + return wire.NewMsgGetDataSizeHint(0) + } + length := bestHeaderHeight - forkHeight + gdmsg := wire.NewMsgGetDataSizeHint(uint(length)) numRequested := 0 - for e := sm.startHeader; e != nil; e = e.Next() { - node, ok := e.Value.(*headerNode) - if !ok { - log.Warn("Header list node type is not a headerNode") - continue + for h := forkHeight + 1; h <= bestHeaderHeight; h++ { + hash, err := sm.chain.HeaderHashByHeight(h) + if err != nil { + log.Warnf("error while fetching the block hash for height %v -- %v", + h, err) + return gdmsg } - iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) + iv := wire.NewInvVect(wire.InvTypeBlock, hash) haveInv, err := sm.haveInventory(iv) if err != nil { log.Warnf("Unexpected failure when checking for "+ @@ -931,33 +927,42 @@ func (sm *SyncManager) fetchHeaderBlocks() { "fetch: %v", err) } if !haveInv { - syncPeerState := sm.peerStates[sm.syncPeer] + // Skip blocks that are already in-flight to avoid + // sending duplicate getdata requests. Duplicates + // cause the peer to send the block twice; the second + // copy arrives after the first has been processed and + // removed from requestedBlocks, triggering an + // "unrequested block" disconnect. + if _, exists := sm.requestedBlocks[*hash]; exists { + continue + } + + peerState := sm.peerStates[peer] - sm.requestedBlocks[*node.hash] = struct{}{} - syncPeerState.requestedBlocks[*node.hash] = struct{}{} + sm.requestedBlocks[*hash] = struct{}{} + peerState.requestedBlocks[*hash] = struct{}{} // If we're fetching from a witness enabled peer // post-fork, then ensure that we receive all the // witness data in the blocks. - if sm.syncPeer.IsWitnessEnabled() { + if peer.IsWitnessEnabled() { iv.Type = wire.InvTypeWitnessBlock } gdmsg.AddInvVect(iv) numRequested++ } - sm.startHeader = e.Next() + if numRequested >= wire.MaxInvPerMsg { break } } - if len(gdmsg.InvList) > 0 { - sm.syncPeer.QueueMessage(gdmsg, nil) - } + return gdmsg } // handleHeadersMsg handles block header messages from all peers. Headers are -// requested when performing a headers-first sync. +// requested when performing the ibd and are propagated by peers once the ibd +// is complete. func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { peer := hmsg.peer _, exists := sm.peerStates[peer] @@ -966,102 +971,53 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { return } - // The remote peer is misbehaving if we didn't request headers. + // Nothing to do for an empty headers message. msg := hmsg.headers numHeaders := len(msg.Headers) - if !sm.headersFirstMode { - log.Warnf("Got %d unrequested headers from %s -- "+ - "disconnecting", numHeaders, peer.Addr()) - peer.Disconnect() - return - } - - // Nothing to do for an empty headers message. if numHeaders == 0 { return } - // Process all of the received headers ensuring each one connects to the - // previous and that checkpoints match. - receivedCheckpoint := false - var finalHash *chainhash.Hash for _, blockHeader := range msg.Headers { - blockHash := blockHeader.BlockHash() - finalHash = &blockHash - - // Ensure there is a previous header to compare against. - prevNodeEl := sm.headerList.Back() - if prevNodeEl == nil { - log.Warnf("Header list does not contain a previous" + - "element as expected -- disconnecting peer") + _, err := sm.chain.ProcessBlockHeader( + blockHeader, blockchain.BFNone, false, + ) + if err != nil { + log.Warnf("Received block header from peer %v "+ + "failed header verification -- disconnecting", + peer.Addr()) peer.Disconnect() return } - // Ensure the header properly connects to the previous one and - // add it to the list of headers. - node := headerNode{hash: &blockHash} - prevNode := prevNodeEl.Value.(*headerNode) - if prevNode.hash.IsEqual(&blockHeader.PrevBlock) { - node.height = prevNode.height + 1 - e := sm.headerList.PushBack(&node) - if sm.startHeader == nil { - sm.startHeader = e - } - } else { - log.Warnf("Received block header that does not "+ - "properly connect to the chain from peer %s "+ - "-- disconnecting", peer.Addr()) - peer.Disconnect() + sm.progressLogger.SetLastLogTime(time.Now()) + } + + bestHash, bestHeight := sm.chain.BestHeader() + if sm.ibdMode { + if sm.syncPeer == nil { + // Return if we've disconnected from the syncPeer. return } - // Verify the header at the next checkpoint height matches. - if node.height == sm.nextCheckpoint.Height { - if node.hash.IsEqual(sm.nextCheckpoint.Hash) { - receivedCheckpoint = true - log.Infof("Verified downloaded block "+ - "header against checkpoint at height "+ - "%d/hash %s", node.height, node.hash) - } else { - log.Warnf("Block header at height %d/hash "+ - "%s from peer %s does NOT match "+ - "expected checkpoint hash of %s -- "+ - "disconnecting", node.height, - node.hash, peer.Addr(), - sm.nextCheckpoint.Hash) - peer.Disconnect() - return - } - break + // Update the last progress time to prevent the stall handler + // from disconnecting the sync peer during header download. + if peer == sm.syncPeer { + sm.lastProgressTime = time.Now() } - } - // When this header is a checkpoint, switch to fetching the blocks for - // all of the headers since the last checkpoint. - if receivedCheckpoint { - // Since the first entry of the list is always the final block - // that is already in the database and is only used to ensure - // the next header links properly, it must be removed before - // fetching the blocks. - sm.headerList.Remove(sm.headerList.Front()) - log.Infof("Received %v block headers: Fetching blocks", - sm.headerList.Len()) - sm.progressLogger.SetLastLogTime(time.Now()) - sm.fetchHeaderBlocks() - return + if bestHeight < sm.syncPeer.LastBlock() { + locator := blockchain.BlockLocator([]*chainhash.Hash{&bestHash}) + sm.syncPeer.PushGetHeadersMsg(locator, &zeroHash) + return + } } - // This header is not a checkpoint, so request the next batch of - // headers starting from the latest known header and ending with the - // next checkpoint. - locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash}) - err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) - if err != nil { - log.Warnf("Failed to send getheaders message to "+ - "peer %s: %v", peer.Addr(), err) - return - } + bestHeaderHash, bestHeaderHeight := sm.chain.BestHeader() + log.Infof("downloaded headers to %v(%v) from peer %v "+ + "-- now fetching blocks", + bestHeaderHash, bestHeaderHeight, hmsg.peer.String()) + sm.fetchHeaderBlocks(peer) } // handleNotFoundMsg handles notfound messages from all peers. @@ -1210,8 +1166,8 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { // for the peer. peer.AddKnownInventory(iv) - // Ignore inventory when we're in headers-first mode. - if sm.headersFirstMode { + // Ignore inventory when we're in the initial block download mode. + if sm.ibdMode { continue } @@ -1685,19 +1641,11 @@ func New(config *Config) (*SyncManager, error) { peerStates: make(map[*peerpkg.Peer]*peerSyncState), progressLogger: newBlockProgressLogger("Processed", log), msgChan: make(chan interface{}, config.MaxPeers*3), - headerList: list.New(), quit: make(chan struct{}), feeEstimator: config.FeeEstimator, } - best := sm.chain.BestSnapshot() - if !config.DisableCheckpoints { - // Initialize the next checkpoint based on the current height. - sm.nextCheckpoint = sm.findNextHeaderCheckpoint(best.Height) - if sm.nextCheckpoint != nil { - sm.resetHeaderState(&best.Hash, best.Height) - } - } else { + if config.DisableCheckpoints { log.Info("Checkpoints are disabled") } diff --git a/netsync/manager_test.go b/netsync/manager_test.go new file mode 100644 index 0000000000..becf1a7e99 --- /dev/null +++ b/netsync/manager_test.go @@ -0,0 +1,1215 @@ +package netsync + +import ( + "bufio" + "bytes" + "encoding/hex" + "fmt" + "math" + "os" + "path/filepath" + "testing" + "time" + + "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/database" + _ "github.com/btcsuite/btcd/database/ffldb" + "github.com/btcsuite/btcd/mempool" + "github.com/btcsuite/btcd/peer" + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/stretchr/testify/require" +) + +// The package-level log variable is nil by default. Set it to the +// disabled logger so that log calls in the sync manager don't panic. +func init() { + DisableLog() +} + +// noopPeerNotifier is a no-op implementation of PeerNotifier for tests. +type noopPeerNotifier struct{} + +func (noopPeerNotifier) AnnounceNewTransactions([]*mempool.TxDesc) {} +func (noopPeerNotifier) UpdatePeerHeights(*chainhash.Hash, int32, *peer.Peer) {} +func (noopPeerNotifier) RelayInventory(*wire.InvVect, interface{}) {} +func (noopPeerNotifier) TransactionConfirmed(*btcutil.Tx) {} + +// dbSetup is used to create a new db with the genesis block already inserted. +// It returns a teardown function the caller should invoke when done testing to +// clean up. The database is stored under t.TempDir() which is automatically +// removed when the test finishes. +func dbSetup(t *testing.T, params *chaincfg.Params) (database.DB, func(), error) { + dbPath := filepath.Join(t.TempDir(), "ffldb") + db, err := database.Create("ffldb", dbPath, params.Net) + if err != nil { + return nil, nil, fmt.Errorf("error creating db: %v", err) + } + + teardown := func() { + db.Close() + } + + return db, teardown, nil +} + +// chainSetup is used to create a new db and chain instance with the genesis +// block already inserted. In addition to the new chain instance, it returns +// a teardown function the caller should invoke when done testing to clean up. +func chainSetup(t *testing.T, params *chaincfg.Params) ( + *blockchain.BlockChain, func(), error) { + + db, teardown, err := dbSetup(t, params) + if err != nil { + return nil, nil, err + } + + // Copy the chain params to ensure any modifications the tests do to + // the chain parameters do not affect the global instance. + paramsCopy := *params + + // Deep-copy deployment starters/enders so that parallel tests don't + // race on the shared blockClock field written by SynchronizeClock. + for i := range paramsCopy.Deployments { + d := ¶msCopy.Deployments[i] + if s, ok := d.DeploymentStarter.(*chaincfg.MedianTimeDeploymentStarter); ok { + d.DeploymentStarter = chaincfg.NewMedianTimeDeploymentStarter( + s.StartTime()) + } + if e, ok := d.DeploymentEnder.(*chaincfg.MedianTimeDeploymentEnder); ok { + d.DeploymentEnder = chaincfg.NewMedianTimeDeploymentEnder( + e.EndTime()) + } + } + + // Create the main chain instance. + chain, err := blockchain.New(&blockchain.Config{ + DB: db, + Checkpoints: paramsCopy.Checkpoints, + ChainParams: ¶msCopy, + TimeSource: blockchain.NewMedianTime(), + SigCache: txscript.NewSigCache(1000), + }) + if err != nil { + teardown() + err := fmt.Errorf("failed to create chain instance: %v", err) + return nil, nil, err + } + return chain, teardown, nil +} + +// loadHeaders loads headers from mainnet from 1 to 11. +func loadHeaders(t *testing.T) []*wire.BlockHeader { + testFile := "blockheaders-mainnet-1-11.txt" + filename := filepath.Join("testdata/", testFile) + + file, err := os.Open(filename) + if err != nil { + t.Fatal(err) + } + + headers := make([]*wire.BlockHeader, 0, 10) + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + b, err := hex.DecodeString(line) + if err != nil { + t.Fatalf("failed to read block headers from file %v", testFile) + } + + r := bytes.NewReader(b) + header := new(wire.BlockHeader) + header.Deserialize(r) + + headers = append(headers, header) + } + + return headers +} + +func makeMockSyncManager(t *testing.T, + params *chaincfg.Params) (*SyncManager, func()) { + + t.Helper() + + chain, tearDown, err := chainSetup(t, params) + require.NoError(t, err) + + sm, err := New(&Config{ + PeerNotifier: noopPeerNotifier{}, + Chain: chain, + ChainParams: params, + }) + require.NoError(t, err) + + return sm, tearDown +} + +func TestCheckHeadersList(t *testing.T) { + // Set params to mainnet with a checkpoint at block 11. + params := chaincfg.MainNetParams + checkpointHeight := int32(11) + checkpointHash, err := chainhash.NewHashFromStr( + "0000000097be56d606cdd9c54b04d4747e957d3608abe69198c661f2add73073") + if err != nil { + t.Fatal(err) + } + params.Checkpoints = []chaincfg.Checkpoint{ + { + Height: checkpointHeight, + Hash: checkpointHash, + }, + } + + // Create mock SyncManager. + sm, tearDown := makeMockSyncManager(t, ¶ms) + defer tearDown() + + // Setup SyncManager with headers processed. + headers := loadHeaders(t) + for _, header := range headers { + isMainChain, err := sm.chain.ProcessBlockHeader( + header, blockchain.BFNone, false) + if err != nil { + t.Fatal(err) + } + + if !isMainChain { + t.Fatalf("expected block header %v to be in the main chain", + header.BlockHash()) + } + } + + tests := []struct { + hash string + isCheckpointBlock bool + behaviorFlags blockchain.BehaviorFlags + }{ + { + hash: chaincfg.MainNetParams.GenesisHash.String(), + isCheckpointBlock: false, + behaviorFlags: blockchain.BFFastAdd, + }, + { + // Block 10. + hash: "000000002c05cc2e78923c34df87fd108b22221ac6076c18f3ade378a4d915e9", + isCheckpointBlock: false, + behaviorFlags: blockchain.BFFastAdd, + }, + { + // Block 11. + hash: "0000000097be56d606cdd9c54b04d4747e957d3608abe69198c661f2add73073", + isCheckpointBlock: true, + behaviorFlags: blockchain.BFFastAdd, + }, + { + // Block 12. + hash: "0000000027c2488e2510d1acf4369787784fa20ee084c258b58d9fbd43802b5e", + isCheckpointBlock: false, + behaviorFlags: blockchain.BFNone, + }, + } + + for _, test := range tests { + hash, err := chainhash.NewHashFromStr(test.hash) + if err != nil { + t.Errorf("NewHashFromStr: %v", err) + continue + } + + // Make sure that when the ibd mode is off, we always get + // false and BFNone. + sm.ibdMode = false + isCheckpoint, gotFlags := sm.checkHeadersList(hash) + require.Equal(t, false, isCheckpoint) + require.Equal(t, blockchain.BFNone, gotFlags) + + // Now check that the test values are correct. + sm.ibdMode = true + isCheckpoint, gotFlags = sm.checkHeadersList(hash) + require.Equal(t, test.isCheckpointBlock, isCheckpoint) + require.Equal(t, test.behaviorFlags, gotFlags) + } +} + +func TestFetchHigherPeers(t *testing.T) { + // Create mock SyncManager. + sm, tearDown := makeMockSyncManager(t, &chaincfg.MainNetParams) + defer tearDown() + + tests := []struct { + peerHeights []int32 + peerSyncCandidate []bool + height int32 + expectedCnt int + }{ + { + peerHeights: []int32{9, 10, 10, 10}, + peerSyncCandidate: []bool{true, true, true, true}, + height: 5, + expectedCnt: 4, + }, + + { + peerHeights: []int32{9, 10, 10, 10}, + peerSyncCandidate: []bool{false, false, true, true}, + height: 5, + expectedCnt: 2, + }, + + { + peerHeights: []int32{1, 100, 100, 100, 100}, + peerSyncCandidate: []bool{true, false, true, true, false}, + height: 100, + expectedCnt: 0, + }, + } + + for _, test := range tests { + // Setup peers. + sm.peerStates = make(map[*peer.Peer]*peerSyncState) + for i, height := range test.peerHeights { + peer := peer.NewInboundPeer(&peer.Config{}) + peer.UpdateLastBlockHeight(height) + sm.peerStates[peer] = &peerSyncState{ + syncCandidate: test.peerSyncCandidate[i], + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + } + } + + // Fetch higher peers and assert. + peers := sm.fetchHigherPeers(test.height) + require.Equal(t, test.expectedCnt, len(peers)) + + for _, peer := range peers { + state, found := sm.peerStates[peer] + require.True(t, found) + require.True(t, state.syncCandidate) + } + } +} + +// mockTimeSource is used to trick the BlockChain instance to think that we're +// in the past. This is so that we can force it to return true for isCurrent(). +type mockTimeSource struct { + adjustedTime time.Time +} + +// AdjustedTime returns the internal adjustedTime. +// +// Part of the MedianTimeSource interface implementation. +func (m *mockTimeSource) AdjustedTime() time.Time { + return m.adjustedTime +} + +// AddTimeSample isn't relevant so we just leave it as emtpy. +// +// Part of the MedianTimeSource interface implementation. +func (m *mockTimeSource) AddTimeSample(id string, timeVal time.Time) { + // purposely left empty +} + +// Offset isn't relevant so we just return 0. +// +// Part of the MedianTimeSource interface implementation. +func (m *mockTimeSource) Offset() time.Duration { + return 0 +} + +// TestBuildBlockRequestSkipsInflightBlocks verifies that buildBlockRequest +// does not re-request blocks that are already in sm.requestedBlocks. When +// the pipeline refill threshold triggers fetchHeaderBlocks while blocks are +// still in-flight, re-requesting them causes the peer to send duplicates. +// The first copy gets processed (removing the hash from requestedBlocks), +// and the second copy then arrives as "unrequested", disconnecting the peer. +func TestBuildBlockRequestSkipsInflightBlocks(t *testing.T) { + tests := []struct { + name string + // inflightHeights are the block heights already in + // requestedBlocks before calling buildBlockRequest. + inflightHeights []int32 + // wantRequestedHeights are the block heights that should + // appear in the returned getdata message. + wantRequestedHeights []int32 + }{ + { + name: "no blocks in-flight requests all", + inflightHeights: nil, + wantRequestedHeights: []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + }, + { + name: "all blocks in-flight requests none", + inflightHeights: []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + wantRequestedHeights: nil, + }, + { + name: "first 5 in-flight requests remaining 6", + inflightHeights: []int32{1, 2, 3, 4, 5}, + wantRequestedHeights: []int32{6, 7, 8, 9, 10, 11}, + }, + { + name: "last 6 in-flight requests first 5", + inflightHeights: []int32{6, 7, 8, 9, 10, 11}, + wantRequestedHeights: []int32{1, 2, 3, 4, 5}, + }, + { + name: "scattered in-flight requests gaps", + inflightHeights: []int32{2, 4, 6, 8, 10}, + wantRequestedHeights: []int32{1, 3, 5, 7, 9, 11}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + params := chaincfg.MainNetParams + params.Checkpoints = nil + sm, tearDown := makeMockSyncManager(t, ¶ms) + defer tearDown() + + // Process headers 1-11 so the header chain is + // ahead of the block chain. + headers := loadHeaders(t) + for _, header := range headers { + _, err := sm.chain.ProcessBlockHeader( + header, blockchain.BFNone, false) + require.NoError(t, err) + } + + // Set up a disconnected peer as syncPeer. + syncPeer := peer.NewInboundPeer(&peer.Config{}) + sm.syncPeer = syncPeer + syncPeerState := &peerSyncState{ + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + } + sm.peerStates[syncPeer] = syncPeerState + + // Pre-populate in-flight blocks. + for _, h := range tc.inflightHeights { + hash, err := sm.chain.HeaderHashByHeight(h) + require.NoError(t, err) + sm.requestedBlocks[*hash] = struct{}{} + syncPeerState.requestedBlocks[*hash] = struct{}{} + } + + gdmsg := sm.buildBlockRequest(syncPeer) + + // Collect the hashes from the getdata message. + got := make(map[chainhash.Hash]struct{}, len(gdmsg.InvList)) + for _, iv := range gdmsg.InvList { + got[iv.Hash] = struct{}{} + } + + require.Equal(t, len(tc.wantRequestedHeights), len(gdmsg.InvList)) + for _, h := range tc.wantRequestedHeights { + hash, err := sm.chain.HeaderHashByHeight(h) + require.NoError(t, err) + require.Contains(t, got, *hash, + "block at height %d should be requested", h) + } + for _, h := range tc.inflightHeights { + hash, err := sm.chain.HeaderHashByHeight(h) + require.NoError(t, err) + require.NotContains(t, got, *hash, + "in-flight block at height %d should not be re-requested", h) + } + }) + } +} + +func TestIsInIBDMode(t *testing.T) { + tests := []struct { + peerState map[*peer.Peer]*peerSyncState + params *chaincfg.Params + timesource *mockTimeSource + isIBDMode bool + }{ + // Is not current, higher peers. + { + params: &chaincfg.MainNetParams, + peerState: func() map[*peer.Peer]*peerSyncState { + ps := make(map[*peer.Peer]*peerSyncState) + peer := peer.NewInboundPeer(&peer.Config{}) + peer.UpdateLastBlockHeight(900_000) + ps[peer] = &peerSyncState{ + syncCandidate: true, + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + } + return ps + }(), + timesource: nil, + isIBDMode: true, + }, + // Is not current, no higher peers. + { + params: &chaincfg.MainNetParams, + peerState: func() map[*peer.Peer]*peerSyncState { + ps := make(map[*peer.Peer]*peerSyncState) + peer := peer.NewInboundPeer(&peer.Config{}) + peer.UpdateLastBlockHeight(0) + ps[peer] = &peerSyncState{ + syncCandidate: true, + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + } + return ps + }(), + timesource: nil, + isIBDMode: true, + }, + // Is current, higher peers. + { + params: func() *chaincfg.Params { + params := chaincfg.MainNetParams + params.Checkpoints = nil + return ¶ms + }(), + peerState: func() map[*peer.Peer]*peerSyncState { + ps := make(map[*peer.Peer]*peerSyncState) + peer := peer.NewInboundPeer(&peer.Config{}) + peer.UpdateLastBlockHeight(900_000) + ps[peer] = &peerSyncState{ + syncCandidate: true, + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + } + return ps + }(), + timesource: &mockTimeSource{ + chaincfg.MainNetParams.GenesisBlock.Header.Timestamp, + }, + isIBDMode: true, + }, + // Is current, no higher peers. + { + params: func() *chaincfg.Params { + params := chaincfg.MainNetParams + params.Checkpoints = nil + return ¶ms + }(), + peerState: func() map[*peer.Peer]*peerSyncState { + ps := make(map[*peer.Peer]*peerSyncState) + peer := peer.NewInboundPeer(&peer.Config{}) + peer.UpdateLastBlockHeight(0) + ps[peer] = &peerSyncState{ + syncCandidate: true, + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + } + return ps + }(), + timesource: &mockTimeSource{ + chaincfg.MainNetParams.GenesisBlock.Header.Timestamp, + }, + isIBDMode: false, + }, + } + + for _, test := range tests { + db, tearDown, err := dbSetup(t, test.params) + if err != nil { + tearDown() + t.Fatal(err) + } + + timesource := blockchain.NewMedianTime() + if test.timesource != nil { + timesource = test.timesource + } + + // Create the main chain instance. + chain, err := blockchain.New(&blockchain.Config{ + DB: db, + Checkpoints: test.params.Checkpoints, + ChainParams: test.params, + TimeSource: timesource, + SigCache: txscript.NewSigCache(1000), + }) + if err != nil { + tearDown() + t.Fatal(err) + } + sm, err := New(&Config{ + Chain: chain, + ChainParams: test.params, + }) + if err != nil { + tearDown() + t.Fatal(err) + } + + // Run test and assert. + sm.peerStates = test.peerState + got := sm.isInIBDMode() + require.Equal(t, test.isIBDMode, got) + tearDown() + } +} + +// createTestCoinbase creates a minimal coinbase transaction for the given +// block height. The signature script encodes the height to ensure unique +// transaction hashes across blocks. +func createTestCoinbase(height int32, params *chaincfg.Params) *wire.MsgTx { + tx := wire.NewMsgTx(wire.TxVersion) + + // Push the height as data to guarantee unique txids per block. + sigScript := []byte{ + 0x04, + byte(height), byte(height >> 8), + byte(height >> 16), byte(height >> 24), + } + + tx.AddTxIn(&wire.TxIn{ + PreviousOutPoint: wire.OutPoint{ + Hash: chainhash.Hash{}, + Index: wire.MaxPrevOutIndex, + }, + SignatureScript: sigScript, + Sequence: wire.MaxTxInSequenceNum, + }) + + tx.AddTxOut(&wire.TxOut{ + Value: blockchain.CalcBlockSubsidy(height, params), + PkScript: []byte{txscript.OP_TRUE}, + }) + + return tx +} + +// solveTestBlock finds a nonce that satisfies the proof of work for the given +// header. With regression test parameters the difficulty is minimal and a +// solution is found almost immediately. +func solveTestBlock(header *wire.BlockHeader, params *chaincfg.Params) bool { + target := blockchain.CompactToBig(params.PowLimitBits) + for nonce := uint32(0); nonce < math.MaxUint32; nonce++ { + header.Nonce = nonce + hash := header.BlockHash() + if blockchain.HashToBig(&hash).Cmp(target) <= 0 { + return true + } + } + + return false +} + +// generateTestBlocks creates count valid blocks chaining from the genesis +// block of the given params. Each block contains only a coinbase transaction. +func generateTestBlocks( + t *testing.T, params *chaincfg.Params, count int) []*btcutil.Block { + + t.Helper() + + blocks := make([]*btcutil.Block, 0, count) + prevHash := params.GenesisHash + prevTime := params.GenesisBlock.Header.Timestamp + + for h := int32(1); h <= int32(count); h++ { + cb := createTestCoinbase(h, params) + merkleRoot := cb.TxHash() + + header := wire.BlockHeader{ + Version: 1, + PrevBlock: *prevHash, + MerkleRoot: merkleRoot, + Timestamp: prevTime.Add(time.Minute), + Bits: params.PowLimitBits, + } + require.True(t, solveTestBlock(&header, params), + "failed to solve block at height %d", h) + + msgBlock := &wire.MsgBlock{ + Header: header, + Transactions: []*wire.MsgTx{cb}, + } + block := btcutil.NewBlock(msgBlock) + blocks = append(blocks, block) + + bh := block.Hash() + prevHash = bh + prevTime = header.Timestamp + } + + return blocks +} + +// TestSyncStateMachine exercises the end-to-end IBD sync flow: +// +// ┌→ startSync +// │ ↓ +// │ fetchHeaders +// │ ↓ +// │ handleHeadersMsg +// │ ↓ +// │ fetchHeaderBlocks ←┐ +// │ ↓ │ (refill) +// │ handleBlockMsg ────┘──→ IBD complete +// │ +// │ (stall detected at any phase above) +// │ ↓ +// │ handleStallSample +// │ ↓ +// └── handleDonePeerMsg +// +// It verifies that header processing transitions to block download, that the +// pipeline refill path in handleBlockMsg is exercised, and that IBD mode is +// properly cleared once the chain catches up to the best header. +// +// The "fresh ibd" case tests a complete sync from genesis: headers are fetched +// and then blocks are downloaded. +// +// The "stall before any headers" and "stall mid header download" cases test +// recovery when the sync peer stalls during header download. A replacement +// peer delivers the remaining (or all) headers and then all blocks. +// +// The "headers complete peer stalls on blocks" case tests recovery when the +// sync peer delivers all headers but stalls before sending any blocks; a +// replacement peer downloads all blocks. +// +// The "stalled sync peer recovery" case tests recovery mid-block-download: a +// sync peer stops responding after some blocks, handleStallSample detects the +// inactivity, the stalled peer is disconnected, and a replacement peer +// finishes IBD. +// +// The "stall mid headers then stall on blocks" case combines both failure +// modes: one peer stalls during headers (peer 2 takes over and finishes +// headers), then peer 2 stalls during block download (peer 3 finishes blocks). +// This exercises recovery across three distinct peers. +func TestSyncStateMachine(t *testing.T) { + t.Parallel() + + const testTotalBlocks = 2 * minInFlightBlocks + + tests := []struct { + name string + totalBlocks int + + // stallHeadersAfter, when >= 0, triggers a stall during + // header download: deliver this many headers, then stall + // the sync peer and verify a replacement finishes header + // download. Set to -1 for no header stall. + stallHeadersAfter int + + // stallAfter, when >= 0, triggers a stall during block + // download: deliver all headers, then process this many + // blocks before stalling. Set to -1 for no block stall. + stallAfter int + }{ + { + name: "fresh ibd", + totalBlocks: testTotalBlocks, + stallHeadersAfter: -1, + stallAfter: -1, + }, + { + name: "stall before any headers", + totalBlocks: testTotalBlocks, + stallHeadersAfter: 0, + stallAfter: -1, + }, + { + name: "stall mid header download", + totalBlocks: testTotalBlocks, + stallHeadersAfter: testTotalBlocks / 2, + stallAfter: -1, + }, + { + name: "headers complete peer stalls on blocks", + totalBlocks: testTotalBlocks, + stallHeadersAfter: -1, + stallAfter: 0, + }, + { + name: "stalled sync peer recovery", + totalBlocks: testTotalBlocks, + stallHeadersAfter: -1, + stallAfter: 5, + }, + { + name: "stall mid headers then stall on blocks", + totalBlocks: testTotalBlocks, + stallHeadersAfter: testTotalBlocks / 2, + stallAfter: 5, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + params := chaincfg.RegressionNetParams + params.Checkpoints = nil + + sm, tearDown := makeMockSyncManager(t, ¶ms) + defer tearDown() + + blocks := generateTestBlocks(t, ¶ms, tc.totalBlocks) + + // Register a sync candidate and call startSync, + // which activates IBD mode and sends getheaders. + peer1 := startIBD(t, sm, tc.totalBlocks) + + if tc.stallHeadersAfter >= 0 { + // Stall during header download; + // replacement sends remaining headers. + peer2 := newSyncCandidate(t, sm, + int32(tc.totalBlocks)) + syncStalledHeaderRecovery( + t, sm, peer1, peer2, + blocks, tc.stallHeadersAfter, + tc.totalBlocks, + ) + + if tc.stallAfter >= 0 { + peer3 := newSyncCandidate(t, sm, + int32(tc.totalBlocks)) + syncStalledPeerRecovery( + t, sm, peer2, + peer3, blocks, + tc.stallAfter, + tc.totalBlocks, + ) + } else { + syncProcessBlocks(t, sm, + peer2, blocks, + tc.totalBlocks) + } + } else { + syncSendHeaders(t, sm, peer1, + blocks, tc.totalBlocks) + + if tc.stallAfter >= 0 { + peer2 := newSyncCandidate(t, sm, + int32(tc.totalBlocks)) + syncStalledPeerRecovery( + t, sm, peer1, + peer2, blocks, + tc.stallAfter, + tc.totalBlocks, + ) + } else { + syncProcessBlocks(t, sm, + peer1, blocks, + tc.totalBlocks) + } + } + }) + } +} + +// newSyncCandidate creates and registers a sync-candidate peer at the +// given height without triggering startSync. +func newSyncCandidate(t *testing.T, sm *SyncManager, + height int32) *peer.Peer { + + t.Helper() + + p := peer.NewInboundPeer(&peer.Config{ + ChainParams: sm.chainParams, + }) + p.UpdateLastBlockHeight(height) + sm.peerStates[p] = &peerSyncState{ + syncCandidate: true, + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + } + return p +} + +// assertIBDComplete verifies that IBD finished: chain height matches +// totalBlocks, ibdMode is off, and no blocks remain in-flight. +func assertIBDComplete(t *testing.T, sm *SyncManager, + peerState *peerSyncState, totalBlocks int) { + + t.Helper() + + best := sm.chain.BestSnapshot() + require.Equal(t, int32(totalBlocks), best.Height) + require.False(t, sm.ibdMode, + "ibdMode should be off after catching up") + require.Empty(t, sm.requestedBlocks, + "all requested blocks should be fulfilled") + require.Empty(t, peerState.requestedBlocks, + "peer should have no outstanding block requests") +} + +// startIBD registers a sync peer and calls startSync, verifying that IBD +// mode is activated and the peer is selected. +func startIBD(t *testing.T, sm *SyncManager, + peerHeight int) *peer.Peer { + + t.Helper() + + syncPeer := newSyncCandidate(t, sm, int32(peerHeight)) + + sm.startSync() + + require.True(t, sm.syncPeer == syncPeer, + "syncPeer should be set after startSync") + require.True(t, sm.ibdMode, "ibdMode should be on") + require.False(t, sm.lastProgressTime.IsZero(), + "lastProgressTime should be set") + + return syncPeer +} + +// syncSendHeaders delivers block headers to the sync manager and verifies +// that block requests are generated. +func syncSendHeaders(t *testing.T, sm *SyncManager, + syncPeer *peer.Peer, blocks []*btcutil.Block, totalBlocks int) { + + t.Helper() + + // Record the progress time set by startIBD so we can verify + // that handleHeadersMsg advances it. + progressBefore := sm.lastProgressTime + + headers := wire.NewMsgHeaders() + for _, block := range blocks { + err := headers.AddBlockHeader(&block.MsgBlock().Header) + require.NoError(t, err) + } + + sm.handleHeadersMsg(&headersMsg{ + headers: headers, + peer: syncPeer, + }) + + _, bestHeaderHeight := sm.chain.BestHeader() + require.Equal(t, int32(totalBlocks), bestHeaderHeight) + + require.True(t, sm.lastProgressTime.After(progressBefore), + "handleHeadersMsg should update lastProgressTime") + + wantRequested := make(map[chainhash.Hash]struct{}, len(blocks)) + for _, block := range blocks { + wantRequested[*block.Hash()] = struct{}{} + } + require.Equal(t, wantRequested, sm.requestedBlocks) + require.Equal(t, wantRequested, sm.peerStates[syncPeer].requestedBlocks) +} + +// syncProcessBlocks feeds all blocks to handleBlockMsg and verifies that IBD +// mode remains active until the final block, at which point IBD completes. +func syncProcessBlocks(t *testing.T, sm *SyncManager, syncPeer *peer.Peer, + blocks []*btcutil.Block, totalBlocks int) { + + t.Helper() + + peerState := sm.peerStates[syncPeer] + + for i, block := range blocks { + sm.handleBlockMsg(&blockMsg{ + block: block, + peer: syncPeer, + reply: make(chan struct{}, 1), + }) + + if i < len(blocks)-1 { + require.True(t, sm.ibdMode, + "ibdMode should still be on at height %d", i+1) + } + } + + assertIBDComplete(t, sm, peerState, totalBlocks) +} + +// syncStalledPeerRecovery processes stallAfter blocks from stalledPeer, +// triggers stall detection, verifies that stalledPeer is removed and +// replacementPeer takes over, then feeds remaining blocks and verifies +// IBD completes. +func syncStalledPeerRecovery(t *testing.T, sm *SyncManager, + stalledPeer, replacementPeer *peer.Peer, + blocks []*btcutil.Block, stallAfter, totalBlocks int) { + + t.Helper() + + // Process the first stallAfter blocks from the stalled peer. + for _, block := range blocks[:stallAfter] { + sm.handleBlockMsg(&blockMsg{ + block: block, + peer: stalledPeer, + reply: make(chan struct{}, 1), + }) + } + + best := sm.chain.BestSnapshot() + require.Equal(t, int32(stallAfter), best.Height) + require.True(t, sm.ibdMode) + + // Trigger stall detection. + sm.lastProgressTime = time.Now().Add( + -(maxStallDuration + time.Minute)) + sm.handleStallSample() + + // Verify that handleStallSample called Disconnect() on the + // stalled peer (which closes p.quit, making WaitForDisconnect + // return immediately). + disconnected := make(chan struct{}) + go func() { + stalledPeer.WaitForDisconnect() + close(disconnected) + }() + select { + case <-disconnected: + case <-time.After(time.Second): + t.Fatal("Disconnect() was not called on stalled peer") + } + + // Snapshot the stalled peer's outstanding requested blocks before + // disconnection so we can verify they are cleaned up. + stalledState := sm.peerStates[stalledPeer] + stalledRequested := make([]chainhash.Hash, 0, len(stalledState.requestedBlocks)) + for hash := range stalledState.requestedBlocks { + stalledRequested = append(stalledRequested, hash) + } + require.NotEmpty(t, stalledRequested, + "stalled peer should have outstanding requested blocks") + + // In production, Disconnect() triggers handleDonePeerMsg + // asynchronously via the peer goroutine. Call it directly to + // complete the removal. Note: handleDonePeerMsg first clears the + // stalled peer's requested blocks from the global map via + // clearRequestedState, then updateSyncPeer → startSync immediately + // re-requests them for the replacement peer. + sm.handleDonePeerMsg(stalledPeer) + + _, stalledTracked := sm.peerStates[stalledPeer] + require.False(t, stalledTracked, + "stalled peer should be removed") + require.True(t, sm.syncPeer == replacementPeer, + "replacement peer should take over as sync peer") + require.True(t, sm.ibdMode) + + // Verify that the replacement peer re-requested the exact same + // blocks that were outstanding from the stalled peer. + replacementState := sm.peerStates[replacementPeer] + require.Equal(t, len(stalledRequested), + len(replacementState.requestedBlocks), + "replacement peer should request same number of blocks") + for _, hash := range stalledRequested { + _, exists := replacementState.requestedBlocks[hash] + require.True(t, exists, + "block %v should be requested from replacement peer", + hash) + } + + // Feed remaining blocks from the replacement peer. + for _, block := range blocks[stallAfter:] { + sm.handleBlockMsg(&blockMsg{ + block: block, + peer: replacementPeer, + reply: make(chan struct{}, 1), + }) + } + + assertIBDComplete(t, sm, replacementState, totalBlocks) +} + +// syncStalledHeaderRecovery simulates a stall during header download. +// It optionally delivers headersSent headers from stalledPeer, triggers stall +// detection, verifies that stalledPeer is removed and replacementPeer takes +// over, then delivers remaining headers and verifies block requests are +// generated. The caller is responsible for the block-download phase. +func syncStalledHeaderRecovery(t *testing.T, sm *SyncManager, + stalledPeer, replacementPeer *peer.Peer, + blocks []*btcutil.Block, headersSent, totalBlocks int) { + + t.Helper() + + // Deliver partial headers from the stalled peer. When + // headersSent is 0, this is a no-op (peer stalls immediately). + if headersSent > 0 { + headers := wire.NewMsgHeaders() + for _, block := range blocks[:headersSent] { + err := headers.AddBlockHeader( + &block.MsgBlock().Header) + require.NoError(t, err) + } + + sm.handleHeadersMsg(&headersMsg{ + headers: headers, + peer: stalledPeer, + }) + + _, bestHeaderHeight := sm.chain.BestHeader() + require.Equal(t, int32(headersSent), bestHeaderHeight) + } + + // No blocks should have been requested during header download + // since the headers haven't caught up to the peer's height yet. + require.Empty(t, sm.requestedBlocks, + "no blocks should be requested during header download") + + // Trigger stall detection. + sm.lastProgressTime = time.Now().Add( + -(maxStallDuration + time.Minute)) + sm.handleStallSample() + + // Verify that handleStallSample called Disconnect() on the + // stalled peer. + disconnected := make(chan struct{}) + go func() { + stalledPeer.WaitForDisconnect() + close(disconnected) + }() + select { + case <-disconnected: + case <-time.After(time.Second): + t.Fatal("Disconnect() was not called on stalled peer") + } + + // Complete peer removal. handleDonePeerMsg clears state and + // triggers startSync which selects the replacement peer. + sm.handleDonePeerMsg(stalledPeer) + + _, stalledTracked := sm.peerStates[stalledPeer] + require.False(t, stalledTracked, + "stalled peer should be removed") + require.True(t, sm.syncPeer == replacementPeer, + "replacement peer should take over as sync peer") + require.True(t, sm.ibdMode) + + // Deliver remaining headers from the replacement peer. When + // headersSent is 0, this is all headers. + remainingHeaders := wire.NewMsgHeaders() + for _, block := range blocks[headersSent:] { + err := remainingHeaders.AddBlockHeader( + &block.MsgBlock().Header) + require.NoError(t, err) + } + sm.handleHeadersMsg(&headersMsg{ + headers: remainingHeaders, + peer: replacementPeer, + }) + + _, bestHeaderHeight := sm.chain.BestHeader() + require.Equal(t, int32(totalBlocks), bestHeaderHeight) + + // Verify all blocks were requested from the replacement. + wantRequested := make(map[chainhash.Hash]struct{}, len(blocks)) + for _, block := range blocks { + wantRequested[*block.Hash()] = struct{}{} + } + require.Equal(t, wantRequested, sm.requestedBlocks) + replacementState := sm.peerStates[replacementPeer] + require.Equal(t, wantRequested, replacementState.requestedBlocks) +} + +// TestStartSyncBlockFallback verifies the startSync fallback path where +// headers are already caught up but the block chain lags behind. In this +// case startSync should skip header download and directly request blocks. +func TestStartSyncBlockFallback(t *testing.T) { + t.Parallel() + + params := chaincfg.RegressionNetParams + params.Checkpoints = nil + + sm, tearDown := makeMockSyncManager(t, ¶ms) + defer tearDown() + + // Process headers so the header chain is at numBlocks while the + // block chain stays at genesis. + const numBlocks = 11 + blocks := generateTestBlocks(t, ¶ms, numBlocks) + for _, block := range blocks { + _, err := sm.chain.ProcessBlockHeader( + &block.MsgBlock().Header, blockchain.BFNone, false) + require.NoError(t, err) + } + + // Add a peer whose height equals the header height. + // fetchHigherPeers(bestHeaderHeight) returns nothing because + // the peer is not strictly higher than our headers. + // fetchHigherPeers(bestBlockHeight=0) returns the peer. + syncPeer := peer.NewInboundPeer(&peer.Config{}) + syncPeer.UpdateLastBlockHeight(int32(numBlocks)) + sm.peerStates[syncPeer] = &peerSyncState{ + syncCandidate: true, + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + } + + sm.startSync() + + require.NotNil(t, sm.syncPeer, + "sync peer should be set for block download") + require.NotEmpty(t, sm.requestedBlocks, + "blocks should be requested via fetchHeaderBlocks") +} + +// TestStallNoDisconnectAtSameHeight verifies that handleStallSample does +// not disconnect a sync peer whose advertised height equals our own. +func TestStallNoDisconnectAtSameHeight(t *testing.T) { + t.Parallel() + + params := chaincfg.RegressionNetParams + params.Checkpoints = nil + + sm, tearDown := makeMockSyncManager(t, ¶ms) + defer tearDown() + + p := peer.NewInboundPeer(&peer.Config{}) + p.UpdateLastBlockHeight(0) // Same height as our genesis chain. + sm.peerStates[p] = &peerSyncState{} + sm.syncPeer = p + sm.ibdMode = true + sm.lastProgressTime = time.Now().Add( + -(maxStallDuration + time.Minute)) + + sm.handleStallSample() + + _, tracked := sm.peerStates[p] + require.True(t, tracked, + "peer at same height should not be disconnected") + require.Nil(t, sm.syncPeer, + "we should have nil syncPeer after handleStallSample") +} + +// TestStartSyncChainCurrent verifies that startSync does not set syncPeer +// or ibdMode when the chain is current and no peer is strictly higher. +// isInIBDMode sees IsCurrent()==true with no higher peers, returns false, +// and startSync exits immediately. +func TestStartSyncChainCurrent(t *testing.T) { + t.Parallel() + + params := chaincfg.RegressionNetParams + params.Checkpoints = nil + + sm, tearDown := makeMockSyncManager(t, ¶ms) + defer tearDown() + + // Mine a single block with a recent timestamp so + // IsCurrent() returns true. + cb := createTestCoinbase(1, ¶ms) + header := wire.BlockHeader{ + Version: 1, + PrevBlock: *params.GenesisHash, + MerkleRoot: cb.TxHash(), + Timestamp: time.Now().Truncate(time.Second), + Bits: params.PowLimitBits, + } + require.True(t, solveTestBlock(&header, ¶ms)) + + block := btcutil.NewBlock(&wire.MsgBlock{ + Header: header, + Transactions: []*wire.MsgTx{cb}, + }) + _, _, err := sm.chain.ProcessBlock(block, blockchain.BFNone) + require.NoError(t, err) + require.True(t, sm.chain.IsCurrent()) + + // Peer at our height — not higher. + newSyncCandidate(t, sm, 1) + + sm.startSync() + + require.Nil(t, sm.syncPeer, + "syncPeer should not be set when chain is already current") + require.False(t, sm.ibdMode, + "ibdMode should not be activated when chain is already current") +} diff --git a/netsync/testdata/blockheaders-mainnet-1-11.txt b/netsync/testdata/blockheaders-mainnet-1-11.txt new file mode 100644 index 0000000000..27b5f18cfc --- /dev/null +++ b/netsync/testdata/blockheaders-mainnet-1-11.txt @@ -0,0 +1,11 @@ +010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e36299 +010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd61 +01000000bddd99ccfda39da1b108ce1a5d70038d0a967bacb68b6b63065f626a0000000044f672226090d85db9a9f2fbfe5f0f9609b387af7be5b7fbb7a1767c831c9e995dbe6649ffff001d05e0ed6d +010000004944469562ae1c2c74d9a535e00b6f3e40ffbad4f2fda3895501b582000000007a06ea98cd40ba2e3288262b28638cec5337c1456aaf5eedc8e9e5a20f062bdf8cc16649ffff001d2bfee0a9 +0100000085144a84488ea88d221c8bd6c059da090e88f8a2c99690ee55dbba4e00000000e11c48fecdd9e72510ca84f023370c9a38bf91ac5cae88019bee94d24528526344c36649ffff001d1d03e477 +01000000fc33f596f822a0a1951ffdbf2a897b095636ad871707bf5d3162729b00000000379dfb96a5ea8c81700ea4ac6b97ae9a9312b2d4301a29580e924ee6761a2520adc46649ffff001d189c4c97 +010000008d778fdc15a2d3fb76b7122a3b5582bea4f21f5a0c693537e7a03130000000003f674005103b42f984169c7d008370967e91920a6a5d64fd51282f75bc73a68af1c66649ffff001d39a59c86 +010000004494c8cf4154bdcc0720cd4a59d9c9b285e4b146d45f061d2b6c967100000000e3855ed886605b6d4a99d5fa2ef2e9b0b164e63df3c4136bebf2d0dac0f1f7a667c86649ffff001d1c4b5666 +01000000c60ddef1b7618ca2348a46e868afc26e3efc68226c78aa47f8488c4000000000c997a5e56e104102fa209c6a852dd90660a20b2d9c352423edce25857fcd37047fca6649ffff001d28404f53 +010000000508085c47cc849eb80ea905cc7800a3be674ffc57263cf210c59d8d00000000112ba175a1e04b14ba9e7ea5f76ab640affeef5ec98173ac9799a852fa39add320cd6649ffff001d1e2de565 +01000000e915d9a478e3adf3186c07c61a22228b10fd87df343c92782ecc052c000000006e06373c80de397406dc3d19c90d71d230058d28293614ea58d6a57f8f5d32f8b8ce6649ffff001d173807f8