From 2cc26466b74fcf7835cb33a9ab48a2798698d912 Mon Sep 17 00:00:00 2001 From: Qinxuan Chen Date: Mon, 10 Jul 2023 05:24:23 +0800 Subject: [PATCH] replace lru with schnellru (#14539) --- Cargo.lock | 8 ++--- client/executor/Cargo.toml | 2 +- client/executor/src/wasm_runtime.rs | 17 +++++------ client/network-gossip/Cargo.toml | 2 +- client/network-gossip/src/state_machine.rs | 29 +++++++++---------- client/network/sync/Cargo.toml | 2 +- .../network/sync/src/block_request_handler.rs | 16 +++++----- client/network/sync/src/engine.rs | 17 +++++------ .../network/sync/src/state_request_handler.rs | 16 +++++----- primitives/blockchain/Cargo.toml | 2 +- primitives/blockchain/src/header_metadata.rs | 18 +++++------- 11 files changed, 58 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2a20ece4d0f7..76f3459b16603 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9502,7 +9502,6 @@ dependencies = [ "assert_matches", "criterion", "env_logger 0.9.3", - "lru", "num_cpus", "parity-scale-codec", "parking_lot 0.12.1", @@ -9512,6 +9511,7 @@ dependencies = [ "sc-executor-wasmtime", "sc-runtime-test", "sc-tracing", + "schnellru", "sp-api", "sp-core", "sp-externalities", @@ -9699,10 +9699,10 @@ dependencies = [ "futures-timer", "libp2p", "log", - "lru", "quickcheck", "sc-network", "sc-network-common", + "schnellru", "sp-runtime", "substrate-prometheus-endpoint", "substrate-test-runtime-client", @@ -9759,7 +9759,6 @@ dependencies = [ "futures-timer", "libp2p", "log", - "lru", "mockall", "parity-scale-codec", "prost", @@ -9771,6 +9770,7 @@ dependencies = [ "sc-network", "sc-network-common", "sc-utils", + "schnellru", "smallvec", "sp-arithmetic", "sp-blockchain", @@ -10990,9 +10990,9 @@ version = "4.0.0-dev" dependencies = [ "futures", "log", - "lru", "parity-scale-codec", "parking_lot 0.12.1", + "schnellru", "sp-api", "sp-consensus", "sp-database", diff --git a/client/executor/Cargo.toml b/client/executor/Cargo.toml index 790dd5a9846a2..449fd22e3ffc1 100644 --- a/client/executor/Cargo.toml +++ b/client/executor/Cargo.toml @@ -14,8 +14,8 @@ readme = "README.md" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -lru = "0.10.0" parking_lot = "0.12.1" +schnellru = "0.2.1" tracing = "0.1.29" codec = { package = "parity-scale-codec", version = "3.6.1" } diff --git a/client/executor/src/wasm_runtime.rs b/client/executor/src/wasm_runtime.rs index dea84467963c3..6dec3abdb20cf 100644 --- a/client/executor/src/wasm_runtime.rs +++ b/client/executor/src/wasm_runtime.rs @@ -22,24 +22,24 @@ //! components of the runtime that are expensive to initialize. use crate::error::{Error, WasmError}; + use codec::Decode; -use lru::LruCache; use parking_lot::Mutex; use sc_executor_common::{ runtime_blob::RuntimeBlob, wasm_runtime::{HeapAllocStrategy, WasmInstance, WasmModule}, }; +use schnellru::{ByLength, LruMap}; use sp_core::traits::{Externalities, FetchRuntimeCode, RuntimeCode}; use sp_version::RuntimeVersion; +use sp_wasm_interface::HostFunctions; + use std::{ - num::NonZeroUsize, panic::AssertUnwindSafe, path::{Path, PathBuf}, sync::Arc, }; -use sp_wasm_interface::HostFunctions; - /// Specification of different methods of executing the runtime Wasm code. #[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)] pub enum WasmExecutionMethod { @@ -163,7 +163,7 @@ pub struct RuntimeCache { /// A cache of runtimes along with metadata. /// /// Runtimes sorted by recent usage. The most recently used is at the front. - runtimes: Mutex>>, + runtimes: Mutex>>, /// The size of the instances cache for each runtime. max_runtime_instances: usize, cache_path: Option, @@ -185,9 +185,8 @@ impl RuntimeCache { cache_path: Option, runtime_cache_size: u8, ) -> RuntimeCache { - let cap = - NonZeroUsize::new(runtime_cache_size.max(1) as usize).expect("cache size is not zero"); - RuntimeCache { runtimes: Mutex::new(LruCache::new(cap)), max_runtime_instances, cache_path } + let cap = ByLength::new(runtime_cache_size.max(1) as u32); + RuntimeCache { runtimes: Mutex::new(LruMap::new(cap)), max_runtime_instances, cache_path } } /// Prepares a WASM module instance and executes given function for it. @@ -275,7 +274,7 @@ impl RuntimeCache { let versioned_runtime = Arc::new(result?); // Save new versioned wasm runtime in cache - runtimes.put(versioned_runtime_id, versioned_runtime.clone()); + runtimes.insert(versioned_runtime_id, versioned_runtime.clone()); versioned_runtime }; diff --git a/client/network-gossip/Cargo.toml b/client/network-gossip/Cargo.toml index a3616b403aca7..e25a769587dab 100644 --- a/client/network-gossip/Cargo.toml +++ b/client/network-gossip/Cargo.toml @@ -19,7 +19,7 @@ futures = "0.3.21" futures-timer = "3.0.1" libp2p = "0.51.3" log = "0.4.17" -lru = "0.10.0" +schnellru = "0.2.1" tracing = "0.1.29" prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" } sc-network = { version = "0.10.0-dev", path = "../network/" } diff --git a/client/network-gossip/src/state_machine.rs b/client/network-gossip/src/state_machine.rs index 24373cd402513..f874a5c15b38b 100644 --- a/client/network-gossip/src/state_machine.rs +++ b/client/network-gossip/src/state_machine.rs @@ -20,12 +20,13 @@ use crate::{MessageIntent, Network, ValidationResult, Validator, ValidatorContex use ahash::AHashSet; use libp2p::PeerId; -use lru::LruCache; +use schnellru::{ByLength, LruMap}; + use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; use sc_network::types::ProtocolName; use sc_network_common::role::ObservedRole; use sp_runtime::traits::{Block as BlockT, Hash, HashFor}; -use std::{collections::HashMap, iter, num::NonZeroUsize, sync::Arc, time, time::Instant}; +use std::{collections::HashMap, iter, sync::Arc, time, time::Instant}; // FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115 // NOTE: The current value is adjusted based on largest production network deployment (Kusama) and @@ -36,7 +37,7 @@ use std::{collections::HashMap, iter, num::NonZeroUsize, sync::Arc, time, time:: // // Assuming that each known message is tracked with a 32 byte hash (common for `Block::Hash`), then // this cache should take about 256 KB of memory. -const KNOWN_MESSAGES_CACHE_SIZE: usize = 8192; +const KNOWN_MESSAGES_CACHE_SIZE: u32 = 8192; const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_millis(750); @@ -155,7 +156,7 @@ where pub struct ConsensusGossip { peers: HashMap>, messages: Vec>, - known_messages: LruCache, + known_messages: LruMap, protocol: ProtocolName, validator: Arc>, next_broadcast: Instant, @@ -181,11 +182,7 @@ impl ConsensusGossip { ConsensusGossip { peers: HashMap::new(), messages: Default::default(), - known_messages: { - let cap = NonZeroUsize::new(KNOWN_MESSAGES_CACHE_SIZE) - .expect("cache capacity is not zero"); - LruCache::new(cap) - }, + known_messages: { LruMap::new(ByLength::new(KNOWN_MESSAGES_CACHE_SIZE)) }, protocol, validator, next_broadcast: Instant::now() + REBROADCAST_INTERVAL, @@ -216,7 +213,7 @@ impl ConsensusGossip { message: Vec, sender: Option, ) { - if self.known_messages.put(message_hash, ()).is_none() { + if self.known_messages.insert(message_hash, ()) { self.messages.push(MessageEntry { message_hash, topic, message, sender }); if let Some(ref metrics) = self.metrics { @@ -313,7 +310,7 @@ impl ConsensusGossip { ); for (_, ref mut peer) in self.peers.iter_mut() { - peer.known_messages.retain(|h| known_messages.contains(h)); + peer.known_messages.retain(|h| known_messages.get(h).is_some()); } } @@ -348,7 +345,7 @@ impl ConsensusGossip { for message in messages { let message_hash = HashFor::::hash(&message[..]); - if self.known_messages.contains(&message_hash) { + if self.known_messages.get(&message_hash).is_some() { tracing::trace!( target: "gossip", %who, @@ -545,7 +542,7 @@ mod tests { macro_rules! push_msg { ($consensus:expr, $topic:expr, $hash: expr, $m:expr) => { - if $consensus.known_messages.put($hash, ()).is_none() { + if $consensus.known_messages.insert($hash, ()) { $consensus.messages.push(MessageEntry { message_hash: $hash, topic: $topic, @@ -720,8 +717,8 @@ mod tests { push_msg!(consensus, prev_hash, m1_hash, m1); push_msg!(consensus, best_hash, m2_hash, m2); - consensus.known_messages.put(m1_hash, ()); - consensus.known_messages.put(m2_hash, ()); + consensus.known_messages.insert(m1_hash, ()); + consensus.known_messages.insert(m2_hash, ()); consensus.collect_garbage(); assert_eq!(consensus.messages.len(), 2); @@ -734,7 +731,7 @@ mod tests { assert_eq!(consensus.messages.len(), 1); // known messages are only pruned based on size. assert_eq!(consensus.known_messages.len(), 2); - assert!(consensus.known_messages.contains(&m2_hash)); + assert!(consensus.known_messages.get(&m2_hash).is_some()); } #[test] diff --git a/client/network/sync/Cargo.toml b/client/network/sync/Cargo.toml index 5afc92265e26f..1feb1316dbcde 100644 --- a/client/network/sync/Cargo.toml +++ b/client/network/sync/Cargo.toml @@ -24,9 +24,9 @@ futures = "0.3.21" futures-timer = "3.0.2" libp2p = "0.51.3" log = "0.4.17" -lru = "0.10.0" mockall = "0.11.3" prost = "0.11" +schnellru = "0.2.1" smallvec = "1.8.0" thiserror = "1.0" fork-tree = { version = "3.0.0", path = "../../../utils/fork-tree" } diff --git a/client/network/sync/src/block_request_handler.rs b/client/network/sync/src/block_request_handler.rs index 4fa5de3ca3c84..1c576c6188619 100644 --- a/client/network/sync/src/block_request_handler.rs +++ b/client/network/sync/src/block_request_handler.rs @@ -26,8 +26,8 @@ use codec::{Decode, Encode}; use futures::{channel::oneshot, stream::StreamExt}; use libp2p::PeerId; use log::debug; -use lru::LruCache; use prost::Message; +use schnellru::{ByLength, LruMap}; use sc_client_api::BlockBackend; use sc_network::{ @@ -44,7 +44,6 @@ use sp_runtime::{ use std::{ cmp::min, hash::{Hash, Hasher}, - num::NonZeroUsize, sync::Arc, time::Duration, }; @@ -137,7 +136,7 @@ pub struct BlockRequestHandler { /// Maps from request to number of times we have seen this request. /// /// This is used to check if a peer is spamming us with the same request. - seen_requests: LruCache, SeenRequestsValue>, + seen_requests: LruMap, SeenRequestsValue>, } impl BlockRequestHandler @@ -167,9 +166,8 @@ where ); protocol_config.inbound_queue = Some(tx); - let capacity = - NonZeroUsize::new(num_peer_hint.max(1) * 2).expect("cache capacity is not zero"); - let seen_requests = LruCache::new(capacity); + let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2); + let seen_requests = LruMap::new(capacity); (Self { client, request_receiver, seen_requests }, protocol_config) } @@ -236,7 +234,7 @@ where .difference(BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION) .is_empty(); - match self.seen_requests.get_mut(&key) { + match self.seen_requests.get(&key) { Some(SeenRequestsValue::First) => {}, Some(SeenRequestsValue::Fulfilled(ref mut requests)) => { *requests = requests.saturating_add(1); @@ -250,7 +248,7 @@ where } }, None => { - self.seen_requests.put(key.clone(), SeenRequestsValue::First); + self.seen_requests.insert(key.clone(), SeenRequestsValue::First); }, } @@ -277,7 +275,7 @@ where .iter() .any(|b| !b.header.is_empty() || !b.body.is_empty() || b.is_empty_justification) { - if let Some(value) = self.seen_requests.get_mut(&key) { + if let Some(value) = self.seen_requests.get(&key) { // If this is the first time we have processed this request, we need to change // it to `Fulfilled`. if let SeenRequestsValue::First = value { diff --git a/client/network/sync/src/engine.rs b/client/network/sync/src/engine.rs index 773b6b40f4241..1c57e4a275978 100644 --- a/client/network/sync/src/engine.rs +++ b/client/network/sync/src/engine.rs @@ -28,10 +28,10 @@ use codec::{Decode, Encode}; use futures::{FutureExt, StreamExt}; use futures_timer::Delay; use libp2p::PeerId; -use lru::LruCache; use prometheus_endpoint::{ register, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64, }; +use schnellru::{ByLength, LruMap}; use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider}; use sc_consensus::import_queue::ImportQueueService; @@ -239,7 +239,7 @@ pub struct SyncingEngine { default_peers_set_num_light: usize, /// A cache for the data that was associated to a block announcement. - block_announce_data_cache: LruCache>, + block_announce_data_cache: LruMap>, /// The `PeerId`'s of all boot nodes. boot_node_ids: HashSet, @@ -294,12 +294,9 @@ where } else { net_config.network_config.max_blocks_per_request }; - let cache_capacity = NonZeroUsize::new( - (net_config.network_config.default_peers_set.in_peers as usize + - net_config.network_config.default_peers_set.out_peers as usize) - .max(1), - ) - .expect("cache capacity is not zero"); + let cache_capacity = (net_config.network_config.default_peers_set.in_peers + + net_config.network_config.default_peers_set.out_peers) + .max(1); let important_peers = { let mut imp_p = HashSet::new(); for reserved in &net_config.network_config.default_peers_set.reserved_nodes { @@ -381,7 +378,7 @@ where network_service, peers: HashMap::new(), evicted: HashSet::new(), - block_announce_data_cache: LruCache::new(cache_capacity), + block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)), block_announce_protocol_name, num_connected: num_connected.clone(), is_major_syncing: is_major_syncing.clone(), @@ -465,7 +462,7 @@ where if let Some(data) = announce.data { if !data.is_empty() { - self.block_announce_data_cache.put(announce.header.hash(), data); + self.block_announce_data_cache.insert(announce.header.hash(), data); } } }, diff --git a/client/network/sync/src/state_request_handler.rs b/client/network/sync/src/state_request_handler.rs index 5e2d0ae48f62b..f3af9a3844300 100644 --- a/client/network/sync/src/state_request_handler.rs +++ b/client/network/sync/src/state_request_handler.rs @@ -23,8 +23,8 @@ use codec::{Decode, Encode}; use futures::{channel::oneshot, stream::StreamExt}; use libp2p::PeerId; use log::{debug, trace}; -use lru::LruCache; use prost::Message; +use schnellru::{ByLength, LruMap}; use sc_client_api::{BlockBackend, ProofProvider}; use sc_network::{ @@ -35,7 +35,6 @@ use sp_runtime::traits::Block as BlockT; use std::{ hash::{Hash, Hasher}, - num::NonZeroUsize, sync::Arc, time::Duration, }; @@ -115,7 +114,7 @@ pub struct StateRequestHandler { /// Maps from request to number of times we have seen this request. /// /// This is used to check if a peer is spamming us with the same request. - seen_requests: LruCache, SeenRequestsValue>, + seen_requests: LruMap, SeenRequestsValue>, } impl StateRequestHandler @@ -145,9 +144,8 @@ where ); protocol_config.inbound_queue = Some(tx); - let capacity = - NonZeroUsize::new(num_peer_hint.max(1) * 2).expect("cache capacity is not zero"); - let seen_requests = LruCache::new(capacity); + let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2); + let seen_requests = LruMap::new(capacity); (Self { client, request_receiver, seen_requests }, protocol_config) } @@ -180,7 +178,7 @@ where let mut reputation_changes = Vec::new(); - match self.seen_requests.get_mut(&key) { + match self.seen_requests.get(&key) { Some(SeenRequestsValue::First) => {}, Some(SeenRequestsValue::Fulfilled(ref mut requests)) => { *requests = requests.saturating_add(1); @@ -190,7 +188,7 @@ where } }, None => { - self.seen_requests.put(key.clone(), SeenRequestsValue::First); + self.seen_requests.insert(key.clone(), SeenRequestsValue::First); }, } @@ -247,7 +245,7 @@ where .last() .map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key))), ); - if let Some(value) = self.seen_requests.get_mut(&key) { + if let Some(value) = self.seen_requests.get(&key) { // If this is the first time we have processed this request, we need to change // it to `Fulfilled`. if let SeenRequestsValue::First = value { diff --git a/primitives/blockchain/Cargo.toml b/primitives/blockchain/Cargo.toml index 725b21a6ba5ad..6320fab9390b9 100644 --- a/primitives/blockchain/Cargo.toml +++ b/primitives/blockchain/Cargo.toml @@ -17,8 +17,8 @@ targets = ["x86_64-unknown-linux-gnu"] codec = { package = "parity-scale-codec", version = "3.6.1", default-features = false, features = ["derive"] } futures = "0.3.21" log = "0.4.17" -lru = "0.10.0" parking_lot = "0.12.1" +schnellru = "0.2.1" thiserror = "1.0.30" sp-api = { version = "4.0.0-dev", path = "../api" } sp-consensus = { version = "0.10.0-dev", path = "../consensus/common" } diff --git a/primitives/blockchain/src/header_metadata.rs b/primitives/blockchain/src/header_metadata.rs index 1d406dd0f4ed4..08b3c9ab3dfbd 100644 --- a/primitives/blockchain/src/header_metadata.rs +++ b/primitives/blockchain/src/header_metadata.rs @@ -18,13 +18,12 @@ //! Implements tree backend, cached header metadata and algorithms //! to compute routes efficiently over the tree of headers. -use lru::LruCache; use parking_lot::RwLock; +use schnellru::{ByLength, LruMap}; use sp_runtime::traits::{Block as BlockT, Header, NumberFor, One}; -use std::num::NonZeroUsize; /// Set to the expected max difference between `best` and `finalized` blocks at sync. -const LRU_CACHE_SIZE: usize = 5_000; +const LRU_CACHE_SIZE: u32 = 5_000; /// Get lowest common ancestor between two blocks in the tree. /// @@ -243,20 +242,19 @@ pub trait HeaderMetadata { /// Caches header metadata in an in-memory LRU cache. pub struct HeaderMetadataCache { - cache: RwLock>>, + cache: RwLock>>, } impl HeaderMetadataCache { /// Creates a new LRU header metadata cache with `capacity`. - pub fn new(capacity: NonZeroUsize) -> Self { - HeaderMetadataCache { cache: RwLock::new(LruCache::new(capacity)) } + pub fn new(capacity: u32) -> Self { + HeaderMetadataCache { cache: RwLock::new(LruMap::new(ByLength::new(capacity))) } } } impl Default for HeaderMetadataCache { fn default() -> Self { - let cap = NonZeroUsize::new(LRU_CACHE_SIZE).expect("cache capacity is not zero"); - HeaderMetadataCache { cache: RwLock::new(LruCache::new(cap)) } + HeaderMetadataCache { cache: RwLock::new(LruMap::new(ByLength::new(LRU_CACHE_SIZE))) } } } @@ -266,11 +264,11 @@ impl HeaderMetadataCache { } pub fn insert_header_metadata(&self, hash: Block::Hash, metadata: CachedHeaderMetadata) { - self.cache.write().put(hash, metadata); + self.cache.write().insert(hash, metadata); } pub fn remove_header_metadata(&self, hash: Block::Hash) { - self.cache.write().pop(&hash); + self.cache.write().remove(&hash); } }