Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report RPS for autoscaler metrics #5238

Merged
merged 1 commit into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,103 @@ data:
}
]
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "prometheus",
"fill": 1,
"gridPos": {
"h": 9,
"w": 24,
"x": 0,
"y": 13
},
"id": 9,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": true,
"targets": [
{
"expr": "sum(autoscaler_stable_requests_per_second{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
"legendFormat": "Average RPS",
"refId": "A"
},
{
"expr": "sum(autoscaler_panic_requests_per_second{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
"legendFormat": "Average Panic RPS",
"refId": "B"
},
{
"expr": "sum(autoscaler_target_requests_per_second{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to change the title for this dashboard as well or create a new one for RPS.

"legendFormat": "Target RPS",
"refId": "C"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
"title": "Observed RPS",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": "",
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": "",
"logBase": 1,
"max": null,
"min": null,
"show": false
}
]
},
{
"aliasColors": {},
"bars": false,
Expand Down
4 changes: 3 additions & 1 deletion pkg/autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount
switch spec.ScalingMetric {
case autoscaling.RPS:
observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicRPS(metricKey, now)
// TODO: Add metrics for RPS used in autoscaler.
a.reporter.ReportStableRPS(observedStableValue)
a.reporter.ReportPanicRPS(observedPanicValue)
a.reporter.ReportTargetRPS(spec.TargetValue)
default:
metricName = autoscaling.Concurrency // concurrency is used by default
observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicConcurrency(metricKey, now)
Expand Down
15 changes: 15 additions & 0 deletions pkg/autoscaler/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,21 @@ func (r *mockReporter) ReportPanicRequestConcurrency(v float64) error {
return nil
}

// ReportStableRPS of a mockReporter does nothing and return nil for error.
func (r *mockReporter) ReportStableRPS(v float64) error {
return nil
}

// ReportPanicRPS of a mockReporter does nothing and return nil for error.
func (r *mockReporter) ReportPanicRPS(v float64) error {
return nil
}

// ReportTargetRPS of a mockReporter does nothing and return nil for error.
func (r *mockReporter) ReportTargetRPS(v float64) error {
return nil
}

// ReportTargetRequestConcurrency of a mockReporter does nothing and return nil for error.
func (r *mockReporter) ReportTargetRequestConcurrency(v float64) error {
return nil
Expand Down
22 changes: 15 additions & 7 deletions pkg/autoscaler/metrics_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ var (
Namespaced: true,
Metric: autoscaling.Concurrency,
}
rpsMetricInfo = provider.CustomMetricInfo{
GroupResource: v1alpha1.Resource("revisions"),
Namespaced: true,
Metric: autoscaling.RPS,
}

errMetricNotSupported = errors.New("metric not supported")
errNotImplemented = errors.New("not implemented")
Expand All @@ -59,23 +64,26 @@ func NewMetricProvider(metricClient MetricClient) *MetricProvider {

// GetMetricByName implements the interface.
func (p *MetricProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo) (*cmetrics.MetricValue, error) {
if !cmp.Equal(info, concurrencyMetricInfo) {
now := time.Now()
var data float64
var err error
if cmp.Equal(info, concurrencyMetricInfo) {
data, _, err = p.metricClient.StableAndPanicConcurrency(name, now)
} else if cmp.Equal(info, rpsMetricInfo) {
data, _, err = p.metricClient.StableAndPanicRPS(name, now)
} else {
return nil, errMetricNotSupported
}

now := time.Now()
concurrency, _, err := p.metricClient.StableAndPanicConcurrency(name, now)
if err != nil {
return nil, err
}
value := *resource.NewQuantity(int64(math.Ceil(concurrency)), resource.DecimalSI)

return &cmetrics.MetricValue{
Metric: cmetrics.MetricIdentifier{
Name: info.Metric,
},
Timestamp: metav1.Time{Time: now},
Value: value,
Value: *resource.NewQuantity(int64(math.Ceil(data)), resource.DecimalSI),
}, nil
}

Expand All @@ -86,5 +94,5 @@ func (p *MetricProvider) GetMetricBySelector(namespace string, selector labels.S

// ListAllMetrics implements the interface.
func (p *MetricProvider) ListAllMetrics() []provider.CustomMetricInfo {
return []provider.CustomMetricInfo{concurrencyMetricInfo}
return []provider.CustomMetricInfo{concurrencyMetricInfo, rpsMetricInfo}
}
34 changes: 27 additions & 7 deletions pkg/autoscaler/metrics_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ func TestGetMetricByName(t *testing.T) {
info: concurrencyMetricInfo,
},
want: 11,
}, {
name: "all good (RPS)",
args: args{
name: types.NamespacedName{Namespace: existingNamespace, Name: "test"},
info: rpsMetricInfo,
},
want: 14,
}, {
name: "requesting unsupported metric",
args: args{
Expand All @@ -74,7 +81,7 @@ func TestGetMetricByName(t *testing.T) {
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := NewMetricProvider(staticConcurrency(10.3))
p := NewMetricProvider(staticMetrics(10.3, 14))
got, err := p.GetMetricByName(tt.args.name, tt.args.info)
if (err != nil) != tt.wantErr {
t.Errorf("GetMetricByName() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -92,27 +99,40 @@ func TestGetMetricByName(t *testing.T) {
}

func TestGetMetricBySelector(t *testing.T) {
provider := NewMetricProvider(staticConcurrency(10.0))
provider := NewMetricProvider(staticMetrics(10.0, 14))
_, got := provider.GetMetricBySelector("foo", labels.NewSelector(), concurrencyMetricInfo)
if got != errNotImplemented {
t.Errorf("GetMetricBySelector() = %v, want %v", got, errNotImplemented)
}

_, got = provider.GetMetricBySelector("foo", labels.NewSelector(), rpsMetricInfo)
if got != errNotImplemented {
t.Errorf("GetMetricBySelector() = %v, want %v", got, errNotImplemented)
}
}

func TestListAllMetrics(t *testing.T) {
provider := NewMetricProvider(staticConcurrency(10.0))
got := provider.ListAllMetrics()[0]
provider := NewMetricProvider(staticMetrics(10.0, 14))
gotConcurrency := provider.ListAllMetrics()[0]

if equal, err := kmp.SafeEqual(gotConcurrency, concurrencyMetricInfo); err != nil {
t.Errorf("Got error comparing output, err = %v", err)
} else if !equal {
t.Errorf("ListAllMetrics() = %v, want %v", gotConcurrency, concurrencyMetricInfo)
}

if equal, err := kmp.SafeEqual(got, concurrencyMetricInfo); err != nil {
gotRPS := provider.ListAllMetrics()[1]
if equal, err := kmp.SafeEqual(gotRPS, rpsMetricInfo); err != nil {
t.Errorf("Got error comparing output, err = %v", err)
} else if !equal {
t.Errorf("ListAllMetrics() = %v, want %v", got, concurrencyMetricInfo)
t.Errorf("ListAllMetrics() = %v, want %v", gotRPS, rpsMetricInfo)
}
}

func staticConcurrency(concurrency float64) MetricClient {
func staticMetrics(concurrency, rps float64) MetricClient {
return &fake.MetricClient{
StableConcurrency: concurrency,
StableRPS: rps,
ErrF: func(key types.NamespacedName, now time.Time) error {
if key.Namespace != existingNamespace {
return errors.New("doesn't exist")
Expand Down
49 changes: 49 additions & 0 deletions pkg/autoscaler/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ var (
"target_concurrency_per_pod",
"The desired number of concurrent requests for each pod",
stats.UnitDimensionless)
stableRPSM = stats.Float64(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to register these metrics in func register.

"stable_requests_per_second",
"Average requests-per-second per observed pod over the stable window",
stats.UnitDimensionless)
panicRPSM = stats.Float64(
"panic_requests_per_second",
"Average requests-per-second per observed pod over the panic window",
stats.UnitDimensionless)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add targetRPS as well?

targetRPSM = stats.Float64(
"target_requests_per_second",
"The desired requests-per-second for each pod",
stats.UnitDimensionless)
panicM = stats.Int64(
"panic_mode",
"1 if autoscaler is in panic mode, 0 otherwise",
Expand Down Expand Up @@ -147,6 +159,24 @@ func register() {
Aggregation: view.LastValue(),
TagKeys: []tag.Key{namespaceTagKey, serviceTagKey, configTagKey, revisionTagKey},
},
&view.View{
Description: "Average requests-per-second over the stable window",
Measure: stableRPSM,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{namespaceTagKey, serviceTagKey, configTagKey, revisionTagKey},
},
&view.View{
Description: "Average requests-per-second over the panic window",
Measure: panicRPSM,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{namespaceTagKey, serviceTagKey, configTagKey, revisionTagKey},
},
&view.View{
Description: "The desired requests-per-second for each pod",
Measure: targetRPSM,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{namespaceTagKey, serviceTagKey, configTagKey, revisionTagKey},
},
)
if err != nil {
panic(err)
Expand All @@ -161,6 +191,9 @@ type StatsReporter interface {
ReportStableRequestConcurrency(v float64) error
ReportPanicRequestConcurrency(v float64) error
ReportTargetRequestConcurrency(v float64) error
ReportStableRPS(v float64) error
ReportPanicRPS(v float64) error
ReportTargetRPS(v float64) error
ReportExcessBurstCapacity(v float64) error
ReportPanic(v int64) error
}
Expand Down Expand Up @@ -235,6 +268,22 @@ func (r *Reporter) ReportTargetRequestConcurrency(v float64) error {
return r.report(targetRequestConcurrencyM.M(v))
}

// ReportStableRPS captures value v for stable RPS measure.
func (r *Reporter) ReportStableRPS(v float64) error {
return r.report(stableRPSM.M(v))
}

// ReportPanicRPS captures value v for panic RPS measure.
func (r *Reporter) ReportPanicRPS(v float64) error {
return r.report(panicRPSM.M(v))
}

// ReportTargetRPS captures value v for target requests-per-second measure.
func (r *Reporter) ReportTargetRPS(v float64) error {
return r.report(targetRPSM.M(v))

}

// ReportPanic captures value v for panic mode measure.
func (r *Reporter) ReportPanic(v int64) error {
return r.report(panicM.M(v))
Expand Down
11 changes: 10 additions & 1 deletion pkg/autoscaler/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func TestReporterReport(t *testing.T) {
expectSuccess(t, "ReportStableRequestConcurrency", func() error { return r.ReportStableRequestConcurrency(2) })
expectSuccess(t, "ReportPanicRequestConcurrency", func() error { return r.ReportPanicRequestConcurrency(3) })
expectSuccess(t, "ReportTargetRequestConcurrency", func() error { return r.ReportTargetRequestConcurrency(0.9) })
expectSuccess(t, "ReportStableRPS", func() error { return r.ReportStableRPS(5) })
expectSuccess(t, "ReportPanicRPS", func() error { return r.ReportPanicRPS(6) })
expectSuccess(t, "ReportTargetRPS", func() error { return r.ReportTargetRPS(7) })
expectSuccess(t, "ReportExcessBurstCapacity", func() error { return r.ReportExcessBurstCapacity(19.84) })
metricstest.CheckLastValueData(t, "desired_pods", wantTags, 10)
metricstest.CheckLastValueData(t, "requested_pods", wantTags, 7)
Expand All @@ -71,6 +74,9 @@ func TestReporterReport(t *testing.T) {
metricstest.CheckLastValueData(t, "excess_burst_capacity", wantTags, 19.84)
metricstest.CheckLastValueData(t, "panic_request_concurrency", wantTags, 3)
metricstest.CheckLastValueData(t, "target_concurrency_per_pod", wantTags, 0.9)
metricstest.CheckLastValueData(t, "stable_requests_per_second", wantTags, 5)
metricstest.CheckLastValueData(t, "panic_requests_per_second", wantTags, 6)
metricstest.CheckLastValueData(t, "target_requests_per_second", wantTags, 7)

// All the stats are gauges - record multiple entries for one stat - last one should stick
expectSuccess(t, "ReportDesiredPodCount", func() error { return r.ReportDesiredPodCount(1) })
Expand Down Expand Up @@ -129,6 +135,9 @@ func resetMetrics() {
panicRequestConcurrencyM.Name(),
excessBurstCapacityM.Name(),
targetRequestConcurrencyM.Name(),
panicM.Name())
panicM.Name(),
stableRPSM.Name(),
panicRPSM.Name(),
targetRPSM.Name())
register()
}