Skip to content

Commit 822ef3a

Browse files
authored
stream: reject pull() reads on abort
Make pull() race pending source reads against the provided AbortSignal so aborting can reject a pending next() even when the source is waiting before yielding data. Fixes: #63497 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: #63498 Fixes: #63497 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 4826d74 commit 822ef3a

2 files changed

Lines changed: 125 additions & 6 deletions

File tree

lib/internal/streams/iter/pull.js

Lines changed: 85 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,19 @@ const {
1414
ArrayPrototypeSlice,
1515
PromisePrototypeThen,
1616
PromiseResolve,
17+
PromiseWithResolvers,
18+
SafePromisePrototypeFinally,
19+
SafePromiseRace,
1720
SymbolAsyncIterator,
1821
SymbolIterator,
1922
TypedArrayPrototypeGetByteLength,
2023
Uint8Array,
2124
} = primordials;
2225

26+
const {
27+
markPromiseAsHandled,
28+
} = internalBinding('util');
29+
2330
const {
2431
codes: {
2532
ERR_INVALID_ARG_TYPE,
@@ -685,6 +692,81 @@ async function* applyValidatedStatefulAsyncTransform(source, transform, options)
685692
options.signal?.throwIfAborted();
686693
}
687694

695+
function getOnAbort(reject, signal) {
696+
return () => reject(signal.reason);
697+
}
698+
699+
/**
700+
* Read one item from an async iterator, rejecting early if the signal aborts.
701+
* @param {AsyncIterator} iterator - The iterator to read from.
702+
* @param {AbortSignal|undefined} signal - Optional abort signal.
703+
* @returns {Promise<IteratorResult<Uint8Array[]>>|IteratorResult<Uint8Array[]>}
704+
*/
705+
function abortableNext(iterator, signal) {
706+
if (signal === undefined) {
707+
return iterator.next();
708+
}
709+
710+
signal.throwIfAborted();
711+
712+
const next = iterator.next();
713+
const { promise, reject } = PromiseWithResolvers();
714+
const onAbort = getOnAbort(reject, signal);
715+
signal.addEventListener('abort', onAbort, { __proto__: null, once: true });
716+
if (signal.aborted) {
717+
onAbort();
718+
}
719+
720+
return SafePromisePrototypeFinally(SafePromiseRace([next, promise]), () => {
721+
signal.removeEventListener('abort', onAbort);
722+
});
723+
}
724+
725+
/**
726+
* Wrap an async source so each pending read is abort-aware.
727+
* @param {AsyncIterable<Uint8Array[]>} source - The source to read from.
728+
* @param {AbortSignal|undefined} signal - Optional abort signal.
729+
* @returns {AsyncIterable<Uint8Array[]>}
730+
*/
731+
function yieldAbortable(source, signal) {
732+
if (signal === undefined) {
733+
return source;
734+
}
735+
736+
return {
737+
__proto__: null,
738+
async *[SymbolAsyncIterator]() {
739+
const iterator = source[SymbolAsyncIterator]();
740+
let completed = false;
741+
let aborted = false;
742+
743+
try {
744+
while (true) {
745+
const { done, value } = await abortableNext(iterator, signal);
746+
if (done) {
747+
completed = true;
748+
return;
749+
}
750+
signal.throwIfAborted();
751+
yield value;
752+
}
753+
} catch (error) {
754+
aborted = signal.aborted;
755+
throw error;
756+
} finally {
757+
if (!completed && typeof iterator.return === 'function') {
758+
const result = iterator.return();
759+
if (aborted) {
760+
markPromiseAsHandled(result);
761+
} else {
762+
await result;
763+
}
764+
}
765+
}
766+
},
767+
};
768+
}
769+
688770
/**
689771
* Create an async pipeline from source through transforms.
690772
* @yields {Uint8Array[]}
@@ -693,17 +775,14 @@ async function* createAsyncPipeline(source, transforms, signal) {
693775
// Check for abort
694776
signal?.throwIfAborted();
695777

696-
const normalized = source;
697-
698778
// Fast path: no transforms, just yield normalized source directly
699779
if (transforms.length === 0) {
700-
for await (const batch of normalized) {
701-
signal?.throwIfAborted();
702-
yield batch;
703-
}
780+
yield* yieldAbortable(source, signal);
704781
return;
705782
}
706783

784+
const normalized = yieldAbortable(source, signal);
785+
707786
// Create internal controller for transform cancellation.
708787
// Note: if signal was already aborted, we threw above - no need to check here.
709788
const controller = new AbortController();

test/parallel/test-stream-iter-pull-async.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,44 @@ async function testPullSignalAbortMidIteration() {
153153
await assert.rejects(() => iter.next(), { name: 'AbortError' });
154154
}
155155

156+
async function testPullSignalAbortWhileSourceNextPending() {
157+
const source = {
158+
[Symbol.asyncIterator]() {
159+
return {
160+
async next() {
161+
await new Promise(() => {});
162+
},
163+
};
164+
},
165+
};
166+
const ac = new AbortController();
167+
const iter = pull(source, { signal: ac.signal })[Symbol.asyncIterator]();
168+
const next = iter.next();
169+
ac.abort();
170+
await assert.rejects(next, { name: 'AbortError' });
171+
}
172+
173+
async function testPullSignalAbortWithTransformWhileSourceNextPending() {
174+
const source = {
175+
[Symbol.asyncIterator]() {
176+
return {
177+
async next() {
178+
await new Promise(() => {});
179+
},
180+
};
181+
},
182+
};
183+
const ac = new AbortController();
184+
const iter = pull(
185+
source,
186+
(chunks) => chunks,
187+
{ signal: ac.signal },
188+
)[Symbol.asyncIterator]();
189+
const next = iter.next();
190+
ac.abort();
191+
await assert.rejects(next, { name: 'AbortError' });
192+
}
193+
156194
// Pull consumer break (return()) cleans up transform signal
157195
async function testPullConsumerBreakCleanup() {
158196
let signalAborted = false;
@@ -361,6 +399,8 @@ async function testTransformOptionsNotShared() {
361399
testPullSourceError(),
362400
testTapCallbackError(),
363401
testPullSignalAbortMidIteration(),
402+
testPullSignalAbortWhileSourceNextPending(),
403+
testPullSignalAbortWithTransformWhileSourceNextPending(),
364404
testPullConsumerBreakCleanup(),
365405
testPullTransformReturnsPromise(),
366406
testPullTransformYieldsStrings(),

0 commit comments

Comments
 (0)