Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
91e733c
feat: add dbos.Go to run steps inside Go routine
af-md Sep 3, 2025
61fbc57
run step and get result using channel
af-md Sep 4, 2025
dca040f
add comments
af-md Sep 4, 2025
9c41921
append stepID to options
af-md Sep 4, 2025
4b35674
assign stepID
af-md Sep 4, 2025
bbc337e
remove extra stepID argument
af-md Sep 4, 2025
4e4d8d4
refactor: change Go function to return a channel of stepOutcome and s…
af-md Sep 6, 2025
4263549
refactor: remove Go function from DBOSContext interface
af-md Sep 6, 2025
19b2707
test: add tests for Go function execution within workflows and introd…
af-md Sep 6, 2025
c8a0ca7
fix: ensure results and errors channels are closed after workflow exe…
af-md Sep 7, 2025
956ec68
fix: include step name in error message when workflow state is not fo…
af-md Sep 7, 2025
81ada93
docs: enhance documentation for Go function, detailing its usage and …
af-md Sep 8, 2025
134949b
test: add validation for deterministic step IDs in Go workflow execution
af-md Sep 10, 2025
74deb3a
feat: add Go function to DBOSContext for executing steps in Go routin…
af-md Oct 16, 2025
d28b3c6
refactor: improve error handling in Go function and update tests for …
af-md Oct 20, 2025
9d497bd
cleanup: remove commented TODO in Go function and tidy up test code b…
af-md Oct 20, 2025
f01ca09
refactor: reorganize test code for step execution and enhance determi…
af-md Oct 23, 2025
ebeaa35
refactor: rename StepOptions to stepOptions for consistency and updat…
af-md Oct 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type DBOSContext interface {
// Workflow operations
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
Go(_ DBOSContext, fn StepFunc, opts ...StepOption) (chan StepOutcome[any], error) // Starts a step inside a Go routine and returns a channel to receive the result
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
Expand Down
120 changes: 114 additions & 6 deletions dbos/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,11 +940,12 @@ type Step[R any] func(ctx context.Context) (R, error)

// stepOptions holds the configuration for step execution using functional options pattern.
type stepOptions struct {
maxRetries int // Maximum number of retry attempts (0 = no retries)
backoffFactor float64 // Exponential backoff multiplier between retries (default: 2.0)
baseInterval time.Duration // Initial delay between retries (default: 100ms)
maxInterval time.Duration // Maximum delay between retries (default: 5s)
stepName string // Custom name for the step (defaults to function name)
maxRetries int // Maximum number of retry attempts (0 = no retries)
backoffFactor float64 // Exponential backoff multiplier between retries (default: 2.0)
baseInterval time.Duration // Initial delay between retries (default: 100ms)
maxInterval time.Duration // Maximum delay between retries (default: 5s)
stepName string // Custom name for the step (defaults to function name)
preGeneratedStepID *int // Pre generated stepID in case we want to run the function in a Go routine
}

// setDefaults applies default values to stepOptions
Expand Down Expand Up @@ -1006,6 +1007,20 @@ func WithMaxInterval(interval time.Duration) StepOption {
}
}


func WithNextStepID(stepID int) StepOption {
return func(opts *stepOptions) {
opts.preGeneratedStepID = &stepID
}
}

// StepOutcome holds the result and error from a step execution
// This struct is returned as part of a channel from the Go function when running the step inside a Go routine
type StepOutcome[R any] struct {
result R
err error
}

// RunAsStep executes a function as a durable step within a workflow.
// Steps provide at-least-once execution guarantees and automatic retry capabilities.
// If a step has already been executed (e.g., during workflow recovery), its recorded
Expand Down Expand Up @@ -1107,10 +1122,18 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
return fn(c)
}

// Get stepID if it has been pre generated
var stepID int
if stepOpts.preGeneratedStepID != nil {
stepID = *stepOpts.preGeneratedStepID
} else {
stepID = wfState.nextStepID() // crucially, this increments the step ID on the *workflow* state
}

// Setup step state
stepState := workflowState{
workflowID: wfState.workflowID,
stepID: wfState.nextStepID(), // crucially, this increments the step ID on the *workflow* state
stepID: stepID,
isWithinStep: true,
}

Expand Down Expand Up @@ -1197,6 +1220,91 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
return stepOutput, stepError
}

