Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(common): unify order-related types #8449

Merged
merged 13 commits into from
Mar 10, 2023
14 changes: 7 additions & 7 deletions src/batch/benches/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criteri
use risingwave_batch::executor::{BoxedExecutor, SortExecutor};
use risingwave_common::enable_jemalloc_on_linux;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::{OrderPair, OrderType};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use tokio::runtime::Runtime;
use utils::{create_input, execute_executor};

Expand All @@ -30,9 +30,9 @@ fn create_order_by_executor(
single_column: bool,
) -> BoxedExecutor {
const CHUNK_SIZE: usize = 1024;
let (child, order_pairs) = if single_column {
let (child, column_orders) = if single_column {
let input = create_input(&[DataType::Int64], chunk_size, chunk_num);
(input, vec![OrderPair::new(0, OrderType::Ascending)])
(input, vec![ColumnOrder::new(0, OrderType::ascending())])
} else {
let input = create_input(
&[
Expand All @@ -47,16 +47,16 @@ fn create_order_by_executor(
(
input,
vec![
OrderPair::new(0, OrderType::Ascending),
OrderPair::new(1, OrderType::Descending),
OrderPair::new(2, OrderType::Ascending),
ColumnOrder::new(0, OrderType::ascending()),
ColumnOrder::new(1, OrderType::descending()),
ColumnOrder::new(2, OrderType::ascending()),
],
)
};

Box::new(SortExecutor::new(
child,
order_pairs,
column_orders,
"SortExecutor".into(),
CHUNK_SIZE,
))
Expand Down
14 changes: 7 additions & 7 deletions src/batch/benches/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criteri
use risingwave_batch::executor::{BoxedExecutor, TopNExecutor};
use risingwave_common::enable_jemalloc_on_linux;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::{OrderPair, OrderType};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use tokio::runtime::Runtime;
use utils::{create_input, execute_executor};

Expand All @@ -32,9 +32,9 @@ fn create_top_n_executor(
limit: usize,
) -> BoxedExecutor {
const CHUNK_SIZE: usize = 1024;
let (child, order_pairs) = if single_column {
let (child, column_orders) = if single_column {
let input = create_input(&[DataType::Int64], chunk_size, chunk_num);
(input, vec![OrderPair::new(0, OrderType::Ascending)])
(input, vec![ColumnOrder::new(0, OrderType::ascending())])
} else {
let input = create_input(
&[
Expand All @@ -49,16 +49,16 @@ fn create_top_n_executor(
(
input,
vec![
OrderPair::new(0, OrderType::Ascending),
OrderPair::new(1, OrderType::Descending),
OrderPair::new(2, OrderType::Ascending),
ColumnOrder::new(0, OrderType::ascending()),
ColumnOrder::new(1, OrderType::descending()),
ColumnOrder::new(2, OrderType::ascending()),
],
)
};

Box::new(TopNExecutor::new(
child,
order_pairs,
column_orders,
offset,
limit,
false,
Expand Down
36 changes: 18 additions & 18 deletions src/batch/src/executor/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::encoding_for_comparison::encode_chunk;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::OrderPair;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::top_n::{HeapElem, TopNHeap};
Expand All @@ -41,7 +41,7 @@ use crate::task::BatchTaskContext;
/// For each group, use a N-heap to store the smallest N rows.
pub struct GroupTopNExecutor<K: HashKey> {
child: BoxedExecutor,
order_pairs: Vec<OrderPair>,
column_orders: Vec<ColumnOrder>,
offset: usize,
limit: usize,
group_key: Vec<usize>,
Expand All @@ -54,7 +54,7 @@ pub struct GroupTopNExecutor<K: HashKey> {

pub struct GroupTopNExecutorBuilder {
child: BoxedExecutor,
order_pairs: Vec<OrderPair>,
column_orders: Vec<ColumnOrder>,
offset: usize,
limit: usize,
group_key: Vec<usize>,
Expand All @@ -70,7 +70,7 @@ impl HashKeyDispatcher for GroupTopNExecutorBuilder {
fn dispatch_impl<K: HashKey>(self) -> Self::Output {
Box::new(GroupTopNExecutor::<K>::new(
self.child,
self.order_pairs,
self.column_orders,
self.offset,
self.limit,
self.with_ties,
Expand Down Expand Up @@ -98,10 +98,10 @@ impl BoxedExecutorBuilder for GroupTopNExecutorBuilder {
NodeBody::GroupTopN
)?;

let order_pairs = top_n_node
let column_orders = top_n_node
.column_orders
.iter()
.map(OrderPair::from_protobuf)
.map(ColumnOrder::from_protobuf)
.collect();

let group_key = top_n_node
Expand All @@ -117,7 +117,7 @@ impl BoxedExecutorBuilder for GroupTopNExecutorBuilder {

let builder = Self {
child,
order_pairs,
column_orders,
offset: top_n_node.get_offset() as usize,
limit: top_n_node.get_limit() as usize,
group_key,
Expand All @@ -135,7 +135,7 @@ impl<K: HashKey> GroupTopNExecutor<K> {
#[expect(clippy::too_many_arguments)]
pub fn new(
child: BoxedExecutor,
order_pairs: Vec<OrderPair>,
column_orders: Vec<ColumnOrder>,
offset: usize,
limit: usize,
with_ties: bool,
Expand All @@ -146,7 +146,7 @@ impl<K: HashKey> GroupTopNExecutor<K> {
let schema = child.schema().clone();
Self {
child,
order_pairs,
column_orders,
offset,
limit,
with_ties,
Expand Down Expand Up @@ -186,7 +186,7 @@ impl<K: HashKey> GroupTopNExecutor<K> {
let chunk = Arc::new(chunk?.compact());
let keys = K::build(self.group_key.as_slice(), &chunk)?;

for (row_id, (encoded_row, key)) in encode_chunk(&chunk, &self.order_pairs)
for (row_id, (encoded_row, key)) in encode_chunk(&chunk, &self.column_orders)
.into_iter()
.zip_eq_fast(keys.into_iter())
.enumerate()
Expand Down Expand Up @@ -256,19 +256,19 @@ mod tests {
5 2 2
",
));
let order_pairs = vec![
OrderPair {
column_idx: 1,
order_type: OrderType::Ascending,
let column_orders = vec![
ColumnOrder {
column_index: 1,
order_type: OrderType::ascending(),
},
OrderPair {
column_idx: 0,
order_type: OrderType::Ascending,
ColumnOrder {
column_index: 0,
order_type: OrderType::ascending(),
},
];
let top_n_executor = (GroupTopNExecutorBuilder {
child: Box::new(mock_executor),
order_pairs,
column_orders,
offset: 1,
limit: 3,
with_ties: false,
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
let order_types: Vec<OrderType> = table_desc
.pk
.iter()
.map(|order| OrderType::from_protobuf(&order.get_order_type().unwrap().direction()))
.map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
.collect();

let pk_indices = table_desc
Expand Down
18 changes: 9 additions & 9 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ mod tests {
use risingwave_common::hash::HashKeyDispatcher;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::sort_util::{OrderPair, OrderType};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_expr::expr::{
new_binary_expr, BoxedExpression, InputRefExpression, LiteralExpression,
};
Expand Down Expand Up @@ -557,20 +557,20 @@ mod tests {
}

fn create_order_by_executor(child: BoxedExecutor) -> BoxedExecutor {
let order_pairs = vec![
OrderPair {
column_idx: 0,
order_type: OrderType::Ascending,
let column_orders = vec![
ColumnOrder {
column_index: 0,
order_type: OrderType::ascending(),
},
OrderPair {
column_idx: 1,
order_type: OrderType::Ascending,
ColumnOrder {
column_index: 1,
order_type: OrderType::ascending(),
},
];

Box::new(SortExecutor::new(
child,
order_pairs,
column_orders,
"SortExecutor".into(),
CHUNK_SIZE,
))
Expand Down
22 changes: 11 additions & 11 deletions src/batch/src/executor/merge_sort_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::sort_util::{HeapElem, OrderPair};
use risingwave_common::util::sort_util::{ColumnOrder, HeapElem};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::ExchangeSource as ProstExchangeSource;

Expand All @@ -39,7 +39,7 @@ pub struct MergeSortExchangeExecutorImpl<CS, C> {
context: C,
/// keeps one data chunk of each source if any
source_inputs: Vec<Option<DataChunk>>,
order_pairs: Arc<Vec<OrderPair>>,
column_orders: Arc<Vec<ColumnOrder>>,
min_heap: BinaryHeap<HeapElem>,
proto_sources: Vec<ProstExchangeSource>,
sources: Vec<ExchangeSourceImpl>, // impl
Expand Down Expand Up @@ -76,7 +76,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> MergeSortExchangeEx
assert!(source_idx < self.source_inputs.len());
let chunk_ref = self.source_inputs[source_idx].as_ref().unwrap();
self.min_heap.push(HeapElem {
order_pairs: self.order_pairs.clone(),
column_orders: self.column_orders.clone(),
chunk: chunk_ref.clone(),
chunk_idx: source_idx,
elem_idx: row_idx,
Expand Down Expand Up @@ -191,12 +191,12 @@ impl BoxedExecutorBuilder for MergeSortExchangeExecutorBuilder {
NodeBody::MergeSortExchange
)?;

let order_pairs = sort_merge_node
let column_orders = sort_merge_node
.column_orders
.iter()
.map(OrderPair::from_protobuf)
.map(ColumnOrder::from_protobuf)
.collect();
let order_pairs = Arc::new(order_pairs);
let column_orders = Arc::new(column_orders);

let exchange_node = sort_merge_node.get_exchange()?;
let proto_sources: Vec<ProstExchangeSource> = exchange_node.get_sources().to_vec();
Expand All @@ -213,7 +213,7 @@ impl BoxedExecutorBuilder for MergeSortExchangeExecutorBuilder {
Ok(Box::new(MergeSortExchangeExecutor::<C> {
context: source.context().clone(),
source_inputs: vec![None; num_sources],
order_pairs,
column_orders,
min_heap: BinaryHeap::new(),
proto_sources,
sources: vec![],
Expand Down Expand Up @@ -260,9 +260,9 @@ mod tests {
proto_sources.push(ProstExchangeSource::default());
source_creators.push(fake_create_source.clone());
}
let order_pairs = Arc::new(vec![OrderPair {
column_idx: 0,
order_type: OrderType::Ascending,
let column_orders = Arc::new(vec![ColumnOrder {
column_index: 0,
order_type: OrderType::ascending(),
}]);

let executor = Box::new(MergeSortExchangeExecutorImpl::<
Expand All @@ -271,7 +271,7 @@ mod tests {
> {
context: ComputeNodeContext::for_test(),
source_inputs: vec![None; proto_sources.len()],
order_pairs,
column_orders,
min_heap: BinaryHeap::new(),
proto_sources,
sources: vec![],
Expand Down
Loading