Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.24.0

require (
github.com/LumeraProtocol/lumera v0.4.3
github.com/LumeraProtocol/rq-go v0.2.1
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cosmos/btcutil v1.0.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwS
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/LumeraProtocol/lumera v0.4.3 h1:q/FuT+JOLIpYdlunczRUr6K85r9Sn0lKvGltSrj4r6s=
github.com/LumeraProtocol/lumera v0.4.3/go.mod h1:MRqVY+f8edEBkDvpr4z2nJpglp3Qj1OUvjeWvrvIUSM=
github.com/LumeraProtocol/rq-go v0.2.1 h1:8B3UzRChLsGMmvZ+UVbJsJj6JZzL9P9iYxbdUwGsQI4=
github.com/LumeraProtocol/rq-go v0.2.1/go.mod h1:APnKCZRh1Es2Vtrd2w4kCLgAyaL5Bqrkz/BURoRJ+O8=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
Expand Down
17 changes: 4 additions & 13 deletions p2p/kademlia/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) {
}

if strings.Contains(extP2P, "0.0.0.0") {
fmt.Println("skippping node")
return nil, errors.New("invalid address")
}

Expand Down Expand Up @@ -134,18 +133,14 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string
}

if latestIP == "" {
log.P2P().WithContext(ctx).
WithField("supernode", supernode.SupernodeAccount).
Warn("No valid IP address found for supernode")
log.P2P().WithContext(ctx).WithField("supernode", supernode.SupernodeAccount).Warn("No valid IP address found for supernode")
continue
}

// Parse the node from the IP address
node, err := s.parseNode(latestIP, selfAddress)
if err != nil {
log.P2P().WithContext(ctx).WithError(err).
WithField("address", latestIP).
WithField("supernode", supernode.SupernodeAccount).
log.P2P().WithContext(ctx).WithError(err).WithField("address", latestIP).WithField("supernode", supernode.SupernodeAccount).
Warn("Skip Bad Bootstrap Address")
continue
}
Expand All @@ -157,7 +152,6 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string

// Convert the map to a slice
for _, node := range mapNodes {
fmt.Println("node adding", node.String(), "hashed id", string(node.HashedID))
boostrapNodes = append(boostrapNodes, node)
}
}
Expand All @@ -168,11 +162,8 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string
}

for _, node := range boostrapNodes {
log.WithContext(ctx).WithFields(log.Fields{
"bootstap_ip": node.IP,
"bootstrap_port": node.Port,
"node_id": string(node.ID),
}).Info("adding p2p bootstrap node")
log.WithContext(ctx).WithFields(log.Fields{"bootstap_ip": node.IP, "bootstrap_port": node.Port, "node_id": string(node.ID)}).
Info("adding p2p bootstrap node")
}

