Skip to content

Commit

Permalink
Merge branch '2.3'
Browse files Browse the repository at this point in the history
  • Loading branch information
adejanovski committed Jul 11, 2021
2 parents ceb70c6 + 5b36f67 commit b3e30d7
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,7 @@ public void run(ReaperApplicationConfiguration config, Environment environment)
final PingResource pingResource = new PingResource(healthCheck);
environment.jersey().register(pingResource);

final ClusterResource addClusterResource = new ClusterResource(
context,
cryptograph,
environment.lifecycle().executorService("ClusterResource").minThreads(6).maxThreads(6).build());
final ClusterResource addClusterResource = ClusterResource.create(context, cryptograph);

environment.jersey().register(addClusterResource);
final RepairRunResource addRepairRunResource = new RepairRunResource(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ public List<String> getLiveNodes(Cluster cluster, Collection<String> endpoints)
* @return a NodesStatus object with the state of all nodes
* @throws ReaperException any runtime exception we catch
*/
public NodesStatus getNodesStatus(Cluster cluster, Collection<String> endpoints) throws ReaperException {
JmxProxy jmxProxy = connect(cluster, endpoints);
public NodesStatus getNodesStatus(Cluster cluster) throws ReaperException {
JmxProxy jmxProxy = connect(cluster);
FailureDetectorProxy proxy = FailureDetectorProxy.create(jmxProxy);
return new NodesStatus(jmxProxy.getHost(), proxy.getAllEndpointsState(), proxy.getSimpleStates());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,18 @@
import io.cassandrareaper.core.RepairRun;
import io.cassandrareaper.crypto.Cryptograph;
import io.cassandrareaper.jmx.ClusterFacade;
import io.cassandrareaper.jmx.JmxProxy;
import io.cassandrareaper.resources.view.ClusterStatus;
import io.cassandrareaper.resources.view.NodesStatus;
import io.cassandrareaper.service.ClusterRepairScheduler;

import java.net.URI;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import javax.ws.rs.DELETE;
Expand All @@ -59,12 +52,11 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

import com.codahale.metrics.InstrumentedExecutorService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -76,19 +68,26 @@ public final class ClusterResource {
private static final Logger LOG = LoggerFactory.getLogger(ClusterResource.class);

private final AppContext context;
private final ExecutorService executor;
private final ClusterRepairScheduler clusterRepairScheduler;
private final ClusterFacade clusterFacade;
private final Cryptograph cryptograph;

public ClusterResource(AppContext context, Cryptograph cryptograph, ExecutorService executor) {
private ClusterResource(AppContext context, Cryptograph cryptograph, Supplier<ClusterFacade> clusterFacadeSupplier) {
this.context = context;
this.executor = new InstrumentedExecutorService(executor, context.metricRegistry);
this.clusterRepairScheduler = new ClusterRepairScheduler(context);
this.clusterFacade = ClusterFacade.create(context);
this.clusterFacade = clusterFacadeSupplier.get();
this.cryptograph = cryptograph;
}

/**
 * Test-oriented factory: builds a {@code ClusterResource} and hands the given
 * {@link ClusterFacade} supplier straight to the private constructor.
 *
 * @param context the application context passed to the constructor
 * @param cryptograph the cryptograph passed to the constructor
 * @param supplier source of the {@link ClusterFacade} the resource will use
 * @return the newly constructed resource
 */
@VisibleForTesting
static ClusterResource create(AppContext context, Cryptograph cryptograph, Supplier<ClusterFacade> supplier) {
  final ClusterResource resource = new ClusterResource(context, cryptograph, supplier);
  return resource;
}

/**
 * Default factory: builds a {@code ClusterResource} whose {@link ClusterFacade}
 * is produced by {@code ClusterFacade.create(context)}.
 *
 * @param context the application context, also used to create the cluster facade
 * @param cryptograph the cryptograph passed to the constructor
 * @return the newly constructed resource
 */
public static ClusterResource create(AppContext context, Cryptograph cryptograph) {
  // The facade is created lazily, only when the constructor asks the supplier for it.
  final Supplier<ClusterFacade> facadeSupplier = () -> ClusterFacade.create(context);
  return new ClusterResource(context, cryptograph, facadeSupplier);
}

@GET
public Response getClusterList(@QueryParam("seedHost") Optional<String> seedHost) {
LOG.debug("get cluster list called");
Expand Down Expand Up @@ -129,10 +128,15 @@ public Response getCluster(
jmxPasswordIsSet,
context.storage.getClusterRunStatuses(cluster.getName(), limit.orElse(Integer.MAX_VALUE)),
context.storage.getClusterScheduleStatuses(cluster.getName()),
getNodesStatus(cluster));
clusterFacade.getNodesStatus(cluster));

return Response.ok().entity(clusterStatus).build();
} catch (IllegalArgumentException ignore) { }
} catch (IllegalArgumentException ignore) {
// Ignoring this exception
} catch (ReaperException e) {
LOG.error("Failed getting cluster {} info", clusterName, e);
return Response.status(500).entity(e).build();
}
return Response.status(404).entity("cluster with name \"" + clusterName + "\" not found").build();
}

Expand Down Expand Up @@ -415,59 +419,6 @@ public Response deleteCluster(
}
}

/**
 * Builds a task that fetches the nodes status of a cluster through the cluster facade.
 *
 * <p>When the lookup fails with a {@link RuntimeException}, the task logs the failure at debug
 * level, waits for the default JMX connection timeout and then yields an empty
 * {@code NodesStatus}. Any other exception propagates out of the task.
 *
 * @param cluster the cluster object contains additional connection info like jmx port and jmx credentials
 * @param seeds the host addresses to connect to via JMX
 * @return a callable producing the status of each node in the cluster as seen from the given
 *     seeds, or an empty {@code NodesStatus} after a runtime failure
 */
private Callable<NodesStatus> getEndpointState(Cluster cluster, Set<String> seeds) {
  return () -> {
    try {
      return clusterFacade.getNodesStatus(cluster, seeds);
    } catch (RuntimeException e) {
      LOG.debug("failed to get endpoints for cluster {} with seeds {}", cluster.getName(), seeds, e);
      // NOTE(review): presumably the delay keeps this empty fallback from being picked ahead of
      // a slower but successful sibling task when several are raced — confirm with the caller.
      TimeUnit.SECONDS.sleep((int) JmxProxy.DEFAULT_JMX_CONNECTION_TIMEOUT.getSeconds());
      // Typed empty list instead of the raw Collections.EMPTY_LIST (avoids an unchecked conversion).
      return new NodesStatus(Collections.emptyList());
    }
  };
}

/**
 * Get all nodes state by querying the AllEndpointsState attribute through JMX.
 *
 * <p>
 * To speed up execution, the method queries up to 3 randomly chosen seed hosts concurrently and
 * returns the first result that completes successfully.
 *
 * @param cluster the cluster whose seed hosts are queried
 * @return the nodes statuses, or an empty {@code NodesStatus} when the cluster has no seed hosts,
 *     the calls fail or time out, or the calling thread is interrupted
 */
private NodesStatus getNodesStatus(Cluster cluster) {
  final int maxProbedHosts = 3;

  List<String> seedHosts = new ArrayList<>(cluster.getSeedHosts());
  if (seedHosts.isEmpty()) {
    // invokeAny rejects an empty task collection with IllegalArgumentException.
    LOG.debug("no seed hosts available for cluster {}", cluster.getName());
    return new NodesStatus(Collections.emptyList());
  }
  Collections.shuffle(seedHosts);

  List<Callable<NodesStatus>> endpointStateTasks = seedHosts.stream()
      .limit(maxProbedHosts)
      .map(host -> getEndpointState(cluster, Collections.singleton(host)))
      .collect(Collectors.toList());

  try {
    return executor.invokeAny(
        endpointStateTasks,
        (int) JmxProxy.DEFAULT_JMX_CONNECTION_TIMEOUT.getSeconds(),
        TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up can still observe the interruption.
    Thread.currentThread().interrupt();
    LOG.debug("failed grabbing nodes status", e);
  } catch (ExecutionException | TimeoutException e) {
    LOG.debug("failed grabbing nodes status", e);
  }
  return new NodesStatus(Collections.emptyList());
}

/*
* Creates a Set of seed hosts based on the comma delimited string passed
* as argument when adding a cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,8 @@ private static void initializeCassandraSchema(
.filter((reaperInstanceHost) -> !AppContext.REAPER_INSTANCE_ADDRESS.equals(reaperInstanceHost))
.collect(Collectors.toList());

Preconditions.checkState(
otherRunningReapers.isEmpty(),
"Database migration can not happen with other reaper instances running. Found ",
LOG.warn(
"Database migration is happenning with other reaper instances possibly running. Found {}",
StringUtils.join(otherRunningReapers));
}

Expand Down
Loading

0 comments on commit b3e30d7

Please sign in to comment.