feat: Stageify plan on shuffle boundaries (#3781)
Refactor the current adaptive planner to stageify the plan on shuffle
boundaries. It works as follows:

1. Translate the logical plan until the root is reached or until we hit a
point where materialized data can better inform the planning decision,
i.e. a join.
2. Split the translated physical plan on shuffle exchanges: first emit the
plan up to the exchange, then the exchange itself, and repeat until the
physical plan is complete.
3. Optimize the logical plan and repeat from step 1.

Essentially, the logical boundaries are joins (these are the only places
where data size affects the planning decision) and the physical
boundaries are shuffles.

**Results of TPCH SF1000, 4 nodes**

| Query | Before (s) | After (s) |
| --- | --- | --- |
| Q1 | 31.05 | 31.26 |
| Q2 | 24.95 | 35.46 |
| Q3 | 50.91 | 53.77 |
| Q4 | 24.11 | 19.98 |
| Q5 | 177.07 | 206.01 |
| Q6 | 11.17 | 10.50 |
| Q7 | 75.97 | 91.34 |
| Q8 | 150.76 | 142.85 |
| Q9 | 263.51 | 271.16 |
| Q10 | 59.37 | 53.28 |
| Total | 868.87 | 918.28 |
| Spilled (MiB) | 2200948 | 2194637 |

---------

Co-authored-by: Colin Ho <colinho@Colins-MacBook-Pro.local>
Co-authored-by: Colin Ho <colinho@Colins-MBP.localdomain>
3 people authored Feb 26, 2025
1 parent 0080a7e commit f782869
Showing 25 changed files with 782 additions and 218 deletions.
43 changes: 42 additions & 1 deletion .github/workflows/python-package.yml
@@ -28,7 +28,12 @@ jobs:
daft-runner: [py, ray, native]
pyarrow-version: [8.0.0, 16.0.0]
os: [ubuntu-20.04, windows-latest]
enable-aqe: [1, 0]
exclude:
- daft-runner: py
enable-aqe: 1
- daft-runner: native
enable-aqe: 1
- daft-runner: ray
pyarrow-version: 8.0.0
os: ubuntu-20.04
@@ -119,6 +124,7 @@ jobs:
CARGO_TARGET_DIR: ./target

DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}

- name: Build library and Test with pytest (Windows)
if: ${{ (runner.os == 'Windows') }}
@@ -135,6 +141,7 @@ jobs:
# cargo llvm-cov --no-run --lcov --output-path report-output\rust-coverage-${{ join(matrix.*, '-') }}.lcov
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}

- name: Upload coverage report
uses: actions/upload-artifact@v4
@@ -221,6 +228,12 @@ jobs:
matrix:
python-version: ['3.9']
daft-runner: [py, ray, native]
enable-aqe: [1, 0]
exclude:
- daft-runner: py
enable-aqe: 1
- daft-runner: native
enable-aqe: 1
steps:
- uses: actions/checkout@v4
with:
@@ -263,7 +276,7 @@ jobs:
pytest tests/integration/test_tpch.py --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}

DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}
- name: Send Slack notification on failure
uses: slackapi/slack-github-action@v2.0.0
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -296,6 +309,12 @@ jobs:
matrix:
python-version: ['3.9'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray, native]
enable-aqe: [1, 0]
exclude:
- daft-runner: py
enable-aqe: 1
- daft-runner: native
enable-aqe: 1
steps:
- uses: actions/checkout@v4
with:
@@ -340,6 +359,7 @@ jobs:
pytest tests/io -m 'integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}
- name: Send Slack notification on failure
uses: slackapi/slack-github-action@v2.0.0
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -374,6 +394,12 @@ jobs:
matrix:
python-version: ['3.9'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray, native]
enable-aqe: [1, 0]
exclude:
- daft-runner: py
enable-aqe: 1
- daft-runner: native
enable-aqe: 1
# These permissions are needed to interact with GitHub's OIDC Token endpoint.
# This is used in the step "Assume GitHub Actions AWS Credentials"
permissions:
@@ -436,6 +462,7 @@ jobs:
pytest tests/integration/io -m 'integration and not benchmark' --credentials --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}
- name: Send Slack notification on failure
uses: slackapi/slack-github-action@v2.0.0
if: ${{ failure() }}
@@ -468,6 +495,12 @@ jobs:
matrix:
python-version: ['3.9'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray, native]
enable-aqe: [1, 0]
exclude:
- daft-runner: py
enable-aqe: 1
- daft-runner: native
enable-aqe: 1
steps:
- uses: actions/checkout@v4
with:
@@ -513,6 +546,7 @@ jobs:
pytest tests/integration/iceberg -m 'integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}
- name: Send Slack notification on failure
uses: slackapi/slack-github-action@v2.0.0
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -545,6 +579,12 @@ jobs:
matrix:
python-version: ['3.9'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray, native]
enable-aqe: [1, 0]
exclude:
- daft-runner: py
enable-aqe: 1
- daft-runner: native
enable-aqe: 1
steps:
- uses: actions/checkout@v4
with:
@@ -589,6 +629,7 @@ jobs:
pytest tests/integration/sql -m 'integration or not integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}
- name: Send Slack notification on failure
uses: slackapi/slack-github-action@v2.0.0
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
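
For context on the new matrix dimension: AQE only applies to the Ray runner,
which is why the `py` and `native` runners are excluded in each matrix above.
A minimal sketch of how a test process could pick up the flag — assuming
`enable_aqe` is settable via `daft.context.set_execution_config`, mirroring
the `daft_execution_config.enable_aqe` field read by the Ray runner below:

```python
import os

import daft

# Assumption: set_execution_config exposes the enable_aqe flag used by the
# Ray runner's adaptive planning path.
if os.getenv("DAFT_ENABLE_AQE") == "1":
    daft.context.set_execution_config(enable_aqe=True)
```
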
3 changes: 2 additions & 1 deletion daft/daft/__init__.pyi
@@ -1588,13 +1588,14 @@ class AdaptivePhysicalPlanScheduler:
# Todo use in memory info here instead
def update(
self,
source_id: int,
stage_id: int,
partition_key: str,
cache_entry: PartitionCacheEntry,
num_partitions: int,
size_bytes: int,
num_rows: int,
) -> None: ...
def explain_analyze(self, explain_analyze_dir: str) -> None: ...

class LogicalPlanBuilder:
"""A logical plan builder, which simplifies constructing logical plans via a fluent interface.
7 changes: 5 additions & 2 deletions daft/plan_scheduler/physical_plan_scheduler.py
@@ -83,7 +83,7 @@ def next(self) -> tuple[int | None, PhysicalPlanScheduler]:
def is_done(self) -> bool:
return self._scheduler.is_done()

def update(self, source_id: int, cache_entry: PartitionCacheEntry):
def update(self, stage_id: int, cache_entry: PartitionCacheEntry):
num_partitions = cache_entry.num_partitions()
assert num_partitions is not None
size_bytes = cache_entry.size_bytes()
@@ -92,10 +92,13 @@ def update(self, source_id: int, cache_entry: PartitionCacheEntry):
assert num_rows is not None

self._scheduler.update(
source_id,
stage_id,
cache_entry.key,
cache_entry,
num_partitions=num_partitions,
size_bytes=size_bytes,
num_rows=num_rows,
)

def explain_analyze(self, explain_analyze_dir: str) -> None:
self._scheduler.explain_analyze(explain_analyze_dir)
19 changes: 15 additions & 4 deletions daft/runners/ray_runner.py
@@ -3,6 +3,7 @@
import contextlib
import dataclasses
import logging
import os
import threading
import time
import uuid
@@ -1280,20 +1281,30 @@ def run_iter(
if daft_execution_config.enable_aqe:
adaptive_planner = builder.to_adaptive_physical_plan_scheduler(daft_execution_config)
while not adaptive_planner.is_done():
source_id, plan_scheduler = adaptive_planner.next()
stage_id, plan_scheduler = adaptive_planner.next()
# don't store partition sets in variable to avoid reference
result_uuid = self._start_plan(
plan_scheduler, daft_execution_config, results_buffer_size=results_buffer_size
)
del plan_scheduler
results_iter = self._stream_plan(result_uuid)
# if source_id is None that means this is the final stage
if source_id is None:
# if stage_id is None that means this is the final stage
if stage_id is None:
yield from results_iter
else:
cache_entry = self._collect_into_cache(results_iter)
adaptive_planner.update(source_id, cache_entry)
adaptive_planner.update(stage_id, cache_entry)
del cache_entry
enable_explain_analyze = os.getenv("DAFT_DEV_ENABLE_EXPLAIN_ANALYZE")
ray_logs_location = ray_tracing.get_log_location()
if (
ray_logs_location.exists()
and enable_explain_analyze is not None
and enable_explain_analyze in ["1", "true"]
):
explain_analyze_dir = ray_tracing.get_daft_trace_location(ray_logs_location)
explain_analyze_dir.mkdir(exist_ok=True, parents=True)
adaptive_planner.explain_analyze(str(explain_analyze_dir))
else:
# Finalize the logical plan and get a physical plan scheduler for translating the
# physical plan to executable tasks.
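
The new `DAFT_DEV_ENABLE_EXPLAIN_ANALYZE` gate writes a stage-level
explain-analyze trace into Ray's log directory once the adaptive loop
finishes. A hedged usage sketch (again assuming `enable_aqe` is settable
from Python; `repartition` is just one way to force a shuffle exchange so
the plan actually stages):

```python
import os

# The runner only writes traces when this is "1" or "true" and the Ray logs
# directory exists (see the gating in run_iter above).
os.environ["DAFT_DEV_ENABLE_EXPLAIN_ANALYZE"] = "1"

import daft

daft.context.set_runner_ray()
daft.context.set_execution_config(enable_aqe=True)  # assumption: Python-settable

df = daft.from_pydict({"a": [1, 2, 3, 4]})
# The repartition introduces a shuffle exchange, so the adaptive planner
# splits the plan into stages before the final collect.
df.repartition(2, "a").collect()
```
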
3 changes: 2 additions & 1 deletion src/daft-logical-plan/src/builder/mod.rs
@@ -137,11 +137,12 @@ impl LogicalPlanBuilder {
let source_info = SourceInfo::InMemory(InMemoryInfo::new(
schema.clone(),
partition_key.into(),
cache_entry,
Some(cache_entry),
num_partitions,
size_bytes,
num_rows,
None, // TODO(sammy) thread through clustering spec to Python
None,
));
let logical_plan: LogicalPlan = ops::Source::new(schema, source_info.into()).into();

5 changes: 0 additions & 5 deletions src/daft-logical-plan/src/display.rs
@@ -56,7 +56,6 @@ mod test {
Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo {
source_schema: schema,
clustering_spec: Arc::new(ClusteringSpec::unknown()),
source_id: 0,
})),
))
.arced()
@@ -76,7 +75,6 @@ mod test {
Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo {
source_schema: schema,
clustering_spec: Arc::new(ClusteringSpec::unknown()),
source_id: 0,
})),
))
.arced()
@@ -126,7 +124,6 @@ On = col(id)
Output schema = id#Int32, text#Utf8, id2#UInt64, first_name#Utf8, last_name#Utf8"]
Filter4["Filter: col(id) == lit(1)"]
Source5["PlaceHolder:
Source ID = 0
Num partitions = 0
Output schema = text#Utf8, id#Int32"]
Source5 --> Filter4
@@ -138,7 +135,6 @@ Limit9["Limit: 1000"]
Filter10["Filter: startswith(col(last_name), lit('S')) & endswith(col(last_name),
lit('n'))"]
Source11["PlaceHolder:
Source ID = 0
Num partitions = 0
Output schema = first_name#Utf8, last_name#Utf8, id#Int32"]
Source11 --> Filter10
@@ -237,7 +233,6 @@ Project1 --> Limit0
let mermaid_repr = plan.repr_mermaid(opts);
let expected = r#"subgraph optimized["Optimized Logical Plan"]
optimizedSource0["PlaceHolder:
Source ID = 0
Num partitions = 0
Output schema = text#Utf8, id#Int32"]
end
5 changes: 1 addition & 4 deletions src/daft-logical-plan/src/ops/source.rs
@@ -116,12 +116,9 @@ impl Source {
res.push(format!("Number of partitions = {}", num_partitions));
}
SourceInfo::PlaceHolder(PlaceHolderInfo {
source_id,
clustering_spec,
..
clustering_spec, ..
}) => {
res.push("PlaceHolder:".to_string());
res.push(format!("Source ID = {}", source_id));
res.extend(clustering_spec.multiline_display());
}
}
15 changes: 15 additions & 0 deletions src/daft-logical-plan/src/optimization/optimizer.rs
@@ -157,6 +157,13 @@ impl Default for OptimizerBuilder {
}

impl OptimizerBuilder {
pub fn new() -> Self {
Self {
rule_batches: vec![],
config: Default::default(),
}
}

pub fn reorder_joins(mut self) -> Self {
self.rule_batches.push(RuleBatch::new(
vec![
@@ -168,6 +175,14 @@ impl OptimizerBuilder {
self
}

pub fn enrich_with_stats(mut self) -> Self {
self.rule_batches.push(RuleBatch::new(
vec![Box::new(EnrichWithStats::new())],
RuleExecutionStrategy::Once,
));
self
}

pub fn simplify_expressions(mut self) -> Self {
// Try to simplify expressions again as other rules could introduce new exprs.
self.rule_batches.push(RuleBatch::new(
@@ -449,7 +449,6 @@ mod tests {
Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo {
source_schema: schema,
clustering_spec: Arc::new(ClusteringSpec::unknown()),
source_id: 0,
})),
))
.arced()
@@ -470,7 +469,6 @@
Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo {
source_schema: schema,
clustering_spec: Arc::new(ClusteringSpec::unknown()),
source_id: 0,
})),
))
.arced()
@@ -70,7 +70,6 @@ mod test {
source_info: Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo {
source_schema: schema,
clustering_spec: Arc::new(ClusteringSpec::unknown()),
source_id: 0,
})),
stats_state: StatsState::NotMaterialized,
})