Skip to content

Commit f51ad4a

Browse files
authored
[ec2tagger] Set refresh interval for describe volumes to 5m when configured (#1578)
1 parent b1d223a commit f51ad4a

28 files changed

+285
-201
lines changed

plugins/processors/ec2tagger/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ The IAM User or Role making the calls must have permissions to call the EC2 Desc
3030
The following receiver configuration parameters are supported.
3131
| Name | Description | Supported Value | Default |
3232
|--------------------------| ---------------------------------------------------------------------------------------------------------------| -----------------------------------------| --------|
33-
|`refresh_interval_seconds`| is the frequency for the plugin to refresh the EC2 Instance Tags and ebs Volumes associated with this Instance.| "0s" | "0s" |
33+
|`refresh_tags_interval` | is the frequency for the plugin to refresh the EC2 Instance Tags associated with this Instance. | "0s" | "0s" |
34+
|`refresh_volumes_interval`| is the frequency for the plugin to refresh the EBS Volumes associated with this Instance. | "0s" | "0s" |
3435
|`ec2_metadata_tags` | is the option to specify which tags to be scraped from IMDS and add to datapoint attributes | ["InstanceId", "ImageId", "InstanceType"]| [] |
3536
|`ec2_instance_tag_keys` | is the option to specific which EC2 Instance tags to be scraped associated with this instance. | ["aws:autoscaling:groupName", "Name"] | [] |
3637
|`disk_device_tag_key` | is the option to Specify which tags to use to get the specified disk device name from input metric | [] | [] |

plugins/processors/ec2tagger/config.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ const (
2222
)
2323

2424
type Config struct {
25-
RefreshIntervalSeconds time.Duration `mapstructure:"refresh_interval_seconds"`
25+
RefreshTagsInterval time.Duration `mapstructure:"refresh_tags_interval"`
26+
RefreshVolumesInterval time.Duration `mapstructure:"refresh_volumes_interval"`
2627
EC2MetadataTags []string `mapstructure:"ec2_metadata_tags"`
2728
EC2InstanceTagKeys []string `mapstructure:"ec2_instance_tag_keys"`
2829
EBSDeviceKeys []string `mapstructure:"ebs_device_keys,omitempty"`

plugins/processors/ec2tagger/ec2tagger.go

+71-22
Original file line numberDiff line numberDiff line change
@@ -197,29 +197,24 @@ func (t *Tagger) Shutdown(context.Context) error {
197197
return nil
198198
}
199199

200-
// refreshLoop handles the refresh ticks and also responds to shutdown signal
201-
func (t *Tagger) refreshLoop(refreshInterval time.Duration, stopAfterFirstSuccess bool) {
200+
// refreshLoopTags handles the refresh ticks for describe tags and also responds to shutdown signal
201+
func (t *Tagger) refreshLoopTags(refreshInterval time.Duration, stopAfterFirstSuccess bool) {
202202
refreshTicker := time.NewTicker(refreshInterval)
203203
defer refreshTicker.Stop()
204204
for {
205205
select {
206206
case <-refreshTicker.C:
207-
t.logger.Debug("ec2tagger refreshing")
207+
t.logger.Debug("ec2tagger refreshing tags")
208208
allTagsRetrieved := t.ec2TagsRetrieved()
209-
allVolumesRetrieved := t.ebsVolumesRetrieved()
210209
t.logger.Debug("Retrieve status",
211-
zap.Bool("Ec2AllTagsRetrieved", allTagsRetrieved),
212-
zap.Bool("EbsAllVolumesRetrieved", allVolumesRetrieved))
210+
zap.Bool("Ec2AllTagsRetrieved", allTagsRetrieved))
213211
refreshTags := len(t.EC2InstanceTagKeys) > 0
214-
refreshVolumes := len(t.EBSDeviceKeys) > 0
215212

216213
if stopAfterFirstSuccess {
217214
// need refresh tags when it is configured and not all ec2 tags are retrieved
218215
refreshTags = refreshTags && !allTagsRetrieved
219-
// need refresh volumes when it is configured and not all volumes are retrieved
220-
refreshVolumes = refreshVolumes && !allVolumesRetrieved
221-
if !refreshTags && !refreshVolumes {
222-
t.logger.Info("ec2tagger: Refresh is no longer needed, stop refreshTicker.")
216+
if !refreshTags {
217+
t.logger.Info("ec2tagger: Refresh for tags is no longer needed, stop refreshTicker.")
223218
return
224219
}
225220
}
@@ -230,6 +225,34 @@ func (t *Tagger) refreshLoop(refreshInterval time.Duration, stopAfterFirstSucces
230225
}
231226
}
232227

228+
case <-t.shutdownC:
229+
return
230+
}
231+
}
232+
}
233+
234+
// refreshLoopVolumes handles the refresh ticks for describe volumes and also responds to shutdown signal
235+
func (t *Tagger) refreshLoopVolumes(refreshInterval time.Duration, stopAfterFirstSuccess bool) {
236+
refreshTicker := time.NewTicker(refreshInterval)
237+
defer refreshTicker.Stop()
238+
for {
239+
select {
240+
case <-refreshTicker.C:
241+
t.logger.Debug("ec2tagger refreshing volumes")
242+
allVolumesRetrieved := t.ebsVolumesRetrieved()
243+
t.logger.Debug("Retrieve status",
244+
zap.Bool("EbsAllVolumesRetrieved", allVolumesRetrieved))
245+
refreshVolumes := len(t.EBSDeviceKeys) > 0
246+
247+
if stopAfterFirstSuccess {
248+
// need refresh volumes when it is configured and not all volumes are retrieved
249+
refreshVolumes = refreshVolumes && !allVolumesRetrieved
250+
if !refreshVolumes {
251+
t.logger.Info("ec2tagger: Refresh for volumes is no longer needed, stop refreshTicker.")
252+
return
253+
}
254+
}
255+
233256
if refreshVolumes {
234257
if err := t.updateVolumes(); err != nil {
235258
t.logger.Warn("ec2tagger: Error refreshing EBS volumes, keeping old values", zap.Error(err))
@@ -333,7 +356,8 @@ func (t *Tagger) Start(ctx context.Context, host component.Host) error {
333356

334357
go func() { //Async start of initial retrieval to prevent block of agent start
335358
t.initialRetrievalOfTagsAndVolumes()
336-
t.refreshLoopToUpdateTagsAndVolumes()
359+
t.refreshLoopToUpdateTags()
360+
t.refreshLoopToUpdateVolumes()
337361
}()
338362
t.logger.Info("ec2tagger: EC2 tagger has started initialization.")
339363

@@ -343,24 +367,49 @@ func (t *Tagger) Start(ctx context.Context, host component.Host) error {
343367
return nil
344368
}
345369

346-
func (t *Tagger) refreshLoopToUpdateTagsAndVolumes() {
370+
func (t *Tagger) refreshLoopToUpdateTags() {
347371
needRefresh := false
348372
stopAfterFirstSuccess := false
349-
refreshInterval := t.RefreshIntervalSeconds
350373

351-
if t.RefreshIntervalSeconds.Seconds() == 0 {
374+
refreshInterval := t.RefreshTagsInterval
375+
if refreshInterval.Seconds() == 0 {
352376
//when the refresh interval is 0, this means that customer don't want to
353-
//update tags/volumes values once they are retrieved successfully. In this case,
377+
//update tags values once they are retrieved successfully. In this case,
354378
//we still want to do refresh to make sure all the specified keys for tags/volumes
355379
//are fetched successfully because initial retrieval might not get all of them.
356380
//When the specified key is "*", there is no way for us to check if all
357-
//tags/volumes are fetched. So there is no need to do refresh in this case.
358-
needRefresh = !(len(t.EC2InstanceTagKeys) == 1 && t.EC2InstanceTagKeys[0] == "*") ||
359-
!(len(t.EBSDeviceKeys) == 1 && t.EBSDeviceKeys[0] == "*")
381+
//tags are fetched. So there is no need to do refresh in this case.
382+
needRefresh = !(len(t.EC2InstanceTagKeys) == 1 && t.EC2InstanceTagKeys[0] == "*")
383+
384+
stopAfterFirstSuccess = true
385+
refreshInterval = defaultRefreshInterval
386+
} else if refreshInterval.Seconds() > 0 {
387+
//customer wants to update the tags with the given refresh interval
388+
needRefresh = true
389+
}
390+
391+
if needRefresh {
392+
go func() {
393+
// randomly stagger the time of the first refresh to mitigate throttling if a whole fleet is
394+
// restarted at the same time
395+
sleepUntilHostJitter(refreshInterval)
396+
t.refreshLoopTags(refreshInterval, stopAfterFirstSuccess)
397+
}()
398+
}
399+
}
400+
401+
func (t *Tagger) refreshLoopToUpdateVolumes() {
402+
needRefresh := false
403+
stopAfterFirstSuccess := false
404+
405+
refreshInterval := t.RefreshVolumesInterval
406+
if refreshInterval.Seconds() == 0 {
407+
needRefresh = !(len(t.EBSDeviceKeys) == 1 && t.EBSDeviceKeys[0] == "*")
408+
360409
stopAfterFirstSuccess = true
361410
refreshInterval = defaultRefreshInterval
362-
} else if t.RefreshIntervalSeconds.Seconds() > 0 {
363-
//customer wants to update the tags/volumes with the given refresh interval
411+
} else if refreshInterval.Seconds() > 0 {
412+
//customer wants to update the volumes with the given refresh interval
364413
needRefresh = true
365414
}
366415

@@ -369,7 +418,7 @@ func (t *Tagger) refreshLoopToUpdateTagsAndVolumes() {
369418
// randomly stagger the time of the first refresh to mitigate throttling if a whole fleet is
370419
// restarted at the same time
371420
sleepUntilHostJitter(refreshInterval)
372-
t.refreshLoop(refreshInterval, stopAfterFirstSuccess)
421+
t.refreshLoopVolumes(refreshInterval, stopAfterFirstSuccess)
373422
}()
374423
}
375424
}

plugins/processors/ec2tagger/ec2tagger_test.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,8 @@ func TestStartFailWithNoMetadata(t *testing.T) {
282282
// run Start() and check all tags/volumes are retrieved and saved
283283
func TestStartSuccessWithNoTagsVolumesUpdate(t *testing.T) {
284284
cfg := createDefaultConfig().(*Config)
285-
cfg.RefreshIntervalSeconds = 0 * time.Second
285+
cfg.RefreshTagsInterval = 0 * time.Second
286+
cfg.RefreshVolumesInterval = 0 * time.Second
286287
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
287288
cfg.EC2InstanceTagKeys = []string{tagKey1, tagKey2, "AutoScalingGroupName"}
288289
cfg.EBSDeviceKeys = []string{device1, device2}
@@ -326,7 +327,8 @@ func TestStartSuccessWithNoTagsVolumesUpdate(t *testing.T) {
326327
func TestStartSuccessWithTagsVolumesUpdate(t *testing.T) {
327328
cfg := createDefaultConfig().(*Config)
328329
//use millisecond rather than second to speed up test execution
329-
cfg.RefreshIntervalSeconds = 20 * time.Millisecond
330+
cfg.RefreshTagsInterval = 20 * time.Millisecond
331+
cfg.RefreshVolumesInterval = 20 * time.Millisecond
330332
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
331333
cfg.EC2InstanceTagKeys = []string{tagKey1, tagKey2, "AutoScalingGroupName"}
332334
cfg.EBSDeviceKeys = []string{device1, device2}
@@ -382,7 +384,8 @@ func TestStartSuccessWithTagsVolumesUpdate(t *testing.T) {
382384
// check there is no attempt to fetch all tags/volumes
383385
func TestStartSuccessWithWildcardTagVolumeKey(t *testing.T) {
384386
cfg := createDefaultConfig().(*Config)
385-
cfg.RefreshIntervalSeconds = 0 * time.Second
387+
cfg.RefreshTagsInterval = 0 * time.Second
388+
cfg.RefreshVolumesInterval = 0 * time.Second
386389
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
387390
cfg.EC2InstanceTagKeys = []string{"*"}
388391
cfg.EBSDeviceKeys = []string{"*"}
@@ -426,7 +429,8 @@ func TestStartSuccessWithWildcardTagVolumeKey(t *testing.T) {
426429
func TestApplyWithTagsVolumesUpdate(t *testing.T) {
427430
cfg := createDefaultConfig().(*Config)
428431
//use millisecond rather than second to speed up test execution
429-
cfg.RefreshIntervalSeconds = 20 * time.Millisecond
432+
cfg.RefreshTagsInterval = 20 * time.Millisecond
433+
cfg.RefreshVolumesInterval = 20 * time.Millisecond
430434
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
431435
cfg.EC2InstanceTagKeys = []string{tagKey1, tagKey2, "AutoScalingGroupName"}
432436
cfg.EBSDeviceKeys = []string{device1, device2}
@@ -520,7 +524,8 @@ func TestApplyWithTagsVolumesUpdate(t *testing.T) {
520524
// Test metrics are dropped before the initial retrieval is done
521525
func TestMetricsDroppedBeforeStarted(t *testing.T) {
522526
cfg := createDefaultConfig().(*Config)
523-
cfg.RefreshIntervalSeconds = 0 * time.Millisecond
527+
cfg.RefreshTagsInterval = 0 * time.Millisecond
528+
cfg.RefreshVolumesInterval = 0 * time.Millisecond
524529
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
525530
cfg.EC2InstanceTagKeys = []string{"*"}
526531
cfg.EBSDeviceKeys = []string{"*"}
@@ -585,7 +590,8 @@ func TestMetricsDroppedBeforeStarted(t *testing.T) {
585590
// Test ec2tagger Start does not block for a long time
586591
func TestTaggerStartDoesNotBlock(t *testing.T) {
587592
cfg := createDefaultConfig().(*Config)
588-
cfg.RefreshIntervalSeconds = 0 * time.Second
593+
cfg.RefreshTagsInterval = 0 * time.Second
594+
cfg.RefreshVolumesInterval = 0 * time.Second
589595
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
590596
cfg.EC2InstanceTagKeys = []string{"*"}
591597
cfg.EBSDeviceKeys = []string{"*"}
@@ -628,7 +634,8 @@ func TestTaggerStartDoesNotBlock(t *testing.T) {
628634
// Test ec2tagger Start does not block for a long time
629635
func TestTaggerStartsWithoutTagOrVolume(t *testing.T) {
630636
cfg := createDefaultConfig().(*Config)
631-
cfg.RefreshIntervalSeconds = 0 * time.Second
637+
cfg.RefreshTagsInterval = 0 * time.Second
638+
cfg.RefreshVolumesInterval = 0 * time.Second
632639
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
633640
_, cancel := context.WithCancel(context.Background())
634641

translator/tocwconfig/sampleConfig/advanced_config_darwin.yaml

+3-2
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ processors:
5151
- InstanceType
5252
imds_retries: 1
5353
middleware: agenthealth/statuscode
54-
refresh_interval_seconds: 0s
54+
refresh_tags_interval: 0s
55+
refresh_volumes_interval: 0s
5556
receivers:
5657
telegraf_cpu:
5758
collection_interval: 1m0s
@@ -90,11 +91,11 @@ service:
9091
- ec2tagger
9192
- awsentity/resource
9293
receivers:
93-
- telegraf_cpu
9494
- telegraf_disk
9595
- telegraf_mem
9696
- telegraf_netstat
9797
- telegraf_swap
98+
- telegraf_cpu
9899
metrics/hostDeltaMetrics:
99100
exporters:
100101
- awscloudwatch

translator/tocwconfig/sampleConfig/advanced_config_linux.yaml

+6-5
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,13 @@ processors:
4646
ec2_instance_tag_keys:
4747
- AutoScalingGroupName
4848
ec2_metadata_tags:
49-
- InstanceType
5049
- ImageId
5150
- InstanceId
51+
- InstanceType
5252
imds_retries: 1
5353
middleware: agenthealth/statuscode
54-
refresh_interval_seconds: 0s
54+
refresh_tags_interval: 0s
55+
refresh_volumes_interval: 0s
5556
receivers:
5657
telegraf_cpu:
5758
collection_interval: 1m0s
@@ -98,13 +99,13 @@ service:
9899
- ec2tagger
99100
- awsentity/resource
100101
receivers:
102+
- telegraf_disk
103+
- telegraf_mem
104+
- telegraf_netstat
101105
- telegraf_swap
102106
- telegraf_ethtool
103107
- telegraf_nvidia_smi
104108
- telegraf_cpu
105-
- telegraf_disk
106-
- telegraf_mem
107-
- telegraf_netstat
108109
metrics/hostDeltaMetrics:
109110
exporters:
110111
- awscloudwatch

translator/tocwconfig/sampleConfig/advanced_config_windows.yaml

+5-4
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ processors:
4141
- InstanceType
4242
imds_retries: 1
4343
middleware: agenthealth/statuscode
44-
refresh_interval_seconds: 0s
44+
refresh_tags_interval: 0s
45+
refresh_volumes_interval: 0s
4546
receivers:
4647
telegraf_win_perf_counters/1492679118:
4748
alias_name: Memory
@@ -91,13 +92,13 @@ service:
9192
- ec2tagger
9293
- awsentity/resource
9394
receivers:
94-
- telegraf_win_perf_counters/1492679118
95-
- telegraf_win_perf_counters/3610923661
96-
- telegraf_win_perf_counters/3446270237
9795
- telegraf_win_perf_counters/3762679655
9896
- telegraf_win_perf_counters/2073218482
9997
- telegraf_win_perf_counters/2039663244
10098
- telegraf_win_perf_counters/4283769065
99+
- telegraf_win_perf_counters/1492679118
100+
- telegraf_win_perf_counters/3610923661
101+
- telegraf_win_perf_counters/3446270237
101102
telemetry:
102103
logs:
103104
development: false

translator/tocwconfig/sampleConfig/amp_config_linux.yaml

+2-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ processors:
108108
- ImageId
109109
imds_retries: 1
110110
middleware: agenthealth/statuscode
111-
refresh_interval_seconds: 0s
111+
refresh_tags_interval: 0s
112+
refresh_volumes_interval: 0s
112113
rollup:
113114
attribute_groups:
114115
- - ImageId

translator/tocwconfig/sampleConfig/basic_config_linux.yaml

+3-2
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@ processors:
3636
ec2_instance_tag_keys:
3737
- AutoScalingGroupName
3838
ec2_metadata_tags:
39+
- InstanceType
3940
- ImageId
4041
- InstanceId
41-
- InstanceType
4242
imds_retries: 1
4343
middleware: agenthealth/statuscode
44-
refresh_interval_seconds: 0s
44+
refresh_tags_interval: 0s
45+
refresh_volumes_interval: 0s
4546
receivers:
4647
telegraf_disk:
4748
collection_interval: 1m0s

translator/tocwconfig/sampleConfig/basic_config_windows.yaml

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ processors:
4141
- InstanceType
4242
imds_retries: 1
4343
middleware: agenthealth/statuscode
44-
refresh_interval_seconds: 0s
44+
refresh_tags_interval: 0s
45+
refresh_volumes_interval: 0s
4546
receivers:
4647
telegraf_win_perf_counters/1492679118:
4748
alias_name: Memory

translator/tocwconfig/sampleConfig/compass_linux_config.yaml

+4-3
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,13 @@ processors:
4747
ec2_instance_tag_keys:
4848
- AutoScalingGroupName
4949
ec2_metadata_tags:
50+
- InstanceType
5051
- ImageId
5152
- InstanceId
52-
- InstanceType
5353
imds_retries: 1
5454
middleware: agenthealth/statuscode
55-
refresh_interval_seconds: 0s
55+
refresh_tags_interval: 0s
56+
refresh_volumes_interval: 0s
5657
receivers:
5758
telegraf_socket_listener:
5859
collection_interval: 10s
@@ -75,8 +76,8 @@ service:
7576
- ec2tagger
7677
- awsentity/service/telegraf
7778
receivers:
78-
- telegraf_socket_listener
7979
- telegraf_statsd
80+
- telegraf_socket_listener
8081
telemetry:
8182
logs:
8283
development: false

0 commit comments

Comments
 (0)