Skip to content

Commit

Permalink
JmxProxy class responsible for communication between Reaper and Cassa…
Browse files Browse the repository at this point in the history
…ndra

JmxProxy can issue commands to Cassandra, as well as it subscribes to
events generated by StorageService mbean, from which it filters only
the repair-specific ones. Handling of repair events is done via the
RepairStatusHanlder interface implementation of which must be
provided when connecting the JmxProxy.

Change-Id: I607557b4f1d9c4dd6133d5622d42610c5071b5f6
  • Loading branch information
Radovan Zvoncek committed Dec 2, 2014
1 parent 0c50b4e commit 3296e8e
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 106 deletions.
30 changes: 0 additions & 30 deletions src/main/java/com/spotify/reaper/cassandra/ClusterInfo.java

This file was deleted.

9 changes: 0 additions & 9 deletions src/main/java/com/spotify/reaper/cassandra/IClusterInfo.java

This file was deleted.

67 changes: 0 additions & 67 deletions src/main/java/com/spotify/reaper/cassandra/JMXProxy.java

This file was deleted.

176 changes: 176 additions & 0 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package com.spotify.reaper.cassandra;

import com.google.common.collect.Lists;
import com.spotify.reaper.ReaperException;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageServiceMBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.net.MalformedURLException;
import java.util.List;

import javax.management.InstanceNotFoundException;
import javax.management.JMX;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import static com.google.common.base.Preconditions.checkNotNull;

public class JmxProxy implements NotificationListener, Serializable {

private static final Logger LOG = LoggerFactory.getLogger(JmxProxy.class);

private static final int JMX_PORT = 7199;
private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
private static final String JMX_OBJECT_NAME = "org.apache.cassandra.db:type=StorageService";

private final JMXConnector jmxc;
private final ObjectName mbeanName;
private final MBeanServerConnection mbenServer;
private final StorageServiceMBean ssProxy;
private final RepairStatusHandlder repairStatusHandlder;
private final String host;

private JmxProxy(RepairStatusHandlder handler, String host, JMXConnector jmxc,
StorageServiceMBean ssProxy, ObjectName mbeanName, MBeanServerConnection mbenServer) {
this.host = host;
this.jmxc = jmxc;
this.mbeanName = mbeanName;
this.mbenServer = mbenServer;
this.ssProxy = ssProxy;
this.repairStatusHandlder = handler;
}

/**
* Connect to JMX interface on the given host and default JMX port.
* @param handler Implementation of {@link RepairStatusHandlder} to process
* incoming notifications of repair events.
*/
public static JmxProxy connect(RepairStatusHandlder handler, String host) throws ReaperException {
return connect(handler, host, JMX_PORT);
}

/**
* Connect to JMX interface on the given host and port.
* @param handler Implementation of {@link RepairStatusHandlder} to process
* incoming notifications of repair events.
*/
public static JmxProxy connect(RepairStatusHandlder handler, String host, int port)
throws ReaperException {
JMXServiceURL jmxUrl;
ObjectName mbeanName;
try {
jmxUrl = new JMXServiceURL(String.format(JMX_URL, host, port));
mbeanName = new ObjectName(JMX_OBJECT_NAME);
} catch (MalformedURLException | MalformedObjectNameException e) {
LOG.error("Failed to prepare the JMX connection");
throw new ReaperException("Failure during preparations for JMX connection", e);
}
try {
JMXConnector jmxc = JMXConnectorFactory.connect(jmxUrl);
MBeanServerConnection mbeanServerConn = jmxc.getMBeanServerConnection();
StorageServiceMBean
ssProxy = JMX.newMBeanProxy(mbeanServerConn, mbeanName, StorageServiceMBean.class);
JmxProxy proxy = new JmxProxy(handler, host, jmxc, ssProxy, mbeanName, mbeanServerConn);
// registering a listener throws bunch of exceptions, so we do it here rather than in the
// constructor
mbeanServerConn.addNotificationListener(mbeanName, proxy, null, null);
LOG.info(String.format("JMX connection to %s properly connected.", host));
return proxy;
} catch (IOException | InstanceNotFoundException e) {
LOG.error("Failed to establish JMX connection");
throw new ReaperException("Failure when establishing JMX connection", e);
}
}

/**
* @return list of tokens in the cluster
*/
public List<String> getTokens() {
checkNotNull(ssProxy, "Looks like the proxy is not connected");
return Lists.newArrayList(ssProxy.getTokenToEndpointMap().keySet());
}

/**
* @return full class name of Cassandra's partitioner.
*/
public String getPartitioner() {
checkNotNull(ssProxy, "Looks like the proxy is not connected");
return ssProxy.getPartitionerName();
}

/**
* Triggers a repair of range (beginToken, endToken] for given keyspace and column family.
*
* The repair is triggered by
* {@link org.apache.cassandra.service.StorageServiceMBean#forceRepairRangeAsync}
* For time being, we don't allow local nor snapshot repairs.
*
* @return repair sequence number
*/
public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keyspace,
String columnFamily) {
checkNotNull(ssProxy, "Looks like the proxy is not connected");
LOG.info(String.format("Triggering repair of range (%s,%s] for %s.%s via host %s",
beginToken.toString(), endToken.toString(), keyspace, columnFamily, this.host));
return ssProxy.forceRepairRangeAsync(
beginToken.toString(),
endToken.toString(),
keyspace,
false, // isSequential - if true, do "snapshot repairs"
false, // isLocal - if false, repair all DCs
columnFamily);
}

/**
* Invoked when the MBean this class listens to publishes an event.
*
* We're only interested in repair-related events. Their format is explained at
* {@link org.apache.cassandra.service.StorageServiceMBean#forceRepairAsync}
* The format is:
* notification type: "repair"
* notification userData: int array of length 2 where
* [0] = command number
* [1] = ordinal of AntiEntropyService.Status
*/
@Override
public void handleNotification(Notification notification, Object handback) {
// we're interested in "repair"
String type = notification.getType();
LOG.debug(String.format("Received notification: %s", notification.toString()));
if (type.equals("repair")) {
int[] data = (int[]) notification.getUserData();
// get the repair sequence number
int repairNo = data[0];
// get the repair status
ActiveRepairService.Status status = ActiveRepairService.Status.values()[data[1]];
// this is some text message like "Starting repair...", "Finished repair...", etc.
String message = notification.getMessage();
// let the handler process the event
repairStatusHandlder.handle(repairNo, status, message);
}
}

/**
* Cleanly shut down by un-registering the listener and closing the JMX connection.
*/
public void close() throws ReaperException {
try {
mbenServer.removeNotificationListener(mbeanName, this);
jmxc.close();
} catch (IOException | InstanceNotFoundException | ListenerNotFoundException e) {
throw new ReaperException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.spotify.reaper.cassandra;

import org.apache.cassandra.service.ActiveRepairService;

public interface RepairStatusHandlder {

/**
* Handle an event representing a change in the state of a running repair.
*
* Implementation of this method is intended to persist the repair state change in Reaper's
* state.
* @param repairNumber repair sequence number, obtained when triggering a repair
* @param status new status of the repair
* @param message additional information about the repair
*/
public void handle(int repairNumber, ActiveRepairService.Status status, String message);

}

0 comments on commit 3296e8e

Please sign in to comment.