From 1cb8c097d48c6b8e1328b84881d7a46c6c9166ed Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 23 Jan 2024 09:53:50 +0100 Subject: [PATCH 1/7] Rename `Event` to `HighLevelEvent` --- light-base/src/network_service.rs | 63 +++++++++++++---------- light-base/src/sync_service/parachain.rs | 13 +++-- light-base/src/sync_service/standalone.rs | 25 +++++---- 3 files changed, 60 insertions(+), 41 deletions(-) diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index f45c181ea8..99d882694b 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -36,9 +36,9 @@ //! If all references to a [`NetworkServiceChain`] are destroyed, the chain is automatically //! purged. //! -//! An important part of the API is the list of channel receivers of [`Event`] returned by -//! [`NetworkServiceChain::subscribe`]. These channels inform the foreground about updates to the -//! network connectivity. +//! An important part of the API is the list of channel receivers of [`HighLevelEvent`] returned +//! by [`NetworkServiceChain::subscribe`]. These channels inform the foreground about updates to +//! the network connectivity. use crate::{ log, @@ -310,7 +310,7 @@ impl NetworkServiceChain { /// Panics if the given [`ChainId`] is invalid. /// // TODO: consider not killing the background until the channel is destroyed, as that would be a more sensical behaviour - pub async fn subscribe(&self) -> async_channel::Receiver { + pub async fn subscribe(&self) -> async_channel::Receiver { let (tx, rx) = async_channel::bounded(128); let _ = self @@ -322,17 +322,17 @@ impl NetworkServiceChain { rx } - /// Starts asynchronously disconnecting the given peer. A [`Event::Disconnected`] will later be - /// generated. Prevents a new gossip link with the same peer from being reopened for a - /// little while. + /// Starts asynchronously disconnecting the given peer. A [`HighLevelEvent::Disconnected`] + /// will later be generated. Prevents a new gossip link with the same peer from being reopened + /// for a little while. /// /// `reason` is a human-readable string printed in the logs. /// /// Due to race conditions, it is possible to reconnect to the peer soon after, in case the /// reconnection was already happening as the call to this function is still being processed. - /// If that happens another [`Event::Disconnected`] will be delivered afterwards. In other - /// words, this function guarantees that we will be disconnected in the future rather than - /// guarantees that we will disconnect. + /// If that happens another [`HighLevelEvent::Disconnected`] will be delivered afterwards. + /// In other words, this function guarantees that we will be disconnected in the future rather + /// than guarantees that we will disconnect. pub async fn ban_and_disconnect( &self, peer_id: PeerId, @@ -582,7 +582,7 @@ impl NetworkServiceChain { /// Event that can happen on the network service. #[derive(Debug, Clone)] -pub enum Event { +pub enum HighLevelEvent { Connected { peer_id: PeerId, role: Role, @@ -673,7 +673,7 @@ enum ToBackground { enum ToBackgroundChain { RemoveChain, Subscribe { - sender: async_channel::Sender, + sender: async_channel::Sender, }, DisconnectAndBan { peer_id: PeerId, @@ -790,7 +790,7 @@ struct BackgroundTask { important_nodes: HashSet, /// Event about to be sent on the senders of [`BackgroundTask::event_senders`]. - event_pending_send: Option<(ChainId, Event)>, + event_pending_send: Option<(ChainId, HighLevelEvent)>, /// Sending events through the public API. /// @@ -798,13 +798,18 @@ struct BackgroundTask { /// the senders back once it is finished. // TODO: sort by ChainId instead of using a Vec? event_senders: either::Either< - Vec<(ChainId, async_channel::Sender)>, - Pin)>> + Send>>, + Vec<(ChainId, async_channel::Sender)>, + Pin< + Box< + dyn future::Future)>> + + Send, + >, + >, >, /// Whenever [`NetworkServiceChain::subscribe`] is called, the new sender is added to this list. /// Once [`BackgroundTask::event_senders`] is ready, we properly initialize these senders. - pending_new_subscriptions: Vec<(ChainId, async_channel::Sender)>, + pending_new_subscriptions: Vec<(ChainId, async_channel::Sender)>, main_messages_rx: Pin>>>, @@ -1131,7 +1136,7 @@ async fn background_task(mut task: BackgroundTask) { } let _ = new_subscription - .send(Event::Connected { + .send(HighLevelEvent::Connected { peer_id: peer_id.clone(), role: state.role, best_block_number: state.best_block_number, @@ -1141,7 +1146,7 @@ async fn background_task(mut task: BackgroundTask) { if let Some(finalized_block_height) = state.finalized_block_height { let _ = new_subscription - .send(Event::GrandpaNeighborPacket { + .send(HighLevelEvent::GrandpaNeighborPacket { peer_id: peer_id.clone(), finalized_block_height, }) @@ -1273,7 +1278,8 @@ async fn background_task(mut task: BackgroundTask) { debug_assert!(_was_in.is_some()); debug_assert!(task.event_pending_send.is_none()); - task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id })); + task.event_pending_send = + Some((chain_id, HighLevelEvent::Disconnected { peer_id })); } } WakeUpReason::MessageForChain( @@ -1857,8 +1863,10 @@ async fn background_task(mut task: BackgroundTask) { } debug_assert!(task.event_pending_send.is_none()); - task.event_pending_send = - Some((chain_id, Event::BlockAnnounce { peer_id, announce })); + task.event_pending_send = Some(( + chain_id, + HighLevelEvent::BlockAnnounce { peer_id, announce }, + )); } WakeUpReason::NetworkEvent(service::Event::GossipConnected { peer_id, @@ -1893,7 +1901,7 @@ async fn background_task(mut task: BackgroundTask) { debug_assert!(task.event_pending_send.is_none()); task.event_pending_send = Some(( chain_id, - Event::Connected { + HighLevelEvent::Connected { peer_id, role, best_block_number: best_number, @@ -2001,7 +2009,8 @@ async fn background_task(mut task: BackgroundTask) { } debug_assert!(task.event_pending_send.is_none()); - task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id })); + task.event_pending_send = + Some((chain_id, HighLevelEvent::Disconnected { peer_id })); } WakeUpReason::NetworkEvent(service::Event::RequestResult { substream_id, @@ -2443,7 +2452,7 @@ async fn background_task(mut task: BackgroundTask) { debug_assert!(task.event_pending_send.is_none()); task.event_pending_send = Some(( chain_id, - Event::GrandpaNeighborPacket { + HighLevelEvent::GrandpaNeighborPacket { peer_id, finalized_block_height: state.commit_finalized_height, }, @@ -2465,8 +2474,10 @@ async fn background_task(mut task: BackgroundTask) { ); debug_assert!(task.event_pending_send.is_none()); - task.event_pending_send = - Some((chain_id, Event::GrandpaCommitMessage { peer_id, message })); + task.event_pending_send = Some(( + chain_id, + HighLevelEvent::GrandpaCommitMessage { peer_id, message }, + )); } WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => { // TODO: handle properly? diff --git a/light-base/src/sync_service/parachain.rs b/light-base/src/sync_service/parachain.rs index 7b72318489..ecb8d45126 100644 --- a/light-base/src/sync_service/parachain.rs +++ b/light-base/src/sync_service/parachain.rs @@ -110,7 +110,8 @@ struct ParachainBackgroundTask { network_service: Arc>, /// Events coming from the networking service. `None` if not subscribed yet. - from_network_service: Option>>>, + from_network_service: + Option>>>, /// Runtime service of the relay chain. relay_chain_sync: Arc>, @@ -219,7 +220,7 @@ impl ParachainBackgroundTask { Notification(runtime_service::Notification), SubscriptionDead, MustSubscribeNetworkEvents, - NetworkEvent(network_service::Event), + NetworkEvent(network_service::HighLevelEvent), AdvanceSyncTree, } @@ -1261,7 +1262,7 @@ impl ParachainBackgroundTask { } ( - WakeUpReason::NetworkEvent(network_service::Event::Connected { + WakeUpReason::NetworkEvent(network_service::HighLevelEvent::Connected { peer_id, role, best_block_number, @@ -1278,7 +1279,9 @@ impl ParachainBackgroundTask { } ( - WakeUpReason::NetworkEvent(network_service::Event::Disconnected { peer_id }), + WakeUpReason::NetworkEvent(network_service::HighLevelEvent::Disconnected { + peer_id, + }), _, ) => { let local_id = self.sync_sources_map.remove(&peer_id).unwrap(); @@ -1287,7 +1290,7 @@ impl ParachainBackgroundTask { } ( - WakeUpReason::NetworkEvent(network_service::Event::BlockAnnounce { + WakeUpReason::NetworkEvent(network_service::HighLevelEvent::BlockAnnounce { peer_id, announce, }), diff --git a/light-base/src/sync_service/standalone.rs b/light-base/src/sync_service/standalone.rs index d518b334aa..f588f93042 100644 --- a/light-base/src/sync_service/standalone.rs +++ b/light-base/src/sync_service/standalone.rs @@ -163,7 +163,7 @@ pub(super) async fn start_standalone_chain( MustUpdateNetworkWithBestBlock, MustUpdateNetworkWithFinalizedBlock, MustSubscribeNetworkEvents, - NetworkEvent(network_service::Event), + NetworkEvent(network_service::HighLevelEvent), ForegroundMessage(ToBackground), ForegroundClosed, StartRequest(all::SourceId, all::DesiredRequest), @@ -255,7 +255,7 @@ pub(super) async fn start_standalone_chain( }; match wake_up_reason { - WakeUpReason::NetworkEvent(network_service::Event::Connected { + WakeUpReason::NetworkEvent(network_service::HighLevelEvent::Connected { peer_id, role, best_block_number, @@ -268,7 +268,9 @@ pub(super) async fn start_standalone_chain( ); } - WakeUpReason::NetworkEvent(network_service::Event::Disconnected { peer_id }) => { + WakeUpReason::NetworkEvent(network_service::HighLevelEvent::Disconnected { + peer_id, + }) => { let sync_source_id = task.peers_source_id_map.remove(&peer_id).unwrap(); let (_, requests) = task.sync.remove_source(sync_source_id); @@ -281,7 +283,7 @@ pub(super) async fn start_standalone_chain( } } - WakeUpReason::NetworkEvent(network_service::Event::BlockAnnounce { + WakeUpReason::NetworkEvent(network_service::HighLevelEvent::BlockAnnounce { peer_id, announce, }) => { @@ -319,16 +321,18 @@ pub(super) async fn start_standalone_chain( } } - WakeUpReason::NetworkEvent(network_service::Event::GrandpaNeighborPacket { - peer_id, - finalized_block_height, - }) => { + WakeUpReason::NetworkEvent( + network_service::HighLevelEvent::GrandpaNeighborPacket { + peer_id, + finalized_block_height, + }, + ) => { let sync_source_id = *task.peers_source_id_map.get(&peer_id).unwrap(); task.sync .update_source_finality_state(sync_source_id, finalized_block_height); } - WakeUpReason::NetworkEvent(network_service::Event::GrandpaCommitMessage { + WakeUpReason::NetworkEvent(network_service::HighLevelEvent::GrandpaCommitMessage { peer_id, message, }) => { @@ -909,7 +913,8 @@ struct Task { /// Chain of the network service. Used to send out requests to peers. network_service: Arc>, /// Events coming from the networking service. `None` if not subscribed yet. - from_network_service: Option>>>, + from_network_service: + Option>>>, /// List of requests currently in progress. pending_requests: stream::FuturesUnordered< From 6282968a6e52e24cb0842af5235cc8fbf2611aab Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 23 Jan 2024 11:20:48 +0100 Subject: [PATCH 2/7] Draft `LowLevelEvent` --- light-base/src/network_service.rs | 63 ++++++++++++++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index 99d882694b..693c58f654 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -73,7 +73,9 @@ use smoldot::{ }; pub use codec::{CallProofRequestConfig, Role}; -pub use service::{ChainId, EncodedMerkleProof, PeerId, QueueNotificationError}; +pub use service::{ + ChainId, ConnectionId, EncodedMerkleProof, PeerId, QueueNotificationError, SubstreamId, +}; mod tasks; @@ -580,6 +582,40 @@ impl NetworkServiceChain { } } +/// Event that can happen on the network service. +#[derive(Debug, Clone)] +pub enum LowLevelEvent { + TransportConnecting { + connection_id: ConnectionId, + expected_peer_id: PeerId, + }, + TransportConnectingAbort { + connection_id: ConnectionId, + expected_peer_id: PeerId, + }, + TransportConnected { + connection_id: ConnectionId, + peer_id: PeerId, + }, + TransportDisconnected { + connection_id: ConnectionId, + peer_id: PeerId, + }, + BlockAnnounce { + peer_id: PeerId, + announce: service::EncodedBlockAnnounce, + }, + GrandpaNeighborPacket { + peer_id: PeerId, + finalized_block_height: u64, + }, + /// Received a GrandPa commit message from the network. + GrandpaCommitMessage { + peer_id: PeerId, + message: service::EncodedGrandpaCommitMessage, + }, +} + /// Event that can happen on the network service. #[derive(Debug, Clone)] pub enum HighLevelEvent { @@ -607,6 +643,31 @@ pub enum HighLevelEvent { }, } +impl From for Option { + fn from(event: LowLevelEvent) -> Self { + // TODO: + match event { + LowLevelEvent::BlockAnnounce { peer_id, announce } => { + Some(HighLevelEvent::BlockAnnounce { peer_id, announce }) + } + LowLevelEvent::GrandpaCommitMessage { peer_id, message } => { + Some(HighLevelEvent::GrandpaCommitMessage { peer_id, message }) + } + LowLevelEvent::GrandpaNeighborPacket { + peer_id, + finalized_block_height, + } => Some(HighLevelEvent::GrandpaNeighborPacket { + peer_id, + finalized_block_height, + }), + LowLevelEvent::TransportConnecting { .. } + | LowLevelEvent::TransportConnectingAbort { .. } + | LowLevelEvent::TransportConnected { .. } + | LowLevelEvent::TransportDisconnected { .. } => None, + } + } +} + /// Error returned by [`NetworkServiceChain::blocks_request`]. #[derive(Debug, derive_more::Display)] pub enum BlocksRequestError { From 2d281ed321324be24e2dc77a8146d8fc32f5b641 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 23 Jan 2024 11:26:57 +0100 Subject: [PATCH 3/7] Wrap networking service events receiver around HighLevelEventsReceiver --- light-base/src/network_service.rs | 43 +++++++++++++++++------ light-base/src/sync_service/parachain.rs | 5 ++- light-base/src/sync_service/standalone.rs | 5 ++- 3 files changed, 37 insertions(+), 16 deletions(-) diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index 693c58f654..e2ea435cf0 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -294,25 +294,25 @@ pub enum BanSeverity { impl NetworkServiceChain { /// Subscribes to the networking events that happen on the given chain. /// - /// Calling this function returns a `Receiver` that receives events about the chain. - /// The new channel will immediately receive events about all the existing connections, so - /// that it is able to maintain a coherent view of the network. + /// Calling this function returns a [`HighLevelEventsReceiver`] that receives events about + /// the chain. The new channel will immediately receive events about all the existing + /// connections, so that it is able to maintain a coherent view of the network. /// /// Note that this function is `async`, but it should return very quickly. /// - /// The `Receiver` **must** be polled continuously. When the channel is full, the networking - /// connections will be back-pressured until the channel isn't full anymore. + /// The [`HighLevelEventsReceiver`] **must** be polled continuously. When the channel is + /// full, the networking connections will be back-pressured until the channel isn't full + /// anymore. /// - /// The `Receiver` never yields `None` unless the [`NetworkService`] crashes or is destroyed. + /// The [`HighLevelEventsReceiver`] never yields `None` unless the [`NetworkService`] crashes. /// If `None` is yielded and the [`NetworkService`] is still alive, you should call - /// [`NetworkServiceChain::subscribe`] again to obtain a new `Receiver`. + /// [`NetworkServiceChain::subscribe`] again to obtain a new [`HighLevelEventsReceiver`]. /// /// # Panic /// /// Panics if the given [`ChainId`] is invalid. /// - // TODO: consider not killing the background until the channel is destroyed, as that would be a more sensical behaviour - pub async fn subscribe(&self) -> async_channel::Receiver { + pub async fn subscribe(&self) -> HighLevelEventsReceiver { let (tx, rx) = async_channel::bounded(128); let _ = self @@ -321,7 +321,10 @@ impl NetworkServiceChain { .await .unwrap(); - rx + HighLevelEventsReceiver { + inner: rx, + _keep_alive_messages_tx: self._keep_alive_messages_tx.clone(), + } } /// Starts asynchronously disconnecting the given peer. A [`HighLevelEvent::Disconnected`] @@ -668,6 +671,26 @@ impl From for Option { } } +/// Channel returned by [`NetworkServiceChain::subscribe`] +#[pin_project::pin_project] +pub struct HighLevelEventsReceiver { + #[pin] + inner: async_channel::Receiver, + + /// Dummy channel used to keep the chain alive. + _keep_alive_messages_tx: async_channel::Sender>, +} + +impl HighLevelEventsReceiver { + /// Returns the next event. + /// + /// Returns `None` if the network service has crashed. + pub async fn next(self: Pin<&mut Self>) -> Option { + let mut this = self.project(); + this.inner.next().await + } +} + /// Error returned by [`NetworkServiceChain::blocks_request`]. #[derive(Debug, derive_more::Display)] pub enum BlocksRequestError { diff --git a/light-base/src/sync_service/parachain.rs b/light-base/src/sync_service/parachain.rs index ecb8d45126..421c36ec42 100644 --- a/light-base/src/sync_service/parachain.rs +++ b/light-base/src/sync_service/parachain.rs @@ -110,8 +110,7 @@ struct ParachainBackgroundTask { network_service: Arc>, /// Events coming from the networking service. `None` if not subscribed yet. - from_network_service: - Option>>>, + from_network_service: Option>>>, /// Runtime service of the relay chain. relay_chain_sync: Arc>, @@ -272,7 +271,7 @@ impl ParachainBackgroundTask { .or(async { if is_relaychain_subscribed { if let Some(from_network_service) = self.from_network_service.as_mut() { - match from_network_service.next().await { + match from_network_service.as_mut().next().await { Some(ev) => WakeUpReason::NetworkEvent(ev), None => { self.from_network_service = None; diff --git a/light-base/src/sync_service/standalone.rs b/light-base/src/sync_service/standalone.rs index f588f93042..789ee5cef1 100644 --- a/light-base/src/sync_service/standalone.rs +++ b/light-base/src/sync_service/standalone.rs @@ -175,7 +175,7 @@ pub(super) async fn start_standalone_chain( let wake_up_reason = { async { if let Some(from_network_service) = task.from_network_service.as_mut() { - match from_network_service.next().await { + match from_network_service.as_mut().next().await { Some(ev) => WakeUpReason::NetworkEvent(ev), None => { task.from_network_service = None; @@ -913,8 +913,7 @@ struct Task { /// Chain of the network service. Used to send out requests to peers. network_service: Arc>, /// Events coming from the networking service. `None` if not subscribed yet. - from_network_service: - Option>>>, + from_network_service: Option>>>, /// List of requests currently in progress. pending_requests: stream::FuturesUnordered< From 53220249db8e1df45cc7bec05e10d56682f36b7e Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 23 Jan 2024 11:32:33 +0100 Subject: [PATCH 4/7] Make the networking service channel yield LowLevelEvent --- light-base/src/network_service.rs | 65 ++++++++++++++++++++++--------- 1 file changed, 47 insertions(+), 18 deletions(-) diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index e2ea435cf0..0c38d641f2 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -604,6 +604,16 @@ pub enum LowLevelEvent { connection_id: ConnectionId, peer_id: PeerId, }, + // TODO: events below should be cleaned up + GossipConnected { + peer_id: PeerId, + role: Role, + best_block_number: u64, + best_block_hash: [u8; 32], + }, + GossipDisconnected { + peer_id: PeerId, + }, BlockAnnounce { peer_id: PeerId, announce: service::EncodedBlockAnnounce, @@ -650,6 +660,20 @@ impl From for Option { fn from(event: LowLevelEvent) -> Self { // TODO: match event { + LowLevelEvent::GossipConnected { + peer_id, + role, + best_block_number, + best_block_hash, + } => Some(HighLevelEvent::Connected { + peer_id, + role, + best_block_number, + best_block_hash, + }), + LowLevelEvent::GossipDisconnected { peer_id } => { + Some(HighLevelEvent::Disconnected { peer_id }) + } LowLevelEvent::BlockAnnounce { peer_id, announce } => { Some(HighLevelEvent::BlockAnnounce { peer_id, announce }) } @@ -675,7 +699,7 @@ impl From for Option { #[pin_project::pin_project] pub struct HighLevelEventsReceiver { #[pin] - inner: async_channel::Receiver, + inner: async_channel::Receiver, /// Dummy channel used to keep the chain alive. _keep_alive_messages_tx: async_channel::Sender>, @@ -687,7 +711,14 @@ impl HighLevelEventsReceiver { /// Returns `None` if the network service has crashed. pub async fn next(self: Pin<&mut Self>) -> Option { let mut this = self.project(); - this.inner.next().await + loop { + let event = this.inner.next().await?; + // The channel receives "low-level" events. If the low-level event corresponds to + // a high-level event, return it, otherwise continue looping. + if let Some(high_level_event) = Option::::from(event) { + return Some(high_level_event); + } + } } } @@ -757,7 +788,7 @@ enum ToBackground { enum ToBackgroundChain { RemoveChain, Subscribe { - sender: async_channel::Sender, + sender: async_channel::Sender, }, DisconnectAndBan { peer_id: PeerId, @@ -874,7 +905,7 @@ struct BackgroundTask { important_nodes: HashSet, /// Event about to be sent on the senders of [`BackgroundTask::event_senders`]. - event_pending_send: Option<(ChainId, HighLevelEvent)>, + event_pending_send: Option<(ChainId, LowLevelEvent)>, /// Sending events through the public API. /// @@ -882,10 +913,10 @@ struct BackgroundTask { /// the senders back once it is finished. // TODO: sort by ChainId instead of using a Vec? event_senders: either::Either< - Vec<(ChainId, async_channel::Sender)>, + Vec<(ChainId, async_channel::Sender)>, Pin< Box< - dyn future::Future)>> + dyn future::Future)>> + Send, >, >, @@ -893,7 +924,7 @@ struct BackgroundTask { /// Whenever [`NetworkServiceChain::subscribe`] is called, the new sender is added to this list. /// Once [`BackgroundTask::event_senders`] is ready, we properly initialize these senders. - pending_new_subscriptions: Vec<(ChainId, async_channel::Sender)>, + pending_new_subscriptions: Vec<(ChainId, async_channel::Sender)>, main_messages_rx: Pin>>>, @@ -1220,7 +1251,7 @@ async fn background_task(mut task: BackgroundTask) { } let _ = new_subscription - .send(HighLevelEvent::Connected { + .send(LowLevelEvent::GossipConnected { peer_id: peer_id.clone(), role: state.role, best_block_number: state.best_block_number, @@ -1230,7 +1261,7 @@ async fn background_task(mut task: BackgroundTask) { if let Some(finalized_block_height) = state.finalized_block_height { let _ = new_subscription - .send(HighLevelEvent::GrandpaNeighborPacket { + .send(LowLevelEvent::GrandpaNeighborPacket { peer_id: peer_id.clone(), finalized_block_height, }) @@ -1363,7 +1394,7 @@ async fn background_task(mut task: BackgroundTask) { debug_assert!(task.event_pending_send.is_none()); task.event_pending_send = - Some((chain_id, HighLevelEvent::Disconnected { peer_id })); + Some((chain_id, LowLevelEvent::GossipDisconnected { peer_id })); } } WakeUpReason::MessageForChain( @@ -1947,10 +1978,8 @@ async fn background_task(mut task: BackgroundTask) { } debug_assert!(task.event_pending_send.is_none()); - task.event_pending_send = Some(( - chain_id, - HighLevelEvent::BlockAnnounce { peer_id, announce }, - )); + task.event_pending_send = + Some((chain_id, LowLevelEvent::BlockAnnounce { peer_id, announce })); } WakeUpReason::NetworkEvent(service::Event::GossipConnected { peer_id, @@ -1985,7 +2014,7 @@ async fn background_task(mut task: BackgroundTask) { debug_assert!(task.event_pending_send.is_none()); task.event_pending_send = Some(( chain_id, - HighLevelEvent::Connected { + LowLevelEvent::GossipConnected { peer_id, role, best_block_number: best_number, @@ -2094,7 +2123,7 @@ async fn background_task(mut task: BackgroundTask) { debug_assert!(task.event_pending_send.is_none()); task.event_pending_send = - Some((chain_id, HighLevelEvent::Disconnected { peer_id })); + Some((chain_id, LowLevelEvent::GossipDisconnected { peer_id })); } WakeUpReason::NetworkEvent(service::Event::RequestResult { substream_id, @@ -2536,7 +2565,7 @@ async fn background_task(mut task: BackgroundTask) { debug_assert!(task.event_pending_send.is_none()); task.event_pending_send = Some(( chain_id, - HighLevelEvent::GrandpaNeighborPacket { + LowLevelEvent::GrandpaNeighborPacket { peer_id, finalized_block_height: state.commit_finalized_height, }, @@ -2560,7 +2589,7 @@ async fn background_task(mut task: BackgroundTask) { debug_assert!(task.event_pending_send.is_none()); task.event_pending_send = Some(( chain_id, - HighLevelEvent::GrandpaCommitMessage { peer_id, message }, + LowLevelEvent::GrandpaCommitMessage { peer_id, message }, )); } WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => { From 935ece6f27d23ccc25b3f01fd6c237e0d26b65c8 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 23 Jan 2024 11:46:15 +0100 Subject: [PATCH 5/7] Add `subscribe_low_level` --- light-base/src/network_service.rs | 37 +++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index 0c38d641f2..0f995ac79f 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -327,6 +327,23 @@ impl NetworkServiceChain { } } + /// Similar to [`NetworkServiceChain::subscribe`], but returns [`LowLevelEvent`]s instead of + /// [`HighLevelEvent`]s. + pub async fn subscribe_low_level(&self) -> LowLevelEventsReceiver { + let (tx, rx) = async_channel::bounded(128); + + let _ = self + .messages_tx + .send(ToBackgroundChain::Subscribe { sender: tx }) + .await + .unwrap(); + + LowLevelEventsReceiver { + inner: rx, + _keep_alive_messages_tx: self._keep_alive_messages_tx.clone(), + } + } + /// Starts asynchronously disconnecting the given peer. A [`HighLevelEvent::Disconnected`] /// will later be generated. Prevents a new gossip link with the same peer from being reopened /// for a little while. @@ -629,6 +646,26 @@ pub enum LowLevelEvent { }, } +/// Channel returned by [`NetworkServiceChain::subscribe_low_level`] +#[pin_project::pin_project] +pub struct LowLevelEventsReceiver { + #[pin] + inner: async_channel::Receiver, + + /// Dummy channel used to keep the chain alive. + _keep_alive_messages_tx: async_channel::Sender>, +} + +impl LowLevelEventsReceiver { + /// Returns the next event. + /// + /// Returns `None` if the network service has crashed. + pub async fn next(self: Pin<&mut Self>) -> Option { + let mut this = self.project(); + this.inner.next().await + } +} + /// Event that can happen on the network service. #[derive(Debug, Clone)] pub enum HighLevelEvent { From 14fc2f293320022615c539901beb8726d5844a60 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 23 Jan 2024 13:47:17 +0100 Subject: [PATCH 6/7] `events_pending_send` is now a `VecDeque` --- light-base/src/network_service.rs | 44 +++++++++++++++---------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index 0f995ac79f..38c399c673 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -48,7 +48,7 @@ use crate::{ use alloc::{ borrow::ToOwned as _, boxed::Box, - collections::BTreeMap, + collections::{BTreeMap, VecDeque}, format, string::{String, ToString as _}, sync::Arc, @@ -191,7 +191,7 @@ impl NetworkService { next_recent_connection_restore: None, platform: config.platform.clone(), open_gossip_links: BTreeMap::new(), - event_pending_send: None, + events_pending_send: VecDeque::new(), // TODO: capacity? event_senders: either::Left(Vec::new()), pending_new_subscriptions: Vec::new(), important_nodes: HashSet::with_capacity_and_hasher(16, Default::default()), @@ -941,8 +941,8 @@ struct BackgroundTask { // TODO: should also detect whenever we fail to open a block announces substream with any of these peers important_nodes: HashSet, - /// Event about to be sent on the senders of [`BackgroundTask::event_senders`]. - event_pending_send: Option<(ChainId, LowLevelEvent)>, + /// Events about to be sent on the senders of [`BackgroundTask::event_senders`]. + events_pending_send: VecDeque<(ChainId, LowLevelEvent)>, /// Sending events through the public API. /// @@ -1074,7 +1074,7 @@ async fn background_task(mut task: BackgroundTask) { } }; let service_event = async { - if let Some(event) = (task.event_pending_send.is_none() + if let Some(event) = (task.events_pending_send.is_empty() && task.pending_new_subscriptions.is_empty()) .then(|| task.network.next_event()) .flatten() @@ -1172,7 +1172,7 @@ async fn background_task(mut task: BackgroundTask) { let event_senders = event_sending_future.await; task.event_senders = either::Left(event_senders); WakeUpReason::EventSendersReady - } else if task.event_pending_send.is_some() + } else if !task.events_pending_send.is_empty() || !task.pending_new_subscriptions.is_empty() { WakeUpReason::EventSendersReady @@ -1256,7 +1256,7 @@ async fn background_task(mut task: BackgroundTask) { }; if let Some((event_to_dispatch_chain_id, event_to_dispatch)) = - task.event_pending_send.take() + task.events_pending_send.pop_front() { let mut event_senders = mem::take(event_senders); task.event_senders = either::Right(Box::pin(async move { @@ -1429,9 +1429,9 @@ async fn background_task(mut task: BackgroundTask) { let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone())); debug_assert!(_was_in.is_some()); - debug_assert!(task.event_pending_send.is_none()); - task.event_pending_send = - Some((chain_id, LowLevelEvent::GossipDisconnected { peer_id })); + debug_assert!(task.events_pending_send.is_empty()); + task.events_pending_send + .push_back((chain_id, LowLevelEvent::GossipDisconnected { peer_id })); } } WakeUpReason::MessageForChain( @@ -2014,9 +2014,9 @@ async fn background_task(mut task: BackgroundTask) { } } - debug_assert!(task.event_pending_send.is_none()); - task.event_pending_send = - Some((chain_id, LowLevelEvent::BlockAnnounce { peer_id, announce })); + debug_assert!(task.events_pending_send.is_empty()); + task.events_pending_send + .push_back((chain_id, LowLevelEvent::BlockAnnounce { peer_id, announce })); } WakeUpReason::NetworkEvent(service::Event::GossipConnected { peer_id, @@ -2048,8 +2048,8 @@ async fn background_task(mut task: BackgroundTask) { ); debug_assert!(_prev_value.is_none()); - debug_assert!(task.event_pending_send.is_none()); - task.event_pending_send = Some(( + debug_assert!(task.events_pending_send.is_empty()); + task.events_pending_send.push_back(( chain_id, LowLevelEvent::GossipConnected { peer_id, @@ -2158,9 +2158,9 @@ async fn background_task(mut task: BackgroundTask) { ); } - debug_assert!(task.event_pending_send.is_none()); - task.event_pending_send = - Some((chain_id, LowLevelEvent::GossipDisconnected { peer_id })); + debug_assert!(task.events_pending_send.is_empty()); + task.events_pending_send + .push_back((chain_id, LowLevelEvent::GossipDisconnected { peer_id })); } WakeUpReason::NetworkEvent(service::Event::RequestResult { substream_id, @@ -2599,8 +2599,8 @@ async fn background_task(mut task: BackgroundTask) { .unwrap() .finalized_block_height = Some(state.commit_finalized_height); - debug_assert!(task.event_pending_send.is_none()); - task.event_pending_send = Some(( + debug_assert!(task.events_pending_send.is_empty()); + task.events_pending_send.push_back(( chain_id, LowLevelEvent::GrandpaNeighborPacket { peer_id, @@ -2623,8 +2623,8 @@ async fn background_task(mut task: BackgroundTask) { target_block_hash = HashDisplay(message.decode().target_hash), ); - debug_assert!(task.event_pending_send.is_none()); - task.event_pending_send = Some(( + debug_assert!(task.events_pending_send.is_empty()); + task.events_pending_send.push_back(( chain_id, LowLevelEvent::GrandpaCommitMessage { peer_id, message }, )); From adae6ceba530add7e4132c6dda918d2f595de9ec Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 30 Jan 2024 15:16:38 +0100 Subject: [PATCH 7/7] Generate transport low-level events --- lib/src/network/basic_peering_strategy.rs | 37 +++++ lib/src/network/service.rs | 47 ++++++ light-base/src/network_service.rs | 166 +++++++++++++++++++++- 3 files changed, 244 insertions(+), 6 deletions(-) diff --git a/lib/src/network/basic_peering_strategy.rs b/lib/src/network/basic_peering_strategy.rs index 9b71593b7a..a3070c6192 100644 --- a/lib/src/network/basic_peering_strategy.rs +++ b/lib/src/network/basic_peering_strategy.rs @@ -434,6 +434,43 @@ where } } + /// Returns `true` if the peer belongs to the given chain. + /// + /// Also returns `true` if the peer is banned.s + pub fn peer_in_chain(&self, peer_id: &PeerId, chain_id: &TChainId) -> bool { + let Some(&peer_id_index) = self.peer_ids_indices.get(peer_id) else { + // If the `PeerId` is unknown, it means it doesn't belong to any chain. + return false; + }; + + let Some(&chain_index) = self.chains_indices.get(chain_id) else { + // If the `TChainId` is unknown, it means that it doesn't have any peer. + return false; + }; + + self.peers_chains + .contains_key(&(peer_id_index, chain_index)) + } + + /// Returns the list of chains that the given peer belongs to. + /// + /// Includes chains where the peer is banned. + pub fn peer_chains_unordered( + &'_ self, + peer_id: &PeerId, + ) -> impl Iterator + '_ { + let Some(&peer_id_index) = self.peer_ids_indices.get(peer_id) else { + // If the `PeerId` is unknown, it means it doesn't belong to any chain. + return either::Right(iter::empty()); + }; + + either::Left( + self.peers_chains + .range((peer_id_index, usize::MIN)..(peer_id_index + 1, usize::MIN)) + .map(|((_, chain_index), _)| &self.chains[*chain_index]), + ) + } + /// Returns the list of all addresses that have been inserted for the given peer. pub fn peer_addresses(&'_ self, peer_id: &PeerId) -> impl Iterator + '_ { let Some(&peer_id_index) = self.peer_ids_indices.get(peer_id) else { diff --git a/lib/src/network/service.rs b/lib/src/network/service.rs index 461cb76949..22d389b03e 100644 --- a/lib/src/network/service.rs +++ b/lib/src/network/service.rs @@ -1118,6 +1118,53 @@ where &self.inner[id].address } + /// Returns the list of fully-established connections (i.e. whose handshake has finished) + /// with the given peer. + pub fn established_connections( + &'_ self, + peer_id: &PeerId, + ) -> impl Iterator + '_ { + let Some(&peer_index) = self.peers_by_peer_id.get(peer_id) else { + return either::Right(iter::empty()); + }; + + either::Left( + self.connections_by_peer_id + .range( + (peer_index, ConnectionId::min_value()) + ..=(peer_index, ConnectionId::max_value()), + ) + .filter(|(_, connection_id)| { + // TODO: what about shutting_down? + self.inner.connection_state(*connection_id).established + }) + .map(|&(_, connection_id)| connection_id), + ) + } + + /// Returns the list of connections that are still in their handshaking phase but are expected + /// to reach the given peer. + pub fn potential_connections( + &'_ self, + peer_id: &PeerId, + ) -> impl Iterator + '_ { + let Some(&peer_index) = self.peers_by_peer_id.get(peer_id) else { + return either::Right(iter::empty()); + }; + + either::Left( + self.connections_by_peer_id + .range( + (peer_index, ConnectionId::min_value()) + ..=(peer_index, ConnectionId::max_value()), + ) + .filter(|(_, connection_id)| { + !self.inner.connection_state(*connection_id).established + }) + .map(|&(_, connection_id)| connection_id), + ) + } + /// Returns the number of connections with the given peer. /// /// Both connections that have and have not finished their handshaking phase are considered. diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index 38c399c673..ff06184f24 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -608,6 +608,7 @@ pub enum LowLevelEvent { TransportConnecting { connection_id: ConnectionId, expected_peer_id: PeerId, + target_multiaddr: Vec, }, TransportConnectingAbort { connection_id: ConnectionId, @@ -1757,6 +1758,8 @@ async fn background_task(mut task: BackgroundTask) { .insert_address(&peer_id, addr.into_bytes(), 10); // TODO: constant } + + // TODO: code here is very incomplete and should DRY with Kademlia find node responses } } WakeUpReason::MessageForChain( @@ -1849,10 +1852,12 @@ async fn background_task(mut task: BackgroundTask) { WakeUpReason::NetworkEvent(service::Event::HandshakeFinished { peer_id, expected_peer_id, - id, + id: connection_id, }) => { let remote_addr = - Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); // TODO: review this unwrap + Multiaddr::from_bytes(task.network.connection_remote_addr(connection_id)) + .unwrap(); // TODO: review this unwrap + if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id) { log!( @@ -1877,6 +1882,48 @@ async fn background_task(mut task: BackgroundTask) { remote_addr.into_bytes().to_vec(), 10, ); + + for &chain_id in task.peering_strategy.peer_chains_unordered(&peer_id) { + if !task + .peering_strategy + .peer_in_chain(&expected_peer_id, &chain_id) + { + task.events_pending_send.push_back(( + chain_id, + LowLevelEvent::TransportConnecting { + connection_id, + expected_peer_id: peer_id.clone(), + target_multiaddr: task + .network + .connection_remote_addr(connection_id) + .to_owned(), + }, + )); + } + + task.events_pending_send.push_back(( + chain_id, + LowLevelEvent::TransportConnected { + connection_id, + peer_id: peer_id.clone(), + }, + )); + } + + for &chain_id in task + .peering_strategy + .peer_chains_unordered(&expected_peer_id) + { + if !task.peering_strategy.peer_in_chain(&peer_id, &chain_id) { + task.events_pending_send.push_back(( + chain_id, + LowLevelEvent::TransportConnectingAbort { + connection_id, + expected_peer_id: expected_peer_id.clone(), + }, + )); + } + } } else { log!( &task.platform, @@ -1886,13 +1933,26 @@ async fn background_task(mut task: BackgroundTask) { remote_addr, peer_id ); + + for &chain_id in task.peering_strategy.peer_chains_unordered(&peer_id) { + task.events_pending_send.push_back(( + chain_id, + LowLevelEvent::TransportConnected { + connection_id, + peer_id: peer_id.clone(), + }, + )); + } } } WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected { + id: connection_id, expected_peer_id: Some(_), .. }) - | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => { + | WakeUpReason::NetworkEvent(service::Event::Disconnected { + id: connection_id, .. + }) => { let (address, peer_id, handshake_finished) = match wake_up_reason { WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected { address, @@ -1921,6 +1981,23 @@ async fn background_task(mut task: BackgroundTask) { ?handshake_finished ); + for &chain_id in task.peering_strategy.peer_chains_unordered(&peer_id) { + task.events_pending_send.push_back(( + chain_id, + if handshake_finished { + LowLevelEvent::TransportDisconnected { + connection_id, + peer_id: peer_id.clone(), + } + } else { + LowLevelEvent::TransportConnectingAbort { + connection_id, + expected_peer_id: peer_id.clone(), + } + }, + )); + } + // Ban the peer in order to avoid trying over and over again the same address(es). // Even if the handshake was finished, it is possible that the peer simply shuts // down connections immediately after it has been opened, hence the ban. @@ -2420,6 +2497,29 @@ async fn background_task(mut task: BackgroundTask) { chain = &task.network[chain_id].log_name, peer_id = peer_removed, ); + + for connection_id in + task.network.potential_connections(&peer_removed) + { + task.events_pending_send.push_back(( + chain_id, + LowLevelEvent::TransportConnectingAbort { + connection_id, + expected_peer_id: peer_removed.clone(), + }, + )); + } + for connection_id in + task.network.established_connections(&peer_removed) + { + task.events_pending_send.push_back(( + chain_id, + LowLevelEvent::TransportDisconnected { + connection_id, + peer_id: peer_removed.clone(), + }, + )); + } } log!( @@ -2432,6 +2532,40 @@ async fn background_task(mut task: BackgroundTask) { addrs = ?valid_addrs.iter().map(|a| a.to_string()).collect::>(), // TODO: better formatting? obtained_from = requestee_peer_id ); + + for connection_id in task.network.established_connections(&peer_id) { + task.events_pending_send.push_back(( + chain_id, + LowLevelEvent::TransportConnecting { + connection_id, + target_multiaddr: task + .network + .connection_remote_addr(connection_id) + .to_owned(), + expected_peer_id: peer_id.clone(), + }, + )); + task.events_pending_send.push_back(( + chain_id, + LowLevelEvent::TransportConnected { + connection_id, + peer_id: peer_id.clone(), + }, + )); + } + for connection_id in task.network.potential_connections(&peer_id) { + task.events_pending_send.push_back(( + chain_id, + LowLevelEvent::TransportConnecting { + connection_id, + target_multiaddr: task + .network + .connection_remote_addr(connection_id) + .to_owned(), + expected_peer_id: peer_id.clone(), + }, + )); + } } } @@ -2665,7 +2799,7 @@ async fn background_task(mut task: BackgroundTask) { task.num_recent_connection_opening.saturating_sub(1); } WakeUpReason::CanStartConnect(expected_peer_id) => { - let Some(multiaddr) = task + let Some(multiaddr_bytes) = task .peering_strategy .pick_address_and_add_connection(&expected_peer_id) else { @@ -2698,7 +2832,7 @@ async fn background_task(mut task: BackgroundTask) { continue; }; - let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) { + let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr_bytes.to_owned()) { Ok(a) => a, Err((multiaddr::FromBytesError, addr)) => { // Address is in an invalid format. @@ -2765,7 +2899,7 @@ async fn background_task(mut task: BackgroundTask) { async_channel::bounded(8); let task_name = format!("connection-{}", multiaddr); - match address { + let connection_id = match address { address_parse::AddressOrMultiStreamAddress::Address(address) => { // As documented in the `PlatformRef` trait, `connect_stream` must // return as soon as possible. @@ -2795,6 +2929,8 @@ async fn background_task(mut task: BackgroundTask) { task.tasks_messages_tx.clone(), ), ); + + connection_id } address_parse::AddressOrMultiStreamAddress::MultiStreamAddress( platform::MultiStreamAddress::WebRtc { @@ -2852,7 +2988,25 @@ async fn background_task(mut task: BackgroundTask) { task.tasks_messages_tx.clone(), ), ); + + connection_id } + }; + + // Notify susbcribers of the new connection. + let multiaddr = multiaddr_bytes.to_owned(); + for &chain_id in task + .peering_strategy + .peer_chains_unordered(&expected_peer_id) + { + task.events_pending_send.push_back(( + chain_id, + LowLevelEvent::TransportConnecting { + connection_id, + target_multiaddr: multiaddr.clone(), + expected_peer_id: expected_peer_id.clone(), + }, + )); } } WakeUpReason::CanOpenGossip(peer_id, chain_id) => {