Skip to content

Commit 5b8ec88

Browse files
committed
Better NDJSON parsing with bounded memory usage
1 parent d69cfa5 commit 5b8ec88

File tree

2 files changed

+402
-37
lines changed

2 files changed

+402
-37
lines changed

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 140 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -273,64 +273,169 @@ export class StreamBatchItemsService extends WithRunEngine {
273273
* Converts a stream of Uint8Array chunks into parsed JSON objects.
274274
* Each line in the NDJSON is parsed independently.
275275
*
276+
* Uses byte-buffer accumulation to:
277+
* - Prevent OOM from unbounded string buffers
278+
* - Properly handle multibyte UTF-8 characters across chunk boundaries
279+
* - Check size limits on raw bytes before decoding
280+
*
276281
* @param maxItemBytes - Maximum allowed bytes per line (item)
277282
* @returns TransformStream that outputs parsed JSON objects
278283
*/
279284
export function createNdjsonParserStream(
280285
maxItemBytes: number
281286
): TransformStream<Uint8Array, unknown> {
282-
const decoder = new TextDecoder();
283-
let buffer = "";
287+
// Single decoder instance, reused for all lines
288+
const decoder = new TextDecoder("utf-8", { fatal: true });
289+
290+
// Byte buffer: array of chunks with tracked total length
291+
let chunks: Uint8Array[] = [];
292+
let totalBytes = 0;
284293
let lineNumber = 0;
285294

286-
return new TransformStream<Uint8Array, unknown>({
287-
transform(chunk, controller) {
288-
buffer += decoder.decode(chunk, { stream: true });
295+
const NEWLINE_BYTE = 0x0a; // '\n'
296+
297+
/**
298+
* Concatenate all chunks into a single Uint8Array
299+
*/
300+
function concatenateChunks(): Uint8Array {
301+
if (chunks.length === 0) {
302+
return new Uint8Array(0);
303+
}
304+
if (chunks.length === 1) {
305+
return chunks[0];
306+
}
307+
const result = new Uint8Array(totalBytes);
308+
let offset = 0;
309+
for (const chunk of chunks) {
310+
result.set(chunk, offset);
311+
offset += chunk.byteLength;
312+
}
313+
return result;
314+
}
315+
316+
/**
317+
* Find the index of the first newline byte in the buffer.
318+
* Returns -1 if not found.
319+
*/
320+
function findNewlineIndex(): number {
321+
let globalIndex = 0;
322+
for (const chunk of chunks) {
323+
for (let i = 0; i < chunk.byteLength; i++) {
324+
if (chunk[i] === NEWLINE_BYTE) {
325+
return globalIndex + i;
326+
}
327+
}
328+
globalIndex += chunk.byteLength;
329+
}
330+
return -1;
331+
}
332+
333+
/**
334+
* Extract bytes from the buffer up to (but not including) the given index,
335+
* and remove those bytes plus the delimiter from the buffer.
336+
*/
337+
function extractLine(newlineIndex: number): Uint8Array {
338+
const fullBuffer = concatenateChunks();
339+
const lineBytes = fullBuffer.slice(0, newlineIndex);
340+
const remaining = fullBuffer.slice(newlineIndex + 1); // Skip the newline
341+
342+
// Reset buffer with remaining bytes
343+
if (remaining.byteLength > 0) {
344+
chunks = [remaining];
345+
totalBytes = remaining.byteLength;
346+
} else {
347+
chunks = [];
348+
totalBytes = 0;
349+
}
289350

290-
// Split on newlines
291-
const lines = buffer.split("\n");
292-
buffer = lines.pop() ?? "";
351+
return lineBytes;
352+
}
293353

294-
for (const line of lines) {
295-
lineNumber++;
296-
const trimmed = line.trim();
297-
if (!trimmed) continue;
354+
/**
355+
* Parse a line from bytes, handling whitespace trimming.
356+
* Returns the parsed object or null for empty lines.
357+
*/
358+
function parseLine(
359+
lineBytes: Uint8Array,
360+
controller: TransformStreamDefaultController<unknown>
361+
): void {
362+
lineNumber++;
363+
364+
// Decode the line bytes (stream: false since this is a complete line)
365+
let lineText: string;
366+
try {
367+
lineText = decoder.decode(lineBytes, { stream: false });
368+
} catch (err) {
369+
throw new Error(`Invalid UTF-8 at line ${lineNumber}: ${(err as Error).message}`);
370+
}
298371

299-
// Check byte size before parsing
300-
const lineBytes = new TextEncoder().encode(trimmed).length;
301-
if (lineBytes > maxItemBytes) {
372+
const trimmed = lineText.trim();
373+
if (!trimmed) {
374+
return; // Skip empty lines
375+
}
376+
377+
try {
378+
const obj = JSON.parse(trimmed);
379+
controller.enqueue(obj);
380+
} catch (err) {
381+
throw new Error(`Invalid JSON at line ${lineNumber}: ${(err as Error).message}`);
382+
}
383+
}
384+
385+
return new TransformStream<Uint8Array, unknown>({
386+
transform(chunk, controller) {
387+
// Append chunk to buffer
388+
chunks.push(chunk);
389+
totalBytes += chunk.byteLength;
390+
391+
// Process all complete lines in the buffer
392+
let newlineIndex: number;
393+
while ((newlineIndex = findNewlineIndex()) !== -1) {
394+
// Check size limit BEFORE extracting/decoding (bytes up to newline)
395+
if (newlineIndex > maxItemBytes) {
302396
throw new Error(
303-
`Item at line ${lineNumber} exceeds maximum size of ${maxItemBytes} bytes (actual: ${lineBytes})`
397+
`Item at line ${
398+
lineNumber + 1
399+
} exceeds maximum size of ${maxItemBytes} bytes (actual: ${newlineIndex})`
304400
);
305401
}
306402

307-
try {
308-
const obj = JSON.parse(trimmed);
309-
controller.enqueue(obj);
310-
} catch (err) {
311-
throw new Error(`Invalid JSON at line ${lineNumber}: ${(err as Error).message}`);
312-
}
403+
const lineBytes = extractLine(newlineIndex);
404+
parseLine(lineBytes, controller);
405+
}
406+
407+
// Check if the remaining buffer (incomplete line) exceeds the limit
408+
// This prevents OOM from a single huge line without newlines
409+
if (totalBytes > maxItemBytes) {
410+
throw new Error(
411+
`Item at line ${
412+
lineNumber + 1
413+
} exceeds maximum size of ${maxItemBytes} bytes (buffered: ${totalBytes}, no newline found)`
414+
);
313415
}
314416
},
417+
315418
flush(controller) {
316-
// Handle any remaining buffered data (no trailing newline case)
317-
const final = buffer.trim();
318-
if (!final) return;
419+
// Flush any remaining bytes from the decoder's internal state
420+
// This handles multibyte characters that may have been split across chunks
421+
decoder.decode(new Uint8Array(0), { stream: false });
319422

320-
lineNumber++;
321-
const lineBytes = new TextEncoder().encode(final).length;
322-
if (lineBytes > maxItemBytes) {
423+
// Process any remaining buffered data (no trailing newline case)
424+
if (totalBytes === 0) {
425+
return;
426+
}
427+
428+
// Check size limit before processing final line
429+
if (totalBytes > maxItemBytes) {
323430
throw new Error(
324-
`Item at line ${lineNumber} exceeds maximum size of ${maxItemBytes} bytes (actual: ${lineBytes})`
431+
`Item at line ${
432+
lineNumber + 1
433+
} exceeds maximum size of ${maxItemBytes} bytes (actual: ${totalBytes})`
325434
);
326435
}
327436

328-
try {
329-
const obj = JSON.parse(final);
330-
controller.enqueue(obj);
331-
} catch (err) {
332-
throw new Error(`Invalid JSON at line ${lineNumber}: ${(err as Error).message}`);
333-
}
437+
const finalBytes = concatenateChunks();
438+
parseLine(finalBytes, controller);
334439
},
335440
});
336441
}

0 commit comments

Comments
 (0)