Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flux): enable group > min/max pushdown #21281

Merged
merged 2 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions flux/stdlib/influxdata/influxdb/min_max_influxdb_test.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package influxdb_test

import "testing/expect"

option now = () => (2030-01-01T00:00:00Z)

testcase push_down_min_bare extends "flux/planner/group_min_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_test.group_min_bare()
}

testcase push_down_min_bare_host extends "flux/planner/group_min_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_test.group_min_bare_host()
}

testcase push_down_min_bare_field extends "flux/planner/group_min_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_test.group_min_bare_field()
}

testcase push_down_max_bare extends "flux/planner/group_max_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_max_test.group_max_bare()
}

testcase push_down_max_bare_host extends "flux/planner/group_max_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_max_test.group_max_bare_host()
}

testcase push_down_max_bare_field extends "flux/planner/group_max_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_max_test.group_max_bare_field()
}

testcase push_down_table_test_min extends "flux/planner/group_min_max_table_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_max_table_test.group_min_table()
}

testcase push_down_table_test_max extends "flux/planner/group_min_max_table_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_max_table_test.group_max_table()
}
118 changes: 118 additions & 0 deletions flux/stdlib/influxdata/influxdb/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func init() {
PushDownWindowAggregateRule{},
PushDownWindowAggregateByTimeRule{},
PushDownBareAggregateRule{},
PushDownGroupAggregateRule{},
)
plan.RegisterLogicalRules(
universe.MergeFiltersRule{},
Expand Down Expand Up @@ -874,6 +875,123 @@ func (p PushDownBareAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p
}), true, nil
}

//
// Push Down of group aggregates.
// ReadGroupPhys |> { count }
//
type PushDownGroupAggregateRule struct{}

func (PushDownGroupAggregateRule) Name() string {
return "PushDownGroupAggregateRule"
}

func (rule PushDownGroupAggregateRule) Pattern() plan.Pattern {
return plan.OneOf(
[]plan.ProcedureKind{
universe.CountKind,
universe.SumKind,
universe.FirstKind,
universe.LastKind,
universe.MinKind,
universe.MaxKind,
},
plan.Pat(ReadGroupPhysKind))
}

func (PushDownGroupAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) {
group := pn.Predecessors()[0].ProcedureSpec().(*ReadGroupPhysSpec)
// Cannot push down multiple aggregates
if len(group.AggregateMethod) > 0 {
return pn, false, nil
}

if !canPushGroupedAggregate(ctx, pn) {
return pn, false, nil
}

switch pn.Kind() {
case universe.CountKind:
// ReadGroup() -> count => ReadGroup(count)
node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
ReadRangePhysSpec: group.ReadRangePhysSpec,
GroupMode: group.GroupMode,
GroupKeys: group.GroupKeys,
AggregateMethod: universe.CountKind,
})
return node, true, nil
case universe.SumKind:
// ReadGroup() -> sum => ReadGroup(sum)
node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
ReadRangePhysSpec: group.ReadRangePhysSpec,
GroupMode: group.GroupMode,
GroupKeys: group.GroupKeys,
AggregateMethod: universe.SumKind,
})
return node, true, nil
case universe.FirstKind:
// ReadGroup() -> first => ReadGroup(first)
node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
ReadRangePhysSpec: group.ReadRangePhysSpec,
GroupMode: group.GroupMode,
GroupKeys: group.GroupKeys,
AggregateMethod: universe.FirstKind,
})
return node, true, nil
case universe.LastKind:
// ReadGroup() -> last => ReadGroup(last)
node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
ReadRangePhysSpec: group.ReadRangePhysSpec,
GroupMode: group.GroupMode,
GroupKeys: group.GroupKeys,
AggregateMethod: universe.LastKind,
})
return node, true, nil
case universe.MinKind:
// ReadGroup() -> min => ReadGroup(min)
node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
ReadRangePhysSpec: group.ReadRangePhysSpec,
GroupMode: group.GroupMode,
GroupKeys: group.GroupKeys,
AggregateMethod: universe.MinKind,
})
return node, true, nil
case universe.MaxKind:
// ReadGroup() -> max => ReadGroup(max)
node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
ReadRangePhysSpec: group.ReadRangePhysSpec,
GroupMode: group.GroupMode,
GroupKeys: group.GroupKeys,
AggregateMethod: universe.MaxKind,
})
return node, true, nil
}
return pn, false, nil
}

