From 41067c19f584858edeef5e6d1dd43e26316ef3f3 Mon Sep 17 00:00:00 2001
From: Max Leiter
Date: Thu, 27 Jul 2023 10:20:05 -0700
Subject: [PATCH] stream work

---
 packages/core/shared/types.ts               |  8 ++++
 packages/core/shared/utils.ts               | 41 +++++++++++++++++++++
 packages/core/streams/ai-stream.ts          | 26 ++++++++++---
 packages/core/streams/cohere-stream.ts      |  4 +-
 packages/core/streams/data-stream.ts        | 37 +++++++++++++++++++
 packages/core/streams/huggingface-stream.ts |  4 +-
 packages/core/streams/langchain-stream.ts   |  4 +-
 packages/core/streams/openai-stream.ts      | 21 +++++------
 8 files changed, 123 insertions(+), 22 deletions(-)
 create mode 100644 packages/core/streams/data-stream.ts

diff --git a/packages/core/shared/types.ts b/packages/core/shared/types.ts
index b1084e6..3df301a 100644
--- a/packages/core/shared/types.ts
+++ b/packages/core/shared/types.ts
@@ -196,3 +196,11 @@ export type UseCompletionOptions = {
    */
   body?: object
 }
+
+export type JSONValue =
+  | null
+  | string
+  | number
+  | boolean
+  | { [x: string]: JSONValue }
+  | Array<JSONValue>
diff --git a/packages/core/shared/utils.ts b/packages/core/shared/utils.ts
index 04ee65a..3b69b11 100644
--- a/packages/core/shared/utils.ts
+++ b/packages/core/shared/utils.ts
@@ -1,4 +1,5 @@
 import { customAlphabet } from 'nanoid/non-secure'
+import { JSONValue } from './types'
 
 // 7-character random string
 export const nanoid = customAlphabet(
@@ -13,3 +14,43 @@ export function createChunkDecoder() {
     return decoder.decode(chunk, { stream: true })
   }
 }
+
+/**
+ * The map of prefixes for data in the stream
+ *
+ * - 0: Text from the LLM response
+ * - 1: (OpenAI) function_call responses
+ * - 2: custom JSON added by the user using `Data`
+ *
+ * Example:
+ * ```
+ * 0:Vercel
+ * 0:'s
+ * 0: AI
+ * 0: AI
+ * 0: SDK
+ * 0: is great
+ * 0:!
+ * 2: { "someJson": "value" }
+ * 1: {"function_call": {"name": "get_current_weather", "arguments": "{\\n\\"location\\": \\"Charlottesville, Virginia\\",\\n\\"format\\": \\"celsius\\"\\n}"}}
+ *```
+ */
+export const StreamStringPrefixes = {
+  text: 0,
+  function_call: 1,
+  data: 2
+} as const
+
+/**
+ * Prefixes a value with its `StreamStringPrefixes` code, JSON-encodes non-string values, and appends a newline.
+ */
+export const getStreamString = (
+  type: keyof typeof StreamStringPrefixes,
+  value: JSONValue
+): StreamString =>
+  `${StreamStringPrefixes[type]}:${
+    typeof value === 'string' ? value : JSON.stringify(value)
+  }\n`
+
+export type StreamString =
+  `${(typeof StreamStringPrefixes)[keyof typeof StreamStringPrefixes]}:${string}\n`
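
For reference (this note and sketch are not part of the diff): with `getStreamString` defined above, every stream part is one prefixed, newline-terminated string. A minimal sketch of the values it produces, using the same relative import path the stream modules use:

```ts
// Sketch only: expected output of getStreamString for each prefix type.
import { getStreamString } from '../shared/utils'

getStreamString('text', 'Hello')
// -> '0:Hello\n'

getStreamString('data', { someJson: 'value' })
// -> '2:{"someJson":"value"}\n'

getStreamString('function_call', {
  function_call: { name: 'get_current_weather', arguments: '{}' }
})
// -> '1:{"function_call":{"name":"get_current_weather","arguments":"{}"}}\n'
```
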
diff --git a/packages/core/streams/ai-stream.ts b/packages/core/streams/ai-stream.ts
index 91aa676..b274872 100644
--- a/packages/core/streams/ai-stream.ts
+++ b/packages/core/streams/ai-stream.ts
@@ -4,6 +4,8 @@ import {
   type ParsedEvent,
   type ReconnectInterval
 } from 'eventsource-parser'
+import { Data } from './data-stream'
+import { getStreamString } from '../shared/utils'
 
 export interface FunctionCallPayload {
   name: string
@@ -18,6 +20,11 @@ export interface AIStreamCallbacks {
   onStart?: () => Promise<void> | void
   onCompletion?: (completion: string) => Promise<void> | void
   onToken?: (token: string) => Promise<void> | void
+  streamData?: Data
+}
+
+export interface AIStreamCallbacksAndOptions extends AIStreamCallbacks {
+  streamData?: Data
 }
 
 /**
@@ -54,7 +61,10 @@ export function createEventStreamTransformer(
 
         if ('data' in event) {
           const parsedMessage = customParser(event.data)
-          if (parsedMessage) controller.enqueue(parsedMessage)
+          if (parsedMessage)
+            controller.enqueue(
+              getStreamString('text', parsedMessage)
+            )
         }
       }
     )
@@ -75,7 +85,7 @@ export function createEventStreamTransformer(
  *
  * This function is useful when you want to process a stream of messages and perform specific actions during the stream's lifecycle.
  *
- * @param {AIStreamCallbacks} [callbacks] - An object containing the callback functions.
+ * @param {AIStreamCallbacksAndOptions} [callbacksAndOptions] - An object containing the callback functions.
  * @return {TransformStream<string, Uint8Array>} A transform stream that encodes input messages as Uint8Array and allows the execution of custom logic through callbacks.
  *
  * @example
@@ -86,8 +96,8 @@ export function createEventStreamTransformer(
  * };
  * const transformer = createCallbacksTransformer(callbacks);
  */
-export function createCallbacksTransformer(
-  callbacks: AIStreamCallbacks | undefined
+export function createCallbacksAndOptionsTransformer(
+  callbacks: AIStreamCallbacksAndOptions | undefined
 ): TransformStream<string, Uint8Array> {
   const textEncoder = new TextEncoder()
   let aggregatedResponse = ''
@@ -186,9 +196,15 @@ export function AIStream(
 
   return responseBodyStream
     .pipeThrough(createEventStreamTransformer(customParser))
-    .pipeThrough(createCallbacksTransformer(callbacks))
+    .pipeThrough(createCallbacksAndOptionsTransformer(callbacks))
 }
+// outputs lines like
+// 0: chunk
+// 0: more chunk
+// 1: a function call
+// 2: data added via Data
+
 
 /**
  * Creates an empty ReadableStream that immediately closes upon creation.
  * This function is used as a fallback for creating a ReadableStream when the response body is null or undefined,
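
Also for reference rather than part of the diff: since `createEventStreamTransformer` now enqueues prefixed strings, a consumer of an `AIStream` response has to split the body on newlines and dispatch on the prefix, as the comment above illustrates. A rough sketch under that assumption; the helper name and the `console.log` handling are placeholders:

```ts
// Sketch only: read a Response produced by AIStream and route lines by prefix.
async function readPrefixedStream(response: Response): Promise<void> {
  if (!response.body) return

  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split('\n')
    buffer = lines.pop() ?? '' // keep a trailing partial line for the next chunk

    for (const line of lines) {
      if (line.startsWith('0:')) {
        console.log('text:', line.slice(2))
      } else if (line.startsWith('1:')) {
        console.log('function_call:', JSON.parse(line.slice(2)))
      } else if (line.startsWith('2:')) {
        console.log('data:', JSON.parse(line.slice(2)))
      }
    }
  }
}
```
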
diff --git a/packages/core/streams/cohere-stream.ts b/packages/core/streams/cohere-stream.ts
index b28c352..64bea98 100644
--- a/packages/core/streams/cohere-stream.ts
+++ b/packages/core/streams/cohere-stream.ts
@@ -1,4 +1,4 @@
-import { type AIStreamCallbacks, createCallbacksTransformer } from './ai-stream'
+import { type AIStreamCallbacks, createCallbacksAndOptionsTransformer } from './ai-stream'
 
 const utf8Decoder = new TextDecoder('utf-8')
 
@@ -64,5 +64,5 @@ export function CohereStream(
   reader: Response,
   callbacks?: AIStreamCallbacks
 ): ReadableStream {
-  return createParser(reader).pipeThrough(createCallbacksTransformer(callbacks))
+  return createParser(reader).pipeThrough(createCallbacksAndOptionsTransformer(callbacks))
 }
diff --git a/packages/core/streams/data-stream.ts b/packages/core/streams/data-stream.ts
new file mode 100644
index 0000000..76f2a68
--- /dev/null
+++ b/packages/core/streams/data-stream.ts
@@ -0,0 +1,37 @@
+import { JSONValue } from '../shared/types'
+import { getStreamString } from '../shared/utils'
+
+/**
+ * A stream wrapper to send custom JSON-encoded data back to the client.
+ */
+export class Data {
+  private encoder = new TextEncoder()
+  private controller: ReadableStreamDefaultController | null = null
+  private stream: ReadableStream
+
+  constructor() {
+    this.stream = new ReadableStream({
+      start: controller => {
+        this.controller = controller
+      }
+    })
+  }
+
+  append(value: JSONValue): void {
+    if (!this.controller) {
+      throw new Error('Stream controller is not initialized.')
+    }
+
+    // Frame the value with the `data` prefix (`2:<json>\n`) from shared/utils.
+    this.controller.enqueue(
+      this.encoder.encode(getStreamString('data', value))
+    )
+  }
+
+  close() {
+    if (!this.controller) return
+
+    this.controller.close()
+    this.controller = null
+  }
+}
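
As a usage sketch only (none of this is in the diff): `Data` is filled from server code and closed once the response is finished. How its private `ReadableStream` is exposed or merged into the HTTP response is not wired up in this work-in-progress patch, so that part is left out; the import path is relative to `packages/core/streams`:

```ts
// Sketch only: appending custom JSON parts with the Data wrapper.
import { Data } from './data-stream'

const data = new Data()

// Each append is framed as a `2:`-prefixed line, e.g. '2:{"status":"indexing"}\n'.
data.append({ status: 'indexing' })
data.append({ status: 'done', documents: 3 })

// Close when finished so the wrapped ReadableStream can end.
data.close()
```
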
diff --git a/packages/core/streams/huggingface-stream.ts b/packages/core/streams/huggingface-stream.ts
index 0b00b5b..daa10ce 100644
--- a/packages/core/streams/huggingface-stream.ts
+++ b/packages/core/streams/huggingface-stream.ts
@@ -1,6 +1,6 @@
 import {
   type AIStreamCallbacks,
-  createCallbacksTransformer,
+  createCallbacksAndOptionsTransformer,
   trimStartOfStreamHelper
 } from './ai-stream'
 
@@ -39,5 +39,5 @@ export function HuggingFaceStream(
   res: AsyncGenerator<any>,
   callbacks?: AIStreamCallbacks
 ): ReadableStream {
-  return createParser(res).pipeThrough(createCallbacksTransformer(callbacks))
+  return createParser(res).pipeThrough(createCallbacksAndOptionsTransformer(callbacks))
 }
diff --git a/packages/core/streams/langchain-stream.ts b/packages/core/streams/langchain-stream.ts
index 10e1805..fd94b49 100644
--- a/packages/core/streams/langchain-stream.ts
+++ b/packages/core/streams/langchain-stream.ts
@@ -1,4 +1,4 @@
-import { type AIStreamCallbacks, createCallbacksTransformer } from './ai-stream'
+import { type AIStreamCallbacks, createCallbacksAndOptionsTransformer } from './ai-stream'
 
 export function LangChainStream(callbacks?: AIStreamCallbacks) {
   const stream = new TransformStream()
@@ -26,7 +26,7 @@ export function LangChainStream(callbacks?: AIStreamCallbacks) {
   }
 
   return {
-    stream: stream.readable.pipeThrough(createCallbacksTransformer(callbacks)),
+    stream: stream.readable.pipeThrough(createCallbacksAndOptionsTransformer(callbacks)),
     handlers: {
       handleLLMNewToken: async (token: string) => {
         await writer.ready
diff --git a/packages/core/streams/openai-stream.ts b/packages/core/streams/openai-stream.ts
index e614a9f..a52f369 100644
--- a/packages/core/streams/openai-stream.ts
+++ b/packages/core/streams/openai-stream.ts
@@ -1,4 +1,5 @@
-import { CreateMessage } from '../shared/types'
+import { CreateMessage, JSONValue } from '../shared/types'
+import { getStreamString } from '../shared/utils'
 
 import {
   AIStream,
@@ -7,14 +8,6 @@ import {
   FunctionCallPayload
 } from './ai-stream'
 
-type JSONValue =
-  | null
-  | string
-  | number
-  | boolean
-  | { [x: string]: JSONValue }
-  | Array<JSONValue>
-
 export type OpenAIStreamCallbacks = AIStreamCallbacks & {
   /**
    * @example
@@ -285,11 +278,17 @@ function createFunctionCallTransformer(
           // The user didn't do anything with the function call on the server and wants
           // to either do nothing or run it on the client
           // so we just return the function call as a message
-          controller.enqueue(textEncoder.encode(aggregatedResponse))
+          controller.enqueue(
+            textEncoder.encode(
+              getStreamString('function_call', aggregatedResponse)
+            )
+          )
           return
         } else if (typeof functionResponse === 'string') {
           // The user returned a string, so we just return it as a message
-          controller.enqueue(textEncoder.encode(functionResponse))
+          controller.enqueue(
+            textEncoder.encode(getStreamString('text', functionResponse))
+          )
           return
         }