Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/flat-garlics-knock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ai-connector": patch
---

Implement new start-of-stream newline trimming
26 changes: 15 additions & 11 deletions packages/core/src/ai-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,25 @@ export interface AIStreamCallbacks {
onToken?: (token: string) => Promise<void>
}

export interface AIStreamParserOptions {
data: any
counter: number
}

export interface AIStreamParser {
(opts: AIStreamParserOptions): string | void
(data: string): string | void
}

export function createEventStreamTransformer(customParser: AIStreamParser) {
const decoder = new TextDecoder()
let counter = 0
let parser: EventSourceParser

return new TransformStream<Uint8Array, string>({
async start(controller): Promise<void> {
function onParse(event: ParsedEvent | ReconnectInterval): void {
function onParse(event: ParsedEvent | ReconnectInterval) {
if (event.type === 'event') {
const data = event.data
if (data === '[DONE]') {
controller.terminate()
return
}

const message = customParser({ data, counter })
counter++

const message = customParser(data)
if (message) controller.enqueue(message)
}
}
Expand Down Expand Up @@ -85,6 +77,18 @@ export function createCallbacksTransformer(
})
}

// If we're still at the start of the stream, we want to trim the leading
// `\n\n`. But, after we've seen some text, we no longer want to trim out
// whitespace.
export function trimStartOfStreamHelper() {
let start = true
return (text: string) => {
if (start) text = text.trimStart()
if (text) start = false
return text
}
}

export function AIStream(
res: Response,
customParser: AIStreamParser,
Expand Down
12 changes: 3 additions & 9 deletions packages/core/src/anthropic-stream.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
import {
AIStream,
type AIStreamCallbacks,
type AIStreamParserOptions
} from './ai-stream'
import { AIStream, type AIStreamCallbacks } from './ai-stream'

function parseAnthropicStream(): ({
data
}: AIStreamParserOptions) => string | void {
function parseAnthropicStream(): (data: string) => string | void {
let previous = ''

return ({ data }) => {
return data => {
const json = JSON.parse(data as string) as {
completion: string
stop: string | null
Expand Down
23 changes: 11 additions & 12 deletions packages/core/src/huggingface-stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { type AIStreamCallbacks, createCallbacksTransformer } from './ai-stream'
import {
type AIStreamCallbacks,
createCallbacksTransformer,
trimStartOfStreamHelper
} from './ai-stream'

function createParser(res: AsyncGenerator<any>) {
let counter = 0
const trimStartOfStream = trimStartOfStreamHelper()
return new ReadableStream<string>({
async pull(controller): Promise<void> {
const { value, done } = await res.next()
Expand All @@ -10,7 +14,8 @@ function createParser(res: AsyncGenerator<any>) {
return
}

const text: string = value.token?.text ?? ''
const text = trimStartOfStream(value.token?.text ?? '')
if (!text) return

// some HF models return generated_text instead of a real ending token
if (value.generated_text != null && value.generated_text.length > 0) {
Expand All @@ -20,16 +25,10 @@ function createParser(res: AsyncGenerator<any>) {

// <|endoftext|> is for https://huggingface.co/OpenAssistant/oasst-sft-4-pythia-12b-epoch-3.5
// </s> is also often last token in the stream depending on the model
if (text !== '</s>' && text !== '<|endoftext|>') {
// TODO: Is this needed?
if (counter < 2 && text.includes('\n')) {
return
}

controller.enqueue(text)
counter++
} else {
if (text === '</s>' || text === '<|endoftext|>') {
controller.close()
} else {
controller.enqueue(text)
}
}
})
Expand Down
29 changes: 13 additions & 16 deletions packages/core/src/openai-stream.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,27 @@
import {
AIStream,
type AIStreamCallbacks,
type AIStreamParserOptions
trimStartOfStreamHelper,
type AIStreamCallbacks
} from './ai-stream'

function parseOpenAIStream({
data,
counter
}: AIStreamParserOptions): string | void {
// TODO: Needs a type
const json = JSON.parse(data)
function parseOpenAIStream(): (data: string) => string | void {
const trimStartOfStream = trimStartOfStreamHelper()
return data => {
// TODO: Needs a type
const json = JSON.parse(data)

// this can be used for either chat or completion models
const text = json.choices[0]?.delta?.content ?? json.choices[0]?.text ?? ''
// this can be used for either chat or completion models
const text = trimStartOfStream(
json.choices[0]?.delta?.content ?? json.choices[0]?.text ?? ''
)

// TODO: I don't understand the `counter && has newline`. Should this be `counter < 2 || !has newline?`?
if (counter < 2 && text.includes('\n')) {
return
return text
}

return text
}

export function OpenAIStream(
res: Response,
cb?: AIStreamCallbacks
): ReadableStream {
return AIStream(res, parseOpenAIStream, cb)
return AIStream(res, parseOpenAIStream(), cb)
}