Skip to content

Commit 8ec58d8

Browse files
authored
Fix bug in CHASM activity schedule-to-close timer task validation (#8720)
## What changed? - Fix bug: schedule-to-close timer task validator was incorrectly requiring activity attempt at task execution time to be equal to activity attempt at task creation - Add test of schedule-to-close timeout that fails with the bug fix reverted - Do not set empty struct as outcome failure on attempt failure when retries are exhausted. - Improve doc comments ## Why? - Standalone activity schedule-to-close was incorrect: would not have fired after attempt 1 without this fix - Setting empty struct on attempt failure when retries are exhausted should not be necessary and it is fragile to introduce special values that code might start to rely on. ## How did you test it? - [x] built - [x] added new functional test(s)
1 parent 1e2af3d commit 8ec58d8

File tree

5 files changed

+68
-34
lines changed

5 files changed

+68
-34
lines changed

chasm/lib/activity/activity_tasks.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,12 @@ func newScheduleToCloseTimeoutTaskExecutor() *scheduleToCloseTimeoutTaskExecutor
103103
}
104104

105105
func (e *scheduleToCloseTimeoutTaskExecutor) Validate(
106-
ctx chasm.Context,
106+
_ chasm.Context,
107107
activity *Activity,
108108
_ chasm.TaskAttributes,
109-
task *activitypb.ScheduleToCloseTimeoutTask,
109+
_ *activitypb.ScheduleToCloseTimeoutTask,
110110
) (bool, error) {
111-
attempt, err := activity.LastAttempt.Get(ctx)
112-
if err != nil {
113-
return false, err
114-
}
115-
116-
valid := TransitionTimedOut.Possible(activity) && task.Attempt == attempt.Count
117-
return valid, nil
111+
return TransitionTimedOut.Possible(activity), nil
118112
}
119113

120114
func (e *scheduleToCloseTimeoutTaskExecutor) Execute(

chasm/lib/activity/gen/activitypb/v1/tasks.pb.go

Lines changed: 3 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chasm/lib/activity/proto/v1/tasks.proto

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@ message ScheduleToStartTimeoutTask {
1717
}
1818

1919
message ScheduleToCloseTimeoutTask {
20-
// The current attempt number for this activity execution. Since task validation/exec happen outside of a lock, we
21-
// need to guard against any concurrent operations where the originally intended task may be outdated.
22-
int32 attempt = 1;
2320
}
2421

2522
message StartToCloseTimeoutTask {

chasm/lib/activity/statemachine.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,7 @@ var TransitionScheduled = chasm.NewTransition(
6262
chasm.TaskAttributes{
6363
ScheduledTime: currentTime.Add(timeout),
6464
},
65-
&activitypb.ScheduleToCloseTimeoutTask{
66-
Attempt: attempt.GetCount(),
67-
})
65+
&activitypb.ScheduleToCloseTimeoutTask{})
6866
}
6967

7068
ctx.AddTask(

tests/standalone_activity_test.go

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,67 @@ func (s *standaloneActivityTestSuite) TestCompletedActivity_CannotTerminate() {
760760
require.Error(t, err)
761761
}
762762

763+
func (s *standaloneActivityTestSuite) TestScheduleToCloseTimeout_WithRetry() {
764+
t := s.T()
765+
ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second)
766+
defer cancel()
767+
activityID := testcore.RandomizeStr(t.Name())
768+
taskQueue := testcore.RandomizeStr(t.Name())
769+
770+
// Start an activity
771+
startResp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{
772+
Namespace: s.Namespace().String(),
773+
ActivityId: activityID,
774+
ActivityType: &commonpb.ActivityType{
775+
Name: "test-activity-type",
776+
},
777+
Options: &activitypb.ActivityOptions{
778+
TaskQueue: &taskqueuepb.TaskQueue{
779+
Name: taskQueue,
780+
},
781+
// It's not possible to guarantee (e.g. via NextRetryDelay or RetryPolicy) that a retry
782+
// will start with a delay <1s because of the use of TimerProcessorMaxTimeShift in the
783+
// timer queue. Therefore we allow 1s for the ActivityDispatchTask to be executed, and
784+
// time out the activity 1s into Attempt 2.
785+
ScheduleToCloseTimeout: durationpb.New(2 * time.Second),
786+
},
787+
})
788+
require.NoError(t, err)
789+
790+
// Fail attempt 1, causing the attempt counter to increment.
791+
pollTaskResp, err := s.pollActivityTaskQueue(ctx, taskQueue)
792+
require.NoError(t, err)
793+
_, err = s.FrontendClient().RespondActivityTaskFailed(ctx, &workflowservice.RespondActivityTaskFailedRequest{
794+
Namespace: s.Namespace().String(),
795+
TaskToken: pollTaskResp.TaskToken,
796+
Failure: &failurepb.Failure{
797+
Message: "Retryable failure",
798+
FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
799+
NonRetryable: false,
800+
NextRetryDelay: durationpb.New(1 * time.Second),
801+
}},
802+
},
803+
})
804+
require.NoError(t, err)
805+
_, err = s.pollActivityTaskQueue(ctx, taskQueue)
806+
require.NoError(t, err)
807+
808+
// Wait for schedule-to-close timeout.
809+
pollResp, err := s.FrontendClient().PollActivityExecution(ctx, &workflowservice.PollActivityExecutionRequest{
810+
Namespace: s.Namespace().String(),
811+
ActivityId: activityID,
812+
RunId: startResp.RunId,
813+
IncludeInfo: true,
814+
IncludeOutcome: true,
815+
WaitPolicy: &workflowservice.PollActivityExecutionRequest_WaitCompletion{
816+
WaitCompletion: &workflowservice.PollActivityExecutionRequest_CompletionWaitOptions{},
817+
},
818+
})
819+
require.NoError(t, err)
820+
require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, pollResp.GetInfo().GetStatus())
821+
require.Equal(t, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, pollResp.GetFailure().GetTimeoutFailureInfo().GetTimeoutType())
822+
}
823+
763824
// TestStartToCloseTimeout tests that a start-to-close timeout is recorded after the activity is
764825
// started. It also verifies that PollActivityExecution can be used to poll for a TimedOut state
765826
// change caused by execution of a timer task.
@@ -865,11 +926,6 @@ func (s *standaloneActivityTestSuite) TestStartToCloseTimeout() {
865926
"expected StartToCloseTimeout but is %s", pollResp.GetFailure().GetTimeoutFailureInfo().GetTimeoutType())
866927
}
867928

868-
func (s *standaloneActivityTestSuite) TestScheduleToCloseTimeout() {
869-
// TODO implement when we have PollActivityExecution. Make sure we check the attempt vs. outcome failure population.
870-
s.T().Skip("Temporarily disabled")
871-
}
872-
873929
func (s *standaloneActivityTestSuite) TestPollActivityExecution_NoWait() {
874930
t := s.T()
875931
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)

0 commit comments

Comments
 (0)