This repository was archived by the owner on Aug 30, 2019. It is now read-only.

cmd/trace-agent: improve Agent.Process readability and add Span.SetMetric #490

Merged 2 commits on Oct 12, 2018
89 changes: 49 additions & 40 deletions cmd/trace-agent/agent.go
@@ -183,27 +183,19 @@ func (a *Agent) Process(t model.Trace) {
return
}

// All traces should go either through the normal score sampler or
// the one dedicated to errors
samplers := make([]*Sampler, 0, 2)
if traceContainsError(t) {
samplers = append(samplers, a.ErrorsScoreSampler)
} else {
samplers = append(samplers, a.ScoreSampler)
}
// Root span is used to carry some trace-level metadata, such as sampling rate and priority.
root := t.GetRoot()

// We get the address of the struct holding the stats associated to no tags
// TODO: get the real tagStats related to this trace payload.
// We get the address of the struct holding the stats associated to no tags.
// TODO: get the real tagStats related to this trace payload (per lang/version).
ts := a.Receiver.Stats.GetTagStats(info.Tags{})
stat := &ts.TracesPriorityNone
root := t.GetRoot()

// Extract priority early, as later goroutines might manipulate the Metrics map in parallel which isn't safe.
priority, hasPriority := root.Metrics[sampler.SamplingPriorityKey]
if hasPriority {
// If Priority is defined, send to priority sampling, regardless of priority value.
// The sampler will keep or discard the trace, but we send everything so that it
// gets the big picture and can set the sampling rates accordingly.
samplers = append(samplers, a.PrioritySampler)

// Depending on the sampling priority, count that trace differently.
stat := &ts.TracesPriorityNone
if hasPriority {
if priority < 0 {
stat = &ts.TracesPriorityNeg
} else if priority == 0 {
@@ -231,12 +223,22 @@ func (a *Agent) Process(t model.Trace) {
return
}

rate := sampler.GetTraceAppliedSampleRate(root)
rate *= a.Receiver.PreSampler.Rate()
sampler.SetTraceAppliedSampleRate(root, rate)
// Extra sanitization steps of the trace.
for _, span := range t {
a.obfuscator.Obfuscate(span)
span.Truncate()
}
a.Replacer.Replace(&t)

// Need to do this computation before entering the concentrator
// as they access the Metrics map, which is not thread safe.
// Extract the client sampling rate.
clientSampleRate := sampler.GetTraceAppliedSampleRate(root)
// Combine it with the pre-sampling rate.
preSamplerRate := a.Receiver.PreSampler.Rate()
Contributor
Is this rate ever going to be different than 1? We've stopped making it configurable. I'd like to put it into a constant in a subsequent PR and simply use that. Would that be ok?

Author
So, we stopped making its initial value configurable. But this value goes below 1 whenever the Agent uses too many resources, at which point it drops a proportion of incoming payloads.

Note that I did split it into two variables (while we still only use their product, previously rate) to make it more readable, and because in a next PR I'll make use of each individual value.

// Combine them and attach it to the root to be used for weighing.
sampler.SetTraceAppliedSampleRate(root, clientSampleRate*preSamplerRate)

// Figure out the top-level spans and sublayers now as it involves modifying the Metrics map
// which is not thread-safe while samplers and Concentrator might modify it too.
t.ComputeTopLevel()

subtraces := t.ExtractTopLevelSubtraces(root)
@@ -247,20 +249,14 @@ func (a *Agent) Process(t model.Trace) {
model.SetSublayersOnSpan(subtrace.Root, subtraceSublayers)
}

for _, span := range t {
a.obfuscator.Obfuscate(span)
span.Truncate()
}

a.Replacer.Replace(&t)

pt := processedTrace{
Trace: t,
WeightedTrace: model.NewWeightedTrace(t, root),
Root: root,
Env: a.conf.DefaultEnv,
Sublayers: sublayers,
}
// Replace Agent-configured environment with `env` coming from span tag.
if tenv := t.GetEnv(); tenv != "" {
pt.Env = tenv
}
@@ -274,32 +270,45 @@ func (a *Agent) Process(t model.Trace) {
defer watchdog.LogOnPanic()
// Everything is sent to concentrator for stats, regardless of sampling.
a.Concentrator.Add(pt)
Contributor
I'm wondering if this goroutine could be merged with the other one (the service extractor). I have the feeling (but not the benchmarks to prove it) that it would be a fairly good perf increase. If you're also unsure, just leave it; I'll write some benchmarks and try it out later.

Author
I actually had the same thoughts working on this function. The service extractor is barely doing anything, so I'd guess a dedicated goroutine is a bit of overkill, though I don't have any benchmark to back that up.

Let's maybe investigate and do a PR independently?
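A sketch of the kind of benchmark that could settle this, under stated assumptions: concentratorAdd and extractServices are stand-in stubs for a.Concentrator.Add(pt) and the service extractor, and the file would live alongside the agent as a _test.go file.

package agent_test

import (
	"sync"
	"testing"
)

func concentratorAdd() {} // stand-in for a.Concentrator.Add(pt)
func extractServices() {} // stand-in for the service extractor

// Two goroutines per trace, as the code does today.
func BenchmarkSeparateGoroutines(b *testing.B) {
	for i := 0; i < b.N; i++ {
		var wg sync.WaitGroup
		wg.Add(2)
		go func() { defer wg.Done(); concentratorAdd() }()
		go func() { defer wg.Done(); extractServices() }()
		wg.Wait()
	}
}

// One merged goroutine per trace, as proposed above.
func BenchmarkMergedGoroutine(b *testing.B) {
	for i := 0; i < b.N; i++ {
		var wg sync.WaitGroup
		wg.Add(1)
		go func() { defer wg.Done(); concentratorAdd(); extractServices() }()
		wg.Wait()
	}
}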


}()
if hasPriority && priority < 0 {
// If the trace has a negative priority we absolutely don't want it
// sampled either by the trace or transaction pipeline so we return here

// Don't go through sampling for < 0 priority traces
if priority < 0 {
return
}

// Run both full trace sampling and transaction extraction in another goroutine
// Run both full trace sampling and transaction extraction in another goroutine.
go func() {
defer watchdog.LogOnPanic()

// Trace sampling
sampled := false
for _, s := range samplers {
// Consider trace as sampled if at least one of the samplers kept it
sampled = s.Add(pt) || sampled
// All traces should go either through the normal score sampler or
// the one dedicated to errors.
samplers := make([]*Sampler, 0, 2)
if traceContainsError(t) {
samplers = append(samplers, a.ErrorsScoreSampler)
} else {
samplers = append(samplers, a.ScoreSampler)
}
if hasPriority {
// If Priority is defined, send to priority sampling, regardless of priority value.
// The sampler will keep or discard the trace, but we send everything so that it
// gets the big picture and can set the sampling rates accordingly.
samplers = append(samplers, a.PrioritySampler)
}

// Trace sampling.
var sampledTrace writer.SampledTrace

sampled := false
for _, s := range samplers {
// Consider trace as sampled if at least one of the samplers kept it.
sampled = s.Add(pt) || sampled
}
if sampled {
sampledTrace.Trace = &pt.Trace
}

sampledTrace.Transactions = a.TransactionSampler.Extract(pt)
// TODO: attach to these transactions the client, pre-sampler and transaction sample rates.

if !sampledTrace.Empty() {
a.sampledTraceChan <- &sampledTrace
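As an aside, the "Extract priority early" comment in the first hunk leans on a Go runtime behavior worth spelling out: unsynchronized concurrent map access aborts the process, which is why Process reads root.Metrics before spawning any goroutine. A minimal, purely illustrative reproduction:

package main

// Illustrative only: the Go runtime aborts on unsynchronized
// concurrent map access.
func main() {
	m := map[string]float64{"k": 1}
	go func() {
		for {
			m["k"] = 2 // concurrent write
		}
	}()
	for {
		_ = m["k"] // fatal error: concurrent map read and map write
	}
}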
3 changes: 1 addition & 2 deletions cmd/trace-agent/transaction_sampler_test.go
@@ -12,8 +12,7 @@ import (
func createTrace(serviceName string, operationName string, topLevel bool, hasPriority bool, priority int) processedTrace {
ws := model.WeightedSpan{TopLevel: topLevel, Span: &model.Span{Service: serviceName, Name: operationName}}
if hasPriority {
ws.Metrics = make(map[string]float64)
ws.Metrics[sampler.SamplingPriorityKey] = float64(priority)
ws.SetMetric(sampler.SamplingPriorityKey, float64(priority))
}
wt := model.WeightedTrace{&ws}
return processedTrace{WeightedTrace: wt, Root: ws.Span}
Expand Down
8 changes: 8 additions & 0 deletions model/span.go
@@ -44,3 +44,11 @@ func (s *Span) Weight() float64 {

return 1.0 / sampleRate
}

// SetMetric sets a value in the span Metrics map.
func (s *Span) SetMetric(key string, val float64) {
if s.Metrics == nil {
s.Metrics = make(map[string]float64)
}
s.Metrics[key] = val
}
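A quick usage sketch tying the new helper to the rate combination in agent.go. This is a self-contained mirror, not the real code: the actual type lives in the model package, and the "_sample_rate" literal stands in for model.SpanSampleRateMetricKey.

package main

import "fmt"

type Span struct {
	Metrics map[string]float64
}

// SetMetric lazily allocates Metrics, as in this PR.
func (s *Span) SetMetric(key string, val float64) {
	if s.Metrics == nil {
		s.Metrics = make(map[string]float64)
	}
	s.Metrics[key] = val
}

func main() {
	root := &Span{} // Metrics starts nil; SetMetric copes with that
	// Client kept 50% of traces and the pre-sampler kept 50% of
	// payloads, so the combined applied rate is 0.25.
	root.SetMetric("_sample_rate", 0.5*0.5)
	// Weight() inverts the rate: each kept trace stands for 4 in stats.
	fmt.Println(1.0 / root.Metrics["_sample_rate"]) // 4
}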
8 changes: 1 addition & 7 deletions model/top_level.go
@@ -44,17 +44,11 @@ func (s *Span) setTopLevel(topLevel bool) {
return
}
delete(s.Metrics, topLevelKey)
if len(s.Metrics) == 0 {
s.Metrics = nil
}
return
}
if s.Metrics == nil {
s.Metrics = make(map[string]float64, 1)
}
// Setting the metrics value, so that code downstream in the pipeline
// can identify this as top-level without recomputing everything.
s.Metrics[topLevelKey] = 1
s.SetMetric(topLevelKey, 1)
}

// TopLevel returns true if span is top-level.
2 changes: 1 addition & 1 deletion model/top_level_test.go
@@ -139,7 +139,7 @@ func TestTopLevelGetSetMetrics(t *testing.T) {
span.setTopLevel(true)
assert.Equal(float64(1), span.Metrics["_top_level"], "should have a _top_level:1 flag")
span.setTopLevel(false)
assert.Nil(span.Metrics, "no meta at all")
assert.Equal(len(span.Metrics), 0, "no meta at all")

span.Metrics = map[string]float64{"custom": 42}

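The relaxed assertion above is the one behavioral nuance of the refactor: setTopLevel(false) used to nil out an emptied Metrics map, while the SetMetric-based version leaves an empty, non-nil map behind. For reads and len() the two are interchangeable in Go, as this small check shows:

package main

import "fmt"

func main() {
	var nilMap map[string]float64        // what the old code left behind
	emptyMap := make(map[string]float64) // what the new code leaves behind
	fmt.Println(nilMap["k"], emptyMap["k"])     // 0 0: reads are identical
	fmt.Println(len(nilMap), len(emptyMap))     // 0 0: lengths are identical
	fmt.Println(nilMap == nil, emptyMap == nil) // true false: the only difference
}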
5 changes: 1 addition & 4 deletions sampler/coresampler.go
@@ -174,8 +174,5 @@ func GetTraceAppliedSampleRate(root *model.Span) float64 {

// SetTraceAppliedSampleRate sets the currently applied sample rate in the trace data to allow chained up sampling.
func SetTraceAppliedSampleRate(root *model.Span, sampleRate float64) {
if root.Metrics == nil {
root.Metrics = make(map[string]float64)
}
root.Metrics[model.SpanSampleRateMetricKey] = sampleRate
root.SetMetric(model.SpanSampleRateMetricKey, sampleRate)
}