agent.go
package main

import (
    "context"
    "sync/atomic"
    "time"

    log "github.com/cihub/seelog"

    "github.com/DataDog/datadog-trace-agent/config"
    "github.com/DataDog/datadog-trace-agent/filters"
    "github.com/DataDog/datadog-trace-agent/info"
    "github.com/DataDog/datadog-trace-agent/model"
    "github.com/DataDog/datadog-trace-agent/obfuscate"
    "github.com/DataDog/datadog-trace-agent/osutil"
    "github.com/DataDog/datadog-trace-agent/sampler"
    "github.com/DataDog/datadog-trace-agent/statsd"
    "github.com/DataDog/datadog-trace-agent/watchdog"
    "github.com/DataDog/datadog-trace-agent/writer"
)

const processStatsInterval = time.Minute
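
// processedTrace bundles a trace with the data derived from it (weighted
// trace, root span, environment and sublayer metrics) before it is handed
// to the samplers and the concentrator.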
type processedTrace struct {
    Trace         model.Trace
    WeightedTrace model.WeightedTrace
    Root          *model.Span
    Env           string
    Sublayers     map[*model.Span][]model.SublayerValue
}
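
// weight returns the weight of the trace's root span, defaulting to 1 when
// no root is set.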
func (pt *processedTrace) weight() float64 {
    if pt.Root == nil {
        return 1.0
    }
    return pt.Root.Weight()
}
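
// getSamplingPriority returns the sampling priority set on the root span's
// metrics, and whether one was found.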
func (pt *processedTrace) getSamplingPriority() (int, bool) {
    if pt.Root == nil {
        return 0, false
    }
    p, ok := pt.Root.Metrics[sampler.SamplingPriorityKey]
    return int(p), ok
}

// Agent holds the structs for all sub-routines and makes the data flow between them.
type Agent struct {
    Receiver           *HTTPReceiver
    Concentrator       *Concentrator
    Blacklister        *filters.Blacklister
    Replacer           *filters.Replacer
    ScoreSampler       *Sampler
    ErrorsScoreSampler *Sampler
    PrioritySampler    *Sampler
    TransactionSampler TransactionSampler
    TraceWriter        *writer.TraceWriter
    ServiceWriter      *writer.ServiceWriter
    StatsWriter        *writer.StatsWriter
    ServiceExtractor   *TraceServiceExtractor
    ServiceMapper      *ServiceMapper

    // obfuscator is used to obfuscate sensitive data from various span
    // tags based on their type.
    obfuscator *obfuscate.Obfuscator

    sampledTraceChan chan *writer.SampledTrace

    // config
    conf    *config.AgentConfig
    dynConf *config.DynamicConfig

    // Used to synchronize on a clean exit
    ctx context.Context
}

// NewAgent returns a new Agent object, ready to be started. It takes a context
// which may be cancelled in order to gracefully stop the agent.
func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent {
    dynConf := config.NewDynamicConfig()

    // inter-component channels
    rawTraceChan := make(chan model.Trace, 5000) // about 1000 traces/sec for 5 sec, TODO: move to *model.Trace
    sampledTraceChan := make(chan *writer.SampledTrace)
    statsChan := make(chan []model.StatsBucket)
    serviceChan := make(chan model.ServicesMetadata, 50)
    filteredServiceChan := make(chan model.ServicesMetadata, 50)

    // create components
    r := NewHTTPReceiver(conf, dynConf, rawTraceChan, serviceChan)
    c := NewConcentrator(
        conf.ExtraAggregators,
        conf.BucketInterval.Nanoseconds(),
        statsChan,
    )
    obf := obfuscate.NewObfuscator(conf.Obfuscation)
    ss := NewScoreSampler(conf)
    ess := NewErrorsSampler(conf)
    ps := NewPrioritySampler(conf, dynConf)
    ts := NewTransactionSampler(conf)
    se := NewTraceServiceExtractor(serviceChan)
    sm := NewServiceMapper(serviceChan, filteredServiceChan)
    tw := writer.NewTraceWriter(conf, sampledTraceChan)
    sw := writer.NewStatsWriter(conf, statsChan)
    svcW := writer.NewServiceWriter(conf, filteredServiceChan)

    return &Agent{
        Receiver:           r,
        Concentrator:       c,
        Blacklister:        filters.NewBlacklister(conf.Ignore["resource"]),
        Replacer:           filters.NewReplacer(conf.ReplaceTags),
        ScoreSampler:       ss,
        ErrorsScoreSampler: ess,
        PrioritySampler:    ps,
        TransactionSampler: ts,
        TraceWriter:        tw,
        StatsWriter:        sw,
        ServiceWriter:      svcW,
        ServiceExtractor:   se,
        ServiceMapper:      sm,
        obfuscator:         obf,
        sampledTraceChan:   sampledTraceChan,
        conf:               conf,
        dynConf:            dynConf,
        ctx:                ctx,
    }
}

// Run starts the agent's routines and individual pieces, then stops them when the exit order is received.
func (a *Agent) Run() {
    // It's really important to use a ticker for this, with an interval that
    // isn't too short: it's our guarantee that the process won't start and
    // kill itself too fast (nightmare loop).
    watchdogTicker := time.NewTicker(a.conf.WatchdogInterval)
    defer watchdogTicker.Stop()

    // Update the data served by expvar so that we don't expose a 0 sample rate.
    info.UpdatePreSampler(*a.Receiver.preSampler.Stats())

    // TODO: unify component APIs. Use Start/Stop as non-blocking ways of
    // controlling the blocking Run loop, like we do with TraceWriter.
    a.Receiver.Run()
    a.TraceWriter.Start()
    a.StatsWriter.Start()
    a.ServiceMapper.Start()
    a.ServiceWriter.Start()
    a.Concentrator.Start()
    a.ScoreSampler.Run()
    a.ErrorsScoreSampler.Run()
    a.PrioritySampler.Run()

    for {
        select {
        case t := <-a.Receiver.traces:
            a.Process(t)
        case <-watchdogTicker.C:
            a.watchdog()
        case <-a.ctx.Done():
            log.Info("exiting")
            if err := a.Receiver.Stop(); err != nil {
                log.Error(err)
            }
            a.Concentrator.Stop()
            a.TraceWriter.Stop()
            a.StatsWriter.Stop()
            a.ServiceMapper.Stop()
            a.ServiceWriter.Stop()
            a.ScoreSampler.Stop()
            a.ErrorsScoreSampler.Stop()
            a.PrioritySampler.Stop()
            return
        }
    }
}

// Process is the default work unit that receives a trace, transforms it and
// passes it downstream.
func (a *Agent) Process(t model.Trace) {
    if len(t) == 0 {
        log.Debugf("skipping received empty trace")
        return
    }

    root := t.GetRoot()

    // We get the address of the struct holding the stats associated with no tags.
    // TODO: get the real tagStats related to this trace payload.
    ts := a.Receiver.stats.GetTagStats(info.Tags{})

    // All traces should go through either the normal score sampler or the one
    // dedicated to errors.
    samplers := make([]*Sampler, 0, 2)
    if traceContainsError(t) {
        samplers = append(samplers, a.ErrorsScoreSampler)
    } else {
        samplers = append(samplers, a.ScoreSampler)
    }

    priority, hasPriority := root.Metrics[sampler.SamplingPriorityKey]
    if hasPriority {
        // If a priority is defined, send the trace to priority sampling,
        // regardless of the priority value. The sampler will keep or discard
        // the trace, but we send everything so that it gets the big picture
        // and can set the sampling rates accordingly.
        samplers = append(samplers, a.PrioritySampler)
    }
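
    // Bump the receiver stats counter matching the trace's sampling priority.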
    priorityPtr := &ts.TracesPriorityNone
    if hasPriority {
        if priority < 0 {
            priorityPtr = &ts.TracesPriorityNeg
        } else if priority == 0 {
            priorityPtr = &ts.TracesPriority0
        } else if priority == 1 {
            priorityPtr = &ts.TracesPriority1
        } else {
            priorityPtr = &ts.TracesPriority2
        }
    }
    atomic.AddInt64(priorityPtr, 1)

    if root.End() < model.Now()-2*a.conf.BucketInterval.Nanoseconds() {
        log.Errorf("skipping trace with root too far in past, root:%v", *root)
        atomic.AddInt64(&ts.TracesDropped, 1)
        atomic.AddInt64(&ts.SpansDropped, int64(len(t)))
        return
    }

    if !a.Blacklister.Allows(root) {
        log.Debugf("trace rejected by blacklister. root: %v", root)
        atomic.AddInt64(&ts.TracesFiltered, 1)
        atomic.AddInt64(&ts.SpansFiltered, int64(len(t)))
        return
    }

    rate := sampler.GetTraceAppliedSampleRate(root)
    rate *= a.Receiver.preSampler.Rate()
    sampler.SetTraceAppliedSampleRate(root, rate)

    // Need to do this computation before entering the concentrator
    // as they access the Metrics map, which is not thread safe.
    t.ComputeTopLevel()
    wt := model.NewWeightedTrace(t, root)

    subtraces := t.ExtractTopLevelSubtraces(root)
    sublayers := make(map[*model.Span][]model.SublayerValue)
    for _, subtrace := range subtraces {
        subtraceSublayers := model.ComputeSublayers(subtrace.Trace)
        sublayers[subtrace.Root] = subtraceSublayers
        model.SetSublayersOnSpan(subtrace.Root, subtraceSublayers)
    }

    for _, span := range t {
        a.obfuscator.Obfuscate(span)
        span.Truncate()
    }
    a.Replacer.Replace(&t)

    pt := processedTrace{
        Trace:         t,
        WeightedTrace: wt,
        Root:          root,
        Env:           a.conf.DefaultEnv,
        Sublayers:     sublayers,
    }
    if tenv := t.GetEnv(); tenv != "" {
        pt.Env = tenv
    }
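
    // Extract service metadata from the weighted trace in the background.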
    go func() {
        defer watchdog.LogOnPanic()
        a.ServiceExtractor.Process(wt)
    }()

    go func() {
        defer watchdog.LogOnPanic()
        // Everything is sent to the concentrator for stats, regardless of sampling.
        a.Concentrator.Add(pt)
    }()

    if hasPriority && priority < 0 {
        // If the trace has a negative priority we absolutely don't want it
        // sampled by either the trace or the transaction pipeline, so we return here.
        return
    }

    // Run both full trace sampling and transaction extraction in another goroutine.
    go func() {
        defer watchdog.LogOnPanic()

        // Trace sampling
        sampled := false
        for _, s := range samplers {
            // Consider the trace as sampled if at least one of the samplers kept it.
            sampled = s.Add(pt) || sampled
        }

        var sampledTrace writer.SampledTrace
        if sampled {
            sampledTrace.Trace = &pt.Trace
        }
        sampledTrace.Transactions = a.TransactionSampler.Extract(pt)

        if !sampledTrace.Empty() {
            a.sampledTraceChan <- &sampledTrace
        }
    }()
}

// dieFunc is used by watchdog to kill the agent; replaced in tests.
var dieFunc = func(fmt string, args ...interface{}) {
    osutil.Exitf(fmt, args...)
}
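
// watchdog samples the agent's resource usage, kills the process when the
// memory or connection limits are exceeded, and adjusts the pre-sampling
// rate based on CPU usage.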
func (a *Agent) watchdog() {
    var wi watchdog.Info
    wi.CPU = watchdog.CPU()
    wi.Mem = watchdog.Mem()
    wi.Net = watchdog.Net()

    if float64(wi.Mem.Alloc) > a.conf.MaxMemory && a.conf.MaxMemory > 0 {
        dieFunc("exceeded max memory (current=%d, max=%d)", wi.Mem.Alloc, int64(a.conf.MaxMemory))
    }
    if int(wi.Net.Connections) > a.conf.MaxConnections && a.conf.MaxConnections > 0 {
        dieFunc("exceeded max connections (current=%d, max=%d)", wi.Net.Connections, a.conf.MaxConnections)
    }

    info.UpdateWatchdogInfo(wi)

    // Adjust pre-sampling dynamically
    rate, err := sampler.CalcPreSampleRate(a.conf.MaxCPU, wi.CPU.UserAvg, a.Receiver.preSampler.RealRate())
    if rate > a.conf.PreSampleRate {
        rate = a.conf.PreSampleRate
    }
    if err != nil {
        log.Warnf("problem computing pre-sample rate: %v", err)
    }
    a.Receiver.preSampler.SetRate(rate)
    a.Receiver.preSampler.SetError(err)

    preSamplerStats := a.Receiver.preSampler.Stats()
    statsd.Client.Gauge("datadog.trace_agent.presampler_rate", preSamplerStats.Rate, nil, 1)
    info.UpdatePreSampler(*preSamplerStats)
}
func traceContainsError(trace model.Trace) bool {
    for _, span := range trace {
        if span.Error != 0 {
            return true
        }
    }
    return false
}