feat: Port 1.x retention policy enforcement service #19521

Merged
merged 3 commits on Sep 9, 2020
7 changes: 5 additions & 2 deletions cmd/influxd/launcher/launcher.go
@@ -435,6 +435,11 @@ func launcherOpts(l *Launcher) []cli.Opt {
Flag: "storage-tsm-use-madv-willneed",
Desc: "Controls whether we hint to the kernel that we intend to page in mmap'd sections of TSM files.",
},
{
DestP: &l.StorageConfig.RetentionService.CheckInterval,
Flag: "storage-retention-check-interval",
Desc: "The interval of time when retention policy enforcement checks run.",
},

// InfluxQL Coordinator Config
{
@@ -802,7 +807,6 @@ func (m *Launcher) run(ctx context.Context) (err error) {
// the testing engine will write/read into a temporary directory
engine := NewTemporaryEngine(
m.StorageConfig,
storage.WithRetentionEnforcer(ts.BucketService),
storage.WithMetaClient(metaClient),
)
flushers = append(flushers, engine)
@@ -816,7 +820,6 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.engine = storage.NewEngine(
m.enginePath,
m.StorageConfig,
storage.WithRetentionEnforcer(ts.BucketService),
storage.WithMetaClient(metaClient),
)
}
1 change: 1 addition & 0 deletions go.mod
@@ -45,6 +45,7 @@ require (
github.com/google/go-querystring v1.0.0 // indirect
github.com/google/martian v2.1.1-0.20190517191504-25dcb96d9e51+incompatible // indirect
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c // indirect
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/go-retryablehttp v0.6.4 // indirect
github.com/hashicorp/raft v1.0.0 // indirect
github.com/hashicorp/vault/api v1.0.2
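
go-multierror becomes a direct dependency here because the engine's Close path now aggregates shutdown errors from the retention service and the TSDB store. A minimal, standalone sketch of the Append/ErrorOrNil pattern that path relies on (the error messages are illustrative, not code from this PR):

package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/go-multierror"
)

func main() {
	// Appending to a nil *multierror.Error is valid; Append allocates it on first use.
	var result *multierror.Error
	result = multierror.Append(result, errors.New("error closing retention service: timeout"))
	result = multierror.Append(result, errors.New("error closing TSDB store: disk full"))

	// ErrorOrNil returns nil when nothing was appended, so callers can return it directly.
	fmt.Println(result.ErrorOrNil())
}
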
20 changes: 4 additions & 16 deletions storage/config.go
@@ -1,33 +1,21 @@
package storage

import (
"time"

"github.com/influxdata/influxdb/v2/toml"
"github.com/influxdata/influxdb/v2/tsdb"
)

// Default configuration values.
const (
DefaultRetentionInterval = time.Hour
DefaultSeriesFileDirectoryName = "_series"
DefaultIndexDirectoryName = "index"
DefaultWALDirectoryName = "wal"
DefaultEngineDirectoryName = "data"
"github.com/influxdata/influxdb/v2/v1/services/retention"
)

// Config holds the configuration for an Engine.
type Config struct {
Data tsdb.Config

// Frequency of retention in seconds.
RetentionInterval toml.Duration `toml:"retention-interval"`
RetentionService retention.Config
}

// NewConfig initialises a new config for an Engine.
func NewConfig() Config {
return Config{
Data: tsdb.NewConfig(),
RetentionInterval: toml.Duration(DefaultRetentionInterval),
Data: tsdb.NewConfig(),
RetentionService: retention.NewConfig(),
}
}
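
With RetentionInterval removed, the check frequency now lives on the nested retention.Config. A hedged sketch of overriding it programmatically; the package name, helper function, and 30-minute value are illustrative, and CheckInterval is assumed to be a toml.Duration like the field it replaces:

package engineconfig

import (
	"time"

	"github.com/influxdata/influxdb/v2/storage"
	"github.com/influxdata/influxdb/v2/toml"
)

// newStorageConfig returns an engine config with a custom retention check
// interval. The default from retention.NewConfig() applies unless overridden
// here or via the storage-retention-check-interval launcher flag.
func newStorageConfig() storage.Config {
	cfg := storage.NewConfig()
	cfg.RetentionService.CheckInterval = toml.Duration(30 * time.Minute) // assumed toml.Duration field
	return cfg
}
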
171 changes: 32 additions & 139 deletions storage/engine.go
@@ -2,44 +2,34 @@ package storage

import (
"context"
"fmt"
"io"
"path/filepath"
"sync"
"time"

"github.com/hashicorp/go-multierror"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb"
_ "github.com/influxdata/influxdb/v2/tsdb/engine"
_ "github.com/influxdata/influxdb/v2/tsdb/index/inmem"
_ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
"github.com/influxdata/influxdb/v2/v1/coordinator"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/influxdata/influxdb/v2/v1/services/retention"
"github.com/influxdata/influxql"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

// Static objects to prevent small allocs.
// var timeBytes = []byte("time")

// ErrEngineClosed is returned when a caller attempts to use the engine while
// it's closed.
var ErrEngineClosed = errors.New("engine is closed")

// runner lets us mock out the retention enforcer in tests
type runner interface{ run() }

// runnable is a function that lets the caller know if they can proceed with their
// task. A runnable returns a function that should be called by the caller to
// signal they finished their task.
type runnable func() (done func())

type Engine struct {
config Config
path string
@@ -51,69 +41,35 @@ type Engine struct {
pointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
}
finder BucketFinder

retentionEnforcer runner
retentionEnforcerLimiter runnable
retentionService *retention.Service

defaultMetricLabels prometheus.Labels

writePointsValidationEnabled bool

// Tracks all goroutines started by the Engine.
wg sync.WaitGroup

logger *zap.Logger
}

// Option provides a set of optional configuration to apply to an Engine.
type Option func(*Engine)

// WithRetentionEnforcer initialises a retention enforcer on the engine.
// WithRetentionEnforcer must be called after other options to ensure that all
// metrics are labelled correctly.
func WithRetentionEnforcer(finder BucketFinder) Option {
return func(e *Engine) {
e.finder = finder
// TODO - change retention enforce to take store
// e.retentionEnforcer = newRetentionEnforcer(e, e.engine, finder)
}
}

// WithRetentionEnforcerLimiter sets a limiter used to control when the
// retention enforcer can proceed. If this option is not used then the default
// limiter (or the absence of one) is a no-op, and no limitations will be put
// on running the retention enforcer.
func WithRetentionEnforcerLimiter(f runnable) Option {
return func(e *Engine) {
e.retentionEnforcerLimiter = f
}
}

// WithPageFaultLimiter allows the caller to set the limiter for restricting
// the frequency of page faults.
func WithPageFaultLimiter(limiter *rate.Limiter) Option {
return func(e *Engine) {
// TODO no longer needed
// e.engine.WithPageFaultLimiter(limiter)
// e.index.WithPageFaultLimiter(limiter)
// e.sfile.WithPageFaultLimiter(limiter)
}
}

func WithMetaClient(c MetaClient) Option {
return func(e *Engine) {
e.metaClient = c
}
}

type MetaClient interface {
Database(name string) (di *meta.DatabaseInfo)
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
Database(name string) (di *meta.DatabaseInfo)
Databases() []meta.DatabaseInfo
DeleteShardGroup(database, policy string, id uint64) error
PruneShardGroups() error
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
}

type TSDBStore interface {
@@ -155,9 +111,9 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
pw.MetaClient = e.metaClient
e.pointsWriter = pw

if r, ok := e.retentionEnforcer.(*retentionEnforcer); ok {
r.SetDefaultMetricLabels(e.defaultMetricLabels)
}
e.retentionService = retention.NewService(c.RetentionService)
e.retentionService.TSDBStore = e.tsdbStore
e.retentionService.MetaClient = e.metaClient

return e
}
@@ -173,17 +129,15 @@ func (e *Engine) WithLogger(log *zap.Logger) {
pw.Logger = e.logger
}

if r, ok := e.retentionEnforcer.(*retentionEnforcer); ok {
r.WithLogger(e.logger)
if e.retentionService != nil {
e.retentionService.WithLogger(log)
}
}

// PrometheusCollectors returns all the prometheus collectors associated with
// the engine and its components.
func (e *Engine) PrometheusCollectors() []prometheus.Collector {
var metrics []prometheus.Collector
metrics = append(metrics, RetentionPrometheusCollectors()...)
return metrics
return nil
}

// Open opens the store and all underlying resources. It returns an error if
@@ -202,14 +156,13 @@ func (e *Engine) Open(ctx context.Context) (err error) {
if err := e.tsdbStore.Open(); err != nil {
return err
}
e.closing = make(chan struct{})

// TODO(edd) background tasks will be run in priority order via a scheduler.
// For now we will just run on an interval as we only have the retention
// policy enforcer.
if e.retentionEnforcer != nil {
e.runRetentionEnforcer()
if err := e.retentionService.Open(ctx); err != nil {
return err
}

e.closing = make(chan struct{})

return nil
}

@@ -221,72 +174,6 @@ func (e *Engine) EnableCompactions() {
func (e *Engine) DisableCompactions() {
}

// runRetentionEnforcer runs the retention enforcer in a separate goroutine.
//
// Currently this just runs on an interval, but in the future we will add the
// ability to reschedule the retention enforcement if there are not enough
// resources available.
func (e *Engine) runRetentionEnforcer() {
interval := time.Duration(e.config.RetentionInterval)

if interval == 0 {
e.logger.Info("Retention enforcer disabled")
return // Enforcer disabled.
} else if interval < 0 {
e.logger.Error("Negative retention interval", logger.DurationLiteral("check_interval", interval))
return
}

l := e.logger.With(zap.String("component", "retention_enforcer"), logger.DurationLiteral("check_interval", interval))
l.Info("Starting")

ticker := time.NewTicker(interval)
e.wg.Add(1)
go func() {
defer e.wg.Done()
for {
// It's safe to read closing without a lock because it's never
// modified if this goroutine is active.
select {
case <-e.closing:
l.Info("Stopping")
return
case <-ticker.C:
// canRun will signal to this goroutine that the enforcer can
// run. It will also carry from the blocking goroutine a function
// that needs to be called when the enforcer has finished its work.
canRun := make(chan func())

// This goroutine blocks until the retention enforcer has permission
// to proceed.
go func() {
if e.retentionEnforcerLimiter != nil {
// The limiter will block until the enforcer can proceed.
// The limiter returns a function that needs to be called
// when the enforcer has finished its work.
canRun <- e.retentionEnforcerLimiter()
return
}
canRun <- func() {}
}()

// Is it possible to get a slot? We need to be able to close
// whilst waiting...
select {
case <-e.closing:
l.Info("Stopping")
return
case done := <-canRun:
e.retentionEnforcer.run()
if done != nil {
done()
}
}
}
}
}()
}

// Close closes the store and all underlying resources. It returns an error if
// any of the underlying systems fail to close.
func (e *Engine) Close() error {
@@ -301,15 +188,21 @@ func (e *Engine) Close() error {
close(e.closing)
e.mu.RUnlock()

// Wait for any other goroutines to finish.
e.wg.Wait()

e.mu.Lock()
defer e.mu.Unlock()
e.closing = nil

// TODO - Close tsdb store
return nil
var retErr *multierror.Error

if err := e.retentionService.Close(); err != nil {
retErr = multierror.Append(retErr, fmt.Errorf("error closing retention service: %w", err))
}

if err := e.tsdbStore.Close(); err != nil {
retErr = multierror.Append(retErr, fmt.Errorf("error closing TSDB store: %w", err))
}

return retErr.ErrorOrNil()
}

// WritePoints writes the provided points to the engine.
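
With the enforcer folded into the engine, callers no longer pass WithRetentionEnforcer; the launcher only wires in the meta client. A rough sketch of the resulting construction and lifecycle; the package name, helper function, and engine path are illustrative, not part of this PR:

package launcherutil

import (
	"context"

	"github.com/influxdata/influxdb/v2/storage"
)

// openEngine mirrors the simplified construction in launcher.run: the retention
// service is built inside NewEngine from Config.RetentionService, so the old
// WithRetentionEnforcer option is no longer needed.
func openEngine(ctx context.Context, enginePath string, metaClient storage.MetaClient) (*storage.Engine, error) {
	engine := storage.NewEngine(
		enginePath,
		storage.NewConfig(),
		storage.WithMetaClient(metaClient),
	)

	// Open starts the TSDB store and the retention service; Close stops both and
	// aggregates any shutdown errors with go-multierror.
	if err := engine.Open(ctx); err != nil {
		return nil, err
	}
	return engine, nil
}
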