diff --git a/test/channelz_test.go b/test/channelz_test.go index 63f703c3128d..aa29198b260b 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -1165,25 +1165,32 @@ func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { } func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) { + const keepaliveRate = 50 * time.Millisecond czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) defer func(t time.Duration) { internal.KeepaliveMinPingTime = t }(internal.KeepaliveMinPingTime) - internal.KeepaliveMinPingTime = time.Second + internal.KeepaliveMinPingTime = keepaliveRate e := tcpClearRREnv te := newTest(t, e) te.customDialOptions = append(te.customDialOptions, grpc.WithKeepaliveParams( keepalive.ClientParameters{ - Time: time.Second, + Time: keepaliveRate, Timeout: 500 * time.Millisecond, PermitWithoutStream: true, })) te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveEnforcementPolicy( keepalive.EnforcementPolicy{ - MinTime: 500 * time.Millisecond, + MinTime: keepaliveRate, PermitWithoutStream: true, })) te.startServer(&testServer{security: e.security}) - te.clientConn() // Dial the server + cc := te.clientConn() // Dial the server + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + awaitState(ctx, t, cc, connectivity.Ready) + start := time.Now() + // Wait for at least two keepalives to be able to occur. + time.Sleep(2 * keepaliveRate) defer te.tearDown() if err := verifyResultWithDelay(func() (bool, error) { tchan, _ := channelz.GetTopChannels(0, 0) @@ -1208,8 +1215,9 @@ func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) { break } skt := channelz.GetSocket(id) - if skt.SocketData.KeepAlivesSent != 2 { - return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent) + want := int64(time.Since(start) / keepaliveRate) + if skt.SocketData.KeepAlivesSent != want { + return false, fmt.Errorf("there should be %v KeepAlives sent, not %d", want, skt.SocketData.KeepAlivesSent) } return true, nil }); err != nil { diff --git a/test/end2end_test.go b/test/end2end_test.go index 5e70acdd767b..cde1edef5ee3 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -4212,7 +4212,7 @@ func (s) TestFlowControlLogicalRace(t *testing.T) { maxFailures = 3 ) - requestCount := 10000 + requestCount := 3000 if raceMode { requestCount = 1000 } @@ -4247,33 +4247,22 @@ func (s) TestFlowControlLogicalRace(t *testing.T) { t.Fatalf("StreamingOutputCall; err = %q", err) } - j := 0 - loop: - for ; j < recvCount; j++ { - _, err := output.Recv() - if err != nil { - if err == io.EOF { - break loop - } - switch status.Code(err) { - case codes.DeadlineExceeded: - break loop - default: - t.Fatalf("Recv; err = %q", err) + for j := 0; j < recvCount; j++ { + if _, err := output.Recv(); err != nil { + if err == io.EOF || status.Code(err) == codes.DeadlineExceeded { + t.Errorf("got %d responses to request %d", j, i) + failures++ + break } + t.Fatalf("Recv; err = %q", err) } } cancel() - <-ctx.Done() - - if j < recvCount { - t.Errorf("got %d responses to request %d", j, i) - failures++ - if failures >= maxFailures { - // Continue past the first failure to see if the connection is - // entirely broken, or if only a single RPC was affected - break - } + + if failures >= maxFailures { + // Continue past the first failure to see if the connection is + // entirely broken, or if only a single RPC was affected + t.Fatalf("Too many failures received; aborting") } } }