prevent incremental subrange repair to allow proper support for inc repairs
adejanovski committed Jun 29, 2016
1 parent 05b912d commit 42f5f0d
Showing 22 changed files with 1,291 additions and 97 deletions.
12 changes: 8 additions & 4 deletions resource/cassandra-reaper.yaml
@@ -7,9 +7,13 @@ repairIntensity: 0.9
scheduleDaysBetween: 7
repairRunThreadCount: 15
hangingRepairTimeoutMins: 30
storageType: memory
storageType: database
enableCrossOrigin: true
incrementalRepair: false
jmxPorts:
127.0.0.1: 7100
127.0.0.2: 7200
127.0.0.3: 7300

logging:
level: INFO
@@ -33,6 +37,6 @@ server:

database:
driverClass: org.postgresql.Driver
user: pg-user
password: pg-pass
url: jdbc:postgresql://db.example.com/db-prod
user: postgres
password: postgres
url: jdbc:postgresql://127.0.0.1/reaper
3 changes: 1 addition & 2 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
@@ -53,7 +53,6 @@
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.cassandra.RepairStatusHandler;
import com.spotify.reaper.core.Cluster;
import com.spotify.reaper.service.RingRange;

@@ -366,7 +365,7 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys
columnFamilies.toArray(new String[columnFamilies.size()]));
}
else {
return ssProxy.forceRepairRangeAsync(beginToken.toString(), endToken.toString(), keyspace, RepairParallelism.PARALLEL.ordinal() , null, null, fullRepair, columnFamilies.toArray(new String[columnFamilies.size()]));
return ssProxy.forceRepairAsync(keyspace, Boolean.FALSE, Boolean.FALSE, Boolean.TRUE, fullRepair, columnFamilies.toArray(new String[columnFamilies.size()]));
}
}

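The functional change in JmxProxy is the else branch: instead of issuing a subrange repair through forceRepairRangeAsync, an incremental run now hands Cassandra the node's full range via forceRepairAsync. Below is a minimal sketch of the resulting dispatch, reusing the variable names from the diff; the enclosing if-condition is an assumption for illustration and is not copied from the commit.

    // Sketch only (not the committed method): how the two repair modes now dispatch.
    if (fullRepair) {
      // full repairs may still be scoped to a token subrange
      return ssProxy.forceRepairRangeAsync(beginToken.toString(), endToken.toString(), keyspace,
          RepairParallelism.PARALLEL.ordinal(), null, null, fullRepair,
          columnFamilies.toArray(new String[columnFamilies.size()]));
    } else {
      // incremental repairs now always cover the node's whole range; no subrange call is made
      return ssProxy.forceRepairAsync(keyspace, Boolean.FALSE, Boolean.FALSE, Boolean.TRUE, fullRepair,
          columnFamilies.toArray(new String[columnFamilies.size()]));
    }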
99 changes: 81 additions & 18 deletions src/main/java/com/spotify/reaper/resources/CommonTools.java
@@ -1,12 +1,30 @@
package com.spotify.reaper.resources;

import static com.google.common.base.Preconditions.checkNotNull;

import java.math.BigInteger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.Nullable;

import org.apache.cassandra.repair.RepairParallelism;
import org.joda.time.DateTime;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.CharMatcher;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import com.spotify.reaper.AppContext;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.cassandra.JmxProxy;
@@ -18,21 +36,6 @@
import com.spotify.reaper.service.RingRange;
import com.spotify.reaper.service.SegmentGenerator;

import org.apache.cassandra.repair.RepairParallelism;
import org.joda.time.DateTime;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigInteger;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import javax.annotation.Nullable;

import static com.google.common.base.Preconditions.checkNotNull;

public class CommonTools {

private static final Logger LOG = LoggerFactory.getLogger(CommonTools.class);
@@ -60,8 +63,9 @@ public static RepairRun registerRepairRun(AppContext context, Cluster cluster,
List<RingRange> tokenSegments = generateSegments(context, cluster, segments);
checkNotNull(tokenSegments, "failed generating repair segments");

Map<String, RingRange> nodes = getClusterNodes(context, cluster, repairUnit);
// the next step is to prepare a repair run object
RepairRun repairRun = storeNewRepairRun(context, cluster, repairUnit, cause, owner, segments,
RepairRun repairRun = storeNewRepairRun(context, cluster, repairUnit, cause, owner, nodes.keySet().size(),
repairParallelism, intensity);
checkNotNull(repairRun, "failed preparing repair run");

@@ -70,7 +74,11 @@ public static RepairRun registerRepairRun(AppContext context, Cluster cluster,
// However, RepairSegment has a pointer to the RepairRun it lives in

// the last preparation step is to generate actual repair segments
storeNewRepairSegments(context, tokenSegments, repairRun, repairUnit);
if(!repairUnit.getIncrementalRepair()) {
storeNewRepairSegments(context, tokenSegments, repairRun, repairUnit);
} else {
storeNewRepairSegmentsForIncrementalRepair(context, nodes, repairRun, repairUnit);
}

// now we're done and can return
return repairRun;
@@ -163,6 +171,61 @@ private static void storeNewRepairSegments(AppContext context, List<RingRange> t
repairRun.with().segmentCount(tokenSegments.size()).build(repairRun.getId()));
}
}


/**
* Creates the repair runs linked to given RepairRun and stores them directly in the storage
* backend in case of incrementalRepair
*/
private static void storeNewRepairSegmentsForIncrementalRepair(AppContext context, Map<String, RingRange> nodes,
RepairRun repairRun, RepairUnit repairUnit) {
List<RepairSegment.Builder> repairSegmentBuilders = Lists.newArrayList();
for (Entry<String, RingRange> range : nodes.entrySet()) {
RepairSegment.Builder repairSegment = new RepairSegment.Builder(repairRun.getId(), range.getValue(),
repairUnit.getId());
repairSegment.coordinatorHost(range.getKey());
repairSegmentBuilders.add(repairSegment);
}
context.storage.addRepairSegments(repairSegmentBuilders, repairRun.getId());
if (repairRun.getSegmentCount() != nodes.keySet().size()) {
LOG.debug("created segment amount differs from expected default {} != {}",
repairRun.getSegmentCount(), nodes.keySet().size());
context.storage.updateRepairRun(
repairRun.with().segmentCount(nodes.keySet().size()).build(repairRun.getId()));
}
}

private static Map<String, RingRange> getClusterNodes(AppContext context, Cluster targetCluster, RepairUnit repairUnit) throws ReaperException {
Set<String> nodes = Sets.newHashSet();
ConcurrentHashMap<String, RingRange> nodesWithRanges = new ConcurrentHashMap<String, RingRange>();
Set<String> seedHosts = targetCluster.getSeedHosts();
if (seedHosts.isEmpty()) {
String errMsg = String.format("didn't get any seed hosts for cluster \"%s\"",
targetCluster.getName());
LOG.error(errMsg);
throw new ReaperException(errMsg);
}


Map<List<String>, List<String>> rangeToEndpoint = Maps.newHashMap();
for (String host : seedHosts) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(host)) {
rangeToEndpoint = jmxProxy.getRangeToEndpointMap(repairUnit.getKeyspaceName());
break;
} catch (ReaperException e) {
LOG.warn("couldn't connect to host: {}, will try next one", host);
}
}

for(Entry<List<String>, List<String>> tokenRangeToEndpoint:rangeToEndpoint.entrySet()) {
String node = tokenRangeToEndpoint.getValue().get(0);
RingRange range = new RingRange(tokenRangeToEndpoint.getKey().get(0), tokenRangeToEndpoint.getKey().get(1));
RingRange added = nodesWithRanges.putIfAbsent(node, range);
}

return nodesWithRanges;
}


/**
* Instantiates a RepairSchedule and stores it in the storage backend.
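To make the new incremental flow concrete: getClusterNodes collapses the keyspace's range-to-endpoint map so that each node keeps a single entry (the first endpoint of a range acting as coordinator), and storeNewRepairSegmentsForIncrementalRepair then stores exactly one segment per node. A hypothetical walk-through follows, reusing the imports added to CommonTools above; the token values and addresses are made up for illustration.

    // Illustration only: three token ranges owned by two nodes collapse to two coordinator entries.
    Map<List<String>, List<String>> rangeToEndpoint = Maps.newHashMap();
    rangeToEndpoint.put(Lists.newArrayList("0", "100"), Lists.newArrayList("127.0.0.1", "127.0.0.2"));
    rangeToEndpoint.put(Lists.newArrayList("100", "200"), Lists.newArrayList("127.0.0.2", "127.0.0.1"));
    rangeToEndpoint.put(Lists.newArrayList("200", "300"), Lists.newArrayList("127.0.0.1", "127.0.0.2"));

    ConcurrentHashMap<String, RingRange> nodesWithRanges = new ConcurrentHashMap<String, RingRange>();
    for (Entry<List<String>, List<String>> rangeToNode : rangeToEndpoint.entrySet()) {
      String node = rangeToNode.getValue().get(0); // first endpoint acts as coordinator for that node
      nodesWithRanges.putIfAbsent(node, new RingRange(rangeToNode.getKey().get(0), rangeToNode.getKey().get(1)));
    }
    // nodesWithRanges now holds 2 entries, so the run is stored with one segment per node.

Because the segment count equals the node count for incremental runs, the segmentCount stored on the RepairRun reflects nodes rather than token subranges.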
53 changes: 31 additions & 22 deletions src/main/java/com/spotify/reaper/resources/RepairRunResource.java
@@ -13,23 +13,7 @@
*/
package com.spotify.reaper.resources;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import com.spotify.reaper.AppContext;
import com.spotify.reaper.ReaperApplication;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.core.Cluster;
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;
import com.spotify.reaper.core.RepairUnit;
import com.spotify.reaper.resources.view.RepairRunStatus;

import org.apache.cassandra.repair.RepairParallelism;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkNotNull;

import java.net.MalformedURLException;
import java.net.URI;
@@ -54,7 +38,22 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.cassandra.repair.RepairParallelism;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.spotify.reaper.AppContext;
import com.spotify.reaper.ReaperApplication;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.core.Cluster;
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;
import com.spotify.reaper.core.RepairUnit;
import com.spotify.reaper.resources.view.RepairRunStatus;

@Path("/repair_run")
@Produces(MediaType.APPLICATION_JSON)
@@ -120,12 +119,22 @@ public Response addRepairRun(
LOG.debug("no incremental repair given, so using default value: " + incrementalRepair);
}


int segments = context.config.getSegmentCount();
if (segmentCount.isPresent()) {
LOG.debug("using given segment count {} instead of configured value {}",
segmentCount.get(), context.config.getSegmentCount());
segments = segmentCount.get();
if (!incrementalRepair) {
if (segmentCount.isPresent()) {
LOG.debug("using given segment count {} instead of configured value {}",
segmentCount.get(), context.config.getSegmentCount());
segments = segmentCount.get();
}
} else {
// hijack the segment count in case of incremental repair
// since unit subrange incremental repairs are highly inefficient...
segments = -1;
}




Cluster cluster = context.storage.getCluster(Cluster.toSymbolicName(clusterName.get())).get();
Set<String> tableNames;
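The net effect of the resource change is that a caller-supplied segment count is only honoured for full (subrange) repairs; for incremental repairs it is forced to -1 and the per-node count computed in CommonTools.registerRepairRun takes over. A condensed sketch of that decision, with segmentCount as the Optional query parameter from addRepairRun:

    // Sketch of the segment-count selection introduced above; -1 is a sentinel meaning
    // "segments will be derived from the cluster nodes", not a literal count.
    int segments = context.config.getSegmentCount();
    if (!incrementalRepair) {
      if (segmentCount.isPresent()) {
        segments = segmentCount.get(); // an explicit override only makes sense for subrange repair
      }
    } else {
      segments = -1; // subrange incremental repairs are highly inefficient, so ignore any override
    }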
25 changes: 12 additions & 13 deletions src/main/java/com/spotify/reaper/service/RepairManager.java
@@ -1,27 +1,26 @@
package com.spotify.reaper.service;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import com.spotify.reaper.AppContext;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.cassandra.JmxProxy;
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;

import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RepairManager {

private static final Logger LOG = LoggerFactory.getLogger(RepairManager.class);
@@ -65,8 +64,8 @@ public void resumeRunningRepairRuns(AppContext context) {
SegmentRunner.abort(context, segment, jmxProxy);
} catch (ReaperException e) {
LOG.debug("Tried to abort repair on segment {} marked as RUNNING, but the host was down"
+ " (so abortion won't be needed)", segment.getId());
SegmentRunner.postpone(context, segment);
+ " (so abortion won't be needed)", segment.getId());
SegmentRunner.postpone(context, segment, context.storage.getRepairUnit(repairRun.getId()));
}
}
startRepairRun(context, repairRun);
(diff for the remaining changed files was not loaded)