Skip to content

Commit

Permalink
feat(streaming): enable dml executor to pause and resume on scaling (#…
Browse files Browse the repository at this point in the history
…8110)

- Enable dml executor to pause and resume on scaling.
- A little refactor on `StreamReaderWithPause` (previously named `SourceReaderStream`):
- Make the left arm accept general message types instead of barriers only.
- Introduce non-biased `StreamReaderWithPause`.

Fixes #8056

Approved-By: st1page
Approved-By: waruto210
  • Loading branch information
xx01cyx authored Feb 23, 2023
1 parent ba92df4 commit dd7fc13
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 168 deletions.
50 changes: 29 additions & 21 deletions src/stream/src/executor/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::future::Either;
use futures::stream::select;
use either::Either;
use futures::StreamExt;
use futures_async_stream::try_stream;
use risingwave_common::catalog::{ColumnDesc, Schema, TableId, TableVersionId};
use risingwave_connector::source::StreamChunkWithState;
use risingwave_source::dml_manager::DmlManagerRef;

use super::error::StreamExecutorError;
use super::stream_reader::StreamReaderWithPause;
use super::{
expect_first_barrier, BoxedExecutor, BoxedMessageStream, Executor, Message, PkIndices,
PkIndicesRef,
expect_first_barrier, BoxedExecutor, BoxedMessageStream, Executor, Message, Mutation,
PkIndices, PkIndicesRef,
};

