Skip to content

[processor/transform] Add functionality to break up a summary metric into Sum metrics. #11041

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 21, 2022
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

## 🛑 Breaking changes 🛑
- `transformprocessor`: `metric.is_monotonic` is now accessed via a bool literal instead of a string. (#10473)

- `vcenterreceiver`: Changed the attribute `effective` on `vcenter.cluster.host.count` as it will now be reported as a bool rather than a string (#10914)

### 🚩 Deprecations 🚩
Expand All @@ -23,6 +22,7 @@

- `cmd/mdatagen`: Allow attribute values of any types (#9245)
- `transformprocessor`: Add byte slice literal to the grammar. Add new SpanID and TraceID functions that take a byte slice and return a Span/Trace ID. (#10487)
- `transformprocessor`: Add Summary transform functions. (#11041)
- `elasticsearchreceiver`: Add integration test for elasticsearch receiver (#10165)
- `datadogexporter`: Some config validation and unmarshaling steps are now done on `Validate` and `Unmarshal` instead of `Sanitize` (#8829)
- `examples`: Add an example for scraping Couchbase metrics (#10894)
Expand Down
6 changes: 6 additions & 0 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ Metric only functions:
- `convert_sum_to_gauge()` - Converts incoming metrics of type "Sum" to type "Gauge", retaining the metric's datapoints. Noop for metrics that are not of type "Sum".
**NOTE:** This function may cause a metric to break semantics for [Gauge metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#gauge). Use at your own risk.

- `convert_summary_count_val_to_sum(aggregation_temporality, is_monotonic)` - Creates a new Sum metric out of incoming metrics of type "Summary" with a "Count" Value. Noop for metrics that are not of type "Summary".
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#sums). Use at your own risk.

- `convert_summary_sum_val_to_sum(aggregation_temporality, is_monotonic)` - Creates a new Sum metric out of incoming metrics of type "Summary" with a "Sum" Value. Noop for metrics that are not of type "Summary".
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#sums). Use at your own risk.

- `convert_gauge_to_sum(aggregation_temporality, is_monotonic)` - `aggregation_temporality` specifies the resultant metric's aggregation temporality. `aggregation_temporality` may be `"cumulative"` or `"delta"`. `is_monotonic` specifies the resultant metric's monotonicity. `is_monotonic` is a boolean. Converts incoming metrics of type "Gauge" to type "Sum", retaining the metric's datapoints and setting its aggregation temporality and monotonicity accordingly. Noop for metrics that are not of type "Gauge".
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#sums). Use at your own risk.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

import (
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"go.opentelemetry.io/collector/pdata/pmetric"
)

func convertSummaryCountValToSum(stringAggTemp string, monotonic bool) (common.ExprFunc, error) {
var aggTemp pmetric.MetricAggregationTemporality
switch stringAggTemp {
case "delta":
aggTemp = pmetric.MetricAggregationTemporalityDelta
case "cumulative":
aggTemp = pmetric.MetricAggregationTemporalityCumulative
default:
return nil, fmt.Errorf("unknown aggregation temporality: %s", stringAggTemp)
}
return func(ctx common.TransformContext) interface{} {
mtc, ok := ctx.(metricTransformContext)
if !ok {
return nil
}

metric := mtc.GetMetric()
if metric.DataType() != pmetric.MetricDataTypeSummary {
return nil
}

sumMetric := mtc.GetMetrics().AppendEmpty()
sumMetric.SetDescription(metric.Description())
sumMetric.SetName(metric.Name() + "_count")
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(aggTemp)
sumMetric.Sum().SetIsMonotonic(monotonic)

sumDps := sumMetric.Sum().DataPoints()
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
sumDp := sumDps.AppendEmpty()
dp.Attributes().CopyTo(sumDp.Attributes())
sumDp.SetIntVal(int64(dp.Count()))
}
return nil
}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common/testhelper"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

func getTestSummaryMetric() pmetric.Metric {
metricInput := pmetric.NewMetric()
metricInput.SetDataType(pmetric.MetricDataTypeSummary)
metricInput.SetName("summary_metric")
input := metricInput.Summary().DataPoints().AppendEmpty()
input.SetCount(100)
input.SetSum(12.34)

qVal1 := input.QuantileValues().AppendEmpty()
qVal1.SetValue(1)
qVal1.SetQuantile(.99)

qVal2 := input.QuantileValues().AppendEmpty()
qVal2.SetValue(2)
qVal2.SetQuantile(.95)

qVal3 := input.QuantileValues().AppendEmpty()
qVal3.SetValue(3)
qVal3.SetQuantile(.50)

attrs := getTestAttributes()
attrs.CopyTo(input.Attributes())
return metricInput
}

func getTestAttributes() pcommon.Map {
attrs := pcommon.NewMap()
attrs.InsertString("test", "hello world")
attrs.InsertInt("test2", 3)
attrs.InsertBool("test3", true)
return attrs
}

func TestConvertSummarySumValToSum(t *testing.T) {
tests := []struct {
name string
inv common.Invocation
want func(pmetric.MetricSlice)
}{
{
name: "convert_summary_sum_val_to_sum",
inv: common.Invocation{
Function: "convert_summary_sum_val_to_sum",
Arguments: []common.Value{
{
String: testhelper.Strp("delta"),
},
{
Bool: (*common.Boolean)(testhelper.Boolp(false)),
},
},
},
want: func(metrics pmetric.MetricSlice) {
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(metrics.AppendEmpty())
sumMetric := metrics.AppendEmpty()
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
sumMetric.Sum().SetIsMonotonic(false)

sumMetric.SetName("summary_metric_sum")
dp := sumMetric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(12.34)

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actualMetrics := pmetric.NewMetricSlice()
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(actualMetrics.AppendEmpty())

evaluate, err := common.NewFunctionCall(tt.inv, DefaultFunctions(), ParsePath)
assert.NoError(t, err)
evaluate(metricTransformContext{
il: pcommon.NewInstrumentationScope(),
resource: pcommon.NewResource(),
metric: summaryMetric,
metrics: actualMetrics,
})

expected := pmetric.NewMetricSlice()
tt.want(expected)
assert.Equal(t, expected, actualMetrics)
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

import (
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"go.opentelemetry.io/collector/pdata/pmetric"
)

func convertSummarySumValToSum(stringAggTemp string, monotonic bool) (common.ExprFunc, error) {
var aggTemp pmetric.MetricAggregationTemporality
switch stringAggTemp {
case "delta":
aggTemp = pmetric.MetricAggregationTemporalityDelta
case "cumulative":
aggTemp = pmetric.MetricAggregationTemporalityCumulative
default:
return nil, fmt.Errorf("unknown aggregation temporality: %s", stringAggTemp)
}
return func(ctx common.TransformContext) interface{} {
mtc, ok := ctx.(metricTransformContext)
if !ok {
return nil
}

metric := mtc.GetMetric()
if metric.DataType() != pmetric.MetricDataTypeSummary {
return nil
}

sumMetric := mtc.GetMetrics().AppendEmpty()
sumMetric.SetDescription(metric.Description())
sumMetric.SetName(metric.Name() + "_sum")
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(aggTemp)
sumMetric.Sum().SetIsMonotonic(monotonic)

sumDps := sumMetric.Sum().DataPoints()
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
sumDp := sumDps.AppendEmpty()
dp.Attributes().CopyTo(sumDp.Attributes())
sumDp.SetDoubleVal(dp.Sum())
}
return nil
}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common/testhelper"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

func TestConvertSummaryCountValToSum(t *testing.T) {
tests := []struct {
name string
inv common.Invocation
want func(pmetric.MetricSlice)
}{
{
name: "convert_summary_count_val_to_sum",
inv: common.Invocation{
Function: "convert_summary_count_val_to_sum",
Arguments: []common.Value{
{
String: testhelper.Strp("delta"),
},
{
Bool: (*common.Boolean)(testhelper.Boolp(false)),
},
},
},
want: func(metrics pmetric.MetricSlice) {
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(metrics.AppendEmpty())
sumMetric := metrics.AppendEmpty()
sumMetric.SetDataType(pmetric.MetricDataTypeSum)
sumMetric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
sumMetric.Sum().SetIsMonotonic(false)

sumMetric.SetName("summary_metric_count")
dp := sumMetric.Sum().DataPoints().AppendEmpty()
dp.SetIntVal(100)

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actualMetrics := pmetric.NewMetricSlice()
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(actualMetrics.AppendEmpty())

evaluate, err := common.NewFunctionCall(tt.inv, DefaultFunctions(), ParsePath)
assert.NoError(t, err)
evaluate(metricTransformContext{
il: pcommon.NewInstrumentationScope(),
resource: pcommon.NewResource(),
metric: summaryMetric,
metrics: actualMetrics,
})

expected := pmetric.NewMetricSlice()
tt.want(expected)
assert.Equal(t, expected, actualMetrics)
})
}
}
6 changes: 4 additions & 2 deletions processor/transformprocessor/internal/metrics/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (

// registry is a map of names to functions for metrics pipelines
var registry = map[string]interface{}{
"convert_sum_to_gauge": convertSumToGauge,
"convert_gauge_to_sum": convertGaugeToSum,
"convert_sum_to_gauge": convertSumToGauge,
"convert_gauge_to_sum": convertGaugeToSum,
"convert_summary_sum_val_to_sum": convertSummarySumValToSum,
"convert_summary_count_val_to_sum": convertSummaryCountValToSum,
}

func init() {
Expand Down
5 changes: 5 additions & 0 deletions processor/transformprocessor/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
type metricTransformContext struct {
dataPoint interface{}
metric pmetric.Metric
metrics pmetric.MetricSlice
il pcommon.InstrumentationScope
resource pcommon.Resource
}
Expand All @@ -47,6 +48,10 @@ func (ctx metricTransformContext) GetMetric() pmetric.Metric {
return ctx.metric
}

func (ctx metricTransformContext) GetMetrics() pmetric.MetricSlice {
return ctx.metrics
}

// pathGetSetter is a getSetter which has been resolved using a path expression provided by a user.
type pathGetSetter struct {
getter common.ExprFunc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (p *Processor) ProcessMetrics(_ context.Context, td pmetric.Metrics) (pmetr
smetrics := rmetrics.ScopeMetrics().At(j)
ctx.il = smetrics.Scope()
metrics := smetrics.Metrics()
ctx.metrics = metrics
for k := 0; k < metrics.Len(); k++ {
ctx.metric = metrics.At(k)
switch ctx.metric.DataType() {
Expand Down