Allow force-creating repair runs with conflicting units
adejanovski committed Oct 19, 2021
1 parent f3f80a2 commit 25f4505
Showing 7 changed files with 101 additions and 60 deletions.
@@ -133,23 +133,20 @@ public Response addRepairRun(
if (null != possibleFailedResponse) {
return possibleFailedResponse;
}

Double intensity;
if (intensityStr.isPresent()) {
intensity = Double.parseDouble(intensityStr.get());
} else {
intensity = context.config.getRepairIntensity();
LOG.debug("no intensity given, so using default value: {}", intensity);
}

boolean incrementalRepair;
if (incrementalRepairStr.isPresent()) {
incrementalRepair = Boolean.parseBoolean(incrementalRepairStr.get());
} else {
incrementalRepair = context.config.getIncrementalRepair();
LOG.debug("no incremental repair given, so using default value: {}", incrementalRepair);
}

int segments = context.config.getSegmentCountPerNode();
if (!incrementalRepair) {
if (segmentCountPerNode.isPresent()) {
@@ -163,7 +160,6 @@ public Response addRepairRun(
// hijack the segment count in case of incremental repair
segments = -1;
}

final Cluster cluster = context.storage.getCluster(Cluster.toSymbolicName(clusterName.get()));
Set<String> tableNames;
try {
@@ -172,7 +168,6 @@ public Response addRepairRun(
LOG.error(ex.getMessage(), ex);
return Response.status(Response.Status.NOT_FOUND).entity(ex.getMessage()).build();
}

Set<String> blacklistedTableNames;
try {
blacklistedTableNames
@@ -181,15 +176,13 @@ public Response addRepairRun(
LOG.error(ex.getMessage(), ex);
return Response.status(Response.Status.NOT_FOUND).entity(ex.getMessage()).build();
}

final Set<String> nodesToRepair;
try {
nodesToRepair = repairRunService.getNodesToRepairBasedOnParam(cluster, nodesToRepairParam);
} catch (IllegalArgumentException ex) {
LOG.error(ex.getMessage(), ex);
return Response.status(Response.Status.NOT_FOUND).entity(ex.getMessage()).build();
}

final Set<String> datacentersToRepair;
try {
datacentersToRepair = RepairRunService
@@ -213,45 +206,52 @@ public Response addRepairRun(
.repairThreadCount(repairThreadCountParam.orElse(context.config.getRepairThreadCount()))
.timeout(timeout);

final RepairUnit theRepairUnit = repairUnitService.getOrCreateRepairUnit(cluster, builder, force);
if (theRepairUnit.getIncrementalRepair() != incrementalRepair) {
String msg = String.format(
"A repair unit %s already exist for the same cluster/keyspace/tables"
+ " but with a different incremental repair value. Requested value %s | Existing value: %s",
theRepairUnit.getId(),
incrementalRepair,
theRepairUnit.getIncrementalRepair());

return Response.status(Response.Status.BAD_REQUEST).entity(msg).build();
}

RepairParallelism parallelism = context.config.getRepairParallelism();
if (repairParallelism.isPresent()) {
LOG.debug(
"using given repair parallelism {} instead of configured value {}",
repairParallelism.get(),
context.config.getRepairParallelism());
final Optional<RepairUnit> maybeTheRepairUnit = repairUnitService.getOrCreateRepairUnit(cluster, builder, force);
if (maybeTheRepairUnit.isPresent()) {
RepairUnit theRepairUnit = maybeTheRepairUnit.get();
if (theRepairUnit.getIncrementalRepair() != incrementalRepair) {
String msg = String.format(
"A repair unit %s already exist for the same cluster/keyspace/tables"
+ " but with a different incremental repair value. Requested value %s | Existing value: %s",
theRepairUnit.getId(),
incrementalRepair,
theRepairUnit.getIncrementalRepair());

return Response.status(Response.Status.CONFLICT).entity(msg).build();
}

parallelism = RepairParallelism.valueOf(repairParallelism.get().toUpperCase());
}
RepairParallelism parallelism = context.config.getRepairParallelism();
if (repairParallelism.isPresent()) {
LOG.debug(
"using given repair parallelism {} instead of configured value {}",
repairParallelism.get(),
context.config.getRepairParallelism());

if (incrementalRepair) {
parallelism = RepairParallelism.PARALLEL;
}
parallelism = RepairParallelism.valueOf(repairParallelism.get().toUpperCase());
}

final RepairRun newRepairRun = repairRunService.registerRepairRun(
cluster,
theRepairUnit,
cause,
owner.get(),
segments,
parallelism,
intensity,
false);
if (incrementalRepair) {
parallelism = RepairParallelism.PARALLEL;
}

return Response.created(buildRepairRunUri(uriInfo, newRepairRun))
.entity(new RepairRunStatus(newRepairRun, theRepairUnit, 0))
final RepairRun newRepairRun = repairRunService.registerRepairRun(
cluster,
theRepairUnit,
cause,
owner.get(),
segments,
parallelism,
intensity,
false);

return Response.created(buildRepairRunUri(uriInfo, newRepairRun))
.entity(new RepairRunStatus(newRepairRun, theRepairUnit, 0))
.build();
} else {
return Response.status(Response.Status.CONFLICT)
.entity("An existing repair unit conflicts with your repair run.")
.build();
}

} catch (ReaperException e) {
LOG.error(e.getMessage(), e);
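The net effect on the REST API (exercised by the new Cucumber step further down) is that a second POST to /repair_run for the same cluster/keyspace now gets 409 CONFLICT unless force=true is passed, in which case a second run is created. A minimal client-side sketch of that behavior, assuming a locally running Reaper and that the values are passed as query parameters the way the test harness does; the base URL and owner value are placeholders, not taken from this diff:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ForceRepairRunExample {

  public static void main(String[] args) throws Exception {
    // Hypothetical Reaper address; parameter names mirror the Cucumber step added below.
    URI uri = URI.create("http://localhost:8080/repair_run"
        + "?clusterName=test&keyspace=booya&owner=demo&force=true");

    HttpResponse<String> response = HttpClient.newHttpClient().send(
        HttpRequest.newBuilder(uri).POST(HttpRequest.BodyPublishers.noBody()).build(),
        HttpResponse.BodyHandlers.ofString());

    // 201 CREATED: the run was (force-)created.
    // 409 CONFLICT: an existing repair unit conflicts and force was not set.
    System.out.println(response.statusCode() + " -> " + response.body());
  }
}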
@@ -360,19 +360,28 @@ private Response addRepairSchedule(
}
}

RepairUnit unit = repairUnitService.getOrCreateRepairUnit(cluster, unitBuilder, force);

Preconditions
.checkState(unit.getIncrementalRepair() == incremental, "%s!=%s", unit.getIncrementalRepair(), incremental);
Preconditions
.checkState((percentUnrepairedThreshold > 0 && incremental) || percentUnrepairedThreshold <= 0,
"Setting a % repaired threshold can only be done on incremental schedules");

RepairSchedule newRepairSchedule = repairScheduleService
.storeNewRepairSchedule(
cluster, unit, days, next, owner, segments, parallel, intensity, force, adaptive, percentUnrepairedThreshold);
Optional<RepairUnit> maybeUnit = repairUnitService.getOrCreateRepairUnit(cluster, unitBuilder, force);

if (maybeUnit.isPresent()) {
RepairUnit unit = maybeUnit.get();
Preconditions
.checkState(unit.getIncrementalRepair() == incremental, "%s!=%s", unit.getIncrementalRepair(), incremental);
Preconditions
.checkState((percentUnrepairedThreshold > 0 && incremental) || percentUnrepairedThreshold <= 0,
"Setting a % repaired threshold can only be done on incremental schedules");

RepairSchedule newRepairSchedule = repairScheduleService
.storeNewRepairSchedule(
cluster, unit, days, next, owner, segments,
parallel, intensity, force, adaptive, percentUnrepairedThreshold);

return Response.created(buildRepairScheduleUri(uriInfo, newRepairSchedule)).build();
}

return Response.created(buildRepairScheduleUri(uriInfo, newRepairSchedule)).build();
return Response
.status(Response.Status.NO_CONTENT)
.entity("Repair schedule couldn't be created as an existing repair unit seems to conflict with it.")
.build();
}

private int getDaysBetween(Optional<Integer> scheduleDaysBetween) {
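Note the asymmetry with the repair-run path above: on a conflicting repair unit without force, repair runs answer 409 CONFLICT while schedules answer 204 NO_CONTENT with an explanatory message, so a client has to interpret the two endpoints differently. A hedged sketch of that mapping, assuming the resource above is mounted at POST /repair_schedule:

// Sketch only: interprets the status codes introduced in this change for schedule creation.
final class ScheduleCreationStatus {

  static String describe(int statusCode) {
    switch (statusCode) {
      case 201: // CREATED: schedule stored, Location header points at the new schedule
        return "schedule created";
      case 204: // NO_CONTENT: an existing repair unit conflicts and force was not set
        return "conflicting repair unit, schedule not created";
      default:
        return "unexpected status " + statusCode;
    }
  }
}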
@@ -128,7 +128,7 @@ private void createRepairSchedule(Cluster cluster, String keyspace, DateTime nex

RepairSchedule repairSchedule = repairScheduleService.storeNewRepairSchedule(
cluster,
repairUnitService.getOrCreateRepairUnit(cluster, builder),
repairUnitService.getOrCreateRepairUnit(cluster, builder).get(),
context.config.getScheduleDaysBetween(),
nextActivationTime,
REPAIR_OWNER,
@@ -59,11 +59,11 @@ public static RepairUnitService create(AppContext context) {
return new RepairUnitService(context);
}

public RepairUnit getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder params) {
public Optional<RepairUnit> getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder params) {
return getOrCreateRepairUnit(cluster, params, false);
}

public RepairUnit getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder params, boolean force) {
public Optional<RepairUnit> getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder params, boolean force) {
if (params.incrementalRepair) {
try {
String version = ClusterFacade.create(context).getCassandraVersion(cluster);
@@ -75,7 +75,17 @@ public RepairUnit getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder para
}
}
Optional<RepairUnit> repairUnit = context.storage.getRepairUnit(params);
return repairUnit.isPresent() ? repairUnit.get() : createRepairUnit(cluster, params, force);
if (repairUnit.isPresent()) {
return repairUnit;
}

try {
return Optional.of(createRepairUnit(cluster, params, force));
} catch (IllegalArgumentException e) {
return Optional.empty();
}


}

/**
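Since getOrCreateRepairUnit now returns an Optional, every call site has to decide what an empty result means: the two resources above translate it into a conflict response, while ClusterResource simply unwraps it with get(). A minimal sketch of the call-site pattern, using the Reaper types shown in this diff; the surrounding resource wiring and the success response are assumed for illustration:

// Sketch of the call-site pattern used in ApiResource / RepairScheduleResource above.
Response createWithRepairUnit(RepairUnitService repairUnitService,
                              Cluster cluster,
                              RepairUnit.Builder unitBuilder,
                              boolean force) {
  Optional<RepairUnit> maybeUnit = repairUnitService.getOrCreateRepairUnit(cluster, unitBuilder, force);
  if (!maybeUnit.isPresent()) {
    // No matching unit exists and creation was rejected: a conflicting unit is present
    // and force was not set.
    return Response.status(Response.Status.CONFLICT)
        .entity("An existing repair unit conflicts with this request.")
        .build();
  }
  RepairUnit unit = maybeUnit.get();
  // ... in the real resources this is where registerRepairRun / storeNewRepairSchedule is called.
  return Response.ok(unit.getId().toString()).build();
}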
@@ -1030,6 +1030,24 @@ public void a_new_repair_is_added_for_the_last_added_cluster_and_keyspace(String
}
}

@When("^a new repair is added for the last added cluster and keyspace \"([^\"]*)\" with force option$")
public void a_new_repair_is_added_for_the_last_added_cluster_and_keyspace_force(String keyspace) throws Throwable {
synchronized (BasicSteps.class) {
ReaperTestJettyRunner runner = RUNNERS.get(RAND.nextInt(RUNNERS.size()));
Map<String, String> params = Maps.newHashMap();
params.put("clusterName", TestContext.TEST_CLUSTER);
params.put("keyspace", keyspace);
params.put("owner", TestContext.TEST_USER);
params.put("force", "true");
Response response = runner.callReaper("POST", "/repair_run", Optional.of(params));
assertEquals(Response.Status.CREATED.getStatusCode(), response.getStatus());
String responseData = response.readEntity(String.class);
Assertions.assertThat(responseData).isNotBlank();
RepairRunStatus run = SimpleReaperClient.parseRepairRunStatusJSON(responseData);
testContext.addCurrentRepairId(run.getId());
}
}

@And("^the last added repair has table \"([^\"]*)\" in the blacklist$")
public void the_last_added_repair_has_table_in_the_blacklist(String blacklistedTable) throws Throwable {
synchronized (BasicSteps.class) {
@@ -266,7 +266,11 @@ Feature: Using Reaper
When reaper is upgraded to latest
Then reaper has 1 started or done repairs for the last added cluster
When the last added repair is stopped
And all added repair runs are deleted for the last added cluster
And a new repair is added for the last added cluster and keyspace "booya"
Then reaper has 1 repairs for cluster called "test"
When a new repair is added for the last added cluster and keyspace "booya" with force option
Then reaper has 2 repairs for cluster called "test"
When all added repair runs are deleted for the last added cluster
And the last added cluster is deleted
Then reaper has no longer the last added cluster in storage
${cucumber.upgrade-versions}
4 changes: 2 additions & 2 deletions src/ui/app/jsx/repair-form.jsx
@@ -429,8 +429,8 @@ const repairForm = CreateReactClass({
</Modal.Header>
<Modal.Body>
<p>{this.state.addRepairResultMsg}</p>
<p>It is not recommended to create overlapping repair schedules.</p>
<p>For Cassandra 4.0 and later, you can force creating this schedule by clicking the "Force" button below.</p>
<p>It is not recommended to create overlapping repair schedules/runs.</p>
<p>For Cassandra 4.0 and later, you can force creating this schedule/run by clicking the "Force" button below.</p>
</Modal.Body>
<Modal.Footer>
<Button variant="secondary" onClick={this._onClose}>Cancel</Button>
