Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Kubernetes Metadata Extension #1583

Merged
merged 18 commits into from
Mar 17, 2025
13 changes: 13 additions & 0 deletions extension/k8smetadata/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Kubernetes Metadata

The Kubernetes Metadata extension uses a Kubernetes client to start an informer, which queries the Kubernetes API for EndpointSlices. The EndpointSlices are transformed to reduce storage and are periodically updated.

> Kubernetes' EndpointSlice API provides a way to track network endpoints within a Kubernetes cluster. (https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/)

These network endpoints expose relevant Kubernetes metadata for service-exposed applications.

Pod IP → {Workload, Namespace, Node} mappings are stored.
- Workload: This is the application's name.
- Namespace: This is the Kubernetes namespace the application is in.
- Node: This is the Kubernetes node the application is running on.

12 changes: 12 additions & 0 deletions extension/k8smetadata/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package k8smetadata

import (
"go.opentelemetry.io/collector/component"
)

// Config is the configuration for the k8smetadata extension. It currently
// has no fields.
type Config struct{}

// Compile-time check that *Config satisfies component.Config.
var _ component.Config = &Config{}
18 changes: 18 additions & 0 deletions extension/k8smetadata/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package k8smetadata

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/confmap"
)

// TestUnmarshalDefaultConfig checks that unmarshalling an empty confmap into
// the default config succeeds and leaves it equal to a fresh default config.
func TestUnmarshalDefaultConfig(t *testing.T) {
	f := NewFactory()
	got := f.CreateDefaultConfig()

	err := confmap.New().Unmarshal(got)
	assert.NoError(t, err)

	want := f.CreateDefaultConfig()
	assert.Equal(t, want, got)
}
103 changes: 103 additions & 0 deletions extension/k8smetadata/extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package k8smetadata

import (
"context"
"math/rand"
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
"go.uber.org/atomic"
"go.uber.org/zap"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

"github.com/aws/amazon-cloudwatch-agent/internal/k8sCommon/k8sclient"
)

const (
	// deletionDelay is the Delay given to the TimedDeleter that Start hands
	// to the EndpointSlice watcher.
	deletionDelay time.Duration = 2 * time.Minute

	// jitterKubernetesAPISeconds is the exclusive upper bound, in seconds, of
	// the random sleep Start performs before creating the informer factory.
	jitterKubernetesAPISeconds = 10
)

// KubernetesMetadata is the k8smetadata extension. It owns an
// EndpointSliceWatcher and serves pod-IP lookups through GetPodMetadata.
type KubernetesMetadata struct {
	// logger receives the extension's debug and error messages.
	logger *zap.Logger
	// config is the extension configuration supplied at creation time.
	config *Config

	// mu serializes Start and Shutdown.
	mu sync.Mutex
	// ready is set to true at the end of a successful Start and is checked by
	// GetKubernetesMetadata before the instance is handed out.
	ready atomic.Bool

	// safeStopCh wraps the stop channel passed to the watcher; Shutdown
	// closes it.
	safeStopCh *k8sclient.SafeChannel
	// endpointSliceWatcher is created in Start; its IPToPodMetadata map is
	// what GetPodMetadata reads from.
	endpointSliceWatcher *k8sclient.EndpointSliceWatcher
}

// Compile-time check that *KubernetesMetadata satisfies extension.Extension.
var _ extension.Extension = (*KubernetesMetadata)(nil)

// jitterSleep blocks the calling goroutine for a random whole number of
// seconds in the half-open range [0, seconds).
//
// A non-positive bound means "no jitter" and returns immediately; without
// this guard rand.Intn would panic, since it requires n > 0.
func jitterSleep(seconds int) {
	if seconds <= 0 {
		return
	}
	jitter := time.Duration(rand.Intn(seconds)) * time.Second // nolint:gosec
	time.Sleep(jitter)
}

