Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Series creation improvements #9380

Merged
merged 4 commits into from
Jan 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions pkg/mmap/mmap_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package mmap
import (
"os"
"syscall"
"unsafe"
)

// Map memory-maps a file.
Expand Down Expand Up @@ -38,11 +37,6 @@ func Map(path string, sz int64) ([]byte, error) {
return nil, err
}

if _, _, err := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&data[0])), uintptr(len(data)), uintptr(syscall.MADV_RANDOM)); err != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We mmap in (I think) five places in the database:

  1. We mmap TSI index files (.tsi).
  2. We mmap TSI log files (.tsl).
  3. We mmap the Series File hashmap index (_series/*/index).
  4. We mmap the Series File segment files (_series/*/0000).
  5. We mmap the TSM index (.tsm).

As I understand it, SYS_MADVISE is there to inform the kernel about how the memory within the mmap'd data will be accessed. The current setting is telling the kernel that we will need to randomly access pages, which are typically going to be 4KB of the mapped data.

For the above five examples I think that the way we access pages would probably be different.

  1. We probably sequentially scan a lot when pulling series ids off, but then we probably jump around some parts because we have things like hash indexes and sketches and so on mapped in.
  2. Unless I'm mistaken we're only going to sequentially read this file, to process the log entries.
  3. I would expect this file to be basically randomly accessed. While there would be might be some sequential access, e.g., when probing, wouldn't it likely all be within a single page or at most a few?
  4. These files are probably mainly randomly accessed via the offsets of the series id in the segments.
  5. I'm not familiar enough with the tsm index to know its access patterns.

To me, it seems like we should be passing in the advice values to Map depending on the expected use-case. It looks like we might have a mixture of MADV_NORMAL, MADV_SEQUENTIAL and MADV_RANDOM.

@jwilder I guess when you switched from MADV_RANDOM to MADV_NORMAL then the kernel was doing some read ahead that was reducing the number of IO read operations. Though, of course, if we used MADV_SEQUENTIAL then more aggressive read ahead would be done and read IOPS might be reduced even further.

I think we could experiment with different advise parameters for the different subsystems that use mmap and set the advise value accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've had to remove any place where we're using MADV_RANDOM in the past due to adverse performance problems. See: #8872 and #5221. Other projects (boltdb/bolt#691) have seen similar issues.

I only add madavise calls in specific spots after testing them quite extensively and can see that they improve what I'm measuring. Despite what the docs say they do, it's kind of a black box as to what kernel actually does.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jwilder I guess what I'm saying is: we should do some explicit experimentation in the future on all the sub systems and see if any advice other than NORMAL is appropriate anywhere. If we're doing really sequential stuff then MADV_SEQUENTIAL might be even better for read IOPS than MADV_NORMAL for example.

Unmap(data)
return nil, err
}

return data, nil
}

Expand Down
41 changes: 26 additions & 15 deletions pkg/rhh/rhh.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type HashMap struct {
threshold int64
mask int64
loadFactor int

tmpKey []byte
}

