From 6150e3a34f1dc9b82c285f1bc2f52d05bb7405e9 Mon Sep 17 00:00:00 2001 From: noel Date: Thu, 30 May 2024 14:05:37 +0800 Subject: [PATCH] make adaptive rate limit configurable --- .../backfill/adaptive-rate-limit/amplification-100.slt | 2 +- src/stream/src/executor/backfill/arrangement_backfill.rs | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/e2e_test/backfill/adaptive-rate-limit/amplification-100.slt b/e2e_test/backfill/adaptive-rate-limit/amplification-100.slt index fa6cf81cef304..e4bd3cc0574a1 100644 --- a/e2e_test/backfill/adaptive-rate-limit/amplification-100.slt +++ b/e2e_test/backfill/adaptive-rate-limit/amplification-100.slt @@ -10,7 +10,7 @@ statement ok insert into fact select 1 from generate_series(1, 250000); statement ok -insert into dim select 1 from generate_series(1, 100); +insert into dim select 1 from generate_series(1, 1000); statement ok flush; diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index bdd703ed6c85c..a1f278a5ba356 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -111,8 +111,7 @@ where let upstream_table_id = self.upstream_table.table_id(); let mut upstream_table = self.upstream_table; let vnodes = upstream_table.vnodes().clone(); - let rate_limit = self.rate_limit; - self.chunk_size = 1; + let mut rate_limit = self.rate_limit; // These builders will build data chunks. // We must supply them with the full datatypes which correspond to @@ -149,7 +148,9 @@ where }; tracing::debug!(target: "adaptive_rate_limit", highest_barrier_latency, threshold_barrier_latency, "initial configs"); let adaptive_rate_limit = true; - let mut rate_limit = Some(INITIAL_ADAPTIVE_RATE_LIMIT); + if adaptive_rate_limit { + rate_limit = Some(INITIAL_ADAPTIVE_RATE_LIMIT); + } // Poll the upstream to get the first barrier. let first_barrier = expect_first_barrier(&mut upstream).await?; @@ -557,7 +558,7 @@ where // Adapt Rate Limit if adaptive_rate_limit { - Self::adapt_rate_limit_3( + Self::adapt_rate_limit_2( &self.actor_id, &self.metrics, threshold_barrier_latency,