Skip to content

Commit

Permalink
Fix NPE issue on unreachable nodes by using optionals
Browse files Browse the repository at this point in the history
  • Loading branch information
adejanovski committed Jun 22, 2017
1 parent b2b8b80 commit ea3b853
Showing 1 changed file with 31 additions and 30 deletions.
61 changes: 31 additions & 30 deletions src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -325,18 +325,19 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
for (String hostName : allHosts) {
LOG.debug("checking host '{}' for pending compactions and other repairs (can repair?)"
+ " Run id '{}'", hostName, segment.getRunId());
JmxProxy hostProxy = null;
Optional<JmxProxy> hostProxy = Optional.absent();
try{
Optional<HostMetrics> hostMetrics = Optional.absent();
try{
hostProxy = context.jmxConnectionFactory.connect(hostName);
hostProxy = Optional.fromNullable(context.jmxConnectionFactory.connect(hostName));
connected = true;
hostMetrics = getMetricsForHost(hostName, hostProxy);
}
catch(Exception e) {
LOG.debug("Couldn't reach host {} through JMX. Trying to collect metrics from storage...");
LOG.debug("Couldn't reach host {} through JMX. Trying to collect metrics from storage...", hostName);
}

hostMetrics = getMetricsForHost(hostName, hostProxy);

if(!hostMetrics.isPresent()) {
gotMetricsForAllHosts = false;
closeJmxConnection(hostProxy, connected);
Expand Down Expand Up @@ -396,43 +397,43 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
return gotMetricsForAllHosts; // check if we should postpone when we cannot get all metrics, or just drop the lead
}

private void closeJmxConnection(JmxProxy jmxProxy, boolean connected) {
if(connected)
private void closeJmxConnection(Optional<JmxProxy> jmxProxy, boolean connected) {
if(connected && jmxProxy.isPresent())
try {
jmxProxy.close();
jmxProxy.get().close();
} catch (ReaperException e) {
LOG.warn("Could not close JMX connection to {}. Potential leak...", jmxProxy.getHost());
LOG.warn("Could not close JMX connection to {}. Potential leak...", jmxProxy.get().getHost());
}
}

private void handlePotentialStuckRepairs(JmxProxy hostProxy, LazyInitializer<Set<String>> busyHosts, String hostName) throws ConcurrentException {
if (!busyHosts.get().contains(hostName) && context.storage.getStorageType() != StorageType.CASSANDRA) {
private void handlePotentialStuckRepairs(Optional<JmxProxy> hostProxy, LazyInitializer<Set<String>> busyHosts, String hostName) throws ConcurrentException {
if (!busyHosts.get().contains(hostName) && context.storage.getStorageType() != StorageType.CASSANDRA && hostProxy.isPresent()) {
LOG.warn("A host ({}) reported that it is involved in a repair, but there is no record "
+ "of any ongoing repair involving the host. Sending command to abort all repairs "
+ "on the host.", hostProxy.getHost());
hostProxy.cancelAllRepairs();
+ "on the host.", hostProxy.get().getHost());
hostProxy.get().cancelAllRepairs();
}
}

private Optional<HostMetrics> getMetricsForHost(String hostName, JmxProxy hostProxy) {
try {
int pendingCompactions = hostProxy.getPendingCompactions();
boolean hasRepairRunning = hostProxy.isRepairRunning();

HostMetrics metrics = HostMetrics.builder().withHostAddress(hostName)
.withPendingCompactions(pendingCompactions)
.withHasRepairRunning(hasRepairRunning)
.withActiveAnticompactions(0) // for future use
.build();

context.storage.storeHostMetrics(metrics);

return Optional.fromNullable(metrics);

} catch(Exception e) {
LOG.debug("Cannot reach node {} through JMX. Trying to get metrics from storage...", hostName, e);
return context.storage.getHostMetrics(hostName);
private Optional<HostMetrics> getMetricsForHost(String hostName, Optional<JmxProxy> hostProxy) {
if(hostProxy.isPresent()) {
try {
int pendingCompactions = hostProxy.get().getPendingCompactions();
boolean hasRepairRunning = hostProxy.get().isRepairRunning();

HostMetrics metrics = HostMetrics.builder().withHostAddress(hostName)
.withPendingCompactions(pendingCompactions)
.withHasRepairRunning(hasRepairRunning)
.withActiveAnticompactions(0) // for future use
.build();
context.storage.storeHostMetrics(metrics);
return Optional.fromNullable(metrics);
} catch(Exception e) {
LOG.debug("Cannot reach node {} through JMX. Trying to get metrics from storage...", hostName, e);
}
}

return context.storage.getHostMetrics(hostName);
}

private boolean IsRepairRunningOnOneNode(RepairSegment segment) {
Expand Down

0 comments on commit ea3b853

Please sign in to comment.