Skip to content

Commit

Permalink
feat(optimizer): optimizing vanilla min/max on index/pk (#8337)
Browse files Browse the repository at this point in the history
Signed-off-by: Clearlove <yifei.c.wei@gmail.com>
Co-authored-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
y-wei and BugenZhao authored Mar 6, 2023
1 parent 6e26cba commit f5e41a1
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 5 deletions.
18 changes: 17 additions & 1 deletion e2e_test/batch/aggregate/min_max.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t(v1 smallint, v2 bigint, v3 real, v4 varchar)
create table t(v1 smallint primary key, v2 bigint, v3 real, v4 varchar)

statement ok
insert into t values (3, 4, 1.5, 'bar'), (2, 5, 2.5, 'ba')
Expand All @@ -17,5 +17,21 @@ select max(v1), max(v2), max(v3), max(v4) from t
----
3 5 2.5 bar

query I
select min(v1) from t
----
2

statement ok
create index idx on t(v2 desc)

statement ok
insert into t values (1, null, 3.5, 'null v2')

query I
select max(v2) from t
----
5

statement ok
drop table t
43 changes: 43 additions & 0 deletions src/frontend/planner_test/tests/testdata/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,49 @@
└─LogicalAgg { group_key: [t.v2], aggs: [min(t.v1)] }
└─LogicalProject { exprs: [t.v2, t.v1] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
- name: min/max on index
sql: |
create table t (v1 varchar, v2 int);
create index idx on t(v2 desc);
select max(v2) from t;
logical_plan: |
LogicalProject { exprs: [max(t.v2)] }
└─LogicalAgg { aggs: [max(t.v2)] }
└─LogicalProject { exprs: [t.v2] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
optimized_logical_plan_for_batch: |
LogicalAgg { aggs: [max(idx.v2)] }
└─LogicalLimit { limit: 1, offset: 0 }
└─LogicalFilter { predicate: IsNotNull(idx.v2) }
└─LogicalScan { table: idx, columns: [idx.v2] }
- name: min/max on index with group by, shall NOT optimize
sql: |
create table t (v1 int, v2 int);
create index idx on t(v2 desc);
select max(v2) from t group by v1;
logical_plan: |
LogicalProject { exprs: [max(t.v2)] }
└─LogicalAgg { group_key: [t.v1], aggs: [max(t.v2)] }
└─LogicalProject { exprs: [t.v1, t.v2] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
optimized_logical_plan_for_batch: |
LogicalProject { exprs: [max(t.v2)] }
└─LogicalAgg { group_key: [t.v1], aggs: [max(t.v2)] }
└─LogicalScan { table: t, columns: [t.v1, t.v2] }
- name: min/max on primary key
sql: |
create table t (v1 int primary key);
select min(v1) from t;
logical_plan: |
LogicalProject { exprs: [min(t.v1)] }
└─LogicalAgg { aggs: [min(t.v1)] }
└─LogicalProject { exprs: [t.v1] }
└─LogicalScan { table: t, columns: [t.v1] }
optimized_logical_plan_for_batch: |
LogicalAgg { aggs: [min(t.v1)] }
└─LogicalLimit { limit: 1, offset: 0 }
└─LogicalFilter { predicate: IsNotNull(t.v1) }
└─LogicalScan { table: t, columns: [t.v1] }
- name: stddev_samp
sql: |
create table t (v1 int);
Expand Down
9 changes: 5 additions & 4 deletions src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,10 @@ lazy_static! {
ApplyOrder::TopDown,
);

static ref AGG_ON_INDEX: OptimizationStage = OptimizationStage::new(
"Agg on Index",
vec![TopNOnIndexRule::create()],
static ref TOP_N_AGG_ON_INDEX: OptimizationStage = OptimizationStage::new(
"TopN/SimpleAgg on Index",
vec![TopNOnIndexRule::create(),
MinMaxOnIndexRule::create()],
ApplyOrder::TopDown,
);
}
Expand Down Expand Up @@ -470,7 +471,7 @@ impl LogicalOptimizer {

plan = plan.optimize_by_rules(&DEDUP_GROUP_KEYS);

plan = plan.optimize_by_rules(&AGG_ON_INDEX);
plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX);

#[cfg(debug_assertions)]
InputRefValidator.validate(plan.clone());
Expand Down
238 changes: 238 additions & 0 deletions src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

use std::collections::BTreeMap;

use itertools::Itertools;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::expr::AggKind;

use super::{BoxedRule, Rule};
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
use crate::optimizer::plan_node::{
LogicalAgg, LogicalFilter, LogicalLimit, LogicalScan, PlanAggCall, PlanTreeNodeUnary,
};
use crate::optimizer::property::{Direction, FieldOrder, Order};
use crate::optimizer::PlanRef;
use crate::utils::Condition;

/// Optimizer rule that rewrites a single, ungrouped `min`/`max` aggregation sitting directly on
/// an unfiltered table scan into `Agg <- Limit 1 <- Filter(IS NOT NULL) <- Scan`, where the scan
/// is either an index scan or the original scan whose key order matches the aggregate.
pub struct MinMaxOnIndexRule {}

impl Rule for MinMaxOnIndexRule {
    /// Tries to rewrite `plan` when it is a simple `min`/`max` aggregation over a table scan.
    ///
    /// The rewrite is only attempted when all of the following hold:
    /// - the aggregation has no group key and exactly one aggregate call;
    /// - that call is `min` or `max` without `DISTINCT`, `FILTER` or `ORDER BY`;
    /// - the aggregation's input is a `LogicalScan` whose predicate is always true.
    ///
    /// An ordered index is tried first, then the primary key. Returns `None` when the plan does
    /// not match or neither ordering is usable.
    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
        let logical_agg: &LogicalAgg = plan.as_logical_agg()?;

        // Grouped aggregations cannot be answered by reading a single row.
        if !logical_agg.group_key().is_empty() {
            return None;
        }

        // `exactly_one` rejects both an empty call list and more than one call.
        let call = logical_agg.agg_calls().iter().exactly_one().ok()?;

        let is_plain_min_max = matches!(call.agg_kind, AggKind::Min | AggKind::Max)
            && !call.distinct
            && call.filter.always_true()
            && call.order_by_fields.is_empty();
        if !is_plain_min_max {
            return None;
        }

        let logical_scan: LogicalScan = logical_agg.input().as_logical_scan()?.to_owned();
        if !logical_scan.predicate().always_true() {
            return None;
        }

        // Maps a table column index to its position in the scan's output.
        let output_col_map: BTreeMap<_, _> = logical_scan
            .output_col_idx()
            .iter()
            .cloned()
            .enumerate()
            .map(|(position, column)| (column, position))
            .collect();

        // `min` needs the smallest value first, `max` the largest.
        let direct = if call.agg_kind == AggKind::Min {
            Direction::Asc
        } else {
            Direction::Desc
        };
        let order = Order {
            field_order: vec![FieldOrder {
                index: call.inputs.first()?.index(),
                direct,
            }],
        };

        self.try_on_index(logical_agg, logical_scan.clone(), &order, &output_col_map)
            .or_else(|| self.try_on_pk(logical_agg, logical_scan, &order, &output_col_map))
    }
}

impl MinMaxOnIndexRule {
    /// Creates a boxed instance of this rule.
    pub fn create() -> BoxedRule {
        Box::new(MinMaxOnIndexRule {})
    }

    /// Attempts the rewrite on top of one of the scanned table's indexes.
    ///
    /// Picks the first index whose key order, translated into positions of the scan's output
    /// columns, satisfies `order`. Returns `None` when no such index exists or when the chosen
    /// index does not contain every column the scan requires.
    fn try_on_index(
        &self,
        logical_agg: &LogicalAgg,
        logical_scan: LogicalScan,
        order: &Order,
        output_col_map: &BTreeMap<usize, usize>,
    ) -> Option<PlanRef> {
        // Index key columns that the scan does not output are given this out-of-range position,
        // so they can never match the column requested by `order`.
        let unmatched_idx = output_col_map.len();
        let index = logical_scan.indexes().iter().find(|idx| {
            let s2p_mapping = idx.secondary_to_primary_mapping();
            Order {
                field_order: idx
                    .index_table
                    .pk()
                    .iter()
                    .map(|idx_item| {
                        let primary_col = s2p_mapping
                            .get(&idx_item.index)
                            .expect("should be in s2p mapping");
                        FieldOrder {
                            index: *output_col_map.get(primary_col).unwrap_or(&unmatched_idx),
                            direct: idx_item.direct,
                        }
                    })
                    .collect(),
            }
            .satisfies(order)
        })?;

        let p2s_mapping = index.primary_to_secondary_mapping();

        // The index can only replace the table scan if it carries every required column.
        let covers_required_cols = logical_scan
            .required_col_idx()
            .iter()
            .all(|x| p2s_mapping.contains_key(x));
        if !covers_required_cols {
            return None;
        }

        // NOTE(review): assumes `to_index_scan` keeps the output column positions of the
        // original scan, so the aggregate's input index stays valid — confirm.
        let index_scan = logical_scan.to_index_scan(
            &index.name,
            index.index_table.table_desc().into(),
            p2s_mapping,
        );

        Self::limit_one_non_null_agg(logical_agg, index_scan.into())
    }

    /// Attempts the rewrite directly on the table scan, relying on its primary key order.
    ///
    /// Returns `None` when the primary key order, translated into positions of the scan's output
    /// columns, does not satisfy `order`.
    fn try_on_pk(
        &self,
        logical_agg: &LogicalAgg,
        logical_scan: LogicalScan,
        order: &Order,
        output_col_map: &BTreeMap<usize, usize>,
    ) -> Option<PlanRef> {
        // Key columns missing from the scan output get an out-of-range position (see
        // `try_on_index`).
        let unmatched_idx = output_col_map.len();
        let primary_key = logical_scan.primary_key();
        let primary_key_order = Order {
            field_order: primary_key
                .into_iter()
                .map(|op| FieldOrder {
                    index: *output_col_map.get(&op.column_idx).unwrap_or(&unmatched_idx),
                    direct: if op.order_type == OrderType::Ascending {
                        Direction::Asc
                    } else {
                        Direction::Desc
                    },
                })
                .collect::<Vec<_>>(),
        };
        if !primary_key_order.satisfies(order) {
            return None;
        }

        Self::limit_one_non_null_agg(logical_agg, logical_scan.into())
    }

    /// Builds `Agg(min/max) <- Limit 1 <- Filter(col IS NOT NULL) <- input`.
    ///
    /// `input` must expose the same output columns, at the same positions, as the original
    /// aggregation's input. The filter and the rebuilt aggregate therefore reference the
    /// original call's own input column rather than a hard-coded column 0, which would pick the
    /// wrong column whenever the scan outputs several columns and the aggregated one is not
    /// first.
    ///
    /// Returns `None` if the aggregation has no call or the call has no input.
    fn limit_one_non_null_agg(logical_agg: &LogicalAgg, input: PlanRef) -> Option<PlanRef> {
        let call = logical_agg.agg_calls().first()?;
        let input_idx = call.inputs.first()?.index();
        // The aggregation's single output field supplies the type of the aggregated value.
        let data_type = logical_agg.schema().fields[0].data_type.clone();

        // Drop NULLs first so that the single row kept by the limit is a real min/max candidate.
        let non_null_filter = LogicalFilter::create_with_expr(
            input,
            FunctionCall::new_unchecked(
                ExprType::IsNotNull,
                vec![ExprImpl::InputRef(Box::new(InputRef::new(
                    input_idx,
                    data_type.clone(),
                )))],
                DataType::Boolean,
            )
            .into(),
        );

        let limit = LogicalLimit::create(non_null_filter, 1, 0);

        // Keep an aggregation on top so the output schema matches the original plan and an empty
        // input still produces one row.
        let formatting_agg = LogicalAgg::new(
            vec![PlanAggCall {
                agg_kind: call.agg_kind,
                return_type: data_type.clone(),
                inputs: vec![InputRef::new(input_idx, data_type)],
                order_by_fields: vec![],
                distinct: false,
                filter: Condition {
                    conjunctions: vec![],
                },
            }],
            vec![],
            limit,
        );

        Some(formatting_agg.into())
    }
}
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ mod rewrite_like_expr_rule;
pub use rewrite_like_expr_rule::*;
mod avoid_exchange_share_rule;
pub use avoid_exchange_share_rule::*;
mod min_max_on_index_rule;
pub use min_max_on_index_rule::*;

#[macro_export]
macro_rules! for_all_rules {
Expand Down Expand Up @@ -130,6 +132,7 @@ macro_rules! for_all_rules {
, { UnionInputValuesMergeRule }
, { RewriteLikeExprRule }
, { AvoidExchangeShareRule }
, { MinMaxOnIndexRule }
}
};
}
Expand Down

0 comments on commit f5e41a1

Please sign in to comment.