Skip to content

Commit 1ee1e73

Browse files
committed
chore: remove the codes about the reverse reading
1 parent 1607d1d commit 1ee1e73

File tree

28 files changed

+38
-977
lines changed

28 files changed

+38
-977
lines changed

analytic_engine/src/compaction/picker.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ impl SizeTieredPicker {
410410
return None;
411411
}
412412

413-
// Find the hotest bucket
413+
// Find the hottest bucket
414414
if let Some((bucket, hotness)) =
415415
pruned_bucket_and_hotness
416416
.into_iter()
@@ -419,12 +419,12 @@ impl SizeTieredPicker {
419419
if !c.is_eq() {
420420
return c;
421421
}
422-
//TODO(boyan), compacting smallest sstables first?
422+
// TODO(boyan), compacting smallest sstables first?
423423
b1.avg_size.cmp(&b2.avg_size)
424424
})
425425
{
426426
debug!(
427-
"Find the hotest bucket, hotness: {}, bucket: {:?}",
427+
"Find the hottest bucket, hotness: {}, bucket: {:?}",
428428
hotness, bucket
429429
);
430430
Some(bucket.files)

analytic_engine/src/instance/flush_compaction.rs

-1
Original file line numberDiff line numberDiff line change
@@ -776,7 +776,6 @@ impl SpaceStore {
776776
let table_options = table_data.table_options();
777777
let projected_schema = ProjectedSchema::no_projection(schema.clone());
778778
let sst_read_options = SstReadOptions {
779-
reverse: false,
780779
num_rows_per_row_group: table_options.num_rows_per_row_group,
781780
frequency: ReadFrequency::Once,
782781
projected_schema: projected_schema.clone(),

analytic_engine/src/instance/read.rs

+3-19
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,6 @@ const ITER_NUM_METRIC_NAME: &str = "iter_num";
7373
const MERGE_ITER_METRICS_COLLECTOR_NAME_PREFIX: &str = "merge_iter";
7474
const CHAIN_ITER_METRICS_COLLECTOR_NAME_PREFIX: &str = "chain_iter";
7575

76-
/// Check whether it needs to apply merge sorting when reading the table with
77-
/// the `table_options` by the `read_request`.
78-
fn need_merge_sort_streams(table_options: &TableOptions, read_request: &ReadRequest) -> bool {
79-
table_options.need_dedup() || read_request.order.is_in_order()
80-
}
81-
8276
impl Instance {
8377
/// Read data in multiple time range from table, and return
8478
/// `read_parallelism` output streams.
@@ -95,7 +89,7 @@ impl Instance {
9589
let table_options = table_data.table_options();
9690
// Collect metrics.
9791
table_data.metrics.on_read_request_begin();
98-
let need_merge_sort = need_merge_sort_streams(&table_options, &request);
92+
let need_merge_sort = table_options.need_dedup();
9993
request.metrics_collector.collect(Metric::boolean(
10094
MERGE_SORT_METRIC_NAME.to_string(),
10195
need_merge_sort,
@@ -118,15 +112,10 @@ impl Instance {
118112
fn build_partitioned_streams(
119113
&self,
120114
request: &ReadRequest,
121-
mut partitioned_iters: Vec<impl RecordBatchWithKeyIterator + 'static>,
115+
partitioned_iters: Vec<impl RecordBatchWithKeyIterator + 'static>,
122116
) -> Result<PartitionedStreams> {
123117
let read_parallelism = request.opts.read_parallelism;
124118

125-
if read_parallelism == 1 && request.order.is_in_desc_order() {
126-
// TODO(xikai): it seems this can be avoided.
127-
partitioned_iters.reverse();
128-
};
129-
130119
// Split iterators into `read_parallelism` groups.
131120
let mut splitted_iters: Vec<_> = std::iter::repeat_with(Vec::new)
132121
.take(read_parallelism)
@@ -157,7 +146,6 @@ impl Instance {
157146
let sequence = table_data.last_sequence();
158147
let projected_schema = request.projected_schema.clone();
159148
let sst_read_options = SstReadOptions {
160-
reverse: request.order.is_in_desc_order(),
161149
frequency: ReadFrequency::Frequent,
162150
projected_schema: projected_schema.clone(),
163151
predicate: request.predicate.clone(),
@@ -191,7 +179,7 @@ impl Instance {
191179
store_picker: self.space_store.store_picker(),
192180
merge_iter_options: iter_options.clone(),
193181
need_dedup: table_options.need_dedup(),
194-
reverse: request.order.is_in_desc_order(),
182+
reverse: false,
195183
};
196184

197185
let merge_iter = MergeBuilder::new(merge_config)
@@ -226,11 +214,7 @@ impl Instance {
226214
) -> Result<Vec<ChainIterator>> {
227215
let projected_schema = request.projected_schema.clone();
228216

229-
assert!(request.order.is_out_of_order());
230-
231217
let sst_read_options = SstReadOptions {
232-
// no need to read in order so just read in asc order by default.
233-
reverse: false,
234218
frequency: ReadFrequency::Frequent,
235219
projected_schema: projected_schema.clone(),
236220
predicate: request.predicate.clone(),

analytic_engine/src/row_iter/merge.rs

+2
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ pub struct MergeConfig<'a> {
105105
pub merge_iter_options: IterOptions,
106106

107107
pub need_dedup: bool,
108+
// TODO: Currently, the read the sst in a reverse order is not supported yet, that is to say,
109+
// the output won't be expected if it is set.
108110
pub reverse: bool,
109111
}
110112

analytic_engine/src/sst/factory.rs

-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ impl Default for ScanOptions {
119119

120120
#[derive(Debug, Clone)]
121121
pub struct SstReadOptions {
122-
pub reverse: bool,
123122
pub frequency: ReadFrequency,
124123
pub num_rows_per_row_group: usize,
125124
pub projected_schema: ProjectedSchema,

analytic_engine/src/sst/parquet/writer.rs

-1
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,6 @@ mod tests {
467467
let scan_options = ScanOptions::default();
468468
// read sst back to test
469469
let sst_read_options = SstReadOptions {
470-
reverse: false,
471470
frequency: ReadFrequency::Frequent,
472471
num_rows_per_row_group: 5,
473472
projected_schema: reader_projected_schema,

analytic_engine/src/table/mod.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ use table_engine::{
2626
stream::{PartitionedStreams, SendableRecordBatchStream},
2727
table::{
2828
AlterOptions, AlterSchema, AlterSchemaRequest, Compact, Flush, FlushRequest, Get,
29-
GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, ReadOptions, ReadOrder,
30-
ReadRequest, Result, Scan, Table, TableId, TableStats, TooManyPendingWrites,
31-
WaitForPendingWrites, Write, WriteRequest,
29+
GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, ReadOptions, ReadRequest,
30+
Result, Scan, Table, TableId, TableStats, TooManyPendingWrites, WaitForPendingWrites,
31+
Write, WriteRequest,
3232
},
3333
ANALYTIC_ENGINE_TYPE,
3434
};
@@ -491,7 +491,6 @@ impl Table for TableImpl {
491491
opts: ReadOptions::default(),
492492
projected_schema: request.projected_schema,
493493
predicate,
494-
order: ReadOrder::None,
495494
metrics_collector: MetricsCollector::new(GET_METRICS_COLLECTOR_NAME.to_string()),
496495
};
497496
let mut batch_stream = self

analytic_engine/src/tests/read_write_test.rs

-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use std::{thread, time};
66

77
use common_types::time::Timestamp;
88
use log::info;
9-
use table_engine::table::ReadOrder;
109

1110
use crate::{
1211
setup::WalsOpener,
@@ -765,7 +764,6 @@ fn test_table_write_read_reverse<T: EngineBuildContext>(engine_context: T) {
765764
"Test read write table",
766765
test_table,
767766
&expect_reversed_rows,
768-
ReadOrder::Desc,
769767
)
770768
.await;
771769
});
@@ -864,7 +862,6 @@ fn test_table_write_read_reverse_after_flush<T: EngineBuildContext>(engine_conte
864862
"Test read write table",
865863
test_table,
866864
&expect_reversed_rows,
867-
ReadOrder::Desc,
868865
)
869866
.await;
870867
});

analytic_engine/src/tests/table.rs

+5-10
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use table_engine::{
1919
self,
2020
engine::{CreateTableRequest, TableState},
2121
predicate::Predicate,
22-
table::{GetRequest, ReadOptions, ReadOrder, ReadRequest, SchemaId, TableId, TableSeq},
22+
table::{GetRequest, ReadOptions, ReadRequest, SchemaId, TableId, TableSeq},
2323
};
2424
use time_ext::ReadableDuration;
2525
use trace_metric::MetricsCollector;
@@ -119,8 +119,8 @@ impl FixedSchemaTable {
119119
row_util::new_row_6(data)
120120
}
121121

122-
pub fn new_read_all_request(&self, opts: ReadOptions, read_order: ReadOrder) -> ReadRequest {
123-
new_read_all_request_with_order(self.create_request.table_schema.clone(), opts, read_order)
122+
pub fn new_read_all_request(&self, opts: ReadOptions) -> ReadRequest {
123+
new_read_all_request_with_order(self.create_request.table_schema.clone(), opts)
124124
}
125125

126126
pub fn new_get_request(&self, key: KeyTuple) -> GetRequest {
@@ -175,23 +175,18 @@ pub fn read_opts_list() -> Vec<ReadOptions> {
175175
]
176176
}
177177

178-
pub fn new_read_all_request_with_order(
179-
schema: Schema,
180-
opts: ReadOptions,
181-
order: ReadOrder,
182-
) -> ReadRequest {
178+
pub fn new_read_all_request_with_order(schema: Schema, opts: ReadOptions) -> ReadRequest {
183179
ReadRequest {
184180
request_id: RequestId::next_id(),
185181
opts,
186182
projected_schema: ProjectedSchema::no_projection(schema),
187183
predicate: Arc::new(Predicate::empty()),
188-
order,
189184
metrics_collector: MetricsCollector::default(),
190185
}
191186
}
192187

193188
pub fn new_read_all_request(schema: Schema, opts: ReadOptions) -> ReadRequest {
194-
new_read_all_request_with_order(schema, opts, ReadOrder::None)
189+
new_read_all_request_with_order(schema, opts)
195190
}
196191

197192
pub fn assert_batch_eq_to_row_group(record_batches: &[RecordBatch], row_group: &RowGroup) {

analytic_engine/src/tests/util.rs

+4-13
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use table_engine::{
2121
Result as EngineResult, TableDef, TableEngineRef,
2222
},
2323
table::{
24-
AlterSchemaRequest, FlushRequest, GetRequest, ReadOrder, ReadRequest, Result, SchemaId,
25-
TableId, TableRef, WriteRequest,
24+
AlterSchemaRequest, FlushRequest, GetRequest, ReadRequest, Result, SchemaId, TableId,
25+
TableRef, WriteRequest,
2626
},
2727
};
2828
use tempfile::TempDir;
@@ -51,15 +51,14 @@ pub async fn check_read_with_order<T: WalsOpener>(
5151
msg: &str,
5252
table_name: &str,
5353
rows: &[RowTuple<'_>],
54-
read_order: ReadOrder,
5554
) {
5655
for read_opts in table::read_opts_list() {
5756
info!("{}, opts:{:?}", msg, read_opts);
5857

5958
let record_batches = test_ctx
6059
.read_table(
6160
table_name,
62-
fixed_schema_table.new_read_all_request(read_opts, read_order),
61+
fixed_schema_table.new_read_all_request(read_opts),
6362
)
6463
.await;
6564

@@ -74,15 +73,7 @@ pub async fn check_read<T: WalsOpener>(
7473
table_name: &str,
7574
rows: &[RowTuple<'_>],
7675
) {
77-
check_read_with_order(
78-
test_ctx,
79-
fixed_schema_table,
80-
msg,
81-
table_name,
82-
rows,
83-
ReadOrder::None,
84-
)
85-
.await
76+
check_read_with_order(test_ctx, fixed_schema_table, msg, table_name, rows).await
8677
}
8778

8879
pub async fn check_get<T: WalsOpener>(

benchmarks/src/merge_memtable_bench.rs

-1
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ fn mock_sst_read_options(
202202
num_streams_to_prefetch: 0,
203203
};
204204
SstReadOptions {
205-
reverse: false,
206205
frequency: ReadFrequency::Frequent,
207206
num_rows_per_row_group: 500,
208207
projected_schema,

benchmarks/src/merge_sst_bench.rs

-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ impl MergeSstBench {
6767
num_streams_to_prefetch: 0,
6868
};
6969
let sst_read_options = SstReadOptions {
70-
reverse: false,
7170
frequency: ReadFrequency::Frequent,
7271
num_rows_per_row_group: config.num_rows_per_row_group,
7372
projected_schema,

benchmarks/src/sst_bench.rs

-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ impl SstBench {
4545
num_streams_to_prefetch: 0,
4646
};
4747
let sst_read_options = SstReadOptions {
48-
reverse: config.reverse,
4948
frequency: ReadFrequency::Frequent,
5049
num_rows_per_row_group: config.num_rows_per_row_group,
5150
projected_schema,

benchmarks/src/sst_tools.rs

-2
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc<Runtime>) {
107107
num_streams_to_prefetch: 2,
108108
};
109109
let sst_read_options = SstReadOptions {
110-
reverse: false,
111110
frequency: ReadFrequency::Once,
112111
num_rows_per_row_group: config.num_rows_per_row_group,
113112
projected_schema,
@@ -220,7 +219,6 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc<Runtime>) {
220219
let store_picker: ObjectStorePickerRef = Arc::new(store);
221220
let projected_schema = ProjectedSchema::no_projection(schema.clone());
222221
let sst_read_options = SstReadOptions {
223-
reverse: false,
224222
frequency: ReadFrequency::Once,
225223
num_rows_per_row_group: config.num_rows_per_row_group,
226224
projected_schema: projected_schema.clone(),

benchmarks/src/util.rs

-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ pub async fn load_sst_to_memtable(
104104
num_streams_to_prefetch: 0,
105105
};
106106
let sst_read_options = SstReadOptions {
107-
reverse: false,
108107
frequency: ReadFrequency::Frequent,
109108
num_rows_per_row_group: 8192,
110109
projected_schema: ProjectedSchema::no_projection(schema.clone()),

components/parquet_ext/src/lib.rs

-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
pub mod meta_data;
44
pub mod prune;
55
pub mod reader;
6-
pub mod reverse_reader;
76
#[cfg(test)]
87
pub mod tests;
98

0 commit comments

Comments
 (0)