mirror of https://github.com/nodejs/node.git
Fix: Multiple pipes to the same stream were broken
When creating multiple .pipe()s to the same destination stream, the first source to end would close the destination, breaking all remaining pipes. This patch fixes the problem by keeping track of all open pipes, so that we only call end on destinations that have no more sources piping to them. closes #929
This commit is contained in:
parent
8417870f51
commit
6c5b31bd80
|
@ -28,9 +28,13 @@ function Stream() {
|
|||
util.inherits(Stream, events.EventEmitter);
|
||||
exports.Stream = Stream;
|
||||
|
||||
var pipes = [];
|
||||
|
||||
Stream.prototype.pipe = function(dest, options) {
|
||||
var source = this;
|
||||
|
||||
pipes.push(dest);
|
||||
|
||||
function ondata(chunk) {
|
||||
if (dest.writable) {
|
||||
if (false === dest.write(chunk)) source.pause();
|
||||
|
@ -52,10 +56,18 @@ Stream.prototype.pipe = function(dest, options) {
|
|||
|
||||
if (!options || options.end !== false) {
|
||||
function onend() {
|
||||
var index = pipes.indexOf(dest);
|
||||
pipes.splice(index, 1);
|
||||
|
||||
if (pipes.indexOf(dest) > -1) {
|
||||
return;
|
||||
}
|
||||
|
||||
dest.end();
|
||||
}
|
||||
|
||||
source.on('end', onend);
|
||||
source.on('close', onend);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -73,34 +85,35 @@ Stream.prototype.pipe = function(dest, options) {
|
|||
source.emit('resume');
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
var onpause = function() {
|
||||
source.pause();
|
||||
}
|
||||
|
||||
dest.on('pause', onpause);
|
||||
|
||||
|
||||
var onresume = function() {
|
||||
if (source.readable) source.resume();
|
||||
};
|
||||
|
||||
|
||||
dest.on('resume', onresume);
|
||||
|
||||
|
||||
var cleanup = function () {
|
||||
source.removeListener('data', ondata);
|
||||
dest.removeListener('drain', ondrain);
|
||||
source.removeListener('end', onend);
|
||||
|
||||
source.removeListener('close', onend);
|
||||
|
||||
dest.removeListener('pause', onpause);
|
||||
dest.removeListener('resume', onresume);
|
||||
|
||||
|
||||
source.removeListener('end', cleanup);
|
||||
source.removeListener('close', cleanup);
|
||||
|
||||
|
||||
dest.removeListener('end', cleanup);
|
||||
dest.removeListener('close', cleanup);
|
||||
}
|
||||
|
||||
|
||||
source.on('end', cleanup);
|
||||
source.on('close', cleanup);
|
||||
|
||||
|
|
|
@ -28,10 +28,13 @@ var util = require('util');
|
|||
|
||||
function Writable () {
|
||||
this.writable = true;
|
||||
this.endCalls = 0;
|
||||
stream.Stream.call(this);
|
||||
}
|
||||
util.inherits(Writable, stream.Stream);
|
||||
Writable.prototype.end = function () {}
|
||||
Writable.prototype.end = function () {
|
||||
this.endCalls++;
|
||||
}
|
||||
|
||||
function Readable () {
|
||||
this.readable = true;
|
||||
|
@ -56,6 +59,9 @@ for (i = 0; i < limit; i++) {
|
|||
r.emit('end')
|
||||
}
|
||||
assert.equal(0, r.listeners('end').length);
|
||||
assert.equal(limit, w.endCalls);
|
||||
|
||||
w.endCalls = 0;
|
||||
|
||||
for (i = 0; i < limit; i++) {
|
||||
r = new Readable()
|
||||
|
@ -63,6 +69,19 @@ for (i = 0; i < limit; i++) {
|
|||
r.emit('close')
|
||||
}
|
||||
assert.equal(0, r.listeners('close').length);
|
||||
assert.equal(limit, w.endCalls);
|
||||
|
||||
w.endCalls = 0;
|
||||
|
||||
var r2;
|
||||
r = new Readable()
|
||||
r2 = new Readable();
|
||||
|
||||
r.pipe(w)
|
||||
r2.pipe(w)
|
||||
r.emit('close')
|
||||
r2.emit('close')
|
||||
assert.equal(1, w.endCalls);
|
||||
|
||||
r = new Readable();
|
||||
|
||||
|
|
Loading…
Reference in New Issue