mirror of https://github.com/nodejs/node.git
stream: optimize Writable
PR-URL: https://github.com/nodejs/node/pull/50012 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
This commit is contained in:
parent
55ff64001a
commit
95b8f5dcab
|
@ -108,6 +108,7 @@ const kWriteCb = 1 << 26;
|
|||
const kExpectWriteCb = 1 << 27;
|
||||
const kAfterWriteTickInfo = 1 << 28;
|
||||
const kAfterWritePending = 1 << 29;
|
||||
const kHasBuffer = 1 << 30;
|
||||
|
||||
// TODO(benjamingr) it is likely slower to do it this way than with free functions
|
||||
function makeBitMapDescriptor(bit) {
|
||||
|
@ -340,6 +341,7 @@ function resetBuffer(state) {
|
|||
state.buffered = [];
|
||||
state.bufferedIndex = 0;
|
||||
state.state |= kAllBuffers | kAllNoop;
|
||||
state.state &= ~kHasBuffer;
|
||||
}
|
||||
|
||||
WritableState.prototype.getBuffer = function getBuffer() {
|
||||
|
@ -396,11 +398,13 @@ function Writable(options) {
|
|||
destroyImpl.construct(this, () => {
|
||||
const state = this._writableState;
|
||||
|
||||
if (!state.writing) {
|
||||
if ((state.state & kWriting) === 0) {
|
||||
clearBuffer(this, state);
|
||||
}
|
||||
|
||||
finishMaybe(this, state);
|
||||
if ((state.state & kEnding) !== 0) {
|
||||
finishMaybe(this, state);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -523,6 +527,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
|
|||
|
||||
if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) {
|
||||
state.buffered.push({ chunk, encoding, callback });
|
||||
state.state |= kHasBuffer;
|
||||
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
|
||||
state.state &= ~kAllBuffers;
|
||||
}
|
||||
|
@ -591,8 +596,9 @@ function onwrite(stream, er) {
|
|||
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
|
||||
er.stack; // eslint-disable-line no-unused-expressions
|
||||
|
||||
if (!state.errored) {
|
||||
state.errored = er;
|
||||
if ((state.state & kErrored) === 0) {
|
||||
state[kErroredValue] = er;
|
||||
state.state |= kErrored;
|
||||
}
|
||||
|
||||
// In case of duplex streams we need to notify the readable side of the
|
||||
|
@ -607,12 +613,12 @@ function onwrite(stream, er) {
|
|||
onwriteError(stream, state, er, cb);
|
||||
}
|
||||
} else {
|
||||
if (state.buffered.length > state.bufferedIndex) {
|
||||
if ((state.state & kHasBuffer) !== 0) {
|
||||
clearBuffer(stream, state);
|
||||
}
|
||||
|
||||
if (sync) {
|
||||
const needDrain = state.length === 0 && (state.state & kNeedDrain) !== 0;
|
||||
const needDrain = (state.state & kNeedDrain) !== 0 && state.length === 0;
|
||||
const needTick = needDrain || (state.state & kDestroyed !== 0) || cb !== nop;
|
||||
|
||||
// It is a common case that the callback passed to .write() is always
|
||||
|
@ -625,7 +631,9 @@ function onwrite(stream, er) {
|
|||
state.state |= kAfterWritePending;
|
||||
} else {
|
||||
state.pendingcb--;
|
||||
finishMaybe(stream, state, true);
|
||||
if ((state.state & kEnding) !== 0) {
|
||||
finishMaybe(stream, state, true);
|
||||
}
|
||||
}
|
||||
} else if ((state.state & kAfterWriteTickInfo) !== 0 &&
|
||||
state[kAfterWriteTickInfoValue].cb === cb) {
|
||||
|
@ -636,7 +644,9 @@ function onwrite(stream, er) {
|
|||
state.state |= (kAfterWritePending | kAfterWriteTickInfo);
|
||||
} else {
|
||||
state.pendingcb--;
|
||||
finishMaybe(stream, state, true);
|
||||
if ((state.state & kEnding) !== 0) {
|
||||
finishMaybe(stream, state, true);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
afterWrite(stream, state, 1, cb);
|
||||
|
@ -668,7 +678,9 @@ function afterWrite(stream, state, count, cb) {
|
|||
errorBuffer(state);
|
||||
}
|
||||
|
||||
finishMaybe(stream, state);
|
||||
if ((state.state & kEnding) !== 0) {
|
||||
finishMaybe(stream, state, true);
|
||||
}
|
||||
}
|
||||
|
||||
// If there's something in the buffer waiting, then invoke callbacks.
|
||||
|
@ -692,7 +704,7 @@ function errorBuffer(state) {
|
|||
|
||||
// If there's something in the buffer waiting, then process it.
|
||||
function clearBuffer(stream, state) {
|
||||
if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 ||
|
||||
if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kHasBuffer)) !== kHasBuffer ||
|
||||
(state.state & kConstructed) === 0) {
|
||||
return;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue