Skip to content

Commit

Permalink
Add Metrics Recorder List and usage in ClientConn
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Jul 19, 2024
1 parent 2bcbcab commit c777496
Show file tree
Hide file tree
Showing 7 changed files with 561 additions and 7 deletions.
5 changes: 5 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/channelz"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -256,6 +257,10 @@ type BuildOptions struct {
// same resolver.Target as passed to the resolver. See the documentation for
// the resolver.Target type for details about what it contains.
Target resolver.Target
// MetricsRecorder is the metrics recorder that balancers can use to record
// metrics. Balancer implementations which do not register metrics on
// metrics registry and record on them can ignore this field.
MetricsRecorder estats.MetricsRecorder
}

// Builder creates a balancer.
Expand Down
1 change: 1 addition & 0 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParent: cc.channelz,
Target: cc.parsedTarget,
MetricsRecorder: cc.metricsRecorderList,
},
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
Expand Down
19 changes: 12 additions & 7 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/idle"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -195,8 +196,11 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)

cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)

cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)

return cc, nil
}

Expand Down Expand Up @@ -591,13 +595,14 @@ type ClientConn struct {
cancel context.CancelFunc // Cancelled on close.

// The following are initialized at dial time, and are read-only after that.
target string // User's dial target.
parsedTarget resolver.Target // See initParsedTargetAndResolverBuilder().
authority string // See initAuthority().
dopts dialOptions // Default and user specified dial options.
channelz *channelz.Channel // Channelz object.
resolverBuilder resolver.Builder // See initParsedTargetAndResolverBuilder().
idlenessMgr *idle.Manager
target string // User's dial target.
parsedTarget resolver.Target // See initParsedTargetAndResolverBuilder().
authority string // See initAuthority().
dopts dialOptions // Default and user specified dial options.
channelz *channelz.Channel // Channelz object.
resolverBuilder resolver.Builder // See initParsedTargetAndResolverBuilder().
idlenessMgr *idle.Manager
metricsRecorderList *stats.MetricsRecorderList

// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
Expand Down
100 changes: 100 additions & 0 deletions internal/stats/metrics_recorder_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package stats

import (
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/stats"
)

var logger = grpclog.Component("metrics-recorder-list")

// MetricsRecorderList forwards Record calls to all of it's metricsRecorders.
//
// It eats any record calls where the label values provided do not match the
// number of label keys.
type MetricsRecorderList struct {
// metricsRecorders are the metrics recorders this list will forward to.
metricsRecorders []estats.MetricsRecorder
}

// NewMetricsRecorderList creates a new metric recorder list with all the stats
// handlers provided which implement the MetricsRecorder interface.
// If no stats handlers provided implement the MetricsRecorder interface,
// the MetricsRecorder list returned is a no-op.
func NewMetricsRecorderList(shs []stats.Handler) *MetricsRecorderList {
var mrs []estats.MetricsRecorder
for _, sh := range shs {
if mr, ok := sh.(estats.MetricsRecorder); ok {
mrs = append(mrs, mr)
}
}
return &MetricsRecorderList{
metricsRecorders: mrs,
}
}

func (l *MetricsRecorderList) RecordInt64Count(handle *estats.Int64CountHandle, incr int64, labels ...string) {
if got, want := len(handle.Labels)+len(handle.OptionalLabels), len(labels); got != want {
logger.Infof("length of labels passed to RecordInt64Count incorrect got: %v, want: %v", got, want)

Check warning on line 54 in internal/stats/metrics_recorder_list.go

View check run for this annotation

Codecov / codecov/patch

internal/stats/metrics_recorder_list.go#L54

Added line #L54 was not covered by tests
}

for _, metricRecorder := range l.metricsRecorders {
metricRecorder.RecordInt64Count(handle, incr, labels...)
}
}

func (l *MetricsRecorderList) RecordFloat64Count(handle *estats.Float64CountHandle, incr float64, labels ...string) {
if got, want := len(handle.Labels)+len(handle.OptionalLabels), len(labels); got != want {
logger.Infof("length of labels passed to RecordFloat64Count incorrect got: %v, want: %v", got, want)

Check warning on line 64 in internal/stats/metrics_recorder_list.go

View check run for this annotation

Codecov / codecov/patch

internal/stats/metrics_recorder_list.go#L64

Added line #L64 was not covered by tests
}

for _, metricRecorder := range l.metricsRecorders {
metricRecorder.RecordFloat64Count(handle, incr, labels...)
}
}

func (l *MetricsRecorderList) RecordInt64Histo(handle *estats.Int64HistoHandle, incr int64, labels ...string) {
if got, want := len(handle.Labels)+len(handle.OptionalLabels), len(labels); got != want {
logger.Infof("length of labels passed to RecordInt64Histo incorrect got: %v, want: %v", got, want)

Check warning on line 74 in internal/stats/metrics_recorder_list.go

View check run for this annotation

Codecov / codecov/patch

internal/stats/metrics_recorder_list.go#L74

Added line #L74 was not covered by tests
}

for _, metricRecorder := range l.metricsRecorders {
metricRecorder.RecordInt64Histo(handle, incr, labels...)
}
}

func (l *MetricsRecorderList) RecordFloat64Histo(handle *estats.Float64HistoHandle, incr float64, labels ...string) {
if got, want := len(handle.Labels)+len(handle.OptionalLabels), len(labels); got != want {
logger.Infof("length of labels passed to RecordFloat64Histo incorrect got: %v, want: %v", got, want)

Check warning on line 84 in internal/stats/metrics_recorder_list.go

View check run for this annotation

Codecov / codecov/patch

internal/stats/metrics_recorder_list.go#L84

Added line #L84 was not covered by tests
}

for _, metricRecorder := range l.metricsRecorders {
metricRecorder.RecordFloat64Histo(handle, incr, labels...)
}
}

func (l *MetricsRecorderList) RecordInt64Gauge(handle *estats.Int64GaugeHandle, incr int64, labels ...string) {
if got, want := len(handle.Labels)+len(handle.OptionalLabels), len(labels); got != want {
logger.Infof("length of labels passed to RecordInt64Gauge incorrect got: %v, want: %v", got, want)
}

for _, metricRecorder := range l.metricsRecorders {
metricRecorder.RecordInt64Gauge(handle, incr, labels...)
}
}
133 changes: 133 additions & 0 deletions internal/stats/test/metrics_recorder_list_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package test implements an e2e test for the Metrics Recorder List component
// of the Client Conn, and a TestMetricsRecorder utility.
package test

import (
"context"
"log"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpctest"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
)

var defaultTestTimeout = 5 * time.Second

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestMetricsRecorderList tests the metrics recorder list functionality of the
// ClientConn. It configures a global and local stats handler Dial Option. These
// stats handlers implement the MetricsRecorder interface. It also configures a
// balancer which registers metrics and records on metrics at build time. This
// test then asserts that the recorded metrics show up on both configured stats
// handlers, and that metrics calls with the incorrect number of labels do not
// make their way to stats handlers.
func (s) TestMetricsRecorderList(t *testing.T) {
mr := manual.NewBuilderWithScheme("test-metrics-recorder-list")
defer mr.Close()

json := `{"loadBalancingConfig": [{"recording_load_balancer":{}}]}`
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(json)
mr.InitialState(resolver.State{
ServiceConfig: sc,
})

// Create two stats.Handlers which also implement MetricsRecorder, configure
// one as a global dial option and one as a local dial option.
mr1 := NewTestMetricsRecorder(t, []string{})
mr2 := NewTestMetricsRecorder(t, []string{})

defer internal.ClearGlobalDialOptions()
internal.AddGlobalDialOptions.(func(opt ...grpc.DialOption))(grpc.WithStatsHandler(mr1))

cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(mr2))
if err != nil {
log.Fatalf("Failed to dial: %v", err)
}
defer cc.Close()

tsc := testgrpc.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Trigger the recording_load_balancer to build, which will trigger metrics
// to record.
tsc.UnaryCall(ctx, &testpb.SimpleRequest{})

mdWant := MetricsData{
Handle: (*estats.MetricDescriptor)(intCountHandle),
IntIncr: 1,
LabelKeys: []string{"int counter label", "int counter optional label"},
LabelVals: []string{"int counter label val", "int counter optional label val"},
}
mr1.WaitForInt64Count(ctx, mdWant)
mr2.WaitForInt64Count(ctx, mdWant)

mdWant = MetricsData{
Handle: (*estats.MetricDescriptor)(floatCountHandle),
FloatIncr: 2,
LabelKeys: []string{"float counter label", "float counter optional label"},
LabelVals: []string{"float counter label val", "float counter optional label val"},
}
mr1.WaitForFloat64Count(ctx, mdWant)
mr2.WaitForFloat64Count(ctx, mdWant)

mdWant = MetricsData{
Handle: (*estats.MetricDescriptor)(intHistoHandle),
IntIncr: 3,
LabelKeys: []string{"int histo label", "int histo optional label"},
LabelVals: []string{"int histo label val", "int histo optional label val"},
}
mr1.WaitForInt64Histo(ctx, mdWant)
mr2.WaitForInt64Histo(ctx, mdWant)

mdWant = MetricsData{
Handle: (*estats.MetricDescriptor)(floatHistoHandle),
FloatIncr: 4,
LabelKeys: []string{"float histo label", "float histo optional label"},
LabelVals: []string{"float histo label val", "float histo optional label val"},
}
mr1.WaitForFloat64Histo(ctx, mdWant)
mr2.WaitForFloat64Histo(ctx, mdWant)
mdWant = MetricsData{
Handle: (*estats.MetricDescriptor)(intGaugeHandle),
IntIncr: 5, // Should ignore the 7 metrics recording point because emits wrong number of labels.
LabelKeys: []string{"int gauge label", "int gauge optional label"},
LabelVals: []string{"int gauge label val", "int gauge optional label val"},
}
mr1.WaitForInt64Gauge(ctx, mdWant)
mr2.WaitForInt64Gauge(ctx, mdWant)
}
Loading

0 comments on commit c777496

Please sign in to comment.