mirror of https://github.com/nodejs/node.git
stream: avoid pause with unpipe in buffered write
If a pipe is cleaned up (due to unpipe) during a write that returned false, the source stream can get stuck in a paused state. Fixes: https://github.com/nodejs/node/issues/2323 PR-URL: https://github.com/nodejs/node/pull/2325 Reviewed-By: Sakthipriyan Vairamani <thechargingvolcano@gmail.com>
This commit is contained in:
parent
8e213096df
commit
68990948fe
|
@ -497,6 +497,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
|
|||
var ondrain = pipeOnDrain(src);
|
||||
dest.on('drain', ondrain);
|
||||
|
||||
var cleanedUp = false;
|
||||
function cleanup() {
|
||||
debug('cleanup');
|
||||
// cleanup event handlers once the pipe is broken
|
||||
|
@ -509,6 +510,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
|
|||
src.removeListener('end', cleanup);
|
||||
src.removeListener('data', ondata);
|
||||
|
||||
cleanedUp = true;
|
||||
|
||||
// if the reader is waiting for a drain event from this
|
||||
// specific writer, then it would cause it to never start
|
||||
// flowing again.
|
||||
|
@ -524,9 +527,16 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
|
|||
debug('ondata');
|
||||
var ret = dest.write(chunk);
|
||||
if (false === ret) {
|
||||
debug('false write response, pause',
|
||||
src._readableState.awaitDrain);
|
||||
src._readableState.awaitDrain++;
|
||||
// If the user unpiped during `dest.write()`, it is possible
|
||||
// to get stuck in a permanently paused state if that write
|
||||
// also returned false.
|
||||
if (state.pipesCount === 1 &&
|
||||
state.pipes[0] === dest &&
|
||||
src.listenerCount('data') === 1 &&
|
||||
!cleanedUp) {
|
||||
debug('false write response, pause', src._readableState.awaitDrain);
|
||||
src._readableState.awaitDrain++;
|
||||
}
|
||||
src.pause();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
'use strict';
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const stream = require('stream');
|
||||
|
||||
const reader = new stream.Readable();
|
||||
const writer1 = new stream.Writable();
|
||||
const writer2 = new stream.Writable();
|
||||
|
||||
// 560000 is chosen here because it is larger than the (default) highWaterMark
|
||||
// and will cause `.write()` to return false
|
||||
// See: https://github.com/nodejs/node/issues/2323
|
||||
const buffer = new Buffer(560000);
|
||||
|
||||
reader._read = function(n) {};
|
||||
|
||||
writer1._write = common.mustCall(function(chunk, encoding, cb) {
|
||||
this.emit('chunk-received');
|
||||
cb();
|
||||
}, 1);
|
||||
writer1.once('chunk-received', function() {
|
||||
reader.unpipe(writer1);
|
||||
reader.pipe(writer2);
|
||||
reader.push(buffer);
|
||||
setImmediate(function() {
|
||||
reader.push(buffer);
|
||||
setImmediate(function() {
|
||||
reader.push(buffer);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
writer2._write = common.mustCall(function(chunk, encoding, cb) {
|
||||
cb();
|
||||
}, 3);
|
||||
|
||||
reader.pipe(writer1);
|
||||
reader.push(buffer);
|
Loading…
Reference in New Issue