diff --git a/src/main/java/com/spotify/reaper/service/JmxConnectionFactory.java b/src/main/java/com/spotify/reaper/service/JmxConnectionFactory.java index 6ef115658..db1266372 100644 --- a/src/main/java/com/spotify/reaper/service/JmxConnectionFactory.java +++ b/src/main/java/com/spotify/reaper/service/JmxConnectionFactory.java @@ -13,12 +13,15 @@ */ package com.spotify.reaper.service; +import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.collect.Lists; import com.spotify.reaper.ReaperException; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.cassandra.RepairStatusHandler; +import javax.annotation.Nullable; import java.util.Collection; public class JmxConnectionFactory { @@ -36,4 +39,13 @@ public final JmxProxy connectAny(Optional handler, Collecti throws ReaperException { return create(handler, hosts.iterator().next()); } + + public final Collection connectAll(Collection hosts) + throws ReaperException { + Collection connections = Lists.newArrayList(); + for (String host : hosts) { + connections.add(create(host)); + } + return connections; + } } diff --git a/src/main/java/com/spotify/reaper/service/SegmentRunner.java b/src/main/java/com/spotify/reaper/service/SegmentRunner.java index dcab2d595..fc3411044 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentRunner.java +++ b/src/main/java/com/spotify/reaper/service/SegmentRunner.java @@ -76,25 +76,25 @@ private SegmentRunner(IStorage storage, long segmentId) { private void runRepair(Collection potentialCoordinators, JmxConnectionFactory jmxConnectionFactory, long timeoutMillis) { final RepairSegment segment = storage.getRepairSegment(segmentId); - try (JmxProxy jmxConnection = jmxConnectionFactory + try (JmxProxy coordinator = jmxConnectionFactory .connectAny(Optional.of(this), potentialCoordinators)) { ColumnFamily columnFamily = storage.getColumnFamily(segment.getColumnFamilyId()); String keyspace = columnFamily.getKeyspaceName(); - if (!canRepair(jmxConnection, segment)) { + if (!canRepair(segment, keyspace, coordinator, jmxConnectionFactory)) { postpone(segment); return; } synchronized (condition) { - commandId = jmxConnection + commandId = coordinator .triggerRepair(segment.getStartToken(), segment.getEndToken(), keyspace, columnFamily.getName()); LOG.debug("Triggered repair with command id {}", commandId); storage.updateRepairSegment(segment.with() .state(RepairSegment.State.RUNNING) - .coordinatorHost(jmxConnection.getHost()) + .coordinatorHost(coordinator.getHost()) .repairCommandId(commandId) .build(segmentId)); LOG.info("Repair for segment {} started", segmentId); @@ -110,7 +110,7 @@ private void runRepair(Collection potentialCoordinators, if (resultingSegment.getState().equals(RepairSegment.State.RUNNING)) { LOG.info("Repair command {} on segment {} has been cancelled while running", commandId, segmentId); - abort(resultingSegment, jmxConnection); + abort(resultingSegment, coordinator); } } } @@ -120,16 +120,17 @@ private void runRepair(Collection potentialCoordinators, } } - boolean canRepair(JmxProxy jmx, RepairSegment segment) { - if (segment.getState().equals(RepairSegment.State.RUNNING)) { - LOG.error("Repair segment {} was already marked as started when SegmentRunner was " - + "asked to trigger repair", segmentId); - return false; - } - if (jmx.getPendingCompactions() > MAX_PENDING_COMPACTIONS) { - LOG.warn("SegmentRunner declined to repair segment {} because of too many pending " - + "compactions (> {})", segmentId, MAX_PENDING_COMPACTIONS); - return false; + boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator, + JmxConnectionFactory factory) throws ReaperException { + Collection allHosts = factory.connectAll( + coordinator.tokenRangeToEndpoint(keyspace, segment.getTokenRange())); + for (JmxProxy host : allHosts) { + if (host.getPendingCompactions() > MAX_PENDING_COMPACTIONS) { + LOG.warn("SegmentRunner declined to repair segment {} because of too many pending " + + "compactions (> {}) on host \"{}\"", segmentId, MAX_PENDING_COMPACTIONS, + host.getHost()); + return false; + } } return true; }