Skip to content

Commit 027143a

Browse files
committed
fix: Improvements in response to PR feedback
* Pass context.Context to Service.Open * Remove redundant comments * Bind to retention.Config configuration to be consistent with 1.x
1 parent 0123fed commit 027143a

File tree

6 files changed

+30
-36
lines changed

6 files changed

+30
-36
lines changed

cmd/influxd/launcher/launcher.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ func launcherOpts(l *Launcher) []cli.Opt {
436436
Desc: "Controls whether we hint to the kernel that we intend to page in mmap'd sections of TSM files.",
437437
},
438438
{
439-
DestP: &l.StorageConfig.RetentionInterval,
439+
DestP: &l.StorageConfig.RetentionService.CheckInterval,
440440
Flag: "storage-retention-check-interval",
441441
Desc: "The interval of time when retention policy enforcement checks run.",
442442
},

storage/config.go

+4-16
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,21 @@
11
package storage
22

33
import (
4-
"time"
5-
6-
"github.com/influxdata/influxdb/v2/toml"
74
"github.com/influxdata/influxdb/v2/tsdb"
8-
)
9-
10-
// Default configuration values.
11-
const (
12-
DefaultRetentionInterval = time.Hour
13-
DefaultSeriesFileDirectoryName = "_series"
14-
DefaultIndexDirectoryName = "index"
15-
DefaultWALDirectoryName = "wal"
16-
DefaultEngineDirectoryName = "data"
5+
"github.com/influxdata/influxdb/v2/v1/services/retention"
176
)
187

198
// Config holds the configuration for an Engine.
209
type Config struct {
2110
Data tsdb.Config
2211

23-
// Frequency of retention in seconds.
24-
RetentionInterval toml.Duration `toml:"retention-interval"`
12+
RetentionService retention.Config
2513
}
2614

2715
// NewConfig initialises a new config for an Engine.
2816
func NewConfig() Config {
2917
return Config{
30-
Data: tsdb.NewConfig(),
31-
RetentionInterval: toml.Duration(DefaultRetentionInterval),
18+
Data: tsdb.NewConfig(),
19+
RetentionService: retention.NewConfig(),
3220
}
3321
}

storage/engine.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
111111
pw.MetaClient = e.metaClient
112112
e.pointsWriter = pw
113113

114-
e.retentionService = retention.NewService(retention.Config{Enabled: true, CheckInterval: c.RetentionInterval})
114+
e.retentionService = retention.NewService(c.RetentionService)
115115
e.retentionService.TSDBStore = e.tsdbStore
116116
e.retentionService.MetaClient = e.metaClient
117117

@@ -157,7 +157,7 @@ func (e *Engine) Open(ctx context.Context) (err error) {
157157
return err
158158
}
159159

160-
if err := e.retentionService.Open(); err != nil {
160+
if err := e.retentionService.Open(ctx); err != nil {
161161
return err
162162
}
163163

v1/services/retention/config.go

-2
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ func (c Config) Validate() error {
2525
return nil
2626
}
2727

28-
// TODO: Should we enforce a minimum interval?
29-
// Polling every nanosecond, for instance, will greatly impact performance.
3028
if c.CheckInterval <= 0 {
3129
return errors.New("check-interval must be positive")
3230
}

v1/services/retention/service.go

+16-10
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type Service struct {
2525

2626
config Config
2727
wg sync.WaitGroup
28-
done chan struct{}
28+
cancel context.CancelFunc
2929

3030
logger *zap.Logger
3131
}
@@ -39,31 +39,37 @@ func NewService(c Config) *Service {
3939
}
4040

4141
// Open starts retention policy enforcement.
42-
func (s *Service) Open() error {
43-
if !s.config.Enabled || s.done != nil {
42+
func (s *Service) Open(ctx context.Context) error {
43+
if !s.config.Enabled || s.cancel != nil {
4444
return nil
4545
}
4646

4747
s.logger.Info("Starting retention policy enforcement service",
4848
logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval)))
49-
s.done = make(chan struct{})
49+
50+
ctx, s.cancel = context.WithCancel(ctx)
5051

5152
s.wg.Add(1)
52-
go func() { defer s.wg.Done(); s.run() }()
53+
go func() {
54+
defer s.wg.Done()
55+
s.run(ctx)
56+
}()
5357
return nil
5458
}
5559

5660
// Close stops retention policy enforcement.
5761
func (s *Service) Close() error {
58-
if !s.config.Enabled || s.done == nil {
62+
if !s.config.Enabled || s.cancel == nil {
5963
return nil
6064
}
6165

6266
s.logger.Info("Closing retention policy enforcement service")
63-
close(s.done)
67+
s.cancel()
6468

6569
s.wg.Wait()
66-
s.done = nil
70+
71+
s.cancel = nil
72+
6773
return nil
6874
}
6975

@@ -72,12 +78,12 @@ func (s *Service) WithLogger(log *zap.Logger) {
7278
s.logger = log.With(zap.String("service", "retention"))
7379
}
7480

75-
func (s *Service) run() {
81+
func (s *Service) run(ctx context.Context) {
7682
ticker := time.NewTicker(time.Duration(s.config.CheckInterval))
7783
defer ticker.Stop()
7884
for {
7985
select {
80-
case <-s.done:
86+
case <-ctx.Done():
8187
return
8288

8389
case <-ticker.C:

v1/services/retention/service_test.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package retention_test
22

33
import (
44
"bytes"
5+
"context"
56
"fmt"
67
"reflect"
78
"sync"
@@ -21,7 +22,7 @@ func TestService_OpenDisabled(t *testing.T) {
2122
c.Enabled = false
2223
s := NewService(c)
2324

24-
if err := s.Open(); err != nil {
25+
if err := s.Open(context.Background()); err != nil {
2526
t.Fatal(err)
2627
}
2728

@@ -34,7 +35,8 @@ func TestService_OpenClose(t *testing.T) {
3435
// Opening a disabled service should be a no-op.
3536
s := NewService(retention.NewConfig())
3637

37-
if err := s.Open(); err != nil {
38+
ctx := context.Background()
39+
if err := s.Open(ctx); err != nil {
3840
t.Fatal(err)
3941
}
4042

@@ -43,7 +45,7 @@ func TestService_OpenClose(t *testing.T) {
4345
}
4446

4547
// Reopening is a no-op
46-
if err := s.Open(); err != nil {
48+
if err := s.Open(ctx); err != nil {
4749
t.Fatal(err)
4850
}
4951

@@ -171,7 +173,7 @@ func TestService_CheckShards(t *testing.T) {
171173
return nil
172174
}
173175

174-
if err := s.Open(); err != nil {
176+
if err := s.Open(context.Background()); err != nil {
175177
t.Fatalf("unexpected open error: %s", err)
176178
}
177179
defer func() {
@@ -211,7 +213,7 @@ func TestService_8819_repro(t *testing.T) {
211213
for i := 0; i < 1000; i++ {
212214
s, errC, done := testService_8819_repro(t)
213215

214-
if err := s.Open(); err != nil {
216+
if err := s.Open(context.Background()); err != nil {
215217
t.Fatal(err)
216218
}
217219

0 commit comments

Comments
 (0)