Skip to content

Commit

Permalink
chore: use ethereum-forks types directly (#12702)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Nov 20, 2024
1 parent 868f3ac commit f12d7a9
Show file tree
Hide file tree
Showing 31 changed files with 234 additions and 277 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/net/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ reth-network-p2p.workspace = true
reth-discv4.workspace = true
reth-discv5.workspace = true
reth-dns-discovery.workspace = true
reth-ethereum-forks.workspace = true
reth-eth-wire.workspace = true
reth-eth-wire-types.workspace = true
reth-ecies.workspace = true
Expand Down
9 changes: 4 additions & 5 deletions crates/net/network/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
//! Builder support for configuring the entire setup.
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
use reth_network_api::test_utils::PeersHandleProvider;
use reth_transaction_pool::TransactionPool;
use tokio::sync::mpsc;

use crate::{
eth_requests::EthRequestHandler,
transactions::{TransactionsManager, TransactionsManagerConfig},
NetworkHandle, NetworkManager,
};
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
use reth_network_api::test_utils::PeersHandleProvider;
use reth_transaction_pool::TransactionPool;
use tokio::sync::mpsc;

/// We set the max channel capacity of the `EthRequestHandler` to 256
/// 256 requests with malicious 10MB body requests is 2.6GB which can be absorbed by the node.
Expand Down
3 changes: 1 addition & 2 deletions crates/net/network/src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
//! Network cache support
use core::hash::BuildHasher;
use std::{fmt, hash::Hash};

use derive_more::{Deref, DerefMut};
use itertools::Itertools;
use schnellru::{ByLength, Limiter, RandomState, Unlimited};
use std::{fmt, hash::Hash};

/// A minimal LRU cache based on a [`LruMap`](schnellru::LruMap) with limited capacity.
///
Expand Down
18 changes: 8 additions & 10 deletions crates/net/network/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
//! Network config support
use std::{collections::HashSet, net::SocketAddr, sync::Arc};

use crate::{
error::NetworkError,
import::{BlockImport, ProofOfStakeBlockImport},
transactions::TransactionsManagerConfig,
NetworkHandle, NetworkManager,
};
use reth_chainspec::{ChainSpecProvider, EthChainSpec, Hardforks};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NatResolver, DEFAULT_DISCOVERY_ADDRESS};
use reth_discv5::NetworkStackId;
use reth_dns_discovery::DnsDiscoveryConfig;
use reth_eth_wire::{
EthNetworkPrimitives, HelloMessage, HelloMessageWithProtocols, NetworkPrimitives, Status,
};
use reth_ethereum_forks::{ForkFilter, Head};
use reth_network_peers::{mainnet_nodes, pk2id, sepolia_nodes, PeerId, TrustedPeer};
use reth_network_types::{PeersConfig, SessionsConfig};
use reth_primitives::{ForkFilter, Head};
use reth_storage_api::{noop::NoopBlockReader, BlockNumReader, BlockReader, HeaderProvider};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use secp256k1::SECP256K1;

use crate::{
error::NetworkError,
import::{BlockImport, ProofOfStakeBlockImport},
transactions::TransactionsManagerConfig,
NetworkHandle, NetworkManager,
};
use std::{collections::HashSet, net::SocketAddr, sync::Arc};

// re-export for convenience
use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocols};
Expand Down
24 changes: 11 additions & 13 deletions crates/net/network/src/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,32 @@
//! Discovery support for the network.
use std::{
collections::VecDeque,
net::{IpAddr, SocketAddr},
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
use crate::{
cache::LruMap,
error::{NetworkError, ServiceKind},
};

use enr::Enr;
use futures::StreamExt;
use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config};
use reth_discv5::{DiscoveredPeer, Discv5};
use reth_dns_discovery::{
DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver,
};
use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
use reth_network_api::{DiscoveredEvent, DiscoveryEvent};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::PeerAddr;
use reth_primitives::{EnrForkIdEntry, ForkId};
use secp256k1::SecretKey;
use std::{
collections::VecDeque,
net::{IpAddr, SocketAddr},
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::{wrappers::ReceiverStream, Stream};
use tracing::trace;

use crate::{
cache::LruMap,
error::{NetworkError, ServiceKind},
};

/// Default max capacity for cache of discovered peers.
///
/// Default is 10 000 peers.
Expand Down
6 changes: 2 additions & 4 deletions crates/net/network/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
//! Possible errors when interacting with the network.
use std::{fmt, io, io::ErrorKind, net::SocketAddr};

use crate::session::PendingSessionHandshakeError;
use reth_dns_discovery::resolver::ResolveError;
use reth_ecies::ECIESErrorImpl;
use reth_eth_wire::{
errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
DisconnectReason,
};
use reth_network_types::BackoffKind;

use crate::session::PendingSessionHandshakeError;
use std::{fmt, io, io::ErrorKind, net::SocketAddr};

/// Service kind.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
Expand Down
20 changes: 9 additions & 11 deletions crates/net/network/src/eth_requests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
//! Blocks/Headers management for the p2p network.
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
use crate::{
budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
metrics::EthRequestHandlerMetrics,
};

use alloy_consensus::Header;
use alloy_eips::BlockHashOrNumber;
use alloy_rlp::Encodable;
Expand All @@ -20,14 +17,15 @@ use reth_network_p2p::error::RequestResult;
use reth_network_peers::PeerId;
use reth_primitives::BlockBody;
use reth_storage_api::{BlockReader, HeaderProvider, ReceiptProvider};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::sync::{mpsc::Receiver, oneshot};
use tokio_stream::wrappers::ReceiverStream;

use crate::{
budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
metrics::EthRequestHandlerMetrics,
};

// Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56>

/// Maximum number of receipts to serve.
Expand Down
12 changes: 5 additions & 7 deletions crates/net/network/src/fetch/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
//! A client implementation that can interact with the network and download data.
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};

