Skip to content

feat(routingprocessor): add option to drop resource attribute used for routing #8990

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### 💡 Enhancements 💡

- `routingprocessor`: add option to drop resource attribute used for routing (#8990)

### 🛑 Breaking changes 🛑

- `filelogreceiver`, `journaldreceiver`, `syslogreceiver`, `tcplogreceiver`, `udplogreceiver`:
Expand Down
1 change: 1 addition & 0 deletions processor/routingprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The following settings can be optionally configured:
- `attribute_source` defines where to look for the attribute in `from_attribute`. The allowed values are:
- `context` (the default) - to search the [context][context_docs], which includes HTTP headers
- `resource` - to search the resource attributes.
- `drop_resource_routing_attribute` - controls whether to remove the resource attribute used for routing. This is only relevant if AttributeSource is set to resource.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit verbose for my taste, but I can't come up with a better name :-)

- `default_exporters` contains the list of exporters to use when a more specific record can't be found in the routing table.

Example:
Expand Down
10 changes: 10 additions & 0 deletions processor/routingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package routingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/routingprocessor"

import (
"errors"
"fmt"

"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -44,6 +45,11 @@ type Config struct {
// Required.
FromAttribute string `mapstructure:"from_attribute"`

// DropRoutingResourceAttribute controls whether to remove the resource attribute used for routing.
// This is only relevant if AttributeSource is set to resource.
// Optional.
DropRoutingResourceAttribute bool `mapstructure:"drop_resource_routing_attribute"`

// Table contains the routing table for this processor.
// Required.
Table []RoutingTableItem `mapstructure:"table"`
Expand Down Expand Up @@ -76,6 +82,10 @@ func (c *Config) Validate() error {
)
}

if c.AttributeSource != resourceAttributeSource && c.DropRoutingResourceAttribute {
return errors.New("using a different attribute source than 'attribute' and drop_resource_routing_attribute is set to true")
}

return nil
}

Expand Down
161 changes: 157 additions & 4 deletions processor/routingprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,55 @@ func TestTraces_RoutingWorks_ResourceAttribute(t *testing.T) {
})
}

func TestTraces_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T) {
defaultExp := &mockTracesExporter{}
tExp := &mockTracesExporter{}

host := &mockHost{
Host: componenttest.NewNopHost(),
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.TracesDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): tExp,
},
}
},
}

exp := newProcessor(zap.NewNop(), &Config{
AttributeSource: resourceAttributeSource,
FromAttribute: "X-Tenant",
DropRoutingResourceAttribute: true,
DefaultExporters: []string{"otlp"},
Table: []RoutingTableItem{
{
Value: "acme",
Exporters: []string{"otlp/2"},
},
},
})
require.NoError(t, exp.Start(context.Background(), host))

tr := pdata.NewTraces()
rm := tr.ResourceSpans().AppendEmpty()
rm.Resource().Attributes().InsertString("X-Tenant", "acme")
rm.Resource().Attributes().InsertString("attr", "acme")

assert.NoError(t, exp.ConsumeTraces(context.Background(), tr))
assert.Equal(t, 1, tExp.getTraceCount(),
"trace should be routed to non default exporter",
)
require.Len(t, tExp.traces, 1)
require.Equal(t, 1, tExp.traces[0].ResourceSpans().Len())
attrs := tExp.traces[0].ResourceSpans().At(0).Resource().Attributes()
_, ok := attrs.Get("X-Tenant")
assert.False(t, ok, "routing attribute should have been dropped")
v, ok := attrs.Get("attr")
assert.True(t, ok, "non-routing attributes shouldn't have been dropped")
assert.Equal(t, "acme", v.StringVal())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this assertion already cover the assertion that came just before this one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe so. The other one check for the attribute for existence while this one checks its value. It might catch something tampering with the value in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. Better to have it redundant today than a missed coverage in the future.

}

