Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/ci/app/ci/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ export const TEST_ROUTES: Pick<TestConfig, RouteConfigs>[] = [
},
{
route: "qstash-trigger-fetch/workflows/mainWorkflow",
},
{
route: "invoke-parent-failure/workflow",
}

/**
Expand Down
2 changes: 1 addition & 1 deletion examples/ci/app/ci/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export const getTestConfig = async (route: string) => {
)

if (response.status !== 200) {
throw new Error(`Failed to get the error config: ${response.statusText}`)
throw new Error(`Failed to get the test config: ${response.statusText}`)
}

const testConfig = await response.json() as Parameters<typeof testServe>[1]
Expand Down
98 changes: 98 additions & 0 deletions examples/ci/app/test-routes/invoke-parent-failure/[...]/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@

import { Client, WorkflowContext, WorkflowNonRetryableError } from "@upstash/workflow";
import { createWorkflow, serveMany } from "@upstash/workflow/nextjs";
import { BASE_URL, CI_RANDOM_ID_HEADER, CI_ROUTE_HEADER } from "app/ci/constants";
import { saveResult } from "app/ci/upstash/redis";
import { testServe } from "app/ci/utils";

const client = new Client({
baseUrl: process.env.QSTASH_URL!,
token: process.env.QSTASH_TOKEN!,
})

const FAILING_STEP_NAME = "failing step"
const INVOKE_CHILD_STEP_NAME = "invoke child"

const workflow = createWorkflow(async (context: WorkflowContext) => {
const workflowRunId = await context.run("step 1", async () => {
console.log("workflow says hi")
return `workflow-run-id-${(Math.random() * 1000).toFixed(0)}`
})

await context.invoke("invoke child", {
workflow: testWorkflow,
body: undefined,
headers: {
[CI_ROUTE_HEADER]: context.headers.get(CI_ROUTE_HEADER) as string,
[CI_RANDOM_ID_HEADER]: context.headers.get(CI_RANDOM_ID_HEADER) as string,
},
workflowRunId,
})

await context.run("verify step", async () => {

for (let i = 0; i < 30; i++) {
const workflowLogs = await client.logs({ workflowRunId: `wfr_${workflowRunId}` })
const workflowRun = workflowLogs.runs[0]

if (workflowRun && workflowRun.steps[1] && workflowRun.steps[1].type === "parallel") {
const invokeChild = workflowRun.steps[1].steps?.find(s => s.stepName === INVOKE_CHILD_STEP_NAME)
const failingStep = workflowRun.steps[1].steps?.find(s => s.stepName === FAILING_STEP_NAME)

if (invokeChild && invokeChild.state === "STEP_SUCCESS" && failingStep && failingStep.state === "STEP_FAILED") {
return { success: true }
}
}
// sleep for 1 sec
await new Promise(r => setTimeout(r, 1000));
}

console.warn("child workflow did not fail within expected time");
throw new WorkflowNonRetryableError("child workflow did not fail within expected time")
})

await saveResult(
context,
"done invoke"
)
})

const testWorkflow = createWorkflow(async (context: WorkflowContext) => {
await Promise.all([
context.invoke(INVOKE_CHILD_STEP_NAME, {
workflow: childWorkflow,
body: undefined,
headers: {
[CI_ROUTE_HEADER]: context.headers.get(CI_ROUTE_HEADER) as string,
[CI_RANDOM_ID_HEADER]: context.headers.get(CI_RANDOM_ID_HEADER) as string,
},
}),
context.run(FAILING_STEP_NAME, async () => {
// sleep for 1 sec
await new Promise(r => setTimeout(r, 1000));
throw new WorkflowNonRetryableError("step failed")
})
])

})

const childWorkflow = createWorkflow(async (context: WorkflowContext) => {
await context.sleep("sleep 3s", 3)
await context.run("child step", async () => {
console.log("child workflow step")
})
})

export const { POST, GET } = testServe(serveMany({
workflow,
testWorkflow,
childWorkflow,
}, {
baseUrl: BASE_URL
}),
{
expectedCallCount: 11,
expectedResult: "done invoke",
payload: undefined,
}
)
2 changes: 2 additions & 0 deletions src/client/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ describe("workflow client", () => {
workflowRunId,
steps: [],
url: "https://httpstat.us/200",
workflowRunCreatedAt: 0,
}),
});

Expand Down Expand Up @@ -656,6 +657,7 @@ describe("workflow client", () => {
workflowRunId,
steps: [],
url: "https://httpstat.us/400",
workflowRunCreatedAt: 0,
}),
failureUrl: "https://400check.requestcatcher.com/",
retries: 0,
Expand Down
1 change: 1 addition & 0 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ export class Client {
workflowRunId: finalWorkflowRunId,
telemetry: option.disableTelemetry ? undefined : { sdk: SDK_TELEMETRY },
label: option.label,
workflowRunCreatedAt: Date.now(), // pass a timestamp (server will override it)
});

return {
Expand Down
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Telemetry } from "./types";
export const WORKFLOW_ID_HEADER = "Upstash-Workflow-RunId";
export const WORKFLOW_INIT_HEADER = "Upstash-Workflow-Init";
export const WORKFLOW_URL_HEADER = "Upstash-Workflow-Url";
export const WORKFLOW_CREATED_AT_HEADER = "Upstash-Workflow-CreatedAt";
export const WORKFLOW_FAILURE_HEADER = "Upstash-Workflow-Is-Failure";
export const WORKFLOW_FAILURE_CALLBACK_HEADER = "Upstash-Workflow-Failure-Callback";
export const WORKFLOW_FEATURE_HEADER = "Upstash-Feature-Set";
Expand Down
1 change: 1 addition & 0 deletions src/context/auto-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ describe("auto-executor", () => {
steps,
url: WORKFLOW_ENDPOINT,
invokeCount: 7,
workflowRunCreatedAt: 0,
});
};

Expand Down
21 changes: 21 additions & 0 deletions src/context/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers({ "upstash-label": label }) as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
label,
});
expect(context.label).toBe(label);
Expand All @@ -43,6 +44,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});

const throws = async () => {
Expand All @@ -67,6 +69,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});

const throws = async () => {
Expand All @@ -89,6 +92,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});

const throws = async () => {
Expand All @@ -111,6 +115,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});

const throws = async () => {
Expand All @@ -134,6 +139,7 @@ describe("context tests", () => {
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
invokeCount: 5,
workflowRunCreatedAt: 0,
});

await mockQStashServer({
Expand Down Expand Up @@ -183,6 +189,7 @@ describe("context tests", () => {
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
invokeCount: 5,
workflowRunCreatedAt: 0,
});

const eventId = "my-event-id";
Expand Down Expand Up @@ -235,6 +242,7 @@ describe("context tests", () => {
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
invokeCount: 1,
workflowRunCreatedAt: 0,
});

const eventId = "my-event-id";
Expand Down Expand Up @@ -306,6 +314,7 @@ describe("context tests", () => {
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
invokeCount: 7,
workflowRunCreatedAt: 0,
});
await mockQStashServer({
execute: () => {
Expand Down Expand Up @@ -384,6 +393,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});
await mockQStashServer({
execute: () => {
Expand Down Expand Up @@ -445,6 +455,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});
await mockQStashServer({
execute: () => {
Expand Down Expand Up @@ -509,6 +520,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});
try {
await context.cancel();
Expand All @@ -530,6 +542,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});

await mockQStashServer({
Expand Down Expand Up @@ -559,6 +572,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});

await mockQStashServer({
Expand Down Expand Up @@ -589,6 +603,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});

const openAIToken = `hello-there`;
Expand Down Expand Up @@ -670,6 +685,7 @@ describe("context tests", () => {
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
invokeCount: 5,
workflowRunCreatedAt: 0,
});

const openAIToken = `hello-there`;
Expand Down Expand Up @@ -753,6 +769,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});

const resendToken = `hello-there`;
Expand Down Expand Up @@ -832,6 +849,7 @@ describe("context tests", () => {
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
invokeCount: 3,
workflowRunCreatedAt: 0,
});

const anthropicToken = `hello-there`;
Expand Down Expand Up @@ -925,6 +943,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});

await mockQStashServer({
Expand Down Expand Up @@ -972,6 +991,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});

const webhook = {
Expand Down Expand Up @@ -1028,6 +1048,7 @@ describe("context tests", () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
workflowRunCreatedAt: 0,
});

// First create webhook
Expand Down
7 changes: 7 additions & 0 deletions src/context/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ export class WorkflowContext<TInitialPayload = unknown> {
* Run id of the workflow
*/
public readonly workflowRunId: string;
/**
* Creation time of the workflow run
*/
public readonly workflowRunCreatedAt: number;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new workflowRunCreatedAt field in context

