-
Notifications
You must be signed in to change notification settings - Fork 2.7k
feat(routingprocessor): add option to drop resource attribute used for routing #8990
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -349,6 +349,55 @@ func TestTraces_RoutingWorks_ResourceAttribute(t *testing.T) { | |
}) | ||
} | ||
|
||
func TestTraces_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T) { | ||
defaultExp := &mockTracesExporter{} | ||
tExp := &mockTracesExporter{} | ||
|
||
host := &mockHost{ | ||
Host: componenttest.NewNopHost(), | ||
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { | ||
return map[config.DataType]map[config.ComponentID]component.Exporter{ | ||
config.TracesDataType: { | ||
config.NewComponentID("otlp"): defaultExp, | ||
config.NewComponentID("otlp/2"): tExp, | ||
}, | ||
} | ||
}, | ||
} | ||
|
||
exp := newProcessor(zap.NewNop(), &Config{ | ||
AttributeSource: resourceAttributeSource, | ||
FromAttribute: "X-Tenant", | ||
DropRoutingResourceAttribute: true, | ||
DefaultExporters: []string{"otlp"}, | ||
Table: []RoutingTableItem{ | ||
{ | ||
Value: "acme", | ||
Exporters: []string{"otlp/2"}, | ||
}, | ||
}, | ||
}) | ||
require.NoError(t, exp.Start(context.Background(), host)) | ||
|
||
tr := pdata.NewTraces() | ||
rm := tr.ResourceSpans().AppendEmpty() | ||
rm.Resource().Attributes().InsertString("X-Tenant", "acme") | ||
rm.Resource().Attributes().InsertString("attr", "acme") | ||
|
||
assert.NoError(t, exp.ConsumeTraces(context.Background(), tr)) | ||
assert.Equal(t, 1, tExp.getTraceCount(), | ||
"trace should be routed to non default exporter", | ||
) | ||
require.Len(t, tExp.traces, 1) | ||
require.Equal(t, 1, tExp.traces[0].ResourceSpans().Len()) | ||
attrs := tExp.traces[0].ResourceSpans().At(0).Resource().Attributes() | ||
_, ok := attrs.Get("X-Tenant") | ||
assert.False(t, ok, "routing attribute should have been dropped") | ||
v, ok := attrs.Get("attr") | ||
assert.True(t, ok, "non-routing attributes shouldn't have been dropped") | ||
assert.Equal(t, "acme", v.StringVal()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't this assertion already cover the assertion that came just before this one? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't believe so. The other one check for the attribute for existence while this one checks its value. It might catch something tampering with the value in the future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True. Better to have it redundant today than a missed coverage in the future. |
||
} | ||
|
||
func TestProcessorCapabilities(t *testing.T) { | ||
// prepare | ||
config := &Config{ | ||
|
@@ -464,7 +513,6 @@ func TestMetrics_RoutingWorks_Context(t *testing.T) { | |
rm.Resource().Attributes().InsertString("X-Tenant", "acme") | ||
|
||
t.Run("non default route is properly used", func(t *testing.T) { | ||
|
||
assert.NoError(t, exp.ConsumeMetrics( | ||
metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{ | ||
"X-Tenant": "acme", | ||
|
@@ -553,6 +601,55 @@ func TestMetrics_RoutingWorks_ResourceAttribute(t *testing.T) { | |
}) | ||
} | ||
|
||
func TestMetrics_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T) { | ||
defaultExp := &mockMetricsExporter{} | ||
mExp := &mockMetricsExporter{} | ||
|
||
host := &mockHost{ | ||
Host: componenttest.NewNopHost(), | ||
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { | ||
return map[config.DataType]map[config.ComponentID]component.Exporter{ | ||
config.MetricsDataType: { | ||
config.NewComponentID("otlp"): defaultExp, | ||
config.NewComponentID("otlp/2"): mExp, | ||
}, | ||
} | ||
}, | ||
} | ||
|
||
exp := newProcessor(zap.NewNop(), &Config{ | ||
AttributeSource: resourceAttributeSource, | ||
FromAttribute: "X-Tenant", | ||
DropRoutingResourceAttribute: true, | ||
DefaultExporters: []string{"otlp"}, | ||
Table: []RoutingTableItem{ | ||
{ | ||
Value: "acme", | ||
Exporters: []string{"otlp/2"}, | ||
}, | ||
}, | ||
}) | ||
require.NoError(t, exp.Start(context.Background(), host)) | ||
|
||
m := pdata.NewMetrics() | ||
rm := m.ResourceMetrics().AppendEmpty() | ||
rm.Resource().Attributes().InsertString("X-Tenant", "acme") | ||
rm.Resource().Attributes().InsertString("attr", "acme") | ||
|
||
assert.NoError(t, exp.ConsumeMetrics(context.Background(), m)) | ||
assert.Equal(t, 1, mExp.getMetricCount(), | ||
"metric should be routed to non default exporter", | ||
) | ||
require.Len(t, mExp.metrics, 1) | ||
require.Equal(t, 1, mExp.metrics[0].ResourceMetrics().Len()) | ||
attrs := mExp.metrics[0].ResourceMetrics().At(0).Resource().Attributes() | ||
_, ok := attrs.Get("X-Tenant") | ||
assert.False(t, ok, "routing attribute should have been dropped") | ||
v, ok := attrs.Get("attr") | ||
assert.True(t, ok, "non routing attributes shouldn't be dropped") | ||
assert.Equal(t, "acme", v.StringVal()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto for those two lines |
||
} | ||
|
||
func TestLogs_RoutingWorks_Context(t *testing.T) { | ||
defaultExp := &mockLogsExporter{} | ||
lExp := &mockLogsExporter{} | ||
|
@@ -675,6 +772,55 @@ func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) { | |
}) | ||
} | ||
|
||
func TestLogs_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T) { | ||
defaultExp := &mockLogsExporter{} | ||
lExp := &mockLogsExporter{} | ||
|
||
host := &mockHost{ | ||
Host: componenttest.NewNopHost(), | ||
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { | ||
return map[config.DataType]map[config.ComponentID]component.Exporter{ | ||
config.LogsDataType: { | ||
config.NewComponentID("otlp"): defaultExp, | ||
config.NewComponentID("otlp/2"): lExp, | ||
}, | ||
} | ||
}, | ||
} | ||
|
||
exp := newProcessor(zap.NewNop(), &Config{ | ||
AttributeSource: resourceAttributeSource, | ||
FromAttribute: "X-Tenant", | ||
DropRoutingResourceAttribute: true, | ||
DefaultExporters: []string{"otlp"}, | ||
Table: []RoutingTableItem{ | ||
{ | ||
Value: "acme", | ||
Exporters: []string{"otlp/2"}, | ||
}, | ||
}, | ||
}) | ||
require.NoError(t, exp.Start(context.Background(), host)) | ||
|
||
l := pdata.NewLogs() | ||
rm := l.ResourceLogs().AppendEmpty() | ||
rm.Resource().Attributes().InsertString("X-Tenant", "acme") | ||
rm.Resource().Attributes().InsertString("attr", "acme") | ||
|
||
assert.NoError(t, exp.ConsumeLogs(context.Background(), l)) | ||
assert.Equal(t, 1, lExp.getLogCount(), | ||
"log should be routed to non-default exporter", | ||
) | ||
require.Len(t, lExp.logs, 1) | ||
require.Equal(t, 1, lExp.logs[0].ResourceLogs().Len()) | ||
attrs := lExp.logs[0].ResourceLogs().At(0).Resource().Attributes() | ||
_, ok := attrs.Get("X-Tenant") | ||
assert.False(t, ok, "routing attribute should have been dropped") | ||
v, ok := attrs.Get("attr") | ||
assert.True(t, ok, "non routing attributes shouldn't be dropped") | ||
assert.Equal(t, "acme", v.StringVal()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as the other comments |
||
} | ||
|
||
func TestLogs_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) { | ||
defaultExp := &mockLogsExporter{} | ||
lExp := &mockLogsExporter{} | ||
|
@@ -752,21 +898,24 @@ type mockComponent struct{} | |
func (m *mockComponent) Start(context.Context, component.Host) error { | ||
return nil | ||
} | ||
|
||
func (m *mockComponent) Shutdown(context.Context) error { | ||
return nil | ||
} | ||
|
||
type mockMetricsExporter struct { | ||
mockComponent | ||
metricCount int32 | ||
metrics []pdata.Metrics | ||
} | ||
|
||
func (m *mockMetricsExporter) Capabilities() consumer.Capabilities { | ||
return consumer.Capabilities{MutatesData: false} | ||
} | ||
|
||
func (m *mockMetricsExporter) ConsumeMetrics(context.Context, pdata.Metrics) error { | ||
func (m *mockMetricsExporter) ConsumeMetrics(_ context.Context, metrics pdata.Metrics) error { | ||
atomic.AddInt32(&m.metricCount, 1) | ||
m.metrics = append(m.metrics, metrics) | ||
return nil | ||
} | ||
|
||
|
@@ -777,14 +926,16 @@ func (m *mockMetricsExporter) getMetricCount() int { | |
type mockLogsExporter struct { | ||
mockComponent | ||
logCount int32 | ||
logs []pdata.Logs | ||
} | ||
|
||
func (m *mockLogsExporter) Capabilities() consumer.Capabilities { | ||
return consumer.Capabilities{MutatesData: false} | ||
} | ||
|
||
func (m *mockLogsExporter) ConsumeLogs(context.Context, pdata.Logs) error { | ||
func (m *mockLogsExporter) ConsumeLogs(_ context.Context, logs pdata.Logs) error { | ||
atomic.AddInt32(&m.logCount, 1) | ||
m.logs = append(m.logs, logs) | ||
return nil | ||
} | ||
|
||
|
@@ -795,14 +946,16 @@ func (m *mockLogsExporter) getLogCount() int { | |
type mockTracesExporter struct { | ||
jpkrohling marked this conversation as resolved.
Show resolved
Hide resolved
|
||
mockComponent | ||
traceCount int32 | ||
traces []pdata.Traces | ||
} | ||
|
||
func (m *mockTracesExporter) Capabilities() consumer.Capabilities { | ||
return consumer.Capabilities{MutatesData: false} | ||
} | ||
|
||
func (m *mockTracesExporter) ConsumeTraces(context.Context, pdata.Traces) error { | ||
func (m *mockTracesExporter) ConsumeTraces(_ context.Context, traces pdata.Traces) error { | ||
atomic.AddInt32(&m.traceCount, 1) | ||
m.traces = append(m.traces, traces) | ||
return nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit verbose for my taste, but I can't come up with a better name :-)