Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ec2tagger] Set refresh interval for describe volumes to 5m when configured #1578

Merged
merged 13 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugins/processors/ec2tagger/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ The IAM User or Role making the calls must have permissions to call the EC2 Desc
The following receiver configuration parameters are supported.
| Name | Description | Supported Value | Default |
|--------------------------| ---------------------------------------------------------------------------------------------------------------| -----------------------------------------| --------|
|`refresh_interval_seconds`| is the frequency for the plugin to refresh the EC2 Instance Tags and ebs Volumes associated with this Instance.| "0s" | "0s" |
|`refresh_tags_interval` | is the frequency for the plugin to refresh the EC2 Instance Tags associated with this Instance. | "0s" | "0s" |
|`refresh_volumes_interval`| is the frequency for the plugin to refresh the EBS Volumes associated with this Instance. | "0s" | "0s" |
|`ec2_metadata_tags` | is the option to specify which tags to be scraped from IMDS and add to datapoint attributes | ["InstanceId", "ImageId", "InstanceType"]| [] |
|`ec2_instance_tag_keys` | is the option to specific which EC2 Instance tags to be scraped associated with this instance. | ["aws:autoscaling:groupName", "Name"] | [] |
|`disk_device_tag_key` | is the option to Specify which tags to use to get the specified disk device name from input metric | [] | [] |
Expand Down
3 changes: 2 additions & 1 deletion plugins/processors/ec2tagger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ const (
)

