diff --git a/dbos/dbos.go b/dbos/dbos.go index f9aa829..657dd81 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -111,6 +111,7 @@ type DBOSContext interface { // Workflow operations RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution + Go(_ DBOSContext, fn StepFunc, opts ...StepOption) (chan StepOutcome[any], error) // Starts a step inside a Go routine and returns a channel to receive the result Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow diff --git a/dbos/workflow.go b/dbos/workflow.go index bdfa648..1ac1160 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -940,11 +940,12 @@ type Step[R any] func(ctx context.Context) (R, error) // stepOptions holds the configuration for step execution using functional options pattern. type stepOptions struct { - maxRetries int // Maximum number of retry attempts (0 = no retries) - backoffFactor float64 // Exponential backoff multiplier between retries (default: 2.0) - baseInterval time.Duration // Initial delay between retries (default: 100ms) - maxInterval time.Duration // Maximum delay between retries (default: 5s) - stepName string // Custom name for the step (defaults to function name) + maxRetries int // Maximum number of retry attempts (0 = no retries) + backoffFactor float64 // Exponential backoff multiplier between retries (default: 2.0) + baseInterval time.Duration // Initial delay between retries (default: 100ms) + maxInterval time.Duration // Maximum delay between retries (default: 5s) + stepName string // Custom name for the step (defaults to function name) + preGeneratedStepID *int // Pre generated stepID in case we want to run the function in a Go routine } // setDefaults applies default values to stepOptions @@ -1006,6 +1007,20 @@ func WithMaxInterval(interval time.Duration) StepOption { } } + +func WithNextStepID(stepID int) StepOption { + return func(opts *stepOptions) { + opts.preGeneratedStepID = &stepID + } +} + +// StepOutcome holds the result and error from a step execution +// This struct is returned as part of a channel from the Go function when running the step inside a Go routine +type StepOutcome[R any] struct { + result R + err error +} + // RunAsStep executes a function as a durable step within a workflow. // Steps provide at-least-once execution guarantees and automatic retry capabilities. // If a step has already been executed (e.g., during workflow recovery), its recorded @@ -1107,10 +1122,18 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) return fn(c) } + // Get stepID if it has been pre generated + var stepID int + if stepOpts.preGeneratedStepID != nil { + stepID = *stepOpts.preGeneratedStepID + } else { + stepID = wfState.nextStepID() // crucially, this increments the step ID on the *workflow* state + } + // Setup step state stepState := workflowState{ workflowID: wfState.workflowID, - stepID: wfState.nextStepID(), // crucially, this increments the step ID on the *workflow* state + stepID: stepID, isWithinStep: true, } @@ -1197,6 +1220,91 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) return stepOutput, stepError } +// Go runs a step inside a Go routine and returns a channel to receive the result. +// Go generates a deterministic step ID for the step before running the step in a routine, since routines are not deterministic. +// The step ID is used to track the steps within the same workflow and use the step ID to perform recovery. +// The folliwing examples shows how to use Go: +// +// resultChan, err := dbos.Go(ctx, func(ctx context.Context) (string, error) { +// return "Hello, World!", nil +// }) +// +// resultChan := <-resultChan // wait for the channel to receive +// if resultChan.err != nil { +// // Handle error +// } +// result := resultChan.result +func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan StepOutcome[R], error) { + if ctx == nil { + return *new(chan StepOutcome[R]), newStepExecutionError("", "", "ctx cannot be nil") + } + + if fn == nil { + return *new(chan StepOutcome[R]), newStepExecutionError("", "", "step function cannot be nil") + } + + // Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name + stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() + opts = append(opts, WithStepName(stepName)) + + // Type-erase the function + typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) + + result, err := ctx.Go(ctx, typeErasedFn, opts...) + // Step function could return a nil result + if result == nil { + return *new(chan StepOutcome[R]), err + } + + outcomeChan := make(chan StepOutcome[R], 1) + defer close(outcomeChan) + + outcome := <-result + + if outcome.err != nil { + outcomeChan <- StepOutcome[R]{ + result: *new(R), + err: outcome.err, + } + return outcomeChan, nil + } + + // Otherwise type-check and cast the result + typedResult, ok := outcome.result.(R) + if !ok { + return *new(chan StepOutcome[R]), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result) + } + outcomeChan <- StepOutcome[R]{ + result: typedResult, + err: nil, + } + + return outcomeChan, nil +} + +func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, opts ...StepOption) (chan StepOutcome[any], error) { + // create a determistic step ID + stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() + wfState, ok := ctx.Value(workflowStateKey).(*workflowState) + if !ok || wfState == nil { + return nil, newStepExecutionError("", stepName, "workflow state not found in context: are you running this step within a workflow?") + } + stepID := wfState.nextStepID() + opts = append(opts, WithNextStepID(stepID)) + + // run step inside a Go routine by passing a stepID + result := make(chan StepOutcome[any], 1) + go func() { + res, err := ctx.RunAsStep(ctx, fn, opts...) + result <- StepOutcome[any]{ + result: res, + err: err, + } + }() + + return result, nil +} + /****************************************/ /******* WORKFLOW COMMUNICATIONS ********/ /****************************************/ diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 56b8cf8..892cdb5 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -2,6 +2,7 @@ package dbos import ( "context" + "encoding/gob" "errors" "fmt" "reflect" @@ -42,6 +43,11 @@ func simpleStepError(_ context.Context) (string, error) { return "", fmt.Errorf("step failure") } +func stepWithSleep(_ context.Context, duration time.Duration) (string, error) { + time.Sleep(duration) + return fmt.Sprintf("from step that slept for %s", duration), nil +} + func simpleWorkflowWithStepError(dbosCtx DBOSContext, input string) (string, error) { return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { return simpleStepError(ctx) @@ -855,6 +861,193 @@ func TestSteps(t *testing.T) { }) } +type stepWithSleepOutput struct { + StepID int + Result string + Error error +} + +var ( + stepDeterminismStartEvent *Event + stepDeterminismEvent *Event +) + +func stepWithSleepCustomOutput(_ context.Context, duration time.Duration, stepID int) (stepWithSleepOutput, error) { + time.Sleep(duration) + return stepWithSleepOutput{ + StepID: stepID, + Result: fmt.Sprintf("from step that slept for %s", duration), + Error: nil, + }, nil +} + +// blocks indefinitely +func stepThatBlocks(_ context.Context) (string, error) { + stepDeterminismStartEvent.Set() + fmt.Println("stepThatBlocks: started to block") + stepDeterminismEvent.Wait() + fmt.Println("stepThatBlocks: unblocked") + return "from step that blocked", nil +} + +func TestGoRunningStepsInsideGoRoutines(t *testing.T) { + + dbosCtx := setupDBOS(t, true, true) + + // Register custom types for Gob encoding + var stepOutput stepWithSleepOutput + gob.Register(stepOutput) + t.Run("Go must run steps inside a workflow", func(t *testing.T) { + _, err := Go(dbosCtx, func(ctx context.Context) (string, error) { + return stepWithSleep(ctx, 1*time.Second) + }) + require.Error(t, err, "expected error when running step outside of workflow context, but got none") + + dbosErr, ok := err.(*DBOSError) + require.True(t, ok, "expected error to be of type *DBOSError, got %T", err) + require.Equal(t, StepExecutionError, dbosErr.Code) + expectedMessagePart := "workflow state not found in context: are you running this step within a workflow?" + require.Contains(t, err.Error(), expectedMessagePart, "expected error message to contain %q, but got %q", expectedMessagePart, err.Error()) + }) + + t.Run("Go must return step error correctly", func(t *testing.T) { + goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { + result, _ := Go(dbosCtx, func(ctx context.Context) (string, error) { + return "", fmt.Errorf("step error") + }) + + resultChan := <-result + if resultChan.err != nil { + return "", resultChan.err + } + return resultChan.result, nil + } + + RegisterWorkflow(dbosCtx, goWorkflow) + + handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input") + require.NoError(t, err, "failed to run go workflow") + _, err = handle.GetResult() + require.Error(t, err, "expected error when running step, but got none") + require.Equal(t, "step error", err.Error()) + }) + + t.Run("Go must execute 100 steps simultaneously then return the stepIDs in the correct sequence", func(t *testing.T) { + // run 100 steps simultaneously + const numSteps = 100 + results := make(chan string, numSteps) + errors := make(chan error, numSteps) + var resultChans []<-chan StepOutcome[stepWithSleepOutput] + + goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { + for i := range numSteps { + resultChan, err := Go(dbosCtx, func(ctx context.Context) (stepWithSleepOutput, error) { + return stepWithSleepCustomOutput(ctx, 20*time.Millisecond, i) + }) + + if err != nil { + return "", err + } + resultChans = append(resultChans, resultChan) + } + + for i, resultChan := range resultChans { + result1 := <-resultChan + if result1.err != nil { + errors <- result1.result.Error + } + assert.Equal(t, i, result1.result.StepID, "expected step ID to be %d, got %d", i, result1.result.StepID) + results <- result1.result.Result + } + return "", nil + } + + RegisterWorkflow(dbosCtx, goWorkflow) + handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input") + require.NoError(t, err, "failed to run go workflow") + _, err = handle.GetResult() + close(results) + close(errors) + require.NoError(t, err, "failed to get result from go workflow") + assert.Equal(t, numSteps, len(results), "expected %d results, got %d", numSteps, len(results)) + assert.Equal(t, 0, len(errors), "expected no errors, got %d", len(errors)) + }) + + t.Run("Go executes the same workflow twice, whilst blocking the first workflow, to test for deterministic execution when using Go routines", func(t *testing.T) { + + stepDeterminismStartEvent = NewEvent() + stepDeterminismEvent = NewEvent() + + goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { + _, err := Go(dbosCtx, func(ctx context.Context) (string, error) { + return stepWithSleep(ctx, 1*time.Second) + }) + + if err != nil { + return "", err + } + + _, err = Go(dbosCtx, func(ctx context.Context) (string, error) { + return stepWithSleep(ctx, 1*time.Second) + }) + + if err != nil { + return "", err + } + + _, err = Go(dbosCtx, func(ctx context.Context) (string, error) { + return stepWithSleep(ctx, 1*time.Second) + }) + + if err != nil { + return "", err + } + + _, err = Go(dbosCtx, func(ctx context.Context) (string, error) { + return stepThatBlocks(ctx) + }) + + if err != nil { + return "", err + } + + return "WORKFLOW EXECUTED DETERMINISTICALLY", nil + } + + // Run the first workflow + RegisterWorkflow(dbosCtx, goWorkflow) + handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input") + require.NoError(t, err, "failed to run go workflow") + + // Wait for the first workflow to reach the blocking step + stepDeterminismStartEvent.Wait() + stepDeterminismStartEvent.Clear() + + // Run the second workflow + handle2, err := RunWorkflow(dbosCtx, goWorkflow, "test-input", WithWorkflowID(handle.GetWorkflowID())) + + // If it throws an error, it's because of steps not being deterministically executed when using Go routines in the first workflow + require.NoError(t, err, "failed to run go workflow") + + // Complete the blocked workflow + stepDeterminismEvent.Set() + + _, err = handle2.GetResult() + require.NoError(t, err, "failed to get result from go workflow") + + // Verify workflow status is SUCCESS + status, err := handle.GetStatus() + require.NoError(t, err, "failed to get workflow status") + require.Equal(t, WorkflowStatusSuccess, status.Status, "expected workflow status to be WorkflowStatusSuccess") + + + // Verify workflow result is "WORKFLOW EXECUTED DETERMINISTICALLY" + result, err := handle.GetResult() + require.NoError(t, err, "failed to get result from go workflow") + require.Equal(t, "WORKFLOW EXECUTED DETERMINISTICALLY", result, "expected result to be 'WORKFLOW EXECUTED DETERMINISTICALLY'") + }) +} + func TestChildWorkflow(t *testing.T) { dbosCtx := setupDBOS(t, true, true)