Skip to content

Commit

Permalink
fix(iroh-relay): removes deadlock in Clients (#3099)
Browse files Browse the repository at this point in the history
## Description

Remove the deadlock that was a bit hidden due to `DashMap`.

Added a `client` parameter to `unregister` so that the held map reference is
explicitly dropped before calling into the `clients` `DashMap` again.

## Notes & open questions

The `warn` log for stream termination seems a little fear-mongering, but
I'm not sure of the best way to "downgrade" it, as we seem to rely on
this error in tests like `endpoint_relay_connect_loop`. I ended up leaving
it as is.

## Change checklist

- [x] Self-review.
- [x] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [x] Tests if relevant.

---------

Co-authored-by: “ramfox” <“[email protected]”>
Co-authored-by: Friedel Ziegelmayer <[email protected]>
  • Loading branch information
3 people authored Jan 7, 2025
1 parent 60ba9ac commit c650ea8
Showing 1 changed file with 56 additions and 35 deletions.
91 changes: 56 additions & 35 deletions iroh-relay/src/server/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use iroh_metrics::inc;
use tokio::sync::mpsc::error::TrySendError;
use tracing::{debug, trace};

use super::client::{Client, Config, Packet};
use super::client::{Client, Config};
use crate::server::metrics::Metrics;

/// Manages the connections to all currently connected clients.
Expand Down Expand Up @@ -56,7 +56,14 @@ impl Clients {
/// Removes the client from the map of clients and sends a notification
/// to each client that the peer has sent data to, to let them know that
/// the peer is gone from the network.
async fn unregister(&self, node_id: NodeId) {
///
/// Explicitly drops the reference to the client to avoid deadlock.
async fn unregister<'a>(
&self,
client: dashmap::mapref::one::Ref<'a, iroh_base::PublicKey, Client>,
node_id: NodeId,
) {
drop(client); // avoid deadlock
trace!(node_id = node_id.fmt_short(), "unregistering client");

if let Some((_, client)) = self.0.clients.remove(&node_id) {
Expand All @@ -83,42 +90,53 @@ impl Clients {
}
}

/// Attempt to send a packet to client with [`NodeId`] `dst`
/// Attempt to send a packet to client with [`NodeId`] `dst`.
pub(super) async fn send_packet(&self, dst: NodeId, data: Bytes, src: NodeId) -> Result<()> {
if let Some(client) = self.0.clients.get(&dst) {
let res = client.try_send_packet(src, data);
return self.process_result(src, dst, res).await;
let Some(client) = self.0.clients.get(&dst) else {
debug!(dst = dst.fmt_short(), "no connected client, dropped packet");
inc!(Metrics, send_packets_dropped);
return Ok(());
};
match client.try_send_packet(src, data) {
Ok(_) => {
// Record sent_to relationship
self.0.sent_to.entry(src).or_default().insert(dst);
Ok(())
}
Err(TrySendError::Full(_)) => {
debug!(
dst = dst.fmt_short(),
"client too busy to receive packet, dropping packet"
);
bail!("failed to send message: full");
}
Err(TrySendError::Closed(_)) => {
debug!(
dst = dst.fmt_short(),
"can no longer write to client, dropping message and pruning connection"
);
self.unregister(client, dst).await;
bail!("failed to send message: gone");
}
}
debug!(dst = dst.fmt_short(), "no connected client, dropped packet");
inc!(Metrics, send_packets_dropped);
Ok(())
}

/// Attempt to send a disco packet to client with [`NodeId`] `dst`.
pub(super) async fn send_disco_packet(
&self,
dst: NodeId,
data: Bytes,
src: NodeId,
) -> Result<()> {
if let Some(client) = self.0.clients.get(&dst) {
let res = client.try_send_disco_packet(src, data);
return self.process_result(src, dst, res).await;
}
debug!(
dst = dst.fmt_short(),
"no connected client, dropped disco packet"
);
inc!(Metrics, disco_packets_dropped);
Ok(())
}

async fn process_result(
&self,
src: NodeId,
dst: NodeId,
res: Result<(), TrySendError<Packet>>,
) -> Result<()> {
match res {
let Some(client) = self.0.clients.get(&dst) else {
debug!(
dst = dst.fmt_short(),
"no connected client, dropped disco packet"
);
inc!(Metrics, disco_packets_dropped);
return Ok(());
};
match client.try_send_disco_packet(src, data) {
Ok(_) => {
// Record sent_to relationship
self.0.sent_to.entry(src).or_default().insert(dst);
Expand All @@ -127,17 +145,17 @@ impl Clients {
Err(TrySendError::Full(_)) => {
debug!(
dst = dst.fmt_short(),
"client too busy to receive packet, dropping packet"
"client too busy to receive disco packet, dropping packet"
);
bail!("failed to send message");
bail!("failed to send message: full");
}
Err(TrySendError::Closed(_)) => {
debug!(
dst = dst.fmt_short(),
"can no longer write to client, dropping message and pruning connection"
"can no longer write to client, dropping disco message and pruning connection"
);
self.unregister(dst).await;
bail!("failed to send message");
self.unregister(client, dst).await;
bail!("failed to send message: gone");
}
}
}
Expand Down Expand Up @@ -212,8 +230,11 @@ mod tests {
}
);

// send peer_gone
clients.unregister(a_key).await;
let client = clients.0.clients.get(&a_key).unwrap();

// send peer_gone. Also, tests that we do not get a deadlock
// when unregistering.
clients.unregister(client, a_key).await;

assert!(!clients.0.clients.contains_key(&a_key));
clients.shutdown().await;
Expand Down

0 comments on commit c650ea8

Please sign in to comment.