Skip to content

Commit

Permalink
feat: Explain for swordfish (#3667)
Browse files Browse the repository at this point in the history
Enables printing the swordfish plan in `.explain(show_all=True)` and adds
more detail to explain analyze, e.g. showing the estimated stats followed by
the actual stats.

Examples (TPCH Q2)
- Explain: <img width="1079" alt="Screenshot 2025-01-10 at 3 51 36 PM"
src="https://github.com/user-attachments/assets/c812cd1e-2b22-4937-a760-760a0c6dc3af"
/>
- Explain Analyze: <img width="901" alt="Screenshot 2025-01-10 at 3 52
36 PM"
src="https://github.com/user-attachments/assets/af526808-09f2-4ae4-a4d6-40c582fb7709"
/>

https://github.com/user-attachments/files/18383405/explain-analyze-1736553138856-mermaid.md

---------

Co-authored-by: Colin Ho <colinho@Colins-MacBook-Pro.local>
Co-authored-by: Colin Ho <colinho@Colins-MBP.localdomain>
  • Loading branch information
3 people authored Jan 27, 2025
1 parent a6ba617 commit 603199f
Show file tree
Hide file tree
Showing 40 changed files with 802 additions and 132 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ jaq-parse = "1.0.0"
jaq-std = "1.2.0"
mur3 = "0.1.0"
num-derive = "0.3.3"
num-format = "0.4.4"
num-traits = "0.2"
once_cell = "1.19.0"
path_macro = "1.0.0"
Expand Down
12 changes: 11 additions & 1 deletion daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1698,8 +1698,18 @@ class LogicalPlanBuilder:
class NativeExecutor:
def __init__(self) -> None: ...
def run(
self, psets: dict[str, list[PartitionT]], cfg: PyDaftExecutionConfig, results_buffer_size: int | None
self,
builder: LogicalPlanBuilder,
psets: dict[str, list[PartitionT]],
daft_execution_config: PyDaftExecutionConfig,
results_buffer_size: int | None,
) -> Iterator[PyMicroPartition]: ...
def repr_ascii(
self, builder: LogicalPlanBuilder, daft_execution_config: PyDaftExecutionConfig, simple: bool
) -> str: ...
def repr_mermaid(
self, builder: LogicalPlanBuilder, daft_execution_config: PyDaftExecutionConfig, options: MermaidOptions
) -> str: ...

class PyDaftExecutionConfig:
@staticmethod
Expand Down
8 changes: 7 additions & 1 deletion daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from daft.dataframe.preview import DataFramePreview
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
from daft.execution.native_executor import NativeExecutor
from daft.expressions import Expression, ExpressionsProjection, col, lit
from daft.filesystem import overwrite_files
from daft.logical.builder import LogicalPlanBuilder
Expand Down Expand Up @@ -208,10 +209,15 @@ def explain(
print_to_file("\n== Optimized Logical Plan ==\n")
builder = builder.optimize()
print_to_file(builder.pretty_print(simple))
print_to_file("\n== Physical Plan ==\n")
if get_context().get_or_create_runner().name != "native":
print_to_file("\n== Physical Plan ==\n")
physical_plan_scheduler = builder.to_physical_plan_scheduler(get_context().daft_execution_config)
print_to_file(physical_plan_scheduler.pretty_print(simple, format=format))
else:
native_executor = NativeExecutor()
print_to_file(
native_executor.pretty_print(builder, get_context().daft_execution_config, simple, format=format)
)
else:
print_to_file(
"\n \nSet `show_all=True` to also see the Optimized and Physical plans. This will run the query optimizer.",
Expand Down
18 changes: 14 additions & 4 deletions daft/dataframe/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,20 @@ def _repr_markdown_(self):
display_opts.with_subgraph_options(name="Optimized LogicalPlan", subgraph_id="optimized")
)
output += "\n"
physical_plan_scheduler = builder.to_physical_plan_scheduler(get_context().daft_execution_config)
output += physical_plan_scheduler._scheduler.repr_mermaid(
display_opts.with_subgraph_options(name="Physical Plan", subgraph_id="physical")
)
if get_context().get_or_create_runner().name != "native":
physical_plan_scheduler = builder.to_physical_plan_scheduler(get_context().daft_execution_config)
output += physical_plan_scheduler._scheduler.repr_mermaid(
display_opts.with_subgraph_options(name="Physical Plan", subgraph_id="physical")
)
else:
from daft.execution.native_executor import NativeExecutor

native_executor = NativeExecutor()
output += native_executor._executor.repr_mermaid(
builder._builder,
get_context().daft_execution_config,
display_opts.with_subgraph_options(name="Physical Plan", subgraph_id="physical"),
)
output += "\n"
output += "unoptimized --> optimized\n"
output += "optimized --> physical\n"
Expand Down
16 changes: 16 additions & 0 deletions daft/execution/native_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from daft.daft import (
NativeExecutor as _NativeExecutor,
)
from daft.dataframe.display import MermaidOptions
from daft.table import MicroPartition

if TYPE_CHECKING:
Expand Down Expand Up @@ -37,3 +38,18 @@ def run(
LocalMaterializedResult(MicroPartition._from_pymicropartition(part))
for part in self._executor.run(builder._builder, psets_mp, daft_execution_config, results_buffer_size)
)

def pretty_print(
    self,
    builder: LogicalPlanBuilder,
    daft_execution_config: PyDaftExecutionConfig,
    simple: bool = False,
    format: str = "ascii",
) -> str:
    """Return the native executor's string rendering of the plan for ``builder``.

    Args:
        builder: Plan builder whose underlying ``_builder`` is handed to the executor.
        daft_execution_config: Execution config forwarded unchanged to the executor.
        simple: Forwarded to ``repr_ascii``, or wrapped in ``MermaidOptions`` for mermaid output.
        format: Either ``"ascii"`` or ``"mermaid"``.

    Raises:
        ValueError: If ``format`` is neither ``"ascii"`` nor ``"mermaid"``.
    """
    if format == "mermaid":
        return self._executor.repr_mermaid(builder._builder, daft_execution_config, MermaidOptions(simple))
    if format != "ascii":
        raise ValueError(f"Unknown format: {format}")
    return self._executor.repr_ascii(builder._builder, daft_execution_config, simple)
2 changes: 1 addition & 1 deletion src/common/display/src/ascii.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn fmt_tree_indent_style<'a, W: fmt::Write + 'a>(

// Print the tree recursively, and illustrate the tree structure in the same style as `git log --graph`.
// `depth` is the number of forks in this node's ancestors.
fn fmt_tree_gitstyle<'a, W: fmt::Write + 'a>(
pub fn fmt_tree_gitstyle<'a, W: fmt::Write + 'a>(
node: &dyn TreeDisplay,
depth: usize,
s: &'a mut W,
Expand Down
3 changes: 2 additions & 1 deletion src/daft-local-execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ daft-writers = {path = "../daft-writers", default-features = false}
futures = {workspace = true}
indexmap = {workspace = true}
indicatif = "0.17.9"
itertools = {workspace = true}
lazy_static = {workspace = true}
log = {workspace = true}
loole = "0.4.0"
num-format = "0.4.4"
num-format = {workspace = true}
pin-project = "1"
pyo3 = {workspace = true, optional = true}
snafu = {workspace = true}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use std::sync::Arc;
use std::{sync::Arc, vec};

use common_error::DaftResult;
#[cfg(feature = "python")]
use daft_dsl::python::PyExpr;
use daft_dsl::{
count_actor_pool_udfs,
functions::python::{get_batch_size, get_concurrency, get_resource_request},
functions::python::{get_batch_size, get_concurrency, get_resource_request, get_udf_names},
ExprRef,
};
#[cfg(feature = "python")]
use daft_micropartition::python::PyMicroPartition;
use daft_micropartition::MicroPartition;
use itertools::Itertools;
#[cfg(feature = "python")]
use pyo3::prelude::*;
use tracing::{instrument, Span};
Expand Down Expand Up @@ -184,6 +185,30 @@ impl IntermediateOperator for ActorPoolProjectOperator {
"ActorPoolProject"
}

/// Builds the detailed, line-by-line description of this operator: a header,
/// the projection expressions, the UDF names found in them, the configured
/// concurrency, and the resource request (or `None` when there is none).
fn multiline_display(&self) -> Vec<String> {
    let projection = self.projection.iter().map(|e| e.to_string()).join(", ");
    let udf_names = self.projection.iter().flat_map(get_udf_names).join(", ");
    let concurrency = format!("Concurrency = {}", self.concurrency);
    let resources = match get_resource_request(&self.projection) {
        Some(request) => format!(
            "Resource request = {{ {} }}",
            request.multiline_display().join(", ")
        ),
        None => "Resource request = None".to_string(),
    };

    vec![
        "ActorPoolProject:".to_string(),
        format!("Projection = [{}]", projection),
        format!("UDFs = [{}]", udf_names),
        concurrency,
        resources,
    ]
}

fn make_state(&self) -> DaftResult<Box<dyn IntermediateOpState>> {
// TODO: Pass relevant CUDA_VISIBLE_DEVICES to the actor
Ok(Box::new(ActorPoolProjectState {
Expand Down
9 changes: 8 additions & 1 deletion src/daft-local-execution/src/intermediate_ops/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,14 @@ impl IntermediateOperator for CrossJoinOperator {
}

fn name(&self) -> &'static str {
"CrossJoinOperator"
"CrossJoin"
}

/// Describes this operator as a header line followed by the debug
/// rendering of the side that is streamed.
fn multiline_display(&self) -> Vec<String> {
    let header = String::from("CrossJoin:");
    let stream_side = format!("Stream Side = {:?}", self.stream_side);
    vec![header, stream_side]
}

fn make_state(&self) -> DaftResult<Box<dyn IntermediateOpState>> {
Expand Down
10 changes: 9 additions & 1 deletion src/daft-local-execution/src/intermediate_ops/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use daft_dsl::ExprRef;
use daft_functions::list::explode;
use daft_micropartition::MicroPartition;
use itertools::Itertools;
use tracing::{instrument, Span};

use super::intermediate_op::{
Expand Down Expand Up @@ -46,7 +47,14 @@ impl IntermediateOperator for ExplodeOperator {
.into()
}

/// Describes this operator as a single line listing the expressions
/// being exploded, separated by commas.
fn multiline_display(&self) -> Vec<String> {
    let exploded = self.to_explode.iter().map(|e| e.to_string()).join(", ");
    vec![format!("Explode: {}", exploded)]
}

fn name(&self) -> &'static str {
"ExplodeOperator"
"Explode"
}
}
6 changes: 5 additions & 1 deletion src/daft-local-execution/src/intermediate_ops/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ impl IntermediateOperator for FilterOperator {
.into()
}

/// Describes this operator as a single line showing the filter predicate.
fn multiline_display(&self) -> Vec<String> {
    let summary = format!("Filter: {}", self.predicate);
    vec![summary]
}

fn name(&self) -> &'static str {
"FilterOperator"
"Filter"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;
use daft_table::{GrowableTable, ProbeState};
use indexmap::IndexSet;
use itertools::Itertools;
use tracing::{info_span, instrument, Span};

use super::intermediate_op::{
Expand Down Expand Up @@ -210,7 +211,22 @@ impl IntermediateOperator for InnerHashJoinProbeOperator {
}

fn name(&self) -> &'static str {
"InnerHashJoinProbeOperator"
"InnerHashJoinProbe"
}

/// Describes this operator as a header line, the comma-separated probe
/// expressions, and the `build_on_left` flag.
fn multiline_display(&self) -> Vec<String> {
    let probe_keys = self
        .params
        .probe_on
        .iter()
        .map(|e| e.to_string())
        .join(", ");

    vec![
        "InnerHashJoinProbe:".to_string(),
        format!("Probe on: [{}]", probe_keys),
        format!("Build on left: {}", self.params.build_on_left),
    ]
}

fn make_state(&self) -> DaftResult<Box<dyn IntermediateOpState>> {
Expand Down
32 changes: 25 additions & 7 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use common_display::tree::TreeDisplay;
use common_error::DaftResult;
use common_runtime::get_compute_runtime;
use daft_logical_plan::stats::StatsState;
use daft_micropartition::MicroPartition;
use snafu::ResultExt;
use tracing::{info_span, instrument};
Expand Down Expand Up @@ -47,6 +48,7 @@ pub trait IntermediateOperator: Send + Sync {
task_spawner: &ExecutionTaskSpawner,
) -> IntermediateOpExecuteResult;
fn name(&self) -> &'static str;
fn multiline_display(&self) -> Vec<String>;
fn make_state(&self) -> DaftResult<Box<dyn IntermediateOpState>> {
Ok(Box::new(DefaultIntermediateOperatorState {}))
}
Expand Down Expand Up @@ -78,26 +80,30 @@ pub struct IntermediateNode {
intermediate_op: Arc<dyn IntermediateOperator>,
children: Vec<Box<dyn PipelineNode>>,
runtime_stats: Arc<RuntimeStatsContext>,
plan_stats: StatsState,
}

impl IntermediateNode {
pub(crate) fn new(
intermediate_op: Arc<dyn IntermediateOperator>,
children: Vec<Box<dyn PipelineNode>>,
plan_stats: StatsState,
) -> Self {
let rts = RuntimeStatsContext::new();
Self::new_with_runtime_stats(intermediate_op, children, rts)
Self::new_with_runtime_stats(intermediate_op, children, rts, plan_stats)
}

/// Creates an `IntermediateNode` from an operator, its child pipeline nodes,
/// a caller-supplied runtime stats context, and the plan stats for this node.
///
/// Each argument is stored unchanged in the field of the same name.
pub(crate) fn new_with_runtime_stats(
intermediate_op: Arc<dyn IntermediateOperator>,
children: Vec<Box<dyn PipelineNode>>,
runtime_stats: Arc<RuntimeStatsContext>,
plan_stats: StatsState,
) -> Self {
Self {
intermediate_op,
children,
runtime_stats,
plan_stats,
}
}

Expand Down Expand Up @@ -172,12 +178,24 @@ impl TreeDisplay for IntermediateNode {
fn display_as(&self, level: common_display::DisplayLevel) -> String {
use std::fmt::Write;
let mut display = String::new();
writeln!(display, "{}", self.intermediate_op.name()).unwrap();
use common_display::DisplayLevel::Compact;
if matches!(level, Compact) {
} else {
let rt_result = self.runtime_stats.result();
rt_result.display(&mut display, true, true, true).unwrap();

use common_display::DisplayLevel;
match level {
DisplayLevel::Compact => {
writeln!(display, "{}", self.intermediate_op.name()).unwrap();
}
level => {
let multiline_display = self.intermediate_op.multiline_display().join("\n");
writeln!(display, "{}", multiline_display).unwrap();
if let StatsState::Materialized(stats) = &self.plan_stats {
writeln!(display, "Stats = {}", stats).unwrap();
}
if matches!(level, DisplayLevel::Verbose) {
writeln!(display).unwrap();
let rt_result = self.runtime_stats.result();
rt_result.display(&mut display, true, true, true).unwrap();
}
}
}
display
}
Expand Down
21 changes: 20 additions & 1 deletion src/daft-local-execution/src/intermediate_ops/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use common_error::{DaftError, DaftResult};
use daft_dsl::{functions::python::get_resource_request, ExprRef};
use daft_micropartition::MicroPartition;
use itertools::Itertools;
use tracing::{instrument, Span};

use super::intermediate_op::{
Expand Down Expand Up @@ -55,7 +56,25 @@ impl IntermediateOperator for ProjectOperator {
}

fn name(&self) -> &'static str {
"ProjectOperator"
"Project"
}

/// Describes this operator as two lines: the comma-separated projection
/// expressions, then the resource request (or `None` when there is none).
fn multiline_display(&self) -> Vec<String> {
    let projected = self.projection.iter().map(|e| e.to_string()).join(", ");
    let resources = match get_resource_request(&self.projection) {
        Some(request) => format!(
            "Resource request = {{ {} }}",
            request.multiline_display().join(", ")
        ),
        None => "Resource request = None".to_string(),
    };

    vec![format!("Project: {}", projected), resources]
}

fn max_concurrency(&self) -> DaftResult<usize> {
Expand Down
Loading

0 comments on commit 603199f

Please sign in to comment.