diff --git a/common/bytes.go b/common/bytes.go
index d1f5c6c99586..ddcf248cc02a 100644
--- a/common/bytes.go
+++ b/common/bytes.go
@@ -149,3 +149,24 @@ func TrimRightZeroes(s []byte) []byte {
 	}
 	return s[:idx]
 }
+
+// UpperBound returns the upper bound for iteration over keys with the given prefix.
+// It returns the next key in lexicographic order that is greater than all keys with
+// the given prefix. This is useful for setting iteration bounds in databases.
+// Returns nil if no such upper bound exists (e.g., if prefix is empty or all 0xff bytes).
+func UpperBound(prefix []byte) []byte {
+	if len(prefix) == 0 {
+		return nil
+	}
+	var limit []byte
+	for i := len(prefix) - 1; i >= 0; i-- {
+		c := prefix[i]
+		if c < 0xff {
+			limit = make([]byte, i+1)
+			copy(limit, prefix)
+			limit[i] = c + 1
+			break
+		}
+	}
+	return limit
+}
diff --git a/ethdb/pebble/pebble.go b/ethdb/pebble/pebble.go
index 8abe7d4bc72b..d0aa5c8bf31f 100644
--- a/ethdb/pebble/pebble.go
+++ b/ethdb/pebble/pebble.go
@@ -450,21 +450,6 @@ func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
 	}
 }
 
