Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: wire network primitives to remaining components #13143

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/net/eth-wire/src/ethstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ mod tests {
async fn can_write_and_read_cleartext() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let test_msg: EthMessage = EthMessage::NewBlockHashes(
let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
vec![
BlockHashNumber { hash: B256::random(), number: 5 },
BlockHashNumber { hash: B256::random(), number: 6 },
Expand Down Expand Up @@ -572,7 +572,7 @@ mod tests {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let server_key = SecretKey::new(&mut rand::thread_rng());
let test_msg: EthMessage = EthMessage::NewBlockHashes(
let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
vec![
BlockHashNumber { hash: B256::random(), number: 5 },
BlockHashNumber { hash: B256::random(), number: 6 },
Expand Down Expand Up @@ -614,7 +614,7 @@ mod tests {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let server_key = SecretKey::new(&mut rand::thread_rng());
let test_msg: EthMessage = EthMessage::NewBlockHashes(
let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
vec![
BlockHashNumber { hash: B256::random(), number: 5 },
BlockHashNumber { hash: B256::random(), number: 6 },
Expand Down
4 changes: 1 addition & 3 deletions crates/net/network/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,13 @@ impl<Tx, Eth, N: NetworkPrimitives> NetworkBuilder<Tx, Eth, N> {
let request_handler = EthRequestHandler::new(client, peers, rx);
NetworkBuilder { network, request_handler, transactions }
}
}

impl<Tx, Eth> NetworkBuilder<Tx, Eth> {
/// Creates a new [`TransactionsManager`] and wires it to the network.
pub fn transactions<Pool: TransactionPool>(
self,
pool: Pool,
transactions_manager_config: TransactionsManagerConfig,
) -> NetworkBuilder<TransactionsManager<Pool>, Eth> {
) -> NetworkBuilder<TransactionsManager<Pool, N>, Eth, N> {
let Self { mut network, request_handler, .. } = self;
let (tx, rx) = mpsc::unbounded_channel();
network.set_transactions(tx);
Expand Down
12 changes: 5 additions & 7 deletions crates/net/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,19 +145,17 @@ where
}
}

impl<C> NetworkConfig<C>
impl<C, N> NetworkConfig<C, N>
where
C: BlockReader<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
Header = reth_primitives::Header,
> + HeaderProvider
N: NetworkPrimitives,
C: BlockReader<Block = N::Block, Receipt = reth_primitives::Receipt, Header = N::BlockHeader>
+ HeaderProvider
+ Clone
+ Unpin
+ 'static,
{
/// Starts the networking stack given a [`NetworkConfig`] and returns a handle to the network.
pub async fn start_network(self) -> Result<NetworkHandle, NetworkError> {
pub async fn start_network(self) -> Result<NetworkHandle<N>, NetworkError> {
let client = self.client.clone();
let (handle, network, _txpool, eth) = NetworkManager::builder::<C>(self)
.await?
Expand Down
10 changes: 6 additions & 4 deletions crates/net/network/src/eth_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
}
}

impl<C> EthRequestHandler<C>
impl<C, N> EthRequestHandler<C, N>
where
N: NetworkPrimitives,
C: BlockReader + HeaderProvider + ReceiptProvider<Receipt = reth_primitives::Receipt>,
{
/// Returns the list of requested headers
Expand Down Expand Up @@ -222,10 +223,11 @@ where
/// An endless future.
///
/// This should be spawned or used as part of `tokio::select!`.
impl<C> Future for EthRequestHandler<C>
impl<C, N> Future for EthRequestHandler<C, N>
where
C: BlockReader<Block = reth_primitives::Block, Receipt = reth_primitives::Receipt>
+ HeaderProvider<Header = reth_primitives::Header>
N: NetworkPrimitives,
C: BlockReader<Block = N::Block, Receipt = reth_primitives::Receipt>
+ HeaderProvider<Header = N::BlockHeader>
+ Unpin,
{
type Output = ();
Expand Down
15 changes: 10 additions & 5 deletions crates/net/network/src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_poll_fetcher() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default());
let mut fetcher =
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());

poll_fn(move |cx| {
assert!(fetcher.poll(cx).is_pending());
Expand All @@ -497,7 +498,8 @@ mod tests {
#[tokio::test]
async fn test_peer_rotation() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default());
let mut fetcher =
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
// Add a few random peers
let peer1 = B512::random();
let peer2 = B512::random();
Expand All @@ -520,7 +522,8 @@ mod tests {
#[tokio::test]
async fn test_peer_prioritization() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default());
let mut fetcher =
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
// Add a few random peers
let peer1 = B512::random();
let peer2 = B512::random();
Expand All @@ -545,7 +548,8 @@ mod tests {
#[tokio::test]
async fn test_on_block_headers_response() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default());
let mut fetcher =
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
let peer_id = B512::random();

assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);
Expand Down Expand Up @@ -575,7 +579,8 @@ mod tests {
#[tokio::test]
async fn test_header_response_outcome() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default());
let mut fetcher =
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
let peer_id = B512::random();

let request_pair = || {
Expand Down
15 changes: 10 additions & 5 deletions crates/net/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@
//! // The key that's used for encrypting sessions and to identify our node.
//! let local_key = rng_secret_key();
//!
//! let config = NetworkConfig::builder(local_key).boot_nodes(mainnet_nodes()).build(client);
//! let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key)
//! .boot_nodes(mainnet_nodes())
//! .build(client);
//!
//! // create the network instance
//! let network = NetworkManager::<EthNetworkPrimitives>::new(config).await.unwrap();
//! let network = NetworkManager::new(config).await.unwrap();
//!
//! // keep a handle to the network and spawn it
//! let handle = network.handle().clone();
Expand All @@ -73,7 +75,9 @@
//! ### Configure all components of the Network with the [`NetworkBuilder`]
//!
//! ```
//! use reth_network::{config::rng_secret_key, NetworkConfig, NetworkManager};
//! use reth_network::{
//! config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkManager,
//! };
//! use reth_network_peers::mainnet_nodes;
//! use reth_provider::test_utils::NoopProvider;
//! use reth_transaction_pool::TransactionPool;
Expand All @@ -84,8 +88,9 @@
//! // The key that's used for encrypting sessions and to identify our node.
//! let local_key = rng_secret_key();
//!
//! let config =
//! NetworkConfig::builder(local_key).boot_nodes(mainnet_nodes()).build(client.clone());
//! let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key)
//! .boot_nodes(mainnet_nodes())
//! .build(client.clone());
//! let transactions_manager_config = config.transactions_manager_config.clone();
//!
//! // create the network instance
Expand Down
9 changes: 6 additions & 3 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,9 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
/// components of the network
///
/// ```
/// use reth_network::{config::rng_secret_key, NetworkConfig, NetworkManager};
/// use reth_network::{
/// config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkManager,
/// };
/// use reth_network_peers::mainnet_nodes;
/// use reth_provider::test_utils::NoopProvider;
/// use reth_transaction_pool::TransactionPool;
Expand All @@ -303,8 +305,9 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
/// // The key that's used for encrypting sessions and to identify our node.
/// let local_key = rng_secret_key();
///
/// let config =
/// NetworkConfig::builder(local_key).boot_nodes(mainnet_nodes()).build(client.clone());
/// let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key)
/// .boot_nodes(mainnet_nodes())
/// .build(client.clone());
/// let transactions_manager_config = config.transactions_manager_config.clone();
///
/// // create the network instance
Expand Down
4 changes: 2 additions & 2 deletions crates/net/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ impl<N: NetworkPrimitives> NetworkPeersEvents for NetworkHandle<N> {
}
}

impl NetworkEventListenerProvider for NetworkHandle<EthNetworkPrimitives> {
fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<EthNetworkPrimitives>>> {
impl<N: NetworkPrimitives> NetworkEventListenerProvider<PeerRequest<N>> for NetworkHandle<N> {
fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<N>>> {
self.inner.event_sender.new_listener()
}

Expand Down
9 changes: 5 additions & 4 deletions crates/net/network/src/session/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,16 @@ impl<N: NetworkPrimitives> Sink<EthMessage<N>> for EthRlpxConnection<N> {
mod tests {
use super::*;

const fn assert_eth_stream<St>()
const fn assert_eth_stream<N, St>()
where
St: Stream<Item = Result<EthMessage, EthStreamError>> + Sink<EthMessage>,
N: NetworkPrimitives,
St: Stream<Item = Result<EthMessage<N>, EthStreamError>> + Sink<EthMessage<N>>,
{
}

#[test]
const fn test_eth_stream_variants() {
assert_eth_stream::<EthSatelliteConnection>();
assert_eth_stream::<EthRlpxConnection>();
assert_eth_stream::<EthNetworkPrimitives, EthSatelliteConnection<EthNetworkPrimitives>>();
assert_eth_stream::<EthNetworkPrimitives, EthRlpxConnection<EthNetworkPrimitives>>();
}
}
4 changes: 2 additions & 2 deletions crates/net/network/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ mod tests {

use alloy_consensus::Header;
use alloy_primitives::B256;
use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthVersion};
use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthNetworkPrimitives, EthVersion};
use reth_network_api::PeerRequestSender;
use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError};
use reth_network_peers::PeerId;
Expand All @@ -581,7 +581,7 @@ mod tests {
};

/// Returns a testing instance of the [`NetworkState`].
fn state() -> NetworkState {
fn state() -> NetworkState<EthNetworkPrimitives> {
let peers = PeersManager::default();
let handle = peers.handle();
NetworkState {
Expand Down
22 changes: 12 additions & 10 deletions crates/net/network/src/test_utils/testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use crate::{
use futures::{FutureExt, StreamExt};
use pin_project::pin_project;
use reth_chainspec::{Hardforks, MAINNET};
use reth_eth_wire::{protocol::Protocol, DisconnectReason, HelloMessageWithProtocols};
use reth_eth_wire::{
protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
};
use reth_network_api::{
events::{PeerEvent, SessionInfo},
test_utils::{PeersHandle, PeersHandleProvider},
Expand Down Expand Up @@ -140,7 +142,7 @@ where
}

/// Returns all handles to the networks
pub fn handles(&self) -> impl Iterator<Item = NetworkHandle> + '_ {
pub fn handles(&self) -> impl Iterator<Item = NetworkHandle<EthNetworkPrimitives>> + '_ {
self.peers.iter().map(|p| p.handle())
}

Expand Down Expand Up @@ -346,11 +348,11 @@ impl<C, Pool> TestnetHandle<C, Pool> {
#[derive(Debug)]
pub struct Peer<C, Pool = TestPool> {
#[pin]
network: NetworkManager,
network: NetworkManager<EthNetworkPrimitives>,
#[pin]
request_handler: Option<EthRequestHandler<C>>,
request_handler: Option<EthRequestHandler<C, EthNetworkPrimitives>>,
#[pin]
transactions_manager: Option<TransactionsManager<Pool>>,
transactions_manager: Option<TransactionsManager<Pool, EthNetworkPrimitives>>,
pool: Option<Pool>,
client: C,
secret_key: SecretKey,
Expand Down Expand Up @@ -393,12 +395,12 @@ where
}

/// Returns mutable access to the network.
pub fn network_mut(&mut self) -> &mut NetworkManager {
pub fn network_mut(&mut self) -> &mut NetworkManager<EthNetworkPrimitives> {
&mut self.network
}

/// Returns the [`NetworkHandle`] of this peer.
pub fn handle(&self) -> NetworkHandle {
pub fn handle(&self) -> NetworkHandle<EthNetworkPrimitives> {
self.network.handle().clone()
}

Expand Down Expand Up @@ -506,8 +508,8 @@ pub struct PeerConfig<C = NoopProvider> {
/// A handle to a peer in the [`Testnet`].
#[derive(Debug)]
pub struct PeerHandle<Pool> {
network: NetworkHandle,
transactions: Option<TransactionsHandle>,
network: NetworkHandle<EthNetworkPrimitives>,
transactions: Option<TransactionsHandle<EthNetworkPrimitives>>,
pool: Option<Pool>,
}

Expand Down Expand Up @@ -545,7 +547,7 @@ impl<Pool> PeerHandle<Pool> {
}

/// Returns the [`NetworkHandle`] of this peer.
pub const fn network(&self) -> &NetworkHandle {
pub const fn network(&self) -> &NetworkHandle<EthNetworkPrimitives> {
&self.network
}
}
Expand Down
16 changes: 8 additions & 8 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,14 @@ pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives
metrics: TransactionsManagerMetrics,
}

impl<Pool: TransactionPool> TransactionsManager<Pool> {
impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
/// Sets up a new instance.
///
/// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
pub fn new(
network: NetworkHandle,
network: NetworkHandle<N>,
pool: Pool,
from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent>,
from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
transactions_manager_config: TransactionsManagerConfig,
) -> Self {
let network_events = network.event_listener();
Expand Down Expand Up @@ -332,9 +332,7 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
metrics,
}
}
}

impl<Pool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
/// Returns a new handle that can send commands to this type.
pub fn handle(&self) -> TransactionsHandle<N> {
TransactionsHandle { manager_tx: self.command_tx.clone() }
Expand Down Expand Up @@ -1928,7 +1926,9 @@ mod tests {
use tests::fetcher::TxFetchMetadata;
use tracing::error;

async fn new_tx_manager() -> (TransactionsManager<TestPool>, NetworkManager) {
async fn new_tx_manager(
) -> (TransactionsManager<TestPool, EthNetworkPrimitives>, NetworkManager<EthNetworkPrimitives>)
{
let secret_key = SecretKey::new(&mut rand::thread_rng());
let client = NoopProvider::default();

Expand Down Expand Up @@ -1959,7 +1959,7 @@ mod tests {
pub(super) fn new_mock_session(
peer_id: PeerId,
version: EthVersion,
) -> (PeerMetadata, mpsc::Receiver<PeerRequest>) {
) -> (PeerMetadata<EthNetworkPrimitives>, mpsc::Receiver<PeerRequest>) {
let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1);

(
Expand Down Expand Up @@ -1991,7 +1991,7 @@ mod tests {

let client = NoopProvider::default();
let pool = testing_pool();
let config = NetworkConfigBuilder::new(secret_key)
let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
.disable_discovery()
.listener_port(0)
.build(client);
Expand Down
Loading
Loading