/**
* URL of the workflow
*
Expand Down Expand Up @@ -153,6 +157,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
constructor({
qstashClient,
workflowRunId,
workflowRunCreatedAt,
headers,
steps,
url,
Expand All @@ -165,6 +170,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
}: {
qstashClient: WorkflowClient;
workflowRunId: string;
workflowRunCreatedAt: number;
headers: Headers;
steps: Step[];
url: string;
Expand All @@ -177,6 +183,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
}) {
this.qstashClient = qstashClient;
this.workflowRunId = workflowRunId;
this.workflowRunCreatedAt = workflowRunCreatedAt;
this.steps = steps;
this.url = url;
this.headers = headers;
Expand Down
1 change: 1 addition & 0 deletions src/context/steps.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const createMockContext = () => {
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "test-wfr-id",
workflowRunCreatedAt: 0,
});
};

Expand Down
1 change: 1 addition & 0 deletions src/context/steps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ export class LazyInvokeStep<TResult = unknown, TBody = unknown> extends BaseLazy
Object.entries(invokerHeaders).map((pairs) => [pairs[0], [pairs[1]]])
),
workflowRunId: context.workflowRunId,
workflowRunCreatedAt: context.workflowRunCreatedAt,
workflowUrl: context.url,
step,
};
Expand Down
Loading