Skip to content

Commit

Permalink
feat: Make Endpoint::close infallible (#3112)
Browse files Browse the repository at this point in the history
## Description

<!-- A summary of what this pull request achieves and a rough list of
changes. -->
The only source for a `Result::Err` in `Endpoint::close` was a failed
magic sock actor `Shutdown` message send.

This send should basically shutdown the actor, and sending only fails
when the actor dropped the receiver, which only happens when the actor
already shut down, gracefully or otherwise. In any case, it can't react
to that anyways and it's not worth surfacing that error to the user.

## Breaking Changes

<!-- Optional, if there are any breaking changes document them,
including how to migrate older code. -->
- `iroh::Endpoint::close`'s future is now infallible, instead of
returning a `Result`.

## Change checklist

- [x] Self-review.
- [x] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- ~~[ ] Tests if relevant.~~
- [x] All breaking changes documented.
  • Loading branch information
matheus23 authored Jan 10, 2025
1 parent 1ae820d commit 870c76e
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 32 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ let response = recv.read_to_end(1000).await?;
assert_eq!(&response, b"Hello, world!");

// Close the endpoint and all its connections
endpoint.close().await?;
endpoint.close().await;
```

And on the accepting side:
Expand Down
2 changes: 1 addition & 1 deletion iroh-relay/src/server/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl Clients {
/// peer is gone from the network.
///
/// Must be passed a matching connection_id.
pub(super) async fn unregister<'a>(&self, connection_id: u64, node_id: NodeId) {
pub(super) async fn unregister(&self, connection_id: u64, node_id: NodeId) {
trace!(
node_id = node_id.fmt_short(),
connection_id,
Expand Down
7 changes: 3 additions & 4 deletions iroh/bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,16 @@ pub enum EndpointSelector {
}

impl EndpointSelector {
pub async fn close(self) -> Result<()> {
pub async fn close(self) {
match self {
EndpointSelector::Iroh(endpoint) => {
endpoint.close().await?;
endpoint.close().await;
}
#[cfg(not(any(target_os = "freebsd", target_os = "openbsd", target_os = "netbsd")))]
EndpointSelector::Quinn(endpoint) => {
endpoint.close(0u32.into(), b"");
}
}
Ok(())
}
}

Expand Down Expand Up @@ -255,7 +254,7 @@ pub async fn client_handler(
// to `Arc`ing them
connection.close(0u32, b"Benchmark done");

endpoint.close().await?;
endpoint.close().await;

if opt.stats {
println!("\nClient connection stats:\n{:#?}", connection.stats());
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,6 @@ async fn main() -> anyhow::Result<()> {

// We received the last message: close all connections and allow for the close
// message to be sent.
endpoint.close().await?;
endpoint.close().await;
Ok(())
}
8 changes: 1 addition & 7 deletions iroh/examples/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,7 @@ async fn fetch(ticket: &str, relay_url: Option<String>, relay_only: bool) -> any

// We received the last message: close all connections and allow for the close
// message to be sent.
tokio::time::timeout(Duration::from_secs(3), async move {
let res = endpoint.close().await;
if res.is_err() {
println!("failed to close connection: {res:#?}");
}
})
.await?;
tokio::time::timeout(Duration::from_secs(3), endpoint.close()).await?;

let duration = start.elapsed();
println!(
Expand Down
14 changes: 5 additions & 9 deletions iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,21 +972,17 @@ impl Endpoint {
/// Be aware however that the underlying UDP sockets are only closed
/// on [`Drop`], bearing in mind the [`Endpoint`] is only dropped once all the clones
/// are dropped.
///
/// Returns an error if closing the magic socket failed.
/// TODO: Document error cases.
pub async fn close(&self) -> Result<()> {
pub async fn close(&self) {
if self.is_closed() {
return Ok(());
return;
}

tracing::debug!("Closing connections");
self.endpoint.close(0u16.into(), b"");
self.endpoint.wait_idle().await;

tracing::debug!("Connections closed");
self.msock.close().await?;
Ok(())
self.msock.close().await;
}

/// Check if this endpoint is still alive, or already closed.
Expand Down Expand Up @@ -1594,7 +1590,7 @@ mod tests {

info!("closing endpoint");
// close the endpoint and restart it
endpoint.close().await.unwrap();
endpoint.close().await;

info!("restarting endpoint");
// now restart it and check the addressing info of the peer
Expand Down Expand Up @@ -1693,7 +1689,7 @@ mod tests {
send.stopped().await.unwrap();
recv.read_to_end(0).await.unwrap();
info!("client finished");
ep.close().await.unwrap();
ep.close().await;
info!("client closed");
}
.instrument(error_span!("client", %i))
Expand Down
4 changes: 2 additions & 2 deletions iroh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@
//!
//! // Gracefully close the connection and endpoint.
//! conn.close(1u8.into(), b"done");
//! ep.close().await?;
//! ep.close().await;
//! println!("Client closed");
//! Ok(())
//! }
Expand Down Expand Up @@ -202,7 +202,7 @@
//!
//! // Wait for the client to close the connection and gracefully close the endpoint.
//! conn.closed().await;
//! ep.close().await?;
//! ep.close().await;
//! Ok(())
//! }
//! ```
Expand Down
18 changes: 11 additions & 7 deletions iroh/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1654,12 +1654,18 @@ impl Handle {
/// Polling the socket ([`AsyncUdpSocket::poll_recv`]) will return [`Poll::Pending`]
/// indefinitely after this call.
#[instrument(skip_all, fields(me = %self.msock.me))]
pub(crate) async fn close(&self) -> Result<()> {
pub(crate) async fn close(&self) {
if self.msock.is_closed() {
return Ok(());
return;
}
self.msock.closing.store(true, Ordering::Relaxed);
self.msock.actor_sender.send(ActorMessage::Shutdown).await?;
// If this fails, then there's no receiver listening for shutdown messages,
// so nothing to shut down anyways.
self.msock
.actor_sender
.send(ActorMessage::Shutdown)
.await
.ok();
self.msock.closed.store(true, Ordering::SeqCst);

let mut tasks = self.actor_tasks.lock().await;
Expand All @@ -1681,8 +1687,6 @@ impl Handle {
debug!("aborting remaining {}/3 tasks", tasks.len());
tasks.shutdown().await;
}

Ok(())
}
}

Expand Down Expand Up @@ -3408,8 +3412,8 @@ mod tests {
println!("closing endpoints");
let msock1 = m1.endpoint.magic_sock();
let msock2 = m2.endpoint.magic_sock();
m1.endpoint.close().await?;
m2.endpoint.close().await?;
m1.endpoint.close().await;
m2.endpoint.close().await;

assert!(msock1.msock.is_closed());
assert!(msock2.msock.is_closed());
Expand Down

0 comments on commit 870c76e

Please sign in to comment.