diff --git a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java index 56f2d59c9..3c7bf7036 100644 --- a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java +++ b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java @@ -5,6 +5,7 @@ import com.google.common.collect.Lists; import com.spotify.reaper.ReaperException; +import com.spotify.reaper.core.RepairSegment; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageServiceMBean; @@ -146,6 +147,12 @@ public String getClusterName() { return toSymbolicName(ssProxy.getClusterName()); } + public int triggerRepair(RepairSegment segment) { + return this.triggerRepair(segment.getStartToken(), segment.getEndToken(), + segment.getColumnFamily().getKeyspaceName(), + segment.getColumnFamily().getName()); + } + /** * Triggers a repair of range (beginToken, endToken] for given keyspace and column family. * diff --git a/src/main/java/com/spotify/reaper/core/RepairRun.java b/src/main/java/com/spotify/reaper/core/RepairRun.java index 3fe50f786..14fb81b03 100644 --- a/src/main/java/com/spotify/reaper/core/RepairRun.java +++ b/src/main/java/com/spotify/reaper/core/RepairRun.java @@ -12,7 +12,7 @@ public class RepairRun { private final String cause; private final String owner; - private final State state; + private final RunState runState; private final DateTime creationTime; private final DateTime startTime; private final DateTime endTime; @@ -30,8 +30,8 @@ public String getOwner() { return owner; } - public State getState() { - return state; + public RunState getState() { + return runState; } public DateTime getCreationTime() { @@ -50,9 +50,10 @@ public double getIntensity() { return intensity; } - public enum State { + public enum RunState { NOT_STARTED, RUNNING, + ERROR, DONE, PAUSED } @@ -61,7 +62,7 @@ private RepairRun(Builder builder, long id) { this.id = id; this.cause = builder.cause; this.owner = builder.owner; - this.state = builder.state; + this.runState = builder.runState; this.creationTime = builder.creationTime; this.startTime = builder.startTime; this.endTime = builder.endTime; @@ -70,7 +71,7 @@ private RepairRun(Builder builder, long id) { public static class Builder { - public final State state; + public final RunState runState; public final DateTime creationTime; public final double intensity; private String cause; @@ -78,8 +79,8 @@ public static class Builder { private DateTime startTime; private DateTime endTime; - public Builder(State state, DateTime creationTime, double intensity) { - this.state = state; + public Builder(RunState runState, DateTime creationTime, double intensity) { + this.runState = runState; this.creationTime = creationTime; this.intensity = intensity; } diff --git a/src/main/java/com/spotify/reaper/core/RepairSegment.java b/src/main/java/com/spotify/reaper/core/RepairSegment.java index 2d7339cf6..a5d2e9e0d 100644 --- a/src/main/java/com/spotify/reaper/core/RepairSegment.java +++ b/src/main/java/com/spotify/reaper/core/RepairSegment.java @@ -57,6 +57,12 @@ public DateTime getEndTime() { return endTime; } + public static RepairSegment.Builder getCopy(RepairSegment origSegment, State newState) { + return new RepairSegment.Builder(origSegment.getColumnFamily(), origSegment.getRunID(), + origSegment.getStartToken(), origSegment.getEndToken(), + newState); + } + public enum State { NOT_STARTED, RUNNING, diff --git a/src/main/java/com/spotify/reaper/resources/TableResource.java b/src/main/java/com/spotify/reaper/resources/TableResource.java index a859a5e99..78a8307ce 100644 --- a/src/main/java/com/spotify/reaper/resources/TableResource.java +++ b/src/main/java/com/spotify/reaper/resources/TableResource.java @@ -143,7 +143,7 @@ public Response addTable(@Context UriInfo uriInfo, } RepairRun newRepairRun = - storage.addRepairRun(new RepairRun.Builder(RepairRun.State.NOT_STARTED, DateTime.now(), + storage.addRepairRun(new RepairRun.Builder(RepairRun.RunState.NOT_STARTED, DateTime.now(), config.getRepairIntensity()) .cause(cause.isPresent() ? cause.get() : "no cause specified") .owner(owner.get())); diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index 7b4f78518..8901a82c0 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -18,8 +18,8 @@ import java.util.concurrent.TimeUnit; /** - * RepairRunner controls single RepairRun, works in a separate thread, and dies when the RepairRun - * in question is complete. + * RepairRunner controls single RepairRun, is invoked in scheduled manner on separate thread, + * and dies when the RepairRun in question is complete. * * State of the RepairRun is in the Reaper storage, so if Reaper service is restarted, new * RepairRunner will be spawned upon restart. @@ -61,33 +61,49 @@ public static void startNewRepairRun(IStorage storage, RepairRun repairRun, /** * This run() method is run in scheduled manner, so don't make this blocking! + * + * NOTICE: Do scheduling next execution only in this method, or when starting run. + * Otherwise it is a risk to have multiple parallel scheduling for same run. */ @Override public void run() { LOG.debug("RepairRunner run on RepairRun \"{}\" with current segment id \"{}\"", repairRun.getId(), currentSegment == null ? "n/a" : currentSegment.getStartToken()); + if (!checkJmxProxyInitialized()) { + LOG.error("failed to initialize JMX proxy, retrying after {} seconds", + JMX_FAILURE_SLEEP_DELAY_SEC); + executor.schedule(this, JMX_FAILURE_SLEEP_DELAY_SEC, TimeUnit.SECONDS); + // TODO: should we change current segment run state to UNKNOWN now? + return; + } + // Need to check current status from database every time, if state changed etc. repairRun = storage.getRepairRun(repairRun.getId()); - RepairRun.State state = repairRun.getState(); + RepairRun.RunState runState = repairRun.getState(); - switch (state) { + switch (runState) { case NOT_STARTED: checkIfNeedToStartNextSegment(); break; case RUNNING: checkIfNeedToStartNextSegment(); break; + case ERROR: + LOG.warn("repair run {} in ERROR, not doing anything", repairRun.getId()); + finishRepairRun(); + return; // no new run scheduling case PAUSED: startNextSegmentEarliest = DateTime.now().plusSeconds(10); break; case DONE: finishRepairRun(); - return; + return; // no new run scheduling } int sleepTime = Seconds.secondsBetween(DateTime.now(), startNextSegmentEarliest).getSeconds(); - executor.schedule(this, sleepTime > 0 ? sleepTime : 1, TimeUnit.SECONDS); + sleepTime = sleepTime > 0 ? sleepTime : 1; + executor.schedule(this, sleepTime, TimeUnit.SECONDS); } private boolean checkJmxProxyInitialized() { @@ -96,10 +112,7 @@ private boolean checkJmxProxyInitialized() { try { jmxProxy = JmxProxy.connect(clusterSeedHost); } catch (ReaperException e) { - LOG.error("failed to initialize JMX proxy, retrying after {} seconds", - JMX_FAILURE_SLEEP_DELAY_SEC); e.printStackTrace(); - executor.schedule(this, JMX_FAILURE_SLEEP_DELAY_SEC, TimeUnit.SECONDS); return false; } } @@ -107,10 +120,89 @@ private boolean checkJmxProxyInitialized() { } private void checkIfNeedToStartNextSegment() { + // TODO: Should this be synchronized by each repair run id? + if (repairRun.getState() == RepairRun.RunState.PAUSED + || repairRun.getState() == RepairRun.RunState.DONE + || repairRun.getState() == RepairRun.RunState.ERROR) { + LOG.debug("not starting new segment if repair run is not running: {}", repairRun.getId()); + return; + } + + int newRepairCommandId = -1; + if (null == currentSegment) { + currentSegment = storage.getNextFreeSegment(repairRun.getId()); + if (null == currentSegment) { + LOG.error("first segment not found for repair run {}", repairRun.getId()); + changeCurrentRepairRunState(RepairRun.RunState.ERROR); + return; + } + LOG.info("triggering repair on segment {} with start token {} on run id {}", + currentSegment.getId(), currentSegment.getStartToken(), repairRun.getId()); + newRepairCommandId = jmxProxy.triggerRepair(currentSegment); + if (repairRun.getState() == RepairRun.RunState.NOT_STARTED) { + LOG.info("started new repair run {}", repairRun.getId()); + changeCurrentRepairRunState(RepairRun.RunState.RUNNING); + } + else { + assert repairRun.getState() == RepairRun.RunState.RUNNING : "logical error in run state"; + LOG.info("started existing repair run {}", repairRun.getId()); + } + } + else { + LOG.debug("checking whether we need to start new segment on run: {}", repairRun.getId()); + currentSegment = storage.getRepairSegment(currentSegment.getId()); + + if (currentSegment.getState() == RepairSegment.State.RUNNING) { + LOG.info("segment {} still running on run {}", currentSegment.getId(), repairRun.getId()); + } + else if (currentSegment.getState() == RepairSegment.State.ERROR) { + LOG.error("current segment {} in ERROR status for run {}", + currentSegment.getId(), repairRun.getId()); + changeCurrentRepairRunState(RepairRun.RunState.ERROR); + return; + } + else if (currentSegment.getState() == RepairSegment.State.NOT_STARTED) { + LOG.warn("segment {} repair not started, even triggered for run {}, re-triggering now", + currentSegment.getId(), repairRun.getId()); + newRepairCommandId = jmxProxy.triggerRepair(currentSegment); + } + else if (currentSegment.getState() == RepairSegment.State.DONE) { + LOG.warn("segment {} repair completed for run {}", + currentSegment.getId(), repairRun.getId()); + currentSegment = storage.getNextFreeSegment(repairRun.getId()); + if (null == currentSegment) { + LOG.info("no new free segment found for repair run {}", repairRun.getId()); + changeCurrentRepairRunState(RepairRun.RunState.DONE); + return; + } + + LOG.info("triggering repair on segment {} with start token {} on run id {}", + currentSegment.getId(), currentSegment.getStartToken(), repairRun.getId()); + newRepairCommandId = jmxProxy.triggerRepair(currentSegment); + assert repairRun.getState() == RepairRun.RunState.RUNNING : "logical error in run state"; + } + } + + if (newRepairCommandId > 0) { + RepairSegment updatedSegment = RepairSegment.getCopy(currentSegment, + currentSegment.getState()) + .repairCommandId(newRepairCommandId).build(currentSegment.getId()); + storage.updateRepairSegment(updatedSegment); + LOG.debug("updated segment {} repair command id to {}", + currentSegment.getId(), currentSegment.getRepairCommandId()); + } + + // TODO: should sleep time be relative to past performance? + startNextSegmentEarliest = DateTime.now().plusSeconds(5); + } + + private void changeCurrentRepairRunState(RepairRun.RunState runState) { + LOG.info("repair run with id {} state change from {} to {}", + repairRun.getId(), repairRun.getState().toString(), runState.toString()); // TODO: } - private void changeCurrentSegmentState(RepairSegment.State running) { + private void changeCurrentSegmentState(RepairSegment.State state) { // TODO: }