type Config struct {
RefreshIntervalSeconds time.Duration `mapstructure:"refresh_interval_seconds"`
RefreshTagsInterval time.Duration `mapstructure:"refresh_tags_interval"`
RefreshVolumesInterval time.Duration `mapstructure:"refresh_volumes_interval"`
EC2MetadataTags []string `mapstructure:"ec2_metadata_tags"`
EC2InstanceTagKeys []string `mapstructure:"ec2_instance_tag_keys"`
EBSDeviceKeys []string `mapstructure:"ebs_device_keys,omitempty"`
Expand Down
93 changes: 71 additions & 22 deletions plugins/processors/ec2tagger/ec2tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,29 +197,24 @@ func (t *Tagger) Shutdown(context.Context) error {
return nil
}

// refreshLoop handles the refresh ticks and also responds to shutdown signal
func (t *Tagger) refreshLoop(refreshInterval time.Duration, stopAfterFirstSuccess bool) {
// refreshLoopTags handles the refresh ticks for describe tags and also responds to shutdown signal
func (t *Tagger) refreshLoopTags(refreshInterval time.Duration, stopAfterFirstSuccess bool) {
refreshTicker := time.NewTicker(refreshInterval)
defer refreshTicker.Stop()
for {
select {
case <-refreshTicker.C:
t.logger.Debug("ec2tagger refreshing")
t.logger.Debug("ec2tagger refreshing tags")
allTagsRetrieved := t.ec2TagsRetrieved()
allVolumesRetrieved := t.ebsVolumesRetrieved()
t.logger.Debug("Retrieve status",
zap.Bool("Ec2AllTagsRetrieved", allTagsRetrieved),
zap.Bool("EbsAllVolumesRetrieved", allVolumesRetrieved))
zap.Bool("Ec2AllTagsRetrieved", allTagsRetrieved))
refreshTags := len(t.EC2InstanceTagKeys) > 0
refreshVolumes := len(t.EBSDeviceKeys) > 0

if stopAfterFirstSuccess {
// need refresh tags when it is configured and not all ec2 tags are retrieved
refreshTags = refreshTags && !allTagsRetrieved
// need refresh volumes when it is configured and not all volumes are retrieved
refreshVolumes = refreshVolumes && !allVolumesRetrieved
if !refreshTags && !refreshVolumes {
t.logger.Info("ec2tagger: Refresh is no longer needed, stop refreshTicker.")
if !refreshTags {
t.logger.Info("ec2tagger: Refresh for tags is no longer needed, stop refreshTicker.")
return
}
}
Expand All @@ -230,6 +225,34 @@ func (t *Tagger) refreshLoop(refreshInterval time.Duration, stopAfterFirstSucces
}
}

case <-t.shutdownC:
return
}
}
}

// refreshLoopVolumes handles the refresh ticks for describe volumes and also responds to shutdown signal
func (t *Tagger) refreshLoopVolumes(refreshInterval time.Duration, stopAfterFirstSuccess bool) {
refreshTicker := time.NewTicker(refreshInterval)
defer refreshTicker.Stop()
for {
select {
case <-refreshTicker.C:
t.logger.Debug("ec2tagger refreshing volumes")
allVolumesRetrieved := t.ebsVolumesRetrieved()
t.logger.Debug("Retrieve status",
zap.Bool("EbsAllVolumesRetrieved", allVolumesRetrieved))
refreshVolumes := len(t.EBSDeviceKeys) > 0

if stopAfterFirstSuccess {
// need refresh volumes when it is configured and not all volumes are retrieved
refreshVolumes = refreshVolumes && !allVolumesRetrieved
if !refreshVolumes {
t.logger.Info("ec2tagger: Refresh for volumes is no longer needed, stop refreshTicker.")
return
}
}

if refreshVolumes {
if err := t.updateVolumes(); err != nil {
t.logger.Warn("ec2tagger: Error refreshing EBS volumes, keeping old values", zap.Error(err))
Expand Down Expand Up @@ -333,7 +356,8 @@ func (t *Tagger) Start(ctx context.Context, host component.Host) error {

go func() { //Async start of initial retrieval to prevent block of agent start
t.initialRetrievalOfTagsAndVolumes()
t.refreshLoopToUpdateTagsAndVolumes()
t.refreshLoopToUpdateTags()
t.refreshLoopToUpdateVolumes()
}()
t.logger.Info("ec2tagger: EC2 tagger has started initialization.")

Expand All @@ -343,24 +367,49 @@ func (t *Tagger) Start(ctx context.Context, host component.Host) error {
return nil
}

func (t *Tagger) refreshLoopToUpdateTagsAndVolumes() {
func (t *Tagger) refreshLoopToUpdateTags() {
needRefresh := false
stopAfterFirstSuccess := false
refreshInterval := t.RefreshIntervalSeconds

if t.RefreshIntervalSeconds.Seconds() == 0 {
refreshInterval := t.RefreshTagsInterval
if refreshInterval.Seconds() == 0 {
//when the refresh interval is 0, this means that customer don't want to
//update tags/volumes values once they are retrieved successfully. In this case,
//update tags values once they are retrieved successfully. In this case,
//we still want to do refresh to make sure all the specified keys for tags/volumes
//are fetched successfully because initial retrieval might not get all of them.
//When the specified key is "*", there is no way for us to check if all
//tags/volumes are fetched. So there is no need to do refresh in this case.
needRefresh = !(len(t.EC2InstanceTagKeys) == 1 && t.EC2InstanceTagKeys[0] == "*") ||
!(len(t.EBSDeviceKeys) == 1 && t.EBSDeviceKeys[0] == "*")
//tags are fetched. So there is no need to do refresh in this case.
needRefresh = !(len(t.EC2InstanceTagKeys) == 1 && t.EC2InstanceTagKeys[0] == "*")

stopAfterFirstSuccess = true
refreshInterval = defaultRefreshInterval
} else if refreshInterval.Seconds() > 0 {
//customer wants to update the tags with the given refresh interval
needRefresh = true
}

if needRefresh {
go func() {
// randomly stagger the time of the first refresh to mitigate throttling if a whole fleet is
// restarted at the same time
sleepUntilHostJitter(refreshInterval)
t.refreshLoopTags(refreshInterval, stopAfterFirstSuccess)
}()
}
}

func (t *Tagger) refreshLoopToUpdateVolumes() {
needRefresh := false
stopAfterFirstSuccess := false

refreshInterval := t.RefreshVolumesInterval
if refreshInterval.Seconds() == 0 {
needRefresh = !(len(t.EBSDeviceKeys) == 1 && t.EBSDeviceKeys[0] == "*")

stopAfterFirstSuccess = true
refreshInterval = defaultRefreshInterval
} else if t.RefreshIntervalSeconds.Seconds() > 0 {
//customer wants to update the tags/volumes with the given refresh interval
} else if refreshInterval.Seconds() > 0 {
//customer wants to update the volumes with the given refresh interval
needRefresh = true
}

Expand All @@ -369,7 +418,7 @@ func (t *Tagger) refreshLoopToUpdateTagsAndVolumes() {
// randomly stagger the time of the first refresh to mitigate throttling if a whole fleet is
// restarted at the same time
sleepUntilHostJitter(refreshInterval)
t.refreshLoop(refreshInterval, stopAfterFirstSuccess)
t.refreshLoopVolumes(refreshInterval, stopAfterFirstSuccess)
}()
}
}
Expand Down
21 changes: 14 additions & 7 deletions plugins/processors/ec2tagger/ec2tagger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ func TestStartFailWithNoMetadata(t *testing.T) {
// run Start() and check all tags/volumes are retrieved and saved
func TestStartSuccessWithNoTagsVolumesUpdate(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.RefreshIntervalSeconds = 0 * time.Second
cfg.RefreshTagsInterval = 0 * time.Second
cfg.RefreshVolumesInterval = 0 * time.Second
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
cfg.EC2InstanceTagKeys = []string{tagKey1, tagKey2, "AutoScalingGroupName"}
cfg.EBSDeviceKeys = []string{device1, device2}
Expand Down Expand Up @@ -326,7 +327,8 @@ func TestStartSuccessWithNoTagsVolumesUpdate(t *testing.T) {
func TestStartSuccessWithTagsVolumesUpdate(t *testing.T) {
cfg := createDefaultConfig().(*Config)
//use millisecond rather than second to speed up test execution
cfg.RefreshIntervalSeconds = 20 * time.Millisecond
cfg.RefreshTagsInterval = 20 * time.Millisecond
cfg.RefreshVolumesInterval = 20 * time.Millisecond
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
cfg.EC2InstanceTagKeys = []string{tagKey1, tagKey2, "AutoScalingGroupName"}
cfg.EBSDeviceKeys = []string{device1, device2}
Expand Down Expand Up @@ -382,7 +384,8 @@ func TestStartSuccessWithTagsVolumesUpdate(t *testing.T) {
// check there is no attempt to fetch all tags/volumes
func TestStartSuccessWithWildcardTagVolumeKey(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.RefreshIntervalSeconds = 0 * time.Second
cfg.RefreshTagsInterval = 0 * time.Second
cfg.RefreshVolumesInterval = 0 * time.Second
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
cfg.EC2InstanceTagKeys = []string{"*"}
cfg.EBSDeviceKeys = []string{"*"}
Expand Down Expand Up @@ -426,7 +429,8 @@ func TestStartSuccessWithWildcardTagVolumeKey(t *testing.T) {
func TestApplyWithTagsVolumesUpdate(t *testing.T) {
cfg := createDefaultConfig().(*Config)
//use millisecond rather than second to speed up test execution
cfg.RefreshIntervalSeconds = 20 * time.Millisecond
cfg.RefreshTagsInterval = 20 * time.Millisecond
cfg.RefreshVolumesInterval = 20 * time.Millisecond
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
cfg.EC2InstanceTagKeys = []string{tagKey1, tagKey2, "AutoScalingGroupName"}
cfg.EBSDeviceKeys = []string{device1, device2}
Expand Down Expand Up @@ -520,7 +524,8 @@ func TestApplyWithTagsVolumesUpdate(t *testing.T) {
// Test metrics are dropped before the initial retrieval is done
func TestMetricsDroppedBeforeStarted(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.RefreshIntervalSeconds = 0 * time.Millisecond
cfg.RefreshTagsInterval = 0 * time.Millisecond
cfg.RefreshVolumesInterval = 0 * time.Millisecond
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
cfg.EC2InstanceTagKeys = []string{"*"}
cfg.EBSDeviceKeys = []string{"*"}
Expand Down Expand Up @@ -585,7 +590,8 @@ func TestMetricsDroppedBeforeStarted(t *testing.T) {
// Test ec2tagger Start does not block for a long time
func TestTaggerStartDoesNotBlock(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.RefreshIntervalSeconds = 0 * time.Second
cfg.RefreshTagsInterval = 0 * time.Second
cfg.RefreshVolumesInterval = 0 * time.Second
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
cfg.EC2InstanceTagKeys = []string{"*"}
cfg.EBSDeviceKeys = []string{"*"}
Expand Down Expand Up @@ -628,7 +634,8 @@ func TestTaggerStartDoesNotBlock(t *testing.T) {
// Test ec2tagger Start does not block for a long time
func TestTaggerStartsWithoutTagOrVolume(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.RefreshIntervalSeconds = 0 * time.Second
cfg.RefreshTagsInterval = 0 * time.Second
cfg.RefreshVolumesInterval = 0 * time.Second
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
_, cancel := context.WithCancel(context.Background())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ processors:
- InstanceType
imds_retries: 1
middleware: agenthealth/statuscode
refresh_interval_seconds: 0s
refresh_tags_interval: 0s
refresh_volumes_interval: 0s
receivers:
telegraf_cpu:
collection_interval: 1m0s
Expand Down Expand Up @@ -90,11 +91,11 @@ service:
- ec2tagger
- awsentity/resource
receivers:
- telegraf_cpu
- telegraf_disk
- telegraf_mem
- telegraf_netstat
- telegraf_swap
- telegraf_cpu
metrics/hostDeltaMetrics:
exporters:
- awscloudwatch
Expand Down
11 changes: 6 additions & 5 deletions translator/tocwconfig/sampleConfig/advanced_config_linux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ processors:
ec2_instance_tag_keys:
- AutoScalingGroupName
ec2_metadata_tags:
- InstanceType
- ImageId
- InstanceId
- InstanceType
imds_retries: 1
middleware: agenthealth/statuscode
refresh_interval_seconds: 0s
refresh_tags_interval: 0s
refresh_volumes_interval: 0s
receivers:
telegraf_cpu:
collection_interval: 1m0s
Expand Down Expand Up @@ -98,13 +99,13 @@ service:
- ec2tagger
- awsentity/resource
receivers:
- telegraf_disk
- telegraf_mem
- telegraf_netstat
- telegraf_swap
- telegraf_ethtool
- telegraf_nvidia_smi
- telegraf_cpu
- telegraf_disk
- telegraf_mem
- telegraf_netstat
metrics/hostDeltaMetrics:
exporters:
- awscloudwatch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ processors:
- InstanceType
imds_retries: 1
middleware: agenthealth/statuscode
refresh_interval_seconds: 0s
refresh_tags_interval: 0s
refresh_volumes_interval: 0s
receivers:
telegraf_win_perf_counters/1492679118:
alias_name: Memory
Expand Down Expand Up @@ -91,13 +92,13 @@ service:
- ec2tagger
- awsentity/resource
receivers:
- telegraf_win_perf_counters/1492679118
- telegraf_win_perf_counters/3610923661
- telegraf_win_perf_counters/3446270237
- telegraf_win_perf_counters/3762679655
- telegraf_win_perf_counters/2073218482
- telegraf_win_perf_counters/2039663244
- telegraf_win_perf_counters/4283769065
- telegraf_win_perf_counters/1492679118
- telegraf_win_perf_counters/3610923661
- telegraf_win_perf_counters/3446270237
telemetry:
logs:
development: false
Expand Down
3 changes: 2 additions & 1 deletion translator/tocwconfig/sampleConfig/amp_config_linux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ processors:
- ImageId
imds_retries: 1
middleware: agenthealth/statuscode
refresh_interval_seconds: 0s
refresh_tags_interval: 0s
refresh_volumes_interval: 0s
rollup:
attribute_groups:
- - ImageId
Expand Down
5 changes: 3 additions & 2 deletions translator/tocwconfig/sampleConfig/basic_config_linux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ processors:
ec2_instance_tag_keys:
- AutoScalingGroupName
ec2_metadata_tags:
- InstanceType
- ImageId
- InstanceId
- InstanceType
imds_retries: 1
middleware: agenthealth/statuscode
refresh_interval_seconds: 0s
refresh_tags_interval: 0s
refresh_volumes_interval: 0s
receivers:
telegraf_disk:
collection_interval: 1m0s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ processors:
- InstanceType
imds_retries: 1
middleware: agenthealth/statuscode
refresh_interval_seconds: 0s
refresh_tags_interval: 0s
refresh_volumes_interval: 0s
receivers:
telegraf_win_perf_counters/1492679118:
alias_name: Memory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ processors:
ec2_instance_tag_keys:
- AutoScalingGroupName
ec2_metadata_tags:
- InstanceType
- ImageId
- InstanceId
- InstanceType
imds_retries: 1
middleware: agenthealth/statuscode
refresh_interval_seconds: 0s
refresh_tags_interval: 0s
refresh_volumes_interval: 0s
receivers:
telegraf_socket_listener:
collection_interval: 10s
Expand All @@ -75,8 +76,8 @@ service:
- ec2tagger
- awsentity/service/telegraf
receivers:
- telegraf_socket_listener
- telegraf_statsd
- telegraf_socket_listener
telemetry:
logs:
development: false
Expand Down
Loading