Strange Behaviour on RepartitionExec with CoalescePartitionsExec. #5278

Closed
metesynnada opened this issue Feb 14, 2023 · 9 comments · Fixed by #5299
Labels
bug Something isn't working

Comments

@metesynnada
Contributor

metesynnada commented Feb 14, 2023

Describe the bug
I am working with the following physical plan:

=== Physical plan ===
CoalescePartitionsExec
  RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3)
    UnboundableExec: unbounded=true

Using this plan (CoalescePartitionsExec on top of RepartitionExec) causes strange behavior when the input stream contains only one unique value.

The strange behavior:

  • The issue with the CoalescePartitionsExec and RepartitionExec physical plan is that, when a stream with only one unique value is provided, no data is read from the RepartitionExec until the stream is exhausted. Even calling wake_receivers() does not wake up the DistributionReceiver. This behavior is not observed without CoalescePartitionsExec.

  • If I use more unique values, there is no blocking problem; it only occurs when the stream contains a single unique value.

  • Plans with blocking repartition:

    === Physical plan ===
    CoalescePartitionsExec
      ProjectionExec: expr=[a2@0 as a5]
        RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
          UnboundableExec: unbounded=false
    
    === Physical plan ===
    CoalescePartitionsExec
      RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
        UnboundableExec: unbounded=false
    
  • Plan without blocking (executed with plan.execute(2, task); the non-blocking partition can change depending on the hash value):

    === Physical plan ===
    RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
      UnboundableExec: unbounded=false
    

To Reproduce

  • Create a file datafusion/core/tests/repartition_exec_blocks.rs
  • Put the following code in it:
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::print_batches;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{
    displayable, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
    SendableRecordBatchStream,
};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::from_slice::FromSlice;
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr::expressions::{col, Column};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Stream, StreamExt};
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

/// A mock execution plan that mimics a data source which is unbounded when `batch_produce` is `None`.
#[derive(Debug, Clone)]
pub struct MyUnboundedExec {
    batch_produce: Option<usize>,
    schema: Arc<Schema>,
    /// Ref-counting helper to check if the plan and the produced stream are still in memory.
    refs: Arc<()>,
}
impl MyUnboundedExec {
    pub fn new(batch_produce: Option<usize>, schema: Schema) -> Self {
        Self {
            batch_produce,
            schema: Arc::new(schema),
            refs: Default::default(),
        }
    }
}
impl ExecutionPlan for MyUnboundedExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(1)
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(UnboundedStream {
            batch_produce: self.batch_produce,
            count: 0,
            schema: Arc::clone(&self.schema),
            _refs: Arc::clone(&self.refs),
        }))
    }

    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default => {
                write!(
                    f,
                    "UnboundableExec: unbounded={}",
                    self.batch_produce.is_none(),
                )
            }
        }
    }

    fn statistics(&self) -> Statistics {
        Statistics::default()
    }
}

#[derive(Debug)]
pub struct UnboundedStream {
    batch_produce: Option<usize>,
    count: usize,
    /// Schema mocked by this stream.
    schema: SchemaRef,

    /// Ref-counting helper to check if the stream is still in memory.
    _refs: Arc<()>,
}

impl Stream for UnboundedStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if let Some(val) = self.batch_produce {
            if val <= self.count {
                println!("Stream Finished");
                return Poll::Ready(None);
            }
        }
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![Arc::new(UInt32Array::from_slice([1]))],
        )?;
        self.count += 1;
        std::thread::sleep(std::time::Duration::from_millis(100));
        Poll::Ready(Some(Ok(batch)))
    }
}

impl RecordBatchStream for UnboundedStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

#[tokio::test(flavor = "multi_thread")]
async fn unbounded_repartition_sa() -> Result<()> {
    let config = SessionConfig::new();
    let ctx = SessionContext::with_config(config);
    let task = ctx.task_ctx();
    let schema = Schema::new(vec![Field::new("a2", DataType::UInt32, false)]);
    let input = Arc::new(MyUnboundedExec::new(Some(20), schema.clone())); // If you put None, it will be an unbounded source.
    let on: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a2", 0))];
    let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?);
    let plan = Arc::new(ProjectionExec::try_new(
        vec![(col("a2", &schema)?, "a5".to_string())],
        plan.clone(),
    )?);
    let plan = Arc::new(CoalescePartitionsExec::new(plan.clone()));
    println!(
        "=== Physical plan ===\n{}\n",
        displayable(plan.as_ref()).indent()
    );
    let mut stream = plan.execute(0, task)?;
    while let Some(result) = stream.next().await {
        print_batches(&[result?.clone()])?;
    }
    Ok(())
}
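  • Run the test, e.g. with cargo test -p datafusion --test repartition_exec_blocks (the package name is assumed from the file location above; adjust if your checkout differs).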

Expected behavior

//! There are `N` virtual MPSC (multi-producer, single consumer) channels with unbounded capacity. However, if all
//! buffers/channels are non-empty, than a global gate will be closed preventing new data from being written (the
//! sender futures will be [pending](Poll::Pending)) until at least one channel is empty (and not closed).
  • Since the senders are not blocked, I would expect the waker on the receiver to wake up after the wake_receivers() call.

Additional context

cc @crepererum @alamb @tustvold

@metesynnada metesynnada added the bug Something isn't working label Feb 14, 2023
@crepererum
Contributor

crepererum commented Feb 14, 2023

Having a quick look at the plan: the repartition will partition into a single partition (because you only have a single unique key), which is likely not the first partition. The stream for the first partition will only advance when RepartitionExec either gets an element that is hashed into it (never, in your case) or when the input terminates. However, I don't understand why this is an issue here, because CoalescePartitionsExec polls all streams/partitions in parallel. So there's indeed some bug here.

Edit 1:
Could be some async waker issue. I'll check.
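To illustrate the hashing point above: with a single unique key value, every row maps to the same output partition, so the other RepartitionExec outputs never receive data until the input ends. A minimal sketch of the effect, using std's DefaultHasher purely for illustration (this is not DataFusion's actual hash-partitioning code):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative only: map a key to one of `num_partitions` outputs.
fn partition_for(key: u32, num_partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % num_partitions
}

fn main() {
    // Every row carries the value 1, so all of them land in the same partition.
    let targets: Vec<u64> = (0..20).map(|_| partition_for(1, 3)).collect();
    assert!(targets.windows(2).all(|w| w[0] == w[1]));
    println!("all rows go to partition {}", targets[0]);
}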

@metesynnada
Contributor Author

metesynnada commented Feb 14, 2023

I thought so. If you need assistance, I could provide it.

@alamb
Contributor

alamb commented Feb 15, 2023

    std::thread::sleep(std::time::Duration::from_millis(100));

Does it make any difference if this is tokio::time::sleep(..).await (rather than blocking the task)?

@mingmwang
Contributor

I will take a look.

@metesynnada
Contributor Author

metesynnada commented Feb 16, 2023

Does it make any difference if this is tokio::time::sleep(..).await (rather than blocking the task)?

It does. I did not use tokio::time::sleep(..).await since poll_next is a sync method. However, when I removed the thread sleep, it worked. What could be the reason for this behavior? It seems that only CoalescePartitionsExec is affected by the thread sleep.
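For reference, one way to keep a delay inside a synchronous poll_next without blocking the tokio worker thread is to store a pinned tokio::time::Sleep future and poll it instead of calling std::thread::sleep. A sketch under that assumption (not part of the reproducer; the type and method names are illustrative):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Sketch: hold a pinned tokio Sleep and poll it from the synchronous poll path.
struct DelayedSource {
    delay: Option<Pin<Box<tokio::time::Sleep>>>,
}

impl DelayedSource {
    fn poll_next_item(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        // Lazily arm a 100 ms delay before producing the next item.
        if self.delay.is_none() {
            self.delay = Some(Box::pin(tokio::time::sleep(
                std::time::Duration::from_millis(100),
            )));
        }
        // If the delay has not elapsed, tokio has registered the waker and will
        // re-poll us, so returning Pending does not block the worker thread.
        if self.delay.as_mut().unwrap().as_mut().poll(cx).is_pending() {
            return Poll::Pending;
        }
        self.delay = None; // arm a fresh delay for the next item
        Poll::Ready(Some(1))
    }
}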

@crepererum
Contributor

I think the issue is your unbounded stream: it never yields back to tokio and hence the tasks that receive the data will never be executed (even though they are scheduled), see https://tokio.rs/blog/2020-04-preemption and https://docs.rs/tokio/latest/tokio/task/fn.yield_now.html.

I'm pretty sure the repartition channels are correct. I've added a bunch of printlns and they wake the receivers, but the receiver is never executed by tokio.
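A sketch of what cooperating from such a stream could look like (an illustration of the yielding pattern, not the eventual fix): every few items the stream wakes its own waker and returns Pending, handing the worker thread back to the scheduler so the queued receiver task can run.

use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

// Sketch only: a stream that never returns Pending starves other tasks on the
// same tokio worker; yielding every few items lets them make progress.
struct CooperativeCounter {
    count: usize,
}

impl Stream for CooperativeCounter {
    type Item = usize;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.count += 1;
        if self.count % 8 == 0 {
            cx.waker().wake_by_ref(); // re-schedule ourselves immediately...
            return Poll::Pending; // ...but give the worker back to tokio
        }
        Poll::Ready(Some(self.count))
    }
}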

@crepererum
Contributor

This BTW will be an issue with ALL CPU-bound tasks in tokio/DataFusion. You MUST yield to tokio from time to time to keep the system stable.
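The same principle in a generic CPU-bound async task (names are illustrative; a sketch, not DataFusion code): insert an explicit yield point so the scheduler can run other tasks on that worker.

// Sketch: explicit cooperative yield inside a CPU-heavy async loop.
async fn cpu_bound_sum(items: Vec<u64>) -> u64 {
    let mut acc = 0u64;
    for (i, item) in items.iter().enumerate() {
        // Stand-in for real CPU-heavy work on each element.
        acc = acc.wrapping_add((0..1_000).fold(*item, |a, b| a.wrapping_mul(31) ^ b));
        if i % 1_024 == 0 {
            tokio::task::yield_now().await; // let other tasks run
        }
    }
    acc
}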

@alamb
Contributor

alamb commented Feb 16, 2023

What could be the reason for this behavior? It seems that only CoalescePartitionsExec is affected by the thread sleep.

~~Your test (implicitly) uses a single-threaded tokio executor (so there is only a single thread). With only a single thread, as @crepererum mentions, you need to yield control back to the scheduler (which is what .await does under the covers).~~

I think the need for yielding is true for any cooperative-scheduler system, but using await definitely may make that too magical.

I take it back -- I see your example has a multithreaded executor.

@crepererum
Contributor

I think we could make this issue less likely by inserting a yield point into the distributor channels. Let me draft a PR...
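For context, the general shape of such a yield point (identifiers are illustrative; this is a sketch of the idea, not the code in the eventual PR): while driving the input and fanning batches out to per-partition channels, yield to tokio after each batch so downstream receivers on the same worker can make progress.

use futures::StreamExt;

// Sketch of the idea only: a fan-out loop with an explicit yield point.
async fn pull_and_distribute(
    mut input: futures::stream::BoxStream<'static, u64>,
    senders: Vec<tokio::sync::mpsc::UnboundedSender<u64>>,
) {
    while let Some(value) = input.next().await {
        // Hypothetical partitioning: route by value modulo the partition count.
        let partition = (value as usize) % senders.len();
        let _ = senders[partition].send(value);
        // Explicit yield point so a hot input cannot monopolize the worker.
        tokio::task::yield_now().await;
    }
}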

crepererum added a commit to crepererum/arrow-datafusion that referenced this issue Feb 16, 2023
This prevents endless spinning and locked up tokio tasks if the inputs
never yield `pending`.

Fixes apache#5278.
alamb pushed a commit that referenced this issue Feb 17, 2023
* fix: add yield point to `RepartitionExec`

This prevents endless spinning and locked up tokio tasks if the inputs
never yield `pending`.

Fixes #5278.

* refactor: use a single `UnboundedExec` for testing

* refactor: rename test