chore: remove the codes about the reverse reading #1116

Merged · 3 commits · Aug 1, 2023

Changes from 1 commit
6 changes: 3 additions & 3 deletions analytic_engine/src/compaction/picker.rs
@@ -410,7 +410,7 @@ impl SizeTieredPicker {
return None;
}

// Find the hotest bucket
// Find the hottest bucket
if let Some((bucket, hotness)) =
pruned_bucket_and_hotness
.into_iter()
@@ -419,12 +419,12 @@ impl SizeTieredPicker {
if !c.is_eq() {
return c;
}
//TODO(boyan), compacting smallest sstables first?
// TODO(boyan), compacting smallest sstables first?
b1.avg_size.cmp(&b2.avg_size)
})
{
debug!(
"Find the hotest bucket, hotness: {}, bucket: {:?}",
"Find the hottest bucket, hotness: {}, bucket: {:?}",
hotness, bucket
);
Some(bucket.files)
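
For reference, here is a minimal, standalone sketch of the selection pattern in the hunk above: choose the bucket with the highest hotness, falling back to average SST size on a tie. The `Bucket` struct, the integer hotness, and the `max_by` call are illustrative stand-ins, not the engine's actual types or API.

```rust
// Standalone sketch only; the types and the hotness representation are assumptions.
#[derive(Debug)]
struct Bucket {
    avg_size: u64,
    files: Vec<String>,
}

fn pick_hottest(buckets: Vec<(Bucket, u64)>) -> Option<Vec<String>> {
    buckets
        .into_iter()
        .max_by(|(b1, h1), (b2, h2)| {
            // Hotness decides first.
            let c = h1.cmp(h2);
            if !c.is_eq() {
                return c;
            }
            // On a tie, fall back to average size (the original code carries a TODO
            // questioning which direction this tie-break should go).
            b1.avg_size.cmp(&b2.avg_size)
        })
        .map(|(bucket, _hotness)| bucket.files)
}

fn main() {
    let buckets = vec![
        (Bucket { avg_size: 16, files: vec!["a.sst".into()] }, 3),
        (Bucket { avg_size: 8, files: vec!["b.sst".into()] }, 5),
    ];
    // The second bucket wins on hotness alone.
    assert_eq!(pick_hottest(buckets), Some(vec!["b.sst".to_string()]));
}
```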
1 change: 0 additions & 1 deletion analytic_engine/src/instance/flush_compaction.rs
@@ -776,7 +776,6 @@ impl SpaceStore {
let table_options = table_data.table_options();
let projected_schema = ProjectedSchema::no_projection(schema.clone());
let sst_read_options = SstReadOptions {
reverse: false,
num_rows_per_row_group: table_options.num_rows_per_row_group,
frequency: ReadFrequency::Once,
projected_schema: projected_schema.clone(),
22 changes: 3 additions & 19 deletions analytic_engine/src/instance/read.rs
@@ -73,12 +73,6 @@ const ITER_NUM_METRIC_NAME: &str = "iter_num";
const MERGE_ITER_METRICS_COLLECTOR_NAME_PREFIX: &str = "merge_iter";
const CHAIN_ITER_METRICS_COLLECTOR_NAME_PREFIX: &str = "chain_iter";

/// Check whether it needs to apply merge sorting when reading the table with
/// the `table_options` by the `read_request`.
fn need_merge_sort_streams(table_options: &TableOptions, read_request: &ReadRequest) -> bool {
table_options.need_dedup() || read_request.order.is_in_order()
}

impl Instance {
/// Read data in multiple time range from table, and return
/// `read_parallelism` output streams.
@@ -95,7 +89,7 @@ impl Instance {
let table_options = table_data.table_options();
// Collect metrics.
table_data.metrics.on_read_request_begin();
let need_merge_sort = need_merge_sort_streams(&table_options, &request);
let need_merge_sort = table_options.need_dedup();
request.metrics_collector.collect(Metric::boolean(
MERGE_SORT_METRIC_NAME.to_string(),
need_merge_sort,
@@ -118,15 +112,10 @@ fn build_partitioned_streams(
fn build_partitioned_streams(
&self,
request: &ReadRequest,
mut partitioned_iters: Vec<impl RecordBatchWithKeyIterator + 'static>,
partitioned_iters: Vec<impl RecordBatchWithKeyIterator + 'static>,
) -> Result<PartitionedStreams> {
let read_parallelism = request.opts.read_parallelism;

if read_parallelism == 1 && request.order.is_in_desc_order() {
// TODO(xikai): it seems this can be avoided.
partitioned_iters.reverse();
};

// Split iterators into `read_parallelism` groups.
let mut splitted_iters: Vec<_> = std::iter::repeat_with(Vec::new)
.take(read_parallelism)
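
The hunk above splits the per-partition iterators into `read_parallelism` groups before they are wrapped into output streams; the actual distribution step is truncated in the diff. Below is a standalone sketch of one plausible splitting scheme (round-robin), with illustrative names rather than the engine's real code:

```rust
// Sketch under assumptions: the real code may distribute iterators differently.
fn split_round_robin<T>(iters: Vec<T>, read_parallelism: usize) -> Vec<Vec<T>> {
    assert!(read_parallelism > 0, "read_parallelism must be positive");

    // One (possibly empty) group per output stream.
    let mut groups: Vec<Vec<T>> = std::iter::repeat_with(Vec::new)
        .take(read_parallelism)
        .collect();

    // Assign iterators to groups in round-robin order.
    for (i, iter) in iters.into_iter().enumerate() {
        groups[i % read_parallelism].push(iter);
    }
    groups
}

fn main() {
    // Five "iterators" (labels stand in for real iterators) split across two streams.
    let groups = split_round_robin(vec!["i0", "i1", "i2", "i3", "i4"], 2);
    assert_eq!(groups, vec![vec!["i0", "i2", "i4"], vec!["i1", "i3"]]);
}
```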
@@ -157,7 +146,6 @@ impl Instance {
let sequence = table_data.last_sequence();
let projected_schema = request.projected_schema.clone();
let sst_read_options = SstReadOptions {
reverse: request.order.is_in_desc_order(),
frequency: ReadFrequency::Frequent,
projected_schema: projected_schema.clone(),
predicate: request.predicate.clone(),
@@ -191,7 +179,7 @@ impl Instance {
store_picker: self.space_store.store_picker(),
merge_iter_options: iter_options.clone(),
need_dedup: table_options.need_dedup(),
reverse: request.order.is_in_desc_order(),
reverse: false,
};

let merge_iter = MergeBuilder::new(merge_config)
@@ -226,11 +214,7 @@ impl Instance {
) -> Result<Vec<ChainIterator>> {
let projected_schema = request.projected_schema.clone();

assert!(request.order.is_out_of_order());

let sst_read_options = SstReadOptions {
// no need to read in order so just read in asc order by default.
reverse: false,
frequency: ReadFrequency::Frequent,
projected_schema: projected_schema.clone(),
predicate: request.predicate.clone(),
2 changes: 2 additions & 0 deletions analytic_engine/src/row_iter/merge.rs
@@ -105,6 +105,8 @@ pub struct MergeConfig<'a> {
pub merge_iter_options: IterOptions,

pub need_dedup: bool,
// TODO: Reading SSTs in reverse order is not supported yet, so the output
// will not be as expected if this flag is set to true.
pub reverse: bool,
}
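
Since `reverse` stays on `MergeConfig` but is no longer honored, a caller could still set it by mistake. The sketch below shows one way to make such misuse visible in debug and test builds; the types and the `debug_assert!` guard are hypothetical, not part of this PR:

```rust
// Illustrative stand-in for the real MergeConfig; only the `reverse` handling matters here.
#[derive(Debug, Default)]
struct MergeConfig {
    need_dedup: bool,
    // Reverse reads are not implemented; the output is undefined if this is set to true.
    reverse: bool,
}

fn build_merge_iter(config: &MergeConfig) {
    // Fail fast in debug/test builds if the unsupported flag is ever set.
    debug_assert!(!config.reverse, "reverse reads are not supported yet");
    let _ = config.need_dedup; // the forward merge iterator would be built from here
}

fn main() {
    build_merge_iter(&MergeConfig {
        need_dedup: true,
        reverse: false,
    });
}
```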

1 change: 0 additions & 1 deletion analytic_engine/src/sst/factory.rs
@@ -119,7 +119,6 @@ impl Default for ScanOptions {

#[derive(Debug, Clone)]
pub struct SstReadOptions {
pub reverse: bool,
pub frequency: ReadFrequency,
pub num_rows_per_row_group: usize,
pub projected_schema: ProjectedSchema,
1 change: 0 additions & 1 deletion analytic_engine/src/sst/parquet/writer.rs
@@ -467,7 +467,6 @@ mod tests {
let scan_options = ScanOptions::default();
// read sst back to test
let sst_read_options = SstReadOptions {
reverse: false,
frequency: ReadFrequency::Frequent,
num_rows_per_row_group: 5,
projected_schema: reader_projected_schema,
7 changes: 3 additions & 4 deletions analytic_engine/src/table/mod.rs
@@ -26,9 +26,9 @@ use table_engine::{
stream::{PartitionedStreams, SendableRecordBatchStream},
table::{
AlterOptions, AlterSchema, AlterSchemaRequest, Compact, Flush, FlushRequest, Get,
GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, ReadOptions, ReadOrder,
ReadRequest, Result, Scan, Table, TableId, TableStats, TooManyPendingWrites,
WaitForPendingWrites, Write, WriteRequest,
GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, ReadOptions, ReadRequest,
Result, Scan, Table, TableId, TableStats, TooManyPendingWrites, WaitForPendingWrites,
Write, WriteRequest,
},
ANALYTIC_ENGINE_TYPE,
};
@@ -491,7 +491,6 @@ impl Table for TableImpl {
opts: ReadOptions::default(),
projected_schema: request.projected_schema,
predicate,
order: ReadOrder::None,
metrics_collector: MetricsCollector::new(GET_METRICS_COLLECTOR_NAME.to_string()),
};
let mut batch_stream = self
3 changes: 0 additions & 3 deletions analytic_engine/src/tests/read_write_test.rs
@@ -6,7 +6,6 @@ use std::{thread, time};

use common_types::time::Timestamp;
use log::info;
use table_engine::table::ReadOrder;

use crate::{
setup::WalsOpener,
@@ -765,7 +764,6 @@ fn test_table_write_read_reverse<T: EngineBuildContext>(engine_context: T) {
"Test read write table",
test_table,
&expect_reversed_rows,
ReadOrder::Desc,
)
.await;
});
@@ -864,7 +862,6 @@ fn test_table_write_read_reverse_after_flush<T: EngineBuildContext>(engine_context: T) {
"Test read write table",
test_table,
&expect_reversed_rows,
ReadOrder::Desc,
)
.await;
});
15 changes: 5 additions & 10 deletions analytic_engine/src/tests/table.rs
@@ -19,7 +19,7 @@ use table_engine::{
self,
engine::{CreateTableRequest, TableState},
predicate::Predicate,
table::{GetRequest, ReadOptions, ReadOrder, ReadRequest, SchemaId, TableId, TableSeq},
table::{GetRequest, ReadOptions, ReadRequest, SchemaId, TableId, TableSeq},
};
use time_ext::ReadableDuration;
use trace_metric::MetricsCollector;
@@ -119,8 +119,8 @@ impl FixedSchemaTable {
row_util::new_row_6(data)
}

pub fn new_read_all_request(&self, opts: ReadOptions, read_order: ReadOrder) -> ReadRequest {
new_read_all_request_with_order(self.create_request.table_schema.clone(), opts, read_order)
pub fn new_read_all_request(&self, opts: ReadOptions) -> ReadRequest {
new_read_all_request_with_order(self.create_request.table_schema.clone(), opts)
}

pub fn new_get_request(&self, key: KeyTuple) -> GetRequest {
@@ -175,23 +175,18 @@ pub fn read_opts_list() -> Vec<ReadOptions> {
]
}

pub fn new_read_all_request_with_order(
schema: Schema,
opts: ReadOptions,
order: ReadOrder,
) -> ReadRequest {
pub fn new_read_all_request_with_order(schema: Schema, opts: ReadOptions) -> ReadRequest {
ReadRequest {
request_id: RequestId::next_id(),
opts,
projected_schema: ProjectedSchema::no_projection(schema),
predicate: Arc::new(Predicate::empty()),
order,
metrics_collector: MetricsCollector::default(),
}
}

pub fn new_read_all_request(schema: Schema, opts: ReadOptions) -> ReadRequest {
new_read_all_request_with_order(schema, opts, ReadOrder::None)
new_read_all_request_with_order(schema, opts)
}

pub fn assert_batch_eq_to_row_group(record_batches: &[RecordBatch], row_group: &RowGroup) {
17 changes: 4 additions & 13 deletions analytic_engine/src/tests/util.rs
@@ -21,8 +21,8 @@ use table_engine::{
Result as EngineResult, TableDef, TableEngineRef,
},
table::{
AlterSchemaRequest, FlushRequest, GetRequest, ReadOrder, ReadRequest, Result, SchemaId,
TableId, TableRef, WriteRequest,
AlterSchemaRequest, FlushRequest, GetRequest, ReadRequest, Result, SchemaId, TableId,
TableRef, WriteRequest,
},
};
use tempfile::TempDir;
@@ -51,15 +51,14 @@ pub async fn check_read_with_order<T: WalsOpener>(
msg: &str,
table_name: &str,
rows: &[RowTuple<'_>],
read_order: ReadOrder,
) {
for read_opts in table::read_opts_list() {
info!("{}, opts:{:?}", msg, read_opts);

let record_batches = test_ctx
.read_table(
table_name,
fixed_schema_table.new_read_all_request(read_opts, read_order),
fixed_schema_table.new_read_all_request(read_opts),
)
.await;

@@ -74,15 +73,7 @@ pub async fn check_read<T: WalsOpener>(
table_name: &str,
rows: &[RowTuple<'_>],
) {
check_read_with_order(
test_ctx,
fixed_schema_table,
msg,
table_name,
rows,
ReadOrder::None,
)
.await
check_read_with_order(test_ctx, fixed_schema_table, msg, table_name, rows).await
}

pub async fn check_get<T: WalsOpener>(
1 change: 0 additions & 1 deletion benchmarks/src/merge_memtable_bench.rs
@@ -202,7 +202,6 @@ fn mock_sst_read_options(
num_streams_to_prefetch: 0,
};
SstReadOptions {
reverse: false,
frequency: ReadFrequency::Frequent,
num_rows_per_row_group: 500,
projected_schema,
1 change: 0 additions & 1 deletion benchmarks/src/merge_sst_bench.rs
@@ -67,7 +67,6 @@ impl MergeSstBench {
num_streams_to_prefetch: 0,
};
let sst_read_options = SstReadOptions {
reverse: false,
frequency: ReadFrequency::Frequent,
num_rows_per_row_group: config.num_rows_per_row_group,
projected_schema,
1 change: 0 additions & 1 deletion benchmarks/src/sst_bench.rs
@@ -45,7 +45,6 @@ impl SstBench {
num_streams_to_prefetch: 0,
};
let sst_read_options = SstReadOptions {
reverse: config.reverse,
frequency: ReadFrequency::Frequent,
num_rows_per_row_group: config.num_rows_per_row_group,
projected_schema,
2 changes: 0 additions & 2 deletions benchmarks/src/sst_tools.rs
@@ -107,7 +107,6 @@ pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc<Runtime>) {
num_streams_to_prefetch: 2,
};
let sst_read_options = SstReadOptions {
reverse: false,
frequency: ReadFrequency::Once,
num_rows_per_row_group: config.num_rows_per_row_group,
projected_schema,
@@ -220,7 +219,6 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc<Runtime>) {
let store_picker: ObjectStorePickerRef = Arc::new(store);
let projected_schema = ProjectedSchema::no_projection(schema.clone());
let sst_read_options = SstReadOptions {
reverse: false,
frequency: ReadFrequency::Once,
num_rows_per_row_group: config.num_rows_per_row_group,
projected_schema: projected_schema.clone(),
1 change: 0 additions & 1 deletion benchmarks/src/util.rs
@@ -104,7 +104,6 @@ pub async fn load_sst_to_memtable(
num_streams_to_prefetch: 0,
};
let sst_read_options = SstReadOptions {
reverse: false,
frequency: ReadFrequency::Frequent,
num_rows_per_row_group: 8192,
projected_schema: ProjectedSchema::no_projection(schema.clone()),
1 change: 0 additions & 1 deletion components/parquet_ext/src/lib.rs
@@ -3,7 +3,6 @@
pub mod meta_data;
pub mod prune;
pub mod reader;
pub mod reverse_reader;
#[cfg(test)]
pub mod tests;
