diff --git a/src/daft-logical-plan/src/display.rs b/src/daft-logical-plan/src/display.rs index e56d6cd551..65cad9da37 100644 --- a/src/daft-logical-plan/src/display.rs +++ b/src/daft-logical-plan/src/display.rs @@ -123,7 +123,6 @@ On = col(id) Output schema = id#Int32, text#Utf8, id2#UInt64, first_name#Utf8, last_name#Utf8"] Filter4["Filter: col(id) == lit(1)"] Source5["PlaceHolder: -Source ID = 0 Num partitions = 0 Output schema = text#Utf8, id#Int32"] Source5 --> Filter4 @@ -135,7 +134,6 @@ Limit9["Limit: 1000"] Filter10["Filter: startswith(col(last_name), lit('S')) & endswith(col(last_name), lit('n'))"] Source11["PlaceHolder: -Source ID = 0 Num partitions = 0 Output schema = first_name#Utf8, last_name#Utf8, id#Int32"] Source11 --> Filter10 @@ -233,7 +231,6 @@ Project1 --> Limit0 let mermaid_repr = plan.repr_mermaid(opts); let expected = r#"subgraph optimized["Optimized Logical Plan"] optimizedSource0["PlaceHolder: -Source ID = 0 Num partitions = 0 Output schema = text#Utf8, id#Int32"] end diff --git a/src/daft-physical-plan/src/ops/placeholder.rs b/src/daft-physical-plan/src/ops/placeholder.rs index 14adfaae95..a5beabd225 100644 --- a/src/daft-physical-plan/src/ops/placeholder.rs +++ b/src/daft-physical-plan/src/ops/placeholder.rs @@ -15,9 +15,10 @@ impl PlaceholderScan { } pub fn multiline_display(&self) -> Vec { - let mut res = vec![]; - res.push("PlaceholderScan:".to_string()); - res + // let mut res = vec![]; + // res.push("PlaceholderScan:".to_string()); + // res + vec!["PlaceholderScan".to_string()] } pub fn clustering_spec(&self) -> &Arc { diff --git a/src/daft-physical-plan/src/physical_planner/planner.rs b/src/daft-physical-plan/src/physical_planner/planner.rs index 87844a7a52..2cbb82ffe5 100644 --- a/src/daft-physical-plan/src/physical_planner/planner.rs +++ b/src/daft-physical-plan/src/physical_planner/planner.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{cmp::Ordering, sync::Arc}; use common_daft_config::DaftExecutionConfig; use common_error::DaftResult; @@ -109,19 +109,20 @@ impl TreeNodeRewriter for LogicalStageTranslator { let right_num_in_memory_children = num_in_memory_children(right.as_ref()); - if left_num_in_memory_children > right_num_in_memory_children { - RunNext::Left - } else if left_num_in_memory_children < right_num_in_memory_children { - RunNext::Right - } else { - // both sides are not in memory, so we should rank which side to run - let left_stats = left.approximate_stats(); - let right_stats = right.approximate_stats(); - - if left_stats.size_bytes <= right_stats.size_bytes { - RunNext::Left - } else { - RunNext::Right + // Bias towards the side that has more materialized children + match left_num_in_memory_children.cmp(&right_num_in_memory_children) { + Ordering::Greater => RunNext::Left, + Ordering::Less => RunNext::Right, + Ordering::Equal => { + // Both sides are not in memory, so we should rank which side to run + let left_stats = left.approximate_stats(); + let right_stats = right.approximate_stats(); + + if left_stats.size_bytes <= right_stats.size_bytes { + RunNext::Left + } else { + RunNext::Right + } } } } @@ -263,7 +264,7 @@ impl TreeNodeRewriter for PhysicalStageTranslator { ) { self.partial_physical_plan = Some(node.clone()); let placeholder = - PhysicalPlan::PlaceholderScan(PlaceholderScan::new(node.clustering_spec().clone())); + PhysicalPlan::PlaceholderScan(PlaceholderScan::new(node.clustering_spec())); return Ok(Transformed::new( placeholder.arced(), true, @@ -274,7 +275,7 @@ impl TreeNodeRewriter for PhysicalStageTranslator { // Otherwise, emit the child and add a placeholder as the child of the shuffle exchange let child = shuffle_exchange.input.clone(); let placeholder = - PhysicalPlan::PlaceholderScan(PlaceholderScan::new(child.clustering_spec().clone())); + PhysicalPlan::PlaceholderScan(PlaceholderScan::new(child.clustering_spec())); self.partial_physical_plan = Some(child); let node_with_placeholder_child = node.with_new_children(&[placeholder.arced()]); Ok(Transformed::new( @@ -302,7 +303,7 @@ impl TreeNodeRewriter for ReplacePhysicalPlaceholderWithMaterializedResults { let mat_results = self.mat_results.take().unwrap(); let new_source_node = PhysicalPlan::InMemoryScan(InMemoryScan::new( mat_results.in_memory_info.source_schema.clone(), - mat_results.in_memory_info.clone(), + mat_results.in_memory_info, ph_scan.clustering_spec().clone(), )); Ok(Transformed::new( @@ -363,11 +364,11 @@ impl AdaptivePlanner { assert!(physical_stage_translator.partial_physical_plan.is_some()); self.remaining_physical_plan = Some(result.data); let physical_plan = physical_stage_translator.partial_physical_plan.unwrap(); - return Ok(physical_plan); + Ok(physical_plan) } else { assert!(physical_stage_translator.partial_physical_plan.is_none()); let physical_plan = result.data; - return Ok(physical_plan); + Ok(physical_plan) } } @@ -422,9 +423,9 @@ impl AdaptivePlanner { "Emitting partial plan:\n {}", transformed_physical_plan.repr_ascii(true) ); - return Ok(QueryStageOutput::Partial { + Ok(QueryStageOutput::Partial { physical_plan: transformed_physical_plan, - }); + }) } else { log::info!("Logical plan translation complete"); if self.remaining_physical_plan.is_some() { @@ -433,18 +434,18 @@ impl AdaptivePlanner { transformed_physical_plan.repr_ascii(true) ); self.status = AdaptivePlannerStatus::WaitingForStats; - return Ok(QueryStageOutput::Partial { + Ok(QueryStageOutput::Partial { physical_plan: transformed_physical_plan, - }); + }) } else { log::info!( "Emitting final plan:\n {}", transformed_physical_plan.repr_ascii(true) ); self.status = AdaptivePlannerStatus::Done; - return Ok(QueryStageOutput::Final { + Ok(QueryStageOutput::Final { physical_plan: transformed_physical_plan, - }); + }) } } }