From aead62f9075dc35a2de7687cfe6ee9708ed01587 Mon Sep 17 00:00:00 2001 From: Tara Gu Date: Wed, 21 Aug 2019 10:26:19 -0400 Subject: [PATCH] Report RPS for autoscaler metrics --- .../100-grafana-dash-knative-scaling.yaml | 97 +++++++++++++++++++ pkg/autoscaler/autoscaler.go | 4 +- pkg/autoscaler/autoscaler_test.go | 15 +++ pkg/autoscaler/metrics_provider.go | 22 +++-- pkg/autoscaler/metrics_provider_test.go | 34 +++++-- pkg/autoscaler/stats_reporter.go | 49 ++++++++++ pkg/autoscaler/stats_reporter_test.go | 11 ++- 7 files changed, 216 insertions(+), 16 deletions(-) diff --git a/config/monitoring/metrics/prometheus/100-grafana-dash-knative-scaling.yaml b/config/monitoring/metrics/prometheus/100-grafana-dash-knative-scaling.yaml index 5aabbf4ef776..5bff01345b46 100644 --- a/config/monitoring/metrics/prometheus/100-grafana-dash-knative-scaling.yaml +++ b/config/monitoring/metrics/prometheus/100-grafana-dash-knative-scaling.yaml @@ -601,6 +601,103 @@ data: } ] }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "prometheus", + "fill": 1, + "gridPos": { + "h": 9, + "w": 24, + "x": 0, + "y": 13 + }, + "id": 9, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": true, + "targets": [ + { + "expr": "sum(autoscaler_stable_requests_per_second{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})", + "format": "time_series", + "interval": "1s", + "intervalFactor": 1, + "legendFormat": "Average RPS", + "refId": "A" + }, + { + "expr": "sum(autoscaler_panic_requests_per_second{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})", + "format": "time_series", + "interval": "1s", + "intervalFactor": 1, + "legendFormat": "Average Panic RPS", + "refId": "B" + }, + { + "expr": "sum(autoscaler_target_requests_per_second{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})", + "format": "time_series", + "interval": "1s", + "intervalFactor": 1, + "legendFormat": "Target RPS", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Observed RPS", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ] + }, { "aliasColors": {}, "bars": false, diff --git a/pkg/autoscaler/autoscaler.go b/pkg/autoscaler/autoscaler.go index d77fae24727f..44e5ecd9b94e 100644 --- a/pkg/autoscaler/autoscaler.go +++ b/pkg/autoscaler/autoscaler.go @@ -135,7 +135,9 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount switch spec.ScalingMetric { case autoscaling.RPS: observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicRPS(metricKey, now) - // TODO: Add metrics for RPS used in autoscaler. + a.reporter.ReportStableRPS(observedStableValue) + a.reporter.ReportPanicRPS(observedPanicValue) + a.reporter.ReportTargetRPS(spec.TargetValue) default: metricName = autoscaling.Concurrency // concurrency is used by default observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicConcurrency(metricKey, now) diff --git a/pkg/autoscaler/autoscaler_test.go b/pkg/autoscaler/autoscaler_test.go index 83993516570e..7fdc077fbe8b 100644 --- a/pkg/autoscaler/autoscaler_test.go +++ b/pkg/autoscaler/autoscaler_test.go @@ -279,6 +279,21 @@ func (r *mockReporter) ReportPanicRequestConcurrency(v float64) error { return nil } +// ReportStableRPS of a mockReporter does nothing and return nil for error. +func (r *mockReporter) ReportStableRPS(v float64) error { + return nil +} + +// ReportPanicRPS of a mockReporter does nothing and return nil for error. +func (r *mockReporter) ReportPanicRPS(v float64) error { + return nil +} + +// ReportTargetRPS of a mockReporter does nothing and return nil for error. +func (r *mockReporter) ReportTargetRPS(v float64) error { + return nil +} + // ReportTargetRequestConcurrency of a mockReporter does nothing and return nil for error. func (r *mockReporter) ReportTargetRequestConcurrency(v float64) error { return nil diff --git a/pkg/autoscaler/metrics_provider.go b/pkg/autoscaler/metrics_provider.go index 3b8ca7345216..fd8c0a85833a 100644 --- a/pkg/autoscaler/metrics_provider.go +++ b/pkg/autoscaler/metrics_provider.go @@ -38,6 +38,11 @@ var ( Namespaced: true, Metric: autoscaling.Concurrency, } + rpsMetricInfo = provider.CustomMetricInfo{ + GroupResource: v1alpha1.Resource("revisions"), + Namespaced: true, + Metric: autoscaling.RPS, + } errMetricNotSupported = errors.New("metric not supported") errNotImplemented = errors.New("not implemented") @@ -59,23 +64,26 @@ func NewMetricProvider(metricClient MetricClient) *MetricProvider { // GetMetricByName implements the interface. func (p *MetricProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo) (*cmetrics.MetricValue, error) { - if !cmp.Equal(info, concurrencyMetricInfo) { + now := time.Now() + var data float64 + var err error + if cmp.Equal(info, concurrencyMetricInfo) { + data, _, err = p.metricClient.StableAndPanicConcurrency(name, now) + } else if cmp.Equal(info, rpsMetricInfo) { + data, _, err = p.metricClient.StableAndPanicRPS(name, now) + } else { return nil, errMetricNotSupported } - - now := time.Now() - concurrency, _, err := p.metricClient.StableAndPanicConcurrency(name, now) if err != nil { return nil, err } - value := *resource.NewQuantity(int64(math.Ceil(concurrency)), resource.DecimalSI) return &cmetrics.MetricValue{ Metric: cmetrics.MetricIdentifier{ Name: info.Metric, }, Timestamp: metav1.Time{Time: now}, - Value: value, + Value: *resource.NewQuantity(int64(math.Ceil(data)), resource.DecimalSI), }, nil } @@ -86,5 +94,5 @@ func (p *MetricProvider) GetMetricBySelector(namespace string, selector labels.S // ListAllMetrics implements the interface. func (p *MetricProvider) ListAllMetrics() []provider.CustomMetricInfo { - return []provider.CustomMetricInfo{concurrencyMetricInfo} + return []provider.CustomMetricInfo{concurrencyMetricInfo, rpsMetricInfo} } diff --git a/pkg/autoscaler/metrics_provider_test.go b/pkg/autoscaler/metrics_provider_test.go index 628eace51dd9..7ee4aaa3196e 100644 --- a/pkg/autoscaler/metrics_provider_test.go +++ b/pkg/autoscaler/metrics_provider_test.go @@ -53,6 +53,13 @@ func TestGetMetricByName(t *testing.T) { info: concurrencyMetricInfo, }, want: 11, + }, { + name: "all good (RPS)", + args: args{ + name: types.NamespacedName{Namespace: existingNamespace, Name: "test"}, + info: rpsMetricInfo, + }, + want: 14, }, { name: "requesting unsupported metric", args: args{ @@ -74,7 +81,7 @@ func TestGetMetricByName(t *testing.T) { }} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p := NewMetricProvider(staticConcurrency(10.3)) + p := NewMetricProvider(staticMetrics(10.3, 14)) got, err := p.GetMetricByName(tt.args.name, tt.args.info) if (err != nil) != tt.wantErr { t.Errorf("GetMetricByName() error = %v, wantErr %v", err, tt.wantErr) @@ -92,27 +99,40 @@ func TestGetMetricByName(t *testing.T) { } func TestGetMetricBySelector(t *testing.T) { - provider := NewMetricProvider(staticConcurrency(10.0)) + provider := NewMetricProvider(staticMetrics(10.0, 14)) _, got := provider.GetMetricBySelector("foo", labels.NewSelector(), concurrencyMetricInfo) if got != errNotImplemented { t.Errorf("GetMetricBySelector() = %v, want %v", got, errNotImplemented) } + + _, got = provider.GetMetricBySelector("foo", labels.NewSelector(), rpsMetricInfo) + if got != errNotImplemented { + t.Errorf("GetMetricBySelector() = %v, want %v", got, errNotImplemented) + } } func TestListAllMetrics(t *testing.T) { - provider := NewMetricProvider(staticConcurrency(10.0)) - got := provider.ListAllMetrics()[0] + provider := NewMetricProvider(staticMetrics(10.0, 14)) + gotConcurrency := provider.ListAllMetrics()[0] + + if equal, err := kmp.SafeEqual(gotConcurrency, concurrencyMetricInfo); err != nil { + t.Errorf("Got error comparing output, err = %v", err) + } else if !equal { + t.Errorf("ListAllMetrics() = %v, want %v", gotConcurrency, concurrencyMetricInfo) + } - if equal, err := kmp.SafeEqual(got, concurrencyMetricInfo); err != nil { + gotRPS := provider.ListAllMetrics()[1] + if equal, err := kmp.SafeEqual(gotRPS, rpsMetricInfo); err != nil { t.Errorf("Got error comparing output, err = %v", err) } else if !equal { - t.Errorf("ListAllMetrics() = %v, want %v", got, concurrencyMetricInfo) + t.Errorf("ListAllMetrics() = %v, want %v", gotRPS, rpsMetricInfo) } } -func staticConcurrency(concurrency float64) MetricClient { +func staticMetrics(concurrency, rps float64) MetricClient { return &fake.MetricClient{ StableConcurrency: concurrency, + StableRPS: rps, ErrF: func(key types.NamespacedName, now time.Time) error { if key.Namespace != existingNamespace { return errors.New("doesn't exist") diff --git a/pkg/autoscaler/stats_reporter.go b/pkg/autoscaler/stats_reporter.go index 4de64ad19a89..a9f6f45c0365 100644 --- a/pkg/autoscaler/stats_reporter.go +++ b/pkg/autoscaler/stats_reporter.go @@ -57,6 +57,18 @@ var ( "target_concurrency_per_pod", "The desired number of concurrent requests for each pod", stats.UnitDimensionless) + stableRPSM = stats.Float64( + "stable_requests_per_second", + "Average requests-per-second per observed pod over the stable window", + stats.UnitDimensionless) + panicRPSM = stats.Float64( + "panic_requests_per_second", + "Average requests-per-second per observed pod over the panic window", + stats.UnitDimensionless) + targetRPSM = stats.Float64( + "target_requests_per_second", + "The desired requests-per-second for each pod", + stats.UnitDimensionless) panicM = stats.Int64( "panic_mode", "1 if autoscaler is in panic mode, 0 otherwise", @@ -147,6 +159,24 @@ func register() { Aggregation: view.LastValue(), TagKeys: []tag.Key{namespaceTagKey, serviceTagKey, configTagKey, revisionTagKey}, }, + &view.View{ + Description: "Average requests-per-second over the stable window", + Measure: stableRPSM, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{namespaceTagKey, serviceTagKey, configTagKey, revisionTagKey}, + }, + &view.View{ + Description: "Average requests-per-second over the panic window", + Measure: panicRPSM, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{namespaceTagKey, serviceTagKey, configTagKey, revisionTagKey}, + }, + &view.View{ + Description: "The desired requests-per-second for each pod", + Measure: targetRPSM, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{namespaceTagKey, serviceTagKey, configTagKey, revisionTagKey}, + }, ) if err != nil { panic(err) @@ -161,6 +191,9 @@ type StatsReporter interface { ReportStableRequestConcurrency(v float64) error ReportPanicRequestConcurrency(v float64) error ReportTargetRequestConcurrency(v float64) error + ReportStableRPS(v float64) error + ReportPanicRPS(v float64) error + ReportTargetRPS(v float64) error ReportExcessBurstCapacity(v float64) error ReportPanic(v int64) error } @@ -235,6 +268,22 @@ func (r *Reporter) ReportTargetRequestConcurrency(v float64) error { return r.report(targetRequestConcurrencyM.M(v)) } +// ReportStableRPS captures value v for stable RPS measure. +func (r *Reporter) ReportStableRPS(v float64) error { + return r.report(stableRPSM.M(v)) +} + +// ReportPanicRPS captures value v for panic RPS measure. +func (r *Reporter) ReportPanicRPS(v float64) error { + return r.report(panicRPSM.M(v)) +} + +// ReportTargetRPS captures value v for target requests-per-second measure. +func (r *Reporter) ReportTargetRPS(v float64) error { + return r.report(targetRPSM.M(v)) + +} + // ReportPanic captures value v for panic mode measure. func (r *Reporter) ReportPanic(v int64) error { return r.report(panicM.M(v)) diff --git a/pkg/autoscaler/stats_reporter_test.go b/pkg/autoscaler/stats_reporter_test.go index 63c5d985f875..499b5255ae87 100644 --- a/pkg/autoscaler/stats_reporter_test.go +++ b/pkg/autoscaler/stats_reporter_test.go @@ -62,6 +62,9 @@ func TestReporterReport(t *testing.T) { expectSuccess(t, "ReportStableRequestConcurrency", func() error { return r.ReportStableRequestConcurrency(2) }) expectSuccess(t, "ReportPanicRequestConcurrency", func() error { return r.ReportPanicRequestConcurrency(3) }) expectSuccess(t, "ReportTargetRequestConcurrency", func() error { return r.ReportTargetRequestConcurrency(0.9) }) + expectSuccess(t, "ReportStableRPS", func() error { return r.ReportStableRPS(5) }) + expectSuccess(t, "ReportPanicRPS", func() error { return r.ReportPanicRPS(6) }) + expectSuccess(t, "ReportTargetRPS", func() error { return r.ReportTargetRPS(7) }) expectSuccess(t, "ReportExcessBurstCapacity", func() error { return r.ReportExcessBurstCapacity(19.84) }) metricstest.CheckLastValueData(t, "desired_pods", wantTags, 10) metricstest.CheckLastValueData(t, "requested_pods", wantTags, 7) @@ -71,6 +74,9 @@ func TestReporterReport(t *testing.T) { metricstest.CheckLastValueData(t, "excess_burst_capacity", wantTags, 19.84) metricstest.CheckLastValueData(t, "panic_request_concurrency", wantTags, 3) metricstest.CheckLastValueData(t, "target_concurrency_per_pod", wantTags, 0.9) + metricstest.CheckLastValueData(t, "stable_requests_per_second", wantTags, 5) + metricstest.CheckLastValueData(t, "panic_requests_per_second", wantTags, 6) + metricstest.CheckLastValueData(t, "target_requests_per_second", wantTags, 7) // All the stats are gauges - record multiple entries for one stat - last one should stick expectSuccess(t, "ReportDesiredPodCount", func() error { return r.ReportDesiredPodCount(1) }) @@ -129,6 +135,9 @@ func resetMetrics() { panicRequestConcurrencyM.Name(), excessBurstCapacityM.Name(), targetRequestConcurrencyM.Name(), - panicM.Name()) + panicM.Name(), + stableRPSM.Name(), + panicRPSM.Name(), + targetRPSM.Name()) register() }