diff --git a/pkg/jaeger/query/query.go b/pkg/jaeger/query/query.go
index b6d117f56c..80338c19be 100644
--- a/pkg/jaeger/query/query.go
+++ b/pkg/jaeger/query/query.go
@@ -9,10 +9,13 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
+
 	"github.com/jaegertracing/jaeger/model"
 	"github.com/jaegertracing/jaeger/storage/dependencystore"
 	"github.com/jaegertracing/jaeger/storage/spanstore"
 	"github.com/timescale/promscale/pkg/log"
+	"github.com/timescale/promscale/pkg/pgmodel/metrics"
 	"github.com/timescale/promscale/pkg/pgxconn"
 	"github.com/timescale/promscale/pkg/telemetry"
 )
@@ -41,45 +44,84 @@ func (p *Query) SpanWriter() spanstore.Writer {
 }
 
 func (p *Query) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
+	metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_trace", "code": ""}).Inc()
+	start := time.Now()
 	res, err := getTrace(ctx, p.conn, traceID)
 	if err == nil {
+		metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_trace", "code": "200"}).Inc()
 		traceRequestsExec.Add(1)
+		metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "get_trace"}).Observe(time.Since(start).Seconds())
+	} else {
+		metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_trace", "code": "500"}).Inc()
 	}
 	return res, logError(err)
 }
 
 func (p *Query) GetServices(ctx context.Context) ([]string, error) {
+	metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_services", "code": ""}).Inc()
+	start := time.Now()
 	res, err := getServices(ctx, p.conn)
+	if err == nil {
+		metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_services", "code": "200"}).Inc()
+		metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "get_services"}).Observe(time.Since(start).Seconds())
+	} else {
+		metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_services", "code": "500"}).Inc()
+	}
 	return res, logError(err)
 }
 
 func (p *Query) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) {
+	metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_operations", "code": ""}).Inc()
+	start := time.Now()
 	res, err := getOperations(ctx, p.conn, query)
+	if err == nil {
+		metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_operations", "code": "200"}).Inc()
+		metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "get_operations"}).Observe(time.Since(start).Seconds())
+	} else {
+		metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_operations", "code": "500"}).Inc()
+	}
 	return res, logError(err)
 }
 
 func (p *Query) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
+	metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_traces", "code": ""}).Inc()
 	start := time.Now()
 	res, err := findTraces(ctx, p.conn, query)
 	if err == nil {
+		metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_traces", "code": "200"}).Inc()
 		traceExecutionTime.Observe(time.Since(start).Seconds())
 		traceRequestsExec.Add(1)
+		metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "find_traces"}).Observe(time.Since(start).Seconds())
+	} else {
+		metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_traces", "code": "500"}).Inc()
 	}
 	return res, logError(err)
 }
 
 func (p *Query) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
+	metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_trace_ids", "code": ""}).Inc()
+	start := time.Now()
 	res, err := findTraceIDs(ctx, p.conn, query)
 	if err == nil {
+		metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_trace_ids", "code": "200"}).Inc()
 		traceRequestsExec.Add(1)
+		metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "find_trace_ids"}).Observe(time.Since(start).Seconds())
+	} else {
+		metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_trace_ids", "code": "500"}).Inc()
 	}
 	return res, logError(err)
 }
 
 func (p *Query) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
+	metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_dependencies", "code": ""}).Inc()
+	start := time.Now()
 	res, err := getDependencies(ctx, p.conn, endTs, lookback)
 	if err == nil {
+		metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_dependencies", "code": "200"}).Inc()
 		dependencyRequestsExec.Add(1)
+		metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "get_dependencies"}).Observe(time.Since(start).Seconds())
+	} else {
+		metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_dependencies", "code": "500"}).Inc()
 	}
 	return res, logError(err)
 }
