Skip to content

Commit

Permalink
Consult all affected nodes about ongoing compactions
Browse files Browse the repository at this point in the history
  • Loading branch information
Bj0rnen committed Jan 23, 2015
1 parent 953d167 commit 757cff2
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
12 changes: 12 additions & 0 deletions src/main/java/com/spotify/reaper/service/JmxConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
*/
package com.spotify.reaper.service;

import com.google.common.base.Function;
import com.google.common.base.Optional;

import com.google.common.collect.Lists;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.cassandra.JmxProxy;
import com.spotify.reaper.cassandra.RepairStatusHandler;

import javax.annotation.Nullable;
import java.util.Collection;

public class JmxConnectionFactory {
Expand All @@ -36,4 +39,13 @@ public final JmxProxy connectAny(Optional<RepairStatusHandler> handler, Collecti
throws ReaperException {
return create(handler, hosts.iterator().next());
}

public final Collection<JmxProxy> connectAll(Collection<String> hosts)
throws ReaperException {
Collection<JmxProxy> connections = Lists.newArrayList();
for (String host : hosts) {
connections.add(create(host));
}
return connections;
}
}
31 changes: 16 additions & 15 deletions src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,25 @@ private SegmentRunner(IStorage storage, long segmentId) {
private void runRepair(Collection<String> potentialCoordinators,
JmxConnectionFactory jmxConnectionFactory, long timeoutMillis) {
final RepairSegment segment = storage.getRepairSegment(segmentId);
try (JmxProxy jmxConnection = jmxConnectionFactory
try (JmxProxy coordinator = jmxConnectionFactory
.connectAny(Optional.<RepairStatusHandler>of(this), potentialCoordinators)) {
ColumnFamily columnFamily =
storage.getColumnFamily(segment.getColumnFamilyId());
String keyspace = columnFamily.getKeyspaceName();

if (!canRepair(jmxConnection, segment)) {
if (!canRepair(segment, keyspace, coordinator, jmxConnectionFactory)) {
postpone(segment);
return;
}

synchronized (condition) {
commandId = jmxConnection
commandId = coordinator
.triggerRepair(segment.getStartToken(), segment.getEndToken(), keyspace,
columnFamily.getName());
LOG.debug("Triggered repair with command id {}", commandId);
storage.updateRepairSegment(segment.with()
.state(RepairSegment.State.RUNNING)
.coordinatorHost(jmxConnection.getHost())
.coordinatorHost(coordinator.getHost())
.repairCommandId(commandId)
.build(segmentId));
LOG.info("Repair for segment {} started", segmentId);
Expand All @@ -110,7 +110,7 @@ private void runRepair(Collection<String> potentialCoordinators,
if (resultingSegment.getState().equals(RepairSegment.State.RUNNING)) {
LOG.info("Repair command {} on segment {} has been cancelled while running", commandId,
segmentId);
abort(resultingSegment, jmxConnection);
abort(resultingSegment, coordinator);
}
}
}
Expand All @@ -120,16 +120,17 @@ private void runRepair(Collection<String> potentialCoordinators,
}
}

boolean canRepair(JmxProxy jmx, RepairSegment segment) {
if (segment.getState().equals(RepairSegment.State.RUNNING)) {
LOG.error("Repair segment {} was already marked as started when SegmentRunner was "
+ "asked to trigger repair", segmentId);
return false;
}
if (jmx.getPendingCompactions() > MAX_PENDING_COMPACTIONS) {
LOG.warn("SegmentRunner declined to repair segment {} because of too many pending "
+ "compactions (> {})", segmentId, MAX_PENDING_COMPACTIONS);
return false;
boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
JmxConnectionFactory factory) throws ReaperException {
Collection<JmxProxy> allHosts = factory.connectAll(
coordinator.tokenRangeToEndpoint(keyspace, segment.getTokenRange()));
for (JmxProxy host : allHosts) {
if (host.getPendingCompactions() > MAX_PENDING_COMPACTIONS) {
LOG.warn("SegmentRunner declined to repair segment {} because of too many pending "
+ "compactions (> {}) on host \"{}\"", segmentId, MAX_PENDING_COMPACTIONS,
host.getHost());
return false;
}
}
return true;
}
Expand Down

0 comments on commit 757cff2

Please sign in to comment.