Skip to content

Commit 8e5c920

Browse files
committed
added tests
1 parent 2389289 commit 8e5c920

File tree

3 files changed

+231
-0
lines changed

3 files changed

+231
-0
lines changed

client/src/test/java/com/microsoft/durabletask/IntegrationTests.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,114 @@ void terminateSuspendOrchestration() throws TimeoutException, InterruptedExcepti
605605
}
606606
}
607607

608+
@Test
609+
void rewindFailedOrchestration() throws TimeoutException {
610+
final String orchestratorName = "RewindOrchestration";
611+
final String activityName = "FailOnceActivity";
612+
final AtomicBoolean shouldFail = new AtomicBoolean(true);
613+
614+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
615+
.addOrchestrator(orchestratorName, ctx -> {
616+
String result = ctx.callActivity(activityName, null, String.class).await();
617+
ctx.complete(result);
618+
})
619+
.addActivity(activityName, ctx -> {
620+
if (shouldFail.compareAndSet(true, false)) {
621+
throw new RuntimeException("Simulated transient failure");
622+
}
623+
return "Success after rewind";
624+
})
625+
.buildAndStart();
626+
627+
DurableTaskClient client = this.createClientBuilder().build();
628+
try (worker; client) {
629+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
630+
631+
// Wait for the orchestration to fail
632+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false);
633+
assertNotNull(instance);
634+
assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus());
635+
636+
// Rewind the failed orchestration with a reason
637+
String rewindReason = "Rewinding after transient failure";
638+
client.rewindInstance(instanceId, rewindReason);
639+
640+
// Wait for the orchestration to complete after rewind
641+
instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
642+
assertNotNull(instance);
643+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
644+
assertEquals("Success after rewind", instance.readOutputAs(String.class));
645+
}
646+
}
647+
648+
@Test
649+
void rewindFailedOrchestrationWithoutReason() throws TimeoutException {
650+
final String orchestratorName = "RewindOrchestrationNoReason";
651+
final String activityName = "FailOnceActivityNoReason";
652+
final AtomicBoolean shouldFail = new AtomicBoolean(true);
653+
654+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
655+
.addOrchestrator(orchestratorName, ctx -> {
656+
String result = ctx.callActivity(activityName, null, String.class).await();
657+
ctx.complete(result);
658+
})
659+
.addActivity(activityName, ctx -> {
660+
if (shouldFail.compareAndSet(true, false)) {
661+
throw new RuntimeException("Simulated transient failure");
662+
}
663+
return "Success after rewind without reason";
664+
})
665+
.buildAndStart();
666+
667+
DurableTaskClient client = this.createClientBuilder().build();
668+
try (worker; client) {
669+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
670+
671+
// Wait for the orchestration to fail
672+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false);
673+
assertNotNull(instance);
674+
assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus());
675+
676+
// Rewind the failed orchestration without providing a reason
677+
client.rewindInstance(instanceId);
678+
679+
// Wait for the orchestration to complete after rewind
680+
instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
681+
assertNotNull(instance);
682+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
683+
assertEquals("Success after rewind without reason", instance.readOutputAs(String.class));
684+
}
685+
}
686+
687+
@Test
688+
void rewindCompletedOrchestrationThrowsException() throws TimeoutException {
689+
final String orchestratorName = "RewindCompletedOrchestration";
690+
691+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
692+
.addOrchestrator(orchestratorName, ctx -> {
693+
ctx.complete("Completed successfully");
694+
})
695+
.buildAndStart();
696+
697+
DurableTaskClient client = this.createClientBuilder().build();
698+
try (worker; client) {
699+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
700+
701+
// Wait for the orchestration to complete
702+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
703+
assertNotNull(instance);
704+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
705+
706+
// Attempt to rewind a completed orchestration - should throw or be a no-op
707+
// Based on API behavior, rewind is only valid for FAILED orchestrations
708+
assertThrows(
709+
Exception.class,
710+
() -> client.rewindInstance(instanceId, "Attempting to rewind completed orchestration"),
711+
"Rewinding a completed orchestration should throw an exception"
712+
);
713+
}
714+
}
715+
608716
@Test
609717
void activityFanOut() throws IOException, TimeoutException {
610718
final String orchestratorName = "ActivityFanOut";
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.functions;
4+
5+
import com.microsoft.azure.functions.ExecutionContext;
6+
import com.microsoft.azure.functions.HttpMethod;
7+
import com.microsoft.azure.functions.HttpRequestMessage;
8+
import com.microsoft.azure.functions.HttpResponseMessage;
9+
import com.microsoft.azure.functions.HttpStatus;
10+
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
11+
import com.microsoft.azure.functions.annotation.FunctionName;
12+
import com.microsoft.azure.functions.annotation.HttpTrigger;
13+
import com.microsoft.durabletask.DurableTaskClient;
14+
import com.microsoft.durabletask.TaskOrchestrationContext;
15+
import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger;
16+
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
17+
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
18+
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;
19+
20+
import java.util.Optional;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
23+
/**
24+
* Sample functions to test the rewind functionality.
25+
* Rewind allows a failed orchestration to be replayed from its last known good state.
26+
*/
27+
public class RewindTest {
28+
29+
// Flag to control whether the activity should fail (first call fails, subsequent calls succeed)
30+
private static final AtomicBoolean shouldFail = new AtomicBoolean(true);
31+
32+
/**
33+
* HTTP trigger to start the rewindable orchestration.
34+
*/
35+
@FunctionName("StartRewindableOrchestration")
36+
public HttpResponseMessage startRewindableOrchestration(
37+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
38+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
39+
final ExecutionContext context) {
40+
context.getLogger().info("Starting rewindable orchestration.");
41+
42+
DurableTaskClient client = durableContext.getClient();
43+
String instanceId = client.scheduleNewOrchestrationInstance("RewindableOrchestration");
44+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
45+
return durableContext.createCheckStatusResponse(request, instanceId);
46+
}
47+
48+
/**
49+
* Orchestration that calls an activity which will fail on the first attempt.
50+
* After rewinding, the orchestration will replay and the activity will succeed.
51+
*/
52+
@FunctionName("RewindableOrchestration")
53+
public String rewindableOrchestration(
54+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
55+
// Call the activity that may fail
56+
String result = ctx.callActivity("FailOnceActivity", "RewindTest", String.class).await();
57+
return result;
58+
}
59+
60+
/**
61+
* Activity that fails on the first call but succeeds on subsequent calls.
62+
* This simulates a transient failure that can be recovered by rewinding.
63+
*/
64+
@FunctionName("FailOnceActivity")
65+
public String failOnceActivity(
66+
@DurableActivityTrigger(name = "input") String input,
67+
final ExecutionContext context) {
68+
if (shouldFail.compareAndSet(true, false)) {
69+
context.getLogger().warning("FailOnceActivity: Simulating failure for input: " + input);
70+
throw new RuntimeException("Simulated transient failure - rewind to retry");
71+
}
72+
context.getLogger().info("FailOnceActivity: Success for input: " + input);
73+
return input + "-rewound-success";
74+
}
75+
76+
/**
77+
* HTTP trigger to reset the failure flag (useful for testing).
78+
*/
79+
@FunctionName("ResetRewindFailureFlag")
80+
public HttpResponseMessage resetRewindFailureFlag(
81+
@HttpTrigger(name = "req", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
82+
final ExecutionContext context) {
83+
shouldFail.set(true);
84+
context.getLogger().info("Reset failure flag to true.");
85+
return request.createResponseBuilder(HttpStatus.OK)
86+
.body("Failure flag reset to true")
87+
.build();
88+
}
89+
}

samples-azure-functions/src/test/java/com/functions/EndToEndTests.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,40 @@ public void orchestrationPOJO() throws InterruptedException {
229229
assertEquals("\"TESTNAME\"", outputName);
230230
}
231231

232+
@Test
233+
public void rewindFailedOrchestration() throws InterruptedException {
234+
// Reset the failure flag before starting
235+
post("/api/ResetRewindFailureFlag");
236+
237+
// Start the orchestration - it will fail on the first activity call
238+
String startOrchestrationPath = "/api/StartRewindableOrchestration";
239+
Response response = post(startOrchestrationPath);
240+
JsonPath jsonPath = response.jsonPath();
241+
String statusQueryGetUri = jsonPath.get("statusQueryGetUri");
242+
243+
// Wait for the orchestration to fail
244+
boolean failed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10));
245+
assertTrue(failed, "Orchestration should have failed");
246+
247+
// Get the rewind URI and rewind the orchestration
248+
String rewindPostUri = jsonPath.get("rewindPostUri");
249+
rewindPostUri = rewindPostUri.replace("{text}", "Testing rewind functionality");
250+
Response rewindResponse = post(rewindPostUri);
251+
assertEquals(202, rewindResponse.getStatusCode(), "Rewind should return 202 Accepted");
252+
253+
// Wait for the orchestration to complete after rewind
254+
Set<String> continueStates = new HashSet<>();
255+
continueStates.add("Pending");
256+
continueStates.add("Running");
257+
boolean completed = pollingCheck(statusQueryGetUri, "Completed", continueStates, Duration.ofSeconds(15));
258+
assertTrue(completed, "Orchestration should complete after rewind");
259+
260+
// Verify the output contains the expected result
261+
Response statusResponse = get(statusQueryGetUri);
262+
String output = statusResponse.jsonPath().get("output");
263+
assertTrue(output.contains("rewound-success"), "Output should indicate successful rewind: " + output);
264+
}
265+
232266
private boolean pollingCheck(String statusQueryGetUri,
233267
String expectedState,
234268
Set<String> continueStates,

0 commit comments

Comments
 (0)