Skip to content

Commit 4f22fdc

Browse files
committed
Introduce the chat session API and better docs organization
1 parent 36fbf46 commit 4f22fdc

File tree

3 files changed

+289
-0
lines changed

3 files changed

+289
-0
lines changed

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1717,6 +1717,219 @@ class ChatMessageAccumulator {
17171717
}
17181718
}
17191719

1720+
// ---------------------------------------------------------------------------
1721+
// chat.createSession — async iterator for chat turns
1722+
// ---------------------------------------------------------------------------
1723+
1724+
export type ChatSessionOptions = {
1725+
/** Run-level cancel signal (from task context). */
1726+
signal: AbortSignal;
1727+
/** Seconds to stay warm between turns before suspending. @default 30 */
1728+
warmTimeoutInSeconds?: number;
1729+
/** Duration string for suspend timeout. @default "1h" */
1730+
timeout?: string;
1731+
/** Max turns before ending. @default 100 */
1732+
maxTurns?: number;
1733+
};
1734+
1735+
export type ChatTurn = {
1736+
/** Turn number (0-indexed). */
1737+
number: number;
1738+
/** Chat session ID. */
1739+
chatId: string;
1740+
/** What triggered this turn. */
1741+
trigger: string;
1742+
/** Client data from the transport (`metadata` field on the wire payload). */
1743+
clientData: unknown;
1744+
/** Full accumulated model messages — pass directly to `streamText`. */
1745+
messages: ModelMessage[];
1746+
/** Full accumulated UI messages — use for persistence. */
1747+
uiMessages: UIMessage[];
1748+
/** Combined stop+cancel AbortSignal (fresh each turn). */
1749+
signal: AbortSignal;
1750+
/** Whether the user stopped generation this turn. */
1751+
readonly stopped: boolean;
1752+
/** Whether this is a continuation run. */
1753+
continuation: boolean;
1754+
1755+
/**
1756+
* Easy path: pipe stream, capture response, accumulate it,
1757+
* clean up aborted parts if stopped, and write turn-complete chunk.
1758+
*/
1759+
complete(source: UIMessageStreamable): Promise<UIMessage | undefined>;
1760+
1761+
/**
1762+
* Manual path: just write turn-complete chunk.
1763+
* Use when you've already piped and accumulated manually.
1764+
*/
1765+
done(): Promise<void>;
1766+
1767+
/**
1768+
* Add the response to the accumulator manually.
1769+
* Use with `chat.pipeAndCapture` when you need control between pipe and done.
1770+
*/
1771+
addResponse(response: UIMessage): Promise<void>;
1772+
};
1773+
1774+
/**
1775+
* Create a chat session that yields turns as an async iterator.
1776+
*
1777+
* Handles: preload wait, stop signals, message accumulation, turn-complete
1778+
* signaling, and warm/suspend between turns. You control: initialization,
1779+
* model/tool selection, persistence, and any custom per-turn logic.
1780+
*
1781+
* @example
1782+
* ```ts
1783+
* import { task } from "@trigger.dev/sdk";
1784+
* import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
1785+
* import { streamText } from "ai";
1786+
* import { openai } from "@ai-sdk/openai";
1787+
*
1788+
* export const myChat = task({
1789+
* id: "my-chat",
1790+
* run: async (payload: ChatTaskWirePayload, { signal }) => {
1791+
* const session = chat.createSession(payload, { signal });
1792+
*
1793+
* for await (const turn of session) {
1794+
* const result = streamText({
1795+
* model: openai("gpt-4o"),
1796+
* messages: turn.messages,
1797+
* abortSignal: turn.signal,
1798+
* });
1799+
* await turn.complete(result);
1800+
* }
1801+
* },
1802+
* });
1803+
* ```
1804+
*/
1805+
function createChatSession(
1806+
payload: ChatTaskWirePayload,
1807+
options: ChatSessionOptions
1808+
): AsyncIterable<ChatTurn> {
1809+
const {
1810+
signal: runSignal,
1811+
warmTimeoutInSeconds = 30,
1812+
timeout = "1h",
1813+
maxTurns = 100,
1814+
} = options;
1815+
1816+
return {
1817+
[Symbol.asyncIterator]() {
1818+
let currentPayload = payload;
1819+
let turn = -1;
1820+
const stop = createStopSignal();
1821+
const accumulator = new ChatMessageAccumulator();
1822+
1823+
return {
1824+
async next(): Promise<IteratorResult<ChatTurn>> {
1825+
turn++;
1826+
1827+
// First turn: handle preload — wait for the first real message
1828+
if (turn === 0 && currentPayload.trigger === "preload") {
1829+
const result = await messagesInput.waitWithWarmup({
1830+
warmTimeoutInSeconds: currentPayload.warmTimeoutInSeconds ?? warmTimeoutInSeconds,
1831+
timeout,
1832+
spanName: "waiting for first message",
1833+
});
1834+
if (!result.ok || runSignal.aborted) {
1835+
stop.cleanup();
1836+
return { done: true, value: undefined };
1837+
}
1838+
currentPayload = result.output;
1839+
}
1840+
1841+
// Subsequent turns: wait for the next message
1842+
if (turn > 0) {
1843+
const next = await messagesInput.waitWithWarmup({
1844+
warmTimeoutInSeconds,
1845+
timeout,
1846+
spanName: "waiting for next message",
1847+
});
1848+
if (!next.ok || runSignal.aborted) {
1849+
stop.cleanup();
1850+
return { done: true, value: undefined };
1851+
}
1852+
currentPayload = next.output;
1853+
}
1854+
1855+
// Check limits
1856+
if (turn >= maxTurns || runSignal.aborted) {
1857+
stop.cleanup();
1858+
return { done: true, value: undefined };
1859+
}
1860+
1861+
// Reset stop signal for this turn
1862+
stop.reset();
1863+
1864+
// Accumulate messages
1865+
const messages = await accumulator.addIncoming(
1866+
currentPayload.messages,
1867+
currentPayload.trigger,
1868+
turn,
1869+
);
1870+
1871+
const combinedSignal = AbortSignal.any([runSignal, stop.signal]);
1872+
1873+
const turnObj: ChatTurn = {
1874+
number: turn,
1875+
chatId: currentPayload.chatId,
1876+
trigger: currentPayload.trigger,
1877+
clientData: currentPayload.metadata,
1878+
messages,
1879+
uiMessages: accumulator.uiMessages,
1880+
signal: combinedSignal,
1881+
get stopped() { return stop.signal.aborted && !runSignal.aborted; },
1882+
continuation: currentPayload.continuation ?? false,
1883+
1884+
async complete(source: UIMessageStreamable) {
1885+
let response: UIMessage | undefined;
1886+
try {
1887+
response = await pipeChatAndCapture(source, { signal: combinedSignal });
1888+
} catch (error) {
1889+
if (error instanceof Error && error.name === "AbortError") {
1890+
if (runSignal.aborted) {
1891+
// Full cancel — don't accumulate
1892+
await chatWriteTurnComplete();
1893+
return undefined;
1894+
}
1895+
// Stop — fall through to accumulate partial response
1896+
} else {
1897+
throw error;
1898+
}
1899+
}
1900+
1901+
if (response) {
1902+
const cleaned = (stop.signal.aborted && !runSignal.aborted)
1903+
? cleanupAbortedParts(response)
1904+
: response;
1905+
await accumulator.addResponse(cleaned);
1906+
}
1907+
1908+
await chatWriteTurnComplete();
1909+
return response;
1910+
},
1911+
1912+
async addResponse(response: UIMessage) {
1913+
await accumulator.addResponse(response);
1914+
},
1915+
1916+
async done() {
1917+
await chatWriteTurnComplete();
1918+
},
1919+
};
1920+
1921+
return { done: false, value: turnObj };
1922+
},
1923+
1924+
async return() {
1925+
stop.cleanup();
1926+
return { done: true, value: undefined };
1927+
},
1928+
};
1929+
},
1930+
};
1931+
}
1932+
17201933
// ---------------------------------------------------------------------------
17211934
// chat.local — per-run typed data with Proxy access
17221935
// ---------------------------------------------------------------------------
@@ -1985,6 +2198,8 @@ export const chat = {
19852198
pipeAndCapture: pipeChatAndCapture,
19862199
/** Message accumulator class for raw task chat. See {@link ChatMessageAccumulator}. */
19872200
MessageAccumulator: ChatMessageAccumulator,
2201+
/** Create a chat session (async iterator). See {@link createChatSession}. */
2202+
createSession: createChatSession,
19882203
};
19892204

