From a26f77737a1f218d300f8589eb9caaac3a8b5591 Mon Sep 17 00:00:00 2001 From: Phakorn Kiong Date: Sat, 4 Oct 2025 17:32:31 +0800 Subject: [PATCH 1/4] cancellable get result --- dbos/workflow.go | 76 ++++++++++++++++++++++++++++++------- dbos/workflows_test.go | 85 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 13 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index bdfa648..20c3484 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -82,9 +82,9 @@ type workflowOutcome[R any] struct { // The type parameter R represents the expected return type of the workflow. // Handles can be used to wait for workflow completion, check status, and retrieve results. type WorkflowHandle[R any] interface { - GetResult() (R, error) // Wait for workflow completion and return the result - GetStatus() (WorkflowStatus, error) // Get current workflow status without waiting - GetWorkflowID() string // Get the unique workflow identifier + GetResult(opts ...GetResultOption) (R, error) // Wait for workflow completion and return the result + GetStatus() (WorkflowStatus, error) // Get current workflow status without waiting + GetWorkflowID() string // Get the unique workflow identifier } type baseWorkflowHandle struct { @@ -92,6 +92,22 @@ type baseWorkflowHandle struct { dbosContext DBOSContext } +// GetResultOption is a functional option for configuring GetResult behavior. +type GetResultOption func(*getResultOptions) + +// getResultOptions holds the configuration for GetResult execution. +type getResultOptions struct { + timeout time.Duration +} + +// WithHandleTimeout sets a timeout for the GetResult operation. +// If the timeout is reached before the workflow completes, GetResult will return a timeout error. +func WithHandleTimeout(timeout time.Duration) GetResultOption { + return func(opts *getResultOptions) { + opts.timeout = timeout + } +} + // GetStatus returns the current status of the workflow from the database // If the DBOSContext is running in client mode, do not load input and outputs func (h *baseWorkflowHandle) GetStatus() (WorkflowStatus, error) { @@ -162,12 +178,33 @@ type workflowHandle[R any] struct { outcomeChan chan workflowOutcome[R] } -func (h *workflowHandle[R]) GetResult() (R, error) { - outcome, ok := <-h.outcomeChan // Blocking read - if !ok { - // Return an error if the channel was closed. In normal operations this would happen if GetResul() is called twice on a handler. The first call should get the buffered result, the second call find zero values (channel is empty and closed). - return *new(R), errors.New("workflow result channel is already closed. Did you call GetResult() twice on the same workflow handle?") +func (h *workflowHandle[R]) GetResult(opts ...GetResultOption) (R, error) { + options := &getResultOptions{} + for _, opt := range opts { + opt(options) } + + var timeoutChan <-chan time.Time + if options.timeout > 0 { + timeoutChan = time.After(options.timeout) + } + + select { + case outcome, ok := <-h.outcomeChan: + if !ok { + // Return error if channel closed (happens when GetResult() called twice) + return *new(R), errors.New("workflow result channel is already closed. Did you call GetResult() twice on the same workflow handle?") + } + return h.processOutcome(outcome) + case <-h.dbosContext.Done(): + return *new(R), h.dbosContext.Err() + case <-timeoutChan: + return *new(R), fmt.Errorf("workflow result timeout after %v", options.timeout) + } +} + +// processOutcome handles the common logic for processing workflow outcomes +func (h *workflowHandle[R]) processOutcome(outcome workflowOutcome[R]) (R, error) { // If we are calling GetResult inside a workflow, record the result as a step result workflowState, ok := h.dbosContext.Value(workflowStateKey).(*workflowState) isWithinWorkflow := ok && workflowState != nil @@ -198,9 +235,22 @@ type workflowPollingHandle[R any] struct { baseWorkflowHandle } -func (h *workflowPollingHandle[R]) GetResult() (R, error) { - result, err := retryWithResult(h.dbosContext, func() (any, error) { - return h.dbosContext.(*dbosContext).systemDB.awaitWorkflowResult(h.dbosContext, h.workflowID) +func (h *workflowPollingHandle[R]) GetResult(opts ...GetResultOption) (R, error) { + options := &getResultOptions{} + for _, opt := range opts { + opt(options) + } + + // Use timeout if specified, otherwise use DBOS context directly + ctx := h.dbosContext + var cancel context.CancelFunc + if options.timeout > 0 { + ctx, cancel = WithTimeout(h.dbosContext, options.timeout) + defer cancel() + } + + result, err := retryWithResult(ctx, func() (any, error) { + return h.dbosContext.(*dbosContext).systemDB.awaitWorkflowResult(ctx, h.workflowID) }, withRetrierLogger(h.dbosContext.(*dbosContext).logger)) if result != nil { typedResult, ok := result.(R) @@ -240,8 +290,8 @@ type workflowHandleProxy[R any] struct { wrappedHandle WorkflowHandle[any] } -func (h *workflowHandleProxy[R]) GetResult() (R, error) { - result, err := h.wrappedHandle.GetResult() +func (h *workflowHandleProxy[R]) GetResult(opts ...GetResultOption) (R, error) { + result, err := h.wrappedHandle.GetResult(opts...) if err != nil { var zero R return zero, err diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 56b8cf8..06616ce 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -38,6 +38,12 @@ func simpleStep(_ context.Context) (string, error) { return "from step", nil } +func slowWorkflow(dbosCtx DBOSContext, input string) (string, error) { + // Simulate a slow workflow that takes time to complete + time.Sleep(500 * time.Millisecond) + return input, nil +} + func simpleStepError(_ context.Context) (string, error) { return "", fmt.Errorf("step failure") } @@ -4523,3 +4529,82 @@ func TestWorkflowIdentity(t *testing.T) { assert.Equal(t, []string{"reader", "writer"}, status.AuthenticatedRoles) }) } + +func TestWorkflowHandleTimeout(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + RegisterWorkflow(dbosCtx, simpleWorkflow) + + t.Run("WorkflowHandleTimeout", func(t *testing.T) { + // Test timeout on workflowHandle (channel-based) + handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test") + require.NoError(t, err, "failed to start workflow") + + // Test with a very short timeout - should timeout + start := time.Now() + _, err = handle.GetResult(WithHandleTimeout(1 * time.Millisecond)) + duration := time.Since(start) + + require.Error(t, err, "expected timeout error") + assert.Contains(t, err.Error(), "workflow result timeout") + assert.True(t, duration < 100*time.Millisecond, "timeout should occur quickly") + }) + + t.Run("WorkflowHandleNoTimeout", func(t *testing.T) { + // Test without timeout - should work normally + handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test") + require.NoError(t, err, "failed to start workflow") + + result, err := handle.GetResult() + require.NoError(t, err, "GetResult without timeout should succeed") + assert.Equal(t, "test", result) + }) + + t.Run("WorkflowHandleGetResultAfterChannelClose", func(t *testing.T) { + // Test getting result after the outcome channel would be closed + handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test") + require.NoError(t, err, "failed to start workflow") + + // Get result first time - this will close the outcome channel + result1, err := handle.GetResult() + require.NoError(t, err, "first GetResult should succeed") + assert.Equal(t, "test", result1) + + // Sleep briefly to ensure channel is closed + time.Sleep(10 * time.Millisecond) + + // Get result second time - should fail since channel is closed + _, err = handle.GetResult() + require.Error(t, err, "second GetResult should fail") + assert.Contains(t, err.Error(), "workflow result channel is already closed") + }) +} + +func TestWorkflowPollingHandleTimeout(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + RegisterWorkflow(dbosCtx, simpleWorkflow) + + t.Run("WorkflowPollingHandleTimeout", func(t *testing.T) { + // Test timeout on workflowPollingHandle (database polling) + handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test") + require.NoError(t, err, "failed to start workflow") + + // Test with a very short timeout - should timeout + start := time.Now() + _, err = handle.GetResult(WithHandleTimeout(1 * time.Millisecond)) + duration := time.Since(start) + + require.Error(t, err, "expected timeout error") + assert.Contains(t, err.Error(), "workflow result timeout after 1ms") + assert.True(t, duration < 100*time.Millisecond, "timeout should occur quickly") + }) + + t.Run("WorkflowPollingHandleNoTimeout", func(t *testing.T) { + // Test without timeout - should work normally + handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test") + require.NoError(t, err, "failed to start workflow") + + result, err := handle.GetResult() + require.NoError(t, err, "GetResult without timeout should succeed") + assert.Equal(t, "test", result) + }) +} From d4cf31682c61617ed2caa8af3a3e3c3edb3113b1 Mon Sep 17 00:00:00 2001 From: Phakorn Kiong Date: Sat, 4 Oct 2025 18:22:09 +0800 Subject: [PATCH 2/4] tidy --- dbos/workflows_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 06616ce..7262ff5 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -38,12 +38,6 @@ func simpleStep(_ context.Context) (string, error) { return "from step", nil } -func slowWorkflow(dbosCtx DBOSContext, input string) (string, error) { - // Simulate a slow workflow that takes time to complete - time.Sleep(500 * time.Millisecond) - return input, nil -} - func simpleStepError(_ context.Context) (string, error) { return "", fmt.Errorf("step failure") } From b2300d45b96839e3e4788670008cd2646dfa60da Mon Sep 17 00:00:00 2001 From: Phakorn Kiong Date: Sat, 4 Oct 2025 18:41:21 +0800 Subject: [PATCH 3/4] adjust tests --- dbos/workflows_test.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 7262ff5..5c27130 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -34,6 +34,11 @@ func simpleWorkflowWithStep(dbosCtx DBOSContext, input string) (string, error) { }) } +func slowWorkflow(dbosCtx DBOSContext, sleepTime time.Duration) (string, error) { + time.Sleep(sleepTime) + return "done", nil +} + func simpleStep(_ context.Context) (string, error) { return "from step", nil } @@ -4526,11 +4531,11 @@ func TestWorkflowIdentity(t *testing.T) { func TestWorkflowHandleTimeout(t *testing.T) { dbosCtx := setupDBOS(t, true, true) - RegisterWorkflow(dbosCtx, simpleWorkflow) + RegisterWorkflow(dbosCtx, slowWorkflow) t.Run("WorkflowHandleTimeout", func(t *testing.T) { // Test timeout on workflowHandle (channel-based) - handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test") + handle, err := RunWorkflow(dbosCtx, slowWorkflow, 5*time.Second) require.NoError(t, err, "failed to start workflow") // Test with a very short timeout - should timeout @@ -4545,23 +4550,23 @@ func TestWorkflowHandleTimeout(t *testing.T) { t.Run("WorkflowHandleNoTimeout", func(t *testing.T) { // Test without timeout - should work normally - handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test") + handle, err := RunWorkflow(dbosCtx, slowWorkflow, 1*time.Millisecond) require.NoError(t, err, "failed to start workflow") result, err := handle.GetResult() require.NoError(t, err, "GetResult without timeout should succeed") - assert.Equal(t, "test", result) + assert.Equal(t, "done", result) }) t.Run("WorkflowHandleGetResultAfterChannelClose", func(t *testing.T) { // Test getting result after the outcome channel would be closed - handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test") + handle, err := RunWorkflow(dbosCtx, slowWorkflow, 1*time.Millisecond) require.NoError(t, err, "failed to start workflow") // Get result first time - this will close the outcome channel result1, err := handle.GetResult() require.NoError(t, err, "first GetResult should succeed") - assert.Equal(t, "test", result1) + assert.Equal(t, "done", result1) // Sleep briefly to ensure channel is closed time.Sleep(10 * time.Millisecond) @@ -4575,11 +4580,11 @@ func TestWorkflowHandleTimeout(t *testing.T) { func TestWorkflowPollingHandleTimeout(t *testing.T) { dbosCtx := setupDBOS(t, true, true) - RegisterWorkflow(dbosCtx, simpleWorkflow) + RegisterWorkflow(dbosCtx, slowWorkflow) t.Run("WorkflowPollingHandleTimeout", func(t *testing.T) { // Test timeout on workflowPollingHandle (database polling) - handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test") + handle, err := RunWorkflow(dbosCtx, slowWorkflow, 5*time.Second) require.NoError(t, err, "failed to start workflow") // Test with a very short timeout - should timeout @@ -4594,11 +4599,11 @@ func TestWorkflowPollingHandleTimeout(t *testing.T) { t.Run("WorkflowPollingHandleNoTimeout", func(t *testing.T) { // Test without timeout - should work normally - handle, err := RunWorkflow(dbosCtx, simpleWorkflow, "test") + handle, err := RunWorkflow(dbosCtx, slowWorkflow, 1*time.Millisecond) require.NoError(t, err, "failed to start workflow") result, err := handle.GetResult() require.NoError(t, err, "GetResult without timeout should succeed") - assert.Equal(t, "test", result) + assert.Equal(t, "done", result) }) } From 9217f7bc3262fbafdd0d35abec0bbb9a86328557 Mon Sep 17 00:00:00 2001 From: Phakorn Kiong Date: Tue, 7 Oct 2025 15:18:02 +0800 Subject: [PATCH 4/4] wrap error and update tests --- dbos/workflow.go | 4 +- dbos/workflows_test.go | 83 ++++++++++++++++++------------------------ 2 files changed, 38 insertions(+), 49 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 20c3484..ea5a359 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -197,9 +197,9 @@ func (h *workflowHandle[R]) GetResult(opts ...GetResultOption) (R, error) { } return h.processOutcome(outcome) case <-h.dbosContext.Done(): - return *new(R), h.dbosContext.Err() + return *new(R), context.Cause(h.dbosContext) case <-timeoutChan: - return *new(R), fmt.Errorf("workflow result timeout after %v", options.timeout) + return *new(R), fmt.Errorf("workflow result timeout after %v: %w", options.timeout, context.DeadlineExceeded) } } diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 5c27130..8756efd 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -35,7 +35,7 @@ func simpleWorkflowWithStep(dbosCtx DBOSContext, input string) (string, error) { } func slowWorkflow(dbosCtx DBOSContext, sleepTime time.Duration) (string, error) { - time.Sleep(sleepTime) + Sleep(dbosCtx, sleepTime) return "done", nil } @@ -4534,76 +4534,65 @@ func TestWorkflowHandleTimeout(t *testing.T) { RegisterWorkflow(dbosCtx, slowWorkflow) t.Run("WorkflowHandleTimeout", func(t *testing.T) { - // Test timeout on workflowHandle (channel-based) - handle, err := RunWorkflow(dbosCtx, slowWorkflow, 5*time.Second) + handle, err := RunWorkflow(dbosCtx, slowWorkflow, 10*time.Second) require.NoError(t, err, "failed to start workflow") - // Test with a very short timeout - should timeout start := time.Now() - _, err = handle.GetResult(WithHandleTimeout(1 * time.Millisecond)) + _, err = handle.GetResult(WithHandleTimeout(10 * time.Millisecond)) duration := time.Since(start) require.Error(t, err, "expected timeout error") assert.Contains(t, err.Error(), "workflow result timeout") assert.True(t, duration < 100*time.Millisecond, "timeout should occur quickly") + assert.True(t, errors.Is(err, context.DeadlineExceeded), + "expected error to be detectable as context.DeadlineExceeded, got: %v", err) }) - t.Run("WorkflowHandleNoTimeout", func(t *testing.T) { - // Test without timeout - should work normally - handle, err := RunWorkflow(dbosCtx, slowWorkflow, 1*time.Millisecond) + t.Run("WorkflowPollingHandleTimeout", func(t *testing.T) { + // Start a workflow that will block on the first signal + originalHandle, err := RunWorkflow(dbosCtx, slowWorkflow, 10*time.Second) require.NoError(t, err, "failed to start workflow") - result, err := handle.GetResult() - require.NoError(t, err, "GetResult without timeout should succeed") - assert.Equal(t, "done", result) - }) - - t.Run("WorkflowHandleGetResultAfterChannelClose", func(t *testing.T) { - // Test getting result after the outcome channel would be closed - handle, err := RunWorkflow(dbosCtx, slowWorkflow, 1*time.Millisecond) - require.NoError(t, err, "failed to start workflow") + pollingHandle, err := RetrieveWorkflow[string](dbosCtx, originalHandle.GetWorkflowID()) + require.NoError(t, err, "failed to retrieve workflow") - // Get result first time - this will close the outcome channel - result1, err := handle.GetResult() - require.NoError(t, err, "first GetResult should succeed") - assert.Equal(t, "done", result1) + _, ok := pollingHandle.(*workflowPollingHandle[string]) + require.True(t, ok, "expected polling handle, got %T", pollingHandle) - // Sleep briefly to ensure channel is closed - time.Sleep(10 * time.Millisecond) + _, err = pollingHandle.GetResult(WithHandleTimeout(10 * time.Millisecond)) - // Get result second time - should fail since channel is closed - _, err = handle.GetResult() - require.Error(t, err, "second GetResult should fail") - assert.Contains(t, err.Error(), "workflow result channel is already closed") + require.Error(t, err, "expected timeout error") + assert.True(t, errors.Is(err, context.DeadlineExceeded), + "expected error to be detectable as context.DeadlineExceeded, got: %v", err) }) } -func TestWorkflowPollingHandleTimeout(t *testing.T) { +func TestWorkflowHandleContextCancel(t *testing.T) { dbosCtx := setupDBOS(t, true, true) - RegisterWorkflow(dbosCtx, slowWorkflow) + RegisterWorkflow(dbosCtx, getEventWorkflow) - t.Run("WorkflowPollingHandleTimeout", func(t *testing.T) { - // Test timeout on workflowPollingHandle (database polling) - handle, err := RunWorkflow(dbosCtx, slowWorkflow, 5*time.Second) + t.Run("WorkflowHandleContextCancel", func(t *testing.T) { + getEventWorkflowStartedSignal.Clear() + handle, err := RunWorkflow(dbosCtx, getEventWorkflow, getEventWorkflowInput{ + TargetWorkflowID: "test-workflow-id", + Key: "test-key", + }) require.NoError(t, err, "failed to start workflow") - // Test with a very short timeout - should timeout - start := time.Now() - _, err = handle.GetResult(WithHandleTimeout(1 * time.Millisecond)) - duration := time.Since(start) + resultChan := make(chan error) + go func() { + _, err := handle.GetResult() + resultChan <- err + }() - require.Error(t, err, "expected timeout error") - assert.Contains(t, err.Error(), "workflow result timeout after 1ms") - assert.True(t, duration < 100*time.Millisecond, "timeout should occur quickly") - }) + getEventWorkflowStartedSignal.Wait() + getEventWorkflowStartedSignal.Clear() - t.Run("WorkflowPollingHandleNoTimeout", func(t *testing.T) { - // Test without timeout - should work normally - handle, err := RunWorkflow(dbosCtx, slowWorkflow, 1*time.Millisecond) - require.NoError(t, err, "failed to start workflow") + dbosCtx.Shutdown(1 * time.Second) - result, err := handle.GetResult() - require.NoError(t, err, "GetResult without timeout should succeed") - assert.Equal(t, "done", result) + err = <-resultChan + require.Error(t, err, "expected error from cancelled context") + assert.True(t, errors.Is(err, context.Canceled), + "expected error to be detectable as context.Canceled, got: %v", err) }) }