Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(send_queue): Persist failed to send errors #4137

Merged
merged 10 commits into from
Oct 23, 2024
Merged
71 changes: 4 additions & 67 deletions bindings/matrix-sdk-ffi/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ use crate::{
mod content;

pub use content::MessageContent;
use matrix_sdk::deserialized_responses::QueueWedgeError;

#[derive(uniffi::Object)]
#[repr(transparent)]
Expand Down Expand Up @@ -908,42 +909,11 @@ pub enum EventSendState {
/// The local event has not been sent yet.
NotSentYet,

/// One or more verified users in the room has an unsigned device.
///
/// Happens only when the room key recipient strategy (as set by
/// [`ClientBuilder::room_key_recipient_strategy`]) has
/// [`error_on_verified_user_problem`](CollectStrategy::DeviceBasedStrategy::error_on_verified_user_problem) set.
VerifiedUserHasUnsignedDevice {
/// The unsigned devices belonging to verified users. A map from user ID
/// to a list of device IDs.
devices: HashMap<String, Vec<String>>,
},

/// One or more verified users in the room has changed identity since they
/// were verified.
///
/// Happens only when the room key recipient strategy (as set by
/// [`ClientBuilder::room_key_recipient_strategy`]) has
/// [`error_on_verified_user_problem`](CollectStrategy::DeviceBasedStrategy::error_on_verified_user_problem)
/// set, or when using [`CollectStrategy::IdentityBasedStrategy`].
VerifiedUserChangedIdentity {
/// The users that were previously verified, but are no longer
users: Vec<String>,
},

/// The user does not have cross-signing set up, but
/// [`CollectStrategy::IdentityBasedStrategy`] was used.
CrossSigningNotSetup,

/// The current device is not verified, but
/// [`CollectStrategy::IdentityBasedStrategy`] was used.
SendingFromUnverifiedDevice,

/// The local event has been sent to the server, but unsuccessfully: The
/// sending has failed.
SendingFailed {
/// Stringified error message.
error: String,
/// The error reason, with information for the user.
error: QueueWedgeError,
/// Whether the error is considered recoverable or not.
///
/// An error that's recoverable will disable the room's send queue,
Expand All @@ -962,46 +932,13 @@ impl From<&matrix_sdk_ui::timeline::EventSendState> for EventSendState {
match value {
NotSentYet => Self::NotSentYet,
SendingFailed { error, is_recoverable } => {
event_send_state_from_sending_failed(error, *is_recoverable)
Self::SendingFailed { is_recoverable: *is_recoverable, error: error.clone() }
}
Sent { event_id } => Self::Sent { event_id: event_id.to_string() },
}
}
}

fn event_send_state_from_sending_failed(error: &Error, is_recoverable: bool) -> EventSendState {
use matrix_sdk::crypto::{OlmError, SessionRecipientCollectionError::*};

match error {
// Special-case the SessionRecipientCollectionErrors, to pass the information they contain
// back to the application.
Error::OlmError(OlmError::SessionRecipientCollectionError(error)) => match error {
VerifiedUserHasUnsignedDevice(devices) => {
let devices = devices
.iter()
.map(|(user_id, devices)| {
(
user_id.to_string(),
devices.iter().map(|device_id| device_id.to_string()).collect(),
)
})
.collect();
EventSendState::VerifiedUserHasUnsignedDevice { devices }
}

VerifiedUserChangedIdentity(bad_users) => EventSendState::VerifiedUserChangedIdentity {
users: bad_users.iter().map(|user_id| user_id.to_string()).collect(),
},

CrossSigningNotSetup => EventSendState::CrossSigningNotSetup,

SendingFromUnverifiedDevice => EventSendState::SendingFromUnverifiedDevice,
},

_ => EventSendState::SendingFailed { error: error.to_string(), is_recoverable },
}
}

/// Recommended decorations for decrypted messages, representing the message's
/// authenticity properties.
#[derive(uniffi::Enum, Clone)]
Expand Down
24 changes: 17 additions & 7 deletions crates/matrix-sdk-base/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use assert_matches::assert_matches;
use assert_matches2::assert_let;
use async_trait::async_trait;
use growable_bloom_filter::GrowableBloomBuilder;
use matrix_sdk_common::deserialized_responses::QueueWedgeError;
use matrix_sdk_test::test_json;
use ruma::{
api::MatrixVersion,
Expand Down Expand Up @@ -1223,7 +1224,7 @@ impl StateStoreIntegrationTests for DynStateStore {
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), "msg0");

assert!(!pending[0].is_wedged);
assert!(!pending[0].is_wedged());
}

// Saving another three things should work.
Expand All @@ -1249,12 +1250,18 @@ impl StateStoreIntegrationTests for DynStateStore {
let deserialized = pending[i].event.deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), format!("msg{i}"));
assert!(!pending[i].is_wedged);
assert!(!pending[i].is_wedged());
}

