From 99615584c0e37fd2bc12bcd80b9b70045e23a7f8 Mon Sep 17 00:00:00 2001
From: Kazuyuki Tanimura
Date: Wed, 23 Aug 2023 23:31:07 -0700
Subject: [PATCH 01/16] Support spilling for hash aggregation

---
 .../aggregates/group_values/mod.rs            |  14 +
 .../aggregates/group_values/primitive.rs      |  27 ++
 .../aggregates/group_values/row.rs            |  44 ++-
 .../core/src/physical_plan/aggregates/mod.rs  |  98 +++++-
 .../physical_plan/aggregates/order/partial.rs |   2 +-
 .../src/physical_plan/aggregates/row_hash.rs  | 294 +++++++++++++++++-
 .../core/src/physical_plan/sorts/sort.rs      |   4 +-
 .../core/tests/fuzz_cases/aggregate_fuzz.rs   |  27 +-
 .../physical-expr/src/aggregate/average.rs    |  17 +
 .../physical-expr/src/aggregate/count.rs      |  12 +-
 .../physical-expr/src/aggregate/first_last.rs |  16 +-
 .../aggregate/groups_accumulator/adapter.rs   |   8 +
 .../aggregate/groups_accumulator/bool_op.rs   |  14 +
 .../src/aggregate/groups_accumulator/mod.rs   |  14 +
 .../aggregate/groups_accumulator/prim_op.rs   |  16 +-
 15 files changed, 555 insertions(+), 52 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs
index 46f372b6ad28..e844eb9bdac5 100644
--- a/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::record_batch::RecordBatch;
 use arrow_array::{downcast_primitive, ArrayRef};
 use arrow_schema::SchemaRef;
 use datafusion_common::Result;
@@ -42,6 +43,19 @@ pub trait GroupValues: Send {
 
     /// Emits the group values
     fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
+
+    /// Try to reserve capacity so that at least a single [`RecordBatch`] can be inserted. The
+    /// implementation may reserve more space to speculatively avoid frequent re-allocations.
+    /// After calling `try_reserve`, capacity will be greater than or equal to
+    /// `self.len() + additional` if it returns `Ok(())`. Does nothing if capacity is already
+    /// sufficient. This method preserves the contents even if an error occurs.
+    fn try_reserve(
+        &mut self,
+        batch: &RecordBatch,
+    ) -> Result<(), hashbrown::TryReserveError>;
+
+    /// Clear the contents and shrink the capacity
+    fn clear_shrink(&mut self, batch: &RecordBatch);
 }
 
 pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs
index 7b8691c67fdd..883eea38d54d 100644
--- a/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs
+++ b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs
@@ -20,6 +20,7 @@ use ahash::RandomState;
 use arrow::array::BooleanBufferBuilder;
 use arrow::buffer::NullBuffer;
 use arrow::datatypes::i256;
+use arrow::record_batch::RecordBatch;
 use arrow_array::cast::AsArray;
 use arrow_array::{ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, PrimitiveArray};
 use arrow_schema::DataType;
@@ -206,4 +207,30 @@ where
         };
         Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))])
     }
+
+    // FIXME: cannot return std::collections::TryReserveError because std::collections::TryReserveErrorKind
+    // is unstable. For now, use hashbrown::TryReserveError instead.
+ fn try_reserve( + &mut self, + batch: &RecordBatch, + ) -> Result<(), hashbrown::TryReserveError> { + let additional = batch.num_rows(); + self.values + .try_reserve(additional) + .map_err(|_| hashbrown::TryReserveError::CapacityOverflow) + .and({ + let state = &self.random_state; + self.map.try_reserve(additional, |g| unsafe { + self.values.get_unchecked(*g).hash(state) + }) + }) + } + + fn clear_shrink(&mut self, batch: &RecordBatch) { + let count = batch.num_rows(); + self.values.clear(); + self.values.shrink_to(count); + self.map.clear(); + self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared + } } diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/row.rs b/datafusion/core/src/physical_plan/aggregates/group_values/row.rs index 4eb660d52590..d262f73281e7 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/row.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/row.rs @@ -17,6 +17,7 @@ use crate::physical_plan::aggregates::group_values::GroupValues; use ahash::RandomState; +use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, Rows, SortField}; use arrow_array::ArrayRef; use arrow_schema::SchemaRef; @@ -59,17 +60,19 @@ pub struct GroupValuesRows { /// Random state for creating hashes random_state: RandomState, + + /// Schema fields for the row converter + fields: Vec, } impl GroupValuesRows { pub fn try_new(schema: SchemaRef) -> Result { - let row_converter = RowConverter::new( - schema - .fields() - .iter() - .map(|f| SortField::new(f.data_type().clone())) - .collect(), - )?; + let fields: Vec = schema + .fields() + .iter() + .map(|f| SortField::new(f.data_type().clone())) + .collect(); + let row_converter = RowConverter::new(fields.clone())?; let map = RawTable::with_capacity(0); let group_values = row_converter.empty_rows(0, 0); @@ -81,6 +84,7 @@ impl GroupValuesRows { group_values, hashes_buffer: Default::default(), random_state: Default::default(), + fields, }) } } @@ -181,4 +185,30 @@ impl GroupValues for GroupValuesRows { } }) } + + // FIXME: cannot return std::collections::TryReserveError because std::collections::TryReserveErrorKind + // is unstable. For now, use hashbrown::TryReserveError instead. + fn try_reserve( + &mut self, + batch: &RecordBatch, + ) -> Result<(), hashbrown::TryReserveError> { + let additional = batch.num_rows(); + // FIXME: there is no good way to try_reserve for self.row_converter self.group_values + self.map.try_reserve(additional, |(hash, _)| *hash).and( + self.hashes_buffer + .try_reserve(additional) + .map_err(|_| hashbrown::TryReserveError::CapacityOverflow), + ) + } + + fn clear_shrink(&mut self, batch: &RecordBatch) { + let count = batch.num_rows(); + // FIXME: there is no good way to clear_shrink for self.row_converter self.group_values + self.row_converter = RowConverter::new(self.fields.clone()).unwrap(); + self.group_values = self.row_converter.empty_rows(count, 0); + self.map.clear(); + self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared + self.hashes_buffer.clear(); + self.hashes_buffer.shrink_to(count); + } } diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 8338da8ed677..9149ae619834 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -281,6 +281,8 @@ pub struct AggregateExec { /// Stores mode and output ordering information for the `AggregateExec`. 
aggregation_ordering: Option, required_input_ordering: Option, + /// Force spilling for debugging + force_spill: bool, } /// Calculates the working mode for `GROUP BY` queries. @@ -669,9 +671,34 @@ impl AggregateExec { metrics: ExecutionPlanMetricsSet::new(), aggregation_ordering, required_input_ordering, + force_spill: false, }) } + /// Only for testing. When `force_spill` is true, it spills every batch. + pub fn try_new_for_test( + mode: AggregateMode, + group_by: PhysicalGroupBy, + aggr_expr: Vec>, + filter_expr: Vec>>, + order_by_expr: Vec>, + input: Arc, + input_schema: SchemaRef, + force_spill: bool, + ) -> Result { + let mut exec = AggregateExec::try_new( + mode, + group_by, + aggr_expr, + filter_expr, + order_by_expr, + input, + input_schema, + )?; + exec.force_spill = force_spill; + Ok(exec) + } + /// Aggregation mode (full, partial) pub fn mode(&self) -> &AggregateMode { &self.mode @@ -1389,7 +1416,10 @@ mod tests { ) } - async fn check_grouping_sets(input: Arc) -> Result<()> { + async fn check_grouping_sets( + input: Arc, + spill: bool, + ) -> Result<()> { let input_schema = input.schema(); let grouping_set = PhysicalGroupBy { @@ -1416,7 +1446,7 @@ mod tests { let task_ctx = Arc::new(TaskContext::default()); - let partial_aggregate = Arc::new(AggregateExec::try_new( + let partial_aggregate = Arc::new(AggregateExec::try_new_for_test( AggregateMode::Partial, grouping_set.clone(), aggregates.clone(), @@ -1424,6 +1454,7 @@ mod tests { vec![None], input, input_schema.clone(), + spill, )?); let result = @@ -1460,7 +1491,7 @@ mod tests { let final_grouping_set = PhysicalGroupBy::new_single(final_group); - let merged_aggregate = Arc::new(AggregateExec::try_new( + let merged_aggregate = Arc::new(AggregateExec::try_new_for_test( AggregateMode::Final, final_grouping_set, aggregates, @@ -1468,6 +1499,7 @@ mod tests { vec![None], merge, input_schema, + spill, )?); let result = @@ -1505,7 +1537,7 @@ mod tests { } /// build the aggregates on the data from some_data() and check the results - async fn check_aggregates(input: Arc) -> Result<()> { + async fn check_aggregates(input: Arc, spill: bool) -> Result<()> { let input_schema = input.schema(); let grouping_set = PhysicalGroupBy { @@ -1522,7 +1554,7 @@ mod tests { let task_ctx = Arc::new(TaskContext::default()); - let partial_aggregate = Arc::new(AggregateExec::try_new( + let partial_aggregate = Arc::new(AggregateExec::try_new_for_test( AggregateMode::Partial, grouping_set.clone(), aggregates.clone(), @@ -1530,6 +1562,7 @@ mod tests { vec![None], input, input_schema.clone(), + spill, )?); let result = @@ -1556,7 +1589,7 @@ mod tests { let final_grouping_set = PhysicalGroupBy::new_single(final_group); - let merged_aggregate = Arc::new(AggregateExec::try_new( + let merged_aggregate = Arc::new(AggregateExec::try_new_for_test( AggregateMode::Final, final_grouping_set, aggregates, @@ -1564,6 +1597,7 @@ mod tests { vec![None], merge, input_schema, + spill, )?); let result = @@ -1707,7 +1741,7 @@ mod tests { let input: Arc = Arc::new(TestYieldingExec { yield_first: false }); - check_aggregates(input).await + check_aggregates(input, false).await } #[tokio::test] @@ -1715,7 +1749,7 @@ mod tests { let input: Arc = Arc::new(TestYieldingExec { yield_first: false }); - check_grouping_sets(input).await + check_grouping_sets(input, false).await } #[tokio::test] @@ -1723,7 +1757,7 @@ mod tests { let input: Arc = Arc::new(TestYieldingExec { yield_first: true }); - check_aggregates(input).await + check_aggregates(input, false).await } #[tokio::test] @@ -1731,7 
+1765,39 @@ mod tests { let input: Arc = Arc::new(TestYieldingExec { yield_first: true }); - check_grouping_sets(input).await + check_grouping_sets(input, false).await + } + + #[tokio::test] + async fn aggregate_source_not_yielding_with_spill() -> Result<()> { + let input: Arc = + Arc::new(TestYieldingExec { yield_first: false }); + + check_aggregates(input, true).await + } + + #[tokio::test] + async fn aggregate_grouping_sets_source_not_yielding_with_spill() -> Result<()> { + let input: Arc = + Arc::new(TestYieldingExec { yield_first: false }); + + check_grouping_sets(input, true).await + } + + #[tokio::test] + async fn aggregate_source_with_yielding_with_spill() -> Result<()> { + let input: Arc = + Arc::new(TestYieldingExec { yield_first: true }); + + check_aggregates(input, true).await + } + + #[tokio::test] + async fn aggregate_grouping_sets_with_yielding_with_spill() -> Result<()> { + let input: Arc = + Arc::new(TestYieldingExec { yield_first: true }); + + check_grouping_sets(input, true).await } #[tokio::test] @@ -1899,7 +1965,10 @@ mod tests { async fn run_first_last_multi_partitions() -> Result<()> { for use_coalesce_batches in [false, true] { for is_first_acc in [false, true] { - first_last_multi_partitions(use_coalesce_batches, is_first_acc).await? + for spill in [false, true] { + first_last_multi_partitions(use_coalesce_batches, is_first_acc, spill) + .await? + } } } Ok(()) @@ -1925,6 +1994,7 @@ mod tests { async fn first_last_multi_partitions( use_coalesce_batches: bool, is_first_acc: bool, + spill: bool, ) -> Result<()> { let task_ctx = Arc::new(TaskContext::default()); @@ -1969,7 +2039,7 @@ mod tests { schema.clone(), None, )?); - let aggregate_exec = Arc::new(AggregateExec::try_new( + let aggregate_exec = Arc::new(AggregateExec::try_new_for_test( AggregateMode::Partial, groups.clone(), aggregates.clone(), @@ -1977,6 +2047,7 @@ mod tests { vec![Some(ordering_req.clone())], memory_exec, schema.clone(), + spill, )?); let coalesce = if use_coalesce_batches { let coalesce = Arc::new(CoalescePartitionsExec::new(aggregate_exec)); @@ -1985,7 +2056,7 @@ mod tests { Arc::new(CoalescePartitionsExec::new(aggregate_exec)) as Arc }; - let aggregate_final = Arc::new(AggregateExec::try_new( + let aggregate_final = Arc::new(AggregateExec::try_new_for_test( AggregateMode::Final, groups, aggregates.clone(), @@ -1993,6 +2064,7 @@ mod tests { vec![Some(ordering_req)], coalesce, schema, + spill, )?) as Arc; let result = crate::physical_plan::collect(aggregate_final, task_ctx).await?; diff --git a/datafusion/core/src/physical_plan/aggregates/order/partial.rs b/datafusion/core/src/physical_plan/aggregates/order/partial.rs index 019e61ef2688..0feac3a5ed52 100644 --- a/datafusion/core/src/physical_plan/aggregates/order/partial.rs +++ b/datafusion/core/src/physical_plan/aggregates/order/partial.rs @@ -241,7 +241,7 @@ impl GroupOrderingPartial { Ok(()) } - /// Return the size of memor allocated by this structure + /// Return the size of memory allocated by this structure pub(crate) fn size(&self) -> usize { std::mem::size_of::() + self.order_indices.allocated_size() diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index 4613a2e46443..7ca43b74efad 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -18,7 +18,7 @@ //! 
Hash aggregation
 
 use datafusion_physical_expr::{
-    AggregateExpr, EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter,
+    AggregateExpr, EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter, PhysicalSortExpr,
 };
 use log::debug;
 use std::sync::Arc;
@@ -29,19 +29,28 @@ use futures::ready;
 use futures::stream::{Stream, StreamExt};
 
 use crate::physical_plan::aggregates::group_values::{new_group_values, GroupValues};
+use crate::physical_plan::aggregates::order::GroupOrderingFull;
 use crate::physical_plan::aggregates::{
     evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
     PhysicalGroupBy,
 };
+use crate::physical_plan::common::IPCWriter;
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
-use crate::physical_plan::{aggregates, PhysicalExpr};
+use crate::physical_plan::sorts::sort::{read_spill_as_stream, sort_batch};
+use crate::physical_plan::sorts::streaming_merge;
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::{aggregates, EmptyRecordBatchStream, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use arrow::array::*;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use arrow_schema::SortOptions;
 use datafusion_common::Result;
 use datafusion_execution::memory_pool::proxy::VecAllocExt;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::expressions::col;
+use tempfile::NamedTempFile;
 
 #[derive(Debug, Clone)]
 /// This object tracks the aggregation phase (input/output)
@@ -50,6 +59,9 @@ pub(crate) enum ExecutionState {
     /// When producing output, the remaining rows to output are stored
     /// here and are sliced off as needed in batch_size chunks
     ProducingOutput(RecordBatch),
+    /// When producing output of spilled data, the merged spilled data stream
+    /// is sliced off as needed in batch_size chunks
+    ProducingSpillOutput,
     Done,
 }
 
@@ -120,6 +132,56 @@ use super::AggregateExec;
 /// hash table).
 ///
 /// [`group_values`]: Self::group_values
+///
+/// # Spilling
+///
+/// The sizes of the group values and the accumulators can become large. Before
+/// that causes the process to run out of memory, this hash aggregator spills
+/// the data to local disk in Arrow IPC format. For every input [`RecordBatch`],
+/// the memory manager checks whether the new input size still fits within the
+/// configured memory limit. If it does not, the in-memory state is spilled,
+/// and the spill files are later read back with a streaming merge sort.
+/// Because rows of the same group may be split across multiple spill files on
+/// disk, the merged data that is read back must be re-grouped.
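+///
+/// In sketch form, the control flow described above is roughly the following
+/// (a simplified sketch of this stream's logic, not the exact code; the
+/// helper names mirror methods added in this patch):
+///
+/// ```ignore
+/// // while reading input, before aggregating each batch:
+/// if memory_reservation_exceeded() {
+///     spill()?;             // sort in-memory groups, write an Arrow IPC file
+///     clear_shrink(&batch); // drop and shrink the in-memory state
+/// }
+/// group_aggregate_batch(batch)?;
+///
+/// // once the input is exhausted and spill files exist:
+/// // streaming-merge the sorted spill files together with the sorted
+/// // in-memory groups, re-group the merged rows, and emit them in
+/// // batch_size chunks
+/// ```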
+/// +/// ```text +/// Partial Aggregation [batch_size = 2] (max memory = 3 rows) +/// +/// INPUTS PARTIALLY AGGREGATED (UPDATE BATCH) RE-GROUPED (SORTED) +/// ┌─────────┐ ┌─────────────────┐ [Similar to final aggregation merge, +/// │ a │ b │ │ a │ AVG(b) │ but using the partial schema] +/// │---│-----│ │ │[count]│[sum]│ ┌─────────────────┐ +/// │ 3 │ 3.0 │ ─▶ │---│-------│-----│ │ a │ AVG(b) │ +/// │ 2 │ 2.0 │ │ 2 │ 1 │ 2.0 │ ─▶ spill ─┐ │ │[count]│[sum]│ +/// └─────────┘ │ 3 │ 2 │ 7.0 │ │ │---│-------│-----│ +/// ┌─────────┐ ─▶ │ 4 │ 1 │ 8.0 │ ▼ │ 1 │ 1 │ 1.0 │ +/// │ 3 │ 4.0 │ └─────────────────┘ Streaming ─▶ │ 2 │ 1 │ 2.0 │ +/// │ 4 │ 8.0 │ ┌─────────────────┐ merge sort └─────────────────┘ +/// └─────────┘ │ a │ AVG(b) │ ▲ ┌─────────────────┐ +/// ┌─────────┐ │---│-------│-----│ │ │ a │ AVG(b) │ +/// │ 1 │ 1.0 │ ─▶ │ 1 │ 1 │ 1.0 │ ─▶ memory ─┘ │ 3 │ 3 │ 9.0 │ +/// │ 3 │ 2.0 │ │ 3 │ 1 │ 2.0 │ │ 4 │ 1 │ 8.0 │ +/// └─────────┘ └─────────────────┘ └─────────────────┘ +/// +/// +/// Final Aggregation [batch_size = 2] (max memory = 3 rows) +/// +/// PARTIALLY INPUTS FINAL AGGREGATION (MERGE BATCH) RE-GROUPED (SORTED) +/// ┌─────────────────┐ [keep using the partial schema] [Real final aggregation +/// │ a │ AVG(b) │ ┌─────────────────┐ output] +/// │ │[count]│[sum]│ │ a │ AVG(b) │ ┌────────────┐ +/// │---│-------│-----│ ─▶ │ │[count]│[sum]│ │ a │ AVG(b) │ +/// │ 3 │ 3 │ 3.0 │ │---│-------│-----│ ─▶ spill ─┐ │---│--------│ +/// │ 2 │ 2 │ 1.0 │ │ 2 │ 2 │ 1.0 │ │ │ 1 │ 4.0 │ +/// └─────────────────┘ │ 3 │ 4 │ 8.0 │ ▼ │ 2 │ 1.0 │ +/// ┌─────────────────┐ ─▶ │ 4 │ 1 │ 7.0 │ Streaming ─▶ └────────────┘ +/// │ 3 │ 1 │ 5.0 │ └─────────────────┘ merge sort ┌────────────┐ +/// │ 4 │ 1 │ 7.0 │ ┌─────────────────┐ ▲ │ a │ AVG(b) │ +/// └─────────────────┘ │ a │ AVG(b) │ │ │---│--------│ +/// ┌─────────────────┐ │---│-------│-----│ ─▶ memory ─┘ │ 3 │ 2.0 │ +/// │ 1 │ 2 │ 8.0 │ ─▶ │ 1 │ 2 │ 8.0 │ │ 4 │ 7.0 │ +/// │ 2 │ 2 │ 3.0 │ │ 2 │ 2 │ 3.0 │ └────────────┘ +/// └─────────────────┘ └─────────────────┘ +/// ``` pub(crate) struct GroupedHashAggregateStream { schema: SchemaRef, input: SendableRecordBatchStream, @@ -178,6 +240,28 @@ pub(crate) struct GroupedHashAggregateStream { /// Have we seen the end of the input input_done: bool, + + /// The [`RuntimeEnv`] associated with the [`TaskContext`] argument + runtime: Arc, + + /// If data has previously been spilled, the locations of the + /// spill files (in Arrow IPC format) + spills: Vec, + + /// Sorting expression for spilling batches + spill_expr: Vec, + + /// Schema for spilling batches + spill_schema: SchemaRef, + + /// Stream of merged outputs after spilling + merged_stream: SendableRecordBatchStream, + + /// aggregate_arguments for merging spilled data + merging_aggregate_arguments: Vec>>, + + /// Force spilling always for debugging + force_spill: bool, } impl GroupedHashAggregateStream { @@ -207,6 +291,12 @@ impl GroupedHashAggregateStream { &agg.mode, agg_group_by.expr.len(), )?; + // arguments for aggregating spilled data is the same as the one for final aggregation + let merging_aggregate_arguments = aggregates::aggregate_expressions( + &agg.aggr_expr, + &AggregateMode::Final, + agg_group_by.expr.len(), + )?; let filter_expressions = match agg.mode { AggregateMode::Partial @@ -224,6 +314,14 @@ impl GroupedHashAggregateStream { .collect::>()?; let group_schema = group_schema(&agg_schema, agg_group_by.expr.len()); + let spill_expr = group_schema + .fields + .into_iter() + .map(|field| PhysicalSortExpr { + expr: col(field.name(), 
&group_schema).unwrap(), + options: SortOptions::default(), + }) + .collect(); let name = format!("GroupedHashAggregateStream[{partition}]"); let reservation = MemoryConsumer::new(name).register(context.memory_pool()); @@ -244,7 +342,7 @@ impl GroupedHashAggregateStream { let exec_state = ExecutionState::ReadingInput; Ok(GroupedHashAggregateStream { - schema: agg_schema, + schema: agg_schema.clone(), input, mode: agg.mode, accumulators, @@ -259,6 +357,13 @@ impl GroupedHashAggregateStream { batch_size, group_ordering, input_done: false, + runtime: context.runtime_env(), + spills: vec![], + spill_expr, + spill_schema: agg_schema.clone(), + merged_stream: Box::pin(EmptyRecordBatchStream::new(agg_schema)), + merging_aggregate_arguments, + force_spill: agg.force_spill, }) } } @@ -310,15 +415,18 @@ impl Stream for GroupedHashAggregateStream { // new batch to aggregate Some(Ok(batch)) => { let timer = elapsed_compute.timer(); + // Make sure we have enough capacity for `batch`, otherwise spill + extract_ok!(self.spill_previous_if_necessary(&batch)); + // Do the grouping - extract_ok!(self.group_aggregate_batch(batch)); + extract_ok!(self.group_aggregate_batch(batch, false)); // If we can begin emitting rows, do so, // otherwise keep consuming input assert!(!self.input_done); if let Some(to_emit) = self.group_ordering.emit_to() { - let batch = extract_ok!(self.emit(to_emit)); + let batch = extract_ok!(self.emit(to_emit, false)); self.exec_state = ExecutionState::ProducingOutput(batch); } timer.done(); @@ -332,8 +440,14 @@ impl Stream for GroupedHashAggregateStream { self.input_done = true; self.group_ordering.input_done(); let timer = elapsed_compute.timer(); - let batch = extract_ok!(self.emit(EmitTo::All)); - self.exec_state = ExecutionState::ProducingOutput(batch); + if self.spills.is_empty() { + let batch = extract_ok!(self.emit(EmitTo::All, false)); + self.exec_state = ExecutionState::ProducingOutput(batch); + } else { + // If spill files exist, stream-merge them. + extract_ok!(self.update_merged_stream()); + self.exec_state = ExecutionState::ProducingSpillOutput; + } timer.done(); } } @@ -360,6 +474,38 @@ impl Stream for GroupedHashAggregateStream { ))); } + ExecutionState::ProducingSpillOutput => { + match ready!(self.merged_stream.poll_next_unpin(cx)) { + Some(batch_result) => { + // Re-group the stream-merged results. + extract_ok!(self.group_aggregate_batch(batch_result?, true)); + // Output first batch_size rows. + if let Some(to_emit) = self.group_ordering.emit_to() { + if let EmitTo::First(ready) = to_emit { + let batch_size = self.batch_size; + if ready >= batch_size { + let batch = + extract_ok!(self + .emit(EmitTo::First(batch_size), false)); + return Poll::Ready(Some(Ok( + batch.record_output(&self.baseline_metrics) + ))); + } + } + } + } + None => { + // The streaming merge is done. Output the remaining. + let batch = extract_ok!(self.emit(EmitTo::All, false)); + self.exec_state = if batch.num_rows() > 0 { + ExecutionState::ProducingOutput(batch) + } else { + ExecutionState::Done + } + } + } + } + ExecutionState::Done => return Poll::Ready(None), } } @@ -374,15 +520,29 @@ impl RecordBatchStream for GroupedHashAggregateStream { impl GroupedHashAggregateStream { /// Perform group-by aggregation for the given [`RecordBatch`]. 
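+    ///
+    /// If `merging` is true, the batch carries intermediate aggregate state
+    /// read back from spill files: the merging variants of the group-by and
+    /// aggregate expressions are evaluated instead, filters are skipped, and
+    /// accumulators merge the state rather than update from raw input.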
- fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> { + fn group_aggregate_batch(&mut self, batch: RecordBatch, merging: bool) -> Result<()> { // Evaluate the grouping expressions - let group_by_values = evaluate_group_by(&self.group_by, &batch)?; + let group_by_values = if merging { + let group_by = PhysicalGroupBy::new_single(self.group_by.expr.clone()); + evaluate_group_by(&group_by, &batch)? + } else { + evaluate_group_by(&self.group_by, &batch)? + }; // Evaluate the aggregation expressions. - let input_values = evaluate_many(&self.aggregate_arguments, &batch)?; + let input_values = if merging { + evaluate_many(&self.merging_aggregate_arguments, &batch)? + } else { + evaluate_many(&self.aggregate_arguments, &batch)? + }; // Evaluate the filter expressions, if any, against the inputs - let filter_values = evaluate_optional(&self.filter_expressions, &batch)?; + let filter_values = if merging { + let filter_expressions = vec![None; self.accumulators.len()]; + evaluate_optional(&filter_expressions, &batch)? + } else { + evaluate_optional(&self.filter_expressions, &batch)? + }; for group_values in &group_by_values { // calculate the group indices for each input row @@ -416,7 +576,9 @@ impl GroupedHashAggregateStream { match self.mode { AggregateMode::Partial | AggregateMode::Single - | AggregateMode::SinglePartitioned => { + | AggregateMode::SinglePartitioned + if !merging => + { acc.update_batch( values, group_indices, @@ -424,7 +586,7 @@ impl GroupedHashAggregateStream { total_num_groups, )?; } - AggregateMode::FinalPartitioned | AggregateMode::Final => { + _ => { // if aggregation is over intermediate states, // use merge acc.merge_batch( @@ -452,9 +614,14 @@ impl GroupedHashAggregateStream { /// Create an output RecordBatch with the group keys and /// accumulator states/values specified in emit_to - fn emit(&mut self, emit_to: EmitTo) -> Result { + fn emit(&mut self, emit_to: EmitTo, spilling: bool) -> Result { + let schema = if spilling { + self.spill_schema.clone() + } else { + self.schema() + }; if self.group_values.is_empty() { - return Ok(RecordBatch::new_empty(self.schema())); + return Ok(RecordBatch::new_empty(schema)); } let mut output = self.group_values.emit(emit_to)?; @@ -466,6 +633,11 @@ impl GroupedHashAggregateStream { for acc in self.accumulators.iter_mut() { match self.mode { AggregateMode::Partial => output.extend(acc.state(emit_to)?), + AggregateMode::Final | AggregateMode::FinalPartitioned if spilling => { + // If spilling, output partial state because the spilled data will be + // merged and re-evaluated later. + output.extend(acc.state(emit_to)?) + } AggregateMode::Final | AggregateMode::FinalPartitioned | AggregateMode::Single @@ -474,7 +646,97 @@ impl GroupedHashAggregateStream { } self.update_memory_reservation()?; - let batch = RecordBatch::try_new(self.schema(), output)?; + let batch = RecordBatch::try_new(schema, output)?; Ok(batch) } + + /// Try to reserve the memory capacities to fit the next incoming batch. + /// In case of insufficient memory, spill the data to disk and clear the memory. 
/// Currently only [`GroupOrdering::None`] is supported for spilling
+    fn spill_previous_if_necessary(&mut self, batch: &RecordBatch) -> Result<()> {
+        // TODO: support group_ordering for spilling
+        if self.reservation.size() > 0
+            && matches!(self.group_ordering, GroupOrdering::None)
+        {
+            let should_spill = self
+                .accumulators
+                .iter_mut()
+                .any(|x| x.try_reserve(&batch).is_err())
+                || self.group_values.try_reserve(&batch).is_err()
+                || self
+                    .current_group_indices
+                    .try_reserve(batch.num_rows())
+                    .is_err()
+                || self.update_memory_reservation().is_err()
+                || self.force_spill;
+            if should_spill {
+                // If mode is Final, use input batch (Partial mode) schema for spilling because
+                // the spilled data will be merged and re-evaluated later.
+                if let AggregateMode::Final | AggregateMode::FinalPartitioned = self.mode
+                {
+                    self.spill_schema = batch.schema();
+                }
+                self.spill()?;
+                self.clear_shrink(batch);
+
+                return self.update_memory_reservation();
+            }
+        }
+        Ok(())
+    }
+
+    /// Emit all rows, sort them, and store them on disk.
+    fn spill(&mut self) -> Result<()> {
+        let emit = self.emit(EmitTo::All, true)?;
+        let sorted = sort_batch(&emit, &self.spill_expr, None)?;
+        let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
+        let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
+        // TODO: slice large `sorted` and write to multiple files in parallel
+        writer.write(&sorted)?;
+        writer.finish()?;
+        self.spills.push(spillfile);
+        Ok(())
+    }
+
+    /// Clear memory and shrink capacities to the size of the batch.
+    fn clear_shrink(&mut self, batch: &RecordBatch) {
+        self.accumulators
+            .iter_mut()
+            .for_each(|x| x.clear_shrink(&batch));
+        self.group_values.clear_shrink(&batch);
+        self.current_group_indices.clear();
+        self.current_group_indices.shrink_to(batch.num_rows());
+    }
+
+    /// At this point, all the inputs are read and there are some spills.
+    /// Emit the remaining rows and create a batch.
+    /// Conduct a streaming merge sort between the batch and spilled data. Since the stream is fully
+    /// sorted, set `self.group_ordering` to Full, then later we can read with [`EmitTo::First`].
+    fn update_merged_stream(&mut self) -> Result<()> {
+        let batch = self.emit(EmitTo::All, true)?;
+        let mut streams: Vec<SendableRecordBatchStream> = vec![];
+        let expr = self.spill_expr.clone();
+        let schema = batch.schema();
+        streams.push(Box::pin(RecordBatchStreamAdapter::new(
+            schema.clone(),
+            futures::stream::once(futures::future::lazy(move |_| {
+                sort_batch(&batch, &expr, None)
+            })),
+        )));
+        for spill in self.spills.drain(..)
{ + let stream = read_spill_as_stream(spill, schema.clone())?; + streams.push(stream); + } + self.merged_stream = streaming_merge( + streams, + schema, + &self.spill_expr, + self.baseline_metrics.clone(), + self.batch_size, + None, + self.reservation.new_empty(), + )?; + self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new()); + Ok(()) + } } diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 5d23e72fe7cf..51980a41a22b 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -568,7 +568,7 @@ impl Debug for ExternalSorter { } } -fn sort_batch( +pub(crate) fn sort_batch( batch: &RecordBatch, expressions: &[PhysicalSortExpr], fetch: Option, @@ -602,7 +602,7 @@ async fn spill_sorted_batches( } } -fn read_spill_as_stream( +pub(crate) fn read_spill_as_stream( path: NamedTempFile, schema: SchemaRef, ) -> Result { diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index bdca1f90687d..11c6e1cff121 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -57,12 +57,15 @@ mod tests { let mut handles = Vec::new(); for i in 0..n { let test_idx = i % test_cases.len(); - let group_by_columns = test_cases[test_idx].clone(); - let job = tokio::spawn(run_aggregate_test( - make_staggered_batches::(1000, distinct, i as u64), - group_by_columns, - )); - handles.push(job); + for spill in [false, true] { + let group_by_columns = test_cases[test_idx].clone(); + let job = tokio::spawn(run_aggregate_test( + make_staggered_batches::(1000, distinct, i as u64), + group_by_columns, + spill, + )); + handles.push(job); + } } for job in handles { job.await.unwrap(); @@ -74,7 +77,11 @@ mod tests { /// Perform batch and streaming aggregation with same input /// and verify outputs of `AggregateExec` with pipeline breaking stream `GroupedHashAggregateStream` /// and non-pipeline breaking stream `BoundedAggregateStream` produces same result. 
-async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str>) { +async fn run_aggregate_test( + input1: Vec, + group_by_columns: Vec<&str>, + spill: bool, +) { let schema = input1[0].schema(); let session_config = SessionConfig::new().with_batch_size(50); let ctx = SessionContext::with_config(session_config); @@ -109,7 +116,7 @@ async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str let group_by = PhysicalGroupBy::new_single(expr); let aggregate_exec_running = Arc::new( - AggregateExec::try_new( + AggregateExec::try_new_for_test( AggregateMode::Partial, group_by.clone(), aggregate_expr.clone(), @@ -117,12 +124,13 @@ async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str vec![None], running_source, schema.clone(), + spill, ) .unwrap(), ) as Arc; let aggregate_exec_usual = Arc::new( - AggregateExec::try_new( + AggregateExec::try_new_for_test( AggregateMode::Partial, group_by.clone(), aggregate_expr.clone(), @@ -130,6 +138,7 @@ async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str vec![None], usual_source, schema.clone(), + spill, ) .unwrap(), ) as Arc; diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index ce8f5874021c..cc40d627d910 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -21,6 +21,7 @@ use arrow::array::{AsArray, PrimitiveBuilder}; use log::debug; use std::any::Any; +use std::collections::TryReserveError; use std::sync::Arc; use crate::aggregate::groups_accumulator::accumulate::NullState; @@ -32,6 +33,7 @@ use arrow::datatypes::{DataType, Decimal128Type, Float64Type, UInt64Type}; use arrow::{ array::{ArrayRef, UInt64Array}, datatypes::Field, + record_batch::RecordBatch, }; use arrow_array::{ Array, ArrowNativeTypeOp, ArrowNumericType, ArrowPrimitiveType, PrimitiveArray, @@ -518,6 +520,21 @@ where self.counts.capacity() * std::mem::size_of::() + self.sums.capacity() * std::mem::size_of::() } + + fn try_reserve(&mut self, batch: &RecordBatch) -> Result<(), TryReserveError> { + let additional = batch.num_rows(); + self.counts + .try_reserve(additional) + .and(self.sums.try_reserve(additional)) + } + + fn clear_shrink(&mut self, batch: &RecordBatch) { + let count = batch.num_rows(); + self.counts.clear(); + self.counts.shrink_to(count); + self.sums.clear(); + self.sums.shrink_to(count); + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 738ca4e915f7..77ca779d00e6 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -18,6 +18,7 @@ //! 
Defines physical expressions that can evaluated at runtime during query execution use std::any::Any; +use std::collections::TryReserveError; use std::fmt::Debug; use std::ops::BitAnd; use std::sync::Arc; @@ -27,7 +28,7 @@ use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; use arrow::array::{Array, Int64Array}; use arrow::compute; use arrow::datatypes::DataType; -use arrow::{array::ArrayRef, datatypes::Field}; +use arrow::{array::ArrayRef, datatypes::Field, record_batch::RecordBatch}; use arrow_array::cast::AsArray; use arrow_array::types::Int64Type; use arrow_array::PrimitiveArray; @@ -190,6 +191,15 @@ impl GroupsAccumulator for CountGroupsAccumulator { fn size(&self) -> usize { self.counts.capacity() * std::mem::size_of::() } + + fn try_reserve(&mut self, batch: &RecordBatch) -> Result<(), TryReserveError> { + self.counts.try_reserve(batch.num_rows()) + } + + fn clear_shrink(&mut self, batch: &RecordBatch) { + self.counts.clear(); + self.counts.shrink_to(batch.num_rows()); + } } /// count null values for multiple columns diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs index 7e8930ce2a32..02bb466d44bd 100644 --- a/datafusion/physical-expr/src/aggregate/first_last.rs +++ b/datafusion/physical-expr/src/aggregate/first_last.rs @@ -165,6 +165,8 @@ struct FirstValueAccumulator { orderings: Vec, // Stores the applicable ordering requirement. ordering_req: LexOrdering, + // Whether merge_batch() is called before + is_merge_called: bool, } impl FirstValueAccumulator { @@ -183,6 +185,7 @@ impl FirstValueAccumulator { is_set: false, orderings, ordering_req, + is_merge_called: false, }) } @@ -198,7 +201,9 @@ impl Accumulator for FirstValueAccumulator { fn state(&self) -> Result> { let mut result = vec![self.first.clone()]; result.extend(self.orderings.iter().cloned()); - result.push(ScalarValue::Boolean(Some(self.is_set))); + if !self.is_merge_called { + result.push(ScalarValue::Boolean(Some(self.is_set))); + } Ok(result) } @@ -213,6 +218,7 @@ impl Accumulator for FirstValueAccumulator { } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.is_merge_called = true; // FIRST_VALUE(first1, first2, first3, ...) // last index contains is_set flag. let is_set_idx = states.len() - 1; @@ -384,6 +390,8 @@ struct LastValueAccumulator { orderings: Vec, // Stores the applicable ordering requirement. ordering_req: LexOrdering, + // Whether merge_batch() is called before + is_merge_called: bool, } impl LastValueAccumulator { @@ -402,6 +410,7 @@ impl LastValueAccumulator { is_set: false, orderings, ordering_req, + is_merge_called: false, }) } @@ -417,7 +426,9 @@ impl Accumulator for LastValueAccumulator { fn state(&self) -> Result> { let mut result = vec![self.last.clone()]; result.extend(self.orderings.clone()); - result.push(ScalarValue::Boolean(Some(self.is_set))); + if !self.is_merge_called { + result.push(ScalarValue::Boolean(Some(self.is_set))); + } Ok(result) } @@ -431,6 +442,7 @@ impl Accumulator for LastValueAccumulator { } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.is_merge_called = true; // LAST_VALUE(last1, last2, last3, ...) // last index contains is_set flag. 
let is_set_idx = states.len() - 1;
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
index dcc8c37e7484..175911208e6e 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
@@ -22,12 +22,14 @@ use arrow::{
     array::{AsArray, UInt32Builder},
     compute,
     datatypes::UInt32Type,
+    record_batch::RecordBatch,
 };
 use arrow_array::{ArrayRef, BooleanArray, PrimitiveArray};
 use datafusion_common::{
     utils::get_arrayref_at_indices, DataFusionError, Result, ScalarValue,
 };
 use datafusion_expr::Accumulator;
+use std::collections::TryReserveError;
 
 /// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`]
 ///
@@ -342,6 +344,12 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
     fn size(&self) -> usize {
         self.allocation_bytes
     }
+
+    fn try_reserve(&mut self, _batch: &RecordBatch) -> Result<(), TryReserveError> {
+        Ok(())
+    }
+
+    fn clear_shrink(&mut self, _batch: &RecordBatch) {}
 }
 
 /// Extension trait for [`Vec`] to account for allocations.
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs
index 21b6cc29e83d..edcab40a3d03 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::TryReserveError;
 use std::sync::Arc;
 
 use arrow::array::AsArray;
+use arrow::record_batch::RecordBatch;
 use arrow_array::{ArrayRef, BooleanArray};
 use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
 use datafusion_common::Result;
@@ -137,4 +139,16 @@ where
         // capacity is in bits, so convert to bytes
         self.values.capacity() / 8 + self.null_state.size()
     }
+
+    fn try_reserve(&mut self, batch: &RecordBatch) -> Result<(), TryReserveError> {
+        let additional = batch.num_rows();
+        // FIXME: there is no good way to try_reserve for self.values and self.null_state
+        self.values.reserve(additional);
+        Ok(())
+    }
+
+    fn clear_shrink(&mut self, _batch: &RecordBatch) {
+        self.values.truncate(0);
+        self.null_state = NullState::new();
+    }
 }
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
index d2e64d373be2..070e396f66d0 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
@@ -19,7 +19,11 @@
 
 pub(crate) mod accumulate;
 mod adapter;
+
 pub use adapter::GroupsAccumulatorAdapter;
+use arrow::record_batch::RecordBatch;
+
+use std::collections::TryReserveError;
 
 pub(crate) mod bool_op;
 pub(crate) mod prim_op;
@@ -157,4 +161,14 @@ pub trait GroupsAccumulator: Send {
     /// in bytes. This function is called once per batch, so it should
     /// be `O(n)` to compute, not `O(num_groups)`
     fn size(&self) -> usize;
+
+    /// Try to reserve capacity so that at least a single [`RecordBatch`] can be inserted. The
+    /// accumulator may reserve more space to speculatively avoid frequent re-allocations. After
+    /// calling `try_reserve`, capacity will be greater than or equal to `self.len() + additional`
+    /// if it returns `Ok(())`. Does nothing if capacity is already sufficient. This method
+    /// preserves the contents even if an error occurs.
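+    ///
+    /// For example, an accumulator whose state is a single `Vec` can simply
+    /// forward to [`Vec::try_reserve`], as the count accumulator in this patch
+    /// does (shown here as an illustration):
+    ///
+    /// ```ignore
+    /// fn try_reserve(&mut self, batch: &RecordBatch) -> Result<(), TryReserveError> {
+    ///     self.counts.try_reserve(batch.num_rows())
+    /// }
+    /// ```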
+ fn try_reserve(&mut self, batch: &RecordBatch) -> Result<(), TryReserveError>; + + /// clear the contents and shrink the capacity + fn clear_shrink(&mut self, batch: &RecordBatch); } diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs index adeaea712c68..0afabc24bdc6 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. +use std::collections::TryReserveError; use std::sync::Arc; -use arrow::{array::AsArray, datatypes::ArrowPrimitiveType}; +use arrow::{array::AsArray, datatypes::ArrowPrimitiveType, record_batch::RecordBatch}; use arrow_array::{ArrayRef, BooleanArray, PrimitiveArray}; use arrow_schema::DataType; use datafusion_common::Result; @@ -138,4 +139,17 @@ where fn size(&self) -> usize { self.values.capacity() * std::mem::size_of::() + self.null_state.size() } + + fn try_reserve(&mut self, batch: &RecordBatch) -> Result<(), TryReserveError> { + let additional = batch.num_rows(); + // FIXME: there is no good way to try_reserve for self.null_state + self.values.try_reserve(additional) + } + + fn clear_shrink(&mut self, batch: &RecordBatch) { + let count = batch.num_rows(); + self.values.clear(); + self.values.shrink_to(count); + self.null_state = NullState::new(); + } } From 5dfa345c98b845088b95379d1b94348698b8f888 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Thu, 24 Aug 2023 12:36:08 -0700 Subject: [PATCH 02/16] clippy --- .../core/src/physical_plan/aggregates/mod.rs | 1 + .../src/physical_plan/aggregates/row_hash.rs | 29 +++++++++---------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 31f579827f61..2b0750d41753 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -676,6 +676,7 @@ impl AggregateExec { } /// Only for testing. When `force_spill` is true, it spills every batch. + #[allow(clippy::too_many_arguments)] pub fn try_new_for_test( mode: AggregateMode, group_by: PhysicalGroupBy, diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index 7ca43b74efad..79c4530664c9 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -480,17 +480,16 @@ impl Stream for GroupedHashAggregateStream { // Re-group the stream-merged results. extract_ok!(self.group_aggregate_batch(batch_result?, true)); // Output first batch_size rows. 
- if let Some(to_emit) = self.group_ordering.emit_to() { - if let EmitTo::First(ready) = to_emit { - let batch_size = self.batch_size; - if ready >= batch_size { - let batch = - extract_ok!(self - .emit(EmitTo::First(batch_size), false)); - return Poll::Ready(Some(Ok( - batch.record_output(&self.baseline_metrics) - ))); - } + if let Some(EmitTo::First(n)) = self.group_ordering.emit_to() + { + let batch_size = self.batch_size; + if n >= batch_size { + let batch = extract_ok!( + self.emit(EmitTo::First(batch_size), false) + ); + return Poll::Ready(Some(Ok( + batch.record_output(&self.baseline_metrics) + ))); } } } @@ -661,8 +660,8 @@ impl GroupedHashAggregateStream { let should_spill = self .accumulators .iter_mut() - .any(|x| x.try_reserve(&batch).is_err()) - || self.group_values.try_reserve(&batch).is_err() + .any(|x| x.try_reserve(batch).is_err()) + || self.group_values.try_reserve(batch).is_err() || self .current_group_indices .try_reserve(batch.num_rows()) @@ -702,8 +701,8 @@ impl GroupedHashAggregateStream { fn clear_shrink(&mut self, batch: &RecordBatch) { self.accumulators .iter_mut() - .for_each(|x| x.clear_shrink(&batch)); - self.group_values.clear_shrink(&batch); + .for_each(|x| x.clear_shrink(batch)); + self.group_values.clear_shrink(batch); self.current_group_indices.clear(); self.current_group_indices.shrink_to(batch.num_rows()); } From ea38ad5a9786ed4a1fb0597d8dc911b81d1aa215 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Tue, 5 Sep 2023 13:56:43 -0700 Subject: [PATCH 03/16] address review comments --- .../aggregates/group_values/mod.rs | 10 -- .../aggregates/group_values/primitive.rs | 18 --- .../aggregates/group_values/row.rs | 15 --- .../src/physical_plan/aggregates/row_hash.rs | 119 +++++++++--------- .../physical-expr/src/aggregate/average.rs | 18 +-- .../physical-expr/src/aggregate/count.rs | 13 +- .../aggregate/groups_accumulator/adapter.rs | 8 -- .../aggregate/groups_accumulator/bool_op.rs | 15 +-- .../src/aggregate/groups_accumulator/mod.rs | 13 -- .../aggregate/groups_accumulator/prim_op.rs | 16 +-- 10 files changed, 66 insertions(+), 179 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs index e844eb9bdac5..dcbe83cb849f 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs @@ -44,16 +44,6 @@ pub trait GroupValues: Send { /// Emits the group values fn emit(&mut self, emit_to: EmitTo) -> Result>; - /// Try to reserve the capacity that at least a single [`RecordBatch`] can be inserted. The - /// accumulator may reserve more space to speculatively avoid frequent re-allocations. After - /// calling try_reserve, capacity will be greater than or equal to self.len() + additional if - /// it returns Ok(()). Does nothing if capacity is already sufficient. This method preserves - /// the contents even if an error occurs. 
- fn try_reserve( - &mut self, - batch: &RecordBatch, - ) -> Result<(), hashbrown::TryReserveError>; - /// clear the contents and shrink the capacity fn clear_shrink(&mut self, batch: &RecordBatch); } diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs index 883eea38d54d..1f366356f980 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs @@ -208,24 +208,6 @@ where Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))]) } - // FIXME: cannot return std::collections::TryReserveError because std::collections::TryReserveErrorKind - // is unstable. For now, use hashbrown::TryReserveError instead. - fn try_reserve( - &mut self, - batch: &RecordBatch, - ) -> Result<(), hashbrown::TryReserveError> { - let additional = batch.num_rows(); - self.values - .try_reserve(additional) - .map_err(|_| hashbrown::TryReserveError::CapacityOverflow) - .and({ - let state = &self.random_state; - self.map.try_reserve(additional, |g| unsafe { - self.values.get_unchecked(*g).hash(state) - }) - }) - } - fn clear_shrink(&mut self, batch: &RecordBatch) { let count = batch.num_rows(); self.values.clear(); diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/row.rs b/datafusion/core/src/physical_plan/aggregates/group_values/row.rs index d262f73281e7..62741374d6cd 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/row.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/row.rs @@ -186,21 +186,6 @@ impl GroupValues for GroupValuesRows { }) } - // FIXME: cannot return std::collections::TryReserveError because std::collections::TryReserveErrorKind - // is unstable. For now, use hashbrown::TryReserveError instead. 
-    fn try_reserve(
-        &mut self,
-        batch: &RecordBatch,
-    ) -> Result<(), hashbrown::TryReserveError> {
-        let additional = batch.num_rows();
-        // FIXME: there is no good way to try_reserve for self.row_converter self.group_values
-        self.map.try_reserve(additional, |(hash, _)| *hash).and(
-            self.hashes_buffer
-                .try_reserve(additional)
-                .map_err(|_| hashbrown::TryReserveError::CapacityOverflow),
-        )
-    }
-
     fn clear_shrink(&mut self, batch: &RecordBatch) {
         let count = batch.num_rows();
         // FIXME: there is no good way to clear_shrink for self.row_converter self.group_values
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index 79c4530664c9..797c91f0f456 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -44,7 +44,7 @@ use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use arrow::array::*;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use arrow_schema::SortOptions;
-use datafusion_common::Result;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::memory_pool::proxy::VecAllocExt;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::runtime_env::RuntimeEnv;
@@ -68,6 +68,31 @@ pub(crate) enum ExecutionState {
 use super::order::GroupOrdering;
 use super::AggregateExec;
 
+/// This encapsulates the spilling state
+struct SpillState {
+    /// If data has previously been spilled, the locations of the
+    /// spill files (in Arrow IPC format)
+    spills: Vec<NamedTempFile>,
+
+    /// Sorting expression for spilling batches
+    spill_expr: Vec<PhysicalSortExpr>,
+
+    /// Schema for spilling batches
+    spill_schema: SchemaRef,
+
+    /// Stream of merged outputs after spilling
+    merged_stream: SendableRecordBatchStream,
+
+    /// aggregate_arguments for merging spilled data
+    merging_aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+
+    /// GROUP BY expressions for merging spilled data
+    merging_group_by: PhysicalGroupBy,
+
+    /// Force spilling always for debugging
+    force_spill: bool,
+}
+
 /// Hash based Grouping Aggregator
 ///
 /// # Design Goals
@@ -244,24 +269,8 @@ pub(crate) struct GroupedHashAggregateStream {
     /// The [`RuntimeEnv`] associated with the [`TaskContext`] argument
     runtime: Arc<RuntimeEnv>,
 
-    /// If data has previously been spilled, the locations of the
-    /// spill files (in Arrow IPC format)
-    spills: Vec<NamedTempFile>,
-
-    /// Sorting expression for spilling batches
-    spill_expr: Vec<PhysicalSortExpr>,
-
-    /// Schema for spilling batches
-    spill_schema: SchemaRef,
-
-    /// Stream of merged outputs after spilling
-    merged_stream: SendableRecordBatchStream,
-
-    /// aggregate_arguments for merging spilled data
-    merging_aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
-
-    /// Force spilling always for debugging
-    force_spill: bool,
+    /// The spill state object
+    spill_state: SpillState,
 }
 
 impl GroupedHashAggregateStream {
@@ -341,8 +350,18 @@ impl GroupedHashAggregateStream {
 
         let exec_state = ExecutionState::ReadingInput;
 
+        let spill_state = SpillState {
+            spills: vec![],
+            spill_expr,
+            spill_schema: agg_schema.clone(),
+            merged_stream: Box::pin(EmptyRecordBatchStream::new(agg_schema.clone())),
+            merging_aggregate_arguments,
+            merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()),
+            force_spill: agg.force_spill,
+        };
+
         Ok(GroupedHashAggregateStream {
-            schema: agg_schema.clone(),
+            schema: agg_schema,
             input,
             mode: agg.mode,
             accumulators,
@@ -358,12 +377,7 @@ impl GroupedHashAggregateStream {
             group_ordering,
             input_done: false,
             runtime:
context.runtime_env(), - spills: vec![], - spill_expr, - spill_schema: agg_schema.clone(), - merged_stream: Box::pin(EmptyRecordBatchStream::new(agg_schema)), - merging_aggregate_arguments, - force_spill: agg.force_spill, + spill_state, }) } } @@ -440,7 +454,7 @@ impl Stream for GroupedHashAggregateStream { self.input_done = true; self.group_ordering.input_done(); let timer = elapsed_compute.timer(); - if self.spills.is_empty() { + if self.spill_state.spills.is_empty() { let batch = extract_ok!(self.emit(EmitTo::All, false)); self.exec_state = ExecutionState::ProducingOutput(batch); } else { @@ -475,7 +489,7 @@ impl Stream for GroupedHashAggregateStream { } ExecutionState::ProducingSpillOutput => { - match ready!(self.merged_stream.poll_next_unpin(cx)) { + match ready!(self.spill_state.merged_stream.poll_next_unpin(cx)) { Some(batch_result) => { // Re-group the stream-merged results. extract_ok!(self.group_aggregate_batch(batch_result?, true)); @@ -522,15 +536,14 @@ impl GroupedHashAggregateStream { fn group_aggregate_batch(&mut self, batch: RecordBatch, merging: bool) -> Result<()> { // Evaluate the grouping expressions let group_by_values = if merging { - let group_by = PhysicalGroupBy::new_single(self.group_by.expr.clone()); - evaluate_group_by(&group_by, &batch)? + evaluate_group_by(&self.spill_state.merging_group_by, &batch)? } else { evaluate_group_by(&self.group_by, &batch)? }; // Evaluate the aggregation expressions. let input_values = if merging { - evaluate_many(&self.merging_aggregate_arguments, &batch)? + evaluate_many(&self.spill_state.merging_aggregate_arguments, &batch)? } else { evaluate_many(&self.aggregate_arguments, &batch)? }; @@ -599,7 +612,11 @@ impl GroupedHashAggregateStream { } } - self.update_memory_reservation() + match self.update_memory_reservation() { + // Here we can ignore `insufficient_capacity_err` because we will spill later + Err(DataFusionError::ResourcesExhausted(_)) => Ok(()), + other => other, + } } fn update_memory_reservation(&mut self) -> Result<()> { @@ -615,7 +632,7 @@ impl GroupedHashAggregateStream { /// accumulator states/values specified in emit_to fn emit(&mut self, emit_to: EmitTo, spilling: bool) -> Result { let schema = if spilling { - self.spill_schema.clone() + self.spill_state.spill_schema.clone() } else { self.schema() }; @@ -649,31 +666,20 @@ impl GroupedHashAggregateStream { Ok(batch) } - /// Try to reserve the memory capacities to fit the next incoming batch. - /// In case of insufficient memory, spill the data to disk and clear the memory. - /// Currently only [`GroupOrdering::None`] is supported for spilling + /// Optimistically, [`Self::group_aggregate_batch`] allows to exceed the memory target slightly + /// (~ 1 [`RecordBatch`]) for simplicity. In such cases, spill the data to disk and clear the + /// memory. Currently only [`GroupOrdering::None`] is supported for spilling. 
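+    ///
+    /// Note that the overshoot is resolved one batch late by design: a failed
+    /// memory reservation inside [`Self::group_aggregate_batch`] is ignored
+    /// there, and this method spills the accumulated state before the next
+    /// batch is aggregated (a simplified sketch of the timing, not code from
+    /// this patch):
+    ///
+    /// ```ignore
+    /// // batch N:   update_memory_reservation() fails -> ignored, batch N
+    /// //            is still aggregated in memory
+    /// // batch N+1: spill_previous_if_necessary() sees the failure, sorts
+    /// //            the in-memory groups, writes them to an Arrow IPC spill
+    /// //            file, then clears and shrinks the in-memory state
+    /// ```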
fn spill_previous_if_necessary(&mut self, batch: &RecordBatch) -> Result<()> {
         // TODO: support group_ordering for spilling
         if self.reservation.size() > 0
             && matches!(self.group_ordering, GroupOrdering::None)
         {
-            let should_spill = self
-                .accumulators
-                .iter_mut()
-                .any(|x| x.try_reserve(batch).is_err())
-                || self.group_values.try_reserve(batch).is_err()
-                || self
-                    .current_group_indices
-                    .try_reserve(batch.num_rows())
-                    .is_err()
-                || self.update_memory_reservation().is_err()
-                || self.force_spill;
-            if should_spill {
+            if self.update_memory_reservation().is_err() || self.spill_state.force_spill {
                 // If mode is Final, use input batch (Partial mode) schema for spilling because
                 // the spilled data will be merged and re-evaluated later.
                 if let AggregateMode::Final | AggregateMode::FinalPartitioned = self.mode
                 {
-                    self.spill_schema = batch.schema();
+                    self.spill_state.spill_schema = batch.schema();
                 }
                 self.spill()?;
                 self.clear_shrink(batch);
 
                 return self.update_memory_reservation();
             }
         }
         Ok(())
     }
 
     /// Emit all rows, sort them, and store them on disk.
     fn spill(&mut self) -> Result<()> {
         let emit = self.emit(EmitTo::All, true)?;
-        let sorted = sort_batch(&emit, &self.spill_expr, None)?;
+        let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?;
         let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
         let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
         // TODO: slice large `sorted` and write to multiple files in parallel
         writer.write(&sorted)?;
         writer.finish()?;
-        self.spills.push(spillfile);
+        self.spill_state.spills.push(spillfile);
         Ok(())
     }
 
     /// Clear memory and shrink capacities to the size of the batch.
     fn clear_shrink(&mut self, batch: &RecordBatch) {
-        self.accumulators
-            .iter_mut()
-            .for_each(|x| x.clear_shrink(batch));
         self.group_values.clear_shrink(batch);
         self.current_group_indices.clear();
         self.current_group_indices.shrink_to(batch.num_rows());
     }
 
     /// At this point, all the inputs are read and there are some spills.
     /// Emit the remaining rows and create a batch.
     /// Conduct a streaming merge sort between the batch and spilled data. Since the stream is fully
     /// sorted, set `self.group_ordering` to Full, then later we can read with [`EmitTo::First`].
     fn update_merged_stream(&mut self) -> Result<()> {
         let batch = self.emit(EmitTo::All, true)?;
         let mut streams: Vec<SendableRecordBatchStream> = vec![];
-        let expr = self.spill_expr.clone();
+        let expr = self.spill_state.spill_expr.clone();
         let schema = batch.schema();
         streams.push(Box::pin(RecordBatchStreamAdapter::new(
             schema.clone(),
             futures::stream::once(futures::future::lazy(move |_| {
                 sort_batch(&batch, &expr, None)
             })),
         )));
-        for spill in self.spills.drain(..) {
+        for spill in self.spill_state.spills.drain(..)
{ let stream = read_spill_as_stream(spill, schema.clone())?; streams.push(stream); } - self.merged_stream = streaming_merge( + self.spill_state.merged_stream = streaming_merge( streams, schema, - &self.spill_expr, + &self.spill_state.spill_expr, self.baseline_metrics.clone(), self.batch_size, None, diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index 2d6f75098e84..167db3dcc7e4 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -21,7 +21,7 @@ use arrow::array::{AsArray, PrimitiveBuilder}; use log::debug; use std::any::Any; -use std::collections::TryReserveError; + use std::sync::Arc; use crate::aggregate::groups_accumulator::accumulate::NullState; @@ -33,7 +33,6 @@ use arrow::datatypes::{DataType, Decimal128Type, Float64Type, UInt64Type}; use arrow::{ array::{ArrayRef, UInt64Array}, datatypes::Field, - record_batch::RecordBatch, }; use arrow_array::{ Array, ArrowNativeTypeOp, ArrowNumericType, ArrowPrimitiveType, PrimitiveArray, @@ -520,21 +519,6 @@ where self.counts.capacity() * std::mem::size_of::() + self.sums.capacity() * std::mem::size_of::() } - - fn try_reserve(&mut self, batch: &RecordBatch) -> Result<(), TryReserveError> { - let additional = batch.num_rows(); - self.counts - .try_reserve(additional) - .and(self.sums.try_reserve(additional)) - } - - fn clear_shrink(&mut self, batch: &RecordBatch) { - let count = batch.num_rows(); - self.counts.clear(); - self.counts.shrink_to(count); - self.sums.clear(); - self.sums.shrink_to(count); - } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 77ca779d00e6..64b94b2f4fb0 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -18,7 +18,7 @@ //! 
Defines physical expressions that can evaluated at runtime during query execution use std::any::Any; -use std::collections::TryReserveError; + use std::fmt::Debug; use std::ops::BitAnd; use std::sync::Arc; @@ -28,7 +28,7 @@ use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; use arrow::array::{Array, Int64Array}; use arrow::compute; use arrow::datatypes::DataType; -use arrow::{array::ArrayRef, datatypes::Field, record_batch::RecordBatch}; +use arrow::{array::ArrayRef, datatypes::Field}; use arrow_array::cast::AsArray; use arrow_array::types::Int64Type; use arrow_array::PrimitiveArray; @@ -191,15 +191,6 @@ impl GroupsAccumulator for CountGroupsAccumulator { fn size(&self) -> usize { self.counts.capacity() * std::mem::size_of::() } - - fn try_reserve(&mut self, batch: &RecordBatch) -> Result<(), TryReserveError> { - self.counts.try_reserve(batch.num_rows()) - } - - fn clear_shrink(&mut self, batch: &RecordBatch) { - self.counts.clear(); - self.counts.shrink_to(batch.num_rows()); - } } /// count null values for multiple columns diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs index 175911208e6e..dcc8c37e7484 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs @@ -22,14 +22,12 @@ use arrow::{ array::{AsArray, UInt32Builder}, compute, datatypes::UInt32Type, - record_batch::RecordBatch, }; use arrow_array::{ArrayRef, BooleanArray, PrimitiveArray}; use datafusion_common::{ utils::get_arrayref_at_indices, DataFusionError, Result, ScalarValue, }; use datafusion_expr::Accumulator; -use std::collections::TryReserveError; /// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`] /// @@ -344,12 +342,6 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter { fn size(&self) -> usize { self.allocation_bytes } - - fn try_reserve(&mut self, _batch: &RecordBatch) -> Result<(), TryReserveError> { - Ok(()) - } - - fn clear_shrink(&mut self, _batch: &RecordBatch) {} } /// Extension trait for [`Vec`] to account for allocations. diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs index edcab40a3d03..c62041088116 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. 
-use std::collections::TryReserveError;
 use std::sync::Arc;

 use arrow::array::AsArray;
-use arrow::record_batch::RecordBatch;
+
 use arrow_array::{ArrayRef, BooleanArray};
 use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
 use datafusion_common::Result;
@@ -139,16 +138,4 @@ where
         // capacity is in bits, so convert to bytes
         self.values.capacity() / 8 + self.null_state.size()
     }
-
-    fn try_reserve(&mut self, batch: &RecordBatch) -> Result<(), TryReserveError> {
-        let additional = batch.num_rows();
-        // FIXME: there is no good way to try_reserve for self.values and self.null_state
-        self.values.reserve(additional);
-        Ok(())
-    }
-
-    fn clear_shrink(&mut self, _batch: &RecordBatch) {
-        self.values.truncate(0);
-        self.null_state = NullState::new();
-    }
 }
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
index 070e396f66d0..2bc0df617e56 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
@@ -21,9 +21,6 @@
 pub(crate) mod accumulate;
 mod adapter;
 pub use adapter::GroupsAccumulatorAdapter;
-use arrow::record_batch::RecordBatch;
-
-use std::collections::TryReserveError;

 pub(crate) mod bool_op;
 pub(crate) mod prim_op;
@@ -161,14 +158,4 @@ pub trait GroupsAccumulator: Send {
     /// in bytes. This function is called once per batch, so it should
     /// be `O(n)` to compute, not `O(num_groups)`
     fn size(&self) -> usize;
-
-    /// Try to reserve the capacity that at least a single [`RecordBatch`] can be inserted. The
-    /// accumulator may reserve more space to speculatively avoid frequent re-allocations. After
-    /// calling try_reserve, capacity will be greater than or equal to self.len() + additional if
-    /// it returns Ok(()). Does nothing if capacity is already sufficient. This method preserves
-    /// the contents even if an error occurs.
-    fn try_reserve(&mut self, batch: &RecordBatch) -> Result<(), TryReserveError>;
-
-    /// clear the contents and shrink the capacity
-    fn clear_shrink(&mut self, batch: &RecordBatch);
 }
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
index 0afabc24bdc6..adeaea712c68 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
-use std::collections::TryReserveError; use std::sync::Arc; -use arrow::{array::AsArray, datatypes::ArrowPrimitiveType, record_batch::RecordBatch}; +use arrow::{array::AsArray, datatypes::ArrowPrimitiveType}; use arrow_array::{ArrayRef, BooleanArray, PrimitiveArray}; use arrow_schema::DataType; use datafusion_common::Result; @@ -139,17 +138,4 @@ where fn size(&self) -> usize { self.values.capacity() * std::mem::size_of::() + self.null_state.size() } - - fn try_reserve(&mut self, batch: &RecordBatch) -> Result<(), TryReserveError> { - let additional = batch.num_rows(); - // FIXME: there is no good way to try_reserve for self.null_state - self.values.try_reserve(additional) - } - - fn clear_shrink(&mut self, batch: &RecordBatch) { - let count = batch.num_rows(); - self.values.clear(); - self.values.shrink_to(count); - self.null_state = NullState::new(); - } } From 0863e810681011a9a56248cfa32b54cdf3c235cb Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Tue, 5 Sep 2023 18:18:27 -0700 Subject: [PATCH 04/16] address review comments --- .../core/src/physical_plan/aggregates/mod.rs | 41 ++++++--- .../src/physical_plan/aggregates/row_hash.rs | 86 +++++++++++-------- .../physical-expr/src/aggregate/average.rs | 1 - .../physical-expr/src/aggregate/count.rs | 1 - .../physical-expr/src/aggregate/first_last.rs | 16 +--- .../aggregate/groups_accumulator/bool_op.rs | 1 - .../src/aggregate/groups_accumulator/mod.rs | 1 - 7 files changed, 85 insertions(+), 62 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 2b0750d41753..a5a035f653cb 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -1569,15 +1569,30 @@ mod tests { let result = common::collect(partial_aggregate.execute(0, task_ctx.clone())?).await?; - let expected = [ - "+---+---------------+-------------+", - "| a | AVG(b)[count] | AVG(b)[sum] |", - "+---+---------------+-------------+", - "| 2 | 2 | 2.0 |", - "| 3 | 3 | 7.0 |", - "| 4 | 3 | 11.0 |", - "+---+---------------+-------------+", - ]; + let expected = if spill { + vec![ + "+---+---------------+-------------+", + "| a | AVG(b)[count] | AVG(b)[sum] |", + "+---+---------------+-------------+", + "| 2 | 1 | 1.0 |", + "| 2 | 1 | 1.0 |", + "| 3 | 1 | 2.0 |", + "| 3 | 2 | 5.0 |", + "| 4 | 1 | 4.0 |", + "| 4 | 2 | 7.0 |", + "+---+---------------+-------------+", + ] + } else { + vec![ + "+---+---------------+-------------+", + "| a | AVG(b)[count] | AVG(b)[sum] |", + "+---+---------------+-------------+", + "| 2 | 2 | 2.0 |", + "| 3 | 3 | 7.0 |", + "| 4 | 3 | 11.0 |", + "+---+---------------+-------------+", + ] + }; assert_batches_sorted_eq!(expected, &result); let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate)); @@ -1621,7 +1636,13 @@ mod tests { let metrics = merged_aggregate.metrics().unwrap(); let output_rows = metrics.output_rows().unwrap(); - assert_eq!(3, output_rows); + if spill { + // When spilling, the output rows metrics become partial output size + final output size + // This is due to the AtomicUsize behavior + assert_eq!(9, output_rows); + } else { + assert_eq!(3, output_rows); + } Ok(()) } diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index 797c91f0f456..f1403a1b4c17 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -160,32 +160,32 @@ 
struct SpillState {
 ///
 /// # Spilling
 ///
-/// The sizes of group values and accumulators can become large. Before that causes
-/// out of memory, this hash aggregator spills those data to local disk using Arrow
-/// IPC format. For every input [`RecordBatch`], the memory manager checks whether
-/// the new input size meets the memory configuration. If not, spilling happens, and
-/// later stream-merge sort the spilled data to read back. As the rows cannot be
-/// grouped between spilled data stored on disk, the read back merged data needs to
-/// be re-grouped again.
+/// The sizes of group values and accumulators can become large. Before that causes out of memory,
+/// this hash aggregator outputs those data early for partial aggregation, or spills them to local
+/// disk using Arrow IPC format for final aggregation. For every input [`RecordBatch`], the memory
+/// manager checks whether the new input size meets the memory configuration. If not, outputting or
+/// spilling happens. For outputting, the final aggregation takes care of the re-grouping; for
+/// spilling, a later stream-merge sort re-groups the data as it is read back. Note that rows cannot
+/// stay grouped once spilled onto disk, so the read-back data needs to be re-grouped again.
 ///
 /// ```text
 /// Partial Aggregation [batch_size = 2] (max memory = 3 rows)
 ///
-/// INPUTS PARTIALLY AGGREGATED (UPDATE BATCH) RE-GROUPED (SORTED)
-/// ┌─────────┐ ┌─────────────────┐ [Similar to final aggregation merge,
-/// │ a │ b │ │ a │ AVG(b) │ but using the partial schema]
-/// │---│-----│ │ │[count]│[sum]│ ┌─────────────────┐
-/// │ 3 │ 3.0 │ ─▶ │---│-------│-----│ │ a │ AVG(b) │
-/// │ 2 │ 2.0 │ │ 2 │ 1 │ 2.0 │ ─▶ spill ─┐ │ │[count]│[sum]│
-/// └─────────┘ │ 3 │ 2 │ 7.0 │ │ │---│-------│-----│
-/// ┌─────────┐ ─▶ │ 4 │ 1 │ 8.0 │ ▼ │ 1 │ 1 │ 1.0 │
-/// │ 3 │ 4.0 │ └─────────────────┘ Streaming ─▶ │ 2 │ 1 │ 2.0 │
-/// │ 4 │ 8.0 │ ┌─────────────────┐ merge sort └─────────────────┘
-/// └─────────┘ │ a │ AVG(b) │ ▲ ┌─────────────────┐
-/// ┌─────────┐ │---│-------│-----│ │ │ a │ AVG(b) │
-/// │ 1 │ 1.0 │ ─▶ │ 1 │ 1 │ 1.0 │ ─▶ memory ─┘ │ 3 │ 3 │ 9.0 │
-/// │ 3 │ 2.0 │ │ 3 │ 1 │ 2.0 │ │ 4 │ 1 │ 8.0 │
-/// └─────────┘ └─────────────────┘ └─────────────────┘
+/// INPUTS PARTIALLY AGGREGATED (UPDATE BATCH)
+/// ┌─────────┐ ┌─────────────────┐
+/// │ a │ b │ │ a │ AVG(b) │
+/// │---│-----│ │ │[count]│[sum]│
+/// │ 3 │ 3.0 │ ─▶ │---│-------│-----│
+/// │ 2 │ 2.0 │ │ 2 │ 1 │ 2.0 │ ─▶ output
+/// └─────────┘ │ 3 │ 2 │ 7.0 │
+/// ┌─────────┐ ─▶ │ 4 │ 1 │ 8.0 │
+/// │ 3 │ 4.0 │ └─────────────────┘
+/// │ 4 │ 8.0 │ ┌─────────────────┐
+/// └─────────┘ │ a │ AVG(b) │
+/// ┌─────────┐ │---│-------│-----│
+/// │ 1 │ 1.0 │ ─▶ │ 1 │ 1 │ 1.0 │ ─▶ output
+/// │ 3 │ 2.0 │ │ 3 │ 1 │ 2.0 │
+/// └─────────┘ └─────────────────┘
 ///
 ///
 /// Final Aggregation [batch_size = 2] (max memory = 3 rows)
@@ -443,6 +443,9 @@ impl Stream for GroupedHashAggregateStream {
                             let batch = extract_ok!(self.emit(to_emit, false));
                             self.exec_state = ExecutionState::ProducingOutput(batch);
                         }
+
+                        extract_ok!(self.emit_early_if_necessary());
+
                         timer.done();
                     }
                     Some(Err(e)) => {
@@ -649,7 +652,7 @@ impl GroupedHashAggregateStream {
         for acc in self.accumulators.iter_mut() {
             match self.mode {
                 AggregateMode::Partial => output.extend(acc.state(emit_to)?),
-                AggregateMode::Final | AggregateMode::FinalPartitioned if spilling => {
+                _ if spilling => {
                     // If spilling, output partial state because the spilled data will be
                     // merged and re-evaluated later.
                     output.extend(acc.state(emit_to)?)
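// Editorial aside (not part of the patch): when `spilling` is true, `emit` keeps the
// accumulators' partial state columns instead of finalized values. For example, for
// `AVG(b)` grouped by `a`, the emitted rows look like `a | AVG(b)[count] | AVG(b)[sum]`
// (matching the tables above), so the spilled data can later be merged and re-evaluated
// by the final aggregation.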
@@ -672,20 +675,18 @@ impl GroupedHashAggregateStream { fn spill_previous_if_necessary(&mut self, batch: &RecordBatch) -> Result<()> { // TODO: support group_ordering for spilling if self.reservation.size() > 0 + && batch.num_rows() > 0 && matches!(self.group_ordering, GroupOrdering::None) + && !matches!(self.mode, AggregateMode::Partial) + && (self.update_memory_reservation().is_err() || self.spill_state.force_spill) { - if self.update_memory_reservation().is_err() || self.spill_state.force_spill { - // If mode is Final, use input batch (Partial mode) schema for spilling because - // the spilled data will be merged and re-evaluated later. - if let AggregateMode::Final | AggregateMode::FinalPartitioned = self.mode - { - self.spill_state.spill_schema = batch.schema(); - } - self.spill()?; - self.clear_shrink(batch); + // Use input batch (Partial mode) schema for spilling because + // the spilled data will be merged and re-evaluated later. + self.spill_state.spill_schema = batch.schema(); + self.spill()?; + self.clear_shrink(batch); - return self.update_memory_reservation(); - } + return self.update_memory_reservation(); } Ok(()) } @@ -710,6 +711,23 @@ impl GroupedHashAggregateStream { self.current_group_indices.shrink_to(batch.num_rows()); } + /// Emit if the used memory exceeds the target for partial aggregation. + /// Currently only [`GroupOrdering::None`] is supported for spilling. + /// TODO: support group_ordering for spilling + fn emit_early_if_necessary(&mut self) -> Result<()> { + if self.reservation.size() > 0 + && matches!(self.group_ordering, GroupOrdering::None) + && matches!(self.mode, AggregateMode::Partial) + && (self.update_memory_reservation().is_err() || self.spill_state.force_spill) + { + let batch = self.emit(EmitTo::All, false)?; + if batch.num_rows() > 0 { + self.exec_state = ExecutionState::ProducingOutput(batch); + } + } + Ok(()) + } + /// At this point, all the inputs are read and there are some spills. /// Emit the remaining rows and create a batch. /// Conduct a streaming merge sort between the batch and spilled data. Since the stream is fully diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index 167db3dcc7e4..ccadb2c9b826 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -21,7 +21,6 @@ use arrow::array::{AsArray, PrimitiveBuilder}; use log::debug; use std::any::Any; - use std::sync::Arc; use crate::aggregate::groups_accumulator::accumulate::NullState; diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 64b94b2f4fb0..738ca4e915f7 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -18,7 +18,6 @@ //! Defines physical expressions that can evaluated at runtime during query execution use std::any::Any; - use std::fmt::Debug; use std::ops::BitAnd; use std::sync::Arc; diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs index 02bb466d44bd..7e8930ce2a32 100644 --- a/datafusion/physical-expr/src/aggregate/first_last.rs +++ b/datafusion/physical-expr/src/aggregate/first_last.rs @@ -165,8 +165,6 @@ struct FirstValueAccumulator { orderings: Vec, // Stores the applicable ordering requirement. 
ordering_req: LexOrdering, - // Whether merge_batch() is called before - is_merge_called: bool, } impl FirstValueAccumulator { @@ -185,7 +183,6 @@ impl FirstValueAccumulator { is_set: false, orderings, ordering_req, - is_merge_called: false, }) } @@ -201,9 +198,7 @@ impl Accumulator for FirstValueAccumulator { fn state(&self) -> Result> { let mut result = vec![self.first.clone()]; result.extend(self.orderings.iter().cloned()); - if !self.is_merge_called { - result.push(ScalarValue::Boolean(Some(self.is_set))); - } + result.push(ScalarValue::Boolean(Some(self.is_set))); Ok(result) } @@ -218,7 +213,6 @@ impl Accumulator for FirstValueAccumulator { } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.is_merge_called = true; // FIRST_VALUE(first1, first2, first3, ...) // last index contains is_set flag. let is_set_idx = states.len() - 1; @@ -390,8 +384,6 @@ struct LastValueAccumulator { orderings: Vec, // Stores the applicable ordering requirement. ordering_req: LexOrdering, - // Whether merge_batch() is called before - is_merge_called: bool, } impl LastValueAccumulator { @@ -410,7 +402,6 @@ impl LastValueAccumulator { is_set: false, orderings, ordering_req, - is_merge_called: false, }) } @@ -426,9 +417,7 @@ impl Accumulator for LastValueAccumulator { fn state(&self) -> Result> { let mut result = vec![self.last.clone()]; result.extend(self.orderings.clone()); - if !self.is_merge_called { - result.push(ScalarValue::Boolean(Some(self.is_set))); - } + result.push(ScalarValue::Boolean(Some(self.is_set))); Ok(result) } @@ -442,7 +431,6 @@ impl Accumulator for LastValueAccumulator { } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.is_merge_called = true; // LAST_VALUE(last1, last2, last3, ...) // last index contains is_set flag. 
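 // (Editorial note: the state row layout is [last value, ordering columns.., is_set];
 // see `state()` above.)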
let is_set_idx = states.len() - 1; diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs index c62041088116..21b6cc29e83d 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use arrow::array::AsArray; - use arrow_array::{ArrayRef, BooleanArray}; use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; use datafusion_common::Result; diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs index 2bc0df617e56..d2e64d373be2 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs @@ -19,7 +19,6 @@ pub(crate) mod accumulate; mod adapter; - pub use adapter::GroupsAccumulatorAdapter; pub(crate) mod bool_op; From 9068e5fb81cf34b14c5d849f36b3def383deede6 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Thu, 7 Sep 2023 13:00:14 -0700 Subject: [PATCH 05/16] address review comments --- .../aggregates/group_values/row.rs | 20 ++-- .../core/src/physical_plan/aggregates/mod.rs | 97 ++++++++++++++----- .../src/physical_plan/aggregates/row_hash.rs | 46 +++++---- .../physical-expr/src/aggregate/first_last.rs | 16 ++- 4 files changed, 119 insertions(+), 60 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/row.rs b/datafusion/core/src/physical_plan/aggregates/group_values/row.rs index 62741374d6cd..30e69d94fc67 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/row.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/row.rs @@ -60,19 +60,17 @@ pub struct GroupValuesRows { /// Random state for creating hashes random_state: RandomState, - - /// Schema fields for the row converter - fields: Vec, } impl GroupValuesRows { pub fn try_new(schema: SchemaRef) -> Result { - let fields: Vec = schema - .fields() - .iter() - .map(|f| SortField::new(f.data_type().clone())) - .collect(); - let row_converter = RowConverter::new(fields.clone())?; + let row_converter = RowConverter::new( + schema + .fields() + .iter() + .map(|f| SortField::new(f.data_type().clone())) + .collect(), + )?; let map = RawTable::with_capacity(0); let group_values = row_converter.empty_rows(0, 0); @@ -84,7 +82,6 @@ impl GroupValuesRows { group_values, hashes_buffer: Default::default(), random_state: Default::default(), - fields, }) } } @@ -188,8 +185,7 @@ impl GroupValues for GroupValuesRows { fn clear_shrink(&mut self, batch: &RecordBatch) { let count = batch.num_rows(); - // FIXME: there is no good way to clear_shrink for self.row_converter self.group_values - self.row_converter = RowConverter::new(self.fields.clone()).unwrap(); + // FIXME: there is no good way to clear_shrink for self.group_values self.group_values = self.row_converter.empty_rows(count, 0); self.map.clear(); self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index a5a035f653cb..50a01d920fc0 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -1247,6 +1247,7 @@ mod tests { use std::sync::Arc; use std::task::{Context, Poll}; + use 
datafusion_execution::config::SessionConfig; use futures::{FutureExt, Stream}; // Generate a schema which consists of 5 columns (a, b, c, d, e) @@ -1445,7 +1446,10 @@ mod tests { DataType::Int64, ))]; - let task_ctx = Arc::new(TaskContext::default()); + let session_config = SessionConfig::new().with_batch_size(4); + let task_ctx = TaskContext::default().with_session_config(session_config); + let task_ctx = Arc::new(task_ctx); + // let task_ctx = Arc::new(TaskContext::default()); let partial_aggregate = Arc::new(AggregateExec::try_new_for_test( AggregateMode::Partial, @@ -1461,24 +1465,53 @@ mod tests { let result = common::collect(partial_aggregate.execute(0, task_ctx.clone())?).await?; - let expected = vec![ - "+---+-----+-----------------+", - "| a | b | COUNT(1)[count] |", - "+---+-----+-----------------+", - "| | 1.0 | 2 |", - "| | 2.0 | 2 |", - "| | 3.0 | 2 |", - "| | 4.0 | 2 |", - "| 2 | | 2 |", - "| 2 | 1.0 | 2 |", - "| 3 | | 3 |", - "| 3 | 2.0 | 2 |", - "| 3 | 3.0 | 1 |", - "| 4 | | 3 |", - "| 4 | 3.0 | 1 |", - "| 4 | 4.0 | 2 |", - "+---+-----+-----------------+", - ]; + let expected = if spill { + vec![ + "+---+-----+-----------------+", + "| a | b | COUNT(1)[count] |", + "+---+-----+-----------------+", + "| | 1.0 | 1 |", + "| | 1.0 | 1 |", + "| | 2.0 | 1 |", + "| | 2.0 | 1 |", + "| | 3.0 | 1 |", + "| | 3.0 | 1 |", + "| | 4.0 | 1 |", + "| | 4.0 | 1 |", + "| 2 | | 1 |", + "| 2 | | 1 |", + "| 2 | 1.0 | 1 |", + "| 2 | 1.0 | 1 |", + "| 3 | | 1 |", + "| 3 | | 2 |", + "| 3 | 2.0 | 2 |", + "| 3 | 3.0 | 1 |", + "| 4 | | 1 |", + "| 4 | | 2 |", + "| 4 | 3.0 | 1 |", + "| 4 | 4.0 | 2 |", + "+---+-----+-----------------+", + ] + } else { + vec![ + "+---+-----+-----------------+", + "| a | b | COUNT(1)[count] |", + "+---+-----+-----------------+", + "| | 1.0 | 2 |", + "| | 2.0 | 2 |", + "| | 3.0 | 2 |", + "| | 4.0 | 2 |", + "| 2 | | 2 |", + "| 2 | 1.0 | 2 |", + "| 3 | | 3 |", + "| 3 | 2.0 | 2 |", + "| 3 | 3.0 | 1 |", + "| 4 | | 3 |", + "| 4 | 3.0 | 1 |", + "| 4 | 4.0 | 2 |", + "+---+-----+-----------------+", + ] + }; assert_batches_sorted_eq!(expected, &result); let groups = partial_aggregate.group_expr().expr().to_vec(); @@ -1507,7 +1540,11 @@ mod tests { common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?; let batch = concat_batches(&result[0].schema(), &result)?; assert_eq!(batch.num_columns(), 3); - assert_eq!(batch.num_rows(), 12); + if spill { + assert_eq!(batch.num_rows(), 16); + } else { + assert_eq!(batch.num_rows(), 12); + } let expected = vec![ "+---+-----+----------+", @@ -1532,7 +1569,13 @@ mod tests { let metrics = merged_aggregate.metrics().unwrap(); let output_rows = metrics.output_rows().unwrap(); - assert_eq!(12, output_rows); + if spill { + // When spilling, the output rows metrics includes numbers from partial early emitting. 
+ // This is due to the AtomicUsize behavior + assert_eq!(26, output_rows); + } else { + assert_eq!(12, output_rows); + } Ok(()) } @@ -1553,7 +1596,10 @@ mod tests { DataType::Float64, ))]; - let task_ctx = Arc::new(TaskContext::default()); + let session_config = SessionConfig::new().with_batch_size(2); + let task_ctx = TaskContext::default().with_session_config(session_config); + let task_ctx = Arc::new(task_ctx); + // let task_ctx = Arc::new(TaskContext::default()); let partial_aggregate = Arc::new(AggregateExec::try_new_for_test( AggregateMode::Partial, @@ -1578,8 +1624,7 @@ mod tests { "| 2 | 1 | 1.0 |", "| 3 | 1 | 2.0 |", "| 3 | 2 | 5.0 |", - "| 4 | 1 | 4.0 |", - "| 4 | 2 | 7.0 |", + "| 4 | 3 | 11.0 |", "+---+---------------+-------------+", ] } else { @@ -1637,9 +1682,9 @@ mod tests { let metrics = merged_aggregate.metrics().unwrap(); let output_rows = metrics.output_rows().unwrap(); if spill { - // When spilling, the output rows metrics become partial output size + final output size + // When spilling, the output rows metrics includes numbers from partial early emitting. // This is due to the AtomicUsize behavior - assert_eq!(9, output_rows); + assert_eq!(8, output_rows); } else { assert_eq!(3, output_rows); } diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index f1403a1b4c17..3d71e1b319f0 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -171,21 +171,21 @@ struct SpillState { /// ```text /// Partial Aggregation [batch_size = 2] (max memory = 3 rows) /// -/// INPUTS PARTIALLY AGGREGATED (UPDATE BATCH) -/// ┌─────────┐ ┌─────────────────┐ -/// │ a │ b │ │ a │ AVG(b) │ -/// │---│-----│ │ │[count]│[sum]│ -/// │ 3 │ 3.0 │ ─▶ │---│-------│-----│ -/// │ 2 │ 2.0 │ │ 2 │ 1 │ 2.0 │ ─▶ output -/// └─────────┘ │ 3 │ 2 │ 7.0 │ -/// ┌─────────┐ ─▶ │ 4 │ 1 │ 8.0 │ -/// │ 3 │ 4.0 │ └─────────────────┘ -/// │ 4 │ 8.0 │ ┌─────────────────┐ -/// └─────────┘ │ a │ AVG(b) │ -/// ┌─────────┐ │---│-------│-----│ -/// │ 1 │ 1.0 │ ─▶ │ 1 │ 1 │ 1.0 │ ─▶ output -/// │ 3 │ 2.0 │ │ 3 │ 1 │ 2.0 │ -/// └─────────┘ └─────────────────┘ +/// INPUTS PARTIALLY AGGREGATED (UPDATE BATCH) OUTPUTS +/// ┌─────────┐ ┌─────────────────┐ ┌─────────────────┐ +/// │ a │ b │ │ a │ AVG(b) │ │ a │ AVG(b) │ +/// │---│-----│ │ │[count]│[sum]│ │ │[count]│[sum]│ +/// │ 3 │ 3.0 │ ─▶ │---│-------│-----│ │---│-------│-----│ +/// │ 2 │ 2.0 │ │ 2 │ 1 │ 2.0 │ ─▶ early emit ─▶ │ 2 │ 1 │ 2.0 │ +/// └─────────┘ │ 3 │ 2 │ 7.0 │ │ │ 3 │ 2 │ 7.0 │ +/// ┌─────────┐ ─▶ │ 4 │ 1 │ 8.0 │ │ └─────────────────┘ +/// │ 3 │ 4.0 │ └─────────────────┘ └▶ ┌─────────────────┐ +/// │ 4 │ 8.0 │ ┌─────────────────┐ │ 4 │ 1 │ 8.0 │ +/// └─────────┘ │ a │ AVG(b) │ ┌▶ │ 1 │ 1 │ 1.0 │ +/// ┌─────────┐ │---│-------│-----│ │ └─────────────────┘ +/// │ 1 │ 1.0 │ ─▶ │ 1 │ 1 │ 1.0 │ ─▶ early emit ─▶ ┌─────────────────┐ +/// │ 3 │ 2.0 │ │ 3 │ 1 │ 2.0 │ │ 3 │ 1 │ 2.0 │ +/// └─────────┘ └─────────────────┘ └─────────────────┘ /// /// /// Final Aggregation [batch_size = 2] (max memory = 3 rows) @@ -199,7 +199,7 @@ struct SpillState { /// │ 2 │ 2 │ 1.0 │ │ 2 │ 2 │ 1.0 │ │ │ 1 │ 4.0 │ /// └─────────────────┘ │ 3 │ 4 │ 8.0 │ ▼ │ 2 │ 1.0 │ /// ┌─────────────────┐ ─▶ │ 4 │ 1 │ 7.0 │ Streaming ─▶ └────────────┘ -/// │ 3 │ 1 │ 5.0 │ └─────────────────┘ merge sort ┌────────────┐ +/// │ 3 │ 1 │ 5.0 │ └─────────────────┘ merge sort ─▶ ┌────────────┐ /// │ 4 │ 1 │ 7.0 │ ┌─────────────────┐ ▲ │ a │ AVG(b) │ /// 
└─────────────────┘ │ a │ AVG(b) │ │ │---│--------│ /// ┌─────────────────┐ │---│-------│-----│ ─▶ memory ─┘ │ 3 │ 2.0 │ @@ -719,11 +719,17 @@ impl GroupedHashAggregateStream { && matches!(self.group_ordering, GroupOrdering::None) && matches!(self.mode, AggregateMode::Partial) && (self.update_memory_reservation().is_err() || self.spill_state.force_spill) + && self.group_values.len() >= self.batch_size { - let batch = self.emit(EmitTo::All, false)?; - if batch.num_rows() > 0 { - self.exec_state = ExecutionState::ProducingOutput(batch); - } + let n = self.group_values.len() / self.batch_size * self.batch_size; + println!( + "{}, {}, {}", + self.group_values.len(), + n, + self.current_group_indices.len() + ); + let batch = self.emit(EmitTo::First(n), false)?; + self.exec_state = ExecutionState::ProducingOutput(batch); } Ok(()) } diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs index 7e8930ce2a32..02bb466d44bd 100644 --- a/datafusion/physical-expr/src/aggregate/first_last.rs +++ b/datafusion/physical-expr/src/aggregate/first_last.rs @@ -165,6 +165,8 @@ struct FirstValueAccumulator { orderings: Vec, // Stores the applicable ordering requirement. ordering_req: LexOrdering, + // Whether merge_batch() is called before + is_merge_called: bool, } impl FirstValueAccumulator { @@ -183,6 +185,7 @@ impl FirstValueAccumulator { is_set: false, orderings, ordering_req, + is_merge_called: false, }) } @@ -198,7 +201,9 @@ impl Accumulator for FirstValueAccumulator { fn state(&self) -> Result> { let mut result = vec![self.first.clone()]; result.extend(self.orderings.iter().cloned()); - result.push(ScalarValue::Boolean(Some(self.is_set))); + if !self.is_merge_called { + result.push(ScalarValue::Boolean(Some(self.is_set))); + } Ok(result) } @@ -213,6 +218,7 @@ impl Accumulator for FirstValueAccumulator { } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.is_merge_called = true; // FIRST_VALUE(first1, first2, first3, ...) // last index contains is_set flag. let is_set_idx = states.len() - 1; @@ -384,6 +390,8 @@ struct LastValueAccumulator { orderings: Vec, // Stores the applicable ordering requirement. ordering_req: LexOrdering, + // Whether merge_batch() is called before + is_merge_called: bool, } impl LastValueAccumulator { @@ -402,6 +410,7 @@ impl LastValueAccumulator { is_set: false, orderings, ordering_req, + is_merge_called: false, }) } @@ -417,7 +426,9 @@ impl Accumulator for LastValueAccumulator { fn state(&self) -> Result> { let mut result = vec![self.last.clone()]; result.extend(self.orderings.clone()); - result.push(ScalarValue::Boolean(Some(self.is_set))); + if !self.is_merge_called { + result.push(ScalarValue::Boolean(Some(self.is_set))); + } Ok(result) } @@ -431,6 +442,7 @@ impl Accumulator for LastValueAccumulator { } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.is_merge_called = true; // LAST_VALUE(last1, last2, last3, ...) // last index contains is_set flag. 
let is_set_idx = states.len() - 1; From 4b195f66805c4720a5a80874966a30826f374b15 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 8 Sep 2023 04:22:22 -0700 Subject: [PATCH 06/16] address review comments --- datafusion/core/src/physical_plan/aggregates/mod.rs | 12 ++++-------- .../core/src/physical_plan/aggregates/row_hash.rs | 7 +------ 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 50a01d920fc0..e26ce1841c2a 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -1540,11 +1540,7 @@ mod tests { common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?; let batch = concat_batches(&result[0].schema(), &result)?; assert_eq!(batch.num_columns(), 3); - if spill { - assert_eq!(batch.num_rows(), 16); - } else { - assert_eq!(batch.num_rows(), 12); - } + assert_eq!(batch.num_rows(), 12); let expected = vec![ "+---+-----+----------+", @@ -1570,9 +1566,9 @@ mod tests { let metrics = merged_aggregate.metrics().unwrap(); let output_rows = metrics.output_rows().unwrap(); if spill { - // When spilling, the output rows metrics includes numbers from partial early emitting. + // When spilling, the output rows metrics become partial output size + final output size // This is due to the AtomicUsize behavior - assert_eq!(26, output_rows); + assert_eq!(32, output_rows); } else { assert_eq!(12, output_rows); } @@ -1682,7 +1678,7 @@ mod tests { let metrics = merged_aggregate.metrics().unwrap(); let output_rows = metrics.output_rows().unwrap(); if spill { - // When spilling, the output rows metrics includes numbers from partial early emitting. + // When spilling, the output rows metrics become partial output size + final output size // This is due to the AtomicUsize behavior assert_eq!(8, output_rows); } else { diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index 3d71e1b319f0..1d7f84733ae8 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -722,12 +722,6 @@ impl GroupedHashAggregateStream { && self.group_values.len() >= self.batch_size { let n = self.group_values.len() / self.batch_size * self.batch_size; - println!( - "{}, {}, {}", - self.group_values.len(), - n, - self.current_group_indices.len() - ); let batch = self.emit(EmitTo::First(n), false)?; self.exec_state = ExecutionState::ProducingOutput(batch); } @@ -740,6 +734,7 @@ impl GroupedHashAggregateStream { /// sorted, set `self.group_ordering` to Full, then later we can read with [`EmitTo::First`]. 
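 /// (Editorial sketch, not in the original comment) Assuming two spill files, the merge
 /// pipeline built here looks like:
 ///
 /// ```text
 /// in-memory groups ── emit(All) ── sort_batch(spill_expr) ──┐
 /// spill file 1 ────── read_spill_as_stream ─────────────────┼──▶ streaming_merge
 /// spill file 2 ────── read_spill_as_stream ─────────────────┘
 /// ```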
fn update_merged_stream(&mut self) -> Result<()> { let batch = self.emit(EmitTo::All, true)?; + self.clear_shrink(&batch); let mut streams: Vec = vec![]; let expr = self.spill_state.spill_expr.clone(); let schema = batch.schema(); From 985a90c63ddae331372ca488431c11f8ee71e2c9 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 8 Sep 2023 13:40:44 -0700 Subject: [PATCH 07/16] address review comments --- .../core/src/physical_plan/aggregates/mod.rs | 26 ++++++++++++------- .../src/physical_plan/aggregates/row_hash.rs | 3 +-- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index e26ce1841c2a..717910c404f3 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -675,7 +675,9 @@ impl AggregateExec { }) } - /// Only for testing. When `force_spill` is true, it spills every batch. + /// Only for testing. When `force_spill` is true, it spills every batch for final aggregation. + /// It outputs every batch early for partial aggregation. + /// TODO: test by different memory pool sizes instead of propagating the `force_spill` flag #[allow(clippy::too_many_arguments)] pub fn try_new_for_test( mode: AggregateMode, @@ -1446,10 +1448,13 @@ mod tests { DataType::Int64, ))]; - let session_config = SessionConfig::new().with_batch_size(4); - let task_ctx = TaskContext::default().with_session_config(session_config); - let task_ctx = Arc::new(task_ctx); - // let task_ctx = Arc::new(TaskContext::default()); + let task_ctx = if spill { + let session_config = SessionConfig::new().with_batch_size(4); + let task_ctx = TaskContext::default().with_session_config(session_config); + Arc::new(task_ctx) + } else { + Arc::new(TaskContext::default()) + }; let partial_aggregate = Arc::new(AggregateExec::try_new_for_test( AggregateMode::Partial, @@ -1592,10 +1597,13 @@ mod tests { DataType::Float64, ))]; - let session_config = SessionConfig::new().with_batch_size(2); - let task_ctx = TaskContext::default().with_session_config(session_config); - let task_ctx = Arc::new(task_ctx); - // let task_ctx = Arc::new(TaskContext::default()); + let task_ctx = if spill { + let session_config = SessionConfig::new().with_batch_size(2); + let task_ctx = TaskContext::default().with_session_config(session_config); + Arc::new(task_ctx) + } else { + Arc::new(TaskContext::default()) + }; let partial_aggregate = Arc::new(AggregateExec::try_new_for_test( AggregateMode::Partial, diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index 1d7f84733ae8..b9c0a3dbb76e 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -715,11 +715,10 @@ impl GroupedHashAggregateStream { /// Currently only [`GroupOrdering::None`] is supported for spilling. 
/// TODO: support group_ordering for spilling fn emit_early_if_necessary(&mut self) -> Result<()> { - if self.reservation.size() > 0 + if self.group_values.len() >= self.batch_size && matches!(self.group_ordering, GroupOrdering::None) && matches!(self.mode, AggregateMode::Partial) && (self.update_memory_reservation().is_err() || self.spill_state.force_spill) - && self.group_values.len() >= self.batch_size { let n = self.group_values.len() / self.batch_size * self.batch_size; let batch = self.emit(EmitTo::First(n), false)?; From d9a77c8cc5ca459adf74dfd12033bfd64c9fd2cf Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Sat, 9 Sep 2023 02:22:21 -0700 Subject: [PATCH 08/16] address review comments --- .../core/src/physical_plan/aggregates/mod.rs | 14 +++++++++----- .../src/physical_plan/aggregates/row_hash.rs | 17 ++++++++++++----- datafusion/execution/src/runtime_env.rs | 6 ++++++ 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 717910c404f3..2893525b7f98 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -1599,13 +1599,19 @@ mod tests { let task_ctx = if spill { let session_config = SessionConfig::new().with_batch_size(2); - let task_ctx = TaskContext::default().with_session_config(session_config); + let runtime = Arc::new( + RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1956, 1.0)) + .unwrap(), + ); + let task_ctx = TaskContext::default() + .with_session_config(session_config) + .with_runtime(runtime); Arc::new(task_ctx) } else { Arc::new(TaskContext::default()) }; - let partial_aggregate = Arc::new(AggregateExec::try_new_for_test( + let partial_aggregate = Arc::new(AggregateExec::try_new( AggregateMode::Partial, grouping_set.clone(), aggregates.clone(), @@ -1613,7 +1619,6 @@ mod tests { vec![None], input, input_schema.clone(), - spill, )?); let result = @@ -1654,7 +1659,7 @@ mod tests { let final_grouping_set = PhysicalGroupBy::new_single(final_group); - let merged_aggregate = Arc::new(AggregateExec::try_new_for_test( + let merged_aggregate = Arc::new(AggregateExec::try_new( AggregateMode::Final, final_grouping_set, aggregates, @@ -1662,7 +1667,6 @@ mod tests { vec![None], merge, input_schema, - spill, )?); let result = diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index b9c0a3dbb76e..974ad0b81f48 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -522,7 +522,14 @@ impl Stream for GroupedHashAggregateStream { } } - ExecutionState::Done => return Poll::Ready(None), + ExecutionState::Done => { + // release the memory reservation since sending back output batch itself needs + // some memory reservation, so make some room for it. + let s = self.schema(); + self.clear_shrink(&RecordBatch::new_empty(s)); + let _ = self.update_memory_reservation(); + return Poll::Ready(None); + } } } } @@ -664,7 +671,9 @@ impl GroupedHashAggregateStream { } } - self.update_memory_reservation()?; + // emit reduces the memory usage. Ignore Err from update_memory_reservation. Even if it is + // over the target memory size after emission, we can emit again rather than returning Err. 
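+        // (Editorial note: `EmitTo::All` drains `group_values` and the accumulator state into
+        // the output batch, so this re-check runs against a much smaller memory footprint.)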
+ let _ = self.update_memory_reservation(); let batch = RecordBatch::try_new(schema, output)?; Ok(batch) } @@ -674,7 +683,7 @@ impl GroupedHashAggregateStream { /// memory. Currently only [`GroupOrdering::None`] is supported for spilling. fn spill_previous_if_necessary(&mut self, batch: &RecordBatch) -> Result<()> { // TODO: support group_ordering for spilling - if self.reservation.size() > 0 + if self.group_values.len() > 0 && batch.num_rows() > 0 && matches!(self.group_ordering, GroupOrdering::None) && !matches!(self.mode, AggregateMode::Partial) @@ -685,8 +694,6 @@ impl GroupedHashAggregateStream { self.spill_state.spill_schema = batch.schema(); self.spill()?; self.clear_shrink(batch); - - return self.update_memory_reservation(); } Ok(()) } diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 8f9c594681d0..c226f753df75 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -24,6 +24,7 @@ use crate::{ object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, }; +use crate::memory_pool::FairSpillPool; use datafusion_common::{DataFusionError, Result}; use object_store::ObjectStore; use std::fmt::{Debug, Formatter}; @@ -168,6 +169,11 @@ impl RuntimeConfig { self.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))) } + pub fn with_fair_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self { + let pool_size = (max_memory as f64 * memory_fraction) as usize; + self.with_memory_pool(Arc::new(FairSpillPool::new(pool_size))) + } + /// Use the specified path to create any needed temporary files pub fn with_temp_file_path(self, path: impl Into) -> Self { self.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()])) From aa1fc50e5df81f09e88a555ffec29a3d4323e8a0 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Sat, 9 Sep 2023 05:38:56 -0700 Subject: [PATCH 09/16] address review comments --- .../aggregates/group_values/row.rs | 1 + .../core/src/physical_plan/aggregates/mod.rs | 97 ++++++++----------- .../src/physical_plan/aggregates/row_hash.rs | 13 ++- .../core/tests/fuzz_cases/aggregate_fuzz.rs | 29 +++--- datafusion/execution/src/runtime_env.rs | 6 -- 5 files changed, 63 insertions(+), 83 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/row.rs b/datafusion/core/src/physical_plan/aggregates/group_values/row.rs index 30e69d94fc67..d711a1619116 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/row.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/row.rs @@ -189,6 +189,7 @@ impl GroupValues for GroupValuesRows { self.group_values = self.row_converter.empty_rows(count, 0); self.map.clear(); self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared + self.map_size = self.map.capacity() * std::mem::size_of::<(u64, usize)>(); self.hashes_buffer.clear(); self.hashes_buffer.shrink_to(count); } diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 2893525b7f98..09f31e1f498d 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -675,33 +675,6 @@ impl AggregateExec { }) } - /// Only for testing. When `force_spill` is true, it spills every batch for final aggregation. - /// It outputs every batch early for partial aggregation. 
- /// TODO: test by different memory pool sizes instead of propagating the `force_spill` flag - #[allow(clippy::too_many_arguments)] - pub fn try_new_for_test( - mode: AggregateMode, - group_by: PhysicalGroupBy, - aggr_expr: Vec>, - filter_expr: Vec>>, - order_by_expr: Vec>, - input: Arc, - input_schema: SchemaRef, - force_spill: bool, - ) -> Result { - let mut exec = AggregateExec::try_new( - mode, - group_by, - aggr_expr, - filter_expr, - order_by_expr, - input, - input_schema, - )?; - exec.force_spill = force_spill; - Ok(exec) - } - /// Aggregation mode (full, partial) pub fn mode(&self) -> &AggregateMode { &self.mode @@ -1420,6 +1393,18 @@ mod tests { ) } + fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc { + let session_config = SessionConfig::new().with_batch_size(batch_size); + let runtime = Arc::new( + RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(max_memory, 1.0)) + .unwrap(), + ); + let task_ctx = TaskContext::default() + .with_session_config(session_config) + .with_runtime(runtime); + Arc::new(task_ctx) + } + async fn check_grouping_sets( input: Arc, spill: bool, @@ -1449,14 +1434,12 @@ mod tests { ))]; let task_ctx = if spill { - let session_config = SessionConfig::new().with_batch_size(4); - let task_ctx = TaskContext::default().with_session_config(session_config); - Arc::new(task_ctx) + new_spill_ctx(4, 1000) } else { Arc::new(TaskContext::default()) }; - let partial_aggregate = Arc::new(AggregateExec::try_new_for_test( + let partial_aggregate = Arc::new(AggregateExec::try_new( AggregateMode::Partial, grouping_set.clone(), aggregates.clone(), @@ -1464,7 +1447,6 @@ mod tests { vec![None], input, input_schema.clone(), - spill, )?); let result = @@ -1530,7 +1512,23 @@ mod tests { let final_grouping_set = PhysicalGroupBy::new_single(final_group); - let merged_aggregate = Arc::new(AggregateExec::try_new_for_test( + let task_ctx = if spill { + let session_config = SessionConfig::new().with_batch_size(4); + let runtime = Arc::new( + // memory 3160 is too large for partial aggregation to do partial-emits; however, + // 3160 is minimum for the final aggregation completes even with spilling + RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(3160, 1.0)) + .unwrap(), + ); + let task_ctx = TaskContext::default() + .with_session_config(session_config) + .with_runtime(runtime); + Arc::new(task_ctx) + } else { + task_ctx + }; + + let merged_aggregate = Arc::new(AggregateExec::try_new( AggregateMode::Final, final_grouping_set, aggregates, @@ -1538,7 +1536,6 @@ mod tests { vec![None], merge, input_schema, - spill, )?); let result = @@ -1570,13 +1567,7 @@ mod tests { let metrics = merged_aggregate.metrics().unwrap(); let output_rows = metrics.output_rows().unwrap(); - if spill { - // When spilling, the output rows metrics become partial output size + final output size - // This is due to the AtomicUsize behavior - assert_eq!(32, output_rows); - } else { - assert_eq!(12, output_rows); - } + assert_eq!(12, output_rows); Ok(()) } @@ -1598,15 +1589,7 @@ mod tests { ))]; let task_ctx = if spill { - let session_config = SessionConfig::new().with_batch_size(2); - let runtime = Arc::new( - RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1956, 1.0)) - .unwrap(), - ); - let task_ctx = TaskContext::default() - .with_session_config(session_config) - .with_runtime(runtime); - Arc::new(task_ctx) + new_spill_ctx(2, 1956) } else { Arc::new(TaskContext::default()) }; @@ -1691,7 +1674,7 @@ mod tests { let output_rows = metrics.output_rows().unwrap(); if spill { // 
When spilling, the output rows metrics become partial output size + final output size - // This is due to the AtomicUsize behavior + // This is because final aggregation starts while partial aggregation is still emitting assert_eq!(8, output_rows); } else { assert_eq!(3, output_rows); @@ -2071,7 +2054,11 @@ mod tests { is_first_acc: bool, spill: bool, ) -> Result<()> { - let task_ctx = Arc::new(TaskContext::default()); + let task_ctx = if spill { + new_spill_ctx(2, 2581) + } else { + Arc::new(TaskContext::default()) + }; let (schema, data) = some_data_v2(); let partition1 = data[0].clone(); @@ -2114,7 +2101,7 @@ mod tests { schema.clone(), None, )?); - let aggregate_exec = Arc::new(AggregateExec::try_new_for_test( + let aggregate_exec = Arc::new(AggregateExec::try_new( AggregateMode::Partial, groups.clone(), aggregates.clone(), @@ -2122,7 +2109,6 @@ mod tests { vec![Some(ordering_req.clone())], memory_exec, schema.clone(), - spill, )?); let coalesce = if use_coalesce_batches { let coalesce = Arc::new(CoalescePartitionsExec::new(aggregate_exec)); @@ -2131,7 +2117,7 @@ mod tests { Arc::new(CoalescePartitionsExec::new(aggregate_exec)) as Arc }; - let aggregate_final = Arc::new(AggregateExec::try_new_for_test( + let aggregate_final = Arc::new(AggregateExec::try_new( AggregateMode::Final, groups, aggregates.clone(), @@ -2139,7 +2125,6 @@ mod tests { vec![Some(ordering_req)], coalesce, schema, - spill, )?) as Arc; let result = crate::physical_plan::collect(aggregate_final, task_ctx).await?; diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index 974ad0b81f48..c534e26b8cb4 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -525,8 +525,7 @@ impl Stream for GroupedHashAggregateStream { ExecutionState::Done => { // release the memory reservation since sending back output batch itself needs // some memory reservation, so make some room for it. - let s = self.schema(); - self.clear_shrink(&RecordBatch::new_empty(s)); + self.clear_all(); let _ = self.update_memory_reservation(); return Poll::Ready(None); } @@ -718,6 +717,12 @@ impl GroupedHashAggregateStream { self.current_group_indices.shrink_to(batch.num_rows()); } + /// Clear memory and shirk capacities to zero. + fn clear_all(&mut self) { + let s = self.schema(); + self.clear_shrink(&RecordBatch::new_empty(s)); + } + /// Emit if the used memory exceeds the target for partial aggregation. /// Currently only [`GroupOrdering::None`] is supported for spilling. /// TODO: support group_ordering for spilling @@ -740,7 +745,9 @@ impl GroupedHashAggregateStream { /// sorted, set `self.group_ordering` to Full, then later we can read with [`EmitTo::First`]. 
fn update_merged_stream(&mut self) -> Result<()> { let batch = self.emit(EmitTo::All, true)?; - self.clear_shrink(&batch); + // clear up memory for streaming_merge + self.clear_all(); + self.update_memory_reservation()?; let mut streams: Vec = vec![]; let expr = self.spill_state.spill_expr.clone(); let schema = batch.schema(); diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index c039599ab507..c9345865abac 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -31,6 +31,8 @@ use rand::{Rng, SeedableRng}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::{collect, displayable, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_execution::memory_pool::GreedyMemoryPool; +use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_physical_expr::expressions::{col, Sum}; use datafusion_physical_expr::{AggregateExpr, PhysicalSortExpr}; use test_utils::add_empty_batches; @@ -57,15 +59,12 @@ mod tests { let mut handles = Vec::new(); for i in 0..n { let test_idx = i % test_cases.len(); - for spill in [false, true] { - let group_by_columns = test_cases[test_idx].clone(); - let job = tokio::spawn(run_aggregate_test( - make_staggered_batches::(1000, distinct, i as u64), - group_by_columns, - spill, - )); - handles.push(job); - } + let group_by_columns = test_cases[test_idx].clone(); + let job = tokio::spawn(run_aggregate_test( + make_staggered_batches::(1000, distinct, i as u64), + group_by_columns, + )); + handles.push(job); } for job in handles { job.await.unwrap(); @@ -77,11 +76,7 @@ mod tests { /// Perform batch and streaming aggregation with same input /// and verify outputs of `AggregateExec` with pipeline breaking stream `GroupedHashAggregateStream` /// and non-pipeline breaking stream `BoundedAggregateStream` produces same result. 
-async fn run_aggregate_test( - input1: Vec, - group_by_columns: Vec<&str>, - spill: bool, -) { +async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str>) { let schema = input1[0].schema(); let session_config = SessionConfig::new().with_batch_size(50); let ctx = SessionContext::with_config(session_config); @@ -116,7 +111,7 @@ async fn run_aggregate_test( let group_by = PhysicalGroupBy::new_single(expr); let aggregate_exec_running = Arc::new( - AggregateExec::try_new_for_test( + AggregateExec::try_new( AggregateMode::Partial, group_by.clone(), aggregate_expr.clone(), @@ -124,13 +119,12 @@ async fn run_aggregate_test( vec![None], running_source, schema.clone(), - spill, ) .unwrap(), ) as Arc; let aggregate_exec_usual = Arc::new( - AggregateExec::try_new_for_test( + AggregateExec::try_new( AggregateMode::Partial, group_by.clone(), aggregate_expr.clone(), @@ -138,7 +132,6 @@ async fn run_aggregate_test( vec![None], usual_source, schema.clone(), - spill, ) .unwrap(), ) as Arc; diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index c226f753df75..8f9c594681d0 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -24,7 +24,6 @@ use crate::{ object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, }; -use crate::memory_pool::FairSpillPool; use datafusion_common::{DataFusionError, Result}; use object_store::ObjectStore; use std::fmt::{Debug, Formatter}; @@ -169,11 +168,6 @@ impl RuntimeConfig { self.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))) } - pub fn with_fair_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self { - let pool_size = (max_memory as f64 * memory_fraction) as usize; - self.with_memory_pool(Arc::new(FairSpillPool::new(pool_size))) - } - /// Use the specified path to create any needed temporary files pub fn with_temp_file_path(self, path: impl Into) -> Self { self.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()])) From 66467767ab4e1c731f58c17abb2403971aea936a Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Mon, 11 Sep 2023 17:40:19 -0700 Subject: [PATCH 10/16] address review comments --- .../core/src/physical_plan/aggregates/mod.rs | 19 +++---------------- .../src/physical_plan/aggregates/row_hash.rs | 17 +++++++++-------- 2 files changed, 12 insertions(+), 24 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 09f31e1f498d..20dc3efa09d9 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -281,8 +281,6 @@ pub struct AggregateExec { /// Stores mode and output ordering information for the `AggregateExec`. aggregation_ordering: Option, required_input_ordering: Option, - /// Force spilling for debugging - force_spill: bool, } /// Calculates the working mode for `GROUP BY` queries. 
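// Editorial sketch (not part of the patch): the spill tests below exercise these paths by
// building a memory-limited TaskContext, roughly like the `new_spill_ctx` helper added in
// an earlier commit of this series:
//
//     let session_config = SessionConfig::new().with_batch_size(4);
//     let runtime = Arc::new(
//         RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(3160, 1.0)).unwrap(),
//     );
//     let task_ctx = Arc::new(
//         TaskContext::default()
//             .with_session_config(session_config)
//             .with_runtime(runtime),
//     );
//
// `with_memory_limit` installs a GreedyMemoryPool, so the aggregation's reservation fails
// mid-stream and the early-emit / spill code paths run.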
@@ -671,7 +669,6 @@ impl AggregateExec { metrics: ExecutionPlanMetricsSet::new(), aggregation_ordering, required_input_ordering, - force_spill: false, }) } @@ -1513,17 +1510,7 @@ mod tests { let final_grouping_set = PhysicalGroupBy::new_single(final_group); let task_ctx = if spill { - let session_config = SessionConfig::new().with_batch_size(4); - let runtime = Arc::new( - // memory 3160 is too large for partial aggregation to do partial-emits; however, - // 3160 is minimum for the final aggregation completes even with spilling - RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(3160, 1.0)) - .unwrap(), - ); - let task_ctx = TaskContext::default() - .with_session_config(session_config) - .with_runtime(runtime); - Arc::new(task_ctx) + new_spill_ctx(4, 3160) } else { task_ctx }; @@ -1589,7 +1576,7 @@ mod tests { ))]; let task_ctx = if spill { - new_spill_ctx(2, 1956) + new_spill_ctx(2, 2144) } else { Arc::new(TaskContext::default()) }; @@ -2055,7 +2042,7 @@ mod tests { spill: bool, ) -> Result<()> { let task_ctx = if spill { - new_spill_ctx(2, 2581) + new_spill_ctx(2, 2812) } else { Arc::new(TaskContext::default()) }; diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index c534e26b8cb4..b9156dc9ef37 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -88,9 +88,6 @@ struct SpillState { /// GROUP BY expressions for merging spilled data merging_group_by: PhysicalGroupBy, - - /// Force spilling always for debugging - force_spill: bool, } /// Hash based Grouping Aggregator @@ -357,7 +354,6 @@ impl GroupedHashAggregateStream { merged_stream: Box::pin(EmptyRecordBatchStream::new(agg_schema.clone())), merging_aggregate_arguments, merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()), - force_spill: agg.force_spill, }; Ok(GroupedHashAggregateStream { @@ -622,8 +618,13 @@ impl GroupedHashAggregateStream { } match self.update_memory_reservation() { - // Here we can ignore `insufficient_capacity_err` because we will spill later - Err(DataFusionError::ResourcesExhausted(_)) => Ok(()), + // Here we can ignore `insufficient_capacity_err` because we will spill later, + // but at least one batch should fit in the memory + Err(DataFusionError::ResourcesExhausted(_)) + if self.group_values.len() >= self.batch_size => + { + Ok(()) + } other => other, } } @@ -686,7 +687,7 @@ impl GroupedHashAggregateStream { && batch.num_rows() > 0 && matches!(self.group_ordering, GroupOrdering::None) && !matches!(self.mode, AggregateMode::Partial) - && (self.update_memory_reservation().is_err() || self.spill_state.force_spill) + && self.update_memory_reservation().is_err() { // Use input batch (Partial mode) schema for spilling because // the spilled data will be merged and re-evaluated later. 
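// Editorial aside (illustration only): after this commit the spill trigger is purely
// memory-driven; in pseudo-code, the final-aggregation check reads:
//
//     group_values is non-empty
//         && batch has rows
//         && group_ordering is GroupOrdering::None
//         && mode is not AggregateMode::Partial
//         && update_memory_reservation() failed
//
// Partial mode never spills; it emits accumulated groups early instead (see
// `emit_early_if_necessary` in the next hunk).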
@@ -730,7 +731,7 @@ impl GroupedHashAggregateStream { if self.group_values.len() >= self.batch_size && matches!(self.group_ordering, GroupOrdering::None) && matches!(self.mode, AggregateMode::Partial) - && (self.update_memory_reservation().is_err() || self.spill_state.force_spill) + && self.update_memory_reservation().is_err() { let n = self.group_values.len() / self.batch_size * self.batch_size; let batch = self.emit(EmitTo::First(n), false)?; From b636a5e360ae109ad30a7e5d047fca61722ecd23 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Tue, 12 Sep 2023 02:18:04 -0700 Subject: [PATCH 11/16] address review comments --- datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index c9345865abac..a0e9a50a22ae 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -31,8 +31,6 @@ use rand::{Rng, SeedableRng}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::{collect, displayable, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; -use datafusion_execution::memory_pool::GreedyMemoryPool; -use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_physical_expr::expressions::{col, Sum}; use datafusion_physical_expr::{AggregateExpr, PhysicalSortExpr}; use test_utils::add_empty_batches; From a7f4fee1ad5662168e0b0a3836a2982eb95893bd Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Tue, 12 Sep 2023 03:24:14 -0700 Subject: [PATCH 12/16] a From ff53e06096715b54b15b50f1b3662a6dcf4dec68 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Wed, 13 Sep 2023 15:44:17 -0700 Subject: [PATCH 13/16] address review comments --- .../aggregates/group_values/mod.rs | 2 +- .../src/physical_plan/aggregates/row_hash.rs | 60 +++++-------------- 2 files changed, 15 insertions(+), 47 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs index dcbe83cb849f..67410bdfd6d7 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs @@ -44,7 +44,7 @@ pub trait GroupValues: Send { /// Emits the group values fn emit(&mut self, emit_to: EmitTo) -> Result>; - /// clear the contents and shrink the capacity + /// clear the contents and shrink the capacity to free up memory usage fn clear_shrink(&mut self, batch: &RecordBatch); } diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index b9156dc9ef37..8aa38cc6bada 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -39,7 +39,7 @@ use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput}; use crate::physical_plan::sorts::sort::{read_spill_as_stream, sort_batch}; use crate::physical_plan::sorts::streaming_merge; use crate::physical_plan::stream::RecordBatchStreamAdapter; -use crate::physical_plan::{aggregates, EmptyRecordBatchStream, PhysicalExpr}; +use crate::physical_plan::{aggregates, PhysicalExpr}; use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::*; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; @@ -59,9 +59,6 @@ pub(crate) enum ExecutionState { /// When 
producing output, the remaining rows to output are stored /// here and are sliced off as needed in batch_size chunks ProducingOutput(RecordBatch), - /// When producing output of spilled data, the merged spilled data stream - /// is sliced off as needed in batch_size chunks - ProducingSpillOutput, Done, } @@ -80,8 +77,8 @@ struct SpillState { /// Schema for spilling batches spill_schema: SchemaRef, - /// Stream of merged outputs after spilling - merged_stream: SendableRecordBatchStream, + /// true when streaming merge is in progress + is_stream_merging: bool, /// aggregate_arguments for merging spilled data merging_aggregate_arguments: Vec>>, @@ -351,7 +348,7 @@ impl GroupedHashAggregateStream { spills: vec![], spill_expr, spill_schema: agg_schema.clone(), - merged_stream: Box::pin(EmptyRecordBatchStream::new(agg_schema.clone())), + is_stream_merging: false, merging_aggregate_arguments, merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()), }; @@ -429,7 +426,7 @@ impl Stream for GroupedHashAggregateStream { extract_ok!(self.spill_previous_if_necessary(&batch)); // Do the grouping - extract_ok!(self.group_aggregate_batch(batch, false)); + extract_ok!(self.group_aggregate_batch(batch)); // If we can begin emitting rows, do so, // otherwise keep consuming input @@ -459,7 +456,7 @@ impl Stream for GroupedHashAggregateStream { } else { // If spill files exist, stream-merge them. extract_ok!(self.update_merged_stream()); - self.exec_state = ExecutionState::ProducingSpillOutput; + self.exec_state = ExecutionState::ReadingInput; } timer.done(); } @@ -487,37 +484,6 @@ impl Stream for GroupedHashAggregateStream { ))); } - ExecutionState::ProducingSpillOutput => { - match ready!(self.spill_state.merged_stream.poll_next_unpin(cx)) { - Some(batch_result) => { - // Re-group the stream-merged results. - extract_ok!(self.group_aggregate_batch(batch_result?, true)); - // Output first batch_size rows. - if let Some(EmitTo::First(n)) = self.group_ordering.emit_to() - { - let batch_size = self.batch_size; - if n >= batch_size { - let batch = extract_ok!( - self.emit(EmitTo::First(batch_size), false) - ); - return Poll::Ready(Some(Ok( - batch.record_output(&self.baseline_metrics) - ))); - } - } - } - None => { - // The streaming merge is done. Output the remaining. - let batch = extract_ok!(self.emit(EmitTo::All, false)); - self.exec_state = if batch.num_rows() > 0 { - ExecutionState::ProducingOutput(batch) - } else { - ExecutionState::Done - } - } - } - } - ExecutionState::Done => { // release the memory reservation since sending back output batch itself needs // some memory reservation, so make some room for it. @@ -538,23 +504,23 @@ impl RecordBatchStream for GroupedHashAggregateStream { impl GroupedHashAggregateStream { /// Perform group-by aggregation for the given [`RecordBatch`]. - fn group_aggregate_batch(&mut self, batch: RecordBatch, merging: bool) -> Result<()> { + fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> { // Evaluate the grouping expressions - let group_by_values = if merging { + let group_by_values = if self.spill_state.is_stream_merging { evaluate_group_by(&self.spill_state.merging_group_by, &batch)? } else { evaluate_group_by(&self.group_by, &batch)? }; // Evaluate the aggregation expressions. - let input_values = if merging { + let input_values = if self.spill_state.is_stream_merging { evaluate_many(&self.spill_state.merging_aggregate_arguments, &batch)? } else { evaluate_many(&self.aggregate_arguments, &batch)? 
}; // Evaluate the filter expressions, if any, against the inputs - let filter_values = if merging { + let filter_values = if self.spill_state.is_stream_merging { let filter_expressions = vec![None; self.accumulators.len()]; evaluate_optional(&filter_expressions, &batch)? } else { @@ -594,7 +560,7 @@ impl GroupedHashAggregateStream { AggregateMode::Partial | AggregateMode::Single | AggregateMode::SinglePartitioned - if !merging => + if !self.spill_state.is_stream_merging => { acc.update_batch( values, @@ -762,7 +728,8 @@ impl GroupedHashAggregateStream { let stream = read_spill_as_stream(spill, schema.clone())?; streams.push(stream); } - self.spill_state.merged_stream = streaming_merge( + self.spill_state.is_stream_merging = true; + self.input = streaming_merge( streams, schema, &self.spill_state.spill_expr, @@ -771,6 +738,7 @@ impl GroupedHashAggregateStream { None, self.reservation.new_empty(), )?; + self.input_done = false; self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new()); Ok(()) } From 2ead8b3c8f87e4ad21dd88e05345ea3f5d2e28b6 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Wed, 13 Sep 2023 16:32:51 -0700 Subject: [PATCH 14/16] address review comments --- datafusion/core/src/physical_plan/aggregates/row_hash.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index d3848bb7a24d..94906086eced 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -653,6 +653,7 @@ impl GroupedHashAggregateStream { && batch.num_rows() > 0 && matches!(self.group_ordering, GroupOrdering::None) && !matches!(self.mode, AggregateMode::Partial) + && !self.spill_state.is_stream_merging && self.update_memory_reservation().is_err() { // Use input batch (Partial mode) schema for spilling because From 9a65b2753e8f76f1736923d1afe801c637f13518 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Thu, 14 Sep 2023 13:22:25 -0700 Subject: [PATCH 15/16] address review comments --- .../src/physical_plan/aggregates/group_values/mod.rs | 2 +- .../core/src/physical_plan/aggregates/row_hash.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs index 8781bd1dde45..cafa385eac39 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs @@ -44,7 +44,7 @@ pub trait GroupValues: Send { /// Emits the group values fn emit(&mut self, emit_to: EmitTo) -> Result>; - /// clear the contents and shrink the capacity to free up memory usage + /// Clear the contents and shrink the capacity to the size of the batch (free up memory usage) fn clear_shrink(&mut self, batch: &RecordBatch); } diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index 931926a277f3..2afa08129841 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -155,8 +155,8 @@ struct SpillState { /// # Spilling /// /// The sizes of group values and accumulators can become large. Before that causes out of memory, -/// this hash aggregator outputs those data early for partial aggregation or spills to local disk -/// using Arrow IPC format for final aggregation. 
For every input [`RecordBatch`], the memory
+/// this hash aggregator outputs partial states early for partial aggregation or spills to local
+/// disk using Arrow IPC format for final aggregation. For every input [`RecordBatch`], the memory
 /// manager checks whether the new input size meets the memory configuration. If not, outputting or
 /// spilling happens. For outputting, the final aggregation takes care of re-grouping. For spilling,
 /// later stream-merge sort on reading back the spilled data does re-grouping. Note the rows cannot
@@ -180,7 +180,7 @@ struct SpillState {
 /// │ 1 │ 1.0 │ ─▶ │ 1 │ 1 │ 1.0 │ ─▶ early emit ─▶ ┌─────────────────┐
 /// │ 3 │ 2.0 │    │ 3 │ 1 │ 2.0 │                  │ 3 │ 1 │ 2.0 │
 /// └─────────┘    └─────────────────┘              └─────────────────┘
-/// 
+///
 ///
 /// Final Aggregation [batch_size = 2] (max memory = 3 rows)
 ///
@@ -692,8 +692,8 @@ impl GroupedHashAggregateStream {
     }
 
     /// Emit if the used memory exceeds the target for partial aggregation.
-    /// Currently only [`GroupOrdering::None`] is supported for spilling.
-    /// TODO: support group_ordering for spilling
+    /// Currently only [`GroupOrdering::None`] is supported for early emitting.
+    /// TODO: support group_ordering for early emitting
     fn emit_early_if_necessary(&mut self) -> Result<()> {
         if self.group_values.len() >= self.batch_size
             && matches!(self.group_ordering, GroupOrdering::None)

From c8b4a10989f731a345ec6d1f590c99fbbdcc3663 Mon Sep 17 00:00:00 2001
From: Kazuyuki Tanimura
Date: Thu, 14 Sep 2023 15:36:32 -0700
Subject: [PATCH 16/16] address review comments

---
 datafusion/core/src/physical_plan/aggregates/row_hash.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index 2afa08129841..eef25c1dc214 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -160,7 +160,8 @@ struct SpillState {
 /// manager checks whether the new input size meets the memory configuration. If not, outputting or
 /// spilling happens. For outputting, the final aggregation takes care of re-grouping. For spilling,
 /// later stream-merge sort on reading back the spilled data does re-grouping. Note the rows cannot
-/// be grouped once spilled onto disk, the read back data needs to be re-grouped again.
+/// be grouped once spilled onto disk; the read-back data needs to be re-grouped again. In addition,
+/// re-grouping may run out of memory again, so re-grouping has to be a sort-based aggregation.
 ///
 /// ```text
 /// Partial Aggregation [batch_size = 2] (max memory = 3 rows)
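Taken together, the flow documented above can be exercised end to end by giving the runtime a small memory budget and running an ordinary `GROUP BY`. A sketch under assumed inputs (the file name, memory limit, and batch size are placeholders, not tested thresholds, and the session-construction API names reflect DataFusion as of this series):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};

#[tokio::main]
async fn main() -> Result<()> {
    // Small memory budget: once the hash table of group states outgrows it,
    // the partial aggregation emits early and the final aggregation spills
    // sorted runs to disk, then stream-merges them back for re-grouping.
    let runtime = Arc::new(RuntimeEnv::new(
        RuntimeConfig::default().with_memory_limit(16 * 1024, 1.0),
    )?);
    let ctx = SessionContext::with_config_rt(
        SessionConfig::new().with_batch_size(64),
        runtime,
    );

    ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;
    let df = ctx.sql("SELECT a, SUM(b) FROM t GROUP BY a").await?;
    df.show().await?; // same result as an unconstrained run, just spilled
    Ok(())
}
```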