Skip to content

Commit

Permalink
style
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-ho committed Feb 14, 2025
1 parent 4471680 commit 0f51cc7
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 31 deletions.
3 changes: 0 additions & 3 deletions src/daft-logical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ On = col(id)
Output schema = id#Int32, text#Utf8, id2#UInt64, first_name#Utf8, last_name#Utf8"]
Filter4["Filter: col(id) == lit(1)"]
Source5["PlaceHolder:
Source ID = 0
Num partitions = 0
Output schema = text#Utf8, id#Int32"]
Source5 --> Filter4
Expand All @@ -135,7 +134,6 @@ Limit9["Limit: 1000"]
Filter10["Filter: startswith(col(last_name), lit('S')) & endswith(col(last_name),
lit('n'))"]
Source11["PlaceHolder:
Source ID = 0
Num partitions = 0
Output schema = first_name#Utf8, last_name#Utf8, id#Int32"]
Source11 --> Filter10
Expand Down Expand Up @@ -233,7 +231,6 @@ Project1 --> Limit0
let mermaid_repr = plan.repr_mermaid(opts);
let expected = r#"subgraph optimized["Optimized Logical Plan"]
optimizedSource0["PlaceHolder:
Source ID = 0
Num partitions = 0
Output schema = text#Utf8, id#Int32"]
end
Expand Down
7 changes: 4 additions & 3 deletions src/daft-physical-plan/src/ops/placeholder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ impl PlaceholderScan {
}

Check warning on line 15 in src/daft-physical-plan/src/ops/placeholder.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/ops/placeholder.rs#L13-L15

Added lines #L13 - L15 were not covered by tests

pub fn multiline_display(&self) -> Vec<String> {
let mut res = vec![];
res.push("PlaceholderScan:".to_string());
res
// let mut res = vec![];
// res.push("PlaceholderScan:".to_string());
// res
vec!["PlaceholderScan".to_string()]
}

Check warning on line 22 in src/daft-physical-plan/src/ops/placeholder.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/ops/placeholder.rs#L17-L22

Added lines #L17 - L22 were not covered by tests

pub fn clustering_spec(&self) -> &Arc<ClusteringSpec> {
Expand Down
51 changes: 26 additions & 25 deletions src/daft-physical-plan/src/physical_planner/planner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{cmp::Ordering, sync::Arc};

use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
Expand Down Expand Up @@ -109,19 +109,20 @@ impl TreeNodeRewriter for LogicalStageTranslator {
let right_num_in_memory_children =
num_in_memory_children(right.as_ref());

if left_num_in_memory_children > right_num_in_memory_children {
RunNext::Left
} else if left_num_in_memory_children < right_num_in_memory_children {
RunNext::Right
} else {
// both sides are not in memory, so we should rank which side to run
let left_stats = left.approximate_stats();
let right_stats = right.approximate_stats();

if left_stats.size_bytes <= right_stats.size_bytes {
RunNext::Left
} else {
RunNext::Right
// Bias towards the side that has more materialized children
match left_num_in_memory_children.cmp(&right_num_in_memory_children) {
Ordering::Greater => RunNext::Left,
Ordering::Less => RunNext::Right,

Check warning on line 115 in src/daft-physical-plan/src/physical_planner/planner.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/physical_planner/planner.rs#L108-L115

Added lines #L108 - L115 were not covered by tests
Ordering::Equal => {
// Both sides are not in memory, so we should rank which side to run
let left_stats = left.approximate_stats();
let right_stats = right.approximate_stats();

if left_stats.size_bytes <= right_stats.size_bytes {
RunNext::Left

Check warning on line 122 in src/daft-physical-plan/src/physical_planner/planner.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/physical_planner/planner.rs#L118-L122

Added lines #L118 - L122 were not covered by tests
} else {
RunNext::Right

Check warning on line 124 in src/daft-physical-plan/src/physical_planner/planner.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/physical_planner/planner.rs#L124

Added line #L124 was not covered by tests
}
}
}
}
Expand Down Expand Up @@ -263,7 +264,7 @@ impl TreeNodeRewriter for PhysicalStageTranslator {
) {
self.partial_physical_plan = Some(node.clone());
let placeholder =
PhysicalPlan::PlaceholderScan(PlaceholderScan::new(node.clustering_spec().clone()));
PhysicalPlan::PlaceholderScan(PlaceholderScan::new(node.clustering_spec()));
return Ok(Transformed::new(
placeholder.arced(),
true,
Expand All @@ -274,7 +275,7 @@ impl TreeNodeRewriter for PhysicalStageTranslator {
// Otherwise, emit the child and add a placeholder as the child of the shuffle exchange
let child = shuffle_exchange.input.clone();
let placeholder =
PhysicalPlan::PlaceholderScan(PlaceholderScan::new(child.clustering_spec().clone()));
PhysicalPlan::PlaceholderScan(PlaceholderScan::new(child.clustering_spec()));
self.partial_physical_plan = Some(child);
let node_with_placeholder_child = node.with_new_children(&[placeholder.arced()]);
Ok(Transformed::new(
Expand Down Expand Up @@ -302,7 +303,7 @@ impl TreeNodeRewriter for ReplacePhysicalPlaceholderWithMaterializedResults {
let mat_results = self.mat_results.take().unwrap();
let new_source_node = PhysicalPlan::InMemoryScan(InMemoryScan::new(
mat_results.in_memory_info.source_schema.clone(),
mat_results.in_memory_info.clone(),
mat_results.in_memory_info,
ph_scan.clustering_spec().clone(),
));
Ok(Transformed::new(
Expand Down Expand Up @@ -363,11 +364,11 @@ impl AdaptivePlanner {
assert!(physical_stage_translator.partial_physical_plan.is_some());
self.remaining_physical_plan = Some(result.data);
let physical_plan = physical_stage_translator.partial_physical_plan.unwrap();
return Ok(physical_plan);
Ok(physical_plan)

Check warning on line 367 in src/daft-physical-plan/src/physical_planner/planner.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/physical_planner/planner.rs#L354-L367

Added lines #L354 - L367 were not covered by tests
} else {
assert!(physical_stage_translator.partial_physical_plan.is_none());
let physical_plan = result.data;
return Ok(physical_plan);
Ok(physical_plan)

Check warning on line 371 in src/daft-physical-plan/src/physical_planner/planner.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/physical_planner/planner.rs#L369-L371

Added lines #L369 - L371 were not covered by tests
}
}

Check warning on line 373 in src/daft-physical-plan/src/physical_planner/planner.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/physical_planner/planner.rs#L373

Added line #L373 was not covered by tests

Expand Down Expand Up @@ -422,9 +423,9 @@ impl AdaptivePlanner {
"Emitting partial plan:\n {}",
transformed_physical_plan.repr_ascii(true)

Check warning on line 424 in src/daft-physical-plan/src/physical_planner/planner.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/physical_planner/planner.rs#L423-L424

Added lines #L423 - L424 were not covered by tests
);
return Ok(QueryStageOutput::Partial {
Ok(QueryStageOutput::Partial {
physical_plan: transformed_physical_plan,

Check warning on line 427 in src/daft-physical-plan/src/physical_planner/planner.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/physical_planner/planner.rs#L427

Added line #L427 was not covered by tests
});
})
} else {
log::info!("Logical plan translation complete");
if self.remaining_physical_plan.is_some() {
Expand All @@ -433,18 +434,18 @@ impl AdaptivePlanner {
transformed_physical_plan.repr_ascii(true)

Check warning on line 434 in src/daft-physical-plan/src/physical_planner/planner.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/physical_planner/planner.rs#L430-L434

Added lines #L430 - L434 were not covered by tests
);
self.status = AdaptivePlannerStatus::WaitingForStats;
return Ok(QueryStageOutput::Partial {
Ok(QueryStageOutput::Partial {
physical_plan: transformed_physical_plan,
});
})

Check warning on line 439 in src/daft-physical-plan/src/physical_planner/planner.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/physical_planner/planner.rs#L436-L439

Added lines #L436 - L439 were not covered by tests
} else {
log::info!(
"Emitting final plan:\n {}",
transformed_physical_plan.repr_ascii(true)

Check warning on line 443 in src/daft-physical-plan/src/physical_planner/planner.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/physical_planner/planner.rs#L441-L443

Added lines #L441 - L443 were not covered by tests
);
self.status = AdaptivePlannerStatus::Done;
return Ok(QueryStageOutput::Final {
Ok(QueryStageOutput::Final {
physical_plan: transformed_physical_plan,
});
})

Check warning on line 448 in src/daft-physical-plan/src/physical_planner/planner.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/physical_planner/planner.rs#L445-L448

Added lines #L445 - L448 were not covered by tests
}
}
}
Expand Down

0 comments on commit 0f51cc7

Please sign in to comment.