diff --git a/Cargo.lock b/Cargo.lock index af8e7a5224..83ac753d80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2780,9 +2780,9 @@ dependencies = [ "clap", "derive_more", "ed25519-dalek", + "futures-concurrency", "futures-lite 2.3.0", "futures-util", - "genawaiter", "indexmap 2.2.6", "iroh-base", "iroh-blake3", diff --git a/iroh-cli/src/commands/gossip.rs b/iroh-cli/src/commands/gossip.rs index 0ab236bdce..7995491fd0 100644 --- a/iroh-cli/src/commands/gossip.rs +++ b/iroh-cli/src/commands/gossip.rs @@ -65,7 +65,7 @@ impl GossipCommands { line = input_lines.next_line() => { let line = line.context("failed to read from stdin")?; if let Some(line) = line { - sink.send(iroh_gossip::dispatcher::Command::Broadcast(line.into())).await?; + sink.send(iroh_gossip::net::Command::Broadcast(line.into())).await?; } else { break; } @@ -73,14 +73,14 @@ impl GossipCommands { res = stream.next() => { let res = res.context("gossip stream ended")?.context("failed to read gossip stream")?; match res { - iroh_gossip::dispatcher::Event::Gossip(event) => { + iroh_gossip::net::Event::Gossip(event) => { if verbose { println!("{:?}", event); - } else if let iroh_gossip::dispatcher::GossipEvent::Received(iroh_gossip::dispatcher::Message { content, .. }) = event { + } else if let iroh_gossip::net::GossipEvent::Received(iroh_gossip::net::Message { content, .. }) = event { println!("{:?}", content); } } - iroh_gossip::dispatcher::Event::Lagged => { + iroh_gossip::net::Event::Lagged => { anyhow::bail!("gossip stream lagged"); } }; diff --git a/iroh-docs/src/engine.rs b/iroh-docs/src/engine.rs index f6a2ae73aa..332497a78e 100644 --- a/iroh-docs/src/engine.rs +++ b/iroh-docs/src/engine.rs @@ -23,7 +23,6 @@ use tracing::{error, error_span, Instrument}; use crate::{actor::SyncHandle, ContentStatus, ContentStatusCallback, Entry, NamespaceId}; use crate::{Author, AuthorId}; -use self::gossip::GossipActor; use self::live::{LiveActor, ToLiveActor}; pub use self::live::SyncEvent; @@ -69,7 +68,6 @@ impl Engine { default_author_storage: DefaultAuthorStorage, ) -> anyhow::Result { let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP); - let (to_gossip_actor, to_gossip_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP); let me = endpoint.node_id().fmt_short(); let content_status_cb = { @@ -86,17 +84,10 @@ impl Engine { downloader, to_live_actor_recv, live_actor_tx.clone(), - to_gossip_actor, - ); - let gossip_actor = GossipActor::new( - to_gossip_actor_recv, - sync.clone(), - gossip, - live_actor_tx.clone(), ); let actor_handle = tokio::task::spawn( async move { - if let Err(err) = actor.run(gossip_actor).await { + if let Err(err) = actor.run().await { error!("sync actor failed: {err:?}"); } } diff --git a/iroh-docs/src/engine/gossip.rs b/iroh-docs/src/engine/gossip.rs index 210356c969..ab261ef1e6 100644 --- a/iroh-docs/src/engine/gossip.rs +++ b/iroh-docs/src/engine/gossip.rs @@ -1,170 +1,156 @@ -use std::collections::HashSet; +use std::collections::{hash_map, HashMap}; use anyhow::{Context, Result}; +use bytes::Bytes; use futures_lite::StreamExt; use futures_util::FutureExt; -use iroh_gossip::net::{Event, Gossip}; -use iroh_metrics::inc; -use iroh_net::key::PublicKey; +use iroh_gossip::net::{Event, Gossip, GossipEvent, GossipReceiver, GossipSender, JoinOptions}; +use iroh_net::NodeId; use tokio::{ - sync::{broadcast, mpsc}, - task::JoinSet, + sync::mpsc, + task::{AbortHandle, JoinSet}, }; -use tokio_stream::{ - wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}, - StreamMap, -}; -use tracing::{debug, 
error, trace, warn};
+use tracing::{debug, instrument, warn};
 
-use crate::metrics::Metrics;
 use crate::{actor::SyncHandle, ContentStatus, NamespaceId};
 
 use super::live::{Op, ToLiveActor};
 
-#[derive(strum::Display, Debug)]
-pub enum ToGossipActor {
-    Shutdown,
-    Join {
-        namespace: NamespaceId,
-        peers: Vec<PublicKey>,
-    },
-    Leave {
-        namespace: NamespaceId,
-    },
+#[derive(Debug)]
+struct ActiveState {
+    sender: GossipSender,
+    abort_handle: AbortHandle,
 }
 
-/// This actor subscribes to all gossip events. When receiving entries, they are inserted in the
-/// replica (if open). Other events are forwarded to the main actor to be handled there.
-pub struct GossipActor {
-    inbox: mpsc::Receiver<ToGossipActor>,
-    sync: SyncHandle,
+#[derive(Debug)]
+pub struct GossipState {
     gossip: Gossip,
-    to_sync_actor: mpsc::Sender<ToLiveActor>,
-    joined: HashSet<NamespaceId>,
-    want_join: HashSet<NamespaceId>,
-    pending_joins: JoinSet<(NamespaceId, Result<broadcast::Receiver<Event>>)>,
-    gossip_events: StreamMap<NamespaceId, BroadcastStream<Event>>,
+    sync: SyncHandle,
+    to_live_actor: mpsc::Sender<ToLiveActor>,
+    active: HashMap<NamespaceId, ActiveState>,
+    active_tasks: JoinSet<(NamespaceId, Result<()>)>,
 }
 
-impl GossipActor {
-    pub fn new(
-        inbox: mpsc::Receiver<ToGossipActor>,
-        sync: SyncHandle,
-        gossip: Gossip,
-        to_sync_actor: mpsc::Sender<ToLiveActor>,
-    ) -> Self {
+impl GossipState {
+    pub fn new(gossip: Gossip, sync: SyncHandle, to_live_actor: mpsc::Sender<ToLiveActor>) -> Self {
         Self {
-            inbox,
-            sync,
             gossip,
-            to_sync_actor,
-            joined: Default::default(),
-            want_join: Default::default(),
-            pending_joins: Default::default(),
-            gossip_events: Default::default(),
+            sync,
+            to_live_actor,
+            active: Default::default(),
+            active_tasks: Default::default(),
         }
     }
 
-    pub async fn run(&mut self) -> anyhow::Result<()> {
-        let mut i = 0;
-        loop {
-            i += 1;
-            trace!(?i, "tick wait");
-            inc!(Metrics, doc_gossip_tick_main);
-            tokio::select! {
-                next = self.gossip_events.next(), if !self.gossip_events.is_empty() => {
-                    trace!(?i, "tick: gossip_event");
-                    inc!(Metrics, doc_gossip_tick_event);
-                    if let Err(err) = self.on_gossip_event(next).await {
-                        error!("gossip actor died: {err:?}");
-                        return Err(err);
-                    }
-                },
-                msg = self.inbox.recv() => {
-                    let msg = msg.context("to_actor closed")?;
-                    trace!(%msg, ?i, "tick: to_actor");
-                    inc!(Metrics, doc_gossip_tick_actor);
-                    if !self.on_actor_message(msg).await.context("on_actor_message")? {
-                        break;
-                    }
-                }
-                Some(res) = self.pending_joins.join_next(), if !self.pending_joins.is_empty() => {
-                    trace!(?i, "tick: pending_joins");
-                    inc!(Metrics, doc_gossip_tick_pending_join);
-                    let (namespace, res) = res.context("pending_joins closed")?;
-                    match res {
-                        Ok(stream) => {
-                            debug!(namespace = %namespace.fmt_short(), "joined gossip");
-                            self.joined.insert(namespace);
-                            let stream = BroadcastStream::new(stream);
-                            self.gossip_events.insert(namespace, stream);
-                        },
-                        Err(err) => {
-                            if self.want_join.contains(&namespace) {
-                                error!(?namespace, ?err, "failed to join gossip");
-                            }
-                        }
-                    }
-                }
+    pub async fn join(&mut self, namespace: NamespaceId, bootstrap: Vec<NodeId>) -> Result<()> {
+        match self.active.entry(namespace) {
+            hash_map::Entry::Occupied(entry) => {
+                if !bootstrap.is_empty() {
+                    entry.get().sender.join_peers(bootstrap).await?;
+                }
+            }
+            hash_map::Entry::Vacant(entry) => {
+                let sub = self
+                    .gossip
+                    .join_with_opts(namespace.into(), JoinOptions::with_bootstrap(bootstrap));
+                let (sender, stream) = sub.split();
+                let abort_handle = self.active_tasks.spawn(
+                    receive_loop(
+                        namespace,
+                        stream,
+                        self.to_live_actor.clone(),
+                        self.sync.clone(),
+                    )
+                    .map(move |res| (namespace, res)),
+                );
+                entry.insert(ActiveState {
+                    sender,
+                    abort_handle,
+                });
             }
         }
         Ok(())
     }
 
-    async fn on_actor_message(&mut self, msg: ToGossipActor) -> anyhow::Result<bool> {
-        match msg {
-            ToGossipActor::Shutdown => {
-                for namespace in self.joined.iter() {
-                    self.gossip.quit((*namespace).into()).await.ok();
-                }
-                return Ok(false);
-            }
-            ToGossipActor::Join { namespace, peers } => {
-                debug!(?namespace, peers = peers.len(), "join gossip");
-                let gossip = self.gossip.clone();
-                // join gossip for the topic to receive and send message
-                let fut = async move {
-                    let stream = gossip.subscribe(namespace.into()).await?;
-                    let _topic = gossip.join(namespace.into(), peers).await?.await?;
-                    Ok(stream)
-                };
-                let fut = fut.map(move |res| (namespace, res));
-                self.want_join.insert(namespace);
-                self.pending_joins.spawn(fut);
-            }
-            ToGossipActor::Leave { namespace } => {
-                self.gossip.quit(namespace.into()).await?;
-                self.joined.remove(&namespace);
-                self.want_join.remove(&namespace);
-            }
+    pub fn quit(&mut self, topic: &NamespaceId) {
+        if let Some(state) = self.active.remove(topic) {
+            state.abort_handle.abort();
         }
-        Ok(true)
     }
 
-    async fn on_gossip_event(
-        &mut self,
-        event: Option<(NamespaceId, Result<Event, BroadcastStreamRecvError>)>,
-    ) -> Result<()> {
-        let (namespace, event) = event.context("Gossip event channel closed")?;
-        let event = match event {
-            Ok(event) => event,
-            Err(BroadcastStreamRecvError::Lagged(n)) => {
-                warn!("GossipActor too slow (lagged by {n}) - dropping gossip event");
-                return Ok(());
-            }
-        };
-        if !self.joined.contains(&namespace) && !self.want_join.contains(&namespace) {
-            error!(namespace = %namespace.fmt_short(), "received gossip event for unknown topic");
-            return Ok(());
+
+    pub async fn shutdown(&mut self) -> Result<()> {
+        for (_, state) in self.active.drain() {
+            state.abort_handle.abort();
         }
-        if let Err(err) = self.on_gossip_event_inner(namespace, event).await {
-            error!(namespace = %namespace.fmt_short(), ?err, "Failed to process gossip event");
+        self.progress().await
+    }
+
+    pub async fn broadcast(&self, namespace: &NamespaceId, message: Bytes) {
+        if let Some(state) = self.active.get(namespace) {
+            state.sender.broadcast(message).await.ok();
+        }
+    }
+
+    pub async fn broadcast_neighbors(&self, namespace: &NamespaceId, message: Bytes) {
+        if let Some(state) = self.active.get(namespace) {
+            state.sender.broadcast_neighbors(message).await.ok();
+        }
+    }
+
+    pub fn max_message_size(&self) -> usize {
+        self.gossip.max_message_size()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.active.is_empty()
+    }
+
+    /// Progress the internal task queues.
+    ///
+    /// Returns an error if any of the active tasks panic.
+    ///
+    /// ## Cancel safety
+    ///
+    /// This function is fully cancel-safe.
+    pub async fn progress(&mut self) -> Result<()> {
+        while let Some(res) = self.active_tasks.join_next().await {
+            match res {
+                Err(err) if err.is_cancelled() => continue,
+                Err(err) => return Err(err).context("gossip receive loop panicked"),
+                Ok((namespace, res)) => {
+                    self.active.remove(&namespace);
+                    if let Err(err) = res {
+                        warn!(?err, ?namespace, "gossip receive loop failed")
+                    }
+                }
+            }
         }
         Ok(())
     }
+}
 
-    async fn on_gossip_event_inner(&mut self, namespace: NamespaceId, event: Event) -> Result<()> {
+#[instrument("gossip-recv", skip_all, fields(namespace=%namespace.fmt_short()))]
+async fn receive_loop(
+    namespace: NamespaceId,
+    mut recv: GossipReceiver,
+    to_sync_actor: mpsc::Sender<ToLiveActor>,
+    sync: SyncHandle,
+) -> Result<()> {
+    for peer in recv.neighbors() {
+        to_sync_actor
+            .send(ToLiveActor::NeighborUp { namespace, peer })
+            .await?;
+    }
+    while let Some(event) = recv.try_next().await? {
+        let event = match event {
+            Event::Gossip(event) => event,
+            Event::Lagged => {
+                debug!("gossip loop lagged - dropping gossip event");
+                continue;
+            }
+        };
         match event {
-            Event::Received(msg) => {
+            GossipEvent::Received(msg) => {
                 let op: Op = postcard::from_bytes(&msg.content)?;
                 match op {
                     Op::Put(entry) => {
@@ -180,12 +166,15 @@ impl GossipActor {
                             false => ContentStatus::Missing,
                         };
                         let from = *msg.delivered_from.as_bytes();
-                        self.sync
+                        if let Err(err) = sync
                             .insert_remote(namespace, entry, from, content_status)
-                            .await?;
+                            .await
+                        {
+                            debug!("ignoring entry received via gossip: {err}");
+                        }
                     }
                     Op::ContentReady(hash) => {
-                        self.to_sync_actor
+                        to_sync_actor
                             .send(ToLiveActor::NeighborContentReady {
                                 namespace,
                                 node: msg.delivered_from,
@@ -194,7 +183,7 @@ impl GossipActor {
                             .await?;
                     }
                     Op::SyncReport(report) => {
-                        self.to_sync_actor
+                        to_sync_actor
                             .send(ToLiveActor::IncomingSyncReport {
                                 from: msg.delivered_from,
                                 report,
@@ -203,20 +192,24 @@ impl GossipActor {
                     }
                 }
             }
-            // A new neighbor appeared in the gossip swarm. Try to sync with it directly.
-            // [Self::sync_with_peer] will check to not resync with peers synced previously in the
-            // same session. TODO: Maybe this is too broad and leads to too many sync requests.
-            Event::NeighborUp(peer) => {
-                self.to_sync_actor
+            GossipEvent::NeighborUp(peer) => {
+                to_sync_actor
                     .send(ToLiveActor::NeighborUp { namespace, peer })
                     .await?;
             }
-            Event::NeighborDown(peer) => {
-                self.to_sync_actor
+            GossipEvent::NeighborDown(peer) => {
+                to_sync_actor
                     .send(ToLiveActor::NeighborDown { namespace, peer })
                     .await?;
             }
+            GossipEvent::Joined(peers) => {
+                for peer in peers {
+                    to_sync_actor
+                        .send(ToLiveActor::NeighborUp { namespace, peer })
+                        .await?;
+                }
+            }
         }
-        Ok(())
     }
+    Ok(())
 }
diff --git a/iroh-docs/src/engine/live.rs b/iroh-docs/src/engine/live.rs
index 6e49536baa..84cb52e953 100644
--- a/iroh-docs/src/engine/live.rs
+++ b/iroh-docs/src/engine/live.rs
@@ -9,7 +9,7 @@ use iroh_blobs::downloader::{DownloadError, DownloadRequest, Downloader};
 use iroh_blobs::get::Stats;
 use iroh_blobs::HashAndFormat;
 use iroh_blobs::{store::EntryStatus, Hash};
-use iroh_gossip::{net::Gossip, proto::TopicId};
+use iroh_gossip::net::Gossip;
 use iroh_metrics::inc;
 use iroh_net::NodeId;
 use iroh_net::{key::PublicKey, Endpoint, NodeAddr};
@@ -18,9 +18,8 @@ use tokio::{
     sync::{self, mpsc, oneshot},
     task::JoinSet,
 };
-use tracing::{debug, error, error_span, info, instrument, trace, warn, Instrument, Span};
+use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
 
-use crate::metrics::Metrics;
 use crate::{
     actor::{OpenOpts, SyncHandle},
     net::{
@@ -29,8 +28,8 @@ use crate::{
     },
     AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
 };
+use crate::{engine::gossip::GossipState, metrics::Metrics};
 
-use super::gossip::{GossipActor, ToGossipActor};
 use super::state::{NamespaceStates, Origin, SyncReason};
 
 /// Name used for logging when new node addresses are added from the docs engine.
@@ -150,7 +150,6 @@ pub struct LiveActor<B: iroh_blobs::store::Store> {
     inbox: mpsc::Receiver<ToLiveActor>,
     sync: SyncHandle,
     endpoint: Endpoint,
-    gossip: Gossip,
     bao_store: B,
     downloader: Downloader,
     replica_events_tx: async_channel::Sender<crate::Event>,
@@ -160,7 +159,7 @@
     /// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks.
     /// Only clone into newly spawned tasks.
     sync_actor_tx: mpsc::Sender<ToLiveActor>,
-    gossip_actor_tx: mpsc::Sender<ToGossipActor>,
+    gossip: GossipState,
 
     /// Running sync futures (from connect).
     running_sync_connect: JoinSet<(NamespaceId, PublicKey, SyncReason, Result<SyncFinished>)>,
@@ -190,20 +189,19 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
         downloader: Downloader,
         inbox: mpsc::Receiver<ToLiveActor>,
         sync_actor_tx: mpsc::Sender<ToLiveActor>,
-        gossip_actor_tx: mpsc::Sender<ToGossipActor>,
     ) -> Self {
         let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
+        let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
         Self {
             inbox,
             sync,
             replica_events_rx,
             replica_events_tx,
             endpoint,
-            gossip,
+            gossip: gossip_state,
             bao_store,
             downloader,
             sync_actor_tx,
-            gossip_actor_tx,
             running_sync_connect: Default::default(),
             running_sync_accept: Default::default(),
             subscribers: Default::default(),
@@ -215,22 +213,11 @@
     }
 
     /// Run the actor loop.
- pub async fn run(mut self, mut gossip_actor: GossipActor) -> Result<()> { - let me = self.endpoint.node_id().fmt_short(); - let gossip_handle = tokio::task::spawn( - async move { - if let Err(err) = gossip_actor.run().await { - error!("gossip recv actor failed: {err:?}"); - } - } - .instrument(error_span!("sync", %me)), - ); - + pub async fn run(mut self) -> Result<()> { let shutdown_reply = self.run_inner().await; if let Err(err) = self.shutdown().await { error!(?err, "Error during shutdown"); } - gossip_handle.await?; drop(self); match shutdown_reply { Ok(reply) => { @@ -288,7 +275,11 @@ impl LiveActor { inc!(Metrics, doc_live_tick_pending_downloads); let (namespace, hash, res) = res.context("pending_downloads closed")?; self.on_download_ready(namespace, hash, res).await; - + } + res = self.gossip.progress(), if !self.gossip.is_empty() => { + if let Err(error) = res { + warn!(?error, "gossip state failed"); + } } } } @@ -379,13 +370,15 @@ impl LiveActor { async fn shutdown(&mut self) -> anyhow::Result<()> { // cancel all subscriptions self.subscribers.clear(); - // shutdown gossip actor - self.gossip_actor_tx - .send(ToGossipActor::Shutdown) - .await - .ok(); - // shutdown sync thread - let _store = self.sync.shutdown().await; + let (gossip_shutdown_res, _store) = tokio::join!( + // quit the gossip topics and task loops. + self.gossip.shutdown(), + // shutdown sync thread + self.sync.shutdown() + ); + gossip_shutdown_res?; + // TODO: abort_all and join_next all JoinSets to catch panics + // (they are aborted on drop, but that swallows panics) Ok(()) } @@ -439,10 +432,7 @@ impl LiveActor { .unsubscribe(namespace, self.replica_events_tx.clone()) .await?; self.sync.close(namespace).await?; - self.gossip_actor_tx - .send(ToGossipActor::Leave { namespace }) - .await - .context("gossip actor failure")?; + self.gossip.quit(&namespace); } if kill_subscribers { self.subscribers.remove(&namespace); @@ -450,11 +440,7 @@ impl LiveActor { Ok(()) } - async fn join_peers( - &mut self, - namespace: NamespaceId, - peers: Vec, - ) -> anyhow::Result<()> { + async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec) -> Result<()> { let mut peer_ids = Vec::new(); // add addresses of peers to our endpoint address book @@ -477,12 +463,7 @@ impl LiveActor { } // tell gossip to join - self.gossip_actor_tx - .send(ToGossipActor::Join { - namespace, - peers: peer_ids.clone(), - }) - .await?; + self.gossip.join(namespace, peer_ids.clone()).await?; if !peer_ids.is_empty() { // trigger initial sync with initial peers @@ -644,18 +625,9 @@ impl LiveActor { } }; // TODO: We should debounce and merge these neighbor announcements likely. - if let Err(err) = self - .gossip - .broadcast_neighbors(namespace.into(), msg.into()) - .await - { - error!( - namespace = %namespace.fmt_short(), - %op, - ?err, - "Failed to broadcast to neighbors" - ); - } + self.gossip + .broadcast_neighbors(&namespace, msg.into()) + .await; } async fn on_download_ready( @@ -725,12 +697,11 @@ impl LiveActor { match event { crate::Event::LocalInsert { namespace, entry } => { debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert"); - let topic = TopicId::from_bytes(*namespace.as_bytes()); // A new entry was inserted locally. Broadcast a gossip message. 
if self.state.is_syncing(&namespace) { let op = Op::Put(entry.clone()); let message = postcard::to_stdvec(&op)?.into(); - self.gossip.broadcast(topic, message).await?; + self.gossip.broadcast(&namespace, message).await; } } crate::Event::RemoteInsert { diff --git a/iroh-gossip/Cargo.toml b/iroh-gossip/Cargo.toml index 3aed7435fa..b39e4d76a8 100644 --- a/iroh-gossip/Cargo.toml +++ b/iroh-gossip/Cargo.toml @@ -15,43 +15,38 @@ rust-version = "1.76" workspace = true [dependencies] -# proto dependencies (required) anyhow = { version = "1" } +async-channel = { version = "2.3.1", optional = true } blake3 = { package = "iroh-blake3", version = "1.4.5"} bytes = { version = "1.4.0", features = ["serde"] } derive_more = { version = "=1.0.0-beta.7", features = ["add", "debug", "deref", "display", "from", "try_into", "into"] } ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] } +futures-concurrency = { version = "7.6.1", optional = true } +futures-lite = { version = "2.3", optional = true } +futures-util = { version = "0.3.30", optional = true } indexmap = "2.0" +iroh-base = { version = "0.21.0", path = "../iroh-base" } +iroh-metrics = { version = "0.21.0", path = "../iroh-metrics" } +iroh-net = { path = "../iroh-net", version = "0.21.0", optional = true, default-features = false } postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } rand = { version = "0.8.5", features = ["std_rng"] } rand_core = "0.6.4" serde = { version = "1.0.164", features = ["derive"] } -tracing = "0.1" -iroh-metrics = { version = "0.21.0", path = "../iroh-metrics" } -iroh-base = { version = "0.21.0", path = "../iroh-base" } - -# net dependencies (optional) -futures-lite = { version = "2.3", optional = true } -iroh-net = { path = "../iroh-net", version = "0.21.0", optional = true, default-features = false, features = ["test-utils"] } tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "macros", "net", "fs"] } tokio-util = { version = "0.7.8", optional = true, features = ["codec"] } -genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] } - -# dispatcher dependencies (optional) -async-channel = { version = "2.3.1", optional = true } -futures-util = { version = "0.3.30", optional = true } +tracing = "0.1" [dev-dependencies] clap = { version = "4", features = ["derive"] } +iroh-net = { path = "../iroh-net", version = "0.21.0", default-features = false, features = ["test-utils"] } iroh-test = { path = "../iroh-test" } rand_chacha = "0.3.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = "2.4.0" [features] -default = ["net", "dispatcher"] -net = ["dep:futures-lite", "dep:iroh-net", "dep:tokio", "dep:tokio-util"] -dispatcher = ["dep:async-channel", "dep:futures-util"] +default = ["net"] +net = ["dep:futures-lite", "dep:iroh-net", "dep:tokio", "dep:tokio-util", "dep:async-channel", "dep:futures-util", "dep:futures-concurrency"] [[example]] name = "chat" diff --git a/iroh-gossip/examples/chat.rs b/iroh-gossip/examples/chat.rs index a9b3a3999d..4aa36a2f56 100644 --- a/iroh-gossip/examples/chat.rs +++ b/iroh-gossip/examples/chat.rs @@ -1,13 +1,14 @@ use std::{collections::HashMap, fmt, str::FromStr}; -use anyhow::{bail, Context}; +use anyhow::{bail, Context, Result}; use bytes::Bytes; use clap::Parser; use ed25519_dalek::Signature; +use futures_lite::StreamExt; use iroh_base::base32; use iroh_gossip::{ - net::{Gossip, GOSSIP_ALPN}, - proto::{Event, TopicId}, + net::{Event, 
Gossip, GossipEvent, GossipReceiver, GOSSIP_ALPN}, + proto::TopicId, }; use iroh_net::{ key::{PublicKey, SecretKey}, @@ -65,7 +66,7 @@ enum Command { } #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> Result<()> { tracing_subscriber::fmt::init(); let args = Args::parse(); @@ -134,18 +135,18 @@ async fn main() -> anyhow::Result<()> { endpoint.add_node_addr(peer)?; } }; - gossip.join(topic, peer_ids).await?.await?; + let (sender, receiver) = gossip.join(topic, peer_ids).await?.split(); println!("> connected!"); // broadcast our name, if set if let Some(name) = args.name { let message = Message::AboutMe { name }; let encoded_message = SignedMessage::sign_and_encode(endpoint.secret_key(), &message)?; - gossip.broadcast(topic, encoded_message).await?; + sender.broadcast(encoded_message).await?; } // subscribe and print loop - tokio::spawn(subscribe_loop(gossip.clone(), topic)); + tokio::spawn(subscribe_loop(receiver)); // spawn an input thread that reads stdin // not using tokio here because they recommend this for "technical reasons" @@ -157,21 +158,18 @@ async fn main() -> anyhow::Result<()> { while let Some(text) = line_rx.recv().await { let message = Message::Message { text: text.clone() }; let encoded_message = SignedMessage::sign_and_encode(endpoint.secret_key(), &message)?; - gossip.broadcast(topic, encoded_message).await?; + sender.broadcast(encoded_message).await?; println!("> sent: {text}"); } Ok(()) } -async fn subscribe_loop(gossip: Gossip, topic: TopicId) -> anyhow::Result<()> { +async fn subscribe_loop(mut receiver: GossipReceiver) -> Result<()> { // init a peerid -> name hashmap let mut names = HashMap::new(); - // get a stream that emits updates on our topic - let mut stream = gossip.subscribe(topic).await?; - loop { - let event = stream.recv().await?; - if let Event::Received(msg) = event { + while let Some(event) = receiver.try_next().await? { + if let Event::Gossip(GossipEvent::Received(msg)) = event { let (from, message) = SignedMessage::verify_and_decode(&msg.content)?; match message { Message::AboutMe { name } => { @@ -187,6 +185,7 @@ async fn subscribe_loop(gossip: Gossip, topic: TopicId) -> anyhow::Result<()> { } } } + Ok(()) } async fn endpoint_loop(endpoint: Endpoint, gossip: Gossip) { @@ -199,10 +198,7 @@ async fn endpoint_loop(endpoint: Endpoint, gossip: Gossip) { }); } } -async fn handle_connection( - mut conn: iroh_net::endpoint::Connecting, - gossip: Gossip, -) -> anyhow::Result<()> { +async fn handle_connection(mut conn: iroh_net::endpoint::Connecting, gossip: Gossip) -> Result<()> { let alpn = conn.alpn().await?; let conn = conn.await?; let peer_id = iroh_net::endpoint::get_remote_node_id(&conn)?; @@ -216,7 +212,7 @@ async fn handle_connection( Ok(()) } -fn input_loop(line_tx: tokio::sync::mpsc::Sender) -> anyhow::Result<()> { +fn input_loop(line_tx: tokio::sync::mpsc::Sender) -> Result<()> { let mut buffer = String::new(); let stdin = std::io::stdin(); // We get `Stdin` here. 
loop { @@ -234,7 +230,7 @@ struct SignedMessage { } impl SignedMessage { - pub fn verify_and_decode(bytes: &[u8]) -> anyhow::Result<(PublicKey, Message)> { + pub fn verify_and_decode(bytes: &[u8]) -> Result<(PublicKey, Message)> { let signed_message: Self = postcard::from_bytes(bytes)?; let key: PublicKey = signed_message.from; key.verify(&signed_message.data, &signed_message.signature)?; @@ -242,7 +238,7 @@ impl SignedMessage { Ok((signed_message.from, message)) } - pub fn sign_and_encode(secret_key: &SecretKey, message: &Message) -> anyhow::Result { + pub fn sign_and_encode(secret_key: &SecretKey, message: &Message) -> Result { let data: Bytes = postcard::to_stdvec(&message)?.into(); let signature = secret_key.sign(&data); let from: PublicKey = secret_key.public(); @@ -269,7 +265,7 @@ struct Ticket { } impl Ticket { /// Deserializes from bytes. - fn from_bytes(bytes: &[u8]) -> anyhow::Result { + fn from_bytes(bytes: &[u8]) -> Result { postcard::from_bytes(bytes).map_err(Into::into) } /// Serializes to bytes. diff --git a/iroh-gossip/src/dispatcher.rs b/iroh-gossip/src/dispatcher.rs deleted file mode 100644 index e724741ff2..0000000000 --- a/iroh-gossip/src/dispatcher.rs +++ /dev/null @@ -1,503 +0,0 @@ -//! A higher level wrapper for the gossip engine that manages multiple gossip subscriptions and updates. -use std::{ - collections::{btree_map::Entry, BTreeMap, BTreeSet}, - pin::Pin, - sync::{Arc, Mutex}, -}; - -use crate::{ - net::{Event as IrohGossipEvent, Gossip}, - proto::{DeliveryScope, TopicId}, -}; -use bytes::Bytes; -use futures_lite::StreamExt; -use futures_util::Stream; -use iroh_base::rpc::{RpcError, RpcResult}; -use iroh_net::{key::PublicKey, util::AbortingJoinHandle, NodeId}; -use serde::{Deserialize, Serialize}; - -/// Join a gossip topic -#[derive(Serialize, Deserialize, Debug)] -pub struct SubscribeOptions { - /// The initial bootstrap nodes - pub bootstrap: BTreeSet, - /// The maximum number of messages that can be buffered in a subscription. - /// - /// If this limit is reached, the subscriber will receive a `Lagged` response, - /// the message will be dropped, and the subscriber will be closed. - /// - /// This is to prevent a single slow subscriber from blocking the dispatch loop. - /// If a subscriber is lagging, it should be closed and re-opened. - pub subscription_capacity: usize, -} - -/// Send a gossip message -#[derive(Serialize, Deserialize, Debug)] -pub enum Command { - /// Broadcast a message to all nodes in the swarm - Broadcast(Bytes), - /// Broadcast a message to all direct neighbors - BroadcastNeighbors(Bytes), -} - -/// Update from a subscribed gossip topic -#[derive(Serialize, Deserialize, Debug)] -pub enum Event { - /// A message was received - Gossip(GossipEvent), - /// We missed some messages - Lagged, -} - -/// Gossip event -/// An event to be emitted to the application for a particular topic. 
-#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] -pub enum GossipEvent { - /// We have a new, direct neighbor in the swarm membership layer for this topic - NeighborUp(NodeId), - /// We dropped direct neighbor in the swarm membership layer for this topic - NeighborDown(NodeId), - /// A gossip message was received for this topic - Received(Message), -} - -impl From> for GossipEvent { - fn from(event: crate::proto::Event) -> Self { - match event { - crate::proto::Event::NeighborUp(node_id) => Self::NeighborUp(node_id), - crate::proto::Event::NeighborDown(node_id) => Self::NeighborDown(node_id), - crate::proto::Event::Received(message) => Self::Received(Message { - content: message.content, - scope: message.scope, - delivered_from: message.delivered_from, - }), - } - } -} - -/// A gossip message -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] -pub struct Message { - /// The content of the message - pub content: Bytes, - /// The scope of the message. - /// This tells us if the message is from a direct neighbor or actual gossip. - pub scope: DeliveryScope, - /// The node that delivered the message. This is not the same as the original author. - pub delivered_from: NodeId, -} - -/// A gossip engine that manages gossip subscriptions and updates. -#[derive(Debug, Clone)] -pub struct GossipDispatcher { - gossip: Gossip, - inner: Arc>, -} - -/// The mutable state of the gossip engine. -#[derive(Debug)] -struct State { - current_subscriptions: BTreeMap, - /// the single task that dispatches gossip events to all subscribed streams - /// - /// this isn't really part of the mutable state, but it needs to live somewhere - task: Option>, -} - -/// Type alias for a stream of gossip updates, so we don't have to repeat all the bounds. -type CommandStream = Box + Send + Sync + Unpin + 'static>; -/// Type alias for a sink of gossip events. -type EventSink = async_channel::Sender>; - -#[derive(derive_more::Debug)] -enum TopicState { - /// The topic is currently joining. - /// Making new subscriptions is allowed, but they will have to wait for the join to finish. - Joining { - /// Stream/sink pairs that are waiting for the topic to become live. - #[debug(skip)] - waiting: Vec<(CommandStream, EventSink)>, - /// Set of bootstrap nodes we are using. - bootstrap: BTreeSet, - /// The task that is driving the join future. - _join_task: AbortingJoinHandle<()>, - }, - /// The topic is currently live. - /// New subscriptions can be immediately added. - Live { - update_tasks: Vec>, - event_sinks: Vec, - }, - /// The topic is currently quitting. - /// We can't make new subscriptions without waiting for the quit to finish. - Quitting { - /// Stream/sink pairs that are waiting for the topic to quit so - /// it can be joined again. - #[debug(skip)] - waiting: Vec<(CommandStream, EventSink)>, - /// Set of bootstrap nodes we are using. - /// - /// This is used to re-join the topic after quitting. - bootstrap: BTreeSet, - /// The task that is driving the quit future. - #[allow(dead_code)] - quit_task: AbortingJoinHandle<()>, - }, -} - -impl TopicState { - /// Extract all senders from the state. - fn into_senders(self) -> Vec { - match self { - TopicState::Joining { waiting, .. } | TopicState::Quitting { waiting, .. } => { - waiting.into_iter().map(|(_, send)| send).collect() - } - TopicState::Live { event_sinks, .. } => event_sinks, - } - } -} - -impl GossipDispatcher { - /// Create a new gossip dispatcher with the given gossip instance. 
- pub fn new(gossip: Gossip) -> Self { - let inner = Arc::new(Mutex::new(State { - current_subscriptions: BTreeMap::new(), - task: None, - })); - let res = Self { gossip, inner }; - let dispatch_task = spawn_owned(res.clone().dispatch_task()); - res.inner.lock().unwrap().task = Some(dispatch_task); - res - } - - /// Quit a gossip topic and handle the result of the quitting. - /// - /// On quit success, will try to join the topic again with the bootstrap nodes we have accumulated while waiting for quit to finish. - /// On quit failure, all waiting streams will be notified with the error and removed. - async fn quit_task(self, topic: TopicId) { - let res = self.gossip.quit(topic).await; - let mut inner = self.inner.lock().unwrap(); - if let Some(TopicState::Quitting { - waiting, - bootstrap: peers, - .. - }) = inner.current_subscriptions.remove(&topic) - { - match res { - Ok(()) => { - if waiting.is_empty() { - return; - } - let bootstrap = peers.clone(); - let _join_task = spawn_owned(self.clone().join_task(topic, bootstrap)); - inner.current_subscriptions.insert( - topic, - TopicState::Joining { - waiting, - bootstrap: peers, - _join_task, - }, - ); - } - Err(e) => { - // notify all waiting streams that there is something wrong with the topic - let error = RpcError::from(e); - for (_, send) in waiting { - send.try_send(Err(error.clone())).ok(); - } - } - } - } - } - - /// Try to send an event to a sink. - /// - /// This will not wait until the sink is full, but send a `Lagged` response if the sink is almost full. - fn try_send(send: &EventSink, event: &IrohGossipEvent) -> bool { - // If the stream is disconnected, we don't need to send to it. - if send.is_closed() { - return false; - } - // Check if the send buffer is almost full, and send a lagged response if it is. - if let Some(cap) = send.capacity() { - if send.len() >= cap - 1 { - send.try_send(Ok(Event::Lagged)).ok(); - return false; - } - } - // Send the event to the stream. - // We are the owner of the stream, so we can be sure that there is still room. - send.try_send(Ok(Event::Gossip(event.clone().into()))) - .is_ok() - } - - /// Dispatch gossip events to all subscribed streams. - /// - /// This should not fail unless the gossip instance is faulty. - async fn dispatch_loop(mut self) -> anyhow::Result<()> { - let stream = self.gossip.clone().subscribe_all(); - tokio::pin!(stream); - while let Some(item) = stream.next().await { - let (topic, event) = item?; - // The loop is only for the case that the topic is still in joining state, - // where we switch it to live here and have to re-lock the mutex afterwards. - loop { - let mut inner = self.inner.lock().unwrap(); - let Some(state) = inner.current_subscriptions.get_mut(&topic) else { - tracing::trace!("Received event for unknown topic, possibly sync {topic}",); - break; - }; - match state { - // The topic is in joining state. It can happen that we receive an event before - // our join task completed. In this case, we switch the topic to live here. - TopicState::Joining { .. } => { - drop(inner); - self.on_join(topic, Ok(())); - continue; - } - TopicState::Live { - update_tasks, - event_sinks, - } => { - // Send the message to all our senders, and remove disconnected senders. - event_sinks.retain(|sink| Self::try_send(sink, &event)); - // If no senders are left, and all update tasks are finished, we can quit - // the topic. 
- if event_sinks.is_empty() - && update_tasks.iter().all(|task| task.is_finished()) - { - let quit_task = tokio::task::spawn(self.clone().quit_task(topic)); - inner.current_subscriptions.insert( - topic, - TopicState::Quitting { - waiting: vec![], - bootstrap: BTreeSet::new(), - quit_task: quit_task.into(), - }, - ); - } - } - _ => {} - } - break; - } - } - Ok(()) - } - - /// Dispatch gossip events to all subscribed streams, and handle the unlikely case of a dispatch loop failure. - async fn dispatch_task(self) { - if let Err(cause) = self.clone().dispatch_loop().await { - // dispatch task failed. Not sure what to do here. - tracing::error!("Gossip dispatch task failed: {}", cause); - let mut inner = self.inner.lock().unwrap(); - let error = RpcError::from(cause); - for (_, state) in std::mem::take(&mut inner.current_subscriptions) { - for sender in state.into_senders() { - sender.try_send(Err(error.clone())).ok(); - } - } - } - } - - /// Handle updates from the client. - async fn update_loop( - gossip: Gossip, - topic: TopicId, - mut updates: CommandStream, - ) -> anyhow::Result<()> { - while let Some(update) = Pin::new(&mut updates).next().await { - match update { - Command::Broadcast(msg) => { - gossip.broadcast(topic, msg).await?; - } - Command::BroadcastNeighbors(msg) => { - gossip.broadcast_neighbors(topic, msg).await?; - } - } - } - Ok(()) - } - - /// Handle updates from the client, and handle update loop failure. - async fn update_task(self, topic: TopicId, updates: CommandStream) { - let res = Self::update_loop(self.gossip.clone(), topic, updates).await; - let mut inner = self.inner.lock().unwrap(); - - match res { - Err(err) => { - // we got an error while sending to the topic - if let Some(TopicState::Live { event_sinks, .. }) = - inner.current_subscriptions.remove(&topic) - { - let error = RpcError::from(err); - // notify all live streams that sending to the topic failed - for send in event_sinks { - send.try_send(Err(error.clone())).ok(); - } - } - } - Ok(()) => { - // check if we should quit the topic. - if let Some(TopicState::Live { - event_sinks, - update_tasks, - }) = inner.current_subscriptions.get(&topic) - { - if event_sinks.is_empty() && update_tasks.iter().all(|t| t.is_finished()) { - let quit_task = tokio::task::spawn(self.clone().quit_task(topic)); - inner.current_subscriptions.insert( - topic, - TopicState::Quitting { - waiting: vec![], - bootstrap: BTreeSet::new(), - quit_task: quit_task.into(), - }, - ); - } - } - } - } - } - - /// Call join, then await the result. - /// - /// Basically just flattens the two stages of joining into one. - async fn join(gossip: Gossip, topic: TopicId, bootstrap: Vec) -> anyhow::Result<()> { - let join = gossip.join(topic, bootstrap).await?; - join.await?; - Ok(()) - } - - /// Join a gossip topic and handle turning waiting streams into live streams. - async fn join_task(mut self, topic: TopicId, bootstrap: BTreeSet) { - let res = Self::join(self.gossip.clone(), topic, bootstrap.into_iter().collect()).await; - self.on_join(topic, res); - } - - /// Switch the state of a topic to live. - /// - /// If the topic is already live, this is a noop. 
- fn on_join(&mut self, topic: TopicId, res: anyhow::Result<()>) { - let mut inner = self.inner.lock().unwrap(); - let Some(state) = inner.current_subscriptions.remove(&topic) else { - return; - }; - match state { - TopicState::Live { - update_tasks, - event_sinks, - } => { - inner.current_subscriptions.insert( - topic, - TopicState::Live { - update_tasks, - event_sinks, - }, - ); - } - TopicState::Joining { waiting, .. } => { - match res { - Ok(()) => { - let mut event_sinks = vec![]; - let mut update_tasks = vec![]; - for (updates, event_sink) in waiting { - // if the stream is disconnected, we don't need to keep it and start the update task - if event_sink.is_closed() { - continue; - } - event_sinks.push(event_sink); - let task = spawn_owned(self.clone().update_task(topic, updates)); - update_tasks.push(task); - } - inner.current_subscriptions.insert( - topic, - TopicState::Live { - event_sinks, - update_tasks, - }, - ); - } - Err(e) => { - // notify all waiting streams that the subscription failed - let error = RpcError::from(e); - for (_, send) in waiting { - send.try_send(Err(error.clone())).ok(); - } - } - } - } - TopicState::Quitting { .. } => {} - } - } - - /// Subscribe to a gossip topic. - pub fn subscribe_with_opts( - &self, - topic: TopicId, - options: SubscribeOptions, - updates: CommandStream, - ) -> impl Stream> + Unpin { - let mut inner = self.inner.lock().unwrap(); - let (send, recv) = async_channel::bounded(options.subscription_capacity); - match inner.current_subscriptions.entry(topic) { - Entry::Vacant(entry) => { - // There is no existing subscription, so we need to start a new one. - let waiting = vec![(updates, send)]; - let this = self.clone(); - let _join_task = - spawn_owned(this.clone().join_task(topic, options.bootstrap.clone())); - entry.insert(TopicState::Joining { - waiting, - bootstrap: options.bootstrap, - _join_task, - }); - } - Entry::Occupied(mut entry) => { - // There is already a subscription - let state = entry.get_mut(); - match state { - TopicState::Joining { - waiting, - bootstrap: peers, - .. - } => { - // We are joining, so we need to wait with creating the update task. - // - // TODO: should we merge the bootstrap nodes and try to join with all of them? - peers.extend(options.bootstrap); - waiting.push((updates, send)); - } - TopicState::Quitting { - waiting, - bootstrap: peers, - .. - } => { - // We are quitting, so we need to wait with creating the update task. - peers.extend(options.bootstrap); - waiting.push((updates, send)); - } - TopicState::Live { - event_sinks, - update_tasks, - } => { - // There is already a live subscription, so we can immediately start the update task. - let task = spawn_owned(self.clone().update_task(topic, updates)); - event_sinks.push(send); - update_tasks.push(task); - } - } - } - } - recv.boxed() - } -} - -/// tokio::spawn but returns an `AbortingJoinHandle` that owns the task. 
-fn spawn_owned<F, T>(f: F) -> AbortingJoinHandle<T>
-where
-    F: std::future::Future<Output = T> + Send + 'static,
-    T: Send + 'static,
-{
-    tokio::spawn(f).into()
-}
diff --git a/iroh-gossip/src/lib.rs b/iroh-gossip/src/lib.rs
index 70a015a787..9c6dd3f27e 100644
--- a/iroh-gossip/src/lib.rs
+++ b/iroh-gossip/src/lib.rs
@@ -2,8 +2,6 @@
 
 #![deny(missing_docs, rustdoc::broken_intra_doc_links)]
 
-#[cfg(feature = "dispatcher")]
-pub mod dispatcher;
 pub mod metrics;
 #[cfg(feature = "net")]
 pub mod net;
diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs
index 29f428ca49..f813927d47 100644
--- a/iroh-gossip/src/net.rs
+++ b/iroh-gossip/src/net.rs
@@ -1,23 +1,30 @@
 //! Networking for the `iroh-gossip` protocol
 
-use anyhow::{anyhow, Context};
-use bytes::{Bytes, BytesMut};
-use futures_lite::stream::Stream;
-use genawaiter::sync::{Co, Gen};
+use anyhow::{anyhow, Context as _, Result};
+use bytes::BytesMut;
+use futures_concurrency::{
+    future::TryJoin,
+    stream::{stream_group, StreamGroup},
+};
+use futures_lite::{stream::Stream, StreamExt};
+use futures_util::TryFutureExt;
 use iroh_metrics::inc;
 use iroh_net::{
     dialer::Dialer,
     endpoint::{get_remote_node_id, Connection},
     key::PublicKey,
-    AddrInfo, Endpoint, NodeAddr,
+    util::SharedAbortingJoinHandle,
+    AddrInfo, Endpoint, NodeAddr, NodeId,
 };
 use rand::rngs::StdRng;
 use rand_core::SeedableRng;
-use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, task::Poll, time::Instant};
-use tokio::{
-    sync::{broadcast, mpsc, oneshot},
-    task::JoinHandle,
+use std::{
+    collections::{BTreeSet, HashMap, HashSet, VecDeque},
+    pin::Pin,
+    task::{Context, Poll},
+    time::Instant,
 };
+use tokio::{sync::mpsc, task::JoinSet};
 use tracing::{debug, error_span, trace, warn, Instrument};
 
 use self::util::{read_message, write_message, Timers};
@@ -26,14 +33,20 @@ use crate::{
     proto::{self, PeerData, Scope, TopicId},
 };
 
+mod handles;
 pub mod util;
 
+pub use self::handles::{
+    Command, CommandStream, Event, GossipEvent, GossipReceiver, GossipSender, GossipTopic,
+    JoinOptions, Message,
+};
+
 /// ALPN protocol name
 pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/0";
-/// Channel capacity for all subscription broadcast channels (single)
-const SUBSCRIBE_ALL_CAP: usize = 2048;
-/// Channel capacity for topic subscription broadcast channels (one per topic)
-const SUBSCRIBE_TOPIC_CAP: usize = 2048;
+/// Default channel capacity for topic subscription event channels (one per topic)
+const TOPIC_EVENTS_DEFAULT_CAP: usize = 2048;
+/// Default channel capacity for topic subscription command channels (one per topic)
+const TOPIC_COMMANDS_DEFAULT_CAP: usize = 2048;
 /// Channel capacity for the send queue (one per connection)
 const SEND_QUEUE_CAP: usize = 64;
 /// Channel capacity for the ToActor message queue (single)
@@ -46,9 +59,9 @@ const ON_ENDPOINTS_CAP: usize = 64;
 const SOURCE_NAME: &str = "gossip";
 
 /// Events emitted from the gossip protocol
-pub type Event = proto::Event<PublicKey>;
+pub type ProtoEvent = proto::Event<PublicKey>;
 /// Commands for the gossip protocol
-pub type Command = proto::Command<PublicKey>;
+pub type ProtoCommand = proto::Command<PublicKey>;
 
 type InEvent = proto::InEvent<PublicKey>;
 type OutEvent = proto::OutEvent<PublicKey>;
@@ -77,7 +90,7 @@ type ProtoMessage = proto::Message<PublicKey>;
 pub struct Gossip {
     to_actor_tx: mpsc::Sender<ToActor>,
     on_direct_addrs_tx: mpsc::Sender<Vec<iroh_net::endpoint::DirectAddr>>,
-    _actor_handle: Arc<JoinHandle<anyhow::Result<()>>>,
+    _actor_handle: SharedAbortingJoinHandle<()>,
     max_message_size: usize,
 }
@@ -106,21 +119,18 @@ impl Gossip {
             in_event_rx,
             in_event_tx,
             on_direct_addr_rx: on_endpoints_rx,
-            conns: Default::default(),
-            conn_send_tx: Default::default(),
-            pending_sends:
Default::default(), timers: Timers::new(), - subscribers_all: None, - subscribers_topic: Default::default(), + command_rx: StreamGroup::new().keyed(), + peers: Default::default(), + topics: Default::default(), + quit_queue: Default::default(), + connection_tasks: Default::default(), }; let actor_handle = tokio::spawn( async move { if let Err(err) = actor.run().await { warn!("gossip actor closed with error: {err:?}"); - Err(err) - } else { - Ok(()) } } .instrument(error_span!("gossip", %me)), @@ -128,7 +138,7 @@ impl Gossip { Self { to_actor_tx, on_direct_addrs_tx: on_endpoints_tx, - _actor_handle: Arc::new(actor_handle), + _actor_handle: actor_handle.into(), max_message_size, } } @@ -138,113 +148,77 @@ impl Gossip { self.max_message_size } - /// Join a topic and connect to peers. - /// - /// - /// This method only asks for [`PublicKey`]s. You must supply information on how to - /// connect to these peers manually before, by calling [`Endpoint::add_node_addr`] on - /// the underlying [`Endpoint`]. - /// - /// This method returns a future that completes once the request reached the local actor. - /// This completion returns a [`JoinTopicFut`] which completes once at least peer was joined - /// successfully and the swarm thus becomes operational. - /// - /// The [`JoinTopicFut`] has no timeout, so it will remain pending indefinitely if no peer - /// could be contacted. Usually you will want to add a timeout yourself. - /// - /// TODO: Resolve to an error once all connection attempts failed. - pub async fn join( - &self, - topic: TopicId, - peers: Vec, - ) -> anyhow::Result { - let (tx, rx) = oneshot::channel(); - self.send(ToActor::Join(topic, peers, tx)).await?; - Ok(JoinTopicFut(rx)) - } - - /// Quit a topic. - /// - /// This sends a disconnect message to all active peers and then drops the state - /// for this topic. - pub async fn quit(&self, topic: TopicId) -> anyhow::Result<()> { - self.send(ToActor::Quit(topic)).await?; - Ok(()) - } - - /// Broadcast a message on a topic to all peers in the swarm. - /// - /// This does not join the topic automatically, so you have to call [`Self::join`] yourself - /// for messages to be broadcast to peers. + /// Handle an incoming [`Connection`]. /// - /// Messages with the same content are only delivered once. - pub async fn broadcast(&self, topic: TopicId, message: Bytes) -> anyhow::Result<()> { - let (tx, rx) = oneshot::channel(); - self.send(ToActor::Broadcast(topic, message, Scope::Swarm, tx)) + /// Make sure to check the ALPN protocol yourself before passing the connection. + pub async fn handle_connection(&self, conn: Connection) -> anyhow::Result<()> { + let peer_id = get_remote_node_id(&conn)?; + self.send(ToActor::HandleConnection(peer_id, ConnOrigin::Accept, conn)) .await?; - rx.await??; Ok(()) } - /// Broadcast a message on a topic to the immediate neighbors. - /// - /// This does not join the topic automatically, so you have to call [`Self::join`] yourself - /// for messages to be broadcast to peers. - pub async fn broadcast_neighbors(&self, topic: TopicId, message: Bytes) -> anyhow::Result<()> { - let (tx, rx) = oneshot::channel(); - self.send(ToActor::Broadcast(topic, message, Scope::Neighbors, tx)) - .await?; - rx.await??; - Ok(()) + /// Join a gossip topic with the default options and wait for at least one active connection. 
+    pub async fn join(&self, topic_id: TopicId, bootstrap: Vec<NodeId>) -> Result<GossipTopic> {
+        let mut sub = self.join_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap));
+        sub.joined().await?;
+        Ok(sub)
     }
 
-    /// Subscribe to messages and event notifications for a topic.
+    /// Join a gossip topic with options.
     ///
-    /// Does not join the topic automatically, so you have to call [`Self::join`] yourself
-    /// to actually receive messages.
-    pub async fn subscribe(&self, topic: TopicId) -> anyhow::Result<broadcast::Receiver<Event>> {
-        let (tx, rx) = oneshot::channel();
-        self.send(ToActor::Subscribe(topic, tx)).await?;
-        let res = rx.await.map_err(|_| anyhow!("subscribe_tx dropped"))??;
-        Ok(res)
-    }
-
-    /// Subscribe to all events published on topics that you joined.
+    /// Returns a [`GossipTopic`] instantly. To wait for at least one connection to be established,
+    /// you can await [`GossipTopic::joined`].
     ///
-    /// Note that this method takes self by value. Usually you would clone the [`Gossip`] handle.
-    /// before.
-    pub fn subscribe_all(
-        self,
-    ) -> impl Stream<Item = Result<(TopicId, Event), broadcast::error::RecvError>> {
-        Gen::new(|co| async move {
-            if let Err(err) = self.subscribe_all0(&co).await {
-                warn!("subscribe_all produced error: {err:?}");
-                co.yield_(Err(broadcast::error::RecvError::Closed)).await
-            }
-        })
+    /// Messages will be queued until the first connection is available. If the internal channel becomes full,
+    /// the oldest messages will be dropped from the channel.
+    pub fn join_with_opts(&self, topic_id: TopicId, opts: JoinOptions) -> GossipTopic {
+        let (command_tx, command_rx) = async_channel::bounded(TOPIC_COMMANDS_DEFAULT_CAP);
+        let command_rx: CommandStream = Box::pin(command_rx);
+        let event_rx = self.join_with_stream(topic_id, opts, command_rx);
+        GossipTopic::new(command_tx, Box::pin(event_rx))
     }
 
-    async fn subscribe_all0(
+    /// Join a gossip topic with options and an externally-created update stream.
+    ///
+    /// This method differs from [`Self::join_with_opts`] by letting you pass in an `updates` command stream yourself
+    /// instead of using a channel created for you.
+    ///
+    /// It returns a stream of events. If you want to wait for the topic to become active, wait for
+    /// the [`GossipEvent::Joined`] event.
+    pub fn join_with_stream(
         &self,
-        co: &Co<Result<(TopicId, Event), broadcast::error::RecvError>>,
-    ) -> anyhow::Result<()> {
-        let (tx, rx) = oneshot::channel();
-        self.send(ToActor::SubscribeAll(tx)).await?;
-        let mut res = rx.await??;
-        loop {
-            let event = res.recv().await;
-            co.yield_(event).await;
+        topic_id: TopicId,
+        options: JoinOptions,
+        updates: CommandStream,
+    ) -> impl Stream<Item = Result<Event>> + Send + 'static {
+        let (event_tx, event_rx) = async_channel::bounded(options.subscription_capacity);
+        let to_actor_tx = self.to_actor_tx.clone();
+        let channels = SubscriberChannels {
+            command_rx: updates,
+            event_tx,
+        };
+        // We spawn a task to send the subscribe action to the actor, because we want the send to
+        // succeed even if the returned stream is dropped right away without being polled, because
+        // it is legit to keep only the `updates` stream and drop the event stream. This situation
+        // is handled fine within the actor, but we have to make sure that the message reaches the
+        // actor.
+        let task = tokio::task::spawn(async move {
+            to_actor_tx
+                .send(ToActor::Join {
+                    topic_id,
+                    bootstrap: options.bootstrap,
+                    channels,
+                })
+                .await
+                .map_err(|_| anyhow!("Gossip actor dropped"))
+        });
+        async move {
+            task.await
+                .map_err(|err| anyhow!("Task for sending to gossip actor failed: {err:?}"))??;
+            Ok(event_rx)
         }
-    }
-
-    /// Handle an incoming [`Connection`].
-    ///
-    /// Make sure to check the ALPN protocol yourself before passing the connection.
-    pub async fn handle_connection(&self, conn: Connection) -> anyhow::Result<()> {
-        let peer_id = get_remote_node_id(&conn)?;
-        self.send(ToActor::ConnIncoming(peer_id, ConnOrigin::Accept, conn))
-            .await?;
-        Ok(())
+        .try_flatten_stream()
     }
 
     /// Set info on our direct addresses.
@@ -272,75 +246,24 @@ impl Gossip {
     }
 }
 
-/// Future that completes once at least one peer is joined for this topic.
-///
-/// The future has no timeout, so it will remain pending indefinitely if no peer
-/// could be contacted. Usually you will want to add a timeout yourself.
-///
-/// TODO: Optionally resolve to an error once all connection attempts failed.
-#[derive(Debug)]
-pub struct JoinTopicFut(oneshot::Receiver<anyhow::Result<TopicId>>);
-impl Future for JoinTopicFut {
-    type Output = anyhow::Result<TopicId>;
-
-    fn poll(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Self::Output> {
-        let res = Pin::new(&mut self.0).poll(cx);
-        match res {
-            Poll::Pending => Poll::Pending,
-            Poll::Ready(Err(_err)) => Poll::Ready(Err(anyhow!("gossip actor dropped"))),
-            Poll::Ready(Ok(res)) => Poll::Ready(res),
-        }
-    }
-}
-
-/// Whether a connection is initiated by us (Dial) or by the remote peer (Accept)
-#[derive(Debug)]
-enum ConnOrigin {
-    Accept,
-    Dial,
-}
-
 /// Input messages for the gossip [`Actor`].
 #[derive(derive_more::Debug)]
 enum ToActor {
     /// Handle a new QUIC connection, either from accept (external to the actor) or from connect
     /// (happens internally in the actor).
-    ConnIncoming(PublicKey, ConnOrigin, #[debug(skip)] Connection),
-    /// Join a topic with a list of peers. Reply with oneshot once at least one peer joined.
-    Join(
-        TopicId,
-        Vec<PublicKey>,
-        #[debug(skip)] oneshot::Sender<anyhow::Result<TopicId>>,
-    ),
-    /// Leave a topic, send disconnect messages and drop all state.
-    Quit(TopicId),
-    /// Broadcast a message on a topic.
-    Broadcast(
-        TopicId,
-        #[debug("<{}b>", _1.len())] Bytes,
-        Scope,
-        #[debug(skip)] oneshot::Sender<anyhow::Result<()>>,
-    ),
-    /// Subscribe to a topic. Return oneshot which resolves to a broadcast receiver for events on a
-    /// topic.
-    Subscribe(
-        TopicId,
-        #[debug(skip)] oneshot::Sender<anyhow::Result<broadcast::Receiver<Event>>>,
-    ),
-    /// Subscribe to a topic. Return oneshot which resolves to a broadcast receiver for events on a
-    /// topic.
-    SubscribeAll(
-        #[debug(skip)] oneshot::Sender<anyhow::Result<broadcast::Receiver<(TopicId, Event)>>>,
-    ),
+    HandleConnection(PublicKey, ConnOrigin, #[debug("Connection")] Connection),
+    Join {
+        topic_id: TopicId,
+        bootstrap: BTreeSet<NodeId>,
+        channels: SubscriberChannels,
+    },
 }
 
 /// Actor that sends and handles messages between the connection and main state loops
 struct Actor {
     /// Protocol state
     state: proto::State<PublicKey, StdRng>,
+    /// The endpoint through which we dial peers
     endpoint: Endpoint,
     /// Dial machine to connect to peers
     dialer: Dialer,
@@ -354,16 +277,16 @@ struct Actor {
     on_direct_addr_rx: mpsc::Receiver<Vec<iroh_net::endpoint::DirectAddr>>,
     /// Queued timers
     timers: Timers,
-    /// Currently opened quinn connections to peers
-    conns: HashMap<PublicKey, Connection>,
-    /// Channels to send outbound messages into the connection loops
-    conn_send_tx: HashMap<PublicKey, mpsc::Sender<ProtoMessage>>,
-    /// Queued messages that were to be sent before a dial completed
-    pending_sends: HashMap<PublicKey, Vec<ProtoMessage>>,
-    /// Broadcast senders for active topic subscriptions from the application
-    subscribers_topic: HashMap<TopicId, broadcast::Sender<Event>>,
-    /// Broadcast senders for wildcard subscriptions from the application
-    subscribers_all: Option<broadcast::Sender<(TopicId, Event)>>,
+    /// Map of topics to their state.
+    topics: HashMap<TopicId, TopicState>,
+    /// Map of peers to their state.
+    peers: HashMap<NodeId, PeerInfo>,
+    /// Stream of commands from topic handles.
+ command_rx: stream_group::Keyed, + /// Internal queue of topic to close because all handles were dropped. + quit_queue: VecDeque, + /// Tasks for the connection loops, to keep track of panics. + connection_tasks: JoinSet<()>, } impl Actor { @@ -386,7 +309,12 @@ impl Actor { } } }, + Some((key, (topic, command))) = self.command_rx.next(), if !self.command_rx.is_empty() => { + trace!(?i, "tick: command_rx"); + self.handle_command(topic, key, command).await?; + }, new_endpoints = self.on_direct_addr_rx.recv() => { + trace!(?i, "tick: new_endpoints"); match new_endpoints { Some(endpoints) => { inc!(Metrics, actor_tick_endpoint); @@ -411,7 +339,7 @@ impl Actor { Ok(conn) => { debug!(peer = ?peer_id, "dial successful"); inc!(Metrics, actor_tick_dialer_success); - self.handle_to_actor_msg(ToActor::ConnIncoming(peer_id, ConnOrigin::Dial, conn), Instant::now()).await.context("dialer.next -> conn -> handle_to_actor_msg")?; + self.handle_connection(peer_id, ConnOrigin::Dial, conn); } Err(err) => { warn!(peer = ?peer_id, "dial failed: {err}"); @@ -437,155 +365,262 @@ impl Actor { self.handle_in_event(InEvent::TimerExpired(timer), now).await.context("timers.drain_expired -> handle_in_event")?; } } - + Some(res) = self.connection_tasks.join_next(), if !self.connection_tasks.is_empty() => { + if let Err(err) = res { + if !err.is_cancelled() { + warn!("connection task panicked: {err:?}"); + } + } + } } } Ok(()) } - async fn handle_to_actor_msg(&mut self, msg: ToActor, now: Instant) -> anyhow::Result<()> { - trace!("handle to_actor {msg:?}"); - match msg { - ToActor::ConnIncoming(peer_id, origin, conn) => { - self.conns.insert(peer_id, conn.clone()); - self.dialer.abort_dial(peer_id); - let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP); - self.conn_send_tx.insert(peer_id, send_tx.clone()); - - let max_message_size = self.state.max_message_size(); - - // Spawn a task for this connection - let in_event_tx = self.in_event_tx.clone(); - tokio::spawn( - async move { - debug!("connection established"); - match connection_loop( - peer_id, - conn, - origin, - send_rx, - &in_event_tx, - max_message_size, - ) - .await - { - Ok(()) => { - debug!("connection closed without error") - } - Err(err) => { - debug!("connection closed with error {err:?}") - } - } - in_event_tx - .send(InEvent::PeerDisconnected(peer_id)) - .await - .ok(); - } - .instrument(error_span!("gossip_conn", peer = %peer_id.fmt_short())), - ); - - // Forward queued pending sends - if let Some(send_queue) = self.pending_sends.remove(&peer_id) { - for msg in send_queue { - send_tx.send(msg).await?; + async fn handle_command( + &mut self, + topic: TopicId, + key: stream_group::Key, + command: Option, + ) -> anyhow::Result<()> { + debug!(?topic, ?key, ?command, "handle command"); + let Some(state) = self.topics.get_mut(&topic) else { + // TODO: unreachable? + warn!("received command for unknown topic"); + return Ok(()); + }; + let TopicState { + command_rx_keys, + event_senders, + .. 
+        } = state;
+        match command {
+            Some(command) => {
+                let command = match command {
+                    Command::Broadcast(message) => ProtoCommand::Broadcast(message, Scope::Swarm),
+                    Command::BroadcastNeighbors(message) => {
+                        ProtoCommand::Broadcast(message, Scope::Neighbors)
                     }
+                    Command::JoinPeers(peers) => ProtoCommand::Join(peers),
+                };
+                self.handle_in_event(proto::InEvent::Command(topic, command), Instant::now())
+                    .await?;
+            }
+            None => {
+                command_rx_keys.remove(&key);
+                if command_rx_keys.is_empty() && event_senders.is_empty() {
+                    self.quit_queue.push_back(topic);
+                    self.process_quit_queue().await?;
                 }
             }
-            ToActor::Join(topic_id, peers, reply) => {
-                self.handle_in_event(InEvent::Command(topic_id, Command::Join(peers)), now)
-                    .await?;
-                if self.state.has_active_peers(&topic_id) {
-                    // If the active_view contains at least one peer, reply now
-                    reply.send(Ok(topic_id)).ok();
-                } else {
-                    // Otherwise, wait for any peer to come up as neighbor.
-                    let sub = self.subscribe(topic_id);
-                    tokio::spawn(async move {
-                        let res = wait_for_neighbor_up(sub).await;
-                        let res = res.map(|_| topic_id);
-                        reply.send(res).ok();
-                    });
+        }
+        Ok(())
+    }
+
+    fn handle_connection(&mut self, peer_id: NodeId, origin: ConnOrigin, conn: Connection) {
+        // Check that we only keep one connection per peer per direction.
+        if let Some(peer_info) = self.peers.get(&peer_id) {
+            if matches!(origin, ConnOrigin::Dial) && peer_info.conn_dialed.is_some() {
+                warn!(?peer_id, ?origin, "ignoring connection: already dialed");
+                return;
+            }
+            if matches!(origin, ConnOrigin::Accept) && peer_info.conn_accepted.is_some() {
+                warn!(?peer_id, ?origin, "ignoring connection: already accepted");
+                return;
+            }
+        }
+
+        let mut peer_info = self.peers.remove(&peer_id).unwrap_or_default();
+
+        // Store the connection so that we can terminate it when the peer is removed.
+        match origin {
+            ConnOrigin::Dial => {
+                peer_info.conn_dialed = Some(conn.clone());
+            }
+            ConnOrigin::Accept => {
+                peer_info.conn_accepted = Some(conn.clone());
+            }
+        }
+
+        // Extract the queue of pending messages.
+        let queue = match &mut peer_info.state {
+            PeerState::Pending { queue } => std::mem::take(queue),
+            PeerState::Active { .. } => Default::default(),
+        };
+
+        let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
+        let max_message_size = self.state.max_message_size();
+        let in_event_tx = self.in_event_tx.clone();
+
+        // Spawn a task for this connection
+        self.connection_tasks.spawn(
+            async move {
+                match connection_loop(
+                    peer_id,
+                    conn,
+                    origin,
+                    send_rx,
+                    &in_event_tx,
+                    max_message_size,
+                    queue,
+                )
+                .await
+                {
+                    Ok(()) => debug!("connection closed without error"),
+                    Err(err) => warn!("connection closed: {err:?}"),
                 }
+                in_event_tx
+                    .send(InEvent::PeerDisconnected(peer_id))
+                    .await
+                    .ok();
             }
-            ToActor::Quit(topic_id) => {
-                self.handle_in_event(InEvent::Command(topic_id, Command::Quit), now)
-                    .await?;
-                self.subscribers_topic.remove(&topic_id);
+            .instrument(error_span!("gossip_conn", peer = %peer_id.fmt_short())),
+        );
+
+        peer_info.state = match peer_info.state {
+            PeerState::Pending { .. 
} => PeerState::Active { send_tx }, + PeerState::Active { send_tx } => PeerState::Active { send_tx }, + }; + + self.peers.insert(peer_id, peer_info); + } + + async fn handle_to_actor_msg(&mut self, msg: ToActor, now: Instant) -> anyhow::Result<()> { + trace!("handle to_actor {msg:?}"); + match msg { + ToActor::HandleConnection(peer_id, origin, conn) => { + self.handle_connection(peer_id, origin, conn) } - ToActor::Broadcast(topic_id, message, scope, reply) => { + ToActor::Join { + topic_id, + bootstrap, + channels, + } => { + let state = self.topics.entry(topic_id).or_default(); + let TopicState { + neighbors, + event_senders, + command_rx_keys, + } = state; + if !neighbors.is_empty() { + let neighbors = neighbors.iter().copied().collect(); + channels + .event_tx + .try_send(Ok(Event::Gossip(GossipEvent::Joined(neighbors)))) + .ok(); + } + + event_senders.push(channels.event_tx); + let command_rx = TopicCommandStream::new(topic_id, channels.command_rx); + let key = self.command_rx.insert(command_rx); + command_rx_keys.insert(key); + self.handle_in_event( - InEvent::Command(topic_id, Command::Broadcast(message, scope)), + InEvent::Command( + topic_id, + ProtoCommand::Join(bootstrap.into_iter().collect()), + ), now, ) .await?; - reply.send(Ok(())).ok(); } - ToActor::Subscribe(topic_id, reply) => { - let rx = self.subscribe(topic_id); - reply.send(Ok(rx)).ok(); - } - ToActor::SubscribeAll(reply) => { - let rx = self.subscribe_all(); - reply.send(Ok(rx)).ok(); - } - }; + } Ok(()) } async fn handle_in_event(&mut self, event: InEvent, now: Instant) -> anyhow::Result<()> { + self.handle_in_event_inner(event, now).await?; + self.process_quit_queue().await?; + Ok(()) + } + + async fn process_quit_queue(&mut self) -> anyhow::Result<()> { + while let Some(topic_id) = self.quit_queue.pop_front() { + self.handle_in_event_inner( + InEvent::Command(topic_id, ProtoCommand::Quit), + Instant::now(), + ) + .await?; + self.topics.remove(&topic_id); + } + Ok(()) + } + + async fn handle_in_event_inner(&mut self, event: InEvent, now: Instant) -> anyhow::Result<()> { if matches!(event, InEvent::TimerExpired(_)) { - trace!("handle in_event {event:?}"); + trace!(?event, "handle in_event"); } else { - debug!("handle in_event {event:?}"); + debug!(?event, "handle in_event"); }; if let InEvent::PeerDisconnected(peer) = &event { - self.conn_send_tx.remove(peer); + self.peers.remove(peer); } let out = self.state.handle(event, now); for event in out { if matches!(event, OutEvent::ScheduleTimer(_, _)) { - trace!("handle out_event {event:?}"); + trace!(?event, "handle out_event"); } else { - debug!("handle out_event {event:?}"); + debug!(?event, "handle out_event"); }; match event { OutEvent::SendMessage(peer_id, message) => { - if let Some(send) = self.conn_send_tx.get(&peer_id) { - if let Err(_err) = send.send(message).await { - warn!("conn receiver for {peer_id:?} dropped"); - self.conn_send_tx.remove(&peer_id); + let info = self.peers.entry(peer_id).or_default(); + match &mut info.state { + PeerState::Active { send_tx } => { + if let Err(_err) = send_tx.send(message).await { + // Removing the peer is handled by the in_event PeerDisconnected sent + // at the end of the connection task. 
+ warn!("connection loop for {peer_id:?} dropped"); + } + } + PeerState::Pending { queue } => { + if queue.is_empty() { + self.dialer.queue_dial(peer_id, GOSSIP_ALPN); + } + queue.push(message); } - } else { - debug!(peer = ?peer_id, "dial"); - self.dialer.queue_dial(peer_id, GOSSIP_ALPN); - // TODO: Enforce max length - self.pending_sends.entry(peer_id).or_default().push(message); } } OutEvent::EmitEvent(topic_id, event) => { - if let Some(sender) = self.subscribers_all.as_mut() { - if let Err(_event) = sender.send((topic_id, event.clone())) { - self.subscribers_all = None; - } - } - if let Some(sender) = self.subscribers_topic.get(&topic_id) { - // Only error case is that all [broadcast::Receivers] have been dropped. - // If so, remove the sender as well. - if let Err(_event) = sender.send(event) { - self.subscribers_topic.remove(&topic_id); + let Some(state) = self.topics.get_mut(&topic_id) else { + // TODO: unreachable? + warn!(?topic_id, "gossip state emitted event for unknown topic"); + continue; + }; + let TopicState { + neighbors, + event_senders, + command_rx_keys, + } = state; + let event = if let ProtoEvent::NeighborUp(neighbor) = event { + let was_empty = neighbors.is_empty(); + neighbors.insert(neighbor); + if was_empty { + GossipEvent::Joined(vec![neighbor]) + } else { + GossipEvent::NeighborUp(neighbor) } + } else { + event.into() + }; + event_senders.send(&event); + if event_senders.is_empty() && command_rx_keys.is_empty() { + self.quit_queue.push_back(topic_id); } } OutEvent::ScheduleTimer(delay, timer) => { self.timers.insert(now + delay, timer); } - OutEvent::DisconnectPeer(peer) => { - if let Some(conn) = self.conns.remove(&peer) { - conn.close(0u8.into(), b"close from disconnect"); + OutEvent::DisconnectPeer(peer_id) => { + if let Some(peer) = self.peers.remove(&peer_id) { + if let Some(conn) = peer.conn_dialed { + conn.close(0u8.into(), b"close from disconnect"); + } + if let Some(conn) = peer.conn_accepted { + conn.close(0u8.into(), b"close from disconnect"); + } + drop(peer.state); } - self.conn_send_tx.remove(&peer); - self.pending_sends.remove(&peer); - self.dialer.abort_dial(peer); } OutEvent::PeerData(node_id, data) => match decode_peer_data(&data) { Err(err) => warn!("Failed to decode {data:?} from {node_id}: {err}"), @@ -604,40 +639,47 @@ impl Actor { } Ok(()) } +} - fn subscribe_all(&mut self) -> broadcast::Receiver<(TopicId, Event)> { - if let Some(tx) = self.subscribers_all.as_mut() { - tx.subscribe() - } else { - let (tx, rx) = broadcast::channel(SUBSCRIBE_ALL_CAP); - self.subscribers_all = Some(tx); - rx - } - } +#[derive(Debug, Default)] +struct PeerInfo { + state: PeerState, + conn_dialed: Option, + conn_accepted: Option, +} - fn subscribe(&mut self, topic_id: TopicId) -> broadcast::Receiver { - if let Some(tx) = self.subscribers_topic.get(&topic_id) { - tx.subscribe() - } else { - let (tx, rx) = broadcast::channel(SUBSCRIBE_TOPIC_CAP); - self.subscribers_topic.insert(topic_id, tx); - rx - } - } +#[derive(Debug)] +enum PeerState { + Pending { queue: Vec }, + Active { send_tx: mpsc::Sender }, } -async fn wait_for_neighbor_up(mut sub: broadcast::Receiver) -> anyhow::Result<()> { - loop { - match sub.recv().await { - Ok(Event::NeighborUp(_neighbor)) => break Ok(()), - Ok(_) | Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => { - break Err(anyhow!("Failed to join swarm: channel closed")) - } - } +impl Default for PeerState { + fn default() -> Self { + PeerState::Pending { queue: Vec::new() } } } +#[derive(Debug, 
Default)]
+struct TopicState {
+    neighbors: BTreeSet<NodeId>,
+    event_senders: EventSenders,
+    command_rx_keys: HashSet<stream_group::Key>,
+}
+
+/// Whether a connection is initiated by us (Dial) or by the remote peer (Accept)
+#[derive(Debug, Clone, Copy)]
+enum ConnOrigin {
+    Accept,
+    Dial,
+}
+#[derive(derive_more::Debug)]
+struct SubscriberChannels {
+    event_tx: async_channel::Sender<Result<Event>>,
+    #[debug("CommandStream")]
+    command_rx: CommandStream,
+}
+
 async fn connection_loop(
     from: PublicKey,
     conn: Connection,
@@ -645,14 +687,20 @@ async fn connection_loop(
     mut send_rx: mpsc::Receiver<ProtoMessage>,
     in_event_tx: &mpsc::Sender<InEvent>,
     max_message_size: usize,
+    queue: Vec<ProtoMessage>,
 ) -> anyhow::Result<()> {
     let (mut send, mut recv) = match origin {
         ConnOrigin::Accept => conn.accept_bi().await?,
         ConnOrigin::Dial => conn.open_bi().await?,
     };
+    debug!("connection established");
     let mut send_buf = BytesMut::new();
     let mut recv_buf = BytesMut::new();
+
+    let send_loop = async {
+        for msg in queue {
+            write_message(&mut send, &mut send_buf, &msg, max_message_size).await?
+        }
         while let Some(msg) = send_rx.recv().await {
             write_message(&mut send, &mut send_buf, &msg, max_message_size).await?
         }
@@ -670,7 +718,7 @@ async fn connection_loop(
         Ok::<_, anyhow::Error>(())
     };

-    tokio::try_join!(send_loop, recv_loop)?;
+    (send_loop, recv_loop).try_join().await?;
     Ok(())
 }
@@ -690,10 +738,92 @@ fn decode_peer_data(peer_data: &PeerData) -> anyhow::Result<AddrInfo> {
     Ok(info)
 }

+#[derive(Debug, Default)]
+struct EventSenders {
+    senders: Vec<(async_channel::Sender<Result<Event>>, bool)>,
+}
+
+impl EventSenders {
+    fn is_empty(&self) -> bool {
+        self.senders.is_empty()
+    }
+
+    fn push(&mut self, sender: async_channel::Sender<Result<Event>>) {
+        self.senders.push((sender, false));
+    }
+
+    /// Send an event to all subscribers.
+    ///
+    /// This will not wait for space in a full sink; if a sink is almost full, a `Lagged` response is sent instead.
+    fn send(&mut self, event: &GossipEvent) {
+        self.senders.retain_mut(|(send, lagged)| {
+            // If the stream is disconnected, we don't need to send to it.
+            if send.is_closed() {
+                return false;
+            }
+
+            // Check if the send buffer is almost full, and send a lagged response if it is.
+            let cap = send.capacity().expect("we only use bounded channels");
+            let event = if send.len() >= cap - 1 {
+                if *lagged {
+                    return true;
+                }
+                *lagged = true;
+                Event::Lagged
+            } else {
+                *lagged = false;
+                Event::Gossip(event.clone())
+            };
+            match send.try_send(Ok(event)) {
+                Ok(()) => true,
+                Err(async_channel::TrySendError::Full(_)) => true,
+                Err(async_channel::TrySendError::Closed(_)) => false,
+            }
+        })
+    }
+}
+
+#[derive(derive_more::Debug)]
+struct TopicCommandStream {
+    topic_id: TopicId,
+    #[debug("CommandStream")]
+    stream: CommandStream,
+    closed: bool,
+}
+
+impl TopicCommandStream {
+    fn new(topic_id: TopicId, stream: CommandStream) -> Self {
+        Self {
+            topic_id,
+            stream,
+            closed: false,
+        }
+    }
+}
+
+impl Stream for TopicCommandStream {
+    type Item = (TopicId, Option<Command>);
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if self.closed {
+            return Poll::Ready(None);
+        }
+        match Pin::new(&mut self.stream).poll_next(cx) {
+            Poll::Ready(Some(item)) => Poll::Ready(Some((self.topic_id, Some(item)))),
+            Poll::Ready(None) => {
+                self.closed = true;
+                Poll::Ready(Some((self.topic_id, None)))
+            }
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::time::Duration;

+    use bytes::Bytes;
+    use futures_concurrency::future::TryJoin;
     use iroh_net::key::SecretKey;
     use iroh_net::relay::{RelayMap, RelayMode};
     use tokio::spawn;
@@ -764,6 +894,7 @@ mod test {
         debug!("peer2 {:?}", ep2.node_id());
         debug!("peer3 {:?}", ep3.node_id());
         let pi1 = ep1.node_id();
+        let pi2 = ep2.node_id();

         let cancel = CancellationToken::new();
         let tasks = [
@@ -774,31 +905,33 @@ mod test {
         debug!("----- adding peers  ----- ");
         let topic: TopicId = blake3::hash(b"foobar").into();
-        // share info that pi1 is on the same relay_node
-        let addr1 = NodeAddr::new(pi1).with_relay_url(relay_url);
+
+        let addr1 = NodeAddr::new(pi1).with_relay_url(relay_url.clone());
+        let addr2 = NodeAddr::new(pi2).with_relay_url(relay_url);
         ep2.add_node_addr(addr1.clone()).unwrap();
-        ep3.add_node_addr(addr1).unwrap();
+        ep3.add_node_addr(addr2).unwrap();

         debug!("----- joining  ----- ");
         // join the topics and wait for the connection to succeed
-        go1.join(topic, vec![]).await.unwrap();
-        go2.join(topic, vec![pi1]).await.unwrap().await.unwrap();
-        go3.join(topic, vec![pi1]).await.unwrap().await.unwrap();
+        let [sub1, mut sub2, mut sub3] = [
+            go1.join(topic, vec![]),
+            go2.join(topic, vec![pi1]),
+            go3.join(topic, vec![pi2]),
+        ]
+        .try_join()
+        .await
+        .unwrap();

-        let len = 2;
+        let (sink1, _stream1) = sub1.split();

-        // subscribe nodes 2 and 3 to the topic
-        let mut stream2 = go2.subscribe(topic).await.unwrap();
-        let mut stream3 = go3.subscribe(topic).await.unwrap();
+        let len = 2;

         // publish messages on node1
         let pub1 = spawn(async move {
             for i in 0..len {
                 let message = format!("hi{}", i);
                 info!("go1 broadcast: {message:?}");
-                go1.broadcast(topic, message.into_bytes().into())
-                    .await
-                    .unwrap();
+                sink1.broadcast(message.into_bytes().into()).await.unwrap();
                 tokio::time::sleep(Duration::from_micros(1)).await;
             }
         });
@@ -807,9 +940,9 @@ mod test {
         let sub2 = spawn(async move {
             let mut recv = vec![];
             loop {
-                let ev = stream2.recv().await.unwrap();
+                let ev = sub2.next().await.unwrap().unwrap();
                 info!("go2 event: {ev:?}");
-                if let Event::Received(msg) = ev {
+                if let Event::Gossip(GossipEvent::Received(msg)) = ev {
                     recv.push(msg.content);
                 }
                 if recv.len() == len {
@@ -822,9 +955,9 @@ mod test {
         let sub3 = spawn(async move {
            let mut recv = vec![];
            loop {
-                let ev = 
stream3.recv().await.unwrap();
+                let ev = sub3.next().await.unwrap().unwrap();
                 info!("go3 event: {ev:?}");
-                if let Event::Received(msg) = ev {
+                if let Event::Gossip(GossipEvent::Received(msg)) = ev {
                     recv.push(msg.content);
                 }
                 if recv.len() == len {
diff --git a/iroh-gossip/src/net/handles.rs b/iroh-gossip/src/net/handles.rs
new file mode 100644
index 0000000000..c082192224
--- /dev/null
+++ b/iroh-gossip/src/net/handles.rs
@@ -0,0 +1,254 @@
+//! Topic handles for sending and receiving on a gossip topic.
+//!
+//! These are returned from [`super::Gossip`].
+
+use std::{
+    collections::{BTreeSet, HashSet},
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use anyhow::{anyhow, Result};
+use bytes::Bytes;
+use futures_lite::{Stream, StreamExt};
+use iroh_net::NodeId;
+use serde::{Deserialize, Serialize};
+
+use crate::{net::TOPIC_EVENTS_DEFAULT_CAP, proto::DeliveryScope};
+
+/// Sender for a gossip topic.
+#[derive(Debug)]
+pub struct GossipSender(async_channel::Sender<Command>);
+
+impl GossipSender {
+    pub(crate) fn new(sender: async_channel::Sender<Command>) -> Self {
+        Self(sender)
+    }
+
+    /// Broadcast a message to all nodes.
+    pub async fn broadcast(&self, message: Bytes) -> anyhow::Result<()> {
+        self.0
+            .send(Command::Broadcast(message))
+            .await
+            .map_err(|_| anyhow!("Gossip actor dropped"))
+    }
+
+    /// Broadcast a message to our direct neighbors.
+    pub async fn broadcast_neighbors(&self, message: Bytes) -> anyhow::Result<()> {
+        self.0
+            .send(Command::BroadcastNeighbors(message))
+            .await
+            .map_err(|_| anyhow!("Gossip actor dropped"))
+    }
+
+    /// Join a set of peers.
+    pub async fn join_peers(&self, peers: Vec<NodeId>) -> anyhow::Result<()> {
+        self.0
+            .send(Command::JoinPeers(peers))
+            .await
+            .map_err(|_| anyhow!("Gossip actor dropped"))
+    }
+}
+
+type EventStream = Pin<Box<dyn Stream<Item = Result<Event>> + Send + 'static>>;
+
+/// Subscribed gossip topic.
+///
+/// This handle is a [`Stream`] of [`Event`]s from the topic, and can be used to send messages.
+///
+/// It may be split into sender and receiver parts with [`Self::split`].
+#[derive(Debug)]
+pub struct GossipTopic {
+    sender: GossipSender,
+    receiver: GossipReceiver,
+}
+
+impl GossipTopic {
+    pub(crate) fn new(sender: async_channel::Sender<Command>, receiver: EventStream) -> Self {
+        Self {
+            sender: GossipSender::new(sender),
+            receiver: GossipReceiver::new(Box::pin(receiver)),
+        }
+    }
+
+    /// Splits `self` into [`GossipSender`] and [`GossipReceiver`] parts.
+    pub fn split(self) -> (GossipSender, GossipReceiver) {
+        (self.sender, self.receiver)
+    }
+
+    /// Sends a message to all peers.
+    pub async fn broadcast(&self, message: Bytes) -> anyhow::Result<()> {
+        self.sender.broadcast(message).await
+    }
+
+    /// Sends a message to our direct neighbors in the swarm.
+    pub async fn broadcast_neighbors(&self, message: Bytes) -> anyhow::Result<()> {
+        self.sender.broadcast_neighbors(message).await
+    }
+
+    /// Waits until we are connected to at least one node.
+    pub async fn joined(&mut self) -> Result<()> {
+        self.receiver.joined().await
+    }
+
+    /// Returns true if we are connected to at least one node.
+    pub fn is_joined(&self) -> bool {
+        self.receiver.is_joined()
+    }
+}
+
+impl Stream for GossipTopic {
+    type Item = Result<Event>;
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Pin::new(&mut self.receiver).poll_next(cx)
+    }
+}
+
+/// Receiver for gossip events on a topic.
+///
+/// This is a [`Stream`] of [`Event`]s emitted from the topic.
+#[derive(derive_more::Debug)]
+pub struct GossipReceiver {
+    #[debug("EventStream")]
+    stream: EventStream,
+    neighbors: HashSet<NodeId>,
+}
+
+impl GossipReceiver {
+    pub(crate) fn new(events_rx: EventStream) -> Self {
+        Self {
+            stream: events_rx,
+            neighbors: Default::default(),
+        }
+    }
+
+    /// Lists our current direct neighbors.
+    pub fn neighbors(&self) -> impl Iterator<Item = NodeId> + '_ {
+        self.neighbors.iter().copied()
+    }
+
+    /// Waits until we are connected to at least one node.
+    pub async fn joined(&mut self) -> Result<()> {
+        while self.neighbors.is_empty() {
+            let _ = self.try_next().await?;
+        }
+        Ok(())
+    }
+
+    /// Returns true if we are connected to at least one node.
+    pub fn is_joined(&self) -> bool {
+        !self.neighbors.is_empty()
+    }
+}
+
+impl Stream for GossipReceiver {
+    type Item = Result<Event>;
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let item = std::task::ready!(Pin::new(&mut self.stream).poll_next(cx));
+        if let Some(Ok(item)) = &item {
+            match item {
+                Event::Gossip(GossipEvent::Joined(neighbors)) => {
+                    self.neighbors.extend(neighbors.iter().copied());
+                }
+                Event::Gossip(GossipEvent::NeighborUp(node_id)) => {
+                    self.neighbors.insert(*node_id);
+                }
+                Event::Gossip(GossipEvent::NeighborDown(node_id)) => {
+                    self.neighbors.remove(node_id);
+                }
+                _ => {}
+            }
+        }
+        Poll::Ready(item)
+    }
+}
+
+/// Update from a subscribed gossip topic.
+#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
+pub enum Event {
+    /// A message was received.
+    Gossip(GossipEvent),
+    /// We missed some messages.
+    Lagged,
+}
+
+/// Gossip event.
+/// An event to be emitted to the application for a particular topic.
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
+pub enum GossipEvent {
+    /// We joined the topic with at least one peer.
+    Joined(Vec<NodeId>),
+    /// We have a new, direct neighbor in the swarm membership layer for this topic.
+    NeighborUp(NodeId),
+    /// We dropped a direct neighbor in the swarm membership layer for this topic.
+    NeighborDown(NodeId),
+    /// A gossip message was received for this topic.
+    Received(Message),
+}
+
+impl From<crate::proto::Event<NodeId>> for GossipEvent {
+    fn from(event: crate::proto::Event<NodeId>) -> Self {
+        match event {
+            crate::proto::Event::NeighborUp(node_id) => Self::NeighborUp(node_id),
+            crate::proto::Event::NeighborDown(node_id) => Self::NeighborDown(node_id),
+            crate::proto::Event::Received(message) => Self::Received(Message {
+                content: message.content,
+                scope: message.scope,
+                delivered_from: message.delivered_from,
+            }),
+        }
+    }
+}
+
+/// A gossip message.
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, derive_more::Debug, Serialize, Deserialize)]
+pub struct Message {
+    /// The content of the message.
+    #[debug("Bytes({})", self.content.len())]
+    pub content: Bytes,
+    /// The scope of the message.
+    /// This tells us if the message is from a direct neighbor or actual gossip.
+    pub scope: DeliveryScope,
+    /// The node that delivered the message. This is not the same as the original author.
+    pub delivered_from: NodeId,
+}
+
+/// A stream of commands for a gossip subscription.
+pub type CommandStream = Pin<Box<dyn Stream<Item = Command> + Send + Sync + 'static>>;
+
+/// A command to send to a gossip topic.
+#[derive(Serialize, Deserialize, derive_more::Debug)]
+pub enum Command {
+    /// Broadcast a message to all nodes in the swarm.
+    Broadcast(#[debug("Bytes({})", _0.len())] Bytes),
+    /// Broadcast a message to all direct neighbors.
+    BroadcastNeighbors(#[debug("Bytes({})", _0.len())] Bytes),
+    /// Connect to a set of peers.
+    JoinPeers(Vec<NodeId>),
+}
+
+/// Options for joining a gossip topic.
+#[derive(Serialize, Deserialize, Debug)]
+pub struct JoinOptions {
+    /// The initial bootstrap nodes
+    pub bootstrap: BTreeSet<NodeId>,
+    /// The maximum number of messages that can be buffered in a subscription.
+    ///
+    /// If this limit is reached, the subscriber will receive a `Lagged` response,
+    /// the message will be dropped, and the subscriber will be closed.
+    ///
+    /// This is to prevent a single slow subscriber from blocking the dispatch loop.
+    /// If a subscriber is lagging, it should be closed and re-opened.
+    pub subscription_capacity: usize,
+}
+
+impl JoinOptions {
+    /// Creates [`JoinOptions`] with the provided bootstrap nodes and the default subscription
+    /// capacity.
+    pub fn with_bootstrap(nodes: impl IntoIterator<Item = NodeId>) -> Self {
+        Self {
+            bootstrap: nodes.into_iter().collect(),
+            subscription_capacity: TOPIC_EVENTS_DEFAULT_CAP,
+        }
+    }
+}
diff --git a/iroh-gossip/src/proto/state.rs b/iroh-gossip/src/proto/state.rs
index a841342014..b8561aeeef 100644
--- a/iroh-gossip/src/proto/state.rs
+++ b/iroh-gossip/src/proto/state.rs
@@ -216,11 +216,6 @@ impl State {
         match event {
             InEventMapped::TopicEvent(topic, event) => {
-                // when receiving messages, update our conn map to take note that this topic state may want
-                // to keep this connection
-                if let topic::InEvent::RecvMessage(from, _message) = &event {
-                    self.peer_topics.entry(*from).or_default().insert(topic);
-                }
                 // when receiving a join command, initialize state if it doesn't exist
                 if matches!(&event, topic::InEvent::Command(Command::Join(_peers))) {
                     if let hash_map::Entry::Vacant(e) = self.states.entry(topic) {
@@ -239,6 +234,11 @@ impl State {
                 // pass the event to the state handler
                 if let Some(state) = self.states.get_mut(&topic) {
+                    // when receiving messages, update our conn map to take note that this topic state may want
+                    // to keep this connection
+                    if let topic::InEvent::RecvMessage(from, _message) = &event {
+                        self.peer_topics.entry(*from).or_default().insert(topic);
+                    }
                     let out = state.handle(event, now);
                     for event in out {
                         handle_out_event(topic, event, &mut self.peer_topics, &mut self.outbox);
diff --git a/iroh/src/client/gossip.rs b/iroh/src/client/gossip.rs
index 9f24736365..dce8fd611c 100644
--- a/iroh/src/client/gossip.rs
+++ b/iroh/src/client/gossip.rs
@@ -55,14 +55,14 @@ impl Client {
     ///
     /// Returns a sink to send updates to the topic and a stream of responses.
     ///
-    /// Updates are either [Broadcast](iroh_gossip::dispatcher::Command::Broadcast)
-    /// or [BroadcastNeighbors](iroh_gossip::dispatcher::Command::BroadcastNeighbors).
+    /// Updates are either [Broadcast](iroh_gossip::net::Command::Broadcast)
+    /// or [BroadcastNeighbors](iroh_gossip::net::Command::BroadcastNeighbors).
     ///
     /// Broadcasts are gossiped to the entire swarm, while BroadcastNeighbors are sent to
     /// just the immediate neighbors of the node.
     ///
-    /// Responses are either [Gossip](iroh_gossip::dispatcher::Event::Gossip) or
-    /// [Lagged](iroh_gossip::dispatcher::Event::Lagged).
+    /// Responses are either [Gossip](iroh_gossip::net::Event::Gossip) or
+    /// [Lagged](iroh_gossip::net::Event::Lagged).
     ///
     /// Gossip events contain the actual message content, as well as information about the
     /// immediate neighbors of the node.
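
The handle types introduced in `iroh-gossip/src/net/handles.rs` are the new public surface for all of the above. A minimal usage sketch, assuming a `Gossip` instance that has already been constructed and registered on an endpoint for `GOSSIP_ALPN`, and whose endpoint already knows an address for `bootstrap`; the function name `run_topic` is hypothetical and not part of the diff:

    use bytes::Bytes;
    use futures_lite::StreamExt;
    use iroh_gossip::net::{Event, Gossip, GossipEvent};
    use iroh_gossip::proto::TopicId;
    use iroh_net::NodeId;

    async fn run_topic(gossip: Gossip, topic: TopicId, bootstrap: NodeId) -> anyhow::Result<()> {
        // `join` resolves to a `GossipTopic`; `joined` waits for the first neighbor.
        let mut topic = gossip.join(topic, vec![bootstrap]).await?;
        topic.joined().await?;

        // Split into sender and receiver halves so they can live in separate tasks.
        let (sender, mut receiver) = topic.split();
        sender.broadcast(Bytes::from_static(b"hello")).await?;

        while let Some(event) = receiver.try_next().await? {
            match event {
                Event::Gossip(GossipEvent::Received(msg)) => println!("got: {:?}", msg.content),
                Event::Gossip(other) => println!("membership: {other:?}"),
                Event::Lagged => anyhow::bail!("subscriber lagged, resubscribe"),
            }
        }
        Ok(())
    }

Compared to the removed `JoinTopicFut`, waiting for the swarm is now an async method on the handle, and the actor quits a topic automatically once both halves are dropped.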
diff --git a/iroh/src/metrics.rs b/iroh/src/metrics.rs
index fb8e1cc93d..646fddf26d 100644
--- a/iroh/src/metrics.rs
+++ b/iroh/src/metrics.rs
@@ -74,7 +74,6 @@ pub fn try_init_metrics_collection() -> std::io::Result<()> {
         metrics.insert(iroh_net::metrics::MagicsockMetrics::new(reg));
         metrics.insert(iroh_net::metrics::NetcheckMetrics::new(reg));
         metrics.insert(iroh_net::metrics::PortmapMetrics::new(reg));
-        metrics.insert(iroh_net::metrics::RelayMetrics::new(reg));
     })
 }

@@ -101,10 +100,6 @@ pub fn get_metrics() -> anyhow::Result> {
         core.get_collector::<iroh_net::metrics::PortmapMetrics>(),
         &mut map,
     );
-    collect(
-        core.get_collector::<iroh_net::metrics::RelayMetrics>(),
-        &mut map,
-    );
     Ok(map)
 }
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index 182f1dcc50..1ebd808132 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -46,7 +46,6 @@ use iroh_base::key::PublicKey;
 use iroh_blobs::store::{GcMarkEvent, GcSweepEvent, Store as BaoStore};
 use iroh_blobs::util::local_pool::{LocalPool, LocalPoolHandle};
 use iroh_blobs::{downloader::Downloader, protocol::Closed};
-use iroh_gossip::dispatcher::GossipDispatcher;
 use iroh_gossip::net::Gossip;
 use iroh_net::key::SecretKey;
 use iroh_net::{
@@ -116,7 +115,6 @@ struct NodeInner {
     cancel_token: CancellationToken,
     client: crate::client::Iroh,
     downloader: Downloader,
-    gossip_dispatcher: GossipDispatcher,
     local_pool_handle: LocalPoolHandle,
 }
diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs
index a208cdaa94..456bf44d5a 100644
--- a/iroh/src/node/builder.rs
+++ b/iroh/src/node/builder.rs
@@ -15,10 +15,7 @@ use iroh_blobs::{
 };
 use iroh_docs::engine::DefaultAuthorStorage;
 use iroh_docs::net::DOCS_ALPN;
-use iroh_gossip::{
-    dispatcher::GossipDispatcher,
-    net::{Gossip, GOSSIP_ALPN},
-};
+use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
 #[cfg(not(test))]
 use iroh_net::discovery::local_swarm_discovery::LocalSwarmDiscovery;
 use iroh_net::{
@@ -557,7 +554,6 @@ where
             downloader.clone(),
         )
         .await?;
-        let gossip_dispatcher = GossipDispatcher::new(gossip.clone());

         // Initialize the internal RPC connection.
 let (internal_rpc, controller) = quic_rpc::transport::flume::connection::<RpcService>(32);
@@ -577,7 +573,6 @@ where
             cancel_token: CancellationToken::new(),
             downloader,
             gossip,
-            gossip_dispatcher,
             local_pool_handle: lp.handle().clone(),
         });
diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs
index 467e91d402..fa7c3398bc 100644
--- a/iroh/src/node/rpc.rs
+++ b/iroh/src/node/rpc.rs
@@ -209,14 +209,15 @@ impl Handler {
         match msg {
             Subscribe(msg) => {
                 chan.bidi_streaming(msg, self, |handler, req, updates| {
-                    handler.inner.gossip_dispatcher.subscribe_with_opts(
+                    let stream = handler.inner.gossip.join_with_stream(
                         req.topic,
-                        iroh_gossip::dispatcher::SubscribeOptions {
+                        iroh_gossip::net::JoinOptions {
                             bootstrap: req.bootstrap,
                             subscription_capacity: req.subscription_capacity,
                         },
-                        Box::new(updates),
-                    )
+                        Box::pin(updates),
+                    );
+                    futures_util::TryStreamExt::map_err(stream, RpcError::from)
                 })
                 .await
             }
diff --git a/iroh/src/rpc_protocol/gossip.rs b/iroh/src/rpc_protocol/gossip.rs
index f9a64dda5b..8e877ad297 100644
--- a/iroh/src/rpc_protocol/gossip.rs
+++ b/iroh/src/rpc_protocol/gossip.rs
@@ -9,8 +9,8 @@ use serde::{Deserialize, Serialize};

 use super::RpcService;

-pub use iroh_gossip::dispatcher::Command as SubscribeUpdate;
-pub use iroh_gossip::dispatcher::Event as SubscribeResponse;
+pub use iroh_gossip::net::Command as SubscribeUpdate;
+pub use iroh_gossip::net::Event as SubscribeResponse;

 #[allow(missing_docs)]
 #[derive(strum::Display, Debug, Serialize, Deserialize)]
diff --git a/iroh/tests/client.rs b/iroh/tests/client.rs
index cd1297f0c9..a22ce04eec 100644
--- a/iroh/tests/client.rs
+++ b/iroh/tests/client.rs
@@ -3,7 +3,7 @@ use futures_lite::{Stream, StreamExt};
 use futures_util::SinkExt;
 use iroh::client::Iroh;
 use iroh_gossip::{
-    dispatcher::{Command, Event, GossipEvent},
+    net::{Command, Event, GossipEvent},
     proto::TopicId,
 };
 use iroh_net::{key::SecretKey, NodeAddr};
@@ -11,8 +11,8 @@ use testresult::TestResult;

 /// Spawn an iroh node in a separate thread and tokio runtime, and return
 /// the address and client.
-fn spawn_node() -> (NodeAddr, Iroh) {
-    let (sender, receiver) = std::sync::mpsc::channel();
+async fn spawn_node() -> (NodeAddr, Iroh) {
+    let (sender, receiver) = tokio::sync::oneshot::channel();
     std::thread::spawn(move || {
         let runtime = tokio::runtime::Builder::new_multi_thread()
             .enable_all()
@@ -21,16 +21,18 @@ fn spawn_node() -> (NodeAddr, Iroh) {
             let secret_key = SecretKey::generate();
             let node = iroh::node::Builder::default()
                 .secret_key(secret_key)
+                .relay_mode(iroh_net::relay::RelayMode::Disabled)
+                .node_discovery(iroh::node::DiscoveryConfig::None)
                 .spawn()
                 .await?;
             let addr = node.node_addr().await?;
-            sender.send((addr, node.client().clone()))?;
+            sender.send((addr, node.client().clone())).unwrap();
             node.cancel_token().cancelled().await;
             anyhow::Ok(())
         })?;
         anyhow::Ok(())
     });
-    receiver.recv().unwrap()
+    receiver.await.unwrap()
 }

 /// Await `n` messages from a stream of gossip events.
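
The `await_messages` helper documented just above is unchanged by this diff, so its body sits outside the hunk. A plausible shape for it, shown only as a sketch (the 60-second timeout and the exact bounds are assumptions, not taken from the diff):

    use anyhow::Context as _;
    use bytes::Bytes;
    use futures_lite::{Stream, StreamExt};
    use iroh_gossip::net::{Event, GossipEvent};

    async fn await_messages(
        mut stream: impl Stream<Item = anyhow::Result<Event>> + Unpin,
        n: usize,
    ) -> anyhow::Result<Vec<Bytes>> {
        let mut msgs = Vec::new();
        // Bound the wait so a broken test fails instead of hanging forever.
        tokio::time::timeout(std::time::Duration::from_secs(60), async {
            while msgs.len() < n {
                match stream.next().await.context("stream ended early")?? {
                    // Keep only actual payloads; membership events are ignored here.
                    Event::Gossip(GossipEvent::Received(msg)) => msgs.push(msg.content),
                    Event::Gossip(_) => {}
                    Event::Lagged => anyhow::bail!("gossip stream lagged"),
                }
            }
            anyhow::Ok(())
        })
        .await??;
        Ok(msgs)
    }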
@@ -62,15 +64,23 @@ async fn await_messages( #[ignore = "flaky"] async fn gossip_smoke() -> TestResult { let _ = tracing_subscriber::fmt::try_init(); - let (addr1, node1) = spawn_node(); - let (addr2, node2) = spawn_node(); + let (addr1, node1) = spawn_node().await; + let (addr2, node2) = spawn_node().await; let gossip1 = node1.gossip(); let gossip2 = node2.gossip(); node1.add_node_addr(addr2.clone()).await?; node2.add_node_addr(addr1.clone()).await?; + let topic = TopicId::from([0u8; 32]); - let (mut sink1, _stream1) = gossip1.subscribe(topic, [addr2.node_id]).await?; + let (mut sink1, mut stream1) = gossip1.subscribe(topic, [addr2.node_id]).await?; let (_sink2, stream2) = gossip2.subscribe(topic, [addr1.node_id]).await?; + + assert_eq!( + stream1.next().await.unwrap().unwrap(), + Event::Gossip(GossipEvent::Joined(vec![addr2.node_id])) + ); + drop(stream1); + sink1.send(Command::Broadcast("hello".into())).await?; let msgs = await_messages(stream2, 1).await?; assert_eq!(msgs, vec![Bytes::from("hello")]); @@ -78,11 +88,10 @@ async fn gossip_smoke() -> TestResult { } #[tokio::test] -#[ignore = "flaky"] async fn gossip_drop_sink() -> TestResult { let _ = tracing_subscriber::fmt::try_init(); - let (addr1, node1) = spawn_node(); - let (addr2, node2) = spawn_node(); + let (addr1, node1) = spawn_node().await; + let (addr2, node2) = spawn_node().await; let gossip1 = node1.gossip(); let gossip2 = node2.gossip(); node1.add_node_addr(addr2.clone()).await?; @@ -90,9 +99,14 @@ async fn gossip_drop_sink() -> TestResult { let topic = TopicId::from([0u8; 32]); - let (mut sink1, stream1) = gossip1.subscribe(topic, [addr2.node_id]).await?; + let (mut sink1, mut stream1) = gossip1.subscribe(topic, [addr2.node_id]).await?; let (sink2, stream2) = gossip2.subscribe(topic, [addr1.node_id]).await?; + assert_eq!( + stream1.next().await.unwrap().unwrap(), + Event::Gossip(GossipEvent::Joined(vec![addr2.node_id])) + ); + drop(stream1); drop(sink2);