Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorder series key tagset #12391

Merged
merged 10 commits into from
Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,37 @@
## v2.0.0-alpha.5 [unreleased]

### Release Notes

This release includes a [breaking change](https://github.com/influxdata/influxdb/pull/12391) to the format that TSM and index data are stored on disk.
Any existing local data will not be queryable once InfluxDB is upgraded to this release.
Prior to installing this release, we recommend removing all storage-engine data from your local InfluxDB `2.x` installation; this can be done without losing any of your other InfluxDB `2.x` data (settings, etc.).
To remove only local storage data, run the following in a terminal.

On most `linux` systems:

```sh

# Replace <username> with your actual username.

$ rm -r /home/<username>/.influxdbv2/engine
```

On `macOS`:

```sh
# Replace <username> with your actual username.

$ rm -r /Users/<username>/.influxdbv2/engine
```

Once completed, `v2.0.0-alpha.5` can be started.

### Features

1. [12096](https://github.com/influxdata/influxdb/pull/12096): Add labels to cloned tasks
1. [12111](https://github.com/influxdata/influxdb/pull/12111): Add ability to filter resources by clicking a label
1. [12401](https://github.com/influxdata/influxdb/pull/12401): Add ability to add a member to org
1. [12391](https://github.com/influxdata/influxdb/pull/12391): Improve representation of TSM tagsets on disk

### Bug Fixes

Expand Down
8 changes: 4 additions & 4 deletions cmd/influxd/launcher/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func TestLauncher_WriteAndQuery(t *testing.T) {

// Query server to ensure write persists.
qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)`
exp := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v` + "\r\n\r\n"
exp := `,result,table,_start,_stop,_time,_value,_measurement,k,_field` + "\r\n" +
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,m,v,f` + "\r\n\r\n"

buf, err := http.SimpleQuery(l.URL(), qs, l.Org.Name, l.Auth.Token)
if err != nil {
Expand Down Expand Up @@ -112,8 +112,8 @@ func TestLauncher_BucketDelete(t *testing.T) {

// Query server to ensure write persists.
qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)`
exp := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v` + "\r\n\r\n"
exp := `,result,table,_start,_stop,_time,_value,_measurement,k,_field` + "\r\n" +
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,m,v,f` + "\r\n\r\n"

buf, err := http.SimpleQuery(l.URL(), qs, l.Org.Name, l.Auth.Token)
if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions cmd/influxd/launcher/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,19 @@ mem,server=b value=45.2`))
}

rawQ := fmt.Sprintf(`from(bucket:"%s")
|> filter(fn: (r) => r._m == "cpu" and (r._f == "v1" or r._f == "v0"))
|> filter(fn: (r) => r._measurement == "cpu" and (r._field == "v1" or r._field == "v0"))
|> range(start:-1m)
`, be.Bucket.Name)

// Expected keys:
//
// _measurement=cpu,region=east,server=b,area=z,_field=v1
// _measurement=cpu,region=west,server=a,_field=v0
// _measurement=cpu,region=west,server=b,_field=v0
//
results := be.MustExecuteQuery(be.Org.ID, rawQ, be.Auth)
defer results.Done()
results.First(t).HasTablesWithCols([]int{4, 4, 5})
results.First(t).HasTablesWithCols([]int{5, 4, 4})
}

// This test initialises a default launcher writes some data,
Expand Down
8 changes: 4 additions & 4 deletions cmd/influxd/launcher/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ func TestStorage_WriteAndQuery(t *testing.T) {

qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)`

exp := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v1` + "\r\n\r\n"
exp := `,result,table,_start,_stop,_time,_value,_measurement,k,_field` + "\r\n" +
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,m,v1,f` + "\r\n\r\n"
if got := l.FluxQueryOrFail(t, org1.Org, org1.Auth.Token, qs); !cmp.Equal(got, exp) {
t.Errorf("unexpected query results -got/+exp\n%s", cmp.Diff(got, exp))
}

exp = `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,200,f,m,v2` + "\r\n\r\n"
exp = `,result,table,_start,_stop,_time,_value,_measurement,k,_field` + "\r\n" +
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,200,m,v2,f` + "\r\n\r\n"
if got := l.FluxQueryOrFail(t, org2.Org, org2.Auth.Token, qs); !cmp.Equal(got, exp) {
t.Errorf("unexpected query results -got/+exp\n%s", cmp.Diff(got, exp))
}
Expand Down
43 changes: 32 additions & 11 deletions models/points.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ import (
"github.com/influxdata/influxdb/pkg/escape"
)

// Values used to store the field key and measurement name as special internal
// tags.
//
// NOTE(review): 0x00 byte-sorts before every valid (printable) tag key and
// 0xff sorts after them, which presumably guarantees the measurement tag is
// always the first tag in a series key and the field-key tag is always the
// last — confirm against the ordering checks in the storage engine's
// WritePoints.
const (
	FieldKeyTagKey    = "\xff"
	MeasurementTagKey = "\x00"
)

// Predefined byte representations of special tag keys.
// Kept as package-level vars so hot paths can compare with bytes.Equal
// without converting the string constants on every call.
var (
	FieldKeyTagKeyBytes    = []byte(FieldKeyTagKey)
	MeasurementTagKeyBytes = []byte(MeasurementTagKey)
)

type escapeSet struct {
k [1]byte
esc [2]byte
Expand Down Expand Up @@ -2462,27 +2475,35 @@ func appendField(b []byte, k string, v interface{}) []byte {
return b
}

// ValidKeyToken returns true if the token used for measurement, tag key, or tag
// value is a valid unicode string and only contains printable, non-replacement characters.
func ValidKeyToken(s string) bool {
if !utf8.ValidString(s) {
// ValidToken returns true if the provided token is a valid unicode string, and
// only contains printable, non-replacement characters.
// ValidToken reports whether a is a valid UTF-8 byte sequence consisting
// solely of printable, non-replacement characters.
func ValidToken(a []byte) bool {
	if !utf8.Valid(a) {
		return false
	}

	// Decode rune-by-rune; utf8.Valid above guarantees every decode succeeds.
	for i := 0; i < len(a); {
		r, size := utf8.DecodeRune(a[i:])
		if r == unicode.ReplacementChar || !unicode.IsPrint(r) {
			return false
		}
		i += size
	}
	return true
}

// ValidKeyTokens returns true if the measurement name and all tags are valid.
func ValidKeyTokens(name string, tags Tags) bool {
if !ValidKeyToken(name) {
return false
}
// ValidTagTokens returns true if all the provided tag key and values are
// valid.
//
// ValidTagTokens does not validate the special tag keys used to represent the
// measurement name and field key, but it does validate the associated values.
func ValidTagTokens(tags Tags) bool {
for _, tag := range tags {
if !ValidKeyToken(string(tag.Key)) || !ValidKeyToken(string(tag.Value)) {
// Validate all external tag keys.
if !bytes.Equal(tag.Key, MeasurementTagKeyBytes) && !bytes.Equal(tag.Key, FieldKeyTagKeyBytes) && !ValidToken(tag.Key) {
return false
}

// Validate all tag values (this will also validate the field key, which is a tag value for the special field key tag key).
if !ValidToken(tag.Value) {
return false
}
}
Expand Down
36 changes: 36 additions & 0 deletions models/points_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2397,6 +2397,42 @@ func TestParseName(t *testing.T) {
}
}

// TestValidTagTokens exercises models.ValidTagTokens against ordinary tags,
// the special measurement/field-key tag keys (whose keys are exempt from
// validation), and tag sets containing invalid keys or values.
func TestValidTagTokens(t *testing.T) {
	cases := []struct {
		tags     models.Tags
		expected bool
	}{
		{tags: models.NewTags(map[string]string{}), expected: true},
		{tags: models.NewTags(map[string]string{"foo": "bar"}), expected: true},
		{tags: models.NewTags(map[string]string{"foo": "bar", "_foo": "cpu", "hello": "こんにちは", "a smile": "😂"}), expected: true},

		// These cases have invalid keys, but since they're used for special tags (measurement and field key), they're not validated.
		{tags: models.NewTags(map[string]string{models.MeasurementTagKey: "bar"}), expected: true},
		{tags: models.NewTags(map[string]string{"\x00": "bar"}), expected: true},
		{tags: models.NewTags(map[string]string{string([]byte{0}): "bar"}), expected: true},
		{tags: models.NewTags(map[string]string{"\x00": "bar"}), expected: true},
		{tags: models.NewTags(map[string]string{"\u0000": "bar"}), expected: true},
		{tags: models.NewTags(map[string]string{models.FieldKeyTagKey: "bar"}), expected: true},
		{tags: models.NewTags(map[string]string{"\xff": "bar"}), expected: true},
		{tags: models.NewTags(map[string]string{string([]byte{255}): "bar"}), expected: true},

		// These cases all have invalid tag values
		{tags: models.NewTags(map[string]string{string([]byte{0}): "\x00"}), expected: false},
		{tags: models.NewTags(map[string]string{"\x00": "\x00"}), expected: false},
		{tags: models.NewTags(map[string]string{"\u0000": "\x00"}), expected: false},
		{tags: models.NewTags(map[string]string{"\xff": "\x00"}), expected: false},
		{tags: models.NewTags(map[string]string{string([]byte{255}): "\x00"}), expected: false},
		{tags: models.NewTags(map[string]string{string([]byte{100, 200}): "bar", "_foo": "cpu"}), expected: false},
		{tags: models.NewTags(map[string]string{"good key": string([]byte{255})}), expected: false},
	}

	for i, tc := range cases {
		got := models.ValidTagTokens(tc.tags)
		if got != tc.expected {
			t.Fatalf("[example %d] got %v, expected %v for tags %s", i+1, got, tc.expected, tc.tags)
		}
	}
}

func BenchmarkEscapeStringField_Plain(b *testing.B) {
s := "nothing special"
for i := 0; i < b.N; i++ {
Expand Down
3 changes: 0 additions & 3 deletions storage/compat/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type Config struct {
Dir string `toml:"dir"`
WALDir string `toml:"wal-dir"`
WALFsyncDelay toml.Duration `toml:"wal-fsync-delay"`
ValidateKeys bool `toml:"validate-keys"`
CacheMaxMemorySize toml.Size `toml:"cache-max-memory-size"`
CacheSnapshotMemorySize toml.Size `toml:"cache-snapshot-memory-size"`
CacheSnapshotWriteColdDuration toml.Duration `toml:"cache-snapshot-write-cold-duration"`
Expand All @@ -34,7 +33,6 @@ type Config struct {
func NewConfig() Config {
return Config{
WALFsyncDelay: toml.Duration(tsm1.DefaultWALFsyncDelay),
ValidateKeys: storage.DefaultValidateKeys,
CacheMaxMemorySize: toml.Size(tsm1.DefaultCacheMaxMemorySize),
CacheSnapshotMemorySize: toml.Size(tsm1.DefaultCacheSnapshotMemorySize),
CacheSnapshotWriteColdDuration: toml.Duration(tsm1.DefaultCacheSnapshotWriteColdDuration),
Expand All @@ -50,7 +48,6 @@ func NewConfig() Config {
// of the Dir key so that it can be passed through appropriately to the storage engine constructor.
func Convert(oldConfig Config) (string, storage.Config) {
newConfig := storage.NewConfig()
newConfig.ValidateKeys = oldConfig.ValidateKeys
newConfig.Engine.MADVWillNeed = oldConfig.TSMWillNeed
newConfig.Engine.Cache.MaxMemorySize = oldConfig.CacheMaxMemorySize
newConfig.Engine.Cache.SnapshotMemorySize = oldConfig.CacheSnapshotMemorySize
Expand Down
16 changes: 5 additions & 11 deletions storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (
"github.com/influxdata/influxdb/tsdb/tsm1"
)

// Default configuration values.
const (
DefaultRetentionInterval = 1 * time.Hour
DefaultValidateKeys = false

DefaultRetentionInterval = time.Hour
DefaultSeriesFileDirectoryName = "_series"
DefaultIndexDirectoryName = "index"
DefaultWALDirectoryName = "wal"
Expand All @@ -24,9 +23,6 @@ type Config struct {
// Frequency of retention in seconds.
RetentionInterval toml.Duration `toml:"retention-interval"`

// Enables unicode validation on series keys on write.
ValidateKeys bool `toml:"validate-keys"`

// Series file config.
SeriesFilePath string `toml:"series-file-path"` // Overrides the default path.

Expand All @@ -47,11 +43,9 @@ type Config struct {
func NewConfig() Config {
return Config{
RetentionInterval: toml.Duration(DefaultRetentionInterval),
ValidateKeys: DefaultValidateKeys,

WAL: tsm1.NewWALConfig(),
Engine: tsm1.NewConfig(),
Index: tsi1.NewConfig(),
WAL: tsm1.NewWALConfig(),
Engine: tsm1.NewConfig(),
Index: tsi1.NewConfig(),
}
}

Expand Down
67 changes: 44 additions & 23 deletions storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"sync"
"time"

"github.com/opentracing/opentracing-go"

platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
Expand All @@ -20,6 +18,7 @@ import (
"github.com/influxdata/influxdb/tsdb/tsm1"
"github.com/influxdata/influxdb/tsdb/value"
"github.com/influxdata/influxql"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -356,40 +355,62 @@ func (e *Engine) CreateCursorIterator(ctx context.Context) (tsdb.CursorIterator,
// WritePoints writes the provided points to the engine.
//
// The Engine expects all points to have been correctly validated by the caller.
// WritePoints will however determine if there are any field type conflicts, and
// return an appropriate error in that case.
// However, WritePoints will determine if any tag key-pairs are missing, or if
// there are any field type conflicts.
//
// Appropriate errors are returned in those cases.
func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error {
collection, j := tsdb.NewSeriesCollection(points), 0

// dropPoint should be called whenever there is reason to drop a point from
// the batch.
dropPoint := func(key []byte, reason string) {
if collection.Reason == "" {
collection.Reason = reason
}
collection.Dropped++
collection.DroppedKeys = append(collection.DroppedKeys, key)
}

for iter := collection.Iterator(); iter.Next(); {
tags := iter.Tags()

if tags.Len() > 0 && bytes.Equal(tags[0].Key, tsdb.FieldKeyTagKeyBytes) && bytes.Equal(tags[0].Value, timeBytes) {
// Field key "time" is invalid
if collection.Reason == "" {
collection.Reason = fmt.Sprintf("invalid field key: input field %q is invalid", timeBytes)
}
collection.Dropped++
collection.DroppedKeys = append(collection.DroppedKeys, iter.Key())
// Not enough tags present.
if tags.Len() < 2 {
dropPoint(iter.Key(), fmt.Sprintf("missing required tags: parsed tags: %q", tags))
continue
}

// First tag key is not measurement tag.
if !bytes.Equal(tags[0].Key, models.MeasurementTagKeyBytes) {
dropPoint(iter.Key(), fmt.Sprintf("missing required measurement tag as first tag, got: %q", tags[0].Key))
continue
}

fkey, fval := tags[len(tags)-1].Key, tags[len(tags)-1].Value

// Last tag key is not field tag.
if !bytes.Equal(fkey, models.FieldKeyTagKeyBytes) {
dropPoint(iter.Key(), fmt.Sprintf("missing required field key tag as last tag, got: %q", tags[0].Key))
continue
}

// The value representing the underlying field key is invalid if it's "time".
if bytes.Equal(fval, timeBytes) {
dropPoint(iter.Key(), fmt.Sprintf("invalid field key: input field %q is invalid", timeBytes))
continue
}

// Filter out any tags with key equal to "time": they are invalid.
if tags.Get(timeBytes) != nil {
if collection.Reason == "" {
collection.Reason = fmt.Sprintf("invalid tag key: input tag %q on measurement %q is invalid", timeBytes, iter.Name())
}
collection.Dropped++
collection.DroppedKeys = append(collection.DroppedKeys, iter.Key())
dropPoint(iter.Key(), fmt.Sprintf("invalid tag key: input tag %q on measurement %q is invalid", timeBytes, iter.Name()))
continue
}

// Drop any series with invalid unicode characters in the key.
if e.config.ValidateKeys && !models.ValidKeyTokens(string(iter.Name()), tags) {
if collection.Reason == "" {
collection.Reason = fmt.Sprintf("key contains invalid unicode: %q", iter.Key())
}
collection.Dropped++
collection.DroppedKeys = append(collection.DroppedKeys, iter.Key())
// Drop any point with invalid unicode characters in any of the tag keys or values.
// This will also cover validating the value used to represent the field key.
if !models.ValidTagTokens(tags) {
dropPoint(iter.Key(), fmt.Sprintf("key contains invalid unicode: %q", iter.Key()))
continue
}

Expand Down
Loading