Make the SchedulingManager more BASE design friendly
Running the SchedulingManager concurrently (in different processes) leads to parallel repair runs being spawned at the same time.

This is fixed by
 - making reads and writes to the repair_schedule table strictly consistent (quorum),
 - updating (writing to storage) the repair_schedule incrementally in the SchedulingManager.manageSchedule(..) codepath, and
 - performing multiple checks (reads from storage) along the same codepath (see the sketch after the file summary below).

 ref: #124
michaelsembwever committed Aug 23, 2017
1 parent c791f64 commit 1cc8520
Showing 5 changed files with 173 additions and 96 deletions.
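The bullet points above describe a claim-then-verify pattern. Below is a minimal, self-contained sketch of that pattern (toy code for illustration only: the class, the ConcurrentMap standing in for the quorum-consistent repair_schedule table, and all method names are invented, not Reaper APIs; the real logic is in SchedulingManager.manageSchedule further down).

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Toy model of the claim-then-verify pattern. A ConcurrentMap stands in for the
// quorum-consistent repair_schedule table; in Reaper the same steps are quorum
// reads and writes against Cassandra.
final class ScheduleClaimSketch {

  static final class Schedule {
    final UUID id;
    final long nextActivation; // epoch millis
    final UUID lastRunId;      // stands in for the run history

    Schedule(UUID id, long nextActivation, UUID lastRunId) {
      this.id = id;
      this.nextActivation = nextActivation;
      this.lastRunId = lastRunId;
    }
  }

  private final ConcurrentMap<UUID, Schedule> store = new ConcurrentHashMap<>();

  /** Returns true only for the process that gets to start the new repair run. */
  boolean manage(UUID scheduleId, long now, long periodMillis) {
    Schedule schedule = store.get(scheduleId);                       // read
    if (schedule == null || schedule.nextActivation > now) {
      return false;                                                  // not due yet
    }
    // 1. Claim the activation: persist the advanced next_activation before doing anything else.
    Schedule claimed =
        new Schedule(schedule.id, schedule.nextActivation + periodMillis, schedule.lastRunId);
    store.put(scheduleId, claimed);                                  // incremental write

    // 2. Re-read and verify no other scheduler got in between before attaching the new run.
    Schedule latest = store.get(scheduleId);                         // second read
    if (latest.nextActivation != claimed.nextActivation
        || !Objects.equals(latest.lastRunId, claimed.lastRunId)) {
      return false;                                                  // another scheduler won the race
    }
    // Note: as in the real code (see the FIXME comments in the diff), this
    // check-then-write is not atomic; it only narrows the race window.
    store.put(scheduleId, new Schedule(claimed.id, claimed.nextActivation, UUID.randomUUID()));
    return true;
  }
}
```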
4 changes: 3 additions & 1 deletion src/main/java/com/spotify/reaper/AppContext.java
@@ -3,13 +3,15 @@
import com.spotify.reaper.cassandra.JmxConnectionFactory;
import com.spotify.reaper.service.RepairManager;
import com.spotify.reaper.storage.IStorage;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Single class to hold all application global interfacing objects,
* and app global options.
*/
public class AppContext {
public final class AppContext {

public final AtomicBoolean isRunning = new AtomicBoolean(true);
public IStorage storage;
public RepairManager repairManager;
public JmxConnectionFactory jmxConnectionFactory;
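The new isRunning flag gives background tasks a way to stand down cleanly: the reworked SchedulingManager.run() below checks it before doing any work (and again before calling System.exit), and the test runner clears it during teardown. A minimal sketch of that idiom, with invented names, just to show the shape:

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the shutdown-guard idiom (illustrative, not Reaper code): a periodic
// task checks a shared AtomicBoolean before each tick, so teardown code can flip
// the flag and let any in-flight tick finish quietly instead of failing hard.
final class GuardedTask extends TimerTask {

  private final AtomicBoolean isRunning;

  GuardedTask(AtomicBoolean isRunning) {
    this.isRunning = isRunning;
  }

  @Override
  public void run() {
    if (!isRunning.get()) {
      return;                // application is shutting down: skip this tick
    }
    // ... periodic work would go here ...
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicBoolean isRunning = new AtomicBoolean(true);
    Timer timer = new Timer("scheduler", true);
    timer.schedule(new GuardedTask(isRunning), 0, 1000);
    Thread.sleep(3000);
    isRunning.set(false);    // tell the task to stand down before cancelling the timer
    timer.cancel();
  }
}
```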
243 changes: 154 additions & 89 deletions src/main/java/com/spotify/reaper/service/SchedulingManager.java
@@ -1,11 +1,11 @@
package com.spotify.reaper.service;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import com.spotify.reaper.AppContext;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.core.Cluster;
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSchedule;
import com.spotify.reaper.core.RepairUnit;
@@ -20,7 +20,7 @@
import java.util.TimerTask;
import java.util.UUID;

public class SchedulingManager extends TimerTask {
public final class SchedulingManager extends TimerTask {

private static final Logger LOG = LoggerFactory.getLogger(SchedulingManager.class);

@@ -59,7 +59,7 @@ public static RepairSchedule resumeRepairSchedule(AppContext context, RepairSche
return updatedSchedule;
}

private AppContext context;
private final AppContext context;

/* nextActivatedSchedule used for nicer logging only */
private RepairSchedule nextActivatedSchedule;
@@ -73,28 +73,32 @@ private SchedulingManager(AppContext context) {
*/
@Override
public void run() {
LOG.debug("Checking for repair schedules...");
UUID lastId = null;
try {
Collection<RepairSchedule> schedules = context.storage.getAllRepairSchedules();
boolean anyRunStarted = false;
for (RepairSchedule schedule : schedules) {
lastId = schedule.getId();
anyRunStarted = manageSchedule(schedule) || anyRunStarted;
}
if (!anyRunStarted && nextActivatedSchedule != null) {
LOG.debug("not scheduling new repairs yet, next activation is '{}' for schedule id '{}'",
nextActivatedSchedule.getNextActivation(), nextActivatedSchedule.getId());
}
} catch (Exception ex) {
LOG.error("failed managing schedule for run with id: {}", lastId);
LOG.error("catch exception", ex);
try {
assert false : "if assertions are enabled then exit the jvm";
} catch (AssertionError ae) {
LOG.error("SchedulingManager failed. Exiting JVM.");
System.exit(1);
}
if (context.isRunning.get()) {
LOG.debug("Checking for repair schedules...");
UUID lastId = null;
try {
Collection<RepairSchedule> schedules = context.storage.getAllRepairSchedules();
boolean anyRunStarted = false;
for (RepairSchedule schedule : schedules) {
lastId = schedule.getId();
anyRunStarted = manageSchedule(schedule) || anyRunStarted;
}
if (!anyRunStarted && nextActivatedSchedule != null) {
LOG.debug("not scheduling new repairs yet, next activation is '{}' for schedule id '{}'",
nextActivatedSchedule.getNextActivation(), nextActivatedSchedule.getId());
}
} catch (Exception ex) {
LOG.error("failed managing schedule for run with id: {}", lastId);
LOG.error("catch exception", ex);
try {
assert false : "if assertions are enabled then exit the jvm";
} catch (AssertionError ae) {
if (context.isRunning.get()) {
LOG.error("SchedulingManager failed. Exiting JVM.");
System.exit(1);
}
}
}
}
}

@@ -103,79 +107,140 @@ public void run() {
* @param schedule The schedule to be checked for activation.
* @return boolean indicating whether a new RepairRun instance was created and started.
*/
private boolean manageSchedule(RepairSchedule schedule) {
if (schedule.getNextActivation().isBeforeNow()) {
boolean startNewRun = true;
LOG.info("repair unit '{}' should be repaired based on RepairSchedule with id '{}'",
schedule.getRepairUnitId(), schedule.getId());

RepairUnit repairUnit = null;
if (schedule.getState() == RepairSchedule.State.PAUSED) {
LOG.info("Repair schedule '{}' is paused", schedule.getId());
startNewRun = false;
} else {
Optional<RepairUnit> fetchedUnit =
context.storage.getRepairUnit(schedule.getRepairUnitId());
if (!fetchedUnit.isPresent()) {
LOG.warn("RepairUnit with id {} not found", schedule.getRepairUnitId());
return false;
}
repairUnit = fetchedUnit.get();
Collection<RepairRun> repairRuns =
context.storage.getRepairRunsForUnit(schedule.getRepairUnitId());
for (RepairRun repairRun : repairRuns) {
RepairRun.RunState state = repairRun.getRunState();
if (state.isActive()) {
LOG.info("there is repair (id {}) in state '{}' for repair unit '{}', "
+ "postponing current schedule trigger until next scheduling",
repairRun.getId(), repairRun.getRunState(), repairUnit.getId());
startNewRun = false;
}
private boolean manageSchedule(RepairSchedule schedule_) {
switch (schedule_.getState()) {
case ACTIVE:
if (schedule_.getNextActivation().isBeforeNow()) {

RepairSchedule schedule = schedule_
.with().nextActivation(schedule_.getFollowingActivation()).build(schedule_.getId());

context.storage.updateRepairSchedule(schedule);

LOG.info(
"repair unit '{}' should be repaired based on RepairSchedule with id '{}'",
schedule.getRepairUnitId(), schedule.getId());

Optional<RepairUnit> fetchedUnit = context.storage.getRepairUnit(schedule.getRepairUnitId());
if (!fetchedUnit.isPresent()) {
LOG.warn("RepairUnit with id {} not found", schedule.getRepairUnitId());
return false;
}
RepairUnit repairUnit = fetchedUnit.get();
if (repairRunAlreadyScheduled(schedule, repairUnit)) {
return false;
}

try {
RepairRun newRepairRun = createNewRunForUnit(schedule, repairUnit);

ImmutableList<UUID> newRunHistory = new ImmutableList.Builder<UUID>()
.addAll(schedule.getRunHistory()).add(newRepairRun.getId()).build();

RepairSchedule latestSchedule = context.storage.getRepairSchedule(schedule.getId()).get();

if (equal(schedule, latestSchedule)) {

boolean result = context.storage.updateRepairSchedule(
schedule.with()
.runHistory(newRunHistory)
.build(schedule.getId()));
// FIXME – concurrency is broken unless we atomically add/remove run history items
//boolean result = context.storage
// .addRepairRunToRepairSchedule(schedule.getId(), newRepairRun.getId());

if (result) {
context.repairManager.startRepairRun(context, newRepairRun);
return true;
}
} else if (schedule.getRunHistory().size() < latestSchedule.getRunHistory().size()) {
UUID newRepairRunId = latestSchedule.getRunHistory().get(latestSchedule.getRunHistory().size()-1);
LOG.info("schedule {} has already added a new repair run {}", schedule.getId(), newRepairRunId);
// this repair_run is identified as a duplicate (for this activation):
// so take the last repair run, and try start it. it's ok if already running.
newRepairRun = context.storage.getRepairRun(newRepairRunId).get();
context.repairManager.startRepairRun(context, newRepairRun);
} else {
LOG.warn("schedule {} has been altered by someone else. not running repair", schedule.getId());
}
// this duplicated repair_run needs to be removed from the schedule's history
// FIXME – concurrency is broken unless we atomically add/remove run history items
//boolean result = context.storage
// .deleteRepairRunFromRepairSchedule(schedule.getId(), newRepairRun.getId());
} catch (ReaperException e) {
LOG.error(e.getMessage(), e);
}
} else {
if (nextActivatedSchedule == null
|| nextActivatedSchedule.getNextActivation().isAfter(schedule_.getNextActivation())) {

nextActivatedSchedule = schedule_;
}
}
break;
case PAUSED:
LOG.info("Repair schedule '{}' is paused", schedule_.getId());
return false;
default:
throw new AssertionError("illegal schedule state in call to manageSchedule(..): " + schedule_.getState());
}
return false;
}

private static boolean equal(RepairSchedule s1, RepairSchedule s2) {
Preconditions.checkArgument(s1.getId().equals(s2.getId()));
Preconditions.checkArgument(s1.getOwner().equals(s2.getOwner()));
Preconditions.checkArgument(s1.getDaysBetween() == s2.getDaysBetween());
Preconditions.checkArgument(s1.getIntensity() == s2.getIntensity());
Preconditions.checkArgument(s1.getCreationTime().equals(s2.getCreationTime()));
Preconditions.checkArgument(s1.getNextActivation().equals(s2.getNextActivation()));
Preconditions.checkArgument(s1.getFollowingActivation().equals(s2.getFollowingActivation()));

boolean result = s1.getState().equals(s2.getState());
result &= s1.getRunHistory().size() == s2.getRunHistory().size();

for (int i = 0 ; result && i < s1.getRunHistory().size() ; ++i) {
result &= s1.getRunHistory().get(i).equals(s2.getRunHistory().get(i));
}
    return result;
  }

  // Removed by this commit (the tail of the previous manageSchedule(..) implementation):
      if (startNewRun) {
        try {
          RepairRun startedRun = startNewRunForUnit(schedule, repairUnit);
          ImmutableList<UUID> newRunHistory = new ImmutableList.Builder<UUID>()
              .addAll(schedule.getRunHistory()).add(startedRun.getId()).build();
          return context.storage.updateRepairSchedule(schedule.with()
              .runHistory(newRunHistory)
              .nextActivation(schedule.getFollowingActivation())
              .build(schedule.getId()));
        } catch (ReaperException e) {
          LOG.error(e.getMessage(), e);
          skipScheduling(schedule);
          return false;
        }
      } else {
        skipScheduling(schedule);
        return false;
      }
    } else {
      if (nextActivatedSchedule == null
          || nextActivatedSchedule.getNextActivation().isAfter(schedule.getNextActivation())) {
        nextActivatedSchedule = schedule;
      }
    }
    return false;
  }

  private void skipScheduling(RepairSchedule schedule) {
    LOG.warn("skip scheduling, next activation for repair schedule '{}' will be: {}",
        schedule.getId(), schedule.getFollowingActivation());
    context.storage.updateRepairSchedule(schedule.with()
        .nextActivation(schedule.getFollowingActivation())
        .build(schedule.getId()));
  }

  private RepairRun startNewRunForUnit(RepairSchedule schedule, RepairUnit repairUnit) throws ReaperException {
    Cluster cluster = context.storage.getCluster(repairUnit.getClusterName()).get();
    RepairRun newRepairRun = CommonTools.registerRepairRun(
        context, cluster, repairUnit, Optional.of("scheduled run"),
        schedule.getOwner(), schedule.getSegmentCount(), schedule.getRepairParallelism(),
        schedule.getIntensity());
    context.repairManager.startRepairRun(context, newRepairRun);
    return newRepairRun;
  }

  // Added by this commit (new helper methods):
  private boolean repairRunAlreadyScheduled(RepairSchedule schedule, RepairUnit repairUnit) {
    Collection<RepairRun> repairRuns = context.storage.getRepairRunsForUnit(schedule.getRepairUnitId());
    for (RepairRun repairRun : repairRuns) {
      if (repairRunComesFromSchedule(repairRun, schedule)) {
        LOG.info(
            "there is repair (id {}) in state '{}' for repair unit '{}', "
            + "postponing current schedule trigger until next scheduling",
            repairRun.getId(), repairRun.getRunState(), repairUnit.getId());
        return true;
      }
    }
    return false;
  }

  private static boolean repairRunComesFromSchedule(RepairRun repairRun, RepairSchedule schedule) {
    return repairRun.getRunState().isActive()
        || (RepairRun.RunState.NOT_STARTED == repairRun.getRunState()
            && repairRun.getCause().equals(getCauseName(schedule)));
  }

  private RepairRun createNewRunForUnit(RepairSchedule schedule, RepairUnit repairUnit) throws ReaperException {

    return CommonTools.registerRepairRun(
        context,
        context.storage.getCluster(repairUnit.getClusterName()).get(),
        repairUnit,
        Optional.of(getCauseName(schedule)),
        schedule.getOwner(),
        schedule.getSegmentCount(),
        schedule.getRepairParallelism(),
        schedule.getIntensity());
  }

  private static String getCauseName(RepairSchedule schedule) {
    return "scheduled run (schedule id " + schedule.getId().toString() + ')';
  }

}
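Reading the new codepath end to end, a concurrent activation appears to resolve as follows: each scheduler first persists the advanced next_activation (an idempotent write, since both processes compute the same following activation from the same schedule), then re-reads the schedule before attaching the freshly created run. The scheduler whose run-history update lands first wins; a slower scheduler either finds the existing run via repairRunAlreadyScheduled (a NOT_STARTED run carries the schedule id in its cause, per getCauseName), or notices through equal(..) that the stored run history has grown, in which case it starts the run the winner registered (safe if it is already running) and leaves its own just-created run unattached. The FIXME comments mark the remaining gap: until run-history items can be added and removed atomically, a duplicate run can still slip through in a narrow window.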
CassandraStorage.java
@@ -15,6 +15,7 @@

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.QueryLogger;
import com.datastax.driver.core.ResultSet;
@@ -130,8 +131,8 @@ private void prepareStatements(){
insertRepairSegmentPrepStmt = session.prepare("INSERT INTO repair_run(id, segment_id, repair_unit_id, start_token, end_token, segment_state, coordinator_host, segment_start_time, segment_end_time, fail_count) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
getRepairSegmentPrepStmt = session.prepare("SELECT id,repair_unit_id,segment_id,start_token,end_token,segment_state,coordinator_host,segment_start_time,segment_end_time,fail_count FROM repair_run WHERE id = ? and segment_id = ?");
getRepairSegmentsByRunIdPrepStmt = session.prepare("SELECT id,repair_unit_id,segment_id,start_token,end_token,segment_state,coordinator_host,segment_start_time,segment_end_time,fail_count FROM repair_run WHERE id = ?");
insertRepairSchedulePrepStmt = session.prepare("INSERT INTO repair_schedule_v1(id, repair_unit_id, state, days_between, next_activation, run_history, segment_count, repair_parallelism, intensity, creation_time, owner, pause_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
getRepairSchedulePrepStmt = session.prepare("SELECT * FROM repair_schedule_v1 WHERE id = ?");
insertRepairSchedulePrepStmt = session.prepare("INSERT INTO repair_schedule_v1(id, repair_unit_id, state, days_between, next_activation, run_history, segment_count, repair_parallelism, intensity, creation_time, owner, pause_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)").setConsistencyLevel(ConsistencyLevel.QUORUM);
getRepairSchedulePrepStmt = session.prepare("SELECT * FROM repair_schedule_v1 WHERE id = ?").setConsistencyLevel(ConsistencyLevel.QUORUM);
insertRepairScheduleByClusterAndKsPrepStmt = session.prepare("INSERT INTO repair_schedule_by_cluster_and_keyspace(cluster_name, keyspace_name, repair_schedule_id) VALUES(?, ?, ?)");
getRepairScheduleByClusterAndKsPrepStmt = session.prepare("SELECT repair_schedule_id FROM repair_schedule_by_cluster_and_keyspace WHERE cluster_name = ? and keyspace_name = ?");
deleteRepairSchedulePrepStmt = session.prepare("DELETE FROM repair_schedule_v1 WHERE id = ?");
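The two changed statements above are the "strictly consistent (quorum)" part of the commit message: with RF replicas, a QUORUM write touches ⌊RF/2⌋+1 of them and a QUORUM read consults ⌊RF/2⌋+1, so any read quorum overlaps any write quorum and the re-read in manageSchedule is guaranteed to observe the earlier incremental update. For reference, a sketch of the two places such a consistency level can be applied with the DataStax Java driver 3.x (assumed to be the driver version in use at the time; the contact point and keyspace are placeholders):

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.utils.UUIDs;

// Sketch only: per-statement QUORUM (what this commit does for the repair_schedule
// statements) versus a driver-wide default. Contact point and keyspace are placeholders.
final class QuorumConsistencySketch {

  public static void main(String[] args) {
    Cluster cluster = Cluster.builder()
        .addContactPoint("127.0.0.1")
        // Alternative: make QUORUM the default for every statement on this session.
        .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM))
        .build();
    Session session = cluster.connect("reaper_db");

    // What the commit does: raise consistency only on the schedule statements,
    // leaving the repair_run statements at the driver default.
    PreparedStatement getSchedule = session
        .prepare("SELECT * FROM repair_schedule_v1 WHERE id = ?")
        .setConsistencyLevel(ConsistencyLevel.QUORUM);

    session.execute(getSchedule.bind(UUIDs.timeBased()));
    cluster.close();
  }
}
```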
ReaperCassandraIT.java
Expand Up @@ -45,11 +45,11 @@ public class ReaperCassandraIT {

@BeforeClass
public static void setUp() throws Exception {
LOG.info("setting up testing Reaper runner with {} seed hosts defined and cassandra storage",
TestContext.TEST_CLUSTER_SEED_HOSTS.size());
AppContext context = new AppContext();
LOG.info(
"setting up testing Reaper runner with {} seed hosts defined and cassandra storage",
TestContext.TEST_CLUSTER_SEED_HOSTS.size());
initSchema();
runnerInstance = ReaperTestJettyRunner.setup(context, CASS_CONFIG_FILE);
runnerInstance = ReaperTestJettyRunner.setup(new AppContext(), CASS_CONFIG_FILE);
BasicSteps.setReaperClient(ReaperTestJettyRunner.getClient());
}

ReaperTestJettyRunner.java
@@ -111,6 +111,15 @@ public Application<ReaperApplicationConfiguration> newApplication()
{
return new ReaperApplication(this.context);
}

@Override
public void after() {
context.isRunning.set(false);
try {
Thread.sleep(100);
} catch (InterruptedException ex) {}
super.after();
}
}

}
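Clearing isRunning before calling super.after() is what keeps the new guard in SchedulingManager.run() from firing during teardown: once the flag is false, an in-flight tick skips its work, and even if it is already inside the error path it no longer reaches System.exit(1), which would otherwise take the whole test JVM down if a tick fails while the test environment is being dismantled. The 100 ms sleep simply gives the current tick a moment to observe the flag before the runner shuts down.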
