From 4cd886136607082d7a9efbd6081d12821dd947a1 Mon Sep 17 00:00:00 2001 From: Hannu Varjoranta Date: Wed, 26 Aug 2015 14:28:27 +0200 Subject: [PATCH] fix not closing jmx connector sometimes --- .../spotify/reaper/cassandra/JmxProxy.java | 75 +++++++++++-------- .../reaper/resources/ClusterResource.java | 6 +- .../reaper/resources/PingResource.java | 2 +- .../reaper/resources/RepairRunResource.java | 4 +- .../resources/RepairScheduleResource.java | 5 +- .../spotify/reaper/service/RepairManager.java | 7 +- .../spotify/reaper/service/SegmentRunner.java | 4 +- 7 files changed, 58 insertions(+), 45 deletions(-) diff --git a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java index 8d9923399..a599ec5b9 100644 --- a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java +++ b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java @@ -16,6 +16,7 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.Lists; + import com.spotify.reaper.ReaperException; import com.spotify.reaper.core.Cluster; import com.spotify.reaper.service.RingRange; @@ -73,12 +74,14 @@ public class JmxProxy implements NotificationListener, AutoCloseable { private final StorageServiceMBean ssProxy; private final Optional repairStatusHandler; private final String host; + private final JMXServiceURL jmxUrl; private final String clusterName; - private JmxProxy(Optional handler, String host, JMXConnector jmxConnector, - StorageServiceMBean ssProxy, ObjectName ssMbeanName, - MBeanServerConnection mbeanServer, CompactionManagerMBean cmProxy) { + private JmxProxy(Optional handler, String host, JMXServiceURL jmxUrl, + JMXConnector jmxConnector, StorageServiceMBean ssProxy, ObjectName ssMbeanName, + MBeanServerConnection mbeanServer, CompactionManagerMBean cmProxy) { this.host = host; + this.jmxUrl = jmxUrl; this.jmxConnector = jmxConnector; this.ssMbeanName = ssMbeanName; this.mbeanServer = mbeanServer; @@ -88,11 +91,12 @@ private JmxProxy(Optional handler, String host, JMXConnecto this.clusterName = Cluster.toSymbolicName(ssProxy.getClusterName()); } - + /** * @see JmxProxy#connect(Optional, String, int, String, String) */ - static JmxProxy connect(Optional handler, String host, String username, String password) + static JmxProxy connect(Optional handler, String host, String username, + String password) throws ReaperException { assert null != host : "null host given to JmxProxy.connect()"; String[] parts = host.split(":"); @@ -103,22 +107,24 @@ static JmxProxy connect(Optional handler, String host, Stri } } - + /** * Connect to JMX interface on the given host and port. * - * @param handler Implementation of {@link RepairStatusHandler} to process incoming notifications - * of repair events. - * @param host hostname or ip address of Cassandra node - * @param port port number to use for JMX connection + * @param handler Implementation of {@link RepairStatusHandler} to process incoming + * notifications + * of repair events. + * @param host hostname or ip address of Cassandra node + * @param port port number to use for JMX connection * @param username username to use for JMX authentication * @param password password to use for JMX authentication */ - static JmxProxy connect(Optional handler, String host, int port, String username, String password) + static JmxProxy connect(Optional handler, String host, int port, + String username, String password) throws ReaperException { - JMXServiceURL jmxUrl; ObjectName ssMbeanName; ObjectName cmMbeanName; + JMXServiceURL jmxUrl; try { jmxUrl = new JMXServiceURL(String.format(JMX_URL, host, port)); ssMbeanName = new ObjectName(SS_OBJECT_NAME); @@ -129,7 +135,7 @@ static JmxProxy connect(Optional handler, String host, int } try { Map env = new HashMap(); - if(username != null && password != null) { + if (username != null && password != null) { String[] creds = {username, password}; env.put(JMXConnector.CREDENTIALS, creds); } @@ -139,12 +145,13 @@ static JmxProxy connect(Optional handler, String host, int JMX.newMBeanProxy(mbeanServerConn, ssMbeanName, StorageServiceMBean.class); CompactionManagerMBean cmProxy = JMX.newMBeanProxy(mbeanServerConn, cmMbeanName, CompactionManagerMBean.class); - JmxProxy proxy = - new JmxProxy(handler, host, jmxConn, ssProxy, ssMbeanName, mbeanServerConn, cmProxy); + JmxProxy proxy = new JmxProxy(handler, host, jmxUrl, jmxConn, ssProxy, ssMbeanName, + mbeanServerConn, cmProxy); // registering a listener throws bunch of exceptions, so we do it here rather than in the // constructor mbeanServerConn.addNotificationListener(ssMbeanName, proxy, null, null); - LOG.debug(String.format("JMX connection to %s properly connected.", host)); + LOG.debug(String.format("JMX connection to %s properly connected: %s", + host, jmxUrl.toString())); return proxy; } catch (IOException | InstanceNotFoundException e) { LOG.error(String.format("Failed to establish JMX connection to %s:%s", host, port)); @@ -293,13 +300,12 @@ public void cancelAllRepairs() { /** * Checks if table exists in the cluster by instantiating a MBean for that table. - * */ public boolean tableExists(String ks, String cf) { try { String type = cf.contains(".") ? "IndexColumnFamilies" : "ColumnFamilies"; String nameStr = String.format("org.apache.cassandra.db:type=*%s,keyspace=%s,columnfamily=%s", - type, ks, cf); + type, ks, cf); Set beans = mbeanServer.queryNames(new ObjectName(nameStr), null); if (beans.isEmpty() || beans.size() != 1) { return false; @@ -308,7 +314,7 @@ public boolean tableExists(String ks, String cf) { JMX.newMBeanProxy(mbeanServer, bean, ColumnFamilyStoreMBean.class); } catch (MalformedObjectNameException | IOException e) { String errMsg = String.format("ColumnFamilyStore for %s/%s not found: %s", ks, cf, - e.getMessage()); + e.getMessage()); LOG.warn(errMsg); return false; } @@ -323,7 +329,7 @@ public boolean tableExists(String ks, String cf) { * @return Repair command number, or 0 if nothing to repair */ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keyspace, - RepairParallelism repairParallelism, Collection columnFamilies) { + RepairParallelism repairParallelism, Collection columnFamilies) { checkNotNull(ssProxy, "Looks like the proxy is not connected"); String cassandraVersion = ssProxy.getReleaseVersion(); boolean canUseDatacenterAware = false; @@ -336,27 +342,27 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys + "host %s, with repair parallelism %s, in cluster with Cassandra " + "version '%s' (can use DATACENTER_AWARE '%s'), " + "for column families: %s", - beginToken.toString(), endToken.toString(), keyspace, this.host, - repairParallelism, cassandraVersion, canUseDatacenterAware, - columnFamilies); + beginToken.toString(), endToken.toString(), keyspace, this.host, + repairParallelism, cassandraVersion, canUseDatacenterAware, + columnFamilies); LOG.info(msg); if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) { if (canUseDatacenterAware) { return ssProxy.forceRepairRangeAsync(beginToken.toString(), endToken.toString(), keyspace, - repairParallelism.ordinal(), null, null, - columnFamilies - .toArray(new String[columnFamilies.size()])); + repairParallelism.ordinal(), null, null, + columnFamilies + .toArray(new String[columnFamilies.size()])); } else { LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {}," + " falling back to SEQUENTIAL repair.", - cassandraVersion); + cassandraVersion); repairParallelism = RepairParallelism.SEQUENTIAL; } } boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL); return ssProxy.forceRepairRangeAsync(beginToken.toString(), endToken.toString(), keyspace, - snapshotRepair, false, - columnFamilies.toArray(new String[columnFamilies.size()])); + snapshotRepair, false, + columnFamilies.toArray(new String[columnFamilies.size()])); } /** @@ -404,11 +410,18 @@ public boolean isConnectionAlive() { */ @Override public void close() throws ReaperException { + LOG.debug(String.format("close JMX connection to '%s': %s", host, jmxUrl)); try { mbeanServer.removeNotificationListener(ssMbeanName, this); + } catch (InstanceNotFoundException | ListenerNotFoundException | IOException e) { + LOG.warn("failed on removing notification listener"); + e.printStackTrace(); + } + try { jmxConnector.close(); - } catch (IOException | InstanceNotFoundException | ListenerNotFoundException e) { - throw new ReaperException(e); + } catch (IOException e) { + LOG.warn("failed closing a JMX connection"); + e.printStackTrace(); } } diff --git a/src/main/java/com/spotify/reaper/resources/ClusterResource.java b/src/main/java/com/spotify/reaper/resources/ClusterResource.java index 6bed13766..2ee20f752 100644 --- a/src/main/java/com/spotify/reaper/resources/ClusterResource.java +++ b/src/main/java/com/spotify/reaper/resources/ClusterResource.java @@ -59,7 +59,7 @@ public ClusterResource(AppContext context) { @GET public Response getClusterList() { - LOG.info("get cluster list called"); + LOG.debug("get cluster list called"); Collection clusters = context.storage.getClusters(); List clusterNames = new ArrayList<>(); for (Cluster cluster : clusters) { @@ -73,7 +73,7 @@ public Response getClusterList() { public Response getCluster( @PathParam("cluster_name") String clusterName, @QueryParam("limit") Optional limit) { - LOG.info("get cluster called with cluster_name: {}", clusterName); + LOG.debug("get cluster called with cluster_name: {}", clusterName); return viewCluster(clusterName, limit, Optional.absent()); } @@ -107,7 +107,7 @@ public Response addCluster( LOG.error("POST on cluster resource called without seedHost"); return Response.status(400).entity("query parameter \"seedHost\" required").build(); } - LOG.info("add cluster called with seedHost: {}", seedHost.get()); + LOG.debug("add cluster called with seedHost: {}", seedHost.get()); Cluster newCluster; try { diff --git a/src/main/java/com/spotify/reaper/resources/PingResource.java b/src/main/java/com/spotify/reaper/resources/PingResource.java index 604376c6f..025d3bf9b 100644 --- a/src/main/java/com/spotify/reaper/resources/PingResource.java +++ b/src/main/java/com/spotify/reaper/resources/PingResource.java @@ -29,7 +29,7 @@ public class PingResource { @GET public String answerPing() { - LOG.info("ping called"); + LOG.debug("ping called"); return String.format("Cassandra Reaper ping resource: PONG"); } diff --git a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java index 18fd71bab..32f3ae803 100644 --- a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java +++ b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java @@ -327,7 +327,7 @@ private Response abortRun(RepairRun repairRun, RepairUnit repairUnit, int segmen @GET @Path("/{id}") public Response getRepairRun(@PathParam("id") Long repairRunId) { - LOG.info("get repair_run called with: id = {}", repairRunId); + LOG.debug("get repair_run called with: id = {}", repairRunId); Optional repairRun = context.storage.getRepairRun(repairRunId); if (repairRun.isPresent()) { return Response.ok().entity(getRepairRunStatus(repairRun.get())).build(); @@ -343,7 +343,7 @@ public Response getRepairRun(@PathParam("id") Long repairRunId) { @GET @Path("/cluster/{cluster_name}") public Response getRepairRunsForCluster(@PathParam("cluster_name") String clusterName) { - LOG.info("get repair run for cluster called with: cluster_name = {}", clusterName); + LOG.debug("get repair run for cluster called with: cluster_name = {}", clusterName); Collection repairRuns = context.storage.getRepairRunsForCluster(clusterName); Collection repairRunViews = new ArrayList<>(); for (RepairRun repairRun : repairRuns) { diff --git a/src/main/java/com/spotify/reaper/resources/RepairScheduleResource.java b/src/main/java/com/spotify/reaper/resources/RepairScheduleResource.java index 1ab8304f7..1d09b18b4 100644 --- a/src/main/java/com/spotify/reaper/resources/RepairScheduleResource.java +++ b/src/main/java/com/spotify/reaper/resources/RepairScheduleResource.java @@ -259,7 +259,7 @@ private Response resumeSchedule(RepairSchedule repairSchedule, RepairUnit repair @GET @Path("/{id}") public Response getRepairSchedule(@PathParam("id") Long repairScheduleId) { - LOG.info("get repair_schedule called with: id = {}", repairScheduleId); + LOG.debug("get repair_schedule called with: id = {}", repairScheduleId); Optional repairSchedule = context.storage.getRepairSchedule(repairScheduleId); if (repairSchedule.isPresent()) { return Response.ok().entity(getRepairScheduleStatus(repairSchedule.get())).build(); @@ -275,7 +275,7 @@ public Response getRepairSchedule(@PathParam("id") Long repairScheduleId) { @GET @Path("/cluster/{cluster_name}") public Response getRepairSchedulesForCluster(@PathParam("cluster_name") String clusterName) { - LOG.info("get repair schedules for cluster called with: cluster_name = {}", clusterName); + LOG.debug("get repair schedules for cluster called with: cluster_name = {}", clusterName); Collection repairSchedules = context.storage.getRepairSchedulesForCluster(clusterName); Collection repairScheduleViews = new ArrayList<>(); @@ -319,6 +319,7 @@ private URI buildRepairScheduleURI(UriInfo uriInfo, RepairSchedule repairSchedul */ @GET public Response listSchedules() { + LOG.debug("list all repair schedules called"); List scheduleStatuses = Lists.newArrayList(); Collection schedules = context.storage.getAllRepairSchedules(); for (RepairSchedule schedule : schedules) { diff --git a/src/main/java/com/spotify/reaper/service/RepairManager.java b/src/main/java/com/spotify/reaper/service/RepairManager.java index b6efea698..abfa50d88 100644 --- a/src/main/java/com/spotify/reaper/service/RepairManager.java +++ b/src/main/java/com/spotify/reaper/service/RepairManager.java @@ -8,6 +8,7 @@ import com.spotify.reaper.AppContext; import com.spotify.reaper.ReaperException; +import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; @@ -59,9 +60,9 @@ public void resumeRunningRepairRuns(AppContext context) { Collection runningSegments = context.storage.getSegmentsWithState(repairRun.getId(), RepairSegment.State.RUNNING); for (RepairSegment segment : runningSegments) { - try { - SegmentRunner.abort(context, segment, - context.jmxConnectionFactory.connect(segment.getCoordinatorHost())); + try (JmxProxy jmxProxy = context.jmxConnectionFactory + .connect(segment.getCoordinatorHost())) { + SegmentRunner.abort(context, segment, jmxProxy); } catch (ReaperException e) { LOG.debug("Tried to abort repair on segment {} marked as RUNNING, but the host was down" + " (so abortion won't be needed)", segment.getId()); diff --git a/src/main/java/com/spotify/reaper/service/SegmentRunner.java b/src/main/java/com/spotify/reaper/service/SegmentRunner.java index 7ed9b6785..7bef1b3a5 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentRunner.java +++ b/src/main/java/com/spotify/reaper/service/SegmentRunner.java @@ -36,7 +36,6 @@ import java.lang.management.ManagementFactory; import java.lang.management.OperatingSystemMXBean; -import java.net.SocketException; import java.util.Collection; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -353,8 +352,7 @@ protected void tryClearSnapshots(String message) { String repairId = parseRepairId(message); if (repairId != null) { for (String involvedNode : potentialCoordinators) { - try { - JmxProxy jmx = new JmxConnectionFactory().connect(involvedNode); + try (JmxProxy jmx = new JmxConnectionFactory().connect(involvedNode)) { // there is no way of telling if the snapshot was cleared or not :( jmx.clearSnapshot(repairId, keyspace); } catch (ReaperException e) {