From eea755b2776011682f487bc714300d33e971649a Mon Sep 17 00:00:00 2001 From: j-rafique Date: Thu, 1 Jan 2026 15:09:12 +0500 Subject: [PATCH 1/4] perf: reduce symbol I/O and add decode retries --- p2p/kademlia/rq_symbols.go | 20 ++- pkg/utils/utils.go | 30 ++++ supernode/adaptors/p2p.go | 169 ++++++++++++++------ supernode/cascade/download.go | 284 +++++++++++++++++----------------- supernode/cascade/helper.go | 4 +- supernode/cascade/register.go | 2 +- supernode/status/metrics.go | 10 +- 7 files changed, 322 insertions(+), 197 deletions(-) diff --git a/p2p/kademlia/rq_symbols.go b/p2p/kademlia/rq_symbols.go index 7aa2c578..891f6070 100644 --- a/p2p/kademlia/rq_symbols.go +++ b/p2p/kademlia/rq_symbols.go @@ -3,6 +3,8 @@ package kademlia import ( "context" "fmt" + "os" + "path/filepath" "sort" "time" @@ -19,9 +21,12 @@ func (s *DHT) startStoreSymbolsWorker(ctx context.Context) { // Minimal visibility for lifecycle + each tick logtrace.Debug(ctx, "rq_symbols worker started", logtrace.Fields{logtrace.FieldModule: "p2p"}) + ticker := time.NewTicker(defaultSoreSymbolsInterval) + defer ticker.Stop() + for { select { - case <-time.After(defaultSoreSymbolsInterval): + case <-ticker.C: if err := s.storeSymbols(ctx); err != nil { logtrace.Error(ctx, "store symbols", logtrace.Fields{logtrace.FieldModule: "p2p", logtrace.FieldError: err}) } @@ -102,6 +107,19 @@ func (s *DHT) scanDirAndStoreSymbols(ctx context.Context, dir, txid string) erro if err := s.rqstore.SetIsCompleted(txid); err != nil { return fmt.Errorf("set is-completed: %w", err) } + + cleanDir := filepath.Clean(dir) + if txid == "" || cleanDir == "" || cleanDir == "." || cleanDir == ".." || cleanDir == string(filepath.Separator) { + logtrace.Warn(ctx, "worker: skip removing unsafe symbols dir", logtrace.Fields{"dir": dir, "txid": txid, "clean_dir": cleanDir}) + return nil + } + if filepath.Base(cleanDir) != txid { + logtrace.Warn(ctx, "worker: skip removing symbols dir with unexpected base", logtrace.Fields{"dir": dir, "txid": txid, "clean_dir": cleanDir}) + return nil + } + if err := os.RemoveAll(cleanDir); err != nil { + logtrace.Warn(ctx, "worker: remove symbols dir failed", logtrace.Fields{"dir": cleanDir, "txid": txid, logtrace.FieldError: err.Error()}) + } return nil } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 720b2f87..fad26462 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -364,6 +364,36 @@ func DeleteSymbols(ctx context.Context, dir string, keys []string) error { return nil } +// PruneEmptyBlockDirs removes empty "block_*" directories under dir. +// Best-effort: errors are returned but callers may choose to log and continue. +func PruneEmptyBlockDirs(dir string) error { + entries, err := os.ReadDir(dir) + if err != nil { + return err + } + for _, ent := range entries { + if !ent.IsDir() { + continue + } + name := ent.Name() + if !strings.HasPrefix(name, "block_") { + continue + } + p := filepath.Join(dir, name) + children, err := os.ReadDir(p) + if err != nil { + return err + } + if len(children) != 0 { + continue + } + if err := os.Remove(p); err != nil && !os.IsNotExist(err) { + return err + } + } + return nil +} + // ReadDirFilenames returns a map whose keys are "block_*/file" paths, values nil. 
func ReadDirFilenames(dirPath string) (map[string][]byte, error) { idMap := make(map[string][]byte) diff --git a/supernode/adaptors/p2p.go b/supernode/adaptors/p2p.go index ce218a4f..2900652d 100644 --- a/supernode/adaptors/p2p.go +++ b/supernode/adaptors/p2p.go @@ -3,15 +3,14 @@ package adaptors import ( "context" "fmt" - "io/fs" "math" "math/rand/v2" "path/filepath" "sort" - "strings" "time" "github.com/LumeraProtocol/supernode/v2/p2p" + "github.com/LumeraProtocol/supernode/v2/pkg/codec" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" "github.com/LumeraProtocol/supernode/v2/pkg/storage/rqstore" "github.com/LumeraProtocol/supernode/v2/pkg/utils" @@ -42,35 +41,65 @@ type StoreArtefactsRequest struct { ActionID string IDFiles [][]byte SymbolsDir string + Layout codec.Layout } func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) error { - logtrace.Info(ctx, "store: p2p start", logtrace.Fields{"taskID": req.TaskID, "actionID": req.ActionID, "id_files": len(req.IDFiles), "symbols_dir": req.SymbolsDir}) + idFilesBytes := totalBytes(req.IDFiles) + logtrace.Info(ctx, "store: p2p start", logtrace.Fields{ + "taskID": req.TaskID, + "actionID": req.ActionID, + "id_files": len(req.IDFiles), + "id_files_bytes": idFilesBytes, + "id_files_mb_est": utils.BytesIntToMB(idFilesBytes), + "symbols_dir": req.SymbolsDir, + }) start := time.Now() - firstPassSymbols, totalSymbols, err := p.storeCascadeSymbolsAndData(ctx, req.TaskID, req.ActionID, req.SymbolsDir, req.IDFiles) + firstPassSymbols, totalSymbols, err := p.storeCascadeSymbolsAndData(ctx, req.TaskID, req.ActionID, req.SymbolsDir, req.IDFiles, req.Layout) if err != nil { return fmt.Errorf("error storing artefacts: %w", err) } - remaining := 0 - if req.SymbolsDir != "" { - if keys, werr := walkSymbolTree(req.SymbolsDir); werr == nil { - remaining = len(keys) - } + remainingEst := totalSymbols - firstPassSymbols + if remainingEst < 0 { + remainingEst = 0 } - logtrace.Info(ctx, "store: first-pass complete", logtrace.Fields{"taskID": req.TaskID, "symbols_first_pass": firstPassSymbols, "symbols_total_available": totalSymbols, "id_files_count": len(req.IDFiles), "symbols_left_on_disk": remaining, "ms": time.Since(start).Milliseconds()}) - if remaining == 0 { + logtrace.Info(ctx, "store: first-pass complete", logtrace.Fields{ + "taskID": req.TaskID, + "symbols_first_pass": firstPassSymbols, + "symbols_total_available": totalSymbols, + "id_files_count": len(req.IDFiles), + "symbols_left_on_disk_est": remainingEst, + "ms": time.Since(start).Milliseconds(), + }) + if remainingEst == 0 { logtrace.Info(ctx, "store: dir empty after first-pass", logtrace.Fields{"taskID": req.TaskID, "dir": req.SymbolsDir}) } return nil } -func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, actionID string, symbolsDir string, metadataFiles [][]byte) (int, int, error) { +func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, actionID string, symbolsDir string, metadataFiles [][]byte, layout codec.Layout) (int, int, error) { if err := p.rqStore.StoreSymbolDirectory(taskID, symbolsDir); err != nil { return 0, 0, fmt.Errorf("store symbol dir: %w", err) } - keys, err := walkSymbolTree(symbolsDir) + metadataBytes := totalBytes(metadataFiles) + keys, err := symbolKeysFromLayout(layout) if err != nil { - return 0, 0, err + logtrace.Warn(ctx, "store: layout keys unavailable; falling back to disk scan", logtrace.Fields{ + "taskID": taskID, + "dir": symbolsDir, + "err": err.Error(), + }) + if symbolsDir 
== "" { + return 0, 0, err + } + keySet, derr := utils.ReadDirFilenames(symbolsDir) + if derr != nil { + return 0, 0, fmt.Errorf("symbol keys from layout: %w; dir scan: %v", err, derr) + } + keys = make([]string, 0, len(keySet)) + for k := range keySet { + keys = append(keys, k) + } } totalAvailable := len(keys) targetCount := int(math.Ceil(float64(totalAvailable) * storeSymbolsPercent / 100.0)) @@ -90,6 +119,8 @@ func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, action logtrace.Info(ctx, "store: selected symbols", logtrace.Fields{"selected": len(keys), "of_total": totalAvailable, "dir": symbolsDir}) logtrace.Info(ctx, "store: sending symbols", logtrace.Fields{"count": len(keys)}) totalSymbols := 0 + totalBytesStored := 0 + metadataBytesStored := 0 firstBatchProcessed := false for start := 0; start < len(keys); { end := min(start+loadSymbolsBatchSize, len(keys)) @@ -107,18 +138,37 @@ func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, action if err != nil { return 0, 0, fmt.Errorf("load symbols: %w", err) } + symBytesLen := totalBytes(symBytes) payload := make([][]byte, 0, len(metadataFiles)+len(symBytes)) payload = append(payload, metadataFiles...) payload = append(payload, symBytes...) - logtrace.Info(ctx, "store: batch send (first)", logtrace.Fields{"taskID": taskID, "metadata_count": len(metadataFiles), "symbols_in_batch": len(symBytes), "payload_total": len(payload)}) + logtrace.Info(ctx, "store: batch send (first)", logtrace.Fields{ + "taskID": taskID, + "metadata_count": len(metadataFiles), + "metadata_bytes": metadataBytes, + "metadata_mb_est": utils.BytesIntToMB(metadataBytes), + "symbols_in_batch": len(symBytes), + "symbols_bytes": symBytesLen, + "symbols_mb_est": utils.BytesIntToMB(symBytesLen), + "payload_total": len(payload), + "payload_bytes": metadataBytes + symBytesLen, + "payload_mb_est": utils.BytesIntToMB(metadataBytes + symBytesLen), + }) bctx, cancel := context.WithTimeout(ctx, storeBatchContextTimeout) err = p.p2p.StoreBatch(bctx, payload, P2PDataRaptorQSymbol, taskID) cancel() if err != nil { return totalSymbols, totalAvailable, fmt.Errorf("p2p store batch (first): %w", err) } - logtrace.Info(ctx, "store: batch ok (first)", logtrace.Fields{"taskID": taskID, "symbols_stored": len(symBytes)}) + logtrace.Info(ctx, "store: batch ok (first)", logtrace.Fields{ + "taskID": taskID, + "symbols_stored": len(symBytes), + "symbols_bytes": symBytesLen, + "payload_bytes": metadataBytes + symBytesLen, + }) totalSymbols += len(symBytes) + totalBytesStored += symBytesLen + metadataBytes + metadataBytesStored += metadataBytes if len(batch) > 0 { if err := utils.DeleteSymbols(ctx, symbolsDir, batch); err != nil { return totalSymbols, totalAvailable, fmt.Errorf("delete symbols: %w", err) @@ -126,62 +176,93 @@ func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, action } firstBatchProcessed = true } else { - count, err := p.storeSymbolsInP2P(ctx, taskID, symbolsDir, batch) + count, bytes, err := p.storeSymbolsInP2P(ctx, taskID, symbolsDir, batch) if err != nil { return totalSymbols, totalAvailable, err } totalSymbols += count + totalBytesStored += bytes } start = end } + if err := utils.PruneEmptyBlockDirs(symbolsDir); err != nil { + logtrace.Warn(ctx, "store: prune block dirs failed", logtrace.Fields{"taskID": taskID, "dir": symbolsDir, logtrace.FieldError: err.Error()}) + } if err := p.rqStore.UpdateIsFirstBatchStored(taskID); err != nil { return totalSymbols, totalAvailable, fmt.Errorf("update first-batch flag: %w", 
err) } + logtrace.Info(ctx, "store: first-pass bytes summary", logtrace.Fields{ + "taskID": taskID, + "symbols_stored": totalSymbols, + "symbols_total": totalAvailable, + "symbols_bytes_stored": totalBytesStored - metadataBytesStored, + "metadata_bytes": metadataBytes, + "payload_bytes_stored": totalBytesStored, + "payload_mb_est": utils.BytesIntToMB(totalBytesStored), + }) return totalSymbols, totalAvailable, nil } -func walkSymbolTree(root string) ([]string, error) { - var keys []string - err := filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error { - if err != nil { - return err - } - if d.IsDir() { - return nil - } - if strings.EqualFold(filepath.Ext(d.Name()), ".json") { - return nil - } - rel, err := filepath.Rel(root, path) - if err != nil { - return err +func symbolKeysFromLayout(layout codec.Layout) ([]string, error) { + if len(layout.Blocks) == 0 { + return nil, fmt.Errorf("empty layout: no blocks") + } + keys := make([]string, 0, 1024) + seen := make(map[string]struct{}, 1024) + for _, block := range layout.Blocks { + blockDir := fmt.Sprintf("block_%d", block.BlockID) + for _, symbolID := range block.Symbols { + if symbolID == "" { + continue + } + k := filepath.Join(blockDir, symbolID) + if _, ok := seen[k]; ok { + continue + } + seen[k] = struct{}{} + keys = append(keys, k) } - keys = append(keys, rel) - return nil - }) - if err != nil { - return nil, fmt.Errorf("walk symbol tree: %w", err) } return keys, nil } -func (c *p2pImpl) storeSymbolsInP2P(ctx context.Context, taskID, root string, fileKeys []string) (int, error) { +func (c *p2pImpl) storeSymbolsInP2P(ctx context.Context, taskID, root string, fileKeys []string) (count int, bytes int, err error) { logtrace.Debug(ctx, "loading batch symbols", logtrace.Fields{"taskID": taskID, "count": len(fileKeys)}) symbols, err := utils.LoadSymbols(root, fileKeys) if err != nil { - return 0, fmt.Errorf("load symbols: %w", err) + return 0, 0, fmt.Errorf("load symbols: %w", err) } + symbolsBytes := totalBytes(symbols) symCtx, cancel := context.WithTimeout(ctx, storeBatchContextTimeout) defer cancel() - logtrace.Info(ctx, "store: batch send (symbols)", logtrace.Fields{"taskID": taskID, "symbols_in_batch": len(symbols)}) + logtrace.Info(ctx, "store: batch send (symbols)", logtrace.Fields{ + "taskID": taskID, + "symbols_in_batch": len(symbols), + "symbols_bytes": symbolsBytes, + "symbols_mb_est": utils.BytesIntToMB(symbolsBytes), + }) if err := c.p2p.StoreBatch(symCtx, symbols, P2PDataRaptorQSymbol, taskID); err != nil { - return len(symbols), fmt.Errorf("p2p store batch: %w", err) + return len(symbols), symbolsBytes, fmt.Errorf("p2p store batch: %w", err) } - logtrace.Info(ctx, "store: batch ok (symbols)", logtrace.Fields{"taskID": taskID, "symbols_stored": len(symbols)}) + logtrace.Info(ctx, "store: batch ok (symbols)", logtrace.Fields{ + "taskID": taskID, + "symbols_stored": len(symbols), + "symbols_bytes": symbolsBytes, + }) if err := utils.DeleteSymbols(ctx, root, fileKeys); err != nil { - return len(symbols), fmt.Errorf("delete symbols: %w", err) + return len(symbols), symbolsBytes, fmt.Errorf("delete symbols: %w", err) + } + return len(symbols), symbolsBytes, nil +} + +func totalBytes(chunks [][]byte) int { + // totalBytes sums the byte lengths of each blob in a slice. + // It is used for store telemetry and batch sizing diagnostics. 
+ total := 0 + for _, c := range chunks { + total += len(c) } - return len(symbols), nil + return total } func min(a, b int) int { diff --git a/supernode/cascade/download.go b/supernode/cascade/download.go index 02ab879d..a1851ea8 100644 --- a/supernode/cascade/download.go +++ b/supernode/cascade/download.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "sort" - "strings" "sync" "sync/atomic" "time" @@ -192,84 +191,6 @@ func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, acti return task.restoreFileFromLayout(ctx, layout, metadata.DataHash, actionID, send) } -func (task *CascadeRegistrationTask) restoreFileFromLayoutDeprecated(ctx context.Context, layout codec.Layout, dataHash string, actionID string, send func(resp *DownloadResponse) error) (string, string, error) { - fields := logtrace.Fields{logtrace.FieldActionID: actionID} - symSet := make(map[string]struct{}) - for _, block := range layout.Blocks { - for _, s := range block.Symbols { - symSet[s] = struct{}{} - } - } - allSymbols := make([]string, 0, len(symSet)) - for s := range symSet { - allSymbols = append(allSymbols, s) - } - sort.Strings(allSymbols) - totalSymbols := len(allSymbols) - fields["totalSymbols"] = totalSymbols - targetRequiredCount := (totalSymbols*targetRequiredPercent + 99) / 100 - if targetRequiredCount < 1 && totalSymbols > 0 { - targetRequiredCount = 1 - } - logtrace.Info(ctx, "download: plan symbols", logtrace.Fields{"total_symbols": totalSymbols, "target_required_percent": targetRequiredPercent, "target_required_count": targetRequiredCount}) - retrieveStart := time.Now() - reqCount := targetRequiredCount - if reqCount > totalSymbols { - reqCount = totalSymbols - } - rStart := time.Now() - logtrace.Info(ctx, "download: batch retrieve start", logtrace.Fields{"action_id": actionID, "requested": reqCount, "total_candidates": totalSymbols}) - symbols, err := task.P2PClient.BatchRetrieve(ctx, allSymbols, reqCount, actionID) - if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "batch retrieve failed", fields) - return "", "", fmt.Errorf("batch retrieve symbols: %w", err) - } - retrieveMS := time.Since(retrieveStart).Milliseconds() - logtrace.Info(ctx, "download: batch retrieve ok", logtrace.Fields{"action_id": actionID, "received": len(symbols), "ms": time.Since(rStart).Milliseconds()}) - decodeStart := time.Now() - dStart := time.Now() - logtrace.Info(ctx, "download: decode start", logtrace.Fields{"action_id": actionID}) - decodeInfo, err := task.RQ.Decode(ctx, adaptors.DecodeRequest{ActionID: actionID, Symbols: symbols, Layout: layout}) - if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "decode failed", fields) - return "", "", fmt.Errorf("decode symbols using RaptorQ: %w", err) - } - decodeMS := time.Since(decodeStart).Milliseconds() - logtrace.Info(ctx, "download: decode ok", logtrace.Fields{"action_id": actionID, "ms": time.Since(dStart).Milliseconds(), "tmp_dir": decodeInfo.DecodeTmpDir, "file_path": decodeInfo.FilePath}) - // Emit timing metrics for network retrieval and decode phases - logtrace.Debug(ctx, "download: timing", logtrace.Fields{"action_id": actionID, "retrieve_ms": retrieveMS, "decode_ms": decodeMS}) - - // Verify reconstructed file hash matches action metadata - fileHash, herr := utils.Blake3HashFile(decodeInfo.FilePath) - if herr != nil { - fields[logtrace.FieldError] = herr.Error() - logtrace.Error(ctx, "failed to hash file", fields) - return "", "", fmt.Errorf("hash file: %w", herr) - } - if fileHash == nil { - 
fields[logtrace.FieldError] = "file hash is nil" - logtrace.Error(ctx, "failed to hash file", fields) - return "", "", errors.New("file hash is nil") - } - if verr := cascadekit.VerifyB64DataHash(fileHash, dataHash); verr != nil { - fields[logtrace.FieldError] = verr.Error() - logtrace.Error(ctx, "failed to verify hash", fields) - return "", decodeInfo.DecodeTmpDir, verr - } - logtrace.Debug(ctx, "request data-hash has been matched with the action data-hash", fields) - logtrace.Info(ctx, "download: file verified", fields) - // Emit minimal JSON payload (metrics system removed) - info := map[string]interface{}{"action_id": actionID, "found_symbols": len(symbols), "target_percent": targetRequiredPercent} - if b, err := json.Marshal(info); err == nil { - if err := task.streamDownloadEvent(ctx, SupernodeEventTypeArtefactsDownloaded, string(b), decodeInfo.FilePath, decodeInfo.DecodeTmpDir, send); err != nil { - return "", decodeInfo.DecodeTmpDir, err - } - } - return decodeInfo.FilePath, decodeInfo.DecodeTmpDir, nil -} - func (task *CascadeRegistrationTask) restoreFileFromLayout( ctx context.Context, layout codec.Layout, @@ -280,18 +201,37 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( fields := logtrace.Fields{logtrace.FieldActionID: actionID} - // Unique symbols - symSet := make(map[string]struct{}, 1024) - for _, block := range layout.Blocks { - for _, s := range block.Symbols { - symSet[s] = struct{}{} + // Prefer layout order for single-block Cascade (reduces "missing open" churn vs lexicographic sorts). + var allSymbols []string + if len(layout.Blocks) == 1 { + allSymbols = make([]string, 0, len(layout.Blocks[0].Symbols)) + seen := make(map[string]struct{}, len(layout.Blocks[0].Symbols)) + for _, s := range layout.Blocks[0].Symbols { + if s == "" { + continue + } + if _, ok := seen[s]; ok { + continue + } + seen[s] = struct{}{} + allSymbols = append(allSymbols, s) } + } else { + symSet := make(map[string]struct{}, 1024) + for _, block := range layout.Blocks { + for _, s := range block.Symbols { + if s == "" { + continue + } + symSet[s] = struct{}{} + } + } + allSymbols = make([]string, 0, len(symSet)) + for s := range symSet { + allSymbols = append(allSymbols, s) + } + sort.Strings(allSymbols) } - allSymbols := make([]string, 0, len(symSet)) - for s := range symSet { - allSymbols = append(allSymbols, s) - } - sort.Strings(allSymbols) totalSymbols := len(allSymbols) fields["totalSymbols"] = totalSymbols @@ -309,7 +249,7 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( return "", "", errors.New("no symbols present in layout") } - // Prepare RQ workspace once; stream symbols directly into it + // Prepare RQ workspace once; stream symbols directly into it, and retry decode by fetching more. 
logtrace.Info(ctx, "download: prepare RQ workspace", logtrace.Fields{"action_id": actionID}) _, writeSymbol, cleanup, ws, perr := task.RQ.PrepareDecode(ctx, actionID, layout) if perr != nil { @@ -324,89 +264,147 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( } }() - // Track exactly which symbol IDs we wrote (base58 IDs) + var writtenSet sync.Map // base58 symbol id -> struct{} var written int32 - var writtenSet sync.Map // b58 symbol id -> struct{} onSymbol := func(symbolID string, data []byte) error { if _, err := writeSymbol(-1, symbolID, data); err != nil { return err } - writtenSet.Store(symbolID, struct{}{}) - atomic.AddInt32(&written, 1) + if _, loaded := writtenSet.LoadOrStore(symbolID, struct{}{}); !loaded { + atomic.AddInt32(&written, 1) + } return nil } - // 1) Local batched streaming retrieveStart := time.Now() - logtrace.Info(ctx, "download: local scan start", logtrace.Fields{"action_id": actionID, "requested": targetRequiredCount, "total_candidates": totalSymbols}) - localFound, lerr := task.P2PClient.BatchRetrieveStream(ctx, allSymbols, int32(targetRequiredCount), actionID, onSymbol, true) - if lerr != nil && !strings.Contains(strings.ToLower(lerr.Error()), "local-only") { - fields[logtrace.FieldError] = lerr.Error() - logtrace.Error(ctx, "local batch retrieve stream failed", fields) - return "", ws.SymbolsDir, fmt.Errorf("local batch retrieve stream: %w", lerr) - } - - // If needed, compute the remaining keys that were NOT written in pass 1 - if int(localFound) < targetRequiredCount { - remaining := int32(targetRequiredCount) - localFound + reqCount := targetRequiredCount + step := (totalSymbols*5 + 99) / 100 // +5% of total symbols (rounded up) + if step < 1 { + step = 1 + } + const maxDecodeAttempts = 4 + + var decodeInfo adaptors.DecodeResult + var lastDecodeErr error + for attempt := 1; attempt <= maxDecodeAttempts; attempt++ { + have := int(atomic.LoadInt32(&written)) + need := reqCount - have + if need > 0 { + // Start with a smaller candidate set; expand if we don't have enough remaining keys. + candidates := allSymbols + if want := reqCount * 2; want < len(allSymbols) { + candidateCount := want + maxStart := len(allSymbols) - candidateCount + start := ((attempt - 1) * candidateCount) % (maxStart + 1) + candidates = allSymbols[start : start+candidateCount] + } - // Build a compact slice of only the symbols not written by the local pass - remainingKeys := make([]string, 0, len(allSymbols)) - for _, k := range allSymbols { - if _, ok := writtenSet.Load(k); !ok { + remainingKeys := make([]string, 0, len(candidates)) + for _, k := range candidates { + if k == "" { + continue + } + if _, ok := writtenSet.Load(k); ok { + continue + } remainingKeys = append(remainingKeys, k) } + + if len(remainingKeys) < need && len(candidates) < len(allSymbols) { + // Fall back to all symbols to avoid getting stuck on a narrow prefix. 
+ remainingKeys = remainingKeys[:0] + for _, k := range allSymbols { + if k == "" { + continue + } + if _, ok := writtenSet.Load(k); ok { + continue + } + remainingKeys = append(remainingKeys, k) + } + } + + logtrace.Info(ctx, "download: batch retrieve start", logtrace.Fields{ + "action_id": actionID, + "attempt": attempt, + "requested": need, + "have": have, + "target": reqCount, + "candidates": len(candidates), + "keys": len(remainingKeys), + }) + rStart := time.Now() + if _, rerr := task.P2PClient.BatchRetrieveStream(ctx, remainingKeys, int32(need), actionID, onSymbol); rerr != nil { + fields[logtrace.FieldError] = rerr.Error() + logtrace.Error(ctx, "batch retrieve stream failed", fields) + return "", ws.SymbolsDir, fmt.Errorf("batch retrieve stream: %w", rerr) + } + logtrace.Info(ctx, "download: batch retrieve ok", logtrace.Fields{ + "action_id": actionID, + "attempt": attempt, + "ms": time.Since(rStart).Milliseconds(), + "have": atomic.LoadInt32(&written), + }) } - logtrace.Info(ctx, "download: network retrieve start", logtrace.Fields{ - "action_id": actionID, "remaining": remaining, "candidate_keys": len(remainingKeys), + decodeStart := time.Now() + logtrace.Info(ctx, "download: decode start", logtrace.Fields{ + "action_id": actionID, + "attempt": attempt, + "received": atomic.LoadInt32(&written), + "target": reqCount, }) - - if len(remainingKeys) == 0 { - logtrace.Warn(ctx, "no remaining keys after local pass but remaining > 0; proceeding with allSymbols as fallback", - logtrace.Fields{"action_id": actionID, "remaining": remaining}) - remainingKeys = allSymbols + decodeInfo, lastDecodeErr = task.RQ.DecodeFromPrepared(ctx, ws, layout) + if lastDecodeErr == nil { + retrieveMS := time.Since(retrieveStart).Milliseconds() + decodeMS := time.Since(decodeStart).Milliseconds() + logtrace.Info(ctx, "download: decode ok", logtrace.Fields{ + "action_id": actionID, + "attempt": attempt, + "ms": decodeMS, + "tmp_dir": decodeInfo.DecodeTmpDir, + "file_path": decodeInfo.FilePath, + }) + logtrace.Debug(ctx, "download: timing", logtrace.Fields{"action_id": actionID, "retrieve_ms": retrieveMS, "decode_ms": decodeMS}) + break } - // Network phase on only the remaining keys; avoids a second local scan & duplicate writes - if _, nerr := task.P2PClient.BatchRetrieveStream(ctx, remainingKeys, remaining, actionID, onSymbol /* network allowed */); nerr != nil { - fields[logtrace.FieldError] = nerr.Error() - logtrace.Error(ctx, "network batch retrieve stream failed", fields) - return "", ws.SymbolsDir, fmt.Errorf("network batch retrieve stream: %w", nerr) + fields[logtrace.FieldError] = lastDecodeErr.Error() + logtrace.Warn(ctx, "decode failed; will fetch more symbols and retry", logtrace.Fields{ + "action_id": actionID, + "attempt": attempt, + "received": atomic.LoadInt32(&written), + "target": reqCount, + "err": lastDecodeErr.Error(), + }) + + if reqCount >= totalSymbols { + return "", ws.SymbolsDir, fmt.Errorf("decode symbols using RaptorQ: %w", lastDecodeErr) + } + reqCount += step + if reqCount > totalSymbols { + reqCount = totalSymbols } } - retrieveMS := time.Since(retrieveStart).Milliseconds() - logtrace.Info(ctx, "download: batch retrieve (stream) ok", logtrace.Fields{ - "action_id": actionID, "received": atomic.LoadInt32(&written), "retrieve_ms": retrieveMS, - }) - - // 2) Decode from prepared workspace - decodeStart := time.Now() - logtrace.Info(ctx, "download: decode start", logtrace.Fields{"action_id": actionID}) - decodeInfo, derr := task.RQ.DecodeFromPrepared(ctx, ws, layout) - if derr != nil 
{ - fields[logtrace.FieldError] = derr.Error() - logtrace.Error(ctx, "decode failed", fields) - return "", ws.SymbolsDir, fmt.Errorf("decode RaptorQ: %w", derr) + if decodeInfo.FilePath == "" { + if lastDecodeErr != nil { + return "", ws.SymbolsDir, fmt.Errorf("decode symbols using RaptorQ: %w", lastDecodeErr) + } + return "", ws.SymbolsDir, errors.New("decode failed after retries") } - decodeMS := time.Since(decodeStart).Milliseconds() - logtrace.Info(ctx, "download: decode ok", logtrace.Fields{ - "action_id": actionID, "ms": decodeMS, "tmp_dir": decodeInfo.DecodeTmpDir, "file_path": decodeInfo.FilePath, - }) - logtrace.Debug(ctx, "download: timing", logtrace.Fields{"action_id": actionID, "retrieve_ms": retrieveMS, "decode_ms": decodeMS}) // 3) Verify hash fileHash, herr := utils.Blake3HashFile(decodeInfo.FilePath) if herr != nil { fields[logtrace.FieldError] = herr.Error() logtrace.Error(ctx, "failed to hash file", fields) - return "", ws.SymbolsDir, fmt.Errorf("hash file: %w", herr) + return "", decodeInfo.DecodeTmpDir, fmt.Errorf("hash file: %w", herr) } if fileHash == nil { fields[logtrace.FieldError] = "file hash is nil" logtrace.Error(ctx, "failed to hash file", fields) - return "", ws.SymbolsDir, errors.New("file hash is nil") + return "", decodeInfo.DecodeTmpDir, errors.New("file hash is nil") } if verr := cascadekit.VerifyB64DataHash(fileHash, dataHash); verr != nil { fields[logtrace.FieldError] = verr.Error() @@ -418,7 +416,7 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( logtrace.Info(ctx, "download: file verified", fields) // Event - info := map[string]interface{}{"action_id": actionID, "found_symbols": int(atomic.LoadInt32(&written)), "target_percent": targetRequiredPercent} + info := map[string]interface{}{"action_id": actionID, "found_symbols": atomic.LoadInt32(&written), "target_percent": targetRequiredPercent} if b, err := json.Marshal(info); err == nil { if err := task.streamDownloadEvent(ctx, SupernodeEventTypeArtefactsDownloaded, string(b), decodeInfo.FilePath, decodeInfo.DecodeTmpDir, send); err != nil { return "", decodeInfo.DecodeTmpDir, err diff --git a/supernode/cascade/helper.go b/supernode/cascade/helper.go index b5537d24..6b887345 100644 --- a/supernode/cascade/helper.go +++ b/supernode/cascade/helper.go @@ -137,7 +137,7 @@ func (task *CascadeRegistrationTask) generateRQIDFiles(ctx context.Context, meta return indexIDs, allFiles, nil } -func (task *CascadeRegistrationTask) storeArtefacts(ctx context.Context, actionID string, idFiles [][]byte, symbolsDir string, f logtrace.Fields) error { +func (task *CascadeRegistrationTask) storeArtefacts(ctx context.Context, actionID string, idFiles [][]byte, symbolsDir string, layout codec.Layout, f logtrace.Fields) error { if f == nil { f = logtrace.Fields{} } @@ -147,7 +147,7 @@ func (task *CascadeRegistrationTask) storeArtefacts(ctx context.Context, actionI } ctx = logtrace.CtxWithOrigin(ctx, "first_pass") logtrace.Info(ctx, "store: first-pass begin", lf) - if err := task.P2P.StoreArtefacts(ctx, adaptors.StoreArtefactsRequest{IDFiles: idFiles, SymbolsDir: symbolsDir, TaskID: task.taskID, ActionID: actionID}, f); err != nil { + if err := task.P2P.StoreArtefacts(ctx, adaptors.StoreArtefactsRequest{IDFiles: idFiles, SymbolsDir: symbolsDir, Layout: layout, TaskID: task.taskID, ActionID: actionID}, f); err != nil { return task.wrapErr(ctx, "failed to store artefacts", err, lf) } logtrace.Info(ctx, "store: first-pass ok", lf) diff --git a/supernode/cascade/register.go b/supernode/cascade/register.go index 
1693d61f..d3f78da8 100644 --- a/supernode/cascade/register.go +++ b/supernode/cascade/register.go @@ -167,7 +167,7 @@ func (task *CascadeRegistrationTask) Register( } // Step 12: Store artefacts to the network store - if err := task.storeArtefacts(ctx, action.ActionID, idFiles, encodeResult.SymbolsDir, fields); err != nil { + if err := task.storeArtefacts(ctx, action.ActionID, idFiles, encodeResult.SymbolsDir, encodeResult.Layout, fields); err != nil { return err } if err := task.emitArtefactsStored(ctx, fields, encodeResult.Layout, send); err != nil { diff --git a/supernode/status/metrics.go b/supernode/status/metrics.go index 3263ded3..2184808d 100644 --- a/supernode/status/metrics.go +++ b/supernode/status/metrics.go @@ -18,8 +18,8 @@ import ( // Rationale: // - Keep the adjustment in exactly one place (the status metrics source) so all // downstream consumers remain consistent. -// - Apply it to both total and free to preserve internal consistency between -// total/free/usage%. +// - Apply it to total only to match the expected decimal-GB figure while +// leaving free as reported by the runtime. const diskSizeAdjustFactor = 1.1 func adjustDiskBytes(value uint64) uint64 { @@ -87,10 +87,8 @@ func (m *MetricsCollector) CollectStorageMetrics(ctx context.Context, paths []st continue } totalBytes := adjustDiskBytes(usage.Total) - availableBytes := adjustDiskBytes(usage.Free) - if availableBytes > totalBytes { - availableBytes = totalBytes - } + availableBytes := usage.Free + availableBytes = min(availableBytes, totalBytes) usedBytes := totalBytes - availableBytes usagePercent := 0.0 if totalBytes > 0 { From 74e0dcf31615684edadbe33d0a1079d8a15e15b2 Mon Sep 17 00:00:00 2001 From: j-rafique Date: Thu, 1 Jan 2026 19:39:36 +0500 Subject: [PATCH 2/4] perf: optimize cascade download retrieval --- supernode/cascade/download.go | 92 ++++++++++++++++++++++++++++++++--- 1 file changed, 84 insertions(+), 8 deletions(-) diff --git a/supernode/cascade/download.go b/supernode/cascade/download.go index a1851ea8..2a975d6e 100644 --- a/supernode/cascade/download.go +++ b/supernode/cascade/download.go @@ -19,7 +19,22 @@ import ( "github.com/LumeraProtocol/supernode/v2/supernode/adaptors" ) -const targetRequiredPercent = 17 +// Step 0: Download reconstruction parameters. +// +// targetRequiredPercent is the initial minimum symbol coverage we require before the first decode attempt. +// Empirically, decode tends to succeed around ~17–18% for single-block cascades; we use a small buffer. +// Decode may require more than this, so the algorithm can progressively fetch more symbols. +const targetRequiredPercent = 20 + +// retrieveKeyFanoutFactor caps how many candidate keys we pass to BatchRetrieveStream relative to `need`. +// The DHT implementation does per-key preprocessing (e.g., base58 decode, routing/contact setup), so +// passing huge key lists when `need` is small can waste CPU and allocations. +// The fanout factor + minimum are chosen to keep a high probability of satisfying `need` even if some +// keys are unavailable. +const ( + retrieveKeyFanoutFactor = 50 + retrieveKeyMinCandidates = 5000 +) type DownloadRequest struct { ActionID string @@ -41,6 +56,7 @@ func (task *CascadeRegistrationTask) Download(ctx context.Context, req *Download fields := logtrace.Fields{logtrace.FieldMethod: "Download", logtrace.FieldRequest: req} logtrace.Info(ctx, "download: request", fields) + // Step 1: Fetch action. 
actionDetails, err := task.LumeraClient.GetAction(ctx, req.ActionID) if err != nil { fields[logtrace.FieldError] = err.Error() @@ -51,6 +67,7 @@ func (task *CascadeRegistrationTask) Download(ctx context.Context, req *Download return err } + // Step 2: Validate action state. if actionDetails.GetAction().State != actiontypes.ActionStateDone { err = errors.New("action is not in a valid state") fields[logtrace.FieldError] = "action state is not done yet" @@ -59,6 +76,7 @@ func (task *CascadeRegistrationTask) Download(ctx context.Context, req *Download } logtrace.Info(ctx, "download: action state ok", fields) + // Step 3: Decode cascade metadata. metadata, err := cascadekit.UnmarshalCascadeMetadata(actionDetails.GetAction().Metadata) if err != nil { fields[logtrace.FieldError] = err.Error() @@ -69,12 +87,13 @@ func (task *CascadeRegistrationTask) Download(ctx context.Context, req *Download return err } + // Step 4: Verify download signature for private cascades. if !metadata.Public { if req.Signature == "" { fields[logtrace.FieldError] = "missing signature for private download" return task.wrapErr(ctx, "private cascade requires a download signature", nil, fields) } - if err := task.VerifyDownloadSignature(ctx, req.ActionID, req.Signature); err != nil { + if err := task.verifyDownloadSignatureWithCreator(ctx, req.ActionID, req.Signature, actionDetails.GetAction().Creator); err != nil { fields[logtrace.FieldError] = err.Error() return task.wrapErr(ctx, "failed to verify download signature", err, fields) } @@ -83,15 +102,18 @@ func (task *CascadeRegistrationTask) Download(ctx context.Context, req *Download logtrace.Info(ctx, "download: public cascade (no signature)", fields) } + // Step 5: Emit retrieval-start event. if err := task.streamDownloadEvent(ctx, SupernodeEventTypeNetworkRetrieveStarted, "Network retrieval started", "", "", send); err != nil { return err } + // Step 6: Resolve layout + retrieve symbols + decode + verify hash. logtrace.Info(ctx, "download: network retrieval start", logtrace.Fields{logtrace.FieldActionID: actionDetails.GetAction().ActionID}) filePath, tmpDir, err := task.downloadArtifacts(ctx, actionDetails.GetAction().ActionID, metadata, fields, send) if err != nil { fields[logtrace.FieldError] = err.Error() if tmpDir != "" { + // Step 7: Clean up temporary workspace on failure. if cerr := task.CleanupDownload(ctx, tmpDir); cerr != nil { logtrace.Warn(ctx, "cleanup of tmp dir after error failed", logtrace.Fields{"tmp_dir": tmpDir, logtrace.FieldError: cerr.Error()}) } @@ -102,6 +124,8 @@ func (task *CascadeRegistrationTask) Download(ctx context.Context, req *Download return task.wrapErr(ctx, "failed to download artifacts", err, fields) } logtrace.Debug(ctx, "File reconstructed and hash verified", fields) + + // Step 8: Emit decode-completed event. if err := task.streamDownloadEvent(ctx, SupernodeEventTypeDecodeCompleted, "Decode completed", filePath, tmpDir, send); err != nil { if tmpDir != "" { if cerr := task.CleanupDownload(ctx, tmpDir); cerr != nil { @@ -115,6 +139,7 @@ func (task *CascadeRegistrationTask) Download(ctx context.Context, req *Download } func (task *CascadeRegistrationTask) CleanupDownload(ctx context.Context, tmpDir string) error { + // Step 0: Best-effort cleanup of any temporary workspace created during download/decode. 
if tmpDir == "" { return nil } @@ -128,12 +153,18 @@ func (task *CascadeRegistrationTask) VerifyDownloadSignature(ctx context.Context if signature == "" { return errors.New("signature required") } - // Fetch the action to get the creator address for verification + // Fetch the action to get the creator address for verification. act, err := task.LumeraClient.GetAction(ctx, actionID) if err != nil { return fmt.Errorf("get action for signature verification: %w", err) } - creator := act.GetAction().Creator + return task.verifyDownloadSignatureWithCreator(ctx, actionID, signature, act.GetAction().Creator) +} + +func (task *CascadeRegistrationTask) verifyDownloadSignatureWithCreator(ctx context.Context, actionID, signature, creator string) error { + if signature == "" || creator == "" { + return errors.New("signature and creator are required") + } if err := cascadekit.VerifyStringRawOrADR36(actionID, signature, creator, func(data, sig []byte) error { return task.LumeraClient.Verify(ctx, creator, data, sig) }); err != nil { @@ -154,7 +185,7 @@ func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, acti var layoutFetchMS, layoutDecodeMS int64 var layoutAttempts int - // Retrieve via index IDs + // Step 1: Retrieve index file(s) via index IDs. if len(metadata.RqIdsIds) > 0 { for _, indexID := range metadata.RqIdsIds { iStart := time.Now() @@ -172,6 +203,8 @@ func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, acti } var netMS, decMS int64 var attempts int + + // Step 2: Resolve and decode layout referenced by the index. layout, netMS, decMS, attempts, err = task.retrieveLayoutFromIndex(ctx, indexData, fields) if err != nil { logtrace.Warn(ctx, "failed to retrieve layout from index", logtrace.Fields{"index_id": indexID, logtrace.FieldError: err.Error(), "attempts": attempts}) @@ -188,6 +221,8 @@ func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, acti return "", "", errors.New("no symbols found in RQ metadata") } fields["layout_fetch_ms"], fields["layout_decode_ms"], fields["layout_attempts"] = layoutFetchMS, layoutDecodeMS, layoutAttempts + + // Step 3: Reconstruct file from layout + symbols; verify hash. return task.restoreFileFromLayout(ctx, layout, metadata.DataHash, actionID, send) } @@ -198,12 +233,14 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( actionID string, send func(resp *DownloadResponse) error, ) (string, string, error) { - fields := logtrace.Fields{logtrace.FieldActionID: actionID} + // Step 1: Build symbol candidate list from the layout. // Prefer layout order for single-block Cascade (reduces "missing open" churn vs lexicographic sorts). var allSymbols []string + writeBlockID := -1 if len(layout.Blocks) == 1 { + writeBlockID = layout.Blocks[0].BlockID allSymbols = make([]string, 0, len(layout.Blocks[0].Symbols)) seen := make(map[string]struct{}, len(layout.Blocks[0].Symbols)) for _, s := range layout.Blocks[0].Symbols { @@ -235,6 +272,7 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( totalSymbols := len(allSymbols) fields["totalSymbols"] = totalSymbols + // Step 2: Compute initial required symbol count (ceil(targetRequiredPercent%)). 
targetRequiredCount := (totalSymbols*targetRequiredPercent + 99) / 100 if targetRequiredCount < 1 && totalSymbols > 0 { targetRequiredCount = 1 @@ -249,6 +287,7 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( return "", "", errors.New("no symbols present in layout") } + // Step 3: Prepare RQ workspace once; stream symbols into it across attempts. // Prepare RQ workspace once; stream symbols directly into it, and retry decode by fetching more. logtrace.Info(ctx, "download: prepare RQ workspace", logtrace.Fields{"action_id": actionID}) _, writeSymbol, cleanup, ws, perr := task.RQ.PrepareDecode(ctx, actionID, layout) @@ -267,7 +306,10 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( var writtenSet sync.Map // base58 symbol id -> struct{} var written int32 onSymbol := func(symbolID string, data []byte) error { - if _, err := writeSymbol(-1, symbolID, data); err != nil { + // Write each retrieved symbol into the prepared workspace. + // Count unique symbol IDs only, so progress reflects real coverage (not duplicates). + // In the single-block case, write directly to the known block to avoid per-symbol block lookup. + if _, err := writeSymbol(writeBlockID, symbolID, data); err != nil { return err } if _, loaded := writtenSet.LoadOrStore(symbolID, struct{}{}); !loaded { @@ -287,9 +329,12 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( var decodeInfo adaptors.DecodeResult var lastDecodeErr error for attempt := 1; attempt <= maxDecodeAttempts; attempt++ { + // Step 4.1: Determine how many unique symbols we have vs the current target. have := int(atomic.LoadInt32(&written)) need := reqCount - have if need > 0 { + // Step 4.2: Fetch exactly the delta (`need`) from not-yet-written candidate keys. + // Select a candidate slice to spread load across the keyspace; fall back to all keys if needed. // Start with a smaller candidate set; expand if we don't have enough remaining keys. candidates := allSymbols if want := reqCount * 2; want < len(allSymbols) { @@ -299,7 +344,15 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( candidates = allSymbols[start : start+candidateCount] } - remainingKeys := make([]string, 0, len(candidates)) + // Cap how many keys we pass to the DHT when `need` is small, and stop scanning once reached. + keyCap := need * retrieveKeyFanoutFactor + if keyCap < retrieveKeyMinCandidates { + keyCap = retrieveKeyMinCandidates + } + if keyCap < need { + keyCap = need + } + remainingKeys := make([]string, 0, min(keyCap, len(candidates))) for _, k := range candidates { if k == "" { continue @@ -308,6 +361,9 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( continue } remainingKeys = append(remainingKeys, k) + if len(remainingKeys) >= keyCap { + break + } } if len(remainingKeys) < need && len(candidates) < len(allSymbols) { @@ -321,6 +377,9 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( continue } remainingKeys = append(remainingKeys, k) + if len(remainingKeys) >= keyCap { + break + } } } @@ -347,7 +406,22 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( }) } + // Don't spend CPU attempting decode until we have at least the requested count for this attempt. + // Step 4.3: Skip decode until `have >= reqCount`. 
+ have = int(atomic.LoadInt32(&written)) + if have < reqCount { + lastDecodeErr = fmt.Errorf("insufficient symbols to attempt decode: have %d want %d", have, reqCount) + logtrace.Warn(ctx, "download: skip decode; insufficient symbols", logtrace.Fields{ + "action_id": actionID, + "attempt": attempt, + "received": have, + "target": reqCount, + }) + continue + } + decodeStart := time.Now() + // Step 4.4: Attempt decode from the prepared workspace. logtrace.Info(ctx, "download: decode start", logtrace.Fields{ "action_id": actionID, "attempt": attempt, @@ -394,6 +468,7 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( return "", ws.SymbolsDir, errors.New("decode failed after retries") } + // Step 5: Verify reconstructed file hash matches action metadata. // 3) Verify hash fileHash, herr := utils.Blake3HashFile(decodeInfo.FilePath) if herr != nil { @@ -415,6 +490,7 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( logtrace.Debug(ctx, "request data-hash has been matched with the action data-hash", fields) logtrace.Info(ctx, "download: file verified", fields) + // Step 6: Emit final download event and return. // Event info := map[string]interface{}{"action_id": actionID, "found_symbols": atomic.LoadInt32(&written), "target_percent": targetRequiredPercent} if b, err := json.Marshal(info); err == nil { From 391ffb065190a65949d68f2b3ea310155a49b855 Mon Sep 17 00:00:00 2001 From: j-rafique Date: Fri, 2 Jan 2026 02:07:43 +0500 Subject: [PATCH 3/4] p2p/kademlia: cut allocs in closest-node selection --- p2p/kademlia/dht.go | 10 +- p2p/kademlia/hashtable.go | 213 ++++++++++++++++++++++++++++++++------ 2 files changed, 189 insertions(+), 34 deletions(-) diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index 70af36fb..0fdc5ef3 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -791,6 +791,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, } ignoreList := s.ignorelist.ToNodeList() + ignoredSet := hashedIDSetFromNodes(ignoreList) globalClosestContacts := make(map[string]*NodeList) var closestMu sync.RWMutex @@ -800,7 +801,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, continue } - top6 := s.ht.closestContactsWithIncludingNode(Alpha, hashes[i], ignoreList, nil) + top6 := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(Alpha, hashes[i], ignoredSet, nil) closestMu.Lock() globalClosestContacts[keys[i]] = top6 closestMu.Unlock() @@ -1144,6 +1145,7 @@ func (s *DHT) BatchRetrieveStream( delete(knownNodes, string(self.ID)) ignoreList := s.ignorelist.ToNodeList() + ignoredSet := hashedIDSetFromNodes(ignoreList) globalClosestContacts := make(map[string]*NodeList) var closestMu sync.RWMutex @@ -1152,7 +1154,7 @@ func (s *DHT) BatchRetrieveStream( if _, found := resSeen.Load(hexKeys[i]); found { continue } - topK := s.ht.closestContactsWithIncludingNode(Alpha, hashes[i], ignoreList, nil) + topK := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(Alpha, hashes[i], ignoredSet, nil) closestMu.Lock() globalClosestContacts[keys[i]] = topK closestMu.Unlock() @@ -2134,6 +2136,8 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i globalClosestContacts := make(map[string]*NodeList) knownNodes := make(map[string]*Node) hashes := make([][]byte, len(values)) + ignoreList := s.ignorelist.ToNodeList() + ignoredSet := hashedIDSetFromNodes(ignoreList) { f := logtrace.Fields{logtrace.FieldModule: "dht", "task_id": id, "keys": len(values), "len_nodes": len(s.ht.nodes()), 
logtrace.FieldRole: "client"} @@ -2145,7 +2149,7 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i for i := 0; i < len(values); i++ { target, _ := utils.Blake3Hash(values[i]) hashes[i] = target - top6 := s.ht.closestContactsWithIncludingNode(Alpha, target, s.ignorelist.ToNodeList(), nil) + top6 := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(Alpha, target, ignoredSet, nil) globalClosestContacts[base58.Encode(target)] = top6 // log.WithContext(ctx).WithField("top 6", top6).Info("iterate batch store begin") diff --git a/p2p/kademlia/hashtable.go b/p2p/kademlia/hashtable.go index 2fa5eaa5..5e7a7bf5 100644 --- a/p2p/kademlia/hashtable.go +++ b/p2p/kademlia/hashtable.go @@ -203,6 +203,95 @@ func ensureHashedTarget(target []byte) []byte { return target } +type hashedIDKey [32]byte + +func hashedKeyFromNode(node *Node) (hashedIDKey, bool) { + var k hashedIDKey + if node == nil { + return k, false + } + + hashedID := node.HashedID + if len(hashedID) != 32 { + if len(node.ID) == 0 { + return k, false + } + h, err := utils.Blake3Hash(node.ID) + if err != nil || len(h) != 32 { + return k, false + } + hashedID = h + } + + copy(k[:], hashedID) + return k, true +} + +type hashedIDSet map[hashedIDKey]struct{} + +func hashedIDSetFromNodes(nodes []*Node) hashedIDSet { + if len(nodes) == 0 { + return nil + } + set := make(hashedIDSet, len(nodes)) + for _, n := range nodes { + if k, ok := hashedKeyFromNode(n); ok { + set[k] = struct{}{} + } + } + return set +} + +func (s hashedIDSet) contains(hashedID []byte) bool { + if len(s) == 0 || len(hashedID) != 32 { + return false + } + var k hashedIDKey + copy(k[:], hashedID) + _, ok := s[k] + return ok +} + +func hashedKeysFromNodes(nodes []*Node) []hashedIDKey { + if len(nodes) == 0 { + return nil + } + keys := make([]hashedIDKey, 0, len(nodes)) + for _, n := range nodes { + if k, ok := hashedKeyFromNode(n); ok { + keys = append(keys, k) + } + } + return keys +} + +func hasHashedID(hashedID []byte, keys []hashedIDKey) bool { + if len(keys) == 0 || len(hashedID) != 32 { + return false + } + for i := range keys { + if bytes.Equal(hashedID, keys[i][:]) { + return true + } + } + return false +} + +func hasNodeWithHashedID(nodes []*Node, hashedID []byte) bool { + if len(hashedID) != 32 { + return false + } + for _, n := range nodes { + if n == nil || len(n.HashedID) != 32 { + continue + } + if bytes.Equal(n.HashedID, hashedID) { + return true + } + } + return false +} + // hasBucketNode: compare on HashedID func (ht *HashTable) hasBucketNode(bucket int, hashedID []byte) bool { ht.mutex.RLock() @@ -250,26 +339,31 @@ func (ht *HashTable) RemoveNode(index int, hashedID []byte) bool { // closestContacts: use HashedID in ignored-map func (ht *HashTable) closestContacts(num int, target []byte, ignoredNodes []*Node) (*NodeList, int) { - ht.mutex.RLock() - defer ht.mutex.RUnlock() - hashedTarget := ensureHashedTarget(target) + ignoredKeys := hashedKeysFromNodes(ignoredNodes) - ignoredMap := make(map[string]bool, len(ignoredNodes)) - for _, node := range ignoredNodes { - ignoredMap[string(node.HashedID)] = true + ht.mutex.RLock() + total := 0 + for _, bucket := range ht.routeTable { + total += len(bucket) } - - nl := &NodeList{Comparator: hashedTarget} + nodes := make([]*Node, 0, total) counter := 0 for _, bucket := range ht.routeTable { for _, node := range bucket { counter++ - if !ignoredMap[string(node.HashedID)] { - nl.AddNodes([]*Node{node}) + if node == nil { + continue + } + if hasHashedID(node.HashedID, ignoredKeys) { + continue 
} + nodes = append(nodes, node) } } + ht.mutex.RUnlock() + + nl := &NodeList{Comparator: hashedTarget, Nodes: nodes} nl.Sort() nl.TopN(num) return nl, counter @@ -277,58 +371,115 @@ func (ht *HashTable) closestContacts(num int, target []byte, ignoredNodes []*Nod // keep an alias for old callers; fix typo in new name func (ht *HashTable) closestContactsWithIncludingNode(num int, target []byte, ignoredNodes []*Node, includeNode *Node) *NodeList { + hashedTarget := ensureHashedTarget(target) + ignoredKeys := hashedKeysFromNodes(ignoredNodes) + ht.mutex.RLock() - defer ht.mutex.RUnlock() + total := 0 + for _, bucket := range ht.routeTable { + total += len(bucket) + } + nodes := make([]*Node, 0, total+1) + for _, bucket := range ht.routeTable { + for _, node := range bucket { + if node == nil { + continue + } + if hasHashedID(node.HashedID, ignoredKeys) { + continue + } + nodes = append(nodes, node) + } + } + ht.mutex.RUnlock() - hashedTarget := ensureHashedTarget(target) - ignoredMap := make(map[string]bool, len(ignoredNodes)) - for _, node := range ignoredNodes { - ignoredMap[string(node.HashedID)] = true + if includeNode != nil { + includeNode.SetHashedID() + if !hasNodeWithHashedID(nodes, includeNode.HashedID) { + nodes = append(nodes, includeNode) + } } - nl := &NodeList{Comparator: hashedTarget} + nl := &NodeList{Comparator: hashedTarget, Nodes: nodes} + nl.Sort() + nl.TopN(num) + return nl +} + +// closestContactsWithIncludingNodeWithIgnoredSet is an optimized variant for batch callers that +// can precompute the ignored hashed-ID set once and reuse it across many lookups. +func (ht *HashTable) closestContactsWithIncludingNodeWithIgnoredSet(num int, target []byte, ignoredSet hashedIDSet, includeNode *Node) *NodeList { + hashedTarget := ensureHashedTarget(target) + + ht.mutex.RLock() + total := 0 + for _, bucket := range ht.routeTable { + total += len(bucket) + } + nodes := make([]*Node, 0, total+1) for _, bucket := range ht.routeTable { for _, node := range bucket { - if !ignoredMap[string(node.HashedID)] { - nl.AddNodes([]*Node{node}) + if node == nil { + continue } + if ignoredSet.contains(node.HashedID) { + continue + } + nodes = append(nodes, node) } } + ht.mutex.RUnlock() + if includeNode != nil { - nl.AddNodes([]*Node{includeNode}) + includeNode.SetHashedID() + if !hasNodeWithHashedID(nodes, includeNode.HashedID) { + nodes = append(nodes, includeNode) + } } + + nl := &NodeList{Comparator: hashedTarget, Nodes: nodes} nl.Sort() nl.TopN(num) return nl } func (ht *HashTable) closestContactsWithIncludingNodeList(num int, target []byte, ignoredNodes []*Node, nodesToInclude []*Node) *NodeList { - ht.mutex.RLock() - defer ht.mutex.RUnlock() - hashedTarget := ensureHashedTarget(target) - ignoredMap := make(map[string]bool, len(ignoredNodes)) - for _, node := range ignoredNodes { - ignoredMap[string(node.HashedID)] = true - } + ignoredKeys := hashedKeysFromNodes(ignoredNodes) - nl := &NodeList{Comparator: hashedTarget} + ht.mutex.RLock() + total := 0 + for _, bucket := range ht.routeTable { + total += len(bucket) + } + nodes := make([]*Node, 0, total+len(nodesToInclude)) for _, bucket := range ht.routeTable { for _, node := range bucket { - if !ignoredMap[string(node.HashedID)] { - nl.AddNodes([]*Node{node}) + if node == nil { + continue } + if hasHashedID(node.HashedID, ignoredKeys) { + continue + } + nodes = append(nodes, node) } } + ht.mutex.RUnlock() if len(nodesToInclude) > 0 { for _, node := range nodesToInclude { - if !nl.exists(node) { - nl.AddNodes([]*Node{node}) + if node == nil { 
+ continue + } + node.SetHashedID() + if hasNodeWithHashedID(nodes, node.HashedID) { + continue } + nodes = append(nodes, node) } } + nl := &NodeList{Comparator: hashedTarget, Nodes: nodes} nl.Sort() nl.TopN(num) return nl From 6090df0456b6f369a84944577d50b543d13abb7d Mon Sep 17 00:00:00 2001 From: j-rafique Date: Fri, 2 Jan 2026 17:02:48 +0500 Subject: [PATCH 4/4] sqlite: wait for original StoreBatch commit --- p2p/kademlia/store/sqlite/sqlite.go | 33 +++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/p2p/kademlia/store/sqlite/sqlite.go b/p2p/kademlia/store/sqlite/sqlite.go index 841636c7..ee0c5e5a 100644 --- a/p2p/kademlia/store/sqlite/sqlite.go +++ b/p2p/kademlia/store/sqlite/sqlite.go @@ -39,6 +39,9 @@ type Job struct { ReqID string DataType int IsOriginal bool + // Done is an optional completion signal for callers that need to know when the DB write finished. + // The sqlite worker will attempt a non-blocking send to avoid stalling the write loop. + Done chan error } // Worker represents the worker that executes the job @@ -309,7 +312,16 @@ func (s *Store) start(ctx context.Context) { for { select { case job := <-s.worker.JobQueue: - if err := s.performJob(job); err != nil { + err := s.performJob(job) + if job.Done != nil { + // Never block the DB worker on a completion signal. + // Callers that need the result should provide a buffered channel and wait on it. + select { + case job.Done <- err: + default: + } + } + if err != nil { logtrace.Error(ctx, "Failed to perform job", logtrace.Fields{logtrace.FieldError: err.Error()}) } case <-s.worker.quit: @@ -358,11 +370,18 @@ func (s *Store) Store(ctx context.Context, key []byte, value []byte, datatype in // StoreBatch stores a batch of key/value pairs for the queries node with the replication func (s *Store) StoreBatch(ctx context.Context, values [][]byte, datatype int, isOriginal bool) error { + var done chan error + if isOriginal { + // For original/local batches, "success" must mean "durably stored": + // first-pass callers may delete the source symbol files right after StoreBatch returns. + done = make(chan error, 1) + } job := Job{ JobType: "BatchInsert", Values: values, DataType: datatype, IsOriginal: isOriginal, + Done: done, } if val := ctx.Value(logtrace.CorrelationIDKey); val != nil { @@ -378,7 +397,17 @@ func (s *Store) StoreBatch(ctx context.Context, values [][]byte, datatype int, i case s.worker.JobQueue <- job: } - return nil + if done == nil { + return nil + } + + // Wait for the DB worker so original/local data is committed before returning to the caller. + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-done: + return err + } } // Delete a key/value pair from the store
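
For the decode path in patch 2/4, the escalation arithmetic works out as follows; a minimal standalone sketch, assuming an illustrative layout of 1000 symbols (real values come from the action's layout), with targetRequiredPercent = 20, a +5% step, and maxDecodeAttempts = 4 as in this series:

package main

import "fmt"

func main() {
	totalSymbols := 1000                     // illustrative layout size, not from a real action
	reqCount := (totalSymbols*20 + 99) / 100 // initial target: ceil(20%) = 200
	step := (totalSymbols*5 + 99) / 100      // per-retry escalation: ceil(5%) = 50
	for attempt := 1; attempt <= 4; attempt++ {
		fmt.Printf("attempt %d: need %d of %d symbols before decode\n", attempt, reqCount, totalSymbols)
		// in the patch, reqCount only grows when a decode attempt fails,
		// and it is capped at totalSymbols
		reqCount += step
		if reqCount > totalSymbols {
			reqCount = totalSymbols
		}
	}
}

Running this prints targets of 200, 250, 300 and 350 symbols across the four attempts, i.e. required coverage grows from 20% toward 35% of the layout before the loop gives up.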
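
For patch 4/4, the practical consequence is that a first-pass caller can treat a nil error from StoreBatch on original data as a durability guarantee, not just "queued". A minimal caller-side sketch under that assumption; the wrapper name persistFirstPassBatch, the variable names, and the sqlite import path (inferred from the repo layout) are illustrative, not part of this series:

package cascadefirstpass

import (
	"context"
	"fmt"
	"time"

	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/sqlite"
	"github.com/LumeraProtocol/supernode/v2/pkg/utils"
)

// persistFirstPassBatch stores one batch of symbol blobs and only then deletes
// the on-disk sources, relying on StoreBatch(isOriginal=true) waiting for the
// sqlite worker's commit before returning.
func persistFirstPassBatch(ctx context.Context, store *sqlite.Store, symbolsDir string,
	symbolBlobs [][]byte, batchKeys []string, symbolDataType int) error {
	bctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	if err := store.StoreBatch(bctx, symbolBlobs, symbolDataType, true /* isOriginal */); err != nil {
		return fmt.Errorf("store batch: %w", err)
	}
	// Safe only because the call above waited on the Done channel: the rows are
	// committed, so the disk copies are no longer the sole source of the data.
	return utils.DeleteSymbols(ctx, symbolsDir, batchKeys)
}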