19902205
/**

references/ai-chat/src/components/chat-sidebar.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ export function ChatSidebar({
121121
>
122122
<option value="ai-chat">ai-chat (chat.task)</option>
123123
<option value="ai-chat-raw">ai-chat-raw (raw task)</option>
124+
<option value="ai-chat-session">ai-chat-session (session)</option>
124125
</select>
125126
</div>
126127
</div>

references/ai-chat/src/trigger/chat.ts

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,3 +563,76 @@ export const aiChatRaw = task({
563563
stop.cleanup();
564564
},
565565
});
566+
567+
// --------------------------------------------------------------------------
568+
// Session iterator version — middle ground between chat.task and raw task
569+
// --------------------------------------------------------------------------
570+
571+
export const aiChatSession = task({
572+
id: "ai-chat-session",
573+
run: async (payload: ChatTaskWirePayload, { signal }) => {
574+
const clientData = payload.metadata as { userId: string; model?: string } | undefined;
575+
576+
// One-time init — just code at the top, no hooks needed
577+
if (clientData) {
578+
await initUserContext(clientData.userId, payload.chatId, clientData.model);
579+
}
580+
581+
const session = chat.createSession(payload, {
582+
signal,
583+
warmTimeoutInSeconds: payload.warmTimeoutInSeconds ?? 60,
584+
timeout: "1h",
585+
});
586+
587+
for await (const turn of session) {
588+
const turnClientData = (turn.clientData ?? clientData) as
589+
| { userId: string; model?: string }
590+
| undefined;
591+
592+
userContext.messageCount++;
593+
if (turnClientData?.model) userContext.preferredModel = turnClientData.model;
594+
595+
const modelId = turnClientData?.model ?? userContext.preferredModel ?? undefined;
596+
const useReasoning = REASONING_MODELS.has(modelId ?? DEFAULT_MODEL);
597+
598+
const result = streamText({
599+
model: getModel(modelId),
600+
system: `You are a helpful assistant for ${userContext.name} (${userContext.plan} plan). Be concise and friendly.`,
601+
messages: turn.messages,
602+
tools: {
603+
inspectEnvironment,
604+
webFetch,
605+
deepResearch: ai.tool(deepResearch),
606+
},
607+
stopWhen: stepCountIs(10),
608+
abortSignal: turn.signal,
609+
providerOptions: {
610+
openai: { user: turnClientData?.userId },
611+
anthropic: {
612+
metadata: { user_id: turnClientData?.userId },
613+
...(useReasoning ? { thinking: { type: "enabled", budgetTokens: 10000 } } : {}),
614+
},
615+
},
616+
experimental_telemetry: { isEnabled: true },
617+
});
618+
619+
await turn.complete(result);
620+
621+
// Persist after each turn
622+
await prisma.chat.update({
623+
where: { id: turn.chatId },
624+
data: { messages: turn.uiMessages as any },
625+
});
626+
627+
if (userContext.hasChanged()) {
628+
await prisma.user.update({
629+
where: { id: userContext.userId },
630+
data: {
631+
messageCount: userContext.messageCount,
632+
preferredModel: userContext.preferredModel,
633+
},
634+
});
635+
}
636+
}
637+
},
638+
});

0 commit comments

Comments
 (0)