From 6d939ba017f5ba16418f2319f37331d48b8dea93 Mon Sep 17 00:00:00 2001 From: Christopher Wolff Date: Tue, 27 Oct 2020 08:25:05 -0700 Subject: [PATCH] fix: make tagKeys and tagValues work for edge cases involving _field Fixes #19806 Fixes #19794 --- cmd/influxd/upgrade/config_test.go | 7 +- kit/io/limited_read_closer_test.go | 10 +- query/stdlib/influxdata/influxdb/rules.go | 5 +- query/stdlib/testing/testing.go | 6 +- storage/reads/influxql_predicate.go | 8 +- storage/reads/resultset.go | 5 +- v1/services/storage/predicate_influxql.go | 24 ++ v1/services/storage/store.go | 330 ++++++++++++++-------- 8 files changed, 261 insertions(+), 134 deletions(-) diff --git a/cmd/influxd/upgrade/config_test.go b/cmd/influxd/upgrade/config_test.go index c34a98bb1d0..2512ed9fdb0 100644 --- a/cmd/influxd/upgrade/config_test.go +++ b/cmd/influxd/upgrade/config_test.go @@ -89,8 +89,7 @@ func TestConfigUpgrade(t *testing.T) { } } func TestConfigUpgradeFileNotExists(t *testing.T) { - targetOtions := optionsV2{ - } + targetOtions := optionsV2{} configFile := "/there/is/no/such/path/influxdb.conf" // try upgrade @@ -314,7 +313,7 @@ bind-address = "127.0.0.1:8088" max-version = "tls1.3" ` -var testConfigV1obsoleteArrays =` +var testConfigV1obsoleteArrays = ` reporting-disabled = true [meta] @@ -393,7 +392,7 @@ tls-cert = "/etc/ssl/influxdb.pem" tls-key = "" ` -var testConfigV2obsoleteArrays =`reporting-disabled = true +var testConfigV2obsoleteArrays = `reporting-disabled = true bolt-path = "/db/.influxdbv2/influxd.bolt" engine-path = "/db/.influxdbv2/engine" ` diff --git a/kit/io/limited_read_closer_test.go b/kit/io/limited_read_closer_test.go index e25839cd981..357da746467 100644 --- a/kit/io/limited_read_closer_test.go +++ b/kit/io/limited_read_closer_test.go @@ -2,10 +2,10 @@ package io import ( "bytes" + "errors" "io" "io/ioutil" "testing" - "errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -34,7 +34,7 @@ func TestLimitedReadCloser_Happy(t *testing.T) { func TestLimitedReadCloseWithErrorAndLimitExceeded(t *testing.T) { b := &closer{ Reader: bytes.NewBufferString("howdy"), - err: errors.New("some error"), + err: errors.New("some error"), } rc := NewLimitedReadCloser(b, 3) @@ -49,7 +49,7 @@ func TestLimitedReadCloseWithError(t *testing.T) { closeErr := errors.New("some error") b := &closer{ Reader: bytes.NewBufferString("howdy"), - err: closeErr, + err: closeErr, } rc := NewLimitedReadCloser(b, 10) @@ -63,7 +63,7 @@ func TestMultipleCloseOnlyClosesOnce(t *testing.T) { closeErr := errors.New("some error") b := &closer{ Reader: bytes.NewBufferString("howdy"), - err: closeErr, + err: closeErr, } rc := NewLimitedReadCloser(b, 10) @@ -77,7 +77,7 @@ func TestMultipleCloseOnlyClosesOnce(t *testing.T) { type closer struct { io.Reader - err error + err error closeCount int } diff --git a/query/stdlib/influxdata/influxdb/rules.go b/query/stdlib/influxdata/influxdb/rules.go index f16d436cf3b..140dfe38681 100644 --- a/query/stdlib/influxdata/influxdb/rules.go +++ b/query/stdlib/influxdata/influxdb/rules.go @@ -23,9 +23,8 @@ func init() { PushDownRangeRule{}, PushDownFilterRule{}, PushDownGroupRule{}, - // These rules can be re-enabled with https://github.com/influxdata/influxdb/issues/19561 is fixed - // PushDownReadTagKeysRule{}, - // PushDownReadTagValuesRule{}, + PushDownReadTagKeysRule{}, + PushDownReadTagValuesRule{}, SortedPivotRule{}, PushDownWindowAggregateRule{}, PushDownWindowAggregateByTimeRule{}, diff --git a/query/stdlib/testing/testing.go b/query/stdlib/testing/testing.go index 6b93357fba5..ef8d6b5dc08 100644 --- a/query/stdlib/testing/testing.go +++ b/query/stdlib/testing/testing.go @@ -118,11 +118,7 @@ var FluxEndToEndSkipList = map[string]map[string]string{ "http": { "http_endpoint": "need ability to test side effects in e2e tests: (https://github.com/influxdata/flux/issues/1723)", }, - "influxdata/influxdb/schema": { - "show_measurements": "flaky test (https://github.com/influxdata/influxdb/issues/15450)", - "show_tag_values": "flaky test (https://github.com/influxdata/influxdb/issues/15450)", - "show_tag_keys": "flaky test (https://github.com/influxdata/influxdb/issues/15450)", - }, + "influxdata/influxdb/schema": {}, "influxdata/influxdb/monitor": { "state_changes_big_any_to_any": "unbounded test", "state_changes_big_info_to_ok": "unbounded test", diff --git a/storage/reads/influxql_predicate.go b/storage/reads/influxql_predicate.go index f7f4e49bbfa..df822d3708a 100644 --- a/storage/reads/influxql_predicate.go +++ b/storage/reads/influxql_predicate.go @@ -267,8 +267,12 @@ func (v *hasRefs) Visit(node influxql.Node) influxql.Visitor { return v } -func HasFieldValueKey(expr influxql.Expr) bool { - refs := hasRefs{refs: []string{fieldRef}, found: make([]bool, 1)} +func ExprHasKey(expr influxql.Expr, key string) bool { + refs := hasRefs{refs: []string{key}, found: make([]bool, 1)} influxql.Walk(&refs, expr) return refs.found[0] } + +func HasFieldValueKey(expr influxql.Expr) bool { + return ExprHasKey(expr, fieldRef) +} diff --git a/storage/reads/resultset.go b/storage/reads/resultset.go index a8c14a82967..9f7d5755041 100644 --- a/storage/reads/resultset.go +++ b/storage/reads/resultset.go @@ -4,7 +4,6 @@ import ( "context" "github.com/influxdata/influxdb/v2/models" - "github.com/influxdata/influxdb/v2/storage/reads/datatypes" "github.com/influxdata/influxdb/v2/tsdb/cursors" ) @@ -19,11 +18,11 @@ type resultSet struct { arrayCursors multiShardCursors } -func NewFilteredResultSet(ctx context.Context, req *datatypes.ReadFilterRequest, seriesCursor SeriesCursor) ResultSet { +func NewFilteredResultSet(ctx context.Context, start, end int64, seriesCursor SeriesCursor) ResultSet { return &resultSet{ ctx: ctx, seriesCursor: seriesCursor, - arrayCursors: newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true), + arrayCursors: newMultiShardArrayCursors(ctx, start, end, true), } } diff --git a/v1/services/storage/predicate_influxql.go b/v1/services/storage/predicate_influxql.go index 98539179997..58b0cf6d723 100644 --- a/v1/services/storage/predicate_influxql.go +++ b/v1/services/storage/predicate_influxql.go @@ -112,3 +112,27 @@ func HasFieldKeyOrValue(expr influxql.Expr) (bool, bool) { influxql.Walk(&refs, expr) return refs.found[0], refs.found[1] } + +type hasAnyTagKeys struct { + found bool +} + +func (v *hasAnyTagKeys) Visit(node influxql.Node) influxql.Visitor { + if v.found { + return nil + } + + if n, ok := node.(*influxql.VarRef); ok { + if n.Val != fieldKey && n.Val != measurementKey && n.Val != "$" { + v.found = true + return nil + } + } + return v +} + +func hasTagKey(expr influxql.Expr) bool { + v := &hasAnyTagKeys{} + influxql.Walk(v, expr) + return v.found +} diff --git a/v1/services/storage/store.go b/v1/services/storage/store.go index dce3bd4fd49..ee8ed23c13f 100644 --- a/v1/services/storage/store.go +++ b/v1/services/storage/store.go @@ -3,6 +3,7 @@ package storage import ( "context" "errors" + "fmt" "sort" "time" @@ -185,7 +186,7 @@ func (s *Store) ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest req.Range.Start = start req.Range.End = end - return reads.NewFilteredResultSet(ctx, req, cur), nil + return reads.NewFilteredResultSet(ctx, req.Range.Start, req.Range.End, cur), nil } func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) (reads.GroupResultSet, error) { @@ -232,26 +233,66 @@ func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) return rs, nil } +type metaqueryAttributes struct { + orgID influxdb.ID + db, rp string + start, end int64 + pred influxql.Expr +} + +func (s *Store) tagKeysWithFieldPredicate(ctx context.Context, mqAttrs *metaqueryAttributes, shardIDs []uint64) (cursors.StringIterator, error) { + var cur reads.SeriesCursor + if ic, err := newIndexSeriesCursorInfluxQLPred(ctx, mqAttrs.pred, s.TSDBStore.Shards(shardIDs)); err != nil { + return nil, err + } else if ic == nil { + return cursors.EmptyStringIterator, nil + } else { + cur = ic + } + m := make(map[string]struct{}) + rs := reads.NewFilteredResultSet(ctx, mqAttrs.start, mqAttrs.end, cur) + for rs.Next() { + func() { + c := rs.Cursor() + if c == nil { + // no data for series key + field combination + return + } + defer c.Close() + if cursorHasData(c) { + for _, tag := range rs.Tags() { + m[string(tag.Key)] = struct{}{} + } + } + }() + } + + arr := make([]string, 0, len(m)) + for tag, _ := range m { + arr = append(arr, tag) + } + sort.Strings(arr) + return cursors.NewStringSliceIterator(arr), nil +} + func (s *Store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error) { if req.TagsSource == nil { return nil, errors.New("missing read source") } - source, err := getReadSource(*req.TagsSource) if err != nil { return nil, err } - - database, rp, start, end, err := s.validateArgs(source.OrganizationID, source.BucketID, req.Range.Start, req.Range.End) + db, rp, start, end, err := s.validateArgs(source.OrganizationID, source.BucketID, req.Range.Start, req.Range.End) if err != nil { return nil, err } - shardIDs, err := s.findShardIDs(database, rp, false, start, end) + shardIDs, err := s.findShardIDs(db, rp, false, start, end) if err != nil { return nil, err } - if len(shardIDs) == 0 { // TODO(jeff): this was a typed nil + if len(shardIDs) == 0 { return cursors.EmptyStringIterator, nil } @@ -266,9 +307,18 @@ func (s *Store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cur if found := reads.HasFieldValueKey(expr); found { return nil, errors.New("field values unsupported") } - // this will remove any _field references, which are not indexed - // see https://github.com/influxdata/influxdb/issues/19488 - expr = influxql.Reduce(RewriteExprRemoveFieldKeyAndValue(influxql.CloneExpr(expr)), nil) + if found := reads.ExprHasKey(expr, fieldKey); found { + mqAttrs := &metaqueryAttributes{ + orgID: source.GetOrgID(), + db: db, + rp: rp, + start: start, + end: end, + pred: expr, + } + return s.tagKeysWithFieldPredicate(ctx, mqAttrs, shardIDs) + } + expr = influxql.Reduce(influxql.CloneExpr(expr), nil) if reads.IsTrueBooleanLiteral(expr) { expr = nil } @@ -300,19 +350,6 @@ func (s *Store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cur } func (s *Store) TagValues(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error) { - if tagKey, ok := measurementRemap[req.TagKey]; ok { - switch tagKey { - case "_name": - return s.MeasurementNames(ctx, &MeasurementNamesRequest{ - MeasurementsSource: req.TagsSource, - Predicate: req.Predicate, - }) - - case "_field": - return s.measurementFields(ctx, req) - } - } - if req.TagsSource == nil { return nil, errors.New("missing read source") } @@ -322,62 +359,97 @@ func (s *Store) TagValues(ctx context.Context, req *datatypes.TagValuesRequest) return nil, err } - database, rp, start, end, err := s.validateArgs(source.OrganizationID, source.BucketID, req.Range.Start, req.Range.End) + db, rp, start, end, err := s.validateArgs(source.OrganizationID, source.BucketID, req.Range.Start, req.Range.End) if err != nil { return nil, err } - shardIDs, err := s.findShardIDs(database, rp, false, start, end) - if err != nil { - return nil, err - } - if len(shardIDs) == 0 { // TODO(jeff): this was a typed nil - return cursors.EmptyStringIterator, nil - } - - var expr influxql.Expr + var influxqlPred influxql.Expr if root := req.Predicate.GetRoot(); root != nil { var err error - expr, err = reads.NodeToExpr(root, measurementRemap) + influxqlPred, err = reads.NodeToExpr(root, measurementRemap) if err != nil { return nil, err } - if found := reads.HasFieldValueKey(expr); found { + if found := reads.HasFieldValueKey(influxqlPred); found { return nil, errors.New("field values unsupported") } - // this will remove any _field references, which are not indexed - // see https://github.com/influxdata/influxdb/issues/19488 - expr = influxql.Reduce(RewriteExprRemoveFieldKeyAndValue(influxql.CloneExpr(expr)), nil) - if reads.IsTrueBooleanLiteral(expr) { - expr = nil + + influxqlPred = influxql.Reduce(influxql.CloneExpr(influxqlPred), nil) + if reads.IsTrueBooleanLiteral(influxqlPred) { + influxqlPred = nil } } + mqAttrs := &metaqueryAttributes{ + orgID: source.GetOrgID(), + db: db, + rp: rp, + start: start, + end: end, + pred: influxqlPred, + } + + tagKey, ok := measurementRemap[req.TagKey] + if !ok { + tagKey = req.TagKey + } + + // Getting values of _measurement or _field are handled specially + switch tagKey { + case "_name": + return s.MeasurementNames(ctx, mqAttrs) + + case "_field": + return s.measurementFields(ctx, mqAttrs) + } + + return s.tagValues(ctx, mqAttrs, tagKey) +} + +func (s *Store) tagValues(ctx context.Context, mqAttrs *metaqueryAttributes, tagKey string) (cursors.StringIterator, error) { + // If there are any references to _field, we need to use the slow path + // since we cannot rely on the index alone. + if mqAttrs.pred != nil { + if hasFieldKey := reads.ExprHasKey(mqAttrs.pred, fieldKey); hasFieldKey { + return s.tagValuesSlow(ctx, mqAttrs, tagKey) + } + } + + shardIDs, err := s.findShardIDs(mqAttrs.db, mqAttrs.rp, false, mqAttrs.start, mqAttrs.end) + if err != nil { + return nil, err + } + if len(shardIDs) == 0 { + return cursors.EmptyStringIterator, nil + } + tagKeyExpr := &influxql.BinaryExpr{ Op: influxql.EQ, LHS: &influxql.VarRef{ Val: "_tagKey", }, RHS: &influxql.StringLiteral{ - Val: req.TagKey, + Val: tagKey, }, } - if expr != nil { - expr = &influxql.BinaryExpr{ + + if mqAttrs.pred != nil { + mqAttrs.pred = &influxql.BinaryExpr{ Op: influxql.AND, LHS: tagKeyExpr, RHS: &influxql.ParenExpr{ - Expr: expr, + Expr: mqAttrs.pred, }, } } else { - expr = tagKeyExpr + mqAttrs.pred = tagKeyExpr } // TODO(jsternberg): Use a real authorizer. auth := query.OpenAuthorizer - values, err := s.TSDBStore.TagValues(auth, shardIDs, expr) + values, err := s.TSDBStore.TagValues(auth, shardIDs, mqAttrs.pred) if err != nil { return nil, err } @@ -397,48 +469,19 @@ func (s *Store) TagValues(ctx context.Context, req *datatypes.TagValuesRequest) return cursors.NewStringSliceIterator(names), nil } -type MeasurementNamesRequest struct { - MeasurementsSource *types.Any - Predicate *datatypes.Predicate -} - -func (s *Store) MeasurementNames(ctx context.Context, req *MeasurementNamesRequest) (cursors.StringIterator, error) { - if req.MeasurementsSource == nil { - return nil, errors.New("missing read source") - } - - source, err := getReadSource(*req.MeasurementsSource) - if err != nil { - return nil, err - } - - database, _, _, _, err := s.validateArgs(source.OrganizationID, source.BucketID, -1, -1) - if err != nil { - return nil, err - } - - var expr influxql.Expr - if root := req.Predicate.GetRoot(); root != nil { - var err error - expr, err = reads.NodeToExpr(root, nil) - if err != nil { - return nil, err - } - - if found := reads.HasFieldValueKey(expr); found { - return nil, errors.New("field values unsupported") - } - // this will remove any _field references, which are not indexed - // see https://github.com/influxdata/influxdb/issues/19488 - expr = influxql.Reduce(RewriteExprRemoveFieldKeyAndValue(influxql.CloneExpr(expr)), nil) - if reads.IsTrueBooleanLiteral(expr) { - expr = nil +func (s *Store) MeasurementNames(ctx context.Context, mqAttrs *metaqueryAttributes) (cursors.StringIterator, error) { + if mqAttrs.pred != nil { + if hasFieldKey := reads.ExprHasKey(mqAttrs.pred, fieldKey); hasFieldKey { + // If there is a predicate on _field, we cannot use the index + // to filter out unwanted measurement names. Use a slower + // block scan instead. + return s.tagValuesSlow(ctx, mqAttrs, measurementKey) } } // TODO(jsternberg): Use a real authorizer. auth := query.OpenAuthorizer - values, err := s.TSDBStore.MeasurementNames(auth, database, expr) + values, err := s.TSDBStore.MeasurementNames(auth, mqAttrs.db, mqAttrs.pred) if err != nil { return nil, err } @@ -463,18 +506,20 @@ func (s *Store) GetSource(orgID, bucketID uint64) proto.Message { } } -func (s *Store) measurementFields(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error) { - source, err := getReadSource(*req.TagsSource) - if err != nil { - return nil, err - } +func (s *Store) measurementFields(ctx context.Context, mqAttrs *metaqueryAttributes) (cursors.StringIterator, error) { + if mqAttrs.pred != nil { + if hasFieldKey := reads.ExprHasKey(mqAttrs.pred, fieldKey); hasFieldKey { + return s.tagValuesSlow(ctx, mqAttrs, fieldKey) + } - database, rp, start, end, err := s.validateArgs(source.OrganizationID, source.BucketID, req.Range.Start, req.Range.End) - if err != nil { - return nil, err + // If there predicates on anything besides _measurement, we can't + // use the index and need to use the slow path. + if hasTagKey(mqAttrs.pred) { + return s.tagValuesSlow(ctx, mqAttrs, fieldKey) + } } - shardIDs, err := s.findShardIDs(database, rp, false, start, end) + shardIDs, err := s.findShardIDs(mqAttrs.db, mqAttrs.rp, false, mqAttrs.start, mqAttrs.end) if err != nil { return nil, err } @@ -482,32 +527,15 @@ func (s *Store) measurementFields(ctx context.Context, req *datatypes.TagValuesR return cursors.EmptyStringIterator, nil } - var expr influxql.Expr - if root := req.Predicate.GetRoot(); root != nil { - var err error - expr, err = reads.NodeToExpr(root, measurementRemap) - if err != nil { - return nil, err - } - - if found := reads.HasFieldValueKey(expr); found { - return nil, errors.New("field values unsupported") - } - expr = influxql.Reduce(influxql.CloneExpr(expr), nil) - if reads.IsTrueBooleanLiteral(expr) { - expr = nil - } - } - sg := s.TSDBStore.ShardGroup(shardIDs) ms := &influxql.Measurement{ - Database: database, - RetentionPolicy: rp, + Database: mqAttrs.db, + RetentionPolicy: mqAttrs.rp, SystemIterator: "_fieldKeys", } opts := query.IteratorOptions{ - OrgID: influxdb.ID(source.OrganizationID), - Condition: expr, + OrgID: mqAttrs.orgID, + Condition: mqAttrs.pred, Authorizer: query.OpenAuthorizer, } iter, err := sg.CreateIterator(ctx, ms, opts) @@ -529,3 +557,81 @@ func (s *Store) measurementFields(ctx context.Context, req *datatypes.TagValuesR return cursors.NewStringSliceIterator(fieldNames), nil } + +func cursorHasData(c cursors.Cursor) bool { + var l int + switch typedCur := c.(type) { + case cursors.IntegerArrayCursor: + ia := typedCur.Next() + l = ia.Len() + case cursors.FloatArrayCursor: + ia := typedCur.Next() + l = ia.Len() + case cursors.UnsignedArrayCursor: + ia := typedCur.Next() + l = ia.Len() + case cursors.BooleanArrayCursor: + ia := typedCur.Next() + l = ia.Len() + case cursors.StringArrayCursor: + ia := typedCur.Next() + l = ia.Len() + default: + panic(fmt.Sprintf("unreachable: %T", typedCur)) + } + return l != 0 +} + +// tagValuesSlow will determine the tag values for the given tagKey. +// It's generally faster to use tagValues, measurementFields or +// MeasurementNames, but those methods will only use the index and metadata +// stored in the shard. Because fields are not themselves indexed, we have no way +// of correlating fields to tag values, so we sometimes need to consult tsm to +// provide an accurate answer. +func (s *Store) tagValuesSlow(ctx context.Context, mqAttrs *metaqueryAttributes, tagKey string) (cursors.StringIterator, error) { + shardIDs, err := s.findShardIDs(mqAttrs.db, mqAttrs.rp, false, mqAttrs.start, mqAttrs.end) + if err != nil { + return nil, err + } + if len(shardIDs) == 0 { + return cursors.EmptyStringIterator, nil + } + + var cur reads.SeriesCursor + if ic, err := newIndexSeriesCursorInfluxQLPred(ctx, mqAttrs.pred, s.TSDBStore.Shards(shardIDs)); err != nil { + return nil, err + } else if ic == nil { + return nil, nil + } else { + cur = ic + } + m := make(map[string]struct{}) + + rs := reads.NewFilteredResultSet(ctx, mqAttrs.start, mqAttrs.end, cur) + for rs.Next() { + func() { + c := rs.Cursor() + if c == nil { + // no data for series key + field combination? + // It seems that even when there is no data for this series key + field + // combo that the cursor may be not nil. We need to + // request invoke an array cursor to be sure. + // This is the reason for the call to cursorHasData below. + return + } + defer c.Close() + + if cursorHasData(c) { + f := rs.Tags().Get([]byte(tagKey)) + m[string(f)] = struct{}{} + } + }() + } + + names := make([]string, 0, len(m)) + for name := range m { + names = append(names, name) + } + sort.Strings(names) + return cursors.NewStringSliceIterator(names), nil +}