Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add force option and warning for overlapping repair schedules #1099

Merged
merged 3 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public Response addRepairRun(
@QueryParam("nodes") Optional<String> nodesToRepairParam,
@QueryParam("datacenters") Optional<String> datacentersToRepairParam,
@QueryParam("blacklistedTables") Optional<String> blacklistedTableNamesParam,
@QueryParam("repairThreadCount") Optional<Integer> repairThreadCountParam) {
@QueryParam("repairThreadCount") Optional<Integer> repairThreadCountParam,
@QueryParam("force") Optional<String> forceParam) {

try {
final Response possibleFailedResponse
Expand All @@ -124,7 +125,8 @@ public Response addRepairRun(
nodesToRepairParam,
datacentersToRepairParam,
blacklistedTableNamesParam,
repairThreadCountParam);
repairThreadCountParam,
forceParam);

if (null != possibleFailedResponse) {
return possibleFailedResponse;
Expand Down Expand Up @@ -272,7 +274,8 @@ static Response checkRequestForAddRepair(
Optional<String> nodesStr,
Optional<String> datacentersStr,
Optional<String> blacklistedTableNamesParam,
Optional<Integer> repairThreadCountStr) throws ReaperException {
Optional<Integer> repairThreadCountStr,
Optional<String> forceParam) throws ReaperException {

if (!clusterName.isPresent()) {
return createMissingArgumentResponse("clusterName");
Expand Down Expand Up @@ -349,6 +352,15 @@ static Response checkRequestForAddRepair(
.build();
}

if (forceParam.isPresent()
&& (!forceParam.get().toUpperCase().contentEquals("TRUE")
&& !forceParam.get().toUpperCase().contentEquals("FALSE"))) {

return Response.status(Response.Status.BAD_REQUEST)
.entity("invalid query parameter \"force\", expecting [True,False]")
.build();
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public Response addRepairSchedule(
@QueryParam("nodes") Optional<String> nodesToRepairParam,
@QueryParam("datacenters") Optional<String> datacentersToRepairParam,
@QueryParam("blacklistedTables") Optional<String> blacklistedTableNamesParam,
@QueryParam("repairThreadCount") Optional<Integer> repairThreadCountParam) {
@QueryParam("repairThreadCount") Optional<Integer> repairThreadCountParam,
@QueryParam("force") Optional<String> forceParam) {

try {
Response possibleFailResponse = RepairRunResource.checkRequestForAddRepair(
Expand All @@ -118,7 +119,8 @@ public Response addRepairSchedule(
nodesToRepairParam,
datacentersToRepairParam,
blacklistedTableNamesParam,
repairThreadCountParam);
repairThreadCountParam,
forceParam);

if (null != possibleFailResponse) {
return possibleFailResponse;
Expand Down Expand Up @@ -197,6 +199,9 @@ public Response addRepairSchedule(
.build();
}

// explicitly force a schedule even if the schedule conflicts
boolean force = (forceParam.isPresent() ? Boolean.parseBoolean(forceParam.get()) : false);

RepairUnit.Builder unitBuilder = RepairUnit.builder()
.clusterName(cluster.getName())
.keyspaceName(keyspace.get())
Expand All @@ -217,7 +222,8 @@ public Response addRepairSchedule(
incremental,
nextActivation,
getSegmentCount(segmentCountPerNode),
getIntensity(intensityStr));
getIntensity(intensityStr),
force);

} catch (ReaperException e) {
LOG.error(e.getMessage(), e);
Expand All @@ -235,10 +241,17 @@ private Response addRepairSchedule(
boolean incremental,
DateTime next,
int segments,
Double intensity) {
Double intensity,
boolean force) {

Optional<RepairSchedule> conflictingRepairSchedule
= repairScheduleService.conflictingRepairSchedule(cluster, unitBuilder);
= repairScheduleService.identicalRepairUnit(cluster, unitBuilder);

if (conflictingRepairSchedule.isPresent()) {
return Response.noContent().location(buildRepairScheduleUri(uriInfo, conflictingRepairSchedule.get())).build();
}

conflictingRepairSchedule = repairScheduleService.conflictingRepairSchedule(cluster, unitBuilder);

if (conflictingRepairSchedule.isPresent()) {
RepairSchedule existingSchedule = conflictingRepairSchedule.get();
Expand All @@ -250,29 +263,30 @@ private Response addRepairSchedule(
return Response.noContent().location(buildRepairScheduleUri(uriInfo, existingSchedule)).build();
}

String msg = String.format(
"A repair schedule already exists for cluster \"%s\", keyspace \"%s\", and column families: %s",
cluster.getName(),
unitBuilder.keyspaceName,
unitBuilder.columnFamilies);

return Response
.status(Response.Status.CONFLICT)
.location(buildRepairScheduleUri(uriInfo, existingSchedule))
.entity(msg)
.build();
} else {
if (!force) {
String msg = String.format(
"A repair schedule already exists for cluster \"%s\", keyspace \"%s\", and column families: %s",
cluster.getName(),
unitBuilder.keyspaceName,
unitBuilder.columnFamilies);

return Response
.status(Response.Status.CONFLICT)
.location(buildRepairScheduleUri(uriInfo, existingSchedule))
.entity(msg)
.build();
}
}

RepairUnit unit = repairUnitService.getOrCreateRepairUnit(cluster, unitBuilder);
RepairUnit unit = repairUnitService.getOrCreateRepairUnit(cluster, unitBuilder, force);

Preconditions
.checkState(unit.getIncrementalRepair() == incremental, "%s!=%s", unit.getIncrementalRepair(), incremental);
Preconditions
.checkState(unit.getIncrementalRepair() == incremental, "%s!=%s", unit.getIncrementalRepair(), incremental);

RepairSchedule newRepairSchedule = repairScheduleService
.storeNewRepairSchedule(cluster, unit, days, next, owner, segments, parallel, intensity);
RepairSchedule newRepairSchedule = repairScheduleService
.storeNewRepairSchedule(cluster, unit, days, next, owner, segments, parallel, intensity, force);

return Response.created(buildRepairScheduleUri(uriInfo, newRepairSchedule)).build();
}
return Response.created(buildRepairScheduleUri(uriInfo, newRepairSchedule)).build();
}

private int getDaysBetween(Optional<Integer> scheduleDaysBetween) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ private void createRepairSchedule(Cluster cluster, String keyspace, DateTime nex
REPAIR_OWNER,
context.config.getSegmentCountPerNode(),
context.config.getRepairParallelism(),
context.config.getRepairIntensity());
context.config.getRepairIntensity(),
false);

LOG.info("Scheduled repair created: {}", repairSchedule);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,24 @@ public Optional<RepairSchedule> conflictingRepairSchedule(Cluster cluster, Repai
return Optional.empty();
}

public Optional<RepairSchedule> identicalRepairUnit(Cluster cluster, RepairUnit.Builder repairUnit) {

Collection<RepairSchedule> repairSchedules = context.storage
.getRepairSchedulesForClusterAndKeyspace(repairUnit.clusterName, repairUnit.keyspaceName);

for (RepairSchedule sched : repairSchedules) {
RepairUnit repairUnitForSched = context.storage.getRepairUnit(sched.getRepairUnitId());
Preconditions.checkState(repairUnitForSched.getClusterName().equals(repairUnit.clusterName));
Preconditions.checkState(repairUnitForSched.getKeyspaceName().equals(repairUnit.keyspaceName));

// if the schedule is identical, return immediately
if (repairUnitService.identicalUnits(cluster, repairUnitForSched, repairUnit)) {
return Optional.of(sched);
}
}
return Optional.empty();
}

/**
* Instantiates a RepairSchedule and stores it in the storage backend.
*
Expand All @@ -77,10 +95,11 @@ public RepairSchedule storeNewRepairSchedule(
String owner,
int segmentCountPerNode,
RepairParallelism repairParallelism,
Double intensity) {
Double intensity,
boolean force) {

Preconditions.checkArgument(
!conflictingRepairSchedule(cluster, repairUnit.with()).isPresent(),
force || !conflictingRepairSchedule(cluster, repairUnit.with()).isPresent(),
"A repair schedule already exists for cluster \"%s\", keyspace \"%s\", and column families: %s",
cluster.getName(),
repairUnit.getKeyspaceName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import io.cassandrareaper.AppContext;
import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.core.Node;
import io.cassandrareaper.core.RepairSchedule;
import io.cassandrareaper.core.RepairUnit;
import io.cassandrareaper.core.Table;
import io.cassandrareaper.jmx.ClusterFacade;

import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -58,6 +60,10 @@ public static RepairUnitService create(AppContext context) {
}

public RepairUnit getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder params) {
return getOrCreateRepairUnit(cluster, params, false);
}

public RepairUnit getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder params, boolean force) {
if (params.incrementalRepair) {
try {
String version = ClusterFacade.create(context).getCassandraVersion(cluster);
Expand All @@ -69,7 +75,7 @@ public RepairUnit getOrCreateRepairUnit(Cluster cluster, RepairUnit.Builder para
}
}
Optional<RepairUnit> repairUnit = context.storage.getRepairUnit(params);
return repairUnit.isPresent() ? repairUnit.get() : createRepairUnit(cluster, params);
return repairUnit.isPresent() ? repairUnit.get() : createRepairUnit(cluster, params, force);
}

/**
Expand Down Expand Up @@ -130,9 +136,9 @@ private static boolean isBlackListedCompactionStrategy(Table table) {
.anyMatch(s -> table.getCompactionStrategy().toLowerCase().contains(s.toLowerCase()));
}

private RepairUnit createRepairUnit(Cluster cluster, RepairUnit.Builder builder) {
private RepairUnit createRepairUnit(Cluster cluster, RepairUnit.Builder builder, boolean force) {
Preconditions.checkArgument(
!unitConflicts(cluster, builder),
force || !unitConflicts(cluster, builder),
"unit conflicts with existing in " + builder.clusterName + ":" + builder.keyspaceName);

return context.storage.addRepairUnit(builder);
Expand Down Expand Up @@ -170,6 +176,51 @@ boolean conflictingUnits(Cluster cluster, RepairUnit unit, RepairUnit.Builder bu
return !Sets.intersection(listRepairTables(unit.with(), tables), listRepairTables(builder, tables)).isEmpty();
}

boolean identicalUnits(Cluster cluster, RepairUnit unit, RepairUnit.Builder builder) {
// if the Builders are equal, everything is the same
if (unit.with().equals(builder)) {
return true;
}

// if incremental repair is not the same, the units are not identical
if (unit.getIncrementalRepair() != builder.incrementalRepair.booleanValue()) {
// incremental reapir is not the same
return false;
}

// check the set of tables to be repaired
Preconditions.checkState(unit.getKeyspaceName().equals(builder.keyspaceName));

Set<String> tables = unit.getColumnFamilies().isEmpty() || builder.columnFamilies.isEmpty()
? getTableNamesForKeyspace(cluster, unit.getKeyspaceName())
: Collections.emptySet();

// if the set of tables to repair is not the same, the units are not identical
if (!Objects.equals(listRepairTables(unit.with(), tables), listRepairTables(builder, tables))) {
// repair tables not the same
return false;
}

// if the set of nodes isn't the same, the units are not identical
Set<String> unitNodes = getRepairUnitNodes(cluster, unit.with());
Set<String> builderNodes = getRepairUnitNodes(cluster, builder);
if (!Objects.equals(unitNodes, builderNodes)) {
// repair unit nodes not the same
return false;
}

// if the set of datacenetrrs isn't the same, the units are not identical
Set<String> unitDatacenters = getRepairUnitDatacenters(cluster, unit.with(), unitNodes);
Set<String> builderDatacenters = getRepairUnitDatacenters(cluster, builder, builderNodes);
if (!Objects.equals(unitDatacenters, builderDatacenters)) {
// repair datacenters not the same
return false;
}

// units are effectively identical
return true;
}

public Set<String> getTableNamesForKeyspace(Cluster cluster, String keyspace) {
try {
return ClusterFacade
Expand All @@ -184,6 +235,38 @@ public Set<String> getTableNamesForKeyspace(Cluster cluster, String keyspace) {
}
}

private Set<String> getRepairUnitNodes(Cluster cluster, RepairUnit.Builder builder) {
if (!builder.nodes.isEmpty()) {
return builder.nodes;
}
try {
return ClusterFacade
.create(context)
.getLiveNodes(cluster)
.stream()
.collect(Collectors.toSet());
} catch (ReaperException e) {
LOG.warn("Unable to get list of live nodes for cluster {}", cluster.getName());
return Collections.emptySet();
}
}

private Set<String> getRepairUnitDatacenters(Cluster cluster, RepairUnit.Builder builder, Set<String> nodes) {
if (!builder.datacenters.isEmpty()) {
return builder.datacenters;
}
Set<String> datacenters = Sets.newHashSet();
try {
ClusterFacade facade = ClusterFacade.create(context);
for (String node : nodes) {
datacenters.add(facade.getDatacenter(Node.builder().withHostname(node).build()));
}
} catch (ReaperException | InterruptedException e) {
LOG.warn("Unable to get the list of datacenters for cluster {}", cluster.getName(), e);
}
return datacenters;
}

private static Set<String> listRepairTables(RepairUnit.Builder builder, Set<String> allTables) {
// subtract blacklisted tables from all tables (or those explicitly listed)
Set<String> tables = Sets.newHashSet(builder.columnFamilies.isEmpty() ? allTables : builder.columnFamilies);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,37 @@ public void a_new_daily_repair_schedule_is_added_that_already_exists_for(
}
}

@When("^a new daily \"([^\"]*)\" repair schedule is added "
+ "that already exists for \"([^\"]*)\" and keyspace \"([^\"]*)\" with force option$")
public void a_new_daily_repair_schedule_is_added_that_already_exists_with_force_option_for(
String repairType,
String clusterName,
String keyspace) throws Throwable {

synchronized (BasicSteps.class) {
RUNNERS.parallelStream().forEach(runner -> {
Map<String, String> params = Maps.newHashMap();
params.put("clusterName", clusterName);
params.put("keyspace", keyspace);
params.put("owner", TestContext.TEST_USER);
params.put("intensity", "0.9");
params.put("scheduleDaysBetween", "1");
params.put("repairParallelism", repairType.equals("incremental") ? "parallel" : "sequential");
params.put("incrementalRepair", repairType.equals("incremental") ? "True" : "False");
params.put("force", "true");
Response response = runner.callReaper("POST", "/repair_schedule", Optional.of(params));

int status = response.getStatus();
String responseEntity = response.readEntity(String.class);

Assertions.assertThat(
ImmutableList.of(Response.Status.NO_CONTENT.getStatusCode(), Response.Status.CREATED.getStatusCode()))
.withFailMessage(responseEntity)
.contains(status);
});
}
}

@And("^the last added repair is activated$")
public void the_last_added_repair_is_activated_for() throws Throwable {
synchronized (BasicSteps.class) {
Expand Down
Loading