diff --git a/e2e_test/batch/aggregate/min_max.slt.part b/e2e_test/batch/aggregate/min_max.slt.part index 7687066b9ee76..b144d55ef987b 100644 --- a/e2e_test/batch/aggregate/min_max.slt.part +++ b/e2e_test/batch/aggregate/min_max.slt.part @@ -2,7 +2,7 @@ statement ok SET RW_IMPLICIT_FLUSH TO true; statement ok -create table t(v1 smallint, v2 bigint, v3 real, v4 varchar) +create table t(v1 smallint primary key, v2 bigint, v3 real, v4 varchar) statement ok insert into t values (3, 4, 1.5, 'bar'), (2, 5, 2.5, 'ba') @@ -17,5 +17,21 @@ select max(v1), max(v2), max(v3), max(v4) from t ---- 3 5 2.5 bar +query I +select min(v1) from t +---- +2 + +statement ok +create index idx on t(v2 desc) + +statement ok +insert into t values (1, null, 3.5, 'null v2') + +query I +select max(v2) from t +---- +5 + statement ok drop table t diff --git a/src/frontend/planner_test/tests/testdata/agg.yaml b/src/frontend/planner_test/tests/testdata/agg.yaml index 2a899f5799d59..d833613807caa 100644 --- a/src/frontend/planner_test/tests/testdata/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/agg.yaml @@ -1006,6 +1006,49 @@ └─LogicalAgg { group_key: [t.v2], aggs: [min(t.v1)] } └─LogicalProject { exprs: [t.v2, t.v1] } └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] } +- name: min/max on index + sql: | + create table t (v1 varchar, v2 int); + create index idx on t(v2 desc); + select max(v2) from t; + logical_plan: | + LogicalProject { exprs: [max(t.v2)] } + └─LogicalAgg { aggs: [max(t.v2)] } + └─LogicalProject { exprs: [t.v2] } + └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] } + optimized_logical_plan_for_batch: | + LogicalAgg { aggs: [max(idx.v2)] } + └─LogicalLimit { limit: 1, offset: 0 } + └─LogicalFilter { predicate: IsNotNull(idx.v2) } + └─LogicalScan { table: idx, columns: [idx.v2] } +- name: min/max on index with group by, shall NOT optimize + sql: | + create table t (v1 int, v2 int); + create index idx on t(v2 desc); + select max(v2) from t group by v1; + 
logical_plan: | + LogicalProject { exprs: [max(t.v2)] } + └─LogicalAgg { group_key: [t.v1], aggs: [max(t.v2)] } + └─LogicalProject { exprs: [t.v1, t.v2] } + └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] } + optimized_logical_plan_for_batch: | + LogicalProject { exprs: [max(t.v2)] } + └─LogicalAgg { group_key: [t.v1], aggs: [max(t.v2)] } + └─LogicalScan { table: t, columns: [t.v1, t.v2] } +- name: min/max on primary key + sql: | + create table t (v1 int primary key); + select min(v1) from t; + logical_plan: | + LogicalProject { exprs: [min(t.v1)] } + └─LogicalAgg { aggs: [min(t.v1)] } + └─LogicalProject { exprs: [t.v1] } + └─LogicalScan { table: t, columns: [t.v1] } + optimized_logical_plan_for_batch: | + LogicalAgg { aggs: [min(t.v1)] } + └─LogicalLimit { limit: 1, offset: 0 } + └─LogicalFilter { predicate: IsNotNull(t.v1) } + └─LogicalScan { table: t, columns: [t.v1] } - name: stddev_samp sql: | create table t (v1 int); diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index ba9489c2b68ac..27506513c332c 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -236,9 +236,10 @@ lazy_static! 
{ ApplyOrder::TopDown, ); - static ref AGG_ON_INDEX: OptimizationStage = OptimizationStage::new( - "Agg on Index", - vec![TopNOnIndexRule::create()], + static ref TOP_N_AGG_ON_INDEX: OptimizationStage = OptimizationStage::new( + "TopN/SimpleAgg on Index", + vec![TopNOnIndexRule::create(), + MinMaxOnIndexRule::create()], ApplyOrder::TopDown, ); } @@ -470,7 +471,7 @@ impl LogicalOptimizer { plan = plan.optimize_by_rules(&DEDUP_GROUP_KEYS); - plan = plan.optimize_by_rules(&AGG_ON_INDEX); + plan = plan.optimize_by_rules(&TOP_N_AGG_ON_INDEX); #[cfg(debug_assertions)] InputRefValidator.validate(plan.clone()); diff --git a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs new file mode 100644 index 0000000000000..02819856c0a72 --- /dev/null +++ b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs @@ -0,0 +1,238 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+
+use std::collections::BTreeMap;
+
+use itertools::Itertools;
+use risingwave_common::types::DataType;
+use risingwave_common::util::sort_util::OrderType;
+use risingwave_expr::expr::AggKind;
+
+use super::{BoxedRule, Rule};
+use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
+use crate::optimizer::plan_node::{
+    LogicalAgg, LogicalFilter, LogicalLimit, LogicalScan, PlanAggCall, PlanTreeNodeUnary,
+};
+use crate::optimizer::property::{Direction, FieldOrder, Order};
+use crate::optimizer::PlanRef;
+use crate::utils::Condition;
+
+/// Rewrites a lone `min`/`max` aggregation over a plain table scan into a plan that
+/// reads only the first non-null row of a suitably ordered scan.
+///
+/// When an index or the primary key of the scanned table is ordered by the aggregated
+/// column in the required direction, the plan becomes
+/// `LogicalAgg -> LogicalLimit(1) -> LogicalFilter(IS NOT NULL) -> LogicalScan`.
+pub struct MinMaxOnIndexRule {}
+
+impl Rule for MinMaxOnIndexRule {
+    /// Returns the rewritten plan, or `None` when the rule does not apply.
+    ///
+    /// The rule only fires for an aggregation without group keys that has exactly one
+    /// `min`/`max` call (no `DISTINCT`, `FILTER` or `ORDER BY`) and whose input is a
+    /// scan without predicate.
+    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
+        let logical_agg: &LogicalAgg = plan.as_logical_agg()?;
+        if !logical_agg.group_key().is_empty() {
+            return None;
+        }
+        // `exactly_one` also rejects an empty list of calls.
+        let agg_call = logical_agg.agg_calls().iter().exactly_one().ok()?;
+        if !matches!(agg_call.agg_kind, AggKind::Min | AggKind::Max)
+            || agg_call.distinct
+            || !agg_call.filter.always_true()
+            || !agg_call.order_by_fields.is_empty()
+        {
+            return None;
+        }
+
+        let logical_scan: LogicalScan = logical_agg.input().as_logical_scan()?.to_owned();
+        if !logical_scan.predicate().always_true() {
+            return None;
+        }
+
+        // Maps each entry of `output_col_idx` to its position in the scan output.
+        let output_col_map = logical_scan
+            .output_col_idx()
+            .iter()
+            .cloned()
+            .enumerate()
+            .map(|(id, col)| (col, id))
+            .collect::<BTreeMap<usize, usize>>();
+        // `min` needs the smallest value first, `max` the largest one.
+        let order = Order {
+            field_order: vec![FieldOrder {
+                index: agg_call.inputs.first()?.index(),
+                direct: if agg_call.agg_kind == AggKind::Min {
+                    Direction::Asc
+                } else {
+                    Direction::Desc
+                },
+            }],
+        };
+
+        // Prefer a matching index; fall back to the primary key order of the table.
+        self.try_on_index(logical_agg, logical_scan.clone(), &order, &output_col_map)
+            .or_else(|| self.try_on_pk(logical_agg, logical_scan, &order, &output_col_map))
+    }
+}
+
+impl MinMaxOnIndexRule {
+    /// Creates the rule as a boxed trait object.
+    pub fn create() -> BoxedRule {
+        Box::new(MinMaxOnIndexRule {})
+    }
+
+    /// Tries to answer the aggregation from an index of the scanned table.
+    ///
+    /// Picks the first index whose order satisfies `order` and returns the rewritten
+    /// plan on top of a scan of that index. Returns `None` when no index qualifies or
+    /// when the chosen index does not contain every column required by the scan.
+    fn try_on_index(
+        &self,
+        logical_agg: &LogicalAgg,
+        logical_scan: LogicalScan,
+        order: &Order,
+        output_col_map: &BTreeMap<usize, usize>,
+    ) -> Option<PlanRef> {
+        // Columns missing from the scan output get an out-of-range position.
+        let unmatched_idx = output_col_map.len();
+        let index = logical_scan.indexes().iter().find(|idx| {
+            let s2p_mapping = idx.secondary_to_primary_mapping();
+            Order {
+                field_order: idx
+                    .index_table
+                    .pk()
+                    .iter()
+                    .map(|idx_item| FieldOrder {
+                        index: *output_col_map
+                            .get(
+                                s2p_mapping
+                                    .get(&idx_item.index)
+                                    .expect("should be in s2p mapping"),
+                            )
+                            .unwrap_or(&unmatched_idx),
+                        direct: idx_item.direct,
+                    })
+                    .collect(),
+            }
+            .satisfies(order)
+        })?;
+
+        let p2s_mapping = index.primary_to_secondary_mapping();
+        let covers_required_cols = logical_scan
+            .required_col_idx()
+            .iter()
+            .all(|x| p2s_mapping.contains_key(x));
+        if !covers_required_cols {
+            return None;
+        }
+        let index_scan = logical_scan.to_index_scan(
+            &index.name,
+            index.index_table.table_desc().into(),
+            p2s_mapping,
+        );
+
+        Self::first_non_null_row_agg(logical_agg, index_scan.into())
+    }
+
+    /// Tries to answer the aggregation from the primary key order of the scanned table.
+    ///
+    /// Returns the rewritten plan on top of `logical_scan` when the primary key order
+    /// satisfies `order`, and `None` otherwise.
+    fn try_on_pk(
+        &self,
+        logical_agg: &LogicalAgg,
+        logical_scan: LogicalScan,
+        order: &Order,
+        output_col_map: &BTreeMap<usize, usize>,
+    ) -> Option<PlanRef> {
+        // Columns missing from the scan output get an out-of-range position.
+        let unmatched_idx = output_col_map.len();
+        let primary_key = logical_scan.primary_key();
+        let primary_key_order = Order {
+            field_order: primary_key
+                .into_iter()
+                .map(|op| FieldOrder {
+                    index: *output_col_map.get(&op.column_idx).unwrap_or(&unmatched_idx),
+                    direct: if op.order_type == OrderType::Ascending {
+                        Direction::Asc
+                    } else {
+                        Direction::Desc
+                    },
+                })
+                .collect::<Vec<_>>(),
+        };
+        if !primary_key_order.satisfies(order) {
+            return None;
+        }
+
+        Self::first_non_null_row_agg(logical_agg, logical_scan.into())
+    }
+
+    /// Builds `LogicalAgg -> LogicalLimit(1) -> LogicalFilter(IS NOT NULL) -> input`.
+    ///
+    /// `input` must already be ordered as the aggregation requires, so that the first
+    /// row passing the null filter carries the result. Shared by the index and the
+    /// primary key rewrites, which only differ in the scan they feed in.
+    ///
+    /// Returns `None` if `logical_agg` has no call or the call has no input.
+    fn first_non_null_row_agg(logical_agg: &LogicalAgg, input: PlanRef) -> Option<PlanRef> {
+        let agg_call = logical_agg.agg_calls().first()?;
+        // Reference the column the original call aggregates instead of assuming it is
+        // the first scan output.
+        // NOTE(review): relies on `to_index_scan` keeping the output column order of
+        // the original scan -- confirm.
+        let input_idx = agg_call.inputs.first()?.index();
+        let data_type = logical_agg.schema().fields[0].data_type.clone();
+
+        // Nulls are dropped before the limit: they must never be picked as the result.
+        let non_null_filter = LogicalFilter::create_with_expr(
+            input,
+            FunctionCall::new_unchecked(
+                ExprType::IsNotNull,
+                vec![ExprImpl::InputRef(Box::new(InputRef::new(
+                    input_idx,
+                    data_type.clone(),
+                )))],
+                DataType::Boolean,
+            )
+            .into(),
+        );
+        let limit = LogicalLimit::create(non_null_filter, 1, 0);
+
+        // Re-apply the aggregation on top of the (at most one) remaining row.
+        let formatting_agg = LogicalAgg::new(
+            vec![PlanAggCall {
+                agg_kind: agg_call.agg_kind,
+                return_type: data_type.clone(),
+                inputs: vec![InputRef::new(input_idx, data_type)],
+                order_by_fields: vec![],
+                distinct: false,
+                filter: Condition {
+                    conjunctions: vec![],
+                },
+            }],
+            vec![],
+            limit,
+        );
+
+        Some(formatting_agg.into())
+    }
+}
diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs
index bdafbde1c578a..78c0fc15f93bc 100644
--- a/src/frontend/src/optimizer/rule/mod.rs
+++ b/src/frontend/src/optimizer/rule/mod.rs
@@ -93,6 +93,8 @@ mod rewrite_like_expr_rule;
 pub use rewrite_like_expr_rule::*;
 mod avoid_exchange_share_rule;
 pub use avoid_exchange_share_rule::*;
+mod min_max_on_index_rule;
+pub use min_max_on_index_rule::*;
 
 #[macro_export]
 macro_rules! for_all_rules {
@@ -130,6 +132,7 @@ macro_rules! for_all_rules {
             , { UnionInputValuesMergeRule }
             , { RewriteLikeExprRule }
             , { AvoidExchangeShareRule }
+            , { MinMaxOnIndexRule }
         }
     };
 }