Fix computation of the number of parallel repairs
Add local caching for repair segments
Downgrade to Dropwizard 1.0.7 and Guava 19.0 to fix dependency issues
Make the repair manager scheduling cycle configurable (was hardcoded to 30s)
adejanovski committed May 26, 2017
1 parent f7682f2 commit 770555f
Showing 13 changed files with 157 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -11,7 +11,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-<dropwizard.version>1.1.0</dropwizard.version>
+<dropwizard.version>1.0.7</dropwizard.version>
<dropwizard.cassandra.version>4.1.0</dropwizard.cassandra.version>
<cassandra.version>2.2.7</cassandra.version>
<cucumber.version>1.2.5</cucumber.version>
1 change: 1 addition & 0 deletions resource/cassandra-reaper-cassandra-ssl.yaml
@@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
+repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7198
1 change: 1 addition & 0 deletions resource/cassandra-reaper-cassandra.yaml
@@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
+repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
1 change: 1 addition & 0 deletions resource/cassandra-reaper-h2.yaml
@@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
+repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
1 change: 1 addition & 0 deletions resource/cassandra-reaper-memory.yaml
@@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
+repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
1 change: 1 addition & 0 deletions resource/cassandra-reaper-postgres.yaml
@@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
+repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
1 change: 1 addition & 0 deletions resource/cassandra-reaper.yaml
@@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
+repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
2 changes: 1 addition & 1 deletion src/main/java/com/spotify/reaper/ReaperApplication.java
@@ -122,7 +122,7 @@ public void run(ReaperApplicationConfiguration config,
context.repairManager.initializeThreadPool(
config.getRepairRunThreadCount(),
config.getHangingRepairTimeoutMins(), TimeUnit.MINUTES,
-30, TimeUnit.SECONDS);
+config.getRepairManagerSchedulingIntervalSeconds(), TimeUnit.SECONDS);

if (context.storage == null) {
LOG.info("initializing storage of type: {}", config.getStorageType());
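For context, the interval that initializeThreadPool now receives from the configuration is the delay between scheduling passes of the repair manager. A minimal sketch of that pattern, assuming a generic polling method (the actual RepairManager internals are not shown in this diff):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of how a configurable interval typically drives the
// repair manager's scheduling loop; not the actual RepairManager source.
class RepairManagerSketch {
  private ScheduledExecutorService executor;

  void initializeThreadPool(int threadCount,
                            long repairTimeout, TimeUnit repairTimeoutUnit,
                            long scheduleDelay, TimeUnit scheduleDelayUnit) {
    // repairTimeout handling omitted here; this sketch only shows scheduling.
    executor = Executors.newScheduledThreadPool(threadCount);
    // The pass that used to fire every hardcoded 30 seconds now runs at the
    // configured repairManagerSchedulingIntervalSeconds cadence.
    executor.scheduleWithFixedDelay(this::schedulePass, 0, scheduleDelay, scheduleDelayUnit);
  }

  private void schedulePass() {
    // resume running repair runs, start pending segments, etc.
  }
}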
src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java
@@ -93,6 +93,9 @@ public class ReaperApplicationConfiguration extends Configuration {
@JsonProperty
@DefaultValue("true")
private Boolean enableDynamicSeedList;

+@JsonProperty
+private Integer repairManagerSchedulingIntervalSeconds;

public int getSegmentCount() {
return segmentCount;
@@ -170,13 +173,13 @@ public void setDataSourceFactory(DataSourceFactory database) {
this.database = database;
}

-public int getHangingRepairTimeoutMins() {
-return hangingRepairTimeoutMins;
+public int getRepairManagerSchedulingIntervalSeconds() {
+return this.repairManagerSchedulingIntervalSeconds==null?30:this.repairManagerSchedulingIntervalSeconds;
}

@JsonProperty
-public void setHangingRepairTimeoutMins(int hangingRepairTimeoutMins) {
-this.hangingRepairTimeoutMins = hangingRepairTimeoutMins;
+public void setRepairManagerSchedulingIntervalSeconds(int repairManagerSchedulingIntervalSeconds) {
+this.repairManagerSchedulingIntervalSeconds = repairManagerSchedulingIntervalSeconds;
}

public Map<String, Integer> getJmxPorts() {
@@ -261,6 +264,15 @@ public void setAllowUnreachableNodes(Boolean allow) {
this.allowUnreachableNodes = allow;
}

+public int getHangingRepairTimeoutMins() {
+return hangingRepairTimeoutMins;
+}
+
+@JsonProperty
+public void setHangingRepairTimeoutMins(int hangingRepairTimeoutMins) {
+this.hangingRepairTimeoutMins = hangingRepairTimeoutMins;
+}

public static class AutoSchedulingConfiguration {

@JsonProperty
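Since repairManagerSchedulingIntervalSeconds is declared as a boxed Integer, Jackson leaves it null when the YAML key is absent, and the getter above falls back to 30, preserving the old hardcoded behavior. A quick sketch of that contract (assumes only the public no-arg constructor that Jackson-bound Dropwizard configurations already require):

// Hypothetical check of the default fallback; names match the diff above.
public class SchedulingIntervalDefaultCheck {
  public static void main(String[] args) {
    ReaperApplicationConfiguration config = new ReaperApplicationConfiguration();
    // Key absent from YAML -> field stays null -> getter returns the old default.
    System.out.println(config.getRepairManagerSchedulingIntervalSeconds()); // 30
    config.setRepairManagerSchedulingIntervalSeconds(10);
    System.out.println(config.getRepairManagerSchedulingIntervalSeconds()); // 10
  }
}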
12 changes: 12 additions & 0 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
@@ -235,6 +235,18 @@ public List<String> tokenRangeToEndpoint(String keyspace, RingRange tokenRange)
return Lists.newArrayList();
}

+/**
+ * @return all hosts in the ring with their host id
+ */
+@NotNull
+public Map<String, String> getEndpointToHostId() {
+checkNotNull(ssProxy, "Looks like the proxy is not connected");
+Map<String, String> hosts =
+((StorageServiceMBean) ssProxy).getEndpointToHostId();
+
+return hosts;
+}

/**
* @return full class name of Cassandra's partitioner.
*/
7 changes: 4 additions & 3 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
@@ -69,7 +69,7 @@ public RepairRunner(AppContext context, long repairRunId)
JmxProxy jmx = this.context.jmxConnectionFactory.connectAny(cluster.get());

String keyspace = repairUnitOpt.get().getKeyspaceName();
-int parallelRepairs = getPossibleParallelRepairsCount(jmx.getRangeToEndpointMap(keyspace));
+int parallelRepairs = getPossibleParallelRepairsCount(jmx.getRangeToEndpointMap(keyspace), jmx.getEndpointToHostId());
if(repairUnitOpt.isPresent() && repairUnitOpt.get().getIncrementalRepair()) {
// with incremental repair, can't have more parallel repairs than nodes
parallelRepairs = 1;
@@ -96,14 +96,15 @@ public long getRepairRunId() {
}

@VisibleForTesting
-public static int getPossibleParallelRepairsCount(Map<List<String>, List<String>> ranges)
+public static int getPossibleParallelRepairsCount(Map<List<String>, List<String>> ranges, Map<String, String> hostsInRing)
throws ReaperException {
if (ranges.isEmpty()) {
String msg = "Repairing 0-sized cluster.";
LOG.error(msg);
throw new ReaperException(msg);
}
-return ranges.size() / ranges.values().iterator().next().size();
+
+return Math.min(ranges.size() / ranges.values().iterator().next().size(), Math.max(1, hostsInRing.keySet().size()/ranges.values().iterator().next().size()));
}

@VisibleForTesting
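A worked check of the new formula, min(ranges / replicasPerRange, max(1, nodes / replicasPerRange)): with six token ranges each held by three replicas and six nodes in the ring (the node count is the size of the endpoint-to-host-id map the new JmxProxy method returns), both terms evaluate to 2. The max(1, ...) guards the small-cluster case, and taking the min keeps clusters where the range count far exceeds the node count from over-committing parallel repairs. A standalone sketch with hypothetical fixture values mirroring the six-node test cluster:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ParallelRepairsCheck {
  public static void main(String[] args) {
    // Six ranges, each replicated on three of the six nodes a1..a6.
    Map<List<String>, List<String>> ranges = new HashMap<>();
    ranges.put(Arrays.asList("0", "50"), Arrays.asList("a1", "a2", "a3"));
    ranges.put(Arrays.asList("50", "100"), Arrays.asList("a2", "a3", "a4"));
    ranges.put(Arrays.asList("100", "150"), Arrays.asList("a3", "a4", "a5"));
    ranges.put(Arrays.asList("150", "200"), Arrays.asList("a4", "a5", "a6"));
    ranges.put(Arrays.asList("200", "250"), Arrays.asList("a5", "a6", "a1"));
    ranges.put(Arrays.asList("250", "0"), Arrays.asList("a6", "a1", "a2"));

    int nodesInRing = 6; // size of the endpoint -> host id map from JmxProxy
    int replicasPerRange = ranges.values().iterator().next().size(); // 3
    int parallel = Math.min(ranges.size() / replicasPerRange,
        Math.max(1, nodesInRing / replicasPerRange));
    System.out.println(parallel); // prints 2
  }
}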
70 changes: 63 additions & 7 deletions src/main/java/com/spotify/reaper/storage/CassandraStorage.java
@@ -11,6 +11,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.slf4j.Logger;
@@ -26,6 +27,8 @@
import com.datastax.driver.core.Session;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -61,6 +64,11 @@ public class CassandraStorage implements IStorage {
/** simple cache of repair_id.
* not accurate, only provides a floor value to shortcut looking for next appropriate id */
private final ConcurrentMap<String,Long> repairIds = new ConcurrentHashMap<>();

+private static final Cache<Long, RepairSegment> segmentCache = CacheBuilder.newBuilder()
+.maximumSize(5000)
+.expireAfterWrite(1, TimeUnit.HOURS)
+.build();

/* Simple statements */
private final String getClustersStmt = "SELECT * FROM cluster";
@@ -371,6 +379,7 @@ public void addRepairSegments(Collection<com.spotify.reaper.core.RepairSegment.B
for(com.spotify.reaper.core.RepairSegment.Builder builder:newSegments){
RepairSegment segment = builder.build(getNewRepairId("repair_segment"));
insertFutures.add(session.executeAsync(insertRepairSegmentPrepStmt.bind(segment.getId(), segment.getRepairUnitId(), segment.getRunId(), segment.getStartToken(), segment.getEndToken(), segment.getState().ordinal(), segment.getCoordinatorHost(), segment.getStartTime(), segment.getEndTime(), segment.getFailCount())));
+saveSegmentToCache(Optional.of(segment));
batch.add(insertRepairSegmentByRunPrepStmt.bind(segment.getRunId(), segment.getId()));
if(insertFutures.size()%100==0){
// cluster ddos protection
@@ -426,7 +435,7 @@ public Collection<RepairSegment> getRepairSegmentsForRun(long runId) {
// Then get segments by id
segmentsFuture.add(session.executeAsync(getRepairSegmentPrepStmt.bind(segmentIdResult.getLong("segment_id"))));
i++;
-if(i%100==0 || segmentsIdResultSet.isFullyFetched()) {
+if(i%100==0 || segmentsIdResultSet.isExhausted()) {
segments.addAll(fetchRepairSegmentFromFutures(segmentsFuture));
segmentsFuture = Lists.newArrayList();
}
@@ -435,38 +444,75 @@ public Collection<RepairSegment> getRepairSegmentsForRun(long runId) {
return segments;
}

+public Collection<RepairSegment> getRepairSegmentsForRunWithCache(long runId) {
+List<ResultSetFuture> segmentsFuture = Lists.newArrayList();
+Collection<RepairSegment> segments = Lists.newArrayList();
+
+// First gather segments ids
+ResultSet segmentsIdResultSet = session.execute(getRepairSegmentByRunIdPrepStmt.bind(runId));
+int i=0;
+for(Row segmentIdResult:segmentsIdResultSet) {
+Optional<RepairSegment> segment = Optional.fromNullable(segmentCache.getIfPresent(segmentIdResult.getLong("segment_id")));
+
+if(segment.isPresent()) {
+segments.add(segment.get());
+} else {
+// Then get segments by id
+segmentsFuture.add(session.executeAsync(getRepairSegmentPrepStmt.bind(segmentIdResult.getLong("segment_id"))));
+i++;
+if(i%100==0 || segmentsIdResultSet.isExhausted()) {
+segments.addAll(fetchRepairSegmentFromFutures(segmentsFuture));
+segmentsFuture = Lists.newArrayList();
+}
+}
+}
+
+return segments;
+}

private Collection<RepairSegment> fetchRepairSegmentFromFutures(List<ResultSetFuture> segmentsFuture){
Collection<RepairSegment> segments = Lists.newArrayList();

for(ResultSetFuture segmentResult:segmentsFuture) {
Row segmentRow = segmentResult.getUninterruptibly().one();
if(segmentRow!=null){
-segments.add(createRepairSegmentFromRow(segmentRow));
+RepairSegment segment = createRepairSegmentFromRow(segmentRow);
+segments.add(segment);
+saveSegmentToCache(Optional.of(segment));
}
}

return segments;

}

+private boolean segmentIsWithinRange(RepairSegment segment, RingRange range) {
+return range.encloses(new RingRange(segment.getStartToken(), segment.getEndToken()));
+
+}

private RepairSegment createRepairSegmentFromRow(Row segmentRow){
+return createRepairSegmentFromRow(segmentRow, segmentRow.getLong("id"));
+}
+
+private RepairSegment createRepairSegmentFromRow(Row segmentRow, long segmentId){
-return new RepairSegment.Builder(segmentRow.getLong("run_id"), new RingRange(new BigInteger(segmentRow.getVarint("start_token") +""), new BigInteger(segmentRow.getVarint("end_token")+"")), segmentRow.getLong("repair_unit_id"))
+RepairSegment segment = new RepairSegment.Builder(segmentRow.getLong("run_id"), new RingRange(new BigInteger(segmentRow.getVarint("start_token") +""), new BigInteger(segmentRow.getVarint("end_token")+"")), segmentRow.getLong("repair_unit_id"))
.coordinatorHost(segmentRow.getString("coordinator_host"))
.endTime(new DateTime(segmentRow.getTimestamp("end_time")))
.failCount(segmentRow.getInt("fail_count"))
.startTime(new DateTime(segmentRow.getTimestamp("start_time")))
.state(State.values()[segmentRow.getInt("state")])
.build(segmentRow.getLong("id"));

+saveSegmentToCache(Optional.of(segment));
+return segment;
}


public Optional<RepairSegment> getSegment(long runId, Optional<RingRange> range){
RepairSegment segment = null;
List<RepairSegment> segments = Lists.<RepairSegment>newArrayList();
-segments.addAll(getRepairSegmentsForRun(runId));
+segments.addAll(getRepairSegmentsForRunWithCache(runId));

// Sort segments by fail count and start token (in order to try those who haven't failed first, in start token order)
Collections.sort( segments, new Comparator<RepairSegment>(){
@@ -481,15 +527,25 @@ public int compare(RepairSegment seg1, RepairSegment seg2) {
for(RepairSegment seg:segments){
if(seg.getState().equals(State.NOT_STARTED) // State condition
&& ((range.isPresent() &&
-(range.get().getStart().compareTo(seg.getStartToken())>=0 || range.get().getEnd().compareTo(seg.getEndToken())<=0)
+(segmentIsWithinRange(seg, range.get()))
) || !range.isPresent()) // Token range condition
){
-segment = seg;
-break;
+Optional<RepairSegment> upToDateSegment = getRepairSegment(seg.getId());
+saveSegmentToCache(upToDateSegment);
+if(upToDateSegment.isPresent() && upToDateSegment.get().getState().equals(State.NOT_STARTED)) {
+segment = seg;
+break;
+}
}
}
return Optional.fromNullable(segment);
}

+private void saveSegmentToCache(Optional<RepairSegment> segment) {
+if(segment.isPresent()) {
+segmentCache.put(segment.get().getId(), segment.get());
+}
+}

@Override
public Optional<RepairSegment> getNextFreeSegment(long runId) {
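The storage changes combine two patterns: a bounded read-through cache (a Guava Cache capped at 5000 entries with a one-hour expiry) in front of segment reads, and a verify-before-use step in getSegment, since a cached NOT_STARTED entry may be stale by the time it is picked. A minimal standalone sketch of that cache-then-verify flow, with a hypothetical Segment type standing in for RepairSegment and its Cassandra-backed lookups:

import java.util.concurrent.TimeUnit;

import com.google.common.base.Optional;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

class SegmentCacheSketch {
  enum State { NOT_STARTED, RUNNING, DONE }

  static final class Segment {
    final long id;
    final State state;
    Segment(long id, State state) { this.id = id; this.state = state; }
  }

  // Same bounds as the diff above: capped size, entries age out after an hour.
  private final Cache<Long, Segment> cache = CacheBuilder.newBuilder()
      .maximumSize(5000)
      .expireAfterWrite(1, TimeUnit.HOURS)
      .build();

  Optional<Segment> nextFreeSegment(Iterable<Long> candidateIds) {
    for (Long id : candidateIds) {
      // Read-through: prefer the cache, fall back to the database.
      Segment candidate = cache.getIfPresent(id);
      if (candidate == null) {
        candidate = readFromDatabase(id);
        cache.put(id, candidate);
      }
      if (candidate.state == State.NOT_STARTED) {
        // The cached state may be stale, so re-read from the source of
        // truth before handing the segment out, as getSegment() does above.
        Segment fresh = readFromDatabase(id);
        cache.put(id, fresh);
        if (fresh.state == State.NOT_STARTED) {
          return Optional.of(fresh);
        }
      }
    }
    return Optional.absent();
  }

  private Segment readFromDatabase(long id) {
    // Stand-in for the Cassandra SELECT by segment id.
    return new Segment(id, State.NOT_STARTED);
  }
}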
src/test/java/com/spotify/reaper/service/RepairRunnerTest.java
@@ -362,10 +362,12 @@ public void run() {
@Test
public void getPossibleParallelRepairsTest() throws Exception {
Map<List<String>, List<String>> map = RepairRunnerTest.threeNodeCluster();
-assertEquals(1, RepairRunner.getPossibleParallelRepairsCount(map));
+Map<String, String> endpointsThreeNodes = RepairRunnerTest.threeNodeClusterEndpoint();
+assertEquals(1, RepairRunner.getPossibleParallelRepairsCount(map, endpointsThreeNodes));

map = RepairRunnerTest.sixNodeCluster();
-assertEquals(2, RepairRunner.getPossibleParallelRepairsCount(map));
+Map<String, String> endpointsSixNodes = RepairRunnerTest.sixNodeClusterEndpoint();
+assertEquals(2, RepairRunner.getPossibleParallelRepairsCount(map, endpointsSixNodes));
}

@Test
@@ -384,8 +386,37 @@ public BigInteger apply(String s) {
List<RingRange> segments = generator.generateSegments(32, tokens, Boolean.FALSE);

Map<List<String>, List<String>> map = RepairRunnerTest.sixNodeCluster();
+Map<String, String> endpointsSixNodes = RepairRunnerTest.sixNodeClusterEndpoint();
List<RingRange> ranges = RepairRunner.getParallelRanges(
-RepairRunner.getPossibleParallelRepairsCount(map),
+RepairRunner.getPossibleParallelRepairsCount(map, endpointsSixNodes),
segments
);
assertEquals(2, ranges.size());
assertEquals( "0", ranges.get(0).getStart().toString());
assertEquals("150", ranges.get(0).getEnd().toString());
assertEquals("150", ranges.get(1).getStart().toString());
assertEquals( "0", ranges.get(1).getEnd().toString());
}

+@Test
+public void getParallelSegmentsTest2() throws ReaperException {
+List<BigInteger> tokens = Lists.transform(
+Lists.newArrayList("0", "25", "50", "75", "100", "125", "150", "175", "200", "225", "250"),
+new Function<String, BigInteger>() {
+@Nullable
+@Override
+public BigInteger apply(String s) {
+return new BigInteger(s);
+}
+}
+);
+SegmentGenerator generator = new SegmentGenerator(new BigInteger("0"), new BigInteger("299"));
+List<RingRange> segments = generator.generateSegments(32, tokens, Boolean.FALSE);
+
+Map<List<String>, List<String>> map = RepairRunnerTest.sixNodeCluster();
+Map<String, String> endpointsSixNodes = RepairRunnerTest.sixNodeClusterEndpoint();
+List<RingRange> ranges = RepairRunner.getParallelRanges(
+RepairRunner.getPossibleParallelRepairsCount(map, endpointsSixNodes),
+segments
+);
+assertEquals(2, ranges.size());
@@ -413,6 +444,26 @@ public static Map<List<String>, List<String>> sixNodeCluster() {
map = addRangeToMap(map, "250", "0", "a6", "a1", "a2");
return map;
}

+public static Map<String, String> threeNodeClusterEndpoint() {
+Map<String, String> map = Maps.newHashMap();
+map.put("host1", "hostId1");
+map.put("host2", "hostId2");
+map.put("host3", "hostId3");
+return map;
+}
+
+public static Map<String, String> sixNodeClusterEndpoint() {
+Map<String, String> map = Maps.newHashMap();
+map.put("host1", "hostId1");
+map.put("host2", "hostId2");
+map.put("host3", "hostId3");
+map.put("host4", "hostId4");
+map.put("host5", "hostId5");
+map.put("host6", "hostId6");
+return map;
+}


private static Map<List<String>, List<String>> addRangeToMap(Map<List<String>, List<String>> map,
String rStart, String rEnd, String... hosts) {
