Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(turbo-tasks): Use Return Position Impl Traits (RPIT) to reimplement GraphTraversal #74896

Merged
merged 4 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-tasks/src/graph/adjacency_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ where

impl<T> GraphStore for AdjacencyMap<T>
where
T: Eq + Hash + Clone,
T: Eq + Hash + Clone + Send,
{
type Node = T;
type Handle = T;
Expand Down
6 changes: 3 additions & 3 deletions turbopack/crates/turbo-tasks/src/graph/graph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use super::VisitedNodes;

/// A graph store is a data structure that will be built up during a graph
/// traversal. It is used to store the results of the traversal.
pub trait GraphStore {
type Node;
type Handle: Clone;
pub trait GraphStore: Send {
type Node: Send;
type Handle: Clone + Send;

// TODO(alexkirsz) An `entry(from_handle) -> Entry` API would be more
// efficient, as right now we're getting the same key multiple times.
Expand Down
217 changes: 63 additions & 154 deletions turbopack/crates/turbo-tasks/src/graph/graph_traversal.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::HashSet, future::Future, pin::Pin};
use std::{collections::HashSet, future::Future};

use anyhow::Result;
use futures::{stream::FuturesUnordered, Stream};
use futures::{stream::FuturesUnordered, StreamExt};

use super::{
graph_store::{GraphNode, GraphStore},
Expand All @@ -19,14 +19,15 @@ pub struct VisitedNodes<T>(pub HashSet<T>);
/// The traversal is done in parallel, and the order of the nodes in the traversal
/// result is determined by the [`GraphStore`] parameter.
pub trait GraphTraversal: GraphStore + Sized {
fn visit<RootEdgesIt, VisitImpl, Abort, Impl>(
fn visit<VisitImpl, Abort, Impl>(
self,
root_edges: RootEdgesIt,
root_edges: impl IntoIterator<Item = VisitImpl::Edge>,
visit: VisitImpl,
) -> GraphTraversalFuture<Self, VisitImpl, Abort, Impl, VisitImpl::EdgesFuture>
) -> impl Future<Output = GraphTraversalResult<Result<Self>, Abort>> + Send
where
VisitImpl: Visit<Self::Node, Abort, Impl>,
RootEdgesIt: IntoIterator<Item = VisitImpl::Edge>;
VisitImpl: Visit<Self::Node, Abort, Impl> + Send,
Abort: Send,
Impl: Send;

fn skip_duplicates(self) -> SkipDuplicates<Self>;
fn skip_duplicates_with_visited_nodes(
Expand All @@ -41,16 +42,22 @@ where
{
/// Visits the graph starting from the given `roots`, and returns a future
/// that will resolve to the traversal result.
fn visit<RootEdgesIt, VisitImpl, Abort, Impl>(
fn visit<VisitImpl, Abort, Impl>(
mut self,
root_edges: RootEdgesIt,
root_edges: impl IntoIterator<Item = VisitImpl::Edge>,
mut visit: VisitImpl,
) -> GraphTraversalFuture<Self, VisitImpl, Abort, Impl, VisitImpl::EdgesFuture>
) -> impl Future<Output = GraphTraversalResult<Result<Self>, Abort>> + Send
where
VisitImpl: Visit<Self::Node, Abort, Impl>,
RootEdgesIt: IntoIterator<Item = VisitImpl::Edge>,
VisitImpl: Visit<Self::Node, Abort, Impl> + Send,
Abort: Send,
Impl: Send,
{
let futures = FuturesUnordered::new();
let mut futures = FuturesUnordered::new();
let mut root_abort = None;

// Populate `futures` with all the roots. `root_edges` isn't required to be `Send`, so this
// has to happen outside of the returned future. Alternatively, `root_edges` could be
// required to be `Send` at some later point.
for edge in root_edges {
match visit.visit(edge) {
VisitControlFlow::Continue(node) => {
Expand All @@ -63,19 +70,52 @@ where
self.insert(None, GraphNode(node));
}
VisitControlFlow::Abort(abort) => {
return GraphTraversalFuture {
state: GraphTraversalState::Aborted { abort },
};
// this must be returned inside the `async` block below so that it's part of the
// returned future
root_abort = Some(abort)
}
}
}
GraphTraversalFuture {
state: GraphTraversalState::Running(GraphTraversalRunningState {
store: self,
futures,
visit,
_phantom: std::marker::PhantomData,
}),

async move {
if let Some(abort) = root_abort {
return GraphTraversalResult::Aborted(abort);
}
loop {
match futures.next().await {
Some((parent_handle, span, Ok(edges))) => {
let _guard = span.enter();
for edge in edges {
match visit.visit(edge) {
VisitControlFlow::Continue(node) => {
if let Some((node_handle, node_ref)) =
self.insert(Some(parent_handle.clone()), GraphNode(node))
{
let span = visit.span(node_ref);
futures.push(With::new(
visit.edges(node_ref),
span,
node_handle,
));
}
}
VisitControlFlow::Skip(node) => {
self.insert(Some(parent_handle.clone()), GraphNode(node));
}
VisitControlFlow::Abort(abort) => {
return GraphTraversalResult::Aborted(abort)
}
}
}
}
Some((_, _, Err(err))) => {
return GraphTraversalResult::Completed(Err(err));
}
None => {
return GraphTraversalResult::Completed(Ok(self));
}
}
}
}
}

Expand All @@ -91,47 +131,6 @@ where
}
}

/// A future that resolves to a [`GraphStore`] containing the result of a graph
/// traversal.
///
/// The future's output is a [`GraphTraversalResult`]: either the completed
/// store (wrapped in a `Result`), or the `Abort` value produced by the visitor.
///
/// `EdgesFuture` is carried as a separate generic parameter; the `Future` impl
/// for this type constrains it to equal `VisitImpl::EdgesFuture`.
pub struct GraphTraversalFuture<Store, VisitImpl, Abort, Impl, EdgesFuture>
where
Store: GraphStore,
VisitImpl: Visit<Store::Node, Abort, Impl>,
EdgesFuture: Future,
{
// Current state of the traversal. `poll` takes this value out, advances it,
// and writes the successor state back before returning.
state: GraphTraversalState<Store, VisitImpl, Abort, Impl, EdgesFuture>,
}

/// Internal state machine backing [`GraphTraversalFuture`].
#[derive(Default)]
enum GraphTraversalState<Store, VisitImpl, Abort, Impl, EdgesFuture>
where
Store: GraphStore,
VisitImpl: Visit<Store::Node, Abort, Impl>,
EdgesFuture: Future,
{
/// The future has already produced its output. This is also the placeholder
/// that `std::mem::take` leaves behind while `poll` inspects the previous
/// state. Polling the future in this state panics.
#[default]
Completed,
/// The visitor returned `VisitControlFlow::Abort` for one of the root edges,
/// before the future was ever polled. The next poll resolves to
/// `GraphTraversalResult::Aborted` with this value.
Aborted {
abort: Abort,
},
/// The traversal is in progress; see [`GraphTraversalRunningState`].
Running(GraphTraversalRunningState<Store, VisitImpl, Abort, Impl, EdgesFuture>),
}

/// Everything needed to keep an in-progress traversal going between polls.
struct GraphTraversalRunningState<Store, VisitImpl, Abort, Impl, EdgesFuture>
where
Store: GraphStore,
VisitImpl: Visit<Store::Node, Abort, Impl>,
EdgesFuture: Future,
{
// The graph store being built up; returned as the traversal result once
// `futures` is exhausted.
store: Store,
// Pending `edges` futures, one per node that is being expanded. Each is
// wrapped in `With`, together with the node's span and store handle, so the
// handle can be used as the parent when the resolved edges are inserted.
//
// This should be VisitImpl::EdgesFuture, but this causes a bug in the Rust
// compiler (see https://github.com/rust-lang/rust/issues/102211).
// Instead, we pass the associated type as an additional generic parameter.
futures: FuturesUnordered<With<EdgesFuture, Store::Handle>>,
// The user-supplied visitor, consulted for every edge and every new node.
visit: VisitImpl,
// `Abort` and `Impl` appear in no other field; this marker keeps them as
// used type parameters of the struct.
_phantom: std::marker::PhantomData<(Abort, Impl)>,
}

pub enum GraphTraversalResult<Completed, Aborted> {
Completed(Completed),
Aborted(Aborted),
Expand All @@ -145,93 +144,3 @@ impl<Completed> GraphTraversalResult<Completed, !> {
}
}
}

/// Drives the graph traversal to completion.
///
/// Each poll drains every `edges` future that is ready, feeds the resolved
/// edges through the visitor, and queues a new `edges` future for every node
/// the store accepts. The future resolves when:
///
/// * the visitor aborts (`GraphTraversalResult::Aborted`),
/// * an `edges` future fails (`GraphTraversalResult::Completed(Err(_))`), or
/// * no `edges` futures remain (`GraphTraversalResult::Completed(Ok(store))`).
impl<Store, VisitImpl, Abort, Impl, EdgesFuture> Future
    for GraphTraversalFuture<Store, VisitImpl, Abort, Impl, EdgesFuture>
where
    Store: GraphStore,
    // The EdgesFuture bound is necessary to avoid the compiler bug mentioned
    // above.
    VisitImpl: Visit<Store::Node, Abort, Impl, EdgesFuture = EdgesFuture>,
    EdgesFuture: Future<Output = Result<VisitImpl::EdgesIntoIter>>,
{
    type Output = GraphTraversalResult<Result<Store>, Abort>;

    /// Advances the traversal as far as possible without blocking.
    ///
    /// # Panics
    ///
    /// Panics with "polled after completion" if called again after the future
    /// has already returned `Poll::Ready`.
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // SAFETY: no field of `Self` is structurally pinned. The state is only
        // ever accessed through `&mut`, and the single inner poll below goes
        // through `Pin::new`, which the compiler only accepts for `Unpin`
        // targets. Moving the state out of `self` therefore cannot invalidate
        // any pinning guarantee.
        let this = unsafe { self.get_unchecked_mut() };

        let result;
        // Take the state out (leaving `Completed` behind), advance it, and
        // store the successor state together with the poll result.
        (this.state, result) = match std::mem::take(&mut this.state) {
            GraphTraversalState::Completed => {
                panic!("polled after completion")
            }
            // A root edge already aborted the traversal before the first poll.
            GraphTraversalState::Aborted { abort } => (
                GraphTraversalState::Completed,
                std::task::Poll::Ready(GraphTraversalResult::Aborted(abort)),
            ),
            GraphTraversalState::Running(mut running) => 'outer: loop {
                // `FuturesUnordered` is `Unpin`, so it can be pinned safely
                // without `Pin::new_unchecked`.
                match Pin::new(&mut running.futures).poll_next(cx) {
                    std::task::Poll::Ready(Some((parent_handle, span, Ok(edges)))) => {
                        // Process the edges under the span of the node they
                        // originate from.
                        let _guard = span.enter();
                        for edge in edges {
                            match running.visit.visit(edge) {
                                VisitControlFlow::Continue(node) => {
                                    // Only expand the node further if the
                                    // store accepted it.
                                    if let Some((node_handle, node_ref)) = running
                                        .store
                                        .insert(Some(parent_handle.clone()), GraphNode(node))
                                    {
                                        let span = running.visit.span(node_ref);
                                        running.futures.push(With::new(
                                            running.visit.edges(node_ref),
                                            span,
                                            node_handle,
                                        ));
                                    }
                                }
                                VisitControlFlow::Skip(node) => {
                                    // Record the node, but do not follow its
                                    // outgoing edges.
                                    running
                                        .store
                                        .insert(Some(parent_handle.clone()), GraphNode(node));
                                }
                                VisitControlFlow::Abort(abort) => {
                                    break 'outer (
                                        GraphTraversalState::Completed,
                                        std::task::Poll::Ready(GraphTraversalResult::Aborted(
                                            abort,
                                        )),
                                    );
                                }
                            }
                        }
                    }
                    // The first failing `edges` future ends the traversal.
                    std::task::Poll::Ready(Some((_, _, Err(err)))) => {
                        break (
                            GraphTraversalState::Completed,
                            std::task::Poll::Ready(GraphTraversalResult::Completed(Err(err))),
                        );
                    }
                    // Nothing left to expand: the traversal is complete.
                    std::task::Poll::Ready(None) => {
                        break (
                            GraphTraversalState::Completed,
                            std::task::Poll::Ready(GraphTraversalResult::Completed(Ok(
                                running.store
                            ))),
                        );
                    }
                    // Put the running state back so the next poll resumes it.
                    std::task::Poll::Pending => {
                        break (
                            GraphTraversalState::Running(running),
                            std::task::Poll::Pending,
                        );
                    }
                }
            },
        };

        result
    }
}
5 changes: 4 additions & 1 deletion turbopack/crates/turbo-tasks/src/graph/non_deterministic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ impl<T> NonDeterministic<T> {
}
}

