mirror of https://github.com/nodejs/node.git
stream: writable state bitmap
PR-URL: https://github.com/nodejs/node/pull/49899 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Raz Luvaton <rluvaton@gmail.com> Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
This commit is contained in:
parent
ad17126501
commit
35ec93115d
|
@ -4,7 +4,7 @@ const common = require('../common');
|
||||||
const Writable = require('stream').Writable;
|
const Writable = require('stream').Writable;
|
||||||
|
|
||||||
const bench = common.createBenchmark(main, {
|
const bench = common.createBenchmark(main, {
|
||||||
n: [2e6],
|
n: [1e5],
|
||||||
sync: ['yes', 'no'],
|
sync: ['yes', 'no'],
|
||||||
writev: ['yes', 'no'],
|
writev: ['yes', 'no'],
|
||||||
callback: ['yes', 'no'],
|
callback: ['yes', 'no'],
|
||||||
|
@ -13,7 +13,7 @@ const bench = common.createBenchmark(main, {
|
||||||
|
|
||||||
function main({ n, sync, writev, callback, len }) {
|
function main({ n, sync, writev, callback, len }) {
|
||||||
const b = Buffer.allocUnsafe(len);
|
const b = Buffer.allocUnsafe(len);
|
||||||
const s = new Writable();
|
const s = new Writable({ highWaterMark: 16 * 1024 });
|
||||||
sync = sync === 'yes';
|
sync = sync === 'yes';
|
||||||
|
|
||||||
const writecb = (cb) => {
|
const writecb = (cb) => {
|
||||||
|
|
|
@ -72,7 +72,11 @@ ObjectSetPrototypeOf(Writable, Stream);
|
||||||
|
|
||||||
function nop() {}
|
function nop() {}
|
||||||
|
|
||||||
const kOnFinished = Symbol('kOnFinished');
|
const kOnFinishedValue = Symbol('kOnFinishedValue');
|
||||||
|
const kErroredValue = Symbol('kErroredValue');
|
||||||
|
const kDefaultEncodingValue = Symbol('kDefaultEncodingValue');
|
||||||
|
const kWriteCbValue = Symbol('kWriteCbValue');
|
||||||
|
const kAfterWriteTickInfoValue = Symbol('kAfterWriteTickInfoValue');
|
||||||
|
|
||||||
const kObjectMode = 1 << 0;
|
const kObjectMode = 1 << 0;
|
||||||
const kEnded = 1 << 1;
|
const kEnded = 1 << 1;
|
||||||
|
@ -94,6 +98,16 @@ const kBufferProcessing = 1 << 16;
|
||||||
const kPrefinished = 1 << 17;
|
const kPrefinished = 1 << 17;
|
||||||
const kAllBuffers = 1 << 18;
|
const kAllBuffers = 1 << 18;
|
||||||
const kAllNoop = 1 << 19;
|
const kAllNoop = 1 << 19;
|
||||||
|
const kOnFinished = 1 << 20;
|
||||||
|
const kErrored = 1 << 21;
|
||||||
|
const kHasWritable = 1 << 22;
|
||||||
|
const kWritable = 1 << 23;
|
||||||
|
const kCorked = 1 << 24;
|
||||||
|
const kDefaultUTF8Encoding = 1 << 25;
|
||||||
|
const kWriteCb = 1 << 26;
|
||||||
|
const kExpectWriteCb = 1 << 27;
|
||||||
|
const kAfterWriteTickInfo = 1 << 28;
|
||||||
|
const kAfterWritePending = 1 << 29;
|
||||||
|
|
||||||
// TODO(benjamingr) it is likely slower to do it this way than with free functions
|
// TODO(benjamingr) it is likely slower to do it this way than with free functions
|
||||||
function makeBitMapDescriptor(bit) {
|
function makeBitMapDescriptor(bit) {
|
||||||
|
@ -176,6 +190,85 @@ ObjectDefineProperties(WritableState.prototype, {
|
||||||
|
|
||||||
allBuffers: makeBitMapDescriptor(kAllBuffers),
|
allBuffers: makeBitMapDescriptor(kAllBuffers),
|
||||||
allNoop: makeBitMapDescriptor(kAllNoop),
|
allNoop: makeBitMapDescriptor(kAllNoop),
|
||||||
|
|
||||||
|
// Indicates whether the stream has errored. When true all write() calls
|
||||||
|
// should return false. This is needed since when autoDestroy
|
||||||
|
// is disabled we need a way to tell whether the stream has failed.
|
||||||
|
// This is/should be a cold path.
|
||||||
|
errored: {
|
||||||
|
__proto__: null,
|
||||||
|
enumerable: false,
|
||||||
|
get() { return (this.state & kErrored) !== 0 ? this[kErroredValue] : null; },
|
||||||
|
set(value) {
|
||||||
|
if (value) {
|
||||||
|
this[kErroredValue] = value;
|
||||||
|
this.state |= kErrored;
|
||||||
|
} else {
|
||||||
|
this.state &= ~kErrored;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
writable: {
|
||||||
|
__proto__: null,
|
||||||
|
enumerable: false,
|
||||||
|
get() { return (this.state & kHasWritable) !== 0 ? (this.state & kWritable) !== 0 : undefined; },
|
||||||
|
set(value) {
|
||||||
|
if (value == null) {
|
||||||
|
this.state &= ~(kHasWritable | kWritable);
|
||||||
|
} else if (value) {
|
||||||
|
this.state |= (kHasWritable | kWritable);
|
||||||
|
} else {
|
||||||
|
this.state |= kHasWritable;
|
||||||
|
this.state &= ~kWritable;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
defaultEncoding: {
|
||||||
|
__proto__: null,
|
||||||
|
enumerable: false,
|
||||||
|
get() { return (this.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : this[kDefaultEncodingValue]; },
|
||||||
|
set(value) {
|
||||||
|
if (value === 'utf8' || value === 'utf-8') {
|
||||||
|
this.state |= kDefaultUTF8Encoding;
|
||||||
|
} else {
|
||||||
|
this.state &= ~kDefaultUTF8Encoding;
|
||||||
|
this[kDefaultEncodingValue] = value;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
// The callback that the user supplies to write(chunk, encoding, cb).
|
||||||
|
writecb: {
|
||||||
|
__proto__: null,
|
||||||
|
enumerable: false,
|
||||||
|
get() { return (this.state & kWriteCb) !== 0 ? this[kWriteCbValue] : nop; },
|
||||||
|
set(value) {
|
||||||
|
if (value) {
|
||||||
|
this[kWriteCbValue] = value;
|
||||||
|
this.state |= kWriteCb;
|
||||||
|
} else {
|
||||||
|
this.state &= ~kWriteCb;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
// Storage for data passed to the afterWrite() callback in case of
|
||||||
|
// synchronous _write() completion.
|
||||||
|
afterWriteTickInfo: {
|
||||||
|
__proto__: null,
|
||||||
|
enumerable: false,
|
||||||
|
get() { return (this.state & kAfterWriteTickInfo) !== 0 ? this[kAfterWriteTickInfoValue] : null; },
|
||||||
|
set(value) {
|
||||||
|
if (value) {
|
||||||
|
this[kAfterWriteTickInfoValue] = value;
|
||||||
|
this.state |= kAfterWriteTickInfo;
|
||||||
|
} else {
|
||||||
|
this.state &= ~kAfterWriteTickInfo;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
function WritableState(options, stream, isDuplex) {
|
function WritableState(options, stream, isDuplex) {
|
||||||
|
@ -213,10 +306,11 @@ function WritableState(options, stream, isDuplex) {
|
||||||
// encoding is 'binary' so we have to make this configurable.
|
// encoding is 'binary' so we have to make this configurable.
|
||||||
// Everything else in the universe uses 'utf8', though.
|
// Everything else in the universe uses 'utf8', though.
|
||||||
const defaultEncoding = options?.defaultEncoding;
|
const defaultEncoding = options?.defaultEncoding;
|
||||||
if (defaultEncoding == null) {
|
if (defaultEncoding == null || defaultEncoding === 'utf8' || defaultEncoding === 'utf-8') {
|
||||||
this.defaultEncoding = 'utf8';
|
this.state |= kDefaultUTF8Encoding;
|
||||||
} else if (Buffer.isEncoding(defaultEncoding)) {
|
} else if (Buffer.isEncoding(defaultEncoding)) {
|
||||||
this.defaultEncoding = defaultEncoding;
|
this.state &= ~kDefaultUTF8Encoding;
|
||||||
|
this[kDefaultEncodingValue] = defaultEncoding;
|
||||||
} else {
|
} else {
|
||||||
throw new ERR_UNKNOWN_ENCODING(defaultEncoding);
|
throw new ERR_UNKNOWN_ENCODING(defaultEncoding);
|
||||||
}
|
}
|
||||||
|
@ -232,28 +326,14 @@ function WritableState(options, stream, isDuplex) {
|
||||||
// The callback that's passed to _write(chunk, cb).
|
// The callback that's passed to _write(chunk, cb).
|
||||||
this.onwrite = onwrite.bind(undefined, stream);
|
this.onwrite = onwrite.bind(undefined, stream);
|
||||||
|
|
||||||
// The callback that the user supplies to write(chunk, encoding, cb).
|
|
||||||
this.writecb = null;
|
|
||||||
|
|
||||||
// The amount that is being written when _write is called.
|
// The amount that is being written when _write is called.
|
||||||
this.writelen = 0;
|
this.writelen = 0;
|
||||||
|
|
||||||
// Storage for data passed to the afterWrite() callback in case of
|
|
||||||
// synchronous _write() completion.
|
|
||||||
this.afterWriteTickInfo = null;
|
|
||||||
|
|
||||||
resetBuffer(this);
|
resetBuffer(this);
|
||||||
|
|
||||||
// Number of pending user-supplied write callbacks
|
// Number of pending user-supplied write callbacks
|
||||||
// this must be 0 before 'finish' can be emitted.
|
// this must be 0 before 'finish' can be emitted.
|
||||||
this.pendingcb = 0;
|
this.pendingcb = 0;
|
||||||
|
|
||||||
// Indicates whether the stream has errored. When true all write() calls
|
|
||||||
// should return false. This is needed since when autoDestroy
|
|
||||||
// is disabled we need a way to tell whether the stream has failed.
|
|
||||||
this.errored = null;
|
|
||||||
|
|
||||||
this[kOnFinished] = [];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function resetBuffer(state) {
|
function resetBuffer(state) {
|
||||||
|
@ -344,10 +424,10 @@ function _write(stream, chunk, encoding, cb) {
|
||||||
|
|
||||||
if (typeof encoding === 'function') {
|
if (typeof encoding === 'function') {
|
||||||
cb = encoding;
|
cb = encoding;
|
||||||
encoding = state.defaultEncoding;
|
encoding = (state.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding;
|
||||||
} else {
|
} else {
|
||||||
if (!encoding)
|
if (!encoding)
|
||||||
encoding = state.defaultEncoding;
|
encoding = (state.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding;
|
||||||
else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding))
|
else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding))
|
||||||
throw new ERR_UNKNOWN_ENCODING(encoding);
|
throw new ERR_UNKNOWN_ENCODING(encoding);
|
||||||
if (typeof cb !== 'function')
|
if (typeof cb !== 'function')
|
||||||
|
@ -394,7 +474,10 @@ Writable.prototype.write = function(chunk, encoding, cb) {
|
||||||
};
|
};
|
||||||
|
|
||||||
Writable.prototype.cork = function() {
|
Writable.prototype.cork = function() {
|
||||||
this._writableState.corked++;
|
const state = this._writableState;
|
||||||
|
|
||||||
|
state.state |= kCorked;
|
||||||
|
state.corked++;
|
||||||
};
|
};
|
||||||
|
|
||||||
Writable.prototype.uncork = function() {
|
Writable.prototype.uncork = function() {
|
||||||
|
@ -403,6 +486,10 @@ Writable.prototype.uncork = function() {
|
||||||
if (state.corked) {
|
if (state.corked) {
|
||||||
state.corked--;
|
state.corked--;
|
||||||
|
|
||||||
|
if (!state.corked) {
|
||||||
|
state.state &= ~kCorked;
|
||||||
|
}
|
||||||
|
|
||||||
if ((state.state & kWriting) === 0)
|
if ((state.state & kWriting) === 0)
|
||||||
clearBuffer(this, state);
|
clearBuffer(this, state);
|
||||||
}
|
}
|
||||||
|
@ -428,11 +515,13 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
|
||||||
|
|
||||||
// stream._write resets state.length
|
// stream._write resets state.length
|
||||||
const ret = state.length < state.highWaterMark;
|
const ret = state.length < state.highWaterMark;
|
||||||
// We must ensure that previous needDrain will not be reset to false.
|
|
||||||
if (!ret)
|
|
||||||
state.state |= kNeedDrain;
|
|
||||||
|
|
||||||
if ((state.state & kWriting) !== 0 || state.corked || state.errored || (state.state & kConstructed) === 0) {
|
// We must ensure that previous needDrain will not be reset to false.
|
||||||
|
if (!ret) {
|
||||||
|
state.state |= kNeedDrain;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) {
|
||||||
state.buffered.push({ chunk, encoding, callback });
|
state.buffered.push({ chunk, encoding, callback });
|
||||||
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
|
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
|
||||||
state.state &= ~kAllBuffers;
|
state.state &= ~kAllBuffers;
|
||||||
|
@ -442,21 +531,25 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
state.writelen = len;
|
state.writelen = len;
|
||||||
state.writecb = callback;
|
if (callback !== nop) {
|
||||||
state.state |= kWriting | kSync;
|
state.writecb = callback;
|
||||||
|
}
|
||||||
|
state.state |= kWriting | kSync | kExpectWriteCb;
|
||||||
stream._write(chunk, encoding, state.onwrite);
|
stream._write(chunk, encoding, state.onwrite);
|
||||||
state.state &= ~kSync;
|
state.state &= ~kSync;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return false if errored or destroyed in order to break
|
// Return false if errored or destroyed in order to break
|
||||||
// any synchronous while(stream.write(data)) loops.
|
// any synchronous while(stream.write(data)) loops.
|
||||||
return ret && !state.errored && (state.state & kDestroyed) === 0;
|
return ret && (state.state & (kDestroyed | kErrored)) === 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
|
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
|
||||||
state.writelen = len;
|
state.writelen = len;
|
||||||
state.writecb = cb;
|
if (cb !== nop) {
|
||||||
state.state |= kWriting | kSync;
|
state.writecb = cb;
|
||||||
|
}
|
||||||
|
state.state |= kWriting | kSync | kExpectWriteCb;
|
||||||
if ((state.state & kDestroyed) !== 0)
|
if ((state.state & kDestroyed) !== 0)
|
||||||
state.onwrite(new ERR_STREAM_DESTROYED('write'));
|
state.onwrite(new ERR_STREAM_DESTROYED('write'));
|
||||||
else if (writev)
|
else if (writev)
|
||||||
|
@ -481,16 +574,16 @@ function onwriteError(stream, state, er, cb) {
|
||||||
|
|
||||||
function onwrite(stream, er) {
|
function onwrite(stream, er) {
|
||||||
const state = stream._writableState;
|
const state = stream._writableState;
|
||||||
const sync = (state.state & kSync) !== 0;
|
|
||||||
const cb = state.writecb;
|
|
||||||
|
|
||||||
if (typeof cb !== 'function') {
|
if ((state.state & kExpectWriteCb) === 0) {
|
||||||
errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
|
errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
state.state &= ~kWriting;
|
const sync = (state.state & kSync) !== 0;
|
||||||
state.writecb = null;
|
const cb = (state.state & kWriteCb) !== 0 ? state[kWriteCbValue] : nop;
|
||||||
|
|
||||||
|
state.state &= ~(kWriting | kExpectWriteCb | kWriteCb);
|
||||||
state.length -= state.writelen;
|
state.length -= state.writelen;
|
||||||
state.writelen = 0;
|
state.writelen = 0;
|
||||||
|
|
||||||
|
@ -523,12 +616,20 @@ function onwrite(stream, er) {
|
||||||
// the same. In that case, we do not schedule a new nextTick(), but
|
// the same. In that case, we do not schedule a new nextTick(), but
|
||||||
// rather just increase a counter, to improve performance and avoid
|
// rather just increase a counter, to improve performance and avoid
|
||||||
// memory allocations.
|
// memory allocations.
|
||||||
if (state.afterWriteTickInfo !== null &&
|
if (cb === nop) {
|
||||||
state.afterWriteTickInfo.cb === cb) {
|
if ((state.state & kAfterWritePending) === 0) {
|
||||||
|
process.nextTick(afterWrite, stream, state, 1, cb);
|
||||||
|
state.state |= kAfterWritePending;
|
||||||
|
} else {
|
||||||
|
state.pendingcb -= 1;
|
||||||
|
}
|
||||||
|
} else if (state.afterWriteTickInfo !== null &&
|
||||||
|
state.afterWriteTickInfo.cb === cb) {
|
||||||
state.afterWriteTickInfo.count++;
|
state.afterWriteTickInfo.count++;
|
||||||
} else {
|
} else {
|
||||||
state.afterWriteTickInfo = { count: 1, cb, stream, state };
|
state.afterWriteTickInfo = { count: 1, cb, stream, state };
|
||||||
process.nextTick(afterWriteTick, state.afterWriteTickInfo);
|
process.nextTick(afterWriteTick, state.afterWriteTickInfo);
|
||||||
|
state.state |= kAfterWritePending;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
afterWrite(stream, state, 1, cb);
|
afterWrite(stream, state, 1, cb);
|
||||||
|
@ -542,7 +643,9 @@ function afterWriteTick({ stream, state, count, cb }) {
|
||||||
}
|
}
|
||||||
|
|
||||||
function afterWrite(stream, state, count, cb) {
|
function afterWrite(stream, state, count, cb) {
|
||||||
const needDrain = (state.state & (kEnding | kNeedDrain)) === kNeedDrain && !stream.destroyed && state.length === 0;
|
state.state &= ~kAfterWritePending;
|
||||||
|
|
||||||
|
const needDrain = (state.state & (kEnding | kNeedDrain | kDestroyed)) === kNeedDrain && state.length === 0;
|
||||||
if (needDrain) {
|
if (needDrain) {
|
||||||
state.state &= ~kNeedDrain;
|
state.state &= ~kNeedDrain;
|
||||||
stream.emit('drain');
|
stream.emit('drain');
|
||||||
|
@ -573,19 +676,16 @@ function errorBuffer(state) {
|
||||||
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
|
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
|
||||||
}
|
}
|
||||||
|
|
||||||
const onfinishCallbacks = state[kOnFinished].splice(0);
|
|
||||||
for (let i = 0; i < onfinishCallbacks.length; i++) {
|
callFinishedCallbacks(state, state.errored ?? new ERR_STREAM_DESTROYED('end'));
|
||||||
onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
|
|
||||||
}
|
|
||||||
|
|
||||||
resetBuffer(state);
|
resetBuffer(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there's something in the buffer waiting, then process it.
|
// If there's something in the buffer waiting, then process it.
|
||||||
function clearBuffer(stream, state) {
|
function clearBuffer(stream, state) {
|
||||||
if (state.corked ||
|
if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 ||
|
||||||
(state.state & (kDestroyed | kBufferProcessing)) !== 0 ||
|
(state.state & kConstructed) === 0) {
|
||||||
(state.state & kConstructed) === 0) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -661,7 +761,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {
|
||||||
|
|
||||||
let err;
|
let err;
|
||||||
|
|
||||||
if (chunk !== null && chunk !== undefined) {
|
if (chunk != null) {
|
||||||
const ret = _write(this, chunk, encoding);
|
const ret = _write(this, chunk, encoding);
|
||||||
if (ret instanceof Error) {
|
if (ret instanceof Error) {
|
||||||
err = ret;
|
err = ret;
|
||||||
|
@ -669,14 +769,14 @@ Writable.prototype.end = function(chunk, encoding, cb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// .end() fully uncorks.
|
// .end() fully uncorks.
|
||||||
if (state.corked) {
|
if ((state.state & kCorked) !== 0) {
|
||||||
state.corked = 1;
|
state.corked = 1;
|
||||||
this.uncork();
|
this.uncork();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (err) {
|
if (err) {
|
||||||
// Do nothing...
|
// Do nothing...
|
||||||
} else if (!state.errored && (state.state & kEnding) === 0) {
|
} else if ((state.state & (kEnding | kErrored)) === 0) {
|
||||||
// This is forgiving in terms of unnecessary calls to end() and can hide
|
// This is forgiving in terms of unnecessary calls to end() and can hide
|
||||||
// logic errors. However, usually such errors are harmless and causing a
|
// logic errors. However, usually such errors are harmless and causing a
|
||||||
// hard error can be disproportionately destructive. It is not always
|
// hard error can be disproportionately destructive. It is not always
|
||||||
|
@ -698,7 +798,9 @@ Writable.prototype.end = function(chunk, encoding, cb) {
|
||||||
} else if ((state.state & kFinished) !== 0) {
|
} else if ((state.state & kFinished) !== 0) {
|
||||||
process.nextTick(cb, null);
|
process.nextTick(cb, null);
|
||||||
} else {
|
} else {
|
||||||
state[kOnFinished].push(cb);
|
state.state |= kOnFinished;
|
||||||
|
state[kOnFinishedValue] ??= [];
|
||||||
|
state[kOnFinishedValue].push(cb);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -715,10 +817,10 @@ function needFinish(state) {
|
||||||
kFinished |
|
kFinished |
|
||||||
kWriting |
|
kWriting |
|
||||||
kErrorEmitted |
|
kErrorEmitted |
|
||||||
kCloseEmitted
|
kCloseEmitted |
|
||||||
|
kErrored
|
||||||
)) === (kEnding | kConstructed) &&
|
)) === (kEnding | kConstructed) &&
|
||||||
state.length === 0 &&
|
state.length === 0 &&
|
||||||
!state.errored &&
|
|
||||||
state.buffered.length === 0);
|
state.buffered.length === 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -734,10 +836,7 @@ function callFinal(stream, state) {
|
||||||
|
|
||||||
state.pendingcb--;
|
state.pendingcb--;
|
||||||
if (err) {
|
if (err) {
|
||||||
const onfinishCallbacks = state[kOnFinished].splice(0);
|
callFinishedCallbacks(state, err);
|
||||||
for (let i = 0; i < onfinishCallbacks.length; i++) {
|
|
||||||
onfinishCallbacks[i](err);
|
|
||||||
}
|
|
||||||
errorOrDestroy(stream, err, (state.state & kSync) !== 0);
|
errorOrDestroy(stream, err, (state.state & kSync) !== 0);
|
||||||
} else if (needFinish(state)) {
|
} else if (needFinish(state)) {
|
||||||
state.state |= kPrefinished;
|
state.state |= kPrefinished;
|
||||||
|
@ -799,10 +898,7 @@ function finish(stream, state) {
|
||||||
state.pendingcb--;
|
state.pendingcb--;
|
||||||
state.state |= kFinished;
|
state.state |= kFinished;
|
||||||
|
|
||||||
const onfinishCallbacks = state[kOnFinished].splice(0);
|
callFinishedCallbacks(state, null);
|
||||||
for (let i = 0; i < onfinishCallbacks.length; i++) {
|
|
||||||
onfinishCallbacks[i](null);
|
|
||||||
}
|
|
||||||
|
|
||||||
stream.emit('finish');
|
stream.emit('finish');
|
||||||
|
|
||||||
|
@ -822,8 +918,20 @@ function finish(stream, state) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectDefineProperties(Writable.prototype, {
|
function callFinishedCallbacks(state, err) {
|
||||||
|
if ((state.state & kOnFinished) === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const onfinishCallbacks = state[kOnFinishedValue];
|
||||||
|
state[kOnFinishedValue] = null;
|
||||||
|
state.state &= ~kOnFinished;
|
||||||
|
for (let i = 0; i < onfinishCallbacks.length; i++) {
|
||||||
|
onfinishCallbacks[i](err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ObjectDefineProperties(Writable.prototype, {
|
||||||
closed: {
|
closed: {
|
||||||
__proto__: null,
|
__proto__: null,
|
||||||
get() {
|
get() {
|
||||||
|
@ -867,60 +975,64 @@ ObjectDefineProperties(Writable.prototype, {
|
||||||
writableFinished: {
|
writableFinished: {
|
||||||
__proto__: null,
|
__proto__: null,
|
||||||
get() {
|
get() {
|
||||||
return this._writableState ? (this._writableState.state & kFinished) !== 0 : false;
|
const state = this._writableState;
|
||||||
|
return state ? (state.state & kFinished) !== 0 : false;
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
writableObjectMode: {
|
writableObjectMode: {
|
||||||
__proto__: null,
|
__proto__: null,
|
||||||
get() {
|
get() {
|
||||||
return this._writableState ? (this._writableState.state & kObjectMode) !== 0 : false;
|
const state = this._writableState;
|
||||||
|
return state ? (state.state & kObjectMode) !== 0 : false;
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
writableBuffer: {
|
writableBuffer: {
|
||||||
__proto__: null,
|
__proto__: null,
|
||||||
get() {
|
get() {
|
||||||
return this._writableState && this._writableState.getBuffer();
|
const state = this._writableState;
|
||||||
|
return state && state.getBuffer();
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
writableEnded: {
|
writableEnded: {
|
||||||
__proto__: null,
|
__proto__: null,
|
||||||
get() {
|
get() {
|
||||||
return this._writableState ? (this._writableState.state & kEnding) !== 0 : false;
|
const state = this._writableState;
|
||||||
|
return state ? (state.state & kEnding) !== 0 : false;
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
writableNeedDrain: {
|
writableNeedDrain: {
|
||||||
__proto__: null,
|
__proto__: null,
|
||||||
get() {
|
get() {
|
||||||
const wState = this._writableState;
|
const state = this._writableState;
|
||||||
if (!wState) return false;
|
return state ? (state.state & (kDestroyed | kEnding | kNeedDrain)) === kNeedDrain : false;
|
||||||
|
|
||||||
// !destroyed && !ending && needDrain
|
|
||||||
return (wState.state & (kDestroyed | kEnding | kNeedDrain)) === kNeedDrain;
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
writableHighWaterMark: {
|
writableHighWaterMark: {
|
||||||
__proto__: null,
|
__proto__: null,
|
||||||
get() {
|
get() {
|
||||||
return this._writableState && this._writableState.highWaterMark;
|
const state = this._writableState;
|
||||||
|
return state && state.highWaterMark;
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
writableCorked: {
|
writableCorked: {
|
||||||
__proto__: null,
|
__proto__: null,
|
||||||
get() {
|
get() {
|
||||||
return this._writableState ? this._writableState.corked : 0;
|
const state = this._writableState;
|
||||||
|
return state ? state.corked : 0;
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
writableLength: {
|
writableLength: {
|
||||||
__proto__: null,
|
__proto__: null,
|
||||||
get() {
|
get() {
|
||||||
return this._writableState && this._writableState.length;
|
const state = this._writableState;
|
||||||
|
return state && state.length;
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
@ -928,18 +1040,19 @@ ObjectDefineProperties(Writable.prototype, {
|
||||||
__proto__: null,
|
__proto__: null,
|
||||||
enumerable: false,
|
enumerable: false,
|
||||||
get() {
|
get() {
|
||||||
return this._writableState ? this._writableState.errored : null;
|
const state = this._writableState;
|
||||||
|
return state ? state.errored : null;
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
writableAborted: {
|
writableAborted: {
|
||||||
__proto__: null,
|
__proto__: null,
|
||||||
enumerable: false,
|
|
||||||
get: function() {
|
get: function() {
|
||||||
return !!(
|
const state = this._writableState;
|
||||||
this._writableState.writable !== false &&
|
return (
|
||||||
((this._writableState.state & kDestroyed) !== 0 || this._writableState.errored) &&
|
(state.state & (kHasWritable | kWritable)) !== kHasWritable &&
|
||||||
(this._writableState.state & kFinished) === 0
|
(state.state & (kDestroyed | kErrored)) !== 0 &&
|
||||||
|
(state.state & kFinished) === 0
|
||||||
);
|
);
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -952,7 +1065,7 @@ Writable.prototype.destroy = function(err, cb) {
|
||||||
// Invoke pending callbacks.
|
// Invoke pending callbacks.
|
||||||
if ((state.state & kDestroyed) === 0 &&
|
if ((state.state & kDestroyed) === 0 &&
|
||||||
(state.bufferedIndex < state.buffered.length ||
|
(state.bufferedIndex < state.buffered.length ||
|
||||||
state[kOnFinished].length)) {
|
(state.state & kOnFinished) !== 0)) {
|
||||||
process.nextTick(errorBuffer, state);
|
process.nextTick(errorBuffer, state);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue