diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 2bc27649ec..d8bdb56b93 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -28,7 +28,12 @@ jobs: daft-runner: [py, ray, native] pyarrow-version: [8.0.0, 16.0.0] os: [ubuntu-20.04, windows-latest] + enable-aqe: [1, 0] exclude: + - daft-runner: py + enable-aqe: 1 + - daft-runner: native + enable-aqe: 1 - daft-runner: ray pyarrow-version: 8.0.0 os: ubuntu-20.04 @@ -119,6 +124,7 @@ jobs: CARGO_TARGET_DIR: ./target DAFT_RUNNER: ${{ matrix.daft-runner }} + DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }} - name: Build library and Test with pytest (Windows) if: ${{ (runner.os == 'Windows') }} @@ -135,6 +141,7 @@ jobs: # cargo llvm-cov --no-run --lcov --output-path report-output\rust-coverage-${{ join(matrix.*, '-') }}.lcov env: DAFT_RUNNER: ${{ matrix.daft-runner }} + DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }} - name: Upload coverage report uses: actions/upload-artifact@v4 @@ -221,6 +228,12 @@ jobs: matrix: python-version: ['3.9'] daft-runner: [py, ray, native] + enable-aqe: [1, 0] + exclude: + - daft-runner: py + enable-aqe: 1 + - daft-runner: native + enable-aqe: 1 steps: - uses: actions/checkout@v4 with: @@ -263,7 +276,7 @@ jobs: pytest tests/integration/test_tpch.py --durations=50 env: DAFT_RUNNER: ${{ matrix.daft-runner }} - + DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }} - name: Send Slack notification on failure uses: slackapi/slack-github-action@v2.0.0 if: ${{ failure() && (github.ref == 'refs/heads/main') }} @@ -296,6 +309,12 @@ jobs: matrix: python-version: ['3.9'] # can't use 3.7 due to requiring anon mode for adlfs daft-runner: [py, ray, native] + enable-aqe: [1, 0] + exclude: + - daft-runner: py + enable-aqe: 1 + - daft-runner: native + enable-aqe: 1 steps: - uses: actions/checkout@v4 with: @@ -340,6 +359,7 @@ jobs: pytest tests/io -m 'integration' --durations=50 env: DAFT_RUNNER: ${{ matrix.daft-runner }} + DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }} - name: Send Slack notification on failure uses: slackapi/slack-github-action@v2.0.0 if: ${{ failure() && (github.ref == 'refs/heads/main') }} @@ -374,6 +394,12 @@ jobs: matrix: python-version: ['3.9'] # can't use 3.7 due to requiring anon mode for adlfs daft-runner: [py, ray, native] + enable-aqe: [1, 0] + exclude: + - daft-runner: py + enable-aqe: 1 + - daft-runner: native + enable-aqe: 1 # These permissions are needed to interact with GitHub's OIDC Token endpoint. 
# This is used in the step "Assume GitHub Actions AWS Credentials" permissions: @@ -436,6 +462,7 @@ jobs: pytest tests/integration/io -m 'integration and not benchmark' --credentials --durations=50 env: DAFT_RUNNER: ${{ matrix.daft-runner }} + DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }} - name: Send Slack notification on failure uses: slackapi/slack-github-action@v2.0.0 if: ${{ failure() }} @@ -468,6 +495,12 @@ jobs: matrix: python-version: ['3.9'] # can't use 3.7 due to requiring anon mode for adlfs daft-runner: [py, ray, native] + enable-aqe: [1, 0] + exclude: + - daft-runner: py + enable-aqe: 1 + - daft-runner: native + enable-aqe: 1 steps: - uses: actions/checkout@v4 with: @@ -513,6 +546,7 @@ jobs: pytest tests/integration/iceberg -m 'integration' --durations=50 env: DAFT_RUNNER: ${{ matrix.daft-runner }} + DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }} - name: Send Slack notification on failure uses: slackapi/slack-github-action@v2.0.0 if: ${{ failure() && (github.ref == 'refs/heads/main') }} @@ -545,6 +579,12 @@ jobs: matrix: python-version: ['3.9'] # can't use 3.7 due to requiring anon mode for adlfs daft-runner: [py, ray, native] + enable-aqe: [1, 0] + exclude: + - daft-runner: py + enable-aqe: 1 + - daft-runner: native + enable-aqe: 1 steps: - uses: actions/checkout@v4 with: @@ -589,6 +629,7 @@ jobs: pytest tests/integration/sql -m 'integration or not integration' --durations=50 env: DAFT_RUNNER: ${{ matrix.daft-runner }} + DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }} - name: Send Slack notification on failure uses: slackapi/slack-github-action@v2.0.0 if: ${{ failure() && (github.ref == 'refs/heads/main') }} diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index af497a7ec0..0a9cbe2a4f 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -1588,13 +1588,14 @@ class AdaptivePhysicalPlanScheduler: # Todo use in memory info here instead def update( self, - source_id: int, + stage_id: int, partition_key: str, cache_entry: PartitionCacheEntry, num_partitions: int, size_bytes: int, num_rows: int, ) -> None: ... + def explain_analyze(self, explain_analyze_dir: str) -> None: ... class LogicalPlanBuilder: """A logical plan builder, which simplifies constructing logical plans via a fluent interface. 
diff --git a/daft/plan_scheduler/physical_plan_scheduler.py b/daft/plan_scheduler/physical_plan_scheduler.py index 6391d9d4d3..9eb76a9758 100644 --- a/daft/plan_scheduler/physical_plan_scheduler.py +++ b/daft/plan_scheduler/physical_plan_scheduler.py @@ -83,7 +83,7 @@ def next(self) -> tuple[int | None, PhysicalPlanScheduler]: def is_done(self) -> bool: return self._scheduler.is_done() - def update(self, source_id: int, cache_entry: PartitionCacheEntry): + def update(self, stage_id: int, cache_entry: PartitionCacheEntry): num_partitions = cache_entry.num_partitions() assert num_partitions is not None size_bytes = cache_entry.size_bytes() @@ -92,10 +92,13 @@ def update(self, source_id: int, cache_entry: PartitionCacheEntry): assert num_rows is not None self._scheduler.update( - source_id, + stage_id, cache_entry.key, cache_entry, num_partitions=num_partitions, size_bytes=size_bytes, num_rows=num_rows, ) + + def explain_analyze(self, explain_analyze_dir: str) -> None: + self._scheduler.explain_analyze(explain_analyze_dir) diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index 91f3715825..c65d2507f9 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -3,6 +3,7 @@ import contextlib import dataclasses import logging +import os import threading import time import uuid @@ -1280,20 +1281,30 @@ def run_iter( if daft_execution_config.enable_aqe: adaptive_planner = builder.to_adaptive_physical_plan_scheduler(daft_execution_config) while not adaptive_planner.is_done(): - source_id, plan_scheduler = adaptive_planner.next() + stage_id, plan_scheduler = adaptive_planner.next() # don't store partition sets in variable to avoid reference result_uuid = self._start_plan( plan_scheduler, daft_execution_config, results_buffer_size=results_buffer_size ) del plan_scheduler results_iter = self._stream_plan(result_uuid) - # if source_id is None that means this is the final stage - if source_id is None: + # if stage_id is None that means this is the final stage + if stage_id is None: yield from results_iter else: cache_entry = self._collect_into_cache(results_iter) - adaptive_planner.update(source_id, cache_entry) + adaptive_planner.update(stage_id, cache_entry) del cache_entry + enable_explain_analyze = os.getenv("DAFT_DEV_ENABLE_EXPLAIN_ANALYZE") + ray_logs_location = ray_tracing.get_log_location() + if ( + ray_logs_location.exists() + and enable_explain_analyze is not None + and enable_explain_analyze in ["1", "true"] + ): + explain_analyze_dir = ray_tracing.get_daft_trace_location(ray_logs_location) + explain_analyze_dir.mkdir(exist_ok=True, parents=True) + adaptive_planner.explain_analyze(str(explain_analyze_dir)) else: # Finalize the logical plan and get a physical plan scheduler for translating the # physical plan to executable tasks. 
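The runner changes above are driven entirely by environment variables: the CI matrix exports `DAFT_ENABLE_AQE`, and `ray_runner.py` additionally checks the dev-only `DAFT_DEV_ENABLE_EXPLAIN_ANALYZE` flag before dumping per-stage plans. Below is a minimal local sketch of exercising this path; it assumes Daft picks up `DAFT_ENABLE_AQE` from the environment the same way the CI jobs rely on, and that a shuffle-inducing query (a multi-partition groupby here) is enough to produce at least one intermediate stage.

```python
import os

# Assumption: these are read at import/context-creation time, mirroring how the
# CI matrix and ray_runner.py in this diff use them. Set them before importing daft.
os.environ["DAFT_RUNNER"] = "ray"                    # the AQE path lives in the Ray runner only
os.environ["DAFT_ENABLE_AQE"] = "1"                  # turn on adaptive query execution
os.environ["DAFT_DEV_ENABLE_EXPLAIN_ANALYZE"] = "1"  # dev flag checked in ray_runner.py above

import daft

# A multi-partition groupby should force a shuffle exchange, which is what the
# adaptive planner now uses as a stage boundary.
df = daft.from_pydict({"group": ["a", "b", "a", "b"], "value": [1, 2, 3, 4]})
df = df.into_partitions(4)
result = df.groupby("group").agg(daft.col("value").sum()).collect()
print(result.to_pydict())
```

With the dev flag set and the Ray logs directory present, the runner writes the stage-level plan into an explain-analyze report via the `explain_analyze` plumbing added in the rest of this diff, rather than printing it.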
diff --git a/src/daft-logical-plan/src/builder/mod.rs b/src/daft-logical-plan/src/builder/mod.rs index 09f3ff50a5..3599718191 100644 --- a/src/daft-logical-plan/src/builder/mod.rs +++ b/src/daft-logical-plan/src/builder/mod.rs @@ -137,11 +137,12 @@ impl LogicalPlanBuilder { let source_info = SourceInfo::InMemory(InMemoryInfo::new( schema.clone(), partition_key.into(), - cache_entry, + Some(cache_entry), num_partitions, size_bytes, num_rows, None, // TODO(sammy) thread through clustering spec to Python + None, )); let logical_plan: LogicalPlan = ops::Source::new(schema, source_info.into()).into(); diff --git a/src/daft-logical-plan/src/display.rs b/src/daft-logical-plan/src/display.rs index eec605c096..c8f65afc10 100644 --- a/src/daft-logical-plan/src/display.rs +++ b/src/daft-logical-plan/src/display.rs @@ -56,7 +56,6 @@ mod test { Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo { source_schema: schema, clustering_spec: Arc::new(ClusteringSpec::unknown()), - source_id: 0, })), )) .arced() @@ -76,7 +75,6 @@ mod test { Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo { source_schema: schema, clustering_spec: Arc::new(ClusteringSpec::unknown()), - source_id: 0, })), )) .arced() @@ -126,7 +124,6 @@ On = col(id) Output schema = id#Int32, text#Utf8, id2#UInt64, first_name#Utf8, last_name#Utf8"] Filter4["Filter: col(id) == lit(1)"] Source5["PlaceHolder: -Source ID = 0 Num partitions = 0 Output schema = text#Utf8, id#Int32"] Source5 --> Filter4 @@ -138,7 +135,6 @@ Limit9["Limit: 1000"] Filter10["Filter: startswith(col(last_name), lit('S')) & endswith(col(last_name), lit('n'))"] Source11["PlaceHolder: -Source ID = 0 Num partitions = 0 Output schema = first_name#Utf8, last_name#Utf8, id#Int32"] Source11 --> Filter10 @@ -237,7 +233,6 @@ Project1 --> Limit0 let mermaid_repr = plan.repr_mermaid(opts); let expected = r#"subgraph optimized["Optimized Logical Plan"] optimizedSource0["PlaceHolder: -Source ID = 0 Num partitions = 0 Output schema = text#Utf8, id#Int32"] end diff --git a/src/daft-logical-plan/src/ops/source.rs b/src/daft-logical-plan/src/ops/source.rs index 5f3172c28f..2315c86deb 100644 --- a/src/daft-logical-plan/src/ops/source.rs +++ b/src/daft-logical-plan/src/ops/source.rs @@ -116,12 +116,9 @@ impl Source { res.push(format!("Number of partitions = {}", num_partitions)); } SourceInfo::PlaceHolder(PlaceHolderInfo { - source_id, - clustering_spec, - .. + clustering_spec, .. }) => { res.push("PlaceHolder:".to_string()); - res.push(format!("Source ID = {}", source_id)); res.extend(clustering_spec.multiline_display()); } } diff --git a/src/daft-logical-plan/src/optimization/optimizer.rs b/src/daft-logical-plan/src/optimization/optimizer.rs index 4bf7f86f8f..82ed758d62 100644 --- a/src/daft-logical-plan/src/optimization/optimizer.rs +++ b/src/daft-logical-plan/src/optimization/optimizer.rs @@ -157,6 +157,13 @@ impl Default for OptimizerBuilder { } impl OptimizerBuilder { + pub fn new() -> Self { + Self { + rule_batches: vec![], + config: Default::default(), + } + } + pub fn reorder_joins(mut self) -> Self { self.rule_batches.push(RuleBatch::new( vec![ @@ -168,6 +175,14 @@ impl OptimizerBuilder { self } + pub fn enrich_with_stats(mut self) -> Self { + self.rule_batches.push(RuleBatch::new( + vec![Box::new(EnrichWithStats::new())], + RuleExecutionStrategy::Once, + )); + self + } + pub fn simplify_expressions(mut self) -> Self { // Try to simplify expressions again as other rules could introduce new exprs. 
self.rule_batches.push(RuleBatch::new( diff --git a/src/daft-logical-plan/src/optimization/rules/eliminate_cross_join.rs b/src/daft-logical-plan/src/optimization/rules/eliminate_cross_join.rs index 55fdcf4887..8b01ae58b0 100644 --- a/src/daft-logical-plan/src/optimization/rules/eliminate_cross_join.rs +++ b/src/daft-logical-plan/src/optimization/rules/eliminate_cross_join.rs @@ -449,7 +449,6 @@ mod tests { Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo { source_schema: schema, clustering_spec: Arc::new(ClusteringSpec::unknown()), - source_id: 0, })), )) .arced() @@ -470,7 +469,6 @@ mod tests { Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo { source_schema: schema, clustering_spec: Arc::new(ClusteringSpec::unknown()), - source_id: 0, })), )) .arced() diff --git a/src/daft-logical-plan/src/optimization/rules/simplify_expressions.rs b/src/daft-logical-plan/src/optimization/rules/simplify_expressions.rs index a5c42fe6a7..52a8820d68 100644 --- a/src/daft-logical-plan/src/optimization/rules/simplify_expressions.rs +++ b/src/daft-logical-plan/src/optimization/rules/simplify_expressions.rs @@ -70,7 +70,6 @@ mod test { source_info: Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo { source_schema: schema, clustering_spec: Arc::new(ClusteringSpec::unknown()), - source_id: 0, })), stats_state: StatsState::NotMaterialized, }) diff --git a/src/daft-logical-plan/src/source_info/mod.rs b/src/daft-logical-plan/src/source_info/mod.rs index 9ae2f1d68b..7579a6a7bb 100644 --- a/src/daft-logical-plan/src/source_info/mod.rs +++ b/src/daft-logical-plan/src/source_info/mod.rs @@ -1,8 +1,5 @@ pub mod file_info; -use std::{ - hash::{Hash, Hasher}, - sync::atomic::AtomicUsize, -}; +use std::hash::{Hash, Hasher}; use common_partitioning::PartitionCacheEntry; use common_scan_info::PhysicalScanInfo; @@ -23,22 +20,25 @@ pub enum SourceInfo { pub struct InMemoryInfo { pub source_schema: SchemaRef, pub cache_key: String, - pub cache_entry: PartitionCacheEntry, + pub cache_entry: Option, pub num_partitions: usize, pub size_bytes: usize, pub num_rows: usize, pub clustering_spec: Option, + pub source_stage_id: Option, } impl InMemoryInfo { + #[allow(clippy::too_many_arguments)] pub fn new( source_schema: SchemaRef, cache_key: String, - cache_entry: PartitionCacheEntry, + cache_entry: Option, num_partitions: usize, size_bytes: usize, num_rows: usize, clustering_spec: Option, + source_stage_id: Option, ) -> Self { Self { source_schema, @@ -48,6 +48,7 @@ impl InMemoryInfo { size_bytes, num_rows, clustering_spec, + source_stage_id, } } } @@ -66,13 +67,10 @@ impl Hash for InMemoryInfo { } } -static PLACEHOLDER_ID_COUNTER: AtomicUsize = AtomicUsize::new(0); - #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct PlaceHolderInfo { pub source_schema: SchemaRef, pub clustering_spec: ClusteringSpecRef, - pub source_id: usize, } impl PlaceHolderInfo { @@ -80,7 +78,6 @@ impl PlaceHolderInfo { Self { source_schema, clustering_spec, - source_id: PLACEHOLDER_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst), } } } diff --git a/src/daft-physical-plan/src/display.rs b/src/daft-physical-plan/src/display.rs index 42c827cd25..8f5b2e5d2a 100644 --- a/src/daft-physical-plan/src/display.rs +++ b/src/daft-physical-plan/src/display.rs @@ -10,6 +10,7 @@ impl TreeDisplay for PhysicalPlan { Self::InMemoryScan(scan) => scan.display_as(level), Self::TabularScan(scan) => scan.display_as(level), Self::EmptyScan(scan) => scan.display_as(level), + Self::PreviousStageScan(scan) => scan.display_as(level), Self::Project(p) => 
p.display_as(level), Self::ActorPoolProject(p) => p.display_as(level), Self::Filter(f) => f.display_as(level), diff --git a/src/daft-physical-plan/src/ops/in_memory.rs b/src/daft-physical-plan/src/ops/in_memory.rs index 96c20c052f..667386d846 100644 --- a/src/daft-physical-plan/src/ops/in_memory.rs +++ b/src/daft-physical-plan/src/ops/in_memory.rs @@ -34,6 +34,9 @@ impl InMemoryScan { "Clustering spec = {{ {} }}", self.clustering_spec.multiline_display().join(", ") )); + if let Some(source_stage_id) = self.in_memory_info.source_stage_id { + res.push(format!("Source Stage ID = {}", source_stage_id)); + } res } } @@ -46,10 +49,16 @@ impl TreeDisplay for InMemoryScan { "InMemoryScan: Schema = {}, Size bytes = {}, -Clustering spec = {{ {} }}", +Clustering spec = {{ {} }} +Source Stage ID = {} +", self.schema.short_string(), self.in_memory_info.size_bytes, - self.clustering_spec.multiline_display().join(", ") + self.clustering_spec.multiline_display().join(", "), + match self.in_memory_info.source_stage_id { + Some(source_stage_id) => source_stage_id.to_string(), + None => "None".to_string(), + } ) } DisplayLevel::Verbose => todo!(), diff --git a/src/daft-physical-plan/src/ops/mod.rs b/src/daft-physical-plan/src/ops/mod.rs index 78a10eb7c6..3f1735a5fe 100644 --- a/src/daft-physical-plan/src/ops/mod.rs +++ b/src/daft-physical-plan/src/ops/mod.rs @@ -20,6 +20,7 @@ mod limit; mod monotonically_increasing_id; mod parquet; mod pivot; +mod previous_stage_scan; mod project; mod sample; mod scan; @@ -50,6 +51,7 @@ pub use limit::Limit; pub use monotonically_increasing_id::MonotonicallyIncreasingId; pub use parquet::TabularWriteParquet; pub use pivot::Pivot; +pub use previous_stage_scan::PreviousStageScan; pub use project::Project; pub use sample::Sample; pub use scan::TabularScan; diff --git a/src/daft-physical-plan/src/ops/previous_stage_scan.rs b/src/daft-physical-plan/src/ops/previous_stage_scan.rs new file mode 100644 index 0000000000..160f576ef5 --- /dev/null +++ b/src/daft-physical-plan/src/ops/previous_stage_scan.rs @@ -0,0 +1,50 @@ +use std::sync::Arc; + +use common_display::{tree::TreeDisplay, DisplayLevel}; +use daft_logical_plan::ClusteringSpec; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct PreviousStageScan { + pub clustering_spec: Arc, +} + +impl PreviousStageScan { + pub(crate) fn new(clustering_spec: Arc) -> Self { + Self { clustering_spec } + } + + pub fn multiline_display(&self) -> Vec { + vec![ + "PreviousStageScan".to_string(), + self.clustering_spec.multiline_display().join(", "), + ] + } + + pub fn clustering_spec(&self) -> &Arc { + &self.clustering_spec + } +} +impl TreeDisplay for PreviousStageScan { + fn display_as(&self, level: DisplayLevel) -> String { + match level { + DisplayLevel::Compact => self.get_name(), + DisplayLevel::Default => { + format!( + "PreviousStageScan: +Clustering spec = {{ {} }}", + self.clustering_spec.multiline_display().join(", ") + ) + } + DisplayLevel::Verbose => todo!(), + } + } + + fn get_name(&self) -> String { + "PreviousStageScan".to_string() + } + + fn get_children(&self) -> Vec<&dyn TreeDisplay> { + vec![] + } +} diff --git a/src/daft-physical-plan/src/optimization/rules/reorder_partition_keys.rs b/src/daft-physical-plan/src/optimization/rules/reorder_partition_keys.rs index 787f52e5ec..017f9e56a5 100644 --- a/src/daft-physical-plan/src/optimization/rules/reorder_partition_keys.rs +++ b/src/daft-physical-plan/src/optimization/rules/reorder_partition_keys.rs @@ -156,6 +156,7 
@@ impl PhysicalOptimizerRule for ReorderPartitionKeys { PhysicalPlan::InMemoryScan(..) | PhysicalPlan::TabularScan(..) | PhysicalPlan::EmptyScan(..) | + PhysicalPlan::PreviousStageScan(..) | PhysicalPlan::Concat(..) | PhysicalPlan::HashJoin(..) | PhysicalPlan::SortMergeJoin(..) | diff --git a/src/daft-physical-plan/src/physical_planner/display.rs b/src/daft-physical-plan/src/physical_planner/display.rs new file mode 100644 index 0000000000..f775a15442 --- /dev/null +++ b/src/daft-physical-plan/src/physical_planner/display.rs @@ -0,0 +1,97 @@ +use std::fmt; + +use common_display::mermaid::{MermaidDisplay, MermaidDisplayOptions, SubgraphOptions}; + +use super::planner::EmittedStage; + +pub struct StageDisplayMermaidVisitor<'a, W> { + output: &'a mut W, + options: MermaidDisplayOptions, +} + +impl<'a, W> StageDisplayMermaidVisitor<'a, W> { + pub fn new(w: &'a mut W, options: MermaidDisplayOptions) -> Self { + Self { output: w, options } + } +} + +impl<'a, W> StageDisplayMermaidVisitor<'a, W> +where + W: fmt::Write, +{ + fn add_node(&mut self, node: &EmittedStage) -> fmt::Result { + let display = self.display_for_node(node)?; + write!(self.output, "{}", display)?; + Ok(()) + } + + fn display_for_node(&self, node: &EmittedStage) -> Result { + let simple = self.options.simple; + let bottom_up = self.options.bottom_up; + let stage_id = self.get_node_id(node); + let name = self.get_node_name(node); + let subgraph_options = SubgraphOptions { + name, + subgraph_id: stage_id, + }; + let display = node.physical_plan.repr_mermaid(MermaidDisplayOptions { + simple, + bottom_up, + subgraph_options: Some(subgraph_options), + }); + Ok(display) + } + + // Get the id of a node that has already been added. + fn get_node_id(&self, node: &EmittedStage) -> String { + let id = match node.stage_id { + Some(id) => id.to_string(), + None => "final".to_string(), + }; + format!("stage_{}", id) + } + + fn get_node_name(&self, node: &EmittedStage) -> String { + let id = match node.stage_id { + Some(id) => id.to_string(), + None => "final".to_string(), + }; + format!("Stage {}", id) + } + + fn add_edge(&mut self, parent: String, child: String) -> fmt::Result { + writeln!(self.output, r#"{child} --> {parent}"#) + } + + fn fmt_node(&mut self, node: &EmittedStage) -> fmt::Result { + self.add_node(node)?; + let children = &node.input_stages; + if children.is_empty() { + return Ok(()); + } + + for child in children { + self.fmt_node(child)?; + self.add_edge(self.get_node_id(node), self.get_node_id(child))?; + } + + Ok(()) + } + + pub fn fmt(&mut self, node: &EmittedStage) -> fmt::Result { + if let Some(SubgraphOptions { name, subgraph_id }) = &self.options.subgraph_options { + writeln!(self.output, r#"subgraph {subgraph_id}["{name}"]"#)?; + self.fmt_node(node)?; + writeln!(self.output, "end")?; + } else { + if self.options.bottom_up { + writeln!(self.output, "flowchart BT")?; + } else { + writeln!(self.output, "flowchart TD")?; + } + + self.fmt_node(node)?; + } + Ok(()) + } +} diff --git a/src/daft-physical-plan/src/physical_planner/mod.rs b/src/daft-physical-plan/src/physical_planner/mod.rs index 0cae9e62d7..d7bfe77c13 100644 --- a/src/daft-physical-plan/src/physical_planner/mod.rs +++ b/src/daft-physical-plan/src/physical_planner/mod.rs @@ -13,6 +13,8 @@ use crate::{optimization::optimizer::PhysicalOptimizer, PhysicalPlanRef}; mod translate; pub use translate::{extract_agg_expr, populate_aggregation_stages}; +mod display; + /// Translate a logical plan to a physical plan. 
pub fn logical_to_physical( logical_plan: Arc, diff --git a/src/daft-physical-plan/src/physical_planner/planner.rs b/src/daft-physical-plan/src/physical_planner/planner.rs index 149cae0c29..7792badcb6 100644 --- a/src/daft-physical-plan/src/physical_planner/planner.rs +++ b/src/daft-physical-plan/src/physical_planner/planner.rs @@ -1,6 +1,14 @@ -use std::sync::Arc; +use std::{ + cmp::Ordering, + collections::HashMap, + fs::File, + io::Write, + sync::{atomic::AtomicUsize, Arc}, + time::{SystemTime, UNIX_EPOCH}, +}; use common_daft_config::DaftExecutionConfig; +use common_display::mermaid::MermaidDisplayOptions; use common_error::DaftResult; use common_treenode::{ DynTreeNode, Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor, @@ -13,8 +21,13 @@ use daft_logical_plan::{ }; use serde::{Deserialize, Serialize}; -use super::translate::translate_single_logical_node; -use crate::{PhysicalPlan, PhysicalPlanRef}; +use super::{display::StageDisplayMermaidVisitor, translate::translate_single_logical_node}; +use crate::{ + ops::{InMemoryScan, PreviousStageScan}, + PhysicalPlan, PhysicalPlanRef, +}; + +static STAGE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0); pub(super) struct PhysicalPlanTranslator { pub physical_children: Vec>, @@ -28,35 +41,20 @@ impl TreeNodeVisitor for PhysicalPlanTranslator { } fn f_up(&mut self, node: &Self::Node) -> DaftResult { - let output = translate_single_logical_node(node, &mut self.physical_children, &self.cfg)?; + let (output, _) = + translate_single_logical_node(node, &mut self.physical_children, &self.cfg)?; self.physical_children.push(output); Ok(TreeNodeRecursion::Continue) } } -pub(super) struct QueryStagePhysicalPlanTranslator { +pub(super) struct LogicalStageTranslator { pub physical_children: Vec>, pub root: Arc, pub cfg: Arc, - pub source_id: Option, -} - -fn is_query_stage_boundary(plan: &PhysicalPlan) -> bool { - use crate::PhysicalPlan::*; - match plan { - Sort(..) | HashJoin(..) | SortMergeJoin(..) | ShuffleExchange(..) => { - plan.clustering_spec().num_partitions() > 1 - } - Aggregate(agg) => match agg.input.as_ref() { - ShuffleExchange(..) 
=> plan.clustering_spec().num_partitions() > 1, - _ => false, - }, - Project(proj) => is_query_stage_boundary(&proj.input), - _ => false, - } } -impl TreeNodeRewriter for QueryStagePhysicalPlanTranslator { +impl TreeNodeRewriter for LogicalStageTranslator { type Node = Arc; fn f_down(&mut self, node: Self::Node) -> DaftResult> { @@ -64,12 +62,11 @@ impl TreeNodeRewriter for QueryStagePhysicalPlanTranslator { } fn f_up(&mut self, node: Self::Node) -> DaftResult> { - let translated_pplan = + let (translated_pplan, is_logical_boundary) = translate_single_logical_node(&node, &mut self.physical_children, &self.cfg)?; - let is_query_stage_boundary = is_query_stage_boundary(&translated_pplan); let is_root_node = Arc::ptr_eq(&node, &self.root); - if is_query_stage_boundary && !is_root_node { + if is_logical_boundary && !is_root_node { log::info!( "Detected Query Stage Boundary at {}", translated_pplan.name() @@ -81,9 +78,6 @@ impl TreeNodeRewriter for QueryStagePhysicalPlanTranslator { let ph_info = PlaceHolderInfo::new(node.schema(), translated_pplan.clustering_spec()); - assert_eq!(self.source_id, None); - self.source_id = Some(ph_info.source_id); - let new_scan = LogicalPlan::Source(Source::new( node.schema(), SourceInfo::PlaceHolder(ph_info).into(), @@ -96,16 +90,11 @@ impl TreeNodeRewriter for QueryStagePhysicalPlanTranslator { } [left, right] => { enum RunNext { - Parent, Left, Right, } let run_next: RunNext = match (left.as_ref(), right.as_ref()) { - (PhysicalPlan::InMemoryScan(..), PhysicalPlan::InMemoryScan(..)) => { - // both are in memory, emit as is. - RunNext::Parent - } (PhysicalPlan::InMemoryScan(..), _) => { // we know the left, so let's run the right RunNext::Right @@ -115,37 +104,43 @@ impl TreeNodeRewriter for QueryStagePhysicalPlanTranslator { RunNext::Left } (_, _) => { - // both sides are not in memory, so we should rank which side to run - let left_stats = left.approximate_stats(); - let right_stats = right.approximate_stats(); - - if left_stats.size_bytes <= right_stats.size_bytes { - RunNext::Left - } else { - RunNext::Right + let left_num_in_memory_children = num_in_memory_children(left.as_ref()); + let right_num_in_memory_children = + num_in_memory_children(right.as_ref()); + + // Bias towards the side that has more materialized children + match left_num_in_memory_children.cmp(&right_num_in_memory_children) { + Ordering::Greater => RunNext::Left, + Ordering::Less => RunNext::Right, + Ordering::Equal => { + // Both sides are not in memory, so we should rank which side to run + let left_stats = left.approximate_stats(); + let right_stats = right.approximate_stats(); + + if left_stats.size_bytes <= right_stats.size_bytes { + RunNext::Left + } else { + RunNext::Right + } + } } } }; + + let logical_children = node.arc_children(); + let logical_left = logical_children.first().expect( + "we expect the logical node of a binary op to also have 2 children", + ); + let logical_right = logical_children.get(1).expect( + "we expect the logical node of a binary op to also have 2 children", + ); + match run_next { - RunNext::Parent => { - self.physical_children.push(translated_pplan.clone()); - Ok(Transformed::no(node)) - } RunNext::Left => { self.physical_children.push(left.clone()); - let logical_children = node.arc_children(); - let logical_left = logical_children.first().expect( - "we expect the logical node of a binary op to also have 2 children", - ); - let logical_right = logical_children.get(1).expect( - "we expect the logical node of a binary op to also have 2 children", - ); 
let ph_info = PlaceHolderInfo::new(logical_left.schema(), left.clustering_spec()); - assert_eq!(self.source_id, None); - self.source_id = Some(ph_info.source_id); - let new_left_scan = LogicalPlan::Source(Source::new( logical_left.schema(), SourceInfo::PlaceHolder(ph_info).into(), @@ -160,22 +155,11 @@ impl TreeNodeRewriter for QueryStagePhysicalPlanTranslator { } RunNext::Right => { self.physical_children.push(right.clone()); - let logical_children = node.arc_children(); - let logical_left = logical_children.first().expect( - "we expect the logical node of a binary op to also have 2 children", - ); - let logical_right = logical_children.get(1).expect( - "we expect the logical node of a binary op to also have 2 children", - ); - let ph_info = PlaceHolderInfo::new( logical_right.schema(), right.clustering_spec(), ); - assert_eq!(self.source_id, None); - self.source_id = Some(ph_info.source_id); - let new_right_scan = LogicalPlan::Source(Source::new( logical_right.schema(), SourceInfo::PlaceHolder(ph_info).into(), @@ -199,11 +183,11 @@ impl TreeNodeRewriter for QueryStagePhysicalPlanTranslator { } } -struct ReplacePlaceholdersWithMaterializedResult { +struct ReplaceLogicalPlaceholderWithMaterializedResults { mat_results: Option, } -impl TreeNodeRewriter for ReplacePlaceholdersWithMaterializedResult { +impl TreeNodeRewriter for ReplaceLogicalPlaceholderWithMaterializedResults { type Node = Arc; fn f_down(&mut self, node: Self::Node) -> DaftResult> { @@ -220,9 +204,6 @@ impl TreeNodeRewriter for ReplacePlaceholdersWithMaterializedResult { SourceInfo::PlaceHolder(phi) => { assert!(self.mat_results.is_some()); let mut mat_results = self.mat_results.take().unwrap(); - if mat_results.source_id != phi.source_id { - return Err(common_error::DaftError::ValueError(format!("During AQE: We are replacing a PlaceHolder Node with materialized results. There should only be 1 placeholder at a time but we found one with a different id, {} vs {}", mat_results.source_id, phi.source_id))); - } // use the clustering spec from the original plan mat_results.in_memory_info.clustering_spec = Some(phi.clustering_spec.clone()); mat_results.in_memory_info.source_schema = phi.source_schema.clone(); @@ -245,11 +226,149 @@ impl TreeNodeRewriter for ReplacePlaceholdersWithMaterializedResult { } } +struct PhysicalStageTranslator { + partial_physical_plan: Option, + root: Arc, +} + +impl TreeNodeRewriter for PhysicalStageTranslator { + type Node = Arc; + + fn f_down(&mut self, node: Self::Node) -> DaftResult> { + Ok(Transformed::no(node)) + } + + fn f_up(&mut self, node: Self::Node) -> DaftResult> { + // If not a shuffle exchange, don't transform + let shuffle_exchange = match node.as_ref() { + PhysicalPlan::ShuffleExchange(se) => se, + _ => return Ok(Transformed::no(node)), + }; + + // If child is not an in memory scan, emit the child + if !matches!( + shuffle_exchange.input.as_ref(), + PhysicalPlan::InMemoryScan(..) 
+ ) { + let child = shuffle_exchange.input.clone(); + self.partial_physical_plan = Some(child.clone()); + + let placeholder = + PhysicalPlan::PreviousStageScan(PreviousStageScan::new(child.clustering_spec())); + + return Ok(Transformed::new( + node.with_new_children(&[placeholder.arced()]).arced(), + true, + TreeNodeRecursion::Stop, + )); + } + + // If it's a root node with in memory scan child, don't transform + let is_root = Arc::ptr_eq(&node, &self.root); + if is_root { + return Ok(Transformed::no(node)); + } + + // Otherwise emit the shuffle exchange and return transformed placeholder + self.partial_physical_plan = Some(node.clone()); + + let placeholder = + PhysicalPlan::PreviousStageScan(PreviousStageScan::new(node.clustering_spec())); + + Ok(Transformed::new( + placeholder.arced(), + true, + TreeNodeRecursion::Stop, + )) + } +} + +struct ReplacePreviousStageScanWithInMemoryScan { + mat_results: Option, +} + +impl TreeNodeRewriter for ReplacePreviousStageScanWithInMemoryScan { + type Node = Arc; + + fn f_down(&mut self, node: Self::Node) -> DaftResult> { + Ok(Transformed::no(node)) + } + + fn f_up(&mut self, node: Self::Node) -> DaftResult> { + match node.as_ref() { + PhysicalPlan::PreviousStageScan(ph_scan) => { + let mat_results = self.mat_results.take().unwrap(); + let new_source_node = PhysicalPlan::InMemoryScan(InMemoryScan::new( + mat_results.in_memory_info.source_schema.clone(), + mat_results.in_memory_info, + ph_scan.clustering_spec().clone(), + )); + Ok(Transformed::new( + new_source_node.arced(), + true, + TreeNodeRecursion::Stop, + )) + } + _ => Ok(Transformed::no(node)), + } + } +} + +// Strip the cache entry from the in memory scan, so that we don't hold a reference to the cache entry for plans that we cache for explain purposes +struct StripCacheEntryFromInMemoryScan {} + +impl TreeNodeRewriter for StripCacheEntryFromInMemoryScan { + type Node = Arc; + + fn f_down(&mut self, node: Self::Node) -> DaftResult> { + Ok(Transformed::no(node)) + } + + fn f_up(&mut self, node: Self::Node) -> DaftResult> { + match node.as_ref() { + PhysicalPlan::InMemoryScan(in_memory_scan) => { + // new in memory scan with no partition cache entry + let mut new_in_memory_info = in_memory_scan.in_memory_info.clone(); + new_in_memory_info.cache_entry = None; + let new_in_memory_scan = InMemoryScan::new( + in_memory_scan.schema.clone(), + new_in_memory_info, + in_memory_scan.clustering_spec.clone(), + ); + let new_physical_plan = PhysicalPlan::InMemoryScan(new_in_memory_scan); + Ok(Transformed::yes(new_physical_plan.arced())) + } + _ => Ok(Transformed::no(node)), + } + } +} + +fn num_in_memory_children(plan: &PhysicalPlan) -> usize { + if matches!(plan, PhysicalPlan::InMemoryScan(..)) { + 1 + } else { + plan.arc_children() + .iter() + .map(|c| num_in_memory_children(c)) + .sum() + } +} + +// Check that the physical plan has no cache entries after stripping them +fn has_cache_entries(plan: &PhysicalPlan) -> bool { + match plan { + PhysicalPlan::InMemoryScan(InMemoryScan { in_memory_info, .. 
}) => { + in_memory_info.cache_entry.is_some() + } + _ => plan.children().iter().any(|child| has_cache_entries(child)), + } +} + #[derive(Debug, Serialize, Deserialize)] pub enum QueryStageOutput { Partial { physical_plan: PhysicalPlanRef, - source_id: usize, + stage_id: Option, }, Final { physical_plan: PhysicalPlanRef, @@ -257,23 +376,21 @@ pub enum QueryStageOutput { } impl QueryStageOutput { - pub fn unwrap(self) -> (Option, PhysicalPlanRef) { + pub fn stage_id(&self) -> Option { match self { - Self::Partial { - physical_plan, - source_id, - } => (Some(source_id), physical_plan), - Self::Final { physical_plan } => (None, physical_plan), + Self::Partial { stage_id, .. } => *stage_id, + Self::Final { .. } => None, } } - pub fn source_id(&self) -> Option { + pub fn physical_plan(&self) -> &PhysicalPlanRef { match self { - Self::Partial { source_id, .. } => Some(*source_id), - Self::Final { .. } => None, + Self::Partial { physical_plan, .. } => physical_plan, + Self::Final { physical_plan, .. } => physical_plan, } } } + #[derive(PartialEq, Debug)] enum AdaptivePlannerStatus { Ready, @@ -282,12 +399,117 @@ enum AdaptivePlannerStatus { } pub struct MaterializedResults { - pub source_id: usize, + pub stage_id: usize, pub in_memory_info: InMemoryInfo, } +pub struct EmittedStage { + pub stage_id: Option, + pub physical_plan: PhysicalPlanRef, + pub input_stages: Vec, +} + +impl EmittedStage { + fn new(query_stage_output: &QueryStageOutput, input_stages: Vec) -> DaftResult { + let mut strip_cache_entry = StripCacheEntryFromInMemoryScan {}; + let physical_plan = query_stage_output + .physical_plan() + .clone() + .rewrite(&mut strip_cache_entry)? + .data; + + assert!( + !has_cache_entries(&physical_plan), + "Should not have cache entries in an emitted stage" + ); + Ok(Self { + stage_id: query_stage_output.stage_id(), + physical_plan, + input_stages, + }) + } +} + +enum StageCache { + Intermediate { + intermediate_stages: HashMap, + }, + Final { + final_stage: EmittedStage, + }, +} + +impl StageCache { + pub fn new() -> Self { + Self::Intermediate { + intermediate_stages: HashMap::new(), + } + } + + pub fn insert_stage(&mut self, stage: &QueryStageOutput) -> DaftResult<()> { + let intermediate_stages = match self { + Self::Intermediate { + intermediate_stages, + } => intermediate_stages, + Self::Final { .. } => panic!("Cannot insert stage if we already have a final stage"), + }; + + fn find_input_stage_ids(plan: &PhysicalPlan, stage_ids: &mut Vec) { + match plan { + PhysicalPlan::InMemoryScan(InMemoryScan { in_memory_info, .. 
}) => { + if let Some(stage_id) = in_memory_info.source_stage_id { + stage_ids.push(stage_id); + } + } + _ => { + for child in plan.children() { + find_input_stage_ids(child, stage_ids); + } + } + } + } + let mut stage_ids = vec![]; + find_input_stage_ids(stage.physical_plan(), &mut stage_ids); + + if stage_ids.is_empty() { + let emitted_stage = EmittedStage::new(stage, vec![])?; + if let Some(stage_id) = stage.stage_id() { + intermediate_stages.insert(stage_id, emitted_stage); + } else { + *self = Self::Final { + final_stage: emitted_stage, + }; + } + } else { + let input_stages = stage_ids + .iter() + .map(|stage_id| intermediate_stages.remove(stage_id).unwrap()) + .collect(); + let emitted_stage = EmittedStage::new(stage, input_stages)?; + if let Some(stage_id) = stage.stage_id() { + intermediate_stages.insert(stage_id, emitted_stage); + } else { + *self = Self::Final { + final_stage: emitted_stage, + }; + } + } + Ok(()) + } + + pub fn final_stage(&self) -> Option<&EmittedStage> { + match self { + Self::Final { final_stage } => Some(final_stage), + _ => None, + } + } +} + pub struct AdaptivePlanner { - logical_plan: LogicalPlanRef, + remaining_logical_plan: Option, + remaining_physical_plan: Option, + last_stage_id: usize, + stage_cache: StageCache, cfg: Arc, status: AdaptivePlannerStatus, } @@ -295,51 +517,110 @@ pub struct AdaptivePlanner { impl AdaptivePlanner { pub fn new(logical_plan: LogicalPlanRef, cfg: Arc) -> Self { Self { - logical_plan, + remaining_logical_plan: Some(logical_plan), + remaining_physical_plan: None, + last_stage_id: 0, + stage_cache: StageCache::new(), cfg, status: AdaptivePlannerStatus::Ready, } } + fn transform_physical_plan( + &self, + physical_plan: PhysicalPlanRef, + ) -> DaftResult<(PhysicalPlanRef, Option)> { + let mut physical_stage_translator = PhysicalStageTranslator { + partial_physical_plan: None, + root: physical_plan.clone(), + }; + let result = physical_plan.rewrite(&mut physical_stage_translator)?; + if result.transformed { + assert!(physical_stage_translator.partial_physical_plan.is_some()); + let physical_plan = physical_stage_translator.partial_physical_plan.unwrap(); + Ok((physical_plan, Some(result.data))) + } else { + assert!(physical_stage_translator.partial_physical_plan.is_none()); + let physical_plan = result.data; + Ok((physical_plan, None)) + } + } + pub fn next_stage(&mut self) -> DaftResult { assert_eq!(self.status, AdaptivePlannerStatus::Ready); - let mut rewriter = QueryStagePhysicalPlanTranslator { - physical_children: vec![], - root: self.logical_plan.clone(), - cfg: self.cfg.clone(), - source_id: None, - }; - let output = self.logical_plan.clone().rewrite(&mut rewriter)?; - let physical_plan = rewriter - .physical_children - .pop() - .expect("should have at least 1 child"); + // First, check if we have a remaining physical plan + let next_physical_plan = match self.remaining_physical_plan.take() { + Some(remaining_physical_plan) => { + let (physical_plan, remaining_physical_plan) = + self.transform_physical_plan(remaining_physical_plan)?; + self.remaining_physical_plan = remaining_physical_plan; - if output.transformed { - self.logical_plan = output.data; - self.status = AdaptivePlannerStatus::WaitingForStats; - let source_id = rewriter.source_id.expect("If we transformed the plan, it should have a placeholder and therefore a source_id"); + physical_plan + } + None => { + // If we have no remaining physical plan, we need to translate the remaining logical plan to a physical plan + let logical_plan = 
self.remaining_logical_plan.take().unwrap(); + let mut logical_rewriter = LogicalStageTranslator { + physical_children: vec![], + root: logical_plan.clone(), + cfg: self.cfg.clone(), + }; + let logical_output = logical_plan.rewrite(&mut logical_rewriter)?; + + // If we transformed the logical plan, this means we have a remaining logical plan + if logical_output.transformed { + self.remaining_logical_plan = Some(logical_output.data); + } + let physical_plan = logical_rewriter + .physical_children + .pop() + .expect("should have at least 1 child"); - log::info!( - "Emitting partial plan:\n {}", - physical_plan.repr_ascii(true) - ); + // Now, transform the new physical plan + let (physical_plan, remaining_physical_plan) = + self.transform_physical_plan(physical_plan)?; + self.remaining_physical_plan = remaining_physical_plan; - log::info!( - "Logical plan remaining:\n {}", - self.logical_plan.repr_ascii(true) - ); - Ok(QueryStageOutput::Partial { - physical_plan, - source_id, - }) - } else { - log::info!("Emitting final plan:\n {}", physical_plan.repr_ascii(true)); + physical_plan + } + }; + + let next_stage = + if self.remaining_physical_plan.is_some() || self.remaining_logical_plan.is_some() { + let stage_id = STAGE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + self.last_stage_id = stage_id; + QueryStageOutput::Partial { + physical_plan: next_physical_plan, + stage_id: Some(stage_id), + } + } else { + QueryStageOutput::Final { + physical_plan: next_physical_plan, + } + }; - self.status = AdaptivePlannerStatus::Done; - Ok(QueryStageOutput::Final { physical_plan }) + self.stage_cache.insert_stage(&next_stage)?; + match &next_stage { + QueryStageOutput::Final { physical_plan } => { + log::info!("Emitting final plan:\n {}", physical_plan.repr_ascii(true)); + self.status = AdaptivePlannerStatus::Done; + } + QueryStageOutput::Partial { physical_plan, .. 
} => { + log::info!( + "Emitting partial plan:\n {}", + physical_plan.repr_ascii(true) + ); + self.status = AdaptivePlannerStatus::WaitingForStats; + } } + if num_in_memory_children(next_stage.physical_plan()) > 1 { + assert!( + has_cache_entries(next_stage.physical_plan()), + "Next stage must have cache entries in in-memory scans" + ); + } + Ok(next_stage) } pub fn is_done(&self) -> bool { @@ -348,40 +629,80 @@ impl AdaptivePlanner { pub fn update(&mut self, mat_results: MaterializedResults) -> DaftResult<()> { assert_eq!(self.status, AdaptivePlannerStatus::WaitingForStats); + assert!(mat_results.stage_id == self.last_stage_id); - let mut rewriter = ReplacePlaceholdersWithMaterializedResult { - mat_results: Some(mat_results), - }; - let result = self.logical_plan.clone().rewrite(&mut rewriter)?; - - assert!(result.transformed); + // If we have a remaining physical plan, we need to replace the physical previous stage scan with the materialized results + if let Some(remaining_physical_plan) = self.remaining_physical_plan.take() { + let mut rewriter = ReplacePreviousStageScanWithInMemoryScan { + mat_results: Some(mat_results), + }; + let result = remaining_physical_plan.rewrite(&mut rewriter)?; - self.logical_plan = result.data; + assert!(result.transformed); - let optimizer = OptimizerBuilder::default().simplify_expressions().build(); - - self.logical_plan = optimizer.optimize( - self.logical_plan.clone(), - |new_plan, rule_batch, pass, transformed, seen| { - if transformed { - log::debug!( + self.remaining_physical_plan = Some(result.data); + } + // Otherwise, we need to replace the logical placeholder with the materialized results + else { + let mut rewriter = ReplaceLogicalPlaceholderWithMaterializedResults { + mat_results: Some(mat_results), + }; + let logical_plan = self.remaining_logical_plan.take().unwrap(); + let result = logical_plan.rewrite(&mut rewriter)?; + + assert!(result.transformed); + + let optimizer = OptimizerBuilder::new().enrich_with_stats().build(); + + let optimized_logical_plan = optimizer.optimize( + result.data, + |new_plan, rule_batch, pass, transformed, seen| { + if transformed { + log::debug!( "Rule batch {:?} transformed plan on pass {}, and produced {} plan:\n{}", rule_batch, pass, if seen { "an already seen" } else { "a new" }, new_plan.repr_ascii(true), ); - } else { - log::debug!( - "Rule batch {:?} did NOT transform plan on pass {} for plan:\n{}", - rule_batch, - pass, - new_plan.repr_ascii(true), - ); - } - }, - )?; + } else { + log::debug!( + "Rule batch {:?} did NOT transform plan on pass {} for plan:\n{}", + rule_batch, + pass, + new_plan.repr_ascii(true), + ); + } + }, + )?; + + self.remaining_logical_plan = Some(optimized_logical_plan); + } + self.status = AdaptivePlannerStatus::Ready; + Ok(()) + } + + pub fn explain_analyze(&self, explain_analyze_dir: &str) -> DaftResult<()> { + let curr_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis(); + let file_name = format!("explain-analyze-{curr_ms}-mermaid.md"); + let mut file = File::create(std::path::Path::new(explain_analyze_dir).join(file_name))?; + let mut s = String::new(); + let options = MermaidDisplayOptions { + simple: false, + bottom_up: false, + subgraph_options: None, + }; + let mut visitor = StageDisplayMermaidVisitor::new(&mut s, options); + let _ = visitor.fmt( + self.stage_cache + .final_stage() + .expect("Explain analyze should have a final stage"), + ); + writeln!(file, "```mermaid\n{}\n```", s)?; Ok(()) } diff --git 
a/src/daft-physical-plan/src/physical_planner/translate.rs b/src/daft-physical-plan/src/physical_planner/translate.rs index f0ebdd4e78..ee372fb56c 100644 --- a/src/daft-physical-plan/src/physical_planner/translate.rs +++ b/src/daft-physical-plan/src/physical_planner/translate.rs @@ -41,7 +41,8 @@ pub(super) fn translate_single_logical_node( logical_plan: &LogicalPlan, physical_children: &mut Vec, cfg: &DaftExecutionConfig, -) -> DaftResult { +) -> DaftResult<(PhysicalPlanRef, bool)> { + let mut is_logical_boundary = false; let physical_plan = match logical_plan { LogicalPlan::Source(Source { source_info, .. }) => match source_info.as_ref() { SourceInfo::Physical(PhysicalScanInfo { @@ -101,8 +102,8 @@ pub(super) fn translate_single_logical_node( .arced(); Ok(scan) } - SourceInfo::PlaceHolder(PlaceHolderInfo { source_id, .. }) => { - panic!("Placeholder {source_id} should not get to translation. This should have been optimized away"); + SourceInfo::PlaceHolder(PlaceHolderInfo { .. }) => { + panic!("Placeholder should not get to translation. This should have been optimized away"); } }, LogicalPlan::Project(LogicalProject { projection, .. }) => { @@ -197,40 +198,41 @@ pub(super) fn translate_single_logical_node( // Repartitioning to the same partition spec as the input is always a no-op. || (&clustering_spec == input_clustering_spec.as_ref()) { - return Ok(input_physical); - } - let repartitioned_plan = match clustering_spec { - ClusteringSpec::Unknown(_) => { - match num_partitions.cmp(&input_num_partitions) { - Ordering::Equal => { - // # of output partitions == # of input partitions; this should have already short-circuited with - // a repartition drop above. - unreachable!("Simple repartitioning with same # of output partitions as the input; this should have been dropped.") + Ok(input_physical) + } else { + let repartitioned_plan = match clustering_spec { + ClusteringSpec::Unknown(_) => { + match num_partitions.cmp(&input_num_partitions) { + Ordering::Equal => { + // # of output partitions == # of input partitions; this should have already short-circuited with + // a repartition drop above. + unreachable!("Simple repartitioning with same # of output partitions as the input; this should have been dropped.") + } + _ => PhysicalPlan::ShuffleExchange( + ShuffleExchangeFactory::new(input_physical) + .get_split_or_coalesce(num_partitions), + ), } - _ => PhysicalPlan::ShuffleExchange( - ShuffleExchangeFactory::new(input_physical) - .get_split_or_coalesce(num_partitions), - ), } - } - ClusteringSpec::Random(_) => PhysicalPlan::ShuffleExchange( - ShuffleExchangeFactory::new(input_physical) - .get_random_partitioning(num_partitions, Some(cfg)), - ), - ClusteringSpec::Hash(HashClusteringConfig { by, .. }) => { - PhysicalPlan::ShuffleExchange( - ShuffleExchangeFactory::new(input_physical).get_hash_partitioning( - by, - num_partitions, - Some(cfg), - ), - ) - } - ClusteringSpec::Range(_) => { - unreachable!("Repartitioning by range is not supported") - } - }; - Ok(repartitioned_plan.arced()) + ClusteringSpec::Random(_) => PhysicalPlan::ShuffleExchange( + ShuffleExchangeFactory::new(input_physical) + .get_random_partitioning(num_partitions, Some(cfg)), + ), + ClusteringSpec::Hash(HashClusteringConfig { by, .. 
}) => { + PhysicalPlan::ShuffleExchange( + ShuffleExchangeFactory::new(input_physical).get_hash_partitioning( + by, + num_partitions, + Some(cfg), + ), + ) + } + ClusteringSpec::Range(_) => { + unreachable!("Repartitioning by range is not supported") + } + }; + Ok(repartitioned_plan.arced()) + } } LogicalPlan::Distinct(LogicalDistinct { input, .. }) => { let input_physical = physical_children.pop().expect("requires 1 input"); @@ -606,6 +608,7 @@ pub(super) fn translate_single_logical_node( // Note that these merge-join inputs will most likely not have aligned boundaries, which could // result in less efficient merge-joins (~all-to-all broadcast). if !is_left_sort_partitioned { + is_logical_boundary = true; left_physical = PhysicalPlan::Sort(Sort::new( left_physical, left_on.clone(), @@ -616,6 +619,7 @@ pub(super) fn translate_single_logical_node( .arced(); } if !is_right_sort_partitioned { + is_logical_boundary = true; right_physical = PhysicalPlan::Sort(Sort::new( right_physical, right_on.clone(), @@ -668,6 +672,7 @@ pub(super) fn translate_single_logical_node( if num_left_partitions != num_partitions || (num_partitions > 1 && !is_left_hash_partitioned) { + is_logical_boundary = true; left_physical = PhysicalPlan::ShuffleExchange( ShuffleExchangeFactory::new(left_physical).get_hash_partitioning( left_on.clone(), @@ -680,6 +685,7 @@ pub(super) fn translate_single_logical_node( if num_right_partitions != num_partitions || (num_partitions > 1 && !is_right_hash_partitioned) { + is_logical_boundary = true; right_physical = PhysicalPlan::ShuffleExchange( ShuffleExchangeFactory::new(right_physical).get_hash_partitioning( right_on.clone(), @@ -815,7 +821,7 @@ pub(super) fn translate_single_logical_node( // different size estimations depending on when the approximation is computed. Once we fix // this, we can add back in the assertion here. // debug_assert!(logical_plan.get_stats().approx_stats == physical_plan.approximate_stats()); - Ok(physical_plan) + Ok((physical_plan, is_logical_boundary)) } pub fn extract_agg_expr(expr: &ExprRef) -> DaftResult { diff --git a/src/daft-physical-plan/src/plan.rs b/src/daft-physical-plan/src/plan.rs index 161700ce96..f93c8f191e 100644 --- a/src/daft-physical-plan/src/plan.rs +++ b/src/daft-physical-plan/src/plan.rs @@ -19,6 +19,7 @@ pub enum PhysicalPlan { InMemoryScan(InMemoryScan), TabularScan(TabularScan), EmptyScan(EmptyScan), + PreviousStageScan(PreviousStageScan), Project(Project), ActorPoolProject(ActorPoolProject), Filter(Filter), @@ -64,6 +65,9 @@ impl PhysicalPlan { Self::EmptyScan(EmptyScan { clustering_spec, .. }) => clustering_spec.clone(), + Self::PreviousStageScan(PreviousStageScan { + clustering_spec, .. + }) => clustering_spec.clone(), Self::Project(Project { clustering_spec, .. }) => clustering_spec.clone(), @@ -214,6 +218,11 @@ impl PhysicalPlan { size_bytes: 0, acc_selectivity: 0.0, }, + Self::PreviousStageScan(..) => ApproxStats { + num_rows: 0, + size_bytes: 0, + acc_selectivity: 0.0, + }, // Assume no row/column pruning in cardinality-affecting operations. // TODO(Clark): Estimate row/column pruning to get a better size approximation. Self::Filter(Filter { @@ -365,7 +374,7 @@ impl PhysicalPlan { pub fn children(&self) -> Vec<&Self> { match self { Self::InMemoryScan(..) => vec![], - Self::TabularScan(..) | Self::EmptyScan(..) => vec![], + Self::TabularScan(..) | Self::EmptyScan(..) | Self::PreviousStageScan(..) => vec![], Self::Project(Project { input, .. }) => vec![input], Self::ActorPoolProject(ActorPoolProject { input, .. 
}) => vec![input], Self::Filter(Filter { input, .. }) => vec![input], @@ -408,7 +417,8 @@ impl PhysicalPlan { [input] => match self { Self::InMemoryScan(..) => panic!("Source nodes don't have children, with_new_children() should never be called for source ops"), Self::TabularScan(..) - | Self::EmptyScan(..) => panic!("Source nodes don't have children, with_new_children() should never be called for source ops"), + | Self::EmptyScan(..) + | Self::PreviousStageScan(..) => panic!("Source nodes don't have children, with_new_children() should never be called for source ops"), Self::Project(Project { projection, clustering_spec, .. }) => Self::Project(Project::new_with_clustering_spec( input.clone(), projection.clone(), clustering_spec.clone(), @@ -464,6 +474,7 @@ impl PhysicalPlan { Self::InMemoryScan(..) => "InMemoryScan", Self::TabularScan(..) => "TabularScan", Self::EmptyScan(..) => "EmptyScan", + Self::PreviousStageScan(..) => "PreviousStageScan", Self::Project(..) => "Project", Self::ActorPoolProject(..) => "ActorPoolProject", Self::Filter(..) => "Filter", @@ -499,6 +510,7 @@ impl PhysicalPlan { Self::InMemoryScan(in_memory_scan) => in_memory_scan.multiline_display(), Self::TabularScan(tabular_scan) => tabular_scan.multiline_display(), Self::EmptyScan(empty_scan) => empty_scan.multiline_display(), + Self::PreviousStageScan(previous_stage_scan) => previous_stage_scan.multiline_display(), Self::Project(project) => project.multiline_display(), Self::ActorPoolProject(ap_project) => ap_project.multiline_display(), Self::Filter(filter) => filter.multiline_display(), diff --git a/src/daft-scheduler/src/adaptive.rs b/src/daft-scheduler/src/adaptive.rs index 4b58a1b9b1..61e283fcc7 100644 --- a/src/daft-scheduler/src/adaptive.rs +++ b/src/daft-scheduler/src/adaptive.rs @@ -44,7 +44,7 @@ impl AdaptivePhysicalPlanScheduler { pub fn next(&mut self, py: Python) -> PyResult<(Option, PhysicalPlanScheduler)> { py.allow_threads(|| { let output = self.planner.next_stage()?; - let sid = output.source_id(); + let sid = output.stage_id(); Ok((sid, output.into())) }) } @@ -55,7 +55,7 @@ impl AdaptivePhysicalPlanScheduler { #[allow(clippy::too_many_arguments)] pub fn update( &mut self, - source_id: usize, + stage_id: usize, partition_key: &str, cache_entry: PyObject, num_partitions: usize, @@ -67,18 +67,24 @@ impl AdaptivePhysicalPlanScheduler { let in_memory_info = InMemoryInfo::new( Schema::empty().into(), // TODO thread in schema from in memory scan partition_key.into(), - PartitionCacheEntry::Python(Arc::new(cache_entry)), + Some(PartitionCacheEntry::Python(Arc::new(cache_entry))), num_partitions, size_bytes, num_rows, None, // TODO(sammy) thread through clustering spec to Python + Some(stage_id), ); self.planner.update(MaterializedResults { - source_id, + stage_id, in_memory_info, })?; Ok(()) }) } + + pub fn explain_analyze(&self, explain_analyze_dir: &str) -> PyResult<()> { + self.planner.explain_analyze(explain_analyze_dir)?; + Ok(()) + } } diff --git a/src/daft-scheduler/src/scheduler.rs b/src/daft-scheduler/src/scheduler.rs index 6e57f9c5b4..5149b882af 100644 --- a/src/daft-scheduler/src/scheduler.rs +++ b/src/daft-scheduler/src/scheduler.rs @@ -269,6 +269,9 @@ fn physical_plan_to_partition_tasks( use daft_dsl::Expr; use daft_physical_plan::ops::{CrossJoin, ShuffleExchange, ShuffleExchangeStrategy}; match physical_plan { + PhysicalPlan::PreviousStageScan(..) 
=> { + panic!("PreviousStageScan should be optimized away before reaching the scheduler") + } PhysicalPlan::InMemoryScan(InMemoryScan { in_memory_info: InMemoryInfo { cache_key, .. }, .. diff --git a/src/daft-session/src/session.rs b/src/daft-session/src/session.rs index 62e43d7087..ed89b5d5dc 100644 --- a/src/daft-session/src/session.rs +++ b/src/daft-session/src/session.rs @@ -271,7 +271,6 @@ mod tests { Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo { source_schema: schema, clustering_spec: Arc::new(ClusteringSpec::unknown()), - source_id: 0, })), )) .arced() diff --git a/src/daft-sql/src/lib.rs b/src/daft-sql/src/lib.rs index 9dc28865ed..e118e1494c 100644 --- a/src/daft-sql/src/lib.rs +++ b/src/daft-sql/src/lib.rs @@ -65,7 +65,6 @@ mod tests { Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo { source_schema: schema, clustering_spec: Arc::new(ClusteringSpec::unknown()), - source_id: 0, })), )) .arced() @@ -86,7 +85,6 @@ mod tests { Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo { source_schema: schema, clustering_spec: Arc::new(ClusteringSpec::unknown()), - source_id: 0, })), )) .arced() @@ -107,7 +105,6 @@ mod tests { Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo { source_schema: schema, clustering_spec: Arc::new(ClusteringSpec::unknown()), - source_id: 0, })), )) .arced()
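For completeness, the `explain_analyze` implementation in `planner.rs` above writes a file named `explain-analyze-<millis>-mermaid.md` containing a fenced mermaid graph built by `StageDisplayMermaidVisitor`, one subgraph per emitted stage. The sketch below locates and prints the newest such report; the Ray logs location (`/tmp/ray/session_latest/logs`) and searching the whole logs tree are assumptions, since the real path is resolved by `ray_tracing.get_log_location()` / `get_daft_trace_location()` in the runner.

```python
import glob
import os
import pathlib

# Assumption: default Ray session logs location; the diff resolves the actual
# directory through ray_tracing helpers on the Python side.
ray_logs = pathlib.Path("/tmp/ray/session_latest/logs")

# planner.rs names the report "explain-analyze-<millis>-mermaid.md"; glob for it
# anywhere under the logs tree and pick the most recently written one.
reports = sorted(
    glob.glob(str(ray_logs / "**" / "explain-analyze-*-mermaid.md"), recursive=True),
    key=os.path.getmtime,
)
if reports:
    print(pathlib.Path(reports[-1]).read_text())  # fenced mermaid graph, one subgraph per stage
else:
    print("no explain-analyze report found; was DAFT_DEV_ENABLE_EXPLAIN_ANALYZE set?")
```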