Skip to content

Commit

Permalink
Merge pull request #127 from msaf1980/receiver_json_perf
Browse files Browse the repository at this point in the history
Improve telegraf json receiver performance
  • Loading branch information
msaf1980 authored Dec 8, 2022
2 parents e39c155 + 001aa65 commit cb484fd
Show file tree
Hide file tree
Showing 105 changed files with 11,814 additions and 32 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.4.1
github.com/golang/snappy v0.0.0-20170215233205-553a64147049
github.com/json-iterator/go v1.1.12
github.com/lomik/graphite-pickle v0.0.0-20171221213606-614e8df42119
github.com/lomik/og-rek v0.0.0-20170411191824-628eefeb8d80 // indirect
github.com/lomik/stop v0.0.0-20161127103810-188e98d969bd // indirect
Expand Down
9 changes: 7 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
Expand All @@ -47,8 +50,10 @@ github.com/lomik/stop v0.0.0-20161127103810-188e98d969bd h1:hUNpVzZOYNANa5s8XMBE
github.com/lomik/stop v0.0.0-20161127103810-188e98d969bd/go.mod h1:3pLqdYIrxHYk+VsfIlrTcBD9J34YkGq8iN9yzJuhrP0=
github.com/lomik/zapwriter v0.0.0-20170315193840-d4499a33b592 h1:NEjGY17W0lNlYFX9i7GupGjrJojX2Ilhy4XebC+86+w=
github.com/lomik/zapwriter v0.0.0-20170315193840-d4499a33b592/go.mod h1:8xn4WUTM35HWUh6BGz2NkXFmD2l4D420gb4lL8OPJcg=
github.com/msaf1980/go-stringutils v0.1.0 h1:MFkZ4AEXsUA5Ggyrn3C8v0AIrXYhA7qizxlMLm3nZ+M=
github.com/msaf1980/go-stringutils v0.1.0/go.mod h1:AxmV/6JuQUAtZJg5XmYATB5ZwCWgtpruVHY03dswRf8=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/msaf1980/go-stringutils v0.1.4 h1:UwsIT0hplHVucqbknk3CoNqKkmIuSHhsbBldXxyld5U=
github.com/msaf1980/go-stringutils v0.1.4/go.mod h1:AxmV/6JuQUAtZJg5XmYATB5ZwCWgtpruVHY03dswRf8=
github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI=
Expand Down
15 changes: 12 additions & 3 deletions receiver/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ type Base struct {
concatCharacter string
}

func NewBase(logger *zap.Logger, config tags.TagConfig) Base {
return Base{logger: logger, Tags: config}
}
// func NewBase(logger *zap.Logger, config tags.TagConfig) Base {
// return Base{logger: logger, Tags: config}
// }

