From 647f3159848a7c610aad4fe4b796aad52832fd27 Mon Sep 17 00:00:00 2001 From: Faith Chikwekwe Date: Wed, 29 Jul 2020 08:50:34 -0700 Subject: [PATCH] feat(query/stdlib): add min and max to ReadGroup Enables the mix and max aggregates for the ReadGroupAggregte pushdown behind a feature flag. Co-authored-by: Jonathan A. Sternberg --- cmd/influxd/launcher/query_test.go | 75 ++ flags.yml | 6 + kit/feature/list.go | 16 + query/stdlib/influxdata/influxdb/rules.go | 30 + .../stdlib/influxdata/influxdb/rules_test.go | 64 +- storage/flux/reader.go | 7 + storage/flux/table.gen.go | 723 ++++++++++++++---- storage/flux/table.gen.go.tmpl | 184 ++++- storage/flux/table.go | 8 +- storage/flux/table_test.go | 109 +++ 10 files changed, 1015 insertions(+), 207 deletions(-) diff --git a/cmd/influxd/launcher/query_test.go b/cmd/influxd/launcher/query_test.go index 809bcb474f0..47157d3a96f 100644 --- a/cmd/influxd/launcher/query_test.go +++ b/cmd/influxd/launcher/query_test.go @@ -2235,6 +2235,80 @@ from(bucket: v.bucket) ,result,table,kk,_value ,,0,kk0,32 ,,1,kk1,35 +`, + }, + { + name: "min group", + data: []string{ + "m0,k=k0,kk=kk0 f=0i 0", + "m0,k=k0,kk=kk1 f=1i 1000000000", + "m0,k=k0,kk=kk0 f=2i 2000000000", + "m0,k=k0,kk=kk1 f=3i 3000000000", + "m0,k=k0,kk=kk0 f=4i 4000000000", + "m0,k=k0,kk=kk1 f=5i 5000000000", + "m0,k=k0,kk=kk0 f=6i 6000000000", + "m0,k=k0,kk=kk1 f=5i 7000000000", + "m0,k=k0,kk=kk0 f=0i 8000000000", + "m0,k=k0,kk=kk1 f=6i 9000000000", + "m0,k=k0,kk=kk0 f=6i 10000000000", + "m0,k=k0,kk=kk1 f=7i 11000000000", + "m0,k=k0,kk=kk0 f=5i 12000000000", + "m0,k=k0,kk=kk1 f=8i 13000000000", + "m0,k=k0,kk=kk0 f=9i 14000000000", + "m0,k=k0,kk=kk1 f=5i 15000000000", + }, + op: "readGroup(min)", + query: ` +from(bucket: v.bucket) + |> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z) + |> group(columns: ["kk"]) + |> min() + |> keep(columns: ["kk", "_value"]) +`, + want: ` +#datatype,string,long,string,long +#group,false,false,true,false +#default,_result,,, +,result,table,kk,_value +,,0,kk0,0 +,,1,kk1,1 +`, + }, + { + name: "max group", + data: []string{ + "m0,k=k0,kk=kk0 f=0i 0", + "m0,k=k0,kk=kk1 f=1i 1000000000", + "m0,k=k0,kk=kk0 f=2i 2000000000", + "m0,k=k0,kk=kk1 f=3i 3000000000", + "m0,k=k0,kk=kk0 f=4i 4000000000", + "m0,k=k0,kk=kk1 f=5i 5000000000", + "m0,k=k0,kk=kk0 f=6i 6000000000", + "m0,k=k0,kk=kk1 f=5i 7000000000", + "m0,k=k0,kk=kk0 f=0i 8000000000", + "m0,k=k0,kk=kk1 f=6i 9000000000", + "m0,k=k0,kk=kk0 f=6i 10000000000", + "m0,k=k0,kk=kk1 f=7i 11000000000", + "m0,k=k0,kk=kk0 f=5i 12000000000", + "m0,k=k0,kk=kk1 f=8i 13000000000", + "m0,k=k0,kk=kk0 f=9i 14000000000", + "m0,k=k0,kk=kk1 f=5i 15000000000", + }, + op: "readGroup(max)", + query: ` +from(bucket: v.bucket) + |> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z) + |> group(columns: ["kk"]) + |> max() + |> keep(columns: ["kk", "_value"]) +`, + want: ` +#datatype,string,long,string,long +#group,false,false,true,false +#default,_result,,, +,result,table,kk,_value +,,0,kk0,9 +,,1,kk1,8 `, }, } @@ -2247,6 +2321,7 @@ from(bucket: v.bucket) feature.PushDownWindowAggregateMean(): true, feature.PushDownWindowAggregateMin(): true, feature.PushDownWindowAggregateMax(): true, + feature.PushDownGroupAggregateMinMax(): true, })) l.SetupOrFail(t) diff --git a/flags.yml b/flags.yml index 11cc8b3b5da..725d9d2bc78 100644 --- a/flags.yml +++ b/flags.yml @@ -151,3 +151,9 @@ contact: Monitoring Team lifetime: temporary expose: true + +- name: Push Down Group Aggregate Min Max + description: Enable the min and max variants of the PushDownGroupAggregate planner rule + key: pushDownGroupAggregateMinMax + default: false + contact: Query Team diff --git a/kit/feature/list.go b/kit/feature/list.go index 67c84a92160..66922fb35af 100644 --- a/kit/feature/list.go +++ b/kit/feature/list.go @@ -282,6 +282,20 @@ func Notebooks() BoolFlag { return notebooks } +var pushDownGroupAggregateMinMax = MakeBoolFlag( + "Push Down Group Aggregate Min Max", + "pushDownGroupAggregateMinMax", + "Query Team", + false, + Temporary, + false, +) + +// PushDownGroupAggregateMinMax - Enable the min and max variants of the PushDownGroupAggregate planner rule +func PushDownGroupAggregateMinMax() BoolFlag { + return pushDownGroupAggregateMinMax +} + var all = []Flag{ appMetrics, backendExample, @@ -303,6 +317,7 @@ var all = []Flag{ useUserPermission, mergeFiltersRule, notebooks, + pushDownGroupAggregateMinMax, } var byKey = map[string]Flag{ @@ -326,4 +341,5 @@ var byKey = map[string]Flag{ "useUserPermission": useUserPermission, "mergeFiltersRule": mergeFiltersRule, "notebooks": notebooks, + "pushDownGroupAggregateMinMax": pushDownGroupAggregateMinMax, } diff --git a/query/stdlib/influxdata/influxdb/rules.go b/query/stdlib/influxdata/influxdb/rules.go index 62ca204fb0d..ac2f2308663 100644 --- a/query/stdlib/influxdata/influxdb/rules.go +++ b/query/stdlib/influxdata/influxdb/rules.go @@ -1023,6 +1023,8 @@ func (rule PushDownGroupAggregateRule) Pattern() plan.Pattern { universe.SumKind, universe.FirstKind, universe.LastKind, + universe.MinKind, + universe.MaxKind, }, plan.Pat(ReadGroupPhysKind)) } @@ -1075,6 +1077,28 @@ func (PushDownGroupAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (pl AggregateMethod: universe.LastKind, }) return node, true, nil + case universe.MinKind: + // ReadGroup() -> min => ReadGroup(min) + if feature.PushDownGroupAggregateMinMax().Enabled(ctx) { + node := plan.CreatePhysicalNode("ReadGroupAggregate", &ReadGroupPhysSpec{ + ReadRangePhysSpec: group.ReadRangePhysSpec, + GroupMode: group.GroupMode, + GroupKeys: group.GroupKeys, + AggregateMethod: universe.MinKind, + }) + return node, true, nil + } + case universe.MaxKind: + // ReadGroup() -> max => ReadGroup(max) + if feature.PushDownGroupAggregateMinMax().Enabled(ctx) { + node := plan.CreatePhysicalNode("ReadGroupAggregate", &ReadGroupPhysSpec{ + ReadRangePhysSpec: group.ReadRangePhysSpec, + GroupMode: group.GroupMode, + GroupKeys: group.GroupKeys, + AggregateMethod: universe.MaxKind, + }) + return node, true, nil + } } return pn, false, nil } @@ -1102,6 +1126,12 @@ func canPushGroupedAggregate(ctx context.Context, pn plan.Node) bool { case universe.LastKind: agg := pn.ProcedureSpec().(*universe.LastProcedureSpec) return caps.HaveLast() && agg.Column == execute.DefaultValueColLabel + case universe.MaxKind: + agg := pn.ProcedureSpec().(*universe.MaxProcedureSpec) + return caps.HaveMax() && agg.Column == execute.DefaultValueColLabel + case universe.MinKind: + agg := pn.ProcedureSpec().(*universe.MinProcedureSpec) + return caps.HaveMin() && agg.Column == execute.DefaultValueColLabel } return false } diff --git a/query/stdlib/influxdata/influxdb/rules_test.go b/query/stdlib/influxdata/influxdb/rules_test.go index 39c7738ac4b..38553daaeeb 100644 --- a/query/stdlib/influxdata/influxdb/rules_test.go +++ b/query/stdlib/influxdata/influxdb/rules_test.go @@ -2672,7 +2672,9 @@ func TestPushDownBareAggregateRule(t *testing.T) { // func TestPushDownGroupAggregateRule(t *testing.T) { // Turn on all flags - ctx, _ := feature.Annotate(context.Background(), mock.NewFlagger(map[feature.Flag]interface{}{})) + ctx, _ := feature.Annotate(context.Background(), mock.NewFlagger(map[feature.Flag]interface{}{ + feature.PushDownGroupAggregateMinMax(): true, + })) caps := func(c query.GroupCapability) context.Context { deps := influxdb.StorageDependencies{ @@ -2726,6 +2728,20 @@ func TestPushDownGroupAggregateRule(t *testing.T) { }, } } + minProcedureSpecVal := func() *universe.MinProcedureSpec { + return &universe.MinProcedureSpec{ + SelectorConfig: execute.SelectorConfig{ + Column: execute.DefaultValueColLabel, + }, + } + } + maxProcedureSpecVal := func() *universe.MaxProcedureSpec { + return &universe.MaxProcedureSpec{ + SelectorConfig: execute.SelectorConfig{ + Column: execute.DefaultValueColLabel, + }, + } + } countProcedureSpec := func() *universe.CountProcedureSpec { return &universe.CountProcedureSpec{ AggregateConfig: execute.DefaultAggregateConfig, @@ -2829,12 +2845,56 @@ func TestPushDownGroupAggregateRule(t *testing.T) { // ReadGroup() -> last => ReadGroup() -> last tests = append(tests, plantest.RuleTestCase{ Context: caps(mockGroupCapability{}), - Name: "RewriteGroupLast", + Name: "NoLastCapability", Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}}, Before: simplePlanWithAgg("last", lastProcedureSpec()), NoChange: true, }) + // ReadGroup() -> max => ReadGroup(max) + tests = append(tests, plantest.RuleTestCase{ + Context: caps(mockGroupCapability{max: true}), + Name: "RewriteGroupMax", + Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}}, + Before: simplePlanWithAgg("max", maxProcedureSpecVal()), + After: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreateLogicalNode("ReadGroupAggregate", readGroupAgg("max")), + }, + }, + }) + + // ReadGroup() -> max => ReadGroup() -> max + tests = append(tests, plantest.RuleTestCase{ + Context: caps(mockGroupCapability{}), + Name: "NoMaxCapability", + Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}}, + Before: simplePlanWithAgg("max", maxProcedureSpecVal()), + NoChange: true, + }) + + // ReadGroup() -> min => ReadGroup(min) + tests = append(tests, plantest.RuleTestCase{ + Context: caps(mockGroupCapability{min: true}), + Name: "RewriteGroupMin", + Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}}, + Before: simplePlanWithAgg("min", minProcedureSpecVal()), + After: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreateLogicalNode("ReadGroupAggregate", readGroupAgg("min")), + }, + }, + }) + + // ReadGroup() -> min => ReadGroup() -> min + tests = append(tests, plantest.RuleTestCase{ + Context: caps(mockGroupCapability{}), + Name: "NoMinCapability", + Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}}, + Before: simplePlanWithAgg("min", minProcedureSpecVal()), + NoChange: true, + }) + // Rewrite with successors // ReadGroup() -> count -> sum {2} => ReadGroup(count) -> sum {2} tests = append(tests, plantest.RuleTestCase{ diff --git a/storage/flux/reader.go b/storage/flux/reader.go index 9b10519d2fd..13a94cdf5a0 100644 --- a/storage/flux/reader.go +++ b/storage/flux/reader.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/flux/memory" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/values" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kit/errors" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/query" @@ -273,6 +274,12 @@ func (gi *groupIterator) Do(f func(flux.Table) error) error { req.Range.Start = int64(gi.spec.Bounds.Start) req.Range.End = int64(gi.spec.Bounds.Stop) + if len(gi.spec.GroupKeys) > 0 && gi.spec.GroupMode == query.GroupModeNone { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "cannot have group mode none with group key values", + } + } req.Group = convertGroupMode(gi.spec.GroupMode) req.GroupKeys = gi.spec.GroupKeys diff --git a/storage/flux/table.gen.go b/storage/flux/table.gen.go index 6f9046afc5a..bd0b0b9449b 100644 --- a/storage/flux/table.gen.go +++ b/storage/flux/table.gen.go @@ -7,6 +7,7 @@ package storageflux import ( + "fmt" "math" "sync" @@ -746,49 +747,29 @@ func (t *floatGroupTable) advance() bool { return true } - // handle the group with aggregate case - var value float64 - // For group count, sum, min, and max, the timestamp here is always math.MaxInt64. - // their final result does not contain _time, so this timestamp value can be anything - // and it won't matter. - // For group first, we need to assign the initial value to math.MaxInt64 so - // we can find the row with the smallest timestamp. - // Do not worry about data with math.MaxInt64 as its real timestamp. - // In OSS we require a |> range() call in the query and a math.MaxInt64 timestamp - // cannot make it through. - var timestamp int64 = math.MaxInt64 - if t.gc.Aggregate().Type == datatypes.AggregateTypeLast { - timestamp = math.MinInt64 + aggregate, err := determineFloatAggregateMethod(t.gc.Aggregate().Type) + if err != nil { + t.err = err + return false } + + ts, v := aggregate(arr.Timestamps, arr.Values) + timestamps, values := []int64{ts}, []float64{v} for { - // note that for the group aggregate case, len here should always be 1 - for i := 0; i < len; i++ { - switch t.gc.Aggregate().Type { - case datatypes.AggregateTypeCount: - panic("unsupported for aggregate count: Float") - case datatypes.AggregateTypeSum: - value += arr.Values[i] - case datatypes.AggregateTypeFirst: - if arr.Timestamps[i] < timestamp { - timestamp = arr.Timestamps[i] - value = arr.Values[i] - } - case datatypes.AggregateTypeLast: - if arr.Timestamps[i] > timestamp { - timestamp = arr.Timestamps[i] - value = arr.Values[i] - } - } - } arr = t.cur.Next() - len = arr.Len() - if len > 0 { + if arr.Len() > 0 { + ts, v := aggregate(arr.Timestamps, arr.Values) + timestamps = append(timestamps, ts) + values = append(values, v) continue } + if !t.advanceCursor() { break } } + timestamp, value := aggregate(timestamps, values) + colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc) @@ -801,6 +782,113 @@ func (t *floatGroupTable) advance() bool { return true } +type floatAggregateMethod func([]int64, []float64) (int64, float64) + +// determineFloatAggregateMethod returns the method for aggregating +// returned points within the same group. The incoming points are the +// ones returned for each series and the method returned here will +// aggregate the aggregates. +func determineFloatAggregateMethod(agg datatypes.Aggregate_AggregateType) (floatAggregateMethod, error) { + switch agg { + case datatypes.AggregateTypeFirst: + return aggregateFirstGroupsFloat, nil + case datatypes.AggregateTypeLast: + return aggregateLastGroupsFloat, nil + case datatypes.AggregateTypeCount: + + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate count: Float", + } + + case datatypes.AggregateTypeSum: + + return aggregateSumGroupsFloat, nil + + case datatypes.AggregateTypeMin: + + return aggregateMinGroupsFloat, nil + + case datatypes.AggregateTypeMax: + + return aggregateMaxGroupsFloat, nil + + default: + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg), + } + } +} + +func aggregateMinGroupsFloat(timestamps []int64, values []float64) (int64, float64) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if value > values[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + +func aggregateMaxGroupsFloat(timestamps []int64, values []float64) (int64, float64) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if value < values[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + +// For group count and sum, the timestamp here is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be anything +// and it won't matter. + +func aggregateSumGroupsFloat(_ []int64, values []float64) (int64, float64) { + var sum float64 + for _, v := range values { + sum += v + } + return math.MaxInt64, sum +} + +func aggregateFirstGroupsFloat(timestamps []int64, values []float64) (int64, float64) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if timestamp > timestamps[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + +func aggregateLastGroupsFloat(timestamps []int64, values []float64) (int64, float64) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if timestamp < timestamps[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + func (t *floatGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil @@ -1567,49 +1655,29 @@ func (t *integerGroupTable) advance() bool { return true } - // handle the group with aggregate case - var value int64 - // For group count, sum, min, and max, the timestamp here is always math.MaxInt64. - // their final result does not contain _time, so this timestamp value can be anything - // and it won't matter. - // For group first, we need to assign the initial value to math.MaxInt64 so - // we can find the row with the smallest timestamp. - // Do not worry about data with math.MaxInt64 as its real timestamp. - // In OSS we require a |> range() call in the query and a math.MaxInt64 timestamp - // cannot make it through. - var timestamp int64 = math.MaxInt64 - if t.gc.Aggregate().Type == datatypes.AggregateTypeLast { - timestamp = math.MinInt64 + aggregate, err := determineIntegerAggregateMethod(t.gc.Aggregate().Type) + if err != nil { + t.err = err + return false } + + ts, v := aggregate(arr.Timestamps, arr.Values) + timestamps, values := []int64{ts}, []int64{v} for { - // note that for the group aggregate case, len here should always be 1 - for i := 0; i < len; i++ { - switch t.gc.Aggregate().Type { - case datatypes.AggregateTypeCount: - fallthrough - case datatypes.AggregateTypeSum: - value += arr.Values[i] - case datatypes.AggregateTypeFirst: - if arr.Timestamps[i] < timestamp { - timestamp = arr.Timestamps[i] - value = arr.Values[i] - } - case datatypes.AggregateTypeLast: - if arr.Timestamps[i] > timestamp { - timestamp = arr.Timestamps[i] - value = arr.Values[i] - } - } - } arr = t.cur.Next() - len = arr.Len() - if len > 0 { + if arr.Len() > 0 { + ts, v := aggregate(arr.Timestamps, arr.Values) + timestamps = append(timestamps, ts) + values = append(values, v) continue } + if !t.advanceCursor() { break } } + timestamp, value := aggregate(timestamps, values) + colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc) @@ -1622,6 +1690,114 @@ func (t *integerGroupTable) advance() bool { return true } +type integerAggregateMethod func([]int64, []int64) (int64, int64) + +// determineIntegerAggregateMethod returns the method for aggregating +// returned points within the same group. The incoming points are the +// ones returned for each series and the method returned here will +// aggregate the aggregates. +func determineIntegerAggregateMethod(agg datatypes.Aggregate_AggregateType) (integerAggregateMethod, error) { + switch agg { + case datatypes.AggregateTypeFirst: + return aggregateFirstGroupsInteger, nil + case datatypes.AggregateTypeLast: + return aggregateLastGroupsInteger, nil + case datatypes.AggregateTypeCount: + + return aggregateCountGroupsInteger, nil + + case datatypes.AggregateTypeSum: + + return aggregateSumGroupsInteger, nil + + case datatypes.AggregateTypeMin: + + return aggregateMinGroupsInteger, nil + + case datatypes.AggregateTypeMax: + + return aggregateMaxGroupsInteger, nil + + default: + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg), + } + } +} + +func aggregateMinGroupsInteger(timestamps []int64, values []int64) (int64, int64) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if value > values[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + +func aggregateMaxGroupsInteger(timestamps []int64, values []int64) (int64, int64) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if value < values[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + +// For group count and sum, the timestamp here is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be anything +// and it won't matter. + +func aggregateCountGroupsInteger(timestamps []int64, values []int64) (int64, int64) { + return aggregateSumGroupsInteger(timestamps, values) +} + +func aggregateSumGroupsInteger(_ []int64, values []int64) (int64, int64) { + var sum int64 + for _, v := range values { + sum += v + } + return math.MaxInt64, sum +} + +func aggregateFirstGroupsInteger(timestamps []int64, values []int64) (int64, int64) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if timestamp > timestamps[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + +func aggregateLastGroupsInteger(timestamps []int64, values []int64) (int64, int64) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if timestamp < timestamps[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + func (t *integerGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil @@ -2386,49 +2562,29 @@ func (t *unsignedGroupTable) advance() bool { return true } - // handle the group with aggregate case - var value uint64 - // For group count, sum, min, and max, the timestamp here is always math.MaxInt64. - // their final result does not contain _time, so this timestamp value can be anything - // and it won't matter. - // For group first, we need to assign the initial value to math.MaxInt64 so - // we can find the row with the smallest timestamp. - // Do not worry about data with math.MaxInt64 as its real timestamp. - // In OSS we require a |> range() call in the query and a math.MaxInt64 timestamp - // cannot make it through. - var timestamp int64 = math.MaxInt64 - if t.gc.Aggregate().Type == datatypes.AggregateTypeLast { - timestamp = math.MinInt64 + aggregate, err := determineUnsignedAggregateMethod(t.gc.Aggregate().Type) + if err != nil { + t.err = err + return false } + + ts, v := aggregate(arr.Timestamps, arr.Values) + timestamps, values := []int64{ts}, []uint64{v} for { - // note that for the group aggregate case, len here should always be 1 - for i := 0; i < len; i++ { - switch t.gc.Aggregate().Type { - case datatypes.AggregateTypeCount: - panic("unsupported for aggregate count: Unsigned") - case datatypes.AggregateTypeSum: - value += arr.Values[i] - case datatypes.AggregateTypeFirst: - if arr.Timestamps[i] < timestamp { - timestamp = arr.Timestamps[i] - value = arr.Values[i] - } - case datatypes.AggregateTypeLast: - if arr.Timestamps[i] > timestamp { - timestamp = arr.Timestamps[i] - value = arr.Values[i] - } - } - } arr = t.cur.Next() - len = arr.Len() - if len > 0 { + if arr.Len() > 0 { + ts, v := aggregate(arr.Timestamps, arr.Values) + timestamps = append(timestamps, ts) + values = append(values, v) continue } + if !t.advanceCursor() { break } } + timestamp, value := aggregate(timestamps, values) + colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc) @@ -2441,6 +2597,113 @@ func (t *unsignedGroupTable) advance() bool { return true } +type unsignedAggregateMethod func([]int64, []uint64) (int64, uint64) + +// determineUnsignedAggregateMethod returns the method for aggregating +// returned points within the same group. The incoming points are the +// ones returned for each series and the method returned here will +// aggregate the aggregates. +func determineUnsignedAggregateMethod(agg datatypes.Aggregate_AggregateType) (unsignedAggregateMethod, error) { + switch agg { + case datatypes.AggregateTypeFirst: + return aggregateFirstGroupsUnsigned, nil + case datatypes.AggregateTypeLast: + return aggregateLastGroupsUnsigned, nil + case datatypes.AggregateTypeCount: + + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate count: Unsigned", + } + + case datatypes.AggregateTypeSum: + + return aggregateSumGroupsUnsigned, nil + + case datatypes.AggregateTypeMin: + + return aggregateMinGroupsUnsigned, nil + + case datatypes.AggregateTypeMax: + + return aggregateMaxGroupsUnsigned, nil + + default: + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg), + } + } +} + +func aggregateMinGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if value > values[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + +func aggregateMaxGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if value < values[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + +// For group count and sum, the timestamp here is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be anything +// and it won't matter. + +func aggregateSumGroupsUnsigned(_ []int64, values []uint64) (int64, uint64) { + var sum uint64 + for _, v := range values { + sum += v + } + return math.MaxInt64, sum +} + +func aggregateFirstGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if timestamp > timestamps[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + +func aggregateLastGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if timestamp < timestamps[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + func (t *unsignedGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil @@ -3205,49 +3468,29 @@ func (t *stringGroupTable) advance() bool { return true } - // handle the group with aggregate case - var value string - // For group count, sum, min, and max, the timestamp here is always math.MaxInt64. - // their final result does not contain _time, so this timestamp value can be anything - // and it won't matter. - // For group first, we need to assign the initial value to math.MaxInt64 so - // we can find the row with the smallest timestamp. - // Do not worry about data with math.MaxInt64 as its real timestamp. - // In OSS we require a |> range() call in the query and a math.MaxInt64 timestamp - // cannot make it through. - var timestamp int64 = math.MaxInt64 - if t.gc.Aggregate().Type == datatypes.AggregateTypeLast { - timestamp = math.MinInt64 + aggregate, err := determineStringAggregateMethod(t.gc.Aggregate().Type) + if err != nil { + t.err = err + return false } + + ts, v := aggregate(arr.Timestamps, arr.Values) + timestamps, values := []int64{ts}, []string{v} for { - // note that for the group aggregate case, len here should always be 1 - for i := 0; i < len; i++ { - switch t.gc.Aggregate().Type { - case datatypes.AggregateTypeCount: - panic("unsupported for aggregate count: String") - case datatypes.AggregateTypeSum: - panic("unsupported for aggregate sum: String") - case datatypes.AggregateTypeFirst: - if arr.Timestamps[i] < timestamp { - timestamp = arr.Timestamps[i] - value = arr.Values[i] - } - case datatypes.AggregateTypeLast: - if arr.Timestamps[i] > timestamp { - timestamp = arr.Timestamps[i] - value = arr.Values[i] - } - } - } arr = t.cur.Next() - len = arr.Len() - if len > 0 { + if arr.Len() > 0 { + ts, v := aggregate(arr.Timestamps, arr.Values) + timestamps = append(timestamps, ts) + values = append(values, v) continue } + if !t.advanceCursor() { break } } + timestamp, value := aggregate(timestamps, values) + colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc) @@ -3260,6 +3503,86 @@ func (t *stringGroupTable) advance() bool { return true } +type stringAggregateMethod func([]int64, []string) (int64, string) + +// determineStringAggregateMethod returns the method for aggregating +// returned points within the same group. The incoming points are the +// ones returned for each series and the method returned here will +// aggregate the aggregates. +func determineStringAggregateMethod(agg datatypes.Aggregate_AggregateType) (stringAggregateMethod, error) { + switch agg { + case datatypes.AggregateTypeFirst: + return aggregateFirstGroupsString, nil + case datatypes.AggregateTypeLast: + return aggregateLastGroupsString, nil + case datatypes.AggregateTypeCount: + + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate count: String", + } + + case datatypes.AggregateTypeSum: + + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate sum: String", + } + + case datatypes.AggregateTypeMin: + + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate min: String", + } + + case datatypes.AggregateTypeMax: + + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate max: String", + } + + default: + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg), + } + } +} + +// For group count and sum, the timestamp here is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be anything +// and it won't matter. + +func aggregateFirstGroupsString(timestamps []int64, values []string) (int64, string) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if timestamp > timestamps[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + +func aggregateLastGroupsString(timestamps []int64, values []string) (int64, string) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if timestamp < timestamps[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + func (t *stringGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil @@ -4024,49 +4347,29 @@ func (t *booleanGroupTable) advance() bool { return true } - // handle the group with aggregate case - var value bool - // For group count, sum, min, and max, the timestamp here is always math.MaxInt64. - // their final result does not contain _time, so this timestamp value can be anything - // and it won't matter. - // For group first, we need to assign the initial value to math.MaxInt64 so - // we can find the row with the smallest timestamp. - // Do not worry about data with math.MaxInt64 as its real timestamp. - // In OSS we require a |> range() call in the query and a math.MaxInt64 timestamp - // cannot make it through. - var timestamp int64 = math.MaxInt64 - if t.gc.Aggregate().Type == datatypes.AggregateTypeLast { - timestamp = math.MinInt64 + aggregate, err := determineBooleanAggregateMethod(t.gc.Aggregate().Type) + if err != nil { + t.err = err + return false } + + ts, v := aggregate(arr.Timestamps, arr.Values) + timestamps, values := []int64{ts}, []bool{v} for { - // note that for the group aggregate case, len here should always be 1 - for i := 0; i < len; i++ { - switch t.gc.Aggregate().Type { - case datatypes.AggregateTypeCount: - panic("unsupported for aggregate count: Boolean") - case datatypes.AggregateTypeSum: - panic("unsupported for aggregate sum: Boolean") - case datatypes.AggregateTypeFirst: - if arr.Timestamps[i] < timestamp { - timestamp = arr.Timestamps[i] - value = arr.Values[i] - } - case datatypes.AggregateTypeLast: - if arr.Timestamps[i] > timestamp { - timestamp = arr.Timestamps[i] - value = arr.Values[i] - } - } - } arr = t.cur.Next() - len = arr.Len() - if len > 0 { + if arr.Len() > 0 { + ts, v := aggregate(arr.Timestamps, arr.Values) + timestamps = append(timestamps, ts) + values = append(values, v) continue } + if !t.advanceCursor() { break } } + timestamp, value := aggregate(timestamps, values) + colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc) @@ -4079,6 +4382,86 @@ func (t *booleanGroupTable) advance() bool { return true } +type booleanAggregateMethod func([]int64, []bool) (int64, bool) + +// determineBooleanAggregateMethod returns the method for aggregating +// returned points within the same group. The incoming points are the +// ones returned for each series and the method returned here will +// aggregate the aggregates. +func determineBooleanAggregateMethod(agg datatypes.Aggregate_AggregateType) (booleanAggregateMethod, error) { + switch agg { + case datatypes.AggregateTypeFirst: + return aggregateFirstGroupsBoolean, nil + case datatypes.AggregateTypeLast: + return aggregateLastGroupsBoolean, nil + case datatypes.AggregateTypeCount: + + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate count: Boolean", + } + + case datatypes.AggregateTypeSum: + + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate sum: Boolean", + } + + case datatypes.AggregateTypeMin: + + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate min: Boolean", + } + + case datatypes.AggregateTypeMax: + + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate max: Boolean", + } + + default: + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg), + } + } +} + +// For group count and sum, the timestamp here is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be anything +// and it won't matter. + +func aggregateFirstGroupsBoolean(timestamps []int64, values []bool) (int64, bool) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if timestamp > timestamps[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + +func aggregateLastGroupsBoolean(timestamps []int64, values []bool) (int64, bool) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if timestamp < timestamps[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + func (t *booleanGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil diff --git a/storage/flux/table.gen.go.tmpl b/storage/flux/table.gen.go.tmpl index 18c13bca1cb..5df88708086 100644 --- a/storage/flux/table.gen.go.tmpl +++ b/storage/flux/table.gen.go.tmpl @@ -1,6 +1,7 @@ package storageflux import ( + "fmt" "math" "sync" @@ -742,49 +743,29 @@ func (t *{{.name}}GroupTable) advance() bool { return true } - // handle the group with aggregate case - var value {{.Type}} - // For group count, sum, min, and max, the timestamp here is always math.MaxInt64. - // their final result does not contain _time, so this timestamp value can be anything - // and it won't matter. - // For group first, we need to assign the initial value to math.MaxInt64 so - // we can find the row with the smallest timestamp. - // Do not worry about data with math.MaxInt64 as its real timestamp. - // In OSS we require a |> range() call in the query and a math.MaxInt64 timestamp - // cannot make it through. - var timestamp int64 = math.MaxInt64 - if t.gc.Aggregate().Type == datatypes.AggregateTypeLast { - timestamp = math.MinInt64 + aggregate, err := determine{{.Name}}AggregateMethod(t.gc.Aggregate().Type) + if err != nil { + t.err = err + return false } + + ts, v := aggregate(arr.Timestamps, arr.Values) + timestamps, values := []int64{ts}, []{{.Type}}{v} for { - // note that for the group aggregate case, len here should always be 1 - for i := 0; i < len; i++ { - switch t.gc.Aggregate().Type { - case datatypes.AggregateTypeCount: - {{if eq .Name "Integer"}}fallthrough{{else}}panic("unsupported for aggregate count: {{.Name}}"){{end}} - case datatypes.AggregateTypeSum: - {{if or (eq .Name "String") (eq .Name "Boolean")}}panic("unsupported for aggregate sum: {{.Name}}"){{else}}value += arr.Values[i]{{end}} - case datatypes.AggregateTypeFirst: - if arr.Timestamps[i] < timestamp { - timestamp = arr.Timestamps[i] - value = arr.Values[i] - } - case datatypes.AggregateTypeLast: - if arr.Timestamps[i] > timestamp { - timestamp = arr.Timestamps[i] - value = arr.Values[i] - } - } - } arr = t.cur.Next() - len = arr.Len() - if len > 0 { + if arr.Len() > 0 { + ts, v := aggregate(arr.Timestamps, arr.Values) + timestamps = append(timestamps, ts) + values = append(values, v) continue } + if !t.advanceCursor() { break } } + timestamp, value := aggregate(timestamps, values) + colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc) @@ -797,6 +778,141 @@ func (t *{{.name}}GroupTable) advance() bool { return true } +type {{.name}}AggregateMethod func([]int64, []{{.Type}}) (int64, {{.Type}}) + +// determine{{.Name}}AggregateMethod returns the method for aggregating +// returned points within the same group. The incoming points are the +// ones returned for each series and the method returned here will +// aggregate the aggregates. +func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({{.name}}AggregateMethod, error){ + switch agg { + case datatypes.AggregateTypeFirst: + return aggregateFirstGroups{{.Name}}, nil + case datatypes.AggregateTypeLast: + return aggregateLastGroups{{.Name}}, nil + case datatypes.AggregateTypeCount: + {{if eq .Name "Integer"}} + return aggregateCountGroups{{.Name}}, nil + {{else}} + return nil, &influxdb.Error { + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate count: {{.Name}}", + } + {{end}} + case datatypes.AggregateTypeSum: + {{if and (ne .Name "Boolean") (ne .Name "String")}} + return aggregateSumGroups{{.Name}}, nil + {{else}} + return nil, &influxdb.Error { + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate sum: {{.Name}}", + } + {{end}} + case datatypes.AggregateTypeMin: + {{if and (ne .Name "Boolean") (ne .Name "String")}} + return aggregateMinGroups{{.Name}}, nil + {{else}} + return nil, &influxdb.Error { + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate min: {{.Name}}", + } + {{end}} + case datatypes.AggregateTypeMax: + {{if and (ne .Name "Boolean") (ne .Name "String")}} + return aggregateMaxGroups{{.Name}}, nil + {{else}} + return nil, &influxdb.Error { + Code: influxdb.EInvalid, + Msg: "unsupported for aggregate max: {{.Name}}", + } + {{end}} + default: + return nil, &influxdb.Error { + Code: influxdb.EInvalid, + Msg: fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg), + } + } +} + +{{if and (ne .Name "Boolean") (ne .Name "String")}} +func aggregateMinGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if value > values[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} +{{end}} + +{{if and (ne .Name "Boolean") (ne .Name "String")}} +func aggregateMaxGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if value < values[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} +{{end}} + +// For group count and sum, the timestamp here is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be anything +// and it won't matter. +{{if eq .Name "Integer"}} +func aggregateCountGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { + return aggregateSumGroups{{.Name}}(timestamps, values) +} +{{end}} + +{{if and (ne .Name "Boolean") (ne .Name "String")}} +func aggregateSumGroups{{.Name}}(_ []int64, values []{{.Type}}) (int64, {{.Type}}) { + var sum {{.Type}} + for _, v := range values { + sum += v + } + return math.MaxInt64, sum +} +{{end}} + +func aggregateFirstGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if timestamp > timestamps[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + +func aggregateLastGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { + value := values[0] + timestamp := timestamps[0] + + for i := 1; i < len(values); i++ { + if timestamp < timestamps[i] { + value = values[i] + timestamp = timestamps[i] + } + } + + return timestamp, value +} + func (t *{{.name}}GroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil diff --git a/storage/flux/table.go b/storage/flux/table.go index ae921d91cc4..51738024474 100644 --- a/storage/flux/table.go +++ b/storage/flux/table.go @@ -71,7 +71,7 @@ func (t *table) isCancelled() bool { } func (t *table) init(advance func() bool) { - t.empty = !advance() + t.empty = !advance() && t.err == nil } func (t *table) do(f func(flux.ColReader) error, advance func() bool) error { @@ -82,6 +82,12 @@ func (t *table) do(f func(flux.ColReader) error, advance func() bool) error { } defer t.closeDone() + // If an error occurred during initialization, that is + // returned here. + if t.err != nil { + return t.err + } + if !t.Empty() { t.err = f(t.colBufs) t.colBufs.Release() diff --git a/storage/flux/table_test.go b/storage/flux/table_test.go index dc2423a617c..59a1776b4e3 100644 --- a/storage/flux/table_test.go +++ b/storage/flux/table_test.go @@ -2564,6 +2564,115 @@ func TestStorageReader_EmptyTableNoEmptyWindows(t *testing.T) { } } +func TestStorageReader_ReadGroup(t *testing.T) { + reader := NewStorageReader(t, func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) { + spec := Spec(org, bucket, + MeasurementSpec("m0", + FloatArrayValuesSequence("f0", 10*time.Second, []float64{1.0, 2.0, 3.0, 4.0}), + TagValuesSequence("t0", "a-%s", 0, 3), + ), + ) + tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:02:00Z") + return gen.NewSeriesGeneratorFromSpec(spec, tr), tr + }) + defer reader.Close() + + for _, tt := range []struct { + aggregate string + want flux.TableIterator + }{ + { + aggregate: storageflux.CountKind, + want: static.TableGroup{ + static.StringKey("_measurement", "m0"), + static.StringKey("_field", "f0"), + static.TimeKey("_start", "2019-11-25T00:00:00Z"), + static.TimeKey("_stop", "2019-11-25T00:02:00Z"), + static.TableMatrix{ + static.StringKeys("t0", "a-0", "a-1", "a-2"), + { + static.Table{ + static.Ints("_value", 12), + }, + }, + }, + }, + }, + { + aggregate: storageflux.SumKind, + want: static.TableGroup{ + static.StringKey("_measurement", "m0"), + static.StringKey("_field", "f0"), + static.TimeKey("_start", "2019-11-25T00:00:00Z"), + static.TimeKey("_stop", "2019-11-25T00:02:00Z"), + static.TableMatrix{ + static.StringKeys("t0", "a-0", "a-1", "a-2"), + { + static.Table{ + static.Floats("_value", 30), + }, + }, + }, + }, + }, + { + aggregate: storageflux.MinKind, + want: static.TableGroup{ + static.StringKey("_measurement", "m0"), + static.StringKey("_field", "f0"), + static.TimeKey("_start", "2019-11-25T00:00:00Z"), + static.TimeKey("_stop", "2019-11-25T00:02:00Z"), + static.TableMatrix{ + static.StringKeys("t0", "a-0", "a-1", "a-2"), + { + static.Table{ + static.Times("_time", "2019-11-25T00:00:00Z"), + static.Floats("_value", 1), + }, + }, + }, + }, + }, + { + aggregate: storageflux.MaxKind, + want: static.TableGroup{ + static.StringKey("_measurement", "m0"), + static.StringKey("_field", "f0"), + static.TimeKey("_start", "2019-11-25T00:00:00Z"), + static.TimeKey("_stop", "2019-11-25T00:02:00Z"), + static.TableMatrix{ + static.StringKeys("t0", "a-0", "a-1", "a-2"), + { + static.Table{ + static.Times("_time", "2019-11-25T00:00:30Z"), + static.Floats("_value", 4), + }, + }, + }, + }, + }, + } { + mem := &memory.Allocator{} + got, err := reader.ReadGroup(context.Background(), query.ReadGroupSpec{ + ReadFilterSpec: query.ReadFilterSpec{ + OrganizationID: reader.Org, + BucketID: reader.Bucket, + Bounds: reader.Bounds, + }, + GroupMode: query.GroupModeBy, + GroupKeys: []string{"_measurement", "_field", "t0"}, + AggregateMethod: tt.aggregate, + }, mem) + if err != nil { + t.Fatal(err) + } + + if diff := table.Diff(tt.want, got); diff != "" { + t.Errorf("unexpected results -want/+got:\n%s", diff) + } + } +} + func BenchmarkReadFilter(b *testing.B) { setupFn := func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) { tagsSpec := &gen.TagsSpec{