// Start brings the extension up: it builds a Kubernetes client configuration
// and clientset, sleeps for a random jitter, creates the EndpointSliceWatcher
// on a shared informer factory, runs it, waits for its cache to sync, and
// finally marks the extension ready.
//
// The context and host arguments are unused. The extension mutex is held for
// the whole call, including the jitter sleep and the cache-sync wait, so a
// concurrent Shutdown blocks until Start returns.
//
// It returns the underlying error if the client configuration or the
// clientset cannot be created. In that case the extension is never marked
// ready, so GetKubernetesMetadata keeps returning nil.
func (e *KubernetesMetadata) Start(_ context.Context, _ component.Host) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.logger.Debug("Starting k8smetadata extension...")

	// Create the stop channel before any step that can fail so that Shutdown
	// always has a non-nil channel to close, even after a failed Start.
	e.safeStopCh = &k8sclient.SafeChannel{Ch: make(chan struct{}), Closed: false}

	config, err := clientcmd.BuildConfigFromFlags("", "")
	if err != nil {
		e.logger.Error("Failed to create config", zap.Error(err))
		// Do not continue with a nil config; the clientset cannot be built from it.
		return err
	}
	e.logger.Debug("Kubernetes config built successfully")

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		e.logger.Error("Failed to create kubernetes client", zap.Error(err))
		// Do not continue with a nil clientset; the informer factory needs a real one.
		return err
	}
	e.logger.Debug("Kubernetes clientset created successfully")

	// Random delay before the informer is created.
	// NOTE(review): presumably this spreads API-server load across agents
	// that start at the same time — confirm.
	jitterSleep(jitterKubernetesAPISeconds)

	timedDeleter := &k8sclient.TimedDeleter{Delay: deletionDelay}
	sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)

	e.endpointSliceWatcher = k8sclient.NewEndpointSliceWatcher(e.logger, sharedInformerFactory, timedDeleter)

	e.logger.Debug("Starting EndpointSliceWatcher Run()")
	e.endpointSliceWatcher.Run(e.safeStopCh.Ch)

	e.logger.Debug("Waiting for EndpointSlice cache to sync...")
	e.endpointSliceWatcher.WaitForCacheSync(e.safeStopCh.Ch)

	e.logger.Debug("EndpointSlice cache synced, extension fully started")
	e.ready.Store(true)

	return nil
}

// Shutdown closes the stop channel that was handed to the EndpointSlice
// watcher. The context argument is unused and the method always returns nil.
//
// The stop channel is only created by Start, so Shutdown on an extension that
// was never started is a no-op rather than a nil-pointer dereference.
func (e *KubernetesMetadata) Shutdown(_ context.Context) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.safeStopCh != nil {
		e.safeStopCh.Close()
	}
	return nil
}

// GetPodMetadata looks up ip in the watcher's IPToPodMetadata map and returns
// the stored PodMetadata (workload, namespace, node).
//
// It returns the zero PodMetadata when no entry exists for ip, or when the
// stored value is not a k8sclient.PodMetadata. Each outcome is logged at
// debug level.
func (e *KubernetesMetadata) GetPodMetadata(ip string) k8sclient.PodMetadata {
	pm, ok := e.endpointSliceWatcher.IPToPodMetadata.Load(ip)
	if !ok {
		e.logger.Debug("GetPodMetadata: no mapping found for IP", zap.String("ip", ip))
		return k8sclient.PodMetadata{}
	}
	// The map holds untyped values, so check the type instead of letting an
	// unexpected entry panic the caller.
	metadata, ok := pm.(k8sclient.PodMetadata)
	if !ok {
		e.logger.Debug("GetPodMetadata: unexpected value type for IP", zap.String("ip", ip))
		return k8sclient.PodMetadata{}
	}
	e.logger.Debug("GetPodMetadata: found metadata",
		zap.String("ip", ip),
		zap.String("workload", metadata.Workload),
		zap.String("namespace", metadata.Namespace),
		zap.String("node", metadata.Node),
	)
	return metadata
}
39 changes: 39 additions & 0 deletions extension/k8smetadata/extension_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package k8smetadata

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/aws/amazon-cloudwatch-agent/internal/k8sCommon/k8sclient"
)

