From 1cc8520181d28718de72717566dc8de6c192799d Mon Sep 17 00:00:00 2001
From: mck
Date: Thu, 29 Jun 2017 18:39:48 +1000
Subject: [PATCH] Make the SchedulingManager more BASE design friendly

Running the SchedulingManager concurrently (in different processes) leads
to parallel repair runs being spawned at the same time.

This patch prevents that by
 - making reads and writes to the repair_schedule table strictly
   consistent (quorum),
 - updating the repair_schedule (writes to storage) incrementally along the
   SchedulingManager.manageSchedule(..) codepath, and
 - performing multiple checks (reads from storage) along the same codepath.

ref: https://github.com/thelastpickle/cassandra-reaper/pull/124
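---
Note (review annotation; this area between the `---` and the diffstat is not
part of the commit message): below is a condensed sketch of the
claim-then-verify flow this patch introduces in
SchedulingManager.manageSchedule(..). The names match the patch, but the flow
is simplified, not verbatim. Because both prepared statements on
repair_schedule_v1 are pinned to QUORUM, any quorum read overlaps the latest
quorum write (R + W > RF), so a SchedulingManager in another process observes
step 1 before acting on the same schedule.

    // 1. claim the activation up front: advance next_activation so that a
    //    concurrent scheduler sees this activation as already taken
    RepairSchedule schedule = schedule_
        .with().nextActivation(schedule_.getFollowingActivation()).build(schedule_.getId());
    context.storage.updateRepairSchedule(schedule);   // quorum write

    // 2. create the new repair run, but do not start it yet
    RepairRun newRepairRun = createNewRunForUnit(schedule, repairUnit);

    // 3. re-read the schedule (quorum read) and persist/start the run only
    //    if no other scheduler touched it in the meantime; otherwise start
    //    the run that the winning scheduler recorded
    RepairSchedule latestSchedule = context.storage.getRepairSchedule(schedule.getId()).get();
    if (equal(schedule, latestSchedule)) {
        // append newRepairRun.getId() to run_history and start the run
    }

The FIXME comments in the diff flag that the run_history update itself is
still a non-atomic read-modify-write; see the note after the patch for one
possible shape of an atomic variant.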
 .../java/com/spotify/reaper/AppContext.java   |   4 +-
 .../reaper/service/SchedulingManager.java     | 243 +++++++++++-------
 .../reaper/storage/CassandraStorage.java      |   5 +-
 .../reaper/acceptance/ReaperCassandraIT.java  |   8 +-
 .../acceptance/ReaperTestJettyRunner.java     |   9 +
 5 files changed, 173 insertions(+), 96 deletions(-)

diff --git a/src/main/java/com/spotify/reaper/AppContext.java b/src/main/java/com/spotify/reaper/AppContext.java
index d9b2f2906..bc24619b2 100644
--- a/src/main/java/com/spotify/reaper/AppContext.java
+++ b/src/main/java/com/spotify/reaper/AppContext.java
@@ -3,13 +3,15 @@
 import com.spotify.reaper.cassandra.JmxConnectionFactory;
 import com.spotify.reaper.service.RepairManager;
 import com.spotify.reaper.storage.IStorage;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Single class to hold all application global interfacing objects,
  * and app global options.
  */
-public class AppContext {
+public final class AppContext {
 
+  public final AtomicBoolean isRunning = new AtomicBoolean(true);
   public IStorage storage;
   public RepairManager repairManager;
   public JmxConnectionFactory jmxConnectionFactory;
diff --git a/src/main/java/com/spotify/reaper/service/SchedulingManager.java b/src/main/java/com/spotify/reaper/service/SchedulingManager.java
index a0342fbee..b1e442aea 100644
--- a/src/main/java/com/spotify/reaper/service/SchedulingManager.java
+++ b/src/main/java/com/spotify/reaper/service/SchedulingManager.java
@@ -1,11 +1,11 @@
 package com.spotify.reaper.service;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import com.spotify.reaper.AppContext;
 import com.spotify.reaper.ReaperException;
-import com.spotify.reaper.core.Cluster;
 import com.spotify.reaper.core.RepairRun;
 import com.spotify.reaper.core.RepairSchedule;
 import com.spotify.reaper.core.RepairUnit;
@@ -20,7 +20,7 @@
 import java.util.TimerTask;
 import java.util.UUID;
 
-public class SchedulingManager extends TimerTask {
+public final class SchedulingManager extends TimerTask {
 
   private static final Logger LOG = LoggerFactory.getLogger(SchedulingManager.class);
 
@@ -59,7 +59,7 @@ public static RepairSchedule resumeRepairSchedule(AppContext context, RepairSche
     return updatedSchedule;
   }
 
-  private AppContext context;
+  private final AppContext context;
 
   /* nextActivatedSchedule used for nicer logging only */
   private RepairSchedule nextActivatedSchedule;
@@ -73,28 +73,32 @@ private SchedulingManager(AppContext context) {
    */
   @Override
   public void run() {
-    LOG.debug("Checking for repair schedules...");
-    UUID lastId = null;
-    try {
-      Collection<RepairSchedule> schedules = context.storage.getAllRepairSchedules();
-      boolean anyRunStarted = false;
-      for (RepairSchedule schedule : schedules) {
-        lastId = schedule.getId();
-        anyRunStarted = manageSchedule(schedule) || anyRunStarted;
-      }
-      if (!anyRunStarted && nextActivatedSchedule != null) {
-        LOG.debug("not scheduling new repairs yet, next activation is '{}' for schedule id '{}'",
-            nextActivatedSchedule.getNextActivation(), nextActivatedSchedule.getId());
-      }
-    } catch (Exception ex) {
-      LOG.error("failed managing schedule for run with id: {}", lastId);
-      LOG.error("catch exception", ex);
-      try {
-        assert false : "if assertions are enabled then exit the jvm";
-      } catch (AssertionError ae) {
-        LOG.error("SchedulingManager failed. Exiting JVM.");
-        System.exit(1);
-      }
+    if (context.isRunning.get()) {
+      LOG.debug("Checking for repair schedules...");
+      UUID lastId = null;
+      try {
+        Collection<RepairSchedule> schedules = context.storage.getAllRepairSchedules();
+        boolean anyRunStarted = false;
+        for (RepairSchedule schedule : schedules) {
+          lastId = schedule.getId();
+          anyRunStarted = manageSchedule(schedule) || anyRunStarted;
+        }
+        if (!anyRunStarted && nextActivatedSchedule != null) {
+          LOG.debug("not scheduling new repairs yet, next activation is '{}' for schedule id '{}'",
+              nextActivatedSchedule.getNextActivation(), nextActivatedSchedule.getId());
+        }
+      } catch (Exception ex) {
+        LOG.error("failed managing schedule for run with id: {}", lastId);
+        LOG.error("catch exception", ex);
+        try {
+          assert false : "if assertions are enabled then exit the jvm";
+        } catch (AssertionError ae) {
+          if (context.isRunning.get()) {
+            LOG.error("SchedulingManager failed. Exiting JVM.");
+            System.exit(1);
+          }
+        }
+      }
     }
   }
 
@@ -103,79 +107,140 @@ public void run() {
    * @param schedule The schedule to be checked for activation.
    * @return boolean indicating whether a new RepairRun instance was created and started.
    */
-  private boolean manageSchedule(RepairSchedule schedule) {
-    if (schedule.getNextActivation().isBeforeNow()) {
-      boolean startNewRun = true;
-      LOG.info("repair unit '{}' should be repaired based on RepairSchedule with id '{}'",
-          schedule.getRepairUnitId(), schedule.getId());
-
-      RepairUnit repairUnit = null;
-      if (schedule.getState() == RepairSchedule.State.PAUSED) {
-        LOG.info("Repair schedule '{}' is paused", schedule.getId());
-        startNewRun = false;
-      } else {
-        Optional<RepairUnit> fetchedUnit =
-            context.storage.getRepairUnit(schedule.getRepairUnitId());
-        if (!fetchedUnit.isPresent()) {
-          LOG.warn("RepairUnit with id {} not found", schedule.getRepairUnitId());
-          return false;
-        }
-        repairUnit = fetchedUnit.get();
-        Collection<RepairRun> repairRuns =
-            context.storage.getRepairRunsForUnit(schedule.getRepairUnitId());
-        for (RepairRun repairRun : repairRuns) {
-          RepairRun.RunState state = repairRun.getRunState();
-          if (state.isActive()) {
-            LOG.info("there is repair (id {}) in state '{}' for repair unit '{}', "
-                + "postponing current schedule trigger until next scheduling",
-                repairRun.getId(), repairRun.getRunState(), repairUnit.getId());
-            startNewRun = false;
-          }
+  private boolean manageSchedule(RepairSchedule schedule_) {
+    switch (schedule_.getState()) {
+      case ACTIVE:
+        if (schedule_.getNextActivation().isBeforeNow()) {
+
+          RepairSchedule schedule = schedule_
+              .with().nextActivation(schedule_.getFollowingActivation()).build(schedule_.getId());
+
+          context.storage.updateRepairSchedule(schedule);
+
+          LOG.info(
+              "repair unit '{}' should be repaired based on RepairSchedule with id '{}'",
+              schedule.getRepairUnitId(), schedule.getId());
+
+          Optional<RepairUnit> fetchedUnit = context.storage.getRepairUnit(schedule.getRepairUnitId());
+          if (!fetchedUnit.isPresent()) {
+            LOG.warn("RepairUnit with id {} not found", schedule.getRepairUnitId());
+            return false;
+          }
+          RepairUnit repairUnit = fetchedUnit.get();
+          if (repairRunAlreadyScheduled(schedule, repairUnit)) {
+            return false;
+          }
+
+          try {
+            RepairRun newRepairRun = createNewRunForUnit(schedule, repairUnit);
+
+            ImmutableList<UUID> newRunHistory = new ImmutableList.Builder<UUID>()
+                .addAll(schedule.getRunHistory()).add(newRepairRun.getId()).build();
+
+            RepairSchedule latestSchedule = context.storage.getRepairSchedule(schedule.getId()).get();
+
+            if (equal(schedule, latestSchedule)) {
+
+              boolean result = context.storage.updateRepairSchedule(
+                  schedule.with()
+                      .runHistory(newRunHistory)
+                      .build(schedule.getId()));
+              // FIXME – concurrency is broken unless we atomically add/remove run history items
+              //boolean result = context.storage
+              //    .addRepairRunToRepairSchedule(schedule.getId(), newRepairRun.getId());
+
+              if (result) {
+                context.repairManager.startRepairRun(context, newRepairRun);
+                return true;
+              }
+            } else if (schedule.getRunHistory().size() < latestSchedule.getRunHistory().size()) {
+              UUID newRepairRunId = latestSchedule.getRunHistory().get(latestSchedule.getRunHistory().size() - 1);
+              LOG.info("schedule {} has already added a new repair run {}", schedule.getId(), newRepairRunId);
+              // this repair_run is identified as a duplicate (for this activation):
+              // so take the last repair run and try to start it. it's ok if it's already running.
+              newRepairRun = context.storage.getRepairRun(newRepairRunId).get();
+              context.repairManager.startRepairRun(context, newRepairRun);
+            } else {
+              LOG.warn("schedule {} has been altered by someone else. not running repair", schedule.getId());
+            }
+            // this duplicated repair_run needs to be removed from the schedule's history
+            // FIXME – concurrency is broken unless we atomically add/remove run history items
+            //boolean result = context.storage
+            //    .deleteRepairRunFromRepairSchedule(schedule.getId(), newRepairRun.getId());
+          } catch (ReaperException e) {
+            LOG.error(e.getMessage(), e);
+          }
+        } else {
+          if (nextActivatedSchedule == null
+              || nextActivatedSchedule.getNextActivation().isAfter(schedule_.getNextActivation())) {
+
+            nextActivatedSchedule = schedule_;
+          }
+        }
+        break;
+      case PAUSED:
+        LOG.info("Repair schedule '{}' is paused", schedule_.getId());
+        return false;
+      default:
+        throw new AssertionError("illegal schedule state in call to manageSchedule(..): " + schedule_.getState());
+    }
+    return false;
+  }
+
+  private static boolean equal(RepairSchedule s1, RepairSchedule s2) {
+    Preconditions.checkArgument(s1.getId().equals(s2.getId()));
+    Preconditions.checkArgument(s1.getOwner().equals(s2.getOwner()));
+    Preconditions.checkArgument(s1.getDaysBetween() == s2.getDaysBetween());
+    Preconditions.checkArgument(s1.getIntensity() == s2.getIntensity());
+    Preconditions.checkArgument(s1.getCreationTime().equals(s2.getCreationTime()));
+    Preconditions.checkArgument(s1.getNextActivation().equals(s2.getNextActivation()));
+    Preconditions.checkArgument(s1.getFollowingActivation().equals(s2.getFollowingActivation()));
+
+    boolean result = s1.getState().equals(s2.getState());
+    result &= s1.getRunHistory().size() == s2.getRunHistory().size();
+
+    for (int i = 0; result && i < s1.getRunHistory().size(); ++i) {
+      result &= s1.getRunHistory().get(i).equals(s2.getRunHistory().get(i));
     }
-  }
+    return result;
+  }
 
-    if (startNewRun) {
-      try {
-        RepairRun startedRun = startNewRunForUnit(schedule, repairUnit);
-        ImmutableList<UUID> newRunHistory = new ImmutableList.Builder<UUID>()
-            .addAll(schedule.getRunHistory()).add(startedRun.getId()).build();
-        return context.storage.updateRepairSchedule(schedule.with()
-            .runHistory(newRunHistory)
-            .nextActivation(schedule.getFollowingActivation())
-            .build(schedule.getId()));
-      } catch (ReaperException e) {
-        LOG.error(e.getMessage(), e);
-        skipScheduling(schedule);
-        return false;
+  private boolean repairRunAlreadyScheduled(RepairSchedule schedule, RepairUnit repairUnit) {
+    Collection<RepairRun> repairRuns = context.storage.getRepairRunsForUnit(schedule.getRepairUnitId());
+    for (RepairRun repairRun : repairRuns) {
+      if (repairRunComesFromSchedule(repairRun, schedule)) {
+        LOG.info(
+            "there is repair (id {}) in state '{}' for repair unit '{}', "
+                + "postponing current schedule trigger until next scheduling",
+            repairRun.getId(), repairRun.getRunState(), repairUnit.getId());
+        return true;
+      }
     }
-    } else {
-      skipScheduling(schedule);
     return false;
-    }
-    } else {
-      if (nextActivatedSchedule == null || nextActivatedSchedule.getNextActivation().isAfter(schedule.getNextActivation())) {
-        nextActivatedSchedule = schedule;
-      }
   }
-    return false;
+
+  private static boolean repairRunComesFromSchedule(RepairRun repairRun, RepairSchedule schedule) {
+    return repairRun.getRunState().isActive()
+        || (RepairRun.RunState.NOT_STARTED == repairRun.getRunState()
+            && repairRun.getCause().equals(getCauseName(schedule)));
   }
 
-  private void skipScheduling(RepairSchedule schedule) {
-    LOG.warn("skip scheduling, next activation for repair schedule '{}' will be: {}",
-        schedule.getId(), schedule.getFollowingActivation());
-    context.storage.updateRepairSchedule(schedule.with()
-        .nextActivation(schedule.getFollowingActivation())
-        .build(schedule.getId()));
+
+  private RepairRun createNewRunForUnit(RepairSchedule schedule, RepairUnit repairUnit) throws ReaperException {
+
+    return CommonTools.registerRepairRun(
+        context,
+        context.storage.getCluster(repairUnit.getClusterName()).get(),
+        repairUnit,
+        Optional.of(getCauseName(schedule)),
+        schedule.getOwner(),
+        schedule.getSegmentCount(),
+        schedule.getRepairParallelism(),
+        schedule.getIntensity());
   }
 
-  private RepairRun startNewRunForUnit(RepairSchedule schedule, RepairUnit repairUnit) throws ReaperException {
-    Cluster cluster = context.storage.getCluster(repairUnit.getClusterName()).get();
-    RepairRun newRepairRun = CommonTools.registerRepairRun(
-        context, cluster, repairUnit, Optional.of("scheduled run"),
-        schedule.getOwner(), schedule.getSegmentCount(), schedule.getRepairParallelism(),
-        schedule.getIntensity());
-    context.repairManager.startRepairRun(context, newRepairRun);
-    return newRepairRun;
+  private static String getCauseName(RepairSchedule schedule) {
+    return "scheduled run (schedule id " + schedule.getId().toString() + ')';
   }
 }
diff --git a/src/main/java/com/spotify/reaper/storage/CassandraStorage.java b/src/main/java/com/spotify/reaper/storage/CassandraStorage.java
index 8e191f6dd..5c9abff86 100644
--- a/src/main/java/com/spotify/reaper/storage/CassandraStorage.java
+++ b/src/main/java/com/spotify/reaper/storage/CassandraStorage.java
@@ -15,6 +15,7 @@
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.CodecRegistry;
+import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.QueryLogger;
 import com.datastax.driver.core.ResultSet;
@@ -130,8 +131,8 @@ private void prepareStatements(){
     insertRepairSegmentPrepStmt = session.prepare("INSERT INTO repair_run(id, segment_id, repair_unit_id, start_token, end_token, segment_state, coordinator_host, segment_start_time, segment_end_time, fail_count) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
     getRepairSegmentPrepStmt = session.prepare("SELECT id,repair_unit_id,segment_id,start_token,end_token,segment_state,coordinator_host,segment_start_time,segment_end_time,fail_count FROM repair_run WHERE id = ? and segment_id = ?");
     getRepairSegmentsByRunIdPrepStmt = session.prepare("SELECT id,repair_unit_id,segment_id,start_token,end_token,segment_state,coordinator_host,segment_start_time,segment_end_time,fail_count FROM repair_run WHERE id = ?");
-    insertRepairSchedulePrepStmt = session.prepare("INSERT INTO repair_schedule_v1(id, repair_unit_id, state, days_between, next_activation, run_history, segment_count, repair_parallelism, intensity, creation_time, owner, pause_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
-    getRepairSchedulePrepStmt = session.prepare("SELECT * FROM repair_schedule_v1 WHERE id = ?");
+    insertRepairSchedulePrepStmt = session.prepare("INSERT INTO repair_schedule_v1(id, repair_unit_id, state, days_between, next_activation, run_history, segment_count, repair_parallelism, intensity, creation_time, owner, pause_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)").setConsistencyLevel(ConsistencyLevel.QUORUM);
+    getRepairSchedulePrepStmt = session.prepare("SELECT * FROM repair_schedule_v1 WHERE id = ?").setConsistencyLevel(ConsistencyLevel.QUORUM);
     insertRepairScheduleByClusterAndKsPrepStmt = session.prepare("INSERT INTO repair_schedule_by_cluster_and_keyspace(cluster_name, keyspace_name, repair_schedule_id) VALUES(?, ?, ?)");
     getRepairScheduleByClusterAndKsPrepStmt = session.prepare("SELECT repair_schedule_id FROM repair_schedule_by_cluster_and_keyspace WHERE cluster_name = ? and keyspace_name = ?");
     deleteRepairSchedulePrepStmt = session.prepare("DELETE FROM repair_schedule_v1 WHERE id = ?");
diff --git a/src/test/java/com/spotify/reaper/acceptance/ReaperCassandraIT.java b/src/test/java/com/spotify/reaper/acceptance/ReaperCassandraIT.java
index 8013c06ca..cdfb060e3 100644
--- a/src/test/java/com/spotify/reaper/acceptance/ReaperCassandraIT.java
+++ b/src/test/java/com/spotify/reaper/acceptance/ReaperCassandraIT.java
@@ -45,11 +45,11 @@ public class ReaperCassandraIT {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    LOG.info("setting up testing Reaper runner with {} seed hosts defined and cassandra storage",
-        TestContext.TEST_CLUSTER_SEED_HOSTS.size());
-    AppContext context = new AppContext();
+    LOG.info(
+        "setting up testing Reaper runner with {} seed hosts defined and cassandra storage",
+        TestContext.TEST_CLUSTER_SEED_HOSTS.size());
     initSchema();
-    runnerInstance = ReaperTestJettyRunner.setup(context, CASS_CONFIG_FILE);
+    runnerInstance = ReaperTestJettyRunner.setup(new AppContext(), CASS_CONFIG_FILE);
     BasicSteps.setReaperClient(ReaperTestJettyRunner.getClient());
   }
 
diff --git a/src/test/java/com/spotify/reaper/acceptance/ReaperTestJettyRunner.java b/src/test/java/com/spotify/reaper/acceptance/ReaperTestJettyRunner.java
index c3a7ee85e..099d3fa31 100644
--- a/src/test/java/com/spotify/reaper/acceptance/ReaperTestJettyRunner.java
+++ b/src/test/java/com/spotify/reaper/acceptance/ReaperTestJettyRunner.java
@@ -111,6 +111,15 @@ public Application newApplication() {
       return new ReaperApplication(this.context);
     }
 
+
+    @Override
+    public void after() {
+      context.isRunning.set(false);
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ex) {}
+      super.after();
+    }
   }
 }
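
Note (review annotation, outside the patch): the FIXME comments in
SchedulingManager.manageSchedule(..) point out that appending a run id to
run_history is still a read-modify-write, so two schedulers racing between
the re-read and the update can still collide. Below is a hypothetical sketch
of the addRepairRunToRepairSchedule(..) storage method those comments
reference, using a Cassandra lightweight transaction. The extra
expectedRunHistory parameter and the assumption that run_history is a
list<uuid> column are illustration only; none of this is in the patch.

    private PreparedStatement addRunToSchedulePrepStmt;

    private void prepareScheduleCasStatements() {
      // conditional (Paxos) update: append only if run_history is untouched
      addRunToSchedulePrepStmt = session
          .prepare("UPDATE repair_schedule_v1 SET run_history = run_history + ?"
              + " WHERE id = ? IF run_history = ?")
          .setConsistencyLevel(ConsistencyLevel.QUORUM);
    }

    public boolean addRepairRunToRepairSchedule(UUID scheduleId, List<UUID> expectedRunHistory, UUID runId) {
      ResultSet results = session.execute(addRunToSchedulePrepStmt.bind(
          Collections.singletonList(runId), scheduleId, expectedRunHistory));
      // wasApplied() is false when the IF condition did not hold, i.e. a
      // concurrent SchedulingManager already appended to run_history
      return results.wasApplied();
    }

A lightweight transaction costs extra round trips, but manageSchedule(..)
runs at most once per schedule per polling interval, so the overhead would be
negligible next to the repair itself.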