
Implement Prometheus metrics for tracing path.
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>

This commit implements Prometheus metrics for the tracing module,
covering:
1. Ingest
2. Query
3. Cache

This commit also moves all shared metrics into pgmodel/metrics so that the
ingestor and querier can easily access them and reuse their labels (e.g.
'type').
Harkishen-Singh committed Feb 8, 2022
1 parent 8cd208b commit 53cc24e
Showing 6 changed files with 281 additions and 4 deletions.
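
The shared collectors in pkg/pgmodel/metrics are referenced throughout the diffs below, but their definitions are not part of this page. The following is a minimal sketch of how they might be declared; only the variable names and label sets are taken from the call sites in this commit, while the metric names, help strings, and buckets are assumptions:

// Hypothetical sketch of pkg/pgmodel/metrics; names and label sets are
// inferred from the call sites in this commit, everything else is assumed.
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/timescale/promscale/pkg/util"
)

var (
	RequestsTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: util.PromNamespace,
			Name:      "query_requests_total",
			Help:      "Total query requests by subsystem, handler and response code.",
		},
		[]string{"subsystem", "handler", "code"},
	)
	RequestsDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: util.PromNamespace,
			Name:      "query_requests_duration_seconds",
			Help:      "Duration of successful query requests.",
			Buckets:   prometheus.DefBuckets,
		},
		[]string{"subsystem", "handler"},
	)
	ActiveWriteRequests = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: util.PromNamespace,
			Name:      "ingest_active_write_requests",
			Help:      "Write requests currently in flight.",
		},
		[]string{"subsystem"},
	)
	IngestedTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: util.PromNamespace,
			Name:      "ingested_total",
			Help:      "Total items ingested, by type (e.g. spans).",
		},
		[]string{"type"},
	)
	// IngestDuration, InsertDuration (histograms over []string{"subsystem"})
	// and MaxSentTimestamp (a gauge over []string{"subsystem"}) would follow
	// the same pattern.
)

func init() {
	prometheus.MustRegister(RequestsTotal, RequestsDuration, ActiveWriteRequests, IngestedTotal)
}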
42 changes: 42 additions & 0 deletions pkg/jaeger/query/query.go
@@ -9,10 +9,13 @@ import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/metrics"
"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/telemetry"
)
@@ -41,45 +44,84 @@ func (p *Query) SpanWriter() spanstore.Writer {
}

func (p *Query) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_trace", "code": ""}).Inc()
start := time.Now()
res, err := getTrace(ctx, p.conn, traceID)
if err == nil {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_trace", "code": "200"}).Inc()
traceRequestsExec.Add(1)
metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "get_trace"}).Observe(time.Since(start).Seconds())
} else {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_trace", "code": "500"}).Inc()
}
return res, logError(err)
}

func (p *Query) GetServices(ctx context.Context) ([]string, error) {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_services", "code": ""}).Inc()
start := time.Now()
res, err := getServices(ctx, p.conn)
if err == nil {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_services", "code": "200"}).Inc()
metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "get_services"}).Observe(time.Since(start).Seconds())
} else {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_services", "code": "500"}).Inc()
}
return res, logError(err)
}

func (p *Query) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_operations", "code": ""}).Inc()
start := time.Now()
res, err := getOperations(ctx, p.conn, query)
if err == nil {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_operations", "code": "200"}).Inc()
metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "get_operations"}).Observe(time.Since(start).Seconds())
} else {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_operations", "code": "500"}).Inc()
}
return res, logError(err)
}

func (p *Query) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_traces", "code": ""}).Inc()
start := time.Now()
res, err := findTraces(ctx, p.conn, query)
if err == nil {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_traces", "code": "200"}).Inc()
traceExecutionTime.Observe(time.Since(start).Seconds())
traceRequestsExec.Add(1)
metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "find_traces"}).Observe(time.Since(start).Seconds())
} else {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_traces", "code": "500"}).Inc()
}
return res, logError(err)
}

func (p *Query) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_trace_ids", "code": ""}).Inc()
start := time.Now()
res, err := findTraceIDs(ctx, p.conn, query)
if err == nil {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_trace_ids", "code": "200"}).Inc()
traceRequestsExec.Add(1)
metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "find_trace_ids"}).Observe(time.Since(start).Seconds())
} else {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_trace_ids", "code": "500"}).Inc()
}
return res, logError(err)
}

func (p *Query) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_dependencies", "code": ""}).Inc()
start := time.Now()
res, err := getDependencies(ctx, p.conn, endTs, lookback)
if err == nil {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_dependencies", "code": "200"}).Inc()
dependencyRequestsExec.Add(1)
metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "get_dependencies"}).Observe(time.Since(start).Seconds())
} else {
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_dependencies", "code": "500"}).Inc()
}
return res, logError(err)
}
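
Every handler above repeats the same count-then-time choreography: count the attempt with an empty code, run the query, then record a 200 plus a duration sample, or a 500 on error. The commit keeps that repetition; purely as an illustration, the boilerplate could be hoisted into a wrapper like the sketch below (the instrument helper is hypothetical, not part of the commit):

// instrument is a hypothetical helper (not part of this commit) showing how
// the per-handler metrics boilerplate above could be hoisted into one place.
func instrument(handler string, fn func() error) error {
	code := func(c string) prometheus.Labels {
		return prometheus.Labels{"subsystem": "trace", "handler": handler, "code": c}
	}
	metrics.RequestsTotal.With(code("")).Inc() // count the attempt
	start := time.Now()
	if err := fn(); err != nil {
		metrics.RequestsTotal.With(code("500")).Inc()
		return err
	}
	metrics.RequestsTotal.With(code("200")).Inc()
	metrics.RequestsDuration.
		With(prometheus.Labels{"subsystem": "trace", "handler": handler}).
		Observe(time.Since(start).Seconds())
	return nil
}

// GetServices rewritten on top of instrument, for illustration:
func (p *Query) getServicesInstrumented(ctx context.Context) ([]string, error) {
	var res []string
	err := instrument("get_services", func() (err error) {
		res, err = getServices(ctx, p.conn)
		return err
	})
	return res, logError(err)
}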
119 changes: 119 additions & 0 deletions pkg/pgmodel/cache/metrics.go
@@ -0,0 +1,119 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package cache

import (
"fmt"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/timescale/promscale/pkg/util"
)

var (
Enabled = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Subsystem: "cache",
Name: "enabled",
Help: "Cache is enalbed or not.",
},
[]string{"subsystem", "name"}, // type => ["trace" or "metric"] and name => name of the cache i.e., metric cache, series cache, etc.
)
capacity = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Subsystem: "cache",
Name: "capacity",
Help: "Cache is enabled or not.",
},
[]string{"subsystem", "name"},
)
sizeBytes = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Subsystem: "cache",
Name: "size_bytes",
Help: "Cache size in bytes.",
},
[]string{"subsystem", "name"},
)
evictionsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: util.PromNamespace,
Subsystem: "cache",
Name: "evictions_total",
Help: "Total evictions in a clockcache.",
},
[]string{"subsystem", "name"},
)
)

func init() {
prometheus.MustRegister(
Enabled,
capacity,
sizeBytes,
evictionsTotal,
)
funcs.Store([]updateFunc{})
go metricsUpdater()
}

type MetricKind uint8

const (
Cap MetricKind = iota
Size
Evict
)

type updateFunc struct {
typ MetricKind
update func(metric prometheus.Collector)
}

var funcs atomic.Value

// RegisterUpdateFunc registers a callback that refreshes one of the cache
// metrics (capacity, size, evictions) on each tick of the updater routine.
// Earlier these metrics were updated by supplying a func to NewGaugeFunc,
// which Prometheus called on every scrape. Now that the metrics carry labels
// we must use NewGaugeVec, which offers no such callback mechanism, so a
// background routine (metricsUpdater) applies the registered callbacks
// periodically instead. See registerToMetrics in
// pkg/pgmodel/ingestor/trace/cache.go for a typical caller.
func RegisterUpdateFunc(kind MetricKind, update func(metric prometheus.Collector)) {
switch kind {
case Cap, Size, Evict:
default:
panic(fmt.Sprintf("invalid kind %d", kind))
}
l := funcs.Load().([]updateFunc)
funcs.Store(append(l, updateFunc{kind, update}))
}

func metricsUpdater() {
update := time.NewTicker(time.Second * 10)
defer update.Stop()
for range update.C {
registered := funcs.Load().([]updateFunc)
for _, f := range registered {
switch f.typ {
case Cap:
f.update(capacity)
case Size:
f.update(sizeBytes)
case Evict:
f.update(evictionsTotal)
}
}
}
}
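
One caveat on the registry above: RegisterUpdateFunc performs a load-append-store on the atomic.Value, which is race-free only while registrations are not concurrent (here they happen during writer construction, see NewWriter below). Should concurrent registration ever be needed, a mutex-guarded registry would be the safer shape. A sketch, not part of the commit, assuming an extra "sync" import:

// Hypothetical mutex-guarded registry: concurrent RegisterUpdateFunc calls
// cannot lose each other's appends, unlike load-append-store on atomic.Value.
var (
	funcsMu   sync.Mutex
	funcsList []updateFunc
)

func registerUpdateFuncSafe(kind MetricKind, update func(metric prometheus.Collector)) {
	funcsMu.Lock()
	defer funcsMu.Unlock()
	funcsList = append(funcsList, updateFunc{kind, update})
}

// The updater side would copy the slice under the same lock before iterating:
func snapshot() []updateFunc {
	funcsMu.Lock()
	defer funcsMu.Unlock()
	return append([]updateFunc(nil), funcsList...)
}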
2 changes: 1 addition & 1 deletion pkg/pgmodel/ingestor/dispatcher.go
@@ -275,7 +275,7 @@ func reportMetricsTelemetry(maxTs int64, numSamples, numMetadata uint64) {
return
}
atomic.StoreInt64(&MaxSentTimestamp, maxTs)
-metrics.MaxSentTimestamp.Set(float64(maxTs))
+metrics.StaleMaxSentTimestamp.Set(float64(maxTs))
}

// Get the handler for a given metric name, creating a new one if none exists
18 changes: 18 additions & 0 deletions pkg/pgmodel/ingestor/trace/cache.go
@@ -1,7 +1,9 @@
package trace

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/timescale/promscale/pkg/clockcache"
pgmodelCache "github.com/timescale/promscale/pkg/pgmodel/cache"
)

const (
@@ -26,3 +28,19 @@ func newInstrumentationLibraryCache() *clockcache.Cache {
func newTagCache() *clockcache.Cache {
return clockcache.WithMax(tagCacheSize)
}

func registerToMetrics(cacheKind string, c *clockcache.Cache) {
pgmodelCache.Enabled.With(prometheus.Labels{"subsystem": "trace", "name": cacheKind}).Set(1)
pgmodelCache.RegisterUpdateFunc(pgmodelCache.Cap, func(collector prometheus.Collector) {
metric := collector.(*prometheus.GaugeVec)
metric.With(prometheus.Labels{"subsystem": "trace", "name": cacheKind}).Set(float64(c.Cap()))
})
pgmodelCache.RegisterUpdateFunc(pgmodelCache.Size, func(collector prometheus.Collector) {
metric := collector.(*prometheus.GaugeVec)
metric.With(prometheus.Labels{"subsystem": "trace", "name": cacheKind}).Set(float64(c.SizeBytes()))
})
pgmodelCache.RegisterUpdateFunc(pgmodelCache.Evict, func(collector prometheus.Collector) {
metric := collector.(*prometheus.CounterVec)
metric.With(prometheus.Labels{"subsystem": "trace", "name": cacheKind}).Add(float64(c.Evictions()))
})
}
29 changes: 27 additions & 2 deletions pkg/pgmodel/ingestor/trace/writer.go
@@ -10,11 +10,14 @@ import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/timestamp"

"go.opentelemetry.io/collector/model/pdata"

"github.com/jackc/pgtype"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/timescale/promscale/pkg/clockcache"
"github.com/timescale/promscale/pkg/pgmodel/metrics"
"github.com/timescale/promscale/pkg/pgxconn"
tput "github.com/timescale/promscale/pkg/util/throughput"
)
@@ -57,13 +60,18 @@ type traceWriterImpl struct {
}

func NewWriter(conn pgxconn.PgxConn) *traceWriterImpl {
return &traceWriterImpl{
writer := &traceWriterImpl{
conn: conn,
schemaCache: newSchemaCache(),
instLibCache: newInstrumentationLibraryCache(),
opCache: newOperationCache(),
tagCache: newTagCache(),
}
registerToMetrics("schema", writer.schemaCache)
registerToMetrics("instrumentation_library", writer.instLibCache)
registerToMetrics("operation", writer.opCache)
registerToMetrics("tag", writer.tagCache)
return writer
}

func (t *traceWriterImpl) queueSpanLinks(linkBatch pgxconn.PgxBatch, tagsBatch tagBatch, links pdata.SpanLinkSlice, traceID pgtype.UUID, spanID pgtype.Int8, spanStartTime time.Time) error {
@@ -124,6 +132,13 @@ func getServiceName(rSpan pdata.ResourceSpans) string {
}

func (t *traceWriterImpl) InsertTraces(ctx context.Context, traces pdata.Traces) error {
ingestStart := time.Now()
metrics.ActiveWriteRequests.With(prometheus.Labels{"subsystem": "trace"}).Inc()
defer func() {
metrics.ActiveWriteRequests.With(prometheus.Labels{"subsystem": "trace"}).Dec()
metrics.IngestDuration.With(prometheus.Labels{"subsystem": "trace"}).Observe(time.Since(ingestStart).Seconds())
}()

rSpans := traces.ResourceSpans()

sURLBatch := newSchemaUrlBatch(t.schemaCache)
@@ -207,6 +222,7 @@ func (t *traceWriterImpl) InsertTraces(ctx context.Context, traces pdata.Traces)
return err
}

maxEndTimestamp := uint64(0)
spanBatch := t.conn.NewBatch()
linkBatch := t.conn.NewBatch()
eventBatch := t.conn.NewBatch()
@@ -265,6 +281,10 @@ func (t *traceWriterImpl) InsertTraces(ctx context.Context, traces pdata.Traces)

eventTimeRange := getEventTimeRange(span.Events())

if maxEndTimestamp < uint64(span.EndTimestamp()) {
maxEndTimestamp = uint64(span.EndTimestamp())
}

spanBatch.Queue(
insertSpanSQL,
traceID,
@@ -290,10 +310,15 @@ func (t *traceWriterImpl) InsertTraces(ctx context.Context, traces pdata.Traces)
}
}

start := time.Now()
if err := t.sendBatches(ctx, eventBatch, linkBatch, spanBatch); err != nil {
return fmt.Errorf("error sending trace batches: %w", err)
}

metrics.InsertDuration.With(prometheus.Labels{"subsystem": "trace"}).Observe(time.Since(start).Seconds())
metrics.IngestedTotal.With(prometheus.Labels{"type": "spans"}).Add(float64(traces.SpanCount()))
metrics.MaxSentTimestamp.With(prometheus.Labels{"subsystem": "trace"}).Set(float64(maxEndTimestamp))

// Only report telemetry if ingestion successful.
tput.ReportSpansProcessed(timestamp.FromTime(time.Now()), traces.SpanCount())
return nil
