Support exporting quantile for summary metrics #17265

Merged: 10 commits, Feb 24, 2023
16 changes: 16 additions & 0 deletions .chloggen/support-export-quantiles-for-summary-metrics.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awsemfexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support exporting quantile for summary metrics

# One or more tracking issues related to the change
issues: [17265]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 2 additions & 0 deletions exporter/awsemfexporter/README.md
@@ -33,10 +33,12 @@ The following exporter configuration parameters are supported.
| `dimension_rollup_option` | DimensionRollupOption is the option for metrics dimension rollup. Three options are available: `NoDimensionRollup`, `SingleDimensionRollupOnly` and `ZeroAndSingleDimensionRollup` |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)|
| `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config option, `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See the `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` |
| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available: "cloudwatch" and "stdout". | `cloudwatch` |
| `detailed_metrics` | "detailed_metrics" is the option for retaining detailed datapoint values in exported metrics (e.g. instead of exporting a quantile as a statistical value, preserve the quantile's population) | `false` |
| `parse_json_encoded_attr_values` | List of attribute keys whose corresponding values are JSON-encoded strings and will be converted to JSON structures in emf logs. For example, the attribute string value "{\\"x\\":5,\\"y\\":6}" will be converted to a json object: ```{"x": 5, "y": 6}``` | [ ] |
| [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] |
| [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors. | [ ]|


### metric_declaration
A metric_declaration section characterizes a rule to be used to set dimensions for exported metrics, filtered by the incoming metrics' labels and metric names.

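To show how the new flag is wired in practice, here is a minimal collector configuration sketch; the awsemf exporter accepts many more fields, and the log group and stream names below are hypothetical:

```yaml
exporters:
  awsemf:
    # Hypothetical names for illustration only.
    log_group_name: "/metrics/example-group"
    log_stream_name: "example-stream"
    # Added in this PR: when true, each summary quantile is exported as its
    # own datapoint instead of being collapsed into a statistical set.
    detailed_metrics: true
```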
4 changes: 4 additions & 0 deletions exporter/awsemfexporter/config.go
@@ -79,6 +79,10 @@ type Config struct {
// If enabled, all the resource attributes will be converted to metric labels by default.
ResourceToTelemetrySettings resourcetotelemetry.Settings `mapstructure:"resource_to_telemetry_conversion"`

// DetailedMetrics is the option for retaining detailed datapoint values in exported metrics (e.g. instead of exporting a quantile as a statistical value,
// preserve the quantile's population)
DetailedMetrics bool `mapstructure:"detailed_metrics"`

// logger is the Logger used for writing error/warning logs
logger *zap.Logger
}
136 changes: 79 additions & 57 deletions exporter/awsemfexporter/datapoint.go
@@ -15,17 +15,27 @@
package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"

import (
"fmt"
"strconv"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
"golang.org/x/exp/maps"

aws "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
)

var deltaMetricCalculator = aws.NewFloat64DeltaCalculator()
var summaryMetricCalculator = aws.NewMetricCalculator(calculateSummaryDelta)
const (
summaryCountSuffix = "_count"
summarySumSuffix = "_sum"
)

var (
deltaMetricCalculator = aws.NewFloat64DeltaCalculator()
summaryMetricCalculator = aws.NewMetricCalculator(calculateSummaryDelta)
)

func calculateSummaryDelta(prev *aws.MetricValue, val interface{}, timestampMs time.Time) (interface{}, bool) {
metricEntry := val.(summaryMetricEntry)
@@ -43,22 +53,24 @@ func calculateSummaryDelta(prev *aws.MetricValue, val interface{}, timestampMs time.Time) (interface{}, bool) {

// dataPoint represents a processed metric data point
type dataPoint struct {
name string
value interface{}
labels map[string]string
timestampMs int64
}

// dataPoints is a wrapper interface for:
// dataPointSlice is a wrapper interface for:
// - pmetric.NumberDataPointSlice
// - pmetric.HistogramDataPointSlice
// - pmetric.SummaryDataPointSlice
type dataPoints interface {
Len() int
// At gets the adjusted datapoint from the DataPointSlice at i-th index.
// CalculateDeltaDatapoints calculates the delta datapoint from the DataPointSlice at i-th index
// for some type (Counter, Summary)
// dataPoint: the adjusted data point
// retained: indicates whether the data point is valid for further processing
// NOTE: It is an expensive call as it calculates the metric value.
At(i int) (dataPoint dataPoint, retained bool)
CalculateDeltaDatapoints(i int, instrumentationScopeName string, detailedMetrics bool) (dataPoint []dataPoint, retained bool)
}

// deltaMetricMetadata contains the metadata required to perform rate/delta calculation
@@ -72,20 +84,20 @@ type deltaMetricMetadata struct {

// numberDataPointSlice is a wrapper for pmetric.NumberDataPointSlice
type numberDataPointSlice struct {
instrumentationLibraryName string
deltaMetricMetadata
pmetric.NumberDataPointSlice
}

// histogramDataPointSlice is a wrapper for pmetric.HistogramDataPointSlice
type histogramDataPointSlice struct {
instrumentationLibraryName string
// Todo:(khanhntd) Calculate delta value for count and sum value with histogram
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18245
deltaMetricMetadata
pmetric.HistogramDataPointSlice
}

// summaryDataPointSlice is a wrapper for pmetric.SummaryDataPointSlice
type summaryDataPointSlice struct {
instrumentationLibraryName string
deltaMetricMetadata
pmetric.SummaryDataPointSlice
}
@@ -95,10 +107,10 @@ type summaryMetricEntry struct {
count uint64
}

// At retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
func (dps numberDataPointSlice) At(i int) (dataPoint, bool) {
// CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, detailedMetrics bool) ([]dataPoint, bool) {
metric := dps.NumberDataPointSlice.At(i)
labels := createLabels(metric.Attributes(), dps.instrumentationLibraryName)
labels := createLabels(metric.Attributes(), instrumentationScopeName)
timestampMs := unixNanoToMilliseconds(metric.Timestamp())

var metricVal float64
@@ -110,12 +122,13 @@ func (dps numberDataPointSlice) At(i int) (dataPoint, bool) {
}

retained := true

if dps.adjustToDelta {
var deltaVal interface{}
mKey := aws.NewKey(dps.deltaMetricMetadata, labels)
deltaVal, retained = deltaMetricCalculator.Calculate(mKey, metricVal, metric.Timestamp().AsTime())
if !retained {
return dataPoint{}, retained
return nil, retained
}
// It should not happen in practice that the previous metric value is smaller than the current one.
// If it happens, we assume that the metric is reset for some reason.
@@ -124,20 +137,17 @@ func (dps numberDataPointSlice) At(i int) (dataPoint, bool) {
}
}

return dataPoint{
value: metricVal,
labels: labels,
timestampMs: timestampMs,
}, retained
return []dataPoint{{name: dps.metricName, value: metricVal, labels: labels, timestampMs: timestampMs}}, retained
}

// At retrieves the HistogramDataPoint at the given index.
func (dps histogramDataPointSlice) At(i int) (dataPoint, bool) {
// CalculateDeltaDatapoints retrieves the HistogramDataPoint at the given index.
func (dps histogramDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, detailedMetrics bool) ([]dataPoint, bool) {
metric := dps.HistogramDataPointSlice.At(i)
labels := createLabels(metric.Attributes(), dps.instrumentationLibraryName)
labels := createLabels(metric.Attributes(), instrumentationScopeName)
timestamp := unixNanoToMilliseconds(metric.Timestamp())

return dataPoint{
return []dataPoint{{
name: dps.metricName,
value: &cWMetricStats{
Count: metric.Count(),
Sum: metric.Sum(),
@@ -146,45 +156,57 @@ func (dps histogramDataPointSlice) At(i int) (dataPoint, bool) {
},
labels: labels,
timestampMs: timestamp,
}, true
}}, true
}

// At retrieves the SummaryDataPoint at the given index.
func (dps summaryDataPointSlice) At(i int) (dataPoint, bool) {
// CalculateDeltaDatapoints retrieves the SummaryDataPoint at the given index and performs delta calculation on the sum and count while retaining the quantile values.
func (dps summaryDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, detailedMetrics bool) ([]dataPoint, bool) {
metric := dps.SummaryDataPointSlice.At(i)
labels := createLabels(metric.Attributes(), dps.instrumentationLibraryName)
labels := createLabels(metric.Attributes(), instrumentationScopeName)
timestampMs := unixNanoToMilliseconds(metric.Timestamp())

sum := metric.Sum()
count := metric.Count()

retained := true
datapoints := []dataPoint{}

if dps.adjustToDelta {
var delta interface{}
mKey := aws.NewKey(dps.deltaMetricMetadata, labels)

delta, retained = summaryMetricCalculator.Calculate(mKey, summaryMetricEntry{sum, count}, metric.Timestamp().AsTime())
if !retained {
return dataPoint{}, retained
return datapoints, retained
}
summaryMetricDelta := delta.(summaryMetricEntry)
sum = summaryMetricDelta.sum
count = summaryMetricDelta.count
}

metricVal := &cWMetricStats{
Count: count,
Sum: sum,
}
if quantileValues := metric.QuantileValues(); quantileValues.Len() > 0 {
metricVal.Min = quantileValues.At(0).Value()
metricVal.Max = quantileValues.At(quantileValues.Len() - 1).Value()
if detailedMetrics {
// Instead of sending metrics as a Statistical Set (containing min, max, count, and sum), the emfexporter will enrich the
// values by sending each quantile value as a separate datapoint (from quantile 0 ... 1)
values := metric.QuantileValues()
datapoints = append(datapoints, dataPoint{name: fmt.Sprint(dps.metricName, summarySumSuffix), value: sum, labels: labels, timestampMs: timestampMs})
datapoints = append(datapoints, dataPoint{name: fmt.Sprint(dps.metricName, summaryCountSuffix), value: count, labels: labels, timestampMs: timestampMs})

for i := 0; i < values.Len(); i++ {
cLabels := maps.Clone(labels)
quantile := values.At(i)
cLabels["quantile"] = strconv.FormatFloat(quantile.Quantile(), 'g', -1, 64)
datapoints = append(datapoints, dataPoint{name: dps.metricName, value: quantile.Value(), labels: cLabels, timestampMs: timestampMs})
}
} else {
metricVal := &cWMetricStats{Count: count, Sum: sum}
if quantileValues := metric.QuantileValues(); quantileValues.Len() > 0 {
metricVal.Min = quantileValues.At(0).Value()
metricVal.Max = quantileValues.At(quantileValues.Len() - 1).Value()
}
datapoints = append(datapoints, dataPoint{name: dps.metricName, value: metricVal, labels: labels, timestampMs: timestampMs})
}

return dataPoint{
value: metricVal,
labels: labels,
timestampMs: timestampMs,
}, retained
return datapoints, retained
}
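As a standalone illustration of the branch above, the sketch below shows how one summary datapoint fans out into `_sum`, `_count`, and per-quantile datapoints under `detailed_metrics`. The helper `expandSummary` and the `point` type are hypothetical stand-ins, not the exporter's code:

```go
package main

import (
	"fmt"
	"strconv"

	"golang.org/x/exp/maps"
)

// point mirrors the exporter's unexported dataPoint shape, redefined here so
// the sketch is self-contained.
type point struct {
	name   string
	value  float64
	labels map[string]string
}

type quantile struct{ q, v float64 }

// expandSummary is a hypothetical helper mirroring the detailed_metrics branch:
// one summary becomes <name>_sum, <name>_count, and one datapoint per quantile,
// each tagged with a "quantile" label on its own cloned label set.
func expandSummary(name string, sum float64, count uint64, qs []quantile, labels map[string]string) []point {
	out := []point{
		{name: name + "_sum", value: sum, labels: labels},
		{name: name + "_count", value: float64(count), labels: labels},
	}
	for _, qv := range qs {
		l := maps.Clone(labels) // clone so datapoints do not share a mutated map
		l["quantile"] = strconv.FormatFloat(qv.q, 'g', -1, 64)
		out = append(out, point{name: name, value: qv.v, labels: l})
	}
	return out
}

func main() {
	labels := map[string]string{"service": "checkout"}
	for _, p := range expandSummary("http_latency", 12.5, 4, []quantile{{0, 1.2}, {0.5, 2.9}, {1, 6.1}}, labels) {
		fmt.Println(p.name, p.value, p.labels)
	}
}
```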

// createLabels converts OTel AttributesMap attributes to a map
@@ -197,43 +219,43 @@ func createLabels(attributes pcommon.Map, instrLibName string) map[string]string
})

// Add OTel instrumentation lib name as an additional label if it is defined
if instrLibName != noInstrumentationLibraryName {
if instrLibName != "" {
labels[oTellibDimensionKey] = instrLibName
}

return labels
}

// getDataPoints retrieves data points from OT Metric.
func getDataPoints(pmd pmetric.Metric, metadata cWMetricMetadata, logger *zap.Logger) (dps dataPoints) {
adjusterMetadata := deltaMetricMetadata{
false,
pmd.Name(),
metadata.namespace,
metadata.logGroup,
metadata.logStream,
func getDataPoints(pmd pmetric.Metric, metadata cWMetricMetadata, logger *zap.Logger) dataPoints {
metricMetadata := deltaMetricMetadata{
adjustToDelta: false,
metricName: pmd.Name(),
namespace: metadata.namespace,
logGroup: metadata.logGroup,
logStream: metadata.logStream,
}

var dps dataPoints

switch pmd.Type() {
case pmetric.MetricTypeGauge:
metric := pmd.Gauge()
dps = numberDataPointSlice{
metadata.instrumentationLibraryName,
adjusterMetadata,
metricMetadata,
metric.DataPoints(),
}
case pmetric.MetricTypeSum:
metric := pmd.Sum()
adjusterMetadata.adjustToDelta = metric.AggregationTemporality() == pmetric.AggregationTemporalityCumulative
metricMetadata.adjustToDelta = metric.AggregationTemporality() == pmetric.AggregationTemporalityCumulative
dps = numberDataPointSlice{
metadata.instrumentationLibraryName,
adjusterMetadata,
metricMetadata,
metric.DataPoints(),
}
case pmetric.MetricTypeHistogram:
metric := pmd.Histogram()
dps = histogramDataPointSlice{
metadata.instrumentationLibraryName,
metricMetadata,
metric.DataPoints(),
}
case pmetric.MetricTypeSummary:
@@ -244,10 +266,9 @@ func getDataPoints(pmd pmetric.Metric, metadata cWMetricMetadata, logger *zap.Logger) (dps dataPoints) {
// attribute processor) from resource metrics. If it exists, and equals to prometheus, the sum and count will be
// converted.
// For more information: https://github.com/open-telemetry/opentelemetry-collector/blob/main/receiver/prometheusreceiver/DESIGN.md#summary
adjusterMetadata.adjustToDelta = metadata.receiver == prometheusReceiver
metricMetadata.adjustToDelta = metadata.receiver == prometheusReceiver
dps = summaryDataPointSlice{
metadata.instrumentationLibraryName,
adjusterMetadata,
metricMetadata,
metric.DataPoints(),
}
default:
@@ -257,5 +278,6 @@ func getDataPoints(pmd pmetric.Metric, metadata cWMetricMetadata, logger *zap.Logger) (dps dataPoints) {
zap.String("Unit", pmd.Unit()),
)
}
return

return dps
}
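The calling side is not part of this diff. To make the new contract concrete, here is a self-contained sketch of how a consumer iterates a `dataPoints` value and handles the fan-out and `retained` semantics; the `cumulativeSlice` implementation and `demo_metric` name are invented:

```go
package main

import "fmt"

// dataPoint and dataPoints re-declare the exporter's unexported types so this
// sketch compiles on its own.
type dataPoint struct {
	name  string
	value float64
}

type dataPoints interface {
	Len() int
	CalculateDeltaDatapoints(i int, scopeName string, detailedMetrics bool) ([]dataPoint, bool)
}

// cumulativeSlice is an invented stand-in: it converts a cumulative series to
// deltas, dropping the first point the way the real calculators do when no
// previous value exists yet.
type cumulativeSlice []float64

func (s cumulativeSlice) Len() int { return len(s) }

func (s cumulativeSlice) CalculateDeltaDatapoints(i int, scope string, detailed bool) ([]dataPoint, bool) {
	if i == 0 {
		return nil, false // retained=false: baseline observation only
	}
	// One input point may fan out into several output datapoints (e.g. summary
	// quantiles in detailed mode); here it is always exactly one.
	return []dataPoint{{name: "demo_metric", value: s[i] - s[i-1]}}, true
}

func main() {
	var dps dataPoints = cumulativeSlice{10, 12, 15}
	for i := 0; i < dps.Len(); i++ {
		points, retained := dps.CalculateDeltaDatapoints(i, "my.scope", false)
		if !retained {
			continue
		}
		for _, p := range points {
			fmt.Println(p.name, p.value)
		}
	}
}
```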