Skip to content

Commit

Permalink
[processor/resourcedetection] introduce retry mechanism for detectors (
Browse files Browse the repository at this point in the history
…#37506)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

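Introduce a retry mechanism for resource detectors: each detector now runs in its own goroutine, and a failed detection is retried with exponential backoff until it succeeds or the context deadline expires.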

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes #34761

---------

Signed-off-by: odubajDT <ondrej.dubaj@dynatrace.com>
  • Loading branch information
odubajDT authored Feb 25, 2025
1 parent a826350 commit e289237
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 38 deletions.
27 changes: 27 additions & 0 deletions .chloggen/docker-resource-failure.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/resourcedetection

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Introduce retry logic for failed resource detection."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34761]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 2 additions & 0 deletions connector/datadogconnector/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions exporter/datadogexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ require (
github.com/bmatcuk/doublestar/v4 v4.8.1 // indirect
github.com/briandowns/spinner v1.23.0 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect
github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 // indirect
Expand Down
2 changes: 2 additions & 0 deletions exporter/datadogexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions exporter/datadogexporter/integrationtest/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions processor/resourcedetectionprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.29.7
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.29
github.com/aws/aws-sdk-go-v2/service/ec2 v1.203.1
github.com/cenkalti/backoff/v5 v5.0.2
github.com/google/go-cmp v0.6.0
github.com/hashicorp/consul/api v1.31.2
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.120.1
Expand Down
2 changes: 2 additions & 0 deletions processor/resourcedetectionprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 45 additions & 10 deletions processor/resourcedetectionprocessor/internal/resourcedetection.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"context"
"errors"
"fmt"
"net/http"
"sync"
"time"

backoff "github.com/cenkalti/backoff/v5"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/processor"
Expand Down Expand Up @@ -120,31 +122,64 @@ func (p *ResourceProvider) Get(ctx context.Context, client *http.Client) (resour
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, client.Timeout)
defer cancel()
p.detectResource(ctx)
p.detectResource(ctx, client.Timeout)
})

return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err
}

func (p *ResourceProvider) detectResource(ctx context.Context) {
func (p *ResourceProvider) detectResource(ctx context.Context, timeout time.Duration) {
p.detectedResource = &resourceResult{}

res := pcommon.NewResource()
mergedSchemaURL := ""

p.logger.Info("began detecting resource information")

for _, detector := range p.detectors {
r, schemaURL, err := detector.Detect(ctx)
if err != nil {
p.logger.Warn("failed to detect resource", zap.Error(err))
resultsChan := make([]chan resourceResult, len(p.detectors))
for i, detector := range p.detectors {
resultsChan[i] = make(chan resourceResult)
go func(detector Detector) {
sleep := backoff.ExponentialBackOff{
InitialInterval: 1 * time.Second,
RandomizationFactor: 1.5,
Multiplier: 2,
MaxInterval: timeout,
}
sleep.Reset()
var err error
var r pcommon.Resource
var schemaURL string
for {
r, schemaURL, err = detector.Detect(ctx)
if err == nil {
resultsChan[i] <- resourceResult{resource: r, schemaURL: schemaURL, err: nil}
return
}
p.logger.Warn("failed to detect resource", zap.Error(err))

timer := time.NewTimer(sleep.NextBackOff())
select {
case <-timer.C:
fmt.Println("Retrying fetching data...")
case <-ctx.Done():
p.logger.Warn("Context was cancelled: %w", zap.Error(ctx.Err()))
resultsChan[i] <- resourceResult{resource: r, schemaURL: schemaURL, err: err}
return
}
}
}(detector)
}

for _, ch := range resultsChan {
result := <-ch
if result.err != nil {
if allowErrorPropagationFeatureGate.IsEnabled() {
p.detectedResource.err = err
return
p.detectedResource.err = errors.Join(p.detectedResource.err, result.err)
}
} else {
mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, schemaURL)
MergeResource(res, r, false)
mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, result.schemaURL)
MergeResource(res, result.resource, false)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestDetect(t *testing.T) {
p, err := f.CreateResourceProvider(processortest.NewNopSettings(metadata.Type), time.Second, tt.attributes, &mockDetectorConfig{}, mockDetectorTypes...)
require.NoError(t, err)

got, _, err := p.Get(context.Background(), http.DefaultClient)
got, _, err := p.Get(context.Background(), &http.Client{Timeout: 10 * time.Second})
require.NoError(t, err)

assert.Equal(t, tt.expectedResource, got.Attributes().AsRaw())
Expand All @@ -133,38 +133,46 @@ func TestDetectResource_DetectorFactoryError(t *testing.T) {
require.EqualError(t, err, fmt.Sprintf("failed creating detector type %q: %v", mockDetectorKey, "creation failed"))
}

func TestDetectResource_Error(t *testing.T) {
md1 := &MockDetector{}
res := pcommon.NewResource()
require.NoError(t, res.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"}))
md1.On("Detect").Return(res, nil)

md2 := &MockDetector{}
md2.On("Detect").Return(pcommon.NewResource(), errors.New("err1"))

p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)
_, _, err := p.Get(context.Background(), http.DefaultClient)
require.NoError(t, err)
}

func TestDetectResource_Error_PropagationEnabled(t *testing.T) {
func TestDetectResource_Error_ContextDeadline_WithErrPropagation(t *testing.T) {
err := featuregate.GlobalRegistry().Set(allowErrorPropagationFeatureGate.ID(), true)
assert.NoError(t, err)
defer func() {
_ = featuregate.GlobalRegistry().Set(allowErrorPropagationFeatureGate.ID(), false)
}()

md1 := &MockDetector{}
res := pcommon.NewResource()
require.NoError(t, res.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"}))
md1.On("Detect").Return(res, nil)
md1.On("Detect").Return(pcommon.NewResource(), fmt.Errorf("err1"))

md2 := &MockDetector{}
md2.On("Detect").Return(pcommon.NewResource(), errors.New("err1"))
md2.On("Detect").Return(pcommon.NewResource(), errors.New("err2"))

p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)
_, _, err = p.Get(context.Background(), http.DefaultClient)

var cancel context.CancelFunc
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
defer cancel()

_, _, err = p.Get(ctx, &http.Client{Timeout: 10 * time.Second})
require.Error(t, err)
require.Contains(t, err.Error(), "err1")
require.Contains(t, err.Error(), "err2")
}

// TestDetectResource_Error_ContextDeadline_WithoutErrPropagation verifies that
// Get returns no error when both detectors keep failing until the context
// deadline and the test has not enabled the error-propagation feature gate.
func TestDetectResource_Error_ContextDeadline_WithoutErrPropagation(t *testing.T) {
	md1 := &MockDetector{}
	md1.On("Detect").Return(pcommon.NewResource(), errors.New("err1"))

	md2 := &MockDetector{}
	md2.On("Detect").Return(pcommon.NewResource(), errors.New("err2"))

	p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)

	// The caller's 3s deadline is shorter than the 10s client timeout, so the
	// caller's context is what bounds the retries in this test.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	_, _, err := p.Get(ctx, &http.Client{Timeout: 10 * time.Second})
	require.NoError(t, err)
}

func TestMergeResource(t *testing.T) {
Expand Down Expand Up @@ -230,20 +238,17 @@ func TestDetectResource_Parallel(t *testing.T) {
require.NoError(t, res2.Attributes().FromRaw(map[string]any{"a": "11", "c": "3"}))
md2.On("Detect").Return(res2, nil)

md3 := NewMockParallelDetector()
md3.On("Detect").Return(pcommon.NewResource(), errors.New("an error"))

expectedResourceAttrs := map[string]any{"a": "1", "b": "2", "c": "3"}

p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2, md3)
p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)

// call p.Get multiple times
wg := &sync.WaitGroup{}
wg.Add(iterations)
for i := 0; i < iterations; i++ {
go func() {
defer wg.Done()
detected, _, err := p.Get(context.Background(), http.DefaultClient)
detected, _, err := p.Get(context.Background(), &http.Client{Timeout: 10 * time.Second})
assert.NoError(t, err)
assert.Equal(t, expectedResourceAttrs, detected.Attributes().AsRaw())
}()
Expand All @@ -255,13 +260,36 @@ func TestDetectResource_Parallel(t *testing.T) {
// detector.Detect should only be called once, so we only need to notify each channel once
md1.ch <- struct{}{}
md2.ch <- struct{}{}
md3.ch <- struct{}{}

// then wait until all goroutines are finished, and ensure p.Detect was only called once
wg.Wait()
md1.AssertNumberOfCalls(t, "Detect", 1)
md2.AssertNumberOfCalls(t, "Detect", 1)
md3.AssertNumberOfCalls(t, "Detect", 1)
}

func TestDetectResource_Reconnect(t *testing.T) {
md1 := &MockDetector{}
res1 := pcommon.NewResource()
require.NoError(t, res1.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"}))
md1.On("Detect").Return(pcommon.NewResource(), errors.New("connection error1")).Twice()
md1.On("Detect").Return(res1, nil)

md2 := &MockDetector{}
res2 := pcommon.NewResource()
require.NoError(t, res2.Attributes().FromRaw(map[string]any{"c": "3"}))
md2.On("Detect").Return(pcommon.NewResource(), errors.New("connection error2")).Once()
md2.On("Detect").Return(res2, nil)

expectedResourceAttrs := map[string]any{"a": "1", "b": "2", "c": "3"}

p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2)

detected, _, err := p.Get(context.Background(), &http.Client{Timeout: 15 * time.Second})
assert.NoError(t, err)
assert.Equal(t, expectedResourceAttrs, detected.Attributes().AsRaw())

md1.AssertNumberOfCalls(t, "Detect", 3) // 2 errors + 1 success
md2.AssertNumberOfCalls(t, "Detect", 2) // 1 error + 1 success
}

func TestFilterAttributes_Match(t *testing.T) {
Expand Down

0 comments on commit e289237

Please sign in to comment.