From 4ae412b927da8faa5b1fd7da0e961cc4d874b942 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Tue, 8 Sep 2020 16:02:37 -0700 Subject: [PATCH 1/3] feat: Port 1.x retention policy enforcement service Configuration of the check interval is available via ``` --storage-retention-check-interval ``` Closes #19309 --- cmd/influxd/launcher/launcher.go | 7 +- go.mod | 1 + storage/engine.go | 171 +++-------- storage/metrics.go | 90 ------ storage/retention.go | 208 -------------- v1/services/retention/config.go | 49 ++++ v1/services/retention/config_test.go | 51 ++++ v1/services/retention/service.go | 163 +++++++++++ v1/services/retention/service_test.go | 397 ++++++++++++++++++++++++++ 9 files changed, 698 insertions(+), 439 deletions(-) delete mode 100644 storage/metrics.go create mode 100644 v1/services/retention/config.go create mode 100644 v1/services/retention/config_test.go create mode 100644 v1/services/retention/service.go create mode 100644 v1/services/retention/service_test.go diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index f24fabe3d20..bac17f70429 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -435,6 +435,11 @@ func launcherOpts(l *Launcher) []cli.Opt { Flag: "storage-tsm-use-madv-willneed", Desc: "Controls whether we hint to the kernel that we intend to page in mmap'd sections of TSM files.", }, + { + DestP: &l.StorageConfig.RetentionInterval, + Flag: "storage-retention-check-interval", + Desc: "The interval of time when retention policy enforcement checks run.", + }, // InfluxQL Coordinator Config { @@ -802,7 +807,6 @@ func (m *Launcher) run(ctx context.Context) (err error) { // the testing engine will write/read into a temporary directory engine := NewTemporaryEngine( m.StorageConfig, - storage.WithRetentionEnforcer(ts.BucketService), storage.WithMetaClient(metaClient), ) flushers = append(flushers, engine) @@ -816,7 +820,6 @@ func (m *Launcher) run(ctx context.Context) (err 
error) { m.engine = storage.NewEngine( m.enginePath, m.StorageConfig, - storage.WithRetentionEnforcer(ts.BucketService), storage.WithMetaClient(metaClient), ) } diff --git a/go.mod b/go.mod index 16615025098..bc062357509 100644 --- a/go.mod +++ b/go.mod @@ -45,6 +45,7 @@ require ( github.com/google/go-querystring v1.0.0 // indirect github.com/google/martian v2.1.1-0.20190517191504-25dcb96d9e51+incompatible // indirect github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c // indirect + github.com/hashicorp/go-multierror v1.0.0 github.com/hashicorp/go-retryablehttp v0.6.4 // indirect github.com/hashicorp/raft v1.0.0 // indirect github.com/hashicorp/vault/api v1.0.2 diff --git a/storage/engine.go b/storage/engine.go index e51b8a446b4..7bc673067c9 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -2,15 +2,16 @@ package storage import ( "context" + "fmt" "io" "path/filepath" "sync" "time" + "github.com/hashicorp/go-multierror" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/kit/tracing" - "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/tsdb" _ "github.com/influxdata/influxdb/v2/tsdb/engine" @@ -18,28 +19,17 @@ import ( _ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1" "github.com/influxdata/influxdb/v2/v1/coordinator" "github.com/influxdata/influxdb/v2/v1/services/meta" + "github.com/influxdata/influxdb/v2/v1/services/retention" "github.com/influxdata/influxql" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" - "golang.org/x/time/rate" ) -// Static objects to prevent small allocs. -// var timeBytes = []byte("time") - // ErrEngineClosed is returned when a caller attempts to use the engine while // it's closed. 
var ErrEngineClosed = errors.New("engine is closed") -// runner lets us mock out the retention enforcer in tests -type runner interface{ run() } - -// runnable is a function that lets the caller know if they can proceed with their -// task. A runnable returns a function that should be called by the caller to -// signal they finished their task. -type runnable func() (done func()) - type Engine struct { config Config path string @@ -51,56 +41,19 @@ type Engine struct { pointsWriter interface { WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error } - finder BucketFinder - retentionEnforcer runner - retentionEnforcerLimiter runnable + retentionService *retention.Service defaultMetricLabels prometheus.Labels writePointsValidationEnabled bool - // Tracks all goroutines started by the Engine. - wg sync.WaitGroup - logger *zap.Logger } // Option provides a set type Option func(*Engine) -// WithRetentionEnforcer initialises a retention enforcer on the engine. -// WithRetentionEnforcer must be called after other options to ensure that all -// metrics are labelled correctly. -func WithRetentionEnforcer(finder BucketFinder) Option { - return func(e *Engine) { - e.finder = finder - // TODO - change retention enforce to take store - // e.retentionEnforcer = newRetentionEnforcer(e, e.engine, finder) - } -} - -// WithRetentionEnforcerLimiter sets a limiter used to control when the -// retention enforcer can proceed. If this option is not used then the default -// limiter (or the absence of one) is a no-op, and no limitations will be put -// on running the retention enforcer. -func WithRetentionEnforcerLimiter(f runnable) Option { - return func(e *Engine) { - e.retentionEnforcerLimiter = f - } -} - -// WithPageFaultLimiter allows the caller to set the limiter for restricting -// the frequency of page faults. 
-func WithPageFaultLimiter(limiter *rate.Limiter) Option { - return func(e *Engine) { - // TODO no longer needed - // e.engine.WithPageFaultLimiter(limiter) - // e.index.WithPageFaultLimiter(limiter) - // e.sfile.WithPageFaultLimiter(limiter) - } -} - func WithMetaClient(c MetaClient) Option { return func(e *Engine) { e.metaClient = c @@ -108,12 +61,15 @@ func WithMetaClient(c MetaClient) Option { } type MetaClient interface { - Database(name string) (di *meta.DatabaseInfo) CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) - UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error - RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) + Database(name string) (di *meta.DatabaseInfo) + Databases() []meta.DatabaseInfo + DeleteShardGroup(database, policy string, id uint64) error + PruneShardGroups() error + RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) + UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error } type TSDBStore interface { @@ -155,9 +111,9 @@ func NewEngine(path string, c Config, options ...Option) *Engine { pw.MetaClient = e.metaClient e.pointsWriter = pw - if r, ok := e.retentionEnforcer.(*retentionEnforcer); ok { - r.SetDefaultMetricLabels(e.defaultMetricLabels) - } + e.retentionService = retention.NewService(retention.Config{Enabled: true, CheckInterval: c.RetentionInterval}) + e.retentionService.TSDBStore = e.tsdbStore + e.retentionService.MetaClient = e.metaClient return e } @@ -173,17 +129,15 @@ func (e *Engine) WithLogger(log *zap.Logger) { pw.Logger = e.logger } - if r, ok := e.retentionEnforcer.(*retentionEnforcer); ok { - r.WithLogger(e.logger) + if 
e.retentionService != nil { + e.retentionService.WithLogger(log) } } // PrometheusCollectors returns all the prometheus collectors associated with // the engine and its components. func (e *Engine) PrometheusCollectors() []prometheus.Collector { - var metrics []prometheus.Collector - metrics = append(metrics, RetentionPrometheusCollectors()...) - return metrics + return nil } // Open opens the store and all underlying resources. It returns an error if @@ -202,14 +156,13 @@ func (e *Engine) Open(ctx context.Context) (err error) { if err := e.tsdbStore.Open(); err != nil { return err } - e.closing = make(chan struct{}) - // TODO(edd) background tasks will be run in priority order via a scheduler. - // For now we will just run on an interval as we only have the retention - // policy enforcer. - if e.retentionEnforcer != nil { - e.runRetentionEnforcer() + if err := e.retentionService.Open(); err != nil { + return err } + + e.closing = make(chan struct{}) + return nil } @@ -221,72 +174,6 @@ func (e *Engine) EnableCompactions() { func (e *Engine) DisableCompactions() { } -// runRetentionEnforcer runs the retention enforcer in a separate goroutine. -// -// Currently this just runs on an interval, but in the future we will add the -// ability to reschedule the retention enforcement if there are not enough -// resources available. -func (e *Engine) runRetentionEnforcer() { - interval := time.Duration(e.config.RetentionInterval) - - if interval == 0 { - e.logger.Info("Retention enforcer disabled") - return // Enforcer disabled. 
- } else if interval < 0 { - e.logger.Error("Negative retention interval", logger.DurationLiteral("check_interval", interval)) - return - } - - l := e.logger.With(zap.String("component", "retention_enforcer"), logger.DurationLiteral("check_interval", interval)) - l.Info("Starting") - - ticker := time.NewTicker(interval) - e.wg.Add(1) - go func() { - defer e.wg.Done() - for { - // It's safe to read closing without a lock because it's never - // modified if this goroutine is active. - select { - case <-e.closing: - l.Info("Stopping") - return - case <-ticker.C: - // canRun will signal to this goroutine that the enforcer can - // run. It will also carry from the blocking goroutine a function - // that needs to be called when the enforcer has finished its work. - canRun := make(chan func()) - - // This goroutine blocks until the retention enforcer has permission - // to proceed. - go func() { - if e.retentionEnforcerLimiter != nil { - // The limiter will block until the enforcer can proceed. - // The limiter returns a function that needs to be called - // when the enforcer has finished its work. - canRun <- e.retentionEnforcerLimiter() - return - } - canRun <- func() {} - }() - - // Is it possible to get a slot? We need to be able to close - // whilst waiting... - select { - case <-e.closing: - l.Info("Stopping") - return - case done := <-canRun: - e.retentionEnforcer.run() - if done != nil { - done() - } - } - } - } - }() -} - // Close closes the store and all underlying resources. It returns an error if // any of the underlying systems fail to close. func (e *Engine) Close() error { @@ -301,15 +188,21 @@ func (e *Engine) Close() error { close(e.closing) e.mu.RUnlock() - // Wait for any other goroutines to finish. 
- e.wg.Wait() - e.mu.Lock() defer e.mu.Unlock() e.closing = nil - // TODO - Close tsdb store - return nil + var retErr *multierror.Error + + if err := e.retentionService.Close(); err != nil { + retErr = multierror.Append(retErr, fmt.Errorf("error closing retention service: %w", err)) + } + + if err := e.tsdbStore.Close(); err != nil { + retErr = multierror.Append(retErr, fmt.Errorf("error closing TSDB store: %w", err)) + } + + return retErr.ErrorOrNil() } // WritePoints writes the provided points to the engine. diff --git a/storage/metrics.go b/storage/metrics.go deleted file mode 100644 index bd1eaef620e..00000000000 --- a/storage/metrics.go +++ /dev/null @@ -1,90 +0,0 @@ -package storage - -import ( - "sort" - "sync" - - "github.com/prometheus/client_golang/prometheus" -) - -// The following package variables act as singletons, to be shared by all -// storage.Engine instantiations. This allows multiple Engines to be -// monitored within the same process. -var ( - rms *retentionMetrics - mmu sync.RWMutex -) - -// RetentionPrometheusCollectors returns all prometheus metrics for retention. -func RetentionPrometheusCollectors() []prometheus.Collector { - mmu.RLock() - defer mmu.RUnlock() - - var collectors []prometheus.Collector - if rms != nil { - collectors = append(collectors, rms.PrometheusCollectors()...) - } - return collectors -} - -// namespace is the leading part of all published metrics for the Storage service. -const namespace = "storage" - -const retentionSubsystem = "retention" // sub-system associated with metrics for writing points. - -// retentionMetrics is a set of metrics concerned with tracking data about retention policies. 
-type retentionMetrics struct { - labels prometheus.Labels - Checks *prometheus.CounterVec - CheckDuration *prometheus.HistogramVec -} - -func newRetentionMetrics(labels prometheus.Labels) *retentionMetrics { - var names []string - for k := range labels { - names = append(names, k) - } - sort.Strings(names) - - checksNames := append(append([]string(nil), names...), "status") - sort.Strings(checksNames) - - checkDurationNames := append(append([]string(nil), names...), "status") - sort.Strings(checkDurationNames) - - return &retentionMetrics{ - labels: labels, - Checks: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: retentionSubsystem, - Name: "checks_total", - Help: "Number of retention check operations performed.", - }, checksNames), - - CheckDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: retentionSubsystem, - Name: "check_duration_seconds", - Help: "Time taken to perform a successful retention check.", - // 25 buckets spaced exponentially between 10s and ~2h - Buckets: prometheus.ExponentialBuckets(10, 1.32, 25), - }, checkDurationNames), - } -} - -// Labels returns a copy of labels for use with retention metrics. -func (m *retentionMetrics) Labels() prometheus.Labels { - l := make(map[string]string, len(m.labels)) - for k, v := range m.labels { - l[k] = v - } - return l -} - -// PrometheusCollectors satisfies the prom.PrometheusCollector interface. 
-func (rm *retentionMetrics) PrometheusCollectors() []prometheus.Collector { - return []prometheus.Collector{ - rm.Checks, - rm.CheckDuration, - } -} diff --git a/storage/retention.go b/storage/retention.go index 308100a463e..d7a3f43e3f8 100644 --- a/storage/retention.go +++ b/storage/retention.go @@ -2,219 +2,11 @@ package storage import ( "context" - "errors" - "math" - "time" "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/kit/tracing" - "github.com/influxdata/influxdb/v2/logger" - "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) -const ( - bucketAPITimeout = 10 * time.Second -) - -// A Deleter implementation is capable of deleting data from a storage engine. -type Deleter interface { - DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error -} - -// A Snapshotter implementation can take snapshots of the entire engine. -type Snapshotter interface { - WriteSnapshot(ctx context.Context) error -} - // A BucketFinder is responsible for providing access to buckets via a filter. type BucketFinder interface { FindBuckets(context.Context, influxdb.BucketFilter, ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) } - -// ErrServiceClosed is returned when the service is unavailable. -var ErrServiceClosed = errors.New("service is currently closed") - -// The retentionEnforcer periodically removes data that is outside of the retention -// period of the bucket associated with the data. -type retentionEnforcer struct { - // Engine provides access to data stored on the engine - Engine Deleter - - Snapshotter Snapshotter - - // BucketService provides an API for retrieving buckets associated with - // organisations. - BucketService BucketFinder - - logger *zap.Logger - - tracker *retentionTracker -} - -// SetDefaultMetricLabels sets the default labels for the retention metrics. 
-func (s *retentionEnforcer) SetDefaultMetricLabels(defaultLabels prometheus.Labels) { - if s == nil { - return // Not initialized - } - - mmu.Lock() - if rms == nil { - rms = newRetentionMetrics(defaultLabels) - } - mmu.Unlock() - - s.tracker = newRetentionTracker(rms, defaultLabels) -} - -// WithLogger sets the logger l on the service. It must be called before any run calls. -func (s *retentionEnforcer) WithLogger(l *zap.Logger) { - if s == nil { - return // Not initialised - } - s.logger = l.With(zap.String("component", "retention_enforcer")) -} - -// run periodically expires (deletes) all data that's fallen outside of the -// retention period for the associated bucket. -func (s *retentionEnforcer) run() { - if s == nil { - return // Not initialized - } - - span, ctx := tracing.StartSpanFromContext(context.Background()) - defer span.Finish() - - log, logEnd := logger.NewOperation(ctx, s.logger, "Data retention check", "data_retention_check") - defer logEnd() - - now := time.Now().UTC() - buckets, err := s.getBucketInformation(ctx) - if err != nil { - log.Error("Unable to determine bucket information", zap.Error(err)) - } else { - s.expireData(ctx, buckets, now) - } - s.tracker.CheckDuration(time.Since(now), err == nil) -} - -// expireData runs a delete operation on the storage engine. -// -// Any series data that (1) belongs to a bucket in the provided list and -// (2) falls outside the bucket's indicated retention period will be deleted. -func (s *retentionEnforcer) expireData(ctx context.Context, buckets []*influxdb.Bucket, now time.Time) { - logger, logEnd := logger.NewOperation(ctx, s.logger, "Data deletion", "data_deletion", - zap.Int("buckets", len(buckets))) - defer logEnd() - - // Snapshot to clear the cache to reduce write contention. 
- if err := s.Snapshotter.WriteSnapshot(ctx); err != nil && err != tsm1.ErrSnapshotInProgress { - logger.Warn("Unable to snapshot cache before retention", zap.Error(err)) - } - - var skipInf, skipInvalid int - for _, b := range buckets { - bucketFields := []zapcore.Field{ - zap.String("org_id", b.OrgID.String()), - zap.String("bucket_id", b.ID.String()), - zap.Duration("retention_period", b.RetentionPeriod), - zap.String("system_type", b.Type.String()), - } - - if b.RetentionPeriod == 0 { - logger.Debug("Skipping bucket with infinite retention", bucketFields...) - skipInf++ - continue - } else if !b.OrgID.Valid() || !b.ID.Valid() { - skipInvalid++ - logger.Warn("Skipping bucket with invalid fields", bucketFields...) - continue - } - - min := int64(math.MinInt64) - max := now.Add(-b.RetentionPeriod).UnixNano() - - span, ctx := tracing.StartSpanFromContext(ctx) - span.LogKV( - "bucket_id", b.ID, - "org_id", b.OrgID, - "system_type", b.Type, - "retention_period", b.RetentionPeriod, - "retention_policy", b.RetentionPolicyName, - "from", time.Unix(0, min).UTC(), - "to", time.Unix(0, max).UTC(), - ) - - err := s.Engine.DeleteBucketRange(ctx, b.OrgID, b.ID, min, max) - if err != nil { - logger.Info("Unable to delete bucket range", - append(bucketFields, zap.Time("min", time.Unix(0, min)), zap.Time("max", time.Unix(0, max)), zap.Error(err))...) - tracing.LogError(span, err) - } - s.tracker.IncChecks(err == nil) - span.Finish() - } - - if skipInf > 0 || skipInvalid > 0 { - logger.Info("Skipped buckets", zap.Int("infinite_retention_total", skipInf), zap.Int("invalid_total", skipInvalid)) - } -} - -// getBucketInformation returns a slice of buckets to run retention on. 
-func (s *retentionEnforcer) getBucketInformation(ctx context.Context) ([]*influxdb.Bucket, error) { - ctx, cancel := context.WithTimeout(ctx, bucketAPITimeout) - defer cancel() - - buckets, _, err := s.BucketService.FindBuckets(ctx, influxdb.BucketFilter{}) - return buckets, err -} - -// -// metrics tracker -// - -type retentionTracker struct { - metrics *retentionMetrics - labels prometheus.Labels -} - -func newRetentionTracker(metrics *retentionMetrics, defaultLabels prometheus.Labels) *retentionTracker { - return &retentionTracker{metrics: metrics, labels: defaultLabels} -} - -// Labels returns a copy of labels for use with index cache metrics. -func (t *retentionTracker) Labels() prometheus.Labels { - l := make(map[string]string, len(t.labels)) - for k, v := range t.labels { - l[k] = v - } - return l -} - -// IncChecks signals that a check happened for some bucket. -func (t *retentionTracker) IncChecks(success bool) { - labels := t.Labels() - - if success { - labels["status"] = "ok" - } else { - labels["status"] = "error" - } - - t.metrics.Checks.With(labels).Inc() -} - -// CheckDuration records the overall duration of a full retention check. -func (t *retentionTracker) CheckDuration(dur time.Duration, success bool) { - labels := t.Labels() - - if success { - labels["status"] = "ok" - } else { - labels["status"] = "error" - } - - t.metrics.CheckDuration.With(labels).Observe(dur.Seconds()) -} diff --git a/v1/services/retention/config.go b/v1/services/retention/config.go new file mode 100644 index 00000000000..8b78b059758 --- /dev/null +++ b/v1/services/retention/config.go @@ -0,0 +1,49 @@ +package retention + +import ( + "errors" + "time" + + "github.com/influxdata/influxdb/v2/toml" + "github.com/influxdata/influxdb/v2/v1/monitor/diagnostics" +) + +// Config represents the configuration for the retention service. 
+type Config struct { + Enabled bool `toml:"enabled"` + CheckInterval toml.Duration `toml:"check-interval"` +} + +// NewConfig returns an instance of Config with defaults. +func NewConfig() Config { + return Config{Enabled: true, CheckInterval: toml.Duration(30 * time.Minute)} +} + +// Validate returns an error if the Config is invalid. +func (c Config) Validate() error { + if !c.Enabled { + return nil + } + + // TODO: Should we enforce a minimum interval? + // Polling every nanosecond, for instance, will greatly impact performance. + if c.CheckInterval <= 0 { + return errors.New("check-interval must be positive") + } + + return nil +} + +// Diagnostics returns a diagnostics representation of a subset of the Config. +func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) { + if !c.Enabled { + return diagnostics.RowFromMap(map[string]interface{}{ + "enabled": false, + }), nil + } + + return diagnostics.RowFromMap(map[string]interface{}{ + "enabled": true, + "check-interval": c.CheckInterval, + }), nil +} diff --git a/v1/services/retention/config_test.go b/v1/services/retention/config_test.go new file mode 100644 index 00000000000..a31c5622f09 --- /dev/null +++ b/v1/services/retention/config_test.go @@ -0,0 +1,51 @@ +package retention_test + +import ( + "testing" + "time" + + "github.com/BurntSushi/toml" + "github.com/influxdata/influxdb/v2/v1/services/retention" +) + +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c retention.Config + if _, err := toml.Decode(` +enabled = true +check-interval = "1s" +`, &c); err != nil { + t.Fatal(err) + } + + // Validate configuration. 
+ if !c.Enabled { + t.Fatalf("unexpected enabled state: %v", c.Enabled) + } else if time.Duration(c.CheckInterval) != time.Second { + t.Fatalf("unexpected check interval: %v", c.CheckInterval) + } +} + +func TestConfig_Validate(t *testing.T) { + c := retention.NewConfig() + if err := c.Validate(); err != nil { + t.Fatalf("unexpected validation fail from NewConfig: %s", err) + } + + c = retention.NewConfig() + c.CheckInterval = 0 + if err := c.Validate(); err == nil { + t.Fatal("expected error for check-interval = 0, got nil") + } + + c = retention.NewConfig() + c.CheckInterval *= -1 + if err := c.Validate(); err == nil { + t.Fatal("expected error for negative check-interval, got nil") + } + + c.Enabled = false + if err := c.Validate(); err != nil { + t.Fatalf("unexpected validation fail from disabled config: %s", err) + } +} diff --git a/v1/services/retention/service.go b/v1/services/retention/service.go new file mode 100644 index 00000000000..3f7dd052637 --- /dev/null +++ b/v1/services/retention/service.go @@ -0,0 +1,163 @@ +// Package retention provides the retention policy enforcement service. +package retention // import "github.com/influxdata/influxdb/services/retention" + +import ( + "context" + "sync" + "time" + + "github.com/influxdata/influxdb/v2/logger" + "github.com/influxdata/influxdb/v2/v1/services/meta" + "go.uber.org/zap" +) + +// Service represents the retention policy enforcement service. +type Service struct { + MetaClient interface { + Databases() []meta.DatabaseInfo + DeleteShardGroup(database, policy string, id uint64) error + PruneShardGroups() error + } + TSDBStore interface { + ShardIDs() []uint64 + DeleteShard(shardID uint64) error + } + + config Config + wg sync.WaitGroup + done chan struct{} + + logger *zap.Logger +} + +// NewService returns a configured retention policy enforcement service. +func NewService(c Config) *Service { + return &Service{ + config: c, + logger: zap.NewNop(), + } +} + +// Open starts retention policy enforcement. 
+func (s *Service) Open() error { + if !s.config.Enabled || s.done != nil { + return nil + } + + s.logger.Info("Starting retention policy enforcement service", + logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval))) + s.done = make(chan struct{}) + + s.wg.Add(1) + go func() { defer s.wg.Done(); s.run() }() + return nil +} + +// Close stops retention policy enforcement. +func (s *Service) Close() error { + if !s.config.Enabled || s.done == nil { + return nil + } + + s.logger.Info("Closing retention policy enforcement service") + close(s.done) + + s.wg.Wait() + s.done = nil + return nil +} + +// WithLogger sets the logger on the service. +func (s *Service) WithLogger(log *zap.Logger) { + s.logger = log.With(zap.String("service", "retention")) +} + +func (s *Service) run() { + ticker := time.NewTicker(time.Duration(s.config.CheckInterval)) + defer ticker.Stop() + for { + select { + case <-s.done: + return + + case <-ticker.C: + log, logEnd := logger.NewOperation(context.Background(), s.logger, "Retention policy deletion check", "retention_delete_check") + + type deletionInfo struct { + db string + rp string + } + deletedShardIDs := make(map[uint64]deletionInfo) + + // Mark down if an error occurred during this function so we can inform the + // user that we will try again on the next interval. + // Without the message, they may see the error message and assume they + // have to do it manually. + var retryNeeded bool + dbs := s.MetaClient.Databases() + for _, d := range dbs { + for _, r := range d.RetentionPolicies { + // Build list of already deleted shards. + for _, g := range r.DeletedShardGroups() { + for _, sh := range g.Shards { + deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name} + } + } + + // Determine all shards that have expired and need to be deleted. 
+ for _, g := range r.ExpiredShardGroups(time.Now().UTC()) { + if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil { + log.Info("Failed to delete shard group", + logger.Database(d.Name), + logger.ShardGroup(g.ID), + logger.RetentionPolicy(r.Name), + zap.Error(err)) + retryNeeded = true + continue + } + + log.Info("Deleted shard group", + logger.Database(d.Name), + logger.ShardGroup(g.ID), + logger.RetentionPolicy(r.Name)) + + // Store all the shard IDs that may possibly need to be removed locally. + for _, sh := range g.Shards { + deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name} + } + } + } + } + + // Remove shards if we store them locally + for _, id := range s.TSDBStore.ShardIDs() { + if info, ok := deletedShardIDs[id]; ok { + if err := s.TSDBStore.DeleteShard(id); err != nil { + log.Info("Failed to delete shard", + logger.Database(info.db), + logger.Shard(id), + logger.RetentionPolicy(info.rp), + zap.Error(err)) + retryNeeded = true + continue + } + log.Info("Deleted shard", + logger.Database(info.db), + logger.Shard(id), + logger.RetentionPolicy(info.rp)) + } + } + + if err := s.MetaClient.PruneShardGroups(); err != nil { + log.Info("Problem pruning shard groups", zap.Error(err)) + retryNeeded = true + } + + if retryNeeded { + log.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval))) + } + + logEnd() + } + } +} diff --git a/v1/services/retention/service_test.go b/v1/services/retention/service_test.go new file mode 100644 index 00000000000..27b4c9dbe13 --- /dev/null +++ b/v1/services/retention/service_test.go @@ -0,0 +1,397 @@ +package retention_test + +import ( + "bytes" + "fmt" + "reflect" + "sync" + "testing" + "time" + + "github.com/influxdata/influxdb/v2/logger" + "github.com/influxdata/influxdb/v2/toml" + "github.com/influxdata/influxdb/v2/v1/internal" +
"github.com/influxdata/influxdb/v2/v1/services/meta" + "github.com/influxdata/influxdb/v2/v1/services/retention" +) + +func TestService_OpenDisabled(t *testing.T) { + // Opening a disabled service should be a no-op. + c := retention.NewConfig() + c.Enabled = false + s := NewService(c) + + if err := s.Open(); err != nil { + t.Fatal(err) + } + + if s.LogBuf.String() != "" { + t.Fatalf("service logged %q, didn't expect any logging", s.LogBuf.String()) + } +} + +func TestService_OpenClose(t *testing.T) { + // Opening an enabled service should start it and log that it did. + s := NewService(retention.NewConfig()) + + if err := s.Open(); err != nil { + t.Fatal(err) + } + + if s.LogBuf.String() == "" { + t.Fatal("service didn't log anything on open") + } + + // Reopening is a no-op + if err := s.Open(); err != nil { + t.Fatal(err) + } + + if err := s.Close(); err != nil { + t.Fatal(err) + } + + // Re-closing is a no-op + if err := s.Close(); err != nil { + t.Fatal(err) + } +} + +func TestService_CheckShards(t *testing.T) { + now := time.Now() + // Account for any time difference that could cause some of the logic in + // this test to fail due to a race condition. If we are at the very end of + // the hour, we can choose a time interval based on one "now" time and then + // run the retention service in the next hour. If we're in one of those + // situations, wait 100 milliseconds until we're in the next hour.
+ if got, want := now.Add(100*time.Millisecond).Truncate(time.Hour), now.Truncate(time.Hour); !got.Equal(want) { + time.Sleep(100 * time.Millisecond) + } + + data := []meta.DatabaseInfo{ + { + Name: "db0", + + DefaultRetentionPolicy: "rp0", + RetentionPolicies: []meta.RetentionPolicyInfo{ + { + Name: "rp0", + ReplicaN: 1, + Duration: time.Hour, + ShardGroupDuration: time.Hour, + ShardGroups: []meta.ShardGroupInfo{ + { + ID: 1, + StartTime: now.Truncate(time.Hour).Add(-2 * time.Hour), + EndTime: now.Truncate(time.Hour).Add(-1 * time.Hour), + Shards: []meta.ShardInfo{ + {ID: 2}, + {ID: 3}, + }, + }, + { + ID: 4, + StartTime: now.Truncate(time.Hour).Add(-1 * time.Hour), + EndTime: now.Truncate(time.Hour), + Shards: []meta.ShardInfo{ + {ID: 5}, + {ID: 6}, + }, + }, + { + ID: 7, + StartTime: now.Truncate(time.Hour), + EndTime: now.Truncate(time.Hour).Add(time.Hour), + Shards: []meta.ShardInfo{ + {ID: 8}, + {ID: 9}, + }, + }, + }, + }, + }, + }, + } + + config := retention.NewConfig() + config.CheckInterval = toml.Duration(10 * time.Millisecond) + s := NewService(config) + s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo { + return data + } + + done := make(chan struct{}) + deletedShardGroups := make(map[string]struct{}) + s.MetaClient.DeleteShardGroupFn = func(database, policy string, id uint64) error { + for _, dbi := range data { + if dbi.Name == database { + for _, rpi := range dbi.RetentionPolicies { + if rpi.Name == policy { + for i, sg := range rpi.ShardGroups { + if sg.ID == id { + rpi.ShardGroups[i].DeletedAt = time.Now().UTC() + } + } + } + } + } + } + + deletedShardGroups[fmt.Sprintf("%s.%s.%d", database, policy, id)] = struct{}{} + if got, want := deletedShardGroups, map[string]struct{}{ + "db0.rp0.1": struct{}{}, + }; reflect.DeepEqual(got, want) { + close(done) + } else if len(got) > 1 { + t.Errorf("deleted too many shard groups") + } + return nil + } + + pruned := false + closing := make(chan struct{}) + s.MetaClient.PruneShardGroupsFn = func() error 
{ + select { + case <-done: + if !pruned { + close(closing) + pruned = true + } + default: + } + return nil + } + + deletedShards := make(map[uint64]struct{}) + s.TSDBStore.ShardIDsFn = func() []uint64 { + return []uint64{2, 3, 5, 6} + } + s.TSDBStore.DeleteShardFn = func(shardID uint64) error { + deletedShards[shardID] = struct{}{} + return nil + } + + if err := s.Open(); err != nil { + t.Fatalf("unexpected open error: %s", err) + } + defer func() { + if err := s.Close(); err != nil { + t.Fatalf("unexpected close error: %s", err) + } + }() + + timer := time.NewTimer(100 * time.Millisecond) + select { + case <-done: + timer.Stop() + case <-timer.C: + t.Errorf("timeout waiting for shard groups to be deleted") + return + } + + timer = time.NewTimer(100 * time.Millisecond) + select { + case <-closing: + timer.Stop() + case <-timer.C: + t.Errorf("timeout waiting for shards to be deleted") + return + } + + if got, want := deletedShards, map[uint64]struct{}{ + 2: struct{}{}, + 3: struct{}{}, + }; !reflect.DeepEqual(got, want) { + t.Errorf("unexpected deleted shards: got=%#v want=%#v", got, want) + } +} + +// This reproduces https://github.com/influxdata/influxdb/issues/8819 +func TestService_8819_repro(t *testing.T) { + for i := 0; i < 1000; i++ { + s, errC, done := testService_8819_repro(t) + + if err := s.Open(); err != nil { + t.Fatal(err) + } + + // Wait for service to run one sweep of all dbs/rps/shards. + if err := <-errC; err != nil { + t.Fatalf("%dth iteration: %v", i, err) + } + // Mark that we do not expect more errors in case it runs one more time. + close(done) + + if err := s.Close(); err != nil { + t.Fatal(err) + } + } +} + +func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{}) { + c := retention.NewConfig() + c.CheckInterval = toml.Duration(time.Millisecond) + s := NewService(c) + errC := make(chan error, 1) // Buffer Important to prevent deadlock. 
+ done := make(chan struct{}) + + // A database and a bunch of shards + var mu sync.Mutex + shards := []uint64{3, 5, 8, 9, 11, 12} + localShards := []uint64{3, 5, 8, 9, 11, 12} + databases := []meta.DatabaseInfo{ + { + Name: "db0", + RetentionPolicies: []meta.RetentionPolicyInfo{ + { + Name: "autogen", + Duration: 24 * time.Hour, + ShardGroupDuration: 24 * time.Hour, + ShardGroups: []meta.ShardGroupInfo{ + { + ID: 1, + StartTime: time.Date(1980, 1, 1, 0, 0, 0, 0, time.UTC), + EndTime: time.Date(1981, 1, 1, 0, 0, 0, 0, time.UTC), + Shards: []meta.ShardInfo{ + {ID: 3}, {ID: 9}, + }, + }, + { + ID: 2, + StartTime: time.Now().Add(-1 * time.Hour), + EndTime: time.Now(), + DeletedAt: time.Now(), + Shards: []meta.ShardInfo{ + {ID: 11}, {ID: 12}, + }, + }, + }, + }, + }, + }, + } + + sendError := func(err error) { + select { + case errC <- err: + case <-done: + } + } + + s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo { + mu.Lock() + defer mu.Unlock() + return databases + } + + s.MetaClient.DeleteShardGroupFn = func(database string, policy string, id uint64) error { + if database != "db0" { + sendError(fmt.Errorf("wrong db name: %s", database)) + return nil + } else if policy != "autogen" { + sendError(fmt.Errorf("wrong rp name: %s", policy)) + return nil + } else if id != 1 { + sendError(fmt.Errorf("wrong shard group id: %d", id)) + return nil + } + + // remove the associated shards (3 and 9) from the shards slice... + mu.Lock() + newShards := make([]uint64, 0, len(shards)) + for _, sid := range shards { + if sid != 3 && sid != 9 { + newShards = append(newShards, sid) + } + } + shards = newShards + databases[0].RetentionPolicies[0].ShardGroups[0].DeletedAt = time.Now().UTC() + mu.Unlock() + return nil + } + + s.MetaClient.PruneShardGroupsFn = func() error { + // When this is called all shards that have been deleted from the meta + // store (expired) should also have been deleted from disk. 
+ // If they haven't then that indicates that shards can be removed from + // the meta store and there can be a race where they haven't yet been + // removed from the local disk and indexes. This has an impact on, for + // example, the max series per database limit. + + mu.Lock() + defer mu.Unlock() + for _, lid := range localShards { + var found bool + for _, mid := range shards { + if lid == mid { + found = true + break + } + } + + if !found { + sendError(fmt.Errorf("local shard %d present, yet it's missing from meta store. %v -- %v ", lid, shards, localShards)) + return nil + } + } + + // We should have removed shards 3 and 9 + if !reflect.DeepEqual(localShards, []uint64{5, 8}) { + sendError(fmt.Errorf("removed shards still present locally: %v", localShards)) + return nil + } + sendError(nil) + return nil + } + + s.TSDBStore.ShardIDsFn = func() []uint64 { + mu.Lock() + defer mu.Unlock() + return localShards + } + + s.TSDBStore.DeleteShardFn = func(id uint64) error { + var found bool + mu.Lock() + newShards := make([]uint64, 0, len(localShards)) + for _, sid := range localShards { + if sid != id { + newShards = append(newShards, sid) + } else { + found = true + } + } + localShards = newShards + mu.Unlock() + + if !found { + return fmt.Errorf("shard %d not found locally", id) + } + return nil + } + + return s, errC, done +} + +type Service struct { + MetaClient *internal.MetaClientMock + TSDBStore *internal.TSDBStoreMock + + LogBuf bytes.Buffer + *retention.Service +} + +func NewService(c retention.Config) *Service { + s := &Service{ + MetaClient: &internal.MetaClientMock{}, + TSDBStore: &internal.TSDBStoreMock{}, + Service: retention.NewService(c), + } + + l := logger.New(&s.LogBuf) + s.WithLogger(l) + + s.Service.MetaClient = s.MetaClient + s.Service.TSDBStore = s.TSDBStore + return s +} From 0123fedbcd5883f47db8b0d14dbbc0dc3e9a763a Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Wed, 9 Sep 2020 06:48:43 -0700 Subject: [PATCH 2/3] chore: Closing in goroutine 
causes race with logging framework --- task/backend/analytical_storage_test.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/task/backend/analytical_storage_test.go b/task/backend/analytical_storage_test.go index 0da4834231a..d8bb789da26 100644 --- a/task/backend/analytical_storage_test.go +++ b/task/backend/analytical_storage_test.go @@ -66,24 +66,22 @@ func TestAnalyticalStore(t *testing.T) { ts.BucketService = storage.NewBucketService(ts.BucketService, ab.storageEngine) - go func() { - <-ctx.Done() - ab.Close(t) - }() - authCtx := icontext.SetAuthorizer(ctx, &influxdb.Authorization{ Permissions: influxdb.OperPermissions(), }) return &servicetest.System{ - TaskControlService: svcStack, - TaskService: svcStack, - OrganizationService: ts.OrganizationService, - UserService: ts.UserService, - UserResourceMappingService: ts.UserResourceMappingService, - AuthorizationService: authSvc, - Ctx: authCtx, - }, cancelFunc + TaskControlService: svcStack, + TaskService: svcStack, + OrganizationService: ts.OrganizationService, + UserService: ts.UserService, + UserResourceMappingService: ts.UserResourceMappingService, + AuthorizationService: authSvc, + Ctx: authCtx, + }, func() { + cancelFunc() + ab.Close(t) + } }, ) } From 027143aeaea5104b8c2fc0cce4d87a2acd5632c0 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Wed, 9 Sep 2020 09:48:47 -0700 Subject: [PATCH 3/3] fix: Improvements in response to PR feedback * Pass context.Context to Service.Open * Remove redundant comments * Bind to retention.Config configuration to be consistent with 1.x --- cmd/influxd/launcher/launcher.go | 2 +- storage/config.go | 20 ++++---------------- storage/engine.go | 4 ++-- v1/services/retention/config.go | 2 -- v1/services/retention/service.go | 26 ++++++++++++++++---------- v1/services/retention/service_test.go | 12 +++++++----- 6 files changed, 30 insertions(+), 36 deletions(-) diff --git a/cmd/influxd/launcher/launcher.go 
b/cmd/influxd/launcher/launcher.go index bac17f70429..e3548ef8e2d 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -436,7 +436,7 @@ func launcherOpts(l *Launcher) []cli.Opt { Desc: "Controls whether we hint to the kernel that we intend to page in mmap'd sections of TSM files.", }, { - DestP: &l.StorageConfig.RetentionInterval, + DestP: &l.StorageConfig.RetentionService.CheckInterval, Flag: "storage-retention-check-interval", Desc: "The interval of time when retention policy enforcement checks run.", }, diff --git a/storage/config.go b/storage/config.go index 35bee8d9e17..ef953a296d8 100644 --- a/storage/config.go +++ b/storage/config.go @@ -1,33 +1,21 @@ package storage import ( - "time" - - "github.com/influxdata/influxdb/v2/toml" "github.com/influxdata/influxdb/v2/tsdb" -) - -// Default configuration values. -const ( - DefaultRetentionInterval = time.Hour - DefaultSeriesFileDirectoryName = "_series" - DefaultIndexDirectoryName = "index" - DefaultWALDirectoryName = "wal" - DefaultEngineDirectoryName = "data" + "github.com/influxdata/influxdb/v2/v1/services/retention" ) // Config holds the configuration for an Engine. type Config struct { Data tsdb.Config - // Frequency of retention in seconds. - RetentionInterval toml.Duration `toml:"retention-interval"` + RetentionService retention.Config } // NewConfig initialises a new config for an Engine. 
func NewConfig() Config { return Config{ - Data: tsdb.NewConfig(), - RetentionInterval: toml.Duration(DefaultRetentionInterval), + Data: tsdb.NewConfig(), + RetentionService: retention.NewConfig(), } } diff --git a/storage/engine.go b/storage/engine.go index 7bc673067c9..8518f48dae9 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -111,7 +111,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine { pw.MetaClient = e.metaClient e.pointsWriter = pw - e.retentionService = retention.NewService(retention.Config{Enabled: true, CheckInterval: c.RetentionInterval}) + e.retentionService = retention.NewService(c.RetentionService) e.retentionService.TSDBStore = e.tsdbStore e.retentionService.MetaClient = e.metaClient @@ -157,7 +157,7 @@ func (e *Engine) Open(ctx context.Context) (err error) { return err } - if err := e.retentionService.Open(); err != nil { + if err := e.retentionService.Open(ctx); err != nil { return err } diff --git a/v1/services/retention/config.go b/v1/services/retention/config.go index 8b78b059758..14d8f7245aa 100644 --- a/v1/services/retention/config.go +++ b/v1/services/retention/config.go @@ -25,8 +25,6 @@ func (c Config) Validate() error { return nil } - // TODO: Should we enforce a minimum interval? - // Polling every nanosecond, for instance, will greatly impact performance. if c.CheckInterval <= 0 { return errors.New("check-interval must be positive") } diff --git a/v1/services/retention/service.go b/v1/services/retention/service.go index 3f7dd052637..a8de45bf8f9 100644 --- a/v1/services/retention/service.go +++ b/v1/services/retention/service.go @@ -25,7 +25,7 @@ type Service struct { config Config wg sync.WaitGroup - done chan struct{} + cancel context.CancelFunc logger *zap.Logger } @@ -39,31 +39,37 @@ func NewService(c Config) *Service { } // Open starts retention policy enforcement. 
-func (s *Service) Open() error { - if !s.config.Enabled || s.done != nil { +func (s *Service) Open(ctx context.Context) error { + if !s.config.Enabled || s.cancel != nil { return nil } s.logger.Info("Starting retention policy enforcement service", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval))) - s.done = make(chan struct{}) + + ctx, s.cancel = context.WithCancel(ctx) s.wg.Add(1) - go func() { defer s.wg.Done(); s.run() }() + go func() { + defer s.wg.Done() + s.run(ctx) + }() return nil } // Close stops retention policy enforcement. func (s *Service) Close() error { - if !s.config.Enabled || s.done == nil { + if !s.config.Enabled || s.cancel == nil { return nil } s.logger.Info("Closing retention policy enforcement service") - close(s.done) + s.cancel() s.wg.Wait() - s.done = nil + + s.cancel = nil + return nil } @@ -72,12 +78,12 @@ func (s *Service) WithLogger(log *zap.Logger) { s.logger = log.With(zap.String("service", "retention")) } -func (s *Service) run() { +func (s *Service) run(ctx context.Context) { ticker := time.NewTicker(time.Duration(s.config.CheckInterval)) defer ticker.Stop() for { select { - case <-s.done: + case <-ctx.Done(): return case <-ticker.C: diff --git a/v1/services/retention/service_test.go b/v1/services/retention/service_test.go index 27b4c9dbe13..7fdd52f08e5 100644 --- a/v1/services/retention/service_test.go +++ b/v1/services/retention/service_test.go @@ -2,6 +2,7 @@ package retention_test import ( "bytes" + "context" "fmt" "reflect" "sync" @@ -21,7 +22,7 @@ func TestService_OpenDisabled(t *testing.T) { c.Enabled = false s := NewService(c) - if err := s.Open(); err != nil { + if err := s.Open(context.Background()); err != nil { t.Fatal(err) } @@ -34,7 +35,8 @@ func TestService_OpenClose(t *testing.T) { // Opening a disabled service should be a no-op. 
s := NewService(retention.NewConfig()) - if err := s.Open(); err != nil { + ctx := context.Background() + if err := s.Open(ctx); err != nil { t.Fatal(err) } @@ -43,7 +45,7 @@ func TestService_OpenClose(t *testing.T) { } // Reopening is a no-op - if err := s.Open(); err != nil { + if err := s.Open(ctx); err != nil { t.Fatal(err) } @@ -171,7 +173,7 @@ func TestService_CheckShards(t *testing.T) { return nil } - if err := s.Open(); err != nil { + if err := s.Open(context.Background()); err != nil { t.Fatalf("unexpected open error: %s", err) } defer func() { @@ -211,7 +213,7 @@ func TestService_8819_repro(t *testing.T) { for i := 0; i < 1000; i++ { s, errC, done := testService_8819_repro(t) - if err := s.Open(); err != nil { + if err := s.Open(context.Background()); err != nil { t.Fatal(err) }