Skip to content

Commit

Permalink
fix(iroh-net): use try_send rather than send so we dont block the…
Browse files Browse the repository at this point in the history
… local swarm discovery service (#2794)

## Description

Using `subscriber.send().await` could block the entire discovery service
if one of the subscribers polls too slow. Change to `try_send` that will
drop the discovery item from that stream if it is closed.

## Notes & open questions

Added a line in the documentation to mention that if you do not poll
enough, you may miss messages.

## Change checklist

- [x] Self-review.
- [x] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.

---------

Co-authored-by: Diva M <[email protected]>
  • Loading branch information
ramfox and divagant-martian authored Oct 7, 2024
1 parent c349c43 commit 2d04306
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
2 changes: 2 additions & 0 deletions iroh-net/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ pub trait Discovery: std::fmt::Debug + Send + Sync {
/// until the stream is actually polled. To avoid missing discovered nodes,
/// poll the stream as soon as possible.
///
/// If you do not regularly poll the stream, you may miss discovered nodes.
///
/// Any discovery systems that only discover when explicitly resolving a
/// specific [`NodeId`] do not need to implement this method. Any nodes or
/// addresses that are discovered by calling `resolve` should NOT be added
Expand Down
22 changes: 17 additions & 5 deletions iroh-net/src/discovery/local_swarm_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ use watchable::Watchable;

use iroh_base::key::PublicKey;
use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer};
use tokio::{sync::mpsc, task::JoinSet};
use tokio::{
sync::mpsc::{self, error::TrySendError},
task::JoinSet,
};
use tokio_util::task::AbortOnDropHandle;

use crate::{
Expand Down Expand Up @@ -103,12 +106,21 @@ impl Subscribers {
/// Sends the `node_id` and `item` to each subscriber.
///
/// Cleans up any subscribers that have been dropped.
async fn send(&mut self, item: DiscoveryItem) {
fn send(&mut self, item: DiscoveryItem) {
let mut clean_up = vec![];
for (i, subscriber) in self.0.iter().enumerate() {
// assume subscriber was dropped
if (subscriber.send(item.clone()).await).is_err() {
clean_up.push(i);
if let Err(err) = subscriber.try_send(item.clone()) {
match err {
TrySendError::Full(_) => {
warn!(
?item,
idx = i,
"local swarm discovery subscriber is blocked, dropping item"
)
}
TrySendError::Closed(_) => clean_up.push(i),
}
}
}
for i in clean_up.into_iter().rev() {
Expand Down Expand Up @@ -236,7 +248,7 @@ impl LocalSwarmDiscovery {
// in other words, nodes sent to the `subscribers` should only be the ones that
// have been "passively" discovered
if !resolved {
subscribers.send(item).await;
subscribers.send(item);
}
}
Message::Resolve(node_id, sender) => {
Expand Down

0 comments on commit 2d04306

Please sign in to comment.