Skip to content

Commit

Permalink
fix not closing jmx connector sometimes
Browse files Browse the repository at this point in the history
  • Loading branch information
varjoranta committed Aug 26, 2015
1 parent 323b4b9 commit 4cd8861
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 45 deletions.
75 changes: 44 additions & 31 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import com.spotify.reaper.ReaperException;
import com.spotify.reaper.core.Cluster;
import com.spotify.reaper.service.RingRange;
Expand Down Expand Up @@ -73,12 +74,14 @@ public class JmxProxy implements NotificationListener, AutoCloseable {
private final StorageServiceMBean ssProxy;
private final Optional<RepairStatusHandler> repairStatusHandler;
private final String host;
private final JMXServiceURL jmxUrl;
private final String clusterName;

private JmxProxy(Optional<RepairStatusHandler> handler, String host, JMXConnector jmxConnector,
StorageServiceMBean ssProxy, ObjectName ssMbeanName,
MBeanServerConnection mbeanServer, CompactionManagerMBean cmProxy) {
private JmxProxy(Optional<RepairStatusHandler> handler, String host, JMXServiceURL jmxUrl,
JMXConnector jmxConnector, StorageServiceMBean ssProxy, ObjectName ssMbeanName,
MBeanServerConnection mbeanServer, CompactionManagerMBean cmProxy) {
this.host = host;
this.jmxUrl = jmxUrl;
this.jmxConnector = jmxConnector;
this.ssMbeanName = ssMbeanName;
this.mbeanServer = mbeanServer;
Expand All @@ -88,11 +91,12 @@ private JmxProxy(Optional<RepairStatusHandler> handler, String host, JMXConnecto
this.clusterName = Cluster.toSymbolicName(ssProxy.getClusterName());
}


/**
* @see JmxProxy#connect(Optional, String, int, String, String)
*/
static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, String username, String password)
static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, String username,
String password)
throws ReaperException {
assert null != host : "null host given to JmxProxy.connect()";
String[] parts = host.split(":");
Expand All @@ -103,22 +107,24 @@ static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, Stri
}
}


