Skip to content

Commit

Permalink
Merge pull request #331 from vqvu/parallel-issue-328
Browse files Browse the repository at this point in the history
Parallel should not drop data if paused (fixes #328).
  • Loading branch information
vqvu committed Jun 30, 2015
2 parents ade706b + 2ff560e commit 5db4fc1
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 22 deletions.
37 changes: 15 additions & 22 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2964,14 +2964,7 @@ Stream.prototype.parallel = function (n) {
var reading_source = false;

return _(function (push, next) {
if (running.length && running[0].buffer.length) {
// send buffered data
flushBuffer();
next();
// still waiting for more data before we can shift
// the running array...
}
else if (running.length < n && !ended && !reading_source) {
if (running.length < n && !ended && !reading_source) {
// get another stream if not already waiting for one
reading_source = true;
source.pull(function (err, x) {
Expand All @@ -2996,9 +2989,7 @@ Stream.prototype.parallel = function (n) {
// remove self from running and check
// to see if we need to read from source again
running.shift();
if (running.length && running[0].buffer.length) {
flushBuffer();
}
flushBuffer();
next();

}
Expand Down Expand Up @@ -3027,19 +3018,21 @@ Stream.prototype.parallel = function (n) {
}

function flushBuffer() {
var buf = running[0].buffer;
for (var i = 0; i < buf.length; i++) {
if (buf[i][1] === nil) {
// this stream has ended
running.shift();
return;
}
else {
// send the buffered output
push.apply(null, buf[i]);
while (running.length && running[0].buffer.length) {
var buf = running[0].buffer;
for (var i = 0; i < buf.length; i++) {
if (buf[i][1] === nil) {
// this stream has ended
running.shift();
break;
}
else {
// send the buffered output
push.apply(null, buf[i]);
}
}
buf.length = 0;
}
buf.length = 0;
}
// else wait for more data to arrive from running streams
});
Expand Down
22 changes: 22 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5035,6 +5035,28 @@ exports['parallel - throw descriptive error on not-stream'] = function (test) {
test.done();
}

exports['parallel - parallel should not drop data if paused (issue #328)'] = function (test) {
test.expect(1);
var s1 = _([1, 2, 3]);
var s2 = _([11, 12, 13]);
_([s1.fork(), s2, s1.fork()])
.parallel(3)
.consume(function (err, x, push, next) {
push(err, x);
if (x.buf === 21) {
// Pause for a while.
setTimeout(next, 1000);
}
else if (x !== _.nil) {
next();
}
})
.toArray(function (xs) {
test.same(xs, [1, 2, 3, 11, 12, 13, 1, 2, 3]);
test.done();
});
};

exports['throttle'] = {
setUp: function (callback) {
this.clock = sinon.useFakeTimers();
Expand Down

0 comments on commit 5db4fc1

Please sign in to comment.