diff --git a/pkg/mmap/mmap_unix.go b/pkg/mmap/mmap_unix.go index c9faafb8d4c..13629c1a4e5 100644 --- a/pkg/mmap/mmap_unix.go +++ b/pkg/mmap/mmap_unix.go @@ -10,7 +10,6 @@ package mmap import ( "os" "syscall" - "unsafe" ) // Map memory-maps a file. @@ -38,11 +37,6 @@ func Map(path string, sz int64) ([]byte, error) { return nil, err } - if _, _, err := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&data[0])), uintptr(len(data)), uintptr(syscall.MADV_RANDOM)); err != 0 { - Unmap(data) - return nil, err - } - return data, nil } diff --git a/pkg/rhh/rhh.go b/pkg/rhh/rhh.go index 05307159008..bb8db4be7cb 100644 --- a/pkg/rhh/rhh.go +++ b/pkg/rhh/rhh.go @@ -19,6 +19,8 @@ type HashMap struct { threshold int64 mask int64 loadFactor int + + tmpKey []byte } func NewHashMap(opt Options) *HashMap { @@ -65,16 +67,19 @@ func (m *HashMap) insert(hash int64, key []byte, val interface{}) (overwritten b pos := hash & m.mask var dist int64 + var copied bool + searchKey := key + // Continue searching until we find an empty slot or lower probe distance. for { e := &m.elems[pos] // Empty slot found or matching key, insert and exit. - match := bytes.Equal(m.elems[pos].key, key) + match := bytes.Equal(m.elems[pos].key, searchKey) if m.hashes[pos] == 0 || match { m.hashes[pos] = hash e.hash, e.value = hash, val - e.setKey(key) + e.setKey(searchKey) return match } @@ -86,11 +91,16 @@ func (m *HashMap) insert(hash int64, key []byte, val interface{}) (overwritten b hash, m.hashes[pos] = m.hashes[pos], hash val, e.value = e.value, val - tmp := make([]byte, len(e.key)) - copy(tmp, e.key) + m.tmpKey = assign(m.tmpKey, e.key) + e.setKey(searchKey) - e.setKey(key) - key = tmp + if !copied { + searchKey = make([]byte, len(key)) + copy(searchKey, key) + copied = true + } + + searchKey = assign(searchKey, m.tmpKey) // Update current distance. dist = elemDist @@ -208,15 +218,7 @@ func (e *hashElem) reset() { // setKey copies v to a key on e. func (e *hashElem) setKey(v []byte) { - // Shrink or grow key to fit value. - if len(e.key) > len(v) { - e.key = e.key[:len(v)] - } else if len(e.key) < len(v) { - e.key = append(e.key, make([]byte, len(v)-len(e.key))...) - } - - // Copy value to key. - copy(e.key, v) + e.key = assign(e.key, v) } // Options represents initialization options that are passed to NewHashMap(). @@ -268,6 +270,15 @@ func pow2(v int64) int64 { panic("unreachable") } +func assign(x, v []byte) []byte { + if cap(x) < len(v) { + x = make([]byte, len(v)) + } + x = x[:len(v)] + copy(x, v) + return x +} + type byteSlices [][]byte func (a byteSlices) Len() int { return len(a) } diff --git a/tsdb/engine/tsm1/mmap_unix.go b/tsdb/engine/tsm1/mmap_unix.go index d4c7b2b3e29..48b4a8caef3 100644 --- a/tsdb/engine/tsm1/mmap_unix.go +++ b/tsdb/engine/tsm1/mmap_unix.go @@ -27,6 +27,10 @@ func munmap(b []byte) (err error) { return unix.Munmap(b) } +func madviseWillNeed(b []byte) error { + return madvise(b, syscall.MADV_WILLNEED) +} + func madviseDontNeed(b []byte) error { return madvise(b, syscall.MADV_DONTNEED) } diff --git a/tsdb/engine/tsm1/mmap_windows.go b/tsdb/engine/tsm1/mmap_windows.go index 360be3efe8d..1991f841f42 100644 --- a/tsdb/engine/tsm1/mmap_windows.go +++ b/tsdb/engine/tsm1/mmap_windows.go @@ -121,6 +121,10 @@ func munmap(b []byte) (err error) { return nil } +func madviseWillNeed(b []byte) error { + return nil +} + func madviseDontNeed(b []byte) error { // Not supported return nil diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index b3b13bd0838..aeb3747c5a9 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -1374,6 +1374,10 @@ func (m *mmapAccessor) init() (*indirectIndex, error) { return nil, fmt.Errorf("mmapAccessor: invalid indexStart") } + // Hint to the kernal that we will be reading the file. It would be better to hint + // that we will be reading the index section, but that doesn't seem to work ATM. + _ = madviseWillNeed(m.b) + m.index = NewIndirectIndex() if err := m.index.UnmarshalBinary(m.b[indexStart:indexOfsPos]); err != nil { return nil, err diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 3296f6ce4a3..245acccb3e8 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -36,13 +36,14 @@ const ( // LogFile represents an on-disk write-ahead log file. type LogFile struct { - mu sync.RWMutex - wg sync.WaitGroup // ref count - id int // file sequence identifier - data []byte // mmap - file *os.File // writer - w *bufio.Writer // buffered writer - buf []byte // marshaling buffer + mu sync.RWMutex + wg sync.WaitGroup // ref count + id int // file sequence identifier + data []byte // mmap + file *os.File // writer + w *bufio.Writer // buffered writer + buf []byte // marshaling buffer + keyBuf []byte sfile *tsdb.SeriesFile // series lookup size int64 // tracks current file size @@ -478,7 +479,7 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tag continue } writeRequired = true - entries = append(entries, LogEntry{SeriesID: seriesIDs[i]}) + entries = append(entries, LogEntry{SeriesID: seriesIDs[i], name: names[i], tags: tagsSlice[i], cached: true}) } seriesSet.RUnlock() @@ -607,7 +608,17 @@ func (f *LogFile) execDeleteTagValueEntry(e *LogEntry) { } func (f *LogFile) execSeriesEntry(e *LogEntry) { - seriesKey := f.sfile.SeriesKey(e.SeriesID) + var seriesKey []byte + if e.cached { + sz := tsdb.SeriesKeySize(e.name, e.tags) + if len(f.keyBuf) < sz { + f.keyBuf = make([]byte, 0, sz) + } + seriesKey = tsdb.AppendSeriesKey(f.keyBuf[:0], e.name, e.tags) + } else { + seriesKey = f.sfile.SeriesKey(e.SeriesID) + } + assert(seriesKey != nil, fmt.Sprintf("series key for ID: %d not found", e.SeriesID)) // Check if deleted. @@ -966,6 +977,10 @@ type LogEntry struct { Value []byte // tag value Checksum uint32 // checksum of flag/name/tags. Size int // total size of record, in bytes. + + cached bool // Hint to LogFile that series data is already parsed + name []byte // series naem, this is a cached copy of the parsed measurement name + tags models.Tags // series tags, this is a cached copied of the parsed tags } // UnmarshalBinary unmarshals data into e.