From fb95882f40a96729be3dad908c5f59e91018401f Mon Sep 17 00:00:00 2001 From: Nick Gomez Date: Wed, 20 Dec 2023 20:10:00 -0500 Subject: [PATCH 1/7] adding message_data streaming part, appendToMessage, and parser adding onEvent callback to ai-stream fixing streaming part typo removing onevent adding optional event param to AIStreamParser passing the extra arg in the transformer for event type --- .../core/shared/parse-complex-response.ts | 39 +++++++++++++++++-- packages/core/shared/stream-parts.test.ts | 9 +++++ packages/core/shared/stream-parts.ts | 26 +++++++++++-- packages/core/streams/ai-stream.ts | 4 +- packages/core/streams/stream-data.ts | 24 ++++++++++++ 5 files changed, 94 insertions(+), 8 deletions(-) diff --git a/packages/core/shared/parse-complex-response.ts b/packages/core/shared/parse-complex-response.ts index b4d8ee2..6608da6 100644 --- a/packages/core/shared/parse-complex-response.ts +++ b/packages/core/shared/parse-complex-response.ts @@ -15,6 +15,24 @@ type PrefixMap = { data: JSONValue[]; }; +function initializeMessage({ + generateId, + content, + createdAt, +}: { + generateId: () => string; + content: string; + createdAt: Date; + data?: JSONValue; +}): Message { + return { + id: generateId(), + role: 'assistant', + content, + createdAt, + }; +} + export async function parseComplexResponse({ reader, abortControllerRef, @@ -48,12 +66,27 @@ export async function parseComplexResponse({ content: (prefixMap['text'].content || '') + value, }; } else { - prefixMap['text'] = { - id: generateId(), - role: 'assistant', + prefixMap['text'] = initializeMessage({ + generateId, content: value, createdAt, + }); + } + } + + if (type == 'message_data') { + if (prefixMap['text']) { + prefixMap['text'] = { + ...prefixMap['text'], + data: value, }; + } else { + prefixMap['text'] = initializeMessage({ + generateId, + content: '', + createdAt, + data: value, + }); } } diff --git a/packages/core/shared/stream-parts.test.ts b/packages/core/shared/stream-parts.test.ts index e079eae..2ebe145 100644 --- a/packages/core/shared/stream-parts.test.ts +++ b/packages/core/shared/stream-parts.test.ts @@ -69,6 +69,15 @@ describe('stream-parts', () => { expect(parseStreamPart(input)).toEqual(expectedOutput); }); + it('should parse a message data line', () => { + const input = '8:[{"test":"value"}]'; + const expectedOutput = { + type: 'message_data', + value: [{ test: 'value' }], + }; + expect(parseStreamPart(input)).toEqual(expectedOutput); + }); + it('should throw an error if the input does not contain a colon separator', () => { const input = 'invalid stream string'; expect(() => parseStreamPart(input)).toThrow(); diff --git a/packages/core/shared/stream-parts.ts b/packages/core/shared/stream-parts.ts index e013d79..a629c2a 100644 --- a/packages/core/shared/stream-parts.ts +++ b/packages/core/shared/stream-parts.ts @@ -221,6 +221,22 @@ const toolCallStreamPart: StreamPart< }, }; +const messageDataStreamPart: StreamPart< + '8', + 'message_data', + Array +> = { + code: '8', + name: 'message_data', + parse: (value: JSONValue) => { + if (!Array.isArray(value)) { + throw new Error('"data" parts expect an array value.'); + } + + return { type: 'message_data', value }; + }, +}; + const streamParts = [ textStreamPart, functionCallStreamPart, @@ -230,6 +246,7 @@ const streamParts = [ assistantControlDataStreamPart, dataMessageStreamPart, toolCallStreamPart, + messageDataStreamPart, ] as const; // union type of all stream parts @@ -241,8 +258,8 @@ type StreamParts = | typeof assistantMessageStreamPart | typeof assistantControlDataStreamPart | typeof dataMessageStreamPart - | typeof toolCallStreamPart; - + | typeof toolCallStreamPart + | typeof messageDataStreamPart; /** * Maps the type of a stream part to its value type. */ @@ -258,7 +275,8 @@ export type StreamPartType = | ReturnType | ReturnType | ReturnType - | ReturnType; + | ReturnType + | ReturnType; export const streamPartsByCode = { [textStreamPart.code]: textStreamPart, @@ -269,6 +287,7 @@ export const streamPartsByCode = { [assistantControlDataStreamPart.code]: assistantControlDataStreamPart, [dataMessageStreamPart.code]: dataMessageStreamPart, [toolCallStreamPart.code]: toolCallStreamPart, + [messageDataStreamPart.code]: messageDataStreamPart, } as const; /** @@ -302,6 +321,7 @@ export const StreamStringPrefixes = { [assistantControlDataStreamPart.name]: assistantControlDataStreamPart.code, [dataMessageStreamPart.name]: dataMessageStreamPart.code, [toolCallStreamPart.name]: toolCallStreamPart.code, + [messageDataStreamPart.name]: messageDataStreamPart.code, } as const; export const validCodes = streamParts.map(part => part.code); diff --git a/packages/core/streams/ai-stream.ts b/packages/core/streams/ai-stream.ts index 441d030..e227b9e 100644 --- a/packages/core/streams/ai-stream.ts +++ b/packages/core/streams/ai-stream.ts @@ -50,7 +50,7 @@ export interface AIStreamCallbacksAndOptions { * @interface */ export interface AIStreamParser { - (data: string): string | void; + (data: string, event?: string): string | void; } /** @@ -82,7 +82,7 @@ export function createEventStreamTransformer( if ('data' in event) { const parsedMessage = customParser - ? customParser(event.data) + ? customParser(event.data, event.event) : event.data; if (parsedMessage) controller.enqueue(parsedMessage); } diff --git a/packages/core/streams/stream-data.ts b/packages/core/streams/stream-data.ts index 2f1c452..5461844 100644 --- a/packages/core/streams/stream-data.ts +++ b/packages/core/streams/stream-data.ts @@ -19,6 +19,8 @@ export class experimental_StreamData { // array to store appended data private data: JSONValue[] = []; + private messageData: JSONValue[] = []; + constructor() { this.isClosedPromise = new Promise(resolve => { this.isClosedPromiseResolver = resolve; @@ -39,6 +41,13 @@ export class experimental_StreamData { controller.enqueue(encodedData); } + if (self.messageData.length) { + const encodedMessageData = self.encoder.encode( + formatStreamPart('message_data', self.messageData), + ); + controller.enqueue(encodedMessageData); + } + controller.enqueue(chunk); }, async flush(controller) { @@ -64,6 +73,13 @@ export class experimental_StreamData { ); controller.enqueue(encodedData); } + + if (self.messageData.length) { + const encodedData = self.encoder.encode( + formatStreamPart('message_data', self.messageData), + ); + controller.enqueue(encodedData); + } }, }); } @@ -88,6 +104,14 @@ export class experimental_StreamData { this.data.push(value); } + + appendToMessage(value: JSONValue): void { + if (this.isClosed) { + throw new Error('Data Stream has already been closed.'); + } + + this.messageData.push(value); + } } /** From 295ff58d17e79f7a9f720bb9d04bd042e866b542 Mon Sep 17 00:00:00 2001 From: Nick Gomez Date: Fri, 26 Jan 2024 00:58:02 -0500 Subject: [PATCH 2/7] renaming to annotations --- packages/core/shared/parse-complex-response.ts | 2 +- packages/core/shared/stream-parts.test.ts | 2 +- packages/core/shared/stream-parts.ts | 6 +++--- packages/core/streams/stream-data.ts | 12 ++++++------ 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/core/shared/parse-complex-response.ts b/packages/core/shared/parse-complex-response.ts index 6608da6..ab660c4 100644 --- a/packages/core/shared/parse-complex-response.ts +++ b/packages/core/shared/parse-complex-response.ts @@ -74,7 +74,7 @@ export async function parseComplexResponse({ } } - if (type == 'message_data') { + if (type == 'message_annotations') { if (prefixMap['text']) { prefixMap['text'] = { ...prefixMap['text'], diff --git a/packages/core/shared/stream-parts.test.ts b/packages/core/shared/stream-parts.test.ts index 2ebe145..8b00adb 100644 --- a/packages/core/shared/stream-parts.test.ts +++ b/packages/core/shared/stream-parts.test.ts @@ -72,7 +72,7 @@ describe('stream-parts', () => { it('should parse a message data line', () => { const input = '8:[{"test":"value"}]'; const expectedOutput = { - type: 'message_data', + type: 'message_annotations', value: [{ test: 'value' }], }; expect(parseStreamPart(input)).toEqual(expectedOutput); diff --git a/packages/core/shared/stream-parts.ts b/packages/core/shared/stream-parts.ts index a629c2a..a409d49 100644 --- a/packages/core/shared/stream-parts.ts +++ b/packages/core/shared/stream-parts.ts @@ -223,17 +223,17 @@ const toolCallStreamPart: StreamPart< const messageDataStreamPart: StreamPart< '8', - 'message_data', + 'message_annotations', Array > = { code: '8', - name: 'message_data', + name: 'message_annotations', parse: (value: JSONValue) => { if (!Array.isArray(value)) { throw new Error('"data" parts expect an array value.'); } - return { type: 'message_data', value }; + return { type: 'message_annotations', value }; }, }; diff --git a/packages/core/streams/stream-data.ts b/packages/core/streams/stream-data.ts index 5461844..0462a3a 100644 --- a/packages/core/streams/stream-data.ts +++ b/packages/core/streams/stream-data.ts @@ -19,7 +19,7 @@ export class experimental_StreamData { // array to store appended data private data: JSONValue[] = []; - private messageData: JSONValue[] = []; + private messageAnnotations: JSONValue[] = []; constructor() { this.isClosedPromise = new Promise(resolve => { @@ -43,7 +43,7 @@ export class experimental_StreamData { if (self.messageData.length) { const encodedMessageData = self.encoder.encode( - formatStreamPart('message_data', self.messageData), + formatStreamPart('message_annotations', self.messageData), ); controller.enqueue(encodedMessageData); } @@ -74,9 +74,9 @@ export class experimental_StreamData { controller.enqueue(encodedData); } - if (self.messageData.length) { + if (self.messageAnnotations.length) { const encodedData = self.encoder.encode( - formatStreamPart('message_data', self.messageData), + formatStreamPart('message_annotations', self.messageAnnotations), ); controller.enqueue(encodedData); } @@ -105,12 +105,12 @@ export class experimental_StreamData { this.data.push(value); } - appendToMessage(value: JSONValue): void { + appendMessageAnnotation(value: JSONValue): void { if (this.isClosed) { throw new Error('Data Stream has already been closed.'); } - this.messageData.push(value); + this.messageAnnotations.push(value); } } From b960803c290686510c57b56e9fb2543c607886c1 Mon Sep 17 00:00:00 2001 From: Nick Gomez Date: Fri, 26 Jan 2024 01:01:04 -0500 Subject: [PATCH 3/7] rename of messageData to messageAnnotations in stream parts and stram data --- packages/core/shared/stream-parts.ts | 12 ++++++------ packages/core/streams/stream-data.ts | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/core/shared/stream-parts.ts b/packages/core/shared/stream-parts.ts index a409d49..062dc45 100644 --- a/packages/core/shared/stream-parts.ts +++ b/packages/core/shared/stream-parts.ts @@ -221,7 +221,7 @@ const toolCallStreamPart: StreamPart< }, }; -const messageDataStreamPart: StreamPart< +const messageAnnotationsStreamPart: StreamPart< '8', 'message_annotations', Array @@ -246,7 +246,7 @@ const streamParts = [ assistantControlDataStreamPart, dataMessageStreamPart, toolCallStreamPart, - messageDataStreamPart, + messageAnnotationsStreamPart, ] as const; // union type of all stream parts @@ -259,7 +259,7 @@ type StreamParts = | typeof assistantControlDataStreamPart | typeof dataMessageStreamPart | typeof toolCallStreamPart - | typeof messageDataStreamPart; + | typeof messageAnnotationsStreamPart; /** * Maps the type of a stream part to its value type. */ @@ -276,7 +276,7 @@ export type StreamPartType = | ReturnType | ReturnType | ReturnType - | ReturnType; + | ReturnType; export const streamPartsByCode = { [textStreamPart.code]: textStreamPart, @@ -287,7 +287,7 @@ export const streamPartsByCode = { [assistantControlDataStreamPart.code]: assistantControlDataStreamPart, [dataMessageStreamPart.code]: dataMessageStreamPart, [toolCallStreamPart.code]: toolCallStreamPart, - [messageDataStreamPart.code]: messageDataStreamPart, + [messageAnnotationsStreamPart.code]: messageAnnotationsStreamPart, } as const; /** @@ -321,7 +321,7 @@ export const StreamStringPrefixes = { [assistantControlDataStreamPart.name]: assistantControlDataStreamPart.code, [dataMessageStreamPart.name]: dataMessageStreamPart.code, [toolCallStreamPart.name]: toolCallStreamPart.code, - [messageDataStreamPart.name]: messageDataStreamPart.code, + [messageAnnotationsStreamPart.name]: messageAnnotationsStreamPart.code, } as const; export const validCodes = streamParts.map(part => part.code); diff --git a/packages/core/streams/stream-data.ts b/packages/core/streams/stream-data.ts index 0462a3a..905e3da 100644 --- a/packages/core/streams/stream-data.ts +++ b/packages/core/streams/stream-data.ts @@ -41,11 +41,11 @@ export class experimental_StreamData { controller.enqueue(encodedData); } - if (self.messageData.length) { - const encodedMessageData = self.encoder.encode( - formatStreamPart('message_annotations', self.messageData), + if (self.messageAnnotations.length) { + const encodedmessageAnnotations = self.encoder.encode( + formatStreamPart('message_annotations', self.messageAnnotations), ); - controller.enqueue(encodedMessageData); + controller.enqueue(encodedmessageAnnotations); } controller.enqueue(chunk); From 604025e32fc2f0da37ba58024b7549bb39914a04 Mon Sep 17 00:00:00 2001 From: Nick Gomez Date: Fri, 26 Jan 2024 02:13:52 -0500 Subject: [PATCH 4/7] updating to multi-array and adding test cases for parsing the complex response --- .../shared/parse-complex-response.test.ts | 41 +++++++++++++++++++ .../core/shared/parse-complex-response.ts | 30 +++++++------- packages/core/shared/stream-parts.ts | 2 +- packages/core/shared/types.ts | 5 +++ packages/core/streams/stream-data.ts | 2 +- 5 files changed, 63 insertions(+), 17 deletions(-) diff --git a/packages/core/shared/parse-complex-response.test.ts b/packages/core/shared/parse-complex-response.test.ts index e61dd1a..480dd8f 100644 --- a/packages/core/shared/parse-complex-response.test.ts +++ b/packages/core/shared/parse-complex-response.test.ts @@ -224,4 +224,45 @@ describe('parseComplexResponse function', () => { data: [{ t1: 'v1' }, 3, null, false, 'text'], }); }); + + it('should parse a combination of a text message and message annotations', async () => { + const mockUpdate = vi.fn(); + + // Execute the parser function + const result = await parseComplexResponse({ + reader: createTestReader([ + '0:"Sample text message."\n', + '8:[{"key":"value"}, 2]\n', + ]), + abortControllerRef: { current: new AbortController() }, + update: mockUpdate, + generateId: () => 'test-id', + getCurrentDate: () => new Date(0), + }); + + // check the mockUpdate call: + expect(mockUpdate).toHaveBeenCalledTimes(2); + + expect(mockUpdate.mock.calls[0][0]).toEqual([ + assistantTextMessage('Sample text message.'), + ]); + + expect(mockUpdate.mock.calls[1][0]).toEqual([ + { + ...assistantTextMessage('Sample text message.'), + annotations: [{ key: 'value' }, 2], + }, + ]); + + // check the result + expect(result).toEqual({ + messages: [ + { + ...assistantTextMessage('Sample text message.'), + annotations: [{ key: 'value' }, 2], + }, + ], + data: [], + }); + }); }); diff --git a/packages/core/shared/parse-complex-response.ts b/packages/core/shared/parse-complex-response.ts index ab660c4..788cd32 100644 --- a/packages/core/shared/parse-complex-response.ts +++ b/packages/core/shared/parse-complex-response.ts @@ -17,19 +17,17 @@ type PrefixMap = { function initializeMessage({ generateId, - content, - createdAt, + ...rest }: { generateId: () => string; content: string; createdAt: Date; - data?: JSONValue; + annotations?: JSONValue[]; }): Message { return { id: generateId(), role: 'assistant', - content, - createdAt, + ...rest }; } @@ -66,11 +64,12 @@ export async function parseComplexResponse({ content: (prefixMap['text'].content || '') + value, }; } else { - prefixMap['text'] = initializeMessage({ - generateId, + prefixMap['text'] = { + id: generateId(), + role: 'assistant', content: value, - createdAt, - }); + createdAt + }; } } @@ -78,15 +77,16 @@ export async function parseComplexResponse({ if (prefixMap['text']) { prefixMap['text'] = { ...prefixMap['text'], - data: value, + annotations: [...prefixMap['text'].annotations || [], ...value], }; } else { - prefixMap['text'] = initializeMessage({ - generateId, + prefixMap['text'] = { + id: generateId(), + role: 'assistant', content: '', - createdAt, - data: value, - }); + annotations: [...value], + createdAt + }; } } diff --git a/packages/core/shared/stream-parts.ts b/packages/core/shared/stream-parts.ts index 062dc45..55392a3 100644 --- a/packages/core/shared/stream-parts.ts +++ b/packages/core/shared/stream-parts.ts @@ -230,7 +230,7 @@ const messageAnnotationsStreamPart: StreamPart< name: 'message_annotations', parse: (value: JSONValue) => { if (!Array.isArray(value)) { - throw new Error('"data" parts expect an array value.'); + throw new Error('"message_annotations" parts expect an array value.'); } return { type: 'message_annotations', value }; diff --git a/packages/core/shared/types.ts b/packages/core/shared/types.ts index a88dbc2..f9cdfde 100644 --- a/packages/core/shared/types.ts +++ b/packages/core/shared/types.ts @@ -110,6 +110,11 @@ export interface Message { * the tool call name and arguments. Otherwise, the field should not be set. */ tool_calls?: string | ToolCall[]; + + /** + * Additional message-specific information added on the server via StreamData + */ + annotations?: JSONValue[] | undefined; } export type CreateMessage = Omit & { diff --git a/packages/core/streams/stream-data.ts b/packages/core/streams/stream-data.ts index 905e3da..8677284 100644 --- a/packages/core/streams/stream-data.ts +++ b/packages/core/streams/stream-data.ts @@ -105,7 +105,7 @@ export class experimental_StreamData { this.data.push(value); } - appendMessageAnnotation(value: JSONValue): void { + appendMessageAnnotations(value: JSONValue): void { if (this.isClosed) { throw new Error('Data Stream has already been closed.'); } From 66f25a5996d6697fc7def9c3faae1e7762efcebd Mon Sep 17 00:00:00 2001 From: Nick Gomez Date: Fri, 26 Jan 2024 15:49:46 -0500 Subject: [PATCH 5/7] putting event in the options param for customParser --- packages/core/streams/ai-stream.ts | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/packages/core/streams/ai-stream.ts b/packages/core/streams/ai-stream.ts index e227b9e..01e56d0 100644 --- a/packages/core/streams/ai-stream.ts +++ b/packages/core/streams/ai-stream.ts @@ -43,14 +43,24 @@ export interface AIStreamCallbacksAndOptions { experimental_streamData?: boolean; } -// new TokenData() -// data: TokenData, +/** + * Options for the AIStreamParser. + * @interface + * @property {string} event - The event (type) from the server side event stream. + */ +export interface AIStreamParserOptions { + event?: string; +} + /** * Custom parser for AIStream data. * @interface + * @param {string} data - The data to be parsed. + * @param {AIStreamParserOptions} options - The options for the parser. + * @returns {string | void} The parsed data or void. */ export interface AIStreamParser { - (data: string, event?: string): string | void; + (data: string, options: AIStreamParserOptions): string | void; } /** @@ -82,7 +92,9 @@ export function createEventStreamTransformer( if ('data' in event) { const parsedMessage = customParser - ? customParser(event.data, event.event) + ? customParser(event.data, { + event: event.event + }) : event.data; if (parsedMessage) controller.enqueue(parsedMessage); } From bcf90d0e1bca2f0245410a6a6115a93a18b0febc Mon Sep 17 00:00:00 2001 From: Nick Gomez Date: Fri, 26 Jan 2024 19:04:21 -0500 Subject: [PATCH 6/7] appendMessageAnnotations to appendMessageAnnotation --- packages/core/streams/stream-data.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/streams/stream-data.ts b/packages/core/streams/stream-data.ts index 8677284..905e3da 100644 --- a/packages/core/streams/stream-data.ts +++ b/packages/core/streams/stream-data.ts @@ -105,7 +105,7 @@ export class experimental_StreamData { this.data.push(value); } - appendMessageAnnotations(value: JSONValue): void { + appendMessageAnnotation(value: JSONValue): void { if (this.isClosed) { throw new Error('Data Stream has already been closed.'); } From 0b5babce77c1618e1b22b9baff736000e0d28eba Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Mon, 29 Jan 2024 15:41:58 -0800 Subject: [PATCH 7/7] changeset --- .changeset/tasty-bobcats-check.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/tasty-bobcats-check.md diff --git a/.changeset/tasty-bobcats-check.md b/.changeset/tasty-bobcats-check.md new file mode 100644 index 0000000..7953393 --- /dev/null +++ b/.changeset/tasty-bobcats-check.md @@ -0,0 +1,5 @@ +--- +'ai': patch +--- + +StreamData: add `annotations` and `appendMessageAnnotation` support