Skip to content

Commit

Permalink
make adaptive rate limit configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed May 30, 2024
1 parent e842dad commit 6150e3a
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ statement ok
insert into fact select 1 from generate_series(1, 250000);

statement ok
insert into dim select 1 from generate_series(1, 100);
insert into dim select 1 from generate_series(1, 1000);

statement ok
flush;
Expand Down
9 changes: 5 additions & 4 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ where
let upstream_table_id = self.upstream_table.table_id();
let mut upstream_table = self.upstream_table;
let vnodes = upstream_table.vnodes().clone();
let rate_limit = self.rate_limit;
self.chunk_size = 1;
let mut rate_limit = self.rate_limit;

// These builders will build data chunks.
// We must supply them with the full datatypes which correspond to
Expand Down Expand Up @@ -149,7 +148,9 @@ where
};
tracing::debug!(target: "adaptive_rate_limit", highest_barrier_latency, threshold_barrier_latency, "initial configs");
let adaptive_rate_limit = true;
let mut rate_limit = Some(INITIAL_ADAPTIVE_RATE_LIMIT);
if adaptive_rate_limit {
rate_limit = Some(INITIAL_ADAPTIVE_RATE_LIMIT);
}

// Poll the upstream to get the first barrier.
let first_barrier = expect_first_barrier(&mut upstream).await?;
Expand Down Expand Up @@ -557,7 +558,7 @@ where

// Adapt Rate Limit
if adaptive_rate_limit {
Self::adapt_rate_limit_3(
Self::adapt_rate_limit_2(
&self.actor_id,
&self.metrics,
threshold_barrier_latency,
Expand Down

0 comments on commit 6150e3a

Please sign in to comment.