diff --git a/go.mod b/go.mod index f4047d77..568e2d87 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,12 @@ module github.com/LumeraProtocol/supernode go 1.24.0 require ( + + github.com/LumeraProtocol/dd-service/gen v0.0.0-20250305185425-22977769a449 + github.com/LumeraProtocol/lumera v0.4.3 + github.com/LumeraProtocol/rq-service/gen v0.0.0-20250305185258-cf252902b897 + cosmossdk.io/api v0.7.6 - github.com/LumeraProtocol/lumera v0.4.2 github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce github.com/cenkalti/backoff/v4 v4.3.0 github.com/cosmos/btcutil v1.0.5 @@ -33,6 +37,7 @@ require ( ) require ( + cosmossdk.io/api v0.7.6 // indirect cosmossdk.io/collections v0.4.0 // indirect cosmossdk.io/core v0.11.1 // indirect cosmossdk.io/depinject v1.1.0 // indirect diff --git a/go.sum b/go.sum index 7b8e02be..fd1d4bfe 100644 --- a/go.sum +++ b/go.sum @@ -61,8 +61,12 @@ github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3 github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= -github.com/LumeraProtocol/lumera v0.4.2 h1:yW7mwoYiBCcFLFNs9AgmaLc0DVkir95NGFtR2j/VYsw= -github.com/LumeraProtocol/lumera v0.4.2/go.mod h1:MRqVY+f8edEBkDvpr4z2nJpglp3Qj1OUvjeWvrvIUSM= +github.com/LumeraProtocol/dd-service/gen v0.0.0-20250305185425-22977769a449 h1:VwXwh/bNCNmj8OgIyM+LfqMCqqSGjRtQv983HiYCKcE= +github.com/LumeraProtocol/dd-service/gen v0.0.0-20250305185425-22977769a449/go.mod h1:/ieXOhfSDyTbcyB1GFmKr4t2b0YGYn0O8ikMNBMnOMA= +github.com/LumeraProtocol/lumera v0.4.3 h1:q/FuT+JOLIpYdlunczRUr6K85r9Sn0lKvGltSrj4r6s= +github.com/LumeraProtocol/lumera v0.4.3/go.mod h1:MRqVY+f8edEBkDvpr4z2nJpglp3Qj1OUvjeWvrvIUSM= +github.com/LumeraProtocol/rq-service/gen v0.0.0-20250305185258-cf252902b897 h1:sxqhMpcQm8KjDFvhs6yg3Vyv9gt9uxBnfpZRewxAFos= +github.com/LumeraProtocol/rq-service/gen v0.0.0-20250305185258-cf252902b897/go.mod h1:+b6pn5XADYaATzzaKRZtCeIyYW2845v34gZ8NaPtPgI= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= diff --git a/p2p/kademlia/bootstrap.go b/p2p/kademlia/bootstrap.go index 96330598..06d97e52 100644 --- a/p2p/kademlia/bootstrap.go +++ b/p2p/kademlia/bootstrap.go @@ -11,7 +11,6 @@ import ( "github.com/LumeraProtocol/supernode/pkg/errors" "github.com/LumeraProtocol/supernode/pkg/log" - "github.com/LumeraProtocol/supernode/pkg/lumera" ltc "github.com/LumeraProtocol/supernode/pkg/net/credentials" ) @@ -77,7 +76,7 @@ func (s *DHT) setBootstrapNodesFromConfigVar(ctx context.Context, bootstrapNodes } nodes = append(nodes, &Node{ - ID: []byte(lumeraAddress.Identity), + ID: []byte(lumeraAddress.Identity), IP: lumeraAddress.Host, Port: lumeraAddress.Port, }) @@ -88,6 +87,7 @@ func (s *DHT) setBootstrapNodesFromConfigVar(ctx context.Context, bootstrapNodes return nil } +// ConfigureBootstrapNodes connects with lumera client & gets p2p boostrap ip & port // ConfigureBootstrapNodes connects with lumera client & gets p2p boostrap ip & port func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string) error { if bootstrapNodes != "" { @@ -100,53 +100,77 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string } selfAddress = fmt.Sprintf("%s:%d", selfAddress, s.options.Port) - get := func(ctx context.Context, f func(context.Context) (lumera.SuperNodeAddressInfos, error)) ([]*Node, error) { - mns, err := f(ctx) + var boostrapNodes []*Node + + if s.options.LumeraClient != nil { + // Get the latest block to determine height + latestBlockResp, err := s.options.LumeraClient.Node().GetLatestBlock(ctx) + if err != nil { + return fmt.Errorf("failed to get latest block: %w", err) + } + + // Get the block height + blockHeight := uint64(latestBlockResp.SdkBlock.Header.Height) + + // Get top supernodes for this block + supernodeResp, err := s.options.LumeraClient.SuperNode().GetTopSuperNodesForBlock(ctx, blockHeight) if err != nil { - return []*Node{}, err + return fmt.Errorf("failed to get top supernodes: %w", err) } mapNodes := map[string]*Node{} - for _, mn := range mns { - node, err := s.parseNode(mn.ExtP2P, selfAddress) + + for _, supernode := range supernodeResp.Supernodes { + // Find the latest IP address (with highest block height) + var latestIP string + var maxHeight int64 = -1 + + for _, ipHistory := range supernode.PrevIpAddresses { + if ipHistory.Height > maxHeight { + maxHeight = ipHistory.Height + latestIP = ipHistory.Address + } + } + + if latestIP == "" { + log.P2P().WithContext(ctx). + WithField("supernode", supernode.SupernodeAccount). + Warn("No valid IP address found for supernode") + continue + } + + // Parse the node from the IP address + node, err := s.parseNode(latestIP, selfAddress) if err != nil { - log.P2P().WithContext(ctx).WithError(err).WithField("extP2P", mn.ExtP2P).Warn("Skip Bad Boostrap Address") + log.P2P().WithContext(ctx).WithError(err). + WithField("address", latestIP). + WithField("supernode", supernode.SupernodeAccount). + Warn("Skip Bad Bootstrap Address") continue } - mapNodes[mn.ExtP2P] = node + // Store the supernode account as the node ID + node.ID = []byte(supernode.SupernodeAccount) + mapNodes[latestIP] = node } - nodes := []*Node{} + // Convert the map to a slice for _, node := range mapNodes { - nodes = append(nodes, node) + boostrapNodes = append(boostrapNodes, node) } - - return nodes, nil } - var boostrapNodes []*Node - if s.options.LumeraNetwork != nil { - boostrapNodes, err := get(ctx, s.options.LumeraNetwork.MasterNodesExtra) - if err != nil { - return fmt.Errorf("masternodesTop failed: %s", err) - } else if len(boostrapNodes) == 0 { - boostrapNodes, err = get(ctx, s.options.LumeraNetwork.MasterNodesTop) - if err != nil { - return fmt.Errorf("masternodesExtra failed: %s", err) - } else if len(boostrapNodes) == 0 { - log.P2P().WithContext(ctx).Error("unable to fetch bootstrap ip. Missing extP2P") - - return nil - } - } + if len(boostrapNodes) == 0 { + log.P2P().WithContext(ctx).Error("unable to fetch bootstrap IP addresses. No valid supernodes found.") + return nil } for _, node := range boostrapNodes { log.P2P().WithContext(ctx).WithFields(log.Fields{ "bootstap_ip": node.IP, "bootstrap_port": node.Port, - }).Info("adding p2p bootstap node") + "node_id": string(node.ID), + }).Info("adding p2p bootstrap node") } s.options.BootstrapNodes = append(s.options.BootstrapNodes, boostrapNodes...) diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index 0b0a138b..96ff5083 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -12,33 +12,34 @@ import ( "github.com/btcsuite/btcutil/base58" "github.com/cenkalti/backoff/v4" + "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/LumeraProtocol/lumera/x/lumeraid/securekeyx" "github.com/LumeraProtocol/supernode/pkg/errors" "github.com/LumeraProtocol/supernode/pkg/log" + "github.com/LumeraProtocol/supernode/pkg/lumera" ltc "github.com/LumeraProtocol/supernode/pkg/net/credentials" "github.com/LumeraProtocol/supernode/pkg/storage" "github.com/LumeraProtocol/supernode/pkg/storage/memory" "github.com/LumeraProtocol/supernode/pkg/storage/rqstore" "github.com/LumeraProtocol/supernode/pkg/utils" - "github.com/LumeraProtocol/supernode/pkg/lumera" ) const ( - defaultNetworkPort uint16 = 4445 - defaultNetworkAddr = "0.0.0.0" - defaultRefreshTime = time.Second * 3600 - defaultPingTime = time.Second * 10 - defaultCleanupInterval = time.Minute * 2 - defaultDisabledKeyExpirationInterval = time.Minute * 30 - defaultRedundantDataCleanupInterval = 12 * time.Hour - defaultDeleteDataInterval = 11 * time.Hour - delKeysCountThreshold = 10 - lowSpaceThreshold = 50 // GB - batchStoreSize = 2500 - storeSameSymbolsBatchConcurrency = 1 - storeSymbolsBatchConcurrency = 2.0 - minimumDataStoreSuccessRate = 75.0 + defaultNetworkPort uint16 = 4445 + defaultNetworkAddr = "0.0.0.0" + defaultRefreshTime = time.Second * 3600 + defaultPingTime = time.Second * 10 + defaultCleanupInterval = time.Minute * 2 + defaultDisabledKeyExpirationInterval = time.Minute * 30 + defaultRedundantDataCleanupInterval = 12 * time.Hour + defaultDeleteDataInterval = 11 * time.Hour + delKeysCountThreshold = 10 + lowSpaceThreshold = 50 // GB + batchStoreSize = 2500 + storeSameSymbolsBatchConcurrency = 1 + storeSymbolsBatchConcurrency = 2.0 + minimumDataStoreSuccessRate = 75.0 maxIterations = 4 ) @@ -74,13 +75,16 @@ type Options struct { // node there is no way to connect to the network BootstrapNodes []*Node - LumeraClient *lumera.Client + // Lumera client for interacting with the blockchain + LumeraClient lumera.Client - LumeraNetwork *lumera.LumeraNetwork + // Keyring for credentials + Keyring keyring.Keyring ExternalIP string } +// NewDHT returns a new DHT node // NewDHT returns a new DHT node func NewDHT(ctx context.Context, store Store, metaStore MetaStore, options *Options, rqstore rqstore.Store) (*DHT, error) { // validate the options, if it's invalid, set them to default value @@ -107,13 +111,15 @@ func NewDHT(ctx context.Context, store Store, metaStore MetaStore, options *Opti s.externalIP = options.ExternalIP } - kr := options.LumeraClient.GetKeyring() - if kr == nil { - return nil, fmt.Errorf("keyring is not initialized in lumera client context") + // Check that keyring is provided + if options.Keyring == nil { + return nil, fmt.Errorf("keyring is required but not provided") } + + // Initialize client credentials with the provided keyring clientCreds, err := ltc.NewClientCreds(<c.ClientOptions{ CommonOptions: ltc.CommonOptions{ - Keyring: kr, + Keyring: options.Keyring, LocalIdentity: string(options.ID), PeerType: securekeyx.Supernode, }, diff --git a/p2p/p2p.go b/p2p/p2p.go index 43392eed..dcb99d02 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -6,17 +6,17 @@ import ( "fmt" "time" + "github.com/LumeraProtocol/supernode/p2p/kademlia" "github.com/LumeraProtocol/supernode/p2p/kademlia/store/cloud.go" "github.com/LumeraProtocol/supernode/p2p/kademlia/store/meta" - + "github.com/LumeraProtocol/supernode/p2p/kademlia/store/sqlite" "github.com/LumeraProtocol/supernode/pkg/errors" "github.com/LumeraProtocol/supernode/pkg/log" + "github.com/LumeraProtocol/supernode/pkg/lumera" "github.com/LumeraProtocol/supernode/pkg/storage/rqstore" "github.com/LumeraProtocol/supernode/pkg/utils" - "github.com/LumeraProtocol/supernode/p2p/kademlia" - "github.com/LumeraProtocol/supernode/p2p/kademlia/store/sqlite" - "github.com/LumeraProtocol/supernode/pkg/lumera" "github.com/btcsuite/btcutil/base58" + "github.com/cosmos/cosmos-sdk/crypto/keyring" ) const ( @@ -45,7 +45,8 @@ type p2p struct { dht *kademlia.DHT // the kademlia network config *Config // the service configuration running bool // if the kademlia network is ready - lumeraClient *lumera.Client + lumeraClient lumera.Client + keyring keyring.Keyring // Add the keyring field rqstore rqstore.Store } @@ -231,7 +232,8 @@ func (s *p2p) NClosestNodesWithIncludingNodeList(ctx context.Context, n int, key func (s *p2p) configure(ctx context.Context) error { // new the queries storage kadOpts := &kademlia.Options{ - LumeraClient: s.lumeraClient, + LumeraClient: s.lumeraClient, + Keyring: s.keyring, // Pass the keyring BootstrapNodes: []*kademlia.Node{}, IP: s.config.ListenAddress, Port: s.config.Port, @@ -259,7 +261,7 @@ func (s *p2p) configure(ctx context.Context) error { } // New returns a new p2p instance. -func New(ctx context.Context, config *Config, lumeraClient *lumera.Client, rqstore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore) (P2P, error) { +func New(ctx context.Context, config *Config, lumeraClient lumera.Client, kr keyring.Keyring, rqstore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore) (P2P, error) { store, err := sqlite.NewStore(ctx, config.DataDir, cloud, mst) if err != nil { return nil, errors.Errorf("new kademlia store: %w", err) @@ -275,6 +277,7 @@ func New(ctx context.Context, config *Config, lumeraClient *lumera.Client, rqsto metaStore: meta, config: config, lumeraClient: lumeraClient, + keyring: kr, // Store the keyring rqstore: rqstore, }, nil } diff --git a/pkg/lumera/modules/action/impl.go b/pkg/lumera/modules/action/impl.go index 507028d9..f706a2ce 100644 --- a/pkg/lumera/modules/action/impl.go +++ b/pkg/lumera/modules/action/impl.go @@ -36,14 +36,15 @@ func (m *module) GetAction(ctx context.Context, actionID string) (*types.QueryGe return resp, nil } -//// GetActionFee calculates fee for processing data with given size -//func (m *module) GetActionFee(ctx context.Context, dataSize string) (*types.QueryGetActionFeeResponse, error) { -// resp, err := m.client.GetActionFee(ctx, &types.QueryGetActionFeeRequest{ -// DataSize: dataSize, -// }) -// if err != nil { -// return nil, fmt.Errorf("failed to get action fee: %w", err) -// } -// -// return resp, nil -//} + +// GetActionFee calculates fee for processing data with given size +func (m *module) GetActionFee(ctx context.Context, dataSize string) (*types.QueryGetActionFeeResponse, error) { + resp, err := m.client.GetActionFee(ctx, &types.QueryGetActionFeeRequest{ + DataSize: dataSize, + }) + if err != nil { + return nil, fmt.Errorf("failed to get action fee: %w", err) + } + + return resp, nil +} diff --git a/pkg/lumera/modules/action/interface.go b/pkg/lumera/modules/action/interface.go index 2aa2c7d7..c38c4467 100644 --- a/pkg/lumera/modules/action/interface.go +++ b/pkg/lumera/modules/action/interface.go @@ -10,7 +10,9 @@ import ( // Module defines the interface for interacting with the action module type Module interface { GetAction(ctx context.Context, actionID string) (*types.QueryGetActionResponse, error) - //GetActionFee(ctx context.Context, dataSize string) (*types.QueryGetActionFeeResponse, error) + + GetActionFee(ctx context.Context, dataSize string) (*types.QueryGetActionFeeResponse, error) + } // NewModule creates a new Action module client diff --git a/pkg/testutil/lumera.go b/pkg/testutil/lumera.go new file mode 100644 index 00000000..cdffd6da --- /dev/null +++ b/pkg/testutil/lumera.go @@ -0,0 +1,163 @@ +package testutil + +import ( + "context" + + cmtservice "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + sdktx "github.com/cosmos/cosmos-sdk/types/tx" + + "github.com/LumeraProtocol/lumera/x/action/types" + supernodeTypes "github.com/LumeraProtocol/lumera/x/supernode/types" + "github.com/LumeraProtocol/supernode/pkg/lumera" + "github.com/LumeraProtocol/supernode/pkg/lumera/modules/action" + "github.com/LumeraProtocol/supernode/pkg/lumera/modules/node" + "github.com/LumeraProtocol/supernode/pkg/lumera/modules/supernode" + "github.com/LumeraProtocol/supernode/pkg/lumera/modules/tx" +) + +// MockLumeraClient implements the lumera.Client interface for testing purposes +type MockLumeraClient struct { + actionMod *MockActionModule + supernodeMod *MockSupernodeModule + txMod *MockTxModule + nodeMod *MockNodeModule + kr keyring.Keyring + addresses []string // Store node addresses for testing +} + +// NewMockLumeraClient creates a new mock Lumera client for testing +func NewMockLumeraClient(kr keyring.Keyring, addresses []string) (lumera.Client, error) { + actionMod := &MockActionModule{} + supernodeMod := &MockSupernodeModule{addresses: addresses} + txMod := &MockTxModule{} + nodeMod := &MockNodeModule{} + + return &MockLumeraClient{ + actionMod: actionMod, + supernodeMod: supernodeMod, + txMod: txMod, + nodeMod: nodeMod, + kr: kr, + addresses: addresses, + }, nil +} + +// Action returns the Action module client +func (c *MockLumeraClient) Action() action.Module { + return c.actionMod +} + +// SuperNode returns the SuperNode module client +func (c *MockLumeraClient) SuperNode() supernode.Module { + return c.supernodeMod +} + +// Tx returns the Transaction module client +func (c *MockLumeraClient) Tx() tx.Module { + return c.txMod +} + +// Node returns the Node module client +func (c *MockLumeraClient) Node() node.Module { + return c.nodeMod +} + +// Close closes all connections +func (c *MockLumeraClient) Close() error { + return nil +} + +// MockActionModule implements the action.Module interface for testing +type MockActionModule struct{} + +func (m *MockActionModule) GetAction(ctx context.Context, actionID string) (*types.QueryGetActionResponse, error) { + return &types.QueryGetActionResponse{}, nil +} + +func (m *MockActionModule) GetActionFee(ctx context.Context, dataSize string) (*types.QueryGetActionFeeResponse, error) { + return &types.QueryGetActionFeeResponse{}, nil +} + +// MockSupernodeModule implements the supernode.Module interface for testing +type MockSupernodeModule struct { + addresses []string +} + +func (m *MockSupernodeModule) GetTopSuperNodesForBlock(ctx context.Context, blockHeight uint64) (*supernodeTypes.QueryGetTopSuperNodesForBlockResponse, error) { + // Create supernodes with the actual node addresses supplied in the test + supernodes := make([]*supernodeTypes.SuperNode, 0, len(m.addresses)) + + for i, addr := range m.addresses { + if i >= 2 { // Only use first couple for bootstrap + break + } + + supernode := &supernodeTypes.SuperNode{ + SupernodeAccount: addr, // Use the real account address for testing + PrevIpAddresses: []*supernodeTypes.IPAddressHistory{ + { + Address: "127.0.0.1:900" + string('0'+i), + Height: 10, + }, + }, + } + supernodes = append(supernodes, supernode) + } + + return &supernodeTypes.QueryGetTopSuperNodesForBlockResponse{ + Supernodes: supernodes, + }, nil +} + +func (m *MockSupernodeModule) GetSuperNode(ctx context.Context, address string) (*supernodeTypes.QueryGetSuperNodeResponse, error) { + return &supernodeTypes.QueryGetSuperNodeResponse{}, nil +} + +// MockTxModule implements the tx.Module interface for testing +type MockTxModule struct{} + +func (m *MockTxModule) BroadcastTx(ctx context.Context, txBytes []byte, mode sdktx.BroadcastMode) (*sdktx.BroadcastTxResponse, error) { + return &sdktx.BroadcastTxResponse{}, nil +} + +func (m *MockTxModule) SimulateTx(ctx context.Context, txBytes []byte) (*sdktx.SimulateResponse, error) { + return &sdktx.SimulateResponse{}, nil +} + +func (m *MockTxModule) GetTx(ctx context.Context, hash string) (*sdktx.GetTxResponse, error) { + return &sdktx.GetTxResponse{}, nil +} + +// MockNodeModule implements the node.Module interface for testing +type MockNodeModule struct{} + +func (m *MockNodeModule) GetLatestBlock(ctx context.Context) (*cmtservice.GetLatestBlockResponse, error) { + return &cmtservice.GetLatestBlockResponse{ + SdkBlock: &cmtservice.Block{ + Header: cmtservice.Header{ + Height: 100, + }, + }, + }, nil +} + +func (m *MockNodeModule) GetBlockByHeight(ctx context.Context, height int64) (*cmtservice.GetBlockByHeightResponse, error) { + return &cmtservice.GetBlockByHeightResponse{}, nil +} + +func (m *MockNodeModule) GetNodeInfo(ctx context.Context) (*cmtservice.GetNodeInfoResponse, error) { + return &cmtservice.GetNodeInfoResponse{}, nil +} + +func (m *MockNodeModule) GetSyncing(ctx context.Context) (*cmtservice.GetSyncingResponse, error) { + return &cmtservice.GetSyncingResponse{}, nil +} + +func (m *MockNodeModule) GetLatestValidatorSet(ctx context.Context) (*cmtservice.GetLatestValidatorSetResponse, error) { + return &cmtservice.GetLatestValidatorSetResponse{}, nil +} + +func (m *MockNodeModule) GetValidatorSetByHeight(ctx context.Context, height int64) (*cmtservice.GetValidatorSetByHeightResponse, error) { + return &cmtservice.GetValidatorSetByHeightResponse{}, nil +} diff --git a/supernode/cmd/.gitkeep b/supernode/cmd/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/supernode/cmd/root.go b/supernode/cmd/root.go new file mode 100644 index 00000000..aaea44ee --- /dev/null +++ b/supernode/cmd/root.go @@ -0,0 +1,31 @@ +package cmd + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" +) + +var ( + cfgFile string +) + +// rootCmd represents the base command when called without any subcommands +var rootCmd = &cobra.Command{ + Use: "supernode", + Short: "Lumera Supernode for processing Sense and Cascade actions", + Long: `Lumera Supernode processes Sense and Cascade actions on the Lumera network`, +} + +// Execute adds all child commands to the root command and sets flags appropriately +func Execute() { + if err := rootCmd.Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } +} + +func init() { + rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "config.yml", "config file") +} diff --git a/supernode/cmd/start.go b/supernode/cmd/start.go new file mode 100644 index 00000000..b4f094ad --- /dev/null +++ b/supernode/cmd/start.go @@ -0,0 +1,83 @@ +package cmd + +import ( + "context" + "log/slog" + "os" + "os/signal" + "syscall" + + "github.com/LumeraProtocol/supernode/pkg/logtrace" + "github.com/LumeraProtocol/supernode/supernode/config" + "github.com/spf13/cobra" +) + +// startCmd represents the start command +var startCmd = &cobra.Command{ + Use: "start", + Short: "Start the supernode", + Run: func(cmd *cobra.Command, args []string) { + // Initialize logging + logtrace.Setup("supernode", "dev", slog.LevelInfo) + + // Create context with correlation ID for tracing + ctx := logtrace.CtxWithCorrelationID(context.Background(), "supernode-start") + + // Create context that can be canceled on shutdown + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Load configuration + cfg, err := config.LoadConfig(cfgFile) + if err != nil { + logtrace.Error(ctx, "Failed to load configuration", logtrace.Fields{ + "error": err.Error(), + "config_file": cfgFile, + }) + os.Exit(1) + } + + // Initialize and start the supernode + supernode, err := NewSupernode(ctx, cfg) + if err != nil { + logtrace.Error(ctx, "Failed to initialize supernode", logtrace.Fields{ + "error": err.Error(), + }) + os.Exit(1) + } + + // Start the supernode + if err := supernode.Start(ctx); err != nil { + logtrace.Error(ctx, "Failed to start supernode", logtrace.Fields{ + "error": err.Error(), + }) + os.Exit(1) + } + + logtrace.Info(ctx, "Supernode started", logtrace.Fields{ + "supernode_id": cfg.SupernodeID, + "listen_address": cfg.P2P.ListenAddress, + "port": cfg.P2P.Port, + }) + + // Wait for shutdown signal + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + + logtrace.Info(ctx, "Shutdown signal received", logtrace.Fields{}) + + // Stop the supernode + if err := supernode.Stop(ctx); err != nil { + logtrace.Error(ctx, "Error stopping supernode", logtrace.Fields{ + "error": err.Error(), + }) + } + + logtrace.Info(ctx, "Supernode shutdown complete", logtrace.Fields{}) + }, +} + +func init() { + rootCmd.AddCommand(startCmd) +} diff --git a/supernode/cmd/supernode.go b/supernode/cmd/supernode.go new file mode 100644 index 00000000..d276cb07 --- /dev/null +++ b/supernode/cmd/supernode.go @@ -0,0 +1,225 @@ +package cmd + +import ( + "context" + "fmt" + "os" + + "github.com/LumeraProtocol/supernode/p2p" + "github.com/LumeraProtocol/supernode/pkg/logtrace" + "github.com/LumeraProtocol/supernode/pkg/lumera" + "github.com/LumeraProtocol/supernode/pkg/storage/rqstore" + "github.com/LumeraProtocol/supernode/supernode/config" + "github.com/cosmos/cosmos-sdk/crypto/keyring" +) + +// Supernode represents a supernode in the Lumera network +type Supernode struct { + config *config.Config + lumeraClient lumera.Client + p2pService p2p.P2P + keyring keyring.Keyring + rqStore rqstore.Store +} + +// NewSupernode creates a new supernode instance +func NewSupernode(ctx context.Context, config *config.Config) (*Supernode, error) { + if config == nil { + return nil, fmt.Errorf("config is nil") + } + + // Initialize keyring + kr, err := initKeyring(ctx, config) + if err != nil { + return nil, fmt.Errorf("failed to initialize keyring: %w", err) + } + + // Initialize Lumera client + lumeraClient, err := initLumeraClient(ctx, config) + if err != nil { + return nil, fmt.Errorf("failed to initialize Lumera client: %w", err) + } + + // Initialize RaptorQ store for Cascade processing + rqStore, err := initRQStore(ctx, config) + if err != nil { + return nil, fmt.Errorf("failed to initialize RaptorQ store: %w", err) + } + + // Create the supernode instance + supernode := &Supernode{ + config: config, + lumeraClient: lumeraClient, + keyring: kr, + rqStore: rqStore, + } + + return supernode, nil +} + +// Start starts all supernode services +func (s *Supernode) Start(ctx context.Context) error { + // Initialize p2p service + p2pConfig := &p2p.Config{ + ListenAddress: s.config.P2P.ListenAddress, + Port: s.config.P2P.Port, + DataDir: s.config.P2P.DataDir, + BootstrapNodes: s.config.P2P.BootstrapNodes, + ExternalIP: s.config.P2P.ExternalIP, + ID: s.config.SupernodeID, + } + + logtrace.Info(ctx, "Initializing P2P service", logtrace.Fields{ + "listen_address": p2pConfig.ListenAddress, + "port": p2pConfig.Port, + "data_dir": p2pConfig.DataDir, + }) + + p2pService, err := p2p.New(ctx, p2pConfig, s.lumeraClient, s.keyring, s.rqStore, nil, nil) + if err != nil { + return fmt.Errorf("failed to initialize p2p service: %w", err) + } + s.p2pService = p2pService + + // Run the p2p service + logtrace.Info(ctx, "Starting P2P service", logtrace.Fields{}) + if err := s.p2pService.Run(ctx); err != nil { + return fmt.Errorf("p2p service error: %w", err) + } + + return nil +} + +// Stop stops all supernode services +func (s *Supernode) Stop(ctx context.Context) error { + // Close the Lumera client connection + if s.lumeraClient != nil { + logtrace.Info(ctx, "Closing Lumera client", logtrace.Fields{}) + if err := s.lumeraClient.Close(); err != nil { + logtrace.Error(ctx, "Error closing Lumera client", logtrace.Fields{ + "error": err.Error(), + }) + } + } + + return nil +} + +// initKeyring initializes the keyring based on configuration +func initKeyring(ctx context.Context, config *config.Config) (keyring.Keyring, error) { + if config == nil { + return nil, fmt.Errorf("config is nil") + } + + // Set default directory if not provided + keyringDir := "./keys" + if config.Keyring.Dir != "" { + keyringDir = config.Keyring.Dir + } + + // Set default backend if not provided + backend := keyring.BackendFile + if config.Keyring.Backend != "" { + switch config.Keyring.Backend { + case "file": + backend = keyring.BackendFile + case "os": + backend = keyring.BackendOS + case "memory": + backend = keyring.BackendMemory + default: + logtrace.Warn(ctx, "Unsupported keyring backend, using file backend", logtrace.Fields{ + "backend": config.Keyring.Backend, + }) + } + } + + // Create the keyring directory if it doesn't exist + if err := os.MkdirAll(keyringDir, 0700); err != nil { + return nil, fmt.Errorf("failed to create keyring directory: %w", err) + } + + logtrace.Info(ctx, "Initializing keyring", logtrace.Fields{ + "backend": backend, + "directory": keyringDir, + }) + + // Initialize the keyring + kr, err := keyring.New( + "lumera", + backend, + keyringDir, + os.Stdin, + nil, + ) + if err != nil { + return nil, fmt.Errorf("failed to initialize keyring: %w", err) + } + + return kr, nil +} + +// initLumeraClient initializes the Lumera client based on configuration +func initLumeraClient(ctx context.Context, config *config.Config) (lumera.Client, error) { + if config == nil { + return nil, fmt.Errorf("config is nil") + } + + // Set default values if not provided + grpcAddr := "localhost:9090" + if config.Lumera.GRPCAddr != "" { + grpcAddr = config.Lumera.GRPCAddr + } + + chainID := "lumera" + if config.Lumera.ChainID != "" { + chainID = config.Lumera.ChainID + } + + timeout := 10 + if config.Lumera.Timeout > 0 { + timeout = config.Lumera.Timeout + } + + logtrace.Info(ctx, "Initializing Lumera client", logtrace.Fields{ + "grpc_addr": grpcAddr, + "chain_id": chainID, + "timeout": timeout, + }) + + return lumera.NewClient( + ctx, + lumera.WithGRPCAddr(grpcAddr), + lumera.WithChainID(chainID), + lumera.WithTimeout(timeout), + ) +} + +// initRQStore initializes the RaptorQ store for Cascade processing +func initRQStore(ctx context.Context, config *config.Config) (rqstore.Store, error) { + if config == nil { + return nil, fmt.Errorf("config is nil") + } + + // Set default directory if not provided + dataDir := "./data/p2p" + if config.P2P.DataDir != "" { + dataDir = config.P2P.DataDir + } + + // Create RaptorQ store directory if it doesn't exist + rqDir := dataDir + "/rq" + if err := os.MkdirAll(rqDir, 0700); err != nil { + return nil, fmt.Errorf("failed to create RQ store directory: %w", err) + } + + // Create the SQLite file path + rqStoreFile := rqDir + "/rqstore.db" + + logtrace.Info(ctx, "Initializing RaptorQ store", logtrace.Fields{ + "file_path": rqStoreFile, + }) + + // Initialize RaptorQ store with SQLite + return rqstore.NewSQLiteRQStore(rqStoreFile) +} diff --git a/supernode/conf/.gitkeep b/supernode/conf/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/supernode/config.yml b/supernode/config.yml new file mode 100644 index 00000000..4015a89c --- /dev/null +++ b/supernode/config.yml @@ -0,0 +1,17 @@ +supernode_id: "cosmos12xasxwxw" # Unique identifier for this supernode +keyring: + backend: "file" # Can be "file", "os", "memory", etc. + dir: "./keys" + password: "keyring-password" # Optional, can be omitted for interactive prompting + +p2p: + listen_address: "0.0.0.0" + port: 4445 + data_dir: "./data/p2p" + bootstrap_nodes: "" # Comma-separated list of bootstrap nodes for testing (optional) + external_ip: "" # External IP for testing (optional) + +lumera: + grpc_addr: "localhost:9090" + chain_id: "lumera" + timeout: 10 # in seconds \ No newline at end of file diff --git a/supernode/config/config.go b/supernode/config/config.go new file mode 100644 index 00000000..b743cbb5 --- /dev/null +++ b/supernode/config/config.go @@ -0,0 +1,142 @@ +package config + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "github.com/LumeraProtocol/supernode/pkg/logtrace" + "gopkg.in/yaml.v3" +) + +// Config represents the YAML configuration structure +type Config struct { + SupernodeID string `yaml:"supernode_id"` + + Keyring struct { + Backend string `yaml:"backend"` + Dir string `yaml:"dir"` + Password string `yaml:"password"` + } `yaml:"keyring"` + + P2P struct { + ListenAddress string `yaml:"listen_address"` + Port uint16 `yaml:"port"` + DataDir string `yaml:"data_dir"` + BootstrapNodes string `yaml:"bootstrap_nodes"` + ExternalIP string `yaml:"external_ip"` + } `yaml:"p2p"` + + Lumera struct { + GRPCAddr string `yaml:"grpc_addr"` + ChainID string `yaml:"chain_id"` + Timeout int `yaml:"timeout"` + } `yaml:"lumera"` +} + +// LoadConfig loads the configuration from a file +func LoadConfig(filename string) (*Config, error) { + ctx := context.Background() + + // Check if config file exists + absPath, err := filepath.Abs(filename) + if err != nil { + return nil, fmt.Errorf("error getting absolute path for config file: %w", err) + } + + logtrace.Info(ctx, "Loading configuration", logtrace.Fields{ + "path": absPath, + }) + + if _, err := os.Stat(absPath); os.IsNotExist(err) { + return nil, fmt.Errorf("config file %s does not exist", absPath) + } + + data, err := os.ReadFile(absPath) + if err != nil { + return nil, fmt.Errorf("error reading config file: %w", err) + } + + var config Config + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, fmt.Errorf("error parsing config file: %w", err) + } + + // Set default values if not provided + if config.SupernodeID == "" { + return nil, fmt.Errorf("supernode_id is required in config file") + } + + // Set defaults for P2P + if config.P2P.ListenAddress == "" { + config.P2P.ListenAddress = "0.0.0.0" + logtrace.Info(ctx, "Using default P2P listen address", logtrace.Fields{ + "address": config.P2P.ListenAddress, + }) + } + + if config.P2P.Port == 0 { + config.P2P.Port = 4445 + logtrace.Info(ctx, "Using default P2P port", logtrace.Fields{ + "port": config.P2P.Port, + }) + } + + if config.P2P.DataDir == "" { + config.P2P.DataDir = "./data/p2p" + logtrace.Info(ctx, "Using default P2P data directory", logtrace.Fields{ + "dir": config.P2P.DataDir, + }) + } + + // Create data directory if it doesn't exist + if err := os.MkdirAll(config.P2P.DataDir, 0700); err != nil { + return nil, fmt.Errorf("failed to create P2P data directory: %w", err) + } + + // Set defaults for Keyring + if config.Keyring.Backend == "" { + config.Keyring.Backend = "file" + logtrace.Info(ctx, "Using default keyring backend", logtrace.Fields{ + "backend": config.Keyring.Backend, + }) + } + + if config.Keyring.Dir == "" { + config.Keyring.Dir = "./keys" + logtrace.Info(ctx, "Using default keyring directory", logtrace.Fields{ + "dir": config.Keyring.Dir, + }) + } + + // Create keyring directory if it doesn't exist + if err := os.MkdirAll(config.Keyring.Dir, 0700); err != nil { + return nil, fmt.Errorf("failed to create keyring directory: %w", err) + } + + // Set defaults for Lumera + if config.Lumera.GRPCAddr == "" { + config.Lumera.GRPCAddr = "localhost:9090" + logtrace.Info(ctx, "Using default Lumera gRPC address", logtrace.Fields{ + "address": config.Lumera.GRPCAddr, + }) + } + + if config.Lumera.ChainID == "" { + config.Lumera.ChainID = "lumera" + logtrace.Info(ctx, "Using default Lumera chain ID", logtrace.Fields{ + "chain_id": config.Lumera.ChainID, + }) + } + + if config.Lumera.Timeout <= 0 { + config.Lumera.Timeout = 10 + logtrace.Info(ctx, "Using default Lumera timeout", logtrace.Fields{ + "timeout": config.Lumera.Timeout, + }) + } + + logtrace.Info(ctx, "Configuration loaded successfully", logtrace.Fields{}) + return &config, nil +} diff --git a/supernode/main.go b/supernode/main.go new file mode 100644 index 00000000..8638380e --- /dev/null +++ b/supernode/main.go @@ -0,0 +1,9 @@ +package main + +import ( + "github.com/LumeraProtocol/supernode/supernode/cmd" +) + +func main() { + cmd.Execute() +} diff --git a/tests/integration/p2p/p2p_integration_test.go b/tests/integration/p2p/p2p_integration_test.go index 46d6f392..9e8dde2e 100644 --- a/tests/integration/p2p/p2p_integration_test.go +++ b/tests/integration/p2p/p2p_integration_test.go @@ -1,5 +1,3 @@ -//go:build integration - package integration import ( @@ -18,12 +16,11 @@ import ( "github.com/LumeraProtocol/supernode/p2p" "github.com/LumeraProtocol/supernode/p2p/kademlia" - "github.com/LumeraProtocol/supernode/pkg/lumera" + ltc "github.com/LumeraProtocol/supernode/pkg/net/credentials" + "github.com/LumeraProtocol/supernode/pkg/net/credentials/alts/conn" "github.com/LumeraProtocol/supernode/pkg/storage/rqstore" "github.com/LumeraProtocol/supernode/pkg/testutil" "github.com/LumeraProtocol/supernode/pkg/utils" - ltc "github.com/LumeraProtocol/supernode/pkg/net/credentials" - "github.com/LumeraProtocol/supernode/pkg/net/credentials/alts/conn" ) func TestP2PBasicIntegration(t *testing.T) { @@ -147,7 +144,7 @@ func SetupTestP2PNodes(t *testing.T, ctx context.Context) ([]p2p.Client, []*rqst var rqStores []*rqstore.SQLiteRQStore kr := testutil.CreateTestKeyring() - + // Create test accounts accountNames := make([]string, 0) numP2PNodes := kademlia.Alpha + 1 @@ -161,22 +158,16 @@ func SetupTestP2PNodes(t *testing.T, ctx context.Context) ([]p2p.Client, []*rqst for i := 0; i < numP2PNodes; i++ { nodeConfigs = append(nodeConfigs, ltc.LumeraAddress{ Identity: accountAddresses[i], - Host: "127.0.0.1", - Port: uint16(9000+i), + Host: "127.0.0.1", + Port: uint16(9000 + i), }) } // Create and start nodes for i, config := range nodeConfigs { - tClient, err := lumera.NewTendermintClient( - lumera.WithKeyring(kr), - ) + mockClient, err := testutil.NewMockLumeraClient(kr, accountAddresses) require.NoError(t, err, "failed to create tendermint client") - // cast to lumera.Client - lumeraClient, ok := tClient.(*lumera.Client) - require.True(t, ok, "failed to cast to lumera.Client") - // Create data directory for the node dataDir := fmt.Sprintf("./data/node%d", i) err = os.MkdirAll(dataDir, 0755) @@ -189,11 +180,11 @@ func SetupTestP2PNodes(t *testing.T, ctx context.Context) ([]p2p.Client, []*rqst } p2pConfig := &p2p.Config{ - ListenAddress: config.Host, - Port: config.Port, - DataDir: dataDir, - ID: config.Identity, - BootstrapNodes: strings.Join(bootstrapAddresses, ","), + ListenAddress: config.Host, + Port: config.Port, + DataDir: dataDir, + ID: config.Identity, + BootstrapNodes: strings.Join(bootstrapAddresses, ","), } // Initialize SQLite RQ store for each node @@ -205,7 +196,7 @@ func SetupTestP2PNodes(t *testing.T, ctx context.Context) ([]p2p.Client, []*rqst require.NoError(t, err, "failed to create rqstore for node %d: %v", i, err) rqStores = append(rqStores, rqStore) - service, err := p2p.New(ctx, p2pConfig, lumeraClient, rqStore, nil, nil) + service, err := p2p.New(ctx, p2pConfig, mockClient, kr, rqStore, nil, nil) require.NoError(t, err, "failed to create p2p service for node %d: %v", i, err) // Start P2P service