Skip to content

Commit 50b8f77

Browse files
dust1 and jiacai2050 authored
feat: remove filter plan node in pipeline (#1126)
## Rationale
Close #341

## Detailed Changes
I just changed `supports_filter_pushdown` in `TableProvider` to return `TableProviderFilterPushDown::Exact` to fix this.

## Test Plan
Pass

---------

Co-authored-by: jiacai2050 <dev@liujiacai.net>
1 parent b1bbaf7 commit 50b8f77

File tree

3 files changed

+273
-12
lines changed

3 files changed

+273
-12
lines changed
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,138 @@
1+
DROP TABLE IF EXISTS `issue341_t1`;
2+
3+
affected_rows: 0
4+
5+
DROP TABLE IF EXISTS `issue341_t2`;
6+
7+
affected_rows: 0
8+
9+
CREATE TABLE `issue341_t1` (
10+
`timestamp` timestamp NOT NULL,
11+
`value` int,
12+
`tag1` string tag,
13+
timestamp KEY (timestamp)) ENGINE=Analytic
14+
WITH(
15+
enable_ttl='false',
16+
update_mode='append'
17+
);
18+
19+
affected_rows: 0
20+
21+
INSERT INTO `issue341_t1` (`timestamp`, `value`, `tag1`)
22+
VALUES (1, 1, "t1"), (2, 2, "t2"), (3, 3, "t3");
23+
24+
affected_rows: 3
25+
26+
SELECT
27+
`timestamp`,
28+
`value`
29+
FROM
30+
`issue341_t1`;
31+
32+
timestamp,value,
33+
Timestamp(1),Int32(1),
34+
Timestamp(3),Int32(3),
35+
Timestamp(2),Int32(2),
36+
37+
38+
SELECT
39+
`timestamp`,
40+
`value`
41+
FROM
42+
`issue341_t1`
43+
WHERE
44+
`value` = 3;
45+
46+
timestamp,value,
47+
Timestamp(3),Int32(3),
48+
49+
50+
EXPLAIN SELECT
51+
`timestamp`,
52+
`value`
53+
FROM
54+
`issue341_t1`
55+
WHERE
56+
`value` = 3;
57+
58+
plan_type,plan,
59+
String("logical_plan"),String("Projection: issue341_t1.timestamp, issue341_t1.value\n TableScan: issue341_t1 projection=[timestamp, value], full_filters=[issue341_t1.value = Int32(3)]"),
60+
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8\n"),
61+
62+
63+
EXPLAIN SELECT
64+
`timestamp`,
65+
`value`
66+
FROM
67+
`issue341_t1`
68+
WHERE
69+
tag1 = "t3";
70+
71+
plan_type,plan,
72+
String("logical_plan"),String("Projection: issue341_t1.timestamp, issue341_t1.value\n TableScan: issue341_t1 projection=[timestamp, value, tag1], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
73+
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8\n"),
74+
75+
76+
CREATE TABLE `issue341_t2` (
77+
`timestamp` timestamp NOT NULL,
78+
`value` double,
79+
`tag1` string tag,
80+
timestamp KEY (timestamp)) ENGINE=Analytic
81+
WITH(
82+
enable_ttl='false',
83+
update_mode='overwrite'
84+
);
85+
86+
affected_rows: 0
87+
88+
INSERT INTO `issue341_t2` (`timestamp`, `value`, `tag1`)
89+
VALUES (1, 1, "t1"), (2, 2, "t2"), (3, 3, "t3");
90+
91+
affected_rows: 3
92+
93+
SELECT
94+
`timestamp`,
95+
`value`
96+
FROM
97+
`issue341_t2`
98+
WHERE
99+
`value` = 3;
100+
101+
timestamp,value,
102+
Timestamp(3),Double(3.0),
103+
104+
105+
EXPLAIN SELECT
106+
`timestamp`,
107+
`value`
108+
FROM
109+
`issue341_t2`
110+
WHERE
111+
`value` = 3;
112+
113+
plan_type,plan,
114+
String("logical_plan"),String("Projection: issue341_t2.timestamp, issue341_t2.value\n Filter: issue341_t2.value = Float64(3)\n TableScan: issue341_t2 projection=[timestamp, value], partial_filters=[issue341_t2.value = Float64(3)]"),
115+
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n CoalesceBatchesExec: target_batch_size=8192\n FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8\n"),
116+
117+
118+
EXPLAIN SELECT
119+
`timestamp`,
120+
`value`
121+
FROM
122+
`issue341_t2`
123+
WHERE
124+
tag1 = "t3";
125+
126+
plan_type,plan,
127+
String("logical_plan"),String("Projection: issue341_t2.timestamp, issue341_t2.value\n TableScan: issue341_t2 projection=[timestamp, value, tag1], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
128+
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t2, parallelism=8\n"),
129+
130+
131+
DROP TABLE IF EXISTS `issue341_t1`;
132+
133+
affected_rows: 0
134+
135+
DROP TABLE IF EXISTS `issue341_t2`;
136+
137+
affected_rows: 0
138+
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,92 @@
1+
2+
DROP TABLE IF EXISTS `issue341_t1`;
3+
DROP TABLE IF EXISTS `issue341_t2`;
4+
5+
CREATE TABLE `issue341_t1` (
6+
`timestamp` timestamp NOT NULL,
7+
`value` int,
8+
`tag1` string tag,
9+
timestamp KEY (timestamp)) ENGINE=Analytic
10+
WITH(
11+
enable_ttl='false',
12+
update_mode='append'
13+
);
14+
15+
INSERT INTO `issue341_t1` (`timestamp`, `value`, `tag1`)
16+
VALUES (1, 1, "t1"), (2, 2, "t2"), (3, 3, "t3");
17+
18+
SELECT
19+
`timestamp`,
20+
`value`
21+
FROM
22+
`issue341_t1`;
23+
24+
SELECT
25+
`timestamp`,
26+
`value`
27+
FROM
28+
`issue341_t1`
29+
WHERE
30+
`value` = 3;
31+
32+
-- FilterExec node should not be in plan.
33+
EXPLAIN SELECT
34+
`timestamp`,
35+
`value`
36+
FROM
37+
`issue341_t1`
38+
WHERE
39+
`value` = 3;
40+
41+
-- FilterExec node should not be in plan.
42+
EXPLAIN SELECT
43+
`timestamp`,
44+
`value`
45+
FROM
46+
`issue341_t1`
47+
WHERE
48+
tag1 = "t3";
49+
50+
-- Repeat operations above, but with overwrite table
51+
52+
CREATE TABLE `issue341_t2` (
53+
`timestamp` timestamp NOT NULL,
54+
`value` double,
55+
`tag1` string tag,
56+
timestamp KEY (timestamp)) ENGINE=Analytic
57+
WITH(
58+
enable_ttl='false',
59+
update_mode='overwrite'
60+
);
61+
62+
INSERT INTO `issue341_t2` (`timestamp`, `value`, `tag1`)
63+
VALUES (1, 1, "t1"), (2, 2, "t2"), (3, 3, "t3");
64+
65+
SELECT
66+
`timestamp`,
67+
`value`
68+
FROM
69+
`issue341_t2`
70+
WHERE
71+
`value` = 3;
72+
73+
-- FilterExec node should be in plan.
74+
EXPLAIN SELECT
75+
`timestamp`,
76+
`value`
77+
FROM
78+
`issue341_t2`
79+
WHERE
80+
`value` = 3;
81+
82+
-- When using tag as filter, FilterExec node should not be in plan.
83+
EXPLAIN SELECT
84+
`timestamp`,
85+
`value`
86+
FROM
87+
`issue341_t2`
88+
WHERE
89+
tag1 = "t3";
90+
91+
DROP TABLE IF EXISTS `issue341_t1`;
92+
DROP TABLE IF EXISTS `issue341_t2`;

table_engine/src/provider.rs

+43 -12
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,9 @@ use std::{
2323

2424
use arrow::datatypes::SchemaRef;
2525
use async_trait::async_trait;
26-
use common_types::{projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema};
26+
use common_types::{
27+
projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema, UPDATE_MODE,
28+
};
2729
use datafusion::{
2830
config::{ConfigEntry, ConfigExtension, ExtensionOptions},
2931
datasource::TableProvider,
@@ -184,12 +186,12 @@ impl TableProviderAdapter {
184186
}
185187

186188
fn check_and_build_predicate_from_filters(&self, filters: &[Expr]) -> PredicateRef {
187-
let unique_keys = self.read_schema.unique_keys();
188-
189-
let push_down_filters = filters
189+
let pushdown_states = self.pushdown_inner(&filters.iter().collect::<Vec<_>>());
190+
let pushdown_filters = filters
190191
.iter()
191-
.filter_map(|filter| {
192-
if Self::only_filter_unique_key_columns(filter, &unique_keys) {
192+
.zip(pushdown_states.iter())
193+
.filter_map(|(filter, state)| {
194+
if matches!(state, &TableProviderFilterPushDown::Exact) {
193195
Some(filter.clone())
194196
} else {
195197
None
@@ -198,8 +200,8 @@ impl TableProviderAdapter {
198200
.collect::<Vec<_>>();
199201

200202
PredicateBuilder::default()
201-
.add_pushdown_exprs(&push_down_filters)
202-
.extract_time_range(&self.read_schema, &push_down_filters)
203+
.add_pushdown_exprs(&pushdown_filters)
204+
.extract_time_range(&self.read_schema, filters)
203205
.build()
204206
}
205207

@@ -214,6 +216,29 @@ impl TableProviderAdapter {
214216
}
215217
true
216218
}
219+
220+
fn pushdown_inner(&self, filters: &[&Expr]) -> Vec<TableProviderFilterPushDown> {
221+
let unique_keys = self.read_schema.unique_keys();
222+
// TODO: add pushdown check in table trait
223+
let options = &self.table.options();
224+
let is_append = matches!(options.get(UPDATE_MODE), Some(mode) if mode == "APPEND");
225+
let is_system_engine = self.table.engine_type() == "system";
226+
227+
filters
228+
.iter()
229+
.map(|filter| {
230+
if is_system_engine {
231+
return TableProviderFilterPushDown::Inexact;
232+
}
233+
234+
if is_append || Self::only_filter_unique_key_columns(filter, &unique_keys) {
235+
TableProviderFilterPushDown::Exact
236+
} else {
237+
TableProviderFilterPushDown::Inexact
238+
}
239+
})
240+
.collect()
241+
}
217242
}
218243

219244
#[async_trait]
@@ -237,8 +262,11 @@ impl TableProvider for TableProviderAdapter {
237262
self.scan_table(state, projection, filters, limit).await
238263
}
239264

240-
fn supports_filter_pushdown(&self, _filter: &Expr) -> Result<TableProviderFilterPushDown> {
241-
Ok(TableProviderFilterPushDown::Inexact)
265+
fn supports_filters_pushdown(
266+
&self,
267+
filters: &[&Expr],
268+
) -> Result<Vec<TableProviderFilterPushDown>> {
269+
Ok(self.pushdown_inner(filters))
242270
}
243271

244272
/// Get the type of this table for metadata/catalog purposes.
@@ -264,8 +292,11 @@ impl TableSource for TableProviderAdapter {
264292

265293
/// Tests whether the table provider can make use of a filter expression
266294
/// to optimize data retrieval.
267-
fn supports_filter_pushdown(&self, _filter: &Expr) -> Result<TableProviderFilterPushDown> {
268-
Ok(TableProviderFilterPushDown::Inexact)
295+
fn supports_filters_pushdown(
296+
&self,
297+
filters: &[&Expr],
298+
) -> Result<Vec<TableProviderFilterPushDown>> {
299+
Ok(self.pushdown_inner(filters))
269300
}
270301
}
271302

0 commit comments

Comments
 (0)