Skip to content

Commit

Permalink
Merge pull request #8 from spotify/bj0rn/repairCoordinator
Browse files Browse the repository at this point in the history
Connect to valid coordinator when triggering repair
  • Loading branch information
varjoranta committed Dec 16, 2014
2 parents adff8a4 + cb3cb09 commit ac6df77
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 11 deletions.
21 changes: 19 additions & 2 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.Lists;

import com.spotify.reaper.ReaperException;
import com.spotify.reaper.service.RingRange;

import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageServiceMBean;
Expand All @@ -30,6 +31,7 @@
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import javax.annotation.Nullable;
import javax.management.InstanceNotFoundException;
Expand Down Expand Up @@ -132,14 +134,29 @@ public List<BigInteger> getTokens() {
return Lists.transform(
Lists.newArrayList(ssProxy.getTokenToEndpointMap().keySet()),
new Function<String, BigInteger>() {
@Nullable
@Override
public BigInteger apply(@Nullable String s) {
public BigInteger apply(String s) {
return new BigInteger(s);
}
});
}

/**
* @return all hosts owning a range of tokens
*/
@Nullable
public List<String> tokenRangeToEndpoint(String keyspace, RingRange tokenRange) {
checkNotNull(ssProxy, "Looks like the proxy is not connected");
for (Map.Entry<List<String>, List<String>> entry : ssProxy.getRangeToEndpointMap(keyspace)
.entrySet()) {
if (new RingRange(new BigInteger(entry.getKey().get(0)),
new BigInteger(entry.getKey().get(1))).encloses(tokenRange)) {
return entry.getValue();
}
}
return null;
}

/**
* @return full class name of Cassandra's partitioner.
*/
Expand Down
22 changes: 19 additions & 3 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -183,7 +184,12 @@ private void checkIfNeedToStartNextSegment() {
DateTime.now().isAfter(startNextSegmentEarliest)) {
LOG.info("triggering repair on segment #{} with token range {} on run id {}",
currentSegment.getId(), currentSegment.getTokenRange(), repairRun.getId());
newRepairCommandId = triggerRepair(currentSegment);
try {
newRepairCommandId = triggerRepair(currentSegment);
} catch (ReaperException e) {
LOG.error("failed triggering repair on segment #{} with token range {} on run id {}",
currentSegment.getId(), currentSegment.getTokenRange(), repairRun.getId());
}
} else if (currentSegment.getState() == RepairSegment.State.DONE) {
LOG.warn("segment {} repair completed for run {}",
currentSegment.getId(), repairRun.getId());
Expand Down Expand Up @@ -221,9 +227,19 @@ private void checkIfNeedToStartNextSegment() {
}
}

private int triggerRepair(RepairSegment segment) {
private int triggerRepair(RepairSegment segment) throws ReaperException {
ColumnFamily columnFamily = this.storage.getColumnFamily(segment.getColumnFamilyId());
return this.jmxProxy

// Make sure that we connect to a node that can act as coordinator for this repair.
List<String>
potentialCoordinators =
jmxProxy.tokenRangeToEndpoint(columnFamily.getKeyspaceName(), segment.getTokenRange());

jmxProxy.close();
// TODO: What if the coordinator doesn't use the default JMX port?
jmxProxy =
JmxProxy.connect(Optional.<RepairStatusHandler>of(this), potentialCoordinators.get(0));
return jmxProxy
.triggerRepair(segment.getTokenRange().getStart(), segment.getTokenRange().getEnd(),
columnFamily.getKeyspaceName(), columnFamily.getName());
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/spotify/reaper/service/RingRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.math.BigInteger;

// TODO: Check if this duplicates org.apache.cassandra.dht.Range.
public class RingRange {
private final BigInteger start;
private final BigInteger end;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.core.ColumnFamily;
import com.spotify.reaper.core.RepairSegment;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -60,8 +58,7 @@ public SegmentGenerator(String partitioner) throws ReaperException {
* @param ringTokens list of all start tokens in a cluster. They have to be in ring order.
* @return a list containing at least {@code totalSegmentCount} repair segments.
*/
public List<RingRange> generateSegments(int totalSegmentCount,
List<BigInteger> ringTokens)
public List<RingRange> generateSegments(int totalSegmentCount, List<BigInteger> ringTokens)
throws ReaperException {
int tokenRangeCount = ringTokens.size();

Expand Down Expand Up @@ -119,8 +116,6 @@ public List<RingRange> generateSegments(int totalSegmentCount,
BigInteger total = BigInteger.ZERO;
for (RingRange segment : repairSegments) {
BigInteger size = segment.span(RANGE_SIZE);
if (lowerThan(size, BigInteger.ZERO))
size = size.add(RANGE_SIZE);
total = total.add(size);
}
if (!total.equals(RANGE_SIZE)) {
Expand Down

0 comments on commit ac6df77

Please sign in to comment.