diff --git a/vms/evm/database/blockdb/database.go b/vms/evm/database/blockdb/database.go new file mode 100644 index 000000000000..c5f54296c9ac --- /dev/null +++ b/vms/evm/database/blockdb/database.go @@ -0,0 +1,516 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "path/filepath" + + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/rlp" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/heightindexdb/meterdb" + "github.com/ava-labs/avalanchego/database/prefixdb" + "github.com/ava-labs/avalanchego/utils/logging" + + heightindexdb "github.com/ava-labs/avalanchego/x/blockdb" +) + +var ( + _ ethdb.Database = (*Database)(nil) + _ ethdb.Batch = (*batch)(nil) + + migratorDBPrefix = []byte("migrator") + + // blockDBMinHeightKey stores the minimum block height of the + // height-indexed block databases. + // It is set at initialization and cannot be changed without + // recreating the databases. + blockDBMinHeightKey = []byte("blockdb_min_height") + + errUnexpectedKey = errors.New("unexpected database key") + errNotInitialized = errors.New("database not initialized") + errAlreadyInitialized = errors.New("database already initialized") +) + +// Key prefixes for block data in [ethdb.Database]. +// This is copied from libevm because they are not exported. +// Since the prefixes should never be changed, we can avoid libevm changes by +// duplicating them here. +var ( + evmHeaderPrefix = []byte("h") + evmBlockBodyPrefix = []byte("b") + evmReceiptsPrefix = []byte("r") +) + +const ( + hashDataElements = 2 + blockNumberSize = 8 + blockHashSize = 32 + + headerDBName = "headerdb" + bodyDBName = "bodydb" + receiptsDBName = "receiptsdb" +) + +// Database wraps an [ethdb.Database] and routes block headers, bodies, and receipts +// to separate [database.HeightIndex] databases for blocks at or above the minimum height. +// All other data uses the underlying [ethdb.Database] directly. +type Database struct { + ethdb.Database + + // Databases + stateDB database.Database + headerDB database.HeightIndex + bodyDB database.HeightIndex + receiptsDB database.HeightIndex + + // Configuration + config heightindexdb.DatabaseConfig + dbPath string + minHeight uint64 + + migrator *migrator + heightDBsReady bool + + reg prometheus.Registerer + logger logging.Logger +} + +// New creates a new [Database] over the provided [ethdb.Database]. +// +// If allowDeferredInit is true and no minimum block height is known, +// New defers initializing the height-indexed block databases until +// [Database.InitBlockDBs] is called. +// +// The second return value is true if the block databases were initialized, +// and false if deferred. +func New( + stateDB database.Database, + evmDB ethdb.Database, + dbPath string, + allowDeferredInit bool, + config heightindexdb.DatabaseConfig, + logger logging.Logger, + reg prometheus.Registerer, +) (*Database, bool, error) { + db := &Database{ + stateDB: stateDB, + Database: evmDB, + dbPath: dbPath, + config: config, + reg: reg, + logger: logger, + } + + minHeight, ok, err := databaseMinHeight(db.stateDB) + if err != nil { + return nil, false, err + } + + // Databases already exist, load with existing min height. 
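+	// The remaining initialization paths below, in order, are: use the lowest
+	// migratable block height found in evmDB, fall back to a minimum height of 1,
+	// or defer initialization entirely when allowDeferredInit is set.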
+ if ok { + if err := db.InitBlockDBs(minHeight); err != nil { + return nil, false, err + } + return db, true, nil + } + + // Initialize using the minimum block height of existing blocks to migrate. + minHeight, ok, err = minBlockHeightToMigrate(evmDB) + if err != nil { + return nil, false, err + } + if ok { + if err := db.InitBlockDBs(minHeight); err != nil { + return nil, false, err + } + return db, true, nil + } + + // Initialize with min height 1 if deferred initialization is not allowed + // and no blocks exist to migrate. + if !allowDeferredInit { + if err := db.InitBlockDBs(1); err != nil { + return nil, false, err + } + return db, true, nil + } + + db.logger.Info( + "Deferring block database initialization until minimum height is known", + ) + return db, false, nil +} + +// InitBlockDBs initializes [database.HeightIndex] databases for blocks +// with the specified minimum height. +// Once initialized, the minimum height cannot be changed without recreating the +// databases. +// +// Returns an error if already initialized. +func (db *Database) InitBlockDBs(minHeight uint64) error { + if db.heightDBsReady { + return errAlreadyInitialized + } + + if err := db.stateDB.Put(blockDBMinHeightKey, encodeBlockNumber(minHeight)); err != nil { + return err + } + headerDB, err := db.newMeteredHeightDB(headerDBName, minHeight) + if err != nil { + return err + } + bodyDB, err := db.newMeteredHeightDB(bodyDBName, minHeight) + if err != nil { + return err + } + receiptsDB, err := db.newMeteredHeightDB(receiptsDBName, minHeight) + if err != nil { + return err + } + db.headerDB = headerDB + db.bodyDB = bodyDB + db.receiptsDB = receiptsDB + + if err := db.initMigrator(); err != nil { + return fmt.Errorf("failed to initialize migrator: %w", err) + } + + db.heightDBsReady = true + db.minHeight = minHeight + + db.logger.Info( + "Initialized height-indexed block databases", + zap.Uint64("minHeight", db.minHeight), + ) + + return nil +} + +// StartMigration begins the background migration of block data from the +// [ethdb.Database] to the height-indexed block databases. +// +// Returns an error if the databases are not initialized. +// No error if already running. +func (db *Database) StartMigration() error { + if !db.heightDBsReady { + return errNotInitialized + } + db.migrator.start() + return nil +} + +func (db *Database) Put(key []byte, value []byte) error { + if !db.useHeightIndexedDB(key) { + return db.Database.Put(key, value) + } + + heightDB, err := db.heightDBForKey(key) + if err != nil { + return err + } + num, hash, err := parseBlockKey(key) + if err != nil { + return err + } + return writeHashAndData(heightDB, num, hash, value) +} + +func (db *Database) Get(key []byte) ([]byte, error) { + if !db.useHeightIndexedDB(key) { + return db.Database.Get(key) + } + + heightDB, err := db.heightDBForKey(key) + if err != nil { + return nil, err + } + return readHashAndData(heightDB, db.Database, key, db.migrator) +} + +func (db *Database) Has(key []byte) (bool, error) { + if !db.useHeightIndexedDB(key) { + return db.Database.Has(key) + } + + _, err := db.Get(key) + if err != nil { + if errors.Is(err, database.ErrNotFound) { + return false, nil + } + return false, err + } + return true, nil +} + +// Delete removes the key from the underlying database for non-block data. +// Block data deletion is a no-op because [database.HeightIndex] does not support deletion. 
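+// As a result, helpers such as rawdb.DeleteBlock leave header, body, and receipt
+// data at or above the minimum height in place; only the non-block keys they touch
+// are removed from the underlying database.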
+func (db *Database) Delete(key []byte) error { + if !db.useHeightIndexedDB(key) { + return db.Database.Delete(key) + } + + num, hash, err := parseBlockKey(key) + if err != nil { + return err + } + db.logger.Warn( + "Deleting block data is a no-op", + zap.Uint64("height", num), + zap.Stringer("hash", hash), + ) + return nil +} + +func (db *Database) Close() error { + if db.migrator != nil { + db.migrator.stop() + } + if db.heightDBsReady { + // Don't close stateDB since the caller should be managing it. + return errors.Join( + db.headerDB.Close(), + db.bodyDB.Close(), + db.receiptsDB.Close(), + db.Database.Close(), + ) + } + return db.Database.Close() +} + +func (db *Database) initMigrator() error { + if db.migrator != nil { + return nil + } + mdb := prefixdb.New(migratorDBPrefix, db.stateDB) + migrator, err := newMigrator( + mdb, + db.headerDB, + db.bodyDB, + db.receiptsDB, + db.Database, + db.logger, + ) + if err != nil { + return err + } + db.migrator = migrator + return nil +} + +func (db *Database) newMeteredHeightDB( + namespace string, + minHeight uint64, +) (database.HeightIndex, error) { + path := filepath.Join(db.dbPath, namespace) + config := db.config.WithDir(path).WithMinimumHeight(minHeight) + ndb, err := heightindexdb.New(config, db.logger) + if err != nil { + return nil, fmt.Errorf("failed to create %s database at %s: %w", namespace, path, err) + } + + mdb, err := meterdb.New(db.reg, namespace, ndb) + if err != nil { + return nil, errors.Join( + fmt.Errorf("failed to create metered %s database: %w", namespace, err), + ndb.Close(), + ) + } + + return mdb, nil +} + +func (db *Database) heightDBForKey(key []byte) (database.HeightIndex, error) { + switch { + case isHeaderKey(key): + return db.headerDB, nil + case isBodyKey(key): + return db.bodyDB, nil + case isReceiptsKey(key): + return db.receiptsDB, nil + default: + return nil, errUnexpectedKey + } +} + +func (db *Database) useHeightIndexedDB(key []byte) bool { + if !db.heightDBsReady { + return false + } + + var n int + switch { + case isBodyKey(key): + n = len(evmBlockBodyPrefix) + case isHeaderKey(key): + n = len(evmHeaderPrefix) + case isReceiptsKey(key): + n = len(evmReceiptsPrefix) + default: + return false + } + num := binary.BigEndian.Uint64(key[n : n+blockNumberSize]) + return num >= db.minHeight +} + +type batch struct { + ethdb.Batch + db *Database +} + +func (db *Database) NewBatch() ethdb.Batch { + return &batch{ + db: db, + Batch: db.Database.NewBatch(), + } +} + +func (db *Database) NewBatchWithSize(size int) ethdb.Batch { + return &batch{ + db: db, + Batch: db.Database.NewBatchWithSize(size), + } +} + +func (b *batch) Put(key []byte, value []byte) error { + if !b.db.useHeightIndexedDB(key) { + return b.Batch.Put(key, value) + } + return b.db.Put(key, value) +} + +func (b *batch) Delete(key []byte) error { + if !b.db.useHeightIndexedDB(key) { + return b.Batch.Delete(key) + } + return b.db.Delete(key) +} + +func parseBlockKey(key []byte) (num uint64, hash common.Hash, err error) { + var n int + switch { + case isBodyKey(key): + n = len(evmBlockBodyPrefix) + case isHeaderKey(key): + n = len(evmHeaderPrefix) + case isReceiptsKey(key): + n = len(evmReceiptsPrefix) + default: + return 0, common.Hash{}, errUnexpectedKey + } + num = binary.BigEndian.Uint64(key[n : n+blockNumberSize]) + hash = common.BytesToHash(key[n+blockNumberSize:]) + return num, hash, nil +} + +func encodeBlockNumber(number uint64) []byte { + enc := make([]byte, blockNumberSize) + binary.BigEndian.PutUint64(enc, number) + return enc +} + +func 
isBodyKey(key []byte) bool { + if len(key) != len(evmBlockBodyPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, evmBlockBodyPrefix) +} + +func isHeaderKey(key []byte) bool { + if len(key) != len(evmHeaderPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, evmHeaderPrefix) +} + +func isReceiptsKey(key []byte) bool { + if len(key) != len(evmReceiptsPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, evmReceiptsPrefix) +} + +func databaseMinHeight(db database.KeyValueReader) (uint64, bool, error) { + minBytes, err := db.Get(blockDBMinHeightKey) + if err != nil { + if errors.Is(err, database.ErrNotFound) { + return 0, false, nil + } + return 0, false, err + } + height := binary.BigEndian.Uint64(minBytes) + return height, true, nil +} + +func writeHashAndData( + db database.HeightIndex, + height uint64, + hash common.Hash, + data []byte, +) error { + encoded, err := rlp.EncodeToBytes([][]byte{hash.Bytes(), data}) + if err != nil { + return err + } + return db.Put(height, encoded) +} + +// readHashAndData reads data from [database.HeightIndex] and falls back +// to the [ethdb.Database] if the data is not found and migration is not complete. +func readHashAndData( + heightDB database.HeightIndex, + evmDB ethdb.KeyValueReader, + key []byte, + migrator *migrator, +) ([]byte, error) { + num, hash, err := parseBlockKey(key) + if err != nil { + return nil, err + } + data, err := heightDB.Get(num) + if err != nil { + if errors.Is(err, database.ErrNotFound) && !migrator.isCompleted() { + return evmDB.Get(key) + } + return nil, err + } + + var elems [][]byte + if err := rlp.DecodeBytes(data, &elems); err != nil { + return nil, err + } + if len(elems) != hashDataElements { + return nil, fmt.Errorf( + "invalid hash+data format: expected %d elements, got %d", + hashDataElements, + len(elems), + ) + } + h := common.BytesToHash(elems[0]) + if h != hash { + // Hash mismatch means we are trying to read a different block at this height. + return nil, database.ErrNotFound + } + + return elems[1], nil +} + +// IsEnabled checks if blockdb has ever been initialized. +// Returns true if the minimum block height key exists, which indicates the +// block database has been created and initialized before. +func IsEnabled(db database.KeyValueReader) (bool, error) { + has, err := db.Has(blockDBMinHeightKey) + if err != nil { + return false, err + } + return has, nil +} diff --git a/vms/evm/database/blockdb/database_test.go b/vms/evm/database/blockdb/database_test.go new file mode 100644 index 000000000000..4f5c9cece7fb --- /dev/null +++ b/vms/evm/database/blockdb/database_test.go @@ -0,0 +1,514 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package blockdb + +import ( + "os" + "slices" + "testing" + + "github.com/ava-labs/coreth/params" + "github.com/ava-labs/coreth/plugin/evm/customtypes" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/ethdb" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + + evmdb "github.com/ava-labs/avalanchego/vms/evm/database" + heightindexdb "github.com/ava-labs/avalanchego/x/blockdb" +) + +func TestMain(m *testing.M) { + customtypes.Register() + params.RegisterExtras() + os.Exit(m.Run()) +} + +func TestDatabaseWriteAndReadBlock(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 10) + writeBlocks(db, blocks, receipts) + + for _, block := range blocks { + actualBlock := rawdb.ReadBlock(db, block.Hash(), block.NumberU64()) + requireRLPEqual(t, block, actualBlock) + } +} + +func TestDatabaseWriteAndReadReceipts(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 10) + writeBlocks(db, blocks, receipts) + + for i, block := range blocks { + require.True(t, rawdb.HasReceipts(db, block.Hash(), block.NumberU64())) + actualReceipts := rawdb.ReadReceipts( + db, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig, + ) + requireRLPEqual(t, receipts[i], actualReceipts) + } +} + +func TestDatabaseReadLogs(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 10) + writeBlocks(db, blocks, receipts) + + for i, block := range blocks { + actualLogs := rawdb.ReadLogs(db, block.Hash(), block.NumberU64()) + recs := receipts[i] + requireRLPEqual(t, logsFromReceipts(recs), actualLogs) + } +} + +func TestDatabaseDeleteBlocksNoOp(t *testing.T) { + // Verifies that block header, body and receipts cannot be deleted (no-op), + // but hash to height mapping should be deleted. 
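+	// Deletes for the header, body, and receipt keys are swallowed by Database.Delete;
+	// the remaining deletes issued by rawdb.DeleteBlock (e.g. the hash-to-number
+	// mapping) still reach the underlying database and are removed.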
+ tests := []struct { + name string + useBatch bool + batchSize int + }{ + {name: "delete block data is a no-op"}, + {name: "batch delete", useBatch: true}, + {name: "batch delete with size", useBatch: true, batchSize: 1024}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + allBlocks, allReceipts := createBlocks(t, 4) + blocks := allBlocks[1:] // skip genesis block + receipts := allReceipts[1:] + writeBlocks(db, blocks, receipts) + + // perform delete operations on all blocks + if tc.useBatch { + var batch ethdb.Batch + if tc.batchSize > 0 { + batch = db.NewBatchWithSize(tc.batchSize) + } else { + batch = db.NewBatch() + } + + for _, block := range blocks { + rawdb.DeleteBlock(batch, block.Hash(), block.NumberU64()) + } + require.NoError(t, batch.Write()) + } else { + for _, block := range blocks { + rawdb.DeleteBlock(db, block.Hash(), block.NumberU64()) + } + } + + // verify blocks still exist + for i, block := range blocks { + actualBlock := rawdb.ReadBlock(db, block.Hash(), block.NumberU64()) + requireRLPEqual(t, block, actualBlock) + require.True(t, rawdb.HasReceipts(db, block.Hash(), block.NumberU64())) + expReceipts := receipts[i] + logs := rawdb.ReadLogs(db, block.Hash(), block.NumberU64()) + requireRLPEqual(t, logsFromReceipts(expReceipts), logs) + } + }) + } +} + +func TestDatabaseWriteToHeightIndexedDB(t *testing.T) { + dataDir := t.TempDir() + db, evmDB := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + writeBlocks(db, blocks, receipts) + + block := blocks[1] + + // verify no block data in evmDB + require.False(t, rawdb.HasHeader(evmDB, block.Hash(), block.NumberU64())) + require.False(t, rawdb.HasBody(evmDB, block.Hash(), block.NumberU64())) + require.False(t, rawdb.HasReceipts(evmDB, block.Hash(), block.NumberU64())) + + // verify block data in height-indexed databases + ok, err := db.headerDB.Has(block.NumberU64()) + require.NoError(t, err) + require.True(t, ok) + ok, err = db.bodyDB.Has(block.NumberU64()) + require.NoError(t, err) + require.True(t, ok) + ok, err = db.receiptsDB.Has(block.NumberU64()) + require.NoError(t, err) + require.True(t, ok) +} + +func TestDatabaseNewBatch(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + block := blocks[1] + batch := db.NewBatch() + writeBlocks(batch, blocks, receipts) + + // after adding blocks to batch, blocks and receipts should be available immediately + require.True(t, rawdb.HasBody(db, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasHeader(db, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasReceipts(db, block.Hash(), block.NumberU64())) + + // header number should not be available until batch is written + require.Nil(t, rawdb.ReadHeaderNumber(db, block.Hash())) + require.NoError(t, batch.Write()) + num := rawdb.ReadHeaderNumber(db, block.Hash()) + require.Equal(t, block.NumberU64(), *num) +} + +func TestDatabaseNewBatchWithSize(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + batch := db.NewBatchWithSize(2048) + writeBlocks(batch, blocks, receipts) + require.NoError(t, batch.Write()) + + for _, block := range blocks { + require.True(t, rawdb.HasHeader(db, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(db, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasReceipts(db, block.Hash(), block.NumberU64())) + 
} +} + +func TestDatabaseWriteSameBlockTwice(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, _ := createBlocks(t, 2) + block := blocks[1] + + // write same block twice + rawdb.WriteBlock(db, block) + rawdb.WriteBlock(db, block) + + // we should be able to read the block after duplicate writes + actualBlock := rawdb.ReadBlock(db, block.Hash(), block.NumberU64()) + requireRLPEqual(t, block, actualBlock) +} + +func TestDatabaseWriteDifferentBlocksAtSameHeight(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + b1 := blocks[1] + r1 := receipts[1] + + // create a second block with the same height but different tx + to := addrFromTest(t, "different-to") + blocks2, receipts2 := createBlocksToUser(t, 2, to) + b2 := blocks2[1] + r2 := receipts2[1] + + // ensure both blocks have the same height but different hashes + require.Equal(t, b1.NumberU64(), b2.NumberU64()) + require.NotEqual(t, b1.Hash(), b2.Hash()) + + writeBlocks(db, []*types.Block{b1, b2}, []types.Receipts{r1, r2}) + + // reading by the first block's hash should not return anything + require.Nil(t, rawdb.ReadHeader(db, b1.Hash(), b1.NumberU64())) + require.Nil(t, rawdb.ReadBody(db, b1.Hash(), b1.NumberU64())) + require.Nil(t, rawdb.ReadReceipts(db, b1.Hash(), b1.NumberU64(), b1.Time(), params.TestChainConfig)) + require.False(t, rawdb.HasHeader(db, b1.Hash(), b1.NumberU64())) + require.False(t, rawdb.HasBody(db, b1.Hash(), b1.NumberU64())) + require.False(t, rawdb.HasReceipts(db, b1.Hash(), b1.NumberU64())) + + // reading by the second block's hash returns second block data + requireRLPEqual(t, b2, rawdb.ReadBlock(db, b2.Hash(), b2.NumberU64())) + actualReceipts := rawdb.ReadReceipts(db, b2.Hash(), b2.NumberU64(), b2.Time(), params.TestChainConfig) + requireRLPEqual(t, r2, actualReceipts) +} + +func TestDatabaseReopen(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + writeBlocks(db, blocks, receipts) + b1 := blocks[1] + r1 := receipts[1] + + // close db and verify we can no longer read block data + require.NoError(t, db.Close()) + block := rawdb.ReadBlock(db, b1.Hash(), b1.NumberU64()) + require.Nil(t, block) + recs := rawdb.ReadReceipts(db, b1.Hash(), b1.NumberU64(), b1.Time(), params.TestChainConfig) + require.Nil(t, recs) + _, err := db.headerDB.Get(b1.NumberU64()) + require.ErrorIs(t, err, database.ErrClosed) + + // reopen the database and data can be read again + db, _ = newDatabasesFromDir(t, dataDir) + block = rawdb.ReadBlock(db, b1.Hash(), b1.NumberU64()) + requireRLPEqual(t, b1, block) + actualReceipts := rawdb.ReadReceipts(db, b1.Hash(), b1.NumberU64(), b1.Time(), params.TestChainConfig) + requireRLPEqual(t, r1, actualReceipts) +} + +func TestDatabaseInitialization(t *testing.T) { + blocks, _ := createBlocks(t, 10) + + tests := []struct { + name string + deferInit bool + evmDBBlocks []*types.Block + dbMinHeight uint64 + wantDBReady bool + wantMinHeight uint64 + }{ + { + name: "empty evmDB and no deferred init", + wantDBReady: true, + wantMinHeight: 1, + }, + { + name: "empty evmDB and deferred init", + deferInit: true, + wantDBReady: false, // db should not be ready due to deferred init + }, + { + name: "non genesis blocks to migrate", + evmDBBlocks: blocks[5:10], + wantDBReady: true, + wantMinHeight: 5, + }, + { + name: "blocks to migrate - including genesis", + evmDBBlocks: slices.Concat([]*types.Block{blocks[0]}, blocks[5:10]), + 
wantDBReady: true, + wantMinHeight: 5, + }, + { + name: "blocks to migrate and deferred init", + deferInit: true, + evmDBBlocks: blocks[5:10], + wantDBReady: true, + wantMinHeight: 5, + }, + { + name: "existing db created with min height", + evmDBBlocks: blocks[5:8], + dbMinHeight: 2, + wantDBReady: true, + wantMinHeight: 2, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + + // create block databases with existing min height if needed + if tc.dbMinHeight > 0 { + db := Database{ + stateDB: base, + Database: evmDB, + dbPath: dataDir, + config: heightindexdb.DefaultConfig(), + reg: prometheus.NewRegistry(), + logger: logging.NoLog{}, + } + require.NoError(t, db.InitBlockDBs(tc.dbMinHeight)) + require.NoError(t, db.headerDB.Close()) + require.NoError(t, db.bodyDB.Close()) + require.NoError(t, db.receiptsDB.Close()) + minHeight, ok, err := databaseMinHeight(base) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, tc.dbMinHeight, minHeight) + } + + writeBlocks(evmDB, tc.evmDBBlocks, []types.Receipts{}) + db, _, err := New( + base, + evmDB, + dataDir, + tc.deferInit, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.Equal(t, tc.wantDBReady, db.heightDBsReady, "database ready mismatch") + require.Equal(t, tc.wantMinHeight, db.minHeight, "database min height mismatch") + }) + } +} + +func TestDatabaseGenesisBlockHandling(t *testing.T) { + // Verifies that genesis blocks (block 0) only exist in evmDB and not + // in the height-indexed databases. + dataDir := t.TempDir() + db, evmDB := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 1) // first block is genesis + writeBlocks(db, blocks, receipts) + + // Validate genesis block can be retrieved and its stored in evmDB. + hash := rawdb.ReadCanonicalHash(evmDB, 0) + block := rawdb.ReadBlock(db, hash, 0) + requireRLPEqual(t, blocks[0], block) + _, err := db.headerDB.Get(0) + require.ErrorIs(t, err, database.ErrNotFound) + _, err = db.receiptsDB.Get(0) + require.ErrorIs(t, err, database.ErrNotFound) + require.Equal(t, uint64(1), db.minHeight) +} + +func TestDatabaseInitBlockDBs(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + + db, initialized, err := New( + base, + evmDB, + dataDir, + true, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.False(t, initialized) + + require.NoError(t, db.InitBlockDBs(10)) + require.Equal(t, uint64(10), db.minHeight) +} + +func TestDatabaseMinHeightWrites(t *testing.T) { + // Verifies writes are gated by minHeight: below threshold go to evmDB, + // at/above threshold go to height-index DBs. 
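+	// The block databases are initialized with minHeight 10 below, so block 9 is
+	// expected only in evmDB while block 10 lands in the height-indexed databases.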
+ dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + + db, _, err := New( + base, + evmDB, + dataDir, + true, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.NoError(t, db.InitBlockDBs(10)) + blocks, receipts := createBlocks(t, 11) + + // write block 9 (below minHeight) and block 10 (at minHeight) + writeBlocks(db, blocks[9:11], receipts[9:11]) + + // below threshold should not be in height DBs but in kvDB + has, err := db.headerDB.Has(9) + require.NoError(t, err) + require.False(t, has) + has, err = db.bodyDB.Has(9) + require.NoError(t, err) + require.False(t, has) + has, err = db.receiptsDB.Has(9) + require.NoError(t, err) + require.False(t, has) + require.True(t, rawdb.HasHeader(evmDB, blocks[9].Hash(), 9)) + require.True(t, rawdb.HasBody(evmDB, blocks[9].Hash(), 9)) + require.True(t, rawdb.HasReceipts(evmDB, blocks[9].Hash(), 9)) + + // at/above threshold should be in height DBs + _, err = db.bodyDB.Get(10) + require.NoError(t, err) + _, err = db.headerDB.Get(10) + require.NoError(t, err) + _, err = db.receiptsDB.Get(10) + require.NoError(t, err) + require.Nil(t, rawdb.ReadBlock(evmDB, blocks[10].Hash(), 10)) + require.False(t, rawdb.HasReceipts(evmDB, blocks[10].Hash(), 10)) +} + +func TestDatabaseHasReturnsFalseOnHashMismatch(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 3) + writeBlocks(db, blocks[1:3], receipts[1:3]) + + // fetch block 2 with block 1's hash + require.False(t, rawdb.HasHeader(db, blocks[1].Hash(), blocks[2].NumberU64())) + require.False(t, rawdb.HasBody(db, blocks[1].Hash(), blocks[2].NumberU64())) + require.False(t, rawdb.HasReceipts(db, blocks[1].Hash(), blocks[2].NumberU64())) +} + +func TestDatabaseAlreadyInitializedError(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + + err := db.InitBlockDBs(5) + require.ErrorIs(t, err, errAlreadyInitialized) + require.Equal(t, uint64(1), db.minHeight) +} + +func TestDatabaseGetNotFoundOnHashMismatch(t *testing.T) { + dataDir := t.TempDir() + db, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 3) + writeBlocks(db, blocks, receipts) + + // get block 1 with block 0's hash + _, err := db.Get(blockHeaderKey(1, blocks[0].Hash())) + require.ErrorIs(t, err, database.ErrNotFound) + _, err = db.Get(blockBodyKey(1, blocks[0].Hash())) + require.ErrorIs(t, err, database.ErrNotFound) + _, err = db.Get(receiptsKey(1, blocks[0].Hash())) + require.ErrorIs(t, err, database.ErrNotFound) +} + +func TestIsEnabled(t *testing.T) { + // Verifies database min height is set on first init. 
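+	// IsEnabled reports true only once blockDBMinHeightKey has been written, i.e.
+	// after InitBlockDBs has run against this state database.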
+ dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + + // initially not enabled + enabled, err := IsEnabled(base) + require.NoError(t, err) + require.False(t, enabled) + + // create db but don't initialize + db, initialized, err := New( + base, + evmDB, + dataDir, + true, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.False(t, initialized) + + // not enabled since InitBlockDBs was not called + enabled, err = IsEnabled(base) + require.NoError(t, err) + require.False(t, enabled) + + // now enabled + require.NoError(t, db.InitBlockDBs(10)) + enabled, err = IsEnabled(base) + require.NoError(t, err) + require.True(t, enabled) +} diff --git a/vms/evm/database/blockdb/helpers_test.go b/vms/evm/database/blockdb/helpers_test.go new file mode 100644 index 000000000000..09abf5c47b86 --- /dev/null +++ b/vms/evm/database/blockdb/helpers_test.go @@ -0,0 +1,198 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "fmt" + "math/big" + "slices" + "testing" + "time" + + "github.com/ava-labs/coreth/consensus/dummy" + "github.com/ava-labs/coreth/core" + "github.com/ava-labs/coreth/params" + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/crypto" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/rlp" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + + evmdb "github.com/ava-labs/avalanchego/vms/evm/database" + heightindexdb "github.com/ava-labs/avalanchego/x/blockdb" +) + +// blockingDatabase wraps a HeightIndex and blocks indefinitely on Put +// if shouldBlock is true. +type blockingDatabase struct { + database.HeightIndex + shouldBlock func() bool +} + +func (b *blockingDatabase) Put(num uint64, encodedBlock []byte) error { + if b.shouldBlock == nil || b.shouldBlock() { + <-make(chan struct{}) + } + return b.HeightIndex.Put(num, encodedBlock) +} + +func newDatabasesFromDir(t *testing.T, dataDir string) (*Database, ethdb.Database) { + t.Helper() + + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + db, _, err := New( + base, + evmDB, + dataDir, + false, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + return db, evmDB +} + +// addrFromTest returns a deterministic address derived from the test name and supplied salt. +func addrFromTest(t *testing.T, salt string) common.Address { + t.Helper() + h := crypto.Keccak256Hash([]byte(t.Name() + ":" + salt)) + return common.BytesToAddress(h.Bytes()[12:]) +} + +// createBlocksToUser generates blocks with a single funded sender and a tx to the provided recipient. 
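+// The genesis block and its receipts entry are prepended at index 0, so callers that
+// only want non-genesis blocks should slice from index 1.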
+func createBlocksToUser(t *testing.T, numBlocks int, to common.Address) ([]*types.Block, []types.Receipts) { + t.Helper() + + key1, _ := crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 := crypto.PubkeyToAddress(key1.PublicKey) + gspec := &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(params.Ether)}}, + } + engine := dummy.NewFaker() + signer := types.LatestSigner(params.TestChainConfig) + gap := uint64(10) + db, blocks, receipts, err := core.GenerateChainWithGenesis( + gspec, engine, numBlocks-1, gap, func(_ int, gen *core.BlockGen) { + tx, _ := types.SignTx(types.NewTx(&types.DynamicFeeTx{ + ChainID: params.TestChainConfig.ChainID, + Nonce: gen.TxNonce(addr1), + To: &to, + Gas: 500_000, + GasTipCap: big.NewInt(1), + GasFeeCap: big.NewInt(1), + }), signer, key1) + gen.AddTx(tx) + }) + require.NoError(t, err) + + // add genesis block since generated blocks and receipts don't include it + genHash := rawdb.ReadCanonicalHash(db, 0) + genBlock := rawdb.ReadBlock(db, genHash, 0) + genReceipts := rawdb.ReadReceipts(db, genHash, 0, 0, params.TestChainConfig) + blocks = slices.Concat([]*types.Block{genBlock}, blocks) + receipts = slices.Concat([]types.Receipts{genReceipts}, receipts) + + return blocks, receipts +} + +func createBlocks(t *testing.T, numBlocks int) ([]*types.Block, []types.Receipts) { + t.Helper() + to := addrFromTest(t, "default-to") + return createBlocksToUser(t, numBlocks, to) +} + +func writeBlocks(db ethdb.KeyValueWriter, blocks []*types.Block, receipts []types.Receipts) { + for i, block := range blocks { + rawdb.WriteBlock(db, block) + if i < len(receipts) { + rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i]) + } + rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()) + } +} + +func requireRLPEqual(t *testing.T, expected, actual interface{}) { + t.Helper() + expectedBytes, err := rlp.EncodeToBytes(expected) + require.NoError(t, err) + actualBytes, err := rlp.EncodeToBytes(actual) + require.NoError(t, err) + require.Equal(t, expectedBytes, actualBytes) +} + +func logsFromReceipts(receipts types.Receipts) [][]*types.Log { + logs := make([][]*types.Log, len(receipts)) + for i := range receipts { + logs[i] = receipts[i].Logs + } + return logs +} + +func startPartialMigration(t *testing.T, db *Database, blocksToMigrate uint64) { + t.Helper() + + n := uint64(0) + db.migrator.headerDB = &blockingDatabase{ + HeightIndex: db.headerDB, + shouldBlock: func() bool { + n++ + return n > blocksToMigrate + }, + } + startMigration(t, db, false) + require.Eventually(t, func() bool { + return db.migrator.processed.Load() >= blocksToMigrate + }, 5*time.Second, 100*time.Millisecond) +} + +func startMigration(t *testing.T, db *Database, waitForCompletion bool) { + t.Helper() + + db.migrator.completed.Store(false) + require.NoError(t, db.StartMigration()) + + if waitForCompletion { + timeout := 5 * time.Second + msg := fmt.Sprintf("Migration did not complete within timeout: %v", timeout) + require.True(t, waitMigratorDone(db.migrator, timeout), msg) + require.True(t, db.migrator.isCompleted()) + } +} + +// waitMigratorDone waits until the current migration run completes. +// If timeout <= 0, it waits indefinitely. +// Returns true if completed, false on timeout. 
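+// A nil done channel means no run is active (endRun has already cleared it) and is
+// treated as completed.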
+func waitMigratorDone(m *migrator, timeout time.Duration) bool { + // Snapshot done to avoid race with goroutine cleanup + m.mu.Lock() + done := m.done + m.mu.Unlock() + + if done == nil { + return true + } + if timeout <= 0 { + <-done + return true + } + t := time.NewTimer(timeout) + defer t.Stop() + select { + case <-done: + return true + case <-t.C: + return false + } +} diff --git a/vms/evm/database/blockdb/migration_test.go b/vms/evm/database/blockdb/migration_test.go new file mode 100644 index 000000000000..81d314ad7b46 --- /dev/null +++ b/vms/evm/database/blockdb/migration_test.go @@ -0,0 +1,298 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "slices" + "testing" + + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/params" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + + evmdb "github.com/ava-labs/avalanchego/vms/evm/database" + heightindexdb "github.com/ava-labs/avalanchego/x/blockdb" +) + +func TestMigrationCompletion(t *testing.T) { + tests := []struct { + name string + want bool + dataToMigrate bool + migrate bool + }{ + { + name: "completed when no data to migrate", + want: true, + }, + { + name: "not completed if data to migrate", + dataToMigrate: true, + }, + { + name: "completed after migration", + dataToMigrate: true, + migrate: true, + want: true, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + + if tc.dataToMigrate { + blocks, receipts := createBlocks(t, 5) + writeBlocks(evmDB, blocks, receipts) + } + + db, _, err := New( + base, + evmDB, + dataDir, + false, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + if tc.migrate { + startMigration(t, db, true) + } + require.Equal(t, tc.want, db.migrator.isCompleted()) + }) + } +} + +func TestMigrationInProcess(t *testing.T) { + // Verifies blocks are readable during migration for both migrated + // and un-migrated blocks. + // The test generates 21 blocks, migrates 20 but pauses after 5, + // writes block 21, and verifies migrated and un-migrated blocks are readable. + dataDir := t.TempDir() + db, evmDB := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 21) + + // add blocks 0-19 to KVDB to migrate + writeBlocks(evmDB, blocks[0:20], receipts[0:20]) + + // migrate blocks 1-6 + startPartialMigration(t, db, 6) + + // write block 20 to simulate new block being added during migration + writeBlocks(db, blocks[20:21], receipts[20:21]) + + // verify all 21 blocks are readable via the db + for i, block := range blocks { + num := block.NumberU64() + expReceipts := receipts[i] + + // We should be able to fetch block, receipts and logs. 
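+		// Un-migrated heights are served through the evmDB fallback in readHashAndData,
+		// which stays active until the migrator reports completion.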
+ actualBlock := rawdb.ReadBlock(db, block.Hash(), num) + requireRLPEqual(t, block, actualBlock) + actualReceipts := rawdb.ReadReceipts(db, block.Hash(), num, block.Time(), params.TestChainConfig) + requireRLPEqual(t, expReceipts, actualReceipts) + actualLogs := rawdb.ReadLogs(db, block.Hash(), num) + requireRLPEqual(t, logsFromReceipts(expReceipts), actualLogs) + + // header number should also be readable + actualNum := rawdb.ReadHeaderNumber(db, block.Hash()) + require.NotNil(t, actualNum) + require.Equal(t, num, *actualNum) + + // Block 1-6 and 20 should be migrated, others should not. + has, err := db.headerDB.Has(num) + require.NoError(t, err) + migrated := num >= 1 && num <= 6 || num == 20 + require.Equal(t, migrated, has) + } +} + +func TestMigrationStart(t *testing.T) { + tests := []struct { + name string + toMigrateHeights []uint64 + migratedHeights []uint64 + }{ + { + name: "migrate blocks 0-4", + toMigrateHeights: []uint64{0, 1, 2, 3, 4}, + }, + { + name: "migrate blocks 20-24", + toMigrateHeights: []uint64{20, 21, 22, 23, 24}, + }, + { + name: "migrate non consecutive blocks", + toMigrateHeights: []uint64{20, 21, 22, 29, 30, 40}, + }, + { + name: "migrated 0-5 and to migrate 6-10", + toMigrateHeights: []uint64{6, 7, 8, 9, 10}, + migratedHeights: []uint64{0, 1, 2, 3, 4, 5}, + }, + { + name: "all blocks migrated", + migratedHeights: []uint64{0, 1, 2, 3, 4, 5}, + }, + { + name: "no blocks to migrate or migrated", + }, + { + name: "non consecutive blocks migrated and blocks to migrate", + toMigrateHeights: []uint64{2, 3, 7, 8, 10}, + migratedHeights: []uint64{0, 1, 4, 5, 9}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + db, evmDB := newDatabasesFromDir(t, dataDir) + allHeights := slices.Concat(tc.toMigrateHeights, tc.migratedHeights) + var maxHeight uint64 + if len(allHeights) > 0 { + maxHeight = slices.Max(allHeights) + } + blocks, receipts := createBlocks(t, int(maxHeight)+1) + + // set initial db state + for _, height := range tc.toMigrateHeights { + writeBlocks(evmDB, []*types.Block{blocks[height]}, []types.Receipts{receipts[height]}) + } + for _, height := range tc.migratedHeights { + writeBlocks(db, []*types.Block{blocks[height]}, []types.Receipts{receipts[height]}) + } + + // Verify all blocks and receipts are accessible after migration. + startMigration(t, db, true) + for _, height := range allHeights { + expBlock := blocks[height] + expReceipts := receipts[height] + block := rawdb.ReadBlock(db, expBlock.Hash(), height) + requireRLPEqual(t, expBlock, block) + receipts := rawdb.ReadReceipts(db, expBlock.Hash(), height, expBlock.Time(), params.TestChainConfig) + requireRLPEqual(t, expReceipts, receipts) + logs := rawdb.ReadLogs(db, expBlock.Hash(), height) + requireRLPEqual(t, logsFromReceipts(expReceipts), logs) + + // Verify evmDB no longer has any blocks or receipts (except for genesis). + hasData := height == 0 + require.Equal(t, hasData, rawdb.HasHeader(evmDB, expBlock.Hash(), height)) + require.Equal(t, hasData, rawdb.HasBody(evmDB, expBlock.Hash(), height)) + require.Equal(t, hasData, rawdb.HasReceipts(evmDB, expBlock.Hash(), height)) + } + }) + } +} + +func TestMigrationResume(t *testing.T) { + // Verifies migration can be stopped mid-run and resumed. 
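+	// Resuming needs no extra bookkeeping: migrated block data is deleted from evmDB,
+	// so a fresh run simply iterates over whichever bodies remain.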
+ dataDir := t.TempDir() + db, evmDB := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 10) + writeBlocks(evmDB, blocks, receipts) + + // block migration after 3 blocks + startPartialMigration(t, db, 3) + require.False(t, db.migrator.isCompleted()) + + for i := 0; i < 10; i++ { + migrated := i >= 1 && i <= 3 // blocks 1-3 are migrated + has, err := db.bodyDB.Has(uint64(i)) + require.NoError(t, err) + require.Equal(t, migrated, has) + } + + // stop migration and start again + require.NoError(t, db.Database.Close()) + db, _ = newDatabasesFromDir(t, dataDir) + require.False(t, db.migrator.isCompleted()) + startMigration(t, db, true) + + // verify all blocks are accessible after migration + for i, block := range blocks { + num := block.NumberU64() + hash := block.Hash() + actualBlock := rawdb.ReadBlock(db, hash, num) + requireRLPEqual(t, block, actualBlock) + actualReceipts := rawdb.ReadReceipts(db, hash, num, block.Time(), params.TestChainConfig) + requireRLPEqual(t, receipts[i], actualReceipts) + } +} + +func TestMigrationSkipsGenesis(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + blocks, receipts := createBlocks(t, 10) + writeBlocks(evmDB, blocks[0:1], receipts[0:1]) + writeBlocks(evmDB, blocks[5:10], receipts[5:10]) + + db, _, err := New( + base, + evmDB, + dataDir, + false, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.True(t, db.heightDBsReady) + require.Equal(t, uint64(5), db.minHeight) + + // migrate and verify genesis block is not migrated + startMigration(t, db, true) + require.True(t, db.migrator.isCompleted()) + genHash := rawdb.ReadCanonicalHash(evmDB, 0) + require.True(t, rawdb.HasHeader(evmDB, genHash, 0)) + has, err := db.bodyDB.Has(0) + require.NoError(t, err) + require.False(t, has) +} + +func TestMigrationWithoutReceipts(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + blocks, _ := createBlocks(t, 5) + + // write blocks without receipts to evmDB + for _, block := range blocks { + rawdb.WriteBlock(evmDB, block) + rawdb.WriteCanonicalHash(evmDB, block.Hash(), block.NumberU64()) + } + + db, initialized, err := New( + base, + evmDB, + dataDir, + false, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.True(t, initialized) + startMigration(t, db, true) + require.True(t, db.migrator.isCompleted()) + + // verify all blocks are accessible and receipts are nil + for _, block := range blocks { + actualBlock := rawdb.ReadBlock(db, block.Hash(), block.NumberU64()) + requireRLPEqual(t, block, actualBlock) + recs := rawdb.ReadReceipts(db, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + require.Nil(t, recs) + } +} diff --git a/vms/evm/database/blockdb/migrator.go b/vms/evm/database/blockdb/migrator.go new file mode 100644 index 000000000000..02218bf73585 --- /dev/null +++ b/vms/evm/database/blockdb/migrator.go @@ -0,0 +1,483 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package blockdb + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "slices" + "sync" + "sync/atomic" + "time" + + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/rlp" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/timer" +) + +const ( + // logProgressInterval controls how often migration progress is logged. + logProgressInterval = 30 * time.Second + // compactionInterval is the number of blocks to process before compacting the database. + compactionInterval = 250_000 + // stopTimeout is the maximum time to wait for migration to stop gracefully. + // 5 seconds allows cleanup operations to complete without blocking shutdown indefinitely. + stopTimeout = 5 * time.Second +) + +// targetBlockHeightKey stores the head height captured at first run for ETA only. +var targetBlockHeightKey = []byte("migration_target_block_height") + +type migrator struct { + // Databases + evmDB ethdb.Database + headerDB database.HeightIndex + bodyDB database.HeightIndex + receiptsDB database.HeightIndex + + // Concurrency control + mu sync.Mutex // protects cancel and done + cancel context.CancelFunc + done chan struct{} + + // Migration state + completed atomic.Bool + processed atomic.Uint64 + endHeight uint64 + + logger logging.Logger +} + +func newMigrator( + db database.Database, + headerDB database.HeightIndex, + bodyDB database.HeightIndex, + receiptsDB database.HeightIndex, + evmDB ethdb.Database, + logger logging.Logger, +) (*migrator, error) { + m := &migrator{ + headerDB: headerDB, + bodyDB: bodyDB, + receiptsDB: receiptsDB, + evmDB: evmDB, + logger: logger, + } + + _, ok, err := minBlockHeightToMigrate(evmDB) + if err != nil { + return nil, err + } + if !ok { + m.completed.Store(true) + m.logger.Info("No block data to migrate; migration already complete") + return m, nil + } + + // load saved end block height + endHeight, ok, err := targetBlockHeight(db) + if err != nil { + return nil, err + } + if !ok { + // load and save head block number as end block height + if num, ok := headBlockNumber(evmDB); ok { + endHeight = num + if err := writeTargetBlockHeight(db, endHeight); err != nil { + return nil, err + } + m.logger.Info( + "Migration target height set", + zap.Uint64("targetHeight", endHeight), + ) + } + } + m.endHeight = endHeight + + return m, nil +} + +func (m *migrator) isCompleted() bool { + return m.completed.Load() +} + +func (m *migrator) stop() { + // Snapshot cancel/done to avoid TOCTOU race with endRun. + m.mu.Lock() + cancel := m.cancel + done := m.done + m.mu.Unlock() + + if cancel == nil { + return // no active migration + } + + cancel() + if done != nil { + select { + case <-done: + // worker finished cleanup + case <-time.After(stopTimeout): + m.logger.Warn("Migration shutdown timeout exceeded") + } + } +} + +// start begins the migration process in a background goroutine. +// Returns immediately if migration is already completed or running. 
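+// Completion is not persisted directly; newMigrator re-derives it on startup from
+// whether any migratable body keys remain in evmDB.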
+func (m *migrator) start() { + if m.isCompleted() { + return + } + ctx, ok := m.beginRun() + if !ok { + m.logger.Warn("Migration already running") + return + } + + go func() { + defer close(m.done) + defer m.endRun() + if err := m.run(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + m.logger.Error("Migration failed", zap.Error(err)) + } + } + }() +} + +func (m *migrator) beginRun() (context.Context, bool) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.cancel != nil { + return nil, false // migration already running + } + ctx, cancel := context.WithCancel(context.Background()) + m.cancel = cancel + m.done = make(chan struct{}) + m.processed.Store(0) + return ctx, true +} + +func (m *migrator) endRun() { + m.mu.Lock() + defer m.mu.Unlock() + + m.cancel = nil + m.done = nil +} + +func (m *migrator) run(ctx context.Context) error { + var ( + // Progress tracking + etaTarget uint64 // target # of blocks to process + etaTracker = timer.NewEtaTracker(10, 1) + start = time.Now() + nextLog = start.Add(logProgressInterval) + + // Batch to accumulate delete operations before writing + batch = m.evmDB.NewBatch() + lastCompact uint64 // blocks processed at last compaction + + // Compaction tracking + canCompact bool + startBlockNum uint64 + endBlockNum uint64 + + // Iterate over block bodies instead of headers since there are keys + // under the header prefix that we are not migrating (e.g., canonical hash mappings). + iter = m.evmDB.NewIterator(evmBlockBodyPrefix, nil) + ) + + defer func() { + iter.Release() + + if batch.ValueSize() > 0 { + if err := batch.Write(); err != nil { + m.logger.Error("Failed to write final delete batch", zap.Error(err)) + } + } + + // Compact final range if we processed any blocks after last interval compaction. + if canCompact { + m.compactBlockRange(startBlockNum, endBlockNum) + } + + duration := time.Since(start) + m.logger.Info( + "Block data migration ended", + zap.Uint64("targetHeight", m.endHeight), + zap.Uint64("blocksProcessed", m.processed.Load()), + zap.Uint64("lastProcessedHeight", endBlockNum), + zap.Duration("duration", duration), + zap.Bool("completed", m.isCompleted()), + ) + }() + + m.logger.Info( + "Block data migration started", + zap.Uint64("targetHeight", m.endHeight), + ) + + // Iterate over all block bodies in ascending order by block number. 
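+	// Body keys are evmBlockBodyPrefix ++ bigEndian(number) ++ hash, so iterating the
+	// prefix visits heights in ascending order.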
+ for iter.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + key := iter.Key() + if !isMigratableKey(m.evmDB, key) { + continue + } + + blockNum, hash, err := parseBlockKey(key) + if err != nil { + return err + } + + if etaTarget == 0 && m.endHeight > 0 && blockNum < m.endHeight { + etaTarget = m.endHeight - blockNum + etaTracker.AddSample(0, etaTarget, start) + } + + // track the range of blocks for compaction + if !canCompact { + startBlockNum = blockNum + canCompact = true + } + endBlockNum = blockNum + + if err := m.migrateHeader(blockNum, hash); err != nil { + return fmt.Errorf("failed to migrate header data: %w", err) + } + if err := m.migrateBody(blockNum, hash, iter.Value()); err != nil { + return fmt.Errorf("failed to migrate body data: %w", err) + } + if err := m.migrateReceipts(blockNum, hash); err != nil { + return fmt.Errorf("failed to migrate receipt data: %w", err) + } + if err := deleteBlock(batch, blockNum, hash); err != nil { + return fmt.Errorf("failed to add block deletes to batch: %w", err) + } + processed := m.processed.Add(1) + + if batch.ValueSize() > ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + return fmt.Errorf("failed to write delete batch: %w", err) + } + batch.Reset() + } + + // compact every compactionInterval blocks + if processed-lastCompact >= compactionInterval { + // write any remaining deletes in batch before compaction + if batch.ValueSize() > 0 { + if err := batch.Write(); err != nil { + return fmt.Errorf("failed to write delete batch before compaction: %w", err) + } + batch.Reset() + } + + iter.Release() + if canCompact { + m.compactBlockRange(startBlockNum, endBlockNum) + } + + start := encodeBlockNumber(blockNum + 1) + iter = m.evmDB.NewIterator(evmBlockBodyPrefix, start) + lastCompact = processed + canCompact = false + } + + // log progress every logProgressInterval + if now := time.Now(); now.After(nextLog) { + fields := []zap.Field{ + zap.Uint64("blocksProcessed", processed), + zap.Uint64("lastProcessedHeight", blockNum), + zap.Duration("timeElapsed", time.Since(start)), + } + if etaTarget > 0 { + eta, pct := etaTracker.AddSample(processed, etaTarget, now) + if eta != nil { + fields = append(fields, + zap.Duration("eta", *eta), + zap.String("progress", fmt.Sprintf("%.2f%%", pct)), + ) + } + } + + m.logger.Info("Block data migration progress", fields...) 
+ nextLog = now.Add(logProgressInterval) + } + } + + if iter.Error() != nil { + return fmt.Errorf("failed to iterate over evmDB: %w", iter.Error()) + } + + m.completed.Store(true) + return nil +} + +func (m *migrator) compactBlockRange(startNum, endNum uint64) { + start := time.Now() + + compactRange(m.evmDB, blockHeaderKey, startNum, endNum, m.logger) + compactRange(m.evmDB, blockBodyKey, startNum, endNum, m.logger) + compactRange(m.evmDB, receiptsKey, startNum, endNum, m.logger) + + m.logger.Info("Compaction of block range completed", + zap.Uint64("startHeight", startNum), + zap.Uint64("endHeight", endNum), + zap.Duration("duration", time.Since(start))) +} + +func (m *migrator) migrateHeader(num uint64, hash common.Hash) error { + header := rawdb.ReadHeader(m.evmDB, hash, num) + if header == nil { + return fmt.Errorf("header not found for block %d hash %s", num, hash) + } + hBytes, err := rlp.EncodeToBytes(header) + if err != nil { + return fmt.Errorf("failed to encode block header: %w", err) + } + if err := writeHashAndData(m.headerDB, num, hash, hBytes); err != nil { + return fmt.Errorf("failed to write header to headerDB: %w", err) + } + return nil +} + +func (m *migrator) migrateBody(num uint64, hash common.Hash, body []byte) error { + if err := writeHashAndData(m.bodyDB, num, hash, body); err != nil { + return fmt.Errorf("failed to write body to bodyDB: %w", err) + } + return nil +} + +func (m *migrator) migrateReceipts(num uint64, hash common.Hash) error { + receipts := rawdb.ReadReceiptsRLP(m.evmDB, hash, num) + if receipts == nil { + return nil + } + + if err := writeHashAndData(m.receiptsDB, num, hash, receipts); err != nil { + return fmt.Errorf("failed to write receipts to receiptsDB: %w", err) + } + return nil +} + +func deleteBlock(db ethdb.KeyValueWriter, num uint64, hash common.Hash) error { + // Avoid rawdb.DeleteHeader to preserve number/hash canonical mappings during migration. + headerKey := blockHeaderKey(num, hash) + if err := db.Delete(headerKey); err != nil { + return fmt.Errorf("failed to delete header from evmDB: %w", err) + } + rawdb.DeleteBody(db, hash, num) + rawdb.DeleteReceipts(db, hash, num) + return nil +} + +func targetBlockHeight(db database.KeyValueReader) (uint64, bool, error) { + numBytes, err := db.Get(targetBlockHeightKey) + if err != nil { + if errors.Is(err, database.ErrNotFound) { + return 0, false, nil + } + return 0, false, err + } + if len(numBytes) != blockNumberSize { + return 0, false, fmt.Errorf("invalid block number encoding length: %d", len(numBytes)) + } + height := binary.BigEndian.Uint64(numBytes) + return height, true, nil +} + +func headBlockNumber(db ethdb.KeyValueReader) (uint64, bool) { + hash := rawdb.ReadHeadHeaderHash(db) + num := rawdb.ReadHeaderNumber(db, hash) + if num == nil || *num == 0 { + return 0, false + } + return *num, true +} + +func writeTargetBlockHeight(db database.KeyValueWriter, endHeight uint64) error { + return db.Put(targetBlockHeightKey, encodeBlockNumber(endHeight)) +} + +func isMigratableKey(db ethdb.Reader, key []byte) bool { + if !isBodyKey(key) { + return false + } + num, hash, err := parseBlockKey(key) + if err != nil { + return false + } + + // Skip genesis since all nodes have it and we won't benefit from + // minHeight > 0 when state-syncing. 
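+	// Non-canonical (orphaned) block bodies are also skipped via the canonical-hash
+	// check below.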
+ if num == 0 { + return false + } + + canonHash := rawdb.ReadCanonicalHash(db, num) + return canonHash == hash +} + +func blockHeaderKey(num uint64, hash common.Hash) []byte { + return slices.Concat(evmHeaderPrefix, encodeBlockNumber(num), hash.Bytes()) +} + +func blockBodyKey(num uint64, hash common.Hash) []byte { + return slices.Concat(evmBlockBodyPrefix, encodeBlockNumber(num), hash.Bytes()) +} + +func receiptsKey(num uint64, hash common.Hash) []byte { + return slices.Concat(evmReceiptsPrefix, encodeBlockNumber(num), hash.Bytes()) +} + +func minBlockHeightToMigrate(db ethdb.Database) (uint64, bool, error) { + iter := db.NewIterator(evmBlockBodyPrefix, nil) + defer iter.Release() + + for iter.Next() { + key := iter.Key() + if !isMigratableKey(db, key) { + continue + } + blockNum, _, err := parseBlockKey(key) + if err != nil { + continue + } + return blockNum, true, nil + } + return 0, false, iter.Error() +} + +func compactRange( + db ethdb.Compacter, + keyFunc func(uint64, common.Hash) []byte, + startNum, endNum uint64, + logger logging.Logger, +) { + startKey := keyFunc(startNum, common.Hash{}) + endKey := keyFunc(endNum+1, common.Hash{}) + if err := db.Compact(startKey, endKey); err != nil { + logger.Error("Failed to compact data in range", + zap.Uint64("startHeight", startNum), + zap.Uint64("endHeight", endNum), + zap.Error(err)) + } +}
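A minimal caller-side wiring sketch (not part of the diff above; names such as base, evmDB, dataDir, logger, and minHeight are assumed to exist in the embedding VM) showing how the wrapper, deferred initialization, and background migration are expected to fit together:

	blockDB, initialized, err := blockdb.New(
		base,    // avalanchego database.Database used for non-EVM state (assumed)
		evmDB,   // ethdb.Database holding the EVM data (assumed)
		dataDir, // directory for the height-indexed databases (assumed)
		true,    // allow deferred init until the minimum height is known
		heightindexdb.DefaultConfig(),
		logger,
		prometheus.NewRegistry(),
	)
	if err != nil {
		return err
	}
	if !initialized {
		// e.g. once the state-sync target height is known (assumed value)
		if err := blockDB.InitBlockDBs(minHeight); err != nil {
			return err
		}
	}
	if err := blockDB.StartMigration(); err != nil {
		return err
	}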