Skip to content

Commit c96ca8d

Browse files
authored
feat: support aggr push down in distributed query (#1232)
## Rationale Part of #1108. The new distributed query framework has been implemented; this PR adds support for aggregate push down. ## Detailed Changes + Push down the aggregate node when resolving the partitioned scan. + Support switching between the new and old distributed query implementations through HTTP. ## Test Plan Tested by existing tests.
1 parent 3923d47 commit c96ca8d

32 files changed

+863
-181
lines changed

df_engine_extensions/src/dist_sql_query/physical_plan.rs

+151-25
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ use datafusion::{
2828
execution::TaskContext,
2929
physical_expr::PhysicalSortExpr,
3030
physical_plan::{
31+
aggregates::{AggregateExec, AggregateMode},
32+
coalesce_batches::CoalesceBatchesExec,
33+
coalesce_partitions::CoalescePartitionsExec,
34+
displayable,
35+
filter::FilterExec,
36+
projection::ProjectionExec,
37+
repartition::RepartitionExec,
3138
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
3239
SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
3340
},
@@ -111,33 +118,94 @@ impl DisplayAs for UnresolvedPartitionedScan {
111118
/// related nodes to execute.
112119
#[derive(Debug)]
113120
pub struct ResolvedPartitionedScan {
114-
pub remote_executor: Arc<dyn RemotePhysicalPlanExecutor>,
115-
pub remote_exec_plans: Vec<(TableIdentifier, Arc<dyn ExecutionPlan>)>,
121+
pub remote_exec_ctx: Arc<RemoteExecContext>,
122+
pub pushdown_continue: bool,
116123
}
117124

118125
impl ResolvedPartitionedScan {
119-
pub fn extend_remote_exec_plans(
126+
pub fn new(
127+
remote_executor: Arc<dyn RemotePhysicalPlanExecutor>,
128+
remote_exec_plans: Vec<(TableIdentifier, Arc<dyn ExecutionPlan>)>,
129+
) -> Self {
130+
let remote_exec_ctx = Arc::new(RemoteExecContext {
131+
executor: remote_executor,
132+
plans: remote_exec_plans,
133+
});
134+
135+
Self {
136+
remote_exec_ctx,
137+
pushdown_continue: true,
138+
}
139+
}
140+
141+
pub fn pushdown_finished(&self) -> Arc<dyn ExecutionPlan> {
142+
Arc::new(Self {
143+
remote_exec_ctx: self.remote_exec_ctx.clone(),
144+
pushdown_continue: false,
145+
})
146+
}
147+
148+
pub fn try_to_push_down_more(
120149
&self,
121-
extended_node: Arc<dyn ExecutionPlan>,
122-
) -> DfResult<Arc<ResolvedPartitionedScan>> {
150+
cur_node: Arc<dyn ExecutionPlan>,
151+
) -> DfResult<Arc<dyn ExecutionPlan>> {
152+
// Can not push more...
153+
if !self.pushdown_continue {
154+
return cur_node.with_new_children(vec![self.pushdown_finished()]);
155+
}
156+
157+
// Push down more; when we encounter a terminating pushdown-able node, we need to
158+
// set `can_push_down_more` to false.
159+
let pushdown_status = Self::maybe_a_pushdown_node(cur_node.clone());
160+
let (node, can_push_down_more) = match pushdown_status {
161+
PushDownStatus::Continue(node) => (node, true),
162+
PushDownStatus::Terminated(node) => (node, false),
163+
PushDownStatus::Unable => {
164+
let partitioned_scan = self.pushdown_finished();
165+
return cur_node.with_new_children(vec![partitioned_scan]);
166+
}
167+
};
168+
123169
let new_plans = self
124-
.remote_exec_plans
170+
.remote_exec_ctx
171+
.plans
125172
.iter()
126173
.map(|(table, plan)| {
127-
extended_node
128-
.clone()
174+
node.clone()
129175
.with_new_children(vec![plan.clone()])
130176
.map(|extended_plan| (table.clone(), extended_plan))
131177
})
132178
.collect::<DfResult<Vec<_>>>()?;
133179

180+
let remote_exec_ctx = Arc::new(RemoteExecContext {
181+
executor: self.remote_exec_ctx.executor.clone(),
182+
plans: new_plans,
183+
});
134184
let plan = ResolvedPartitionedScan {
135-
remote_executor: self.remote_executor.clone(),
136-
remote_exec_plans: new_plans,
185+
remote_exec_ctx,
186+
pushdown_continue: can_push_down_more,
137187
};
138188

139189
Ok(Arc::new(plan))
140190
}
191+
192+
#[inline]
193+
pub fn maybe_a_pushdown_node(plan: Arc<dyn ExecutionPlan>) -> PushDownStatus {
194+
PushDownStatus::new(plan)
195+
}
196+
197+
/// `ResolvedPartitionedScan` becomes executable once the following condition is satisfied:
198+
/// + The pushdown searching process is finished.
199+
#[inline]
200+
fn is_executable(&self) -> bool {
201+
!self.pushdown_continue
202+
}
203+
}
204+
205+
#[derive(Debug)]
206+
pub struct RemoteExecContext {
207+
executor: Arc<dyn RemotePhysicalPlanExecutor>,
208+
plans: Vec<(TableIdentifier, Arc<dyn ExecutionPlan>)>,
141209
}
142210

143211
impl ExecutionPlan for ResolvedPartitionedScan {
@@ -146,31 +214,36 @@ impl ExecutionPlan for ResolvedPartitionedScan {
146214
}
147215

148216
fn schema(&self) -> ArrowSchemaRef {
149-
self.remote_exec_plans
217+
self.remote_exec_ctx
218+
.plans
150219
.first()
151220
.expect("remote_exec_plans should not be empty")
152221
.1
153222
.schema()
154223
}
155224

156225
fn output_partitioning(&self) -> Partitioning {
157-
Partitioning::UnknownPartitioning(self.remote_exec_plans.len())
226+
Partitioning::UnknownPartitioning(self.remote_exec_ctx.plans.len())
158227
}
159228

160229
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
161230
None
162231
}
163232

164233
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
165-
vec![]
234+
self.remote_exec_ctx
235+
.plans
236+
.iter()
237+
.map(|(_, plan)| plan.clone())
238+
.collect()
166239
}
167240

168241
fn with_new_children(
169242
self: Arc<Self>,
170243
_children: Vec<Arc<dyn ExecutionPlan>>,
171244
) -> DfResult<Arc<dyn ExecutionPlan>> {
172245
Err(DataFusionError::Internal(
173-
"UnresolvedPartitionedScan should not have children".to_string(),
246+
"UnresolvedPartitionedScan can't be built directly from new children".to_string(),
174247
))
175248
}
176249

@@ -179,11 +252,19 @@ impl ExecutionPlan for ResolvedPartitionedScan {
179252
partition: usize,
180253
context: Arc<TaskContext>,
181254
) -> DfResult<DfSendableRecordBatchStream> {
182-
let (sub_table, plan) = &self.remote_exec_plans[partition];
255+
if !self.is_executable() {
256+
return Err(DataFusionError::Internal(format!(
257+
"partitioned scan is still inexecutable, plan:{}",
258+
displayable(self).indent(true)
259+
)));
260+
}
261+
262+
let (sub_table, plan) = &self.remote_exec_ctx.plans[partition];
183263

184264
// Send plan for remote execution.
185265
let stream_future =
186-
self.remote_executor
266+
self.remote_exec_ctx
267+
.executor
187268
.execute(sub_table.clone(), &context, plan.clone())?;
188269
let record_stream = PartitionedScanStream::new(stream_future, plan.schema());
189270

@@ -280,15 +361,18 @@ pub(crate) enum StreamState {
280361
Polling(DfSendableRecordBatchStream),
281362
}
282363

283-
// TODO: make display for the plan more pretty.
284364
impl DisplayAs for ResolvedPartitionedScan {
285-
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
286-
write!(
287-
f,
288-
"ResolvedPartitionedScan: remote_exec_plans:{:?}, partition_count={}",
289-
self.remote_exec_plans,
290-
self.output_partitioning().partition_count(),
291-
)
365+
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
366+
match t {
367+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
368+
write!(
369+
f,
370+
"ResolvedPartitionedScan: pushing_down:{}, partition_count:{}",
371+
self.pushdown_continue,
372+
self.remote_exec_ctx.plans.len()
373+
)
374+
}
375+
}
292376
}
293377
}
294378

@@ -352,7 +436,7 @@ impl DisplayAs for UnresolvedSubTableScan {
352436
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
353437
write!(
354438
f,
355-
"UnresolvedSubTableScan: table={:?}, read_request:{:?}, partition_count={}",
439+
"UnresolvedSubTableScan: table:{:?}, request:{:?}, partition_count:{}",
356440
self.table,
357441
self.read_request,
358442
self.output_partitioning().partition_count(),
@@ -406,6 +490,48 @@ impl TryFrom<UnresolvedSubTableScan> for ceresdbproto::remote_engine::Unresolved
406490
}
407491
}
408492

493+
/// Pushdown status, including:
494+
/// + Unable, plan node which can't be pushed down to
495+
/// `ResolvedPartitionedScan` node.
496+
/// + Continue, node able to be pushed down to `ResolvedPartitionedScan`, and
497+
/// the newly generated `ResolvedPartitionedScan` can continue to accept
498+
/// more pushdown nodes after.
499+
/// + Terminated, node able to be pushed down to `ResolvedPartitionedScan`,
500+
/// but the newly generated `ResolvedPartitionedScan` can't accept more
501+
/// pushdown nodes after.
502+
pub enum PushDownStatus {
503+
Unable,
504+
Continue(Arc<dyn ExecutionPlan>),
505+
Terminated(Arc<dyn ExecutionPlan>),
506+
}
507+
508+
impl PushDownStatus {
509+
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
510+
if let Some(aggr) = plan.as_any().downcast_ref::<AggregateExec>() {
511+
if *aggr.mode() == AggregateMode::Partial {
512+
Self::Terminated(plan)
513+
} else {
514+
Self::Unable
515+
}
516+
} else if plan.as_any().downcast_ref::<FilterExec>().is_some()
517+
|| plan.as_any().downcast_ref::<ProjectionExec>().is_some()
518+
|| plan.as_any().downcast_ref::<RepartitionExec>().is_some()
519+
|| plan
520+
.as_any()
521+
.downcast_ref::<CoalescePartitionsExec>()
522+
.is_some()
523+
|| plan
524+
.as_any()
525+
.downcast_ref::<CoalesceBatchesExec>()
526+
.is_some()
527+
{
528+
Self::Continue(plan)
529+
} else {
530+
Self::Unable
531+
}
532+
}
533+
}
534+
409535
#[cfg(test)]
410536
mod test {
411537
use datafusion::error::DataFusionError;

0 commit comments

Comments
 (0)