Skip to content

Commit 64df61f

Browse files
tyler6204steipete
authored andcommitted
feat(cron): enhance delivery handling and testing for isolated jobs
- Introduced new properties for explicit message targeting and message tool disabling in the EmbeddedRunAttemptParams type. - Updated cron job tests to validate best-effort delivery behavior and handling of delivery failures. - Added logic to clear delivery settings when switching session targets in cron jobs. - Improved the resolution of delivery failures and best-effort logic in the isolated agent's run function. This update enhances the flexibility and reliability of delivery mechanisms in isolated cron jobs, ensuring better handling of message delivery scenarios.
1 parent ef4949b commit 64df61f

File tree

9 files changed

+184
-6
lines changed

9 files changed

+184
-6
lines changed

src/agents/pi-embedded-runner/run/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ export type EmbeddedRunAttemptParams = {
7878
onReasoningStream?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>;
7979
onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>;
8080
onAgentEvent?: (evt: { stream: string; data: Record<string, unknown> }) => void;
81+
/** Require explicit message tool targets (no implicit last-route sends). */
82+
requireExplicitMessageTarget?: boolean;
83+
/** If true, omit the message tool from the tool list. */
84+
disableMessageTool?: boolean;
8185
extraSystemPrompt?: string;
8286
streamParams?: AgentStreamParams;
8387
ownerNumbers?: string[];

src/agents/tools/cron-tool.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ describe("cron tool", () => {
8282
expect(call.method).toBe("cron.add");
8383
expect(call.params).toEqual({
8484
name: "wake-up",
85+
enabled: true,
86+
deleteAfterRun: true,
8587
schedule: { kind: "at", at: new Date(123).toISOString() },
8688
sessionTarget: "main",
8789
wakeMode: "next-heartbeat",

src/cron/delivery.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
4848
);
4949
const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to);
5050

51-
const channel = (deliveryChannel ?? payloadChannel ?? "last") as CronMessageChannel;
51+
const channel = deliveryChannel ?? payloadChannel ?? "last";
5252
const to = deliveryTo ?? payloadTo;
5353
if (hasDelivery) {
5454
const resolvedMode = mode ?? "none";

src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,4 +224,85 @@ describe("runCronIsolatedAgentTurn", () => {
224224
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
225225
});
226226
});
227+
228+
it("fails when announce delivery fails and best-effort is disabled", async () => {
229+
await withTempHome(async (home) => {
230+
const storePath = await writeSessionStore(home);
231+
const deps: CliDeps = {
232+
sendMessageWhatsApp: vi.fn(),
233+
sendMessageTelegram: vi.fn(),
234+
sendMessageDiscord: vi.fn(),
235+
sendMessageSignal: vi.fn(),
236+
sendMessageIMessage: vi.fn(),
237+
};
238+
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
239+
payloads: [{ text: "hello from cron" }],
240+
meta: {
241+
durationMs: 5,
242+
agentMeta: { sessionId: "s", provider: "p", model: "m" },
243+
},
244+
});
245+
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false);
246+
247+
const res = await runCronIsolatedAgentTurn({
248+
cfg: makeCfg(home, storePath, {
249+
channels: { telegram: { botToken: "t-1" } },
250+
}),
251+
deps,
252+
job: {
253+
...makeJob({ kind: "agentTurn", message: "do it" }),
254+
delivery: { mode: "announce", channel: "telegram", to: "123" },
255+
},
256+
message: "do it",
257+
sessionKey: "cron:job-1",
258+
lane: "cron",
259+
});
260+
261+
expect(res.status).toBe("error");
262+
expect(res.error).toBe("cron announce delivery failed");
263+
});
264+
});
265+
266+
it("ignores announce delivery failures when best-effort is enabled", async () => {
267+
await withTempHome(async (home) => {
268+
const storePath = await writeSessionStore(home);
269+
const deps: CliDeps = {
270+
sendMessageWhatsApp: vi.fn(),
271+
sendMessageTelegram: vi.fn(),
272+
sendMessageDiscord: vi.fn(),
273+
sendMessageSignal: vi.fn(),
274+
sendMessageIMessage: vi.fn(),
275+
};
276+
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
277+
payloads: [{ text: "hello from cron" }],
278+
meta: {
279+
durationMs: 5,
280+
agentMeta: { sessionId: "s", provider: "p", model: "m" },
281+
},
282+
});
283+
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false);
284+
285+
const res = await runCronIsolatedAgentTurn({
286+
cfg: makeCfg(home, storePath, {
287+
channels: { telegram: { botToken: "t-1" } },
288+
}),
289+
deps,
290+
job: {
291+
...makeJob({ kind: "agentTurn", message: "do it" }),
292+
delivery: {
293+
mode: "announce",
294+
channel: "telegram",
295+
to: "123",
296+
bestEffort: true,
297+
},
298+
},
299+
message: "do it",
300+
sessionKey: "cron:job-1",
301+
lane: "cron",
302+
});
303+
304+
expect(res.status).toBe("ok");
305+
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
306+
});
307+
});
227308
});

