Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(send_queue): Persist failed to send errors #4137

Merged
merged 10 commits into from
Oct 23, 2024
Merged
71 changes: 4 additions & 67 deletions bindings/matrix-sdk-ffi/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ use crate::{
mod content;

pub use content::MessageContent;
use matrix_sdk::deserialized_responses::QueueWedgeError;

#[derive(uniffi::Object)]
#[repr(transparent)]
Expand Down Expand Up @@ -908,42 +909,11 @@ pub enum EventSendState {
/// The local event has not been sent yet.
NotSentYet,

/// One or more verified users in the room has an unsigned device.
///
/// Happens only when the room key recipient strategy (as set by
/// [`ClientBuilder::room_key_recipient_strategy`]) has
/// [`error_on_verified_user_problem`](CollectStrategy::DeviceBasedStrategy::error_on_verified_user_problem) set.
VerifiedUserHasUnsignedDevice {
/// The unsigned devices belonging to verified users. A map from user ID
/// to a list of device IDs.
devices: HashMap<String, Vec<String>>,
},

/// One or more verified users in the room has changed identity since they
/// were verified.
///
/// Happens only when the room key recipient strategy (as set by
/// [`ClientBuilder::room_key_recipient_strategy`]) has
/// [`error_on_verified_user_problem`](CollectStrategy::DeviceBasedStrategy::error_on_verified_user_problem)
/// set, or when using [`CollectStrategy::IdentityBasedStrategy`].
VerifiedUserChangedIdentity {
/// The users that were previously verified, but are no longer
users: Vec<String>,
},

/// The user does not have cross-signing set up, but
/// [`CollectStrategy::IdentityBasedStrategy`] was used.
CrossSigningNotSetup,

/// The current device is not verified, but
/// [`CollectStrategy::IdentityBasedStrategy`] was used.
SendingFromUnverifiedDevice,

/// The local event has been sent to the server, but unsuccessfully: The
/// sending has failed.
SendingFailed {
/// Stringified error message.
error: String,
/// The error reason, with information for the user.
error: QueueWedgeError,
/// Whether the error is considered recoverable or not.
///
/// An error that's recoverable will disable the room's send queue,
Expand All @@ -962,46 +932,13 @@ impl From<&matrix_sdk_ui::timeline::EventSendState> for EventSendState {
match value {
NotSentYet => Self::NotSentYet,
SendingFailed { error, is_recoverable } => {
event_send_state_from_sending_failed(error, *is_recoverable)
Self::SendingFailed { is_recoverable: *is_recoverable, error: error.clone() }
}
Sent { event_id } => Self::Sent { event_id: event_id.to_string() },
}
}
}

fn event_send_state_from_sending_failed(error: &Error, is_recoverable: bool) -> EventSendState {
use matrix_sdk::crypto::{OlmError, SessionRecipientCollectionError::*};

match error {
// Special-case the SessionRecipientCollectionErrors, to pass the information they contain
// back to the application.
Error::OlmError(OlmError::SessionRecipientCollectionError(error)) => match error {
VerifiedUserHasUnsignedDevice(devices) => {
let devices = devices
.iter()
.map(|(user_id, devices)| {
(
user_id.to_string(),
devices.iter().map(|device_id| device_id.to_string()).collect(),
)
})
.collect();
EventSendState::VerifiedUserHasUnsignedDevice { devices }
}

VerifiedUserChangedIdentity(bad_users) => EventSendState::VerifiedUserChangedIdentity {
users: bad_users.iter().map(|user_id| user_id.to_string()).collect(),
},

CrossSigningNotSetup => EventSendState::CrossSigningNotSetup,

SendingFromUnverifiedDevice => EventSendState::SendingFromUnverifiedDevice,
},

_ => EventSendState::SendingFailed { error: error.to_string(), is_recoverable },
}
}

/// Recommended decorations for decrypted messages, representing the message's
/// authenticity properties.
#[derive(uniffi::Enum, Clone)]
Expand Down
49 changes: 48 additions & 1 deletion crates/matrix-sdk-common/src/deserialized_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::BTreeMap, fmt};
use std::{
collections::{BTreeMap, HashMap},
fmt,
};

use ruma::{
events::{AnyMessageLikeEvent, AnySyncTimelineEvent, AnyTimelineEvent},
Expand Down Expand Up @@ -703,6 +706,50 @@ impl From<SyncTimelineEventDeserializationHelperV0> for SyncTimelineEvent {
}
}

/// Represent a failed to send error of an event sent via the send_queue.
/// These errors are not automatically retrievable, but yet some manual action
/// can be taken before retry sending.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum QueueWedgeError {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is related to the send queue, can we put it in the send queue file? Or is there a reason I'm missing to put it into deserialized_responses?

Also, should this use our common pattern of deriving thiserror::Error, to make it clear it's an error type, and would avoid the manual display impl? (uniffi also has an uniffi::Error derive which could be convenient here)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Send Queue is in matrix-sdk and this error is persisted in the state storage matrix-sdk-base.
So it cannot be in the send_queue.rs file.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, can we put those in matrix-sdk-base next to the QueueEvent type, then? (or in a new file send_queue.rs somewhere in that crate)

/// This error occurs when there are some insecure devices in the room, and
/// the current encryption setting prohibit sharing with them.
BillCarsonFr marked this conversation as resolved.
Show resolved Hide resolved
InsecureDevices {
/// The insecure devices as a Map of userID to deviceID.
user_device_map: HashMap<String, Vec<String>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outside the FFI layer, we're not using stringly-typing, so this would become a HashMap<OwnedUserId, Vec<OwnedDeviceId>>, and similarly below.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see 3e65753

},
/// This error occurs when a previously verified user is not anymore, and
/// the current encryption setting prohibit sharing when it happens.
BillCarsonFr marked this conversation as resolved.
Show resolved Hide resolved
IdentityViolations {
/// The users that are expected to be verified but are not.
users: Vec<String>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Vec<OwnedUserId>

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see here #4137 (comment)

},
/// It is required to set up cross-signing and properly verify the current
/// session before sending.
CrossVerificationRequired,
/// Other errors.
GenericApiError { msg: String },
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it be a Box<dyn Error> or something similar? It's a bit weird to store the stringified error, since API users can't match on it, unless they precisely know what string to expect.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see here #4137 (comment)

}