diff --git a/pkg/pgmodel/cache/metrics.go b/pkg/pgmodel/cache/metrics.go
new file mode 100644
index 0000000000..08d4caf44c
--- /dev/null
+++ b/pkg/pgmodel/cache/metrics.go
@@ -0,0 +1,119 @@
+// This file and its contents are licensed under the Apache License 2.0.
+// Please see the included NOTICE for copyright information and
+// LICENSE for a copy of the license.
+
+package cache
+
+import (
+	"fmt"
+	"sync/atomic"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/timescale/promscale/pkg/util"
+)
+
+var (
+	Enabled = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: util.PromNamespace,
+			Subsystem: "cache",
+			Name:      "enabled",
+			Help:      "Cache is enabled or not.",
+		},
+		[]string{"subsystem", "name"}, // subsystem => "trace" or "metric"; name => name of the cache, i.e., metric cache, series cache, etc.
+	)
+	capacity = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: util.PromNamespace,
+			Subsystem: "cache",
+			Name:      "capacity",
+			Help:      "Maximum number of elements that the cache can hold.",
+		},
+		[]string{"subsystem", "name"},
+	)
+	sizeBytes = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: util.PromNamespace,
+			Subsystem: "cache",
+			Name:      "size_bytes",
+			Help:      "Cache size in bytes.",
+		},
+		[]string{"subsystem", "name"},
+	)
+	evictionsTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: util.PromNamespace,
+			Subsystem: "cache",
+			Name:      "evictions_total",
+			Help:      "Total evictions in a clockcache.",
+		},
+		[]string{"subsystem", "name"},
+	)
+)
+
+func init() {
+	prometheus.MustRegister(
+		Enabled,
+		capacity,
+		sizeBytes,
+		evictionsTotal,
+	)
+	funcs.Store([]updateFunc{})
+	go metricsUpdater()
+}
+
+const (
+	Cap = iota
+	Size
+	Evict
+)
+
+type MetricKind uint8
+
+type updateFunc struct {
+	typ    MetricKind
+	update func(metric prometheus.Collector)
+}
+
+var funcs atomic.Value
+
+// RegisterUpdateFunc registers a function that updates a cache metric such as sizeBytes or
+// capacity. Registered functions are run by a background routine every 10 seconds.
+// Earlier these metrics were updated by supplying a func to NewGaugeFunc, which Prometheus
+// called on every scrape. Now that the metrics carry labels we have to use NewGaugeVec,
+// which does not accept such a func, so we update them from a background routine instead.
+func RegisterUpdateFunc(kind MetricKind, update func(metric prometheus.Collector)) {
+	l := funcs.Load().([]updateFunc)
+	switch kind {
+	case Cap:
+		l = append(l, updateFunc{kind, update})
+	case Size:
+		l = append(l, updateFunc{kind, update})
+	case Evict:
+		l = append(l, updateFunc{kind, update})
+	default:
+		panic(fmt.Sprintf("invalid kind %d", kind))
+	}
+	funcs.Store(l)
+}
+
+func metricsUpdater() {
+	update := time.NewTicker(time.Second * 10)
+	defer update.Stop()
+	for range update.C {
+		if len(funcs.Load().([]updateFunc)) == 0 {
+			continue
+		}
+		needUpdate := funcs.Load().([]updateFunc)
+		for _, f := range needUpdate {
+			switch f.typ {
+			case Cap:
+				f.update(capacity)
+			case Size:
+				f.update(sizeBytes)
+			case Evict:
+				f.update(evictionsTotal)
+			}
+		}
+	}
+}
diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go
index 3a0b74b9fd..464e3f5d5d 100644
--- a/pkg/pgmodel/ingestor/dispatcher.go
+++ b/pkg/pgmodel/ingestor/dispatcher.go
@@ -275,7 +275,7 @@ func reportMetricsTelemetry(maxTs int64, numSamples, numMetadata uint64) {
 		return
 	}
 	atomic.StoreInt64(&MaxSentTimestamp, maxTs)
-	metrics.MaxSentTimestamp.Set(float64(maxTs))
+	metrics.StaleMaxSentTimestamp.Set(float64(maxTs))
 }
 
 // Get the handler for a given metric name, creating a new one if none exists