func sendUint64Counter(send func(metric string, value float64), metric string, value *uint64) {
v := atomic.LoadUint64(value)
Expand All @@ -56,6 +56,15 @@ func sendInt64Gauge(send func(metric string, value float64), metric string, valu
send(metric, float64(atomic.LoadInt64(value)))
}

func (base *Base) Init(logger *zap.Logger, config tags.TagConfig, opts ...Option) {
base.logger = logger
base.Tags = config

for _, optApply := range opts {
optApply(base)
}
}

func (base *Base) isDrop(nowTime uint32, metricTime uint32) bool {
if base.dropFutureSeconds != 0 && (metricTime > (nowTime + base.dropFutureSeconds)) {
atomic.AddUint64(&base.stat.futureDropped, 1)
Expand Down
27 changes: 10 additions & 17 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
return nil, err
}

base := NewBase(zapwriter.Logger(strings.Replace(u.Scheme, "+", "_", -1)), config)

for _, optApply := range opts {
optApply(&base)
}
logger := zapwriter.Logger(strings.Replace(u.Scheme, "+", "_", -1))

if u.Scheme == "tcp" {
addr, err := net.ResolveTCPAddr("tcp", u.Host)
Expand All @@ -110,9 +106,9 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
}

r := &TCP{
Base: base,
parseChan: make(chan *Buffer),
}
r.Init(logger, config, opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -127,9 +123,9 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
}

r := &Pickle{
Base: base,
parseChan: make(chan []byte),
}
r.Init(logger, config, opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -144,9 +140,9 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
}

r := &UDP{
Base: base,
parseChan: make(chan *Buffer),
}
r.Init(logger, config, opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -160,9 +156,8 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
return nil, err
}

r := &GRPC{
Base: base,
}
r := &GRPC{}
r.Init(logger, config, opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -176,9 +171,8 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
return nil, err
}

r := &PrometheusRemoteWrite{
Base: base,
}
r := &PrometheusRemoteWrite{}
r.Init(logger, config, opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -192,9 +186,8 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
return nil, err
}

r := &TelegrafHttpJson{
Base: base,
}
r := &TelegrafHttpJson{}
r.Init(logger, config, opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand Down
26 changes: 16 additions & 10 deletions receiver/telegraf_http_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package receiver
import (
"bytes"
"context"
"encoding/json"
"io/ioutil"
"math"
"net"
Expand All @@ -12,6 +11,7 @@ import (
"sync/atomic"
"time"

json "github.com/json-iterator/go"
"github.com/lomik/carbon-clickhouse/helper/RowBinary"
"github.com/lomik/carbon-clickhouse/helper/escape"
"go.uber.org/zap"
Expand Down Expand Up @@ -73,21 +73,14 @@ func TelegrafEncodeTags(tags map[string]string) string {
return res.String()
}

func (rcv *TelegrafHttpJson) ServeHTTP(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

func (rcv *TelegrafHttpJson) process(ctx context.Context, body []byte) (err error) {
var data TelegrafHttpPayload
err = json.Unmarshal(body, &data)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

writer := RowBinary.NewWriter(r.Context(), rcv.writeChan)
writer := RowBinary.NewWriter(ctx, rcv.writeChan)

var pathBuf bytes.Buffer

Expand Down Expand Up @@ -141,6 +134,19 @@ metricsLoop:
if writeErrors := writer.WriteErrors(); writeErrors > 0 {
atomic.AddUint64(&rcv.stat.errors, uint64(writeErrors))
}

return
}

func (rcv *TelegrafHttpJson) ServeHTTP(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err = rcv.process(r.Context(), body); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

// Addr returns binded socket address. For bind port 0 in tests
Expand Down
75 changes: 75 additions & 0 deletions receiver/telegraf_http_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/lomik/carbon-clickhouse/helper/RowBinary/reader"
"github.com/lomik/carbon-clickhouse/helper/tags"
"github.com/lomik/carbon-clickhouse/helper/tests"
"github.com/lomik/zapwriter"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -138,3 +139,77 @@ func TestTelegrafHttpJson(t *testing.T) {

verifyIndexUploaded(t, &rawBuf, wantPoints, start, uint32(time.Now().Unix()))
}

func BenchmarkTelegrafHttpJson5(b *testing.B) {
writeChan := make(chan *RowBinary.WriteBuffer)
// simulate writer
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case b := <-writeChan:
b.Release()
case <-ctx.Done():
return
}
}
}()
r := &TelegrafHttpJson{}
r.Init(zapwriter.Logger("telegraf"), tags.DisabledTagConfig())

r.Base.writeChan = writeChan

payload := TelegrafHttpPayload{
Metrics: []TelegrafHttpMetric{
{
Name: "name with space1.",
Fields: map[string]interface{}{"counter": 3538944},
Tags: map[string]string{"key with space": "value with space", "name": "name_value"},
Timestamp: 1670348700,
},
{
Name: "name with space2.",
Fields: map[string]interface{}{"gauge": 3538945},
Tags: map[string]string{"key2": "value2", "key1": "value2"},
Timestamp: 1670348702,
},
{
Name: "name with space3.",
Fields: map[string]interface{}{"gauge": 3538945},
Tags: map[string]string{"key2": "value2", "key1": "value2"},
Timestamp: 1670348702,
},
{
Name: "name with space4.",
Fields: map[string]interface{}{"gauge": 3538945},
Tags: map[string]string{"key2": "value2", "key1": "value2"},
Timestamp: 1670348702,
},
{
Name: "name with space5.",
Fields: map[string]interface{}{"gauge": 3538945},
Tags: map[string]string{"key2": "value2", "key1": "value2"},
Timestamp: 1670348702,
},
},
}
out, err := json.Marshal(&payload)
if err != nil {
b.Fatal("payload marshal", err)
}

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := r.process(context.Background(), out); err != nil {
b.Fatal("process", err)
}
}
b.StopTimer()

cancel()
wg.Wait()
}
3 changes: 3 additions & 0 deletions vendor/github.com/json-iterator/go/.codecov.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions vendor/github.com/json-iterator/go/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions vendor/github.com/json-iterator/go/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions vendor/github.com/json-iterator/go/Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions vendor/github.com/json-iterator/go/Gopkg.toml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions vendor/github.com/json-iterator/go/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cb484fd

Please sign in to comment.