Skip to content

Commit

Permalink
feat: Emit children of join before shuffle + add stats to explain analyze (#3852)
Browse files Browse the repository at this point in the history

Follow on from: #3781

This PR modifies the stagification (stage-splitting) logic for joins so
that, when necessary, the children of a join are emitted as their own
stages before any shuffle exchange. This lets us dynamically adjust the
partitioning scheme of each shuffle based on exact runtime stats of its
input data.

Additionally, this PR adds per-stage stats (time taken, number of rows,
size in bytes) to the explain analyze output.

---------

Co-authored-by: Colin Ho <colinho@Colins-MacBook-Pro.local>
Co-authored-by: Colin Ho <colinho@Colins-MBP.localdomain>
  • Loading branch information
3 people authored Feb 26, 2025
1 parent 6d92c6f commit af2a841
Show file tree
Hide file tree
Showing 13 changed files with 603 additions and 446 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1595,6 +1595,9 @@ class AdaptivePhysicalPlanScheduler:
size_bytes: int,
num_rows: int,
) -> None: ...
def update_stats(
self, time_taken: float, size_bytes: int | None, num_rows: int | None, stage_id: int | None
) -> None: ...
def explain_analyze(self, explain_analyze_dir: str) -> None: ...

class LogicalPlanBuilder:
Expand Down
5 changes: 5 additions & 0 deletions daft/plan_scheduler/physical_plan_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,10 @@ def update(self, stage_id: int, cache_entry: PartitionCacheEntry):
num_rows=num_rows,
)

def update_stats(
self, time_taken: float, size_bytes: int | None, num_rows: int | None, stage_id: int | None
) -> None:
self._scheduler.update_stats(time_taken, size_bytes, num_rows, stage_id)

