diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
index f891387ddf4c7..976ae29ec0bf1 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
@@ -2,13 +2,14 @@ use std::{
     cmp::{max, Ordering},
     collections::{hash_map::Entry as HashMapEntry, VecDeque},
     hash::Hash,
+    mem::take,
     num::NonZeroU32,
 };
 
 use indexmap::map::Entry;
 use rustc_hash::{FxHashMap, FxHashSet};
 use serde::{Deserialize, Serialize};
-use smallvec::SmallVec;
+use smallvec::{smallvec, SmallVec};
 #[cfg(any(
     feature = "trace_aggregation_update",
     feature = "trace_find_and_schedule"
@@ -50,7 +51,7 @@ pub fn is_root_node(aggregation_number: u32) -> bool {
 fn get_followers_with_aggregation_number(
     task: &impl TaskGuard,
     aggregation_number: u32,
-) -> Vec<TaskId> {
+) -> SmallVec<[TaskId; 4]> {
     if is_aggregating_node(aggregation_number) {
         get_many!(task, Follower { task } count if *count > 0 => task)
     } else {
@@ -60,13 +61,13 @@ fn get_followers_with_aggregation_number(
 
 /// Returns a list of tasks that are considered as "following" the task. The current tasks is not
 /// aggregating over the follower tasks and they should be aggregated by all upper tasks.
-fn get_followers(task: &impl TaskGuard) -> Vec<TaskId> {
+fn get_followers(task: &impl TaskGuard) -> SmallVec<[TaskId; 4]> {
     get_followers_with_aggregation_number(task, get_aggregation_number(task))
 }
 
 /// Returns a list of tasks that are considered as "upper" tasks of the task. The upper tasks are
 /// aggregating over the task.
-pub fn get_uppers(task: &impl TaskGuard) -> Vec<TaskId> {
+pub fn get_uppers(task: &impl TaskGuard) -> SmallVec<[TaskId; 4]> {
     get_many!(task, Upper { task } count if *count > 0 => task)
 }
 
@@ -82,6 +83,42 @@ pub fn get_aggregation_number(task: &impl TaskGuard) -> u32 {
         .unwrap_or_default()
 }
 
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub struct InnerOfUppersHasNewFollowersJob {
+    pub upper_ids: SmallVec<[TaskId; 4]>,
+    pub new_follower_ids: SmallVec<[TaskId; 4]>,
+}
+
+impl From<InnerOfUppersHasNewFollowersJob> for AggregationUpdateJob {
+    fn from(job: InnerOfUppersHasNewFollowersJob) -> Self {
+        AggregationUpdateJob::InnerOfUppersHasNewFollowers(Box::new(job))
+    }
+}
+
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub struct InnerOfUppersLostFollowersJob {
+    pub upper_ids: SmallVec<[TaskId; 4]>,
+    pub lost_follower_ids: SmallVec<[TaskId; 4]>,
+}
+
+impl From<InnerOfUppersLostFollowersJob> for AggregationUpdateJob {
+    fn from(job: InnerOfUppersLostFollowersJob) -> Self {
+        AggregationUpdateJob::InnerOfUppersLostFollowers(Box::new(job))
+    }
+}
+
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub struct AggregatedDataUpdateJob {
+    pub upper_ids: SmallVec<[TaskId; 4]>,
+    pub update: AggregatedDataUpdate,
+}
+
+impl From<AggregatedDataUpdateJob> for AggregationUpdateJob {
+    fn from(job: AggregatedDataUpdateJob) -> Self {
+        AggregationUpdateJob::AggregatedDataUpdate(Box::new(job))
+    }
+}
+
 /// A job in the job queue for updating something in the aggregated graph.
 #[derive(Serialize, Deserialize, Clone, Debug)]
 pub enum AggregationUpdateJob {
@@ -99,39 +136,30 @@ pub enum AggregationUpdateJob {
     },
     /// Notifies multiple upper tasks that one of its inner tasks has a new follower.
     InnerOfUppersHasNewFollower {
-        upper_ids: Vec<TaskId>,
+        upper_ids: SmallVec<[TaskId; 4]>,
         new_follower_id: TaskId,
     },
     /// Notifies an upper task that one of its inner tasks has new followers.
     InnerOfUpperHasNewFollowers {
         upper_id: TaskId,
-        new_follower_ids: Vec<TaskId>,
+        new_follower_ids: SmallVec<[TaskId; 4]>,
     },
     /// Notifies multiple upper tasks that one of its inner tasks has new followers.
-    InnerOfUppersHasNewFollowers {
-        upper_ids: Vec<TaskId>,
-        new_follower_ids: Vec<TaskId>,
-    },
+    InnerOfUppersHasNewFollowers(Box<InnerOfUppersHasNewFollowersJob>),
     /// Notifies multiple upper tasks that one of its inner tasks has lost a follower.
     InnerOfUppersLostFollower {
-        upper_ids: Vec<TaskId>,
+        upper_ids: SmallVec<[TaskId; 4]>,
         lost_follower_id: TaskId,
     },
     /// Notifies multiple upper tasks that one of its inner tasks has lost followers.
-    InnerOfUppersLostFollowers {
-        upper_ids: Vec<TaskId>,
-        lost_follower_ids: Vec<TaskId>,
-    },
+    InnerOfUppersLostFollowers(Box<InnerOfUppersLostFollowersJob>),
     /// Notifies an upper task that one of its inner tasks has lost followers.
     InnerOfUpperLostFollowers {
         upper_id: TaskId,
-        lost_follower_ids: Vec<TaskId>,
+        lost_follower_ids: SmallVec<[TaskId; 4]>,
     },
     /// Notifies an upper task about changed data from an inner task.
-    AggregatedDataUpdate {
-        upper_ids: Vec<TaskId>,
-        update: AggregatedDataUpdate,
-    },
+    AggregatedDataUpdate(Box<AggregatedDataUpdateJob>),
     /// Invalidates tasks that are dependent on a collectible type.
     InvalidateDueToCollectiblesChange {
         task_ids: SmallVec<[TaskId; 4]>,
@@ -141,11 +169,11 @@ pub enum AggregationUpdateJob {
     /// Increases the active counter of the task
     IncreaseActiveCount { task: TaskId },
     /// Increases the active counters of the tasks
-    IncreaseActiveCounts { task_ids: Vec<TaskId> },
+    IncreaseActiveCounts { task_ids: SmallVec<[TaskId; 4]> },
     /// Decreases the active counter of the task
     DecreaseActiveCount { task: TaskId },
     /// Decreases the active counters of the tasks
-    DecreaseActiveCounts { task_ids: Vec<TaskId> },
+    DecreaseActiveCounts { task_ids: SmallVec<[TaskId; 4]> },
     /// Balances the edges of the graph. This checks if the graph invariant is still met for this
     /// edge and coverts a upper edge to a follower edge or vice versa. Balancing might triggers
     /// more changes to the structure.
@@ -157,12 +185,15 @@ impl AggregationUpdateJob {
         task: &mut impl TaskGuard,
         update: AggregatedDataUpdate,
     ) -> Option<AggregationUpdateJob> {
-        let upper_ids: Vec<_> = get_uppers(task);
+        let upper_ids: SmallVec<_> = get_uppers(task);
         if !upper_ids.is_empty() {
-            Some(AggregationUpdateJob::AggregatedDataUpdate {
-                upper_ids,
-                update: update.clone(),
-            })
+            Some(
+                AggregatedDataUpdateJob {
+                    upper_ids,
+                    update: update.clone(),
+                }
+                .into(),
+            )
         } else {
             None
         }
@@ -667,10 +698,11 @@ impl AggregationUpdateQueue {
                     // These jobs are never pushed to the queue
                     unreachable!();
                 }
-                AggregationUpdateJob::InnerOfUppersHasNewFollowers {
-                    mut upper_ids,
-                    mut new_follower_ids,
-                } => {
+                AggregationUpdateJob::InnerOfUppersHasNewFollowers(mut boxed) => {
+                    let InnerOfUppersHasNewFollowersJob {
+                        upper_ids,
+                        new_follower_ids,
+                    } = &mut *boxed;
                     let uppers = upper_ids.len();
                     let followers = new_follower_ids.len();
                     if uppers == 1 && followers == 1 {
@@ -681,25 +713,27 @@ impl AggregationUpdateQueue {
                         );
                     } else if uppers > followers {
                         if let Some(new_follower_id) = new_follower_ids.pop() {
-                            if !new_follower_ids.is_empty() {
+                            let upper_ids = if !new_follower_ids.is_empty() {
+                                let upper_ids = upper_ids.clone();
                                 self.jobs.push_front(AggregationUpdateJobItem::new(
-                                    AggregationUpdateJob::InnerOfUppersHasNewFollowers {
-                                        upper_ids: upper_ids.clone(),
-                                        new_follower_ids,
-                                    },
+                                    AggregationUpdateJob::InnerOfUppersHasNewFollowers(boxed),
                                 ));
-                            }
+                                upper_ids
+                            } else {
+                                take(upper_ids)
+                            };
                             self.inner_of_uppers_has_new_follower(ctx, new_follower_id, upper_ids);
                         }
                     } else if let Some(upper_id) = upper_ids.pop() {
-                        if !upper_ids.is_empty() {
+                        let new_follower_ids = if !upper_ids.is_empty() {
+                            let new_follower_ids = new_follower_ids.clone();
                             self.jobs.push_front(AggregationUpdateJobItem::new(
-                                AggregationUpdateJob::InnerOfUppersHasNewFollowers {
-                                    upper_ids,
-                                    new_follower_ids: new_follower_ids.clone(),
-                                },
+                                AggregationUpdateJob::InnerOfUppersHasNewFollowers(boxed),
                             ));
-                        }
+                            new_follower_ids
+                        } else {
+                            take(new_follower_ids)
+                        };
                         self.inner_of_upper_has_new_followers(ctx, new_follower_ids, upper_id);
                     }
                 }
@@ -729,31 +763,34 @@ impl AggregationUpdateQueue {
                 } => {
                     self.inner_of_upper_has_new_follower(ctx, new_follower_id, upper_id);
                 }
-                AggregationUpdateJob::InnerOfUppersLostFollowers {
-                    mut upper_ids,
-                    mut lost_follower_ids,
-                } => {
+                AggregationUpdateJob::InnerOfUppersLostFollowers(mut boxed) => {
+                    let InnerOfUppersLostFollowersJob {
+                        upper_ids,
+                        lost_follower_ids,
+                    } = &mut *boxed;
                    if upper_ids.len() > lost_follower_ids.len() {
                         if let Some(lost_follower_id) = lost_follower_ids.pop() {
-                            if !lost_follower_ids.is_empty() {
+                            let upper_ids = if !lost_follower_ids.is_empty() {
+                                let upper_ids = upper_ids.clone();
                                 self.jobs.push_front(AggregationUpdateJobItem::new(
-                                    AggregationUpdateJob::InnerOfUppersLostFollowers {
-                                        upper_ids: upper_ids.clone(),
-                                        lost_follower_ids,
-                                    },
+                                    AggregationUpdateJob::InnerOfUppersLostFollowers(boxed),
                                 ));
-                            }
+                                upper_ids
+                            } else {
+                                take(upper_ids)
+                            };
                             self.inner_of_uppers_lost_follower(ctx, lost_follower_id, upper_ids);
                         }
                     } else if let Some(upper_id) = upper_ids.pop() {
-                        if !upper_ids.is_empty() {
+                        let lost_follower_ids = if !upper_ids.is_empty() {
+                            let lost_follower_ids = lost_follower_ids.clone();
                             self.jobs.push_front(AggregationUpdateJobItem::new(
-                                AggregationUpdateJob::InnerOfUppersLostFollowers {
-                                    upper_ids,
-                                    lost_follower_ids: lost_follower_ids.clone(),
-                                },
+                                AggregationUpdateJob::InnerOfUppersLostFollowers(boxed),
                             ));
-                        }
+                            lost_follower_ids
+                        } else {
+                            take(lost_follower_ids)
+                        };
                         self.inner_of_upper_lost_followers(ctx, lost_follower_ids, upper_id);
                     }
                 }
@@ -769,7 +806,10 @@ impl AggregationUpdateQueue {
                 } => {
                     self.inner_of_upper_lost_followers(ctx, lost_follower_ids, upper_id);
                 }
-                AggregationUpdateJob::AggregatedDataUpdate { upper_ids, update } => {
+                AggregationUpdateJob::AggregatedDataUpdate(box AggregatedDataUpdateJob {
+                    upper_ids,
+                    update,
+                }) => {
                     self.aggregated_data_update(upper_ids, ctx, update);
                 }
                 AggregationUpdateJob::InvalidateDueToCollectiblesChange {
@@ -951,10 +991,13 @@ impl AggregationUpdateQueue {
 
         if !upper_ids.is_empty() && !diff.is_empty() {
             // Notify uppers about changed aggregated data
-            self.push(AggregationUpdateJob::AggregatedDataUpdate {
-                upper_ids: upper_ids.clone(),
-                update: diff,
-            });
+            self.push(
+                AggregatedDataUpdateJob {
+                    upper_ids: upper_ids.clone(),
+                    update: diff,
+                }
+                .into(),
+            );
         }
         if !followers.is_empty() {
             if has_active_count {
@@ -998,7 +1041,7 @@ impl AggregationUpdateQueue {
             #[cfg(feature = "trace_aggregation_update")]
             let _span = trace_span!("make follower").entered();
 
-            let upper_ids: Vec<_> = get_uppers(&upper);
+            let upper_ids = get_uppers(&upper);
 
             // Add the same amount of follower edges
             if update_count!(upper, Follower { task: task_id }, count) {
@@ -1025,16 +1068,22 @@ impl AggregationUpdateQueue {
                 let followers = get_followers(&task);
                 let diff = data.apply(&mut upper, ctx.session_id(), self);
                 if !upper_ids.is_empty() && !diff.is_empty() {
-                    self.push(AggregationUpdateJob::AggregatedDataUpdate {
-                        upper_ids: upper_ids.clone(),
-                        update: diff,
-                    });
+                    self.push(
+                        AggregatedDataUpdateJob {
+                            upper_ids: upper_ids.clone(),
+                            update: diff,
+                        }
+                        .into(),
+                    );
                 }
                 if !followers.is_empty() {
-                    self.push(AggregationUpdateJob::InnerOfUppersLostFollowers {
-                        upper_ids: vec![upper_id],
-                        lost_follower_ids: followers,
-                    });
+                    self.push(
+                        InnerOfUppersLostFollowersJob {
+                            upper_ids: smallvec![upper_id],
+                            lost_follower_ids: followers,
+                        }
+                        .into(),
+                    );
                 }
             }
             Ordering::Equal => {}
@@ -1097,7 +1146,7 @@ impl AggregationUpdateQueue {
 
     fn aggregated_data_update(
         &mut self,
-        upper_ids: Vec<TaskId>,
+        upper_ids: SmallVec<[TaskId; 4]>,
         ctx: &mut impl ExecuteContext,
         update: AggregatedDataUpdate,
     ) {
@@ -1107,10 +1156,13 @@ impl AggregationUpdateQueue {
             if !diff.is_empty() {
                 let upper_ids = get_uppers(&upper);
                 if !upper_ids.is_empty() {
-                    self.push(AggregationUpdateJob::AggregatedDataUpdate {
-                        upper_ids,
-                        update: diff,
-                    });
+                    self.push(
+                        AggregatedDataUpdateJob {
+                            upper_ids,
+                            update: diff,
+                        }
+                        .into(),
+                    );
                 }
             }
         }
@@ -1120,7 +1172,7 @@ impl AggregationUpdateQueue {
         &mut self,
         ctx: &mut impl ExecuteContext,
         lost_follower_id: TaskId,
-        mut upper_ids: Vec<TaskId>,
+        mut upper_ids: SmallVec<[TaskId; 4]>,
     ) {
         #[cfg(feature = "trace_aggregation_update")]
         let _span = trace_span!("lost follower (n uppers)", uppers = upper_ids.len()).entered();
 
         let mut follower = ctx.task(lost_follower_id, TaskDataCategory::Meta);
         let mut follower_in_upper_ids = Vec::new();
         let mut persistent_uppers = 0;
-        upper_ids.retain(|&upper_id| {
+        upper_ids.retain(|&mut upper_id| {
             let mut keep_upper = false;
             update!(follower, Upper { task: upper_id }, |old| {
                 let Some(old) = old else {
@@ -1152,7 +1204,7 @@ impl AggregationUpdateQueue {
         });
         if !upper_ids.is_empty() {
             let data = AggregatedDataUpdate::from_task(&mut follower).invert();
-            let followers: Vec<_> = get_followers(&follower);
+            let followers = get_followers(&follower);
             drop(follower);
 
             if !data.is_empty() {
@@ -1162,18 +1214,24 @@ impl AggregationUpdateQueue {
                     let diff = data.apply(&mut upper, ctx.session_id(), self);
                     if !diff.is_empty() {
                         let upper_ids = get_uppers(&upper);
-                        self.push(AggregationUpdateJob::AggregatedDataUpdate {
-                            upper_ids,
-                            update: diff,
-                        })
+                        self.push(
+                            AggregatedDataUpdateJob {
+                                upper_ids,
+                                update: diff,
+                            }
+                            .into(),
+                        )
                     }
                 }
             }
             if !followers.is_empty() {
-                self.push(AggregationUpdateJob::InnerOfUppersLostFollowers {
-                    upper_ids: upper_ids.clone(),
-                    lost_follower_ids: followers,
-                });
+                self.push(
+                    InnerOfUppersLostFollowersJob {
+                        upper_ids: upper_ids.clone(),
+                        lost_follower_ids: followers,
+                    }
+                    .into(),
+                );
             }
         } else {
             drop(follower);
@@ -1217,7 +1275,7 @@ impl AggregationUpdateQueue {
     fn inner_of_upper_lost_followers(
         &mut self,
         ctx: &mut impl ExecuteContext,
-        mut lost_follower_ids: Vec<TaskId>,
+        mut lost_follower_ids: SmallVec<[TaskId; 4]>,
         upper_id: TaskId,
     ) {
         #[cfg(feature = "trace_aggregation_update")]
@@ -1248,7 +1306,7 @@ impl AggregationUpdateQueue {
         });
         if remove_upper {
             let data = AggregatedDataUpdate::from_task(&mut follower).invert();
-            let followers: Vec<_> = get_followers(&follower);
+            let followers = get_followers(&follower);
             drop(follower);
 
             if !data.is_empty() {
@@ -1257,10 +1315,13 @@ impl AggregationUpdateQueue {
                 let diff = data.apply(&mut upper, ctx.session_id(), self);
                 if !diff.is_empty() {
                     let upper_ids = get_uppers(&upper);
-                    self.push(AggregationUpdateJob::AggregatedDataUpdate {
-                        upper_ids,
-                        update: diff,
-                    })
+                    self.push(
+                        AggregatedDataUpdateJob {
+                            upper_ids,
+                            update: diff,
+                        }
+                        .into(),
+                    )
                 }
             }
             if !followers.is_empty() {
@@ -1313,7 +1374,7 @@ impl AggregationUpdateQueue {
         &mut self,
         ctx: &mut impl ExecuteContext,
         new_follower_id: TaskId,
-        mut upper_ids: Vec<TaskId>,
+        mut upper_ids: SmallVec<[TaskId; 4]>,
     ) {
         #[cfg(feature = "trace_aggregation_update")]
         let _span =
@@ -1323,8 +1384,8 @@ impl AggregationUpdateQueue {
             let follower = ctx.task(new_follower_id, TaskDataCategory::Meta);
             get_aggregation_number(&follower)
         };
-        let mut upper_upper_ids_with_new_follower = Vec::new();
-        let mut tasks_for_which_increment_active_count = Vec::new();
+        let mut upper_upper_ids_with_new_follower = SmallVec::new();
+        let mut tasks_for_which_increment_active_count = SmallVec::new();
         let mut is_active = false;
         swap_retain(&mut upper_ids, |&mut upper_id| {
             let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
@@ -1408,7 +1469,7 @@ impl AggregationUpdateQueue {
         }
 
         let data = AggregatedDataUpdate::from_task(&mut follower);
-        let children: Vec<_> = get_followers(&follower);
+        let children = get_followers(&follower);
         drop(follower);
 
         if !data.is_empty() {
@@ -1418,18 +1479,24 @@ impl AggregationUpdateQueue {
                     let diff = data.apply(&mut upper, ctx.session_id(), self);
                     if !diff.is_empty() {
                         let upper_ids = get_uppers(&upper);
-                        self.push(AggregationUpdateJob::AggregatedDataUpdate {
-                            upper_ids,
-                            update: diff,
-                        })
+                        self.push(
+                            AggregatedDataUpdateJob {
+                                upper_ids,
+                                update: diff,
+                            }
+                            .into(),
+                        )
                     }
                 }
             }
             if !children.is_empty() {
-                self.push(AggregationUpdateJob::InnerOfUppersHasNewFollowers {
-                    upper_ids: upper_ids.clone(),
-                    new_follower_ids: children,
-                });
+                self.push(
+                    InnerOfUppersHasNewFollowersJob {
+                        upper_ids: upper_ids.clone(),
+                        new_follower_ids: children,
+                    }
+                    .into(),
+                );
             }
         } else {
             drop(follower);
@@ -1456,7 +1523,7 @@ impl AggregationUpdateQueue {
     fn inner_of_upper_has_new_followers(
         &mut self,
         ctx: &mut impl ExecuteContext,
-        new_follower_ids: Vec<TaskId>,
+        new_follower_ids: SmallVec<[TaskId; 4]>,
         upper_id: TaskId,
     ) {
         #[cfg(feature = "trace_aggregation_update")]
@@ -1474,10 +1541,10 @@ impl AggregationUpdateQueue {
             })
             .collect::<Vec<_>>();
 
-        let mut new_followers_of_upper_uppers = Vec::new();
+        let mut new_followers_of_upper_uppers = SmallVec::new();
         let is_active;
         let has_active_count;
-        let mut upper_upper_ids_for_new_followers = Vec::new();
+        let mut upper_upper_ids_for_new_followers = SmallVec::new();
         let upper_aggregation_number;
         {
             let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
@@ -1526,7 +1593,7 @@ impl AggregationUpdateQueue {
             #[cfg(feature = "trace_aggregation_update")]
             let _span = trace_span!("new inner").entered();
             let mut upper_data_updates = Vec::new();
-            let mut upper_new_followers = Vec::new();
+            let mut upper_new_followers = SmallVec::new();
            for &(follower_id, _) in followers_with_aggregation_number.iter() {
                 let mut follower = ctx.task(follower_id, TaskDataCategory::Meta);
                 if update_count!(follower, Upper { task: upper_id }, 1) {
@@ -1536,7 +1603,7 @@ impl AggregationUpdateQueue {
 
                     // It's a new upper
                     let data = AggregatedDataUpdate::from_task(&mut follower);
-                    let children: Vec<_> = get_followers(&follower);
+                    let children = get_followers(&follower);
                     let follower_aggregation_number = get_aggregation_number(&follower);
                     drop(follower);
 
@@ -1580,16 +1647,22 @@ impl AggregationUpdateQueue {
                     drop(upper);
                     // TODO merge AggregatedDataUpdate
                     for next_diff in iter {
-                        self.push(AggregationUpdateJob::AggregatedDataUpdate {
-                            upper_ids: upper_ids.clone(),
-                            update: diff,
-                        });
+                        self.push(
+                            AggregatedDataUpdateJob {
+                                upper_ids: upper_ids.clone(),
+                                update: diff,
+                            }
+                            .into(),
+                        );
                         diff = next_diff;
                     }
-                    self.push(AggregationUpdateJob::AggregatedDataUpdate {
-                        upper_ids,
-                        update: diff,
-                    });
+                    self.push(
+                        AggregatedDataUpdateJob {
+                            upper_ids,
+                            update: diff,
+                        }
+                        .into(),
+                    );
                 }
             }
             if is_active {
@@ -1612,10 +1685,13 @@ impl AggregationUpdateQueue {
             }
             // notify uppers about new follower
             if !upper_upper_ids_for_new_followers.is_empty() {
-                self.push(AggregationUpdateJob::InnerOfUppersHasNewFollowers {
-                    upper_ids: upper_upper_ids_for_new_followers,
-                    new_follower_ids: new_followers_of_upper_uppers,
-                });
+                self.push(
+                    InnerOfUppersHasNewFollowersJob {
+                        upper_ids: upper_upper_ids_for_new_followers,
+                        new_follower_ids: new_followers_of_upper_uppers,
+                    }
+                    .into(),
+                );
             }
         }
     }
@@ -1701,7 +1777,7 @@ impl AggregationUpdateQueue {
             }
             // It's a new upper
             let data = AggregatedDataUpdate::from_task(&mut follower);
-            let children: Vec<_> = get_followers(&follower);
+            let children = get_followers(&follower);
             drop(follower);
 
             if !data.is_empty() {
@@ -1710,10 +1786,13 @@ impl AggregationUpdateQueue {
                 let diff = data.apply(&mut upper, ctx.session_id(), self);
                 if !diff.is_empty() {
                     let upper_ids = get_uppers(&upper);
-                    self.push(AggregationUpdateJob::AggregatedDataUpdate {
-                        upper_ids,
-                        update: diff,
-                    });
+                    self.push(
+                        AggregatedDataUpdateJob {
+                            upper_ids,
+                            update: diff,
+                        }
+                        .into(),
+                    );
                 }
             }
             if !children.is_empty() {
@@ -2003,7 +2082,7 @@ impl Operation for AggregationUpdateQueue {
     }
 }
 
-fn swap_retain<T>(vec: &mut Vec<T>, mut f: impl FnMut(&mut T) -> bool) {
+fn swap_retain<T, const N: usize>(vec: &mut SmallVec<[T; N]>, mut f: impl FnMut(&mut T) -> bool) {
     let mut i = 0;
     while i < vec.len() {
         if !f(&mut vec[i]) {
@@ -2016,12 +2095,15 @@ fn swap_retain<T>(vec: &mut Vec<T>, mut f: impl FnMut(&mut T) -> bool) {
 
 #[cfg(test)]
 mod tests {
+    use smallvec::{smallvec, SmallVec};
+
     use crate::backend::operation::aggregation_update::swap_retain;
 
     #[test]
     fn test_swap_retain() {
-        let mut vec = vec![1, 2, 3, 4, 5];
+        let mut vec: SmallVec<[i32; 4]> = smallvec![1, 2, 3, 4, 5];
         swap_retain(&mut vec, |a| *a % 2 != 0);
-        assert_eq!(vec, vec![1, 5, 3]);
+        let expected: SmallVec<[i32; 4]> = smallvec![1, 5, 3];
+        assert_eq!(vec, expected);
     }
 }
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs
index 4b2788f0c884d..e73386aea00e1 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs
@@ -2,6 +2,7 @@ use std::mem::take;
 
 use rustc_hash::FxHashSet;
 use serde::{Deserialize, Serialize};
+use smallvec::SmallVec;
 use turbo_tasks::TaskId;
 
 #[cfg(feature = "trace_task_dirty")]
@@ -12,7 +13,7 @@ use crate::{
     operation::{
         aggregation_update::{
             get_aggregation_number, get_uppers, is_aggregating_node, AggregationUpdateJob,
-            AggregationUpdateQueue,
+            AggregationUpdateQueue, InnerOfUppersLostFollowersJob,
         },
         invalidate::make_task_dirty,
         AggregatedDataUpdate, ExecuteContext, Operation, TaskGuard,
@@ -77,7 +78,7 @@ impl Operation for CleanupOldEdgesOperation {
                 if let Some(edge) = outdated.pop() {
                     match edge {
                         OutdatedEdge::Child(child_id) => {
-                            let mut children = Vec::new();
+                            let mut children = SmallVec::new();
                             children.push(child_id);
                             outdated.retain(|e| match e {
                                 OutdatedEdge::Child(id) => {
@@ -104,10 +105,13 @@ impl Operation for CleanupOldEdgesOperation {
                                         task_ids: children.clone(),
                                     });
                                 }
-                                queue.push(AggregationUpdateJob::InnerOfUppersLostFollowers {
-                                    upper_ids,
-                                    lost_follower_ids: children,
-                                });
+                                queue.push(
+                                    InnerOfUppersLostFollowersJob {
+                                        upper_ids,
+                                        lost_follower_ids: children,
+                                    }
+                                    .into(),
+                                );
                             }
                         }
                         OutdatedEdge::Collectible(collectible, count) => {
diff --git a/turbopack/crates/turbo-tasks-backend/src/lib.rs b/turbopack/crates/turbo-tasks-backend/src/lib.rs
index 1d3fef0f365ae..d41dce80a55a1 100644
--- a/turbopack/crates/turbo-tasks-backend/src/lib.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/lib.rs
@@ -1,6 +1,7 @@
 #![feature(anonymous_lifetime_in_impl_trait)]
 #![feature(associated_type_defaults)]
 #![feature(iter_collect_into)]
+#![feature(box_patterns)]
 
 mod backend;
 mod backing_storage;
diff --git a/turbopack/crates/turbo-tasks/src/capture_future.rs b/turbopack/crates/turbo-tasks/src/capture_future.rs
index 1478e00af78d3..8b5d2b21b6e27 100644
--- a/turbopack/crates/turbo-tasks/src/capture_future.rs
+++ b/turbopack/crates/turbo-tasks/src/capture_future.rs
@@ -1,24 +1,28 @@
 use std::{
+    cell::RefCell,
     future::Future,
     pin::Pin,
-    sync::{Arc, Mutex},
     task::{Context, Poll},
     time::{Duration, Instant},
 };
 
 use pin_project_lite::pin_project;
-use tokio::{task::futures::TaskLocalFuture, task_local};
 use turbo_tasks_malloc::{AllocationInfo, TurboMalloc};
 
-task_local! {
-    static EXTRA: Arc<Mutex<(Duration, usize, usize)>>;
+struct ThreadLocalData {
+    duration: Duration,
+    allocations: usize,
+    deallocations: usize,
+}
+
+thread_local! {
+    static EXTRA: RefCell<Option<*mut ThreadLocalData>> = const { RefCell::new(None) };
 }
 
 pin_project! {
     pub struct CaptureFuture<T, F: Future<Output = T>> {
-        cell: Arc<Mutex<(Duration, usize, usize)>>,
         #[pin]
-        future: TaskLocalFuture<Arc<Mutex<(Duration, usize, usize)>>, F>,
+        future: F,
         duration: Duration,
         allocations: usize,
         deallocations: usize,
@@ -27,10 +31,8 @@ pin_project! {
 
 impl<T, F: Future<Output = T>> CaptureFuture<T, F> {
     pub fn new(future: F) -> Self {
-        let cell = Arc::new(Mutex::new((Duration::ZERO, 0, 0)));
         Self {
-            future: EXTRA.scope(cell.clone(), future),
-            cell,
+            future,
             duration: Duration::ZERO,
             allocations: 0,
             deallocations: 0,
@@ -38,15 +40,27 @@ impl<T, F: Future<Output = T>> CaptureFuture<T, F> {
     }
 }
 
+fn try_with_thread_local_data(f: impl FnOnce(&mut ThreadLocalData)) {
+    EXTRA.with_borrow(|cell| {
+        if let Some(data) = cell {
+            // Safety: This data is thread local and only accessed in this thread
+            unsafe {
+                f(&mut **data);
+            }
+        }
+    });
+}
+
 pub fn add_duration(duration: Duration) {
-    let _ = EXTRA.try_with(|cell| cell.lock().unwrap().0 += duration);
+    try_with_thread_local_data(|data| {
+        data.duration += duration;
+    });
 }
 
 pub fn add_allocation_info(alloc_info: AllocationInfo) {
-    let _ = EXTRA.try_with(|cell| {
-        let mut guard = cell.lock().unwrap();
-        guard.1 += alloc_info.allocations;
-        guard.2 += alloc_info.deallocations;
+    try_with_thread_local_data(|data| {
+        data.allocations += alloc_info.allocations;
+        data.deallocations += alloc_info.deallocations;
     });
 }
 
@@ -57,20 +71,38 @@ impl<T, F: Future<Output = T>> Future for CaptureFuture<T, F> {
         let this = self.project();
         let start = Instant::now();
         let start_allocations = TurboMalloc::allocation_counters();
+        let guard = ThreadLocalDataDropGuard;
+        let mut data = ThreadLocalData {
+            duration: Duration::ZERO,
+            allocations: 0,
+            deallocations: 0,
+        };
+        EXTRA.with_borrow_mut(|cell| {
+            *cell = Some(&mut data as *mut ThreadLocalData);
+        });
         let result = this.future.poll(cx);
+        drop(guard);
         let elapsed = start.elapsed();
         let allocations = start_allocations.until_now();
-        *this.duration += elapsed;
-        *this.allocations += allocations.allocations;
-        *this.deallocations += allocations.deallocations;
+        *this.duration += elapsed + data.duration;
+        *this.allocations += allocations.allocations + data.allocations;
+        *this.deallocations += allocations.deallocations + data.deallocations;
         match result {
             Poll::Ready(r) => {
-                let (duration, allocations, deallocations) = *this.cell.lock().unwrap();
-                let memory_usage = (*this.allocations + allocations)
-                    .saturating_sub(*this.deallocations + deallocations);
-                Poll::Ready((r, *this.duration + duration, memory_usage))
+                let memory_usage = this.allocations.saturating_sub(*this.deallocations);
+                Poll::Ready((r, *this.duration, memory_usage))
             }
             Poll::Pending => Poll::Pending,
         }
     }
 }
+
+struct ThreadLocalDataDropGuard;
+
+impl Drop for ThreadLocalDataDropGuard {
+    fn drop(&mut self) {
+        EXTRA.with_borrow_mut(|cell| {
+            *cell = None;
+        });
+    }
+}
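
Note (not part of the patch): the capture_future.rs change above swaps a task_local Arc<Mutex<…>> for a per-poll thread-local accumulator that helpers such as add_duration write into while the wrapped future is being polled. The standalone sketch below illustrates that scoped thread-local pattern under simplified assumptions; PollStats, with_poll_stats, and add_allocations are hypothetical names, and it keeps owned data in the thread local instead of the raw pointer plus drop guard used by the patch, so it needs no unsafe code.

use std::cell::RefCell;
use std::time::Duration;

// Simplified stand-in for the patch's ThreadLocalData.
#[derive(Default, Clone, Copy)]
struct PollStats {
    duration: Duration,
    allocations: usize,
}

thread_local! {
    // Present only while a wrapped "poll" is being measured on this thread.
    static STATS: RefCell<Option<PollStats>> = const { RefCell::new(None) };
}

// Record a duration if a measurement is in progress; otherwise do nothing.
fn add_duration(duration: Duration) {
    STATS.with_borrow_mut(|slot| {
        if let Some(stats) = slot.as_mut() {
            stats.duration += duration;
        }
    });
}

// Record an allocation count under the same condition.
fn add_allocations(count: usize) {
    STATS.with_borrow_mut(|slot| {
        if let Some(stats) = slot.as_mut() {
            stats.allocations += count;
        }
    });
}

// Install the accumulator around `f`, then take it back out afterwards
// (the analogue of setting the pointer before poll and clearing it in the drop guard).
fn with_poll_stats<R>(f: impl FnOnce() -> R) -> (R, PollStats) {
    STATS.with_borrow_mut(|slot| *slot = Some(PollStats::default()));
    let result = f();
    let stats = STATS.with_borrow_mut(|slot| slot.take().unwrap_or_default());
    (result, stats)
}

fn main() {
    let ((), stats) = with_poll_stats(|| {
        add_duration(Duration::from_millis(3));
        add_duration(Duration::from_millis(4));
        add_allocations(128);
    });
    assert_eq!(stats.duration, Duration::from_millis(7));
    assert_eq!(stats.allocations, 128);
}

The trade-off of the owned-data variant is one small copy per measured poll; the patch avoids even that by storing a raw pointer to stack data and relying on the drop guard to clear it before the data goes out of scope.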