fix(watermark): fix merge watermark on scale #8223

Merged · 2 commits · Feb 28, 2023
7 changes: 4 additions & 3 deletions src/stream/src/executor/hash_join.rs
@@ -779,9 +779,10 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
             .positions(|idx| *idx == watermark.col_idx);
         let mut watermarks_to_emit = vec![];
         for idx in wm_in_jk {
-            let buffers = self.watermark_buffers.entry(idx).or_insert_with(|| {
-                BufferedWatermarks::with_ids(vec![SideType::Left, SideType::Right])
-            });
+            let buffers = self
+                .watermark_buffers
+                .entry(idx)
+                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
             if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                 let empty_indices = vec![];
                 let output_indices = side_update
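Side note on the new call style: `with_ids` now takes `impl IntoIterator<Item = Id>` (see the `watermark/mod.rs` diff below), so this call site can pass a fixed-size array instead of allocating a `Vec`. A minimal sketch with a hypothetical stand-in function, just to show that both call styles compile:

```rust
// Hypothetical stand-in for BufferedWatermarks::with_ids, illustration only.
fn with_ids<Id>(buffer_ids: impl IntoIterator<Item = Id>) -> Vec<Id> {
    buffer_ids.into_iter().collect()
}

fn main() {
    // New call style: a fixed-size array, no heap allocation for the argument.
    let from_array = with_ids([1, 2]);
    // Old call style still works: Vec also implements IntoIterator.
    let from_vec = with_ids(vec![1, 2]);
    assert_eq!(from_array, from_vec);
}
```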
32 changes: 18 additions & 14 deletions src/stream/src/executor/watermark/mod.rs
@@ -18,35 +18,39 @@ use std::hash::Hash;

 use super::Watermark;

-#[derive(Default)]
+#[derive(Default, Debug)]
 pub(super) struct StagedWatermarks {
     in_heap: bool,
     staged: VecDeque<Watermark>,
 }

-pub(super) struct BufferedWatermarks<ID> {
+pub(super) struct BufferedWatermarks<Id> {
     /// We store the smallest watermark of each upstream, because the next watermark to emit is
     /// among them.
-    pub first_buffered_watermarks: BinaryHeap<Reverse<(Watermark, ID)>>,
+    pub first_buffered_watermarks: BinaryHeap<Reverse<(Watermark, Id)>>,
     /// We buffer other watermarks of each upstream. The next-to-smallest one will become the
     /// smallest when the smallest is emitted and be moved into heap.
-    pub other_buffered_watermarks: BTreeMap<ID, StagedWatermarks>,
+    pub other_buffered_watermarks: BTreeMap<Id, StagedWatermarks>,
 }

-impl<ID: Ord + Hash> BufferedWatermarks<ID> {
-    pub fn with_ids(buffer_ids: Vec<ID>) -> Self {
+impl<Id: Ord + Hash + std::fmt::Debug> BufferedWatermarks<Id> {
+    pub fn with_ids(buffer_ids: impl IntoIterator<Item = Id>) -> Self {
+        let other_buffered_watermarks: BTreeMap<_, _> = buffer_ids
+            .into_iter()
+            .map(|id| (id, Default::default()))
+            .collect();
+        let first_buffered_watermarks = BinaryHeap::with_capacity(other_buffered_watermarks.len());
+
         BufferedWatermarks {
-            first_buffered_watermarks: BinaryHeap::with_capacity(buffer_ids.len()),
-            other_buffered_watermarks: BTreeMap::from_iter(
-                buffer_ids.into_iter().map(|id| (id, Default::default())),
-            ),
+            first_buffered_watermarks,
+            other_buffered_watermarks,
         }
     }

-    pub fn add_buffers(&mut self, buffer_ids: Vec<ID>) {
+    pub fn add_buffers(&mut self, buffer_ids: impl IntoIterator<Item = Id>) {
         buffer_ids.into_iter().for_each(|id| {
             self.other_buffered_watermarks
-                .insert(id, Default::default())
+                .try_insert(id, Default::default())
                 .unwrap();
         });
     }
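Worth spelling out the `insert` → `try_insert` change above: `BTreeMap::insert` returns the *previous* value, so the old `.insert(..).unwrap()` panicked exactly when the buffer id was new (insert returned `None`) and quietly reset state when the id already existed. `try_insert` (a nightly API behind the `map_try_insert` feature, which this codebase's nightly toolchain permits) inverts that: `Ok` for a fresh id, `Err` for a duplicate, so the `unwrap` now asserts uniqueness instead of failing on the normal path — presumably the panic seen when scaling registers new upstream buffers. A stable-Rust sketch of the new behavior, with hypothetical names:

```rust
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;

// Stable-Rust equivalent of `map.try_insert(id, Default::default()).unwrap()`:
// inserting a fresh key succeeds; a duplicate key panics loudly.
fn add_buffer(map: &mut BTreeMap<u32, Vec<i64>>, id: u32) {
    match map.entry(id) {
        Entry::Vacant(e) => {
            e.insert(Vec::new());
        }
        Entry::Occupied(_) => panic!("buffer {id} already registered"),
    }
}

fn main() {
    let mut map = BTreeMap::new();
    add_buffer(&mut map, 1); // ok: fresh key
    // add_buffer(&mut map, 1); // would panic: duplicate key
}
```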
@@ -62,7 +66,7 @@ impl<ID: Ord + Hash> BufferedWatermarks<ID> {

     /// Handle a new watermark message. Optionally returns the watermark message to emit and the
     /// buffer id.
-    pub fn handle_watermark(&mut self, buffer_id: ID, watermark: Watermark) -> Option<Watermark> {
+    pub fn handle_watermark(&mut self, buffer_id: Id, watermark: Watermark) -> Option<Watermark> {
         // Note: The staged watermark buffer should be created before handling the watermark.
         let mut staged = self.other_buffered_watermarks.get_mut(&buffer_id).unwrap();

@@ -100,7 +104,7 @@ impl<ID: Ord + Hash> BufferedWatermarks<ID> {
     }

     /// Remove buffers and return watermark to emit.
-    pub fn remove_buffer(&mut self, buffer_ids_to_remove: HashSet<ID>) -> Option<Watermark> {
+    pub fn remove_buffer(&mut self, buffer_ids_to_remove: HashSet<Id>) -> Option<Watermark> {
         self.first_buffered_watermarks
             .retain(|Reverse((_, id))| !buffer_ids_to_remove.contains(id));
         self.other_buffered_watermarks
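For readers new to this executor: `BufferedWatermarks` merges watermark streams from several upstreams by keeping each upstream's smallest pending watermark in a min-heap (`BinaryHeap<Reverse<..>>`) and staging later ones, so the next watermark to emit is always the heap top once every upstream has reported. A self-contained sketch of that idea using plain `i64` timestamps instead of RisingWave's `Watermark` type (names and the emit-on-every-call policy are simplifications, not the actual executor code):

```rust
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap, VecDeque};

#[derive(Default)]
struct Staged {
    in_heap: bool,         // is this upstream's smallest watermark in the heap?
    staged: VecDeque<i64>, // later watermarks from this upstream, in order
}

struct Merger<Id> {
    // Smallest pending watermark per upstream; Reverse turns the max-heap into a min-heap.
    first: BinaryHeap<Reverse<(i64, Id)>>,
    others: BTreeMap<Id, Staged>,
}

impl<Id: Ord> Merger<Id> {
    fn with_ids(ids: impl IntoIterator<Item = Id>) -> Self {
        let others: BTreeMap<_, _> = ids.into_iter().map(|id| (id, Staged::default())).collect();
        Merger {
            first: BinaryHeap::with_capacity(others.len()),
            others,
        }
    }

    /// Returns a merged watermark once every upstream has reported at least once.
    fn handle(&mut self, id: Id, wm: i64) -> Option<i64> {
        let buf = self.others.get_mut(&id).unwrap();
        if buf.in_heap {
            buf.staged.push_back(wm); // smallest already in heap: stage this one
        } else {
            buf.in_heap = true;
            self.first.push(Reverse((wm, id)));
        }
        if self.first.len() < self.others.len() {
            return None; // some upstream has no pending watermark yet
        }
        // Pop the global minimum and refill the heap from that upstream's staged queue.
        let Reverse((min_wm, min_id)) = self.first.pop().unwrap();
        let buf = self.others.get_mut(&min_id).unwrap();
        match buf.staged.pop_front() {
            Some(next) => self.first.push(Reverse((next, min_id))),
            None => buf.in_heap = false,
        }
        Some(min_wm)
    }
}

fn main() {
    let mut m = Merger::with_ids(["left", "right"]);
    assert_eq!(m.handle("left", 10), None);      // right side not seen yet
    assert_eq!(m.handle("right", 5), Some(5));   // min(10, 5)
    assert_eq!(m.handle("right", 8), Some(8));   // min(10, 8)
    assert_eq!(m.handle("left", 20), None);      // 20 staged; left's 10 still pending
    assert_eq!(m.handle("right", 30), Some(10)); // 10 pops; staged 20 enters the heap
}
```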
1 change: 0 additions & 1 deletion src/tests/simulation/src/main.rs
@@ -137,7 +137,6 @@ pub struct Args {
 #[cfg(madsim)]
 #[madsim::main]
 async fn main() {
-    use std::env;
     use std::sync::Arc;

     use risingwave_simulation::client::RisingWave;
43 changes: 35 additions & 8 deletions src/tests/simulation/tests/it/nexmark_source.rs
@@ -21,15 +21,35 @@ use madsim::time::sleep;
 use risingwave_simulation::cluster::Configuration;
 use risingwave_simulation::ctl_ext::predicate::identity_contains;
 use risingwave_simulation::nexmark::{NexmarkCluster, THROUGHPUT};
-use risingwave_simulation::utils::AssertResult;

-/// Check that everything works well after scaling of source-related executor.
 #[madsim::test]
 async fn nexmark_source() -> Result<()> {
-    let events = 20 * THROUGHPUT;
+    nexmark_source_inner(false).await
+}
+
+#[madsim::test]
+async fn nexmark_source_with_watermark() -> Result<()> {
+    nexmark_source_inner(true).await
+}
+
+/// Check that everything works well after scaling of source-related executor.
+async fn nexmark_source_inner(watermark: bool) -> Result<()> {
+    let expected_events = 20 * THROUGHPUT;
+    let expected_events_range = if watermark {
+        // If there's watermark, we'll possibly get fewer events.
+        (0.99 * expected_events as f64) as usize..=expected_events
+    } else {
+        // If there's no watermark, we'll get exactly the expected number of events.
+        expected_events..=expected_events
+    };

-    let mut cluster =
-        NexmarkCluster::new(Configuration::for_scale(), 6, Some(events), false).await?;
+    let mut cluster = NexmarkCluster::new(
+        Configuration::for_scale(),
+        6,
+        Some(expected_events),
+        watermark,
+    )
+    .await?;

     // Materialize all sources so that we can also check whether the row id generator is working
     // correctly after scaling.
@@ -63,16 +83,23 @@ async fn nexmark_source() -> Result<()> {
     sleep(Duration::from_secs(30)).await;

     // Check the total number of events.
-    cluster
+    let result = cluster
         .run(
             r#"
 with count_p as (select count(*) count_p from materialized_person),
 count_a as (select count(*) count_a from materialized_auction),
 count_b as (select count(*) count_b from materialized_bid)
 select count_p + count_a + count_b from count_p, count_a, count_b;"#,
         )
-        .await?
-        .assert_result_eq(events.to_string());
+        .await?;
+
+    let actual_events: usize = result.trim().parse()?;
+    assert!(
+        expected_events_range.contains(&actual_events),
+        "expected event num in {:?}, got {}",
+        expected_events_range,
+        actual_events
+    );

     Ok(())
 }
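The watermark variant of the test accepts a small shortfall, presumably because events behind an emitted watermark may be discarded. A quick check of the tolerance arithmetic, assuming `THROUGHPUT` is 10_000 events per second (the real constant lives in `risingwave_simulation::nexmark`; the value here is an assumption for illustration):

```rust
fn main() {
    const THROUGHPUT: usize = 10_000; // assumed value, see note above
    let expected_events = 20 * THROUGHPUT; // 200_000
    // Same range construction as nexmark_source_inner with watermark = true.
    let range = (0.99 * expected_events as f64) as usize..=expected_events;
    assert_eq!(*range.start(), 198_000);
    assert!(range.contains(&199_500));   // a ~0.25% shortfall passes
    assert!(!range.contains(&190_000));  // a 5% shortfall fails the test
}
```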