Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions chasm/lib/activity/activity_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,12 @@ func newScheduleToCloseTimeoutTaskExecutor() *scheduleToCloseTimeoutTaskExecutor
}

func (e *scheduleToCloseTimeoutTaskExecutor) Validate(
ctx chasm.Context,
_ chasm.Context,
activity *Activity,
_ chasm.TaskAttributes,
task *activitypb.ScheduleToCloseTimeoutTask,
_ *activitypb.ScheduleToCloseTimeoutTask,
) (bool, error) {
attempt, err := activity.LastAttempt.Get(ctx)
if err != nil {
return false, err
}

valid := TransitionTimedOut.Possible(activity) && task.Attempt == attempt.Count
return valid, nil
return TransitionTimedOut.Possible(activity), nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the bug fix

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you're right, since there could be multiple retry attempts within the lifetime of a single scheduleToClose task. Good catch.

}

func (e *scheduleToCloseTimeoutTaskExecutor) Execute(
Expand Down
17 changes: 3 additions & 14 deletions chasm/lib/activity/gen/activitypb/v1/tasks.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions chasm/lib/activity/proto/v1/tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ message ScheduleToStartTimeoutTask {
}

message ScheduleToCloseTimeoutTask {
Copy link
Contributor

@fretz12 fretz12 Nov 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you can remove this altogether now. In place of the interface arg, you can use `_ any`.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep it because we will have validation later when we support activity resets.

// The current attempt number for this activity execution. Since task validation/exec happen outside of a lock, we
// need to guard against any concurrent operations where the originally intended task may be outdated.
int32 attempt = 1;
}

message StartToCloseTimeoutTask {
Expand Down
4 changes: 1 addition & 3 deletions chasm/lib/activity/statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ var TransitionScheduled = chasm.NewTransition(
chasm.TaskAttributes{
ScheduledTime: currentTime.Add(timeout),
},
&activitypb.ScheduleToCloseTimeoutTask{
Attempt: attempt.GetCount(),
})
&activitypb.ScheduleToCloseTimeoutTask{})
}

ctx.AddTask(
Expand Down
66 changes: 61 additions & 5 deletions tests/standalone_activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,67 @@ func (s *standaloneActivityTestSuite) TestCompletedActivity_CannotTerminate() {
require.Error(t, err)
}

func (s *standaloneActivityTestSuite) TestScheduleToCloseTimeout_WithRetry() {
t := s.T()
ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second)
defer cancel()
activityID := testcore.RandomizeStr(t.Name())
taskQueue := testcore.RandomizeStr(t.Name())

// Start an activity
startResp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{
Namespace: s.Namespace().String(),
ActivityId: activityID,
ActivityType: &commonpb.ActivityType{
Name: "test-activity-type",
},
Options: &activitypb.ActivityOptions{
TaskQueue: &taskqueuepb.TaskQueue{
Name: taskQueue,
},
// It's not possible to guarantee (e.g. via NextRetryDelay or RetryPolicy) that a retry
// will start with a delay <1s because of the use of TimerProcessorMaxTimeShift in the
// timer queue. Therefore we allow 1s for the ActivityDispatchTask to be executed, and
// time out the activity 1s into Attempt 2.
ScheduleToCloseTimeout: durationpb.New(2 * time.Second),
},
})
require.NoError(t, err)

// Fail attempt 1, causing the attempt counter to increment.
pollTaskResp, err := s.pollActivityTaskQueue(ctx, taskQueue)
require.NoError(t, err)
_, err = s.FrontendClient().RespondActivityTaskFailed(ctx, &workflowservice.RespondActivityTaskFailedRequest{
Namespace: s.Namespace().String(),
TaskToken: pollTaskResp.TaskToken,
Failure: &failurepb.Failure{
Message: "Retryable failure",
FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
NonRetryable: false,
NextRetryDelay: durationpb.New(1 * time.Second),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depending on timing, this may prevent the schedule-to-close timeout from firing, because we would know there's not enough time left for the next attempt and would avoid scheduling that attempt.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a test for this behavior if we don't yet.

}},
},
})
require.NoError(t, err)
_, err = s.pollActivityTaskQueue(ctx, taskQueue)
require.NoError(t, err)

// Wait for schedule-to-close timeout.
pollResp, err := s.FrontendClient().PollActivityExecution(ctx, &workflowservice.PollActivityExecutionRequest{
Namespace: s.Namespace().String(),
ActivityId: activityID,
RunId: startResp.RunId,
IncludeInfo: true,
IncludeOutcome: true,
WaitPolicy: &workflowservice.PollActivityExecutionRequest_WaitCompletion{
WaitCompletion: &workflowservice.PollActivityExecutionRequest_CompletionWaitOptions{},
},
})
require.NoError(t, err)
require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, pollResp.GetInfo().GetStatus())
require.Equal(t, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, pollResp.GetFailure().GetTimeoutFailureInfo().GetTimeoutType())
}

// TestStartToCloseTimeout tests that a start-to-close timeout is recorded after the activity is
// started. It also verifies that PollActivityExecution can be used to poll for a TimedOut state
// change caused by execution of a timer task.
Expand Down Expand Up @@ -865,11 +926,6 @@ func (s *standaloneActivityTestSuite) TestStartToCloseTimeout() {
"expected StartToCloseTimeout but is %s", pollResp.GetFailure().GetTimeoutFailureInfo().GetTimeoutType())
}

// TestScheduleToCloseTimeout is a placeholder that is skipped unconditionally.
//
// TODO implement when we have PollActivityExecution. Make sure we check the attempt vs. outcome
// failure population.
func (s *standaloneActivityTestSuite) TestScheduleToCloseTimeout() {
	t := s.T()
	t.Skip("Temporarily disabled")
}

func (s *standaloneActivityTestSuite) TestPollActivityExecution_NoWait() {
t := s.T()
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
Expand Down
Loading