/**
* Connect to JMX interface on the given host and port.
*
* @param handler Implementation of {@link RepairStatusHandler} to process incoming notifications
* of repair events.
* @param host hostname or ip address of Cassandra node
* @param port port number to use for JMX connection
* @param handler Implementation of {@link RepairStatusHandler} to process incoming
* notifications
* of repair events.
* @param host hostname or ip address of Cassandra node
* @param port port number to use for JMX connection
* @param username username to use for JMX authentication
* @param password password to use for JMX authentication
*/
static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int port, String username, String password)
static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int port,
String username, String password)
throws ReaperException {
JMXServiceURL jmxUrl;
ObjectName ssMbeanName;
ObjectName cmMbeanName;
JMXServiceURL jmxUrl;
try {
jmxUrl = new JMXServiceURL(String.format(JMX_URL, host, port));
ssMbeanName = new ObjectName(SS_OBJECT_NAME);
Expand All @@ -129,7 +135,7 @@ static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int
}
try {
Map<String, Object> env = new HashMap<String, Object>();
if(username != null && password != null) {
if (username != null && password != null) {
String[] creds = {username, password};
env.put(JMXConnector.CREDENTIALS, creds);
}
Expand All @@ -139,12 +145,13 @@ static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int
JMX.newMBeanProxy(mbeanServerConn, ssMbeanName, StorageServiceMBean.class);
CompactionManagerMBean cmProxy =
JMX.newMBeanProxy(mbeanServerConn, cmMbeanName, CompactionManagerMBean.class);
JmxProxy proxy =
new JmxProxy(handler, host, jmxConn, ssProxy, ssMbeanName, mbeanServerConn, cmProxy);
JmxProxy proxy = new JmxProxy(handler, host, jmxUrl, jmxConn, ssProxy, ssMbeanName,
mbeanServerConn, cmProxy);
// registering a listener throws bunch of exceptions, so we do it here rather than in the
// constructor
mbeanServerConn.addNotificationListener(ssMbeanName, proxy, null, null);
LOG.debug(String.format("JMX connection to %s properly connected.", host));
LOG.debug(String.format("JMX connection to %s properly connected: %s",
host, jmxUrl.toString()));
return proxy;
} catch (IOException | InstanceNotFoundException e) {
LOG.error(String.format("Failed to establish JMX connection to %s:%s", host, port));
Expand Down Expand Up @@ -293,13 +300,12 @@ public void cancelAllRepairs() {

/**
* Checks if table exists in the cluster by instantiating a MBean for that table.
*
*/
public boolean tableExists(String ks, String cf) {
try {
String type = cf.contains(".") ? "IndexColumnFamilies" : "ColumnFamilies";
String nameStr = String.format("org.apache.cassandra.db:type=*%s,keyspace=%s,columnfamily=%s",
type, ks, cf);
type, ks, cf);
Set<ObjectName> beans = mbeanServer.queryNames(new ObjectName(nameStr), null);
if (beans.isEmpty() || beans.size() != 1) {
return false;
Expand All @@ -308,7 +314,7 @@ public boolean tableExists(String ks, String cf) {
JMX.newMBeanProxy(mbeanServer, bean, ColumnFamilyStoreMBean.class);
} catch (MalformedObjectNameException | IOException e) {
String errMsg = String.format("ColumnFamilyStore for %s/%s not found: %s", ks, cf,
e.getMessage());
e.getMessage());
LOG.warn(errMsg);
return false;
}
Expand All @@ -323,7 +329,7 @@ public boolean tableExists(String ks, String cf) {
* @return Repair command number, or 0 if nothing to repair
*/
public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keyspace,
RepairParallelism repairParallelism, Collection<String> columnFamilies) {
RepairParallelism repairParallelism, Collection<String> columnFamilies) {
checkNotNull(ssProxy, "Looks like the proxy is not connected");
String cassandraVersion = ssProxy.getReleaseVersion();
boolean canUseDatacenterAware = false;
Expand All @@ -336,27 +342,27 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys
+ "host %s, with repair parallelism %s, in cluster with Cassandra "
+ "version '%s' (can use DATACENTER_AWARE '%s'), "
+ "for column families: %s",
beginToken.toString(), endToken.toString(), keyspace, this.host,
repairParallelism, cassandraVersion, canUseDatacenterAware,
columnFamilies);
beginToken.toString(), endToken.toString(), keyspace, this.host,
repairParallelism, cassandraVersion, canUseDatacenterAware,
columnFamilies);
LOG.info(msg);
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
if (canUseDatacenterAware) {
return ssProxy.forceRepairRangeAsync(beginToken.toString(), endToken.toString(), keyspace,
repairParallelism.ordinal(), null, null,
columnFamilies
.toArray(new String[columnFamilies.size()]));
repairParallelism.ordinal(), null, null,
columnFamilies
.toArray(new String[columnFamilies.size()]));
} else {
LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {},"
+ " falling back to SEQUENTIAL repair.",
cassandraVersion);
cassandraVersion);
repairParallelism = RepairParallelism.SEQUENTIAL;
}
}
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);
return ssProxy.forceRepairRangeAsync(beginToken.toString(), endToken.toString(), keyspace,
snapshotRepair, false,
columnFamilies.toArray(new String[columnFamilies.size()]));
snapshotRepair, false,
columnFamilies.toArray(new String[columnFamilies.size()]));
}

