Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): lazy emit for HashAggExecutor #7752

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ async fn test_row_seq_scan() -> Result<()> {
ColumnDesc::unnamed(ColumnId::from(2), schema[2].data_type.clone()),
];

let mut state = StateTable::new_without_distribution(
let mut state = StateTable::<_>::new_without_distribution(
memory_state_store.clone(),
TableId::from(0x42),
column_descs.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/common/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

pub mod state_table;
mod watermark;
pub mod watermark;

#[cfg(test)]
pub mod test_state_table;
Expand Down
11 changes: 4 additions & 7 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,15 @@ use risingwave_storage::table::{compute_chunk_vnode, compute_vnode, Distribution
use risingwave_storage::StateStore;
use tracing::trace;

use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy};
use super::watermark::{WatermarkBufferStrategy, WatermarkBufferStrategyByEpochDefault};
use crate::executor::{StreamExecutorError, StreamExecutorResult};

/// This num is arbitrary and we may want to improve this choice in the future.
const STATE_CLEANING_PERIOD_EPOCH: usize = 5;

/// `StateTable` is the interface accessing relational data in KV(`StateStore`) with
/// row-based encoding.
#[derive(Clone)]
pub struct StateTable<
S: StateStore,
W: WatermarkBufferStrategy = WatermarkBufferByEpoch<STATE_CLEANING_PERIOD_EPOCH>,
W: WatermarkBufferStrategy = WatermarkBufferStrategyByEpochDefault,
> {
/// Id for this table.
table_id: TableId,
Expand Down Expand Up @@ -508,7 +505,7 @@ impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
const ENABLE_SANITY_CHECK: bool = cfg!(debug_assertions);

// point get
impl<S: StateStore> StateTable<S> {
impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
/// Get a single row from state table.
pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
let compacted_row: Option<CompactedRow> = self.get_compacted_row(pk).await?;
Expand Down Expand Up @@ -600,7 +597,7 @@ impl<S: StateStore> StateTable<S> {
}

// write
impl<S: StateStore> StateTable<S> {
impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
#[expect(clippy::boxed_local)]
fn handle_mem_table_error(&self, e: Box<MemTableError>) {
match *e {
Expand Down
18 changes: 9 additions & 9 deletions src/stream/src/common/table/test_state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn test_state_table_update_insert() {

test_env.register_table(table.clone()).await;
let mut state_table =
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
.await;

let mut epoch = EpochPair::new_test_epoch(1);
Expand Down Expand Up @@ -228,7 +228,7 @@ async fn test_state_table_iter_with_prefix() {

test_env.register_table(table.clone()).await;
let mut state_table =
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
.await;

let mut epoch = EpochPair::new_test_epoch(1);
Expand Down Expand Up @@ -353,7 +353,7 @@ async fn test_state_table_iter_with_pk_range() {

test_env.register_table(table.clone()).await;
let mut state_table =
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
.await;

let mut epoch = EpochPair::new_test_epoch(1);
Expand Down Expand Up @@ -487,7 +487,7 @@ async fn test_mem_table_assertion() {

test_env.register_table(table.clone()).await;
let mut state_table =
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
.await;

let epoch = EpochPair::new_test_epoch(1);
Expand Down Expand Up @@ -530,7 +530,7 @@ async fn test_state_table_iter_with_value_indices() {

test_env.register_table(table.clone()).await;
let mut state_table =
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
.await;

let mut epoch = EpochPair::new_test_epoch(1);
Expand Down Expand Up @@ -691,7 +691,7 @@ async fn test_state_table_iter_with_shuffle_value_indices() {

test_env.register_table(table.clone()).await;
let mut state_table =
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
.await;

let mut epoch = EpochPair::new_test_epoch(1);
Expand Down Expand Up @@ -926,7 +926,7 @@ async fn test_state_table_write_chunk() {

test_env.register_table(table.clone()).await;
let mut state_table =
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
.await;

let epoch = EpochPair::new_test_epoch(1);
Expand Down Expand Up @@ -1055,7 +1055,7 @@ async fn test_state_table_write_chunk_visibility() {

test_env.register_table(table.clone()).await;
let mut state_table =
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
.await;

let epoch = EpochPair::new_test_epoch(1);
Expand Down Expand Up @@ -1182,7 +1182,7 @@ async fn test_state_table_write_chunk_value_indices() {

test_env.register_table(table.clone()).await;
let mut state_table =
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
.await;

let epoch = EpochPair::new_test_epoch(1);
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/common/table/test_storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn test_storage_table_value_indices() {

test_env.register_table(table.clone()).await;
let mut state =
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
.await;

let table = StorageTable::for_test(
Expand Down Expand Up @@ -192,7 +192,7 @@ async fn test_shuffled_column_id_for_storage_table_get_row() {

test_env.register_table(table.clone()).await;
let mut state =
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
.await;

let mut epoch = EpochPair::new_test_epoch(1);
Expand Down Expand Up @@ -295,7 +295,7 @@ async fn test_row_based_storage_table_point_get_in_batch_mode() {

test_env.register_table(table.clone()).await;
let mut state =
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
.await;

let column_ids_partial = vec![ColumnId::from(1), ColumnId::from(2)];
Expand Down Expand Up @@ -402,7 +402,7 @@ async fn test_batch_scan_with_value_indices() {

test_env.register_table(table.clone()).await;
let mut state =
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
.await;

let column_ids_partial = vec![ColumnId::from(1), ColumnId::from(2)];
Expand Down
8 changes: 7 additions & 1 deletion src/stream/src/common/table/watermark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

/// Strategy to decide how to buffer the watermarks, used for state cleaning.
pub trait WatermarkBufferStrategy: Default {
pub trait WatermarkBufferStrategy: Default + Send + Sync + 'static {
/// Trigger when a epoch is committed.
fn tick(&mut self);

Expand Down Expand Up @@ -58,3 +58,9 @@ impl<const PERIOD: usize> WatermarkBufferStrategy for WatermarkBufferByEpoch<PER
}
}
}

/// This num is arbitrary and we may want to improve this choice in the future.
const STATE_CLEANING_PERIOD_EPOCH: usize = 5;

pub type WatermarkBufferStrategyByEpochDefault =
WatermarkBufferByEpoch<STATE_CLEANING_PERIOD_EPOCH>;
7 changes: 4 additions & 3 deletions src/stream/src/executor/aggregation/agg_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use risingwave_storage::StateStore;
use super::agg_state::{AggState, AggStateStorage};
use super::AggCall;
use crate::common::table::state_table::StateTable;
use crate::common::table::watermark::WatermarkBufferStrategy;
use crate::executor::error::StreamExecutorResult;
use crate::executor::PkIndices;

Expand All @@ -47,7 +48,7 @@ impl<S: StateStore> Debug for AggGroup<S> {
f.debug_struct("AggGroup")
.field("group_key", &self.group_key)
.field("prev_outputs", &self.prev_outputs)
.finish()
.finish_non_exhaustive()
}
}

Expand All @@ -67,11 +68,11 @@ pub struct AggChangesInfo {
impl<S: StateStore> AggGroup<S> {
/// Create [`AggGroup`] for the given [`AggCall`]s and `group_key`.
/// For [`crate::executor::GlobalSimpleAggExecutor`], the `group_key` should be `None`.
pub async fn create(
pub async fn create<W: WatermarkBufferStrategy>(
group_key: Option<OwnedRow>,
agg_calls: &[AggCall],
storages: &[AggStateStorage<S>],
result_table: &StateTable<S>,
result_table: &StateTable<S, W>,
pk_indices: &PkIndices,
extreme_cache_size: usize,
input_schema: &Schema,
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/aggregation/minput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ mod tests {
.collect_vec();
let mapping = StateTableColumnMapping::new(upstream_columns, None);
let pk_len = order_types.len();
let table = StateTable::new_without_distribution(
let table = StateTable::<_>::new_without_distribution(
MemoryStateStore::new(),
table_id,
columns,
Expand Down
8 changes: 5 additions & 3 deletions src/stream/src/executor/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ mod state_cache;
mod table;
mod value;

/// Generate [`crate::executor::HashAggExecutor`]'s schema from `input`, `agg_calls` and
/// `group_key_indices`. For [`crate::executor::HashAggExecutor`], the group key indices should
/// be provided.
/// Generate [`HashAggExecutor`][HashAggExecutor]'s schema from `input`, `agg_calls` and
/// `group_key_indices`. For [`HashAggExecutor`][HashAggExecutor], the group key indices should be
/// provided.
///
/// [HashAggExecutor]:crate::executor::HashAggExecutor
pub fn generate_agg_schema(
input: &dyn Executor,
agg_calls: &[AggCall],
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,15 +492,15 @@ mod tests {
) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
let column_descs = ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64);
// TODO: enable sanity check for dynamic filter <https://github.com/risingwavelabs/risingwave/issues/3893>
let state_table_l = StateTable::new_without_distribution_no_sanity_check(
let state_table_l = StateTable::<_>::new_without_distribution_no_sanity_check(
mem_state.clone(),
TableId::new(0),
vec![column_descs.clone()],
vec![OrderType::Ascending],
vec![0],
)
.await;
let state_table_r = StateTable::new_without_distribution_no_sanity_check(
let state_table_r = StateTable::<_>::new_without_distribution_no_sanity_check(
mem_state,
TableId::new(1),
vec![column_descs],
Expand Down
Loading