Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
f8ccafb
PollComponent and PollActivityExecution
dandavison Nov 12, 2025
46eb18e
Changes from @fretz12's review
dandavison Nov 13, 2025
11cac65
Edit docstrings
dandavison Nov 13, 2025
11c6b59
Non-error response on long-poll timeout
dandavison Nov 13, 2025
b24a022
Remove unused opts parameter of PollComponent
dandavison Nov 13, 2025
0db9858
Error on runID mismatch
dandavison Nov 14, 2025
c888969
Revert comment
dandavison Nov 14, 2025
5740320
Use framework-provided predicate to wait for any state change
dandavison Nov 14, 2025
1d27df4
Address code review comments
dandavison Nov 14, 2025
1107a99
Use component last updated VT
dandavison Nov 14, 2025
2ee2b75
Cleanup
dandavison Nov 15, 2025
ea12fcc
Refactor
dandavison Nov 15, 2025
5c2c41e
Refcator: combine handlers
dandavison Nov 15, 2025
c02c869
Rename: ComponentStateToken
dandavison Nov 15, 2025
e1a52d4
entity -> execution
dandavison Nov 15, 2025
24ef87c
Rename: ref.componentVT
dandavison Nov 15, 2025
5412f7e
Subscribe to notifications at component level, not execution level
dandavison Nov 15, 2025
612426f
Revert "Subscribe to notifications at component level, not execution …
dandavison Nov 17, 2025
7958efd
Add non-functional test
dandavison Nov 17, 2025
3904355
Add tests of cross-component subscribe/notify
dandavison Nov 17, 2025
00ef0ff
Rename: ComponentStateChanged
dandavison Nov 17, 2025
da60095
Revert to using ref as long-poll token
dandavison Nov 18, 2025
6d5fc28
Rename: ChasmExecutionNotification
dandavison Nov 18, 2025
a604d80
Functional test env
dandavison Nov 19, 2025
6917450
DEBUG
dandavison Nov 18, 2025
27aafa7
TestStartToClose
dandavison Nov 18, 2025
c623906
ScheduleToStart timeout
dandavison Nov 19, 2025
e05af5a
Revert "ScheduleToStart timeout"
dandavison Nov 19, 2025
6bd6d57
Evolve test
dandavison Nov 19, 2025
f325df8
Evolve test
dandavison Nov 19, 2025
099e6da
Fix expectation
dandavison Nov 19, 2025
7b23898
Revert "DEBUG"
dandavison Nov 19, 2025
b120e9c
PS
dandavison Nov 19, 2025
7e67862
Fix timeout code
dandavison Nov 20, 2025
055044f
Update test
dandavison Nov 20, 2025
affc857
TEMP: eliminate buffer
dandavison Nov 20, 2025
fa4e0ea
PS
dandavison Nov 20, 2025
f6c5290
WIP: transactionImpl
dandavison Nov 18, 2025
b540eb0
Revert "WIP: transactionImpl"
dandavison Nov 20, 2025
d1045de
Fix test
dandavison Nov 21, 2025
f48a2b8
Do not notify from UpdateComponent
dandavison Nov 20, 2025
c98c8f9
Wire through chasm notification to history engine
dandavison Nov 20, 2025
18159b6
Change signature
dandavison Nov 20, 2025
c4a0f1a
Send notification from MS transaction layer
dandavison Nov 20, 2025
310ae97
Update test
dandavison Nov 20, 2025
4ca1d33
Don't send ref in chasm execution notification
dandavison Nov 20, 2025
ccd63a4
Cleanup
dandavison Nov 20, 2025
7178c01
Test
dandavison Nov 20, 2025
a42fb44
Fix IncludeOutcome
dandavison Nov 20, 2025
c0ed2df
Test
dandavison Nov 20, 2025
5bd95d7
Revert accidental change
dandavison Nov 20, 2025
ecf4139
Cleanup
dandavison Nov 21, 2025
74e0a69
Update mock
dandavison Nov 21, 2025
e904d87
Rewrite chasm notifier
dandavison Nov 21, 2025
5b8a8be
subscribe outside
dandavison Nov 21, 2025
88e153d
Resubscribe
dandavison Nov 21, 2025
a500aa0
Fix mock
dandavison Nov 21, 2025
23fe864
PS
dandavison Nov 21, 2025
04a462d
Update test: becomes satisfied on 3rd state transition
dandavison Nov 21, 2025
e22f108
Fix test
dandavison Nov 21, 2025
ff48584
Increment in real Update
dandavison Nov 22, 2025
a84784c
DoAndReturn -> Return
dandavison Nov 22, 2025
3485fb9
Return constant empty workflow data
dandavison Nov 22, 2025
2f0b863
Times(1)
dandavison Nov 22, 2025
0fe23be
Add missing mock call expectation
dandavison Nov 22, 2025
8a410b6
Do Updates synchronously in main goroutine
dandavison Nov 22, 2025
8e83fb0
Notify on both MS snapshot and mutation
dandavison Nov 22, 2025
d8d7721
Don't resubscribe unless necessary; code golf
dandavison Nov 22, 2025
65b83a0
ungolf
dandavison Nov 23, 2025
8dbd866
Cleanup
dandavison Nov 22, 2025
f34f222
- PS
dandavison Nov 23, 2025
4146871
Partial revert 27aafa7cc2a91d8f24706b4ebf327b1d935a5f97
dandavison Nov 23, 2025
ddff1b5
Cleanup test
dandavison Nov 24, 2025
dc248f4
Delete cross-component polling tests
dandavison Nov 24, 2025
1b46b23
Use testcore.RandomizeStr(t.Name())
dandavison Nov 24, 2025
c7f4ab0
Check RunID
dandavison Nov 24, 2025
270c13b
code golf
dandavison Nov 24, 2025
fbc77f3
Revert "- PS"
dandavison Nov 24, 2025
e3d74a7
monotonicPredicateFn
dandavison Nov 24, 2025
1f169fb
Tests
dandavison Nov 24, 2025
2f10ee2
New deadline logic
dandavison Nov 24, 2025
0dda6cc
Validation & error messages fixes
dandavison Nov 24, 2025
d3c709c
Fix CHASM not found errors
dandavison Nov 25, 2025
1e5ca51
Test Outcome
dandavison Nov 25, 2025
36fb610
Evove tests
dandavison Nov 25, 2025
e5d900b
Move StaleState test to unit tests
dandavison Nov 25, 2025
07cc8cf
Rename
dandavison Nov 25, 2025
b7e5c98
Cleanup
dandavison Nov 25, 2025
5cb1637
Revert addition of new metrics
dandavison Nov 25, 2025
561b090
Subscribe does not return an error
dandavison Nov 25, 2025
8087fa5
Fix memory leak
dandavison Nov 25, 2025
e4dd35d
Clean up, fix validation
dandavison Nov 25, 2025
1e43970
Test absent RunID
dandavison Nov 25, 2025
1ad72b5
Don't return unused ref
dandavison Nov 25, 2025
d19eee6
Complete activity implementation
fretz12 Nov 25, 2025
ccba5bc
Resolve remaining differences with poll-component
fretz12 Nov 26, 2025
bae540d
Addressed PR comments.
fretz12 Nov 26, 2025
de5e4f2
Terminate activity implementation
fretz12 Nov 26, 2025
c5a62cf
Cancel activity implementation
fretz12 Nov 26, 2025
a42eeaf
Fix lint error.
fretz12 Nov 26, 2025
1225ccc
Added handling of business ID policy. Refactored standalone activity …
fretz12 Nov 26, 2025
a8893ff
Address PR comments.
fretz12 Nov 30, 2025
d7de3f8
Fix lint errors.
fretz12 Dec 1, 2025
d7a4915
Remove redundant error assertion.
fretz12 Dec 1, 2025
b8aea10
Merge standalone-activity
fretz12 Dec 5, 2025
e7d47e5
Revert lint auto fixes.
fretz12 Dec 5, 2025
5d43bb7
Regen mock.
fretz12 Dec 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions chasm/lib/activity/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,29 @@ package activity
import (
"context"
"errors"
"fmt"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
"go.temporal.io/server/common/contextutil"
)

