From e494e709d807ae9aefab16b1d989507a662e2793 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 21 Apr 2020 15:20:38 -0700 Subject: [PATCH 1/3] Add net benchmark harness --- go.mod | 1 + go.sum | 1 + net/bench.go | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 117 insertions(+) create mode 100644 net/bench.go diff --git a/go.mod b/go.mod index d2d7ffe..a9bc53f 100644 --- a/go.mod +++ b/go.mod @@ -5,4 +5,5 @@ go 1.12 require ( github.com/libp2p/go-libp2p-core v0.3.0 github.com/multiformats/go-multiaddr v0.2.0 + google.golang.org/grpc v1.20.1 ) diff --git a/go.sum b/go.sum index 335592a..033c5ff 100644 --- a/go.sum +++ b/go.sum @@ -189,6 +189,7 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/net/bench.go b/net/bench.go new file mode 100644 index 0000000..12e80b1 --- /dev/null +++ b/net/bench.go @@ -0,0 +1,115 @@ +package tnet + +import ( + "context" + "fmt" + "net" + "runtime" + "sync" + "testing" + "time" + + "google.golang.org/grpc/benchmark/latency" +) + +// NetworkTestFunc is a benchmark function under test by `FindNetworkLimit` +type NetworkTestFunc func(b *testing.B, n1, n2 net.Conn) + +// ConnectionForNetwork generates a pair of network connections with a specified latency. +func ConnectionForNetwork(n *latency.Network) (n1, n2 net.Conn, err error) { + var wg sync.WaitGroup + wg.Add(1) + + var listener net.Listener + listener, err = net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return + } + slowListener := n.Listener(listener) + go func() { + defer wg.Done() + n2, _ = slowListener.Accept() + slowListener.Close() + return + }() + baseDialer := net.Dialer{} + dialer := n.ContextDialer(baseDialer.DialContext) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + n1, err = dialer(ctx, "tcp4", slowListener.Addr().String()) + if err != nil { + return + } + + wg.Wait() + return +} + +// FindNetworkLimit benchmarks a function to analyze CPU, parallism, and network relationship +func FindNetworkLimit(testFunc NetworkTestFunc) (float64, error) { + maxProcs := runtime.GOMAXPROCS(0) + runtime.GOMAXPROCS(maxProcs) + if maxProcs > 1 { + runtime.GOMAXPROCS(1) + } + + network := latency.Network{ + Kbps: 0, + Latency: 0, + } + + wrapperFunc := func(sb *testing.B) { + n1, n2, err := ConnectionForNetwork(&network) + if err != nil { + sb.Error(err) + } + defer n1.Close() + defer n2.Close() + testFunc(sb, n1, n2) + } + + result := testing.Benchmark(wrapperFunc) + if result.N < 1 { + return 0.0, fmt.Errorf("failed to run benchmark") + } + max := (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds() + fmt.Printf("CPU Bound Limit: %s\n", result) + + current := max + network.Latency = 500 * time.Microsecond + for current > max*0.9 { + network.Latency *= 2 + result = testing.Benchmark(wrapperFunc) + current = (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds() + } + fmt.Printf("Latency Bound Limit: %s\n", network.Latency) + + network.Kbps = 1024 * 100 // 100Mbps + network.Latency /= 2 + for current > max*0.9 { + network.Kbps /= 2 + result = testing.Benchmark(wrapperFunc) + current = (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds() + } + fmt.Printf("Bandwidth Bound Limit: %dKbps\n", network.Kbps) + fmt.Printf("Network Bound Limit: %s\n", result) + + // Now, at a network-bounded level (divide bandwidth and latency by 10 as base) + // look at utilizations as a function of parallelism. + network.Latency *= 10 + network.Kbps /= 10 + prevBytes := int64(-1) + ratio := 1.0 + for i := 1; i <= maxProcs; i *= 2 { + runtime.GOMAXPROCS(i) + result = testing.Benchmark(wrapperFunc) + if prevBytes > 0 { + ratio = float64(result.Bytes) / float64(prevBytes) + } + prevBytes = result.Bytes + fmt.Printf("At MaxProc %d %dKbps / %s latency: %s\n", i, network.Kbps, network.Latency, result) + } + fmt.Printf("Slowdown is %f%%\n", 100*(1.0-ratio)) + return (1.0 - ratio), nil +} From 8775583591d809d2c6050675234071c823319652 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 21 Apr 2020 17:56:55 -0700 Subject: [PATCH 2/3] Break up / expose parameters more cleanly --- net/bench.go | 56 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/net/bench.go b/net/bench.go index 12e80b1..8d7aed8 100644 --- a/net/bench.go +++ b/net/bench.go @@ -46,21 +46,9 @@ func ConnectionForNetwork(n *latency.Network) (n1, n2 net.Conn, err error) { return } -// FindNetworkLimit benchmarks a function to analyze CPU, parallism, and network relationship -func FindNetworkLimit(testFunc NetworkTestFunc) (float64, error) { - maxProcs := runtime.GOMAXPROCS(0) - runtime.GOMAXPROCS(maxProcs) - if maxProcs > 1 { - runtime.GOMAXPROCS(1) - } - - network := latency.Network{ - Kbps: 0, - Latency: 0, - } - - wrapperFunc := func(sb *testing.B) { - n1, n2, err := ConnectionForNetwork(&network) +func benchWithNet(testFunc NetworkTestFunc, n *latency.Network) func(*testing.B) { + return func(sb *testing.B) { + n1, n2, err := ConnectionForNetwork(n) if err != nil { sb.Error(err) } @@ -68,17 +56,27 @@ func FindNetworkLimit(testFunc NetworkTestFunc) (float64, error) { defer n2.Close() testFunc(sb, n1, n2) } +} + +// FindNetworkLimit benchmarks a function to analyze CPU and network relationship +func FindNetworkLimit(testFunc NetworkTestFunc, fractionOfMax float64) (int, time.Duration, error) { + network := latency.Network{ + Kbps: 0, + Latency: 0, + } + + wrapperFunc := benchWithNet(testFunc, &network) result := testing.Benchmark(wrapperFunc) if result.N < 1 { - return 0.0, fmt.Errorf("failed to run benchmark") + return 0, 0, fmt.Errorf("failed to run benchmark") } max := (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds() fmt.Printf("CPU Bound Limit: %s\n", result) current := max network.Latency = 500 * time.Microsecond - for current > max*0.9 { + for current > max*fractionOfMax { network.Latency *= 2 result = testing.Benchmark(wrapperFunc) current = (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds() @@ -87,7 +85,7 @@ func FindNetworkLimit(testFunc NetworkTestFunc) (float64, error) { network.Kbps = 1024 * 100 // 100Mbps network.Latency /= 2 - for current > max*0.9 { + for current > max*fractionOfMax { network.Kbps /= 2 result = testing.Benchmark(wrapperFunc) current = (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds() @@ -95,15 +93,27 @@ func FindNetworkLimit(testFunc NetworkTestFunc) (float64, error) { fmt.Printf("Bandwidth Bound Limit: %dKbps\n", network.Kbps) fmt.Printf("Network Bound Limit: %s\n", result) - // Now, at a network-bounded level (divide bandwidth and latency by 10 as base) - // look at utilizations as a function of parallelism. - network.Latency *= 10 - network.Kbps /= 10 + return network.Kbps, network.Latency, nil +} + +// ParallelismSlowdown tracks how much overhead is incurred on a ntework bound function when parallelism contentention +// in increased. +func ParallelismSlowdown(testFunc NetworkTestFunc, kbps int, l time.Duration) (slowdown float64, err error) { + maxProcs := runtime.GOMAXPROCS(0) + runtime.GOMAXPROCS(maxProcs) + if maxProcs > 1 { + runtime.GOMAXPROCS(1) + } + + network := latency.Network{ + Kbps: kbps, + Latency: l, + } prevBytes := int64(-1) ratio := 1.0 for i := 1; i <= maxProcs; i *= 2 { runtime.GOMAXPROCS(i) - result = testing.Benchmark(wrapperFunc) + result := testing.Benchmark(benchWithNet(testFunc, &network)) if prevBytes > 0 { ratio = float64(result.Bytes) / float64(prevBytes) } From d75c3b5bc45f28316e398b605e8b1432c3aff063 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 30 Apr 2020 14:43:16 -0700 Subject: [PATCH 3/3] track writes --- net/bench.go | 54 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/net/bench.go b/net/bench.go index 8d7aed8..63ce2cb 100644 --- a/net/bench.go +++ b/net/bench.go @@ -9,14 +9,26 @@ import ( "testing" "time" + "go.uber.org/atomic" "google.golang.org/grpc/benchmark/latency" ) +// WriteTrackedConn provides a wrapper for tracking how many write calls are made to a network connection. +type WriteTrackedConn struct { + net.Conn + WriteCount atomic.Uint32 +} + +func (c *WriteTrackedConn) Write(b []byte) (n int, err error) { + c.WriteCount.Inc() + return c.Conn.Write(b) +} + // NetworkTestFunc is a benchmark function under test by `FindNetworkLimit` type NetworkTestFunc func(b *testing.B, n1, n2 net.Conn) // ConnectionForNetwork generates a pair of network connections with a specified latency. -func ConnectionForNetwork(n *latency.Network) (n1, n2 net.Conn, err error) { +func ConnectionForNetwork(n *latency.Network) (n1, n2 *WriteTrackedConn, err error) { var wg sync.WaitGroup wg.Add(1) @@ -28,8 +40,9 @@ func ConnectionForNetwork(n *latency.Network) (n1, n2 net.Conn, err error) { slowListener := n.Listener(listener) go func() { defer wg.Done() - n2, _ = slowListener.Accept() + ac, _ := slowListener.Accept() slowListener.Close() + n2 = &WriteTrackedConn{ac, atomic.Uint32{}} return }() baseDialer := net.Dialer{} @@ -37,10 +50,11 @@ func ConnectionForNetwork(n *latency.Network) (n1, n2 net.Conn, err error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - n1, err = dialer(ctx, "tcp4", slowListener.Addr().String()) + conn, err := dialer(ctx, "tcp4", slowListener.Addr().String()) if err != nil { return } + n1 = &WriteTrackedConn{conn, atomic.Uint32{}} wg.Wait() return @@ -55,11 +69,12 @@ func benchWithNet(testFunc NetworkTestFunc, n *latency.Network) func(*testing.B) defer n1.Close() defer n2.Close() testFunc(sb, n1, n2) + sb.ReportMetric(float64(n1.WriteCount.Load()), "writes") } } // FindNetworkLimit benchmarks a function to analyze CPU and network relationship -func FindNetworkLimit(testFunc NetworkTestFunc, fractionOfMax float64) (int, time.Duration, error) { +func FindNetworkLimit(testFunc NetworkTestFunc) (int, error) { network := latency.Network{ Kbps: 0, Latency: 0, @@ -69,31 +84,24 @@ func FindNetworkLimit(testFunc NetworkTestFunc, fractionOfMax float64) (int, tim result := testing.Benchmark(wrapperFunc) if result.N < 1 { - return 0, 0, fmt.Errorf("failed to run benchmark") + return 0, fmt.Errorf("failed to run benchmark") } - max := (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds() - fmt.Printf("CPU Bound Limit: %s\n", result) - - current := max - network.Latency = 500 * time.Microsecond - for current > max*fractionOfMax { + fmt.Printf("Limit with no network latency: %s\n", result) + + last := 1.0 + network.Latency = 30 * time.Millisecond + network.Kbps = 1024 * 1 // 1 Mbps + result = testing.Benchmark(wrapperFunc) + current := (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds() + for current/(2*last) > .1 { network.Latency *= 2 result = testing.Benchmark(wrapperFunc) + last = current current = (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds() } - fmt.Printf("Latency Bound Limit: %s\n", network.Latency) - - network.Kbps = 1024 * 100 // 100Mbps - network.Latency /= 2 - for current > max*fractionOfMax { - network.Kbps /= 2 - result = testing.Benchmark(wrapperFunc) - current = (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds() - } - fmt.Printf("Bandwidth Bound Limit: %dKbps\n", network.Kbps) - fmt.Printf("Network Bound Limit: %s\n", result) + fmt.Printf("At 30ms latency, bandwidth can saturate: %dKbps\n", network.Kbps) - return network.Kbps, network.Latency, nil + return network.Kbps, nil } // ParallelismSlowdown tracks how much overhead is incurred on a ntework bound function when parallelism contentention