// TestKubernetesMetadata_GetPodMetadata checks that a stored IP resolves to
// its PodMetadata and that an unknown IP yields the zero PodMetadata.
func TestKubernetesMetadata_GetPodMetadata(t *testing.T) {
	const testIP = "1.2.3.4"

	expected := k8sclient.PodMetadata{
		Workload:  "my-workload",
		Namespace: "my-namespace",
		Node:      "my-node",
	}

	// Seed the watcher's map directly; no informer is started in this test.
	ipMap := &sync.Map{}
	ipMap.Store(testIP, expected)

	kMeta := &KubernetesMetadata{
		logger:               zap.NewNop(),
		endpointSliceWatcher: &k8sclient.EndpointSliceWatcher{IPToPodMetadata: ipMap},
	}

	assert.Equal(t, expected, kMeta.GetPodMetadata(testIP), "GetPodMetadata should return the stored PodMetadata for %s", testIP)

	var empty k8sclient.PodMetadata
	assert.Equal(t, empty, kMeta.GetPodMetadata("9.9.9.9"), "GetPodMetadata should return empty if the IP is not present")
}
50 changes: 50 additions & 0 deletions extension/k8smetadata/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package k8smetadata

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
)

var (
	// TypeStr is the component type under which the extension is registered.
	// MustNewType panics at package initialization if the literal is not a
	// valid type name, instead of silently discarding the error and leaving
	// TypeStr as the zero component.Type.
	TypeStr = component.MustNewType("k8smetadata")

	// kubernetesMetadata is the most recently created extension instance; it
	// is set by createExtension and read by GetKubernetesMetadata.
	kubernetesMetadata *KubernetesMetadata

	// mutex guards kubernetesMetadata.
	mutex sync.RWMutex
)

// GetKubernetesMetadata returns the package-level extension instance once it
// has been created and has finished starting. It returns nil if no instance
// exists yet or the instance is not marked ready.
func GetKubernetesMetadata() *KubernetesMetadata {
	mutex.RLock()
	defer mutex.RUnlock()

	ext := kubernetesMetadata
	if ext == nil || !ext.ready.Load() {
		return nil
	}
	return ext
}

// NewFactory returns the extension.Factory for the k8smetadata extension,
// registered under TypeStr at alpha stability.
func NewFactory() extension.Factory {
	const stability = component.StabilityLevelAlpha
	return extension.NewFactory(TypeStr, createDefaultConfig, createExtension, stability)
}

// createDefaultConfig returns a pointer to a zero-value Config.
func createDefaultConfig() component.Config {
	return new(Config)
}

// createExtension builds a KubernetesMetadata from the given settings and
// config, stores it in the package-level kubernetesMetadata variable
// (replacing any earlier instance), and returns it. cfg must be a *Config;
// any other type makes the type assertion panic. The returned error is
// always nil.
func createExtension(_ context.Context, settings extension.Settings, cfg component.Config) (extension.Extension, error) {
	ext := &KubernetesMetadata{
		logger: settings.Logger,
		config: cfg.(*Config),
	}

	mutex.Lock()
	kubernetesMetadata = ext
	mutex.Unlock()

	return ext, nil
}
26 changes: 26 additions & 0 deletions extension/k8smetadata/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package k8smetadata

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/extension/extensiontest"
)

// TestCreateDefaultConfig checks that the factory's default config is an
// empty Config and passes the collector's config struct checks.
func TestCreateDefaultConfig(t *testing.T) {
	got := NewFactory().CreateDefaultConfig()

	want := &Config{}
	assert.Equal(t, want, got)

	err := componenttest.CheckConfigStruct(got)
	assert.NoError(t, err)
}

// TestCreate checks that the factory creates a non-nil extension from an
// empty Config without returning an error.
func TestCreate(t *testing.T) {
	f := NewFactory()
	settings := extensiontest.NewNopSettings()

	ext, err := f.Create(context.Background(), settings, &Config{})

	assert.NoError(t, err)
	assert.NotNil(t, ext)
}
Loading