mirror of https://github.com/nodejs/node.git
stream: handle generator destruction from Duplex.from()
PR-URL: https://github.com/nodejs/node/pull/55096 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Jason Zhang <xzha4350@gmail.com>
This commit is contained in:
parent
6cd1805364
commit
55413004c8
|
@ -83,15 +83,19 @@ module.exports = function duplexify(body, name) {
|
|||
}
|
||||
|
||||
if (typeof body === 'function') {
|
||||
const { value, write, final, destroy } = fromAsyncGen(body);
|
||||
let d;
|
||||
|
||||
const { value, write, final, destroy } = fromAsyncGen(body, () => {
|
||||
destroyer(d);
|
||||
});
|
||||
|
||||
// Body might be a constructor function instead of an async generator function.
|
||||
if (isDuplexNodeStream(value)) {
|
||||
return value;
|
||||
return d = value;
|
||||
}
|
||||
|
||||
if (isIterable(value)) {
|
||||
return from(Duplexify, value, {
|
||||
return d = from(Duplexify, value, {
|
||||
// TODO (ronag): highWaterMark?
|
||||
objectMode: true,
|
||||
write,
|
||||
|
@ -102,12 +106,16 @@ module.exports = function duplexify(body, name) {
|
|||
|
||||
const then = value?.then;
|
||||
if (typeof then === 'function') {
|
||||
let d;
|
||||
let finalized = false;
|
||||
|
||||
const promise = FunctionPrototypeCall(
|
||||
then,
|
||||
value,
|
||||
(val) => {
|
||||
// The function returned without (fully) consuming the generator.
|
||||
if (!finalized) {
|
||||
destroyer(d);
|
||||
}
|
||||
if (val != null) {
|
||||
throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val);
|
||||
}
|
||||
|
@ -123,6 +131,7 @@ module.exports = function duplexify(body, name) {
|
|||
readable: false,
|
||||
write,
|
||||
final(cb) {
|
||||
finalized = true;
|
||||
final(async () => {
|
||||
try {
|
||||
await promise;
|
||||
|
@ -208,11 +217,12 @@ module.exports = function duplexify(body, name) {
|
|||
body);
|
||||
};
|
||||
|
||||
function fromAsyncGen(fn) {
|
||||
function fromAsyncGen(fn, destructor) {
|
||||
let { promise, resolve } = PromiseWithResolvers();
|
||||
const ac = new AbortController();
|
||||
const signal = ac.signal;
|
||||
const value = fn(async function*() {
|
||||
|
||||
const asyncGenerator = (async function* () {
|
||||
while (true) {
|
||||
const _promise = promise;
|
||||
promise = null;
|
||||
|
@ -222,9 +232,44 @@ function fromAsyncGen(fn) {
|
|||
if (signal.aborted)
|
||||
throw new AbortError(undefined, { cause: signal.reason });
|
||||
({ promise, resolve } = PromiseWithResolvers());
|
||||
// Next line will "break" the loop if the generator is returned/thrown.
|
||||
yield chunk;
|
||||
}
|
||||
}(), { signal });
|
||||
})();
|
||||
|
||||
const originalReturn = asyncGenerator.return;
|
||||
asyncGenerator.return = async function(value) {
|
||||
try {
|
||||
return await originalReturn.call(this, value);
|
||||
} finally {
|
||||
if (promise) {
|
||||
const _promise = promise;
|
||||
promise = null;
|
||||
const { cb } = await _promise;
|
||||
process.nextTick(cb);
|
||||
|
||||
process.nextTick(destructor);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const originalThrow = asyncGenerator.throw;
|
||||
asyncGenerator.throw = async function(err) {
|
||||
try {
|
||||
return await originalThrow.call(this, err);
|
||||
} finally {
|
||||
if (promise) {
|
||||
const _promise = promise;
|
||||
promise = null;
|
||||
const { cb } = await _promise;
|
||||
|
||||
// asyncGenerator.throw(undefined) should cause a callback error
|
||||
process.nextTick(cb, err ?? new AbortError());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const value = fn(asyncGenerator, { signal });
|
||||
|
||||
return {
|
||||
value,
|
||||
|
|
|
@ -5,6 +5,7 @@ const assert = require('assert');
|
|||
const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream');
|
||||
const { ReadableStream, WritableStream } = require('stream/web');
|
||||
const { Blob } = require('buffer');
|
||||
const sleep = require('util').promisify(setTimeout);
|
||||
|
||||
{
|
||||
const d = Duplex.from({
|
||||
|
@ -401,3 +402,193 @@ function makeATestWritableStream(writeFunc) {
|
|||
assert.strictEqual(d.writable, false);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const r = Readable.from(['foo', 'bar', 'baz']);
|
||||
pipeline(
|
||||
r,
|
||||
Duplex.from(async function(asyncGenerator) {
|
||||
const values = await Array.fromAsync(asyncGenerator);
|
||||
assert.deepStrictEqual(values, ['foo', 'bar', 'baz']);
|
||||
|
||||
await asyncGenerator.return();
|
||||
await asyncGenerator.return();
|
||||
await asyncGenerator.return();
|
||||
}),
|
||||
common.mustSucceed(() => {
|
||||
assert.strictEqual(r.destroyed, true);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
const r = Readable.from(['foo', 'bar', 'baz']);
|
||||
pipeline(
|
||||
r,
|
||||
Duplex.from(async function(asyncGenerator) {
|
||||
// eslint-disable-next-line no-unused-vars
|
||||
for await (const _ of asyncGenerator) break;
|
||||
}),
|
||||
common.mustSucceed(() => {
|
||||
assert.strictEqual(r.destroyed, true);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
const r = Readable.from(['foo', 'bar', 'baz']);
|
||||
pipeline(
|
||||
r,
|
||||
Duplex.from(async function(asyncGenerator) {
|
||||
const a = await asyncGenerator.next();
|
||||
assert.strictEqual(a.done, false);
|
||||
assert.strictEqual(a.value.toString(), 'foo');
|
||||
const b = await asyncGenerator.return();
|
||||
assert.strictEqual(b.done, true);
|
||||
}),
|
||||
common.mustSucceed(() => {
|
||||
assert.strictEqual(r.destroyed, true);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
const r = Readable.from(['foo', 'bar', 'baz']);
|
||||
pipeline(
|
||||
r,
|
||||
Duplex.from(async function(asyncGenerator) {
|
||||
// Note: the generator is not even started at this point
|
||||
await asyncGenerator.return();
|
||||
}),
|
||||
common.mustSucceed(() => {
|
||||
assert.strictEqual(r.destroyed, true);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
const r = Readable.from(['foo', 'bar', 'baz']);
|
||||
pipeline(
|
||||
r,
|
||||
Duplex.from(async function(asyncGenerator) {
|
||||
// Same as before, with a delay
|
||||
await sleep(100);
|
||||
await asyncGenerator.return();
|
||||
}),
|
||||
common.mustSucceed(() => {
|
||||
assert.strictEqual(r.destroyed, true);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
const r = Readable.from(['foo', 'bar', 'baz']);
|
||||
pipeline(
|
||||
r,
|
||||
Duplex.from(async function(asyncGenerator) {}),
|
||||
common.mustCall((err) => {
|
||||
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
|
||||
assert.strictEqual(r.destroyed, true);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
const r = Readable.from(['foo', 'bar', 'baz']);
|
||||
pipeline(
|
||||
r,
|
||||
Duplex.from(async function(asyncGenerator) {
|
||||
await sleep(100);
|
||||
}),
|
||||
common.mustCall((err) => {
|
||||
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
|
||||
assert.strictEqual(r.destroyed, true);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
const r = Readable.from(['foo', 'bar', 'baz']);
|
||||
const d = Duplex.from(async function(asyncGenerator) {
|
||||
while (!(await asyncGenerator.next()).done) await sleep(100);
|
||||
});
|
||||
|
||||
setTimeout(() => d.destroy(), 150);
|
||||
|
||||
pipeline(
|
||||
r,
|
||||
d,
|
||||
common.mustCall((err) => {
|
||||
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
|
||||
assert.strictEqual(r.destroyed, true);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
const r = Duplex.from(async function* () {
|
||||
for (const value of ['foo', 'bar', 'baz']) {
|
||||
await sleep(50);
|
||||
yield value;
|
||||
}
|
||||
});
|
||||
const d = Duplex.from(async function(asyncGenerator) {
|
||||
while (!(await asyncGenerator.next()).done);
|
||||
});
|
||||
|
||||
setTimeout(() => r.destroy(), 75);
|
||||
|
||||
pipeline(
|
||||
r,
|
||||
d,
|
||||
common.mustCall((err) => {
|
||||
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
|
||||
assert.strictEqual(r.destroyed, true);
|
||||
assert.strictEqual(d.destroyed, true);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
const r = Readable.from(['foo']);
|
||||
pipeline(
|
||||
r,
|
||||
Duplex.from(async function(asyncGenerator) {
|
||||
await asyncGenerator.throw(new Error('my error'));
|
||||
}),
|
||||
common.mustCall((err) => {
|
||||
assert.strictEqual(err.message, 'my error');
|
||||
assert.strictEqual(r.destroyed, true);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
const r = Readable.from(['foo', 'bar']);
|
||||
pipeline(
|
||||
r,
|
||||
Duplex.from(async function(asyncGenerator) {
|
||||
await asyncGenerator.next();
|
||||
await asyncGenerator.throw(new Error('my error'));
|
||||
}),
|
||||
common.mustCall((err) => {
|
||||
assert.strictEqual(err.message, 'my error');
|
||||
assert.strictEqual(r.destroyed, true);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
const r = Readable.from(['foo', 'bar']);
|
||||
pipeline(
|
||||
r,
|
||||
Duplex.from(async function(asyncGenerator) {
|
||||
await asyncGenerator.next();
|
||||
await asyncGenerator.throw();
|
||||
}),
|
||||
common.mustCall((err) => {
|
||||
assert.strictEqual(err.code, 'ABORT_ERR');
|
||||
assert.strictEqual(r.destroyed, true);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue