Skip to content

Commit

Permalink
Rewrite network::ChainNetwork to not use the peers module (#1200)
Browse files Browse the repository at this point in the history
* Writing new network service

* Start updating light-base/network_service

* Remove Kademlia from the service

* Don't merge this PR before fixing block requests

* Mostly update network_service w.r.t peer slots change

* Move code to use `ChainId` instead of `usize`

* Update `StartBlocksRequest`

* More updating

* More updating light-base/network_service

* Remove some useless stuff

* More updating

* More updating light-base/network_service

* Wasm node compiles again

* More work

* Doesn't panic anymore

* Update connected_unopened_gossip_desired

* Bugfix

* Misc work

* More WIP

* Rename `AddressBook` to `BasicPeeringStrategy`

* Work

* Small work

* Remove using `peers` module

* Remove old `service.rs` and `peers.rs`

* Rename `service2` to `service`

* More work

* More work

* Work

* Bugfix

* More work

* Work

* Bugfix

* Warp syncing works

* Properly gossip grandpa state

* Warnings fixes

* Fix WebRTC connections through a hack

* Change BTreeSet fields order

* Fix compilation

* State mismatch fix

* Misc work

* Misc work

* Fix compilation

* Work

* Work

* Work

* Add some basic explanations for `ChainNetwork`

* Remove now obsolete TODO

* Indicate the existing chain in case of duplicate

* Work

* Work

* Work

* Work

* Work on API

* Some docs

* Remove `now` from `Discover` message

* Fix all warnings in network_service.rs

* Work on `BasicPeeringStrategy`

* Fix warning in lib.rs

* Update full node for changes

* Update libp2p.rs documentation

* Implement rejecting in gossip requests

* Rustfmt

* Docfix

* Docfix

* Docfix again

* Tweaks

* Actually close substreams when necessary

* Rename remove_chain_peer to unassign_slot_and_remove_chain_peer

* Add ChainNetwork::opened_gossip_undesired

* Add ChainNetwork::opened_gossip_undesired_by_chain

* Use the new function instead of in slots

* Remove concept of "in slot" from basic_peering_strategy

* Log things in full node

* Fix number of in and out slots in full node
  • Loading branch information
tomaka authored Oct 27, 2023
1 parent 607a720 commit 9839de9
Show file tree
Hide file tree
Showing 29 changed files with 5,790 additions and 7,093 deletions.
31 changes: 17 additions & 14 deletions full-node/src/consensus_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ pub struct Config {
/// Stores of key to use for all block-production-related purposes.
pub keystore: Arc<keystore::Keystore>,

/// Access to the network, and index of the chain to sync from the point of view of the
/// Access to the network, and identifier of the chain to sync from the point of view of the
/// network service.
pub network_service: (Arc<network_service::NetworkService>, usize),
pub network_service: (
Arc<network_service::NetworkService>,
network_service::ChainId,
),

/// Receiver for events coming from the network, as returned by
/// [`network_service::NetworkService::new`].
Expand Down Expand Up @@ -333,7 +336,7 @@ impl ConsensusService {
keystore: config.keystore,
finalized_runtime: Arc::new(Mutex::new(Some(finalized_runtime))),
network_service: config.network_service.0,
network_chain_index: config.network_service.1,
network_chain_id: config.network_service.1,
to_background_rx,
blocks_notifications: Vec::with_capacity(8),
from_network_service: config.network_events_receiver,
Expand Down Expand Up @@ -630,7 +633,7 @@ struct SyncBackground {
/// Index, within the [`SyncBackground::network_service`], of the chain that this sync service
/// is syncing from. This value must be passed as parameter when starting requests on the
/// network service.
network_chain_index: usize,
network_chain_id: network_service::ChainId,

/// Stream of events coming from the [`SyncBackground::network_service`]. Used to know what
/// happens on the peer-to-peer network.
Expand Down Expand Up @@ -974,10 +977,10 @@ impl SyncBackground {

WhatHappened::NetworkEvent(network_service::Event::Connected {
peer_id,
chain_index,
chain_id,
best_block_number,
best_block_hash,
}) if chain_index == self.network_chain_index => {
}) if chain_id == self.network_chain_id => {
// Most of the time, we insert a new source in the state machine.
// However, a source of that `PeerId` might already exist but be considered as
// disconnected. If that is the case, we simply mark it as no
Expand Down Expand Up @@ -1005,8 +1008,8 @@ impl SyncBackground {
}
WhatHappened::NetworkEvent(network_service::Event::Disconnected {
peer_id,
chain_index,
}) if chain_index == self.network_chain_index => {
chain_id,
}) if chain_id == self.network_chain_id => {
// Sources that disconnect are only immediately removed from the sync state
// machine if they have no request in progress. If that is not the case, they
// are instead only marked as disconnected.
Expand All @@ -1022,11 +1025,11 @@ impl SyncBackground {
}
}
WhatHappened::NetworkEvent(network_service::Event::BlockAnnounce {
chain_index,
chain_id,
peer_id,
scale_encoded_header,
is_best,
}) if chain_index == self.network_chain_index => {
}) if chain_id == self.network_chain_id => {
let _jaeger_span = self.jaeger_service.block_announce_process_span(
&header::hash_from_scale_encoded_header(&scale_encoded_header),
);
Expand Down Expand Up @@ -1534,7 +1537,7 @@ impl SyncBackground {

let request = self.network_service.clone().blocks_request(
peer_id,
self.network_chain_index,
self.network_chain_id,
network::protocol::BlocksRequestConfig {
start: if let Some(first_block_hash) = first_block_hash {
network::protocol::BlocksRequestConfigStart::Hash(first_block_hash)
Expand Down Expand Up @@ -1839,7 +1842,7 @@ impl SyncBackground {
if is_new_best {
// Update the networking.
let fut = self.network_service.set_local_best_block(
self.network_chain_index,
self.network_chain_id,
self.sync.best_block_hash(),
self.sync.best_block_number(),
);
Expand Down Expand Up @@ -1885,7 +1888,7 @@ impl SyncBackground {
.clone()
.send_block_announce(
peer_id.clone(),
0,
self.network_chain_id,
scale_encoded_header.clone(),
is_new_best,
)
Expand Down Expand Up @@ -2049,7 +2052,7 @@ impl SyncBackground {

if updates_best_block {
let fut = self.network_service.set_local_best_block(
self.network_chain_index,
self.network_chain_id,
self.sync.best_block_hash(),
self.sync.best_block_number(),
);
Expand Down
9 changes: 6 additions & 3 deletions full-node/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ pub struct Config {
/// Database to access blocks.
pub database: Arc<database_thread::DatabaseThread>,

/// Access to the network, and index of the chain to sync from the point of view of the
/// network service.
pub network_service: (Arc<network_service::NetworkService>, usize),
/// Access to the network, and identifier of the chain from the point of view of the network
/// service.
pub network_service: (
Arc<network_service::NetworkService>,
network_service::ChainId,
),

/// Where to bind the WebSocket server. If `None`, no TCP server is started.
pub bind_address: Option<SocketAddr>,
Expand Down
9 changes: 6 additions & 3 deletions full-node/src/json_rpc_service/requests_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ pub struct Config {
/// Database to access blocks.
pub database: Arc<database_thread::DatabaseThread>,

/// Access to the network, and index of the chain to sync from the point of view of the
/// network service.
pub network_service: (Arc<network_service::NetworkService>, usize),
/// Access to the network, and identifier of the chain from the point of view of the network
/// service.
pub network_service: (
Arc<network_service::NetworkService>,
network_service::ChainId,
),

/// Name of the chain, as found in the chain specification.
pub chain_name: String,
Expand Down
32 changes: 18 additions & 14 deletions full-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,16 @@ impl Client {
*self.network_known_best.lock().await
}

/// Returns the current number of peers of the client.
/// Returns the current total number of peers of the client.
// TODO: weird API
pub async fn num_peers(&self) -> u64 {
u64::try_from(self.network_service.num_peers(0).await).unwrap_or(u64::max_value())
u64::try_from(self.network_service.num_total_peers().await).unwrap_or(u64::max_value())
}

/// Returns the current number of network connections of the client.
/// Returns the current total number of network connections of the client.
// TODO: weird API
pub async fn num_network_connections(&self) -> u64 {
u64::try_from(self.network_service.num_established_connections().await)
.unwrap_or(u64::max_value())
u64::try_from(self.network_service.num_connections().await).unwrap_or(u64::max_value())
}

// TODO: not the best API
Expand Down Expand Up @@ -405,11 +406,12 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
.await
.map_err(StartError::JaegerInit)?;

let (network_service, network_events_receivers) =
let (network_service, network_service_chain_ids, network_events_receivers) =
network_service::NetworkService::new(network_service::Config {
listen_addresses: config.listen_addresses,
num_events_receivers: 2 + if relay_chain_database.is_some() { 1 } else { 0 },
chains: iter::once(network_service::ChainConfig {
log_name: chain_spec.name().to_owned(),
fork_id: chain_spec.fork_id().map(|n| n.to_owned()),
block_number_bytes: usize::from(chain_spec.block_number_bytes()),
database: database.clone(),
Expand Down Expand Up @@ -479,6 +481,7 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
.chain(
if let Some(relay_chains_specs) = &relay_chain_spec {
Some(network_service::ChainConfig {
log_name: relay_chains_specs.name().to_owned(),
fork_id: relay_chains_specs.fork_id().map(|n| n.to_owned()),
block_number_bytes: usize::from(relay_chains_specs.block_number_bytes()),
database: relay_chain_database.clone().unwrap(),
Expand Down Expand Up @@ -588,7 +591,7 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
log_callback: config.log_callback.clone(),
genesis_block_hash,
network_events_receiver: network_events_receivers.next().unwrap(),
network_service: (network_service.clone(), 0),
network_service: (network_service.clone(), network_service_chain_ids[0]),
database: database.clone(),
block_number_bytes: usize::from(chain_spec.block_number_bytes()),
keystore,
Expand All @@ -615,7 +618,7 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
relay_chain_spec.as_ref().unwrap().block_number_bytes(),
)),
network_events_receiver: network_events_receivers.next().unwrap(),
network_service: (network_service.clone(), 1),
network_service: (network_service.clone(), network_service_chain_ids[1]),
database: relay_chain_database.clone(),
block_number_bytes: usize::from(
relay_chain_spec.as_ref().unwrap().block_number_bytes(),
Expand Down Expand Up @@ -657,7 +660,7 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
log_callback: config.log_callback.clone(),
database,
consensus_service: consensus_service.clone(),
network_service: (network_service.clone(), 0),
network_service: (network_service.clone(), network_service_chain_ids[0]),
bind_address: config.chain.json_rpc_listen.as_ref().map(|cfg| cfg.address),
max_parallel_requests: 32,
max_json_rpc_clients: config
Expand Down Expand Up @@ -686,7 +689,7 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
log_callback: config.log_callback.clone(),
database: relay_chain_database.clone().unwrap(),
consensus_service: relay_chain_consensus_service.clone().unwrap(),
network_service: (network_service.clone(), 1),
network_service: (network_service.clone(), network_service_chain_ids[1]),
bind_address: relay_chain_cfg
.json_rpc_listen
.as_ref()
Expand Down Expand Up @@ -722,6 +725,7 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
let network_known_best = Arc::new(Mutex::new(None));
(config.tasks_executor)(Box::pin({
let mut main_network_events_receiver = network_events_receivers.next().unwrap();
let network_service_chain_id = network_service_chain_ids[0];
let network_known_best = network_known_best.clone();

// TODO: shut down this task if the client stops?
Expand All @@ -732,10 +736,10 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {

match network_event {
network_service::Event::BlockAnnounce {
chain_index: 0,
chain_id,
scale_encoded_header,
..
} => match (
} if chain_id == network_service_chain_id => match (
*network_known_best,
header::decode(
&scale_encoded_header,
Expand All @@ -750,10 +754,10 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
}
},
network_service::Event::Connected {
chain_index: 0,
chain_id,
best_block_number,
..
} => match *network_known_best {
} if chain_id == network_service_chain_id => match *network_known_best {
Some(n) if n >= best_block_number => {}
_ => *network_known_best = Some(best_block_number),
},
Expand Down
Loading

0 comments on commit 9839de9

Please sign in to comment.