stream: enable usage of webstreams on compose()

Refs: https://github.com/nodejs/node/issues/39316
PR-URL: https://github.com/nodejs/node/pull/46675
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Debadree Chatterjee 2023-02-27 14:20:39 +05:30 committed by GitHub
parent fadcee71e0
commit 94e1f8f8e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 617 additions and 50 deletions

View File

@@ -2798,11 +2798,16 @@ const server = http.createServer((req, res) => {
<!-- YAML
added: v16.9.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/46675
description: Added support for webstreams.
-->
> Stability: 1 - `stream.compose` is experimental.
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
ReadableStream\[]|WritableStream\[]|TransformStream\[]}
* Returns: {stream.Duplex}
Combines two or more streams into a `Duplex` stream that writes to the

View File

@@ -7,6 +7,10 @@ const {
isNodeStream,
isReadable,
isWritable,
isWebStream,
isTransformStream,
isWritableStream,
isReadableStream,
} = require('internal/streams/utils');
const {
AbortError,
@@ -15,6 +19,7 @@ const {
ERR_MISSING_ARGS,
},
} = require('internal/errors');
const eos = require('internal/streams/end-of-stream');
module.exports = function compose(...streams) {
if (streams.length === 0) {
@@ -37,18 +42,32 @@ module.exports = function compose(...streams) {
}
for (let n = 0; n < streams.length; ++n) {
if (!isNodeStream(streams[n])) {
if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) {
// TODO(ronag): Add checks for non streams.
continue;
}
if (n < streams.length - 1 && !isReadable(streams[n])) {
if (
n < streams.length - 1 &&
!(
isReadable(streams[n]) ||
isReadableStream(streams[n]) ||
isTransformStream(streams[n])
)
) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
'must be readable',
);
}
if (n > 0 && !isWritable(streams[n])) {
if (
n > 0 &&
!(
isWritable(streams[n]) ||
isWritableStream(streams[n]) ||
isTransformStream(streams[n])
)
) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
@@ -79,8 +98,16 @@ module.exports = function compose(...streams) {
const head = streams[0];
const tail = pipeline(streams, onfinished);
const writable = !!isWritable(head);
const readable = !!isReadable(tail);
const writable = !!(
isWritable(head) ||
isWritableStream(head) ||
isTransformStream(head)
);
const readable = !!(
isReadable(tail) ||
isReadableStream(tail) ||
isTransformStream(tail)
);
// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
@@ -94,28 +121,55 @@ module.exports = function compose(...streams) {
});
if (writable) {
d._write = function(chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};
if (isNodeStream(head)) {
d._write = function(chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};
d._final = function(callback) {
head.end();
onfinish = callback;
};
d._final = function(callback) {
head.end();
onfinish = callback;
};
head.on('drain', function() {
if (ondrain) {
const cb = ondrain;
ondrain = null;
cb();
}
});
head.on('drain', function() {
if (ondrain) {
const cb = ondrain;
ondrain = null;
cb();
}
});
} else if (isWebStream(head)) {
const writable = isTransformStream(head) ? head.writable : head;
const writer = writable.getWriter();
tail.on('finish', function() {
d._write = async function(chunk, encoding, callback) {
try {
await writer.ready;
writer.write(chunk).catch(() => {});
callback();
} catch (err) {
callback(err);
}
};
d._final = async function(callback) {
try {
await writer.ready;
writer.close().catch(() => {});
onfinish = callback;
} catch (err) {
callback(err);
}
};
}
const toRead = isTransformStream(tail) ? tail.readable : tail;
eos(toRead, () => {
if (onfinish) {
const cb = onfinish;
onfinish = null;
@@ -125,32 +179,54 @@ module.exports = function compose(...streams) {
}
if (readable) {
tail.on('readable', function() {
if (onreadable) {
const cb = onreadable;
onreadable = null;
cb();
}
});
tail.on('end', function() {
d.push(null);
});
d._read = function() {
while (true) {
const buf = tail.read();
if (buf === null) {
onreadable = d._read;
return;
if (isNodeStream(tail)) {
tail.on('readable', function() {
if (onreadable) {
const cb = onreadable;
onreadable = null;
cb();
}
});
if (!d.push(buf)) {
return;
tail.on('end', function() {
d.push(null);
});
d._read = function() {
while (true) {
const buf = tail.read();
if (buf === null) {
onreadable = d._read;
return;
}
if (!d.push(buf)) {
return;
}
}
}
};
};
} else if (isWebStream(tail)) {
const readable = isTransformStream(tail) ? tail.readable : tail;
const reader = readable.getReader();
d._read = async function() {
while (true) {
try {
const { value, done } = await reader.read();
if (!d.push(value)) {
return;
}
if (done) {
d.push(null);
return;
}
} catch {
return;
}
}
};
}
}
d._destroy = function(err, callback) {
@@ -166,7 +242,9 @@ module.exports = function compose(...streams) {
callback(err);
} else {
onclose = callback;
destroyer(tail, err);
if (isNodeStream(tail)) {
destroyer(tail, err);
}
}
};

View File

@@ -286,7 +286,7 @@ function pipelineImpl(streams, callback, opts) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
} else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
ret = stream;
} else {
ret = Duplex.from(stream);
@@ -385,6 +385,7 @@ function pipelineImpl(streams, callback, opts) {
finishCount++;
pumpToWeb(ret, stream, finish, { end });
} else if (isTransformStream(ret)) {
finishCount++;
pumpToWeb(ret.readable, stream, finish, { end });
} else {
throw new ERR_INVALID_ARG_TYPE(

View File

@@ -0,0 +1,483 @@
'use strict';

// Tests for stream.compose() interoperability with WHATWG web streams
// (TransformStream / ReadableStream / WritableStream), classic Node.js
// streams, and async-generator stages, in every mixed combination.
// common.mustCall()/mustNotCall() enforce that each callback fires exactly
// as often as expected (or never).
const common = require('../common');
const assert = require('assert');
const {
  Transform,
  Readable,
  Writable,
  compose
} = require('stream');
const {
  TransformStream,
  ReadableStream,
  WritableStream,
} = require('stream/web');

// TransformStream -> TransformStream: first stage replaces the space with
// an underscore, second upper-cases; composed duplex emits 'HELLO_WORLD'.
{
  let res = '';
  const d = compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk?.toString()?.replace(' ', '_'));
      })
    }),
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk?.toString()?.toUpperCase());
      })
    })
  );
  d.on('data', common.mustCall((chunk) => {
    res += chunk;
  }));
  d.on('end', common.mustCall(() => {
    assert.strictEqual(res, 'HELLO_WORLD');
  }));
  d.end('hello world');
}

// Node Transform head (doubles the chunk) -> TransformStream tail
// (upper-cases): 'asd' becomes 'ASDASD'.
{
  let res = '';
  compose(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk + chunk);
      })
    }),
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      })
    })
  )
    .end('asd')
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASDASD');
    }));
}

