Skip to content

Commit

Permalink
Merge pull request #2362 from suneetnangia/issue/2335/fix-swallowed-m…
Browse files Browse the repository at this point in the history
…qtt-errors

Improved rumqttc event loop check.
  • Loading branch information
itowlson authored Apr 1, 2024
2 parents 36c08f2 + 7577b3c commit 4d1cbe3
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions crates/outbound-mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod host_component;
use std::time::Duration;

use anyhow::Result;
use rumqttc::AsyncClient;
use rumqttc::{AsyncClient, Event, Incoming, Outgoing, QoS};
use spin_core::{async_trait, wasmtime::component::Resource};
use spin_world::v2::mqtt::{self as v2, Connection as MqttConnection, Error, Qos};

Expand Down Expand Up @@ -97,15 +97,22 @@ impl v2::HostConnection for OutboundMqtt {
.await
.map_err(other_error)?;

// Poll EventLoop once to send the message to MQTT broker or capture/throw error
// We may revisit this later to manage long running connections and their issues in the connection pool.
eventloop
.poll()
.await
.map_err(|err: rumqttc::ConnectionError| {
v2::Error::ConnectionFailed(err.to_string())
})?;

// Poll event loop until outgoing publish event is iterated over to send the message to MQTT broker or capture/throw error.
// We may revisit this later to manage long running connections, high throughput use cases and their issues in the connection pool.
loop {
let event = eventloop
.poll()
.await
.map_err(|err| v2::Error::ConnectionFailed(err.to_string()))?;

match (qos, event) {
(QoS::AtMostOnce, Event::Outgoing(Outgoing::Publish(_)))
| (QoS::AtLeastOnce, Event::Incoming(Incoming::PubAck(_)))
| (QoS::ExactlyOnce, Event::Outgoing(Outgoing::PubComp(_))) => break,

(_, _) => continue,
}
}
Ok(())
}
.await)
Expand Down

0 comments on commit 4d1cbe3

Please sign in to comment.