diff --git a/lib/src/libp2p/connection/established/substream.rs b/lib/src/libp2p/connection/established/substream.rs index 47d1380cc9..bbb1ef7cb0 100644 --- a/lib/src/libp2p/connection/established/substream.rs +++ b/lib/src/libp2p/connection/established/substream.rs @@ -1469,6 +1469,7 @@ pub enum InboundTy { /// Maximum allowed size of the request. /// If `None`, then no data is expected on the substream, not even the length of the /// request. + // TODO: use a proper enum request_max_size: Option, }, Notifications { diff --git a/lib/src/network.rs b/lib/src/network.rs index 7c31fd0210..7f804da93f 100644 --- a/lib/src/network.rs +++ b/lib/src/network.rs @@ -23,3 +23,4 @@ pub mod kademlia; pub mod protocol; pub mod service; +pub mod service2; diff --git a/lib/src/network/protocol.rs b/lib/src/network/protocol.rs index f199884136..acb41ca7fd 100644 --- a/lib/src/network/protocol.rs +++ b/lib/src/network/protocol.rs @@ -24,6 +24,9 @@ // TODO: expand docs +use alloc::borrow::Cow; +use core::iter; + // Implementation note: each protocol goes into a different sub-module whose content is // re-exported here. 
@@ -44,3 +47,237 @@ pub use self::identify::*; pub use self::kademlia::*; pub use self::state_request::*; pub use self::storage_call_proof::*; + +pub enum ProtocolName<'a> { + Identify, + Ping, + BlockAnnounces { + genesis_hash: [u8; 32], + fork_id: Option<&'a str>, + }, + Transactions { + genesis_hash: [u8; 32], + fork_id: Option<&'a str>, + }, + Grandpa { + genesis_hash: [u8; 32], + fork_id: Option<&'a str>, + }, + Sync { + genesis_hash: [u8; 32], + fork_id: Option<&'a str>, + }, + Light { + genesis_hash: [u8; 32], + fork_id: Option<&'a str>, + }, + Kad { + genesis_hash: [u8; 32], + fork_id: Option<&'a str>, + }, + SyncWarp { + genesis_hash: [u8; 32], + fork_id: Option<&'a str>, + }, + State { + genesis_hash: [u8; 32], + fork_id: Option<&'a str>, + }, +} + +pub fn encode_protocol_name( + protocol: ProtocolName<'_>, +) -> impl Iterator + '_> + '_ { + let (genesis_hash, fork_id, base_protocol_name) = match protocol { + ProtocolName::Identify => return either::Left(iter::once(Cow::Borrowed("/ipfs/id/1.0.0"))), + ProtocolName::Ping => return either::Left(iter::once(Cow::Borrowed("/ipfs/ping/1.0.0"))), + ProtocolName::BlockAnnounces { + genesis_hash, + fork_id, + } => (genesis_hash, fork_id, "block-announces/1"), + ProtocolName::Transactions { + genesis_hash, + fork_id, + } => (genesis_hash, fork_id, "transactions/1"), + ProtocolName::Grandpa { + genesis_hash, + fork_id, + } => (genesis_hash, fork_id, "grandpa/1"), + ProtocolName::Sync { + genesis_hash, + fork_id, + } => (genesis_hash, fork_id, "sync/2"), + ProtocolName::Light { + genesis_hash, + fork_id, + } => (genesis_hash, fork_id, "light/2"), + ProtocolName::Kad { + genesis_hash, + fork_id, + } => (genesis_hash, fork_id, "kad"), + ProtocolName::SyncWarp { + genesis_hash, + fork_id, + } => (genesis_hash, fork_id, "sync/warp"), + ProtocolName::State { + genesis_hash, + fork_id, + } => (genesis_hash, fork_id, "state/2"), + }; + + let genesis_hash = hex::encode(&genesis_hash); + + if let Some(fork_id) = fork_id { + 
either::Right(either::Right( + [ + Cow::Borrowed("/"), + Cow::Owned(genesis_hash), + Cow::Borrowed("/"), + Cow::Borrowed(fork_id), + Cow::Borrowed("/"), + Cow::Borrowed(base_protocol_name), + ] + .into_iter(), + )) + } else { + either::Right(either::Left( + [ + Cow::Borrowed("/"), + Cow::Owned(genesis_hash), + Cow::Borrowed("/"), + Cow::Borrowed(base_protocol_name), + ] + .into_iter(), + )) + } +} + +pub fn encode_protocol_name_string(protocol: ProtocolName<'_>) -> String { + encode_protocol_name(protocol).fold(String::with_capacity(128), |mut a, b| { + a.push_str(b.as_ref()); + a + }) +} + +/// Decodes a protocol name into its components. +/// +/// Returns an error if the protocol name isn't recognized. +pub fn decode_protocol_name(name: &str) -> Result { + nom::combinator::all_consuming(nom::branch::alt(( + nom::combinator::map(nom::bytes::complete::tag("/ipfs/id/1.0.0"), |_| { + ProtocolName::Identify + }), + nom::combinator::map(nom::bytes::complete::tag("/ipfs/ping/1.0.0"), |_| { + ProtocolName::Ping + }), + nom::combinator::map( + nom::sequence::tuple(( + nom::bytes::complete::tag("/"), + genesis_hash, + nom::bytes::complete::tag("/"), + protocol_ty, + )), + |(_, genesis_hash, _, protocol_ty)| { + protocol_ty_to_real_protocol(protocol_ty, genesis_hash, None) + }, + ), + nom::combinator::map( + nom::sequence::tuple(( + nom::bytes::complete::tag("/"), + genesis_hash, + nom::bytes::complete::tag("/"), + nom::bytes::complete::take_until("/"), + nom::bytes::complete::tag("/"), + protocol_ty, + )), + |(_, genesis_hash, _, fork_id, _, protocol_ty)| { + protocol_ty_to_real_protocol(protocol_ty, genesis_hash, Some(fork_id)) + }, + ), + )))(name) + .map(|(_, parse_result)| parse_result) + .map_err(|_| ()) +} + +fn genesis_hash(name: &str) -> nom::IResult<&str, [u8; 32]> { + nom::combinator::map_opt(nom::bytes::complete::take(64u32), |hash| { + hex::decode(&hash) + .ok() + .map(|hash| <[u8; 32]>::try_from(hash).unwrap_or_else(|_| unreachable!())) + })(name) +} + +enum 
ProtocolTy { + BlockAnnounces, + Transactions, + Grandpa, + Sync, + Light, + Kad, + SyncWarp, + State, +} + +fn protocol_ty(name: &str) -> nom::IResult<&str, ProtocolTy> { + nom::branch::alt(( + nom::combinator::map(nom::bytes::complete::tag("block-announces/1"), |_| { + ProtocolTy::BlockAnnounces + }), + nom::combinator::map(nom::bytes::complete::tag("transactions/1"), |_| { + ProtocolTy::Transactions + }), + nom::combinator::map(nom::bytes::complete::tag("grandpa/1"), |_| { + ProtocolTy::Grandpa + }), + nom::combinator::map(nom::bytes::complete::tag("sync/2"), |_| ProtocolTy::Sync), + nom::combinator::map(nom::bytes::complete::tag("light/2"), |_| ProtocolTy::Light), + nom::combinator::map(nom::bytes::complete::tag("kad"), |_| ProtocolTy::Kad), + nom::combinator::map(nom::bytes::complete::tag("sync/warp"), |_| { + ProtocolTy::SyncWarp + }), + nom::combinator::map(nom::bytes::complete::tag("state/2"), |_| ProtocolTy::State), + ))(name) +} + +fn protocol_ty_to_real_protocol( + ty: ProtocolTy, + genesis_hash: [u8; 32], + fork_id: Option<&str>, +) -> ProtocolName { + match ty { + ProtocolTy::BlockAnnounces => ProtocolName::BlockAnnounces { + genesis_hash, + fork_id, + }, + ProtocolTy::Transactions => ProtocolName::Transactions { + genesis_hash, + fork_id, + }, + ProtocolTy::Grandpa => ProtocolName::Grandpa { + genesis_hash, + fork_id, + }, + ProtocolTy::Sync => ProtocolName::Sync { + genesis_hash, + fork_id, + }, + ProtocolTy::Light => ProtocolName::Light { + genesis_hash, + fork_id, + }, + ProtocolTy::Kad => ProtocolName::Kad { + genesis_hash, + fork_id, + }, + ProtocolTy::SyncWarp => ProtocolName::SyncWarp { + genesis_hash, + fork_id, + }, + ProtocolTy::State => ProtocolName::State { + genesis_hash, + fork_id, + }, + } +} + +// TODO: tests for the protocol names diff --git a/lib/src/network/service2.rs b/lib/src/network/service2.rs new file mode 100644 index 0000000000..2cac098a26 --- /dev/null +++ b/lib/src/network/service2.rs @@ -0,0 +1,2781 @@ +// Smoldot +// 
Copyright (C) 2023 Pierre Krieger +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::libp2p::collection; +use crate::network::{kademlia, protocol}; +use crate::util::{self, SipHasherBuild}; + +use alloc::{ + collections::{BTreeSet, VecDeque}, + format, + string::String, + vec::Vec, +}; +use core::{ + fmt, + hash::Hash, + iter, + num::NonZeroUsize, + ops::{Add, Sub}, + time::Duration, +}; +use rand_chacha::rand_core::{RngCore as _, SeedableRng as _}; + +pub use crate::libp2p::{ + collection::{ + ConnectionId, ConnectionToCoordinator, CoordinatorToConnection, MultiStreamConnectionTask, + ReadWrite, RequestError, SingleStreamConnectionTask, SubstreamId, + }, + connection::noise::{self, NoiseKey}, + multiaddr::{self, Multiaddr}, + peer_id::{self, PeerId}, +}; + +pub use crate::network::protocol::Role; + +mod addresses; +mod requests_responses; + +pub use requests_responses::KademliaFindNodeError; + +/// Configuration for a [`ChainNetwork`]. +pub struct Config { + /// Capacity to initially reserve to the list of connections. + pub connections_capacity: usize, + + /// Capacity to initially reserve to the list of peers. + // TODO: remove? + pub peers_capacity: usize, + + /// Capacity to reserve for the list of chains. + pub chains_capacity: usize, + + /// Seed for the randomness within the networking state machine. 
+ /// + /// While this seed influences the general behavior of the networking state machine, it + /// notably isn't used when generating the ephemeral key used for the Diffie-Hellman + /// handshake. + /// This is a defensive measure against users passing a dummy seed instead of actual entropy. + pub randomness_seed: [u8; 32], + + /// Key used for the encryption layer. + /// This is a Noise static key, according to the Noise specification. + /// Signed using the actual libp2p key. + pub noise_key: NoiseKey, + + /// Amount of time after which a connection handshake is considered to have taken too long + /// and must be aborted. + pub handshake_timeout: Duration, + + /// Maximum number of addresses kept in memory per network identity. + /// + /// > **Note**: As the number of network identities kept in memory is capped, having a + /// > maximum number of addresses per peer ensures that the total number of + /// > addresses is capped as well. + // TODO: remove? + pub max_addresses_per_peer: NonZeroUsize, +} + +/// Configuration for a specific overlay network. +/// +/// See [`Config::chains`]. +pub struct ChainConfig { + /// Hash of the genesis block (i.e. block number 0) according to the local node. + pub genesis_hash: [u8; 32], + + /// Optional identifier to insert into the networking protocol names. Used to differentiate + /// between chains with the same genesis hash. + /// + /// > **Note**: This value is typically found in the specification of the chain (the + /// > "chain spec"). + pub fork_id: Option, + + /// Number of bytes of the block number in the networking protocol. + pub block_number_bytes: usize, + + /// If `Some`, the chain uses the GrandPa networking protocol. + pub grandpa_protocol_config: Option, + + /// `true` if incoming block requests are allowed. + pub allow_inbound_block_requests: bool, + + /// Hash of the best block according to the local node. + pub best_hash: [u8; 32], + /// Height of the best block according to the local node. 
+ pub best_number: u64, + + /// Role of the local node. Sent to the remote nodes and used as a hint. Has no incidence + /// on the behavior of any function. + pub role: Role, +} + +/// Identifier of a chain added through [`ChainNetwork::add_chain`]. +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct ChainId(usize); + +/// Identifier for an operation. +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct OperationId(u64); + +/// Data structure containing the list of all connections and their latest known state. See also +/// [the module-level documentation](..). +pub struct ChainNetwork { + /// Underlying data structure. + inner: collection::Network, + + /// All the substreams of [`ChainNetwork::inner`], with info attached to them. + // TODO: add a substream user data to `collection::Network` instead + substreams: hashbrown::HashMap, + + /// Connections indexed by the value in [`ConnectionInfo::peer_id`]. + connections_by_peer_id: BTreeSet<(PeerId, collection::ConnectionId)>, + + /// All the outbound notification substreams, indexed by protocol, `PeerId`, and state. + // TODO: unclear whether PeerId should come before or after the state, same for direction/state + notification_substreams_by_peer_id: BTreeSet<( + NotificationsProtocol, + PeerId, + NotificationsSubstreamDirection, + NotificationsSubstreamState, + collection::SubstreamId, + )>, + + /// See [`Config::noise_key`]. + // TODO: make rotatable, see + noise_key: NoiseKey, + + /// See [`Config::max_addresses_per_peer`]. + max_addresses_per_peer: NonZeroUsize, + + /// Contains an entry for each peer present in at least one k-bucket of a chain. + kbuckets_peers: hashbrown::HashMap, + + /// Identifier to assign to the next Kademlia operation that is started. + next_kademlia_operation_id: OperationId, + + /// List of all chains that have been added. + chains: slab::Slab>, + + /// Chains indexed by genesis hash and fork ID. 
+ /// + /// Contains the same number of entries as [`ChainNetwork::chains`]. The values are `usize`s + /// that are indices into [`ChainNetwork::chains`]. + chains_by_protocol_info: + hashbrown::HashMap<([u8; 32], Option), usize, fnv::FnvBuildHasher>, + + /// List of peers that have been marked as desired. Can include peers not connected to the + /// local node yet. + gossip_desired_peers_by_chain: BTreeSet<(usize, PeerId, GossipKind)>, + + /// Subset of peers in [`ChainNetwork::gossip_out_peers_by_chain`] for which no healthy + /// connection exists. + unconnected_desired: hashbrown::HashSet, + + // TODO: doc + connected_unopened_gossip_desired: + hashbrown::HashSet<(PeerId, ChainId, GossipKind), util::SipHasherBuild>, + + /// Generator for randomness. + randomness: rand_chacha::ChaCha20Rng, +} + +struct Chain { + /// See [`ChainConfig::block_number_bytes`]. + block_number_bytes: usize, + + /// See [`ChainConfig::genesis_hash`]. + genesis_hash: [u8; 32], + /// See [`ChainConfig::fork_id`]. + fork_id: Option, + + /// See [`ChainConfig::role`]. + role: Role, + + /// See [`ChainConfig::best_hash`]. + best_hash: [u8; 32], + /// See [`ChainConfig::best_number`]. + best_number: u64, + + /// See [`ChainConfig::grandpa_protocol_config`]. + grandpa_protocol_config: Option, + + /// See [`ChainConfig::allow_inbound_block_requests`]. + allow_inbound_block_requests: bool, + + /// Kademlia k-buckets of this chain. + /// + /// Used in order to hold the list of peers that are known to be part of this chain. + /// + /// A peer is marked as "connected" in the k-buckets when a block announces substream is open + /// and that the remote's handshake is valid (i.e. can be parsed and containing a correct + /// genesis hash), and disconnected when it is closed or that the remote's handshake isn't + /// satisfactory. + kbuckets: kademlia::kbuckets::KBuckets, +} + +/// See [`ChainNetwork::inner`]. +struct ConnectionInfo { + address: multiaddr::Multiaddr, + + /// Identity of the remote. 
Can be either the expected or the actual identity. + /// + /// `None` if unknown, which can only be the case if the connection is still in its handshake + /// phase. + peer_id: Option, +} + +/// See [`ChainNetwork::substreams`]. +struct SubstreamInfo { + // TODO: substream <-> connection mapping should be provided by collection.rs instead + connection_id: collection::ConnectionId, + /// `true` if we have opened the substream. `false` if the remote has opened the substream. + // TODO: this information should be provided by collection.rs instead + outgoing: bool, + protocol: Protocol, +} + +enum Protocol { + Identify, + Ping, + BlockAnnounces { chain_index: usize }, + Transactions { chain_index: usize }, + Grandpa { chain_index: usize }, + Sync { chain_index: usize }, + LightUnknown { chain_index: usize }, + LightStorage { chain_index: usize }, + LightCall { chain_index: usize }, + Kad { chain_index: usize }, + SyncWarp { chain_index: usize }, + State { chain_index: usize }, +} + +impl Protocol { + // TODO useless? 
+ fn chain_index(&self) -> Option { + match *self { + Protocol::Identify => None, + Protocol::Ping => None, + Protocol::BlockAnnounces { chain_index } => Some(chain_index), + Protocol::Transactions { chain_index } => Some(chain_index), + Protocol::Grandpa { chain_index } => Some(chain_index), + Protocol::Sync { chain_index } => Some(chain_index), + Protocol::LightUnknown { chain_index } => Some(chain_index), + Protocol::LightStorage { chain_index } => Some(chain_index), + Protocol::LightCall { chain_index } => Some(chain_index), + Protocol::Kad { chain_index } => Some(chain_index), + Protocol::SyncWarp { chain_index } => Some(chain_index), + Protocol::State { chain_index } => Some(chain_index), + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +enum NotificationsProtocol { + BlockAnnounces { chain_index: usize }, + Transactions { chain_index: usize }, + Grandpa { chain_index: usize }, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +enum NotificationsSubstreamDirection { + In, + Out, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +enum NotificationsSubstreamState { + Pending, + Open, +} + +struct KBucketsPeer { + /// Number of k-buckets containing this peer. Used to know when to remove this entry. + num_references: NonZeroUsize, + + /// List of addresses known for this peer, and whether we currently have an outgoing connection + /// to each of them. In this context, "connected" means "outgoing connection whose handshake is + /// finished and is not shutting down". + /// + /// It is not possible to have multiple outgoing connections for a single address. + /// Incoming connections are not taken into account at all. + /// + /// An address is marked as pending when there is a "pending connection" (see + /// [`ChainNetwork::pending_ids`]) to it, or if there is an outgoing connection to it that is + /// still handshaking. 
+ /// + /// An address is marked as disconnected as soon as the shutting down is starting. + /// + /// Must never be empty. + addresses: addresses::Addresses, +} + +enum InRequestTy { + Identify { observed_addr: multiaddr::Multiaddr }, + Blocks, +} + +enum OutRequestTy { + Blocks { + checked: Option, + }, + GrandpaWarpSync, + State, + StorageProof, + CallProof, + KademliaFindNode, + KademliaDiscoveryFindNode(OperationId), +} + +impl ChainNetwork +where + TNow: Clone + Add + Sub + Ord, +{ + /// Initializes a new [`ChainNetwork`]. + pub fn new(config: Config) -> Self { + let mut randomness = rand_chacha::ChaCha20Rng::from_seed(config.randomness_seed); + + let local_peer_id = PeerId::from_public_key(&peer_id::PublicKey::Ed25519( + *config.noise_key.libp2p_public_ed25519_key(), + )); + + // Maximum number that each remote is allowed to open. + // Note that this maximum doesn't have to be precise. There only needs to be *a* limit + // that is not exaggerately large, and this limit shouldn't be too low as to cause + // legitimate substreams to be refused. + // According to the protocol, a remote can only open one substream of each protocol at + // a time. However, we multiply this value by 2 in order to be generous. We also add 1 + // to account for the ping protocol. + let max_inbound_substreams = chains.len() + * (1 + requests_responses::REQUEST_RESPONSE_PROTOCOLS_PER_CHAIN + + NOTIFICATIONS_PROTOCOLS_PER_CHAIN) + * 2; + + ChainNetwork { + inner: collection::Network::new(collection::Config { + capacity: config.connections_capacity, + max_inbound_substreams, + randomness_seed: { + let mut seed = [0; 32]; + randomness.fill_bytes(&mut seed); + seed + }, + ping_protocol: "/ipfs/ping/1.0.0".into(), + handshake_timeout: config.handshake_timeout, + }), + substreams: hashbrown::HashMap::with_capacity_and_hasher( + config.connections_capacity * 20, // TODO: capacity? 
+ fnv::FnvBuildHasher::default(), + ), + connections_by_peer_id: BTreeSet::new(), + notification_substreams_by_peer_id: BTreeSet::new(), + gossip_desired_peers_by_chain: BTreeSet::new(), + unconnected_desired: hashbrown::HashSet::with_capacity_and_hasher( + config.peers_capacity, + SipHasherBuild::new({ + let mut seed = [0; 16]; + randomness.fill_bytes(&mut seed); + seed + }), + ), + connected_unopened_gossip_desired: hashbrown::HashSet::with_capacity_and_hasher( + config.peers_capacity, + SipHasherBuild::new({ + let mut seed = [0; 16]; + randomness.fill_bytes(&mut seed); + seed + }), + ), + kbuckets_peers: hashbrown::HashMap::with_capacity_and_hasher( + config.peers_capacity, + SipHasherBuild::new({ + let mut seed = [0; 16]; + randomness.fill_bytes(&mut seed); + seed + }), + ), + next_kademlia_operation_id: OperationId(0), + chains: slab::Slab::with_capacity(config.chains_capacity), + chains_by_protocol_info: hashbrown::HashMap::with_capacity_and_hasher( + config.chains_capacity, + Default::default(), + ), + noise_key: config.noise_key, + max_addresses_per_peer: config.max_addresses_per_peer, + randomness, + } + } + + /// Returns the Noise key originally passed as [`Config::noise_key`]. + pub fn noise_key(&self) -> &NoiseKey { + &self.noise_key + } + + /// Adds a chain to the list of chains that is handled by the [`ChainNetwork`]. + /// + /// It is not possible to add a chain if its protocol names would conflict with an existing + /// chain. 
+ pub fn add_chain(&mut self, config: ChainConfig) -> Result { + let chain_entry = self.chains.vacant_entry(); + let chain_id = chain_entry.key(); + + if let hashbrown::hash_map::Entry::Vacant(entry) = self + .chains_by_protocol_info + .entry((config.genesis_hash, config.fork_id)) + { + entry.insert(chain_id); + } else { + return Err(AddChainError::Duplicate); + } + + let local_peer_id = PeerId::from_public_key(&peer_id::PublicKey::Ed25519( + *self.noise_key.libp2p_public_ed25519_key(), + )); + + chain_entry.insert(Chain { + block_number_bytes: config.block_number_bytes, + genesis_hash: config.genesis_hash, + fork_id: config.fork_id, + role: config.role, + best_hash: config.best_hash, + best_number: config.best_number, + allow_inbound_block_requests: config.allow_inbound_block_requests, + grandpa_protocol_config: config.grandpa_protocol_config, + kbuckets: kademlia::kbuckets::KBuckets::new( + local_peer_id, + Duration::from_secs(20), // TODO: hardcoded + ), + }); + + Ok(ChainId(chain_id)) + } + + /// Removes support for a certain chain from the networking. + /// + /// # Panic + /// + /// Panics if the given [`ChainId`] is invalid. + /// + // TODO: what to do? synchronously return a list of closed connections and close the substreams in the background? make the removal asynchronous? + pub fn remove_chain(&mut self, chain_id: ChainId) { + // TODO: hard lol + todo!() + } + + /// Modifies the best block of the local node for the given chain. See + /// [`ChainConfig::best_hash`] and [`ChainConfig::best_number`]. + /// + /// This information is sent when a block announces substream is opened. + /// + /// # Panic + /// + /// Panics if the [`Chainid`] is out of range. + /// + pub fn set_chain_local_best_block( + &mut self, + chain_id: ChainId, + best_hash: [u8; 32], + best_number: u64, + ) { + let chain = &mut self.chains[chain_id.0]; + chain.best_hash = best_hash; + chain.best_number = best_number; + } + + /// Returns the list of all the chains that have been added. 
+ pub fn chains(&'_ self) -> impl Iterator + '_ { + self.chains.iter().map(|(idx, _)| ChainId(idx)) + } + + /// Returns the value passed as [`ChainConfig::block_number_bytes`] for the given chain. + /// + /// # Panic + /// + /// Panics if the given [`ChainId`] is invalid. + /// + pub fn block_number_bytes(&self, chain_id: ChainId) -> usize { + self.chains[chain_id.0].block_number_bytes + } + + /// Marks the given chain-peer combination as "desired". + /// + /// Has no effect if it was already marked as desired. + /// + /// # Panic + /// + /// Panics if the given [`ChainId`] is invalid. + /// + pub fn gossip_insert_desired(&mut self, chain_id: ChainId, peer_id: PeerId, kind: GossipKind) { + // TODO: update unconnected_desired + assert!(self.chains.contains(chain_id.0)); + self.gossip_desired_peers_by_chain + .insert((chain_id.0, peer_id, kind)); + } + + /// Removes the given chain-peer combination from the list of desired chain-peers. + /// + /// Has no effect if it was not marked as desired. + /// + /// # Panic + /// + /// Panics if the given [`ChainId`] is invalid. + /// + pub fn gossip_remove_desired(&mut self, chain_id: ChainId, peer_id: PeerId, kind: GossipKind) { + // TODO: update unconnected_desired + assert!(self.chains.contains(chain_id.0)); + self.gossip_desired_peers_by_chain + .remove(&(chain_id.0, peer_id, kind)); + } + + /// Returns the list of [`PeerId`]s that are desired (for any chain) but for which no + /// connection exists. + /// + /// This includes the gossip-desired peers, but also connections that are necessary for + /// Kademlia requests. + /// + /// > **Note**: Connections that are currently in the process of shutting down are also + /// > ignored for the purpose of this function. 
+ pub fn unconnected_desired(&'_ self) -> impl Iterator + '_ { + self.unconnected_desired.iter() + } + + /// Returns the list of [`PeerId`]s that are marked as desired, and for which a healthy + /// connection exists, but for which no substream connection attempt exists. + pub fn connected_unopened_gossip_desired( + &'_ self, + ) -> impl Iterator + '_ { + self.connected_unopened_gossip_desired + .iter() + .map(move |(peer_id, chain_id, gossip_kind)| (peer_id, *chain_id, *gossip_kind)) + } + + /// Adds a single-stream connection to the state machine. + /// + /// This connection hasn't finished handshaking and the [`PeerId`] of he remote isn't known + /// yet. + /// + /// If `expected_peer_id` is `Some`, this connection is expected to reach the given [`PeerId`]. + /// The `expected_peer_id` is only used to influence the result of + /// [`ChainNetwork::unconnected_desired`]. + /// + /// Must be passed the moment (as a `TNow`) when the connection has first been opened, in + /// order to determine when the handshake timeout expires. + /// + /// The `remote_addr` is the address used to reach back the remote. In the case of TCP, it + /// contains the TCP dialing port of the remote. The remote can ask, through the `identify` + /// libp2p protocol, its own address, in which case we send it. + pub fn add_single_stream_connection( + &mut self, + when_connection_start: TNow, + handshake_kind: SingleStreamHandshakeKind, + remote_addr: multiaddr::Multiaddr, + expected_peer_id: Option, + ) -> (ConnectionId, SingleStreamConnectionTask) { + // TODO: do the max protocol name length better ; knowing that it can later change if a chain with a long forkId is added + let max_protocol_name_len = 256; + let substreams_capacity = 16; // TODO: ? 
+ let (id, task) = self.inner.insert_single_stream( + when_connection_start, + match handshake_kind { + SingleStreamHandshakeKind::MultistreamSelectNoiseYamux { is_initiator } => { + collection::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux { + is_initiator, + noise_key: &self.noise_key, + } + } + }, + substreams_capacity, + max_protocol_name_len, + ConnectionInfo { + address: remote_addr, + peer_id: expected_peer_id.clone(), + }, + ); + if let Some(expected_peer_id) = expected_peer_id { + self.connections_by_peer_id.insert((expected_peer_id, id)); + self.unconnected_desired.remove(&expected_peer_id); + } + (id, task) + } + + /// Adds a multi-stream connection to the state machine. + /// + /// This connection hasn't finished handshaking and the [`PeerId`] of the remote isn't known + /// yet. + /// + /// If `expected_peer_id` is `Some`, this connection is expected to reach the given [`PeerId`]. + /// The `expected_peer_id` is only used to influence the result of + /// [`ChainNetwork::unconnected_desired`]. + /// + /// Must be passed the moment (as a `TNow`) when the connection has first been opened, in + /// order to determine when the handshake timeout expires. + /// + /// The `remote_addr` is the address used to reach back the remote. In the case of TCP, it + /// contains the TCP dialing port of the remote. The remote can ask, through the `identify` + /// libp2p protocol, its own address, in which case we send it. + pub fn add_multi_stream_connection( + &mut self, + when_connection_start: TNow, + handshake_kind: MultiStreamHandshakeKind, + remote_addr: multiaddr::Multiaddr, + expected_peer_id: Option, + ) -> (ConnectionId, MultiStreamConnectionTask) + where + TSubId: Clone + PartialEq + Eq + Hash, + { + // TODO: do the max protocol name length better ; knowing that it can later change if a chain with a long forkId is added + let max_protocol_name_len = 256; + let substreams_capacity = 16; // TODO: ? 
+ let (id, task) = self.inner.insert_multi_stream( + when_connection_start, + match handshake_kind { + MultiStreamHandshakeKind::WebRtc { + is_initiator, + local_tls_certificate_multihash, + remote_tls_certificate_multihash, + } => collection::MultiStreamHandshakeKind::WebRtc { + is_initiator, + noise_key: &self.noise_key, + local_tls_certificate_multihash, + remote_tls_certificate_multihash, + }, + }, + substreams_capacity, + max_protocol_name_len, + ConnectionInfo { + address: remote_addr, + peer_id: expected_peer_id.clone(), + }, + ); + if let Some(expected_peer_id) = expected_peer_id { + self.connections_by_peer_id.insert((expected_peer_id, id)); + self.unconnected_desired.remove(&expected_peer_id); + } + (id, task) + } + + /// Pulls a message that must be sent to a connection. + /// + /// The message must be passed to [`SingleStreamConnectionTask::inject_coordinator_message`] + /// or [`MultiStreamConnectionTask::inject_coordinator_message`] in the appropriate connection. + /// + /// This function guarantees that the [`ConnectionId`] always refers to a connection that + /// is still alive, in the sense that [`SingleStreamConnectionTask::inject_coordinator_message`] + /// or [`MultiStreamConnectionTask::inject_coordinator_message`] has never returned `None`. + pub fn pull_message_to_connection( + &mut self, + ) -> Option<(ConnectionId, CoordinatorToConnection)> { + self.inner.pull_message_to_connection() + } + + /// Injects into the state machine a message generated by + /// [`SingleStreamConnectionTask::pull_message_to_coordinator`] or + /// [`MultiStreamConnectionTask::pull_message_to_coordinator`]. + pub fn inject_connection_message( + &mut self, + connection_id: ConnectionId, + message: ConnectionToCoordinator, + ) { + self.inner.inject_connection_message(connection_id, message) + } + + /// Returns a list of nodes (their [`PeerId`] and multiaddresses) that we know are part of + /// the network. 
+ /// + /// Nodes that are discovered might disappear over time. In other words, there is no guarantee + /// that a node that has been added through [`ChainNetwork::discover`] will later be returned + /// by [`ChainNetwork::discovered_nodes`]. + pub fn discovered_nodes( + &'_ self, + chain_id: ChainId, + ) -> impl Iterator)> + '_ + { + let kbuckets = &self.chains[chain_id.0].kbuckets; + kbuckets.iter_ordered().map(move |(peer_id, _)| { + ( + peer_id, + self.kbuckets_peers.get(peer_id).unwrap().addresses.iter(), + ) + }) + } + + /// Returns the next event produced by the service. + pub fn next_event(&mut self) -> Option { + loop { + let inner_event = self.inner.next_event()?; + match inner_event { + collection::Event::HandshakeFinished { + id, + peer_id: actual_peer_id, + } => { + // Store the actual `PeerId` into the connection, making sure to update `self`. + let connection_info = &mut self.inner[id]; + let expected_peer_id = connection_info.peer_id.clone(); + match &mut connection_info.peer_id { + Some(expected_peer_id) if *expected_peer_id == actual_peer_id => {} + Some(expected_peer_id) => { + // TODO: cloning of `expected_peer_id` overhead + let _was_removed = self + .connections_by_peer_id + .remove(&(expected_peer_id.clone(), id)); + debug_assert!(_was_removed); + let _was_inserted = self + .connections_by_peer_id + .insert((actual_peer_id.clone(), id)); + debug_assert!(_was_inserted); + self.unconnected_desired.remove(&actual_peer_id); + *expected_peer_id = actual_peer_id.clone(); + } + peer_id @ None => { + self.unconnected_desired.remove(&actual_peer_id); + *peer_id = Some(actual_peer_id.clone()); + } + } + + debug_assert!(!self.unconnected_desired.contains(&actual_peer_id)); + + // TODO: limit the number of connections per peer? + + return Some(Event::HandshakeFinished { + id, + expected_peer_id, + peer_id: actual_peer_id, + }); + } + + collection::Event::StartShutdown { id, .. 
} => { + // TODO: IMPORTANT this event should be turned into `NewOutboundSubstreamsForbidden` and the `reason` removed; see + todo!() + } + collection::Event::Shutdown { + id, + was_established, + user_data: connection_info, + } => { + // A connection has been closed. + // Note that the underlying state machine guarantees that all the substreams + // have been closed beforehand through other events. + + debug_assert!(connection_info.peer_id.is_some() || !was_established); + + if let Some(peer_id) = connection_info.peer_id { + let _was_removed = self.connections_by_peer_id.remove(&(peer_id, id)); + debug_assert!(_was_removed); + } + + // TODO: update unconnected_desired + + // TODO: IMPORTANT this event should indicate a clean shutdown, a pre-handshake interruption, a protocol error, a reset, etc. and should get a `reason`; see + todo!() + } + collection::Event::InboundError { id, error } => { + // TODO: report event to API user for diagnostics + continue; + } + + collection::Event::InboundNegotiated { + id, + substream_id, + protocol_name, + } => { + // An inbound substream has negotiated a protocol. We must decide whether to + // accept this protocol or instead reject the substream. + // If accepted, we must also save the protocol somewhere in `self` in order to + // load it later once things happen on this substream. + match self.recognize_protocol(&protocol_name) { + Ok(protocol) => { + let inbound_type = match protocol { + Protocol::Identify => collection::InboundTy::Request { + request_max_size: None, + }, + Protocol::Ping => collection::InboundTy::Ping, + Protocol::BlockAnnounces { .. } => { + collection::InboundTy::Notifications { + max_handshake_size: 1024 * 1024, // TODO: arbitrary + } + } + Protocol::Transactions { .. 
} => { + collection::InboundTy::Notifications { + max_handshake_size: 4, + } + } + Protocol::Grandpa { chain_index } + if self.chains[chain_index] + .grandpa_protocol_config + .is_some() => + { + collection::InboundTy::Notifications { + max_handshake_size: 4, + } + } + Protocol::Grandpa { .. } => { + self.inner.reject_inbound(substream_id); + continue; + } + Protocol::Sync { chain_index } + if self.chains[chain_index].allow_inbound_block_requests => + { + collection::InboundTy::Request { + request_max_size: Some(1024), + } + } + Protocol::Sync { .. } => { + self.inner.reject_inbound(substream_id); + continue; + } + + // TODO: protocols that are not supported + Protocol::LightUnknown { .. } + | Protocol::Kad { .. } + | Protocol::SyncWarp { .. } + | Protocol::State { .. } => { + self.inner.reject_inbound(substream_id); + continue; + } + + Protocol::LightStorage { .. } | Protocol::LightCall { .. } => { + unreachable!() + } + }; + + let _prev_value = self.substreams.insert( + substream_id, + SubstreamInfo { + connection_id: id, + outgoing: false, + protocol, + }, + ); + debug_assert!(_prev_value.is_none()); + } + Err(()) => { + self.inner.reject_inbound(substream_id); + continue; + } + } + continue; + } + + collection::Event::InboundNegotiatedCancel { substream_id } => { + // Because we immediately accept or reject substreams, this event can never + // happen. + unreachable!() + } + + collection::Event::InboundAcceptedCancel { substream_id } => { + // An inbound substream has been aborted after having been accepted. + // Since we don't report any event to the API user when a substream is + // accepted, we have nothing to do but clean up our state. + let _was_in = self.substreams.remove(&substream_id); + debug_assert!(_was_in.is_some()); + continue; + } + + collection::Event::Response { + substream_id, + response, + } => { + // Received a response on a connection. 
+ let substream_info = self + .substreams + .remove(&substream_id) + .unwrap_or_else(|| unreachable!()); + let connection_info = &self.inner[substream_info.connection_id]; + // Requests can only happen on connections after their handshake phase is + // finished, therefore their `PeerId` is known. + let peer_id = connection_info + .peer_id + .as_ref() + .unwrap_or_else(|| unreachable!()); + + // Decode/verify the response. + let response = match substream_info.protocol { + Protocol::Identify => todo!(), + Protocol::Sync { chain_index } => RequestResult::Blocks( + response + .map_err(BlocksRequestError::Request) + .and_then(|response| { + protocol::decode_block_response(&response) + .map_err(BlocksRequestError::Decode) + }), + ), + Protocol::LightUnknown { .. } => unreachable!(), + Protocol::LightStorage { chain_index } => RequestResult::StorageProof( + response + .map_err(StorageProofRequestError::Request) + .and_then(|payload| { + match protocol::decode_storage_or_call_proof_response( + protocol::StorageOrCallProof::StorageProof, + &payload, + ) { + Err(err) => Err(StorageProofRequestError::Decode(err)), + Ok(None) => { + Err(StorageProofRequestError::RemoteCouldntAnswer) + } + Ok(Some(_)) => Ok(EncodedMerkleProof( + payload, + protocol::StorageOrCallProof::StorageProof, + )), + } + }), + ), + Protocol::LightCall { chain_index } => { + RequestResult::CallProof( + response.map_err(CallProofRequestError::Request).and_then( + |payload| match protocol::decode_storage_or_call_proof_response( + protocol::StorageOrCallProof::CallProof, + &payload, + ) { + Err(err) => Err(CallProofRequestError::Decode(err)), + Ok(None) => Err(CallProofRequestError::RemoteCouldntAnswer), + Ok(Some(_)) => Ok(EncodedMerkleProof( + payload, + protocol::StorageOrCallProof::CallProof, + )), + }, + ), + ) + } + Protocol::Kad { chain_index } => todo!(), + Protocol::SyncWarp { chain_index } => RequestResult::GrandpaWarpSync( + response + .map_err(GrandpaWarpSyncRequestError::Request) + 
.and_then(|message| { + if let Err(err) = protocol::decode_grandpa_warp_sync_response( + &message, + self.chains[chain_index].block_number_bytes, + ) { + Err(GrandpaWarpSyncRequestError::Decode(err)) + } else { + Ok(EncodedGrandpaWarpSyncResponse { + message, + block_number_bytes: self.chains[chain_index] + .block_number_bytes, + }) + } + }), + ), + Protocol::State { chain_index } => RequestResult::State( + response + .map_err(StateRequestError::Request) + .and_then(|payload| { + if let Err(err) = protocol::decode_state_response(&payload) { + Err(StateRequestError::Decode(err)) + } else { + Ok(EncodedStateResponse(payload)) + } + }), + ), + + // The protocols below aren't request-response protocols. + Protocol::Ping + | Protocol::BlockAnnounces { .. } + | Protocol::Transactions { .. } + | Protocol::Grandpa { .. } => unreachable!(), + }; + + return Some(Event::RequestResult { + substream_id, + response, + }); + } + + collection::Event::RequestIn { + substream_id, + request_payload, + } => { + // Received a request on a connection. + let substream_info = self + .substreams + .get(&substream_id) + .unwrap_or_else(|| unreachable!()); + let connection_info = &self.inner[substream_info.connection_id]; + // Requests can only happen on connections after their handshake phase is + // finished, therefore their `PeerId` is known. + let peer_id = connection_info + .peer_id + .as_ref() + .unwrap_or_else(|| unreachable!()); + + match substream_info.protocol { + Protocol::Identify => { + if request_payload.is_empty() { + return Some(Event::IdentifyRequestIn { + peer_id: peer_id.clone(), + substream_id, + }); + } else { + // TODO: can this actually be reached? isn't the inner code going to refuse a bad request anyway due to no length prefix? 
+ let _ = self.substreams.remove(&substream_id); + self.inner.respond_in_request(substream_id, Err(())); + return Some(Event::ProtocolError { + peer_id: peer_id.clone(), + error: ProtocolError::BadIdentifyRequest, + }); + } + } + Protocol::Sync { chain_index } => { + match protocol::decode_block_request( + self.chains[chain_index].block_number_bytes, + &request_payload, + ) { + Ok(config) => { + return Some(Event::BlocksRequestIn { + peer_id: peer_id.clone(), + chain_id: ChainId(chain_index), + config, + substream_id, + }) + } + Err(error) => { + let _ = self.substreams.remove(&substream_id); + self.inner.respond_in_request(substream_id, Err(())); + return Some(Event::ProtocolError { + peer_id: peer_id.clone(), + error: ProtocolError::BadBlocksRequest(error), + }); + } + } + } + _ => unreachable!(), + } + } + + collection::Event::RequestInCancel { substream_id } => { + let _was_in = self.substreams.remove(&substream_id); + debug_assert!(_was_in.is_some()); + return Some(Event::RequestInCancel { substream_id }); + } + + collection::Event::NotificationsOutResult { + substream_id, + result, + } => { + // Outgoing notifications substream has finished opening. + let substream_info = self + .substreams + .get(&substream_id) + .unwrap_or_else(|| unreachable!()); + let connection_info = &self.inner[substream_info.connection_id]; + // Notification substreams can only happen on connections after their + // handshake phase is finished, therefore their `PeerId` is known. + let peer_id = connection_info + .peer_id + .as_ref() + .unwrap_or_else(|| unreachable!()); + + // Check validity of the handshake. 
+ let result = match substream_info.protocol { + Protocol::BlockAnnounces { chain_index } => result.and_then(|handshake| { + protocol::decode_block_announces_handshake( + self.chains[chain_index].block_number_bytes, + &handshake, + ) + }), + Protocol::Transactions { chain_index } => todo!(), + Protocol::Grandpa { chain_index } => todo!(), + + // The other protocols aren't notification protocols. + Protocol::Identify + | Protocol::Ping + | Protocol::Sync { .. } + | Protocol::LightUnknown { .. } + | Protocol::LightStorage { .. } + | Protocol::LightCall { .. } + | Protocol::Kad { .. } + | Protocol::SyncWarp { .. } + | Protocol::State { .. } => unreachable!(), + }; + } + + collection::Event::NotificationsOutCloseDemanded { substream_id } + | collection::Event::NotificationsOutReset { substream_id } => { + // Outgoing notifications substream has been closed or must be closed. + + // If the request demands the closing, we immediately comply. + if matches!( + inner_event, + collection::Event::NotificationsOutCloseDemanded { .. } + ) { + self.inner.close_out_notifications(substream_id); + } + + let substream_info = self + .substreams + .remove(&substream_id) + .unwrap_or_else(|| unreachable!()); + let connection_info = &self.inner[substream_info.connection_id]; + // Notification substreams can only happen on connections after their + // handshake phase is finished, therefore their `PeerId` is known. + let peer_id = connection_info + .peer_id + .as_ref() + .unwrap_or_else(|| unreachable!()); + + // Clean up the local state. + let _was_in = self.notification_substreams_by_peer_id.remove(&( + match substream_info.protocol { + Protocol::BlockAnnounces { chain_index } => { + NotificationsProtocol::BlockAnnounces { chain_index } + } + Protocol::Transactions { chain_index } => { + NotificationsProtocol::Transactions { chain_index } + } + Protocol::Grandpa { chain_index } => { + NotificationsProtocol::Grandpa { chain_index } + } + // Other protocols aren't notification protocols. 
+ Protocol::Identify + | Protocol::Ping + | Protocol::Sync { .. } + | Protocol::LightUnknown { .. } + | Protocol::LightStorage { .. } + | Protocol::LightCall { .. } + | Protocol::Kad { .. } + | Protocol::SyncWarp { .. } + | Protocol::State { .. } => unreachable!(), + }, + peer_id.clone(), // TODO: cloning overhead :-/ + NotificationsSubstreamDirection::Out, + NotificationsSubstreamState::Open, + substream_id, + )); + debug_assert!(_was_in); + + // Some substreams are tied to the state of the block announces substream. + match substream_info.protocol { + Protocol::BlockAnnounces { chain_index } => { + todo!() + } + Protocol::Transactions { chain_index } + | Protocol::Grandpa { chain_index } => { + // These protocols are tied to the block announces substream. If + // there is a block announce substream with the peer, we try to reopen + // these two substreams. + todo!() + } + _ => {} + } + + // Generate an event if relevant. + if let Protocol::BlockAnnounces { chain_index } = substream_info.protocol { + return Some(Event::GossipDisconnected { + peer_id: peer_id.clone(), + chain_id: ChainId(chain_index), + kind: GossipKind::ConsensusTransactions, + }); + } + } + + collection::Event::NotificationsInOpen { substream_id, .. } => { + // Remote would like to open a notifications substream with us. + + let substream_info = self + .substreams + .get(&substream_id) + .unwrap_or_else(|| unreachable!()); + let connection_info = &self.inner[substream_info.connection_id]; + // Notification substreams can only happen on connections after their + // handshake phase is finished, therefore their `PeerId` is known. + let peer_id = connection_info + .peer_id + .as_ref() + .unwrap_or_else(|| unreachable!()); + + // We generate an event for the API user only if no other substream opening + // attempt has been made yet. + let generate_event = todo!(); + + // Update local state. 
+ let _was_inserted = self.notification_substreams_by_peer_id.insert(( + match substream_info.protocol { + Protocol::BlockAnnounces { chain_index } => { + NotificationsProtocol::BlockAnnounces { chain_index } + } + Protocol::Transactions { chain_index } => { + NotificationsProtocol::Transactions { chain_index } + } + Protocol::Grandpa { chain_index } => { + NotificationsProtocol::Grandpa { chain_index } + } + // Other protocols aren't notification protocols. + Protocol::Identify + | Protocol::Ping + | Protocol::Sync { .. } + | Protocol::LightUnknown { .. } + | Protocol::LightStorage { .. } + | Protocol::LightCall { .. } + | Protocol::Kad { .. } + | Protocol::SyncWarp { .. } + | Protocol::State { .. } => unreachable!(), + }, + peer_id.clone(), // TODO: cloning overhead :-/ + NotificationsSubstreamDirection::In, + NotificationsSubstreamState::Pending, + substream_id, + )); + debug_assert!(_was_inserted); + + // TODO: must accept the substream if we already have an out substream with peer + + if generate_event { + return Some(Event::GossipInDesired { + peer_id: peer_id.clone(), + chain_id: todo!(), + kind: GossipKind::ConsensusTransactions, + }); + } + } + + collection::Event::NotificationsInOpenCancel { .. } => { + todo!() + } + + collection::Event::NotificationsIn { + substream_id, + notification, + } => { + // Received a notification from a remote. + let substream_info = self + .substreams + .get(&substream_id) + .unwrap_or_else(|| unreachable!()); + let chain_index = match substream_info.protocol { + Protocol::BlockAnnounces { chain_index } => chain_index, + Protocol::Transactions { chain_index } => chain_index, + Protocol::Grandpa { chain_index } => chain_index, + // Other protocols are not notification protocols. + Protocol::Identify + | Protocol::Ping + | Protocol::Sync { .. } + | Protocol::LightUnknown { .. } + | Protocol::LightStorage { .. } + | Protocol::LightCall { .. } + | Protocol::Kad { .. } + | Protocol::SyncWarp { .. } + | Protocol::State { .. 
} => unreachable!(), + }; + let connection_info = &self.inner[substream_info.connection_id]; + // Notification substreams can only happen on connections after their + // handshake phase is finished, therefore their `PeerId` is known. + let peer_id = connection_info + .peer_id + .as_ref() + .unwrap_or_else(|| unreachable!()); + + // Check whether there is an open outgoing block announces substream, as this + // means that we are "gossip-connected". If not, then the notification is + // silently discarded. + // TODO: cloning of the peer_id + if self + .notification_substreams_by_peer_id + .range( + ( + NotificationsProtocol::BlockAnnounces { chain_index }, + peer_id.clone(), + NotificationsSubstreamDirection::Out, + NotificationsSubstreamState::Open, + collection::SubstreamId::min_value(), + ) + ..=( + NotificationsProtocol::BlockAnnounces { chain_index }, + peer_id.clone(), + NotificationsSubstreamDirection::Out, + NotificationsSubstreamState::Open, + collection::SubstreamId::max_value(), + ), + ) + .next() + .is_none() + { + continue; + } + + // Decode the notification and return an event. + match substream_info.protocol { + Protocol::BlockAnnounces { .. } => { + if let Err(err) = protocol::decode_block_announce( + ¬ification, + self.chains[chain_index].block_number_bytes, + ) { + return Some(Event::ProtocolError { + error: ProtocolError::BadBlockAnnounce(err), + peer_id: peer_id.clone(), + }); + } + + return Some(Event::BlockAnnounce { + chain_id: ChainId(chain_index), + peer_id: peer_id.clone(), + announce: EncodedBlockAnnounce { + message: notification, + block_number_bytes: self.chains[chain_index].block_number_bytes, + }, + }); + } + Protocol::Transactions { .. } => { + // TODO: not implemented + } + Protocol::Grandpa { .. 
} => { + let decoded_notif = match protocol::decode_grandpa_notification( + ¬ification, + self.chains[chain_index].block_number_bytes, + ) { + Ok(n) => n, + Err(err) => { + return Some(Event::ProtocolError { + error: ProtocolError::BadGrandpaNotification(err), + peer_id: peer_id.clone(), + }) + } + }; + + match decoded_notif { + protocol::GrandpaNotificationRef::Commit(_) => { + return Some(Event::GrandpaCommitMessage { + chain_id: ChainId(chain_index), + peer_id: peer_id.clone(), + message: EncodedGrandpaCommitMessage { + message: notification, + block_number_bytes: self.chains[chain_index] + .block_number_bytes, + }, + }) + } + protocol::GrandpaNotificationRef::Neighbor(n) => { + return Some(Event::GrandpaNeighborPacket { + chain_id: ChainId(chain_index), + peer_id: peer_id.clone(), + state: GrandpaState { + round_number: n.round_number, + set_id: n.set_id, + commit_finalized_height: n.commit_finalized_height, + }, + }) + } + _ => { + // Any other type of message is currently ignored. Support + // for them could be added in the future. + } + } + } + + // Other protocols are not notification protocols. + Protocol::Identify + | Protocol::Ping + | Protocol::Sync { .. } + | Protocol::LightUnknown { .. } + | Protocol::LightStorage { .. } + | Protocol::LightCall { .. } + | Protocol::Kad { .. } + | Protocol::SyncWarp { .. } + | Protocol::State { .. } => unreachable!(), + } + } + + collection::Event::NotificationsInClose { substream_id, .. } => { + // An incoming notifications substream has been closed. + // Nothing to do except clean up the local state. + let _was_in = self.substreams.remove(&substream_id); + debug_assert!(_was_in.is_some()); + } + + collection::Event::PingOutSuccess { .. } => { + // We ignore ping events. + // TODO: report to end user or something + } + collection::Event::PingOutFailed { id } => { + // TODO: handle the same way as a shutdown event + todo!() + } + } + } + } + + /// Sends a blocks request to the given peer. 
+ ///
+ /// The code in this module does not verify the response in any way. The blocks might be
+ /// completely different from the ones requested, or might be missing some information. In
+ /// other words, the response is completely untrusted.
+ ///
+ /// This function might generate a message destined a connection. Use
+ /// [`ChainNetwork::pull_message_to_connection`] to process messages after it has returned.
+ ///
+ /// # Panic
+ ///
+ /// Panics if the [`ChainId`] is invalid.
+ ///
+ // TODO: more docs
+ pub fn start_blocks_request(
+ &mut self,
+ now: TNow,
+ target: &PeerId,
+ chain_id: ChainId,
+ config: protocol::BlocksRequestConfig,
+ timeout: Duration,
+ ) -> Result {
+ let request_data =
+ protocol::build_block_request(self.chains[chain_id.0].block_number_bytes, &config)
+ .fold(Vec::new(), |mut a, b| {
+ a.extend_from_slice(b.as_ref());
+ a
+ });
+
+ self.start_request(
+ now,
+ target,
+ request_data,
+ Protocol::Sync {
+ chain_index: chain_id.0,
+ },
+ timeout,
+ )
+ }
+
+ ///
+ /// # Panic
+ ///
+ /// Panics if the [`ChainId`] is invalid.
+ ///
+ // TODO: docs
+ pub fn start_grandpa_warp_sync_request(
+ &mut self,
+ now: TNow,
+ target: &PeerId,
+ chain_id: ChainId,
+ begin_hash: [u8; 32],
+ timeout: Duration,
+ ) -> Result {
+ let request_data = begin_hash.to_vec();
+
+ self.start_request(
+ now,
+ target,
+ request_data,
+ Protocol::SyncWarp {
+ chain_index: chain_id.0,
+ },
+ timeout,
+ )
+ }
+
+ /// Sends a state request to a peer.
+ ///
+ /// A state request makes it possible to download the storage of the chain at a given block.
+ /// The response is not verified by this function. In other words, the peer is free to send
+ /// back erroneous data. It is the responsibility of the API user to verify the storage by
+ /// calculating the state trie root hash and comparing it with the value stored in the
+ /// block's header.
+ ///
+ /// Because responses have a size limit, it is unlikely that a single request will return the
+ /// entire storage of the chain at once. Instead, call this function multiple times, each call
+ /// passing a `start_key` that follows the last key of the previous response.
+ ///
+ /// This function might generate a message destined a connection. Use
+ /// [`ChainNetwork::pull_message_to_connection`] to process messages after it has returned.
+ ///
+ /// # Panic
+ ///
+ /// Panics if the [`ChainId`] is invalid.
+ ///
+ pub fn start_state_request(
+ &mut self,
+ now: TNow,
+ target: &PeerId,
+ chain_id: ChainId,
+ block_hash: &[u8; 32],
+ start_key: protocol::StateRequestStart,
+ timeout: Duration,
+ ) -> Result {
+ let request_data = protocol::build_state_request(protocol::StateRequest {
+ block_hash,
+ start_key,
+ })
+ .fold(Vec::new(), |mut a, b| {
+ a.extend_from_slice(b.as_ref());
+ a
+ });
+
+ self.start_request(
+ now,
+ target,
+ request_data,
+ Protocol::State {
+ chain_index: chain_id.0,
+ },
+ timeout,
+ )
+ }
+
+ /// Sends a storage request to the given peer.
+ ///
+ /// This function might generate a message destined a connection. Use
+ /// [`ChainNetwork::pull_message_to_connection`] to process messages after it has returned.
+ ///
+ /// # Panic
+ ///
+ /// Panics if the [`ChainId`] is invalid.
+ ///
+ // TODO: more docs
+ pub fn start_storage_proof_request(
+ &mut self,
+ now: TNow,
+ target: &PeerId,
+ chain_id: ChainId,
+ config: protocol::StorageProofRequestConfig + Clone>>,
+ timeout: Duration,
+ ) -> Result {
+ let request_data =
+ protocol::build_storage_proof_request(config).fold(Vec::new(), |mut a, b| {
+ a.extend_from_slice(b.as_ref());
+ a
+ });
+
+ // The request data can possibly be higher than the protocol limit, especially due to the
+ // call data.
+ // TODO: check limit
+
+ Ok(self.start_request(
+ now,
+ target,
+ request_data,
+ Protocol::LightStorage {
+ chain_index: chain_id.0,
+ },
+ timeout,
+ )?)
+ }
+
+ /// Sends a call proof request to the given peer.
+ ///
+ /// This request is similar to [`ChainNetwork::start_storage_proof_request`]. Instead of
+ /// requesting specific keys, we request the list of all the keys that are accessed for a
+ /// specific runtime call.
+ ///
+ /// There exists no guarantee that the proof is complete (i.e. that it contains all the
+ /// necessary entries), as it is impossible to know this from just the proof itself. As such,
+ /// this method is just an optimization. When performing the actual call, regular storage proof
+ /// requests should be performed if the key is not present in the call proof response.
+ ///
+ /// This function might generate a message destined a connection. Use
+ /// [`ChainNetwork::pull_message_to_connection`] to process messages after it has returned.
+ ///
+ /// # Panic
+ ///
+ /// Panics if the [`ChainId`] is invalid.
+ ///
+ pub fn start_call_proof_request(
+ &mut self,
+ now: TNow,
+ target: &PeerId,
+ chain_id: ChainId,
+ config: protocol::CallProofRequestConfig<'_, impl Iterator>>,
+ timeout: Duration,
+ ) -> Result {
+ let request_data =
+ protocol::build_call_proof_request(config).fold(Vec::new(), |mut a, b| {
+ a.extend_from_slice(b.as_ref());
+ a
+ });
+
+ // The request data can possibly be higher than the protocol limit, especially due to the
+ // call data.
+ // TODO: check limit
+
+ Ok(self.start_request(
+ now,
+ target,
+ request_data,
+ Protocol::LightCall {
+ chain_index: chain_id.0,
+ },
+ timeout,
+ )?)
+ }
+
+ /// Underlying implementation of all the functions that start requests.
+ fn start_request(
+ &mut self,
+ now: TNow,
+ target: &PeerId,
+ request_data: Vec,
+ protocol: Protocol,
+ timeout: Duration,
+ ) -> Result {
+ // TODO: cloning of `PeerId` overhead
+ // TODO: this is O(n) but is it really a problem?
you're only supposed to have max 1 or 2 connections per PeerId + let connection_id = self + .connections_by_peer_id + .range( + (target.clone(), collection::ConnectionId::min_value()) + ..=(target.clone(), collection::ConnectionId::max_value()), + ) + .map(|(_, connection_id)| *connection_id) + .find(|connection_id| { + let state = self.inner.connection_state(*connection_id); + state.established && !state.shutting_down + }) + .ok_or(StartRequestError::NoConnection)?; + + let protocol_name = { + let protocol_name = match protocol { + Protocol::Identify => protocol::ProtocolName::Identify, + Protocol::Ping => protocol::ProtocolName::Ping, + Protocol::BlockAnnounces { chain_index } => { + let chain_info = &self.chains[chain_index]; + protocol::ProtocolName::BlockAnnounces { + genesis_hash: chain_info.genesis_hash, + fork_id: chain_info.fork_id.as_deref(), + } + } + Protocol::Transactions { chain_index } => { + let chain_info = &self.chains[chain_index]; + protocol::ProtocolName::Transactions { + genesis_hash: chain_info.genesis_hash, + fork_id: chain_info.fork_id.as_deref(), + } + } + Protocol::Grandpa { chain_index } => { + let chain_info = &self.chains[chain_index]; + protocol::ProtocolName::Grandpa { + genesis_hash: chain_info.genesis_hash, + fork_id: chain_info.fork_id.as_deref(), + } + } + Protocol::Sync { chain_index } => { + let chain_info = &self.chains[chain_index]; + protocol::ProtocolName::Sync { + genesis_hash: chain_info.genesis_hash, + fork_id: chain_info.fork_id.as_deref(), + } + } + Protocol::LightUnknown { chain_index } => { + let chain_info = &self.chains[chain_index]; + protocol::ProtocolName::Light { + genesis_hash: chain_info.genesis_hash, + fork_id: chain_info.fork_id.as_deref(), + } + } + Protocol::LightStorage { chain_index } => { + let chain_info = &self.chains[chain_index]; + protocol::ProtocolName::Light { + genesis_hash: chain_info.genesis_hash, + fork_id: chain_info.fork_id.as_deref(), + } + } + Protocol::LightCall { chain_index } => { + 
let chain_info = &self.chains[chain_index]; + protocol::ProtocolName::Light { + genesis_hash: chain_info.genesis_hash, + fork_id: chain_info.fork_id.as_deref(), + } + } + Protocol::Kad { chain_index } => { + let chain_info = &self.chains[chain_index]; + protocol::ProtocolName::Kad { + genesis_hash: chain_info.genesis_hash, + fork_id: chain_info.fork_id.as_deref(), + } + } + Protocol::SyncWarp { chain_index } => { + let chain_info = &self.chains[chain_index]; + protocol::ProtocolName::SyncWarp { + genesis_hash: chain_info.genesis_hash, + fork_id: chain_info.fork_id.as_deref(), + } + } + Protocol::State { chain_index } => { + let chain_info = &self.chains[chain_index]; + protocol::ProtocolName::State { + genesis_hash: chain_info.genesis_hash, + fork_id: chain_info.fork_id.as_deref(), + } + } + }; + + protocol::encode_protocol_name_string(protocol_name) + }; + + let substream_id = self.inner.start_request( + connection_id, + protocol_name, + Some(request_data), + now + timeout, + 16 * 1024 * 1024, + ); + + let _prev_value = self.substreams.insert( + substream_id, + SubstreamInfo { + connection_id, + outgoing: true, + protocol, + }, + ); + debug_assert!(_prev_value.is_none()); + + Ok(substream_id) + } + + /// Responds to an identify request. Call this function in response to + /// a [`Event::IdentifyRequestIn`]. + /// + /// Only the `agent_version` needs to be specified. The other fields are automatically + /// filled by the [`ChainNetwork`]. + /// + /// This function might generate a message destined a connection. Use + /// [`ChainNetwork::pull_message_to_connection`] to process messages after it has returned. + /// + /// # Panic + /// + /// Panics if the [`SubstreamId`] is invalid or doesn't correspond to a blocks request or + /// if the request has been cancelled with a [`Event::RequestInCancel`]. 
+ /// + pub fn respond_identify(&mut self, substream_id: SubstreamId, agent_version: &str) { + let substream_info = self.substreams.remove(&substream_id).unwrap(); + assert!(matches!(substream_info.protocol, Protocol::Identify { .. })); + + let response = { + let observed_addr = &self.inner[substream_info.connection_id].address; + + // TODO: all protocols + let supported_protocols = [protocol::ProtocolName::Ping].into_iter(); + + let supported_protocols_names = supported_protocols + .map(|proto| protocol::encode_protocol_name_string(proto)) + .collect::>(); + + protocol::build_identify_response(protocol::IdentifyResponse { + protocol_version: "/substrate/1.0", // TODO: same value as in Substrate, see also https://github.com/paritytech/substrate/issues/14331 + agent_version, + ed25519_public_key: *self.noise_key.libp2p_public_ed25519_key(), + listen_addrs: iter::empty(), // TODO: + observed_addr: observed_addr.as_ref(), + protocols: supported_protocols_names.iter().map(|p| &p[..]), + }) + .fold(Vec::new(), |mut a, b| { + a.extend_from_slice(b.as_ref()); + a + }) + }; + + self.inner.respond_in_request(substream_id, Ok(response)); + } + + /// Responds to a blocks request. Call this function in response to + /// a [`Event::BlocksRequestIn`]. + /// + /// Pass `None` in order to deny the request. Do this if blocks aren't available locally. + /// + /// This function might generate a message destined a connection. Use + /// [`ChainNetwork::pull_message_to_connection`] to process messages after it has returned. + /// + /// # Panic + /// + /// Panics if the [`SubstreamId`] is invalid or doesn't correspond to a blocks request or + /// if the request has been cancelled with a [`Event::RequestInCancel`]. + /// + // TOOD: more zero-cost parameter + pub fn respond_blocks( + &mut self, + substream_id: SubstreamId, + response: Option>, + ) { + let substream_info = self.substreams.remove(&substream_id).unwrap(); + assert!(matches!(substream_info.protocol, Protocol::Sync { .. 
})); + + let response = if let Some(response) = response { + Ok( + protocol::build_block_response(response).fold(Vec::new(), |mut a, b| { + a.extend_from_slice(b.as_ref()); + a + }), + ) + } else { + Err(()) + }; + + self.inner.respond_in_request(substream_id, response); + } + + /// Performs a round of Kademlia discovery. + /// + /// This function might generate a message destined a connection. Use + /// [`ChainNetwork::pull_message_to_connection`] to process messages after it has returned. + /// + /// # Panic + /// + /// Panics if [`ChainId`] is invalid. + /// + pub fn kademlia_start_discovery_round(&mut self, now: TNow, chain_id: ChainId) -> OperationId { + let random_peer_id = { + let mut pub_key = [0; 32]; + self.randomness.fill_bytes(&mut pub_key); + PeerId::from_public_key(&peer_id::PublicKey::Ed25519(pub_key)) + }; + + let queried_peer = { + let peer_id = self.chains[chain_id.0] + .kbuckets + .closest_entries(&random_peer_id) + // TODO: instead of filtering by connectd only, connect to nodes if not connected + // TODO: additionally, this only takes outgoing connections into account + .find(|(peer_id, _)| { + self.kbuckets_peers + .get(*peer_id) + .unwrap() + .addresses + .iter_connected() + .next() + .is_some() + }) + .map(|(peer_id, _)| peer_id.clone()); + peer_id + }; + + let kademlia_operation_id = self.next_kademlia_operation_id; + self.next_kademlia_operation_id.0 += 1; + + if let Some(queried_peer) = queried_peer { + let request_data = protocol::build_find_node_request(random_peer_id.as_bytes()); + + // The timeout needs to be long enough to potentially download the maximum + // response size of 1 MiB. Assuming a 128 kiB/sec connection, that's 8 seconds. 
+ let timeout = now + Duration::from_secs(8); + + // TODO: use result + self.start_request( + now, + &queried_peer, + request_data, + Protocol::Kad { + chain_index: chain_id.0, + }, + timeout, + ); + } else { + self.pending_kademlia_errors + .push_back((kademlia_operation_id, DiscoveryError::NoPeer)) + } + + kademlia_operation_id + } + + /// Returns the list of all peers for a [`Event::GossipConnected`] event of the given kind has + /// been emitted. + /// It is possible to send gossip notifications to these peers. + /// + /// # Panic + /// + /// Panics if the [`ChainId`] is invalid. + /// + pub fn gossip_connected_peers( + &'_ self, + chain_id: ChainId, + kind: GossipKind, + ) -> impl Iterator + '_ { + assert!(self.chains.contains(chain_id.0)); + let GossipKind::ConsensusTransactions = kind; + // TODO: O(n) ; optimize this by using range(), but that's a bit complicated + self.notification_substreams_by_peer_id + .iter() + .filter(move |(p, _, d, s, _)| { + *p == NotificationsProtocol::BlockAnnounces { + chain_index: chain_id.0, + } && *d == NotificationsSubstreamDirection::Out + && *s == NotificationsSubstreamState::Open + }) + .map(|(_, peer_id, _, _, _)| peer_id) + } + + /// Open a gossiping substream with the given peer on the given chain. + /// + /// Either a [`Event::GossipConnected`] or [`Event::GossipOpenFailed`] is guaranteed to later + /// be generated. + /// + /// # Panic + /// + /// Panics if the [`ChainId`] is invalid. 
+ /// + // TODO: proper error + pub fn gossip_open( + &mut self, + now: TNow, + chain_id: ChainId, + target: &PeerId, + kind: GossipKind, + ) -> Result<(), ()> { + let GossipKind::ConsensusTransactions = kind; + + let chain_info = &self.chains[chain_id.0]; + + // TODO: check with a call to `range()` whether there's already an opening in progress + + let protocol_name = + protocol::encode_protocol_name_string(protocol::ProtocolName::BlockAnnounces { + genesis_hash: chain_info.genesis_hash, + fork_id: chain_info.fork_id.as_deref(), + }); + + // TODO: cloning of `PeerId` overhead + // TODO: this is O(n) but is it really a problem? you're only supposed to have max 1 or 2 connections per PeerId + let connection_id = self + .connections_by_peer_id + .range( + (target.clone(), collection::ConnectionId::min_value()) + ..=(target.clone(), collection::ConnectionId::max_value()), + ) + .map(|(_, connection_id)| *connection_id) + .find(|connection_id| { + let state = self.inner.connection_state(*connection_id); + state.established && !state.shutting_down + }) + .ok_or(StartRequestError::NoConnection)?; + + let handshake = protocol::encode_block_announces_handshake( + protocol::BlockAnnouncesHandshakeRef { + best_hash: &chain_info.best_hash, + best_number: chain_info.best_number, + role: chain_info.role, + genesis_hash: &chain_info.genesis_hash, + }, + self.chains[chain_id.0].block_number_bytes, + ) + .fold(Vec::new(), |mut a, b| { + a.extend_from_slice(b.as_ref()); + a + }); + + let substream_id = self.inner.open_out_notifications( + connection_id, + protocol_name, + now + Duration::from_secs(10), // TODO: arbitrary + handshake, + 1024 * 1024, // TODO: arbitrary + ); + + let _was_inserted = self.notification_substreams_by_peer_id.insert(( + NotificationsProtocol::BlockAnnounces { + chain_index: chain_id.0, + }, + target.clone(), + NotificationsSubstreamDirection::Out, + NotificationsSubstreamState::Pending, + substream_id, + )); + debug_assert!(_was_inserted); + + Ok(()) + } + 
+ /// Update the state of the local node with regards to GrandPa rounds. + /// + /// Calling this method does two things: + /// + /// - Send on all the active GrandPa substreams a "neighbor packet" indicating the state of + /// the local node. + /// - Update the neighbor packet that is automatically sent to peers when a GrandPa substream + /// gets opened. + /// + /// In other words, calling this function atomically informs all the present and future peers + /// of the state of the local node regarding the GrandPa protocol. + /// + /// > **Note**: The information passed as parameter isn't validated in any way by this method. + /// + /// This function might generate a message destined to connections. Use + /// [`ChainNetwork::pull_message_to_connection`] to process these messages after it has + /// returned. + /// + /// # Panic + /// + /// Panics if [`ChainId`] is invalid, or if the chain has GrandPa disabled. + /// + pub fn gossip_broadcast_grandpa_state_and_update( + &mut self, + chain_id: ChainId, + grandpa_state: GrandpaState, + ) { + // Bytes of the neighbor packet to send out. + let packet = protocol::GrandpaNotificationRef::Neighbor(protocol::NeighborPacket { + round_number: grandpa_state.round_number, + set_id: grandpa_state.set_id, + commit_finalized_height: grandpa_state.commit_finalized_height, + }) + .scale_encoding(self.chains[chain_id.0].block_number_bytes) + .fold(Vec::new(), |mut a, b| { + a.extend_from_slice(b.as_ref()); + a + }); + + // Now sending out. + self.queue_notification( + target, + NotificationsProtocol::Grandpa { + chain_index: chain_id.0, + }, + packet, + ); + self.inner + .broadcast_notification(chain_index * NOTIFICATIONS_PROTOCOLS_PER_CHAIN + 2, packet); + + // Update the locally-stored state. + *self.chains[chain_id.0] + .grandpa_protocol_config + .as_mut() + .unwrap() = grandpa_state; + } + + /// Sends a block announce gossip message to the given peer. 
+ /// + /// If no [`Event::GossipConnected`] event of kind [`GossipKind::ConsensusTransactions`] has + /// been emitted for the given peer, then a [`QueueNotificationError::NoConnection`] will be + /// returned. + /// + /// This function might generate a message destined a connection. Use + /// [`ChainNetwork::pull_message_to_connection`] to process messages after it has returned. + /// + /// # Panic + /// + /// Panics if the [`ChainId`] is invalid. + /// + // TODO: there this extra parameter in block announces that is unused on many chains but not always + pub fn gossip_send_block_announce( + &mut self, + target: &PeerId, + chain_id: ChainId, + scale_encoded_header: &[u8], + is_best: bool, + ) -> Result<(), QueueNotificationError> { + let notification = protocol::encode_block_announce(protocol::BlockAnnounceRef { + scale_encoded_header, + is_best, + }) + .fold(Vec::new(), |mut a, b| { + a.extend_from_slice(b.as_ref()); + a + }); + + self.queue_notification( + target, + NotificationsProtocol::BlockAnnounces { + chain_index: chain_id.0, + }, + notification, + ) + } + + /// Sends a transaction gossip message to the given peer. + /// + /// Must be passed the SCALE-encoded transaction. + /// + /// If no [`Event::GossipConnected`] event of kind [`GossipKind::ConsensusTransactions`] has + /// been emitted for the given peer, then a [`QueueNotificationError::NoConnection`] will be + /// returned. + /// + /// This function might generate a message destined connections. Use + /// [`ChainNetwork::pull_message_to_connection`] to process messages after it has returned. + /// + /// # Panic + /// + /// Panics if the [`ChainId`] is invalid. 
+ /// + pub fn gossip_send_transaction( + &mut self, + target: &PeerId, + chain_id: ChainId, + extrinsic: &[u8], + ) -> Result<(), QueueNotificationError> { + let mut val = Vec::with_capacity(1 + extrinsic.len()); + val.extend_from_slice(util::encode_scale_compact_usize(1).as_ref()); + val.extend_from_slice(extrinsic); + self.queue_notification( + target, + NotificationsProtocol::Transactions { + chain_index: chain_id.0, + }, + val, + ) + } + + /// Inner implementation for all the notifications sends. + fn queue_notification( + &mut self, + target: &PeerId, + protocol: NotificationsProtocol, + notification: Vec, + ) -> Result<(), QueueNotificationError> { + let chain_index = match protocol { + NotificationsProtocol::BlockAnnounces { chain_index } => chain_index, + NotificationsProtocol::Transactions { chain_index } => chain_index, + NotificationsProtocol::Grandpa { chain_index } => chain_index, + }; + + assert!(self.chains.contains(chain_index)); + + // We first find a block announces substream for that peer. + // TODO: only relevant for GossipKind::ConsensusTransactions + // If none is found, then we are not considered "gossip-connected", and return an error + // no matter what, even if a substream of the requested protocol exists. + // TODO: O(n) ; optimize this by using range() + let block_announces_substream = self + .notification_substreams_by_peer_id + .iter() + .find(move |(p, id, d, s, _)| { + *p == NotificationsProtocol::BlockAnnounces { chain_index } + && id == target + && *d == NotificationsSubstreamDirection::Out + && *s == NotificationsSubstreamState::Open + }) + .map(|(_, _, _, _, substream_id)| *substream_id) + .ok_or(QueueNotificationError::NoConnection)?; + + // Now find a substream of the requested protocol. + let substream_id = if matches!(protocol, NotificationsProtocol::BlockAnnounces { .. 
}) { + block_announces_substream + } else { + // TODO: O(n) ; optimize this by using range() + let id = self + .notification_substreams_by_peer_id + .iter() + .find(move |(p, id, d, s, _)| { + *p == protocol + && id == target + && *d == NotificationsSubstreamDirection::Out + && *s == NotificationsSubstreamState::Open + }) + .map(|(_, _, _, _, substream_id)| *substream_id); + // If we are "gossip-connected" but no open transaction/grandpa substream exists, we + // silently discard the notification. + // TODO: this is a questionable behavior + let Some(id) = id else { return Ok(()) }; + id + }; + + match self.inner.queue_notification(substream_id, notification) { + Ok(()) => Ok(()), + Err(collection::QueueNotificationError::QueueFull) => { + Err(QueueNotificationError::QueueFull) + } + } + } + + fn recognize_protocol(&self, protocol_name: &str) -> Result { + Ok(match protocol::decode_protocol_name(protocol_name)? { + protocol::ProtocolName::Identify => Protocol::Identify, + protocol::ProtocolName::Ping => Protocol::Ping, + protocol::ProtocolName::BlockAnnounces { + genesis_hash, + fork_id, + } => Protocol::BlockAnnounces { + chain_index: *self + .chains_by_protocol_info + .get(&(genesis_hash, fork_id.map(|fork_id| fork_id.to_owned()))) + .ok_or(())?, + }, + protocol::ProtocolName::Transactions { + genesis_hash, + fork_id, + } => Protocol::Transactions { + chain_index: *self + .chains_by_protocol_info + .get(&(genesis_hash, fork_id.map(|fork_id| fork_id.to_owned()))) + .ok_or(())?, + }, + protocol::ProtocolName::Grandpa { + genesis_hash, + fork_id, + } => Protocol::Grandpa { + chain_index: *self + .chains_by_protocol_info + .get(&(genesis_hash, fork_id.map(|fork_id| fork_id.to_owned()))) + .ok_or(())?, + }, + protocol::ProtocolName::Sync { + genesis_hash, + fork_id, + } => Protocol::Sync { + chain_index: *self + .chains_by_protocol_info + .get(&(genesis_hash, fork_id.map(|fork_id| fork_id.to_owned()))) + .ok_or(())?, + }, + protocol::ProtocolName::Light { + 
genesis_hash, + fork_id, + } => Protocol::LightUnknown { + chain_index: *self + .chains_by_protocol_info + .get(&(genesis_hash, fork_id.map(|fork_id| fork_id.to_owned()))) + .ok_or(())?, + }, + protocol::ProtocolName::Kad { + genesis_hash, + fork_id, + } => Protocol::Kad { + chain_index: *self + .chains_by_protocol_info + .get(&(genesis_hash, fork_id.map(|fork_id| fork_id.to_owned()))) + .ok_or(())?, + }, + protocol::ProtocolName::SyncWarp { + genesis_hash, + fork_id, + } => Protocol::SyncWarp { + chain_index: *self + .chains_by_protocol_info + .get(&(genesis_hash, fork_id.map(|fork_id| fork_id.to_owned()))) + .ok_or(())?, + }, + protocol::ProtocolName::State { + genesis_hash, + fork_id, + } => Protocol::State { + chain_index: *self + .chains_by_protocol_info + .get(&(genesis_hash, fork_id.map(|fork_id| fork_id.to_owned()))) + .ok_or(())?, + }, + }) + } +} + +/// What kind of handshake to perform on the newly-added connection. +pub enum SingleStreamHandshakeKind { + /// Use the multistream-select protocol to negotiate the Noise encryption, then use the + /// multistream-select protocol to negotiate the Yamux multiplexing. + MultistreamSelectNoiseYamux { + /// Must be `true` if the connection has been initiated locally, or `false` if it has been + /// initiated by the remote. + is_initiator: bool, + }, +} + +/// What kind of handshake to perform on the newly-added connection. +pub enum MultiStreamHandshakeKind { + /// The connection is a WebRTC connection. + /// + /// See for details. + /// + /// The reading and writing side of substreams must never be closed. Substreams can only be + /// abruptly destroyed by either side. + WebRtc { + /// Must be `true` if the connection has been initiated locally, or `false` if it has been + /// initiated by the remote. + is_initiator: bool, + /// Multihash encoding of the TLS certificate used by the local node at the DTLS layer. 
+ local_tls_certificate_multihash: Vec, + /// Multihash encoding of the TLS certificate used by the remote node at the DTLS layer. + remote_tls_certificate_multihash: Vec, + }, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum GossipKind { + ConsensusTransactions, +} + +/// Error returned by [`ChainNetwork::add_chain`]. +#[derive(Debug, derive_more::Display, Clone)] +pub enum AddChainError { + /// The genesis hash and fork id are identical to the ones of an existing chain. + Duplicate, +} + +/// Event generated by [`ChainNetwork::next_event`]. +#[derive(Debug)] +pub enum Event { + /// A connection that was added with [`ChainNetwork::add_single_stream_connection`] or + /// [`ChainNetwork::add_multi_stream_connection`] has now finished its handshake phase. + /// Its [`PeerId`] is now known which certainty. + HandshakeFinished { + /// Identifier of the connection. + id: ConnectionId, + /// Parameter that was passed to [`ChainNetwork::add_single_stream_connection`] or + /// [`ChainNetwork::add_multi_stream_connection`]. + expected_peer_id: Option, + /// Actual [`PeerId`] of the connection. + peer_id: PeerId, + }, + + /// Now connected to the given peer for gossiping purposes. + /// + /// This event can only happen as a result of a call to [`ChainNetwork::gossip_open`]. + GossipConnected { + /// Peer we are now connected to. + peer_id: PeerId, + /// Chain of the gossip connection. + chain_id: ChainId, + /// Which kind of gossip link is concerned. + kind: GossipKind, + /// Role the node reports playing on the network. + role: Role, + /// Height of the best block according to this node. + best_number: u64, + /// Hash of the best block according to this node. + best_hash: [u8; 32], + }, + + /// An attempt has been made to open the given chain, but something wrong happened. + /// + /// This event can only happen as a result of a call to [`ChainNetwork::gossip_open`]. + GossipOpenFailed { + /// Peer concerned by the event. 
+ peer_id: PeerId, + /// Chain of the gossip connection. + chain_id: ChainId, + /// Which kind of gossip link is concerned. + kind: GossipKind, + /// Problem that happened. + error: GossipConnectError, + }, + + /// No longer connected to the given peer for gossiping purposes. + GossipDisconnected { + /// Peer we are no longer connected to. + peer_id: PeerId, + /// Chain of the gossip connection. + chain_id: ChainId, + /// Which kind of gossip link is concerned. + kind: GossipKind, + }, + + /// A peer would like to open a gossiping link with the local node. + // TODO: document what to do + // TODO: include handshake content? + GossipInDesired { + /// Peer concerned by the event. + peer_id: PeerId, + /// Chain of the gossip connection. + chain_id: ChainId, + /// Which kind of gossip link is concerned. + kind: GossipKind, + }, + + /// A previously-emitted [`Event::GossipInDesired`] is no longer relevant as the peer has + /// stopped the opening attempt. + GossipInDesiredCancel { + /// Peer concerned by the event. + peer_id: PeerId, + /// Chain of the gossip connection. + chain_id: ChainId, + /// Which kind of gossip link is concerned. + kind: GossipKind, + }, + + /// An outgoing request has finished, either successfully or not. + RequestResult { + /// Identifier of the request that was returned by the function that started the request. + substream_id: SubstreamId, + /// Outcome of the request. + response: RequestResult, + }, + + /// Received a new block announce from a peer. + /// + /// Can only happen after a [`Event::GossipConnected`] with the given [`PeerId`] and [`ChainId`] + /// combination has happened. + BlockAnnounce { + /// Identity of the sender of the block announce. + peer_id: PeerId, + /// Index of the chain the block relates to. + chain_id: ChainId, + announce: EncodedBlockAnnounce, + }, + + /// Received a GrandPa neighbor packet from the network. This contains an update to the + /// finality state of the given peer. 
+ /// + /// Can only happen after a [`Event::GossipConnected`] with the given [`PeerId`] and [`ChainId`] + /// combination has happened. + GrandpaNeighborPacket { + /// Identity of the sender of the message. + peer_id: PeerId, + /// Index of the chain the message relates to. + chain_id: ChainId, + /// State of the remote. + state: GrandpaState, + }, + + /// Received a GrandPa commit message from the network. + /// + /// Can only happen after a [`Event::GossipConnected`] with the given [`PeerId`] and [`ChainId`] + /// combination has happened. + GrandpaCommitMessage { + /// Identity of the sender of the message. + peer_id: PeerId, + /// Index of the chain the commit message relates to. + chain_id: ChainId, + message: EncodedGrandpaCommitMessage, + }, + + /// Error in the protocol in a connection, such as failure to decode a message. This event + /// doesn't have any consequence on the health of the connection, and is purely for diagnostic + /// purposes. + ProtocolError { + /// Peer that has caused the protocol error. + peer_id: PeerId, + /// Error that happened. + error: ProtocolError, + }, + + /// A remote has sent a request for identification information. + /// + /// You are strongly encouraged to call [`ChainNetwork::respond_identify`]. + IdentifyRequestIn { + /// Remote that has sent the request. + peer_id: PeerId, + /// Identifier of the request. Necessary to send back the answer. + substream_id: SubstreamId, + }, + + /// A remote has sent a request for blocks. + /// + /// Can only happen for chains where [`ChainConfig::allow_inbound_block_requests`] is `true`. + /// + /// You are strongly encouraged to call [`ChainNetwork::respond_blocks`]. + BlocksRequestIn { + /// Remote that has sent the request. + peer_id: PeerId, + /// Index of the chain concerned by the request. + chain_id: ChainId, + /// Information about the request. + config: protocol::BlocksRequestConfig, + /// Identifier of the request. Necessary to send back the answer. 
+ substream_id: SubstreamId, + }, + + /// A remote is no longer interested in the response to a request. + /// + /// Calling [`ChainNetwork::respond_identify`], [`ChainNetwork::respond_blocks`], or similar + /// will now panic. + RequestInCancel { + /// Identifier of the request. + /// + /// This [`SubstreamId`] is considered dead and no longer valid. + substream_id: SubstreamId, + }, + + KademliaDiscoveryResult { + operation_id: OperationId, + result: Result>)>, DiscoveryError>, + }, + /*Transactions { + peer_id: PeerId, + transactions: EncodedTransactions, + }*/ +} + +/// See [`Event::ProtocolError`]. +#[derive(Debug, derive_more::Display)] +pub enum ProtocolError { + /// Error in an incoming substream. + #[display(fmt = "Error in an incoming substream: {_0}")] + InboundError(InboundError), + /// Error while decoding the handshake of the block announces substream. + #[display(fmt = "Error while decoding the handshake of the block announces substream: {_0}")] + BadBlockAnnouncesHandshake(protocol::BlockAnnouncesHandshakeDecodeError), + /// Error while decoding a received block announce. + #[display(fmt = "Error while decoding a received block announce: {_0}")] + BadBlockAnnounce(protocol::DecodeBlockAnnounceError), + /// Error while decoding a received Grandpa notification. + #[display(fmt = "Error while decoding a received Grandpa notification: {_0}")] + BadGrandpaNotification(protocol::DecodeGrandpaNotificationError), + /// Received an invalid identify request. + BadIdentifyRequest, + /// Error while decoding a received blocks request. + #[display(fmt = "Error while decoding a received blocks request: {_0}")] + BadBlocksRequest(protocol::DecodeBlockRequestError), +} + +/// Error potentially returned when starting a request. +#[derive(Debug, Clone, derive_more::Display)] +pub enum StartRequestError { + /// There is no valid connection to the given peer on which the request can be started. 
+ NoConnection, +} + +/// Error potentially returned when starting a request that might be too large. +#[derive(Debug, Clone, derive_more::Display)] +pub enum StartRequestMaybeTooLargeError { + /// There is no valid connection to the given peer on which the request can be started. + NoConnection, + /// Size of the request is over maximum allowed by the protocol. + RequestTooLarge, +} + +impl From for StartRequestMaybeTooLargeError { + fn from(err: StartRequestError) -> StartRequestMaybeTooLargeError { + match err { + StartRequestError::NoConnection => StartRequestMaybeTooLargeError::NoConnection, + } + } +} + +/// Response to an outgoing request. +/// +/// See [`Event::RequestResult`̀]. +#[derive(Debug)] +pub enum RequestResult { + Blocks(Result, BlocksRequestError>), + GrandpaWarpSync(Result), + State(Result), + StorageProof(Result), + CallProof(Result), + KademliaFindNode(Result>)>, KademliaFindNodeError>), +} + +/// Error returned by [`ChainNetwork::start_blocks_request`]. +#[derive(Debug, derive_more::Display)] +pub enum BlocksRequestError { + /// Error while waiting for the response from the peer. + #[display(fmt = "{_0}")] + Request(RequestError), + /// Error while decoding the response returned by the peer. + #[display(fmt = "Response decoding error: {_0}")] + Decode(protocol::DecodeBlockResponseError), +} + +/// Error returned by [`ChainNetwork::start_storage_proof_request`]. +#[derive(Debug, derive_more::Display, Clone)] +pub enum StorageProofRequestError { + #[display(fmt = "{_0}")] + Request(RequestError), + #[display(fmt = "Response decoding error: {_0}")] + Decode(protocol::DecodeStorageCallProofResponseError), + /// The remote is incapable of answering this specific request. + RemoteCouldntAnswer, +} + +/// Error returned by [`ChainNetwork::start_call_proof_request`]. 
+#[derive(Debug, Clone, derive_more::Display)] +pub enum CallProofRequestError { + #[display(fmt = "{_0}")] + Request(RequestError), + #[display(fmt = "Response decoding error: {_0}")] + Decode(protocol::DecodeStorageCallProofResponseError), + /// The remote is incapable of answering this specific request. + RemoteCouldntAnswer, +} + +impl CallProofRequestError { + /// Returns `true` if this is caused by networking issues, as opposed to a consensus-related + /// issue. + pub fn is_network_problem(&self) -> bool { + match self { + CallProofRequestError::Request(_) => true, + CallProofRequestError::Decode(_) => false, + CallProofRequestError::RemoteCouldntAnswer => true, + } + } +} + +/// Error returned by [`ChainNetwork::start_grandpa_warp_sync_request`]. +#[derive(Debug, derive_more::Display)] +pub enum GrandpaWarpSyncRequestError { + #[display(fmt = "{_0}")] + Request(RequestError), + #[display(fmt = "Response decoding error: {_0}")] + Decode(protocol::DecodeGrandpaWarpSyncResponseError), +} + +/// Error returned by [`ChainNetwork::start_state_request`]. +#[derive(Debug, derive_more::Display)] +pub enum StateRequestError { + #[display(fmt = "{_0}")] + Request(RequestError), + #[display(fmt = "Response decoding error: {_0}")] + Decode(protocol::DecodeStateResponseError), +} + +/// Error potentially returned when queueing a notification. +#[derive(Debug, derive_more::Display)] +pub enum QueueNotificationError { + /// There is no valid substream to the given peer on which the notification can be sent. + NoConnection, + /// Queue of notifications with that peer is full. + QueueFull, +} + +/// Undecoded but valid block announce. +#[derive(Clone)] +pub struct EncodedBlockAnnounce { + message: Vec, + block_number_bytes: usize, +} + +impl EncodedBlockAnnounce { + /// Returns the decoded version of the announcement. 
+ pub fn decode(&self) -> protocol::BlockAnnounceRef { + protocol::decode_block_announce(&self.message, self.block_number_bytes).unwrap() + } +} + +impl fmt::Debug for EncodedBlockAnnounce { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.decode(), f) + } +} + +/// Undecoded but valid Merkle proof. +#[derive(Clone)] +pub struct EncodedMerkleProof(Vec, protocol::StorageOrCallProof); + +impl EncodedMerkleProof { + /// Returns the SCALE-encoded Merkle proof. + pub fn decode(&self) -> &[u8] { + protocol::decode_storage_or_call_proof_response(self.1, &self.0) + .unwrap() + .unwrap() + } +} + +impl fmt::Debug for EncodedMerkleProof { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.decode(), f) + } +} + +/// Undecoded but valid GrandPa warp sync response. +#[derive(Clone)] +pub struct EncodedGrandpaWarpSyncResponse { + message: Vec, + block_number_bytes: usize, +} + +impl EncodedGrandpaWarpSyncResponse { + /// Returns the encoded bytes of the warp sync message. + pub fn as_encoded(&self) -> &[u8] { + &self.message + } + + /// Returns the decoded version of the warp sync message. + pub fn decode(&self) -> protocol::GrandpaWarpSyncResponse { + match protocol::decode_grandpa_warp_sync_response(&self.message, self.block_number_bytes) { + Ok(msg) => msg, + _ => unreachable!(), + } + } +} + +impl fmt::Debug for EncodedGrandpaWarpSyncResponse { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.decode(), f) + } +} + +/// Undecoded but valid state response. +// TODO: merge with EncodedMerkleProof? +#[derive(Clone)] +pub struct EncodedStateResponse(Vec); + +impl EncodedStateResponse { + /// Returns the Merkle proof of the state response. 
+ pub fn decode(&self) -> &[u8] { + match protocol::decode_state_response(&self.0) { + Ok(r) => r, + Err(_) => unreachable!(), + } + } +} + +impl fmt::Debug for EncodedStateResponse { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.decode(), f) + } +} + +#[derive(Debug, Copy, Clone)] +// TODO: link to some doc about how GrandPa works: what is a round, what is the set id, etc. +pub struct GrandpaState { + pub round_number: u64, + /// Set of authorities that will be used by the node to try finalize the children of the block + /// of [`GrandpaState::commit_finalized_height`]. + pub set_id: u64, + /// Height of the highest block considered final by the node. + pub commit_finalized_height: u64, +} + +/// Undecoded but valid block announce handshake. +pub struct EncodedBlockAnnounceHandshake { + handshake: Vec, + block_number_bytes: usize, +} + +impl EncodedBlockAnnounceHandshake { + /// Returns the decoded version of the handshake. + pub fn decode(&self) -> protocol::BlockAnnouncesHandshakeRef { + protocol::decode_block_announces_handshake(self.block_number_bytes, &self.handshake) + .unwrap() + } +} + +impl fmt::Debug for EncodedBlockAnnounceHandshake { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.decode(), f) + } +} + +/// Error that can happen when trying to open an outbound block announces notifications substream. +#[derive(Debug, Clone, derive_more::Display)] +pub enum GossipConnectError { + /// Error in the underlying protocol. + #[display(fmt = "{_0}")] + Substream(peers::NotificationsOutErr), + /// Mismatch between the genesis hash of the remote and the local genesis hash. + #[display(fmt = "Mismatch between the genesis hash of the remote and the local genesis hash")] + GenesisMismatch { + /// Hash of the genesis block of the chain according to the local node. + local_genesis: [u8; 32], + /// Hash of the genesis block of the chain according to the remote node. 
+ remote_genesis: [u8; 32], + }, +} + +/// Undecoded but valid GrandPa commit message. +#[derive(Clone)] +pub struct EncodedGrandpaCommitMessage { + message: Vec, + block_number_bytes: usize, +} + +impl EncodedGrandpaCommitMessage { + /// Returns the encoded bytes of the commit message. + pub fn into_encoded(mut self) -> Vec { + // Skip the first byte because `self.message` is a `GrandpaNotificationRef`. + self.message.remove(0); + self.message + } + + /// Returns the encoded bytes of the commit message. + pub fn as_encoded(&self) -> &[u8] { + // Skip the first byte because `self.message` is a `GrandpaNotificationRef`. + &self.message[1..] + } + + /// Returns the decoded version of the commit message. + pub fn decode(&self) -> protocol::CommitMessageRef { + match protocol::decode_grandpa_notification(&self.message, self.block_number_bytes) { + Ok(protocol::GrandpaNotificationRef::Commit(msg)) => msg, + _ => unreachable!(), + } + } +} + +impl fmt::Debug for EncodedGrandpaCommitMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.decode(), f) + } +} diff --git a/lib/src/network/service2/addresses.rs b/lib/src/network/service2/addresses.rs new file mode 100644 index 0000000000..04416b0d51 --- /dev/null +++ b/lib/src/network/service2/addresses.rs @@ -0,0 +1,223 @@ +// Smoldot +// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+ +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use super::multiaddr; + +use alloc::vec::Vec; +use core::fmt; + +/// List of potential addresses of a single peer, reachable or not. +pub(super) struct Addresses { + list: Vec<(multiaddr::Multiaddr, State)>, +} + +impl Addresses { + /// Creates a new empty list of addresses with the given capacity pre-allocated. + pub(super) fn with_capacity(cap: usize) -> Self { + Addresses { + list: Vec::with_capacity(cap), + } + } + + /// Returns the number of addresses. + pub(super) fn len(&self) -> usize { + self.list.len() + } + + /// Returns the list of addresses stored in this list. + pub(super) fn iter(&'_ self) -> impl Iterator + '_ { + self.list.iter().map(|(a, _)| a) + } + + /// Returns the list of addresses stored in this list that are marked as connected. + pub(super) fn iter_connected(&'_ self) -> impl Iterator + '_ { + self.list + .iter() + .filter(|(_, s)| matches!(s, State::Connected)) + .map(|(a, _)| a) + } + + /// Inserts a new address in the list in the "not tried" state. + pub(super) fn insert_discovered(&mut self, addr: multiaddr::Multiaddr) { + if self.list.iter().any(|(a, _)| *a == addr) { + return; + } + + self.list.push((addr, State::NotTried)); + } + + /// If the given address is in the list, removes it. + /// + /// Returns whether the value was present. + pub(super) fn remove(&mut self, addr: &multiaddr::Multiaddr) -> bool { + if let Some(index) = self.list.iter().position(|(a, _)| a == addr) { + self.list.remove(index); + true + } else { + false + } + } + + /// Returns `true` if the list of addresses is empty. + /// + /// > **Note**: This is not the same as the list of addresses containing only disconnected + /// > addresses. + pub(super) fn is_empty(&self) -> bool { + self.list.is_empty() + } + + /// If the given address is in the list, sets its state to "connected". 
+ /// + /// # Panic + /// + /// Panics if the state of this address was already connected. + /// + pub(super) fn set_connected(&mut self, addr: &multiaddr::Multiaddr) { + if let Some(index) = self.list.iter().position(|(a, _)| a == addr) { + assert!(!matches!(self.list[index].1, State::Connected)); + self.list[index].1 = State::Connected; + } + } + + /// If the given address is in the list, sets its state to "pending". + /// + /// # Panic + /// + /// Panics if the state of this address was already pending. + /// + pub(super) fn set_pending(&mut self, addr: &multiaddr::Multiaddr) { + if let Some(index) = self.list.iter().position(|(a, _)| a == addr) { + assert!(!matches!(self.list[index].1, State::PendingConnect)); + self.list[index].1 = State::PendingConnect; + } + } + + /// If the given address is in the list, sets its state to "disconnected". + /// + /// # Panic + /// + /// Panics if the state of this address was not connected or pending. + /// + pub(super) fn set_disconnected(&mut self, addr: &multiaddr::Multiaddr) { + if let Some(index) = self.list.iter().position(|(a, _)| a == addr) { + match &mut self.list[index].1 { + st @ State::Connected => *st = State::DisconnectedReachable, + st @ State::PendingConnect => *st = State::NotTried, + _ => panic!(), + } + } + } + + /// Changes the order of the addresses, in order to prevent the same address from being + /// picked again. + // TODO: this system is a bit naive? + pub(super) fn shuffle(&mut self) { + if self.list.is_empty() { + return; + } + + let item = self.list.remove(0); + self.list.push(item); + } + + /// Picks an address from the list whose state is "not connected", and switches it to + /// "pending". Returns `None` if no such address is available. 
+    pub(super) fn addr_to_pending(&mut self) -> Option<&multiaddr::Multiaddr> {
+        let index = self
+            .list
+            .iter()
+            .position(|(_, s)| matches!(s, State::DisconnectedReachable | State::NotTried))?;
+        self.list[index].1 = State::PendingConnect;
+        Some(&self.list[index].0)
+    }
+}
+
+impl fmt::Debug for Addresses {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_list().entries(self.list.iter()).finish()
+    }
+}
+
+#[derive(Debug)]
+enum State {
+    /// Currently connected to this address.
+    Connected,
+    /// Currently trying to connect to this address.
+    PendingConnect,
+    /// Not currently connected to this address, but address was reached in the past.
+    DisconnectedReachable,
+    /// Address has been discovered, but its reachability hasn't been tried yet.
+    NotTried,
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::libp2p::multiaddr::Multiaddr;
+
+    #[test]
+    fn transition_not_tried_connected() {
+        let mut addresses = super::Addresses::with_capacity(0);
+        assert!(addresses.is_empty());
+
+        let addr: Multiaddr = "/ip4/1.2.3.4/tcp/5".parse().unwrap();
+        addresses.insert_discovered(addr.clone());
+
+        addresses.set_connected(&addr);
+        assert!(addresses.addr_to_pending().is_none());
+
+        addresses.set_disconnected(&addr);
+    }
+
+    #[test]
+    fn transition_not_tried_pending_connected() {
+        let mut addresses = super::Addresses::with_capacity(0);
+        assert!(addresses.is_empty());
+
+        let addr: Multiaddr = "/ip4/1.2.3.4/tcp/5".parse().unwrap();
+        addresses.insert_discovered(addr.clone());
+
+        assert_eq!(addresses.addr_to_pending(), Some(&addr));
+        assert!(addresses.addr_to_pending().is_none());
+
+        addresses.set_connected(&addr);
+
+        addresses.set_disconnected(&addr);
+    }
+
+    #[test]
+    #[should_panic]
+    fn transition_not_tried_disconnected() {
+        let mut addresses = super::Addresses::with_capacity(0);
+
+        let addr: Multiaddr = "/ip4/1.2.3.4/tcp/5".parse().unwrap();
+        addresses.insert_discovered(addr.clone());
+
+        addresses.set_disconnected(&addr);
+    }
+
+    #[test]
+    #[should_panic]
+    fn transition_connected_twice() {
+        let mut addresses = super::Addresses::with_capacity(0);
+
+        let addr: Multiaddr = "/ip4/1.2.3.4/tcp/5".parse().unwrap();
+        addresses.insert_discovered(addr.clone());
+
+        addresses.set_connected(&addr);
+        addresses.set_connected(&addr);
+    }
+}
diff --git a/lib/src/network/service2/requests_responses.rs b/lib/src/network/service2/requests_responses.rs
new file mode 100644
index 0000000000..19f90eebba
--- /dev/null
+++ b/lib/src/network/service2/requests_responses.rs
@@ -0,0 +1,194 @@
+// Smoldot
+// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use crate::header;
+use crate::libp2p::{
+    multiaddr, peer_id,
+    peers::{self, ConfigRequestResponse},
+    PeerId,
+};
+use crate::network::{kademlia, protocol};
+
+use super::*;
+
+use alloc::{format, vec::Vec};
+use core::{
+    fmt,
+    hash::Hash,
+    iter,
+    num::NonZeroUsize,
+    ops::{Add, Sub},
+    time::Duration,
+};
+
+pub use crate::libp2p::{
+    collection::ReadWrite,
+    peers::{
+        ConnectionId, ConnectionToCoordinator, CoordinatorToConnection, InRequestId, InboundError,
+        MultiStreamConnectionTask, MultiStreamHandshakeKind, OutRequestId,
+        SingleStreamConnectionTask, SingleStreamHandshakeKind, StartRequestError,
+    },
+};
+
+impl<TNow> ChainNetwork<TNow>
+where
+    TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
+{
+    /// Inserts the given list of nodes into the list of known nodes held within the state machine.
+    pub fn discover(
+        &mut self,
+        now: &TNow,
+        chain_index: usize,
+        peer_id: PeerId,
+        discovered_addrs: impl IntoIterator<Item = multiaddr::Multiaddr>,
+    ) {
+        let kbuckets = &mut self.chains[chain_index].kbuckets;
+
+        let mut discovered_addrs = discovered_addrs.into_iter().peekable();
+
+        // Check whether there is any address in the iterator at all before inserting the
+        // node in the buckets.
+        if discovered_addrs.peek().is_none() {
+            return;
+        }
+
+        let kbuckets_peer = match kbuckets.entry(&peer_id) {
+            kademlia::kbuckets::Entry::LocalKey => return, // TODO: return some diagnostic?
+            kademlia::kbuckets::Entry::Vacant(entry) => {
+                match entry.insert((), now, kademlia::kbuckets::PeerState::Disconnected) {
+                    Err(kademlia::kbuckets::InsertError::Full) => return, // TODO: return some diagnostic?
+                    Ok(kademlia::kbuckets::InsertResult { removed_entry, .. }) => {
+                        // `removed_entry` is the peer that was removed the k-buckets as the
+                        // result of the new insertion. Purge it from `self.kbuckets_peers`
+                        // if necessary.
+                        if let Some((removed_peer_id, _)) = removed_entry {
+                            match self.kbuckets_peers.entry(removed_peer_id) {
+                                hashbrown::hash_map::Entry::Occupied(e)
+                                    if e.get().num_references.get() == 1 =>
+                                {
+                                    e.remove();
+                                }
+                                hashbrown::hash_map::Entry::Occupied(e) => {
+                                    let num_refs = &mut e.into_mut().num_references;
+                                    *num_refs = NonZeroUsize::new(num_refs.get() - 1).unwrap();
+                                }
+                                hashbrown::hash_map::Entry::Vacant(_) => unreachable!(),
+                            }
+                        }
+
+                        match self.kbuckets_peers.entry(peer_id) {
+                            hashbrown::hash_map::Entry::Occupied(e) => {
+                                let e = e.into_mut();
+                                e.num_references = e.num_references.checked_add(1).unwrap();
+                                e
+                            }
+                            hashbrown::hash_map::Entry::Vacant(e) => {
+                                // The peer was not in the k-buckets, but it is possible that
+                                // we already have existing connections to it.
+                                let mut addresses = addresses::Addresses::with_capacity(
+                                    self.max_addresses_per_peer.get(),
+                                );
+
+                                for connection_id in
+                                    self.inner.established_peer_connections(e.key())
+                                {
+                                    let state = self.inner.connection_state(connection_id);
+                                    debug_assert!(state.established);
+                                    // Because we mark addresses as disconnected when the
+                                    // shutdown process starts, we ignore shutting down
+                                    // connections.
+                                    if state.shutting_down {
+                                        continue;
+                                    }
+                                    if state.outbound {
+                                        addresses
+                                            .insert_discovered(self.inner[connection_id].clone());
+                                        addresses.set_connected(&self.inner[connection_id]);
+                                    }
+                                }
+
+                                for connection_id in
+                                    self.inner.handshaking_peer_connections(e.key())
+                                {
+                                    let state = self.inner.connection_state(connection_id);
+                                    debug_assert!(!state.established);
+                                    debug_assert!(state.outbound);
+                                    // Because we mark addresses as disconnected when the
+                                    // shutdown process starts, we ignore shutting down
+                                    // connections.
+                                    if state.shutting_down {
+                                        continue;
+                                    }
+                                    addresses.insert_discovered(self.inner[connection_id].clone());
+                                    addresses.set_pending(&self.inner[connection_id]);
+                                }
+
+                                // TODO: O(n)
+                                for (_, (p, addr, _)) in &self.pending_ids {
+                                    if p == e.key() {
+                                        addresses.insert_discovered(addr.clone());
+                                        addresses.set_pending(addr);
+                                    }
+                                }
+
+                                e.insert(KBucketsPeer {
+                                    num_references: NonZeroUsize::new(1).unwrap(),
+                                    addresses,
+                                })
+                            }
+                        }
+                    }
+                }
+            }
+            kademlia::kbuckets::Entry::Occupied(_) => {
+                self.kbuckets_peers.get_mut(&peer_id).unwrap()
+            }
+        };
+
+        for to_insert in discovered_addrs {
+            if kbuckets_peer.addresses.len() >= self.max_addresses_per_peer.get() {
+                continue;
+            }
+
+            kbuckets_peer.addresses.insert_discovered(to_insert);
+        }
+
+        // List of addresses must never be empty.
+        debug_assert!(!kbuckets_peer.addresses.is_empty());
+    }
+}
+
+/// Error during [`ChainNetwork::start_kademlia_discovery_round`].
+#[derive(Debug, derive_more::Display)]
+pub enum DiscoveryError {
+    /// Not currently connected to any other node.
+    NoPeer,
+    /// Error during the request.
+    #[display(fmt = "{_0}")]
+    FindNode(KademliaFindNodeError),
+}
+
+/// Error during [`ChainNetwork::start_kademlia_find_node`].
+#[derive(Debug, derive_more::Display)]
+pub enum KademliaFindNodeError {
+    /// Error during the request.
+    #[display(fmt = "{_0}")]
+    RequestFailed(peers::RequestError),
+    /// Failed to decode the response.
+    #[display(fmt = "Response decoding error: {_0}")]
+    DecodeError(protocol::DecodeFindNodeResponseError),
+}