diff --git a/src/main/java/com/spotify/reaper/cassandra/ClusterInfo.java b/src/main/java/com/spotify/reaper/cassandra/ClusterInfo.java deleted file mode 100644 index 6737e3bca..000000000 --- a/src/main/java/com/spotify/reaper/cassandra/ClusterInfo.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.spotify.reaper.cassandra; - -import com.spotify.reaper.ReaperException; - -import java.util.List; - -public class ClusterInfo implements IClusterInfo { - - private String seedHost; - private int seedPort = 0; - - private List tokens; - - public ClusterInfo(String seedHost, int seedPort) { - this.seedHost = seedHost; - this.seedPort = seedPort; - } - - public void init() throws ReaperException { - JMXProxy jmx = seedPort == 0 ? JMXProxy.connect(seedHost) : JMXProxy.connect(seedHost, seedPort); - tokens = jmx.getTokens(); - jmx.close(); - } - - @Override - public List getTokens() { - return tokens; - } - -} diff --git a/src/main/java/com/spotify/reaper/cassandra/IClusterInfo.java b/src/main/java/com/spotify/reaper/cassandra/IClusterInfo.java deleted file mode 100644 index 192b53c48..000000000 --- a/src/main/java/com/spotify/reaper/cassandra/IClusterInfo.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.spotify.reaper.cassandra; - -import java.util.List; - -public interface IClusterInfo { - - public List getTokens(); - -} diff --git a/src/main/java/com/spotify/reaper/cassandra/JMXProxy.java b/src/main/java/com/spotify/reaper/cassandra/JMXProxy.java deleted file mode 100644 index 259b0674e..000000000 --- a/src/main/java/com/spotify/reaper/cassandra/JMXProxy.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.spotify.reaper.cassandra; - -import com.google.common.collect.Lists; -import com.spotify.reaper.ReaperException; -import org.apache.cassandra.service.StorageServiceMBean; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.util.List; - -import javax.management.JMX; -import javax.management.MBeanServerConnection; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.management.remote.JMXConnector; -import javax.management.remote.JMXConnectorFactory; -import javax.management.remote.JMXServiceURL; - -public class JMXProxy { - - private static final int DEFAULT_JMX_PORT = 7199; - - private JMXConnector jmxc = null; - private StorageServiceMBean ssProxy; - - public JMXProxy(JMXConnector jmxc, StorageServiceMBean ssProxy) { - this.jmxc = jmxc; - this.ssProxy = ssProxy; - } - - public static JMXProxy connect(String host) throws ReaperException { - return connect(host, DEFAULT_JMX_PORT); - } - - public static JMXProxy connect(String host, int port) throws ReaperException { - JMXServiceURL jmxUrl; - ObjectName name; - try { - jmxUrl = new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", - host, port)); - name = new ObjectName("org.apache.cassandra.db:type=StorageService"); - } catch (MalformedURLException | MalformedObjectNameException e) { - throw new ReaperException("Failure during preparations for JMX connection", e); - } - try { - JMXConnector jmxc = JMXConnectorFactory.connect(jmxUrl); - MBeanServerConnection mbeanServerConn = jmxc.getMBeanServerConnection(); - StorageServiceMBean - ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class); - return new JMXProxy(jmxc, ssProxy); - } catch (IOException e) { - throw new ReaperException("Failure when establishing JMX connection", e); - } - } - - public List getTokens() { - return Lists.newArrayList(ssProxy.getTokenToEndpointMap().keySet()); - } - - public void close() throws ReaperException { - try { - jmxc.close(); - } catch (IOException e) { - throw new ReaperException(e); - } - } -} diff --git a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java new file mode 100644 index 000000000..da526419a --- /dev/null +++ b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java @@ -0,0 +1,176 @@ +package com.spotify.reaper.cassandra; + +import com.google.common.collect.Lists; +import com.spotify.reaper.ReaperException; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.StorageServiceMBean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.math.BigInteger; +import java.net.MalformedURLException; +import java.util.List; + +import javax.management.InstanceNotFoundException; +import javax.management.JMX; +import javax.management.ListenerNotFoundException; +import javax.management.MBeanServerConnection; +import javax.management.MalformedObjectNameException; +import javax.management.Notification; +import javax.management.NotificationListener; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class JmxProxy implements NotificationListener, Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(JmxProxy.class); + + private static final int JMX_PORT = 7199; + private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi"; + private static final String JMX_OBJECT_NAME = "org.apache.cassandra.db:type=StorageService"; + + private final JMXConnector jmxc; + private final ObjectName mbeanName; + private final MBeanServerConnection mbenServer; + private final StorageServiceMBean ssProxy; + private final RepairStatusHandlder repairStatusHandlder; + private final String host; + + private JmxProxy(RepairStatusHandlder handler, String host, JMXConnector jmxc, + StorageServiceMBean ssProxy, ObjectName mbeanName, MBeanServerConnection mbenServer) { + this.host = host; + this.jmxc = jmxc; + this.mbeanName = mbeanName; + this.mbenServer = mbenServer; + this.ssProxy = ssProxy; + this.repairStatusHandlder = handler; + } + + /** + * Connect to JMX interface on the given host and default JMX port. + * @param handler Implementation of {@link RepairStatusHandlder} to process + * incoming notifications of repair events. + */ + public static JmxProxy connect(RepairStatusHandlder handler, String host) throws ReaperException { + return connect(handler, host, JMX_PORT); + } + + /** + * Connect to JMX interface on the given host and port. + * @param handler Implementation of {@link RepairStatusHandlder} to process + * incoming notifications of repair events. + */ + public static JmxProxy connect(RepairStatusHandlder handler, String host, int port) + throws ReaperException { + JMXServiceURL jmxUrl; + ObjectName mbeanName; + try { + jmxUrl = new JMXServiceURL(String.format(JMX_URL, host, port)); + mbeanName = new ObjectName(JMX_OBJECT_NAME); + } catch (MalformedURLException | MalformedObjectNameException e) { + LOG.error("Failed to prepare the JMX connection"); + throw new ReaperException("Failure during preparations for JMX connection", e); + } + try { + JMXConnector jmxc = JMXConnectorFactory.connect(jmxUrl); + MBeanServerConnection mbeanServerConn = jmxc.getMBeanServerConnection(); + StorageServiceMBean + ssProxy = JMX.newMBeanProxy(mbeanServerConn, mbeanName, StorageServiceMBean.class); + JmxProxy proxy = new JmxProxy(handler, host, jmxc, ssProxy, mbeanName, mbeanServerConn); + // registering a listener throws bunch of exceptions, so we do it here rather than in the + // constructor + mbeanServerConn.addNotificationListener(mbeanName, proxy, null, null); + LOG.info(String.format("JMX connection to %s properly connected.", host)); + return proxy; + } catch (IOException | InstanceNotFoundException e) { + LOG.error("Failed to establish JMX connection"); + throw new ReaperException("Failure when establishing JMX connection", e); + } + } + + /** + * @return list of tokens in the cluster + */ + public List getTokens() { + checkNotNull(ssProxy, "Looks like the proxy is not connected"); + return Lists.newArrayList(ssProxy.getTokenToEndpointMap().keySet()); + } + + /** + * @return full class name of Cassandra's partitioner. + */ + public String getPartitioner() { + checkNotNull(ssProxy, "Looks like the proxy is not connected"); + return ssProxy.getPartitionerName(); + } + + /** + * Triggers a repair of range (beginToken, endToken] for given keyspace and column family. + * + * The repair is triggered by + * {@link org.apache.cassandra.service.StorageServiceMBean#forceRepairRangeAsync} + * For time being, we don't allow local nor snapshot repairs. + * + * @return repair sequence number + */ + public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keyspace, + String columnFamily) { + checkNotNull(ssProxy, "Looks like the proxy is not connected"); + LOG.info(String.format("Triggering repair of range (%s,%s] for %s.%s via host %s", + beginToken.toString(), endToken.toString(), keyspace, columnFamily, this.host)); + return ssProxy.forceRepairRangeAsync( + beginToken.toString(), + endToken.toString(), + keyspace, + false, // isSequential - if true, do "snapshot repairs" + false, // isLocal - if false, repair all DCs + columnFamily); + } + + /** + * Invoked when the MBean this class listens to publishes an event. + * + * We're only interested in repair-related events. Their format is explained at + * {@link org.apache.cassandra.service.StorageServiceMBean#forceRepairAsync} + * The format is: + * notification type: "repair" + * notification userData: int array of length 2 where + * [0] = command number + * [1] = ordinal of AntiEntropyService.Status + */ + @Override + public void handleNotification(Notification notification, Object handback) { + // we're interested in "repair" + String type = notification.getType(); + LOG.debug(String.format("Received notification: %s", notification.toString())); + if (type.equals("repair")) { + int[] data = (int[]) notification.getUserData(); + // get the repair sequence number + int repairNo = data[0]; + // get the repair status + ActiveRepairService.Status status = ActiveRepairService.Status.values()[data[1]]; + // this is some text message like "Starting repair...", "Finished repair...", etc. + String message = notification.getMessage(); + // let the handler process the event + repairStatusHandlder.handle(repairNo, status, message); + } + } + + /** + * Cleanly shut down by un-registering the listener and closing the JMX connection. + */ + public void close() throws ReaperException { + try { + mbenServer.removeNotificationListener(mbeanName, this); + jmxc.close(); + } catch (IOException | InstanceNotFoundException | ListenerNotFoundException e) { + throw new ReaperException(e); + } + } +} diff --git a/src/main/java/com/spotify/reaper/cassandra/RepairStatusHandlder.java b/src/main/java/com/spotify/reaper/cassandra/RepairStatusHandlder.java new file mode 100644 index 000000000..2b621d29a --- /dev/null +++ b/src/main/java/com/spotify/reaper/cassandra/RepairStatusHandlder.java @@ -0,0 +1,18 @@ +package com.spotify.reaper.cassandra; + +import org.apache.cassandra.service.ActiveRepairService; + +public interface RepairStatusHandlder { + + /** + * Handle an event representing a change in the state of a running repair. + * + * Implementation of this method is intended to persist the repair state change in Reaper's + * state. + * @param repairNumber repair sequence number, obtained when triggering a repair + * @param status new status of the repair + * @param message additional information about the repair + */ + public void handle(int repairNumber, ActiveRepairService.Status status, String message); + +}