9 changes: 9 additions & 0 deletions packages/core/shared/types.ts
@@ -312,15 +312,23 @@ export type UseCompletionOptions = {
* })
* ```
*/

body?: object;
};


export type JSONValue =
| null
| string
| number
| boolean
| { [x: string]: JSONValue }

| Array<JSONValue>;

export type AssistantMessage = {
@@ -346,3 +354,4 @@ export type DataMessage = {
role: 'data';
data: JSONValue; // application-specific data
};

47 changes: 47 additions & 0 deletions packages/core/shared/utils.ts
@@ -1,10 +1,15 @@

import { customAlphabet } from 'nanoid/non-secure'
import { JSONValue } from './types'

import { customAlphabet } from 'nanoid/non-secure';
import {
StreamPartType,
StreamStringPrefixes,
parseStreamPart,
} from './stream-parts';


// 7-character random string
export const nanoid = customAlphabet(
'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz',
@@ -44,6 +49,47 @@ function createChunkDecoder(complex?: boolean) {
};
}


/**
* The map of prefixes for data in the stream
*
* - 0: Text from the LLM response
* - 1: (OpenAI) function_call responses
* - 2: custom JSON added by the user using `Data`
*
* Example:
* ```
* 0:Vercel
* 0:'s
* 0: AI
* 0: AI
* 0: SDK
* 0: is great
* 0:!
* 2: { "someJson": "value" }
* 1: {"function_call": {"name": "get_current_weather", "arguments": "{\\n\\"location\\": \\"Charlottesville, Virginia\\",\\n\\"format\\": \\"celsius\\"\\n}"}}
*```
*/
export const StreamStringPrefixes = {
text: 0,
function_call: 1,
data: 2
} as const

/**
 * Prefixes a value with its code from `StreamStringPrefixes`, JSON-encodes it if it is not already a string, and appends a newline.
 */
export const getStreamString = (
type: keyof typeof StreamStringPrefixes,
value: JSONValue
): StreamString =>
`${StreamStringPrefixes[type]}:${
typeof value === 'string' ? value : JSON.stringify(value)
}\n`

export type StreamString =
`${(typeof StreamStringPrefixes)[keyof typeof StreamStringPrefixes]}:${string}\n`

export { createChunkDecoder };

export const isStreamStringEqualToType = (
@@ -59,3 +105,4 @@ export type StreamString =
* A header sent to the client so it knows how to handle parsing the stream (as a deprecated text response or using the new prefixed protocol)
*/
export const COMPLEX_HEADER = 'X-Experimental-Stream-Data';
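To make the prefixed protocol above concrete, here is a small sketch of the strings `getStreamString` produces and where `COMPLEX_HEADER` comes in; the return values shown in comments and the header value `'true'` are illustrative assumptions, not part of this diff.

```ts
import { getStreamString, COMPLEX_HEADER } from '../shared/utils';

// Each call yields one prefixed, newline-terminated stream part.
getStreamString('text', 'Hello');               // "0:Hello\n"
getStreamString('data', { someJson: 'value' }); // '2:{"someJson":"value"}\n'
getStreamString('function_call', {
  function_call: { name: 'get_current_weather', arguments: '{}' },
});                                             // '1:{"function_call":...}\n'

// A response that uses the prefixed protocol can advertise it to the client:
const headers = { [COMPLEX_HEADER]: 'true' };
```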

44 changes: 44 additions & 0 deletions packages/core/streams/ai-stream.ts
@@ -2,10 +2,17 @@ import {
createParser,
type EventSourceParser,
type ParsedEvent,

type ReconnectInterval
} from 'eventsource-parser'
import { Data } from './data-stream'
import { getStreamString } from '../shared/utils'
=======
type ReconnectInterval,
} from 'eventsource-parser';
import { OpenAIStreamCallbacks } from './openai-stream';


export interface FunctionCallPayload {
name: string;
arguments: Record<string, unknown>;
@@ -25,6 +32,17 @@ export interface ToolCallPayload {
* Configuration options and helper callback methods for AIStream stream lifecycle events.
* @interface
*/

export interface AIStreamCallbacks {
onStart?: () => Promise<void> | void
onCompletion?: (completion: string) => Promise<void> | void
onToken?: (token: string) => Promise<void> | void
streamData?: Data
}

export interface AIStreamCallbacksAndOptions extends AIStreamCallbacks {
streamData?: Data
=======
export interface AIStreamCallbacksAndOptions {
/** `onStart`: Called once when the stream is initialized. */
onStart?: () => Promise<void> | void;
@@ -52,6 +70,7 @@ export interface AIStreamCallbacksAndOptions {
*/
export interface AIStreamParserOptions {
event?: string;

}

/**
@@ -96,12 +115,20 @@ export function createEventStreamTransformer(
}

if ('data' in event) {

const parsedMessage = customParser(event.data)
if (parsedMessage)
controller.enqueue(
getStreamString('text', parsedMessage)
)

const parsedMessage = customParser
? customParser(event.data, {
event: event.event,
})
: event.data;
if (parsedMessage) controller.enqueue(parsedMessage);

}
},
);
@@ -123,7 +150,11 @@ export function createEventStreamTransformer(
*
* This function is useful when you want to process a stream of messages and perform specific actions during the stream's lifecycle.
*

* @param {AIStreamCallbacksAndOptions} [callbacksAndOptions] - An object containing the callback functions.
=======
* @param {AIStreamCallbacksAndOptions} [callbacks] - An object containing the callback functions.

* @return {TransformStream<string, Uint8Array>} A transform stream that encodes input messages as Uint8Array and allows the execution of custom logic through callbacks.
*
* @example
@@ -135,13 +166,22 @@ export function createEventStreamTransformer(
* };
* const transformer = createCallbacksTransformer(callbacks);
*/

export function createCallbacksAndOptionsTransformer(
callbacks: AIStreamCallbacksAndOptions | undefined
): TransformStream<string, Uint8Array> {
const textEncoder = new TextEncoder()
let aggregatedResponse = ''
const { onStart, onToken, onCompletion } = callbacks || {}
=======
export function createCallbacksTransformer(
cb: AIStreamCallbacksAndOptions | OpenAIStreamCallbacks | undefined,
): TransformStream<string | { isText: false; content: string }, Uint8Array> {
const textEncoder = new TextEncoder();
let aggregatedResponse = '';
const callbacks = cb || {};


return new TransformStream({
async start(): Promise<void> {
if (callbacks.onStart) await callbacks.onStart();
@@ -268,7 +308,11 @@ export function AIStream(

return responseBodyStream
.pipeThrough(createEventStreamTransformer(customParser))

.pipeThrough(createCallbacksAndOptionsTransformer(callbacks))

.pipeThrough(createCallbacksTransformer(callbacks));

}

// outputs lines like
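For orientation, a hedged sketch of how the callbacks described above are typically passed to `AIStream`; the upstream URL is a placeholder and the exact `AIStream(response, customParser, callbacks)` signature is assumed from the pipeline shown in this file.

```ts
import { AIStream, type AIStreamCallbacksAndOptions } from 'ai';

export async function POST() {
  const callbacks: AIStreamCallbacksAndOptions = {
    onStart: async () => console.log('stream started'),
    onToken: async token => console.log('token:', token),
    onCompletion: async completion => console.log('finished, length:', completion.length),
  };

  // The upstream SSE endpoint is hypothetical; AIStream pipes its body through
  // createEventStreamTransformer and then the callbacks transformer defined above.
  const upstream = await fetch('https://example.com/v1/stream', { method: 'POST' });
  return new Response(AIStream(upstream, undefined, callbacks));
}
```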
8 changes: 8 additions & 0 deletions packages/core/streams/cohere-stream.ts
@@ -1,10 +1,14 @@

import { type AIStreamCallbacks, createCallbacksAndOptionsTransformer } from './ai-stream'
=======
import {
type AIStreamCallbacksAndOptions,
createCallbacksTransformer,
readableFromAsyncIterable,
} from './ai-stream';
import { createStreamDataTransformer } from './stream-data';


const utf8Decoder = new TextDecoder('utf-8');

// Full types
@@ -90,6 +94,9 @@ export function CohereStream(
reader: Response | AsyncIterable<StreamChunk>,
callbacks?: AIStreamCallbacksAndOptions,
): ReadableStream {

return createParser(reader).pipeThrough(createCallbacksAndOptionsTransformer(callbacks))
=======
if (Symbol.asyncIterator in reader) {
return readableFromAsyncIterable(streamable(reader))
.pipeThrough(createCallbacksTransformer(callbacks))
@@ -103,4 +110,5 @@ export function CohereStream(
createStreamDataTransformer(callbacks?.experimental_streamData),
);
}

}
37 changes: 37 additions & 0 deletions packages/core/streams/data-stream.ts
@@ -0,0 +1,37 @@
import { JSONValue } from '../shared/types'
import { getStreamString } from '../shared/utils'

/**
* A stream wrapper to send custom JSON-encoded data back to the client.
*/
export class Data {
private encoder = new TextEncoder()
private controller: ReadableStreamDefaultController<Uint8Array> | null = null
private stream: ReadableStream<Uint8Array>

constructor() {
this.stream = new ReadableStream({
start: controller => {
this.controller = controller
}
})
}

append(value: JSONValue, prefix: string = '0'): void {
if (!this.controller) {
throw new Error('Stream controller is not initialized.')
}

const textEncoder = new TextEncoder()
this.controller.enqueue(
textEncoder.encode(getStreamString('text', JSON.stringify(value)))
)
}

close() {
if (!this.controller) return

this.controller.close()
this.controller = null
}
}
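A minimal sketch of driving this draft `Data` wrapper; since the class keeps its underlying `ReadableStream` private, the sketch only shows the producing side, and the values are placeholders.

```ts
import { Data } from './data-stream';

const data = new Data();

// Enqueue custom JSON parts onto the wrapped stream, then close it.
data.append({ someJson: 'value' });
data.append({ progress: 0.5 });
data.close();
```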
11 changes: 11 additions & 0 deletions packages/core/streams/huggingface-stream.ts
@@ -1,10 +1,17 @@
import {

type AIStreamCallbacks,
createCallbacksAndOptionsTransformer,
trimStartOfStreamHelper
} from './ai-stream'
=======
type AIStreamCallbacksAndOptions,
createCallbacksTransformer,
trimStartOfStreamHelper,
} from './ai-stream';
import { createStreamDataTransformer } from './stream-data';


function createParser(res: AsyncGenerator<any>) {
const trimStartOfStream = trimStartOfStreamHelper();
return new ReadableStream<string>({
@@ -40,9 +47,13 @@ export function HuggingFaceStream(
res: AsyncGenerator<any>,
callbacks?: AIStreamCallbacksAndOptions,
): ReadableStream {

return createParser(res).pipeThrough(createCallbacksAndOptionsTransformer(callbacks))
=======
return createParser(res)
.pipeThrough(createCallbacksTransformer(callbacks))
.pipeThrough(
createStreamDataTransformer(callbacks?.experimental_streamData),
);

}
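A sketch of the intended call site, feeding a `@huggingface/inference` text-generation stream (an `AsyncGenerator`) into `HuggingFaceStream`; the model name, parameters, and the `StreamingTextResponse` helper are assumptions rather than part of this diff.

```ts
import { HfInference } from '@huggingface/inference';
import { HuggingFaceStream, StreamingTextResponse } from 'ai';

const Hf = new HfInference(process.env.HUGGINGFACE_API_KEY);

export async function POST(req: Request) {
  const { prompt } = await req.json();

  // textGenerationStream returns the AsyncGenerator that createParser above consumes.
  const response = Hf.textGenerationStream({
    model: 'OpenAssistant/oasst-sft-4-pythia-12b-epoch-3.5',
    inputs: prompt,
    parameters: { max_new_tokens: 200 },
  });

  const stream = HuggingFaceStream(response, {
    onCompletion: async completion => console.log('completion:', completion),
  });

  return new StreamingTextResponse(stream);
}
```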
8 changes: 8 additions & 0 deletions packages/core/streams/langchain-stream.ts
@@ -1,9 +1,13 @@

import { type AIStreamCallbacks, createCallbacksAndOptionsTransformer } from './ai-stream'
=======
import {
type AIStreamCallbacksAndOptions,
createCallbacksTransformer,
} from './ai-stream';
import { createStreamDataTransformer } from './stream-data';


export function LangChainStream(callbacks?: AIStreamCallbacksAndOptions) {
const stream = new TransformStream();
const writer = stream.writable.getWriter();
@@ -30,12 +34,16 @@ export function LangChainStream(callbacks?: AIStreamCallbacksAndOptions) {
};

return {

stream: stream.readable.pipeThrough(createCallbacksAndOptionsTransformer(callbacks)),

stream: stream.readable
.pipeThrough(createCallbacksTransformer(callbacks))
.pipeThrough(
createStreamDataTransformer(callbacks?.experimental_streamData),
),
writer,

handlers: {
handleLLMNewToken: async (token: string) => {
await writer.ready;
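Finally, a sketch of how the `stream`, `writer`, and `handlers` returned above are meant to be wired to LangChain; the LangChain import paths and the `StreamingTextResponse` helper are assumptions and may vary by version.

```ts
import { LangChainStream, StreamingTextResponse } from 'ai';
import { ChatOpenAI } from 'langchain/chat_models/openai';
import { HumanMessage } from 'langchain/schema';

export async function POST(req: Request) {
  const { prompt } = await req.json();

  const { stream, handlers } = LangChainStream({
    onCompletion: async completion => console.log('completed:', completion),
  });

  const llm = new ChatOpenAI({ streaming: true });

  // handlers.handleLLMNewToken forwards each token into the writable side of the stream.
  llm.call([new HumanMessage(prompt)], {}, [handlers]).catch(console.error);

  return new StreamingTextResponse(stream);
}
```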