/// Simple display implementation that strips out userIds/DeviceIds to avoid
/// accidental logging.
impl fmt::Display for QueueWedgeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
QueueWedgeError::InsecureDevices { .. } => {
f.write_str("There are insecure devices in the room")
}
QueueWedgeError::IdentityViolations { .. } => {
f.write_str("Some users that were previously verified are not anymore")
}
QueueWedgeError::CrossVerificationRequired => {
f.write_str("Own verification is required")
}
QueueWedgeError::GenericApiError { msg } => f.write_str(msg),
}
}
}

#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
Expand Down
12 changes: 6 additions & 6 deletions crates/matrix-sdk-ui/src/timeline/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use matrix_sdk::{
},
Result, Room,
};
use matrix_sdk_base::deserialized_responses::QueueWedgeError;
use ruma::{
api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
events::{
Expand Down Expand Up @@ -1262,9 +1263,9 @@ impl<P: RoomDataProvider> TimelineController<P> {
EventSendState::SendingFailed {
// Put a dummy error in this case, since we're not persisting the errors
// that occurred in previous sessions.
error: Arc::new(matrix_sdk::Error::UnknownError(Box::new(
MissingLocalEchoFailError,
))),
error: QueueWedgeError::GenericApiError {
msg: MISSING_LOCAL_ECHO_FAIL_ERROR.into(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: for &str to String conversions, we prefer .to_owned().

},
is_recoverable: false,
},
)
Expand Down Expand Up @@ -1521,9 +1522,8 @@ impl TimelineController {
}
}

#[derive(Debug, Error)]
#[error("local echo failed to send in a previous session")]
struct MissingLocalEchoFailError;
const MISSING_LOCAL_ECHO_FAIL_ERROR: &'static str =
BillCarsonFr marked this conversation as resolved.
Show resolved Hide resolved
"local echo failed to send in a previous session";

#[derive(Debug, Default)]
pub(super) struct HandleManyEventsResult {
Expand Down
7 changes: 3 additions & 4 deletions crates/matrix-sdk-ui/src/timeline/event_item/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use as_variant::as_variant;
use matrix_sdk::{send_queue::SendHandle, Error};
use matrix_sdk::send_queue::SendHandle;
use matrix_sdk_base::deserialized_responses::QueueWedgeError;
use ruma::{EventId, OwnedEventId, OwnedTransactionId};

use super::TimelineEventItemId;
Expand Down Expand Up @@ -70,7 +69,7 @@ pub enum EventSendState {
/// sending has failed.
SendingFailed {
/// Details about how sending the event failed.
error: Arc<Error>,
error: QueueWedgeError,
/// Whether the error is considered recoverable or not.
///
/// An error that's recoverable will disable the room's send queue,
Expand Down
12 changes: 4 additions & 8 deletions crates/matrix-sdk-ui/src/timeline/tests/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{io, sync::Arc};

use assert_matches::assert_matches;
use eyeball_im::VectorDiff;
use matrix_sdk::{
assert_next_matches_with_timeout, send_queue::RoomSendQueueUpdate,
test_utils::events::EventFactory, Error,
test_utils::events::EventFactory,
};
use matrix_sdk_base::deserialized_responses::QueueWedgeError;
use matrix_sdk_test::{async_test, ALICE, BOB};
use ruma::{
event_id,
Expand Down Expand Up @@ -66,15 +65,12 @@ async fn test_remote_echo_full_trip() {
// Scenario 2: The local event has not been sent to the server successfully, it
// has failed. In this case, there is no event ID.
{
let some_io_error = Error::Io(io::Error::new(io::ErrorKind::Other, "this is a test"));
let some_io_error = QueueWedgeError::GenericApiError { msg: "this is a test".to_owned() };
BillCarsonFr marked this conversation as resolved.
Show resolved Hide resolved
timeline
.controller
.update_event_send_state(
&txn_id,
EventSendState::SendingFailed {
error: Arc::new(some_io_error),
is_recoverable: true,
},
EventSendState::SendingFailed { error: some_io_error, is_recoverable: true },
)
.await;

Expand Down
52 changes: 44 additions & 8 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,23 @@ use matrix_sdk_base::{
},
RoomState, StoreError,
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use matrix_sdk_common::{
deserialized_responses::QueueWedgeError,
executor::{spawn, JoinHandle},
};
use ruma::{
events::{
reaction::ReactionEventContent, relation::Annotation, AnyMessageLikeEventContent,
EventContent as _,
},
serde::Raw,
EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, OwnedUserId, TransactionId,
};
use tokio::sync::{broadcast, Notify, RwLock};
use tracing::{debug, error, info, instrument, trace, warn};

#[cfg(feature = "e2e-encryption")]
use crate::crypto::{OlmError, SessionRecipientCollectionError};
use crate::{
client::WeakClient,
config::RequestConfig,
Expand Down Expand Up @@ -187,7 +192,7 @@ pub struct SendQueueRoomError {
pub room_id: OwnedRoomId,

/// The error the room has ran into, when trying to send an event.
pub error: Arc<crate::Error>,
pub error: QueueWedgeError,

/// Whether the error is considered recoverable or not.
///
Expand Down Expand Up @@ -494,6 +499,8 @@ impl RoomSendQueue {
_ => false,
};

let wedged_err = convert_error_to_wedged_reason(&err);

if is_recoverable {
warn!(txn_id = %queued_event.transaction_id, error = ?err, "Recoverable error when sending event: {err}, disabling send queue");

Expand All @@ -520,17 +527,15 @@ impl RoomSendQueue {
}
}

let error = Arc::new(err);

let _ = global_error_reporter.send(SendQueueRoomError {
room_id: room.room_id().to_owned(),
error: error.clone(),
error: wedged_err.clone(),
is_recoverable,
});

let _ = updates.send(RoomSendQueueUpdate::SendError {
transaction_id: queued_event.transaction_id,
error,
error: wedged_err,
is_recoverable,
});
}
Expand Down Expand Up @@ -577,6 +582,37 @@ impl RoomSendQueue {
}
}

fn convert_error_to_wedged_reason(value: &crate::Error) -> QueueWedgeError {
match value {
#[cfg(feature = "e2e-encryption")]
crate::Error::OlmError(OlmError::SessionRecipientCollectionError(error)) => match error {
SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
QueueWedgeError::InsecureDevices {
user_device_map: user_map
.iter()
.map(|(user_id, devices)| {
(
user_id.to_string(),
devices.iter().map(|device_id| device_id.to_string()).collect(),
)
})
.collect(),
}
}
SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
QueueWedgeError::IdentityViolations {
users: users.iter().map(OwnedUserId::to_string).collect(),
}
}
SessionRecipientCollectionError::CrossSigningNotSetup
| SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
QueueWedgeError::CrossVerificationRequired
}
},
_ => QueueWedgeError::GenericApiError { msg: value.to_string() },
}
}

struct RoomSendQueueInner {
/// The room which this send queue relates to.
room: WeakRoom,
Expand Down Expand Up @@ -1193,7 +1229,7 @@ pub enum RoomSendQueueUpdate {
/// Transaction id used to identify this event.
transaction_id: OwnedTransactionId,
/// Error received while sending the event.
error: Arc<crate::Error>,
error: QueueWedgeError,
/// Whether the error is considered recoverable or not.
///
/// An error that's recoverable will disable the room's send queue,
Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk/tests/integration/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
use assert_matches2::{assert_let, assert_matches};
use matrix_sdk::{
config::{RequestConfig, StoreConfig},
deserialized_responses::QueueWedgeError,
send_queue::{LocalEcho, LocalEchoContent, RoomSendQueueError, RoomSendQueueUpdate},
test_utils::{
events::EventFactory, logged_in_client, logged_in_client_with_server, set_client_session,
Expand Down Expand Up @@ -471,8 +472,7 @@ async fn test_error_then_locally_reenabling() {
// seconds).
// It's the same transaction id that's used to signal the send error.
let error = assert_update!(watch => error { recoverable=true, txn=txn1 });
let error = error.as_client_api_error().unwrap();
assert_eq!(error.status_code, 500);
assert_matches!(error, QueueWedgeError::GenericApiError { .. });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change justifies why we need to store the error itself, in the case of GenericApiError: otherwise here we're loosing information that it could be a client API error. Maybe GenericApiError could contain a (boxed or not) matrix_sdk::Error, as this is the most general error type we have in this crate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed now, see here details #4137 (comment)


sleep(Duration::from_millis(50)).await;

Expand Down