-
Notifications
You must be signed in to change notification settings - Fork 14
Wrap gRPC StatusRuntimeException across all DurableTaskGrpcClient methods #278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1bb999f
20baafb
bb4a5bd
f7b7d1c
a2d7c00
42765bc
a4fe62f
539916b
d0241fb
54035e4
261fec0
7cfe800
aede530
eb1ad65
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -161,6 +161,8 @@ public String scheduleNewOrchestrationInstance( | |
| CreateInstanceRequest request = builder.build(); | ||
| CreateInstanceResponse response = this.sidecarClient.startInstance(request); | ||
| return response.getInstanceId(); | ||
| } catch (StatusRuntimeException e) { | ||
| throw StatusRuntimeExceptionHelper.toRuntimeException(e, "scheduleNewOrchestrationInstance"); | ||
| } finally { | ||
| createScope.close(); | ||
| createSpan.end(); | ||
|
|
@@ -184,7 +186,11 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload) | |
| } | ||
|
|
||
| RaiseEventRequest request = builder.build(); | ||
| this.sidecarClient.raiseEvent(request); | ||
| try { | ||
| this.sidecarClient.raiseEvent(request); | ||
| } catch (StatusRuntimeException e) { | ||
| throw StatusRuntimeExceptionHelper.toRuntimeException(e, "raiseEvent"); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -193,8 +199,12 @@ public OrchestrationMetadata getInstanceMetadata(String instanceId, boolean getI | |
| .setInstanceId(instanceId) | ||
| .setGetInputsAndOutputs(getInputsAndOutputs) | ||
| .build(); | ||
| GetInstanceResponse response = this.sidecarClient.getInstance(request); | ||
| return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs()); | ||
| try { | ||
| GetInstanceResponse response = this.sidecarClient.getInstance(request); | ||
| return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs()); | ||
| } catch (StatusRuntimeException e) { | ||
| throw StatusRuntimeExceptionHelper.toRuntimeException(e, "getInstanceMetadata"); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -219,7 +229,13 @@ public OrchestrationMetadata waitForInstanceStart(String instanceId, Duration ti | |
| if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) { | ||
| throw new TimeoutException("Start orchestration timeout reached."); | ||
| } | ||
| throw e; | ||
| Exception translated = StatusRuntimeExceptionHelper.toException(e, "waitForInstanceStart"); | ||
| if (translated instanceof TimeoutException) { | ||
| throw (TimeoutException) translated; | ||
| } else if (translated instanceof RuntimeException) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just have the |
||
| throw (RuntimeException) translated; | ||
| } | ||
| throw new RuntimeException(translated); | ||
|
Comment on lines
229
to
+238
|
||
| } | ||
| return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs()); | ||
| } | ||
|
|
@@ -246,7 +262,13 @@ public OrchestrationMetadata waitForInstanceCompletion(String instanceId, Durati | |
| if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) { | ||
| throw new TimeoutException("Orchestration instance completion timeout reached."); | ||
| } | ||
| throw e; | ||
| Exception translated = StatusRuntimeExceptionHelper.toException(e, "waitForInstanceCompletion"); | ||
| if (translated instanceof TimeoutException) { | ||
| throw (TimeoutException) translated; | ||
| } else if (translated instanceof RuntimeException) { | ||
| throw (RuntimeException) translated; | ||
| } | ||
| throw new RuntimeException(translated); | ||
| } | ||
| return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs()); | ||
| } | ||
|
|
@@ -263,7 +285,11 @@ public void terminate(String instanceId, @Nullable Object output) { | |
| if (serializeOutput != null){ | ||
| builder.setOutput(StringValue.of(serializeOutput)); | ||
| } | ||
| this.sidecarClient.terminateInstance(builder.build()); | ||
| try { | ||
| this.sidecarClient.terminateInstance(builder.build()); | ||
| } catch (StatusRuntimeException e) { | ||
| throw StatusRuntimeExceptionHelper.toRuntimeException(e, "terminate"); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -277,8 +303,12 @@ public OrchestrationStatusQueryResult queryInstances(OrchestrationStatusQuery qu | |
| instanceQueryBuilder.setMaxInstanceCount(query.getMaxInstanceCount()); | ||
| query.getRuntimeStatusList().forEach(runtimeStatus -> Optional.ofNullable(runtimeStatus).ifPresent(status -> instanceQueryBuilder.addRuntimeStatus(OrchestrationRuntimeStatus.toProtobuf(status)))); | ||
| query.getTaskHubNames().forEach(taskHubName -> Optional.ofNullable(taskHubName).ifPresent(name -> instanceQueryBuilder.addTaskHubNames(StringValue.of(name)))); | ||
| QueryInstancesResponse queryInstancesResponse = this.sidecarClient.queryInstances(QueryInstancesRequest.newBuilder().setQuery(instanceQueryBuilder).build()); | ||
| return toQueryResult(queryInstancesResponse, query.isFetchInputsAndOutputs()); | ||
| try { | ||
| QueryInstancesResponse queryInstancesResponse = this.sidecarClient.queryInstances(QueryInstancesRequest.newBuilder().setQuery(instanceQueryBuilder).build()); | ||
| return toQueryResult(queryInstancesResponse, query.isFetchInputsAndOutputs()); | ||
| } catch (StatusRuntimeException e) { | ||
| throw StatusRuntimeExceptionHelper.toRuntimeException(e, "queryInstances"); | ||
| } | ||
| } | ||
|
|
||
| private OrchestrationStatusQueryResult toQueryResult(QueryInstancesResponse queryInstancesResponse, boolean fetchInputsAndOutputs){ | ||
|
|
@@ -291,12 +321,20 @@ private OrchestrationStatusQueryResult toQueryResult(QueryInstancesResponse quer | |
|
|
||
| @Override | ||
| public void createTaskHub(boolean recreateIfExists) { | ||
| this.sidecarClient.createTaskHub(CreateTaskHubRequest.newBuilder().setRecreateIfExists(recreateIfExists).build()); | ||
| try { | ||
| this.sidecarClient.createTaskHub(CreateTaskHubRequest.newBuilder().setRecreateIfExists(recreateIfExists).build()); | ||
| } catch (StatusRuntimeException e) { | ||
| throw StatusRuntimeExceptionHelper.toRuntimeException(e, "createTaskHub"); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void deleteTaskHub() { | ||
| this.sidecarClient.deleteTaskHub(DeleteTaskHubRequest.newBuilder().build()); | ||
| try { | ||
| this.sidecarClient.deleteTaskHub(DeleteTaskHubRequest.newBuilder().build()); | ||
| } catch (StatusRuntimeException e) { | ||
| throw StatusRuntimeExceptionHelper.toRuntimeException(e, "deleteTaskHub"); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -305,8 +343,12 @@ public PurgeResult purgeInstance(String instanceId) { | |
| .setInstanceId(instanceId) | ||
| .build(); | ||
|
|
||
| PurgeInstancesResponse response = this.sidecarClient.purgeInstances(request); | ||
| return toPurgeResult(response); | ||
| try { | ||
| PurgeInstancesResponse response = this.sidecarClient.purgeInstances(request); | ||
| return toPurgeResult(response); | ||
| } catch (StatusRuntimeException e) { | ||
| throw StatusRuntimeExceptionHelper.toRuntimeException(e, "purgeInstance"); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -334,7 +376,13 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t | |
| String timeOutException = String.format("Purge instances timeout duration of %s reached.", timeout); | ||
| throw new TimeoutException(timeOutException); | ||
| } | ||
| throw e; | ||
| Exception translated = StatusRuntimeExceptionHelper.toException(e, "purgeInstances"); | ||
| if (translated instanceof TimeoutException) { | ||
| throw (TimeoutException) translated; | ||
| } else if (translated instanceof RuntimeException) { | ||
| throw (RuntimeException) translated; | ||
| } | ||
| throw new RuntimeException(translated); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -345,7 +393,11 @@ public void suspendInstance(String instanceId, @Nullable String reason) { | |
| if (reason != null) { | ||
| suspendRequestBuilder.setReason(StringValue.of(reason)); | ||
| } | ||
| this.sidecarClient.suspendInstance(suspendRequestBuilder.build()); | ||
| try { | ||
| this.sidecarClient.suspendInstance(suspendRequestBuilder.build()); | ||
| } catch (StatusRuntimeException e) { | ||
| throw StatusRuntimeExceptionHelper.toRuntimeException(e, "suspendInstance"); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -355,7 +407,11 @@ public void resumeInstance(String instanceId, @Nullable String reason) { | |
| if (reason != null) { | ||
| resumeRequestBuilder.setReason(StringValue.of(reason)); | ||
| } | ||
| this.sidecarClient.resumeInstance(resumeRequestBuilder.build()); | ||
| try { | ||
| this.sidecarClient.resumeInstance(resumeRequestBuilder.build()); | ||
| } catch (StatusRuntimeException e) { | ||
| throw StatusRuntimeExceptionHelper.toRuntimeException(e, "resumeInstance"); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -377,7 +433,7 @@ public void rewindInstance(String instanceId, @Nullable String reason) { | |
| throw new IllegalStateException( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should also get rid of this custom exception handling since this is now handled in our helper |
||
| "Orchestration instance '" + instanceId + "' is not in a failed state and cannot be rewound.", e); | ||
| } | ||
| throw e; | ||
| throw StatusRuntimeExceptionHelper.toRuntimeException(e, "rewindInstance"); | ||
| } | ||
| } | ||
|
|
||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we also add handling for the ALREADY_EXISTS status code (this can sometimes be thrown when creating a new orchestration with the same instance ID)? |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
| package com.microsoft.durabletask; | ||
|
|
||
| import io.grpc.Status; | ||
| import io.grpc.StatusRuntimeException; | ||
|
|
||
| import java.util.concurrent.CancellationException; | ||
| import java.util.concurrent.TimeoutException; | ||
|
|
||
| /** | ||
| * Utility class to translate gRPC {@link StatusRuntimeException} into SDK-level exceptions. | ||
| * This ensures callers do not need to depend on gRPC types directly. | ||
| * | ||
| * <p>Status code mappings: | ||
| * <ul> | ||
| * <li>{@code CANCELLED} → {@link CancellationException}</li> | ||
| * <li>{@code DEADLINE_EXCEEDED} → {@link TimeoutException} (via {@link #toException})</li> | ||
| * <li>{@code INVALID_ARGUMENT} → {@link IllegalArgumentException}</li> | ||
| * <li>{@code FAILED_PRECONDITION} → {@link IllegalStateException}</li> | ||
| * <li>{@code NOT_FOUND} → {@link IllegalArgumentException}</li> | ||
| * <li>{@code UNIMPLEMENTED} → {@link UnsupportedOperationException}</li> | ||
| * <li>All other codes → {@link RuntimeException}</li> | ||
| * </ul> | ||
|
Comment on lines
+15
to
+24
|
||
| */ | ||
| final class StatusRuntimeExceptionHelper { | ||
|
|
||
| /** | ||
| * Translates a {@link StatusRuntimeException} into an appropriate SDK-level unchecked exception. | ||
| * | ||
| * @param e the gRPC exception to translate | ||
| * @param operationName the name of the operation that failed, used in exception messages | ||
| * @return a translated RuntimeException (never returns null) | ||
| */ | ||
| static RuntimeException toRuntimeException(StatusRuntimeException e, String operationName) { | ||
| Status.Code code = e.getStatus().getCode(); | ||
| String message = formatMessage(operationName, code, getDescriptionOrDefault(e)); | ||
| switch (code) { | ||
| case CANCELLED: | ||
| return createCancellationException(e, operationName); | ||
| case INVALID_ARGUMENT: | ||
| return new IllegalArgumentException(message, e); | ||
| case FAILED_PRECONDITION: | ||
| return new IllegalStateException(message, e); | ||
| case NOT_FOUND: | ||
| return new IllegalArgumentException(message, e); | ||
| case UNIMPLEMENTED: | ||
| return new UnsupportedOperationException(message, e); | ||
| default: | ||
| return new RuntimeException(message, e); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Translates a {@link StatusRuntimeException} into an appropriate SDK-level checked exception | ||
| * for operations that declare {@code throws TimeoutException}. | ||
| * <p> | ||
| * Note: The DEADLINE_EXCEEDED case is included for completeness and future-proofing, even | ||
| * though current call sites handle DEADLINE_EXCEEDED before falling through to this method. | ||
| * This ensures centralized translation if call sites are refactored in the future. | ||
| * | ||
| * @param e the gRPC exception to translate | ||
| * @param operationName the name of the operation that failed, used in exception messages | ||
| * @return a translated Exception (never returns null) | ||
| */ | ||
| static Exception toException(StatusRuntimeException e, String operationName) { | ||
| Status.Code code = e.getStatus().getCode(); | ||
| String message = formatMessage(operationName, code, getDescriptionOrDefault(e)); | ||
| switch (code) { | ||
| case DEADLINE_EXCEEDED: | ||
| return new TimeoutException(message); | ||
| case CANCELLED: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just call |
||
| return createCancellationException(e, operationName); | ||
| case INVALID_ARGUMENT: | ||
| return new IllegalArgumentException(message, e); | ||
| case FAILED_PRECONDITION: | ||
| return new IllegalStateException(message, e); | ||
| case NOT_FOUND: | ||
| return new IllegalArgumentException(message, e); | ||
| case UNIMPLEMENTED: | ||
| return new UnsupportedOperationException(message, e); | ||
| default: | ||
| return new RuntimeException(message, e); | ||
| } | ||
| } | ||
|
Comment on lines
+66
to
+85
|
||
|
|
||
| private static CancellationException createCancellationException( | ||
| StatusRuntimeException e, String operationName) { | ||
| CancellationException ce = new CancellationException( | ||
| "The " + operationName + " operation was canceled."); | ||
| ce.initCause(e); | ||
| return ce; | ||
| } | ||
|
|
||
| private static String formatMessage(String operationName, Status.Code code, String description) { | ||
| return "The " + operationName + " operation failed with a " + code + " gRPC status: " + description; | ||
| } | ||
|
Comment on lines
+87
to
+97
|
||
|
|
||
| private static String getDescriptionOrDefault(StatusRuntimeException e) { | ||
| String description = e.getStatus().getDescription(); | ||
| return description != null ? description : "(no description)"; | ||
| } | ||
|
|
||
| // Cannot be instantiated | ||
| private StatusRuntimeExceptionHelper() { | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO this is confusing because now we have special-cased this exception-handling even though we explicitly added logic for it in the
StatusRuntimeExceptionHelper. Can we just have this class handle all exceptions?