Skip to content

Commit

Permalink
[elasticsearchexporter] Read and honour data_stream.type attribute …
Browse files Browse the repository at this point in the history
…in creating dynamic indexes (#38000)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
Currently in ES exporter, data_stream naming scheme is dependent on
pipeline signal type. For example, pipeline with logs signal will always
create `logs-<dataset>-<namespace> dynamic index`

This is a proposal to support `data_stream.type attribute` that will
override signal type if set. The data_stream would then look like
`<type>-<dataset>-<namespace>`

This is limited to bodymap mode.

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Added UT

---------

Co-authored-by: Carson Ip <carsonip@users.noreply.github.com>
  • Loading branch information
khushijain21 and carsonip authored Feb 25, 2025
1 parent a387ad5 commit ff465fd
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 30 deletions.
13 changes: 13 additions & 0 deletions .chloggen/support-datastream-type-bodymap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for `data_stream.type` attribute to create dynamic index in bodymap mode.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [38000]
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ This can be customised through the following settings:
- `logs_index`: The [index] or [data stream] name to publish events to. The default value is `logs-generic-default`

- `logs_dynamic_index` (optional): uses resource, scope, or log record attributes to dynamically construct index name.
- `enabled`(default=false): Enable/Disable dynamic index for log records. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: log record attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
- `enabled`(default=false): Enable/Disable dynamic index for log records. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: log record attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. In a special case with `mapping::mode: bodymap`, `data_stream.type` field (valid values: `logs`, `metrics`) is also supported to dynamically construct index in the form `${data_stream.type}-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `logs-generic-default`, and `logs_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields, see restrictions applied to [Data Stream Fields](https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html).

- `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
Expand Down
32 changes: 21 additions & 11 deletions exporter/elasticsearchexporter/data_stream_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func newDocumentRouter(mode MappingMode, dynamicIndex bool, defaultIndex string,
if dynamicIndex {
router = dynamicDocumentRouter{
index: elasticsearch.Index{Index: defaultIndex},
otel: mode == MappingOTel,
mode: mode,
}
} else {
router = staticDocumentRouter{
Expand Down Expand Up @@ -95,23 +95,23 @@ func (r staticDocumentRouter) route(_ pcommon.Resource, _ pcommon.Instrumentatio

type dynamicDocumentRouter struct {
index elasticsearch.Index
otel bool
mode MappingMode
}

func (r dynamicDocumentRouter) routeLogRecord(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.otel, defaultDataStreamTypeLogs), nil
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.mode, defaultDataStreamTypeLogs)
}

func (r dynamicDocumentRouter) routeDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.otel, defaultDataStreamTypeMetrics), nil
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.mode, defaultDataStreamTypeMetrics)
}

func (r dynamicDocumentRouter) routeSpan(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.otel, defaultDataStreamTypeTraces), nil
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.mode, defaultDataStreamTypeTraces)
}

func (r dynamicDocumentRouter) routeSpanEvent(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.otel, defaultDataStreamTypeLogs), nil
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.mode, defaultDataStreamTypeLogs)
}

type logstashDocumentRouter struct {
Expand Down Expand Up @@ -151,9 +151,9 @@ func routeRecord(
scope pcommon.InstrumentationScope,
recordAttr pcommon.Map,
index string,
otel bool,
mode MappingMode,
defaultDSType string,
) elasticsearch.Index {
) (elasticsearch.Index, error) {
resourceAttr := resource.Attributes()
scopeAttr := scope.Attributes()

Expand All @@ -164,12 +164,22 @@ func routeRecord(
// 4. use default hardcoded data_stream.*
dataset, datasetExists := getFromAttributes(elasticsearch.DataStreamDataset, defaultDataStreamDataset, recordAttr, scopeAttr, resourceAttr)
namespace, namespaceExists := getFromAttributes(elasticsearch.DataStreamNamespace, defaultDataStreamNamespace, recordAttr, scopeAttr, resourceAttr)

dsType := defaultDSType
// if mapping mode is bodymap, allow overriding data_stream.type
if mode == MappingBodyMap {
dsType, _ = getFromAttributes(elasticsearch.DataStreamType, defaultDSType, recordAttr, scopeAttr, resourceAttr)
if dsType != "logs" && dsType != "metrics" {
return elasticsearch.Index{}, fmt.Errorf("data_stream.type cannot be other than logs or metrics")
}
}

dataStreamMode := datasetExists || namespaceExists
if !dataStreamMode {
prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr)
suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr)
if prefixExists || suffixExists {
return elasticsearch.Index{Index: fmt.Sprintf("%s%s%s", prefix, index, suffix)}
return elasticsearch.Index{Index: fmt.Sprintf("%s%s%s", prefix, index, suffix)}, nil
}
}

Expand All @@ -185,11 +195,11 @@ func routeRecord(
// For dataset, the naming convention for datastream is expected to be "logs-[dataset].otel-[namespace]".
// This is in order to match the built-in logs-*.otel-* index template.
var datasetSuffix string
if otel {
if mode == MappingOTel {
datasetSuffix += ".otel"
}

dataset = sanitizeDataStreamField(dataset, disallowedDatasetRunes, datasetSuffix)
namespace = sanitizeDataStreamField(namespace, disallowedNamespaceRunes, "")
return elasticsearch.NewDataStreamIndex(defaultDSType, dataset, namespace)
return elasticsearch.NewDataStreamIndex(dsType, dataset, namespace), nil
}
64 changes: 46 additions & 18 deletions exporter/elasticsearchexporter/data_stream_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (

type routeTestCase struct {
name string
otel bool
mode MappingMode
scopeName string
want elasticsearch.Index
}

func createRouteTests(dsType string) []routeTestCase {
renderWantRoute := func(dsType, dsDataset string, otel bool) elasticsearch.Index {
if otel {
renderWantRoute := func(dsType, dsDataset string, mode MappingMode) elasticsearch.Index {
if mode == MappingOTel {
dsDataset += ".otel"
}
return elasticsearch.NewDataStreamIndex(dsType, dsDataset, defaultDataStreamNamespace)
Expand All @@ -31,37 +31,37 @@ func createRouteTests(dsType string) []routeTestCase {
return []routeTestCase{
{
name: "default",
otel: false,
want: renderWantRoute(dsType, defaultDataStreamDataset, false),
mode: MappingNone,
want: renderWantRoute(dsType, defaultDataStreamDataset, MappingNone),
},
{
name: "otel",
otel: true,
want: renderWantRoute(dsType, defaultDataStreamDataset, true),
mode: MappingOTel,
want: renderWantRoute(dsType, defaultDataStreamDataset, MappingOTel),
},
{
name: "default with receiver scope name",
otel: false,
mode: MappingNone,
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper",
want: renderWantRoute(dsType, "hostmetricsreceiver", false),
want: renderWantRoute(dsType, "hostmetricsreceiver", MappingNone),
},
{
name: "otel with receiver scope name",
otel: true,
mode: MappingOTel,
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper",
want: renderWantRoute(dsType, "hostmetricsreceiver", true),
want: renderWantRoute(dsType, "hostmetricsreceiver", MappingOTel),
},
{
name: "default with non-receiver scope name",
otel: false,
mode: MappingNone,
scopeName: "some_other_scope_name",
want: renderWantRoute(dsType, defaultDataStreamDataset, false),
want: renderWantRoute(dsType, defaultDataStreamDataset, MappingNone),
},
{
name: "otel with non-receiver scope name",
otel: true,
mode: MappingOTel,
scopeName: "some_other_scope_name",
want: renderWantRoute(dsType, defaultDataStreamDataset, true),
want: renderWantRoute(dsType, defaultDataStreamDataset, MappingOTel),
},
}
}
Expand All @@ -71,7 +71,7 @@ func TestRouteLogRecord(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
router := dynamicDocumentRouter{otel: tc.otel}
router := dynamicDocumentRouter{mode: tc.mode}
scope := pcommon.NewInstrumentationScope()
scope.SetName(tc.scopeName)

Expand All @@ -80,14 +80,42 @@ func TestRouteLogRecord(t *testing.T) {
assert.Equal(t, tc.want, ds)
})
}

t.Run("test data_stream.type for bodymap mode", func(t *testing.T) {
dsType := "metrics"
router := dynamicDocumentRouter{mode: MappingBodyMap}
attrs := pcommon.NewMap()
attrs.PutStr("data_stream.type", dsType)
ds, err := router.routeLogRecord(pcommon.NewResource(), pcommon.NewInstrumentationScope(), attrs)
require.NoError(t, err)
assert.Equal(t, dsType, ds.Type)
})
t.Run("test data_stream.type is not honored for other modes (except bodymap)", func(t *testing.T) {
dsType := "metrics"
router := dynamicDocumentRouter{mode: MappingOTel}
attrs := pcommon.NewMap()
attrs.PutStr("data_stream.type", dsType)
ds, err := router.routeLogRecord(pcommon.NewResource(), pcommon.NewInstrumentationScope(), attrs)
require.NoError(t, err)
assert.Equal(t, "logs", ds.Type) // should equal to logs
})

t.Run("test data_stream.type does not accept values other than logs/metrics", func(t *testing.T) {
dsType := "random"
router := dynamicDocumentRouter{mode: MappingBodyMap}
attrs := pcommon.NewMap()
attrs.PutStr("data_stream.type", dsType)
_, err := router.routeLogRecord(pcommon.NewResource(), pcommon.NewInstrumentationScope(), attrs)
require.Error(t, err, "data_stream.type cannot be other than logs or metrics")
})
}

func TestRouteDataPoint(t *testing.T) {
tests := createRouteTests(defaultDataStreamTypeMetrics)

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
router := dynamicDocumentRouter{otel: tc.otel}
router := dynamicDocumentRouter{mode: tc.mode}
scope := pcommon.NewInstrumentationScope()
scope.SetName(tc.scopeName)

Expand All @@ -103,7 +131,7 @@ func TestRouteSpan(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
router := dynamicDocumentRouter{otel: tc.otel}
router := dynamicDocumentRouter{mode: tc.mode}
scope := pcommon.NewInstrumentationScope()
scope.SetName(tc.scopeName)

Expand Down

0 comments on commit ff465fd

Please sign in to comment.