Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Dec 19, 2024
1 parent e5caf89 commit cd2bf41
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 18 deletions.
4 changes: 2 additions & 2 deletions iroh/src/discovery/pkarr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::{
discovery::{Discovery, DiscoveryItem},
dns::node_info::NodeInfo,
endpoint::force_staging_infra,
watcher::{Direct, Disconnected, Watchable, Watcher as _},
watcher::{self, Disconnected, Watchable, Watcher as _},
Endpoint,
};

Expand Down Expand Up @@ -221,7 +221,7 @@ struct PublisherService {
secret_key: SecretKey,
#[debug("PkarrClient")]
pkarr_client: PkarrRelayClient,
watcher: Direct<Option<NodeInfo>>,
watcher: watcher::Direct<Option<NodeInfo>>,
ttl: u32,
republish_interval: Duration,
}
Expand Down
6 changes: 3 additions & 3 deletions iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ impl Endpoint {
///
/// ```no_run
/// # async fn wrapper() -> testresult::TestResult {
/// use iroh::{Endpoint, watcher::{Watcher as _}};
/// use iroh::{Endpoint, watcher::Watcher};
///
/// let endpoint = Endpoint::builder()
/// .alpns(vec![b"my-alpn".to_vec()])
Expand Down Expand Up @@ -830,7 +830,7 @@ impl Endpoint {
/// To wait for a home relay connection to be established, use [`Watcher::initialized`]:
/// ```no_run
/// use futures_lite::StreamExt;
/// use iroh::{Endpoint, watcher::{Watcher as _}};
/// use iroh::{Endpoint, watcher::Watcher};
///
/// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
/// # rt.block_on(async move {
Expand Down Expand Up @@ -866,7 +866,7 @@ impl Endpoint {
/// To get the first set of direct addresses use [`Watcher::initialized`]:
/// ```no_run
/// use futures_lite::StreamExt;
/// use iroh::{Endpoint, watcher::{Watcher as _}};
/// use iroh::{Endpoint, watcher::Watcher};
///
/// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
/// # rt.block_on(async move {
Expand Down
12 changes: 4 additions & 8 deletions iroh/src/endpoint/rtt_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ use tokio::{
use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, error, info_span, trace, Instrument};

use crate::{
magicsock::ConnectionType,
metrics::MagicsockMetrics,
watcher::{Direct, Stream},
};
use crate::{magicsock::ConnectionType, metrics::MagicsockMetrics, watcher};

#[derive(Debug)]
pub(super) struct RttHandle {
Expand Down Expand Up @@ -55,7 +51,7 @@ pub(super) enum RttMessage {
/// The connection.
connection: quinn::WeakConnectionHandle,
/// Path changes for this connection from the magic socket.
conn_type_changes: Stream<Direct<ConnectionType>>,
conn_type_changes: watcher::Stream<watcher::Direct<ConnectionType>>,
/// For reporting-only, the Node ID of this connection.
node_id: NodeId,
},
Expand All @@ -68,7 +64,7 @@ pub(super) enum RttMessage {
#[derive(Debug)]
struct RttActor {
/// Stream of connection type changes.
connection_events: stream_group::Keyed<Stream<Direct<ConnectionType>>>,
connection_events: stream_group::Keyed<watcher::Stream<watcher::Direct<ConnectionType>>>,
/// References to the connections.
///
/// These are weak references so not to keep the connections alive. The key allows
Expand Down Expand Up @@ -125,7 +121,7 @@ impl RttActor {
fn handle_new_connection(
&mut self,
connection: quinn::WeakConnectionHandle,
conn_type_changes: Stream<Direct<ConnectionType>>,
conn_type_changes: watcher::Stream<watcher::Direct<ConnectionType>>,
node_id: NodeId,
) {
let key = self.connection_events.insert(conn_type_changes);
Expand Down
9 changes: 6 additions & 3 deletions iroh/src/magicsock/node_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::{
};
use crate::{
disco::{CallMeMaybe, Pong, SendAddr},
watcher::Direct,
watcher,
};

mod best_addr;
Expand Down Expand Up @@ -293,7 +293,10 @@ impl NodeMap {
/// the `node_id`
///
/// [`Watcher`]: crate::watcher::Watcher
pub(super) fn conn_type(&self, node_id: NodeId) -> anyhow::Result<Direct<ConnectionType>> {
pub(super) fn conn_type(
&self,
node_id: NodeId,
) -> anyhow::Result<watcher::Direct<ConnectionType>> {
self.inner.lock().expect("poisoned").conn_type(node_id)
}

Expand Down Expand Up @@ -461,7 +464,7 @@ impl NodeMapInner {
///
/// Will return an error if there is not an entry in the [`NodeMap`] for
/// the `public_key`
fn conn_type(&self, node_id: NodeId) -> anyhow::Result<Direct<ConnectionType>> {
fn conn_type(&self, node_id: NodeId) -> anyhow::Result<watcher::Direct<ConnectionType>> {
match self.get(NodeStateKey::NodeId(node_id)) {
Some(ep) => Ok(ep.conn_type()),
None => anyhow::bail!("No endpoint for {node_id:?} found"),
Expand Down
4 changes: 2 additions & 2 deletions iroh/src/magicsock/node_map/node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
disco::{self, SendAddr},
magicsock::{ActorMessage, MagicsockMetrics, QuicMappedAddr, Timer, HEARTBEAT_INTERVAL},
util::relay_only_mode,
watcher::{Direct, Watchable},
watcher::{self, Watchable},
};

/// Number of addresses that are not active that we keep around per node.
Expand Down Expand Up @@ -191,7 +191,7 @@ impl NodeState {
self.id
}

pub(super) fn conn_type(&self) -> Direct<ConnectionType> {
pub(super) fn conn_type(&self) -> watcher::Direct<ConnectionType> {
self.conn_type.watch()
}

Expand Down
5 changes: 5 additions & 0 deletions iroh/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
//! In that way, a [`Watchable`] is like a [`tokio::sync::broadcast::Sender`] (and a
//! [`Watcher`] is like a [`tokio::sync::broadcast::Receiver`]), except that there's no risk
//! of the channel filling up, but instead you might miss items.
//!
//! This module is meant to be imported like this (if you use all of these things):
//! ```ignore
//! use iroh::watcher::{self, Watchable, Watcher as _};
//! ```
#[cfg(not(iroh_loom))]
use std::sync;
Expand Down

0 comments on commit cd2bf41

Please sign in to comment.