var (
businessIDReusePolicyMap = map[enumspb.ActivityIdReusePolicy]chasm.BusinessIDReusePolicy{
enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE: chasm.BusinessIDReusePolicyAllowDuplicate,
enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY: chasm.BusinessIDReusePolicyAllowDuplicateFailedOnly,
enumspb.ACTIVITY_ID_REUSE_POLICY_REJECT_DUPLICATE: chasm.BusinessIDReusePolicyRejectDuplicate,
}

// TODO this will change once we rebase on main
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not implement use existing already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see BusinessIDConflictPolicyUseExisting on main, but doesn't look like it's on our feature branch until we rebase.

businessIDConflictPolicyMap = map[enumspb.ActivityIdConflictPolicy]chasm.BusinessIDConflictPolicy{
enumspb.ACTIVITY_ID_CONFLICT_POLICY_FAIL: chasm.BusinessIDConflictPolicyFail,
}
)

type handler struct {
activitypb.UnimplementedActivityServiceServer
config *Config
Expand All @@ -23,6 +38,18 @@ func newHandler(config *Config) *handler {
}

func (h *handler) StartActivityExecution(ctx context.Context, req *activitypb.StartActivityExecutionRequest) (*activitypb.StartActivityExecutionResponse, error) {
frontendReq := req.GetFrontendRequest()

reusePolicy, ok := businessIDReusePolicyMap[frontendReq.GetIdReusePolicy()]
if !ok {
return nil, serviceerror.NewFailedPrecondition(fmt.Sprintf("unsupported ID reuse policy: %v", frontendReq.GetIdReusePolicy()))
}

conflictPolicy, ok := businessIDConflictPolicyMap[frontendReq.GetIdConflictPolicy()]
if !ok {
return nil, serviceerror.NewFailedPrecondition(fmt.Sprintf("unsupported ID conflict policy: %v", frontendReq.GetIdConflictPolicy()))
}

response, key, _, err := chasm.NewEntity(
ctx,
chasm.EntityKey{
Expand All @@ -47,6 +74,7 @@ func (h *handler) StartActivityExecution(ctx context.Context, req *activitypb.St
},
req.GetFrontendRequest(),
chasm.WithRequestID(req.GetFrontendRequest().GetRequestId()),
chasm.WithBusinessIDPolicy(reusePolicy, conflictPolicy),
)

if err != nil {
Expand Down
23 changes: 19 additions & 4 deletions chasm/lib/activity/validator.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package activity

import (
"fmt"

"github.com/google/uuid"
activitypb "go.temporal.io/api/activity/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common"
Expand Down Expand Up @@ -43,7 +42,7 @@ func ValidateAndNormalizeActivityAttributes(
runTimeout *durationpb.Duration,
) error {
if err := tqid.NormalizeAndValidate(options.TaskQueue, "", maxIDLengthLimit); err != nil {
return fmt.Errorf("invalid TaskQueue: %w. ActivityId=%s ActivityType=%s", err, activityID, activityType)
return err
}

if activityID == "" {
Expand All @@ -54,7 +53,7 @@ func ValidateAndNormalizeActivityAttributes(
}

if err := validateActivityRetryPolicy(namespaceID, options.RetryPolicy, getDefaultActivityRetrySettings); err != nil {
return fmt.Errorf("invalid ActivityRetryPolicy: %w. ActivityId=%s ActivityType=%s", err, activityID, activityType)
return err
}

if len(activityID) > maxIDLengthLimit {
Expand Down Expand Up @@ -182,6 +181,10 @@ func validateAndNormalizeStartActivityExecutionRequest(
return serviceerror.NewInvalidArgument("RequestID length exceeds limit.")
}

if err := normalizeAndValidateIDPolicy(req); err != nil {
return err
}

if err := validateInputSize(
req.GetActivityId(),
req.GetActivityType().GetName(),
Expand All @@ -204,6 +207,18 @@ func validateAndNormalizeStartActivityExecutionRequest(
return nil
}

func normalizeAndValidateIDPolicy(req *workflowservice.StartActivityExecutionRequest) error {
if req.GetIdReusePolicy() == enumspb.ACTIVITY_ID_REUSE_POLICY_UNSPECIFIED {
req.IdReusePolicy = enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE
}

if req.GetIdConflictPolicy() == enumspb.ACTIVITY_ID_CONFLICT_POLICY_UNSPECIFIED {
req.IdConflictPolicy = enumspb.ACTIVITY_ID_CONFLICT_POLICY_FAIL
}

return nil
}

func validateInputSize(
activityID string,
blobSizeViolationTagValue string,
Expand Down
36 changes: 32 additions & 4 deletions chasm/lib/activity/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/stretchr/testify/require"
activitypb "go.temporal.io/api/activity/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -209,7 +211,8 @@ func TestValidateFailures(t *testing.T) {
tc.options,
tc.priority,
durationpb.New(0))
require.Error(t, err)
var invalidArgErr *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgErr)
})
}
}
Expand All @@ -231,7 +234,8 @@ func TestValidateStandAloneRequestIDTooLong(t *testing.T) {
log.NewNoopLogger(),
defaultMaxIDLengthLimit,
nil)
require.Error(t, err)
var invalidArgErr *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgErr)
}

func TestValidateStandAloneInputTooLarge(t *testing.T) {
Expand All @@ -251,7 +255,8 @@ func TestValidateStandAloneInputTooLarge(t *testing.T) {
log.NewNoopLogger(),
defaultMaxIDLengthLimit,
nil)
require.Error(t, err)
var invalidArgErr *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgErr)
}

func TestValidateStandAloneInputWarningSizeShouldSucceed(t *testing.T) {
Expand All @@ -277,6 +282,28 @@ func TestValidateStandAloneInputWarningSizeShouldSucceed(t *testing.T) {
require.NoError(t, err)
}

func TestValidateStandAlone_IDPolicyShouldDefault(t *testing.T) {
req := &workflowservice.StartActivityExecutionRequest{
ActivityId: defaultActivityID,
ActivityType: &commonpb.ActivityType{Name: defaultActivityType},
Options: &defaultActivityOptions,
Namespace: "default",
RequestId: "test-request-id",
}

err := validateAndNormalizeStartActivityExecutionRequest(
req,
defaultBlobSizeLimitError,
defaultBlobSizeLimitWarn,
log.NewNoopLogger(),
defaultMaxIDLengthLimit,
nil)

require.NoError(t, err)
require.Equal(t, enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE, req.IdReusePolicy)
require.Equal(t, enumspb.ACTIVITY_ID_CONFLICT_POLICY_FAIL, req.IdConflictPolicy)
}

func TestModifiedActivityTimeouts(t *testing.T) {
cases := []struct {
name string
Expand Down Expand Up @@ -406,7 +433,8 @@ func TestModifiedActivityTimeouts(t *testing.T) {
tc.runTimeout)

if tc.isErr {
require.Error(t, err)
var invalidArgErr *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgErr)
return
}

Expand Down
109 changes: 109 additions & 0 deletions tests/standalone_activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,115 @@ func (s *standaloneActivityTestSuite) SetupTest() {
s.tv = testvars.New(s.T())
}

func (s *standaloneActivityTestSuite) TestIDReusePolicy_RejectDuplicate() {
t := s.T()
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
defer cancel()

activityID := s.tv.ActivityID()
taskQueue := s.tv.TaskQueue().String()

startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue)
runID := startResp.RunId

pollTaskResp := s.pollActivityTaskAndValidate(ctx, t, activityID, taskQueue, runID)

_, err := s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{
Namespace: s.Namespace().String(),
TaskToken: pollTaskResp.TaskToken,
Result: defaultResult,
Identity: "new-worker",
})
require.NoError(t, err)

s.validateCompletion(ctx, t, activityID, runID, "new-worker")

_, err = s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{
Namespace: s.Namespace().String(),
ActivityId: activityID,
ActivityType: s.tv.ActivityType(),
Identity: s.tv.WorkerIdentity(),
Input: defaultInput,
Options: &activitypb.ActivityOptions{
TaskQueue: &taskqueuepb.TaskQueue{
Name: taskQueue,
},
StartToCloseTimeout: durationpb.New(1 * time.Minute),
},
IdReusePolicy: enumspb.ACTIVITY_ID_REUSE_POLICY_REJECT_DUPLICATE,
})
require.Error(t, err)
}

