From eb5df403d8dc027da8a32811698e37509829192a Mon Sep 17 00:00:00 2001 From: James Guthrie Date: Wed, 29 Jun 2022 08:59:22 +0200 Subject: [PATCH] Minor refactoring for clarification - Some comments reference structs or methods which have been renamed - Change parameter name from 'builder' to 'keyBuffer' in `generateKey` - Remove double-creation of key and value in series_writer.go - Clarify why series cache key is constructed how it is, add test --- pkg/pgmodel/cache/series_cache.go | 60 +++++++++++++++----------- pkg/pgmodel/cache/series_cache_test.go | 20 ++++++++- pkg/pgmodel/ingestor/ingestor.go | 4 +- pkg/pgmodel/ingestor/series_writer.go | 2 +- pkg/pgmodel/model/series.go | 4 +- 5 files changed, 60 insertions(+), 30 deletions(-) diff --git a/pkg/pgmodel/cache/series_cache.go b/pkg/pgmodel/cache/series_cache.go index fe0641092a..a6dfb97ad5 100644 --- a/pkg/pgmodel/cache/series_cache.go +++ b/pkg/pgmodel/cache/series_cache.go @@ -24,10 +24,11 @@ import ( //this seems like a good initial size for /active/ series. Takes about 32MB const DefaultSeriesCacheSize = 250000 -const GrowCheckDuration = time.Minute //check whether to grow the series cache this often +const GrowCheckDuration = time.Minute // check whether to grow the series cache this often const GrowEvictionThreshold = 0.2 // grow when evictions more than 20% of cache size const GrowFactor = float64(2.0) // multiply cache size by this factor when growing the cache +// SeriesCache is a cache of model.Series entries. type SeriesCache interface { Reset() GetSeriesFromProtos(labelPairs []prompb.Label) (series *model.Series, metricName string, err error) @@ -112,15 +113,13 @@ func (t *SeriesCacheImpl) Evictions() uint64 { return t.cache.Evictions() } -//ResetStoredLabels should be concurrency-safe +// Reset should be concurrency-safe func (t *SeriesCacheImpl) Reset() { t.cache.Reset() } // Get the canonical version of a series if one exists. // input: the string representation of a Labels as defined by generateKey() -// This function should not be called directly, use labelProtosToLabels() or -// LabelsFromSlice() instead. func (t *SeriesCacheImpl) loadSeries(str string) (l *model.Series) { val, ok := t.cache.Get(str) if !ok { @@ -132,18 +131,31 @@ func (t *SeriesCacheImpl) loadSeries(str string) (l *model.Series) { // Try to set a series as the canonical Series for a given string // representation, returning the canonical version (which can be different in // the even of multiple goroutines setting labels concurrently). -// This function should not be called directly, use labelProtosToLabels() or -// LabelsFromSlice() instead. func (t *SeriesCacheImpl) setSeries(str string, lset *model.Series) *model.Series { //str not counted twice in size since the key and lset.str will point to same thing. val, _ := t.cache.Insert(str, lset, lset.FinalSizeBytes()) return val.(*model.Series) } -// Get a string representation for hashing and comparison -// This representation is guaranteed to uniquely represent the underlying label -// set, though need not human-readable, or indeed, valid utf-8 -func generateKey(labels []prompb.Label, builder *bytes.Buffer) (metricName string, error error) { +// generateKey takes an array of Prometheus labels, and a byte buffer. It +// stores a string representation (for hashing and comparison) of the labels +// array in the byte buffer, and returns the metric name which it found in the +// array of Prometheus labels. +// +// The string representation is guaranteed to uniquely represent the underlying +// label set, though need not be human-readable, or indeed, valid utf-8. +// +// The `labels` parameter contains an array of (name, value) pairs. +// The key which this function returns is the concatenation of sorted kv pairs +// where each of the key and value is prefixed by the little-endian +// representation of the two bytes of the uint16 length of the key/value string: +// key1val1key2val2 +// +// This formatting ensures isomorphism, hence preventing collisions. +// An example for how this transform works is as follows: +// label1 = ("hell", "oworld"), label2 = ("hello", "world"). +// => "\x04\x00hell\x06\x00oworld\x05\x00hello\x05\x00world" +func generateKey(labels []prompb.Label, keyBuffer *bytes.Buffer) (metricName string, error error) { if len(labels) == 0 { return "", nil } @@ -170,17 +182,13 @@ func generateKey(labels []prompb.Label, builder *bytes.Buffer) (metricName strin // BigCache cannot handle cases where the key string has a size greater than // 16bits, so we error on such keys here. Since we are restricted to a 16bit - // total length anyway, we only use 16bits to store the legth of each substring + // total length anyway, we only use 16bits to store the length of each substring // in our string encoding if expectedStrLen > math.MaxUint16 { return metricName, fmt.Errorf("series too long, combined series has length %d, max length %d", expectedStrLen, math.MaxUint16) } - // the string representation is - // (key val)* (key val)? - // that is a series of the a sequence of key values pairs with each string - // prefixed with it's length as a little-endian uint16 - builder.Grow(expectedStrLen) + keyBuffer.Grow(expectedStrLen) lengthBuf := make([]byte, 2) for i := range labels { @@ -193,18 +201,18 @@ func generateKey(labels []prompb.Label, builder *bytes.Buffer) (metricName strin // this cast is safe since we check that the combined length of all the // strings fit within a uint16, each string's length must also fit binary.LittleEndian.PutUint16(lengthBuf, uint16(len(key))) - builder.WriteByte(lengthBuf[0]) - builder.WriteByte(lengthBuf[1]) - builder.WriteString(key) + keyBuffer.WriteByte(lengthBuf[0]) + keyBuffer.WriteByte(lengthBuf[1]) + keyBuffer.WriteString(key) val := l.Value // this cast is safe since we check that the combined length of all the // strings fit within a uint16, each string's length must also fit binary.LittleEndian.PutUint16(lengthBuf, uint16(len(val))) - builder.WriteByte(lengthBuf[0]) - builder.WriteByte(lengthBuf[1]) - builder.WriteString(val) + keyBuffer.WriteByte(lengthBuf[0]) + keyBuffer.WriteByte(lengthBuf[1]) + keyBuffer.WriteString(val) } return metricName, nil @@ -216,7 +224,7 @@ var keyPool = sync.Pool{ }, } -// GetSeriesFromLabels converts a labels.Labels to a canonical Labels object +// GetSeriesFromLabels converts a labels.Labels to a canonical model.Series object func (t *SeriesCacheImpl) GetSeriesFromLabels(ls labels.Labels) (*model.Series, error) { ll := make([]prompb.Label, len(ls)) for i := range ls { @@ -242,7 +250,11 @@ func useByteAsStringNoCopy(b []byte, useAsStringFunc func(string)) { useAsStringFunc(str) } -// GetSeriesFromProtos converts a prompb.Label to a canonical Labels object +// GetSeriesFromProtos returns a model.Series entry given a list of Prometheus +// prompb.Label. +// If the desired entry is not in the cache, a "placeholder" model.Series entry +// is constructed and put into the cache. It is not populated with database IDs +// until a later phase, see model.Series.SetSeriesID. func (t *SeriesCacheImpl) GetSeriesFromProtos(labelPairs []prompb.Label) (*model.Series, string, error) { builder := keyPool.Get().(*bytes.Buffer) builder.Reset() diff --git a/pkg/pgmodel/cache/series_cache_test.go b/pkg/pgmodel/cache/series_cache_test.go index 62da5ecafa..de30665a3c 100644 --- a/pkg/pgmodel/cache/series_cache_test.go +++ b/pkg/pgmodel/cache/series_cache_test.go @@ -5,6 +5,9 @@ package cache import ( + "bytes" + "github.com/stretchr/testify/require" + "github.com/timescale/promscale/pkg/prompb" "math" "strings" "testing" @@ -12,7 +15,7 @@ import ( promLabels "github.com/prometheus/prometheus/model/labels" ) -func TestBigLables(t *testing.T) { +func TestBigLabels(t *testing.T) { cache := NewSeriesCache(DefaultConfig, nil) builder := strings.Builder{} builder.Grow(int(^uint16(0)) + 1) // one greater than uint16 max @@ -34,3 +37,18 @@ func TestBigLables(t *testing.T) { t.Errorf("expected error") } } + +func TestGenerateKey(t *testing.T) { + var labels = []prompb.Label{ + {Name: "__name__", Value: "test"}, + {Name: "hell", Value: "oworld"}, + {Name: "hello", Value: "world"}, + } + var keyBuffer = new(bytes.Buffer) + var metricName, err = generateKey(labels, keyBuffer) + + require.Nil(t, err) + + require.Equal(t, "test", metricName) + require.Equal(t, []byte("\x08\x00__name__\x04\x00test\x04\x00hell\x06\x00oworld\x05\x00hello\x05\x00world"), keyBuffer.Bytes()) +} diff --git a/pkg/pgmodel/ingestor/ingestor.go b/pkg/pgmodel/ingestor/ingestor.go index 7e52e77e05..506d474502 100644 --- a/pkg/pgmodel/ingestor/ingestor.go +++ b/pkg/pgmodel/ingestor/ingestor.go @@ -192,8 +192,8 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p if len(ts.Labels) == 0 { continue } - // Normalize and canonicalize t.Labels. - // After this point t.Labels should never be used again. + // Normalize and canonicalize ts.Labels. + // After this point ts.Labels should never be used again. series, metricName, err = ingestor.sCache.GetSeriesFromProtos(ts.Labels) if err != nil { return 0, err diff --git a/pkg/pgmodel/ingestor/series_writer.go b/pkg/pgmodel/ingestor/series_writer.go index 8dcbd25f72..e100d22cf4 100644 --- a/pkg/pgmodel/ingestor/series_writer.go +++ b/pkg/pgmodel/ingestor/series_writer.go @@ -288,7 +288,7 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe for i := range pos { res := cache.NewLabelInfo(labelIDs[i], pos[i]) key := cache.NewLabelKey(info.metricName, names[i], values[i]) - if !h.labelsCache.Put(cache.NewLabelKey(info.metricName, names[i], values[i]), cache.NewLabelInfo(labelIDs[i], pos[i])) { + if !h.labelsCache.Put(key, res) { log.Warn("failed to add label ID to inverted cache") } _, ok := labelMap[key] diff --git a/pkg/pgmodel/model/series.go b/pkg/pgmodel/model/series.go index b15c3fe1fe..1bc3702013 100644 --- a/pkg/pgmodel/model/series.go +++ b/pkg/pgmodel/model/series.go @@ -27,10 +27,10 @@ func (s SeriesID) String() string { return strconv.FormatInt(int64(s), 10) } -//Epoch represents the series epoch +// SeriesEpoch represents the series epoch type SeriesEpoch int64 -// Series stores a labels.Series in its canonical string representation +// Series stores a Prometheus labels.Labels in its canonical string representation type Series struct { //protects names, values, seriesID, epoch //str and metricName are immutable and doesn't need a lock