Skip to content

Commit 0e25b67

Browse files
authored
Link Kafka metrics and tracing (#590)
* Standardise httpServer component name * Use Tags.Net.Peer in producer * Add Net.Peer to KafkaConsumer#poll so that the metrics can find corresponding spans * Move some tags from KafkaConsumer$Poll to ListenerConsumer$PollAndInvoke * Add clientId to Producer tags * Add detail and tracing to metrics dashboard * Add detail and tracing to producer dashboard * [tmp] * Fix uri tag * Fix UI style * Add limit to detail query * Add doc * Adjust doc directory * Add kafka consumer metrics * Fix uri
1 parent 6de98eb commit 0e25b67

File tree

30 files changed

+390
-111
lines changed

30 files changed

+390
-111
lines changed

README.md

+37-35
Original file line numberDiff line numberDiff line change
@@ -134,45 +134,47 @@ If the target application runs under JDK 11 and above, the following arguments s
134134

135135
# Supported Components
136136

137-
| Component | Min Version | Max Version | Metrics | Tracing |
138-
|------------------------|-------------|-------------|---------|---------|
139-
| JVM | 1.8 | | ✓ | |
140-
| JDK - Thread Pool | 1.8 | | ✓ | |
141-
| JDK - HTTP Client | 1.8 | | ✓ | ✓ |
142-
| Alibaba Druid | 1.0.28 | | ✓ | |
143-
| Apache Druid | 0.16 | 24.0 | | ✓ |
144-
| Apache Kafka | 0.10 | | ✓ | ✓ |
145-
| Apache Ozone          | 1.3.0       |             |         | ✓       |
146-
| Eclipse Glassfish | 2.34 | | | ✓ |
147-
| GRPC | 1.6.0 | | ✓ | |
148-
| Google Guice | 4.1.0 | | | ✓ |
149-
| HTTP Client - Apache | 4.5.2 | | ✓ | ✓ |
150-
| HTTP Client - Jetty | 9.4.6 | | ✓ | ✓ |
151-
| HTTP Client - Netty | 3.10.6 | < 4.0 | &check; | &check; |
152-
| HTTP Client - okhttp3 | 3.2 | 4.9 | &check; | &check; |
153-
| Jersey | 1.19.4 | | | &check; |
154-
| MongoDB | 3.4.2 | | &check; | |
155-
| MySQL | 5.x | 8.x | &check; | |
156-
| Quartz | 2.x | | &check; | &check; |
157-
| Redis - Jedis | 2.9 | | &check; | |
158-
| Redis - Lettuce | 5.1.2 | | &check; | |
159-
| Spring Boot | 1.5 | 3.0+ | | &check; |
160-
| Spring Bean | 4.3.12 | | | &check; |
161-
| Spring Open Feign | 10.12 | | | &check; |
162-
| Spring Rest Template | 4.3.12 | | | &check; |
163-
| Spring Scheduling | 4.3.12 | | | &check; |
164-
| Spring WebFlux | 5.0.0 | | &check; | &check; |
165-
| HTTP Server - Jetty | 9.4.41 | | &check; | &check; |
166-
| HTTP Server - Netty | 2.0.0 | | | &check; |
167-
| HTTP Server - Tomcat | 8.5.20 | | &check; | &check; |
168-
| HTTP Server - Undertow | 1.4.12 | | &check; | &check; |
137+
| Component | Min Version | Max Version | Metrics | Tracing |
138+
|------------------------|-------------|-------------|------------------------------------------------|---------|
139+
| JVM | 1.8 | | &check; | |
140+
| JDK - Thread Pool | 1.8 | | &check; | |
141+
| JDK - HTTP Client | 1.8 | | [&check;](doc/metrics/http-outgoing/README.md) | &check; |
142+
| Alibaba Druid | 1.0.28 | | &check; | |
143+
| Apache Druid | 0.16 | 24.0 | | &check; |
144+
| Apache Kafka | 0.10 | | &check; | &check; |
145+
| Apache Ozone           | 1.3.0       |             |                                                | &check; |
146+
| Eclipse Glassfish | 2.34 | | | &check; |
147+
| GRPC | 1.6.0 | | &check; | |
148+
| Google Guice | 4.1.0 | | | &check; |
149+
| HTTP Client - Apache | 4.5.2 | | [&check;](doc/metrics/http-outgoing/README.md) | &check; |
150+
| HTTP Client - Jetty | 9.4.6 | | [&check;](doc/metrics/http-outgoing/README.md) | &check; |
151+
| HTTP Client - Netty | 3.10.6 | < 4.0 | [&check;](doc/metrics/http-outgoing/README.md) | &check; |
152+
| HTTP Client - okhttp3 | 3.2 | 4.9 | [&check;](doc/metrics/http-outgoing/README.md) | &check; |
153+
| Jersey | 1.19.4 | | | &check; |
154+
| MongoDB | 3.4.2 | | &check; | |
155+
| MySQL | 5.x | 8.x | &check; | |
156+
| Quartz | 2.x | | &check; | &check; |
157+
| Redis - Jedis | 2.9 | | &check; | |
158+
| Redis - Lettuce | 5.1.2 | | &check; | |
159+
| Spring Boot | 1.5 | 3.0+ | | &check; |
160+
| Spring Bean | 4.3.12 | | | &check; |
161+
| Spring Open Feign | 10.12 | | | &check; |
162+
| Spring Rest Template | 4.3.12 | | | &check; |
163+
| Spring Scheduling | 4.3.12 | | | &check; |
164+
| Spring WebFlux | 5.0.0 | | [&check;](doc/metrics/http-outgoing/README.md) | &check; |
165+
| HTTP Server - Jetty | 9.4.41 | | &check; | &check; |
166+
| HTTP Server - Netty | 2.0.0 | | | &check; |
167+
| HTTP Server - Tomcat | 8.5.20 | | &check; | &check; |
168+
| HTTP Server - Undertow | 1.4.12 | | &check; | &check; |
169169

170170

171171
# User Doc
172-
1. [Configuration](doc/configuration/configuration.md)
173-
2. [Diagnosis](doc/diagnosis/README.md)
172+
1. [Metrics](doc/metrics/README.md)
173+
2. [Tracing](doc/tracing/README.md)
174174
3. [Logging](doc/logging/README.md)
175-
4. SDK
175+
4. [Diagnosis](doc/diagnosis/README.md)
176+
5. [Configuration](doc/configuration/configuration.md)
177+
6. SDK
176178
1. [Metrics](doc/sdk/metrics.md)
177179
2. [Tracing](doc/sdk/tracing.md)
178180

agent/agent-plugins/apache-kafka/src/main/java/org/bithon/agent/plugin/apache/kafka/KafkaPluginContext.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,15 @@
2727
public class KafkaPluginContext {
2828
/**
2929
* Available for consumer.
30-
* default to empty string so that no need to handle null.
30+
* Defaults to an empty string so that there is no need to handle null.
3131
*/
3232
public String groupId = "";
3333
public String clientId;
3434
public Supplier<String> clusterSupplier;
3535

36+
public String uri;
37+
public String topic;
38+
3639
public static String getCurrentDestination() {
3740
String dest = (String) InterceptorContext.get("kafka-ctx-destination");
3841
return dest == null ? "" : dest;

agent/agent-plugins/apache-kafka/src/main/java/org/bithon/agent/plugin/apache/kafka/consumer/interceptor/KafkaConsumer$Poll.java

+4-16
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717
package org.bithon.agent.plugin.apache.kafka.consumer.interceptor;
1818

1919
import org.apache.kafka.clients.consumer.ConsumerRecords;
20-
import org.apache.kafka.clients.consumer.KafkaConsumer;
2120
import org.bithon.agent.instrumentation.aop.context.AopContext;
2221
import org.bithon.agent.instrumentation.aop.interceptor.InterceptionDecision;
2322
import org.bithon.agent.instrumentation.aop.interceptor.declaration.AroundInterceptor;
2423
import org.bithon.agent.observability.tracing.context.ITraceSpan;
2524
import org.bithon.agent.observability.tracing.context.TraceSpanFactory;
26-
import org.bithon.agent.plugin.apache.kafka.KafkaPluginContext;
25+
import org.bithon.component.commons.tracing.Components;
2726
import org.bithon.component.commons.tracing.Tags;
2827

2928
import java.time.Duration;
@@ -38,33 +37,22 @@ public class KafkaConsumer$Poll extends AroundInterceptor {
3837

3938
@Override
4039
public InterceptionDecision before(AopContext aopContext) {
41-
ITraceSpan span = TraceSpanFactory.newSpan("kafka");
40+
ITraceSpan span = TraceSpanFactory.newSpan(Components.KAFKA);
4241
if (span == null) {
4342
return InterceptionDecision.SKIP_LEAVE;
4443
}
4544

46-
aopContext.setUserContext(span.method(aopContext.getTargetClass(), aopContext.getMethod())
45+
aopContext.setUserContext(span.method(aopContext.getTargetClass(),
46+
aopContext.getMethod())
4747
.start());
4848
return InterceptionDecision.CONTINUE;
4949
}
5050

5151
@Override
5252
public void after(AopContext aopContext) {
53-
KafkaPluginContext kafkaPluginContext = aopContext.getInjectedOnTargetAs();
54-
55-
KafkaConsumer<?, ?> consumer = aopContext.getTargetAs();
56-
String topics = null;
57-
try {
58-
topics = String.join(",", consumer.subscription());
59-
} catch (Exception ignored) {
60-
}
61-
6253
ConsumerRecords<?, ?> records = aopContext.getReturningAs();
6354
ITraceSpan span = aopContext.getUserContextAs();
6455
span.tag(aopContext.getException())
65-
.tag(Tags.Messaging.KAFKA_CONSUMER_GROUP, kafkaPluginContext.groupId)
66-
.tag(Tags.Messaging.KAFKA_CLIENT_ID, kafkaPluginContext.clientId)
67-
.tag(Tags.Messaging.KAFKA_TOPIC, topics)
6856
.tag(Tags.Messaging.COUNT, records == null ? 0 : records.count())
6957
.finish();
7058
}

agent/agent-plugins/apache-kafka/src/main/java/org/bithon/agent/plugin/apache/kafka/consumer/interceptor/ListenerConsumer$Ctor.java

+23-14
Original file line numberDiff line numberDiff line change
@@ -41,32 +41,41 @@ public class ListenerConsumer$Ctor extends AfterInterceptor {
4141
*/
4242
@Override
4343
public void after(AopContext aopContext) {
44-
KafkaConsumer<?, ?> consumer = (KafkaConsumer<?, ?>) ReflectionUtils.getFieldValue(aopContext.getTarget(), "consumer");
44+
KafkaConsumer<?, ?> consumer = (KafkaConsumer<?, ?>) ReflectionUtils.getFieldValue(aopContext.getTarget(),
45+
"consumer");
4546
if (consumer == null) {
4647
return;
4748
}
48-
String cluster = ((KafkaPluginContext) ((IBithonObject) consumer).getInjectedObject()).clusterSupplier.get();
4949

50-
ContainerProperties properties = (ContainerProperties) ReflectionUtils.getFieldValue(aopContext.getTarget(), "containerProperties");
50+
ContainerProperties properties = (ContainerProperties) ReflectionUtils.getFieldValue(aopContext.getTarget(),
51+
"containerProperties");
5152
if (properties == null) {
5253
return;
5354
}
5455

55-
String uri = "kafka://" + cluster;
56-
String[] topics = properties.getTopics();
57-
if (topics != null) {
58-
uri += "?topic=" + String.join(",", topics);
59-
} else if (properties.getTopicPattern() != null) {
60-
uri += "?topic=" + properties.getTopicPattern().pattern();
61-
} else {
62-
TopicPartitionOffset[] partitions = properties.getTopicPartitions();
63-
if (partitions != null) {
64-
uri += "?topic=" + Stream.of(partitions).map(TopicPartitionOffset::getTopic).collect(Collectors.joining(","));
56+
KafkaPluginContext pluginContext = ((KafkaPluginContext) ((IBithonObject) consumer).getInjectedObject());
57+
{
58+
String topicString = null;
59+
String[] topics = properties.getTopics();
60+
if (topics != null) {
61+
topicString = String.join(",", topics);
62+
} else if (properties.getTopicPattern() != null) {
63+
topicString = properties.getTopicPattern().pattern();
64+
} else {
65+
TopicPartitionOffset[] partitions = properties.getTopicPartitions();
66+
if (partitions != null) {
67+
topicString = Stream.of(partitions)
68+
.map(TopicPartitionOffset::getTopic)
69+
.collect(Collectors.joining(","));
70+
}
6571
}
72+
String cluster = pluginContext.clusterSupplier.get();
73+
pluginContext.uri = "kafka://" + cluster + (topicString == null ? "" : "?topic=" + topicString);
74+
pluginContext.topic = topicString;
6675
}
6776

6877
// Keep the uri for further use
6978
IBithonObject bithonObject = aopContext.getTargetAs();
70-
bithonObject.setInjectedObject(uri);
79+
bithonObject.setInjectedObject(pluginContext);
7180
}
7281
}

agent/agent-plugins/apache-kafka/src/main/java/org/bithon/agent/plugin/apache/kafka/consumer/interceptor/ListenerConsumer$PollAndInvoke.java

+35-12
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,13 @@
2727
import org.bithon.agent.observability.tracing.context.ITraceSpan;
2828
import org.bithon.agent.observability.tracing.context.TraceContextFactory;
2929
import org.bithon.agent.observability.tracing.context.TraceContextHolder;
30+
import org.bithon.agent.observability.tracing.context.TraceMode;
3031
import org.bithon.agent.observability.tracing.sampler.ISampler;
3132
import org.bithon.agent.observability.tracing.sampler.SamplerFactory;
33+
import org.bithon.agent.plugin.apache.kafka.KafkaPluginContext;
34+
import org.bithon.component.commons.tracing.Components;
3235
import org.bithon.component.commons.tracing.SpanKind;
36+
import org.bithon.component.commons.tracing.Tags;
3337

3438
/**
3539
* {@link org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer}
@@ -40,19 +44,28 @@
4044
public class ListenerConsumer$PollAndInvoke extends AroundInterceptor {
4145

4246
private final ISampler sampler = SamplerFactory.createSampler(ConfigurationManager.getInstance()
43-
.getDynamicConfig("tracing.samplingConfigs.kafka-consumer",
44-
TraceSamplingConfig.class));
47+
.getDynamicConfig(
48+
"tracing.samplingConfigs.kafka-consumer",
49+
TraceSamplingConfig.class));
4550

4651
@Override
4752
public InterceptionDecision before(AopContext aopContext) {
53+
IBithonObject bithonObject = aopContext.getTargetAs();
54+
KafkaPluginContext pluginContext = (KafkaPluginContext) bithonObject.getInjectedObject();
55+
if (pluginContext == null) {
56+
return InterceptionDecision.SKIP_LEAVE;
57+
}
58+
4859
ITraceContext context = TraceContextFactory.create(sampler.decideSamplingMode(null),
4960
Tracer.get().traceIdGenerator().newTraceId());
5061

51-
aopContext.setUserContext(context.currentSpan()
52-
.component("kafka")
53-
.kind(SpanKind.CONSUMER)
54-
.method(aopContext.getTargetClass(), aopContext.getMethod())
55-
.start());
62+
if (TraceMode.TRACING.equals(context.traceMode())) {
63+
aopContext.setUserContext(context.currentSpan()
64+
.component(Components.KAFKA)
65+
.kind(SpanKind.CONSUMER)
66+
.method(aopContext.getTargetClass(), aopContext.getMethod())
67+
.start());
68+
}
5669

5770
TraceContextHolder.set(context);
5871
return InterceptionDecision.CONTINUE;
@@ -62,11 +75,21 @@ public InterceptionDecision before(AopContext aopContext) {
6275
public void after(AopContext aopContext) {
6376
ITraceSpan span = aopContext.getUserContextAs();
6477

65-
span.tag(aopContext.getException())
66-
.tag("status", aopContext.hasException() ? "500" : "200")
67-
.tag("uri", ((IBithonObject) aopContext.getTarget()).getInjectedObject())
68-
.finish();
69-
span.context().finish();
78+
if (span != null) {
79+
IBithonObject bithonObject = aopContext.getTargetAs();
80+
KafkaPluginContext pluginContext = (KafkaPluginContext) bithonObject.getInjectedObject();
81+
82+
span.tag(aopContext.getException())
83+
.tag("status", aopContext.hasException() ? "500" : "200")
84+
.tag("uri", pluginContext.uri)
85+
.tag(Tags.Net.PEER, pluginContext.clusterSupplier.get())
86+
.tag(Tags.Messaging.KAFKA_TOPIC, pluginContext.topic)
87+
.tag(Tags.Messaging.KAFKA_CONSUMER_GROUP, pluginContext.groupId)
88+
.tag(Tags.Messaging.KAFKA_CLIENT_ID, pluginContext.clientId)
89+
.finish();
90+
91+
span.context().finish();
92+
}
7093

7194
TraceContextHolder.remove();
7295
}

agent/agent-plugins/apache-kafka/src/main/java/org/bithon/agent/plugin/apache/kafka/producer/interceptor/KafkaProducer$DoSend.java

+10-5
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.bithon.agent.observability.tracing.context.TraceSpanFactory;
2828
import org.bithon.agent.plugin.apache.kafka.KafkaPluginContext;
2929
import org.bithon.agent.plugin.apache.kafka.producer.KafkaProducerTracingConfig;
30+
import org.bithon.component.commons.tracing.Components;
3031
import org.bithon.component.commons.tracing.SpanKind;
3132
import org.bithon.component.commons.tracing.Tags;
3233
import org.bithon.component.commons.utils.ReflectionUtils;
@@ -40,11 +41,12 @@
4041
*/
4142
public class KafkaProducer$DoSend extends AroundInterceptor {
4243

43-
private final KafkaProducerTracingConfig tracingConfig = ConfigurationManager.getInstance().getConfig(KafkaProducerTracingConfig.class);
44+
private final KafkaProducerTracingConfig tracingConfig = ConfigurationManager.getInstance()
45+
.getConfig(KafkaProducerTracingConfig.class);
4446

4547
@Override
4648
public InterceptionDecision before(AopContext aopContext) {
47-
ITraceSpan span = TraceSpanFactory.newSpan("kafka");
49+
ITraceSpan span = TraceSpanFactory.newSpan(Components.KAFKA);
4850
if (span == null) {
4951
return InterceptionDecision.SKIP_LEAVE;
5052
}
@@ -69,11 +71,13 @@ public InterceptionDecision before(AopContext aopContext) {
6971
size = ((ByteBuffer) record.value()).remaining();
7072
}
7173

72-
String cluster = ((KafkaPluginContext) ((IBithonObject) aopContext.getTarget()).getInjectedObject()).clusterSupplier.get();
74+
IBithonObject bithonObject = aopContext.getTargetAs();
75+
KafkaPluginContext pluginContext = (KafkaPluginContext) bithonObject.getInjectedObject();
7376

7477
aopContext.setUserContext(span.method(aopContext.getTargetClass(), aopContext.getMethod())
7578
.kind(SpanKind.PRODUCER)
76-
.tag("uri", "kafka://" + cluster)
79+
.tag(Tags.Net.PEER, pluginContext.clusterSupplier.get())
80+
.tag(Tags.Messaging.KAFKA_CLIENT_ID, pluginContext.clientId)
7781
.tag(Tags.Messaging.KAFKA_TOPIC, record.topic())
7882
.tag(Tags.Messaging.KAFKA_SOURCE_PARTITION, record.partition())
7983
.tag(Tags.Messaging.BYTES, size)
@@ -90,7 +94,8 @@ public void after(AopContext aopContext) {
9094
} else {
9195
Object returning = aopContext.getReturning();
9296
if (returning != null) {
93-
if ("org.apache.kafka.clients.producer.KafkaProducer$FutureFailure".equals(returning.getClass().getName())) {
97+
if ("org.apache.kafka.clients.producer.KafkaProducer$FutureFailure".equals(returning.getClass()
98+
.getName())) {
9499
Exception exception = (Exception) ReflectionUtils.getFieldValue(returning, "exception");
95100
if (exception != null) {
96101
span.tag(exception.getCause());

0 commit comments

Comments
 (0)