def explain_analyze(self, explain_analyze_dir: str) -> None:
self._scheduler.explain_analyze(explain_analyze_dir)
23 changes: 20 additions & 3 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,7 @@ def run_iter(
adaptive_planner = builder.to_adaptive_physical_plan_scheduler(daft_execution_config)
while not adaptive_planner.is_done():
stage_id, plan_scheduler = adaptive_planner.next()
start_time = time.time()
# don't store partition sets in variable to avoid reference
result_uuid = self._start_plan(
plan_scheduler, daft_execution_config, results_buffer_size=results_buffer_size
Expand All @@ -1290,18 +1291,34 @@ def run_iter(
results_iter = self._stream_plan(result_uuid)
# if stage_id is None that means this is the final stage
if stage_id is None:
yield from results_iter
num_rows_processed = 0
bytes_processed = 0

for result in results_iter:
num_rows_processed += result.metadata().num_rows
size_bytes = result.metadata().size_bytes
if size_bytes is not None:
bytes_processed += size_bytes
yield result
adaptive_planner.update_stats(
time.time() - start_time, bytes_processed, num_rows_processed, stage_id
)
else:
cache_entry = self._collect_into_cache(results_iter)
adaptive_planner.update_stats(
time.time() - start_time, cache_entry.size_bytes(), cache_entry.num_rows(), stage_id
)
adaptive_planner.update(stage_id, cache_entry)
del cache_entry

enable_explain_analyze = os.getenv("DAFT_DEV_ENABLE_EXPLAIN_ANALYZE")
ray_logs_location = ray_tracing.get_log_location()
if (
should_explain_analyze = (
ray_logs_location.exists()
and enable_explain_analyze is not None
and enable_explain_analyze in ["1", "true"]
):
)
if should_explain_analyze:
explain_analyze_dir = ray_tracing.get_daft_trace_location(ray_logs_location)
explain_analyze_dir.mkdir(exist_ok=True, parents=True)
adaptive_planner.explain_analyze(str(explain_analyze_dir))
Expand Down
35 changes: 34 additions & 1 deletion src/common/display/src/mermaid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub struct SubgraphOptions {
pub name: String,
/// The unique id for the subgraph.
pub subgraph_id: String,
/// Optional metadata text, rendered as an extra node inside the subgraph.
pub metadata: Option<String>,
}

impl<T: TreeDisplay> MermaidDisplay for T {
Expand Down Expand Up @@ -146,9 +148,40 @@ where
}

pub fn fmt(&mut self, node: &dyn TreeDisplay) -> fmt::Result {
if let Some(SubgraphOptions { name, subgraph_id }) = &self.subgraph_options {
if let Some(SubgraphOptions {
name,
subgraph_id,
metadata,
}) = &self.subgraph_options
{
writeln!(self.output, r#"subgraph {subgraph_id}["{name}"]"#)?;
if self.bottom_up {
writeln!(self.output, r#"direction BT"#)?;
} else {
writeln!(self.output, r#"direction TB"#)?;
}
// add metadata to the subgraph
let metadata_id = if let Some(metadata) = metadata {
let id = format!("{subgraph_id}_metadata");
writeln!(self.output, r#"{id}["{metadata}"]"#)?;
Some(id)
} else {
None
};

self.fmt_node(node)?;

// stack the metadata node against the plan with an invisible edge
// (linked to the first node when bottom-up, to the last node otherwise)
if let Some(metadata_id) = metadata_id {
if self.bottom_up {
let first_node_id = self.nodes.values().next().unwrap();
writeln!(self.output, r#"{first_node_id} ~~~ {metadata_id}"#)?;
} else {
let last_node_id = self.nodes.values().last().unwrap();
writeln!(self.output, r#"{metadata_id} ~~~ {last_node_id}"#)?;
};
}

writeln!(self.output, "end")?;
} else {
if self.bottom_up {
Expand Down
2 changes: 2 additions & 0 deletions src/daft-logical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,13 @@ Project1 --> Limit0
subgraph_options: Some(SubgraphOptions {
name: "Optimized Logical Plan".to_string(),
subgraph_id: "optimized".to_string(),
metadata: None,
}),
};

let mermaid_repr = plan.repr_mermaid(opts);
let expected = r#"subgraph optimized["Optimized Logical Plan"]
direction TB
optimizedSource0["PlaceHolder:
Num partitions = 0
Output schema = text#Utf8, id#Int32"]
Expand Down
1 change: 1 addition & 0 deletions src/daft-physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ daft-logical-plan = {path = "../daft-logical-plan", default-features = false}
daft-schema = {path = "../daft-schema", default-features = false}
itertools = {workspace = true}
log = {workspace = true}
num-format = {workspace = true}
serde = {workspace = true, features = ["rc"]}

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion src/daft-physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ mod test;

pub use physical_planner::{
extract_agg_expr, logical_to_physical, populate_aggregation_stages, AdaptivePlanner,
MaterializedResults, QueryStageOutput,
MaterializedResults, QueryStageOutput, StageStats,
};
pub use plan::{PhysicalPlan, PhysicalPlanRef};
11 changes: 10 additions & 1 deletion src/daft-physical-plan/src/physical_planner/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ where
let subgraph_options = SubgraphOptions {
name,
subgraph_id: stage_id,
metadata: Some(node.stats.as_ref().unwrap().to_string()),
};
let display = node.physical_plan.repr_mermaid(MermaidDisplayOptions {
simple,
Expand Down Expand Up @@ -79,8 +80,16 @@ where
}

pub fn fmt(&mut self, node: &EmittedStage) -> fmt::Result {
if let Some(SubgraphOptions { name, subgraph_id }) = &self.options.subgraph_options {
if let Some(SubgraphOptions {
name, subgraph_id, ..
}) = &self.options.subgraph_options
{
writeln!(self.output, r#"subgraph {subgraph_id}["{name}"]"#)?;
if self.options.bottom_up {
writeln!(self.output, r#"direction BT"#)?;
} else {
writeln!(self.output, r#"direction TB"#)?;
}
self.fmt_node(node)?;
writeln!(self.output, "end")?;
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/daft-physical-plan/src/physical_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use daft_logical_plan::LogicalPlan;
mod planner;

use planner::PhysicalPlanTranslator;
pub use planner::{AdaptivePlanner, MaterializedResults, QueryStageOutput};
pub use planner::{AdaptivePlanner, MaterializedResults, QueryStageOutput, StageStats};

use crate::{optimization::optimizer::PhysicalOptimizer, PhysicalPlanRef};
mod translate;
Expand Down
Loading

0 comments on commit af2a841

Please sign in to comment.