
Add OTLP grpc compression benchmarks #4640


Merged · 1 commit · Jan 5, 2022
40 changes: 40 additions & 0 deletions config/configgrpc/README.md
@@ -44,6 +44,46 @@ exporters:
"test 2": "value 2"
```

### Compression Comparison

[configgrpc_benchmark_test.go](./configgrpc_benchmark_test.go) contains benchmarks comparing the supported compression algorithms. It compresses small, medium, and large log, trace, and metric payloads with `gzip`, `zstd`, and `snappy`. Each test case outputs the uncompressed payload size, the compressed payload size, and the average nanoseconds spent on compression.
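
To reproduce these numbers on your own hardware, a typical invocation looks like the following (the flags are illustrative; any standard `go test -bench` run works):

```sh
go test -bench=BenchmarkCompressors -run=^$ ./config/configgrpc/
```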

The following table summarizes the results, including some additional columns computed from the raw data: the compression ratio is raw bytes / compressed bytes, MB compressed per second is raw bytes / (ns / op) × 1000, and MB saved per second is (raw bytes − compressed bytes) / (ns / op) × 1000. The benchmarks were performed on an AWS m5.large EC2 instance with an Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz.

| Request           | Compressor | Raw bytes | Compressed bytes | Compression ratio | Ns / op | MB compressed / second | MB saved / second |
|-------------------|------------|-----------|------------------|-------------------|---------|------------------------|-------------------|
| lg_log_request | gzip | 5150 | 262 | 19.66 | 49231 | 104.61 | 99.29 |
| lg_metric_request | gzip | 6800 | 201 | 33.83 | 51816 | 131.23 | 127.35 |
| lg_trace_request | gzip | 9200 | 270 | 34.07 | 65174 | 141.16 | 137.02 |
| md_log_request | gzip | 363 | 268 | 1.35 | 37609 | 9.65 | 2.53 |
| md_metric_request | gzip | 320 | 145 | 2.21 | 30141 | 10.62 | 5.81 |
| md_trace_request | gzip | 451 | 288 | 1.57 | 38270 | 11.78 | 4.26 |
| sm_log_request | gzip | 166 | 168 | 0.99 | 30511 | 5.44 | -0.07 |
| sm_metric_request | gzip | 185 | 142 | 1.30 | 29055 | 6.37 | 1.48 |
| sm_trace_request | gzip | 233 | 205 | 1.14 | 33466 | 6.96 | 0.84 |
| lg_log_request | snappy | 5150 | 475 | 10.84 | 1915 | 2,689.30 | 2,441.25 |
| lg_metric_request | snappy | 6800 | 466 | 14.59 | 2266 | 3,000.88 | 2,795.23 |
| lg_trace_request | snappy | 9200 | 644 | 14.29 | 3281 | 2,804.02 | 2,607.74 |
| md_log_request | snappy | 363 | 300 | 1.21 | 770.0 | 471.43 | 81.82 |
| md_metric_request | snappy | 320 | 162 | 1.98 | 588.6 | 543.66 | 268.43 |
| md_trace_request | snappy | 451 | 330 | 1.37 | 907.7 | 496.86 | 133.30 |
| sm_log_request | snappy | 166 | 184 | 0.90 | 551.8 | 300.83 | -32.62 |
| sm_metric_request | snappy | 185 | 154 | 1.20 | 526.3 | 351.51 | 58.90 |
| sm_trace_request | snappy | 233 | 251 | 0.93 | 682.1 | 341.59 | -26.39 |
| lg_log_request | zstd | 5150 | 223 | 23.09 | 17998 | 286.14 | 273.75 |
| lg_metric_request | zstd | 6800 | 144 | 47.22 | 14289 | 475.89 | 465.81 |
| lg_trace_request | zstd | 9200 | 208 | 44.23 | 17160 | 536.13 | 524.01 |
| md_log_request | zstd | 363 | 261 | 1.39 | 11216 | 32.36 | 9.09 |
| md_metric_request | zstd | 320 | 145 | 2.21 | 9318 | 34.34 | 18.78 |
| md_trace_request | zstd | 451 | 301 | 1.50 | 12583 | 35.84 | 11.92 |
| sm_log_request | zstd | 166 | 165 | 1.01 | 12482 | 13.30 | 0.08 |
| sm_metric_request | zstd | 185 | 139 | 1.33 | 8824 | 20.97 | 5.21 |
| sm_trace_request | zstd | 233 | 203 | 1.15 | 10134 | 22.99 | 2.96 |

Compression ratios will vary in practice, as they are highly dependent on the data's information entropy. Compression rates depend on the speed of the CPU and the size of the payloads being compressed: smaller payloads compress at slower rates than larger payloads, which can amortize fixed computation costs over more bytes.

`gzip` is the only compression algorithm required for [OTLP servers](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#protocol-details), and is a natural first choice. It is not as fast as `snappy`, but achieves better compression ratios and has reasonable performance. If your collector is CPU bound and your OTLP server supports it, you may benefit from using `snappy` compression. If your collector is CPU bound and has a very fast network link, you may benefit from disabling compression, which is the default.
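
For example, the algorithm for a gRPC exporter is selected with the `compression` setting. A minimal sketch, in which the `otlp` exporter name and endpoint are placeholders:

```yaml
exporters:
  otlp:
    endpoint: otelcol2:4317
    compression: snappy
```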

## Server Configuration

[Receivers](https://github.com/open-telemetry/opentelemetry-collector/blob/main/receiver/README.md)
181 changes: 181 additions & 0 deletions config/configgrpc/configgrpc_benchmark_test.go
@@ -0,0 +1,181 @@
/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package configgrpc

import (
"bytes"
"fmt"
"testing"

"github.com/mostynb/go-grpc-compression/snappy"
"github.com/mostynb/go-grpc-compression/zstd"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/model/otlp"
"go.opentelemetry.io/collector/model/pdata"
)

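// BenchmarkCompressors reports, for each payload and compressor pair, the
// time spent compressing the pre-marshaled request bytes.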
func BenchmarkCompressors(b *testing.B) {
	payloads := setupTestPayloads()

	compressors := []encoding.Compressor{
		encoding.GetCompressor(gzip.Name),
		encoding.GetCompressor(zstd.Name),
		encoding.GetCompressor(snappy.Name),
	}

	for _, payload := range payloads {
		for _, compressor := range compressors {
			messageBytes, err := payload.marshaler.marshal(payload.message)
			if err != nil {
				b.Fatalf("marshal(_) returned an error: %v", err)
			}

			compressedBytes, err := compress(compressor, messageBytes)
			if err != nil {
				b.Fatalf("compress(_) returned an error: %v", err)
			}

			name := fmt.Sprintf("%v/raw_bytes_%v/compressed_bytes_%v/compressor_%v", payload.name, len(messageBytes), len(compressedBytes), compressor.Name())

			b.Run(name, func(b *testing.B) {
				b.ResetTimer()
				for i := 0; i < b.N; i++ {
					if _, err := compress(compressor, messageBytes); err != nil {
						b.Fatalf("compress(_) returned an error: %v", err)
					}
				}
			})
		}
	}
}

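// compress runs in through the given compressor, mirroring the code path
// grpc-go takes when compressing an outgoing message (per the license header above).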
func compress(compressor encoding.Compressor, in []byte) ([]byte, error) {
	if compressor == nil {
		return nil, nil
	}
	wrapErr := func(err error) error {
		return status.Errorf(codes.Internal, "error while compressing: %v", err.Error())
	}
	cbuf := &bytes.Buffer{}
	z, err := compressor.Compress(cbuf)
	if err != nil {
		return nil, wrapErr(err)
	}
	if _, err := z.Write(in); err != nil {
		return nil, wrapErr(err)
	}
	if err := z.Close(); err != nil {
		return nil, wrapErr(err)
	}
	return cbuf.Bytes(), nil
}

type testPayload struct {
	name      string
	message   interface{}
	marshaler marshaler
}

type marshaler interface {
	marshal(interface{}) ([]byte, error)
}

type logMarshaler struct {
	pdata.LogsMarshaler
}

func (m *logMarshaler) marshal(e interface{}) ([]byte, error) {
	return m.MarshalLogs(e.(pdata.Logs))
}

type traceMarshaler struct {
	pdata.TracesMarshaler
}

func (m *traceMarshaler) marshal(e interface{}) ([]byte, error) {
	return m.MarshalTraces(e.(pdata.Traces))
}

type metricsMarshaler struct {
	pdata.MetricsMarshaler
}

func (m *metricsMarshaler) marshal(e interface{}) ([]byte, error) {
	return m.MarshalMetrics(e.(pdata.Metrics))
}

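// setupTestPayloads builds small, medium, and large log, trace, and metric
// requests, each paired with the protobuf marshaler that encodes it.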
func setupTestPayloads() []testPayload {
	payloads := make([]testPayload, 0)

	// log payloads
	logMarshaler := &logMarshaler{otlp.NewProtobufLogsMarshaler()}
	payloads = append(payloads, testPayload{
		name:      "sm_log_request",
		message:   testdata.GenerateLogsOneLogRecord(),
		marshaler: logMarshaler})
	payloads = append(payloads, testPayload{
		name:      "md_log_request",
		message:   testdata.GenerateLogsTwoLogRecordsSameResourceOneDifferent(),
		marshaler: logMarshaler})
	payloads = append(payloads, testPayload{
		name:      "lg_log_request",
		message:   testdata.GenerateLogsManyLogRecordsSameResource(50),
		marshaler: logMarshaler})

	// trace payloads
	tracesMarshaler := &traceMarshaler{otlp.NewProtobufTracesMarshaler()}
	payloads = append(payloads, testPayload{
		name:      "sm_trace_request",
		message:   testdata.GenerateTracesOneSpan(),
		marshaler: tracesMarshaler})
	payloads = append(payloads, testPayload{
		name:      "md_trace_request",
		message:   testdata.GenerateTracesTwoSpansSameResourceOneDifferent(),
		marshaler: tracesMarshaler})
	payloads = append(payloads, testPayload{
		name:      "lg_trace_request",
		message:   testdata.GenerateTracesManySpansSameResource(50),
		marshaler: tracesMarshaler})

	// metric payloads
	metricsMarshaler := &metricsMarshaler{otlp.NewProtobufMetricsMarshaler()}
	payloads = append(payloads, testPayload{
		name:      "sm_metric_request",
		message:   testdata.GenerateMetricsOneMetric(),
		marshaler: metricsMarshaler})
	payloads = append(payloads, testPayload{
		name:      "md_metric_request",
		message:   testdata.GenerateMetricsTwoMetrics(),
		marshaler: metricsMarshaler})
	payloads = append(payloads, testPayload{
		name:      "lg_metric_request",
		message:   testdata.GenerateMetricsManyMetricsSameResource(50),
		marshaler: metricsMarshaler})

	return payloads
}