feat: Port 1.x retention policy enforcement service
The check interval can be configured via

```
--storage-retention-check-interval
```

Closes #19309
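
For illustration only (not part of the original commit message), the option would typically be passed as a duration when starting the daemon; the exact value format here is an assumption:

```
influxd --storage-retention-check-interval 30m
```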
stuartcarnie committed Sep 8, 2020
1 parent e51d1cb commit faed872
Showing 9 changed files with 698 additions and 439 deletions.
7 changes: 5 additions & 2 deletions cmd/influxd/launcher/launcher.go
@@ -435,6 +435,11 @@ func launcherOpts(l *Launcher) []cli.Opt {
Flag: "storage-tsm-use-madv-willneed",
Desc: "Controls whether we hint to the kernel that we intend to page in mmap'd sections of TSM files.",
},
{
DestP: &l.StorageConfig.RetentionInterval,
Flag: "storage-retention-check-interval",
Desc: "The interval of time when retention policy enforcement checks run.",
},

// InfluxQL Coordinator Config
{
@@ -802,7 +807,6 @@ func (m *Launcher) run(ctx context.Context) (err error) {
// the testing engine will write/read into a temporary directory
engine := NewTemporaryEngine(
m.StorageConfig,
storage.WithRetentionEnforcer(ts.BucketService),
storage.WithMetaClient(metaClient),
)
flushers = append(flushers, engine)
@@ -816,7 +820,6 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.engine = storage.NewEngine(
m.enginePath,
m.StorageConfig,
storage.WithRetentionEnforcer(ts.BucketService),
storage.WithMetaClient(metaClient),
)
}
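The launcher registers the new option alongside the other storage flags. As an aside, by the usual influxd convention (an assumption here, not shown in this diff) the same setting should also be reachable through the corresponding environment variable:

```
INFLUXD_STORAGE_RETENTION_CHECK_INTERVAL=1h influxd
```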
1 change: 1 addition & 0 deletions go.mod
@@ -45,6 +45,7 @@ require (
github.com/google/go-querystring v1.0.0 // indirect
github.com/google/martian v2.1.1-0.20190517191504-25dcb96d9e51+incompatible // indirect
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c // indirect
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/go-retryablehttp v0.6.4 // indirect
github.com/hashicorp/raft v1.0.0 // indirect
github.com/hashicorp/vault/api v1.0.2
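The new github.com/hashicorp/go-multierror dependency is what lets Engine.Close (below) aggregate shutdown errors instead of returning only the first one. A minimal, self-contained sketch of that pattern; the closeAll helper is hypothetical and not part of this commit:

```
package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/go-multierror"
)

// closeAll mirrors the aggregation pattern used in Engine.Close: run every
// closer, collect every failure, and return nil only when all of them succeed.
func closeAll(closers ...func() error) error {
	var result *multierror.Error
	for _, c := range closers {
		if err := c(); err != nil {
			result = multierror.Append(result, err)
		}
	}
	// ErrorOrNil returns nil when nothing was appended, so callers can keep
	// using the ordinary `if err != nil` check.
	return result.ErrorOrNil()
}

func main() {
	err := closeAll(
		func() error { return nil },
		func() error { return errors.New("tsdb store: close failed") },
	)
	fmt.Println(err)
}
```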
171 changes: 32 additions & 139 deletions storage/engine.go
@@ -2,44 +2,34 @@ package storage

import (
"context"
"fmt"
"io"
"path/filepath"
"sync"
"time"

"github.com/hashicorp/go-multierror"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb"
_ "github.com/influxdata/influxdb/v2/tsdb/engine"
_ "github.com/influxdata/influxdb/v2/tsdb/index/inmem"
_ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
"github.com/influxdata/influxdb/v2/v1/coordinator"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/influxdata/influxdb/v2/v1/services/retention"
"github.com/influxdata/influxql"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

// Static objects to prevent small allocs.
// var timeBytes = []byte("time")

// ErrEngineClosed is returned when a caller attempts to use the engine while
// it's closed.
var ErrEngineClosed = errors.New("engine is closed")

// runner lets us mock out the retention enforcer in tests
type runner interface{ run() }

// runnable is a function that lets the caller know if they can proceed with their
// task. A runnable returns a function that should be called by the caller to
// signal they finished their task.
type runnable func() (done func())

type Engine struct {
config Config
path string
@@ -51,69 +41,35 @@ type Engine struct {
pointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
}
finder BucketFinder

retentionEnforcer runner
retentionEnforcerLimiter runnable
retentionService *retention.Service

defaultMetricLabels prometheus.Labels

writePointsValidationEnabled bool

// Tracks all goroutines started by the Engine.
wg sync.WaitGroup

logger *zap.Logger
}

// Option provides a set
type Option func(*Engine)

// WithRetentionEnforcer initialises a retention enforcer on the engine.
// WithRetentionEnforcer must be called after other options to ensure that all
// metrics are labelled correctly.
func WithRetentionEnforcer(finder BucketFinder) Option {
return func(e *Engine) {
e.finder = finder
// TODO - change retention enforce to take store
// e.retentionEnforcer = newRetentionEnforcer(e, e.engine, finder)
}
}

// WithRetentionEnforcerLimiter sets a limiter used to control when the
// retention enforcer can proceed. If this option is not used then the default
// limiter (or the absence of one) is a no-op, and no limitations will be put
// on running the retention enforcer.
func WithRetentionEnforcerLimiter(f runnable) Option {
return func(e *Engine) {
e.retentionEnforcerLimiter = f
}
}

// WithPageFaultLimiter allows the caller to set the limiter for restricting
// the frequency of page faults.
func WithPageFaultLimiter(limiter *rate.Limiter) Option {
return func(e *Engine) {
// TODO no longer needed
// e.engine.WithPageFaultLimiter(limiter)
// e.index.WithPageFaultLimiter(limiter)
// e.sfile.WithPageFaultLimiter(limiter)
}
}

func WithMetaClient(c MetaClient) Option {
return func(e *Engine) {
e.metaClient = c
}
}

type MetaClient interface {
Database(name string) (di *meta.DatabaseInfo)
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
Database(name string) (di *meta.DatabaseInfo)
Databases() []meta.DatabaseInfo
DeleteShardGroup(database, policy string, id uint64) error
PruneShardGroups() error
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
}

type TSDBStore interface {
@@ -155,9 +111,9 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
pw.MetaClient = e.metaClient
e.pointsWriter = pw

if r, ok := e.retentionEnforcer.(*retentionEnforcer); ok {
r.SetDefaultMetricLabels(e.defaultMetricLabels)
}
e.retentionService = retention.NewService(retention.Config{Enabled: true, CheckInterval: c.RetentionInterval})
e.retentionService.TSDBStore = e.tsdbStore
e.retentionService.MetaClient = e.metaClient

return e
}
@@ -173,17 +129,15 @@ func (e *Engine) WithLogger(log *zap.Logger) {
pw.Logger = e.logger
}

if r, ok := e.retentionEnforcer.(*retentionEnforcer); ok {
r.WithLogger(e.logger)
if e.retentionService != nil {
e.retentionService.WithLogger(log)
}
}

// PrometheusCollectors returns all the prometheus collectors associated with
// the engine and its components.
func (e *Engine) PrometheusCollectors() []prometheus.Collector {
var metrics []prometheus.Collector
metrics = append(metrics, RetentionPrometheusCollectors()...)
return metrics
return nil
}

// Open opens the store and all underlying resources. It returns an error if
@@ -202,14 +156,13 @@ func (e *Engine) Open(ctx context.Context) (err error) {
if err := e.tsdbStore.Open(); err != nil {
return err
}
e.closing = make(chan struct{})

// TODO(edd) background tasks will be run in priority order via a scheduler.
// For now we will just run on an interval as we only have the retention
// policy enforcer.
if e.retentionEnforcer != nil {
e.runRetentionEnforcer()
if err := e.retentionService.Open(); err != nil {
return err
}

e.closing = make(chan struct{})

return nil
}

@@ -221,72 +174,6 @@ func (e *Engine) EnableCompactions() {
func (e *Engine) DisableCompactions() {
}

// runRetentionEnforcer runs the retention enforcer in a separate goroutine.
//
// Currently this just runs on an interval, but in the future we will add the
// ability to reschedule the retention enforcement if there are not enough
// resources available.
func (e *Engine) runRetentionEnforcer() {
interval := time.Duration(e.config.RetentionInterval)

if interval == 0 {
e.logger.Info("Retention enforcer disabled")
return // Enforcer disabled.
} else if interval < 0 {
e.logger.Error("Negative retention interval", logger.DurationLiteral("check_interval", interval))
return
}

l := e.logger.With(zap.String("component", "retention_enforcer"), logger.DurationLiteral("check_interval", interval))
l.Info("Starting")

ticker := time.NewTicker(interval)
e.wg.Add(1)
go func() {
defer e.wg.Done()
for {
// It's safe to read closing without a lock because it's never
// modified if this goroutine is active.
select {
case <-e.closing:
l.Info("Stopping")
return
case <-ticker.C:
// canRun will signal to this goroutine that the enforcer can
// run. It will also carry from the blocking goroutine a function
// that needs to be called when the enforcer has finished its work.
canRun := make(chan func())

// This goroutine blocks until the retention enforcer has permission
// to proceed.
go func() {
if e.retentionEnforcerLimiter != nil {
// The limiter will block until the enforcer can proceed.
// The limiter returns a function that needs to be called
// when the enforcer has finished its work.
canRun <- e.retentionEnforcerLimiter()
return
}
canRun <- func() {}
}()

// Is it possible to get a slot? We need to be able to close
// whilst waiting...
select {
case <-e.closing:
l.Info("Stopping")
return
case done := <-canRun:
e.retentionEnforcer.run()
if done != nil {
done()
}
}
}
}
}()
}

// Close closes the store and all underlying resources. It returns an error if
// any of the underlying systems fail to close.
func (e *Engine) Close() error {
@@ -301,15 +188,21 @@ func (e *Engine) Close() error {
close(e.closing)
e.mu.RUnlock()

// Wait for any other goroutines to finish.
e.wg.Wait()

e.mu.Lock()
defer e.mu.Unlock()
e.closing = nil

// TODO - Close tsdb store
return nil
var retErr *multierror.Error

if err := e.retentionService.Close(); err != nil {
retErr = multierror.Append(retErr, fmt.Errorf("error closing retention service: %w", err))
}

if err := e.tsdbStore.Close(); err != nil {
retErr = multierror.Append(retErr, fmt.Errorf("error closing TSDB store: %w", err))
}

return retErr.ErrorOrNil()
}

// WritePoints writes the provided points to the engine.
(The remaining 6 changed files are not shown.)
