getSimpleStates() {
return ((FailureDetectorMBean) fdProxy).getSimpleStates();
}
-
/**
- * Invoked when the MBean this class listens to publishes an event.
- * We're only interested in repair-related events.
- * Their format is explained at {@link org.apache.cassandra.service.StorageServiceMBean#forceRepairAsync}
- * The format is: notification type: "repair" notification userData: int array of length 2 where
- * [0] = command number [1] = ordinal of AntiEntropyService.Status
+ * Invoked when the MBean this class listens to publishes an event. We're only interested in repair-related events.
+ * Their format is explained at {@link org.apache.cassandra.service.StorageServiceMBean#forceRepairAsync} The forma
+ * is: notification type: "repair" notification userData: int array of length 2 where [0] = command number [1] =
+ * ordinal of AntiEntropyService.Status
*/
@Override
public void handleNotification(Notification notification, Object handback) {
Thread.currentThread().setName(clusterName);
// we're interested in "repair"
String type = notification.getType();
- LOG.debug("Received notification: {} with type {} and repairStatusHandler {}", notification, type, repairStatusHandler);
+ LOG.debug(
+ "Received notification: {} with type {} and repairStatusHandler {}", notification, type, repairStatusHandler);
if (repairStatusHandler.isPresent() && ("repair").equals(type)) {
processOldApiNotification(notification);
}
@@ -717,10 +801,9 @@ private void processOldApiNotification(Notification notification) {
ActiveRepairService.Status status = ActiveRepairService.Status.values()[data[1]];
// this is some text message like "Starting repair...", "Finished repair...", etc.
String message = notification.getMessage();
- // let the handler process the event
+ // let the handler process the even
repairStatusHandler.get().handle(repairNo, Optional.of(status), Optional.absent(), message);
- } catch (Exception e) {
- // TODO Auto-generated catch block
+ } catch (RuntimeException e) {
LOG.error("Error while processing JMX notification", e);
}
}
@@ -737,10 +820,9 @@ private void processNewApiNotification(Notification notification) {
ProgressEventType progress = ProgressEventType.values()[data.get("type")];
// this is some text message like "Starting repair...", "Finished repair...", etc.
String message = notification.getMessage();
- // let the handler process the event
+ // let the handler process the even
repairStatusHandler.get().handle(repairNo, Optional.absent(), Optional.of(progress), message);
- } catch (Exception e) {
- // TODO Auto-generated catch block
+ } catch (RuntimeException e) {
LOG.error("Error while processing JMX notification", e);
}
}
@@ -765,7 +847,7 @@ public boolean isConnectionAlive() {
@Override
public void close() throws ReaperException {
LOG.debug("close JMX connection to '{}': {}", host, jmxUrl);
- if(this.repairStatusHandler.isPresent()){
+ if (this.repairStatusHandler.isPresent()) {
try {
mbeanServer.removeNotificationListener(ssMbeanName, this);
LOG.debug("Successfully removed notification listener for '{}': {}", host, jmxUrl);
@@ -784,17 +866,18 @@ public void close() throws ReaperException {
* NOTICE: This code is loosely based on StackOverflow answer:
* http://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java
*
+ *
* Compares two version strings.
*
- * Use this instead of String.compareTo() for a non-lexicographical
- * comparison that works for version strings. e.g. "1.10".compareTo("1.6").
+ *
+ * Use this instead of String.compareTo() for a non-lexicographical comparison that works for version strings. e.g.
+ * "1.10".compareTo("1.6").
*
* @param str1 a string of ordinal numbers separated by decimal points.
* @param str2 a string of ordinal numbers separated by decimal points.
- * @return The result is a negative integer if str1 is _numerically_ less than str2.
- * The result is a positive integer if str1 is _numerically_ greater than str2.
- * The result is zero if the strings are _numerically_ equal.
- * It does not work if "1.10" is supposed to be equal to "1.10.0".
+ * @return The result is a negative integer if str1 is _numerically_ less than str2. The result is a positive integer
+ * if str1 is _numerically_ greater than str2. The result is zero if the strings are _numerically_ equal. It does
+ * not work if "1.10" is supposed to be equal to "1.10.0".
*/
public static Integer versionCompare(String str1, String str2) throws ReaperException {
try {
@@ -802,37 +885,36 @@ public static Integer versionCompare(String str1, String str2) throws ReaperExce
String cleanedUpStr2 = str2.split(" ")[0].replaceAll("[-_~]", ".");
String[] parts1 = cleanedUpStr1.split("\\.");
String[] parts2 = cleanedUpStr2.split("\\.");
- int i = 0;
+ int idx = 0;
// set index to first non-equal ordinal or length of shortest version string
- while (i < parts1.length && i < parts2.length) {
+ while (idx < parts1.length && idx < parts2.length) {
try {
- Integer.parseInt(parts1[i]);
- Integer.parseInt(parts2[i]);
+ Integer.parseInt(parts1[idx]);
+ Integer.parseInt(parts2[idx]);
} catch (NumberFormatException ex) {
- if (i == 0) {
+ if (idx == 0) {
throw ex; // just comparing two non-version strings should fail
}
- // first non integer part, so let's just stop comparison here and ignore the rest
- i--;
+ // first non integer part, so let's just stop comparison here and ignore the res
+ idx--;
break;
}
- if (parts1[i].equals(parts2[i])) {
- i++;
+ if (parts1[idx].equals(parts2[idx])) {
+ idx++;
continue;
}
break;
}
// compare first non-equal ordinal number
- if (i < parts1.length && i < parts2.length) {
- int diff = Integer.valueOf(parts1[i]).compareTo(Integer.valueOf(parts2[i]));
+ if (idx < parts1.length && idx < parts2.length) {
+ int diff = Integer.valueOf(parts1[idx]).compareTo(Integer.valueOf(parts2[idx]));
return Integer.signum(diff);
- }
- // the strings are equal or one string is a substring of the other
- // e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4"
- else {
+ } else {
+ // the strings are equal or one string is a substring of the other
+ // e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4"
return Integer.signum(parts1.length - parts2.length);
}
- } catch (Exception ex) {
+ } catch (RuntimeException ex) {
LOG.error("failed comparing strings for versions: '{}' '{}'", str1, str2);
throw new ReaperException(ex);
}
@@ -840,72 +922,29 @@ public static Integer versionCompare(String str1, String str2) throws ReaperExce
public void clearSnapshot(String repairId, String keyspaceName) throws ReaperException {
if (repairId == null || ("").equals(repairId)) {
- // Passing in null or empty string will clear all snapshots on the host
+ // Passing in null or empty string will clear all snapshots on the hos
throw new IllegalArgumentException("repairId cannot be null or empty string");
}
try {
- ((StorageServiceMBean) ssProxy).clearSnapshot(repairId, keyspaceName);
+ ((StorageServiceMBean) ssProxy).clearSnapshot(repairId, keyspaceName);
} catch (IOException e) {
throw new ReaperException(e);
}
}
- public List getLiveNodes()
- throws ReaperException {
+ public List getLiveNodes() throws ReaperException {
checkNotNull(ssProxy, "Looks like the proxy is not connected");
try {
return ((StorageServiceMBean) ssProxy).getLiveNodes();
- } catch (Exception e) {
+ } catch (RuntimeException e) {
LOG.error(e.getMessage());
throw new ReaperException(e.getMessage(), e);
}
}
- private static RMIClientSocketFactory getRMIClientSocketFactory() {
- return Boolean.parseBoolean(System.getProperty("ssl.enable"))
- ? new SslRMIClientSocketFactory()
- : RMISocketFactory.getDefaultSocketFactory();
- }
-}
-
-/**
- * This code is copied and adjusted from from NodeProbe.java from Cassandra source.
- */
-class ColumnFamilyStoreMBeanIterator
- implements Iterator> {
-
- private final Iterator resIter;
- private final MBeanServerConnection mbeanServerConn;
-
- public ColumnFamilyStoreMBeanIterator(MBeanServerConnection mbeanServerConn)
- throws MalformedObjectNameException, NullPointerException, IOException {
- ObjectName query = new ObjectName("org.apache.cassandra.db:type=ColumnFamilies,*");
- resIter = mbeanServerConn.queryNames(query, null).iterator();
- this.mbeanServerConn = mbeanServerConn;
- }
-
- static Iterator> getColumnFamilyStoreMBeanProxies(
- MBeanServerConnection mbeanServerConn)
- throws IOException, MalformedObjectNameException {
- return new ColumnFamilyStoreMBeanIterator(mbeanServerConn);
- }
-
- @Override
- public boolean hasNext() {
- return resIter.hasNext();
- }
-
- @Override
- public Map.Entry next() {
- ObjectName objectName = resIter.next();
- String keyspaceName = objectName.getKeyProperty("keyspace");
- ColumnFamilyStoreMBean cfsProxy =
- JMX.newMBeanProxy(mbeanServerConn, objectName, ColumnFamilyStoreMBean.class);
- return new AbstractMap.SimpleImmutableEntry<>(keyspaceName, cfsProxy);
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
+ private static RMIClientSocketFactory getRmiClientSocketFactory() {
+ return Boolean.parseBoolean(System.getProperty("ssl.enable"))
+ ? new SslRMIClientSocketFactory()
+ : RMISocketFactory.getDefaultSocketFactory();
}
}
diff --git a/src/server/src/main/java/com/spotify/reaper/cassandra/RepairStatusHandler.java b/src/server/src/main/java/com/spotify/reaper/cassandra/RepairStatusHandler.java
index d8d73b21e..b88d2545c 100644
--- a/src/server/src/main/java/com/spotify/reaper/cassandra/RepairStatusHandler.java
+++ b/src/server/src/main/java/com/spotify/reaper/cassandra/RepairStatusHandler.java
@@ -11,27 +11,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.spotify.reaper.cassandra;
+import com.google.common.base.Optional;
import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.utils.progress.ProgressEventType;
-import com.google.common.base.Optional;
public interface RepairStatusHandler {
/**
* Handle an event representing a change in the state of a running repair.
*
- * Implementation of this method is intended to persist the repair state change in Reaper's
- * state.
+ *
+ * Implementation of this method is intended to persist the repair state change in Reaper's state.
*
* @param repairNumber repair sequence number, obtained when triggering a repair
- * @param status new status of the repair (old API)
- * @param progress new status of the repair (new API)
- * @param message additional information about the repair
+ * @param status new status of the repair (old API)
+ * @param progress new status of the repair (new API)
+ * @param message additional information about the repair
*/
- void handle(int repairNumber, Optional status, Optional progress, String message);
-
+ void handle(
+ int repairNumber,
+ Optional status,
+ Optional progress,
+ String message);
}
diff --git a/src/server/src/main/java/com/spotify/reaper/cassandra/StorageServiceMBean20.java b/src/server/src/main/java/com/spotify/reaper/cassandra/StorageServiceMBean20.java
index 6f56a0125..bcbb45bd5 100644
--- a/src/server/src/main/java/com/spotify/reaper/cassandra/StorageServiceMBean20.java
+++ b/src/server/src/main/java/com/spotify/reaper/cassandra/StorageServiceMBean20.java
@@ -1,14 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.spotify.reaper.cassandra;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
import javax.management.NotificationEmitter;
@@ -16,479 +24,134 @@
public interface StorageServiceMBean20 extends NotificationEmitter, StorageServiceMBean {
- /**
- * Retrieve the list of live nodes in the cluster, where "liveness" is
- * determined by the failure detector of the node being queried.
- *
- * @return set of IP addresses, as Strings
- */
- public List getLiveNodes();
-
- /**
- * Retrieve the list of unreachable nodes in the cluster, as determined
- * by this node's failure detector.
- *
- * @return set of IP addresses, as Strings
- */
- public List getUnreachableNodes();
-
- /**
- * Retrieve the list of nodes currently bootstrapping into the ring.
- *
- * @return set of IP addresses, as Strings
- */
- public List getJoiningNodes();
-
- /**
- * Retrieve the list of nodes currently leaving the ring.
- *
- * @return set of IP addresses, as Strings
- */
- public List getLeavingNodes();
-
- /**
- * Retrieve the list of nodes currently moving in the ring.
- *
- * @return set of IP addresses, as Strings
- */
- public List getMovingNodes();
-
- /**
- * Fetch string representations of the tokens for this node.
- *
- * @return a collection of tokens formatted as strings
- */
- public List getTokens();
-
- /**
- * Fetch string representations of the tokens for a specified node.
- *
- * @param endpoint string representation of an node
- * @return a collection of tokens formatted as strings
- */
- public List getTokens(String endpoint) throws UnknownHostException;
-
- /**
- * Fetch a string representation of the Cassandra version.
- * @return A string representation of the Cassandra version.
- */
- public String getReleaseVersion();
-
- /**
- * Fetch a string representation of the current Schema version.
- * @return A string representation of the Schema version.
- */
- public String getSchemaVersion();
-
-
- /**
- * Get the list of all data file locations from conf
- * @return String array of all locations
- */
- public String[] getAllDataFileLocations();
-
- /**
- * Get location of the commit log
- * @return a string path
- */
- public String getCommitLogLocation();
-
- /**
- * Get location of the saved caches dir
- * @return a string path
- */
- public String getSavedCachesLocation();
-
- /**
- * Retrieve a map of range to end points that describe the ring topology
- * of a Cassandra cluster.
- *
- * @return mapping of ranges to end points
- */
- public Map, List> getRangeToEndpointMap(String keyspace);
-
- /**
- * Retrieve a map of range to rpc addresses that describe the ring topology
- * of a Cassandra cluster.
- *
- * @return mapping of ranges to rpc addresses
- */
- public Map, List> getRangeToRpcaddressMap(String keyspace);
-
- /**
- * The same as {@code describeRing(String)} but converts TokenRange to the String for JMX compatibility
- *
- * @param keyspace The keyspace to fetch information about
- *
- * @return a List of TokenRange(s) converted to String for the given keyspace
- */
- public List describeRingJMX(String keyspace) throws IOException;
-
- /**
- * Retrieve a map of pending ranges to endpoints that describe the ring topology
- * @param keyspace the keyspace to get the pending range map for.
- * @return a map of pending ranges to endpoints
- */
- public Map, List> getPendingRangeToEndpointMap(String keyspace);
-
- /**
- * Retrieve a map of tokens to endpoints, including the bootstrapping
- * ones.
- *
- * @return a map of tokens to endpoints in ascending order
- */
- public Map getTokenToEndpointMap();
-
- /** Retrieve this hosts unique ID */
- public String getLocalHostId();
-
- /** Retrieve the mapping of endpoint to host ID */
- public Map getHostIdMap();
-
- /**
- * Numeric load value.
- * @see org.apache.cassandra.metrics.StorageMetrics#load
- */
- @Deprecated
- public double getLoad();
-
- /** Human-readable load value */
- public String getLoadString();
-
- /** Human-readable load value. Keys are IP addresses. */
- public Map getLoadMap();
-
- /**
- * Return the generation value for this node.
- *
- * @return generation number
- */
- public int getCurrentGenerationNumber();
-
- /**
- * This method returns the N endpoints that are responsible for storing the
- * specified key i.e for replication.
- *
- * @param keyspaceName keyspace name
- * @param cf Column family name
- * @param key - key for which we need to find the endpoint return value -
- * the endpoint responsible for this key
- */
- public List getNaturalEndpoints(String keyspaceName, String cf, String key);
- public List getNaturalEndpoints(String keyspaceName, ByteBuffer key);
-
- /**
- * Takes the snapshot for the given keyspaces. A snapshot name must be specified.
- *
- * @param tag the tag given to the snapshot; may not be null or empty
- * @param keyspaceNames the name of the keyspaces to snapshot; empty means "all."
- */
- public void takeSnapshot(String tag, String... keyspaceNames) throws IOException;
-
- /**
- * Takes the snapshot of a specific column family. A snapshot name must be specified.
- *
- * @param keyspaceName the keyspace which holds the specified column family
- * @param columnFamilyName the column family to snapshot
- * @param tag the tag given to the snapshot; may not be null or empty
- */
- public void takeColumnFamilySnapshot(String keyspaceName, String columnFamilyName, String tag) throws IOException;
-
- /**
- * Remove the snapshot with the given name from the given keyspaces.
- * If no tag is specified we will remove all snapshots.
- */
- public void clearSnapshot(String tag, String... keyspaceNames) throws IOException;
-
- /**
- * Forces major compaction of a single keyspace
- */
- public void forceKeyspaceCompaction(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
-
- public void forceKeyspaceFlush(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
-
- /**
- * Invoke repair asynchronously.
- * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
- * Notification format is:
- * type: "repair"
- * userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
- *
- * @return Repair command number, or 0 if nothing to repair
- */
- public int forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, final Collection hosts, boolean primaryRange, String... columnFamilies);
-
- /**
- * Invoke repair asynchronously.
- * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
- * Notification format is:
- * type: "repair"
- * userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
- *
- * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
- * @return Repair command number, or 0 if nothing to repair
- */
- public int forceRepairAsync(String keyspace, int parallelismDegree, Collection dataCenters, final Collection hosts, boolean primaryRange, String... columnFamilies);
-
- /**
- * Same as forceRepairAsync, but handles a specified range
- */
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection dataCenters, final Collection hosts, final String... columnFamilies);
-
- /**
- * Same as forceRepairAsync, but handles a specified range
- *
- * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
- */
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, int parallelismDegree, Collection dataCenters, final Collection hosts, final String... columnFamilies);
-
- /**
- * Invoke repair asynchronously.
- * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
- * Notification format is:
- * type: "repair"
- * userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
- *
- * @return Repair command number, or 0 if nothing to repair
- * @see #forceKeyspaceRepair(String, boolean, boolean, String...)
- */
- public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies);
-
- /**
- * Same as forceRepairAsync, but handles a specified range
- */
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies);
-
- /**
- * Triggers proactive repair for given column families, or all columnfamilies for the given keyspace
- * if none are explicitly listed.
- * @param keyspaceName
- * @param columnFamilies
- * @throws IOException
- */
- public void forceKeyspaceRepair(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException;
-
- /**
- * Triggers proactive repair but only for the node primary range.
- */
- public void forceKeyspaceRepairPrimaryRange(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException;
-
- /**
- * Perform repair of a specific range.
- *
- * This allows incremental repair to be performed by having an external controller submitting repair jobs.
- * Note that the provided range much be a subset of one of the node local range.
- */
- public void forceKeyspaceRepairRange(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException;
-
- public void forceTerminateAllRepairSessions();
-
- /**
- * transfer this node's data to other machines and remove it from service.
- */
- public void decommission() throws InterruptedException;
-
- /**
- * @param newToken token to move this node to.
- * This node will unload its data onto its neighbors, and bootstrap to the new token.
- */
- public void move(String newToken) throws IOException;
-
- /**
- * removeToken removes token (and all data associated with
- * enpoint that had it) from the ring
- */
- public void removeNode(String token);
-
- /**
- * Get the status of a token removal.
- */
- public String getRemovalStatus();
-
- /**
- * Force a remove operation to finish.
- */
- public void forceRemoveCompletion();
-
- /** set the logging level at runtime */
- public void setLog4jLevel(String classQualifier, String level);
-
- public MapgetLoggingLevels();
-
- /** get the operational mode (leaving, joining, normal, decommissioned, client) **/
- public String getOperationMode();
-
- /** Returns whether the storage service is starting or not */
- public boolean isStarting();
-
- /** get the progress of a drain operation */
- public String getDrainProgress();
-
- /** makes node unavailable for writes, flushes memtables and replays commitlog. */
- public void drain() throws IOException, InterruptedException, ExecutionException;
-
- /**
- * Truncates (deletes) the given columnFamily from the provided keyspace.
- * Calling truncate results in actual deletion of all data in the cluster
- * under the given columnFamily and it will fail unless all hosts are up.
- * All data in the given column family will be deleted, but its definition
- * will not be affected.
- *
- * @param keyspace The keyspace to delete from
- * @param columnFamily The column family to delete data from.
- */
- public void truncate(String keyspace, String columnFamily)throws TimeoutException, IOException;
-
- /**
- * given a list of tokens (representing the nodes in the cluster), returns
- * a mapping from "token -> %age of cluster owned by that token"
- */
- public Map getOwnership();
-
- /**
- * Effective ownership is % of the data each node owns given the keyspace
- * we calculate the percentage using replication factor.
- * If Keyspace == null, this method will try to verify if all the keyspaces
- * in the cluster have the same replication strategies and if yes then we will
- * use the first else a empty Map is returned.
- */
- public Map effectiveOwnership(String keyspace) throws IllegalStateException;
-
- public List getKeyspaces();
-
- /**
- * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime
- * @param epSnitchClassName the canonical path name for a class implementing IEndpointSnitch
- * @param dynamic boolean that decides whether dynamicsnitch is used or not
- * @param dynamicUpdateInterval integer, in ms (default 100)
- * @param dynamicResetInterval integer, in ms (default 600,000)
- * @param dynamicBadnessThreshold double, (default 0.0)
- */
- public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException;
-
- // allows a user to forcibly 'kill' a sick node
- public void stopGossiping();
-
- // allows a user to recover a forcibly 'killed' node
- public void startGossiping();
-
- // allows a user to see whether gossip is running or not
- public boolean isGossipRunning();
-
- // allows a user to forcibly completely stop cassandra
- public void stopDaemon();
-
- // to determine if gossip is disabled
- public boolean isInitialized();
-
- // allows a user to disable thrift
- public void stopRPCServer();
-
- // allows a user to reenable thrift
- public void startRPCServer();
-
- // to determine if thrift is running
- public boolean isRPCServerRunning();
-
- public void stopNativeTransport();
- public void startNativeTransport();
- public boolean isNativeTransportRunning();
-
- // allows a node that have been started without joining the ring to join it
- public void joinRing() throws IOException;
- public boolean isJoined();
-
- @Deprecated
- public int getExceptionCount();
-
- public void setStreamThroughputMbPerSec(int value);
- public int getStreamThroughputMbPerSec();
-
- public int getCompactionThroughputMbPerSec();
- public void setCompactionThroughputMbPerSec(int value);
-
- public boolean isIncrementalBackupsEnabled();
- public void setIncrementalBackupsEnabled(boolean value);
-
- /**
- * Initiate a process of streaming data for which we are responsible from other nodes. It is similar to bootstrap
- * except meant to be used on a node which is already in the cluster (typically containing no data) as an
- * alternative to running repair.
- *
- * @param sourceDc Name of DC from which to select sources for streaming or null to pick any node
- */
- public void rebuild(String sourceDc);
-
- /** Starts a bulk load and blocks until it completes. */
- public void bulkLoad(String directory);
-
- /**
- * Starts a bulk load asynchronously and returns the String representation of the planID for the new
- * streaming session.
- */
- public String bulkLoadAsync(String directory);
-
- public void rescheduleFailedDeletions();
-
- /**
- * Load new SSTables to the given keyspace/columnFamily
- *
- * @param ksName The parent keyspace name
- * @param cfName The ColumnFamily name where SSTables belong
- */
- public void loadNewSSTables(String ksName, String cfName);
-
- /**
- * Return a List of Tokens representing a sample of keys across all ColumnFamilyStores.
- *
- * Note: this should be left as an operation, not an attribute (methods starting with "get")
- * to avoid sending potentially multiple MB of data when accessing this mbean by default. See CASSANDRA-4452.
- *
- * @return set of Tokens as Strings
- */
- public List sampleKeyRange();
-
- /**
- * rebuild the specified indexes
- */
- public void rebuildSecondaryIndex(String ksName, String cfName, String... idxNames);
-
- public void resetLocalSchema() throws IOException;
-
- /**
- * Enables/Disables tracing for the whole system. Only thrift requests can start tracing currently.
- *
- * @param probability
- * ]0,1[ will enable tracing on a partial number of requests with the provided probability. 0 will
- * disable tracing and 1 will enable tracing for all requests (which mich severely cripple the system)
- */
- public void setTraceProbability(double probability);
-
- /**
- * Returns the configured tracing probability.
- */
- public double getTracingProbability();
-
- void disableAutoCompaction(String ks, String ... columnFamilies) throws IOException;
- void enableAutoCompaction(String ks, String ... columnFamilies) throws IOException;
-
- public void deliverHints(String host) throws UnknownHostException;
-
- /** Returns the name of the cluster */
- public String getClusterName();
- /** Returns the cluster partitioner */
- public String getPartitionerName();
-
- /** Returns the threshold for warning of queries with many tombstones */
- public int getTombstoneWarnThreshold();
- /** Sets the threshold for warning queries with many tombstones */
- public void setTombstoneWarnThreshold(int tombstoneDebugThreshold);
-
- /** Returns the threshold for abandoning queries with many tombstones */
- public int getTombstoneFailureThreshold();
- /** Sets the threshold for abandoning queries with many tombstones */
- public void setTombstoneFailureThreshold(int tombstoneDebugThreshold);
+ /**
+ * Numeric load value.
+ *
+ * @see org.apache.cassandra.metrics.StorageMetrics#load
+ */
+ @Deprecated
+ double getLoad();
+
+ /**
+ * Forces major compaction of a single keyspace
+ */
+ void forceKeyspaceCompaction(String keyspaceName, String... columnFamilies)
+ throws IOException, ExecutionException, InterruptedException;
+
+ /**
+ * Invoke repair asynchronously. You can track repair progress by subscribing JMX notification sent from this
+ * StorageServiceMBean. Notification format is: type: "repair" userObject: int array of length 2, [0]=command number,
+ * [1]=ordinal of AntiEntropyService.Status
+ *
+ * @return Repair command number, or 0 if nothing to repair
+ */
+ int forceRepairAsync(
+ String keyspace,
+ boolean isSequential,
+ Collection dataCenters,
+ final Collection hosts,
+ boolean primaryRange,
+ String... columnFamilies);
+
+ /**
+ * Invoke repair asynchronously. You can track repair progress by subscribing JMX notification sent from this
+ * StorageServiceMBean. Notification format is: type: "repair" userObject: int array of length 2, [0]=command number,
+ * [1]=ordinal of AntiEntropyService.Status
+ *
+ * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
+ * @return Repair command number, or 0 if nothing to repair
+ */
+ int forceRepairAsync(
+ String keyspace,
+ int parallelismDegree,
+ Collection dataCenters,
+ final Collection hosts,
+ boolean primaryRange,
+ String... columnFamilies);
+
+ /**
+ * Invoke repair asynchronously. You can track repair progress by subscribing JMX notification sent from this
+ * StorageServiceMBean. Notification format is: type: "repair" userObject: int array of length 2, [0]=command number,
+ * [1]=ordinal of AntiEntropyService.Status
+ *
+ * @return Repair command number, or 0 if nothing to repair
+ * @see #forceKeyspaceRepair(String, boolean, boolean, String...)
+ */
+ int forceRepairAsync(
+ String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies);
+
+ /**
+ * Same as forceRepairAsync, but handles a specified range
+ */
+ int forceRepairRangeAsync(
+ String beginToken,
+ String endToken,
+ final String keyspaceName,
+ boolean isSequential,
+ Collection dataCenters,
+ final Collection hosts,
+ final String... columnFamilies);
+
+ /**
+ * Same as forceRepairAsync, but handles a specified range
+ *
+ * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
+ */
+ int forceRepairRangeAsync(
+ String beginToken,
+ String endToken,
+ final String keyspaceName,
+ int parallelismDegree,
+ Collection dataCenters,
+ final Collection hosts,
+ final String... columnFamilies);
+
+ /**
+ * Same as forceRepairAsync, but handles a specified range
+ */
+ int forceRepairRangeAsync(
+ String beginToken,
+ String endToken,
+ final String keyspaceName,
+ boolean isSequential,
+ boolean isLocal,
+ final String... columnFamilies);
+
+ /**
+ * Triggers proactive repair for given column families, or all columnfamilies for the given keyspace if none are
+ * explicitly listed.
+ */
+ void forceKeyspaceRepair(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies)
+ throws IOException;
+
+ /**
+ * Triggers proactive repair but only for the node primary range.
+ */
+ void forceKeyspaceRepairPrimaryRange(
+ String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException;
+
+ /**
+ * Perform repair of a specific range.
+ *
+ *
+ * This allows incremental repair to be performed by having an external controller submitting repair jobs. Note
+ * that the provided range much be a subset of one of the node local range.
+ */
+ void forceKeyspaceRepairRange(
+ String beginToken,
+ String endToken,
+ String keyspaceName,
+ boolean isSequential,
+ boolean isLocal,
+ String... columnFamilies)
+ throws IOException;
+
+ /**
+ * set the logging level at runtime
+ */
+ void setLog4jLevel(String classQualifier, String level);
+
+ @Deprecated
+ int getExceptionCount();
- /** Sets the hinted handoff throttle in kb per second, per delivery thread. */
- public void setHintedHandoffThrottleInKB(int throttleInKB);
}
diff --git a/src/server/src/main/java/com/spotify/reaper/cassandra/package-info.java b/src/server/src/main/java/com/spotify/reaper/cassandra/package-info.java
new file mode 100644
index 000000000..bc10cfecf
--- /dev/null
+++ b/src/server/src/main/java/com/spotify/reaper/cassandra/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+@javax.annotation.ParametersAreNonnullByDefault
+package com.spotify.reaper.cassandra;
diff --git a/src/server/src/main/java/com/spotify/reaper/core/Cluster.java b/src/server/src/main/java/com/spotify/reaper/core/Cluster.java
index cf958a81d..97096eb5b 100644
--- a/src/server/src/main/java/com/spotify/reaper/core/Cluster.java
+++ b/src/server/src/main/java/com/spotify/reaper/core/Cluster.java
@@ -11,27 +11,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.spotify.reaper.core;
import java.util.Set;
-public class Cluster {
+import com.google.common.base.Preconditions;
+
+public final class Cluster {
private final String name;
private final String partitioner; // Full name of the partitioner class
private final Set seedHosts;
- public static String toSymbolicName(String s) {
- assert s != null : "cannot turn null into symbolic name";
- return s.toLowerCase().replaceAll("[^a-z0-9_\\-\\.]", "");
- }
-
public Cluster(String name, String partitioner, Set seedHosts) {
this.name = toSymbolicName(name);
this.partitioner = partitioner;
this.seedHosts = seedHosts;
}
+ public static String toSymbolicName(String name) {
+ Preconditions.checkNotNull(name, "cannot turn null into symbolic name");
+ return name.toLowerCase().replaceAll("[^a-z0-9_\\-\\.]", "");
+ }
+
public String getName() {
return name;
}
diff --git a/src/server/src/main/java/com/spotify/reaper/core/NodeMetrics.java b/src/server/src/main/java/com/spotify/reaper/core/NodeMetrics.java
index 96417096d..40a6901bf 100644
--- a/src/server/src/main/java/com/spotify/reaper/core/NodeMetrics.java
+++ b/src/server/src/main/java/com/spotify/reaper/core/NodeMetrics.java
@@ -1,13 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.spotify.reaper.core;
public final class NodeMetrics {
+
private final String hostAddress;
private final String datacenter;
private final int pendingCompactions;
private final boolean hasRepairRunning;
private final int activeAnticompactions;
-
private NodeMetrics(Builder builder) {
this.hostAddress = builder.hostAddress;
this.datacenter = builder.datacenter;
@@ -36,55 +50,56 @@ public int getActiveAnticompactions() {
return activeAnticompactions;
}
-
/**
* Creates builder to build {@link NodeMetrics}.
+ *
* @return created builder
*/
public static Builder builder() {
- return new Builder();
+ return new Builder();
}
/**
* Builder to build {@link NodeMetrics}.
*/
public static final class Builder {
- private String hostAddress;
- private String datacenter;
- private int pendingCompactions;
- private boolean hasRepairRunning;
- private int activeAnticompactions;
- private Builder() {
- }
+ private String hostAddress;
+ private String datacenter;
+ private int pendingCompactions;
+ private boolean hasRepairRunning;
+ private int activeAnticompactions;
- public Builder withHostAddress(String hostAddress) {
- this.hostAddress = hostAddress;
- return this;
- }
+ private Builder() {
+ }
+
+ public Builder withHostAddress(String hostAddress) {
+ this.hostAddress = hostAddress;
+ return this;
+ }
- public Builder withDatacenter(String datacenter) {
+ public Builder withDatacenter(String datacenter) {
this.datacenter = datacenter;
return this;
}
- public Builder withPendingCompactions(int pendingCompactions) {
- this.pendingCompactions = pendingCompactions;
- return this;
- }
+ public Builder withPendingCompactions(int pendingCompactions) {
+ this.pendingCompactions = pendingCompactions;
+ return this;
+ }
- public Builder withHasRepairRunning(boolean hasRepairRunning) {
- this.hasRepairRunning = hasRepairRunning;
- return this;
- }
+ public Builder withHasRepairRunning(boolean hasRepairRunning) {
+ this.hasRepairRunning = hasRepairRunning;
+ return this;
+ }
- public Builder withActiveAnticompactions(int activeAnticompactions) {
- this.activeAnticompactions = activeAnticompactions;
- return this;
- }
+ public Builder withActiveAnticompactions(int activeAnticompactions) {
+ this.activeAnticompactions = activeAnticompactions;
+ return this;
+ }
- public NodeMetrics build() {
- return new NodeMetrics(this);
- }
+ public NodeMetrics build() {
+ return new NodeMetrics(this);
+ }
}
}
diff --git a/src/server/src/main/java/com/spotify/reaper/core/RepairRun.java b/src/server/src/main/java/com/spotify/reaper/core/RepairRun.java
index 6ab04b3b1..207563af2 100644
--- a/src/server/src/main/java/com/spotify/reaper/core/RepairRun.java
+++ b/src/server/src/main/java/com/spotify/reaper/core/RepairRun.java
@@ -11,6 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.spotify.reaper.core;
import java.util.Objects;
@@ -20,14 +21,13 @@
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
-public class RepairRun implements Comparable {
+public final class RepairRun implements Comparable {
private final UUID id;
// IDEA: maybe we want to have start and stop token for parallel runners on same repair run?
- //private final long startToken;
- //private final long endToken;
-
+ // private final long startToken;
+ // private final long endToken;
private final String cause;
private final String owner;
private final String clusterName;
@@ -102,7 +102,7 @@ public DateTime getPauseTime() {
public double getIntensity() {
return intensity;
}
-
+
public String getLastEvent() {
return lastEvent;
}
@@ -120,11 +120,10 @@ public Builder with() {
}
/**
- * Order RepairRun instances by time. Primarily endTime, secondarily startTime. Descending, i.e.
- * latest first.
+ * Order RepairRun instances by time. Primarily endTime, secondarily startTime. Descending, i.e. latest first.
+ *
* @param other the RepairRun compared to
- * @return negative if this RepairRun is later than the specified RepairRun. Positive if earlier.
- * 0 if equal.
+ * @return negative if this RepairRun is later than the specified RepairRun. Positive if earlier. 0 if equal.
*/
@Override
public int compareTo(RepairRun other) {
@@ -136,19 +135,19 @@ public int compareTo(RepairRun other) {
return -comparator.compare(startTime, other.startTime);
}
}
-
+
@Override
public boolean equals(Object other) {
- if (other == this)
+ if (other == this) {
return true;
+ }
if (!(other instanceof RepairRun)) {
return false;
}
RepairRun run = (RepairRun) other;
- return this.id == run.id
- && this.repairUnitId == run.repairUnitId;
+ return this.id == run.id && this.repairUnitId == run.repairUnitId;
}
-
+
@Override
public int hashCode() {
return Objects.hash(this.id, this.repairUnitId);
@@ -172,14 +171,13 @@ public boolean isTerminated() {
}
}
- public static class Builder {
+ public static final class Builder {
public final String clusterName;
public final UUID repairUnitId;
private RunState runState;
private DateTime creationTime;
private double intensity;
- private boolean incrementalRepair;
private String cause;
private String owner;
private DateTime startTime;
@@ -189,8 +187,14 @@ public static class Builder {
private int segmentCount;
private RepairParallelism repairParallelism;
- public Builder(String clusterName, UUID repairUnitId, DateTime creationTime,
- double intensity, int segmentCount, RepairParallelism repairParallelism) {
+ public Builder(
+ String clusterName,
+ UUID repairUnitId,
+ DateTime creationTime,
+ double intensity,
+ int segmentCount,
+ RepairParallelism repairParallelism) {
+
this.clusterName = clusterName;
this.repairUnitId = repairUnitId;
this.runState = RunState.NOT_STARTED;
@@ -230,7 +234,7 @@ public Builder intensity(double intensity) {
this.intensity = intensity;
return this;
}
-
+
public Builder cause(String cause) {
this.cause = cause;
return this;
diff --git a/src/server/src/main/java/com/spotify/reaper/core/RepairSchedule.java b/src/server/src/main/java/com/spotify/reaper/core/RepairSchedule.java
index 27eae2248..2faba5660 100644
--- a/src/server/src/main/java/com/spotify/reaper/core/RepairSchedule.java
+++ b/src/server/src/main/java/com/spotify/reaper/core/RepairSchedule.java
@@ -11,20 +11,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.spotify.reaper.core;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import java.util.Collection;
-
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import org.apache.cassandra.repair.RepairParallelism;
import org.joda.time.DateTime;
-public class RepairSchedule {
+public final class RepairSchedule {
private final UUID id;
@@ -84,15 +84,11 @@ public ImmutableList getRunHistory() {
}
/**
- * Required for JDBI mapping into database.
- * Generic collection type would be hard to map into Postgres array types.
+ * Required for JDBI mapping into database. Generic collection type would be hard to map into Postgres array types.
*/
- public LongCollectionSQLType getRunHistorySQL() {
- List list = runHistory
- .stream()
- .map(UUID::getMostSignificantBits)
- .collect(Collectors.toList());
- return new LongCollectionSQLType(list);
+ public LongCollectionSqlType getRunHistorySql() {
+ List list = runHistory.stream().map(UUID::getMostSignificantBits).collect(Collectors.toList());
+ return new LongCollectionSqlType(list);
}
public int getSegmentCount() {
@@ -143,10 +139,16 @@ public static class Builder {
private String owner;
private DateTime pauseTime;
- public Builder(UUID repairUnitId, State state, int daysBetween, DateTime nextActivation,
- ImmutableList runHistory, int segmentCount,
- RepairParallelism repairParallelism,
- double intensity, DateTime creationTime) {
+ public Builder(
+ UUID repairUnitId,
+ State state,
+ int daysBetween,
+ DateTime nextActivation,
+ ImmutableList runHistory,
+ int segmentCount,
+ RepairParallelism repairParallelism,
+ double intensity,
+ DateTime creationTime) {
this.repairUnitId = repairUnitId;
this.state = state;
this.daysBetween = daysBetween;
@@ -173,8 +175,7 @@ private Builder(RepairSchedule original) {
intensity = original.intensity;
}
-
- public Builder state(State state) {
+ public Builder state(State state) {
this.state = state;
return this;
}
@@ -229,19 +230,19 @@ public RepairSchedule build(UUID id) {
}
}
- /**
- * This is required to be able to map in generic manner into Postgres array types through JDBI.
- */
- public static final class LongCollectionSQLType {
+ /**
+ * This is required to be able to map in generic manner into Postgres array types through JDBI.
+ */
+ public static final class LongCollectionSqlType {
- private final Collection collection;
+ private final Collection collection;
- public LongCollectionSQLType(Collection collection) {
- this.collection = collection;
- }
+ public LongCollectionSqlType(Collection collection) {
+ this.collection = collection;
+ }
- public Collection getValue() {
- return null == collection ? collection : Lists.newArrayList();
- }
+ public Collection getValue() {
+ return null == collection ? collection : Lists.newArrayList();
}
+ }
}
diff --git a/src/server/src/main/java/com/spotify/reaper/core/RepairSegment.java b/src/server/src/main/java/com/spotify/reaper/core/RepairSegment.java
index 2188af594..09bc54286 100644
--- a/src/server/src/main/java/com/spotify/reaper/core/RepairSegment.java
+++ b/src/server/src/main/java/com/spotify/reaper/core/RepairSegment.java
@@ -11,16 +11,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.spotify.reaper.core;
import com.spotify.reaper.service.RingRange;
-import org.joda.time.DateTime;
-
import java.math.BigInteger;
import java.util.UUID;
-public class RepairSegment {
+import org.joda.time.DateTime;
+
+public final class RepairSegment {
private final UUID id;
private final UUID runId;
@@ -106,7 +107,6 @@ public enum State {
public static class Builder {
-
public final RingRange tokenRange;
private final UUID repairUnitId;
private UUID runId;
@@ -136,9 +136,9 @@ private Builder(RepairSegment original) {
endTime = original.endTime;
}
- public Builder withRunId(UUID runId){
- this.runId = runId;
- return this;
+ public Builder withRunId(UUID runId) {
+ this.runId = runId;
+ return this;
}
public Builder failCount(int failCount) {
diff --git a/src/server/src/main/java/com/spotify/reaper/core/RepairUnit.java b/src/server/src/main/java/com/spotify/reaper/core/RepairUnit.java
index b121f27d6..fe4985c96 100644
--- a/src/server/src/main/java/com/spotify/reaper/core/RepairUnit.java
+++ b/src/server/src/main/java/com/spotify/reaper/core/RepairUnit.java
@@ -11,12 +11,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.spotify.reaper.core;
import java.util.Set;
import java.util.UUID;
-public class RepairUnit {
+public final class RepairUnit {
private final UUID id;
private final String clusterName;
@@ -77,8 +78,13 @@ public static class Builder {
public final Set nodes;
public final Set datacenters;
- public Builder(String clusterName, String keyspaceName, Set columnFamilies, Boolean incrementalRepair,
- Set nodes, Set datacenters) {
+ public Builder(
+ String clusterName,
+ String keyspaceName,
+ Set columnFamilies,
+ Boolean incrementalRepair,
+ Set nodes,
+ Set datacenters) {
this.clusterName = clusterName;
this.keyspaceName = keyspaceName;
this.columnFamilies = columnFamilies;
diff --git a/src/server/src/main/java/com/spotify/reaper/core/package-info.java b/src/server/src/main/java/com/spotify/reaper/core/package-info.java
new file mode 100644
index 000000000..c69bc07f8
--- /dev/null
+++ b/src/server/src/main/java/com/spotify/reaper/core/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+@javax.annotation.ParametersAreNonnullByDefault
+package com.spotify.reaper.core;
diff --git a/src/server/src/main/java/com/spotify/reaper/package-info.java b/src/server/src/main/java/com/spotify/reaper/package-info.java
new file mode 100644
index 000000000..55f643239
--- /dev/null
+++ b/src/server/src/main/java/com/spotify/reaper/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+@javax.annotation.ParametersAreNonnullByDefault
+package com.spotify.reaper;
diff --git a/src/server/src/main/java/com/spotify/reaper/resources/ClusterResource.java b/src/server/src/main/java/com/spotify/reaper/resources/ClusterResource.java
index 49bf265b9..0cf895023 100644
--- a/src/server/src/main/java/com/spotify/reaper/resources/ClusterResource.java
+++ b/src/server/src/main/java/com/spotify/reaper/resources/ClusterResource.java
@@ -11,9 +11,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.spotify.reaper.resources;
-import com.google.common.base.Optional;
import com.spotify.reaper.AppContext;
import com.spotify.reaper.ReaperException;
@@ -23,15 +23,11 @@
import com.spotify.reaper.resources.view.NodesStatus;
import com.spotify.reaper.resources.view.RepairRunStatus;
import com.spotify.reaper.resources.view.RepairScheduleStatus;
-
import com.spotify.reaper.service.ClusterRepairScheduler;
-import jersey.repackaged.com.google.common.collect.Lists;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.net.MalformedURLException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
@@ -47,7 +43,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -61,14 +56,19 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
+import com.google.common.base.Optional;
+import jersey.repackaged.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
@Path("/cluster")
@Produces(MediaType.APPLICATION_JSON)
-public class ClusterResource {
+public final class ClusterResource {
private static final int JMX_NODE_STATUS_CONCURRENCY = 3;
private static final ExecutorService CLUSTER_STATUS_EXECUTOR
- = Executors.newFixedThreadPool(JMX_NODE_STATUS_CONCURRENCY *2);
+ = Executors.newFixedThreadPool(JMX_NODE_STATUS_CONCURRENCY * 2);
private static final Logger LOG = LoggerFactory.getLogger(ClusterResource.class);
@@ -81,7 +81,9 @@ public ClusterResource(AppContext context) {
}
@GET
- public Response getClusterList(@QueryParam("seedHost") Optional seedHost) {
+ public Response getClusterList(
+ @QueryParam("seedHost") Optional seedHost) {
+
LOG.debug("get cluster list called");
Collection clusters = context.storage.getClusters();
List clusterNames = new ArrayList<>();
@@ -101,29 +103,32 @@ public Response getClusterList(@QueryParam("seedHost") Optional seedHost
@Path("/{cluster_name}")
public Response getCluster(
@PathParam("cluster_name") String clusterName,
- @QueryParam("limit") Optional limit) throws ReaperException {
+ @QueryParam("limit") Optional limit)
+ throws ReaperException {
+
LOG.debug("get cluster called with cluster_name: {}", clusterName);
return viewCluster(clusterName, limit, Optional.absent());
}
- private Response viewCluster(String clusterName, Optional limit,
- Optional createdURI) throws ReaperException {
+ private Response viewCluster(String clusterName, Optional limit, Optional createdUri)
+ throws ReaperException {
+
Optional cluster = context.storage.getCluster(clusterName);
if (!cluster.isPresent()) {
return Response.status(Response.Status.NOT_FOUND)
- .entity("cluster with name \"" + clusterName + "\" not found").build();
+ .entity("cluster with name \"" + clusterName + "\" not found")
+ .build();
} else {
- ClusterStatus view =
- new ClusterStatus(cluster.get(),
- context.storage.getClusterRunStatuses(clusterName, limit.or(Integer.MAX_VALUE)),
- context.storage.getClusterScheduleStatuses(clusterName), getNodesStatus(cluster).orNull());
- if (createdURI.isPresent()) {
- return Response.created(createdURI.get())
- .entity(view).build();
+ ClusterStatus view = new ClusterStatus(
+ cluster.get(),
+ context.storage.getClusterRunStatuses(clusterName, limit.or(Integer.MAX_VALUE)),
+ context.storage.getClusterScheduleStatuses(clusterName),
+ getNodesStatus(cluster).orNull());
+ if (createdUri.isPresent()) {
+ return Response.created(createdUri.get()).entity(view).build();
} else {
- return Response.ok()
- .entity(view).build();
+ return Response.ok().entity(view).build();
}
}
}
@@ -131,7 +136,9 @@ private Response viewCluster(String clusterName, Optional limit,
@POST
public Response addCluster(
@Context UriInfo uriInfo,
- @QueryParam("seedHost") Optional seedHost) throws ReaperException {
+ @QueryParam("seedHost") Optional seedHost)
+ throws ReaperException {
+
if (!seedHost.isPresent()) {
LOG.error("POST on cluster resource called without seedHost");
return Response.status(400).entity("query parameter \"seedHost\" required").build();
@@ -144,12 +151,11 @@ public Response addCluster(
} catch (java.lang.SecurityException e) {
LOG.error(e.getMessage(), e);
return Response.status(400)
- .entity("seed host \"" + seedHost.get() + "\" JMX threw security exception: "
- + e.getMessage()).build();
+ .entity("seed host \"" + seedHost.get() + "\" JMX threw security exception: " + e.getMessage())
+ .build();
} catch (ReaperException e) {
LOG.error(e.getMessage(), e);
- return Response.status(400)
- .entity("failed to create cluster with seed host: " + seedHost.get()).build();
+ return Response.status(400).entity("failed to create cluster with seed host: " + seedHost.get()).build();
}
Optional existingCluster = context.storage.getCluster(newCluster.getName());
if (existingCluster.isPresent()) {
@@ -167,22 +173,26 @@ public Response addCluster(
} catch (ReaperException e) {
LOG.error("failed to automatically schedule repairs", e);
return Response.status(400)
- .entity("failed to automatically schedule repairs for cluster with seed host \"" + seedHost.get()
- + "\". Exception was: " + e.getMessage()).build();
+ .entity(
+ "failed to automatically schedule repairs for cluster with seed host \""
+ + seedHost.get()
+ + "\". Exception was: "
+ + e.getMessage())
+ .build();
}
}
}
- URI createdURI;
+ URI createdUri;
try {
- createdURI = new URL(uriInfo.getAbsolutePath().toURL(), newCluster.getName()).toURI();
- } catch (Exception e) {
+ createdUri = new URL(uriInfo.getAbsolutePath().toURL(), newCluster.getName()).toURI();
+ } catch (MalformedURLException | URISyntaxException e) {
String errMsg = "failed creating target URI for cluster: " + newCluster.getName();
LOG.error(errMsg, e);
return Response.status(400).entity(errMsg).build();
}
- return viewCluster(newCluster.getName(), Optional.absent(), Optional.of(createdURI));
+ return viewCluster(newCluster.getName(), Optional.absent(), Optional.of(createdUri));
}
public Cluster createClusterWithSeedHost(String seedHostInput) throws ReaperException {
@@ -191,8 +201,8 @@ public Cluster createClusterWithSeedHost(String seedHostInput) throws ReaperExce
Optional> liveNodes = Optional.absent();
Set seedHosts = CommonTools.parseSeedHosts(seedHostInput);
- try (JmxProxy jmxProxy = context.jmxConnectionFactory
- .connectAny(Optional.absent(), seedHosts, context.config.getJmxConnectionTimeoutInSeconds())) {
+ try (JmxProxy jmxProxy = context.jmxConnectionFactory.connectAny(
+ Optional.absent(), seedHosts, context.config.getJmxConnectionTimeoutInSeconds())) {
clusterName = Optional.of(jmxProxy.getClusterName());
partitioner = Optional.of(jmxProxy.getPartitioner());
@@ -201,15 +211,13 @@ public Cluster createClusterWithSeedHost(String seedHostInput) throws ReaperExce
LOG.error("failed to create cluster with seed hosts: {}", seedHosts, e);
}
- if(!clusterName.isPresent()) {
+ if (!clusterName.isPresent()) {
throw new ReaperException("Could not connect any seed host");
}
Set seedHostsFinal = seedHosts;
if (context.config.getEnableDynamicSeedList() && liveNodes.isPresent()) {
- seedHostsFinal = !liveNodes.get().isEmpty()
- ? liveNodes.get().stream().collect(Collectors.toSet())
- : seedHosts;
+ seedHostsFinal = !liveNodes.get().isEmpty() ? liveNodes.get().stream().collect(Collectors.toSet()) : seedHosts;
}
LOG.debug("Seeds {}", seedHostsFinal);
@@ -222,7 +230,8 @@ public Cluster createClusterWithSeedHost(String seedHostInput) throws ReaperExce
public Response modifyClusterSeed(
@Context UriInfo uriInfo,
@PathParam("cluster_name") String clusterName,
- @QueryParam("seedHost") Optional seedHost) throws ReaperException {
+ @QueryParam("seedHost") Optional seedHost)
+ throws ReaperException {
if (!seedHost.isPresent()) {
LOG.error("PUT on cluster resource called without seedHost");
@@ -232,17 +241,16 @@ public Response modifyClusterSeed(
Optional cluster = context.storage.getCluster(clusterName);
if (!cluster.isPresent()) {
- return Response
- .status(Response.Status.NOT_FOUND)
- .entity("cluster with name " + clusterName + " not found")
- .build();
+ return Response.status(Response.Status.NOT_FOUND)
+ .entity("cluster with name " + clusterName + " not found")
+ .build();
}
Set newSeeds = CommonTools.parseSeedHosts(seedHost.get());
- if(context.config.getEnableDynamicSeedList()) {
- try (JmxProxy jmxProxy = context.jmxConnectionFactory
- .connectAny(Optional.absent(), newSeeds, context.config.getJmxConnectionTimeoutInSeconds())) {
+ if (context.config.getEnableDynamicSeedList()) {
+ try (JmxProxy jmxProxy = context.jmxConnectionFactory.connectAny(
+ Optional.absent(), newSeeds, context.config.getJmxConnectionTimeoutInSeconds())) {
Optional> liveNodes = Optional.of(jmxProxy.getLiveNodes());
newSeeds = liveNodes.get().stream().collect(Collectors.toSet());
@@ -264,110 +272,105 @@ public Response modifyClusterSeed(
/**
* Delete a Cluster object with given name.
*
- * Cluster can be only deleted when it hasn't any RepairRun or RepairSchedule instances under it,
- * i.e. you must delete all repair runs and schedules first.
+ *
+ * Cluster can be only deleted when it hasn't any RepairRun or RepairSchedule instances under it, i.e. you mus
+ * delete all repair runs and schedules first.
*
* @param clusterName The name of the Cluster instance you are about to delete.
* @return The deleted RepairRun instance, with state overwritten to string "DELETED".
- * @throws ReaperException
*/
@DELETE
@Path("/{cluster_name}")
- public Response deleteCluster(@PathParam("cluster_name") String clusterName) throws ReaperException {
+ public Response deleteCluster(
+ @PathParam("cluster_name") String clusterName)
+ throws ReaperException {
LOG.info("delete cluster called with clusterName: {}", clusterName);
Optional clusterToDelete = context.storage.getCluster(clusterName);
if (!clusterToDelete.isPresent()) {
- return Response.status(Response.Status.NOT_FOUND).entity(
- "cluster with name \"" + clusterName + "\" not found").build();
+ return Response.status(Response.Status.NOT_FOUND)
+ .entity("cluster with name \"" + clusterName + "\" not found")
+ .build();
}
if (!context.storage.getRepairSchedulesForCluster(clusterName).isEmpty()) {
- return Response.status(Response.Status.FORBIDDEN).entity(
- "cluster with name \"" + clusterName + "\" cannot be deleted, as it "
- + "has repair schedules").build();
+ return Response.status(Response.Status.FORBIDDEN)
+ .entity("cluster with name \"" + clusterName + "\" cannot be deleted, as it " + "has repair schedules")
+ .build();
}
if (!context.storage.getRepairRunsForCluster(clusterName).isEmpty()) {
- return Response.status(Response.Status.FORBIDDEN).entity(
- "cluster with name \"" + clusterName + "\" cannot be deleted, as it "
- + "has repair runs").build();
+ return Response.status(Response.Status.FORBIDDEN)
+ .entity("cluster with name \"" + clusterName + "\" cannot be deleted, as it " + "has repair runs")
+ .build();
}
Optional deletedCluster = context.storage.deleteCluster(clusterName);
if (deletedCluster.isPresent()) {
return Response.ok(
- new ClusterStatus(
- deletedCluster.get(),
- Collections.emptyList(),
- Collections.emptyList(),
- getNodesStatus(deletedCluster).orNull()))
+ new ClusterStatus(
+ deletedCluster.get(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ getNodesStatus(deletedCluster).orNull()))
.build();
}
return Response.serverError().entity("delete failed for schedule with name \"" + clusterName + "\"").build();
}
-
/**
* Callable to get and parse endpoint states through JMX
*
- *
* @param seedHost The host address to connect to via JMX
* @return An optional NodesStatus object with the status of each node in the cluster as seen from the seedHost node
*/
private Callable> getEndpointState(List seeds) {
return () -> {
- try (JmxProxy jmxProxy = context.jmxConnectionFactory
- .connectAny(Optional.absent(), seeds, context.config.getJmxConnectionTimeoutInSeconds())) {
+ try (JmxProxy jmxProxy = context.jmxConnectionFactory.connectAny(
+ Optional.absent(), seeds, context.config.getJmxConnectionTimeoutInSeconds())) {
Optional allEndpointsState = Optional.fromNullable(jmxProxy.getAllEndpointsState());
Optional