diff --git a/integration_tests/cases/common/dml/issue-341.result b/integration_tests/cases/common/dml/issue-341.result
new file mode 100644
index 0000000000..1882431286
--- /dev/null
+++ b/integration_tests/cases/common/dml/issue-341.result
@@ -0,0 +1,138 @@
+DROP TABLE IF EXISTS `issue341_t1`;
+
+affected_rows: 0
+
+DROP TABLE IF EXISTS `issue341_t2`;
+
+affected_rows: 0
+
+CREATE TABLE `issue341_t1` (
+    `timestamp` timestamp NOT NULL,
+    `value` int,
+    `tag1` string tag,
+    timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+    enable_ttl='false',
+    update_mode='append'
+);
+
+affected_rows: 0
+
+INSERT INTO `issue341_t1` (`timestamp`, `value`, `tag1`)
+    VALUES (1, 1, "t1"), (2, 2, "t2"), (3, 3, "t3");
+
+affected_rows: 3
+
+SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t1`;
+
+timestamp,value,
+Timestamp(1),Int32(1),
+Timestamp(3),Int32(3),
+Timestamp(2),Int32(2),
+
+
+SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t1`
+WHERE
+    `value` = 3;
+
+timestamp,value,
+Timestamp(3),Int32(3),
+
+
+EXPLAIN SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t1`
+WHERE
+    `value` = 3;
+
+plan_type,plan,
+String("logical_plan"),String("Projection: issue341_t1.timestamp, issue341_t1.value\n  TableScan: issue341_t1 projection=[timestamp, value], full_filters=[issue341_t1.value = Int32(3)]"),
+String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n  ScanTable: table=issue341_t1, parallelism=8\n"),
+
+
+EXPLAIN SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t1`
+WHERE
+    tag1 = "t3";
+
+plan_type,plan,
+String("logical_plan"),String("Projection: issue341_t1.timestamp, issue341_t1.value\n  TableScan: issue341_t1 projection=[timestamp, value, tag1], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
+String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n  ScanTable: table=issue341_t1, parallelism=8\n"),
+
+
+CREATE TABLE `issue341_t2` (
+    `timestamp` timestamp NOT NULL,
+    `value` double,
+    `tag1` string tag,
+    timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+    enable_ttl='false',
+    update_mode='overwrite'
+);
+
+affected_rows: 0
+
+INSERT INTO `issue341_t2` (`timestamp`, `value`, `tag1`)
+    VALUES (1, 1, "t1"), (2, 2, "t2"), (3, 3, "t3");
+
+affected_rows: 3
+
+SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t2`
+WHERE
+    `value` = 3;
+
+timestamp,value,
+Timestamp(3),Double(3.0),
+
+
+EXPLAIN SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t2`
+WHERE
+    `value` = 3;
+
+plan_type,plan,
+String("logical_plan"),String("Projection: issue341_t2.timestamp, issue341_t2.value\n  Filter: issue341_t2.value = Float64(3)\n    TableScan: issue341_t2 projection=[timestamp, value], partial_filters=[issue341_t2.value = Float64(3)]"),
+String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n  CoalesceBatchesExec: target_batch_size=8192\n    FilterExec: value@1 = 3\n      ScanTable: table=issue341_t2, parallelism=8\n"),
+
+
+EXPLAIN SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t2`
+WHERE
+    tag1 = "t3";
+
+plan_type,plan,
+String("logical_plan"),String("Projection: issue341_t2.timestamp, issue341_t2.value\n  TableScan: issue341_t2 projection=[timestamp, value, tag1], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
+String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n  ScanTable: table=issue341_t2, parallelism=8\n"),
+
+
+DROP TABLE IF EXISTS `issue341_t1`;
+
+affected_rows: 0
+
+DROP TABLE IF EXISTS `issue341_t2`;
+
+affected_rows: 0
+
diff --git a/integration_tests/cases/common/dml/issue-341.sql b/integration_tests/cases/common/dml/issue-341.sql
new file mode 100644
index 0000000000..fa8ee8fa82
--- /dev/null
+++ b/integration_tests/cases/common/dml/issue-341.sql
@@ -0,0 +1,92 @@
+
+DROP TABLE IF EXISTS `issue341_t1`;
+DROP TABLE IF EXISTS `issue341_t2`;
+
+CREATE TABLE `issue341_t1` (
+    `timestamp` timestamp NOT NULL,
+    `value` int,
+    `tag1` string tag,
+    timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+    enable_ttl='false',
+    update_mode='append'
+);
+
+INSERT INTO `issue341_t1` (`timestamp`, `value`, `tag1`)
+    VALUES (1, 1, "t1"), (2, 2, "t2"), (3, 3, "t3");
+
+SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t1`;
+
+SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t1`
+WHERE
+    `value` = 3;
+
+-- The FilterExec node should not be in the plan.
+EXPLAIN SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t1`
+WHERE
+    `value` = 3;
+
+-- The FilterExec node should not be in the plan.
+EXPLAIN SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t1`
+WHERE
+    tag1 = "t3";
+
+-- Repeat the operations above, but with an overwrite table.
+
+CREATE TABLE `issue341_t2` (
+    `timestamp` timestamp NOT NULL,
+    `value` double,
+    `tag1` string tag,
+    timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+    enable_ttl='false',
+    update_mode='overwrite'
+);
+
+INSERT INTO `issue341_t2` (`timestamp`, `value`, `tag1`)
+    VALUES (1, 1, "t1"), (2, 2, "t2"), (3, 3, "t3");
+
+SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t2`
+WHERE
+    `value` = 3;
+
+-- The FilterExec node should be in the plan.
+EXPLAIN SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t2`
+WHERE
+    `value` = 3;
+
+-- When filtering on a tag, the FilterExec node should not be in the plan.
+EXPLAIN SELECT
+    `timestamp`,
+    `value`
+FROM
+    `issue341_t2`
+WHERE
+    tag1 = "t3";
+
+DROP TABLE IF EXISTS `issue341_t1`;
+DROP TABLE IF EXISTS `issue341_t2`;
diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs
index 8142c0e3f6..e0f3a586dd 100644
--- a/table_engine/src/provider.rs
+++ b/table_engine/src/provider.rs
@@ -23,7 +23,9 @@ use std::{
 
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
-use common_types::{projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema};
+use common_types::{
+    projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema, UPDATE_MODE,
+};
 use datafusion::{
     config::{ConfigEntry, ConfigExtension, ExtensionOptions},
     datasource::TableProvider,
@@ -184,12 +186,12 @@ impl TableProviderAdapter {
     }
 
     fn check_and_build_predicate_from_filters(&self, filters: &[Expr]) -> PredicateRef {
-        let unique_keys = self.read_schema.unique_keys();
-
-        let push_down_filters = filters
+        let pushdown_states = self.pushdown_inner(&filters.iter().collect::<Vec<_>>());
+        let pushdown_filters = filters
             .iter()
-            .filter_map(|filter| {
-                if Self::only_filter_unique_key_columns(filter, &unique_keys) {
+            .zip(pushdown_states.iter())
+            .filter_map(|(filter, state)| {
+                if matches!(state, &TableProviderFilterPushDown::Exact) {
                     Some(filter.clone())
                 } else {
                     None
@@ -198,8 +200,8 @@ impl TableProviderAdapter {
             .collect::<Vec<_>>();
 
         PredicateBuilder::default()
-            .add_pushdown_exprs(&push_down_filters)
-            .extract_time_range(&self.read_schema, &push_down_filters)
+            .add_pushdown_exprs(&pushdown_filters)
+            .extract_time_range(&self.read_schema, filters)
             .build()
     }
 
@@ -214,6 +216,29 @@ impl TableProviderAdapter {
         }
         true
     }
+
+    fn pushdown_inner(&self, filters: &[&Expr]) -> Vec<TableProviderFilterPushDown> {
+        let unique_keys = self.read_schema.unique_keys();
+        // TODO: add a pushdown check to the table trait
+        let options = &self.table.options();
+        let is_append = matches!(options.get(UPDATE_MODE), Some(mode) if mode == "APPEND");
+        let is_system_engine = self.table.engine_type() == "system";
+
+        filters
+            .iter()
+            .map(|filter| {
+                if is_system_engine {
+                    return TableProviderFilterPushDown::Inexact;
+                }
+
+                if is_append || Self::only_filter_unique_key_columns(filter, &unique_keys) {
+                    TableProviderFilterPushDown::Exact
+                } else {
+                    TableProviderFilterPushDown::Inexact
+                }
+            })
+            .collect()
+    }
 }
 
 #[async_trait]
@@ -237,8 +262,11 @@ impl TableProvider for TableProviderAdapter {
         self.scan_table(state, projection, filters, limit).await
     }
 
-    fn supports_filter_pushdown(&self, _filter: &Expr) -> Result<TableProviderFilterPushDown> {
-        Ok(TableProviderFilterPushDown::Inexact)
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> Result<Vec<TableProviderFilterPushDown>> {
+        Ok(self.pushdown_inner(filters))
     }
 
     /// Get the type of this table for metadata/catalog purposes.
@@ -264,8 +292,11 @@ impl TableSource for TableProviderAdapter {
 
     /// Tests whether the table provider can make use of a filter expression
     /// to optimize data retrieval.
-    fn supports_filter_pushdown(&self, _filter: &Expr) -> Result<TableProviderFilterPushDown> {
-        Ok(TableProviderFilterPushDown::Inexact)
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> Result<Vec<TableProviderFilterPushDown>> {
+        Ok(self.pushdown_inner(filters))
     }
 }
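Note on the behavior exercised by issue-341.sql: `pushdown_inner` returns one pushdown state per filter, and the decision reduces to three cases: system-engine tables are always `Inexact`, append-mode tables are always `Exact`, and overwrite tables are `Exact` only when the filter references unique key columns alone. The standalone sketch below restates that rule outside the adapter so the expected plans in issue-341.result are easy to cross-check. It is illustrative only: the `FilterPushDown` enum and the boolean parameters are stand-ins for `TableProviderFilterPushDown`, the `UPDATE_MODE` table option, and `only_filter_unique_key_columns`, not the real types or helpers.

// Illustrative restatement of the pushdown rule from `pushdown_inner`.
#[derive(Debug, PartialEq)]
enum FilterPushDown {
    Exact,
    Inexact,
}

fn decide_pushdown(
    is_system_engine: bool,
    is_append: bool,
    filter_only_touches_unique_keys: bool,
) -> FilterPushDown {
    // System tables never accept exact pushdown.
    if is_system_engine {
        return FilterPushDown::Inexact;
    }
    // Append-mode tables have no overwrite semantics to respect, so any filter
    // can be evaluated exactly by the scan; otherwise only filters restricted
    // to unique key columns are safe to push down exactly.
    if is_append || filter_only_touches_unique_keys {
        FilterPushDown::Exact
    } else {
        FilterPushDown::Inexact
    }
}

fn main() {
    // issue341_t1 (append): the `value` filter is pushed down exactly -> no FilterExec in the plan.
    assert_eq!(decide_pushdown(false, true, false), FilterPushDown::Exact);
    // issue341_t2 (overwrite): the `value` filter stays inexact -> FilterExec remains in the plan.
    assert_eq!(decide_pushdown(false, false, false), FilterPushDown::Inexact);
    // issue341_t2 (overwrite): the `tag1` filter only touches unique key columns -> exact pushdown.
    assert_eq!(decide_pushdown(false, false, true), FilterPushDown::Exact);
    println!("pushdown decisions match the expected plans");
}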