Skip to content

Commit

Permalink
Report RPS for autoscaler metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Tara Gu committed Aug 26, 2019
1 parent de9936c commit aead62f
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,103 @@ data:
}
]
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "prometheus",
"fill": 1,
"gridPos": {
"h": 9,
"w": 24,
"x": 0,
"y": 13
},
"id": 9,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": true,
"targets": [
{
"expr": "sum(autoscaler_stable_requests_per_second{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
"legendFormat": "Average RPS",
"refId": "A"
},
{
"expr": "sum(autoscaler_panic_requests_per_second{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
"legendFormat": "Average Panic RPS",
"refId": "B"
},
{
"expr": "sum(autoscaler_target_requests_per_second{namespace_name=\"$namespace\", configuration_name=\"$configuration\", revision_name=\"$revision\"})",
"format": "time_series",
"interval": "1s",
"intervalFactor": 1,
"legendFormat": "Target RPS",
"refId": "C"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
"title": "Observed RPS",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": "",
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": "",
"logBase": 1,
"max": null,
"min": null,
"show": false
}
]
},
{
"aliasColors": {},
"bars": false,
Expand Down
4 changes: 3 additions & 1 deletion pkg/autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount
switch spec.ScalingMetric {
case autoscaling.RPS:
observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicRPS(metricKey, now)
// TODO: Add metrics for RPS used in autoscaler.
a.reporter.ReportStableRPS(observedStableValue)
a.reporter.ReportPanicRPS(observedPanicValue)
a.reporter.ReportTargetRPS(spec.TargetValue)
default:
metricName = autoscaling.Concurrency // concurrency is used by default
observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicConcurrency(metricKey, now)
Expand Down
15 changes: 15 additions & 0 deletions pkg/autoscaler/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,21 @@ func (r *mockReporter) ReportPanicRequestConcurrency(v float64) error {
return nil
}

// ReportStableRPS is a no-op stub on mockReporter: it ignores v and always
// returns a nil error.
func (r *mockReporter) ReportStableRPS(v float64) error {
return nil
}

// ReportPanicRPS is a no-op stub on mockReporter: it ignores v and always
// returns a nil error.
func (r *mockReporter) ReportPanicRPS(v float64) error {
return nil
}

// ReportTargetRPS is a no-op stub on mockReporter: it ignores v and always
// returns a nil error.
func (r *mockReporter) ReportTargetRPS(v float64) error {
return nil
}

// ReportTargetRequestConcurrency of a mockReporter does nothing and returns nil for error.
func (r *mockReporter) ReportTargetRequestConcurrency(v float64) error {
return nil
Expand Down
22 changes: 15 additions & 7 deletions pkg/autoscaler/metrics_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ var (
Namespaced: true,
Metric: autoscaling.Concurrency,
}
rpsMetricInfo = provider.CustomMetricInfo{
GroupResource: v1alpha1.Resource("revisions"),
Namespaced: true,
Metric: autoscaling.RPS,
}

errMetricNotSupported = errors.New("metric not supported")
errNotImplemented = errors.New("not implemented")
Expand All @@ -59,23 +64,26 @@ func NewMetricProvider(metricClient MetricClient) *MetricProvider {

// GetMetricByName implements the interface.
func (p *MetricProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo) (*cmetrics.MetricValue, error) {
if !cmp.Equal(info, concurrencyMetricInfo) {
now := time.Now()
var data float64
var err error
if cmp.Equal(info, concurrencyMetricInfo) {
data, _, err = p.metricClient.StableAndPanicConcurrency(name, now)
} else if cmp.Equal(info, rpsMetricInfo) {
data, _, err = p.metricClient.StableAndPanicRPS(name, now)
} else {
return nil, errMetricNotSupported
}

now := time.Now()
concurrency, _, err := p.metricClient.StableAndPanicConcurrency(name, now)
if err != nil {
return nil, err
}
value := *resource.NewQuantity(int64(math.Ceil(concurrency)), resource.DecimalSI)

return &cmetrics.MetricValue{
Metric: cmetrics.MetricIdentifier{
Name: info.Metric,
},
Timestamp: metav1.Time{Time: now},
Value: value,
Value: *resource.NewQuantity(int64(math.Ceil(data)), resource.DecimalSI),
}, nil
}

Expand All @@ -86,5 +94,5 @@ func (p *MetricProvider) GetMetricBySelector(namespace string, selector labels.S

// ListAllMetrics implements the interface.
func (p *MetricProvider) ListAllMetrics() []provider.CustomMetricInfo {
return []provider.CustomMetricInfo{concurrencyMetricInfo}
return []provider.CustomMetricInfo{concurrencyMetricInfo, rpsMetricInfo}
}
34 changes: 27 additions & 7 deletions pkg/autoscaler/metrics_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ func TestGetMetricByName(t *testing.T) {
info: concurrencyMetricInfo,
},
want: 11,
}, {
name: "all good (RPS)",
args: args{
name: types.NamespacedName{Namespace: existingNamespace, Name: "test"},
info: rpsMetricInfo,
},
want: 14,
}, {
name: "requesting unsupported metric",
args: args{
Expand All @@ -74,7 +81,7 @@ func TestGetMetricByName(t *testing.T) {
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := NewMetricProvider(staticConcurrency(10.3))
p := NewMetricProvider(staticMetrics(10.3, 14))
got, err := p.GetMetricByName(tt.args.name, tt.args.info)
if (err != nil) != tt.wantErr {
t.Errorf("GetMetricByName() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -92,27 +99,40 @@ func TestGetMetricByName(t *testing.T) {
}

func TestGetMetricBySelector(t *testing.T) {
provider := NewMetricProvider(staticConcurrency(10.0))
provider := NewMetricProvider(staticMetrics(10.0, 14))
_, got := provider.GetMetricBySelector("foo", labels.NewSelector(), concurrencyMetricInfo)
if got != errNotImplemented {
t.Errorf("GetMetricBySelector() = %v, want %v", got, errNotImplemented)
}

_, got = provider.GetMetricBySelector("foo", labels.NewSelector(), rpsMetricInfo)
if got != errNotImplemented {
t.Errorf("GetMetricBySelector() = %v, want %v", got, errNotImplemented)
}
}

func TestListAllMetrics(t *testing.T) {
provider := NewMetricProvider(staticConcurrency(10.0))
got := provider.ListAllMetrics()[0]
provider := NewMetricProvider(staticMetrics(10.0, 14))
gotConcurrency := provider.ListAllMetrics()[0]

if equal, err := kmp.SafeEqual(gotConcurrency, concurrencyMetricInfo); err != nil {
t.Errorf("Got error comparing output, err = %v", err)
} else if !equal {
t.Errorf("ListAllMetrics() = %v, want %v", gotConcurrency, concurrencyMetricInfo)
}

if equal, err := kmp.SafeEqual(got, concurrencyMetricInfo); err != nil {
gotRPS := provider.ListAllMetrics()[1]
if equal, err := kmp.SafeEqual(gotRPS, rpsMetricInfo); err != nil {
t.Errorf("Got error comparing output, err = %v", err)
} else if !equal {
t.Errorf("ListAllMetrics() = %v, want %v", got, concurrencyMetricInfo)
t.Errorf("ListAllMetrics() = %v, want %v", gotRPS, rpsMetricInfo)
}
}

func staticConcurrency(concurrency float64) MetricClient {
func staticMetrics(concurrency, rps float64) MetricClient {
return &fake.MetricClient{
StableConcurrency: concurrency,
StableRPS: rps,
ErrF: func(key types.NamespacedName, now time.Time) error {
if key.Namespace != existingNamespace {
return errors.New("doesn't exist")
Expand Down
49 changes: 49 additions & 0 deletions pkg/autoscaler/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ var (
"target_concurrency_per_pod",
"The desired number of concurrent requests for each pod",
stats.UnitDimensionless)
stableRPSM = stats.Float64(
"stable_requests_per_second",
"Average requests-per-second per observed pod over the stable window",
stats.UnitDimensionless)
panicRPSM = stats.Float64(
"panic_requests_per_second",
"Average requests-per-second per observed pod over the panic window",
stats.UnitDimensionless)
targetRPSM = stats.Float64(
"target_requests_per_second",
"The desired requests-per-second for each pod",
stats.UnitDimensionless)
panicM = stats.Int64(
"panic_mode",
"1 if autoscaler is in panic mode, 0 otherwise",
Expand Down Expand Up @@ -147,6 +159,24 @@ func register() {
Aggregation: view.LastValue(),
TagKeys: []tag.Key{namespaceTagKey, serviceTagKey, configTagKey, revisionTagKey},
},
&view.View{
Description: "Average requests-per-second over the stable window",
Measure: stableRPSM,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{namespaceTagKey, serviceTagKey, configTagKey, revisionTagKey},
},
&view.View{
Description: "Average requests-per-second over the panic window",
Measure: panicRPSM,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{namespaceTagKey, serviceTagKey, configTagKey, revisionTagKey},
},
&view.View{
Description: "The desired requests-per-second for each pod",
Measure: targetRPSM,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{namespaceTagKey, serviceTagKey, configTagKey, revisionTagKey},
},
)
if err != nil {
panic(err)
Expand All @@ -161,6 +191,9 @@ type StatsReporter interface {
ReportStableRequestConcurrency(v float64) error
ReportPanicRequestConcurrency(v float64) error
ReportTargetRequestConcurrency(v float64) error
ReportStableRPS(v float64) error
ReportPanicRPS(v float64) error
ReportTargetRPS(v float64) error
ReportExcessBurstCapacity(v float64) error
ReportPanic(v int64) error
}
Expand Down Expand Up @@ -235,6 +268,22 @@ func (r *Reporter) ReportTargetRequestConcurrency(v float64) error {
return r.report(targetRequestConcurrencyM.M(v))
}

// ReportStableRPS records v against the stable requests-per-second measure
// (stableRPSM) and returns the error, if any, produced by r.report.
func (r *Reporter) ReportStableRPS(v float64) error {
	measurement := stableRPSM.M(v)
	return r.report(measurement)
}

// ReportPanicRPS records v against the panic requests-per-second measure
// (panicRPSM) and returns the error, if any, produced by r.report.
func (r *Reporter) ReportPanicRPS(v float64) error {
	measurement := panicRPSM.M(v)
	return r.report(measurement)
}

// ReportTargetRPS records v against the target requests-per-second measure
// (targetRPSM) and returns the error, if any, produced by r.report.
func (r *Reporter) ReportTargetRPS(v float64) error {
	measurement := targetRPSM.M(v)
	return r.report(measurement)
}

// ReportPanic captures value v for panic mode measure.
func (r *Reporter) ReportPanic(v int64) error {
return r.report(panicM.M(v))
Expand Down
11 changes: 10 additions & 1 deletion pkg/autoscaler/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func TestReporterReport(t *testing.T) {
expectSuccess(t, "ReportStableRequestConcurrency", func() error { return r.ReportStableRequestConcurrency(2) })
expectSuccess(t, "ReportPanicRequestConcurrency", func() error { return r.ReportPanicRequestConcurrency(3) })
expectSuccess(t, "ReportTargetRequestConcurrency", func() error { return r.ReportTargetRequestConcurrency(0.9) })
expectSuccess(t, "ReportStableRPS", func() error { return r.ReportStableRPS(5) })
expectSuccess(t, "ReportPanicRPS", func() error { return r.ReportPanicRPS(6) })
expectSuccess(t, "ReportTargetRPS", func() error { return r.ReportTargetRPS(7) })
expectSuccess(t, "ReportExcessBurstCapacity", func() error { return r.ReportExcessBurstCapacity(19.84) })
metricstest.CheckLastValueData(t, "desired_pods", wantTags, 10)
metricstest.CheckLastValueData(t, "requested_pods", wantTags, 7)
Expand All @@ -71,6 +74,9 @@ func TestReporterReport(t *testing.T) {
metricstest.CheckLastValueData(t, "excess_burst_capacity", wantTags, 19.84)
metricstest.CheckLastValueData(t, "panic_request_concurrency", wantTags, 3)
metricstest.CheckLastValueData(t, "target_concurrency_per_pod", wantTags, 0.9)
metricstest.CheckLastValueData(t, "stable_requests_per_second", wantTags, 5)
metricstest.CheckLastValueData(t, "panic_requests_per_second", wantTags, 6)
metricstest.CheckLastValueData(t, "target_requests_per_second", wantTags, 7)

// All the stats are gauges - record multiple entries for one stat - last one should stick
expectSuccess(t, "ReportDesiredPodCount", func() error { return r.ReportDesiredPodCount(1) })
Expand Down Expand Up @@ -129,6 +135,9 @@ func resetMetrics() {
panicRequestConcurrencyM.Name(),
excessBurstCapacityM.Name(),
targetRequestConcurrencyM.Name(),
panicM.Name())
panicM.Name(),
stableRPSM.Name(),
panicRPSM.Name(),
targetRPSM.Name())
register()
}

0 comments on commit aead62f

Please sign in to comment.