diff --git a/pkg/pgmodel/ingestor/trace/cache.go b/pkg/pgmodel/ingestor/trace/cache.go
index 59b8eb5559..0007f83e54 100644
--- a/pkg/pgmodel/ingestor/trace/cache.go
+++ b/pkg/pgmodel/ingestor/trace/cache.go
@@ -1,7 +1,9 @@
 package trace
 
 import (
+	"github.com/prometheus/client_golang/prometheus"
+
 	"github.com/timescale/promscale/pkg/clockcache"
+	pgmodelCache "github.com/timescale/promscale/pkg/pgmodel/cache"
 )
 
 const (
@@ -26,3 +28,19 @@ func newInstrumentationLibraryCache() *clockcache.Cache {
 func newTagCache() *clockcache.Cache {
 	return clockcache.WithMax(tagCacheSize)
 }
+
+func registerToMetrics(cacheKind string, c *clockcache.Cache) {
+	pgmodelCache.Enabled.With(prometheus.Labels{"subsystem": "trace", "name": cacheKind}).Set(1) // mark this cache as enabled
+	pgmodelCache.RegisterUpdateFunc(pgmodelCache.Cap, func(collector prometheus.Collector) {
+		metric := collector.(*prometheus.GaugeVec)
+		metric.With(prometheus.Labels{"subsystem": "trace", "name": cacheKind}).Set(float64(c.Cap()))
+	})
+	pgmodelCache.RegisterUpdateFunc(pgmodelCache.Size, func(collector prometheus.Collector) {
+		metric := collector.(*prometheus.GaugeVec)
+		metric.With(prometheus.Labels{"subsystem": "trace", "name": cacheKind}).Set(float64(c.SizeBytes()))
+	})
+	pgmodelCache.RegisterUpdateFunc(pgmodelCache.Evict, func(collector prometheus.Collector) {
+		metric := collector.(*prometheus.CounterVec)
+		metric.With(prometheus.Labels{"subsystem": "trace", "name": cacheKind}).Add(float64(c.Evictions()))
+	})
+}
diff --git a/pkg/pgmodel/ingestor/trace/writer.go b/pkg/pgmodel/ingestor/trace/writer.go
index f2ca8cac15..b85d00d947 100644
--- a/pkg/pgmodel/ingestor/trace/writer.go
+++ b/pkg/pgmodel/ingestor/trace/writer.go
@@ -10,11 +10,14 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/model/timestamp"
+	"go.opentelemetry.io/collector/model/pdata"
+
 	"github.com/jackc/pgtype"
-	"github.com/prometheus/prometheus/model/timestamp"
 
 	"github.com/timescale/promscale/pkg/clockcache"
+	"github.com/timescale/promscale/pkg/pgmodel/metrics"
 	"github.com/timescale/promscale/pkg/pgxconn"
 	tput "github.com/timescale/promscale/pkg/util/throughput"
 )
@@ -57,13 +60,18 @@ type traceWriterImpl struct {
 }
 
 func NewWriter(conn pgxconn.PgxConn) *traceWriterImpl {
-	return &traceWriterImpl{
+	writer := &traceWriterImpl{
 		conn:         conn,
 		schemaCache:  newSchemaCache(),
 		instLibCache: newInstrumentationLibraryCache(),
 		opCache:      newOperationCache(),
 		tagCache:     newTagCache(),
 	}
+	registerToMetrics("schema", writer.schemaCache)
+	registerToMetrics("instrumentation_library", writer.instLibCache)
+	registerToMetrics("operation", writer.opCache)
+	registerToMetrics("tag", writer.tagCache)
+	return writer
 }
 
 func (t *traceWriterImpl) queueSpanLinks(linkBatch pgxconn.PgxBatch, tagsBatch tagBatch, links pdata.SpanLinkSlice, traceID pgtype.UUID, spanID pgtype.Int8, spanStartTime time.Time) error {
@@ -124,6 +132,13 @@ func getServiceName(rSpan pdata.ResourceSpans) string {
 }
 
 func (t *traceWriterImpl) InsertTraces(ctx context.Context, traces pdata.Traces) error {
+	ingestStart := time.Now()
+	metrics.ActiveWriteRequests.With(prometheus.Labels{"subsystem": "trace"}).Inc()
+	defer func() {
+		metrics.ActiveWriteRequests.With(prometheus.Labels{"subsystem": "trace"}).Dec()
+		metrics.IngestDuration.With(prometheus.Labels{"subsystem": "trace"}).Observe(time.Since(ingestStart).Seconds())
+	}()
+
 	rSpans := traces.ResourceSpans()
 
 	sURLBatch := newSchemaUrlBatch(t.schemaCache)
@@ -207,6 +222,7 @@ func (t *traceWriterImpl) InsertTraces(ctx context.Context, traces pdata.Traces)
 		return err
 	}
 
+	maxEndTimestamp := uint64(0)
 	spanBatch := t.conn.NewBatch()
 	linkBatch := t.conn.NewBatch()
 	eventBatch := t.conn.NewBatch()
@@ -265,6 +281,10 @@ func (t *traceWriterImpl) InsertTraces(ctx context.Context, traces pdata.Traces)
 
 			eventTimeRange := getEventTimeRange(span.Events())
 
+			if maxEndTimestamp < uint64(span.EndTimestamp()) {
+				maxEndTimestamp = uint64(span.EndTimestamp())
+			}
+
 			spanBatch.Queue(
 				insertSpanSQL,
 				traceID,
@@ -290,10 +310,15 @@ func (t *traceWriterImpl) InsertTraces(ctx context.Context, traces pdata.Traces)
 		}
 	}
 
+	start := time.Now()
 	if err := t.sendBatches(ctx, eventBatch, linkBatch, spanBatch); err != nil {
 		return fmt.Errorf("error sending trace batches: %w", err)
 	}
+	metrics.InsertDuration.With(prometheus.Labels{"subsystem": "trace"}).Observe(time.Since(start).Seconds())
+	metrics.IngestedTotal.With(prometheus.Labels{"type": "spans"}).Add(float64(traces.SpanCount()))
+	metrics.MaxSentTimestamp.With(prometheus.Labels{"subsystem": "trace"}).Set(float64(maxEndTimestamp) / 1e6) // span end timestamps are nanoseconds; the metric reports milliseconds
+
 	// Only report telemetry if ingestion successful.
 	tput.ReportSpansProcessed(timestamp.FromTime(time.Now()), traces.SpanCount())
 	return nil
diff --git a/pkg/pgmodel/metrics/metrics.go b/pkg/pgmodel/metrics/metrics.go
index c99736d471..0b88efd092 100644
--- a/pkg/pgmodel/metrics/metrics.go
+++ b/pkg/pgmodel/metrics/metrics.go
@@ -58,13 +58,79 @@ var (
 			Help: "Total number of times leader changed per cluster.",
 		}, []string{"cluster"})
-	MaxSentTimestamp = prometheus.NewGauge(
+	StaleMaxSentTimestamp = prometheus.NewGauge( // This will be deprecated in the renaming PR.
 		prometheus.GaugeOpts{
 			Namespace: util.PromNamespace,
 			Name:      "max_sent_timestamp_milliseconds",
 			Help:      "Maximum sample timestamp that Promscale sent to the database.",
 		},
 	)
+	IngestedTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: util.PromNamespace,
+			Subsystem: "ingest",
+			Name:      "ingested_total",
+			Help:      "Total number of insertables ingested into the database.",
+		},
+		[]string{"type"},
+	)
+	ActiveWriteRequests = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: util.PromNamespace,
+			Subsystem: "ingest",
+			Name:      "active_write_requests",
+			Help:      "Number of write requests that are active in the ingestion pipeline.",
+		},
+		[]string{"subsystem"},
+	)
+	InsertDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: util.PromNamespace,
+			Subsystem: "ingest",
+			Name:      "insert_duration_seconds",
+			Help:      "Time taken to insert a batch of samples or traces into the database.",
+			Buckets:   []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 50, 100, 250, 500, 1000, 2500},
+		},
+		[]string{"subsystem"},
+	)
+	IngestDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: util.PromNamespace,
+			Subsystem: "ingest",
+			Name:      "duration_seconds",
+			Help:      "Time taken to process (including filling up caches) and insert a batch of samples or traces into the database.",
+			Buckets:   []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 50, 100, 250, 500, 1000, 2500},
+		},
+		[]string{"subsystem"},
+	)
+	MaxSentTimestamp = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: util.PromNamespace,
+			Subsystem: "ingest",
+			Name:      "max_sent_timestamp_milliseconds",
+			Help:      "Maximum timestamp (in milliseconds) sent to the database. For samples it is the sample timestamp; for traces it is the maximum span end timestamp.",
+		},
+		[]string{"subsystem"},
+	)
+	RequestsDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: util.PromNamespace,
+			Subsystem: "query",
+			Name:      "duration_seconds",
+			Help:      "Time taken by the handler to respond to a query.",
+			Buckets:   []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 50, 100, 250, 500, 1000, 2500},
+		},
+		[]string{"subsystem", "handler"},
+	)
+	RequestsTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: util.PromNamespace,
+			Subsystem: "query",
+			Name:      "requests_total",
+			Help:      "Total number of query requests.",
+		},
+		[]string{"subsystem", "handler", "code"},
+	)
 )
 
 func init() {
@@ -76,6 +142,13 @@ func init() {
 		DuplicateMetrics,
 		HAClusterLeaderDetails,
 		NumOfHAClusterLeaderChanges,
+		StaleMaxSentTimestamp,
+		IngestedTotal,
+		ActiveWriteRequests,
+		InsertDuration,
+		IngestDuration,
 		MaxSentTimestamp,
+		RequestsDuration,
+		RequestsTotal,
 	)
 }