Commit

Grow inverted labels cache if needed

niksajakovljevic committed Dec 16, 2022
1 parent 92ad453 commit 19dc42d
Showing 11 changed files with 139 additions and 125 deletions.
4 changes: 3 additions & 1 deletion pkg/pgclient/client.go
@@ -202,6 +202,8 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
metricsCache := cache.NewMetricCache(cfg.CacheConfig)
labelsCache := cache.NewLabelsCache(cfg.CacheConfig)
seriesCache := cache.NewSeriesCache(cfg.CacheConfig, sigClose)
invertedLabelsCache := cache.NewInvertedLabelsCache(cfg.CacheConfig, sigClose)

c := ingestor.Cfg{
NumCopiers: numCopiers,
IgnoreCompressedChunks: cfg.IgnoreCompressedChunks,
@@ -230,7 +232,7 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
if !readOnly {
var err error
writerConn = pgxconn.NewPgxConn(writerPool)
dbIngestor, err = ingestor.NewPgxIngestor(writerConn, metricsCache, seriesCache, exemplarKeyPosCache, &c)
dbIngestor, err = ingestor.NewPgxIngestor(writerConn, metricsCache, seriesCache, exemplarKeyPosCache, invertedLabelsCache, &c)
if err != nil {
log.Error("msg", "err starting the ingestor", "err", err)
return nil, err
67 changes: 67 additions & 0 deletions pkg/pgmodel/cache/cache.go
@@ -6,17 +6,23 @@ package cache

import (
"fmt"
"time"

"github.com/timescale/promscale/pkg/clockcache"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/common/errors"
"github.com/timescale/promscale/pkg/pgmodel/model"
)

const (
DefaultMetricCacheSize = 10000
DefaultLabelsCacheSize = 100000
growCheckDuration = time.Second * 5 // check whether to grow the series cache this often
growFactor = float64(2.0) // multiply cache size by this factor when growing the cache
)

var evictionMaxAge = time.Minute * 5 // grow cache if we are evicting elements younger than `now - evictionMaxAge`

type LabelsCache interface {
// GetValues tries to get a batch of keys and store the corresponding values in valuesOut;
// returns the number of keys that were actually found.
@@ -116,3 +122,64 @@ func (m *MetricNameCache) Evictions() uint64 {
func NewLabelsCache(config Config) LabelsCache {
return clockcache.WithMetrics("label", "metric", config.LabelsCacheSize)
}

type ResizableCache struct {
*clockcache.Cache
maxSizeBytes uint64
}

func NewResizableCache(cache *clockcache.Cache, maxBytes uint64, sigClose <-chan struct{}) *ResizableCache {
rc := &ResizableCache{cache, maxBytes}
if sigClose != nil {
go rc.runSizeCheck(sigClose)
}
return rc
}

func (rc *ResizableCache) runSizeCheck(sigClose <-chan struct{}) {
ticker := time.NewTicker(growCheckDuration)
for {
select {
case <-ticker.C:
if rc.shouldGrow() {
rc.grow()
}
case <-sigClose:
return
}
}
}

// shouldGrow allows cache growth if we are evicting elements that were recently used or inserted.
// evictionMaxAge defines how recently an evicted element must have been used for growth to kick in.
func (rc *ResizableCache) shouldGrow() bool {
return rc.MaxEvictionTs()+int32(evictionMaxAge.Seconds()) > int32(time.Now().Unix())
}

func (rc *ResizableCache) grow() {
sizeBytes := rc.SizeBytes()
oldSize := rc.Cap()
if float64(sizeBytes)*1.2 >= float64(rc.maxSizeBytes) {
log.Warn("msg", "Cache is too small and cannot be grown",
"current_size_bytes", float64(sizeBytes), "max_size_bytes", float64(rc.maxSizeBytes),
"current_size_elements", oldSize, "check_interval", growCheckDuration,
"eviction_max_age", evictionMaxAge)
return
}

multiplier := growFactor
if float64(sizeBytes)*multiplier >= float64(rc.maxSizeBytes) {
multiplier = float64(rc.maxSizeBytes) / float64(sizeBytes)
}
if multiplier < 1.0 {
return
}

newNumElements := int(float64(oldSize) * multiplier)
log.Info("msg", "Growing the series cache",
"new_size_elements", newNumElements, "current_size_elements", oldSize,
"new_size_bytes", float64(sizeBytes)*multiplier, "max_size_bytes", float64(rc.maxSizeBytes),
"multiplier", multiplier,
"eviction_max_age", evictionMaxAge)
rc.ExpandTo(newNumElements)
}
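For orientation, here is a minimal usage sketch of the new ResizableCache (not part of this commit): it wraps a plain clockcache instance, and a background goroutine checks every growCheckDuration whether recently used entries are being evicted and, if so, roughly doubles the capacity while staying under maxSizeBytes. The cache name, byte limit, and key/value types below are illustrative assumptions; only the API shown in the hunks above is used.

package main

import (
	"time"

	"github.com/timescale/promscale/pkg/clockcache"
	"github.com/timescale/promscale/pkg/pgmodel/cache"
)

func main() {
	sigClose := make(chan struct{})
	base := clockcache.WithMetrics("example", "metric", 1000)

	// Allow growth up to ~64 MiB; the size check runs every growCheckDuration (5s).
	rc := cache.NewResizableCache(base, 64<<20, sigClose)

	// Callers supply the per-entry size in bytes, as the series and
	// inverted-labels caches do in the hunks above.
	rc.Insert("some-key", "some-value", 32)

	time.Sleep(6 * time.Second) // give the background size check a chance to run
	close(sigClose)             // stops the goroutine started by NewResizableCache
}

Both the series cache and the new inverted labels cache embed this type, so Len, Cap, Evictions, and Reset are promoted from the underlying clockcache.Cache rather than being hand-wrapped.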
53 changes: 41 additions & 12 deletions pkg/pgmodel/cache/flags.go
@@ -20,41 +20,52 @@ var (
Name: "series_cache_max_bytes",
Help: "The target for the maximum amount of memory the series_cache can use in bytes.",
})
InvertedLabelsCacheMaxBytesMetric = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Name: "inverted_labels_cache_max_bytes",
Help: "The target for the maximum amount of memory the inverted labels cache can use in bytes.",
})
)

type Config struct {
SeriesCacheInitialSize uint64
seriesCacheMemoryMaxFlag limits.PercentageAbsoluteBytesFlag
SeriesCacheMemoryMaxBytes uint64

MetricsCacheSize uint64
LabelsCacheSize uint64
ExemplarKeyPosCacheSize uint64
InvertedLabelsCacheSize uint64
MetricsCacheSize uint64
LabelsCacheSize uint64
ExemplarKeyPosCacheSize uint64
InvertedLabelsCacheSize uint64
InvertedLabelsCacheMaxBytesFlag limits.PercentageAbsoluteBytesFlag
InvertedLabelsCacheMaxBytes uint64
}

var DefaultConfig = Config{
SeriesCacheInitialSize: DefaultSeriesCacheSize,
SeriesCacheMemoryMaxBytes: 1000000,

MetricsCacheSize: DefaultMetricCacheSize,
LabelsCacheSize: DefaultLabelsCacheSize,
ExemplarKeyPosCacheSize: DefaultExemplarKeyPosCacheSize,
InvertedLabelsCacheSize: DefaultInvertedLabelsCacheSize,
MetricsCacheSize: DefaultMetricCacheSize,
LabelsCacheSize: DefaultLabelsCacheSize,
ExemplarKeyPosCacheSize: DefaultExemplarKeyPosCacheSize,
InvertedLabelsCacheSize: DefaultInvertedLabelsCacheSize,
InvertedLabelsCacheMaxBytes: 1000000,
}

func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
/* set defaults */
cfg.seriesCacheMemoryMaxFlag.SetPercent(50)
cfg.InvertedLabelsCacheMaxBytesFlag.SetPercent(10)

fs.Uint64Var(&cfg.MetricsCacheSize, "metrics.cache.metrics.size", DefaultMetricCacheSize, "Maximum number of metric names to cache.")
fs.Uint64Var(&cfg.SeriesCacheInitialSize, "metrics.cache.series.initial-size", DefaultSeriesCacheSize, "Maximum number of series to cache.")
fs.Uint64Var(&cfg.LabelsCacheSize, "metrics.cache.labels.size", DefaultLabelsCacheSize, "Maximum number of labels to cache.")
fs.Uint64Var(&cfg.ExemplarKeyPosCacheSize, "metrics.cache.exemplar.size", DefaultExemplarKeyPosCacheSize, "Maximum number of exemplar metrics key-position to cache. "+
"It has one-to-one mapping with number of metrics that have exemplar, as key positions are saved per metric basis.")
fs.Var(&cfg.seriesCacheMemoryMaxFlag, "metrics.cache.series.max-bytes", "Initial number of elements in the series cache. "+
"Specified in bytes or as a percentage of the memory-target (e.g. 50%).")
"Specified in bytes or as a percentage of the cache.memory-target (e.g. 50%).")
fs.Uint64Var(&cfg.InvertedLabelsCacheSize, "metrics.cache.inverted-labels.size", DefaultInvertedLabelsCacheSize, "Maximum number of label-ids to cache. This helps increase ingest performance.")
fs.Var(&cfg.InvertedLabelsCacheMaxBytesFlag, "metrics.cache.inverted-labels.max-bytes", "Maximum amount of memory the inverted labels cache can use. Specified in bytes or as a percentage of the cache.memory-target (e.g. 50%).")
return cfg
}

@@ -66,14 +77,32 @@ func Validate(cfg *Config, lcfg limits.Config) error {
case limits.Absolute:
cfg.SeriesCacheMemoryMaxBytes = value
default:
return fmt.Errorf("series-cache-max-bytes flag has unknown kind")
return fmt.Errorf("metrics.cache.series.max-bytes flag has unknown kind")
}
if cfg.SeriesCacheMemoryMaxBytes > lcfg.TargetMemoryBytes {
return fmt.Errorf("The metrics.cache.series.max-bytes must be smaller than the cache.memory-target")
}
SeriesCacheMaxBytesMetric.Set(float64(cfg.SeriesCacheMemoryMaxBytes))

if cfg.SeriesCacheMemoryMaxBytes > lcfg.TargetMemoryBytes {
return fmt.Errorf("The series-cache-max-bytes must be smaller than the memory-target")
kind, value = cfg.InvertedLabelsCacheMaxBytesFlag.Get()
switch kind {
case limits.Percentage:
cfg.InvertedLabelsCacheMaxBytes = uint64(float64(lcfg.TargetMemoryBytes) * (float64(value) / 100.0))
case limits.Absolute:
cfg.InvertedLabelsCacheMaxBytes = value
default:
return fmt.Errorf("metrics.cache.inverted-labels.max-bytes flag has unknown kind")
}

if cfg.InvertedLabelsCacheMaxBytes > lcfg.TargetMemoryBytes {
return fmt.Errorf("The metrics.cache.inverted-labels.max-bytes must be smaller than the cache.memory-target")
}

if cfg.SeriesCacheMemoryMaxBytes+cfg.InvertedLabelsCacheMaxBytes > lcfg.TargetMemoryBytes {
return fmt.Errorf("Sum of metrics.cache.series.max-bytes and metrics.cache.inverted-labels.max-bytes must be smaller than the cache.memory-target")
}

InvertedLabelsCacheMaxBytesMetric.Set(float64(cfg.InvertedLabelsCacheMaxBytes))
return nil
}
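As a worked example of the percentage branch above (assumed numbers, not Promscale defaults): with a 2 GiB cache.memory-target and the default 10% setting, the inverted labels cache budget comes out to about 205 MiB.

// Hedged arithmetic sketch of the percentage resolution in Validate; the
// 2 GiB memory target is an assumed example value.
func exampleInvertedLabelsBudget() uint64 {
	targetMemoryBytes := uint64(2 << 30) // cache.memory-target = 2 GiB
	percent := uint64(10)                // metrics.cache.inverted-labels.max-bytes = 10%
	// Same formula as above: 214748364 bytes, roughly 205 MiB.
	return uint64(float64(targetMemoryBytes) * (float64(percent) / 100.0))
}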

21 changes: 6 additions & 15 deletions pkg/pgmodel/cache/inverted_labels_cache.go
@@ -1,8 +1,6 @@
package cache

import (
"fmt"

"github.com/timescale/promscale/pkg/clockcache"
)

@@ -38,31 +36,24 @@ func (li LabelInfo) len() int {
// Each label position is unique for a specific metric, meaning that
// one label can have a different position for different metrics
type InvertedLabelsCache struct {
cache *clockcache.Cache
*ResizableCache
}

// Cache is thread-safe
func NewInvertedLabelsCache(size uint64) (*InvertedLabelsCache, error) {
if size <= 0 {
return nil, fmt.Errorf("labels cache size must be > 0")
}
cache := clockcache.WithMetrics("inverted_labels", "metric", size)
return &InvertedLabelsCache{cache}, nil
func NewInvertedLabelsCache(config Config, sigClose chan struct{}) *InvertedLabelsCache {
cache := clockcache.WithMetrics("inverted_labels", "metric", config.InvertedLabelsCacheSize)
return &InvertedLabelsCache{NewResizableCache(cache, config.InvertedLabelsCacheMaxBytes, sigClose)}
}

func (c *InvertedLabelsCache) GetLabelsId(key LabelKey) (LabelInfo, bool) {
id, found := c.cache.Get(key)
id, found := c.Get(key)
if found {
return id.(LabelInfo), found
}
return LabelInfo{}, false
}

func (c *InvertedLabelsCache) Put(key LabelKey, val LabelInfo) bool {
_, added := c.cache.Insert(key, val, uint64(key.len())+uint64(val.len())+17)
_, added := c.Insert(key, val, uint64(key.len())+uint64(val.len())+17)
return added
}

func (c *InvertedLabelsCache) Reset() {
c.cache.Reset()
}
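A hedged wiring sketch of the reworked constructor (mirroring the pkg/pgclient/client.go hunk above); the zero-valued LabelKey is for illustration only, since its fields are not shown in this diff.

package main

import "github.com/timescale/promscale/pkg/pgmodel/cache"

func main() {
	sigClose := make(chan struct{})
	// DefaultConfig carries the InvertedLabelsCacheSize and InvertedLabelsCacheMaxBytes defaults.
	invertedLabelsCache := cache.NewInvertedLabelsCache(cache.DefaultConfig, sigClose)

	var key cache.LabelKey // illustrative zero value; real keys identify a metric/label pair
	if info, ok := invertedLabelsCache.GetLabelsId(key); ok {
		_ = info // cached label id and per-metric position
	}
	close(sigClose)
}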
86 changes: 5 additions & 81 deletions pkg/pgmodel/cache/series_cache.go
@@ -11,7 +11,6 @@ import (
"math"
"sort"
"sync"
"time"
"unsafe"

"github.com/prometheus/prometheus/model/labels"
@@ -24,10 +23,6 @@ import (
// this seems like a good default size for /active/ series. This results in Promscale using around 360MB on start.
const DefaultSeriesCacheSize = 1000000

const growCheckDuration = time.Second * 5 // check whether to grow the series cache this often
const growFactor = float64(2.0) // multiply cache size by this factor when growing the cache
var evictionMaxAge = time.Minute * 5 // grow cache if we are evicting elements younger than `now - evictionMaxAge`

// SeriesCache is a cache of model.Series entries.
type SeriesCache interface {
Reset()
@@ -38,91 +33,20 @@ type SeriesCache interface {
}

type SeriesCacheImpl struct {
cache *clockcache.Cache
maxSizeBytes uint64
*ResizableCache
}

func NewSeriesCache(config Config, sigClose <-chan struct{}) *SeriesCacheImpl {
cache := &SeriesCacheImpl{
return &SeriesCacheImpl{NewResizableCache(
clockcache.WithMetrics("series", "metric", config.SeriesCacheInitialSize),
config.SeriesCacheMemoryMaxBytes,
}

if sigClose != nil {
go cache.runSizeCheck(sigClose)
}
return cache
}

func (t *SeriesCacheImpl) runSizeCheck(sigClose <-chan struct{}) {
ticker := time.NewTicker(growCheckDuration)
for {
select {
case <-ticker.C:
if t.shouldGrow() {
t.grow()
}
case <-sigClose:
return
}
}
}

// shouldGrow allows cache growth if we are evicting elements that were recently used or inserted
// evictionMaxAge defines the interval
func (t *SeriesCacheImpl) shouldGrow() bool {
return t.cache.MaxEvictionTs()+int32(evictionMaxAge.Seconds()) > int32(time.Now().Unix())
}

func (t *SeriesCacheImpl) grow() {
sizeBytes := t.cache.SizeBytes()
oldSize := t.cache.Cap()
if float64(sizeBytes)*1.2 >= float64(t.maxSizeBytes) {
log.Warn("msg", "Series cache is too small and cannot be grown",
"current_size_bytes", float64(sizeBytes), "max_size_bytes", float64(t.maxSizeBytes),
"current_size_elements", oldSize, "check_interval", growCheckDuration,
"eviction_max_age", evictionMaxAge)
return
}

multiplier := growFactor
if float64(sizeBytes)*multiplier >= float64(t.maxSizeBytes) {
multiplier = float64(t.maxSizeBytes) / float64(sizeBytes)
}
if multiplier < 1.0 {
return
}

newNumElements := int(float64(oldSize) * multiplier)
log.Info("msg", "Growing the series cache",
"new_size_elements", newNumElements, "current_size_elements", oldSize,
"new_size_bytes", float64(sizeBytes)*multiplier, "max_size_bytes", float64(t.maxSizeBytes),
"multiplier", multiplier,
"eviction_max_age", evictionMaxAge)
t.cache.ExpandTo(newNumElements)
}

func (t *SeriesCacheImpl) Len() int {
return t.cache.Len()
}

func (t *SeriesCacheImpl) Cap() int {
return t.cache.Cap()
}

func (t *SeriesCacheImpl) Evictions() uint64 {
return t.cache.Evictions()
}

// Reset should be concurrency-safe
func (t *SeriesCacheImpl) Reset() {
t.cache.Reset()
sigClose)}
}

// Get the canonical version of a series if one exists.
// input: the string representation of a Labels as defined by generateKey()
func (t *SeriesCacheImpl) loadSeries(str string) (l *model.Series) {
val, ok := t.cache.Get(str)
val, ok := t.Get(str)
if !ok {
return nil
}
@@ -134,7 +58,7 @@ func (t *SeriesCacheImpl) loadSeries(str string) (l *model.Series) {
// the event of multiple goroutines setting labels concurrently).
func (t *SeriesCacheImpl) setSeries(str string, lset *model.Series) *model.Series {
//str not counted twice in size since the key and lset.str will point to same thing.
val, inCache := t.cache.Insert(str, lset, lset.FinalSizeBytes())
val, inCache := t.Insert(str, lset, lset.FinalSizeBytes())
if !inCache {
// It seems that cache was full and eviction failed to remove
// element due to starvation caused by a lot of concurrent gets
2 changes: 1 addition & 1 deletion pkg/pgmodel/cache/series_cache_test.go
@@ -80,7 +80,7 @@ func TestGrowSeriesCache(t *testing.T) {
cache := NewSeriesCache(Config{SeriesCacheInitialSize: 100, SeriesCacheMemoryMaxBytes: DefaultConfig.SeriesCacheMemoryMaxBytes}, nil)
cacheGrowCounter := 0
for i := 0; i < 200; i++ {
cache.cache.Insert(i, i, 1)
cache.Insert(i, i, 1)
if i%100 == 0 {
time.Sleep(tc.sleep)
}