Skip to content

Commit

Permalink
refactor(turbopack): Remove ReadRawVcFuture.turbo_task (#75217)
Browse files Browse the repository at this point in the history
### Why?

`core::ptr::drop_in_place` of `ReadRawVcFuture.turbo_task` is consuming too much time.
  • Loading branch information
kdy1 authored Jan 23, 2025
1 parent 2f0c80f commit 8952a10
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 104 deletions.
13 changes: 9 additions & 4 deletions turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,11 @@ impl<B: Backend + 'static> TurboTasks<B> {
// track a dependency
let raw_result =
read_task_output_untracked(self, task_id, ReadConsistency::Eventual).await?;
ReadVcFuture::<Completion>::from(raw_result.into_read_untracked_with_turbo_tasks(self))
.await?;
turbo_tasks_future_scope(
self.pin(),
ReadVcFuture::<Completion>::from(raw_result.into_read_untracked_with_turbo_tasks(self)),
)
.await?;

Ok(rx.await?)
}
Expand Down Expand Up @@ -1748,7 +1751,8 @@ pub async fn run_once<T: Send + 'static>(
// INVALIDATION: A Once task will never invalidate, therefore we don't need to
// track a dependency
let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
ReadVcFuture::<Completion>::from(raw_result.into_read_untracked_with_turbo_tasks(&*tt)).await?;
let raw_future = raw_result.into_read_untracked_with_turbo_tasks(&*tt);
turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;

Ok(rx.await?)
}
Expand All @@ -1773,7 +1777,8 @@ pub async fn run_once_with_reason<T: Send + 'static>(
// INVALIDATION: A Once task will never invalidate, therefore we don't need to
// track a dependency
let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
ReadVcFuture::<Completion>::from(raw_result.into_read_untracked_with_turbo_tasks(&*tt)).await?;
let raw_future = raw_result.into_read_untracked_with_turbo_tasks(&*tt);
turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;

Ok(rx.await?)
}
Expand Down
181 changes: 81 additions & 100 deletions turbopack/crates/turbo-tasks/src/raw_vc.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
use std::{
fmt::{Debug, Display},
future::Future,
hash::Hash,
pin::Pin,
sync::Arc,
task::Poll,
};
use std::{fmt::Display, future::Future, pin::Pin, task::Poll};

