Skip to content

Commit 853aa84

Browse files
authored
[INLONG-2264] DataProxy get metric value with error JMX ObjectName (#2275)
* [INLONG-2264] DataProxy get metric value with error JMX ObjectName * add UT for this bug * Thread.sleep(2000);
1 parent 912a30a commit 853aa84

File tree

3 files changed

+29
-28
lines changed

3 files changed

+29
-28
lines changed

inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListenerRunnable.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.apache.inlong.commons.config.metrics.MetricItem;
3737
import org.apache.inlong.commons.config.metrics.MetricItemMBean;
3838
import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
39+
import org.apache.inlong.commons.config.metrics.MetricRegister;
40+
import org.apache.inlong.commons.config.metrics.MetricUtils;
3941
import org.apache.inlong.commons.config.metrics.MetricValue;
4042
import org.slf4j.Logger;
4143
import org.slf4j.LoggerFactory;
@@ -46,7 +48,7 @@
4648
*/
4749
public class MetricListenerRunnable implements Runnable {
4850

49-
public static final Logger LOG = LoggerFactory.getLogger(MetricObserver.class);
51+
public static final Logger LOG = LoggerFactory.getLogger(MetricListenerRunnable.class);
5052

5153
private String domain;
5254
private List<MetricListener> listenerList;
@@ -92,9 +94,14 @@ public void run() {
9294
* @throws ClassNotFoundException
9395
*/
9496
@SuppressWarnings("unchecked")
95-
private List<MetricItemValue> getItemValues() throws InstanceNotFoundException, AttributeNotFoundException,
97+
public List<MetricItemValue> getItemValues() throws InstanceNotFoundException, AttributeNotFoundException,
9698
ReflectionException, MBeanException, MalformedObjectNameException, ClassNotFoundException {
97-
ObjectName objName = new ObjectName(domain + MetricItemMBean.DOMAIN_SEPARATOR + "*");
99+
StringBuilder beanName = new StringBuilder();
100+
beanName.append(MetricRegister.JMX_DOMAIN).append(MetricItemMBean.DOMAIN_SEPARATOR)
101+
.append("type=").append(MetricUtils.getDomain(DataProxyMetricItemSet.class))
102+
.append(MetricItemMBean.PROPERTY_SEPARATOR)
103+
.append("*");
104+
ObjectName objName = new ObjectName(beanName.toString());
98105
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
99106
Set<ObjectInstance> mbeans = mbs.queryMBeans(objName, null);
100107
LOG.info("getItemValues for domain:{},queryMBeans:{}", domain, mbeans);

inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java

+13-12
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,12 @@
5252
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
5353
import org.apache.inlong.dataproxy.metrics.MetricItemValue;
5454
import org.apache.inlong.dataproxy.metrics.MetricListener;
55+
import org.slf4j.Logger;
56+
import org.slf4j.LoggerFactory;
5557

5658
import io.prometheus.client.Collector;
5759
import io.prometheus.client.CounterMetricFamily;
5860
import io.prometheus.client.exporter.HTTPServer;
59-
import org.slf4j.Logger;
60-
import org.slf4j.LoggerFactory;
6161

6262
/**
6363
*
@@ -70,10 +70,9 @@ public class PrometheusMetricListener extends Collector implements MetricListene
7070
public static final int DEFAULT_PROMETHEUS_HTTP_PORT = 8080;
7171
public static final String DEFAULT_DIMENSION_LABEL = "dimension";
7272

73-
//
73+
private String metricName;
7474
private DataProxyMetricItem metricItem;
7575
private Map<String, AtomicLong> metricValueMap = new ConcurrentHashMap<>();
76-
private String metricName;
7776
protected HTTPServer httpServer;
7877
private Map<String, MetricItemValue> dimensionMetricValueMap = new ConcurrentHashMap<>();
7978
private List<String> dimensionKeys = new ArrayList<>();
@@ -82,19 +81,20 @@ public class PrometheusMetricListener extends Collector implements MetricListene
8281
* Constructor
8382
*/
8483
public PrometheusMetricListener() {
84+
this.metricName = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
8585
this.metricItem = new DataProxyMetricItem();
86-
this.metricItem.clusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
86+
this.metricItem.clusterId = metricName;
8787
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
8888
StringBuilder beanName = new StringBuilder();
89-
beanName.append(JMX_DOMAIN).append(DOMAIN_SEPARATOR).append("type=DataProxyCounter");
90-
this.metricName = beanName.toString();
89+
beanName.append(JMX_DOMAIN).append(DOMAIN_SEPARATOR).append("type=DataProxyPrometheus");
90+
String strBeanName = beanName.toString();
9191
try {
92-
ObjectName objName = new ObjectName(metricName);
92+
ObjectName objName = new ObjectName(strBeanName);
9393
mbs.registerMBean(metricItem, objName);
9494
} catch (Exception ex) {
95-
LOG.error("exception while register mbean:{},error:{}", metricName, ex.getMessage());
95+
LOG.error("exception while register mbean:{},error:{}", strBeanName, ex);
9696
}
97-
//
97+
// prepare metric value map
9898
metricValueMap.put(M_READ_SUCCESS_COUNT, metricItem.readSuccessCount);
9999
metricValueMap.put(M_READ_SUCCESS_SIZE, metricItem.readSuccessSize);
100100
metricValueMap.put(M_READ_FAIL_COUNT, metricItem.readFailCount);
@@ -115,6 +115,7 @@ public PrometheusMetricListener() {
115115
int httpPort = CommonPropertiesHolder.getInteger(KEY_PROMETHEUS_HTTP_PORT, DEFAULT_PROMETHEUS_HTTP_PORT);
116116
try {
117117
this.httpServer = new HTTPServer(httpPort);
118+
this.register();
118119
} catch (IOException e) {
119120
LOG.error("exception while register prometheus http server:{},error:{}", metricName, e.getMessage());
120121
}
@@ -142,7 +143,7 @@ public void snapshot(String domain, List<MetricItemValue> itemValues) {
142143
// id dimension
143144
String dimensionKey = itemValue.getKey();
144145
MetricItemValue dimensionMetricValue = this.dimensionMetricValueMap.get(dimensionKey);
145-
if (dimensionKey == null) {
146+
if (dimensionMetricValue == null) {
146147
dimensionMetricValue = new MetricItemValue(dimensionKey, new ConcurrentHashMap<String, String>(),
147148
new ConcurrentHashMap<String, MetricValue>());
148149
this.dimensionMetricValueMap.putIfAbsent(dimensionKey, dimensionMetricValue);
@@ -202,7 +203,7 @@ public List<MetricFamilySamples> collect() {
202203

203204
// id dimension
204205
CounterMetricFamily idCounter = new CounterMetricFamily(metricName + "&group=id",
205-
"The metrics of inlong dataflow.", this.dimensionKeys);
206+
"The metrics of inlong datastream.", this.dimensionKeys);
206207
for (Entry<String, MetricItemValue> entry : this.dimensionMetricValueMap.entrySet()) {
207208
MetricItemValue itemValue = entry.getValue();
208209
// read

inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java

+6-13
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.inlong.commons.config.metrics.MetricRegister;
2727
import org.apache.inlong.commons.config.metrics.MetricUtils;
2828
import org.apache.inlong.commons.config.metrics.MetricValue;
29-
import org.junit.BeforeClass;
3029
import org.junit.Test;
3130

3231
/**
@@ -54,10 +53,12 @@ public class TestMetricListenerRunnable {
5453
private static String keySink2;
5554

5655
/**
57-
* setup
56+
* testResult
57+
*
58+
* @throws Exception
5859
*/
59-
@BeforeClass
60-
public static void setup() {
60+
@Test
61+
public void testResult() throws Exception {
6162
itemSet = new DataProxyMetricItemSet(CLUSTER_ID);
6263
MetricRegister.register(itemSet);
6364
// prepare
@@ -76,15 +77,6 @@ public static void setup() {
7677
itemSink.inlongGroupId = INLONG_GROUP_ID1;
7778
itemSink.inlongStreamId = INLONG_STREAM_ID;
7879
dimSink = itemSink.getDimensions();
79-
}
80-
81-
/**
82-
* testResult
83-
*
84-
* @throws Exception
85-
*/
86-
@Test
87-
public void testResult() throws Exception {
8880
// increase source
8981
DataProxyMetricItem item = null;
9082
item = itemSet.findMetricItem(dimSource);
@@ -146,6 +138,7 @@ public void snapshot(String domain, List<MetricItemValue> itemValues) {
146138
List<MetricListener> listeners = new ArrayList<>();
147139
listeners.add(listener);
148140
MetricListenerRunnable runnable = new MetricListenerRunnable("DataProxy", listeners);
141+
List<MetricItemValue> itemValues = runnable.getItemValues();
149142
runnable.run();
150143
}
151144
}

0 commit comments

Comments
 (0)