Skip to content

Commit

Permalink
Add force option and warning for overlapping repair schedules (#1099)
Browse files Browse the repository at this point in the history
* Add force option and warning for overlapping repair schedules

* [fixup] PR review comments

* add tests for force option
  • Loading branch information
emerkle826 authored Jul 13, 2021
1 parent b3e30d7 commit 5a2c27d
Show file tree
Hide file tree
Showing 9 changed files with 330 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public Response addRepairRun(
@QueryParam("nodes") Optional<String> nodesToRepairParam,
@QueryParam("datacenters") Optional<String> datacentersToRepairParam,
@QueryParam("blacklistedTables") Optional<String> blacklistedTableNamesParam,
@QueryParam("repairThreadCount") Optional<Integer> repairThreadCountParam) {
@QueryParam("repairThreadCount") Optional<Integer> repairThreadCountParam,
@QueryParam("force") Optional<String> forceParam) {

try {
final Response possibleFailedResponse
Expand All @@ -124,7 +125,8 @@ public Response addRepairRun(
nodesToRepairParam,
datacentersToRepairParam,
blacklistedTableNamesParam,
repairThreadCountParam);
repairThreadCountParam,
forceParam);

if (null != possibleFailedResponse) {
return possibleFailedResponse;
Expand Down Expand Up @@ -272,7 +274,8 @@ static Response checkRequestForAddRepair(
Optional<String> nodesStr,
Optional<String> datacentersStr,
Optional<String> blacklistedTableNamesParam,
Optional<Integer> repairThreadCountStr) throws ReaperException {
Optional<Integer> repairThreadCountStr,
Optional<String> forceParam) throws ReaperException {

if (!clusterName.isPresent()) {
return createMissingArgumentResponse("clusterName");
Expand Down Expand Up @@ -349,6 +352,15 @@ static Response checkRequestForAddRepair(
.build();
}

if (forceParam.isPresent()
&& (!forceParam.get().toUpperCase().contentEquals("TRUE")
&& !forceParam.get().toUpperCase().contentEquals("FALSE"))) {

return Response.status(Response.Status.BAD_REQUEST)
.entity("invalid query parameter \"force\", expecting [True,False]")
.build();
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public Response addRepairSchedule(
@QueryParam("nodes") Optional<String> nodesToRepairParam,
@QueryParam("datacenters") Optional<String> datacentersToRepairParam,
@QueryParam("blacklistedTables") Optional<String> blacklistedTableNamesParam,
@QueryParam("repairThreadCount") Optional<Integer> repairThreadCountParam) {
@QueryParam("repairThreadCount") Optional<Integer> repairThreadCountParam,
@QueryParam("force") Optional<String> forceParam) {

try {
Response possibleFailResponse = RepairRunResource.checkRequestForAddRepair(
Expand All @@ -118,7 +119,8 @@ public Response addRepairSchedule(
nodesToRepairParam,
datacentersToRepairParam,
blacklistedTableNamesParam,
repairThreadCountParam);
repairThreadCountParam,
forceParam);

if (null != possibleFailResponse) {
return possibleFailResponse;
Expand Down Expand Up @@ -197,6 +199,9 @@ public Response addRepairSchedule(
.build();
}

// explicitly force a schedule even if the schedule conflicts
boolean force = (forceParam.isPresent() ? Boolean.parseBoolean(forceParam.get()) : false);

RepairUnit.Builder unitBuilder = RepairUnit.builder()
.clusterName(cluster.getName())
.keyspaceName(keyspace.get())
Expand All @@ -217,7 +222,8 @@ public Response addRepairSchedule(
incremental,
nextActivation,
getSegmentCount(segmentCountPerNode),
getIntensity(intensityStr));
getIntensity(intensityStr),
force);

} catch (ReaperException e) {
LOG.error(e.getMessage(), e);
Expand All @@ -235,10 +241,17 @@ private Response addRepairSchedule(
boolean incremental,
DateTime next,
int segments,
Double intensity) {
Double intensity,
boolean force) {

Optional<RepairSchedule> conflictingRepairSchedule
= repairScheduleService.conflictingRepairSchedule(cluster, unitBuilder);
= repairScheduleService.identicalRepairUnit(cluster, unitBuilder);

if (conflictingRepairSchedule.isPresent()) {
return Response.noContent().location(buildRepairScheduleUri(uriInfo, conflictingRepairSchedule.get())).build();
}

conflictingRepairSchedule = repairScheduleService.conflictingRepairSchedule(cluster, unitBuilder);

if (conflictingRepairSchedule.isPresent()) {
RepairSchedule existingSchedule = conflictingRepairSchedule.get();
Expand All @@ -250,29 +263,30 @@ private Response addRepairSchedule(
return Response.noContent().location(buildRepairScheduleUri(uriInfo, existingSchedule)).build();
}

String msg = String.format(
"A repair schedule already exists for cluster \"%s\", keyspace \"%s\", and column families: %s",
cluster.getName(),
unitBuilder.keyspaceName,
unitBuilder.columnFamilies);

return Response
.status(Response.Status.CONFLICT)
.location(buildRepairScheduleUri(uriInfo, existingSchedule))
.entity(msg)
.build();
} else {
if (!force) {
String msg = String.format(
"A repair schedule already exists for cluster \"%s\", keyspace \"%s\", and column families: %s",
cluster.getName(),
unitBuilder.keyspaceName,
unitBuilder.columnFamilies);

return Response
.status(Response.Status.CONFLICT)
.location(buildRepairScheduleUri(uriInfo, existingSchedule))
.entity(msg)
.build();
}
}

RepairUnit unit = repairUnitService.getOrCreateRepairUnit(cluster, unitBuilder);
RepairUnit unit = repairUnitService.getOrCreateRepairUnit(cluster, unitBuilder, force);

Preconditions
.checkState(unit.getIncrementalRepair() == incremental, "%s!=%s", unit.getIncrementalRepair(), incremental);
Preconditions
.checkState(unit.getIncrementalRepair() == incremental, "%s!=%s", unit.getIncrementalRepair(), incremental);

RepairSchedule newRepairSchedule = repairScheduleService
.storeNewRepairSchedule(cluster, unit, days, next, owner, segments, parallel, intensity);
RepairSchedule newRepairSchedule = repairScheduleService
.storeNewRepairSchedule(cluster, unit, days, next, owner, segments, parallel, intensity, force);

return Response.created(buildRepairScheduleUri(uriInfo, newRepairSchedule)).build();
}
return Response.created(buildRepairScheduleUri(uriInfo, newRepairSchedule)).build();
}

private int getDaysBetween(Optional<Integer> scheduleDaysBetween) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ private void createRepairSchedule(Cluster cluster, String keyspace, DateTime nex
REPAIR_OWNER,
context.config.getSegmentCountPerNode(),
context.config.getRepairParallelism(),
context.config.getRepairIntensity());
context.config.getRepairIntensity(),
false);

LOG.info("Scheduled repair created: {}", repairSchedule);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,24 @@ public Optional<RepairSchedule> conflictingRepairSchedule(Cluster cluster, Repai
return Optional.empty();
}

/**
 * Searches the schedules stored for the builder's cluster name and keyspace for one whose repair unit
 * is reported as identical to the given builder by {@code repairUnitService.identicalUnits}.
 *
 * @param cluster the cluster handed to the repair unit comparison
 * @param repairUnit the builder describing the repair unit being requested
 * @return the first schedule (in storage iteration order) with an identical repair unit, otherwise empty
 * @throws IllegalStateException if a stored unit's cluster name or keyspace name differs from the builder's
 */
public Optional<RepairSchedule> identicalRepairUnit(Cluster cluster, RepairUnit.Builder repairUnit) {
  Collection<RepairSchedule> candidates = context.storage
      .getRepairSchedulesForClusterAndKeyspace(repairUnit.clusterName, repairUnit.keyspaceName);

  // sequential and short-circuiting: candidates after the first match are never inspected
  return candidates.stream()
      .filter(candidate -> hasIdenticalUnit(cluster, candidate, repairUnit))
      .findFirst();
}

/** Tells whether the repair unit behind {@code candidate} is identical to the requested builder. */
private boolean hasIdenticalUnit(Cluster cluster, RepairSchedule candidate, RepairUnit.Builder requested) {
  RepairUnit storedUnit = context.storage.getRepairUnit(candidate.getRepairUnitId());

  // the schedules were looked up by cluster and keyspace, so the stored unit must agree on both
  Preconditions.checkState(storedUnit.getClusterName().equals(requested.clusterName));
  Preconditions.checkState(storedUnit.getKeyspaceName().equals(requested.keyspaceName));

  return repairUnitService.identicalUnits(cluster, storedUnit, requested);
}

/**
* Instantiates a RepairSchedule and stores it in the storage backend.
*
Expand All @@ -77,10 +95,11 @@ public RepairSchedule storeNewRepairSchedule(
String owner,
int segmentCountPerNode,
RepairParallelism repairParallelism,
Double intensity) {
Double intensity,
boolean force) {

Preconditions.checkArgument(
!conflictingRepairSchedule(cluster, repairUnit.with()).isPresent(),
force || !conflictingRepairSchedule(cluster, repairUnit.with()).isPresent(),
"A repair schedule already exists for cluster \"%s\", keyspace \"%s\", and column families: %s",
cluster.getName(),
repairUnit.getKeyspaceName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import io.cassandrareaper.AppContext;
import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.core.Node;
import io.cassandrareaper.core.RepairSchedule;
import io.cassandrareaper.core.RepairUnit;
import io.cassandrareaper.core.Table;
import io.cassandrareaper.jmx.ClusterFacade;

import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -58,6 +60,10 @@ public static RepairUnitService create(AppContext context) {
}

/**
 * Convenience overload that delegates to
 * {@link #getOrCreateRepairUnit(Cluster, RepairUnit.Builder, boolean)} with {@code force} set to
 * {@code false}.
 *
 * @param cluster the cluster passed through to the three-argument overload
 * @param params the builder passed through to the three-argument overload
 * @return whatever the three-argument overload returns
 */
public RepairUnit getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder params) {
return getOrCreateRepairUnit(cluster, params, false);
}

public RepairUnit getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder params, boolean force) {
if (params.incrementalRepair) {
try {
String version = ClusterFacade.create(context).getCassandraVersion(cluster);
Expand All @@ -69,7 +75,7 @@ public RepairUnit getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder para
}
}
Optional<RepairUnit> repairUnit = context.storage.getRepairUnit(params);
return repairUnit.isPresent() ? repairUnit.get() : createRepairUnit(cluster, params);
return repairUnit.isPresent() ? repairUnit.get() : createRepairUnit(cluster, params, force);
}

/**
Expand Down Expand Up @@ -130,9 +136,9 @@ private static boolean isBlackListedCompactionStrategy(Table table) {
.anyMatch(s -> table.getCompactionStrategy().toLowerCase().contains(s.toLowerCase()));
}

private RepairUnit createRepairUnit(Cluster cluster, RepairUnit.Builder builder) {
private RepairUnit createRepairUnit(Cluster cluster, RepairUnit.Builder builder, boolean force) {
Preconditions.checkArgument(
!unitConflicts(cluster, builder),
force || !unitConflicts(cluster, builder),
"unit conflicts with existing in " + builder.clusterName + ":" + builder.keyspaceName);

return context.storage.addRepairUnit(builder);
Expand Down Expand Up @@ -170,6 +176,51 @@ boolean conflictingUnits(Cluster cluster, RepairUnit unit, RepairUnit.Builder bu
return !Sets.intersection(listRepairTables(unit.with(), tables), listRepairTables(builder, tables)).isEmpty();
}

/**
 * Decides whether an existing repair unit and a requested builder describe the same repair.
 *
 * <p>The two are identical when their builders are equal, or when all of the following match: the
 * incremental repair flag, the resolved set of tables to repair, the resolved set of nodes and the
 * resolved set of datacenters.
 *
 * @param cluster the cluster used to resolve tables, nodes and datacenters
 * @param unit the existing repair unit
 * @param builder the requested repair unit
 * @return {@code true} if both describe the same repair, {@code false} otherwise
 * @throws IllegalStateException if the keyspace names differ (checked only once the builders are
 *     known to be unequal and the incremental flags match)
 */
boolean identicalUnits(Cluster cluster, RepairUnit unit, RepairUnit.Builder builder) {
  // shortcut: equal builders mean there is nothing left to compare
  if (unit.with().equals(builder)) {
    return true;
  }

  // an incremental repair and a full repair are never the same unit
  if (unit.getIncrementalRepair() != builder.incrementalRepair.booleanValue()) {
    return false;
  }

  Preconditions.checkState(unit.getKeyspaceName().equals(builder.keyspaceName));

  // the keyspace's table names are only fetched when at least one side lists no column families
  boolean tableLookupNeeded = unit.getColumnFamilies().isEmpty() || builder.columnFamilies.isEmpty();
  Set<String> tables = tableLookupNeeded
      ? getTableNamesForKeyspace(cluster, unit.getKeyspaceName())
      : Collections.emptySet();

  Set<String> unitTables = listRepairTables(unit.with(), tables);
  Set<String> builderTables = listRepairTables(builder, tables);
  if (!Objects.equals(unitTables, builderTables)) {
    return false;
  }

  Set<String> unitNodes = getRepairUnitNodes(cluster, unit.with());
  Set<String> builderNodes = getRepairUnitNodes(cluster, builder);
  if (!Objects.equals(unitNodes, builderNodes)) {
    return false;
  }

  // datacenters are the last criterion, so their comparison is the final answer
  Set<String> unitDatacenters = getRepairUnitDatacenters(cluster, unit.with(), unitNodes);
  Set<String> builderDatacenters = getRepairUnitDatacenters(cluster, builder, builderNodes);
  return Objects.equals(unitDatacenters, builderDatacenters);
}

public Set<String> getTableNamesForKeyspace(Cluster cluster, String keyspace) {
try {
return ClusterFacade
Expand All @@ -184,6 +235,38 @@ public Set<String> getTableNamesForKeyspace(Cluster cluster, String keyspace) {
}
}

/**
 * Resolves the set of nodes a repair unit builder applies to.
 *
 * <p>When the builder lists nodes, that set is returned as is. Otherwise the live nodes reported by
 * {@link ClusterFacade} for the cluster are used.
 *
 * @param cluster the cluster whose live nodes are looked up when the builder lists none
 * @param builder the repair unit builder to inspect
 * @return the builder's nodes, the cluster's live nodes, or an empty set if the lookup failed
 */
private Set<String> getRepairUnitNodes(Cluster cluster, RepairUnit.Builder builder) {
  if (!builder.nodes.isEmpty()) {
    return builder.nodes;
  }
  try {
    return ClusterFacade
        .create(context)
        .getLiveNodes(cluster)
        .stream()
        .collect(Collectors.toSet());
  } catch (ReaperException e) {
    // best effort: fall back to an empty set, but keep the cause in the log (exception as last argument)
    LOG.warn("Unable to get list of live nodes for cluster {}", cluster.getName(), e);
    return Collections.emptySet();
  }
}

/**
 * Resolves the set of datacenters a repair unit builder applies to.
 *
 * <p>When the builder lists datacenters, that set is returned as is. Otherwise the datacenter of each
 * given node is looked up through {@link ClusterFacade}.
 *
 * @param cluster the cluster, used here only for the log message
 * @param builder the repair unit builder to inspect
 * @param nodes the hostnames whose datacenters are looked up when the builder lists none
 * @return the builder's datacenters, or those resolved from {@code nodes}; if a lookup fails, the
 *     datacenters collected up to that point are returned
 */
private Set<String> getRepairUnitDatacenters(Cluster cluster, RepairUnit.Builder builder, Set<String> nodes) {
  if (!builder.datacenters.isEmpty()) {
    return builder.datacenters;
  }
  Set<String> datacenters = Sets.newHashSet();
  try {
    ClusterFacade facade = ClusterFacade.create(context);
    for (String node : nodes) {
      datacenters.add(facade.getDatacenter(Node.builder().withHostname(node).build()));
    }
  } catch (InterruptedException e) {
    // restore the interrupt flag so callers further up the stack can still observe the interruption
    Thread.currentThread().interrupt();
    LOG.warn("Unable to get the list of datacenters for cluster {}", cluster.getName(), e);
  } catch (ReaperException e) {
    LOG.warn("Unable to get the list of datacenters for cluster {}", cluster.getName(), e);
  }
  return datacenters;
}

private static Set<String> listRepairTables(RepairUnit.Builder builder, Set<String> allTables) {
// subtract blacklisted tables from all tables (or those explicitly listed)
Set<String> tables = Sets.newHashSet(builder.columnFamilies.isEmpty() ? allTables : builder.columnFamilies);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,37 @@ public void a_new_daily_repair_schedule_is_added_that_already_exists_for(
}
}

/**
 * Step: posts a daily repair schedule with {@code force=true} to every runner and expects each
 * response status to be either 204 (No Content) or 201 (Created).
 *
 * @param repairType {@code "incremental"} selects an incremental, parallel repair; any other value
 *     selects a non-incremental, sequential one
 * @param clusterName value sent as the {@code clusterName} parameter
 * @param keyspace value sent as the {@code keyspace} parameter
 */
@When("^a new daily \"([^\"]*)\" repair schedule is added "
    + "that already exists for \"([^\"]*)\" and keyspace \"([^\"]*)\" with force option$")
public void a_new_daily_repair_schedule_is_added_that_already_exists_with_force_option_for(
    String repairType,
    String clusterName,
    String keyspace) throws Throwable {

  synchronized (BasicSteps.class) {
    RUNNERS.parallelStream().forEach(runner -> {
      boolean incremental = repairType.equals("incremental");

      Map<String, String> query = Maps.newHashMap();
      query.put("clusterName", clusterName);
      query.put("keyspace", keyspace);
      query.put("owner", TestContext.TEST_USER);
      query.put("intensity", "0.9");
      query.put("scheduleDaysBetween", "1");
      query.put("repairParallelism", incremental ? "parallel" : "sequential");
      query.put("incrementalRepair", incremental ? "True" : "False");
      query.put("force", "true");

      Response reply = runner.callReaper("POST", "/repair_schedule", Optional.of(query));
      int statusCode = reply.getStatus();
      // the response body doubles as the failure message
      String body = reply.readEntity(String.class);

      ImmutableList<Integer> acceptedCodes = ImmutableList.of(
          Response.Status.NO_CONTENT.getStatusCode(),
          Response.Status.CREATED.getStatusCode());

      Assertions.assertThat(acceptedCodes)
          .withFailMessage(body)
          .contains(statusCode);
    });
  }
}

@And("^the last added repair is activated$")
public void the_last_added_repair_is_activated_for() throws Throwable {
synchronized (BasicSteps.class) {
Expand Down
Loading

0 comments on commit 5a2c27d

Please sign in to comment.