From 61edc9a65a88ba14eb3ed3ab3f7202f11d8effb0 Mon Sep 17 00:00:00 2001 From: Aaron Siddhartha Mondal Date: Thu, 23 Jan 2025 01:28:42 +0100 Subject: [PATCH] Ensure that EvictingMap is threadsafe Clamp down some trait bounds so that thread safety issues have a higher chance of providing useful compiler errors. --- nativelink-util/src/evicting_map.rs | 64 +++++++++++++++------- nativelink-util/tests/evicting_map_test.rs | 2 +- 2 files changed, 44 insertions(+), 22 deletions(-) diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index 40ee59f37..af8ad4945 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -98,7 +98,7 @@ impl LenEntry for Arc { } #[derive(MetricsComponent)] -struct State { +struct State { lru: LruCache>, btree: Option>, #[metric(help = "Total size of all items in the store")] @@ -116,12 +116,14 @@ struct State { lifetime_inserted_bytes: Counter, } -impl State { +impl + State +{ /// Removes an item from the cache. async fn remove(&mut self, key: &Q, eviction_item: &EvictionItem, replaced: bool) where K: Borrow, - Q: Ord + Hash + Eq + Debug, + Q: Ord + Hash + Eq + Debug + Sync, { if let Some(btree) = &mut self.btree { btree.remove(key.borrow()); @@ -153,7 +155,11 @@ impl State } #[derive(MetricsComponent)] -pub struct EvictingMap { +pub struct EvictingMap< + K: Ord + Hash + Eq + Clone + Debug + Send, + T: LenEntry + Debug + Send, + I: InstantWrapper, +> { #[metric] state: Mutex>, anchor_time: I, @@ -169,7 +175,7 @@ pub struct EvictingMap EvictingMap where - K: Ord + Hash + Eq + Clone + Debug, + K: Ord + Hash + Eq + Clone + Debug + Send + Sync, T: LenEntry + Debug + Clone + Send + Sync, I: InstantWrapper, { @@ -210,11 +216,11 @@ where /// and return the number of items that were processed. /// The `handler` function should return `true` to continue processing the next item /// or `false` to stop processing. - pub async fn range(&self, prefix_range: impl RangeBounds, mut handler: F) -> u64 + pub async fn range(&self, prefix_range: impl RangeBounds + Send, mut handler: F) -> u64 where - F: FnMut(&K, &T) -> bool, + F: FnMut(&K, &T) -> bool + Send, K: Borrow + Ord, - Q: Ord + Hash + Eq + Debug, + Q: Ord + Hash + Eq + Debug + Sync, { let mut state = self.state.lock().await; let btree = if let Some(ref btree) = state.btree { @@ -302,7 +308,7 @@ where pub async fn size_for_key(&self, key: &Q) -> Option where K: Borrow, - Q: Ord + Hash + Eq + Debug, + Q: Ord + Hash + Eq + Debug + Sync, { let mut results = [None]; self.sizes_for_keys([key], &mut results[..], false).await; @@ -317,15 +323,18 @@ where /// LRU cache. Note: peek may still evict, but won't promote. pub async fn sizes_for_keys(&self, keys: It, results: &mut [Option], peek: bool) where - It: IntoIterator, + It: IntoIterator + Send, + // Note: It's not enough to have the inserts themselves be Send. The + // returned iterator should be Send as well. + ::IntoIter: Send, // This may look strange, but what we are doing is saying: // * `K` must be able to borrow `Q` // * `R` (the input stream item type) must also be able to borrow `Q` // Note: That K and R do not need to be the same type, they just both need // to be able to borrow a `Q`. K: Borrow, - R: Borrow, - Q: Ord + Hash + Eq + Debug, + R: Borrow + Send, + Q: Ord + Hash + Eq + Debug + Sync, { let mut state = self.state.lock().await; @@ -369,7 +378,7 @@ where pub async fn get(&self, key: &Q) -> Option where K: Borrow, - Q: Ord + Hash + Eq + Debug, + Q: Ord + Hash + Eq + Debug + Sync, { let mut state = self.state.lock().await; self.evict_items(&mut *state).await; @@ -404,7 +413,13 @@ where /// Same as `insert()`, but optimized for multiple inserts. /// Returns the replaced items if any. - pub async fn insert_many(&self, inserts: impl IntoIterator) -> Vec { + pub async fn insert_many(&self, inserts: It) -> Vec + where + It: IntoIterator + Send, + // Note: It's not enough to have the inserts themselves be Send. The + // returned iterator should be Send as well. + ::IntoIter: Send, + { let mut inserts = inserts.into_iter().peekable(); // Shortcut for cases where there are no inserts, so we don't need to lock. if inserts.peek().is_none() { @@ -415,12 +430,18 @@ where .await } - async fn inner_insert_many( + async fn inner_insert_many( &self, state: &mut State, - inserts: impl IntoIterator, + inserts: It, seconds_since_anchor: i32, - ) -> Vec { + ) -> Vec + where + It: IntoIterator + Send, + // Note: It's not enough to have the inserts themselves be Send. The + // returned iterator should be Send as well. + ::IntoIter: Send, + { let mut replaced_items = Vec::new(); for (key, data) in inserts { let new_item_size = data.len(); @@ -442,7 +463,7 @@ where pub async fn remove(&self, key: &Q) -> bool where K: Borrow, - Q: Ord + Hash + Eq + Debug, + Q: Ord + Hash + Eq + Debug + Sync, { let mut state = self.state.lock().await; self.inner_remove(&mut state, key).await @@ -451,7 +472,7 @@ where async fn inner_remove(&self, state: &mut State, key: &Q) -> bool where K: Borrow, - Q: Ord + Hash + Eq + Debug, + Q: Ord + Hash + Eq + Debug + Sync, { self.evict_items(state).await; if let Some(entry) = state.lru.pop(key.borrow()) { @@ -463,10 +484,11 @@ where /// Same as `remove()`, but allows for a conditional to be applied to the /// entry before removal in an atomic fashion. - pub async fn remove_if bool>(&self, key: &Q, cond: F) -> bool + pub async fn remove_if(&self, key: &Q, cond: F) -> bool where K: Borrow, - Q: Ord + Hash + Eq + Debug, + Q: Ord + Hash + Eq + Debug + Sync, + F: FnOnce(&T) -> bool + Send, { let mut state = self.state.lock().await; if let Some(entry) = state.lru.get(key.borrow()) { diff --git a/nativelink-util/tests/evicting_map_test.rs b/nativelink-util/tests/evicting_map_test.rs index 5c6825f79..b4ec6ac94 100644 --- a/nativelink-util/tests/evicting_map_test.rs +++ b/nativelink-util/tests/evicting_map_test.rs @@ -590,7 +590,7 @@ async fn remove_evicts_on_time() -> Result<(), Error> { async fn range_multiple_items_test() -> Result<(), Error> { async fn get_map_range( evicting_map: &EvictingMap, - range: impl std::ops::RangeBounds, + range: impl std::ops::RangeBounds + Send, ) -> Vec<(String, Bytes)> { let mut found_values = Vec::new(); evicting_map