Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code improvements based on SonarQube analysis #48

Merged
merged 2 commits into from
Feb 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions resource/cassandra-reaper-postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ database:
# PostgreSQL JDBC settings
driverClass: org.postgresql.Driver
user: postgres
password: postgres
url: jdbc:postgresql://127.0.0.1/reaper
password:
url: jdbc:postgresql://127.0.0.1/reaper
7 changes: 4 additions & 3 deletions src/main/java/com/spotify/reaper/ReaperApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.spotify.reaper.AppContext;
import com.spotify.reaper.ReaperApplicationConfiguration;
import com.spotify.reaper.ReaperApplicationConfiguration.JmxCredentials;
Expand Down Expand Up @@ -128,7 +129,7 @@ public void run(ReaperApplicationConfiguration config,
// read jmx host/port mapping from config and provide to jmx con.factory
Map<String, Integer> jmxPorts = config.getJmxPorts();
if (jmxPorts != null) {
LOG.debug("using JMX ports mapping: " + jmxPorts);
LOG.debug("using JMX ports mapping: {}", jmxPorts);
context.jmxConnectionFactory.setJmxPorts(jmxPorts);
}

Expand Down Expand Up @@ -201,7 +202,7 @@ private IStorage initializeStorage(ReaperApplicationConfiguration config,
LOG.error("invalid storageType: {}", config.getStorageType());
throw new ReaperException("invalid storage type: " + config.getStorageType());
}
assert storage.isStorageConnected() : "Failed to connect storage";
Preconditions.checkState(storage.isStorageConnected(), "Failed to connect storage");
return storage;
}

Expand All @@ -222,7 +223,7 @@ public static void checkRepairParallelismString(String givenRepairParallelism)
} catch (java.lang.IllegalArgumentException ex) {
throw new ReaperException(
"invalid repair parallelism given \"" + givenRepairParallelism
+ "\", must be one of: " + Arrays.toString(RepairParallelism.values()));
+ "\", must be one of: " + Arrays.toString(RepairParallelism.values()), ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import java.util.Map;

import javax.validation.Valid;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Max;
import javax.validation.constraints.NotNull;
Expand Down Expand Up @@ -143,7 +142,7 @@ public String getEnableCrossOrigin() {
}

public boolean isEnableCrossOrigin() {
return this.enableCrossOrigin != null && this.enableCrossOrigin.equalsIgnoreCase("true");
return this.enableCrossOrigin != null && ("true").equalsIgnoreCase(this.enableCrossOrigin);
}

public void setStorageType(String storageType) {
Expand Down
138 changes: 72 additions & 66 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int
// registering a listener throws bunch of exceptions, so we do it here rather than in the
// constructor
mbeanServerConn.addNotificationListener(ssMbeanName, proxy, null, null);
LOG.debug(String.format("JMX connection to %s properly connected: %s",
host, jmxUrl.toString()));
LOG.debug("JMX connection to {} properly connected: {}",
host, jmxUrl.toString());
return proxy;
} catch (IOException | InstanceNotFoundException e) {
LOG.error(String.format("Failed to establish JMX connection to %s:%s", host, port));
LOG.error("Failed to establish JMX connection to {}:{}", host, port);
throw new ReaperException("Failure when establishing JMX connection", e);
}
}
Expand Down Expand Up @@ -189,9 +189,9 @@ public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace)
checkNotNull(ssProxy, "Looks like the proxy is not connected");
try {
return ((StorageServiceMBean) ssProxy).getRangeToEndpointMap(keyspace);
} catch (AssertionError e) {
} catch (Exception e) {
LOG.error(e.getMessage());
throw new ReaperException(e.getMessage());
throw new ReaperException(e.getMessage(), e);
}
}

