Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite network::ChainNetwork to not use the peers module #1200

Merged
merged 78 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
b583f09
Writing new network service
tomaka Sep 27, 2023
038cc75
Start updating light-base/network_service
tomaka Oct 18, 2023
da38ca1
Remove Kademlia from the service
tomaka Oct 19, 2023
692a9ed
Don't merge this PR before fixing block requests
tomaka Oct 19, 2023
43cee00
Mostly update network_service w.r.t peer slots change
tomaka Oct 19, 2023
20f03c0
Move code to use `ChainId` instead of `usize`
tomaka Oct 19, 2023
7ac40a6
Update `StartBlocksRequest`
tomaka Oct 19, 2023
b009dee
More updating
tomaka Oct 19, 2023
59edc9a
More updating light-base/network_service
tomaka Oct 19, 2023
e0fe24f
Remove some useless stuff
tomaka Oct 19, 2023
4a632af
More updating
tomaka Oct 19, 2023
455576c
More updating light-base/network_service
tomaka Oct 19, 2023
814fc33
Wasm node compiles again
tomaka Oct 19, 2023
f86ad5f
More work
tomaka Oct 19, 2023
d7e4630
Doesn't panic anymore
tomaka Oct 20, 2023
d405b85
Update connected_unopened_gossip_desired
tomaka Oct 20, 2023
c11f794
Bugfix
tomaka Oct 20, 2023
055a5b8
Misc work
tomaka Oct 20, 2023
1e43dd2
More WIP
tomaka Oct 20, 2023
32292f4
Rename `AddressBook` to `BasicPeeringStrategy`
tomaka Oct 20, 2023
847dd5b
Work
tomaka Oct 20, 2023
600f58c
Small work
tomaka Oct 20, 2023
5a2f91f
Remove using `peers` module
tomaka Oct 20, 2023
a1d51ae
Remove old `service.rs` and `peers.rs`
tomaka Oct 20, 2023
6d0dcad
Rename `service2` to `service`
tomaka Oct 20, 2023
5bd381a
More work
tomaka Oct 21, 2023
4fafbdb
More work
tomaka Oct 21, 2023
2dfe983
Merge branch 'main' into service-rewrite
tomaka Oct 21, 2023
3268e0e
Merge branch 'main' into service-rewrite
tomaka Oct 21, 2023
6bbe6c2
Work
tomaka Oct 21, 2023
63ba797
Bugfix
tomaka Oct 21, 2023
ea64ba9
More work
tomaka Oct 22, 2023
bed44c9
Work
tomaka Oct 22, 2023
b58f608
Bugfix
tomaka Oct 22, 2023
ee35c51
Warp syncing works
tomaka Oct 22, 2023
6757311
Properly gossip grandpa state
tomaka Oct 22, 2023
f082c31
Warnings fixes
tomaka Oct 22, 2023
6624a56
Fix WebRTC connections through a hack
tomaka Oct 22, 2023
eb0cd32
Change BTreeSet fields order
tomaka Oct 22, 2023
95db64e
Fix compilation
tomaka Oct 22, 2023
6672ae3
State mismatch fix
tomaka Oct 22, 2023
5491aa1
Misc work
tomaka Oct 22, 2023
2adedaa
Misc work
tomaka Oct 22, 2023
7702da0
Fix compilation
tomaka Oct 22, 2023
e24f77c
Work
tomaka Oct 22, 2023
4015ffa
Work
tomaka Oct 23, 2023
7c114b3
Merge branch 'main' into service-rewrite
tomaka Oct 23, 2023
5f10326
Work
tomaka Oct 23, 2023
58a3235
Add some basic explanations for `ChainNetwork`
tomaka Oct 23, 2023
8fda0a0
Remove now obsolete TODO
tomaka Oct 23, 2023
bc2e1c1
Indicate the existing chain in case of duplicate
tomaka Oct 23, 2023
732c57e
Work
tomaka Oct 23, 2023
a7b2c20
Work
tomaka Oct 23, 2023
e228b18
Work
tomaka Oct 23, 2023
59cae9d
Work
tomaka Oct 23, 2023
218465b
Work on API
tomaka Oct 24, 2023
d3c5107
Some docs
tomaka Oct 24, 2023
f0bb2ad
Remove `now` from `Discover` message
tomaka Oct 24, 2023
0b58cd6
Fix all warnings in network_service.rs
tomaka Oct 24, 2023
b0b05c4
Work on `BasicPeeringStrategy`
tomaka Oct 24, 2023
e78e2cb
Fix warning in lib.rs
tomaka Oct 24, 2023
c335ed8
Update full node for changes
tomaka Oct 25, 2023
70cf409
Update libp2p.rs documentation
tomaka Oct 25, 2023
3591807
Implement rejecting in gossip requests
tomaka Oct 25, 2023
a9792e1
Rustfmt
tomaka Oct 25, 2023
d9d6bfe
Docfix
tomaka Oct 25, 2023
d6f0004
Docfix
tomaka Oct 25, 2023
ac27595
Docfix again
tomaka Oct 25, 2023
c910a70
Tweaks
tomaka Oct 25, 2023
ddfb5ed
Actually close substreams when necessary
tomaka Oct 26, 2023
a92d439
Rename remove_chain_peer to unassign_slot_and_remove_chain_peer
tomaka Oct 26, 2023
7433007
Add ChainNetwork::opened_gossip_undesired
tomaka Oct 26, 2023
41e5c10
Add ChainNetwork::opened_gossip_undesired_by_chain
tomaka Oct 26, 2023
93930c5
Use the new function instead of in slots
tomaka Oct 26, 2023
5ade40e
Remove concept of "in slot" from basic_peering_strategy
tomaka Oct 26, 2023
450b85e
Log things in full node
tomaka Oct 26, 2023
b1d11e6
Fix number of in and out slots in full node
tomaka Oct 27, 2023
d4c6120
Merge branch 'main' into service-rewrite
tomaka Oct 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions full-node/src/consensus_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ pub struct Config {
/// Stores of key to use for all block-production-related purposes.
pub keystore: Arc<keystore::Keystore>,

/// Access to the network, and index of the chain to sync from the point of view of the
/// Access to the network, and identifier of the chain to sync from the point of view of the
/// network service.
pub network_service: (Arc<network_service::NetworkService>, usize),
pub network_service: (
Arc<network_service::NetworkService>,
network_service::ChainId,
),

/// Receiver for events coming from the network, as returned by
/// [`network_service::NetworkService::new`].
Expand Down Expand Up @@ -333,7 +336,7 @@ impl ConsensusService {
keystore: config.keystore,
finalized_runtime: Arc::new(Mutex::new(Some(finalized_runtime))),
network_service: config.network_service.0,
network_chain_index: config.network_service.1,
network_chain_id: config.network_service.1,
to_background_rx,
blocks_notifications: Vec::with_capacity(8),
from_network_service: config.network_events_receiver,
Expand Down Expand Up @@ -630,7 +633,7 @@ struct SyncBackground {
/// Index, within the [`SyncBackground::network_service`], of the chain that this sync service
/// is syncing from. This value must be passed as parameter when starting requests on the
/// network service.
network_chain_index: usize,
network_chain_id: network_service::ChainId,

/// Stream of events coming from the [`SyncBackground::network_service`]. Used to know what
/// happens on the peer-to-peer network.
Expand Down Expand Up @@ -974,10 +977,10 @@ impl SyncBackground {

WhatHappened::NetworkEvent(network_service::Event::Connected {
peer_id,
chain_index,
chain_id,
best_block_number,
best_block_hash,
}) if chain_index == self.network_chain_index => {
}) if chain_id == self.network_chain_id => {
// Most of the time, we insert a new source in the state machine.
// However, a source of that `PeerId` might already exist but be considered as
// disconnected. If that is the case, we simply mark it as no
Expand Down Expand Up @@ -1005,8 +1008,8 @@ impl SyncBackground {
}
WhatHappened::NetworkEvent(network_service::Event::Disconnected {
peer_id,
chain_index,
}) if chain_index == self.network_chain_index => {
chain_id,
}) if chain_id == self.network_chain_id => {
// Sources that disconnect are only immediately removed from the sync state
// machine if they have no request in progress. If that is not the case, they
// are instead only marked as disconnected.
Expand All @@ -1022,11 +1025,11 @@ impl SyncBackground {
}
}
WhatHappened::NetworkEvent(network_service::Event::BlockAnnounce {
chain_index,
chain_id,
peer_id,
scale_encoded_header,
is_best,
}) if chain_index == self.network_chain_index => {
}) if chain_id == self.network_chain_id => {
let _jaeger_span = self.jaeger_service.block_announce_process_span(
&header::hash_from_scale_encoded_header(&scale_encoded_header),
);
Expand Down Expand Up @@ -1534,7 +1537,7 @@ impl SyncBackground {

let request = self.network_service.clone().blocks_request(
peer_id,
self.network_chain_index,
self.network_chain_id,
network::protocol::BlocksRequestConfig {
start: if let Some(first_block_hash) = first_block_hash {
network::protocol::BlocksRequestConfigStart::Hash(first_block_hash)
Expand Down Expand Up @@ -1839,7 +1842,7 @@ impl SyncBackground {
if is_new_best {
// Update the networking.
let fut = self.network_service.set_local_best_block(
self.network_chain_index,
self.network_chain_id,
self.sync.best_block_hash(),
self.sync.best_block_number(),
);
Expand Down Expand Up @@ -1885,7 +1888,7 @@ impl SyncBackground {
.clone()
.send_block_announce(
peer_id.clone(),
0,
self.network_chain_id,
scale_encoded_header.clone(),
is_new_best,
)
Expand Down Expand Up @@ -2049,7 +2052,7 @@ impl SyncBackground {

if updates_best_block {
let fut = self.network_service.set_local_best_block(
self.network_chain_index,
self.network_chain_id,
self.sync.best_block_hash(),
self.sync.best_block_number(),
);
Expand Down
9 changes: 6 additions & 3 deletions full-node/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ pub struct Config {
/// Database to access blocks.
pub database: Arc<database_thread::DatabaseThread>,

/// Access to the network, and index of the chain to sync from the point of view of the
/// network service.
pub network_service: (Arc<network_service::NetworkService>, usize),
/// Access to the network, and identifier of the chain from the point of view of the network
/// service.
pub network_service: (
Arc<network_service::NetworkService>,
network_service::ChainId,
),

/// Where to bind the WebSocket server. If `None`, no TCP server is started.
pub bind_address: Option<SocketAddr>,
Expand Down
9 changes: 6 additions & 3 deletions full-node/src/json_rpc_service/requests_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ pub struct Config {
/// Database to access blocks.
pub database: Arc<database_thread::DatabaseThread>,

/// Access to the network, and index of the chain to sync from the point of view of the
/// network service.
pub network_service: (Arc<network_service::NetworkService>, usize),
/// Access to the network, and identifier of the chain from the point of view of the network
/// service.
pub network_service: (
Arc<network_service::NetworkService>,
network_service::ChainId,
),

/// Name of the chain, as found in the chain specification.
pub chain_name: String,
Expand Down
32 changes: 18 additions & 14 deletions full-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,16 @@ impl Client {
*self.network_known_best.lock().await
}

/// Returns the current number of peers of the client.
/// Returns the current total number of peers of the client.
// TODO: weird API
pub async fn num_peers(&self) -> u64 {
u64::try_from(self.network_service.num_peers(0).await).unwrap_or(u64::max_value())
u64::try_from(self.network_service.num_total_peers().await).unwrap_or(u64::max_value())
}

/// Returns the current number of network connections of the client.
/// Returns the current total number of network connections of the client.
// TODO: weird API
pub async fn num_network_connections(&self) -> u64 {
u64::try_from(self.network_service.num_established_connections().await)
.unwrap_or(u64::max_value())
u64::try_from(self.network_service.num_connections().await).unwrap_or(u64::max_value())
}

// TODO: not the best API
Expand Down Expand Up @@ -405,11 +406,12 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
.await
.map_err(StartError::JaegerInit)?;

let (network_service, network_events_receivers) =
let (network_service, network_service_chain_ids, network_events_receivers) =
network_service::NetworkService::new(network_service::Config {
listen_addresses: config.listen_addresses,
num_events_receivers: 2 + if relay_chain_database.is_some() { 1 } else { 0 },
chains: iter::once(network_service::ChainConfig {
log_name: chain_spec.name().to_owned(),
fork_id: chain_spec.fork_id().map(|n| n.to_owned()),
block_number_bytes: usize::from(chain_spec.block_number_bytes()),
database: database.clone(),
Expand Down Expand Up @@ -479,6 +481,7 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
.chain(
if let Some(relay_chains_specs) = &relay_chain_spec {
Some(network_service::ChainConfig {
log_name: relay_chains_specs.name().to_owned(),
fork_id: relay_chains_specs.fork_id().map(|n| n.to_owned()),
block_number_bytes: usize::from(relay_chains_specs.block_number_bytes()),
database: relay_chain_database.clone().unwrap(),
Expand Down Expand Up @@ -588,7 +591,7 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
log_callback: config.log_callback.clone(),
genesis_block_hash,
network_events_receiver: network_events_receivers.next().unwrap(),
network_service: (network_service.clone(), 0),
network_service: (network_service.clone(), network_service_chain_ids[0]),
database: database.clone(),
block_number_bytes: usize::from(chain_spec.block_number_bytes()),
keystore,
Expand All @@ -615,7 +618,7 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
relay_chain_spec.as_ref().unwrap().block_number_bytes(),
)),
network_events_receiver: network_events_receivers.next().unwrap(),
network_service: (network_service.clone(), 1),
network_service: (network_service.clone(), network_service_chain_ids[1]),
database: relay_chain_database.clone(),
block_number_bytes: usize::from(
relay_chain_spec.as_ref().unwrap().block_number_bytes(),
Expand Down Expand Up @@ -657,7 +660,7 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
log_callback: config.log_callback.clone(),
database,
consensus_service: consensus_service.clone(),
network_service: (network_service.clone(), 0),
network_service: (network_service.clone(), network_service_chain_ids[0]),
bind_address: config.chain.json_rpc_listen.as_ref().map(|cfg| cfg.address),
max_parallel_requests: 32,
max_json_rpc_clients: config
Expand Down Expand Up @@ -686,7 +689,7 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
log_callback: config.log_callback.clone(),
database: relay_chain_database.clone().unwrap(),
consensus_service: relay_chain_consensus_service.clone().unwrap(),
network_service: (network_service.clone(), 1),
network_service: (network_service.clone(), network_service_chain_ids[1]),
bind_address: relay_chain_cfg
.json_rpc_listen
.as_ref()
Expand Down Expand Up @@ -722,6 +725,7 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
let network_known_best = Arc::new(Mutex::new(None));
(config.tasks_executor)(Box::pin({
let mut main_network_events_receiver = network_events_receivers.next().unwrap();
let network_service_chain_id = network_service_chain_ids[0];
let network_known_best = network_known_best.clone();

// TODO: shut down this task if the client stops?
Expand All @@ -732,10 +736,10 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {

match network_event {
network_service::Event::BlockAnnounce {
chain_index: 0,
chain_id,
scale_encoded_header,
..
} => match (
} if chain_id == network_service_chain_id => match (
*network_known_best,
header::decode(
&scale_encoded_header,
Expand All @@ -750,10 +754,10 @@ pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
}
},
network_service::Event::Connected {
chain_index: 0,
chain_id,
best_block_number,
..
} => match *network_known_best {
} if chain_id == network_service_chain_id => match *network_known_best {
Some(n) if n >= best_block_number => {}
_ => *network_known_best = Some(best_block_number),
},
Expand Down
Loading