From 05243a09ed9477a5c5a7422f21802a7cf689c755 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Wed, 7 Jun 2023 12:06:37 -0600 Subject: [PATCH 1/4] Fix preloader mode in benchmarks --- benchmark/benchmain/main.go | 91 +++++++++++++++++++++++-------------- benchmark/benchmark.go | 56 ++++++++++++++++++++--- 2 files changed, 107 insertions(+), 40 deletions(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index 971a2d453c9a..d3a99925fee0 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -53,6 +53,7 @@ import ( "reflect" "runtime" "runtime/pprof" + "strconv" "strings" "sync" "sync/atomic" @@ -81,7 +82,8 @@ var ( traceMode = flags.StringWithAllowedValues("trace", toggleModeOff, fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff, - fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) + fmt.Sprintf("Preloader mode - One of: %v, preloader works only in streaming and unconstrained modes and will bi ignored in unary mode", + strings.Join(allToggleModes, ", ")), allToggleModes) channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff, fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) compressorMode = flags.StringWithAllowedValues("compression", compModeOff, @@ -386,20 +388,11 @@ func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { } func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { - clients, cleanup := makeClients(bf) - - streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections) - for cn := 0; cn < bf.Connections; cn++ { - tc := clients[cn] - streams[cn] = make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls) - for pos := 0; pos < bf.MaxConcurrentCalls; pos++ { + streams, req, cleanup := setupStream(bf, workloadsStreaming) - stream, err := tc.StreamingCall(context.Background()) - if err != nil { - logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) - } - streams[cn][pos] = stream - } + var preparedMsg [][]*grpc.PreparedMsg + if bf.EnablePreloader { + preparedMsg = prepareMessages(streams, req) } return func(cn, pos int) { @@ -411,24 +404,25 @@ func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { if bf.RespPayloadCurve != nil { respSizeBytes = bf.RespPayloadCurve.ChooseRandom() } - streamCaller(streams[cn][pos], reqSizeBytes, respSizeBytes) + var req interface{} + if bf.EnablePreloader { + req = preparedMsg[cn][pos] + } else { + pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, reqSizeBytes) + req = &testpb.SimpleRequest{ + ResponseType: pl.Type, + ResponseSize: int32(respSizeBytes), + Payload: pl, + } + } + streamCaller(streams[cn][pos], req) }, cleanup } func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) { - streams, req, cleanup := setupUnconstrainedStream(bf) + streams, req, cleanup := setupStream(bf, workloadsUnconstrained) - preparedMsg := make([][]*grpc.PreparedMsg, len(streams)) - for cn, connStreams := range streams { - preparedMsg[cn] = make([]*grpc.PreparedMsg, len(connStreams)) - for pos, stream := range connStreams { - preparedMsg[cn][pos] = &grpc.PreparedMsg{} - err := preparedMsg[cn][pos].Encode(stream, req) - if err != nil { - logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[cn][pos], req, stream, err) - } - } - } + preparedMsg := prepareMessages(streams, req) return func(cn, pos int) { streams[cn][pos].SendMsg(preparedMsg[cn][pos]) @@ -438,7 +432,7 @@ func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRe } func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) { - streams, req, cleanup := setupUnconstrainedStream(bf) + streams, req, cleanup := setupStream(bf, workloadsUnconstrained) return func(cn, pos int) { streams[cn][pos].Send(req) @@ -447,13 +441,21 @@ func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, r }, cleanup } -func setupUnconstrainedStream(bf stats.Features) ([][]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) { +func setupStream(bf stats.Features, mode string) ([][]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) { clients, cleanup := makeClients(bf) streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections) - md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1", - benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String()) - ctx := metadata.NewOutgoingContext(context.Background(), md) + ctx := context.Background() + if mode == workloadsUnconstrained { + md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1", + benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String()) + ctx = metadata.NewOutgoingContext(ctx, md) + } + if bf.EnablePreloader { + md := metadata.Pairs(benchmark.PreloadMsgSizeHeader, strconv.Itoa(bf.RespSizeBytes), + benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String()) + ctx = metadata.NewOutgoingContext(ctx, md) + } for cn := 0; cn < bf.Connections; cn++ { tc := clients[cn] streams[cn] = make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls) @@ -476,6 +478,21 @@ func setupUnconstrainedStream(bf stats.Features) ([][]testgrpc.BenchmarkService_ return streams, req, cleanup } +func prepareMessages(streams [][]testgrpc.BenchmarkService_StreamingCallClient, req *testpb.SimpleRequest) [][]*grpc.PreparedMsg { + preparedMsg := make([][]*grpc.PreparedMsg, len(streams)) + for cn, connStreams := range streams { + preparedMsg[cn] = make([]*grpc.PreparedMsg, len(connStreams)) + for pos, stream := range connStreams { + preparedMsg[cn][pos] = &grpc.PreparedMsg{} + err := preparedMsg[cn][pos].Encode(stream, req) + if err != nil { + logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[cn][pos], req, stream, err) + } + } + } + return preparedMsg +} + // Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and // request and response sizes. func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int) { @@ -484,8 +501,8 @@ func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int) } } -func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) { - if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil { +func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, req interface{}) { + if err := bm.DoStreamingRoundTripPreloaded(stream, req); err != nil { logger.Fatalf("DoStreamingRoundTrip failed: %v", err) } } @@ -770,6 +787,9 @@ func processFlags() *benchOpts { if len(opts.features.reqSizeBytes) != 0 { log.Fatalf("you may not specify -reqPayloadCurveFiles and -reqSizeBytes at the same time") } + if len(opts.features.enablePreloader) != 0 { + log.Fatalf("you may not specify -reqPayloadCurveFiles and -preloader at the same time") + } for _, file := range *reqPayloadCurveFiles { pc, err := stats.NewPayloadCurve(file) if err != nil { @@ -787,6 +807,9 @@ func processFlags() *benchOpts { if len(opts.features.respSizeBytes) != 0 { log.Fatalf("you may not specify -respPayloadCurveFiles and -respSizeBytes at the same time") } + if len(opts.features.enablePreloader) != 0 { + log.Fatalf("you may not specify -respPayloadCurveFiles and -preloader at the same time") + } for _, file := range *respPayloadCurveFiles { pc, err := stats.NewPayloadCurve(file) if err != nil { diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 2e11167004db..551482ee2934 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -28,6 +28,7 @@ import ( "log" "math/rand" "net" + "strconv" "time" "google.golang.org/grpc" @@ -83,13 +84,33 @@ const UnconstrainedStreamingHeader = "unconstrained-streaming" // the server should sleep between consecutive RPC responses. const UnconstrainedStreamingDelayHeader = "unconstrained-streaming-delay" +const PreloadMsgSizeHeader = "preload-msg-size" + func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error { + preloadMsgSize := 0 + if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[PreloadMsgSizeHeader]) != 0 { + val := md[PreloadMsgSizeHeader][0] + var err error + preloadMsgSize, err = strconv.Atoi(val) + if err != nil { + return fmt.Errorf("can't parse %q header: %s", PreloadMsgSizeHeader, err) + } + } + if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingHeader]) != 0 { - return s.UnconstrainedStreamingCall(stream) + return s.UnconstrainedStreamingCall(stream, preloadMsgSize) } response := &testpb.SimpleResponse{ Payload: new(testpb.Payload), } + preloadedResponse := &grpc.PreparedMsg{} + if preloadMsgSize > 0 { + setPayload(response.Payload, testpb.PayloadType_COMPRESSABLE, preloadMsgSize) + err := preloadedResponse.Encode(stream, response) + if err != nil { + return err + } + } in := new(testpb.SimpleRequest) for { // use ServerStream directly to reuse the same testpb.SimpleRequest object @@ -101,14 +122,19 @@ func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCal if err != nil { return err } - setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) - if err := stream.Send(response); err != nil { + if preloadMsgSize > 0 { + err = stream.SendMsg(preloadedResponse) + } else { + setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) + err = stream.Send(response) + } + if err != nil { return err } } } -func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error { +func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer, preloadMsgSize int) error { maxSleep := 0 if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingDelayHeader]) != 0 { val := md[UnconstrainedStreamingDelayHeader][0] @@ -135,6 +161,14 @@ func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService } setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) + preloadedResponse := &grpc.PreparedMsg{} + if preloadMsgSize > 0 { + err := preloadedResponse.Encode(stream, response) + if err != nil { + return err + } + } + go func() { for { // Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest. @@ -154,7 +188,12 @@ func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService if maxSleep > 0 { time.Sleep(time.Duration(rand.Intn(maxSleep))) } - err := stream.Send(response) + var err error + if preloadMsgSize > 0 { + err = stream.SendMsg(preloadedResponse) + } else { + err = stream.Send(response) + } switch status.Code(err) { case codes.Unavailable, codes.Canceled: return @@ -258,7 +297,12 @@ func DoStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, ResponseSize: int32(respSize), Payload: pl, } - if err := stream.Send(req); err != nil { + return DoStreamingRoundTripPreloaded(stream, req) +} + +func DoStreamingRoundTripPreloaded(stream testgrpc.BenchmarkService_StreamingCallClient, req interface{}) error { + // req could be either *testpb.SimpleRequest or *grpc.PreparedMsg + if err := stream.SendMsg(req); err != nil { return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want ", err) } if _, err := stream.Recv(); err != nil { From 9bb807b35e3497dcaeaabd785a8ccebcbd37765f Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Sun, 11 Jun 2023 18:29:14 -0600 Subject: [PATCH 2/4] Add comments --- benchmark/benchmark.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 551482ee2934..23516ed1eafe 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -84,6 +84,9 @@ const UnconstrainedStreamingHeader = "unconstrained-streaming" // the server should sleep between consecutive RPC responses. const UnconstrainedStreamingDelayHeader = "unconstrained-streaming-delay" +// PreloadMsgSizeHeader indicates that the client is going to ask for +// a fixed response size and passes this size to the server. +// The server is expected to preload the response on startup. const PreloadMsgSizeHeader = "preload-msg-size" func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error { @@ -300,6 +303,7 @@ func DoStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, return DoStreamingRoundTripPreloaded(stream, req) } +// DoStreamingRoundTrip performs a round trip for a single streaming rpc with preloaded payload. func DoStreamingRoundTripPreloaded(stream testgrpc.BenchmarkService_StreamingCallClient, req interface{}) error { // req could be either *testpb.SimpleRequest or *grpc.PreparedMsg if err := stream.SendMsg(req); err != nil { From 4d8bddad6fa4ecda2a907ab0f3c82c8ccf64cae3 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Sun, 11 Jun 2023 20:39:44 -0600 Subject: [PATCH 3/4] fix comment --- benchmark/benchmark.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 23516ed1eafe..3e3f3c8d916d 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -303,7 +303,7 @@ func DoStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, return DoStreamingRoundTripPreloaded(stream, req) } -// DoStreamingRoundTrip performs a round trip for a single streaming rpc with preloaded payload. +// DoStreamingRoundTripPreloaded performs a round trip for a single streaming rpc with preloaded payload. func DoStreamingRoundTripPreloaded(stream testgrpc.BenchmarkService_StreamingCallClient, req interface{}) error { // req could be either *testpb.SimpleRequest or *grpc.PreparedMsg if err := stream.SendMsg(req); err != nil { From b6d63fed905d1f1c8c087212b9ae03e045d5cdbc Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Tue, 27 Jun 2023 12:58:30 -0600 Subject: [PATCH 4/4] fix style errors --- benchmark/benchmain/main.go | 21 +++++++++------------ benchmark/benchmark.go | 8 +++----- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index 1044aeef71dc..f4b96a5d460b 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -82,7 +82,7 @@ var ( traceMode = flags.StringWithAllowedValues("trace", toggleModeOff, fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff, - fmt.Sprintf("Preloader mode - One of: %v, preloader works only in streaming and unconstrained modes and will bi ignored in unary mode", + fmt.Sprintf("Preloader mode - One of: %v, preloader works only in streaming and unconstrained modes and will be ignored in unary mode", strings.Join(allToggleModes, ", ")), allToggleModes) channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff, fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) @@ -403,7 +403,7 @@ func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { } func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { - streams, req, cleanup := setupStream(bf, workloadsStreaming) + streams, req, cleanup := setupStream(bf, false) var preparedMsg [][]*grpc.PreparedMsg if bf.EnablePreloader { @@ -435,7 +435,7 @@ func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { } func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) { - streams, req, cleanup := setupStream(bf, workloadsUnconstrained) + streams, req, cleanup := setupStream(bf, true) preparedMsg := prepareMessages(streams, req) @@ -447,7 +447,7 @@ func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRe } func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) { - streams, req, cleanup := setupStream(bf, workloadsUnconstrained) + streams, req, cleanup := setupStream(bf, true) return func(cn, pos int) { streams[cn][pos].Send(req) @@ -456,19 +456,17 @@ func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, r }, cleanup } -func setupStream(bf stats.Features, mode string) ([][]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) { +func setupStream(bf stats.Features, unconstrained bool) ([][]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) { clients, cleanup := makeClients(bf) streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections) ctx := context.Background() - if mode == workloadsUnconstrained { - md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1", - benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String()) + if unconstrained { + md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1", benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String()) ctx = metadata.NewOutgoingContext(ctx, md) } if bf.EnablePreloader { - md := metadata.Pairs(benchmark.PreloadMsgSizeHeader, strconv.Itoa(bf.RespSizeBytes), - benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String()) + md := metadata.Pairs(benchmark.PreloadMsgSizeHeader, strconv.Itoa(bf.RespSizeBytes), benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String()) ctx = metadata.NewOutgoingContext(ctx, md) } for cn := 0; cn < bf.Connections; cn++ { @@ -499,8 +497,7 @@ func prepareMessages(streams [][]testgrpc.BenchmarkService_StreamingCallClient, preparedMsg[cn] = make([]*grpc.PreparedMsg, len(connStreams)) for pos, stream := range connStreams { preparedMsg[cn][pos] = &grpc.PreparedMsg{} - err := preparedMsg[cn][pos].Encode(stream, req) - if err != nil { + if err := preparedMsg[cn][pos].Encode(stream, req); err != nil { logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[cn][pos], req, stream, err) } } diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 3e3f3c8d916d..27101954aa30 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -96,7 +96,7 @@ func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCal var err error preloadMsgSize, err = strconv.Atoi(val) if err != nil { - return fmt.Errorf("can't parse %q header: %s", PreloadMsgSizeHeader, err) + return fmt.Errorf("%q header value is not an integer: %s", PreloadMsgSizeHeader, err) } } @@ -109,8 +109,7 @@ func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCal preloadedResponse := &grpc.PreparedMsg{} if preloadMsgSize > 0 { setPayload(response.Payload, testpb.PayloadType_COMPRESSABLE, preloadMsgSize) - err := preloadedResponse.Encode(stream, response) - if err != nil { + if err := preloadedResponse.Encode(stream, response); err != nil { return err } } @@ -166,8 +165,7 @@ func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService preloadedResponse := &grpc.PreparedMsg{} if preloadMsgSize > 0 { - err := preloadedResponse.Encode(stream, response) - if err != nil { + if err := preloadedResponse.Encode(stream, response); err != nil { return err } }