diff --git a/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxy.java b/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxy.java index 9bc614ea5..eeac910dc 100644 --- a/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxy.java +++ b/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxy.java @@ -18,11 +18,9 @@ package io.cassandrareaper.jmx; import io.cassandrareaper.ReaperException; -import io.cassandrareaper.core.JmxStat; import io.cassandrareaper.core.Segment; import io.cassandrareaper.service.RingRange; -import java.io.IOException; import java.math.BigInteger; import java.time.Duration; import java.util.Collection; @@ -130,10 +128,4 @@ int triggerRepair( void removeRepairStatusHandler(int repairNo); - Map> collectTpStats() throws JMException, IOException; - - Map> collectDroppedMessages() throws JMException, IOException; - - Map> collectLatencyMetrics() throws JMException, IOException; - } diff --git a/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxyImpl.java b/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxyImpl.java index eb7161d45..2bb94bba1 100644 --- a/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxyImpl.java +++ b/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxyImpl.java @@ -19,7 +19,6 @@ import io.cassandrareaper.ReaperException; import io.cassandrareaper.core.Cluster; -import io.cassandrareaper.core.JmxStat; import io.cassandrareaper.core.Segment; import io.cassandrareaper.service.RingRange; @@ -30,13 +29,11 @@ import java.net.MalformedURLException; import java.rmi.server.RMIClientSocketFactory; import java.rmi.server.RMISocketFactory; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -50,19 +47,14 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import javax.management.Attribute; -import javax.management.AttributeList; import javax.management.InstanceNotFoundException; import javax.management.JMException; import javax.management.JMX; import javax.management.ListenerNotFoundException; -import javax.management.MBeanAttributeInfo; -import javax.management.MBeanInfo; import javax.management.MBeanServerConnection; import javax.management.MalformedObjectNameException; import javax.management.Notification; import javax.management.ObjectName; -import javax.management.ReflectionException; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; @@ -78,7 +70,6 @@ import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.google.common.net.HostAndPort; import org.apache.cassandra.db.ColumnFamilyStoreMBean; import org.apache.cassandra.db.compaction.CompactionManager; @@ -948,104 +939,6 @@ private void registerConnectionsGauge() { } } - /** - * Collects all attributes for a given set of JMX beans. - * - * @param beans the list of beans to collect through JMX - * @return a map with a key for each bean and a list of jmx stat in generic format. - */ - private Map> collectMetrics(List beans) throws JMException, IOException { - List> allStats = Lists.newArrayList(); - Set beanSet = Sets.newLinkedHashSet(); - for (String bean : beans) { - beanSet.addAll(mbeanServer.queryNames(new ObjectName(bean), null)); - } - - for (Object bean : beanSet) { - ObjectName objName = (ObjectName) bean; - List attributes = scrapeBean(objName); - allStats.add(attributes); - } - - List flatStatList = - allStats.stream().flatMap(attr -> attr.stream()).collect(Collectors.toList()); - - // Group the stats by scope to ease displaying/manipulating the data - Map> groupedStatList = - flatStatList.stream().collect(Collectors.groupingBy(JmxStat::getScope)); - - return groupedStatList; - } - - @Override - public Map> collectTpStats() throws JMException, IOException { - return collectMetrics( - Arrays.asList( - "org.apache.cassandra.metrics:type=ThreadPools,path=request,*", - "org.apache.cassandra.metrics:type=ThreadPools,path=internal,*")); - } - - @Override - public Map> collectDroppedMessages() throws JMException, IOException { - return collectMetrics(Arrays.asList("org.apache.cassandra.metrics:type=DroppedMessage,*")); - } - - @Override - public Map> collectLatencyMetrics() throws JMException, IOException { - Map> metrics = - collectMetrics(Arrays.asList("org.apache.cassandra.metrics:type=ClientRequest,*")); - LOG.info("latencies : {}", metrics); - - return metrics; - } - - private List scrapeBean(ObjectName mbeanName) { - MBeanInfo info; - List attributeList = Lists.newArrayList(); - try { - info = mbeanServer.getMBeanInfo(mbeanName); - } catch (IOException e) { - return attributeList; - } catch (JMException e) { - LOG.error(mbeanName.toString(), "getMBeanInfo Fail: " + e); - return attributeList; - } - MBeanAttributeInfo[] attrInfos = info.getAttributes(); - - Map name2AttrInfo = new LinkedHashMap(); - for (MBeanAttributeInfo attrInfo : attrInfos) { - MBeanAttributeInfo attr = attrInfo; - if (!attr.isReadable()) { - LOG.warn("{}.{} not readable", mbeanName, attr); - continue; - } - name2AttrInfo.put(attr.getName(), attr); - } - final AttributeList attributes; - try { - attributes = - mbeanServer.getAttributes(mbeanName, name2AttrInfo.keySet().toArray(new String[0])); - } catch (RuntimeException | InstanceNotFoundException | ReflectionException | IOException e) { - LOG.error("Fail grabbing attributes for mbean {} ", mbeanName, e); - return attributeList; - } - for (Attribute attribute : attributes.asList()) { - Object value = attribute.getValue(); - JmxStat.Builder jmxStatBuilder = - JmxStat.builder() - .withAttribute(attribute.getName()) - .withName(mbeanName.getKeyProperty("name")) - .withScope(mbeanName.getKeyProperty("scope")); - if (value == null) { - attributeList.add(jmxStatBuilder.withValue(0.0).build()); - } else if (value instanceof Number) { - attributeList.add(jmxStatBuilder.withValue(((Number) value).doubleValue()).build()); - } - } - - return attributeList; - } - StorageServiceMBean getStorageServiceMBean() { return ssProxy; } diff --git a/src/server/src/main/java/io/cassandrareaper/jmx/MetricsProxy.java b/src/server/src/main/java/io/cassandrareaper/jmx/MetricsProxy.java new file mode 100644 index 000000000..b00f9e611 --- /dev/null +++ b/src/server/src/main/java/io/cassandrareaper/jmx/MetricsProxy.java @@ -0,0 +1,131 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.jmx; + + +import io.cassandrareaper.core.JmxStat; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import javax.management.JMException; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.ObjectName; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + +public final class MetricsProxy { + + private static final Logger LOG = LoggerFactory.getLogger(MetricsProxy.class); + + private final JmxProxyImpl proxy; + + private MetricsProxy(JmxProxyImpl proxy) { + this.proxy = proxy; + } + + public static MetricsProxy create(JmxProxy proxy) { + Preconditions.checkArgument(proxy instanceof JmxProxyImpl, "only JmxProxyImpl is supported"); + return new MetricsProxy((JmxProxyImpl)proxy); + } + + public Map> collectTpStats() throws JMException, IOException { + return collectMetrics( + "org.apache.cassandra.metrics:type=ThreadPools,path=request,*", + "org.apache.cassandra.metrics:type=ThreadPools,path=internal,*"); + } + + public Map> collectDroppedMessages() throws JMException, IOException { + return collectMetrics("org.apache.cassandra.metrics:type=DroppedMessage,*"); + } + + public Map> collectLatencyMetrics() throws JMException, IOException { + return collectMetrics("org.apache.cassandra.metrics:type=ClientRequest,*"); + } + + + /** + * Collects all attributes for a given set of JMX beans. + * + * @param beans the list of beans to collect through JMX + * @return a map with a key for each bean and a list of jmx stat in generic format. + */ + private Map> collectMetrics(String... beans) throws JMException, IOException { + List> allStats = Lists.newArrayList(); + Set beanSet = Sets.newLinkedHashSet(); + for (String bean : beans) { + beanSet.addAll(proxy.getMBeanServerConnection().queryNames(new ObjectName(bean), null)); + } + + beanSet.stream() + .map((objName) -> scrapeBean(objName)) + .forEachOrdered(attributes -> allStats.add(attributes)); + + List flatStatList = allStats.stream() + .flatMap(attr -> attr.stream()).collect(Collectors.toList()); + + // Group the stats by scope to ease displaying/manipulating the data + Map> groupedStatList = flatStatList.stream() + .collect(Collectors.groupingBy(JmxStat::getScope)); + + return groupedStatList; + } + + private List scrapeBean(ObjectName mbeanName) { + List attributeList = Lists.newArrayList(); + try { + Map name2AttrInfo = new LinkedHashMap<>(); + MBeanInfo info = proxy.getMBeanServerConnection().getMBeanInfo(mbeanName); + for (MBeanAttributeInfo attrInfo : info.getAttributes()) { + MBeanAttributeInfo attr = attrInfo; + if (!attr.isReadable()) { + LOG.warn("{}.{} not readable", mbeanName, attr); + } else { + name2AttrInfo.put(attr.getName(), attr); + } + } + proxy.getMBeanServerConnection().getAttributes(mbeanName, name2AttrInfo.keySet().toArray(new String[0])) + .asList() + .forEach((attribute) -> { + Object value = attribute.getValue(); + + JmxStat.Builder jmxStatBuilder = JmxStat.builder() + .withAttribute(attribute.getName()) + .withName(mbeanName.getKeyProperty("name")) + .withScope(mbeanName.getKeyProperty("scope")); + + if (null == value) { + attributeList.add(jmxStatBuilder.withValue(0.0).build()); + } else if (value instanceof Number) { + attributeList.add(jmxStatBuilder.withValue(((Number) value).doubleValue()).build()); + } + }); + } catch (JMException | IOException e) { + LOG.error("Fail getting mbeanInfo or grabbing attributes for mbean {} ", mbeanName, e); + } + return attributeList; + } + +} diff --git a/src/server/src/main/java/io/cassandrareaper/service/MetricsGrabber.java b/src/server/src/main/java/io/cassandrareaper/service/MetricsGrabber.java index 71920f67d..bd0f0875d 100644 --- a/src/server/src/main/java/io/cassandrareaper/service/MetricsGrabber.java +++ b/src/server/src/main/java/io/cassandrareaper/service/MetricsGrabber.java @@ -24,7 +24,7 @@ import io.cassandrareaper.core.MetricsHistogram; import io.cassandrareaper.core.Node; import io.cassandrareaper.core.ThreadPoolStat; -import io.cassandrareaper.jmx.JmxProxy; +import io.cassandrareaper.jmx.MetricsProxy; import java.io.IOException; import java.util.List; @@ -55,11 +55,10 @@ public static MetricsGrabber create(AppContext context) { public List getTpStats(Node host) throws ReaperException { try { - JmxProxy jmxProxy - = context.jmxConnectionFactory.connect(host, context.config.getJmxConnectionTimeoutInSeconds()); - - return convertToThreadPoolStats(jmxProxy.collectTpStats()); - } catch (JMException | RuntimeException | InterruptedException | IOException e) { + int jmxTimeout = context.config.getJmxConnectionTimeoutInSeconds(); + MetricsProxy proxy = MetricsProxy.create(context.jmxConnectionFactory.connect(host, jmxTimeout)); + return convertToThreadPoolStats(proxy.collectTpStats()); + } catch (JMException | InterruptedException | IOException e) { LOG.error("Failed collecting tpstats for host {}", host, e); throw new ReaperException(e); } @@ -81,11 +80,10 @@ public List convertToThreadPoolStats(Map> public List getDroppedMessages(Node host) throws ReaperException { try { - JmxProxy jmxProxy - = context.jmxConnectionFactory.connect(host, context.config.getJmxConnectionTimeoutInSeconds()); - - return convertToDroppedMessages(jmxProxy.collectDroppedMessages()); - } catch (JMException | RuntimeException | InterruptedException | IOException e) { + int jmxTimeout = context.config.getJmxConnectionTimeoutInSeconds(); + MetricsProxy proxy = MetricsProxy.create(context.jmxConnectionFactory.connect(host, jmxTimeout)); + return convertToDroppedMessages(proxy.collectDroppedMessages()); + } catch (JMException | InterruptedException | IOException e) { LOG.error("Failed collecting tpstats for host {}", host, e); throw new ReaperException(e); } @@ -107,11 +105,10 @@ public List convertToDroppedMessages(Map> public List getClientRequestLatencies(Node host) throws ReaperException { try { - JmxProxy jmxProxy - = context.jmxConnectionFactory.connect(host, context.config.getJmxConnectionTimeoutInSeconds()); - - return convertToMetricsHistogram(jmxProxy.collectLatencyMetrics()); - } catch (JMException | RuntimeException | InterruptedException | IOException e) { + int jmxTimeout = context.config.getJmxConnectionTimeoutInSeconds(); + MetricsProxy proxy = MetricsProxy.create(context.jmxConnectionFactory.connect(host, jmxTimeout)); + return convertToMetricsHistogram(proxy.collectLatencyMetrics()); + } catch (JMException | InterruptedException | IOException e) { LOG.error("Failed collecting tpstats for host {}", host, e); throw new ReaperException(e); } diff --git a/src/server/src/test/java/io/cassandrareaper/jmx/JmxProxyTest.java b/src/server/src/test/java/io/cassandrareaper/jmx/JmxProxyTest.java index b3d01a209..d14de0021 100644 --- a/src/server/src/test/java/io/cassandrareaper/jmx/JmxProxyTest.java +++ b/src/server/src/test/java/io/cassandrareaper/jmx/JmxProxyTest.java @@ -19,6 +19,8 @@ import io.cassandrareaper.ReaperException; +import javax.management.MBeanServerConnection; + import com.google.common.base.Preconditions; import org.apache.cassandra.locator.EndpointSnitchInfoMBean; import org.apache.cassandra.service.StorageServiceMBean; @@ -34,6 +36,11 @@ public static JmxProxy mockJmxProxyImpl() { return Mockito.mock(JmxProxyImpl.class); } + public static void mockGetMBeanServerConnection(JmxProxy proxy, MBeanServerConnection serverConnection) { + Preconditions.checkArgument(proxy instanceof JmxProxyImpl, "only JmxProxyImpl is supported"); + Mockito.when(((JmxProxyImpl)proxy).getMBeanServerConnection()).thenReturn(serverConnection); + } + public static void mockGetStorageServiceMBean(JmxProxy proxy, StorageServiceMBean storageMBean) { Preconditions.checkArgument(proxy instanceof JmxProxyImpl, "only JmxProxyImpl is supported"); Mockito.when(((JmxProxyImpl)proxy).getStorageServiceMBean()).thenReturn(storageMBean); diff --git a/src/server/src/test/java/io/cassandrareaper/service/MetricsGrabberTest.java b/src/server/src/test/java/io/cassandrareaper/service/MetricsGrabberTest.java index a766e40a6..a6263533b 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/MetricsGrabberTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/MetricsGrabberTest.java @@ -20,68 +20,54 @@ import io.cassandrareaper.AppContext; import io.cassandrareaper.ReaperApplicationConfiguration; import io.cassandrareaper.ReaperException; -import io.cassandrareaper.core.Cluster; import io.cassandrareaper.core.DroppedMessages; import io.cassandrareaper.core.JmxStat; import io.cassandrareaper.core.Node; import io.cassandrareaper.core.ThreadPoolStat; import io.cassandrareaper.jmx.JmxConnectionFactory; import io.cassandrareaper.jmx.JmxProxy; -import io.cassandrareaper.storage.IStorage; +import io.cassandrareaper.jmx.JmxProxyTest; import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.TreeSet; import javax.management.JMException; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.junit.Test; +import org.mockito.Mockito; import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class MetricsGrabberTest { @Test - public void testGetTpstats() throws InterruptedException, ReaperException, JMException, IOException { - AppContext context = new AppContext(); - context.config = new ReaperApplicationConfiguration(); - context.config.setJmxConnectionTimeoutInSeconds(10); - - context.storage = mock(IStorage.class); - Cluster cluster = new Cluster("testCluster", "murmur3", new TreeSet<>(Arrays.asList("127.0.0.1"))); - when(context.storage.getCluster(anyString())).thenReturn(Optional.of(cluster)); - - final JmxProxy jmx = mock(JmxProxy.class); - when(jmx.getLiveNodes()).thenReturn(Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.3")); + public void testGetTpstats() + throws InterruptedException, ReaperException, JMException, IOException, ClassNotFoundException { + + AppContext cxt = new AppContext(); + cxt.config = new ReaperApplicationConfiguration(); + cxt.config.setJmxConnectionTimeoutInSeconds(10); + cxt.jmxConnectionFactory = mock(JmxConnectionFactory.class); + JmxProxy jmx = (JmxProxy) mock(Class.forName("io.cassandrareaper.jmx.JmxProxyImpl")); + when(cxt.jmxConnectionFactory.connect(Mockito.any(Node.class), Mockito.anyInt())).thenReturn(jmx); + MBeanServerConnection serverConn = mock(MBeanServerConnection.class); + JmxProxyTest.mockGetMBeanServerConnection(jmx, serverConn); + + // @todo capture objectName and return valid set of objectNames, + // to properly test MetricsProxy.collectMetrics(..) and MetricsGrabber.convertToThreadPoolStats(..) + when(serverConn.queryNames(Mockito.any(ObjectName.class), Mockito.isNull())).thenReturn(Collections.emptySet()); - final MetricsGrabber metricsGrabber = MetricsGrabber.create(context); - context.jmxConnectionFactory = - new JmxConnectionFactory() { - @Override - public JmxProxy connect(Node host, int connectionTimeout) - throws ReaperException, InterruptedException { - return jmx; - } - - @Override - public JmxProxy connectAny(Cluster cluster, int connectionTimeout) - throws ReaperException { - return jmx; - } - }; Node node = Node.builder().withClusterName("test").withHostname("127.0.0.1").build(); - metricsGrabber.getTpStats(node); - verify(jmx, times(1)).collectTpStats(); + MetricsGrabber.create(cxt).getTpStats(node); + Mockito.verify(serverConn, Mockito.times(2)).queryNames(Mockito.any(ObjectName.class), Mockito.isNull()); } @Test @@ -148,36 +134,25 @@ public void testConvertToThreadPoolStats() { } @Test - public void testGetDroppedMessages() throws InterruptedException, ReaperException, JMException, IOException { - AppContext context = new AppContext(); - context.config = new ReaperApplicationConfiguration(); - context.config.setJmxConnectionTimeoutInSeconds(10); - - context.storage = mock(IStorage.class); - Cluster cluster = new Cluster("testCluster", "murmur3", new TreeSet<>(Arrays.asList("127.0.0.1"))); - when(context.storage.getCluster(anyString())).thenReturn(Optional.of(cluster)); + public void testGetDroppedMessages() + throws InterruptedException, ReaperException, JMException, IOException, ClassNotFoundException { + + AppContext cxt = new AppContext(); + cxt.config = new ReaperApplicationConfiguration(); + cxt.config.setJmxConnectionTimeoutInSeconds(10); + cxt.jmxConnectionFactory = mock(JmxConnectionFactory.class); + JmxProxy jmx = (JmxProxy) mock(Class.forName("io.cassandrareaper.jmx.JmxProxyImpl")); + when(cxt.jmxConnectionFactory.connect(Mockito.any(Node.class), Mockito.anyInt())).thenReturn(jmx); + MBeanServerConnection serverConn = mock(MBeanServerConnection.class); + JmxProxyTest.mockGetMBeanServerConnection(jmx, serverConn); + + // @todo capture objectName and return valid set of objectNames, + // to properly test MetricsProxy.collectMetrics(..) and MetricsGrabber.convertToDroppedMessages(..) + when(serverConn.queryNames(Mockito.any(ObjectName.class), Mockito.isNull())).thenReturn(Collections.emptySet()); - final JmxProxy jmx = mock(JmxProxy.class); - when(jmx.getLiveNodes()).thenReturn(Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.3")); - - final MetricsGrabber metricsGrabber = MetricsGrabber.create(context); - context.jmxConnectionFactory = - new JmxConnectionFactory() { - @Override - public JmxProxy connect(Node host, int connectionTimeout) - throws ReaperException, InterruptedException { - return jmx; - } - - @Override - public JmxProxy connectAny(Cluster cluster, int connectionTimeout) - throws ReaperException { - return jmx; - } - }; Node node = Node.builder().withClusterName("test").withHostname("127.0.0.1").build(); - metricsGrabber.getDroppedMessages(node); - verify(jmx, times(1)).collectDroppedMessages(); + MetricsGrabber.create(cxt).getDroppedMessages(node); + Mockito.verify(serverConn, Mockito.times(1)).queryNames(Mockito.any(ObjectName.class), Mockito.isNull()); } @Test @@ -235,4 +210,25 @@ public void testConvertToDroppedMessages() { assertEquals(5, dropped.getCount().intValue()); } + @Test + public void testGetClientRequestLatencies() + throws InterruptedException, ReaperException, JMException, IOException, ClassNotFoundException { + + AppContext cxt = new AppContext(); + cxt.config = new ReaperApplicationConfiguration(); + cxt.config.setJmxConnectionTimeoutInSeconds(10); + cxt.jmxConnectionFactory = mock(JmxConnectionFactory.class); + JmxProxy jmx = (JmxProxy) mock(Class.forName("io.cassandrareaper.jmx.JmxProxyImpl")); + when(cxt.jmxConnectionFactory.connect(Mockito.any(Node.class), Mockito.anyInt())).thenReturn(jmx); + MBeanServerConnection serverConn = mock(MBeanServerConnection.class); + JmxProxyTest.mockGetMBeanServerConnection(jmx, serverConn); + + // @todo capture objectName and return valid set of objectNames, + // to properly test MetricsProxy.collectMetrics(..) and MetricsGrabber.convertToMetricsHistogram(..) + when(serverConn.queryNames(Mockito.any(ObjectName.class), Mockito.isNull())).thenReturn(Collections.emptySet()); + + Node node = Node.builder().withClusterName("test").withHostname("127.0.0.1").build(); + MetricsGrabber.create(cxt).getClientRequestLatencies(node); + Mockito.verify(serverConn, Mockito.times(1)).queryNames(Mockito.any(ObjectName.class), Mockito.isNull()); + } }