From a873525401cfbcdc1e68ee946969d4424b7b5e63 Mon Sep 17 00:00:00 2001 From: shamardy <39480341+shamardy@users.noreply.github.com> Date: Thu, 23 Jan 2025 14:02:39 +0200 Subject: [PATCH] chore(deps): `timed-map` migration (#2247) --- Cargo.lock | 7 +- mm2src/coins/Cargo.toml | 2 + .../eth/web3_transport/websocket_transport.rs | 11 +- .../rpc_clients/electrum_rpc/connection.rs | 8 +- mm2src/common/common.rs | 2 - mm2src/common/expirable_map.rs | 169 --------- mm2src/common/time_cache.rs | 352 ------------------ mm2src/mm2_core/Cargo.toml | 2 + mm2src/mm2_core/src/data_asker.rs | 19 +- mm2src/mm2_core/src/mm_ctx.rs | 12 +- mm2src/mm2_main/Cargo.toml | 2 + mm2src/mm2_main/src/lp_healthcheck.rs | 37 +- mm2src/mm2_main/src/lp_ordermatch.rs | 87 +++-- mm2src/mm2_main/src/lp_swap.rs | 9 +- mm2src/mm2_main/src/lp_swap/swap_watcher.rs | 8 +- mm2src/mm2_main/src/ordermatch_tests.rs | 17 +- mm2src/mm2_p2p/Cargo.toml | 4 +- mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 11 +- 18 files changed, 137 insertions(+), 622 deletions(-) delete mode 100644 mm2src/common/expirable_map.rs delete mode 100644 mm2src/common/time_cache.rs diff --git a/Cargo.lock b/Cargo.lock index 842966e7ab..e60009ae85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -927,6 +927,7 @@ dependencies = [ "spv_validation", "tendermint-rpc", "time 0.3.20", + "timed-map", "tokio", "tokio-rustls 0.24.1", "tokio-tungstenite-wasm", @@ -3900,6 +3901,7 @@ dependencies = [ "serde", "serde_json", "shared_ref_counter", + "timed-map", "tokio", "uuid", "wasm-bindgen-test", @@ -4116,6 +4118,7 @@ dependencies = [ "sp-trie", "spv_validation", "testcontainers", + "timed-map", "tokio", "trading_api", "trie-db", @@ -6968,9 +6971,9 @@ dependencies = [ [[package]] name = "timed-map" -version = "1.1.1" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b102d4d896895d697f1dff4141dff28307532dac57a376b2b5665a55b280dc6" +checksum = "30565aee368a9b233f397f46cd803c59285b61d54c5b3ae378611bd467beecbe" dependencies = [ "rustc-hash", "web-time", diff --git a/mm2src/coins/Cargo.toml b/mm2src/coins/Cargo.toml index a706d9d22c..eca8c018bd 100644 --- a/mm2src/coins/Cargo.toml +++ b/mm2src/coins/Cargo.toml @@ -125,6 +125,7 @@ mm2_db = { path = "../mm2_db" } mm2_metamask = { path = "../mm2_metamask" } mm2_test_helpers = { path = "../mm2_test_helpers" } time = { version = "0.3.20", features = ["wasm-bindgen"] } +timed-map = { version = "1.3", features = ["rustc-hash", "wasm"] } tonic = { version = "0.10", default-features = false, features = ["prost", "codegen", "gzip"] } tower-service = "0.3" wasm-bindgen = "0.2.86" @@ -148,6 +149,7 @@ lightning-net-tokio = "0.0.113" rust-ini = { version = "0.13" } rustls = { version = "0.21", features = ["dangerous_configuration"] } secp256k1v24 = { version = "0.24", package = "secp256k1" } +timed-map = { version = "1.3", features = ["rustc-hash"] } tokio = { version = "1.20" } tokio-rustls = { version = "0.24" } tonic = { version = "0.10", features = ["tls", "tls-webpki-roots", "gzip"] } diff --git a/mm2src/coins/eth/web3_transport/websocket_transport.rs b/mm2src/coins/eth/web3_transport/websocket_transport.rs index 36b13bdb78..6a7d084d54 100644 --- a/mm2src/coins/eth/web3_transport/websocket_transport.rs +++ b/mm2src/coins/eth/web3_transport/websocket_transport.rs @@ -11,7 +11,6 @@ use crate::eth::web3_transport::Web3SendOut; use crate::eth::{EthCoin, RpcTransportEventHandlerShared}; use crate::{MmCoin, RpcTransportEventHandler}; use common::executor::{AbortSettings, SpawnAbortable, Timer}; -use common::expirable_map::ExpirableMap; use common::log; use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::channel::oneshot; @@ -25,6 +24,7 @@ use proxy_signature::{ProxySign, RawMessage}; use std::sync::atomic::AtomicBool; use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; +use timed_map::TimedMap; use tokio_tungstenite_wasm::WebSocketStream; use web3::error::{Error, TransportError}; use web3::helpers::to_string; @@ -136,7 +136,7 @@ impl WebsocketTransport { &self, request: Option, wsocket: &mut WebSocketStream, - response_notifiers: &mut ExpirableMap>>, + response_notifiers: &mut TimedMap>>, ) -> OuterAction { match request { Some(ControllerMessage::Request(WsRequest { @@ -144,7 +144,7 @@ impl WebsocketTransport { serialized_request, response_notifier, })) => { - response_notifiers.insert( + response_notifiers.insert_expirable( request_id, response_notifier, // Since request will be cancelled when timeout occurs, we are free to drop its state. @@ -187,7 +187,7 @@ impl WebsocketTransport { async fn handle_response( &self, message: Option>, - response_notifiers: &mut ExpirableMap>>, + response_notifiers: &mut TimedMap>>, ) -> OuterAction { match message { Some(Ok(tokio_tungstenite_wasm::Message::Text(inc_event))) => { @@ -248,7 +248,8 @@ impl WebsocketTransport { let _guard = self.connection_guard.lock().await; // List of awaiting requests - let mut response_notifiers: ExpirableMap>> = ExpirableMap::default(); + let mut response_notifiers: TimedMap>> = + TimedMap::new_with_map_kind(timed_map::MapKind::FxHashMap).expiration_tick_cap(30); let mut wsocket = match self .attempt_to_establish_socket_connection(MAX_ATTEMPTS, SLEEP_DURATION) diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection.rs index ca7a4bac60..28229b746e 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection.rs @@ -6,12 +6,12 @@ use crate::{RpcTransportEventHandler, SharableRpcTransportEventHandler}; use common::custom_futures::timeout::FutureTimerExt; use common::executor::{abortable_queue::AbortableQueue, abortable_queue::WeakSpawner, AbortableSystem, SpawnFuture, Timer}; -use common::expirable_map::ExpirableMap; use common::jsonrpc_client::{JsonRpcBatchResponse, JsonRpcErrorType, JsonRpcId, JsonRpcRequest, JsonRpcResponse, JsonRpcResponseEnum}; use common::log::{error, info}; use common::{now_float, now_ms}; use mm2_rpc::data::legacy::ElectrumProtocol; +use timed_map::{MapKind, TimedMap}; use std::io; use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; @@ -48,7 +48,7 @@ cfg_wasm32! { use std::sync::atomic::AtomicUsize; } -pub type JsonRpcPendingRequests = ExpirableMap>; +pub type JsonRpcPendingRequests = TimedMap>; macro_rules! disconnect_and_return { ($typ:tt, $err:expr, $conn:expr, $handlers:expr) => {{ @@ -177,7 +177,7 @@ impl ElectrumConnection { settings, tx: Mutex::new(None), establishing_connection: AsyncMutex::new(()), - responses: Mutex::new(JsonRpcPendingRequests::new()), + responses: Mutex::new(JsonRpcPendingRequests::new_with_map_kind(MapKind::BTreeMap).expiration_tick_cap(50)), protocol_version: Mutex::new(None), last_error: Mutex::new(None), abortable_system, @@ -251,7 +251,7 @@ impl ElectrumConnection { self.responses .lock() .unwrap() - .insert(rpc_id, req_tx, Duration::from_secs_f64(timeout)); + .insert_expirable(rpc_id, req_tx, Duration::from_secs_f64(timeout)); let tx = self .tx .lock() diff --git a/mm2src/common/common.rs b/mm2src/common/common.rs index 117a84bff9..d6e9a45579 100644 --- a/mm2src/common/common.rs +++ b/mm2src/common/common.rs @@ -128,12 +128,10 @@ pub mod crash_reports; pub mod custom_futures; pub mod custom_iter; #[path = "executor/mod.rs"] pub mod executor; -pub mod expirable_map; pub mod notifier; pub mod number_type_casting; pub mod password_policy; pub mod seri; -pub mod time_cache; #[cfg(not(target_arch = "wasm32"))] #[path = "wio.rs"] diff --git a/mm2src/common/expirable_map.rs b/mm2src/common/expirable_map.rs deleted file mode 100644 index 0b3110c066..0000000000 --- a/mm2src/common/expirable_map.rs +++ /dev/null @@ -1,169 +0,0 @@ -//! This module provides a cross-compatible map that associates values with keys and supports expiring entries. -//! -//! Designed for performance-oriented use-cases utilizing `FxHashMap` under the hood, -//! and is not suitable for cryptographic purposes. - -use instant::{Duration, Instant}; -use rustc_hash::FxHashMap; -use std::{collections::BTreeMap, hash::Hash}; - -#[derive(Clone, Debug)] -pub struct ExpirableEntry { - pub(crate) value: V, - pub(crate) expires_at: Instant, -} - -impl ExpirableEntry { - #[inline(always)] - pub fn new(v: V, exp: Duration) -> Self { - Self { - expires_at: Instant::now() + exp, - value: v, - } - } - - #[inline(always)] - pub fn get_element(&self) -> &V { &self.value } - - #[inline(always)] - pub fn update_value(&mut self, v: V) { self.value = v } - - #[inline(always)] - pub fn update_expiration(&mut self, expires_at: Instant) { self.expires_at = expires_at } - - /// Checks whether entry has longer ttl than the given one. - #[inline(always)] - pub fn has_longer_life_than(&self, min_ttl: Duration) -> bool { self.expires_at > Instant::now() + min_ttl } -} - -impl Default for ExpirableMap { - fn default() -> Self { Self::new() } -} - -/// A map that allows associating values with keys and expiring entries. -/// It is important to note that this implementation does not have a background worker to -/// automatically clear expired entries. Outdated entries are only removed when the control flow -/// is handed back to the map mutably (i.e. some mutable method of the map is invoked). -/// -/// WARNING: This is designed for performance-oriented use-cases utilizing `FxHashMap` -/// under the hood and is not suitable for cryptographic purposes. -#[derive(Clone, Debug)] -pub struct ExpirableMap { - map: FxHashMap>, - /// A sorted inverse map from expiration times to keys to speed up expired entries clearing. - expiries: BTreeMap, -} - -impl ExpirableMap { - /// Creates a new empty `ExpirableMap` - #[inline] - pub fn new() -> Self { - Self { - map: FxHashMap::default(), - expiries: BTreeMap::new(), - } - } - - /// Returns the associated value if present and not expired. - #[inline] - pub fn get(&self, k: &K) -> Option<&V> { - self.map - .get(k) - .filter(|v| v.expires_at > Instant::now()) - .map(|v| &v.value) - } - - /// Removes a key-value pair from the map and returns the associated value if present and not expired. - #[inline] - pub fn remove(&mut self, k: &K) -> Option { - self.map.remove(k).filter(|v| v.expires_at > Instant::now()).map(|v| { - self.expiries.remove(&v.expires_at); - v.value - }) - } - - /// Inserts a key-value pair with an expiration duration. - /// - /// If a value already exists for the given key, it will be updated and then - /// the old one will be returned. - pub fn insert(&mut self, k: K, v: V, exp: Duration) -> Option { - self.clear_expired_entries(); - let entry = ExpirableEntry::new(v, exp); - self.expiries.insert(entry.expires_at, k); - self.map.insert(k, entry).map(|v| v.value) - } - - /// Clears the map. - pub fn clear(&mut self) { - self.map.clear(); - self.expiries.clear(); - } - - /// Removes expired entries from the map. - /// - /// Iterates through the `expiries` in order, removing entries that have expired. - /// Stops at the first non-expired entry, leveraging the sorted nature of `BTreeMap`. - fn clear_expired_entries(&mut self) { - let now = Instant::now(); - - // `pop_first()` is used here as it efficiently removes expired entries. - // `first_key_value()` was considered as it wouldn't need re-insertion for - // non-expired entries, but it would require an extra remove operation for - // each expired entry. `pop_first()` needs only one re-insertion per call, - // which is an acceptable trade-off compared to multiple remove operations. - while let Some((exp, key)) = self.expiries.pop_first() { - if exp > now { - self.expiries.insert(exp, key); - break; - } - self.map.remove(&key); - } - } -} - -#[cfg(any(test, target_arch = "wasm32"))] -mod tests { - use super::*; - use crate::cross_test; - use crate::executor::Timer; - - crate::cfg_wasm32! { - use wasm_bindgen_test::*; - wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); - } - - cross_test!(test_clear_expired_entries, { - let mut expirable_map = ExpirableMap::new(); - let value = "test_value"; - let exp = Duration::from_secs(1); - - // Insert 2 entries with 1 sec expiration time - expirable_map.insert("key1", value, exp); - expirable_map.insert("key2", value, exp); - - // Wait for entries to expire - Timer::sleep(2.).await; - - // Clear expired entries - expirable_map.clear_expired_entries(); - - // We waited for 2 seconds, so we shouldn't have any entry accessible - assert_eq!(expirable_map.map.len(), 0); - - // Insert 5 entries - expirable_map.insert("key1", value, Duration::from_secs(5)); - expirable_map.insert("key2", value, Duration::from_secs(4)); - expirable_map.insert("key3", value, Duration::from_secs(7)); - expirable_map.insert("key4", value, Duration::from_secs(2)); - expirable_map.insert("key5", value, Duration::from_millis(3750)); - - // Wait 2 seconds to expire some entries - Timer::sleep(2.).await; - - // Clear expired entries - expirable_map.clear_expired_entries(); - - // We waited for 2 seconds, only one entry should expire - assert_eq!(expirable_map.map.len(), 4); - }); -} diff --git a/mm2src/common/time_cache.rs b/mm2src/common/time_cache.rs deleted file mode 100644 index a1c3987ec2..0000000000 --- a/mm2src/common/time_cache.rs +++ /dev/null @@ -1,352 +0,0 @@ -// Copyright 2020 Sigma Prime Pty Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! This implements a time-based LRU cache for checking gossipsub message duplicates. - -use fnv::FnvHashMap; -use instant::Instant; -use std::collections::hash_map::{self, - Entry::{Occupied, Vacant}, - Iter, Keys}; -use std::collections::VecDeque; -use std::time::Duration; - -use crate::expirable_map::ExpirableEntry; - -#[derive(Debug)] -pub struct TimeCache { - /// Mapping a key to its value together with its latest expire time (can be updated through - /// reinserts). - map: FnvHashMap>, - /// An ordered list of keys by expires time. - list: VecDeque>, - /// The time elements remain in the cache. - ttl: Duration, -} - -pub struct OccupiedEntry<'a, K, V> { - expiration: Instant, - entry: hash_map::OccupiedEntry<'a, K, ExpirableEntry>, - list: &'a mut VecDeque>, -} - -impl<'a, K, V> OccupiedEntry<'a, K, V> -where - K: Eq + std::hash::Hash + Clone, -{ - pub fn into_mut(self) -> &'a mut V { &mut self.entry.into_mut().value } - - #[allow(dead_code)] - pub fn insert_without_updating_expiration(&mut self, value: V) -> V { - //keep old expiration, only replace value of element - ::std::mem::replace(&mut self.entry.get_mut().value, value) - } - - #[allow(dead_code)] - pub fn insert_and_update_expiration(&mut self, value: V) -> V { - //We push back an additional element, the first reference in the list will be ignored - // since we also updated the expires in the map, see below. - self.list.push_back(ExpirableEntry { - value: self.entry.key().clone(), - expires_at: self.expiration, - }); - self.entry - .insert(ExpirableEntry { - value, - expires_at: self.expiration, - }) - .value - } - - pub fn into_mut_with_update_expiration(mut self) -> &'a mut V { - //We push back an additional element, the first reference in the list will be ignored - // since we also updated the expires in the map, see below. - self.list.push_back(ExpirableEntry { - value: self.entry.key().clone(), - expires_at: self.expiration, - }); - self.entry.get_mut().update_expiration(self.expiration); - &mut self.entry.into_mut().value - } -} - -pub struct VacantEntry<'a, K, V> { - expiration: Instant, - entry: hash_map::VacantEntry<'a, K, ExpirableEntry>, - list: &'a mut VecDeque>, -} - -impl<'a, K, V> VacantEntry<'a, K, V> -where - K: Eq + std::hash::Hash + Clone, -{ - pub fn insert(self, value: V) -> &'a mut V { - self.list.push_back(ExpirableEntry { - value: self.entry.key().clone(), - expires_at: self.expiration, - }); - &mut self - .entry - .insert(ExpirableEntry { - value, - expires_at: self.expiration, - }) - .value - } -} - -pub enum Entry<'a, K: 'a, V: 'a> { - Occupied(OccupiedEntry<'a, K, V>), - Vacant(VacantEntry<'a, K, V>), -} - -#[allow(dead_code)] -impl<'a, K: 'a, V: 'a> Entry<'a, K, V> -where - K: Eq + std::hash::Hash + Clone, -{ - pub fn or_insert_with V>(self, default: F) -> &'a mut V { - match self { - Entry::Occupied(entry) => entry.into_mut(), - Entry::Vacant(entry) => entry.insert(default()), - } - } - - pub fn or_insert_with_update_expiration V>(self, default: F) -> &'a mut V { - match self { - Entry::Occupied(entry) => entry.into_mut_with_update_expiration(), - Entry::Vacant(entry) => entry.insert(default()), - } - } -} - -impl TimeCache -where - Key: Eq + std::hash::Hash + Clone, -{ - pub fn new(ttl: Duration) -> Self { - TimeCache { - map: FnvHashMap::default(), - list: VecDeque::new(), - ttl, - } - } - - fn remove_expired_keys(&mut self, now: Instant) { - while let Some(element) = self.list.pop_front() { - if element.expires_at > now { - self.list.push_front(element); - break; - } - if let Occupied(entry) = self.map.entry(element.value.clone()) { - if entry.get().expires_at <= now { - entry.remove(); - } - } - } - } - - pub fn entry(&mut self, key: Key) -> Entry { - let now = Instant::now(); - self.remove_expired_keys(now); - match self.map.entry(key) { - Occupied(entry) => Entry::Occupied(OccupiedEntry { - expiration: now + self.ttl, - entry, - list: &mut self.list, - }), - Vacant(entry) => Entry::Vacant(VacantEntry { - expiration: now + self.ttl, - entry, - list: &mut self.list, - }), - } - } - - // Inserts new element and removes any expired elements. - // - // If the key was not present this returns `true`. If the value was already present this - // returns `false`. - pub fn insert(&mut self, key: Key, value: Value) -> bool { - if let Entry::Vacant(entry) = self.entry(key) { - entry.insert(value); - true - } else { - false - } - } - - // Removes a certain key even if it didn't expire plus removing other expired keys - pub fn remove(&mut self, key: Key) -> Option { - let result = self.map.remove(&key).map(|el| el.value); - self.remove_expired_keys(Instant::now()); - result - } - - /// Empties the entire cache. - #[allow(dead_code)] - pub fn clear(&mut self) { - self.map.clear(); - self.list.clear(); - } - - pub fn contains_key(&self, key: &Key) -> bool { self.map.contains_key(key) } - - pub fn get(&self, key: &Key) -> Option<&Value> { self.map.get(key).map(|e| &e.value) } - - pub fn len(&self) -> usize { self.map.len() } - - pub fn is_empty(&self) -> bool { self.map.is_empty() } - - pub fn ttl(&self) -> Duration { self.ttl } - - pub fn iter(&self) -> Iter> { self.map.iter() } - - pub fn keys(&self) -> Keys> { self.map.keys() } -} - -impl TimeCache -where - Key: Eq + std::hash::Hash + Clone, - Value: Clone, -{ - pub fn as_hash_map(&self) -> std::collections::HashMap { - self.map - .iter() - .map(|(key, expiring_el)| (key.clone(), expiring_el.value.clone())) - .collect() - } -} - -pub struct DuplicateCache(TimeCache); - -impl DuplicateCache -where - Key: Eq + std::hash::Hash + Clone, -{ - pub fn new(ttl: Duration) -> Self { Self(TimeCache::new(ttl)) } - - // Inserts new elements and removes any expired elements. - // - // If the key was not present this returns `true`. If the value was already present this - // returns `false`. - pub fn insert(&mut self, key: Key) -> bool { - if let Entry::Vacant(entry) = self.0.entry(key) { - entry.insert(()); - true - } else { - false - } - } - - pub fn contains(&mut self, key: &Key) -> bool { self.0.contains_key(key) } - - // Removes a certain key even if it didn't expire plus removing other expired keys - #[inline] - pub fn remove(&mut self, key: Key) { self.0.remove(key); } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn time_cache_added_entries_exist() { - let mut cache = TimeCache::new(Duration::from_secs(10)); - - assert!(cache.insert("t", "tv".to_owned())); - assert!(cache.insert("e", "ev".to_owned())); - - // Should report that 't' and 't' already exists - assert!(!cache.insert("t", "td".to_owned())); - assert!(!cache.insert("e", "ed".to_owned())); - - assert_eq!(cache.get(&"t"), Some(&"tv".to_owned())); - assert_eq!(cache.get(&"e"), Some(&"ev".to_owned())); - assert_eq!(cache.get(&"f"), None); - } - - #[test] - fn time_cache_expired() { - let mut cache = TimeCache::new(Duration::from_secs(1)); - - assert!(cache.insert("t", "tv".to_owned())); - assert_eq!(cache.get(&"t"), Some(&"tv".to_owned())); - - std::thread::sleep(Duration::from_millis(500)); - assert!(cache.insert("e", "ev".to_owned())); - assert_eq!(cache.get(&"t"), Some(&"tv".to_owned())); - assert_eq!(cache.get(&"e"), Some(&"ev".to_owned())); - - std::thread::sleep(Duration::from_millis(700)); - // insert other value to initiate the expiration - assert!(cache.insert("f", "fv".to_owned())); - // must be expired already - assert_eq!(cache.get(&"t"), None); - assert_eq!(cache.get(&"e"), Some(&"ev".to_owned())); - - std::thread::sleep(Duration::from_millis(700)); - // insert other value to initiate the expiration - assert!(cache.insert("d", "dv".to_owned())); - // must be expired already - assert_eq!(cache.get(&"t"), None); - assert_eq!(cache.get(&"e"), None); - } - - #[test] - fn cache_added_entries_exist() { - let mut cache = DuplicateCache::new(Duration::from_secs(10)); - - cache.insert("t"); - cache.insert("e"); - - // Should report that 't' and 't' already exists - assert!(!cache.insert("t")); - assert!(!cache.insert("e")); - } - - #[test] - fn cache_entries_expire() { - let mut cache = DuplicateCache::new(Duration::from_millis(100)); - - cache.insert("t"); - assert!(!cache.insert("t")); - cache.insert("e"); - //assert!(!cache.insert("t")); - assert!(!cache.insert("e")); - // sleep until cache expiry - std::thread::sleep(Duration::from_millis(101)); - // add another element to clear previous cache - cache.insert("s"); - - // should be removed from the cache - assert!(cache.insert("t")); - } - - #[test] - fn test_remove() { - let mut cache = TimeCache::new(Duration::from_secs(10)); - - cache.insert("t", ""); - cache.insert("e", ""); - cache.remove("e"); - assert!(!cache.contains_key(&"e")); - } -} diff --git a/mm2src/mm2_core/Cargo.toml b/mm2src/mm2_core/Cargo.toml index 13ba18d739..980adfc50c 100644 --- a/mm2src/mm2_core/Cargo.toml +++ b/mm2src/mm2_core/Cargo.toml @@ -34,9 +34,11 @@ uuid = { version = "1.2.2", features = ["fast-rng", "serde", "v4"] } [target.'cfg(target_arch = "wasm32")'.dependencies] instant = { version = "0.1.12", features = ["wasm-bindgen"] } mm2_rpc = { path = "../mm2_rpc", features = [ "rpc_facilities" ] } +timed-map = { version = "1.3", features = ["rustc-hash", "wasm"] } wasm-bindgen-test = { version = "0.3.2" } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] rustls = { version = "0.21", default-features = false } instant = "0.1.12" tokio = { version = "1.20", features = ["io-util", "rt-multi-thread", "net"] } +timed-map = { version = "1.3", features = ["rustc-hash"] } diff --git a/mm2src/mm2_core/src/data_asker.rs b/mm2src/mm2_core/src/data_asker.rs index 7f32f93365..5e40192256 100644 --- a/mm2src/mm2_core/src/data_asker.rs +++ b/mm2src/mm2_core/src/data_asker.rs @@ -1,4 +1,3 @@ -use common::expirable_map::ExpirableMap; use common::{HttpStatusCode, StatusCode}; use derive_more::Display; use futures::channel::oneshot; @@ -12,15 +11,27 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use std::sync::atomic::{self, AtomicUsize}; use std::sync::Arc; +use timed_map::{MapKind, TimedMap}; use crate::mm_ctx::{MmArc, MmCtx}; const EVENT_NAME: &str = "DATA_NEEDED"; -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct DataAsker { data_id: Arc, - awaiting_asks: Arc>>>, + awaiting_asks: Arc>>>, +} + +impl Default for DataAsker { + fn default() -> Self { + Self { + data_id: Default::default(), + awaiting_asks: Arc::new(AsyncMutex::new( + TimedMap::new_with_map_kind(MapKind::FxHashMap).expiration_tick_cap(5), + )), + } + } } #[derive(Debug, Display)] @@ -59,7 +70,7 @@ impl MmCtx { .awaiting_asks .lock() .await - .insert(data_id, sender, timeout); + .insert_expirable(data_id, sender, timeout); } let input = json!({ diff --git a/mm2src/mm2_core/src/mm_ctx.rs b/mm2src/mm2_core/src/mm_ctx.rs index 0a1afb2eea..d251bd3446 100644 --- a/mm2src/mm2_core/src/mm_ctx.rs +++ b/mm2src/mm2_core/src/mm_ctx.rs @@ -1,10 +1,9 @@ #[cfg(feature = "track-ctx-pointer")] use common::executor::Timer; +use common::executor::{abortable_queue::{AbortableQueue, WeakSpawner}, + graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture}; use common::log::{self, LogLevel, LogOnError, LogState}; use common::{cfg_native, cfg_wasm32, small_rng}; -use common::{executor::{abortable_queue::{AbortableQueue, WeakSpawner}, - graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture}, - expirable_map::ExpirableMap}; use futures::channel::oneshot; use futures::lock::Mutex as AsyncMutex; use gstuff::{try_s, ERR, ERRL}; @@ -23,6 +22,7 @@ use std::fmt; use std::future::Future; use std::ops::Deref; use std::sync::{Arc, Mutex, OnceLock}; +use timed_map::{MapKind, TimedMap}; use crate::data_asker::DataAsker; @@ -146,7 +146,7 @@ pub struct MmCtx { #[cfg(not(target_arch = "wasm32"))] pub async_sqlite_connection: OnceLock>>, /// Links the RPC context to the P2P context to handle health check responses. - pub healthcheck_response_handler: AsyncMutex>>, + pub healthcheck_response_handler: AsyncMutex>>, } impl MmCtx { @@ -196,7 +196,9 @@ impl MmCtx { nft_ctx: Mutex::new(None), #[cfg(not(target_arch = "wasm32"))] async_sqlite_connection: OnceLock::default(), - healthcheck_response_handler: AsyncMutex::new(ExpirableMap::default()), + healthcheck_response_handler: AsyncMutex::new( + TimedMap::new_with_map_kind(MapKind::FxHashMap).expiration_tick_cap(3), + ), } } diff --git a/mm2src/mm2_main/Cargo.toml b/mm2src/mm2_main/Cargo.toml index 27f937cf6f..5dac282483 100644 --- a/mm2src/mm2_main/Cargo.toml +++ b/mm2src/mm2_main/Cargo.toml @@ -105,6 +105,7 @@ instant = { version = "0.1.12", features = ["wasm-bindgen"] } js-sys = { version = "0.3.27" } mm2_db = { path = "../mm2_db" } mm2_test_helpers = { path = "../mm2_test_helpers" } +timed-map = { version = "1.3", features = ["rustc-hash", "wasm"] } wasm-bindgen = "0.2.86" wasm-bindgen-futures = { version = "0.4.1" } wasm-bindgen-test = { version = "0.3.1" } @@ -117,6 +118,7 @@ hyper = { version = "0.14.26", features = ["client", "http2", "server", "tcp"] } rcgen = "0.10" rustls = { version = "0.21", default-features = false } rustls-pemfile = "1.0.2" +timed-map = { version = "1.3", features = ["rustc-hash"] } tokio = { version = "1.20", features = ["io-util", "rt-multi-thread", "net", "signal"] } [target.'cfg(windows)'.dependencies] diff --git a/mm2src/mm2_main/src/lp_healthcheck.rs b/mm2src/mm2_main/src/lp_healthcheck.rs index 20a6004c95..ed2440a7a1 100644 --- a/mm2src/mm2_main/src/lp_healthcheck.rs +++ b/mm2src/mm2_main/src/lp_healthcheck.rs @@ -1,7 +1,6 @@ use async_std::prelude::FutureExt; use chrono::Utc; use common::executor::SpawnFuture; -use common::expirable_map::ExpirableEntry; use common::{log, HttpStatusCode, StatusCode}; use derive_more::Display; use futures::channel::oneshot::{self, Receiver, Sender}; @@ -80,35 +79,27 @@ impl HealthcheckMessage { const MIN_DURATION_FOR_REUSABLE_MSG: Duration = Duration::from_secs(5); lazy_static! { - static ref RECENTLY_GENERATED_MESSAGE: Mutex> = - Mutex::new(ExpirableEntry::new( - // Using dummy values in order to initialize `HealthcheckMessage` context. - HealthcheckMessage { - signature: vec![], - data: HealthcheckData { - sender_public_key: vec![], - expires_at_secs: 0, - is_a_reply: false, - }, - }, - Duration::from_secs(0) - )); + static ref RECENTLY_GENERATED_MESSAGE: Mutex> = Mutex::new(None); } // If recently generated message has longer life than `MIN_DURATION_FOR_REUSABLE_MSG`, we can reuse it to // reduce the message generation overhead under high pressure. let mut mutexed_msg = RECENTLY_GENERATED_MESSAGE.lock().unwrap(); - if mutexed_msg.has_longer_life_than(MIN_DURATION_FOR_REUSABLE_MSG) { - Ok(mutexed_msg.get_element().clone()) - } else { - let new_msg = HealthcheckMessage::generate_message(ctx, true)?; + if let Some((ref msg, expiration)) = *mutexed_msg { + if expiration > Instant::now() + MIN_DURATION_FOR_REUSABLE_MSG { + return Ok(msg.clone()); + } + } - mutexed_msg.update_value(new_msg.clone()); - mutexed_msg.update_expiration(Instant::now() + Duration::from_secs(healthcheck_message_exp_secs())); + let new_msg = HealthcheckMessage::generate_message(ctx, true)?; - Ok(new_msg) - } + *mutexed_msg = Some(( + new_msg.clone(), + Instant::now() + Duration::from_secs(healthcheck_message_exp_secs()), + )); + + Ok(new_msg) } fn is_received_message_valid(&self) -> Result { @@ -266,7 +257,7 @@ pub async fn peer_connection_healthcheck_rpc( { let mut book = ctx.healthcheck_response_handler.lock().await; - book.insert(target_peer_address.into(), tx, address_record_exp); + book.insert_expirable(target_peer_address.into(), tx, address_record_exp); } broadcast_p2p_msg( diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index dba2139998..631d7cb55d 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -29,7 +29,6 @@ use coins::{coin_conf, find_pair, lp_coinfind, BalanceTradeFeeUpdatedHandler, Co use common::executor::{simple_map::AbortableSimpleMap, AbortSettings, AbortableSystem, AbortedError, SpawnAbortable, SpawnFuture, Timer}; use common::log::{error, warn, LogOnError}; -use common::time_cache::TimeCache; use common::{bits256, log, new_uuid, now_ms, now_sec}; use crypto::privkey::SerializableSecp256k1Keypair; use crypto::{CryptoCtx, CryptoCtxError}; @@ -67,6 +66,7 @@ use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use timed_map::{MapKind, TimedMap}; use trie_db::NodeCodec as NodeCodecT; use uuid::Uuid; @@ -365,7 +365,9 @@ fn process_maker_order_cancelled(ctx: &MmArc, from_pubkey: String, cancelled_msg // is received within the `RECENTLY_CANCELLED_TIMEOUT` timeframe. // We do this even if the order is in the order_set, because it could have been added through // means other than the order creation message. - orderbook.recently_cancelled.insert(uuid, from_pubkey.clone()); + orderbook + .recently_cancelled + .insert_expirable(uuid, from_pubkey.clone(), RECENTLY_CANCELLED_TIMEOUT); if let Some(order) = orderbook.order_set.get(&uuid) { if order.pubkey == from_pubkey { orderbook.remove_order_trie_update(uuid); @@ -510,7 +512,7 @@ fn remove_pubkey_pair_orders(orderbook: &mut Orderbook, pubkey: &str, alb_pair: return; } - pubkey_state.order_pairs_trie_state_history.remove(alb_pair.into()); + pubkey_state.order_pairs_trie_state_history.remove(&alb_pair.to_owned()); let mut orders_to_remove = Vec::with_capacity(pubkey_state.orders_uuids.len()); pubkey_state.orders_uuids.retain(|(uuid, alb)| { @@ -2344,7 +2346,7 @@ struct TrieDiff { #[derive(Debug)] struct TrieDiffHistory { - inner: TimeCache>, + inner: TimedMap>, } impl TrieDiffHistory { @@ -2354,30 +2356,31 @@ impl TrieDiffHistory { return; } - match self.inner.remove(diff.next_root) { + match self.inner.remove(&diff.next_root) { Some(mut diff) => { // we reached a state that was already reached previously // history can be cleaned up to this state hash - while let Some(next_diff) = self.inner.remove(diff.next_root) { + while let Some(next_diff) = self.inner.remove(&diff.next_root) { diff = next_diff; } }, None => { - self.inner.insert(insert_at, diff); + self.inner + .insert_expirable(insert_at, diff, Duration::from_secs(TRIE_ORDER_HISTORY_TIMEOUT)); }, }; } #[allow(dead_code)] - fn remove_key(&mut self, key: H64) { self.inner.remove(key); } + fn remove_key(&mut self, key: H64) { self.inner.remove(&key); } #[allow(dead_code)] - fn contains_key(&self, key: &H64) -> bool { self.inner.contains_key(key) } + fn contains_key(&self, key: &H64) -> bool { self.inner.get(key).is_some() } fn get(&self, key: &H64) -> Option<&TrieDiff> { self.inner.get(key) } #[allow(dead_code)] - fn len(&self) -> usize { self.inner.len() } + fn len(&self) -> usize { self.inner.len_unchecked() } } type TrieOrderHistory = TrieDiffHistory; @@ -2387,7 +2390,7 @@ struct OrderbookPubkeyState { last_keep_alive: u64, /// The map storing historical data about specific pair subtrie changes /// Used to get diffs of orders of pair between specific root hashes - order_pairs_trie_state_history: TimeCache, + order_pairs_trie_state_history: TimedMap, /// The known UUIDs owned by pubkey with alphabetically ordered pair to ease the lookup during pubkey orderbook requests orders_uuids: HashSet<(Uuid, AlbOrderedOrderbookPair)>, /// The map storing alphabetically ordered pair with trie root hash of orders owned by pubkey. @@ -2395,10 +2398,10 @@ struct OrderbookPubkeyState { } impl OrderbookPubkeyState { - pub fn with_history_timeout(ttl: Duration) -> OrderbookPubkeyState { + pub fn new() -> OrderbookPubkeyState { OrderbookPubkeyState { last_keep_alive: now_sec(), - order_pairs_trie_state_history: TimeCache::new(ttl), + order_pairs_trie_state_history: TimedMap::new_with_map_kind(MapKind::FxHashMap), orders_uuids: HashSet::default(), trie_roots: HashMap::default(), } @@ -2423,7 +2426,7 @@ fn pubkey_state_mut<'a>( match state.raw_entry_mut().from_key(from_pubkey) { RawEntryMut::Occupied(e) => e.into_mut(), RawEntryMut::Vacant(e) => { - let state = OrderbookPubkeyState::with_history_timeout(Duration::new(TRIE_STATE_HISTORY_TIMEOUT, 0)); + let state = OrderbookPubkeyState::new(); e.insert(from_pubkey.to_string(), state).1 }, } @@ -2436,17 +2439,6 @@ fn order_pair_root_mut<'a>(state: &'a mut HashMap, } } -fn pair_history_mut<'a>( - state: &'a mut TimeCache, - pair: &str, -) -> &'a mut TrieOrderHistory { - state - .entry(pair.into()) - .or_insert_with_update_expiration(|| TrieOrderHistory { - inner: TimeCache::new(Duration::from_secs(TRIE_ORDER_HISTORY_TIMEOUT)), - }) -} - /// `parity_util_mem::malloc_size` crushes for some reason on wasm32 #[cfg(target_arch = "wasm32")] fn collect_orderbook_metrics(_ctx: &MmArc, _orderbook: &Orderbook) {} @@ -2472,11 +2464,11 @@ struct Orderbook { order_set: HashMap, /// a map of orderbook states of known maker pubkeys pubkeys_state: HashMap, - /// The `TimeCache` of recently canceled orders, mapping `Uuid` to the maker pubkey as `String`, + /// `TimedMap` of recently canceled orders, mapping `Uuid` to the maker pubkey as `String`, /// used to avoid order recreation in case of out-of-order p2p messages, /// e.g., when receiving the order cancellation message before the order is created. /// Entries are kept for `RECENTLY_CANCELLED_TIMEOUT` seconds. - recently_cancelled: TimeCache, + recently_cancelled: TimedMap, topics_subscribed_to: HashMap, /// MemoryDB instance to store Patricia Tries data memory_db: MemoryDB, @@ -2492,7 +2484,7 @@ impl Default for Orderbook { unordered: HashMap::default(), order_set: HashMap::default(), pubkeys_state: HashMap::default(), - recently_cancelled: TimeCache::new(RECENTLY_CANCELLED_TIMEOUT), + recently_cancelled: TimedMap::new_with_map_kind(MapKind::FxHashMap), topics_subscribed_to: HashMap::default(), memory_db: MemoryDB::default(), my_p2p_pubkeys: HashSet::default(), @@ -2557,7 +2549,31 @@ impl Orderbook { } if prev_root != H64::default() { - let history = pair_history_mut(&mut pubkey_state.order_pairs_trie_state_history, &alb_ordered); + let _ = pubkey_state + .order_pairs_trie_state_history + .update_expiration_status(alb_ordered.clone(), Duration::from_secs(TRIE_STATE_HISTORY_TIMEOUT)); + + let history = match pubkey_state + .order_pairs_trie_state_history + .get_mut_unchecked(&alb_ordered) + { + Some(t) => t, + None => { + pubkey_state.order_pairs_trie_state_history.insert_expirable( + alb_ordered.clone(), + TrieOrderHistory { + inner: TimedMap::new_with_map_kind(MapKind::FxHashMap), + }, + Duration::from_secs(TRIE_STATE_HISTORY_TIMEOUT), + ); + + pubkey_state + .order_pairs_trie_state_history + .get_mut_unchecked(&alb_ordered) + .expect("must exist") + }, + }; + history.insert_new_diff(prev_root, TrieDiff { delta: vec![(order.uuid, Some(order.clone()))], next_root: *pair_root, @@ -2656,13 +2672,20 @@ impl Orderbook { }, }; - if pubkey_state.order_pairs_trie_state_history.get(&alb_ordered).is_some() { - let history = pair_history_mut(&mut pubkey_state.order_pairs_trie_state_history, &alb_ordered); + let _ = pubkey_state + .order_pairs_trie_state_history + .update_expiration_status(alb_ordered.clone(), Duration::from_secs(TRIE_STATE_HISTORY_TIMEOUT)); + + if let Some(history) = pubkey_state + .order_pairs_trie_state_history + .get_mut_unchecked(&alb_ordered) + { history.insert_new_diff(old_state, TrieDiff { delta: vec![(uuid, None)], next_root: *pair_state, }); - } + }; + Some(order) } diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index 02f5dc70b1..a77ca9edf8 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -65,7 +65,6 @@ use bitcrypto::{dhash160, sha256}; use coins::{lp_coinfind, lp_coinfind_or_err, CoinFindError, DexFee, MmCoin, MmCoinEnum, TradeFee, TransactionEnum}; use common::log::{debug, warn}; use common::now_sec; -use common::time_cache::DuplicateCache; use common::{bits256, calc_total_pages, executor::{spawn_abortable, AbortOnDropHandle, SpawnFuture, Timer}, log::{error, info}, @@ -89,7 +88,7 @@ use std::num::NonZeroUsize; use std::path::PathBuf; use std::str::FromStr; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use timed_map::{MapKind, TimedMap}; use uuid::Uuid; #[cfg(any(feature = "custom-swap-locktime", test, feature = "run-docker-tests"))] @@ -523,7 +522,7 @@ struct SwapsContext { banned_pubkeys: Mutex>, swap_msgs: Mutex>, swap_v2_msgs: Mutex>, - taker_swap_watchers: PaMutex>>, + taker_swap_watchers: PaMutex, ()>>, locked_amounts: Mutex>>, #[cfg(target_arch = "wasm32")] swap_db: ConstructibleDb, @@ -539,9 +538,7 @@ impl SwapsContext { banned_pubkeys: Mutex::new(HashMap::new()), swap_msgs: Mutex::new(HashMap::new()), swap_v2_msgs: Mutex::new(HashMap::new()), - taker_swap_watchers: PaMutex::new(DuplicateCache::new(Duration::from_secs( - TAKER_SWAP_ENTRY_TIMEOUT_SEC, - ))), + taker_swap_watchers: PaMutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap)), locked_amounts: Mutex::new(HashMap::new()), #[cfg(target_arch = "wasm32")] swap_db: ConstructibleDb::new(ctx), diff --git a/mm2src/mm2_main/src/lp_swap/swap_watcher.rs b/mm2src/mm2_main/src/lp_swap/swap_watcher.rs index bba8479b7c..e379e4dee1 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_watcher.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_watcher.rs @@ -12,6 +12,7 @@ use common::executor::{AbortSettings, SpawnAbortable, Timer}; use common::log::{debug, error, info}; use common::{now_sec, DEX_FEE_ADDR_RAW_PUBKEY}; use futures::compat::Future01CompatExt; +use instant::Duration; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::MapToMmResult; use mm2_libp2p::{decode_signed, pub_sub_topic, TopicPrefix}; @@ -560,7 +561,10 @@ impl SwapWatcherLock { fn lock_taker(swap_ctx: Arc, fee_hash: Vec) -> Option { { let mut guard = swap_ctx.taker_swap_watchers.lock(); - if !guard.insert(fee_hash.clone()) { + if guard + .insert_expirable(fee_hash.clone(), (), Duration::from_secs(TAKER_SWAP_ENTRY_TIMEOUT_SEC)) + .is_some() + { // There is the same hash already. return None; } @@ -577,7 +581,7 @@ impl SwapWatcherLock { impl Drop for SwapWatcherLock { fn drop(&mut self) { match self.watcher_type { - WatcherType::Taker => self.swap_ctx.taker_swap_watchers.lock().remove(self.fee_hash.clone()), + WatcherType::Taker => self.swap_ctx.taker_swap_watchers.lock().remove(&self.fee_hash.clone()), }; } } diff --git a/mm2src/mm2_main/src/ordermatch_tests.rs b/mm2src/mm2_main/src/ordermatch_tests.rs index 3bf81d6370..56a9c6a135 100644 --- a/mm2src/mm2_main/src/ordermatch_tests.rs +++ b/mm2src/mm2_main/src/ordermatch_tests.rs @@ -2500,9 +2500,7 @@ fn test_orderbook_pubkey_sync_request_relay() { #[test] fn test_trie_diff_avoid_cycle_on_insertion() { - let mut history = TrieDiffHistory:: { - inner: TimeCache::new(Duration::from_secs(3600)), - }; + let mut history = TrieDiffHistory:: { inner: TimedMap::new() }; history.insert_new_diff([1; 8], TrieDiff { delta: vec![], next_root: [2; 8], @@ -2524,12 +2522,12 @@ fn test_trie_diff_avoid_cycle_on_insertion() { next_root: [2; 8], }); - let expected = HashMap::from_iter(iter::once(([1u8; 8], TrieDiff { + let expected = TrieDiff { delta: vec![], next_root: [2; 8], - }))); + }; - assert_eq!(expected, history.inner.as_hash_map()); + assert_eq!(&expected, history.inner.get(&[1u8; 8]).unwrap()); } #[test] @@ -2634,7 +2632,12 @@ fn check_if_orderbook_contains_only(orderbook: &Orderbook, pubkey: &str, orders: assert_eq!(orderbook.unordered, expected_unordered); // history - let actual_keys: HashSet<_> = pubkey_state.order_pairs_trie_state_history.keys().cloned().collect(); + let actual_keys: HashSet<_> = pubkey_state + .order_pairs_trie_state_history + .keys() + .iter() + .cloned() + .collect(); let expected_keys: HashSet<_> = orders .iter() .map(|order| alb_ordered_pair(&order.base, &order.rel)) diff --git a/mm2src/mm2_p2p/Cargo.toml b/mm2src/mm2_p2p/Cargo.toml index 687b61d880..812c1346de 100644 --- a/mm2src/mm2_p2p/Cargo.toml +++ b/mm2src/mm2_p2p/Cargo.toml @@ -39,14 +39,14 @@ void = "1.0" futures-rustls = "0.24" instant = "0.1.12" libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.12", default-features = false, features = ["dns", "identify", "floodsub", "gossipsub", "noise", "ping", "request-response", "secp256k1", "tcp", "tokio", "websocket", "macros", "yamux"] } -timed-map = { version = "1.1.1", features = ["rustc-hash"] } +timed-map = { version = "1.3", features = ["rustc-hash"] } tokio = { version = "1.20", default-features = false } [target.'cfg(target_arch = "wasm32")'.dependencies] futures-rustls = "0.22" instant = { version = "0.1.12", features = ["wasm-bindgen"] } libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.12", default-features = false, features = ["identify", "floodsub", "noise", "gossipsub", "ping", "request-response", "secp256k1", "wasm-ext", "wasm-ext-websocket", "macros", "yamux"] } -timed-map = { version = "1.1.1", features = ["rustc-hash", "wasm"] } +timed-map = { version = "1.3", features = ["rustc-hash", "wasm"] } [dev-dependencies] async-std = "1.6.2" diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 6d3ccb9d69..7e855487b3 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -26,7 +26,7 @@ use std::iter; use std::net::IpAddr; use std::sync::{Mutex, MutexGuard}; use std::task::{Context, Poll}; -use timed_map::{MapKind, StdClock, TimedMap}; +use timed_map::{MapKind, TimedMap}; use super::peers_exchange::{PeerAddresses, PeersExchange, PeersExchangeRequest, PeersExchangeResponse}; use super::ping::AdexPing; @@ -67,7 +67,7 @@ const DIAL_RETRY_DELAY: Duration = Duration::from_secs(60 * 5); lazy_static! { /// Tracks recently dialed peers to avoid repeated connection attempts. - static ref RECENTLY_DIALED_PEERS: Mutex> = Mutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap)); + static ref RECENTLY_DIALED_PEERS: Mutex> = Mutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap)); } pub const DEPRECATED_NETID_LIST: &[u16] = &[ @@ -186,16 +186,13 @@ pub enum AdexBehaviourCmd { /// /// Returns `false` if a dial attempt to the given address has already been made, /// in which case the caller must skip the dial attempt. -fn check_and_mark_dialed( - recently_dialed_peers: &mut MutexGuard>, - addr: &Multiaddr, -) -> bool { +fn check_and_mark_dialed(recently_dialed_peers: &mut MutexGuard>, addr: &Multiaddr) -> bool { if recently_dialed_peers.get(addr).is_some() { info!("Connection attempt was already made recently to '{addr}'."); return false; } - recently_dialed_peers.insert_expirable_unchecked(addr.clone(), (), DIAL_RETRY_DELAY); + recently_dialed_peers.insert_expirable(addr.clone(), (), DIAL_RETRY_DELAY); true }