Skip to content

Commit dc50637

Browse files
committed
stream: fix merge abort for pending sources
Wake the multi-source merge loop when its abort signal fires so a pending read rejects instead of waiting for another source to settle. When the primary error is the abort reason, do not await iterator cleanup because sources may already be stuck in pending next() calls. Fixes: #64012 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5
1 parent 4d3198c commit dc50637

2 files changed

Lines changed: 57 additions & 3 deletions

File tree

lib/internal/streams/iter/consumers.js

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ const {
3939
validateObject,
4040
} = require('internal/validators');
4141

42+
const {
43+
markPromiseAsHandled,
44+
} = internalBinding('util');
45+
4246
const {
4347
from,
4448
fromSync,
@@ -434,6 +438,20 @@ function merge(...args) {
434438
const ready = [];
435439
let activeCount = normalized.length;
436440
let waitResolve = null;
441+
let onAbort;
442+
443+
if (signal) {
444+
onAbort = () => {
445+
if (waitResolve) {
446+
waitResolve();
447+
waitResolve = null;
448+
}
449+
};
450+
signal.addEventListener('abort', onAbort, {
451+
__proto__: null,
452+
once: true,
453+
});
454+
}
437455

438456
// Called when a source's .next() settles. Pushes the result into
439457
// the ready queue and wakes the consumer if it's waiting.
@@ -498,27 +516,43 @@ function merge(...args) {
498516
if (activeCount > 0) {
499517
await new Promise((resolve) => {
500518
waitResolve = resolve;
519+
if (signal?.aborted) {
520+
waitResolve = null;
521+
resolve();
522+
}
501523
});
502524
}
503525
}
504526
} catch (err) {
505527
primaryError = err;
506528
} finally {
529+
if (onAbort !== undefined) {
530+
signal.removeEventListener('abort', onAbort);
531+
}
507532
// Clean up: return all iterators. Cleanup errors are not
508533
// swallowed - a broken iterator.return() (e.g., failing to
509534
// release a resource) should be visible to the caller.
510-
await cleanupIterators(iterators, primaryError);
535+
await cleanupIterators(
536+
iterators,
537+
primaryError,
538+
signal?.aborted && primaryError === signal.reason,
539+
);
511540
}
512541
},
513542
};
514543
}
515544

516-
async function cleanupIterators(iterators, primaryError) {
545+
async function cleanupIterators(iterators, primaryError, skipAwaitCleanup) {
517546
let cleanupError;
518547
await SafePromiseAllReturnVoid(iterators, async (iterator) => {
519548
if (iterator.return) {
520549
try {
521-
await iterator.return();
550+
const result = iterator.return();
551+
if (skipAwaitCleanup) {
552+
markPromiseAsHandled(result);
553+
} else {
554+
await result;
555+
}
522556
} catch (err) {
523557
// Keep the first cleanup error encountered.
524558
cleanupError ??= err;

test/parallel/test-stream-iter-consumers-merge.js

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,25 @@ async function testMergeSignalMidIteration() {
151151
await assert.rejects(() => iter.next(), { name: 'AbortError' });
152152
}
153153

154+
async function testMergeSignalDuringPendingMultiSourceRead() {
155+
const ac = new AbortController();
156+
157+
async function* pending() {
158+
await new Promise(() => {});
159+
yield [];
160+
}
161+
162+
const iter = merge(pending(), pending(), {
163+
__proto__: null,
164+
signal: ac.signal,
165+
})[Symbol.asyncIterator]();
166+
167+
const next = iter.next();
168+
ac.abort();
169+
170+
await assert.rejects(next, { name: 'AbortError' });
171+
}
172+
154173
// merge() accepts string sources (normalized via from())
155174
async function testMergeStringSources() {
156175
const batches = [];
@@ -286,6 +305,7 @@ Promise.all([
286305
testMergeSourceError(),
287306
testMergeConsumerBreak(),
288307
testMergeSignalMidIteration(),
308+
testMergeSignalDuringPendingMultiSourceRead(),
289309
testMergeStringSources(),
290310
testMergeObjectLikeSources(),
291311
testMergeCleanupErrorOnly(),

0 commit comments

Comments
 (0)