@@ -23,7 +23,9 @@ use std::{
23
23
24
24
use arrow:: datatypes:: SchemaRef ;
25
25
use async_trait:: async_trait;
26
- use common_types:: { projected_schema:: ProjectedSchema , request_id:: RequestId , schema:: Schema , UPDATE_MODE } ;
26
+ use common_types:: {
27
+ projected_schema:: ProjectedSchema , request_id:: RequestId , schema:: Schema , UPDATE_MODE ,
28
+ } ;
27
29
use datafusion:: {
28
30
config:: { ConfigEntry , ConfigExtension , ExtensionOptions } ,
29
31
datasource:: TableProvider ,
@@ -184,18 +186,13 @@ impl TableProviderAdapter {
184
186
}
185
187
186
188
fn check_and_build_predicate_from_filters ( & self , filters : & [ Expr ] ) -> PredicateRef {
187
- let unique_keys = self . read_schema . unique_keys ( ) ;
188
-
189
- let options = & self . table . options ( ) ;
190
- let is_append = match options. get ( UPDATE_MODE ) {
191
- Some ( mode) if mode == "APPEND" => true ,
192
- _ => false
193
- } ;
189
+ let pushdowns = self . pushdown_inner ( & filters. iter ( ) . collect :: < Vec < _ > > ( ) ) ;
194
190
195
- let push_down_filters = filters
191
+ let pushdown_pushdown_filters = filters
196
192
. iter ( )
197
- . filter_map ( |filter| {
198
- if Self :: only_filter_unique_key_columns ( filter, & unique_keys) && is_append {
193
+ . zip ( pushdowns. iter ( ) )
194
+ . filter_map ( |( filter, pushdown) | {
195
+ if matches ! ( pushdown, & TableProviderFilterPushDown :: Exact ) {
199
196
Some ( filter. clone ( ) )
200
197
} else {
201
198
None
@@ -204,8 +201,8 @@ impl TableProviderAdapter {
204
201
. collect :: < Vec < _ > > ( ) ;
205
202
206
203
PredicateBuilder :: default ( )
207
- . add_pushdown_exprs ( & push_down_filters )
208
- . extract_time_range ( & self . read_schema , & push_down_filters )
204
+ . add_pushdown_exprs ( & pushdown_pushdown_filters )
205
+ . extract_time_range ( & self . read_schema , filters )
209
206
. build ( )
210
207
}
211
208
@@ -220,6 +217,30 @@ impl TableProviderAdapter {
220
217
}
221
218
true
222
219
}
220
+
221
+ fn pushdown_inner ( & self , filters : & [ & Expr ] ) -> Vec < TableProviderFilterPushDown > {
222
+ let unique_keys = self . read_schema . unique_keys ( ) ;
223
+
224
+ // TODO: add pushdown check in table trait
225
+ let options = & self . table . options ( ) ;
226
+ let is_append = matches ! ( options. get( UPDATE_MODE ) , Some ( mode) if mode == "APPEND" ) ;
227
+ let is_system_engine = self . table . engine_type ( ) == "system" ;
228
+
229
+ filters
230
+ . iter ( )
231
+ . map ( |filter| {
232
+ if is_system_engine {
233
+ return TableProviderFilterPushDown :: Inexact ;
234
+ }
235
+
236
+ if is_append || Self :: only_filter_unique_key_columns ( filter, & unique_keys) {
237
+ TableProviderFilterPushDown :: Exact
238
+ } else {
239
+ TableProviderFilterPushDown :: Inexact
240
+ }
241
+ } )
242
+ . collect ( )
243
+ }
223
244
}
224
245
225
246
#[ async_trait]
@@ -243,8 +264,11 @@ impl TableProvider for TableProviderAdapter {
243
264
self . scan_table ( state, projection, filters, limit) . await
244
265
}
245
266
246
- fn supports_filter_pushdown ( & self , _filter : & Expr ) -> Result < TableProviderFilterPushDown > {
247
- Ok ( TableProviderFilterPushDown :: Exact )
267
+ fn supports_filters_pushdown (
268
+ & self ,
269
+ filters : & [ & Expr ] ,
270
+ ) -> Result < Vec < TableProviderFilterPushDown > > {
271
+ Ok ( self . pushdown_inner ( filters) )
248
272
}
249
273
250
274
/// Get the type of this table for metadata/catalog purposes.
@@ -270,8 +294,11 @@ impl TableSource for TableProviderAdapter {
270
294
271
295
/// Tests whether the table provider can make use of a filter expression
272
296
/// to optimize data retrieval.
273
- fn supports_filter_pushdown ( & self , _filter : & Expr ) -> Result < TableProviderFilterPushDown > {
274
- Ok ( TableProviderFilterPushDown :: Exact )
297
+ fn supports_filters_pushdown (
298
+ & self ,
299
+ filters : & [ & Expr ] ,
300
+ ) -> Result < Vec < TableProviderFilterPushDown > > {
301
+ Ok ( self . pushdown_inner ( filters) )
275
302
}
276
303
}
277
304
0 commit comments