Skip to content

Commit

Permalink
fix(query): fix the group last pushdown to use descending cursors
Browse files Browse the repository at this point in the history
  • Loading branch information
jsternberg authored and Faith Chikwekwe committed Apr 20, 2021
1 parent 24dc72a commit fb21461
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
10 changes: 9 additions & 1 deletion storage/reads/group_resultset.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ func GroupOptionNilSortLo() GroupOption {
}
}

// IsAscendingGroupAggregate checks if this request is using the `last` aggregate type.
// It returns true if an ascending cursor should be used (all other conditions)
// or a descending cursor (when `last` is used).
func IsAscendingGroupAggregate(req *datatypes.ReadGroupRequest) bool {
return req.Aggregate == nil || req.Aggregate.Type != datatypes.AggregateTypeLast
}

func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, newSeriesCursorFn func() (SeriesCursor, error), opts ...GroupOption) GroupResultSet {
g := &groupResultSet{
ctx: ctx,
Expand All @@ -54,7 +61,8 @@ func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, new
o(g)
}

g.arrayCursors = newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true)
ascending := IsAscendingGroupAggregate(req)
g.arrayCursors = newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, ascending)

for i, k := range req.GroupKeys {
g.keys[i] = []byte(k)
Expand Down
6 changes: 5 additions & 1 deletion v1/services/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,11 @@ func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest)
return nil, err
}

shardIDs, err := s.findShardIDs(database, rp, false, start, end)
// Due to some optimizations around how flux's `last()` function is implemented with the
// storage engine, we need to detect if the read request requires a descending
// cursor or not.
descending := !reads.IsAscendingGroupAggregate(req)
shardIDs, err := s.findShardIDs(database, rp, descending, start, end)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit fb21461

Please sign in to comment.