
Improved rumqttc event loop check. #2362

Merged

Conversation

suneetnangia
Contributor

No description provided.

@itowlson
Contributor

The CI failure in the plugins test is because an external web server is returning a different error from the one we expected; it has nothing to do with this PR and can be overridden (assuming there are no other failures).

@itowlson
Copy link
Contributor

@suneetnangia Is this intentionally still draft? No rush, just checking that it's deliberate.

@suneetnangia
Contributor Author

> @suneetnangia Is this intentionally still draft? No rush, just checking that it's deliberate.
We can merge now. I was trying to see if there's a better way; I'll raise a subsequent PR as needed, but for now this brings stability to the publish part.

@suneetnangia suneetnangia marked this pull request as ready for review March 14, 2024 08:22
@suneetnangia suneetnangia marked this pull request as draft March 14, 2024 11:09
@suneetnangia
Contributor Author

@itowlson do we have a performance testing project for these outbound and trigger components? Ideally, I want to hammer the MQTT trigger and publish components at some point. For now I have improved the code, but I definitely think it can be better, e.g. running the event loop async task as soon as the connection is created; I'll spend some time on it later. Thanks

@suneetnangia suneetnangia marked this pull request as ready for review March 14, 2024 12:59
@itowlson
Contributor

itowlson commented Mar 14, 2024

@vdice Can you force merge this please? The CI error is unrelated to the PR.

@suneetnangia
Contributor Author

suneetnangia commented Mar 14, 2024

Since our last comment here, I've tested a better solution (as I alluded to) with a large number of messages (100k or so). Updated.

```rust
let (client, mut event_loop) = AsyncClient::new(conn_opts, MQTT_CHANNEL_CAP);

// Start the task to poll the event loop, which sends the messages to the MQTT broker.
// Publisher client code waits for this task to complete before returning, ensuring all events are processed.
```
Contributor


I'm not quite getting how this comment applies. I see the Tokio task being created, but not how it's being awaited or how errors are propagated?

Contributor Author


Ah, that's for the client of this outbound crate, i.e. the Spin Rust SDK example waits for this task to complete. I can rephrase if you want me to...

Contributor


I don't understand how, though. The async block spawns a new task for the loop, but that task seems to just run until the connection fails. It's never returned to the guest... in fact it's in Connection::open, not even in the publish method...?

Or are you saying that this is what allows client.publish_bytes (which is called in the publish method) to return the error: the guest doesn't await this task directly, but client.publish_bytes internally sort of does?

Contributor Author


Or should I completely remove it, if that's better?

Contributor


Yeah, I would maybe remove the reference to the publisher client code waiting for this task. Maybe just comment that this needs to run in the background to propagate errors to the guest.

I would still like to better understand what is happening here, though, like how that error gets to the guest. I understood it when you polled the event loop after doing the send, but I feel a bit lost now. I'm sorry if I'm being obtuse!

Contributor Author


Sure, let me update the comment.

Having the event loop in publish, as in my previous commit, results in messages being dropped without raising an error when throughput is high (200k messages in my test). It also has its own challenges: we exit the event loop on a publish event, but that event is not always correlated to the publish call in question; it can come from another parallel call, as the event loop is shared across the client. Additionally, an error in one publish call (when thousands are in flight) can shift the publish/ack correlation for the other calls.

That was the reason to try another approach (event loop in connection): when the event loop returns with an error (e.g. due to connectivity), client.publish_bytes is unable to reach it, so client.publish_bytes throws an error, which is then propagated to the guest in publish. The guest can then execute open to reconnect; not ideal, but it allows us to have high throughput with connection feedback to the guest. I was testing with my 200k messages and it behaves as described, but in my further testing today it is not working as expected with a smaller number of messages, which is a bummer. I suspect that is because the event loop takes time to realise there is a connection issue and throw the error, but by that time client.publish_bytes has already published a few messages.

On main, MQTT publish currently does not work, so one option would be to use the "event loop in publish" approach (the first one) for now to get it working, and then find another solution that is better than both of the above?
Apologies, I've been juggling a few things; far from ideal.
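The failure-propagation mechanism described above (spawn the event loop task when the connection opens; once it dies, subsequent publishes error out to the caller) can be simulated with nothing but the standard library, no rumqttc and no broker. This is an illustrative sketch of the pattern only, not the PR's actual code; `Connection`, `open`, and `fail_after` are made-up names for the simulation:

```rust
use std::sync::mpsc;
use std::thread;

// Simulated connection: a background thread plays the role of the rumqttc
// event loop, started when the connection is opened ("event loop in
// connection"). `fail_after` simulates a connectivity failure after that
// many messages have been processed.
pub struct Connection {
    tx: mpsc::SyncSender<Vec<u8>>,
}

impl Connection {
    pub fn open(fail_after: usize) -> Self {
        let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(1);
        // Spawn the event-loop task as soon as the connection is created.
        thread::spawn(move || {
            let mut processed = 0;
            while rx.recv().is_ok() {
                processed += 1;
                if processed >= fail_after {
                    // Simulated connection failure: the event loop exits,
                    // dropping the receiver end of the channel.
                    return;
                }
            }
        });
        Connection { tx }
    }

    // Once the event loop has exited, the channel is disconnected and
    // publish returns an error, which propagates back to the caller (guest).
    pub fn publish(&self, payload: Vec<u8>) -> Result<(), String> {
        self.tx
            .send(payload)
            .map_err(|_| "publish failed: event loop has exited".to_string())
    }
}

fn main() {
    let conn = Connection::open(3);
    let mut last = Ok(());
    for i in 0u8..20 {
        last = conn.publish(vec![i]);
        if last.is_err() {
            println!("publish error surfaced to caller at message {i}");
            break;
        }
    }
    assert!(last.is_err());
}
```

Note how one or two publishes after the simulated failure can still succeed (buffered in the channel) before the error surfaces, which mirrors the observation above that client.publish_bytes accepts a few messages before the event loop's failure is visible.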

@itowlson
Contributor

@vdice sorry, please hold off, still some changes happening after all

@suneetnangia suneetnangia marked this pull request as draft March 15, 2024 16:32
@suneetnangia suneetnangia force-pushed the issue/2335/fix-swallowed-mqtt-errors branch from e849605 to 5c70710 Compare March 19, 2024 11:21
@itowlson
Contributor

@suneetnangia Not to rush you, but do we have a way forward here? Failing that, is there an intermediate solution that's "better than today" even if not perfect? Thanks!

@suneetnangia
Contributor Author

> @suneetnangia Not to rush you, but do we have a way forward here? Failing that, is there an intermediate solution that's "better than today" even if not perfect? Thanks!

@itowlson this PR now has the fix (rolled back the latest one), which makes it much better than the current state (in fact, the current code does not work). If we can get this in for now, I'll look at a better solution next week... I have some time off.

@suneetnangia suneetnangia marked this pull request as ready for review April 1, 2024 11:54
@itowlson itowlson merged commit 4d1cbe3 into fermyon:main Apr 1, 2024
15 checks passed
@itowlson
Contributor

itowlson commented Apr 1, 2024

Thank you @suneetnangia and enjoy your break!