impl<T> GraphStore for NonDeterministic<T> {
impl<T> GraphStore for NonDeterministic<T>
where
T: Send,
{
type Node = T;
type Handle = ();

Expand Down
15 changes: 10 additions & 5 deletions turbopack/crates/turbo-tasks/src/graph/visit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::VisitControlFlow;
pub trait Visit<Node, Abort = !, Impl = ()> {
type Edge;
type EdgesIntoIter: IntoIterator<Item = Self::Edge>;
type EdgesFuture: Future<Output = Result<Self::EdgesIntoIter>>;
type EdgesFuture: Future<Output = Result<Self::EdgesIntoIter>> + Send;

/// Visits an edge to get to the neighbor node. Should return a
/// [`VisitControlFlow`] that indicates whether to:
Expand All @@ -19,8 +19,13 @@ pub trait Visit<Node, Abort = !, Impl = ()> {
/// * abort the traversal entirely.
fn visit(&mut self, edge: Self::Edge) -> VisitControlFlow<Node, Abort>;

/// Returns a future that resolves to the outgoing edges of the given
/// `node`.
/// Returns a future that resolves to the outgoing edges of the given `node`.
///
/// Lifetimes:
/// - The returned future's lifetime cannot depend on the reference to self because there are
/// multiple `edges` futures created and awaited concurrently.
/// - The returned future's lifetime cannot depend on `node` because `GraphStore::insert`
/// returns a node reference that's only valid for the lifetime of its `&mut self` reference.
fn edges(&mut self, node: &Node) -> Self::EdgesFuture;

/// Returns a [Span] for the given `node`, under which all edges are
Expand All @@ -41,7 +46,7 @@ pub struct ImplRef;
impl<Node, VisitFn, NeighFut, NeighIt> Visit<Node, !, ImplRef> for VisitFn
where
VisitFn: FnMut(&Node) -> NeighFut,
NeighFut: Future<Output = Result<NeighIt>>,
NeighFut: Future<Output = Result<NeighIt>> + Send,
NeighIt: IntoIterator<Item = Node>,
{
type Edge = Node;
Expand All @@ -63,7 +68,7 @@ impl<Node, VisitFn, NeighFut, NeighIt> Visit<Node, !, ImplValue> for VisitFn
where
Node: Clone,
VisitFn: FnMut(Node) -> NeighFut,
NeighFut: Future<Output = Result<NeighIt>>,
NeighFut: Future<Output = Result<NeighIt>> + Send,
NeighIt: IntoIterator<Item = Node>,
{
type Edge = Node;
Expand Down
2 changes: 0 additions & 2 deletions turbopack/crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@
#![feature(arbitrary_self_types)]
#![feature(arbitrary_self_types_pointers)]
#![feature(new_zeroed_alloc)]
#![feature(type_alias_impl_trait)]
#![feature(never_type)]
#![feature(impl_trait_in_assoc_type)]

pub mod backend;
mod capture_future;
Expand Down
Loading