Fix: Multiple pipes to the same stream were broken

When creating multiple .pipe()s to the same destination stream, the
first source to end would close the destination, breaking all remaining
pipes. This patch fixes the problem by keeping track of all open
pipes, so that we only call end on destinations that have no more
sources piping to them.

closes #929
This commit is contained in:
Felix Geisendörfer 2011-04-14 20:33:54 +02:00 committed by Ryan Dahl
parent 8417870f51
commit 6c5b31bd80
2 changed files with 41 additions and 9 deletions

View File

@ -28,9 +28,13 @@ function Stream() {
util.inherits(Stream, events.EventEmitter);
exports.Stream = Stream;
var pipes = [];
Stream.prototype.pipe = function(dest, options) {
var source = this;
pipes.push(dest);
function ondata(chunk) {
if (dest.writable) {
if (false === dest.write(chunk)) source.pause();
@ -52,10 +56,18 @@ Stream.prototype.pipe = function(dest, options) {
if (!options || options.end !== false) {
function onend() {
var index = pipes.indexOf(dest);
pipes.splice(index, 1);
if (pipes.indexOf(dest) > -1) {
return;
}
dest.end();
}
source.on('end', onend);
source.on('close', onend);
}
/*
@ -73,34 +85,35 @@ Stream.prototype.pipe = function(dest, options) {
source.emit('resume');
};
}
var onpause = function() {
source.pause();
}
dest.on('pause', onpause);
var onresume = function() {
if (source.readable) source.resume();
};
dest.on('resume', onresume);
var cleanup = function () {
source.removeListener('data', ondata);
dest.removeListener('drain', ondrain);
source.removeListener('end', onend);
source.removeListener('close', onend);
dest.removeListener('pause', onpause);
dest.removeListener('resume', onresume);
source.removeListener('end', cleanup);
source.removeListener('close', cleanup);
dest.removeListener('end', cleanup);
dest.removeListener('close', cleanup);
}
source.on('end', cleanup);
source.on('close', cleanup);

View File

@ -28,10 +28,13 @@ var util = require('util');
function Writable () {
this.writable = true;
this.endCalls = 0;
stream.Stream.call(this);
}
util.inherits(Writable, stream.Stream);
Writable.prototype.end = function () {}
Writable.prototype.end = function () {
this.endCalls++;
}
function Readable () {
this.readable = true;
@ -56,6 +59,9 @@ for (i = 0; i < limit; i++) {
r.emit('end')
}
assert.equal(0, r.listeners('end').length);
assert.equal(limit, w.endCalls);
w.endCalls = 0;
for (i = 0; i < limit; i++) {
r = new Readable()
@ -63,6 +69,19 @@ for (i = 0; i < limit; i++) {
r.emit('close')
}
assert.equal(0, r.listeners('close').length);
assert.equal(limit, w.endCalls);
w.endCalls = 0;
var r2;
r = new Readable()
r2 = new Readable();
r.pipe(w)
r2.pipe(w)
r.emit('close')
r2.emit('close')
assert.equal(1, w.endCalls);
r = new Readable();