Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 81 additions & 23 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
"go.temporal.io/server/common"
Expand Down Expand Up @@ -54,6 +55,12 @@ type Activity struct {
Store chasm.Field[ActivityStore]
}

// WithToken wraps a request with its deserialized task token.
//
// R is the type of the wrapped request. Handlers in this file that accept a
// WithToken pass Token to ValidateActivityTaskToken before acting on Request.
type WithToken[R any] struct {
// Token is the deserialized task token that accompanies the request.
Token *tokenspb.Task
// Request is the wrapped request payload.
Request R
}

func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState {
switch a.Status {
case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED:
Expand Down Expand Up @@ -134,25 +141,26 @@ func (a *Activity) createAddActivityTaskRequest(ctx chasm.Context, namespaceID s
func (a *Activity) HandleStarted(ctx chasm.MutableContext, request *historyservice.RecordActivityTaskStartedRequest) (
*historyservice.RecordActivityTaskStartedResponse, error,
) {
if err := TransitionStarted.Apply(a, ctx, nil); err != nil {
return nil, err
}

attempt := a.LastAttempt.Get(ctx)
attempt.StartedTime = timestamppb.New(ctx.Now(a))
attempt.LastWorkerIdentity = request.GetPollRequest().GetIdentity()

if versionDirective := request.GetVersionDirective().GetDeploymentVersion(); versionDirective != nil {
attempt.LastDeploymentVersion = &deploymentpb.WorkerDeploymentVersion{
BuildId: versionDirective.GetBuildId(),
DeploymentName: versionDirective.GetDeploymentName(),
}
}

if err := TransitionStarted.Apply(a, ctx, nil); err != nil {
return nil, err
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The movement here was required to fix a bug that I encountered when implementing the other proposed design. We needed attempt.StartedTime to be available in TransitionStarted so that we could use it in scheduling the first heartbeat task, as well as the base for the start-to-close timeout. Before this PR we were using an ad-hoc ctx.Now() that was close to but not equal to StartedTime.

Reverting it doesn't cause a test failure with the heartbeating design in this PR, but it's better to set the field state before calling the transition function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverting this would now cause a test failure since the PR is back to design A.

response := &historyservice.RecordActivityTaskStartedResponse{}
err := a.StoreOrSelf(ctx).PopulateRecordStartedResponse(ctx, ctx.ExecutionKey(), response)
return response, err
}

// PopulateRecordStartedResponse populates the response for HandleStarted.
func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.ExecutionKey, response *historyservice.RecordActivityTaskStartedResponse) error {
lastHeartbeat, _ := a.LastHeartbeat.TryGet(ctx)
if lastHeartbeat != nil {
Expand Down Expand Up @@ -183,15 +191,22 @@ func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.Ex
return nil
}

// RecordCompleted applies the provided function to record activity completion.
//
// It calls applyFn with ctx and returns applyFn's error unchanged. The method
// itself does not read or modify the receiver; any state change is whatever
// applyFn performs.
func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error {
return applyFn(ctx)
}

// HandleCompleted updates the activity on activity completion.
func (a *Activity) HandleCompleted(ctx chasm.MutableContext, request *historyservice.RespondActivityTaskCompletedRequest) (
*historyservice.RespondActivityTaskCompletedResponse, error,
) {
if err := TransitionCompleted.Apply(a, ctx, request); err != nil {
func (a *Activity) HandleCompleted(
ctx chasm.MutableContext,
input WithToken[*historyservice.RespondActivityTaskCompletedRequest],
) (*historyservice.RespondActivityTaskCompletedResponse, error) {
// TODO(dan): add test coverage for this validation
if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil {
return nil, err
}

if err := TransitionCompleted.Apply(a, ctx, input.Request); err != nil {
return nil, err
}

Expand All @@ -200,10 +215,16 @@ func (a *Activity) HandleCompleted(ctx chasm.MutableContext, request *historyser

// HandleFailed updates the activity on activity failure. If the activity is retryable, it will be rescheduled
// for retry instead.
func (a *Activity) HandleFailed(ctx chasm.MutableContext, req *historyservice.RespondActivityTaskFailedRequest) (
*historyservice.RespondActivityTaskFailedResponse, error,
) {
failure := req.GetFailedRequest().GetFailure()
func (a *Activity) HandleFailed(
ctx chasm.MutableContext,
input WithToken[*historyservice.RespondActivityTaskFailedRequest],
) (*historyservice.RespondActivityTaskFailedResponse, error) {
// TODO(dan): add test coverage for this validation
if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil {
return nil, err
}

failure := input.Request.GetFailedRequest().GetFailure()

shouldRetry, retryInterval, err := a.shouldRetryOnFailure(ctx, failure)
if err != nil {
Expand All @@ -222,18 +243,24 @@ func (a *Activity) HandleFailed(ctx chasm.MutableContext, req *historyservice.Re
}

// No more retries, transition to failed state
if err := TransitionFailed.Apply(a, ctx, req); err != nil {
if err := TransitionFailed.Apply(a, ctx, input.Request); err != nil {
return nil, err
}

return &historyservice.RespondActivityTaskFailedResponse{}, nil
}

// HandleCanceled updates the activity on activity cancellation.
func (a *Activity) HandleCanceled(ctx chasm.MutableContext, request *historyservice.RespondActivityTaskCanceledRequest) (
*historyservice.RespondActivityTaskCanceledResponse, error,
) {
if err := TransitionCanceled.Apply(a, ctx, request.GetCancelRequest().GetDetails()); err != nil {
func (a *Activity) HandleCanceled(
ctx chasm.MutableContext,
input WithToken[*historyservice.RespondActivityTaskCanceledRequest],
) (*historyservice.RespondActivityTaskCanceledResponse, error) {
// TODO(dan): add test coverage for this validation
if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil {
return nil, err
}

if err := TransitionCanceled.Apply(a, ctx, input.Request.GetCancelRequest().GetDetails()); err != nil {
return nil, err
}

Expand All @@ -250,9 +277,9 @@ func (a *Activity) handleTerminated(ctx chasm.MutableContext, req *activitypb.Te
return &activitypb.TerminateActivityExecutionResponse{}, nil
}

// getLastHeartbeat retrieves the last heartbeat state, initializing it if not present. The heartbeat is lazily created
// getOrCreateLastHeartbeat retrieves the last heartbeat state, initializing it if not present. The heartbeat is lazily created
// to avoid unnecessary writes when heartbeats are not used.
func (a *Activity) getLastHeartbeat(ctx chasm.MutableContext) *activitypb.ActivityHeartbeatState {
func (a *Activity) getOrCreateLastHeartbeat(ctx chasm.MutableContext) *activitypb.ActivityHeartbeatState {
heartbeat, ok := a.LastHeartbeat.TryGet(ctx)
if !ok {
heartbeat = &activitypb.ActivityHeartbeatState{}
Expand Down Expand Up @@ -417,12 +444,43 @@ func createStartToCloseTimeoutFailure() *failurepb.Failure {
}
}

func (a *Activity) RecordHeartbeat(ctx chasm.MutableContext, details *commonpb.Payloads) (chasm.NoValue, error) {
// createHeartbeatTimeoutFailure builds the Failure reported when an activity
// attempt times out for lack of heartbeats. The message is
// common.FailureReasonActivityTimeout formatted with the heartbeat timeout
// type's string form, and the failure info carries TIMEOUT_TYPE_HEARTBEAT.
func createHeartbeatTimeoutFailure() *failurepb.Failure {
	timeoutType := enumspb.TIMEOUT_TYPE_HEARTBEAT
	timeoutInfo := &failurepb.TimeoutFailureInfo{TimeoutType: timeoutType}
	message := fmt.Sprintf(common.FailureReasonActivityTimeout, timeoutType.String())

	return &failurepb.Failure{
		Message:     message,
		FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: timeoutInfo},
	}
}

// RecordHeartbeat records a heartbeat for the activity.
func (a *Activity) RecordHeartbeat(
ctx chasm.MutableContext,
input WithToken[*historyservice.RecordActivityTaskHeartbeatRequest],
) (*historyservice.RecordActivityTaskHeartbeatResponse, error) {
err := ValidateActivityTaskToken(ctx, a, input.Token)
if err != nil {
return nil, err
}
a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bergundy is there any performance penalty if we create a new field on every heartbeat?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, no penalty.

RecordedTime: timestamppb.New(ctx.Now(a)),
Details: details,
Details: input.Request.GetHeartbeatRequest().GetDetails(),
})
return nil, nil
ctx.AddTask(
a,
chasm.TaskAttributes{
ScheduledTime: ctx.Now(a).Add(a.GetHeartbeatTimeout().AsDuration()),
},
&activitypb.HeartbeatTimeoutTask{
Attempt: a.LastAttempt.Get(ctx).GetCount(),
},
)
return &historyservice.RecordActivityTaskHeartbeatResponse{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just before returning here you want to generate a new heartbeat task if the heartbeat timeout is set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See reply above; I currently still think the proposed design is preferable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done now -- switched back to your design.

CancelRequested: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
// TODO(dan): ActivityPaused, ActivityReset
}, nil
}

func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.ActivityExecutionInfo, error) {
Expand Down
70 changes: 70 additions & 0 deletions chasm/lib/activity/activity_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/util"
"go.uber.org/fx"
)

Expand Down Expand Up @@ -146,3 +147,72 @@ func (e *startToCloseTimeoutTaskExecutor) Execute(
// Reached maximum attempts, timeout the activity
return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_START_TO_CLOSE)
}

// heartbeatTimeoutTaskExecutor is the executor for HeartbeatTimeoutTask, a pure
// task that enforces heartbeat timeouts. It carries no state of its own.
type heartbeatTimeoutTaskExecutor struct{}

// newHeartbeatTimeoutTaskExecutor returns a ready-to-use executor for
// HeartbeatTimeoutTask.
func newHeartbeatTimeoutTaskExecutor() *heartbeatTimeoutTaskExecutor {
	return new(heartbeatTimeoutTaskExecutor)
}

// Validate validates a HeartbeatTimeoutTask.
func (e *heartbeatTimeoutTaskExecutor) Validate(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will want to validate that the schedule time in the attributes is still relevant here, not in the execute function. It should be a deterministic function of the last heartbeat time and the attempt start time (hbDeadline below should be equal to the schedule time).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See reply below

ctx chasm.Context,
activity *Activity,
taskAttrs chasm.TaskAttributes,
task *activitypb.HeartbeatTimeoutTask,
) (bool, error) {
// Let T = user-configured heartbeat timeout and let hb_i be the time of the ith user-submitted
// heartbeat request. (hb_0 = 0 since we always start a timer task when an attempt starts).

// There are two concurrent sequences of events:
// 1. A worker is sending heartbeats at times hb_i.
// 2. This task is being executed at (shortly after) times hb_i + T.

// On the i-th execution of this function, we look back into the past and determine whether the
// last heartbeat was received after hb_i. If so, we reject this timeout task. Otherwise, the
// Execute function runs and we fail the attempt.
if activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_STARTED &&
activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED {
return false, nil
}
// Task attempt must still match current attempt.
attempt := activity.LastAttempt.Get(ctx)
if attempt.GetCount() != task.Attempt {
return false, nil
}

// Must not have been a heartbeat since this task was created
hbTimeout := activity.GetHeartbeatTimeout().AsDuration() // T
attemptStartTime := attempt.GetStartedTime().AsTime()
lastHb, _ := activity.LastHeartbeat.TryGet(ctx) // could be nil, or from a previous attempt
// No hbs in attempt so far is equivalent to hb having been sent at attempt start time.
lastHbTime := util.MaxTime(lastHb.GetRecordedTime().AsTime(), attemptStartTime)
thisTaskHbTime := taskAttrs.ScheduledTime.Add(-hbTimeout) // hb_i
if lastHbTime.After(thisTaskHbTime) {
// another heartbeat has invalidated this task's heartbeat
return false, nil
}
return true, nil
}

// Execute runs a HeartbeatTimeoutTask that has passed validation, failing the
// current attempt because of a heartbeat timeout.
//
// It asks the activity whether another attempt is allowed. If so, the activity
// is rescheduled with the computed retry interval and a heartbeat-timeout
// failure attached; otherwise the activity transitions to timed out with
// TIMEOUT_TYPE_HEARTBEAT. The task attributes and task payload are unused.
func (e *heartbeatTimeoutTaskExecutor) Execute(
	ctx chasm.MutableContext,
	activity *Activity,
	_ chasm.TaskAttributes,
	_ *activitypb.HeartbeatTimeoutTask,
) error {
	// NOTE(review): the meaning of the literal 0 passed to shouldRetry is not
	// visible from here — confirm against shouldRetry's signature.
	retryAllowed, backoff, err := activity.shouldRetry(ctx, 0)
	switch {
	case err != nil:
		return err
	case retryAllowed:
		event := rescheduleEvent{
			retryInterval: backoff,
			failure:       createHeartbeatTimeoutFailure(),
		}
		return TransitionRescheduled.Apply(activity, ctx, event)
	default:
		// No retry allowed: time out the activity.
		return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_HEARTBEAT)
	}
}
1 change: 1 addition & 0 deletions chasm/lib/activity/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var HistoryModule = fx.Module(
newScheduleToStartTimeoutTaskExecutor,
newScheduleToCloseTimeoutTaskExecutor,
newStartToCloseTimeoutTaskExecutor,
newHeartbeatTimeoutTaskExecutor,
newHandler,
newLibrary,
),
Expand Down
37 changes: 37 additions & 0 deletions chasm/lib/activity/gen/activitypb/v1/tasks.go-helpers.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading