Skip to content

Commit 00d5428

Browse files
committed
Better handling of batches when sealed = false
1 parent f3354b1 commit 00d5428

File tree

5 files changed

+286
-5
lines changed

5 files changed

+286
-5
lines changed

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,14 @@ export class StreamBatchItemsService extends WithRunEngine {
161161
});
162162

163163
// Don't seal the batch if count doesn't match
164-
// Client can retry with missing items
164+
// Return sealed: false so client knows to retry with missing items
165165
return {
166166
id: batchFriendlyId,
167167
itemsAccepted,
168168
itemsDeduplicated,
169+
sealed: false,
170+
enqueuedCount,
171+
expectedCount: batch.runCount,
169172
};
170173
}
171174

@@ -195,6 +198,7 @@ export class StreamBatchItemsService extends WithRunEngine {
195198
id: batchFriendlyId,
196199
itemsAccepted,
197200
itemsDeduplicated,
201+
sealed: true,
198202
};
199203
}
200204
);

packages/core/src/v3/apiClient/errors.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,47 @@ export class ApiSchemaValidationError extends ApiError {
190190
}
191191
}
192192

193+
/**
194+
* Error thrown when a batch stream completes but the batch was not sealed.
195+
* This indicates that not all expected items were received by the server.
196+
* The client should retry sending all items, or investigate the mismatch.
197+
*/
198+
export class BatchNotSealedError extends Error {
199+
readonly name = "BatchNotSealedError";
200+
201+
/** The batch ID that was not sealed */
202+
readonly batchId: string;
203+
204+
/** Number of items currently enqueued on the server */
205+
readonly enqueuedCount: number;
206+
207+
/** Number of items expected to complete the batch */
208+
readonly expectedCount: number;
209+
210+
/** Number of items accepted in this request */
211+
readonly itemsAccepted: number;
212+
213+
/** Number of items deduplicated in this request */
214+
readonly itemsDeduplicated: number;
215+
216+
constructor(options: {
217+
batchId: string;
218+
enqueuedCount: number;
219+
expectedCount: number;
220+
itemsAccepted: number;
221+
itemsDeduplicated: number;
222+
}) {
223+
const message = `Batch ${options.batchId} was not sealed: received ${options.enqueuedCount} of ${options.expectedCount} expected items (accepted: ${options.itemsAccepted}, deduplicated: ${options.itemsDeduplicated})`;
224+
super(message);
225+
226+
this.batchId = options.batchId;
227+
this.enqueuedCount = options.enqueuedCount;
228+
this.expectedCount = options.expectedCount;
229+
this.itemsAccepted = options.itemsAccepted;
230+
this.itemsDeduplicated = options.itemsDeduplicated;
231+
}
232+
}
233+
193234
function castToError(err: any): Error {
194235
if (err instanceof Error) return err;
195236
return new Error(err);

packages/core/src/v3/apiClient/index.ts

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ import {
6666
zodfetchCursorPage,
6767
zodfetchOffsetLimitPage,
6868
} from "./core.js";
69-
import { ApiConnectionError, ApiError } from "./errors.js";
69+
import { ApiConnectionError, ApiError, BatchNotSealedError } from "./errors.js";
7070
import { calculateNextRetryDelay } from "../utils/retries.js";
7171
import { RetryOptions } from "../schemas/index.js";
7272
import {
@@ -463,20 +463,48 @@ export class ApiClient {
463463
throw ApiError.generate(response.status, errJSON, errMessage, responseHeaders);
464464
}
465465

466-
// Success - cancel the backup stream to release resources
467-
await forRetry.cancel();
468-
469466
const result = await response.json();
470467
const parsed = StreamBatchItemsResponse.safeParse(result);
471468

472469
if (!parsed.success) {
470+
// Cancel backup stream since we're throwing
471+
await forRetry.cancel();
473472
throw new Error(
474473
`Invalid response from server for batch ${batchId}: ${parsed.error.message}`
475474
);
476475
}
477476

477+
// Check if the batch was sealed
478+
if (!parsed.data.sealed) {
479+
// Not all items were received - treat as retryable condition
480+
const delay = calculateNextRetryDelay(retryOptions, attempt);
481+
482+
if (delay) {
483+
// Retry with the backup stream
484+
await sleep(delay);
485+
return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
486+
}
487+
488+
// No more retries - cancel backup stream and throw descriptive error
489+
await forRetry.cancel();
490+
throw new BatchNotSealedError({
491+
batchId,
492+
enqueuedCount: parsed.data.enqueuedCount ?? 0,
493+
expectedCount: parsed.data.expectedCount ?? 0,
494+
itemsAccepted: parsed.data.itemsAccepted,
495+
itemsDeduplicated: parsed.data.itemsDeduplicated,
496+
});
497+
}
498+
499+
// Success - cancel the backup stream to release resources
500+
await forRetry.cancel();
501+
478502
return parsed.data;
479503
} catch (error) {
504+
// Don't retry BatchNotSealedError (retries already exhausted)
505+
if (error instanceof BatchNotSealedError) {
506+
throw error;
507+
}
480508
// Don't retry ApiErrors (already handled above with backup stream cancelled)
481509
if (error instanceof ApiError) {
482510
throw error;
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
import { describe, it, expect, vi, afterEach } from "vitest";
2+
import { ApiClient } from "./index.js";
3+
import { BatchNotSealedError } from "./errors.js";
4+
5+
vi.setConfig({ testTimeout: 10_000 });
6+
7+
describe("streamBatchItems unsealed handling", () => {
8+
const originalFetch = globalThis.fetch;
9+
10+
afterEach(() => {
11+
globalThis.fetch = originalFetch;
12+
vi.restoreAllMocks();
13+
});
14+
15+
/**
16+
* Helper to create a mock fetch that properly consumes the request body stream.
17+
* This is necessary because streamBatchItems sends a ReadableStream body.
18+
*/
19+
function createMockFetch(
20+
responses: Array<{
21+
id: string;
22+
itemsAccepted: number;
23+
itemsDeduplicated: number;
24+
sealed: boolean;
25+
enqueuedCount?: number;
26+
expectedCount?: number;
27+
}>
28+
) {
29+
let callIndex = 0;
30+
return vi.fn().mockImplementation(async (_url: string, init?: RequestInit) => {
31+
// Consume the request body stream to prevent hanging
32+
if (init?.body && init.body instanceof ReadableStream) {
33+
const reader = init.body.getReader();
34+
// Drain the stream
35+
while (true) {
36+
const { done } = await reader.read();
37+
if (done) break;
38+
}
39+
}
40+
41+
const responseData = responses[Math.min(callIndex, responses.length - 1)];
42+
callIndex++;
43+
44+
return {
45+
ok: true,
46+
json: () => Promise.resolve(responseData),
47+
};
48+
});
49+
}
50+
51+
it("throws BatchNotSealedError when sealed=false after retries exhausted", async () => {
52+
const mockFetch = createMockFetch([
53+
{
54+
id: "batch_test123",
55+
itemsAccepted: 5,
56+
itemsDeduplicated: 0,
57+
sealed: false,
58+
enqueuedCount: 5,
59+
expectedCount: 10,
60+
},
61+
]);
62+
globalThis.fetch = mockFetch;
63+
64+
const client = new ApiClient("http://localhost:3030", "tr_test_key");
65+
66+
const error = await client
67+
.streamBatchItems(
68+
"batch_test123",
69+
[{ index: 0, task: "test-task", payload: "{}" }],
70+
{ retry: { maxAttempts: 2, minDelay: 10, maxDelay: 50 } }
71+
)
72+
.catch((e) => e);
73+
74+
expect(error).toBeInstanceOf(BatchNotSealedError);
75+
expect((error as BatchNotSealedError).batchId).toBe("batch_test123");
76+
expect((error as BatchNotSealedError).enqueuedCount).toBe(5);
77+
expect((error as BatchNotSealedError).expectedCount).toBe(10);
78+
expect((error as BatchNotSealedError).itemsAccepted).toBe(5);
79+
expect((error as BatchNotSealedError).itemsDeduplicated).toBe(0);
80+
81+
// Should have retried (2 attempts total based on maxAttempts)
82+
expect(mockFetch).toHaveBeenCalledTimes(2);
83+
});
84+
85+
it("retries when sealed=false and succeeds when sealed=true on retry", async () => {
86+
const mockFetch = createMockFetch([
87+
// First response: not sealed
88+
{
89+
id: "batch_test123",
90+
itemsAccepted: 5,
91+
itemsDeduplicated: 0,
92+
sealed: false,
93+
enqueuedCount: 5,
94+
expectedCount: 10,
95+
},
96+
// Second response: sealed
97+
{
98+
id: "batch_test123",
99+
itemsAccepted: 5,
100+
itemsDeduplicated: 0,
101+
sealed: true,
102+
},
103+
]);
104+
globalThis.fetch = mockFetch;
105+
106+
const client = new ApiClient("http://localhost:3030", "tr_test_key");
107+
108+
const result = await client.streamBatchItems(
109+
"batch_test123",
110+
[{ index: 0, task: "test-task", payload: "{}" }],
111+
{ retry: { maxAttempts: 3, minDelay: 10, maxDelay: 50 } }
112+
);
113+
114+
expect(result.sealed).toBe(true);
115+
// Should have been called twice (first unsealed, second sealed)
116+
expect(mockFetch).toHaveBeenCalledTimes(2);
117+
});
118+
119+
it("succeeds immediately when sealed=true on first attempt", async () => {
120+
const mockFetch = createMockFetch([
121+
{
122+
id: "batch_test123",
123+
itemsAccepted: 10,
124+
itemsDeduplicated: 0,
125+
sealed: true,
126+
},
127+
]);
128+
globalThis.fetch = mockFetch;
129+
130+
const client = new ApiClient("http://localhost:3030", "tr_test_key");
131+
132+
const result = await client.streamBatchItems("batch_test123", [
133+
{ index: 0, task: "test-task", payload: "{}" },
134+
]);
135+
136+
expect(result.sealed).toBe(true);
137+
expect(result.itemsAccepted).toBe(10);
138+
// Should only be called once
139+
expect(mockFetch).toHaveBeenCalledTimes(1);
140+
});
141+
142+
it("BatchNotSealedError has descriptive message", async () => {
143+
const mockFetch = createMockFetch([
144+
{
145+
id: "batch_abc123",
146+
itemsAccepted: 7,
147+
itemsDeduplicated: 2,
148+
sealed: false,
149+
enqueuedCount: 9,
150+
expectedCount: 15,
151+
},
152+
]);
153+
globalThis.fetch = mockFetch;
154+
155+
const client = new ApiClient("http://localhost:3030", "tr_test_key");
156+
157+
const error = await client
158+
.streamBatchItems(
159+
"batch_abc123",
160+
[{ index: 0, task: "test-task", payload: "{}" }],
161+
{ retry: { maxAttempts: 1, minDelay: 10, maxDelay: 50 } }
162+
)
163+
.catch((e) => e);
164+
165+
expect(error).toBeInstanceOf(BatchNotSealedError);
166+
expect(error.message).toContain("batch_abc123");
167+
expect(error.message).toContain("9 of 15");
168+
expect(error.message).toContain("accepted: 7");
169+
expect(error.message).toContain("deduplicated: 2");
170+
});
171+
172+
it("handles missing enqueuedCount and expectedCount gracefully", async () => {
173+
// Simulate older server response that might not include these fields
174+
const mockFetch = createMockFetch([
175+
{
176+
id: "batch_test123",
177+
itemsAccepted: 5,
178+
itemsDeduplicated: 0,
179+
sealed: false,
180+
// No enqueuedCount or expectedCount
181+
},
182+
]);
183+
globalThis.fetch = mockFetch;
184+
185+
const client = new ApiClient("http://localhost:3030", "tr_test_key");
186+
187+
const error = await client
188+
.streamBatchItems(
189+
"batch_test123",
190+
[{ index: 0, task: "test-task", payload: "{}" }],
191+
{ retry: { maxAttempts: 1, minDelay: 10, maxDelay: 50 } }
192+
)
193+
.catch((e) => e);
194+
195+
expect(error).toBeInstanceOf(BatchNotSealedError);
196+
// Should default to 0 when not provided
197+
expect((error as BatchNotSealedError).enqueuedCount).toBe(0);
198+
expect((error as BatchNotSealedError).expectedCount).toBe(0);
199+
});
200+
});

packages/core/src/v3/schemas/api.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,14 @@ export const StreamBatchItemsResponse = z.object({
379379
itemsAccepted: z.number(),
380380
/** Number of items that were deduplicated (already enqueued) */
381381
itemsDeduplicated: z.number(),
382+
/** Whether the batch was sealed and is ready for processing.
383+
* If false, the batch needs more items before processing can start.
384+
* Clients should check this field and retry with missing items if needed. */
385+
sealed: z.boolean(),
386+
/** Total items currently enqueued (only present when sealed=false to help with retries) */
387+
enqueuedCount: z.number().optional(),
388+
/** Expected total item count (only present when sealed=false to help with retries) */
389+
expectedCount: z.number().optional(),
382390
});
383391

384392
export type StreamBatchItemsResponse = z.infer<typeof StreamBatchItemsResponse>;

0 commit comments

Comments
 (0)