In BasicSteps when waiting on a scheduled repair run to start, abort any duplicate repair runs that occur (which can happen in an at-least-once, eventually consistent design).
michaelsembwever committed May 21, 2019
1 parent e686c2f commit acddf28
Showing 3 changed files with 58 additions and 69 deletions.
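The gist of the change: in an at-least-once, eventually consistent scheduler the same schedule can trigger two repair runs, so the acceptance step now keeps the most advanced run and pauses, then deletes, the rest, while the REST resource only answers 409 CONFLICT for transitions that would actually start repairing. The rule for which duplicate to keep is small enough to show on its own. The class and record below are stand-ins invented for this example (the real test works on RepairRunStatus objects), but the comparator mirrors the one added to BasicSteps.java in the diff below:

import java.util.Comparator;
import java.util.List;
import java.util.UUID;

// Self-contained sketch of the "which duplicate do we keep?" rule added to BasicSteps:
// prefer the run with the most repaired segments, break ties with a stable ordering by id.
// DuplicateRun is a stand-in for RepairRunStatus, used only to keep the example runnable on its own.
public class KeepMostAdvancedRun {

  record DuplicateRun(UUID id, int segmentsRepaired) {}

  static UUID selectRunToKeep(List<DuplicateRun> runs) {
    return runs.stream()
        .sorted(Comparator.comparingInt(DuplicateRun::segmentsRepaired).reversed()
            .thenComparing(DuplicateRun::id))
        .findFirst()
        .orElseThrow()
        .id();
  }

  public static void main(String[] args) {
    UUID a = UUID.randomUUID();
    UUID b = UUID.randomUUID();
    // the run that has repaired two segments wins over the one that has repaired none
    UUID kept = selectRunToKeep(List.of(new DuplicateRun(a, 0), new DuplicateRun(b, 2)));
    System.out.println(kept.equals(b)); // prints true
  }
}

In the test itself the losing duplicates are then paused and deleted through the REST API before the poll is retried.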
@@ -379,17 +379,18 @@ public Response modifyRunState(
}

final RepairRun.RunState newState = parseRunState(stateStr.get());
if (isUnitAlreadyRepairing(repairRun.get())) {
String errMsg = "repair unit already has run " + repairRun.get().getRepairUnitId() + " in RUNNING state";
LOG.error(errMsg);
return Response.status(Status.CONFLICT).entity(errMsg).build();
}

final RunState oldState = repairRun.get().getRunState();
if (oldState == newState) {
String msg = "given \"state\" " + stateStr + " is same as the current run state";
return Response.noContent().entity(msg).location(buildRepairRunUri(uriInfo, repairRun.get())).build();
}
if ((isStarting(oldState, newState) || isResuming(oldState, newState) || isRetrying(oldState, newState))
&& isUnitAlreadyRepairing(repairRun.get())) {

String errMsg = "repair unit already has run " + repairRun.get().getRepairUnitId() + " in RUNNING state";
LOG.error(errMsg);
return Response.status(Status.CONFLICT).entity(errMsg).build();
}

if (isStarting(oldState, newState)) {
return startRun(uriInfo, repairRun.get());
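On the server side, the hunk above moves the "unit already has a RUNNING run" check in modifyRunState so it fires only after the same-state short-circuit and only for transitions that would start repairing (start, resume, retry); pausing or deleting a duplicate run therefore no longer trips a 409 for the run being kept. Below is a minimal, self-contained sketch of the reordered guard: the enum values and the predicate bodies are assumptions about what the helpers named in the hunk check, reduced to plain booleans so the control flow stands alone.

// Self-contained sketch of the reordered guard in modifyRunState. The real method builds JAX-RS
// Responses; here the outcome is reduced to an enum. The predicate bodies are assumptions about
// the intent of isStarting/isResuming/isRetrying, not copies of the real implementations.
public class ModifyRunStateGuard {

  enum RunState { NOT_STARTED, RUNNING, PAUSED, DONE, ERROR }
  enum Outcome { NO_CONTENT, CONFLICT, PROCEED }

  static boolean isStarting(RunState oldState, RunState newState) {
    return oldState == RunState.NOT_STARTED && newState == RunState.RUNNING;
  }

  static boolean isResuming(RunState oldState, RunState newState) {
    return oldState == RunState.PAUSED && newState == RunState.RUNNING;
  }

  static boolean isRetrying(RunState oldState, RunState newState) {
    return oldState == RunState.ERROR && newState == RunState.RUNNING;
  }

  static Outcome decide(RunState oldState, RunState newState, boolean unitAlreadyRepairing) {
    if (oldState == newState) {
      return Outcome.NO_CONTENT; // re-submitting the current state is a no-op (204)
    }
    if ((isStarting(oldState, newState) || isResuming(oldState, newState) || isRetrying(oldState, newState))
        && unitAlreadyRepairing) {
      return Outcome.CONFLICT; // only start-like transitions collide with an already RUNNING run (409)
    }
    return Outcome.PROCEED; // e.g. pausing a duplicate run is now allowed through
  }

  public static void main(String[] args) {
    // Before this commit the CONFLICT check ran before the transition was examined, so a
    // PUT .../state/PAUSED could be answered 409 whenever the unit had a RUNNING run; now it proceeds.
    System.out.println(decide(RunState.RUNNING, RunState.PAUSED, true)); // prints PROCEED
  }
}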
106 changes: 47 additions & 59 deletions src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java
@@ -52,6 +52,7 @@
import com.datastax.driver.core.exceptions.AlreadyExistsException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import cucumber.api.java.en.And;
@@ -485,28 +486,52 @@ public void a_scheduled_repair_run_has_started_for_cluster(String clusterName) t
await().with().pollInterval(1, SECONDS).atMost(2, MINUTES).until(() -> {

Response resp = runner.callReaper("GET", "/repair_run/cluster/" + TestContext.TEST_CLUSTER, EMPTY_PARAMS);
try {
String responseData = resp.readEntity(String.class);
String responseData = resp.readEntity(String.class);

Assertions
.assertThat(resp.getStatus())
.withFailMessage(responseData)
.isEqualTo(Response.Status.OK.getStatusCode());

List<RepairRunStatus> runs = SimpleReaperClient.parseRepairRunStatusListJSON(responseData)
.stream()
.filter(r -> RepairRun.RunState.RUNNING == r.getState() || RepairRun.RunState.DONE == r.getState())
.collect(Collectors.toList());

Assertions.assertThat(runs).withFailMessage("Zero or multiple running repairs: " + runs.size()).hasSize(1);
Assertions.assertThat(runs.get(0).getCause()).contains(testContext.getCurrentScheduleId().toString());
runningRepairs.add(runs.get(0).getId());
return true;
} catch (AssertionError ex) {
LOG.warn("GET /repair_run/cluster/" + TestContext.TEST_CLUSTER + " failed: " + ex.getMessage());
Assertions
.assertThat(resp.getStatus())
.withFailMessage(responseData)
.isEqualTo(Response.Status.OK.getStatusCode());

List<RepairRunStatus> runs = SimpleReaperClient.parseRepairRunStatusListJSON(responseData)
.stream()
.filter(r -> RepairRun.RunState.RUNNING == r.getState() || RepairRun.RunState.DONE == r.getState())
.filter(r -> r.getCause().contains(testContext.getCurrentScheduleId().toString()))
.collect(Collectors.toList());

if (1 < runs.size()) {
LOG.error("found duplicate repairs from same schedule and trigger time. deleting those behind…");
logResponse(runner, "/repair_run/cluster/" + TestContext.TEST_CLUSTER);

UUID toKeep = runs.stream()
.sorted((r0, r1) -> (r0.getSegmentsRepaired() != r1.getSegmentsRepaired()
? r1.getSegmentsRepaired() - r0.getSegmentsRepaired() : r0.getId().compareTo(r1.getId())))
.findFirst()
.get()
.getId();

Optional<Map<String,String>> deleteParams = Optional.of(ImmutableMap.of("owner", TestContext.TEST_USER));

// pause the other repairs first
runs.stream()
.filter(r -> !r.getId().equals(toKeep))
.forEachOrdered(r -> {
Response res = runner.callReaper("PUT", "/repair_run/" + r.getId() + "/state/PAUSED", EMPTY_PARAMS);
LOG.warn(res.readEntity(String.class));
});

// then delete the other repairs
runs.stream()
.filter(r -> !r.getId().equals(toKeep))
.forEachOrdered(r -> {
Response res = runner.callReaper("DELETE", "/repair_run/" + r.getId(), deleteParams);
LOG.warn(res.readEntity(String.class));
});

return false;
}
runningRepairs.add(runs.get(0).getId());
return true;
});
});
Assertions.assertThat(runningRepairs).hasSize(1);
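All of this runs inside the Awaitility until() callback shown above, so "pause and delete the duplicates, then return false" simply means the condition is re-evaluated on the next poll until exactly one matching run is left. A stand-alone illustration of that retry pattern follows; the counter is a stand-in for the Reaper REST calls, and the Awaitility package name may differ between the org.awaitility and the older com.jayway.awaitility releases.

import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

import java.util.concurrent.atomic.AtomicInteger;

// Stand-alone illustration of the polling pattern used in a_scheduled_repair_run_has_started_for_cluster:
// the callable is polled until it returns true, so "clean up and return false" means "try again next poll".
public class AwaitilityRetryExample {

  public static void main(String[] args) {
    AtomicInteger duplicateRuns = new AtomicInteger(3); // stand-in for runs matching the schedule

    await().with().pollInterval(1, SECONDS).atMost(2, MINUTES).until(() -> {
      if (duplicateRuns.get() > 1) {
        duplicateRuns.decrementAndGet(); // stand-in for pausing and deleting one duplicate run
        return false;                    // condition not met yet, Awaitility polls again
      }
      return true;                       // exactly one run left, stop waiting
    });

    System.out.println("remaining runs: " + duplicateRuns.get()); // prints "remaining runs: 1"
  }
}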
@@ -1102,39 +1127,6 @@ public void reaper_has_started_repairs_for_the_last_added_cluster(int expected)
}
}

@When("^the last added repair run is deleted$")
public void the_last_added_repair_run_is_deleted_for_cluster_called() throws Throwable {
synchronized (BasicSteps.class) {
LOG.info("delete last added repair run with id: {}", testContext.getCurrentRepairId());
Map<String, String> params = Maps.newHashMap();
params.put("owner", TestContext.TEST_USER);

callAndExpect(
"DELETE",
"/repair_run/" + testContext.getCurrentRepairId(),
Optional.of(params),
Optional.empty(),
Response.Status.ACCEPTED,
Response.Status.NOT_FOUND,
Response.Status.CONFLICT);

await().with().pollInterval(1, SECONDS).atMost(1, MINUTES).until(() -> {
try {
callAndExpect(
"DELETE",
"/repair_run/" + testContext.getCurrentRepairId(),
Optional.of(params),
Optional.empty(),
Response.Status.NOT_FOUND);
} catch (AssertionError ex) {
LOG.warn("DELETE /repair_run/" + testContext.getCurrentRepairId() + " failed: " + ex.getMessage());
return false;
}
return true;
});
}
}

@When("^all added repair runs are deleted for the last added cluster$")
public void all_added_repair_runs_are_deleted_for_the_last_added_cluster() throws Throwable {
synchronized (BasicSteps.class) {
@@ -1272,20 +1264,15 @@ public void the_last_added_repair_is_stopped_for() throws Throwable {
private void stopRepairRun(UUID repairRunId) {
// given "state" is same as the current run state
RUNNERS.parallelStream().forEach(runner -> {
Map<String, String> params = Maps.newHashMap();

Response response = runner.callReaper(
"PUT",
"/repair_run/" + repairRunId + "/state/PAUSED",
Optional.of(params));

Response response = runner.callReaper("PUT", "/repair_run/" + repairRunId + "/state/PAUSED", EMPTY_PARAMS);
int status = response.getStatus();
String responseEntity = response.readEntity(String.class);

Assertions.assertThat(
ImmutableList.of(
Response.Status.OK.getStatusCode(),
Response.Status.NO_CONTENT.getStatusCode(),
Response.Status.NOT_FOUND.getStatusCode(),
Response.Status.CONFLICT.getStatusCode()))
.withFailMessage(responseEntity)
.contains(status);
@@ -1299,9 +1286,10 @@ private void stopRepairRun(UUID repairRunId) {
callAndExpect(
"PUT",
"/repair_run/" + repairRunId + "/state/PAUSED",
Optional.empty(),
EMPTY_PARAMS,
Optional.empty(),
Response.Status.NO_CONTENT,
Response.Status.NOT_FOUND,
Response.Status.CONFLICT);
}

@@ -118,7 +118,7 @@ Feature: Using Reaper
Then reaper has 1 started or done repairs for the last added cluster
When the last added repair is stopped
Then reseting one segment sets its state to not started
And the last added repair run is deleted
And all added repair runs are deleted for the last added cluster
And deleting the last added cluster fails
When all added schedules are deleted for the last added cluster
And the last added cluster is deleted
@@ -142,7 +142,7 @@ Feature: Using Reaper
And we wait for at least 1 segments to be repaired
Then reaper has 1 started or done repairs for the last added cluster
When the last added repair is stopped
And the last added repair run is deleted
And all added repair runs are deleted for the last added cluster
And the last added cluster is deleted
Then reaper has no longer the last added cluster in storage
${cucumber.upgrade-versions}
@@ -164,7 +164,7 @@ Feature: Using Reaper
And we wait for at least 1 segments to be repaired
Then reaper has 1 started or done repairs for the last added cluster
When the last added repair is stopped
And the last added repair run is deleted
And all added repair runs are deleted for the last added cluster
And the last added cluster is deleted
Then reaper has no longer the last added cluster in storage
${cucumber.upgrade-versions}
@@ -186,7 +186,7 @@ Feature: Using Reaper
When reaper is upgraded to latest
Then reaper has 1 started or done repairs for the last added cluster
When the last added repair is stopped
And the last added repair run is deleted
And all added repair runs are deleted for the last added cluster
And the last added cluster is deleted
Then reaper has no longer the last added cluster in storage
${cucumber.upgrade-versions}