mirror of https://github.com/nodejs/node.git
stream: allow readable to end early without error
PR-URL: https://github.com/nodejs/node/pull/40881 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
This commit is contained in:
parent
414bc7bec1
commit
1fa507f098
|
@ -33,16 +33,13 @@ const {
|
|||
isIterable,
|
||||
isReadableNodeStream,
|
||||
isNodeStream,
|
||||
isReadableFinished,
|
||||
} = require('internal/streams/utils');
|
||||
const { AbortController } = require('internal/abort_controller');
|
||||
|
||||
let PassThrough;
|
||||
let Readable;
|
||||
|
||||
function destroyer(stream, reading, writing, callback) {
|
||||
callback = once(callback);
|
||||
|
||||
function destroyer(stream, reading, writing) {
|
||||
let finished = false;
|
||||
stream.on('close', () => {
|
||||
finished = true;
|
||||
|
@ -50,35 +47,12 @@ function destroyer(stream, reading, writing, callback) {
|
|||
|
||||
eos(stream, { readable: reading, writable: writing }, (err) => {
|
||||
finished = !err;
|
||||
|
||||
const rState = stream._readableState;
|
||||
if (
|
||||
err &&
|
||||
err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
|
||||
reading &&
|
||||
(rState && rState.ended && !rState.errored && !rState.errorEmitted)
|
||||
) {
|
||||
// Some readable streams will emit 'close' before 'end'. However, since
|
||||
// this is on the readable side 'end' should still be emitted if the
|
||||
// stream has been ended and no error emitted. This should be allowed in
|
||||
// favor of backwards compatibility. Since the stream is piped to a
|
||||
// destination this should not result in any observable difference.
|
||||
// We don't need to check if this is a writable premature close since
|
||||
// eos will only fail with premature close on the reading side for
|
||||
// duplex streams.
|
||||
stream
|
||||
.once('end', callback)
|
||||
.once('error', callback);
|
||||
} else {
|
||||
callback(err);
|
||||
}
|
||||
});
|
||||
|
||||
return (err) => {
|
||||
if (finished) return;
|
||||
finished = true;
|
||||
destroyImpl.destroyer(stream, err);
|
||||
callback(err || new ERR_STREAM_DESTROYED('pipe'));
|
||||
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -109,7 +83,7 @@ async function* fromReadable(val) {
|
|||
yield* Readable.prototype[SymbolAsyncIterator].call(val);
|
||||
}
|
||||
|
||||
async function pump(iterable, writable, finish, opts) {
|
||||
async function pump(iterable, writable, finish, { end }) {
|
||||
let error;
|
||||
let onresolve = null;
|
||||
|
||||
|
@ -153,7 +127,7 @@ async function pump(iterable, writable, finish, opts) {
|
|||
}
|
||||
}
|
||||
|
||||
if (opts?.end !== false) {
|
||||
if (end) {
|
||||
writable.end();
|
||||
}
|
||||
|
||||
|
@ -220,7 +194,7 @@ function pipelineImpl(streams, callback, opts) {
|
|||
ac.abort();
|
||||
|
||||
if (final) {
|
||||
callback(error, value);
|
||||
process.nextTick(callback, error, value);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -233,18 +207,19 @@ function pipelineImpl(streams, callback, opts) {
|
|||
|
||||
if (isNodeStream(stream)) {
|
||||
if (end) {
|
||||
finishCount++;
|
||||
destroys.push(destroyer(stream, reading, writing, (err) => {
|
||||
if (!err && !reading && isReadableFinished(stream, false)) {
|
||||
stream.read(0);
|
||||
destroyer(stream, true, writing, finish);
|
||||
} else {
|
||||
finish(err);
|
||||
}
|
||||
}));
|
||||
} else {
|
||||
stream.on('error', finish);
|
||||
destroys.push(destroyer(stream, reading, writing));
|
||||
}
|
||||
|
||||
// Catch stream errors that occur after pipe/pump has completed.
|
||||
stream.on('error', (err) => {
|
||||
if (
|
||||
err &&
|
||||
err.name !== 'AbortError' &&
|
||||
err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
|
||||
) {
|
||||
finish(err);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (i === 0) {
|
||||
|
@ -286,6 +261,7 @@ function pipelineImpl(streams, callback, opts) {
|
|||
// second use.
|
||||
const then = ret?.then;
|
||||
if (typeof then === 'function') {
|
||||
finishCount++;
|
||||
then.call(ret,
|
||||
(val) => {
|
||||
value = val;
|
||||
|
@ -293,8 +269,10 @@ function pipelineImpl(streams, callback, opts) {
|
|||
if (end) {
|
||||
pt.end();
|
||||
}
|
||||
process.nextTick(finish);
|
||||
}, (err) => {
|
||||
pt.destroy(err);
|
||||
process.nextTick(finish, err);
|
||||
},
|
||||
);
|
||||
} else if (isIterable(ret, true)) {
|
||||
|
@ -307,24 +285,18 @@ function pipelineImpl(streams, callback, opts) {
|
|||
|
||||
ret = pt;
|
||||
|
||||
finishCount++;
|
||||
destroys.push(destroyer(ret, false, true, finish));
|
||||
destroys.push(destroyer(ret, false, true));
|
||||
}
|
||||
} else if (isNodeStream(stream)) {
|
||||
if (isReadableNodeStream(ret)) {
|
||||
ret.pipe(stream, { end });
|
||||
|
||||
// Compat. Before node v10.12.0 stdio used to throw an error so
|
||||
// pipe() did/does not end() stdio destinations.
|
||||
// Now they allow it but "secretly" don't close the underlying fd.
|
||||
if (stream === process.stdout || stream === process.stderr) {
|
||||
ret.on('end', () => stream.end());
|
||||
}
|
||||
} else {
|
||||
ret = makeAsyncIterable(ret);
|
||||
|
||||
finishCount += 2;
|
||||
pipe(ret, stream, finish, { end });
|
||||
} else if (isIterable(ret)) {
|
||||
finishCount++;
|
||||
pump(ret, stream, finish, { end });
|
||||
} else {
|
||||
throw new ERR_INVALID_ARG_TYPE(
|
||||
'val', ['Readable', 'Iterable', 'AsyncIterable'], ret);
|
||||
}
|
||||
ret = stream;
|
||||
} else {
|
||||
|
@ -339,4 +311,41 @@ function pipelineImpl(streams, callback, opts) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
function pipe(src, dst, finish, { end }) {
|
||||
src.pipe(dst, { end });
|
||||
|
||||
if (end) {
|
||||
// Compat. Before node v10.12.0 stdio used to throw an error so
|
||||
// pipe() did/does not end() stdio destinations.
|
||||
// Now they allow it but "secretly" don't close the underlying fd.
|
||||
src.once('end', () => dst.end());
|
||||
} else {
|
||||
finish();
|
||||
}
|
||||
|
||||
eos(src, { readable: true, writable: false }, (err) => {
|
||||
const rState = src._readableState;
|
||||
if (
|
||||
err &&
|
||||
err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
|
||||
(rState && rState.ended && !rState.errored && !rState.errorEmitted)
|
||||
) {
|
||||
// Some readable streams will emit 'close' before 'end'. However, since
|
||||
// this is on the readable side 'end' should still be emitted if the
|
||||
// stream has been ended and no error emitted. This should be allowed in
|
||||
// favor of backwards compatibility. Since the stream is piped to a
|
||||
// destination this should not result in any observable difference.
|
||||
// We don't need to check if this is a writable premature close since
|
||||
// eos will only fail with premature close on the reading side for
|
||||
// duplex streams.
|
||||
src
|
||||
.once('end', finish)
|
||||
.once('error', finish);
|
||||
} else {
|
||||
finish(err);
|
||||
}
|
||||
});
|
||||
eos(dst, { readable: false, writable: true }, finish);
|
||||
}
|
||||
|
||||
module.exports = { pipelineImpl, pipeline };
|
||||
|
|
|
@ -1027,7 +1027,7 @@ const tsp = require('timers/promises');
|
|||
const src = new PassThrough();
|
||||
const dst = new PassThrough();
|
||||
pipeline(src, dst, common.mustSucceed(() => {
|
||||
assert.strictEqual(dst.destroyed, true);
|
||||
assert.strictEqual(dst.destroyed, false);
|
||||
}));
|
||||
src.end();
|
||||
}
|
||||
|
@ -1462,7 +1462,7 @@ const tsp = require('timers/promises');
|
|||
|
||||
await pipelinePromise(read, duplex);
|
||||
|
||||
assert.strictEqual(duplex.destroyed, true);
|
||||
assert.strictEqual(duplex.destroyed, false);
|
||||
}
|
||||
|
||||
run().then(common.mustCall());
|
||||
|
@ -1488,3 +1488,27 @@ const tsp = require('timers/promises');
|
|||
|
||||
run().then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const s = new PassThrough({ objectMode: true });
|
||||
pipeline(async function*() {
|
||||
await Promise.resolve();
|
||||
yield 'hello';
|
||||
yield 'world';
|
||||
yield 'world';
|
||||
}, s, async function(source) {
|
||||
let ret = '';
|
||||
let n = 0;
|
||||
for await (const chunk of source) {
|
||||
if (n++ > 1) {
|
||||
break;
|
||||
}
|
||||
ret += chunk;
|
||||
}
|
||||
return ret;
|
||||
}, common.mustCall((err, val) => {
|
||||
assert.strictEqual(err, undefined);
|
||||
assert.strictEqual(val, 'helloworld');
|
||||
assert.strictEqual(s.destroyed, true);
|
||||
}));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue