Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove PartiallyOrdered handling from BoundedWindowAggExec #11

Open
wants to merge 24 commits into
base: apache_main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d022cf3
Remove partially ordered execution from bounded windowExec
mustafasrepo Mar 13, 2024
fe06250
Resolve linter error
mustafasrepo Mar 13, 2024
9d103bc
Remove unnecessary code
mustafasrepo Mar 13, 2024
f0bed47
Aggregate starting code.
mustafasrepo Mar 13, 2024
f4583f2
Add aggregate handling
mustafasrepo Mar 13, 2024
7bfe7ea
Add aggregate tests
mustafasrepo Mar 13, 2024
d9df3a0
Add aggregate literal ignore
mustafasrepo Mar 14, 2024
4d798f3
Add partiallysorted check to the group ordering.
mustafasrepo Mar 14, 2024
e5880dc
Update tests to reflect new behaviour
mustafasrepo Mar 14, 2024
ece6418
Resolve linter errors, Remove unnecessary code.
mustafasrepo Mar 14, 2024
5ec1067
Merge branch 'apache_main' into feature/window_partiall_ordered_remove
mustafasrepo Mar 14, 2024
3523419
tmp
mustafasrepo Mar 14, 2024
f356f1a
Move impls to single rule
mustafasrepo Mar 14, 2024
d1a5d6d
Simplifications
mustafasrepo Mar 14, 2024
b494cab
Add invalidated ordering handling
mustafasrepo Mar 14, 2024
4308adf
Minor changes
mustafasrepo Mar 14, 2024
1e891fb
Resolve linter errors. Remove constants during input order mode analysis
mustafasrepo Mar 14, 2024
8f5ef80
Simplifications
mustafasrepo Mar 14, 2024
0981c2f
Bring back aggregate partial sort support
mustafasrepo Mar 18, 2024
e9e56de
Fix tests
mustafasrepo Mar 18, 2024
6492611
Merge branch 'apache_main' into feature/window_partiall_ordered_remove
mustafasrepo Mar 18, 2024
d780937
Resolve linter errors
mustafasrepo Mar 18, 2024
859da28
Minor changes
mustafasrepo Mar 18, 2024
e8a4aaa
Minor changes
mustafasrepo Mar 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 166 additions & 2 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,17 @@ use crate::physical_plan::windows::{
};
use crate::physical_plan::{Distribution, ExecutionPlan, InputOrderMode};

use arrow_schema::SortOptions;
use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
use datafusion_common::utils::{get_at_indices, set_difference};
use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr::{
LexOrdering, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::partial_sort::PartialSortExec;
use datafusion_physical_plan::windows::get_window_mode;
use datafusion_physical_plan::ExecutionPlanProperties;

use itertools::izip;
Expand Down Expand Up @@ -184,9 +190,16 @@ impl PhysicalOptimizerRule for EnforceSorting {
})
.data()?;

let plan = updated_plan
.plan
.transform_up(&|plan| {
Ok(Transformed::yes(replace_partial_mode_with_full_mode(plan)?))
})
.data()?;

// Execute a top-down traversal to exploit sort push-down opportunities
// missed by the bottom-up traversal:
let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
let mut sort_pushdown = SortPushDown::new_default(plan);
assign_initial_requirements(&mut sort_pushdown);
let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?.data;

Expand Down Expand Up @@ -241,6 +254,157 @@ fn replace_with_partial_sort(
Ok(plan)
}

/// Returns a plan whose output satisfies `sort_exprs`.
///
/// Sort expressions that the plan's equivalence properties report as constant
/// are dropped from `sort_exprs` first. If the remaining ordering is already
/// satisfied by `plan`, the plan is returned untouched. Otherwise any
/// `SortExec` nodes sitting at the top of `plan` are peeled off and a single
/// new `SortExec` is placed above what remains. The new sort preserves
/// partitioning whenever its input has more than one output partition.
fn add_sort_on_top(
    mut plan: Arc<dyn ExecutionPlan>,
    mut sort_exprs: LexOrdering,
) -> Arc<dyn ExecutionPlan> {
    let eq_properties = plan.equivalence_properties();
    // Constant expressions are pruned from the requested ordering.
    sort_exprs.retain(|sort_expr| !eq_properties.is_expr_constant(&sort_expr.expr));
    let already_satisfied = eq_properties.ordering_satisfy(&sort_exprs);
    if already_satisfied {
        // Nothing to enforce: hand the plan back unchanged.
        return plan;
    }
    // The sort added below replaces any sorts currently at the top of the plan.
    while let Some(existing_sort) = plan.as_any().downcast_ref::<SortExec>() {
        let below = Arc::clone(existing_sort.input());
        plan = below;
    }
    let has_multiple_partitions = plan.output_partitioning().partition_count() > 1;
    let new_sort = SortExec::new(sort_exprs, plan)
        .with_preserve_partitioning(has_multiple_partitions);
    Arc::new(new_sort)
}

/// Rewrites a single plan node so that it no longer depends on partially
/// ordered input.
///
/// * For a `BoundedWindowAggExec`, the decision is delegated to
///   `replace_mode_of_window`; when that returns `None` the original plan is
///   kept as is.
/// * For any other operator, every child is checked against the operator's
///   required input ordering (a missing requirement is treated as the default,
///   empty requirement). Children that do not satisfy it get a sort added on
///   top, and the operator is rebuilt only if at least one child changed.
fn replace_partial_mode_with_full_mode(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    if let Some(window) = plan.as_any().downcast_ref::<BoundedWindowAggExec>() {
        let replacement = replace_mode_of_window(window)?;
        return Ok(replacement.unwrap_or(plan));
    }

    let required_orderings = plan.required_input_ordering();
    let children = plan.children();
    let mut any_child_changed = false;
    let mut new_children = Vec::with_capacity(children.len());
    for (child, requirement) in children.into_iter().zip(required_orderings) {
        let requirement = requirement.unwrap_or_default();
        let satisfied = child
            .equivalence_properties()
            .ordering_satisfy_requirement(&requirement);
        if satisfied {
            new_children.push(child);
        } else {
            any_child_changed = true;
            let sort_exprs = PhysicalSortRequirement::to_sort_exprs(requirement);
            new_children.push(add_sort_on_top(child, sort_exprs));
        }
    }

    if !any_child_changed {
        return Ok(plan);
    }
    // At least one child was replaced, so the operator must be rebuilt.
    plan.with_new_children(new_children)
}

/// Decides whether `window` has to be rebuilt and, if so, returns the new plan.
///
/// The partition-by and order-by expressions of the first window expression
/// are passed to `get_window_mode` together with the window's input:
///
/// * If a mode is found without reversal, or with a reversal that every window
///   expression supports, and that mode is `Sorted` or `Linear`, the window is
///   left alone and `Ok(None)` is returned.
/// * If the mode found is `PartiallySorted`, the window is rebuilt through
///   `contruct_window` (using the reversed expressions when a reversal was
///   applied).
/// * If no mode is found, or a reversal is required but some expression cannot
///   be reversed, the window is rebuilt from its original expressions.
fn replace_mode_of_window(
    window: &BoundedWindowAggExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let original_exprs = window.window_expr();
    let first_expr = &original_exprs[0];

    // `Some((exprs, mode))` when the existing input ordering can be used,
    // `None` when it cannot.
    let usable = match get_window_mode(
        first_expr.partition_by(),
        first_expr.order_by(),
        window.input(),
    ) {
        Some((true, mode)) => original_exprs
            .iter()
            .map(|expr| expr.get_reverse_expr())
            .collect::<Option<Vec<_>>>()
            .map(|reversed| (reversed, mode)),
        Some((false, mode)) => Some((original_exprs.to_vec(), mode)),
        None => None,
    };

    let window_exprs = match usable {
        Some((_, InputOrderMode::Sorted)) | Some((_, InputOrderMode::Linear)) => {
            return Ok(None);
        }
        Some((exprs, InputOrderMode::PartiallySorted(_))) => exprs,
        None => original_exprs.to_vec(),
    };

    let child = window.input().clone();
    let partition_keys = window.partition_keys.to_vec();
    contruct_window(window_exprs, child, partition_keys).map(Some)
}

