diff --git a/src/server/src/main/java/io/cassandrareaper/resources/RepairRunResource.java b/src/server/src/main/java/io/cassandrareaper/resources/RepairRunResource.java
index 91cd8647e..9f3518c4b 100644
--- a/src/server/src/main/java/io/cassandrareaper/resources/RepairRunResource.java
+++ b/src/server/src/main/java/io/cassandrareaper/resources/RepairRunResource.java
@@ -379,17 +379,18 @@ public Response modifyRunState(
     }
     final RepairRun.RunState newState = parseRunState(stateStr.get());
 
-    if (isUnitAlreadyRepairing(repairRun.get())) {
-      String errMsg = "repair unit already has run " + repairRun.get().getRepairUnitId() + " in RUNNING state";
-      LOG.error(errMsg);
-      return Response.status(Status.CONFLICT).entity(errMsg).build();
-    }
-
     final RunState oldState = repairRun.get().getRunState();
     if (oldState == newState) {
       String msg = "given \"state\" " + stateStr + " is same as the current run state";
       return Response.noContent().entity(msg).location(buildRepairRunUri(uriInfo, repairRun.get())).build();
     }
 
+    if ((isStarting(oldState, newState) || isResuming(oldState, newState) || isRetrying(oldState, newState))
+        && isUnitAlreadyRepairing(repairRun.get())) {
+
+      String errMsg = "repair unit already has run " + repairRun.get().getRepairUnitId() + " in RUNNING state";
+      LOG.error(errMsg);
+      return Response.status(Status.CONFLICT).entity(errMsg).build();
+    }
     if (isStarting(oldState, newState)) {
       return startRun(uriInfo, repairRun.get());
diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java
index 5a26171c3..bda4b3e78 100644
--- a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java
+++ b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java
@@ -52,6 +52,7 @@
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import cucumber.api.java.en.And;
@@ -485,28 +486,52 @@ public void a_scheduled_repair_run_has_started_for_cluster(String clusterName) t
       await().with().pollInterval(1, SECONDS).atMost(2, MINUTES).until(() -> {
         Response resp = runner.callReaper("GET", "/repair_run/cluster/" + TestContext.TEST_CLUSTER, EMPTY_PARAMS);
 
-        try {
-          String responseData = resp.readEntity(String.class);
+        String responseData = resp.readEntity(String.class);
 
-          Assertions
-              .assertThat(resp.getStatus())
-              .withFailMessage(responseData)
-              .isEqualTo(Response.Status.OK.getStatusCode());
-
-          List<RepairRunStatus> runs = SimpleReaperClient.parseRepairRunStatusListJSON(responseData)
-              .stream()
-              .filter(r -> RepairRun.RunState.RUNNING == r.getState() || RepairRun.RunState.DONE == r.getState())
-              .collect(Collectors.toList());
-
-          Assertions.assertThat(runs).withFailMessage("Zero or multiple running repairs: " + runs.size()).hasSize(1);
-          Assertions.assertThat(runs.get(0).getCause()).contains(testContext.getCurrentScheduleId().toString());
-          runningRepairs.add(runs.get(0).getId());
-          return true;
-        } catch (AssertionError ex) {
-          LOG.warn("GET /repair_run/cluster/" + TestContext.TEST_CLUSTER + " failed: " + ex.getMessage());
+        Assertions
+            .assertThat(resp.getStatus())
+            .withFailMessage(responseData)
+            .isEqualTo(Response.Status.OK.getStatusCode());
+
+        List<RepairRunStatus> runs = SimpleReaperClient.parseRepairRunStatusListJSON(responseData)
+            .stream()
+            .filter(r -> RepairRun.RunState.RUNNING == r.getState() || RepairRun.RunState.DONE == r.getState())
+            .filter(r -> r.getCause().contains(testContext.getCurrentScheduleId().toString()))
+            .collect(Collectors.toList());
+
+        if (1 < runs.size()) {
+          LOG.error("found duplicate repairs from same schedule and trigger time. deleting those behind…");
           logResponse(runner, "/repair_run/cluster/" + TestContext.TEST_CLUSTER);
+
+          UUID toKeep = runs.stream()
+              .sorted((r0, r1) -> (r0.getSegmentsRepaired() != r1.getSegmentsRepaired()
+                  ? r1.getSegmentsRepaired() - r0.getSegmentsRepaired() : r0.getId().compareTo(r1.getId())))
+              .findFirst()
+              .get()
+              .getId();
+
+          Optional<Map<String, String>> deleteParams = Optional.of(ImmutableMap.of("owner", TestContext.TEST_USER));
+
+          // pause the other repairs first
+          runs.stream()
+              .filter(r -> !r.getId().equals(toKeep))
+              .forEachOrdered(r -> {
+                Response res = runner.callReaper("PUT", "/repair_run/" + r.getId() + "/state/PAUSED", EMPTY_PARAMS);
+                LOG.warn(res.readEntity(String.class));
+              });
+
+          // then delete the other repairs
+          runs.stream()
+              .filter(r -> !r.getId().equals(toKeep))
+              .forEachOrdered(r -> {
+                Response res = runner.callReaper("DELETE", "/repair_run/" + r.getId(), deleteParams);
+                LOG.warn(res.readEntity(String.class));
+              });
+
           return false;
         }
+        runningRepairs.add(runs.get(0).getId());
+        return true;
       });
     });
     Assertions.assertThat(runningRepairs).hasSize(1);
