Skip to content

Commit

Permalink
fix: add yield point to RepartitionExec (#5299)
Browse files Browse the repository at this point in the history
* fix: add yield point to `RepartitionExec`

This prevents endless spinning and locked-up tokio tasks if the inputs
never return `Poll::Pending`.

Fixes #5278.

* refactor: use a single `UnboundedExec` for testing

* refactor: rename test
  • Loading branch information
crepererum authored Feb 17, 2023
1 parent 22b974f commit ded897e
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 78 deletions.
23 changes: 18 additions & 5 deletions datafusion/core/src/physical_optimizer/pipeline_fixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,11 @@ mod hash_join_tests {
use crate::physical_optimizer::join_selection::swap_join_type;
use crate::physical_optimizer::test_utils::SourceType;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::joins::PartitionMode;
use crate::physical_plan::projection::ProjectionExec;
use crate::{physical_plan::joins::PartitionMode, test::exec::UnboundedExec};
use crate::test_util::UnboundedExec;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

struct TestCase {
Expand Down Expand Up @@ -529,17 +531,28 @@ mod hash_join_tests {
}
Ok(())
}

#[allow(clippy::vtable_address_comparisons)]
async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> {
let left_unbounded = t.initial_sources_unbounded.0 == SourceType::Unbounded;
let right_unbounded = t.initial_sources_unbounded.1 == SourceType::Unbounded;
let left_exec = Arc::new(UnboundedExec::new(
left_unbounded,
Schema::new(vec![Field::new("a", DataType::Int32, false)]),
(!left_unbounded).then_some(1),
RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
"a",
DataType::Int32,
false,
)]))),
2,
)) as Arc<dyn ExecutionPlan>;
let right_exec = Arc::new(UnboundedExec::new(
right_unbounded,
Schema::new(vec![Field::new("b", DataType::Int32, false)]),
(!right_unbounded).then_some(1),
RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
"b",
DataType::Int32,
false,
)]))),
2,
)) as Arc<dyn ExecutionPlan>;

let join = HashJoinExec::try_new(
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_plan/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,10 @@ impl RepartitionExec {
}
timer.done();
}

// If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield.
// See https://github.com/apache/arrow-datafusion/issues/5278.
tokio::task::yield_now().await;
}

Ok(())
Expand Down
70 changes: 0 additions & 70 deletions datafusion/core/src/test/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,76 +507,6 @@ impl ExecutionPlan for StatisticsExec {
}
}

/// Mock leaf execution plan that carries nothing but a boundedness flag and a
/// schema, so tests can control what `unbounded_output` reports.
#[derive(Debug, Clone)]
pub struct UnboundedExec {
    /// Value handed back by `unbounded_output`.
    unbounded: bool,
    /// Schema handed back by `schema`.
    schema: Arc<Schema>,
}

impl UnboundedExec {
    /// Builds a mock plan that reports the given `unbounded` flag and wraps
    /// `schema` in an `Arc` so it can be shared cheaply.
    pub fn new(unbounded: bool, schema: Schema) -> Self {
        let schema = Arc::new(schema);
        Self { unbounded, schema }
    }
}
/// `ExecutionPlan` implementation for the mock: a childless leaf that reports
/// the stored schema and boundedness flag and cannot actually be executed.
impl ExecutionPlan for UnboundedExec {
    /// Exposes the plan as `Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns a shared handle to the schema given at construction.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Always reports two partitions with unknown partitioning.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(2)
    }

    /// Reports the flag given at construction; `_children` is ignored because
    /// this plan has no children.
    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
        Ok(self.unbounded)
    }

    /// The mock declares no output ordering.
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    /// Leaf node: there are no children.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }

    /// Ignores the supplied children and returns the plan unchanged.
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Execution is intentionally unsupported.
    ///
    /// # Panics
    ///
    /// Always panics: this mock only exists to report `unbounded_output`.
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // The previous message talked about "testing statistics", which does not
        // describe this mock; say what the mock is actually for.
        unimplemented!("This plan only serves for testing unbounded_output and cannot be executed")
    }

    /// Writes the one-line plan description including the boundedness flag.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default => {
                write!(f, "UnboundableExec: unbounded={}", self.unbounded,)
            }
        }
    }

    /// No statistics are tracked; returns the default value.
    fn statistics(&self) -> Statistics {
        Statistics::default()
    }
}

/// Execution plan that emits streams that block forever.
///
/// This is useful to test shutdown / cancelation behavior of certain execution plans.
Expand Down
132 changes: 129 additions & 3 deletions datafusion/core/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,26 @@
use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{env, error::Error, path::PathBuf, sync::Arc};

use crate::datasource::datasource::TableProviderFactory;
use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider};
use crate::execution::context::SessionState;
use crate::error::Result;
use crate::execution::context::{SessionState, TaskContext};
use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use datafusion_common::{DataFusionError, Statistics};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_expr::PhysicalSortExpr;
use futures::Stream;

/// Compares formatted output of a record batch with an expected
/// vector of strings, with the result of pretty formatting record
Expand Down Expand Up @@ -332,6 +341,123 @@ impl TableProvider for TestTableProvider {
}
}

