Skip to content

Commit 5c25ce4

Browse files
committed
feat: add type-safe React stub streaming calls
1 parent 1861528 commit 5c25ce4

File tree

7 files changed

+189
-32
lines changed

7 files changed

+189
-32
lines changed

.changeset/nice-hairs-fetch.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"agents": minor
3+
---
4+
5+
Type-safe react stub streaming calls

packages/agents/src/client.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,14 @@ export type AgentClientOptions<State = unknown> = Omit<
2828
/**
2929
* Options for streaming RPC calls
3030
*/
31-
export type StreamOptions = {
31+
export type StreamOptions<
32+
OnChunkT extends unknown | SerializableValue = unknown,
33+
OnDoneT extends unknown | SerializableValue = unknown
34+
> = {
3235
/** Called when a chunk of data is received */
33-
onChunk?: (chunk: unknown) => void;
36+
onChunk?: (chunk: OnChunkT) => void;
3437
/** Called when the stream ends */
35-
onDone?: (finalChunk: unknown) => void;
38+
onDone?: (finalChunk: OnDoneT) => void;
3639
/** Called when an error occurs */
3740
onError?: (error: string) => void;
3841
};

packages/agents/src/index.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import type { TransportType } from "./mcp/types";
2929
import { genericObservability, type Observability } from "./observability";
3030
import { DisposableStore } from "./core/events";
3131
import { MessageType } from "./ai-types";
32+
import type { SerializableValue } from "./serializable";
3233

3334
export type { Connection, ConnectionContext, WSMessage } from "partyserver";
3435

@@ -1982,7 +1983,10 @@ export async function getAgentByName<
19821983
/**
19831984
* A wrapper for streaming responses in callable methods
19841985
*/
1985-
export class StreamingResponse {
1986+
export class StreamingResponse<
1987+
OnChunkT extends SerializableValue | unknown = unknown,
1988+
OnDoneT extends SerializableValue | unknown = unknown
1989+
> {
19861990
private _connection: Connection;
19871991
private _id: string;
19881992
private _closed = false;
@@ -1996,7 +2000,7 @@ export class StreamingResponse {
19962000
* Send a chunk of data to the client
19972001
* @param chunk The data to send
19982002
*/
1999-
send(chunk: unknown) {
2003+
send(chunk: OnChunkT) {
20002004
if (this._closed) {
20012005
throw new Error("StreamingResponse is already closed");
20022006
}
@@ -2014,7 +2018,7 @@ export class StreamingResponse {
20142018
* End the stream and send the final chunk (if any)
20152019
* @param finalChunk Optional final chunk of data to send
20162020
*/
2017-
end(finalChunk?: unknown) {
2021+
end(finalChunk?: OnDoneT) {
20182022
if (this._closed) {
20192023
throw new Error("StreamingResponse is already closed");
20202024
}

packages/agents/src/react.tsx

Lines changed: 91 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@ import { usePartySocket } from "partysocket/react";
33
import { useCallback, useRef, use, useMemo, useEffect } from "react";
44
import type { Agent, MCPServersState, RPCRequest, RPCResponse } from "./";
55
import type { StreamOptions } from "./client";
6-
import type { Method, RPCMethod } from "./serializable";
76
import { MessageType } from "./ai-types";
7+
import type {
8+
AllSerializableValues,
9+
SerializableReturnValue,
10+
SerializableValue
11+
} from "./serializable";
812

913
/**
1014
* Convert a camelCase string to a kebab-case string
@@ -130,17 +134,68 @@ export type UseAgentOptions<State = unknown> = Omit<
130134
onMcpUpdate?: (mcpServers: MCPServersState) => void;
131135
};
132136

137+
// biome-ignore lint: suppressions/parse
138+
type Method = (...args: any[]) => any;
139+
140+
type NonStreamingRPCMethod<T extends Method> =
141+
AllSerializableValues<Parameters<T>> extends true
142+
? ReturnType<T> extends SerializableReturnValue
143+
? T
144+
: never
145+
: never;
146+
147+
interface StreamingResponse<
148+
OnChunkT extends SerializableValue | unknown = unknown,
149+
OnDoneT extends SerializableValue | unknown = unknown
150+
> {
151+
send(chunk: OnChunkT): void;
152+
end(finalChunk?: OnDoneT): void;
153+
}
154+
155+
type StreamingRPCMethod<T extends Method> = T extends (
156+
arg: infer A,
157+
...rest: infer R
158+
) => void | Promise<void>
159+
? A extends StreamingResponse<SerializableValue, SerializableValue>
160+
? AllSerializableValues<R> extends true
161+
? T
162+
: never
163+
: never
164+
: never;
165+
166+
type RPCMethod<T extends Method> =
167+
T extends NonStreamingRPCMethod<T>
168+
? NonStreamingRPCMethod<T>
169+
: T extends StreamingRPCMethod<T>
170+
? StreamingRPCMethod<T>
171+
: never;
172+
173+
type RPCMethods<T> = {
174+
[K in keyof T as T[K] extends Method ? K : never]: T[K] extends Method
175+
? RPCMethod<T[K]>
176+
: never;
177+
};
178+
133179
type AllOptional<T> = T extends [infer A, ...infer R]
134180
? undefined extends A
135181
? AllOptional<R>
136182
: false
137183
: true; // no params means optional by default
138184

139-
type RPCMethods<T> = {
140-
[K in keyof T as T[K] extends RPCMethod<T[K]> ? K : never]: RPCMethod<T[K]>;
141-
};
185+
type StreamOptionsFrom<StreamingResponseT> =
186+
StreamingResponseT extends StreamingResponse<
187+
infer T extends SerializableValue,
188+
infer U extends SerializableValue
189+
>
190+
? StreamOptions<T, U>
191+
: never;
192+
193+
type RestParameters<T extends Method> =
194+
Parameters<StreamingRPCMethod<T>> extends [unknown, ...infer Rest]
195+
? Rest
196+
: never;
142197

143-
type OptionalParametersMethod<T extends RPCMethod> =
198+
type OptionalParametersMethod<T extends Method> =
144199
AllOptional<Parameters<T>> extends true ? T : never;
145200

146201
// all methods of the Agent, excluding the ones that are declared in the base Agent class
@@ -160,6 +215,14 @@ type RequiredAgentMethods<T> = Omit<
160215
keyof OptionalAgentMethods<T>
161216
>;
162217

218+
type StreamingAgentMethods<T> = {
219+
[K in keyof AgentMethods<T> as AgentMethods<T>[K] extends StreamingRPCMethod<
220+
AgentMethods<T>[K]
221+
>
222+
? K
223+
: never]: StreamingRPCMethod<AgentMethods<T>[K]>;
224+
};
225+
163226
type AgentPromiseReturnType<T, K extends keyof AgentMethods<T>> =
164227
// biome-ignore lint: suppressions/parse
165228
ReturnType<AgentMethods<T>[K]> extends Promise<any>
@@ -182,7 +245,18 @@ type RequiredArgsAgentMethodCall<AgentT> = <
182245
streamOptions?: StreamOptions
183246
) => AgentPromiseReturnType<AgentT, K>;
184247

185-
type AgentMethodCall<AgentT> = OptionalArgsAgentMethodCall<AgentT> &
248+
type StreamingAgentMethodCall<AgentT> = <
249+
K extends keyof StreamingAgentMethods<AgentT>
250+
>(
251+
method: K,
252+
args: RestParameters<StreamingAgentMethods<AgentT>[K]>,
253+
streamOptions: StreamOptionsFrom<
254+
Parameters<StreamingAgentMethods<AgentT>[K]>[0]
255+
>
256+
) => void;
257+
258+
type AgentMethodCall<AgentT> = StreamingAgentMethodCall<AgentT> &
259+
OptionalArgsAgentMethodCall<AgentT> &
186260
RequiredArgsAgentMethodCall<AgentT>;
187261

188262
type UntypedAgentMethodCall = <T = unknown>(
@@ -192,9 +266,16 @@ type UntypedAgentMethodCall = <T = unknown>(
192266
) => Promise<T>;
193267

194268
type AgentStub<T> = {
195-
[K in keyof AgentMethods<T>]: (
196-
...args: Parameters<AgentMethods<T>[K]>
197-
) => AgentPromiseReturnType<AgentMethods<T>, K>;
269+
[K in keyof AgentMethods<T>]: AgentMethods<T>[K] extends StreamingRPCMethod<
270+
AgentMethods<T>[K]
271+
>
272+
? (
273+
options: StreamOptionsFrom<Parameters<AgentMethods<T>[K]>[0]>,
274+
...args: RestParameters<AgentMethods<T>[K]>
275+
) => void
276+
: (
277+
...args: Parameters<AgentMethods<T>[K]>
278+
) => AgentPromiseReturnType<AgentMethods<T>, K>;
198279
};
199280

200281
// we neet to use Method instead of RPCMethod here for retro-compatibility
@@ -232,7 +313,7 @@ export function useAgent<State>(
232313
agent: string;
233314
name: string;
234315
setState: (state: State) => void;
235-
call: UntypedAgentMethodCall | AgentMethodCall<unknown>;
316+
call: UntypedAgentMethodCall;
236317
stub: UntypedAgentStub;
237318
} {
238319
const agentNamespace = camelCaseToKebabCase(options.agent);

packages/agents/src/serializable.ts

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,8 @@ export type SerializableReturnValue =
1313
| Promise<SerializableValue>
1414
| Promise<void>;
1515

16-
type AllSerializableValues<A> = A extends [infer First, ...infer Rest]
16+
export type AllSerializableValues<A> = A extends [infer First, ...infer Rest]
1717
? First extends SerializableValue
1818
? AllSerializableValues<Rest>
1919
: false
2020
: true; // no params means serializable by default
21-
22-
// biome-ignore lint: suspicious/noExplicitAny
23-
export type Method = (...args: any[]) => any;
24-
25-
export type RPCMethod<T = Method> = T extends Method
26-
? T extends (...arg: infer A) => infer R
27-
? AllSerializableValues<A> extends true
28-
? R extends SerializableReturnValue
29-
? T
30-
: never
31-
: never
32-
: never
33-
: never;

packages/agents/src/tests-d/example-stub.test-d.ts

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
/** biome-ignore-all lint/correctness/useHookAtTopLevel: testing types */
22
import type { env } from "cloudflare:workers";
3-
import { Agent, callable } from "..";
3+
import { Agent, callable, type StreamingResponse } from "..";
44
import { useAgent } from "../react.tsx";
5+
import type { StreamOptions } from "../client.ts";
56

67
class MyAgent extends Agent<typeof env, {}> {
78
@callable()
@@ -18,6 +19,42 @@ class MyAgent extends Agent<typeof env, {}> {
1819
nonRpc(): void {
1920
// do something
2021
}
22+
23+
@callable({ streaming: true })
24+
performStream(
25+
_options: StreamingResponse<number, boolean>,
26+
_other: string
27+
): void {
28+
// do something
29+
}
30+
31+
// TODO should fail, first argument is not a streamOptions
32+
@callable({ streaming: true })
33+
performStreamFirstArgNotStreamOptions(
34+
_other: string,
35+
_options: StreamingResponse<number, boolean>
36+
): void {
37+
// do something
38+
}
39+
40+
// TODO should fail, should be marked as streaming
41+
@callable()
42+
performStreamFail(_options: StreamingResponse): void {
43+
// do something
44+
}
45+
46+
// TODO should fail, has no streamOptions
47+
@callable({ streaming: true })
48+
async performFail(_task: string): Promise<string> {
49+
// do something
50+
return "";
51+
}
52+
53+
@callable({ streaming: true })
54+
performStreamUnserializable(options: StreamingResponse<Date>): void {
55+
// @ts-expect-error parameter is not serializable
56+
options.onDone(new Date());
57+
}
2158
}
2259

2360
const { stub } = useAgent<MyAgent, {}>({ agent: "my-agent" });
@@ -39,9 +76,26 @@ await stub.nonRpc();
3976
// @ts-expect-error nonSerializable is not serializable
4077
await stub.nonSerializable("hello", new Date());
4178

79+
const streamOptions: StreamOptions<number, boolean> = {};
80+
81+
// biome-ignore lint: suspicious/noConfusingVoidType
82+
stub.performStream(streamOptions, "hello") satisfies void;
83+
84+
// @ts-expect-error there's no 2nd argument
85+
stub.performStream(streamOptions, "hello", 1);
86+
87+
const invalidStreamOptions: StreamOptions<string, boolean> = {};
88+
89+
// @ts-expect-error streamOptions must be of type StreamOptions<number, boolean>
90+
stub.performStream(invalidStreamOptions, "hello");
91+
92+
// @ts-expect-error first argument is not a streamOptions
93+
stub.performStreamFirstArgNotStreamOptions("hello", streamOptions);
94+
4295
const { stub: stub2 } = useAgent<Omit<MyAgent, "nonRpc">, {}>({
4396
agent: "my-agent"
4497
});
98+
4599
stub2.sayHello();
46100
// @ts-expect-error nonRpc excluded from useAgent
47101
stub2.nonRpc();

packages/agents/src/tests-d/example.test-d.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
/** biome-ignore-all lint/correctness/useHookAtTopLevel: testing types */
22
import type { env } from "cloudflare:workers";
3-
import { Agent, callable } from "..";
3+
import { Agent, callable, type StreamingResponse } from "..";
44
import { useAgent } from "../react.tsx";
5+
import type { StreamOptions } from "../client.ts";
56

67
class MyAgent extends Agent<typeof env, {}> {
78
@callable()
@@ -18,6 +19,16 @@ class MyAgent extends Agent<typeof env, {}> {
1819
nonRpc(): void {
1920
// do something
2021
}
22+
23+
@callable({ streaming: true })
24+
performStream(
25+
response: StreamingResponse<number, boolean>,
26+
_other: string
27+
): void {
28+
response.send(1);
29+
response.send(2);
30+
response.end(true);
31+
}
2132
}
2233

2334
const agent = useAgent<MyAgent, {}>({ agent: "my-agent" });
@@ -39,6 +50,18 @@ await agent.call("nonRpc");
3950
// @ts-expect-error nonSerializable is not serializable
4051
await agent.call("nonSerializable", ["hello", new Date()]);
4152

53+
const streamOptions: StreamOptions<number, boolean> = {};
54+
55+
agent.call("performStream", ["hello"], streamOptions);
56+
57+
// @ts-expect-error there's no second parameter
58+
agent.call("performStream", ["a", 1], streamOptions);
59+
60+
const invalidStreamOptions: StreamOptions<string, boolean> = {};
61+
62+
// @ts-expect-error streamOptions must be of type StreamOptions<number, boolean>
63+
agent.call("performStream", ["a", 1], invalidStreamOptions);
64+
4265
const agent2 = useAgent<Omit<MyAgent, "nonRpc">, {}>({ agent: "my-agent" });
4366
agent2.call("sayHello");
4467
// @ts-expect-error nonRpc excluded from useAgent

0 commit comments

Comments
 (0)