// Async-generator head (doubles each chunk) -> TransformStream tail
// (upper-cases): 'asd' becomes 'ASDASD'.
{
  let res = '';
  compose(
    async function*(source) {
      for await (const chunk of source) {
        yield chunk + chunk;
      }
    },
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      }),
    })
  )
    .end('asd')
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASDASD');
    }));
}

// TransformStream head -> async-generator middle -> Node Transform tail:
// upper-case, double, then replace every 'A' with 'B' => 'BSDBSD'.
{
  let res = '';
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      }),
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk + chunk;
      }
    },
    new Transform({
      transform: common.mustCall((chunk, enc, clb) => {
        clb(null, chunk?.toString()?.replaceAll('A', 'B'));
      })
    })
  )
    .end('asd')
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'BSDBSD');
    }));
}

// Same pipeline shape as above but with a TransformStream tail instead of
// a Node Transform: result is identical ('BSDBSD').
{
  let res = '';
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      }),
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk + chunk;
      }
    },
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk?.toString()?.replaceAll('A', 'B'));
      })
    })
  )
    .end('asd')
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'BSDBSD');
    }));
}

// ReadableStream source (enqueues 'asd' then closes) -> TransformStream:
// the composed stream is read-only and emits 'ASD'.
{
  let res = '';
  compose(
    new ReadableStream({
      start(controller) {
        controller.enqueue('asd');
        controller.close();
      }
    }),
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk?.toString()?.toUpperCase());
      })
    })
  )
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

// ReadableStream source -> Node Transform tail: same expectation ('ASD').
{
  let res = '';
  compose(
    new ReadableStream({
      start(controller) {
        controller.enqueue('asd');
        controller.close();
      }
    }),
    new Transform({
      transform: common.mustCall((chunk, enc, clb) => {
        clb(null, chunk?.toString()?.toUpperCase());
      })
    })
  )
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

// Node Readable source -> TransformStream tail: 'asd' becomes 'ASD'.
{
  let res = '';
  compose(
    Readable.from(['asd']),
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk?.toString()?.toUpperCase());
      })
    })
  )
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

