@@ -22,6 +22,7 @@ func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error) {
22
22
if err != nil {
23
23
return nil , err
24
24
}
25
+ opts = append (opts , withConfig (conf ))
25
26
cfg := newConfig (opts ... )
26
27
return & Consumer {Consumer : c , cfg : cfg }, nil
27
28
}
@@ -35,30 +36,34 @@ func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer {
35
36
return wrapped
36
37
}
37
38
38
- func (c * Consumer ) Pool ( timeout int ) kafka.Event {
39
+ func (c * Consumer ) Poll ( timeoutMs int ) ( event kafka.Event ) {
39
40
if c .prev != nil {
40
41
c .prev .End ()
41
42
}
42
- event := c .Consumer .Poll (timeout )
43
- switch e := event .(type ) {
43
+ e := c .Consumer .Poll (timeoutMs )
44
+ switch e := e .(type ) {
44
45
case * kafka.Message :
45
46
span := c .startSpan (e )
47
+ // latest span is stored to be closed when the next message is polled or when the consumer is closed
46
48
c .prev = span
47
49
}
48
50
49
- return event
51
+ return e
50
52
}
51
53
52
54
// ReadMessage polls the consumer for a message. Message will be traced.
53
55
func (c * Consumer ) ReadMessage (timeout time.Duration ) (* kafka.Message , error ) {
54
56
if c .prev != nil {
55
- c .prev .End ()
57
+ if c .prev .IsRecording () {
58
+ c .prev .End ()
59
+ }
56
60
c .prev = nil
57
61
}
58
62
msg , err := c .Consumer .ReadMessage (timeout )
59
63
if err != nil {
60
64
return nil , err
61
65
}
66
+ // latest span is stored to be closed when the next message is polled or when the consumer is closed
62
67
c .prev = c .startSpan (msg )
63
68
return msg , nil
64
69
}
@@ -71,7 +76,9 @@ func (c *Consumer) Close() error {
71
76
// not enabled, because otherwise there would be a data race from the
72
77
// consuming goroutine.
73
78
if c .prev != nil {
74
- c .prev .End ()
79
+ if c .prev .IsRecording () {
80
+ c .prev .End ()
81
+ }
75
82
c .prev = nil
76
83
}
77
84
return err
@@ -95,6 +102,11 @@ func (c *Consumer) startSpan(msg *kafka.Message) trace.Span {
95
102
semconv .MessagingDestinationPartitionID (strconv .Itoa (int (msg .TopicPartition .Partition ))),
96
103
semconv .MessagingMessageBodySize (getMsgSize (msg )),
97
104
}
105
+
106
+ if c .cfg .attributeInjectFunc != nil {
107
+ attrs = append (attrs , c .cfg .attributeInjectFunc (msg )... )
108
+ }
109
+
98
110
opts := []trace.SpanStartOption {
99
111
trace .WithAttributes (attrs ... ),
100
112
trace .WithSpanKind (trace .SpanKindConsumer ),
@@ -105,10 +117,3 @@ func (c *Consumer) startSpan(msg *kafka.Message) trace.Span {
105
117
c .cfg .Propagators .Inject (newCtx , carrier )
106
118
return span
107
119
}
108
-
109
- func getMsgSize (msg * kafka.Message ) (size int ) {
110
- for _ , header := range msg .Headers {
111
- size += len (header .Key ) + len (header .Value )
112
- }
113
- return size + len (msg .Value ) + len (msg .Key )
114
- }
0 commit comments