Skip to content

Commit

Permalink
Refactor MetricsProxy out of JmxProxyImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsembwever committed Sep 19, 2018
1 parent 895efba commit 68332c7
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 195 deletions.
8 changes: 0 additions & 8 deletions src/server/src/main/java/io/cassandrareaper/jmx/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package io.cassandrareaper.jmx;

import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.JmxStat;
import io.cassandrareaper.core.Segment;
import io.cassandrareaper.service.RingRange;

import java.io.IOException;
import java.math.BigInteger;
import java.time.Duration;
import java.util.Collection;
Expand Down Expand Up @@ -130,10 +128,4 @@ int triggerRepair(

void removeRepairStatusHandler(int repairNo);

Map<String, List<JmxStat>> collectTpStats() throws JMException, IOException;

Map<String, List<JmxStat>> collectDroppedMessages() throws JMException, IOException;

Map<String, List<JmxStat>> collectLatencyMetrics() throws JMException, IOException;

}
107 changes: 0 additions & 107 deletions src/server/src/main/java/io/cassandrareaper/jmx/JmxProxyImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.core.JmxStat;
import io.cassandrareaper.core.Segment;
import io.cassandrareaper.service.RingRange;

Expand All @@ -30,13 +29,11 @@
import java.net.MalformedURLException;
import java.rmi.server.RMIClientSocketFactory;
import java.rmi.server.RMISocketFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -50,19 +47,14 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.JMX;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.Notification;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
Expand All @@ -78,7 +70,6 @@
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.db.compaction.CompactionManager;
Expand Down Expand Up @@ -948,104 +939,6 @@ private void registerConnectionsGauge() {
}
}

/**
* Collects all attributes for a given set of JMX beans.
*
* @param beans the list of beans to collect through JMX
* @return a map with a key for each bean and a list of jmx stat in generic format.
*/
private Map<String, List<JmxStat>> collectMetrics(List<String> beans) throws JMException, IOException {
List<List<JmxStat>> allStats = Lists.newArrayList();
Set beanSet = Sets.newLinkedHashSet();
for (String bean : beans) {
beanSet.addAll(mbeanServer.queryNames(new ObjectName(bean), null));
}

for (Object bean : beanSet) {
ObjectName objName = (ObjectName) bean;
List<JmxStat> attributes = scrapeBean(objName);
allStats.add(attributes);
}

List<JmxStat> flatStatList =
allStats.stream().flatMap(attr -> attr.stream()).collect(Collectors.toList());

// Group the stats by scope to ease displaying/manipulating the data
Map<String, List<JmxStat>> groupedStatList =
flatStatList.stream().collect(Collectors.groupingBy(JmxStat::getScope));

return groupedStatList;
}

@Override
public Map<String, List<JmxStat>> collectTpStats() throws JMException, IOException {
return collectMetrics(
Arrays.asList(
"org.apache.cassandra.metrics:type=ThreadPools,path=request,*",
"org.apache.cassandra.metrics:type=ThreadPools,path=internal,*"));
}

@Override
public Map<String, List<JmxStat>> collectDroppedMessages() throws JMException, IOException {
return collectMetrics(Arrays.asList("org.apache.cassandra.metrics:type=DroppedMessage,*"));
}

@Override
public Map<String, List<JmxStat>> collectLatencyMetrics() throws JMException, IOException {
Map<String, List<JmxStat>> metrics =
collectMetrics(Arrays.asList("org.apache.cassandra.metrics:type=ClientRequest,*"));
LOG.info("latencies : {}", metrics);

return metrics;
}

private List<JmxStat> scrapeBean(ObjectName mbeanName) {
MBeanInfo info;
List<JmxStat> attributeList = Lists.newArrayList();
try {
info = mbeanServer.getMBeanInfo(mbeanName);
} catch (IOException e) {
return attributeList;
} catch (JMException e) {
LOG.error(mbeanName.toString(), "getMBeanInfo Fail: " + e);
return attributeList;
}
MBeanAttributeInfo[] attrInfos = info.getAttributes();

Map<String, MBeanAttributeInfo> name2AttrInfo = new LinkedHashMap<String, MBeanAttributeInfo>();
for (MBeanAttributeInfo attrInfo : attrInfos) {
MBeanAttributeInfo attr = attrInfo;
if (!attr.isReadable()) {
LOG.warn("{}.{} not readable", mbeanName, attr);
continue;
}
name2AttrInfo.put(attr.getName(), attr);
}
final AttributeList attributes;
try {
attributes =
mbeanServer.getAttributes(mbeanName, name2AttrInfo.keySet().toArray(new String[0]));
} catch (RuntimeException | InstanceNotFoundException | ReflectionException | IOException e) {
LOG.error("Fail grabbing attributes for mbean {} ", mbeanName, e);
return attributeList;
}
for (Attribute attribute : attributes.asList()) {
Object value = attribute.getValue();
JmxStat.Builder jmxStatBuilder =
JmxStat.builder()
.withAttribute(attribute.getName())
.withName(mbeanName.getKeyProperty("name"))
.withScope(mbeanName.getKeyProperty("scope"));
if (value == null) {
attributeList.add(jmxStatBuilder.withValue(0.0).build());
} else if (value instanceof Number) {
attributeList.add(jmxStatBuilder.withValue(((Number) value).doubleValue()).build());
}
}

return attributeList;
}

