parallel and corrupted data? #328

Closed
ronag opened this issue Jun 29, 2015 · 15 comments

@ronag

ronag commented Jun 29, 2015

I've noticed a strange occurrence. I'm reading raw compressed data from multiple files and then concatenating them:

   .map(url => _(request.get(url)))
   .parallel(8)
   .through(decoder())

However, this sometimes results in corrupted data.

While the following works just fine:

   .map(url => _(request.get(url)))
   .parallel(1)
   .through(decoder())

I don't have any reproducible example yet. The problem is somewhat intermittent.
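For reference, the behaviour expected from parallel(n) here is that several sources are read at once while their output is still concatenated in the original order. The sketch below shows that contract using plain async iterables instead of Highland streams. It is not Highland's implementation; orderedParallel and its parameters are hypothetical names used only for illustration, and it assumes limit is at least 1.

```javascript
'use strict';

// Hypothetical helper, not part of Highland: drain up to `limit` async-iterable
// sources at once, but yield their values strictly in source order.
async function* orderedParallel(sources, limit) {
    if (limit < 1) {
        throw new RangeError('limit must be at least 1');
    }

    const slots = [];   // slots[i] buffers everything source i has produced so far
    let wake = null;    // resolver for the promise the consumer loop is waiting on

    const notify = () => {
        if (wake) {
            const resolve = wake;
            wake = null;
            resolve();
        }
    };

    // Start draining one source in the background into its own buffer.
    const start = (index) => {
        const slot = { items: [], done: false, error: null };
        slots[index] = slot;
        (async () => {
            try {
                for await (const item of sources[index]) {
                    slot.items.push(item);
                    notify();
                }
            } catch (err) {
                slot.error = err;
            }
            slot.done = true;
            notify();
        })();
    };

    let started = 0;
    for (let head = 0; head < sources.length; head++) {
        // Keep the window of sources [head, head + limit) running.
        while (started < sources.length && started < head + limit) {
            start(started++);
        }

        const slot = slots[head];
        for (;;) {
            // Emit whatever the head source has buffered; later sources wait.
            while (slot.items.length > 0) {
                yield slot.items.shift();
            }
            if (slot.error) {
                throw slot.error;
            }
            if (slot.done) {
                break;
            }
            await new Promise((resolve) => { wake = resolve; });
        }
    }
}
```

With limit set to 1 this degenerates to reading the sources one after another, which matches the parallel(1) case that works. With a larger limit, chunks from later sources arrive early and have to be buffered, which is where data could go missing.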

@vqvu
Collaborator

vqvu commented Jun 29, 2015

Can you try testing against the latest master? parallel had a buffering bug where it would drop data in certain circumstances (see #234). This was fixed in #302 but has not yet been released.

@ronag
Author

ronag commented Jun 29, 2015

Will need to do some more testing but so far that seems to have solved it.

Looking forward to the next release :).

@ronag ronag closed this as completed Jun 29, 2015
@ronag
Author

ronag commented Jun 29, 2015

Nope. Still a problem.

@ronag ronag reopened this Jun 29, 2015
@vqvu
Collaborator

vqvu commented Jun 29, 2015

Do you know why the data is corrupted? Are the Buffers being dropped or reordered?

@ronag
Author

ronag commented Jun 29, 2015

It's very difficult to tell. It's H264 video buffers that are sent to ffplay. Analysing an H264 bitstream is a bit complicated.

@vqvu
Collaborator

vqvu commented Jun 29, 2015

Hmm, I tried reading a bunch of local files via parallel and couldn't find any issue.

I've pushed a debug branch here: https://github.com/vqvu/highland/tree/parallel-debug (vqvu/highland#parallel-debug).

Can you run this code against that branch? It should track the order of buffers that request emits and tell you if anything is wrong.

Don't forget to npm update --dev first!

var counter = [];
var i = 0;
var checkState = {
    urlIndex: 0,
    bufIndex: 0
};

// Check correct version
_.whatAmI();
// => Highland 2.x. debug-parallel edition

stream.map(url => _(request.get(url)))
    .map(function (s) {
        var urlIndex = i++;
        counter[urlIndex] = 0;
        return s.map(function (buf) {
            return {
                index: [urlIndex, counter[urlIndex]++],
                buf: buf,
            };
        });
    })
    .parallel(8)
    .map(function (obj) {
        if (obj.index[0] === checkState.urlIndex + 1) {
            checkState.urlIndex++;
            checkState.bufIndex = 0;
        }

        if (obj.index[0] === checkState.urlIndex) {
            if (obj.index[1] !== checkState.bufIndex) {
                console.log('Corrupt data.', checkState, obj.index);
            }
            checkState.bufIndex++;
        }
        else {
            console.log('Corrupt data.', checkState, obj.index);
        }

        return obj.buf;
    })
    .through(decoder());
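
The same ordering check can also be written as a pure function, which makes it easier to run against a recorded list of tags without a live stream. This is only a sketch that mirrors the logic of the snippet above; findOrderViolations and the shape of its return value are hypothetical and are not part of the debug branch.

```javascript
'use strict';

// Hypothetical pure version of the check above. `indices` is the list of
// [urlIndex, bufIndex] tags in the order they came out of parallel().
function findOrderViolations(indices) {
    const violations = [];
    let urlIndex = 0;   // source we currently expect data from
    let bufIndex = 0;   // next chunk number expected from that source

    indices.forEach(([u, b], position) => {
        // Moving on to the next source is always allowed.
        if (u === urlIndex + 1) {
            urlIndex++;
            bufIndex = 0;
        }

        if (u !== urlIndex || b !== bufIndex) {
            violations.push({
                position: position,
                expected: [urlIndex, bufIndex],
                actual: [u, b]
            });
        }

        // Like the original check, keep counting chunks of the current source.
        if (u === urlIndex) {
            bufIndex++;
        }
    });

    return violations;
}
```

An empty result means no reordering, and no gap within a source, was observed. Note that neither this function nor the check above can detect chunks missing from the tail of a source, because the expected number of chunks per URL is not known.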

@ronag
Author

ronag commented Jun 29, 2015

How do I checkout a branch with npm?

git@github.com:vqvu/highland.git...something

@ronag
Author

ronag commented Jun 29, 2015

See also, #330, with which I have no issues, so far.

@apaleslimghost
Collaborator

@ronag you can run npm install vqvu/highland#parallel-debug

@apaleslimghost
Collaborator

(depending on your shell, you might need to escape the #)

@vqvu
Collaborator

vqvu commented Jun 30, 2015

Ok, I think I have a broken test case. Will type up the fix soon-ish. Though I'm not sure if this is the problem you're encountering or not.

var s1 = _([1, 2, 3]);
var s2 = _([11, 12, 13]);
_([s1.fork(), s2, s1.fork()])
    .parallel(3)
    .consume(function (err, x, push, next) {
        push(err, x);
        if (x.buf === 21) {
            // Pause for a while.
            setTimeout(next, 1000);
        }
        else if (x !== _.nil) {
            next();
        }
    })
    .toArray(_.log);
// => [ 1, 2, 3, 11, 12, 13 ]
// should be
// => [ 1, 2, 3, 11, 12, 13, 1, 2, 3 ]

Basically, when s1 ends, if

  • s2 has already ended, and
  • the parallel stream is paused (caused by the decoder)

then anything that s3 (the third stream, i.e. the second s1.fork()) has already emitted gets dropped on the floor.
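
The invariant that has to hold in this situation can be shown with a toy model of ordered buffering. This is a sketch only and does not reflect how parallel is implemented internally; the OrderedBuffers class and its methods are hypothetical. The point is that when the head source ends while the consumer is paused, the model only advances its position and never discards what later sources have already buffered.

```javascript
'use strict';

// Hypothetical toy model, not Highland code: one buffer per source, and a
// consumer that can be paused and resumed.
class OrderedBuffers {
    constructor(count) {
        this.buffers = Array.from({ length: count }, () => ({ items: [], ended: false }));
        this.head = 0;        // index of the source currently allowed to emit
        this.paused = false;
        this.output = [];     // what the consumer has received so far
    }

    write(index, value) {
        this.buffers[index].items.push(value);
        this.flush();
    }

    end(index) {
        this.buffers[index].ended = true;
        this.flush();
    }

    pause() {
        this.paused = true;
    }

    resume() {
        this.paused = false;
        this.flush();
    }

    // Emit as much as possible in source order. While paused nothing is
    // emitted, but nothing is thrown away either.
    flush() {
        while (!this.paused && this.head < this.buffers.length) {
            const current = this.buffers[this.head];
            if (current.items.length > 0) {
                this.output.push(current.items.shift());
            } else if (current.ended) {
                this.head++;
            } else {
                return;
            }
        }
    }
}
```

Replaying the conditions listed above (second source already ended, consumer paused, first source ends, third source holding buffered data) and then calling resume() should leave every written value in output, in source order.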

@vqvu
Collaborator

vqvu commented Jun 30, 2015

Can you try testing against the code in #331?

vqvu/highland#parallel-issue-328

@vqvu vqvu added the bug label Jun 30, 2015
@ronag
Author

ronag commented Jun 30, 2015

Will do later today.

@ronag
Author

ronag commented Jun 30, 2015

That seems to have solved it. I haven't seen any corruption so far with the same test cases.

@vqvu vqvu closed this as completed in 5db4fc1 Jun 30, 2015
@vqvu
Collaborator

vqvu commented Jun 30, 2015

Please reopen if you run into the problem again.

@vqvu vqvu added this to the v2.6.0 milestone Nov 18, 2015