feat(flux): enable group > min/max pushdown (#21281)
* build(flux): update flux to v0.113.0
* feat(query): enable min/max pushdown
* fix(query): fix the group last pushdown to use descending cursors
* test(storage): add read group test with no agg

Co-authored-by: Jonathan A. Sternberg <jonathan@influxdata.com>
Co-authored-by: Christopher M. Wolff <chris.wolff@influxdata.com>
Co-authored-by: Faith Chikwekwe <faithchikwekwe01@gmail.com>
4 people authored Apr 23, 2021
1 parent e97c5d9 commit e74ece2
Showing 11 changed files with 1,041 additions and 345 deletions.
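
For context, a minimal sketch of the kind of Flux query this change lets the planner push down into storage (the bucket and tag names below are hypothetical):

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> group(columns: ["host"])
    |> min()

With PushDownGroupAggregateRule registered, a bare min(), max(), count(), sum(), first(), or last() that directly follows a ReadGroup is folded into the ReadGroup request and evaluated by the storage engine rather than by the Flux engine; per canPushGroupedAggregate below, the rewrite only applies when the aggregate operates on the default _value column.
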
61 changes: 61 additions & 0 deletions flux/stdlib/influxdata/influxdb/min_max_influxdb_test.flux
@@ -0,0 +1,61 @@
package influxdb_test

import "testing/expect"

option now = () => (2030-01-01T00:00:00Z)

testcase push_down_min_bare extends "flux/planner/group_min_test" {
    expect.planner(rules: [
        "PushDownGroupAggregateRule": 1,
    ])
    group_min_test.group_min_bare()
}

testcase push_down_min_bare_host extends "flux/planner/group_min_test" {
    expect.planner(rules: [
        "PushDownGroupAggregateRule": 1,
    ])
    group_min_test.group_min_bare_host()
}

testcase push_down_min_bare_field extends "flux/planner/group_min_test" {
    expect.planner(rules: [
        "PushDownGroupAggregateRule": 1,
    ])
    group_min_test.group_min_bare_field()
}

testcase push_down_max_bare extends "flux/planner/group_max_test" {
    expect.planner(rules: [
        "PushDownGroupAggregateRule": 1,
    ])
    group_max_test.group_max_bare()
}

testcase push_down_max_bare_host extends "flux/planner/group_max_test" {
    expect.planner(rules: [
        "PushDownGroupAggregateRule": 1,
    ])
    group_max_test.group_max_bare_host()
}

testcase push_down_max_bare_field extends "flux/planner/group_max_test" {
    expect.planner(rules: [
        "PushDownGroupAggregateRule": 1,
    ])
    group_max_test.group_max_bare_field()
}

testcase push_down_table_test_min extends "flux/planner/group_min_max_table_test" {
    expect.planner(rules: [
        "PushDownGroupAggregateRule": 1,
    ])
    group_min_max_table_test.group_min_table()
}

testcase push_down_table_test_max extends "flux/planner/group_min_max_table_test" {
    expect.planner(rules: [
        "PushDownGroupAggregateRule": 1,
    ])
    group_min_max_table_test.group_max_table()
}
118 changes: 118 additions & 0 deletions flux/stdlib/influxdata/influxdb/rules.go
@@ -30,6 +30,7 @@ func init() {
        PushDownWindowAggregateRule{},
        PushDownWindowAggregateByTimeRule{},
        PushDownBareAggregateRule{},
        PushDownGroupAggregateRule{},
    )
    plan.RegisterLogicalRules(
        universe.MergeFiltersRule{},
@@ -874,6 +875,123 @@ func (p PushDownBareAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p
    }), true, nil
}

//
// Push Down of group aggregates.
// ReadGroupPhys |> { count }
//
type PushDownGroupAggregateRule struct{}

func (PushDownGroupAggregateRule) Name() string {
return "PushDownGroupAggregateRule"
}

func (rule PushDownGroupAggregateRule) Pattern() plan.Pattern {
    return plan.OneOf(
        []plan.ProcedureKind{
            universe.CountKind,
            universe.SumKind,
            universe.FirstKind,
            universe.LastKind,
            universe.MinKind,
            universe.MaxKind,
        },
        plan.Pat(ReadGroupPhysKind))
}

func (PushDownGroupAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) {
    group := pn.Predecessors()[0].ProcedureSpec().(*ReadGroupPhysSpec)
    // Cannot push down multiple aggregates
    if len(group.AggregateMethod) > 0 {
        return pn, false, nil
    }

    if !canPushGroupedAggregate(ctx, pn) {
        return pn, false, nil
    }

    switch pn.Kind() {
    case universe.CountKind:
        // ReadGroup() -> count => ReadGroup(count)
        node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
            ReadRangePhysSpec: group.ReadRangePhysSpec,
            GroupMode: group.GroupMode,
            GroupKeys: group.GroupKeys,
            AggregateMethod: universe.CountKind,
        })
        return node, true, nil
    case universe.SumKind:
        // ReadGroup() -> sum => ReadGroup(sum)
        node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
            ReadRangePhysSpec: group.ReadRangePhysSpec,
            GroupMode: group.GroupMode,
            GroupKeys: group.GroupKeys,
            AggregateMethod: universe.SumKind,
        })
        return node, true, nil
    case universe.FirstKind:
        // ReadGroup() -> first => ReadGroup(first)
        node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
            ReadRangePhysSpec: group.ReadRangePhysSpec,
            GroupMode: group.GroupMode,
            GroupKeys: group.GroupKeys,
            AggregateMethod: universe.FirstKind,
        })
        return node, true, nil
    case universe.LastKind:
        // ReadGroup() -> last => ReadGroup(last)
        node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
            ReadRangePhysSpec: group.ReadRangePhysSpec,
            GroupMode: group.GroupMode,
            GroupKeys: group.GroupKeys,
            AggregateMethod: universe.LastKind,
        })
        return node, true, nil
    case universe.MinKind:
        // ReadGroup() -> min => ReadGroup(min)
        node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
            ReadRangePhysSpec: group.ReadRangePhysSpec,
            GroupMode: group.GroupMode,
            GroupKeys: group.GroupKeys,
            AggregateMethod: universe.MinKind,
        })
        return node, true, nil
    case universe.MaxKind:
        // ReadGroup() -> max => ReadGroup(max)
        node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
            ReadRangePhysSpec: group.ReadRangePhysSpec,
            GroupMode: group.GroupMode,
            GroupKeys: group.GroupKeys,
            AggregateMethod: universe.MaxKind,
        })
        return node, true, nil
    }
    return pn, false, nil
}

func canPushGroupedAggregate(ctx context.Context, pn plan.Node) bool {
    switch pn.Kind() {
    case universe.CountKind:
        agg := pn.ProcedureSpec().(*universe.CountProcedureSpec)
        return len(agg.Columns) == 1 && agg.Columns[0] == execute.DefaultValueColLabel
    case universe.SumKind:
        agg := pn.ProcedureSpec().(*universe.SumProcedureSpec)
        return len(agg.Columns) == 1 && agg.Columns[0] == execute.DefaultValueColLabel
    case universe.FirstKind:
        agg := pn.ProcedureSpec().(*universe.FirstProcedureSpec)
        return agg.Column == execute.DefaultValueColLabel
    case universe.LastKind:
        agg := pn.ProcedureSpec().(*universe.LastProcedureSpec)
        return agg.Column == execute.DefaultValueColLabel
    case universe.MaxKind:
        agg := pn.ProcedureSpec().(*universe.MaxProcedureSpec)
        return agg.Column == execute.DefaultValueColLabel
    case universe.MinKind:
        agg := pn.ProcedureSpec().(*universe.MinProcedureSpec)
        return agg.Column == execute.DefaultValueColLabel
    }
    return false
}

func asSchemaMutationProcedureSpec(spec plan.ProcedureSpec) *universe.SchemaMutationProcedureSpec {
    if s, ok := spec.(*universe.DualImplProcedureSpec); ok {
        spec = s.ProcedureSpec
2 changes: 1 addition & 1 deletion go.mod
@@ -18,7 +18,7 @@ require (
github.com/golang/mock v1.4.3
github.com/golang/snappy v0.0.1
github.com/google/go-cmp v0.5.0
github.com/influxdata/flux v0.112.1
github.com/influxdata/flux v0.113.0
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93
github.com/influxdata/pkg-config v0.2.7
7 changes: 2 additions & 5 deletions go.sum
@@ -446,12 +446,9 @@ github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/flux v0.65.0 h1:57tk1Oo4gpGIDbV12vUAPCMtLtThhaXzub1XRIuqv6A=
github.com/influxdata/flux v0.65.0/go.mod h1:BwN2XG2lMszOoquQaFdPET8FRQfrXiZsWmcMO9rkaVY=
github.com/influxdata/flux v0.111.0 h1:27CNz0SbEofD9NzdwcdxRwGmuVSDSisVq4dOceB/KF0=
github.com/influxdata/flux v0.111.0/go.mod h1:3TJtvbm/Kwuo5/PEo5P6HUzwVg4bXWkb2wPQHPtQdlU=
github.com/influxdata/flux v0.112.1 h1:N9kRbSx0AdGDkjH5PyoFczfCOenNsfxYVFhLFcEAOWQ=
github.com/influxdata/flux v0.112.1/go.mod h1:3TJtvbm/Kwuo5/PEo5P6HUzwVg4bXWkb2wPQHPtQdlU=
github.com/influxdata/flux v0.113.0 h1:QoQ9ggVRZeMK5u4FUzYLHPa3QKu435abMp/Ejdse6LY=
github.com/influxdata/flux v0.113.0/go.mod h1:3TJtvbm/Kwuo5/PEo5P6HUzwVg4bXWkb2wPQHPtQdlU=
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU=
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA=
github.com/influxdata/influxdb v1.8.0/go.mod h1:SIzcnsjaHRFpmlxpJ4S3NT64qtEKYweNTUMb/vh0OMQ=
6 changes: 5 additions & 1 deletion services/storage/store.go
@@ -159,7 +159,11 @@ func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest)
return nil, err
}

shardIDs, err := s.findShardIDs(database, rp, false, start, end)
// Due to some optimizations around how flux's `last()` function is implemented with the
// storage engine, we need to detect if the read request requires a descending
// cursor or not.
descending := !reads.IsAscendingGroupAggregate(req)
shardIDs, err := s.findShardIDs(database, rp, descending, start, end)
if err != nil {
return nil, err
}
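
As a hedged illustration of why the descending flag matters, a grouped last() query of the following shape (bucket and tag names hypothetical) is the case this detection targets:

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> group(columns: ["host"])
    |> last()

For a request like this, reads.IsAscendingGroupAggregate is expected to report false, so findShardIDs is called with a descending read and last() can be answered from the newest points first.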