feat: Stageify plan on shuffle boundaries #3781

Merged: 16 commits, Feb 26, 2025
43 changes: 42 additions & 1 deletion .github/workflows/python-package.yml
@@ -28,7 +28,12 @@ jobs:
daft-runner: [py, ray, native]
pyarrow-version: [8.0.0, 16.0.0]
os: [ubuntu-20.04, windows-latest]
enable-aqe: [1, 0]
exclude:
- daft-runner: py
enable-aqe: 1
- daft-runner: native
enable-aqe: 1
- daft-runner: ray
pyarrow-version: 8.0.0
os: ubuntu-20.04
@@ -119,6 +124,7 @@ jobs:
CARGO_TARGET_DIR: ./target

DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}

- name: Build library and Test with pytest (Windows)
if: ${{ (runner.os == 'Windows') }}
@@ -135,6 +141,7 @@ jobs:
# cargo llvm-cov --no-run --lcov --output-path report-output\rust-coverage-${{ join(matrix.*, '-') }}.lcov
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}

- name: Upload coverage report
uses: actions/upload-artifact@v4
@@ -221,6 +228,12 @@ jobs:
matrix:
python-version: ['3.9']
daft-runner: [py, ray, native]
enable-aqe: [1, 0]
exclude:
- daft-runner: py
enable-aqe: 1
- daft-runner: native
enable-aqe: 1
steps:
- uses: actions/checkout@v4
with:
@@ -263,7 +276,7 @@ jobs:
pytest tests/integration/test_tpch.py --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}

DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}
- name: Send Slack notification on failure
uses: slackapi/slack-github-action@v2.0.0
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -296,6 +309,12 @@ jobs:
matrix:
python-version: ['3.9'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray, native]
enable-aqe: [1, 0]
exclude:
- daft-runner: py
enable-aqe: 1
- daft-runner: native
enable-aqe: 1
steps:
- uses: actions/checkout@v4
with:
@@ -340,6 +359,7 @@ jobs:
pytest tests/io -m 'integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}
- name: Send Slack notification on failure
uses: slackapi/slack-github-action@v2.0.0
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -374,6 +394,12 @@ jobs:
matrix:
python-version: ['3.9'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray, native]
enable-aqe: [1, 0]
exclude:
- daft-runner: py
enable-aqe: 1
- daft-runner: native
enable-aqe: 1
# These permissions are needed to interact with GitHub's OIDC Token endpoint.
# This is used in the step "Assume GitHub Actions AWS Credentials"
permissions:
@@ -436,6 +462,7 @@ jobs:
pytest tests/integration/io -m 'integration and not benchmark' --credentials --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}
- name: Send Slack notification on failure
uses: slackapi/slack-github-action@v2.0.0
if: ${{ failure() }}
@@ -468,6 +495,12 @@ jobs:
matrix:
python-version: ['3.9'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray, native]
enable-aqe: [1, 0]
exclude:
- daft-runner: py
enable-aqe: 1
- daft-runner: native
enable-aqe: 1
steps:
- uses: actions/checkout@v4
with:
@@ -513,6 +546,7 @@ jobs:
pytest tests/integration/iceberg -m 'integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}
- name: Send Slack notification on failure
uses: slackapi/slack-github-action@v2.0.0
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -545,6 +579,12 @@ jobs:
matrix:
python-version: ['3.9'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray, native]
enable-aqe: [1, 0]
exclude:
- daft-runner: py
enable-aqe: 1
- daft-runner: native
enable-aqe: 1
steps:
- uses: actions/checkout@v4
with:
@@ -589,6 +629,7 @@ jobs:
pytest tests/integration/sql -m 'integration or not integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_AQE: ${{ matrix.enable-aqe }}
- name: Send Slack notification on failure
uses: slackapi/slack-github-action@v2.0.0
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
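
The new enable-aqe matrix axis runs each job with AQE both on and off, while the excludes limit enable-aqe: 1 to the Ray runner. A minimal sketch of how a test process might translate the DAFT_ENABLE_AQE variable into the engine flag; the exact plumbing is an assumption, not shown in this diff:

# Sketch only: assumes DAFT_ENABLE_AQE maps onto the enable_aqe execution-config
# flag; the real mapping lives in Daft's config handling, not in this workflow file.
import os

import daft

if os.getenv("DAFT_ENABLE_AQE", "0") in ("1", "true"):
    daft.context.set_execution_config(enable_aqe=True)  # turn on adaptive query execution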
3 changes: 2 additions & 1 deletion daft/daft/__init__.pyi
@@ -1588,13 +1588,14 @@ class AdaptivePhysicalPlanScheduler:
# TODO: use in-memory info here instead
def update(
self,
source_id: int,
stage_id: int,
partition_key: str,
cache_entry: PartitionCacheEntry,
num_partitions: int,
size_bytes: int,
num_rows: int,
) -> None: ...
def explain_analyze(self, explain_analyze_dir: str) -> None: ...

class LogicalPlanBuilder:
"""A logical plan builder, which simplifies constructing logical plans via a fluent interface.
7 changes: 5 additions & 2 deletions daft/plan_scheduler/physical_plan_scheduler.py
@@ -83,7 +83,7 @@
def is_done(self) -> bool:
return self._scheduler.is_done()

def update(self, source_id: int, cache_entry: PartitionCacheEntry):
def update(self, stage_id: int, cache_entry: PartitionCacheEntry):
num_partitions = cache_entry.num_partitions()
assert num_partitions is not None
size_bytes = cache_entry.size_bytes()
@@ -92,10 +92,13 @@
assert num_rows is not None

self._scheduler.update(
source_id,
stage_id,
cache_entry.key,
cache_entry,
num_partitions=num_partitions,
size_bytes=size_bytes,
num_rows=num_rows,
)

def explain_analyze(self, explain_analyze_dir: str) -> None:
self._scheduler.explain_analyze(explain_analyze_dir)

Codecov / codecov/patch: added line daft/plan_scheduler/physical_plan_scheduler.py#L104 was not covered by tests.
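
The update method now takes a stage_id rather than a source_id, reflecting that results are fed back per completed stage. An illustrative driver loop for the renamed API; run_stage and emit_results are hypothetical helpers, and the is_done/next/update calls mirror the Ray runner change further down in this PR:

def drive(adaptive_scheduler):
    # Execute the plan stage by stage, feeding statistics back after each stage.
    while not adaptive_scheduler.is_done():
        stage_id, plan_scheduler = adaptive_scheduler.next()
        cache_entry = run_stage(plan_scheduler)  # hypothetical: run one stage and cache its output
        if stage_id is None:
            emit_results(cache_entry)  # hypothetical: final stage, hand results to the caller
        else:
            # Materialized partition stats let the planner re-plan the remaining stages.
            adaptive_scheduler.update(stage_id, cache_entry)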
19 changes: 15 additions & 4 deletions daft/runners/ray_runner.py
@@ -3,6 +3,7 @@
import contextlib
import dataclasses
import logging
import os
import threading
import time
import uuid
@@ -1280,20 +1281,30 @@
if daft_execution_config.enable_aqe:
adaptive_planner = builder.to_adaptive_physical_plan_scheduler(daft_execution_config)
while not adaptive_planner.is_done():
source_id, plan_scheduler = adaptive_planner.next()
stage_id, plan_scheduler = adaptive_planner.next()
# don't store partition sets in a variable, to avoid holding a reference
result_uuid = self._start_plan(
plan_scheduler, daft_execution_config, results_buffer_size=results_buffer_size
)
del plan_scheduler
results_iter = self._stream_plan(result_uuid)
# if source_id is None that means this is the final stage
if source_id is None:
# if stage_id is None, this is the final stage
if stage_id is None:
yield from results_iter
else:
cache_entry = self._collect_into_cache(results_iter)
adaptive_planner.update(source_id, cache_entry)
adaptive_planner.update(stage_id, cache_entry)
del cache_entry
enable_explain_analyze = os.getenv("DAFT_DEV_ENABLE_EXPLAIN_ANALYZE")
ray_logs_location = ray_tracing.get_log_location()
if (
ray_logs_location.exists()
and enable_explain_analyze is not None
and enable_explain_analyze in ["1", "true"]
):
explain_analyze_dir = ray_tracing.get_daft_trace_location(ray_logs_location)
explain_analyze_dir.mkdir(exist_ok=True, parents=True)
adaptive_planner.explain_analyze(str(explain_analyze_dir))

Codecov / codecov/patch: added lines daft/runners/ray_runner.py#L1305-L1307 were not covered by tests.
else:
# Finalize the logical plan and get a physical plan scheduler for translating the
# physical plan to executable tasks.
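
Putting the pieces together, the Ray runner now drives the plan stage by stage when AQE is enabled and optionally writes per-stage explain-analyze traces next to the Ray logs. An end-to-end usage sketch; the enable_aqe flag and the DAFT_DEV_ENABLE_EXPLAIN_ANALYZE variable are taken from this diff, everything else is ordinary Daft API usage and assumed to be available:

import os

import daft

os.environ["DAFT_DEV_ENABLE_EXPLAIN_ANALYZE"] = "1"  # ask the runner to write per-stage traces

daft.context.set_runner_ray()
daft.context.set_execution_config(enable_aqe=True)

df = daft.from_pydict({"key": [1, 2, 1, 2], "val": [10, 20, 30, 40]})
# The groupby introduces a shuffle boundary, so the adaptive planner splits the plan
# into stages and feeds each stage's statistics back before planning the next one.
print(df.groupby("key").sum("val").collect())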
3 changes: 2 additions & 1 deletion src/daft-logical-plan/src/builder/mod.rs
@@ -137,11 +137,12 @@ impl LogicalPlanBuilder {
let source_info = SourceInfo::InMemory(InMemoryInfo::new(
schema.clone(),
partition_key.into(),
cache_entry,
Some(cache_entry),
num_partitions,
size_bytes,
num_rows,
None, // TODO(sammy) thread through clustering spec to Python
None,
));
let logical_plan: LogicalPlan = ops::Source::new(schema, source_info.into()).into();

5 changes: 0 additions & 5 deletions src/daft-logical-plan/src/display.rs
@@ -56,7 +56,6 @@ mod test {
Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo {
source_schema: schema,
clustering_spec: Arc::new(ClusteringSpec::unknown()),
source_id: 0,
})),
))
.arced()
@@ -76,7 +75,6 @@
Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo {
source_schema: schema,
clustering_spec: Arc::new(ClusteringSpec::unknown()),
source_id: 0,
})),
))
.arced()
@@ -126,7 +124,6 @@ On = col(id)
Output schema = id#Int32, text#Utf8, id2#UInt64, first_name#Utf8, last_name#Utf8"]
Filter4["Filter: col(id) == lit(1)"]
Source5["PlaceHolder:
Source ID = 0
Num partitions = 0
Output schema = text#Utf8, id#Int32"]
Source5 --> Filter4
@@ -138,7 +135,6 @@
Limit9["Limit: 1000"]
Filter10["Filter: startswith(col(last_name), lit('S')) & endswith(col(last_name),
lit('n'))"]
Source11["PlaceHolder:
Source ID = 0
Num partitions = 0
Output schema = first_name#Utf8, last_name#Utf8, id#Int32"]
Source11 --> Filter10
@@ -237,7 +233,6 @@ Project1 --> Limit0
let mermaid_repr = plan.repr_mermaid(opts);
let expected = r#"subgraph optimized["Optimized Logical Plan"]
optimizedSource0["PlaceHolder:
Source ID = 0
Num partitions = 0
Output schema = text#Utf8, id#Int32"]
end
5 changes: 1 addition & 4 deletions src/daft-logical-plan/src/ops/source.rs
@@ -116,12 +116,9 @@ impl Source {
res.push(format!("Number of partitions = {}", num_partitions));
}
SourceInfo::PlaceHolder(PlaceHolderInfo {
source_id,
clustering_spec,
..
clustering_spec, ..
}) => {
res.push("PlaceHolder:".to_string());
res.push(format!("Source ID = {}", source_id));
res.extend(clustering_spec.multiline_display());
}
}
15 changes: 15 additions & 0 deletions src/daft-logical-plan/src/optimization/optimizer.rs
@@ -157,6 +157,13 @@ impl Default for OptimizerBuilder {
}

impl OptimizerBuilder {
pub fn new() -> Self {
Self {
rule_batches: vec![],
config: Default::default(),
}
}

pub fn reorder_joins(mut self) -> Self {
self.rule_batches.push(RuleBatch::new(
vec![
@@ -168,6 +175,14 @@ impl OptimizerBuilder {
self
}

pub fn enrich_with_stats(mut self) -> Self {
self.rule_batches.push(RuleBatch::new(
vec![Box::new(EnrichWithStats::new())],
RuleExecutionStrategy::Once,
));
self
}

pub fn simplify_expressions(mut self) -> Self {
// Try to simplify expressions again as other rules could introduce new exprs.
self.rule_batches.push(RuleBatch::new(
@@ -449,7 +449,6 @@ mod tests {
Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo {
source_schema: schema,
clustering_spec: Arc::new(ClusteringSpec::unknown()),
source_id: 0,
})),
))
.arced()
@@ -470,7 +469,6 @@
Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo {
source_schema: schema,
clustering_spec: Arc::new(ClusteringSpec::unknown()),
source_id: 0,
})),
))
.arced()
@@ -70,7 +70,6 @@ mod test {
source_info: Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo {
source_schema: schema,
clustering_spec: Arc::new(ClusteringSpec::unknown()),
source_id: 0,
})),
stats_state: StatsState::NotMaterialized,
})