Skip to content

Commit 54b6e15

Browse files
Wire up deadline on Nexus context (#636)
1 parent a733589 commit 54b6e15

2 files changed

Lines changed: 34 additions & 8 deletions

File tree

src/Temporalio/Worker/NexusWorker.cs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,10 @@ public async Task ExecuteAsync()
9999
// we're late-binding it here.
100100
var running = new RunningTask();
101101
runningTasks[task.Task.TaskToken] = running;
102+
var requestDeadline = task.RequestDeadline?.ToDateTime();
102103
#pragma warning disable CA2008 // We don't have to pass a scheduler, factory already implies one
103104
running.Task = worker.Options.NexusTaskFactory.StartNew(
104-
() => HandlePollTaskAsync(running, task.Task)).Unwrap();
105+
() => HandlePollTaskAsync(running, task.Task, requestDeadline)).Unwrap();
105106
#pragma warning restore CA2008
106107
break;
107108
case NexusTask.VariantOneofCase.CancelTask:
@@ -141,12 +142,12 @@ private static void RemoveInvalidHeaders(MapField<string, string> headers)
141142
headers.Remove("Request-Timeout");
142143
}
143144

144-
private async Task HandlePollTaskAsync(RunningTask running, PollNexusTaskQueueResponse task)
145+
private async Task HandlePollTaskAsync(RunningTask running, PollNexusTaskQueueResponse task, DateTime? requestDeadline)
145146
{
146147
try
147148
{
148149
// Handle poll and post back to Core
149-
var completion = await HandlePollTaskInternalAsync(running, task).ConfigureAwait(false);
150+
var completion = await HandlePollTaskInternalAsync(running, task, requestDeadline).ConfigureAwait(false);
150151
logger.LogTrace("Sending Nexus completion: {Completion}", completion);
151152
await worker.BridgeWorker.CompleteNexusTaskAsync(completion).ConfigureAwait(false);
152153
}
@@ -165,7 +166,7 @@ private async Task HandlePollTaskAsync(RunningTask running, PollNexusTaskQueueRe
165166
}
166167

167168
private async Task<StartOperationResponse> HandleStartOperationAsync(
168-
RunningTask running, PollNexusTaskQueueResponse task)
169+
RunningTask running, PollNexusTaskQueueResponse task, DateTime? requestDeadline)
169170
{
170171
// Create context
171172
RemoveInvalidHeaders(task.Request.Header);
@@ -195,6 +196,7 @@ private async Task<StartOperationResponse> HandleStartOperationAsync(
195196
e);
196197
}
197198
}).ToList(),
199+
RequestDeadline = requestDeadline,
198200
};
199201
running.OnCancelReason = reason => context.CancellationReason = reason;
200202

@@ -253,7 +255,7 @@ private async Task<StartOperationResponse> HandleStartOperationAsync(
253255
}
254256

255257
private async Task<CancelOperationResponse> HandleCancelOperationAsync(
256-
RunningTask running, PollNexusTaskQueueResponse task)
258+
RunningTask running, PollNexusTaskQueueResponse task, DateTime? requestDeadline)
257259
{
258260
// Create context
259261
RemoveInvalidHeaders(task.Request.Header);
@@ -266,6 +268,7 @@ private async Task<CancelOperationResponse> HandleCancelOperationAsync(
266268
{
267269
Headers = task.Request.Header.Count == 0 ? null :
268270
new Dictionary<string, string>(task.Request.Header, StringComparer.OrdinalIgnoreCase),
271+
RequestDeadline = requestDeadline,
269272
};
270273
running.OnCancelReason = reason => context.CancellationReason = reason;
271274

@@ -287,22 +290,22 @@ private async Task<CancelOperationResponse> HandleCancelOperationAsync(
287290
}
288291

289292
private async Task<NexusTaskCompletion> HandlePollTaskInternalAsync(
290-
RunningTask running, PollNexusTaskQueueResponse task)
293+
RunningTask running, PollNexusTaskQueueResponse task, DateTime? requestDeadline)
291294
{
292295
try
293296
{
294297
// Handle each case
295298
switch (task.Request.VariantCase)
296299
{
297300
case Request.VariantOneofCase.StartOperation:
298-
var startResp = await HandleStartOperationAsync(running, task).ConfigureAwait(false);
301+
var startResp = await HandleStartOperationAsync(running, task, requestDeadline).ConfigureAwait(false);
299302
return new()
300303
{
301304
TaskToken = task.TaskToken,
302305
Completed = new() { StartOperation = startResp },
303306
};
304307
case Request.VariantOneofCase.CancelOperation:
305-
var cancelResp = await HandleCancelOperationAsync(running, task).ConfigureAwait(false);
308+
var cancelResp = await HandleCancelOperationAsync(running, task, requestDeadline).ConfigureAwait(false);
306309
return new()
307310
{
308311
TaskToken = task.TaskToken,

tests/Temporalio.Tests/Worker/NexusWorkerTests.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,29 @@ await Workflow.CreateNexusWorkflowClient<IStringService>(endpoint).
301301
Assert.Equal("timed out", ctx.CancellationReason);
302302
}
303303

304+
[Fact]
305+
public async Task ExecuteNexusOperationAsync_RequestDeadline_SetOnContext()
306+
{
307+
var contextSource = new TaskCompletionSource<OperationStartContext>();
308+
var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}").
309+
AddNexusService(new AsyncFuncStringService(
310+
start: (ctx, input) =>
311+
{
312+
contextSource.SetResult(ctx);
313+
return Task.FromResult(OperationStartResult.SyncResult($"Hello, {input}"));
314+
}));
315+
var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!);
316+
await RunInWorkflowAsync(workerOptions, async () =>
317+
{
318+
var result = await Workflow.CreateNexusWorkflowClient<IStringService>(endpoint).
319+
ExecuteNexusOperationAsync(svc => svc.DoSomething("some-name"));
320+
Assert.Equal("Hello, some-name", result);
321+
});
322+
var ctx = await contextSource.Task;
323+
// Server always sends a request timeout header, so the deadline should be set
324+
Assert.NotNull(ctx.RequestDeadline);
325+
}
326+
304327
[Fact]
305328
public async Task ExecuteNexusOperationAsync_OperationSummary_FoundInHistory()
306329
{

0 commit comments

Comments
 (0)