// TransformStream head -> async-generator passthrough -> Node Writable
// sink collecting into `res`; 'finish' fires with res === 'ASD'.
{
  let res = '';
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Writable({
      write: common.mustCall((chunk, encoding, callback) => {
        res += chunk;
        callback(null);
      })
    })
  )
    .end('asd')
    .on('finish', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

// Node Transform head -> async-generator passthrough -> WritableStream
// sink: 'finish' fires with res === 'ASD'.
{
  let res = '';
  compose(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new WritableStream({
      write: common.mustCall((chunk) => {
        res += chunk;
      })
    })
  )
    .end('asd')
    .on('finish', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

// TransformStream head -> async-generator passthrough -> WritableStream
// sink: all-web edges with a generator in the middle; res === 'ASD'.
{
  let res = '';
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new WritableStream({
      write: common.mustCall((chunk) => {
        res += chunk;
      })
    })
  )
    .end('asd')
    .on('finish', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

// TransformStream head -> async-generator passthrough -> plain async
// function as terminal consumer; res === 'ASD' on 'finish'.
{
  let res = '';
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    async function(source) {
      for await (const chunk of source) {
        res += chunk;
      }
    }
  )
    .end('asd')
    .on('finish', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

// Error propagation: first TransformStream errors its controller, so the
// second stage must never run, no data/end events are emitted, and the
// composed stream surfaces the 'asd' error.
// NOTE(review): the 'error' handlers in the cases below are plain arrows,
// not common.mustCall-wrapped, so the asserts only run if the event fires.
{
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.error(new Error('asd'));
      })
    }),
    new TransformStream({
      transform: common.mustNotCall()
    })
  )
    .on('data', common.mustNotCall())
    .on('end', common.mustNotCall())
    .on('error', (err) => {
      assert.strictEqual(err?.message, 'asd');
    })
    .end('xyz');
}

// Error propagation: second TransformStream errors after the first
// forwarded a chunk; error surfaces, no data/end.
{
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk);
      })
    }),
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.error(new Error('asd'));
      })
    })
  )
    .on('data', common.mustNotCall())
    .on('end', common.mustNotCall())
    .on('error', (err) => {
      assert.strictEqual(err?.message, 'asd');
    })
    .end('xyz');
}

// Error propagation: async-generator middle throws on the first chunk; the
// downstream TransformStream must never be invoked.
{
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk);
      })
    }),
    async function*(source) { // eslint-disable-line require-yield
      let tmp = '';
      for await (const chunk of source) {
        tmp += chunk;
        throw new Error('asd');
      }
      return tmp;
    },
    new TransformStream({
      transform: common.mustNotCall()
    })
  )
    .on('data', common.mustNotCall())
    .on('end', common.mustNotCall())
    .on('error', (err) => {
      assert.strictEqual(err?.message, 'asd');
    })
    .end('xyz');
}

// Error propagation: web TransformStream errors into a Node Transform tail.
{
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.error(new Error('asd'));
      })
    }),
    new Transform({
      transform: common.mustNotCall()
    })
  )
    .on('data', common.mustNotCall())
    .on('end', common.mustNotCall())
    .on('error', (err) => {
      assert.strictEqual(err?.message, 'asd');
    })
    .end('xyz');
}

// Error propagation: Node Transform errors into a web TransformStream tail.
{
  compose(
    new Transform({
      transform: common.mustCall((chunk, enc, clb) => {
        clb(new Error('asd'));
      })
    }),
    new TransformStream({
      transform: common.mustNotCall()
    })
  )
    .on('data', common.mustNotCall())
    .on('end', common.mustNotCall())
    .on('error', (err) => {
      assert.strictEqual(err?.message, 'asd');
    })
    .end('xyz');
}

// ReadableStream head enqueues an Error instance as its chunk; the
// downstream transform must not run and an 'asd' error is expected.
{
  compose(
    new ReadableStream({
      start(controller) {
        controller.enqueue(new Error('asd'));
      }
    }),
    new TransformStream({
      transform: common.mustNotCall()
    })
  )
    .on('data', common.mustNotCall())
    .on('end', common.mustNotCall())
    .on('error', (err) => {
      assert.strictEqual(err?.message, 'asd');
    })
    .end('xyz');
}

// Error propagation: WritableStream sink errors its controller on write.
{
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      })
    }),
    new WritableStream({
      write: common.mustCall((chunk, controller) => {
        controller.error(new Error('asd'));
      })
    })
  )
    .on('error', (err) => {
      assert.strictEqual(err?.message, 'asd');
    })
    .end('xyz');
}

// Error propagation: terminal async-function consumer throws immediately.
{
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    async function(source) {
      throw new Error('asd');
    }
  ).on('error', (err) => {
    assert.strictEqual(err?.message, 'asd');
  }).end('xyz');
}