/// Builds a `BoundedWindowAggExec` over `child`, adding a sort on top of the
/// child where needed, in either `Sorted` or `Linear` mode.
///
/// The partition-by and order-by expressions are read from the first window
/// expression. The ordering to enforce on the child is assembled as follows:
///
/// * When `find_longest_permutation` reports at least one partition-by index,
///   or there are no partition-by expressions at all: start from the ordering
///   it returned, append the remaining partition-by expressions with default
///   sort options, and use `Sorted` mode.
/// * Otherwise, when the child already satisfies the leading order-by
///   expression: use `Linear` mode.
/// * Otherwise: order by all partition-by expressions with default sort
///   options and use `Sorted` mode.
///
/// In every case the order-by expressions are appended last.
///
/// # Errors
///
/// Propagates errors from `get_at_indices` and `BoundedWindowAggExec::try_new`.
///
/// # Panics
///
/// Panics if `window_exprs` is empty, or if the final fallback branch is
/// reached with a non-empty ordering.
fn contruct_window(
    window_exprs: Vec<Arc<dyn WindowExpr>>,
    child: Arc<dyn ExecutionPlan>,
    partition_keys: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let leading_expr = &window_exprs[0];
    let partition_by_exprs = leading_expr.partition_by();
    let order_by_exprs = leading_expr.order_by();

    let (mut ordering, ordered_pb_indices) = child
        .equivalence_properties()
        .find_longest_permutation(partition_by_exprs);

    let mode = if !ordered_pb_indices.is_empty() || partition_by_exprs.is_empty() {
        let every_index: Vec<usize> = (0..partition_by_exprs.len()).collect();
        let unordered_indices = set_difference(every_index, &ordered_pb_indices);
        let unordered_partition_bys =
            get_at_indices(partition_by_exprs, unordered_indices)?;
        ordering.extend(with_default_ordering(unordered_partition_bys));
        InputOrderMode::Sorted
    } else if !order_by_exprs.is_empty()
        && child
            .equivalence_properties()
            .ordering_satisfy(&order_by_exprs[0..1])
    {
        InputOrderMode::Linear
    } else {
        assert!(ordering.is_empty());
        ordering.extend(with_default_ordering(partition_by_exprs.to_vec()));
        InputOrderMode::Sorted
    };
    // Every branch finishes the ordering with the order-by expressions.
    ordering.extend_from_slice(order_by_exprs);

    let sorted_child = add_sort_on_top(child, ordering);
    let rebuilt =
        BoundedWindowAggExec::try_new(window_exprs, sorted_child, partition_keys, mode)?;
    Ok(Arc::new(rebuilt))
}

/// Turns each expression into a `PhysicalSortExpr` that uses
/// `SortOptions::default()`, keeping the input order of `exprs`.
fn with_default_ordering(exprs: Vec<Arc<dyn PhysicalExpr>>) -> Vec<PhysicalSortExpr> {
    let mut sort_exprs = Vec::with_capacity(exprs.len());
    for expr in exprs {
        let options = SortOptions::default();
        sort_exprs.push(PhysicalSortExpr { expr, options });
    }
    sort_exprs
}

