diff --git a/iroh/src/discovery/pkarr.rs b/iroh/src/discovery/pkarr.rs index ec6087008b..a493c177f3 100644 --- a/iroh/src/discovery/pkarr.rs +++ b/iroh/src/discovery/pkarr.rs @@ -61,7 +61,7 @@ use crate::{ discovery::{Discovery, DiscoveryItem}, dns::node_info::NodeInfo, endpoint::force_staging_infra, - watcher::{Direct, Disconnected, Watchable, Watcher as _}, + watcher::{self, Disconnected, Watchable, Watcher as _}, Endpoint, }; @@ -221,7 +221,7 @@ struct PublisherService { secret_key: SecretKey, #[debug("PkarrClient")] pkarr_client: PkarrRelayClient, - watcher: Direct>, + watcher: watcher::Direct>, ttl: u32, republish_interval: Duration, } diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 5d687a7898..23ac5ee9ab 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -779,7 +779,7 @@ impl Endpoint { /// /// ```no_run /// # async fn wrapper() -> testresult::TestResult { - /// use iroh::{Endpoint, watcher::{Watcher as _}}; + /// use iroh::{Endpoint, watcher::Watcher}; /// /// let endpoint = Endpoint::builder() /// .alpns(vec![b"my-alpn".to_vec()]) @@ -830,7 +830,7 @@ impl Endpoint { /// To wait for a home relay connection to be established, use [`Watcher::initialized`]: /// ```no_run /// use futures_lite::StreamExt; - /// use iroh::{Endpoint, watcher::{Watcher as _}}; + /// use iroh::{Endpoint, watcher::Watcher}; /// /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); /// # rt.block_on(async move { @@ -866,7 +866,7 @@ impl Endpoint { /// To get the first set of direct addresses use [`Watcher::initialized`]: /// ```no_run /// use futures_lite::StreamExt; - /// use iroh::{Endpoint, watcher::{Watcher as _}}; + /// use iroh::{Endpoint, watcher::Watcher}; /// /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); /// # rt.block_on(async move { diff --git a/iroh/src/endpoint/rtt_actor.rs b/iroh/src/endpoint/rtt_actor.rs index 7080b058bd..951760f120 100644 --- a/iroh/src/endpoint/rtt_actor.rs +++ b/iroh/src/endpoint/rtt_actor.rs @@ -13,11 +13,7 @@ use tokio::{ use tokio_util::task::AbortOnDropHandle; use tracing::{debug, error, info_span, trace, Instrument}; -use crate::{ - magicsock::ConnectionType, - metrics::MagicsockMetrics, - watcher::{Direct, Stream}, -}; +use crate::{magicsock::ConnectionType, metrics::MagicsockMetrics, watcher}; #[derive(Debug)] pub(super) struct RttHandle { @@ -55,7 +51,7 @@ pub(super) enum RttMessage { /// The connection. connection: quinn::WeakConnectionHandle, /// Path changes for this connection from the magic socket. - conn_type_changes: Stream>, + conn_type_changes: watcher::Stream>, /// For reporting-only, the Node ID of this connection. node_id: NodeId, }, @@ -68,7 +64,7 @@ pub(super) enum RttMessage { #[derive(Debug)] struct RttActor { /// Stream of connection type changes. - connection_events: stream_group::Keyed>>, + connection_events: stream_group::Keyed>>, /// References to the connections. /// /// These are weak references so not to keep the connections alive. The key allows @@ -125,7 +121,7 @@ impl RttActor { fn handle_new_connection( &mut self, connection: quinn::WeakConnectionHandle, - conn_type_changes: Stream>, + conn_type_changes: watcher::Stream>, node_id: NodeId, ) { let key = self.connection_events.insert(conn_type_changes); diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index 094d12ee78..8ceaf1d706 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -21,7 +21,7 @@ use super::{ }; use crate::{ disco::{CallMeMaybe, Pong, SendAddr}, - watcher::Direct, + watcher, }; mod best_addr; @@ -293,7 +293,10 @@ impl NodeMap { /// the `node_id` /// /// [`Watcher`]: crate::watcher::Watcher - pub(super) fn conn_type(&self, node_id: NodeId) -> anyhow::Result> { + pub(super) fn conn_type( + &self, + node_id: NodeId, + ) -> anyhow::Result> { self.inner.lock().expect("poisoned").conn_type(node_id) } @@ -461,7 +464,7 @@ impl NodeMapInner { /// /// Will return an error if there is not an entry in the [`NodeMap`] for /// the `public_key` - fn conn_type(&self, node_id: NodeId) -> anyhow::Result> { + fn conn_type(&self, node_id: NodeId) -> anyhow::Result> { match self.get(NodeStateKey::NodeId(node_id)) { Some(ep) => Ok(ep.conn_type()), None => anyhow::bail!("No endpoint for {node_id:?} found"), diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index f5a738e1a0..1f457812ed 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -24,7 +24,7 @@ use crate::{ disco::{self, SendAddr}, magicsock::{ActorMessage, MagicsockMetrics, QuicMappedAddr, Timer, HEARTBEAT_INTERVAL}, util::relay_only_mode, - watcher::{Direct, Watchable}, + watcher::{self, Watchable}, }; /// Number of addresses that are not active that we keep around per node. @@ -191,7 +191,7 @@ impl NodeState { self.id } - pub(super) fn conn_type(&self) -> Direct { + pub(super) fn conn_type(&self) -> watcher::Direct { self.conn_type.watch() } diff --git a/iroh/src/watcher.rs b/iroh/src/watcher.rs index b7f15a9df8..f0297152bf 100644 --- a/iroh/src/watcher.rs +++ b/iroh/src/watcher.rs @@ -7,6 +7,11 @@ //! In that way, a [`Watchable`] is like a [`tokio::sync::broadcast::Sender`] (and a //! [`Watcher`] is like a [`tokio::sync::broadcast::Receiver`]), except that there's no risk //! of the channel filling up, but instead you might miss items. +//! +//! This module is meant to be imported like this (if you use all of these things): +//! ```ignore +//! use iroh::watcher::{self, Watchable, Watcher as _}; +//! ``` #[cfg(not(iroh_loom))] use std::sync;