/// [`DmlExecutor`] accepts both stream data and batch data for data manipulation on a specific
Expand Down Expand Up @@ -92,31 +91,40 @@ impl DmlExecutor {
.dml_manager
.register_reader(self.table_id, self.table_version_id, &self.column_descs)
.map_err(StreamExecutorError::connector_error)?;
let batch_reader = batch_reader
.stream_reader()
.into_stream()
.map(Either::Right);
let batch_reader = batch_reader.stream_reader().into_stream();

yield Message::Barrier(barrier);
// Merge the two streams using `StreamReaderWithPause` because when we receive a pause
// barrier, we should stop receiving the data from DML. We poll data from the two streams in
// a round robin way.
let mut stream = StreamReaderWithPause::<false>::new(upstream, batch_reader);

// Stream data from the upstream executor.
let upstream = upstream.map(Either::Left);
// If the first barrier is configuration change, then the DML executor must be newly
// created, and we should start with the paused state.
if barrier.is_update() {
stream.pause_stream();
}

// Merge the two streams.
let stream = select(upstream, batch_reader);
yield Message::Barrier(barrier);

#[for_await]
for input_msg in stream {
match input_msg {
while let Some(input_msg) = stream.next().await {
match input_msg? {
Either::Left(msg) => {
// Stream data.
let msg: Message = msg?;
// Stream messages.
if let Message::Barrier(barrier) = &msg {
// We should handle barrier messages here to pause or resume the data from
// DML.
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
_ => {}
}
}
}
yield msg;
}
Either::Right(chunk) => {
// Batch data.
let chunk: StreamChunkWithState =
chunk.map_err(StreamExecutorError::connector_error)?;
yield Message::Chunk(chunk.chunk);
}
}
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ mod sink;
mod sort;
mod sort_buffer;
pub mod source;
mod stream_reader;
pub mod subtask;
mod top_n;
mod union;
Expand Down
97 changes: 55 additions & 42 deletions src/stream/src/executor/source/fs_source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use super::executor_core::StreamSourceCore;
use crate::error::StreamResult;
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::source::reader::SourceReaderStream;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::executor::*;
/// [`FsSourceExecutor`] is a streaming source, fir external file systems
/// such as s3.
Expand Down Expand Up @@ -110,10 +110,10 @@ impl<S: StateStore> FsSourceExecutor<S> {
Ok(steam_reader.into_stream())
}

async fn apply_split_change(
async fn apply_split_change<const BIASED: bool>(
&mut self,
source_desc: &FsSourceDesc,
stream: &mut SourceReaderStream,
stream: &mut StreamReaderWithPause<BIASED>,
mapping: &HashMap<ActorId, Vec<SplitImpl>>,
) -> StreamExecutorResult<()> {
if let Some(target_splits) = mapping.get(&self.ctx.id).cloned() {
Expand Down Expand Up @@ -176,10 +176,10 @@ impl<S: StateStore> FsSourceExecutor<S> {
Ok((!no_change_flag).then_some(target_state))
}

async fn replace_stream_reader_with_target_state(
async fn replace_stream_reader_with_target_state<const BIASED: bool>(
&mut self,
source_desc: &FsSourceDesc,
stream: &mut SourceReaderStream,
stream: &mut StreamReaderWithPause<BIASED>,
target_state: Vec<SplitImpl>,
) -> StreamExecutorResult<()> {
tracing::info!(
Expand All @@ -192,7 +192,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
let reader = self
.build_stream_source_reader(source_desc, Some(target_state.clone()))
.await?;
stream.replace_source_stream(reader);
stream.replace_data_stream(reader);

self.stream_source_core.stream_source_splits = target_state
.into_iter()
Expand Down Expand Up @@ -332,10 +332,12 @@ impl<S: StateStore> FsSourceExecutor<S> {
.stack_trace("fs_source_start_reader")
.await?;

// Merge the chunks from source and the barriers into a single stream.
let mut stream = SourceReaderStream::new(barrier_receiver, source_chunk_reader);
// Merge the chunks from source and the barriers into a single stream. We prioritize
// barriers over source data chunks here.
let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
let mut stream = StreamReaderWithPause::<true>::new(barrier_stream, source_chunk_reader);
if start_with_paused {
stream.pause_source();
stream.pause_stream();
}

yield Message::Barrier(barrier);
Expand All @@ -349,42 +351,53 @@ impl<S: StateStore> FsSourceExecutor<S> {
while let Some(msg) = stream.next().await {
match msg? {
// This branch will be preferred.
Either::Left(barrier) => {
last_barrier_time = Instant::now();
if self_paused {
stream.resume_source();
self_paused = false;
}
let epoch = barrier.epoch;

if let Some(ref mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::SourceChangeSplit(actor_splits) => {
self.apply_split_change(&source_desc, &mut stream, actor_splits)
.await?
}
Mutation::Pause => stream.pause_source(),
Mutation::Resume => stream.resume_source(),
Mutation::Update { actor_splits, .. } => {
self.apply_split_change(&source_desc, &mut stream, actor_splits)
Either::Left(msg) => match &msg {
Message::Barrier(barrier) => {
last_barrier_time = Instant::now();
if self_paused {
stream.resume_stream();
self_paused = false;
}
let epoch = barrier.epoch;

if let Some(ref mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::SourceChangeSplit(actor_splits) => {
self.apply_split_change(&source_desc, &mut stream, actor_splits)
.await?
}
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
Mutation::Update { actor_splits, .. } => {
self.apply_split_change(
&source_desc,
&mut stream,
actor_splits,
)
.await?;
}
_ => {}
}
_ => {}
}
self.take_snapshot_and_clear_cache(epoch).await?;

self.metrics
.source_row_per_barrier
.with_label_values(&[
self.ctx.id.to_string().as_str(),
self.stream_source_core.source_identify.as_ref(),
])
.inc_by(metric_row_per_barrier);
metric_row_per_barrier = 0;

yield msg;
}
self.take_snapshot_and_clear_cache(epoch).await?;

self.metrics
.source_row_per_barrier
.with_label_values(&[
self.ctx.id.to_string().as_str(),
self.stream_source_core.source_identify.as_ref(),
])
.inc_by(metric_row_per_barrier);
metric_row_per_barrier = 0;

yield Message::Barrier(barrier);
}
_ => {
// For the source executor, the message we receive from this arm should
// always be barrier message.
unreachable!();
}
},

Either::Right(StreamChunkWithState {
chunk,
Expand All @@ -395,7 +408,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
// we can guarantee the source is not paused since it received stream
// chunks.
self_paused = true;
stream.pause_source();
stream.pause_stream();
}
// update split offset
if let Some(mapping) = split_offset_mapping {
Expand Down
19 changes: 17 additions & 2 deletions src/stream/src/executor/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,28 @@
// limitations under the License.

pub mod executor_core;
use async_stack_trace::StackTrace;
pub use executor_core::StreamSourceCore;
mod fs_source_executor;
pub use fs_source_executor::*;
use risingwave_common::bail;
pub use state_table_handler::*;

pub mod source_executor;

mod reader;
pub mod state_table_handler;

pub use state_table_handler::*;
use futures_async_stream::try_stream;
use tokio::sync::mpsc::UnboundedReceiver;

use crate::executor::error::StreamExecutorError;
use crate::executor::{Barrier, Message};

/// Receive barriers from barrier manager with the channel, error on channel close.
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver<Barrier>) {
while let Some(barrier) = rx.recv().stack_trace("receive_barrier").await {
yield Message::Barrier(barrier);
}
bail!("barrier reader closed unexpectedly");
}
Loading

0 comments on commit dd7fc13

Please sign in to comment.