Skip to content

Commit

Permalink
fix(watermark): fix merge watermark on scale (#8223)
Browse the repository at this point in the history
* fix(watermark): fix merge watermark on scale

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* fmt

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

---------

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao authored Feb 28, 2023
1 parent ad46917 commit 63890f6
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 26 deletions.
7 changes: 4 additions & 3 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,9 +779,10 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
.positions(|idx| *idx == watermark.col_idx);
let mut watermarks_to_emit = vec![];
for idx in wm_in_jk {
let buffers = self.watermark_buffers.entry(idx).or_insert_with(|| {
BufferedWatermarks::with_ids(vec![SideType::Left, SideType::Right])
});
let buffers = self
.watermark_buffers
.entry(idx)
.or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
let empty_indices = vec![];
let output_indices = side_update
Expand Down
32 changes: 18 additions & 14 deletions src/stream/src/executor/watermark/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,39 @@ use std::hash::Hash;

use super::Watermark;

#[derive(Default)]
#[derive(Default, Debug)]
pub(super) struct StagedWatermarks {
in_heap: bool,
staged: VecDeque<Watermark>,
}

pub(super) struct BufferedWatermarks<ID> {
pub(super) struct BufferedWatermarks<Id> {
/// We store the smallest watermark of each upstream, because the next watermark to emit is
/// among them.
pub first_buffered_watermarks: BinaryHeap<Reverse<(Watermark, ID)>>,
pub first_buffered_watermarks: BinaryHeap<Reverse<(Watermark, Id)>>,
/// We buffer other watermarks of each upstream. The next-to-smallest one will become the
/// smallest when the smallest is emitted and be moved into heap.
pub other_buffered_watermarks: BTreeMap<ID, StagedWatermarks>,
pub other_buffered_watermarks: BTreeMap<Id, StagedWatermarks>,
}

impl<ID: Ord + Hash> BufferedWatermarks<ID> {
pub fn with_ids(buffer_ids: Vec<ID>) -> Self {
impl<Id: Ord + Hash + std::fmt::Debug> BufferedWatermarks<Id> {
pub fn with_ids(buffer_ids: impl IntoIterator<Item = Id>) -> Self {
let other_buffered_watermarks: BTreeMap<_, _> = buffer_ids
.into_iter()
.map(|id| (id, Default::default()))
.collect();
let first_buffered_watermarks = BinaryHeap::with_capacity(other_buffered_watermarks.len());

BufferedWatermarks {
first_buffered_watermarks: BinaryHeap::with_capacity(buffer_ids.len()),
other_buffered_watermarks: BTreeMap::from_iter(
buffer_ids.into_iter().map(|id| (id, Default::default())),
),
first_buffered_watermarks,
other_buffered_watermarks,
}
}

pub fn add_buffers(&mut self, buffer_ids: Vec<ID>) {
pub fn add_buffers(&mut self, buffer_ids: impl IntoIterator<Item = Id>) {
buffer_ids.into_iter().for_each(|id| {
self.other_buffered_watermarks
.insert(id, Default::default())
.try_insert(id, Default::default())
.unwrap();
});
}
Expand All @@ -62,7 +66,7 @@ impl<ID: Ord + Hash> BufferedWatermarks<ID> {

/// Handle a new watermark message. Optionally returns the watermark message to emit and the
/// buffer id.
pub fn handle_watermark(&mut self, buffer_id: ID, watermark: Watermark) -> Option<Watermark> {
pub fn handle_watermark(&mut self, buffer_id: Id, watermark: Watermark) -> Option<Watermark> {
// Note: The staged watermark buffer should be created before handling the watermark.
let mut staged = self.other_buffered_watermarks.get_mut(&buffer_id).unwrap();

Expand Down Expand Up @@ -100,7 +104,7 @@ impl<ID: Ord + Hash> BufferedWatermarks<ID> {
}

/// Remove buffers and return watermark to emit.
pub fn remove_buffer(&mut self, buffer_ids_to_remove: HashSet<ID>) -> Option<Watermark> {
pub fn remove_buffer(&mut self, buffer_ids_to_remove: HashSet<Id>) -> Option<Watermark> {
self.first_buffered_watermarks
.retain(|Reverse((_, id))| !buffer_ids_to_remove.contains(id));
self.other_buffered_watermarks
Expand Down
1 change: 0 additions & 1 deletion src/tests/simulation/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ pub struct Args {
#[cfg(madsim)]
#[madsim::main]
async fn main() {
use std::env;
use std::sync::Arc;

use risingwave_simulation::client::RisingWave;
Expand Down
43 changes: 35 additions & 8 deletions src/tests/simulation/tests/it/nexmark_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,35 @@ use madsim::time::sleep;
use risingwave_simulation::cluster::Configuration;
use risingwave_simulation::ctl_ext::predicate::identity_contains;
use risingwave_simulation::nexmark::{NexmarkCluster, THROUGHPUT};
use risingwave_simulation::utils::AssertResult;

/// Check that everything works well after scaling of source-related executor.
#[madsim::test]
async fn nexmark_source() -> Result<()> {
let events = 20 * THROUGHPUT;
nexmark_source_inner(false).await
}

#[madsim::test]
async fn nexmark_source_with_watermark() -> Result<()> {
nexmark_source_inner(true).await
}

/// Check that everything works well after scaling of source-related executor.
async fn nexmark_source_inner(watermark: bool) -> Result<()> {
let expected_events = 20 * THROUGHPUT;
let expected_events_range = if watermark {
// If there's watermark, we'll possibly get fewer events.
(0.99 * expected_events as f64) as usize..=expected_events
} else {
// If there's no watermark, we'll get exactly the expected number of events.
expected_events..=expected_events
};

let mut cluster =
NexmarkCluster::new(Configuration::for_scale(), 6, Some(events), false).await?;
let mut cluster = NexmarkCluster::new(
Configuration::for_scale(),
6,
Some(expected_events),
watermark,
)
.await?;

// Materialize all sources so that we can also check whether the row id generator is working
// correctly after scaling.
Expand Down Expand Up @@ -63,16 +83,23 @@ async fn nexmark_source() -> Result<()> {
sleep(Duration::from_secs(30)).await;

// Check the total number of events.
cluster
let result = cluster
.run(
r#"
with count_p as (select count(*) count_p from materialized_person),
count_a as (select count(*) count_a from materialized_auction),
count_b as (select count(*) count_b from materialized_bid)
select count_p + count_a + count_b from count_p, count_a, count_b;"#,
)
.await?
.assert_result_eq(events.to_string());
.await?;

let actual_events: usize = result.trim().parse()?;
assert!(
expected_events_range.contains(&actual_events),
"expected event num in {:?}, got {}",
expected_events_range,
actual_events
);

Ok(())
}

0 comments on commit 63890f6

Please sign in to comment.