Skip to content

Commit

Permalink
refactor: ExchangeWriter should report error throught status
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Mar 13, 2023
1 parent 28c539c commit c7246bb
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 13 deletions.
7 changes: 7 additions & 0 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use anyhow::anyhow;
use risingwave_common::array::ArrayError;
use risingwave_common::error::{ErrorCode, RwError};
use thiserror::Error;
use tonic::Status;

use crate::error::BatchError::Internal;

Expand Down Expand Up @@ -67,3 +68,9 @@ impl From<RwError> for BatchError {
Internal(anyhow!(format!("{}", s)))
}
}

impl<'a> From<&'a BatchError> for Status {
fn from(err: &'a BatchError) -> Self {
Status::internal(err.to_string())
}
}
16 changes: 8 additions & 8 deletions src/batch/src/rpc/service/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ use risingwave_common::error::{Result, ToRwResult};
use risingwave_pb::task_service::GetDataResponse;
use tonic::Status;

type ExchangeDataSender = tokio::sync::mpsc::Sender<std::result::Result<GetDataResponse, Status>>;
pub type GetDataResponseResult = std::result::Result<GetDataResponse, Status>;

type ExchangeDataSender = tokio::sync::mpsc::Sender<GetDataResponseResult>;

#[async_trait::async_trait]
pub trait ExchangeWriter: Send {
async fn write(&mut self, resp: GetDataResponse) -> Result<()>;
async fn write(&mut self, resp: GetDataResponseResult) -> Result<()>;
}

pub struct GrpcExchangeWriter {
Expand All @@ -41,12 +42,11 @@ impl GrpcExchangeWriter {
}
}

#[async_trait::async_trait]
impl ExchangeWriter for GrpcExchangeWriter {
async fn write(&mut self, data: GetDataResponse) -> Result<()> {
async fn write(&mut self, data: GetDataResponseResult) -> Result<()> {
self.written_chunks += 1;
self.sender
.send(Ok(data))
.send(data)
.await
.to_rw_result_with(|| "failed to write data to ExchangeWriter".into())
}
Expand All @@ -62,7 +62,7 @@ mod tests {
async fn test_exchange_writer() {
let (tx, _rx) = tokio::sync::mpsc::channel(10);
let mut writer = GrpcExchangeWriter::new(tx);
writer.write(GetDataResponse::default()).await.unwrap();
writer.write(Ok(GetDataResponse::default())).await.unwrap();
assert_eq!(writer.written_chunks(), 1);
}

Expand All @@ -71,7 +71,7 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(10);
drop(rx);
let mut writer = GrpcExchangeWriter::new(tx);
let res = writer.write(GetDataResponse::default()).await;
let res = writer.write(Ok(GetDataResponse::default())).await;
assert!(res.is_err());
}
}
10 changes: 5 additions & 5 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl TaskOutput {
/// Return whether the data stream is finished.
async fn take_data_inner(
&mut self,
writer: &mut dyn ExchangeWriter,
writer: &mut impl ExchangeWriter,
at_most_num: Option<usize>,
) -> Result<bool> {
let mut cnt: usize = 0;
Expand All @@ -212,15 +212,15 @@ impl TaskOutput {
let resp = GetDataResponse {
record_batch: Some(pb),
};
writer.write(resp).await?;
writer.write(Ok(resp)).await?;
}
// Reached EOF
Ok(None) => {
break;
}
// Error happened
Err(e) => {
return Err(to_rw_error(e));
writer.write(Err(Status::from(&*e))).await?;
}
}
cnt += 1;
Expand All @@ -232,14 +232,14 @@ impl TaskOutput {
/// Return whether the data stream is finished.
pub async fn take_data_with_num(
&mut self,
writer: &mut dyn ExchangeWriter,
writer: &mut impl ExchangeWriter,
num: usize,
) -> Result<bool> {
self.take_data_inner(writer, Some(num)).await
}

/// Take all data and write the data in serialized format to `ExchangeWriter`.
pub async fn take_data(&mut self, writer: &mut dyn ExchangeWriter) -> Result<()> {
pub async fn take_data(&mut self, writer: &mut impl ExchangeWriter) -> Result<()> {
let finish = self.take_data_inner(writer, None).await?;
assert!(finish);
Ok(())
Expand Down

0 comments on commit c7246bb

Please sign in to comment.