func TestProcessorCapabilities(t *testing.T) {
// prepare
config := &Config{
Expand Down Expand Up @@ -464,7 +513,6 @@ func TestMetrics_RoutingWorks_Context(t *testing.T) {
rm.Resource().Attributes().InsertString("X-Tenant", "acme")

t.Run("non default route is properly used", func(t *testing.T) {

assert.NoError(t, exp.ConsumeMetrics(
metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
"X-Tenant": "acme",
Expand Down Expand Up @@ -553,6 +601,55 @@ func TestMetrics_RoutingWorks_ResourceAttribute(t *testing.T) {
})
}

func TestMetrics_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T) {
defaultExp := &mockMetricsExporter{}
mExp := &mockMetricsExporter{}

host := &mockHost{
Host: componenttest.NewNopHost(),
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.MetricsDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): mExp,
},
}
},
}

exp := newProcessor(zap.NewNop(), &Config{
AttributeSource: resourceAttributeSource,
FromAttribute: "X-Tenant",
DropRoutingResourceAttribute: true,
DefaultExporters: []string{"otlp"},
Table: []RoutingTableItem{
{
Value: "acme",
Exporters: []string{"otlp/2"},
},
},
})
require.NoError(t, exp.Start(context.Background(), host))

m := pdata.NewMetrics()
rm := m.ResourceMetrics().AppendEmpty()
rm.Resource().Attributes().InsertString("X-Tenant", "acme")
rm.Resource().Attributes().InsertString("attr", "acme")

assert.NoError(t, exp.ConsumeMetrics(context.Background(), m))
assert.Equal(t, 1, mExp.getMetricCount(),
"metric should be routed to non default exporter",
)
require.Len(t, mExp.metrics, 1)
require.Equal(t, 1, mExp.metrics[0].ResourceMetrics().Len())
attrs := mExp.metrics[0].ResourceMetrics().At(0).Resource().Attributes()
_, ok := attrs.Get("X-Tenant")
assert.False(t, ok, "routing attribute should have been dropped")
v, ok := attrs.Get("attr")
assert.True(t, ok, "non routing attributes shouldn't be dropped")
assert.Equal(t, "acme", v.StringVal())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto for those two lines

}

func TestLogs_RoutingWorks_Context(t *testing.T) {
defaultExp := &mockLogsExporter{}
lExp := &mockLogsExporter{}
Expand Down Expand Up @@ -675,6 +772,55 @@ func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) {
})
}

func TestLogs_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T) {
defaultExp := &mockLogsExporter{}
lExp := &mockLogsExporter{}

host := &mockHost{
Host: componenttest.NewNopHost(),
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.LogsDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): lExp,
},
}
},
}

exp := newProcessor(zap.NewNop(), &Config{
AttributeSource: resourceAttributeSource,
FromAttribute: "X-Tenant",
DropRoutingResourceAttribute: true,
DefaultExporters: []string{"otlp"},
Table: []RoutingTableItem{
{
Value: "acme",
Exporters: []string{"otlp/2"},
},
},
})
require.NoError(t, exp.Start(context.Background(), host))

l := pdata.NewLogs()
rm := l.ResourceLogs().AppendEmpty()
rm.Resource().Attributes().InsertString("X-Tenant", "acme")
rm.Resource().Attributes().InsertString("attr", "acme")

assert.NoError(t, exp.ConsumeLogs(context.Background(), l))
assert.Equal(t, 1, lExp.getLogCount(),
"log should be routed to non-default exporter",
)
require.Len(t, lExp.logs, 1)
require.Equal(t, 1, lExp.logs[0].ResourceLogs().Len())
attrs := lExp.logs[0].ResourceLogs().At(0).Resource().Attributes()
_, ok := attrs.Get("X-Tenant")
assert.False(t, ok, "routing attribute should have been dropped")
v, ok := attrs.Get("attr")
assert.True(t, ok, "non routing attributes shouldn't be dropped")
assert.Equal(t, "acme", v.StringVal())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the other comments

}

