Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Fix event tag correctness
Browse files Browse the repository at this point in the history
When searching for tags in jaeger, a tag can come from any event
attached to a span. Different tags can come from different events
on the same span. Previously, this was broken since the events
were joined to spans and tags searched one-at-a-time.

Fixed this by searching with EXISTS.
  • Loading branch information
cevian committed Sep 21, 2022
1 parent 0f12a84 commit 28276ba
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 27 deletions.
56 changes: 30 additions & 26 deletions pkg/jaeger/store/trace_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ const (
s.trace_id,
max(start_time) as start_time_max
FROM _ps_trace.span s
%[2]s
WHERE
%[3]s
%[2]s
GROUP BY s.trace_id
) as trace_sub
ORDER BY trace_sub.start_time_max DESC
Expand Down Expand Up @@ -210,9 +209,25 @@ func (b *Builder) buildOperationSubquery(q *spanstore.TraceQueryParameters, tInf
return "", params
}

func (b *Builder) buildTagClauses(tInfo *tagsInfo, params []interface{}) (string, []interface{}, bool) {
func (b *Builder) buildEventSubquery(q *spanstore.TraceQueryParameters, clauses []string, params []interface{}) (string, []interface{}) {
var defaultTime time.Time
if q.StartTimeMin != defaultTime {
params = append(params, q.StartTimeMin.Add(-b.cfg.MaxTraceDuration))
clauses = append(clauses, fmt.Sprintf(`e.time >= $%d`, len(params)))
}
if q.StartTimeMax != defaultTime {
params = append(params, q.StartTimeMax.Add(b.cfg.MaxTraceDuration))
clauses = append(clauses, fmt.Sprintf(`e.time <= $%d`, len(params)))
}
subquery := fmt.Sprintf(
`SELECT 1
FROM _ps_trace.event e
WHERE s.trace_id = e.trace_id AND s.span_id = e.span_id AND %s`, strings.Join(clauses, " AND "))
return subquery, params
}

func (b *Builder) buildTagClauses(q *spanstore.TraceQueryParameters, tInfo *tagsInfo, params []interface{}) (string, []interface{}) {
clauses := make([]string, 0, len(tInfo.generalTags))
hasEventTags := false
for _, tag := range tInfo.generalTags {
tagClauses := make([]string, 0, 3)
params = append(params, tag.jsonbPairArray)
Expand All @@ -223,12 +238,13 @@ func (b *Builder) buildTagClauses(tInfo *tagsInfo, params []interface{}) (string
tagClauses = append(tagClauses, fmt.Sprintf("(s.resource_tags @> ANY($%d::jsonb[]))", len(params)))
}
if tag.isEvent {
hasEventTags = true
tagClauses = append(tagClauses, fmt.Sprintf("(e.tags @> ANY($%d::jsonb[]))", len(params)))
var subquery string
subquery, params = b.buildEventSubquery(q, []string{fmt.Sprintf("(e.tags @> ANY($%d::jsonb[]))", len(params))}, params)
tagClauses = append(tagClauses, fmt.Sprintf("EXISTS(%s)", subquery))
}
clauses = append(clauses, "("+strings.Join(tagClauses, " OR ")+")")
}
return "(" + strings.Join(clauses, " AND ") + ")", params, hasEventTags
return "(" + strings.Join(clauses, " AND ") + ")", params

}

Expand All @@ -237,8 +253,11 @@ func (b *Builder) BuildTraceIDSubquery(q *spanstore.TraceQueryParameters, tInfo
params := tInfo.params

clauses = append(clauses, tInfo.spanClauses...)
clauses = append(clauses, tInfo.eventClauses...)
needEventTable := len(tInfo.eventClauses) > 0
if len(tInfo.eventClauses) > 0 {
var subquery string
subquery, params = b.buildEventSubquery(q, tInfo.eventClauses, params)
clauses = append(clauses, fmt.Sprintf("EXISTS(%s)", subquery))
}

operationSubquery, params := b.buildOperationSubquery(q, tInfo, params)
if len(operationSubquery) > 0 {
Expand All @@ -252,29 +271,18 @@ func (b *Builder) BuildTraceIDSubquery(q *spanstore.TraceQueryParameters, tInfo

if len(tInfo.generalTags) > 0 {
var tagClause string
var tagNeedsEventTable bool
tagClause, params, tagNeedsEventTable = b.buildTagClauses(tInfo, params)
tagClause, params = b.buildTagClauses(q, tInfo, params)
clauses = append(clauses, tagClause)
needEventTable = needEventTable || tagNeedsEventTable
}
//todo check the inclusive semantics here
var defaultTime time.Time
if q.StartTimeMin != defaultTime {
params = append(params, q.StartTimeMin)
clauses = append(clauses, fmt.Sprintf(`s.start_time >= $%d`, len(params)))

if needEventTable {
params = append(params, q.StartTimeMin.Add(-b.cfg.MaxTraceDuration))
clauses = append(clauses, fmt.Sprintf(`e.time >= $%d`, len(params)))
}
}
if q.StartTimeMax != defaultTime {
params = append(params, q.StartTimeMax)
clauses = append(clauses, fmt.Sprintf(`s.start_time <= $%d`, len(params)))
if needEventTable {
params = append(params, q.StartTimeMax.Add(b.cfg.MaxTraceDuration))
clauses = append(clauses, fmt.Sprintf(`e.time <= $%d`, len(params)))
}
}

var defaultDuration time.Duration
Expand Down Expand Up @@ -305,14 +313,10 @@ func (b *Builder) BuildTraceIDSubquery(q *spanstore.TraceQueryParameters, tInfo
clauseString = "TRUE"
}

eventJoin := ""
if needEventTable {
eventJoin = "INNER JOIN _ps_trace.event e ON(s.trace_id = e.trace_id AND s.span_id = e.span_id)"
}
params = append(params, b.cfg.MaxTraceDuration)
//Note: the parameter number for b.cfg.MaxTraceDuration is used in two places ($%[1]d in subqueryFormat)
//to both add and subtract from start_time_max.
query := fmt.Sprintf(subqueryFormat, len(params), eventJoin, clauseString)
query := fmt.Sprintf(subqueryFormat, len(params), clauseString)

if q.NumTraces != 0 {
query += fmt.Sprintf(" LIMIT %d", q.NumTraces)
Expand Down
4 changes: 3 additions & 1 deletion pkg/tests/end_to_end_tests/trace_query_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,9 @@ func getTraces(t testing.TB, c httpClient, service string, start, end int64, use
r, err := do(queryUrl)
require.NoError(t, err)

traces := convertToTraces(r.Data.([]interface{}))
data, ok := r.Data.([]interface{})
require.True(t, ok, "Data is not an []interface")
traces := convertToTraces(data)
sort.SliceStable(traces, func(i, j int) bool {
return traces[i].TraceID < traces[j].TraceID
})
Expand Down

0 comments on commit 28276ba

Please sign in to comment.