Skip to content

Commit

Permalink
Merge pull request #13426 from influxdata/sgc/feat/13369
Browse files Browse the repository at this point in the history
Implementation of storage schema TagValues API
  • Loading branch information
stuartcarnie authored Apr 18, 2019
2 parents c98c29c + d5341a1 commit ad761ea
Show file tree
Hide file tree
Showing 30 changed files with 2,295 additions and 334 deletions.
43 changes: 14 additions & 29 deletions storage/engine_schema.go
Original file line number Diff line number Diff line change
@@ -1,50 +1,35 @@
package storage

import (
"context"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/influxdata/influxql"
)

// StringIterator describes the behavior for enumerating a sequence of
// string values.
type StringIterator interface {
// Next advances the StringIterator to the next value. It returns false
// when there are no more values.
Next() bool

// Value returns the current value.
Value() string
}

// TagKeys returns an iterator where the values are tag keys for the bucket
// matching the predicate within the time range (start, end].
func (e *Engine) TagKeys(orgID, bucketID influxdb.ID, start, end int64, predicate influxql.Expr) StringIterator {
func (e *Engine) TagKeys(ctx context.Context, orgID, bucketID influxdb.ID, start, end int64, predicate influxql.Expr) cursors.StringIterator {
// This method would be invoked when the consumer wants to get the following schema information for an arbitrary
// time range in a single bucket:
//
// * All tag keys;
// * All tag keys filtered by an arbitrary predicate, e.g., tag keys for a measurement or tag keys appearing alongside another tag key pair.
//
return nullStringIterator

return cursors.NewStringSliceIterator([]string{"\x00", "arch", "cluster_id", "datacenter", "hostname", "os", "rack", "region", "service", "service_environment", "service_version", "\xff"})
}

// TagValues returns an iterator which enumerates the values for the specific
// tagKey in the given bucket matching the predicate within the
// time range (start, end].
func (e *Engine) TagValues(orgID, bucketID influxdb.ID, tagKey string, start, end int64, predicate influxql.Expr) StringIterator {
// This method would be invoked when the consumer wants to get the following schema information for an arbitrary
// time range in a single bucket:
//
// * All measurement names, i.e. tagKey == _measurement);
// * All field names for a specific measurement using a predicate
// * i.e. tagKey is "_field", predicate _measurement == "<measurement>"
//
return nullStringIterator
func (e *Engine) TagValues(ctx context.Context, orgID, bucketID influxdb.ID, tagKey string, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error) {
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return cursors.EmptyStringIterator, nil
}

return e.engine.TagValues(ctx, orgID, bucketID, tagKey, start, end, predicate)
}

var nullStringIterator StringIterator = &emptyStringIterator{}

type emptyStringIterator struct{}

func (*emptyStringIterator) Next() bool { return false }
func (*emptyStringIterator) Value() string { return "" }
12 changes: 9 additions & 3 deletions storage/reads/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"container/heap"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/tsdb/cursors"
)

Expand Down Expand Up @@ -123,12 +122,15 @@ func (h *resultSetHeap) Pop() interface{} {
}

type MergedStringIterator struct {
iterators []storage.StringIterator
iterators []cursors.StringIterator
uniqueValues map[string]struct{}
nextValue string
}

