@@ -22,7 +22,7 @@ use generic_error::BoxError;
22
22
use log:: { debug, error} ;
23
23
use object_store:: { ObjectStoreRef , Path } ;
24
24
use parquet:: data_type:: AsBytes ;
25
- use snafu:: ResultExt ;
25
+ use snafu:: { OptionExt , ResultExt } ;
26
26
use tokio:: io:: { AsyncWrite , AsyncWriteExt } ;
27
27
28
28
use super :: meta_data:: RowGroupFilter ;
@@ -36,7 +36,8 @@ use crate::{
36
36
} ,
37
37
writer:: {
38
38
self , BuildParquetFilter , EncodePbData , EncodeRecordBatch , Io , MetaData ,
39
- PollRecordBatch , RecordBatchStream , Result , SstInfo , SstWriter , Storage ,
39
+ PollRecordBatch , RecordBatchStream , RequireTimestampColumn , Result , SstInfo , SstWriter ,
40
+ Storage ,
40
41
} ,
41
42
} ,
42
43
table:: sst_util,
@@ -87,7 +88,10 @@ struct RecordBatchGroupWriter {
87
88
compression : Compression ,
88
89
level : Level ,
89
90
91
+ // inner status
90
92
input_exhausted : bool ,
93
+ // Time range of rows, not aligned to segment.
94
+ real_time_range : Option < TimeRange > ,
91
95
}
92
96
93
97
impl RecordBatchGroupWriter {
@@ -109,6 +113,7 @@ impl RecordBatchGroupWriter {
109
113
compression,
110
114
level,
111
115
input_exhausted : false ,
116
+ real_time_range : None ,
112
117
}
113
118
}
114
119
@@ -199,6 +204,25 @@ impl RecordBatchGroupWriter {
199
204
!self . level . is_min ( )
200
205
}
201
206
207
+ fn update_time_range ( & mut self , current_range : Option < TimeRange > ) {
208
+ if let Some ( current_range) = current_range {
209
+ if let Some ( real_range) = self . real_time_range {
210
+ // Use current range to update real range,
211
+ // We should expand range as possible as we can.
212
+ self . real_time_range = Some ( TimeRange :: new_unchecked (
213
+ current_range
214
+ . inclusive_start ( )
215
+ . min ( real_range. inclusive_start ( ) ) ,
216
+ current_range
217
+ . exclusive_end ( )
218
+ . max ( real_range. exclusive_end ( ) ) ,
219
+ ) ) ;
220
+ } else {
221
+ self . real_time_range = Some ( current_range) ;
222
+ }
223
+ }
224
+ }
225
+
202
226
async fn write_all < W : AsyncWrite + Send + Unpin + ' static > (
203
227
mut self ,
204
228
sink : W ,
@@ -223,7 +247,6 @@ impl RecordBatchGroupWriter {
223
247
None
224
248
} ;
225
249
let timestamp_index = self . meta_data . schema . timestamp_index ( ) ;
226
- let mut real_time_range: Option < TimeRange > = None ;
227
250
228
251
loop {
229
252
let row_group = self . fetch_next_row_group ( & mut prev_record_batch) . await ?;
@@ -238,22 +261,13 @@ impl RecordBatchGroupWriter {
238
261
let num_batches = row_group. len ( ) ;
239
262
for record_batch in row_group {
240
263
let column_block = record_batch. column ( timestamp_index) ;
241
- let ts_col = column_block. as_timestamp ( ) . unwrap ( ) ;
242
- if let Some ( current_range) = ts_col. min_max ( ) {
243
- if let Some ( real_range) = real_time_range {
244
- // Use current range to update real_range
245
- real_time_range = Some ( TimeRange :: new_unchecked (
246
- current_range
247
- . inclusive_start ( )
248
- . min ( real_range. inclusive_start ( ) ) ,
249
- current_range
250
- . exclusive_end ( )
251
- . min ( real_range. exclusive_end ( ) ) ,
252
- ) ) ;
253
- } else {
254
- real_time_range = Some ( current_range) ;
255
- }
256
- }
264
+ let ts_col = column_block
265
+ . as_timestamp ( )
266
+ . context ( RequireTimestampColumn {
267
+ datum_kind : column_block. datum_kind ( ) ,
268
+ } ) ?;
269
+ self . update_time_range ( ts_col. min_max ( ) ) ;
270
+
257
271
arrow_row_group. push ( record_batch. into_record_batch ( ) . into_arrow_record_batch ( ) ) ;
258
272
}
259
273
let num_rows = parquet_encoder
@@ -271,17 +285,12 @@ impl RecordBatchGroupWriter {
271
285
let parquet_meta_data = {
272
286
let mut parquet_meta_data = ParquetMetaData :: from ( self . meta_data . clone ( ) ) ;
273
287
parquet_meta_data. parquet_filter = parquet_filter;
274
- if let Some ( range) = real_time_range {
288
+ if let Some ( range) = self . real_time_range {
275
289
parquet_meta_data. time_range = range;
276
290
}
277
291
parquet_meta_data
278
292
} ;
279
293
280
- log:: info!(
281
- "Parquet sst path:{} range:{:?}" ,
282
- & meta_path,
283
- parquet_meta_data. time_range
284
- ) ;
285
294
parquet_encoder
286
295
. set_meta_data_path ( Some ( meta_path. to_string ( ) ) )
287
296
. box_err ( )
@@ -487,7 +496,7 @@ mod tests {
487
496
488
497
let schema = build_schema_with_dictionary ( ) ;
489
498
let reader_projected_schema = ProjectedSchema :: no_projection ( schema. clone ( ) ) ;
490
- let sst_meta = MetaData {
499
+ let mut sst_meta = MetaData {
491
500
min_key : Bytes :: from_static ( b"100" ) ,
492
501
max_key : Bytes :: from_static ( b"200" ) ,
493
502
time_range : TimeRange :: new_unchecked ( Timestamp :: new ( 1 ) , Timestamp :: new ( 2 ) ) ,
@@ -502,7 +511,6 @@ mod tests {
502
511
}
503
512
counter -= 1 ;
504
513
505
- // reach here when counter is 9 7 5 3 1
506
514
let ts = 100 + counter;
507
515
let rows = vec ! [
508
516
build_row_for_dictionary(
@@ -592,6 +600,13 @@ mod tests {
592
600
// sst filter is built insider sst writer, so overwrite to default for
593
601
// comparison.
594
602
sst_meta_readback. parquet_filter = Default :: default ( ) ;
603
+ // time_range is built insider sst writer, so overwrite it for
604
+ // comparison.
605
+ sst_meta. time_range = sst_info. time_range ;
606
+ assert_eq ! (
607
+ sst_meta. time_range,
608
+ TimeRange :: new_unchecked( 100 . into( ) , 105 . into( ) )
609
+ ) ;
595
610
assert_eq ! ( & sst_meta_readback, & ParquetMetaData :: from( sst_meta) ) ;
596
611
assert_eq ! (
597
612
expected_num_rows,
0 commit comments