/// Mock leaf execution plan whose output streams repeatedly emit clones of one
/// fixed record batch, either a limited number of times or without end.
#[derive(Debug, Clone)]
pub struct UnboundedExec {
    /// `Some(n)`: every stream ends after `n` batches. `None`: streams never end
    /// and the plan reports itself as unbounded.
    batch_produce: Option<usize>,
    /// Batch that is cloned for every emitted item; also the source of the schema.
    batch: RecordBatch,
    /// Partition count reported by `output_partitioning`.
    partitions: usize,
}

impl UnboundedExec {
    /// Create new exec that clones the given record batch to its output.
    ///
    /// Pass `Some(n)` as `batch_produce` to have each output stream emit exactly
    /// `n` batches; pass `None` for an endless stream.
    pub fn new(
        batch_produce: Option<usize>,
        batch: RecordBatch,
        partitions: usize,
    ) -> Self {
        UnboundedExec {
            batch_produce,
            batch,
            partitions,
        }
    }
}
/// `ExecutionPlan` implementation for the batch-emitting mock: a childless leaf
/// whose schema, partition count and boundedness all come from its fields.
impl ExecutionPlan for UnboundedExec {
    /// Exposes the plan as `Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The schema is whatever the template batch carries.
    fn schema(&self) -> SchemaRef {
        self.batch.schema()
    }

    /// Unknown partitioning over the configured number of partitions.
    fn output_partitioning(&self) -> Partitioning {
        let partition_count = self.partitions;
        Partitioning::UnknownPartitioning(partition_count)
    }

    /// The plan is unbounded exactly when no batch limit was configured.
    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
        let unbounded = self.batch_produce.is_none();
        Ok(unbounded)
    }

    /// The mock declares no output ordering.
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    /// Leaf node: there are no children.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Ignores the supplied children and returns the plan unchanged.
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Starts a fresh stream (counter at zero) that clones the template batch.
    ///
    /// The partition index and task context are not consulted, so every
    /// partition produces the same output.
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = UnboundedStream {
            batch_produce: self.batch_produce,
            count: 0,
            batch: self.batch.clone(),
        };
        Ok(Box::pin(stream))
    }

    /// Writes the one-line plan description including the boundedness flag.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let unbounded = self.batch_produce.is_none();
        match t {
            DisplayFormatType::Default => {
                write!(f, "UnboundableExec: unbounded={}", unbounded)
            }
        }
    }

    /// No statistics are tracked; returns the default value.
    fn statistics(&self) -> Statistics {
        Statistics::default()
    }
}

/// Stream that hands out clones of one batch, optionally stopping after a fixed
/// number of items.
#[derive(Debug)]
struct UnboundedStream {
    /// Maximum number of batches to emit; `None` means no limit.
    batch_produce: Option<usize>,
    /// Number of batches emitted so far.
    count: usize,
    /// Template batch cloned for every emitted item.
    batch: RecordBatch,
}

impl Stream for UnboundedStream {
    type Item = Result<RecordBatch>;

    /// Produces the next batch clone, or ends the stream once the limit is hit.
    ///
    /// Every path returns `Poll::Ready`; the task context is never used, so this
    /// stream never reports `Poll::Pending`.
    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Decide first whether a configured limit has already been reached.
        let limit_reached = match self.batch_produce {
            Some(limit) => self.count >= limit,
            None => false,
        };
        if limit_reached {
            return Poll::Ready(None);
        }

        self.count += 1;
        let next_batch = self.batch.clone();
        Poll::Ready(Some(Ok(next_batch)))
    }
}

impl RecordBatchStream for UnboundedStream {
    /// The stream's schema is the schema of the template batch.
    fn schema(&self) -> SchemaRef {
        self.batch.schema()
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
60 changes: 60 additions & 0 deletions datafusion/core/tests/repartition.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::test_util::UnboundedExec;
use datafusion_common::from_slice::FromSlice;
use datafusion_common::Result;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;
use futures::StreamExt;
use std::sync::Arc;

/// Checks that a hash `RepartitionExec` fed by an endless, always-ready input
/// still delivers a first batch instead of spinning forever.
///
/// See <https://github.com/apache/arrow-datafusion/issues/5278>
#[tokio::test]
async fn unbounded_repartition() -> Result<()> {
    let config = SessionConfig::new();
    let ctx = SessionContext::with_config(config);
    let task = ctx.task_ctx();

    // Single-column, single-row batch that the endless input clones on every poll.
    let schema = Arc::new(Schema::new(vec![Field::new("a2", DataType::UInt32, false)]));
    // `schema` is not needed afterwards, so it is moved rather than cloned.
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(UInt32Array::from_slice([1]))],
    )?;

    // `None` => the input never terminates; one input partition.
    let input = Arc::new(UnboundedExec::new(None, batch.clone(), 1));
    let on: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a2", 0))];
    let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?);
    // The repartition plan is only consumed here, so hand it over without cloning.
    let plan = Arc::new(CoalescePartitionsExec::new(plan));
    let mut stream = plan.execute(0, task)?;

    // Note: `tokio::time::timeout` does NOT help here because in the mentioned issue, the whole runtime is blocked by a
    // CPU-spinning thread. Using a multithread runtime with multiple threads is NOT a solution since this would not
    // trigger the bug (the bug is not specific to a single-thread RT though, it's just the only way to trigger it reliably).
    let batch_actual = stream
        .next()
        .await
        .expect("not terminated")
        .expect("no error in stream");
    assert_eq!(batch_actual, batch);
    Ok(())
}

0 comments on commit ded897e

Please sign in to comment.