use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
use alloy_primitives::B256;
use futures::{future, future::Either};
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
Expand All @@ -18,10 +14,12 @@ use reth_network_p2p::{
};
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};

use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Front-end API for fetching data from the network.
///
Expand Down
20 changes: 9 additions & 11 deletions crates/net/network/src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,7 @@ mod client;

pub use client::FetchClient;

use std::{
collections::{HashMap, VecDeque},
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
};

use crate::message::BlockRequest;
use alloy_primitives::B256;
use futures::StreamExt;
use reth_eth_wire::{EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives};
Expand All @@ -24,11 +16,17 @@ use reth_network_p2p::{
};
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use std::{
collections::{HashMap, VecDeque},
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
};
use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::message::BlockRequest;

type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
type InflightBodiesRequest<B> = Request<Vec<B256>, PeerRequestResult<Vec<B>>>;

Expand Down
5 changes: 2 additions & 3 deletions crates/net/network/src/flattened_response.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use futures::Future;
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};

use futures::Future;
use pin_project::pin_project;
use tokio::sync::oneshot::{error::RecvError, Receiver};

/// Flatten a [Receiver] message in order to get rid of the [RecvError] result
Expand Down
6 changes: 2 additions & 4 deletions crates/net/network/src/import.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
//! This module provides an abstraction over block import in the form of the `BlockImport` trait.
use std::task::{Context, Poll};

use reth_network_peers::PeerId;

use crate::message::NewBlockMessage;
use reth_network_peers::PeerId;
use std::task::{Context, Poll};

/// Abstraction over block import.
pub trait BlockImport<B = reth_primitives::Block>: std::fmt::Debug + Send + Sync {
Expand Down
3 changes: 1 addition & 2 deletions crates/net/network/src/listener.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
//! Contains connection-oriented interfaces.
use futures::{ready, Stream};
use std::{
io,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};

use futures::{ready, Stream};
use tokio::net::{TcpListener, TcpStream};

/// A tcp connection listener.
Expand Down
62 changes: 30 additions & 32 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,26 @@
//! (IP+port) of our node is published via discovery, remote peers can initiate inbound connections
//! to the local node. Once a (tcp) connection is established, both peers start to authenticate a [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the handshake was successful, both peers announce their capabilities and are now ready to exchange sub-protocol messages via the `RLPx` session.
use std::{
net::SocketAddr,
path::Path,
pin::Pin,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::{Duration, Instant},
use crate::{
budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
config::NetworkConfig,
discovery::Discovery,
error::{NetworkError, ServiceKind},
eth_requests::IncomingEthRequest,
import::{BlockImport, BlockImportOutcome, BlockValidation},
listener::ConnectionListener,
message::{NewBlockMessage, PeerMessage},
metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
network::{NetworkHandle, NetworkHandleMessage},
peers::PeersManager,
poll_nested_stream_with_budget,
protocol::IntoRlpxSubProtocol,
session::SessionManager,
state::NetworkState,
swarm::{Swarm, SwarmEvent},
transactions::NetworkTransactionEvent,
FetchClient, NetworkBuilder,
};

use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_eth_wire::{
Expand All @@ -44,31 +52,21 @@ use reth_storage_api::BlockNumReader;
use reth_tasks::shutdown::GracefulShutdown;
use reth_tokio_util::EventSender;
use secp256k1::SecretKey;
use std::{
net::SocketAddr,
path::Path,
pin::Pin,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, trace, warn};

use crate::{
budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
config::NetworkConfig,
discovery::Discovery,
error::{NetworkError, ServiceKind},
eth_requests::IncomingEthRequest,
import::{BlockImport, BlockImportOutcome, BlockValidation},
listener::ConnectionListener,
message::{NewBlockMessage, PeerMessage},
metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
network::{NetworkHandle, NetworkHandleMessage},
peers::PeersManager,
poll_nested_stream_with_budget,
protocol::IntoRlpxSubProtocol,
session::SessionManager,
state::NetworkState,
swarm::{Swarm, SwarmEvent},
transactions::NetworkTransactionEvent,
FetchClient, NetworkBuilder,
};

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Manages the _entire_ state of the network.
///
Expand Down
9 changes: 4 additions & 5 deletions crates/net/network/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@
//! An `RLPx` stream is multiplexed via the prepended message-id of a framed message.
//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
use std::{
sync::Arc,
task::{ready, Context, Poll},
};

use alloy_consensus::BlockHeader;
use alloy_primitives::{Bytes, B256};
use futures::FutureExt;
Expand All @@ -20,6 +15,10 @@ use reth_eth_wire::{
use reth_network_api::PeerRequest;
use reth_network_p2p::error::{RequestError, RequestResult};
use reth_primitives::{PooledTransactionsElement, ReceiptWithBloom};
use std::{
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;

/// Internal form of a `NewBlock` message
Expand Down
Loading

0 comments on commit f12d7a9

Please sign in to comment.