s.options.BootstrapNodes = append(s.options.BootstrapNodes, boostrapNodes...)
Expand Down
6 changes: 1 addition & 5 deletions p2p/kademlia/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (s *DHT) getExternalIP() (string, error) {
s.mtx.Lock()
defer s.mtx.Unlock()
// if listen IP is localhost - then return itself
if s.ht.self.IP == "127.0.0.1" || s.ht.self.IP == "localhost" {
if s.ht.self.IP == "127.0.0.1" || s.ht.self.IP == "localhost" || s.ht.self.IP == "0.0.0.0" {
return s.ht.self.IP, nil
}

Expand Down Expand Up @@ -1042,10 +1042,8 @@ func (s *DHT) sendStoreData(ctx context.Context, n *Node, request *StoreDataRequ

// add a node into the appropriate k bucket, return the removed node if it's full
func (s *DHT) addNode(ctx context.Context, node *Node) *Node {
fmt.Println("add node called", node.String())
// ensure this is not itself address
if bytes.Equal(node.ID, s.ht.self.ID) {
fmt.Println("self node skipped")
log.P2P().WithContext(ctx).Debug("trying to add itself")
return nil
}
Expand Down Expand Up @@ -1405,8 +1403,6 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[
var wg sync.WaitGroup

for key, node := range nodes {

fmt.Println("node in batch store network")
log.WithContext(ctx).WithField("Port#", node.String()).Info("node")
if s.ignorelist.Banned(node) {
log.WithContext(ctx).WithField("node", node.String()).Debug("Ignoring banned node in batch store network call")
Expand Down
80 changes: 40 additions & 40 deletions p2p/kademlia/rq_symbols.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kademlia
import (
"context"
"fmt"
"sort"
"time"

"github.com/LumeraProtocol/supernode/pkg/log"
Expand Down Expand Up @@ -48,64 +49,63 @@ func (s *DHT) storeSymbols(ctx context.Context) error {
return nil
}

// ---------------------------------------------------------------------
// 1. Scan dir → send ALL symbols (no sampling)
// ---------------------------------------------------------------------
func (s *DHT) scanDirAndStoreSymbols(ctx context.Context, dir, txid string) error {
keys, err := utils.ReadDirFilenames(dir)
// Collect relative file paths like "block_0/foo.sym"
keySet, err := utils.ReadDirFilenames(dir)
if err != nil {
return fmt.Errorf("read dir filenames: %w", err)
}

// Iterate over sorted keys in batches
batchKeys := make(map[string][]byte)
count := 0

log.WithContext(ctx).WithField("total-count", len(keys)).WithField("txid", txid).WithField("dir", dir).Info("read dir & found keys")
for key := range keys {
batchKeys[key] = nil
count++
if count%loadSymbolsBatchSize == 0 {
if err := s.storeSymbolsInP2P(ctx, dir, batchKeys); err != nil {
return err
}
batchKeys = make(map[string][]byte) // Reset batchKeys after storing
}
// Turn the set into a sorted slice for deterministic batching
keys := make([]string, 0, len(keySet))
for k := range keySet {
keys = append(keys, k)
}

// Store any remaining symbols in the last batch
if len(batchKeys) > 0 {
if err := s.storeSymbolsInP2P(ctx, dir, batchKeys); err != nil {
return fmt.Errorf("scanDirAndStoreSymbols: store symbols in p2p: %w", err)
sort.Strings(keys)

log.WithContext(ctx).
WithField("txid", txid).
WithField("dir", dir).
WithField("total", len(keys)).
Info("p2p-worker: storing ALL RaptorQ symbols")

// Batch-flush at loadSymbolsBatchSize
for start := 0; start < len(keys); {
end := start + loadSymbolsBatchSize
if end > len(keys) {
end = len(keys)
}

log.WithContext(ctx).WithField("dir", dir).WithField("txid", txid).Info("rq_symbols worker: stored raptorQ symbols in p2p")
if err := s.storeSymbolsInP2P(ctx, dir, keys[start:end]); err != nil {
return err
}
start = end
}

// Mark this directory as completed in rqstore
if err := s.rqstore.SetIsCompleted(txid); err != nil {
return fmt.Errorf("error updating first batch stored flag in rq DB: %w", err)
return fmt.Errorf("set is-completed: %w", err)
}

return nil
}
func (s *DHT) storeSymbolsInP2P(ctx context.Context, dir string, batchKeys map[string][]byte) error {
loadedSymbols, err := utils.LoadSymbols(dir, batchKeys)

// ---------------------------------------------------------------------
// 2. Load → StoreBatch → Delete for a slice of keys
// ---------------------------------------------------------------------
func (s *DHT) storeSymbolsInP2P(ctx context.Context, dir string, keys []string) error {
loaded, err := utils.LoadSymbols(dir, keys)
if err != nil {
return fmt.Errorf("p2p worker: load batch symbols from db: %w", err)
}
// Prepare batch for P2P storage
result := make([][]byte, len(loadedSymbols))
i := 0
for _, value := range loadedSymbols {
result[i] = value
i++
return fmt.Errorf("load symbols: %w", err)
}

// Store the loaded symbols in P2P
if err := s.StoreBatch(ctx, result, 1, dir); err != nil {
return fmt.Errorf("p2p worker: store batch raptorq symbols in p2p: %w", err)
if err := s.StoreBatch(ctx, loaded, 1, dir); err != nil {
return fmt.Errorf("p2p store batch: %w", err)
}

if err := utils.DeleteSymbols(ctx, dir, loadedSymbols); err != nil {
return fmt.Errorf("p2p worker: delete batch symbols from db: %w", err)
if err := utils.DeleteSymbols(ctx, dir, keys); err != nil {
return fmt.Errorf("delete symbols: %w", err)
}

return nil
}
1 change: 0 additions & 1 deletion p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ func (s *p2p) configure(ctx context.Context) error {
if s.config.BootstrapNodes != "" && s.config.ExternalIP != "" {
kadOpts.ExternalIP = s.config.ExternalIP
}

// new a kademlia distributed hash table
dht, err := kademlia.NewDHT(ctx, s.store, s.metaStore, kadOpts, s.rqstore)

Expand Down
39 changes: 39 additions & 0 deletions pkg/codec/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//go:generate mockgen -destination=codec_mock.go -package=codec -source=codec.go

package codec

import (
"context"
)

// EncodeResponse represents the response of the encode request.
type EncodeResponse struct {
Metadata Layout
SymbolsDir string
}

type Layout struct {
Blocks []Block `json:"blocks"`
}

// Block is the schema for each entry in the “blocks” array.
type Block struct {
BlockID int `json:"block_id"`
EncoderParameters []int `json:"encoder_parameters"`
OriginalOffset int64 `json:"original_offset"`
Size int64 `json:"size"`
Symbols []string `json:"symbols"`
Hash string `json:"hash"`
}

// EncodeRequest represents the request to encode a file.
type EncodeRequest struct {
TaskID string
Data []byte
}

// RaptorQ contains methods for request services from RaptorQ service.
type Codec interface {
// Encode a file
Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error)
}
81 changes: 81 additions & 0 deletions pkg/codec/raptorq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package codec

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"

raptorq "github.com/LumeraProtocol/rq-go"
)

type raptorQ struct {
symbolsBaseDir string
}

func NewRaptorQCodec(dir string) Codec {
return &raptorQ{
symbolsBaseDir: dir,
}

}

func (rq *raptorQ) Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error) {
/* ---------- 1. initialise RaptorQ processor ---------- */

processor, err := raptorq.NewDefaultRaptorQProcessor()
if err != nil {
return EncodeResponse{}, fmt.Errorf("create RaptorQ processor: %w", err)
}
defer processor.Free()

/* ---------- 2. persist req.Data to a temp file ---------- */

tmp, err := os.CreateTemp("", "rq-encode-*")
if err != nil {
return EncodeResponse{}, fmt.Errorf("create temp file: %w", err)
}
tmpPath := tmp.Name()
if _, err := tmp.Write(req.Data); err != nil {
tmp.Close()
os.Remove(tmpPath)
return EncodeResponse{}, fmt.Errorf("write temp file: %w", err)
}
if err := tmp.Close(); err != nil { // sync to disk
os.Remove(tmpPath)
return EncodeResponse{}, fmt.Errorf("close temp file: %w", err)
}

/* ---------- 3. run the encoder ---------- */

blockSize := processor.GetRecommendedBlockSize(uint64(len(req.Data)))

symbolsDir := filepath.Join(rq.symbolsBaseDir, req.TaskID)
if err := os.MkdirAll(symbolsDir, 0o755); err != nil {
os.Remove(tmpPath)
return EncodeResponse{}, fmt.Errorf("mkdir %s: %w", symbolsDir, err)
}

resp, err := processor.EncodeFile(tmpPath, symbolsDir, blockSize)
if err != nil {
os.Remove(tmpPath)
return EncodeResponse{}, fmt.Errorf("raptorq encode: %w", err)
}

/* we no longer need the temp file */
_ = os.Remove(tmpPath)

/* ---------- 4. read the layout JSON ---------- */
layoutData, err := os.ReadFile(resp.LayoutFilePath)
if err != nil {
return EncodeResponse{}, fmt.Errorf("read layout %s: %w", resp.LayoutFilePath, err)
}

var encodeResp EncodeResponse
if err := json.Unmarshal(layoutData, &resp); err != nil {
return EncodeResponse{}, fmt.Errorf("unmarshal layout: %w", err)
}

return encodeResp, nil
}
33 changes: 0 additions & 33 deletions pkg/log/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@ package log

import (
"context"
"io"
"net"
"net/http"

"github.com/LumeraProtocol/supernode/pkg/errors"
"github.com/LumeraProtocol/supernode/pkg/log/hooks"
)

Expand All @@ -32,10 +28,6 @@ const (

// ContextWithPrefix returns a new context with PrefixKey value.
func ContextWithPrefix(ctx context.Context, prefix string) context.Context {
ip, err := GetExternalIPAddress()
if err != nil {
WithContext(ctx).WithError(err).Error("unable to fetch server ip")
}

ctx = ContextWithServer(ctx, ip)

Expand All @@ -53,28 +45,3 @@ func init() {
return msg, fields
}))
}

// GetExternalIPAddress returns external IP address
func GetExternalIPAddress() (externalIP string, err error) {
if ip != "" {
return ip, nil
}

resp, err := http.Get("http://ipinfo.io/ip")
if err != nil {
return "", err
}

defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}

if net.ParseIP(string(body)) == nil {
return "", errors.Errorf("invalid IP response from %s", "ipconf.ip")
}

return string(body), nil
}
Loading