// Go runs a step inside a Go routine and returns a channel to receive the result.
// Go generates a deterministic step ID for the step before running the step in a routine, since routines are not deterministic.
// The step ID is used to track the steps within the same workflow and use the step ID to perform recovery.
// The folliwing examples shows how to use Go:
//
// resultChan, err := dbos.Go(ctx, func(ctx context.Context) (string, error) {
// return "Hello, World!", nil
// })
//
// resultChan := <-resultChan // wait for the channel to receive
// if resultChan.err != nil {
// // Handle error
// }
// result := resultChan.result
func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan StepOutcome[R], error) {
if ctx == nil {
return *new(chan StepOutcome[R]), newStepExecutionError("", "", "ctx cannot be nil")
}

if fn == nil {
return *new(chan StepOutcome[R]), newStepExecutionError("", "", "step function cannot be nil")
}

// Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name
stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
opts = append(opts, WithStepName(stepName))

// Type-erase the function
typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) })

result, err := ctx.Go(ctx, typeErasedFn, opts...)
// Step function could return a nil result
if result == nil {
return *new(chan StepOutcome[R]), err
}

outcomeChan := make(chan StepOutcome[R], 1)
defer close(outcomeChan)

outcome := <-result

if outcome.err != nil {
outcomeChan <- StepOutcome[R]{
result: *new(R),
err: outcome.err,
}
return outcomeChan, nil
}

// Otherwise type-check and cast the result
typedResult, ok := outcome.result.(R)
if !ok {
return *new(chan StepOutcome[R]), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result)
}
outcomeChan <- StepOutcome[R]{
result: typedResult,
err: nil,
}

return outcomeChan, nil
}

func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, opts ...StepOption) (chan StepOutcome[any], error) {
// create a determistic step ID
stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
if !ok || wfState == nil {
return nil, newStepExecutionError("", stepName, "workflow state not found in context: are you running this step within a workflow?")
}
stepID := wfState.nextStepID()
opts = append(opts, WithNextStepID(stepID))

// run step inside a Go routine by passing a stepID
result := make(chan StepOutcome[any], 1)
go func() {
res, err := ctx.RunAsStep(ctx, fn, opts...)
result <- StepOutcome[any]{
result: res,
err: err,
}
}()

return result, nil
}

/****************************************/
/******* WORKFLOW COMMUNICATIONS ********/
/****************************************/
Expand Down
193 changes: 193 additions & 0 deletions dbos/workflows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dbos

import (
"context"
"encoding/gob"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -42,6 +43,11 @@ func simpleStepError(_ context.Context) (string, error) {
return "", fmt.Errorf("step failure")
}

func stepWithSleep(_ context.Context, duration time.Duration) (string, error) {
time.Sleep(duration)
return fmt.Sprintf("from step that slept for %s", duration), nil
}

func simpleWorkflowWithStepError(dbosCtx DBOSContext, input string) (string, error) {
return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
return simpleStepError(ctx)
Expand Down Expand Up @@ -855,6 +861,193 @@ func TestSteps(t *testing.T) {
})
}

type stepWithSleepOutput struct {
StepID int
Result string
Error error
}

var (
stepDeterminismStartEvent *Event
stepDeterminismEvent *Event
)

func stepWithSleepCustomOutput(_ context.Context, duration time.Duration, stepID int) (stepWithSleepOutput, error) {
time.Sleep(duration)
return stepWithSleepOutput{
StepID: stepID,
Result: fmt.Sprintf("from step that slept for %s", duration),
Error: nil,
}, nil
}

// blocks indefinitely
func stepThatBlocks(_ context.Context) (string, error) {
stepDeterminismStartEvent.Set()
fmt.Println("stepThatBlocks: started to block")
stepDeterminismEvent.Wait()
fmt.Println("stepThatBlocks: unblocked")
return "from step that blocked", nil
}

func TestGoRunningStepsInsideGoRoutines(t *testing.T) {

dbosCtx := setupDBOS(t, true, true)

// Register custom types for Gob encoding
var stepOutput stepWithSleepOutput
gob.Register(stepOutput)
t.Run("Go must run steps inside a workflow", func(t *testing.T) {
_, err := Go(dbosCtx, func(ctx context.Context) (string, error) {
return stepWithSleep(ctx, 1*time.Second)
})
require.Error(t, err, "expected error when running step outside of workflow context, but got none")

dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
require.Equal(t, StepExecutionError, dbosErr.Code)
expectedMessagePart := "workflow state not found in context: are you running this step within a workflow?"
require.Contains(t, err.Error(), expectedMessagePart, "expected error message to contain %q, but got %q", expectedMessagePart, err.Error())
})

t.Run("Go must return step error correctly", func(t *testing.T) {
goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
result, _ := Go(dbosCtx, func(ctx context.Context) (string, error) {
return "", fmt.Errorf("step error")
})

resultChan := <-result
if resultChan.err != nil {
return "", resultChan.err
}
return resultChan.result, nil
}

RegisterWorkflow(dbosCtx, goWorkflow)

handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input")
require.NoError(t, err, "failed to run go workflow")
_, err = handle.GetResult()
require.Error(t, err, "expected error when running step, but got none")
require.Equal(t, "step error", err.Error())
})

