Prevent workflow task and activity task generation when workflow is paused #8687
service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go
err = s.SdkClient().SignalWorkflow(ctx, workflowID, runID, s.testEndSignal, "signal to complete the workflow")
s.NoError(err)

time.Sleep(2 * time.Second) // wait 2 seconds to give enough time to record the signal.
Is the sleep required with the s.EventuallyWithT below?
There's also a failure in tests that needs to be fixed but lgtm
} else { // if not we bypass activity task generation if eager start activity is requested.
	bypassActivityTaskGeneration = eagerStartActivity
}
nit: looks like the else part is a duplication of L509.
// Create a transfer task to schedule a workflow task
if !mutableState.HasPendingWorkflowTask() {
// Create a transfer task to schedule a workflow task only if the workflow is in running status and there is no pending workflow task.
if !mutableState.HasPendingWorkflowTask() && mutableState.GetExecutionState().GetStatus() == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
I don't think workflow tasks scheduled on the task processing side go through this code path.
Hmm, maybe intercepting in the task generator is better? Specifically in TaskGeneratorImpl.GenerateScheduleWorkflowTaskTasks().
Handled all call sites of mutableState.AddWorkflowTaskScheduledEvent() individually (there were about 7-8 places).
c293cae to 3eba1df
This needs more work. Moving it back to draft.
6e866e3 to 2362882
2362882 to 93c915a
This reverts commit 1440d155ae08c013cf3881a4b734f5ace77686f9.
…dleWorkflowTaskScheduling()
93c915a to 69e0e02
Bug: Missing pause check allows WFT scheduling event without task
When a workflow task completes while the workflow is paused, and conditions trigger scheduling a new workflow task (like buffered events at line 504 or wtFailedShouldCreateNewTask), the code at lines 562/568 calls AddWorkflowTaskScheduledEvent without checking pause status. This writes a WorkflowTaskScheduled event to history, but GenerateScheduleWorkflowTaskTasks skips transfer task generation due to the pause check. When the workflow is later unpaused, HasPendingWorkflowTask() returns true, so no new task is scheduled and no transfer task is generated for the existing pending task. This could leave the workflow stuck. The condition at lines 500-518 should also check IsWorkflowExecutionStatusPaused() to prevent scheduling when paused.
service/history/api/respondworkflowtaskcompleted/api.go#L499-L568
temporal/service/history/api/respondworkflowtaskcompleted/api.go
Lines 499 to 568 in 69e0e02
newWorkflowTaskType := enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED
if ms.IsWorkflowExecutionRunning() {
	if request.GetForceCreateNewWorkflowTask() || // Heartbeat WT is always of Normal type.
		wtFailedShouldCreateNewTask ||
		hasBufferedEventsOrMessages ||
		activityNotStartedCancelled ||
		// If the workflow has an ongoing transition to another deployment version, we should ensure
		// it has a pending wft so it does not remain in the transition phase for long.
		ms.GetDeploymentTransition() != nil {
		newWorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL
	} else if updateRegistry.HasOutgoingMessages(true) {
		// There shouldn't be any sent updates in the registry because
		// all sent but not processed updates were rejected by server.
		// Therefore, it doesn't matter if to includeAlreadySent or not.
		newWorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE
	}
}
bypassTaskGeneration := request.GetReturnNewWorkflowTask() && wtFailedCause == nil
// TODO (alex-update): All current SDKs always set ReturnNewWorkflowTask to true
// which means that server always bypass task generation if WFT didn't fail.
// ReturnNewWorkflowTask flag needs to be removed.
if newWorkflowTaskType == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE && !bypassTaskGeneration {
	// If task generation can't be bypassed (i.e. WFT has failed),
	// WFT must be created as Normal because speculative WFT by nature skips task generation.
	newWorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL
}
var newWorkflowTask *historyi.WorkflowTaskInfo
// Speculative workflow task will be created after mutable state is persisted.
if newWorkflowTaskType == enumsspb.WORKFLOW_TASK_TYPE_NORMAL {
	versioningStamp := request.WorkerVersionStamp
	if versioningStamp.GetUseVersioning() {
		if ms.GetAssignedBuildId() == "" {
			// old versioning is used. making sure the versioning stamp does not go through otherwise the
			// workflow will start using new versioning which may surprise users.
			// TODO: remove this block when deleting old wv [cleanup-old-wv]
			versioningStamp = nil
		} else {
			// new versioning is used. do not return new wft to worker if stamp build ID does not match wf build ID
			// let the task go through matching and get dispatched to the right worker
			if versioningStamp.GetBuildId() != ms.GetAssignedBuildId() {
				bypassTaskGeneration = false
			}
		}
	}
	if ms.GetDeploymentTransition() != nil {
		// Do not return new wft to worker if the workflow is transitioning to a different deployment version.
		// Let the task go through matching and get dispatched to the right worker
		bypassTaskGeneration = false
	}
	var newWTErr error
	// If we checked WT heartbeat timeout before and WT wasn't timed out,
	// then OriginalScheduledTime needs to be carried over to the new WT.
	if checkWTHeartbeatTimeout && !wtHeartbeatTimedOut {
		newWorkflowTask, newWTErr = ms.AddWorkflowTaskScheduledEventAsHeartbeat(
			bypassTaskGeneration,
			timestamppb.New(currentWorkflowTask.OriginalScheduledTime),
			enumsspb.WORKFLOW_TASK_TYPE_NORMAL, // Heartbeat workflow task is always of Normal type.
		)
	} else {
		newWorkflowTask, newWTErr = ms.AddWorkflowTaskScheduledEvent(bypassTaskGeneration, newWorkflowTaskType)
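To make the stuck-workflow scenario in the bug report above easier to follow, here is a toy model in Go. It is a sketch under stated assumptions, not the server's code: `toyState`, `scheduleWFT`, `generateTask`, `unpause`, and `run` are hypothetical names invented for illustration. The only behaviour taken from the report is that task generation is skipped while paused and that an existing pending workflow task prevents scheduling another. That unpausing attempts to schedule a workflow task is also an assumption of this sketch.

```go
package main

import "fmt"

// toyState is a hypothetical stand-in for mutable state. It tracks only
// the three facts the bug report depends on.
type toyState struct {
	paused        bool
	pendingWFT    bool // a WorkflowTaskScheduled event has been written
	transferTasks int  // transfer tasks actually generated
}

// generateTask models the task generator as described in the report:
// it is a no-op while the workflow is paused.
func (s *toyState) generateTask() {
	if s.paused {
		return
	}
	s.transferTasks++
}

// scheduleWFT writes the scheduled event and then requests a task.
// With guard=false it reproduces the reported behaviour; with guard=true
// it refuses to write the event while paused.
func (s *toyState) scheduleWFT(guard bool) {
	if s.pendingWFT {
		return // a pending workflow task already exists
	}
	if guard && s.paused {
		return
	}
	s.pendingWFT = true
	s.generateTask()
}

// unpause clears the flag and tries to schedule a workflow task
// (an assumption of this sketch).
func (s *toyState) unpause(guard bool) {
	s.paused = false
	s.scheduleWFT(guard)
}

// run plays the sequence from the report and says whether the workflow
// ends with a pending workflow task but no transfer task behind it.
func run(guard bool) bool {
	s := &toyState{paused: true}
	s.scheduleWFT(guard) // e.g. buffered events after a WFT completes
	s.unpause(guard)
	return s.pendingWFT && s.transferTasks == 0
}

func main() {
	fmt.Println("without guard, stuck:", run(false)) // true
	fmt.Println("with guard, stuck:", run(true))     // false
}
```

In this model the event and the task must be skipped together: skipping only the task while still writing the event is what leaves the pending task orphaned.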
isPaused := false
for hist.HasNext() {
	event, err := hist.Next()
	s.NoError(err)
Bug: Test assertion uses wrong context inside EventuallyWithT
In assertWorkflowIsPaused, line 520 uses s.NoError(err) while all other assertions in the function correctly use require.NoError(t, err) with the passed *assert.CollectT. This function is called inside EventuallyWithT, which expects assertions to use the provided CollectT for retry logic. Using s.NoError() will cause the test to fail immediately on error rather than allowing EventuallyWithT to retry, leading to potentially flaky tests or incorrect failure behavior.
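The difference between asserting on the per-attempt collector and asserting on the outer test object can be sketched with the standard library alone. This is a simplified analogue, not testify's implementation: `collector`, `eventually`, `flakyFetch`, `viaCollector`, and `viaOuter` are hypothetical names, and the real `EventuallyWithT` retries on a timer rather than a fixed attempt count.

```go
package main

import (
	"errors"
	"fmt"
)

// collector is a hypothetical, simplified analogue of a per-attempt
// assertion sink: it records failures instead of failing the test.
type collector struct{ errs []error }

func (c *collector) noError(err error) {
	if err != nil {
		c.errs = append(c.errs, err)
	}
}

// eventually re-runs cond with a fresh collector until one attempt
// records no failures or the attempt budget is used up.
func eventually(attempts int, cond func(c *collector)) (int, bool) {
	for i := 1; i <= attempts; i++ {
		c := &collector{}
		cond(c)
		if len(c.errs) == 0 {
			return i, true
		}
	}
	return attempts, false
}

// flakyFetch returns a function that fails on its first two calls.
func flakyFetch() func() error {
	calls := 0
	return func() error {
		calls++
		if calls < 3 {
			return errors.New("history not ready")
		}
		return nil
	}
}

// viaCollector reports errors to the per-attempt collector, so the
// retry loop absorbs the two early failures.
func viaCollector() (int, bool) {
	fetch := flakyFetch()
	return eventually(5, func(c *collector) { c.noError(fetch()) })
}

// viaOuter reports errors to an outer sink that outlives the attempts,
// so the first failure is recorded permanently and no retry happens.
func viaOuter() (bool, int) {
	outer := &collector{}
	fetch := flakyFetch()
	_, ok := eventually(5, func(c *collector) { outer.noError(fetch()) })
	return ok, len(outer.errs)
}

func main() {
	n, ok := viaCollector()
	fmt.Println(n, ok) // 3 true
	ok, failures := viaOuter()
	fmt.Println(ok, failures) // true 1
}
```

In the second case the retry loop sees a clean attempt and stops, while the outer sink already holds a failure. That mirrors the concern in the comment above about mixing `s.NoError` with assertions on the passed `CollectT`.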
I think task_refresher needs to be updated as well so that it does not regenerate activity and workflow tasks. And yeah, I think I'd prefer that the logic live in the refresher, not the task generator, to keep the generator logic simple and straightforward.
	return nil, serviceerror.NewNotFound("Workflow task not found.")
}

// We don't accept the request to create a new workflow task if the workflow is paused.
It still seems weird/inconsistent to me that if the workflow task is not heartbeating, we accept the result; otherwise we drop the response and let it time out. But I understand this is what's been agreed on.
if r.mutableState.GetExecutionState().Status == enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED {
	return nil // we bypass task generation if the workflow is paused.
}
Hmm, I don't think you need this? The upper layer should already have prevented the workflow task in mutable state from being created in the first place.
What changed?
Why?
When a workflow is paused, we should not generate any new activity or workflow tasks.
How did you test it?
Note
Prevents workflow/activities from scheduling or being updated while paused, adds guards across APIs and task generation, and introduces tests to verify paused behavior.
- RespondWorkflowTaskCompleted: reject ForceCreateNewWorkflowTask if ms.IsWorkflowExecutionStatusPaused().
- workflow_task_completed_handler: bypass activity task generation and disable eager start when paused.
- signalWithStartWorkflow: skip scheduling a new workflow task when paused.
- UpdateWorkflow rejects updates if paused; shared update util only schedules WFT if not paused.
- GenerateScheduleWorkflowTaskTasks and ScheduleWorkflowTask no-op when paused.
- IsWorkflowExecutionStatusPaused() implementation and interface plumbing.

Written by Cursor Bugbot for commit 69e0e02. This will update automatically on new commits.