Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions src/actors/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

import { AnyActorRef, AnyEventObject, assign, enqueueActions, sendTo } from 'xstate';
import { AnyActorSystem } from 'xstate/dist/declarations/src/system';
import { workerEvents } from './worker/events';

type ContextWithReturnAddress = { returnAddress: AnyActorRef };

Check failure on line 8 in src/actors/utils.ts

View workflow job for this annotation

GitHub Actions / compatibility-check (18)

'ContextWithReturnAddress' is defined but never used

Check failure on line 8 in src/actors/utils.ts

View workflow job for this annotation

GitHub Actions / compatibility-check (20)

'ContextWithReturnAddress' is defined but never used

Check failure on line 8 in src/actors/utils.ts

View workflow job for this annotation

GitHub Actions / compatibility-check (18)

'ContextWithReturnAddress' is defined but never used

export const sendToActor = (actor: string, event: AnyEventObject) =>
sendTo(({ system }: { system: AnyActorSystem }) => {
Expand Down Expand Up @@ -39,8 +40,28 @@

export const deadLetter = (event: AnyEventObject) => ({ type: 'DEAD_LETTER', event });

export const reply = (eventFn: (actionArgs: any, params: any) => AnyEventObject) =>
sendTo(({ context }: { context: ContextWithReturnAddress }) => context.returnAddress, eventFn);


function isInWorker() {
return (
typeof self !== 'undefined' && // 'self' exists
typeof self?.document === 'undefined' // no Window in worker
);
}

export const reply = (eventFn: (actionArgs: any, params: any) => AnyEventObject) => enqueueActions(({ enqueue, ...actionArgs }, params) => {

if (!actionArgs.context.returnAddress) {
console.warn('No return address specified in context for reply action');
return;
}
if (isInWorker()) {
console.log('reply in worker - posting message to main thread', actionArgs.context.returnAddress, eventFn(actionArgs, params));
postMessage(workerEvents.proxyEvent(eventFn(actionArgs, params), actionArgs.context.returnAddress));
return;
}
enqueue.sendTo(actionArgs.context.returnAddress, eventFn);
})

export const XSTATE_DEBUG_EVENT = 'XSTATE_DEBUG_EVENT';

Expand Down
39 changes: 32 additions & 7 deletions src/actors/worker/fromWorkerfiedActor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ export const fromWorkerfiedActor = (
worker: Worker
): WorkerActorLogic<EventObject, WorkerInput> => ({
config: Worker,




start: (state, actorScope) => {
const { self, system } = actorScope;
Expand All @@ -46,22 +49,39 @@ export const fromWorkerfiedActor = (
};

worker.addEventListener('message', (event) => {
const eventFromWorker = event.data as AnyEventObject;
if (eventFromWorker.type == 'STATE_SNAPSHOT') {
console.log('Message received from worker --> ', event,event.data.type == WORKER_EVENTS.PROXY_EVENT,WORKER_EVENTS.PROXY_EVENT);
const eventPayload = event.data;
if (eventPayload.type === WORKER_EVENTS.STATE_SNAPSHOT) {
const eventFromWorker = eventPayload as STATE_SNAPSHOT_EVENT;
self.send(eventFromWorker);
return state;
}

if (event.type === WORKER_EVENTS.PROXY_EVENT) {
const proxyEvent = event as ProxyEvent;
if (proxyEvent.data.to === 'parent' && self._parent) {
if (eventPayload.type === WORKER_EVENTS.PROXY_EVENT) {

const proxyEvent = eventPayload as ProxyEvent;
const targetActorId = proxyEvent.data.to;
const targetEvent = proxyEvent.data.event;
const isToParent = targetActorId === 'parent';
console.log('Proxy event received from worker to', targetActorId, targetEvent, isToParent);


if (isToParent && self._parent) {
console.log('Relaying to parent', proxyEvent.data);
self._parent.send(proxyEvent.data.event);
return state;
}
if (!isToParent) {
const targetActor = system.get(proxyEvent.data.to);
console.log('Relaying to system actor', proxyEvent.data, targetActor,system,self);
if (targetActor){
targetActor.send(proxyEvent.data.event);
}
return state;
}



system.get(proxyEvent.data.to).send(proxyEvent.data.event);
return state;
}
});

Expand All @@ -70,6 +90,8 @@ export const fromWorkerfiedActor = (
transition: (state, event, actorScope) => {
const { self } = actorScope;
const workerState = instanceStates.get(self);
console.log('fromWorkerActor transition', state, event, actorScope);

if (event.type === 'xstate.stop') {
console.log('Stopping fromWorkerActor...', state, event, actorScope);
workerState.worker.postMessage(workerCommands.stopActor());
Expand All @@ -80,6 +102,9 @@ export const fromWorkerfiedActor = (
error: undefined
};
}



if (event.type == WORKER_EVENTS.STATE_SNAPSHOT) {
const snapshot = (event as STATE_SNAPSHOT_EVENT).data.snapshot as AnyMachineSnapshot;
return {
Expand Down
4 changes: 3 additions & 1 deletion src/actors/worker/workerfy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const ProxyActor = setup({
on: {
'*': {
actions: [
({ event, context }) => console.log('Proxying event', event, 'to', context.proxyToId),
({ event, context }) => console.log('Proxying actor event', event, 'to', context.proxyToId),
({ event, context }) => postMessage(workerEvents.proxyEvent(event, context.proxyToId))
]
}
Expand Down Expand Up @@ -56,6 +56,8 @@ export const workerfyActor = (actor: AnyActorLogic) => {
}
}).start();

console.log('Worker actor initialized, waiting for commands...');

addEventListener('message', (event) => {
if (event.data.type === WORKER_COMMANDS.START_ACTOR) {
actorRef = createActor(actor, {
Expand Down
Loading