Skip to content

Commit 02eec34

Browse files
committed
better handling streaming ndjson retries
1 parent b77c910 commit 02eec34

File tree

2 files changed

+288
-19
lines changed

2 files changed

+288
-19
lines changed

packages/core/src/v3/apiClient/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,8 @@ export class ApiClient {
442442
const retryResult = shouldRetryStreamBatchItems(response, attempt, retryOptions);
443443

444444
if (retryResult.retry) {
445+
// Cancel the request stream before retry to prevent tee() from buffering
446+
await forRequest.cancel();
445447
await sleep(retryResult.delay);
446448
// Use the backup stream for retry
447449
return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
@@ -480,6 +482,8 @@ export class ApiClient {
480482
const delay = calculateNextRetryDelay(retryOptions, attempt);
481483

482484
if (delay) {
485+
// Cancel the request stream before retry to prevent tee() from buffering
486+
await forRequest.cancel();
483487
// Retry with the backup stream
484488
await sleep(delay);
485489
return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
@@ -513,6 +517,8 @@ export class ApiClient {
513517
// Retry connection errors using the backup stream
514518
const delay = calculateNextRetryDelay(retryOptions, attempt);
515519
if (delay) {
520+
// Cancel the request stream before retry to prevent tee() from buffering
521+
await forRequest.cancel();
516522
await sleep(delay);
517523
return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
518524
}

packages/core/src/v3/apiClient/streamBatchItems.test.ts

Lines changed: 282 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ describe("streamBatchItems unsealed handling", () => {
1515
/**
1616
* Helper to create a mock fetch that properly consumes the request body stream.
1717
* This is necessary because streamBatchItems sends a ReadableStream body.
18+
* Important: We must release the reader lock after consuming, just like real fetch does.
1819
*/
1920
function createMockFetch(
2021
responses: Array<{
@@ -32,9 +33,14 @@ describe("streamBatchItems unsealed handling", () => {
3233
if (init?.body && init.body instanceof ReadableStream) {
3334
const reader = init.body.getReader();
3435
// Drain the stream
35-
while (true) {
36-
const { done } = await reader.read();
37-
if (done) break;
36+
try {
37+
while (true) {
38+
const { done } = await reader.read();
39+
if (done) break;
40+
}
41+
} finally {
42+
// Release the lock so the stream can be cancelled later (like real fetch does)
43+
reader.releaseLock();
3844
}
3945
}
4046

@@ -64,11 +70,9 @@ describe("streamBatchItems unsealed handling", () => {
6470
const client = new ApiClient("http://localhost:3030", "tr_test_key");
6571

6672
const error = await client
67-
.streamBatchItems(
68-
"batch_test123",
69-
[{ index: 0, task: "test-task", payload: "{}" }],
70-
{ retry: { maxAttempts: 2, minDelay: 10, maxDelay: 50 } }
71-
)
73+
.streamBatchItems("batch_test123", [{ index: 0, task: "test-task", payload: "{}" }], {
74+
retry: { maxAttempts: 2, minTimeoutInMs: 10, maxTimeoutInMs: 50 },
75+
})
7276
.catch((e) => e);
7377

7478
expect(error).toBeInstanceOf(BatchNotSealedError);
@@ -108,7 +112,7 @@ describe("streamBatchItems unsealed handling", () => {
108112
const result = await client.streamBatchItems(
109113
"batch_test123",
110114
[{ index: 0, task: "test-task", payload: "{}" }],
111-
{ retry: { maxAttempts: 3, minDelay: 10, maxDelay: 50 } }
115+
{ retry: { maxAttempts: 3, minTimeoutInMs: 10, maxTimeoutInMs: 50 } }
112116
);
113117

114118
expect(result.sealed).toBe(true);
@@ -155,11 +159,9 @@ describe("streamBatchItems unsealed handling", () => {
155159
const client = new ApiClient("http://localhost:3030", "tr_test_key");
156160

157161
const error = await client
158-
.streamBatchItems(
159-
"batch_abc123",
160-
[{ index: 0, task: "test-task", payload: "{}" }],
161-
{ retry: { maxAttempts: 1, minDelay: 10, maxDelay: 50 } }
162-
)
162+
.streamBatchItems("batch_abc123", [{ index: 0, task: "test-task", payload: "{}" }], {
163+
retry: { maxAttempts: 1, minTimeoutInMs: 10, maxTimeoutInMs: 50 },
164+
})
163165
.catch((e) => e);
164166

165167
expect(error).toBeInstanceOf(BatchNotSealedError);
@@ -185,11 +187,9 @@ describe("streamBatchItems unsealed handling", () => {
185187
const client = new ApiClient("http://localhost:3030", "tr_test_key");
186188

187189
const error = await client
188-
.streamBatchItems(
189-
"batch_test123",
190-
[{ index: 0, task: "test-task", payload: "{}" }],
191-
{ retry: { maxAttempts: 1, minDelay: 10, maxDelay: 50 } }
192-
)
190+
.streamBatchItems("batch_test123", [{ index: 0, task: "test-task", payload: "{}" }], {
191+
retry: { maxAttempts: 1, minTimeoutInMs: 10, maxTimeoutInMs: 50 },
192+
})
193193
.catch((e) => e);
194194

195195
expect(error).toBeInstanceOf(BatchNotSealedError);
@@ -198,3 +198,266 @@ describe("streamBatchItems unsealed handling", () => {
198198
expect((error as BatchNotSealedError).expectedCount).toBe(0);
199199
});
200200
});
201+
202+
describe("streamBatchItems stream cancellation on retry", () => {
203+
const originalFetch = globalThis.fetch;
204+
205+
afterEach(() => {
206+
globalThis.fetch = originalFetch;
207+
vi.restoreAllMocks();
208+
});
209+
210+
/**
211+
* Helper to consume a stream and release the lock (simulating fetch behavior).
212+
*/
213+
async function consumeAndRelease(stream: ReadableStream<any>) {
214+
const reader = stream.getReader();
215+
try {
216+
while (true) {
217+
const { done } = await reader.read();
218+
if (done) break;
219+
}
220+
} finally {
221+
reader.releaseLock();
222+
}
223+
}
224+
225+
it("cancels forRequest stream when retrying due to HTTP error", async () => {
226+
// Track cancel calls
227+
let cancelCallCount = 0;
228+
let callIndex = 0;
229+
230+
const mockFetch = vi.fn().mockImplementation(async (_url: string, init?: RequestInit) => {
231+
const currentAttempt = callIndex;
232+
callIndex++;
233+
234+
if (init?.body && init.body instanceof ReadableStream) {
235+
const originalCancel = init.body.cancel.bind(init.body);
236+
init.body.cancel = async (reason?: any) => {
237+
cancelCallCount++;
238+
return originalCancel(reason);
239+
};
240+
241+
// Consume stream and release lock (like real fetch does)
242+
await consumeAndRelease(init.body);
243+
}
244+
245+
// First attempt: return 500 error (retryable)
246+
if (currentAttempt === 0) {
247+
return {
248+
ok: false,
249+
status: 500,
250+
text: () => Promise.resolve("Server error"),
251+
headers: new Headers(),
252+
};
253+
}
254+
255+
// Second attempt: success
256+
return {
257+
ok: true,
258+
json: () =>
259+
Promise.resolve({
260+
id: "batch_test123",
261+
itemsAccepted: 10,
262+
itemsDeduplicated: 0,
263+
sealed: true,
264+
}),
265+
};
266+
});
267+
globalThis.fetch = mockFetch;
268+
269+
const client = new ApiClient("http://localhost:3030", "tr_test_key");
270+
271+
const result = await client.streamBatchItems(
272+
"batch_test123",
273+
[{ index: 0, task: "test-task", payload: "{}" }],
274+
{ retry: { maxAttempts: 3, minTimeoutInMs: 10, maxTimeoutInMs: 50 } }
275+
);
276+
277+
expect(result.sealed).toBe(true);
278+
expect(mockFetch).toHaveBeenCalledTimes(2);
279+
// forRequest should be cancelled once (before first retry)
280+
// forRetry should be cancelled once (after success)
281+
// Total: 2 cancel calls
282+
expect(cancelCallCount).toBeGreaterThanOrEqual(1);
283+
});
284+
285+
it("cancels forRequest stream when retrying due to batch not sealed", async () => {
286+
let cancelCallCount = 0;
287+
let callIndex = 0;
288+
289+
const mockFetch = vi.fn().mockImplementation(async (_url: string, init?: RequestInit) => {
290+
const currentAttempt = callIndex;
291+
callIndex++;
292+
293+
if (init?.body && init.body instanceof ReadableStream) {
294+
const originalCancel = init.body.cancel.bind(init.body);
295+
init.body.cancel = async (reason?: any) => {
296+
cancelCallCount++;
297+
return originalCancel(reason);
298+
};
299+
300+
await consumeAndRelease(init.body);
301+
}
302+
303+
// First attempt: not sealed (triggers retry)
304+
if (currentAttempt === 0) {
305+
return {
306+
ok: true,
307+
json: () =>
308+
Promise.resolve({
309+
id: "batch_test123",
310+
itemsAccepted: 5,
311+
itemsDeduplicated: 0,
312+
sealed: false,
313+
enqueuedCount: 5,
314+
expectedCount: 10,
315+
}),
316+
};
317+
}
318+
319+
// Second attempt: sealed
320+
return {
321+
ok: true,
322+
json: () =>
323+
Promise.resolve({
324+
id: "batch_test123",
325+
itemsAccepted: 5,
326+
itemsDeduplicated: 5,
327+
sealed: true,
328+
}),
329+
};
330+
});
331+
globalThis.fetch = mockFetch;
332+
333+
const client = new ApiClient("http://localhost:3030", "tr_test_key");
334+
335+
const result = await client.streamBatchItems(
336+
"batch_test123",
337+
[{ index: 0, task: "test-task", payload: "{}" }],
338+
{ retry: { maxAttempts: 3, minTimeoutInMs: 10, maxTimeoutInMs: 50 } }
339+
);
340+
341+
expect(result.sealed).toBe(true);
342+
expect(mockFetch).toHaveBeenCalledTimes(2);
343+
// forRequest cancelled before retry + forRetry cancelled after success
344+
expect(cancelCallCount).toBeGreaterThanOrEqual(1);
345+
});
346+
347+
it("cancels forRequest stream when retrying due to connection error", async () => {
348+
let cancelCallCount = 0;
349+
let callIndex = 0;
350+
351+
const mockFetch = vi.fn().mockImplementation(async (_url: string, init?: RequestInit) => {
352+
const currentAttempt = callIndex;
353+
callIndex++;
354+
355+
if (init?.body && init.body instanceof ReadableStream) {
356+
const originalCancel = init.body.cancel.bind(init.body);
357+
init.body.cancel = async (reason?: any) => {
358+
cancelCallCount++;
359+
return originalCancel(reason);
360+
};
361+
362+
// Always consume and release - even for error case
363+
// This simulates what happens when fetch partially reads before failing
364+
// The important thing is the stream lock is released so cancel() can work
365+
await consumeAndRelease(init.body);
366+
}
367+
368+
// First attempt: connection error (simulate by throwing after consuming)
369+
if (currentAttempt === 0) {
370+
throw new TypeError("Failed to fetch");
371+
}
372+
373+
// Second attempt: success
374+
return {
375+
ok: true,
376+
json: () =>
377+
Promise.resolve({
378+
id: "batch_test123",
379+
itemsAccepted: 10,
380+
itemsDeduplicated: 0,
381+
sealed: true,
382+
}),
383+
};
384+
});
385+
globalThis.fetch = mockFetch;
386+
387+
const client = new ApiClient("http://localhost:3030", "tr_test_key");
388+
389+
const result = await client.streamBatchItems(
390+
"batch_test123",
391+
[{ index: 0, task: "test-task", payload: "{}" }],
392+
{ retry: { maxAttempts: 3, minTimeoutInMs: 10, maxTimeoutInMs: 50 } }
393+
);
394+
395+
expect(result.sealed).toBe(true);
396+
expect(mockFetch).toHaveBeenCalledTimes(2);
397+
// forRequest should be cancelled before retry
398+
expect(cancelCallCount).toBeGreaterThanOrEqual(1);
399+
});
400+
401+
it("does not leak memory by leaving tee branches unconsumed during multiple retries", async () => {
402+
let cancelCallCount = 0;
403+
let callIndex = 0;
404+
405+
const mockFetch = vi.fn().mockImplementation(async (_url: string, init?: RequestInit) => {
406+
const currentAttempt = callIndex;
407+
callIndex++;
408+
409+
if (init?.body && init.body instanceof ReadableStream) {
410+
const originalCancel = init.body.cancel.bind(init.body);
411+
init.body.cancel = async (reason?: any) => {
412+
cancelCallCount++;
413+
return originalCancel(reason);
414+
};
415+
416+
await consumeAndRelease(init.body);
417+
}
418+
419+
// First two attempts: not sealed
420+
if (currentAttempt < 2) {
421+
return {
422+
ok: true,
423+
json: () =>
424+
Promise.resolve({
425+
id: "batch_test123",
426+
itemsAccepted: 5,
427+
itemsDeduplicated: 0,
428+
sealed: false,
429+
enqueuedCount: 5,
430+
expectedCount: 10,
431+
}),
432+
};
433+
}
434+
435+
// Third attempt: sealed
436+
return {
437+
ok: true,
438+
json: () =>
439+
Promise.resolve({
440+
id: "batch_test123",
441+
itemsAccepted: 5,
442+
itemsDeduplicated: 5,
443+
sealed: true,
444+
}),
445+
};
446+
});
447+
globalThis.fetch = mockFetch;
448+
449+
const client = new ApiClient("http://localhost:3030", "tr_test_key");
450+
451+
const result = await client.streamBatchItems(
452+
"batch_test123",
453+
[{ index: 0, task: "test-task", payload: "{}" }],
454+
{ retry: { maxAttempts: 5, minTimeoutInMs: 10, maxTimeoutInMs: 50 } }
455+
);
456+
457+
expect(result.sealed).toBe(true);
458+
expect(mockFetch).toHaveBeenCalledTimes(3);
459+
// Each retry should cancel forRequest, plus final forRetry cancel
460+
// With 2 retries: 2 forRequest cancels + 1 forRetry cancel = 3 total
461+
expect(cancelCallCount).toBeGreaterThanOrEqual(2);
462+
});
463+
});

0 commit comments

Comments
 (0)