diff --git a/plugin/storage/badger/factory.go b/plugin/storage/badger/factory.go index 01758c2eab1..10de32d8e8e 100644 --- a/plugin/storage/badger/factory.go +++ b/plugin/storage/badger/factory.go @@ -150,7 +150,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) } f.store = store - f.cache = badgerStore.NewCacheStore(f.store, f.Config.TTL.Spans, true) + f.cache = badgerStore.NewCacheStore(f.store, f.Config.TTL.Spans) f.metrics.ValueLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: valueLogSpaceAvailableName}) f.metrics.KeyLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: keyLogSpaceAvailableName}) @@ -176,7 +176,7 @@ func initializeDir(path string) { // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - tr := badgerStore.NewTraceReader(f.store, f.cache) + tr := badgerStore.NewTraceReader(f.store, f.cache, true) return spanstoremetrics.NewReaderDecorator(tr, f.metricsFactory), nil } diff --git a/plugin/storage/badger/spanstore/cache.go b/plugin/storage/badger/spanstore/cache.go index 29dd1eb7043..9bebc6c63b7 100644 --- a/plugin/storage/badger/spanstore/cache.go +++ b/plugin/storage/badger/spanstore/cache.go @@ -25,83 +25,41 @@ type CacheStore struct { } // NewCacheStore returns initialized CacheStore for badger use -func NewCacheStore(db *badger.DB, ttl time.Duration, prefill bool) *CacheStore { +func NewCacheStore(db *badger.DB, ttl time.Duration) *CacheStore { cs := &CacheStore{ services: make(map[string]uint64), operations: make(map[string]map[string]uint64), ttl: ttl, store: db, } - - if prefill { - cs.populateCaches() - } return cs } -func (c *CacheStore) populateCaches() { +// AddService fills the services into the cache with the most updated expiration time +func (c *CacheStore) AddService(service string, keyTTL uint64) { c.cacheLock.Lock() defer c.cacheLock.Unlock() - - c.loadServices() - - for k := range c.services { - c.loadOperations(k) 
- } -} - -func (c *CacheStore) loadServices() { - c.store.View(func(txn *badger.Txn) error { - opts := badger.DefaultIteratorOptions - it := txn.NewIterator(opts) - defer it.Close() - - serviceKey := []byte{serviceNameIndexKey} - - // Seek all the services first - for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() { - timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64) - serviceName := string(it.Item().Key()[len(serviceKey):timestampStartIndex]) - keyTTL := it.Item().ExpiresAt() - if v, found := c.services[serviceName]; found { - if v > keyTTL { - continue - } - } - c.services[serviceName] = keyTTL + if v, found := c.services[service]; found { + if v > keyTTL { + return } - return nil - }) + } + c.services[service] = keyTTL } -func (c *CacheStore) loadOperations(service string) { - c.store.View(func(txn *badger.Txn) error { - opts := badger.DefaultIteratorOptions - it := txn.NewIterator(opts) - defer it.Close() - - serviceKey := make([]byte, len(service)+1) - serviceKey[0] = operationNameIndexKey - copy(serviceKey[1:], service) - - // Seek all the services first - for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() { - timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64) - operationName := string(it.Item().Key()[len(serviceKey):timestampStartIndex]) - keyTTL := it.Item().ExpiresAt() - if _, found := c.operations[service]; !found { - c.operations[service] = make(map[string]uint64) - } - - if v, found := c.operations[service][operationName]; found { - if v > keyTTL { - continue - } - } - c.operations[service][operationName] = keyTTL +// AddOperation adds an operation for the given service to the cache with the most updated expiration time +func (c *CacheStore) AddOperation(service, operation string, keyTTL uint64) { + c.cacheLock.Lock() + defer c.cacheLock.Unlock() + if _, found := c.operations[service]; !found { + c.operations[service] = make(map[string]uint64) + } + if v, found := 
c.operations[service][operation]; found { + if v > keyTTL { + return } - return nil - }) + } + c.operations[service][operation] = keyTTL } // Update caches the results of service and service + operation indexes and maintains their TTL diff --git a/plugin/storage/badger/spanstore/cache_test.go b/plugin/storage/badger/spanstore/cache_test.go index 5b4a73d281a..8d3fa151fb0 100644 --- a/plugin/storage/badger/spanstore/cache_test.go +++ b/plugin/storage/badger/spanstore/cache_test.go @@ -10,8 +10,6 @@ import ( "github.com/dgraph-io/badger/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "github.com/jaegertracing/jaeger/model" ) /* @@ -20,7 +18,7 @@ import ( func TestExpiredItems(t *testing.T) { runWithBadger(t, func(store *badger.DB, t *testing.T) { - cache := NewCacheStore(store, time.Duration(-1*time.Hour), false) + cache := NewCacheStore(store, time.Duration(-1*time.Hour)) expireTime := uint64(time.Now().Add(cache.ttl).Unix()) @@ -55,45 +53,6 @@ func TestExpiredItems(t *testing.T) { }) } -func TestOldReads(t *testing.T) { - runWithBadger(t, func(store *badger.DB, t *testing.T) { - timeNow := model.TimeAsEpochMicroseconds(time.Now()) - s1Key := createIndexKey(serviceNameIndexKey, []byte("service1"), timeNow, model.TraceID{High: 0, Low: 0}) - s1o1Key := createIndexKey(operationNameIndexKey, []byte("service1operation1"), timeNow, model.TraceID{High: 0, Low: 0}) - - tid := time.Now().Add(1 * time.Minute) - - writer := func() { - store.Update(func(txn *badger.Txn) error { - txn.SetEntry(&badger.Entry{ - Key: s1Key, - ExpiresAt: uint64(tid.Unix()), - }) - txn.SetEntry(&badger.Entry{ - Key: s1o1Key, - ExpiresAt: uint64(tid.Unix()), - }) - return nil - }) - } - - cache := NewCacheStore(store, time.Duration(-1*time.Hour), false) - writer() - - nuTid := tid.Add(1 * time.Hour) - - cache.Update("service1", "operation1", uint64(tid.Unix())) - cache.services["service1"] = uint64(nuTid.Unix()) - cache.operations["service1"]["operation1"] = 
uint64(nuTid.Unix()) - - cache.populateCaches() - - // Now make sure we didn't use the older timestamps from the DB - assert.Equal(t, uint64(nuTid.Unix()), cache.services["service1"]) - assert.Equal(t, uint64(nuTid.Unix()), cache.operations["service1"]["operation1"]) - }) -} - // func runFactoryTest(tb testing.TB, test func(tb testing.TB, sw spanstore.Writer, sr spanstore.Reader)) { func runWithBadger(t *testing.T, test func(store *badger.DB, t *testing.T)) { opts := badger.DefaultOptions("") diff --git a/plugin/storage/badger/spanstore/reader.go b/plugin/storage/badger/spanstore/reader.go index 810f774a2c2..ef122c109ea 100644 --- a/plugin/storage/badger/spanstore/reader.go +++ b/plugin/storage/badger/spanstore/reader.go @@ -74,11 +74,18 @@ type executionPlan struct { } // NewTraceReader returns a TraceReader with cache -func NewTraceReader(db *badger.DB, c *CacheStore) *TraceReader { - return &TraceReader{ +func NewTraceReader(db *badger.DB, c *CacheStore, prefillCache bool) *TraceReader { + reader := &TraceReader{ store: db, cache: c, } + if prefillCache { + services := reader.preloadServices() + for _, service := range services { + reader.preloadOperations(service) + } + } + return reader } func decodeValue(val []byte, encodeType byte) (*model.Span, error) { @@ -612,3 +619,48 @@ func scanRangeFunction(it *badger.Iterator, indexEndValue []byte) bool { } return false } + +// preloadServices fills the cache with services after extracting from badger +func (r *TraceReader) preloadServices() []string { + var services []string + r.store.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + it := txn.NewIterator(opts) + defer it.Close() + + serviceKey := []byte{serviceNameIndexKey} + + // Seek all the services first + for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() { + timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64) + serviceName := 
string(it.Item().Key()[len(serviceKey):timestampStartIndex]) + keyTTL := it.Item().ExpiresAt() + services = append(services, serviceName) + r.cache.AddService(serviceName, keyTTL) + } + return nil + }) + return services +} + +// preloadOperations extracts all operations for a specified service +func (r *TraceReader) preloadOperations(service string) { + r.store.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + it := txn.NewIterator(opts) + defer it.Close() + + serviceKey := make([]byte, len(service)+1) + serviceKey[0] = operationNameIndexKey + copy(serviceKey[1:], service) + + // Seek all the operations for this service + for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() { + timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64) + operationName := string(it.Item().Key()[len(serviceKey):timestampStartIndex]) + keyTTL := it.Item().ExpiresAt() + r.cache.AddOperation(service, operationName, keyTTL) + } + return nil + }) +} diff --git a/plugin/storage/badger/spanstore/rw_internal_test.go b/plugin/storage/badger/spanstore/rw_internal_test.go index 01a78a820cd..4cf42a5dd29 100644 --- a/plugin/storage/badger/spanstore/rw_internal_test.go +++ b/plugin/storage/badger/spanstore/rw_internal_test.go @@ -23,9 +23,9 @@ func TestEncodingTypes(t *testing.T) { runWithBadger(t, func(store *badger.DB, t *testing.T) { testSpan := createDummySpan() - cache := NewCacheStore(store, time.Duration(1*time.Hour), true) + cache := NewCacheStore(store, time.Duration(1*time.Hour)) sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour)) - rw := NewTraceReader(store, cache) + rw := NewTraceReader(store, cache, true) sw.encodingType = jsonEncoding err := sw.WriteSpan(context.Background(), &testSpan) @@ -40,7 +40,7 @@ func TestEncodingTypes(t *testing.T) { runWithBadger(t, func(store *badger.DB, t *testing.T) { testSpan := createDummySpan() - cache := 
NewCacheStore(store, time.Duration(1*time.Hour)) sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour)) // rw := NewTraceReader(store, cache) @@ -53,9 +53,9 @@ func TestEncodingTypes(t *testing.T) { runWithBadger(t, func(store *badger.DB, t *testing.T) { testSpan := createDummySpan() - cache := NewCacheStore(store, time.Duration(1*time.Hour), true) + cache := NewCacheStore(store, time.Duration(1*time.Hour)) sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour)) - rw := NewTraceReader(store, cache) + rw := NewTraceReader(store, cache, true) err := sw.WriteSpan(context.Background(), &testSpan) require.NoError(t, err) @@ -92,9 +92,9 @@ func TestDecodeErrorReturns(t *testing.T) { func TestDuplicateTraceIDDetection(t *testing.T) { runWithBadger(t, func(store *badger.DB, t *testing.T) { testSpan := createDummySpan() - cache := NewCacheStore(store, time.Duration(1*time.Hour), true) + cache := NewCacheStore(store, time.Duration(1*time.Hour)) sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour)) - rw := NewTraceReader(store, cache) + rw := NewTraceReader(store, cache, true) origStartTime := testSpan.StartTime traceCount := 128 @@ -189,3 +189,43 @@ func TestMergeJoin(t *testing.T) { chk.Len(merged, 2) chk.Equal(uint32(2), binary.BigEndian.Uint32(merged[1])) } + +func TestOldReads(t *testing.T) { + runWithBadger(t, func(store *badger.DB, t *testing.T) { + timeNow := model.TimeAsEpochMicroseconds(time.Now()) + s1Key := createIndexKey(serviceNameIndexKey, []byte("service1"), timeNow, model.TraceID{High: 0, Low: 0}) + s1o1Key := createIndexKey(operationNameIndexKey, []byte("service1operation1"), timeNow, model.TraceID{High: 0, Low: 0}) + + tid := time.Now().Add(1 * time.Minute) + + writer := func() { + store.Update(func(txn *badger.Txn) error { + txn.SetEntry(&badger.Entry{ + Key: s1Key, + ExpiresAt: uint64(tid.Unix()), + }) + txn.SetEntry(&badger.Entry{ + Key: s1o1Key, + ExpiresAt: uint64(tid.Unix()), + }) + return nil + }) + } + + cache := 
NewCacheStore(store, time.Duration(-1*time.Hour)) + writer() + + nuTid := tid.Add(1 * time.Hour) + + cache.Update("service1", "operation1", uint64(tid.Unix())) + cache.services["service1"] = uint64(nuTid.Unix()) + cache.operations["service1"]["operation1"] = uint64(nuTid.Unix()) + + // This is equivalent to populating the caches of the cache store + _ = NewTraceReader(store, cache, true) + + // Now make sure we didn't use the older timestamps from the DB + assert.Equal(t, uint64(nuTid.Unix()), cache.services["service1"]) + assert.Equal(t, uint64(nuTid.Unix()), cache.operations["service1"]["operation1"]) + }) +}