From 889685405a0abfd67a152e8d68931772cffdcc1b Mon Sep 17 00:00:00 2001 From: sw-yx Date: Thu, 13 Jan 2022 08:04:16 +0800 Subject: [PATCH 1/6] init --- .../temporal-time-utils/EntityWorkflow.ts | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 packages/temporal-time-utils/EntityWorkflow.ts diff --git a/packages/temporal-time-utils/EntityWorkflow.ts b/packages/temporal-time-utils/EntityWorkflow.ts new file mode 100644 index 0000000..340b49e --- /dev/null +++ b/packages/temporal-time-utils/EntityWorkflow.ts @@ -0,0 +1,62 @@ +import * as wf from '@temporalio/workflow'; + +const noop = async () => {} +type Update = any // no way to export this usefully? +type ThingToInvoke = { activity: string, activityOptions: wf.ActivityOptions } +| { workflow: string, workflowOptions: wf.ChildWorkflowOptions } +export const EntityUpdateSignal = wf.defineSignal<[Update]>('EntityUpdateSignal') // no real way to pass the types +export class Entity { + MAX_ITERATIONS: number + setup: (input: Input) => Promise + cleanup: () => Promise + thingToInvoke: ThingToInvoke + + constructor(thingToInvoke: ThingToInvoke, maxIterations = 1000, setup = noop, cleanup = noop) { + this.thingToInvoke = thingToInvoke; + this.MAX_ITERATIONS = maxIterations // can override if needed + this.setup = setup + this.cleanup = cleanup + this.workflow = this.workflow.bind(this) + } + + async workflow(input: Input, isContinued = false) { + try { + const pendingUpdates = Array(); + wf.setHandler(EntityUpdateSignal, (updateCommand: Input) => { + pendingUpdates.push(updateCommand); + }); + + if (!isContinued) { + await this.setup(input); + } + + for (let iteration = 1; iteration <= this.MAX_ITERATIONS; ++iteration) { + // Automatically continue as new after a day if no updates were received + await wf.condition(() => pendingUpdates.length > 0, '1 day'); + + while (pendingUpdates.length) { + const update = pendingUpdates.shift(); + if ('activity' in this.thingToInvoke) { + const acts = wf.proxyActivities(this.thingToInvoke.activityOptions); + await acts[this.thingToInvoke.activity](update); + } else if ('workflow' in this.thingToInvoke) { + wf.executeChild(this.thingToInvoke.workflow, { + args: update + ...this.thingToInvoke.workflowOptions + }); + } else { + throw new Error('No thing to invoke: ' + JSON.stringify(this.thingToInvoke)) + } + } + } + } catch (err) { + if (wf.isCancellation(err)) { + await wf.CancellationScope.nonCancellable(async () => { + await this.cleanup(); + }); + } + throw err; + } + await wf.continueAsNew(input, false); + } +} \ No newline at end of file From 4de18571994371e336e420f969486f690c509e91 Mon Sep 17 00:00:00 2001 From: sw-yx Date: Thu, 13 Jan 2022 08:17:19 +0800 Subject: [PATCH 2/6] add docs --- .../temporal-time-utils/EntityWorkflow.ts | 13 ++++---- packages/temporal-time-utils/README.md | 31 +++++++++++++++++++ 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/packages/temporal-time-utils/EntityWorkflow.ts b/packages/temporal-time-utils/EntityWorkflow.ts index 340b49e..253b3f9 100644 --- a/packages/temporal-time-utils/EntityWorkflow.ts +++ b/packages/temporal-time-utils/EntityWorkflow.ts @@ -1,28 +1,29 @@ import * as wf from '@temporalio/workflow'; +import {SignalDefinition} from '@temporalio/common' const noop = async () => {} -type Update = any // no way to export this usefully? type ThingToInvoke = { activity: string, activityOptions: wf.ActivityOptions } | { workflow: string, workflowOptions: wf.ChildWorkflowOptions } -export const EntityUpdateSignal = wf.defineSignal<[Update]>('EntityUpdateSignal') // no real way to pass the types -export class Entity { +export class Entity { MAX_ITERATIONS: number setup: (input: Input) => Promise cleanup: () => Promise thingToInvoke: ThingToInvoke - + Signal: SignalDefinition<[Update]> + constructor(thingToInvoke: ThingToInvoke, maxIterations = 1000, setup = noop, cleanup = noop) { this.thingToInvoke = thingToInvoke; this.MAX_ITERATIONS = maxIterations // can override if needed this.setup = setup this.cleanup = cleanup + this.Signal = wf.defineSignal<[Update]>('Signal') // no real way to pass the types this.workflow = this.workflow.bind(this) } async workflow(input: Input, isContinued = false) { try { const pendingUpdates = Array(); - wf.setHandler(EntityUpdateSignal, (updateCommand: Input) => { + wf.setHandler(this.Signal, (updateCommand: Update) => { pendingUpdates.push(updateCommand); }); @@ -41,7 +42,7 @@ export class Entity { await acts[this.thingToInvoke.activity](update); } else if ('workflow' in this.thingToInvoke) { wf.executeChild(this.thingToInvoke.workflow, { - args: update + args: [update], ...this.thingToInvoke.workflowOptions }); } else { diff --git a/packages/temporal-time-utils/README.md b/packages/temporal-time-utils/README.md index 71dce80..ebefeea 100644 --- a/packages/temporal-time-utils/README.md +++ b/packages/temporal-time-utils/README.md @@ -42,6 +42,8 @@ You can consider it the next step up from `sleepUntil`. After you instantiate it with an initial datetime to wake up at, it exposes only two APIs: `then()` for you to `await`, and `.deadline` that you can set and get. ```ts +import { UpdatableTimer } from "temporal-time-utils"; + // example usage inside workflow function export async function countdownWorkflow(initialDeadline: Date): Promise { const timer = new UpdatableTimer(initialDeadline); @@ -69,6 +71,8 @@ See example usage inside of `/apps/fixture`: - https://github.com/sw-yx/temporal-time-utils/blob/main/apps/fixture/src/workflows.ts#L5 necessary export for Worker to pick it up ```ts +import { ScheduleWorkflow } from "temporal-time-utils"; + // inside client file async function run() { const client = new WorkflowClient(); @@ -145,3 +149,30 @@ await handle.signal(stateSignal, "RUNNING" as ScheduleWorkflowState); // resume await handle.cancel(); // stop schedule workflow completely await handle.query(stateQuery); // get wf state (running, paused, or stopped) ``` + +## `Entity` + +This workflow manages continue as new for you. + +```ts +import { Entity } from "temporal-time-utils"; + +interface Input { /* define your workflow input type here */ } +interface Update { /* define your workflow update type here */ } +const entity = new Entity({ + activity: 'MyActivityName' + activityOptions: { + startToCloseTimeout: '1 minute', + } +}) +const handle = await client.start(entity.workflow, { + args: [{ + inputValue: 'initial' + }], + taskQueue: "scheduler", + workflowId: "schedule-for-" + userId, +}); + +// during signaling updates +await client.Signal(entity.Signal, 'new thing') +``` From d5eb26200bb049893c723dd59b4cbc51787cdd67 Mon Sep 17 00:00:00 2001 From: sw-yx Date: Thu, 13 Jan 2022 08:19:37 +0800 Subject: [PATCH 3/6] minor tweaks --- packages/temporal-time-utils/EntityWorkflow.ts | 2 +- packages/temporal-time-utils/README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/temporal-time-utils/EntityWorkflow.ts b/packages/temporal-time-utils/EntityWorkflow.ts index 253b3f9..1413c4f 100644 --- a/packages/temporal-time-utils/EntityWorkflow.ts +++ b/packages/temporal-time-utils/EntityWorkflow.ts @@ -58,6 +58,6 @@ export class Entity { } throw err; } - await wf.continueAsNew(input, false); + await wf.continueAsNew(input, true); } } \ No newline at end of file diff --git a/packages/temporal-time-utils/README.md b/packages/temporal-time-utils/README.md index ebefeea..9aa6fa1 100644 --- a/packages/temporal-time-utils/README.md +++ b/packages/temporal-time-utils/README.md @@ -174,5 +174,5 @@ const handle = await client.start(entity.workflow, { }); // during signaling updates -await client.Signal(entity.Signal, 'new thing') +await client.Signal(entity.Signal, { increment: 1 }) ``` From 65e02d89cb180c42c88e7a13b7ae77a8b457a910 Mon Sep 17 00:00:00 2001 From: sw-yx Date: Thu, 13 Jan 2022 08:27:44 +0800 Subject: [PATCH 4/6] add query --- .../temporal-time-utils/EntityWorkflow.ts | 79 +++++++++++-------- packages/temporal-time-utils/README.md | 3 +- 2 files changed, 47 insertions(+), 35 deletions(-) diff --git a/packages/temporal-time-utils/EntityWorkflow.ts b/packages/temporal-time-utils/EntityWorkflow.ts index 1413c4f..fb3f385 100644 --- a/packages/temporal-time-utils/EntityWorkflow.ts +++ b/packages/temporal-time-utils/EntityWorkflow.ts @@ -1,23 +1,31 @@ -import * as wf from '@temporalio/workflow'; -import {SignalDefinition} from '@temporalio/common' +import * as wf from "@temporalio/workflow"; +import { SignalDefinition, QueryDefinition } from "@temporalio/common"; -const noop = async () => {} -type ThingToInvoke = { activity: string, activityOptions: wf.ActivityOptions } -| { workflow: string, workflowOptions: wf.ChildWorkflowOptions } +const noop = async () => {}; +type ThingToInvoke = + | { activity: string; activityOptions: wf.ActivityOptions } + | { workflow: string; workflowOptions: wf.ChildWorkflowOptions }; export class Entity { - MAX_ITERATIONS: number - setup: (input: Input) => Promise - cleanup: () => Promise - thingToInvoke: ThingToInvoke - Signal: SignalDefinition<[Update]> - - constructor(thingToInvoke: ThingToInvoke, maxIterations = 1000, setup = noop, cleanup = noop) { + MAX_ITERATIONS: number; + setup: (input: Input) => Promise; + cleanup: () => Promise; + thingToInvoke: ThingToInvoke; + Signal: SignalDefinition<[Update]>; + Query: QueryDefinition; + + constructor( + thingToInvoke: ThingToInvoke, + maxIterations = 1000, + setup = noop, + cleanup = noop + ) { this.thingToInvoke = thingToInvoke; - this.MAX_ITERATIONS = maxIterations // can override if needed - this.setup = setup - this.cleanup = cleanup - this.Signal = wf.defineSignal<[Update]>('Signal') // no real way to pass the types - this.workflow = this.workflow.bind(this) + this.MAX_ITERATIONS = maxIterations; // can override if needed + this.setup = setup; + this.cleanup = cleanup; + this.Signal = wf.defineSignal<[Update]>("EntitySignal"); // no real way to pass the types + this.Query = wf.defineQuery<[Update]>("EntityQuery"); // no real way to pass the types + this.workflow = this.workflow.bind(this); } async workflow(input: Input, isContinued = false) { @@ -26,29 +34,32 @@ export class Entity { wf.setHandler(this.Signal, (updateCommand: Update) => { pendingUpdates.push(updateCommand); }); + wf.setHandler(this.Query, () => pendingUpdates); if (!isContinued) { await this.setup(input); } for (let iteration = 1; iteration <= this.MAX_ITERATIONS; ++iteration) { - // Automatically continue as new after a day if no updates were received - await wf.condition(() => pendingUpdates.length > 0, '1 day'); - - while (pendingUpdates.length) { - const update = pendingUpdates.shift(); - if ('activity' in this.thingToInvoke) { - const acts = wf.proxyActivities(this.thingToInvoke.activityOptions); - await acts[this.thingToInvoke.activity](update); - } else if ('workflow' in this.thingToInvoke) { - wf.executeChild(this.thingToInvoke.workflow, { - args: [update], - ...this.thingToInvoke.workflowOptions - }); - } else { - throw new Error('No thing to invoke: ' + JSON.stringify(this.thingToInvoke)) - } + // Automatically continue as new after a day if no updates were received + await wf.condition(() => pendingUpdates.length > 0, "1 day"); + + while (pendingUpdates.length) { + const update = pendingUpdates.shift(); + if ("activity" in this.thingToInvoke) { + const acts = wf.proxyActivities(this.thingToInvoke.activityOptions); + await acts[this.thingToInvoke.activity](update); + } else if ("workflow" in this.thingToInvoke) { + wf.executeChild(this.thingToInvoke.workflow, { + args: [update], + ...this.thingToInvoke.workflowOptions, + }); + } else { + throw new Error( + "No thing to invoke: " + JSON.stringify(this.thingToInvoke) + ); } + } } } catch (err) { if (wf.isCancellation(err)) { @@ -60,4 +71,4 @@ export class Entity { } await wf.continueAsNew(input, true); } -} \ No newline at end of file +} diff --git a/packages/temporal-time-utils/README.md b/packages/temporal-time-utils/README.md index 9aa6fa1..af26203 100644 --- a/packages/temporal-time-utils/README.md +++ b/packages/temporal-time-utils/README.md @@ -152,7 +152,7 @@ await handle.query(stateQuery); // get wf state (running, paused, or stopped) ## `Entity` -This workflow manages continue as new for you. +This special class packages an indefinitely long lived Workflow and the Signal and Query that go with updating it. It correctly handles the pending Signals and `continueAsNew`, and calls `continueAsNew` at least once a day as recommended by Temporal. ```ts import { Entity } from "temporal-time-utils"; @@ -175,4 +175,5 @@ const handle = await client.start(entity.workflow, { // during signaling updates await client.Signal(entity.Signal, { increment: 1 }) +console.log(await client.Query(entity.Query)) ``` From 2f5feffa633f1d31121a99098a6b0dac84b48a35 Mon Sep 17 00:00:00 2001 From: sw-yx Date: Thu, 13 Jan 2022 08:58:25 +0800 Subject: [PATCH 5/6] add await --- packages/temporal-time-utils/EntityWorkflow.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/temporal-time-utils/EntityWorkflow.ts b/packages/temporal-time-utils/EntityWorkflow.ts index fb3f385..01905a4 100644 --- a/packages/temporal-time-utils/EntityWorkflow.ts +++ b/packages/temporal-time-utils/EntityWorkflow.ts @@ -50,7 +50,7 @@ export class Entity { const acts = wf.proxyActivities(this.thingToInvoke.activityOptions); await acts[this.thingToInvoke.activity](update); } else if ("workflow" in this.thingToInvoke) { - wf.executeChild(this.thingToInvoke.workflow, { + await wf.executeChild(this.thingToInvoke.workflow, { args: [update], ...this.thingToInvoke.workflowOptions, }); From 42d3e5c7cac87eca992ef8de978982fd929a98d2 Mon Sep 17 00:00:00 2001 From: sw-yx Date: Wed, 19 Jan 2022 04:25:25 +0800 Subject: [PATCH 6/6] incorporate roey suggestions --- .../temporal-time-utils/EntityWorkflow.ts | 67 +++++++++---------- 1 file changed, 30 insertions(+), 37 deletions(-) diff --git a/packages/temporal-time-utils/EntityWorkflow.ts b/packages/temporal-time-utils/EntityWorkflow.ts index 01905a4..dd587b5 100644 --- a/packages/temporal-time-utils/EntityWorkflow.ts +++ b/packages/temporal-time-utils/EntityWorkflow.ts @@ -2,69 +2,62 @@ import * as wf from "@temporalio/workflow"; import { SignalDefinition, QueryDefinition } from "@temporalio/common"; const noop = async () => {}; -type ThingToInvoke = - | { activity: string; activityOptions: wf.ActivityOptions } - | { workflow: string; workflowOptions: wf.ChildWorkflowOptions }; -export class Entity { - MAX_ITERATIONS: number; +type EntityOptions = { + maxIterations: number; + timeToContinue: number | string +} +export class Entity { + options: EntityOptions; setup: (input: Input) => Promise; - cleanup: () => Promise; - thingToInvoke: ThingToInvoke; - Signal: SignalDefinition<[Update]>; - Query: QueryDefinition; + cleanup: (state?: State) => Promise; + updateCallback: (input?: Update) => Promise; + signal: SignalDefinition<[Update]>; + state: State + query: QueryDefinition; constructor( - thingToInvoke: ThingToInvoke, - maxIterations = 1000, + updateCallback = noop, + initialState = 'No initial state specified for Entity' as State, setup = noop, - cleanup = noop + cleanup = noop, + options: EntityOptions ) { - this.thingToInvoke = thingToInvoke; - this.MAX_ITERATIONS = maxIterations; // can override if needed + this.state = initialState; + this.updateCallback = updateCallback; this.setup = setup; this.cleanup = cleanup; - this.Signal = wf.defineSignal<[Update]>("EntitySignal"); // no real way to pass the types - this.Query = wf.defineQuery<[Update]>("EntityQuery"); // no real way to pass the types + this.signal = wf.defineSignal<[Update]>("EntitySignal"); + this.query = wf.defineQuery<[Update]>("EntityStateQuery"); this.workflow = this.workflow.bind(this); + this.options = { + maxIterations: options.maxIterations || 1000, + timeToContinue: options.timeToContinue || '1 day', + } } async workflow(input: Input, isContinued = false) { try { const pendingUpdates = Array(); - wf.setHandler(this.Signal, (updateCommand: Update) => { + wf.setHandler(this.signal, (updateCommand: Update) => { pendingUpdates.push(updateCommand); }); - wf.setHandler(this.Query, () => pendingUpdates); + wf.setHandler(this.query, () => this.state); - if (!isContinued) { - await this.setup(input); - } + if (!isContinued) await this.setup(input); - for (let iteration = 1; iteration <= this.MAX_ITERATIONS; ++iteration) { + for (let iteration = 1; iteration <= this.options.maxIterations; ++iteration) { // Automatically continue as new after a day if no updates were received - await wf.condition(() => pendingUpdates.length > 0, "1 day"); + await wf.condition(() => pendingUpdates.length > 0, this.options.timeToContinue); while (pendingUpdates.length) { const update = pendingUpdates.shift(); - if ("activity" in this.thingToInvoke) { - const acts = wf.proxyActivities(this.thingToInvoke.activityOptions); - await acts[this.thingToInvoke.activity](update); - } else if ("workflow" in this.thingToInvoke) { - await wf.executeChild(this.thingToInvoke.workflow, { - args: [update], - ...this.thingToInvoke.workflowOptions, - }); - } else { - throw new Error( - "No thing to invoke: " + JSON.stringify(this.thingToInvoke) - ); - } + await this.updateCallback(update); } } } catch (err) { if (wf.isCancellation(err)) { await wf.CancellationScope.nonCancellable(async () => { - await this.cleanup(); + await this.cleanup(this.state); }); } throw err;