func (s *standaloneActivityTestSuite) TestIDReusePolicy_AllowDuplicateFailedOnly() {
t := s.T()
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
defer cancel()

activityID := s.tv.ActivityID()
taskQueue := s.tv.TaskQueue().String()

startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue)
runID := startResp.RunId

pollTaskResp := s.pollActivityTaskAndValidate(ctx, t, activityID, taskQueue, runID)

_, err := s.FrontendClient().RespondActivityTaskFailed(ctx, &workflowservice.RespondActivityTaskFailedRequest{
Namespace: s.Namespace().String(),
TaskToken: pollTaskResp.TaskToken,
Failure: defaultFailure,
Identity: "new-worker",
})
require.NoError(t, err)

s.validateFailure(ctx, t, activityID, runID, nil, "new-worker")

_, err = s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{
Namespace: s.Namespace().String(),
ActivityId: activityID,
ActivityType: s.tv.ActivityType(),
Identity: s.tv.WorkerIdentity(),
Input: defaultInput,
Options: &activitypb.ActivityOptions{
TaskQueue: &taskqueuepb.TaskQueue{
Name: taskQueue,
},
StartToCloseTimeout: durationpb.New(1 * time.Minute),
},
IdReusePolicy: enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
})
require.NoError(t, err)
}

func (s *standaloneActivityTestSuite) TestIDConflictPolicy_FailsIfExists() {
t := s.T()
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
defer cancel()

activityID := s.tv.ActivityID()
taskQueue := s.tv.TaskQueue().String()

s.startAndValidateActivity(ctx, t, activityID, taskQueue)

// By default, unspecified conflict policy should be set to ACTIVITY_ID_CONFLICT_POLICY_FAIL, so no need to set explicitly
_, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{
Namespace: s.Namespace().String(),
ActivityId: activityID,
ActivityType: s.tv.ActivityType(),
Identity: s.tv.WorkerIdentity(),
Input: defaultInput,
Options: &activitypb.ActivityOptions{
TaskQueue: &taskqueuepb.TaskQueue{
Name: taskQueue,
},
StartToCloseTimeout: durationpb.New(1 * time.Minute),
},
})
require.Error(t, err)
}

// TODO(fred): add test for BusinessIDConflictPolicyUseExisting after rebasing on main

func (s *standaloneActivityTestSuite) TestActivityCompleted() {
t := s.T()
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
Expand Down
Loading