diff --git a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java index b0dc8f4eb..f53ee9d24 100644 --- a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java +++ b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java @@ -45,6 +45,8 @@ public class JmxProxy implements NotificationListener, AutoCloseable { private static final int JMX_PORT = 7199; private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi"; private static final String SS_OBJECT_NAME = "org.apache.cassandra.db:type=StorageService"; + private static final String AES_OBJECT_NAME = + "org.apache.cassandra.internal:type=AntiEntropySessions"; private final JMXConnector jmxConnector; private final ObjectName ssMbeanName; @@ -108,8 +110,8 @@ public static JmxProxy connect(Optional handler, String hos JMX.newMBeanProxy(mbeanServerConn, ssMbeanName, StorageServiceMBean.class); CompactionManagerMBean cmProxy = JMX.newMBeanProxy(mbeanServerConn, cmMbeanName, CompactionManagerMBean.class); - JmxProxy proxy = new JmxProxy(handler, host, jmxConn, ssProxy, ssMbeanName, - mbeanServerConn, cmProxy); + JmxProxy proxy = + new JmxProxy(handler, host, jmxConn, ssProxy, ssMbeanName, mbeanServerConn, cmProxy); // registering a listener throws bunch of exceptions, so we do it here rather than in the // constructor mbeanServerConn.addNotificationListener(ssMbeanName, proxy, null, null); @@ -214,6 +216,31 @@ public int getPendingCompactions() { return cmProxy.getPendingTasks(); } + /** + * @return true if any compactions are running on the node. + */ + public boolean isRepairRunning() { + // Check if AntiEntropySession is actually running on the node + try { + ObjectName name = new ObjectName(AES_OBJECT_NAME); + int activeCount = (Integer) mbeanServer.getAttribute(name, "ActiveCount"); + long pendingCount = (Long) mbeanServer.getAttribute(name, "PendingTasks"); + return activeCount + pendingCount != 0; + } catch (IOException ignored) { + LOG.warn("Failed to connect to " + host + " using JMX"); + } catch (MalformedObjectNameException ignored) { + LOG.error("Internal error, malformed name"); + } catch (InstanceNotFoundException e) { + // This happens if no repair has yet been run on the node + // The AntiEntropySessions object is created on the first repair + return false; + } catch (Exception e) { + LOG.error("Error getting attribute from JMX", e); + } + // If uncertain, assume it's running + return true; + } + /** * Terminates all ongoing repairs on the node this proxy is connected to */ diff --git a/src/main/java/com/spotify/reaper/service/SegmentRunner.java b/src/main/java/com/spotify/reaper/service/SegmentRunner.java index 41638ad79..86a8cbe2c 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentRunner.java +++ b/src/main/java/com/spotify/reaper/service/SegmentRunner.java @@ -130,6 +130,11 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator, host.getHost()); return false; } + if (host.isRepairRunning()) { + LOG.warn("SegmentRunner declined to repair segment {} because one of the hosts ({}) was " + + "already involved in a repair", segmentId, host.getHost()); + return false; + } } return true; }