From b941d4211d197c16123dfbc311a2aec90dc7e714 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 29 Dec 2019 21:52:16 +0100 Subject: [PATCH] stream: writable bitfield --- benchmark/streams/writable-manywrites.js | 6 +- lib/_stream_writable.js | 139 +++++++++++++++++------ 2 files changed, 105 insertions(+), 40 deletions(-) diff --git a/benchmark/streams/writable-manywrites.js b/benchmark/streams/writable-manywrites.js index 049bf8eb281db2..f2931b5bae09cb 100644 --- a/benchmark/streams/writable-manywrites.js +++ b/benchmark/streams/writable-manywrites.js @@ -5,9 +5,9 @@ const Writable = require('stream').Writable; const bench = common.createBenchmark(main, { n: [2e6], - sync: ['yes', 'no'], - writev: ['yes', 'no'], - callback: ['yes', 'no'] + // sync: ['yes', 'no'], + // writev: ['yes', 'no'], + // callback: ['yes', 'no'] }); function main({ n, sync, writev, callback }) { diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 63a912186aec80..3aef176c014765 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -73,6 +73,8 @@ function WritableState(options, stream, isDuplex) { if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Stream.Duplex; + this.bitfield = 0; + // Object stream flag to indicate whether or not this stream // contains buffers or objects. this.objectMode = !!(options && options.objectMode); @@ -184,6 +186,73 @@ function WritableState(options, stream, isDuplex) { this.corkedRequestsFree = corkReq; } +const F = {}; +for (let [ key, options ] of Object.entries({ + objectMode: {}, + finalCalled: {}, + drained: {}, + ending: {}, + ended: {}, + finished: {}, + destroyed: {}, + decodeStrings: {}, + writing: {}, + sync: {}, + bufferProcessing: {}, + prefinished: {}, + errorEmitted: {}, + emitClose: {}, + autoDestroy: {}, + errored: {} +})) { + options = options || {} + const mask = 1 << (Object.keys(F).length + 8); + F[key] = mask; + ObjectDefineProperty(WritableState.prototype, key, { + get () { + return !!(this.bitfield & mask); + }, + set (val) { + if (val) { + this.bitfield |= mask; + } else { + this.bitfield &= ~mask; + } + } + }) +} + +ObjectDefineProperty(WritableState.prototype, 'needDrain', { + get () { + return !this.drained; + }, + set (val) { + this.drained = !val; + } +}); + +F.corked = 0xF0; +ObjectDefineProperty(WritableState.prototype, 'corked', { + get () { + return (this.bitfield & F.corked) >> 4; + }, + set (val) { + this.bitfield &= ~F.corked; + this.bitfield |= (val << 4) & F.corked; + } +}); + +F.pendingcb = 0xF; +ObjectDefineProperty(WritableState.prototype, 'pendingcb', { + get () { + return (this.bitfield & F.pendingcb); + }, + set (val) { + this.bitfield &= ~F.pendingcb; + this.bitfield |= (val & F.pendingcb); + } +}); + WritableState.prototype.getBuffer = function getBuffer() { var current = this.bufferedRequest; const out = []; @@ -294,8 +363,9 @@ function validChunk(stream, state, chunk, cb) { Writable.prototype.write = function(chunk, encoding, cb) { const state = this._writableState; + const objectMode = state.bitfield & F.objectMode; var ret = false; - const isBuf = !state.objectMode && Stream._isUint8Array(chunk); + const isBuf = !objectMode && Stream._isUint8Array(chunk); // Do not use Object.getPrototypeOf as it is slower since V8 7.3. if (isBuf && !(chunk instanceof Buffer)) { @@ -315,12 +385,14 @@ Writable.prototype.write = function(chunk, encoding, cb) { if (typeof cb !== 'function') cb = nop; - if (state.ending) { - writeAfterEnd(this, cb); - } else if (state.destroyed) { - const err = new ERR_STREAM_DESTROYED('write'); - process.nextTick(cb, err); - errorOrDestroy(this, err); + if (state.bitfield & (F.ending | F.destroyed)) { + if (state.ending) { + writeAfterEnd(this, cb); + } else if (state.destroyed) { + const err = new ERR_STREAM_DESTROYED('write'); + process.nextTick(cb, err); + errorOrDestroy(this, err); + } } else if (isBuf || validChunk(this, state, chunk, cb)) { state.pendingcb++; ret = writeOrBuffer(this, state, chunk, encoding, cb); @@ -340,8 +412,8 @@ Writable.prototype.uncork = function() { state.corked--; if (!state.writing && - !state.corked && !state.bufferProcessing && + !state.corked && state.bufferedRequest) clearBuffer(this, state); } @@ -401,8 +473,11 @@ ObjectDefineProperty(Writable.prototype, 'writableCorked', { // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. function writeOrBuffer(stream, state, chunk, encoding, cb) { - if (!state.objectMode && - state.decodeStrings !== false && + const objectMode = state.bitfield & F.objectMode; + const decodeStrings = state.bitfield & F.decodeStrings; + + if (!objectMode && + decodeStrings && encoding !== 'buffer' && typeof chunk === 'string') { chunk = Buffer.from(chunk, encoding); @@ -415,9 +490,9 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { const ret = state.length < state.highWaterMark; // We must ensure that previous needDrain will not be reset to false. if (!ret) - state.needDrain = true; + state.bitfield &= ~F.drained; - if (state.writing || state.corked || state.errored) { + if (state.bitfield & (F.writing | F.corked | F.errored)) { var last = state.lastBufferedRequest; state.lastBufferedRequest = { chunk, @@ -437,21 +512,18 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { // Return false if errored or destroyed in order to break // any synchronous while(stream.write(data)) loops. - return ret && !state.errored && !state.destroyed; + return ret && !(state.bitfield & (F.errored | F.destroyed)); } function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.writelen = len; state.writecb = cb; - state.writing = true; - state.sync = true; - if (state.destroyed) - state.onwrite(new ERR_STREAM_DESTROYED('write')); - else if (writev) + state.bitfield |= F.writing | F.sync; + if (writev) stream._writev(chunk, state.onwrite); else stream._write(chunk, encoding, state.onwrite); - state.sync = false; + state.bitfield &= ~F.sync; } function onwriteError(stream, state, er, cb) { @@ -464,7 +536,7 @@ function onwriteError(stream, state, er, cb) { function onwrite(stream, er) { const state = stream._writableState; - const sync = state.sync; + const sync = state.bitfield & F.sync; const cb = state.writecb; if (typeof cb !== 'function') { @@ -472,7 +544,7 @@ function onwrite(stream, er) { return; } - state.writing = false; + state.bitfield &= ~F.writing; state.writecb = null; state.length -= state.writelen; state.writelen = 0; @@ -485,12 +557,7 @@ function onwrite(stream, er) { onwriteError(stream, state, er, cb); } } else { - // Check if we're actually ready to finish, but don't emit yet - var finished = needFinish(state) || stream.destroyed; - - if (!finished && - !state.corked && - !state.bufferProcessing && + if (!(state.bitfield & (F.corked | F.bufferProcessing)) && state.bufferedRequest) { clearBuffer(stream, state); } @@ -519,10 +586,10 @@ function afterWriteTick({ stream, state, count, cb }) { } function afterWrite(stream, state, count, cb) { - const needDrain = !state.ending && !stream.destroyed && state.length === 0 && - state.needDrain; + const needDrain = !(state.bitfield & (F.ending | F.destroyed | F.drained)) && + state.length === 0; if (needDrain) { - state.needDrain = false; + state.bitfield |= F.drained; stream.emit('drain'); } @@ -536,7 +603,7 @@ function afterWrite(stream, state, count, cb) { // If there's something in the buffer waiting, then process it function clearBuffer(stream, state) { - state.bufferProcessing = true; + state.bitfield |= F.bufferProcessing; var entry = state.bufferedRequest; if (stream._writev && entry && entry.next) { @@ -597,7 +664,7 @@ function clearBuffer(stream, state) { } state.bufferedRequest = entry; - state.bufferProcessing = false; + state.bitfield &= ~F.bufferProcessing; } Writable.prototype._write = function(chunk, encoding, cb) { @@ -656,12 +723,10 @@ ObjectDefineProperty(Writable.prototype, 'writableLength', { }); function needFinish(state) { - return (state.ending && + return ((state.bitfield & F.ending) && + !(state.bitfield & (F.errored | F.finished | F.writing)) && state.length === 0 && - !state.errored && - state.bufferedRequest === null && - !state.finished && - !state.writing); + state.bufferedRequest === null); } function callFinal(stream, state) {