mirror of https://github.com/nodejs/node.git
stream: avoid pause with unpipe in buffered write
If a pipe is cleaned up (due to unpipe) during a write that returned false, the source stream can get stuck in a paused state. Fixes: https://github.com/nodejs/node/issues/2323 PR-URL: https://github.com/nodejs/node/pull/2325 Reviewed-By: Sakthipriyan Vairamani <thechargingvolcano@gmail.com>
This commit is contained in:
parent
8e213096df
commit
68990948fe
|
@ -497,6 +497,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
|
||||||
var ondrain = pipeOnDrain(src);
|
var ondrain = pipeOnDrain(src);
|
||||||
dest.on('drain', ondrain);
|
dest.on('drain', ondrain);
|
||||||
|
|
||||||
|
var cleanedUp = false;
|
||||||
function cleanup() {
|
function cleanup() {
|
||||||
debug('cleanup');
|
debug('cleanup');
|
||||||
// cleanup event handlers once the pipe is broken
|
// cleanup event handlers once the pipe is broken
|
||||||
|
@ -509,6 +510,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
|
||||||
src.removeListener('end', cleanup);
|
src.removeListener('end', cleanup);
|
||||||
src.removeListener('data', ondata);
|
src.removeListener('data', ondata);
|
||||||
|
|
||||||
|
cleanedUp = true;
|
||||||
|
|
||||||
// if the reader is waiting for a drain event from this
|
// if the reader is waiting for a drain event from this
|
||||||
// specific writer, then it would cause it to never start
|
// specific writer, then it would cause it to never start
|
||||||
// flowing again.
|
// flowing again.
|
||||||
|
@ -524,9 +527,16 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
|
||||||
debug('ondata');
|
debug('ondata');
|
||||||
var ret = dest.write(chunk);
|
var ret = dest.write(chunk);
|
||||||
if (false === ret) {
|
if (false === ret) {
|
||||||
debug('false write response, pause',
|
// If the user unpiped during `dest.write()`, it is possible
|
||||||
src._readableState.awaitDrain);
|
// to get stuck in a permanently paused state if that write
|
||||||
src._readableState.awaitDrain++;
|
// also returned false.
|
||||||
|
if (state.pipesCount === 1 &&
|
||||||
|
state.pipes[0] === dest &&
|
||||||
|
src.listenerCount('data') === 1 &&
|
||||||
|
!cleanedUp) {
|
||||||
|
debug('false write response, pause', src._readableState.awaitDrain);
|
||||||
|
src._readableState.awaitDrain++;
|
||||||
|
}
|
||||||
src.pause();
|
src.pause();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
'use strict';
|
||||||
|
const common = require('../common');
|
||||||
|
const assert = require('assert');
|
||||||
|
const stream = require('stream');
|
||||||
|
|
||||||
|
const reader = new stream.Readable();
|
||||||
|
const writer1 = new stream.Writable();
|
||||||
|
const writer2 = new stream.Writable();
|
||||||
|
|
||||||
|
// 560000 is chosen here because it is larger than the (default) highWaterMark
|
||||||
|
// and will cause `.write()` to return false
|
||||||
|
// See: https://github.com/nodejs/node/issues/2323
|
||||||
|
const buffer = new Buffer(560000);
|
||||||
|
|
||||||
|
reader._read = function(n) {};
|
||||||
|
|
||||||
|
writer1._write = common.mustCall(function(chunk, encoding, cb) {
|
||||||
|
this.emit('chunk-received');
|
||||||
|
cb();
|
||||||
|
}, 1);
|
||||||
|
writer1.once('chunk-received', function() {
|
||||||
|
reader.unpipe(writer1);
|
||||||
|
reader.pipe(writer2);
|
||||||
|
reader.push(buffer);
|
||||||
|
setImmediate(function() {
|
||||||
|
reader.push(buffer);
|
||||||
|
setImmediate(function() {
|
||||||
|
reader.push(buffer);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
writer2._write = common.mustCall(function(chunk, encoding, cb) {
|
||||||
|
cb();
|
||||||
|
}, 3);
|
||||||
|
|
||||||
|
reader.pipe(writer1);
|
||||||
|
reader.push(buffer);
|
Loading…
Reference in New Issue