diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index c46771269597e..0cc6db2ed854d 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -427,7 +427,7 @@ async fn test_row_seq_scan() -> Result<()> { ColumnDesc::unnamed(ColumnId::from(2), schema[2].data_type.clone()), ]; - let mut state = StateTable::new_without_distribution( + let mut state = StateTable::<_>::new_without_distribution( memory_state_store.clone(), TableId::from(0x42), column_descs.clone(), diff --git a/src/stream/src/common/table/mod.rs b/src/stream/src/common/table/mod.rs index e8caa0b78d743..058c678c1f6b3 100644 --- a/src/stream/src/common/table/mod.rs +++ b/src/stream/src/common/table/mod.rs @@ -13,7 +13,7 @@ // limitations under the License. pub mod state_table; -mod watermark; +pub mod watermark; #[cfg(test)] pub mod test_state_table; diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 419ff8e94e363..4906a921e9173 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -51,18 +51,15 @@ use risingwave_storage::table::{compute_chunk_vnode, compute_vnode, Distribution use risingwave_storage::StateStore; use tracing::trace; -use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy}; +use super::watermark::{WatermarkBufferStrategy, WatermarkBufferStrategyByEpochDefault}; use crate::executor::{StreamExecutorError, StreamExecutorResult}; -/// This num is arbitrary and we may want to improve this choice in the future. -const STATE_CLEANING_PERIOD_EPOCH: usize = 5; - /// `StateTable` is the interface accessing relational data in KV(`StateStore`) with /// row-based encoding. #[derive(Clone)] pub struct StateTable< S: StateStore, - W: WatermarkBufferStrategy = WatermarkBufferByEpoch, + W: WatermarkBufferStrategy = WatermarkBufferStrategyByEpochDefault, > { /// Id for this table. 
table_id: TableId, @@ -508,7 +505,7 @@ impl StateTable { const ENABLE_SANITY_CHECK: bool = cfg!(debug_assertions); // point get -impl StateTable { +impl StateTable { /// Get a single row from state table. pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult> { let compacted_row: Option = self.get_compacted_row(pk).await?; @@ -600,7 +597,7 @@ impl StateTable { } // write -impl StateTable { +impl StateTable { #[expect(clippy::boxed_local)] fn handle_mem_table_error(&self, e: Box) { match *e { diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index 2fc3699f36237..5c13770e86a39 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -50,7 +50,7 @@ async fn test_state_table_update_insert() { test_env.register_table(table.clone()).await; let mut state_table = - StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) + StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) .await; let mut epoch = EpochPair::new_test_epoch(1); @@ -228,7 +228,7 @@ async fn test_state_table_iter_with_prefix() { test_env.register_table(table.clone()).await; let mut state_table = - StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) + StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) .await; let mut epoch = EpochPair::new_test_epoch(1); @@ -353,7 +353,7 @@ async fn test_state_table_iter_with_pk_range() { test_env.register_table(table.clone()).await; let mut state_table = - StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) + StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) .await; let mut epoch = EpochPair::new_test_epoch(1); @@ -487,7 +487,7 @@ async fn test_mem_table_assertion() { test_env.register_table(table.clone()).await; let mut 
state_table = - StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) + StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) .await; let epoch = EpochPair::new_test_epoch(1); @@ -530,7 +530,7 @@ async fn test_state_table_iter_with_value_indices() { test_env.register_table(table.clone()).await; let mut state_table = - StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) + StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) .await; let mut epoch = EpochPair::new_test_epoch(1); @@ -691,7 +691,7 @@ async fn test_state_table_iter_with_shuffle_value_indices() { test_env.register_table(table.clone()).await; let mut state_table = - StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) + StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) .await; let mut epoch = EpochPair::new_test_epoch(1); @@ -926,7 +926,7 @@ async fn test_state_table_write_chunk() { test_env.register_table(table.clone()).await; let mut state_table = - StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) + StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) .await; let epoch = EpochPair::new_test_epoch(1); @@ -1055,7 +1055,7 @@ async fn test_state_table_write_chunk_visibility() { test_env.register_table(table.clone()).await; let mut state_table = - StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) + StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) .await; let epoch = EpochPair::new_test_epoch(1); @@ -1182,7 +1182,7 @@ async fn test_state_table_write_chunk_value_indices() { test_env.register_table(table.clone()).await; let mut state_table = - StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) + 
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) .await; let epoch = EpochPair::new_test_epoch(1); diff --git a/src/stream/src/common/table/test_storage_table.rs b/src/stream/src/common/table/test_storage_table.rs index 903027fc82310..3225c1666faf5 100644 --- a/src/stream/src/common/table/test_storage_table.rs +++ b/src/stream/src/common/table/test_storage_table.rs @@ -64,7 +64,7 @@ async fn test_storage_table_value_indices() { test_env.register_table(table.clone()).await; let mut state = - StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) + StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) .await; let table = StorageTable::for_test( @@ -192,7 +192,7 @@ async fn test_shuffled_column_id_for_storage_table_get_row() { test_env.register_table(table.clone()).await; let mut state = - StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) + StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) .await; let mut epoch = EpochPair::new_test_epoch(1); @@ -295,7 +295,7 @@ async fn test_row_based_storage_table_point_get_in_batch_mode() { test_env.register_table(table.clone()).await; let mut state = - StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) + StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) .await; let column_ids_partial = vec![ColumnId::from(1), ColumnId::from(2)]; @@ -402,7 +402,7 @@ async fn test_batch_scan_with_value_indices() { test_env.register_table(table.clone()).await; let mut state = - StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) + StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None) .await; let column_ids_partial = vec![ColumnId::from(1), ColumnId::from(2)]; diff --git a/src/stream/src/common/table/watermark.rs 
b/src/stream/src/common/table/watermark.rs index e7314dc21e55c..26b028baea73a 100644 --- a/src/stream/src/common/table/watermark.rs +++ b/src/stream/src/common/table/watermark.rs @@ -13,7 +13,7 @@ // limitations under the License. /// Strategy to decide how to buffer the watermarks, used for state cleaning. -pub trait WatermarkBufferStrategy: Default { +pub trait WatermarkBufferStrategy: Default + Send + Sync + 'static { /// Trigger when a epoch is committed. fn tick(&mut self); @@ -58,3 +58,9 @@ impl WatermarkBufferStrategy for WatermarkBufferByEpoch; diff --git a/src/stream/src/executor/aggregation/agg_group.rs b/src/stream/src/executor/aggregation/agg_group.rs index 104cb2ef46a66..2ad1ca74ae647 100644 --- a/src/stream/src/executor/aggregation/agg_group.rs +++ b/src/stream/src/executor/aggregation/agg_group.rs @@ -27,6 +27,7 @@ use risingwave_storage::StateStore; use super::agg_state::{AggState, AggStateStorage}; use super::AggCall; use crate::common::table::state_table::StateTable; +use crate::common::table::watermark::WatermarkBufferStrategy; use crate::executor::error::StreamExecutorResult; use crate::executor::PkIndices; @@ -47,7 +48,7 @@ impl Debug for AggGroup { f.debug_struct("AggGroup") .field("group_key", &self.group_key) .field("prev_outputs", &self.prev_outputs) - .finish() + .finish_non_exhaustive() } } @@ -67,11 +68,11 @@ pub struct AggChangesInfo { impl AggGroup { /// Create [`AggGroup`] for the given [`AggCall`]s and `group_key`. /// For [`crate::executor::GlobalSimpleAggExecutor`], the `group_key` should be `None`. 
- pub async fn create( + pub async fn create( group_key: Option, agg_calls: &[AggCall], storages: &[AggStateStorage], - result_table: &StateTable, + result_table: &StateTable, pk_indices: &PkIndices, extreme_cache_size: usize, input_schema: &Schema, diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index 3008363c98ac8..a9bdfc3646788 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -325,7 +325,7 @@ mod tests { .collect_vec(); let mapping = StateTableColumnMapping::new(upstream_columns, None); let pk_len = order_types.len(); - let table = StateTable::new_without_distribution( + let table = StateTable::<_>::new_without_distribution( MemoryStateStore::new(), table_id, columns, diff --git a/src/stream/src/executor/aggregation/mod.rs b/src/stream/src/executor/aggregation/mod.rs index 11b30129e0ded..4bc6e37054623 100644 --- a/src/stream/src/executor/aggregation/mod.rs +++ b/src/stream/src/executor/aggregation/mod.rs @@ -39,9 +39,11 @@ mod state_cache; mod table; mod value; -/// Generate [`crate::executor::HashAggExecutor`]'s schema from `input`, `agg_calls` and -/// `group_key_indices`. For [`crate::executor::HashAggExecutor`], the group key indices should -/// be provided. +/// Generate [`HashAggExecutor`][HashAggExecutor]'s schema from `input`, `agg_calls` and +/// `group_key_indices`. For [`HashAggExecutor`][HashAggExecutor], the group key indices should be +/// provided. 
+/// +/// [HashAggExecutor]:crate::executor::HashAggExecutor pub fn generate_agg_schema( input: &dyn Executor, agg_calls: &[AggCall], diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index 93cfa74b80495..3ec5934d6ca63 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -492,7 +492,7 @@ mod tests { ) -> (StateTable, StateTable) { let column_descs = ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64); // TODO: enable sanity check for dynamic filter - let state_table_l = StateTable::new_without_distribution_no_sanity_check( + let state_table_l = StateTable::<_>::new_without_distribution_no_sanity_check( mem_state.clone(), TableId::new(0), vec![column_descs.clone()], @@ -500,7 +500,7 @@ mod tests { vec![0], ) .await; - let state_table_r = StateTable::new_without_distribution_no_sanity_check( + let state_table_r = StateTable::<_>::new_without_distribution_no_sanity_check( mem_state, TableId::new(1), vec![column_descs], diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index ebc9e126c6658..c9bef410a7c1e 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::marker::PhantomData; +use std::ptr::NonNull; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -25,7 +26,7 @@ use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::hash::{HashCode, HashKey, PrecomputedBuildHasher}; -use risingwave_common::row::RowExt; +use risingwave_common::row::{Row, RowExt}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::hash_util::Crc32FastBuilder; use risingwave_common::util::iter_util::ZipEqFast; @@ -37,6 +38,9 @@ use super::{ }; use crate::cache::{cache_may_stale, new_with_hasher, ExecutorCache}; use crate::common::table::state_table::StateTable; +use crate::common::table::watermark::{ + WatermarkBufferStrategy, WatermarkBufferStrategyByEpochDefault, WatermarkNoBuffer, +}; use crate::error::StreamResult; use crate::executor::aggregation::{generate_agg_schema, AggCall, AggChangesInfo, AggGroup}; use crate::executor::error::StreamExecutorError; @@ -47,6 +51,42 @@ use crate::task::AtomicU64Ref; type BoxedAggGroup = Box>; type AggGroupCache = ExecutorCache, PrecomputedBuildHasher>; +pub trait EmitPolicy: Send + Sync + 'static { + type WatermarkBufferStrategy: WatermarkBufferStrategy; + + fn should_emit(key: impl Row, buffered_watermarks: &[Option]) -> bool; +} + +pub struct EmitImmediate; + +impl EmitPolicy for EmitImmediate { + type WatermarkBufferStrategy = WatermarkBufferStrategyByEpochDefault; + + #[inline(always)] + fn should_emit(_key: impl Row, _buffered_watermarks: &[Option]) -> bool { + true + } +} + +pub struct EmitOnWatermarkClose; + +impl EmitPolicy for EmitOnWatermarkClose { + type WatermarkBufferStrategy = WatermarkNoBuffer; + + #[inline(always)] + fn should_emit(key: impl Row, buffered_watermarks: &[Option]) -> bool { + let Some(Some(cur_watermark)) = buffered_watermarks.get(0) else { + // There is no watermark 
value now, so we can't emit any value. + return false; + }; + let Some(ref watermark_val) = key.datum_at(0) else { + // NULL is unexpected in watermark column, however, if it exists, we'll treat it as the largest, so emit it here. + return true; + }; + watermark_val <= &cur_watermark.val.as_scalar_ref_impl() + } +} + /// [`HashAggExecutor`] could process large amounts of data using a state backend. It works as /// follows: /// @@ -57,15 +97,15 @@ type AggGroupCache = ExecutorCache, PrecomputedBuildHa /// * Upon a barrier is received, the executor will call `.flush` on the storage backend, so that /// all modifications will be flushed to the storage backend. Meanwhile, the executor will go /// through `modified_keys`, and produce a stream chunk based on the state changes. -pub struct HashAggExecutor { +pub struct HashAggExecutor { input: Box, - extra: HashAggExecutorExtra, + extra: HashAggExecutorExtra, - _phantom: PhantomData, + _phantom: PhantomData, } -struct HashAggExecutorExtra { +struct HashAggExecutorExtra { ctx: ActorContextRef, /// See [`Executor::schema`]. @@ -93,14 +133,14 @@ struct HashAggExecutorExtra { /// State table for the previous result of all agg calls. /// The outputs of all managed agg states are collected and stored in this /// table when `flush_data` is called. - result_table: StateTable, + result_table: StateTable, /// Indices of the columns /// all of the aggregation functions in this executor should depend on same group of keys group_key_indices: Vec, - /// Lru manager. None if using local eviction. - watermark_epoch: AtomicU64Ref, + /// The evict epoch for `GlobalMemoryManager`. None if using local eviction. + cache_evict_watermark_epoch: AtomicU64Ref, /// How many times have we hit the cache of join executor for the lookup of each key lookup_miss_count: AtomicU64, @@ -118,8 +158,9 @@ struct HashAggExecutorExtra { /// Extreme state cache size extreme_cache_size: usize, - /// Changed group keys in the current epoch (before next flush). 
- group_change_set: HashSet, + /// Held keys blocked by watermark, which should be emitted later. + /// The value is a boolean indicating whether the group is changed during the epoch. + held_keys: HashMap, /// The maximum size of the chunk produced by executor at a time. chunk_size: usize, @@ -131,7 +172,7 @@ struct HashAggExecutorExtra { buffered_watermarks: Vec>, } -impl Executor for HashAggExecutor { +impl Executor for HashAggExecutor { fn execute(self: Box) -> BoxedMessageStream { self.execute_inner().boxed() } @@ -149,14 +190,14 @@ impl Executor for HashAggExecutor { } } -impl HashAggExecutor { +impl HashAggExecutor { #[expect(clippy::too_many_arguments)] pub fn new( ctx: ActorContextRef, input: Box, agg_calls: Vec, storages: Vec>, - result_table: StateTable, + result_table: StateTable, pk_indices: PkIndices, extreme_cache_size: usize, executor_id: u64, @@ -187,8 +228,8 @@ impl HashAggExecutor { storages, result_table, group_key_indices, - watermark_epoch, - group_change_set: HashSet::new(), + cache_evict_watermark_epoch: watermark_epoch, + held_keys: HashMap::new(), lookup_miss_count: AtomicU64::new(0), total_lookup_count: AtomicU64::new(0), chunk_lookup_miss_count: 0, @@ -255,7 +296,7 @@ impl HashAggExecutor { } async fn apply_chunk( - HashAggExecutorExtra:: { + HashAggExecutorExtra:: { ref ctx, ref identity, ref group_key_indices, @@ -265,14 +306,14 @@ impl HashAggExecutor { ref input_schema, ref input_pk_indices, ref extreme_cache_size, - ref mut group_change_set, + ref mut held_keys, ref schema, lookup_miss_count, total_lookup_count, ref mut chunk_lookup_miss_count, ref mut chunk_total_lookup_count, .. 
- }: &mut HashAggExecutorExtra, + }: &mut HashAggExecutorExtra, agg_group_cache: &mut AggGroupCache, chunk: StreamChunk, ) -> StreamExecutorResult<()> { @@ -323,19 +364,20 @@ impl HashAggExecutor { *chunk_lookup_miss_count += 1; } *chunk_total_lookup_count += 1; - let mut buffered = stream::iter(futs).buffer_unordered(10).fuse(); - while let Some(result) = buffered.next().await { - let (key, agg_group) = result?; - agg_group_cache.put(key, agg_group); + { + let mut buffered = stream::iter(futs).buffer_unordered(10).fuse(); + while let Some(result) = buffered.next().await { + let (key, agg_group) = result?; + agg_group_cache.put(key, agg_group); + } } - drop(buffered); // drop to avoid accidental use // Decompose the input chunk. let capacity = chunk.capacity(); let (ops, columns, visibility) = chunk.into_inner(); // Calculate the row visibility for every agg call. - let visibilities: Vec<_> = agg_calls + let visibilities: Vec> = agg_calls .iter() .map(|agg_call| { agg_call_filter_res( @@ -370,8 +412,8 @@ impl HashAggExecutor { // Apply chunk to each of the state (per agg_call), for each group. for (key, _, vis_map) in &unique_keys { - // Mark the group as changed. - group_change_set.insert(key.clone()); + // Mark the group as held and dirty. + held_keys.insert(key.clone(), true); let agg_group = agg_group_cache.get_mut(key).unwrap().as_mut(); let visibilities = visibilities .iter() @@ -387,13 +429,13 @@ impl HashAggExecutor { #[try_stream(ok = StreamChunk, error = StreamExecutorError)] async fn flush_data<'a>( - &mut HashAggExecutorExtra:: { + &mut HashAggExecutorExtra:: { ref ctx, ref group_key_indices, ref schema, ref mut storages, ref mut result_table, - ref mut group_change_set, + ref mut held_keys, ref lookup_miss_count, ref total_lookup_count, ref mut chunk_lookup_miss_count, @@ -402,7 +444,7 @@ impl HashAggExecutor { ref chunk_size, ref buffered_watermarks, .. 
- }: &'a mut HashAggExecutorExtra, + }: &'a mut HashAggExecutorExtra, agg_group_cache: &'a mut AggGroupCache, epoch: EpochPair, ) { @@ -435,22 +477,27 @@ impl HashAggExecutor { .inc_by(*chunk_total_lookup_count); *chunk_total_lookup_count = 0; - let dirty_cnt = group_change_set.len(); + let dirty_cnt = held_keys.len(); if dirty_cnt > 0 { // Produce the stream chunk let group_key_data_types = &schema.data_types()[..group_key_indices.len()]; - let mut group_chunks = IterChunks::chunks(group_change_set.drain(), *chunk_size); - while let Some(batch) = group_chunks.next() { - let keys_in_batch = batch.into_iter().collect_vec(); + let mut to_flush_keys = Vec::with_capacity(held_keys.len()); + let mut to_emit_chunks = IterChunks::chunks( + held_keys + .drain_filter(|k, dirty| { + if *dirty { + to_flush_keys.push(k.clone()); + } + *dirty = false; - // Flush agg states. - for key in &keys_in_batch { - let agg_group = agg_group_cache - .get_mut(key) - .expect("changed group must have corresponding AggGroup") - .as_mut(); - agg_group.flush_state_if_needed(storages).await?; - } + let key = K::deserialize(k, group_key_data_types).unwrap(); + E::should_emit(key, buffered_watermarks) + }) + .map(|(k, _)| k), + *chunk_size, + ); + while let Some(batch) = to_emit_chunks.next() { + let keys_in_batch = batch.into_iter().collect_vec(); // Create array builders. // As the datatype is retrieved from schema, it contains both group key and @@ -458,34 +505,57 @@ impl HashAggExecutor { let mut builders = schema.create_array_builders(chunk_size * 2); let mut new_ops = Vec::with_capacity(chunk_size * 2); + let agg_groups = { + let mut agg_group_cache_ptr: NonNull<_> = agg_group_cache.into(); + keys_in_batch + .iter() + .map(|key| { + // SAFETY: `keys_in_batch` is a subset of `held_keys`, which is a + // `HashMap`, so we can ensure that every `&mut agg_group_cache` is + // unique. 
+ unsafe { + let agg_group_cache = agg_group_cache_ptr.as_mut(); + agg_group_cache + .get_mut(key) + .expect("changed group must have corresponding AggGroup") + } + }) + .collect_vec() + }; + // Calculate current outputs, concurrently. - let futs = keys_in_batch.into_iter().map(|key| { - // Pop out the agg group temporarily. - let mut agg_group = agg_group_cache - .pop(&key) - .expect("changed group must have corresponding AggGroup"); - async { - let curr_outputs = agg_group.get_outputs(storages).await?; - Ok::<_, StreamExecutorError>((key, agg_group, curr_outputs)) - } - }); + // FIXME: In fact, we don't need to collect `futs` as `Vec`, but rustc will report + // a weird error `error: higher-ranked lifetime error`. + #[expect(clippy::disallowed_methods)] + let futs: Vec<_> = keys_in_batch + .iter() + .zip(agg_groups.into_iter()) + .map(|(key, agg_group)| { + // Pop out the agg group temporarily. + let storages = &storages; + async move { + let curr_outputs = agg_group.get_outputs(storages).await?; + Ok::<_, StreamExecutorError>((key, agg_group, curr_outputs)) + } + }) + .collect(); let outputs_in_batch: Vec<_> = stream::iter(futs) .buffer_unordered(10) .fuse() .try_collect() .await?; - for (key, mut agg_group, curr_outputs) in outputs_in_batch { - let AggChangesInfo { - n_appended_ops, - result_row, - prev_outputs, - } = agg_group.build_changes( + for (key, agg_group, curr_outputs) in outputs_in_batch { + let agg_change_info = agg_group.build_changes( curr_outputs, &mut builders[group_key_indices.len()..], &mut new_ops, ); - + let AggChangesInfo { + n_appended_ops, + prev_outputs, + result_row, + } = agg_change_info; if n_appended_ops != 0 { for _ in 0..n_appended_ops { key.deserialize_to_builders( @@ -494,21 +564,20 @@ impl HashAggExecutor { )?; } if let Some(prev_outputs) = prev_outputs { - let old_row = agg_group.group_key().chain(prev_outputs); + // FIXME: double deserialization here + let group_key = K::deserialize(key, group_key_data_types).unwrap(); + let 
old_row = group_key.chain(prev_outputs); result_table.update(old_row, result_row); } else { result_table.insert(result_row); } } - - // Put the agg group back into the agg group cache. - agg_group_cache.put(key, agg_group); } let columns = builders .into_iter() - .map(|builder| Ok::<_, StreamExecutorError>(builder.finish().into())) - .try_collect()?; + .map(|builder| builder.finish().into()) + .collect(); let chunk = StreamChunk::new(new_ops, columns, None); @@ -516,6 +585,17 @@ impl HashAggExecutor { yield chunk; } + drop(to_emit_chunks); + + for key in to_flush_keys { + // Flush agg states. + let agg_group = agg_group_cache + .get_mut(&key) + .expect("changed group must have corresponding AggGroup") + .as_mut(); + agg_group.flush_state_if_needed(storages).await?; + } + // Commit all state tables. futures::future::try_join_all(iter_table_storage(storages).map(|state_table| async { if let Some(watermark) = state_clean_watermark.as_ref() { @@ -556,7 +636,7 @@ impl HashAggExecutor { // The cached state managers. `HashKey` -> `AggGroup`. let mut agg_group_cache = AggGroupCache::new(new_with_hasher( - extra.watermark_epoch.clone(), + extra.cache_evict_watermark_epoch.clone(), PrecomputedBuildHasher, )); @@ -643,6 +723,7 @@ mod tests { use risingwave_storage::memory::MemoryStateStore; use risingwave_storage::StateStore; + use super::{EmitImmediate, EmitOnWatermarkClose}; use crate::executor::aggregation::{AggArgs, AggCall}; use crate::executor::monitor::StreamingMetrics; use crate::executor::test_utils::agg_executor::{create_agg_state_table, create_result_table}; @@ -650,6 +731,7 @@ mod tests { use crate::executor::{ActorContext, Executor, HashAggExecutor, Message, PkIndices}; #[allow(clippy::too_many_arguments)] + #[cfg(test)] async fn new_boxed_hash_agg_executor( store: S, input: Box, @@ -658,7 +740,11 @@ mod tests { pk_indices: PkIndices, extreme_cache_size: usize, executor_id: u64, + // TODO: should we use an enum here? 
+ emit_immediate: bool, ) -> Box { + use super::EmitPolicy; + let mut agg_state_tables = Vec::with_capacity(agg_calls.iter().len()); for (idx, agg_call) in agg_calls.iter().enumerate() { agg_state_tables.push( @@ -674,31 +760,65 @@ mod tests { ) } - let result_table = create_result_table( - store, - TableId::new(agg_calls.len() as u32), - &agg_calls, - &group_key_indices, - input.as_ref(), - ) - .await; + if emit_immediate { + type E = EmitImmediate; - HashAggExecutor::::new( - ActorContext::create(123), - input, - agg_calls, - agg_state_tables, - result_table, - pk_indices, - extreme_cache_size, - executor_id, - group_key_indices, - Arc::new(AtomicU64::new(0)), - Arc::new(StreamingMetrics::unused()), - 1024, - ) - .unwrap() - .boxed() + let result_table = + create_result_table::::WatermarkBufferStrategy>( + store, + TableId::new(agg_calls.len() as u32), + &agg_calls, + &group_key_indices, + input.as_ref(), + ) + .await; + + HashAggExecutor::::new( + ActorContext::create(123), + input, + agg_calls, + agg_state_tables, + result_table, + pk_indices, + extreme_cache_size, + executor_id, + group_key_indices, + Arc::new(AtomicU64::new(0)), + Arc::new(StreamingMetrics::unused()), + 1024, + ) + .unwrap() + .boxed() + } else { + type E = EmitOnWatermarkClose; + + let result_table = + create_result_table::::WatermarkBufferStrategy>( + store, + TableId::new(agg_calls.len() as u32), + &agg_calls, + &group_key_indices, + input.as_ref(), + ) + .await; + + HashAggExecutor::::new( + ActorContext::create(123), + input, + agg_calls, + agg_state_tables, + result_table, + pk_indices, + extreme_cache_size, + executor_id, + group_key_indices, + Arc::new(AtomicU64::new(0)), + Arc::new(StreamingMetrics::unused()), + 1024, + ) + .unwrap() + .boxed() + } } // --- Test HashAgg with in-memory KeyedState --- @@ -782,6 +902,7 @@ mod tests { vec![], 1 << 10, 1, + true, ) .await; let mut hash_agg = hash_agg.execute(); @@ -884,6 +1005,7 @@ mod tests { vec![], 1 << 10, 1, + true, ) .await; let mut 
hash_agg = hash_agg.execute(); @@ -978,6 +1100,7 @@ mod tests { vec![2], 1 << 10, 1, + true, ) .await; let mut hash_agg = hash_agg.execute(); @@ -1077,6 +1200,7 @@ mod tests { vec![2], 1 << 10, 1, + true, ) .await; let mut hash_agg = hash_agg.execute(); @@ -1115,6 +1239,136 @@ mod tests { ); } + #[tokio::test] + async fn test_window_agg_in_memory() { + test_window_agg(MemoryStateStore::new()).await; + } + + async fn test_window_agg(store: S) { + let schema = Schema { + fields: vec![ + // group key column + Field::unnamed(DataType::Int64), + // data column to get minimum + Field::unnamed(DataType::Int64), + // primary key column + Field::unnamed(DataType::Int64), + ], + }; + let (mut tx, source) = MockSource::channel(schema, vec![2]); // pk + + tx.push_barrier(1, false); + tx.push_chunk(StreamChunk::from_pretty( + " I I I + + 1 2 1001 + + 1 4 1002 + + 2 6 1003", + )); + tx.push_watermark(0, DataType::Int64, 1i64.into()); + tx.push_barrier(2, false); + tx.push_chunk(StreamChunk::from_pretty( + " I I I + + 4 16 1004 + + 3 14 1005 + + 2 12 1006", + )); + tx.push_watermark(0, DataType::Int64, 2i64.into()); + tx.push_chunk(StreamChunk::from_pretty( + " I I I + + 3 13 1007 + + 5 11 1008 + + 3 15 1009", + )); + tx.push_barrier(3, false); + tx.push_watermark(0, DataType::Int64, 4i64.into()); + tx.push_barrier(4, false); + + // This is local hash aggregation, so we add another row count state + let keys = vec![0]; + let agg_calls = vec![ + AggCall { + kind: AggKind::Count, + args: AggArgs::None, + return_type: DataType::Int64, + order_pairs: vec![], + append_only: false, + filter: None, + }, + AggCall { + kind: AggKind::Sum, + args: AggArgs::Unary(DataType::Int64, 1), + return_type: DataType::Int64, + order_pairs: vec![], + append_only: false, + filter: None, + }, + ]; + + let return_tys = vec![DataType::Int64, DataType::Int64, DataType::Int64]; + let row_pretty = |s: &str| OwnedRow::from_pretty_with_tys(&return_tys, s); + + let hash_agg = new_boxed_hash_agg_executor( + 
store, + Box::new(source), + agg_calls, + keys, + vec![2], + 1 << 10, + 1, + false, + ) + .await; + + let mut hash_agg = hash_agg.execute(); + + // Consume the init barrier + hash_agg.next().await.unwrap().unwrap(); + // Consume stream chunk + let msg = hash_agg.next().await.unwrap().unwrap(); + let Message::Chunk(chunk) = msg else { unreachable!() }; + let [(op, row)]: [_; 1] = chunk.rows().collect_vec().try_into().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!(row.to_owned_row(), row_pretty("1 2 6")); + + let msg = hash_agg.next().await.unwrap().unwrap(); + let Message::Watermark(watermark) = msg else { unreachable!() }; + assert_eq!(watermark.col_idx, 0); + assert_eq!(watermark.val, 1i64.into()); + + // Consume a barrier + hash_agg.next().await.unwrap().unwrap(); + + let msg = hash_agg.next().await.unwrap().unwrap(); + let Message::Chunk(chunk) = msg else { unreachable!() }; + let [(op, row)]: [_; 1] = chunk.rows().collect_vec().try_into().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!(row.to_owned_row(), row_pretty("2 2 18")); + + let msg = hash_agg.next().await.unwrap().unwrap(); + let Message::Watermark(watermark) = msg else { unreachable!() }; + assert_eq!(watermark.col_idx, 0); + assert_eq!(watermark.val, 2i64.into()); + + // Consume a barrier + hash_agg.next().await.unwrap().unwrap(); + + let msg = hash_agg.next().await.unwrap().unwrap(); + let Message::Chunk(chunk) = msg else { unreachable!() }; + let [(op1, row1), (op2, row2)]: [_; 2] = chunk.sorted_rows().try_into().unwrap(); + assert_eq!(op1, Op::Insert); + assert_eq!(row1, row_pretty("3 3 42")); + assert_eq!(op2, Op::Insert); + assert_eq!(row2, row_pretty("4 1 16")); + + let msg = hash_agg.next().await.unwrap().unwrap(); + let Message::Watermark(watermark) = msg else { unreachable!() }; + assert_eq!(watermark.col_idx, 0); + assert_eq!(watermark.val, 4i64.into()); + + // Consume a barrier + hash_agg.next().await.unwrap().unwrap(); + } + trait SortedRows { fn sorted_rows(self) -> Vec<(Op, 
OwnedRow)>; } diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 57e52589517d2..674d723b76cc1 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -1032,7 +1032,7 @@ mod tests { .enumerate() .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone())) .collect_vec(); - let state_table = StateTable::new_without_distribution( + let state_table = StateTable::<_>::new_without_distribution( mem_state.clone(), TableId::new(table_id), column_descs, @@ -1053,7 +1053,7 @@ mod tests { ColumnId::new(pk_indices.len() as i32), DataType::Int64, )); - let degree_state_table = StateTable::new_without_distribution( + let degree_state_table = StateTable::<_>::new_without_distribution( mem_state, TableId::new(table_id + 1), degree_table_column_descs, diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index bd3ee2d44c4b5..283f9e150271f 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -108,7 +108,7 @@ pub use error::{StreamExecutorError, StreamExecutorResult}; pub use expand::ExpandExecutor; pub use filter::FilterExecutor; pub use global_simple_agg::GlobalSimpleAggExecutor; -pub use hash_agg::HashAggExecutor; +pub use hash_agg::{EmitImmediate, EmitOnWatermarkClose, EmitPolicy, HashAggExecutor}; pub use hash_join::*; pub use hop_window::HopWindowExecutor; pub use local_simple_agg::LocalSimpleAggExecutor; diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index 1b8446562fd3f..87b14ae35e13a 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -79,7 +79,7 @@ impl MaterializeExecutor { let schema = input.schema().clone(); - let state_table = StateTable::from_table_catalog(table_catalog, store, vnodes).await; + let state_table = StateTable::<_>::from_table_catalog(table_catalog, store, vnodes).await; Self { 
input, @@ -117,7 +117,7 @@ impl MaterializeExecutor { .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type())) .collect_vec(); - let state_table = StateTable::new_without_distribution( + let state_table = StateTable::<_>::new_without_distribution( store, table_id, columns, diff --git a/src/stream/src/executor/mview/test_utils.rs b/src/stream/src/executor/mview/test_utils.rs index df060d1e43a92..566848109cbbc 100644 --- a/src/stream/src/executor/mview/test_utils.rs +++ b/src/stream/src/executor/mview/test_utils.rs @@ -33,7 +33,7 @@ pub async fn gen_basic_table(row_count: usize) -> StorageTable ColumnDesc::unnamed(column_ids[2], DataType::Int32), ]; let pk_indices = vec![0_usize, 1_usize]; - let mut state = StateTable::new_without_distribution( + let mut state = StateTable::<_>::new_without_distribution( state_store.clone(), TableId::from(0x42), column_descs.clone(), diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 3476c8433676c..6beb62fe0481f 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -272,7 +272,7 @@ mod tests { let column_descs = vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamptz)]; let order_types = create_order_types(); let pk_indices = create_pk_indices(); - StateTable::new_without_distribution( + StateTable::<_>::new_without_distribution( memory_state_store, table_id, column_descs, diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs index 753945f2a6d0f..2c8dbd5563570 100644 --- a/src/stream/src/executor/sort.rs +++ b/src/stream/src/executor/sort.rs @@ -504,7 +504,7 @@ mod tests { ]; let order_types = vec![OrderType::Ascending]; let pk_indices = create_pk_indices(); - StateTable::new_without_distribution( + StateTable::<_>::new_without_distribution( memory_state_store, table_id, column_descs, diff --git a/src/stream/src/executor/sort_buffer.rs b/src/stream/src/executor/sort_buffer.rs index e7d96b8ffdb7f..b895ba339b0f2 100644 
--- a/src/stream/src/executor/sort_buffer.rs +++ b/src/stream/src/executor/sort_buffer.rs @@ -261,7 +261,7 @@ mod tests { let row_pretty = |s: &str| OwnedRow::from_pretty_with_tys(&tys, s); let order_types = vec![OrderType::Ascending]; - let mut state_table = StateTable::new_without_distribution( + let mut state_table = StateTable::<_>::new_without_distribution( state_store.clone(), table_id, column_descs.clone(), @@ -326,7 +326,7 @@ mod tests { // Failover and recover drop(sort_buffer); - let mut state_table = StateTable::new_without_distribution( + let mut state_table = StateTable::<_>::new_without_distribution( state_store.clone(), table_id, column_descs, diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs index ba2e201aec155..4c8be830c652a 100644 --- a/src/stream/src/executor/source/state_table_handler.rs +++ b/src/stream/src/executor/source/state_table_handler.rs @@ -51,7 +51,7 @@ impl SourceStateTableHandler { .contains_key(&String::from(PROPERTIES_RETENTION_SECOND_KEY))); Self { - state_store: StateTable::from_table_catalog(table_catalog, store, None).await, + state_store: StateTable::<_>::from_table_catalog(table_catalog, store, None).await, } } @@ -241,9 +241,12 @@ pub(crate) mod tests { #[tokio::test] async fn test_from_table_catalog() { let store = MemoryStateStore::new(); - let mut state_table = - StateTable::from_table_catalog(&default_source_internal_table(0x2333), store, None) - .await; + let mut state_table = StateTable::<_>::from_table_catalog( + &default_source_internal_table(0x2333), + store, + None, + ) + .await; let a: Arc = String::from("a").into(); let a: Datum = Some(ScalarImpl::Utf8(a.as_ref().into())); let b: Arc = String::from("b").into(); diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index 8156568e2eee3..ec6351a224aea 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -214,6 
+214,7 @@ pub mod agg_executor { use risingwave_storage::StateStore; use crate::common::table::state_table::StateTable; + use crate::common::table::watermark::WatermarkBufferStrategy; use crate::common::StateTableColumnMapping; use crate::executor::aggregation::{AggCall, AggStateStorage}; use crate::executor::{ @@ -263,7 +264,7 @@ pub mod agg_executor { add_column(*idx, input_fields[*idx].data_type(), OrderType::Ascending); } - let state_table = StateTable::new_without_distribution( + let state_table = StateTable::<_>::new_without_distribution( store, table_id, column_descs, @@ -289,13 +290,13 @@ pub mod agg_executor { } /// Create result state table for agg executor. - pub async fn create_result_table( + pub async fn create_result_table( store: S, table_id: TableId, agg_calls: &[AggCall], group_key_indices: &[usize], input_ref: &dyn Executor, - ) -> StateTable { + ) -> StateTable { let input_fields = input_ref.schema().fields(); let mut column_descs = Vec::new(); @@ -319,7 +320,7 @@ pub mod agg_executor { add_column_desc(agg_call.return_type.clone()); }); - StateTable::new_without_distribution( + StateTable::<_, W>::new_without_distribution( store, table_id, column_descs, @@ -411,7 +412,7 @@ pub mod top_n_executor { .enumerate() .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone())) .collect_vec(); - StateTable::new_without_distribution( + StateTable::<_>::new_without_distribution( state_store, TableId::new(0), column_descs, diff --git a/src/stream/src/from_proto/agg_common.rs b/src/stream/src/from_proto/agg_common.rs index dcfc9d370e260..8392dd617e56f 100644 --- a/src/stream/src/from_proto/agg_common.rs +++ b/src/stream/src/from_proto/agg_common.rs @@ -93,7 +93,7 @@ pub async fn build_agg_state_storages_from_proto( let agg_state_store = match agg_call_state.get_inner().unwrap() { agg_call_state::Inner::ResultValueState(..) 
=> AggStateStorage::ResultValue, agg_call_state::Inner::TableState(state) => { - let table = StateTable::from_table_catalog( + let table = StateTable::<_>::from_table_catalog( state.get_table().unwrap(), store.clone(), vnodes.clone(), @@ -102,7 +102,7 @@ pub async fn build_agg_state_storages_from_proto( AggStateStorage::Table { table } } agg_call_state::Inner::MaterializedInputState(state) => { - let table = StateTable::from_table_catalog( + let table = StateTable::<_>::from_table_catalog( state.get_table().unwrap(), store.clone(), vnodes.clone(), diff --git a/src/stream/src/from_proto/dynamic_filter.rs b/src/stream/src/from_proto/dynamic_filter.rs index b108aa0c4f79c..8bdaca3d403c4 100644 --- a/src/stream/src/from_proto/dynamic_filter.rs +++ b/src/stream/src/from_proto/dynamic_filter.rs @@ -57,16 +57,19 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { } // TODO: enable sanity check for dynamic filter - let state_table_l = StateTable::from_table_catalog_no_sanity_check( + let state_table_l = StateTable::<_>::from_table_catalog_no_sanity_check( node.get_left_table()?, store.clone(), Some(vnodes), ) .await; - let state_table_r = - StateTable::from_table_catalog_no_sanity_check(node.get_right_table()?, store, None) - .await; + let state_table_r = StateTable::<_>::from_table_catalog_no_sanity_check( + node.get_right_table()?, + store, + None, + ) + .await; Ok(Box::new(DynamicFilterExecutor::new( params.actor_context, diff --git a/src/stream/src/from_proto/global_simple_agg.rs b/src/stream/src/from_proto/global_simple_agg.rs index 83af701e91b19..ef98feb5ed0a2 100644 --- a/src/stream/src/from_proto/global_simple_agg.rs +++ b/src/stream/src/from_proto/global_simple_agg.rs @@ -44,7 +44,8 @@ impl ExecutorBuilder for GlobalSimpleAggExecutorBuilder { build_agg_state_storages_from_proto(node.get_agg_call_states(), store.clone(), None) .await; let result_table = - StateTable::from_table_catalog(node.get_result_table().unwrap(), store, None).await; + 
StateTable::<_>::from_table_catalog(node.get_result_table().unwrap(), store, None) + .await; Ok(GlobalSimpleAggExecutor::new( params.actor_context, diff --git a/src/stream/src/from_proto/group_top_n.rs b/src/stream/src/from_proto/group_top_n.rs index 6d287f2a464e5..0dca506daa7d9 100644 --- a/src/stream/src/from_proto/group_top_n.rs +++ b/src/stream/src/from_proto/group_top_n.rs @@ -43,7 +43,7 @@ impl ExecutorBuilder for GroupTopNExecutorBuilder { .collect(); let table = node.get_table()?; let vnodes = params.vnode_bitmap.map(Arc::new); - let state_table = StateTable::from_table_catalog(table, store, vnodes).await; + let state_table = StateTable::<_>::from_table_catalog(table, store, vnodes).await; let storage_key = table.get_pk().iter().map(OrderPair::from_prost).collect(); let [input]: [_; 1] = params.input.try_into().unwrap(); let group_key_types = group_by diff --git a/src/stream/src/from_proto/group_top_n_appendonly.rs b/src/stream/src/from_proto/group_top_n_appendonly.rs index f7093bd1cdbd6..9d9794d3aaabd 100644 --- a/src/stream/src/from_proto/group_top_n_appendonly.rs +++ b/src/stream/src/from_proto/group_top_n_appendonly.rs @@ -57,7 +57,7 @@ impl ExecutorBuilder for AppendOnlyGroupTopNExecutorBuilder { .collect(); let table = node.get_table()?; let vnodes = params.vnode_bitmap.map(Arc::new); - let state_table = StateTable::from_table_catalog(table, store, vnodes).await; + let state_table = StateTable::<_>::from_table_catalog(table, store, vnodes).await; let storage_key = table.get_pk().iter().map(OrderPair::from_prost).collect(); let [input]: [_; 1] = params.input.try_into().unwrap(); let group_key_types = group_by diff --git a/src/stream/src/from_proto/hash_agg.rs b/src/stream/src/from_proto/hash_agg.rs index e1f5dc19ac288..3eb1ef63ef409 100644 --- a/src/stream/src/from_proto/hash_agg.rs +++ b/src/stream/src/from_proto/hash_agg.rs @@ -25,15 +25,15 @@ use super::*; use crate::common::table::state_table::StateTable; use crate::executor::aggregation::{AggCall, 
AggStateStorage}; use crate::executor::monitor::StreamingMetrics; -use crate::executor::{ActorContextRef, HashAggExecutor, PkIndices}; +use crate::executor::{ActorContextRef, EmitImmediate, EmitPolicy, HashAggExecutor, PkIndices}; use crate::task::AtomicU64Ref; -pub struct HashAggExecutorDispatcherArgs { +pub struct HashAggExecutorDispatcherArgs { ctx: ActorContextRef, input: BoxedExecutor, agg_calls: Vec, storages: Vec>, - result_table: StateTable, + result_table: StateTable, group_key_indices: Vec, group_key_types: Vec, pk_indices: PkIndices, @@ -44,11 +44,11 @@ pub struct HashAggExecutorDispatcherArgs { chunk_size: usize, } -impl HashKeyDispatcher for HashAggExecutorDispatcherArgs { +impl HashKeyDispatcher for HashAggExecutorDispatcherArgs { type Output = StreamResult; fn dispatch_impl(self) -> Self::Output { - Ok(HashAggExecutor::::new( + Ok(HashAggExecutor::::new( self.ctx, self.input, self.agg_calls, @@ -110,9 +110,9 @@ impl ExecutorBuilder for HashAggExecutorBuilder { .await; let result_table = - StateTable::from_table_catalog(node.get_result_table().unwrap(), store, vnodes).await; + StateTable::<_, ::WatermarkBufferStrategy>::from_table_catalog(node.get_result_table().unwrap(), store, vnodes).await; - let args = HashAggExecutorDispatcherArgs { + let args = HashAggExecutorDispatcherArgs::<_, EmitImmediate> { ctx: params.actor_context, input, agg_calls, diff --git a/src/stream/src/from_proto/hash_join.rs b/src/stream/src/from_proto/hash_join.rs index 4f0b8b19143ac..20bddf89cd6ec 100644 --- a/src/stream/src/from_proto/hash_join.rs +++ b/src/stream/src/from_proto/hash_join.rs @@ -90,15 +90,18 @@ impl ExecutorBuilder for HashJoinExecutorBuilder { .collect_vec(); let state_table_l = - StateTable::from_table_catalog(table_l, store.clone(), Some(vnodes.clone())).await; - let degree_state_table_l = - StateTable::from_table_catalog(degree_table_l, store.clone(), Some(vnodes.clone())) - .await; + StateTable::<_>::from_table_catalog(table_l, store.clone(), 
Some(vnodes.clone())).await; + let degree_state_table_l = StateTable::<_>::from_table_catalog( + degree_table_l, + store.clone(), + Some(vnodes.clone()), + ) + .await; let state_table_r = - StateTable::from_table_catalog(table_r, store.clone(), Some(vnodes.clone())).await; + StateTable::<_>::from_table_catalog(table_r, store.clone(), Some(vnodes.clone())).await; let degree_state_table_r = - StateTable::from_table_catalog(degree_table_r, store, Some(vnodes)).await; + StateTable::<_>::from_table_catalog(degree_table_r, store, Some(vnodes)).await; let args = HashJoinExecutorDispatcherArgs { ctx: params.actor_context, diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs index f34f2ab07abe3..2ea6480236010 100644 --- a/src/stream/src/from_proto/now.rs +++ b/src/stream/src/from_proto/now.rs @@ -41,7 +41,7 @@ impl ExecutorBuilder for NowExecutorBuilder { .register_sender(params.actor_context.id, sender); let state_table = - StateTable::from_table_catalog(node.get_state_table()?, store, None).await; + StateTable::<_>::from_table_catalog(node.get_state_table()?, store, None).await; Ok(Box::new(NowExecutor::new( barrier_receiver, diff --git a/src/stream/src/from_proto/sort.rs b/src/stream/src/from_proto/sort.rs index fe6d4a28d2e23..57dd3c6c9ef00 100644 --- a/src/stream/src/from_proto/sort.rs +++ b/src/stream/src/from_proto/sort.rs @@ -35,7 +35,7 @@ impl ExecutorBuilder for SortExecutorBuilder { let [input]: [_; 1] = params.input.try_into().unwrap(); let vnodes = Arc::new(params.vnode_bitmap.expect("vnodes not set for sort")); let state_table = - StateTable::from_table_catalog(node.get_state_table()?, store, Some(vnodes)).await; + StateTable::<_>::from_table_catalog(node.get_state_table()?, store, Some(vnodes)).await; Ok(Box::new(SortExecutor::new( params.actor_context, input, diff --git a/src/stream/src/from_proto/top_n.rs b/src/stream/src/from_proto/top_n.rs index 6b80f17c208b9..619de70067901 100644 --- a/src/stream/src/from_proto/top_n.rs +++ 
b/src/stream/src/from_proto/top_n.rs @@ -37,7 +37,7 @@ impl ExecutorBuilder for TopNExecutorNewBuilder { let table = node.get_table()?; let vnodes = params.vnode_bitmap.map(Arc::new); - let state_table = StateTable::from_table_catalog(table, store, vnodes).await; + let state_table = StateTable::<_>::from_table_catalog(table, store, vnodes).await; let storage_key = table.get_pk().iter().map(OrderPair::from_prost).collect(); let order_by = node.order_by.iter().map(OrderPair::from_prost).collect(); diff --git a/src/stream/src/from_proto/top_n_appendonly.rs b/src/stream/src/from_proto/top_n_appendonly.rs index 3f23dc690a28f..c317fd6f2a84b 100644 --- a/src/stream/src/from_proto/top_n_appendonly.rs +++ b/src/stream/src/from_proto/top_n_appendonly.rs @@ -37,7 +37,7 @@ impl ExecutorBuilder for AppendOnlyTopNExecutorBuilder { let table = node.get_table()?; let vnodes = params.vnode_bitmap.map(Arc::new); - let state_table = StateTable::from_table_catalog(table, store, vnodes).await; + let state_table = StateTable::<_>::from_table_catalog(table, store, vnodes).await; let storage_key = table.get_pk().iter().map(OrderPair::from_prost).collect(); let order_by = node.order_by.iter().map(OrderPair::from_prost).collect(); diff --git a/src/stream/src/from_proto/watermark_filter.rs b/src/stream/src/from_proto/watermark_filter.rs index b130deddd9dfc..535aaf0e2be20 100644 --- a/src/stream/src/from_proto/watermark_filter.rs +++ b/src/stream/src/from_proto/watermark_filter.rs @@ -47,7 +47,7 @@ impl ExecutorBuilder for WatermarkFilterBuilder { // TODO: may enable sanity check for watermark filter after we have upsert. let [table]: [_; 1] = node.get_tables().clone().try_into().unwrap(); let table = - StateTable::from_table_catalog_no_sanity_check(&table, store, Some(vnodes)).await; + StateTable::<_>::from_table_catalog_no_sanity_check(&table, store, Some(vnodes)).await; Ok(WatermarkFilterExecutor::new( input,