Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions p2p/kademlia/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
}

ignoreList := s.ignorelist.ToNodeList()
ignoredSet := hashedIDSetFromNodes(ignoreList)

globalClosestContacts := make(map[string]*NodeList)
var closestMu sync.RWMutex
Expand All @@ -800,7 +801,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
continue
}

top6 := s.ht.closestContactsWithIncludingNode(Alpha, hashes[i], ignoreList, nil)
top6 := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(Alpha, hashes[i], ignoredSet, nil)
closestMu.Lock()
globalClosestContacts[keys[i]] = top6
closestMu.Unlock()
Expand Down Expand Up @@ -1144,6 +1145,7 @@ func (s *DHT) BatchRetrieveStream(
delete(knownNodes, string(self.ID))

ignoreList := s.ignorelist.ToNodeList()
ignoredSet := hashedIDSetFromNodes(ignoreList)
globalClosestContacts := make(map[string]*NodeList)
var closestMu sync.RWMutex

Expand All @@ -1152,7 +1154,7 @@ func (s *DHT) BatchRetrieveStream(
if _, found := resSeen.Load(hexKeys[i]); found {
continue
}
topK := s.ht.closestContactsWithIncludingNode(Alpha, hashes[i], ignoreList, nil)
topK := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(Alpha, hashes[i], ignoredSet, nil)
closestMu.Lock()
globalClosestContacts[keys[i]] = topK
closestMu.Unlock()
Expand Down Expand Up @@ -2134,6 +2136,8 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
globalClosestContacts := make(map[string]*NodeList)
knownNodes := make(map[string]*Node)
hashes := make([][]byte, len(values))
ignoreList := s.ignorelist.ToNodeList()
ignoredSet := hashedIDSetFromNodes(ignoreList)

{
f := logtrace.Fields{logtrace.FieldModule: "dht", "task_id": id, "keys": len(values), "len_nodes": len(s.ht.nodes()), logtrace.FieldRole: "client"}
Expand All @@ -2145,7 +2149,7 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
for i := 0; i < len(values); i++ {
target, _ := utils.Blake3Hash(values[i])
hashes[i] = target
top6 := s.ht.closestContactsWithIncludingNode(Alpha, target, s.ignorelist.ToNodeList(), nil)
top6 := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(Alpha, target, ignoredSet, nil)

globalClosestContacts[base58.Encode(target)] = top6
// log.WithContext(ctx).WithField("top 6", top6).Info("iterate batch store begin")
Expand Down
213 changes: 182 additions & 31 deletions p2p/kademlia/hashtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,95 @@ func ensureHashedTarget(target []byte) []byte {
return target
}

type hashedIDKey [32]byte

func hashedKeyFromNode(node *Node) (hashedIDKey, bool) {
var k hashedIDKey
if node == nil {
return k, false
}

hashedID := node.HashedID
if len(hashedID) != 32 {
if len(node.ID) == 0 {
return k, false
}
h, err := utils.Blake3Hash(node.ID)
if err != nil || len(h) != 32 {
return k, false
}
hashedID = h
}

copy(k[:], hashedID)
return k, true
}

type hashedIDSet map[hashedIDKey]struct{}

func hashedIDSetFromNodes(nodes []*Node) hashedIDSet {
if len(nodes) == 0 {
return nil
}
set := make(hashedIDSet, len(nodes))
for _, n := range nodes {
if k, ok := hashedKeyFromNode(n); ok {
set[k] = struct{}{}
}
}
return set
}

func (s hashedIDSet) contains(hashedID []byte) bool {
if len(s) == 0 || len(hashedID) != 32 {
return false
}
var k hashedIDKey
copy(k[:], hashedID)
_, ok := s[k]
return ok
}

func hashedKeysFromNodes(nodes []*Node) []hashedIDKey {
if len(nodes) == 0 {
return nil
}
keys := make([]hashedIDKey, 0, len(nodes))
for _, n := range nodes {
if k, ok := hashedKeyFromNode(n); ok {
keys = append(keys, k)
}
}
return keys
}

func hasHashedID(hashedID []byte, keys []hashedIDKey) bool {
if len(keys) == 0 || len(hashedID) != 32 {
return false
}
for i := range keys {
if bytes.Equal(hashedID, keys[i][:]) {
return true
}
}
return false
}

func hasNodeWithHashedID(nodes []*Node, hashedID []byte) bool {
if len(hashedID) != 32 {
return false
}
for _, n := range nodes {
if n == nil || len(n.HashedID) != 32 {
continue
}
if bytes.Equal(n.HashedID, hashedID) {
return true
}
}
return false
}

// hasBucketNode: compare on HashedID
func (ht *HashTable) hasBucketNode(bucket int, hashedID []byte) bool {
ht.mutex.RLock()
Expand Down Expand Up @@ -250,85 +339,147 @@ func (ht *HashTable) RemoveNode(index int, hashedID []byte) bool {

// closestContacts: use HashedID in ignored-map
func (ht *HashTable) closestContacts(num int, target []byte, ignoredNodes []*Node) (*NodeList, int) {
ht.mutex.RLock()
defer ht.mutex.RUnlock()

hashedTarget := ensureHashedTarget(target)
ignoredKeys := hashedKeysFromNodes(ignoredNodes)

ignoredMap := make(map[string]bool, len(ignoredNodes))
for _, node := range ignoredNodes {
ignoredMap[string(node.HashedID)] = true
ht.mutex.RLock()
total := 0
for _, bucket := range ht.routeTable {
total += len(bucket)
}

nl := &NodeList{Comparator: hashedTarget}
nodes := make([]*Node, 0, total)
counter := 0
for _, bucket := range ht.routeTable {
for _, node := range bucket {
counter++
if !ignoredMap[string(node.HashedID)] {
nl.AddNodes([]*Node{node})
if node == nil {
continue
}
if hasHashedID(node.HashedID, ignoredKeys) {
continue
}
nodes = append(nodes, node)
}
}
ht.mutex.RUnlock()

nl := &NodeList{Comparator: hashedTarget, Nodes: nodes}
nl.Sort()
nl.TopN(num)
return nl, counter
}

// keep an alias for old callers; fix typo in new name
func (ht *HashTable) closestContactsWithIncludingNode(num int, target []byte, ignoredNodes []*Node, includeNode *Node) *NodeList {
hashedTarget := ensureHashedTarget(target)
ignoredKeys := hashedKeysFromNodes(ignoredNodes)

ht.mutex.RLock()
defer ht.mutex.RUnlock()
total := 0
for _, bucket := range ht.routeTable {
total += len(bucket)
}
nodes := make([]*Node, 0, total+1)
for _, bucket := range ht.routeTable {
for _, node := range bucket {
if node == nil {
continue
}
if hasHashedID(node.HashedID, ignoredKeys) {
continue
}
nodes = append(nodes, node)
}
}
ht.mutex.RUnlock()

hashedTarget := ensureHashedTarget(target)
ignoredMap := make(map[string]bool, len(ignoredNodes))
for _, node := range ignoredNodes {
ignoredMap[string(node.HashedID)] = true
if includeNode != nil {
includeNode.SetHashedID()
if !hasNodeWithHashedID(nodes, includeNode.HashedID) {
nodes = append(nodes, includeNode)
}
}

nl := &NodeList{Comparator: hashedTarget}
nl := &NodeList{Comparator: hashedTarget, Nodes: nodes}
nl.Sort()
nl.TopN(num)
return nl
}

// closestContactsWithIncludingNodeWithIgnoredSet is an optimized variant for batch callers that
// can precompute the ignored hashed-ID set once and reuse it across many lookups.
func (ht *HashTable) closestContactsWithIncludingNodeWithIgnoredSet(num int, target []byte, ignoredSet hashedIDSet, includeNode *Node) *NodeList {
hashedTarget := ensureHashedTarget(target)

ht.mutex.RLock()
total := 0
for _, bucket := range ht.routeTable {
total += len(bucket)
}
nodes := make([]*Node, 0, total+1)
for _, bucket := range ht.routeTable {
for _, node := range bucket {
if !ignoredMap[string(node.HashedID)] {
nl.AddNodes([]*Node{node})
if node == nil {
continue
}
if ignoredSet.contains(node.HashedID) {
continue
}
nodes = append(nodes, node)
}
}
ht.mutex.RUnlock()

if includeNode != nil {
nl.AddNodes([]*Node{includeNode})
includeNode.SetHashedID()
if !hasNodeWithHashedID(nodes, includeNode.HashedID) {
nodes = append(nodes, includeNode)
}
}

nl := &NodeList{Comparator: hashedTarget, Nodes: nodes}
nl.Sort()
nl.TopN(num)
return nl
}

func (ht *HashTable) closestContactsWithIncludingNodeList(num int, target []byte, ignoredNodes []*Node, nodesToInclude []*Node) *NodeList {
ht.mutex.RLock()
defer ht.mutex.RUnlock()

hashedTarget := ensureHashedTarget(target)
ignoredMap := make(map[string]bool, len(ignoredNodes))
for _, node := range ignoredNodes {
ignoredMap[string(node.HashedID)] = true
}
ignoredKeys := hashedKeysFromNodes(ignoredNodes)

nl := &NodeList{Comparator: hashedTarget}
ht.mutex.RLock()
total := 0
for _, bucket := range ht.routeTable {
total += len(bucket)
}
nodes := make([]*Node, 0, total+len(nodesToInclude))
for _, bucket := range ht.routeTable {
for _, node := range bucket {
if !ignoredMap[string(node.HashedID)] {
nl.AddNodes([]*Node{node})
if node == nil {
continue
}
if hasHashedID(node.HashedID, ignoredKeys) {
continue
}
nodes = append(nodes, node)
}
}
ht.mutex.RUnlock()

if len(nodesToInclude) > 0 {
for _, node := range nodesToInclude {
if !nl.exists(node) {
nl.AddNodes([]*Node{node})
if node == nil {
continue
}
node.SetHashedID()
if hasNodeWithHashedID(nodes, node.HashedID) {
continue
}
nodes = append(nodes, node)
}
}

nl := &NodeList{Comparator: hashedTarget, Nodes: nodes}
nl.Sort()
nl.TopN(num)
return nl
Expand Down
20 changes: 19 additions & 1 deletion p2p/kademlia/rq_symbols.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package kademlia
import (
"context"
"fmt"
"os"
"path/filepath"
"sort"
"time"

Expand All @@ -19,9 +21,12 @@ func (s *DHT) startStoreSymbolsWorker(ctx context.Context) {
// Minimal visibility for lifecycle + each tick
logtrace.Debug(ctx, "rq_symbols worker started", logtrace.Fields{logtrace.FieldModule: "p2p"})

ticker := time.NewTicker(defaultSoreSymbolsInterval)
defer ticker.Stop()

for {
select {
case <-time.After(defaultSoreSymbolsInterval):
case <-ticker.C:
if err := s.storeSymbols(ctx); err != nil {
logtrace.Error(ctx, "store symbols", logtrace.Fields{logtrace.FieldModule: "p2p", logtrace.FieldError: err})
}
Expand Down Expand Up @@ -102,6 +107,19 @@ func (s *DHT) scanDirAndStoreSymbols(ctx context.Context, dir, txid string) erro
if err := s.rqstore.SetIsCompleted(txid); err != nil {
return fmt.Errorf("set is-completed: %w", err)
}

cleanDir := filepath.Clean(dir)
if txid == "" || cleanDir == "" || cleanDir == "." || cleanDir == ".." || cleanDir == string(filepath.Separator) {
logtrace.Warn(ctx, "worker: skip removing unsafe symbols dir", logtrace.Fields{"dir": dir, "txid": txid, "clean_dir": cleanDir})
return nil
}
if filepath.Base(cleanDir) != txid {
logtrace.Warn(ctx, "worker: skip removing symbols dir with unexpected base", logtrace.Fields{"dir": dir, "txid": txid, "clean_dir": cleanDir})
return nil
}
if err := os.RemoveAll(cleanDir); err != nil {
logtrace.Warn(ctx, "worker: remove symbols dir failed", logtrace.Fields{"dir": cleanDir, "txid": txid, logtrace.FieldError: err.Error()})
}
return nil
}

Expand Down
Loading
Loading