diff --git a/src/server/src/main/java/io/cassandrareaper/AppContext.java b/src/server/src/main/java/io/cassandrareaper/AppContext.java index c72af4516..abdd0ed0c 100644 --- a/src/server/src/main/java/io/cassandrareaper/AppContext.java +++ b/src/server/src/main/java/io/cassandrareaper/AppContext.java @@ -15,6 +15,7 @@ package io.cassandrareaper; import io.cassandrareaper.jmx.JmxConnectionFactory; +import io.cassandrareaper.service.MetricsGrabber; import io.cassandrareaper.service.PurgeManager; import io.cassandrareaper.service.RepairManager; import io.cassandrareaper.service.SnapshotManager; @@ -48,6 +49,7 @@ public final class AppContext { public MetricRegistry metricRegistry = new MetricRegistry(); public SnapshotManager snapshotManager; public PurgeManager purgeManager; + public MetricsGrabber metricsGrabber; private static String initialiseInstanceAddress() { String reaperInstanceAddress; diff --git a/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java b/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java index 5462a9f75..de729aadc 100644 --- a/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java +++ b/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java @@ -19,6 +19,7 @@ import io.cassandrareaper.jmx.JmxConnectionFactory; import io.cassandrareaper.jmx.JmxConnectionsInitializer; import io.cassandrareaper.resources.ClusterResource; +import io.cassandrareaper.resources.NodeStatsResource; import io.cassandrareaper.resources.PingResource; import io.cassandrareaper.resources.ReaperHealthCheck; import io.cassandrareaper.resources.RepairRunResource; @@ -27,6 +28,7 @@ import io.cassandrareaper.resources.auth.LoginResource; import io.cassandrareaper.resources.auth.ShiroExceptionMapper; import io.cassandrareaper.service.AutoSchedulingManager; +import io.cassandrareaper.service.MetricsGrabber; import io.cassandrareaper.service.PurgeManager; import io.cassandrareaper.service.RepairManager; import io.cassandrareaper.service.SchedulingManager; @@ -158,6 +160,9 @@ public void run(ReaperApplicationConfiguration config, Environment environment) context, environment.lifecycle().executorService("SnapshotManager").minThreads(5).maxThreads(5).build()); + context.snapshotManager = SnapshotManager.create(context); + context.metricsGrabber = MetricsGrabber.create(context); + int repairThreads = config.getRepairRunThreadCount(); LOG.info("initializing runner thread pool with {} threads", repairThreads); @@ -235,6 +240,9 @@ public void run(ReaperApplicationConfiguration config, Environment environment) final SnapshotResource snapshotResource = new SnapshotResource(context); environment.jersey().register(snapshotResource); + final NodeStatsResource nodeStatsResource = new NodeStatsResource(context); + environment.jersey().register(nodeStatsResource); + if (config.isAccessControlEnabled()) { SessionHandler sessionHandler = new SessionHandler(); sessionHandler.setMaxInactiveInterval((int) config.getAccessControl().getSessionTimeout().getSeconds()); diff --git a/src/server/src/main/java/io/cassandrareaper/core/DroppedMessages.java b/src/server/src/main/java/io/cassandrareaper/core/DroppedMessages.java new file mode 100644 index 000000000..e9d7d8835 --- /dev/null +++ b/src/server/src/main/java/io/cassandrareaper/core/DroppedMessages.java @@ -0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.core; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; + +@JsonDeserialize(builder = DroppedMessages.Builder.class) +public final class DroppedMessages { + private final String name; + private final Integer count; + private final Double oneMinuteRate; + private final Double fiveMinuteRate; + private final Double fifteenMinuteRate; + private final Double meanRate; + + private DroppedMessages(Builder builder) { + this.name = builder.name; + this.count = builder.count; + this.oneMinuteRate = builder.oneMinuteRate; + this.fiveMinuteRate = builder.fiveMinuteRate; + this.fifteenMinuteRate = builder.fifteenMinuteRate; + this.meanRate = builder.meanRate; + } + + public String getName() { + return name; + } + + public Integer getCount() { + return count; + } + + public Double getOneMinuteRate() { + return oneMinuteRate; + } + + public Double getFiveMinuteRate() { + return fiveMinuteRate; + } + + public Double getFifteenMinuteRate() { + return fifteenMinuteRate; + } + + public Double getMeanRate() { + return meanRate; + } + + + public static Builder builder() { + return new Builder(); + } + + @JsonPOJOBuilder(buildMethodName = "build", withPrefix = "with") + public static final class Builder { + private String name; + private Integer count; + private Double oneMinuteRate; + private Double fiveMinuteRate; + private Double fifteenMinuteRate; + private Double meanRate; + + private Builder() {} + + public Builder withName(String name) { + this.name = name; + return this; + } + + public Builder withCount(Integer count) { + this.count = count; + return this; + } + + public Builder withOneMinuteRate(Double oneMinuteRate) { + this.oneMinuteRate = oneMinuteRate; + return this; + } + + public Builder withFiveMinuteRate(Double fiveMinuteRate) { + this.fiveMinuteRate = fiveMinuteRate; + return this; + } + + public Builder withFifteenMinuteRate(Double fifteenMinuteRate) { + this.fifteenMinuteRate = fifteenMinuteRate; + return this; + } + + public Builder withMeanRate(Double meanRate) { + this.meanRate = meanRate; + return this; + } + + public DroppedMessages build() { + return new DroppedMessages(this); + } + } +} diff --git a/src/server/src/main/java/io/cassandrareaper/core/JmxStat.java b/src/server/src/main/java/io/cassandrareaper/core/JmxStat.java new file mode 100644 index 000000000..057a2a66d --- /dev/null +++ b/src/server/src/main/java/io/cassandrareaper/core/JmxStat.java @@ -0,0 +1,93 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.core; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; + +@JsonDeserialize(builder = JmxStat.Builder.class) +public final class JmxStat { + private final String scope; + private final String name; + private final String attribute; + private final Double value; + + private JmxStat(Builder builder) { + this.scope = builder.scope; + this.name = builder.name; + this.attribute = builder.attribute; + this.value = builder.value; + } + + public String getScope() { + return scope; + } + + public String getName() { + return name; + } + + public String getAttribute() { + return attribute; + } + + public Double getValue() { + return value; + } + + + @Override + public String toString() { + return scope + "/" + name + "/" + attribute + " = " + value; + } + + public static Builder builder() { + return new Builder(); + } + + @JsonPOJOBuilder(buildMethodName = "build", withPrefix = "with") + public static final class Builder { + private String scope; + private String name; + private String attribute; + private Double value; + + private Builder() {} + + public Builder withScope(String scope) { + this.scope = scope; + return this; + } + + public Builder withName(String name) { + this.name = name; + return this; + } + + public Builder withAttribute(String attribute) { + this.attribute = attribute; + return this; + } + + public Builder withValue(Double value) { + this.value = value; + return this; + } + + public JmxStat build() { + return new JmxStat(this); + } + } +} \ No newline at end of file diff --git a/src/server/src/main/java/io/cassandrareaper/core/MetricsHistogram.java b/src/server/src/main/java/io/cassandrareaper/core/MetricsHistogram.java new file mode 100644 index 000000000..b031aca34 --- /dev/null +++ b/src/server/src/main/java/io/cassandrareaper/core/MetricsHistogram.java @@ -0,0 +1,244 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.core; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; + +@JsonDeserialize(builder = MetricsHistogram.Builder.class) +public final class MetricsHistogram { + private final String name; + private final String type; + private final Double p50; + private final Double p75; + private final Double p95; + private final Double p98; + private final Double p99; + private final Double p999; + private final Double min; + private final Double mean; + private final Double max; + private final Integer count; + private final Double oneMinuteRate; + private final Double fiveMinuteRate; + private final Double fifteenMinuteRate; + private final Double meanRate; + private final Double stdDev; + + private MetricsHistogram(Builder builder) { + this.name = builder.name; + this.type = builder.type; + this.p50 = builder.p50; + this.p75 = builder.p75; + this.p95 = builder.p95; + this.p98 = builder.p98; + this.p99 = builder.p99; + this.p999 = builder.p999; + this.min = builder.min; + this.mean = builder.mean; + this.max = builder.max; + this.count = builder.count; + this.oneMinuteRate = builder.oneMinuteRate; + this.fiveMinuteRate = builder.fiveMinuteRate; + this.fifteenMinuteRate = builder.fifteenMinuteRate; + this.meanRate = builder.meanRate; + this.stdDev = builder.stdDev; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + public Double getP50() { + return p50; + } + + public Double getP75() { + return p75; + } + + public Double getP95() { + return p95; + } + + public Double getP98() { + return p98; + } + + public Double getP99() { + return p99; + } + + public Double getP999() { + return p999; + } + + public Double getMin() { + return min; + } + + public Double getMean() { + return mean; + } + + public Double getMax() { + return max; + } + + public Integer getCount() { + return count; + } + + public Double getOneMinuteRate() { + return oneMinuteRate; + } + + public Double getFiveMinuteRate() { + return fiveMinuteRate; + } + + public Double getFifteenMinuteRate() { + return fifteenMinuteRate; + } + + public Double getMeanRate() { + return meanRate; + } + + public Double getStdDev() { + return stdDev; + } + + public static Builder builder() { + return new Builder(); + } + + @JsonPOJOBuilder(buildMethodName = "build", withPrefix = "with") + public static final class Builder { + private String name; + private String type; + private Double p50; + private Double p75; + private Double p95; + private Double p98; + private Double p99; + private Double p999; + private Double min; + private Double mean; + private Double max; + private Integer count; + private Double oneMinuteRate; + private Double fiveMinuteRate; + private Double fifteenMinuteRate; + private Double meanRate; + private Double stdDev; + + private Builder() {} + + public Builder withName(String name) { + this.name = name; + return this; + } + + public Builder withType(String type) { + this.type = type; + return this; + } + + public Builder withP50(Double p50) { + this.p50 = p50; + return this; + } + + public Builder withP75(Double p75) { + this.p75 = p75; + return this; + } + + public Builder withP95(Double p95) { + this.p95 = p95; + return this; + } + + public Builder withP98(Double p98) { + this.p98 = p98; + return this; + } + + public Builder withP99(Double p99) { + this.p99 = p99; + return this; + } + + public Builder withP999(Double p999) { + this.p999 = p999; + return this; + } + + public Builder withMin(Double min) { + this.min = min; + return this; + } + + public Builder withMean(Double mean) { + this.mean = mean; + return this; + } + + public Builder withMax(Double max) { + this.max = max; + return this; + } + + public Builder withCount(Integer count) { + this.count = count; + return this; + } + + public Builder withOneMinuteRate(Double oneMinuteRate) { + this.oneMinuteRate = oneMinuteRate; + return this; + } + + public Builder withFiveMinuteRate(Double fiveMinuteRate) { + this.fiveMinuteRate = fiveMinuteRate; + return this; + } + + public Builder withFifteenMinuteRate(Double fifteenMinuteRate) { + this.fifteenMinuteRate = fifteenMinuteRate; + return this; + } + + public Builder withMeanRate(Double meanRate) { + this.meanRate = meanRate; + return this; + } + + public Builder withStdDev(Double stdDev) { + this.stdDev = stdDev; + return this; + } + + public MetricsHistogram build() { + return new MetricsHistogram(this); + } + } + +} diff --git a/src/server/src/main/java/io/cassandrareaper/core/ThreadPoolStat.java b/src/server/src/main/java/io/cassandrareaper/core/ThreadPoolStat.java new file mode 100644 index 000000000..5e1585e74 --- /dev/null +++ b/src/server/src/main/java/io/cassandrareaper/core/ThreadPoolStat.java @@ -0,0 +1,125 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.core; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; + +@JsonDeserialize(builder = ThreadPoolStat.Builder.class) +public final class ThreadPoolStat { + private final String name; + private final Integer activeTasks; + private final Integer pendingTasks; + private final Integer completedTasks; + private final Integer currentlyBlockedTasks; + private final Integer totalBlockedTasks; + private final Integer maxPoolSize; + + private ThreadPoolStat(Builder builder) { + this.name = builder.name; + this.activeTasks = builder.activeTasks; + this.pendingTasks = builder.pendingTasks; + this.currentlyBlockedTasks = builder.currentlyBlockedTasks; + this.completedTasks = builder.completedTasks; + this.totalBlockedTasks = builder.totalBlockedTasks; + this.maxPoolSize = builder.maxPoolSize; + } + + public String getName() { + return name; + } + + public Integer getActiveTasks() { + return activeTasks; + } + + public Integer getPendingTasks() { + return pendingTasks; + } + + public Integer getCurrentlyBlockedTasks() { + return currentlyBlockedTasks; + } + + public Integer getCompletedTasks() { + return completedTasks; + } + + public Integer getTotalBlockedTasks() { + return totalBlockedTasks; + } + + public Integer getMaxPoolSize() { + return maxPoolSize; + } + + + + public static Builder builder() { + return new Builder(); + } + + @JsonPOJOBuilder(buildMethodName = "build", withPrefix = "with") + public static final class Builder { + private String name; + private Integer activeTasks; + private Integer pendingTasks; + private Integer currentlyBlockedTasks; + private Integer completedTasks; + private Integer totalBlockedTasks; + private Integer maxPoolSize; + + private Builder() {} + + public Builder withName(String name) { + this.name = name; + return this; + } + + public Builder withActiveTasks(Integer activeTasks) { + this.activeTasks = activeTasks; + return this; + } + + public Builder withPendingTasks(Integer pendingTasks) { + this.pendingTasks = pendingTasks; + return this; + } + + public Builder withCurrentlyBlockedTasks(Integer currentlyBlockedTasks) { + this.currentlyBlockedTasks = currentlyBlockedTasks; + return this; + } + + public Builder withCompletedTasks(Integer completedTasks) { + this.completedTasks = completedTasks; + return this; + } + + public Builder withTotalBlockedTasks(Integer totalBlockedTasks) { + this.totalBlockedTasks = totalBlockedTasks; + return this; + } + + public Builder withMaxPoolSize(Integer maxPoolSize) { + this.maxPoolSize = maxPoolSize; + return this; + } + + public ThreadPoolStat build() { + return new ThreadPoolStat(this); + } + } +} diff --git a/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxy.java b/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxy.java index 1bfb71769..123d88692 100644 --- a/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxy.java +++ b/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxy.java @@ -15,10 +15,12 @@ package io.cassandrareaper.jmx; import io.cassandrareaper.ReaperException; +import io.cassandrareaper.core.JmxStat; import io.cassandrareaper.core.Segment; import io.cassandrareaper.core.Snapshot; import io.cassandrareaper.service.RingRange; +import java.io.IOException; import java.math.BigInteger; import java.time.Duration; import java.util.Collection; @@ -27,7 +29,9 @@ import java.util.Set; import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; import javax.management.NotificationListener; import javax.management.ReflectionException; import javax.validation.constraints.NotNull; @@ -146,4 +150,16 @@ int triggerRepair( void takeColumnFamilySnapshot(String keyspaceName, String columnFamilyName, String snapshotName) throws ReaperException; + + Map> collectTpStats() + throws MalformedObjectNameException, IOException, AttributeNotFoundException, + InstanceNotFoundException, MBeanException, ReflectionException; + + Map> collectDroppedMessages() + throws MalformedObjectNameException, IOException, AttributeNotFoundException, + InstanceNotFoundException, MBeanException, ReflectionException; + + Map> collectLatencyMetrics() + throws MalformedObjectNameException, IOException, AttributeNotFoundException, + InstanceNotFoundException, MBeanException, ReflectionException; } diff --git a/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxyImpl.java b/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxyImpl.java index 335be5f90..a8bdf7bfe 100644 --- a/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxyImpl.java +++ b/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxyImpl.java @@ -16,6 +16,7 @@ import io.cassandrareaper.ReaperException; import io.cassandrareaper.core.Cluster; +import io.cassandrareaper.core.JmxStat; import io.cassandrareaper.core.Segment; import io.cassandrareaper.core.Snapshot; import io.cassandrareaper.core.Snapshot.Builder; @@ -29,11 +30,13 @@ import java.net.UnknownHostException; import java.rmi.server.RMIClientSocketFactory; import java.rmi.server.RMISocketFactory; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -46,11 +49,16 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import javax.management.Attribute; +import javax.management.AttributeList; import javax.management.AttributeNotFoundException; import javax.management.InstanceNotFoundException; +import javax.management.JMException; import javax.management.JMX; import javax.management.ListenerNotFoundException; +import javax.management.MBeanAttributeInfo; import javax.management.MBeanException; +import javax.management.MBeanInfo; import javax.management.MBeanServerConnection; import javax.management.MalformedObjectNameException; import javax.management.Notification; @@ -72,6 +80,7 @@ import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.net.HostAndPort; import org.apache.cassandra.db.ColumnFamilyStoreMBean; import org.apache.cassandra.gms.FailureDetector; @@ -1135,4 +1144,113 @@ public void takeColumnFamilySnapshot( throw new ReaperException(e); } } + + /** + * Collects all attributes for a given set of JMX beans. + * + * @param beans the list of beans to collect through JMX + * @return a map with a key for each bean and a list of jmx stat in generic format. + */ + private Map> collectMetrics(List beans) + throws MalformedObjectNameException, IOException, AttributeNotFoundException, + InstanceNotFoundException, MBeanException, ReflectionException { + List> allStats = Lists.newArrayList(); + Set beanSet = Sets.newLinkedHashSet(); + for (String bean : beans) { + beanSet.addAll(mbeanServer.queryNames(new ObjectName(bean), null)); + } + + for (Object bean : beanSet) { + ObjectName objName = (ObjectName) bean; + List attributes = scrapeBean(objName); + allStats.add(attributes); + } + + List flatStatList = + allStats.stream().flatMap(attr -> attr.stream()).collect(Collectors.toList()); + + // Group the stats by scope to ease displaying/manipulating the data + Map> groupedStatList = + flatStatList.stream().collect(Collectors.groupingBy(JmxStat::getScope)); + + return groupedStatList; + } + + @Override + public Map> collectTpStats() + throws MalformedObjectNameException, IOException, AttributeNotFoundException, + InstanceNotFoundException, MBeanException, ReflectionException { + + return collectMetrics( + Arrays.asList( + "org.apache.cassandra.metrics:type=ThreadPools,path=request,*", + "org.apache.cassandra.metrics:type=ThreadPools,path=internal,*")); + } + + @Override + public Map> collectDroppedMessages() + throws MalformedObjectNameException, IOException, AttributeNotFoundException, + InstanceNotFoundException, MBeanException, ReflectionException { + + return collectMetrics(Arrays.asList("org.apache.cassandra.metrics:type=DroppedMessage,*")); + } + + @Override + public Map> collectLatencyMetrics() + throws MalformedObjectNameException, IOException, AttributeNotFoundException, + InstanceNotFoundException, MBeanException, ReflectionException { + + Map> metrics = + collectMetrics(Arrays.asList("org.apache.cassandra.metrics:type=ClientRequest,*")); + LOG.info("latencies : {}", metrics); + + return metrics; + } + + private List scrapeBean(ObjectName mbeanName) { + MBeanInfo info; + List attributeList = Lists.newArrayList(); + try { + info = mbeanServer.getMBeanInfo(mbeanName); + } catch (IOException e) { + return attributeList; + } catch (JMException e) { + LOG.error(mbeanName.toString(), "getMBeanInfo Fail: " + e); + return attributeList; + } + MBeanAttributeInfo[] attrInfos = info.getAttributes(); + + Map name2AttrInfo = new LinkedHashMap(); + for (MBeanAttributeInfo attrInfo : attrInfos) { + MBeanAttributeInfo attr = attrInfo; + if (!attr.isReadable()) { + LOG.warn("{}.{} not readable", mbeanName, attr); + continue; + } + name2AttrInfo.put(attr.getName(), attr); + } + final AttributeList attributes; + try { + attributes = + mbeanServer.getAttributes(mbeanName, name2AttrInfo.keySet().toArray(new String[0])); + } catch (RuntimeException | InstanceNotFoundException | ReflectionException | IOException e) { + LOG.error("Fail grabbing attributes for mbean {} ", mbeanName, e); + return attributeList; + } + for (Attribute attribute : attributes.asList()) { + Object value = attribute.getValue(); + JmxStat.Builder jmxStatBuilder = + JmxStat.builder() + .withAttribute(attribute.getName()) + .withName(mbeanName.getKeyProperty("name")) + .withScope(mbeanName.getKeyProperty("scope")); + if (value == null) { + attributeList.add(jmxStatBuilder.withValue(0.0).build()); + } else if (value instanceof Number) { + attributeList.add(jmxStatBuilder.withValue(((Number) value).doubleValue()).build()); + } + } + + return attributeList; + } } diff --git a/src/server/src/main/java/io/cassandrareaper/resources/NodeStatsResource.java b/src/server/src/main/java/io/cassandrareaper/resources/NodeStatsResource.java new file mode 100644 index 000000000..02892e1f0 --- /dev/null +++ b/src/server/src/main/java/io/cassandrareaper/resources/NodeStatsResource.java @@ -0,0 +1,107 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.resources; + +import io.cassandrareaper.AppContext; +import io.cassandrareaper.ReaperException; +import io.cassandrareaper.core.Node; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Path("/node") +@Produces(MediaType.APPLICATION_JSON) +public final class NodeStatsResource { + + private static final Logger LOG = LoggerFactory.getLogger(NodeStatsResource.class); + + private final AppContext context; + + public NodeStatsResource(AppContext context) { + this.context = context; + } + + /** + * Endpoint used to collect thread pool stats for a node. + * + * @return a list of thread pools if ok, and a status code 500 in case of errors. + */ + @GET + @Path("/tpstats/{clusterName}/{host}") + public Response getTpStats( + @Context UriInfo uriInfo, + @PathParam("clusterName") String clusterName, + @PathParam("host") String host) { + + try { + Node node = Node.builder().withClusterName(clusterName).withHostname(host).build(); + return Response.ok().entity(context.metricsGrabber.getTpStats(node)).build(); + } catch (RuntimeException | ReaperException e) { + LOG.error(e.getMessage(), e); + return Response.serverError().entity(e.getMessage()).build(); + } + } + + /** + * Endpoint used to collect dropped messages stats for a node. + * + * @return a list of dropped messages metrics if ok, and a status code 500 in case of errors. + */ + @GET + @Path("/dropped/{clusterName}/{host}") + public Response getDroppedMessages( + @Context UriInfo uriInfo, + @PathParam("clusterName") String clusterName, + @PathParam("host") String host) { + + try { + Node node = Node.builder().withClusterName(clusterName).withHostname(host).build(); + return Response.ok().entity(context.metricsGrabber.getDroppedMessages(node)).build(); + } catch (RuntimeException | ReaperException e) { + LOG.error(e.getMessage(), e); + return Response.serverError().entity(e.getMessage()).build(); + } + } + + /** + * Endpoint used to collect client request latencies for a node. + * + * @return a list of latency histograms if ok, and a status code 500 in case of errors. + */ + @GET + @Path("/clientRequestLatencies/{clusterName}/{host}") + public Response getClientRequestLatencies( + @Context UriInfo uriInfo, + @PathParam("clusterName") String clusterName, + @PathParam("host") String host) { + + try { + Node node = Node.builder().withClusterName(clusterName).withHostname(host).build(); + return Response.ok().entity(context.metricsGrabber.getClientRequestLatencies(node)).build(); + } catch (RuntimeException | ReaperException e) { + LOG.error(e.getMessage(), e); + return Response.serverError().entity(e.getMessage()).build(); + } + } +} diff --git a/src/server/src/main/java/io/cassandrareaper/service/MetricsGrabber.java b/src/server/src/main/java/io/cassandrareaper/service/MetricsGrabber.java new file mode 100644 index 000000000..0315743b7 --- /dev/null +++ b/src/server/src/main/java/io/cassandrareaper/service/MetricsGrabber.java @@ -0,0 +1,222 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.service; + +import io.cassandrareaper.AppContext; +import io.cassandrareaper.ReaperException; +import io.cassandrareaper.core.DroppedMessages; +import io.cassandrareaper.core.JmxStat; +import io.cassandrareaper.core.MetricsHistogram; +import io.cassandrareaper.core.Node; +import io.cassandrareaper.core.ThreadPoolStat; +import io.cassandrareaper.core.ThreadPoolStat.Builder; +import io.cassandrareaper.jmx.JmxProxy; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.ReflectionException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class MetricsGrabber { + + private static final Logger LOG = LoggerFactory.getLogger(MetricsGrabber.class); + + private final AppContext context; + private final ExecutorService executor = Executors.newFixedThreadPool(5); + + private MetricsGrabber(AppContext context) { + this.context = context; + } + + public static MetricsGrabber create(AppContext context) { + return new MetricsGrabber(context); + } + + public List getTpStats(Node host) throws ReaperException { + try { + JmxProxy jmxProxy + = context.jmxConnectionFactory.connect(host, context.config.getJmxConnectionTimeoutInSeconds()); + + return convertToThreadPoolStats(jmxProxy.collectTpStats()); + } catch (RuntimeException + | InterruptedException + | MalformedObjectNameException + | AttributeNotFoundException + | InstanceNotFoundException + | MBeanException + | ReflectionException + | IOException e) { + LOG.error("Failed collecting tpstats for host {}", host, e); + throw new ReaperException(e); + } + } + + @VisibleForTesting + public List convertToThreadPoolStats(Map> jmxStats) { + List tpstats = Lists.newArrayList(); + for (Entry> pool : jmxStats.entrySet()) { + Builder builder = ThreadPoolStat.builder().withName(pool.getKey()); + for (JmxStat stat : pool.getValue()) { + if (stat.getName().equals("MaxPoolSize")) { + builder.withMaxPoolSize(stat.getValue().intValue()); + } else if (stat.getName().equals("TotalBlockedTasks")) { + builder.withTotalBlockedTasks(stat.getValue().intValue()); + } else if (stat.getName().equals("PendingTasks")) { + builder.withPendingTasks(stat.getValue().intValue()); + } else if (stat.getName().equals("CurrentlyBlockedTasks")) { + builder.withCurrentlyBlockedTasks(stat.getValue().intValue()); + } else if (stat.getName().equals("CompletedTasks")) { + builder.withCompletedTasks(stat.getValue().intValue()); + } else if (stat.getName().equals("ActiveTasks")) { + builder.withActiveTasks(stat.getValue().intValue()); + } + } + tpstats.add(builder.build()); + } + + return tpstats; + } + + public List getDroppedMessages(Node host) throws ReaperException { + try { + JmxProxy jmxProxy = + context.jmxConnectionFactory.connect( + host, context.config.getJmxConnectionTimeoutInSeconds()); + + return convertToDroppedMessages(jmxProxy.collectDroppedMessages()); + } catch (RuntimeException + | InterruptedException + | MalformedObjectNameException + | AttributeNotFoundException + | InstanceNotFoundException + | MBeanException + | ReflectionException + | IOException e) { + LOG.error("Failed collecting tpstats for host {}", host, e); + throw new ReaperException(e); + } + } + + @VisibleForTesting + public List convertToDroppedMessages(Map> jmxStats) { + List droppedMessages = Lists.newArrayList(); + for (Entry> pool : jmxStats.entrySet()) { + DroppedMessages.Builder builder = DroppedMessages.builder().withName(pool.getKey()); + for (JmxStat stat : pool.getValue()) { + if (stat.getAttribute().equals("Count")) { + builder.withCount(stat.getValue().intValue()); + } else if (stat.getAttribute().equals("OneMinuteRate")) { + builder.withOneMinuteRate(stat.getValue()); + } else if (stat.getAttribute().equals("FiveMinuteRate")) { + builder.withFiveMinuteRate(stat.getValue()); + } else if (stat.getAttribute().equals("FifteenMinuteRate")) { + builder.withFifteenMinuteRate(stat.getValue()); + } else if (stat.getAttribute().equals("MeanRate")) { + builder.withMeanRate(stat.getValue()); + } + } + droppedMessages.add(builder.build()); + } + + return droppedMessages; + } + + public List getClientRequestLatencies(Node host) throws ReaperException { + try { + JmxProxy jmxProxy = + context.jmxConnectionFactory.connect( + host, context.config.getJmxConnectionTimeoutInSeconds()); + + return convertToMetricsHistogram(jmxProxy.collectLatencyMetrics()); + } catch (RuntimeException + | InterruptedException + | MalformedObjectNameException + | AttributeNotFoundException + | InstanceNotFoundException + | MBeanException + | ReflectionException + | IOException e) { + LOG.error("Failed collecting tpstats for host {}", host, e); + throw new ReaperException(e); + } + } + + @VisibleForTesting + public List convertToMetricsHistogram(Map> jmxStats) { + List droppedMessages = Lists.newArrayList(); + for (Entry> pool : jmxStats.entrySet()) { + // We have several metric types that we need to process separately + // We'll group on MetricsHistogram::getType in order to generate one histogram per type + Map> metrics = + pool.getValue().stream().collect(Collectors.groupingBy(JmxStat::getName)); + + for (Entry> metric : metrics.entrySet()) { + MetricsHistogram.Builder builder = + MetricsHistogram.builder().withName(pool.getKey()).withType(metric.getKey()); + for (JmxStat stat : metric.getValue()) { + if (stat.getAttribute().equals("Count")) { + builder.withCount(stat.getValue().intValue()); + } else if (stat.getAttribute().equals("OneMinuteRate")) { + builder.withOneMinuteRate(stat.getValue()); + } else if (stat.getAttribute().equals("FiveMinuteRate")) { + builder.withFiveMinuteRate(stat.getValue()); + } else if (stat.getAttribute().equals("FifteenMinuteRate")) { + builder.withFifteenMinuteRate(stat.getValue()); + } else if (stat.getAttribute().equals("MeanRate")) { + builder.withMeanRate(stat.getValue()); + } else if (stat.getAttribute().equals("StdDev")) { + builder.withStdDev(stat.getValue()); + } else if (stat.getAttribute().equals("Min")) { + builder.withMin(stat.getValue()); + } else if (stat.getAttribute().equals("Max")) { + builder.withMax(stat.getValue()); + } else if (stat.getAttribute().equals("Mean")) { + builder.withMean(stat.getValue()); + } else if (stat.getAttribute().equals("50thPercentile")) { + builder.withP50(stat.getValue()); + } else if (stat.getAttribute().equals("75thPercentile")) { + builder.withP75(stat.getValue()); + } else if (stat.getAttribute().equals("95thPercentile")) { + builder.withP95(stat.getValue()); + } else if (stat.getAttribute().equals("98thPercentile")) { + builder.withP98(stat.getValue()); + } else if (stat.getAttribute().equals("99thPercentile")) { + builder.withP99(stat.getValue()); + } else if (stat.getAttribute().equals("999thPercentile")) { + builder.withP999(stat.getValue()); + } + } + droppedMessages.add(builder.build()); + } + } + + return droppedMessages; + } + +} diff --git a/src/server/src/test/java/io/cassandrareaper/SimpleReaperClient.java b/src/server/src/test/java/io/cassandrareaper/SimpleReaperClient.java index e8972512b..39dfcee2f 100644 --- a/src/server/src/test/java/io/cassandrareaper/SimpleReaperClient.java +++ b/src/server/src/test/java/io/cassandrareaper/SimpleReaperClient.java @@ -14,8 +14,11 @@ package io.cassandrareaper; +import io.cassandrareaper.core.DroppedMessages; +import io.cassandrareaper.core.MetricsHistogram; import io.cassandrareaper.core.RepairSegment; import io.cassandrareaper.core.Snapshot; +import io.cassandrareaper.core.ThreadPoolStat; import io.cassandrareaper.resources.view.RepairRunStatus; import io.cassandrareaper.resources.view.RepairScheduleStatus; @@ -166,4 +169,16 @@ public static Map> parseSnapshotMapJSON(String json) { return parseJSON(json, new TypeReference>>() {}); } + public static List parseTpStatJSON(String json) { + return parseJSON(json, new TypeReference>() {}); + } + + public static List parseDroppedMessagesJSON(String json) { + return parseJSON(json, new TypeReference>() {}); + } + + public static List parseClientRequestMetricsJSON(String json) { + return parseJSON(json, new TypeReference>() {}); + } + } diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java index 5f739d0f4..e7860a12b 100644 --- a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java +++ b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java @@ -17,10 +17,13 @@ import io.cassandrareaper.AppContext; import io.cassandrareaper.ReaperException; import io.cassandrareaper.SimpleReaperClient; +import io.cassandrareaper.core.DroppedMessages; +import io.cassandrareaper.core.MetricsHistogram; import io.cassandrareaper.core.Node; import io.cassandrareaper.core.RepairRun; import io.cassandrareaper.core.RepairSegment; import io.cassandrareaper.core.Snapshot; +import io.cassandrareaper.core.ThreadPoolStat; import io.cassandrareaper.jmx.JmxConnectionFactory; import io.cassandrareaper.jmx.JmxProxy; import io.cassandrareaper.resources.view.RepairRunStatus; @@ -1426,6 +1429,104 @@ public void theResponseWasRedirectedToTheLoginPage() throws Throwable { lastResponse.readEntity(String.class).contains("Not a real login page")); } + @And("^we can collect the tpstats from the seed node$") + public void we_can_collect_the_tpstats_from_the_seed_node() throws Throwable { + synchronized (BasicSteps.class) { + RUNNERS + .parallelStream() + .forEach( + runner -> { + Response response = + runner.callReaper( + "GET", + "/node/tpstats/" + + TestContext.TEST_CLUSTER + + "/" + + TestContext.SEED_HOST.split("@")[0], + EMPTY_PARAMS); + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + String responseData = response.readEntity(String.class); + List tpstats = SimpleReaperClient.parseTpStatJSON(responseData); + + assertTrue( + tpstats.stream().filter(tpstat -> tpstat.getName().equals("ReadStage")).count() + == 1); + assertTrue( + tpstats + .stream() + .filter(tpstat -> tpstat.getName().equals("ReadStage")) + .filter(tpstat -> tpstat.getCurrentlyBlockedTasks() == 0) + .filter(tpstat -> tpstat.getCompletedTasks() > 0) + .count() + == 1); + }); + } + } + + @And("^we can collect the dropped messages stats from the seed node$") + public void we_can_collect_the_dropped_messages_stats_from_the_seed_node() throws Throwable { + synchronized (BasicSteps.class) { + RUNNERS + .parallelStream() + .forEach( + runner -> { + Response response = + runner.callReaper( + "GET", + "/node/dropped/" + + TestContext.TEST_CLUSTER + + "/" + + TestContext.SEED_HOST.split("@")[0], + EMPTY_PARAMS); + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + String responseData = response.readEntity(String.class); + List tpstats = + SimpleReaperClient.parseDroppedMessagesJSON(responseData); + + assertTrue( + tpstats.stream().filter(tpstat -> tpstat.getName().equals("READ")).count() + == 1); + assertTrue( + tpstats + .stream() + .filter(tpstat -> tpstat.getName().equals("READ")) + .filter(tpstat -> tpstat.getCount() >= 0) + .count() + == 1); + }); + } + } + + @And("^we can collect the client request metrics from the seed node$") + public void we_can_collect_the_client_request_metrics_from_the_seed_node() throws Throwable { + synchronized (BasicSteps.class) { + RUNNERS + .parallelStream() + .forEach( + runner -> { + Response response = + runner.callReaper( + "GET", + "/node/clientRequestLatencies/" + + TestContext.TEST_CLUSTER + + "/" + + TestContext.SEED_HOST.split("@")[0], + EMPTY_PARAMS); + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + String responseData = response.readEntity(String.class); + List clientRequestMetrics = + SimpleReaperClient.parseClientRequestMetricsJSON(responseData); + + assertTrue( + clientRequestMetrics + .stream() + .filter(metric -> metric.getName().startsWith("Write")) + .count() + >= 1); + }); + } + } + private static int httpStatus(String statusCodeDescriptions) { String enumName = statusCodeDescriptions.toUpperCase().replace(' ', '_'); return Response.Status.valueOf(enumName).getStatusCode(); diff --git a/src/server/src/test/java/io/cassandrareaper/service/MetricsGrabberTest.java b/src/server/src/test/java/io/cassandrareaper/service/MetricsGrabberTest.java new file mode 100644 index 000000000..ba14a0db9 --- /dev/null +++ b/src/server/src/test/java/io/cassandrareaper/service/MetricsGrabberTest.java @@ -0,0 +1,247 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.service; + +import io.cassandrareaper.AppContext; +import io.cassandrareaper.ReaperApplicationConfiguration; +import io.cassandrareaper.ReaperException; +import io.cassandrareaper.core.Cluster; +import io.cassandrareaper.core.DroppedMessages; +import io.cassandrareaper.core.JmxStat; +import io.cassandrareaper.core.Node; +import io.cassandrareaper.core.ThreadPoolStat; +import io.cassandrareaper.jmx.JmxConnectionFactory; +import io.cassandrareaper.jmx.JmxProxy; +import io.cassandrareaper.storage.IStorage; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.ReflectionException; + +import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import jersey.repackaged.com.google.common.collect.Lists; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class MetricsGrabberTest { + + @Test + public void testGetTpstats() + throws InterruptedException, ReaperException, MalformedObjectNameException, + AttributeNotFoundException, InstanceNotFoundException, MBeanException, + ReflectionException, IOException { + AppContext context = new AppContext(); + context.config = new ReaperApplicationConfiguration(); + context.config.setJmxConnectionTimeoutInSeconds(10); + + context.storage = mock(IStorage.class); + Cluster cluster = + new Cluster("testCluster", "murmur3", new TreeSet(Arrays.asList("127.0.0.1"))); + when(context.storage.getCluster(anyString())).thenReturn(Optional.of(cluster)); + + final JmxProxy jmx = mock(JmxProxy.class); + when(jmx.getLiveNodes()).thenReturn(Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.3")); + + final MetricsGrabber metricsGrabber = MetricsGrabber.create(context); + context.jmxConnectionFactory = + new JmxConnectionFactory() { + @Override + public JmxProxy connect(Node host, int connectionTimeout) + throws ReaperException, InterruptedException { + return jmx; + } + + @Override + public JmxProxy connectAny(Cluster cluster, int connectionTimeout) + throws ReaperException { + return jmx; + } + }; + Node node = Node.builder().withClusterName("test").withHostname("127.0.0.1").build(); + metricsGrabber.getTpStats(node); + verify(jmx, times(1)).collectTpStats(); + } + + @Test + public void testConvertToThreadPoolStats() { + AppContext context = new AppContext(); + final MetricsGrabber metricsGrabber = MetricsGrabber.create(context); + + List statList = Lists.newArrayList(); + statList.add( + JmxStat.builder() + .withScope("ReadStage") + .withName("PendingTasks") + .withAttribute("Value") + .withValue(1.0) + .build()); + statList.add( + JmxStat.builder() + .withScope("ReadStage") + .withName("ActiveTasks") + .withAttribute("Value") + .withValue(2.0) + .build()); + statList.add( + JmxStat.builder() + .withScope("ReadStage") + .withName("CurrentlyBlockedTasks") + .withAttribute("Value") + .withValue(3.0) + .build()); + statList.add( + JmxStat.builder() + .withScope("ReadStage") + .withName("CompletedTasks") + .withAttribute("Value") + .withValue(4.0) + .build()); + statList.add( + JmxStat.builder() + .withScope("ReadStage") + .withName("MaxPoolSize") + .withAttribute("Value") + .withValue(5.0) + .build()); + statList.add( + JmxStat.builder() + .withScope("ReadStage") + .withName("TotalBlockedTasks") + .withAttribute("Value") + .withValue(6.0) + .build()); + + Map> jmxStats = Maps.newHashMap(); + jmxStats.put("ReadStage", statList); + + List threadPoolStats = metricsGrabber.convertToThreadPoolStats(jmxStats); + ThreadPoolStat tpstat = threadPoolStats.get(0); + + assertEquals(1, tpstat.getPendingTasks().intValue()); + assertEquals(2, tpstat.getActiveTasks().intValue()); + assertEquals(3, tpstat.getCurrentlyBlockedTasks().intValue()); + assertEquals(4, tpstat.getCompletedTasks().intValue()); + assertEquals(5, tpstat.getMaxPoolSize().intValue()); + assertEquals(6, tpstat.getTotalBlockedTasks().intValue()); + } + + @Test + public void testGetDroppedMessages() + throws InterruptedException, ReaperException, MalformedObjectNameException, + AttributeNotFoundException, InstanceNotFoundException, MBeanException, + ReflectionException, IOException { + AppContext context = new AppContext(); + context.config = new ReaperApplicationConfiguration(); + context.config.setJmxConnectionTimeoutInSeconds(10); + + context.storage = mock(IStorage.class); + Cluster cluster = + new Cluster("testCluster", "murmur3", new TreeSet(Arrays.asList("127.0.0.1"))); + when(context.storage.getCluster(anyString())).thenReturn(Optional.of(cluster)); + + final JmxProxy jmx = mock(JmxProxy.class); + when(jmx.getLiveNodes()).thenReturn(Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.3")); + + final MetricsGrabber metricsGrabber = MetricsGrabber.create(context); + context.jmxConnectionFactory = + new JmxConnectionFactory() { + @Override + public JmxProxy connect(Node host, int connectionTimeout) + throws ReaperException, InterruptedException { + return jmx; + } + + @Override + public JmxProxy connectAny(Cluster cluster, int connectionTimeout) + throws ReaperException { + return jmx; + } + }; + Node node = Node.builder().withClusterName("test").withHostname("127.0.0.1").build(); + metricsGrabber.getDroppedMessages(node); + verify(jmx, times(1)).collectDroppedMessages(); + } + + @Test + public void testConvertToDroppedMessages() { + AppContext context = new AppContext(); + final MetricsGrabber metricsGrabber = MetricsGrabber.create(context); + + List statList = Lists.newArrayList(); + statList.add( + JmxStat.builder() + .withScope("READ") + .withName("Dropped") + .withAttribute("MeanRate") + .withValue(1.0) + .build()); + statList.add( + JmxStat.builder() + .withScope("READ") + .withName("Dropped") + .withAttribute("OneMinuteRate") + .withValue(2.0) + .build()); + statList.add( + JmxStat.builder() + .withScope("READ") + .withName("Dropped") + .withAttribute("FiveMinuteRate") + .withValue(3.0) + .build()); + statList.add( + JmxStat.builder() + .withScope("READ") + .withName("Dropped") + .withAttribute("FifteenMinuteRate") + .withValue(4.0) + .build()); + statList.add( + JmxStat.builder() + .withScope("READ") + .withName("Dropped") + .withAttribute("Count") + .withValue(5.0) + .build()); + + Map> jmxStats = Maps.newHashMap(); + jmxStats.put("READ", statList); + + List droppedMessages = metricsGrabber.convertToDroppedMessages(jmxStats); + DroppedMessages dropped = droppedMessages.get(0); + + assertEquals(1, dropped.getMeanRate().intValue()); + assertEquals(2, dropped.getOneMinuteRate().intValue()); + assertEquals(3, dropped.getFiveMinuteRate().intValue()); + assertEquals(4, dropped.getFifteenMinuteRate().intValue()); + assertEquals(5, dropped.getCount().intValue()); + } + +} diff --git a/src/server/src/test/resources/io.cassandrareaper.acceptance/integration_reaper_functionality.feature b/src/server/src/test/resources/io.cassandrareaper.acceptance/integration_reaper_functionality.feature index d208b0791..12176afc0 100644 --- a/src/server/src/test/resources/io.cassandrareaper.acceptance/integration_reaper_functionality.feature +++ b/src/server/src/test/resources/io.cassandrareaper.acceptance/integration_reaper_functionality.feature @@ -19,6 +19,9 @@ Feature: Using Reaper to launch repairs and schedule them Given that we are going to use "127.0.0.1@test" as cluster seed host And reaper has no cluster in storage When an add-cluster request is made to reaper + And we can collect the tpstats from the seed node + And we can collect the dropped messages stats from the seed node + And we can collect the client request metrics from the seed node Then reaper has the last added cluster in storage And reaper has 0 scheduled repairs for the last added cluster When a new daily "full" repair schedule is added for the last added cluster and keyspace "booya"