func canPushGroupedAggregate(ctx context.Context, pn plan.Node) bool {
switch pn.Kind() {
case universe.CountKind:
agg := pn.ProcedureSpec().(*universe.CountProcedureSpec)
return len(agg.Columns) == 1 && agg.Columns[0] == execute.DefaultValueColLabel
case universe.SumKind:
agg := pn.ProcedureSpec().(*universe.SumProcedureSpec)
return len(agg.Columns) == 1 && agg.Columns[0] == execute.DefaultValueColLabel
case universe.FirstKind:
agg := pn.ProcedureSpec().(*universe.FirstProcedureSpec)
return agg.Column == execute.DefaultValueColLabel
case universe.LastKind:
agg := pn.ProcedureSpec().(*universe.LastProcedureSpec)
return agg.Column == execute.DefaultValueColLabel
case universe.MaxKind:
agg := pn.ProcedureSpec().(*universe.MaxProcedureSpec)
return agg.Column == execute.DefaultValueColLabel
case universe.MinKind:
agg := pn.ProcedureSpec().(*universe.MinProcedureSpec)
return agg.Column == execute.DefaultValueColLabel
}
return false
}

func asSchemaMutationProcedureSpec(spec plan.ProcedureSpec) *universe.SchemaMutationProcedureSpec {
if s, ok := spec.(*universe.DualImplProcedureSpec); ok {
spec = s.ProcedureSpec
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/golang/mock v1.4.3
github.com/golang/snappy v0.0.1
github.com/google/go-cmp v0.5.0
github.com/influxdata/flux v0.112.1
github.com/influxdata/flux v0.113.0
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93
github.com/influxdata/pkg-config v0.2.7
Expand Down
7 changes: 2 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -446,12 +446,9 @@ github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/flux v0.65.0 h1:57tk1Oo4gpGIDbV12vUAPCMtLtThhaXzub1XRIuqv6A=
github.com/influxdata/flux v0.65.0/go.mod h1:BwN2XG2lMszOoquQaFdPET8FRQfrXiZsWmcMO9rkaVY=
github.com/influxdata/flux v0.111.0 h1:27CNz0SbEofD9NzdwcdxRwGmuVSDSisVq4dOceB/KF0=
github.com/influxdata/flux v0.111.0/go.mod h1:3TJtvbm/Kwuo5/PEo5P6HUzwVg4bXWkb2wPQHPtQdlU=
github.com/influxdata/flux v0.112.1 h1:N9kRbSx0AdGDkjH5PyoFczfCOenNsfxYVFhLFcEAOWQ=
github.com/influxdata/flux v0.112.1/go.mod h1:3TJtvbm/Kwuo5/PEo5P6HUzwVg4bXWkb2wPQHPtQdlU=
github.com/influxdata/flux v0.113.0 h1:QoQ9ggVRZeMK5u4FUzYLHPa3QKu435abMp/Ejdse6LY=
github.com/influxdata/flux v0.113.0/go.mod h1:3TJtvbm/Kwuo5/PEo5P6HUzwVg4bXWkb2wPQHPtQdlU=
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU=
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA=
github.com/influxdata/influxdb v1.8.0/go.mod h1:SIzcnsjaHRFpmlxpJ4S3NT64qtEKYweNTUMb/vh0OMQ=
Expand Down
6 changes: 5 additions & 1 deletion services/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest)
return nil, err
}

shardIDs, err := s.findShardIDs(database, rp, false, start, end)
// Due to some optimizations around how flux's `last()` function is implemented with the
// storage engine, we need to detect if the read request requires a descending
// cursor or not.
descending := !reads.IsAscendingGroupAggregate(req)
shardIDs, err := s.findShardIDs(database, rp, descending, start, end)
if err != nil {
return nil, err
}
Expand Down
Loading