Skip to content

Commit

Permalink
Check for already running anti-entropy sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
Bj0rnen committed Jan 23, 2015
1 parent 0da9b92 commit 2f7bbac
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 2 deletions.
31 changes: 29 additions & 2 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class JmxProxy implements NotificationListener, AutoCloseable {
private static final int JMX_PORT = 7199;
private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
private static final String SS_OBJECT_NAME = "org.apache.cassandra.db:type=StorageService";
private static final String AES_OBJECT_NAME =
"org.apache.cassandra.internal:type=AntiEntropySessions";

private final JMXConnector jmxConnector;
private final ObjectName ssMbeanName;
Expand Down Expand Up @@ -108,8 +110,8 @@ public static JmxProxy connect(Optional<RepairStatusHandler> handler, String hos
JMX.newMBeanProxy(mbeanServerConn, ssMbeanName, StorageServiceMBean.class);
CompactionManagerMBean cmProxy =
JMX.newMBeanProxy(mbeanServerConn, cmMbeanName, CompactionManagerMBean.class);
JmxProxy proxy = new JmxProxy(handler, host, jmxConn, ssProxy, ssMbeanName,
mbeanServerConn, cmProxy);
JmxProxy proxy =
new JmxProxy(handler, host, jmxConn, ssProxy, ssMbeanName, mbeanServerConn, cmProxy);
// registering a listener throws bunch of exceptions, so we do it here rather than in the
// constructor
mbeanServerConn.addNotificationListener(ssMbeanName, proxy, null, null);
Expand Down Expand Up @@ -214,6 +216,31 @@ public int getPendingCompactions() {
return cmProxy.getPendingTasks();
}

/**
* @return true if any compactions are running on the node.
*/
public boolean isRepairRunning() {
// Check if AntiEntropySession is actually running on the node
try {
ObjectName name = new ObjectName(AES_OBJECT_NAME);
int activeCount = (Integer) mbeanServer.getAttribute(name, "ActiveCount");
long pendingCount = (Long) mbeanServer.getAttribute(name, "PendingTasks");
return activeCount + pendingCount != 0;
} catch (IOException ignored) {
LOG.warn("Failed to connect to " + host + " using JMX");
} catch (MalformedObjectNameException ignored) {
LOG.error("Internal error, malformed name");
} catch (InstanceNotFoundException e) {
// This happens if no repair has yet been run on the node
// The AntiEntropySessions object is created on the first repair
return false;
} catch (Exception e) {
LOG.error("Error getting attribute from JMX", e);
}
// If uncertain, assume it's running
return true;
}

/**
* Terminates all ongoing repairs on the node this proxy is connected to
*/
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
host.getHost());
return false;
}
if (host.isRepairRunning()) {
LOG.warn("SegmentRunner declined to repair segment {} because one of the hosts ({}) was "
+ "already involved in a repair", segmentId, host.getHost());
return false;
}
}
return true;
}
Expand Down

0 comments on commit 2f7bbac

Please sign in to comment.