Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add config parameters to toggle WAL concurrency and timeouts #21621

Merged
merged 8 commits into from
Jun 9, 2021
11 changes: 11 additions & 0 deletions cmd/influxd/launcher/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,17 @@ func (o *InfluxdOpts) BindCliOpts() []cli.Opt {
Flag: "storage-wal-fsync-delay",
Desc: "The amount of time that a write will wait before fsyncing. A duration greater than 0 can be used to batch up multiple fsync calls. This is useful for slower disks or when WAL write contention is seen.",
},
{
DestP: &o.StorageConfig.Data.WALMaxConcurrentWrites,
Flag: "storage-wal-max-concurrent-writes",
Desc: "The max number of writes that will attempt to write to the WAL at a time. (default <nprocs> * 2)",
},
{
DestP: &o.StorageConfig.Data.WALMaxWriteDelay,
Flag: "storage-wal-max-write-delay",
Default: o.StorageConfig.Data.WALMaxWriteDelay,
Desc: "The max amount of time a write will wait when the WAL already has `storage-wal-max-concurrent-writes` active writes. Set to 0 to disable the timeout.",
},
{
DestP: &o.StorageConfig.Data.ValidateKeys,
Flag: "storage-validate-keys",
Expand Down
12 changes: 6 additions & 6 deletions internal/tsdb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ type TSDBStoreMock struct {
CreateShardSnapshotFn func(id uint64) (string, error)
DatabasesFn func() []string
DeleteDatabaseFn func(name string) error
DeleteMeasurementFn func(database, name string) error
DeleteMeasurementFn func(ctx context.Context, database, name string) error
DeleteRetentionPolicyFn func(database, name string) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteSeriesFn func(ctx context.Context, database string, sources []influxql.Source, condition influxql.Expr) error
DeleteShardFn func(id uint64) error
DiskSizeFn func() (int64, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
Expand Down Expand Up @@ -72,14 +72,14 @@ func (s *TSDBStoreMock) Databases() []string {
// DeleteDatabase hands name to the DeleteDatabaseFn hook configured on the
// mock and reports whatever error that hook returns.
func (s *TSDBStoreMock) DeleteDatabase(name string) error {
	hook := s.DeleteDatabaseFn
	return hook(name)
}
func (s *TSDBStoreMock) DeleteMeasurement(database string, name string) error {
return s.DeleteMeasurementFn(database, name)
// DeleteMeasurement passes ctx, database and name through to the
// DeleteMeasurementFn hook configured on the mock and reports whatever error
// that hook returns.
func (s *TSDBStoreMock) DeleteMeasurement(ctx context.Context, database string, name string) error {
	hook := s.DeleteMeasurementFn
	return hook(ctx, database, name)
}
// DeleteRetentionPolicy passes database and name through to the
// DeleteRetentionPolicyFn hook configured on the mock and reports whatever
// error that hook returns.
func (s *TSDBStoreMock) DeleteRetentionPolicy(database string, name string) error {
	hook := s.DeleteRetentionPolicyFn
	return hook(database, name)
}
func (s *TSDBStoreMock) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
return s.DeleteSeriesFn(database, sources, condition)
// DeleteSeries passes ctx, database, sources and condition through to the
// DeleteSeriesFn hook configured on the mock and reports whatever error that
// hook returns.
func (s *TSDBStoreMock) DeleteSeries(ctx context.Context, database string, sources []influxql.Source, condition influxql.Expr) error {
	hook := s.DeleteSeriesFn
	return hook(ctx, database, sources, condition)
}
func (s *TSDBStoreMock) DeleteShard(shardID uint64) error {
return s.DeleteShardFn(shardID)
Expand Down
14 changes: 11 additions & 3 deletions pkg/limiter/fixed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Package limiter provides concurrency limiters.
package limiter

import "context"

// Fixed is a simple channel-based concurrency limiter. It uses a fixed-size
// channel to limit callers from proceeding until there is a value available
// in the channel. If all are in use, the caller blocks until one is freed.
Expand Down Expand Up @@ -35,9 +37,15 @@ func (t Fixed) TryTake() bool {
}
}

// Take attempts to take a token and blocks until one is available.
func (t Fixed) Take() {
t <- struct{}{}
// Take acquires a token, blocking until one is available or until ctx is
// done, whichever happens first.
//
// It returns nil once a token has been acquired; the caller is then
// responsible for handing it back with Release. If ctx is cancelled or its
// deadline passes first, no token is taken and ctx.Err() is returned.
func (t Fixed) Take(ctx context.Context) error {
	// select chooses pseudo-randomly among ready cases, so without this check
	// a caller whose context is already done could still acquire a token
	// whenever one happens to be free. Give cancellation priority.
	if err := ctx.Err(); err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case t <- struct{}{}:
		return nil
	}
}

// Release releases a token back to the limiter.
Expand Down
39 changes: 28 additions & 11 deletions pkg/limiter/fixed_test.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,43 @@
package limiter_test

import (
"context"
"testing"
"time"

"github.com/influxdata/influxdb/v2/pkg/limiter"
"github.com/stretchr/testify/require"
)

func TestFixed_Available(t *testing.T) {
f := limiter.NewFixed(10)
if exp, got := 10, f.Available(); exp != got {
t.Fatalf("available mismatch: exp %v, got %v", exp, got)
}
require.Equal(t, 10, f.Available())

f.Take()

if exp, got := 9, f.Available(); exp != got {
t.Fatalf("available mismatch: exp %v, got %v", exp, got)
}
require.NoError(t, f.Take(context.Background()))
require.Equal(t, 9, f.Available())

f.Release()
require.Equal(t, 10, f.Available())
}

// TestFixed_Timeout checks that Take gives up with the context's deadline
// error when the limiter has no free token before the deadline passes.
func TestFixed_Timeout(t *testing.T) {
	lim := limiter.NewFixed(1)
	// Claim the only token so the next Take has nothing to acquire.
	require.NoError(t, lim.Take(context.Background()))

	deadlineCtx, stop := context.WithTimeout(context.Background(), time.Millisecond)
	defer stop()

	// EqualError asserts both that an error was returned and that its message matches.
	require.EqualError(t, lim.Take(deadlineCtx), "context deadline exceeded")
}

func TestFixed_Canceled(t *testing.T) {
f := limiter.NewFixed(1)
require.NoError(t, f.Take(context.Background()))

if exp, got := 10, f.Available(); exp != got {
t.Fatalf("available mismatch: exp %v, got %v", exp, got)
}
ctx, cancel := context.WithTimeout(context.Background(), 30 * time.Second)
cancel()
err := f.Take(ctx)
require.Error(t, err)
require.Equal(t, "context canceled", err.Error())
}
18 changes: 9 additions & 9 deletions storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Engine struct {
tsdbStore *tsdb.Store
metaClient MetaClient
pointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
WritePoints(ctx context.Context, database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
Close() error
}

Expand Down Expand Up @@ -90,8 +90,8 @@ type MetaClient interface {
}

type TSDBStore interface {
DeleteMeasurement(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteMeasurement(ctx context.Context, database, name string) error
DeleteSeries(ctx context.Context, database string, sources []influxql.Source, condition influxql.Expr) error
MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
ShardGroup(ids []uint64) tsdb.ShardGroup
Shards(ids []uint64) []*tsdb.Shard
Expand Down Expand Up @@ -177,7 +177,7 @@ func (e *Engine) Open(ctx context.Context) (err error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()

if err := e.tsdbStore.Open(); err != nil {
if err := e.tsdbStore.Open(ctx); err != nil {
return err
}

Expand Down Expand Up @@ -260,7 +260,7 @@ func (e *Engine) WritePoints(ctx context.Context, orgID platform.ID, bucketID pl
return ErrEngineClosed
}

return e.pointsWriter.WritePoints(bucketID.String(), meta.DefaultRetentionPolicyName, models.ConsistencyLevelAll, &meta.UserInfo{}, points)
return e.pointsWriter.WritePoints(ctx, bucketID.String(), meta.DefaultRetentionPolicyName, models.ConsistencyLevelAll, &meta.UserInfo{}, points)
}

func (e *Engine) CreateBucket(ctx context.Context, b *influxdb.Bucket) (err error) {
Expand Down Expand Up @@ -317,7 +317,7 @@ func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID
if e.closing == nil {
return ErrEngineClosed
}
return e.tsdbStore.DeleteSeriesWithPredicate(bucketID.String(), min, max, pred)
return e.tsdbStore.DeleteSeriesWithPredicate(ctx, bucketID.String(), min, max, pred)
}

// LockKVStore locks the KV store as well as the engine in preparation for doing a backup.
Expand Down Expand Up @@ -385,7 +385,7 @@ func (e *Engine) RestoreKVStore(ctx context.Context, r io.Reader) error {
}

for _, sh := range sgi.Shards {
if err := e.tsdbStore.CreateShard(dbi.Name, rpi.Name, sh.ID, true); err != nil {
if err := e.tsdbStore.CreateShard(ctx, dbi.Name, rpi.Name, sh.ID, true); err != nil {
return err
}
}
Expand Down Expand Up @@ -450,7 +450,7 @@ func (e *Engine) RestoreBucket(ctx context.Context, id platform.ID, buf []byte)
}

for _, sh := range sgi.Shards {
if err := e.tsdbStore.CreateShard(dbi.Name, rpi.Name, sh.ID, true); err != nil {
if err := e.tsdbStore.CreateShard(ctx, dbi.Name, rpi.Name, sh.ID, true); err != nil {
return nil, err
}
}
Expand All @@ -470,7 +470,7 @@ func (e *Engine) RestoreShard(ctx context.Context, shardID uint64, r io.Reader)
return ErrEngineClosed
}

return e.tsdbStore.RestoreShard(shardID, r)
return e.tsdbStore.RestoreShard(ctx, shardID, r)
}

// SeriesCardinality returns the number of series in the engine.
Expand Down
12 changes: 12 additions & 0 deletions tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ type Config struct {
// General WAL configuration options
WALDir string `toml:"wal-dir"`

// WALMaxConcurrentWrites sets the max number of WAL writes that can be attempted at one time.
// In reality only one write to disk can run at a time, but we allow the preceding encoding steps
// to run concurrently. This can cause allocations to increase quickly when writing to a slow disk.
// Set to 0 to use the default (<nprocs> * 2).
WALMaxConcurrentWrites int `toml:"wal-max-concurrent-writes"`

// WALMaxWriteDelay is the max amount of time the WAL will wait to begin a write when there are
// already WALMaxConcurrentWrites in progress. A value of 0 disables any timeout.
WALMaxWriteDelay time.Duration `toml:"wal-max-write-delay"`

// WALFsyncDelay is the amount of time that a write will wait before fsyncing. A duration
// greater than 0 can be used to batch up multiple fsync calls. This is useful for slower
// disks or when WAL write contention is seen. A value of 0 fsyncs every write to the WAL.
Expand Down Expand Up @@ -151,6 +161,8 @@ func NewConfig() Config {

MaxConcurrentCompactions: DefaultMaxConcurrentCompactions,

WALMaxWriteDelay: 10 * time.Minute,

MaxIndexLogFileSize: toml.Size(DefaultMaxIndexLogFileSize),
SeriesIDSetCacheSize: DefaultSeriesIDSetCacheSize,

Expand Down
10 changes: 5 additions & 5 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (

// Engine represents a swappable storage engine for the shard.
type Engine interface {
Open() error
Open(ctx context.Context) error
Close() error
SetEnabled(enabled bool)
SetCompactionsEnabled(enabled bool)
Expand All @@ -51,12 +51,12 @@ type Engine interface {
CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error)
CreateCursorIterator(ctx context.Context) (CursorIterator, error)
IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
WritePoints(points []models.Point) error
WritePoints(ctx context.Context, points []models.Point) error

CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DeleteSeriesRange(itr SeriesIterator, min, max int64) error
DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error
DeleteSeriesRange(ctx context.Context, itr SeriesIterator, min, max int64) error
DeleteSeriesRangeWithPredicate(ctx context.Context, itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error

MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
Expand All @@ -68,7 +68,7 @@ type Engine interface {
MeasurementFieldSet() *MeasurementFieldSet
MeasurementFields(measurement []byte) *MeasurementFields
ForEachMeasurementName(fn func(name []byte) error) error
DeleteMeasurement(name []byte) error
DeleteMeasurement(ctx context.Context, name []byte) error

HasTagKey(name, key []byte) (bool, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
Expand Down
Loading