func NewHashMap(opt Options) *HashMap {
Expand Down Expand Up @@ -65,16 +67,19 @@ func (m *HashMap) insert(hash int64, key []byte, val interface{}) (overwritten b
pos := hash & m.mask
var dist int64

var copied bool
searchKey := key

// Continue searching until we find an empty slot or lower probe distance.
for {
e := &m.elems[pos]

// Empty slot found or matching key, insert and exit.
match := bytes.Equal(m.elems[pos].key, key)
match := bytes.Equal(m.elems[pos].key, searchKey)
if m.hashes[pos] == 0 || match {
m.hashes[pos] = hash
e.hash, e.value = hash, val
e.setKey(key)
e.setKey(searchKey)
return match
}

Expand All @@ -86,11 +91,16 @@ func (m *HashMap) insert(hash int64, key []byte, val interface{}) (overwritten b
hash, m.hashes[pos] = m.hashes[pos], hash
val, e.value = e.value, val

tmp := make([]byte, len(e.key))
copy(tmp, e.key)
m.tmpKey = assign(m.tmpKey, e.key)
e.setKey(searchKey)

e.setKey(key)
key = tmp
if !copied {
searchKey = make([]byte, len(key))
copy(searchKey, key)
copied = true
}

searchKey = assign(searchKey, m.tmpKey)

// Update current distance.
dist = elemDist
Expand Down Expand Up @@ -208,15 +218,7 @@ func (e *hashElem) reset() {

// setKey copies v to a key on e.
func (e *hashElem) setKey(v []byte) {
// Shrink or grow key to fit value.
if len(e.key) > len(v) {
e.key = e.key[:len(v)]
} else if len(e.key) < len(v) {
e.key = append(e.key, make([]byte, len(v)-len(e.key))...)
}

// Copy value to key.
copy(e.key, v)
e.key = assign(e.key, v)
}

// Options represents initialization options that are passed to NewHashMap().
Expand Down Expand Up @@ -268,6 +270,15 @@ func pow2(v int64) int64 {
panic("unreachable")
}

func assign(x, v []byte) []byte {
if cap(x) < len(v) {
x = make([]byte, len(v))
}
x = x[:len(v)]
copy(x, v)
return x
}

type byteSlices [][]byte

func (a byteSlices) Len() int { return len(a) }
Expand Down
4 changes: 4 additions & 0 deletions tsdb/engine/tsm1/mmap_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func munmap(b []byte) (err error) {
return unix.Munmap(b)
}

func madviseWillNeed(b []byte) error {
return madvise(b, syscall.MADV_WILLNEED)
}

func madviseDontNeed(b []byte) error {
return madvise(b, syscall.MADV_DONTNEED)
}
Expand Down
4 changes: 4 additions & 0 deletions tsdb/engine/tsm1/mmap_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func munmap(b []byte) (err error) {
return nil
}

func madviseWillNeed(b []byte) error {
return nil
}

func madviseDontNeed(b []byte) error {
// Not supported
return nil
Expand Down
4 changes: 4 additions & 0 deletions tsdb/engine/tsm1/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,10 @@ func (m *mmapAccessor) init() (*indirectIndex, error) {
return nil, fmt.Errorf("mmapAccessor: invalid indexStart")
}

// Hint to the kernal that we will be reading the file. It would be better to hint
// that we will be reading the index section, but that doesn't seem to work ATM.
_ = madviseWillNeed(m.b)

m.index = NewIndirectIndex()
if err := m.index.UnmarshalBinary(m.b[indexStart:indexOfsPos]); err != nil {
return nil, err
Expand Down
33 changes: 24 additions & 9 deletions tsdb/index/tsi1/log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ const (

// LogFile represents an on-disk write-ahead log file.
type LogFile struct {
mu sync.RWMutex
wg sync.WaitGroup // ref count
id int // file sequence identifier
data []byte // mmap
file *os.File // writer
w *bufio.Writer // buffered writer
buf []byte // marshaling buffer
mu sync.RWMutex
wg sync.WaitGroup // ref count
id int // file sequence identifier
data []byte // mmap
file *os.File // writer
w *bufio.Writer // buffered writer
buf []byte // marshaling buffer
keyBuf []byte

sfile *tsdb.SeriesFile // series lookup
size int64 // tracks current file size
Expand Down Expand Up @@ -478,7 +479,7 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tag
continue
}
writeRequired = true
entries = append(entries, LogEntry{SeriesID: seriesIDs[i]})
entries = append(entries, LogEntry{SeriesID: seriesIDs[i], name: names[i], tags: tagsSlice[i], cached: true})
}
seriesSet.RUnlock()

Expand Down Expand Up @@ -607,7 +608,17 @@ func (f *LogFile) execDeleteTagValueEntry(e *LogEntry) {
}

func (f *LogFile) execSeriesEntry(e *LogEntry) {
seriesKey := f.sfile.SeriesKey(e.SeriesID)
var seriesKey []byte
if e.cached {
sz := tsdb.SeriesKeySize(e.name, e.tags)
if len(f.keyBuf) < sz {
f.keyBuf = make([]byte, 0, sz)
}
seriesKey = tsdb.AppendSeriesKey(f.keyBuf[:0], e.name, e.tags)
} else {
seriesKey = f.sfile.SeriesKey(e.SeriesID)
}

assert(seriesKey != nil, fmt.Sprintf("series key for ID: %d not found", e.SeriesID))

// Check if deleted.
Expand Down Expand Up @@ -966,6 +977,10 @@ type LogEntry struct {
Value []byte // tag value
Checksum uint32 // checksum of flag/name/tags.
Size int // total size of record, in bytes.

cached bool // Hint to LogFile that series data is already parsed
name []byte // series naem, this is a cached copy of the parsed measurement name
tags models.Tags // series tags, this is a cached copied of the parsed tags
}

// UnmarshalBinary unmarshals data into e.
Expand Down