diff --git a/src/server/src/main/java/io/cassandrareaper/jmx/ClusterFacade.java b/src/server/src/main/java/io/cassandrareaper/jmx/ClusterFacade.java index 9e8d96e6f..70ed757f7 100644 --- a/src/server/src/main/java/io/cassandrareaper/jmx/ClusterFacade.java +++ b/src/server/src/main/java/io/cassandrareaper/jmx/ClusterFacade.java @@ -458,7 +458,7 @@ public CompactionStats listActiveCompactions(Node node) throws MalformedObjectNameException, ReflectionException, ReaperException, InterruptedException, IOException { LOG.debug("Listing active compactions for node {}", node); - String nodeDc = getDatacenter(node); + String nodeDc = getDatacenter(node.getCluster().get(), node.getHostname()); if (nodeIsAccessibleThroughJmx(nodeDc, node.getHostname())) { LOG.debug("Yay!! Node {} in DC {} is accessible through JMX", node.getHostname(), nodeDc); // We have direct JMX access to the node @@ -538,7 +538,7 @@ public Map> collectMetrics(Node node, String[] collectedMe */ public List getClientRequestLatencies(Node node) throws ReaperException { try { - String nodeDc = getDatacenter(node); + String nodeDc = getDatacenter(node.getCluster().get(), node.getHostname()); if (nodeIsAccessibleThroughJmx(nodeDc, node.getHostname())) { MetricsProxy metricsProxy = MetricsProxy.create(connect(node)); return convertToMetricsHistogram( @@ -553,7 +553,7 @@ public List getClientRequestLatencies(Node node) throws Reaper "ClientRequest", DateTime.now().minusMinutes(METRICS_PARTITIONING_TIME_MINS + 1).getMillis())); } - } catch (JMException | InterruptedException | IOException e) { + } catch (JMException | IOException e) { LOG.error("Failed collecting tpstats for host {}", node, e); throw new ReaperException(e); } @@ -568,7 +568,7 @@ public List getClientRequestLatencies(Node node) throws Reaper */ public List getDroppedMessages(Node node) throws ReaperException { try { - String nodeDc = getDatacenter(node); + String nodeDc = getDatacenter(node.getCluster().get(), node.getHostname()); if (nodeIsAccessibleThroughJmx(nodeDc, node.getHostname())) { MetricsProxy proxy = MetricsProxy.create(connect(node)); return convertToDroppedMessages(MetricsProxy.convertToGenericMetrics(proxy.collectDroppedMessages(), node)); @@ -581,7 +581,7 @@ public List getDroppedMessages(Node node) throws ReaperExceptio "DroppedMessage", DateTime.now().minusMinutes(1).getMillis())); } - } catch (JMException | InterruptedException | IOException e) { + } catch (JMException | IOException e) { LOG.error("Failed collecting tpstats for host {}", node, e); throw new ReaperException(e); } @@ -611,7 +611,7 @@ public List convertToDroppedMessages(List metric */ public List getTpStats(Node node) throws ReaperException { try { - String nodeDc = getDatacenter(node); + String nodeDc = getDatacenter(node.getCluster().get(), node.getHostname()); if (nodeIsAccessibleThroughJmx(nodeDc, node.getHostname())) { MetricsProxy proxy = MetricsProxy.create(connect(node)); return convertToThreadPoolStats(MetricsProxy.convertToGenericMetrics(proxy.collectTpStats(), node)); @@ -624,7 +624,7 @@ public List getTpStats(Node node) throws ReaperException { "ThreadPools", DateTime.now().minusMinutes(1).getMillis())); } - } catch (JMException | InterruptedException | IOException e) { + } catch (JMException | IOException e) { LOG.error("Failed collecting tpstats for host {}", node, e); throw new ReaperException(e); } @@ -697,11 +697,16 @@ public Pair takeSnapshot(String snapshotName, Node host, String... */ public List listSnapshots(Node host) throws ReaperException { try { - return SnapshotProxy.create(connect(host)).listSnapshots(); + if (context.config.getDatacenterAvailability().isInCollocatedMode() + && context.jmxConnectionFactory.getHostConnectionCounters().getSuccessfulConnections( + host.getHostname()) >= 0) { + return SnapshotProxy.create(connect(host)).listSnapshots(); + } } catch (UnsupportedOperationException unsupported) { LOG.debug("Listing snapshot is unsupported with Cassandra 2.0 and prior"); throw unsupported; } + return Collections.emptyList(); } /** @@ -731,7 +736,7 @@ public void clearSnapshot(String snapshotName, Node host) throws ReaperException */ public List listActiveStreams(Node node) throws ReaperException, InterruptedException, IOException { - String nodeDc = getDatacenter(node); + String nodeDc = getDatacenter(node.getCluster().get(), node.getHostname()); if (nodeIsAccessibleThroughJmx(nodeDc, node.getHostname())) { // We have direct JMX access to the node return listStreamsDirect(node); @@ -741,8 +746,11 @@ public List listActiveStreams(Node node) String streamsJson = ((IDistributedStorage) context.storage) .listOperations(node.getClusterName(), OpType.OP_STREAMING, node.getHostname()); + if (streamsJson.length() > 0) { + return parseStreamSessionJson(streamsJson); + } - return parseStreamSessionJson(streamsJson); + return Collections.emptyList(); } } diff --git a/src/server/src/main/java/io/cassandrareaper/jmx/JmxConnectionFactory.java b/src/server/src/main/java/io/cassandrareaper/jmx/JmxConnectionFactory.java index 0b1cf9490..1856d44c4 100644 --- a/src/server/src/main/java/io/cassandrareaper/jmx/JmxConnectionFactory.java +++ b/src/server/src/main/java/io/cassandrareaper/jmx/JmxConnectionFactory.java @@ -133,12 +133,19 @@ public final JmxProxy connectAny(Collection nodes) throws ReaperException for (int i = 0; i < 2; i++) { for (Node node : nodeList) { // First loop, we try the most accessible nodes, then second loop we try all nodes - if (hostConnectionCounters.getSuccessfulConnections(node.getHostname()) >= 0 || 1 == i) { + if (getHostConnectionCounters().getSuccessfulConnections(node.getHostname()) >= 0 || 1 == i) { try { - return connectImpl(node); + LOG.debug("Trying to connect to node {} with {} successful connections with i = {}", + node.getHostname(), getHostConnectionCounters().getSuccessfulConnections(node.getHostname()), i); + JmxProxy jmxProxy = connectImpl(node); + getHostConnectionCounters().incrementSuccessfulConnections(node.getHostname()); + if (getHostConnectionCounters().getSuccessfulConnections(node.getHostname()) > 0) { + accessibleDatacenters.add(EndpointSnitchInfoProxy.create(jmxProxy).getDataCenter()); + } + return jmxProxy; } catch (ReaperException | RuntimeException e) { - LOG.info("Unreachable host: {}: {}", e.getMessage(), e.getCause().getMessage()); - LOG.debug("Unreachable host: ", e); + getHostConnectionCounters().decrementSuccessfulConnections(node.getHostname()); + LOG.info("Unreachable host: ", e); } catch (InterruptedException expected) { LOG.trace("Expected exception", expected); } @@ -172,7 +179,7 @@ public void setJmxmp(Jmxmp jmxmp) { this.jmxmp = jmxmp; } - public final HostConnectionCounters getHostConnectionCounters() { + public HostConnectionCounters getHostConnectionCounters() { return hostConnectionCounters; } @@ -230,13 +237,8 @@ public JmxProxy apply(String host) { try { JmxProxy proxy = JmxProxyImpl.connect( host, jmxCredentials, addressTranslator, connectionTimeout, metricRegistry, cryptograph, jmxmp); - if (hostConnectionCounters.getSuccessfulConnections(host) <= 0) { - accessibleDatacenters.add(EndpointSnitchInfoProxy.create(proxy).getDataCenter()); - } - hostConnectionCounters.incrementSuccessfulConnections(host); return proxy; } catch (ReaperException | InterruptedException ex) { - hostConnectionCounters.decrementSuccessfulConnections(host); throw new RuntimeException(ex); } } diff --git a/src/server/src/main/java/io/cassandrareaper/service/Heart.java b/src/server/src/main/java/io/cassandrareaper/service/Heart.java index b7233fd35..78d151ca3 100644 --- a/src/server/src/main/java/io/cassandrareaper/service/Heart.java +++ b/src/server/src/main/java/io/cassandrareaper/service/Heart.java @@ -21,14 +21,13 @@ import io.cassandrareaper.ReaperApplicationConfiguration.DatacenterAvailability; import io.cassandrareaper.ReaperException; import io.cassandrareaper.core.Cluster; -import io.cassandrareaper.core.NodeMetrics; +import io.cassandrareaper.core.Node; import io.cassandrareaper.jmx.ClusterFacade; -import io.cassandrareaper.jmx.JmxProxy; import io.cassandrareaper.storage.IDistributedStorage; -import io.cassandrareaper.storage.IStorage; -import java.util.Arrays; -import java.util.UUID; +import java.io.IOException; +import java.util.Collection; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; @@ -40,13 +39,13 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public final class Heart implements AutoCloseable { private static final AtomicBoolean GAUGES_REGISTERED = new AtomicBoolean(false); @@ -119,36 +118,49 @@ private void updateRequestedNodeMetrics() { if (!updatingNodeMetrics.getAndSet(true)) { forkJoinPool.submit(() -> { try (Timer.Context t0 = timer(context, "updatingNodeMetrics")) { - + ClusterFacade clusterFacade = ClusterFacade.create(context); + Collection clusters = context.storage.getClusters(); forkJoinPool.submit(() -> { - context.repairManager.repairRunners.keySet() + clusters .parallelStream() - .forEach(runId -> { - - ((IDistributedStorage) context.storage).getNodeMetrics(runId) - .parallelStream() - .filter(metrics -> canAnswerToNodeMetricsRequest(metrics)) - .forEach(req -> { - - LOG.info("Got metric request for node {} in {}", req.getNode(), req.getCluster()); - try (Timer.Context t1 = timer( - context, - req.getCluster().replace('.', '-'), - req.getNode().replace('.', '-'))) { - + .filter(cluster -> cluster.getState() == Cluster.State.ACTIVE) + .forEach(cluster -> { + try { + clusterFacade.getLiveNodes(cluster) + .parallelStream() + .filter(hostname -> { + return context.jmxConnectionFactory + .getHostConnectionCounters() + .getSuccessfulConnections(hostname) >= 0; + }) + .map(hostname -> Node.builder() + .withHostname(hostname) + .withCluster( + Cluster.builder() + .withName(cluster.getName()) + .withSeedHosts(ImmutableSet.of(hostname)) + .build()) + .build()) + .forEach(node -> { try { - grabAndStoreNodeMetrics(context.storage, runId, req); - - LOG.info("Responded to metric request for node {}", req.getNode()); - } catch (ReaperException | RuntimeException | InterruptedException ex) { - LOG.debug("failed seed connection in cluster " + req.getCluster(), ex); - } catch (JMException e) { - LOG.warn( - "failed querying JMX MBean for metrics on node {} of cluster {} due to {}", - req.getNode(), req.getCluster(), e.getMessage()); + metricsService.grabAndStoreCompactionStats(Optional.of(node)); + metricsService.grabAndStoreActiveStreams(Optional.of(node)); + if (lastMetricBeat.get() + maxBeatFrequencyMillis <= System.currentTimeMillis()) { + metricsService.grabAndStoreGenericMetrics(Optional.of(node)); + lastMetricBeat.set(System.currentTimeMillis()); + } + } catch (JMException | ReaperException | RuntimeException | IOException e) { + LOG.error("Couldn't extract metrics for node {} in cluster {}", + node.getHostname(), cluster.getName(), e); + } catch (InterruptedException e) { + LOG.error("Interrupted while extracting metrics for node {} in cluster {}", + node.getHostname(), cluster.getName(), e); } - } - }); + }); + } catch (ReaperException e) { + LOG.error("Couldn't list live nodes in cluster {}", cluster.getName(), e); + e.printStackTrace(); + } }); }).get(); @@ -156,18 +168,18 @@ private void updateRequestedNodeMetrics() { // In sidecar mode we store metrics in the db on a regular basis if (lastMetricBeat.get() + maxBeatFrequencyMillis <= System.currentTimeMillis()) { - metricsService.grabAndStoreGenericMetrics(); + metricsService.grabAndStoreGenericMetrics(Optional.empty()); lastMetricBeat.set(System.currentTimeMillis()); } else { LOG.trace("Not storing metrics yet... Last beat was {} and now is {}", lastMetricBeat.get(), System.currentTimeMillis()); } - metricsService.grabAndStoreCompactionStats(); - metricsService.grabAndStoreActiveStreams(); + metricsService.grabAndStoreCompactionStats(Optional.empty()); + metricsService.grabAndStoreActiveStreams(Optional.empty()); } } catch (ExecutionException | InterruptedException | RuntimeException - | ReaperException | JMException | JsonProcessingException ex) { + | ReaperException | JMException | IOException ex) { LOG.warn("Failed metric collection during heartbeat", ex); } finally { assert updatingNodeMetrics.get(); @@ -177,40 +189,6 @@ private void updateRequestedNodeMetrics() { } } - /** - * Checks if the local Reaper instance is supposed to answer a metrics request. - * Requires to be in sidecar on the node for which metrics are requested, or to be in a different mode than ALL. - * Also checks that the metrics record as requested set to true. - * - * @param metric a metric request - * @return true if reaper should try to answer the metric request - */ - private boolean canAnswerToNodeMetricsRequest(NodeMetrics metric) { - return (context.config.getDatacenterAvailability() == DatacenterAvailability.SIDECAR - && metric.getNode().equals(context.getLocalNodeAddress())) - || (context.config.getDatacenterAvailability() != DatacenterAvailability.ALL - && context.config.getDatacenterAvailability() != DatacenterAvailability.SIDECAR) - && metric.isRequested(); - } - - private void grabAndStoreNodeMetrics(IStorage storage, UUID runId, NodeMetrics req) - throws ReaperException, InterruptedException, JMException { - - Cluster cluster = storage.getCluster(req.getCluster()); - JmxProxy nodeProxy = ClusterFacade.create(context).connect(cluster, Arrays.asList(req.getNode())); - - ((IDistributedStorage) storage).storeNodeMetrics( - runId, - NodeMetrics.builder() - .withNode(req.getNode()) - .withCluster(req.getCluster()) - .withDatacenter(req.getDatacenter()) - .withPendingCompactions(nodeProxy.getPendingCompactions()) - .withHasRepairRunning(nodeProxy.isRepairRunning()) - .withActiveAnticompactions(0) // for future use - .build()); - } - private static Timer.Context timer(AppContext context, String... names) { return context.metricRegistry.timer(MetricRegistry.name(Heart.class, names)).time(); } diff --git a/src/server/src/main/java/io/cassandrareaper/service/MetricsService.java b/src/server/src/main/java/io/cassandrareaper/service/MetricsService.java index 92e62badc..d99679696 100644 --- a/src/server/src/main/java/io/cassandrareaper/service/MetricsService.java +++ b/src/server/src/main/java/io/cassandrareaper/service/MetricsService.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import javax.management.JMException; @@ -127,20 +128,20 @@ public List convertToGenericMetrics(Map> jm return metrics; } - void grabAndStoreGenericMetrics() throws ReaperException, InterruptedException, JMException { + void grabAndStoreGenericMetrics(Optional maybeNode) throws ReaperException, InterruptedException, JMException { Preconditions.checkState( - context.config.isInSidecarMode(), - "grabAndStoreGenericMetrics() can only be called in sidecar"); - - Node node - = Node.builder() - .withHostname(context.getLocalNodeAddress()) - .withCluster( - Cluster.builder() - .withName(localClusterName) - .withSeedHosts(ImmutableSet.of(context.getLocalNodeAddress())) - .build()) - .build(); + context.config.getDatacenterAvailability().isInCollocatedMode(), + "grabAndStoreGenericMetrics() can only be called in collocated mode"); + + Node node = maybeNode.orElseGet(() -> + Node.builder() + .withHostname(context.getLocalNodeAddress()) + .withCluster( + Cluster.builder() + .withName(localClusterName) + .withSeedHosts(ImmutableSet.of(context.getLocalNodeAddress())) + .build()) + .build()); List metrics = convertToGenericMetrics(ClusterFacade.create(context).collectMetrics(node, COLLECTED_METRICS), node); @@ -148,43 +149,63 @@ void grabAndStoreGenericMetrics() throws ReaperException, InterruptedException, for (GenericMetric metric:metrics) { ((IDistributedStorage)context.storage).storeMetric(metric); } - LOG.debug("Grabbing and storing metrics for {}", context.getLocalNodeAddress()); + LOG.debug("Grabbing and storing metrics for {}", node.getHostname()); } - void grabAndStoreCompactionStats() throws JsonProcessingException, JMException, ReaperException { + void grabAndStoreCompactionStats(Optional maybeNode) + throws JsonProcessingException, JMException, ReaperException { Preconditions.checkState( - context.config.isInSidecarMode(), + context.config.getDatacenterAvailability().isInCollocatedMode(), "grabAndStoreCompactionStats() can only be called in sidecar"); - Node node = Node.builder().withHostname(context.getLocalNodeAddress()).build(); + Node node = maybeNode.orElseGet(() -> + Node.builder() + .withHostname(context.getLocalNodeAddress()) + .withCluster( + Cluster.builder() + .withName(localClusterName) + .withSeedHosts(ImmutableSet.of(context.getLocalNodeAddress())) + .build()) + .build()); + CompactionStats compactionStats = ClusterFacade.create(context).listCompactionStatsDirect(node); ((IDistributedStorage) context.storage) .storeOperations( - localClusterName, + node.getClusterName(), OpType.OP_COMPACTION, - context.getLocalNodeAddress(), + node.getHostname(), objectMapper.writeValueAsString(compactionStats)); - LOG.debug("Grabbing and storing compaction stats for {}", context.getLocalNodeAddress()); + LOG.debug("Grabbing and storing compaction stats for {}", node.getHostname()); } - void grabAndStoreActiveStreams() throws JsonProcessingException, ReaperException { + void grabAndStoreActiveStreams(Optional maybeNode) throws JsonProcessingException, ReaperException { Preconditions.checkState( - context.config.isInSidecarMode(), + context.config.getDatacenterAvailability().isInCollocatedMode(), "grabAndStoreActiveStreams() can only be called in sidecar"); - Node node = Node.builder().withHostname(context.getLocalNodeAddress()).build(); + Node node = maybeNode.orElseGet(() -> + Node.builder() + .withHostname(context.getLocalNodeAddress()) + .withCluster( + Cluster.builder() + .withName(localClusterName) + .withSeedHosts(ImmutableSet.of(context.getLocalNodeAddress())) + .build()) + .build()); + List activeStreams = ClusterFacade.create(context).listStreamsDirect(node); ((IDistributedStorage) context.storage) .storeOperations( - localClusterName, - OpType.OP_STREAMING,context.getLocalNodeAddress(), + node.getClusterName(), + OpType.OP_STREAMING, + node.getHostname(), objectMapper.writeValueAsString(activeStreams)); - LOG.debug("Grabbing and storing streams for {}", context.getLocalNodeAddress()); + LOG.debug("Grabbing and storing streams for {}", node.getHostname()); } } diff --git a/src/server/src/main/java/io/cassandrareaper/service/PurgeService.java b/src/server/src/main/java/io/cassandrareaper/service/PurgeService.java index bc1dab417..7e6c280e8 100644 --- a/src/server/src/main/java/io/cassandrareaper/service/PurgeService.java +++ b/src/server/src/main/java/io/cassandrareaper/service/PurgeService.java @@ -144,7 +144,6 @@ private void purgeMetrics() { storage.purgeMetrics(); storage.purgeNodeOperations(); } - storage.purgeNodeMetrics(); } } } diff --git a/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java b/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java index 1e46d5d3e..cce62113a 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java @@ -25,7 +25,6 @@ import io.cassandrareaper.core.ClusterProperties; import io.cassandrareaper.core.DiagEventSubscription; import io.cassandrareaper.core.GenericMetric; -import io.cassandrareaper.core.NodeMetrics; import io.cassandrareaper.core.RepairRun; import io.cassandrareaper.core.RepairRun.Builder; import io.cassandrareaper.core.RepairRun.RunState; @@ -97,7 +96,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import io.dropwizard.setup.Environment; import io.dropwizard.util.Duration; import org.apache.cassandra.repair.RepairParallelism; @@ -1619,92 +1617,6 @@ private boolean hasLeadOnSegment(UUID leaderId) { return lwtResult.wasApplied(); } - @Override - public void storeNodeMetrics(UUID runId, NodeMetrics nodeMetrics) { - long minute = TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis()); - storeNodeMetricsImpl(runId, nodeMetrics, minute); - storeNodeMetricsImpl(runId, nodeMetrics, minute + 1); - storeNodeMetricsImpl(runId, nodeMetrics, minute + 2); - } - - private void storeNodeMetricsImpl(UUID runId, NodeMetrics nodeMetrics, long minute) { - session.executeAsync( - storeNodeMetricsPrepStmt.bind( - minute, - runId, - nodeMetrics.getNode(), - nodeMetrics.getDatacenter(), - nodeMetrics.getCluster(), - nodeMetrics.isRequested(), - nodeMetrics.getPendingCompactions(), - nodeMetrics.hasRepairRunning(), - nodeMetrics.getActiveAnticompactions())); - } - - @Override - public Collection getNodeMetrics(UUID runId) { - List futures = Lists.newArrayList(); - long minuteBefore = TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis() - 60_000); - long minute = TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis()); - futures.add(session.executeAsync(getNodeMetricsPrepStmt.bind(minuteBefore, runId))); - futures.add(session.executeAsync(getNodeMetricsPrepStmt.bind(minute, runId))); - ListenableFuture> results = Futures.successfulAsList(futures); - try { - Set metrics = results.get() - .stream() - .map(result -> result.all()) - .flatMap(Collection::stream) - .map(row -> createNodeMetrics(row)) - .collect(Collectors.toSet()); - return metrics; - } catch (InterruptedException | ExecutionException e) { - LOG.warn("Failed collecting metrics requests for run {}", runId, e); - return Collections.emptySet(); - } - } - - @Override - public Optional getNodeMetrics(UUID runId, String node) { - List futures = Lists.newArrayList(); - long minuteBefore = TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis() - 60_000); - long minute = TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis()); - futures.add(session.executeAsync(getNodeMetricsByNodePrepStmt.bind(minute, runId, node))); - futures.add(session.executeAsync(getNodeMetricsByNodePrepStmt.bind(minuteBefore, runId, node))); - ListenableFuture> results = Futures.successfulAsList(futures); - try { - for (ResultSet result:results.get()) { - for (Row row:result) { - return Optional.of(createNodeMetrics(row)); - } - } - } catch (InterruptedException | ExecutionException e) { - LOG.warn("Failed grabbing metrics for node {}. Will try again later.", node, e); - } - return Optional.empty(); - } - - @Override - public void deleteNodeMetrics(UUID runId, String node) { - long minute = TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis()); - LOG.info("Deleting metrics for node {}", node); - session.executeAsync(delNodeMetricsByNodePrepStmt.bind(minute, runId, node)); - } - - @Override - public void purgeNodeMetrics() {} - - private static NodeMetrics createNodeMetrics(Row row) { - return NodeMetrics.builder() - .withNode(row.getString("node")) - .withDatacenter(row.getString("datacenter")) - .withCluster(row.getString("cluster")) - .withRequested(row.getBool("requested")) - .withPendingCompactions(row.getInt("pending_compactions")) - .withHasRepairRunning(row.getBool("has_repair_running")) - .withActiveAnticompactions(row.getInt("active_anticompactions")) - .build(); - } - @Override public int countRunningReapers() { ResultSet result = session.execute(getRunningReapersCountPrepStmt.bind()); diff --git a/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java b/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java index e6829a5ff..ccbaea2eb 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java @@ -18,11 +18,9 @@ package io.cassandrareaper.storage; import io.cassandrareaper.core.GenericMetric; -import io.cassandrareaper.core.NodeMetrics; import io.cassandrareaper.core.RepairSegment; import io.cassandrareaper.service.RingRange; -import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.Set; @@ -67,14 +65,6 @@ boolean releaseRunningRepairsForNodes( void saveHeartbeat(); - Collection getNodeMetrics(UUID runId); - - Optional getNodeMetrics(UUID runId, String node); - - void deleteNodeMetrics(UUID runId, String node); - - void storeNodeMetrics(UUID runId, NodeMetrics nodeMetrics); - /** * Gets the next free segment from the backend that is both within the parallel range and the local node ranges. * @@ -98,11 +88,6 @@ List getMetrics( String listOperations(String clusterName, OpType operationType, String host); - /** - * Purges old node metrics from the database (no-op for databases with TTL) - */ - void purgeNodeMetrics(); - /** * Purges old metrics from the database (no-op for databases w/ TTL) */ diff --git a/src/server/src/main/java/io/cassandrareaper/storage/PostgresStorage.java b/src/server/src/main/java/io/cassandrareaper/storage/PostgresStorage.java index 777b49605..667ed1ac5 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/PostgresStorage.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/PostgresStorage.java @@ -21,7 +21,6 @@ import io.cassandrareaper.core.Cluster; import io.cassandrareaper.core.DiagEventSubscription; import io.cassandrareaper.core.GenericMetric; -import io.cassandrareaper.core.NodeMetrics; import io.cassandrareaper.core.RepairRun; import io.cassandrareaper.core.RepairSchedule; import io.cassandrareaper.core.RepairSegment; @@ -810,78 +809,6 @@ public int countRunningReapers() { return 1; } - @Override - public void storeNodeMetrics(UUID runId, NodeMetrics nodeMetrics) { - if (null != jdbi) { - try (Handle h = jdbi.open()) { - getPostgresStorage(h).storeNodeMetrics( - UuidUtil.toSequenceId(runId), - nodeMetrics.getNode(), - nodeMetrics.getCluster(), - nodeMetrics.getDatacenter(), - nodeMetrics.isRequested(), - nodeMetrics.getPendingCompactions(), - nodeMetrics.hasRepairRunning(), - nodeMetrics.getActiveAnticompactions() - ); - } - } - } - - @Override - public Collection getNodeMetrics(UUID runId) { - if (null != jdbi) { - try (Handle h = jdbi.open()) { - Instant expirationTime = getExpirationTime(reaperTimeout); - return getPostgresStorage(h).getNodeMetrics( - UuidUtil.toSequenceId(runId), - expirationTime - ); - } - } - return new ArrayList<>(); - } - - @Override - public Optional getNodeMetrics(UUID runId, String node) { - if (null != jdbi) { - try (Handle h = jdbi.open()) { - Instant expirationTime = getExpirationTime(reaperTimeout); - NodeMetrics nm = getPostgresStorage(h).getNodeMetricsByNode( - UuidUtil.toSequenceId(runId), - expirationTime, - node - ); - if (nm != null) { - return Optional.of(nm); - } - } - } - return Optional.empty(); - } - - @Override - public void deleteNodeMetrics(UUID runId, String node) { - if (null != jdbi) { - try (Handle h = jdbi.open()) { - getPostgresStorage(h).deleteNodeMetricsByNode( - UuidUtil.toSequenceId(runId), - node - ); - } - } - } - - @Override - public void purgeNodeMetrics() { - if (null != jdbi) { - try (Handle h = jdbi.open()) { - Instant expirationTime = getExpirationTime(reaperTimeout); - getPostgresStorage(h).purgeOldNodeMetrics(expirationTime); - } - } - } - @Override public List getNextFreeSegmentsForRanges( UUID runId, diff --git a/src/server/src/test/java/io/cassandrareaper/jmx/JmxConnectionsInitializerTest.java b/src/server/src/test/java/io/cassandrareaper/jmx/JmxConnectionsInitializerTest.java index 773d3c4a0..f66b54222 100644 --- a/src/server/src/test/java/io/cassandrareaper/jmx/JmxConnectionsInitializerTest.java +++ b/src/server/src/test/java/io/cassandrareaper/jmx/JmxConnectionsInitializerTest.java @@ -27,6 +27,7 @@ import io.cassandrareaper.storage.CassandraStorage; import io.cassandrareaper.storage.PostgresStorage; +import java.net.UnknownHostException; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.ImmutableSet; @@ -45,10 +46,10 @@ public class JmxConnectionsInitializerTest { * @throws ReaperException */ @Test - public void initializerDatacenterAvailabilityEachTest() throws ReaperException { + public void initializerDatacenterAvailabilityEachTest() throws ReaperException, UnknownHostException { AppContext context = new AppContext(); final Cryptograph cryptographMock = mock(Cryptograph.class); - final JmxProxy jmxProxyMock = mock(JmxProxy.class); + final JmxProxy jmxProxyMock = JmxProxyTest.mockJmxProxyImpl(); final AtomicInteger connectionAttempts = new AtomicInteger(0); context.jmxConnectionFactory = new JmxConnectionFactory(context, cryptographMock) { @@ -83,10 +84,10 @@ protected JmxProxy connectImpl(Node node) throws ReaperException { * @throws ReaperException */ @Test - public void initializerDatacenterAvailabilityLocalTest() throws ReaperException { + public void initializerDatacenterAvailabilityLocalTest() throws ReaperException, UnknownHostException { AppContext context = new AppContext(); final Cryptograph cryptographMock = mock(Cryptograph.class); - final JmxProxy jmxProxyMock = mock(JmxProxy.class); + final JmxProxy jmxProxyMock = JmxProxyTest.mockJmxProxyImpl(); final AtomicInteger connectionAttempts = new AtomicInteger(0); context.jmxConnectionFactory = new JmxConnectionFactory(context, cryptographMock) { diff --git a/src/server/src/test/java/io/cassandrareaper/jmx/JmxCustomPortTest.java b/src/server/src/test/java/io/cassandrareaper/jmx/JmxCustomPortTest.java index 35077c401..7933e65ec 100644 --- a/src/server/src/test/java/io/cassandrareaper/jmx/JmxCustomPortTest.java +++ b/src/server/src/test/java/io/cassandrareaper/jmx/JmxCustomPortTest.java @@ -25,6 +25,7 @@ import io.cassandrareaper.crypto.Cryptograph; import io.cassandrareaper.storage.CassandraStorage; +import java.net.UnknownHostException; import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; @@ -32,7 +33,9 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public final class JmxCustomPortTest { @@ -42,11 +45,14 @@ public final class JmxCustomPortTest { * @throws ReaperException */ @Test - public void customJmxPortTest() throws ReaperException, InterruptedException { + public void customJmxPortTest() throws ReaperException, InterruptedException, UnknownHostException { AppContext context = new AppContext(); final Cryptograph cryptographMock = mock(Cryptograph.class); - final JmxProxy jmxProxyMock = mock(JmxProxy.class); + final JmxProxy jmxProxyMock = JmxProxyTest.mockJmxProxyImpl(); final AtomicInteger port = new AtomicInteger(0); + HostConnectionCounters hostConnectionCounters = mock(HostConnectionCounters.class); + when(hostConnectionCounters.getSuccessfulConnections(any())).thenReturn(1); + context.jmxConnectionFactory = new JmxConnectionFactory(context, cryptographMock) { @Override @@ -55,6 +61,11 @@ protected JmxProxy connectImpl(Node node) throws ReaperException { port.set(node.getJmxPort()); return jmx; } + + @Override + public HostConnectionCounters getHostConnectionCounters() { + return hostConnectionCounters; + } }; context.config = new ReaperApplicationConfiguration(); diff --git a/src/server/src/test/java/io/cassandrareaper/jmx/JmxProxyTest.java b/src/server/src/test/java/io/cassandrareaper/jmx/JmxProxyTest.java index ac57bcf18..c3bfc7e0d 100644 --- a/src/server/src/test/java/io/cassandrareaper/jmx/JmxProxyTest.java +++ b/src/server/src/test/java/io/cassandrareaper/jmx/JmxProxyTest.java @@ -19,6 +19,7 @@ import io.cassandrareaper.ReaperException; +import java.net.UnknownHostException; import java.util.Optional; import java.util.Random; @@ -33,12 +34,15 @@ import org.mockito.Mockito; import static org.junit.Assert.assertEquals; - +import static org.mockito.ArgumentMatchers.any; public final class JmxProxyTest { - public static JmxProxy mockJmxProxyImpl() { + public static JmxProxy mockJmxProxyImpl() throws UnknownHostException { JmxProxyImpl impl = Mockito.mock(JmxProxyImpl.class); Mockito.when(impl.getUntranslatedHost()).thenReturn("test-host-" + new Random().nextInt()); + EndpointSnitchInfoMBean endpointSnitchInfoMBean = Mockito.mock(EndpointSnitchInfoMBean.class); + Mockito.when(endpointSnitchInfoMBean.getDatacenter(any())).thenReturn("dc1"); + Mockito.when(impl.getEndpointSnitchInfoMBean()).thenReturn(endpointSnitchInfoMBean); return impl; } diff --git a/src/server/src/test/java/io/cassandrareaper/service/HeartTest.java b/src/server/src/test/java/io/cassandrareaper/service/HeartTest.java index 7d73000eb..16a2a92d1 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/HeartTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/HeartTest.java @@ -21,22 +21,17 @@ import io.cassandrareaper.ReaperApplicationConfiguration; import io.cassandrareaper.ReaperException; import io.cassandrareaper.core.Cluster; -import io.cassandrareaper.core.NodeMetrics; import io.cassandrareaper.crypto.NoopCrypotograph; -import io.cassandrareaper.jmx.HostConnectionCounters; import io.cassandrareaper.jmx.JmxConnectionFactory; import io.cassandrareaper.jmx.JmxProxy; import io.cassandrareaper.storage.CassandraStorage; import io.cassandrareaper.storage.MemoryStorage; import java.util.Collection; -import java.util.Collections; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import javax.management.JMException; - import com.google.common.collect.ImmutableSet; import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; @@ -92,7 +87,6 @@ public void testBeat_distributedStorage_noDatacenterAvailability() } Mockito.verify(context.storage, Mockito.times(0)).getClusters(); - Mockito.verify((CassandraStorage)context.storage, Mockito.times(0)).storeNodeMetrics(any(), any()); } @Test @@ -119,7 +113,6 @@ public void testBeat_distributedStorage_allDatacenterAvailability() } Mockito.verify((CassandraStorage)context.storage, Mockito.times(1)).saveHeartbeat(); Mockito.verify(context.storage, Mockito.times(0)).getClusters(); - Mockito.verify((CassandraStorage)context.storage, Mockito.times(0)).storeNodeMetrics(any(), any()); } @Test @@ -137,7 +130,6 @@ public void testBeat_distributedStorage_eachDatacenterAvailability() throws Inte Thread.sleep(500); } Mockito.verify((CassandraStorage)context.storage, Mockito.times(1)).saveHeartbeat(); - Mockito.verify((CassandraStorage)context.storage, Mockito.times(0)).storeNodeMetrics(any(), any()); } @Test @@ -169,7 +161,6 @@ public void testBeat_distributedStorage_eachDatacenterAvailability_repairs() Thread.sleep(500); } Mockito.verify((CassandraStorage)context.storage, Mockito.times(1)).saveHeartbeat(); - Mockito.verify((CassandraStorage)context.storage, Mockito.times(0)).storeNodeMetrics(any(), any()); } @Test @@ -195,10 +186,6 @@ public void testBeat_distributedStorage_eachDatacenterAvailability_repairs_noMet context.storage = Mockito.mock(CassandraStorage.class); context.jmxConnectionFactory = Mockito.mock(JmxConnectionFactory.class); - Mockito - .when(((CassandraStorage)context.storage).getNodeMetrics(any())) - .thenReturn(Collections.emptyList()); - try (Heart heart = Heart.create(context)) { context.isDistributed.set(true); heart.beat(); @@ -206,9 +193,7 @@ public void testBeat_distributedStorage_eachDatacenterAvailability_repairs_noMet } Mockito.verify((CassandraStorage)context.storage, Mockito.times(1)).saveHeartbeat(); - Mockito.verify((CassandraStorage)context.storage, Mockito.times(2)).getNodeMetrics(any()); Mockito.verify(context.jmxConnectionFactory, Mockito.times(0)).connectAny(any(Collection.class)); - Mockito.verify((CassandraStorage)context.storage, Mockito.times(0)).storeNodeMetrics(any(), any()); } @Test @@ -235,23 +220,10 @@ public void testBeat_distributedStorage_eachDatacenterAvailability_repairs_noReq context.storage = Mockito.mock(CassandraStorage.class); context.jmxConnectionFactory = Mockito.mock(JmxConnectionFactory.class); - Mockito - .when(((CassandraStorage)context.storage).getNodeMetrics(any())) - .thenReturn( - Collections.singleton( - NodeMetrics.builder() - .withNode("test") - .withDatacenter("dc1") - .withCluster("cluster1") - .build())); - JmxProxy nodeProxy = Mockito.mock(JmxProxy.class); Mockito.when(context.jmxConnectionFactory.connectAny(any(Collection.class))).thenReturn(nodeProxy); - HostConnectionCounters hostConnectionCounters = Mockito.mock(HostConnectionCounters.class); - Mockito.when(context.jmxConnectionFactory.getHostConnectionCounters()).thenReturn(hostConnectionCounters); - try (Heart heart = Heart.create(context)) { context.isDistributed.set(true); heart.beat(); @@ -259,9 +231,7 @@ public void testBeat_distributedStorage_eachDatacenterAvailability_repairs_noReq } Mockito.verify((CassandraStorage)context.storage, Mockito.times(1)).saveHeartbeat(); - Mockito.verify((CassandraStorage)context.storage, Mockito.times(2)).getNodeMetrics(any()); Mockito.verify(context.jmxConnectionFactory, Mockito.times(0)).connectAny(any(Collection.class)); - Mockito.verify((CassandraStorage)context.storage, Mockito.times(0)).storeNodeMetrics(any(), any()); } @Test @@ -288,17 +258,6 @@ public void testBeat_distributedStorage_eachDatacenterAvailability_repairs_reque context.storage = Mockito.mock(CassandraStorage.class); context.jmxConnectionFactory = Mockito.mock(JmxConnectionFactory.class); - Mockito - .when(((CassandraStorage)context.storage).getNodeMetrics(any())) - .thenReturn( - Collections.singleton( - NodeMetrics.builder() - .withNode("test") - .withDatacenter("dc1") - .withCluster("cluster1") - .withRequested(true) - .build())); - Mockito.when(((CassandraStorage) context.storage).getCluster(any())) .thenReturn( Cluster.builder() @@ -318,76 +277,5 @@ public void testBeat_distributedStorage_eachDatacenterAvailability_repairs_reque } Mockito.verify((CassandraStorage)context.storage, Mockito.times(1)).saveHeartbeat(); - Mockito.verify((CassandraStorage)context.storage, Mockito.times(2)).getNodeMetrics(any()); - Mockito.verify(context.jmxConnectionFactory, Mockito.times(2)).connectAny(any(Collection.class)); - Mockito.verify((CassandraStorage)context.storage, Mockito.times(2)).storeNodeMetrics(any(), any()); - } - - @Test - public void testBeat_distributedStorage_eachDatacenterAvailability_repairs_requests_queued() - throws InterruptedException, ReaperException, JMException { - - AppContext context = new AppContext(); - context.config = new ReaperApplicationConfiguration(); - context.config.setDatacenterAvailability(ReaperApplicationConfiguration.DatacenterAvailability.EACH); - context.storage = Mockito.mock(CassandraStorage.class); - - context.repairManager = RepairManager.create( - context, - Executors.newScheduledThreadPool(1), - REPAIR_TIMEOUT_S, - TimeUnit.SECONDS, - RETRY_DELAY_S, - TimeUnit.SECONDS, - 1); - - context.repairManager.repairRunners.put(UUID.randomUUID(), Mockito.mock(RepairRunner.class)); - context.repairManager.repairRunners.put(UUID.randomUUID(), Mockito.mock(RepairRunner.class)); - - context.storage = Mockito.mock(CassandraStorage.class); - context.jmxConnectionFactory = Mockito.mock(JmxConnectionFactory.class); - - Mockito - .when(((CassandraStorage)context.storage).getNodeMetrics(any())) - .thenReturn( - Collections.singleton( - NodeMetrics.builder() - .withNode("test") - .withDatacenter("dc1") - .withCluster("cluster1") - .withRequested(true) - .build())); - - Mockito.when(((CassandraStorage) context.storage).getCluster(any())) - .thenReturn( - Cluster.builder() - .withName("cluster1") - .withSeedHosts(ImmutableSet.of("test")) - .withJmxPort(7199) - .build()); - - JmxProxy nodeProxy = Mockito.mock(JmxProxy.class); - Mockito.when(context.jmxConnectionFactory.connectAny(any(Collection.class))).thenReturn(nodeProxy); - - Mockito.when(nodeProxy.getPendingCompactions()) - .then(a -> { - // delay the call so force the forkJoinPool queue - Thread.sleep(2501); - return 10; - }); - - try (Heart heart = Heart.create(context, TimeUnit.SECONDS.toMillis(2))) { - context.isDistributed.set(true); - heart.beat(); - Assertions.assertThat(heart.isCurrentlyUpdatingNodeMetrics().get()).isTrue(); - Thread.sleep(2100); - heart.beat(); - Thread.sleep(500); - } - - Mockito.verify((CassandraStorage)context.storage, Mockito.times(2)).saveHeartbeat(); - Mockito.verify((CassandraStorage)context.storage, Mockito.times(2)).getNodeMetrics(any()); - Mockito.verify(context.jmxConnectionFactory, Mockito.times(2)).connectAny(any(Collection.class)); - Mockito.verify((CassandraStorage)context.storage, Mockito.times(2)).storeNodeMetrics(any(), any()); } } diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunServiceTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunServiceTest.java index 61780bd60..67ffc2d06 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/RepairRunServiceTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunServiceTest.java @@ -212,7 +212,7 @@ public void buildReplicasToRangeMapTest() { } @Test - public void generateSegmentsTest() throws ReaperException { + public void generateSegmentsTest() throws ReaperException, UnknownHostException { Cluster cluster = Cluster.builder() .withName("test_" + RandomStringUtils.randomAlphabetic(12)) .withSeedHosts(ImmutableSet.of("127.0.0.1", "127.0.0.2", "127.0.0.3")) diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java index 27e2b7031..8129f99c4 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java @@ -1007,8 +1007,6 @@ public void getNodeMetricsInLocalDCAvailabilityForRemoteDCNodeTest() throws Exce .repairParallelism(RepairParallelism.PARALLEL) .tables(TABLES).build(UUID.randomUUID()); - when(((IDistributedStorage) context.storage).getNodeMetrics(any(), any())) - .thenReturn(Optional.empty()); Mockito.when(((IDistributedStorage) context.storage).countRunningReapers()).thenReturn(1); Mockito.when(((CassandraStorage) context.storage).getRepairRun(any())).thenReturn(Optional.of(run)); Mockito.when(((CassandraStorage) context.storage).getRepairUnit(any(UUID.class))).thenReturn(repairUnit); @@ -1042,9 +1040,6 @@ public void getNodeMetricsInLocalDCAvailabilityForRemoteDCNodeTest() throws Exce Pair>> result = repairRunner.getNodeMetrics("node-some", "dc1", "dc2"); assertFalse(result.getRight().call().isPresent()); verify(jmxConnectionFactory, times(0)).connectAny(any(Collection.class)); - // Verify that we didn't call any method that is used in getRemoteNodeMetrics() - verify((CassandraStorage)context.storage, times(0)).storeNodeMetrics(any(), any()); - verify((CassandraStorage)context.storage, times(0)).getNodeMetrics(any(), any()); } @Test diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairUnitServiceTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairUnitServiceTest.java index e2e9503ea..67fd101e0 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/RepairUnitServiceTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/RepairUnitServiceTest.java @@ -28,6 +28,7 @@ import io.cassandrareaper.jmx.JmxProxy; import io.cassandrareaper.jmx.JmxProxyTest; +import java.net.UnknownHostException; import java.util.Collection; import com.datastax.driver.core.utils.UUIDs; @@ -67,7 +68,7 @@ public void setUp() throws Exception { } @Test - public void getTablesToRepairRemoveOneTableTest() throws ReaperException { + public void getTablesToRepairRemoveOneTableTest() throws ReaperException, UnknownHostException { JmxProxy proxy = JmxProxyTest.mockJmxProxyImpl(); when(proxy.getCassandraVersion()).thenReturn("3.11.4"); when(context.jmxConnectionFactory.connectAny(Mockito.any(Collection.class))).thenReturn(proxy); @@ -90,7 +91,7 @@ public void getTablesToRepairRemoveOneTableTest() throws ReaperException { } @Test - public void getTablesToRepairDefaultCompactionStrategyTable() throws ReaperException { + public void getTablesToRepairDefaultCompactionStrategyTable() throws ReaperException, UnknownHostException { JmxProxy proxy = JmxProxyTest.mockJmxProxyImpl(); when(proxy.getCassandraVersion()).thenReturn("3.11.4"); when(context.jmxConnectionFactory.connectAny(Mockito.any(Collection.class))).thenReturn(proxy); @@ -113,7 +114,7 @@ public void getTablesToRepairDefaultCompactionStrategyTable() throws ReaperExcep } @Test - public void getTablesToRepairRemoveOneTableWithTwcsTest() throws ReaperException { + public void getTablesToRepairRemoveOneTableWithTwcsTest() throws ReaperException, UnknownHostException { JmxProxy proxy = JmxProxyTest.mockJmxProxyImpl(); when(proxy.getCassandraVersion()).thenReturn("3.11.4"); when(context.jmxConnectionFactory.connectAny(Mockito.any(Collection.class))).thenReturn(proxy); @@ -135,7 +136,7 @@ public void getTablesToRepairRemoveOneTableWithTwcsTest() throws ReaperException } @Test - public void getTablesToRepairRemoveTwoTablesTest() throws ReaperException { + public void getTablesToRepairRemoveTwoTablesTest() throws ReaperException, UnknownHostException { JmxProxy proxy = JmxProxyTest.mockJmxProxyImpl(); when(proxy.getCassandraVersion()).thenReturn("3.11.4"); when(context.jmxConnectionFactory.connectAny(Mockito.any(Collection.class))).thenReturn(proxy); @@ -158,7 +159,7 @@ public void getTablesToRepairRemoveTwoTablesTest() throws ReaperException { } @Test - public void getTablesToRepairRemoveTwoTablesOneWithTwcsTest() throws ReaperException { + public void getTablesToRepairRemoveTwoTablesOneWithTwcsTest() throws ReaperException, UnknownHostException { JmxProxy proxy = JmxProxyTest.mockJmxProxyImpl(); when(proxy.getCassandraVersion()).thenReturn("3.11.4"); when(context.jmxConnectionFactory.connectAny(Mockito.any(Collection.class))).thenReturn(proxy); @@ -181,7 +182,7 @@ public void getTablesToRepairRemoveTwoTablesOneWithTwcsTest() throws ReaperExcep } @Test - public void getTablesToRepairRemoveOneTableFromListTest() throws ReaperException { + public void getTablesToRepairRemoveOneTableFromListTest() throws ReaperException, UnknownHostException { JmxProxy proxy = JmxProxyTest.mockJmxProxyImpl(); when(proxy.getCassandraVersion()).thenReturn("3.11.4"); when(context.jmxConnectionFactory.connectAny(Mockito.any(Collection.class))).thenReturn(proxy); @@ -205,7 +206,7 @@ public void getTablesToRepairRemoveOneTableFromListTest() throws ReaperException } @Test - public void getTablesToRepairRemoveOneTableFromListOneWithTwcsTest() throws ReaperException { + public void getTablesToRepairRemoveOneTableFromListOneWithTwcsTest() throws ReaperException, UnknownHostException { JmxProxy proxy = JmxProxyTest.mockJmxProxyImpl(); when(proxy.getCassandraVersion()).thenReturn("3.11.4"); when(context.jmxConnectionFactory.connectAny(Mockito.any(Collection.class))).thenReturn(proxy); @@ -229,7 +230,7 @@ public void getTablesToRepairRemoveOneTableFromListOneWithTwcsTest() throws Reap } @Test(expected = IllegalStateException.class) - public void getTablesToRepairRemoveAllFailingTest() throws ReaperException { + public void getTablesToRepairRemoveAllFailingTest() throws ReaperException, UnknownHostException { JmxProxy proxy = JmxProxyTest.mockJmxProxyImpl(); when(proxy.getCassandraVersion()).thenReturn("3.11.4"); when(context.jmxConnectionFactory.connectAny(Mockito.any(Collection.class))).thenReturn(proxy); @@ -252,7 +253,7 @@ public void getTablesToRepairRemoveAllFailingTest() throws ReaperException { } @Test(expected = IllegalStateException.class) - public void getTablesToRepairRemoveAllFromListFailingTest() throws ReaperException { + public void getTablesToRepairRemoveAllFromListFailingTest() throws ReaperException, UnknownHostException { JmxProxy proxy = JmxProxyTest.mockJmxProxyImpl(); when(proxy.getCassandraVersion()).thenReturn("3.11.4"); when(context.jmxConnectionFactory.connectAny(Mockito.any(Collection.class))).thenReturn(proxy); diff --git a/src/server/src/test/java/io/cassandrareaper/service/SnapshotServiceTest.java b/src/server/src/test/java/io/cassandrareaper/service/SnapshotServiceTest.java index f12dac100..d6bacad04 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/SnapshotServiceTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/SnapshotServiceTest.java @@ -117,7 +117,7 @@ public void testListSnapshot() throws InterruptedException, ReaperException, Cla .listSnapshots(Node.builder().withHostname("127.0.0.1").build()); Assertions.assertThat(result).isEmpty(); - verify(storageMBean, times(1)).getSnapshotDetails(); + verify(storageMBean, times(0)).getSnapshotDetails(); } @Test diff --git a/src/server/src/test/java/io/cassandrareaper/service/StreamServiceTest.java b/src/server/src/test/java/io/cassandrareaper/service/StreamServiceTest.java index bda923184..599ebe15f 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/StreamServiceTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/StreamServiceTest.java @@ -19,15 +19,18 @@ import io.cassandrareaper.AppContext; import io.cassandrareaper.ReaperException; +import io.cassandrareaper.core.Cluster; import io.cassandrareaper.core.Node; import io.cassandrareaper.core.StreamSession; import io.cassandrareaper.jmx.ClusterFacade; +import io.cassandrareaper.jmx.HostConnectionCounters; import io.cassandrareaper.jmx.JmxConnectionFactory; import io.cassandrareaper.jmx.JmxProxy; import io.cassandrareaper.jmx.JmxProxyTest; import java.io.IOException; import java.net.URL; +import java.net.UnknownHostException; import java.util.List; import java.util.Map; @@ -58,7 +61,8 @@ public class StreamServiceTest { @Test - public void testListStreams() throws ReaperException, ClassNotFoundException, InterruptedException { + public void testListStreams() + throws ReaperException, ClassNotFoundException, InterruptedException, UnknownHostException { JmxProxy proxy = JmxProxyTest.mockJmxProxyImpl(); StreamManagerMBean streamingManagerMBean = Mockito.mock(StreamManagerMBean.class); JmxProxyTest.mockGetStreamManagerMBean(proxy, streamingManagerMBean); @@ -68,11 +72,13 @@ public void testListStreams() throws ReaperException, ClassNotFoundException, In cxt.jmxConnectionFactory = mock(JmxConnectionFactory.class); when(cxt.jmxConnectionFactory.connectAny(Mockito.anyList())).thenReturn(proxy); ClusterFacade clusterFacadeSpy = Mockito.spy(ClusterFacade.create(cxt)); - Mockito.doReturn("dc1").when(clusterFacadeSpy).getDatacenter(any()); + Mockito.doReturn("dc1").when(clusterFacadeSpy).getDatacenter(any(), any()); - StreamService + // do the actual pullStreams() call, which should succeed + List result = StreamService .create(() -> clusterFacadeSpy) - .listStreams(Node.builder().withHostname("127.0.0.1").build()); + .listStreams(Node.builder().withHostname("127.0.0.1").withCluster(Cluster.builder().withJmxPort(7199) + .withSeedHosts(ImmutableSet.of("127.0.0.1")).withName("test").build()).build()); verify(streamingManagerMBean, times(1)).getCurrentStreams(); } @@ -99,13 +105,17 @@ public void testGetStreams_2_0_17() cxt.config = TestRepairConfiguration.defaultConfig(); cxt.jmxConnectionFactory = mock(JmxConnectionFactory.class); when(cxt.jmxConnectionFactory.connectAny(Mockito.anyList())).thenReturn(proxy); + HostConnectionCounters connectionCounters = mock(HostConnectionCounters.class); + when(cxt.jmxConnectionFactory.getHostConnectionCounters()).thenReturn(connectionCounters); + when(connectionCounters.getSuccessfulConnections(any())).thenReturn(1); ClusterFacade clusterFacadeSpy = Mockito.spy(ClusterFacade.create(cxt)); - Mockito.doReturn("dc1").when(clusterFacadeSpy).getDatacenter(any()); + Mockito.doReturn("dc1").when(clusterFacadeSpy).getDatacenter(any(), any()); // do the actual pullStreams() call, which should succeed List result = StreamService .create(() -> clusterFacadeSpy) - .listStreams(Node.builder().withHostname("127.0.0.1").build()); + .listStreams(Node.builder().withHostname("127.0.0.1").withCluster(Cluster.builder().withJmxPort(7199) + .withSeedHosts(ImmutableSet.of("127.0.0.1")).withName("test").build()).build()); verify(streamingManagerMBean, times(1)).getCurrentStreams(); assertEquals(1, result.size()); @@ -133,13 +143,17 @@ public void testGetStreams_2_1_20() cxt.config = TestRepairConfiguration.defaultConfig(); cxt.jmxConnectionFactory = mock(JmxConnectionFactory.class); when(cxt.jmxConnectionFactory.connectAny(Mockito.anyList())).thenReturn(proxy); + HostConnectionCounters connectionCounters = mock(HostConnectionCounters.class); + when(cxt.jmxConnectionFactory.getHostConnectionCounters()).thenReturn(connectionCounters); + when(connectionCounters.getSuccessfulConnections(any())).thenReturn(1); ClusterFacade clusterFacadeSpy = Mockito.spy(ClusterFacade.create(cxt)); - Mockito.doReturn("dc1").when(clusterFacadeSpy).getDatacenter(any()); + Mockito.doReturn("dc1").when(clusterFacadeSpy).getDatacenter(any(), any()); // do the actual pullStreams() call, which should succeed List result = StreamService .create(() -> clusterFacadeSpy) - .listStreams(Node.builder().withHostname("127.0.0.1").build()); + .listStreams(Node.builder().withHostname("127.0.0.1").withCluster(Cluster.builder().withJmxPort(7199) + .withSeedHosts(ImmutableSet.of("127.0.0.1")).withName("test").build()).build()); verify(streamingManagerMBean, times(1)).getCurrentStreams(); assertEquals(1, result.size()); @@ -167,13 +181,17 @@ public void testGetStreams_2_2_12() cxt.config = TestRepairConfiguration.defaultConfig(); cxt.jmxConnectionFactory = mock(JmxConnectionFactory.class); when(cxt.jmxConnectionFactory.connectAny(Mockito.anyList())).thenReturn(proxy); + HostConnectionCounters connectionCounters = mock(HostConnectionCounters.class); + when(cxt.jmxConnectionFactory.getHostConnectionCounters()).thenReturn(connectionCounters); + when(connectionCounters.getSuccessfulConnections(any())).thenReturn(1); ClusterFacade clusterFacadeSpy = Mockito.spy(ClusterFacade.create(cxt)); - Mockito.doReturn("dc1").when(clusterFacadeSpy).getDatacenter(any()); + Mockito.doReturn("dc1").when(clusterFacadeSpy).getDatacenter(any(), any()); // do the actual pullStreams() call, which should succeed List result = StreamService .create(() -> clusterFacadeSpy) - .listStreams(Node.builder().withHostname("127.0.0.1").build()); + .listStreams(Node.builder().withHostname("127.0.0.1").withCluster(Cluster.builder().withJmxPort(7199) + .withSeedHosts(ImmutableSet.of("127.0.0.1")).withName("test").build()).build()); verify(streamingManagerMBean, times(1)).getCurrentStreams(); assertEquals(1, result.size()); @@ -201,13 +219,17 @@ public void testGetStreams_3_11_2() cxt.config = TestRepairConfiguration.defaultConfig(); cxt.jmxConnectionFactory = mock(JmxConnectionFactory.class); when(cxt.jmxConnectionFactory.connectAny(Mockito.anyList())).thenReturn(proxy); + HostConnectionCounters connectionCounters = mock(HostConnectionCounters.class); + when(cxt.jmxConnectionFactory.getHostConnectionCounters()).thenReturn(connectionCounters); + when(connectionCounters.getSuccessfulConnections(any())).thenReturn(1); ClusterFacade clusterFacadeSpy = Mockito.spy(ClusterFacade.create(cxt)); - Mockito.doReturn("dc1").when(clusterFacadeSpy).getDatacenter(any()); + Mockito.doReturn("dc1").when(clusterFacadeSpy).getDatacenter(any(), any()); // do the actual pullStreams() call, which should succeed List result = StreamService .create(() -> clusterFacadeSpy) - .listStreams(Node.builder().withHostname("127.0.0.1").build()); + .listStreams(Node.builder().withHostname("127.0.0.1").withCluster(Cluster.builder().withJmxPort(7199) + .withSeedHosts(ImmutableSet.of("127.0.0.1")).withName("test").build()).build()); verify(streamingManagerMBean, times(1)).getCurrentStreams(); assertEquals(1, result.size()); @@ -232,13 +254,17 @@ public void testGetStreams_4_0_0() cxt.config = TestRepairConfiguration.defaultConfig(); cxt.jmxConnectionFactory = mock(JmxConnectionFactory.class); when(cxt.jmxConnectionFactory.connectAny(Mockito.anyList())).thenReturn(proxy); + HostConnectionCounters connectionCounters = mock(HostConnectionCounters.class); + when(cxt.jmxConnectionFactory.getHostConnectionCounters()).thenReturn(connectionCounters); + when(connectionCounters.getSuccessfulConnections(any())).thenReturn(1); ClusterFacade clusterFacadeSpy = Mockito.spy(ClusterFacade.create(cxt)); - Mockito.doReturn("dc1").when(clusterFacadeSpy).getDatacenter(any()); + Mockito.doReturn("dc1").when(clusterFacadeSpy).getDatacenter(any(), any()); // do the actual pullStreams() call, which should succeed List result = StreamService .create(() -> clusterFacadeSpy) - .listStreams(Node.builder().withHostname("127.0.0.1").build()); + .listStreams(Node.builder().withHostname("127.0.0.1").withCluster(Cluster.builder().withJmxPort(7199) + .withSeedHosts(ImmutableSet.of("127.0.0.1")).withName("test").build()).build()); verify(streamingManagerMBean, times(1)).getCurrentStreams(); assertEquals(1, result.size()); diff --git a/src/server/src/test/java/io/cassandrareaper/storage/PostgresStorageTest.java b/src/server/src/test/java/io/cassandrareaper/storage/PostgresStorageTest.java index dea08e47b..85592cb0f 100644 --- a/src/server/src/test/java/io/cassandrareaper/storage/PostgresStorageTest.java +++ b/src/server/src/test/java/io/cassandrareaper/storage/PostgresStorageTest.java @@ -19,7 +19,6 @@ import io.cassandrareaper.AppContext; import io.cassandrareaper.core.GenericMetric; -import io.cassandrareaper.core.NodeMetrics; import io.cassandrareaper.storage.postgresql.IStoragePostgreSql; import io.cassandrareaper.storage.postgresql.UuidUtil; @@ -35,7 +34,6 @@ import java.sql.Timestamp; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -206,92 +204,6 @@ public void testSaveHeartbeat() { Assertions.assertThat(numReapers).isEqualTo(1); } - @Test - public void testNodeMetrics() { - DBI dbi = new DBI(DB_URL); - UUID reaperInstanceId = UUID.randomUUID(); - PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi); - Assertions.assertThat(storage.isStorageConnected()).isTrue(); - - Handle handle = dbi.open(); - handle.execute("DELETE from node_metrics_v1"); - - UUID runId = UUID.randomUUID(); - - // test empty result set - ArrayList emptyNmList = (ArrayList) storage.getNodeMetrics(runId); - Assertions.assertThat(emptyNmList.size()).isEqualTo(0); - - NodeMetrics originalNm = NodeMetrics.builder() - .withNode("fake_node") - .withCluster("fake_cluster") - .withDatacenter("NYDC") - .withHasRepairRunning(true) - .withPendingCompactions(4) - .withActiveAnticompactions(1) - .build(); - - storage.storeNodeMetrics(runId, originalNm); - ArrayList nodeMetricsList = (ArrayList) storage.getNodeMetrics(runId); - Assertions.assertThat(nodeMetricsList.size()).isEqualTo(1); - - NodeMetrics fetchedNm = nodeMetricsList.get(0); - Assertions.assertThat(fetchedNm.getNode()).isEqualTo(originalNm.getNode()); - Assertions.assertThat(fetchedNm.getCluster()).isEqualTo(originalNm.getCluster()); - Assertions.assertThat(fetchedNm.getDatacenter()).isEqualTo(originalNm.getDatacenter()); - Assertions.assertThat(fetchedNm.hasRepairRunning()).isEqualTo(originalNm.hasRepairRunning()); - Assertions.assertThat(fetchedNm.getPendingCompactions()).isEqualTo(originalNm.getPendingCompactions()); - Assertions.assertThat(fetchedNm.getActiveAnticompactions()).isEqualTo(originalNm.getActiveAnticompactions()); - } - - @Test - public void testNodeMetricsByNode() throws InterruptedException { - DBI dbi = new DBI(DB_URL); - UUID reaperInstanceId = UUID.randomUUID(); - PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi); - Assertions.assertThat(storage.isStorageConnected()).isTrue(); - - Handle handle = dbi.open(); - handle.execute("DELETE from node_metrics_v1"); - - UUID runId = UUID.randomUUID(); - - NodeMetrics nmRequest = NodeMetrics.builder() - .withNode("fake_node1") - .withCluster("fake_cluster") - .withDatacenter("NYDC") - .withRequested(true) - .build(); - - NodeMetrics nm1 = NodeMetrics.builder() - .withNode("fake_node1") - .withCluster("fake_cluster") - .withDatacenter("NYDC") - .withHasRepairRunning(true) - .withPendingCompactions(4) - .withActiveAnticompactions(1) - .build(); - - // store a metric request and a metric response - storage.storeNodeMetrics(runId, nmRequest); - TimeUnit.MILLISECONDS.sleep(100); - storage.storeNodeMetrics(runId, nm1); - - Optional fetchedNm1Opt = storage.getNodeMetrics(runId, "fake_node1"); - Assertions.assertThat(fetchedNm1Opt.isPresent()).isTrue(); - NodeMetrics fetchedNm1 = fetchedNm1Opt.get(); - Assertions.assertThat(fetchedNm1.getNode()).isEqualTo(nm1.getNode()); - Assertions.assertThat(fetchedNm1.getCluster()).isEqualTo(nm1.getCluster()); - Assertions.assertThat(fetchedNm1.getDatacenter()).isEqualTo(nm1.getDatacenter()); - Assertions.assertThat(fetchedNm1.hasRepairRunning()).isEqualTo(nm1.hasRepairRunning()); - Assertions.assertThat(fetchedNm1.getPendingCompactions()).isEqualTo(nm1.getPendingCompactions()); - Assertions.assertThat(fetchedNm1.getActiveAnticompactions()).isEqualTo(nm1.getActiveAnticompactions()); - - // test that fetching a non-existent metric returns Optional.Empty() - Optional fetchedNm2Opt = storage.getNodeMetrics(runId, "fake_node2"); - Assertions.assertThat(fetchedNm2Opt.isPresent()).isFalse(); - } - @Test public void testNodeOperations() { DBI dbi = new DBI(DB_URL); @@ -480,86 +392,4 @@ public void testUpdateLeaderEntry() throws InterruptedException { Assertions.assertThat(rowsUpdated).isEqualTo(1); // should update b/c original entry has expired } - - @Test - public void testDeleteOldNodeMetrics() throws InterruptedException { - System.out.println("Testing metrics timeout (this will take a minute)..."); - DBI dbi = new DBI(DB_URL); - UUID reaperInstanceId = UUID.randomUUID(); - PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi, 1, 1, 1, 1); - Assertions.assertThat(storage.isStorageConnected()).isTrue(); - - Handle handle = dbi.open(); - handle.execute("DELETE from node_metrics_v1"); - - UUID runId = UUID.randomUUID(); - NodeMetrics originalNm = NodeMetrics.builder() - .withNode("fake_node") - .withCluster("fake_cluster") - .withDatacenter("NYDC") - .withHasRepairRunning(true) - .withPendingCompactions(4) - .withActiveAnticompactions(1) - .build(); - storage.storeNodeMetrics(runId, originalNm); - - // first delete attempt shouldn't do anything because the entry hasn't passed its expiration time - storage.purgeNodeMetrics(); - int numMetrics = handle.createQuery("SELECT COUNT(*) FROM node_metrics_v1") - .mapTo(Integer.class) - .first(); - Assertions.assertThat(numMetrics).isEqualTo(1); - - TimeUnit.SECONDS.sleep(61); - - // second delete attempt should work because entry has passed its expiration time - storage.purgeNodeMetrics(); - numMetrics = handle.createQuery("SELECT COUNT(*) FROM node_metrics_v1") - .mapTo(Integer.class) - .first(); - Assertions.assertThat(numMetrics).isEqualTo(0); - } - - @Test - public void testManualDeleteNodeMetrics() { - DBI dbi = new DBI(DB_URL); - UUID reaperInstanceId = UUID.randomUUID(); - PostgresStorage storage = new PostgresStorage(reaperInstanceId, dbi); - Assertions.assertThat(storage.isStorageConnected()).isTrue(); - - Handle handle = dbi.open(); - handle.execute("DELETE from node_metrics_v1"); - - UUID runId = UUID.randomUUID(); - NodeMetrics nm1 = NodeMetrics.builder() - .withNode("fake_node_1") - .withCluster("fake_cluster") - .withDatacenter("NYDC") - .withHasRepairRunning(true) - .withPendingCompactions(4) - .withActiveAnticompactions(1) - .build(); - NodeMetrics nm2 = NodeMetrics.builder() - .withNode("fake_node2") - .withCluster("fake_cluster") - .withDatacenter("NYDC") - .withHasRepairRunning(true) - .withPendingCompactions(4) - .withActiveAnticompactions(1) - .build(); - storage.storeNodeMetrics(runId, nm1); - storage.storeNodeMetrics(runId, nm2); - - int numMetrics = handle.createQuery("SELECT COUNT(*) FROM node_metrics_v1") - .mapTo(Integer.class) - .first(); - Assertions.assertThat(numMetrics).isEqualTo(2); - - // delete metrics from table for fake_node_1 and verify delete succeeds - storage.deleteNodeMetrics(runId, "fake_node_1"); - numMetrics = handle.createQuery("SELECT COUNT(*) FROM node_metrics_v1") - .mapTo(Integer.class) - .first(); - Assertions.assertThat(numMetrics).isEqualTo(1); - } }