From 81b1f535f60839ca63bff2da7102f7a7b6200b66 Mon Sep 17 00:00:00 2001 From: James Bebbington Date: Thu, 30 Apr 2020 11:09:48 +1000 Subject: [PATCH] Initial commit of host metrics cpu scraper using gopsutil to collect cpu times metric (#862) --- receiver/hostmetricsreceiver/README.md | 2 +- receiver/hostmetricsreceiver/config_test.go | 41 +++++- .../hostmetricsreceiver/example_config.yaml | 4 +- receiver/hostmetricsreceiver/factory.go | 29 ++-- .../hostmetrics_receiver.go | 13 +- .../hostmetrics_receiver_test.go | 50 ++++++- .../internal/metricutils.go | 39 +++++ .../hostmetricsreceiver/internal/scraper.go | 29 +++- .../internal/scraper/cpuscraper/config.go | 28 ++++ .../scraper/cpuscraper/cpu_constants.go | 45 ++++++ .../scraper/cpuscraper/cpu_scraper.go | 139 ++++++++++++++++++ .../scraper/cpuscraper/cpu_scraper_test.go | 96 ++++++++++++ .../internal/scraper/cpuscraper/factory.go | 65 ++++++++ .../scraper/cpuscraper/factory_test.go | 40 +++++ .../hostmetricsreceiver/internal/testutils.go | 64 ++++++++ .../hostmetricsreceiver/testdata/config.yaml | 10 +- 16 files changed, 661 insertions(+), 33 deletions(-) create mode 100644 receiver/hostmetricsreceiver/internal/metricutils.go create mode 100644 receiver/hostmetricsreceiver/internal/scraper/cpuscraper/config.go create mode 100644 receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_constants.go create mode 100644 receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper.go create mode 100644 receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper_test.go create mode 100644 receiver/hostmetricsreceiver/internal/scraper/cpuscraper/factory.go create mode 100644 receiver/hostmetricsreceiver/internal/scraper/cpuscraper/factory_test.go create mode 100644 receiver/hostmetricsreceiver/internal/testutils.go diff --git a/receiver/hostmetricsreceiver/README.md b/receiver/hostmetricsreceiver/README.md index 1a9a17f08ad..b703431406e 100644 --- a/receiver/hostmetricsreceiver/README.md +++ b/receiver/hostmetricsreceiver/README.md @@ -11,7 +11,7 @@ hostmetrics: default_collection_interval: 10s scrapers: cpu: - report_per_process: true + report_per_cpu: true memory: disk: ``` diff --git a/receiver/hostmetricsreceiver/config_test.go b/receiver/hostmetricsreceiver/config_test.go index 23e444a729b..59a1b65aeef 100644 --- a/receiver/hostmetricsreceiver/config_test.go +++ b/receiver/hostmetricsreceiver/config_test.go @@ -17,10 +17,15 @@ package hostmetricsreceiver import ( "path" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector/config" + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + "github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal" + "github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal/scraper/cpuscraper" ) func TestLoadConfig(t *testing.T) { @@ -29,7 +34,39 @@ func TestLoadConfig(t *testing.T) { factory := NewFactory() factories.Receivers[typeStr] = factory - _, err = config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) + cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) - require.Error(t, err) + require.NoError(t, err) + require.NotNil(t, cfg) + + assert.Equal(t, len(cfg.Receivers), 2) + + r0 := cfg.Receivers["hostmetrics"] + defaultConfigAllScrapers := factory.CreateDefaultConfig() + defaultConfigAllScrapers.(*Config).Scrapers = map[string]internal.Config{ + cpuscraper.TypeStr: getDefaultConfigWithDefaultCollectionInterval(&cpuscraper.Factory{}), + } + assert.Equal(t, r0, defaultConfigAllScrapers) + + r1 := cfg.Receivers["hostmetrics/customname"].(*Config) + assert.Equal(t, r1, + &Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: typeStr, + NameVal: "hostmetrics/customname", + }, + DefaultCollectionInterval: 10 * time.Second, + Scrapers: map[string]internal.Config{ + cpuscraper.TypeStr: &cpuscraper.Config{ + ConfigSettings: internal.ConfigSettings{CollectionIntervalValue: 5 * time.Second}, + ReportPerCPU: true, + }, + }, + }) +} + +func getDefaultConfigWithDefaultCollectionInterval(factory internal.Factory) internal.Config { + cfg := factory.CreateDefaultConfig() + cfg.SetCollectionInterval(10 * time.Second) + return cfg } diff --git a/receiver/hostmetricsreceiver/example_config.yaml b/receiver/hostmetricsreceiver/example_config.yaml index b573710c52f..0fe6fa48a8b 100644 --- a/receiver/hostmetricsreceiver/example_config.yaml +++ b/receiver/hostmetricsreceiver/example_config.yaml @@ -4,8 +4,10 @@ extensions: receivers: hostmetrics: - default_collection_interval: 10s + default_collection_interval: 60s scrapers: + cpu: + report_per_cpu: true exporters: logging: diff --git a/receiver/hostmetricsreceiver/factory.go b/receiver/hostmetricsreceiver/factory.go index 54ffbc80138..acf9449a67b 100644 --- a/receiver/hostmetricsreceiver/factory.go +++ b/receiver/hostmetricsreceiver/factory.go @@ -28,6 +28,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/config/configmodels" "github.com/open-telemetry/opentelemetry-collector/consumer" "github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal" + "github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal/scraper/cpuscraper" ) // This file implements Factory for HostMetrics receiver. @@ -40,17 +41,19 @@ const ( // Factory is the Factory for receiver. type Factory struct { - ScraperFactories map[string]internal.Factory + scraperFactories map[string]internal.Factory } -// NewFactory creates a new factory +// NewFactory creates a new factory for host metrics receiver. func NewFactory() *Factory { return &Factory{ - ScraperFactories: map[string]internal.Factory{}, + scraperFactories: map[string]internal.Factory{ + cpuscraper.TypeStr: &cpuscraper.Factory{}, + }, } } -// Type gets the type of the Receiver config created by this Factory. +// Type returns the type of the Receiver config created by this Factory. func (f *Factory) Type() configmodels.Type { return typeStr } @@ -71,6 +74,10 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler { return fmt.Errorf("config type not hostmetrics.Config") } + if cfg.DefaultCollectionInterval <= 0 { + return fmt.Errorf("default_collection_interval must be a positive number") + } + // dynamically load the individual collector configs based on the key name cfg.Scrapers = map[string]internal.Config{} @@ -81,9 +88,9 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler { } for key := range componentViperSection.GetStringMap(scrapersKey) { - factory, ok := f.ScraperFactories[key] + factory, ok := f.scraperFactories[key] if !ok { - return fmt.Errorf("invalid hostmetrics scraper key: %s", key) + return fmt.Errorf("invalid scraper key: %s", key) } collectorCfg := factory.CreateDefaultConfig() @@ -91,10 +98,14 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler { if collectorViperSection != nil { err := collectorViperSection.UnmarshalExact(collectorCfg) if err != nil { - return fmt.Errorf("error reading settings for hostmetric scraper type %q: %v", key, err) + return fmt.Errorf("error reading settings for scraper type %q: %v", key, err) } } + if collectorCfg.CollectionInterval() <= 0 { + collectorCfg.SetCollectionInterval(cfg.DefaultCollectionInterval) + } + cfg.Scrapers[key] = collectorCfg } @@ -113,7 +124,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { } } -// CreateTraceReceiver creates a trace receiver based on provided config. +// CreateTraceReceiver returns error as trace receiver is not applicable to host metrics receiver. func (f *Factory) CreateTraceReceiver( ctx context.Context, params component.ReceiverCreateParams, @@ -138,7 +149,7 @@ func (f *Factory) CreateMetricsReceiver( config := cfg.(*Config) - hmr, err := NewHostMetricsReceiver(ctx, params.Logger, config, f.ScraperFactories, consumer) + hmr, err := NewHostMetricsReceiver(ctx, params.Logger, config, f.scraperFactories, consumer) if err != nil { return nil, err } diff --git a/receiver/hostmetricsreceiver/hostmetrics_receiver.go b/receiver/hostmetricsreceiver/hostmetrics_receiver.go index 18b69a84893..3c34fae75d9 100644 --- a/receiver/hostmetricsreceiver/hostmetrics_receiver.go +++ b/receiver/hostmetricsreceiver/hostmetrics_receiver.go @@ -26,15 +26,13 @@ import ( "github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal" ) -// Receiver is the type used to handle metrics from VM metrics. +// Receiver is the type that scrapes various host metrics. type Receiver struct { - consumer consumer.MetricsConsumer config *Config scrapers []internal.Scraper - cancel context.CancelFunc } -// NewHostMetricsReceiver creates a new set of VM and Process Metrics +// NewHostMetricsReceiver creates a host metrics scraper. func NewHostMetricsReceiver( ctx context.Context, logger *zap.Logger, @@ -45,7 +43,7 @@ func NewHostMetricsReceiver( scrapers := make([]internal.Scraper, 0) for key, cfg := range config.Scrapers { - scraper, err := factories[key].CreateMetricsScraper(ctx, logger, cfg) + scraper, err := factories[key].CreateMetricsScraper(ctx, logger, cfg, consumer) if err != nil { return nil, fmt.Errorf("cannot create scraper: %s", err.Error()) } @@ -53,7 +51,6 @@ func NewHostMetricsReceiver( } hmr := &Receiver{ - consumer: consumer, config: config, scrapers: scrapers, } @@ -63,8 +60,6 @@ func NewHostMetricsReceiver( // Start begins scraping host metrics based on the OS platform. func (hmr *Receiver) Start(ctx context.Context, host component.Host) error { - ctx, hmr.cancel = context.WithCancel(ctx) - go func() { for _, scraper := range hmr.scrapers { err := scraper.Start(ctx) @@ -80,8 +75,6 @@ func (hmr *Receiver) Start(ctx context.Context, host component.Host) error { // Shutdown stops the underlying host metrics scrapers. func (hmr *Receiver) Shutdown(ctx context.Context) error { - hmr.cancel() - var errs []error for _, scraper := range hmr.scrapers { diff --git a/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go b/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go index b5ae6fb92ac..22ee97bcca4 100644 --- a/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go +++ b/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go @@ -16,36 +16,74 @@ package hostmetricsreceiver import ( "context" + "runtime" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector/component/componenttest" + "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" "github.com/open-telemetry/opentelemetry-collector/exporter/exportertest" "github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal" + "github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal/scraper/cpuscraper" ) func TestGatherMetrics_EndToEnd(t *testing.T) { sink := &exportertest.SinkMetricsExporter{} config := &Config{ - DefaultCollectionInterval: 0, - Scrapers: map[string]internal.Config{}, + Scrapers: map[string]internal.Config{ + cpuscraper.TypeStr: &cpuscraper.Config{ + ConfigSettings: internal.ConfigSettings{CollectionIntervalValue: 100 * time.Millisecond}, + ReportPerCPU: true, + }, + }, } - factories := map[string]internal.Factory{} + factories := map[string]internal.Factory{ + cpuscraper.TypeStr: &cpuscraper.Factory{}, + } receiver, err := NewHostMetricsReceiver(context.Background(), zap.NewNop(), config, factories, sink) + + if runtime.GOOS != "windows" { + require.Error(t, err, "Expected error when creating a host metrics receiver with cpuscraper collector on a non-windows environment") + return + } + require.NoError(t, err, "Failed to create metrics receiver: %v", err) err = receiver.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err, "Failed to start metrics receiver: %v", err) defer func() { assert.NoError(t, receiver.Shutdown(context.Background())) }() - got := sink.AllMetrics() + require.Eventually(t, func() bool { + got := sink.AllMetrics() + if len(got) == 0 { + return false + } + + assertMetricData(t, got) + return true + }, time.Second, 10*time.Millisecond, "No metrics were collected") +} + +func assertMetricData(t *testing.T, got []pdata.Metrics) { + metrics := internal.AssertSingleMetricDataAndGetMetricsSlice(t, got) + + // expect 1 metric + assert.Equal(t, 1, metrics.Len()) - // expect 0 MetricData objects - assert.Equal(t, 0, len(got)) + // for cpu seconds metric, expect 5 timeseries with appropriate labels + hostCPUTimeMetric := metrics.At(0) + internal.AssertDescriptorEqual(t, cpuscraper.MetricCPUSecondsDescriptor, hostCPUTimeMetric.MetricDescriptor()) + assert.Equal(t, 4*runtime.NumCPU(), hostCPUTimeMetric.Int64DataPoints().Len()) + internal.AssertInt64MetricLabelExists(t, hostCPUTimeMetric, 0, cpuscraper.CPULabel) + internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 0, cpuscraper.StateLabel, cpuscraper.UserStateLabelValue) + internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 1, cpuscraper.StateLabel, cpuscraper.SystemStateLabelValue) + internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 2, cpuscraper.StateLabel, cpuscraper.IdleStateLabelValue) + internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 3, cpuscraper.StateLabel, cpuscraper.InterruptStateLabelValue) } diff --git a/receiver/hostmetricsreceiver/internal/metricutils.go b/receiver/hostmetricsreceiver/internal/metricutils.go new file mode 100644 index 00000000000..a16089e6c8c --- /dev/null +++ b/receiver/hostmetricsreceiver/internal/metricutils.go @@ -0,0 +1,39 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" + "github.com/open-telemetry/opentelemetry-collector/internal/data" +) + +// Initializes a metric with a metric slice and returns it. +func InitializeMetricSlice(metricData data.MetricData) pdata.MetricSlice { + rms := metricData.ResourceMetrics() + rms.Resize(1) + rm := rms.At(0) + ilms := rm.InstrumentationLibraryMetrics() + ilms.Resize(1) + ilm := ilms.At(0) + return ilm.Metrics() +} + +// AddNewMetric appends an empty metric to the metric slice, resizing +// the slice by 1, and returns the new metric. +func AddNewMetric(metrics pdata.MetricSlice) pdata.Metric { + len := metrics.Len() + metrics.Resize(len + 1) + return metrics.At(len) +} diff --git a/receiver/hostmetricsreceiver/internal/scraper.go b/receiver/hostmetricsreceiver/internal/scraper.go index ab823106265..fa256e69cf0 100644 --- a/receiver/hostmetricsreceiver/internal/scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper.go @@ -16,8 +16,11 @@ package internal import ( "context" + "time" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector/consumer" ) // Scraper gathers metrics from the host machine and converts @@ -43,10 +46,32 @@ type Factory interface { // CreateMetricsScraper creates a scraper based on this config. // If the config is not valid, error will be returned instead. - CreateMetricsScraper(ctx context.Context, - logger *zap.Logger, cfg Config) (Scraper, error) + CreateMetricsScraper( + ctx context.Context, + logger *zap.Logger, + cfg Config, + consumer consumer.MetricsConsumer) (Scraper, error) } // Config is the configuration of a scraper. type Config interface { + // CollectionInterval returns the interval at which the scraper collects metrics + CollectionInterval() time.Duration + // SetCollectionInterval sets the interval at which the scraper collects metrics + SetCollectionInterval(time.Duration) +} + +// ConfigSettings provides common settings for scraper configuration. +type ConfigSettings struct { + CollectionIntervalValue time.Duration `mapstructure:"collection_interval"` +} + +// CollectionInterval returns the interval at which the scraper collects metrics +func (c *ConfigSettings) CollectionInterval() time.Duration { + return c.CollectionIntervalValue +} + +// SetCollectionInterval sets the interval at which the scraper collects metrics +func (c *ConfigSettings) SetCollectionInterval(interval time.Duration) { + c.CollectionIntervalValue = interval } diff --git a/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/config.go b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/config.go new file mode 100644 index 00000000000..e79ae2037f9 --- /dev/null +++ b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/config.go @@ -0,0 +1,28 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cpuscraper + +import "github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal" + +// Config relating to CPU Metric Scraper. +type Config struct { + internal.ConfigSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + + // If `true`, stats will be generated for the system as a whole _as well + // as_ for each individual CPU/core in the system and will be distinguished + // by the `cpu` dimension. If `false`, stats will only be generated for + // the system as a whole that will not include a `cpu` dimension. + ReportPerCPU bool `mapstructure:"report_per_cpu"` +} diff --git a/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_constants.go b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_constants.go new file mode 100644 index 00000000000..e8d6a3fc0d8 --- /dev/null +++ b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_constants.go @@ -0,0 +1,45 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cpuscraper + +import ( + "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" +) + +// cpu metric constants + +var ( + StateLabel = "state" + CPULabel = "cpu" +) + +var ( + UserStateLabelValue = "user" + SystemStateLabelValue = "system" + IdleStateLabelValue = "idle" + InterruptStateLabelValue = "interrupt" +) + +var MetricCPUSecondsDescriptor = metricCPUSecondsDescriptor() + +func metricCPUSecondsDescriptor() pdata.MetricDescriptor { + descriptor := pdata.NewMetricDescriptor() + descriptor.InitEmpty() + descriptor.SetName("host/cpu/time") + descriptor.SetDescription("Total CPU ticks or jiffies broken down by different states") + descriptor.SetUnit("1") + descriptor.SetType(pdata.MetricTypeCounterInt64) + return descriptor +} diff --git a/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper.go b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper.go new file mode 100644 index 00000000000..25d2dc245e4 --- /dev/null +++ b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper.go @@ -0,0 +1,139 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cpuscraper + +import ( + "context" + "fmt" + "time" + + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/host" + "go.opencensus.io/trace" + + "github.com/open-telemetry/opentelemetry-collector/consumer" + "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" + "github.com/open-telemetry/opentelemetry-collector/consumer/pdatautil" + "github.com/open-telemetry/opentelemetry-collector/internal/data" + "github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal" +) + +// Scraper for CPU Metrics +type Scraper struct { + config *Config + consumer consumer.MetricsConsumer + startTime pdata.TimestampUnixNano + cancel context.CancelFunc +} + +// NewCPUScraper creates a set of CPU related metrics +func NewCPUScraper(ctx context.Context, cfg *Config, consumer consumer.MetricsConsumer) (*Scraper, error) { + return &Scraper{config: cfg, consumer: consumer}, nil +} + +// Start +func (c *Scraper) Start(ctx context.Context) error { + ctx, c.cancel = context.WithCancel(ctx) + + bootTime, err := host.BootTime() + if err != nil { + return err + } + + c.startTime = pdata.TimestampUnixNano(bootTime) + + go func() { + ticker := time.NewTicker(c.config.CollectionInterval()) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.scrapeMetrics(ctx) + case <-ctx.Done(): + return + } + } + }() + + return nil +} + +// Close +func (c *Scraper) Close(ctx context.Context) error { + c.cancel() + return nil +} + +func (c *Scraper) scrapeMetrics(ctx context.Context) { + ctx, span := trace.StartSpan(ctx, "cpuscraper.scrapeMetrics") + defer span.End() + + metricData := data.NewMetricData() + metrics := internal.InitializeMetricSlice(metricData) + + err := c.scrapeAndAppendMetrics(metrics) + if err != nil { + span.SetStatus(trace.Status{Code: trace.StatusCodeDataLoss, Message: fmt.Sprintf("Error(s) when scraping cpu metrics: %v", err)}) + return + } + + if metrics.Len() > 0 { + err := c.consumer.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(metricData)) + if err != nil { + span.SetStatus(trace.Status{Code: trace.StatusCodeDataLoss, Message: fmt.Sprintf("Unable to process metrics: %v", err)}) + return + } + } +} + +func (c *Scraper) scrapeAndAppendMetrics(metrics pdata.MetricSlice) error { + cpuTimes, err := cpu.Times(c.config.ReportPerCPU) + if err != nil { + return err + } + + metric := internal.AddNewMetric(metrics) + initializeCPUSecondsMetric(metric, c.startTime, cpuTimes) + return nil +} + +func initializeCPUSecondsMetric(metric pdata.Metric, startTime pdata.TimestampUnixNano, cpuTimes []cpu.TimesStat) { + MetricCPUSecondsDescriptor.CopyTo(metric.MetricDescriptor()) + + idps := metric.Int64DataPoints() + idps.Resize(4 * len(cpuTimes)) + for i, cpuTime := range cpuTimes { + initializeCPUSecondsDataPoint(idps.At(4*i+0), startTime, cpuTime.CPU, UserStateLabelValue, int64(cpuTime.User)) + initializeCPUSecondsDataPoint(idps.At(4*i+1), startTime, cpuTime.CPU, SystemStateLabelValue, int64(cpuTime.System)) + initializeCPUSecondsDataPoint(idps.At(4*i+2), startTime, cpuTime.CPU, IdleStateLabelValue, int64(cpuTime.Idle)) + initializeCPUSecondsDataPoint(idps.At(4*i+3), startTime, cpuTime.CPU, InterruptStateLabelValue, int64(cpuTime.Irq)) + } +} + +const gopsCPUTotal string = "cpu-total" + +func initializeCPUSecondsDataPoint(dataPoint pdata.Int64DataPoint, startTime pdata.TimestampUnixNano, cpuLabel string, stateLabel string, value int64) { + labelsMap := dataPoint.LabelsMap() + // ignore cpu label if reporting "total" cpu usage + if cpuLabel != gopsCPUTotal { + labelsMap.Insert(CPULabel, cpuLabel) + } + labelsMap.Insert(StateLabel, stateLabel) + + dataPoint.SetStartTime(startTime) + dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano()))) + dataPoint.SetValue(value) +} diff --git a/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper_test.go new file mode 100644 index 00000000000..cea3634a82a --- /dev/null +++ b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper_test.go @@ -0,0 +1,96 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cpuscraper + +import ( + "context" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" + "github.com/open-telemetry/opentelemetry-collector/exporter/exportertest" + "github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal" +) + +type validationFn func(*testing.T, []pdata.Metrics) + +func TestScrapeMetrics_MinimalData(t *testing.T) { + createScraperAndValidateScrapedMetrics(t, &Config{}, func(t *testing.T, got []pdata.Metrics) { + metrics := internal.AssertSingleMetricDataAndGetMetricsSlice(t, got) + + // expect 1 metric + assert.Equal(t, 1, metrics.Len()) + + // for cpu seconds metric, expect 4 timeseries with appropriate labels + hostCPUTimeMetric := metrics.At(0) + internal.AssertDescriptorEqual(t, MetricCPUSecondsDescriptor, hostCPUTimeMetric.MetricDescriptor()) + assert.Equal(t, 4, hostCPUTimeMetric.Int64DataPoints().Len()) + internal.AssertInt64MetricLabelDoesNotExist(t, hostCPUTimeMetric, 0, CPULabel) + internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 0, StateLabel, UserStateLabelValue) + internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 1, StateLabel, SystemStateLabelValue) + internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 2, StateLabel, IdleStateLabelValue) + internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 3, StateLabel, InterruptStateLabelValue) + }) +} + +func TestScrapeMetrics_AllData(t *testing.T) { + config := &Config{ + ReportPerCPU: true, + } + + createScraperAndValidateScrapedMetrics(t, config, func(t *testing.T, got []pdata.Metrics) { + metrics := internal.AssertSingleMetricDataAndGetMetricsSlice(t, got) + + // expect 1 metric + assert.Equal(t, 1, metrics.Len()) + + // for cpu seconds metric, expect 4*cores timeseries with appropriate labels + hostCPUTimeMetric := metrics.At(0) + internal.AssertDescriptorEqual(t, MetricCPUSecondsDescriptor, hostCPUTimeMetric.MetricDescriptor()) + assert.Equal(t, 4*runtime.NumCPU(), hostCPUTimeMetric.Int64DataPoints().Len()) + internal.AssertInt64MetricLabelExists(t, hostCPUTimeMetric, 0, CPULabel) + internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 0, StateLabel, UserStateLabelValue) + internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 1, StateLabel, SystemStateLabelValue) + internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 2, StateLabel, IdleStateLabelValue) + internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 3, StateLabel, InterruptStateLabelValue) + }) +} + +func createScraperAndValidateScrapedMetrics(t *testing.T, config *Config, assertFn validationFn) { + config.SetCollectionInterval(100 * time.Millisecond) + + sink := &exportertest.SinkMetricsExporter{} + + scraper, err := NewCPUScraper(context.Background(), config, sink) + require.NoError(t, err, "Failed to create cpu scraper: %v", err) + + err = scraper.Start(context.Background()) + require.NoError(t, err, "Failed to start cpu scraper: %v", err) + defer func() { assert.NoError(t, scraper.Close(context.Background())) }() + + require.Eventually(t, func() bool { + got := sink.AllMetrics() + if len(got) == 0 { + return false + } + + assertFn(t, got) + return true + }, time.Second, 10*time.Millisecond, "No metrics were collected") +} diff --git a/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/factory.go b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/factory.go new file mode 100644 index 00000000000..934ab95dd06 --- /dev/null +++ b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/factory.go @@ -0,0 +1,65 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cpuscraper + +import ( + "context" + "errors" + "runtime" + + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector/consumer" + "github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal" +) + +// This file implements Factory for CPU scraper. + +const ( + // The value of "type" key in configuration. + TypeStr = "cpu" +) + +// Factory is the Factory for scraper. +type Factory struct { +} + +// Type gets the type of the scraper config created by this Factory. +func (f *Factory) Type() string { + return TypeStr +} + +// CreateDefaultConfig creates the default configuration for the Scraper. +func (f *Factory) CreateDefaultConfig() internal.Config { + return &Config{ + ReportPerCPU: true, + } +} + +// CreateMetricsScraper creates a scraper based on provided config. +func (f *Factory) CreateMetricsScraper( + ctx context.Context, + logger *zap.Logger, + config internal.Config, + consumer consumer.MetricsConsumer, +) (internal.Scraper, error) { + if runtime.GOOS != "windows" { + return nil, errors.New("cpu scraper is currently only supported on windows") + } + + cfg := config.(*Config) + + return NewCPUScraper(ctx, cfg, consumer) +} diff --git a/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/factory_test.go b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/factory_test.go new file mode 100644 index 00000000000..6f3b5666201 --- /dev/null +++ b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/factory_test.go @@ -0,0 +1,40 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cpuscraper + +import ( + "context" + "runtime" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func TestCreateMetricsScraper(t *testing.T) { + factory := &Factory{} + cfg := &Config{} + + scraper, err := factory.CreateMetricsScraper(context.Background(), zap.NewNop(), cfg, nil) + + switch os := runtime.GOOS; os { + case "windows": + assert.Nil(t, err) + assert.NotNil(t, scraper) + default: + assert.NotNil(t, err) + assert.Nil(t, scraper) + } +} diff --git a/receiver/hostmetricsreceiver/internal/testutils.go b/receiver/hostmetricsreceiver/internal/testutils.go new file mode 100644 index 00000000000..fe580a9e22e --- /dev/null +++ b/receiver/hostmetricsreceiver/internal/testutils.go @@ -0,0 +1,64 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" + "github.com/open-telemetry/opentelemetry-collector/consumer/pdatautil" +) + +func AssertSingleMetricDataAndGetMetricsSlice(t *testing.T, metrics []pdata.Metrics) pdata.MetricSlice { + // expect 1 MetricData object + assert.Equal(t, 1, len(metrics)) + md := pdatautil.MetricsToInternalMetrics(metrics[0]) + + // expect 1 ResourceMetrics object + rms := md.ResourceMetrics() + assert.Equal(t, 1, rms.Len()) + rm := rms.At(0) + + // expect 1 InstrumentationLibraryMetrics object + ilms := rm.InstrumentationLibraryMetrics() + assert.Equal(t, 1, ilms.Len()) + return ilms.At(0).Metrics() +} + +func AssertDescriptorEqual(t *testing.T, expected pdata.MetricDescriptor, actual pdata.MetricDescriptor) { + assert.Equal(t, expected.Name(), actual.Name()) + assert.Equal(t, expected.Description(), actual.Description()) + assert.Equal(t, expected.Unit(), actual.Unit()) + assert.Equal(t, expected.Type(), actual.Type()) + assert.EqualValues(t, expected.LabelsMap().Sort(), actual.LabelsMap().Sort()) +} + +func AssertInt64MetricLabelHasValue(t *testing.T, metric pdata.Metric, index int, labelName string, expectedVal string) { + val, ok := metric.Int64DataPoints().At(index).LabelsMap().Get(labelName) + assert.Truef(t, ok, "Missing label %q in metric %q", labelName, metric.MetricDescriptor().Name()) + assert.Equal(t, expectedVal, val.Value()) +} + +func AssertInt64MetricLabelExists(t *testing.T, metric pdata.Metric, index int, labelName string) { + _, ok := metric.Int64DataPoints().At(index).LabelsMap().Get(labelName) + assert.Truef(t, ok, "Missing label %q in metric %q", labelName, metric.MetricDescriptor().Name()) +} + +func AssertInt64MetricLabelDoesNotExist(t *testing.T, metric pdata.Metric, index int, labelName string) { + _, ok := metric.Int64DataPoints().At(index).LabelsMap().Get(labelName) + assert.Falsef(t, ok, "Unexpected label %q in metric %q", labelName, metric.MetricDescriptor().Name()) +} diff --git a/receiver/hostmetricsreceiver/testdata/config.yaml b/receiver/hostmetricsreceiver/testdata/config.yaml index 6fdb43f56ee..298990054e5 100644 --- a/receiver/hostmetricsreceiver/testdata/config.yaml +++ b/receiver/hostmetricsreceiver/testdata/config.yaml @@ -1,7 +1,13 @@ receivers: hostmetrics: - scrape_interval: 10s scrapers: + cpu: + hostmetrics/customname: + default_collection_interval: 10s + scrapers: + cpu: + collection_interval: 5s + report_per_cpu: true processors: exampleprocessor: @@ -11,7 +17,7 @@ exporters: service: pipelines: - traces: + metrics: receivers: [hostmetrics] processors: [exampleprocessor] exporters: [exampleexporter]