Skip to content

Commit

Permalink
chore(turbo-tasks): Clean up consistency/untracked APIs around local …
Browse files Browse the repository at this point in the history
…outputs and cells (#75171)

We only read data owned by the the currently-executing task when reading from a local Vc, so:
- A separate "untracked" API doesn't make sense. There's never any tracking needed for reading local outputs or cells. (would we mark the current task as dependent on itself?)
- Allowing a `consistency` argument isn't useful because you can't strongly consistently await your own task.
- We don't need to call `notify_scheduled_tasks`, we're not adding any new task dependencies.
  • Loading branch information
bgw authored Jan 24, 2025
1 parent 66d3b9a commit 75ab6b0
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 73 deletions.
10 changes: 0 additions & 10 deletions turbopack/crates/turbo-tasks-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,19 +228,9 @@ impl TurboTasksApi for VcStorage {
}

fn try_read_local_output(
&self,
parent_task_id: TaskId,
local_task_id: LocalTaskId,
consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>> {
self.try_read_local_output_untracked(parent_task_id, local_task_id, consistency)
}

fn try_read_local_output_untracked(
&self,
_parent_task_id: TaskId,
_local_task_id: LocalTaskId,
_consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>> {
unimplemented!()
}
Expand Down
40 changes: 11 additions & 29 deletions turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,13 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
index: CellId,
) -> Result<Result<TypedCellContent, EventListener>>;

/// This does not accept a consistency argument, as you cannot control consistency of a read of
/// an operation owned by your own task. Strongly consistent reads are only allowed on
/// `OperationVc`s, which should never be local tasks.
fn try_read_local_output(
&self,
parent_task_id: TaskId,
local_task_id: LocalTaskId,
consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>>;

/// INVALIDATION: Be careful with this, it will not track dependencies, so
/// using it could break cache invalidation.
fn try_read_local_output_untracked(
&self,
parent_task_id: TaskId,
local_task_id: LocalTaskId,
consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>>;

fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;
Expand Down Expand Up @@ -335,7 +328,7 @@ pub enum TaskPersistence {
LocalCells,
}

#[derive(Clone, Copy, Eq, PartialEq)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadConsistency {
/// The default behavior for most APIs. Reads are faster, but may return stale values, which
/// may later trigger re-computation.
Expand Down Expand Up @@ -1323,26 +1316,16 @@ impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
&self,
parent_task_id: TaskId,
local_task_id: LocalTaskId,
consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>> {
// we don't currently support reading a local output outside of it's own task, so
// tracked/untracked is currently irrelevant
self.try_read_local_output_untracked(parent_task_id, local_task_id, consistency)
}

/// INVALIDATION: Be careful with this, it will not track dependencies, so
/// using it could break cache invalidation.
fn try_read_local_output_untracked(
&self,
parent_task_id: TaskId,
local_task_id: LocalTaskId,
// we don't currently support reading a local output outside of it's own task, so
// consistency is currently irrelevant
_consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>> {
CURRENT_GLOBAL_TASK_STATE.with(|gts| {
let gts_read = gts.read().unwrap();

// Local Vcs are local to their parent task, and do not exist outside of it. This is
// weakly enforced at compile time using the `NonLocalValue` marker trait. This
// assertion exists to handle any potential escapes that the compile-time checks cannot
// capture.
gts_read.assert_task_id(parent_task_id);

match gts_read.get_local_task(local_task_id) {
LocalTask::Scheduled { done_event } => Ok(Err(done_event.listen())),
LocalTask::Done { output } => Ok(Ok(output.as_read_result()?)),
Expand Down Expand Up @@ -2051,10 +2034,9 @@ pub(crate) async fn read_local_output(
this: &dyn TurboTasksApi,
parent_task_id: TaskId,
local_task_id: LocalTaskId,
consistency: ReadConsistency,
) -> Result<RawVc> {
loop {
match this.try_read_local_output(parent_task_id, local_task_id, consistency)? {
match this.try_read_local_output(parent_task_id, local_task_id)? {
Ok(raw_vc) => return Ok(raw_vc),
Err(event_listener) => event_listener.await,
}
Expand Down
48 changes: 14 additions & 34 deletions turbopack/crates/turbo-tasks/src/raw_vc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,9 @@ impl RawVc {
}
}
RawVc::LocalOutput(task_id, local_cell_id) => {
current =
read_local_output(&*tt, task_id, local_cell_id, ReadConsistency::Eventual)
.await
.map_err(|source| ResolveTypeError::TaskError { source })?;
current = read_local_output(&*tt, task_id, local_cell_id)
.await
.map_err(|source| ResolveTypeError::TaskError { source })?;
}
RawVc::LocalCell(execution_id, local_cell_id) => {
let shared_reference = read_local_cell(execution_id, local_cell_id);
Expand Down Expand Up @@ -214,24 +213,22 @@ impl RawVc {
let tt = turbo_tasks();
let mut current = self;
let mut notified = false;
let mut lazily_notify = || {
if !notified {
tt.notify_scheduled_tasks();
notified = true;
}
};
loop {
match current {
RawVc::TaskOutput(task) => {
lazily_notify();
if !notified {
tt.notify_scheduled_tasks();
notified = true;
}
current = read_task_output(&*tt, task, consistency).await?;
}
RawVc::TaskCell(_, _) => return Ok(current),
RawVc::LocalOutput(task_id, local_cell_id) => {
lazily_notify();
current = read_local_output(&*tt, task_id, local_cell_id, consistency).await?;
debug_assert_eq!(consistency, ReadConsistency::Eventual);
current = read_local_output(&*tt, task_id, local_cell_id).await?;
}
RawVc::LocalCell(execution_id, local_cell_id) => {
debug_assert_eq!(consistency, ReadConsistency::Eventual);
let shared_reference = read_local_cell(execution_id, local_cell_id);
let value_type = get_value_type(shared_reference.0);
return Ok((value_type.raw_cell)(shared_reference));
Expand All @@ -245,20 +242,10 @@ impl RawVc {
pub(crate) async fn to_non_local(self) -> Result<RawVc> {
let tt = turbo_tasks();
let mut current = self;
let mut notified = false;
let mut lazily_notify = || {
if !notified {
tt.notify_scheduled_tasks();
notified = true;
}
};
loop {
match current {
RawVc::LocalOutput(task_id, local_cell_id) => {
lazily_notify();
current =
read_local_output(&*tt, task_id, local_cell_id, ReadConsistency::Eventual)
.await?;
current = read_local_output(&*tt, task_id, local_cell_id).await?;
}
RawVc::LocalCell(execution_id, local_cell_id) => {
let shared_reference = read_local_cell(execution_id, local_cell_id);
Expand Down Expand Up @@ -435,18 +422,10 @@ impl Future for ReadRawVcFuture {
}
}
RawVc::LocalOutput(task_id, local_output_id) => {
let read_result = if this.untracked {
tt.try_read_local_output_untracked(
task_id,
local_output_id,
this.consistency,
)
} else {
tt.try_read_local_output(task_id, local_output_id, this.consistency)
};
debug_assert_eq!(this.consistency, ReadConsistency::Eventual);
let read_result = tt.try_read_local_output(task_id, local_output_id);
match read_result {
Ok(Ok(vc)) => {
this.consistency = ReadConsistency::Eventual;
this.current = vc;
continue 'outer;
}
Expand All @@ -455,6 +434,7 @@ impl Future for ReadRawVcFuture {
}
}
RawVc::LocalCell(execution_id, local_cell_id) => {
debug_assert_eq!(this.consistency, ReadConsistency::Eventual);
return Poll::Ready(
Ok(read_local_cell(execution_id, local_cell_id).into()),
);
Expand Down

0 comments on commit 75ab6b0

Please sign in to comment.