// Marking an event as wedged works.
let txn2 = &pending[2].transaction_id;
self.update_send_queue_event_status(room_id, txn2, true).await.unwrap();
self.update_send_queue_event_status(
room_id,
txn2,
Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
)
.await
.unwrap();

// And it is reflected.
let pending = self.load_send_queue_events(room_id).await.unwrap();
Expand All @@ -1263,10 +1270,13 @@ impl StateStoreIntegrationTests for DynStateStore {
assert_eq!(pending.len(), 4);
assert_eq!(pending[0].transaction_id, txn0);
assert_eq!(pending[2].transaction_id, *txn2);
assert!(pending[2].is_wedged);
assert!(pending[2].is_wedged());
let error = pending[2].clone().error.unwrap();
let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
assert_eq!(generic_error, "Oops");
for i in 0..4 {
if i != 2 {
assert!(!pending[i].is_wedged);
assert!(!pending[i].is_wedged());
}
}

Expand All @@ -1288,15 +1298,15 @@ impl StateStoreIntegrationTests for DynStateStore {
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), "wow that's a cool test");

assert!(!pending[2].is_wedged);
assert!(!pending[2].is_wedged());

for i in 0..4 {
if i != 2 {
let deserialized = pending[i].event.deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), format!("msg{i}"));

assert!(!pending[i].is_wedged);
assert!(!pending[i].is_wedged());
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::{

use async_trait::async_trait;
use growable_bloom_filter::GrowableBloom;
use matrix_sdk_common::deserialized_responses::QueueWedgeError;
use ruma::{
canonical_json::{redact, RedactedBecause},
events::{
Expand Down Expand Up @@ -815,7 +816,7 @@ impl StateStore for MemoryStore {
.unwrap()
.entry(room_id.to_owned())
.or_default()
.push(QueuedEvent { event, transaction_id, is_wedged: false });
.push(QueuedEvent { event, transaction_id, error: None });
Ok(())
}

Expand All @@ -835,7 +836,7 @@ impl StateStore for MemoryStore {
.find(|item| item.transaction_id == transaction_id)
{
entry.event = content;
entry.is_wedged = false;
entry.error = None;
Ok(true)
} else {
Ok(false)
Expand Down Expand Up @@ -876,7 +877,7 @@ impl StateStore for MemoryStore {
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
wedged: bool,
error: Option<QueueWedgeError>,
) -> Result<(), Self::Error> {
if let Some(entry) = self
.send_queue_events
Expand All @@ -887,7 +888,7 @@ impl StateStore for MemoryStore {
.iter_mut()
.find(|item| item.transaction_id == transaction_id)
{
entry.is_wedged = wedged;
entry.error = error;
}
Ok(())
}
Expand Down
24 changes: 17 additions & 7 deletions crates/matrix-sdk-base/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
use as_variant::as_variant;
use async_trait::async_trait;
use growable_bloom_filter::GrowableBloom;
use matrix_sdk_common::AsyncTraitDeps;
use matrix_sdk_common::{deserialized_responses::QueueWedgeError, AsyncTraitDeps};
use ruma::{
api::MatrixVersion,
events::{
Expand Down Expand Up @@ -392,12 +392,13 @@ pub trait StateStore: AsyncTraitDeps {
room_id: &RoomId,
) -> Result<Vec<QueuedEvent>, Self::Error>;

/// Updates the send queue wedged status for a given send queue event.
/// Updates the send queue error status (wedge) for a given send queue
/// event.
bnjbvr marked this conversation as resolved.
Show resolved Hide resolved
async fn update_send_queue_event_status(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
wedged: bool,
error: Option<QueueWedgeError>,
) -> Result<(), Self::Error>;

/// Loads all the rooms which have any pending events in their send queue.
Expand Down Expand Up @@ -662,10 +663,10 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
wedged: bool,
error: Option<QueueWedgeError>,
) -> Result<(), Self::Error> {
self.0
.update_send_queue_event_status(room_id, transaction_id, wedged)
.update_send_queue_event_status(room_id, transaction_id, error)
.await
.map_err(Into::into)
}
Expand Down Expand Up @@ -1156,7 +1157,16 @@ pub struct QueuedEvent {
/// If the event couldn't be sent because of an API error, it's marked as
/// wedged, and won't ever be peeked for sending. The only option is to
/// remove it.
pub is_wedged: bool,
pub error: Option<QueueWedgeError>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update this doc comment, since an identical one was added below to is_wedged()?

}

impl QueuedEvent {
/// If the event couldn't be sent because of an API error, it's marked as
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is context around what it means to have an error; I'd rather put that near the error field itself, and have the doc comment here say something like "returns whether the queued event is wedged (i.e. ran into an unrecoverable error) or not"

/// wedged, and won't ever be peeked for sending. The only option is to
/// remove it.
pub fn is_wedged(&self) -> bool {
self.error.is_some()
}
}

/// The specific user intent that characterizes a [`DependentQueuedEvent`].
Expand Down Expand Up @@ -1254,7 +1264,7 @@ impl fmt::Debug for QueuedEvent {
// Hide the content from the debug log.
f.debug_struct("QueuedEvent")
.field("transaction_id", &self.transaction_id)
.field("is_wedged", &self.is_wedged)
.field("is_wedged", &self.is_wedged())
.finish_non_exhaustive()
}
}
Expand Down
49 changes: 48 additions & 1 deletion crates/matrix-sdk-common/src/deserialized_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::BTreeMap, fmt};
use std::{
collections::{BTreeMap, HashMap},
fmt,
};

use ruma::{
events::{AnyMessageLikeEvent, AnySyncTimelineEvent, AnyTimelineEvent},
Expand Down Expand Up @@ -703,6 +706,50 @@ impl From<SyncTimelineEventDeserializationHelperV0> for SyncTimelineEvent {
}
}

/// Represent a failed to send error of an event sent via the send_queue.
/// These errors are not automatically retrievable, but yet some manual action
/// can be taken before retry sending.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum QueueWedgeError {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is related to the send queue, can we put it in the send queue file? Or is there a reason I'm missing to put it into deserialized_responses?

Also, should this use our common pattern of deriving thiserror::Error, to make it clear it's an error type, and would avoid the manual display impl? (uniffi also has an uniffi::Error derive which could be convenient here)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Send Queue is in matrix-sdk and this error is persisted in the state storage matrix-sdk-base.
So it cannot be in the send_queue.rs file.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, can we put those in matrix-sdk-base next to the QueueEvent type, then? (or in a new file send_queue.rs somewhere in that crate)

/// This error occurs when there are some insecure devices in the room, and
/// the current encryption setting prohibit sharing with them.
BillCarsonFr marked this conversation as resolved.
Show resolved Hide resolved
InsecureDevices {
/// The insecure devices as a Map of userID to deviceID.
user_device_map: HashMap<String, Vec<String>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outside the FFI layer, we're not using stringly-typing, so this would become a HashMap<OwnedUserId, Vec<OwnedDeviceId>>, and similarly below.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see 3e65753

},
/// This error occurs when a previously verified user is not anymore, and
/// the current encryption setting prohibit sharing when it happens.
BillCarsonFr marked this conversation as resolved.
Show resolved Hide resolved
IdentityViolations {
/// The users that are expected to be verified but are not.
users: Vec<String>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Vec<OwnedUserId>

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see here #4137 (comment)

},
/// It is required to set up cross-signing and properly verify the current
/// session before sending.
CrossVerificationRequired,
/// Other errors.
GenericApiError { msg: String },
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it be a Box<dyn Error> or something similar? It's a bit weird to store the stringified error, since API users can't match on it, unless they precisely know what string to expect.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see here #4137 (comment)

}

/// Simple display implementation that strips out userIds/DeviceIds to avoid
/// accidental logging.
impl fmt::Display for QueueWedgeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
QueueWedgeError::InsecureDevices { .. } => {
f.write_str("There are insecure devices in the room")
}
QueueWedgeError::IdentityViolations { .. } => {
f.write_str("Some users that were previously verified are not anymore")
}
QueueWedgeError::CrossVerificationRequired => {
f.write_str("Own verification is required")
}
QueueWedgeError::GenericApiError { msg } => f.write_str(msg),
}
}
}

#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
Expand Down
Loading
Loading