Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Minor refactoring for clarification
Browse files Browse the repository at this point in the history
- Some comments reference structs or methods which have been renamed
- Change parameter name from 'builder' to 'keyBuffer' in `generateKey`
- Remove double-creation of key and value in series_writer.go
- Clarify why series cache key is constructed how it is, add test
  • Loading branch information
JamesGuthrie committed Jul 5, 2022
1 parent bc08632 commit eb5df40
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 30 deletions.
60 changes: 36 additions & 24 deletions pkg/pgmodel/cache/series_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
//this seems like a good initial size for /active/ series. Takes about 32MB
const DefaultSeriesCacheSize = 250000

const GrowCheckDuration = time.Minute //check whether to grow the series cache this often
const GrowCheckDuration = time.Minute // check whether to grow the series cache this often
const GrowEvictionThreshold = 0.2 // grow when evictions more than 20% of cache size
const GrowFactor = float64(2.0) // multiply cache size by this factor when growing the cache

// SeriesCache is a cache of model.Series entries.
type SeriesCache interface {
Reset()
GetSeriesFromProtos(labelPairs []prompb.Label) (series *model.Series, metricName string, err error)
Expand Down Expand Up @@ -112,15 +113,13 @@ func (t *SeriesCacheImpl) Evictions() uint64 {
return t.cache.Evictions()
}

//ResetStoredLabels should be concurrency-safe
// Reset should be concurrency-safe
func (t *SeriesCacheImpl) Reset() {
t.cache.Reset()
}

// Get the canonical version of a series if one exists.
// input: the string representation of a Labels as defined by generateKey()
// This function should not be called directly, use labelProtosToLabels() or
// LabelsFromSlice() instead.
func (t *SeriesCacheImpl) loadSeries(str string) (l *model.Series) {
val, ok := t.cache.Get(str)
if !ok {
Expand All @@ -132,18 +131,31 @@ func (t *SeriesCacheImpl) loadSeries(str string) (l *model.Series) {
// Try to set a series as the canonical Series for a given string
// representation, returning the canonical version (which can be different in
// the even of multiple goroutines setting labels concurrently).
// This function should not be called directly, use labelProtosToLabels() or
// LabelsFromSlice() instead.
func (t *SeriesCacheImpl) setSeries(str string, lset *model.Series) *model.Series {
//str not counted twice in size since the key and lset.str will point to same thing.
val, _ := t.cache.Insert(str, lset, lset.FinalSizeBytes())
return val.(*model.Series)
}

// Get a string representation for hashing and comparison
// This representation is guaranteed to uniquely represent the underlying label
// set, though need not human-readable, or indeed, valid utf-8
func generateKey(labels []prompb.Label, builder *bytes.Buffer) (metricName string, error error) {
// generateKey takes an array of Prometheus labels, and a byte buffer. It
// stores a string representation (for hashing and comparison) of the labels
// array in the byte buffer, and returns the metric name which it found in the
// array of Prometheus labels.
//
// The string representation is guaranteed to uniquely represent the underlying
// label set, though need not be human-readable, or indeed, valid utf-8.
//
// The `labels` parameter contains an array of (name, value) pairs.
// The key which this function returns is the concatenation of sorted kv pairs
// where each of the key and value is prefixed by the little-endian
// representation of the two bytes of the uint16 length of the key/value string:
// <key1-len>key1<val1-len>val1<key2-len>key2<val2-len>val2
//
// This formatting ensures isomorphism, hence preventing collisions.
// An example for how this transform works is as follows:
// label1 = ("hell", "oworld"), label2 = ("hello", "world").
// => "\x04\x00hell\x06\x00oworld\x05\x00hello\x05\x00world"
func generateKey(labels []prompb.Label, keyBuffer *bytes.Buffer) (metricName string, error error) {
if len(labels) == 0 {
return "", nil
}
Expand All @@ -170,17 +182,13 @@ func generateKey(labels []prompb.Label, builder *bytes.Buffer) (metricName strin

// BigCache cannot handle cases where the key string has a size greater than
// 16bits, so we error on such keys here. Since we are restricted to a 16bit
// total length anyway, we only use 16bits to store the legth of each substring
// total length anyway, we only use 16bits to store the length of each substring
// in our string encoding
if expectedStrLen > math.MaxUint16 {
return metricName, fmt.Errorf("series too long, combined series has length %d, max length %d", expectedStrLen, math.MaxUint16)
}

// the string representation is
// (<key-len>key <val-len> val)* (<key-len>key <val-len> val)?
// that is a series of the a sequence of key values pairs with each string
// prefixed with it's length as a little-endian uint16
builder.Grow(expectedStrLen)
keyBuffer.Grow(expectedStrLen)

lengthBuf := make([]byte, 2)
for i := range labels {
Expand All @@ -193,18 +201,18 @@ func generateKey(labels []prompb.Label, builder *bytes.Buffer) (metricName strin
// this cast is safe since we check that the combined length of all the
// strings fit within a uint16, each string's length must also fit
binary.LittleEndian.PutUint16(lengthBuf, uint16(len(key)))
builder.WriteByte(lengthBuf[0])
builder.WriteByte(lengthBuf[1])
builder.WriteString(key)
keyBuffer.WriteByte(lengthBuf[0])
keyBuffer.WriteByte(lengthBuf[1])
keyBuffer.WriteString(key)

val := l.Value

// this cast is safe since we check that the combined length of all the
// strings fit within a uint16, each string's length must also fit
binary.LittleEndian.PutUint16(lengthBuf, uint16(len(val)))
builder.WriteByte(lengthBuf[0])
builder.WriteByte(lengthBuf[1])
builder.WriteString(val)
keyBuffer.WriteByte(lengthBuf[0])
keyBuffer.WriteByte(lengthBuf[1])
keyBuffer.WriteString(val)
}

return metricName, nil
Expand All @@ -216,7 +224,7 @@ var keyPool = sync.Pool{
},
}

// GetSeriesFromLabels converts a labels.Labels to a canonical Labels object
// GetSeriesFromLabels converts a labels.Labels to a canonical model.Series object
func (t *SeriesCacheImpl) GetSeriesFromLabels(ls labels.Labels) (*model.Series, error) {
ll := make([]prompb.Label, len(ls))
for i := range ls {
Expand All @@ -242,7 +250,11 @@ func useByteAsStringNoCopy(b []byte, useAsStringFunc func(string)) {
useAsStringFunc(str)
}

// GetSeriesFromProtos converts a prompb.Label to a canonical Labels object
// GetSeriesFromProtos returns a model.Series entry given a list of Prometheus
// prompb.Label.
// If the desired entry is not in the cache, a "placeholder" model.Series entry
// is constructed and put into the cache. It is not populated with database IDs
// until a later phase, see model.Series.SetSeriesID.
func (t *SeriesCacheImpl) GetSeriesFromProtos(labelPairs []prompb.Label) (*model.Series, string, error) {
builder := keyPool.Get().(*bytes.Buffer)
builder.Reset()
Expand Down
20 changes: 19 additions & 1 deletion pkg/pgmodel/cache/series_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
package cache

import (
"bytes"
"github.com/stretchr/testify/require"
"github.com/timescale/promscale/pkg/prompb"
"math"
"strings"
"testing"

promLabels "github.com/prometheus/prometheus/model/labels"
)

func TestBigLables(t *testing.T) {
func TestBigLabels(t *testing.T) {
cache := NewSeriesCache(DefaultConfig, nil)
builder := strings.Builder{}
builder.Grow(int(^uint16(0)) + 1) // one greater than uint16 max
Expand All @@ -34,3 +37,18 @@ func TestBigLables(t *testing.T) {
t.Errorf("expected error")
}
}

func TestGenerateKey(t *testing.T) {
var labels = []prompb.Label{
{Name: "__name__", Value: "test"},
{Name: "hell", Value: "oworld"},
{Name: "hello", Value: "world"},
}
var keyBuffer = new(bytes.Buffer)
var metricName, err = generateKey(labels, keyBuffer)

require.Nil(t, err)

require.Equal(t, "test", metricName)
require.Equal(t, []byte("\x08\x00__name__\x04\x00test\x04\x00hell\x06\x00oworld\x05\x00hello\x05\x00world"), keyBuffer.Bytes())
}
4 changes: 2 additions & 2 deletions pkg/pgmodel/ingestor/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p
if len(ts.Labels) == 0 {
continue
}
// Normalize and canonicalize t.Labels.
// After this point t.Labels should never be used again.
// Normalize and canonicalize ts.Labels.
// After this point ts.Labels should never be used again.
series, metricName, err = ingestor.sCache.GetSeriesFromProtos(ts.Labels)
if err != nil {
return 0, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/pgmodel/ingestor/series_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe
for i := range pos {
res := cache.NewLabelInfo(labelIDs[i], pos[i])
key := cache.NewLabelKey(info.metricName, names[i], values[i])
if !h.labelsCache.Put(cache.NewLabelKey(info.metricName, names[i], values[i]), cache.NewLabelInfo(labelIDs[i], pos[i])) {
if !h.labelsCache.Put(key, res) {
log.Warn("failed to add label ID to inverted cache")
}
_, ok := labelMap[key]
Expand Down
4 changes: 2 additions & 2 deletions pkg/pgmodel/model/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ func (s SeriesID) String() string {
return strconv.FormatInt(int64(s), 10)
}

//Epoch represents the series epoch
// SeriesEpoch represents the series epoch
type SeriesEpoch int64

// Series stores a labels.Series in its canonical string representation
// Series stores a Prometheus labels.Labels in its canonical string representation
type Series struct {
//protects names, values, seriesID, epoch
//str and metricName are immutable and doesn't need a lock
Expand Down

0 comments on commit eb5df40

Please sign in to comment.