-
Notifications
You must be signed in to change notification settings - Fork 31
cmd/trace-agent: improve Agent.Process readability and add Span.SetMetric #490
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -183,27 +183,19 @@ func (a *Agent) Process(t model.Trace) { | |
return | ||
} | ||
|
||
// All traces should go through either through the normal score sampler or | ||
// the one dedicated to errors | ||
samplers := make([]*Sampler, 0, 2) | ||
if traceContainsError(t) { | ||
samplers = append(samplers, a.ErrorsScoreSampler) | ||
} else { | ||
samplers = append(samplers, a.ScoreSampler) | ||
} | ||
// Root span is used to carry some trace-level metadata, such as sampling rate and priority. | ||
root := t.GetRoot() | ||
|
||
// We get the address of the struct holding the stats associated to no tags | ||
// TODO: get the real tagStats related to this trace payload. | ||
// We get the address of the struct holding the stats associated to no tags. | ||
// TODO: get the real tagStats related to this trace payload (per lang/version). | ||
ts := a.Receiver.Stats.GetTagStats(info.Tags{}) | ||
stat := &ts.TracesPriorityNone | ||
root := t.GetRoot() | ||
|
||
// Extract priority early, as later goroutines might manipulate the Metrics map in parallel which isn't safe. | ||
priority, hasPriority := root.Metrics[sampler.SamplingPriorityKey] | ||
if hasPriority { | ||
// If Priority is defined, send to priority sampling, regardless of priority value. | ||
// The sampler will keep or discard the trace, but we send everything so that it | ||
// gets the big picture and can set the sampling rates accordingly. | ||
samplers = append(samplers, a.PrioritySampler) | ||
|
||
// Depending on the sampling priority, count that trace differently. | ||
stat := &ts.TracesPriorityNone | ||
if hasPriority { | ||
if priority < 0 { | ||
stat = &ts.TracesPriorityNeg | ||
} else if priority == 0 { | ||
|
@@ -231,12 +223,22 @@ func (a *Agent) Process(t model.Trace) { | |
return | ||
} | ||
|
||
rate := sampler.GetTraceAppliedSampleRate(root) | ||
rate *= a.Receiver.PreSampler.Rate() | ||
sampler.SetTraceAppliedSampleRate(root, rate) | ||
// Extra sanitization steps of the trace. | ||
for _, span := range t { | ||
a.obfuscator.Obfuscate(span) | ||
span.Truncate() | ||
} | ||
a.Replacer.Replace(&t) | ||
|
||
// Need to do this computation before entering the concentrator | ||
// as they access the Metrics map, which is not thread safe. | ||
// Extract the client sampling rate. | ||
clientSampleRate := sampler.GetTraceAppliedSampleRate(root) | ||
// Combine it with the pre-sampling rate. | ||
preSamplerRate := a.Receiver.PreSampler.Rate() | ||
// Combine them and attach it to the root to be used for weighing. | ||
sampler.SetTraceAppliedSampleRate(root, clientSampleRate*preSamplerRate) | ||
|
||
// Figure out the top-level spans and sublayers now as it involves modifying the Metrics map | ||
// which is not thread-safe while samplers and Concentrator might modify it too. | ||
t.ComputeTopLevel() | ||
|
||
subtraces := t.ExtractTopLevelSubtraces(root) | ||
|
@@ -247,20 +249,14 @@ func (a *Agent) Process(t model.Trace) { | |
model.SetSublayersOnSpan(subtrace.Root, subtraceSublayers) | ||
} | ||
|
||
for _, span := range t { | ||
a.obfuscator.Obfuscate(span) | ||
span.Truncate() | ||
} | ||
|
||
a.Replacer.Replace(&t) | ||
|
||
pt := processedTrace{ | ||
Trace: t, | ||
WeightedTrace: model.NewWeightedTrace(t, root), | ||
Root: root, | ||
Env: a.conf.DefaultEnv, | ||
Sublayers: sublayers, | ||
} | ||
// Replace Agent-configured environment with `env` coming from span tag. | ||
if tenv := t.GetEnv(); tenv != "" { | ||
pt.Env = tenv | ||
} | ||
|
@@ -274,32 +270,45 @@ func (a *Agent) Process(t model.Trace) { | |
defer watchdog.LogOnPanic() | ||
// Everything is sent to concentrator for stats, regardless of sampling. | ||
a.Concentrator.Add(pt) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. I'm wondering if this goroutine could be merged with the other one (service extractor). I have the feeling (but not the benchmarks to prove it) that it would be a fairly good performance increase. If you're also unsure, just leave it; I'll write some benchmarks and try it out later. There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. I actually had the same thoughts while working on that function. The service extractor is barely doing anything, so I'd guess that a goroutine is a bit overdoing it. Although I don't have any benchmark to back such an argument. Let's maybe investigate and do a PR independently? |
||
|
||
}() | ||
if hasPriority && priority < 0 { | ||
// If the trace has a negative priority we absolutely don't want it | ||
// sampled either by the trace or transaction pipeline so we return here | ||
|
||
// Don't go through sampling for < 0 priority traces | ||
if priority < 0 { | ||
return | ||
} | ||
|
||
// Run both full trace sampling and transaction extraction in another goroutine | ||
// Run both full trace sampling and transaction extraction in another goroutine. | ||
go func() { | ||
defer watchdog.LogOnPanic() | ||
|
||
// Trace sampling | ||
sampled := false | ||
for _, s := range samplers { | ||
// Consider trace as sampled if at least one of the samplers kept it | ||
sampled = s.Add(pt) || sampled | ||
// All traces should go through either through the normal score sampler or | ||
// the one dedicated to errors. | ||
samplers := make([]*Sampler, 0, 2) | ||
if traceContainsError(t) { | ||
samplers = append(samplers, a.ErrorsScoreSampler) | ||
} else { | ||
samplers = append(samplers, a.ScoreSampler) | ||
} | ||
if hasPriority { | ||
// If Priority is defined, send to priority sampling, regardless of priority value. | ||
// The sampler will keep or discard the trace, but we send everything so that it | ||
// gets the big picture and can set the sampling rates accordingly. | ||
samplers = append(samplers, a.PrioritySampler) | ||
} | ||
|
||
// Trace sampling. | ||
var sampledTrace writer.SampledTrace | ||
|
||
sampled := false | ||
for _, s := range samplers { | ||
// Consider trace as sampled if at least one of the samplers kept it. | ||
sampled = s.Add(pt) || sampled | ||
} | ||
if sampled { | ||
sampledTrace.Trace = &pt.Trace | ||
} | ||
|
||
sampledTrace.Transactions = a.TransactionSampler.Extract(pt) | ||
// TODO: attach to these transactions the client, pre-sampler and transaction sample rates. | ||
|
||
if !sampledTrace.Empty() { | ||
a.sampledTraceChan <- &sampledTrace | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this rate ever going to be different than 1? We’ve stopped making it configurable. I’d like to put it into a constant in a subsequent PR and simply use that. Would that be ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, we stopped making its initial value configurable. But this value drops below 1 whenever the Agent uses too many resources -> it will then drop a proportion of incoming payloads.
Note that I split it into two variables (while we still only use their product, previously `rate`) to make it more readable, and because in a follow-up PR I'll do something with each individual value.