func NewMergedStringIterator(iterators []storage.StringIterator) *MergedStringIterator {
// API compatibility
var _ cursors.StringIterator = (*MergedStringIterator)(nil)

func NewMergedStringIterator(iterators []cursors.StringIterator) *MergedStringIterator {
return &MergedStringIterator{
iterators: iterators,
uniqueValues: make(map[string]struct{}),
Expand Down Expand Up @@ -161,3 +163,7 @@ func (mr *MergedStringIterator) Next() bool {
func (mr *MergedStringIterator) Value() string {
return mr.nextValue
}

func (mr *MergedStringIterator) Stats() cursors.CursorStats {
return cursors.CursorStats{}
}
8 changes: 4 additions & 4 deletions storage/reads/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)

func newStreamSeries(v ...string) *sliceStreamReader {
Expand Down Expand Up @@ -109,19 +109,19 @@ series: _m=m0,tag0=val03
func TestNewMergedStringIterator(t *testing.T) {
tests := []struct {
name string
iterators []storage.StringIterator
iterators []cursors.StringIterator
expectedValues []string
}{
{
name: "simple",
iterators: []storage.StringIterator{
iterators: []cursors.StringIterator{
newMockStringIterator("foo", "bar"),
},
expectedValues: []string{"foo", "bar"},
},
{
name: "duplicates",
iterators: []storage.StringIterator{
iterators: []cursors.StringIterator{
newMockStringIterator("foo"),
newMockStringIterator("bar", "bar"),
newMockStringIterator("foo"),
Expand Down
5 changes: 2 additions & 3 deletions storage/reads/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)
Expand Down Expand Up @@ -79,6 +78,6 @@ type Store interface {
ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (ResultSet, error)
GroupRead(ctx context.Context, req *datatypes.ReadRequest) (GroupResultSet, error)
GetSource(orgID, bucketID uint64) proto.Message
TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (storage.StringIterator, error)
TagValues(ctx context.Context, req *datatypes.TagValuesRequest) (storage.StringIterator, error)
TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error)
TagValues(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error)
}
8 changes: 8 additions & 0 deletions storage/reads/string_iterator_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reads

import (
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)

type StringValuesStreamReader interface {
Expand All @@ -16,6 +17,9 @@ type StringIteratorStreamReader struct {
err error
}

// API compatibility
var _ cursors.StringIterator = (*StringIteratorStreamReader)(nil)

func NewStringIteratorStreamReader(stream StringValuesStreamReader) *StringIteratorStreamReader {
return &StringIteratorStreamReader{
stream: stream,
Expand Down Expand Up @@ -53,3 +57,7 @@ func (r *StringIteratorStreamReader) Value() string {
// Better than panic.
return ""
}

func (r *StringIteratorStreamReader) Stats() cursors.CursorStats {
return cursors.CursorStats{}
}
5 changes: 5 additions & 0 deletions storage/reads/string_iterator_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)

type mockStringIterator struct {
Expand Down Expand Up @@ -37,6 +38,10 @@ func (si *mockStringIterator) Value() string {
return ""
}

func (si *mockStringIterator) Stats() cursors.CursorStats {
return cursors.CursorStats{}
}

type mockStringValuesStreamReader struct {
responses []*datatypes.StringValuesResponse
}
Expand Down
4 changes: 2 additions & 2 deletions storage/reads/string_iterator_writer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package reads

import (
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)

type StringIteratorStream interface {
Expand Down Expand Up @@ -37,7 +37,7 @@ func (w *StringIteratorWriter) WrittenN() int {
return w.vc
}

func (w *StringIteratorWriter) WriteStringIterator(si storage.StringIterator) error {
func (w *StringIteratorWriter) WriteStringIterator(si cursors.StringIterator) error {
for si.Next() {
v := si.Value()
if v == "" {
Expand Down
11 changes: 5 additions & 6 deletions storage/readservice/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/influxdata/influxql"
)

Expand Down Expand Up @@ -133,7 +134,7 @@ func (s *store) GroupRead(ctx context.Context, req *datatypes.ReadRequest) (read
return reads.NewGroupResultSet(ctx, req, newCursor), nil
}

func (s *store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (storage.StringIterator, error) {
func (s *store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand Down Expand Up @@ -169,12 +170,12 @@ func (s *store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (sto
if err != nil {
return nil, err
}
si := s.engine.TagKeys(influxdb.ID(readSource.OrganizationID), influxdb.ID(readSource.BucketID), req.Range.Start, req.Range.End, expr)
si := s.engine.TagKeys(ctx, influxdb.ID(readSource.OrganizationID), influxdb.ID(readSource.BucketID), req.Range.Start, req.Range.End, expr)

return si, nil
}

func (s *store) TagValues(ctx context.Context, req *datatypes.TagValuesRequest) (storage.StringIterator, error) {
func (s *store) TagValues(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand Down Expand Up @@ -214,9 +215,7 @@ func (s *store) TagValues(ctx context.Context, req *datatypes.TagValuesRequest)
if err != nil {
return nil, err
}
si := s.engine.TagValues(influxdb.ID(readSource.OrganizationID), influxdb.ID(readSource.BucketID), req.TagKey, req.Range.Start, req.Range.End, expr)

return si, nil
return s.engine.TagValues(ctx, influxdb.ID(readSource.OrganizationID), influxdb.ID(readSource.BucketID), req.TagKey, req.Range.Start, req.Range.End, expr)
}

// this is easier than fooling around with .proto files.
Expand Down
11 changes: 6 additions & 5 deletions tsdb/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import "github.com/influxdata/influxdb/tsdb/cursors"
// talk about consuming data.

type (
IntegerArray = cursors.IntegerArray
FloatArray = cursors.FloatArray
UnsignedArray = cursors.UnsignedArray
StringArray = cursors.StringArray
BooleanArray = cursors.BooleanArray
IntegerArray = cursors.IntegerArray
FloatArray = cursors.FloatArray
UnsignedArray = cursors.UnsignedArray
StringArray = cursors.StringArray
BooleanArray = cursors.BooleanArray
TimestampArray = cursors.TimestampArray

IntegerArrayCursor = cursors.IntegerArrayCursor
FloatArrayCursor = cursors.FloatArrayCursor
Expand Down
Loading

0 comments on commit ad761ea

Please sign in to comment.