Skip to content

Commit

Permalink
fix(file source): clear counter splits_on_fetch when updating rate …
Browse files Browse the repository at this point in the history
…limit (#20622)
  • Loading branch information
wcy-fdu authored and wcy-fdu committed Feb 27, 2025
1 parent cdd5719 commit bfc8a46
Showing 1 changed file with 2 additions and 4 deletions.
6 changes: 2 additions & 4 deletions src/stream/src/executor/source/fetch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,6 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
Either::Left(msg) => {
match &msg {
Message::Barrier(barrier) => {
let mut need_rebuild_reader = false;

if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => stream.pause_stream(),
Expand All @@ -264,7 +262,7 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
*new_rate_limit
);
self.rate_limit_rps = *new_rate_limit;
need_rebuild_reader = true;
splits_on_fetch = 0;
}
}
_ => (),
Expand All @@ -290,7 +288,7 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
}
}

if splits_on_fetch == 0 || need_rebuild_reader {
if splits_on_fetch == 0 {
Self::replace_with_new_batch_reader(
&mut splits_on_fetch,
&state_store_handler,
Expand Down

0 comments on commit bfc8a46

Please sign in to comment.