From 14470ec1ded434f627581356cd8e40d2935b11f5 Mon Sep 17 00:00:00 2001 From: Matee ullah Malik Date: Thu, 15 May 2025 13:18:06 +0500 Subject: [PATCH 1/2] sdk improvements --- sdk/action/testutil/mocks/client_mock.go | 143 ------------------ .../lumera/testutil/mocks/lumera_mock.go | 87 ----------- sdk/adapters/supernodeservice/adapter.go | 8 +- .../testutil/mocks/cascade_service_mock.go | 69 --------- sdk/adapters/supernodeservice/types.go | 2 +- sdk/event/bus.go | 25 +-- sdk/event/keys.go | 21 +++ sdk/event/types.go | 19 ++- sdk/task/cache.go | 33 +++- sdk/task/cache_test.go | 122 --------------- sdk/task/cascade.go | 121 +++++---------- sdk/task/manager.go | 39 ++++- sdk/task/task.go | 22 ++- sdk/task/testutil/mocks/manager_mock.go | 117 -------------- 14 files changed, 171 insertions(+), 657 deletions(-) delete mode 100644 sdk/action/testutil/mocks/client_mock.go delete mode 100644 sdk/adapters/lumera/testutil/mocks/lumera_mock.go delete mode 100644 sdk/adapters/supernodeservice/testutil/mocks/cascade_service_mock.go create mode 100644 sdk/event/keys.go delete mode 100644 sdk/task/cache_test.go delete mode 100644 sdk/task/testutil/mocks/manager_mock.go diff --git a/sdk/action/testutil/mocks/client_mock.go b/sdk/action/testutil/mocks/client_mock.go deleted file mode 100644 index da7b1a12..00000000 --- a/sdk/action/testutil/mocks/client_mock.go +++ /dev/null @@ -1,143 +0,0 @@ -// Code generated by mockery v2.53.3. DO NOT EDIT. - -package mocks - -import ( - context "context" - - event "github.com/LumeraProtocol/supernode/sdk/event" - mock "github.com/stretchr/testify/mock" - - task "github.com/LumeraProtocol/supernode/sdk/task" -) - -// Client is an autogenerated mock type for the Client type -type Client struct { - mock.Mock -} - -// DeleteTask provides a mock function with given fields: ctx, taskID -func (_m *Client) DeleteTask(ctx context.Context, taskID string) error { - ret := _m.Called(ctx, taskID) - - if len(ret) == 0 { - panic("no return value specified for DeleteTask") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, taskID) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// GetTask provides a mock function with given fields: ctx, taskID -func (_m *Client) GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool) { - ret := _m.Called(ctx, taskID) - - if len(ret) == 0 { - panic("no return value specified for GetTask") - } - - var r0 *task.TaskEntry - var r1 bool - if rf, ok := ret.Get(0).(func(context.Context, string) (*task.TaskEntry, bool)); ok { - return rf(ctx, taskID) - } - if rf, ok := ret.Get(0).(func(context.Context, string) *task.TaskEntry); ok { - r0 = rf(ctx, taskID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*task.TaskEntry) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string) bool); ok { - r1 = rf(ctx, taskID) - } else { - r1 = ret.Get(1).(bool) - } - - return r0, r1 -} - -// StartCascade provides a mock function with given fields: ctx, data, actionID -func (_m *Client) StartCascade(ctx context.Context, data []byte, actionID string) (string, error) { - ret := _m.Called(ctx, data, actionID) - - if len(ret) == 0 { - panic("no return value specified for StartCascade") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, []byte, string) (string, error)); ok { - return rf(ctx, data, actionID) - } - if rf, ok := ret.Get(0).(func(context.Context, []byte, string) string); ok { - r0 = rf(ctx, data, actionID) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context, []byte, string) error); ok { - r1 = rf(ctx, data, actionID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// SubscribeToAllEvents provides a mock function with given fields: ctx, handler -func (_m *Client) SubscribeToAllEvents(ctx context.Context, handler event.Handler) error { - ret := _m.Called(ctx, handler) - - if len(ret) == 0 { - panic("no return value specified for SubscribeToAllEvents") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, event.Handler) error); ok { - r0 = rf(ctx, handler) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// SubscribeToEvents provides a mock function with given fields: ctx, eventType, handler -func (_m *Client) SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) error { - ret := _m.Called(ctx, eventType, handler) - - if len(ret) == 0 { - panic("no return value specified for SubscribeToEvents") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, event.EventType, event.Handler) error); ok { - r0 = rf(ctx, eventType, handler) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewClient(t interface { - mock.TestingT - Cleanup(func()) -}) *Client { - mock := &Client{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/sdk/adapters/lumera/testutil/mocks/lumera_mock.go b/sdk/adapters/lumera/testutil/mocks/lumera_mock.go deleted file mode 100644 index 683bab87..00000000 --- a/sdk/adapters/lumera/testutil/mocks/lumera_mock.go +++ /dev/null @@ -1,87 +0,0 @@ -// Code generated by mockery v2.53.3. DO NOT EDIT. - -package mocks - -import ( - context "context" - - lumera "github.com/LumeraProtocol/supernode/sdk/adapters/lumera" - mock "github.com/stretchr/testify/mock" -) - -// Client is an autogenerated mock type for the Client type -type Client struct { - mock.Mock -} - -// GetAction provides a mock function with given fields: ctx, actionID -func (_m *Client) GetAction(ctx context.Context, actionID string) (lumera.Action, error) { - ret := _m.Called(ctx, actionID) - - if len(ret) == 0 { - panic("no return value specified for GetAction") - } - - var r0 lumera.Action - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (lumera.Action, error)); ok { - return rf(ctx, actionID) - } - if rf, ok := ret.Get(0).(func(context.Context, string) lumera.Action); ok { - r0 = rf(ctx, actionID) - } else { - r0 = ret.Get(0).(lumera.Action) - } - - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, actionID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// GetSupernodes provides a mock function with given fields: ctx, height -func (_m *Client) GetSupernodes(ctx context.Context, height int64) ([]lumera.Supernode, error) { - ret := _m.Called(ctx, height) - - if len(ret) == 0 { - panic("no return value specified for GetSupernodes") - } - - var r0 []lumera.Supernode - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64) ([]lumera.Supernode, error)); ok { - return rf(ctx, height) - } - if rf, ok := ret.Get(0).(func(context.Context, int64) []lumera.Supernode); ok { - r0 = rf(ctx, height) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]lumera.Supernode) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { - r1 = rf(ctx, height) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewClient(t interface { - mock.TestingT - Cleanup(func()) -}) *Client { - mock := &Client{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/sdk/adapters/supernodeservice/adapter.go b/sdk/adapters/supernodeservice/adapter.go index 3d5fe314..f7013907 100644 --- a/sdk/adapters/supernodeservice/adapter.go +++ b/sdk/adapters/supernodeservice/adapter.go @@ -138,7 +138,13 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca a.logger.Info(ctx, "Supernode progress update received", "event_type", resp.EventType, "message", resp.Message, "tx_hash", resp.TxHash, "task_id", in.TaskId, "action_id", in.ActionID) if in.EventLogger != nil { - in.EventLogger(ctx, toSdkEvent(resp.EventType), resp.Message, nil) + in.EventLogger(ctx, toSdkEvent(resp.EventType), resp.Message, event.EventData{ + event.KeyEventType: resp.EventType, + event.KeyMessage: resp.Message, + event.KeyTxHash: resp.TxHash, + event.KeyTaskID: in.TaskId, + event.KeyActionID: in.ActionID, + }) } // Optionally capture the final response diff --git a/sdk/adapters/supernodeservice/testutil/mocks/cascade_service_mock.go b/sdk/adapters/supernodeservice/testutil/mocks/cascade_service_mock.go deleted file mode 100644 index 09562581..00000000 --- a/sdk/adapters/supernodeservice/testutil/mocks/cascade_service_mock.go +++ /dev/null @@ -1,69 +0,0 @@ -// Code generated by mockery v2.53.3. DO NOT EDIT. - -package mocks - -import ( - context "context" - - grpc "google.golang.org/grpc" - - mock "github.com/stretchr/testify/mock" - - supernodeservice "github.com/LumeraProtocol/supernode/sdk/adapters/supernodeservice" -) - -// CascadeServiceClient is an autogenerated mock type for the CascadeServiceClient type -type CascadeServiceClient struct { - mock.Mock -} - -// CascadeSupernodeRegister provides a mock function with given fields: ctx, in, opts -func (_m *CascadeServiceClient) CascadeSupernodeRegister(ctx context.Context, in *supernodeservice.CascadeSupernodeRegisterRequest, opts ...grpc.CallOption) (*supernodeservice.CascadeSupernodeRegisterResponse, error) { - _va := make([]interface{}, len(opts)) - for _i := range opts { - _va[_i] = opts[_i] - } - var _ca []interface{} - _ca = append(_ca, ctx, in) - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - if len(ret) == 0 { - panic("no return value specified for CascadeSupernodeRegister") - } - - var r0 *supernodeservice.CascadeSupernodeRegisterResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *supernodeservice.CascadeSupernodeRegisterRequest, ...grpc.CallOption) (*supernodeservice.CascadeSupernodeRegisterResponse, error)); ok { - return rf(ctx, in, opts...) - } - if rf, ok := ret.Get(0).(func(context.Context, *supernodeservice.CascadeSupernodeRegisterRequest, ...grpc.CallOption) *supernodeservice.CascadeSupernodeRegisterResponse); ok { - r0 = rf(ctx, in, opts...) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*supernodeservice.CascadeSupernodeRegisterResponse) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, *supernodeservice.CascadeSupernodeRegisterRequest, ...grpc.CallOption) error); ok { - r1 = rf(ctx, in, opts...) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// NewCascadeServiceClient creates a new instance of CascadeServiceClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewCascadeServiceClient(t interface { - mock.TestingT - Cleanup(func()) -}) *CascadeServiceClient { - mock := &CascadeServiceClient{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/sdk/adapters/supernodeservice/types.go b/sdk/adapters/supernodeservice/types.go index 1c864fbb..e110d1d4 100644 --- a/sdk/adapters/supernodeservice/types.go +++ b/sdk/adapters/supernodeservice/types.go @@ -12,7 +12,7 @@ type LoggerFunc func( ctx context.Context, eventType event.EventType, message string, - data map[string]interface{}, + data event.EventData, ) type CascadeSupernodeRegisterRequest struct { diff --git a/sdk/event/bus.go b/sdk/event/bus.go index 2ec8bba7..5e4a0889 100644 --- a/sdk/event/bus.go +++ b/sdk/event/bus.go @@ -2,9 +2,10 @@ package event import ( "context" - "github.com/LumeraProtocol/supernode/sdk/log" "runtime/debug" "sync" + + "github.com/LumeraProtocol/supernode/sdk/log" ) // Handler is a function that processes events @@ -77,12 +78,7 @@ func (b *Bus) safelyCallHandler(ctx context.Context, handler Handler, event Even // Recover from panics if r := recover(); r != nil { stackTrace := debug.Stack() - b.logger.Error(ctx, - "Event handler panicked", - "error", r, - "eventType", event.Type, - "stackTrace", string(stackTrace), - ) + b.logger.Error(ctx, "Event handler panicked", "error", r, "eventType", event.Type, "stackTrace", string(stackTrace)) } }() @@ -102,7 +98,8 @@ func copyEvent(e Event) Event { TaskID: e.TaskID, TaskType: e.TaskType, Timestamp: e.Timestamp, - Data: make(map[string]interface{}, len(e.Data)), + ActionID: e.ActionID, + Data: make(EventData, len(e.Data)), } // Copy the data map @@ -118,16 +115,11 @@ func (b *Bus) Publish(ctx context.Context, event Event) { b.mu.RLock() defer b.mu.RUnlock() - b.logger.Debug(ctx, "Publishing event", - "type", event.Type, - "taskID", event.TaskID, - "taskType", event.TaskType) + b.logger.Debug(ctx, "Publishing event", "type", event.Type, "taskID", event.TaskID, "taskType", event.TaskType) // Call type-specific handlers if handlers, exists := b.subscribers[event.Type]; exists { - b.logger.Debug(ctx, "Calling type-specific handlers", - "eventType", event.Type, - "handlerCount", len(handlers)) + b.logger.Debug(ctx, "Calling type-specific handlers", "eventType", event.Type, "handlerCount", len(handlers)) for _, handler := range handlers { b.safelyCallHandler(ctx, handler, event) @@ -136,8 +128,7 @@ func (b *Bus) Publish(ctx context.Context, event Event) { // Call wildcard handlers if len(b.wildcardHandlers) > 0 { - b.logger.Debug(ctx, "Calling wildcard handlers", - "handlerCount", len(b.wildcardHandlers)) + b.logger.Debug(ctx, "Calling wildcard handlers", "handlerCount", len(b.wildcardHandlers)) for _, handler := range b.wildcardHandlers { b.safelyCallHandler(ctx, handler, event) diff --git a/sdk/event/keys.go b/sdk/event/keys.go new file mode 100644 index 00000000..87895280 --- /dev/null +++ b/sdk/event/keys.go @@ -0,0 +1,21 @@ +package event + +// EventDataKey defines standard keys used in event data +type EventDataKey string + +const ( + // Common data keys + KeyError EventDataKey = "error" + KeyCount EventDataKey = "count" + KeySupernode EventDataKey = "supernode" + KeySupernodeAddress EventDataKey = "sn-address" + KeyIteration EventDataKey = "iteration" + KeyTxHash EventDataKey = "txhash" + KeyMessage EventDataKey = "message" + KeyProgress EventDataKey = "progress" + KeyEventType EventDataKey = "event_type" + + // Task specific keys + KeyTaskID EventDataKey = "task_id" + KeyActionID EventDataKey = "action_id" +) diff --git a/sdk/event/types.go b/sdk/event/types.go index c0002d20..d0cac2b2 100644 --- a/sdk/event/types.go +++ b/sdk/event/types.go @@ -54,14 +54,17 @@ var taskProgressSteps = []EventType{ TaskCompleted, } +// EventData is a map of event data attributes using standardized keys +type EventData map[EventDataKey]any + // Event represents an event emitted by the system type Event struct { - Type EventType // Type of event - TaskID string // ID of the task that emitted the event - TaskType string // Type of task (CASCADE, SENSE) - Timestamp time.Time // When the event occurred - ActionID string // ID of the action associated with the task - Data map[string]interface{} // Additional contextual data + Type EventType // Type of event + TaskID string // ID of the task that emitted the event + TaskType string // Type of task (CASCADE, SENSE) + Timestamp time.Time // When the event occurred + ActionID string // ID of the action associated with the task + Data EventData // Additional contextual data } // SupernodeData contains information about a supernode involved in an event @@ -70,9 +73,9 @@ type SupernodeData struct { Error string // Error message if applicable } -func NewEvent(ctx context.Context, eventType EventType, taskID, taskType string, actionID string, data map[string]interface{}) Event { +func NewEvent(ctx context.Context, eventType EventType, taskID, taskType string, actionID string, data EventData) Event { if data == nil { - data = make(map[string]interface{}) + data = make(EventData) } return Event{ diff --git a/sdk/task/cache.go b/sdk/task/cache.go index 69cbc031..dbce2087 100644 --- a/sdk/task/cache.go +++ b/sdk/task/cache.go @@ -15,8 +15,10 @@ import ( type TaskEntry struct { Task Task TaskID string + ActionID string TaskType TaskType Status TaskStatus + TxHash string Error error Events []event.Event CreatedAt time.Time @@ -63,7 +65,7 @@ func (tc *TaskCache) getOrCreateMutex(taskID string) *sync.Mutex { } // Set stores a task in the cache with initial metadata -func (tc *TaskCache) Set(ctx context.Context, taskID string, task Task, taskType TaskType) bool { +func (tc *TaskCache) Set(ctx context.Context, taskID string, task Task, taskType TaskType, actionID string) bool { mu := tc.getOrCreateMutex(taskID) mu.Lock() defer mu.Unlock() @@ -74,6 +76,7 @@ func (tc *TaskCache) Set(ctx context.Context, taskID string, task Task, taskType entry := &TaskEntry{ Task: task, TaskID: taskID, + ActionID: actionID, TaskType: taskType, Status: StatusPending, Events: make([]event.Event, 0), @@ -127,6 +130,34 @@ func (tc *TaskCache) UpdateStatus(ctx context.Context, taskID string, status Tas return success } +// UpdateTxHash updates the transaction hash of a task in the cache atomically +func (tc *TaskCache) UpdateTxHash(ctx context.Context, taskID string, txHash string) bool { + mu := tc.getOrCreateMutex(taskID) + mu.Lock() + defer mu.Unlock() + + tc.logger.Info(ctx, "Updating task txHash (locked)", "taskID", taskID, "txHash", txHash) + + // Perform Get-Modify-Set within the lock + existingEntry, found := tc.cache.Get(taskID) + if !found { + tc.logger.Warn(ctx, "Cannot update txHash - task not found (locked)", "taskID", taskID) + return false // Task doesn't exist + } + + // Create a new entry with updated txHash + updatedEntry := *existingEntry // Copy the struct + updatedEntry.TxHash = txHash + updatedEntry.LastUpdatedAt = time.Now() + + // Set the modified entry back into the cache + success := tc.cache.Set(taskID, &updatedEntry, 1) + if !success { + tc.logger.Warn(ctx, "Failed to update txHash in cache (locked)", "taskID", taskID) + } + return success +} + // AddEvent adds an event to the task's event history atomically func (tc *TaskCache) AddEvent(ctx context.Context, taskID string, e event.Event) bool { mu := tc.getOrCreateMutex(taskID) diff --git a/sdk/task/cache_test.go b/sdk/task/cache_test.go deleted file mode 100644 index 18765094..00000000 --- a/sdk/task/cache_test.go +++ /dev/null @@ -1,122 +0,0 @@ -package task - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/LumeraProtocol/supernode/sdk/event" - "github.com/LumeraProtocol/supernode/sdk/log" - "github.com/stretchr/testify/assert" -) - -func newTestCache(t *testing.T) *TaskCache { - t.Helper() - - tc, err := NewTaskCache(context.Background(), log.NewNoopLogger()) - if err != nil { - t.Fatalf("creating cache: %v", err) - } - return tc -} - -func TestTaskCache_ConcurrentAccess(t *testing.T) { - ctx := context.Background() - tc := newTestCache(t) - tc.Set(ctx, "conc", nil, TaskTypeCascade) - - var wg sync.WaitGroup - wg.Add(2) - - go func() { - defer wg.Done() - for i := 0; i < 50; i++ { - tc.UpdateStatus(ctx, "conc", StatusProcessing, nil) - tc.AddEvent(ctx, "conc", event.Event{Type: event.TaskStarted}) - } - }() - - go func() { - defer wg.Done() - tc.Del(ctx, "conc") // race with updates - }() - - done := make(chan struct{}) - go func() { wg.Wait(); close(done) }() - - select { - case <-done: - case <-time.After(2 * time.Second): - t.Fatal("concurrent operations deadlocked") - } -} - -func TestTaskCache_SetGet(t *testing.T) { - ctx := context.Background() - tc := newTestCache(t) - - tests := map[string]struct { - taskID string - taskType TaskType - }{ - "first insert": {"id1", TaskTypeCascade}, - "second insert": {"id2", TaskTypeCascade}, - } - - for name, tt := range tests { - t.Run(name, func(t *testing.T) { - tc.Set(ctx, tt.taskID, nil, tt.taskType) - tc.Wait() // <-- ensure write propagated - - entry, found := tc.Get(ctx, tt.taskID) - assert.True(t, found, "entry should exist") - assert.Equal(t, tt.taskID, entry.TaskID) - assert.Equal(t, StatusPending, entry.Status) - }) - } -} - -func TestTaskCache_UpdateStatus(t *testing.T) { - ctx := context.Background() - tc := newTestCache(t) - - tc.Set(ctx, "jobX", nil, TaskTypeCascade) - tc.Wait() - - tc.UpdateStatus(ctx, "jobX", StatusProcessing, nil) - tc.Wait() - - ent, _ := tc.Get(ctx, "jobX") - assert.Equal(t, StatusProcessing, ent.Status) -} - -func TestTaskCache_AddEvent(t *testing.T) { - ctx := context.Background() - tc := newTestCache(t) - - tc.Set(ctx, "evt1", nil, TaskTypeCascade) - tc.Wait() - - ev := event.Event{Type: event.TaskStarted, TaskID: "evt1"} - tc.AddEvent(ctx, "evt1", ev) - tc.Wait() - - ent, _ := tc.Get(ctx, "evt1") - assert.Len(t, ent.Events, 1) - assert.Equal(t, ev, ent.Events[0]) -} - -func TestTaskCache_Delete(t *testing.T) { - ctx := context.Background() - tc := newTestCache(t) - - tc.Set(ctx, "gone", nil, TaskTypeCascade) - tc.Wait() - - tc.Del(ctx, "gone") - tc.Wait() - - _, found := tc.Get(ctx, "gone") - assert.False(t, found) -} diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index 34293c85..a3f7bcd2 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strings" "sync" "time" @@ -39,49 +38,38 @@ func NewCascadeTask(base BaseTask, filePath string, actionId string) *CascadeTas // Run executes the full cascade‐task lifecycle. func (t *CascadeTask) Run(ctx context.Context) error { - t.logEvent(ctx, event.TaskStarted, "Running cascade task", nil) + t.LogEvent(ctx, event.TaskStarted, "Running cascade task", nil) - action, err := t.fetchAndValidateAction(ctx, t.ActionID) + // Use the already validated Action directly to get the height + supernodes, err := t.fetchSupernodes(ctx, t.Action.Height) if err != nil { - return t.fail(ctx, event.TaskProgressActionVerificationFailed, err) - } - t.logEvent(ctx, event.TaskProgressActionVerified, "Action verified.", nil) - - supernodes, err := t.fetchSupernodes(ctx, action.Height) - if err != nil { - return t.fail(ctx, event.TaskProgressSupernodesUnavailable, err) + t.logger.Error(ctx, "Task failed", "taskID", t.TaskID, "actionID", t.ActionID, "error", err) + t.EmitEvent(ctx, event.TaskProgressSupernodesUnavailable, event.EventData{ + event.KeyError: err.Error(), + }) + t.EmitEvent(ctx, event.TaskFailed, event.EventData{ + event.KeyError: err.Error(), + }) + return err } - t.logEvent(ctx, event.TaskProgressSupernodesFound, "Supernodes found.", map[string]interface{}{ - "count": len(supernodes), + t.LogEvent(ctx, event.TaskProgressSupernodesFound, "Supernodes found.", event.EventData{ + event.KeyCount: len(supernodes), }) if err := t.registerWithSupernodes(ctx, supernodes); err != nil { - return t.fail(ctx, event.TaskProgressRegistrationFailure, err) - } - t.logEvent(ctx, event.TaskCompleted, "Cascade task completed successfully", nil) - t.Status = StatusCompleted - - return nil -} - -// fetchAndValidateAction checks if the action exists and is in PENDING state -func (t *CascadeTask) fetchAndValidateAction(ctx context.Context, actionID string) (lumera.Action, error) { - action, err := t.client.GetAction(ctx, actionID) - if err != nil { - return lumera.Action{}, fmt.Errorf("failed to get action: %w", err) - } - - // Check if action exists - if action.ID == "" { - return lumera.Action{}, errors.New("no action found with the specified ID") + t.logger.Error(ctx, "Task failed", "taskID", t.TaskID, "actionID", t.ActionID, "error", err) + t.EmitEvent(ctx, event.TaskProgressRegistrationFailure, event.EventData{ + event.KeyError: err.Error(), + }) + t.EmitEvent(ctx, event.TaskFailed, event.EventData{ + event.KeyError: err.Error(), + }) + return err } - // Check action state - if action.State != lumera.ACTION_STATE_PENDING { - return lumera.Action{}, fmt.Errorf("action is in %s state, expected PENDING", action.State) - } + t.LogEvent(ctx, event.TaskCompleted, "Cascade task completed successfully", nil) - return action, nil + return nil } func (t *CascadeTask) fetchSupernodes(ctx context.Context, height int64) (lumera.Supernodes, error) { @@ -169,10 +157,13 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum return fmt.Errorf("failed to upload to all supernodes: %w", lastErr) } -func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest) error { - t.logEvent(ctx, event.TaskProgressRegistrationInProgress, "attempting registration with supernode", map[string]interface{}{ - "supernode": sn.GrpcEndpoint, "sn-address": sn.CosmosAddress, "iteration": index + 1}) +func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest) error { + t.LogEvent(ctx, event.TaskProgressRegistrationInProgress, "attempting registration with supernode", event.EventData{ + event.KeySupernode: sn.GrpcEndpoint, + event.KeySupernodeAddress: sn.CosmosAddress, + event.KeyIteration: index + 1, + }) client, err := factory.CreateClient(ctx, sn) if err != nil { @@ -183,8 +174,8 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lum uploadCtx, cancel := context.WithTimeout(ctx, registrationTimeout) defer cancel() - req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data map[string]interface{}) { - t.logEvent(ctx, evt, msg, data) + req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) { + t.LogEvent(ctx, evt, msg, data) } resp, err := client.RegisterCascade(uploadCtx, req) if err != nil { @@ -194,10 +185,10 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lum return fmt.Errorf("upload rejected by %s: %s", sn.CosmosAddress, resp.Message) } - txhash := CleanTxHash(resp.TxHash) - t.logEvent(ctx, event.TxhasReceived, "txhash received", map[string]interface{}{ - "txhash": txhash, - "supernode": sn.CosmosAddress, + // Use txhash directly without cleaning + t.LogEvent(ctx, event.TxhasReceived, "txhash received", event.EventData{ + event.KeyTxHash: resp.TxHash, + event.KeySupernode: sn.CosmosAddress, }) t.logger.Info(ctx, "upload OK", "taskID", t.TaskID, "address", sn.CosmosAddress) @@ -205,45 +196,3 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lum } // logEvent writes a structured log entry **and** emits the SDK event. -func (t *CascadeTask) logEvent(ctx context.Context, evt event.EventType, msg string, additionalInfo map[string]interface{}) { - // Base fields that are always present - kvs := []interface{}{ - "taskID", t.TaskID, - "actionID", t.ActionID, - } - - // Merge additional fields - for k, v := range additionalInfo { - kvs = append(kvs, k, v) - } - - t.logger.Info(ctx, msg, kvs...) - t.EmitEvent(ctx, evt, additionalInfo) -} - -func (t *CascadeTask) fail(ctx context.Context, failureEvent event.EventType, err error) error { - t.Status = StatusFailed - t.Err = err - - t.logger.Error(ctx, "Task failed", "taskID", t.TaskID, "actionID", t.ActionID, "error", err) - - t.EmitEvent(ctx, failureEvent, map[string]interface{}{ - "error": err.Error(), - }) - t.EmitEvent(ctx, event.TaskFailed, map[string]interface{}{ - "error": err.Error(), - }) - - return err -} - -func CleanTxHash(input string) string { - // Split by colon and get the last part - parts := strings.Split(input, ":") - if len(parts) <= 1 { - return input - } - - // Return the last part with spaces trimmed - return strings.TrimSpace(parts[len(parts)-1]) -} diff --git a/sdk/task/manager.go b/sdk/task/manager.go index 45d07523..9d29fde5 100644 --- a/sdk/task/manager.go +++ b/sdk/task/manager.go @@ -77,8 +77,35 @@ func NewManager(ctx context.Context, config config.Config, logger log.Logger, kr }, nil } +// validateAction checks if an action exists and is in PENDING state +// Moved the validation logic from CascadeTask to Manager +func (m *ManagerImpl) validateAction(ctx context.Context, actionID string) (lumera.Action, error) { + action, err := m.lumeraClient.GetAction(ctx, actionID) + if err != nil { + return lumera.Action{}, fmt.Errorf("failed to get action: %w", err) + } + + // Check if action exists + if action.ID == "" { + return lumera.Action{}, fmt.Errorf("no action found with the specified ID") + } + + // Check action state + if action.State != lumera.ACTION_STATE_PENDING { + return lumera.Action{}, fmt.Errorf("action is in %s state, expected PENDING", action.State) + } + + return action, nil +} + // CreateCascadeTask creates and starts a Cascade task using the new pattern func (m *ManagerImpl) CreateCascadeTask(ctx context.Context, filePath string, actionID string) (string, error) { + // First validate the action before creating the task + action, err := m.validateAction(ctx, actionID) + if err != nil { + return "", err + } + taskID := uuid.New().String()[:8] m.logger.Debug(ctx, "Generated task ID", "taskID", taskID) @@ -87,17 +114,19 @@ func (m *ManagerImpl) CreateCascadeTask(ctx context.Context, filePath string, ac TaskID: taskID, ActionID: actionID, TaskType: TaskTypeCascade, + Action: action, client: m.lumeraClient, keyring: m.keyring, config: m.config, onEvent: m.handleEvent, logger: m.logger, } + // Create cascade-specific task task := NewCascadeTask(baseTask, filePath, actionID) // Store task in cache - m.taskCache.Set(ctx, taskID, task, TaskTypeCascade) + m.taskCache.Set(ctx, taskID, task, TaskTypeCascade, actionID) // Ensure task is stored before returning m.taskCache.Wait() @@ -178,13 +207,19 @@ func (m *ManagerImpl) handleEvent(ctx context.Context, e event.Event) { m.taskCache.UpdateStatus(ctx, e.TaskID, StatusCompleted, nil) case event.TaskFailed: var err error - if errMsg, ok := e.Data["error"].(string); ok { + if errMsg, ok := e.Data[event.KeyError].(string); ok { err = fmt.Errorf("%s", errMsg) m.logger.Error(ctx, "Task failed", "taskID", e.TaskID, "taskType", e.TaskType, "error", errMsg) } else { m.logger.Error(ctx, "Task failed with unknown error", "taskID", e.TaskID, "taskType", e.TaskType) } m.taskCache.UpdateStatus(ctx, e.TaskID, StatusFailed, err) + case event.TxhasReceived: + // Capture and store transaction hash from event + if txHash, ok := e.Data[event.KeyTxHash].(string); ok && txHash != "" { + m.logger.Info(ctx, "Transaction hash received", "taskID", e.TaskID, "txHash", txHash) + m.taskCache.UpdateTxHash(ctx, e.TaskID, txHash) + } } // Forward to the global event bus if configured diff --git a/sdk/task/task.go b/sdk/task/task.go index 060eb620..aa158613 100644 --- a/sdk/task/task.go +++ b/sdk/task/task.go @@ -41,8 +41,7 @@ type BaseTask struct { TaskID string ActionID string TaskType TaskType - Status TaskStatus - Err error + Action lumera.Action // Dependencies keyring keyring.Keyring @@ -53,7 +52,7 @@ type BaseTask struct { } // EmitEvent creates and sends an event with the specified type and data -func (t *BaseTask) EmitEvent(ctx context.Context, eventType event.EventType, data map[string]interface{}) { +func (t *BaseTask) EmitEvent(ctx context.Context, eventType event.EventType, data event.EventData) { if t.onEvent != nil { // Create event with the provided context e := event.NewEvent(ctx, eventType, t.TaskID, string(t.TaskType), t.ActionID, data) @@ -61,3 +60,20 @@ func (t *BaseTask) EmitEvent(ctx context.Context, eventType event.EventType, dat t.onEvent(ctx, e) } } + +// logEvent is a helper function to log events with the task's logger +func (t *BaseTask) LogEvent(ctx context.Context, evt event.EventType, msg string, additionalInfo event.EventData) { + // Base fields that are always present + kvs := []interface{}{ + "taskID", t.TaskID, + "actionID", t.ActionID, + } + + // Merge additional fields + for k, v := range additionalInfo { + kvs = append(kvs, k, v) + } + + t.logger.Info(ctx, msg, kvs...) + t.EmitEvent(ctx, evt, additionalInfo) +} diff --git a/sdk/task/testutil/mocks/manager_mock.go b/sdk/task/testutil/mocks/manager_mock.go deleted file mode 100644 index 172c24bb..00000000 --- a/sdk/task/testutil/mocks/manager_mock.go +++ /dev/null @@ -1,117 +0,0 @@ -// Code generated by mockery v2.53.3. DO NOT EDIT. - -package mocks - -import ( - context "context" - - event "github.com/LumeraProtocol/supernode/sdk/event" - mock "github.com/stretchr/testify/mock" - - task "github.com/LumeraProtocol/supernode/sdk/task" -) - -// Manager is an autogenerated mock type for the Manager type -type Manager struct { - mock.Mock -} - -// CreateCascadeTask provides a mock function with given fields: ctx, data, actionID -func (_m *Manager) CreateCascadeTask(ctx context.Context, data []byte, actionID string) (string, error) { - ret := _m.Called(ctx, data, actionID) - - if len(ret) == 0 { - panic("no return value specified for CreateCascadeTask") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, []byte, string) (string, error)); ok { - return rf(ctx, data, actionID) - } - if rf, ok := ret.Get(0).(func(context.Context, []byte, string) string); ok { - r0 = rf(ctx, data, actionID) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context, []byte, string) error); ok { - r1 = rf(ctx, data, actionID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// DeleteTask provides a mock function with given fields: ctx, taskID -func (_m *Manager) DeleteTask(ctx context.Context, taskID string) error { - ret := _m.Called(ctx, taskID) - - if len(ret) == 0 { - panic("no return value specified for DeleteTask") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, taskID) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// GetTask provides a mock function with given fields: ctx, taskID -func (_m *Manager) GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool) { - ret := _m.Called(ctx, taskID) - - if len(ret) == 0 { - panic("no return value specified for GetTask") - } - - var r0 *task.TaskEntry - var r1 bool - if rf, ok := ret.Get(0).(func(context.Context, string) (*task.TaskEntry, bool)); ok { - return rf(ctx, taskID) - } - if rf, ok := ret.Get(0).(func(context.Context, string) *task.TaskEntry); ok { - r0 = rf(ctx, taskID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*task.TaskEntry) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string) bool); ok { - r1 = rf(ctx, taskID) - } else { - r1 = ret.Get(1).(bool) - } - - return r0, r1 -} - -// SubscribeToAllEvents provides a mock function with given fields: ctx, handler -func (_m *Manager) SubscribeToAllEvents(ctx context.Context, handler event.Handler) { - _m.Called(ctx, handler) -} - -// SubscribeToEvents provides a mock function with given fields: ctx, eventType, handler -func (_m *Manager) SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) { - _m.Called(ctx, eventType, handler) -} - -// NewManager creates a new instance of Manager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewManager(t interface { - mock.TestingT - Cleanup(func()) -}) *Manager { - mock := &Manager{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} From bedbee46867985e372d5cd561d125d70f2268777 Mon Sep 17 00:00:00 2001 From: Matee ullah Malik Date: Thu, 15 May 2025 15:16:43 +0500 Subject: [PATCH 2/2] Verify data hash in sdk --- sdk/action/client.go | 10 ++-- sdk/adapters/lumera/adapter.go | 34 ++++++++++++- sdk/adapters/lumera/types.go | 3 ++ sdk/net/impl.go | 1 - sdk/task/helpers.go | 82 ++++++++++++++++++++++++++++++++ sdk/task/manager.go | 30 +++--------- tests/system/e2e_cascade_test.go | 11 +++++ 7 files changed, 143 insertions(+), 28 deletions(-) create mode 100644 sdk/task/helpers.go diff --git a/sdk/action/client.go b/sdk/action/client.go index d6a3e2ff..6bbcaa95 100644 --- a/sdk/action/client.go +++ b/sdk/action/client.go @@ -16,7 +16,11 @@ import ( // //go:generate mockery --name=Client --output=testutil/mocks --outpkg=mocks --filename=client_mock.go type Client interface { - StartCascade(ctx context.Context, filePath string, actionID string) (string, error) + // - signature: Base64-encoded cryptographic signature of the file's data hash (blake3) + // 1- hash(blake3) > 2- sign > 3- base64 + // The signature must be created by the same account that created the Lumera action. + // It must be a digital signature of the data hash found in the action's CASCADE metadata. + StartCascade(ctx context.Context, filePath string, actionID string, signature string) (string, error) DeleteTask(ctx context.Context, taskID string) error GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool) SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) error @@ -54,7 +58,7 @@ func NewClient(ctx context.Context, config config.Config, logger log.Logger, key } // StartCascade initiates a cascade operation -func (c *ClientImpl) StartCascade(ctx context.Context, filePath string, actionID string) (string, error) { +func (c *ClientImpl) StartCascade(ctx context.Context, filePath string, actionID string, signature string) (string, error) { if actionID == "" { c.logger.Error(ctx, "Empty action ID provided") return "", ErrEmptyActionID @@ -64,7 +68,7 @@ func (c *ClientImpl) StartCascade(ctx context.Context, filePath string, actionID return "", ErrEmptyData } - taskID, err := c.taskManager.CreateCascadeTask(ctx, filePath, actionID) + taskID, err := c.taskManager.CreateCascadeTask(ctx, filePath, actionID, signature) if err != nil { c.logger.Error(ctx, "Failed to create cascade task", "error", err) return "", fmt.Errorf("failed to create cascade task: %w", err) diff --git a/sdk/adapters/lumera/adapter.go b/sdk/adapters/lumera/adapter.go index 7e02638a..69f59566 100644 --- a/sdk/adapters/lumera/adapter.go +++ b/sdk/adapters/lumera/adapter.go @@ -12,12 +12,15 @@ import ( sntypes "github.com/LumeraProtocol/lumera/x/supernode/types" lumeraclient "github.com/LumeraProtocol/supernode/pkg/lumera" "github.com/cosmos/cosmos-sdk/crypto/keyring" + "github.com/golang/protobuf/proto" ) //go:generate mockery --name=Client --output=testutil/mocks --outpkg=mocks --filename=lumera_mock.go type Client interface { GetAction(ctx context.Context, actionID string) (Action, error) GetSupernodes(ctx context.Context, height int64) ([]Supernode, error) + DecodeCascadeMetadata(ctx context.Context, action Action) (actiontypes.CascadeMetadata, error) + VerifySignature(ctx context.Context, accountAddr string, data []byte, signature []byte) error } // ConfigParams holds configuration parameters from global config @@ -128,13 +131,42 @@ func (a *Adapter) GetSupernodes(ctx context.Context, height int64) ([]Supernode, return supernodes, nil } -func toSdkAction(resp *actiontypes.QueryGetActionResponse) Action { +func (a *Adapter) VerifySignature(ctx context.Context, accountAddr string, data, signature []byte) error { + + err := a.client.Auth().Verify(ctx, accountAddr, data, signature) + if err != nil { + a.logger.Error(ctx, "Signature verification failed", "accountAddr", accountAddr, "error", err) + return fmt.Errorf("signature verification failed: %w", err) + } + a.logger.Debug(ctx, "Signature verified successfully", "accountAddr", accountAddr) + return nil +} + +// DecodeCascadeMetadata decodes the raw metadata bytes into CascadeMetadata +func (a *Adapter) DecodeCascadeMetadata(ctx context.Context, action Action) (actiontypes.CascadeMetadata, error) { + if action.ActionType != "ACTION_TYPE_CASCADE" { + return actiontypes.CascadeMetadata{}, fmt.Errorf("action is not of type CASCADE, got %s", action.ActionType) + } + + var meta actiontypes.CascadeMetadata + if err := proto.Unmarshal(action.Metadata, &meta); err != nil { + a.logger.Error(ctx, "Failed to unmarshal cascade metadata", "actionID", action.ID, "error", err) + return meta, fmt.Errorf("failed to unmarshal cascade metadata: %w", err) + } + a.logger.Debug(ctx, "Successfully decoded cascade metadata", "actionID", action.ID) + return meta, nil +} + +func toSdkAction(resp *actiontypes.QueryGetActionResponse) Action { return Action{ ID: resp.Action.ActionID, State: ACTION_STATE(resp.Action.State.String()), Height: resp.Action.BlockHeight, ExpirationTime: resp.Action.ExpirationTime, + ActionType: resp.Action.ActionType.String(), + Metadata: resp.Action.Metadata, + Creator: resp.Action.Creator, } } diff --git a/sdk/adapters/lumera/types.go b/sdk/adapters/lumera/types.go index 429cf1f5..4c012013 100644 --- a/sdk/adapters/lumera/types.go +++ b/sdk/adapters/lumera/types.go @@ -29,6 +29,9 @@ type Action struct { State ACTION_STATE Height int64 ExpirationTime int64 + ActionType string // Type of the action (e.g., CASCADE) + Metadata []byte // Raw metadata bytes + Creator string // Creator of the action } type Supernodes []Supernode diff --git a/sdk/net/impl.go b/sdk/net/impl.go index 30f230fe..92bde1e8 100644 --- a/sdk/net/impl.go +++ b/sdk/net/impl.go @@ -1,4 +1,3 @@ -// File: impl.go package net import ( diff --git a/sdk/task/helpers.go b/sdk/task/helpers.go new file mode 100644 index 00000000..7e2abbd2 --- /dev/null +++ b/sdk/task/helpers.go @@ -0,0 +1,82 @@ +package task + +import ( + "context" + "encoding/base64" + "fmt" + + "github.com/LumeraProtocol/supernode/sdk/adapters/lumera" +) + +func (m *ManagerImpl) validateAction(ctx context.Context, actionID string) (lumera.Action, error) { + action, err := m.lumeraClient.GetAction(ctx, actionID) + if err != nil { + return lumera.Action{}, fmt.Errorf("failed to get action: %w", err) + } + + // Check if action exists + if action.ID == "" { + return lumera.Action{}, fmt.Errorf("no action found with the specified ID") + } + + // Check action state + if action.State != lumera.ACTION_STATE_PENDING { + return lumera.Action{}, fmt.Errorf("action is in %s state, expected PENDING", action.State) + } + + return action, nil +} + +// validateSignature verifies the authenticity of a signature against an action's data hash. +// +// This function performs the following steps: +// 1. Decodes the CASCADE metadata from the provided Lumera action +// 2. Extracts the base64-encoded data hash from the metadata +// 3. Decodes both the data hash and the provided signature from base64 format +// 4. Verifies the signature against the data hash using the Lumera client +// +// Parameters: +// - ctx: Context for the operation, used for cancellation and tracing +// - action: The Lumera action object containing CASCADE metadata with the data hash +// - signature: Base64-encoded signature string to verify +// +// Returns: +// - nil if the signature is valid +// - An error if any step fails, including metadata decoding issues, +// base64 decoding problems, or if the signature is invalid +// +// The signature is expected to be produced by the creator of the action, +// and the verification uses the creator's public key to validate the signature. +func (m *ManagerImpl) validateSignature(ctx context.Context, action lumera.Action, signature string) error { + // Decode the CASCADE metadata to access the data hash + cascadeMetaData, err := m.lumeraClient.DecodeCascadeMetadata(ctx, action) + if err != nil { + return fmt.Errorf("failed to decode cascade metadata: %w", err) + } + + // Extract the base64-encoded data hash from the metadata + base64EnTcketDataHash := cascadeMetaData.DataHash + + // Decode the data hash from base64 to raw bytes + dataHashBytes, err := base64.StdEncoding.DecodeString(base64EnTcketDataHash) + if err != nil { + return fmt.Errorf("failed to decode data hash: %w", err) + } + + // Decode the provided signature from base64 to raw bytes + signatureBytes, err := base64.StdEncoding.DecodeString(signature) + if err != nil { + return fmt.Errorf("failed to decode signature: %w", err) + } + + // Verify the signature using the Lumera client + // This checks if the signature was produced by the action creator + // for the given data hash + err = m.lumeraClient.VerifySignature(ctx, action.Creator, dataHashBytes, signatureBytes) + if err != nil { + m.logger.Error(ctx, "Signature validation failed", "actionID", action.ID, "error", err) + return fmt.Errorf("signature validation failed: %w", err) + } + + return nil +} diff --git a/sdk/task/manager.go b/sdk/task/manager.go index 9d29fde5..fb5bb97b 100644 --- a/sdk/task/manager.go +++ b/sdk/task/manager.go @@ -19,7 +19,7 @@ const MAX_EVENT_WORKERS = 100 // //go:generate mockery --name=Manager --output=testutil/mocks --outpkg=mocks --filename=manager_mock.go type Manager interface { - CreateCascadeTask(ctx context.Context, filePath string, actionID string) (string, error) + CreateCascadeTask(ctx context.Context, filePath string, actionID string, signature string) (string, error) GetTask(ctx context.Context, taskID string) (*TaskEntry, bool) DeleteTask(ctx context.Context, taskID string) error SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) @@ -77,35 +77,19 @@ func NewManager(ctx context.Context, config config.Config, logger log.Logger, kr }, nil } -// validateAction checks if an action exists and is in PENDING state -// Moved the validation logic from CascadeTask to Manager -func (m *ManagerImpl) validateAction(ctx context.Context, actionID string) (lumera.Action, error) { - action, err := m.lumeraClient.GetAction(ctx, actionID) - if err != nil { - return lumera.Action{}, fmt.Errorf("failed to get action: %w", err) - } - - // Check if action exists - if action.ID == "" { - return lumera.Action{}, fmt.Errorf("no action found with the specified ID") - } - - // Check action state - if action.State != lumera.ACTION_STATE_PENDING { - return lumera.Action{}, fmt.Errorf("action is in %s state, expected PENDING", action.State) - } - - return action, nil -} - // CreateCascadeTask creates and starts a Cascade task using the new pattern -func (m *ManagerImpl) CreateCascadeTask(ctx context.Context, filePath string, actionID string) (string, error) { +func (m *ManagerImpl) CreateCascadeTask(ctx context.Context, filePath string, actionID, signature string) (string, error) { // First validate the action before creating the task action, err := m.validateAction(ctx, actionID) if err != nil { return "", err } + // verify signature + if err := m.validateSignature(ctx, action, signature); err != nil { + return "", err + } + taskID := uuid.New().String()[:8] m.logger.Debug(ctx, "Generated task ID", "taskID", taskID) diff --git a/tests/system/e2e_cascade_test.go b/tests/system/e2e_cascade_test.go index a2752635..fdef64d0 100644 --- a/tests/system/e2e_cascade_test.go +++ b/tests/system/e2e_cascade_test.go @@ -297,6 +297,14 @@ func TestCascadeE2E(t *testing.T) { hash, err := Blake3Hash(data) b64EncodedHash := base64.StdEncoding.EncodeToString(hash) require.NoError(t, err, "Failed to compute Blake3 hash") + + // Also Create a signature for the hash + signedHash, err := keyring.SignBytes(keplrKeyring, userKeyName, hash) + require.NoError(t, err, "Failed to sign hash") + + // Encode the signed hash as base64 + signedHashBase64 := base64.StdEncoding.EncodeToString(signedHash) + // --------------------------------------- t.Log("Step 7: Creating metadata and submitting action request") @@ -437,11 +445,14 @@ func TestCascadeE2E(t *testing.T) { require.NoError(t, err, "Failed to subscribe to events") // Start cascade operation + + // t.Logf("Starting cascade operation with action ID: %s", actionID) taskID, err := actionClient.StartCascade( ctx, testFileFullpath, // path actionID, // Action ID from the transaction + signedHashBase64, // Signed hash of the file ) require.NoError(t, err, "Failed to start cascade operation") t.Logf("Cascade operation started with task ID: %s", taskID)