src/cron/isolated-agent/run.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,28 @@ function matchesMessagingToolDeliveryTarget(
8989
return target.to === delivery.to;
9090
}
9191

92+
function resolveCronDeliveryBestEffort(job: CronJob): boolean {
93+
if (typeof job.delivery?.bestEffort === "boolean") {
94+
return job.delivery.bestEffort;
95+
}
96+
if (job.payload.kind === "agentTurn" && typeof job.payload.bestEffortDeliver === "boolean") {
97+
return job.payload.bestEffortDeliver;
98+
}
99+
return false;
100+
}
101+
102+
function resolveCronDeliveryFailure(
103+
resolved: Awaited<ReturnType<typeof resolveDeliveryTarget>>,
104+
): Error | undefined {
105+
if (resolved.error) {
106+
return resolved.error;
107+
}
108+
if (!resolved.to) {
109+
return new Error("cron delivery target is missing");
110+
}
111+
return undefined;
112+
}
113+
92114
export type RunCronAgentTurnResult = {
93115
status: "ok" | "error" | "skipped";
94116
summary?: string;
@@ -428,6 +450,7 @@ export async function runCronIsolatedAgentTurn(params: {
428450
const firstText = payloads[0]?.text ?? "";
429451
const summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText);
430452
const outputText = pickLastNonEmptyTextFromPayloads(payloads);
453+
const deliveryBestEffort = resolveCronDeliveryBestEffort(params.job);
431454

432455
// Skip delivery for heartbeat-only responses (HEARTBEAT_OK with no real content).
433456
const ackMaxChars = resolveHeartbeatAckMaxChars(agentCfg);
@@ -444,6 +467,19 @@ export async function runCronIsolatedAgentTurn(params: {
444467
);
445468

446469
if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) {
470+
const deliveryFailure = resolveCronDeliveryFailure(resolvedDelivery);
471+
if (deliveryFailure) {
472+
if (!deliveryBestEffort) {
473+
return {
474+
status: "error",
475+
error: deliveryFailure.message,
476+
summary,
477+
outputText,
478+
};
479+
}
480+
logWarn(`[cron:${params.job.id}] ${deliveryFailure.message}`);
481+
return { status: "ok", summary, outputText };
482+
}
447483
const requesterSessionKey = resolveAgentMainSessionKey({
448484
cfg: cfgWithAgentDefaults,
449485
agentId,
@@ -459,7 +495,7 @@ export async function runCronIsolatedAgentTurn(params: {
459495
: undefined;
460496
const outcome: SubagentRunOutcome = { status: "ok" };
461497
const taskLabel = params.job.name?.trim() || "cron job";
462-
await runSubagentAnnounceFlow({
498+
const didAnnounce = await runSubagentAnnounceFlow({
463499
childSessionKey: agentSessionKey,
464500
childRunId: cronSession.sessionEntry.sessionId,
465501
requesterSessionKey,
@@ -473,6 +509,14 @@ export async function runCronIsolatedAgentTurn(params: {
473509
label: `Cron: ${taskLabel}`,
474510
outcome,
475511
});
512+
if (!didAnnounce && !deliveryBestEffort) {
513+
return {
514+
status: "error",
515+
error: "cron announce delivery failed",
516+
summary,
517+
outputText,
518+
};
519+
}
476520
}
477521

478522
return { status: "ok", summary, outputText };

src/cron/normalize.test.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,14 @@ describe("normalizeCronJobCreate", () => {
1919
}) as unknown as Record<string, unknown>;
2020

2121
const payload = normalized.payload as Record<string, unknown>;
22-
expect(payload.channel).toBe("telegram");
22+
expect(payload.channel).toBeUndefined();
23+
expect(payload.deliver).toBeUndefined();
2324
expect("provider" in payload).toBe(false);
25+
26+
const delivery = normalized.delivery as Record<string, unknown>;
27+
expect(delivery.mode).toBe("announce");
28+
expect(delivery.channel).toBe("telegram");
29+
expect(delivery.to).toBe("7200373102");
2430
});
2531

2632
it("trims agentId and drops null", () => {
@@ -72,7 +78,13 @@ describe("normalizeCronJobCreate", () => {
7278
}) as unknown as Record<string, unknown>;
7379

7480
const payload = normalized.payload as Record<string, unknown>;
75-
expect(payload.channel).toBe("telegram");
81+
expect(payload.channel).toBeUndefined();
82+
expect(payload.deliver).toBeUndefined();
83+
84+
const delivery = normalized.delivery as Record<string, unknown>;
85+
expect(delivery.mode).toBe("announce");
86+
expect(delivery.channel).toBe("telegram");
87+
expect(delivery.to).toBe("7200373102");
7688
});
7789

7890
it("coerces ISO schedule.at to normalized ISO (UTC)", () => {

src/cron/service.jobs.test.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { describe, expect, it } from "vitest";
2+
import type { CronJob, CronJobPatch } from "./types.js";
3+
import { applyJobPatch } from "./service/jobs.js";
4+
5+
describe("applyJobPatch", () => {
6+
it("clears delivery when switching to main session", () => {
7+
const now = Date.now();
8+
const job: CronJob = {
9+
id: "job-1",
10+
name: "job-1",
11+
enabled: true,
12+
createdAtMs: now,
13+
updatedAtMs: now,
14+
schedule: { kind: "every", everyMs: 60_000 },
15+
sessionTarget: "isolated",
16+
wakeMode: "now",
17+
payload: { kind: "agentTurn", message: "do it" },
18+
delivery: { mode: "announce", channel: "telegram", to: "123" },
19+
state: {},
20+
};
21+
22+
const patch: CronJobPatch = {
23+
sessionTarget: "main",
24+
payload: { kind: "systemEvent", text: "ping" },
25+
};
26+
27+
expect(() => applyJobPatch(job, patch)).not.toThrow();
28+
expect(job.sessionTarget).toBe("main");
29+
expect(job.payload.kind).toBe("systemEvent");
30+
expect(job.delivery).toBeUndefined();
31+
});
32+
});

src/cron/service.store.migration.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ import fs from "node:fs/promises";
22
import os from "node:os";
33
import path from "node:path";
44
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
5-
import { loadCronStore } from "../store.js";
65
import { CronService } from "./service.js";
6+
import { loadCronStore } from "./store.js";
77

88
const noopLogger = {
99
debug: vi.fn(),

src/cron/service/jobs.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ export function applyJobPatch(job: CronJob, patch: CronJobPatch) {
158158
if (patch.delivery) {
159159
job.delivery = mergeCronDelivery(job.delivery, patch.delivery);
160160
}
161+
if (job.sessionTarget === "main" && job.delivery) {
162+
job.delivery = undefined;
163+
}
161164
if (patch.state) {
162165
job.state = { ...job.state, ...patch.state };
163166
}
@@ -250,7 +253,7 @@ function mergeCronDelivery(
250253
};
251254

252255
if (typeof patch.mode === "string") {
253-
next.mode = patch.mode === "deliver" ? "announce" : patch.mode;
256+
next.mode = (patch.mode as string) === "deliver" ? "announce" : patch.mode;
254257
}
255258
if ("channel" in patch) {
256259
const channel = typeof patch.channel === "string" ? patch.channel.trim() : "";

0 commit comments

Comments
 (0)