From 5178c603599a806b88539f452efebf2b798e81de Mon Sep 17 00:00:00 2001 From: Matee ullah Malik Date: Wed, 30 Apr 2025 23:14:58 +0500 Subject: [PATCH 1/8] Add finaliza action --- gen/lumera/action/types/codec.go | 26 ++ pkg/lumera/client.go | 19 + pkg/lumera/config.go | 6 +- pkg/lumera/interface.go | 2 + pkg/lumera/modules/action/tx/impl.go | 141 ------ pkg/lumera/modules/action/tx/interface.go | 22 - pkg/lumera/modules/action_msg/impl.go | 302 +++++++++++++ pkg/lumera/modules/action_msg/interface.go | 36 ++ pkg/lumera/options.go | 7 + supernode/cmd/start.go | 2 +- supernode/cmd/supernode.go | 3 +- supernode/services/cascade/upload.go | 4 + tests/system/broadcast_tx_test.go | 491 +++++++++++++++++++++ tests/system/e2e_cascade_test.go | 12 +- 14 files changed, 901 insertions(+), 172 deletions(-) create mode 100644 gen/lumera/action/types/codec.go delete mode 100644 pkg/lumera/modules/action/tx/impl.go delete mode 100644 pkg/lumera/modules/action/tx/interface.go create mode 100644 pkg/lumera/modules/action_msg/impl.go create mode 100644 pkg/lumera/modules/action_msg/interface.go create mode 100644 tests/system/broadcast_tx_test.go diff --git a/gen/lumera/action/types/codec.go b/gen/lumera/action/types/codec.go new file mode 100644 index 00000000..98e1bf63 --- /dev/null +++ b/gen/lumera/action/types/codec.go @@ -0,0 +1,26 @@ +package types + +import ( + cdctypes "github.com/cosmos/cosmos-sdk/codec/types" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/msgservice" + // this line is used by starport scaffolding # 1 +) + +func RegisterInterfaces(registry cdctypes.InterfaceRegistry) { + registry.RegisterImplementations((*sdk.Msg)(nil), + &MsgRequestAction{}, + ) + registry.RegisterImplementations((*sdk.Msg)(nil), + &MsgFinalizeAction{}, + ) + registry.RegisterImplementations((*sdk.Msg)(nil), + &MsgApproveAction{}, + ) + // this line is used by starport scaffolding # 3 + + registry.RegisterImplementations((*sdk.Msg)(nil), + &MsgUpdateParams{}, + ) + msgservice.RegisterMsgServiceDesc(registry, &_Msg_serviceDesc) +} diff --git a/pkg/lumera/client.go b/pkg/lumera/client.go index 9f7cbf7e..89c993ed 100644 --- a/pkg/lumera/client.go +++ b/pkg/lumera/client.go @@ -4,6 +4,7 @@ import ( "context" "github.com/LumeraProtocol/supernode/pkg/lumera/modules/action" + "github.com/LumeraProtocol/supernode/pkg/lumera/modules/action_msg" "github.com/LumeraProtocol/supernode/pkg/lumera/modules/auth" "github.com/LumeraProtocol/supernode/pkg/lumera/modules/node" "github.com/LumeraProtocol/supernode/pkg/lumera/modules/supernode" @@ -15,6 +16,7 @@ type lumeraClient struct { cfg *Config authMod auth.Module actionMod action.Module + actionMsgMod action_msg.Module supernodeMod supernode.Module txMod tx.Module nodeMod node.Module @@ -49,6 +51,17 @@ func newClient(ctx context.Context, opts ...Option) (Client, error) { return nil, err } + actionMsgModule, err := action_msg.NewModule( + conn.GetConn(), + cfg.keyring, + cfg.KeyName, + cfg.ChainID, + ) + if err != nil { + conn.Close() + return nil, err + } + supernodeModule, err := supernode.NewModule(conn.GetConn()) if err != nil { conn.Close() @@ -71,6 +84,7 @@ func newClient(ctx context.Context, opts ...Option) (Client, error) { cfg: cfg, authMod: authModule, actionMod: actionModule, + actionMsgMod: actionMsgModule, supernodeMod: supernodeModule, txMod: txModule, nodeMod: nodeModule, @@ -88,6 +102,11 @@ func (c *lumeraClient) Action() action.Module { return c.actionMod } +// ActionMsg returns the ActionMsg module client +func (c *lumeraClient) ActionMsg() action_msg.Module { + return c.actionMsgMod +} + // SuperNode returns the SuperNode module client func (c *lumeraClient) SuperNode() supernode.Module { return c.supernodeMod diff --git a/pkg/lumera/config.go b/pkg/lumera/config.go index 9c9208bc..246cbe84 100644 --- a/pkg/lumera/config.go +++ b/pkg/lumera/config.go @@ -15,6 +15,9 @@ type Config struct { // keyring is the keyring conf for the node sign & verify keyring keyring.Keyring + + // KeyName is the name of the key to use for signing + KeyName string } // DefaultConfig returns a default configuration @@ -22,6 +25,7 @@ func DefaultConfig() *Config { return &Config{ GRPCAddr: "localhost:9090", ChainID: "lumera", - Timeout: 10, + Timeout: 30, + KeyName: "", } } diff --git a/pkg/lumera/interface.go b/pkg/lumera/interface.go index 940d4ab1..37495129 100644 --- a/pkg/lumera/interface.go +++ b/pkg/lumera/interface.go @@ -5,6 +5,7 @@ import ( "context" "github.com/LumeraProtocol/supernode/pkg/lumera/modules/action" + "github.com/LumeraProtocol/supernode/pkg/lumera/modules/action_msg" "github.com/LumeraProtocol/supernode/pkg/lumera/modules/auth" "github.com/LumeraProtocol/supernode/pkg/lumera/modules/node" "github.com/LumeraProtocol/supernode/pkg/lumera/modules/supernode" @@ -15,6 +16,7 @@ import ( type Client interface { Auth() auth.Module Action() action.Module + ActionMsg() action_msg.Module SuperNode() supernode.Module Tx() tx.Module Node() node.Module diff --git a/pkg/lumera/modules/action/tx/impl.go b/pkg/lumera/modules/action/tx/impl.go deleted file mode 100644 index 485f6f1f..00000000 --- a/pkg/lumera/modules/action/tx/impl.go +++ /dev/null @@ -1,141 +0,0 @@ -package tx - -import ( - "context" - "fmt" - "time" - - txmodule "github.com/LumeraProtocol/supernode/pkg/lumera/modules/tx" - - "github.com/LumeraProtocol/supernode/gen/lumera/action/types" - - "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/client/tx" - cKeyring "github.com/cosmos/cosmos-sdk/crypto/keyring" - sdk "github.com/cosmos/cosmos-sdk/types" - sdktx "github.com/cosmos/cosmos-sdk/types/tx" - "google.golang.org/grpc" -) - -// module implements the Module interface -type module struct { - client types.MsgClient - txModule txmodule.Module -} - -// newModule creates a new Action tx module client -func newModule(conn *grpc.ClientConn) (Module, error) { - if conn == nil { - return nil, fmt.Errorf("connection cannot be nil") - } - - txMod, err := txmodule.NewModule(conn) - if err != nil { - return nil, fmt.Errorf("failed to create tx module: %w", err) - } - - return &module{ - client: types.NewMsgClient(conn), - txModule: txMod, - }, nil -} - -// FinalizeAction finalizes the given action -func (m *module) FinalizeAction( - ctx context.Context, - txConfig client.TxConfig, - keyRing cKeyring.Keyring, - actionID, creator, actionType, metadata, rpcURL, chainID string, -) (*types.MsgFinalizeActionResponse, error) { - fromName := creator // assuming `creator` is the key name in keyring - info, err := keyRing.Key(fromName) - if err != nil { - return nil, fmt.Errorf("failed to get key info: %w", err) - } - fromAddr, err := info.GetAddress() - if err != nil { - return nil, fmt.Errorf("failed to get address from key: %w", err) - } - - _ = client.Context{}. - WithNodeURI(rpcURL). - WithChainID(chainID). - WithKeyring(keyRing). - WithFromName(fromName). - WithFromAddress(fromAddr). - WithTxConfig(txConfig) - - msg := &types.MsgFinalizeAction{ - ActionId: actionID, - Creator: creator, - ActionType: actionType, - Metadata: metadata, - } - - txf := tx.Factory{}. - WithChainID(chainID). - WithKeybase(keyRing). - WithTxConfig(txConfig). - WithGasAdjustment(1.2). - WithGasPrices("0.025stake") - - builder := txConfig.NewTxBuilder() - if err := builder.SetMsgs(msg); err != nil { - return nil, fmt.Errorf("failed to set message: %w", err) - } - builder.SetGasLimit(0) // zero for simulation - - gasCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - txf = txf.WithGas(0) - - simBuilder := txConfig.NewTxBuilder() - if err := simBuilder.SetMsgs(msg); err != nil { - return nil, fmt.Errorf("failed to set messages for simulation: %w", err) - } - - if err := tx.Sign(gasCtx, txf, fromName, simBuilder, true); err != nil { - return nil, fmt.Errorf("failed to sign transaction for simulation: %w", err) - } - - simBytes, err := txConfig.TxEncoder()(simBuilder.GetTx()) - if err != nil { - return nil, fmt.Errorf("failed to encode tx for simulation: %w", err) - } - - simRes, err := m.txModule.SimulateTx(gasCtx, simBytes) - if err != nil { - return nil, fmt.Errorf("failed to simulate transaction: %w", err) - } - - estimatedGas := simRes.GasInfo.GasUsed - builder.SetGasLimit(uint64(float64(estimatedGas) * txf.GasAdjustment())) - - gasPrices := txf.GasPrices() - if len(gasPrices) > 0 { - var fees sdk.Coins - for _, gasPrice := range gasPrices { - feeAmount := gasPrice.Amount.MulInt64(int64(builder.GetTx().GetGas())) - fees = append(fees, sdk.NewCoin(gasPrice.Denom, feeAmount.TruncateInt())) - } - - builder.SetFeeAmount(fees) - } - - if err := tx.Sign(ctx, txf, fromName, builder, true); err != nil { - return nil, fmt.Errorf("failed to sign transaction: %w", err) - } - - txBytes, err := txConfig.TxEncoder()(builder.GetTx()) - if err != nil { - return nil, fmt.Errorf("failed to encode tx: %w", err) - } - - _, err = m.txModule.BroadcastTx(ctx, txBytes, sdktx.BroadcastMode_BROADCAST_MODE_BLOCK) - if err != nil { - return nil, fmt.Errorf("failed to broadcast tx: %w", err) - } - - return &types.MsgFinalizeActionResponse{}, nil -} diff --git a/pkg/lumera/modules/action/tx/interface.go b/pkg/lumera/modules/action/tx/interface.go deleted file mode 100644 index a2627ae2..00000000 --- a/pkg/lumera/modules/action/tx/interface.go +++ /dev/null @@ -1,22 +0,0 @@ -//go:generate mockgen -destination=tx_mock.go -package=tx -source=interface.go -package tx - -import ( - "context" - - "github.com/LumeraProtocol/supernode/gen/lumera/action/types" - - "github.com/cosmos/cosmos-sdk/client" - cKeyring "github.com/cosmos/cosmos-sdk/crypto/keyring" - "google.golang.org/grpc" -) - -// Module defines the interface for interacting with the action tx module -type Module interface { - FinalizeAction(ctx context.Context, txConfig client.TxConfig, keyRing cKeyring.Keyring, actionID, creator, actionType, metadata, rpcURL, chainID string) (*types.MsgFinalizeActionResponse, error) -} - -// NewModule creates a new Action tx module client -func NewModule(conn *grpc.ClientConn) (Module, error) { - return newModule(conn) -} diff --git a/pkg/lumera/modules/action_msg/impl.go b/pkg/lumera/modules/action_msg/impl.go new file mode 100644 index 00000000..9b27afdf --- /dev/null +++ b/pkg/lumera/modules/action_msg/impl.go @@ -0,0 +1,302 @@ +package action_msg + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/LumeraProtocol/supernode/gen/lumera/action/types" + actiontypes "github.com/LumeraProtocol/supernode/gen/lumera/action/types" + "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/cosmos/cosmos-sdk/codec" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" + cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + txtypes "github.com/cosmos/cosmos-sdk/types/tx" + signingtypes "github.com/cosmos/cosmos-sdk/types/tx/signing" + authtx "github.com/cosmos/cosmos-sdk/x/auth/tx" + authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" + "google.golang.org/grpc" +) + +// module implements the Module interface +type module struct { + conn *grpc.ClientConn + client types.MsgClient + kr keyring.Keyring + keyName string + chainID string + nodeAddr string +} + +// newModule creates a new ActionMsg module client +func newModule(conn *grpc.ClientConn, kr keyring.Keyring, keyName string, chainID string) (Module, error) { + if conn == nil { + return nil, fmt.Errorf("connection cannot be nil") + } + + if kr == nil { + return nil, fmt.Errorf("keyring cannot be nil") + } + + if keyName == "" { + return nil, fmt.Errorf("key name cannot be empty") + } + + if chainID == "" { + return nil, fmt.Errorf("chain ID cannot be empty") + } + + // Extract node address from connection + nodeAddr := conn.Target() + + return &module{ + conn: conn, + client: types.NewMsgClient(conn), + kr: kr, + keyName: keyName, + chainID: chainID, + nodeAddr: nodeAddr, + }, nil +} + +// FinalizeCascadeAction finalizes a CASCADE action with the given parameters +func (m *module) FinalizeCascadeAction( + ctx context.Context, + actionId string, + rqIdsIds []string, + rqIdsOti []string, +) (*FinalizeActionResult, error) { + // Basic validation + if actionId == "" { + return nil, fmt.Errorf("action ID cannot be empty") + } + if len(rqIdsIds) == 0 { + return nil, fmt.Errorf("rq_ids_ids cannot be empty for cascade action") + } + if len(rqIdsOti) == 0 { + return nil, fmt.Errorf("rq_ids_oti cannot be empty for cascade action") + } + + // Get creator address from keyring + key, err := m.kr.Key(m.keyName) + if err != nil { + return nil, fmt.Errorf("failed to get key from keyring: %w", err) + } + + addr, err := key.GetAddress() + if err != nil { + return nil, fmt.Errorf("failed to get address from key: %w", err) + } + creator := addr.String() + + // Create CASCADE metadata + metadata := map[string]interface{}{ + "rq_ids_ids": rqIdsIds, + "rq_ids_oti": rqIdsOti, + } + + // Convert metadata to JSON + metadataJSON, err := json.Marshal(metadata) + if err != nil { + return nil, fmt.Errorf("failed to marshal metadata: %w", err) + } + + // Create the message + msg := &types.MsgFinalizeAction{ + Creator: creator, + ActionId: actionId, + ActionType: "CASCADE", + Metadata: string(metadataJSON), + } + + // Create encoding config + encCfg := makeEncodingConfig() + + // Get account info for signing + accInfo, err := m.getAccountInfo(ctx, creator) + if err != nil { + return nil, fmt.Errorf("failed to get account info: %w", err) + } + + // Create client context with keyring + clientCtx := client.Context{}. + WithCodec(encCfg.Codec). + WithTxConfig(encCfg.TxConfig). + WithKeyring(m.kr). + WithBroadcastMode("sync") + + // Create transaction factory + factory := tx.Factory{}. + WithTxConfig(clientCtx.TxConfig). + WithKeybase(m.kr). + WithAccountNumber(accInfo.AccountNumber). + WithSequence(accInfo.Sequence). + WithChainID(m.chainID). + WithGas(200000). + WithGasAdjustment(1.5). + WithSignMode(signingtypes.SignMode_SIGN_MODE_DIRECT) + + // Build unsigned transaction + txBuilder, err := factory.BuildUnsignedTx(msg) + if err != nil { + return nil, fmt.Errorf("failed to build unsigned tx: %w", err) + } + + // Simulate transaction to get accurate gas estimation + gasInfo, err := m.simulateTx(ctx, clientCtx, txBuilder) + if err != nil { + return nil, fmt.Errorf("failed to simulate transaction: %w", err) + } + + // Update gas amount based on simulation + factory = factory.WithGas(gasInfo + 10000) + txBuilder, err = factory.BuildUnsignedTx(msg) + if err != nil { + return nil, fmt.Errorf("failed to rebuild unsigned tx: %w", err) + } + + // Sign transaction + err = tx.Sign(ctx, factory, m.keyName, txBuilder, true) + if err != nil { + return nil, fmt.Errorf("failed to sign transaction: %w", err) + } + + // Broadcast transaction + txBytes, err := clientCtx.TxConfig.TxEncoder()(txBuilder.GetTx()) + if err != nil { + return nil, fmt.Errorf("failed to encode transaction: %w", err) + } + + resp, err := m.broadcastTx(ctx, txBytes) + if err != nil { + return &FinalizeActionResult{ + Success: false, + }, fmt.Errorf("failed to broadcast transaction: %w", err) + } + + return &FinalizeActionResult{ + TxHash: resp.TxHash, + Success: true, + }, nil +} + +// Helper function to simulate transaction and return gas used +func (m *module) simulateTx(ctx context.Context, clientCtx client.Context, txBuilder client.TxBuilder) (uint64, error) { + txBytes, err := clientCtx.TxConfig.TxEncoder()(txBuilder.GetTx()) + if err != nil { + return 0, err + } + // Create gRPC client for tx service + txClient := txtypes.NewServiceClient(m.conn) + + // Simulate transaction + simReq := &txtypes.SimulateRequest{ + TxBytes: txBytes, + } + + simRes, err := txClient.Simulate(ctx, simReq) + if err != nil { + return 0, err + } + + return simRes.GasInfo.GasUsed, nil +} + +// Helper function to broadcast transaction +func (m *module) broadcastTx(ctx context.Context, txBytes []byte) (*TxResponse, error) { + // Create gRPC client for tx service + txClient := txtypes.NewServiceClient(m.conn) + + // Broadcast transaction + req := &txtypes.BroadcastTxRequest{ + TxBytes: txBytes, + Mode: txtypes.BroadcastMode_BROADCAST_MODE_SYNC, + } + + resp, err := txClient.BroadcastTx(ctx, req) + if err != nil { + return nil, err + } + + if resp.TxResponse.Code != 0 { + return nil, fmt.Errorf("transaction failed: %s", resp.TxResponse.RawLog) + } + + return &TxResponse{ + TxHash: resp.TxResponse.TxHash, + Code: resp.TxResponse.Code, + RawLog: resp.TxResponse.RawLog, + }, nil +} + +// Helper function to get account info +func (m *module) getAccountInfo(ctx context.Context, address string) (*AccountInfo, error) { + // Create gRPC client for auth service + authClient := authtypes.NewQueryClient(m.conn) + + // Query account info + req := &authtypes.QueryAccountRequest{ + Address: address, + } + + resp, err := authClient.Account(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get account info: %w", err) + } + + // Unmarshal account + var account authtypes.AccountI + err = m.getEncodingConfig().InterfaceRegistry.UnpackAny(resp.Account, &account) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal account: %w", err) + } + + return &AccountInfo{ + AccountNumber: account.GetAccountNumber(), + Sequence: account.GetSequence(), + }, nil +} + +// makeEncodingConfig creates an EncodingConfig for transaction handling +func makeEncodingConfig() EncodingConfig { + interfaceRegistry := codectypes.NewInterfaceRegistry() + cryptocodec.RegisterInterfaces(interfaceRegistry) + authtypes.RegisterInterfaces(interfaceRegistry) + actiontypes.RegisterInterfaces(interfaceRegistry) + + marshaler := codec.NewProtoCodec(interfaceRegistry) + txConfig := authtx.NewTxConfig(marshaler, authtx.DefaultSignModes) + + return EncodingConfig{ + InterfaceRegistry: interfaceRegistry, + Codec: marshaler, + TxConfig: txConfig, + } +} + +// getEncodingConfig returns the module's encoding config +func (m *module) getEncodingConfig() EncodingConfig { + return makeEncodingConfig() +} + +// EncodingConfig specifies the concrete encoding types to use +type EncodingConfig struct { + InterfaceRegistry codectypes.InterfaceRegistry + Codec codec.Codec + TxConfig client.TxConfig +} + +// AccountInfo holds account information for transaction signing +type AccountInfo struct { + AccountNumber uint64 + Sequence uint64 +} + +// TxResponse holds transaction response information +type TxResponse struct { + TxHash string + Code uint32 + RawLog string +} diff --git a/pkg/lumera/modules/action_msg/interface.go b/pkg/lumera/modules/action_msg/interface.go new file mode 100644 index 00000000..e9092378 --- /dev/null +++ b/pkg/lumera/modules/action_msg/interface.go @@ -0,0 +1,36 @@ +//go:generate mockgen -destination=action_msg_mock.go -package=action_msg -source=interface.go +package action_msg + +import ( + "context" + + "github.com/cosmos/cosmos-sdk/crypto/keyring" + "google.golang.org/grpc" +) + +// FinalizeActionResult represents the result of a finalized action +type FinalizeActionResult struct { + TxHash string // Transaction hash + Success bool // Whether the transaction was successful +} + +// Module defines the interface for action messages operations +type Module interface { + // FinalizeCascadeAction finalizes a CASCADE action with the given parameters + FinalizeCascadeAction( + ctx context.Context, + actionId string, + rqIdsIds []string, + rqIdsOti []string, + ) (*FinalizeActionResult, error) +} + +// NewModule creates a new ActionMsg module client +func NewModule( + conn *grpc.ClientConn, + kr keyring.Keyring, + keyName string, + chainID string, +) (Module, error) { + return newModule(conn, kr, keyName, chainID) +} diff --git a/pkg/lumera/options.go b/pkg/lumera/options.go index 7bc5220e..c9cf6728 100644 --- a/pkg/lumera/options.go +++ b/pkg/lumera/options.go @@ -32,3 +32,10 @@ func WithKeyring(k keyring.Keyring) Option { c.keyring = k } } + +// WithKeyName sets the key name to use for signing +func WithKeyName(keyName string) Option { + return func(c *Config) { + c.KeyName = keyName + } +} diff --git a/supernode/cmd/start.go b/supernode/cmd/start.go index ea221936..1347bb3b 100644 --- a/supernode/cmd/start.go +++ b/supernode/cmd/start.go @@ -60,7 +60,7 @@ The supernode will connect to the Lumera network and begin participating in the } // Initialize Lumera client - lumeraClient, err := initLumeraClient(ctx, appConfig) + lumeraClient, err := initLumeraClient(ctx, appConfig, kr) if err != nil { return fmt.Errorf("failed to initialize Lumera client: %w", err) } diff --git a/supernode/cmd/supernode.go b/supernode/cmd/supernode.go index 552bcc4f..af8d0111 100644 --- a/supernode/cmd/supernode.go +++ b/supernode/cmd/supernode.go @@ -126,7 +126,7 @@ func (s *Supernode) Stop(ctx context.Context) error { } // initLumeraClient initializes the Lumera client based on configuration -func initLumeraClient(ctx context.Context, config *config.Config) (lumera.Client, error) { +func initLumeraClient(ctx context.Context, config *config.Config, kr keyring.Keyring) (lumera.Client, error) { if config == nil { return nil, fmt.Errorf("config is nil") } @@ -142,6 +142,7 @@ func initLumeraClient(ctx context.Context, config *config.Config) (lumera.Client lumera.WithGRPCAddr(config.LumeraClientConfig.GRPCAddr), lumera.WithChainID(config.LumeraClientConfig.ChainID), lumera.WithTimeout(config.LumeraClientConfig.Timeout), + lumera.WithKeyring(kr), ) } diff --git a/supernode/services/cascade/upload.go b/supernode/services/cascade/upload.go index 9c1d4fb4..671534ae 100644 --- a/supernode/services/cascade/upload.go +++ b/supernode/services/cascade/upload.go @@ -141,6 +141,10 @@ func (task *CascadeRegistrationTask) UploadInputData(ctx context.Context, req *U } logtrace.Info(ctx, "raptor-q symbols have been stored", fields) + // Message Finalize Action + + task.lumeraClient. + return &UploadInputDataResponse{ Success: true, Message: "successfully uploaded input data", diff --git a/tests/system/broadcast_tx_test.go b/tests/system/broadcast_tx_test.go new file mode 100644 index 00000000..86f9b185 --- /dev/null +++ b/tests/system/broadcast_tx_test.go @@ -0,0 +1,491 @@ +package system + +import ( + "context" + "crypto/sha3" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/LumeraProtocol/supernode/pkg/keyring" + "github.com/LumeraProtocol/supernode/pkg/lumera" + "github.com/LumeraProtocol/supernode/pkg/raptorq" + + "github.com/LumeraProtocol/supernode/sdk/action" + "github.com/LumeraProtocol/supernode/sdk/event" + "github.com/LumeraProtocol/supernode/sdk/task" + + "github.com/LumeraProtocol/supernode/gen/lumera/action/types" + sdkconfig "github.com/LumeraProtocol/supernode/sdk/config" + + "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" +) + +func TestCascadeE2E(t *testing.T) { + // --------------------------------------- + // Constants and Configuration Parameters + // --------------------------------------- + + // Test account credentials - these values are consistent across test runs + const testMnemonic = "odor kiss switch swarm spell make planet bundle skate ozone path planet exclude butter atom ahead angle royal shuffle door prevent merry alter robust" + const expectedAddress = "lumera1em87kgrvgttrkvuamtetyaagjrhnu3vjy44at4" + const testKeyName = "testkey1" + const fundAmount = "1000000ulume" + + // Network and service configuration constants + const ( + raptorQHost = "localhost" // RaptorQ service host + raptorQPort = 50051 // RaptorQ service port + raptorQFilesDir = "./supernode-data/raptorq_files_test" // Directory for RaptorQ files + lumeraGRPCAddr = "localhost:9090" // Lumera blockchain GRPC address + lumeraChainID = "testing" // Lumera chain ID for testing + ) + + // Action request parameters + const ( + actionType = "CASCADE" // The action type for fountain code processing + price = "10ulume" // Price for the action in ulume tokens + ) + t.Log("Step 1: Starting all services") + + // Reset and start the blockchain + // sut.ResetChain(t) + // sut.StartChain(t) + cli := NewLumeradCLI(t, sut, true) + // --------------------------------------- + // Register Multiple Supernodes to process the request + // --------------------------------------- + t.Log("Registering multiple supernodes to process requests") + + // Helper function to register a supernode + registerSupernode := func(nodeKey string, port string) { + // Get account and validator addresses for registration + accountAddr := cli.GetKeyAddr(nodeKey) + valAddrOutput := cli.Keys("keys", "show", nodeKey, "--bech", "val", "-a") + valAddr := strings.TrimSpace(valAddrOutput) + + t.Logf("Registering supernode for %s (validator: %s, account: %s)", nodeKey, valAddr, accountAddr) + + // Register the supernode with the network + registerCmd := []string{ + "tx", "supernode", "register-supernode", + valAddr, // validator address + "localhost:" + port, // IP address with unique port + "1.0.0", // version + accountAddr, // supernode account + "--from", nodeKey, + } + + resp := cli.CustomCommand(registerCmd...) + RequireTxSuccess(t, resp) + + // Wait for transaction to be included in a block + sut.AwaitNextBlock(t) + } + + // Register three supernodes with different ports + registerSupernode("node0", "4444") + // registerSupernode("node1", "4446") + // registerSupernode("node2", "4448") + + t.Log("Successfully registered three supernodes") + + // --------------------------------------- + // Step 1: Start all required services + // --------------------------------------- + + // // Start the RaptorQ service for encoding/decoding with fountain codes + rq_cmd := StartRQService(t) + defer StopRQService(rq_cmd) // Ensure service is stopped after test + + // Start the supernode service to process cascade requests + // cmds := StartAllSupernodes(t) + // defer StopAllSupernodes(cmds) // Ensure service is stopped after test + + // --------------------------------------- + // Step 2: Set up test account and keys + // --------------------------------------- + t.Log("Step 2: Setting up test account") + + // Locate and set up path to binary and home directory + binaryPath := locateExecutable(sut.ExecBinary) + homePath := filepath.Join(WorkDir, sut.outputDir) + + // Add account key to the blockchain using the mnemonic + cmd := exec.Command( + binaryPath, + "keys", "add", testKeyName, + "--recover", + "--keyring-backend=test", + "--home", homePath, + ) + cmd.Stdin = strings.NewReader(testMnemonic + "\n") + output, err := cmd.CombinedOutput() + require.NoError(t, err, "Key recovery failed: %s", string(output)) + t.Logf("Key recovery output: %s", string(output)) + + // Create CLI helper and verify the address matches expected + recoveredAddress := cli.GetKeyAddr(testKeyName) + t.Logf("Recovered key %s with address: %s", testKeyName, recoveredAddress) + require.Equal(t, expectedAddress, recoveredAddress, "Recovered address should match expected address") + + // Fund the account with tokens for transactions + t.Logf("Funding address %s with %s", recoveredAddress, fundAmount) + cli.FundAddress(recoveredAddress, fundAmount) // ulume tokens for action fees + cli.FundAddress(recoveredAddress, "10000000stake") // stake tokens + sut.AwaitNextBlock(t) // Wait for funding transaction to be processed + + // Create an in-memory keyring for cryptographic operations + // This keyring is separate from the blockchain keyring and used for local signing + keplrKeyring, err := keyring.InitKeyring("memory", "") + require.NoError(t, err, "Failed to initialize in-memory keyring") + + // Add the same key to the in-memory keyring for consistency + record, err := keyring.RecoverAccountFromMnemonic(keplrKeyring, testKeyName, testMnemonic) + require.NoError(t, err, "Failed to recover account from mnemonic in local keyring") + + // Verify the addresses match between chain and local keyring + localAddr, err := record.GetAddress() + require.NoError(t, err, "Failed to get address from record") + require.Equal(t, expectedAddress, localAddr.String(), + "Local keyring address should match expected address") + t.Logf("Successfully recovered key in local keyring with matching address: %s", localAddr.String()) + + // Verify account has sufficient balance for transactions + balanceOutput := cli.CustomQuery("query", "bank", "balances", recoveredAddress) + t.Logf("Balance for account: %s", balanceOutput) + require.Contains(t, balanceOutput, fundAmount[:len(fundAmount)-5], + "Account should have the funded amount") + + // --------------------------------------- + // Step 3: Set up RaptorQ service and clients + // --------------------------------------- + t.Log("Step 3: Setting up RaptorQ service and initializing clients") + + // Create directory for RaptorQ files if it doesn't exist + err = os.MkdirAll(raptorQFilesDir, 0755) + require.NoError(t, err, "Failed to create RaptorQ files directory") + + // Create context with timeout for service operations + ctx, cancel := context.WithTimeout(context.Background(), 1200*time.Second) + defer cancel() + + // Initialize RaptorQ configuration with host, port and file directory + rqConfig := raptorq.NewConfig() + rqConfig.Host = raptorQHost + rqConfig.Port = raptorQPort + rqConfig.RqFilesDir = raptorQFilesDir + + // Create RaptorQ client and establish connection + client := raptorq.NewClient() + address := fmt.Sprintf("%s:%d", raptorQHost, raptorQPort) + t.Logf("Connecting to RaptorQ server at %s", address) + connection, err := client.Connect(ctx, address) + require.NoError(t, err, "Failed to connect to RaptorQ server") + defer connection.Close() + + // Initialize Lumera blockchain client for interactions + t.Log("Initializing Lumera client") + lumeraClient, err := lumera.NewClient( + ctx, + lumera.WithGRPCAddr(lumeraGRPCAddr), + lumera.WithChainID(lumeraChainID), + ) + require.NoError(t, err, "Failed to initialize Lumera client") + + // --------------------------------------- + // Step 4: Create and prepare test file + // --------------------------------------- + t.Log("Step 4: Creating test file for RaptorQ encoding") + + // Create a test file with sample data in a temporary directory + testFileName := filepath.Join(t.TempDir(), "testfile.data") + testData := []byte("This is test data for RaptorQ encoding in the Lumera network") + err = os.WriteFile(testFileName, testData, 0644) + require.NoError(t, err, "Failed to write test file") + + // Read the file into memory for processing + file, err := os.Open(testFileName) + require.NoError(t, err, "Failed to open test file") + defer file.Close() + + // Read the entire file content into a byte slice + fileInfo, err := file.Stat() + require.NoError(t, err, "Failed to get file stats") + data := make([]byte, fileInfo.Size()) + _, err = io.ReadFull(file, data) + require.NoError(t, err, "Failed to read file contents") + t.Logf("Read %d bytes from test file", len(data)) + + // Calculate SHA3-256 hash of the file data for identification + // This hash is used in metadata and for CASCADE operation + hash := sha3.New256() + hash.Write(data) + hashBytes := hash.Sum(nil) + hashHex := fmt.Sprintf("%X", hashBytes) + t.Logf("File hash: %s", hashHex) + time.Sleep(1 * time.Minute) + // --------------------------------------- + // Step 5: Sign data and generate RaptorQ identifiers + // --------------------------------------- + t.Log("Step 5: Signing data and generating RaptorQ identifiers") + + // Sign the original file data for verification purposes + // This signature proves the data came from this account + signedData, err := keyring.SignBytes(keplrKeyring, testKeyName, data) + base64EncodedData := base64.StdEncoding.EncodeToString(signedData) + require.NoError(t, err, "Failed to sign data") + t.Logf("Signed data length: %d bytes", len(signedData)) + + // Initialize RaptorQ client for fountain code processing + rq := connection.RaptorQ(rqConfig, lumeraClient) + + // Get current block hash or use file hash as fallback + // Block hash adds randomness to the fountain code generation + // blockHash := hashHex // Default to file hash + // latestBlock, err := lumeraClient.Node().GetLatestBlock(ctx) + // if err == nil && latestBlock != nil && len(latestBlock.BlockId.Hash) > 0 { + // blockHash = fmt.Sprintf("%X", latestBlock.BlockId.Hash) + // } + // t.Logf("Using block hash: %s", blockHash) + + t.Log("Generating RQ identifiers") + genRqIdsResp, err := rq.GenRQIdentifiersFiles(ctx, raptorq.GenRQIdentifiersFilesRequest{ + + RqMax: 50, + Data: data, + CreatorSNAddress: localAddr.String(), + SignedData: base64EncodedData, + }) + require.NoError(t, err, "Failed to generate RQ identifiers") + + t.Logf("RQ identifiers generated successfully with RQ_IDs_IC: %d", genRqIdsResp.RQIDsIc) + + // --------------------------------------- + // Step 6: Sign the RQ IDs file for verification + // --------------------------------------- + t.Log("Step 6: Signing the RQ IDs file for the action request") + + // Base64 encode the RQIDsFile for consistent transmission + // IMPORTANT: We sign the encoded version, not the raw bytes + rqIdsFileBase64 := base64.StdEncoding.EncodeToString(genRqIdsResp.RQIDsFile) + t.Logf("Base64 encoded RQ IDs file length: %d", len(rqIdsFileBase64)) + + // Sign the Base64-encoded RQ IDs file + // This critical step creates a signature that proves ownership + rqIdsSignature, err := keyring.SignBytes(keplrKeyring, testKeyName, []byte(rqIdsFileBase64)) + require.NoError(t, err, "Failed to sign Base64-encoded RQIDsFile") + + // Encode the signature itself to base64 for consistent format + rqIdsSignatureBase64 := base64.StdEncoding.EncodeToString(rqIdsSignature) + + // Format according to expected verification pattern: Base64(rq_ids).signature + // This format allows verification of both the data and the signature + signatureFormat := fmt.Sprintf("%s.%s", rqIdsFileBase64, rqIdsSignatureBase64) + t.Logf("Signature format prepared with length: %d bytes", len(signatureFormat)) + + // --------------------------------------- + // Step 7: Create metadata and submit action request + // --------------------------------------- + t.Log("Step 7: Creating metadata and submitting action request") + + // Create CascadeMetadata struct with all required fields + // This structured approach ensures all required fields are included + cascadeMetadata := types.CascadeMetadata{ + DataHash: hashHex, // Hash of the original file + FileName: filepath.Base(testFileName), // Original filename + RqIdsIc: uint64(genRqIdsResp.RQIDsIc), // Count of RQ identifiers + Signatures: signatureFormat, // Combined signature format + } + + // Marshal the struct to JSON for the blockchain transaction + metadataBytes, err := json.Marshal(cascadeMetadata) + require.NoError(t, err, "Failed to marshal CascadeMetadata to JSON") + metadata := string(metadataBytes) + + // Set expiration time 25 hours in the future (minimum is 24 hours) + // This defines how long the action request is valid + expirationTime := fmt.Sprintf("%d", time.Now().Add(25*time.Hour).Unix()) + + t.Logf("Requesting cascade action with metadata: %s", metadata) + t.Logf("Action type: %s, Price: %s, Expiration: %s", actionType, price, expirationTime) + + // Submit the action request transaction to the blockchain + // This registers the request with metadata for supernodes to process + actionRequestResp := cli.CustomCommand( + "tx", "action", "request-action", + actionType, // CASCADE action type + metadata, // JSON metadata with all required fields + price, // Price in ulume tokens + expirationTime, // Unix timestamp for expiration + "--from", testKeyName, + "--gas", "auto", + "--gas-adjustment", "1.5", + ) + + // Verify the transaction was successful + RequireTxSuccess(t, actionRequestResp) + t.Logf("Action request successful: %s", actionRequestResp) + + // Wait for transaction to be included in a block + sut.AwaitNextBlock(t) + + // Extract transaction hash from response for verification + txHash := gjson.Get(actionRequestResp, "txhash").String() + require.NotEmpty(t, txHash, "Transaction hash should not be empty") + t.Logf("Transaction hash: %s", txHash) + + // Query the transaction by hash to verify success and extract events + txResp := cli.CustomQuery("q", "tx", txHash) + t.Logf("Transaction query response: %s", txResp) + + // Verify transaction code indicates success (0 = success) + txCode := gjson.Get(txResp, "code").Int() + require.Equal(t, int64(0), txCode, "Transaction should have success code 0") + + // --------------------------------------- + // Step 8: Extract action ID and start cascade + // --------------------------------------- + t.Log("Step 8: Extracting action ID and creating cascade request") + + // Extract action ID from transaction events + // The action_id is needed to reference this specific action in operations + events := gjson.Get(txResp, "events").Array() + var actionID string + for _, event := range events { + if event.Get("type").String() == "action_registered" { + attrs := event.Get("attributes").Array() + for _, attr := range attrs { + if attr.Get("key").String() == "action_id" { + actionID = attr.Get("value").String() + break + } + } + if actionID != "" { + break + } + } + } + require.NotEmpty(t, actionID, "Action ID should not be empty") + t.Logf("Extracted action ID: %s", actionID) + + // Set up action client configuration + // This defines how to connect to network services + actionConfig := sdkconfig.Config{ + Network: struct { + DefaultSupernodePort int + LocalCosmosAddress string + }{ + DefaultSupernodePort: 4444, // Default supernode gRPC port + LocalCosmosAddress: recoveredAddress, // Account address + }, + Lumera: struct { + GRPCAddr string + ChainID string + Timeout int + }{ + GRPCAddr: lumeraGRPCAddr, + ChainID: lumeraChainID, + Timeout: 300, // 30 seconds timeout + }, + } + + // Initialize action client for cascade operations + actionClient, err := action.NewClient( + ctx, + actionConfig, + nil, // Nil logger - use default + keplrKeyring, // Use the in-memory keyring for signing + ) + require.NoError(t, err, "Failed to create action client") + + // Start cascade operation with all required parameters + // INPUTS: + // - hashHex: File hash for identification + // - actionID: The blockchain action ID + // - testFileName: Path to the original file + // - signedData: Proof of data ownership + t.Logf("Starting cascade operation with action ID: %s", actionID) + taskID, err := actionClient.StartCascade( + ctx, + hashHex, // File hash + actionID, // Action ID from the events + testFileName, // Path to the test file + base64EncodedData, // Signed data + ) + require.NoError(t, err, "Failed to start cascade operation") + require.NotEmpty(t, taskID, "Task ID should not be empty") + t.Logf("Cascade operation started with task ID: %s", taskID) + + // --------------------------------------- + // Step 9: Monitor task completion + // --------------------------------------- + + // Set up event channels for task monitoring + completionCh := make(chan bool) + errorCh := make(chan error) + + // Subscribe to task completion events + actionClient.SubscribeToEvents(ctx, event.TaskCompleted, func(ctx context.Context, e event.Event) { + if e.TaskID == taskID { + t.Logf("Task completed: %s", taskID) + completionCh <- true + } + }) + + // Subscribe to task failure events + actionClient.SubscribeToEvents(ctx, event.TaskFailed, func(ctx context.Context, e event.Event) { + if e.TaskID == taskID { + errorMsg, _ := e.Data["error"].(string) + errorCh <- fmt.Errorf("task failed: %s", errorMsg) + } + }) + + // Wait for task completion, failure, or timeout + t.Log("Waiting for cascade task to complete...") + select { + case <-completionCh: + t.Log("Cascade task completed successfully") + case err := <-errorCh: + t.Fatalf("Cascade task failed: %v", err) + case <-time.After(60 * time.Second): + t.Fatalf("Timeout waiting for cascade task to complete") + } + + // --------------------------------------- + // Step 10: Verify task completion and results + // --------------------------------------- + + // Get the task details to verify status + taskEntry, found := actionClient.GetTask(ctx, taskID) + require.True(t, found, "Task should be found") + require.Equal(t, taskEntry.Status, task.StatusCompleted, "Task should be completed") + t.Logf("Task status: %s", taskEntry.Status) + + // Additional verification based on the events in the task + eventCount := len(taskEntry.Events) + t.Logf("Task recorded %d events", eventCount) + require.Greater(t, eventCount, 0, "Task should have recorded events") + + // Check if we can find a successful supernode in the events + // This confirms the cascade operation was processed correctly + var successfulSupernode string + for _, e := range taskEntry.Events { + if e.Type == event.SupernodeSucceeded { + if addr, ok := e.Data["supernode_address"].(string); ok { + successfulSupernode = addr + break + } + } + } + require.NotEmpty(t, successfulSupernode, "Should have a successful supernode in events") + t.Logf("Cascade successfully processed by supernode: %s", successfulSupernode) +} diff --git a/tests/system/e2e_cascade_test.go b/tests/system/e2e_cascade_test.go index 688564a5..dba1bcb7 100644 --- a/tests/system/e2e_cascade_test.go +++ b/tests/system/e2e_cascade_test.go @@ -70,8 +70,8 @@ func TestCascadeE2E(t *testing.T) { t.Log("Step 1: Starting all services") // Reset and start the blockchain - sut.ResetChain(t) - sut.StartChain(t) + // sut.ResetChain(t) + // sut.StartChain(t) cli := NewLumeradCLI(t, sut, true) // --------------------------------------- // Register Multiple Supernodes to process the request @@ -106,8 +106,8 @@ func TestCascadeE2E(t *testing.T) { // Register three supernodes with different ports registerSupernode("node0", "4444") - registerSupernode("node1", "4446") - registerSupernode("node2", "4448") + // registerSupernode("node1", "4446") + // registerSupernode("node2", "4448") t.Log("Successfully registered three supernodes") @@ -120,8 +120,8 @@ func TestCascadeE2E(t *testing.T) { defer StopRQService(rq_cmd) // Ensure service is stopped after test // Start the supernode service to process cascade requests - cmds := StartAllSupernodes(t) - defer StopAllSupernodes(cmds) // Ensure service is stopped after test + // cmds := StartAllSupernodes(t) + // defer StopAllSupernodes(cmds) // Ensure service is stopped after test // --------------------------------------- // Step 2: Set up test account and keys From 469fc5c6a13b2f18fa7e130d556d08af0c138071 Mon Sep 17 00:00:00 2001 From: Matee ullah Malik Date: Thu, 1 May 2025 03:12:45 +0500 Subject: [PATCH 2/8] Gas Fixes --- pkg/lumera/modules/action_msg/impl.go | 87 +++++++++++++++++++-------- 1 file changed, 61 insertions(+), 26 deletions(-) diff --git a/pkg/lumera/modules/action_msg/impl.go b/pkg/lumera/modules/action_msg/impl.go index 9b27afdf..3c89e143 100644 --- a/pkg/lumera/modules/action_msg/impl.go +++ b/pkg/lumera/modules/action_msg/impl.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" - "github.com/LumeraProtocol/supernode/gen/lumera/action/types" actiontypes "github.com/LumeraProtocol/supernode/gen/lumera/action/types" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/tx" @@ -13,6 +12,7 @@ import ( codectypes "github.com/cosmos/cosmos-sdk/codec/types" cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" "github.com/cosmos/cosmos-sdk/crypto/keyring" + sdk "github.com/cosmos/cosmos-sdk/types" txtypes "github.com/cosmos/cosmos-sdk/types/tx" signingtypes "github.com/cosmos/cosmos-sdk/types/tx/signing" authtx "github.com/cosmos/cosmos-sdk/x/auth/tx" @@ -20,14 +20,28 @@ import ( "google.golang.org/grpc" ) +// Default gas parameters +const ( + defaultGasLimit = uint64(200000) + defaultMinGasLimit = uint64(100000) + defaultMaxGasLimit = uint64(1000000) + defaultGasAdjustment = float64(1.5) + defaultGasPadding = uint64(10000) // Added to simulated gas +) + // module implements the Module interface type module struct { - conn *grpc.ClientConn - client types.MsgClient - kr keyring.Keyring - keyName string - chainID string - nodeAddr string + conn *grpc.ClientConn + client actiontypes.MsgClient + kr keyring.Keyring + keyName string + chainID string + nodeAddr string + gasLimit uint64 // Default gas limit + minGasLimit uint64 // Minimum gas limit + maxGasLimit uint64 // Maximum gas limit + gasAdjustment float64 // Gas adjustment multiplier + gasPadding uint64 // Added to simulated gas } // newModule creates a new ActionMsg module client @@ -52,12 +66,17 @@ func newModule(conn *grpc.ClientConn, kr keyring.Keyring, keyName string, chainI nodeAddr := conn.Target() return &module{ - conn: conn, - client: types.NewMsgClient(conn), - kr: kr, - keyName: keyName, - chainID: chainID, - nodeAddr: nodeAddr, + conn: conn, + client: actiontypes.NewMsgClient(conn), + kr: kr, + keyName: keyName, + chainID: chainID, + nodeAddr: nodeAddr, + gasLimit: defaultGasLimit, + minGasLimit: defaultMinGasLimit, + maxGasLimit: defaultMaxGasLimit, + gasAdjustment: defaultGasAdjustment, + gasPadding: defaultGasPadding, }, nil } @@ -104,7 +123,7 @@ func (m *module) FinalizeCascadeAction( } // Create the message - msg := &types.MsgFinalizeAction{ + msg := &actiontypes.MsgFinalizeAction{ Creator: creator, ActionId: actionId, ActionType: "CASCADE", @@ -127,15 +146,15 @@ func (m *module) FinalizeCascadeAction( WithKeyring(m.kr). WithBroadcastMode("sync") - // Create transaction factory + // Create transaction factory with initial gas values factory := tx.Factory{}. WithTxConfig(clientCtx.TxConfig). WithKeybase(m.kr). WithAccountNumber(accInfo.AccountNumber). WithSequence(accInfo.Sequence). WithChainID(m.chainID). - WithGas(200000). - WithGasAdjustment(1.5). + WithGas(m.gasLimit). // Use default initially + WithGasAdjustment(m.gasAdjustment). // Configurable adjustment WithSignMode(signingtypes.SignMode_SIGN_MODE_DIRECT) // Build unsigned transaction @@ -145,13 +164,26 @@ func (m *module) FinalizeCascadeAction( } // Simulate transaction to get accurate gas estimation - gasInfo, err := m.simulateTx(ctx, clientCtx, txBuilder) + simulatedGas, err := m.simulateTx(ctx, clientCtx, txBuilder) if err != nil { - return nil, fmt.Errorf("failed to simulate transaction: %w", err) + return nil, fmt.Errorf("simulation failed: %v, using default gas limit of %d", err, m.gasLimit) } - // Update gas amount based on simulation - factory = factory.WithGas(gasInfo + 10000) + // Apply gas adjustment and padding to simulated gas value + adjustedGas := uint64(float64(simulatedGas) * m.gasAdjustment) + gasToUse := adjustedGas + m.gasPadding + + // Apply gas bounds + if gasToUse < m.minGasLimit { + return nil, fmt.Errorf("adjusted gas (%d) below minimum, transaction requires minimum gas limit: %d", adjustedGas+m.gasPadding, m.minGasLimit) + } else if gasToUse > m.maxGasLimit { + return nil, fmt.Errorf("adjusted gas (%d) above maximum, transaction exceeds maximum gas limit: %d", adjustedGas+m.gasPadding, m.maxGasLimit) + } + + // Update factory with calculated gas + factory = factory.WithGas(gasToUse) + + // Rebuild transaction with updated gas txBuilder, err = factory.BuildUnsignedTx(msg) if err != nil { return nil, fmt.Errorf("failed to rebuild unsigned tx: %w", err) @@ -173,6 +205,7 @@ func (m *module) FinalizeCascadeAction( if err != nil { return &FinalizeActionResult{ Success: false, + TxHash: "", // Empty when failed }, fmt.Errorf("failed to broadcast transaction: %w", err) } @@ -186,8 +219,9 @@ func (m *module) FinalizeCascadeAction( func (m *module) simulateTx(ctx context.Context, clientCtx client.Context, txBuilder client.TxBuilder) (uint64, error) { txBytes, err := clientCtx.TxConfig.TxEncoder()(txBuilder.GetTx()) if err != nil { - return 0, err + return 0, fmt.Errorf("failed to encode transaction for simulation: %w", err) } + // Create gRPC client for tx service txClient := txtypes.NewServiceClient(m.conn) @@ -198,7 +232,7 @@ func (m *module) simulateTx(ctx context.Context, clientCtx client.Context, txBui simRes, err := txClient.Simulate(ctx, simReq) if err != nil { - return 0, err + return 0, fmt.Errorf("simulation failed: %w", err) } return simRes.GasInfo.GasUsed, nil @@ -217,11 +251,12 @@ func (m *module) broadcastTx(ctx context.Context, txBytes []byte) (*TxResponse, resp, err := txClient.BroadcastTx(ctx, req) if err != nil { - return nil, err + return nil, fmt.Errorf("broadcast failed: %w", err) } if resp.TxResponse.Code != 0 { - return nil, fmt.Errorf("transaction failed: %s", resp.TxResponse.RawLog) + return nil, fmt.Errorf("transaction failed (code %d): %s", + resp.TxResponse.Code, resp.TxResponse.RawLog) } return &TxResponse{ @@ -247,7 +282,7 @@ func (m *module) getAccountInfo(ctx context.Context, address string) (*AccountIn } // Unmarshal account - var account authtypes.AccountI + var account sdk.AccountI err = m.getEncodingConfig().InterfaceRegistry.UnpackAny(resp.Account, &account) if err != nil { return nil, fmt.Errorf("failed to unmarshal account: %w", err) From 43e6b7a92f7d4b4976979a427a3ed8d41ef94375 Mon Sep 17 00:00:00 2001 From: Matee ullah Malik Date: Thu, 1 May 2025 13:08:57 +0500 Subject: [PATCH 3/8] m --- supernode/services/cascade/upload.go | 2 - tests/system/broadcast_tx_test.go | 491 --------------------------- tests/system/e2e_cascade_test.go | 4 +- tests/system/main_test.go | 2 - tests/system/rq-utils.go | 2 - 5 files changed, 2 insertions(+), 499 deletions(-) delete mode 100644 tests/system/broadcast_tx_test.go diff --git a/supernode/services/cascade/upload.go b/supernode/services/cascade/upload.go index 671534ae..d52e46d1 100644 --- a/supernode/services/cascade/upload.go +++ b/supernode/services/cascade/upload.go @@ -143,8 +143,6 @@ func (task *CascadeRegistrationTask) UploadInputData(ctx context.Context, req *U // Message Finalize Action - task.lumeraClient. - return &UploadInputDataResponse{ Success: true, Message: "successfully uploaded input data", diff --git a/tests/system/broadcast_tx_test.go b/tests/system/broadcast_tx_test.go deleted file mode 100644 index 86f9b185..00000000 --- a/tests/system/broadcast_tx_test.go +++ /dev/null @@ -1,491 +0,0 @@ -package system - -import ( - "context" - "crypto/sha3" - "encoding/base64" - "encoding/json" - "fmt" - "io" - "os" - "os/exec" - "path/filepath" - "strings" - "testing" - "time" - - "github.com/LumeraProtocol/supernode/pkg/keyring" - "github.com/LumeraProtocol/supernode/pkg/lumera" - "github.com/LumeraProtocol/supernode/pkg/raptorq" - - "github.com/LumeraProtocol/supernode/sdk/action" - "github.com/LumeraProtocol/supernode/sdk/event" - "github.com/LumeraProtocol/supernode/sdk/task" - - "github.com/LumeraProtocol/supernode/gen/lumera/action/types" - sdkconfig "github.com/LumeraProtocol/supernode/sdk/config" - - "github.com/stretchr/testify/require" - "github.com/tidwall/gjson" -) - -func TestCascadeE2E(t *testing.T) { - // --------------------------------------- - // Constants and Configuration Parameters - // --------------------------------------- - - // Test account credentials - these values are consistent across test runs - const testMnemonic = "odor kiss switch swarm spell make planet bundle skate ozone path planet exclude butter atom ahead angle royal shuffle door prevent merry alter robust" - const expectedAddress = "lumera1em87kgrvgttrkvuamtetyaagjrhnu3vjy44at4" - const testKeyName = "testkey1" - const fundAmount = "1000000ulume" - - // Network and service configuration constants - const ( - raptorQHost = "localhost" // RaptorQ service host - raptorQPort = 50051 // RaptorQ service port - raptorQFilesDir = "./supernode-data/raptorq_files_test" // Directory for RaptorQ files - lumeraGRPCAddr = "localhost:9090" // Lumera blockchain GRPC address - lumeraChainID = "testing" // Lumera chain ID for testing - ) - - // Action request parameters - const ( - actionType = "CASCADE" // The action type for fountain code processing - price = "10ulume" // Price for the action in ulume tokens - ) - t.Log("Step 1: Starting all services") - - // Reset and start the blockchain - // sut.ResetChain(t) - // sut.StartChain(t) - cli := NewLumeradCLI(t, sut, true) - // --------------------------------------- - // Register Multiple Supernodes to process the request - // --------------------------------------- - t.Log("Registering multiple supernodes to process requests") - - // Helper function to register a supernode - registerSupernode := func(nodeKey string, port string) { - // Get account and validator addresses for registration - accountAddr := cli.GetKeyAddr(nodeKey) - valAddrOutput := cli.Keys("keys", "show", nodeKey, "--bech", "val", "-a") - valAddr := strings.TrimSpace(valAddrOutput) - - t.Logf("Registering supernode for %s (validator: %s, account: %s)", nodeKey, valAddr, accountAddr) - - // Register the supernode with the network - registerCmd := []string{ - "tx", "supernode", "register-supernode", - valAddr, // validator address - "localhost:" + port, // IP address with unique port - "1.0.0", // version - accountAddr, // supernode account - "--from", nodeKey, - } - - resp := cli.CustomCommand(registerCmd...) - RequireTxSuccess(t, resp) - - // Wait for transaction to be included in a block - sut.AwaitNextBlock(t) - } - - // Register three supernodes with different ports - registerSupernode("node0", "4444") - // registerSupernode("node1", "4446") - // registerSupernode("node2", "4448") - - t.Log("Successfully registered three supernodes") - - // --------------------------------------- - // Step 1: Start all required services - // --------------------------------------- - - // // Start the RaptorQ service for encoding/decoding with fountain codes - rq_cmd := StartRQService(t) - defer StopRQService(rq_cmd) // Ensure service is stopped after test - - // Start the supernode service to process cascade requests - // cmds := StartAllSupernodes(t) - // defer StopAllSupernodes(cmds) // Ensure service is stopped after test - - // --------------------------------------- - // Step 2: Set up test account and keys - // --------------------------------------- - t.Log("Step 2: Setting up test account") - - // Locate and set up path to binary and home directory - binaryPath := locateExecutable(sut.ExecBinary) - homePath := filepath.Join(WorkDir, sut.outputDir) - - // Add account key to the blockchain using the mnemonic - cmd := exec.Command( - binaryPath, - "keys", "add", testKeyName, - "--recover", - "--keyring-backend=test", - "--home", homePath, - ) - cmd.Stdin = strings.NewReader(testMnemonic + "\n") - output, err := cmd.CombinedOutput() - require.NoError(t, err, "Key recovery failed: %s", string(output)) - t.Logf("Key recovery output: %s", string(output)) - - // Create CLI helper and verify the address matches expected - recoveredAddress := cli.GetKeyAddr(testKeyName) - t.Logf("Recovered key %s with address: %s", testKeyName, recoveredAddress) - require.Equal(t, expectedAddress, recoveredAddress, "Recovered address should match expected address") - - // Fund the account with tokens for transactions - t.Logf("Funding address %s with %s", recoveredAddress, fundAmount) - cli.FundAddress(recoveredAddress, fundAmount) // ulume tokens for action fees - cli.FundAddress(recoveredAddress, "10000000stake") // stake tokens - sut.AwaitNextBlock(t) // Wait for funding transaction to be processed - - // Create an in-memory keyring for cryptographic operations - // This keyring is separate from the blockchain keyring and used for local signing - keplrKeyring, err := keyring.InitKeyring("memory", "") - require.NoError(t, err, "Failed to initialize in-memory keyring") - - // Add the same key to the in-memory keyring for consistency - record, err := keyring.RecoverAccountFromMnemonic(keplrKeyring, testKeyName, testMnemonic) - require.NoError(t, err, "Failed to recover account from mnemonic in local keyring") - - // Verify the addresses match between chain and local keyring - localAddr, err := record.GetAddress() - require.NoError(t, err, "Failed to get address from record") - require.Equal(t, expectedAddress, localAddr.String(), - "Local keyring address should match expected address") - t.Logf("Successfully recovered key in local keyring with matching address: %s", localAddr.String()) - - // Verify account has sufficient balance for transactions - balanceOutput := cli.CustomQuery("query", "bank", "balances", recoveredAddress) - t.Logf("Balance for account: %s", balanceOutput) - require.Contains(t, balanceOutput, fundAmount[:len(fundAmount)-5], - "Account should have the funded amount") - - // --------------------------------------- - // Step 3: Set up RaptorQ service and clients - // --------------------------------------- - t.Log("Step 3: Setting up RaptorQ service and initializing clients") - - // Create directory for RaptorQ files if it doesn't exist - err = os.MkdirAll(raptorQFilesDir, 0755) - require.NoError(t, err, "Failed to create RaptorQ files directory") - - // Create context with timeout for service operations - ctx, cancel := context.WithTimeout(context.Background(), 1200*time.Second) - defer cancel() - - // Initialize RaptorQ configuration with host, port and file directory - rqConfig := raptorq.NewConfig() - rqConfig.Host = raptorQHost - rqConfig.Port = raptorQPort - rqConfig.RqFilesDir = raptorQFilesDir - - // Create RaptorQ client and establish connection - client := raptorq.NewClient() - address := fmt.Sprintf("%s:%d", raptorQHost, raptorQPort) - t.Logf("Connecting to RaptorQ server at %s", address) - connection, err := client.Connect(ctx, address) - require.NoError(t, err, "Failed to connect to RaptorQ server") - defer connection.Close() - - // Initialize Lumera blockchain client for interactions - t.Log("Initializing Lumera client") - lumeraClient, err := lumera.NewClient( - ctx, - lumera.WithGRPCAddr(lumeraGRPCAddr), - lumera.WithChainID(lumeraChainID), - ) - require.NoError(t, err, "Failed to initialize Lumera client") - - // --------------------------------------- - // Step 4: Create and prepare test file - // --------------------------------------- - t.Log("Step 4: Creating test file for RaptorQ encoding") - - // Create a test file with sample data in a temporary directory - testFileName := filepath.Join(t.TempDir(), "testfile.data") - testData := []byte("This is test data for RaptorQ encoding in the Lumera network") - err = os.WriteFile(testFileName, testData, 0644) - require.NoError(t, err, "Failed to write test file") - - // Read the file into memory for processing - file, err := os.Open(testFileName) - require.NoError(t, err, "Failed to open test file") - defer file.Close() - - // Read the entire file content into a byte slice - fileInfo, err := file.Stat() - require.NoError(t, err, "Failed to get file stats") - data := make([]byte, fileInfo.Size()) - _, err = io.ReadFull(file, data) - require.NoError(t, err, "Failed to read file contents") - t.Logf("Read %d bytes from test file", len(data)) - - // Calculate SHA3-256 hash of the file data for identification - // This hash is used in metadata and for CASCADE operation - hash := sha3.New256() - hash.Write(data) - hashBytes := hash.Sum(nil) - hashHex := fmt.Sprintf("%X", hashBytes) - t.Logf("File hash: %s", hashHex) - time.Sleep(1 * time.Minute) - // --------------------------------------- - // Step 5: Sign data and generate RaptorQ identifiers - // --------------------------------------- - t.Log("Step 5: Signing data and generating RaptorQ identifiers") - - // Sign the original file data for verification purposes - // This signature proves the data came from this account - signedData, err := keyring.SignBytes(keplrKeyring, testKeyName, data) - base64EncodedData := base64.StdEncoding.EncodeToString(signedData) - require.NoError(t, err, "Failed to sign data") - t.Logf("Signed data length: %d bytes", len(signedData)) - - // Initialize RaptorQ client for fountain code processing - rq := connection.RaptorQ(rqConfig, lumeraClient) - - // Get current block hash or use file hash as fallback - // Block hash adds randomness to the fountain code generation - // blockHash := hashHex // Default to file hash - // latestBlock, err := lumeraClient.Node().GetLatestBlock(ctx) - // if err == nil && latestBlock != nil && len(latestBlock.BlockId.Hash) > 0 { - // blockHash = fmt.Sprintf("%X", latestBlock.BlockId.Hash) - // } - // t.Logf("Using block hash: %s", blockHash) - - t.Log("Generating RQ identifiers") - genRqIdsResp, err := rq.GenRQIdentifiersFiles(ctx, raptorq.GenRQIdentifiersFilesRequest{ - - RqMax: 50, - Data: data, - CreatorSNAddress: localAddr.String(), - SignedData: base64EncodedData, - }) - require.NoError(t, err, "Failed to generate RQ identifiers") - - t.Logf("RQ identifiers generated successfully with RQ_IDs_IC: %d", genRqIdsResp.RQIDsIc) - - // --------------------------------------- - // Step 6: Sign the RQ IDs file for verification - // --------------------------------------- - t.Log("Step 6: Signing the RQ IDs file for the action request") - - // Base64 encode the RQIDsFile for consistent transmission - // IMPORTANT: We sign the encoded version, not the raw bytes - rqIdsFileBase64 := base64.StdEncoding.EncodeToString(genRqIdsResp.RQIDsFile) - t.Logf("Base64 encoded RQ IDs file length: %d", len(rqIdsFileBase64)) - - // Sign the Base64-encoded RQ IDs file - // This critical step creates a signature that proves ownership - rqIdsSignature, err := keyring.SignBytes(keplrKeyring, testKeyName, []byte(rqIdsFileBase64)) - require.NoError(t, err, "Failed to sign Base64-encoded RQIDsFile") - - // Encode the signature itself to base64 for consistent format - rqIdsSignatureBase64 := base64.StdEncoding.EncodeToString(rqIdsSignature) - - // Format according to expected verification pattern: Base64(rq_ids).signature - // This format allows verification of both the data and the signature - signatureFormat := fmt.Sprintf("%s.%s", rqIdsFileBase64, rqIdsSignatureBase64) - t.Logf("Signature format prepared with length: %d bytes", len(signatureFormat)) - - // --------------------------------------- - // Step 7: Create metadata and submit action request - // --------------------------------------- - t.Log("Step 7: Creating metadata and submitting action request") - - // Create CascadeMetadata struct with all required fields - // This structured approach ensures all required fields are included - cascadeMetadata := types.CascadeMetadata{ - DataHash: hashHex, // Hash of the original file - FileName: filepath.Base(testFileName), // Original filename - RqIdsIc: uint64(genRqIdsResp.RQIDsIc), // Count of RQ identifiers - Signatures: signatureFormat, // Combined signature format - } - - // Marshal the struct to JSON for the blockchain transaction - metadataBytes, err := json.Marshal(cascadeMetadata) - require.NoError(t, err, "Failed to marshal CascadeMetadata to JSON") - metadata := string(metadataBytes) - - // Set expiration time 25 hours in the future (minimum is 24 hours) - // This defines how long the action request is valid - expirationTime := fmt.Sprintf("%d", time.Now().Add(25*time.Hour).Unix()) - - t.Logf("Requesting cascade action with metadata: %s", metadata) - t.Logf("Action type: %s, Price: %s, Expiration: %s", actionType, price, expirationTime) - - // Submit the action request transaction to the blockchain - // This registers the request with metadata for supernodes to process - actionRequestResp := cli.CustomCommand( - "tx", "action", "request-action", - actionType, // CASCADE action type - metadata, // JSON metadata with all required fields - price, // Price in ulume tokens - expirationTime, // Unix timestamp for expiration - "--from", testKeyName, - "--gas", "auto", - "--gas-adjustment", "1.5", - ) - - // Verify the transaction was successful - RequireTxSuccess(t, actionRequestResp) - t.Logf("Action request successful: %s", actionRequestResp) - - // Wait for transaction to be included in a block - sut.AwaitNextBlock(t) - - // Extract transaction hash from response for verification - txHash := gjson.Get(actionRequestResp, "txhash").String() - require.NotEmpty(t, txHash, "Transaction hash should not be empty") - t.Logf("Transaction hash: %s", txHash) - - // Query the transaction by hash to verify success and extract events - txResp := cli.CustomQuery("q", "tx", txHash) - t.Logf("Transaction query response: %s", txResp) - - // Verify transaction code indicates success (0 = success) - txCode := gjson.Get(txResp, "code").Int() - require.Equal(t, int64(0), txCode, "Transaction should have success code 0") - - // --------------------------------------- - // Step 8: Extract action ID and start cascade - // --------------------------------------- - t.Log("Step 8: Extracting action ID and creating cascade request") - - // Extract action ID from transaction events - // The action_id is needed to reference this specific action in operations - events := gjson.Get(txResp, "events").Array() - var actionID string - for _, event := range events { - if event.Get("type").String() == "action_registered" { - attrs := event.Get("attributes").Array() - for _, attr := range attrs { - if attr.Get("key").String() == "action_id" { - actionID = attr.Get("value").String() - break - } - } - if actionID != "" { - break - } - } - } - require.NotEmpty(t, actionID, "Action ID should not be empty") - t.Logf("Extracted action ID: %s", actionID) - - // Set up action client configuration - // This defines how to connect to network services - actionConfig := sdkconfig.Config{ - Network: struct { - DefaultSupernodePort int - LocalCosmosAddress string - }{ - DefaultSupernodePort: 4444, // Default supernode gRPC port - LocalCosmosAddress: recoveredAddress, // Account address - }, - Lumera: struct { - GRPCAddr string - ChainID string - Timeout int - }{ - GRPCAddr: lumeraGRPCAddr, - ChainID: lumeraChainID, - Timeout: 300, // 30 seconds timeout - }, - } - - // Initialize action client for cascade operations - actionClient, err := action.NewClient( - ctx, - actionConfig, - nil, // Nil logger - use default - keplrKeyring, // Use the in-memory keyring for signing - ) - require.NoError(t, err, "Failed to create action client") - - // Start cascade operation with all required parameters - // INPUTS: - // - hashHex: File hash for identification - // - actionID: The blockchain action ID - // - testFileName: Path to the original file - // - signedData: Proof of data ownership - t.Logf("Starting cascade operation with action ID: %s", actionID) - taskID, err := actionClient.StartCascade( - ctx, - hashHex, // File hash - actionID, // Action ID from the events - testFileName, // Path to the test file - base64EncodedData, // Signed data - ) - require.NoError(t, err, "Failed to start cascade operation") - require.NotEmpty(t, taskID, "Task ID should not be empty") - t.Logf("Cascade operation started with task ID: %s", taskID) - - // --------------------------------------- - // Step 9: Monitor task completion - // --------------------------------------- - - // Set up event channels for task monitoring - completionCh := make(chan bool) - errorCh := make(chan error) - - // Subscribe to task completion events - actionClient.SubscribeToEvents(ctx, event.TaskCompleted, func(ctx context.Context, e event.Event) { - if e.TaskID == taskID { - t.Logf("Task completed: %s", taskID) - completionCh <- true - } - }) - - // Subscribe to task failure events - actionClient.SubscribeToEvents(ctx, event.TaskFailed, func(ctx context.Context, e event.Event) { - if e.TaskID == taskID { - errorMsg, _ := e.Data["error"].(string) - errorCh <- fmt.Errorf("task failed: %s", errorMsg) - } - }) - - // Wait for task completion, failure, or timeout - t.Log("Waiting for cascade task to complete...") - select { - case <-completionCh: - t.Log("Cascade task completed successfully") - case err := <-errorCh: - t.Fatalf("Cascade task failed: %v", err) - case <-time.After(60 * time.Second): - t.Fatalf("Timeout waiting for cascade task to complete") - } - - // --------------------------------------- - // Step 10: Verify task completion and results - // --------------------------------------- - - // Get the task details to verify status - taskEntry, found := actionClient.GetTask(ctx, taskID) - require.True(t, found, "Task should be found") - require.Equal(t, taskEntry.Status, task.StatusCompleted, "Task should be completed") - t.Logf("Task status: %s", taskEntry.Status) - - // Additional verification based on the events in the task - eventCount := len(taskEntry.Events) - t.Logf("Task recorded %d events", eventCount) - require.Greater(t, eventCount, 0, "Task should have recorded events") - - // Check if we can find a successful supernode in the events - // This confirms the cascade operation was processed correctly - var successfulSupernode string - for _, e := range taskEntry.Events { - if e.Type == event.SupernodeSucceeded { - if addr, ok := e.Data["supernode_address"].(string); ok { - successfulSupernode = addr - break - } - } - } - require.NotEmpty(t, successfulSupernode, "Should have a successful supernode in events") - t.Logf("Cascade successfully processed by supernode: %s", successfulSupernode) -} diff --git a/tests/system/e2e_cascade_test.go b/tests/system/e2e_cascade_test.go index dba1bcb7..437223a4 100644 --- a/tests/system/e2e_cascade_test.go +++ b/tests/system/e2e_cascade_test.go @@ -70,8 +70,8 @@ func TestCascadeE2E(t *testing.T) { t.Log("Step 1: Starting all services") // Reset and start the blockchain - // sut.ResetChain(t) - // sut.StartChain(t) + sut.ResetChain(t) + sut.StartChain(t) cli := NewLumeradCLI(t, sut, true) // --------------------------------------- // Register Multiple Supernodes to process the request diff --git a/tests/system/main_test.go b/tests/system/main_test.go index 14ea2e8c..67c97efa 100644 --- a/tests/system/main_test.go +++ b/tests/system/main_test.go @@ -1,5 +1,3 @@ -//go:build system_test - package system import ( diff --git a/tests/system/rq-utils.go b/tests/system/rq-utils.go index e20f0755..4fa14695 100644 --- a/tests/system/rq-utils.go +++ b/tests/system/rq-utils.go @@ -1,5 +1,3 @@ -//go:build system_test - package system import ( From 1a1c5fc5016a329e2e4d33af7ddfc9b8c79cfbad Mon Sep 17 00:00:00 2001 From: Matee ullah Malik Date: Thu, 1 May 2025 14:13:28 +0500 Subject: [PATCH 4/8] fixes --- p2p/kademlia/bootstrap.go | 16 +++++++----- p2p/kademlia/dht.go | 4 +++ pkg/log/context.go | 11 +++------ pkg/net/credentials/alts/conn/register.go | 6 ++++- pkg/utils/utils.go | 30 +++++++++++------------ supernode/config.test-2.yml | 2 +- 6 files changed, 39 insertions(+), 30 deletions(-) diff --git a/p2p/kademlia/bootstrap.go b/p2p/kademlia/bootstrap.go index ac13ea34..8a7233ca 100644 --- a/p2p/kademlia/bootstrap.go +++ b/p2p/kademlia/bootstrap.go @@ -9,6 +9,7 @@ import ( "time" "github.com/LumeraProtocol/supernode/pkg/errors" + "github.com/LumeraProtocol/supernode/pkg/utils" "github.com/LumeraProtocol/supernode/pkg/log" ltc "github.com/LumeraProtocol/supernode/pkg/net/credentials" @@ -20,10 +21,10 @@ const ( ) func (s *DHT) skipBadBootstrapAddrs() { - skipAddress1 := fmt.Sprintf("%s:%d", "127.0.0.1", s.options.Port) - skipAddress2 := fmt.Sprintf("%s:%d", "localhost", s.options.Port) - s.cache.Set(skipAddress1, []byte("true")) - s.cache.Set(skipAddress2, []byte("true")) + //skipAddress1 := fmt.Sprintf("%s:%d", "127.0.0.1", s.options.Port) + //skipAddress2 := fmt.Sprintf("%s:%d", "localhost", s.options.Port) + //s.cache.Set(skipAddress1, []byte("true")) + //s.cache.Set(skipAddress2, []byte("true")) } func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) { @@ -31,10 +32,10 @@ func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) { return nil, errors.New("empty address") } - if strings.Contains(extP2P, "0.0.0.0") { + /*if strings.Contains(extP2P, "0.0.0.0") { fmt.Println("skippping node") return nil, errors.New("invalid address") - } + }*/ if extP2P == selfAddr { return nil, errors.New("self address") @@ -157,6 +158,9 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string // Convert the map to a slice for _, node := range mapNodes { + node.Port = node.Port + 1 + hID, _ := utils.Sha3256hash(node.ID) + node.HashedID = hID fmt.Println("node adding", node.String(), "hashed id", string(node.HashedID)) boostrapNodes = append(boostrapNodes, node) } diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index aeb38ab9..5003e7ff 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -148,6 +148,10 @@ func NewDHT(ctx context.Context, store Store, metaStore MetaStore, options *Opti return s, nil } +func (s *DHT) NodesLen() int { + return len(s.ht.nodes()) +} + func (s *DHT) getExternalIP() (string, error) { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/pkg/log/context.go b/pkg/log/context.go index 1988a994..73113514 100644 --- a/pkg/log/context.go +++ b/pkg/log/context.go @@ -2,11 +2,7 @@ package log import ( "context" - "io" - "net" - "net/http" - "github.com/LumeraProtocol/supernode/pkg/errors" "github.com/LumeraProtocol/supernode/pkg/log/hooks" ) @@ -56,11 +52,12 @@ func init() { // GetExternalIPAddress returns external IP address func GetExternalIPAddress() (externalIP string, err error) { - if ip != "" { + return "localhost", nil + /*if ip != "" { return ip, nil } - resp, err := http.Get("http://ipinfo.io/ip") + resp, err := http.Get("https://api.ipify.org") if err != nil { return "", err } @@ -76,5 +73,5 @@ func GetExternalIPAddress() (externalIP string, err error) { return "", errors.Errorf("invalid IP response from %s", "ipconf.ip") } - return string(body), nil + return string(body), nil*/ } diff --git a/pkg/net/credentials/alts/conn/register.go b/pkg/net/credentials/alts/conn/register.go index 24bae62e..eb43410f 100644 --- a/pkg/net/credentials/alts/conn/register.go +++ b/pkg/net/credentials/alts/conn/register.go @@ -4,6 +4,10 @@ import ( . "github.com/LumeraProtocol/supernode/pkg/net/credentials/alts/common" ) +func init() { + RegisterALTSRecordProtocols() +} + var ( // ALTS record protocol names. ALTSRecordProtocols = make([]string, 0) @@ -32,4 +36,4 @@ func RegisterALTSRecordProtocols() { func UnregisterALTSRecordProtocols() { ALTSRecordProtocols = make([]string, 0) -} \ No newline at end of file +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 3024cbc5..1807048d 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -15,7 +15,6 @@ import ( "math" "math/big" "net" - "net/http" "os" "os/exec" "path/filepath" @@ -119,24 +118,25 @@ func IsContextErr(err error) bool { // GetExternalIPAddress returns external IP address func GetExternalIPAddress() (externalIP string, err error) { + return "localhost", nil + /* + resp, err := http.Get("https://api.ipify.org") + if err != nil { + return "", err + } - resp, err := http.Get("http://ipinfo.io/ip") - if err != nil { - return "", err - } - - defer resp.Body.Close() + defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - return "", err - } + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } - if net.ParseIP(string(body)) == nil { - return "", errors.Errorf("invalid IP response from %s", "ipconf.ip") - } + if net.ParseIP(string(body)) == nil { + return "", errors.Errorf("invalid IP response from %s", "ipconf.ip") + } - return string(body), nil + return string(body), nil */ } // B64Encode base64 encodes diff --git a/supernode/config.test-2.yml b/supernode/config.test-2.yml index 08bdabdd..eadea42f 100644 --- a/supernode/config.test-2.yml +++ b/supernode/config.test-2.yml @@ -2,7 +2,7 @@ # Supernode Configuration supernode: key_name: "testkey2" - identity: "lumera1ae37km54w88f783cktmpyd3fny0ycdn69ftt6e" + identity: "lumera1cf0ms9ttgdvz6zwlqfty4tjcawhuaq69p40w0c" ip_address: "0.0.0.0" port: 4446 From d616e5d5dc38e81c47547fe9f77ef528c3e2ec51 Mon Sep 17 00:00:00 2001 From: Matee ullah Malik Date: Thu, 1 May 2025 14:16:26 +0500 Subject: [PATCH 5/8] Add get param functions --- pkg/lumera/modules/action/impl.go | 10 ++++++++++ pkg/lumera/modules/action/interface.go | 1 + pkg/lumera/modules/supernode/impl.go | 12 +++++++++++- pkg/lumera/modules/supernode/interface.go | 1 + 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/pkg/lumera/modules/action/impl.go b/pkg/lumera/modules/action/impl.go index 358aa07c..affa8b91 100644 --- a/pkg/lumera/modules/action/impl.go +++ b/pkg/lumera/modules/action/impl.go @@ -47,3 +47,13 @@ func (m *module) GetActionFee(ctx context.Context, dataSize string) (*types.Quer return resp, nil } + +// GetParams fetches the action module parameters +func (m *module) GetParams(ctx context.Context) (*types.QueryParamsResponse, error) { + resp, err := m.client.Params(ctx, &types.QueryParamsRequest{}) + if err != nil { + return nil, fmt.Errorf("failed to get action params: %w", err) + } + + return resp, nil +} diff --git a/pkg/lumera/modules/action/interface.go b/pkg/lumera/modules/action/interface.go index 329d7889..84746f26 100644 --- a/pkg/lumera/modules/action/interface.go +++ b/pkg/lumera/modules/action/interface.go @@ -12,6 +12,7 @@ import ( type Module interface { GetAction(ctx context.Context, actionID string) (*types.QueryGetActionResponse, error) GetActionFee(ctx context.Context, dataSize string) (*types.QueryGetActionFeeResponse, error) + GetParams(ctx context.Context) (*types.QueryParamsResponse, error) } // NewModule creates a new Action module client diff --git a/pkg/lumera/modules/supernode/impl.go b/pkg/lumera/modules/supernode/impl.go index 6761ab6d..8e14b5a7 100644 --- a/pkg/lumera/modules/supernode/impl.go +++ b/pkg/lumera/modules/supernode/impl.go @@ -56,12 +56,22 @@ func (m *module) GetSupernodeBySupernodeAddress(ctx context.Context, address str SupernodeAddress: address, }) if err != nil { - fmt.Errorf("failed to get supernode: %w", err) + return nil, fmt.Errorf("failed to get supernode: %w", err) } return resp.Supernode, nil } +// GetParams fetches the supernode module parameters +func (m *module) GetParams(ctx context.Context) (*types.QueryParamsResponse, error) { + resp, err := m.client.Params(ctx, &types.QueryParamsRequest{}) + if err != nil { + return nil, fmt.Errorf("failed to get supernode params: %w", err) + } + + return resp, nil +} + func Exists(nodes []*types.SuperNode, snAccAddress string) bool { for _, sn := range nodes { if sn.SupernodeAccount == snAccAddress { diff --git a/pkg/lumera/modules/supernode/interface.go b/pkg/lumera/modules/supernode/interface.go index 6fa6546c..1602e86c 100644 --- a/pkg/lumera/modules/supernode/interface.go +++ b/pkg/lumera/modules/supernode/interface.go @@ -13,6 +13,7 @@ type Module interface { GetTopSuperNodesForBlock(ctx context.Context, blockHeight uint64) (*types.QueryGetTopSuperNodesForBlockResponse, error) GetSuperNode(ctx context.Context, address string) (*types.QueryGetSuperNodeResponse, error) GetSupernodeBySupernodeAddress(ctx context.Context, address string) (*types.SuperNode, error) + GetParams(ctx context.Context) (*types.QueryParamsResponse, error) } // NewModule creates a new SuperNode module client From 2497b0ae29c6c77bc881cc720330edd82e623bfb Mon Sep 17 00:00:00 2001 From: Matee ullah Malik Date: Thu, 1 May 2025 15:35:35 +0500 Subject: [PATCH 6/8] m --- sdk/adapters/lumera/adapter.go | 38 ++++++++++++++++++++++++++++---- sdk/adapters/lumera/types.go | 2 +- sdk/config/config.go | 1 + sdk/task/cascade.go | 18 +++++++-------- sdk/task/manager.go | 5 +++++ tests/system/e2e_cascade_test.go | 5 ++++- 6 files changed, 54 insertions(+), 15 deletions(-) diff --git a/sdk/adapters/lumera/adapter.go b/sdk/adapters/lumera/adapter.go index 7db8b4b4..c632c304 100644 --- a/sdk/adapters/lumera/adapter.go +++ b/sdk/adapters/lumera/adapter.go @@ -3,7 +3,6 @@ package lumera import ( "context" "fmt" - "strconv" "github.com/LumeraProtocol/supernode/sdk/log" @@ -24,6 +23,7 @@ type ConfigParams struct { GRPCAddr string ChainID string Timeout int + KeyName string } type Adapter struct { @@ -65,6 +65,10 @@ func NewAdapter( options = append(options, lumeraclient.WithKeyring(kr)) } + if config.KeyName != "" { + options = append(options, lumeraclient.WithKeyName(config.KeyName)) + } + // Initialize the client client, err := lumeraclient.NewClient(ctx, options...) if err != nil { @@ -89,6 +93,15 @@ func (a *Adapter) GetAction(ctx context.Context, actionID string) (Action, error return Action{}, fmt.Errorf("failed to get action: %w", err) } + // Add validation + if resp == nil { + return Action{}, fmt.Errorf("received nil response for action %s", actionID) + } + + if resp.Action == nil { + return Action{}, fmt.Errorf("action %s not found", actionID) + } + action := toSdkAction(resp) a.logger.Debug(ctx, "Successfully retrieved action", "actionID", action.ID, @@ -101,7 +114,14 @@ func (a *Adapter) GetAction(ctx context.Context, actionID string) (Action, error func (a *Adapter) GetSupernodes(ctx context.Context, height int64) ([]Supernode, error) { a.logger.Debug(ctx, "Getting top supernodes for block", "height", height) - resp, err := a.client.SuperNode().GetTopSuperNodesForBlock(ctx, uint64(height)) + // Safely convert int64 to uint64 + var blockHeight uint64 + if height < 0 { + return nil, fmt.Errorf("invalid block height: %d", height) + } + blockHeight = uint64(height) + + resp, err := a.client.SuperNode().GetTopSuperNodesForBlock(ctx, blockHeight) if err != nil { a.logger.Error(ctx, "Failed to get supernodes", "height", height, "error", err) return nil, fmt.Errorf("failed to get supernodes: %w", err) @@ -112,12 +132,14 @@ func (a *Adapter) GetSupernodes(ctx context.Context, height int64) ([]Supernode, return supernodes, nil } + func toSdkAction(resp *actiontypes.QueryGetActionResponse) Action { + return Action{ ID: resp.Action.ActionID, State: ACTION_STATE(resp.Action.State.String()), - Height: int64(resp.Action.BlockHeight), - ExpirationTime: strconv.FormatInt(resp.Action.ExpirationTime, 10), + Height: resp.Action.BlockHeight, + ExpirationTime: resp.Action.ExpirationTime, } } @@ -133,10 +155,12 @@ func toSdkSupernodes(resp *sntypes.QueryGetTopSuperNodesForBlockResponse) []Supe continue } + // Check if States slice has at least one element if len(sn.States) == 0 { continue } + // Check if the first state is active if sn.States[0].State.String() != string(SUPERNODE_STATE_ACTIVE) { continue } @@ -155,9 +179,15 @@ func getLatestIP(supernode *sntypes.SuperNode) (string, error) { return "", fmt.Errorf("supernode is nil") } + // Check if the slice has elements before accessing it if len(supernode.PrevIpAddresses) == 0 { return "", fmt.Errorf("no ip history exists for the supernode") } + // Access the first element safely + if supernode.PrevIpAddresses[0] == nil { + return "", fmt.Errorf("first IP address in history is nil") + } + return supernode.PrevIpAddresses[0].Address, nil } diff --git a/sdk/adapters/lumera/types.go b/sdk/adapters/lumera/types.go index 721ac134..8edd2642 100644 --- a/sdk/adapters/lumera/types.go +++ b/sdk/adapters/lumera/types.go @@ -28,7 +28,7 @@ type Action struct { ID string State ACTION_STATE Height int64 - ExpirationTime string + ExpirationTime int64 } // Supernode represents information about a supernode in the network diff --git a/sdk/config/config.go b/sdk/config/config.go index cf761a12..fa8cc4e1 100644 --- a/sdk/config/config.go +++ b/sdk/config/config.go @@ -10,5 +10,6 @@ type Config struct { GRPCAddr string ChainID string Timeout int + KeyName string } } diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index fb3bc8cc..4b3fbf9a 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -417,15 +417,15 @@ func (t *CascadeTask) validateAction(ctx context.Context) (lumera.Action, error) return lumera.Action{}, errors.New("no action found with the specified ID") } - // Check action state - if action.State != lumera.ACTION_STATE_PENDING { - t.logger.Error(ctx, "Action is in invalid state", - "taskID", t.TaskID, - "actionID", t.ActionID, - "state", action.State, - "expectedState", lumera.ACTION_STATE_PENDING) - return lumera.Action{}, fmt.Errorf("action is in %s state, expected PENDING", action.State) - } + // // Check action state + // if action.State != lumera.ACTION_STATE_PENDING { + // t.logger.Error(ctx, "Action is in invalid state", + // "taskID", t.TaskID, + // "actionID", t.ActionID, + // "state", action.State, + // "expectedState", lumera.ACTION_STATE_PENDING) + // return lumera.Action{}, fmt.Errorf("action is in %s state, expected PENDING", action.State) + // } t.logger.Debug(ctx, "Action validated successfully", "taskID", t.TaskID, diff --git a/sdk/task/manager.go b/sdk/task/manager.go index 8712e80d..4ce2d09e 100644 --- a/sdk/task/manager.go +++ b/sdk/task/manager.go @@ -56,10 +56,15 @@ func NewManager( GRPCAddr: config.Lumera.GRPCAddr, ChainID: config.Lumera.ChainID, Timeout: config.Lumera.Timeout, + KeyName: config.Lumera.KeyName, }, kr, logger) + if err != nil { + panic(fmt.Sprintf("Failed to create Lumera client: %v", err)) + } + taskCache, err := NewTaskCache(ctx, logger) if err != nil { logger.Error(ctx, "Failed to create task cache", "error", err) diff --git a/tests/system/e2e_cascade_test.go b/tests/system/e2e_cascade_test.go index 437223a4..3a1f37c6 100644 --- a/tests/system/e2e_cascade_test.go +++ b/tests/system/e2e_cascade_test.go @@ -211,6 +211,8 @@ func TestCascadeE2E(t *testing.T) { ctx, lumera.WithGRPCAddr(lumeraGRPCAddr), lumera.WithChainID(lumeraChainID), + lumera.WithKeyring(keplrKeyring), + lumera.WithKeyName(testKeyName), ) require.NoError(t, err, "Failed to initialize Lumera client") @@ -245,7 +247,6 @@ func TestCascadeE2E(t *testing.T) { hashBytes := hash.Sum(nil) hashHex := fmt.Sprintf("%X", hashBytes) t.Logf("File hash: %s", hashHex) - time.Sleep(1 * time.Minute) // --------------------------------------- // Step 5: Sign data and generate RaptorQ identifiers // --------------------------------------- @@ -404,10 +405,12 @@ func TestCascadeE2E(t *testing.T) { GRPCAddr string ChainID string Timeout int + KeyName string }{ GRPCAddr: lumeraGRPCAddr, ChainID: lumeraChainID, Timeout: 300, // 30 seconds timeout + KeyName: testKeyName, }, } From cbebf6010bca8ecd6a262038334c419450116817 Mon Sep 17 00:00:00 2001 From: Matee ullah Malik Date: Thu, 1 May 2025 16:01:03 +0500 Subject: [PATCH 7/8] a --- sdk/net/factory.go | 1 + sdk/task/cascade.go | 18 +++++++++--------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/sdk/net/factory.go b/sdk/net/factory.go index 731461cc..80ca0978 100644 --- a/sdk/net/factory.go +++ b/sdk/net/factory.go @@ -57,6 +57,7 @@ func (f *ClientFactory) CreateClient(ctx context.Context, supernode lumera.Super // Ensure endpoint has port endpoint := AddPortIfMissing(supernode.GrpcEndpoint, f.config.DefaultSupernodePort) + fmt.Printf("Supernode endpoint: %s\n", endpoint) f.logger.Debug(ctx, "Creating supernode client", "supernode", supernode.CosmosAddress, "endpoint", endpoint) diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index 4b3fbf9a..fb3bc8cc 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -417,15 +417,15 @@ func (t *CascadeTask) validateAction(ctx context.Context) (lumera.Action, error) return lumera.Action{}, errors.New("no action found with the specified ID") } - // // Check action state - // if action.State != lumera.ACTION_STATE_PENDING { - // t.logger.Error(ctx, "Action is in invalid state", - // "taskID", t.TaskID, - // "actionID", t.ActionID, - // "state", action.State, - // "expectedState", lumera.ACTION_STATE_PENDING) - // return lumera.Action{}, fmt.Errorf("action is in %s state, expected PENDING", action.State) - // } + // Check action state + if action.State != lumera.ACTION_STATE_PENDING { + t.logger.Error(ctx, "Action is in invalid state", + "taskID", t.TaskID, + "actionID", t.ActionID, + "state", action.State, + "expectedState", lumera.ACTION_STATE_PENDING) + return lumera.Action{}, fmt.Errorf("action is in %s state, expected PENDING", action.State) + } t.logger.Debug(ctx, "Action validated successfully", "taskID", t.TaskID, From 546a0611892f80c0943878729c9dfd6ef2abfa7d Mon Sep 17 00:00:00 2001 From: Matee ullah Malik Date: Thu, 1 May 2025 21:39:58 +0500 Subject: [PATCH 8/8] Finalize Action --- pkg/lumera/modules/action_msg/impl.go | 148 +++++++++++++++------ pkg/lumera/modules/action_msg/interface.go | 3 +- supernode/cmd/supernode.go | 1 + supernode/services/cascade/upload.go | 18 +++ tests/system/cli.go | 9 ++ tests/system/e2e_cascade_test.go | 20 ++- 6 files changed, 147 insertions(+), 52 deletions(-) diff --git a/pkg/lumera/modules/action_msg/impl.go b/pkg/lumera/modules/action_msg/impl.go index 3c89e143..1e32be43 100644 --- a/pkg/lumera/modules/action_msg/impl.go +++ b/pkg/lumera/modules/action_msg/impl.go @@ -6,13 +6,14 @@ import ( "fmt" actiontypes "github.com/LumeraProtocol/supernode/gen/lumera/action/types" + "github.com/LumeraProtocol/supernode/pkg/logtrace" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/tx" "github.com/cosmos/cosmos-sdk/codec" + "github.com/cosmos/cosmos-sdk/codec/types" codectypes "github.com/cosmos/cosmos-sdk/codec/types" cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" "github.com/cosmos/cosmos-sdk/crypto/keyring" - sdk "github.com/cosmos/cosmos-sdk/types" txtypes "github.com/cosmos/cosmos-sdk/types/tx" signingtypes "github.com/cosmos/cosmos-sdk/types/tx/signing" authtx "github.com/cosmos/cosmos-sdk/x/auth/tx" @@ -25,8 +26,8 @@ const ( defaultGasLimit = uint64(200000) defaultMinGasLimit = uint64(100000) defaultMaxGasLimit = uint64(1000000) - defaultGasAdjustment = float64(1.5) - defaultGasPadding = uint64(10000) // Added to simulated gas + defaultGasAdjustment = float64(3.0) + defaultGasPadding = uint64(50000) ) // module implements the Module interface @@ -36,12 +37,11 @@ type module struct { kr keyring.Keyring keyName string chainID string - nodeAddr string - gasLimit uint64 // Default gas limit - minGasLimit uint64 // Minimum gas limit - maxGasLimit uint64 // Maximum gas limit - gasAdjustment float64 // Gas adjustment multiplier - gasPadding uint64 // Added to simulated gas + gasLimit uint64 + minGasLimit uint64 + maxGasLimit uint64 + gasAdjustment float64 + gasPadding uint64 } // newModule creates a new ActionMsg module client @@ -62,16 +62,12 @@ func newModule(conn *grpc.ClientConn, kr keyring.Keyring, keyName string, chainI return nil, fmt.Errorf("chain ID cannot be empty") } - // Extract node address from connection - nodeAddr := conn.Target() - return &module{ conn: conn, client: actiontypes.NewMsgClient(conn), kr: kr, keyName: keyName, chainID: chainID, - nodeAddr: nodeAddr, gasLimit: defaultGasLimit, minGasLimit: defaultMinGasLimit, maxGasLimit: defaultMaxGasLimit, @@ -85,7 +81,7 @@ func (m *module) FinalizeCascadeAction( ctx context.Context, actionId string, rqIdsIds []string, - rqIdsOti []string, + rqIdsOti []byte, ) (*FinalizeActionResult, error) { // Basic validation if actionId == "" { @@ -110,6 +106,8 @@ func (m *module) FinalizeCascadeAction( } creator := addr.String() + logtrace.Info(ctx, "finalize action started", logtrace.Fields{"creator": creator}) + // Create CASCADE metadata metadata := map[string]interface{}{ "rq_ids_ids": rqIdsIds, @@ -139,6 +137,8 @@ func (m *module) FinalizeCascadeAction( return nil, fmt.Errorf("failed to get account info: %w", err) } + logtrace.Info(ctx, "account info retrieved", logtrace.Fields{"accountNumber": accInfo.AccountNumber}) + // Create client context with keyring clientCtx := client.Context{}. WithCodec(encCfg.Codec). @@ -146,55 +146,68 @@ func (m *module) FinalizeCascadeAction( WithKeyring(m.kr). WithBroadcastMode("sync") - // Create transaction factory with initial gas values - factory := tx.Factory{}. + // Simulate transaction to get gas estimate + txBuilder, err := tx.Factory{}. WithTxConfig(clientCtx.TxConfig). WithKeybase(m.kr). WithAccountNumber(accInfo.AccountNumber). WithSequence(accInfo.Sequence). WithChainID(m.chainID). - WithGas(m.gasLimit). // Use default initially - WithGasAdjustment(m.gasAdjustment). // Configurable adjustment - WithSignMode(signingtypes.SignMode_SIGN_MODE_DIRECT) - - // Build unsigned transaction - txBuilder, err := factory.BuildUnsignedTx(msg) + WithGas(m.gasLimit). + WithGasAdjustment(m.gasAdjustment). + WithSignMode(signingtypes.SignMode_SIGN_MODE_DIRECT). + BuildUnsignedTx(msg) if err != nil { - return nil, fmt.Errorf("failed to build unsigned tx: %w", err) + return nil, fmt.Errorf("failed to build unsigned tx for simulation: %w", err) } - - // Simulate transaction to get accurate gas estimation + pubKey, err := key.GetPubKey() + if err != nil { + return nil, fmt.Errorf("failed to get public key: %w", err) + } + txBuilder.SetSignatures(signingtypes.SignatureV2{ + PubKey: pubKey, // your signing pubkey + Data: &signingtypes.SingleSignatureData{SignMode: signingtypes.SignMode_SIGN_MODE_DIRECT, Signature: nil}, + Sequence: accInfo.Sequence, + }) simulatedGas, err := m.simulateTx(ctx, clientCtx, txBuilder) if err != nil { - return nil, fmt.Errorf("simulation failed: %v, using default gas limit of %d", err, m.gasLimit) + return nil, fmt.Errorf("simulation failed: %w", err) } - // Apply gas adjustment and padding to simulated gas value adjustedGas := uint64(float64(simulatedGas) * m.gasAdjustment) gasToUse := adjustedGas + m.gasPadding // Apply gas bounds - if gasToUse < m.minGasLimit { - return nil, fmt.Errorf("adjusted gas (%d) below minimum, transaction requires minimum gas limit: %d", adjustedGas+m.gasPadding, m.minGasLimit) - } else if gasToUse > m.maxGasLimit { - return nil, fmt.Errorf("adjusted gas (%d) above maximum, transaction exceeds maximum gas limit: %d", adjustedGas+m.gasPadding, m.maxGasLimit) + if gasToUse > m.maxGasLimit { + gasToUse = m.maxGasLimit } - // Update factory with calculated gas - factory = factory.WithGas(gasToUse) + logtrace.Info(ctx, "using simulated gas", logtrace.Fields{"simulatedGas": simulatedGas, "adjustedGas": gasToUse}) + + // Create transaction factory with final gas + factory := tx.Factory{}. + WithTxConfig(clientCtx.TxConfig). + WithKeybase(m.kr). + WithAccountNumber(accInfo.AccountNumber). + WithSequence(accInfo.Sequence). + WithChainID(m.chainID). + WithGas(gasToUse). + WithGasAdjustment(m.gasAdjustment). + WithSignMode(signingtypes.SignMode_SIGN_MODE_DIRECT) - // Rebuild transaction with updated gas + // Build and sign transaction txBuilder, err = factory.BuildUnsignedTx(msg) if err != nil { - return nil, fmt.Errorf("failed to rebuild unsigned tx: %w", err) + return nil, fmt.Errorf("failed to build unsigned tx: %w", err) } - // Sign transaction err = tx.Sign(ctx, factory, m.keyName, txBuilder, true) if err != nil { return nil, fmt.Errorf("failed to sign transaction: %w", err) } + logtrace.Info(ctx, "transaction signed successfully", nil) + // Broadcast transaction txBytes, err := clientCtx.TxConfig.TxEncoder()(txBuilder.GetTx()) if err != nil { @@ -205,23 +218,38 @@ func (m *module) FinalizeCascadeAction( if err != nil { return &FinalizeActionResult{ Success: false, - TxHash: "", // Empty when failed + TxHash: "", }, fmt.Errorf("failed to broadcast transaction: %w", err) } + logtrace.Info(ctx, "transaction broadcast success", logtrace.Fields{"txHash": resp.TxHash}) + return &FinalizeActionResult{ TxHash: resp.TxHash, + Code: resp.Code, Success: true, }, nil } // Helper function to simulate transaction and return gas used func (m *module) simulateTx(ctx context.Context, clientCtx client.Context, txBuilder client.TxBuilder) (uint64, error) { - txBytes, err := clientCtx.TxConfig.TxEncoder()(txBuilder.GetTx()) + // First, let's see what's in the txBuilder + tx := txBuilder.GetTx() + logtrace.Info(ctx, "transaction for simulation", logtrace.Fields{ + "messages": fmt.Sprintf("%v", tx.GetMsgs()), + "fee": fmt.Sprintf("%v", tx.GetFee()), + "gas": tx.GetGas(), + }) + + txBytes, err := clientCtx.TxConfig.TxEncoder()(tx) if err != nil { return 0, fmt.Errorf("failed to encode transaction for simulation: %w", err) } + logtrace.Info(ctx, "transaction encoded for simulation", logtrace.Fields{ + "bytesLength": len(txBytes), + }) + // Create gRPC client for tx service txClient := txtypes.NewServiceClient(m.conn) @@ -230,11 +258,33 @@ func (m *module) simulateTx(ctx context.Context, clientCtx client.Context, txBui TxBytes: txBytes, } + // Check if we have the tx in the request too + if simReq.Tx != nil { + logtrace.Info(ctx, "simulation request has tx field", logtrace.Fields{ + "txFieldPresent": true, + }) + } + + logtrace.Info(ctx, "sending simulation request", logtrace.Fields{ + "requestBytes": len(simReq.TxBytes), + "requestType": fmt.Sprintf("%T", simReq), + }) + simRes, err := txClient.Simulate(ctx, simReq) if err != nil { - return 0, fmt.Errorf("simulation failed: %w", err) + logtrace.Error(ctx, "simulation error details", logtrace.Fields{ + "error": err.Error(), + "errorType": fmt.Sprintf("%T", err), + "requestBytes": len(simReq.TxBytes), + }) + return 0, fmt.Errorf("simulation error: %w", err) } + logtrace.Info(ctx, "simulation response", logtrace.Fields{ + "gasUsed": simRes.GasInfo.GasUsed, + "gasWanted": simRes.GasInfo.GasWanted, + }) + return simRes.GasInfo.GasUsed, nil } @@ -282,20 +332,28 @@ func (m *module) getAccountInfo(ctx context.Context, address string) (*AccountIn } // Unmarshal account - var account sdk.AccountI + var account authtypes.AccountI err = m.getEncodingConfig().InterfaceRegistry.UnpackAny(resp.Account, &account) if err != nil { - return nil, fmt.Errorf("failed to unmarshal account: %w", err) + return nil, fmt.Errorf("failed to unpack account: %w", err) + } + + // Convert to BaseAccount + baseAcc, ok := account.(*authtypes.BaseAccount) + if !ok { + return nil, fmt.Errorf("received account is not a BaseAccount") } return &AccountInfo{ - AccountNumber: account.GetAccountNumber(), - Sequence: account.GetSequence(), + AccountNumber: baseAcc.AccountNumber, + Sequence: baseAcc.Sequence, }, nil } // makeEncodingConfig creates an EncodingConfig for transaction handling func makeEncodingConfig() EncodingConfig { + amino := codec.NewLegacyAmino() + interfaceRegistry := codectypes.NewInterfaceRegistry() cryptocodec.RegisterInterfaces(interfaceRegistry) authtypes.RegisterInterfaces(interfaceRegistry) @@ -308,6 +366,7 @@ func makeEncodingConfig() EncodingConfig { InterfaceRegistry: interfaceRegistry, Codec: marshaler, TxConfig: txConfig, + Amino: amino, } } @@ -318,9 +377,10 @@ func (m *module) getEncodingConfig() EncodingConfig { // EncodingConfig specifies the concrete encoding types to use type EncodingConfig struct { - InterfaceRegistry codectypes.InterfaceRegistry + InterfaceRegistry types.InterfaceRegistry Codec codec.Codec TxConfig client.TxConfig + Amino *codec.LegacyAmino } // AccountInfo holds account information for transaction signing diff --git a/pkg/lumera/modules/action_msg/interface.go b/pkg/lumera/modules/action_msg/interface.go index e9092378..210cf8f8 100644 --- a/pkg/lumera/modules/action_msg/interface.go +++ b/pkg/lumera/modules/action_msg/interface.go @@ -11,6 +11,7 @@ import ( // FinalizeActionResult represents the result of a finalized action type FinalizeActionResult struct { TxHash string // Transaction hash + Code uint32 // Code of the transaction Success bool // Whether the transaction was successful } @@ -21,7 +22,7 @@ type Module interface { ctx context.Context, actionId string, rqIdsIds []string, - rqIdsOti []string, + rqIdsOti []byte, ) (*FinalizeActionResult, error) } diff --git a/supernode/cmd/supernode.go b/supernode/cmd/supernode.go index af8d0111..085f670c 100644 --- a/supernode/cmd/supernode.go +++ b/supernode/cmd/supernode.go @@ -143,6 +143,7 @@ func initLumeraClient(ctx context.Context, config *config.Config, kr keyring.Key lumera.WithChainID(config.LumeraClientConfig.ChainID), lumera.WithTimeout(config.LumeraClientConfig.Timeout), lumera.WithKeyring(kr), + lumera.WithKeyName(config.SupernodeConfig.KeyName), ) } diff --git a/supernode/services/cascade/upload.go b/supernode/services/cascade/upload.go index d52e46d1..29a3142c 100644 --- a/supernode/services/cascade/upload.go +++ b/supernode/services/cascade/upload.go @@ -133,6 +133,24 @@ func (task *CascadeRegistrationTask) UploadInputData(ctx context.Context, req *U } logtrace.Info(ctx, "id files have been stored", fields) + faresponse, err := task.lumeraClient.ActionMsg().FinalizeCascadeAction(ctx, actionDetails.ActionID, res.RQIDs, task.RQInfo.rqIDEncodeParams.Oti) + if err != nil { + logtrace.Info(ctx, "failed to finalize action", logtrace.Fields{ + logtrace.FieldError: err.Error(), + "taskID": task.ID(), + "actionID": actionDetails.ActionID}) + } + + if faresponse == nil { + fields[logtrace.FieldError] = "finalize action response is nil" + } + + logtrace.Info(ctx, "finalize action response", logtrace.Fields{ + "taskID": task.ID(), + "actionID": actionDetails.ActionID, + "response": faresponse.TxHash, + "Code": faresponse.Code}) + // Store RaptorQ symbols if err = task.storeRaptorQSymbols(ctx); err != nil { fields[logtrace.FieldError] = err.Error() diff --git a/tests/system/cli.go b/tests/system/cli.go index de35b5a1..351156c1 100644 --- a/tests/system/cli.go +++ b/tests/system/cli.go @@ -290,6 +290,15 @@ func (c LumeradCli) GetKeyAddr(name string) string { const defaultSrcAddr = "node0" +func (c LumeradCli) FundAddressWithNode(destAddr, amount string, nodeAddr string) string { + require.NotEmpty(c.t, destAddr) + require.NotEmpty(c.t, amount) + cmd := []string{"tx", "bank", "send", nodeAddr, destAddr, amount} + rsp := c.CustomCommand(cmd...) + RequireTxSuccess(c.t, rsp) + return rsp +} + // FundAddress sends the token amount to the destination address func (c LumeradCli) FundAddress(destAddr, amount string) string { require.NotEmpty(c.t, destAddr) diff --git a/tests/system/e2e_cascade_test.go b/tests/system/e2e_cascade_test.go index 3a1f37c6..7ebcec21 100644 --- a/tests/system/e2e_cascade_test.go +++ b/tests/system/e2e_cascade_test.go @@ -79,7 +79,7 @@ func TestCascadeE2E(t *testing.T) { t.Log("Registering multiple supernodes to process requests") // Helper function to register a supernode - registerSupernode := func(nodeKey string, port string) { + registerSupernode := func(nodeKey string, port string, addr string) { // Get account and validator addresses for registration accountAddr := cli.GetKeyAddr(nodeKey) valAddrOutput := cli.Keys("keys", "show", nodeKey, "--bech", "val", "-a") @@ -93,7 +93,7 @@ func TestCascadeE2E(t *testing.T) { valAddr, // validator address "localhost:" + port, // IP address with unique port "1.0.0", // version - accountAddr, // supernode account + addr, // supernode account "--from", nodeKey, } @@ -105,10 +105,13 @@ func TestCascadeE2E(t *testing.T) { } // Register three supernodes with different ports - registerSupernode("node0", "4444") - // registerSupernode("node1", "4446") - // registerSupernode("node2", "4448") + registerSupernode("node0", "4444", "lumera1em87kgrvgttrkvuamtetyaagjrhnu3vjy44at4") + registerSupernode("node1", "4446", "lumera1cf0ms9ttgdvz6zwlqfty4tjcawhuaq69p40w0c") + registerSupernode("node2", "4448", "lumera1cjyc4ruq739e2lakuhargejjkr0q5vg6x3d7kp") + cli.FundAddress("lumera1em87kgrvgttrkvuamtetyaagjrhnu3vjy44at4", "100000ulume") + cli.FundAddressWithNode("lumera1cf0ms9ttgdvz6zwlqfty4tjcawhuaq69p40w0c", "100000ulume", "node1") + cli.FundAddressWithNode("lumera1cjyc4ruq739e2lakuhargejjkr0q5vg6x3d7kp", "100000ulume", "node2") t.Log("Successfully registered three supernodes") // --------------------------------------- @@ -120,8 +123,8 @@ func TestCascadeE2E(t *testing.T) { defer StopRQService(rq_cmd) // Ensure service is stopped after test // Start the supernode service to process cascade requests - // cmds := StartAllSupernodes(t) - // defer StopAllSupernodes(cmds) // Ensure service is stopped after test + cmds := StartAllSupernodes(t) + defer StopAllSupernodes(cmds) // Ensure service is stopped after test // --------------------------------------- // Step 2: Set up test account and keys @@ -247,6 +250,7 @@ func TestCascadeE2E(t *testing.T) { hashBytes := hash.Sum(nil) hashHex := fmt.Sprintf("%X", hashBytes) t.Logf("File hash: %s", hashHex) + time.Sleep(1 * time.Minute) // --------------------------------------- // Step 5: Sign data and generate RaptorQ identifiers // --------------------------------------- @@ -502,6 +506,8 @@ func TestCascadeE2E(t *testing.T) { } } } + + time.Sleep(10 * time.Minute) // Wait for supernode processing require.NotEmpty(t, successfulSupernode, "Should have a successful supernode in events") t.Logf("Cascade successfully processed by supernode: %s", successfulSupernode) }