diff --git a/Cargo.lock b/Cargo.lock
index 1763b01c1b..81ea7d6c32 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2322,6 +2322,7 @@ dependencies = [
  "futures",
  "indexmap 2.7.0",
  "indicatif",
+ "itertools 0.11.0",
  "lazy_static",
  "log",
  "loole",
@@ -2371,6 +2372,7 @@ dependencies = [
  "indexmap 2.7.0",
  "itertools 0.11.0",
  "log",
+ "num-format",
  "pretty_assertions",
  "pyo3",
  "rand 0.8.5",
diff --git a/Cargo.toml b/Cargo.toml
index 5f8ad3ba37..5b6fee94ed 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -218,6 +218,7 @@ jaq-parse = "1.0.0"
 jaq-std = "1.2.0"
 mur3 = "0.1.0"
 num-derive = "0.3.3"
+num-format = "0.4.4"
 num-traits = "0.2"
 once_cell = "1.19.0"
 path_macro = "1.0.0"
diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi
index ab2024d845..173a89c10d 100644
--- a/daft/daft/__init__.pyi
+++ b/daft/daft/__init__.pyi
@@ -1698,8 +1698,18 @@ class LogicalPlanBuilder:
 class NativeExecutor:
     def __init__(self) -> None: ...
     def run(
-        self, psets: dict[str, list[PartitionT]], cfg: PyDaftExecutionConfig, results_buffer_size: int | None
+        self,
+        builder: LogicalPlanBuilder,
+        psets: dict[str, list[PartitionT]],
+        daft_execution_config: PyDaftExecutionConfig,
+        results_buffer_size: int | None,
     ) -> Iterator[PyMicroPartition]: ...
+    def repr_ascii(
+        self, builder: LogicalPlanBuilder, daft_execution_config: PyDaftExecutionConfig, simple: bool
+    ) -> str: ...
+    def repr_mermaid(
+        self, builder: LogicalPlanBuilder, daft_execution_config: PyDaftExecutionConfig, options: MermaidOptions
+    ) -> str: ...

 class PyDaftExecutionConfig:
     @staticmethod
diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index 56bb527a09..c2e6022812 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -36,6 +36,7 @@
 from daft.dataframe.preview import DataFramePreview
 from daft.datatype import DataType
 from daft.errors import ExpressionTypeError
+from daft.execution.native_executor import NativeExecutor
 from daft.expressions import Expression, ExpressionsProjection, col, lit
 from daft.filesystem import overwrite_files
 from daft.logical.builder import LogicalPlanBuilder
@@ -208,10 +209,15 @@ def explain(
             print_to_file("\n== Optimized Logical Plan ==\n")
             builder = builder.optimize()
             print_to_file(builder.pretty_print(simple))
+            print_to_file("\n== Physical Plan ==\n")
             if get_context().get_or_create_runner().name != "native":
-                print_to_file("\n== Physical Plan ==\n")
                 physical_plan_scheduler = builder.to_physical_plan_scheduler(get_context().daft_execution_config)
                 print_to_file(physical_plan_scheduler.pretty_print(simple, format=format))
+            else:
+                native_executor = NativeExecutor()
+                print_to_file(
+                    native_executor.pretty_print(builder, get_context().daft_execution_config, simple, format=format)
+                )
         else:
             print_to_file(
                 "\n \nSet `show_all=True` to also see the Optimized and Physical plans. This will run the query optimizer.",
diff --git a/daft/dataframe/display.py b/daft/dataframe/display.py
index f3e53c7ecd..efca5b91b9 100644
--- a/daft/dataframe/display.py
+++ b/daft/dataframe/display.py
@@ -60,10 +60,20 @@ def _repr_markdown_(self):
             display_opts.with_subgraph_options(name="Optimized LogicalPlan", subgraph_id="optimized")
         )
         output += "\n"
-        physical_plan_scheduler = builder.to_physical_plan_scheduler(get_context().daft_execution_config)
-        output += physical_plan_scheduler._scheduler.repr_mermaid(
-            display_opts.with_subgraph_options(name="Physical Plan", subgraph_id="physical")
-        )
+        if get_context().get_or_create_runner().name != "native":
+            physical_plan_scheduler = builder.to_physical_plan_scheduler(get_context().daft_execution_config)
+            output += physical_plan_scheduler._scheduler.repr_mermaid(
+                display_opts.with_subgraph_options(name="Physical Plan", subgraph_id="physical")
+            )
+        else:
+            from daft.execution.native_executor import NativeExecutor
+
+            native_executor = NativeExecutor()
+            output += native_executor._executor.repr_mermaid(
+                builder._builder,
+                get_context().daft_execution_config,
+                display_opts.with_subgraph_options(name="Physical Plan", subgraph_id="physical"),
+            )
         output += "\n"
         output += "unoptimized --> optimized\n"
         output += "optimized --> physical\n"
diff --git a/daft/execution/native_executor.py b/daft/execution/native_executor.py
index 333db5fc4c..6ff88626fe 100644
--- a/daft/execution/native_executor.py
+++ b/daft/execution/native_executor.py
@@ -5,6 +5,7 @@
 from daft.daft import (
     NativeExecutor as _NativeExecutor,
 )
+from daft.dataframe.display import MermaidOptions
 from daft.table import MicroPartition

 if TYPE_CHECKING:
@@ -37,3 +38,18 @@ def run(
             LocalMaterializedResult(MicroPartition._from_pymicropartition(part))
             for part in self._executor.run(builder._builder, psets_mp, daft_execution_config, results_buffer_size)
         )
+
+    def pretty_print(
+        self,
+        builder: LogicalPlanBuilder,
+        daft_execution_config: PyDaftExecutionConfig,
+        simple: bool = False,
+        format: str = "ascii",
+    ) -> str:
+        """Pretty prints the current underlying logical plan."""
+        if format == "ascii":
+            return self._executor.repr_ascii(builder._builder, daft_execution_config, simple)
+        elif format == "mermaid":
+            return self._executor.repr_mermaid(builder._builder, daft_execution_config, MermaidOptions(simple))
+        else:
+            raise ValueError(f"Unknown format: {format}")
diff --git a/src/common/display/src/ascii.rs b/src/common/display/src/ascii.rs
index 4365f1f25b..383b3d0ed2 100644
--- a/src/common/display/src/ascii.rs
+++ b/src/common/display/src/ascii.rs
@@ -39,7 +39,7 @@ fn fmt_tree_indent_style<'a, W: fmt::Write + 'a>(

 // Print the tree recursively, and illustrate the tree structure in the same style as `git log --graph`.
 // `depth` is the number of forks in this node's ancestors.
-fn fmt_tree_gitstyle<'a, W: fmt::Write + 'a>(
+pub fn fmt_tree_gitstyle<'a, W: fmt::Write + 'a>(
     node: &dyn TreeDisplay,
     depth: usize,
     s: &'a mut W,
diff --git a/src/daft-local-execution/Cargo.toml b/src/daft-local-execution/Cargo.toml
index 63d4e17c20..0b232e2b6e 100644
--- a/src/daft-local-execution/Cargo.toml
+++ b/src/daft-local-execution/Cargo.toml
@@ -25,10 +25,11 @@ daft-writers = {path = "../daft-writers", default-features = false}
 futures = {workspace = true}
 indexmap = {workspace = true}
 indicatif = "0.17.9"
+itertools = {workspace = true}
 lazy_static = {workspace = true}
 log = {workspace = true}
 loole = "0.4.0"
-num-format = "0.4.4"
+num-format = {workspace = true}
 pin-project = "1"
 pyo3 = {workspace = true, optional = true}
 snafu = {workspace = true}
diff --git a/src/daft-local-execution/src/intermediate_ops/actor_pool_project.rs b/src/daft-local-execution/src/intermediate_ops/actor_pool_project.rs
index c19be4374f..5709a82126 100644
--- a/src/daft-local-execution/src/intermediate_ops/actor_pool_project.rs
+++ b/src/daft-local-execution/src/intermediate_ops/actor_pool_project.rs
@@ -1,16 +1,17 @@
-use std::sync::Arc;
+use std::{sync::Arc, vec};

 use common_error::DaftResult;
 #[cfg(feature = "python")]
 use daft_dsl::python::PyExpr;
 use daft_dsl::{
     count_actor_pool_udfs,
-    functions::python::{get_batch_size, get_concurrency, get_resource_request},
+    functions::python::{get_batch_size, get_concurrency, get_resource_request, get_udf_names},
     ExprRef,
 };
 #[cfg(feature = "python")]
 use daft_micropartition::python::PyMicroPartition;
 use daft_micropartition::MicroPartition;
+use itertools::Itertools;
 #[cfg(feature = "python")]
 use pyo3::prelude::*;
 use tracing::{instrument, Span};
@@ -184,6 +185,30 @@ impl IntermediateOperator for ActorPoolProjectOperator {
         "ActorPoolProject"
     }

+    fn multiline_display(&self) -> Vec<String> {
+        let mut res = vec![];
+        res.push("ActorPoolProject:".to_string());
+        res.push(format!(
+            "Projection = [{}]",
+            self.projection.iter().map(|e| e.to_string()).join(", ")
+        ));
+        res.push(format!(
+            "UDFs = [{}]",
+            self.projection.iter().flat_map(get_udf_names).join(", ")
+        ));
+        res.push(format!("Concurrency = {}", self.concurrency));
+        if let Some(resource_request) = get_resource_request(&self.projection) {
+            let multiline_display = resource_request.multiline_display();
+            res.push(format!(
+                "Resource request = {{ {} }}",
+                multiline_display.join(", ")
+            ));
+        } else {
+            res.push("Resource request = None".to_string());
+        }
+        res
+    }
+
     fn make_state(&self) -> DaftResult<Box<dyn IntermediateOperatorState>> {
         // TODO: Pass relevant CUDA_VISIBLE_DEVICES to the actor
         Ok(Box::new(ActorPoolProjectState {
diff --git a/src/daft-local-execution/src/intermediate_ops/cross_join.rs b/src/daft-local-execution/src/intermediate_ops/cross_join.rs
index 3affd31f79..c160cb7bd6 100644
--- a/src/daft-local-execution/src/intermediate_ops/cross_join.rs
+++ b/src/daft-local-execution/src/intermediate_ops/cross_join.rs
@@ -139,7 +139,14 @@ impl IntermediateOperator for CrossJoinOperator {
     }

     fn name(&self) -> &'static str {
-        "CrossJoinOperator"
+        "CrossJoin"
+    }
+
+    fn multiline_display(&self) -> Vec<String> {
+        vec![
+            "CrossJoin:".to_string(),
+            format!("Stream Side = {:?}", self.stream_side),
+        ]
     }

     fn make_state(&self) -> DaftResult<Box<dyn IntermediateOperatorState>> {
diff --git a/src/daft-local-execution/src/intermediate_ops/explode.rs b/src/daft-local-execution/src/intermediate_ops/explode.rs
index 244b80b295..d0f9beacde 100644
--- a/src/daft-local-execution/src/intermediate_ops/explode.rs
+++ b/src/daft-local-execution/src/intermediate_ops/explode.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 use daft_dsl::ExprRef;
 use daft_functions::list::explode;
 use daft_micropartition::MicroPartition;
+use itertools::Itertools;
 use tracing::{instrument, Span};

 use super::intermediate_op::{
@@ -46,7 +47,14 @@ impl IntermediateOperator for ExplodeOperator {
             .into()
     }

+    fn multiline_display(&self) -> Vec<String> {
+        vec![format!(
+            "Explode: {}",
+            self.to_explode.iter().map(|e| e.to_string()).join(", ")
+        )]
+    }
+
     fn name(&self) -> &'static str {
-        "ExplodeOperator"
+        "Explode"
     }
 }
diff --git a/src/daft-local-execution/src/intermediate_ops/filter.rs b/src/daft-local-execution/src/intermediate_ops/filter.rs
index 5a633ab702..f804cffcbc 100644
--- a/src/daft-local-execution/src/intermediate_ops/filter.rs
+++ b/src/daft-local-execution/src/intermediate_ops/filter.rs
@@ -43,7 +43,11 @@ impl IntermediateOperator for FilterOperator {
             .into()
     }

+    fn multiline_display(&self) -> Vec<String> {
+        vec![format!("Filter: {}", self.predicate)]
+    }
+
     fn name(&self) -> &'static str {
-        "FilterOperator"
+        "Filter"
     }
 }
diff --git a/src/daft-local-execution/src/intermediate_ops/inner_hash_join_probe.rs b/src/daft-local-execution/src/intermediate_ops/inner_hash_join_probe.rs
index 27ada2d87b..af8f387a4c 100644
--- a/src/daft-local-execution/src/intermediate_ops/inner_hash_join_probe.rs
+++ b/src/daft-local-execution/src/intermediate_ops/inner_hash_join_probe.rs
@@ -6,6 +6,7 @@ use daft_dsl::ExprRef;
 use daft_micropartition::MicroPartition;
 use daft_table::{GrowableTable, ProbeState};
 use indexmap::IndexSet;
+use itertools::Itertools;
 use tracing::{info_span, instrument, Span};

 use super::intermediate_op::{
@@ -210,7 +211,22 @@ impl IntermediateOperator for InnerHashJoinProbeOperator {
     }

     fn name(&self) -> &'static str {
-        "InnerHashJoinProbeOperator"
+        "InnerHashJoinProbe"
+    }
+
+    fn multiline_display(&self) -> Vec<String> {
+        let mut res = vec![];
+        res.push("InnerHashJoinProbe:".to_string());
+        res.push(format!(
+            "Probe on: [{}]",
+            self.params
+                .probe_on
+                .iter()
+                .map(|e| e.to_string())
+                .join(", ")
+        ));
+        res.push(format!("Build on left: {}", self.params.build_on_left));
+        res
     }

     fn make_state(&self) -> DaftResult<Box<dyn IntermediateOperatorState>> {
diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
index cf60e90011..accc41eccb 100644
--- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
+++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 use common_display::tree::TreeDisplay;
 use common_error::DaftResult;
 use common_runtime::get_compute_runtime;
+use daft_logical_plan::stats::StatsState;
 use daft_micropartition::MicroPartition;
 use snafu::ResultExt;
 use tracing::{info_span, instrument};
@@ -47,6 +48,7 @@ pub trait IntermediateOperator: Send + Sync {
         task_spawner: &ExecutionTaskSpawner,
     ) -> IntermediateOpExecuteResult;
     fn name(&self) -> &'static str;
+    fn multiline_display(&self) -> Vec<String>;
     fn make_state(&self) -> DaftResult<Box<dyn IntermediateOperatorState>> {
         Ok(Box::new(DefaultIntermediateOperatorState {}))
     }
@@ -78,26 +80,30 @@ pub struct IntermediateNode {
     intermediate_op: Arc<dyn IntermediateOperator>,
     children: Vec<Box<dyn PipelineNode>>,
     runtime_stats: Arc<RuntimeStatsContext>,
+    plan_stats: StatsState,
 }

 impl IntermediateNode {
     pub(crate) fn new(
         intermediate_op: Arc<dyn IntermediateOperator>,
         children: Vec<Box<dyn PipelineNode>>,
+        plan_stats: StatsState,
     ) -> Self {
         let rts = RuntimeStatsContext::new();
-        Self::new_with_runtime_stats(intermediate_op, children, rts)
+        Self::new_with_runtime_stats(intermediate_op, children, rts, plan_stats)
     }

     pub(crate) fn new_with_runtime_stats(
         intermediate_op: Arc<dyn IntermediateOperator>,
         children: Vec<Box<dyn PipelineNode>>,
         runtime_stats: Arc<RuntimeStatsContext>,
+        plan_stats: StatsState,
     ) -> Self {
         Self {
             intermediate_op,
             children,
             runtime_stats,
+            plan_stats,
         }
     }
@@ -172,12 +178,24 @@ impl TreeDisplay for IntermediateNode {
     fn display_as(&self, level: common_display::DisplayLevel) -> String {
         use std::fmt::Write;
         let mut display = String::new();
-        writeln!(display, "{}", self.intermediate_op.name()).unwrap();
-        use common_display::DisplayLevel::Compact;
-        if matches!(level, Compact) {
-        } else {
-            let rt_result = self.runtime_stats.result();
-            rt_result.display(&mut display, true, true, true).unwrap();
+
+        use common_display::DisplayLevel;
+        match level {
+            DisplayLevel::Compact => {
+                writeln!(display, "{}", self.intermediate_op.name()).unwrap();
+            }
+            level => {
+                let multiline_display = self.intermediate_op.multiline_display().join("\n");
+                writeln!(display, "{}", multiline_display).unwrap();
+                if let StatsState::Materialized(stats) = &self.plan_stats {
+                    writeln!(display, "Stats = {}", stats).unwrap();
+                }
+                if matches!(level, DisplayLevel::Verbose) {
+                    writeln!(display).unwrap();
+                    let rt_result = self.runtime_stats.result();
+                    rt_result.display(&mut display, true, true, true).unwrap();
+                }
+            }
         }
         display
     }
diff --git a/src/daft-local-execution/src/intermediate_ops/project.rs b/src/daft-local-execution/src/intermediate_ops/project.rs
index 7b3dbc9772..c41977e829 100644
--- a/src/daft-local-execution/src/intermediate_ops/project.rs
+++ b/src/daft-local-execution/src/intermediate_ops/project.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 use common_error::{DaftError, DaftResult};
 use daft_dsl::{functions::python::get_resource_request, ExprRef};
 use daft_micropartition::MicroPartition;
+use itertools::Itertools;
 use tracing::{instrument, Span};

 use super::intermediate_op::{
@@ -55,7 +56,25 @@ impl IntermediateOperator for ProjectOperator {
     }

     fn name(&self) -> &'static str {
-        "ProjectOperator"
+        "Project"
+    }
+
+    fn multiline_display(&self) -> Vec<String> {
+        let mut res = vec![];
+        res.push(format!(
+            "Project: {}",
+            self.projection.iter().map(|e| e.to_string()).join(", ")
+        ));
+        if let Some(resource_request) = get_resource_request(&self.projection) {
+            let multiline_display = resource_request.multiline_display();
+            res.push(format!(
+                "Resource request = {{ {} }}",
+                multiline_display.join(", ")
+            ));
+        } else {
+            res.push("Resource request = None".to_string());
+        }
+        res
     }

     fn max_concurrency(&self) -> DaftResult<usize> {
diff --git a/src/daft-local-execution/src/intermediate_ops/sample.rs b/src/daft-local-execution/src/intermediate_ops/sample.rs
index ec089ba618..7eb67489aa 100644
--- a/src/daft-local-execution/src/intermediate_ops/sample.rs
+++ b/src/daft-local-execution/src/intermediate_ops/sample.rs
@@ -58,7 +58,18 @@ impl IntermediateOperator for SampleOperator {
             .into()
     }

+    fn multiline_display(&self) -> Vec<String> {
+        let mut res = vec![];
+        res.push(format!("Sample: {}", self.params.fraction));
+        res.push(format!(
+            "With replacement = {}",
+            self.params.with_replacement
+        ));
+        res.push(format!("Seed = {:?}", self.params.seed));
+        res
+    }
+
     fn name(&self) -> &'static str {
-        "SampleOperator"
+        "Sample"
     }
 }
diff --git a/src/daft-local-execution/src/intermediate_ops/unpivot.rs b/src/daft-local-execution/src/intermediate_ops/unpivot.rs
index f0c74a4b35..ec83752e76 100644
--- a/src/daft-local-execution/src/intermediate_ops/unpivot.rs
+++ b/src/daft-local-execution/src/intermediate_ops/unpivot.rs
@@ -2,6 +2,7 @@ use std::sync::Arc;

 use daft_dsl::ExprRef;
 use daft_micropartition::MicroPartition;
+use itertools::Itertools;
 use tracing::{instrument, Span};

 use super::intermediate_op::{
@@ -66,7 +67,22 @@ impl IntermediateOperator for UnpivotOperator {
             .into()
     }

+    fn multiline_display(&self) -> Vec<String> {
+        let mut res = vec![];
+        res.push(format!(
+            "Unpivot: {}",
+            self.params.values.iter().map(|e| e.to_string()).join(", ")
+        ));
+        res.push(format!(
+            "Ids = {}",
+            self.params.ids.iter().map(|e| e.to_string()).join(", ")
+        ));
+        res.push(format!("Variable name = {}", self.params.variable_name));
+        res.push(format!("Value name = {}", self.params.value_name));
+        res
+    }
+
     fn name(&self) -> &'static str {
-        "UnpivotOperator"
+        "Unpivot"
     }
 }
diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs
index fd3b896ecb..0033758491 100644
--- a/src/daft-local-execution/src/pipeline.rs
+++ b/src/daft-local-execution/src/pipeline.rs
@@ -1,7 +1,12 @@
 use std::sync::Arc;

 use common_daft_config::DaftExecutionConfig;
-use common_display::{mermaid::MermaidDisplayVisitor, tree::TreeDisplay};
+use common_display::{
+    ascii::fmt_tree_gitstyle,
+    mermaid::{MermaidDisplayVisitor, SubgraphOptions},
+    tree::TreeDisplay,
+    DisplayLevel,
+};
 use common_error::DaftResult;
 use common_file_formats::FileFormat;
 use daft_core::{
@@ -50,7 +55,7 @@ use crate::{
         streaming_sink::StreamingSinkNode,
         write::{WriteFormat, WriteSink},
     },
-    sources::{empty_scan::EmptyScanSource, in_memory::InMemorySource},
+    sources::{empty_scan::EmptyScanSource, in_memory::InMemorySource, source::SourceNode},
     state_bridge::BroadcastStateBridge,
     ExecutionRuntimeContext, PipelineCreationSnafu,
 };
@@ -67,18 +72,30 @@ pub(crate) trait PipelineNode: Sync + Send + TreeDisplay {
     fn as_tree_display(&self) -> &dyn TreeDisplay;
 }

-pub fn viz_pipeline(root: &dyn PipelineNode) -> String {
+pub fn viz_pipeline_mermaid(
+    root: &dyn PipelineNode,
+    display_type: DisplayLevel,
+    bottom_up: bool,
+    subgraph_options: Option<SubgraphOptions>,
+) -> String {
     let mut output = String::new();
-    let mut visitor = MermaidDisplayVisitor::new(
-        &mut output,
-        common_display::DisplayLevel::Default,
-        true,
-        Default::default(),
-    );
+    let mut visitor =
+        MermaidDisplayVisitor::new(&mut output, display_type, bottom_up, subgraph_options);
     visitor.fmt(root.as_tree_display()).unwrap();
     output
 }

+pub fn viz_pipeline_ascii(root: &dyn PipelineNode, simple: bool) -> String {
+    let mut s = String::new();
+    let level = if simple {
+        DisplayLevel::Compact
+    } else {
+        DisplayLevel::Default
+    };
+    fmt_tree_gitstyle(root.as_tree_display(), 0, &mut s, level).unwrap();
+    s
+}
+
 pub fn physical_plan_to_pipeline(
     physical_plan: &LocalPhysicalPlan,
     psets: &(impl PartitionSetCache<MicroPartitionRef, Arc<MicroPartitionSet>> + ?Sized),
@@ -88,15 +105,18 @@ pub fn physical_plan_to_pipeline(
     use crate::sources::scan_task::ScanTaskSource;

     let out: Box<dyn PipelineNode> = match physical_plan {
-        LocalPhysicalPlan::EmptyScan(EmptyScan { schema, .. }) => {
+        LocalPhysicalPlan::EmptyScan(EmptyScan {
+            schema,
+            stats_state,
+        }) => {
             let source = EmptyScanSource::new(schema.clone());
-            source.arced().into()
+            SourceNode::new(source.arced(), stats_state.clone()).boxed()
         }
         LocalPhysicalPlan::PhysicalScan(PhysicalScan {
             scan_tasks,
             pushdowns,
             schema,
-            ..
+            stats_state,
         }) => {
             let scan_tasks = scan_tasks
                 .iter()
@@ -105,75 +125,106 @@ pub fn physical_plan_to_pipeline(
             let scan_task_source =
                 ScanTaskSource::new(scan_tasks, pushdowns.clone(), schema.clone(), cfg);
-            scan_task_source.arced().into()
+            SourceNode::new(scan_task_source.arced(), stats_state.clone()).boxed()
         }
-        LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. }) => {
+        LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, stats_state }) => {
             let cache_key: Arc<str> = info.cache_key.clone().into();
-            let materialized_pset = psets
-                .get_partition_set(&cache_key)
-                .unwrap_or_else(|| panic!("Cache key not found: {:?}", info.cache_key));
-
-            InMemorySource::new(materialized_pset, info.source_schema.clone())
-                .arced()
-                .into()
+            let materialized_pset = psets.get_partition_set(&cache_key);
+            let in_memory_source = InMemorySource::new(
+                materialized_pset,
+                info.source_schema.clone(),
+                info.size_bytes,
+            )
+            .arced();
+            SourceNode::new(in_memory_source, stats_state.clone()).boxed()
         }
         LocalPhysicalPlan::Project(Project {
-            input, projection, ..
+            input,
+            projection,
+            stats_state,
+            ..
         }) => {
             let proj_op = ProjectOperator::new(projection.clone());
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
-            IntermediateNode::new(Arc::new(proj_op), vec![child_node]).boxed()
+            IntermediateNode::new(Arc::new(proj_op), vec![child_node], stats_state.clone()).boxed()
         }
         LocalPhysicalPlan::ActorPoolProject(ActorPoolProject {
-            input, projection, ..
+            input,
+            projection,
+            stats_state,
+            ..
         }) => {
             let proj_op = ActorPoolProjectOperator::new(projection.clone());
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
-            IntermediateNode::new(Arc::new(proj_op), vec![child_node]).boxed()
+            IntermediateNode::new(Arc::new(proj_op), vec![child_node], stats_state.clone()).boxed()
         }
         LocalPhysicalPlan::Sample(Sample {
             input,
             fraction,
             with_replacement,
             seed,
+            stats_state,
             ..
         }) => {
             let sample_op = SampleOperator::new(*fraction, *with_replacement, *seed);
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
-            IntermediateNode::new(Arc::new(sample_op), vec![child_node]).boxed()
+            IntermediateNode::new(Arc::new(sample_op), vec![child_node], stats_state.clone())
+                .boxed()
         }
         LocalPhysicalPlan::Filter(Filter {
-            input, predicate, ..
+            input,
+            predicate,
+            stats_state,
+            ..
         }) => {
             let filter_op = FilterOperator::new(predicate.clone());
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
-            IntermediateNode::new(Arc::new(filter_op), vec![child_node]).boxed()
+            IntermediateNode::new(Arc::new(filter_op), vec![child_node], stats_state.clone())
+                .boxed()
         }
         LocalPhysicalPlan::Explode(Explode {
-            input, to_explode, ..
+            input,
+            to_explode,
+            stats_state,
+            ..
         }) => {
             let explode_op = ExplodeOperator::new(to_explode.clone());
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
-            IntermediateNode::new(Arc::new(explode_op), vec![child_node]).boxed()
+            IntermediateNode::new(Arc::new(explode_op), vec![child_node], stats_state.clone())
+                .boxed()
         }
         LocalPhysicalPlan::Limit(Limit {
-            input, num_rows, ..
+            input,
+            num_rows,
+            stats_state,
+            ..
         }) => {
             let sink = LimitSink::new(*num_rows as usize);
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
-            StreamingSinkNode::new(Arc::new(sink), vec![child_node]).boxed()
+            StreamingSinkNode::new(Arc::new(sink), vec![child_node], stats_state.clone()).boxed()
         }
-        LocalPhysicalPlan::Concat(Concat { input, other, .. }) => {
+        LocalPhysicalPlan::Concat(Concat {
+            input,
+            other,
+            stats_state,
+            ..
+        }) => {
             let left_child = physical_plan_to_pipeline(input, psets, cfg)?;
             let right_child = physical_plan_to_pipeline(other, psets, cfg)?;
             let sink = ConcatSink {};
-            StreamingSinkNode::new(Arc::new(sink), vec![left_child, right_child]).boxed()
+            StreamingSinkNode::new(
+                Arc::new(sink),
+                vec![left_child, right_child],
+                stats_state.clone(),
+            )
+            .boxed()
         }
         LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate {
             input,
             aggregations,
             schema,
+            stats_state,
             ..
         }) => {
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
@@ -182,13 +233,14 @@ pub fn physical_plan_to_pipeline(
                     plan_name: physical_plan.name(),
                 }
             })?;
-            BlockingSinkNode::new(Arc::new(agg_sink), child_node).boxed()
+            BlockingSinkNode::new(Arc::new(agg_sink), child_node, stats_state.clone()).boxed()
         }
         LocalPhysicalPlan::HashAggregate(HashAggregate {
             input,
             aggregations,
             group_by,
             schema,
+            stats_state,
             ..
         }) => {
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
@@ -196,7 +248,7 @@ pub fn physical_plan_to_pipeline(
                 .with_context(|_| PipelineCreationSnafu {
                     plan_name: physical_plan.name(),
                 })?;
-            BlockingSinkNode::new(Arc::new(agg_sink), child_node).boxed()
+            BlockingSinkNode::new(Arc::new(agg_sink), child_node, stats_state.clone()).boxed()
         }
         LocalPhysicalPlan::Unpivot(Unpivot {
             input,
@@ -204,6 +256,7 @@ pub fn physical_plan_to_pipeline(
             values,
             variable_name,
             value_name,
+            stats_state,
             ..
         }) => {
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
@@ -213,7 +266,8 @@ pub fn physical_plan_to_pipeline(
                 variable_name.clone(),
                 value_name.clone(),
             );
-            IntermediateNode::new(Arc::new(unpivot_op), vec![child_node]).boxed()
+            IntermediateNode::new(Arc::new(unpivot_op), vec![child_node], stats_state.clone())
+                .boxed()
         }
         LocalPhysicalPlan::Pivot(Pivot {
             input,
@@ -222,6 +276,7 @@ pub fn physical_plan_to_pipeline(
             value_column,
             aggregation,
             names,
+            stats_state,
             ..
         }) => {
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
@@ -232,30 +287,36 @@ pub fn physical_plan_to_pipeline(
                 aggregation.clone(),
                 names.clone(),
             );
-            BlockingSinkNode::new(Arc::new(pivot_sink), child_node).boxed()
+            BlockingSinkNode::new(Arc::new(pivot_sink), child_node, stats_state.clone()).boxed()
         }
         LocalPhysicalPlan::Sort(Sort {
             input,
             sort_by,
             descending,
             nulls_first,
+            stats_state,
             ..
         }) => {
             let sort_sink = SortSink::new(sort_by.clone(), descending.clone(), nulls_first.clone());
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
-            BlockingSinkNode::new(Arc::new(sort_sink), child_node).boxed()
+            BlockingSinkNode::new(Arc::new(sort_sink), child_node, stats_state.clone()).boxed()
         }
         LocalPhysicalPlan::MonotonicallyIncreasingId(MonotonicallyIncreasingId {
             input,
             column_name,
             schema,
+            stats_state,
             ..
         }) => {
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
             let monotonically_increasing_id_sink =
                 MonotonicallyIncreasingIdSink::new(column_name.clone(), schema.clone());
-            StreamingSinkNode::new(Arc::new(monotonically_increasing_id_sink), vec![child_node])
-                .boxed()
+            StreamingSinkNode::new(
+                Arc::new(monotonically_increasing_id_sink),
+                vec![child_node],
+                stats_state.clone(),
+            )
+            .boxed()
         }
         LocalPhysicalPlan::HashJoin(HashJoin {
             left,
@@ -265,6 +326,7 @@ pub fn physical_plan_to_pipeline(
             null_equals_null,
             join_type,
             schema,
+            stats_state,
             ..
         }) => {
             let left_schema = left.schema();
@@ -411,8 +473,12 @@ pub fn physical_plan_to_pipeline(
                 probe_state_bridge.clone(),
             )?;
             let build_child_node = physical_plan_to_pipeline(build_child, psets, cfg)?;
-            let build_node =
-                BlockingSinkNode::new(Arc::new(build_sink), build_child_node).boxed();
+            let build_node = BlockingSinkNode::new(
+                Arc::new(build_sink),
+                build_child_node,
+                build_child.get_stats_state().clone(),
+            )
+            .boxed();

             let probe_child_node = physical_plan_to_pipeline(probe_child, psets, cfg)?;

@@ -426,6 +492,7 @@ pub fn physical_plan_to_pipeline(
                     build_on_left,
                 )),
                 vec![build_node, probe_child_node],
+                stats_state.clone(),
             )
             .boxed()),
             JoinType::Inner => Ok(IntermediateNode::new(
@@ -439,6 +506,7 @@ pub fn physical_plan_to_pipeline(
                     probe_state_bridge,
                 )),
                 vec![build_node, probe_child_node],
+                stats_state.clone(),
             )
             .boxed()),
             JoinType::Left | JoinType::Right | JoinType::Outer => {
@@ -454,6 +522,7 @@ pub fn physical_plan_to_pipeline(
                         probe_state_bridge,
                     )),
                     vec![build_node, probe_child_node],
+                    stats_state.clone(),
                 )
                 .boxed())
             }
@@ -467,6 +536,7 @@ pub fn physical_plan_to_pipeline(
             left,
             right,
             schema,
+            stats_state,
             ..
         }) => {
             let left_stats_state = left.get_stats_state();
@@ -505,6 +575,7 @@ pub fn physical_plan_to_pipeline(
             let collect_node = BlockingSinkNode::new(
                 Arc::new(CrossJoinCollectSink::new(state_bridge.clone())),
                 collect_child_node,
+                collect_child.get_stats_state().clone(),
             )
             .boxed();

@@ -515,6 +586,7 @@ pub fn physical_plan_to_pipeline(
                     state_bridge,
                 )),
                 vec![collect_node, stream_child_node],
+                stats_state.clone(),
             )
             .boxed()
         }
@@ -522,6 +594,7 @@ pub fn physical_plan_to_pipeline(
             input,
             file_info,
             file_schema,
+            stats_state,
             ..
         }) => {
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
@@ -539,13 +612,14 @@ pub fn physical_plan_to_pipeline(
                 file_info.partition_cols.clone(),
                 file_schema.clone(),
             );
-            BlockingSinkNode::new(Arc::new(write_sink), child_node).boxed()
+            BlockingSinkNode::new(Arc::new(write_sink), child_node, stats_state.clone()).boxed()
         }
         #[cfg(feature = "python")]
         LocalPhysicalPlan::CatalogWrite(daft_local_plan::CatalogWrite {
             input,
             catalog_type,
             file_schema,
+            stats_state,
             ..
         }) => {
             use daft_logical_plan::CatalogType;
@@ -585,13 +659,14 @@ pub fn physical_plan_to_pipeline(
                 partition_by,
                 file_schema.clone(),
             );
-            BlockingSinkNode::new(Arc::new(write_sink), child_node).boxed()
+            BlockingSinkNode::new(Arc::new(write_sink), child_node, stats_state.clone()).boxed()
         }
         #[cfg(feature = "python")]
         LocalPhysicalPlan::LanceWrite(daft_local_plan::LanceWrite {
             input,
             lance_info,
             file_schema,
+            stats_state,
             ..
         }) => {
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
@@ -602,7 +677,7 @@ pub fn physical_plan_to_pipeline(
                 None,
                 file_schema.clone(),
             );
-            BlockingSinkNode::new(Arc::new(write_sink), child_node).boxed()
+            BlockingSinkNode::new(Arc::new(write_sink), child_node, stats_state.clone()).boxed()
         }
     };
diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs
index 644914d523..cddcf28248 100644
--- a/src/daft-local-execution/src/run.rs
+++ b/src/daft-local-execution/src/run.rs
@@ -7,6 +7,7 @@ use std::{
 };

 use common_daft_config::DaftExecutionConfig;
+use common_display::{mermaid::MermaidDisplayOptions, DisplayLevel};
 use common_error::DaftResult;
 use common_tracing::refresh_chrome_trace;
 use daft_local_plan::translate;
@@ -30,7 +31,7 @@ use {

 use crate::{
     channel::{create_channel, Receiver},
-    pipeline::{physical_plan_to_pipeline, viz_pipeline},
+    pipeline::{physical_plan_to_pipeline, viz_pipeline_ascii, viz_pipeline_mermaid},
     progress_bar::{make_progress_bar_manager, ProgressBarManager},
     resource_manager::get_or_init_memory_manager,
     Error, ExecutionRuntimeContext,
@@ -125,6 +126,28 @@ impl PyNativeExecutor {
         let part_iter = LocalPartitionIterator { iter };
         Ok(part_iter.into_pyobject(py)?.into_any())
     }
+
+    pub fn repr_ascii(
+        &self,
+        logical_plan_builder: &PyLogicalPlanBuilder,
+        cfg: PyDaftExecutionConfig,
+        simple: bool,
+    ) -> PyResult<String> {
+        Ok(self
+            .executor
+            .repr_ascii(&logical_plan_builder.builder, cfg.config, simple))
+    }
+
+    pub fn repr_mermaid(
+        &self,
+        logical_plan_builder: &PyLogicalPlanBuilder,
+        cfg: PyDaftExecutionConfig,
+        options: MermaidDisplayOptions,
+    ) -> PyResult<String> {
+        Ok(self
+            .executor
+            .repr_mermaid(&logical_plan_builder.builder, cfg.config, options))
+    }
 }

 #[derive(Debug, Clone)]
@@ -229,7 +252,16 @@ impl NativeExecutor {
                     .as_millis();
                 let file_name = format!("explain-analyze-{curr_ms}-mermaid.md");
                 let mut file = File::create(file_name)?;
-                writeln!(file, "```mermaid\n{}\n```", viz_pipeline(pipeline.as_ref()))?;
+                writeln!(
+                    file,
+                    "```mermaid\n{}\n```",
+                    viz_pipeline_mermaid(
+                        pipeline.as_ref(),
+                        DisplayLevel::Verbose,
+                        true,
+                        Default::default()
+                    )
+                )?;
             }
             Ok(())
         };
@@ -256,6 +288,46 @@ impl NativeExecutor {
             receiver: rx,
         })
     }
+
+    fn repr_ascii(
+        &self,
+        logical_plan_builder: &LogicalPlanBuilder,
+        cfg: Arc<DaftExecutionConfig>,
+        simple: bool,
+    ) -> String {
+        let logical_plan = logical_plan_builder.build();
+        let physical_plan = translate(&logical_plan).unwrap();
+        let pipeline_node =
+            physical_plan_to_pipeline(&physical_plan, &InMemoryPartitionSetCache::empty(), &cfg)
+                .unwrap();
+
+        viz_pipeline_ascii(pipeline_node.as_ref(), simple)
+    }
+
+    fn repr_mermaid(
+        &self,
+        logical_plan_builder: &LogicalPlanBuilder,
+        cfg: Arc<DaftExecutionConfig>,
+        options: MermaidDisplayOptions,
+    ) -> String {
+        let logical_plan = logical_plan_builder.build();
+        let physical_plan = translate(&logical_plan).unwrap();
+        let pipeline_node =
+            physical_plan_to_pipeline(&physical_plan, &InMemoryPartitionSetCache::empty(), &cfg)
+                .unwrap();
+
+        let display_type = if options.simple {
+            DisplayLevel::Compact
+        } else {
+            DisplayLevel::Default
+        };
+        viz_pipeline_mermaid(
+            pipeline_node.as_ref(),
+            display_type,
+            options.bottom_up,
+            options.subgraph_options,
+        )
+    }
 }

 impl Drop for NativeExecutor {
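The `repr_ascii`/`repr_mermaid` methods added to `PyNativeExecutor` above are what the Python-side wrapper dispatches to. They can also be driven directly, which is roughly what `NativeExecutor.pretty_print` does under the hood (a sketch only; `df` is assumed to be an existing DataFrame, and `_builder`/`_executor` are internal attributes, used here the same way `display.py` uses them above):

```python
from daft.context import get_context
from daft.dataframe.display import MermaidOptions
from daft.execution.native_executor import NativeExecutor

builder = df._builder.optimize()  # df assumed to exist; _builder is internal API
cfg = get_context().daft_execution_config

executor = NativeExecutor()
# ASCII tree, same text as the "== Physical Plan ==" section of df.explain(show_all=True).
print(executor._executor.repr_ascii(builder._builder, cfg, False))
# Mermaid graph; MermaidOptions(True) requests the simple/compact form.
print(executor._executor.repr_mermaid(builder._builder, cfg, MermaidOptions(True)))
```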
diff --git a/src/daft-local-execution/src/runtime_stats.rs b/src/daft-local-execution/src/runtime_stats.rs
index 4de2c1ef67..835997c5b7 100644
--- a/src/daft-local-execution/src/runtime_stats.rs
+++ b/src/daft-local-execution/src/runtime_stats.rs
@@ -43,7 +43,7 @@ impl RuntimeStats {
         if received {
             writeln!(
                 w,
-                "rows received = {}",
+                "Rows received = {}",
                 self.rows_received.to_formatted_string(&Locale::en)
             )?;
         }
@@ -51,7 +51,7 @@ impl RuntimeStats {
         if emitted {
             writeln!(
                 w,
-                "rows emitted = {}",
+                "Rows emitted = {}",
                 self.rows_emitted.to_formatted_string(&Locale::en)
             )?;
         }
diff --git a/src/daft-local-execution/src/sinks/aggregate.rs b/src/daft-local-execution/src/sinks/aggregate.rs
index 3263c1a148..db80206a16 100644
--- a/src/daft-local-execution/src/sinks/aggregate.rs
+++ b/src/daft-local-execution/src/sinks/aggregate.rs
@@ -5,6 +5,7 @@ use daft_core::prelude::SchemaRef;
 use daft_dsl::{Expr, ExprRef};
 use daft_micropartition::MicroPartition;
 use daft_physical_plan::extract_agg_expr;
+use itertools::Itertools;
 use tracing::{instrument, Span};

 use super::blocking_sink::{
@@ -136,7 +137,18 @@ impl BlockingSink for AggregateSink {
     }

     fn name(&self) -> &'static str {
-        "AggregateSink"
+        "Aggregate"
+    }
+
+    fn multiline_display(&self) -> Vec<String> {
+        vec![format!(
+            "Aggregate: {}",
+            self.agg_sink_params
+                .sink_agg_exprs
+                .iter()
+                .map(|e| e.to_string())
+                .join(", ")
+        )]
     }

     fn max_concurrency(&self) -> usize {
diff --git a/src/daft-local-execution/src/sinks/anti_semi_hash_join_probe.rs b/src/daft-local-execution/src/sinks/anti_semi_hash_join_probe.rs
index 7dc7f124ba..b1c6ba49c0 100644
--- a/src/daft-local-execution/src/sinks/anti_semi_hash_join_probe.rs
+++ b/src/daft-local-execution/src/sinks/anti_semi_hash_join_probe.rs
@@ -7,6 +7,7 @@ use daft_logical_plan::JoinType;
 use daft_micropartition::MicroPartition;
 use daft_table::{GrowableTable, ProbeState, Probeable, Table};
 use futures::{stream, StreamExt};
+use itertools::Itertools;
 use tracing::{info_span, instrument, Span};

 use super::{
@@ -270,7 +271,32 @@ impl StreamingSink for AntiSemiProbeSink {
     }

     fn name(&self) -> &'static str {
-        "AntiSemiProbeSink"
+        "AntiSemiHashJoinProbe"
+    }
+
+    fn multiline_display(&self) -> Vec<String> {
+        let mut res = vec![];
+        if self.params.is_semi {
+            res.push(format!(
+                "SemiHashJoinProbe: {}",
+                self.params
+                    .probe_on
+                    .iter()
+                    .map(|e| e.to_string())
+                    .join(", ")
+            ));
+        } else {
+            res.push(format!(
+                "AntiHashJoinProbe: {}",
+                self.params
+                    .probe_on
+                    .iter()
+                    .map(|e| e.to_string())
+                    .join(", ")
+            ));
+        }
+        res.push(format!("Build on left: {}", self.build_on_left));
+        res
     }

     #[instrument(skip_all, name = "AntiSemiProbeSink::finalize")]
diff --git a/src/daft-local-execution/src/sinks/blocking_sink.rs b/src/daft-local-execution/src/sinks/blocking_sink.rs
index 80d28a87b5..efb09c2be5 100644
--- a/src/daft-local-execution/src/sinks/blocking_sink.rs
+++ b/src/daft-local-execution/src/sinks/blocking_sink.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 use common_display::tree::TreeDisplay;
 use common_error::DaftResult;
 use common_runtime::get_compute_runtime;
+use daft_logical_plan::stats::StatsState;
 use daft_micropartition::MicroPartition;
 use snafu::ResultExt;
 use tracing::{info_span, instrument};
@@ -42,6 +43,7 @@ pub trait BlockingSink: Send + Sync {
         spawner: &ExecutionTaskSpawner,
     ) -> BlockingSinkFinalizeResult;
     fn name(&self) -> &'static str;
+    fn multiline_display(&self) -> Vec<String>;
     fn make_state(&self) -> DaftResult<Box<dyn BlockingSinkState>>;
     fn dispatch_spawner(
         &self,
@@ -59,16 +61,22 @@ pub struct BlockingSinkNode {
     name: &'static str,
     child: Box<dyn PipelineNode>,
     runtime_stats: Arc<RuntimeStatsContext>,
+    plan_stats: StatsState,
 }

 impl BlockingSinkNode {
-    pub(crate) fn new(op: Arc<dyn BlockingSink>, child: Box<dyn PipelineNode>) -> Self {
+    pub(crate) fn new(
+        op: Arc<dyn BlockingSink>,
+        child: Box<dyn PipelineNode>,
+        plan_stats: StatsState,
+    ) -> Self {
         let name = op.name();
         Self {
             op,
             name,
             child,
             runtime_stats: RuntimeStatsContext::new(),
+            plan_stats,
         }
     }
     pub(crate) fn boxed(self) -> Box<dyn PipelineNode> {
@@ -123,12 +131,23 @@ impl TreeDisplay for BlockingSinkNode {
     fn display_as(&self, level: common_display::DisplayLevel) -> String {
         use std::fmt::Write;
         let mut display = String::new();
-        writeln!(display, "{}", self.name()).unwrap();
-        use common_display::DisplayLevel::Compact;
-        if matches!(level, Compact) {
-        } else {
-            let rt_result = self.runtime_stats.result();
-            rt_result.display(&mut display, true, true, true).unwrap();
+
+        use common_display::DisplayLevel;
+        match level {
+            DisplayLevel::Compact => {
+                writeln!(display, "{}", self.op.name()).unwrap();
+            }
+            level => {
+                let multiline_display = self.op.multiline_display().join("\n");
+                writeln!(display, "{}", multiline_display).unwrap();
+                if let StatsState::Materialized(stats) = &self.plan_stats {
+                    writeln!(display, "Stats = {}", stats).unwrap();
+                }
+                if matches!(level, DisplayLevel::Verbose) {
+                    let rt_result = self.runtime_stats.result();
+                    rt_result.display(&mut display, true, true, true).unwrap();
+                }
+            }
         }
         display
     }
diff --git a/src/daft-local-execution/src/sinks/concat.rs b/src/daft-local-execution/src/sinks/concat.rs
index 6a7ef56959..cd24bb257d 100644
--- a/src/daft-local-execution/src/sinks/concat.rs
+++ b/src/daft-local-execution/src/sinks/concat.rs
@@ -39,6 +39,10 @@ impl StreamingSink for ConcatSink {
         "Concat"
     }

+    fn multiline_display(&self) -> Vec<String> {
+        vec!["Concat".to_string()]
+    }
+
     fn finalize(
         &self,
         _states: Vec<Box<dyn StreamingSinkState>>,
diff --git a/src/daft-local-execution/src/sinks/cross_join_collect.rs b/src/daft-local-execution/src/sinks/cross_join_collect.rs
index 9423145042..c4d615806f 100644
--- a/src/daft-local-execution/src/sinks/cross_join_collect.rs
+++ b/src/daft-local-execution/src/sinks/cross_join_collect.rs
@@ -31,7 +31,7 @@ impl CrossJoinCollectSink {

 impl BlockingSink for CrossJoinCollectSink {
     fn name(&self) -> &'static str {
-        "CrossJoinCollectSink"
+        "CrossJoinCollect"
     }

     fn sink(
@@ -90,6 +90,10 @@ impl BlockingSink for CrossJoinCollectSink {
         Ok(Box::new(CrossJoinCollectState(Some(Vec::new()))))
     }

+    fn multiline_display(&self) -> Vec<String> {
+        vec!["CrossJoinCollect".to_string()]
+    }
+
     fn max_concurrency(&self) -> usize {
         1
     }
diff --git a/src/daft-local-execution/src/sinks/grouped_aggregate.rs b/src/daft-local-execution/src/sinks/grouped_aggregate.rs
index f16e359c49..9e21aa647c 100644
--- a/src/daft-local-execution/src/sinks/grouped_aggregate.rs
+++ b/src/daft-local-execution/src/sinks/grouped_aggregate.rs
@@ -9,6 +9,7 @@ use daft_core::prelude::SchemaRef;
 use daft_dsl::{col, Expr, ExprRef};
 use daft_micropartition::MicroPartition;
 use daft_physical_plan::extract_agg_expr;
+use itertools::Itertools;
 use tracing::{instrument, Span};

 use super::blocking_sink::{
@@ -420,7 +421,28 @@ impl BlockingSink for GroupedAggregateSink {
     }

     fn name(&self) -> &'static str {
-        "GroupedAggregateSink"
+        "GroupedAggregate"
+    }
+
+    fn multiline_display(&self) -> Vec<String> {
+        let mut display = vec![];
+        display.push(format!(
+            "GroupedAggregate: {}",
+            self.grouped_aggregate_params
+                .original_aggregations
+                .iter()
+                .map(|e| e.to_string())
+                .join(", ")
+        ));
+        display.push(format!(
+            "Group by: {}",
+            self.grouped_aggregate_params
+                .group_by
+                .iter()
+                .map(|e| e.to_string())
+                .join(", ")
+        ));
+        display
     }

     fn max_concurrency(&self) -> usize {
diff --git a/src/daft-local-execution/src/sinks/hash_join_build.rs b/src/daft-local-execution/src/sinks/hash_join_build.rs
index 769b9e298d..20d4a22182 100644
--- a/src/daft-local-execution/src/sinks/hash_join_build.rs
+++ b/src/daft-local-execution/src/sinks/hash_join_build.rs
@@ -5,6 +5,7 @@ use daft_core::prelude::SchemaRef;
 use daft_dsl::ExprRef;
 use daft_micropartition::MicroPartition;
 use daft_table::{make_probeable_builder, ProbeState, ProbeableBuilder, Table};
+use itertools::Itertools;
 use tracing::{info_span, instrument};

 use super::blocking_sink::{
@@ -117,7 +118,21 @@ impl HashJoinBuildSink {

 impl BlockingSink for HashJoinBuildSink {
     fn name(&self) -> &'static str {
-        "HashJoinBuildSink"
+        "HashJoinBuild"
+    }
+
+    fn multiline_display(&self) -> Vec<String> {
+        let mut display = vec![];
+        display.push("HashJoinBuild:".to_string());
+        display.push(format!("Track Indices: {}", self.track_indices));
+        display.push(format!("Key Schema: {}", self.key_schema.short_string()));
+        if let Some(null_equals_nulls) = &self.nulls_equal_aware {
+            display.push(format!(
+                "Null equals Nulls = [{}]",
+                null_equals_nulls.iter().map(|b| b.to_string()).join(", ")
+            ));
+        };
+        display
     }

     fn sink(
diff --git a/src/daft-local-execution/src/sinks/limit.rs b/src/daft-local-execution/src/sinks/limit.rs
index 0e9c07fa40..fd9ef7e36c 100644
--- a/src/daft-local-execution/src/sinks/limit.rs
+++ b/src/daft-local-execution/src/sinks/limit.rs
@@ -87,6 +87,10 @@ impl StreamingSink for LimitSink {
         "Limit"
     }

+    fn multiline_display(&self) -> Vec<String> {
+        vec![format!("Limit: {}", self.limit)]
+    }
+
     fn finalize(
         &self,
         _states: Vec<Box<dyn StreamingSinkState>>,
diff --git a/src/daft-local-execution/src/sinks/monotonically_increasing_id.rs b/src/daft-local-execution/src/sinks/monotonically_increasing_id.rs
index 95782b63de..55c29ab395 100644
--- a/src/daft-local-execution/src/sinks/monotonically_increasing_id.rs
+++ b/src/daft-local-execution/src/sinks/monotonically_increasing_id.rs
@@ -97,6 +97,10 @@ impl StreamingSink for MonotonicallyIncreasingIdSink {
         "MonotonicallyIncreasingId"
     }

+    fn multiline_display(&self) -> Vec<String> {
+        vec!["MonotonicallyIncreasingId".to_string()]
+    }
+
     fn finalize(
         &self,
         _states: Vec<Box<dyn StreamingSinkState>>,
diff --git a/src/daft-local-execution/src/sinks/outer_hash_join_probe.rs b/src/daft-local-execution/src/sinks/outer_hash_join_probe.rs
index bce4e6bd6c..110b45ff87 100644
--- a/src/daft-local-execution/src/sinks/outer_hash_join_probe.rs
+++ b/src/daft-local-execution/src/sinks/outer_hash_join_probe.rs
@@ -14,6 +14,7 @@ use daft_micropartition::MicroPartition;
 use daft_table::{GrowableTable, ProbeState, Table};
 use futures::{stream, StreamExt};
 use indexmap::IndexSet;
+use itertools::Itertools;
 use tracing::{info_span, instrument, Span};

 use super::streaming_sink::{
@@ -623,7 +624,29 @@ impl StreamingSink for OuterHashJoinProbeSink {
     }

     fn name(&self) -> &'static str {
-        "OuterHashJoinProbeSink"
+        "OuterHashJoinProbe"
+    }
+
+    fn multiline_display(&self) -> Vec<String> {
+        let mut res = vec![];
+        match self.params.join_type {
+            JoinType::Left => res.push("LeftHashJoinProbe:".to_string()),
+            JoinType::Right => res.push("RightHashJoinProbe:".to_string()),
+            JoinType::Outer => res.push("OuterHashJoinProbe:".to_string()),
+            _ => unreachable!(
+                "Only Left, Right, and Outer joins are supported in OuterHashJoinProbeSink"
+            ),
+        }
+        res.push(format!(
+            "Probe on: [{}]",
+            self.params
+                .probe_on
+                .iter()
+                .map(|e| e.to_string())
+                .join(", ")
+        ));
+        res.push(format!("Build on left: {}", self.params.build_on_left));
+        res
     }

     fn make_state(&self) -> Box<dyn StreamingSinkState> {
diff --git a/src/daft-local-execution/src/sinks/pivot.rs b/src/daft-local-execution/src/sinks/pivot.rs
index cdf298cf25..bac76f8e0f 100644
--- a/src/daft-local-execution/src/sinks/pivot.rs
+++ b/src/daft-local-execution/src/sinks/pivot.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 use common_error::DaftResult;
 use daft_dsl::{AggExpr, Expr, ExprRef};
 use daft_micropartition::MicroPartition;
+use itertools::Itertools;
 use tracing::{instrument, Span};

 use super::blocking_sink::{
@@ -132,7 +133,27 @@ impl BlockingSink for PivotSink {
     }

     fn name(&self) -> &'static str {
-        "PivotSink"
+        "Pivot"
+    }
+
+    fn multiline_display(&self) -> Vec<String> {
+        let mut display = vec![];
+        display.push("Pivot:".to_string());
+        display.push(format!(
+            "Group by = {}",
+            self.pivot_params
+                .group_by
+                .iter()
+                .map(|e| e.to_string())
+                .join(", ")
+        ));
+        display.push(format!("Pivot column: {}", self.pivot_params.pivot_column));
+        display.push(format!("Value column: {}", self.pivot_params.value_column));
+        display.push(format!(
+            "Pivoted columns: {}",
+            self.pivot_params.names.iter().join(", ")
+        ));
+        display
     }

     fn max_concurrency(&self) -> usize {
diff --git a/src/daft-local-execution/src/sinks/sort.rs b/src/daft-local-execution/src/sinks/sort.rs
index 35a0a9a2be..1c5eecdaa8 100644
--- a/src/daft-local-execution/src/sinks/sort.rs
+++ b/src/daft-local-execution/src/sinks/sort.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 use common_error::DaftResult;
 use daft_dsl::ExprRef;
 use daft_micropartition::MicroPartition;
+use itertools::Itertools;
 use tracing::{instrument, Span};

 use super::blocking_sink::{
@@ -110,7 +111,29 @@ impl BlockingSink for SortSink {
     }

     fn name(&self) -> &'static str {
-        "SortResult"
+        "Sort"
+    }
+
+    fn multiline_display(&self) -> Vec<String> {
+        let mut lines = vec![];
+        assert!(!self.params.sort_by.is_empty());
+        let pairs = self
+            .params
+            .sort_by
+            .iter()
+            .zip(self.params.descending.iter())
+            .zip(self.params.nulls_first.iter())
+            .map(|((sb, d), nf)| {
+                format!(
+                    "({}, {}, {})",
+                    sb,
+                    if *d { "descending" } else { "ascending" },
+                    if *nf { "nulls first" } else { "nulls last" }
+                )
+            })
+            .join(", ");
+        lines.push(format!("Sort: Sort by = {}", pairs));
+        lines
     }

     fn make_state(&self) -> DaftResult<Box<dyn BlockingSinkState>> {
diff --git a/src/daft-local-execution/src/sinks/streaming_sink.rs b/src/daft-local-execution/src/sinks/streaming_sink.rs
index 78ae0bf5ed..d054be142b 100644
--- a/src/daft-local-execution/src/sinks/streaming_sink.rs
+++ b/src/daft-local-execution/src/sinks/streaming_sink.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 use common_display::tree::TreeDisplay;
 use common_error::DaftResult;
 use common_runtime::get_compute_runtime;
+use daft_logical_plan::stats::StatsState;
 use daft_micropartition::MicroPartition;
 use snafu::ResultExt;
 use tracing::{info_span, instrument};
@@ -56,6 +57,8 @@ pub trait StreamingSink: Send + Sync {
     /// The name of the StreamingSink operator.
     fn name(&self) -> &'static str;

+    fn multiline_display(&self) -> Vec<String>;
+
     /// Create a new worker-local state for this StreamingSink.
     fn make_state(&self) -> Box<dyn StreamingSinkState>;

@@ -77,16 +80,22 @@ pub struct StreamingSinkNode {
     name: &'static str,
     children: Vec<Box<dyn PipelineNode>>,
     runtime_stats: Arc<RuntimeStatsContext>,
+    plan_stats: StatsState,
 }

 impl StreamingSinkNode {
-    pub(crate) fn new(op: Arc<dyn StreamingSink>, children: Vec<Box<dyn PipelineNode>>) -> Self {
+    pub(crate) fn new(
+        op: Arc<dyn StreamingSink>,
+        children: Vec<Box<dyn PipelineNode>>,
+        plan_stats: StatsState,
+    ) -> Self {
         let name = op.name();
         Self {
             op,
             name,
             children,
             runtime_stats: RuntimeStatsContext::new(),
+            plan_stats,
         }
     }

@@ -164,12 +173,23 @@ impl TreeDisplay for StreamingSinkNode {
     fn display_as(&self, level: common_display::DisplayLevel) -> String {
         use std::fmt::Write;
         let mut display = String::new();
-        writeln!(display, "{}", self.name()).unwrap();
-        use common_display::DisplayLevel::Compact;
-        if matches!(level, Compact) {
-        } else {
-            let rt_result = self.runtime_stats.result();
-            rt_result.display(&mut display, true, true, true).unwrap();
+
+        use common_display::DisplayLevel;
+        match level {
+            DisplayLevel::Compact => {
+                writeln!(display, "{}", self.op.name()).unwrap();
+            }
+            level => {
+                let multiline_display = self.op.multiline_display().join("\n");
+                writeln!(display, "{}", multiline_display).unwrap();
+                if let StatsState::Materialized(stats) = &self.plan_stats {
+                    writeln!(display, "Stats = {}", stats).unwrap();
+                }
+                if matches!(level, DisplayLevel::Verbose) {
+                    let rt_result = self.runtime_stats.result();
+                    rt_result.display(&mut display, true, true, true).unwrap();
+                }
+            }
         }
         display
     }
diff --git a/src/daft-local-execution/src/sinks/write.rs b/src/daft-local-execution/src/sinks/write.rs
index d9724ff760..3ae84b0e87 100644
--- a/src/daft-local-execution/src/sinks/write.rs
+++ b/src/daft-local-execution/src/sinks/write.rs
@@ -17,6 +17,7 @@ use crate::{
     ExecutionRuntimeContext, ExecutionTaskSpawner, NUM_CPUS,
 };

+#[derive(Debug)]
 pub enum WriteFormat {
     Parquet,
     PartitionedParquet,
@@ -156,6 +157,15 @@ impl BlockingSink for WriteSink {
         }
     }

+    fn multiline_display(&self) -> Vec<String> {
+        let mut lines = vec![];
+        lines.push(format!("Write: {:?}", self.write_format));
+        if let Some(partition_by) = &self.partition_by {
+            lines.push(format!("Partition by: {:?}", partition_by));
+        }
+        lines
+    }
+
     fn max_concurrency(&self) -> usize {
         if self.partition_by.is_some() {
             *NUM_CPUS
diff --git a/src/daft-local-execution/src/sources/empty_scan.rs b/src/daft-local-execution/src/sources/empty_scan.rs
index 785ce9fc17..4184efe760 100644
--- a/src/daft-local-execution/src/sources/empty_scan.rs
+++ b/src/daft-local-execution/src/sources/empty_scan.rs
@@ -35,7 +35,13 @@ impl Source for EmptyScanSource {
         Ok(Box::pin(futures::stream::once(async { Ok(empty) })))
     }
     fn name(&self) -> &'static str {
-        "EmptyScanSource"
+        "EmptyScan"
+    }
+    fn multiline_display(&self) -> Vec<String> {
+        let mut res = vec![];
+        res.push("EmptyScan:".to_string());
+        res.push(format!("Schema = {}", self.schema.short_string()));
+        res
     }
     fn schema(&self) -> &SchemaRef {
         &self.schema
diff --git a/src/daft-local-execution/src/sources/in_memory.rs b/src/daft-local-execution/src/sources/in_memory.rs
index 53f682f543..ae0fe203c1 100644
--- a/src/daft-local-execution/src/sources/in_memory.rs
+++ b/src/daft-local-execution/src/sources/in_memory.rs
@@ -11,13 +11,22 @@ use super::source::Source;
 use crate::sources::source::SourceStream;

 pub struct InMemorySource {
-    data: PartitionSetRef<MicroPartitionRef>,
+    data: Option<PartitionSetRef<MicroPartitionRef>>,
+    size_bytes: usize,
     schema: SchemaRef,
 }

 impl InMemorySource {
-    pub fn new(data: PartitionSetRef<MicroPartitionRef>, schema: SchemaRef) -> Self {
-        Self { data, schema }
+    pub fn new(
+        data: Option<PartitionSetRef<MicroPartitionRef>>,
+        schema: SchemaRef,
+        size_bytes: usize,
+    ) -> Self {
+        Self {
+            data,
+            size_bytes,
+            schema,
+        }
     }
     pub fn arced(self) -> Arc<dyn Source> {
         Arc::new(self) as Arc<dyn Source>
     }
@@ -32,11 +41,25 @@ impl Source for InMemorySource {
         _maintain_order: bool,
         _io_stats: IOStatsRef,
     ) -> DaftResult<SourceStream<'static>> {
-        Ok(self.data.clone().to_partition_stream())
+        Ok(self
+            .data
+            .as_ref()
+            .unwrap_or_else(|| panic!("No data in InMemorySource"))
+            .clone()
+            .to_partition_stream())
     }
     fn name(&self) -> &'static str {
-        "InMemory"
+        "InMemorySource"
     }
+
+    fn multiline_display(&self) -> Vec<String> {
+        let mut res = vec![];
+        res.push("InMemorySource:".to_string());
+        res.push(format!("Schema = {}", self.schema.short_string()));
+        res.push(format!("Size bytes = {}", self.size_bytes));
+        res
+    }
+
     fn schema(&self) -> &SchemaRef {
         &self.schema
     }
diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs
index 6e4a76e1fe..909442a8c9 100644
--- a/src/daft-local-execution/src/sources/scan_task.rs
+++ b/src/daft-local-execution/src/sources/scan_task.rs
@@ -5,10 +5,11 @@ use std::{

 use async_trait::async_trait;
 use common_daft_config::DaftExecutionConfig;
+use common_display::{tree::TreeDisplay, DisplayAs, DisplayLevel};
 use common_error::DaftResult;
 use common_file_formats::{FileFormatConfig, ParquetSourceConfig};
 use common_runtime::get_io_runtime;
-use common_scan_info::Pushdowns;
+use common_scan_info::{Pushdowns, ScanTaskLike};
 use daft_core::prelude::{AsArrow, Int64Array, SchemaRef, Utf8Array};
 use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions};
 use daft_io::IOStatsRef;
@@ -120,7 +121,14 @@ impl Source for ScanTaskSource {
     }

     fn name(&self) -> &'static str {
-        "ScanTask"
+        "ScanTaskSource"
+    }
+
+    fn multiline_display(&self) -> Vec<String> {
+        self.display_as(DisplayLevel::Default)
+            .lines()
+            .map(|s| s.to_string())
+            .collect()
     }

     fn schema(&self) -> &SchemaRef {
@@ -128,6 +136,90 @@ impl Source for ScanTaskSource {
     }
 }

+impl TreeDisplay for ScanTaskSource {
+    fn display_as(&self, level: DisplayLevel) -> String {
+        use std::fmt::Write;
+        fn base_display(scan: &ScanTaskSource) -> String {
+            let num_scan_tasks = scan.scan_tasks.len();
+            let total_bytes: usize = scan
+                .scan_tasks
+                .iter()
+                .map(|st| st.size_bytes_on_disk().unwrap_or(0))
+                .sum();
+
+            #[allow(unused_mut)]
+            let mut s = format!(
+                "ScanTaskSource:
+Num Scan Tasks = {num_scan_tasks}
+Estimated Scan Bytes = {total_bytes}
+"
+            );
+            #[cfg(feature = "python")]
+            if let FileFormatConfig::Database(config) =
+                scan.scan_tasks[0].file_format_config().as_ref()
+            {
+                if num_scan_tasks == 1 {
+                    writeln!(s, "SQL Query = {}", &config.sql).unwrap();
+                } else {
+                    writeln!(s, "SQL Queries = [{},..]", &config.sql).unwrap();
+                }
+            }
+            s
+        }
+        match level {
+            DisplayLevel::Compact => self.get_name(),
+            DisplayLevel::Default => {
+                let mut s = base_display(self);
+                // We're only going to display the pushdowns and schema for the first scan task.
+                let pushdown = self.scan_tasks[0].pushdowns();
+                if !pushdown.is_empty() {
+                    s.push_str(&pushdown.display_as(DisplayLevel::Compact));
+                    s.push('\n');
+                }
+
+                let schema = self.scan_tasks[0].schema();
+                writeln!(
+                    s,
+                    "Schema: {{{}}}",
+                    schema.display_as(DisplayLevel::Compact)
+                )
+                .unwrap();
+
+                let tasks = self.scan_tasks.iter();
+
+                writeln!(s, "Scan Tasks: [").unwrap();
+                for (i, st) in tasks.enumerate() {
+                    if i < 3 || i >= self.scan_tasks.len() - 3 {
+                        writeln!(s, "{}", st.as_ref().display_as(DisplayLevel::Compact)).unwrap();
+                    } else if i == 3 {
+                        writeln!(s, "...").unwrap();
+                    }
+                }
+                writeln!(s, "]").unwrap();
+
+                s
+            }
+            DisplayLevel::Verbose => {
+                let mut s = base_display(self);
+                writeln!(s, "Scan Tasks: [").unwrap();
+
+                for st in &self.scan_tasks {
+                    writeln!(s, "{}", st.as_ref().display_as(DisplayLevel::Verbose)).unwrap();
+                }
+                s
+            }
+        }
+    }
+
+    fn get_name(&self) -> String {
+        "ScanTaskSource".to_string()
+    }
+
+    fn get_children(&self) -> Vec<&dyn TreeDisplay> {
+        vec![]
+    }
+}
+
 // Read all iceberg delete files and return a map of file paths to delete positions
 async fn get_delete_map(
     scan_tasks: &[Arc<ScanTask>],
diff --git a/src/daft-local-execution/src/sources/source.rs b/src/daft-local-execution/src/sources/source.rs
index 2f0f9a2e23..6fad499f57 100644
--- a/src/daft-local-execution/src/sources/source.rs
+++ b/src/daft-local-execution/src/sources/source.rs
@@ -5,6 +5,7 @@ use common_display::{tree::TreeDisplay, utils::bytes_to_human_readable};
 use common_error::DaftResult;
 use daft_core::prelude::SchemaRef;
 use daft_io::{IOStatsContext, IOStatsRef};
+use daft_logical_plan::stats::StatsState;
 use daft_micropartition::MicroPartition;
 use futures::{stream::BoxStream, StreamExt};

@@ -21,6 +22,7 @@ pub type SourceStream<'a> = BoxStream<'a, DaftResult<Arc<MicroPartition>>>;
 #[async_trait]
 pub trait Source: Send + Sync {
     fn name(&self) -> &'static str;
+    fn multiline_display(&self) -> Vec<String>;
     async fn get_data(
         &self,
         maintain_order: bool,
@@ -29,31 +31,61 @@ pub trait Source: Send + Sync {
     fn schema(&self) -> &SchemaRef;
 }

-struct SourceNode {
+pub(crate) struct SourceNode {
     source: Arc<dyn Source>,
     runtime_stats: Arc<RuntimeStatsContext>,
+    plan_stats: StatsState,
     io_stats: IOStatsRef,
 }

+impl SourceNode {
+    pub fn new(source: Arc<dyn Source>, plan_stats: StatsState) -> Self {
+        let runtime_stats = RuntimeStatsContext::new();
+        let io_stats = IOStatsContext::new(source.name());
+        Self {
+            source,
+            runtime_stats,
+            plan_stats,
+            io_stats,
+        }
+    }
+
+    pub fn boxed(self) -> Box<dyn PipelineNode> {
+        Box::new(self)
+    }
+}
+
 impl TreeDisplay for SourceNode {
     fn display_as(&self, level: common_display::DisplayLevel) -> String {
         use std::fmt::Write;
         let mut display = String::new();
-        writeln!(display, "{}", self.name()).unwrap();
-        use common_display::DisplayLevel::Compact;
-        if matches!(level, Compact) {
-        } else {
-            let rt_result = self.runtime_stats.result();
+        use common_display::DisplayLevel;
+        match level {
+            DisplayLevel::Compact => {
+                writeln!(display, "{}", self.source.name()).unwrap();
+            }
+            level => {
+                let multiline_display = self.source.multiline_display().join("\n");
+                writeln!(display, "{}", multiline_display).unwrap();

-            writeln!(display).unwrap();
-            rt_result.display(&mut display, false, true, false).unwrap();
-            let bytes_read = self.io_stats.load_bytes_read();
-            writeln!(
-                display,
-                "bytes read = {}",
-                bytes_to_human_readable(bytes_read)
-            )
-            .unwrap();
+                if let StatsState::Materialized(stats) = &self.plan_stats {
+                    writeln!(display, "Stats = {}", stats).unwrap();
+                }
+
+                if matches!(level, DisplayLevel::Verbose) {
+                    let rt_result = self.runtime_stats.result();
+
+                    writeln!(display).unwrap();
+                    rt_result.display(&mut display, false, true, false).unwrap();
+                    let bytes_read = self.io_stats.load_bytes_read();
+                    writeln!(
+                        display,
+                        "Bytes read = {}",
+                        bytes_to_human_readable(bytes_read)
+                    )
+                    .unwrap();
+                }
+            }
         }
         display
     }
@@ -112,14 +144,3 @@ impl PipelineNode for SourceNode {
         self
     }
 }
-
-impl From<Arc<dyn Source>> for Box<dyn PipelineNode> {
-    fn from(source: Arc<dyn Source>) -> Self {
-        let name = source.name();
-        Box::new(SourceNode {
-            source,
-            runtime_stats: RuntimeStatsContext::new(),
-            io_stats: IOStatsContext::new(name),
-        })
-    }
-}
diff --git a/src/daft-logical-plan/Cargo.toml b/src/daft-logical-plan/Cargo.toml
index 0ff2d2ac1c..879a3efa85 100644
--- a/src/daft-logical-plan/Cargo.toml
+++ b/src/daft-logical-plan/Cargo.toml
@@ -18,6 +18,7 @@ derivative = {workspace = true}
 indexmap = {workspace = true}
 itertools = {workspace = true}
 log = {workspace = true}
+num-format = {workspace = true}
 pyo3 = {workspace = true, optional = true}
 serde = {workspace = true, features = ["rc"]}
 snafu = {workspace = true}
diff --git a/src/daft-logical-plan/src/stats.rs b/src/daft-logical-plan/src/stats.rs
index 0faad9f184..b41d8888cb 100644
--- a/src/daft-logical-plan/src/stats.rs
+++ b/src/daft-logical-plan/src/stats.rs
@@ -1,5 +1,6 @@
 use std::{fmt::Display, hash::Hash, ops::Deref};

+use common_display::utils::bytes_to_human_readable;
 use serde::{Deserialize, Serialize};

 #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)]
@@ -44,10 +45,12 @@ impl Default for PlanStats {

 impl Display for PlanStats {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        use num_format::{Locale, ToFormattedString};
         write!(
             f,
-            "{{ Approx num rows = {}, Approx num bytes = {} }}",
-            self.approx_stats.num_rows, self.approx_stats.size_bytes,
+            "{{ Approx num rows = {}, Approx size bytes = {} }}",
+            self.approx_stats.num_rows.to_formatted_string(&Locale::en),
+            bytes_to_human_readable(self.approx_stats.size_bytes),
         )
     }
 }
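The `PlanStats` display tweak above swaps raw integers for a thousands-separated row count and a human-readable byte size, which is what the `Stats = ...` lines emitted by the pipeline nodes now show. For intuition, an equivalent of the new formatting in Python (a sketch of the idea, not the Rust implementation; the unit breakpoints of `bytes_to_human_readable` are assumed):

```python
def bytes_to_human_readable(n: int) -> str:
    # Assumed to mirror common_display::utils::bytes_to_human_readable.
    size, units = float(n), ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
    for unit in units:
        if size < 1024 or unit == units[-1]:
            return f"{int(size)} {unit}" if unit == "B" else f"{size:.2f} {unit}"
        size /= 1024

def format_plan_stats(num_rows: int, size_bytes: int) -> str:
    # "{:,}" gives the same en-US thousands grouping as num_format's Locale::en.
    return f"{{ Approx num rows = {num_rows:,}, Approx size bytes = {bytes_to_human_readable(size_bytes)} }}"

print(format_plan_stats(1_234_567, 50 * 1024 * 1024))
# { Approx num rows = 1,234,567, Approx size bytes = 50.00 MiB }
```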