Skip to content

Commit

Permalink
receiver: avoid copy base lock
Browse files Browse the repository at this point in the history
  • Loading branch information
msaf1980 committed Dec 8, 2022
1 parent 418b36d commit 001aa65
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 23 deletions.
15 changes: 12 additions & 3 deletions receiver/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ type Base struct {
concatCharacter string
}

func NewBase(logger *zap.Logger, config tags.TagConfig) Base {
return Base{logger: logger, Tags: config}
}
// func NewBase(logger *zap.Logger, config tags.TagConfig) Base {
// return Base{logger: logger, Tags: config}
// }

func sendUint64Counter(send func(metric string, value float64), metric string, value *uint64) {
v := atomic.LoadUint64(value)
Expand All @@ -56,6 +56,15 @@ func sendInt64Gauge(send func(metric string, value float64), metric string, valu
send(metric, float64(atomic.LoadInt64(value)))
}

func (base *Base) Init(logger *zap.Logger, config tags.TagConfig, opts ...Option) {
base.logger = logger
base.Tags = config

for _, optApply := range opts {
optApply(base)
}
}

func (base *Base) isDrop(nowTime uint32, metricTime uint32) bool {
if base.dropFutureSeconds != 0 && (metricTime > (nowTime + base.dropFutureSeconds)) {
atomic.AddUint64(&base.stat.futureDropped, 1)
Expand Down
27 changes: 10 additions & 17 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
return nil, err
}

base := NewBase(zapwriter.Logger(strings.Replace(u.Scheme, "+", "_", -1)), config)

for _, optApply := range opts {
optApply(&base)
}
logger := zapwriter.Logger(strings.Replace(u.Scheme, "+", "_", -1))

if u.Scheme == "tcp" {
addr, err := net.ResolveTCPAddr("tcp", u.Host)
Expand All @@ -110,9 +106,9 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
}

r := &TCP{
Base: base,
parseChan: make(chan *Buffer),
}
r.Init(logger, config, opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -127,9 +123,9 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
}

r := &Pickle{
Base: base,
parseChan: make(chan []byte),
}
r.Init(logger, config, opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -144,9 +140,9 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
}

r := &UDP{
Base: base,
parseChan: make(chan *Buffer),
}
r.Init(logger, config, opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -160,9 +156,8 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
return nil, err
}

r := &GRPC{
Base: base,
}
r := &GRPC{}
r.Init(logger, config, opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -176,9 +171,8 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
return nil, err
}

r := &PrometheusRemoteWrite{
Base: base,
}
r := &PrometheusRemoteWrite{}
r.Init(logger, config, opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -192,9 +186,8 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
return nil, err
}

r := &TelegrafHttpJson{
Base: base,
}
r := &TelegrafHttpJson{}
r.Init(logger, config, opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand Down
5 changes: 2 additions & 3 deletions receiver/telegraf_http_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,8 @@ func BenchmarkTelegrafHttpJson5(b *testing.B) {
}
}
}()
r := &TelegrafHttpJson{
Base: NewBase(zapwriter.Logger("telegraf"), tags.DisabledTagConfig()),
}
r := &TelegrafHttpJson{}
r.Init(zapwriter.Logger("telegraf"), tags.DisabledTagConfig())

r.Base.writeChan = writeChan

Expand Down

0 comments on commit 001aa65

Please sign in to comment.