diff --git a/pkg/lumera/config.go b/pkg/lumera/config.go index 9c9208bc..52343727 100644 --- a/pkg/lumera/config.go +++ b/pkg/lumera/config.go @@ -1,6 +1,10 @@ package lumera -import "github.com/cosmos/cosmos-sdk/crypto/keyring" +import ( + "time" + + "github.com/cosmos/cosmos-sdk/crypto/keyring" +) // Config holds all the configuration needed for the client type Config struct { @@ -10,8 +14,8 @@ type Config struct { // ChainID is the ID of the chain ChainID string - // Timeout is the default request timeout in seconds - Timeout int + // Timeout is the default request timeout + Timeout time.Duration // keyring is the keyring conf for the node sign & verify keyring keyring.Keyring diff --git a/pkg/lumera/options.go b/pkg/lumera/options.go index 7bc5220e..c8ccacb9 100644 --- a/pkg/lumera/options.go +++ b/pkg/lumera/options.go @@ -1,6 +1,10 @@ package lumera -import "github.com/cosmos/cosmos-sdk/crypto/keyring" +import ( + "time" + + "github.com/cosmos/cosmos-sdk/crypto/keyring" +) // Option is a function that applies a change to Config type Option func(*Config) @@ -20,9 +24,9 @@ func WithChainID(chainID string) Option { } // WithTimeout sets the default timeout -func WithTimeout(seconds int) Option { +func WithTimeout(duration time.Duration) Option { return func(c *Config) { - c.Timeout = seconds + c.Timeout = duration } } diff --git a/sdk/action/client.go b/sdk/action/client.go index 993cca26..15c9b46f 100644 --- a/sdk/action/client.go +++ b/sdk/action/client.go @@ -3,6 +3,7 @@ package action import ( "context" "fmt" + "os" "github.com/LumeraProtocol/supernode/sdk/config" "github.com/LumeraProtocol/supernode/sdk/event" @@ -17,8 +18,8 @@ type Client interface { StartCascade(ctx context.Context, fileHash string, actionID string, filePath string, signedData string) (string, error) DeleteTask(ctx context.Context, taskID string) error GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool) - SubscribeToEvents(ctx context.Context, eventType event.EventType, 
handler event.Handler) - SubscribeToAllEvents(ctx context.Context, handler event.Handler) + SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) error + SubscribeToAllEvents(ctx context.Context, handler event.Handler) error } // ClientImpl implements the Client interface @@ -33,12 +34,7 @@ type ClientImpl struct { var _ Client = (*ClientImpl)(nil) // NewClient creates a new action client -func NewClient( - ctx context.Context, - config config.Config, - logger log.Logger, - keyring keyring.Keyring, -) (Client, error) { +func NewClient(ctx context.Context, config config.Config, logger log.Logger, keyring keyring.Keyring) (Client, error) { if logger == nil { logger = log.NewNoopLogger() } @@ -57,12 +53,11 @@ func NewClient( } // StartCascade initiates a cascade operation -func (c *ClientImpl) StartCascade( - ctx context.Context, - fileHash string, // Hash of the file to process - actionID string, // ID of the action to perform - filePath string, // Path to the file on disk - signedData string, // Optional signed authorization data +func (c *ClientImpl) StartCascade(ctx context.Context, + fileHash string, + actionID string, + filePath string, + signedData string, ) (string, error) { c.logger.Debug(ctx, "Starting cascade operation", "fileHash", fileHash, @@ -82,6 +77,11 @@ func (c *ClientImpl) StartCascade( c.logger.Error(ctx, "Empty file path provided") return "", ErrEmptyFilePath } + _, err := os.Stat(filePath) + if err != nil { + c.logger.Error(ctx, "File not found", "filePath", filePath) + return "", ErrEmptyFileNotFound + } taskID, err := c.taskManager.CreateCascadeTask(ctx, fileHash, actionID, filePath, signedData) if err != nil { @@ -95,14 +95,13 @@ func (c *ClientImpl) StartCascade( // GetTask retrieves a task by its ID func (c *ClientImpl) GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool) { - c.logger.Debug(ctx, "Getting task", "taskID", taskID) task, found := c.taskManager.GetTask(ctx, taskID) - if !found { 
- c.logger.Debug(ctx, "Task not found", "taskID", taskID) - } else { - c.logger.Debug(ctx, "Task found", "taskID", taskID, "status", task.Status) + if found { + return task, true } - return task, found + c.logger.Debug(ctx, "Task not found", "taskID", taskID) + + return nil, false } // DeleteTask removes a task by its ID @@ -113,32 +112,35 @@ func (c *ClientImpl) DeleteTask(ctx context.Context, taskID string) error { return fmt.Errorf("task ID cannot be empty") } - err := c.taskManager.DeleteTask(ctx, taskID) - if err != nil { + if err := c.taskManager.DeleteTask(ctx, taskID); err != nil { c.logger.Error(ctx, "Failed to delete task", "taskID", taskID, "error", err) return fmt.Errorf("failed to delete task: %w", err) } - c.logger.Info(ctx, "Task deleted successfully", "taskID", taskID) + return nil } // SubscribeToEvents registers a handler for specific event types -func (c *ClientImpl) SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) { - c.logger.Debug(ctx, "Subscribing to events via task manager", "eventType", eventType) - if c.taskManager != nil { - c.taskManager.SubscribeToEvents(ctx, eventType, handler) - } else { - c.logger.Warn(ctx, "TaskManager is nil, cannot subscribe to events") +func (c *ClientImpl) SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) error { + if c.taskManager == nil { + return fmt.Errorf("TaskManager is nil, cannot subscribe to events") } + + c.logger.Debug(ctx, "Subscribing to events via task manager", "eventType", eventType) + c.taskManager.SubscribeToEvents(ctx, eventType, handler) + + return nil } // SubscribeToAllEvents registers a handler for all events -func (c *ClientImpl) SubscribeToAllEvents(ctx context.Context, handler event.Handler) { - c.logger.Debug(ctx, "Subscribing to all events via task manager") - if c.taskManager != nil { - c.taskManager.SubscribeToAllEvents(ctx, handler) - } else { - c.logger.Warn(ctx, "TaskManager is nil, cannot subscribe to 
all events") +func (c *ClientImpl) SubscribeToAllEvents(ctx context.Context, handler event.Handler) error { + if c.taskManager == nil { + return fmt.Errorf("TaskManager is nil, cannot subscribe to events") } + + c.logger.Debug(ctx, "Subscribing to all events via task manager") + c.taskManager.SubscribeToAllEvents(ctx, handler) + + return nil } diff --git a/sdk/action/errors.go b/sdk/action/errors.go index a22bbc1d..b6af6969 100644 --- a/sdk/action/errors.go +++ b/sdk/action/errors.go @@ -6,14 +6,15 @@ import ( ) var ( - ErrEmptyFileHash = errors.New("file hash cannot be empty") - ErrEmptyActionID = errors.New("action ID cannot be empty") - ErrEmptyFilePath = errors.New("file path cannot be empty") - ErrNoValidAction = errors.New("no action found with the specified ID") - ErrInvalidAction = errors.New("action is not in a valid state") - ErrNoSupernodes = errors.New("no valid supernodes available") - ErrTaskCreation = errors.New("failed to create task") - ErrCommunication = errors.New("communication with supernode failed") + ErrEmptyFileHash = errors.New("file hash cannot be empty") + ErrEmptyActionID = errors.New("action ID cannot be empty") + ErrEmptyFilePath = errors.New("file path cannot be empty") + ErrEmptyFileNotFound = errors.New("file not found at the specified path") + ErrNoValidAction = errors.New("no action found with the specified ID") + ErrInvalidAction = errors.New("action is not in a valid state") + ErrNoSupernodes = errors.New("no valid supernodes available") + ErrTaskCreation = errors.New("failed to create task") + ErrCommunication = errors.New("communication with supernode failed") ) // SupernodeError represents an error related to supernode operations diff --git a/sdk/adapters/lumera/adapter.go b/sdk/adapters/lumera/adapter.go index 7db8b4b4..953cee37 100644 --- a/sdk/adapters/lumera/adapter.go +++ b/sdk/adapters/lumera/adapter.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "time" "github.com/LumeraProtocol/supernode/sdk/log" @@ -23,7 
+24,7 @@ type Client interface { type ConfigParams struct { GRPCAddr string ChainID string - Timeout int + Timeout time.Duration } type Adapter struct { diff --git a/sdk/adapters/lumera/types.go b/sdk/adapters/lumera/types.go index 721ac134..236cb76b 100644 --- a/sdk/adapters/lumera/types.go +++ b/sdk/adapters/lumera/types.go @@ -31,6 +31,8 @@ type Action struct { ExpirationTime string } +type Supernodes []Supernode + // Supernode represents information about a supernode in the network type Supernode struct { CosmosAddress string // Blockchain identity of the supernode diff --git a/sdk/config/config.go b/sdk/config/config.go index cf761a12..556a2dbc 100644 --- a/sdk/config/config.go +++ b/sdk/config/config.go @@ -1,14 +1,86 @@ package config +import ( + "errors" + "time" +) + +const ( + DefaultLocalCosmosAddress = "lumera1qv3" // Example address - replace with actual + DefaultChainID = "lumera-testnet" // Example chain ID - replace with actual + DefaultGRPCAddr = "127.0.0.1:9090" + DefaultTimeout = 10 * time.Second +) + +// AccountConfig holds peer-to-peer addresses, ports, etc. +type AccountConfig struct { + LocalCosmosAddress string // REQUIRED - cosmos account address used for signing tx +} + +// LumeraConfig wraps all chain-specific dials. +type LumeraConfig struct { + GRPCAddr string // REQUIRED – e.g. "127.0.0.1:9090" + ChainID string // REQUIRED – e.g. "lumera-mainnet" + Timeout time.Duration // OPTIONAL – defaults to DefaultTimeout +} + type Config struct { - Network struct { - DefaultSupernodePort int - LocalCosmosAddress string + Account AccountConfig + Lumera LumeraConfig +} + +// Option is a functional option for configuring the Config. +type Option func(*Config) + +// New builds a Config, applying the supplied functional options. 
+func New(opts ...Option) (*Config, error) { + cfg := &Config{ + Account: AccountConfig{LocalCosmosAddress: DefaultLocalCosmosAddress}, + Lumera: LumeraConfig{ + GRPCAddr: DefaultGRPCAddr, + Timeout: DefaultTimeout, + ChainID: DefaultChainID, + }, + } + + for _, opt := range opts { + opt(cfg) } - Lumera struct { - GRPCAddr string - ChainID string - Timeout int + if err := cfg.Validate(); err != nil { + return nil, err + } + return cfg, nil +} + +func WithLocalCosmosAddress(addr string) Option { + return func(c *Config) { c.Account.LocalCosmosAddress = addr } +} + +func WithGRPCAddr(addr string) Option { + return func(c *Config) { c.Lumera.GRPCAddr = addr } +} + +func WithChainID(id string) Option { + return func(c *Config) { c.Lumera.ChainID = id } +} + +func WithTimeout(d time.Duration) Option { + return func(c *Config) { c.Lumera.Timeout = d } +} + +// Validate checks the configuration for required fields and valid values. +func (c *Config) Validate() error { + switch { + case c.Account.LocalCosmosAddress == "": + return errors.New("config: Account.LocalCosmosAddress is required") + case c.Lumera.GRPCAddr == "": + return errors.New("config: Lumera.GRPCAddr is required") + case c.Lumera.ChainID == "": + return errors.New("config: Lumera.ChainID is required") + case c.Lumera.Timeout <= 0: + return errors.New("config: Lumera.Timeout must be > 0") + default: + return nil } } diff --git a/sdk/event/types.go b/sdk/event/types.go index e5c3ff51..9adcf330 100644 --- a/sdk/event/types.go +++ b/sdk/event/types.go @@ -7,33 +7,48 @@ import ( "github.com/LumeraProtocol/supernode/sdk/adapters/lumera" ) -// EventType represents the type of event +// EventType represents the type of event emitted by the system type EventType string -// Event types constants +// Event types emitted by the system +// These events are used to track the progress of tasks +// and to notify subscribers about important changes in the system. 
const ( - // Task lifecycle events - TaskStarted EventType = "task.started" - TaskCompleted EventType = "task.completed" - TaskFailed EventType = "task.failed" - - // Phase lifecycle events - PhaseStarted EventType = "phase.started" - PhaseCompleted EventType = "phase.completed" - PhaseFailed EventType = "phase.failed" - - // Supernode events - SupernodeAttempt EventType = "supernode.attempt" - SupernodeSucceeded EventType = "supernode.succeeded" - SupernodeFailed EventType = "supernode.failed" + TaskStarted EventType = "task.started" + TaskProgressActionVerified EventType = "task.progress.action_verified" + TaskProgressActionVerificationFailed EventType = "task.progress.action_verification_failed" + TaskProgressSupernodesFound EventType = "task.progress.supernode_found" + TaskProgressSupernodesUnavailable EventType = "task.progress.supernodes_unavailable" + TaskProgressRegistrationInProgress EventType = "task.progress.registration_in_progress" + TaskProgressRegistrationFailure EventType = "task.progress.registration_failure" + TaskProgressRegistrationSuccessful EventType = "task.progress.registration_successful" + TaskCompleted EventType = "task.completed" + TaskFailed EventType = "task.failed" ) +// Task progress steps in order +// This is the order in which events are expected to occur +// during the task lifecycle. It is used to track progress. +// The order of events in this slice should match the order +// in which they are expected to occur in the task lifecycle. +// The index of each event in this slice represents its +// position in the task lifecycle. The first event in the slice is the +// first event that should be emitted when a task starts. 
+var taskProgressSteps = []EventType{ + TaskStarted, + TaskProgressActionVerified, + TaskProgressSupernodesFound, + TaskProgressRegistrationInProgress, + TaskCompleted, +} + // Event represents an event emitted by the system type Event struct { Type EventType // Type of event TaskID string // ID of the task that emitted the event TaskType string // Type of task (CASCADE, SENSE) Timestamp time.Time // When the event occurred + ActionID string // ID of the action associated with the task Data map[string]interface{} // Additional contextual data } @@ -43,7 +58,7 @@ type SupernodeData struct { Error string // Error message if applicable } -func NewEvent(ctx context.Context, eventType EventType, taskID, taskType string, data map[string]interface{}) Event { +func NewEvent(ctx context.Context, eventType EventType, taskID, taskType string, actionID string, data map[string]interface{}) Event { if data == nil { data = make(map[string]interface{}) } @@ -54,5 +69,17 @@ func NewEvent(ctx context.Context, eventType EventType, taskID, taskType string, TaskType: taskType, Timestamp: time.Now(), Data: data, + ActionID: actionID, + } +} + +// GetTaskProgress returns current progress as (y, x), where y = current step number, x = total steps. 
+func GetTaskProgress(current EventType) (int, int) { + for idx, step := range taskProgressSteps { + if step == current { + return idx + 1, len(taskProgressSteps) + } } + // Unknown event, treat as 0 progress + return 0, len(taskProgressSteps) } diff --git a/sdk/net/factory.go b/sdk/net/factory.go index 731461cc..9006fc13 100644 --- a/sdk/net/factory.go +++ b/sdk/net/factory.go @@ -13,8 +13,7 @@ import ( // FactoryConfig contains configuration for the ClientFactory type FactoryConfig struct { - LocalCosmosAddress string - DefaultSupernodePort int + LocalCosmosAddress string } // ClientFactory creates and manages supernode clients @@ -26,19 +25,13 @@ type ClientFactory struct { } // NewClientFactory creates a new client factory with the provided dependencies -func NewClientFactory( - ctx context.Context, - logger log.Logger, - keyring keyring.Keyring, - config FactoryConfig, -) *ClientFactory { +func NewClientFactory(ctx context.Context, logger log.Logger, keyring keyring.Keyring, config FactoryConfig) *ClientFactory { if logger == nil { logger = log.NewNoopLogger() } logger.Debug(ctx, "Creating supernode client factory", - "localAddress", config.LocalCosmosAddress, - "defaultPort", config.DefaultSupernodePort) + "localAddress", config.LocalCosmosAddress) return &ClientFactory{ logger: logger, @@ -54,29 +47,14 @@ func (f *ClientFactory) CreateClient(ctx context.Context, supernode lumera.Super return nil, fmt.Errorf("supernode has no gRPC endpoint: %s", supernode.CosmosAddress) } - // Ensure endpoint has port - endpoint := AddPortIfMissing(supernode.GrpcEndpoint, f.config.DefaultSupernodePort) - f.logger.Debug(ctx, "Creating supernode client", "supernode", supernode.CosmosAddress, - "endpoint", endpoint) - - // Update the supernode with the properly formatted endpoint - supernode.GrpcEndpoint = endpoint + "endpoint", supernode.GrpcEndpoint) // Create client with dependencies - client, err := NewSupernodeClient( - ctx, - f.logger, - f.keyring, - 
f.config.LocalCosmosAddress, - supernode, - f.clientOptions, - ) - + client, err := NewSupernodeClient(ctx, f.logger, f.keyring, f.config.LocalCosmosAddress, supernode, f.clientOptions) if err != nil { - return nil, fmt.Errorf("failed to create supernode client for %s: %w", - supernode.CosmosAddress, err) + return nil, fmt.Errorf("failed to create supernode client for %s: %w", supernode.CosmosAddress, err) } return client, nil diff --git a/sdk/net/impl.go b/sdk/net/impl.go index f349bf25..6e016634 100644 --- a/sdk/net/impl.go +++ b/sdk/net/impl.go @@ -3,7 +3,6 @@ package net import ( "context" "fmt" - "os" "github.com/LumeraProtocol/supernode/sdk/adapters/lumera" "github.com/LumeraProtocol/supernode/sdk/adapters/supernodeservice" @@ -29,13 +28,8 @@ type supernodeClient struct { var _ SupernodeClient = (*supernodeClient)(nil) // NewSupernodeClient creates a new supernode client -func NewSupernodeClient( - ctx context.Context, - logger log.Logger, - keyring keyring.Keyring, - localCosmosAddress string, - targetSupernode lumera.Supernode, - clientOptions *client.ClientOptions, +func NewSupernodeClient(ctx context.Context, logger log.Logger, keyring keyring.Keyring, + localCosmosAddress string, targetSupernode lumera.Supernode, clientOptions *client.ClientOptions, ) (SupernodeClient, error) { // Validate required parameters if logger == nil { @@ -111,27 +105,9 @@ func NewSupernodeClient( } // UploadInputData sends data to the supernode for cascade processing -func (c *supernodeClient) UploadInputData( - ctx context.Context, - in *supernodeservice.UploadInputDataRequest, - opts ...grpc.CallOption, +func (c *supernodeClient) UploadInputData(ctx context.Context, + in *supernodeservice.UploadInputDataRequest, opts ...grpc.CallOption, ) (*supernodeservice.UploadInputDataResponse, error) { - // Get file info for logging - fileInfo, err := os.Stat(in.FilePath) - var fileSize int64 - if err != nil { - c.logger.Warn(ctx, "Failed to get file stats", - "filePath", in.FilePath, - 
"error", err) - } else { - fileSize = fileInfo.Size() - } - - c.logger.Debug(ctx, "Uploading input data", - "actionID", in.ActionID, - "filename", in.Filename, - "filePath", in.FilePath, - "fileSize", fileSize) resp, err := c.cascadeClient.UploadInputData(ctx, in, opts...) if err != nil { @@ -139,9 +115,7 @@ func (c *supernodeClient) UploadInputData( } c.logger.Info(ctx, "Input data uploaded successfully", - "actionID", in.ActionID, - "filename", in.Filename, - "filePath", in.FilePath) + "actionID", in.ActionID, "filename", in.Filename, "filePath", in.FilePath) return resp, nil } diff --git a/sdk/net/utils..go b/sdk/net/utils..go deleted file mode 100644 index f076fbaa..00000000 --- a/sdk/net/utils..go +++ /dev/null @@ -1,35 +0,0 @@ -package net - -import ( - "context" - "fmt" - "net" - "strings" -) - -// GetFreePortInRange finds a free port within the given range -func GetFreePortInRange(ctx context.Context, start, end int) (int, error) { - for port := start; port <= end; port++ { - select { - case <-ctx.Done(): - return 0, ctx.Err() - default: - addr := fmt.Sprintf("localhost:%d", port) - var lc net.ListenConfig - listener, err := lc.Listen(ctx, "tcp", addr) - if err == nil { - listener.Close() - return port, nil - } - } - } - return 0, fmt.Errorf("no free port found in range %d-%d", start, end) -} - -// AddPortIfMissing adds a default port to an endpoint if no port is specified -func AddPortIfMissing(endpoint string, defaultPort int) string { - if !strings.Contains(endpoint, ":") { - return fmt.Sprintf("%s:%d", endpoint, defaultPort) - } - return endpoint -} diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index fb3bc8cc..7c059322 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -4,21 +4,22 @@ import ( "context" "errors" "fmt" - "os" "path/filepath" + "sync" "time" "github.com/LumeraProtocol/supernode/sdk/adapters/lumera" "github.com/LumeraProtocol/supernode/sdk/adapters/supernodeservice" "github.com/LumeraProtocol/supernode/sdk/event" 
"github.com/LumeraProtocol/supernode/sdk/net" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/health/grpc_health_v1" ) const ( - UploadTimeout = 60 * time.Second // Timeout for upload requests - ConnectTimeout = 60 * time.Second // Timeout for connection requests + registrationTimeout = 120 * time.Second // Timeout for registration requests + connectionTimeout = 10 * time.Second // Timeout for connection requests ) type CascadeTask struct { @@ -43,394 +44,195 @@ func NewCascadeTask( } } +// Run executes the full cascade‐task lifecycle. func (t *CascadeTask) Run(ctx context.Context) error { - t.logger.Info(ctx, "Running cascade task", - "taskID", t.TaskID, - "actionID", t.ActionID) + t.logEvent(ctx, event.TaskStarted, "Running cascade task", nil) + + action, err := t.fetchAndValidateAction(ctx, t.ActionID) + if err != nil { + return t.fail(ctx, event.TaskProgressActionVerificationFailed, err) + } + t.logEvent(ctx, event.TaskProgressActionVerified, "Action verified.", nil) - // Emit task started event with phase information - t.EmitEvent(ctx, event.TaskStarted, map[string]interface{}{ - "total_phases": 3, // Action validation, supernode selection, upload + supernodes, err := t.fetchSupernodes(ctx, action.Height) + if err != nil { + return t.fail(ctx, event.TaskProgressSupernodesUnavailable, err) + } + t.logEvent(ctx, event.TaskProgressSupernodesFound, "Supernodes found.", map[string]interface{}{ + "count": len(supernodes), }) - // Update status - t.Status = StatusProcessing + if err := t.registerWithSupernodes(ctx, supernodes); err != nil { + return t.fail(ctx, event.TaskProgressRegistrationFailure, err) + } + t.logEvent(ctx, event.TaskCompleted, "Cascade task completed successfully", nil) + t.Status = StatusCompleted - // 1. 
Action Validation Phase - t.logger.Debug(ctx, "Starting action validation phase", "taskID", t.TaskID) - t.EmitEvent(ctx, event.PhaseStarted, map[string]interface{}{ - "phase": "action_validation", - "phase_number": 1, - "total_phases": 3, - }) + return nil +} - action, err := t.validateAction(ctx) +// fetchAndValidateAction checks if the action exists and is in PENDING state +func (t *CascadeTask) fetchAndValidateAction(ctx context.Context, actionID string) (lumera.Action, error) { + action, err := t.client.GetAction(ctx, actionID) if err != nil { - t.Status = StatusFailed - t.Err = fmt.Errorf("action validation failed: %w", err) - - t.logger.Error(ctx, "Action validation failed", - "taskID", t.TaskID, - "actionID", t.ActionID, - "error", err) - - t.EmitEvent(ctx, event.PhaseFailed, map[string]interface{}{ - "phase": "action_validation", - "phase_number": 1, - "error": err.Error(), - }) - - t.EmitEvent(ctx, event.TaskFailed, map[string]interface{}{ - "phase": "action_validation", - "error": t.Err.Error(), - }) + return lumera.Action{}, fmt.Errorf("failed to get action: %w", err) + } - return t.Err + // Check if action exists + if action.ID == "" { + return lumera.Action{}, errors.New("no action found with the specified ID") } - t.logger.Debug(ctx, "Action validation successful", - "taskID", t.TaskID, - "actionID", action.ID, - "state", action.State) - - t.EmitEvent(ctx, event.PhaseCompleted, map[string]interface{}{ - "phase": "action_validation", - "phase_number": 1, - "action_id": action.ID, - "action_state": string(action.State), - "action_height": action.Height, - }) + // Check action state + if action.State != lumera.ACTION_STATE_PENDING { + return lumera.Action{}, fmt.Errorf("action is in %s state, expected PENDING", action.State) + } - // 2. 
Supernode Selection Phase - t.logger.Debug(ctx, "Starting supernode selection phase", "taskID", t.TaskID) - t.EmitEvent(ctx, event.PhaseStarted, map[string]interface{}{ - "phase": "supernode_selection", - "phase_number": 2, - "total_phases": 3, - }) + return action, nil +} - supernodes, err := t.client.GetSupernodes(ctx, action.Height) +func (t *CascadeTask) fetchSupernodes(ctx context.Context, height int64) (lumera.Supernodes, error) { + sns, err := t.client.GetSupernodes(ctx, height) if err != nil { - t.Status = StatusFailed - t.Err = fmt.Errorf("supernode selection failed: %w", err) + return nil, fmt.Errorf("fetch supernodes: %w", err) + } + t.logger.Info(ctx, "Supernodes fetched", "count", len(sns)) - t.logger.Error(ctx, "Supernode selection failed", - "taskID", t.TaskID, - "error", err) + if len(sns) == 0 { + return nil, errors.New("no supernodes found") + } - t.EmitEvent(ctx, event.PhaseFailed, map[string]interface{}{ - "phase": "supernode_selection", - "phase_number": 2, - "error": err.Error(), - }) + if len(sns) > 10 { + sns = sns[:10] + } - t.EmitEvent(ctx, event.TaskFailed, map[string]interface{}{ - "phase": "supernode_selection", - "error": t.Err.Error(), + // Keep only SERVING nodes (done in parallel – keeps latency flat) + healthy := make(lumera.Supernodes, 0, len(sns)) + eg, ctx := errgroup.WithContext(ctx) + mu := sync.Mutex{} + + for _, sn := range sns { + sn := sn + eg.Go(func() error { + if t.isServing(ctx, sn) { + mu.Lock() + healthy = append(healthy, sn) + mu.Unlock() + } + return nil }) - - return t.Err + } + if err := eg.Wait(); err != nil { + return nil, fmt.Errorf("health-check goroutines: %w", err) } - t.logger.Debug(ctx, "Supernode selection successful", - "taskID", t.TaskID, - "supernodeCount", len(supernodes)) + if len(healthy) == 0 { + return nil, errors.New("no healthy supernodes found") + } + t.logger.Info(ctx, "Healthy supernodes", "count", len(healthy)) - t.EmitEvent(ctx, event.PhaseCompleted, map[string]interface{}{ - "phase": 
"supernode_selection", - "phase_number": 2, - "supernode_count": len(supernodes), - }) + return healthy, nil +} - // 3. Create client factory - t.logger.Debug(ctx, "Creating client factory", "taskID", t.TaskID) +// isServing pings the super-node once with a short timeout. +func (t *CascadeTask) isServing(parent context.Context, sn lumera.Supernode) bool { + ctx, cancel := context.WithTimeout(parent, connectionTimeout) + defer cancel() - factoryConfig := net.FactoryConfig{ - LocalCosmosAddress: t.config.Network.LocalCosmosAddress, - DefaultSupernodePort: t.config.Network.DefaultSupernodePort, - } - clientFactory := net.NewClientFactory( - ctx, - t.logger, - t.keyring, - factoryConfig, - ) - - // Verify the file exists before we try to upload it - t.logger.Debug(ctx, "Verifying file exists", "taskID", t.TaskID, "filePath", t.FilePath) - fileInfo, err := os.Stat(t.FilePath) + client, err := net.NewClientFactory(ctx, t.logger, nil, net.FactoryConfig{}).CreateClient(ctx, sn) if err != nil { - t.Status = StatusFailed - t.Err = fmt.Errorf("failed to access file: %w", err) - - t.logger.Error(ctx, "Failed to access file", - "taskID", t.TaskID, - "filePath", t.FilePath, - "error", err) - - t.EmitEvent(ctx, event.PhaseFailed, map[string]interface{}{ - "phase": "upload", - "phase_number": 3, - "error": t.Err.Error(), - }) + return false + } + defer client.Close(ctx) - t.EmitEvent(ctx, event.TaskFailed, map[string]interface{}{ - "phase": "file_access", - "error": t.Err.Error(), - }) + resp, err := client.HealthCheck(ctx) + return err == nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING +} - return t.Err +func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lumera.Supernodes) error { + factoryCfg := net.FactoryConfig{ + LocalCosmosAddress: t.config.Account.LocalCosmosAddress, } + clientFactory := net.NewClientFactory(ctx, t.logger, t.keyring, factoryCfg) - // 5. 
Create upload request - filename := filepath.Base(t.FilePath) - t.logger.Debug(ctx, "Creating upload request", - "taskID", t.TaskID, - "filename", filename, - "actionID", t.ActionID, - "fileHash", t.FileHash, - "fileSize", fileInfo.Size(), - "hasSignedData", t.SignedData != "") - - uploadRequest := &supernodeservice.UploadInputDataRequest{ - Filename: filename, + req := &supernodeservice.UploadInputDataRequest{ + Filename: filepath.Base(t.FilePath), ActionID: t.ActionID, DataHash: t.FileHash, SignedData: t.SignedData, - FilePath: t.FilePath, // Pass the file path to the request + FilePath: t.FilePath, } - // 6. Upload Phase - Try each supernode until success - t.logger.Debug(ctx, "Starting upload phase", - "taskID", t.TaskID, - "supernodeCount", len(supernodes)) - - t.EmitEvent(ctx, event.PhaseStarted, map[string]interface{}{ - "phase": "upload", - "phase_number": 3, - "total_phases": 3, - "supernode_count": len(supernodes), - }) - var lastErr error - for i, sn := range supernodes { - t.logger.Debug(ctx, "Attempting upload to supernode", - "taskID", t.TaskID, - "supernodeIndex", i+1, - "supernodeAddress", sn.CosmosAddress, - "endpoint", sn.GrpcEndpoint) - - // Emit supernode attempt event - t.EmitEvent(ctx, event.SupernodeAttempt, map[string]interface{}{ - "phase": "upload", - "supernode_index": i + 1, - "total_supernodes": len(supernodes), - "supernode_address": sn.CosmosAddress, - "supernode_endpoint": sn.GrpcEndpoint, - }) - - // Try to upload to this supernode - success, err := t.tryUploadToSupernode(ctx, clientFactory, sn, uploadRequest) - if err != nil { + for idx, sn := range supernodes { + if err := t.attemptRegistration(ctx, idx, sn, clientFactory, req); err != nil { lastErr = err - - t.logger.Error(ctx, "Failed to upload to supernode", - "taskID", t.TaskID, - "supernodeIndex", i+1, - "supernodeAddress", sn.CosmosAddress, - "error", err) - - // Emit supernode failed event - t.EmitEvent(ctx, event.SupernodeFailed, map[string]interface{}{ - "phase": 
"upload", - "supernode_index": i + 1, - "supernode_address": sn.CosmosAddress, - "error": err.Error(), - }) - continue } - - if success { - t.logger.Info(ctx, "Successfully uploaded to supernode", - "taskID", t.TaskID, - "supernodeIndex", i+1, - "supernodeAddress", sn.CosmosAddress) - - // Emit supernode success event - t.EmitEvent(ctx, event.SupernodeSucceeded, map[string]interface{}{ - "phase": "upload", - "supernode_index": i + 1, - "supernode_address": sn.CosmosAddress, - }) - - // Emit phase completed event - t.EmitEvent(ctx, event.PhaseCompleted, map[string]interface{}{ - "phase": "upload", - "phase_number": 3, - "attempts": i + 1, - "successful_supernode": sn.CosmosAddress, - }) - - // Emit task completed event - t.EmitEvent(ctx, event.TaskCompleted, map[string]interface{}{ - "total_phases": 3, - "successful_supernode": sn.CosmosAddress, - }) - - // Update status and return success - t.Status = StatusCompleted - return nil - } + return nil // success } - // All supernodes failed - t.Status = StatusFailed - t.Err = fmt.Errorf("all supernodes failed: %w", lastErr) - - t.logger.Error(ctx, "All supernode upload attempts failed", - "taskID", t.TaskID, - "attempts", len(supernodes), - "lastError", lastErr) - - // Emit phase failed event - t.EmitEvent(ctx, event.PhaseFailed, map[string]interface{}{ - "phase": "upload", - "phase_number": 3, - "attempts": len(supernodes), - "error": t.Err.Error(), - }) - - // Emit task failed event - t.EmitEvent(ctx, event.TaskFailed, map[string]interface{}{ - "phase": "upload", - "error": t.Err.Error(), - }) - - return t.Err + return fmt.Errorf("failed to upload to all supernodes: %w", lastErr) } +func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lumera.Supernode, + factory *net.ClientFactory, req *supernodeservice.UploadInputDataRequest) error { -// tryUploadToSupernode attempts to upload data to a single supernode -func (t *CascadeTask) tryUploadToSupernode( - ctx context.Context, - clientFactory 
*net.ClientFactory, - supernode lumera.Supernode, - request *supernodeservice.UploadInputDataRequest, -) (bool, error) { - // Create a client for this supernode - only need to provide the context and supernode - t.logger.Debug(ctx, "Creating client for supernode", - "taskID", t.TaskID, - "supernodeAddress", supernode.CosmosAddress) + t.logEvent(ctx, event.TaskProgressRegistrationInProgress, "attempting registration with supernode", map[string]interface{}{ + "supernode": sn.GrpcEndpoint, + "sn-address": sn.CosmosAddress, + "iteration": index + 1, + }) - client, err := clientFactory.CreateClient(ctx, supernode) + client, err := factory.CreateClient(ctx, sn) if err != nil { - t.logger.Error(ctx, "Failed to create client for supernode", - "taskID", t.TaskID, - "supernodeAddress", supernode.CosmosAddress, - "error", err) - return false, fmt.Errorf("failed to create client for supernode %s: %w", supernode.CosmosAddress, err) + return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err) } - // Ensure connection is closed when we're done with this function defer client.Close(ctx) - // Check if supernode is healthy - t.logger.Debug(ctx, "Checking supernode health", - "taskID", t.TaskID, - "supernodeAddress", supernode.CosmosAddress) - - healthCtx, cancel := context.WithTimeout(ctx, ConnectTimeout) - healthResp, err := client.HealthCheck(healthCtx) - cancel() - - if err != nil { - t.logger.Error(ctx, "Health check failed for supernode", - "taskID", t.TaskID, - "supernodeAddress", supernode.CosmosAddress, - "error", err) - return false, fmt.Errorf("health check failed for supernode %s: %w", supernode.CosmosAddress, err) - } - - if healthResp.Status != grpc_health_v1.HealthCheckResponse_SERVING { - t.logger.Warn(ctx, "Supernode is not in serving state", - "taskID", t.TaskID, - "supernodeAddress", supernode.CosmosAddress, - "state", healthResp.Status) - return false, fmt.Errorf("supernode %s is not in serving state", supernode.CosmosAddress) - } - - // Upload data to 
supernode - t.logger.Debug(ctx, "Uploading data to supernode", - "taskID", t.TaskID, - "supernodeAddress", supernode.CosmosAddress, - "filename", request.Filename, - "filePath", request.FilePath) - - uploadCtx, cancel := context.WithTimeout(ctx, UploadTimeout) + uploadCtx, cancel := context.WithTimeout(ctx, registrationTimeout) defer cancel() - resp, err := client.UploadInputData(uploadCtx, request) + resp, err := client.UploadInputData(uploadCtx, req) if err != nil { - t.logger.Error(ctx, "Upload failed to supernode", - "taskID", t.TaskID, - "supernodeAddress", supernode.CosmosAddress, - "error", err) - return false, fmt.Errorf("upload failed to supernode %s: %w", supernode.CosmosAddress, err) + return fmt.Errorf("upload to %s: %w", sn.CosmosAddress, err) } - - // Check if the upload was successful based on response if !resp.Success { - t.logger.Error(ctx, "Upload rejected by supernode", - "taskID", t.TaskID, - "supernodeAddress", supernode.CosmosAddress, - "message", resp.Message) - return false, fmt.Errorf("upload rejected by supernode %s: %s", supernode.CosmosAddress, resp.Message) + return fmt.Errorf("upload rejected by %s: %s", sn.CosmosAddress, resp.Message) } - // Success! - t.logger.Info(ctx, "Successfully uploaded data to supernode", - "taskID", t.TaskID, - "supernodeAddress", supernode.CosmosAddress, - "message", resp.Message) - return true, nil + t.logger.Info(ctx, "upload OK", "taskID", t.TaskID, "address", sn.CosmosAddress) + return nil } -// validateAction checks if the action exists and is in PENDING state -func (t *CascadeTask) validateAction(ctx context.Context) (lumera.Action, error) { - t.logger.Debug(ctx, "Validating action", +// logEvent writes a structured log entry **and** emits the SDK event. 
+func (t *CascadeTask) logEvent(ctx context.Context, evt event.EventType, msg string, additionalInfo map[string]interface{}) { + // Base fields that are always present + kvs := []interface{}{ "taskID", t.TaskID, - "actionID", t.ActionID) - - action, err := t.client.GetAction(ctx, t.ActionID) - if err != nil { - t.logger.Error(ctx, "Failed to get action", - "taskID", t.TaskID, - "actionID", t.ActionID, - "error", err) - return lumera.Action{}, fmt.Errorf("failed to get action: %w", err) + "actionID", t.ActionID, } - // Check if action exists - if action.ID == "" { - t.logger.Error(ctx, "No action found with the specified ID", - "taskID", t.TaskID, - "actionID", t.ActionID) - return lumera.Action{}, errors.New("no action found with the specified ID") + // Merge additional fields + for k, v := range additionalInfo { + kvs = append(kvs, k, v) } - // Check action state - if action.State != lumera.ACTION_STATE_PENDING { - t.logger.Error(ctx, "Action is in invalid state", - "taskID", t.TaskID, - "actionID", t.ActionID, - "state", action.State, - "expectedState", lumera.ACTION_STATE_PENDING) - return lumera.Action{}, fmt.Errorf("action is in %s state, expected PENDING", action.State) - } + t.logger.Info(ctx, msg, kvs...) 
+ t.EmitEvent(ctx, evt, additionalInfo) +} - t.logger.Debug(ctx, "Action validated successfully", - "taskID", t.TaskID, - "actionID", t.ActionID, - "state", action.State, - "height", action.Height) - return action, nil +func (t *CascadeTask) fail(ctx context.Context, failureEvent event.EventType, err error) error { + t.Status = StatusFailed + t.Err = err + + t.logger.Error(ctx, "Task failed", "taskID", t.TaskID, "actionID", t.ActionID, "error", err) + + t.EmitEvent(ctx, failureEvent, map[string]interface{}{ + "error": err.Error(), + }) + t.EmitEvent(ctx, event.TaskFailed, map[string]interface{}{ + "error": err.Error(), + }) + + return err } diff --git a/sdk/task/task.go b/sdk/task/task.go index b6825053..9259658a 100644 --- a/sdk/task/task.go +++ b/sdk/task/task.go @@ -57,7 +57,7 @@ type BaseTask struct { func (t *BaseTask) EmitEvent(ctx context.Context, eventType event.EventType, data map[string]interface{}) { if t.onEvent != nil { // Create event with the provided context - e := event.NewEvent(ctx, eventType, t.TaskID, string(t.TaskType), data) + e := event.NewEvent(ctx, eventType, t.TaskID, string(t.TaskType), t.ActionID, data) // Pass context to the callback t.onEvent(ctx, e) } diff --git a/supernode/cmd/supernode.go b/supernode/cmd/supernode.go index 552bcc4f..b188bee5 100644 --- a/supernode/cmd/supernode.go +++ b/supernode/cmd/supernode.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "time" "github.com/LumeraProtocol/supernode/p2p" "github.com/LumeraProtocol/supernode/pkg/logtrace" @@ -141,7 +142,7 @@ func initLumeraClient(ctx context.Context, config *config.Config) (lumera.Client ctx, lumera.WithGRPCAddr(config.LumeraClientConfig.GRPCAddr), lumera.WithChainID(config.LumeraClientConfig.ChainID), - lumera.WithTimeout(config.LumeraClientConfig.Timeout), + lumera.WithTimeout(time.Duration(config.LumeraClientConfig.Timeout)*time.Second), ) }