Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to force create repair runs with conflicting units #1131

Merged
merged 2 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/scripts/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ case "${TEST_TYPE}" in
exit 1
;;
"local")
mvn -B package
mvn -B package -DskipTests
mvn -B org.jacoco:jacoco-maven-plugin:${JACOCO_VERSION}:prepare-agent surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperShiroIT -Dcucumber.options="$CUCUMBER_OPTIONS" org.jacoco:jacoco-maven-plugin:${JACOCO_VERSION}:report
mvn -B org.jacoco:jacoco-maven-plugin:${JACOCO_VERSION}:prepare-agent surefire:test -DsurefireArgLine="-Xmx256m" -Dtest=ReaperIT -Dcucumber.options="$CUCUMBER_OPTIONS" org.jacoco:jacoco-maven-plugin:${JACOCO_VERSION}:report
;;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,23 +133,20 @@ public Response addRepairRun(
if (null != possibleFailedResponse) {
return possibleFailedResponse;
}

Double intensity;
if (intensityStr.isPresent()) {
intensity = Double.parseDouble(intensityStr.get());
} else {
intensity = context.config.getRepairIntensity();
LOG.debug("no intensity given, so using default value: {}", intensity);
}

boolean incrementalRepair;
if (incrementalRepairStr.isPresent()) {
incrementalRepair = Boolean.parseBoolean(incrementalRepairStr.get());
} else {
incrementalRepair = context.config.getIncrementalRepair();
LOG.debug("no incremental repair given, so using default value: {}", incrementalRepair);
}

int segments = context.config.getSegmentCountPerNode();
if (!incrementalRepair) {
if (segmentCountPerNode.isPresent()) {
Expand All @@ -163,7 +160,6 @@ public Response addRepairRun(
// hijack the segment count in case of incremental repair
segments = -1;
}

final Cluster cluster = context.storage.getCluster(Cluster.toSymbolicName(clusterName.get()));
Set<String> tableNames;
try {
Expand All @@ -172,7 +168,6 @@ public Response addRepairRun(
LOG.error(ex.getMessage(), ex);
return Response.status(Response.Status.NOT_FOUND).entity(ex.getMessage()).build();
}

Set<String> blacklistedTableNames;
try {
blacklistedTableNames
Expand All @@ -181,15 +176,13 @@ public Response addRepairRun(
LOG.error(ex.getMessage(), ex);
return Response.status(Response.Status.NOT_FOUND).entity(ex.getMessage()).build();
}

final Set<String> nodesToRepair;
try {
nodesToRepair = repairRunService.getNodesToRepairBasedOnParam(cluster, nodesToRepairParam);
} catch (IllegalArgumentException ex) {
LOG.error(ex.getMessage(), ex);
return Response.status(Response.Status.NOT_FOUND).entity(ex.getMessage()).build();
}

final Set<String> datacentersToRepair;
try {
datacentersToRepair = RepairRunService
Expand All @@ -213,45 +206,52 @@ public Response addRepairRun(
.repairThreadCount(repairThreadCountParam.orElse(context.config.getRepairThreadCount()))
.timeout(timeout);

final RepairUnit theRepairUnit = repairUnitService.getOrCreateRepairUnit(cluster, builder, force);
if (theRepairUnit.getIncrementalRepair() != incrementalRepair) {
String msg = String.format(
"A repair unit %s already exist for the same cluster/keyspace/tables"
+ " but with a different incremental repair value. Requested value %s | Existing value: %s",
theRepairUnit.getId(),
incrementalRepair,
theRepairUnit.getIncrementalRepair());

return Response.status(Response.Status.BAD_REQUEST).entity(msg).build();
}

RepairParallelism parallelism = context.config.getRepairParallelism();
if (repairParallelism.isPresent()) {
LOG.debug(
"using given repair parallelism {} instead of configured value {}",
repairParallelism.get(),
context.config.getRepairParallelism());
final Optional<RepairUnit> maybeTheRepairUnit = repairUnitService.getOrCreateRepairUnit(cluster, builder, force);
if (maybeTheRepairUnit.isPresent()) {
RepairUnit theRepairUnit = maybeTheRepairUnit.get();
if (theRepairUnit.getIncrementalRepair() != incrementalRepair) {
String msg = String.format(
"A repair unit %s already exist for the same cluster/keyspace/tables"
+ " but with a different incremental repair value. Requested value %s | Existing value: %s",
theRepairUnit.getId(),
incrementalRepair,
theRepairUnit.getIncrementalRepair());

return Response.status(Response.Status.CONFLICT).entity(msg).build();
}

parallelism = RepairParallelism.valueOf(repairParallelism.get().toUpperCase());
}
RepairParallelism parallelism = context.config.getRepairParallelism();
if (repairParallelism.isPresent()) {
LOG.debug(
"using given repair parallelism {} instead of configured value {}",
repairParallelism.get(),
context.config.getRepairParallelism());

if (incrementalRepair) {
parallelism = RepairParallelism.PARALLEL;
}
parallelism = RepairParallelism.valueOf(repairParallelism.get().toUpperCase());
}

final RepairRun newRepairRun = repairRunService.registerRepairRun(
cluster,
theRepairUnit,
cause,
owner.get(),
segments,
parallelism,
intensity,
false);
if (incrementalRepair) {
parallelism = RepairParallelism.PARALLEL;
}

return Response.created(buildRepairRunUri(uriInfo, newRepairRun))
.entity(new RepairRunStatus(newRepairRun, theRepairUnit, 0))
final RepairRun newRepairRun = repairRunService.registerRepairRun(
cluster,
theRepairUnit,
cause,
owner.get(),
segments,
parallelism,
intensity,
false);

return Response.created(buildRepairRunUri(uriInfo, newRepairRun))
.entity(new RepairRunStatus(newRepairRun, theRepairUnit, 0))
.build();
} else {
return Response.status(Response.Status.CONFLICT)
.entity("An existing repair unit conflicts with your repair run.")
.build();
}

} catch (ReaperException e) {
LOG.error(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,19 +360,28 @@ private Response addRepairSchedule(
}
}

RepairUnit unit = repairUnitService.getOrCreateRepairUnit(cluster, unitBuilder, force);

Preconditions
.checkState(unit.getIncrementalRepair() == incremental, "%s!=%s", unit.getIncrementalRepair(), incremental);
Preconditions
.checkState((percentUnrepairedThreshold > 0 && incremental) || percentUnrepairedThreshold <= 0,
"Setting a % repaired threshold can only be done on incremental schedules");

RepairSchedule newRepairSchedule = repairScheduleService
.storeNewRepairSchedule(
cluster, unit, days, next, owner, segments, parallel, intensity, force, adaptive, percentUnrepairedThreshold);
Optional<RepairUnit> maybeUnit = repairUnitService.getOrCreateRepairUnit(cluster, unitBuilder, force);

if (maybeUnit.isPresent()) {
RepairUnit unit = maybeUnit.get();
Preconditions
.checkState(unit.getIncrementalRepair() == incremental, "%s!=%s", unit.getIncrementalRepair(), incremental);
Preconditions
.checkState((percentUnrepairedThreshold > 0 && incremental) || percentUnrepairedThreshold <= 0,
"Setting a % repaired threshold can only be done on incremental schedules");

RepairSchedule newRepairSchedule = repairScheduleService
.storeNewRepairSchedule(
cluster, unit, days, next, owner, segments,
parallel, intensity, force, adaptive, percentUnrepairedThreshold);

return Response.created(buildRepairScheduleUri(uriInfo, newRepairSchedule)).build();
}

return Response.created(buildRepairScheduleUri(uriInfo, newRepairSchedule)).build();
return Response
.status(Response.Status.NO_CONTENT)
.entity("Repair schedule couldn't be created as an existing repair unit seems to conflict with it.")
.build();
}

private int getDaysBetween(Optional<Integer> scheduleDaysBetween) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private void createRepairSchedule(Cluster cluster, String keyspace, DateTime nex

RepairSchedule repairSchedule = repairScheduleService.storeNewRepairSchedule(
cluster,
repairUnitService.getOrCreateRepairUnit(cluster, builder),
repairUnitService.getOrCreateRepairUnit(cluster, builder).get(),
context.config.getScheduleDaysBetween(),
nextActivationTime,
REPAIR_OWNER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ public static RepairUnitService create(AppContext context) {
return new RepairUnitService(context);
}

public RepairUnit getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder params) {
public Optional<RepairUnit> getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder params) {
return getOrCreateRepairUnit(cluster, params, false);
}

public RepairUnit getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder params, boolean force) {
public Optional<RepairUnit> getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder params, boolean force) {
if (params.incrementalRepair) {
try {
String version = ClusterFacade.create(context).getCassandraVersion(cluster);
Expand All @@ -75,7 +75,17 @@ public RepairUnit getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder para
}
}
Optional<RepairUnit> repairUnit = context.storage.getRepairUnit(params);
return repairUnit.isPresent() ? repairUnit.get() : createRepairUnit(cluster, params, force);
if (repairUnit.isPresent()) {
return repairUnit;
}

try {
return Optional.of(createRepairUnit(cluster, params, force));
} catch (IllegalArgumentException e) {
return Optional.empty();
}


}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,24 @@ public void a_new_repair_is_added_for_the_last_added_cluster_and_keyspace(String
}
}

/**
 * Cucumber step that requests a new repair run for {@code keyspace} on the test cluster,
 * sending {@code force=true} with the request.
 *
 * <p>The step asserts that the server answers with {@code 201 Created} and a non-blank body,
 * parses that body as a repair run status and stores the run id in the test context.
 *
 * @param keyspace the keyspace captured from the step text
 */
@When("^a new repair is added for the last added cluster and keyspace \"([^\"]*)\" with force option$")
public void a_new_repair_is_added_for_the_last_added_cluster_and_keyspace_force(String keyspace) throws Throwable {
  synchronized (BasicSteps.class) {
    // Target one of the registered runners, chosen through RAND.
    int runnerIndex = RAND.nextInt(RUNNERS.size());
    ReaperTestJettyRunner reaper = RUNNERS.get(runnerIndex);

    Map<String, String> requestParams = Maps.newHashMap();
    requestParams.put("clusterName", TestContext.TEST_CLUSTER);
    requestParams.put("keyspace", keyspace);
    requestParams.put("owner", TestContext.TEST_USER);
    requestParams.put("force", "true");

    Response postResponse = reaper.callReaper("POST", "/repair_run", Optional.of(requestParams));
    assertEquals(Response.Status.CREATED.getStatusCode(), postResponse.getStatus());

    String body = postResponse.readEntity(String.class);
    Assertions.assertThat(body).isNotBlank();

    // Remember the new run so later steps can refer to "the last added repair".
    RepairRunStatus createdRun = SimpleReaperClient.parseRepairRunStatusJSON(body);
    testContext.addCurrentRepairId(createdRun.getId());
  }
}

@And("^the last added repair has table \"([^\"]*)\" in the blacklist$")
public void the_last_added_repair_has_table_in_the_blacklist(String blacklistedTable) throws Throwable {
synchronized (BasicSteps.class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ Feature: Using Reaper
When reaper is upgraded to latest
Then reaper has 1 started or done repairs for the last added cluster
When the last added repair is stopped
When a new repair is added for the last added cluster and keyspace "booya" with force option
And the last added repair is activated
And we wait for at least 1 segments to be repaired
Then reaper has 2 repairs for cluster called "test"
When the last added repair is stopped
And all added repair runs are deleted for the last added cluster
And the last added cluster is deleted
Then reaper has no longer the last added cluster in storage
Expand Down
4 changes: 2 additions & 2 deletions src/ui/app/jsx/repair-form.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,8 @@ const repairForm = CreateReactClass({
</Modal.Header>
<Modal.Body>
<p>{this.state.addRepairResultMsg}</p>
<p>It is not reccommended to create overlapping repair schedules.</p>
<p>For Cassandra 4.0 and later, you can force creating this schedule by clicking the "Force" button below.</p>
<p>It is not reccommended to create overlapping repair schedules/runs.</p>
<p>For Cassandra 4.0 and later, you can force creating this schedule/run by clicking the "Force" button below.</p>
</Modal.Body>
<Modal.Footer>
<Button variant="secondary" onClick={this._onClose}>Cancel</Button>
Expand Down