Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions packages/temporal-time-utils/EntityWorkflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import * as wf from "@temporalio/workflow";
import { SignalDefinition, QueryDefinition } from "@temporalio/common";

const noop = async () => {};
type EntityOptions = {
maxIterations: number;
timeToContinue: number | string
}
export class Entity<Input = any, Update = any, State extends string = any> {
options: EntityOptions;
setup: (input: Input) => Promise<void>;
cleanup: (state?: State) => Promise<void>;
updateCallback: (input?: Update) => Promise<void | State>;
signal: SignalDefinition<[Update]>;
state: State
query: QueryDefinition<any>;

constructor(
updateCallback = noop,
initialState = 'No initial state specified for Entity' as State,
setup = noop,
cleanup = noop,
options: EntityOptions
) {
this.state = initialState;
this.updateCallback = updateCallback;
this.setup = setup;
this.cleanup = cleanup;
this.signal = wf.defineSignal<[Update]>("EntitySignal");
this.query = wf.defineQuery<[Update]>("EntityStateQuery");
this.workflow = this.workflow.bind(this);
this.options = {
maxIterations: options.maxIterations || 1000,
timeToContinue: options.timeToContinue || '1 day',
}
}

async workflow(input: Input, isContinued = false) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How's the worker going to know the name of this workflow?
You need to export a workflow function per entity.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the biggest question. perhaps i can attach a name on that workflow

try {
const pendingUpdates = Array<Update>();
wf.setHandler(this.signal, (updateCommand: Update) => {
pendingUpdates.push(updateCommand);
});
wf.setHandler(this.query, () => this.state);

if (!isContinued) await this.setup(input);

for (let iteration = 1; iteration <= this.options.maxIterations; ++iteration) {
// Automatically continue as new after a day if no updates were received
await wf.condition(() => pendingUpdates.length > 0, this.options.timeToContinue);

while (pendingUpdates.length) {
const update = pendingUpdates.shift();
await this.updateCallback(update);
}
}
} catch (err) {
if (wf.isCancellation(err)) {
await wf.CancellationScope.nonCancellable(async () => {
await this.cleanup(this.state);
});
}
throw err;
}
await wf.continueAsNew<typeof this.workflow>(input, true);
}
}
32 changes: 32 additions & 0 deletions packages/temporal-time-utils/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ You can consider it the next step up from `sleepUntil`.
After you instantiate it with an initial datetime to wake up at, it exposes only two APIs: `then()` for you to `await`, and `.deadline` that you can set and get.

```ts
import { UpdatableTimer } from "temporal-time-utils";

// example usage inside workflow function
export async function countdownWorkflow(initialDeadline: Date): Promise<void> {
const timer = new UpdatableTimer(initialDeadline);
Expand Down Expand Up @@ -69,6 +71,8 @@ See example usage inside of `/apps/fixture`:
- https://github.com/sw-yx/temporal-time-utils/blob/main/apps/fixture/src/workflows.ts#L5 necessary export for Worker to pick it up

```ts
import { ScheduleWorkflow } from "temporal-time-utils";

// inside client file
async function run() {
const client = new WorkflowClient();
Expand Down Expand Up @@ -145,3 +149,31 @@ await handle.signal(stateSignal, "RUNNING" as ScheduleWorkflowState); // resume
await handle.cancel(); // stop schedule workflow completely
await handle.query(stateQuery); // get wf state (running, paused, or stopped)
```

## `Entity`

This special class packages an indefinitely long lived Workflow and the Signal and Query that go with updating it. It correctly handles the pending Signals and `continueAsNew`, and calls `continueAsNew` at least once a day as recommended by Temporal.

```ts
import { Entity } from "temporal-time-utils";

interface Input { /* define your workflow input type here */ }
interface Update { /* define your workflow update type here */ }
const entity = new Entity<Input, Update>({
activity: 'MyActivityName'
activityOptions: {
startToCloseTimeout: '1 minute',
}
})
Comment on lines +160 to +167

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a workflow code

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah good point

Copy link
Collaborator Author

@swyxio swyxio Jan 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually.. if i dont make it workflow code, then i dont have to register it separately. it could be very interesting.

const handle = await client.start(entity.workflow, {
args: [{
inputValue: 'initial'
}],
taskQueue: "scheduler",
workflowId: "schedule-for-" + userId,
});

// during signaling updates
await client.Signal(entity.Signal, { increment: 1 })
console.log(await client.Query(entity.Query))
Comment on lines +177 to +178
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we want to be fancy we can even collapse this to entity.signal() and entity.query() by absorbing the client calls into Entity, but not sure if there would be significant impact on workflow bundle size.

```