func TestLogs_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) {
defaultExp := &mockLogsExporter{}
lExp := &mockLogsExporter{}
Expand Down Expand Up @@ -752,21 +898,24 @@ type mockComponent struct{}
func (m *mockComponent) Start(context.Context, component.Host) error {
return nil
}

func (m *mockComponent) Shutdown(context.Context) error {
return nil
}

type mockMetricsExporter struct {
mockComponent
metricCount int32
metrics []pdata.Metrics
}

func (m *mockMetricsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (m *mockMetricsExporter) ConsumeMetrics(context.Context, pdata.Metrics) error {
func (m *mockMetricsExporter) ConsumeMetrics(_ context.Context, metrics pdata.Metrics) error {
atomic.AddInt32(&m.metricCount, 1)
m.metrics = append(m.metrics, metrics)
return nil
}

Expand All @@ -777,14 +926,16 @@ func (m *mockMetricsExporter) getMetricCount() int {
type mockLogsExporter struct {
mockComponent
logCount int32
logs []pdata.Logs
}

func (m *mockLogsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (m *mockLogsExporter) ConsumeLogs(context.Context, pdata.Logs) error {
func (m *mockLogsExporter) ConsumeLogs(_ context.Context, logs pdata.Logs) error {
atomic.AddInt32(&m.logCount, 1)
m.logs = append(m.logs, logs)
return nil
}

Expand All @@ -795,14 +946,16 @@ func (m *mockLogsExporter) getLogCount() int {
type mockTracesExporter struct {
mockComponent
traceCount int32
traces []pdata.Traces
}

func (m *mockTracesExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (m *mockTracesExporter) ConsumeTraces(context.Context, pdata.Traces) error {
func (m *mockTracesExporter) ConsumeTraces(_ context.Context, traces pdata.Traces) error {
atomic.AddInt32(&m.traceCount, 1)
m.traces = append(m.traces, traces)
return nil
}

Expand Down
13 changes: 13 additions & 0 deletions processor/routingprocessor/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (r *router) RouteMetrics(ctx context.Context, tm pdata.Metrics) []routedMet
}
}

func (r *router) removeRoutingAttribute(resource pdata.Resource) {
resource.Attributes().Remove(r.config.FromAttribute)
}

func (r *router) routeMetricsForResource(_ context.Context, tm pdata.Metrics) []routedMetrics {
// routingEntry is used to group pdata.ResourceMetrics that are routed to
// the same set of exporters.
Expand All @@ -92,6 +96,9 @@ func (r *router) routeMetricsForResource(_ context.Context, tm pdata.Metrics) []
// If we have an exporter list defined for that attribute value then use it.
if e, ok := r.metricsExporters[attrValue]; ok {
exp = e
if r.config.DropRoutingResourceAttribute {
r.removeRoutingAttribute(resMetrics.Resource())
}
}

if rEntry, ok := routingMap[attrValue]; ok {
Expand Down Expand Up @@ -178,6 +185,9 @@ func (r *router) routeTracesForResource(_ context.Context, tr pdata.Traces) []ro
// If we have an exporter list defined for that attribute value then use it.
if e, ok := r.tracesExporters[attrValue]; ok {
exp = e
if r.config.DropRoutingResourceAttribute {
r.removeRoutingAttribute(resSpans.Resource())
}
}

if rEntry, ok := routingMap[attrValue]; ok {
Expand Down Expand Up @@ -264,6 +274,9 @@ func (r *router) routeLogsForResource(_ context.Context, tl pdata.Logs) []routed
// If we have an exporter list defined for that attribute value then use it.
if e, ok := r.logsExporters[attrValue]; ok {
exp = e
if r.config.DropRoutingResourceAttribute {
r.removeRoutingAttribute(resLogs.Resource())
}
}

if rEntry, ok := routingMap[attrValue]; ok {
Expand Down