mirror of https://github.com/nodejs/node.git
stream: fix 0 transform hwm backpressure
PR-URL: https://github.com/nodejs/node/pull/43685 Refs: https://github.com/nodejs/node/issues/42457 Refs: https://github.com/nodejs/node/pull/43648/files Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
This commit is contained in:
parent
c7ac42f009
commit
31d9edc849
|
@ -65,7 +65,7 @@
|
|||
|
||||
const {
|
||||
ObjectSetPrototypeOf,
|
||||
Symbol
|
||||
Symbol,
|
||||
} = primordials;
|
||||
|
||||
module.exports = Transform;
|
||||
|
@ -73,6 +73,7 @@ const {
|
|||
ERR_METHOD_NOT_IMPLEMENTED
|
||||
} = require('internal/errors').codes;
|
||||
const Duplex = require('internal/streams/duplex');
|
||||
const { getHighWaterMark } = require('internal/streams/state');
|
||||
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
|
||||
ObjectSetPrototypeOf(Transform, Duplex);
|
||||
|
||||
|
@ -82,6 +83,26 @@ function Transform(options) {
|
|||
if (!(this instanceof Transform))
|
||||
return new Transform(options);
|
||||
|
||||
// TODO (ronag): This should preferably always be
|
||||
// applied but would be semver-major. Or even better;
|
||||
// make Transform a Readable with the Writable interface.
|
||||
const readableHighWaterMark = options ? getHighWaterMark(this, options, 'readableHighWaterMark', true) : null;
|
||||
if (readableHighWaterMark === 0) {
|
||||
// A Duplex will buffer both on the writable and readable side while
|
||||
// a Transform just wants to buffer hwm number of elements. To avoid
|
||||
// buffering twice we disable buffering on the writable side.
|
||||
options = {
|
||||
...options,
|
||||
highWaterMark: null,
|
||||
readableHighWaterMark,
|
||||
// TODO (ronag): 0 is not optimal since we have
|
||||
// a "bug" where we check needDrain before calling _write and not after.
|
||||
// Refs: https://github.com/nodejs/node/pull/32887
|
||||
// Refs: https://github.com/nodejs/node/pull/35941
|
||||
writableHighWaterMark: options.writableHighWaterMark || 0
|
||||
};
|
||||
}
|
||||
|
||||
Duplex.call(this, options);
|
||||
|
||||
// We have implemented the _read method, and done the other things
|
||||
|
@ -164,9 +185,7 @@ Transform.prototype._write = function(chunk, encoding, callback) {
|
|||
if (
|
||||
wState.ended || // Backwards compat.
|
||||
length === rState.length || // Backwards compat.
|
||||
rState.length < rState.highWaterMark ||
|
||||
rState.highWaterMark === 0 ||
|
||||
rState.length === 0
|
||||
rState.length < rState.highWaterMark
|
||||
) {
|
||||
callback();
|
||||
} else {
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
'use strict';
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const { PassThrough } = require('stream');
|
||||
|
||||
const pt = new PassThrough({ highWaterMark: 0 });
|
||||
pt.on('drain', common.mustCall());
|
||||
pt.write('hello');
|
||||
assert(!pt.write('hello1'));
|
||||
pt.read();
|
||||
pt.read();
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const { Transform } = require('stream');
|
||||
|
||||
const t = new Transform({
|
||||
objectMode: true, highWaterMark: 0,
|
||||
transform(chunk, enc, callback) {
|
||||
process.nextTick(() => callback(null, chunk, enc));
|
||||
}
|
||||
});
|
||||
|
||||
assert.strictEqual(t.write(1), false);
|
||||
t.on('drain', common.mustCall(() => {
|
||||
assert.strictEqual(t.write(2), false);
|
||||
t.end();
|
||||
}));
|
||||
|
||||
t.once('readable', common.mustCall(() => {
|
||||
assert.strictEqual(t.read(), 1);
|
||||
setImmediate(common.mustCall(() => {
|
||||
assert.strictEqual(t.read(), null);
|
||||
t.once('readable', common.mustCall(() => {
|
||||
assert.strictEqual(t.read(), 2);
|
||||
}));
|
||||
}));
|
||||
}));
|
|
@ -20,10 +20,6 @@ testTransform(666, 777, {
|
|||
writableHighWaterMark: 777,
|
||||
});
|
||||
|
||||
// test 0 overriding defaultHwm
|
||||
testTransform(0, DEFAULT, { readableHighWaterMark: 0 });
|
||||
testTransform(DEFAULT, 0, { writableHighWaterMark: 0 });
|
||||
|
||||
// Test highWaterMark overriding
|
||||
testTransform(555, 555, {
|
||||
highWaterMark: 555,
|
||||
|
@ -39,21 +35,6 @@ testTransform(555, 555, {
|
|||
writableHighWaterMark: 777,
|
||||
});
|
||||
|
||||
// Test highWaterMark = 0 overriding
|
||||
testTransform(0, 0, {
|
||||
highWaterMark: 0,
|
||||
readableHighWaterMark: 666,
|
||||
});
|
||||
testTransform(0, 0, {
|
||||
highWaterMark: 0,
|
||||
writableHighWaterMark: 777,
|
||||
});
|
||||
testTransform(0, 0, {
|
||||
highWaterMark: 0,
|
||||
readableHighWaterMark: 666,
|
||||
writableHighWaterMark: 777,
|
||||
});
|
||||
|
||||
// Test undefined, null
|
||||
[undefined, null].forEach((v) => {
|
||||
testTransform(DEFAULT, DEFAULT, { readableHighWaterMark: v });
|
||||
|
|
Loading…
Reference in New Issue