StorageServiceMBean getStorageServiceMBean() {
return ssProxy;
}
Expand Down
131 changes: 131 additions & 0 deletions src/server/src/main/java/io/cassandrareaper/jmx/MetricsProxy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cassandrareaper.jmx;


import io.cassandrareaper.core.JmxStat;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.ObjectName;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



public final class MetricsProxy {

private static final Logger LOG = LoggerFactory.getLogger(MetricsProxy.class);

private final JmxProxyImpl proxy;

private MetricsProxy(JmxProxyImpl proxy) {
this.proxy = proxy;
}

public static MetricsProxy create(JmxProxy proxy) {
Preconditions.checkArgument(proxy instanceof JmxProxyImpl, "only JmxProxyImpl is supported");
return new MetricsProxy((JmxProxyImpl)proxy);
}

public Map<String, List<JmxStat>> collectTpStats() throws JMException, IOException {
return collectMetrics(
"org.apache.cassandra.metrics:type=ThreadPools,path=request,*",
"org.apache.cassandra.metrics:type=ThreadPools,path=internal,*");
}

public Map<String, List<JmxStat>> collectDroppedMessages() throws JMException, IOException {
return collectMetrics("org.apache.cassandra.metrics:type=DroppedMessage,*");
}

public Map<String, List<JmxStat>> collectLatencyMetrics() throws JMException, IOException {
return collectMetrics("org.apache.cassandra.metrics:type=ClientRequest,*");
}


/**
* Collects all attributes for a given set of JMX beans.
*
* @param beans the list of beans to collect through JMX
* @return a map with a key for each bean and a list of jmx stat in generic format.
*/
private Map<String, List<JmxStat>> collectMetrics(String... beans) throws JMException, IOException {
List<List<JmxStat>> allStats = Lists.newArrayList();
Set<ObjectName> beanSet = Sets.newLinkedHashSet();
for (String bean : beans) {
beanSet.addAll(proxy.getMBeanServerConnection().queryNames(new ObjectName(bean), null));
}

beanSet.stream()
.map((objName) -> scrapeBean(objName))
.forEachOrdered(attributes -> allStats.add(attributes));

List<JmxStat> flatStatList = allStats.stream()
.flatMap(attr -> attr.stream()).collect(Collectors.toList());

// Group the stats by scope to ease displaying/manipulating the data
Map<String, List<JmxStat>> groupedStatList = flatStatList.stream()
.collect(Collectors.groupingBy(JmxStat::getScope));

return groupedStatList;
}

private List<JmxStat> scrapeBean(ObjectName mbeanName) {
List<JmxStat> attributeList = Lists.newArrayList();
try {
Map<String, MBeanAttributeInfo> name2AttrInfo = new LinkedHashMap<>();
MBeanInfo info = proxy.getMBeanServerConnection().getMBeanInfo(mbeanName);
for (MBeanAttributeInfo attrInfo : info.getAttributes()) {
MBeanAttributeInfo attr = attrInfo;
if (!attr.isReadable()) {
LOG.warn("{}.{} not readable", mbeanName, attr);
} else {
name2AttrInfo.put(attr.getName(), attr);
}
}
proxy.getMBeanServerConnection().getAttributes(mbeanName, name2AttrInfo.keySet().toArray(new String[0]))
.asList()
.forEach((attribute) -> {
Object value = attribute.getValue();

JmxStat.Builder jmxStatBuilder = JmxStat.builder()
.withAttribute(attribute.getName())
.withName(mbeanName.getKeyProperty("name"))
.withScope(mbeanName.getKeyProperty("scope"));

if (null == value) {
attributeList.add(jmxStatBuilder.withValue(0.0).build());
} else if (value instanceof Number) {
attributeList.add(jmxStatBuilder.withValue(((Number) value).doubleValue()).build());
}
});
} catch (JMException | IOException e) {
LOG.error("Fail getting mbeanInfo or grabbing attributes for mbean {} ", mbeanName, e);
}
return attributeList;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.cassandrareaper.core.MetricsHistogram;
import io.cassandrareaper.core.Node;
import io.cassandrareaper.core.ThreadPoolStat;
import io.cassandrareaper.jmx.JmxProxy;
import io.cassandrareaper.jmx.MetricsProxy;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -55,11 +55,10 @@ public static MetricsGrabber create(AppContext context) {

public List<ThreadPoolStat> getTpStats(Node host) throws ReaperException {
try {
JmxProxy jmxProxy
= context.jmxConnectionFactory.connect(host, context.config.getJmxConnectionTimeoutInSeconds());

return convertToThreadPoolStats(jmxProxy.collectTpStats());
} catch (JMException | RuntimeException | InterruptedException | IOException e) {
int jmxTimeout = context.config.getJmxConnectionTimeoutInSeconds();
MetricsProxy proxy = MetricsProxy.create(context.jmxConnectionFactory.connect(host, jmxTimeout));
return convertToThreadPoolStats(proxy.collectTpStats());
} catch (JMException | InterruptedException | IOException e) {
LOG.error("Failed collecting tpstats for host {}", host, e);
throw new ReaperException(e);
}
Expand All @@ -81,11 +80,10 @@ public List<ThreadPoolStat> convertToThreadPoolStats(Map<String, List<JmxStat>>

public List<DroppedMessages> getDroppedMessages(Node host) throws ReaperException {
try {
JmxProxy jmxProxy
= context.jmxConnectionFactory.connect(host, context.config.getJmxConnectionTimeoutInSeconds());

return convertToDroppedMessages(jmxProxy.collectDroppedMessages());
} catch (JMException | RuntimeException | InterruptedException | IOException e) {
int jmxTimeout = context.config.getJmxConnectionTimeoutInSeconds();
MetricsProxy proxy = MetricsProxy.create(context.jmxConnectionFactory.connect(host, jmxTimeout));
return convertToDroppedMessages(proxy.collectDroppedMessages());
} catch (JMException | InterruptedException | IOException e) {
LOG.error("Failed collecting tpstats for host {}", host, e);
throw new ReaperException(e);
}
Expand All @@ -107,11 +105,10 @@ public List<DroppedMessages> convertToDroppedMessages(Map<String, List<JmxStat>>

public List<MetricsHistogram> getClientRequestLatencies(Node host) throws ReaperException {
try {
JmxProxy jmxProxy
= context.jmxConnectionFactory.connect(host, context.config.getJmxConnectionTimeoutInSeconds());

return convertToMetricsHistogram(jmxProxy.collectLatencyMetrics());
} catch (JMException | RuntimeException | InterruptedException | IOException e) {
int jmxTimeout = context.config.getJmxConnectionTimeoutInSeconds();
MetricsProxy proxy = MetricsProxy.create(context.jmxConnectionFactory.connect(host, jmxTimeout));
return convertToMetricsHistogram(proxy.collectLatencyMetrics());
} catch (JMException | InterruptedException | IOException e) {
LOG.error("Failed collecting tpstats for host {}", host, e);
throw new ReaperException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import io.cassandrareaper.ReaperException;

import javax.management.MBeanServerConnection;

import com.google.common.base.Preconditions;
import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
import org.apache.cassandra.service.StorageServiceMBean;
Expand All @@ -34,6 +36,11 @@ public static JmxProxy mockJmxProxyImpl() {
return Mockito.mock(JmxProxyImpl.class);
}

public static void mockGetMBeanServerConnection(JmxProxy proxy, MBeanServerConnection serverConnection) {
Preconditions.checkArgument(proxy instanceof JmxProxyImpl, "only JmxProxyImpl is supported");
Mockito.when(((JmxProxyImpl)proxy).getMBeanServerConnection()).thenReturn(serverConnection);
}

public static void mockGetStorageServiceMBean(JmxProxy proxy, StorageServiceMBean storageMBean) {
Preconditions.checkArgument(proxy instanceof JmxProxyImpl, "only JmxProxyImpl is supported");
Mockito.when(((JmxProxyImpl)proxy).getStorageServiceMBean()).thenReturn(storageMBean);
Expand Down
Loading

0 comments on commit 68332c7

Please sign in to comment.