Skip to content

Commit a5791a0

Browse files
committed
cancel read stream before releasing lock
1 parent 7e073b4 commit a5791a0

File tree

5 files changed

+698
-195
lines changed

5 files changed

+698
-195
lines changed

packages/trigger-sdk/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
"build": "tshy && pnpm run update-version",
4646
"dev": "tshy --watch",
4747
"typecheck": "tsc --noEmit",
48+
"test": "vitest",
4849
"update-version": "tsx ../../scripts/updateVersion.ts",
4950
"check-exports": "attw --pack ."
5051
},
@@ -74,6 +75,7 @@
7475
"tshy": "^3.0.2",
7576
"tsx": "4.17.0",
7677
"typed-emitter": "^2.1.0",
78+
"vitest": "^3.2.4",
7779
"zod": "3.25.76"
7880
},
7981
"peerDependencies": {
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
import { describe, it, expect } from "vitest";
2+
import { readableStreamToAsyncIterable } from "./shared.js";
3+
4+
describe("readableStreamToAsyncIterable", () => {
5+
it("yields all values from the stream", async () => {
6+
const values = [1, 2, 3, 4, 5];
7+
const stream = new ReadableStream<number>({
8+
start(controller) {
9+
for (const value of values) {
10+
controller.enqueue(value);
11+
}
12+
controller.close();
13+
},
14+
});
15+
16+
const result: number[] = [];
17+
for await (const value of readableStreamToAsyncIterable(stream)) {
18+
result.push(value);
19+
}
20+
21+
expect(result).toEqual(values);
22+
});
23+
24+
it("cancels the stream when consumer breaks early", async () => {
25+
let cancelCalled = false;
26+
27+
const stream = new ReadableStream<number>({
28+
start(controller) {
29+
controller.enqueue(1);
30+
controller.enqueue(2);
31+
controller.enqueue(3);
32+
controller.enqueue(4);
33+
controller.enqueue(5);
34+
controller.close();
35+
},
36+
cancel() {
37+
cancelCalled = true;
38+
},
39+
});
40+
41+
const result: number[] = [];
42+
for await (const value of readableStreamToAsyncIterable(stream)) {
43+
result.push(value);
44+
if (value === 2) {
45+
break; // Early termination
46+
}
47+
}
48+
49+
expect(result).toEqual([1, 2]);
50+
expect(cancelCalled).toBe(true);
51+
});
52+
53+
it("cancels the stream when consumer throws an error", async () => {
54+
let cancelCalled = false;
55+
56+
const stream = new ReadableStream<number>({
57+
start(controller) {
58+
controller.enqueue(1);
59+
controller.enqueue(2);
60+
controller.enqueue(3);
61+
controller.close();
62+
},
63+
cancel() {
64+
cancelCalled = true;
65+
},
66+
});
67+
68+
const result: number[] = [];
69+
const testError = new Error("Test error");
70+
71+
await expect(async () => {
72+
for await (const value of readableStreamToAsyncIterable(stream)) {
73+
result.push(value);
74+
if (value === 2) {
75+
throw testError;
76+
}
77+
}
78+
}).rejects.toThrow(testError);
79+
80+
expect(result).toEqual([1, 2]);
81+
expect(cancelCalled).toBe(true);
82+
});
83+
84+
it("handles stream that produces values asynchronously", async () => {
85+
const values = ["a", "b", "c"];
86+
let index = 0;
87+
88+
const stream = new ReadableStream<string>({
89+
async pull(controller) {
90+
if (index < values.length) {
91+
// Simulate async data production
92+
await new Promise((resolve) => setTimeout(resolve, 1));
93+
controller.enqueue(values[index]!);
94+
index++;
95+
} else {
96+
controller.close();
97+
}
98+
},
99+
});
100+
101+
const result: string[] = [];
102+
for await (const value of readableStreamToAsyncIterable(stream)) {
103+
result.push(value);
104+
}
105+
106+
expect(result).toEqual(values);
107+
});
108+
109+
it("cancels async stream when consumer breaks early", async () => {
110+
let cancelCalled = false;
111+
let producedCount = 0;
112+
113+
const stream = new ReadableStream<number>({
114+
async pull(controller) {
115+
// Simulate async data production
116+
await new Promise((resolve) => setTimeout(resolve, 1));
117+
producedCount++;
118+
controller.enqueue(producedCount);
119+
// Never close - infinite stream
120+
},
121+
cancel() {
122+
cancelCalled = true;
123+
},
124+
});
125+
126+
const result: number[] = [];
127+
for await (const value of readableStreamToAsyncIterable(stream)) {
128+
result.push(value);
129+
if (value >= 3) {
130+
break;
131+
}
132+
}
133+
134+
expect(result).toEqual([1, 2, 3]);
135+
expect(cancelCalled).toBe(true);
136+
});
137+
138+
it("does not throw when cancelling an already-closed stream", async () => {
139+
const stream = new ReadableStream<number>({
140+
start(controller) {
141+
controller.enqueue(1);
142+
controller.close();
143+
},
144+
});
145+
146+
// Normal iteration should complete without errors
147+
const result: number[] = [];
148+
for await (const value of readableStreamToAsyncIterable(stream)) {
149+
result.push(value);
150+
}
151+
152+
expect(result).toEqual([1]);
153+
});
154+
155+
it("does not throw when cancelling an errored stream", async () => {
156+
const streamError = new Error("Stream error");
157+
let errorIndex = 0;
158+
159+
const stream = new ReadableStream<number>({
160+
pull(controller) {
161+
errorIndex++;
162+
if (errorIndex <= 2) {
163+
controller.enqueue(errorIndex);
164+
} else {
165+
controller.error(streamError);
166+
}
167+
},
168+
});
169+
170+
const result: number[] = [];
171+
172+
// The stream error should propagate
173+
await expect(async () => {
174+
for await (const value of readableStreamToAsyncIterable(stream)) {
175+
result.push(value);
176+
}
177+
}).rejects.toThrow(streamError);
178+
179+
// We should have gotten the values before the error
180+
expect(result).toEqual([1, 2]);
181+
});
182+
183+
it("signals upstream producer to stop via cancel", async () => {
184+
const producedValues: number[] = [];
185+
let isProducing = true;
186+
187+
const stream = new ReadableStream<number>({
188+
async pull(controller) {
189+
if (!isProducing) return;
190+
191+
await new Promise((resolve) => setTimeout(resolve, 5));
192+
const value = producedValues.length + 1;
193+
producedValues.push(value);
194+
controller.enqueue(value);
195+
},
196+
cancel() {
197+
isProducing = false;
198+
},
199+
});
200+
201+
const consumed: number[] = [];
202+
for await (const value of readableStreamToAsyncIterable(stream)) {
203+
consumed.push(value);
204+
if (value >= 2) {
205+
break;
206+
}
207+
}
208+
209+
// Wait a bit to ensure no more values are produced
210+
await new Promise((resolve) => setTimeout(resolve, 20));
211+
212+
expect(consumed).toEqual([1, 2]);
213+
// Producer should have stopped after cancel
214+
expect(isProducing).toBe(false);
215+
// No more values should have been produced after breaking
216+
expect(producedValues.length).toBeLessThanOrEqual(3);
217+
});
218+
});
219+

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1557,10 +1557,11 @@ async function executeBatchTwoPhase(
15571557
);
15581558
} catch (error) {
15591559
// Wrap with context about which phase failed
1560-
throw new BatchTriggerError(
1561-
`Failed to create batch with ${items.length} items`,
1562-
{ cause: error, phase: "create", itemCount: items.length }
1563-
);
1560+
throw new BatchTriggerError(`Failed to create batch with ${items.length} items`, {
1561+
cause: error,
1562+
phase: "create",
1563+
itemCount: items.length,
1564+
});
15641565
}
15651566

