From 3e237cf825b63ed52e212ffd004803c794244bd6 Mon Sep 17 00:00:00 2001 From: "Christopher M. Wolff" Date: Tue, 20 Apr 2021 07:32:33 -0700 Subject: [PATCH 1/2] build(flux): update flux to v0.113.0 --- go.mod | 2 +- go.sum | 7 ++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index d763d8bc7e8..1982402e166 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/golang/mock v1.4.3 github.com/golang/snappy v0.0.1 github.com/google/go-cmp v0.5.0 - github.com/influxdata/flux v0.112.1 + github.com/influxdata/flux v0.113.0 github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93 github.com/influxdata/pkg-config v0.2.7 diff --git a/go.sum b/go.sum index 8a8ff43431e..f77b99d4f7e 100644 --- a/go.sum +++ b/go.sum @@ -446,12 +446,9 @@ github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/influxdata/flux v0.65.0 h1:57tk1Oo4gpGIDbV12vUAPCMtLtThhaXzub1XRIuqv6A= github.com/influxdata/flux v0.65.0/go.mod h1:BwN2XG2lMszOoquQaFdPET8FRQfrXiZsWmcMO9rkaVY= -github.com/influxdata/flux v0.111.0 h1:27CNz0SbEofD9NzdwcdxRwGmuVSDSisVq4dOceB/KF0= -github.com/influxdata/flux v0.111.0/go.mod h1:3TJtvbm/Kwuo5/PEo5P6HUzwVg4bXWkb2wPQHPtQdlU= -github.com/influxdata/flux v0.112.1 h1:N9kRbSx0AdGDkjH5PyoFczfCOenNsfxYVFhLFcEAOWQ= -github.com/influxdata/flux v0.112.1/go.mod h1:3TJtvbm/Kwuo5/PEo5P6HUzwVg4bXWkb2wPQHPtQdlU= +github.com/influxdata/flux v0.113.0 h1:QoQ9ggVRZeMK5u4FUzYLHPa3QKu435abMp/Ejdse6LY= +github.com/influxdata/flux v0.113.0/go.mod h1:3TJtvbm/Kwuo5/PEo5P6HUzwVg4bXWkb2wPQHPtQdlU= github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU= github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA= github.com/influxdata/influxdb v1.8.0/go.mod h1:SIzcnsjaHRFpmlxpJ4S3NT64qtEKYweNTUMb/vh0OMQ= From 458a7ec9790e0564b400d3ba3256bbfb7ebca9ea Mon Sep 17 00:00:00 2001 From: Faith Chikwekwe Date: Tue, 20 Apr 2021 12:56:43 -0700 Subject: [PATCH 2/2] feat(query): enable min/max pushdown * feat(query): enable min/max pushdown * fix(query): fix the group last pushdown to use descending cursors * test(storage): add read group test with no agg Co-authored-by: Jonathan A. 
Sternberg --- .../influxdb/min_max_influxdb_test.flux | 61 ++ flux/stdlib/influxdata/influxdb/rules.go | 118 +++ services/storage/store.go | 6 +- storage/flux/table.gen.go | 847 ++++++++++++------ storage/flux/table.gen.go.tmpl | 210 +++-- storage/flux/table.go | 13 +- storage/flux/table_test.go | 106 ++- storage/reads/group_resultset.go | 10 +- test-flux.sh | 6 +- 9 files changed, 1038 insertions(+), 339 deletions(-) create mode 100644 flux/stdlib/influxdata/influxdb/min_max_influxdb_test.flux diff --git a/flux/stdlib/influxdata/influxdb/min_max_influxdb_test.flux b/flux/stdlib/influxdata/influxdb/min_max_influxdb_test.flux new file mode 100644 index 00000000000..820e81e5a13 --- /dev/null +++ b/flux/stdlib/influxdata/influxdb/min_max_influxdb_test.flux @@ -0,0 +1,61 @@ +package influxdb_test + +import "testing/expect" + +option now = () => (2030-01-01T00:00:00Z) + +testcase push_down_min_bare extends "flux/planner/group_min_test" { + expect.planner(rules: [ + "PushDownGroupAggregateRule": 1, + ]) + group_min_test.group_min_bare() +} + +testcase push_down_min_bare_host extends "flux/planner/group_min_test" { + expect.planner(rules: [ + "PushDownGroupAggregateRule": 1, + ]) + group_min_test.group_min_bare_host() +} + +testcase push_down_min_bare_field extends "flux/planner/group_min_test" { + expect.planner(rules: [ + "PushDownGroupAggregateRule": 1, + ]) + group_min_test.group_min_bare_field() +} + +testcase push_down_max_bare extends "flux/planner/group_max_test" { + expect.planner(rules: [ + "PushDownGroupAggregateRule": 1, + ]) + group_max_test.group_max_bare() +} + +testcase push_down_max_bare_host extends "flux/planner/group_max_test" { + expect.planner(rules: [ + "PushDownGroupAggregateRule": 1, + ]) + group_max_test.group_max_bare_host() +} + +testcase push_down_max_bare_field extends "flux/planner/group_max_test" { + expect.planner(rules: [ + "PushDownGroupAggregateRule": 1, + ]) + group_max_test.group_max_bare_field() +} + +testcase push_down_table_test_min extends "flux/planner/group_min_max_table_test" { + expect.planner(rules: [ + "PushDownGroupAggregateRule": 1, + ]) + group_min_max_table_test.group_min_table() +} + +testcase push_down_table_test_max extends "flux/planner/group_min_max_table_test" { + expect.planner(rules: [ + "PushDownGroupAggregateRule": 1, + ]) + group_min_max_table_test.group_max_table() +} diff --git a/flux/stdlib/influxdata/influxdb/rules.go b/flux/stdlib/influxdata/influxdb/rules.go index bcf1dbe2aa6..1e4555b2d4f 100644 --- a/flux/stdlib/influxdata/influxdb/rules.go +++ b/flux/stdlib/influxdata/influxdb/rules.go @@ -30,6 +30,7 @@ func init() { PushDownWindowAggregateRule{}, PushDownWindowAggregateByTimeRule{}, PushDownBareAggregateRule{}, + PushDownGroupAggregateRule{}, ) plan.RegisterLogicalRules( universe.MergeFiltersRule{}, @@ -874,6 +875,123 @@ func (p PushDownBareAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p }), true, nil } +// +// Push Down of group aggregates. 
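+// A grouped-aggregate query (a sketch; the bucket and tag names here are
+// assumed, not taken from this patch), e.g.
+//   from(bucket: "b") |> range(start: -1h) |> group(columns: ["host"]) |> count()
+// plans to the physical pattern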
+// ReadGroupPhys |> { count } +// +type PushDownGroupAggregateRule struct{} + +func (PushDownGroupAggregateRule) Name() string { + return "PushDownGroupAggregateRule" +} + +func (rule PushDownGroupAggregateRule) Pattern() plan.Pattern { + return plan.OneOf( + []plan.ProcedureKind{ + universe.CountKind, + universe.SumKind, + universe.FirstKind, + universe.LastKind, + universe.MinKind, + universe.MaxKind, + }, + plan.Pat(ReadGroupPhysKind)) +} + +func (PushDownGroupAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) { + group := pn.Predecessors()[0].ProcedureSpec().(*ReadGroupPhysSpec) + // Cannot push down multiple aggregates + if len(group.AggregateMethod) > 0 { + return pn, false, nil + } + + if !canPushGroupedAggregate(ctx, pn) { + return pn, false, nil + } + + switch pn.Kind() { + case universe.CountKind: + // ReadGroup() -> count => ReadGroup(count) + node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{ + ReadRangePhysSpec: group.ReadRangePhysSpec, + GroupMode: group.GroupMode, + GroupKeys: group.GroupKeys, + AggregateMethod: universe.CountKind, + }) + return node, true, nil + case universe.SumKind: + // ReadGroup() -> sum => ReadGroup(sum) + node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{ + ReadRangePhysSpec: group.ReadRangePhysSpec, + GroupMode: group.GroupMode, + GroupKeys: group.GroupKeys, + AggregateMethod: universe.SumKind, + }) + return node, true, nil + case universe.FirstKind: + // ReadGroup() -> first => ReadGroup(first) + node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{ + ReadRangePhysSpec: group.ReadRangePhysSpec, + GroupMode: group.GroupMode, + GroupKeys: group.GroupKeys, + AggregateMethod: universe.FirstKind, + }) + return node, true, nil + case universe.LastKind: + // ReadGroup() -> last => ReadGroup(last) + node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{ + ReadRangePhysSpec: group.ReadRangePhysSpec, + GroupMode: group.GroupMode, + GroupKeys: group.GroupKeys, + AggregateMethod: universe.LastKind, + }) + return node, true, nil + case universe.MinKind: + // ReadGroup() -> min => ReadGroup(min) + node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{ + ReadRangePhysSpec: group.ReadRangePhysSpec, + GroupMode: group.GroupMode, + GroupKeys: group.GroupKeys, + AggregateMethod: universe.MinKind, + }) + return node, true, nil + case universe.MaxKind: + // ReadGroup() -> max => ReadGroup(max) + node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{ + ReadRangePhysSpec: group.ReadRangePhysSpec, + GroupMode: group.GroupMode, + GroupKeys: group.GroupKeys, + AggregateMethod: universe.MaxKind, + }) + return node, true, nil + } + return pn, false, nil +} + +func canPushGroupedAggregate(ctx context.Context, pn plan.Node) bool { + switch pn.Kind() { + case universe.CountKind: + agg := pn.ProcedureSpec().(*universe.CountProcedureSpec) + return len(agg.Columns) == 1 && agg.Columns[0] == execute.DefaultValueColLabel + case universe.SumKind: + agg := pn.ProcedureSpec().(*universe.SumProcedureSpec) + return len(agg.Columns) == 1 && agg.Columns[0] == execute.DefaultValueColLabel + case universe.FirstKind: + agg := pn.ProcedureSpec().(*universe.FirstProcedureSpec) + return agg.Column == execute.DefaultValueColLabel + case universe.LastKind: + agg := pn.ProcedureSpec().(*universe.LastProcedureSpec) + return agg.Column == execute.DefaultValueColLabel + case 
universe.MaxKind: + agg := pn.ProcedureSpec().(*universe.MaxProcedureSpec) + return agg.Column == execute.DefaultValueColLabel + case universe.MinKind: + agg := pn.ProcedureSpec().(*universe.MinProcedureSpec) + return agg.Column == execute.DefaultValueColLabel + } + return false +} + func asSchemaMutationProcedureSpec(spec plan.ProcedureSpec) *universe.SchemaMutationProcedureSpec { if s, ok := spec.(*universe.DualImplProcedureSpec); ok { spec = s.ProcedureSpec diff --git a/services/storage/store.go b/services/storage/store.go index 4217253c4ed..5821c8dc1bd 100644 --- a/services/storage/store.go +++ b/services/storage/store.go @@ -159,7 +159,11 @@ func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) return nil, err } - shardIDs, err := s.findShardIDs(database, rp, false, start, end) + // Due to some optimizations around how flux's `last()` function is implemented with the + // storage engine, we need to detect if the read request requires a descending + // cursor or not. + descending := !reads.IsAscendingGroupAggregate(req) + shardIDs, err := s.findShardIDs(database, rp, descending, start, end) if err != nil { return nil, err } diff --git a/storage/flux/table.gen.go b/storage/flux/table.gen.go index 7b969c1542c..4302bdd8923 100644 --- a/storage/flux/table.gen.go +++ b/storage/flux/table.gen.go @@ -737,20 +737,17 @@ func (t *floatGroupTable) advance() bool { return true } - aggregate, err := determineFloatAggregateMethod(t.gc.Aggregate().Type) + aggregate, err := makeFloatAggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps, values := []int64{ts}, []float64{v} + aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps = append(timestamps, ts) - values = append(values, v) + aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } @@ -758,7 +755,7 @@ func (t *floatGroupTable) advance() bool { break } } - timestamp, value := aggregate(timestamps, values) + timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { @@ -767,23 +764,116 @@ func (t *floatGroupTable) advance() bool { } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]float64{value}) } - t.appendTags(colReader) + t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } -type floatAggregateMethod func([]int64, []float64) (int64, float64) +type FloatAggregateAccumulator interface { + // AccumulateFirst receives an initial array of items to select from. + // It selects an item and stores the state. Afterwards, more data can + // be supplied with AccumulateMore and the results can be requested at + // any time. Without a call to AccumulateFirst the results are not + // defined. + AccumulateFirst(timestamps []int64, values []float64, tags [][]byte) -// determineFloatAggregateMethod returns the method for aggregating -// returned points within the same group. The incoming points are the -// ones returned for each series and the method returned here will + // AccumulateMore receives additional array elements to select from. + AccumulateMore(timestamps []int64, values []float64, tags [][]byte) + + // Result returns the item selected from the data received so far. 
+ Result() (int64, float64, [][]byte) +} + +// The selector method takes a ( timestamp, value ) pair, a +// ( []timestamp, []value ) pair, and a starting index. It applies the selector +// to the single value and the array, starting at the supplied index. It +// returns -1 if the single value is selected and a non-negative value if an +// item from the array is selected. +type floatSelectorMethod func(int64, float64, []int64, []float64, int) int + +// The selector accumulator tracks currently-selected item. +type floatSelectorAccumulator struct { + selector floatSelectorMethod + + ts int64 + v float64 + tags [][]byte +} + +func (a *floatSelectorAccumulator) AccumulateFirst(timestamps []int64, values []float64, tags [][]byte) { + index := a.selector(timestamps[0], values[0], timestamps, values, 1) + if index < 0 { + a.ts = timestamps[0] + a.v = values[0] + } else { + a.ts = timestamps[index] + a.v = values[index] + } + a.tags = make([][]byte, len(tags)) + copy(a.tags, tags) +} + +func (a *floatSelectorAccumulator) AccumulateMore(timestamps []int64, values []float64, tags [][]byte) { + index := a.selector(a.ts, a.v, timestamps, values, 0) + if index >= 0 { + a.ts = timestamps[index] + a.v = values[index] + + if len(tags) > cap(a.tags) { + a.tags = make([][]byte, len(tags)) + } else { + a.tags = a.tags[:len(tags)] + } + copy(a.tags, tags) + } +} + +func (a *floatSelectorAccumulator) Result() (int64, float64, [][]byte) { + return a.ts, a.v, a.tags +} + +// The aggregate method takes a value, an array of values, and a starting +// index, applies an aggregate operation over the value and the array, starting +// at the given index, and returns the result. +type floatAggregateMethod func(float64, []float64, int) float64 + +type floatAggregateAccumulator struct { + aggregate floatAggregateMethod + accum float64 + + // For pure aggregates it doesn't matter what we return for tags, but + // we need to satisfy the interface. We will just return the most + // recently seen tags. + tags [][]byte +} + +func (a *floatAggregateAccumulator) AccumulateFirst(timestamps []int64, values []float64, tags [][]byte) { + a.accum = a.aggregate(values[0], values, 1) + a.tags = tags +} + +func (a *floatAggregateAccumulator) AccumulateMore(timestamps []int64, values []float64, tags [][]byte) { + a.accum = a.aggregate(a.accum, values, 0) + a.tags = tags +} + +// For group aggregates (non-selectors), the timestamp is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be +// anything and it won't matter. +func (a *floatAggregateAccumulator) Result() (int64, float64, [][]byte) { + return math.MaxInt64, a.accum, a.tags +} + +// makeFloatAggregateAccumulator returns the interface implementation for +// aggregating returned points within the same group. The incoming points are +// the ones returned for each series and the struct returned here will // aggregate the aggregates. 
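+// A minimal usage sketch (not part of this patch; it mirrors the loop in
+// advance() above, with the batch variables and tags assumed for illustration):
+//
+//	acc, err := makeFloatAggregateAccumulator(datatypes.AggregateTypeMin)
+//	if err != nil { /* aggregate not supported for float columns */ }
+//	acc.AccumulateFirst(arr1.Timestamps, arr1.Values, tags) // first batch seeds the state
+//	acc.AccumulateMore(arr2.Timestamps, arr2.Values, tags)  // later batches refine it
+//	ts, v, selTags := acc.Result()                          // the min value plus its time and tags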
-func determineFloatAggregateMethod(agg datatypes.Aggregate_AggregateType) (floatAggregateMethod, error) { +func makeFloatAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (FloatAggregateAccumulator, error) { switch agg { case datatypes.AggregateTypeFirst: - return aggregateFirstGroupsFloat, nil + return &floatSelectorAccumulator{selector: selectorFirstGroupsFloat}, nil case datatypes.AggregateTypeLast: - return aggregateLastGroupsFloat, nil + return &floatSelectorAccumulator{selector: selectorLastGroupsFloat}, nil case datatypes.AggregateTypeCount: return nil, &errors.Error{ @@ -793,15 +883,15 @@ func determineFloatAggregateMethod(agg datatypes.Aggregate_AggregateType) (float case datatypes.AggregateTypeSum: - return aggregateSumGroupsFloat, nil + return &floatAggregateAccumulator{aggregate: aggregateSumGroupsFloat}, nil case datatypes.AggregateTypeMin: - return aggregateMinGroupsFloat, nil + return &floatSelectorAccumulator{selector: selectorMinGroupsFloat}, nil case datatypes.AggregateTypeMax: - return aggregateMaxGroupsFloat, nil + return &floatSelectorAccumulator{selector: selectorMaxGroupsFloat}, nil default: return nil, &errors.Error{ @@ -811,72 +901,63 @@ func determineFloatAggregateMethod(agg datatypes.Aggregate_AggregateType) (float } } -func aggregateMinGroupsFloat(timestamps []int64, values []float64) (int64, float64) { - value := values[0] - timestamp := timestamps[0] +func selectorMinGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if value > values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v > values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } -func aggregateMaxGroupsFloat(timestamps []int64, values []float64) (int64, float64) { - value := values[0] - timestamp := timestamps[0] +func selectorMaxGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if value < values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v < values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } -// For group count and sum, the timestamp here is always math.MaxInt64. -// their final result does not contain _time, so this timestamp value can be anything -// and it won't matter. 
- -func aggregateSumGroupsFloat(_ []int64, values []float64) (int64, float64) { - var sum float64 - for _, v := range values { - sum += v +func aggregateSumGroupsFloat(sum float64, values []float64, i int) float64 { + for ; i < len(values); i++ { + sum += values[i] } - return math.MaxInt64, sum + return sum } -func aggregateFirstGroupsFloat(timestamps []int64, values []float64) (int64, float64) { - value := values[0] - timestamp := timestamps[0] +func selectorFirstGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp > timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts > timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } -func aggregateLastGroupsFloat(timestamps []int64, values []float64) (int64, float64) { - value := values[0] - timestamp := timestamps[0] +func selectorLastGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp < timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts < timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } func (t *floatGroupTable) advanceCursor() bool { @@ -1633,20 +1714,17 @@ func (t *integerGroupTable) advance() bool { return true } - aggregate, err := determineIntegerAggregateMethod(t.gc.Aggregate().Type) + aggregate, err := makeIntegerAggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps, values := []int64{ts}, []int64{v} + aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps = append(timestamps, ts) - values = append(values, v) + aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } @@ -1654,7 +1732,7 @@ func (t *integerGroupTable) advance() bool { break } } - timestamp, value := aggregate(timestamps, values) + timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { @@ -1663,38 +1741,131 @@ func (t *integerGroupTable) advance() bool { } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]int64{value}) } - t.appendTags(colReader) + t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } -type integerAggregateMethod func([]int64, []int64) (int64, int64) +type IntegerAggregateAccumulator interface { + // AccumulateFirst receives an initial array of items to select from. + // It selects an item and stores the state. Afterwards, more data can + // be supplied with AccumulateMore and the results can be requested at + // any time. Without a call to AccumulateFirst the results are not + // defined. + AccumulateFirst(timestamps []int64, values []int64, tags [][]byte) + + // AccumulateMore receives additional array elements to select from. + AccumulateMore(timestamps []int64, values []int64, tags [][]byte) + + // Result returns the item selected from the data received so far. + Result() (int64, int64, [][]byte) +} -// determineIntegerAggregateMethod returns the method for aggregating -// returned points within the same group. 
The incoming points are the -// ones returned for each series and the method returned here will +// The selector method takes a ( timestamp, value ) pair, a +// ( []timestamp, []value ) pair, and a starting index. It applies the selector +// to the single value and the array, starting at the supplied index. It +// returns -1 if the single value is selected and a non-negative value if an +// item from the array is selected. +type integerSelectorMethod func(int64, int64, []int64, []int64, int) int + +// The selector accumulator tracks currently-selected item. +type integerSelectorAccumulator struct { + selector integerSelectorMethod + + ts int64 + v int64 + tags [][]byte +} + +func (a *integerSelectorAccumulator) AccumulateFirst(timestamps []int64, values []int64, tags [][]byte) { + index := a.selector(timestamps[0], values[0], timestamps, values, 1) + if index < 0 { + a.ts = timestamps[0] + a.v = values[0] + } else { + a.ts = timestamps[index] + a.v = values[index] + } + a.tags = make([][]byte, len(tags)) + copy(a.tags, tags) +} + +func (a *integerSelectorAccumulator) AccumulateMore(timestamps []int64, values []int64, tags [][]byte) { + index := a.selector(a.ts, a.v, timestamps, values, 0) + if index >= 0 { + a.ts = timestamps[index] + a.v = values[index] + + if len(tags) > cap(a.tags) { + a.tags = make([][]byte, len(tags)) + } else { + a.tags = a.tags[:len(tags)] + } + copy(a.tags, tags) + } +} + +func (a *integerSelectorAccumulator) Result() (int64, int64, [][]byte) { + return a.ts, a.v, a.tags +} + +// The aggregate method takes a value, an array of values, and a starting +// index, applies an aggregate operation over the value and the array, starting +// at the given index, and returns the result. +type integerAggregateMethod func(int64, []int64, int) int64 + +type integerAggregateAccumulator struct { + aggregate integerAggregateMethod + accum int64 + + // For pure aggregates it doesn't matter what we return for tags, but + // we need to satisfy the interface. We will just return the most + // recently seen tags. + tags [][]byte +} + +func (a *integerAggregateAccumulator) AccumulateFirst(timestamps []int64, values []int64, tags [][]byte) { + a.accum = a.aggregate(values[0], values, 1) + a.tags = tags +} + +func (a *integerAggregateAccumulator) AccumulateMore(timestamps []int64, values []int64, tags [][]byte) { + a.accum = a.aggregate(a.accum, values, 0) + a.tags = tags +} + +// For group aggregates (non-selectors), the timestamp is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be +// anything and it won't matter. +func (a *integerAggregateAccumulator) Result() (int64, int64, [][]byte) { + return math.MaxInt64, a.accum, a.tags +} + +// makeIntegerAggregateAccumulator returns the interface implementation for +// aggregating returned points within the same group. The incoming points are +// the ones returned for each series and the struct returned here will // aggregate the aggregates. 
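+// For the pure aggregates (count and sum) an integerAggregateAccumulator is
+// returned instead of a selector. A sketch (variable names assumed for
+// illustration, not part of this patch):
+//
+//	acc, _ := makeIntegerAggregateAccumulator(datatypes.AggregateTypeSum)
+//	acc.AccumulateFirst(arr1.Timestamps, arr1.Values, tags)
+//	acc.AccumulateMore(arr2.Timestamps, arr2.Values, tags)
+//	_, total, _ := acc.Result() // the timestamp is math.MaxInt64; grouped
+//	                            // aggregate output carries no _time column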
-func determineIntegerAggregateMethod(agg datatypes.Aggregate_AggregateType) (integerAggregateMethod, error) { +func makeIntegerAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (IntegerAggregateAccumulator, error) { switch agg { case datatypes.AggregateTypeFirst: - return aggregateFirstGroupsInteger, nil + return &integerSelectorAccumulator{selector: selectorFirstGroupsInteger}, nil case datatypes.AggregateTypeLast: - return aggregateLastGroupsInteger, nil + return &integerSelectorAccumulator{selector: selectorLastGroupsInteger}, nil case datatypes.AggregateTypeCount: - return aggregateCountGroupsInteger, nil + return &integerAggregateAccumulator{aggregate: aggregateCountGroupsInteger}, nil case datatypes.AggregateTypeSum: - return aggregateSumGroupsInteger, nil + return &integerAggregateAccumulator{aggregate: aggregateSumGroupsInteger}, nil case datatypes.AggregateTypeMin: - return aggregateMinGroupsInteger, nil + return &integerSelectorAccumulator{selector: selectorMinGroupsInteger}, nil case datatypes.AggregateTypeMax: - return aggregateMaxGroupsInteger, nil + return &integerSelectorAccumulator{selector: selectorMaxGroupsInteger}, nil default: return nil, &errors.Error{ @@ -1704,76 +1875,67 @@ func determineIntegerAggregateMethod(agg datatypes.Aggregate_AggregateType) (int } } -func aggregateMinGroupsInteger(timestamps []int64, values []int64) (int64, int64) { - value := values[0] - timestamp := timestamps[0] +func selectorMinGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if value > values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v > values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } -func aggregateMaxGroupsInteger(timestamps []int64, values []int64) (int64, int64) { - value := values[0] - timestamp := timestamps[0] +func selectorMaxGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if value < values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v < values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } -// For group count and sum, the timestamp here is always math.MaxInt64. -// their final result does not contain _time, so this timestamp value can be anything -// and it won't matter. 
- -func aggregateCountGroupsInteger(timestamps []int64, values []int64) (int64, int64) { - return aggregateSumGroupsInteger(timestamps, values) +func aggregateCountGroupsInteger(accum int64, values []int64, i int) int64 { + return aggregateSumGroupsInteger(accum, values, i) } -func aggregateSumGroupsInteger(_ []int64, values []int64) (int64, int64) { - var sum int64 - for _, v := range values { - sum += v +func aggregateSumGroupsInteger(sum int64, values []int64, i int) int64 { + for ; i < len(values); i++ { + sum += values[i] } - return math.MaxInt64, sum + return sum } -func aggregateFirstGroupsInteger(timestamps []int64, values []int64) (int64, int64) { - value := values[0] - timestamp := timestamps[0] +func selectorFirstGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp > timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts > timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } -func aggregateLastGroupsInteger(timestamps []int64, values []int64) (int64, int64) { - value := values[0] - timestamp := timestamps[0] +func selectorLastGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp < timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts < timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } func (t *integerGroupTable) advanceCursor() bool { @@ -2528,20 +2690,17 @@ func (t *unsignedGroupTable) advance() bool { return true } - aggregate, err := determineUnsignedAggregateMethod(t.gc.Aggregate().Type) + aggregate, err := makeUnsignedAggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps, values := []int64{ts}, []uint64{v} + aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps = append(timestamps, ts) - values = append(values, v) + aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } @@ -2549,7 +2708,7 @@ func (t *unsignedGroupTable) advance() bool { break } } - timestamp, value := aggregate(timestamps, values) + timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { @@ -2558,23 +2717,116 @@ func (t *unsignedGroupTable) advance() bool { } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]uint64{value}) } - t.appendTags(colReader) + t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } -type unsignedAggregateMethod func([]int64, []uint64) (int64, uint64) +type UnsignedAggregateAccumulator interface { + // AccumulateFirst receives an initial array of items to select from. + // It selects an item and stores the state. Afterwards, more data can + // be supplied with AccumulateMore and the results can be requested at + // any time. Without a call to AccumulateFirst the results are not + // defined. + AccumulateFirst(timestamps []int64, values []uint64, tags [][]byte) + + // AccumulateMore receives additional array elements to select from. + AccumulateMore(timestamps []int64, values []uint64, tags [][]byte) + + // Result returns the item selected from the data received so far. 
+ Result() (int64, uint64, [][]byte) +} + +// The selector method takes a ( timestamp, value ) pair, a +// ( []timestamp, []value ) pair, and a starting index. It applies the selector +// to the single value and the array, starting at the supplied index. It +// returns -1 if the single value is selected and a non-negative value if an +// item from the array is selected. +type unsignedSelectorMethod func(int64, uint64, []int64, []uint64, int) int + +// The selector accumulator tracks currently-selected item. +type unsignedSelectorAccumulator struct { + selector unsignedSelectorMethod + + ts int64 + v uint64 + tags [][]byte +} + +func (a *unsignedSelectorAccumulator) AccumulateFirst(timestamps []int64, values []uint64, tags [][]byte) { + index := a.selector(timestamps[0], values[0], timestamps, values, 1) + if index < 0 { + a.ts = timestamps[0] + a.v = values[0] + } else { + a.ts = timestamps[index] + a.v = values[index] + } + a.tags = make([][]byte, len(tags)) + copy(a.tags, tags) +} + +func (a *unsignedSelectorAccumulator) AccumulateMore(timestamps []int64, values []uint64, tags [][]byte) { + index := a.selector(a.ts, a.v, timestamps, values, 0) + if index >= 0 { + a.ts = timestamps[index] + a.v = values[index] -// determineUnsignedAggregateMethod returns the method for aggregating -// returned points within the same group. The incoming points are the -// ones returned for each series and the method returned here will + if len(tags) > cap(a.tags) { + a.tags = make([][]byte, len(tags)) + } else { + a.tags = a.tags[:len(tags)] + } + copy(a.tags, tags) + } +} + +func (a *unsignedSelectorAccumulator) Result() (int64, uint64, [][]byte) { + return a.ts, a.v, a.tags +} + +// The aggregate method takes a value, an array of values, and a starting +// index, applies an aggregate operation over the value and the array, starting +// at the given index, and returns the result. +type unsignedAggregateMethod func(uint64, []uint64, int) uint64 + +type unsignedAggregateAccumulator struct { + aggregate unsignedAggregateMethod + accum uint64 + + // For pure aggregates it doesn't matter what we return for tags, but + // we need to satisfy the interface. We will just return the most + // recently seen tags. + tags [][]byte +} + +func (a *unsignedAggregateAccumulator) AccumulateFirst(timestamps []int64, values []uint64, tags [][]byte) { + a.accum = a.aggregate(values[0], values, 1) + a.tags = tags +} + +func (a *unsignedAggregateAccumulator) AccumulateMore(timestamps []int64, values []uint64, tags [][]byte) { + a.accum = a.aggregate(a.accum, values, 0) + a.tags = tags +} + +// For group aggregates (non-selectors), the timestamp is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be +// anything and it won't matter. +func (a *unsignedAggregateAccumulator) Result() (int64, uint64, [][]byte) { + return math.MaxInt64, a.accum, a.tags +} + +// makeUnsignedAggregateAccumulator returns the interface implementation for +// aggregating returned points within the same group. The incoming points are +// the ones returned for each series and the struct returned here will // aggregate the aggregates. 
-func determineUnsignedAggregateMethod(agg datatypes.Aggregate_AggregateType) (unsignedAggregateMethod, error) { +func makeUnsignedAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (UnsignedAggregateAccumulator, error) { switch agg { case datatypes.AggregateTypeFirst: - return aggregateFirstGroupsUnsigned, nil + return &unsignedSelectorAccumulator{selector: selectorFirstGroupsUnsigned}, nil case datatypes.AggregateTypeLast: - return aggregateLastGroupsUnsigned, nil + return &unsignedSelectorAccumulator{selector: selectorLastGroupsUnsigned}, nil case datatypes.AggregateTypeCount: return nil, &errors.Error{ @@ -2584,15 +2836,15 @@ func determineUnsignedAggregateMethod(agg datatypes.Aggregate_AggregateType) (un case datatypes.AggregateTypeSum: - return aggregateSumGroupsUnsigned, nil + return &unsignedAggregateAccumulator{aggregate: aggregateSumGroupsUnsigned}, nil case datatypes.AggregateTypeMin: - return aggregateMinGroupsUnsigned, nil + return &unsignedSelectorAccumulator{selector: selectorMinGroupsUnsigned}, nil case datatypes.AggregateTypeMax: - return aggregateMaxGroupsUnsigned, nil + return &unsignedSelectorAccumulator{selector: selectorMaxGroupsUnsigned}, nil default: return nil, &errors.Error{ @@ -2602,72 +2854,63 @@ func determineUnsignedAggregateMethod(agg datatypes.Aggregate_AggregateType) (un } } -func aggregateMinGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) { - value := values[0] - timestamp := timestamps[0] +func selectorMinGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if value > values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v > values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } -func aggregateMaxGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) { - value := values[0] - timestamp := timestamps[0] +func selectorMaxGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if value < values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v < values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } -// For group count and sum, the timestamp here is always math.MaxInt64. -// their final result does not contain _time, so this timestamp value can be anything -// and it won't matter. 
- -func aggregateSumGroupsUnsigned(_ []int64, values []uint64) (int64, uint64) { - var sum uint64 - for _, v := range values { - sum += v +func aggregateSumGroupsUnsigned(sum uint64, values []uint64, i int) uint64 { + for ; i < len(values); i++ { + sum += values[i] } - return math.MaxInt64, sum + return sum } -func aggregateFirstGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) { - value := values[0] - timestamp := timestamps[0] +func selectorFirstGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp > timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts > timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } -func aggregateLastGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) { - value := values[0] - timestamp := timestamps[0] +func selectorLastGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp < timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts < timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } func (t *unsignedGroupTable) advanceCursor() bool { @@ -3422,20 +3665,17 @@ func (t *stringGroupTable) advance() bool { return true } - aggregate, err := determineStringAggregateMethod(t.gc.Aggregate().Type) + aggregate, err := makeStringAggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps, values := []int64{ts}, []string{v} + aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps = append(timestamps, ts) - values = append(values, v) + aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } @@ -3443,7 +3683,7 @@ func (t *stringGroupTable) advance() bool { break } } - timestamp, value := aggregate(timestamps, values) + timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { @@ -3452,23 +3692,84 @@ func (t *stringGroupTable) advance() bool { } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]string{value}) } - t.appendTags(colReader) + t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } -type stringAggregateMethod func([]int64, []string) (int64, string) +type StringAggregateAccumulator interface { + // AccumulateFirst receives an initial array of items to select from. + // It selects an item and stores the state. Afterwards, more data can + // be supplied with AccumulateMore and the results can be requested at + // any time. Without a call to AccumulateFirst the results are not + // defined. + AccumulateFirst(timestamps []int64, values []string, tags [][]byte) -// determineStringAggregateMethod returns the method for aggregating -// returned points within the same group. The incoming points are the -// ones returned for each series and the method returned here will + // AccumulateMore receives additional array elements to select from. + AccumulateMore(timestamps []int64, values []string, tags [][]byte) + + // Result returns the item selected from the data received so far. 
+ Result() (int64, string, [][]byte) +} + +// The selector method takes a ( timestamp, value ) pair, a +// ( []timestamp, []value ) pair, and a starting index. It applies the selector +// to the single value and the array, starting at the supplied index. It +// returns -1 if the single value is selected and a non-negative value if an +// item from the array is selected. +type stringSelectorMethod func(int64, string, []int64, []string, int) int + +// The selector accumulator tracks currently-selected item. +type stringSelectorAccumulator struct { + selector stringSelectorMethod + + ts int64 + v string + tags [][]byte +} + +func (a *stringSelectorAccumulator) AccumulateFirst(timestamps []int64, values []string, tags [][]byte) { + index := a.selector(timestamps[0], values[0], timestamps, values, 1) + if index < 0 { + a.ts = timestamps[0] + a.v = values[0] + } else { + a.ts = timestamps[index] + a.v = values[index] + } + a.tags = make([][]byte, len(tags)) + copy(a.tags, tags) +} + +func (a *stringSelectorAccumulator) AccumulateMore(timestamps []int64, values []string, tags [][]byte) { + index := a.selector(a.ts, a.v, timestamps, values, 0) + if index >= 0 { + a.ts = timestamps[index] + a.v = values[index] + + if len(tags) > cap(a.tags) { + a.tags = make([][]byte, len(tags)) + } else { + a.tags = a.tags[:len(tags)] + } + copy(a.tags, tags) + } +} + +func (a *stringSelectorAccumulator) Result() (int64, string, [][]byte) { + return a.ts, a.v, a.tags +} + +// makeStringAggregateAccumulator returns the interface implementation for +// aggregating returned points within the same group. The incoming points are +// the ones returned for each series and the struct returned here will // aggregate the aggregates. -func determineStringAggregateMethod(agg datatypes.Aggregate_AggregateType) (stringAggregateMethod, error) { +func makeStringAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (StringAggregateAccumulator, error) { switch agg { case datatypes.AggregateTypeFirst: - return aggregateFirstGroupsString, nil + return &stringSelectorAccumulator{selector: selectorFirstGroupsString}, nil case datatypes.AggregateTypeLast: - return aggregateLastGroupsString, nil + return &stringSelectorAccumulator{selector: selectorLastGroupsString}, nil case datatypes.AggregateTypeCount: return nil, &errors.Error{ @@ -3505,36 +3806,30 @@ func determineStringAggregateMethod(agg datatypes.Aggregate_AggregateType) (stri } } -// For group count and sum, the timestamp here is always math.MaxInt64. -// their final result does not contain _time, so this timestamp value can be anything -// and it won't matter. 
- -func aggregateFirstGroupsString(timestamps []int64, values []string) (int64, string) { - value := values[0] - timestamp := timestamps[0] +func selectorFirstGroupsString(ts int64, v string, timestamps []int64, values []string, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp > timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts > timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } -func aggregateLastGroupsString(timestamps []int64, values []string) (int64, string) { - value := values[0] - timestamp := timestamps[0] +func selectorLastGroupsString(ts int64, v string, timestamps []int64, values []string, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp < timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts < timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } func (t *stringGroupTable) advanceCursor() bool { @@ -4289,20 +4584,17 @@ func (t *booleanGroupTable) advance() bool { return true } - aggregate, err := determineBooleanAggregateMethod(t.gc.Aggregate().Type) + aggregate, err := makeBooleanAggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps, values := []int64{ts}, []bool{v} + aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps = append(timestamps, ts) - values = append(values, v) + aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } @@ -4310,7 +4602,7 @@ func (t *booleanGroupTable) advance() bool { break } } - timestamp, value := aggregate(timestamps, values) + timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { @@ -4319,23 +4611,84 @@ func (t *booleanGroupTable) advance() bool { } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]bool{value}) } - t.appendTags(colReader) + t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } -type booleanAggregateMethod func([]int64, []bool) (int64, bool) +type BooleanAggregateAccumulator interface { + // AccumulateFirst receives an initial array of items to select from. + // It selects an item and stores the state. Afterwards, more data can + // be supplied with AccumulateMore and the results can be requested at + // any time. Without a call to AccumulateFirst the results are not + // defined. + AccumulateFirst(timestamps []int64, values []bool, tags [][]byte) + + // AccumulateMore receives additional array elements to select from. + AccumulateMore(timestamps []int64, values []bool, tags [][]byte) + + // Result returns the item selected from the data received so far. + Result() (int64, bool, [][]byte) +} + +// The selector method takes a ( timestamp, value ) pair, a +// ( []timestamp, []value ) pair, and a starting index. It applies the selector +// to the single value and the array, starting at the supplied index. It +// returns -1 if the single value is selected and a non-negative value if an +// item from the array is selected. +type booleanSelectorMethod func(int64, bool, []int64, []bool, int) int -// determineBooleanAggregateMethod returns the method for aggregating -// returned points within the same group. 
The incoming points are the -// ones returned for each series and the method returned here will +// The selector accumulator tracks currently-selected item. +type booleanSelectorAccumulator struct { + selector booleanSelectorMethod + + ts int64 + v bool + tags [][]byte +} + +func (a *booleanSelectorAccumulator) AccumulateFirst(timestamps []int64, values []bool, tags [][]byte) { + index := a.selector(timestamps[0], values[0], timestamps, values, 1) + if index < 0 { + a.ts = timestamps[0] + a.v = values[0] + } else { + a.ts = timestamps[index] + a.v = values[index] + } + a.tags = make([][]byte, len(tags)) + copy(a.tags, tags) +} + +func (a *booleanSelectorAccumulator) AccumulateMore(timestamps []int64, values []bool, tags [][]byte) { + index := a.selector(a.ts, a.v, timestamps, values, 0) + if index >= 0 { + a.ts = timestamps[index] + a.v = values[index] + + if len(tags) > cap(a.tags) { + a.tags = make([][]byte, len(tags)) + } else { + a.tags = a.tags[:len(tags)] + } + copy(a.tags, tags) + } +} + +func (a *booleanSelectorAccumulator) Result() (int64, bool, [][]byte) { + return a.ts, a.v, a.tags +} + +// makeBooleanAggregateAccumulator returns the interface implementation for +// aggregating returned points within the same group. The incoming points are +// the ones returned for each series and the struct returned here will // aggregate the aggregates. -func determineBooleanAggregateMethod(agg datatypes.Aggregate_AggregateType) (booleanAggregateMethod, error) { +func makeBooleanAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (BooleanAggregateAccumulator, error) { switch agg { case datatypes.AggregateTypeFirst: - return aggregateFirstGroupsBoolean, nil + return &booleanSelectorAccumulator{selector: selectorFirstGroupsBoolean}, nil case datatypes.AggregateTypeLast: - return aggregateLastGroupsBoolean, nil + return &booleanSelectorAccumulator{selector: selectorLastGroupsBoolean}, nil case datatypes.AggregateTypeCount: return nil, &errors.Error{ @@ -4372,36 +4725,30 @@ func determineBooleanAggregateMethod(agg datatypes.Aggregate_AggregateType) (boo } } -// For group count and sum, the timestamp here is always math.MaxInt64. -// their final result does not contain _time, so this timestamp value can be anything -// and it won't matter. 
- -func aggregateFirstGroupsBoolean(timestamps []int64, values []bool) (int64, bool) { - value := values[0] - timestamp := timestamps[0] +func selectorFirstGroupsBoolean(ts int64, v bool, timestamps []int64, values []bool, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp > timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts > timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } -func aggregateLastGroupsBoolean(timestamps []int64, values []bool) (int64, bool) { - value := values[0] - timestamp := timestamps[0] +func selectorLastGroupsBoolean(ts int64, v bool, timestamps []int64, values []bool, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp < timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts < timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } func (t *booleanGroupTable) advanceCursor() bool { diff --git a/storage/flux/table.gen.go.tmpl b/storage/flux/table.gen.go.tmpl index 49002c38bca..d7dcd23e64e 100644 --- a/storage/flux/table.gen.go.tmpl +++ b/storage/flux/table.gen.go.tmpl @@ -733,20 +733,17 @@ func (t *{{.name}}GroupTable) advance() bool { return true } - aggregate, err := determine{{.Name}}AggregateMethod(t.gc.Aggregate().Type) + aggregate, err := make{{.Name}}AggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps, values := []int64{ts}, []{{.Type}}{v} + aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps = append(timestamps, ts) - values = append(values, v) + aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } @@ -754,7 +751,7 @@ func (t *{{.name}}GroupTable) advance() bool { break } } - timestamp, value := aggregate(timestamps, values) + timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { @@ -763,26 +760,123 @@ func (t *{{.name}}GroupTable) advance() bool { } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]{{.Type}}{value}) } - t.appendTags(colReader) + t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } -type {{.name}}AggregateMethod func([]int64, []{{.Type}}) (int64, {{.Type}}) +type {{.Name}}AggregateAccumulator interface { + // AccumulateFirst receives an initial array of items to select from. + // It selects an item and stores the state. Afterwards, more data can + // be supplied with AccumulateMore and the results can be requested at + // any time. Without a call to AccumulateFirst the results are not + // defined. + AccumulateFirst(timestamps []int64, values []{{.Type}}, tags [][]byte) -// determine{{.Name}}AggregateMethod returns the method for aggregating -// returned points within the same group. The incoming points are the -// ones returned for each series and the method returned here will + // AccumulateMore receives additional array elements to select from. + AccumulateMore(timestamps []int64, values []{{.Type}}, tags [][]byte) + + // Result returns the item selected from the data received so far. + Result() (int64, {{.Type}}, [][]byte) +} + +// The selector method takes a ( timestamp, value ) pair, a +// ( []timestamp, []value ) pair, and a starting index. 
It applies the selector +// to the single value and the array, starting at the supplied index. It +// returns -1 if the single value is selected and a non-negative value if an +// item from the array is selected. +type {{.name}}SelectorMethod func(int64, {{.Type}}, []int64, []{{.Type}}, int) (int) + +// The selector accumulator tracks currently-selected item. +type {{.name}}SelectorAccumulator struct { + selector {{.name}}SelectorMethod + + ts int64 + v {{.Type}} + tags [][]byte +} + +func (a *{{.name}}SelectorAccumulator) AccumulateFirst(timestamps []int64, values []{{.Type}}, tags [][]byte) { + index := a.selector(timestamps[0], values[0], timestamps, values, 1) + if index < 0 { + a.ts = timestamps[0] + a.v = values[0] + } else { + a.ts = timestamps[index] + a.v = values[index] + } + a.tags = make([][]byte, len(tags)) + copy(a.tags, tags) +} + +func (a *{{.name}}SelectorAccumulator) AccumulateMore(timestamps []int64, values []{{.Type}}, tags [][]byte) { + index := a.selector(a.ts, a.v, timestamps, values, 0) + if index >= 0 { + a.ts = timestamps[index] + a.v = values[index] + + if len(tags) > cap(a.tags) { + a.tags = make([][]byte, len(tags)) + } else { + a.tags = a.tags[:len(tags)] + } + copy(a.tags, tags) + } +} + +func (a *{{.name}}SelectorAccumulator) Result() (int64, {{.Type}}, [][]byte) { + return a.ts, a.v, a.tags +} + +{{if and (ne .Name "Boolean") (ne .Name "String")}} + +// The aggregate method takes a value, an array of values, and a starting +// index, applies an aggregate operation over the value and the array, starting +// at the given index, and returns the result. +type {{.name}}AggregateMethod func({{.Type}}, []{{.Type}}, int) ({{.Type}}) + +type {{.name}}AggregateAccumulator struct { + aggregate {{.name}}AggregateMethod + accum {{.Type}} + + // For pure aggregates it doesn't matter what we return for tags, but + // we need to satisfy the interface. We will just return the most + // recently seen tags. + tags [][]byte +} + +func (a *{{.name}}AggregateAccumulator) AccumulateFirst(timestamps []int64, values []{{.Type}}, tags [][]byte) { + a.accum = a.aggregate(values[0], values, 1) + a.tags = tags +} + +func (a *{{.name}}AggregateAccumulator) AccumulateMore(timestamps []int64, values []{{.Type}}, tags [][]byte) { + a.accum = a.aggregate(a.accum, values, 0) + a.tags = tags +} + +// For group aggregates (non-selectors), the timestamp is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be +// anything and it won't matter. +func (a *{{.name}}AggregateAccumulator) Result() (int64, {{.Type}}, [][]byte) { + return math.MaxInt64, a.accum, a.tags +} + +{{end}} + +// make{{.Name}}AggregateAccumulator returns the interface implementation for +// aggregating returned points within the same group. The incoming points are +// the ones returned for each series and the struct returned here will // aggregate the aggregates. 
-func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({{.name}}AggregateMethod, error){ - switch agg { +func make{{.Name}}AggregateAccumulator(agg datatypes.Aggregate_AggregateType) ({{.Name}}AggregateAccumulator, error){ + switch agg { case datatypes.AggregateTypeFirst: - return aggregateFirstGroups{{.Name}}, nil + return &{{.name}}SelectorAccumulator{selector: selectorFirstGroups{{.Name}}}, nil case datatypes.AggregateTypeLast: - return aggregateLastGroups{{.Name}}, nil + return &{{.name}}SelectorAccumulator{selector: selectorLastGroups{{.Name}}}, nil case datatypes.AggregateTypeCount: {{if eq .Name "Integer"}} - return aggregateCountGroups{{.Name}}, nil + return &{{.name}}AggregateAccumulator{aggregate: aggregateCountGroups{{.Name}}}, nil {{else}} return nil, &errors.Error { Code: errors.EInvalid, @@ -791,7 +885,7 @@ func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({ {{end}} case datatypes.AggregateTypeSum: {{if and (ne .Name "Boolean") (ne .Name "String")}} - return aggregateSumGroups{{.Name}}, nil + return &{{.name}}AggregateAccumulator{aggregate: aggregateSumGroups{{.Name}}}, nil {{else}} return nil, &errors.Error { Code: errors.EInvalid, @@ -800,7 +894,7 @@ func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({ {{end}} case datatypes.AggregateTypeMin: {{if and (ne .Name "Boolean") (ne .Name "String")}} - return aggregateMinGroups{{.Name}}, nil + return &{{.name}}SelectorAccumulator{selector: selectorMinGroups{{.Name}}}, nil {{else}} return nil, &errors.Error { Code: errors.EInvalid, @@ -809,7 +903,7 @@ func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({ {{end}} case datatypes.AggregateTypeMax: {{if and (ne .Name "Boolean") (ne .Name "String")}} - return aggregateMaxGroups{{.Name}}, nil + return &{{.name}}SelectorAccumulator{selector: selectorMaxGroups{{.Name}}}, nil {{else}} return nil, &errors.Error { Code: errors.EInvalid, @@ -825,82 +919,74 @@ func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({ } {{if and (ne .Name "Boolean") (ne .Name "String")}} -func aggregateMinGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { - value := values[0] - timestamp := timestamps[0] +func selectorMinGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) (int) { + index := -1 - for i := 1; i < len(values); i++ { - if value > values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v > values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } {{end}} {{if and (ne .Name "Boolean") (ne .Name "String")}} -func aggregateMaxGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { - value := values[0] - timestamp := timestamps[0] +func selectorMaxGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) (int) { + index := -1 - for i := 1; i < len(values); i++ { - if value < values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v < values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } {{end}} -// For group count and sum, the timestamp here is always math.MaxInt64. -// their final result does not contain _time, so this timestamp value can be anything -// and it won't matter. 
 {{if eq .Name "Integer"}}
-func aggregateCountGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
-	return aggregateSumGroups{{.Name}}(timestamps, values)
+func aggregateCountGroups{{.Name}}(accum {{.Type}}, values []{{.Type}}, i int) ({{.Type}}) {
+	return aggregateSumGroups{{.Name}}(accum, values, i)
 }
 {{end}}
 
 {{if and (ne .Name "Boolean") (ne .Name "String")}}
-func aggregateSumGroups{{.Name}}(_ []int64, values []{{.Type}}) (int64, {{.Type}}) {
-	var sum {{.Type}}
-	for _, v := range values {
-		sum += v
+func aggregateSumGroups{{.Name}}(sum {{.Type}}, values []{{.Type}}, i int) ({{.Type}}) {
+	for ; i < len(values); i++ {
+		sum += values[i]
 	}
-	return math.MaxInt64, sum
+	return sum
 }
 {{end}}
 
-func aggregateFirstGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
-	value := values[0]
-	timestamp := timestamps[0]
+func selectorFirstGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) (int) {
+	index := -1
 
-	for i := 1; i < len(values); i++ {
-		if timestamp > timestamps[i] {
-			value = values[i]
-			timestamp = timestamps[i]
+	for ; i < len(values); i++ {
+		if ts > timestamps[i] {
+			index = i
+			ts = timestamps[i]
 		}
 	}
 
-	return timestamp, value
+	return index
 }
 
-func aggregateLastGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
-	value := values[0]
-	timestamp := timestamps[0]
+func selectorLastGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) (int) {
+	index := -1
 
-	for i := 1; i < len(values); i++ {
-		if timestamp < timestamps[i] {
-			value = values[i]
-			timestamp = timestamps[i]
+	for ; i < len(values); i++ {
+		if ts < timestamps[i] {
+			index = i
+			ts = timestamps[i]
 		}
 	}
 
-	return timestamp, value
+	return index
 }
 
 func (t *{{.name}}GroupTable) advanceCursor() bool {
diff --git a/storage/flux/table.go b/storage/flux/table.go
index 7398c29f13d..00eea322d65 100644
--- a/storage/flux/table.go
+++ b/storage/flux/table.go
@@ -203,7 +203,7 @@ func (t *table) readTags(tags models.Tags) {
 	for _, tag := range tags {
 		j := execute.ColIdx(string(tag.Key), t.cols)
 		// In the case of group aggregate, tags that are not referenced in group() are not included in the result, but
-		// readTags () still get a complete tag list. Here is just to skip the tags that should not present in the result.
+		// readTags() still gets a complete tag list. This just skips the tags that should not be present in the result.
 		if j < 0 {
 			continue
 		}
@@ -211,16 +211,21 @@
 	}
 }
 
-// appendTags fills the colBufs for the tag columns with the tag value.
-func (t *table) appendTags(cr *colReader) {
+// appendTheseTags fills the colBufs for the tag columns with the given tag values.
+func (t *table) appendTheseTags(cr *colReader, tags [][]byte) {
 	for j := range t.cols {
-		v := t.tags[j]
+		v := tags[j]
 		if v != nil {
 			cr.cols[j] = t.cache.GetTag(string(v), cr.l, t.alloc)
 		}
 	}
 }
 
+// appendTags fills the colBufs for the tag columns with the tag values from the table structure.
+func (t *table) appendTags(cr *colReader) {
+	t.appendTheseTags(cr, t.tags)
+}
+
 // appendBounds fills the colBufs for the time bounds
 func (t *table) appendBounds(cr *colReader) {
 	start, stop := t.cache.GetBounds(t.bounds, cr.l, t.alloc)
diff --git a/storage/flux/table_test.go b/storage/flux/table_test.go
index 38919ab9918..316bbbbfd73 100644
--- a/storage/flux/table_test.go
+++ b/storage/flux/table_test.go
@@ -2854,7 +2854,6 @@ func TestStorageReader_ReadGroup(t *testing.T) {
 // values vary among the candidate items for select and the read-group
 // operation must track and return the correct set of tags.
 func TestStorageReader_ReadGroupSelectTags(t *testing.T) {
-	t.Skip("fixme")
 	reader := NewStorageReader(t, func(db, rp string) (datagen.SeriesGenerator, datagen.TimeRange) {
 		spec := Spec(db, rp,
 			MeasurementSpec("m0",
@@ -2918,24 +2917,97 @@ func TestStorageReader_ReadGroupSelectTags(t *testing.T) {
 	}
 
 	for _, tt := range cases {
-		mem := &memory.Allocator{}
-		got, err := reader.ReadGroup(context.Background(), influxdb.ReadGroupSpec{
-			ReadFilterSpec: influxdb.ReadFilterSpec{
-				Database:        reader.Database,
-				RetentionPolicy: reader.RetentionPolicy,
-				Bounds:          reader.Bounds,
+		t.Run(tt.aggregate, func(t *testing.T) {
+			mem := &memory.Allocator{}
+			got, err := reader.ReadGroup(context.Background(), influxdb.ReadGroupSpec{
+				ReadFilterSpec: influxdb.ReadFilterSpec{
+					Database:        reader.Database,
+					RetentionPolicy: reader.RetentionPolicy,
+					Bounds:          reader.Bounds,
+				},
+				GroupMode:       influxdb.GroupModeBy,
+				GroupKeys:       []string{"t0"},
+				AggregateMethod: tt.aggregate,
+			}, mem)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			if diff := table.Diff(tt.want, got); diff != "" {
+				t.Errorf("unexpected results -want/+got:\n%s", diff)
+			}
+		})
+	}
+}
+
+// TestStorageReader_ReadGroupNoAgg exercises the path where no aggregate is specified.
+func TestStorageReader_ReadGroupNoAgg(t *testing.T) {
+	reader := NewStorageReader(t, func(db, rp string) (datagen.SeriesGenerator, datagen.TimeRange) {
+		spec := Spec(db, rp,
+			MeasurementSpec("m0",
+				FloatArrayValuesSequence("f0", 10*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),
+				TagValuesSequence("t1", "b-%s", 0, 2),
+			),
+		)
+		tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:00:40Z")
+		return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr
+	})
+	defer reader.Close()
+
+	cases := []struct {
+		aggregate string
+		want      flux.TableIterator
+	}{
+		{
+			want: static.TableGroup{
+				static.TableMatrix{
+					{
+						static.Table{
+							static.StringKey("t1", "b-0"),
+							static.Strings("_measurement", "m0", "m0", "m0", "m0"),
+							static.Strings("_field", "f0", "f0", "f0", "f0"),
+							static.TimeKey("_start", "2019-11-25T00:00:00Z"),
+							static.TimeKey("_stop", "2019-11-25T00:00:40Z"),
+							static.Times("_time", "2019-11-25T00:00:00Z", "2019-11-25T00:00:10Z", "2019-11-25T00:00:20Z", "2019-11-25T00:00:30Z"),
+							static.Floats("_value", 1.0, 2.0, 3.0, 4.0),
+						},
+					},
+					{
+						static.Table{
+							static.StringKey("t1", "b-1"),
+							static.Strings("_measurement", "m0", "m0", "m0", "m0"),
+							static.Strings("_field", "f0", "f0", "f0", "f0"),
+							static.TimeKey("_start", "2019-11-25T00:00:00Z"),
+							static.TimeKey("_stop", "2019-11-25T00:00:40Z"),
+							static.Times("_time", "2019-11-25T00:00:00Z", "2019-11-25T00:00:10Z", "2019-11-25T00:00:20Z", "2019-11-25T00:00:30Z"),
+							static.Floats("_value", 1.0, 2.0, 3.0, 4.0),
+						},
+					},
+				},
 			},
-			GroupMode:       influxdb.GroupModeBy,
-			GroupKeys:       []string{"t0"},
-			AggregateMethod: tt.aggregate,
-		}, mem)
-		if err != nil {
-			t.Fatal(err)
-		}
+		},
+	}
 
-		if diff := table.Diff(tt.want, got); diff != "" {
-			t.Errorf("unexpected results -want/+got:\n%s", diff)
-		}
+	for _, tt := range cases {
+		t.Run(tt.aggregate, func(t *testing.T) {
+			mem := &memory.Allocator{}
+			got, err := reader.ReadGroup(context.Background(), influxdb.ReadGroupSpec{
+				ReadFilterSpec: influxdb.ReadFilterSpec{
+					Database:        reader.Database,
+					RetentionPolicy: reader.RetentionPolicy,
+					Bounds:          reader.Bounds,
+				},
+				GroupMode: influxdb.GroupModeBy,
+				GroupKeys: []string{"t1"},
+			}, mem)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			if diff := table.Diff(tt.want, got); diff != "" {
+				t.Errorf("unexpected results -want/+got:\n%s", diff)
+			}
+		})
 	}
 }
diff --git a/storage/reads/group_resultset.go b/storage/reads/group_resultset.go
index c0d4c04e4af..270ff35bc05 100644
--- a/storage/reads/group_resultset.go
+++ b/storage/reads/group_resultset.go
@@ -40,6 +40,13 @@ func GroupOptionNilSortLo() GroupOption {
 	}
 }
 
+// IsAscendingGroupAggregate reports whether an ascending cursor should be used
+// for this request. It returns true for every aggregate except `last`;
+// the `last` aggregate requires a descending cursor.
+func IsAscendingGroupAggregate(req *datatypes.ReadGroupRequest) bool {
+	return req.Aggregate == nil || req.Aggregate.Type != datatypes.AggregateTypeLast
+}
+
 func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, newSeriesCursorFn func() (SeriesCursor, error), opts ...GroupOption) GroupResultSet {
 	g := &groupResultSet{
 		ctx: ctx,
@@ -54,7 +61,8 @@ func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, new
 		o(g)
 	}
 
-	g.arrayCursors = newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true)
+	ascending := IsAscendingGroupAggregate(req)
+	g.arrayCursors = newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, ascending)
 
 	for i, k := range req.GroupKeys {
 		g.keys[i] = []byte(k)
diff --git a/test-flux.sh b/test-flux.sh
index e20e6831e8e..979113ff7fb 100755
--- a/test-flux.sh
+++ b/test-flux.sh
@@ -32,10 +32,8 @@ build_test_harness() {
 }
 
 run_integration_tests() {
-	log "Running flux integration tests..."
-	./fluxtest -v -p flux.zip
-	log "Running influxdb integration tests..."
-	./fluxtest -v -p flux/stdlib
+	log "Running integration tests..."
+	./fluxtest -v -p flux.zip -p flux/stdlib
 }
 
 cleanup() {
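For reviewers who want a concrete picture of the accumulator contract introduced in storage/flux/table.gen.go.tmpl above, the standalone float64 sketch below mirrors the generated selector accumulator and the min selector. It is illustrative only: the names, the hard-coded batches, and the main driver are not part of this patch. It shows how AccumulateFirst seeds the selection from the first series in a group and how AccumulateMore folds in later series while carrying the winning point's timestamp, value, and tags.

package main

import "fmt"

// selectorMethod mirrors the generated selector contract: given the current
// selection (ts, v) and a candidate batch, scan the batch starting at index i
// and return the index of a better candidate, or -1 to keep the current one.
type selectorMethod func(ts int64, v float64, timestamps []int64, values []float64, i int) int

// selectorMinFloat keeps the smallest value seen so far.
func selectorMinFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int {
	index := -1
	for ; i < len(values); i++ {
		if v > values[i] {
			index = i
			v = values[i]
		}
	}
	return index
}

// selectorAccumulator is a float-only stand-in for the generated accumulator:
// it remembers the timestamp, value, and tags of the currently winning point.
type selectorAccumulator struct {
	selector selectorMethod
	ts       int64
	v        float64
	tags     [][]byte
}

// accumulateFirst seeds the selection from the first batch (one batch per series).
func (a *selectorAccumulator) accumulateFirst(timestamps []int64, values []float64, tags [][]byte) {
	index := a.selector(timestamps[0], values[0], timestamps, values, 1)
	if index < 0 {
		a.ts, a.v = timestamps[0], values[0]
	} else {
		a.ts, a.v = timestamps[index], values[index]
	}
	a.tags = append([][]byte(nil), tags...)
}

// accumulateMore folds a later batch into the selection, replacing the tags
// only when that batch produces a new winner.
func (a *selectorAccumulator) accumulateMore(timestamps []int64, values []float64, tags [][]byte) {
	if index := a.selector(a.ts, a.v, timestamps, values, 0); index >= 0 {
		a.ts, a.v = timestamps[index], values[index]
		a.tags = append(a.tags[:0], tags...)
	}
}

func main() {
	// Two series in the same group; the minimum comes from the second series,
	// so its timestamp and tags win.
	acc := &selectorAccumulator{selector: selectorMinFloat}
	acc.accumulateFirst([]int64{10, 20}, []float64{3.5, 2.0}, [][]byte{[]byte("t0=a")})
	acc.accumulateMore([]int64{30, 40}, []float64{1.5, 4.0}, [][]byte{[]byte("t0=b")})
	fmt.Println(acc.ts, acc.v, string(acc.tags[0])) // prints: 30 1.5 t0=b
}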