Skip to content

Commit 5bb85de

Browse files
committed
fix(cloudflare): Forward ctx argument to Workflow.do user callback
1 parent 02744f7 commit 5bb85de

File tree

2 files changed

+35
-10
lines changed

2 files changed

+35
-10
lines changed

packages/cloudflare/src/workflows.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,24 +67,27 @@ class WrappedWorkflowStep implements WorkflowStep {
6767
private _step: WorkflowStep,
6868
) {}
6969

70-
public async do<T extends Rpc.Serializable<T>>(name: string, callback: () => Promise<T>): Promise<T>;
70+
public async do<T extends Rpc.Serializable<T>>(
71+
name: string,
72+
callback: (...args: unknown[]) => Promise<T>,
73+
): Promise<T>;
7174
public async do<T extends Rpc.Serializable<T>>(
7275
name: string,
7376
config: WorkflowStepConfig,
74-
callback: () => Promise<T>,
77+
callback: (...args: unknown[]) => Promise<T>,
7578
): Promise<T>;
7679
public async do<T extends Rpc.Serializable<T>>(
7780
name: string,
7881
configOrCallback: WorkflowStepConfig | (() => Promise<T>),
79-
maybeCallback?: () => Promise<T>,
82+
maybeCallback?: (...args: unknown[]) => Promise<T>,
8083
): Promise<T> {
8184
// Capture the current scope, so parent span (e.g., a startSpan surrounding step.do) is preserved
8285
const scopeForStep = getCurrentScope();
8386

84-
const userCallback = (maybeCallback || configOrCallback) as () => Promise<T>;
87+
const userCallback = (maybeCallback || configOrCallback) as (...args: unknown[]) => Promise<T>;
8588
const config = typeof configOrCallback === 'function' ? undefined : configOrCallback;
8689

87-
const instrumentedCallback: () => Promise<T> = async () => {
90+
const instrumentedCallback = async (...args: unknown[]): Promise<T> => {
8891
return startSpan(
8992
{
9093
op: 'function.step.do',
@@ -101,7 +104,7 @@ class WrappedWorkflowStep implements WorkflowStep {
101104
},
102105
async span => {
103106
try {
104-
const result = await userCallback();
107+
const result = await userCallback(...args);
105108
span.setStatus({ code: 1 });
106109
return result;
107110
} catch (error) {

packages/cloudflare/test/workflow.test.ts

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@ import { deterministicTraceIdFromInstanceId, instrumentWorkflowWithSentry } from
66

77
const NODE_MAJOR_VERSION = parseInt(process.versions.node.split('.')[0]!);
88

9+
const MOCK_STEP_CTX = { attempt: 1 };
10+
911
const mockStep: WorkflowStep = {
1012
do: vi
1113
.fn()
1214
.mockImplementation(
1315
async (
1416
_name: string,
15-
configOrCallback: WorkflowStepConfig | (() => Promise<any>),
16-
maybeCallback?: () => Promise<any>,
17+
configOrCallback: WorkflowStepConfig | ((...args: unknown[]) => Promise<any>),
18+
maybeCallback?: (...args: unknown[]) => Promise<any>,
1719
) => {
1820
let count = 0;
1921

@@ -22,9 +24,9 @@ const mockStep: WorkflowStep = {
2224

2325
try {
2426
if (typeof configOrCallback === 'function') {
25-
return await configOrCallback();
27+
return await configOrCallback(MOCK_STEP_CTX);
2628
} else {
27-
return await (maybeCallback ? maybeCallback() : Promise.resolve());
29+
return await (maybeCallback ? maybeCallback(MOCK_STEP_CTX) : Promise.resolve());
2830
}
2931
} catch {
3032
await new Promise(resolve => setTimeout(resolve, 1000));
@@ -427,6 +429,26 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => {
427429
]);
428430
});
429431

432+
test('Forwards step context (ctx) to user callback', async () => {
433+
const callbackSpy = vi.fn().mockResolvedValue({ ok: true });
434+
435+
class CtxTestWorkflow {
436+
constructor(_ctx: ExecutionContext, _env: unknown) {}
437+
438+
async run(_event: Readonly<WorkflowEvent<Params>>, step: WorkflowStep): Promise<void> {
439+
await step.do('ctx step', callbackSpy);
440+
}
441+
}
442+
443+
const TestWorkflowInstrumented = instrumentWorkflowWithSentry(getSentryOptions, CtxTestWorkflow as any);
444+
const workflow = new TestWorkflowInstrumented(mockContext, {}) as CtxTestWorkflow;
445+
const event = { payload: {}, timestamp: new Date(), instanceId: INSTANCE_ID };
446+
await workflow.run(event, mockStep);
447+
448+
expect(callbackSpy).toHaveBeenCalledTimes(1);
449+
expect(callbackSpy).toHaveBeenCalledWith(MOCK_STEP_CTX);
450+
});
451+
430452
test('Step.do span becomes child of surrounding custom span', async () => {
431453
class ParentChildWorkflow {
432454
constructor(_ctx: ExecutionContext, _env: unknown) {}

0 commit comments

Comments
 (0)