@@ -23,7 +23,9 @@ use std::{
23
23
24
24
use arrow:: datatypes:: SchemaRef ;
25
25
use async_trait:: async_trait;
26
- use common_types:: { projected_schema:: ProjectedSchema , request_id:: RequestId , schema:: Schema , UPDATE_MODE } ;
26
+ use common_types:: {
27
+ projected_schema:: ProjectedSchema , request_id:: RequestId , schema:: Schema , UPDATE_MODE ,
28
+ } ;
27
29
use datafusion:: {
28
30
config:: { ConfigEntry , ConfigExtension , ExtensionOptions } ,
29
31
datasource:: TableProvider ,
@@ -184,18 +186,12 @@ impl TableProviderAdapter {
184
186
}
185
187
186
188
fn check_and_build_predicate_from_filters ( & self , filters : & [ Expr ] ) -> PredicateRef {
187
- let unique_keys = self . read_schema . unique_keys ( ) ;
188
-
189
- let options = & self . table . options ( ) ;
190
- let is_append = match options. get ( UPDATE_MODE ) {
191
- Some ( mode) if mode == "APPEND" => true ,
192
- _ => false
193
- } ;
194
-
195
- let push_down_filters = filters
189
+ let pushdown_states = self . pushdown_inner ( & filters. iter ( ) . collect :: < Vec < _ > > ( ) ) ;
190
+ let pushdown_filters = filters
196
191
. iter ( )
197
- . filter_map ( |filter| {
198
- if Self :: only_filter_unique_key_columns ( filter, & unique_keys) && is_append {
192
+ . zip ( pushdown_states. iter ( ) )
193
+ . filter_map ( |( filter, state) | {
194
+ if matches ! ( state, & TableProviderFilterPushDown :: Exact ) {
199
195
Some ( filter. clone ( ) )
200
196
} else {
201
197
None
@@ -204,8 +200,8 @@ impl TableProviderAdapter {
204
200
. collect :: < Vec < _ > > ( ) ;
205
201
206
202
PredicateBuilder :: default ( )
207
- . add_pushdown_exprs ( & push_down_filters )
208
- . extract_time_range ( & self . read_schema , & push_down_filters )
203
+ . add_pushdown_exprs ( & pushdown_filters )
204
+ . extract_time_range ( & self . read_schema , filters )
209
205
. build ( )
210
206
}
211
207
@@ -220,6 +216,30 @@ impl TableProviderAdapter {
220
216
}
221
217
true
222
218
}
219
+
220
+ fn pushdown_inner ( & self , filters : & [ & Expr ] ) -> Vec < TableProviderFilterPushDown > {
221
+ let unique_keys = self . read_schema . unique_keys ( ) ;
222
+
223
+ // TODO: add pushdown check in table trait
224
+ let options = & self . table . options ( ) ;
225
+ let is_append = matches ! ( options. get( UPDATE_MODE ) , Some ( mode) if mode == "APPEND" ) ;
226
+ let is_system_engine = self . table . engine_type ( ) == "system" ;
227
+
228
+ filters
229
+ . iter ( )
230
+ . map ( |filter| {
231
+ if is_system_engine {
232
+ return TableProviderFilterPushDown :: Inexact ;
233
+ }
234
+
235
+ if is_append || Self :: only_filter_unique_key_columns ( filter, & unique_keys) {
236
+ TableProviderFilterPushDown :: Exact
237
+ } else {
238
+ TableProviderFilterPushDown :: Inexact
239
+ }
240
+ } )
241
+ . collect ( )
242
+ }
223
243
}
224
244
225
245
#[ async_trait]
@@ -243,8 +263,11 @@ impl TableProvider for TableProviderAdapter {
243
263
self . scan_table ( state, projection, filters, limit) . await
244
264
}
245
265
246
- fn supports_filter_pushdown ( & self , _filter : & Expr ) -> Result < TableProviderFilterPushDown > {
247
- Ok ( TableProviderFilterPushDown :: Exact )
266
+ fn supports_filters_pushdown (
267
+ & self ,
268
+ filters : & [ & Expr ] ,
269
+ ) -> Result < Vec < TableProviderFilterPushDown > > {
270
+ Ok ( self . pushdown_inner ( filters) )
248
271
}
249
272
250
273
/// Get the type of this table for metadata/catalog purposes.
@@ -270,8 +293,11 @@ impl TableSource for TableProviderAdapter {
270
293
271
294
/// Tests whether the table provider can make use of a filter expression
272
295
/// to optimize data retrieval.
273
- fn supports_filter_pushdown ( & self , _filter : & Expr ) -> Result < TableProviderFilterPushDown > {
274
- Ok ( TableProviderFilterPushDown :: Exact )
296
+ fn supports_filters_pushdown (
297
+ & self ,
298
+ filters : & [ & Expr ] ,
299
+ ) -> Result < Vec < TableProviderFilterPushDown > > {
300
+ Ok ( self . pushdown_inner ( filters) )
275
301
}
276
302
}
277
303
0 commit comments