Fix EACH and LOCAL modes
The metrics request queue was removed in 2.2, in favor of metrics being regularly extracted to the storage backend during heartbeats.
The code was missing support for such extractions in EACH and LOCAL modes, which made it impossible to perform repairs in EACH mode in particular.
There was also some dead code that only served the obsolete metrics request queue system.
The failed/successful connection counters, which optimize scenarios where cross-DC JMX is forbidden, were also fixed: they were stuck at their initial value. They now correctly count successful connections and avoid spamming the logs with failed connection attempts.
adejanovski committed Mar 9, 2021
1 parent a4e5893 commit 9083694
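
To make the connection-counter gating described above concrete, here is a minimal, self-contained sketch of the idea; the names (ConnectionCounters, MetricsCollector, heartbeat) are illustrative and are not Reaper's actual classes. Each host keeps a signed count of successful JMX connections, a failed attempt decrements it, and heartbeat-driven metric collection only attempts hosts whose count is still non-negative, so datacenters where cross-DC JMX is forbidden stop being retried and stop flooding the logs.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch with hypothetical names: per-host counters of successful JMX
// connections. A failed attempt drives the counter negative, and such hosts are
// skipped on the next heartbeat instead of being retried and flooding the logs.
public final class HeartbeatSketch {

  static final class ConnectionCounters {
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    int successfulConnections(String host) {
      return counters.computeIfAbsent(host, h -> new AtomicInteger(0)).get();
    }

    void incrementSuccessful(String host) {
      counters.computeIfAbsent(host, h -> new AtomicInteger(0)).incrementAndGet();
    }

    void decrementSuccessful(String host) {
      counters.computeIfAbsent(host, h -> new AtomicInteger(0)).decrementAndGet();
    }
  }

  interface MetricsCollector {
    // Stand-in for the JMX calls: throws when the node is unreachable,
    // e.g. because cross-DC JMX is blocked.
    void collect(String host) throws Exception;
  }

  // One heartbeat pass over the live nodes of a cluster: only hosts whose
  // counter is >= 0 are attempted; successes and failures adjust the counter.
  static void heartbeat(List<String> liveNodes, ConnectionCounters counters, MetricsCollector collector) {
    for (String host : liveNodes) {
      if (counters.successfulConnections(host) < 0) {
        continue; // previously unreachable; skip quietly this round
      }
      try {
        collector.collect(host);
        counters.incrementSuccessful(host);
      } catch (Exception e) {
        counters.decrementSuccessful(host); // next heartbeat will skip this host
      }
    }
  }
}

The actual diff below applies the same gating in Heart.java (the getSuccessfulConnections(hostname) >= 0 filter on live nodes) and moves counter maintenance into connectAny(), which now increments on a successful connection and decrements on a failed one.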
Showing 18 changed files with 202 additions and 614 deletions.
28 changes: 18 additions & 10 deletions src/server/src/main/java/io/cassandrareaper/jmx/ClusterFacade.java
@@ -458,7 +458,7 @@ public CompactionStats listActiveCompactions(Node node)
throws MalformedObjectNameException, ReflectionException, ReaperException, InterruptedException, IOException {

LOG.debug("Listing active compactions for node {}", node);
String nodeDc = getDatacenter(node);
String nodeDc = getDatacenter(node.getCluster().get(), node.getHostname());
if (nodeIsAccessibleThroughJmx(nodeDc, node.getHostname())) {
LOG.debug("Yay!! Node {} in DC {} is accessible through JMX", node.getHostname(), nodeDc);
// We have direct JMX access to the node
@@ -538,7 +538,7 @@ public Map<String, List<JmxStat>> collectMetrics(Node node, String[] collectedMe
*/
public List<MetricsHistogram> getClientRequestLatencies(Node node) throws ReaperException {
try {
String nodeDc = getDatacenter(node);
String nodeDc = getDatacenter(node.getCluster().get(), node.getHostname());
if (nodeIsAccessibleThroughJmx(nodeDc, node.getHostname())) {
MetricsProxy metricsProxy = MetricsProxy.create(connect(node));
return convertToMetricsHistogram(
@@ -553,7 +553,7 @@ public List<MetricsHistogram> getClientRequestLatencies(Node node) throws Reaper
"ClientRequest",
DateTime.now().minusMinutes(METRICS_PARTITIONING_TIME_MINS + 1).getMillis()));
}
} catch (JMException | InterruptedException | IOException e) {
} catch (JMException | IOException e) {
LOG.error("Failed collecting tpstats for host {}", node, e);
throw new ReaperException(e);
}
@@ -568,7 +568,7 @@ public List<MetricsHistogram> getClientRequestLatencies(Node node) throws Reaper
*/
public List<DroppedMessages> getDroppedMessages(Node node) throws ReaperException {
try {
String nodeDc = getDatacenter(node);
String nodeDc = getDatacenter(node.getCluster().get(), node.getHostname());
if (nodeIsAccessibleThroughJmx(nodeDc, node.getHostname())) {
MetricsProxy proxy = MetricsProxy.create(connect(node));
return convertToDroppedMessages(MetricsProxy.convertToGenericMetrics(proxy.collectDroppedMessages(), node));
@@ -581,7 +581,7 @@ public List<DroppedMessages> getDroppedMessages(Node node) throws ReaperExceptio
"DroppedMessage",
DateTime.now().minusMinutes(1).getMillis()));
}
} catch (JMException | InterruptedException | IOException e) {
} catch (JMException | IOException e) {
LOG.error("Failed collecting tpstats for host {}", node, e);
throw new ReaperException(e);
}
@@ -611,7 +611,7 @@ public List<DroppedMessages> convertToDroppedMessages(List<GenericMetric> metric
*/
public List<ThreadPoolStat> getTpStats(Node node) throws ReaperException {
try {
String nodeDc = getDatacenter(node);
String nodeDc = getDatacenter(node.getCluster().get(), node.getHostname());
if (nodeIsAccessibleThroughJmx(nodeDc, node.getHostname())) {
MetricsProxy proxy = MetricsProxy.create(connect(node));
return convertToThreadPoolStats(MetricsProxy.convertToGenericMetrics(proxy.collectTpStats(), node));
@@ -624,7 +624,7 @@ public List<ThreadPoolStat> getTpStats(Node node) throws ReaperException {
"ThreadPools",
DateTime.now().minusMinutes(1).getMillis()));
}
} catch (JMException | InterruptedException | IOException e) {
} catch (JMException | IOException e) {
LOG.error("Failed collecting tpstats for host {}", node, e);
throw new ReaperException(e);
}
@@ -697,11 +697,16 @@ public Pair<Node, String> takeSnapshot(String snapshotName, Node host, String...
*/
public List<Snapshot> listSnapshots(Node host) throws ReaperException {
try {
return SnapshotProxy.create(connect(host)).listSnapshots();
if (context.config.getDatacenterAvailability().isInCollocatedMode()
&& context.jmxConnectionFactory.getHostConnectionCounters().getSuccessfulConnections(
host.getHostname()) >= 0) {
return SnapshotProxy.create(connect(host)).listSnapshots();
}
} catch (UnsupportedOperationException unsupported) {
LOG.debug("Listing snapshot is unsupported with Cassandra 2.0 and prior");
throw unsupported;
}
return Collections.emptyList();
}

/**
Expand Down Expand Up @@ -731,7 +736,7 @@ public void clearSnapshot(String snapshotName, Node host) throws ReaperException
*/
public List<StreamSession> listActiveStreams(Node node)
throws ReaperException, InterruptedException, IOException {
String nodeDc = getDatacenter(node);
String nodeDc = getDatacenter(node.getCluster().get(), node.getHostname());
if (nodeIsAccessibleThroughJmx(nodeDc, node.getHostname())) {
// We have direct JMX access to the node
return listStreamsDirect(node);
@@ -741,8 +746,11 @@ public List<StreamSession> listActiveStreams(Node node)

String streamsJson = ((IDistributedStorage) context.storage)
.listOperations(node.getClusterName(), OpType.OP_STREAMING, node.getHostname());
if (streamsJson.length() > 0) {
return parseStreamSessionJson(streamsJson);
}

return parseStreamSessionJson(streamsJson);
return Collections.emptyList();
}
}

src/server/src/main/java/io/cassandrareaper/jmx/JmxConnectionFactory.java
@@ -133,12 +133,19 @@ public final JmxProxy connectAny(Collection<Node> nodes) throws ReaperException
for (int i = 0; i < 2; i++) {
for (Node node : nodeList) {
// First loop, we try the most accessible nodes, then second loop we try all nodes
if (hostConnectionCounters.getSuccessfulConnections(node.getHostname()) >= 0 || 1 == i) {
if (getHostConnectionCounters().getSuccessfulConnections(node.getHostname()) >= 0 || 1 == i) {
try {
return connectImpl(node);
LOG.debug("Trying to connect to node {} with {} successful connections with i = {}",
node.getHostname(), getHostConnectionCounters().getSuccessfulConnections(node.getHostname()), i);
JmxProxy jmxProxy = connectImpl(node);
getHostConnectionCounters().incrementSuccessfulConnections(node.getHostname());
if (getHostConnectionCounters().getSuccessfulConnections(node.getHostname()) > 0) {
accessibleDatacenters.add(EndpointSnitchInfoProxy.create(jmxProxy).getDataCenter());
}
return jmxProxy;
} catch (ReaperException | RuntimeException e) {
LOG.info("Unreachable host: {}: {}", e.getMessage(), e.getCause().getMessage());
LOG.debug("Unreachable host: ", e);
getHostConnectionCounters().decrementSuccessfulConnections(node.getHostname());
LOG.info("Unreachable host: ", e);
} catch (InterruptedException expected) {
LOG.trace("Expected exception", expected);
}
@@ -172,7 +179,7 @@ public void setJmxmp(Jmxmp jmxmp) {
this.jmxmp = jmxmp;
}

public final HostConnectionCounters getHostConnectionCounters() {
public HostConnectionCounters getHostConnectionCounters() {
return hostConnectionCounters;
}

@@ -230,13 +237,8 @@ public JmxProxy apply(String host) {
try {
JmxProxy proxy = JmxProxyImpl.connect(
host, jmxCredentials, addressTranslator, connectionTimeout, metricRegistry, cryptograph, jmxmp);
if (hostConnectionCounters.getSuccessfulConnections(host) <= 0) {
accessibleDatacenters.add(EndpointSnitchInfoProxy.create(proxy).getDataCenter());
}
hostConnectionCounters.incrementSuccessfulConnections(host);
return proxy;
} catch (ReaperException | InterruptedException ex) {
hostConnectionCounters.decrementSuccessfulConnections(host);
throw new RuntimeException(ex);
}
}
120 changes: 49 additions & 71 deletions src/server/src/main/java/io/cassandrareaper/service/Heart.java
@@ -21,14 +21,13 @@
import io.cassandrareaper.ReaperApplicationConfiguration.DatacenterAvailability;
import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.core.NodeMetrics;
import io.cassandrareaper.core.Node;
import io.cassandrareaper.jmx.ClusterFacade;
import io.cassandrareaper.jmx.JmxProxy;
import io.cassandrareaper.storage.IDistributedStorage;
import io.cassandrareaper.storage.IStorage;

import java.util.Arrays;
import java.util.UUID;
import java.io.IOException;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@@ -40,13 +39,13 @@
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public final class Heart implements AutoCloseable {

private static final AtomicBoolean GAUGES_REGISTERED = new AtomicBoolean(false);
@@ -119,55 +118,68 @@ private void updateRequestedNodeMetrics() {
if (!updatingNodeMetrics.getAndSet(true)) {
forkJoinPool.submit(() -> {
try (Timer.Context t0 = timer(context, "updatingNodeMetrics")) {

ClusterFacade clusterFacade = ClusterFacade.create(context);
Collection<Cluster> clusters = context.storage.getClusters();
forkJoinPool.submit(() -> {
context.repairManager.repairRunners.keySet()
clusters
.parallelStream()
.forEach(runId -> {

((IDistributedStorage) context.storage).getNodeMetrics(runId)
.parallelStream()
.filter(metrics -> canAnswerToNodeMetricsRequest(metrics))
.forEach(req -> {

LOG.info("Got metric request for node {} in {}", req.getNode(), req.getCluster());
try (Timer.Context t1 = timer(
context,
req.getCluster().replace('.', '-'),
req.getNode().replace('.', '-'))) {

.filter(cluster -> cluster.getState() == Cluster.State.ACTIVE)
.forEach(cluster -> {
try {
clusterFacade.getLiveNodes(cluster)
.parallelStream()
.filter(hostname -> {
return context.jmxConnectionFactory
.getHostConnectionCounters()
.getSuccessfulConnections(hostname) >= 0;
})
.map(hostname -> Node.builder()
.withHostname(hostname)
.withCluster(
Cluster.builder()
.withName(cluster.getName())
.withSeedHosts(ImmutableSet.of(hostname))
.build())
.build())
.forEach(node -> {
try {
grabAndStoreNodeMetrics(context.storage, runId, req);

LOG.info("Responded to metric request for node {}", req.getNode());
} catch (ReaperException | RuntimeException | InterruptedException ex) {
LOG.debug("failed seed connection in cluster " + req.getCluster(), ex);
} catch (JMException e) {
LOG.warn(
"failed querying JMX MBean for metrics on node {} of cluster {} due to {}",
req.getNode(), req.getCluster(), e.getMessage());
metricsService.grabAndStoreCompactionStats(Optional.of(node));
metricsService.grabAndStoreActiveStreams(Optional.of(node));
if (lastMetricBeat.get() + maxBeatFrequencyMillis <= System.currentTimeMillis()) {
metricsService.grabAndStoreGenericMetrics(Optional.of(node));
lastMetricBeat.set(System.currentTimeMillis());
}
} catch (JMException | ReaperException | RuntimeException | IOException e) {
LOG.error("Couldn't extract metrics for node {} in cluster {}",
node.getHostname(), cluster.getName(), e);
} catch (InterruptedException e) {
LOG.error("Interrupted while extracting metrics for node {} in cluster {}",
node.getHostname(), cluster.getName(), e);
}
}
});
});
} catch (ReaperException e) {
LOG.error("Couldn't list live nodes in cluster {}", cluster.getName(), e);
e.printStackTrace();
}
});
}).get();

if (context.config.getDatacenterAvailability() == DatacenterAvailability.SIDECAR) {
// In sidecar mode we store metrics in the db on a regular basis

if (lastMetricBeat.get() + maxBeatFrequencyMillis <= System.currentTimeMillis()) {
metricsService.grabAndStoreGenericMetrics();
metricsService.grabAndStoreGenericMetrics(Optional.empty());
lastMetricBeat.set(System.currentTimeMillis());
} else {
LOG.trace("Not storing metrics yet... Last beat was {} and now is {}",
lastMetricBeat.get(),
System.currentTimeMillis());
}
metricsService.grabAndStoreCompactionStats();
metricsService.grabAndStoreActiveStreams();
metricsService.grabAndStoreCompactionStats(Optional.empty());
metricsService.grabAndStoreActiveStreams(Optional.empty());
}
} catch (ExecutionException | InterruptedException | RuntimeException
| ReaperException | JMException | JsonProcessingException ex) {
| ReaperException | JMException | IOException ex) {
LOG.warn("Failed metric collection during heartbeat", ex);
} finally {
assert updatingNodeMetrics.get();
@@ -177,40 +189,6 @@ private void updateRequestedNodeMetrics() {
}
}

/**
* Checks if the local Reaper instance is supposed to answer a metrics request.
* Requires to be in sidecar on the node for which metrics are requested, or to be in a different mode than ALL.
* Also checks that the metrics record as requested set to true.
*
* @param metric a metric request
* @return true if reaper should try to answer the metric request
*/
private boolean canAnswerToNodeMetricsRequest(NodeMetrics metric) {
return (context.config.getDatacenterAvailability() == DatacenterAvailability.SIDECAR
&& metric.getNode().equals(context.getLocalNodeAddress()))
|| (context.config.getDatacenterAvailability() != DatacenterAvailability.ALL
&& context.config.getDatacenterAvailability() != DatacenterAvailability.SIDECAR)
&& metric.isRequested();
}

private void grabAndStoreNodeMetrics(IStorage storage, UUID runId, NodeMetrics req)
throws ReaperException, InterruptedException, JMException {

Cluster cluster = storage.getCluster(req.getCluster());
JmxProxy nodeProxy = ClusterFacade.create(context).connect(cluster, Arrays.asList(req.getNode()));

((IDistributedStorage) storage).storeNodeMetrics(
runId,
NodeMetrics.builder()
.withNode(req.getNode())
.withCluster(req.getCluster())
.withDatacenter(req.getDatacenter())
.withPendingCompactions(nodeProxy.getPendingCompactions())
.withHasRepairRunning(nodeProxy.isRepairRunning())
.withActiveAnticompactions(0) // for future use
.build());
}

private static Timer.Context timer(AppContext context, String... names) {
return context.metricRegistry.timer(MetricRegistry.name(Heart.class, names)).time();
}