Expand Down Expand Up @@ -243,8 +243,7 @@ public Set<String> getTableNamesForKeyspace(String keyspace) throws ReaperExcept
try {
proxies = ColumnFamilyStoreMBeanIterator.getColumnFamilyStoreMBeanProxies(mbeanServer);
} catch (IOException | MalformedObjectNameException e) {
e.printStackTrace();
throw new ReaperException("failed to get ColumnFamilyStoreMBean instances from JMX");
throw new ReaperException("failed to get ColumnFamilyStoreMBean instances from JMX", e);
}
while (proxies.hasNext()) {
Map.Entry<String, ColumnFamilyStoreMBean> proxyEntry = proxies.next();
Expand All @@ -267,12 +266,13 @@ public int getPendingCompactions() {
int pendingCount = (int) mbeanServer.getAttribute(name, "PendingTasks");
return pendingCount;
} catch (IOException ignored) {
LOG.warn("Failed to connect to " + host + " using JMX");
LOG.warn("Failed to connect to " + host + " using JMX", ignored);
} catch (MalformedObjectNameException ignored) {
LOG.error("Internal error, malformed name");
LOG.error("Internal error, malformed name", ignored);
} catch (InstanceNotFoundException e) {
// This happens if no repair has yet been run on the node
// The AntiEntropySessions object is created on the first repair
LOG.debug("No compaction has run yet on the node. Ignoring exception.", e);
return 0;
} catch (Exception e) {
LOG.error("Error getting attribute from JMX", e);
Expand All @@ -292,12 +292,13 @@ public boolean isRepairRunning() {
long pendingCount = (Long) mbeanServer.getAttribute(name, "PendingTasks");
return activeCount + pendingCount != 0;
} catch (IOException ignored) {
LOG.warn("Failed to connect to " + host + " using JMX");
LOG.warn("Failed to connect to " + host + " using JMX", ignored);
} catch (MalformedObjectNameException ignored) {
LOG.error("Internal error, malformed name");
LOG.error("Internal error, malformed name", ignored);
} catch (InstanceNotFoundException e) {
// This happens if no repair has yet been run on the node
// The AntiEntropySessions object is created on the first repair
LOG.debug("No repair has run yet on the node. Ignoring exception.", e);
return false;
} catch (Exception e) {
LOG.error("Error getting attribute from JMX", e);
Expand Down Expand Up @@ -337,7 +338,7 @@ public boolean tableExists(String ks, String cf) {
} catch (MalformedObjectNameException | IOException e) {
String errMsg = String.format("ColumnFamilyStore for %s/%s not found: %s", ks, cf,
e.getMessage());
LOG.warn(errMsg);
LOG.warn(errMsg, e);
return false;
}
return true;
Expand All @@ -363,7 +364,7 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys
try {
canUseDatacenterAware = versionCompare(cassandraVersion, "2.0.12") >= 0;
} catch (ReaperException e) {
LOG.warn("failed on version comparison, not using dc aware repairs by default");
LOG.warn("failed on version comparison, not using dc aware repairs by default", e);
}
String msg = String.format("Triggering repair of range (%s,%s] for keyspace \"%s\" on "
+ "host %s, with repair parallelism %s, in cluster with Cassandra "
Expand All @@ -373,55 +374,62 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys
repairParallelism, cassandraVersion, canUseDatacenterAware,
columnFamilies);
LOG.info(msg);

if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE) && !canUseDatacenterAware) {
LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {},"
+ " falling back to SEQUENTIAL repair.", cassandraVersion);
repairParallelism = RepairParallelism.SEQUENTIAL;
}
try {
if (!cassandraVersion.startsWith("2.0") && !cassandraVersion.startsWith("1.")) {
if (fullRepair) {
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
if (canUseDatacenterAware) {
return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, repairParallelism.ordinal(), cassandraVersion.startsWith("2.2")?new HashSet<String>():null, cassandraVersion.startsWith("2.2")?new HashSet<String>():null, fullRepair,
columnFamilies.toArray(new String[columnFamilies.size()]));
} else {
LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {},"
+ " falling back to SEQUENTIAL repair.", cassandraVersion);
repairParallelism = RepairParallelism.SEQUENTIAL;
}
}
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);

