diff --git a/balancer/weightedroundrobin/balancer_test.go b/balancer/weightedroundrobin/balancer_test.go index e1965d008060..b994322dc52e 100644 --- a/balancer/weightedroundrobin/balancer_test.go +++ b/balancer/weightedroundrobin/balancer_test.go @@ -623,8 +623,7 @@ func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) { cfg := oobConfig cfg.OOBReportingPeriod = stringp("60s") sc := svcConfig(t, cfg) - mr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) - if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil { + if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { t.Fatalf("Error starting client: %v", err) } addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}} diff --git a/balancer/weightedroundrobin/metrics_test.go b/balancer/weightedroundrobin/metrics_test.go index ba22777e16cb..e6079d67d5f1 100644 --- a/balancer/weightedroundrobin/metrics_test.go +++ b/balancer/weightedroundrobin/metrics_test.go @@ -39,63 +39,94 @@ func Test(t *testing.T) { // on a weighted SubConn, and expects certain metrics for each of these // scenarios. func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) { - tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) - - wsc := &weightedSubConn{ - metricsRecorder: tmr, - weightVal: 3, + tests := []struct { + name string + weightExpirationPeriod time.Duration + blackoutPeriod time.Duration + lastUpdatedSet bool + nonEmptySet bool + nowTime time.Time + endpointWeightStaleWant float64 + endpointWeightNotYetUsableWant float64 + endpointWeightWant float64 + }{ + // The weighted SubConn's lastUpdated field hasn't been set, so this + // SubConn's weight is not yet usable. 
Thus, should emit that endpoint + // weight is not yet usable, and 0 for weight. + { + name: "no weight set", + weightExpirationPeriod: time.Second, + blackoutPeriod: time.Second, + nowTime: time.Now(), + endpointWeightStaleWant: 0, + endpointWeightNotYetUsableWant: 1, + endpointWeightWant: 0, + }, + { + name: "weight expiration", + lastUpdatedSet: true, + weightExpirationPeriod: 2 * time.Second, + blackoutPeriod: time.Second, + nowTime: time.Now().Add(100 * time.Second), + endpointWeightStaleWant: 1, + endpointWeightNotYetUsableWant: 0, + endpointWeightWant: 0, + }, + { + name: "in blackout period", + lastUpdatedSet: true, + weightExpirationPeriod: time.Minute, + blackoutPeriod: 10 * time.Second, + nowTime: time.Now(), + endpointWeightStaleWant: 0, + endpointWeightNotYetUsableWant: 1, + endpointWeightWant: 0, + }, + { + name: "normal weight", + lastUpdatedSet: true, + nonEmptySet: true, + weightExpirationPeriod: time.Minute, + blackoutPeriod: time.Second, + nowTime: time.Now().Add(10 * time.Second), + endpointWeightStaleWant: 0, + endpointWeightNotYetUsableWant: 0, + endpointWeightWant: 3, + }, + { + name: "weight expiration takes precedence over blackout", + lastUpdatedSet: true, + nonEmptySet: true, + weightExpirationPeriod: time.Second, + blackoutPeriod: time.Minute, + nowTime: time.Now().Add(10 * time.Second), + endpointWeightStaleWant: 1, + endpointWeightNotYetUsableWant: 0, + endpointWeightWant: 0, + }, } - // The weighted SubConn's lastUpdated field hasn't been set, so this - // SubConn's weight is not yet usable. Thus, should emit that endpoint - // weight is not yet usable, and 0 for weight. - wsc.weight(time.Now(), time.Second, time.Second, true) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0. - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1) - // Unusable, so no endpoint weight (i.e. 0).
- tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) - tmr.ClearMetrics() - - // Setup a scenario where the SubConn's weight expires. Thus, should emit - // that endpoint weight is stale, and 0 for weight. - wsc.lastUpdated = time.Now() - wsc.weight(time.Now().Add(100*time.Second), 2*time.Second, time.Second, true) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0) - // Unusable, so no endpoint weight (i.e. 0). - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) - tmr.ClearMetrics() - - // Setup a scenario where the SubConn's weight is in the blackout period. - // Thus, should emit that endpoint weight is not yet usable, and 0 for - // weight. - wsc.weight(time.Now(), time.Minute, 10*time.Second, true) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1) - // Unusable, so no endpoint weight (i.e. 0). - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) - tmr.ClearMetrics() + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) + wsc := &weightedSubConn{ + metricsRecorder: tmr, + weightVal: 3, + } + if test.lastUpdatedSet { + wsc.lastUpdated = time.Now() + } + if test.nonEmptySet { + wsc.nonEmptySince = time.Now() + } + wsc.weight(test.nowTime, test.weightExpirationPeriod, test.blackoutPeriod, true) - // Setup a scenario where SubConn's weight is what is persists in weight - // field. This is triggered by last update being past blackout period and - // before weight update period. Should thus emit that endpoint weight is 3, - // and no other metrics. 
- wsc.nonEmptySince = time.Now() - wsc.weight(time.Now().Add(10*time.Second), time.Minute, time.Second, true) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 3) - tmr.ClearMetrics() + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", test.endpointWeightStaleWant) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", test.endpointWeightNotYetUsableWant) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", test.endpointWeightWant) + }) + } - // Setup a scenario where a SubConn's weight both expires and is within the - // blackout period. In this case, weight expiry should take precedence with - // respect to metrics emitted. Thus, should emit that endpoint weight is not - // yet usable, and 0 for weight. - wsc.weight(time.Now().Add(10*time.Second), time.Second, time.Minute, true) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) - tmr.ClearMetrics() } // TestWRR_Metrics_Scheduler_RR_Fallback tests the round robin fallback metric diff --git a/internal/testutils/stats/test_metrics_recorder.go b/internal/testutils/stats/test_metrics_recorder.go index ff8e41cf47ac..580c538caa2f 100644 --- a/internal/testutils/stats/test_metrics_recorder.go +++ b/internal/testutils/stats/test_metrics_recorder.go @@ -53,11 +53,11 @@ func NewTestMetricsRecorder(t *testing.T, metrics []string) *TestMetricsRecorder tmr := &TestMetricsRecorder{ t: t, - intCountCh: testutils.NewChannelWithSize(1000), - floatCountCh: testutils.NewChannelWithSize(1000), - intHistoCh: testutils.NewChannelWithSize(1000), - floatHistoCh: testutils.NewChannelWithSize(1000), - intGaugeCh: testutils.NewChannelWithSize(1000), + intCountCh: testutils.NewChannelWithSize(10), + floatCountCh: 
testutils.NewChannelWithSize(10), + intHistoCh: testutils.NewChannelWithSize(10), + floatHistoCh: testutils.NewChannelWithSize(10), + intGaugeCh: testutils.NewChannelWithSize(10), data: make(map[estats.Metric]float64), } @@ -79,17 +79,6 @@ func (r *TestMetricsRecorder) AssertDataForMetric(metricName string, wantVal flo } } -// AssertEitherDataForMetric asserts either data point is present for metric. -// The zero value in the check is equivalent to unset. - -func (r *TestMetricsRecorder) AssertEitherDataForMetric(metricName string, wantVal1 float64, wantVal2 float64) { - r.mu.Lock() - defer r.mu.Unlock() - if r.data[estats.Metric(metricName)] != wantVal1 && r.data[estats.Metric(metricName)] != wantVal2 { - r.t.Fatalf("Unexpected data for metric %v, got: %v, want: %v or %v", metricName, r.data[estats.Metric(metricName)], wantVal1, wantVal2) - } -} - // PollForDataForMetric polls the metric data for the want. Fails if context // provided expires before data for metric is found. func (r *TestMetricsRecorder) PollForDataForMetric(ctx context.Context, metricName string, wantVal float64) { @@ -97,7 +86,7 @@ func (r *TestMetricsRecorder) PollForDataForMetric(ctx context.Context, metricNa r.mu.Lock() if r.data[estats.Metric(metricName)] == wantVal { r.mu.Unlock() - break + return } r.mu.Unlock() } diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 7d64ae4841d3..e56c0fe94805 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -473,11 +473,9 @@ func (s) TestWRRMetrics(t *testing.T) { // scheduler. 
receivedExpectedMetrics := grpcsync.NewEvent() go func() { - for i := 0; i < 100; i++ { + for !receivedExpectedMetrics.HasFired() { client.EmptyCall(ctx, &testpb.Empty{}) - if receivedExpectedMetrics.HasFired() { - break - } + time.Sleep(2 * time.Millisecond) } }() @@ -554,10 +552,6 @@ func (s) TestWRRMetrics(t *testing.T) { }, } - if ctx.Err() != nil { - t.Fatalf("Timeout waiting for metric %v", eventuallyWantMetric.Name) - } - if err := pollForWantMetrics(ctx, t, reader, []metricdata.Metrics{eventuallyWantMetric}); err != nil { t.Fatal(err) } diff --git a/test/xds/xds_client_integration_test.go b/test/xds/xds_client_integration_test.go index 30b03cf927cd..f3b5d6e6ec43 100644 --- a/test/xds/xds_client_integration_test.go +++ b/test/xds/xds_client_integration_test.go @@ -24,16 +24,13 @@ import ( "testing" "time" - "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/testutils/xds/e2e/setup" - "google.golang.org/grpc/resolver" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" @@ -52,38 +49,6 @@ const ( defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. ) -// setupManagementServerAndResolver sets up an xDS management server, creates -// bootstrap configuration pointing to that server and creates an xDS resolver -// using that configuration. -// -// Registers a cleanup function on t to stop the management server. 
-// -// Returns the following: -// - the xDS management server -// - the node ID to use when talking to this management server -// - bootstrap configuration to use (if creating an xDS-enabled gRPC server) -// - xDS resolver builder (if creating an xDS-enabled gRPC client) -func setupManagementServerAndResolver(t *testing.T) (*e2e.ManagementServer, string, []byte, resolver.Builder) { - // Start an xDS management server. - xdsServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) - - // Create bootstrap configuration pointing to the above management server. - nodeID := uuid.New().String() - bc := e2e.DefaultBootstrapContents(t, nodeID, xdsServer.Address) - - // Create an xDS resolver with the above bootstrap configuration. - var r resolver.Builder - var err error - if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil { - r, err = newResolver.(func([]byte) (resolver.Builder, error))(bc) - if err != nil { - t.Fatalf("Failed to create xDS resolver for testing: %v", err) - } - } - - return xdsServer, nodeID, bc, r -} - func (s) TestClientSideXDS(t *testing.T) { managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) diff --git a/test/xds/xds_client_outlier_detection_test.go b/test/xds/xds_client_outlier_detection_test.go index 6ae920e07f5d..4df142f4a7c1 100644 --- a/test/xds/xds_client_outlier_detection_test.go +++ b/test/xds/xds_client_outlier_detection_test.go @@ -243,7 +243,7 @@ func (s) TestOutlierDetectionWithOutlier(t *testing.T) { // Detection present in the CDS update, but with SuccessRateEjection unset, and // asserts that Outlier Detection is turned on and ejects upstreams. func (s) TestOutlierDetectionXDSDefaultOn(t *testing.T) { - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) // Working backend 1. backend1 := stubserver.StartTestService(t, nil)