Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ft-reaper-improvements-final small npe fix #111

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 33 additions & 31 deletions src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,17 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
for (String hostName : allHosts) {
LOG.debug("checking host '{}' for pending compactions and other repairs (can repair?)"
+ " Run id '{}'", hostName, segment.getRunId());
JmxProxy hostProxy = null;
Optional<JmxProxy> hostProxy = Optional.absent();
try{
Optional<HostMetrics> hostMetrics;
Optional<HostMetrics> hostMetrics = Optional.absent();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAJOR Remove this useless assignment to local variable "hostMetrics". rule

try{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAJOR Extract this nested try block into a separate method. rule

hostProxy = context.jmxConnectionFactory.connect(hostName);
hostProxy = Optional.fromNullable(context.jmxConnectionFactory.connect(hostName));
connected = true;
}
catch(Exception e) {
LOG.debug("Couldn't reach host {} through JMX. Trying to collect metrics from storage...");
LOG.debug("Couldn't reach host {} through JMX. Trying to collect metrics from storage...", hostName);
}

hostMetrics = getMetricsForHost(hostName, hostProxy);

if(!hostMetrics.isPresent()) {
Expand Down Expand Up @@ -396,43 +397,44 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
return gotMetricsForAllHosts; // check if we should postpone when we cannot get all metrics, or just drop the lead
}

private void closeJmxConnection(JmxProxy jmxProxy, boolean connected) {
if(connected)
private void closeJmxConnection(Optional<JmxProxy> jmxProxy, boolean connected) {
if(connected && jmxProxy.isPresent())
try {
jmxProxy.close();
} catch (Exception e) {
LOG.debug("Could not close JMX connection to {}. Potential leak...", jmxProxy.getHost());
jmxProxy.get().close();
} catch (ReaperException e) {
LOG.warn("Could not close JMX connection to {}. Potential leak...", jmxProxy.get().getHost());

}
}

private void handlePotentialStuckRepairs(JmxProxy hostProxy, LazyInitializer<Set<String>> busyHosts, String hostName) throws ConcurrentException {
if (!busyHosts.get().contains(hostName) && context.storage.getStorageType() != StorageType.CASSANDRA) {
private void handlePotentialStuckRepairs(Optional<JmxProxy> hostProxy, LazyInitializer<Set<String>> busyHosts, String hostName) throws ConcurrentException {
if (!busyHosts.get().contains(hostName) && context.storage.getStorageType() != StorageType.CASSANDRA && hostProxy.isPresent()) {
LOG.warn("A host ({}) reported that it is involved in a repair, but there is no record "
+ "of any ongoing repair involving the host. Sending command to abort all repairs "
+ "on the host.", hostProxy.getHost());
hostProxy.cancelAllRepairs();
+ "on the host.", hostProxy.get().getHost());
hostProxy.get().cancelAllRepairs();
}
}

private Optional<HostMetrics> getMetricsForHost(String hostName, JmxProxy hostProxy) {
try {
int pendingCompactions = hostProxy.getPendingCompactions();
boolean hasRepairRunning = hostProxy.isRepairRunning();

HostMetrics metrics = HostMetrics.builder().withHostAddress(hostName)
.withPendingCompactions(pendingCompactions)
.withHasRepairRunning(hasRepairRunning)
.withActiveAnticompactions(0) // for future use
.build();

context.storage.storeHostMetrics(metrics);

return Optional.fromNullable(metrics);

} catch(Exception e) {
LOG.debug("Cannot reach node {} through JMX. Trying to get metrics from storage...", hostName, e);
return context.storage.getHostMetrics(hostName);
private Optional<HostMetrics> getMetricsForHost(String hostName, Optional<JmxProxy> hostProxy) {
if(hostProxy.isPresent()) {
try {
int pendingCompactions = hostProxy.get().getPendingCompactions();
boolean hasRepairRunning = hostProxy.get().isRepairRunning();

HostMetrics metrics = HostMetrics.builder().withHostAddress(hostName)
.withPendingCompactions(pendingCompactions)
.withHasRepairRunning(hasRepairRunning)
.withActiveAnticompactions(0) // for future use
.build();
context.storage.storeHostMetrics(metrics);
return Optional.fromNullable(metrics);
} catch(Exception e) {
LOG.debug("Cannot reach node {} through JMX. Trying to get metrics from storage...", hostName, e);
}
}

return context.storage.getHostMetrics(hostName);
}

private boolean IsRepairRunningOnOneNode(RepairSegment segment) {
Expand Down