Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: aws/amazon-cloudwatch-agent
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: a5ea567f1b7d20b5d42ce97ff54944cb7b8a0e5b
Choose a base ref
..
head repository: aws/amazon-cloudwatch-agent
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 512f3f39c1b353c129e88bddb25aa8fa942d5a87
Choose a head ref
21 changes: 17 additions & 4 deletions plugins/processors/awsentity/processor.go
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ const (
attributeService = "Service"
attributeEC2TagAwsAutoscalingGroupName = "ec2.tag.aws:autoscaling:groupName"
EMPTY = ""
unknownService = "unknown_service"
)

type scraper interface {
@@ -187,13 +188,18 @@ func (p *awsEntityProcessor) processMetrics(_ context.Context, md pmetric.Metric
}

podInfo, ok := p.k8sscraper.(*k8sattributescraper.K8sAttributeScraper)
// Perform fallback mechanism for service and environment name if they
// are empty
if entityServiceName == EMPTY && ok && podInfo != nil && podInfo.Workload != EMPTY {
// Perform fallback mechanism for service name if it is empty
// or has prefix unknown_service ( unknown_service will be set by OTEL SDK if the service name is empty on application pod)
// https://opentelemetry.io/docs/specs/semconv/attributes-registry/service/
if (entityServiceName == EMPTY || strings.HasPrefix(entityServiceName, unknownService)) && ok && podInfo != nil && podInfo.Workload != EMPTY {
entityServiceName = podInfo.Workload
entityServiceNameSource = entitystore.ServiceNameSourceK8sWorkload
}

// Set the service name source to Instrumentation if the operator doesn't set it
if entityServiceName != EMPTY && entityServiceNameSource == EMPTY && getTelemetrySDKEnabledAttribute(resourceAttrs) {
entityServiceNameSource = entitystore.ServiceNameSourceInstrumentation
}
// Perform fallback mechanism for environment if it is empty
if entityEnvironmentName == EMPTY && ok && podInfo.Cluster != EMPTY && podInfo.Namespace != EMPTY {
if p.config.KubernetesMode == config.ModeEKS {
entityEnvironmentName = "eks:" + p.config.ClusterName + "/" + podInfo.Namespace
@@ -491,6 +497,13 @@ func getServiceAttributes(p pcommon.Map) string {
return EMPTY
}

func getTelemetrySDKEnabledAttribute(p pcommon.Map) bool {
if _, ok := p.Get(semconv.AttributeTelemetrySDKName); ok {
return true
}
return false
}

// scrapeK8sPodName gets the k8s pod name which is full pod name from the resource attributes
// This is needed to map the pod to the service/environment
func scrapeK8sPodName(p pcommon.Map) string {
104 changes: 104 additions & 0 deletions plugins/processors/awsentity/processor_test.go
Original file line number Diff line number Diff line change
@@ -198,6 +198,12 @@ func TestProcessMetricsForAddingPodToServiceMap(t *testing.T) {
want: map[string]entitystore.ServiceEnvironment{"cloudwatch-agent-adhgaf": {ServiceName: "test-service", ServiceNameSource: entitystore.ServiceNameSourceUnknown}},
k8sMode: config.ModeEKS,
},
{
name: "WithPodNameAndServiceNameNoSourceWithTelemetryEnabled",
metrics: generateMetrics(attributeServiceName, "test-service", semconv.AttributeK8SPodName, "cloudwatch-agent-adhgaf", semconv.AttributeTelemetrySDKName, "opentelemetry"),
want: map[string]entitystore.ServiceEnvironment{"cloudwatch-agent-adhgaf": {ServiceName: "test-service", ServiceNameSource: entitystore.ServiceNameSourceInstrumentation}},
k8sMode: config.ModeEKS,
},
{
name: "WithPodNameAndServiceNameHasSource",
metrics: generateMetrics(attributeServiceName, "test-service", semconv.AttributeK8SPodName, "cloudwatch-agent-adhgaf", entityattributes.AttributeEntityServiceNameSource, "Instrumentation"),
@@ -375,6 +381,104 @@ func TestProcessMetricsResourceAttributeScraping(t *testing.T) {
semconv.AttributeK8SNodeName: "test-node",
},
},
{
name: "ResourceAttributeWorkloadFallbackForUnknownService",
kubernetesMode: config.ModeEKS,
clusterName: "test-cluster",
metrics: generateMetrics(semconv.AttributeK8SNamespaceName, "test-namespace", semconv.AttributeK8SDeploymentName, "test-workload", semconv.AttributeK8SNodeName, "test-node", semconv.AttributeServiceName, "unknown_service"),
want: map[string]any{
entityattributes.AttributeEntityType: "Service",
entityattributes.AttributeEntityServiceName: "test-workload",
entityattributes.AttributeEntityDeploymentEnvironment: "eks:test-cluster/test-namespace",
entityattributes.AttributeEntityCluster: "test-cluster",
entityattributes.AttributeEntityNamespace: "test-namespace",
entityattributes.AttributeEntityNode: "test-node",
entityattributes.AttributeEntityWorkload: "test-workload",
entityattributes.AttributeEntityServiceNameSource: "K8sWorkload",
entityattributes.AttributeEntityPlatformType: "AWS::EKS",
semconv.AttributeK8SNamespaceName: "test-namespace",
semconv.AttributeK8SDeploymentName: "test-workload",
semconv.AttributeK8SNodeName: "test-node",
attributeServiceName: "unknown_service",
},
},
{
name: "ResourceAttributeWorkloadFallbackForUnknownServiceJava",
kubernetesMode: config.ModeEKS,
clusterName: "test-cluster",
metrics: generateMetrics(semconv.AttributeK8SNamespaceName, "test-namespace", semconv.AttributeK8SDeploymentName, "test-workload", semconv.AttributeK8SNodeName, "test-node", semconv.AttributeServiceName, "unknown_service:java"),
want: map[string]any{
entityattributes.AttributeEntityType: "Service",
entityattributes.AttributeEntityServiceName: "test-workload",
entityattributes.AttributeEntityDeploymentEnvironment: "eks:test-cluster/test-namespace",
entityattributes.AttributeEntityCluster: "test-cluster",
entityattributes.AttributeEntityNamespace: "test-namespace",
entityattributes.AttributeEntityNode: "test-node",
entityattributes.AttributeEntityWorkload: "test-workload",
entityattributes.AttributeEntityServiceNameSource: "K8sWorkload",
entityattributes.AttributeEntityPlatformType: "AWS::EKS",
semconv.AttributeK8SNamespaceName: "test-namespace",
semconv.AttributeK8SDeploymentName: "test-workload",
semconv.AttributeK8SNodeName: "test-node",
attributeServiceName: "unknown_service:java",
},
},
{
name: "ResourceAttributeWithUnknownServiceNegativeCase",
kubernetesMode: config.ModeEKS,
clusterName: "test-cluster",
metrics: generateMetrics(semconv.AttributeK8SNamespaceName, "test-namespace", semconv.AttributeK8SDeploymentName, "test-workload", semconv.AttributeK8SNodeName, "test-node", semconv.AttributeServiceName, "unknown_servic"),
want: map[string]any{
entityattributes.AttributeEntityType: "Service",
entityattributes.AttributeEntityServiceName: "unknown_servic",
entityattributes.AttributeEntityDeploymentEnvironment: "eks:test-cluster/test-namespace",
entityattributes.AttributeEntityCluster: "test-cluster",
entityattributes.AttributeEntityNamespace: "test-namespace",
entityattributes.AttributeEntityNode: "test-node",
entityattributes.AttributeEntityWorkload: "test-workload",
entityattributes.AttributeEntityPlatformType: "AWS::EKS",
semconv.AttributeK8SNamespaceName: "test-namespace",
semconv.AttributeK8SDeploymentName: "test-workload",
semconv.AttributeK8SNodeName: "test-node",
attributeServiceName: "unknown_servic",
},
},
{
name: "ResourceAttributeWorkloadFallbackForUnknownServiceJava",
kubernetesMode: config.ModeEKS,
clusterName: "test-cluster",
metrics: generateMetrics(semconv.AttributeK8SNamespaceName, "test-namespace", semconv.AttributeK8SNodeName, "test-node", semconv.AttributeServiceName, "unknown_service:java"),
want: map[string]any{
entityattributes.AttributeEntityType: "Service",
entityattributes.AttributeEntityServiceName: "unknown_service:java",
entityattributes.AttributeEntityDeploymentEnvironment: "eks:test-cluster/test-namespace",
semconv.AttributeK8SNamespaceName: "test-namespace",
semconv.AttributeK8SNodeName: "test-node",
attributeServiceName: "unknown_service:java",
},
},
{
name: "ResourceAttributeTelemetrySDKEnabled",
kubernetesMode: config.ModeEKS,
clusterName: "test-cluster",
metrics: generateMetrics(semconv.AttributeK8SNamespaceName, "test-namespace", semconv.AttributeK8SDeploymentName, "test-workload", semconv.AttributeK8SNodeName, "test-node", attributeServiceName, "test-service", semconv.AttributeTelemetrySDKName, "opentelemetry"),
want: map[string]any{
entityattributes.AttributeEntityType: "Service",
entityattributes.AttributeEntityServiceName: "test-service",
entityattributes.AttributeEntityDeploymentEnvironment: "eks:test-cluster/test-namespace",
entityattributes.AttributeEntityCluster: "test-cluster",
entityattributes.AttributeEntityNamespace: "test-namespace",
entityattributes.AttributeEntityNode: "test-node",
entityattributes.AttributeEntityWorkload: "test-workload",
entityattributes.AttributeEntityServiceNameSource: "Instrumentation",
entityattributes.AttributeEntityPlatformType: "AWS::EKS",
semconv.AttributeK8SNamespaceName: "test-namespace",
semconv.AttributeK8SDeploymentName: "test-workload",
semconv.AttributeK8SNodeName: "test-node",
attributeServiceName: "test-service",
semconv.AttributeTelemetrySDKName: "opentelemetry",
},
},
{
name: "ResourceAttributeEnvironmentFallbackToASG",
platform: config.ModeEC2,
14 changes: 13 additions & 1 deletion translator/translate/otel/common/appsignals.go
Original file line number Diff line number Diff line change
@@ -3,11 +3,23 @@

package common

import "os"
import (
"os"

"go.opentelemetry.io/collector/confmap"
)

const KubernetesEnvVar = "K8S_NAMESPACE"

func IsAppSignalsKubernetes() bool {
_, isSet := os.LookupEnv(KubernetesEnvVar)
return isSet
}

func GetHostedIn(conf *confmap.Conf) (string, bool) {
hostedIn, hostedInConfigured := GetString(conf, ConfigKey(LogsKey, MetricsCollectedKey, AppSignals, "hosted_in"))
if !hostedInConfigured {
hostedIn, hostedInConfigured = GetString(conf, ConfigKey(LogsKey, MetricsCollectedKey, AppSignalsFallback, "hosted_in"))
}
return hostedIn, hostedInConfigured
}
17 changes: 17 additions & 0 deletions translator/translate/otel/common/common.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ package common
import (
"container/list"
"fmt"
"os"
"reflect"
"strconv"
"strings"
@@ -15,6 +16,8 @@ import (
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/pipeline"
"gopkg.in/yaml.v3"

"github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/util"
)

const (
@@ -478,3 +481,17 @@ func IsAnySet(conf *confmap.Conf, keys []string) bool {
func KueueContainerInsightsEnabled(conf *confmap.Conf) bool {
return GetOrDefaultBool(conf, ConfigKey(LogsKey, MetricsCollectedKey, KubernetesKey, EnableKueueContainerInsights), false)
}

func GetClusterName(conf *confmap.Conf) string {
val, ok := GetString(conf, ConfigKey(LogsKey, MetricsCollectedKey, KubernetesKey, "cluster_name"))
if ok && val != "" {
return val
}

envVarClusterName := os.Getenv("K8S_CLUSTER_NAME")
if envVarClusterName != "" {
return envVarClusterName
}

return util.GetClusterNameFromEc2Tagger()
}
3 changes: 1 addition & 2 deletions translator/translate/otel/exporter/awsemf/prometheus.go
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@ import (
"go.opentelemetry.io/collector/confmap"

"github.com/aws/amazon-cloudwatch-agent/translator/context"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/util"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
"github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil"
)
@@ -40,7 +39,7 @@ func setPrometheusLogGroup(conf *confmap.Conf, cfg *awsemfexporter.Config) error
}
} else {

if clusterName := util.GetClusterNameFromEc2Tagger(); clusterName != "" {
if clusterName := common.GetClusterName(conf); clusterName != "" {
cfg.LogGroupName = fmt.Sprintf(eksDefaultLogGroupFormat, clusterName)
}
}
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@ import (
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/rules"
"github.com/aws/amazon-cloudwatch-agent/translator/config"
"github.com/aws/amazon-cloudwatch-agent/translator/context"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/util"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
"github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil"
)
@@ -65,15 +64,10 @@ func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) {
configKey := common.AppSignalsConfigKeys[t.signal]
cfg := t.factory.CreateDefaultConfig().(*appsignalsconfig.Config)

hostedInConfigKey := common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.AppSignals, "hosted_in")
hostedIn, hostedInConfigured := common.GetString(conf, hostedInConfigKey)
if !hostedInConfigured {
hostedInConfigKey = common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.AppSignalsFallback, "hosted_in")
hostedIn, hostedInConfigured = common.GetString(conf, hostedInConfigKey)
}
hostedIn, hostedInConfigured := common.GetHostedIn(conf)
if common.IsAppSignalsKubernetes() {
if !hostedInConfigured {
hostedIn = util.GetClusterNameFromEc2Tagger()
hostedIn = common.GetClusterName(conf)
}
}

33 changes: 12 additions & 21 deletions translator/translate/otel/processor/awsentity/translator.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@ import (

"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsentity"
"github.com/aws/amazon-cloudwatch-agent/translator/context"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/util"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
"github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil"
)
@@ -55,8 +54,10 @@ func (t *translator) ID() component.ID {
}

func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) {
ctx := context.CurrentContext()

// Do not send entity for ECS
if context.CurrentContext().RunInContainer() && ecsutil.GetECSUtilSingleton().IsECS() {
if ctx.RunInContainer() && ecsutil.GetECSUtilSingleton().IsECS() {
return nil, nil
}

@@ -70,32 +71,22 @@ func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) {
cfg.ScrapeDatapointAttribute = true
}

hostedInConfigKey := common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.AppSignals, "hosted_in")
hostedIn, hostedInConfigured := common.GetString(conf, hostedInConfigKey)
if !hostedInConfigured {
hostedInConfigKey = common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.AppSignalsFallback, "hosted_in")
hostedIn, hostedInConfigured = common.GetString(conf, hostedInConfigKey)
}
if common.IsAppSignalsKubernetes() {
if !hostedInConfigured {
hostedIn = util.GetClusterNameFromEc2Tagger()
}
}

//TODO: This logic is more or less identical to what AppSignals does. This should be moved to a common place for reuse
ctx := context.CurrentContext()
mode := ctx.KubernetesMode()
cfg.KubernetesMode = mode
cfg.KubernetesMode = ctx.KubernetesMode()

mode = ctx.Mode()
if cfg.KubernetesMode != "" {
cfg.ClusterName = hostedIn
clusterName, clusterNameConfigured := common.GetHostedIn(conf)

if !clusterNameConfigured {
clusterName = common.GetClusterName(conf)
}

cfg.ClusterName = clusterName
}

// We want to keep platform config variable to be
// anything that is non-Kubernetes related so the
// processor can perform different logics for EKS
// in EC2 or Non-EC2
cfg.Platform = mode
cfg.Platform = ctx.Mode()
return cfg, nil
}
Loading