feat: support aggr push down in distributed query #1232

Merged
merged 13 commits into from
Sep 30, 2023
176 changes: 151 additions & 25 deletions df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -28,6 +28,13 @@ use datafusion::{
execution::TaskContext,
physical_expr::PhysicalSortExpr,
physical_plan::{
aggregates::{AggregateExec, AggregateMode},
coalesce_batches::CoalesceBatchesExec,
coalesce_partitions::CoalescePartitionsExec,
displayable,
filter::FilterExec,
projection::ProjectionExec,
repartition::RepartitionExec,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
},
@@ -111,33 +118,94 @@ impl DisplayAs for UnresolvedPartitionedScan {
/// related nodes to execute.
#[derive(Debug)]
pub struct ResolvedPartitionedScan {
pub remote_executor: Arc<dyn RemotePhysicalPlanExecutor>,
pub remote_exec_plans: Vec<(TableIdentifier, Arc<dyn ExecutionPlan>)>,
pub remote_exec_ctx: Arc<RemoteExecContext>,
pub pushdown_continue: bool,
}

impl ResolvedPartitionedScan {
pub fn extend_remote_exec_plans(
pub fn new(
remote_executor: Arc<dyn RemotePhysicalPlanExecutor>,
remote_exec_plans: Vec<(TableIdentifier, Arc<dyn ExecutionPlan>)>,
) -> Self {
let remote_exec_ctx = Arc::new(RemoteExecContext {
executor: remote_executor,
plans: remote_exec_plans,
});

Self {
remote_exec_ctx,
pushdown_continue: true,
}
}

pub fn pushdown_finished(&self) -> Arc<dyn ExecutionPlan> {
Arc::new(Self {
remote_exec_ctx: self.remote_exec_ctx.clone(),
pushdown_continue: false,
})
}

pub fn try_to_push_down_more(
&self,
extended_node: Arc<dyn ExecutionPlan>,
) -> DfResult<Arc<ResolvedPartitionedScan>> {
cur_node: Arc<dyn ExecutionPlan>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
// Can not push down any more nodes.
if !self.pushdown_continue {
return cur_node.with_new_children(vec![self.pushdown_finished()]);
}

// Try to push down the current node. When a node terminating the pushdown is met,
// `can_push_down_more` is set to false.
let pushdown_status = Self::maybe_a_pushdown_node(cur_node.clone());
let (node, can_push_down_more) = match pushdown_status {
PushDownStatus::Continue(node) => (node, true),
PushDownStatus::Terminated(node) => (node, false),
PushDownStatus::Unable => {
let partitioned_scan = self.pushdown_finished();
return cur_node.with_new_children(vec![partitioned_scan]);
}
};

let new_plans = self
.remote_exec_plans
.remote_exec_ctx
.plans
.iter()
.map(|(table, plan)| {
extended_node
.clone()
node.clone()
.with_new_children(vec![plan.clone()])
.map(|extended_plan| (table.clone(), extended_plan))
})
.collect::<DfResult<Vec<_>>>()?;

let remote_exec_ctx = Arc::new(RemoteExecContext {
executor: self.remote_exec_ctx.executor.clone(),
plans: new_plans,
});
let plan = ResolvedPartitionedScan {
remote_executor: self.remote_executor.clone(),
remote_exec_plans: new_plans,
remote_exec_ctx,
pushdown_continue: can_push_down_more,
};

Ok(Arc::new(plan))
}

#[inline]
pub fn maybe_a_pushdown_node(plan: Arc<dyn ExecutionPlan>) -> PushDownStatus {
PushDownStatus::new(plan)
}

/// `ResolvedPartitionedScan` is executable only after the following is satisfied:
/// + The pushdown search process has finished.
#[inline]
fn is_executable(&self) -> bool {
!self.pushdown_continue
}
}
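// Editor's note: an illustrative sketch of the intended pushdown lifecycle, not
// part of this change. `remote_executor`, `sub_table_plans` and `parent` stand
// in for values produced elsewhere while resolving the distributed plan:
//
//   let scan = Arc::new(ResolvedPartitionedScan::new(remote_executor, sub_table_plans));
//   // Each parent node met while walking up the plan is offered for pushdown:
//   // Continue/Terminated nodes are cloned into every remote sub-table plan,
//   // while an Unable node makes the scan mark itself finished and get
//   // re-attached as that node's child.
//   let extended = scan.try_to_push_down_more(parent)?;
//   // Only once `pushdown_continue` is false does `is_executable` allow
//   // `execute` to send the sub-table plans for remote execution.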

#[derive(Debug)]
pub struct RemoteExecContext {
executor: Arc<dyn RemotePhysicalPlanExecutor>,
plans: Vec<(TableIdentifier, Arc<dyn ExecutionPlan>)>,
}

impl ExecutionPlan for ResolvedPartitionedScan {
@@ -146,31 +214,36 @@ impl ExecutionPlan for ResolvedPartitionedScan {
}

fn schema(&self) -> ArrowSchemaRef {
self.remote_exec_plans
self.remote_exec_ctx
.plans
.first()
.expect("remote_exec_plans should not be empty")
.1
.schema()
}

fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.remote_exec_plans.len())
Partitioning::UnknownPartitioning(self.remote_exec_ctx.plans.len())
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
self.remote_exec_ctx
.plans
.iter()
.map(|(_, plan)| plan.clone())
.collect()
}

fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
Err(DataFusionError::Internal(
"UnresolvedPartitionedScan should not have children".to_string(),
"UnresolvedPartitionedScan can't be built directly from new children".to_string(),
))
}

Expand All @@ -179,11 +252,19 @@ impl ExecutionPlan for ResolvedPartitionedScan {
partition: usize,
context: Arc<TaskContext>,
) -> DfResult<DfSendableRecordBatchStream> {
let (sub_table, plan) = &self.remote_exec_plans[partition];
if !self.is_executable() {
return Err(DataFusionError::Internal(format!(
"partitioned scan is still inexecutable, plan:{}",
displayable(self).indent(true)
)));
}

let (sub_table, plan) = &self.remote_exec_ctx.plans[partition];

// Send plan for remote execution.
let stream_future =
self.remote_executor
self.remote_exec_ctx
.executor
.execute(sub_table.clone(), &context, plan.clone())?;
let record_stream = PartitionedScanStream::new(stream_future, plan.schema());

@@ -280,15 +361,18 @@ pub(crate) enum StreamState {
Polling(DfSendableRecordBatchStream),
}

// TODO: make the display of the plan prettier.
impl DisplayAs for ResolvedPartitionedScan {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"ResolvedPartitionedScan: remote_exec_plans:{:?}, partition_count={}",
self.remote_exec_plans,
self.output_partitioning().partition_count(),
)
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"ResolvedPartitionedScan: pushing_down:{}, partition_count:{}",
self.pushdown_continue,
self.remote_exec_ctx.plans.len()
)
}
}
}
}

@@ -352,7 +436,7 @@ impl DisplayAs for UnresolvedSubTableScan {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"UnresolvedSubTableScan: table={:?}, read_request:{:?}, partition_count={}",
"UnresolvedSubTableScan: table:{:?}, request:{:?}, partition_count:{}",
self.table,
self.read_request,
self.output_partitioning().partition_count(),
@@ -406,6 +490,48 @@ impl TryFrom<UnresolvedSubTableScan> for ceresdbproto::remote_engine::Unresolved
}
}

/// Pushdown status of a plan node:
/// + Unable: the node can't be pushed down to the
/// `ResolvedPartitionedScan` node.
/// + Continue: the node can be pushed down to `ResolvedPartitionedScan`, and
/// the newly generated `ResolvedPartitionedScan` can keep accepting more
/// pushdown nodes afterwards.
/// + Terminated: the node can be pushed down to `ResolvedPartitionedScan`,
/// but the newly generated `ResolvedPartitionedScan` can't accept any more
/// pushdown nodes afterwards.
pub enum PushDownStatus {
Unable,
Continue(Arc<dyn ExecutionPlan>),
Terminated(Arc<dyn ExecutionPlan>),
}

impl PushDownStatus {
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
if let Some(aggr) = plan.as_any().downcast_ref::<AggregateExec>() {
if *aggr.mode() == AggregateMode::Partial {
Self::Terminated(plan)
} else {
Self::Unable
}
} else if plan.as_any().downcast_ref::<FilterExec>().is_some()
|| plan.as_any().downcast_ref::<ProjectionExec>().is_some()
|| plan.as_any().downcast_ref::<RepartitionExec>().is_some()
|| plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
.is_some()
|| plan
.as_any()
.downcast_ref::<CoalesceBatchesExec>()
.is_some()
{
Self::Continue(plan)
} else {
Self::Unable
}
}
}
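// Editor's note: an illustrative, unused helper sketching how `PushDownStatus`
// classifies plan nodes; it is not part of this change.
#[allow(dead_code)]
fn describe_pushdown(plan: Arc<dyn ExecutionPlan>) -> &'static str {
    match PushDownStatus::new(plan) {
        // Filter/Projection/Repartition/CoalescePartitions/CoalesceBatches:
        // cloned into every remote sub-table plan, and the search continues.
        PushDownStatus::Continue(_) => "push down and continue",
        // Partial aggregation: pushed down, but nothing above it (e.g. the
        // final aggregation) will be pushed down afterwards.
        PushDownStatus::Terminated(_) => "push down and terminate",
        // Any other node (sort, join, final aggregation, ...) stays local.
        PushDownStatus::Unable => "unable to push down",
    }
}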

#[cfg(test)]
mod test {
use datafusion::error::DataFusionError;