@@ -1102,39 +1127,6 @@ public void reaper_has_started_repairs_for_the_last_added_cluster(int expected)
     }
   }
 
-  @When("^the last added repair run is deleted$")
-  public void the_last_added_repair_run_is_deleted_for_cluster_called() throws Throwable {
-    synchronized (BasicSteps.class) {
-      LOG.info("delete last added repair run with id: {}", testContext.getCurrentRepairId());
-      Map<String, String> params = Maps.newHashMap();
-      params.put("owner", TestContext.TEST_USER);
-
-      callAndExpect(
-          "DELETE",
-          "/repair_run/" + testContext.getCurrentRepairId(),
-          Optional.of(params),
-          Optional.empty(),
-          Response.Status.ACCEPTED,
-          Response.Status.NOT_FOUND,
-          Response.Status.CONFLICT);
-
-      await().with().pollInterval(1, SECONDS).atMost(1, MINUTES).until(() -> {
-        try {
-          callAndExpect(
-              "DELETE",
-              "/repair_run/" + testContext.getCurrentRepairId(),
-              Optional.of(params),
-              Optional.empty(),
-              Response.Status.NOT_FOUND);
-        } catch (AssertionError ex) {
-          LOG.warn("DELETE /repair_run/" + testContext.getCurrentRepairId() + " failed: " + ex.getMessage());
-          return false;
-        }
-        return true;
-      });
-    }
-  }
-
   @When("^all added repair runs are deleted for the last added cluster$")
   public void all_added_repair_runs_are_deleted_for_the_last_added_cluster() throws Throwable {
     synchronized (BasicSteps.class) {
@@ -1272,13 +1264,7 @@ public void the_last_added_repair_is_stopped_for() throws Throwable {
   private void stopRepairRun(UUID repairRunId) {
     // given "state" is same as the current run state
     RUNNERS.parallelStream().forEach(runner -> {
-      Map<String, String> params = Maps.newHashMap();
-
-      Response response = runner.callReaper(
-          "PUT",
-          "/repair_run/" + repairRunId + "/state/PAUSED",
-          Optional.of(params));
-
+      Response response = runner.callReaper("PUT", "/repair_run/" + repairRunId + "/state/PAUSED", EMPTY_PARAMS);
       int status = response.getStatus();
       String responseEntity = response.readEntity(String.class);
 
@@ -1286,6 +1272,7 @@ private void stopRepairRun(UUID repairRunId) {
           ImmutableList.of(
               Response.Status.OK.getStatusCode(),
               Response.Status.NO_CONTENT.getStatusCode(),
+              Response.Status.NOT_FOUND.getStatusCode(),
               Response.Status.CONFLICT.getStatusCode()))
          .withFailMessage(responseEntity)
          .contains(status);
@@ -1299,9 +1286,10 @@ private void stopRepairRun(UUID repairRunId) {
     callAndExpect(
         "PUT",
         "/repair_run/" + repairRunId + "/state/PAUSED",
-        Optional.empty(),
+        EMPTY_PARAMS,
         Optional.empty(),
         Response.Status.NO_CONTENT,
+        Response.Status.NOT_FOUND,
        Response.Status.CONFLICT);
   }
 
diff --git a/src/server/src/test/resources/io.cassandrareaper.acceptance/integration_reaper_functionality.feature b/src/server/src/test/resources/io.cassandrareaper.acceptance/integration_reaper_functionality.feature
index aea9195cd..e65ff867c 100644
--- a/src/server/src/test/resources/io.cassandrareaper.acceptance/integration_reaper_functionality.feature
+++ b/src/server/src/test/resources/io.cassandrareaper.acceptance/integration_reaper_functionality.feature
@@ -118,7 +118,7 @@ Feature: Using Reaper
     Then reaper has 1 started or done repairs for the last added cluster
     When the last added repair is stopped
     Then reseting one segment sets its state to not started
-    And the last added repair run is deleted
+    And all added repair runs are deleted for the last added cluster
     And deleting the last added cluster fails
     When all added schedules are deleted for the last added cluster
     And the last added cluster is deleted
@@ -142,7 +142,7 @@ Feature: Using Reaper
     And we wait for at least 1 segments to be repaired
     Then reaper has 1 started or done repairs for the last added cluster
     When the last added repair is stopped
-    And the last added repair run is deleted
+    And all added repair runs are deleted for the last added cluster
     And the last added cluster is deleted
     Then reaper has no longer the last added cluster in storage ${cucumber.upgrade-versions}
 
@@ -164,7 +164,7 @@ Feature: Using Reaper
     And we wait for at least 1 segments to be repaired
     Then reaper has 1 started or done repairs for the last added cluster
     When the last added repair is stopped
-    And the last added repair run is deleted
+    And all added repair runs are deleted for the last added cluster
     And the last added cluster is deleted
     Then reaper has no longer the last added cluster in storage ${cucumber.upgrade-versions}
 
@@ -186,7 +186,7 @@ Feature: Using Reaper
     When reaper is upgraded to latest
     Then reaper has 1 started or done repairs for the last added cluster
     When the last added repair is stopped
-    And the last added repair run is deleted
+    And all added repair runs are deleted for the last added cluster
     And the last added cluster is deleted
     Then reaper has no longer the last added cluster in storage ${cucumber.upgrade-versions}