Skip to content

Commit

Permalink
Add support for top level aggregates
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSisley committed Jul 7, 2022
1 parent ab77525 commit 85bac3a
Show file tree
Hide file tree
Showing 8 changed files with 539 additions and 4 deletions.
29 changes: 29 additions & 0 deletions query/graphql/mapper/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,24 @@ func getRequestables(
desc *client.CollectionDescription,
descriptionsRepo *DescriptionsRepo,
) (fields []Requestable, aggregates []*aggregateRequest, err error) {
// If this parser.Select is itself an aggregate, we need to append the
// relevent info here as if it was a field of its own (due to a quirk of
// the parser package).
if _, isAggregate := parserTypes.Aggregates[parsed.Name]; isAggregate {
index := mapping.GetNextIndex()
aggregateReq, err := getAggregateRequests(index, parsed)
if err != nil {
return nil, nil, err
}

mapping.RenderKeys = append(mapping.RenderKeys, core.RenderKey{
Index: index,
Key: parsed.Alias,
})
mapping.Add(index, parsed.Name)
aggregates = append(aggregates, &aggregateReq)
}

for _, field := range parsed.Fields {
switch f := field.(type) {
case *parser.Field:
Expand Down Expand Up @@ -449,6 +467,11 @@ func getCollectionName(
parsed *parser.Select,
parentCollectionName string,
) (string, error) {
if _, isAggregate := parserTypes.Aggregates[parsed.Name]; isAggregate {
// This string is not used or referenced, its value is only there to aid debugging
return "_topLevel", nil
}

if parsed.Name == parserTypes.GroupFieldName {
return parentCollectionName, nil
} else if parsed.Root == parserTypes.CommitSelection {
Expand Down Expand Up @@ -480,6 +503,12 @@ func getTopLevelInfo(
) (*core.DocumentMapping, *client.CollectionDescription, error) {
mapping := core.NewDocumentMapping()

if _, isAggregate := parserTypes.Aggregates[parsed.Name]; isAggregate {
// If this is a (top-level) aggregate, then it will have no collection
// description, and no top-level fields, so we return an empty mapping only
return mapping, &client.CollectionDescription{}, nil
}

if parsed.Root != parserTypes.CommitSelection {
mapping.Add(core.DocKeyFieldIndex, parserTypes.DocKeyFieldName)

Expand Down
1 change: 1 addition & 0 deletions query/graphql/planner/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
_ planNode = (*selectTopNode)(nil)
_ planNode = (*orderNode)(nil)
_ planNode = (*sumNode)(nil)
_ planNode = (*topLevelNode)(nil)
_ planNode = (*typeIndexJoin)(nil)
_ planNode = (*typeJoinMany)(nil)
_ planNode = (*typeJoinOne)(nil)
Expand Down
25 changes: 25 additions & 0 deletions query/graphql/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ func (p *Planner) newPlan(stmt interface{}) (planNode, error) {
if err != nil {
return nil, err
}

if _, isAgg := parserTypes.Aggregates[n.Name]; isAgg {
// If this Select is an aggregate, then it must be a top-level
// aggregate and we need to resolve it within the context of a
// top-level node.
return p.Top(m)
}

return p.Select(m)
case *mapper.Select:
return p.Select(n)
Expand Down Expand Up @@ -223,6 +231,23 @@ func (p *Planner) expandPlan(plan planNode, parentPlan *selectTopNode) error {
case *deleteNode:
return p.expandPlan(n.source, parentPlan)

case *topLevelNode:
for _, child := range n.children {
switch c := child.(type) {
case *selectTopNode:
// We only care about expanding the child source here, it is assumed that the parent source
// is expanded elsewhere/already
err := p.expandPlan(child, parentPlan)
if err != nil {
return err
}
case aggregateNode:
// top-level aggregates use the top-level node as a source
c.SetPlan(n)
}
}
return nil

default:
return nil
}
Expand Down
203 changes: 203 additions & 0 deletions query/graphql/planner/top.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Copyright 2022 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package planner

import (
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/query/graphql/mapper"
parserTypes "github.com/sourcenetwork/defradb/query/graphql/parser/types"
)

// topLevelNode is a special node that represents the very top of the
// plan graph. It has no source, and will only yield a single item
// containing all of its children.
type topLevelNode struct {
documentIterator
docMapper

children []planNode
childIndexes []int
isdone bool

// This node's children may use this node as a source
// this property controls the recursive flow preventing
// infinate loops.
isInRecurse bool
}

func (n *topLevelNode) Spans(spans core.Spans) {
if n.isInRecurse {
return
}
n.isInRecurse = true
defer func() {
n.isInRecurse = false
}()

for _, child := range n.children {
child.Spans(spans)
}
}

func (n *topLevelNode) Kind() string {
return "topLevelNode"
}

func (n *topLevelNode) Init() error {
if n.isInRecurse {
return nil
}
n.isInRecurse = true
defer func() {
n.isInRecurse = false
}()

for _, child := range n.children {
err := child.Init()
if err != nil {
return err
}
}

return nil
}

func (n *topLevelNode) Start() error {
if n.isInRecurse {
return nil
}
n.isInRecurse = true
defer func() {
n.isInRecurse = false
}()

for _, child := range n.children {
err := child.Start()
if err != nil {
return err
}
}

return nil
}

func (n *topLevelNode) Close() error {
if n.isInRecurse {
return nil
}
n.isInRecurse = true
defer func() {
n.isInRecurse = false
}()

for _, child := range n.children {
err := child.Close()
if err != nil {
return err
}
}

return nil
}

func (n *topLevelNode) Source() planNode {
return nil
}

func (n *topLevelNode) Next() (bool, error) {
if n.isdone {
return false, nil
}

if n.isInRecurse {
return true, nil
}

n.currentValue = n.documentMapping.NewDoc()
n.isInRecurse = true
defer func() {
n.isInRecurse = false
}()

for i, child := range n.children {
switch child.(type) {
case *selectTopNode:
docs := []core.Doc{}
for {
hasChild, err := child.Next()
if err != nil {
return false, err
}
if !hasChild {
break
}
docs = append(docs, child.Value())
}
n.currentValue.Fields[n.childIndexes[i]] = docs
default:
_, err := child.Next()
if err != nil {
return false, err
}

n.currentValue = child.Value()
}
}

n.isdone = true
return true, nil
}

// Top creates a new topLevelNode using the given Select.
func (p *Planner) Top(m *mapper.Select) (*topLevelNode, error) {
node := topLevelNode{
docMapper: docMapper{&m.DocumentMapping},
}

aggregateChildren := []planNode{}
aggregateChildIndexes := []int{}
for _, field := range m.Fields {
switch f := field.(type) {
case *mapper.Aggregate:
var child planNode
var err error
switch field.GetName() {
case parserTypes.CountFieldName:
child, err = p.Count(f, m)
case parserTypes.SumFieldName:
child, err = p.Sum(f, m)
case parserTypes.AverageFieldName:
child, err = p.Average(f)
}
if err != nil {
return nil, err
}
aggregateChildren = append(aggregateChildren, child)
aggregateChildIndexes = append(aggregateChildIndexes, field.GetIndex())
case *mapper.Select:
child, err := p.Select(f)
if err != nil {
return nil, err
}
node.children = append(node.children, child)
node.childIndexes = append(node.childIndexes, field.GetIndex())
}
}

// Iterate through the aggregates backwards to ensure dependencies
// execute *before* any aggregate dependent on them.
for i := len(aggregateChildren) - 1; i >= 0; i-- {
node.children = append(node.children, aggregateChildren[i])
node.childIndexes = append(node.childIndexes, aggregateChildIndexes[i])
}

return &node, nil
}
Loading

0 comments on commit 85bac3a

Please sign in to comment.