From 743aef4a985b09176b6f317be3bf781401bffd8f Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Mon, 18 Jan 2021 19:02:26 -0800 Subject: [PATCH] fix(tsdb): allow backups during snapshotting, and don't leak tmp files (#20527) Co-authored-by: davidby-influx --- CHANGELOG.md | 2 + tsdb/engine.go | 2 +- tsdb/engine/tsm1/engine.go | 39 +++++---- tsdb/engine/tsm1/engine_internal_test.go | 102 +++++++++++++++++++++++ tsdb/shard.go | 4 +- tsdb/shard_test.go | 4 +- tsdb/store.go | 4 +- tsdb/store_test.go | 2 +- 8 files changed, 131 insertions(+), 28 deletions(-) create mode 100644 tsdb/engine/tsm1/engine_internal_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 666cea1c593..77383311742 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,8 @@ Replacement `tsi1` indexes will be automatically generated on startup for shards 1. [20489](https://github.com/influxdata/influxdb/pull/20489): Improve error message when opening BoltDB with unsupported file system options. 1. [20490](https://github.com/influxdata/influxdb/pull/20490): Fix silent failure to register CLI args as required. 1. [20522](https://github.com/influxdata/influxdb/pull/20522): Fix loading config when INFLUXD_CONFIG_PATH points to a `.yml` file. +1. [20527](https://github.com/influxdata/influxdb/pull/20527): Don't leak .tmp files while backing up shards. +1. [20527](https://github.com/influxdata/influxdb/pull/20527): Allow backups to complete while a snapshot is in progress. ## v2.0.3 [2020-12-14] diff --git a/tsdb/engine.go b/tsdb/engine.go index 12923d75543..6471a0d1d1c 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -41,7 +41,7 @@ type Engine interface { LoadMetadataIndex(shardID uint64, index Index) error - CreateSnapshot() (string, error) + CreateSnapshot(skipCacheOk bool) (string, error) Backup(w io.Writer, basePath string, since time.Time) error Export(w io.Writer, basePath string, start time.Time, end time.Time) error Restore(r io.Reader, basePath string) error diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 5e9e12dec4f..997a8298e94 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -908,25 +908,14 @@ func (e *Engine) Free() error { // of the files in the archive. It will force a snapshot of the WAL first // then perform the backup with a read lock against the file store. This means // that new TSM files will not be able to be created in this shard while the -// backup is running. For shards that are still acively getting writes, this -// could cause the WAL to backup, increasing memory usage and evenutally rejecting writes. +// backup is running. For shards that are still actively getting writes, this +// could cause the WAL to backup, increasing memory usage and eventually rejecting writes. func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error { var err error var path string - for i := 0; i < 3; i++ { - path, err = e.CreateSnapshot() - if err != nil { - switch err { - case ErrSnapshotInProgress: - backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond - time.Sleep(backoff) - default: - return err - } - } - } - if err == ErrSnapshotInProgress { - e.logger.Warn("Snapshotter busy: Backup proceeding without snapshot contents.") + path, err = e.CreateSnapshot(true) + if err != nil { + return err } // Remove the temporary snapshot dir defer os.RemoveAll(path) @@ -990,7 +979,7 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo } func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.Time) error { - path, err := e.CreateSnapshot() + path, err := e.CreateSnapshot(false) if err != nil { return err } @@ -1873,9 +1862,19 @@ func (e *Engine) WriteSnapshot() (err error) { } // CreateSnapshot will create a temp directory that holds -// temporary hardlinks to the underylyng shard files. -func (e *Engine) CreateSnapshot() (string, error) { - if err := e.WriteSnapshot(); err != nil { +// temporary hardlinks to the underlying shard files. +// skipCacheOk controls whether it is permissible to fail writing out +// in-memory cache data when a previous snapshot is in progress. +func (e *Engine) CreateSnapshot(skipCacheOk bool) (string, error) { + err := e.WriteSnapshot() + for i := 0; i < 3 && err == ErrSnapshotInProgress; i += 1 { + backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond + time.Sleep(backoff) + err = e.WriteSnapshot() + } + if err == ErrSnapshotInProgress && skipCacheOk { + e.logger.Warn("Snapshotter busy: proceeding without cache contents") + } else if err != nil { return "", err } diff --git a/tsdb/engine/tsm1/engine_internal_test.go b/tsdb/engine/tsm1/engine_internal_test.go new file mode 100644 index 00000000000..2ba840b529e --- /dev/null +++ b/tsdb/engine/tsm1/engine_internal_test.go @@ -0,0 +1,102 @@ +package tsm1 + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/influxdata/influxdb/v2/logger" + "github.com/influxdata/influxdb/v2/models" + "github.com/influxdata/influxdb/v2/tsdb" + "github.com/stretchr/testify/require" +) + +func TestEngine_ConcurrentShardSnapshots(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "shard_test") + require.NoError(t, err, "error creating temporary directory") + defer os.RemoveAll(tmpDir) + + tmpShard := filepath.Join(tmpDir, "shard") + tmpWal := filepath.Join(tmpDir, "wal") + + sfile := NewSeriesFile(tmpDir) + defer sfile.Close() + + opts := tsdb.NewEngineOptions() + opts.Config.WALDir = filepath.Join(tmpDir, "wal") + opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{}) + + sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile, opts) + require.NoError(t, sh.Open(), "error opening shard") + defer sh.Close() + + points := make([]models.Point, 0, 10000) + for i := 0; i < cap(points); i++ { + points = append(points, models.MustNewPoint( + "cpu", + models.NewTags(map[string]string{"host": "server"}), + map[string]interface{}{"value": 1.0}, + time.Unix(int64(i), 0), + )) + } + err = sh.WritePoints(points) + require.NoError(t, err) + + engineInterface, err := sh.Engine() + require.NoError(t, err, "error retrieving shard engine") + + // Get the struct underlying the interface. Not a recommended practice. + realEngineStruct, ok := (engineInterface).(*Engine) + if !ok { + t.Log("Engine type does not permit simulating Cache race conditions") + return + } + // fake a race condition in snapshotting the cache. + realEngineStruct.Cache.snapshotting = true + defer func() { + realEngineStruct.Cache.snapshotting = false + }() + + snapshotFunc := func(skipCacheOk bool) { + if f, err := sh.CreateSnapshot(skipCacheOk); err == nil { + require.NoError(t, os.RemoveAll(f), "error cleaning up TestEngine_ConcurrentShardSnapshots") + } else if err == ErrSnapshotInProgress { + if skipCacheOk { + t.Fatalf("failing to ignore this error,: %s", err.Error()) + } + } else { + t.Fatalf("error creating shard snapshot: %s", err.Error()) + } + } + + // Permit skipping cache in the snapshot + snapshotFunc(true) + // do not permit skipping the cache in the snapshot + snapshotFunc(false) + realEngineStruct.Cache.snapshotting = false +} + +// NewSeriesFile returns a new instance of SeriesFile with a temporary file path. +func NewSeriesFile(tmpDir string) *tsdb.SeriesFile { + dir, err := ioutil.TempDir(tmpDir, "tsdb-series-file-") + if err != nil { + panic(err) + } + f := tsdb.NewSeriesFile(dir) + f.Logger = logger.New(os.Stdout) + if err := f.Open(); err != nil { + panic(err) + } + return f +} + +type seriesIDSets []*tsdb.SeriesIDSet + +func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error { + for _, v := range a { + f(v) + } + return nil +} diff --git a/tsdb/shard.go b/tsdb/shard.go index 984ca9cedc1..13fa285770d 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -1133,12 +1133,12 @@ func (s *Shard) Import(r io.Reader, basePath string) error { // CreateSnapshot will return a path to a temp directory // containing hard links to the underlying shard files. -func (s *Shard) CreateSnapshot() (string, error) { +func (s *Shard) CreateSnapshot(skipCacheOk bool) (string, error) { engine, err := s.Engine() if err != nil { return "", err } - return engine.CreateSnapshot() + return engine.CreateSnapshot(skipCacheOk) } // ForEachMeasurementName iterates over each measurement in the shard. diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index bc68b63b7f0..baee72eb35a 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -415,7 +415,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) { } _ = sh.WritePoints(points[:500]) - if f, err := sh.CreateSnapshot(); err == nil { + if f, err := sh.CreateSnapshot(false); err == nil { os.RemoveAll(f) } @@ -431,7 +431,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) { } _ = sh.WritePoints(points[500:]) - if f, err := sh.CreateSnapshot(); err == nil { + if f, err := sh.CreateSnapshot(false); err == nil { os.RemoveAll(f) } } diff --git a/tsdb/store.go b/tsdb/store.go index 296176b4165..1538867a534 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -629,13 +629,13 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en // CreateShardSnapShot will create a hard link to the underlying shard and return a path. // The caller is responsible for cleaning up (removing) the file path returned. -func (s *Store) CreateShardSnapshot(id uint64) (string, error) { +func (s *Store) CreateShardSnapshot(id uint64, skipCacheOk bool) (string, error) { sh := s.Shard(id) if sh == nil { return "", ErrShardNotFound } - return sh.CreateSnapshot() + return sh.CreateSnapshot(skipCacheOk) } // SetShardEnabled enables or disables a shard for read and writes. diff --git a/tsdb/store_test.go b/tsdb/store_test.go index ac32b50d017..2ad9c5934d7 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -449,7 +449,7 @@ func TestStore_CreateShardSnapShot(t *testing.T) { t.Fatalf("expected shard") } - dir, e := s.CreateShardSnapshot(1) + dir, e := s.CreateShardSnapshot(1, false) if e != nil { t.Fatal(e) }