handle empty results, empty indices, add tests
Commit 9fff26c · 1 parent 691245b
PSeitz committed Mar 17, 2022
Showing 8 changed files with 496 additions and 95 deletions.
49 changes: 49 additions & 0 deletions src/aggregation/agg_req.rs
@@ -51,13 +51,62 @@ use serde::{Deserialize, Serialize};
 use super::bucket::HistogramAggregation;
 pub use super::bucket::RangeAggregation;
 use super::metric::{AverageAggregation, StatsAggregation};
+use super::VecWithNames;

 /// The top-level aggregation request structure, which contains [Aggregation] and their user defined
 /// names. It is also used in [buckets](BucketAggregation) to define sub-aggregations.
 ///
 /// The key is the user defined name of the aggregation.
 pub type Aggregations = HashMap<String, Aggregation>;

+/// Like Aggregations, but optimized to work with the aggregation result.
+#[derive(Clone, Debug)]
+pub(crate) struct CollectorAggregations {
+    pub(crate) metrics: VecWithNames<MetricAggregation>,
+    pub(crate) buckets: VecWithNames<CollectorBucketAggregation>,
+}
+
+impl From<Aggregations> for CollectorAggregations {
+    fn from(aggs: Aggregations) -> Self {
+        let mut metrics = vec![];
+        let mut buckets = vec![];
+        for (key, agg) in aggs {
+            match agg {
+                Aggregation::Bucket(bucket) => buckets.push((
+                    key,
+                    CollectorBucketAggregation {
+                        bucket_agg: bucket.bucket_agg,
+                        sub_aggregation: bucket.sub_aggregation.into(),
+                    },
+                )),
+                Aggregation::Metric(metric) => metrics.push((key, metric)),
+            }
+        }
+        Self {
+            metrics: VecWithNames::from_entries(metrics),
+            buckets: VecWithNames::from_entries(buckets),
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct CollectorBucketAggregation {
+    /// Bucket aggregation strategy to group documents.
+    pub bucket_agg: BucketAggregationType,
+    /// The sub_aggregations in the buckets. Each bucket will aggregate on the document set in the
+    /// bucket.
+    pub sub_aggregation: CollectorAggregations,
+}
+
+impl CollectorBucketAggregation {
+    pub(crate) fn as_histogram(&self) -> &HistogramAggregation {
+        match &self.bucket_agg {
+            BucketAggregationType::Range(_) => panic!("unexpected aggregation"),
+            BucketAggregationType::Histogram(histogram) => histogram,
+        }
+    }
+}
+
 /// Extract all fast field names used in the tree.
 pub fn get_fast_field_names(aggs: &Aggregations) -> HashSet<String> {
     let mut fast_field_names = Default::default();
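
The conversion above flattens the user-facing request `HashMap` into two name-preserving vectors, one for metrics and one for buckets, giving a stable iteration order so that intermediate results can later be matched back to their requests by position. Below is a minimal, self-contained sketch of that idea; `Aggregation` and `VecWithNames` here are simplified stand-ins, not tantivy's real definitions.

use std::collections::HashMap;

// Stand-in for tantivy's Aggregation enum.
#[derive(Clone, Debug)]
enum Aggregation {
    Metric(String),
    Bucket(String),
}

// Stand-in for tantivy's VecWithNames: names and values in parallel vectors.
#[derive(Debug)]
struct VecWithNames<T> {
    keys: Vec<String>,
    values: Vec<T>,
}

impl<T> VecWithNames<T> {
    fn from_entries(entries: Vec<(String, T)>) -> Self {
        let (keys, values) = entries.into_iter().unzip();
        VecWithNames { keys, values }
    }
}

fn main() {
    let mut request: HashMap<String, Aggregation> = HashMap::new();
    request.insert("avg_price".to_string(), Aggregation::Metric("avg".into()));
    request.insert("price_ranges".to_string(), Aggregation::Bucket("range".into()));

    // Split the request into metric and bucket halves, keeping the names.
    let mut metrics = vec![];
    let mut buckets = vec![];
    for (key, agg) in request {
        match agg {
            Aggregation::Metric(m) => metrics.push((key, m)),
            Aggregation::Bucket(b) => buckets.push((key, b)),
        }
    }
    let metrics = VecWithNames::from_entries(metrics);
    let buckets = VecWithNames::from_entries(buckets);
    println!("metrics: {:?}", metrics);
    println!("buckets: {:?}", buckets);
}

Splitting by aggregation kind up front also lets the result code below walk metrics and buckets separately without re-matching on the enum for every entry.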
121 changes: 85 additions & 36 deletions src/aggregation/agg_result.rs
@@ -10,6 +10,7 @@ use std::collections::HashMap;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};

+use super::agg_req::{Aggregations, CollectorAggregations, CollectorBucketAggregation};
 use super::bucket::generate_buckets;
 use super::intermediate_agg_result::{
     IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
@@ -22,21 +23,52 @@ use super::Key;
 /// The final aggregation result.
 pub struct AggregationResults(pub HashMap<String, AggregationResult>);

-impl From<IntermediateAggregationResults> for AggregationResults {
-    fn from(tree: IntermediateAggregationResults) -> Self {
-        Self(
-            tree.buckets
-                .unwrap_or_default()
-                .into_iter()
-                .map(|(key, bucket)| (key, AggregationResult::BucketResult(bucket.into())))
-                .chain(
-                    tree.metrics
-                        .unwrap_or_default()
-                        .into_iter()
-                        .map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))),
-                )
-                .collect(),
-        )
+impl From<(IntermediateAggregationResults, Aggregations)> for AggregationResults {
+    fn from(tree_and_req: (IntermediateAggregationResults, Aggregations)) -> Self {
+        let agg: CollectorAggregations = tree_and_req.1.into();
+        (tree_and_req.0, &agg).into()
+    }
+}
+
+impl From<(IntermediateAggregationResults, &CollectorAggregations)> for AggregationResults {
+    fn from(data: (IntermediateAggregationResults, &CollectorAggregations)) -> Self {
+        let tree = data.0;
+        let req = data.1;
+        let mut result = HashMap::default();
+
+        // Important assumption:
+        // When the tree contains buckets/metrics, we expect it to have all buckets/metrics from
+        // the request.
+        if let Some(buckets) = tree.buckets {
+            result.extend(buckets.into_iter().zip(req.buckets.values()).map(
+                |((key, bucket), req)| (key, AggregationResult::BucketResult((bucket, req).into())),
+            ));
+        } else {
+            result.extend(req.buckets.iter().map(|(key, req)| {
+                let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);
+                (
+                    key.to_string(),
+                    AggregationResult::BucketResult((empty_bucket, req).into()),
+                )
+            }));
+        }
+
+        if let Some(metrics) = tree.metrics {
+            result.extend(
+                metrics
+                    .into_iter()
+                    .map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))),
+            );
+        } else {
+            result.extend(req.metrics.iter().map(|(key, req)| {
+                let empty_metric = IntermediateMetricResult::empty_from_req(req);
+                (
+                    key.to_string(),
+                    AggregationResult::MetricResult(empty_metric.into()),
+                )
+            }));
+        }
+        Self(result)
     }
 }
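
This is the core of the empty-result handling: the final result is now driven by the request rather than by whatever the tree happens to contain, so a query against an empty index still yields one (empty) entry per requested aggregation via the `empty_from_req` constructors. A small self-contained sketch of the same strategy; `MetricResult` and `results_from_tree` are illustrative stand-ins, not tantivy's API.

use std::collections::HashMap;

#[derive(Debug)]
enum MetricResult {
    Average(Option<f64>), // None means no documents were seen.
}

fn results_from_tree(
    tree_metrics: Option<Vec<(String, f64)>>,
    requested: &[String],
) -> HashMap<String, MetricResult> {
    match tree_metrics {
        // Assumption (mirroring the code above): if the tree has metrics,
        // it has one entry per requested metric, in request order.
        Some(metrics) => metrics
            .into_iter()
            .map(|(key, avg)| (key, MetricResult::Average(Some(avg))))
            .collect(),
        // Empty tree: synthesize an empty result for every requested metric.
        None => requested
            .iter()
            .map(|key| (key.clone(), MetricResult::Average(None)))
            .collect(),
    }
}

fn main() {
    let requested = vec!["avg_price".to_string()];
    println!("{:?}", results_from_tree(None, &requested));
    println!(
        "{:?}",
        results_from_tree(Some(vec![("avg_price".into(), 9.5)]), &requested)
    );
}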

@@ -95,13 +127,15 @@ pub enum BucketResult {
     },
 }

-impl From<IntermediateBucketResult> for BucketResult {
-    fn from(result: IntermediateBucketResult) -> Self {
-        match result {
+impl From<(IntermediateBucketResult, &CollectorBucketAggregation)> for BucketResult {
+    fn from(result_and_req: (IntermediateBucketResult, &CollectorBucketAggregation)) -> Self {
+        let bucket_result = result_and_req.0;
+        let req = result_and_req.1;
+        match bucket_result {
             IntermediateBucketResult::Range(range_map) => {
                 let mut buckets: Vec<RangeBucketEntry> = range_map
                     .into_iter()
-                    .map(|(_, bucket)| bucket.into())
+                    .map(|(_, bucket)| (bucket, &req.sub_aggregation).into())
                     .collect_vec();

                 buckets.sort_by(|a, b| {
@@ -112,20 +146,26 @@
                 });
                 BucketResult::Range { buckets }
             }
-            IntermediateBucketResult::Histogram { buckets, req } => {
-                let buckets = if req.min_doc_count() == 0 {
+            IntermediateBucketResult::Histogram { buckets } => {
+                let histogram_req = req.as_histogram();
+                let buckets = if histogram_req.min_doc_count() == 0 {
                     // With min_doc_count == 0, we may need to add buckets, so that there are no
                     // gaps, since the intermediate result does not contain empty buckets
                     // (filtered to reduce serialization size).
-                    let fill_gaps_buckets = if buckets.len() > 1 {
-                        // buckets are sorted
-                        let min = buckets[0].key;
-                        let max = buckets[buckets.len() - 1].key;
-                        generate_buckets(&req, min, max)
-                    } else {
-                        vec![]
-                    };
+                    // buckets are sorted
+                    let (min, max) = if buckets.is_empty() {
+                        (f64::MAX, f64::MIN)
+                    } else {
+                        let min = buckets[0].key;
+                        let max = buckets[buckets.len() - 1].key;
+                        (min, max)
+                    };
+
+                    let fill_gaps_buckets = generate_buckets(histogram_req, min, max);
+
+                    let sub_aggregation =
+                        IntermediateAggregationResults::empty_from_req(&req.sub_aggregation);

                     buckets
                         .into_iter()
                         .merge_join_by(
@@ -138,21 +178,26 @@
                             },
                         )
                         .map(|either| match either {
-                            itertools::EitherOrBoth::Both(existing, _) => existing.into(),
-                            itertools::EitherOrBoth::Left(existing) => existing.into(),
+                            itertools::EitherOrBoth::Both(existing, _) => {
+                                (existing, &req.sub_aggregation).into()
+                            }
+                            itertools::EitherOrBoth::Left(existing) => {
+                                (existing, &req.sub_aggregation).into()
+                            }
                             // Add missing bucket
                             itertools::EitherOrBoth::Right(bucket) => BucketEntry {
                                 key: Key::F64(bucket),
                                 doc_count: 0,
-                                sub_aggregation: Default::default(),
+                                sub_aggregation: (sub_aggregation.clone(), &req.sub_aggregation)
+                                    .into(),
                             },
                         })
                         .collect_vec()
                 } else {
                     buckets
                         .into_iter()
-                        .filter(|bucket| bucket.doc_count >= req.min_doc_count())
-                        .map(|bucket| bucket.into())
+                        .filter(|bucket| bucket.doc_count >= histogram_req.min_doc_count())
+                        .map(|bucket| (bucket, &req.sub_aggregation).into())
                         .collect_vec()
                 };
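
The gap-filling merges two sorted sequences with `itertools::merge_join_by`: the sparse buckets from the intermediate result and the dense key range the histogram should cover; keys present only on the right side become empty buckets. The inverted `(f64::MAX, f64::MIN)` pair evidently makes `generate_buckets` produce no fill buckets when there are no existing buckets at all. Below is a runnable sketch of the merge step using the real itertools API but simplified types; `Bucket`, `fill_gaps`, and the key-generation loop are illustrative stand-ins, not tantivy's `generate_buckets`.

use itertools::Itertools; // itertools = "0.10"

#[derive(Debug)]
struct Bucket {
    key: f64,
    doc_count: u64,
}

fn fill_gaps(existing: Vec<Bucket>, interval: f64, min: f64, max: f64) -> Vec<Bucket> {
    // All keys the histogram should contain, akin to generate_buckets().
    let mut all_keys = vec![];
    let mut key = (min / interval).floor() * interval;
    while key <= max {
        all_keys.push(key);
        key += interval;
    }

    existing
        .into_iter()
        .merge_join_by(all_keys, |bucket, key| bucket.key.total_cmp(key))
        .map(|either| match either {
            itertools::EitherOrBoth::Both(bucket, _) => bucket,
            itertools::EitherOrBoth::Left(bucket) => bucket,
            // Key missing from the sparse side: add an empty bucket.
            itertools::EitherOrBoth::Right(key) => Bucket { key, doc_count: 0 },
        })
        .collect_vec()
}

fn main() {
    let sparse = vec![
        Bucket { key: 0.0, doc_count: 2 },
        Bucket { key: 30.0, doc_count: 1 },
    ];
    for b in fill_gaps(sparse, 10.0, 0.0, 30.0) {
        println!("{:?}", b); // keys 0, 10, 20, 30; the gaps get doc_count 0
    }
}

Both inputs to `merge_join_by` must already be sorted, which holds here because intermediate histogram buckets are sorted by key and the generated range is ascending.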

Expand Down Expand Up @@ -199,12 +244,14 @@ pub struct BucketEntry {
pub sub_aggregation: AggregationResults,
}

impl From<IntermediateHistogramBucketEntry> for BucketEntry {
fn from(entry: IntermediateHistogramBucketEntry) -> Self {
impl From<(IntermediateHistogramBucketEntry, &CollectorAggregations)> for BucketEntry {
fn from(entry_and_req: (IntermediateHistogramBucketEntry, &CollectorAggregations)) -> Self {
let entry = entry_and_req.0;
let req = entry_and_req.1;
BucketEntry {
key: Key::F64(entry.key),
doc_count: entry.doc_count,
sub_aggregation: entry.sub_aggregation.into(),
sub_aggregation: (entry.sub_aggregation, req).into(),
}
}
}
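
These conversions all share one shape: Rust's `From` takes a single source value, so the request context is threaded through by implementing `From` on a `(value, &context)` tuple. A minimal sketch of the pattern with hypothetical types:

struct Intermediate {
    count: u64,
}

struct Context {
    name: &'static str,
}

struct Final {
    label: String,
    count: u64,
}

// Pair the value with borrowed context so .into() can use both.
impl From<(Intermediate, &Context)> for Final {
    fn from(value_and_ctx: (Intermediate, &Context)) -> Self {
        let (value, ctx) = value_and_ctx;
        Final {
            label: ctx.name.to_string(),
            count: value.count,
        }
    }
}

fn main() {
    let ctx = Context { name: "histogram" };
    let f: Final = (Intermediate { count: 3 }, &ctx).into();
    println!("{} -> {}", f.label, f.count);
}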
@@ -256,12 +303,14 @@ pub struct RangeBucketEntry {
     pub to: Option<f64>,
 }

-impl From<IntermediateRangeBucketEntry> for RangeBucketEntry {
-    fn from(entry: IntermediateRangeBucketEntry) -> Self {
+impl From<(IntermediateRangeBucketEntry, &CollectorAggregations)> for RangeBucketEntry {
+    fn from(entry_and_req: (IntermediateRangeBucketEntry, &CollectorAggregations)) -> Self {
+        let entry = entry_and_req.0;
+        let req = entry_and_req.1;
         RangeBucketEntry {
             key: entry.key,
             doc_count: entry.doc_count,
-            sub_aggregation: entry.sub_aggregation.into(),
+            sub_aggregation: (entry.sub_aggregation, req).into(),
             to: entry.to,
             from: entry.from,
         }
(6 more changed files not shown.)
