diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index bb6e1027c9a40..a30e25af95c5c 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -779,9 +779,10 @@ impl HashJoinExecutor, } -pub(super) struct BufferedWatermarks { +pub(super) struct BufferedWatermarks { /// We store the smallest watermark of each upstream, because the next watermark to emit is /// among them. - pub first_buffered_watermarks: BinaryHeap>, + pub first_buffered_watermarks: BinaryHeap>, /// We buffer other watermarks of each upstream. The next-to-smallest one will become the /// smallest when the smallest is emitted and be moved into heap. - pub other_buffered_watermarks: BTreeMap, + pub other_buffered_watermarks: BTreeMap, } -impl BufferedWatermarks { - pub fn with_ids(buffer_ids: Vec) -> Self { +impl BufferedWatermarks { + pub fn with_ids(buffer_ids: impl IntoIterator) -> Self { + let other_buffered_watermarks: BTreeMap<_, _> = buffer_ids + .into_iter() + .map(|id| (id, Default::default())) + .collect(); + let first_buffered_watermarks = BinaryHeap::with_capacity(other_buffered_watermarks.len()); + BufferedWatermarks { - first_buffered_watermarks: BinaryHeap::with_capacity(buffer_ids.len()), - other_buffered_watermarks: BTreeMap::from_iter( - buffer_ids.into_iter().map(|id| (id, Default::default())), - ), + first_buffered_watermarks, + other_buffered_watermarks, } } - pub fn add_buffers(&mut self, buffer_ids: Vec) { + pub fn add_buffers(&mut self, buffer_ids: impl IntoIterator) { buffer_ids.into_iter().for_each(|id| { self.other_buffered_watermarks - .insert(id, Default::default()) + .try_insert(id, Default::default()) .unwrap(); }); } @@ -62,7 +66,7 @@ impl BufferedWatermarks { /// Handle a new watermark message. Optionally returns the watermark message to emit and the /// buffer id. - pub fn handle_watermark(&mut self, buffer_id: ID, watermark: Watermark) -> Option { + pub fn handle_watermark(&mut self, buffer_id: Id, watermark: Watermark) -> Option { // Note: The staged watermark buffer should be created before handling the watermark. let mut staged = self.other_buffered_watermarks.get_mut(&buffer_id).unwrap(); @@ -100,7 +104,7 @@ impl BufferedWatermarks { } /// Remove buffers and return watermark to emit. - pub fn remove_buffer(&mut self, buffer_ids_to_remove: HashSet) -> Option { + pub fn remove_buffer(&mut self, buffer_ids_to_remove: HashSet) -> Option { self.first_buffered_watermarks .retain(|Reverse((_, id))| !buffer_ids_to_remove.contains(id)); self.other_buffered_watermarks diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs index 3e4cf1a435f92..b6629bacb45fd 100644 --- a/src/tests/simulation/src/main.rs +++ b/src/tests/simulation/src/main.rs @@ -137,7 +137,6 @@ pub struct Args { #[cfg(madsim)] #[madsim::main] async fn main() { - use std::env; use std::sync::Arc; use risingwave_simulation::client::RisingWave; diff --git a/src/tests/simulation/tests/it/nexmark_source.rs b/src/tests/simulation/tests/it/nexmark_source.rs index cb50853a00500..9d1c784a8d8f6 100644 --- a/src/tests/simulation/tests/it/nexmark_source.rs +++ b/src/tests/simulation/tests/it/nexmark_source.rs @@ -21,15 +21,35 @@ use madsim::time::sleep; use risingwave_simulation::cluster::Configuration; use risingwave_simulation::ctl_ext::predicate::identity_contains; use risingwave_simulation::nexmark::{NexmarkCluster, THROUGHPUT}; -use risingwave_simulation::utils::AssertResult; -/// Check that everything works well after scaling of source-related executor. #[madsim::test] async fn nexmark_source() -> Result<()> { - let events = 20 * THROUGHPUT; + nexmark_source_inner(false).await +} + +#[madsim::test] +async fn nexmark_source_with_watermark() -> Result<()> { + nexmark_source_inner(true).await +} + +/// Check that everything works well after scaling of source-related executor. +async fn nexmark_source_inner(watermark: bool) -> Result<()> { + let expected_events = 20 * THROUGHPUT; + let expected_events_range = if watermark { + // If there's watermark, we'll possibly get fewer events. + (0.99 * expected_events as f64) as usize..=expected_events + } else { + // If there's no watermark, we'll get exactly the expected number of events. + expected_events..=expected_events + }; - let mut cluster = - NexmarkCluster::new(Configuration::for_scale(), 6, Some(events), false).await?; + let mut cluster = NexmarkCluster::new( + Configuration::for_scale(), + 6, + Some(expected_events), + watermark, + ) + .await?; // Materialize all sources so that we can also check whether the row id generator is working // correctly after scaling. @@ -63,7 +83,7 @@ async fn nexmark_source() -> Result<()> { sleep(Duration::from_secs(30)).await; // Check the total number of events. - cluster + let result = cluster .run( r#" with count_p as (select count(*) count_p from materialized_person), @@ -71,8 +91,15 @@ with count_p as (select count(*) count_p from materialized_person), count_b as (select count(*) count_b from materialized_bid) select count_p + count_a + count_b from count_p, count_a, count_b;"#, ) - .await? - .assert_result_eq(events.to_string()); + .await?; + + let actual_events: usize = result.trim().parse()?; + assert!( + expected_events_range.contains(&actual_events), + "expected event num in {:?}, got {}", + expected_events_range, + actual_events + ); Ok(()) }