diff --git a/chasm/lib/activity/handler.go b/chasm/lib/activity/handler.go index e857b6bdae..204f4708dd 100644 --- a/chasm/lib/activity/handler.go +++ b/chasm/lib/activity/handler.go @@ -3,7 +3,9 @@ package activity import ( "context" "errors" + "fmt" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/chasm" @@ -11,6 +13,19 @@ import ( "go.temporal.io/server/common/contextutil" ) +var ( + businessIDReusePolicyMap = map[enumspb.ActivityIdReusePolicy]chasm.BusinessIDReusePolicy{ + enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE: chasm.BusinessIDReusePolicyAllowDuplicate, + enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY: chasm.BusinessIDReusePolicyAllowDuplicateFailedOnly, + enumspb.ACTIVITY_ID_REUSE_POLICY_REJECT_DUPLICATE: chasm.BusinessIDReusePolicyRejectDuplicate, + } + + // TODO this will change once we rebase on main + businessIDConflictPolicyMap = map[enumspb.ActivityIdConflictPolicy]chasm.BusinessIDConflictPolicy{ + enumspb.ACTIVITY_ID_CONFLICT_POLICY_FAIL: chasm.BusinessIDConflictPolicyFail, + } +) + type handler struct { activitypb.UnimplementedActivityServiceServer config *Config @@ -23,6 +38,18 @@ func newHandler(config *Config) *handler { } func (h *handler) StartActivityExecution(ctx context.Context, req *activitypb.StartActivityExecutionRequest) (*activitypb.StartActivityExecutionResponse, error) { + frontendReq := req.GetFrontendRequest() + + reusePolicy, ok := businessIDReusePolicyMap[frontendReq.GetIdReusePolicy()] + if !ok { + return nil, serviceerror.NewFailedPrecondition(fmt.Sprintf("unsupported ID reuse policy: %v", frontendReq.GetIdReusePolicy())) + } + + conflictPolicy, ok := businessIDConflictPolicyMap[frontendReq.GetIdConflictPolicy()] + if !ok { + return nil, serviceerror.NewFailedPrecondition(fmt.Sprintf("unsupported ID conflict policy: %v", frontendReq.GetIdConflictPolicy())) + } + response, key, _, err := chasm.NewEntity( ctx, chasm.EntityKey{ @@ -47,6 +74,7 @@ func (h *handler) StartActivityExecution(ctx context.Context, req *activitypb.St }, req.GetFrontendRequest(), chasm.WithRequestID(req.GetFrontendRequest().GetRequestId()), + chasm.WithBusinessIDPolicy(reusePolicy, conflictPolicy), ) if err != nil { diff --git a/chasm/lib/activity/validator.go b/chasm/lib/activity/validator.go index 8488ac2f40..7069272bd4 100644 --- a/chasm/lib/activity/validator.go +++ b/chasm/lib/activity/validator.go @@ -1,11 +1,10 @@ package activity import ( - "fmt" - "github.com/google/uuid" activitypb "go.temporal.io/api/activity/v1" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/common" @@ -43,7 +42,7 @@ func ValidateAndNormalizeActivityAttributes( runTimeout *durationpb.Duration, ) error { if err := tqid.NormalizeAndValidate(options.TaskQueue, "", maxIDLengthLimit); err != nil { - return fmt.Errorf("invalid TaskQueue: %w. ActivityId=%s ActivityType=%s", err, activityID, activityType) + return err } if activityID == "" { @@ -54,7 +53,7 @@ func ValidateAndNormalizeActivityAttributes( } if err := validateActivityRetryPolicy(namespaceID, options.RetryPolicy, getDefaultActivityRetrySettings); err != nil { - return fmt.Errorf("invalid ActivityRetryPolicy: %w. ActivityId=%s ActivityType=%s", err, activityID, activityType) + return err } if len(activityID) > maxIDLengthLimit { @@ -182,6 +181,10 @@ func validateAndNormalizeStartActivityExecutionRequest( return serviceerror.NewInvalidArgument("RequestID length exceeds limit.") } + if err := normalizeAndValidateIDPolicy(req); err != nil { + return err + } + if err := validateInputSize( req.GetActivityId(), req.GetActivityType().GetName(), @@ -204,6 +207,18 @@ func validateAndNormalizeStartActivityExecutionRequest( return nil } +func normalizeAndValidateIDPolicy(req *workflowservice.StartActivityExecutionRequest) error { + if req.GetIdReusePolicy() == enumspb.ACTIVITY_ID_REUSE_POLICY_UNSPECIFIED { + req.IdReusePolicy = enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE + } + + if req.GetIdConflictPolicy() == enumspb.ACTIVITY_ID_CONFLICT_POLICY_UNSPECIFIED { + req.IdConflictPolicy = enumspb.ACTIVITY_ID_CONFLICT_POLICY_FAIL + } + + return nil +} + func validateInputSize( activityID string, blobSizeViolationTagValue string, diff --git a/chasm/lib/activity/validator_test.go b/chasm/lib/activity/validator_test.go index 8a23badca4..4453ce3d1d 100644 --- a/chasm/lib/activity/validator_test.go +++ b/chasm/lib/activity/validator_test.go @@ -7,6 +7,8 @@ import ( "github.com/stretchr/testify/require" activitypb "go.temporal.io/api/activity/v1" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/common/dynamicconfig" @@ -209,7 +211,8 @@ func TestValidateFailures(t *testing.T) { tc.options, tc.priority, durationpb.New(0)) - require.Error(t, err) + var invalidArgErr *serviceerror.InvalidArgument + require.ErrorAs(t, err, &invalidArgErr) }) } } @@ -231,7 +234,8 @@ func TestValidateStandAloneRequestIDTooLong(t *testing.T) { log.NewNoopLogger(), defaultMaxIDLengthLimit, nil) - require.Error(t, err) + var invalidArgErr *serviceerror.InvalidArgument + require.ErrorAs(t, err, &invalidArgErr) } func TestValidateStandAloneInputTooLarge(t *testing.T) { @@ -251,7 +255,8 @@ func TestValidateStandAloneInputTooLarge(t *testing.T) { log.NewNoopLogger(), defaultMaxIDLengthLimit, nil) - require.Error(t, err) + var invalidArgErr *serviceerror.InvalidArgument + require.ErrorAs(t, err, &invalidArgErr) } func TestValidateStandAloneInputWarningSizeShouldSucceed(t *testing.T) { @@ -277,6 +282,28 @@ func TestValidateStandAloneInputWarningSizeShouldSucceed(t *testing.T) { require.NoError(t, err) } +func TestValidateStandAlone_IDPolicyShouldDefault(t *testing.T) { + req := &workflowservice.StartActivityExecutionRequest{ + ActivityId: defaultActivityID, + ActivityType: &commonpb.ActivityType{Name: defaultActivityType}, + Options: &defaultActivityOptions, + Namespace: "default", + RequestId: "test-request-id", + } + + err := validateAndNormalizeStartActivityExecutionRequest( + req, + defaultBlobSizeLimitError, + defaultBlobSizeLimitWarn, + log.NewNoopLogger(), + defaultMaxIDLengthLimit, + nil) + + require.NoError(t, err) + require.Equal(t, enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE, req.IdReusePolicy) + require.Equal(t, enumspb.ACTIVITY_ID_CONFLICT_POLICY_FAIL, req.IdConflictPolicy) +} + func TestModifiedActivityTimeouts(t *testing.T) { cases := []struct { name string @@ -406,7 +433,8 @@ func TestModifiedActivityTimeouts(t *testing.T) { tc.runTimeout) if tc.isErr { - require.Error(t, err) + var invalidArgErr *serviceerror.InvalidArgument + require.ErrorAs(t, err, &invalidArgErr) return } diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index 342ac238c6..c9cf20590b 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -62,6 +62,115 @@ func (s *standaloneActivityTestSuite) SetupTest() { s.tv = testvars.New(s.T()) } +func (s *standaloneActivityTestSuite) TestIDReusePolicy_RejectDuplicate() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + activityID := s.tv.ActivityID() + taskQueue := s.tv.TaskQueue().String() + + startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue) + runID := startResp.RunId + + pollTaskResp := s.pollActivityTaskAndValidate(ctx, t, activityID, taskQueue, runID) + + _, err := s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollTaskResp.TaskToken, + Result: defaultResult, + Identity: "new-worker", + }) + require.NoError(t, err) + + s.validateCompletion(ctx, t, activityID, runID, "new-worker") + + _, err = s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + ActivityType: s.tv.ActivityType(), + Identity: s.tv.WorkerIdentity(), + Input: defaultInput, + Options: &activitypb.ActivityOptions{ + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + }, + StartToCloseTimeout: durationpb.New(1 * time.Minute), + }, + IdReusePolicy: enumspb.ACTIVITY_ID_REUSE_POLICY_REJECT_DUPLICATE, + }) + require.Error(t, err) +} + +func (s *standaloneActivityTestSuite) TestIDReusePolicy_AllowDuplicateFailedOnly() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + activityID := s.tv.ActivityID() + taskQueue := s.tv.TaskQueue().String() + + startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue) + runID := startResp.RunId + + pollTaskResp := s.pollActivityTaskAndValidate(ctx, t, activityID, taskQueue, runID) + + _, err := s.FrontendClient().RespondActivityTaskFailed(ctx, &workflowservice.RespondActivityTaskFailedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollTaskResp.TaskToken, + Failure: defaultFailure, + Identity: "new-worker", + }) + require.NoError(t, err) + + s.validateFailure(ctx, t, activityID, runID, nil, "new-worker") + + _, err = s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + ActivityType: s.tv.ActivityType(), + Identity: s.tv.WorkerIdentity(), + Input: defaultInput, + Options: &activitypb.ActivityOptions{ + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + }, + StartToCloseTimeout: durationpb.New(1 * time.Minute), + }, + IdReusePolicy: enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY, + }) + require.NoError(t, err) +} + +func (s *standaloneActivityTestSuite) TestIDConflictPolicy_FailsIfExists() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + activityID := s.tv.ActivityID() + taskQueue := s.tv.TaskQueue().String() + + s.startAndValidateActivity(ctx, t, activityID, taskQueue) + + // By default, unspecified conflict policy should be set to ACTIVITY_ID_CONFLICT_POLICY_FAIL, so no need to set explicitly + _, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + ActivityType: s.tv.ActivityType(), + Identity: s.tv.WorkerIdentity(), + Input: defaultInput, + Options: &activitypb.ActivityOptions{ + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + }, + StartToCloseTimeout: durationpb.New(1 * time.Minute), + }, + }) + require.Error(t, err) +} + +// TODO(fred): add test for BusinessIDConflictPolicyUseExisting after rebasing on main + func (s *standaloneActivityTestSuite) TestActivityCompleted() { t := s.T() ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)