Skip to content

Commit

Permalink
stream: writable bitfield
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Dec 29, 2019
1 parent 283e7a4 commit b941d42
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 40 deletions.
6 changes: 3 additions & 3 deletions benchmark/streams/writable-manywrites.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ const Writable = require('stream').Writable;

const bench = common.createBenchmark(main, {
n: [2e6],
sync: ['yes', 'no'],
writev: ['yes', 'no'],
callback: ['yes', 'no']
// sync: ['yes', 'no'],
// writev: ['yes', 'no'],
// callback: ['yes', 'no']
});

function main({ n, sync, writev, callback }) {
Expand Down
139 changes: 102 additions & 37 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ function WritableState(options, stream, isDuplex) {
if (typeof isDuplex !== 'boolean')
isDuplex = stream instanceof Stream.Duplex;

this.bitfield = 0;

// Object stream flag to indicate whether or not this stream
// contains buffers or objects.
this.objectMode = !!(options && options.objectMode);
Expand Down Expand Up @@ -184,6 +186,73 @@ function WritableState(options, stream, isDuplex) {
this.corkedRequestsFree = corkReq;
}

const F = {};
for (let [ key, options ] of Object.entries({
objectMode: {},
finalCalled: {},
drained: {},
ending: {},
ended: {},
finished: {},
destroyed: {},
decodeStrings: {},
writing: {},
sync: {},
bufferProcessing: {},
prefinished: {},
errorEmitted: {},
emitClose: {},
autoDestroy: {},
errored: {}
})) {
options = options || {}
const mask = 1 << (Object.keys(F).length + 8);
F[key] = mask;
ObjectDefineProperty(WritableState.prototype, key, {
get () {
return !!(this.bitfield & mask);
},
set (val) {
if (val) {
this.bitfield |= mask;
} else {
this.bitfield &= ~mask;
}
}
})
}

ObjectDefineProperty(WritableState.prototype, 'needDrain', {
get () {
return !this.drained;
},
set (val) {
this.drained = !val;
}
});

F.corked = 0xF0;
ObjectDefineProperty(WritableState.prototype, 'corked', {
get () {
return (this.bitfield & F.corked) >> 4;
},
set (val) {
this.bitfield &= ~F.corked;
this.bitfield |= (val << 4) & F.corked;
}
});

F.pendingcb = 0xF;
ObjectDefineProperty(WritableState.prototype, 'pendingcb', {
get () {
return (this.bitfield & F.pendingcb);
},
set (val) {
this.bitfield &= ~F.pendingcb;
this.bitfield |= (val & F.pendingcb);
}
});

WritableState.prototype.getBuffer = function getBuffer() {
var current = this.bufferedRequest;
const out = [];
Expand Down Expand Up @@ -294,8 +363,9 @@ function validChunk(stream, state, chunk, cb) {

Writable.prototype.write = function(chunk, encoding, cb) {
const state = this._writableState;
const objectMode = state.bitfield & F.objectMode;
var ret = false;
const isBuf = !state.objectMode && Stream._isUint8Array(chunk);
const isBuf = !objectMode && Stream._isUint8Array(chunk);

// Do not use Object.getPrototypeOf as it is slower since V8 7.3.
if (isBuf && !(chunk instanceof Buffer)) {
Expand All @@ -315,12 +385,14 @@ Writable.prototype.write = function(chunk, encoding, cb) {
if (typeof cb !== 'function')
cb = nop;

if (state.ending) {
writeAfterEnd(this, cb);
} else if (state.destroyed) {
const err = new ERR_STREAM_DESTROYED('write');
process.nextTick(cb, err);
errorOrDestroy(this, err);
if (state.bitfield & (F.ending | F.destroyed)) {
if (state.ending) {
writeAfterEnd(this, cb);
} else if (state.destroyed) {
const err = new ERR_STREAM_DESTROYED('write');
process.nextTick(cb, err);
errorOrDestroy(this, err);
}
} else if (isBuf || validChunk(this, state, chunk, cb)) {
state.pendingcb++;
ret = writeOrBuffer(this, state, chunk, encoding, cb);
Expand All @@ -340,8 +412,8 @@ Writable.prototype.uncork = function() {
state.corked--;

if (!state.writing &&
!state.corked &&
!state.bufferProcessing &&
!state.corked &&
state.bufferedRequest)
clearBuffer(this, state);
}
Expand Down Expand Up @@ -401,8 +473,11 @@ ObjectDefineProperty(Writable.prototype, 'writableCorked', {
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, cb) {
if (!state.objectMode &&
state.decodeStrings !== false &&
const objectMode = state.bitfield & F.objectMode;
const decodeStrings = state.bitfield & F.decodeStrings;

if (!objectMode &&
decodeStrings &&
encoding !== 'buffer' &&
typeof chunk === 'string') {
chunk = Buffer.from(chunk, encoding);
Expand All @@ -415,9 +490,9 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
const ret = state.length < state.highWaterMark;
// We must ensure that previous needDrain will not be reset to false.
if (!ret)
state.needDrain = true;
state.bitfield &= ~F.drained;

if (state.writing || state.corked || state.errored) {
if (state.bitfield & (F.writing | F.corked | F.errored)) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = {
chunk,
Expand All @@ -437,21 +512,18 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {

// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && !state.destroyed;
return ret && !(state.bitfield & (F.errored | F.destroyed));
}

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writelen = len;
state.writecb = cb;
state.writing = true;
state.sync = true;
if (state.destroyed)
state.onwrite(new ERR_STREAM_DESTROYED('write'));
else if (writev)
state.bitfield |= F.writing | F.sync;
if (writev)
stream._writev(chunk, state.onwrite);
else
stream._write(chunk, encoding, state.onwrite);
state.sync = false;
state.bitfield &= ~F.sync;
}

function onwriteError(stream, state, er, cb) {
Expand All @@ -464,15 +536,15 @@ function onwriteError(stream, state, er, cb) {

function onwrite(stream, er) {
const state = stream._writableState;
const sync = state.sync;
const sync = state.bitfield & F.sync;
const cb = state.writecb;

if (typeof cb !== 'function') {
errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
return;
}

state.writing = false;
state.bitfield &= ~F.writing;
state.writecb = null;
state.length -= state.writelen;
state.writelen = 0;
Expand All @@ -485,12 +557,7 @@ function onwrite(stream, er) {
onwriteError(stream, state, er, cb);
}
} else {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state) || stream.destroyed;

if (!finished &&
!state.corked &&
!state.bufferProcessing &&
if (!(state.bitfield & (F.corked | F.bufferProcessing)) &&
state.bufferedRequest) {
clearBuffer(stream, state);
}
Expand Down Expand Up @@ -519,10 +586,10 @@ function afterWriteTick({ stream, state, count, cb }) {
}

function afterWrite(stream, state, count, cb) {
const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
state.needDrain;
const needDrain = !(state.bitfield & (F.ending | F.destroyed | F.drained)) &&
state.length === 0;
if (needDrain) {
state.needDrain = false;
state.bitfield |= F.drained;
stream.emit('drain');
}

Expand All @@ -536,7 +603,7 @@ function afterWrite(stream, state, count, cb) {

// If there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
state.bufferProcessing = true;
state.bitfield |= F.bufferProcessing;
var entry = state.bufferedRequest;

if (stream._writev && entry && entry.next) {
Expand Down Expand Up @@ -597,7 +664,7 @@ function clearBuffer(stream, state) {
}

state.bufferedRequest = entry;
state.bufferProcessing = false;
state.bitfield &= ~F.bufferProcessing;
}

Writable.prototype._write = function(chunk, encoding, cb) {
Expand Down Expand Up @@ -656,12 +723,10 @@ ObjectDefineProperty(Writable.prototype, 'writableLength', {
});

function needFinish(state) {
return (state.ending &&
return ((state.bitfield & F.ending) &&
!(state.bitfield & (F.errored | F.finished | F.writing)) &&
state.length === 0 &&
!state.errored &&
state.bufferedRequest === null &&
!state.finished &&
!state.writing);
state.bufferedRequest === null);
}

function callFinal(stream, state) {
Expand Down

0 comments on commit b941d42

Please sign in to comment.