Skip to content

Commit cfde09c

Browse files
author
Faith Chikwekwe
committed
feat(query/stdlib): add min and max to ReadGroup
Enables the mix and max aggregates for the ReadGroupAggregte pushdown behind a feature flag.
1 parent bc4c19d commit cfde09c

File tree

9 files changed

+1022
-254
lines changed

9 files changed

+1022
-254
lines changed

cmd/influxd/launcher/query_test.go

+75
Original file line numberDiff line numberDiff line change
@@ -2235,6 +2235,80 @@ from(bucket: v.bucket)
22352235
,result,table,kk,_value
22362236
,,0,kk0,32
22372237
,,1,kk1,35
2238+
`,
2239+
},
2240+
{
2241+
name: "min group",
2242+
data: []string{
2243+
"m0,k=k0,kk=kk0 f=0i 0",
2244+
"m0,k=k0,kk=kk1 f=1i 1000000000",
2245+
"m0,k=k0,kk=kk0 f=2i 2000000000",
2246+
"m0,k=k0,kk=kk1 f=3i 3000000000",
2247+
"m0,k=k0,kk=kk0 f=4i 4000000000",
2248+
"m0,k=k0,kk=kk1 f=5i 5000000000",
2249+
"m0,k=k0,kk=kk0 f=6i 6000000000",
2250+
"m0,k=k0,kk=kk1 f=5i 7000000000",
2251+
"m0,k=k0,kk=kk0 f=0i 8000000000",
2252+
"m0,k=k0,kk=kk1 f=6i 9000000000",
2253+
"m0,k=k0,kk=kk0 f=6i 10000000000",
2254+
"m0,k=k0,kk=kk1 f=7i 11000000000",
2255+
"m0,k=k0,kk=kk0 f=5i 12000000000",
2256+
"m0,k=k0,kk=kk1 f=8i 13000000000",
2257+
"m0,k=k0,kk=kk0 f=9i 14000000000",
2258+
"m0,k=k0,kk=kk1 f=5i 15000000000",
2259+
},
2260+
op: "readGroup(min)",
2261+
query: `
2262+
from(bucket: v.bucket)
2263+
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z)
2264+
|> group(columns: ["kk"])
2265+
|> min()
2266+
|> keep(columns: ["kk", "_value"])
2267+
`,
2268+
want: `
2269+
#datatype,string,long,string,long
2270+
#group,false,false,true,false
2271+
#default,_result,,,
2272+
,result,table,kk,_value
2273+
,,0,kk0,0
2274+
,,1,kk1,1
2275+
`,
2276+
},
2277+
{
2278+
name: "max group",
2279+
data: []string{
2280+
"m0,k=k0,kk=kk0 f=0i 0",
2281+
"m0,k=k0,kk=kk1 f=1i 1000000000",
2282+
"m0,k=k0,kk=kk0 f=2i 2000000000",
2283+
"m0,k=k0,kk=kk1 f=3i 3000000000",
2284+
"m0,k=k0,kk=kk0 f=4i 4000000000",
2285+
"m0,k=k0,kk=kk1 f=5i 5000000000",
2286+
"m0,k=k0,kk=kk0 f=6i 6000000000",
2287+
"m0,k=k0,kk=kk1 f=5i 7000000000",
2288+
"m0,k=k0,kk=kk0 f=0i 8000000000",
2289+
"m0,k=k0,kk=kk1 f=6i 9000000000",
2290+
"m0,k=k0,kk=kk0 f=6i 10000000000",
2291+
"m0,k=k0,kk=kk1 f=7i 11000000000",
2292+
"m0,k=k0,kk=kk0 f=5i 12000000000",
2293+
"m0,k=k0,kk=kk1 f=8i 13000000000",
2294+
"m0,k=k0,kk=kk0 f=9i 14000000000",
2295+
"m0,k=k0,kk=kk1 f=5i 15000000000",
2296+
},
2297+
op: "readGroup(max)",
2298+
query: `
2299+
from(bucket: v.bucket)
2300+
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z)
2301+
|> group(columns: ["kk"])
2302+
|> max()
2303+
|> keep(columns: ["kk", "_value"])
2304+
`,
2305+
want: `
2306+
#datatype,string,long,string,long
2307+
#group,false,false,true,false
2308+
#default,_result,,,
2309+
,result,table,kk,_value
2310+
,,0,kk0,9
2311+
,,1,kk1,8
22382312
`,
22392313
},
22402314
}
@@ -2247,6 +2321,7 @@ from(bucket: v.bucket)
22472321
feature.PushDownWindowAggregateMean(): true,
22482322
feature.PushDownWindowAggregateMin(): true,
22492323
feature.PushDownWindowAggregateMax(): true,
2324+
feature.PushDownGroupAggregateMinMax(): true,
22502325
}))
22512326

22522327
l.SetupOrFail(t)

flags.yml

+6
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,9 @@
143143
contact: Monitoring Team
144144
lifetime: temporary
145145
expose: true
146+
147+
- name: Push Down Group Aggregate Min Max
148+
description: Enable the min and max variants of the PushDownGroupAggregate planner rule
149+
key: pushDownGroupAggregateMinMax
150+
default: false
151+
contact: Query Team

kit/feature/list.go

+16
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

query/stdlib/influxdata/influxdb/rules.go

+48
Original file line numberDiff line numberDiff line change
@@ -1023,6 +1023,8 @@ func (rule PushDownGroupAggregateRule) Pattern() plan.Pattern {
10231023
universe.SumKind,
10241024
universe.FirstKind,
10251025
universe.LastKind,
1026+
universe.MinKind,
1027+
universe.MaxKind,
10261028
},
10271029
plan.Pat(ReadGroupPhysKind))
10281030
}
@@ -1075,6 +1077,46 @@ func (PushDownGroupAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (pl
10751077
AggregateMethod: universe.LastKind,
10761078
})
10771079
return node, true, nil
1080+
case universe.MinKind:
1081+
// ReadGroup() -> min => ReadGroup(min) -> min
1082+
if feature.PushDownGroupAggregateMinMax().Enabled(ctx) {
1083+
root := plan.CreatePhysicalNode("min", &universe.MinProcedureSpec{
1084+
SelectorConfig: execute.SelectorConfig{
1085+
Column: execute.DefaultValueColLabel,
1086+
},
1087+
})
1088+
1089+
leaf := plan.CreatePhysicalNode("ReadGroupAggregate", &ReadGroupPhysSpec{
1090+
ReadRangePhysSpec: group.ReadRangePhysSpec,
1091+
GroupMode: group.GroupMode,
1092+
GroupKeys: group.GroupKeys,
1093+
AggregateMethod: universe.MinKind,
1094+
})
1095+
1096+
root.AddPredecessors(leaf)
1097+
leaf.AddSuccessors(root)
1098+
return root, true, nil
1099+
}
1100+
case universe.MaxKind:
1101+
// ReadGroup() -> max => ReadGroup(max) -> max
1102+
if feature.PushDownGroupAggregateMinMax().Enabled(ctx) {
1103+
root := plan.CreatePhysicalNode("max", &universe.MaxProcedureSpec{
1104+
SelectorConfig: execute.SelectorConfig{
1105+
Column: execute.DefaultValueColLabel,
1106+
},
1107+
})
1108+
1109+
leaf := plan.CreatePhysicalNode("ReadGroupAggregate", &ReadGroupPhysSpec{
1110+
ReadRangePhysSpec: group.ReadRangePhysSpec,
1111+
GroupMode: group.GroupMode,
1112+
GroupKeys: group.GroupKeys,
1113+
AggregateMethod: universe.MaxKind,
1114+
})
1115+
1116+
root.AddPredecessors(leaf)
1117+
leaf.AddSuccessors(root)
1118+
return root, true, nil
1119+
}
10781120
}
10791121
return pn, false, nil
10801122
}
@@ -1102,6 +1144,12 @@ func canPushGroupedAggregate(ctx context.Context, pn plan.Node) bool {
11021144
case universe.LastKind:
11031145
agg := pn.ProcedureSpec().(*universe.LastProcedureSpec)
11041146
return caps.HaveLast() && agg.Column == execute.DefaultValueColLabel
1147+
case universe.MaxKind:
1148+
agg := pn.ProcedureSpec().(*universe.MaxProcedureSpec)
1149+
return caps.HaveMax() && agg.Column == execute.DefaultValueColLabel
1150+
case universe.MinKind:
1151+
agg := pn.ProcedureSpec().(*universe.MinProcedureSpec)
1152+
return caps.HaveMin() && agg.Column == execute.DefaultValueColLabel
11051153
}
11061154
return false
11071155
}

query/stdlib/influxdata/influxdb/rules_test.go

+70-2
Original file line numberDiff line numberDiff line change
@@ -2672,7 +2672,9 @@ func TestPushDownBareAggregateRule(t *testing.T) {
26722672
//
26732673
func TestPushDownGroupAggregateRule(t *testing.T) {
26742674
// Turn on all flags
2675-
ctx, _ := feature.Annotate(context.Background(), mock.NewFlagger(map[feature.Flag]interface{}{}))
2675+
ctx, _ := feature.Annotate(context.Background(), mock.NewFlagger(map[feature.Flag]interface{}{
2676+
feature.PushDownGroupAggregateMinMax(): true,
2677+
}))
26762678

26772679
caps := func(c query.GroupCapability) context.Context {
26782680
deps := influxdb.StorageDependencies{
@@ -2726,6 +2728,20 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
27262728
},
27272729
}
27282730
}
2731+
minProcedureSpecVal := func() *universe.MinProcedureSpec {
2732+
return &universe.MinProcedureSpec{
2733+
SelectorConfig: execute.SelectorConfig{
2734+
Column: execute.DefaultValueColLabel,
2735+
},
2736+
}
2737+
}
2738+
maxProcedureSpecVal := func() *universe.MaxProcedureSpec {
2739+
return &universe.MaxProcedureSpec{
2740+
SelectorConfig: execute.SelectorConfig{
2741+
Column: execute.DefaultValueColLabel,
2742+
},
2743+
}
2744+
}
27292745
countProcedureSpec := func() *universe.CountProcedureSpec {
27302746
return &universe.CountProcedureSpec{
27312747
AggregateConfig: execute.DefaultAggregateConfig,
@@ -2829,12 +2845,64 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
28292845
// ReadGroup() -> last => ReadGroup() -> last
28302846
tests = append(tests, plantest.RuleTestCase{
28312847
Context: caps(mockGroupCapability{}),
2832-
Name: "RewriteGroupLast",
2848+
Name: "NoLastCapability",
28332849
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
28342850
Before: simplePlanWithAgg("last", lastProcedureSpec()),
28352851
NoChange: true,
28362852
})
28372853

2854+
// ReadGroup() -> max => ReadGroup(max) -> max
2855+
tests = append(tests, plantest.RuleTestCase{
2856+
Context: caps(mockGroupCapability{max: true}),
2857+
Name: "RewriteGroupMax",
2858+
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
2859+
Before: simplePlanWithAgg("max", maxProcedureSpecVal()),
2860+
After: &plantest.PlanSpec{
2861+
Nodes: []plan.Node{
2862+
plan.CreateLogicalNode("ReadGroupAggregate", readGroupAgg("max")),
2863+
plan.CreateLogicalNode("max", maxProcedureSpecVal()),
2864+
},
2865+
Edges: [][2]int{
2866+
{0, 1},
2867+
},
2868+
},
2869+
})
2870+
2871+
// ReadGroup() -> max => ReadGroup() -> max
2872+
tests = append(tests, plantest.RuleTestCase{
2873+
Context: caps(mockGroupCapability{}),
2874+
Name: "NoMaxCapability",
2875+
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
2876+
Before: simplePlanWithAgg("max", maxProcedureSpecVal()),
2877+
NoChange: true,
2878+
})
2879+
2880+
// ReadGroup() -> min => ReadGroup(min) -> min
2881+
tests = append(tests, plantest.RuleTestCase{
2882+
Context: caps(mockGroupCapability{min: true}),
2883+
Name: "RewriteGroupMin",
2884+
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
2885+
Before: simplePlanWithAgg("min", minProcedureSpecVal()),
2886+
After: &plantest.PlanSpec{
2887+
Nodes: []plan.Node{
2888+
plan.CreateLogicalNode("ReadGroupAggregate", readGroupAgg("min")),
2889+
plan.CreateLogicalNode("min", minProcedureSpecVal()),
2890+
},
2891+
Edges: [][2]int{
2892+
{0, 1},
2893+
},
2894+
},
2895+
})
2896+
2897+
// ReadGroup() -> min => ReadGroup() -> min
2898+
tests = append(tests, plantest.RuleTestCase{
2899+
Context: caps(mockGroupCapability{}),
2900+
Name: "NoMinCapability",
2901+
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
2902+
Before: simplePlanWithAgg("min", minProcedureSpecVal()),
2903+
NoChange: true,
2904+
})
2905+
28382906
// Rewrite with successors
28392907
// ReadGroup() -> count -> sum {2} => ReadGroup(count) -> sum {2}
28402908
tests = append(tests, plantest.RuleTestCase{

storage/flux/reader.go

+7
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/influxdata/flux/memory"
1212
"github.com/influxdata/flux/plan"
1313
"github.com/influxdata/flux/values"
14+
"github.com/influxdata/influxdb/v2"
1415
"github.com/influxdata/influxdb/v2/kit/errors"
1516
"github.com/influxdata/influxdb/v2/models"
1617
"github.com/influxdata/influxdb/v2/query"
@@ -273,6 +274,12 @@ func (gi *groupIterator) Do(f func(flux.Table) error) error {
273274
req.Range.Start = int64(gi.spec.Bounds.Start)
274275
req.Range.End = int64(gi.spec.Bounds.Stop)
275276

277+
if len(gi.spec.GroupKeys) > 0 && gi.spec.GroupMode == query.GroupModeNone {
278+
return &influxdb.Error{
279+
Code: influxdb.EInternal,
280+
Msg: "cannot have group mode none with group key values",
281+
}
282+
}
276283
req.Group = convertGroupMode(gi.spec.GroupMode)
277284
req.GroupKeys = gi.spec.GroupKeys
278285

0 commit comments

Comments
 (0)