return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, snapshotRepair ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(),
cassandraVersion.startsWith("2.2")?new HashSet<String>():null, cassandraVersion.startsWith("2.2")?new HashSet<String>():null, fullRepair,
columnFamilies.toArray(new String[columnFamilies.size()]));

} else {
return ((StorageServiceMBean) ssProxy).forceRepairAsync(keyspace, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
fullRepair, columnFamilies.toArray(new String[columnFamilies.size()]));
}
if (cassandraVersion.startsWith("2.0") || cassandraVersion.startsWith("1.")) {
return triggerRepairPre2dot1(repairParallelism, keyspace, columnFamilies, beginToken, endToken);
} else {
// Cassandra 2.0 compatibility
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
if (canUseDatacenterAware) {
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, repairParallelism.ordinal(), null, null,
columnFamilies.toArray(new String[columnFamilies.size()]));
} else {
LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {},"
+ " falling back to SEQUENTIAL repair.", cassandraVersion);
repairParallelism = RepairParallelism.SEQUENTIAL;
}
}
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, snapshotRepair, false, columnFamilies.toArray(new String[columnFamilies.size()]));
return triggerRepairPost2dot1(fullRepair, repairParallelism, keyspace, columnFamilies, beginToken, endToken, cassandraVersion);
}
} catch (Exception e) {
LOG.error("Segment repair failed", e);
throw new ReaperException(e);
}

}


public int triggerRepairPost2dot1(boolean fullRepair, RepairParallelism repairParallelism, String keyspace, Collection<String> columnFamilies, BigInteger beginToken, BigInteger endToken, String cassandraVersion) {
if (fullRepair) {
// full repair
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, repairParallelism.ordinal(), cassandraVersion.startsWith("2.2")?new HashSet<String>():null, cassandraVersion.startsWith("2.2")?new HashSet<String>():null, fullRepair,
columnFamilies.toArray(new String[columnFamilies.size()]));
}

boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);

return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, snapshotRepair ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(),
cassandraVersion.startsWith("2.2")?new HashSet<String>():null, cassandraVersion.startsWith("2.2")?new HashSet<String>():null, fullRepair,
columnFamilies.toArray(new String[columnFamilies.size()]));

}

// incremental repair
return ((StorageServiceMBean) ssProxy).forceRepairAsync(keyspace, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
fullRepair, columnFamilies.toArray(new String[columnFamilies.size()]));

}


public int triggerRepairPre2dot1(RepairParallelism repairParallelism, String keyspace, Collection<String> columnFamilies, BigInteger beginToken, BigInteger endToken) {
// Cassandra 1.2 and 2.0 compatibility
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, repairParallelism.ordinal(), null, null,
columnFamilies.toArray(new String[columnFamilies.size()]));
}
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, snapshotRepair, false, columnFamilies.toArray(new String[columnFamilies.size()]));

}


/**
Expand All @@ -436,8 +444,8 @@ public void handleNotification(Notification notification, Object handback) {
Thread.currentThread().setName(clusterName);
// we're interested in "repair"
String type = notification.getType();
LOG.debug("Received notification: {}", notification.toString());
if (repairStatusHandler.isPresent() && type.equals("repair")) {
LOG.debug("Received notification: {}", notification);
if (repairStatusHandler.isPresent() && ("repair").equals(type)) {
int[] data = (int[]) notification.getUserData();
// get the repair sequence number
int repairNo = data[0];
Expand All @@ -459,7 +467,7 @@ public boolean isConnectionAlive() {
String connectionId = getConnectionId();
return null != connectionId && connectionId.length() > 0;
} catch (IOException e) {
e.printStackTrace();
LOG.error("Couldn't get Connection Id", e);
}
return false;
}
Expand All @@ -469,18 +477,16 @@ public boolean isConnectionAlive() {
*/
@Override
public void close() throws ReaperException {
LOG.debug(String.format("close JMX connection to '%s': %s", host, jmxUrl));
LOG.debug("close JMX connection to '{}': {}", host, jmxUrl);
try {
mbeanServer.removeNotificationListener(ssMbeanName, this);
} catch (InstanceNotFoundException | ListenerNotFoundException | IOException e) {
LOG.warn("failed on removing notification listener");
e.printStackTrace();
LOG.warn("failed on removing notification listener", e);
}
try {
jmxConnector.close();
} catch (IOException e) {
LOG.warn("failed closing a JMX connection");
e.printStackTrace();
LOG.warn("failed closing a JMX connection", e);
}
}