t.Run("Go must execute 100 steps simultaneously then return the stepIDs in the correct sequence", func(t *testing.T) {
// run 100 steps simultaneously
const numSteps = 100
results := make(chan string, numSteps)
errors := make(chan error, numSteps)
var resultChans []<-chan StepOutcome[stepWithSleepOutput]

goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
for i := range numSteps {
resultChan, err := Go(dbosCtx, func(ctx context.Context) (stepWithSleepOutput, error) {
return stepWithSleepCustomOutput(ctx, 20*time.Millisecond, i)
})

if err != nil {
return "", err
}
resultChans = append(resultChans, resultChan)
}

for i, resultChan := range resultChans {
result1 := <-resultChan
if result1.err != nil {
errors <- result1.result.Error
}
assert.Equal(t, i, result1.result.StepID, "expected step ID to be %d, got %d", i, result1.result.StepID)
results <- result1.result.Result
}
return "", nil
}

RegisterWorkflow(dbosCtx, goWorkflow)
handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input")
require.NoError(t, err, "failed to run go workflow")
_, err = handle.GetResult()
close(results)
close(errors)
require.NoError(t, err, "failed to get result from go workflow")
assert.Equal(t, numSteps, len(results), "expected %d results, got %d", numSteps, len(results))
assert.Equal(t, 0, len(errors), "expected no errors, got %d", len(errors))
})

t.Run("Go executes the same workflow twice, whilst blocking the first workflow, to test for deterministic execution when using Go routines", func(t *testing.T) {

stepDeterminismStartEvent = NewEvent()
stepDeterminismEvent = NewEvent()

goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
_, err := Go(dbosCtx, func(ctx context.Context) (string, error) {
return stepWithSleep(ctx, 1*time.Second)
})

if err != nil {
return "", err
}

_, err = Go(dbosCtx, func(ctx context.Context) (string, error) {
return stepWithSleep(ctx, 1*time.Second)
})

if err != nil {
return "", err
}

_, err = Go(dbosCtx, func(ctx context.Context) (string, error) {
return stepWithSleep(ctx, 1*time.Second)
})

if err != nil {
return "", err
}

_, err = Go(dbosCtx, func(ctx context.Context) (string, error) {
return stepThatBlocks(ctx)
})

if err != nil {
return "", err
}

return "WORKFLOW EXECUTED DETERMINISTICALLY", nil
}

// Run the first workflow
RegisterWorkflow(dbosCtx, goWorkflow)
handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input")
require.NoError(t, err, "failed to run go workflow")

// Wait for the first workflow to reach the blocking step
stepDeterminismStartEvent.Wait()
stepDeterminismStartEvent.Clear()

// Run the second workflow
handle2, err := RunWorkflow(dbosCtx, goWorkflow, "test-input", WithWorkflowID(handle.GetWorkflowID()))

// If it throws an error, it's because of steps not being deterministically executed when using Go routines in the first workflow
require.NoError(t, err, "failed to run go workflow")

// Complete the blocked workflow
stepDeterminismEvent.Set()

_, err = handle2.GetResult()
require.NoError(t, err, "failed to get result from go workflow")

// Verify workflow status is SUCCESS
status, err := handle.GetStatus()
require.NoError(t, err, "failed to get workflow status")
require.Equal(t, WorkflowStatusSuccess, status.Status, "expected workflow status to be WorkflowStatusSuccess")


// Verify workflow result is "WORKFLOW EXECUTED DETERMINISTICALLY"
result, err := handle.GetResult()
require.NoError(t, err, "failed to get result from go workflow")
require.Equal(t, "WORKFLOW EXECUTED DETERMINISTICALLY", result, "expected result to be 'WORKFLOW EXECUTED DETERMINISTICALLY'")
})
}

func TestChildWorkflow(t *testing.T) {
dbosCtx := setupDBOS(t, true, true)

Expand Down
Loading