Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor Resolver in dist sql query #1186

Merged
merged 7 commits into from
Sep 8, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor StreamState's states.
Rachelint committed Sep 8, 2023
commit 232504c4acd0e076c932779b1ce6511e7e5a1ee8
82 changes: 32 additions & 50 deletions df_engine_extensions/src/dist_sql_query/physical_plan.rs
Original file line number Diff line number Diff line change
@@ -265,27 +265,14 @@ impl Stream for PartitionedScanStream {
*stream_state = StreamState::Polling(stream);
}
Poll::Ready(Err(e)) => {
*stream_state = StreamState::Failed(e.to_string());
*stream_state = StreamState::InitializeFailed;
return Poll::Ready(Some(Err(e)));
}
Poll::Pending => return Poll::Pending,
}
}

StreamState::Polling(stream) => {
let poll_res = stream.poll_next_unpin(cx);
if let Poll::Ready(Some(Err(e))) = &poll_res {
*stream_state = StreamState::Failed(e.to_string());
}

return poll_res;
}

StreamState::Failed(err_msg) => {
return Poll::Ready(Some(Err(DataFusionError::Internal(format!(
"failed to poll record stream, err:{err_msg}"
)))));
}
},
StreamState::InitializeFailed => return Poll::Ready(None),
StreamState::Polling(stream) => return stream.poll_next_unpin(cx),
}
}
}
@@ -296,30 +283,23 @@ impl Stream for PartitionedScanStream {
/// stream first. The state transitions are as follows:
///
/// ```plaintext
///         ┌────┐
///         │INIT│
///         └──┬─┘
///   _________▽_________
///  ╱                   ╲         ┌───────┐
/// ╱ Success to init the ╲________│POLLING│
/// ╲ record batch stream ╱yes     └────┬──┘
///  ╲___________________╱      ________▽_________
///            │no             ╱                  ╲
///        ┌───▽──┐           ╱  Success to poll   ╲___
///        │FAILED│           ╲ all record batches ╱yes│
///        └──────┘            ╲__________________╱    │
///                                     │no            │
///                                 ┌───▽──┐           │
///                                 │FAILED│           │
///                                 └──────┘           │
///                                                  ┌─▽─┐
///                                                  │END│
///                                                  └───┘
///     ┌────────────┐
///     │Initializing│
///     └──────┬─────┘
///   _________▽_________     ┌──────────────────────────────┐
///  ╱                   ╲    │Polling(we just return the    │
/// ╱ Success to init the ╲___│inner stream's polling result)│
/// ╲ record batch stream ╱yes└──────────────────────────────┘
///  ╲___________________╱
///            │no
///   ┌────────▽───────┐
///   │InitializeFailed│
///   └────────────────┘
/// ```
pub enum StreamState {
Initializing,
InitializeFailed,
Polling(DfSendableRecordBatchStream),
Failed(String),
}

// TODO: make display for the plan more pretty.
@@ -468,7 +448,7 @@ mod test {

#[tokio::test]
async fn test_stream_init_failed() {
let builder = MockPartitionedScanStreamBuilder::new(PartitionedScanStreamCase::InitFailed);
let builder = MockPartitionedScanStreamBuilder::new(PartitionedScanStreamCase::InitializeFailed);
let stream = builder.build();
test_stream_failed_state(stream, "failed to init").await
}
@@ -481,19 +461,21 @@ mod test {
}

async fn test_stream_failed_state(mut stream: PartitionedScanStream, failed_msg: &str) {
// If an error happened, the stream keeps returning this error in later polls.
for _ in 0..2 {
let result_opt = stream.next().await;
assert!(result_opt.is_some());
let result = result_opt.unwrap();
assert!(result.is_err());
let err = result.unwrap_err();
match err {
DataFusionError::Internal(msg) => {
assert!(msg.contains(failed_msg))
}
other => panic!("unexpected error:{other}"),
// Error happened, check error message.
let result_opt = stream.next().await;
assert!(result_opt.is_some());
let result = result_opt.unwrap();
assert!(result.is_err());
let err = result.unwrap_err();
match err {
DataFusionError::Internal(msg) => {
assert!(msg.contains(failed_msg))
}
other => panic!("unexpected error:{other}"),
}

// Should return `None` in next poll.
let result_opt = stream.next().await;
assert!(result_opt.is_none());
}
}
14 changes: 11 additions & 3 deletions df_engine_extensions/src/dist_sql_query/test_util.rs
Original file line number Diff line number Diff line change
@@ -362,7 +362,7 @@ pub struct MockPartitionedScanStreamBuilder {

#[derive(Clone, Copy)]
pub enum PartitionedScanStreamCase {
InitFailed,
InitializeFailed,
PollFailed,
Success,
}
@@ -376,7 +376,7 @@ impl MockPartitionedScanStreamBuilder {
pub fn build(&self) -> PartitionedScanStream {
let stream_future: BoxFuture<'static, DfResult<SendableRecordBatchStream>> = match self.case
{
PartitionedScanStreamCase::InitFailed => {
PartitionedScanStreamCase::InitializeFailed => {
Box::pin(
async move { Err(DataFusionError::Internal("failed to init".to_string())) },
)
@@ -409,11 +409,14 @@ impl MockPartitionedScanStreamBuilder {
pub struct ErrorRecordBatchStream {
/// Schema wrapped by Arc
schema: SchemaRef,

/// Marks whether the stream has terminated.
done: bool,
}

impl ErrorRecordBatchStream {
pub fn new(schema: SchemaRef) -> Self {
Self { schema }
Self { schema, done: false }
}
}

@@ -427,6 +430,11 @@ impl Stream for ErrorRecordBatchStream {
type Item = DfResult<RecordBatch>;

fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}

self.get_mut().done = true;
Poll::Ready(Some(Err(DataFusionError::Internal(
"failed to poll".to_string(),
))))