Skip to content

Commit ba82668

Browse files
committed
fixed tests
1 parent 3398336 commit ba82668

File tree

2 files changed

+33
-27
lines changed

2 files changed

+33
-27
lines changed

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -304,13 +304,33 @@ public void startAndBlock() {
304304
.setCompletionToken(workItem.getCompletionToken())
305305
.build();
306306

307-
// Externalize large payloads in outgoing response
308-
if (this.payloadHelper != null) {
309-
response = externalizeOrchestratorResponsePayloads(response);
307+
// Externalize large payloads and send (with optional chunking).
308+
// If externalization or chunking fails, report as orchestration failure.
309+
try {
310+
if (this.payloadHelper != null) {
311+
response = externalizeOrchestratorResponsePayloads(response);
312+
}
313+
sendOrchestratorResponse(response);
314+
} catch (IllegalArgumentException | IllegalStateException e) {
315+
logger.log(Level.WARNING,
316+
"Failed to send orchestrator response for instance '" +
317+
orchestratorRequest.getInstanceId() + "': " + e.getMessage(), e);
318+
CompleteOrchestrationAction failAction = CompleteOrchestrationAction.newBuilder()
319+
.setOrchestrationStatus(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED)
320+
.setFailureDetails(TaskFailureDetails.newBuilder()
321+
.setErrorType(e.getClass().getName())
322+
.setErrorMessage(e.getMessage())
323+
.build())
324+
.build();
325+
OrchestratorResponse failResponse = OrchestratorResponse.newBuilder()
326+
.setInstanceId(orchestratorRequest.getInstanceId())
327+
.setCompletionToken(workItem.getCompletionToken())
328+
.addActions(OrchestratorAction.newBuilder()
329+
.setCompleteOrchestration(failAction)
330+
.build())
331+
.build();
332+
this.sidecarClient.completeOrchestratorTask(failResponse);
310333
}
311-
312-
// Chunk the response if it exceeds gRPC message size limit
313-
sendOrchestratorResponse(response);
314334
} else {
315335
switch(versioningOptions.getFailureStrategy()) {
316336
case FAIL:

client/src/test/java/com/microsoft/durabletask/LargePayloadIntegrationTests.java

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -624,38 +624,24 @@ void autochunk_mixedActions_completesSuccessfully() throws TimeoutException {
624624
@Test
625625
void autochunk_singleActionExceedsChunkSize_failsWithClearError() throws TimeoutException {
626626
final String orchestratorName = "AutochunkOversizedOrch";
627-
final String activityName = "OversizedActivity";
628-
// Create an activity that returns a payload larger than 1MB chunk size
629-
// Since externalization threshold is set very high, this will NOT be externalized
630-
// and will try to be sent as a single action exceeding the chunk size
627+
// Create an orchestrator that completes with a payload larger than 1MB chunk size.
628+
// Externalization is NOT configured so the large payload stays inline in the
629+
// CompleteOrchestration action, which exceeds the chunk size.
631630
final int payloadSize = 1_200_000;
632631

633-
// Use a store but set threshold very high so the payload won't be externalized
634-
InMemoryPayloadStore store = new InMemoryPayloadStore();
635-
LargePayloadOptions options = new LargePayloadOptions.Builder()
636-
.setThresholdBytes(1_048_576) // 1 MiB — max allowed threshold
637-
.setMaxExternalizedPayloadBytes(10_000_000)
638-
.build();
639-
640632
TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder();
641-
workerBuilder.innerBuilder.useExternalizedPayloads(store, options);
642633
workerBuilder.innerBuilder.setCompleteOrchestratorResponseChunkSizeBytes(
643634
DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES);
644635
workerBuilder.addOrchestrator(orchestratorName, ctx -> {
645-
String result = ctx.callActivity(activityName, null, String.class).await();
646-
ctx.complete(result.length());
647-
});
648-
workerBuilder.addActivity(activityName, ctx -> {
649636
StringBuilder sb = new StringBuilder(payloadSize);
650637
for (int i = 0; i < payloadSize; i++) {
651638
sb.append('Z');
652639
}
653-
return sb.toString();
640+
ctx.complete(sb.toString());
654641
});
655642
DurableTaskGrpcWorker worker = workerBuilder.buildAndStart();
656643

657644
DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder();
658-
clientBuilder.useExternalizedPayloads(store, options);
659645
DurableTaskClient client = clientBuilder.build();
660646

661647
try (worker; client) {
@@ -718,7 +704,7 @@ void largeInputOutputAndCustomStatus_allExternalized() throws TimeoutException {
718704
@Test
719705
void continueAsNew_withLargeCustomStatusAndFinalOutput() throws TimeoutException {
720706
final String orchestratorName = "ContinueAsNewAllOrch";
721-
final int payloadSize = 800_000;
707+
final int payloadSize = 1_000_000;
722708
final int iterations = 3;
723709

724710
InMemoryPayloadStore store = new InMemoryPayloadStore();
@@ -771,8 +757,8 @@ void largeSubOrchestrationAndActivityOutput_combined() throws TimeoutException {
771757
final String parentOrchName = "CombinedParentOrch";
772758
final String childOrchName = "CombinedChildOrch";
773759
final String activityName = "CombinedActivity";
774-
final int subOrchPayloadSize = 650_000;
775-
final int activityPayloadSize = 820_000;
760+
final int subOrchPayloadSize = 1_000_000;
761+
final int activityPayloadSize = 1_000_000;
776762

777763
InMemoryPayloadStore store = new InMemoryPayloadStore();
778764
LargePayloadOptions options = new LargePayloadOptions.Builder().build();

0 commit comments

Comments
 (0)