Skip to content

Commit

Permalink
Merge pull request #9 from thelastpickle/allow-different-inc-repair-s…
Browse files Browse the repository at this point in the history
…ettings

Fix for issue #8 (problem starting incremental repair when full repair was executed earlier or vice-versa)
  • Loading branch information
zznate authored Oct 27, 2016
2 parents 16d7759 + 41ef23d commit faa1b26
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<name>Cassandra Reaper</name>
<groupId>com.spotify</groupId>
<artifactId>cassandra-reaper</artifactId>
<version>0.3.0-SNAPSHOT</version>
<version>0.3.1-SNAPSHOT</version>
<packaging>jar</packaging>

<properties>
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/spotify/reaper/resources/CommonTools.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private static List<RingRange> generateSegments(AppContext context, Cluster targ
for (String host : seedHosts) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(host)) {
List<BigInteger> tokens = jmxProxy.getTokens();
segments = sg.generateSegments(segmentCount, tokens);
segments = sg.generateSegments(segmentCount, tokens, incrementalRepair);
break;
} catch (ReaperException e) {
LOG.warn("couldn't connect to host: {}, will try next one", host);
Expand Down Expand Up @@ -315,7 +315,7 @@ public static RepairUnit getNewOrExistingRepairUnit(AppContext context, Cluster
throw new ReaperException(errMsg);
}

if (storedRepairUnit.isPresent()) {
if (storedRepairUnit.isPresent() && storedRepairUnit.get().getIncrementalRepair().equals(incrementalRepair)) {
LOG.info("use existing repair unit for cluster '{}', keyspace '{}', and column families: {}",
cluster.getName(), keyspace, tableNames);
theRepairUnit = storedRepairUnit.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public Response addRepairRun(
RepairUnit theRepairUnit =
CommonTools.getNewOrExistingRepairUnit(context, cluster, keyspace.get(), tableNames, incrementalRepair);

if(theRepairUnit.getIncrementalRepair() != incrementalRepair) {
if (theRepairUnit.getIncrementalRepair() != incrementalRepair) {
return Response.status(Response.Status.BAD_REQUEST).entity(
"A repair run already exist for the same cluster/keyspace/table but with a different incremental repair value."
+ "Requested value: " + incrementalRepair + " | Existing value: " + theRepairUnit.getIncrementalRepair()).build();
Expand All @@ -161,12 +161,10 @@ public Response addRepairRun(
parallelism = RepairParallelism.valueOf(repairParallelism.get().toUpperCase());
}

if(!parallelism.equals(RepairParallelism.PARALLEL) && incrementalRepair) {
return Response.status(Response.Status.BAD_REQUEST).entity(
"It is not possible to mix sequential repair and incremental repairs. parallelism "
+ parallelism + " : incrementalRepair " + incrementalRepair).build();
if (incrementalRepair) {
parallelism = RepairParallelism.PARALLEL;
}

RepairRun newRepairRun = CommonTools.registerRepairRun(
context, cluster, theRepairUnit, cause, owner.get(), segments,
parallelism, intensity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,10 @@ public static boolean greaterThanOrEqual(BigInteger a, BigInteger b) {
* @param totalSegmentCount requested total amount of repair segments. This function may generate
* more segments.
* @param ringTokens list of all start tokens in a cluster. They have to be in ring order.
* @param incrementalRepair
* @return a list containing at least {@code totalSegmentCount} repair segments.
*/
public List<RingRange> generateSegments(int totalSegmentCount, List<BigInteger> ringTokens)
public List<RingRange> generateSegments(int totalSegmentCount, List<BigInteger> ringTokens, Boolean incrementalRepair)
throws ReaperException {
int tokenRangeCount = ringTokens.size();

Expand Down Expand Up @@ -156,7 +157,7 @@ public List<RingRange> generateSegments(int totalSegmentCount, List<BigInteger>
BigInteger size = segment.span(RANGE_SIZE);
total = total.add(size);
}
if (!total.equals(RANGE_SIZE)) {
if (!total.equals(RANGE_SIZE) && !incrementalRepair) {
throw new ReaperException("Not entire ring would get repaired");
}
return repairSegments;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public Optional<RepairRun> deleteRepairRun(long id) {
public RepairUnit addRepairUnit(RepairUnit.Builder repairUnit) {
Optional<RepairUnit> existing =
getRepairUnit(repairUnit.clusterName, repairUnit.keyspaceName, repairUnit.columnFamilies);
if (existing.isPresent()) {
if (existing.isPresent() && repairUnit.incrementalRepair == existing.get().getIncrementalRepair().booleanValue()) {
return existing.get();
} else {
RepairUnit newRepairUnit = repairUnit.build(REPAIR_UNIT_ID.incrementAndGet());
Expand Down
16 changes: 16 additions & 0 deletions src/test/java/com/spotify/reaper/acceptance/BasicSteps.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,22 @@ public void a_new_repair_is_added_for_and_keyspace(String clusterName, String ke
RepairRunStatus run = SimpleReaperClient.parseRepairRunStatusJSON(responseData);
TestContext.LAST_MODIFIED_ID = run.getId();
}

@When("^a new incremental repair is added for \"([^\"]*)\" and keyspace \"([^\"]*)\"$")
public void a_new_incremental_repair_is_added_for_and_keyspace(String clusterName, String keyspace)
throws Throwable {
Map<String, String> params = Maps.newHashMap();
params.put("clusterName", clusterName);
params.put("keyspace", keyspace);
params.put("owner", TestContext.TEST_USER);
params.put("incrementalRepair", Boolean.TRUE.toString());
ClientResponse response =
ReaperTestJettyRunner.callReaper("POST", "/repair_run", Optional.of(params));
assertEquals(Response.Status.CREATED.getStatusCode(), response.getStatus());
String responseData = response.getEntity(String.class);
RepairRunStatus run = SimpleReaperClient.parseRepairRunStatusJSON(responseData);
TestContext.LAST_MODIFIED_ID = run.getId();
}

@Then("^reaper has (\\d+) repairs for cluster called \"([^\"]*)\"$")
public void reaper_has_repairs_for_cluster_called(int runAmount, String clusterName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public BigInteger apply(String s) {
}
);
SegmentGenerator generator = new SegmentGenerator(new BigInteger("0"), new BigInteger("299"));
List<RingRange> segments = generator.generateSegments(32, tokens);
List<RingRange> segments = generator.generateSegments(32, tokens, Boolean.FALSE);

Map<List<String>, List<String>> map = RepairRunnerTest.sixNodeCluster();
List<RingRange> ranges = RepairRunner.getParallelRanges(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public BigInteger apply(String s) {
);

SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner");
List<RingRange> segments = generator.generateSegments(10, tokens);
List<RingRange> segments = generator.generateSegments(10, tokens, Boolean.FALSE);
assertEquals(15, segments.size());
assertEquals("(0,1]",
segments.get(0).toString());
Expand All @@ -74,7 +74,7 @@ public BigInteger apply(String s) {
}
);

segments = generator.generateSegments(10, tokens);
segments = generator.generateSegments(10, tokens, Boolean.FALSE);
assertEquals(15, segments.size());
assertEquals("(5,6]",
segments.get(0).toString());
Expand All @@ -100,7 +100,7 @@ public BigInteger apply(String s) {
});

SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner");
generator.generateSegments(10, tokens);
generator.generateSegments(10, tokens, Boolean.FALSE);
}

@Test
Expand All @@ -118,7 +118,7 @@ public BigInteger apply(String s) {
});

SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner");
List<RingRange> segments = generator.generateSegments(10, tokens);
List<RingRange> segments = generator.generateSegments(10, tokens, Boolean.FALSE);
assertEquals(15, segments.size());
assertEquals(
"(113427455640312821154458202477256070484,113427455640312821154458202477256070485]",
Expand All @@ -144,7 +144,7 @@ public BigInteger apply(String s) {
});

SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner");
generator.generateSegments(10, tokens);
generator.generateSegments(10, tokens, Boolean.FALSE);
// Will throw an exception when concluding that the repair segments don't add up.
// This is because the tokens were supplied out of order.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,13 @@ Feature: Using Reaper to launch repairs
When the last added repair run is deleted for cluster called "other_cluster"
And cluster called "other_cluster" is deleted
Then reaper has no cluster called "other_cluster" in storage

Scenario: Create a cluster and one incremental repair run and one full repair run on the same keyspace
Given reaper has no cluster with name "other_cluster" in storage
And that we are going to use "127.0.0.2" as cluster seed host
When an add-cluster request is made to reaper
Then reaper has a cluster called "other_cluster" in storage
And reaper has 0 scheduled repairs for cluster called "other_cluster"
When a new incremental repair is added for "other_cluster" and keyspace "system"
And a new repair is added for "other_cluster" and keyspace "system"
Then reaper has 2 repairs for cluster called "other_cluster"

0 comments on commit faa1b26

Please sign in to comment.