15661567
// If the batch was cached (idempotent replay), skip streaming items
@@ -1674,9 +1675,14 @@ function isReadableStream<T>(value: unknown): value is ReadableStream<T> {
16741675
}
16751676

16761677
/**
1677-
* Convert a ReadableStream to an AsyncIterable
1678+
* Convert a ReadableStream to an AsyncIterable.
1679+
* Properly cancels the stream when the consumer terminates early.
1680+
*
1681+
* @internal Exported for testing purposes
16781682
*/
1679-
async function* readableStreamToAsyncIterable<T>(stream: ReadableStream<T>): AsyncIterable<T> {
1683+
export async function* readableStreamToAsyncIterable<T>(
1684+
stream: ReadableStream<T>
1685+
): AsyncIterable<T> {
16801686
const reader = stream.getReader();
16811687
try {
16821688
while (true) {
@@ -1685,6 +1691,11 @@ async function* readableStreamToAsyncIterable<T>(stream: ReadableStream<T>): Asy
16851691
yield value;
16861692
}
16871693
} finally {
1694+
try {
1695+
await reader.cancel();
1696+
} catch {
1697+
// Ignore errors - stream might already be errored or closed
1698+
}
16881699
reader.releaseLock();
16891700
}
16901701
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { defineConfig } from "vitest/config";
2+
3+
export default defineConfig({
4+
test: {
5+
include: ["test/**/*.test.ts", "src/v3/**/*.test.ts"],
6+
globals: true,
7+
},
8+
});
9+

0 commit comments

Comments
 (0)