From dbf377e68888c0c46cf939f399242bde426af122 Mon Sep 17 00:00:00 2001 From: Matee ullah Date: Fri, 2 May 2025 05:50:12 +0500 Subject: [PATCH 1/2] use rq lib, refactor code --- go.mod | 1 + go.sum | 2 + p2p/kademlia/bootstrap.go | 17 +- p2p/kademlia/dht.go | 6 +- p2p/kademlia/rq_symbols.go | 80 ++-- p2p/p2p.go | 1 - pkg/codec/codec.go | 39 ++ pkg/codec/raptorq.go | 83 +++++ pkg/log/context.go | 33 -- .../credentials/alts/handshake/handshake.go | 11 +- pkg/raptorq/client.go | 46 --- pkg/raptorq/config.go | 40 -- pkg/raptorq/connection.go | 24 -- pkg/raptorq/decode.go | 46 --- pkg/raptorq/encode.go | 48 --- pkg/raptorq/encode_metadata.go | 49 --- pkg/raptorq/gen_rq_identifier_files.go | 68 ---- pkg/raptorq/helper.go | 180 --------- pkg/raptorq/interfaces.go | 40 -- pkg/raptorq/raptorq.go | 140 ------- pkg/raptorq/rq_mock.go | 186 ---------- pkg/raptorq/rq_server_client.go | 35 -- pkg/raptorq/valdate_rqids.go | 57 --- pkg/utils/utils.go | 113 +++--- supernode/cmd/start.go | 17 +- .../server/cascade/cascade_action_server.go | 8 +- supernode/services/cascade/metadata.go | 96 +++++ supernode/services/cascade/register.go | 257 +++++++++++++ supernode/services/cascade/service.go | 22 +- supernode/services/cascade/task.go | 16 +- supernode/services/cascade/upload.go | 195 ---------- supernode/services/common/reg_task_helper.go | 140 ------- supernode/services/common/service.go | 2 - supernode/services/common/status.go | 81 +--- supernode/services/common/status_test.go | 350 ------------------ supernode/services/common/storage_handler.go | 321 +++------------- .../securegrpc/secure_connection_test.go | 7 +- 37 files changed, 675 insertions(+), 2182 deletions(-) create mode 100644 pkg/codec/codec.go create mode 100644 pkg/codec/raptorq.go delete mode 100644 pkg/raptorq/client.go delete mode 100644 pkg/raptorq/config.go delete mode 100644 pkg/raptorq/connection.go delete mode 100644 pkg/raptorq/decode.go delete mode 100644 pkg/raptorq/encode.go delete mode 100644 pkg/raptorq/encode_metadata.go delete mode 100644 pkg/raptorq/gen_rq_identifier_files.go delete mode 100644 pkg/raptorq/helper.go delete mode 100644 pkg/raptorq/interfaces.go delete mode 100644 pkg/raptorq/raptorq.go delete mode 100644 pkg/raptorq/rq_mock.go delete mode 100644 pkg/raptorq/rq_server_client.go delete mode 100644 pkg/raptorq/valdate_rqids.go create mode 100644 supernode/services/cascade/metadata.go create mode 100644 supernode/services/cascade/register.go delete mode 100644 supernode/services/cascade/upload.go delete mode 100644 supernode/services/common/reg_task_helper.go delete mode 100644 supernode/services/common/status_test.go diff --git a/go.mod b/go.mod index 81f83008..46902fc3 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.24.0 require ( github.com/LumeraProtocol/lumera v0.4.3 + github.com/LumeraProtocol/rq-go v0.2.1 github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce github.com/cenkalti/backoff/v4 v4.3.0 github.com/cosmos/btcutil v1.0.5 diff --git a/go.sum b/go.sum index 2a421fba..ac50e7a8 100644 --- a/go.sum +++ b/go.sum @@ -63,6 +63,8 @@ github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwS github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/LumeraProtocol/lumera v0.4.3 h1:q/FuT+JOLIpYdlunczRUr6K85r9Sn0lKvGltSrj4r6s= github.com/LumeraProtocol/lumera v0.4.3/go.mod h1:MRqVY+f8edEBkDvpr4z2nJpglp3Qj1OUvjeWvrvIUSM= +github.com/LumeraProtocol/rq-go v0.2.1 h1:8B3UzRChLsGMmvZ+UVbJsJj6JZzL9P9iYxbdUwGsQI4= +github.com/LumeraProtocol/rq-go v0.2.1/go.mod h1:APnKCZRh1Es2Vtrd2w4kCLgAyaL5Bqrkz/BURoRJ+O8= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= diff --git a/p2p/kademlia/bootstrap.go b/p2p/kademlia/bootstrap.go index ac13ea34..8d73a62e 100644 --- a/p2p/kademlia/bootstrap.go +++ b/p2p/kademlia/bootstrap.go @@ -32,7 +32,6 @@ func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) { } if strings.Contains(extP2P, "0.0.0.0") { - fmt.Println("skippping node") return nil, errors.New("invalid address") } @@ -134,18 +133,14 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string } if latestIP == "" { - log.P2P().WithContext(ctx). - WithField("supernode", supernode.SupernodeAccount). - Warn("No valid IP address found for supernode") + log.P2P().WithContext(ctx).WithField("supernode", supernode.SupernodeAccount).Warn("No valid IP address found for supernode") continue } // Parse the node from the IP address node, err := s.parseNode(latestIP, selfAddress) if err != nil { - log.P2P().WithContext(ctx).WithError(err). - WithField("address", latestIP). - WithField("supernode", supernode.SupernodeAccount). + log.P2P().WithContext(ctx).WithError(err).WithField("address", latestIP).WithField("supernode", supernode.SupernodeAccount). Warn("Skip Bad Bootstrap Address") continue } @@ -157,7 +152,6 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string // Convert the map to a slice for _, node := range mapNodes { - fmt.Println("node adding", node.String(), "hashed id", string(node.HashedID)) boostrapNodes = append(boostrapNodes, node) } } @@ -168,11 +162,8 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string } for _, node := range boostrapNodes { - log.WithContext(ctx).WithFields(log.Fields{ - "bootstap_ip": node.IP, - "bootstrap_port": node.Port, - "node_id": string(node.ID), - }).Info("adding p2p bootstrap node") + log.WithContext(ctx).WithFields(log.Fields{"bootstap_ip": node.IP, "bootstrap_port": node.Port, "node_id": string(node.ID)}). + Info("adding p2p bootstrap node") } s.options.BootstrapNodes = append(s.options.BootstrapNodes, boostrapNodes...) diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index aeb38ab9..d4e6e8d9 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -152,7 +152,7 @@ func (s *DHT) getExternalIP() (string, error) { s.mtx.Lock() defer s.mtx.Unlock() // if listen IP is localhost - then return itself - if s.ht.self.IP == "127.0.0.1" || s.ht.self.IP == "localhost" { + if s.ht.self.IP == "127.0.0.1" || s.ht.self.IP == "localhost" || s.ht.self.IP == "0.0.0.0" { return s.ht.self.IP, nil } @@ -1042,10 +1042,8 @@ func (s *DHT) sendStoreData(ctx context.Context, n *Node, request *StoreDataRequ // add a node into the appropriate k bucket, return the removed node if it's full func (s *DHT) addNode(ctx context.Context, node *Node) *Node { - fmt.Println("add node called", node.String()) // ensure this is not itself address if bytes.Equal(node.ID, s.ht.self.ID) { - fmt.Println("self node skipped") log.P2P().WithContext(ctx).Debug("trying to add itself") return nil } @@ -1405,8 +1403,6 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[ var wg sync.WaitGroup for key, node := range nodes { - - fmt.Println("node in batch store network") log.WithContext(ctx).WithField("Port#", node.String()).Info("node") if s.ignorelist.Banned(node) { log.WithContext(ctx).WithField("node", node.String()).Debug("Ignoring banned node in batch store network call") diff --git a/p2p/kademlia/rq_symbols.go b/p2p/kademlia/rq_symbols.go index 93b8e570..ed37ad61 100644 --- a/p2p/kademlia/rq_symbols.go +++ b/p2p/kademlia/rq_symbols.go @@ -3,6 +3,7 @@ package kademlia import ( "context" "fmt" + "sort" "time" "github.com/LumeraProtocol/supernode/pkg/log" @@ -48,64 +49,63 @@ func (s *DHT) storeSymbols(ctx context.Context) error { return nil } +// --------------------------------------------------------------------- +// 1. Scan dir → send ALL symbols (no sampling) +// --------------------------------------------------------------------- func (s *DHT) scanDirAndStoreSymbols(ctx context.Context, dir, txid string) error { - keys, err := utils.ReadDirFilenames(dir) + // Collect relative file paths like "block_0/foo.sym" + keySet, err := utils.ReadDirFilenames(dir) if err != nil { return fmt.Errorf("read dir filenames: %w", err) } - // Iterate over sorted keys in batches - batchKeys := make(map[string][]byte) - count := 0 - - log.WithContext(ctx).WithField("total-count", len(keys)).WithField("txid", txid).WithField("dir", dir).Info("read dir & found keys") - for key := range keys { - batchKeys[key] = nil - count++ - if count%loadSymbolsBatchSize == 0 { - if err := s.storeSymbolsInP2P(ctx, dir, batchKeys); err != nil { - return err - } - batchKeys = make(map[string][]byte) // Reset batchKeys after storing - } + // Turn the set into a sorted slice for deterministic batching + keys := make([]string, 0, len(keySet)) + for k := range keySet { + keys = append(keys, k) } - - // Store any remaining symbols in the last batch - if len(batchKeys) > 0 { - if err := s.storeSymbolsInP2P(ctx, dir, batchKeys); err != nil { - return fmt.Errorf("scanDirAndStoreSymbols: store symbols in p2p: %w", err) + sort.Strings(keys) + + log.WithContext(ctx). + WithField("txid", txid). + WithField("dir", dir). + WithField("total", len(keys)). + Info("p2p-worker: storing ALL RaptorQ symbols") + + // Batch-flush at loadSymbolsBatchSize + for start := 0; start < len(keys); { + end := start + loadSymbolsBatchSize + if end > len(keys) { + end = len(keys) } - - log.WithContext(ctx).WithField("dir", dir).WithField("txid", txid).Info("rq_symbols worker: stored raptorQ symbols in p2p") + if err := s.storeSymbolsInP2P(ctx, dir, keys[start:end]); err != nil { + return err + } + start = end } + // Mark this directory as completed in rqstore if err := s.rqstore.SetIsCompleted(txid); err != nil { - return fmt.Errorf("error updating first batch stored flag in rq DB: %w", err) + return fmt.Errorf("set is-completed: %w", err) } - return nil } -func (s *DHT) storeSymbolsInP2P(ctx context.Context, dir string, batchKeys map[string][]byte) error { - loadedSymbols, err := utils.LoadSymbols(dir, batchKeys) + +// --------------------------------------------------------------------- +// 2. Load → StoreBatch → Delete for a slice of keys +// --------------------------------------------------------------------- +func (s *DHT) storeSymbolsInP2P(ctx context.Context, dir string, keys []string) error { + loaded, err := utils.LoadSymbols(dir, keys) if err != nil { - return fmt.Errorf("p2p worker: load batch symbols from db: %w", err) - } - // Prepare batch for P2P storage - result := make([][]byte, len(loadedSymbols)) - i := 0 - for _, value := range loadedSymbols { - result[i] = value - i++ + return fmt.Errorf("load symbols: %w", err) } - // Store the loaded symbols in P2P - if err := s.StoreBatch(ctx, result, 1, dir); err != nil { - return fmt.Errorf("p2p worker: store batch raptorq symbols in p2p: %w", err) + if err := s.StoreBatch(ctx, loaded, 1, dir); err != nil { + return fmt.Errorf("p2p store batch: %w", err) } - if err := utils.DeleteSymbols(ctx, dir, loadedSymbols); err != nil { - return fmt.Errorf("p2p worker: delete batch symbols from db: %w", err) + if err := utils.DeleteSymbols(ctx, dir, keys); err != nil { + return fmt.Errorf("delete symbols: %w", err) } - return nil } diff --git a/p2p/p2p.go b/p2p/p2p.go index dcb99d02..0330e39f 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -248,7 +248,6 @@ func (s *p2p) configure(ctx context.Context) error { if s.config.BootstrapNodes != "" && s.config.ExternalIP != "" { kadOpts.ExternalIP = s.config.ExternalIP } - // new a kademlia distributed hash table dht, err := kademlia.NewDHT(ctx, s.store, s.metaStore, kadOpts, s.rqstore) diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go new file mode 100644 index 00000000..9963c988 --- /dev/null +++ b/pkg/codec/codec.go @@ -0,0 +1,39 @@ +//go:generate mockgen -destination=codec_mock.go -package=codec -source=codec.go + +package codec + +import ( + "context" +) + +// EncodeResponse represents the response of the encode request. +type EncodeResponse struct { + Metadata Layout + SymbolsDir string +} + +type Layout struct { + Blocks []Block `json:"blocks"` +} + +// Block is the schema for each entry in the “blocks” array. +type Block struct { + BlockID int `json:"block_id"` + EncoderParameters []int `json:"encoder_parameters"` + OriginalOffset int64 `json:"original_offset"` + Size int64 `json:"size"` + Symbols []string `json:"symbols"` + Hash string `json:"hash"` +} + +// EncodeRequest represents the request to encode a file. +type EncodeRequest struct { + TaskID string + Data []byte +} + +// RaptorQ contains methods for request services from RaptorQ service. +type Codec interface { + // Encode a file + Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error) +} diff --git a/pkg/codec/raptorq.go b/pkg/codec/raptorq.go new file mode 100644 index 00000000..4c744f4f --- /dev/null +++ b/pkg/codec/raptorq.go @@ -0,0 +1,83 @@ +package codec + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + + raptorq "github.com/LumeraProtocol/rq-go" +) + +type raptorQ struct { + symbolsBaseDir string +} + +func NewRaptorQCodec(dir string) Codec { + return &raptorQ{ + symbolsBaseDir: dir, + } + +} + +func (rq *raptorQ) Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error) { + /* ---------- 1. initialise RaptorQ processor ---------- */ + + processor, err := raptorq.NewDefaultRaptorQProcessor() + if err != nil { + return EncodeResponse{}, fmt.Errorf("create RaptorQ processor: %w", err) + } + defer processor.Free() + + /* ---------- 2. persist req.Data to a temp file ---------- */ + + tmp, err := os.CreateTemp("", "rq-encode-*") + if err != nil { + return EncodeResponse{}, fmt.Errorf("create temp file: %w", err) + } + tmpPath := tmp.Name() + if _, err := tmp.Write(req.Data); err != nil { + tmp.Close() + os.Remove(tmpPath) + return EncodeResponse{}, fmt.Errorf("write temp file: %w", err) + } + if err := tmp.Close(); err != nil { // sync to disk + os.Remove(tmpPath) + return EncodeResponse{}, fmt.Errorf("close temp file: %w", err) + } + + /* ---------- 3. run the encoder ---------- */ + + blockSize := processor.GetRecommendedBlockSize(uint64(len(req.Data))) + + symbolsDir := filepath.Join(rq.symbolsBaseDir, req.TaskID) + if err := os.MkdirAll(symbolsDir, 0o755); err != nil { + os.Remove(tmpPath) + return EncodeResponse{}, fmt.Errorf("mkdir %s: %w", symbolsDir, err) + } + + _, err = processor.EncodeFile(tmpPath, symbolsDir, blockSize) + if err != nil { + os.Remove(tmpPath) + return EncodeResponse{}, fmt.Errorf("raptorq encode: %w", err) + } + + /* we no longer need the temp file */ + _ = os.Remove(tmpPath) + + /* ---------- 4. read the layout JSON ---------- */ + + layoutPath := filepath.Join(symbolsDir, "_raptorq_layout.json") + layoutData, err := os.ReadFile(layoutPath) + if err != nil { + return EncodeResponse{}, fmt.Errorf("read layout %s: %w", layoutPath, err) + } + + var resp EncodeResponse + if err := json.Unmarshal(layoutData, &resp); err != nil { + return EncodeResponse{}, fmt.Errorf("unmarshal layout: %w", err) + } + + return resp, nil +} diff --git a/pkg/log/context.go b/pkg/log/context.go index 1988a994..e6aa91f0 100644 --- a/pkg/log/context.go +++ b/pkg/log/context.go @@ -2,11 +2,7 @@ package log import ( "context" - "io" - "net" - "net/http" - "github.com/LumeraProtocol/supernode/pkg/errors" "github.com/LumeraProtocol/supernode/pkg/log/hooks" ) @@ -32,10 +28,6 @@ const ( // ContextWithPrefix returns a new context with PrefixKey value. func ContextWithPrefix(ctx context.Context, prefix string) context.Context { - ip, err := GetExternalIPAddress() - if err != nil { - WithContext(ctx).WithError(err).Error("unable to fetch server ip") - } ctx = ContextWithServer(ctx, ip) @@ -53,28 +45,3 @@ func init() { return msg, fields })) } - -// GetExternalIPAddress returns external IP address -func GetExternalIPAddress() (externalIP string, err error) { - if ip != "" { - return ip, nil - } - - resp, err := http.Get("http://ipinfo.io/ip") - if err != nil { - return "", err - } - - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return "", err - } - - if net.ParseIP(string(body)) == nil { - return "", errors.Errorf("invalid IP response from %s", "ipconf.ip") - } - - return string(body), nil -} diff --git a/pkg/net/credentials/alts/handshake/handshake.go b/pkg/net/credentials/alts/handshake/handshake.go index 38e3c454..b7b8a9b5 100644 --- a/pkg/net/credentials/alts/handshake/handshake.go +++ b/pkg/net/credentials/alts/handshake/handshake.go @@ -88,12 +88,12 @@ func newHandshaker(keyExchange securekeyx.KeyExchanger, conn net.Conn, remoteAdd side Side, timeout time.Duration, opts interface{}) *secureHandshaker { hs := &secureHandshaker{ - conn: conn, + conn: conn, keyExchanger: keyExchange, - remoteAddr: remoteAddr, - side: side, - protocol: RecordProtocolXChaCha20Poly1305ReKey, // Default to XChaCha20-Poly1305 - timeout: timeout, + remoteAddr: remoteAddr, + side: side, + protocol: RecordProtocolXChaCha20Poly1305ReKey, // Default to XChaCha20-Poly1305 + timeout: timeout, } if side == ClientSide { @@ -212,7 +212,6 @@ func (h *secureHandshaker) ClientHandshake(ctx context.Context) (net.Conn, crede // Create ALTS connection altsConn, err := NewConn(h.conn, h.side, h.protocol, expandedKey, nil) if err != nil { - fmt.Println("Failed to create ALTS connection on client: %w", err) return nil, nil, fmt.Errorf("failed to create ALTS connection: %w", err) } diff --git a/pkg/raptorq/client.go b/pkg/raptorq/client.go deleted file mode 100644 index 93820358..00000000 --- a/pkg/raptorq/client.go +++ /dev/null @@ -1,46 +0,0 @@ -package raptorq - -import ( - "context" - "fmt" - - "github.com/LumeraProtocol/supernode/pkg/errors" - "github.com/LumeraProtocol/supernode/pkg/log" - "github.com/LumeraProtocol/supernode/pkg/random" - - "google.golang.org/grpc" -) - -type client struct{} - -// Connect implements node.Client.Connect() -func (client *client) Connect(ctx context.Context, address string) (Connection, error) { - // Limits the dial timeout, prevent got stuck too long - dialCtx, cancel := context.WithTimeout(ctx, defaultConnectTimeout) - defer cancel() - - id, _ := random.String(8, random.Base62Chars) - ctx = log.ContextWithPrefix(ctx, fmt.Sprintf("%s-%s", logPrefix, id)) - - grpcConn, err := grpc.DialContext(dialCtx, address, - - grpc.WithInsecure(), - grpc.WithBlock(), - ) - if err != nil { - return nil, errors.Errorf("fail to dial: %w", err).WithField("address", address) - } - log.WithContext(ctx).Debugf("Connected to RQ %s", address) - - conn := newClientConn(id, grpcConn) - go func() { - //<-conn.Done() // FIXME: to be implemented by new gRPC package - log.WithContext(ctx).Debugf("Disconnected RQ %s", grpcConn.Target()) - }() - return conn, nil -} - -// NewClient returns a new client instance. -func NewClient() ClientInterface { - return &client{} -} diff --git a/pkg/raptorq/config.go b/pkg/raptorq/config.go deleted file mode 100644 index ad6d0dd3..00000000 --- a/pkg/raptorq/config.go +++ /dev/null @@ -1,40 +0,0 @@ -package raptorq - -import "fmt" - -const ( - errValidationStr = "raptorq validation failed - missing val" - defaultHost = "localhost" - defaultPort = 50051 -) - -// Config contains settings of the p2p service -type Config struct { - // the queries IPv4 or IPv6 address - Host string `mapstructure:"host" json:"host,omitempty"` - - // the queries port to listen for connections on - Port int `mapstructure:"port" json:"port,omitempty"` - - RqFilesDir string `mapstructure:"rqfiles_dir" json:"rqfiles_dir,omitempty"` -} - -// NewConfig returns a new Config instance. -func NewConfig() *Config { - return &Config{ - Host: defaultHost, - Port: defaultPort, - } -} - -// Validate raptorq configs -func (config *Config) Validate() error { - if config.Host == "" { - return fmt.Errorf("%s: %s", errValidationStr, "host") - } - if config.Port == 0 { - return fmt.Errorf("%s: %s", errValidationStr, "port") - } - - return nil -} diff --git a/pkg/raptorq/connection.go b/pkg/raptorq/connection.go deleted file mode 100644 index 137b0f52..00000000 --- a/pkg/raptorq/connection.go +++ /dev/null @@ -1,24 +0,0 @@ -package raptorq - -import ( - "github.com/LumeraProtocol/supernode/pkg/lumera" - "google.golang.org/grpc" -) - -// clientConn represents grpc client conneciton. -type clientConn struct { - *grpc.ClientConn - - id string -} - -func (conn *clientConn) RaptorQ(config *Config, lc lumera.Client) RaptorQ { - return NewRaptorQServerClient(conn, config, lc) -} - -func newClientConn(id string, conn *grpc.ClientConn) Connection { - return &clientConn{ - ClientConn: conn, - id: id, - } -} diff --git a/pkg/raptorq/decode.go b/pkg/raptorq/decode.go deleted file mode 100644 index 041cef4a..00000000 --- a/pkg/raptorq/decode.go +++ /dev/null @@ -1,46 +0,0 @@ -package raptorq - -import ( - "context" - "fmt" - - rq "github.com/LumeraProtocol/supernode/gen/raptorq" - "github.com/LumeraProtocol/supernode/pkg/logtrace" - "github.com/LumeraProtocol/supernode/pkg/net" -) - -type DecodeRequest struct { - EncoderParameters []byte - Path string -} - -type DecodeResponse struct { - SymbolsCount uint32 - Path string -} - -// Decode handles data decoding -func (c *raptorQServerClient) Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) { - ctx = net.AddCorrelationID(ctx) - fields := logtrace.Fields{ - logtrace.FieldMethod: "Decode", - logtrace.FieldRequest: req, - } - logtrace.Info(ctx, "decoding data", fields) - - res, err := c.rqService.Decode(ctx, &rq.DecodeRequest{EncoderParameters: req.EncoderParameters, Path: req.Path}) - if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to decode data", fields) - return DecodeResponse{}, fmt.Errorf("raptorQ decode error: %w", err) - } - - logtrace.Info(ctx, "successfully decoded data", fields) - return toDecodedResponse(res), nil -} - -func toDecodedResponse(reply *rq.DecodeReply) DecodeResponse { - return DecodeResponse{ - Path: reply.Path, - } -} diff --git a/pkg/raptorq/encode.go b/pkg/raptorq/encode.go deleted file mode 100644 index a802c84c..00000000 --- a/pkg/raptorq/encode.go +++ /dev/null @@ -1,48 +0,0 @@ -package raptorq - -import ( - "context" - "fmt" - - rq "github.com/LumeraProtocol/supernode/gen/raptorq" - "github.com/LumeraProtocol/supernode/pkg/logtrace" - "github.com/LumeraProtocol/supernode/pkg/net" -) - -type EncodeRequest struct { - Path string -} - -type EncodeResponse struct { - EncoderParameters []byte - SymbolsCount uint32 - Path string -} - -// Encode handles data encoding -func (c *raptorQServerClient) Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error) { - ctx = net.AddCorrelationID(ctx) - fields := logtrace.Fields{ - logtrace.FieldMethod: "Encode", - logtrace.FieldRequest: req, - } - logtrace.Info(ctx, "encoding metadata", fields) - - res, err := c.rqService.Encode(ctx, &rq.EncodeRequest{Path: req.Path}) - if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to encode data", fields) - return EncodeResponse{}, fmt.Errorf("raptorQ encode error: %w", err) - } - - logtrace.Info(ctx, "successfully encoded data", fields) - return toEncodedResponse(res), nil -} - -func toEncodedResponse(reply *rq.EncodeReply) EncodeResponse { - return EncodeResponse{ - EncoderParameters: reply.EncoderParameters, - SymbolsCount: reply.SymbolsCount, - Path: reply.Path, - } -} diff --git a/pkg/raptorq/encode_metadata.go b/pkg/raptorq/encode_metadata.go deleted file mode 100644 index 40238d65..00000000 --- a/pkg/raptorq/encode_metadata.go +++ /dev/null @@ -1,49 +0,0 @@ -package raptorq - -import ( - "context" - - rq "github.com/LumeraProtocol/supernode/gen/raptorq" - "github.com/LumeraProtocol/supernode/pkg/logtrace" - "github.com/LumeraProtocol/supernode/pkg/net" -) - -type EncodeMetadataRequest struct { - Path string - FilesNumber uint32 - BlockHash string - PastelId string -} - -// EncodeMetaData handles encoding metadata -func (c *raptorQServerClient) EncodeMetaData(ctx context.Context, req EncodeMetadataRequest) (EncodeResponse, error) { - ctx = net.AddCorrelationID(ctx) - fields := logtrace.Fields{ - logtrace.FieldMethod: "EncodeMetaData", - logtrace.FieldRequest: req, - } - logtrace.Info(ctx, "encoding metadata", fields) - - res, err := c.rqService.EncodeMetaData(ctx, &rq.EncodeMetaDataRequest{ - Path: req.Path, - FilesNumber: req.FilesNumber, - BlockHash: req.BlockHash, - PastelId: req.PastelId, - }) - if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "error encoding metadata", fields) - return EncodeResponse{}, nil - } - - logtrace.Info(ctx, "successfully encoded metadata", fields) - return toEncodedMetadataResponse(res), nil -} - -func toEncodedMetadataResponse(reply *rq.EncodeMetaDataReply) EncodeResponse { - return EncodeResponse{ - EncoderParameters: reply.EncoderParameters, - SymbolsCount: reply.SymbolsCount, - Path: reply.Path, - } -} diff --git a/pkg/raptorq/gen_rq_identifier_files.go b/pkg/raptorq/gen_rq_identifier_files.go deleted file mode 100644 index 43b04550..00000000 --- a/pkg/raptorq/gen_rq_identifier_files.go +++ /dev/null @@ -1,68 +0,0 @@ -package raptorq - -import ( - "context" - - "github.com/LumeraProtocol/supernode/pkg/errors" - "github.com/LumeraProtocol/supernode/pkg/lumera" -) - -const BlockHash = "block_hash" - -type GenRQIdentifiersFilesRequest struct { - Data []byte - RqMax uint32 - CreatorSNAddress string - SignedData string - LC lumera.Client -} - -type GenRQIdentifiersFilesResponse struct { - RQIDsIc uint32 - RQIDs []string - RQIDsFiles [][]byte - RQIDsFile []byte - CreatorSignature []byte - RQEncodeParams EncoderParameters -} - -func (s *raptorQServerClient) GenRQIdentifiersFiles(ctx context.Context, req GenRQIdentifiersFilesRequest) ( - GenRQIdentifiersFilesResponse, error) { - - // Step 1: Encode the original data to get symbol IDs - encodeInfo, err := s.encodeInfo(ctx, req.Data, req.RqMax, BlockHash, req.CreatorSNAddress) - if err != nil { - return GenRQIdentifiersFilesResponse{}, errors.Errorf("error encoding info: %w", err) - } - - // Step 2: Process the symbol ID files (taking just the first one) - var rawRQIDFile RawSymbolIDFile - for i := range encodeInfo.SymbolIDFiles { - rawRQIDFile = encodeInfo.SymbolIDFiles[i] - if len(rawRQIDFile.SymbolIdentifiers) == 0 { - return GenRQIdentifiersFilesResponse{}, errors.Errorf("empty symbol identifiers in raw file") - } - break // Only process the first valid file - } - - // Step 5: Generate RQIDs using the validated data - genRQIDsRes, err := s.generateRQIDs(ctx, generateRQIDsRequest{ - lc: req.LC, - rawFile: rawRQIDFile, - creatorAddress: req.CreatorSNAddress, - maxFiles: req.RqMax, - signedData: req.SignedData, - }) - if err != nil { - return GenRQIdentifiersFilesResponse{}, errors.Errorf("error generating rqids: %w", err) - } - - return GenRQIdentifiersFilesResponse{ - RQIDsIc: genRQIDsRes.RQIDsIc, - RQIDs: genRQIDsRes.RQIDs, - RQIDsFiles: genRQIDsRes.RQIDsFiles, - RQIDsFile: genRQIDsRes.RQIDsFile, - RQEncodeParams: encodeInfo.EncoderParam, - CreatorSignature: genRQIDsRes.signature, - }, nil -} diff --git a/pkg/raptorq/helper.go b/pkg/raptorq/helper.go deleted file mode 100644 index de696e51..00000000 --- a/pkg/raptorq/helper.go +++ /dev/null @@ -1,180 +0,0 @@ -package raptorq - -import ( - "bytes" - "context" - "encoding/json" - "math/rand/v2" - "os" - "strconv" - - "github.com/LumeraProtocol/supernode/pkg/errors" - "github.com/LumeraProtocol/supernode/pkg/logtrace" - "github.com/LumeraProtocol/supernode/pkg/lumera" - "github.com/LumeraProtocol/supernode/pkg/utils" - "github.com/cosmos/btcutil/base58" -) - -const ( - InputEncodeFileName = "input.data" - SeparatorByte byte = 46 // separator in dd_and_fingerprints.signature i.e. '.' -) - -// EncoderParameters represents the encoding params used by raptorq services -type EncoderParameters struct { - Oti []byte -} - -// EncodeInfo represents the response returns by encodeInfo method -type EncodeInfo struct { - SymbolIDFiles map[string]RawSymbolIDFile - EncoderParam EncoderParameters -} - -// Encode represents the response returns by Encode method -type Encode struct { - Symbols map[string][]byte - EncoderParam EncoderParameters -} - -// Decode represents the response returns by Decode method -type Decode struct { - File []byte -} - -// func (s *raptorQServerClient) encodeInfo(ctx context.Context, taskID string, data []byte, copies uint32, blockHash string, pastelID string) (*EncodeInfo, error) { -func (s *raptorQServerClient) encodeInfo(ctx context.Context, data []byte, copies uint32, blockHash string, pastelID string) (*EncodeInfo, error) { - - s.semaphore <- struct{}{} // Acquire slot - defer func() { - <-s.semaphore // Release the semaphore slot - }() - - if data == nil { - return nil, errors.Errorf("invalid data") - } - - _, inputPath, err := createInputEncodeFile(s.config.RqFilesDir, data) - if err != nil { - return nil, errors.Errorf("create input file: %w", err) - } - res, err := s.EncodeMetaData(ctx, EncodeMetadataRequest{ - FilesNumber: copies, - BlockHash: blockHash, - PastelId: pastelID, - Path: inputPath, - }) - if err != nil { - return nil, errors.Errorf("encode metadata %s: %w", res.Path, err) - } - - filesMap, err := scanSymbolIDFiles(res.Path) - if err != nil { - return nil, errors.Errorf("scan symbol id files folder %s: %w", res.Path, err) - } - - if len(filesMap) != int(copies) { - return nil, errors.Errorf("symbol id files count not match: expect %d, output %d", copies, len(filesMap)) - } - - output := &EncodeInfo{ - SymbolIDFiles: filesMap, - EncoderParam: EncoderParameters{ - Oti: res.EncoderParameters, - }, - } - - if err := os.Remove(inputPath); err != nil { - logtrace.Error(ctx, "encode info: error removing input file", logtrace.Fields{"Path": inputPath}) - } - - return output, nil -} - -type generateRQIDsRequest struct { - lc lumera.Client - rawFile RawSymbolIDFile - creatorAddress string - maxFiles uint32 - signedData string -} - -type generateRQIDsResponse struct { - RQIDsIc uint32 - RQIDs []string - RQIDsFile []byte - RQIDsFiles [][]byte - signature []byte -} - -func (s *raptorQServerClient) generateRQIDs(ctx context.Context, req generateRQIDsRequest) (generateRQIDsResponse, error) { - // RQID file generated by supernode - rqIDsfile, err := json.Marshal(req.rawFile) - if err != nil { - return generateRQIDsResponse{}, errors.Errorf("marshal rqID file: %w", err) - } - encRqIDsfile := utils.B64Encode(rqIDsfile) - - // Create the RQID file by combining the encoded file with the signature - var buffer bytes.Buffer - buffer.Write(encRqIDsfile) - buffer.WriteByte(SeparatorByte) - buffer.Write([]byte(req.signedData)) - rqIDFile := buffer.Bytes() - - // Generate the specified number of variant IDs - RQIDsIc := rand.Uint32() - RQIDs, RQIDsFiles, err := GetIDFiles(ctx, rqIDFile, RQIDsIc, req.maxFiles) - if err != nil { - return generateRQIDsResponse{}, errors.Errorf("get ID Files: %w", err) - } - - // Create the compressed version of the RQID file - comp, err := utils.HighCompress(ctx, rqIDFile) - if err != nil { - return generateRQIDsResponse{}, errors.Errorf("compress: %w", err) - } - RQIDsFile := utils.B64Encode(comp) - - return generateRQIDsResponse{ - RQIDsIc: RQIDsIc, - RQIDs: RQIDs, - RQIDsFile: RQIDsFile, - RQIDsFiles: RQIDsFiles, - signature: []byte(req.signedData), - }, nil -} - -// GetIDFiles generates ID Files for dd_and_fingerprints files and rq_id files -// file is b64 encoded file appended with signatures and compressed, ic is the initial counter -// and max is the number of ids to generate -func GetIDFiles(ctx context.Context, file []byte, ic uint32, max uint32) (ids []string, files [][]byte, err error) { - idFiles := make([][]byte, 0, max) - ids = make([]string, 0, max) - var buffer bytes.Buffer - - for i := uint32(0); i < max; i++ { - buffer.Reset() - counter := ic + i - - buffer.Write(file) - buffer.WriteByte(SeparatorByte) - buffer.WriteString(strconv.Itoa(int(counter))) // Using the string representation to maintain backward compatibility - - compressedData, err := utils.HighCompress(ctx, buffer.Bytes()) // Ensure you're using the same compression level - if err != nil { - return ids, idFiles, errors.Errorf("compress identifiers file: %w", err) - } - - idFiles = append(idFiles, compressedData) - - hash, err := utils.Sha3256hash(compressedData) - if err != nil { - return ids, idFiles, errors.Errorf("sha3-256-hash error getting an id file: %w", err) - } - - ids = append(ids, base58.Encode(hash)) - } - - return ids, idFiles, nil -} diff --git a/pkg/raptorq/interfaces.go b/pkg/raptorq/interfaces.go deleted file mode 100644 index 92cc1efc..00000000 --- a/pkg/raptorq/interfaces.go +++ /dev/null @@ -1,40 +0,0 @@ -//go:generate mockgen -destination=rq_mock.go -package=raptorq -source=interfaces.go - -package raptorq - -import ( - "context" - - "github.com/LumeraProtocol/supernode/pkg/lumera" -) - -// ClientInterface represents a base connection interface. -type ClientInterface interface { - // Connect connects to the server at the given address. - Connect(ctx context.Context, address string) (Connection, error) -} - -// Connection represents a client connection -type Connection interface { - // Close closes connection. - Close() error - - // RaptorQ returns a new RaptorQ stream. - RaptorQ(config *Config, lc lumera.Client) RaptorQ - - // FIXME: - // Done returns a channel that's closed when connection is shutdown. - //Done() <-chan struct{} -} - -// RaptorQ contains methods for request services from RaptorQ service. -type RaptorQ interface { - // Encode Get map of symbols - Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error) - // Decode returns a path to restored file. - Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) - // EncodeMetaData Get encode info(include encode parameters + symbol id files) - EncodeMetaData(ctx context.Context, req EncodeMetadataRequest) (EncodeResponse, error) - // GenRQIdentifiersFiles generates the RQ identifier files - GenRQIdentifiersFiles(ctx context.Context, req GenRQIdentifiersFilesRequest) (GenRQIdentifiersFilesResponse, error) -} diff --git a/pkg/raptorq/raptorq.go b/pkg/raptorq/raptorq.go deleted file mode 100644 index 0eb38e1c..00000000 --- a/pkg/raptorq/raptorq.go +++ /dev/null @@ -1,140 +0,0 @@ -package raptorq - -import ( - "io/fs" - "io/ioutil" - "os" - "path/filepath" - - "github.com/LumeraProtocol/supernode/pkg/errors" - - json "github.com/json-iterator/go" - - "github.com/google/uuid" - - pb "github.com/LumeraProtocol/supernode/gen/raptorq" -) - -const ( - inputEncodeFileName = "input.data" - symbolIDFileSubDir = "meta" - symbolFileSubdir = "symbols" - concurrency = 1 -) - -type RawSymbolIDFile struct { - ID string `json:"id"` - BlockHash string `json:"block_hash"` - PastelID string `json:"pastel_id"` - SymbolIdentifiers []string `json:"symbol_identifiers"` -} - -type raptorQ struct { - conn *clientConn - client pb.RaptorQClient - config Config - semaphore chan struct{} // Semaphore to control concurrency -} - -func randID() string { - id := uuid.NewString() - return id[0:8] -} - -func writeFile(path string, data []byte) error { - return ioutil.WriteFile(path, data, 0750) -} - -func readFile(path string) ([]byte, error) { - return ioutil.ReadFile(path) -} - -func createTaskFolder(base string, subDirs ...string) (string, error) { - taskID := randID() - taskPath := filepath.Join(base, taskID) - taskPath = filepath.Join(taskPath, filepath.Join(subDirs...)) - - err := os.MkdirAll(taskPath, 0750) - - if err != nil { - return "", err - } - - return taskPath, nil -} - -func createInputEncodeFile(base string, data []byte) (taskPath string, inputFile string, err error) { - taskPath, err = createTaskFolder(base) - - if err != nil { - return "", "", errors.Errorf("create task folder: %w", err) - } - - inputFile = filepath.Join(taskPath, inputEncodeFileName) - err = writeFile(inputFile, data) - - if err != nil { - return "", "", errors.Errorf("write file: %w", err) - } - - return taskPath, inputFile, nil -} - -func createInputDecodeSymbols(base string, symbols map[string][]byte) (path string, err error) { - path, err = createTaskFolder(base, symbolFileSubdir) - - if err != nil { - return "", errors.Errorf("create task folder: %w", err) - } - - for id, data := range symbols { - symbolFile := filepath.Join(path, id) - err = writeFile(symbolFile, data) - - if err != nil { - return "", errors.Errorf("write symbol file: %w", err) - } - } - - return path, nil -} - -// scan symbol id files in "meta" folder, return map of file Ids & contents of file (as list of line) -func scanSymbolIDFiles(dirPath string) (map[string]RawSymbolIDFile, error) { - filesMap := make(map[string]RawSymbolIDFile) - - err := filepath.Walk(dirPath, func(path string, info fs.FileInfo, err error) error { - if err != nil { - return errors.Errorf("scan a path %s: %w", path, err) - } - - if info.IsDir() { - // TODO - compare it to root - return nil - } - - fileID := filepath.Base(path) - - configFile, err := os.Open(path) - if err != nil { - return errors.Errorf("opening file: %s - err: %w", path, err) - } - defer configFile.Close() - - file := RawSymbolIDFile{} - jsonParser := json.NewDecoder(configFile) - if err = jsonParser.Decode(&file); err != nil { - return errors.Errorf("parsing file: %s - err: %w", path, err) - } - - filesMap[fileID] = file - - return nil - }) - - if err != nil { - return nil, err - } - - return filesMap, nil -} diff --git a/pkg/raptorq/rq_mock.go b/pkg/raptorq/rq_mock.go deleted file mode 100644 index 25e4bd99..00000000 --- a/pkg/raptorq/rq_mock.go +++ /dev/null @@ -1,186 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: interfaces.go - -// Package raptorq is a generated GoMock package. -package raptorq - -import ( - context "context" - reflect "reflect" - - lumera "github.com/LumeraProtocol/supernode/pkg/lumera" - rqstore "github.com/LumeraProtocol/supernode/pkg/storage/rqstore" - gomock "github.com/golang/mock/gomock" -) - -// MockClientInterface is a mock of ClientInterface interface. -type MockClientInterface struct { - ctrl *gomock.Controller - recorder *MockClientInterfaceMockRecorder -} - -// MockClientInterfaceMockRecorder is the mock recorder for MockClientInterface. -type MockClientInterfaceMockRecorder struct { - mock *MockClientInterface -} - -// NewMockClientInterface creates a new mock instance. -func NewMockClientInterface(ctrl *gomock.Controller) *MockClientInterface { - mock := &MockClientInterface{ctrl: ctrl} - mock.recorder = &MockClientInterfaceMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockClientInterface) EXPECT() *MockClientInterfaceMockRecorder { - return m.recorder -} - -// Connect mocks base method. -func (m *MockClientInterface) Connect(ctx context.Context, address string) (Connection, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Connect", ctx, address) - ret0, _ := ret[0].(Connection) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Connect indicates an expected call of Connect. -func (mr *MockClientInterfaceMockRecorder) Connect(ctx, address interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connect", reflect.TypeOf((*MockClientInterface)(nil).Connect), ctx, address) -} - -// MockConnection is a mock of Connection interface. -type MockConnection struct { - ctrl *gomock.Controller - recorder *MockConnectionMockRecorder -} - -// MockConnectionMockRecorder is the mock recorder for MockConnection. -type MockConnectionMockRecorder struct { - mock *MockConnection -} - -// NewMockConnection creates a new mock instance. -func NewMockConnection(ctrl *gomock.Controller) *MockConnection { - mock := &MockConnection{ctrl: ctrl} - mock.recorder = &MockConnectionMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockConnection) EXPECT() *MockConnectionMockRecorder { - return m.recorder -} - -// Close mocks base method. -func (m *MockConnection) Close() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") - ret0, _ := ret[0].(error) - return ret0 -} - -// Close indicates an expected call of Close. -func (mr *MockConnectionMockRecorder) Close() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockConnection)(nil).Close)) -} - -// RaptorQ mocks base method. -func (m *MockConnection) RaptorQ(config *Config, lc lumera.Client, store rqstore.Store) RaptorQ { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RaptorQ", config, lc, store) - ret0, _ := ret[0].(RaptorQ) - return ret0 -} - -// RaptorQ indicates an expected call of RaptorQ. -func (mr *MockConnectionMockRecorder) RaptorQ(config, lc, store interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RaptorQ", reflect.TypeOf((*MockConnection)(nil).RaptorQ), config, lc, store) -} - -// MockRaptorQ is a mock of RaptorQ interface. -type MockRaptorQ struct { - ctrl *gomock.Controller - recorder *MockRaptorQMockRecorder -} - -// MockRaptorQMockRecorder is the mock recorder for MockRaptorQ. -type MockRaptorQMockRecorder struct { - mock *MockRaptorQ -} - -// NewMockRaptorQ creates a new mock instance. -func NewMockRaptorQ(ctrl *gomock.Controller) *MockRaptorQ { - mock := &MockRaptorQ{ctrl: ctrl} - mock.recorder = &MockRaptorQMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockRaptorQ) EXPECT() *MockRaptorQMockRecorder { - return m.recorder -} - -// Decode mocks base method. -func (m *MockRaptorQ) Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Decode", ctx, req) - ret0, _ := ret[0].(DecodeResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Decode indicates an expected call of Decode. -func (mr *MockRaptorQMockRecorder) Decode(ctx, req interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Decode", reflect.TypeOf((*MockRaptorQ)(nil).Decode), ctx, req) -} - -// Encode mocks base method. -func (m *MockRaptorQ) Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Encode", ctx, req) - ret0, _ := ret[0].(EncodeResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Encode indicates an expected call of Encode. -func (mr *MockRaptorQMockRecorder) Encode(ctx, req interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Encode", reflect.TypeOf((*MockRaptorQ)(nil).Encode), ctx, req) -} - -// EncodeMetaData mocks base method. -func (m *MockRaptorQ) EncodeMetaData(ctx context.Context, req EncodeMetadataRequest) (EncodeResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "EncodeMetaData", ctx, req) - ret0, _ := ret[0].(EncodeResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// EncodeMetaData indicates an expected call of EncodeMetaData. -func (mr *MockRaptorQMockRecorder) EncodeMetaData(ctx, req interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EncodeMetaData", reflect.TypeOf((*MockRaptorQ)(nil).EncodeMetaData), ctx, req) -} - -// GenRQIdentifiersFiles mocks base method. -func (m *MockRaptorQ) GenRQIdentifiersFiles(ctx context.Context, req GenRQIdentifiersFilesRequest) (GenRQIdentifiersFilesResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GenRQIdentifiersFiles", ctx, req) - ret0, _ := ret[0].(GenRQIdentifiersFilesResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GenRQIdentifiersFiles indicates an expected call of GenRQIdentifiersFiles. -func (mr *MockRaptorQMockRecorder) GenRQIdentifiersFiles(ctx, req interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenRQIdentifiersFiles", reflect.TypeOf((*MockRaptorQ)(nil).GenRQIdentifiersFiles), ctx, req) -} diff --git a/pkg/raptorq/rq_server_client.go b/pkg/raptorq/rq_server_client.go deleted file mode 100644 index ef6c5f64..00000000 --- a/pkg/raptorq/rq_server_client.go +++ /dev/null @@ -1,35 +0,0 @@ -package raptorq - -import ( - "time" - - rq "github.com/LumeraProtocol/supernode/gen/raptorq" - "github.com/LumeraProtocol/supernode/pkg/lumera" -) - -const ( - logPrefix = "grpc-raptorqClient" - defaultConnectTimeout = 120 * time.Second -) - -type raptorQServerClient struct { - config *Config - conn *clientConn - rqService rq.RaptorQClient - lumeraClient lumera.Client - semaphore chan struct{} // Semaphore to control concurrency -} - -func NewRaptorQServerClient(conn *clientConn, config *Config, lc lumera.Client) RaptorQ { - return &raptorQServerClient{ - conn: conn, - rqService: rq.NewRaptorQClient(conn), - lumeraClient: lc, - config: config, - semaphore: make(chan struct{}, concurrency), - } -} - -func (c *raptorQServerClient) Close() { - c.conn.Close() -} diff --git a/pkg/raptorq/valdate_rqids.go b/pkg/raptorq/valdate_rqids.go deleted file mode 100644 index d739b0e3..00000000 --- a/pkg/raptorq/valdate_rqids.go +++ /dev/null @@ -1,57 +0,0 @@ -package raptorq - -import ( - "context" - "strings" - - "github.com/LumeraProtocol/supernode/pkg/errors" - "github.com/LumeraProtocol/supernode/pkg/lumera" - "github.com/LumeraProtocol/supernode/pkg/utils" - json "github.com/json-iterator/go" -) - -// ValidateRQIDs validates the RaptorQ IDs by: -// 1. Parsing the signedData into encoded data and signature -// 2. Verifying the creator's signature against the encoded file -// 3. Confirming that symbol identifiers match between creator and SN versions -func ValidateRQIDs(ctx context.Context, lc lumera.Client, signedData string, - encRQIDsFileBySN []byte, symbolIdentifiers []string, - creatorAddress string) (err error) { - - // Split the signedData into encoded part and signature part - parts := strings.SplitN(signedData, ".", 2) // b64Encode(encodedData).Signature - if len(parts) != 2 { - return errors.New("invalid signed data format: missing signature") - } - - // Extract the parts - encodedRqids := parts[0] - creatorSignature := []byte(parts[1]) - - // Decode the base64 encoded rqids file received in the action request - rqidsFileBytesByCreator, err := utils.B64Decode([]byte(encodedRqids)) - if err != nil { - return errors.Errorf("failed to decode creator's RQID file: %w", err) - } - - // Verify signature against the encodedRQIDFile generated by the supernode - err = lc.Auth().Verify(ctx, creatorAddress, encRQIDsFileBySN, creatorSignature) - if err != nil { - return errors.Errorf("signature verification failed: %w", err) - } - - // Parse the creator's RQID file - var actionInputRqIDsRawFile RawSymbolIDFile - err = json.Unmarshal(rqidsFileBytesByCreator, &actionInputRqIDsRawFile) - if err != nil { - return errors.Errorf("failed to parse creator's RQID file: %w", err) - } - - // Verify that the symbol identifiers match between versions - if err := utils.EqualStrList(symbolIdentifiers, actionInputRqIDsRawFile.SymbolIdentifiers); err != nil { - return errors.Errorf("symbol identifiers don't match: %w", err) - } - - return nil - -} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 3024cbc5..d8fe2e4b 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -38,6 +38,41 @@ const ( highCompressionLevel = 4 ) +var ipEndpoints = []string{ + "https://api.ipify.org", + "https://ifconfig.co/ip", + "https://checkip.amazonaws.com", + "https://ipv4.icanhazip.com", +} + +// GetExternalIPAddress returns the first valid public IP obtained +// from a list of providers, or an error if none work. +func GetExternalIPAddress() (string, error) { + client := &http.Client{Timeout: 4 * time.Second} + + for _, url := range ipEndpoints { + req, _ := http.NewRequest(http.MethodGet, url, nil) + + resp, err := client.Do(req) + if err != nil { + continue // provider down? try next + } + + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + continue + } + + ip := strings.TrimSpace(string(body)) + if net.ParseIP(ip) != nil { + return ip, nil + } + } + + return "", errors.New("unable to determine external IP address from any provider") +} + var sem = semaphore.NewWeighted(maxParallelHighCompressCalls) // DiskStatus cotains info of disk storage @@ -117,28 +152,6 @@ func IsContextErr(err error) bool { return false } -// GetExternalIPAddress returns external IP address -func GetExternalIPAddress() (externalIP string, err error) { - - resp, err := http.Get("http://ipinfo.io/ip") - if err != nil { - return "", err - } - - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return "", err - } - - if net.ParseIP(string(body)) == nil { - return "", errors.Errorf("invalid IP response from %s", "ipconf.ip") - } - - return string(body), nil -} - // B64Encode base64 encodes func B64Encode(in []byte) (out []byte) { out = make([]byte, base64.StdEncoding.EncodedLen(len(in))) @@ -422,9 +435,10 @@ func HighCompress(cctx context.Context, data []byte) ([]byte, error) { // LoadSymbols takes a directory path and a map where keys are filenames. It reads each file in the directory // corresponding to the keys in the map and updates the map with the content of the files as byte slices. -func LoadSymbols(dir string, keys map[string][]byte) (map[string][]byte, error) { +func LoadSymbols(dir string, keys []string) (ret [][]byte, err error) { // Iterate over the map keys which are filenames - for filename := range keys { + ret = make([][]byte, 0, len(keys)) + for _, filename := range keys { // Construct the full path to the file fullPath := filepath.Join(dir, filename) @@ -435,17 +449,17 @@ func LoadSymbols(dir string, keys map[string][]byte) (map[string][]byte, error) } // Update the map with the file data - keys[filename] = data + ret = append(ret, data) } // Return the updated map - return keys, nil + return ret, nil } // DeleteSymbols takes a directory path and a map where keys are filenames. It deletes each file in the directory -func DeleteSymbols(ctx context.Context, dir string, keys map[string][]byte) error { +func DeleteSymbols(ctx context.Context, dir string, keys []string) error { // Iterate over the map keys which are filenames - for filename := range keys { + for _, filename := range keys { // Construct the full path to the file fullPath := filepath.Join(dir, filename) @@ -458,30 +472,39 @@ func DeleteSymbols(ctx context.Context, dir string, keys map[string][]byte) erro return nil } -// ReadDirFilenames reads all the filenames in a directory and returns them in a map with the filename as the key +// ReadDirFilenames returns a map whose keys are "block_*/file" paths, values nil. func ReadDirFilenames(dirPath string) (map[string][]byte, error) { - idMap := make(map[string][]byte) // Map to store file names + idMap := make(map[string][]byte) - err := filepath.Walk(dirPath, func(path string, info os.FileInfo, err error) error { - if err != nil { - return errors.Errorf("scan a path %s: %w", path, err) - } + entries, err := os.ReadDir(dirPath) + if err != nil { + return nil, fmt.Errorf("read base dir: %w", err) + } - if info.IsDir() { - return nil // Skip directories + for _, ent := range entries { + if !ent.IsDir() { + // file at root (rare) – keep backward-compat + if ent.Name() == "_raptorq_layout.json" { + continue + } + idMap[ent.Name()] = nil + continue } - fileID := filepath.Base(path) - idMap[fileID] = nil // Store the file name with a nil value in the map - - return nil - }) - - if err != nil { - return nil, err + blockDir := filepath.Join(dirPath, ent.Name()) + files, err := os.ReadDir(blockDir) + if err != nil { + return nil, fmt.Errorf("read %s: %w", blockDir, err) + } + for _, f := range files { + if f.IsDir() || f.Name() == "_raptorq_layout.json" { + continue + } + rel := filepath.Join(ent.Name(), f.Name()) // "block_0/abc.sym" + idMap[rel] = nil + } } - // Here you might want to do something with idMap or just return it return idMap, nil } diff --git a/supernode/cmd/start.go b/supernode/cmd/start.go index ea221936..87c9eb74 100644 --- a/supernode/cmd/start.go +++ b/supernode/cmd/start.go @@ -11,10 +11,10 @@ import ( "github.com/LumeraProtocol/supernode/p2p" "github.com/LumeraProtocol/supernode/p2p/kademlia/store/cloud.go" "github.com/LumeraProtocol/supernode/p2p/kademlia/store/sqlite" + "github.com/LumeraProtocol/supernode/pkg/codec" "github.com/LumeraProtocol/supernode/pkg/keyring" "github.com/LumeraProtocol/supernode/pkg/logtrace" "github.com/LumeraProtocol/supernode/pkg/lumera" - "github.com/LumeraProtocol/supernode/pkg/raptorq" "github.com/LumeraProtocol/supernode/pkg/storage/rqstore" "github.com/LumeraProtocol/supernode/supernode/config" "github.com/LumeraProtocol/supernode/supernode/node/action/server/cascade" @@ -86,28 +86,17 @@ The supernode will connect to the Lumera network and begin participating in the return err } - // Initialize RaptorQ client connection - raptorQClientConnection, err := raptorq.NewClient().Connect(ctx, appConfig.RaptorQConfig.ServiceAddress) - if err != nil { - logtrace.Error(ctx, "Failed to initialize raptor q client connection interface", logtrace.Fields{ - "error": err.Error(), - }) - return err - } - // Configure cascade service cService := cascadeService.NewCascadeService( &cascadeService.Config{ Config: common.Config{ SupernodeAccountAddress: appConfig.SupernodeConfig.KeyName, }, - RaptorQServiceAddress: appConfig.RaptorQConfig.ServiceAddress, - RqFilesDir: appConfig.GetRaptorQFilesDir(), + RqFilesDir: appConfig.GetRaptorQFilesDir(), }, lumeraClient, *p2pService, - raptorQClientConnection.RaptorQ(raptorq.NewConfig(), lumeraClient), - raptorq.NewClient(), + codec.NewRaptorQCodec(appConfig.GetRaptorQFilesDir()), rqStore, ) diff --git a/supernode/node/action/server/cascade/cascade_action_server.go b/supernode/node/action/server/cascade/cascade_action_server.go index 3fdedd08..397866f4 100644 --- a/supernode/node/action/server/cascade/cascade_action_server.go +++ b/supernode/node/action/server/cascade/cascade_action_server.go @@ -80,12 +80,8 @@ func (server *CascadeActionServer) UploadInputData(stream pb.CascadeService_Uplo // Process the complete data task := server.service.NewCascadeRegistrationTask() res, err := task.UploadInputData(ctx, &cascadeService.UploadInputDataRequest{ - Filename: metadata.Filename, - ActionID: metadata.ActionId, - DataHash: metadata.DataHash, - RqMax: metadata.RqMax, - SignedData: metadata.SignedData, - Data: allData, + ActionID: metadata.ActionId, + Data: allData, }) if err != nil { diff --git a/supernode/services/cascade/metadata.go b/supernode/services/cascade/metadata.go new file mode 100644 index 00000000..1ddb74a9 --- /dev/null +++ b/supernode/services/cascade/metadata.go @@ -0,0 +1,96 @@ +package cascade + +import ( + "context" + + "bytes" + + "strconv" + + "github.com/LumeraProtocol/supernode/pkg/codec" + "github.com/LumeraProtocol/supernode/pkg/errors" + "github.com/LumeraProtocol/supernode/pkg/utils" + "github.com/cosmos/btcutil/base58" + json "github.com/json-iterator/go" +) + +const ( + SeparatorByte byte = 46 // separator in dd_and_fingerprints.signature i.e. '.' +) + +type GenRQIdentifiersFilesRequest struct { + Metadata codec.Layout + RqMax uint32 + CreatorSNAddress string + Signature string + IC uint32 +} + +type GenRQIdentifiersFilesResponse struct { + // IDs of the Redundant Metadata Files -- len(RQIDs) == len(RedundantMetadataFiles) + RQIDs []string + // RedundantMetadataFiles is a list of redundant files that are generated from the Metadata file + RedundantMetadataFiles [][]byte + // Compressed[B64(JSON(layout)).Signature] + B64EncodedMetadataFileWithSignatureCompressed []byte +} + +// GenRQIdentifiersFiles generates Redundant Metadata Files and IDs +func GenRQIdentifiersFiles(ctx context.Context, req GenRQIdentifiersFilesRequest) (resp GenRQIdentifiersFilesResponse, err error) { + metadataFile, err := json.Marshal(req.Metadata) + if err != nil { + return resp, errors.Errorf("marshal rqID file: %w", err) + } + b64EncodedMetadataFile := utils.B64Encode(metadataFile) + + // Create the RQID file by combining the encoded file with the signature + var buffer bytes.Buffer + buffer.Write(b64EncodedMetadataFile) + buffer.WriteByte(SeparatorByte) + buffer.Write([]byte(req.Signature)) + encMetadataFileWithSignature := buffer.Bytes() + + // Generate the specified number of variant IDs + _, RQIDsFiles, err := GetIDFiles(ctx, encMetadataFileWithSignature, req.IC, req.RqMax) + if err != nil { + return resp, errors.Errorf("get ID Files: %w", err) + } + + return GenRQIdentifiersFilesResponse{ + RedundantMetadataFiles: RQIDsFiles, + }, nil +} + +// GetIDFiles generates Redundant Files for dd_and_fingerprints files and rq_id files +// encMetadataFileWithSignature is b64 encoded layout file appended with signatures and compressed, ic is the initial counter +// and max is the number of ids to generate +func GetIDFiles(ctx context.Context, encMetadataFileWithSignature []byte, ic uint32, max uint32) (ids []string, files [][]byte, err error) { + idFiles := make([][]byte, 0, max) + ids = make([]string, 0, max) + var buffer bytes.Buffer + + for i := uint32(0); i < max; i++ { + buffer.Reset() + counter := ic + i + + buffer.Write(encMetadataFileWithSignature) + buffer.WriteByte(SeparatorByte) + buffer.WriteString(strconv.Itoa(int(counter))) // Using the string representation to maintain backward compatibility + + compressedData, err := utils.HighCompress(ctx, buffer.Bytes()) + if err != nil { + return ids, idFiles, errors.Errorf("compress identifiers file: %w", err) + } + + idFiles = append(idFiles, compressedData) + + hash, err := utils.Sha3256hash(compressedData) + if err != nil { + return ids, idFiles, errors.Errorf("sha3-256-hash error getting an id file: %w", err) + } + + ids = append(ids, base58.Encode(hash)) + } + + return ids, idFiles, nil +} diff --git a/supernode/services/cascade/register.go b/supernode/services/cascade/register.go new file mode 100644 index 00000000..35bd3f0a --- /dev/null +++ b/supernode/services/cascade/register.go @@ -0,0 +1,257 @@ +package cascade + +import ( + "context" + "encoding/hex" + "strings" + + "github.com/LumeraProtocol/supernode/pkg/codec" + "github.com/LumeraProtocol/supernode/pkg/errors" + "github.com/LumeraProtocol/supernode/pkg/log" + "github.com/LumeraProtocol/supernode/pkg/logtrace" + "github.com/LumeraProtocol/supernode/pkg/lumera/modules/supernode" + "github.com/LumeraProtocol/supernode/pkg/utils" + "github.com/LumeraProtocol/supernode/supernode/services/common" + json "github.com/json-iterator/go" + + actiontypes "github.com/LumeraProtocol/supernode/gen/lumera/action/types" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// UploadInputDataRequest contains parameters for upload request +type UploadInputDataRequest struct { + ActionID string + Data []byte +} + +// UploadInputDataResponse contains the result of upload +type UploadInputDataResponse struct { + Success bool + Message string +} + +// UploadInputData processes the upload request for cascade input data +func (task *CascadeRegistrationTask) UploadInputData(ctx context.Context, req *UploadInputDataRequest) (*UploadInputDataResponse, error) { + fields := logtrace.Fields{ + logtrace.FieldMethod: "UploadInputData", + logtrace.FieldRequest: req, + } + + // Get action details from Lumera + actionRes, err := task.lumeraClient.Action().GetAction(ctx, req.ActionID) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to get action", fields) + return nil, status.Errorf(codes.Internal, "failed to get action") + } + if actionRes.GetAction().ActionID == "" { + logtrace.Error(ctx, "action not found", fields) + return nil, status.Errorf(codes.Internal, "action not found") + } + actionDetails := actionRes.GetAction() + logtrace.Info(ctx, "action has been retrieved", fields) + + topSNsRes, err := task.lumeraClient.SuperNode().GetTopSuperNodesForBlock(ctx, uint64(actionDetails.BlockHeight)) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to get top SNs", fields) + return nil, status.Errorf(codes.Internal, "failed to get top SNs") + } + logtrace.Info(ctx, "top sns have been fetched", fields) + + // Verify current supernode is in the top list + if !supernode.Exists(topSNsRes.Supernodes, task.config.SupernodeAccountAddress) { + logtrace.Error(ctx, "current supernode do not exist in the top sns list", fields) + return nil, status.Errorf(codes.Internal, "current supernode does not exist in the top sns list") + } + logtrace.Info(ctx, "current supernode exists in the top sns list", fields) + + // Parse the action metadata to CascadeMetadata + var cascadeMetadata actiontypes.CascadeMetadata + if err := proto.Unmarshal(actionDetails.Metadata, &cascadeMetadata); err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to unmarshal cascade metadata", fields) + return nil, status.Errorf(codes.Internal, "failed to unmarshal cascade metadata") + } + + // Verify data hash matches action metadata + dataHash, _ := utils.Sha3256hash(req.Data) + hash := utils.B64Encode(dataHash) + if hex.EncodeToString(hash) != cascadeMetadata.DataHash { + logtrace.Error(ctx, "data hash doesn't match", fields) + return nil, status.Errorf(codes.Internal, "data hash doesn't match") + } + logtrace.Info(ctx, "request data-hash has been matched with the action data-hash", fields) + + encodeRequest := codec.EncodeRequest{ + Data: req.Data, + TaskID: task.ID(), + } + + resp, err := task.codec.Encode(ctx, encodeRequest) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to encode data", fields) + return nil, status.Errorf(codes.Internal, "failed to encode data") + } + + encodedMetadata, signature, err := extractSignatureAndFirstPart(cascadeMetadata.Signatures) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to extract signature and first part", fields) + return nil, status.Errorf(codes.Internal, "failed to extract signature and first part") + } + + // Verify signature against the encodedRQIDFile generated by the supernode + err = task.lumeraClient.Auth().Verify(ctx, actionDetails.GetCreator(), []byte(encodedMetadata), []byte(signature)) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to verify signature", fields) + return nil, status.Errorf(codes.Internal, "failed to verify signature") + } + + // Decode the metadata file + layout, err := decodeMetadataFile(encodedMetadata) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to decode metadata file", fields) + return nil, status.Errorf(codes.Internal, "failed to decode metadata file") + } + + // Generate RaptorQ identifiers + res, err := GenRQIdentifiersFiles(ctx, GenRQIdentifiersFilesRequest{ + Metadata: resp.Metadata, + CreatorSNAddress: actionDetails.GetCreator(), + RqMax: uint32(cascadeMetadata.RqIdsMax), + Signature: signature, + IC: uint32(cascadeMetadata.RqIdsIc), + }) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to generate RQID Files", fields) + return nil, status.Errorf(codes.Internal, "failed to generate RQID Files") + } + logtrace.Info(ctx, "rq symbols, rq-ids and rqid-files have been generated", fields) + + // Verify that the symbol identifiers match between versions + if err := verifyIDs(ctx, layout, resp.Metadata); err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to verify IDs", fields) + return nil, status.Errorf(codes.Internal, "failed to verify IDs") + } + // About to store ID files + logtrace.Info(ctx, "About to store ID files", logtrace.Fields{ + "taskID": task.ID(), + "fileCount": 0, + }) + + // Store ID files to P2P + if err = task.storeIDFiles(ctx, res.RedundantMetadataFiles); err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to store ID files", fields) + return nil, status.Errorf(codes.Internal, "failed to store ID files") + } + logtrace.Info(ctx, "id files have been stored", fields) + + // Store RaptorQ symbols + if err = task.storeRaptorQSymbols(ctx, resp.SymbolsDir); err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "error storing raptor-q symbols", fields) + return nil, status.Errorf(codes.Internal, "error storing raptor-q symbols") + } + logtrace.Info(ctx, "raptor-q symbols have been stored", fields) + + return &UploadInputDataResponse{ + Success: true, + Message: "successfully uploaded input data", + }, nil +} + +// extractSignatureAndFirstPart extracts the signature and first part from the encoded data +// data is expected to be in format: b64(JSON(Layout)).Signature +func extractSignatureAndFirstPart(data string) (encodedMetadata string, signature string, err error) { + parts := strings.Split(data, ".") + if len(parts) < 2 { + return "", "", errors.New("invalid data format") + } + + // The first part is the base64 encoded data + return parts[0], parts[1], nil +} + +func decodeMetadataFile(data string) (layout codec.Layout, err error) { + // Decode the base64 encoded data + decodedData, err := utils.B64Decode([]byte(data)) + if err != nil { + return layout, errors.Errorf("failed to decode data: %w", err) + } + + // Unmarshal the decoded data into a layout + if err := json.Unmarshal(decodedData, &layout); err != nil { + return layout, errors.Errorf("failed to unmarshal data: %w", err) + } + + return layout, nil +} + +func verifyIDs(ctx context.Context, ticketMetadata, metadata codec.Layout) error { + // Verify that the symbol identifiers match between versions + if err := utils.EqualStrList(ticketMetadata.Blocks[0].Symbols, metadata.Blocks[0].Symbols); err != nil { + return errors.Errorf("symbol identifiers don't match: %w", err) + } + + // Verify that the block hashes match + if ticketMetadata.Blocks[0].Hash != metadata.Blocks[0].Hash { + return errors.New("block hashes don't match") + } + + return nil +} + +// storeIDFiles stores ID files to P2P +func (task *CascadeRegistrationTask) storeIDFiles(ctx context.Context, metadataFiles [][]byte) error { + ctx = context.WithValue(ctx, log.TaskIDKey, task.ID()) + task.storage.TaskID = task.ID() + + // Log basic info before storing + logtrace.Info(ctx, "Storing ID files", logtrace.Fields{ + "taskID": task.ID(), + }) + + // Check if files exist + if len(metadataFiles) == 0 { + logtrace.Error(ctx, "No ID files to store", nil) + return errors.New("no ID files to store") + } + + // Store files with better error handling + if err := task.storage.StoreBatch(ctx, metadataFiles, common.P2PDataCascadeMetadata); err != nil { + logtrace.Error(ctx, "Store operation failed", logtrace.Fields{ + "error": err.Error(), + "fileCount": len(metadataFiles), + }) + return errors.Errorf("store ID files into kademlia: %w", err) + } + + logtrace.Info(ctx, "ID files stored successfully", nil) + return nil +} + +// storeRaptorQSymbols stores RaptorQ symbols to P2P +func (task *CascadeRegistrationTask) storeRaptorQSymbols(ctx context.Context, symbolsDir string) error { + // Add improved logging + logtrace.Info(ctx, "Storing RaptorQ symbols", logtrace.Fields{ + "taskID": task.ID(), + }) + + err := task.storage.StoreRaptorQSymbolsIntoP2P(ctx, task.ID(), symbolsDir) + if err != nil { + logtrace.Error(ctx, "Failed to store RaptorQ symbols", logtrace.Fields{ + "taskID": task.ID(), + "error": err.Error(), + }) + } + return err +} diff --git a/supernode/services/cascade/service.go b/supernode/services/cascade/service.go index 74737ea8..e6cdf147 100644 --- a/supernode/services/cascade/service.go +++ b/supernode/services/cascade/service.go @@ -4,8 +4,9 @@ import ( "context" "github.com/LumeraProtocol/supernode/p2p" + "github.com/LumeraProtocol/supernode/pkg/codec" "github.com/LumeraProtocol/supernode/pkg/lumera" - "github.com/LumeraProtocol/supernode/pkg/raptorq" + "github.com/LumeraProtocol/supernode/pkg/storage/rqstore" "github.com/LumeraProtocol/supernode/supernode/services/common" ) @@ -14,10 +15,9 @@ type CascadeService struct { *common.SuperNodeService config *Config - lumeraClient lumera.Client - raptorQClient raptorq.ClientInterface - rqstore rqstore.Store - raptorQ raptorq.RaptorQ + lumeraClient lumera.Client + rqstore rqstore.Store + codec codec.Codec } // NewCascadeRegistrationTask creates a new task for cascade registration @@ -33,20 +33,12 @@ func (service *CascadeService) Run(ctx context.Context) error { } // NewCascadeService returns a new CascadeService instance -func NewCascadeService( - config *Config, - lumera lumera.Client, - p2pClient p2p.Client, - rqC raptorq.RaptorQ, - rqClient raptorq.ClientInterface, - rqstore rqstore.Store, -) *CascadeService { +func NewCascadeService(config *Config, lumera lumera.Client, p2pClient p2p.Client, codec codec.Codec, rqstore rqstore.Store) *CascadeService { return &CascadeService{ config: config, SuperNodeService: common.NewSuperNodeService(p2pClient), lumeraClient: lumera, - raptorQ: rqC, - raptorQClient: rqClient, + codec: codec, rqstore: rqstore, } } diff --git a/supernode/services/cascade/task.go b/supernode/services/cascade/task.go index 8246f908..fd15ccc9 100644 --- a/supernode/services/cascade/task.go +++ b/supernode/services/cascade/task.go @@ -3,25 +3,12 @@ package cascade import ( "context" - "github.com/LumeraProtocol/supernode/pkg/raptorq" "github.com/LumeraProtocol/supernode/pkg/storage/files" "github.com/LumeraProtocol/supernode/supernode/services/common" ) -// RQInfo holds RaptorQ-related information -type RQInfo struct { - rqIDsIC uint32 - rqIDs []string - rqIDEncodeParams raptorq.EncoderParameters - - rqIDsFile []byte - rawRqFile []byte - rqIDFiles [][]byte -} - // CascadeRegistrationTask is the task for cascade registration type CascadeRegistrationTask struct { - RQInfo *CascadeService *common.SuperNodeTask @@ -51,8 +38,7 @@ func NewCascadeRegistrationTask(service *CascadeService) *CascadeRegistrationTas task := &CascadeRegistrationTask{ SuperNodeTask: common.NewSuperNodeTask(logPrefix), CascadeService: service, - storage: common.NewStorageHandler(service.P2PClient, service.raptorQClient, - service.config.RaptorQServiceAddress, service.config.RqFilesDir, service.rqstore), + storage: common.NewStorageHandler(service.P2PClient, service.config.RqFilesDir, service.rqstore), } return task diff --git a/supernode/services/cascade/upload.go b/supernode/services/cascade/upload.go deleted file mode 100644 index 9c1d4fb4..00000000 --- a/supernode/services/cascade/upload.go +++ /dev/null @@ -1,195 +0,0 @@ -package cascade - -import ( - "context" - - "github.com/LumeraProtocol/supernode/pkg/errors" - "github.com/LumeraProtocol/supernode/pkg/log" - "github.com/LumeraProtocol/supernode/pkg/logtrace" - "github.com/LumeraProtocol/supernode/pkg/raptorq" - "github.com/LumeraProtocol/supernode/supernode/services/common" - - actiontypes "github.com/LumeraProtocol/supernode/gen/lumera/action/types" - "github.com/golang/protobuf/proto" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// UploadInputDataRequest contains parameters for upload request -type UploadInputDataRequest struct { - ActionID string - Filename string - DataHash string - RqMax int32 - SignedData string - Data []byte -} - -// UploadInputDataResponse contains the result of upload -type UploadInputDataResponse struct { - Success bool - Message string -} - -// UploadInputData processes the upload request for cascade input data -func (task *CascadeRegistrationTask) UploadInputData(ctx context.Context, req *UploadInputDataRequest) (*UploadInputDataResponse, error) { - fields := logtrace.Fields{ - logtrace.FieldMethod: "UploadInputData", - logtrace.FieldRequest: req, - } - - // Get action details from Lumera - actionRes, err := task.lumeraClient.Action().GetAction(ctx, req.ActionID) - if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to get action", fields) - return nil, status.Errorf(codes.Internal, "failed to get action") - } - if actionRes.GetAction().ActionID == "" { - logtrace.Error(ctx, "action not found", fields) - return nil, status.Errorf(codes.Internal, "action not found") - } - actionDetails := actionRes.GetAction() - logtrace.Info(ctx, "action has been retrieved", fields) - - // Get latest block information - // latestBlock, err := task.lumeraClient.Node().GetLatestBlock(ctx) - // if err != nil { - // fields[logtrace.FieldError] = err.Error() - // logtrace.Error(ctx, "failed to get latest block", fields) - // return nil, status.Errorf(codes.Internal, "failed to get latest block") - // } - // latestBlockHeight := uint64(latestBlock.GetSdkBlock().GetHeader().Height) - // latestBlockHash := latestBlock.GetBlockId().GetHash() - // fields[logtrace.FieldBlockHeight] = latestBlockHeight - // logtrace.Info(ctx, "latest block has been retrieved", fields) - - // Get top supernodes - // topSNsRes, err := task.lumeraClient.SuperNode().GetTopSuperNodesForBlock(ctx, uint64(actionDetails.BlockHeight)) - // if err != nil { - // fields[logtrace.FieldError] = err.Error() - // logtrace.Error(ctx, "failed to get top SNs", fields) - // return nil, status.Errorf(codes.Internal, "failed to get top SNs") - // } - // logtrace.Info(ctx, "top sns have been fetched", fields) - - // // Verify current supernode is in the top list - // if !supernode.Exists(topSNsRes.Supernodes, task.config.SupernodeAccountAddress) { - // logtrace.Error(ctx, "current supernode do not exist in the top sns list", fields) - // return nil, status.Errorf(codes.Internal, "current supernode does not exist in the top sns list") - // } - // logtrace.Info(ctx, "current supernode exists in the top sns list", fields) - - // Parse the action metadata to CascadeMetadata - var cascadeMetadata actiontypes.CascadeMetadata - if err := proto.Unmarshal(actionDetails.Metadata, &cascadeMetadata); err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to unmarshal cascade metadata", fields) - return nil, status.Errorf(codes.Internal, "failed to unmarshal cascade metadata") - } - - // Verify data hash matches action metadata - if req.DataHash != cascadeMetadata.DataHash { - logtrace.Error(ctx, "data hash doesn't match", fields) - return nil, status.Errorf(codes.Internal, "data hash doesn't match") - } - logtrace.Info(ctx, "request data-hash has been matched with the action data-hash", fields) - - // Generate RaptorQ identifiers - res, err := task.raptorQ.GenRQIdentifiersFiles(ctx, raptorq.GenRQIdentifiersFilesRequest{ - Data: req.Data, - CreatorSNAddress: actionDetails.GetCreator(), - RqMax: uint32(cascadeMetadata.RqIdsMax), - SignedData: req.SignedData, - LC: task.lumeraClient, - }) - if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to generate RQID Files", fields) - return nil, status.Errorf(codes.Internal, "failed to generate RQID Files") - } - logtrace.Info(ctx, "rq symbols, rq-ids and rqid-files have been generated", fields) - - // Store RaptorQ information - task.RQInfo.rqIDsIC = res.RQIDsIc - task.RQInfo.rqIDs = res.RQIDs - task.RQInfo.rqIDFiles = res.RQIDsFiles - task.RQInfo.rqIDsFile = res.RQIDsFile - task.RQInfo.rqIDEncodeParams = res.RQEncodeParams - task.creatorSignature = res.CreatorSignature - - // About to store ID files - logtrace.Info(ctx, "About to store ID files", logtrace.Fields{ - "taskID": task.ID(), - "fileCount": len(task.RQInfo.rqIDFiles), - }) - - // Store ID files to P2P - if err = task.storeIDFiles(ctx); err != nil { - fields[logtrace.FieldError] = err.Error() - fields["fileCount"] = len(task.RQInfo.rqIDFiles) - logtrace.Error(ctx, "failed to store ID files", fields) - return nil, status.Errorf(codes.Internal, "failed to store ID files") - } - logtrace.Info(ctx, "id files have been stored", fields) - - // Store RaptorQ symbols - if err = task.storeRaptorQSymbols(ctx); err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "error storing raptor-q symbols", fields) - return nil, status.Errorf(codes.Internal, "error storing raptor-q symbols") - } - logtrace.Info(ctx, "raptor-q symbols have been stored", fields) - - return &UploadInputDataResponse{ - Success: true, - Message: "successfully uploaded input data", - }, nil -} - -// storeIDFiles stores ID files to P2P -func (task *CascadeRegistrationTask) storeIDFiles(ctx context.Context) error { - ctx = context.WithValue(ctx, log.TaskIDKey, task.ID()) - task.storage.TaskID = task.ID() - - // Log basic info before storing - logtrace.Info(ctx, "Storing ID files", logtrace.Fields{ - "taskID": task.ID(), - "fileCount": len(task.RQInfo.rqIDFiles), - }) - - // Check if files exist - if len(task.RQInfo.rqIDFiles) == 0 { - logtrace.Error(ctx, "No ID files to store", nil) - return errors.New("no ID files to store") - } - - // Store files with better error handling - if err := task.storage.StoreBatch(ctx, task.RQInfo.rqIDFiles, common.P2PDataCascadeMetadata); err != nil { - logtrace.Error(ctx, "Store operation failed", logtrace.Fields{ - "error": err.Error(), - "fileCount": len(task.RQInfo.rqIDFiles), - }) - return errors.Errorf("store ID files into kademlia: %w", err) - } - - logtrace.Info(ctx, "ID files stored successfully", nil) - return nil -} - -// storeRaptorQSymbols stores RaptorQ symbols to P2P -func (task *CascadeRegistrationTask) storeRaptorQSymbols(ctx context.Context) error { - // Add improved logging - logtrace.Info(ctx, "Storing RaptorQ symbols", logtrace.Fields{ - "taskID": task.ID(), - }) - - err := task.storage.StoreRaptorQSymbolsIntoP2P(ctx, task.ID()) - if err != nil { - logtrace.Error(ctx, "Failed to store RaptorQ symbols", logtrace.Fields{ - "taskID": task.ID(), - "error": err.Error(), - }) - } - return err -} diff --git a/supernode/services/common/reg_task_helper.go b/supernode/services/common/reg_task_helper.go deleted file mode 100644 index d6722ee6..00000000 --- a/supernode/services/common/reg_task_helper.go +++ /dev/null @@ -1,140 +0,0 @@ -package common - -import ( - "bytes" - "context" - "sync" - - "github.com/LumeraProtocol/supernode/pkg/errors" - "github.com/LumeraProtocol/supernode/pkg/log" - "github.com/LumeraProtocol/supernode/pkg/lumera" - "github.com/LumeraProtocol/supernode/pkg/raptorq" - "github.com/LumeraProtocol/supernode/pkg/utils" -) - -const ( - SeparatorByte = 46 -) - -// RegTaskHelper common operations related to (any) Ticket registration -type RegTaskHelper struct { - *SuperNodeTask - - NetworkHandler *NetworkHandler - LumeraHandler *lumera.Client - - peersTicketSignatureMtx *sync.Mutex - PeersTicketSignature map[string][]byte - AllSignaturesReceivedChn chan struct{} -} - -// NewRegTaskHelper creates instance of RegTaskHelper -func NewRegTaskHelper(task *SuperNodeTask, - lumeraClient lumera.Client, - NetworkHandler *NetworkHandler, -) *RegTaskHelper { - return &RegTaskHelper{ - SuperNodeTask: task, - LumeraHandler: &lumeraClient, - NetworkHandler: NetworkHandler, - peersTicketSignatureMtx: &sync.Mutex{}, - PeersTicketSignature: make(map[string][]byte), - AllSignaturesReceivedChn: make(chan struct{}), - } -} - -// AddPeerTicketSignature waits for ticket signatures from other SNs and adds them into internal array -func (h *RegTaskHelper) AddPeerTicketSignature(nodeID string, signature []byte, reqStatus Status) error { - h.peersTicketSignatureMtx.Lock() - defer h.peersTicketSignatureMtx.Unlock() - - if err := h.RequiredStatus(reqStatus); err != nil { - return err - } - - var err error - - <-h.NewAction(func(ctx context.Context) error { - log.WithContext(ctx).Debugf("receive NFT ticket signature from node %s", nodeID) - if node := h.NetworkHandler.Accepted.ByID(nodeID); node == nil { - log.WithContext(ctx).WithField("node", nodeID).Errorf("node is not in Accepted list") - err = errors.Errorf("node %s not in Accepted list", nodeID) - return nil - } - - h.PeersTicketSignature[nodeID] = signature - if len(h.PeersTicketSignature) == len(h.NetworkHandler.Accepted) { - log.WithContext(ctx).Debug("all signature received") - go func() { - close(h.AllSignaturesReceivedChn) - }() - } - return nil - }) - return err -} - -// ValidateIDFiles validates received (IDs) file and its (50) IDs: -// 1. checks signatures -// 2. generates list of 50 IDs and compares them to received -func (h *RegTaskHelper) ValidateIDFiles(ctx context.Context, - data []byte, ic uint32, max uint32, ids []string, numSignRequired int, - snAccAddresses []string, - lumeraClient lumera.Client, - creatorSignaure []byte, -) ([]byte, [][]byte, error) { - - dec, err := utils.B64Decode(data) - if err != nil { - return nil, nil, errors.Errorf("decode data: %w", err) - } - - decData, err := utils.Decompress(dec) - if err != nil { - return nil, nil, errors.Errorf("decompress: %w", err) - } - - splits := bytes.Split(decData, []byte{SeparatorByte}) - if len(splits) != numSignRequired+1 { - return nil, nil, errors.New("invalid data") - } - - file, err := utils.B64Decode(splits[0]) - if err != nil { - return nil, nil, errors.Errorf("decode file: %w", err) - } - - verifications := 0 - verifiedNodes := make(map[int]bool) - for i := 1; i < numSignRequired+1; i++ { - for j := 0; j < len(snAccAddresses); j++ { - if _, ok := verifiedNodes[j]; ok { - continue - } - - err := lumeraClient.Auth().Verify(ctx, snAccAddresses[j], file, creatorSignaure) // TODO : verify the signature - if err != nil { - return nil, nil, errors.Errorf("verify file signature %w", err) - } - - verifiedNodes[j] = true - verifications++ - break - } - } - - if verifications != numSignRequired { - return nil, nil, errors.Errorf("file verification failed: need %d verifications, got %d", numSignRequired, verifications) - } - - gotIDs, idFiles, err := raptorq.GetIDFiles(ctx, decData, ic, max) - if err != nil { - return nil, nil, errors.Errorf("get ids: %w", err) - } - - if err := utils.EqualStrList(gotIDs, ids); err != nil { - return nil, nil, errors.Errorf("IDs don't match: %w", err) - } - - return file, idFiles, nil -} diff --git a/supernode/services/common/service.go b/supernode/services/common/service.go index 5366cb96..bb90d9a6 100644 --- a/supernode/services/common/service.go +++ b/supernode/services/common/service.go @@ -21,8 +21,6 @@ type SuperNodeServiceInterface interface { // SuperNodeService common "class" for Services type SuperNodeService struct { *task.Worker - // *files.Storage - P2PClient p2p.Client } diff --git a/supernode/services/common/status.go b/supernode/services/common/status.go index 53af3527..22b63b7a 100644 --- a/supernode/services/common/status.go +++ b/supernode/services/common/status.go @@ -3,7 +3,6 @@ package common // List of task statuses. const ( StatusTaskStarted Status = iota - // Mode StatusPrimaryMode StatusSecondaryMode @@ -11,77 +10,15 @@ const ( // Process StatusConnected - StatusImageProbed - StatusAssetUploaded - StatusImageAndThumbnailCoordinateUploaded - StatusRegistrationFeeCalculated - StatusFileDecoded - - // Error - StatusErrorInvalidBurnTxID - StatusRequestTooLate - StatusNftRegGettingFailed - StatusNftRegDecodingFailed - StatusNftRegTicketInvalid - StatusListTradeTicketsFailed - StatusTradeTicketsNotFound - StatusTradeTicketMismatched - StatusTimestampVerificationFailed - StatusTimestampInvalid - StatusRQServiceConnectionFailed - StatusSymbolFileNotFound - StatusSymbolFileInvalid - StatusSymbolNotFound - StatusSymbolMismatched - StatusSymbolsNotEnough - StatusFileDecodingFailed - StatusFileReadingFailed - StatusFileMismatched - StatusFileEmpty - StatusKeyNotFound - StatusFileRestoreFailed - StatusFileExists - // Final StatusTaskCanceled StatusTaskCompleted ) var statusNames = map[Status]string{ - StatusTaskStarted: "Task started", - StatusPrimaryMode: "Primary Mode", - StatusSecondaryMode: "Secondary Mode", - StatusConnected: "Connected", - StatusImageProbed: "Image Probed", - StatusAssetUploaded: "Asset Uploaded", - StatusImageAndThumbnailCoordinateUploaded: "Imaged And Thumbnail Coordinate Uploaded", - StatusRegistrationFeeCalculated: "Registration Fee Caculated", - StatusFileDecoded: "File Decoded", - StatusErrorInvalidBurnTxID: "Error Invalid Burn TxID", - StatusRequestTooLate: "Request too late", - StatusNftRegGettingFailed: "NFT registered getting failed", - StatusNftRegDecodingFailed: "NFT registered decoding failed", - StatusNftRegTicketInvalid: "NFT registered ticket invalid", - StatusListTradeTicketsFailed: "Could not get available trade tickets", - StatusTradeTicketsNotFound: "Trade tickets not found", - StatusTradeTicketMismatched: "Trade ticket mismatched", - StatusTimestampVerificationFailed: "Could not verify timestamp", - StatusTimestampInvalid: "Timestamp invalid", - StatusRQServiceConnectionFailed: "RQ Service connection failed", - StatusSymbolFileNotFound: "Symbol file not found", - StatusSymbolFileInvalid: "Symbol file invalid", - StatusSymbolNotFound: "Symbol not found", - StatusSymbolMismatched: "Symbol mismatched", - StatusSymbolsNotEnough: "Symbols not enough", - StatusFileDecodingFailed: "File decoding failed", - StatusFileReadingFailed: "File reading failed", - StatusFileEmpty: "File empty", - StatusFileMismatched: "File mismatched", - StatusKeyNotFound: "Key not found", - StatusFileExists: "File hash exists", - StatusFileRestoreFailed: "File restore failed", - StatusTaskCanceled: "Task Canceled", - StatusTaskCompleted: "Task Completed", + StatusTaskStarted: "Task started", + StatusTaskCanceled: "Task Canceled", + StatusTaskCompleted: "Task Completed", } // Status represents status of the task @@ -101,17 +38,7 @@ func (status Status) IsFinal() bool { // IsFailure returns true if the task failed due to an error func (status Status) IsFailure() bool { - return status == StatusTaskCanceled || status == StatusRequestTooLate || - status == StatusNftRegGettingFailed || status == StatusNftRegDecodingFailed || - status == StatusNftRegTicketInvalid || status == StatusListTradeTicketsFailed || - status == StatusTradeTicketsNotFound || status == StatusTradeTicketMismatched || - status == StatusTimestampVerificationFailed || status == StatusTimestampInvalid || - status == StatusRQServiceConnectionFailed || status == StatusSymbolFileNotFound || - status == StatusSymbolFileInvalid || status == StatusSymbolNotFound || - status == StatusSymbolMismatched || status == StatusSymbolsNotEnough || - status == StatusFileDecodingFailed || status == StatusFileReadingFailed || - status == StatusFileEmpty || status == StatusFileMismatched || - status == StatusKeyNotFound || status == StatusFileRestoreFailed || status == StatusFileExists + return status == StatusTaskCanceled } // StatusNames returns a sorted list of status names. diff --git a/supernode/services/common/status_test.go b/supernode/services/common/status_test.go deleted file mode 100644 index 3f6de1be..00000000 --- a/supernode/services/common/status_test.go +++ /dev/null @@ -1,350 +0,0 @@ -package common - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestStatusNames(t *testing.T) { - t.Parallel() - - testCases := []struct { - expectedStatues []Status - }{ - { - expectedStatues: []Status{ - StatusTaskStarted, - StatusPrimaryMode, - StatusSecondaryMode, - - // Process - StatusConnected, - StatusImageProbed, - StatusAssetUploaded, - StatusImageAndThumbnailCoordinateUploaded, - StatusRegistrationFeeCalculated, - StatusFileDecoded, - - // Error - StatusErrorInvalidBurnTxID, - StatusRequestTooLate, - StatusNftRegGettingFailed, - StatusNftRegDecodingFailed, - StatusNftRegTicketInvalid, - StatusListTradeTicketsFailed, - StatusTradeTicketsNotFound, - StatusTradeTicketMismatched, - StatusTimestampVerificationFailed, - StatusTimestampInvalid, - StatusRQServiceConnectionFailed, - StatusSymbolFileNotFound, - StatusSymbolFileInvalid, - StatusSymbolNotFound, - StatusSymbolMismatched, - StatusSymbolsNotEnough, - StatusFileDecodingFailed, - StatusFileReadingFailed, - StatusFileMismatched, - StatusFileEmpty, - StatusKeyNotFound, - StatusFileRestoreFailed, - StatusFileExists, - - // Final - StatusTaskCanceled, - StatusTaskCompleted, - }, - }, - } - - for i, testCase := range testCases { - testCase := testCase - - t.Run(fmt.Sprintf("testCase:%d", i), func(t *testing.T) { - t.Parallel() - - var expectedNames []string - for _, status := range testCase.expectedStatues { - expectedNames = append(expectedNames, StatusNames()[status]) - } - - assert.Equal(t, expectedNames, StatusNames()) - }) - - } -} - -func TestStatusString(t *testing.T) { - t.Parallel() - - testCases := []struct { - status Status - expectedValue string - }{ - { - status: StatusTaskStarted, - expectedValue: StatusNames()[StatusTaskStarted], - }, { - status: StatusFileDecoded, - expectedValue: StatusNames()[StatusFileDecoded], - }, { - status: StatusRequestTooLate, - expectedValue: StatusNames()[StatusRequestTooLate], - }, { - status: StatusNftRegGettingFailed, - expectedValue: StatusNames()[StatusNftRegGettingFailed], - }, { - status: StatusNftRegDecodingFailed, - expectedValue: StatusNames()[StatusNftRegDecodingFailed], - }, { - status: StatusNftRegTicketInvalid, - expectedValue: StatusNames()[StatusNftRegTicketInvalid], - }, { - status: StatusListTradeTicketsFailed, - expectedValue: StatusNames()[StatusListTradeTicketsFailed], - }, { - status: StatusTradeTicketsNotFound, - expectedValue: StatusNames()[StatusTradeTicketsNotFound], - }, { - status: StatusTradeTicketMismatched, - expectedValue: StatusNames()[StatusTradeTicketMismatched], - }, { - status: StatusTimestampVerificationFailed, - expectedValue: StatusNames()[StatusTimestampVerificationFailed], - }, { - status: StatusTimestampInvalid, - expectedValue: StatusNames()[StatusTimestampInvalid], - }, { - status: StatusRQServiceConnectionFailed, - expectedValue: StatusNames()[StatusRQServiceConnectionFailed], - }, { - status: StatusSymbolFileNotFound, - expectedValue: StatusNames()[StatusSymbolFileNotFound], - }, { - status: StatusSymbolFileInvalid, - expectedValue: StatusNames()[StatusSymbolFileInvalid], - }, { - status: StatusSymbolNotFound, - expectedValue: StatusNames()[StatusSymbolNotFound], - }, { - status: StatusSymbolMismatched, - expectedValue: StatusNames()[StatusSymbolMismatched], - }, { - status: StatusSymbolsNotEnough, - expectedValue: StatusNames()[StatusSymbolsNotEnough], - }, { - status: StatusFileDecodingFailed, - expectedValue: StatusNames()[StatusFileDecodingFailed], - }, { - status: StatusFileReadingFailed, - expectedValue: StatusNames()[StatusFileReadingFailed], - }, { - status: StatusFileMismatched, - expectedValue: StatusNames()[StatusFileMismatched], - }, { - status: StatusFileEmpty, - expectedValue: StatusNames()[StatusFileEmpty], - }, { - status: StatusTaskCanceled, - expectedValue: StatusNames()[StatusTaskCanceled], - }, { - status: StatusTaskCompleted, - expectedValue: StatusNames()[StatusTaskCompleted], - }, - } - - for _, testCase := range testCases { - testCase := testCase - - t.Run(fmt.Sprintf("status:%v/value:%s", testCase.status, testCase.expectedValue), func(t *testing.T) { - t.Parallel() - - value := testCase.status.String() - assert.Equal(t, testCase.expectedValue, value) - }) - } -} - -func TestStatusIsFinal(t *testing.T) { - t.Parallel() - - testCases := []struct { - status Status - expectedValue bool - }{ - { - status: StatusTaskStarted, - expectedValue: false, - }, { - status: StatusFileDecoded, - expectedValue: false, - }, { - status: StatusRequestTooLate, - expectedValue: false, - }, { - status: StatusNftRegGettingFailed, - expectedValue: false, - }, { - status: StatusNftRegDecodingFailed, - expectedValue: false, - }, { - status: StatusNftRegTicketInvalid, - expectedValue: false, - }, { - status: StatusListTradeTicketsFailed, - expectedValue: false, - }, { - status: StatusTradeTicketsNotFound, - expectedValue: false, - }, { - status: StatusTradeTicketMismatched, - expectedValue: false, - }, { - status: StatusTimestampVerificationFailed, - expectedValue: false, - }, { - status: StatusTimestampInvalid, - expectedValue: false, - }, { - status: StatusRQServiceConnectionFailed, - expectedValue: false, - }, { - status: StatusSymbolFileNotFound, - expectedValue: false, - }, { - status: StatusSymbolFileInvalid, - expectedValue: false, - }, { - status: StatusSymbolNotFound, - expectedValue: false, - }, { - status: StatusSymbolMismatched, - expectedValue: false, - }, { - status: StatusSymbolsNotEnough, - expectedValue: false, - }, { - status: StatusFileDecodingFailed, - expectedValue: false, - }, { - status: StatusFileReadingFailed, - expectedValue: false, - }, { - status: StatusFileMismatched, - expectedValue: false, - }, { - status: StatusFileEmpty, - expectedValue: false, - }, { - status: StatusTaskCanceled, - expectedValue: true, - }, { - status: StatusTaskCompleted, - expectedValue: true, - }, - } - - for _, testCase := range testCases { - testCase := testCase - - t.Run(fmt.Sprintf("status:%v/value:%v", testCase.status, testCase.expectedValue), func(t *testing.T) { - t.Parallel() - - value := testCase.status.IsFinal() - assert.Equal(t, testCase.expectedValue, value) - }) - } -} - -func TestStatusIsFailure(t *testing.T) { - t.Parallel() - - testCases := []struct { - status Status - expectedValue bool - }{ - { - status: StatusTaskStarted, - expectedValue: false, - }, { - status: StatusFileDecoded, - expectedValue: false, - }, { - status: StatusRequestTooLate, - expectedValue: true, - }, { - status: StatusNftRegGettingFailed, - expectedValue: true, - }, { - status: StatusNftRegDecodingFailed, - expectedValue: true, - }, { - status: StatusNftRegTicketInvalid, - expectedValue: true, - }, { - status: StatusListTradeTicketsFailed, - expectedValue: true, - }, { - status: StatusTradeTicketsNotFound, - expectedValue: true, - }, { - status: StatusTradeTicketMismatched, - expectedValue: true, - }, { - status: StatusTimestampVerificationFailed, - expectedValue: true, - }, { - status: StatusTimestampInvalid, - expectedValue: true, - }, { - status: StatusRQServiceConnectionFailed, - expectedValue: true, - }, { - status: StatusSymbolFileNotFound, - expectedValue: true, - }, { - status: StatusSymbolFileInvalid, - expectedValue: true, - }, { - status: StatusSymbolNotFound, - expectedValue: true, - }, { - status: StatusSymbolMismatched, - expectedValue: true, - }, { - status: StatusSymbolsNotEnough, - expectedValue: true, - }, { - status: StatusFileDecodingFailed, - expectedValue: true, - }, { - status: StatusFileReadingFailed, - expectedValue: true, - }, { - status: StatusFileMismatched, - expectedValue: true, - }, { - status: StatusFileEmpty, - expectedValue: true, - }, { - status: StatusTaskCanceled, - expectedValue: true, - }, { - status: StatusTaskCompleted, - expectedValue: false, - }, - } - - for _, testCase := range testCases { - testCase := testCase - - t.Run(fmt.Sprintf("status:%v/value:%v", testCase.status, testCase.expectedValue), func(t *testing.T) { - t.Parallel() - - value := testCase.status.IsFailure() - assert.Equal(t, testCase.expectedValue, value) - }) - } -} diff --git a/supernode/services/common/storage_handler.go b/supernode/services/common/storage_handler.go index 49fb7189..7ad1a4da 100644 --- a/supernode/services/common/storage_handler.go +++ b/supernode/services/common/storage_handler.go @@ -3,14 +3,17 @@ package common import ( "context" "fmt" + "io/fs" "math" + "math/rand/v2" + "path/filepath" "sort" + "strings" "time" "github.com/LumeraProtocol/supernode/p2p" "github.com/LumeraProtocol/supernode/pkg/errors" "github.com/LumeraProtocol/supernode/pkg/log" - rqnode "github.com/LumeraProtocol/supernode/pkg/raptorq" "github.com/LumeraProtocol/supernode/pkg/storage/files" "github.com/LumeraProtocol/supernode/pkg/storage/rqstore" "github.com/LumeraProtocol/supernode/pkg/utils" @@ -25,9 +28,6 @@ const ( // StorageHandler provides common logic for RQ and P2P operations type StorageHandler struct { P2PClient p2p.Client - RqClient rqnode.ClientInterface - - rqAddress string rqDir string TaskID string @@ -38,13 +38,9 @@ type StorageHandler struct { } // NewStorageHandler creates instance of StorageHandler -func NewStorageHandler(p2p p2p.Client, rq rqnode.ClientInterface, - rqAddress string, rqDir string, store rqstore.Store) *StorageHandler { - +func NewStorageHandler(p2p p2p.Client, rqDir string, store rqstore.Store) *StorageHandler { return &StorageHandler{ P2PClient: p2p, - RqClient: rq, - rqAddress: rqAddress, rqDir: rqDir, store: store, semaphore: make(chan struct{}, concurrency), @@ -77,298 +73,99 @@ func (h *StorageHandler) StoreBatch(ctx context.Context, list [][]byte, typ int) return h.P2PClient.StoreBatch(ctx, list, typ, taskID) } -func (h *StorageHandler) StoreRaptorQSymbolsIntoP2P(ctx context.Context, taskID string) error { - h.semaphore <- struct{}{} // Acquire slot - defer func() { - <-h.semaphore // Release the semaphore slot - }() - - dir, err := h.store.GetDirectoryByTxID(taskID) - if err != nil { - return fmt.Errorf("error fetching symbols dir from rq DB: %w", err) +// StoreRaptorQSymbolsIntoP2P stores RaptorQ symbols into P2P +// It first records the directory in the database, then gathers all symbol paths +// under the specified directory. If the number of keys exceeds a certain threshold, +// it randomly samples a percentage of them. Finally, it streams the symbols in +// fixed-size batches to the P2P network. +func (h *StorageHandler) StoreRaptorQSymbolsIntoP2P(ctx context.Context, taskID, symbolsDir string) error { + /* record directory in DB */ + if err := h.store.StoreSymbolDirectory(taskID, symbolsDir); err != nil { + return fmt.Errorf("store symbol dir: %w", err) } - fileMap, err := utils.ReadDirFilenames(dir) + /* gather every symbol path under symbolsDir ------------------------- */ + keys, err := walkSymbolTree(symbolsDir) if err != nil { - return fmt.Errorf("error reading file-names from symbols dir: %w", err) - } - - // Create a slice of keys from keysMap and sort it - keys := make([]string, 0, len(fileMap)) - for key := range fileMap { - keys = append(keys, key) + return err } - sort.Strings(keys) // Sort the keys alphabetically + /* down-sample if we exceed the “big directory” threshold ------------- */ if len(keys) > loadSymbolsBatchSize { - // Calculate 15% of the total keys, rounded up - requiredKeysCount := int(math.Ceil(float64(len(keys)) * storeSymbolsPercent / 100)) - - // Get the subset of keys (15%) - if requiredKeysCount > len(keys) { - requiredKeysCount = len(keys) // Ensure we don't exceed the available keys count + want := int(math.Ceil(float64(len(keys)) * storeSymbolsPercent / 100)) + if want < len(keys) { + rand.Shuffle(len(keys), func(i, j int) { keys[i], keys[j] = keys[j], keys[i] }) + keys = keys[:want] } - keys = keys[:requiredKeysCount] + sort.Strings(keys) // deterministic order inside the sample } - // Iterate over sorted keys in batches - batchKeys := make(map[string][]byte) - count := 0 + log.WithContext(ctx).WithField("count", len(keys)).Info("storing RaptorQ symbols") - log.WithContext(ctx).WithField("count", len(keys)).Info("storing raptorQ symbols") - for _, key := range keys { - batchKeys[key] = nil - count++ - if count%loadSymbolsBatchSize == 0 { - if err := h.storeSymbolsInP2P(ctx, dir, batchKeys); err != nil { - return err - } - batchKeys = make(map[string][]byte) // Reset batchKeys after storing + /* stream in fixed-size batches -------------------------------------- */ + for start := 0; start < len(keys); { + end := start + loadSymbolsBatchSize + if end > len(keys) { + end = len(keys) } - } - - // Store any remaining symbols in the last batch - if len(batchKeys) > 0 { - if err := h.storeSymbolsInP2P(ctx, dir, batchKeys); err != nil { + if err := h.storeSymbolsInP2P(ctx, taskID, symbolsDir, keys[start:end]); err != nil { return err } + start = end } if err := h.store.UpdateIsFirstBatchStored(h.TxID); err != nil { - return fmt.Errorf("error updating first batch stored flag in rq DB: %w", err) - } - log.WithContext(ctx).WithField("curr-time", time.Now().UTC()).WithField("count", len(keys)).Info("stored RaptorQ symbols") - - return nil -} - -func (h *StorageHandler) storeSymbolsInP2P(ctx context.Context, dir string, batchKeys map[string][]byte) error { - val := ctx.Value(log.TaskIDKey) - taskID := "" - if val != nil { - taskID = fmt.Sprintf("%v", val) - } - // Load symbols from the database for the current batch - log.WithContext(ctx).WithField("count", len(batchKeys)).Info("loading batch symbols") - loadedSymbols, err := utils.LoadSymbols(dir, batchKeys) - if err != nil { - return fmt.Errorf("load batch symbols from db: %w", err) - } - - log.WithContext(ctx).WithField("count", len(loadedSymbols)).Info("loaded batch symbols, storing now") - // Prepare batch for P2P storage return nil - result := make([][]byte, len(loadedSymbols)) - i := 0 - for key, value := range loadedSymbols { - result[i] = value - loadedSymbols[key] = nil // Release the reference for faster memory cleanup - i++ + return fmt.Errorf("update first-batch flag: %w", err) } - - // Store the loaded symbols in P2P - if err := h.P2PClient.StoreBatch(ctx, result, P2PDataRaptorQSymbol, taskID); err != nil { - return fmt.Errorf("store batch raptorq symbols in p2p: %w", err) - } - log.WithContext(ctx).WithField("count", len(loadedSymbols)).Info("stored batch symbols") - - if err := utils.DeleteSymbols(ctx, dir, batchKeys); err != nil { - return fmt.Errorf("delete batch symbols from db: %w", err) - } - log.WithContext(ctx).WithField("count", len(loadedSymbols)).Info("deleted batch symbols") + log.WithContext(ctx).WithField("curr-time", time.Now().UTC()).WithField("count", len(keys)). + Info("finished storing RaptorQ symbols") return nil } -/* -// GenerateRaptorQSymbols calls RQ service to produce RQ Symbols -func (h *StorageHandler) GenerateRaptorQSymbols(ctx context.Context, data []byte, name string) (map[string][]byte, error) { - if h.RqClient == nil { - log.WithContext(ctx).Warnf("RQ Server is not initialized") - return nil, errors.Errorf("RQ Server is not initialized") - } - - b := backoff.NewExponentialBackOff() - b.MaxElapsedTime = 3 * time.Minute - b.InitialInterval = 200 * time.Millisecond - - var conn rqnode.Connection - if err := backoff.Retry(backoff.Operation(func() error { - var err error - conn, err = h.RqClient.Connect(ctx, h.rqAddress) +func walkSymbolTree(root string) ([]string, error) { + var keys []string + err := filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error { if err != nil { - return errors.Errorf("connect to raptorq service: %w", err) + return err // propagate I/O errors } - - return nil - }), b); err != nil { - return nil, fmt.Errorf("retry connect to raptorq service: %w", err) - } - defer func() { - if err := conn.Close(); err != nil { - log.WithContext(ctx).WithError(err).Error("error closing rq-connection") + if d.IsDir() { + return nil // skip directory nodes } - }() - - rqService := conn.RaptorQ(&rqnode.Config{ - RqFilesDir: h.rqDir, - }) - - b.Reset() - - // encodeResp := &rqnode.EncodeResponse{} - if err := backoff.Retry(backoff.Operation(func() error { - var err error - // encodeResp, err = rqService.RQEncode(ctx, data, h.TxID, h.store) - _, err = rqService.Encode(ctx, rqnode.EncodeRequest{}) // FIXME : use the resp - // encodeResp = &encodeRes - if err != nil { - return errors.Errorf("create raptorq symbol from data %s: %w", name, err) + // ignore layout json if present + if strings.EqualFold(filepath.Ext(d.Name()), ".json") { + return nil } - - return nil - }), b); err != nil { - return nil, fmt.Errorf("retry do rqencode service: %w", err) - } - - return map[string][]byte{}, nil // FIXME : return proper symbols -} - -// GetRaptorQEncodeInfo calls RQ service to get Encoding info and list of RQIDs -func (h *StorageHandler) GetRaptorQEncodeInfo(ctx context.Context, - data []byte, num uint32, hash string, pastelID string, -) (encodeInfo *rqnode.EncodeResponse, err error) { - if h.RqClient == nil { - log.WithContext(ctx).Warnf("RQ Server is not initialized") - return nil, errors.Errorf("RQ Server is not initialized") - } - - b := backoff.NewExponentialBackOff() - b.MaxElapsedTime = 3 * time.Minute - b.InitialInterval = 500 * time.Millisecond - - var conn rqnode.Connection - if err := backoff.Retry(backoff.Operation(func() error { - var err error - conn, err = h.RqClient.Connect(ctx, h.rqAddress) + rel, err := filepath.Rel(root, path) if err != nil { - return errors.Errorf("connect to raptorq service: %w", err) + return err } - + keys = append(keys, rel) // store as "block_0/filename" return nil - }), b); err != nil { - return nil, fmt.Errorf("retry connect to raptorq service: %w", err) - } - defer func() { - if err := conn.Close(); err != nil { - log.WithContext(ctx).WithError(err).Error("error closing rq-connection") - } - }() - - rqService := conn.RaptorQ(&rqnode.Config{ - RqFilesDir: h.rqDir, }) - - b.Reset() - if err := backoff.Retry(backoff.Operation(func() error { - var err error - // encodeInfo, err = rqService.EncodeMetaData(ctx, data, num, hash, pastelID) // TODO : remove - encodeI, err := rqService.EncodeMetaData(ctx, rqnode.EncodeMetadataRequest{ - Path: "", // FIXME - FilesNumber: num, - BlockHash: hash, - PastelId: pastelID, - }) - if err != nil { - return errors.Errorf("get raptorq encode info: %w", err) - } - encodeInfo = &encodeI - return nil - }), b); err != nil { - return nil, fmt.Errorf("retry do encode info on raptorq service: %w", err) + if err != nil { + return nil, fmt.Errorf("walk symbol tree: %w", err) } - - return encodeInfo, nil + return keys, nil } -// ValidateRaptorQSymbolIDs calls RQ service to get Encoding info and list of RQIDs and compares them to the similar data received from WN -func (h *StorageHandler) ValidateRaptorQSymbolIDs(ctx context.Context, - data []byte, num uint32, hash string, pastelID string, - haveData []byte) error { - - if len(haveData) == 0 { - return errors.Errorf("no symbols identifiers") - } +func (h *StorageHandler) storeSymbolsInP2P(ctx context.Context, taskID, root string, fileKeys []string) error { + log.WithContext(ctx).WithField("count", len(fileKeys)).Info("loading batch symbols") - encodeInfo, err := h.GetRaptorQEncodeInfo(ctx, data, num, hash, pastelID) + symbols, err := utils.LoadSymbols(root, fileKeys) if err != nil { - return err + return fmt.Errorf("load symbols: %w", err) } - // scan return symbol Id files - filesMap, err := scanSymbolIDFiles(encodeInfo.Path) - if err != nil { - return errors.Errorf("scan symbol id files folder %s: %w", encodeInfo.Path, err) + if err := h.P2PClient.StoreBatch(ctx, symbols, P2PDataRaptorQSymbol, taskID); err != nil { + return fmt.Errorf("p2p store batch: %w", err) } + log.WithContext(ctx).WithField("count", len(symbols)).Info("stored batch symbols") - if len(filesMap) != int(num) { // FIXME : copies == num ? - return errors.Errorf("symbol id files count not match: expect %d, output %d", num, len(filesMap)) + if err := utils.DeleteSymbols(ctx, root, fileKeys); err != nil { + return fmt.Errorf("delete symbols: %w", err) } + log.WithContext(ctx).WithField("count", len(symbols)).Info("deleted batch symbols") - // pick just one file generated to compare - var gotFile, haveFile rqnode.RawSymbolIDFile - for _, v := range filesMap { - gotFile = v - break - } - - if err := json.Unmarshal(haveData, &haveFile); err != nil { - return errors.Errorf("decode raw rq file: %w", err) - } - - if err := utils.EqualStrList(gotFile.SymbolIdentifiers, haveFile.SymbolIdentifiers); err != nil { - return errors.Errorf("raptor symbol mismatched: %w", err) - } return nil } - -// scan symbol id files in "meta" folder, return map of file Ids & contents of file (as list of line) -func scanSymbolIDFiles(dirPath string) (map[string]rqnode.RawSymbolIDFile, error) { - filesMap := make(map[string]rqnode.RawSymbolIDFile) - - err := filepath.Walk(dirPath, func(path string, info fs.FileInfo, err error) error { - if err != nil { - return errors.Errorf("scan a path %s: %w", path, err) - } - - if info.IsDir() { - // TODO - compare it to root - return nil - } - - fileID := filepath.Base(path) - - configFile, err := os.Open(path) - if err != nil { - return errors.Errorf("opening file: %s - err: %w", path, err) - } - defer configFile.Close() - - file := rqnode.RawSymbolIDFile{} - jsonParser := json.NewDecoder(configFile) - if err = jsonParser.Decode(&file); err != nil { - return errors.Errorf("parsing file: %s - err: %w", path, err) - } - - filesMap[fileID] = file - - return nil - }) - - if err != nil { - return nil, err - } - - return filesMap, nil -} - -*/ diff --git a/tests/integration/securegrpc/secure_connection_test.go b/tests/integration/securegrpc/secure_connection_test.go index b7604307..ae4d7eaf 100644 --- a/tests/integration/securegrpc/secure_connection_test.go +++ b/tests/integration/securegrpc/secure_connection_test.go @@ -8,9 +8,9 @@ import ( "fmt" "net" "os" + "regexp" "testing" "time" - "regexp" "github.com/stretchr/testify/require" "google.golang.org/grpc/grpclog" @@ -21,9 +21,9 @@ import ( pb "github.com/LumeraProtocol/supernode/gen/supernode/tests/integration/securegrpc" ltc "github.com/LumeraProtocol/supernode/pkg/net/credentials" "github.com/LumeraProtocol/supernode/pkg/net/credentials/alts/conn" - "github.com/LumeraProtocol/supernode/pkg/testutil" "github.com/LumeraProtocol/supernode/pkg/net/grpc/client" "github.com/LumeraProtocol/supernode/pkg/net/grpc/server" + "github.com/LumeraProtocol/supernode/pkg/testutil" ) func waitForServerReady(address string, timeout time.Duration) error { @@ -32,7 +32,6 @@ func waitForServerReady(address string, timeout time.Duration) error { conn, err := net.Dial("tcp", address) if err == nil { conn.Close() - fmt.Println("Server is ready and accepting connections.") return nil } time.Sleep(100 * time.Millisecond) @@ -48,7 +47,7 @@ func (s *TestServiceImpl) TestMethod(ctx context.Context, req *pb.TestRequest) ( // request is "Hello Lumera Server! I'm [TestClient]!" re := regexp.MustCompile(`\[(.*?)\]`) matches := re.FindStringSubmatch(req.Message) - + clientName := "Unknown Client" if len(matches) > 1 { clientName = matches[1] From 9a5eb7618b00d061d7271e11704eaac463b9c13d Mon Sep 17 00:00:00 2001 From: Matee ullah Date: Fri, 2 May 2025 05:53:43 +0500 Subject: [PATCH 2/2] layout file --- pkg/codec/raptorq.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/codec/raptorq.go b/pkg/codec/raptorq.go index 4c744f4f..f6cac657 100644 --- a/pkg/codec/raptorq.go +++ b/pkg/codec/raptorq.go @@ -57,7 +57,7 @@ func (rq *raptorQ) Encode(ctx context.Context, req EncodeRequest) (EncodeRespons return EncodeResponse{}, fmt.Errorf("mkdir %s: %w", symbolsDir, err) } - _, err = processor.EncodeFile(tmpPath, symbolsDir, blockSize) + resp, err := processor.EncodeFile(tmpPath, symbolsDir, blockSize) if err != nil { os.Remove(tmpPath) return EncodeResponse{}, fmt.Errorf("raptorq encode: %w", err) @@ -67,17 +67,15 @@ func (rq *raptorQ) Encode(ctx context.Context, req EncodeRequest) (EncodeRespons _ = os.Remove(tmpPath) /* ---------- 4. read the layout JSON ---------- */ - - layoutPath := filepath.Join(symbolsDir, "_raptorq_layout.json") - layoutData, err := os.ReadFile(layoutPath) + layoutData, err := os.ReadFile(resp.LayoutFilePath) if err != nil { - return EncodeResponse{}, fmt.Errorf("read layout %s: %w", layoutPath, err) + return EncodeResponse{}, fmt.Errorf("read layout %s: %w", resp.LayoutFilePath, err) } - var resp EncodeResponse + var encodeResp EncodeResponse if err := json.Unmarshal(layoutData, &resp); err != nil { return EncodeResponse{}, fmt.Errorf("unmarshal layout: %w", err) } - return resp, nil + return encodeResp, nil }