/// This function turns plans of the form
/// ```text
/// "SortExec: expr=\[a@0 ASC\]",
Expand Down
13 changes: 1 addition & 12 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::windows::{
create_window_expr, BoundedWindowAggExec, WindowAggExec,
};
use datafusion::physical_plan::InputOrderMode::{Linear, PartiallySorted, Sorted};
use datafusion::physical_plan::InputOrderMode::{Linear, Sorted};
use datafusion::physical_plan::{collect, InputOrderMode};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{Result, ScalarValue};
Expand Down Expand Up @@ -97,17 +97,6 @@ async fn window_bounded_window_random_comparison() -> Result<()> {
(vec!["c", "b"], vec!["a", "b"], Linear),
(vec!["c", "b"], vec!["a", "c"], Linear),
(vec!["c", "b"], vec!["a", "b", "c"], Linear),
(vec!["c", "a"], vec!["a"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["b"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["c"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["a", "b"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["b", "c"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["a", "c"], PartiallySorted(vec![1])),
(
vec!["c", "a"],
vec!["a", "b", "c"],
PartiallySorted(vec![1]),
),
(vec!["c", "b", "a"], vec!["a"], Sorted),
(vec!["c", "b", "a"], vec!["b"], Sorted),
(vec!["c", "b", "a"], vec!["c"], Sorted),
Expand Down
41 changes: 8 additions & 33 deletions datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,10 @@ impl BoundedWindowAggExec {
input_schema,
})
}
InputOrderMode::Linear | InputOrderMode::PartiallySorted(_) => Box::new(
LinearSearch::new(ordered_partition_by_indices, input_schema),
),
InputOrderMode::PartiallySorted(_) => {
return exec_err!("BoundedWindowAggExec cannot work in InputOrderMode::PartiallySorted mode.");
}
InputOrderMode::Linear => Box::new(LinearSearch::new(input_schema)),
})
}

Expand Down Expand Up @@ -435,11 +436,6 @@ pub struct LinearSearch {
input_buffer_hashes: VecDeque<u64>,
/// Used during hash value calculation.
random_state: RandomState,
/// Input ordering and partition by key ordering need not be the same, so
/// this vector stores the mapping between them. For instance, if the input
/// is ordered by a, b and the window expression contains a PARTITION BY b, a
/// clause, this attribute stores [1, 0].
ordered_partition_by_indices: Vec<usize>,
/// We use this [`RawTable`] to calculate unique partitions for each new
/// RecordBatch. First entry in the tuple is the hash value, the second
/// entry is the unique ID for each partition (increments from 0 to n).
Expand Down Expand Up @@ -566,32 +562,12 @@ impl PartitionSearcher for LinearSearch {
self.input_buffer_hashes.drain(0..n_out);
}

fn mark_partition_end(&self, partition_buffers: &mut PartitionBatches) {
// We should be in the `PartiallySorted` case, otherwise we can not
// tell when we are at the end of a given partition.
if !self.ordered_partition_by_indices.is_empty() {
if let Some((last_row, _)) = partition_buffers.last() {
let last_sorted_cols = self
.ordered_partition_by_indices
.iter()
.map(|idx| last_row[*idx].clone())
.collect::<Vec<_>>();
for (row, partition_batch_state) in partition_buffers.iter_mut() {
let sorted_cols = self
.ordered_partition_by_indices
.iter()
.map(|idx| &row[*idx]);
// All the partitions other than `last_sorted_cols` are done.
// We are sure that we will no longer receive values for these
// partitions (arrival of a new value would violate ordering).
partition_batch_state.is_end = !sorted_cols.eq(&last_sorted_cols);
}
}
}
fn mark_partition_end(&self, _partition_buffers: &mut PartitionBatches) {
    // Intentionally a no-op: in Linear mode, we cannot mark a partition as
    // ended, so `_partition_buffers` is left untouched.
}

fn is_mode_linear(&self) -> bool {
self.ordered_partition_by_indices.is_empty()
true

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary?

}

fn input_schema(&self) -> &SchemaRef {
Expand All @@ -601,11 +577,10 @@ impl PartitionSearcher for LinearSearch {

impl LinearSearch {
/// Initialize a new [`LinearSearch`] partition searcher.
fn new(ordered_partition_by_indices: Vec<usize>, input_schema: SchemaRef) -> Self {
fn new(input_schema: SchemaRef) -> Self {
LinearSearch {
input_buffer_hashes: VecDeque::new(),
random_state: Default::default(),
ordered_partition_by_indices,
row_map_batch: RawTable::with_capacity(256),
row_map_out: RawTable::with_capacity(256),
input_schema,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ FROM t1
----
11 11 11

# subsequent inner join
# subsequent inner join
query III rowsort
SELECT t1.t1_id, t2.t2_id, t3.t3_id
FROM t1
Expand Down
Loading
Loading