Expand All @@ -502,10 +508,10 @@ public void close() throws ReaperException {
*/
public static Integer versionCompare(String str1, String str2) throws ReaperException {
try {
str1 = str1.split(" ")[0].replaceAll("[-_~]", ".");
str2 = str2.split(" ")[0].replaceAll("[-_~]", ".");
String[] parts1 = str1.split("\\.");
String[] parts2 = str2.split("\\.");
String cleanedUpStr1 = str1.split(" ")[0].replaceAll("[-_~]", ".");
String cleanedUpStr2 = str2.split(" ")[0].replaceAll("[-_~]", ".");
String[] parts1 = cleanedUpStr1.split("\\.");
String[] parts2 = cleanedUpStr2.split("\\.");
int i = 0;
// set index to first non-equal ordinal or length of shortest version string
while (i < parts1.length && i < parts2.length) {
Expand Down Expand Up @@ -543,7 +549,7 @@ public static Integer versionCompare(String str1, String str2) throws ReaperExce
}

public void clearSnapshot(String repairId, String keyspaceName) throws ReaperException {
if (repairId == null || repairId.equals("")) {
if (repairId == null || ("").equals(repairId)) {
// Passing in null or empty string will clear all snapshots on the host
throw new IllegalArgumentException("repairId cannot be null or empty string");
}
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/spotify/reaper/core/RepairRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.spotify.reaper.core;

import java.util.Objects;

import org.apache.cassandra.repair.RepairParallelism;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
Expand Down Expand Up @@ -133,6 +135,23 @@ public int compareTo(RepairRun other) {
return -comparator.compare(startTime, other.startTime);
}
}

@Override
public boolean equals(Object other) {
if (other == this)
return true;
if (!(other instanceof RepairRun)) {
return false;
}
RepairRun run = (RepairRun) other;
return this.id == run.id
&& this.repairUnitId == run.repairUnitId;
}

@Override
public int hashCode() {
return Objects.hash(this.id, this.repairUnitId);
}

public enum RunState {
NOT_STARTED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,12 @@ public Response addCluster(
try {
newCluster = createClusterWithSeedHost(seedHost.get());
} catch (java.lang.SecurityException e) {
LOG.error(e.getMessage(), e);
return Response.status(400)
.entity("seed host \"" + seedHost.get() + "\" JMX threw security exception: "
+ e.getMessage()).build();
} catch (ReaperException e) {
LOG.error(e.getMessage(), e);
return Response.status(400)
.entity("failed to create cluster with seed host: " + seedHost.get()).build();
}
Expand All @@ -144,8 +146,7 @@ public Response addCluster(
createdURI = new URL(uriInfo.getAbsolutePath().toURL(), newCluster.getName()).toURI();
} catch (Exception e) {
String errMsg = "failed creating target URI for cluster: " + newCluster.getName();
LOG.error(errMsg);
e.printStackTrace();
LOG.error(errMsg, e);
return Response.status(400).entity(errMsg).build();
}

Expand All @@ -160,8 +161,7 @@ public Cluster createClusterWithSeedHost(String seedHost)
clusterName = jmxProxy.getClusterName();
partitioner = jmxProxy.getPartitioner();
} catch (ReaperException e) {
LOG.error("failed to create cluster with seed host: " + seedHost);
e.printStackTrace();
LOG.error("failed to create cluster with seed host: {}", seedHost, e);
throw e;
}
return new Cluster(clusterName, partitioner, Collections.singleton(seedHost));
Expand Down
Loading