use anyhow::Result;
use auto_hash_map::AutoSet;
Expand All @@ -18,7 +11,7 @@ use crate::{
id::{ExecutionId, LocalCellId, LocalTaskId},
manager::{
assert_execution_id, current_task, read_local_cell, read_local_output, read_task_cell,
read_task_output, TurboTasksApi,
read_task_output, with_turbo_tasks, TurboTasksApi,
},
registry::{self, get_value_type},
turbo_tasks, CollectiblesSource, ReadConsistency, TaskId, TraitTypeId, ValueType, ValueTypeId,
Expand Down Expand Up @@ -335,7 +328,6 @@ impl CollectiblesSource for RawVc {
}

pub struct ReadRawVcFuture {
turbo_tasks: Arc<dyn TurboTasksApi>,
consistency: ReadConsistency,
current: RawVc,
untracked: bool,
Expand All @@ -344,20 +336,16 @@ pub struct ReadRawVcFuture {

impl ReadRawVcFuture {
pub(crate) fn new(vc: RawVc) -> Self {
let tt = turbo_tasks();
ReadRawVcFuture {
turbo_tasks: tt,
consistency: ReadConsistency::Eventual,
current: vc,
untracked: false,
listener: None,
}
}

fn new_untracked_with_turbo_tasks(vc: RawVc, turbo_tasks: &dyn TurboTasksApi) -> Self {
let tt = turbo_tasks.pin();
fn new_untracked_with_turbo_tasks(vc: RawVc, _turbo_tasks: &dyn TurboTasksApi) -> Self {
ReadRawVcFuture {
turbo_tasks: tt,
consistency: ReadConsistency::Eventual,
current: vc,
untracked: true,
Expand All @@ -366,9 +354,7 @@ impl ReadRawVcFuture {
}

fn new_untracked(vc: RawVc) -> Self {
let tt = turbo_tasks();
ReadRawVcFuture {
turbo_tasks: tt,
consistency: ReadConsistency::Eventual,
current: vc,
untracked: true,
Expand All @@ -377,9 +363,7 @@ impl ReadRawVcFuture {
}

fn new_strongly_consistent(vc: RawVc) -> Self {
let tt = turbo_tasks();
ReadRawVcFuture {
turbo_tasks: tt,
consistency: ReadConsistency::Strong,
current: vc,
untracked: false,
Expand All @@ -388,9 +372,7 @@ impl ReadRawVcFuture {
}

fn new_strongly_consistent_untracked(vc: RawVc) -> Self {
let tt = turbo_tasks();
ReadRawVcFuture {
turbo_tasks: tt,
consistency: ReadConsistency::Strong,
current: vc,
untracked: true,
Expand All @@ -403,92 +385,91 @@ impl Future for ReadRawVcFuture {
type Output = Result<TypedCellContent>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
self.turbo_tasks.notify_scheduled_tasks();
// SAFETY: we are not moving this
let this = unsafe { self.get_unchecked_mut() };
'outer: loop {
if let Some(listener) = &mut this.listener {
// SAFETY: listener is from previous pinned this
let listener = unsafe { Pin::new_unchecked(listener) };
if listener.poll(cx).is_pending() {
return Poll::Pending;
with_turbo_tasks(|tt| {
tt.notify_scheduled_tasks();
// SAFETY: we are not moving this
let this = unsafe { self.get_unchecked_mut() };
'outer: loop {
if let Some(listener) = &mut this.listener {
// SAFETY: listener is from previous pinned this
let listener = unsafe { Pin::new_unchecked(listener) };
if listener.poll(cx).is_pending() {
return Poll::Pending;
}
this.listener = None;
}
this.listener = None;
}
let mut listener = match this.current {
RawVc::TaskOutput(task) => {
let read_result = if this.untracked {
this.turbo_tasks
.try_read_task_output_untracked(task, this.consistency)
} else {
this.turbo_tasks
.try_read_task_output(task, this.consistency)
};
match read_result {
Ok(Ok(vc)) => {
// We no longer need to read strongly consistent, as any Vc returned
// from the first task will be inside of the scope of the first task. So
// it's already strongly consistent.
this.consistency = ReadConsistency::Eventual;
this.current = vc;
continue 'outer;
let mut listener = match this.current {
RawVc::TaskOutput(task) => {
let read_result = if this.untracked {
tt.try_read_task_output_untracked(task, this.consistency)
} else {
tt.try_read_task_output(task, this.consistency)
};
match read_result {
Ok(Ok(vc)) => {
// We no longer need to read strongly consistent, as any Vc returned
// from the first task will be inside of the scope of the first
// task. So it's already strongly
// consistent.
this.consistency = ReadConsistency::Eventual;
this.current = vc;
continue 'outer;
}
Ok(Err(listener)) => listener,
Err(err) => return Poll::Ready(Err(err)),
}
Ok(Err(listener)) => listener,
Err(err) => return Poll::Ready(Err(err)),
}
}
RawVc::TaskCell(task, index) => {
let read_result = if this.untracked {
this.turbo_tasks.try_read_task_cell_untracked(task, index)
} else {
this.turbo_tasks.try_read_task_cell(task, index)
};
match read_result {
Ok(Ok(content)) => {
// SAFETY: Constructor ensures that T and U are binary identical
return Poll::Ready(Ok(content));
RawVc::TaskCell(task, index) => {
let read_result = if this.untracked {
tt.try_read_task_cell_untracked(task, index)
} else {
tt.try_read_task_cell(task, index)
};
match read_result {
Ok(Ok(content)) => {
// SAFETY: Constructor ensures that T and U are binary identical
return Poll::Ready(Ok(content));
}
Ok(Err(listener)) => listener,
Err(err) => return Poll::Ready(Err(err)),
}
Ok(Err(listener)) => listener,
Err(err) => return Poll::Ready(Err(err)),
}
}
RawVc::LocalOutput(task_id, local_output_id) => {
let read_result = if this.untracked {
this.turbo_tasks.try_read_local_output_untracked(
task_id,
local_output_id,
this.consistency,
)
} else {
this.turbo_tasks.try_read_local_output(
task_id,
local_output_id,
this.consistency,
)
};
match read_result {
Ok(Ok(vc)) => {
this.consistency = ReadConsistency::Eventual;
this.current = vc;
continue 'outer;
RawVc::LocalOutput(task_id, local_output_id) => {
let read_result = if this.untracked {
tt.try_read_local_output_untracked(
task_id,
local_output_id,
this.consistency,
)
} else {
tt.try_read_local_output(task_id, local_output_id, this.consistency)
};
match read_result {
Ok(Ok(vc)) => {
this.consistency = ReadConsistency::Eventual;
this.current = vc;
continue 'outer;
}
Ok(Err(listener)) => listener,
Err(err) => return Poll::Ready(Err(err)),
}
Ok(Err(listener)) => listener,
Err(err) => return Poll::Ready(Err(err)),
}
}
RawVc::LocalCell(execution_id, local_cell_id) => {
return Poll::Ready(Ok(read_local_cell(execution_id, local_cell_id).into()));
}
};
// SAFETY: listener is from previous pinned this
match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) {
Poll::Ready(_) => continue,
Poll::Pending => {
this.listener = Some(listener);
return Poll::Pending;
}
};
}
RawVc::LocalCell(execution_id, local_cell_id) => {
return Poll::Ready(
Ok(read_local_cell(execution_id, local_cell_id).into()),
);
}
};
// SAFETY: listener is from previous pinned this
match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) {
Poll::Ready(_) => continue,
Poll::Pending => {
this.listener = Some(listener);
return Poll::Pending;
}
};
}
})
}
}

Expand Down

0 comments on commit 8952a10

Please sign in to comment.