/**
Expand Down Expand Up @@ -404,11 +410,18 @@ public boolean isConnectionAlive() {
*/
@Override
public void close() throws ReaperException {
LOG.debug(String.format("close JMX connection to '%s': %s", host, jmxUrl));
try {
mbeanServer.removeNotificationListener(ssMbeanName, this);
} catch (InstanceNotFoundException | ListenerNotFoundException | IOException e) {
LOG.warn("failed on removing notification listener");
e.printStackTrace();
}
try {
jmxConnector.close();
} catch (IOException | InstanceNotFoundException | ListenerNotFoundException e) {
throw new ReaperException(e);
} catch (IOException e) {
LOG.warn("failed closing a JMX connection");
e.printStackTrace();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public ClusterResource(AppContext context) {

@GET
public Response getClusterList() {
LOG.info("get cluster list called");
LOG.debug("get cluster list called");
Collection<Cluster> clusters = context.storage.getClusters();
List<String> clusterNames = new ArrayList<>();
for (Cluster cluster : clusters) {
Expand All @@ -73,7 +73,7 @@ public Response getClusterList() {
public Response getCluster(
@PathParam("cluster_name") String clusterName,
@QueryParam("limit") Optional<Integer> limit) {
LOG.info("get cluster called with cluster_name: {}", clusterName);
LOG.debug("get cluster called with cluster_name: {}", clusterName);
return viewCluster(clusterName, limit, Optional.<URI>absent());
}

Expand Down Expand Up @@ -107,7 +107,7 @@ public Response addCluster(
LOG.error("POST on cluster resource called without seedHost");
return Response.status(400).entity("query parameter \"seedHost\" required").build();
}
LOG.info("add cluster called with seedHost: {}", seedHost.get());
LOG.debug("add cluster called with seedHost: {}", seedHost.get());

Cluster newCluster;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class PingResource {

@GET
public String answerPing() {
LOG.info("ping called");
LOG.debug("ping called");
return String.format("Cassandra Reaper ping resource: PONG");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ private Response abortRun(RepairRun repairRun, RepairUnit repairUnit, int segmen
@GET
@Path("/{id}")
public Response getRepairRun(@PathParam("id") Long repairRunId) {
LOG.info("get repair_run called with: id = {}", repairRunId);
LOG.debug("get repair_run called with: id = {}", repairRunId);
Optional<RepairRun> repairRun = context.storage.getRepairRun(repairRunId);
if (repairRun.isPresent()) {
return Response.ok().entity(getRepairRunStatus(repairRun.get())).build();
Expand All @@ -343,7 +343,7 @@ public Response getRepairRun(@PathParam("id") Long repairRunId) {
@GET
@Path("/cluster/{cluster_name}")
public Response getRepairRunsForCluster(@PathParam("cluster_name") String clusterName) {
LOG.info("get repair run for cluster called with: cluster_name = {}", clusterName);
LOG.debug("get repair run for cluster called with: cluster_name = {}", clusterName);
Collection<RepairRun> repairRuns = context.storage.getRepairRunsForCluster(clusterName);
Collection<RepairRunStatus> repairRunViews = new ArrayList<>();
for (RepairRun repairRun : repairRuns) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ private Response resumeSchedule(RepairSchedule repairSchedule, RepairUnit repair
@GET
@Path("/{id}")
public Response getRepairSchedule(@PathParam("id") Long repairScheduleId) {
LOG.info("get repair_schedule called with: id = {}", repairScheduleId);
LOG.debug("get repair_schedule called with: id = {}", repairScheduleId);
Optional<RepairSchedule> repairSchedule = context.storage.getRepairSchedule(repairScheduleId);
if (repairSchedule.isPresent()) {
return Response.ok().entity(getRepairScheduleStatus(repairSchedule.get())).build();
Expand All @@ -275,7 +275,7 @@ public Response getRepairSchedule(@PathParam("id") Long repairScheduleId) {
@GET
@Path("/cluster/{cluster_name}")
public Response getRepairSchedulesForCluster(@PathParam("cluster_name") String clusterName) {
LOG.info("get repair schedules for cluster called with: cluster_name = {}", clusterName);
LOG.debug("get repair schedules for cluster called with: cluster_name = {}", clusterName);
Collection<RepairSchedule> repairSchedules =
context.storage.getRepairSchedulesForCluster(clusterName);
Collection<RepairScheduleStatus> repairScheduleViews = new ArrayList<>();
Expand Down Expand Up @@ -319,6 +319,7 @@ private URI buildRepairScheduleURI(UriInfo uriInfo, RepairSchedule repairSchedul
*/
@GET
public Response listSchedules() {
LOG.debug("list all repair schedules called");
List<RepairScheduleStatus> scheduleStatuses = Lists.newArrayList();
Collection<RepairSchedule> schedules = context.storage.getAllRepairSchedules();
for (RepairSchedule schedule : schedules) {
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/com/spotify/reaper/service/RepairManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.spotify.reaper.AppContext;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.cassandra.JmxProxy;
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;

Expand Down Expand Up @@ -59,9 +60,9 @@ public void resumeRunningRepairRuns(AppContext context) {
Collection<RepairSegment> runningSegments =
context.storage.getSegmentsWithState(repairRun.getId(), RepairSegment.State.RUNNING);
for (RepairSegment segment : runningSegments) {
try {
SegmentRunner.abort(context, segment,
context.jmxConnectionFactory.connect(segment.getCoordinatorHost()));
try (JmxProxy jmxProxy = context.jmxConnectionFactory
.connect(segment.getCoordinatorHost())) {
SegmentRunner.abort(context, segment, jmxProxy);
} catch (ReaperException e) {
LOG.debug("Tried to abort repair on segment {} marked as RUNNING, but the host was down"
+ " (so abortion won't be needed)", segment.getId());
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.net.SocketException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -353,8 +352,7 @@ protected void tryClearSnapshots(String message) {
String repairId = parseRepairId(message);
if (repairId != null) {
for (String involvedNode : potentialCoordinators) {
try {
JmxProxy jmx = new JmxConnectionFactory().connect(involvedNode);
try (JmxProxy jmx = new JmxConnectionFactory().connect(involvedNode)) {
// there is no way of telling if the snapshot was cleared or not :(
jmx.clearSnapshot(repairId, keyspace);
} catch (ReaperException e) {
Expand Down

0 comments on commit 4cd8861

Please sign in to comment.