diff --git a/nativelink-scheduler/tests/utils/mock_scheduler.rs b/nativelink-scheduler/tests/utils/mock_scheduler.rs index fe0e37035..113a3d25c 100644 --- a/nativelink-scheduler/tests/utils/mock_scheduler.rs +++ b/nativelink-scheduler/tests/utils/mock_scheduler.rs @@ -17,12 +17,10 @@ use std::sync::Arc; use async_trait::async_trait; use nativelink_error::{make_input_err, Error}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; -use nativelink_util::{ - action_messages::{ActionInfo, OperationId}, - known_platform_property_provider::KnownPlatformPropertyProvider, - operation_state_manager::{ - ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter, - }, +use nativelink_util::action_messages::{ActionInfo, OperationId}; +use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; +use nativelink_util::operation_state_manager::{ + ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter, }; use tokio::sync::{mpsc, Mutex}; diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index ac17063f1..cf9de79b8 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -16,6 +16,7 @@ rust_library( "src/common.rs", "src/connection_manager.rs", "src/digest_hasher.rs", + "src/drop_protected_future.rs", "src/evicting_map.rs", "src/fastcdc.rs", "src/fs.rs", diff --git a/nativelink-util/src/drop_protected_future.rs b/nativelink-util/src/drop_protected_future.rs new file mode 100644 index 000000000..8a2e3b92f --- /dev/null +++ b/nativelink-util/src/drop_protected_future.rs @@ -0,0 +1,68 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use tracing::{Instrument, Span}; + +use crate::origin_context::{ContextAwareFuture, OriginContext}; +use crate::spawn; +#[derive(Clone)] +pub struct DropProtectedFuture +where + T: Send + 'static, + F: Future + Send + 'static + Unpin + Clone, +{ + future: ContextAwareFuture, +} + +impl DropProtectedFuture +where + T: Send + 'static, + F: Future + Send + 'static + Unpin + Clone, +{ + pub fn new(f: F, span: Span, ctx: Option>) -> Self { + Self { + future: ContextAwareFuture::new(ctx, f.instrument(span)), + } + } +} + +impl Future for DropProtectedFuture +where + T: Send + 'static, + F: Future + Send + 'static + Unpin + Clone, +{ + type Output = F::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.future).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(output) => Poll::Ready(output), + } + } +} + +impl Drop for DropProtectedFuture +where + T: Send + 'static, + F: Future + Send + 'static + Unpin + Clone, +{ + fn drop(&mut self) { + spawn!("DropProtectedFuture::drop", self.future.clone()); + } +} diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index 90a1d5597..9527d62ab 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -26,10 +26,12 @@ use lru::LruCache; use nativelink_config::stores::EvictionPolicy; use nativelink_metric::MetricsComponent; use serde::{Deserialize, Serialize}; -use tracing::{event, Level}; +use tracing::{event, info_span, Level}; +use crate::drop_protected_future::DropProtectedFuture; use crate::instant_wrapper::InstantWrapper; use crate::metrics_utils::{Counter, CounterWithTime}; +use crate::origin_context::ActiveOriginContext; #[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] pub struct SerializedLRU { @@ -98,7 +100,7 @@ impl LenEntry for Arc { } #[derive(MetricsComponent)] -struct State { +struct State { lru: LruCache>, btree: Option>, #[metric(help = "Total size of all items in the store")] @@ -116,7 +118,7 @@ struct State { lifetime_inserted_bytes: Counter, } -impl State { +impl State { /// Removes an item from the cache. async fn remove(&mut self, key: &Q, eviction_item: &EvictionItem, replaced: bool) where @@ -153,7 +155,11 @@ impl State } #[derive(MetricsComponent)] -pub struct EvictingMap { +pub struct EvictingMap< + K: Ord + Hash + Eq + Clone + Debug + Send, + T: LenEntry + Debug, + I: InstantWrapper, +> { #[metric] state: Mutex>, anchor_time: I, @@ -169,7 +175,7 @@ pub struct EvictingMap EvictingMap where - K: Ord + Hash + Eq + Clone + Debug, + K: Ord + Hash + Eq + Clone + Debug + Send + Sync, T: LenEntry + Debug + Clone + Send + Sync, I: InstantWrapper, { @@ -403,6 +409,7 @@ where let mut state = self.state.lock().await; let results = self .inner_insert_many(&mut state, [(key, data)], seconds_since_anchor) + .await .await; results.into_iter().next() } @@ -418,6 +425,7 @@ where let state = &mut self.state.lock().await; self.inner_insert_many(state, inserts, self.anchor_time.elapsed().as_secs() as i32) .await + .await } async fn inner_insert_many( @@ -425,23 +433,29 @@ where mut state: &mut State, inserts: impl IntoIterator, seconds_since_anchor: i32, - ) -> Vec { - let mut replaced_items = Vec::new(); - for (key, data) in inserts.into_iter() { - let new_item_size = data.len() as u64; - let eviction_item = EvictionItem { - seconds_since_anchor, - data, - }; - - if let Some(old_item) = state.put(key, eviction_item).await { - replaced_items.push(old_item); - } - state.sum_store_size += new_item_size; - state.lifetime_inserted_bytes.add(new_item_size); - self.evict_items(state.deref_mut()).await; - } - replaced_items + ) -> impl Future> + Send { + DropProtectedFuture::new( + async move { + let mut replaced_items = Vec::new(); + for (key, data) in inserts.into_iter() { + let new_item_size = data.len() as u64; + let eviction_item = EvictionItem { + seconds_since_anchor, + data, + }; + + if let Some(old_item) = state.put(key, eviction_item).await { + replaced_items.push(old_item); + } + state.sum_store_size += new_item_size; + state.lifetime_inserted_bytes.add(new_item_size); + self.evict_items(state.deref_mut()).await; + } + replaced_items + }, + info_span!("EvictingMap::inner_insert_many"), + ActiveOriginContext::get(), + ) } pub async fn remove(&self, key: &Q) -> bool diff --git a/nativelink-util/src/lib.rs b/nativelink-util/src/lib.rs index 17edbf700..6bd1c0f22 100644 --- a/nativelink-util/src/lib.rs +++ b/nativelink-util/src/lib.rs @@ -19,6 +19,7 @@ pub mod chunked_stream; pub mod common; pub mod connection_manager; pub mod digest_hasher; +pub mod drop_protected_future; pub mod evicting_map; pub mod fastcdc; pub mod fs; diff --git a/nativelink-util/src/origin_context.rs b/nativelink-util/src/origin_context.rs index be466ae94..6529ae1c7 100644 --- a/nativelink-util/src/origin_context.rs +++ b/nativelink-util/src/origin_context.rs @@ -284,6 +284,7 @@ impl Drop for ContextDropGuard { pin_project! { #[must_use = "futures do nothing unless you `.await` or poll them"] + #[derive(Clone)] pub struct ContextAwareFuture { // `ManuallyDrop` is used so we can call `self.span.enter()` in the `drop()` // of our inner future, then drop the span.