-// upperBound returns the upper bound for the given prefix
-func upperBound(prefix []byte) (limit []byte) {
-	for i := len(prefix) - 1; i >= 0; i-- {
-		c := prefix[i]
-		if c == 0xff {
-			continue
-		}
-		limit = make([]byte, i+1)
-		copy(limit, prefix)
-		limit[i] = c + 1
-		break
-	}
-	return limit
-}
-
 // Stat returns the internal metrics of Pebble in a text format. It's a developer
 // method to read everything there is to read, independent of Pebble version.
 func (d *Database) Stat() (string, error) {
@@ -731,7 +716,7 @@ type pebbleIterator struct {
 func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
 	iter, _ := d.db.NewIter(&pebble.IterOptions{
 		LowerBound: append(prefix, start...),
-		UpperBound: upperBound(prefix),
+		UpperBound: common.UpperBound(prefix),
 	})
 	iter.First()
 	return &pebbleIterator{iter: iter, moved: true, released: false}
diff --git a/p2p/enode/nodedb.go b/p2p/enode/nodedb.go
index 2cd211e2c20f..250932594c31 100644
--- a/p2p/enode/nodedb.go
+++ b/p2p/enode/nodedb.go
@@ -20,20 +20,20 @@ import (
 	"bytes"
 	"crypto/rand"
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"net/netip"
 	"os"
+	"path/filepath"
 	"sync"
 	"time"
 
+	"github.com/cockroachdb/pebble"
+	"github.com/cockroachdb/pebble/vfs"
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p/enr"
 	"github.com/ethereum/go-ethereum/rlp"
-	"github.com/syndtr/goleveldb/leveldb"
-	"github.com/syndtr/goleveldb/leveldb/errors"
-	"github.com/syndtr/goleveldb/leveldb/iterator"
-	"github.com/syndtr/goleveldb/leveldb/opt"
-	"github.com/syndtr/goleveldb/leveldb/storage"
-	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
 // Keys in the node database.
@@ -59,7 +59,7 @@ const (
 const (
 	dbNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped.
 	dbCleanupCycle   = time.Hour      // Time period for running the expiration task.
-	dbVersion        = 9
+	dbVersion        = 10
 )
 
 var (
@@ -71,7 +71,7 @@ var zeroIP = netip.IPv6Unspecified()
 // DB is the node database, storing previously seen nodes and any collected metadata about
 // them for QoS purposes.
 type DB struct {
-	lvl    *leveldb.DB   // Interface to the database itself
+	db     *pebble.DB    // The pebble database
 	runner sync.Once     // Ensures we can start at most one expirer
 	quit   chan struct{} // Channel to signal the expiring thread to stop
 }
@@ -87,49 +87,81 @@ func OpenDB(path string) (*DB, error) {
 
 // newMemoryDB creates a new in-memory node database without a persistent backend.
 func newMemoryDB() (*DB, error) {
-	db, err := leveldb.Open(storage.NewMemStorage(), nil)
+	db, err := pebble.Open("", &pebble.Options{
+		FS: vfs.NewMem(),
+	})
 	if err != nil {
 		return nil, err
 	}
-	return &DB{lvl: db, quit: make(chan struct{})}, nil
+	return &DB{db: db, quit: make(chan struct{})}, nil
 }
 
-// newPersistentDB creates/opens a leveldb backed persistent node database,
-// also flushing its contents in case of a version mismatch.
+// newPersistentDB creates/opens a pebble-backed persistent node database, wiping it
+// on corruption or version mismatch. An old leveldb database at path is deleted, not converted.
 func newPersistentDB(path string) (*DB, error) {
-	opts := &opt.Options{OpenFilesCacheCapacity: 5}
-	db, err := leveldb.OpenFile(path, opts)
-	if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
-		db, err = leveldb.RecoverFile(path, nil)
+	// The old leveldb contents are not carried over, the directory is wiped.
+	if hasLevelDB(path) {
+		log.Info("Removing old leveldb node database, recreating with pebble", "path", path)
+		if err := os.RemoveAll(path); err != nil {
+			return nil, fmt.Errorf("removing old leveldb database: %w", err)
+		}
 	}
+	opts := &pebble.Options{MaxOpenFiles: 5}
+	db, err := pebble.Open(path, opts)
 	if err != nil {
-		return nil, err
+		// If the database is corrupted, remove it and try again.
+		if pebble.IsCorruptionError(err) {
+			log.Warn("Corrupted node database detected, removing and recreating", "path", path, "err", err)
+			if removeErr := os.RemoveAll(path); removeErr != nil {
+				return nil, fmt.Errorf("removing corrupted database: %w", removeErr)
+			}
+			db, err = pebble.Open(path, opts)
+		}
+		if err != nil {
+			return nil, err
+		}
 	}
 	// The nodes contained in the cache correspond to a certain protocol version.
 	// Flush all nodes if the version doesn't match.
-	currentVer := make([]byte, binary.MaxVarintLen64)
-	currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]
-
-	blob, err := db.Get([]byte(dbVersionKey), nil)
-	switch err {
-	case leveldb.ErrNotFound:
-		// Version not found (i.e. empty cache), insert it
-		if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
+	currentVer := encodeVarint(int64(dbVersion))
+	blob, closer, err := db.Get([]byte(dbVersionKey))
+	if errors.Is(err, pebble.ErrNotFound) {
+		// Version not found (i.e. empty cache), insert it
+		if err := db.Set([]byte(dbVersionKey), currentVer, pebble.Sync); err != nil {
 			db.Close()
 			return nil, err
 		}
-
-	case nil:
-		// Version present, flush if different
-		if !bytes.Equal(blob, currentVer) {
+	} else if err != nil {
+		db.Close()
+		return nil, err
+	} else {
+		// blob is only valid until closer is closed, so compare first.
+		match := bytes.Equal(blob, currentVer)
+		closer.Close()
+		if !match {
+			// Version mismatch, wipe the directory and start over
 			db.Close()
-			if err = os.RemoveAll(path); err != nil {
+			if err := os.RemoveAll(path); err != nil {
 				return nil, err
 			}
 			return newPersistentDB(path)
 		}
 	}
-	return &DB{lvl: db, quit: make(chan struct{})}, nil
+	return &DB{db: db, quit: make(chan struct{})}, nil
+}
+
+// hasLevelDB reports whether path holds a CURRENT file but no OPTIONS* file.
+func hasLevelDB(path string) bool {
+	// No CURRENT file (or it cannot be stat'ed): not treated as leveldb
+	if _, err := os.Stat(filepath.Join(path, "CURRENT")); err != nil {
+		return false
+	}
+	// Check for absence of OPTIONS* files (which indicates pebble)
+	matches, err := filepath.Glob(filepath.Join(path, "OPTIONS*"))
+	if err != nil {
+		return false
+	}
+	return len(matches) == 0
 }
 
 // nodeKey returns the database key for a node record.
@@ -194,12 +226,27 @@ func localItemKey(id ID, field string) []byte {
 	return key
 }
 
+// encodeVarint encodes an int64 as a varint.
+func encodeVarint(n int64) []byte {
+	buf := make([]byte, binary.MaxVarintLen64)
+	buf = buf[:binary.PutVarint(buf, n)]
+	return buf
+}
+
+// encodeUvarint encodes a uint64 as an unsigned varint.
+func encodeUvarint(n uint64) []byte {
+	buf := make([]byte, binary.MaxVarintLen64)
+	buf = buf[:binary.PutUvarint(buf, n)]
+	return buf
+}
+
 // fetchInt64 retrieves an integer associated with a particular key.
 func (db *DB) fetchInt64(key []byte) int64 {
-	blob, err := db.lvl.Get(key, nil)
+	blob, closer, err := db.db.Get(key)
 	if err != nil {
 		return 0
 	}
+	defer closer.Close()
 	val, read := binary.Varint(blob)
 	if read <= 0 {
 		return 0
@@ -209,34 +256,33 @@ func (db *DB) fetchInt64(key []byte) int64 {
 
 // storeInt64 stores an integer in the given key.
 func (db *DB) storeInt64(key []byte, n int64) error {
-	blob := make([]byte, binary.MaxVarintLen64)
-	blob = blob[:binary.PutVarint(blob, n)]
-	return db.lvl.Put(key, blob, nil)
+	// The node database is a disposable cache (wiped on version mismatch), so skip the per-write fsync.
+	return db.db.Set(key, encodeVarint(n), pebble.NoSync)
 }
 
 // fetchUint64 retrieves an integer associated with a particular key.
 func (db *DB) fetchUint64(key []byte) uint64 {
-	blob, err := db.lvl.Get(key, nil)
+	blob, closer, err := db.db.Get(key)
 	if err != nil {
 		return 0
 	}
+	defer closer.Close()
 	val, _ := binary.Uvarint(blob)
 	return val
 }
 
 // storeUint64 stores an integer in the given key.
 func (db *DB) storeUint64(key []byte, n uint64) error {
-	blob := make([]byte, binary.MaxVarintLen64)
-	blob = blob[:binary.PutUvarint(blob, n)]
-	return db.lvl.Put(key, blob, nil)
+	return db.db.Set(key, encodeUvarint(n), pebble.NoSync)
 }
 
 // Node retrieves a node with a given id from the database.
 func (db *DB) Node(id ID) *Node {
-	blob, err := db.lvl.Get(nodeKey(id), nil)
+	blob, closer, err := db.db.Get(nodeKey(id))
 	if err != nil {
 		return nil
 	}
+	defer closer.Close()
 	return mustDecodeNode(id[:], blob)
 }
 
@@ -260,7 +306,7 @@ func (db *DB) UpdateNode(node *Node) error {
 	if err != nil {
 		return err
 	}
-	if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil {
+	if err := db.db.Set(nodeKey(node.ID()), blob, pebble.NoSync); err != nil {
 		return err
 	}
 	return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq())
@@ -282,14 +328,23 @@ func (db *DB) Resolve(n *Node) *Node {
 
 // DeleteNode deletes all information associated with a node.
 func (db *DB) DeleteNode(id ID) {
-	deleteRange(db.lvl, nodeKey(id))
+	deleteRange(db.db, nodeKey(id))
 }
 
-func deleteRange(db *leveldb.DB, prefix []byte) {
-	it := db.NewIterator(util.BytesPrefix(prefix), nil)
-	defer it.Release()
-	for it.Next() {
-		db.Delete(it.Key(), nil)
+// deleteRange removes every key that starts with prefix. A failure to create
+// the iterator is logged rather than returned; deletion is best-effort.
+func deleteRange(db *pebble.DB, prefix []byte) {
+	iter, err := db.NewIter(&pebble.IterOptions{
+		LowerBound: prefix,
+		UpperBound: common.UpperBound(prefix),
+	})
+	if err != nil {
+		log.Warn("Failed to iterate node database", "err", err)
+		return
+	}
+	defer iter.Close()
+	for iter.First(); iter.Valid(); iter.Next() {
+		db.Delete(iter.Key(), pebble.NoSync)
 	}
 }
 
@@ -324,9 +379,17 @@ func (db *DB) expirer() {
 // expireNodes iterates over the database and deletes all nodes that have not
 // been seen (i.e. received a pong from) for some time.
 func (db *DB) expireNodes() {
-	it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
-	defer it.Release()
-	if !it.Next() {
+	iter, err := db.db.NewIter(&pebble.IterOptions{
+		LowerBound: []byte(dbNodePrefix),
+		UpperBound: common.UpperBound([]byte(dbNodePrefix)),
+	})
+	if err != nil {
+		log.Warn("Failed to iterate node database", "err", err)
+		return
+	}
+	defer iter.Close()
+
+	if !iter.First() {
 		return
 	}
 
@@ -336,24 +399,24 @@ func (db *DB) expireNodes() {
 		atEnd        = false
 	)
 	for !atEnd {
-		id, ip, field := splitNodeItemKey(it.Key())
+		id, ip, field := splitNodeItemKey(iter.Key())
 		if field == dbNodePong {
-			time, _ := binary.Varint(it.Value())
+			time, _ := binary.Varint(iter.Value())
 			if time > youngestPong {
 				youngestPong = time
 			}
 			if time < threshold {
 				// Last pong from this IP older than threshold, remove fields belonging to it.
-				deleteRange(db.lvl, nodeItemKey(id, ip, ""))
+				deleteRange(db.db, nodeItemKey(id, ip, ""))
 			}
 		}
-		atEnd = !it.Next()
-		nextID, _ := splitNodeKey(it.Key())
+		atEnd = !iter.Next()
+		nextID, _ := splitNodeKey(iter.Key())
 		if atEnd || nextID != id {
 			// We've moved beyond the last entry of the current ID.
 			// Remove everything if there was no recent enough pong.
 			if youngestPong > 0 && youngestPong < threshold {
-				deleteRange(db.lvl, nodeKey(id))
+				deleteRange(db.db, nodeKey(id))
 			}
 			youngestPong = 0
 		}
@@ -448,10 +511,14 @@ func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
 	var (
 		now   = time.Now()
 		nodes = make([]*Node, 0, n)
-		it    = db.lvl.NewIterator(nil, nil)
 		id    ID
 	)
-	defer it.Release()
+	it, err := db.db.NewIter(nil)
+	if err != nil {
+		log.Warn("Failed to iterate node database", "err", err)
+		return nodes
+	}
+	defer it.Close()
 
 seek:
 	for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
@@ -461,7 +528,7 @@ seek:
 		ctr := id[0]
 		rand.Read(id[:])
 		id[0] = ctr + id[0]%16
-		it.Seek(nodeKey(id))
+		it.SeekGE(nodeKey(id))
 
 		n := nextNode(it)
 		if n == nil {
@@ -483,13 +550,12 @@ seek:
 
 // reads the next node record from the iterator, skipping over other
 // database entries.
-func nextNode(it iterator.Iterator) *Node {
-	for end := false; !end; end = !it.Next() {
+func nextNode(it *pebble.Iterator) *Node {
+	for ; it.Valid(); it.Next() {
 		id, rest := splitNodeKey(it.Key())
-		if string(rest) != dbDiscoverRoot {
-			continue
+		if string(rest) == dbDiscoverRoot {
+			return mustDecodeNode(id[:], it.Value())
 		}
-		return mustDecodeNode(id[:], it.Value())
 	}
 	return nil
 }
@@ -501,5 +567,5 @@ func (db *DB) Close() {
 	default:
 		close(db.quit)
 	}
-	db.lvl.Close()
+	db.db.Close()
 }