-
Couldn't load subscription status.
- Fork 38
run a step inside a go routine #109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
@af-md thanks for the PR! The approach has the right fundamentals, i.e., I think we can make this simpler by simply having a package level With respect to what chans = make([]chan int, 3)
for := range 10 {
resChan1, err := dbos.Go(ctx, StepFnClosure)
// handle err
}
// Read from each channel hereThe res channel can hold types similar to the workflow outcome chan: |
|
@maxdml does this feature have any conflict with what @apoliakov said about pre generating stepIDs: https://discord.com/channels/1156433345631232100/1166779411920597002/1413954852618244267 It makes sense to run steps inside Go routines - as they tend to be better performant compared to standard execution - however the users should be advised to write their code to wait for a step to complete (committed into DB) and then move onto the next step? probably that's what you were thinking of anyway... |
Ah... what I said was a comment on how Python works. Here we may have an opportunity to make it act differently. But Max or Peter will need to opine on that |
|
@maxdml @apoliakov there is a small misunderstanding here. The problem we are solving with this PR is the non deterministic generation of stepIDs, resulting from the execution of steps in goroutines. What this PR will do is to serialize the generation of step IDs from within the workflow. That way, step IDs will be generated deterministically, before the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The low level implementation looks good, see my comments for the test.
I am realizing we need to change the API to support mocking. Specifically we should have a mirror Go method on the DBOSContext interface, that would be typeless (returns (stepOutcome[any], error)). The reason we've been doing this for all DBOS methods is to allow the mocking of DBOSContext in users' tests.
The package-level Go would, like the package-level RunAsStep does with its interface counterpart, call the interface Go with a typed-erased function and set the stepName in the options.
The interface level Go will do the step introspection, increment the stepID, then call the interface level RunAsStep and return a typeless (stepOutcome[any]) channel which we can pipe to the generic one (see an example in RunWorkflow.)
dbos/workflows_test.go
Outdated
| // Test step IDs are deterministic and in the order of execution | ||
| steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) | ||
| require.NoError(t, err, "failed to get workflow steps") | ||
| require.Len(t, steps, numSteps, "expected %d steps, got %d", numSteps, len(steps)) | ||
| for i := 0; i < numSteps; i++ { | ||
| assert.Equal(t, i, steps[i].StepID, "expected step ID to be %d, got %d", i, steps[i].StepID) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not test what you think it does: GetWorkflowSteps returns all the workflow steps sorted by ascending step ID, so you're testing the SQL, not the step ID attribution.
The way to test this would be to have each step take their ID as input and return their ID. Then, you can iterate over the channels and make sure that the iterator number == the step result from the channel.
- Channels should be ordered by stepID
- If the correct ID was attributed to the step, the step return value will be equal to the channel iterator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a second part, we should also exercise the recovery part, either by running recoverPendingWorkflow, or simply by executing the workflow again with the same workflowID. If there was a non determinism step attribution, DBOS would throw an error during the second RunAsStep -- which it shouldn't.
For this to happen, however, we must ensure the workflow stays PENDING and does not return in the first, run, which we can achieve with an event (see this example). (If we don't do that, re-running the workflow will just get the workflow outcome, rather than going through the steps again.
…implify result handling
…uce stepWithSleep function
…step ID generation
…es with result channel
…custom output types
…y removing unnecessary line
…nism checks in Go workflows
5ce5275 to
f01ca09
Compare
…e related function signatures
closes #90
Summary
Adds support for running steps inside goroutines with deterministic step ID generation.
This is by no means the final solution, it's a PR to get feedback.
Problem
Currently, running steps inside goroutines causes non-deterministic step ID generation due to race conditions:
Solution
Pre-generate step IDs before launching goroutines:
Open Questions
ToDos