Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tsm1): fix data race when accessing tombstone stats #20773

Merged
merged 3 commits into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
1. [19936](https://github.com/influxdata/influxdb/pull/19936): Fix use-after-free bug in series ID iterator. Thanks @foobar!
1. [20585](https://github.com/influxdata/influxdb/pull/20585): Fix TSM WAL segement size check. Thanks @foobar!
1. [20754](https://github.com/influxdata/influxdb/pull/20754): Update references to docs site to use current URLs.
1. [20773](https://github.com/influxdata/influxdb/pull/20773): Fix data race in TSM engine when inspecting tombstone stats.

## v2.0.4 [2021-02-08]

Expand Down
6 changes: 2 additions & 4 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,6 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo
return intar.StreamFile(fi, shardRelativePath, fullPath, tw)
}

var tombstonePath string
f, err := os.Open(fullPath)
if err != nil {
return err
Expand All @@ -940,9 +939,8 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo
}

// Grab the tombstone file if one exists.
if r.HasTombstones() {
tombstonePath = filepath.Base(r.TombstoneFiles()[0].Path)
return intar.StreamFile(fi, shardRelativePath, tombstonePath, tw)
if ts := r.TombstoneStats(); ts.TombstoneExists {
return intar.StreamFile(fi, shardRelativePath, filepath.Base(ts.Path), tw)
}

min, max := r.TimeRange()
Expand Down
33 changes: 20 additions & 13 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ type TSMFile interface {
// HasTombstones returns true if file contains values that have been deleted.
HasTombstones() bool

// TombstoneFiles returns the tombstone filestats if there are any tombstones
// TombstoneStats returns the tombstone filestats if there are any tombstones
// written for this file.
TombstoneFiles() []FileStat
TombstoneStats() TombstoneStat

// Close closes the underlying file resources.
Close() error
Expand All @@ -121,7 +121,7 @@ type TSMFile interface {
Size() uint32

// Rename renames the existing TSM file to a new name and replaces the mmap backing slice using the new
// file name. Index and Reader state are not re-initialized.
// file name. Index and Reader state are not re-initialized.
Rename(path string) error

// Remove deletes the file from the filesystem.
Expand Down Expand Up @@ -205,6 +205,14 @@ type FileStat struct {
MinKey, MaxKey []byte
}

// TombstoneStat holds information about a possible tombstone file on disk.
type TombstoneStat struct {
TombstoneExists bool
Path string
LastModified int64
Size uint32
}

// OverlapsTimeRange returns true if the time range of the file intersect min and max.
func (f FileStat) OverlapsTimeRange(min, max int64) bool {
return f.MinTime <= max && f.MaxTime >= min
Expand Down Expand Up @@ -579,7 +587,7 @@ func (f *FileStore) Open() error {

// Accumulate file store size stats
atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size()))
for _, ts := range res.r.TombstoneFiles() {
if ts := res.r.TombstoneStats(); ts.TombstoneExists {
atomic.AddInt64(&f.stats.DiskBytes, int64(ts.Size))
}

Expand Down Expand Up @@ -812,8 +820,8 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
return err
}

for _, t := range file.TombstoneFiles() {
if err := f.obs.FileUnlinking(t.Path); err != nil {
if ts := file.TombstoneStats(); ts.TombstoneExists {
if err := f.obs.FileUnlinking(ts.Path); err != nil {
return err
}
}
Expand All @@ -831,8 +839,8 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
if file.InUse() {
// Copy all the tombstones related to this TSM file
var deletes []string
for _, t := range file.TombstoneFiles() {
deletes = append(deletes, t.Path)
if ts := file.TombstoneStats(); ts.TombstoneExists {
deletes = append(deletes, ts.Path)
}

// Rename the TSM file used by this reader
Expand Down Expand Up @@ -894,10 +902,9 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
var totalSize int64
for _, file := range f.files {
totalSize += int64(file.Size())
for _, ts := range file.TombstoneFiles() {
if ts := file.TombstoneStats(); ts.TombstoneExists {
totalSize += int64(ts.Size)
}

}
atomic.StoreInt64(&f.stats.DiskBytes, totalSize)

Expand Down Expand Up @@ -1084,9 +1091,9 @@ func (f *FileStore) CreateSnapshot() (string, error) {
if err := os.Link(tsmf.Path(), newpath); err != nil {
return "", fmt.Errorf("error creating tsm hard link: %q", err)
}
for _, tf := range tsmf.TombstoneFiles() {
newpath := filepath.Join(tmpPath, filepath.Base(tf.Path))
if err := os.Link(tf.Path, newpath); err != nil {
if ts := tsmf.TombstoneStats(); ts.TombstoneExists {
newpath := filepath.Join(tmpPath, filepath.Base(ts.Path))
if err := os.Link(ts.Path, newpath); err != nil {
return "", fmt.Errorf("error creating tombstone hard link: %q", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/file_store_key_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (*mockTSMFile) BatchDelete() BatchDeleter { panic("im
func (*mockTSMFile) Delete(keys [][]byte) error { panic("implement me") }
func (*mockTSMFile) DeleteRange(keys [][]byte, min, max int64) error { panic("implement me") }
func (*mockTSMFile) HasTombstones() bool { panic("implement me") }
func (*mockTSMFile) TombstoneFiles() []FileStat { panic("implement me") }
func (*mockTSMFile) TombstoneStats() TombstoneStat { panic("implement me") }
func (*mockTSMFile) Close() error { panic("implement me") }
func (*mockTSMFile) Size() uint32 { panic("implement me") }
func (*mockTSMFile) Rename(path string) error { panic("implement me") }
Expand Down
4 changes: 2 additions & 2 deletions tsdb/engine/tsm1/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2739,8 +2739,8 @@ func TestFileStore_CreateSnapshot(t *testing.T) {
if _, err := os.Stat(p); os.IsNotExist(err) {
t.Fatalf("unable to find file %q", p)
}
for _, tf := range f.TombstoneFiles() {
p := filepath.Join(s, filepath.Base(tf.Path))
if ts := f.TombstoneStats(); ts.TombstoneExists {
p := filepath.Join(s, filepath.Base(ts.Path))
t.Logf("checking for existence of hard link %q", p)
if _, err := os.Stat(p); os.IsNotExist(err) {
t.Fatalf("unable to find file %q", p)
Expand Down
6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (t *TSMReader) Size() uint32 {
func (t *TSMReader) LastModified() int64 {
t.mu.RLock()
lm := t.lastModified
for _, ts := range t.tombstoner.TombstoneFiles() {
if ts := t.tombstoner.TombstoneStats(); ts.TombstoneExists {
if ts.LastModified > lm {
lm = ts.LastModified
}
Expand All @@ -542,9 +542,9 @@ func (t *TSMReader) HasTombstones() bool {
}

// TombstoneFiles returns any tombstone files associated with this TSM file.
func (t *TSMReader) TombstoneFiles() []FileStat {
func (t *TSMReader) TombstoneStats() TombstoneStat {
t.mu.RLock()
fs := t.tombstoner.TombstoneFiles()
fs := t.tombstoner.TombstoneStats()
t.mu.RUnlock()
return fs
}
Expand Down
15 changes: 5 additions & 10 deletions tsdb/engine/tsm1/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"path/filepath"
"sort"
"testing"

"github.com/stretchr/testify/require"
)

func fatal(t *testing.T, msg string, err error) {
Expand Down Expand Up @@ -465,9 +467,7 @@ func TestTSMReader_MMAP_TombstoneOutsideTimeRange(t *testing.T) {
t.Fatalf("HasTombstones mismatch: got %v, exp %v", got, exp)
}

if got, exp := len(r.TombstoneFiles()), 0; got != exp {
t.Fatalf("TombstoneFiles len mismatch: got %v, exp %v", got, exp)
}
require.False(t, r.TombstoneStats().TombstoneExists)
}

func TestTSMReader_MMAP_TombstoneOutsideKeyRange(t *testing.T) {
Expand Down Expand Up @@ -529,10 +529,7 @@ func TestTSMReader_MMAP_TombstoneOutsideKeyRange(t *testing.T) {
t.Fatalf("HasTombstones mismatch: got %v, exp %v", got, exp)
}

if got, exp := len(r.TombstoneFiles()), 0; got != exp {
t.Fatalf("TombstoneFiles len mismatch: got %v, exp %v", got, exp)

}
require.False(t, r.TombstoneStats().TombstoneExists)
}

func TestTSMReader_MMAP_TombstoneOverlapKeyRange(t *testing.T) {
Expand Down Expand Up @@ -598,9 +595,7 @@ func TestTSMReader_MMAP_TombstoneOverlapKeyRange(t *testing.T) {
t.Fatalf("HasTombstones mismatch: got %v, exp %v", got, exp)
}

if got, exp := len(r.TombstoneFiles()), 1; got != exp {
t.Fatalf("TombstoneFiles len mismatch: got %v, exp %v", got, exp)
}
require.True(t, r.TombstoneStats().TombstoneExists)
}

func TestTSMReader_MMAP_TombstoneFullRange(t *testing.T) {
Expand Down
37 changes: 22 additions & 15 deletions tsdb/engine/tsm1/tombstone.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,14 @@ type Tombstoner struct {

FilterFn func(k []byte) bool

// Tombstones that have been written but not flushed to disk yet.
tombstones []Tombstone
// cache of the stats for this tombstone
fileStats []FileStat
tombstoneStats TombstoneStat
// indicates that the stats may be out of sync with what is on disk and they
// should be refreshed.
statsLoaded bool

// Tombstones that have been written but not flushed to disk yet.
tombstones []Tombstone

// These are references used for pending writes that have not been committed. If
// these are nil, then no pending writes are in progress.
gz *gzip.Writer
Expand Down Expand Up @@ -183,43 +182,51 @@ func (t *Tombstoner) Delete() error {

// HasTombstones return true if there are any tombstone entries recorded.
func (t *Tombstoner) HasTombstones() bool {
files := t.TombstoneFiles()
stats := t.TombstoneStats()
if !stats.TombstoneExists {
return false
}
if stats.Size > 0 {
return true
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's worth the verbosity to avoid the extra lock below

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's good as long as a non-zero size always means there are tombstones.

Copy link
Contributor Author

@danxmoran danxmoran Feb 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the same end behavior as before, at least


t.mu.RLock()
n := len(t.tombstones)
t.mu.RUnlock()

return len(files) > 0 && files[0].Size > 0 || n > 0
return n > 0
}

// TombstoneFiles returns any tombstone files associated with Tombstoner's TSM file.
func (t *Tombstoner) TombstoneFiles() []FileStat {
func (t *Tombstoner) TombstoneStats() TombstoneStat {
t.mu.RLock()
if t.statsLoaded {
stats := t.fileStats
stats := t.tombstoneStats
t.mu.RUnlock()
return stats
}
t.mu.RUnlock()

stat, err := os.Stat(t.tombstonePath())
if os.IsNotExist(err) || err != nil {
if err != nil {
t.mu.Lock()
// The file doesn't exist so record that we tried to load it so
// we don't continue to keep trying. This is the common case.
t.statsLoaded = os.IsNotExist(err)
t.fileStats = t.fileStats[:0]
t.tombstoneStats.TombstoneExists = false
stats := t.tombstoneStats
t.mu.Unlock()
return nil
return stats
}

t.mu.Lock()
t.fileStats = append(t.fileStats[:0], FileStat{
t.tombstoneStats = TombstoneStat{
TombstoneExists: true,
Path: t.tombstonePath(),
LastModified: stat.ModTime().UnixNano(),
Size: uint32(stat.Size()),
})
t.statsLoaded = true
stats := t.fileStats
}
stats := t.tombstoneStats
t.mu.Unlock()

return stats
Expand Down
Loading