Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(batch): simplify batch executor builder #19731

Merged: 5 commits were merged on Dec 10, 2024.
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ clap = { version = "4", features = ["cargo", "derive", "env"] }
deltalake = { version = "0.20.1", features = ["s3", "gcs", "datafusion"] }
itertools = "0.13.0"
jsonbb = "0.1.4"
linkme = { version = "0.3", features = ["used_linker"] }
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" }
parquet = { version = "53.2", features = ["async"] }
mysql_async = { version = "0.34", default-features = false, features = [
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/execution/local_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct LocalExchangeSource {
impl LocalExchangeSource {
pub fn create(
output_id: TaskOutputId,
context: impl BatchTaskContext,
context: &dyn BatchTaskContext,
task_id: TaskId,
) -> Result<Self> {
let task_output = context.get_task_output(output_id)?;
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

/// [`DeleteExecutor`] implements table deletion with values from its child executor.
// Note: multiple `DELETE`s in a single epoch, or concurrent `DELETE`s may lead to conflicting
Expand Down Expand Up @@ -164,10 +163,9 @@ impl DeleteExecutor {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for DeleteExecutor {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [child]: [_; 1] = inputs.try_into().unwrap();
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
use crate::error::{BatchError, Result};
use crate::task::BatchTaskContext;

pub struct ExpandExecutor {
column_subsets: Vec<Vec<usize>>,
Expand Down Expand Up @@ -90,10 +89,9 @@ impl ExpandExecutor {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for ExpandExecutor {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let expand_node = try_match_expand!(
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

pub struct FilterExecutor {
expr: BoxedExpression,
Expand Down Expand Up @@ -76,10 +75,9 @@ impl FilterExecutor {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for FilterExecutor {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [input]: [_; 1] = inputs.try_into().unwrap();
Expand Down
56 changes: 25 additions & 31 deletions src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
Expand All @@ -33,16 +34,16 @@ use crate::execution::local_exchange::LocalExchangeSource;
use crate::executor::ExecutorBuilder;
use crate::task::{BatchTaskContext, TaskId};

pub type ExchangeExecutor<C> = GenericExchangeExecutor<DefaultCreateSource, C>;
pub type ExchangeExecutor = GenericExchangeExecutor<DefaultCreateSource>;
use crate::executor::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor};
use crate::monitor::BatchMetrics;

pub struct GenericExchangeExecutor<CS, C> {
pub struct GenericExchangeExecutor<CS> {
proto_sources: Vec<PbExchangeSource>,
/// Mock-able `CreateSource`.
source_creators: Vec<CS>,
sequential: bool,
context: C,
context: Arc<dyn BatchTaskContext>,

schema: Schema,
#[expect(dead_code)]
Expand All @@ -59,7 +60,7 @@ pub struct GenericExchangeExecutor<CS, C> {
pub trait CreateSource: Send {
async fn create_source(
&self,
context: impl BatchTaskContext,
context: &dyn BatchTaskContext,
prost_source: &PbExchangeSource,
) -> Result<ExchangeSourceImpl>;
}
Expand All @@ -79,7 +80,7 @@ impl DefaultCreateSource {
impl CreateSource for DefaultCreateSource {
async fn create_source(
&self,
context: impl BatchTaskContext,
context: &dyn BatchTaskContext,
prost_source: &PbExchangeSource,
) -> Result<ExchangeSourceImpl> {
let peer_addr = prost_source.get_host()?.into();
Expand Down Expand Up @@ -146,10 +147,9 @@ impl CreateSource for DefaultCreateSource {

pub struct GenericExchangeExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
ensure!(
Expand All @@ -170,7 +170,7 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {

let input_schema: Vec<NodeField> = node.get_input_schema().to_vec();
let fields = input_schema.iter().map(Field::from).collect::<Vec<Field>>();
Ok(Box::new(ExchangeExecutor::<C> {
Ok(Box::new(ExchangeExecutor {
proto_sources,
source_creators,
sequential,
Expand All @@ -183,9 +183,7 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {
}
}

impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> Executor
for GenericExchangeExecutor<CS, C>
{
impl<CS: 'static + Send + CreateSource> Executor for GenericExchangeExecutor<CS> {
fn schema(&self) -> &Schema {
&self.schema
}
Expand All @@ -199,7 +197,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> Executor
}
}

impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExecutor<CS, C> {
impl<CS: 'static + Send + CreateSource> GenericExchangeExecutor<CS> {
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
let streams = self
Expand All @@ -210,7 +208,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExec
Self::data_chunk_stream(
prost_source,
source_creator,
self.context.clone(),
&*self.context,
self.metrics.clone(),
)
});
Expand All @@ -235,12 +233,10 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExec
async fn data_chunk_stream(
prost_source: PbExchangeSource,
source_creator: CS,
context: C,
context: &dyn BatchTaskContext,
metrics: Option<BatchMetrics>,
) {
let mut source = source_creator
.create_source(context.clone(), &prost_source)
.await?;
let mut source = source_creator.create_source(context, &prost_source).await?;
// create the collector
let counter = metrics
.as_ref()
Expand Down Expand Up @@ -290,20 +286,18 @@ mod tests {
source_creators.push(fake_create_source);
}

let executor = Box::new(
GenericExchangeExecutor::<FakeCreateSource, ComputeNodeContext> {
metrics: None,
proto_sources,
source_creators,
sequential: false,
context,
schema: Schema {
fields: vec![Field::unnamed(DataType::Int32)],
},
task_id: TaskId::default(),
identity: "GenericExchangeExecutor2".to_string(),
let executor = Box::new(GenericExchangeExecutor::<FakeCreateSource> {
metrics: None,
proto_sources,
source_creators,
sequential: false,
context,
schema: Schema {
fields: vec![Field::unnamed(DataType::Int32)],
},
);
task_id: TaskId::default(),
identity: "GenericExchangeExecutor2".to_string(),
});

let mut stream = executor.execute();
let mut chunks: Vec<DataChunk> = vec![];
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

/// Group Top-N Executor
///
Expand Down Expand Up @@ -90,10 +89,9 @@ impl HashKeyDispatcher for GroupTopNExecutorBuilder {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for GroupTopNExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [child]: [_; 1] = inputs.try_into().unwrap();
Expand Down
7 changes: 3 additions & 4 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::spill::spill_op::SpillBackend::Disk;
use crate::spill::spill_op::{
SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
use crate::task::{ShutdownToken, TaskId};

type AggHashMap<K, A> = hashbrown::HashMap<K, Vec<AggregateState>, PrecomputedBuildHasher, A>;

Expand Down Expand Up @@ -149,10 +149,9 @@ impl HashAggExecutorBuilder {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for HashAggExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [child]: [_; 1] = inputs.try_into().unwrap();
Expand Down
7 changes: 3 additions & 4 deletions src/batch/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

pub struct HopWindowExecutor {
child: BoxedExecutor,
identity: String,
Expand All @@ -39,10 +39,9 @@ pub struct HopWindowExecutor {
output_indices: Vec<usize>,
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for HopWindowExecutor {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [child]: [_; 1] = inputs.try_into().unwrap();
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
use crate::error::BatchError;
use crate::executor::{DataChunk, Executor};
use crate::monitor::BatchMetrics;
use crate::task::BatchTaskContext;

static POSITION_DELETE_FILE_FILE_PATH_INDEX: usize = 0;
static POSITION_DELETE_FILE_POS: usize = 1;
Expand Down Expand Up @@ -225,10 +224,9 @@ impl IcebergScanExecutor {

pub struct IcebergScanExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> crate::error::Result<BoxedExecutor> {
ensure!(
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

/// [`InsertExecutor`] implements table insertion with values from its child executor.
pub struct InsertExecutor {
Expand Down Expand Up @@ -208,10 +207,9 @@ impl InsertExecutor {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for InsertExecutor {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [child]: [_; 1] = inputs.try_into().unwrap();
Expand Down
7 changes: 3 additions & 4 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::executor::{
unix_timestamp_sec_to_epoch, AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder,
BufferChunkExecutor, Executor, ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase,
};
use crate::task::{BatchTaskContext, ShutdownToken};
use crate::task::ShutdownToken;

/// Distributed Lookup Join Executor.
/// High level Execution flow:
Expand Down Expand Up @@ -81,10 +81,9 @@ impl<K> DistributedLookupJoinExecutor<K> {

pub struct DistributedLookupJoinExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [outer_side_input]: [_; 1] = inputs.try_into().unwrap();
Expand Down
7 changes: 3 additions & 4 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::spill::spill_op::SpillBackend::Disk;
use crate::spill::spill_op::{
SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::{BatchTaskContext, ShutdownToken};
use crate::task::ShutdownToken;

/// Hash Join Executor
///
Expand Down Expand Up @@ -2144,10 +2144,9 @@ impl DataChunkMutator {
}
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for HashJoinExecutor<()> {
async fn new_boxed_executor<C: BatchTaskContext>(
context: &ExecutorBuilder<'_, C>,
async fn new_boxed_executor(
context: &ExecutorBuilder<'_>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
Expand Down
Loading
Loading