From a5d3f3308b9699910a8ba5ef7ad12dc975be51b2 Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Fri, 27 Mar 2026 16:40:04 -0500 Subject: [PATCH 01/11] Add interleaved update replay reproducer Add a Java replay test that mirrors the Kotlin GreetingWorkflow sample and replays the provided workflow history fixture. Assert that replay fails with the embedded TMPRL1100 NonDeterministicException message so the reproducer stays pinned to the reported failure mode. --- ...GetVersionInterleavedUpdateReplayTest.java | 120 ++ ...VersionInterleavedUpdateReplayHistory.json | 1069 +++++++++++++++++ 2 files changed, 1189 insertions(+) create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java create mode 100644 temporal-sdk/src/test/resources/testGetVersionInterleavedUpdateReplayHistory.json diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java new file mode 100644 index 000000000..fea296347 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java @@ -0,0 +1,120 @@ +package io.temporal.workflow.versionTests; + +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; +import io.temporal.testing.WorkflowReplayer; +import io.temporal.workflow.UpdateMethod; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.UUID; +import org.junit.Test; +import org.slf4j.Logger; + +/** + * Mirrors 
app/src/main/kotlin/io/temporal/samples/update_nde/GreetingWorkflow.kt from + * gauravthadani/samples-kotlin and replays a history where update completions are interleaved with + * version markers. + */ +public class GetVersionInterleavedUpdateReplayTest { + private static final String HISTORY_RESOURCE = + "testGetVersionInterleavedUpdateReplayHistory.json"; + private static final String EXPECTED_NON_DETERMINISTIC_MESSAGE = + "[TMPRL1100] getVersion call before the existing version marker event. The most probable cause is retroactive addition of a getVersion call with an existing 'changeId'"; + private static final String EXPECTED_NON_DETERMINISTIC_FRAGMENT = + "io.temporal.worker.NonDeterministicException: " + EXPECTED_NON_DETERMINISTIC_MESSAGE; + + @Test + public void testReplayHistory() { + RuntimeException thrown = + assertThrows( + RuntimeException.class, + () -> + WorkflowReplayer.replayWorkflowExecutionFromResource( + HISTORY_RESOURCE, GreetingWorkflowImpl.class)); + assertTrue(thrown.getMessage().contains(EXPECTED_NON_DETERMINISTIC_FRAGMENT)); + } + + public static class Request { + private final String name; + private final OffsetDateTime date; + + public Request(String name, OffsetDateTime date) { + this.name = name; + this.date = date; + } + + public String getName() { + return name; + } + + public OffsetDateTime getDate() { + return date; + } + } + + @WorkflowInterface + public interface GreetingWorkflow { + @WorkflowMethod + String greeting(String name); + + @UpdateMethod + String notify(String name); + } + + public static class GreetingWorkflowImpl implements GreetingWorkflow { + private final Logger logger = Workflow.getLogger(GreetingWorkflow.class); + + public GreetingWorkflowImpl() { + logger.info("Workflow is initialized"); + } + + private GreetingActivities getActivities() { + return Workflow.newActivityStub( + GreetingActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(30)) + 
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build()) + .build()); + } + + @Override + public String greeting(String name) { + logger.info("Workflow started"); + + Workflow.getVersion("ChangeId1", 0, 1); + Workflow.getVersion("ChangeId2", 0, 1); + + Workflow.await(() -> false); + return getActivities().composeGreeting("hello", name); + } + + @Override + public String notify(String name) { + logger.info("Signal received: {}", name); + Workflow.sideEffect(UUID.class, UUID::randomUUID); + return "works"; + } + } + + public static class GreetingActivitiesImpl implements GreetingActivities { + @Override + public String composeGreeting(String greeting, String name) { + System.out.println("Greeting started: " + greeting); + return greeting + ", " + name + "!"; + } + } + + @ActivityInterface + public interface GreetingActivities { + @ActivityMethod(name = "greet") + String composeGreeting(String greeting, String name); + } +} diff --git a/temporal-sdk/src/test/resources/testGetVersionInterleavedUpdateReplayHistory.json b/temporal-sdk/src/test/resources/testGetVersionInterleavedUpdateReplayHistory.json new file mode 100644 index 000000000..950c0387d --- /dev/null +++ b/temporal-sdk/src/test/resources/testGetVersionInterleavedUpdateReplayHistory.json @@ -0,0 +1,1069 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2026-02-23T06:56:17.252716209Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "version": "100265", + "taskId": "269777867", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "GreetingWorkflow" + }, + "taskQueue": { + "name": "HelloActivityTaskQueue", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjIwMjYtMDItMjNUMTc6NTY6MTUuNjU4Njk2KzExOjAwIg==" + } + ] + }, + "workflowExecutionTimeout": "0s", + "workflowRunTimeout": "0s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": 
"019c8948-d364-7ae9-8664-d048fecb96eb", + "identity": "85298@Gauravs-MacBook-Pro.local", + "firstExecutionRunId": "019c8948-d364-7ae9-8664-d048fecb96eb", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "header": {}, + "workflowId": "WORKFLOW_ID_2bc8474d-11d3-47f4-a93d-90ac85c55d29" + } + }, + { + "eventId": "2", + "eventTime": "2026-02-23T06:56:17.252853556Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "version": "100265", + "taskId": "269777868", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "HelloActivityTaskQueue", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2026-02-23T06:56:40.930085120Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "version": "100265", + "taskId": "269777873", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "85431@Gauravs-MacBook-Pro.local", + "requestId": "e46ac299-489f-436e-bceb-1833e070408e", + "historySizeBytes": "388" + } + }, + { + "eventId": "4", + "eventTime": "2026-02-23T06:56:41.431966628Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "version": "100265", + "taskId": "269777877", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "85431@Gauravs-MacBook-Pro.local", + "workerVersion": {}, + "sdkMetadata": { + "langUsedFlags": [ + 1 + ], + "sdkName": "temporal-java", + "sdkVersion": "1.32.1" + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2026-02-23T06:56:41.432163996Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "version": "100265", + "taskId": "269777878", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "f91c4b0b-61ae-4767-81d0-b9938fbf6dea", + "acceptedRequestMessageId": "f91c4b0b-61ae-4767-81d0-b9938fbf6dea/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": 
"f91c4b0b-61ae-4767-81d0-b9938fbf6dea", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "input": { + "header": {}, + "name": "notify", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InZhbDki" + } + ] + } + } + } + } + }, + { + "eventId": "6", + "eventTime": "2026-02-23T06:56:41.432259357Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + "version": "100265", + "taskId": "269777879", + "markerRecordedEventAttributes": { + "markerName": "SideEffect", + "details": { + "data": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjcyMjg3YWMxLWUyMTQtNGM4MC05OGVkLWY1YjNjZTA4Nzg5OCI=" + } + ] + } + }, + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "7", + "eventTime": "2026-02-23T06:56:41.432281438Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "version": "100265", + "taskId": "269777880", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "d66dd71c-dc64-4e37-a60e-ab0e5dee941b", + "acceptedRequestMessageId": "d66dd71c-dc64-4e37-a60e-ab0e5dee941b/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "d66dd71c-dc64-4e37-a60e-ab0e5dee941b", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "input": { + "header": {}, + "name": "notify", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InZhbDEi" + } + ] + } + } + } + } + }, + { + "eventId": "8", + "eventTime": "2026-02-23T06:56:41.432304437Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + "version": "100265", + "taskId": "269777881", + "markerRecordedEventAttributes": { + "markerName": "SideEffect", + "details": { + "data": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImJkNWJjNWMwLTBhMDUtNGFjMS05ZDUyLTE4Yjc1ZDRkNjE4MSI=" + } + ] + } + }, + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "9", + "eventTime": 
"2026-02-23T06:56:41.432322160Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "version": "100265", + "taskId": "269777882", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "a6c6599a-0774-4746-8d68-0a21641292de", + "acceptedRequestMessageId": "a6c6599a-0774-4746-8d68-0a21641292de/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "a6c6599a-0774-4746-8d68-0a21641292de", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "input": { + "header": {}, + "name": "notify", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InZhbDEwIg==" + } + ] + } + } + } + } + }, + { + "eventId": "10", + "eventTime": "2026-02-23T06:56:41.432343822Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + "version": "100265", + "taskId": "269777883", + "markerRecordedEventAttributes": { + "markerName": "SideEffect", + "details": { + "data": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImEwMjgzMGY1LWZlNmMtNDI2NS1iNzBjLTdiMmE2OTFkNDYxMCI=" + } + ] + } + }, + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "11", + "eventTime": "2026-02-23T06:56:41.432373919Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "version": "100265", + "taskId": "269777884", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "1ca000bf-a781-45c8-a9fb-fa6ddaab87b3", + "acceptedRequestMessageId": "1ca000bf-a781-45c8-a9fb-fa6ddaab87b3/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "1ca000bf-a781-45c8-a9fb-fa6ddaab87b3", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "input": { + "header": {}, + "name": "notify", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InZhbDci" + } + ] + } + } + } + } + }, + { + "eventId": "12", + "eventTime": "2026-02-23T06:56:41.432411958Z", 
+ "eventType": "EVENT_TYPE_MARKER_RECORDED", + "version": "100265", + "taskId": "269777885", + "markerRecordedEventAttributes": { + "markerName": "SideEffect", + "details": { + "data": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "Ijc5OTM3NTM2LWNmMWEtNGY5Ni05YjdmLWE5NWY0MzE1YmIxMCI=" + } + ] + } + }, + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "13", + "eventTime": "2026-02-23T06:56:41.432432135Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "version": "100265", + "taskId": "269777886", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "598b6a70-d476-4151-aadc-540f96d76372", + "acceptedRequestMessageId": "598b6a70-d476-4151-aadc-540f96d76372/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "598b6a70-d476-4151-aadc-540f96d76372", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "input": { + "header": {}, + "name": "notify", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InZhbDIi" + } + ] + } + } + } + } + }, + { + "eventId": "14", + "eventTime": "2026-02-23T06:56:41.432453739Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + "version": "100265", + "taskId": "269777887", + "markerRecordedEventAttributes": { + "markerName": "SideEffect", + "details": { + "data": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImRkODVkNTMwLWMxN2QtNDg2Ny1iM2QwLWY3Mzk5MWE3ZWJjNCI=" + } + ] + } + }, + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "15", + "eventTime": "2026-02-23T06:56:41.432471258Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "version": "100265", + "taskId": "269777888", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "f5f7a12c-c159-4c0b-984e-2de35b6f15e5", + "acceptedRequestMessageId": "f5f7a12c-c159-4c0b-984e-2de35b6f15e5/request", + 
"acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "f5f7a12c-c159-4c0b-984e-2de35b6f15e5", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "input": { + "header": {}, + "name": "notify", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InZhbDUi" + } + ] + } + } + } + } + }, + { + "eventId": "16", + "eventTime": "2026-02-23T06:56:41.432498893Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + "version": "100265", + "taskId": "269777889", + "markerRecordedEventAttributes": { + "markerName": "SideEffect", + "details": { + "data": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjRkNTgzNTliLTE0OTAtNGNiOS1hMTI4LWQ5OTBlY2UzYmFiNSI=" + } + ] + } + }, + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "17", + "eventTime": "2026-02-23T06:56:41.432521769Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "version": "100265", + "taskId": "269777890", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "6fbaa9a4-05d1-4d62-8a88-b76d9e913e06", + "acceptedRequestMessageId": "6fbaa9a4-05d1-4d62-8a88-b76d9e913e06/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "6fbaa9a4-05d1-4d62-8a88-b76d9e913e06", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "input": { + "header": {}, + "name": "notify", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InZhbDMi" + } + ] + } + } + } + } + }, + { + "eventId": "18", + "eventTime": "2026-02-23T06:56:41.432547878Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + "version": "100265", + "taskId": "269777891", + "markerRecordedEventAttributes": { + "markerName": "SideEffect", + "details": { + "data": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjA1ZWY0OGFhLWIzMDItNDcxMy04N2JmLTM5Mzg3MDVjYTkwOCI=" + } + ] + } + }, + 
"workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "19", + "eventTime": "2026-02-23T06:56:41.432567726Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "version": "100265", + "taskId": "269777892", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "58b8174f-2735-4609-867c-e78e08e95ab5", + "acceptedRequestMessageId": "58b8174f-2735-4609-867c-e78e08e95ab5/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "58b8174f-2735-4609-867c-e78e08e95ab5", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "input": { + "header": {}, + "name": "notify", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InZhbDgi" + } + ] + } + } + } + } + }, + { + "eventId": "20", + "eventTime": "2026-02-23T06:56:41.432591390Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + "version": "100265", + "taskId": "269777893", + "markerRecordedEventAttributes": { + "markerName": "SideEffect", + "details": { + "data": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImMwYjBiYzRlLWMyZmItNDlkYy04NTA0LTcwMDhiY2Y4NjNiYSI=" + } + ] + } + }, + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "21", + "eventTime": "2026-02-23T06:56:41.432610107Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "version": "100265", + "taskId": "269777894", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "44502c0d-be6f-423f-ba79-cf892a639768", + "acceptedRequestMessageId": "44502c0d-be6f-423f-ba79-cf892a639768/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "44502c0d-be6f-423f-ba79-cf892a639768", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "input": { + "header": {}, + "name": "notify", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InZhbDYi" + } + ] + } + } + } + 
} + }, + { + "eventId": "22", + "eventTime": "2026-02-23T06:56:41.432639883Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + "version": "100265", + "taskId": "269777895", + "markerRecordedEventAttributes": { + "markerName": "SideEffect", + "details": { + "data": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImJkZGRlNmRjLTdmYWYtNGM4Yi05NjZjLTQ4MjFlNmNiMzIzYiI=" + } + ] + } + }, + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "23", + "eventTime": "2026-02-23T06:56:41.432656671Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "version": "100265", + "taskId": "269777896", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "543dea54-c2ac-49ed-8098-d84c19724cc9", + "acceptedRequestMessageId": "543dea54-c2ac-49ed-8098-d84c19724cc9/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "543dea54-c2ac-49ed-8098-d84c19724cc9", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "input": { + "header": {}, + "name": "notify", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InZhbDQi" + } + ] + } + } + } + } + }, + { + "eventId": "24", + "eventTime": "2026-02-23T06:56:41.432685603Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + "version": "100265", + "taskId": "269777897", + "markerRecordedEventAttributes": { + "markerName": "SideEffect", + "details": { + "data": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjM5NjVhYjI3LWUwNDYtNGQyZC1iYjBjLTJkOTJiMThhNTJhOCI=" + } + ] + } + }, + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "25", + "eventTime": "2026-02-23T06:56:41.432695745Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + "version": "100265", + "taskId": "269777898", + "markerRecordedEventAttributes": { + "markerName": "Version", + "details": { + "changeId": { + "payloads": [ + { + "metadata": { + "encoding": 
"anNvbi9wbGFpbg==" + }, + "data": "IkNoYW5nZUlkMSI=" + } + ] + }, + "version": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + } + ] + } + }, + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "26", + "eventTime": "2026-02-23T06:56:41.432728762Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "version": "100265", + "taskId": "269777899", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "f91c4b0b-61ae-4767-81d0-b9938fbf6dea", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "acceptedEventId": "5", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmtzIg==" + } + ] + } + } + } + }, + { + "eventId": "27", + "eventTime": "2026-02-23T06:56:41.432759704Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "version": "100265", + "taskId": "269777900", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "d66dd71c-dc64-4e37-a60e-ab0e5dee941b", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "acceptedEventId": "7", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmtzIg==" + } + ] + } + } + } + }, + { + "eventId": "28", + "eventTime": "2026-02-23T06:56:41.432791754Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "version": "100265", + "taskId": "269777901", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "a6c6599a-0774-4746-8d68-0a21641292de", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "acceptedEventId": "9", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmtzIg==" + } + ] + } + } + } + }, + { + "eventId": "29", + "eventTime": "2026-02-23T06:56:41.432821654Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + 
"version": "100265", + "taskId": "269777902", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "1ca000bf-a781-45c8-a9fb-fa6ddaab87b3", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "acceptedEventId": "11", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmtzIg==" + } + ] + } + } + } + }, + { + "eventId": "30", + "eventTime": "2026-02-23T06:56:41.432845613Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "version": "100265", + "taskId": "269777903", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "598b6a70-d476-4151-aadc-540f96d76372", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "acceptedEventId": "13", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmtzIg==" + } + ] + } + } + } + }, + { + "eventId": "31", + "eventTime": "2026-02-23T06:56:41.432872469Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "version": "100265", + "taskId": "269777904", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "f5f7a12c-c159-4c0b-984e-2de35b6f15e5", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "acceptedEventId": "15", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmtzIg==" + } + ] + } + } + } + }, + { + "eventId": "32", + "eventTime": "2026-02-23T06:56:41.432897002Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "version": "100265", + "taskId": "269777905", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "6fbaa9a4-05d1-4d62-8a88-b76d9e913e06", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "acceptedEventId": "17", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmtzIg==" + } 
+ ] + } + } + } + }, + { + "eventId": "33", + "eventTime": "2026-02-23T06:56:41.432919961Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "version": "100265", + "taskId": "269777906", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "58b8174f-2735-4609-867c-e78e08e95ab5", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "acceptedEventId": "19", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmtzIg==" + } + ] + } + } + } + }, + { + "eventId": "34", + "eventTime": "2026-02-23T06:56:41.432944158Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "version": "100265", + "taskId": "269777907", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "44502c0d-be6f-423f-ba79-cf892a639768", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "acceptedEventId": "21", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmtzIg==" + } + ] + } + } + } + }, + { + "eventId": "35", + "eventTime": "2026-02-23T06:56:41.432966813Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "version": "100265", + "taskId": "269777908", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "543dea54-c2ac-49ed-8098-d84c19724cc9", + "identity": "85298@Gauravs-MacBook-Pro.local" + }, + "acceptedEventId": "23", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmtzIg==" + } + ] + } + } + } + }, + { + "eventId": "36", + "eventTime": "2026-02-23T06:56:41.432995818Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + "version": "100265", + "taskId": "269777909", + "markerRecordedEventAttributes": { + "markerName": "Version", + "details": { + "changeId": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": 
"IkNoYW5nZUlkMiI=" + } + ] + }, + "version": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + } + ] + } + }, + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "37", + "eventTime": "2026-02-23T06:56:54.154170962Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "version": "100265", + "taskId": "269777912", + "workflowExecutionSignaledEventAttributes": { + "signalName": "test", + "input": {}, + "identity": "gaurav.thadani@temporal.io - webui" + } + }, + { + "eventId": "38", + "eventTime": "2026-02-23T06:56:54.154175360Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "version": "100265", + "taskId": "269777913", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "85431@Gauravs-MacBook-Pro.local:fb2f24f2-467c-4aab-82db-b5a2e2a42c0d", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "HelloActivityTaskQueue" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "39", + "eventTime": "2026-02-23T06:56:59.158018087Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT", + "version": "100265", + "taskId": "269777917", + "workflowTaskTimedOutEventAttributes": { + "scheduledEventId": "38", + "timeoutType": "TIMEOUT_TYPE_SCHEDULE_TO_START" + } + }, + { + "eventId": "40", + "eventTime": "2026-02-23T06:56:59.158026481Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "version": "100265", + "taskId": "269777918", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "HelloActivityTaskQueue", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "41", + "eventTime": "2026-02-23T06:56:59.165746096Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "version": "100265", + "taskId": "269777921", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "40", + "identity": "85505@Gauravs-MacBook-Pro.local", + "requestId": "71c5fe60-8ab5-4286-b1b9-d640ed56c2b8", + 
"historySizeBytes": "6340" + } + }, + { + "eventId": "42", + "eventTime": "2026-02-23T06:56:59.860055251Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_FAILED", + "version": "100265", + "taskId": "269777925", + "workflowTaskFailedEventAttributes": { + "scheduledEventId": "40", + "startedEventId": "41", + "cause": "WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE", + "failure": { + "message": "Failure handling event 26 of type 'EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED' during replay. {WorkflowTaskStartedEventId=41, CurrentStartedEventId=3}", + "source": "JavaSDK", + "stackTrace": "io.temporal.internal.statemachines.WorkflowStateMachines.createEventProcessingException(WorkflowStateMachines.java:445)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:346)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:305)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.applyServerHistory(ReplayWorkflowRunTaskHandler.java:246)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:228)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:151)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:115)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:80)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handleTask(WorkflowWorker.java:564)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:403)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:343)\nio.temporal.internal.worker.PollTaskExecutor.lambda$process$1(PollTaskExecutor.java:76)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\njava.base/java.util.concurr
ent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\njava.base/java.lang.Thread.run(Thread.java:1583)\n", + "cause": { + "message": "Version: failure executing RESULT_NOTIFIED_REPLAYING->NON_MATCHING_EVENT, transition history is [CREATED->CHECK_EXECUTION_STATE, REPLAYING->SCHEDULE, MARKER_COMMAND_CREATED_REPLAYING->RECORD_MARKER]", + "source": "JavaSDK", + "stackTrace": "io.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:143)\nio.temporal.internal.statemachines.StateMachine.handleExplicitEvent(StateMachine.java:73)\nio.temporal.internal.statemachines.EntityStateMachineBase.explicitEvent(EntityStateMachineBase.java:75)\nio.temporal.internal.statemachines.VersionStateMachine$InvocationStateMachine.handleEvent(VersionStateMachine.java:165)\nio.temporal.internal.statemachines.CancellableCommand.handleEvent(CancellableCommand.java:53)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleCommandEvent(WorkflowStateMachines.java:583)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:477)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:344)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:305)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.applyServerHistory(ReplayWorkflowRunTaskHandler.java:246)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:228)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:151)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:115)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:80)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handleTask(WorkflowWorker.java:564)\nio.tempora
l.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:403)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:343)\nio.temporal.internal.worker.PollTaskExecutor.lambda$process$1(PollTaskExecutor.java:76)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\njava.base/java.lang.Thread.run(Thread.java:1583)\n", + "cause": { + "message": "[TMPRL1100] getVersion call before the existing version marker event. The most probable cause is retroactive addition of a getVersion call with an existing 'changeId'", + "source": "JavaSDK", + "stackTrace": "io.temporal.internal.statemachines.VersionStateMachine$InvocationStateMachine.missingMarkerReplaying(VersionStateMachine.java:328)\nio.temporal.internal.statemachines.FixedTransitionAction.apply(FixedTransitionAction.java:26)\nio.temporal.internal.statemachines.StateMachine.executeTransition(StateMachine.java:139)\nio.temporal.internal.statemachines.StateMachine.handleExplicitEvent(StateMachine.java:73)\nio.temporal.internal.statemachines.EntityStateMachineBase.explicitEvent(EntityStateMachineBase.java:75)\nio.temporal.internal.statemachines.VersionStateMachine$InvocationStateMachine.handleEvent(VersionStateMachine.java:165)\nio.temporal.internal.statemachines.CancellableCommand.handleEvent(CancellableCommand.java:53)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleCommandEvent(WorkflowStateMachines.java:583)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:477)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:344)\nio.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:305)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.applyServerHistory(ReplayWorkflowRunTaskHandler.java:
246)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:228)\nio.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:151)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:115)\nio.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:80)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handleTask(WorkflowWorker.java:564)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:403)\nio.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:343)\nio.temporal.internal.worker.PollTaskExecutor.lambda$process$1(PollTaskExecutor.java:76)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\njava.base/java.lang.Thread.run(Thread.java:1583)\n", + "applicationFailureInfo": { + "type": "io.temporal.worker.NonDeterministicException" + } + }, + "applicationFailureInfo": { + "type": "java.lang.RuntimeException" + } + }, + "applicationFailureInfo": { + "type": "io.temporal.internal.statemachines.InternalWorkflowTaskException" + } + }, + "identity": "85505@Gauravs-MacBook-Pro.local" + } + } + ] +} \ No newline at end of file From c4364434aa71ed31d2fb7f70a06b9bac3efa8123 Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Fri, 27 Mar 2026 18:18:26 -0500 Subject: [PATCH 02/11] Add replay ordering reproducer for interleaved updates --- ...nterleavedUpdateReplayTaskHandlerTest.java | 177 ++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java diff --git 
a/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java new file mode 100644 index 000000000..889b12603 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java @@ -0,0 +1,177 @@ +package io.temporal.internal.replay; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.uber.m3.tally.NoopScope; +import io.temporal.api.command.v1.Command; +import io.temporal.api.enums.v1.CommandType; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.query.v1.WorkflowQuery; +import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse; +import io.temporal.client.WorkflowClient; +import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.internal.history.VersionMarkerUtils; +import io.temporal.internal.worker.QueryReplayHelper; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.testing.TestWorkflowEnvironment; +import io.temporal.testing.WorkflowHistoryLoader; +import io.temporal.worker.Worker; +import io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest.GreetingWorkflowImpl; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; + +public class GetVersionInterleavedUpdateReplayTaskHandlerTest { + private static final String HISTORY_RESOURCE = + "testGetVersionInterleavedUpdateReplayHistory.json"; + private static final String EXPECTED_NON_DETERMINISTIC_MESSAGE = + "[TMPRL1100] getVersion call before the existing version marker event. 
The most probable cause is retroactive addition of a getVersion call with an existing 'changeId'"; + private static final String EXPECTED_NON_DETERMINISTIC_FRAGMENT = + "io.temporal.worker.NonDeterministicException: " + EXPECTED_NON_DETERMINISTIC_MESSAGE; + private static final String EXPECTED_FIRST_CHANGE_ID = "ChangeId1"; + private static final String EXPECTED_SECOND_CHANGE_ID = "ChangeId2"; + private static final String TEST_TASK_QUEUE = "get-version-interleaved-update-replay"; + + @Test + public void testReplayQueuesSecondVersionMarkerBeforeUpdateCompletionCommands() throws Exception { + WorkflowExecutionHistory history = + WorkflowHistoryLoader.readHistoryFromResource(HISTORY_RESOURCE); + assertEquals( + Arrays.asList(EXPECTED_FIRST_CHANGE_ID, EXPECTED_SECOND_CHANGE_ID), + extractVersionChangeIds(history.getEvents())); + + TestWorkflowEnvironment testEnvironment = TestWorkflowEnvironment.newInstance(); + ReplayWorkflowRunTaskHandler runTaskHandler = null; + try { + Worker worker = testEnvironment.newWorker(TEST_TASK_QUEUE); + worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class); + + ReplayWorkflowTaskHandler replayTaskHandler = getNonStickyReplayTaskHandler(worker); + PollWorkflowTaskQueueResponse.Builder replayTask = newReplayTask(history); + runTaskHandler = createStatefulHandler(replayTaskHandler, replayTask); + + WorkflowServiceStubs service = + getField(replayTaskHandler, "service", WorkflowServiceStubs.class); + String namespace = getField(replayTaskHandler, "namespace", String.class); + ServiceWorkflowHistoryIterator historyIterator = + new ServiceWorkflowHistoryIterator(service, namespace, replayTask, new NoopScope()); + + ReplayWorkflowRunTaskHandler replayHandler = runTaskHandler; + RuntimeException thrown = + assertThrows( + RuntimeException.class, + () -> replayHandler.handleDirectQueryWorkflowTask(replayTask, historyIterator)); + assertTrue( + "Expected replay failure to contain the nondeterminism marker, but got: " + thrown, + 
throwableChainContains(thrown, EXPECTED_NON_DETERMINISTIC_FRAGMENT) + || throwableChainContains(thrown, EXPECTED_NON_DETERMINISTIC_MESSAGE)); + + List pendingCommands = runTaskHandler.getWorkflowStateMachines().takeCommands(); + int versionMarkerIndex = indexOfVersionMarker(pendingCommands); + int protocolMessageIndex = + indexOfCommandType(pendingCommands, CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE); + + assertNotEquals( + "Expected a pending Version marker command after replay failure", -1, versionMarkerIndex); + assertTrue( + "Expected the pending Version marker to be queued before any update completion protocol command: " + + pendingCommands, + protocolMessageIndex == -1 || versionMarkerIndex < protocolMessageIndex); + } finally { + if (runTaskHandler != null) { + runTaskHandler.close(); + } + testEnvironment.close(); + } + } + + private static PollWorkflowTaskQueueResponse.Builder newReplayTask( + WorkflowExecutionHistory history) { + return PollWorkflowTaskQueueResponse.newBuilder() + .setWorkflowExecution(history.getWorkflowExecution()) + .setWorkflowType( + history + .getHistory() + .getEvents(0) + .getWorkflowExecutionStartedEventAttributes() + .getWorkflowType()) + .setStartedEventId(Long.MAX_VALUE) + .setPreviousStartedEventId(Long.MAX_VALUE) + .setHistory(history.getHistory()) + .setQuery(WorkflowQuery.newBuilder().setQueryType(WorkflowClient.QUERY_TYPE_REPLAY_ONLY)); + } + + private static ReplayWorkflowTaskHandler getNonStickyReplayTaskHandler(Worker worker) + throws Exception { + Object workflowWorker = getField(worker, "workflowWorker", Object.class); + QueryReplayHelper queryReplayHelper = + getField(workflowWorker, "queryReplayHelper", QueryReplayHelper.class); + return getField(queryReplayHelper, "handler", ReplayWorkflowTaskHandler.class); + } + + private static ReplayWorkflowRunTaskHandler createStatefulHandler( + ReplayWorkflowTaskHandler replayTaskHandler, PollWorkflowTaskQueueResponse.Builder replayTask) + throws Exception { + Method method = 
+ ReplayWorkflowTaskHandler.class.getDeclaredMethod( + "createStatefulHandler", + PollWorkflowTaskQueueResponse.Builder.class, + com.uber.m3.tally.Scope.class); + method.setAccessible(true); + return (ReplayWorkflowRunTaskHandler) + method.invoke(replayTaskHandler, replayTask, new NoopScope()); + } + + private static List extractVersionChangeIds(List events) { + List changeIds = new ArrayList<>(); + for (HistoryEvent event : events) { + String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event); + if (changeId != null) { + changeIds.add(changeId); + } + } + return changeIds; + } + + private static int indexOfVersionMarker(List commands) { + for (int i = 0; i < commands.size(); i++) { + if (VersionMarkerUtils.hasVersionMarkerStructure(commands.get(i))) { + return i; + } + } + return -1; + } + + private static int indexOfCommandType(List commands, CommandType commandType) { + for (int i = 0; i < commands.size(); i++) { + if (commands.get(i).getCommandType() == commandType) { + return i; + } + } + return -1; + } + + private static boolean throwableChainContains(Throwable throwable, String expected) { + Throwable current = throwable; + while (current != null) { + if (String.valueOf(current).contains(expected)) { + return true; + } + current = current.getCause(); + } + return false; + } + + private static T getField(Object target, String fieldName, Class expectedType) + throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return expectedType.cast(field.get(target)); + } +} From 3c8d82ec6bb0e85d2ea79098a14f63a45adea962 Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Mon, 30 Mar 2026 11:36:07 -0500 Subject: [PATCH 03/11] Delay flagged version replay callback to marker match Run a constrained experiment for the interleaved update replay bug by changing VersionStateMachine replay timing only for histories with SKIP_YIELD_ON_VERSION set. 
In that path, getVersion still returns synchronously, but the replay callback is no longer fired at fake RECORD_MARKER command creation and is instead delayed until the real MARKER_RECORDED event is matched. The goal of the experiment was to verify that flagged histories do not depend on the current early replay callback or its extra eventLoop scheduling. The legacy interleaved update repro history does not have SKIP_YIELD_ON_VERSION, so it continues to fail unchanged and serves as the control case. Verified with: ./gradlew --offline :temporal-sdk:test --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingRemoveTest" --tests "io.temporal.workflow.versionTests.GetVersionRemovedInReplayTest" --tests "io.temporal.workflow.versionTests.GetVersionWithoutCommandEventTest" --tests "io.temporal.workflow.versionTests.GetVersionAndTimerTest" --tests "io.temporal.workflow.versionTests.GetVersionMultipleCallsTest" --tests "io.temporal.workflow.versionTests.GetVersionInSignalTest" --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingTest" --tests "io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest" --tests "io.temporal.internal.replay.GetVersionInterleavedUpdateReplayTaskHandlerTest" ./gradlew --offline :temporal-sdk:test --tests "io.temporal.workflow.versionTests.GetVersionRemovedInReplayTest" --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingRemoveTest" --tests "io.temporal.workflow.versionTests.GetVersionMultipleCallsTest" --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingTest" --- .../statemachines/VersionStateMachine.java | 20 ++++++++++++++++++- .../statemachines/WorkflowStateMachines.java | 12 +++++++++-- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java index 560920cac..7a82e4c01 100644 --- 
a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java @@ -24,6 +24,7 @@ final class VersionStateMachine { private final String changeId; private final Functions.Func replaying; + private final Functions.Func notifyOnMarkerRecordedReplaying; private final Functions.Proc1 commandSink; private final Functions.Proc1 stateMachineSink; @@ -264,6 +265,11 @@ void notifySkippedExecuting() { } void notifyMarkerCreatedReplaying() { + if (notifyOnMarkerRecordedReplaying.apply()) { + // Flagged histories already get the version synchronously from getVersion(), so delay the + // replay callback until the real marker event is matched. + return; + } try { // it's a replay and the version to return from the getVersion call should be preloaded from // the history @@ -295,6 +301,14 @@ void flushPreloadedVersionAndUpdateFromEventReplaying() { Preconditions.checkState( preloadedVersion != null, "preloadedVersion is expected to be initialized"); flushPreloadedVersionAndUpdateFromEvent(currentEvent); + if (notifyOnMarkerRecordedReplaying.apply()) { + try { + validateVersionAndThrow(false); + notifyFromVersion(false); + } catch (RuntimeException ex) { + notifyFromException(ex); + } + } } void notifySkippedReplaying() { @@ -366,18 +380,22 @@ void flushPreloadedVersionAndUpdateFromEvent(HistoryEvent event) { public static VersionStateMachine newInstance( String id, Functions.Func replaying, + Functions.Func notifyOnMarkerRecordedReplaying, Functions.Proc1 commandSink, Functions.Proc1 stateMachineSink) { - return new VersionStateMachine(id, replaying, commandSink, stateMachineSink); + return new VersionStateMachine( + id, replaying, notifyOnMarkerRecordedReplaying, commandSink, stateMachineSink); } private VersionStateMachine( String changeId, Functions.Func replaying, + Functions.Func notifyOnMarkerRecordedReplaying, Functions.Proc1 commandSink, Functions.Proc1 
stateMachineSink) { this.changeId = Objects.requireNonNull(changeId); this.replaying = Objects.requireNonNull(replaying); + this.notifyOnMarkerRecordedReplaying = Objects.requireNonNull(notifyOnMarkerRecordedReplaying); this.commandSink = Objects.requireNonNull(commandSink); this.stateMachineSink = stateMachineSink; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index 372dfb857..f53e7ae19 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -660,7 +660,11 @@ private void preloadVersionMarker(HistoryEvent event) { changeId, (idKey) -> VersionStateMachine.newInstance( - changeId, this::isReplaying, commandSink, stateMachineSink)); + changeId, + this::isReplaying, + () -> checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION), + commandSink, + stateMachineSink)); Integer version = versionStateMachine.handleMarkersPreload(event); if (versionStateMachine.isWriteVersionChangeSA()) { changeVersions.put(changeId, version); @@ -1248,7 +1252,11 @@ public Integer getVersion( changeId, (idKey) -> VersionStateMachine.newInstance( - changeId, this::isReplaying, commandSink, stateMachineSink)); + changeId, + this::isReplaying, + () -> checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION), + commandSink, + stateMachineSink)); return stateMachine.getVersion( minSupported, maxSupported, From 0164b45d6dd9589f8f1f4f9645ba4a7a9ba0bca6 Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Mon, 30 Mar 2026 11:46:33 -0500 Subject: [PATCH 04/11] Delay version replay callback until marker match Change VersionStateMachine replay semantics so getVersion no longer resumes workflow code when the fake RECORD_MARKER command is created. 
Replay now waits until the real MARKER_RECORDED event is matched before firing the version callback, which makes version-marker ordering consistent with replayed side effects. This fixes the interleaved update replay bug reproduced by testGetVersionInterleavedUpdateReplayHistory.json. That history previously failed replay with [TMPRL1100] because the second getVersion callback ran ahead of update completion protocol handling. After this change, the same recorded history replays successfully through both WorkflowReplayer and the lower-level direct-query replay task handler. The earlier flag-gated experiment showed that delaying the callback was already safe for histories with SKIP_YIELD_ON_VERSION. This commit removes that temporary gating and applies the same replay ordering to all histories. Verified with: ./gradlew --offline :temporal-sdk:test --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingRemoveTest" --tests "io.temporal.workflow.versionTests.GetVersionRemovedInReplayTest" --tests "io.temporal.workflow.versionTests.GetVersionWithoutCommandEventTest" --tests "io.temporal.workflow.versionTests.GetVersionAndTimerTest" --tests "io.temporal.workflow.versionTests.GetVersionMultipleCallsTest" --tests "io.temporal.workflow.versionTests.GetVersionInSignalTest" --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingTest" --tests "io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest" --tests "io.temporal.internal.replay.GetVersionInterleavedUpdateReplayTaskHandlerTest" ./gradlew --offline :temporal-sdk:test --tests "io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest" --tests "io.temporal.internal.replay.GetVersionInterleavedUpdateReplayTaskHandlerTest" --- .../statemachines/VersionStateMachine.java | 35 +++------- .../statemachines/WorkflowStateMachines.java | 12 +--- ...nterleavedUpdateReplayTaskHandlerTest.java | 68 +++---------------- ...GetVersionInterleavedUpdateReplayTest.java | 18 +---- 4 files 
changed, 21 insertions(+), 112 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java index 7a82e4c01..02d730597 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java @@ -24,7 +24,6 @@ final class VersionStateMachine { private final String changeId; private final Functions.Func replaying; - private final Functions.Func notifyOnMarkerRecordedReplaying; private final Functions.Proc1 commandSink; private final Functions.Proc1 stateMachineSink; @@ -265,20 +264,8 @@ void notifySkippedExecuting() { } void notifyMarkerCreatedReplaying() { - if (notifyOnMarkerRecordedReplaying.apply()) { - // Flagged histories already get the version synchronously from getVersion(), so delay the - // replay callback until the real marker event is matched. - return; - } - try { - // it's a replay and the version to return from the getVersion call should be preloaded from - // the history - final boolean usePreloadedVersion = true; - validateVersionAndThrow(usePreloadedVersion); - notifyFromVersion(usePreloadedVersion); - } catch (RuntimeException ex) { - notifyFromException(ex); - } + // Replay already preloads the version value, so delay the callback until the real marker + // event is matched. 
} State createMarkerReplaying() { @@ -301,13 +288,11 @@ void flushPreloadedVersionAndUpdateFromEventReplaying() { Preconditions.checkState( preloadedVersion != null, "preloadedVersion is expected to be initialized"); flushPreloadedVersionAndUpdateFromEvent(currentEvent); - if (notifyOnMarkerRecordedReplaying.apply()) { - try { - validateVersionAndThrow(false); - notifyFromVersion(false); - } catch (RuntimeException ex) { - notifyFromException(ex); - } + try { + validateVersionAndThrow(false); + notifyFromVersion(false); + } catch (RuntimeException ex) { + notifyFromException(ex); } } @@ -380,22 +365,18 @@ void flushPreloadedVersionAndUpdateFromEvent(HistoryEvent event) { public static VersionStateMachine newInstance( String id, Functions.Func replaying, - Functions.Func notifyOnMarkerRecordedReplaying, Functions.Proc1 commandSink, Functions.Proc1 stateMachineSink) { - return new VersionStateMachine( - id, replaying, notifyOnMarkerRecordedReplaying, commandSink, stateMachineSink); + return new VersionStateMachine(id, replaying, commandSink, stateMachineSink); } private VersionStateMachine( String changeId, Functions.Func replaying, - Functions.Func notifyOnMarkerRecordedReplaying, Functions.Proc1 commandSink, Functions.Proc1 stateMachineSink) { this.changeId = Objects.requireNonNull(changeId); this.replaying = Objects.requireNonNull(replaying); - this.notifyOnMarkerRecordedReplaying = Objects.requireNonNull(notifyOnMarkerRecordedReplaying); this.commandSink = Objects.requireNonNull(commandSink); this.stateMachineSink = stateMachineSink; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index f53e7ae19..372dfb857 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -660,11 +660,7 
@@ private void preloadVersionMarker(HistoryEvent event) { changeId, (idKey) -> VersionStateMachine.newInstance( - changeId, - this::isReplaying, - () -> checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION), - commandSink, - stateMachineSink)); + changeId, this::isReplaying, commandSink, stateMachineSink)); Integer version = versionStateMachine.handleMarkersPreload(event); if (versionStateMachine.isWriteVersionChangeSA()) { changeVersions.put(changeId, version); @@ -1252,11 +1248,7 @@ public Integer getVersion( changeId, (idKey) -> VersionStateMachine.newInstance( - changeId, - this::isReplaying, - () -> checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION), - commandSink, - stateMachineSink)); + changeId, this::isReplaying, commandSink, stateMachineSink)); return stateMachine.getVersion( minSupported, maxSupported, diff --git a/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java index 889b12603..c05fa44ee 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java @@ -1,13 +1,10 @@ package io.temporal.internal.replay; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import com.uber.m3.tally.NoopScope; -import io.temporal.api.command.v1.Command; -import io.temporal.api.enums.v1.CommandType; import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.query.v1.WorkflowQuery; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse; @@ -30,16 +27,12 @@ public class 
GetVersionInterleavedUpdateReplayTaskHandlerTest { private static final String HISTORY_RESOURCE = "testGetVersionInterleavedUpdateReplayHistory.json"; - private static final String EXPECTED_NON_DETERMINISTIC_MESSAGE = - "[TMPRL1100] getVersion call before the existing version marker event. The most probable cause is retroactive addition of a getVersion call with an existing 'changeId'"; - private static final String EXPECTED_NON_DETERMINISTIC_FRAGMENT = - "io.temporal.worker.NonDeterministicException: " + EXPECTED_NON_DETERMINISTIC_MESSAGE; private static final String EXPECTED_FIRST_CHANGE_ID = "ChangeId1"; private static final String EXPECTED_SECOND_CHANGE_ID = "ChangeId2"; private static final String TEST_TASK_QUEUE = "get-version-interleaved-update-replay"; @Test - public void testReplayQueuesSecondVersionMarkerBeforeUpdateCompletionCommands() throws Exception { + public void testReplayDirectQueryWorkflowTaskSucceeds() throws Throwable { WorkflowExecutionHistory history = WorkflowHistoryLoader.readHistoryFromResource(HISTORY_RESOURCE); assertEquals( @@ -62,27 +55,11 @@ public void testReplayQueuesSecondVersionMarkerBeforeUpdateCompletionCommands() ServiceWorkflowHistoryIterator historyIterator = new ServiceWorkflowHistoryIterator(service, namespace, replayTask, new NoopScope()); - ReplayWorkflowRunTaskHandler replayHandler = runTaskHandler; - RuntimeException thrown = - assertThrows( - RuntimeException.class, - () -> replayHandler.handleDirectQueryWorkflowTask(replayTask, historyIterator)); - assertTrue( - "Expected replay failure to contain the nondeterminism marker, but got: " + thrown, - throwableChainContains(thrown, EXPECTED_NON_DETERMINISTIC_FRAGMENT) - || throwableChainContains(thrown, EXPECTED_NON_DETERMINISTIC_MESSAGE)); - - List pendingCommands = runTaskHandler.getWorkflowStateMachines().takeCommands(); - int versionMarkerIndex = indexOfVersionMarker(pendingCommands); - int protocolMessageIndex = - indexOfCommandType(pendingCommands, 
CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE); - - assertNotEquals( - "Expected a pending Version marker command after replay failure", -1, versionMarkerIndex); - assertTrue( - "Expected the pending Version marker to be queued before any update completion protocol command: " - + pendingCommands, - protocolMessageIndex == -1 || versionMarkerIndex < protocolMessageIndex); + QueryResult result = + runTaskHandler.handleDirectQueryWorkflowTask(replayTask, historyIterator); + assertNotNull(result); + assertFalse(result.isWorkflowMethodCompleted()); + assertFalse(result.getResponsePayloads().isPresent()); } finally { if (runTaskHandler != null) { runTaskHandler.close(); @@ -139,35 +116,6 @@ private static List extractVersionChangeIds(List events) { return changeIds; } - private static int indexOfVersionMarker(List commands) { - for (int i = 0; i < commands.size(); i++) { - if (VersionMarkerUtils.hasVersionMarkerStructure(commands.get(i))) { - return i; - } - } - return -1; - } - - private static int indexOfCommandType(List commands, CommandType commandType) { - for (int i = 0; i < commands.size(); i++) { - if (commands.get(i).getCommandType() == commandType) { - return i; - } - } - return -1; - } - - private static boolean throwableChainContains(Throwable throwable, String expected) { - Throwable current = throwable; - while (current != null) { - if (String.valueOf(current).contains(expected)) { - return true; - } - current = current.getCause(); - } - return false; - } - private static T getField(Object target, String fieldName, Class expectedType) throws Exception { Field field = target.getClass().getDeclaredField(fieldName); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java index fea296347..45bcaa5be 100644 --- 
a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java @@ -1,8 +1,5 @@ package io.temporal.workflow.versionTests; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; - import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; import io.temporal.activity.ActivityOptions; @@ -26,20 +23,11 @@ public class GetVersionInterleavedUpdateReplayTest { private static final String HISTORY_RESOURCE = "testGetVersionInterleavedUpdateReplayHistory.json"; - private static final String EXPECTED_NON_DETERMINISTIC_MESSAGE = - "[TMPRL1100] getVersion call before the existing version marker event. The most probable cause is retroactive addition of a getVersion call with an existing 'changeId'"; - private static final String EXPECTED_NON_DETERMINISTIC_FRAGMENT = - "io.temporal.worker.NonDeterministicException: " + EXPECTED_NON_DETERMINISTIC_MESSAGE; @Test - public void testReplayHistory() { - RuntimeException thrown = - assertThrows( - RuntimeException.class, - () -> - WorkflowReplayer.replayWorkflowExecutionFromResource( - HISTORY_RESOURCE, GreetingWorkflowImpl.class)); - assertTrue(thrown.getMessage().contains(EXPECTED_NON_DETERMINISTIC_FRAGMENT)); + public void testReplayHistory() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + HISTORY_RESOURCE, GreetingWorkflowImpl.class); } public static class Request { From c7b2d39609e8cfa29b497c0f75ac2b9327d0af4c Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Mon, 30 Mar 2026 11:59:49 -0500 Subject: [PATCH 05/11] Doc comments for new regression tests. 
--- .../GetVersionInterleavedUpdateReplayTaskHandlerTest.java | 7 +++++++ .../GetVersionInterleavedUpdateReplayTest.java | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java index c05fa44ee..eee1b16a0 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java @@ -31,6 +31,13 @@ public class GetVersionInterleavedUpdateReplayTaskHandlerTest { private static final String EXPECTED_SECOND_CHANGE_ID = "ChangeId2"; private static final String TEST_TASK_QUEUE = "get-version-interleaved-update-replay"; + /** + * Regression test for the lower-level replay path behind the public replayer API. + * + *

<p>We replay the same recorded history through the direct-query task handler so the fix is + * verified at the state-machine layer that previously produced the ordering bug. Success here + * shows replay no longer fails before query handling can complete. + */ @Test public void testReplayDirectQueryWorkflowTaskSucceeds() throws Throwable { WorkflowExecutionHistory history = diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java index 45bcaa5be..22ae37aa5 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java @@ -24,6 +24,13 @@ public class GetVersionInterleavedUpdateReplayTest { private static final String HISTORY_RESOURCE = "testGetVersionInterleavedUpdateReplayHistory.json"; + /** + * Regression test for the interleaved update/getVersion replay bug. + * + *

<p>This replays the original failing history through the public {@link WorkflowReplayer} API + * and verifies that replay now succeeds instead of surfacing the old {@code [TMPRL1100]} + * nondeterminism failure. + */ @Test public void testReplayHistory() throws Exception { WorkflowReplayer.replayWorkflowExecutionFromResource( From 43ab9dfdb995a3711d2dfb5f1870864500f5c701 Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Mon, 30 Mar 2026 17:12:25 -0500 Subject: [PATCH 06/11] Gate VersionStateMachine behavior correction behind new sdk flag `VERSION_WAIT_FOR_MARKER` --- .../io/temporal/internal/common/SdkFlag.java | 5 +++ .../statemachines/VersionStateMachine.java | 38 +++++++++++++++---- .../statemachines/WorkflowStateMachines.java | 1 + 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java b/temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java index cbfce7e43..5fa57b96e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java @@ -23,6 +23,11 @@ public enum SdkFlag { * condition is resolved before the timeout. */ CANCEL_AWAIT_TIMER_ON_CONDITION(4), + /* + * Changes replay behavior of GetVersion to wait for the matching marker event before executing + * the callback.
+ */ + VERSION_WAIT_FOR_MARKER(5), UNKNOWN(Integer.MAX_VALUE); private final int value; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java index 02d730597..c218a82db 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java @@ -133,17 +133,20 @@ class InvocationStateMachine private final int minSupported; private final int maxSupported; + private final boolean waitForMarkerRecordedReplaying; private final Functions.Func1 upsertSearchAttributeCallback; private final Functions.Proc2 resultCallback; InvocationStateMachine( int minSupported, int maxSupported, + boolean waitForMarkerRecordedReplaying, Functions.Func1 upsertSearchAttributeCallback, Functions.Proc2 callback) { super(STATE_MACHINE_DEFINITION, VersionStateMachine.this.commandSink, stateMachineSink); this.minSupported = minSupported; this.maxSupported = maxSupported; + this.waitForMarkerRecordedReplaying = waitForMarkerRecordedReplaying; this.upsertSearchAttributeCallback = upsertSearchAttributeCallback; this.resultCallback = Objects.requireNonNull(callback); } @@ -264,8 +267,20 @@ void notifySkippedExecuting() { } void notifyMarkerCreatedReplaying() { - // Replay already preloads the version value, so delay the callback until the real marker - // event is matched. + if (waitForMarkerRecordedReplaying) { + // Replay already preloads the version value, so delay the callback until the real marker + // event is matched. + return; + } + try { + // It's a replay and the version to return from the getVersion call should be preloaded + // from the history. 
+ final boolean usePreloadedVersion = true; + validateVersionAndThrow(usePreloadedVersion); + notifyFromVersion(usePreloadedVersion); + } catch (RuntimeException ex) { + notifyFromException(ex); + } } State createMarkerReplaying() { @@ -288,11 +303,13 @@ void flushPreloadedVersionAndUpdateFromEventReplaying() { Preconditions.checkState( preloadedVersion != null, "preloadedVersion is expected to be initialized"); flushPreloadedVersionAndUpdateFromEvent(currentEvent); - try { - validateVersionAndThrow(false); - notifyFromVersion(false); - } catch (RuntimeException ex) { - notifyFromException(ex); + if (waitForMarkerRecordedReplaying) { + try { + validateVersionAndThrow(false); + notifyFromVersion(false); + } catch (RuntimeException ex) { + notifyFromException(ex); + } } } @@ -392,11 +409,16 @@ private VersionStateMachine( public Integer getVersion( int minSupported, int maxSupported, + boolean waitForMarkerRecordedReplaying, Functions.Func1 upsertSearchAttributeCallback, Functions.Proc2 callback) { InvocationStateMachine ism = new InvocationStateMachine( - minSupported, maxSupported, upsertSearchAttributeCallback, callback); + minSupported, + maxSupported, + waitForMarkerRecordedReplaying, + upsertSearchAttributeCallback, + callback); ism.explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE); ism.explicitEvent(ExplicitEvent.SCHEDULE); // If the state is SKIPPED_REPLAYING that means we: diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index 372dfb857..c410043a8 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -1252,6 +1252,7 @@ public Integer getVersion( return stateMachine.getVersion( minSupported, maxSupported, + checkSdkFlag(SdkFlag.VERSION_WAIT_FOR_MARKER), (version) -> { if 
(!workflowImplOptions.isEnableUpsertVersionSearchAttributes()) { return null; From 1cf683a8c4acee53927922ec24b59141645f7ede Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Tue, 31 Mar 2026 12:34:19 -0500 Subject: [PATCH 07/11] Disable recorded-history replay test for interleaved histories, since we have elected not to fix that specific history. Make sure there is a run-then-replay regression test which confirms new histories won't be broken that way. --- ...nterleavedUpdateReplayTaskHandlerTest.java | 34 +---- ...GetVersionInterleavedUpdateReplayTest.java | 129 +++++++++++++++++- 2 files changed, 128 insertions(+), 35 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java index eee1b16a0..2e90698ff 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java @@ -5,51 +5,38 @@ import static org.junit.Assert.assertNotNull; import com.uber.m3.tally.NoopScope; -import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.query.v1.WorkflowQuery; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse; import io.temporal.client.WorkflowClient; import io.temporal.common.WorkflowExecutionHistory; -import io.temporal.internal.history.VersionMarkerUtils; import io.temporal.internal.worker.QueryReplayHelper; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.testing.TestWorkflowEnvironment; -import io.temporal.testing.WorkflowHistoryLoader; import io.temporal.worker.Worker; +import io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest; import io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest.GreetingWorkflowImpl; import 
java.lang.reflect.Field; import java.lang.reflect.Method; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import org.junit.Test; public class GetVersionInterleavedUpdateReplayTaskHandlerTest { - private static final String HISTORY_RESOURCE = - "testGetVersionInterleavedUpdateReplayHistory.json"; private static final String EXPECTED_FIRST_CHANGE_ID = "ChangeId1"; private static final String EXPECTED_SECOND_CHANGE_ID = "ChangeId2"; - private static final String TEST_TASK_QUEUE = "get-version-interleaved-update-replay"; - /** - * Regression test for the lower-level replay path behind the public replayer API. - * - *
<p>
We replay the same recorded history through the direct-query task handler so the fix is - * verified at the state-machine layer that previously produced the ordering bug. Success here - * shows replay no longer fails before query handling can complete. - */ + /** Regression test for the lower-level replay path behind the public replayer API. */ @Test public void testReplayDirectQueryWorkflowTaskSucceeds() throws Throwable { WorkflowExecutionHistory history = - WorkflowHistoryLoader.readHistoryFromResource(HISTORY_RESOURCE); + GetVersionInterleavedUpdateReplayTest.captureReplayableHistory(); assertEquals( Arrays.asList(EXPECTED_FIRST_CHANGE_ID, EXPECTED_SECOND_CHANGE_ID), - extractVersionChangeIds(history.getEvents())); + GetVersionInterleavedUpdateReplayTest.extractVersionChangeIds(history.getEvents())); TestWorkflowEnvironment testEnvironment = TestWorkflowEnvironment.newInstance(); ReplayWorkflowRunTaskHandler runTaskHandler = null; try { - Worker worker = testEnvironment.newWorker(TEST_TASK_QUEUE); + Worker worker = testEnvironment.newWorker(GetVersionInterleavedUpdateReplayTest.TASK_QUEUE); worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class); ReplayWorkflowTaskHandler replayTaskHandler = getNonStickyReplayTaskHandler(worker); @@ -112,17 +99,6 @@ private static ReplayWorkflowRunTaskHandler createStatefulHandler( method.invoke(replayTaskHandler, replayTask, new NoopScope()); } - private static List extractVersionChangeIds(List events) { - List changeIds = new ArrayList<>(); - for (HistoryEvent event : events) { - String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event); - if (changeId != null) { - changeIds.add(changeId); - } - } - return changeIds; - } - private static T getField(Object target, String fieldName, Class expectedType) throws Exception { Field field = target.getClass().getDeclaredField(fieldName); diff --git 
a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java index 22ae37aa5..dd08a47bf 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java @@ -1,42 +1,159 @@ package io.temporal.workflow.versionTests; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; import io.temporal.activity.ActivityOptions; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; import io.temporal.common.RetryOptions; +import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.internal.common.SdkFlag; +import io.temporal.internal.history.VersionMarkerUtils; +import io.temporal.internal.statemachines.WorkflowStateMachines; +import io.temporal.testing.TestWorkflowEnvironment; import io.temporal.testing.WorkflowReplayer; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.Worker; import io.temporal.workflow.UpdateMethod; import io.temporal.workflow.Workflow; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; import java.time.Duration; import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.UUID; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; /** * Mirrors 
app/src/main/kotlin/io/temporal/samples/update_nde/GreetingWorkflow.kt from - * gauravthadani/samples-kotlin and replays a history where update completions are interleaved with - * version markers. + * gauravthadani/samples-kotlin and captures histories that exercise interleaved updates around + * getVersion. */ public class GetVersionInterleavedUpdateReplayTest { private static final String HISTORY_RESOURCE = "testGetVersionInterleavedUpdateReplayHistory.json"; + public static final String TASK_QUEUE = "get-version-interleaved-update-replay"; + private static final String EXPECTED_FIRST_CHANGE_ID = "ChangeId1"; + private static final String EXPECTED_SECOND_CHANGE_ID = "ChangeId2"; /** - * Regression test for the interleaved update/getVersion replay bug. + * This recorded history predates {@link SdkFlag#SKIP_YIELD_ON_VERSION}, so it no longer matches + * the histories produced by the current branch. * - *
<p>
This replays the original failing history through the public {@link WorkflowReplayer} API - * and verifies that replay now succeeds instead of surfacing the old {@code [TMPRL1100]} - * nondeterminism failure. + *
<p>
We keep the fixture around for reference, but do not execute it as part of the suite. Making + * this exact history replay again would require changing replay behavior for histories that did + * not record the newer flags, which may break other existing replays. The fix is to put the + * state-machine behavior change behind an SDK flag {@link SdkFlag#VERSION_WAIT_FOR_MARKER}, and + * to make sure new workflows run with {@link SdkFlag#SKIP_YIELD_ON_VERSION} by default to avoid + * interleaved histories. */ + @Ignore("Recorded history predates SKIP_YIELD_ON_VERSION. Use the live-history replay test.") @Test public void testReplayHistory() throws Exception { WorkflowReplayer.replayWorkflowExecutionFromResource( HISTORY_RESOURCE, GreetingWorkflowImpl.class); } + @Test + public void testReproducedHistoryReplays() throws Exception { + WorkflowExecutionHistory history = captureReplayableHistory(); + + assertEquals( + Arrays.asList(EXPECTED_FIRST_CHANGE_ID, EXPECTED_SECOND_CHANGE_ID), + extractVersionChangeIds(history.getEvents())); + assertTrue( + "The reproduced history must advertise SKIP_YIELD_ON_VERSION.", + hasSdkFlag(history, SdkFlag.SKIP_YIELD_ON_VERSION)); + assertTrue( + "The reproduced history must include at least one completed update.", + hasEvent(history.getEvents(), EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED)); + + WorkflowReplayer.replayWorkflowExecution(history, GreetingWorkflowImpl.class); + } + + public static WorkflowExecutionHistory captureReplayableHistory() { + List savedInitialFlags = WorkflowStateMachines.initialFlags; + List replayableFlags = new ArrayList<>(savedInitialFlags); + if (!replayableFlags.contains(SdkFlag.SKIP_YIELD_ON_VERSION)) { + replayableFlags.add(SdkFlag.SKIP_YIELD_ON_VERSION); + } + WorkflowStateMachines.initialFlags = Collections.unmodifiableList(replayableFlags); + try (TestWorkflowEnvironment testEnvironment = TestWorkflowEnvironment.newInstance()) { + Worker worker = testEnvironment.newWorker(TASK_QUEUE); + 
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class); + testEnvironment.start(); + + WorkflowClient client = testEnvironment.getWorkflowClient(); + GreetingWorkflow workflow = + client.newWorkflowStub( + GreetingWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(TASK_QUEUE) + .setWorkflowId(UUID.randomUUID().toString()) + .build()); + WorkflowExecution execution = WorkflowClient.start(workflow::greeting, "Temporal"); + + WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow); + SDKTestWorkflowRule.waitForOKQuery(workflowStub); + assertEquals("works", workflow.notify("update")); + + return client.fetchHistory(execution.getWorkflowId(), execution.getRunId()); + } finally { + WorkflowStateMachines.initialFlags = savedInitialFlags; + } + } + + public static List extractVersionChangeIds(List events) { + List changeIds = new ArrayList<>(); + for (HistoryEvent event : events) { + String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event); + if (changeId != null) { + changeIds.add(changeId); + } + } + return changeIds; + } + + private static boolean hasSdkFlag(WorkflowExecutionHistory history, SdkFlag flag) { + for (HistoryEvent event : history.getEvents()) { + if (event.getEventType() != EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) { + continue; + } + if (!event.getWorkflowTaskCompletedEventAttributes().hasSdkMetadata()) { + continue; + } + if (event + .getWorkflowTaskCompletedEventAttributes() + .getSdkMetadata() + .getLangUsedFlagsList() + .contains(flag.getValue())) { + return true; + } + } + return false; + } + + private static boolean hasEvent(List events, EventType eventType) { + for (HistoryEvent event : events) { + if (event.getEventType() == eventType) { + return true; + } + } + return false; + } + public static class Request { private final String name; private final OffsetDateTime date; From 3154a78b2ad0c421a811a6695a83bbfcc6de26fc Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Tue, 31 Mar 2026 
15:47:02 -0500 Subject: [PATCH 08/11] Add flag introduction history on `VERSION_WAIT_FOR_MARKER` SDK flag --- .../src/main/java/io/temporal/internal/common/SdkFlag.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java b/temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java index 5fa57b96e..d56184e71 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java @@ -26,6 +26,12 @@ public enum SdkFlag { /* * Changes replay behavior of GetVersion to wait for the matching marker event before executing * the callback. + * + * Introduced: 1.34.0 + * + * Enabled: (pending) + * + * Bug: https://github.com/temporalio/sdk-java/issues/2796 */ VERSION_WAIT_FOR_MARKER(5), UNKNOWN(Integer.MAX_VALUE); From 908aa38e36aa332838b117f9c9215ed47ae87a74 Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Tue, 31 Mar 2026 17:38:29 -0500 Subject: [PATCH 09/11] Make reproducer for original issue check that it still throws the same exception. 
--- ...GetVersionInterleavedUpdateReplayTest.java | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java index dd08a47bf..2317ac064 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java @@ -1,6 +1,7 @@ package io.temporal.workflow.versionTests; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import io.temporal.activity.ActivityInterface; @@ -32,7 +33,6 @@ import java.util.Collections; import java.util.List; import java.util.UUID; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; @@ -52,18 +52,26 @@ public class GetVersionInterleavedUpdateReplayTest { * This recorded history predates {@link SdkFlag#SKIP_YIELD_ON_VERSION}, so it no longer matches * the histories produced by the current branch. * - *
<p>
We keep the fixture around for reference, but do not execute it as part of the suite. Making - * this exact history replay again would require changing replay behavior for histories that did - * not record the newer flags, which may break other existing replays. The fix is to put the - * state-machine behavior change behind an SDK flag {@link SdkFlag#VERSION_WAIT_FOR_MARKER}, and - * to make sure new workflows run with {@link SdkFlag#SKIP_YIELD_ON_VERSION} by default to avoid - * interleaved histories. + *
<p>
Keep this fixture as a reproducer that old histories without the newer flags still preserve + * the old failure. Making this exact history replay again would require changing replay behavior + * for histories that did not record the newer flags, which may break other existing replays. The + * fix is to put the state-machine behavior change behind an SDK flag {@link + * SdkFlag#VERSION_WAIT_FOR_MARKER}, and to make sure new workflows run with {@link + * SdkFlag#SKIP_YIELD_ON_VERSION} by default to avoid interleaved histories. */ - @Ignore("Recorded history predates SKIP_YIELD_ON_VERSION. Use the live-history replay test.") @Test - public void testReplayHistory() throws Exception { - WorkflowReplayer.replayWorkflowExecutionFromResource( - HISTORY_RESOURCE, GreetingWorkflowImpl.class); + public void testReplayHistoryWithoutFlagStillFails() { + RuntimeException replayFailure = + assertThrows( + RuntimeException.class, + () -> + WorkflowReplayer.replayWorkflowExecutionFromResource( + HISTORY_RESOURCE, GreetingWorkflowImpl.class)); + + assertTrue( + replayFailure + .getMessage() + .contains("[TMPRL1100] getVersion call before the existing version marker event")); } @Test From 515c90dd95c2b37e1e4a0227920eeadbfc32c812 Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Wed, 1 Apr 2026 15:13:30 -0500 Subject: [PATCH 10/11] Add test replaying history with `VERSION_WAIT_FOR_MARKER` set in `sdkMetadata.langUsedFlags` --- ...GetVersionInterleavedUpdateReplayTest.java | 22 ++ ...eavedUpdateReplayWaitForMarkerHistory.json | 283 ++++++++++++++++++ 2 files changed, 305 insertions(+) create mode 100644 temporal-sdk/src/test/resources/testGetVersionInterleavedUpdateReplayWaitForMarkerHistory.json diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java index 2317ac064..73f895a74 100644 --- 
a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java @@ -19,6 +19,7 @@ import io.temporal.internal.history.VersionMarkerUtils; import io.temporal.internal.statemachines.WorkflowStateMachines; import io.temporal.testing.TestWorkflowEnvironment; +import io.temporal.testing.WorkflowHistoryLoader; import io.temporal.testing.WorkflowReplayer; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.worker.Worker; @@ -44,6 +45,8 @@ public class GetVersionInterleavedUpdateReplayTest { private static final String HISTORY_RESOURCE = "testGetVersionInterleavedUpdateReplayHistory.json"; + private static final String WAIT_FOR_MARKER_HISTORY_RESOURCE = + "testGetVersionInterleavedUpdateReplayWaitForMarkerHistory.json"; public static final String TASK_QUEUE = "get-version-interleaved-update-replay"; private static final String EXPECTED_FIRST_CHANGE_ID = "ChangeId1"; private static final String EXPECTED_SECOND_CHANGE_ID = "ChangeId2"; @@ -91,6 +94,25 @@ public void testReproducedHistoryReplays() throws Exception { WorkflowReplayer.replayWorkflowExecution(history, GreetingWorkflowImpl.class); } + @Test + public void testReplayHistoryWithWaitForMarkerFlagReplaysWithoutDefaultEnable() throws Exception { + WorkflowExecutionHistory history = + WorkflowHistoryLoader.readHistoryFromResource(WAIT_FOR_MARKER_HISTORY_RESOURCE); + assertTrue( + "The recorded history must advertise VERSION_WAIT_FOR_MARKER.", + hasSdkFlag(history, SdkFlag.VERSION_WAIT_FOR_MARKER)); + + List savedInitialFlags = WorkflowStateMachines.initialFlags; + List replayFlags = new ArrayList<>(savedInitialFlags); + replayFlags.remove(SdkFlag.VERSION_WAIT_FOR_MARKER); + WorkflowStateMachines.initialFlags = Collections.unmodifiableList(replayFlags); + try { + WorkflowReplayer.replayWorkflowExecution(history, GreetingWorkflowImpl.class); + } 
finally { + WorkflowStateMachines.initialFlags = savedInitialFlags; + } + } + public static WorkflowExecutionHistory captureReplayableHistory() { List savedInitialFlags = WorkflowStateMachines.initialFlags; List replayableFlags = new ArrayList<>(savedInitialFlags); diff --git a/temporal-sdk/src/test/resources/testGetVersionInterleavedUpdateReplayWaitForMarkerHistory.json b/temporal-sdk/src/test/resources/testGetVersionInterleavedUpdateReplayWaitForMarkerHistory.json new file mode 100644 index 000000000..6beec688c --- /dev/null +++ b/temporal-sdk/src/test/resources/testGetVersionInterleavedUpdateReplayWaitForMarkerHistory.json @@ -0,0 +1,283 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2026-04-01T20:02:55.362Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "GreetingWorkflow" + }, + "taskQueue": { + "name": "get-version-interleaved-update-replay" + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "IlRlbXBvcmFsIg\u003d\u003d" + } + ] + }, + "workflowExecutionTimeout": "315360000s", + "workflowRunTimeout": "315360000s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "f9105a8c-5934-4674-a3bf-b537ade1ef06", + "identity": "46549@ambrose.local", + "firstExecutionRunId": "f9105a8c-5934-4674-a3bf-b537ade1ef06", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "header": {} + } + }, + { + "eventId": "2", + "eventTime": "2026-04-01T20:02:55.362Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "get-version-interleaved-update-replay" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2026-04-01T20:02:55.368Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "46549@ambrose.local" + } + }, + { + 
"eventId": "4", + "eventTime": "2026-04-01T20:02:55.423Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "identity": "46549@ambrose.local", + "sdkMetadata": { + "langUsedFlags": [ + 1, + 2, + 3, + 5 + ], + "sdkName": "temporal-java", + "sdkVersion": "1.34.0" + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2026-04-01T20:02:55.423Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + "markerRecordedEventAttributes": { + "markerName": "Version", + "details": { + "changeId": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "IkNoYW5nZUlkMSI\u003d" + } + ] + }, + "version": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "MQ\u003d\u003d" + } + ] + } + }, + "workflowTaskCompletedEventId": "3" + } + }, + { + "eventId": "6", + "eventTime": "2026-04-01T20:02:55.423Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + "markerRecordedEventAttributes": { + "markerName": "Version", + "details": { + "changeId": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "IkNoYW5nZUlkMiI\u003d" + } + ] + }, + "version": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "MQ\u003d\u003d" + } + ] + } + }, + "workflowTaskCompletedEventId": "3" + } + }, + { + "eventId": "7", + "eventTime": "2026-04-01T20:02:55.423Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "get-version-interleaved-update-replay" + }, + "startToCloseTimeout": "10s", + "attempt": 2 + } + }, + { + "eventId": "8", + "eventTime": "2026-04-01T20:02:55.423Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "7", + "identity": "46549@ambrose.local" + } + }, + { + "eventId": "9", + "eventTime": 
"2026-04-01T20:02:55.425Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "7", + "identity": "46549@ambrose.local", + "sdkMetadata": { + "sdkName": "temporal-java", + "sdkVersion": "1.34.0" + }, + "meteringMetadata": {} + } + }, + { + "eventId": "10", + "eventTime": "2026-04-01T20:02:55.428Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "get-version-interleaved-update-replay" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "11", + "eventTime": "2026-04-01T20:02:55.428Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "10", + "identity": "46549@ambrose.local" + } + }, + { + "eventId": "12", + "eventTime": "2026-04-01T20:02:55.443Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "10", + "identity": "46549@ambrose.local", + "sdkMetadata": { + "sdkName": "temporal-java", + "sdkVersion": "1.34.0" + }, + "meteringMetadata": {} + } + }, + { + "eventId": "13", + "eventTime": "2026-04-01T20:02:55.443Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "4d85fa45-8cae-466a-b92e-196ceca2fd77", + "acceptedRequestMessageId": "4d85fa45-8cae-466a-b92e-196ceca2fd77/request", + "acceptedRequestSequencingEventId": "10", + "acceptedRequest": { + "meta": { + "updateId": "4d85fa45-8cae-466a-b92e-196ceca2fd77", + "identity": "46549@ambrose.local" + }, + "input": { + "header": {}, + "name": "notify", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "InVwZGF0ZSI\u003d" + } + ] + } + } + } + } + }, + { + "eventId": "14", + "eventTime": "2026-04-01T20:02:55.443Z", + "eventType": "EVENT_TYPE_MARKER_RECORDED", + 
"markerRecordedEventAttributes": { + "markerName": "SideEffect", + "details": { + "data": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "IjRjN2JhZDI5LWVkMmYtNDZjNS1hZTY5LTUwOWJhMmFmOWIzYSI\u003d" + } + ] + } + }, + "workflowTaskCompletedEventId": "11" + } + }, + { + "eventId": "15", + "eventTime": "2026-04-01T20:02:55.443Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "4d85fa45-8cae-466a-b92e-196ceca2fd77", + "identity": "46549@ambrose.local" + }, + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "IndvcmtzIg\u003d\u003d" + } + ] + } + } + } + } + ] +} \ No newline at end of file From 316a40bfd5af0a54c8a77c399c10e61c6b4f420f Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Wed, 1 Apr 2026 16:39:20 -0500 Subject: [PATCH 11/11] [visibility] attempt to produce interleaved replay behavior with workflow.Async --- .../internal/sync/SyncWorkflowContext.java | 2 +- ...etVersionAsyncLocalActivityReplayTest.java | 184 ++++++++++++++++++ 2 files changed, 185 insertions(+), 1 deletion(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionAsyncLocalActivityReplayTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 747d3a09d..1c90397e0 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -1174,7 +1174,7 @@ public int getVersion(String changeId, int minSupported, int maxSupported) { * Previously the SDK would yield on the getVersion call to the scheduler. This is not ideal because it can lead to non-deterministic * scheduling if the getVersion call was removed. 
* */ - if (replayContext.tryUseSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) { + if (replayContext.checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) { // This can happen if we are replaying a workflow and encounter a getVersion call that did not // exist on the original execution and the range does not include the default version. if (versionToUse == null) { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionAsyncLocalActivityReplayTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionAsyncLocalActivityReplayTest.java new file mode 100644 index 000000000..a794f7f9e --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionAsyncLocalActivityReplayTest.java @@ -0,0 +1,184 @@ +package io.temporal.workflow.versionTests; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.LocalActivityOptions; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.internal.common.SdkFlag; +import io.temporal.internal.history.VersionMarkerUtils; +import io.temporal.internal.statemachines.WorkflowStateMachines; +import io.temporal.testing.TestWorkflowEnvironment; +import io.temporal.testing.WorkflowReplayer; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerOptions; +import io.temporal.workflow.Async; +import io.temporal.workflow.CompletablePromise; +import io.temporal.workflow.Promise; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import 
io.temporal.workflow.WorkflowMethod;
+import io.temporal.workflow.unsafe.WorkflowUnsafe;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Runs a workflow that calls {@link Workflow#getVersion} while an {@link Async} branch executes
+ * local activities, captures the resulting history, and replays that history with {@link
+ * WorkflowReplayer}.
+ *
+ * <p>For the duration of each test the global {@link WorkflowStateMachines#initialFlags} is
+ * replaced with a list containing only {@link SdkFlag#SKIP_YIELD_ON_DEFAULT_VERSION}; the test
+ * asserts that the recorded history does not list {@link SdkFlag#SKIP_YIELD_ON_VERSION}.
+ */
+public class GetVersionAsyncLocalActivityReplayTest {
+  private static final String TASK_QUEUE = "get-version-async-local-activity-replay";
+  private static final String CHANGE_ID = "async-local-activity-change";
+
+  /**
+   * Set from workflow code when it observes {@link WorkflowUnsafe#isReplaying()} and read by the
+   * test method afterwards; {@code volatile} so the write is visible across threads.
+   */
+  private static volatile boolean hasReplayed;
+
+  /** Value of {@link WorkflowStateMachines#initialFlags} captured in {@link #setUp()}. */
+  private List<SdkFlag> savedInitialFlags;
+
+  /** Clears the replay marker and swaps in the reduced set of initial SDK flags. */
+  @Before
+  public void setUp() {
+    hasReplayed = false;
+    savedInitialFlags = WorkflowStateMachines.initialFlags;
+    WorkflowStateMachines.initialFlags =
+        Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION);
+  }
+
+  /** Puts back the initial SDK flags captured in {@link #setUp()}. */
+  @After
+  public void tearDown() {
+    WorkflowStateMachines.initialFlags = savedInitialFlags;
+  }
+
+  /**
+   * Executes the workflow once, checks what was recorded in its history, and then replays it.
+   *
+   * @throws Exception if {@link WorkflowReplayer#replayWorkflowExecution} fails
+   */
+  @Test
+  public void testGetVersionReplayWithAsyncLocalActivitiesKeepsExpectCBoundToC() throws Exception {
+    WorkflowExecutionHistory history = executeWorkflowAndCaptureHistory();
+
+    assertTrue("workflow code never observed replay mode", hasReplayed);
+    assertTrue(
+        "history has no version marker for " + CHANGE_ID, hasVersionMarker(history, CHANGE_ID));
+    assertFalse(
+        "history unexpectedly records SKIP_YIELD_ON_VERSION",
+        hasSdkFlag(history, SdkFlag.SKIP_YIELD_ON_VERSION));
+
+    WorkflowReplayer.replayWorkflowExecution(history, AsyncLocalActivityWorkflowImpl.class);
+  }
+
+  /**
+   * Starts an in-memory test environment with a single worker, runs {@link ReplayTestWorkflow} to
+   * completion (expecting the result {@code "ABC"}), and fetches the history of that run.
+   *
+   * @return the history of the completed workflow execution
+   */
+  private WorkflowExecutionHistory executeWorkflowAndCaptureHistory() {
+    try (TestWorkflowEnvironment testEnvironment = TestWorkflowEnvironment.newInstance()) {
+      // NOTE(review): a zero sticky schedule-to-start timeout presumably keeps workflow tasks off
+      // the sticky queue so the workflow code is re-run in replay mode; confirm.
+      Worker worker =
+          testEnvironment.newWorker(
+              TASK_QUEUE,
+              WorkerOptions.newBuilder()
+                  .setStickyQueueScheduleToStartTimeout(Duration.ZERO)
+                  .build());
+      worker.registerWorkflowImplementationTypes(AsyncLocalActivityWorkflowImpl.class);
+      worker.registerActivitiesImplementations(new EchoActivitiesImpl());
+      testEnvironment.start();
+
+      WorkflowClient client = testEnvironment.getWorkflowClient();
+      ReplayTestWorkflow workflow =
+          client.newWorkflowStub(
+              ReplayTestWorkflow.class,
+              WorkflowOptions.newBuilder()
+                  .setTaskQueue(TASK_QUEUE)
+                  .setWorkflowRunTimeout(Duration.ofMinutes(1))
+                  .setWorkflowTaskTimeout(Duration.ofSeconds(5))
+                  .build());
+
+      WorkflowExecution execution = WorkflowClient.start(workflow::execute);
+      assertEquals("ABC", WorkflowStub.fromTyped(workflow).getResult(String.class));
+
+      return client.fetchHistory(execution.getWorkflowId(), execution.getRunId());
+    }
+  }
+
+  /**
+   * Tells whether any {@code WORKFLOW_TASK_COMPLETED} event in {@code history} carries SDK
+   * metadata whose lang-used-flags list contains the value of {@code flag}.
+   *
+   * @param history the history to scan
+   * @param flag the SDK flag to look for
+   * @return {@code true} if at least one such event lists the flag
+   */
+  private static boolean hasSdkFlag(WorkflowExecutionHistory history, SdkFlag flag) {
+    for (HistoryEvent event : history.getEvents()) {
+      if (event.getEventType() != EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) {
+        continue;
+      }
+      if (!event.getWorkflowTaskCompletedEventAttributes().hasSdkMetadata()) {
+        continue;
+      }
+      if (event
+          .getWorkflowTaskCompletedEventAttributes()
+          .getSdkMetadata()
+          .getLangUsedFlagsList()
+          .contains(flag.getValue())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Tells whether {@link VersionMarkerUtils#tryGetChangeIdFromVersionMarkerEvent} yields {@code
+   * changeId} for any event in {@code history}.
+   *
+   * @param history the history to scan
+   * @param changeId the change id passed to {@link Workflow#getVersion}
+   * @return {@code true} if a matching event is found
+   */
+  private static boolean hasVersionMarker(WorkflowExecutionHistory history, String changeId) {
+    for (HistoryEvent event : history.getEvents()) {
+      if (changeId.equals(VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Workflow under test; takes no input and returns a string. */
+  @WorkflowInterface
+  public interface ReplayTestWorkflow {
+    @WorkflowMethod
+    String execute();
+  }
+
+  /** Activity contract used by the workflow through a local activity stub. */
+  @ActivityInterface
+  public interface EchoActivities {
+    @ActivityMethod
+    String echo(String value);
+  }
+
+  /** Returns its input upper-cased with {@link Locale#ROOT}. */
+  public static class EchoActivitiesImpl implements EchoActivities {
+    @Override
+    public String echo(String value) {
+      return value.toUpperCase(Locale.ROOT);
+    }
+  }
+
+  /**
+   * Starts an async branch that runs local activities for {@code "a"} and {@code "b"}, calls
+   * {@link Workflow#getVersion} on the main branch, then runs a local activity for {@code "c"} and
+   * checks that each result matches the input it was requested for.
+   */
+  public static class AsyncLocalActivityWorkflowImpl implements ReplayTestWorkflow {
+    private final EchoActivities echoActivities =
+        Workflow.newLocalActivityStub(
+            EchoActivities.class,
+            LocalActivityOptions.newBuilder()
+                .setStartToCloseTimeout(Duration.ofSeconds(5))
+                .build());
+
+    @Override
+    public String execute() {
+      CompletablePromise<String> expectA = Workflow.newPromise();
+      CompletablePromise<String> expectB = Workflow.newPromise();
+      Promise<Void> asyncBranch =
+          Async.procedure(
+              () -> {
+                expectA.complete(echoActivities.echo("a"));
+                expectB.complete(echoActivities.echo("b"));
+              });
+
+      int version = Workflow.getVersion(CHANGE_ID, Workflow.DEFAULT_VERSION, 1);
+      assertEquals(1, version);
+
+      String expectC = echoActivities.echo("c");
+      asyncBranch.get();
+
+      assertEquals("A", expectA.get());
+      assertEquals("B", expectB.get());
+      assertEquals("C", expectC);
+
+      // Record that this code ran at least once in replay mode so the test can assert on it.
+      if (WorkflowUnsafe.isReplaying()) {
+        hasReplayed = true;
+      }
+
+      Workflow.sleep(Duration.ofSeconds(1));
+      return expectA.get() + expectB.get() + expectC;
+    }
+  }
+}