Skip to content

Commit

Permalink
almost finished the runner implementation, still wip
Browse files Browse the repository at this point in the history
  • Loading branch information
varjoranta committed Dec 5, 2014
1 parent 1281a1a commit 6ecc29c
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 19 deletions.
7 changes: 7 additions & 0 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.common.collect.Lists;

import com.spotify.reaper.ReaperException;
import com.spotify.reaper.core.RepairSegment;

import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageServiceMBean;
Expand Down Expand Up @@ -146,6 +147,12 @@ public String getClusterName() {
return toSymbolicName(ssProxy.getClusterName());
}

public int triggerRepair(RepairSegment segment) {
return this.triggerRepair(segment.getStartToken(), segment.getEndToken(),
segment.getColumnFamily().getKeyspaceName(),
segment.getColumnFamily().getName());
}

/**
* Triggers a repair of range (beginToken, endToken] for given keyspace and column family.
*
Expand Down
17 changes: 9 additions & 8 deletions src/main/java/com/spotify/reaper/core/RepairRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class RepairRun {

private final String cause;
private final String owner;
private final State state;
private final RunState runState;
private final DateTime creationTime;
private final DateTime startTime;
private final DateTime endTime;
Expand All @@ -30,8 +30,8 @@ public String getOwner() {
return owner;
}

public State getState() {
return state;
public RunState getState() {
return runState;
}

public DateTime getCreationTime() {
Expand All @@ -50,9 +50,10 @@ public double getIntensity() {
return intensity;
}

public enum State {
public enum RunState {
NOT_STARTED,
RUNNING,
ERROR,
DONE,
PAUSED
}
Expand All @@ -61,7 +62,7 @@ private RepairRun(Builder builder, long id) {
this.id = id;
this.cause = builder.cause;
this.owner = builder.owner;
this.state = builder.state;
this.runState = builder.runState;
this.creationTime = builder.creationTime;
this.startTime = builder.startTime;
this.endTime = builder.endTime;
Expand All @@ -70,16 +71,16 @@ private RepairRun(Builder builder, long id) {

public static class Builder {

public final State state;
public final RunState runState;
public final DateTime creationTime;
public final double intensity;
private String cause;
private String owner;
private DateTime startTime;
private DateTime endTime;

public Builder(State state, DateTime creationTime, double intensity) {
this.state = state;
public Builder(RunState runState, DateTime creationTime, double intensity) {
this.runState = runState;
this.creationTime = creationTime;
this.intensity = intensity;
}
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/spotify/reaper/core/RepairSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ public DateTime getEndTime() {
return endTime;
}

public static RepairSegment.Builder getCopy(RepairSegment origSegment, State newState) {
return new RepairSegment.Builder(origSegment.getColumnFamily(), origSegment.getRunID(),
origSegment.getStartToken(), origSegment.getEndToken(),
newState);
}

public enum State {
NOT_STARTED,
RUNNING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public Response addTable(@Context UriInfo uriInfo,
}

RepairRun newRepairRun =
storage.addRepairRun(new RepairRun.Builder(RepairRun.State.NOT_STARTED, DateTime.now(),
storage.addRepairRun(new RepairRun.Builder(RepairRun.RunState.NOT_STARTED, DateTime.now(),
config.getRepairIntensity())
.cause(cause.isPresent() ? cause.get() : "no cause specified")
.owner(owner.get()));
Expand Down
112 changes: 102 additions & 10 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import java.util.concurrent.TimeUnit;

/**
* RepairRunner controls single RepairRun, works in a separate thread, and dies when the RepairRun
* in question is complete.
* RepairRunner controls single RepairRun, is invoked in scheduled manner on separate thread,
* and dies when the RepairRun in question is complete.
*
* State of the RepairRun is in the Reaper storage, so if Reaper service is restarted, new
* RepairRunner will be spawned upon restart.
Expand Down Expand Up @@ -61,33 +61,49 @@ public static void startNewRepairRun(IStorage storage, RepairRun repairRun,

/**
* This run() method is run in scheduled manner, so don't make this blocking!
*
* NOTICE: Do scheduling next execution only in this method, or when starting run.
* Otherwise it is a risk to have multiple parallel scheduling for same run.
*/
@Override
public void run() {
LOG.debug("RepairRunner run on RepairRun \"{}\" with current segment id \"{}\"",
repairRun.getId(), currentSegment == null ? "n/a" : currentSegment.getStartToken());

if (!checkJmxProxyInitialized()) {
LOG.error("failed to initialize JMX proxy, retrying after {} seconds",
JMX_FAILURE_SLEEP_DELAY_SEC);
executor.schedule(this, JMX_FAILURE_SLEEP_DELAY_SEC, TimeUnit.SECONDS);
// TODO: should we change current segment run state to UNKNOWN now?
return;
}

// Need to check current status from database every time, if state changed etc.
repairRun = storage.getRepairRun(repairRun.getId());
RepairRun.State state = repairRun.getState();
RepairRun.RunState runState = repairRun.getState();

switch (state) {
switch (runState) {
case NOT_STARTED:
checkIfNeedToStartNextSegment();
break;
case RUNNING:
checkIfNeedToStartNextSegment();
break;
case ERROR:
LOG.warn("repair run {} in ERROR, not doing anything", repairRun.getId());
finishRepairRun();
return; // no new run scheduling
case PAUSED:
startNextSegmentEarliest = DateTime.now().plusSeconds(10);
break;
case DONE:
finishRepairRun();
return;
return; // no new run scheduling
}

int sleepTime = Seconds.secondsBetween(DateTime.now(), startNextSegmentEarliest).getSeconds();
executor.schedule(this, sleepTime > 0 ? sleepTime : 1, TimeUnit.SECONDS);
sleepTime = sleepTime > 0 ? sleepTime : 1;
executor.schedule(this, sleepTime, TimeUnit.SECONDS);
}

private boolean checkJmxProxyInitialized() {
Expand All @@ -96,21 +112,97 @@ private boolean checkJmxProxyInitialized() {
try {
jmxProxy = JmxProxy.connect(clusterSeedHost);
} catch (ReaperException e) {
LOG.error("failed to initialize JMX proxy, retrying after {} seconds",
JMX_FAILURE_SLEEP_DELAY_SEC);
e.printStackTrace();
executor.schedule(this, JMX_FAILURE_SLEEP_DELAY_SEC, TimeUnit.SECONDS);
return false;
}
}
return true;
}

private void checkIfNeedToStartNextSegment() {
// TODO: Should this be synchronized by each repair run id?
if (repairRun.getState() == RepairRun.RunState.PAUSED
|| repairRun.getState() == RepairRun.RunState.DONE
|| repairRun.getState() == RepairRun.RunState.ERROR) {
LOG.debug("not starting new segment if repair run is not running: {}", repairRun.getId());
return;
}

int newRepairCommandId = -1;
if (null == currentSegment) {
currentSegment = storage.getNextFreeSegment(repairRun.getId());
if (null == currentSegment) {
LOG.error("first segment not found for repair run {}", repairRun.getId());
changeCurrentRepairRunState(RepairRun.RunState.ERROR);
return;
}
LOG.info("triggering repair on segment {} with start token {} on run id {}",
currentSegment.getId(), currentSegment.getStartToken(), repairRun.getId());
newRepairCommandId = jmxProxy.triggerRepair(currentSegment);
if (repairRun.getState() == RepairRun.RunState.NOT_STARTED) {
LOG.info("started new repair run {}", repairRun.getId());
changeCurrentRepairRunState(RepairRun.RunState.RUNNING);
}
else {
assert repairRun.getState() == RepairRun.RunState.RUNNING : "logical error in run state";
LOG.info("started existing repair run {}", repairRun.getId());
}
}
else {
LOG.debug("checking whether we need to start new segment on run: {}", repairRun.getId());
currentSegment = storage.getRepairSegment(currentSegment.getId());

if (currentSegment.getState() == RepairSegment.State.RUNNING) {
LOG.info("segment {} still running on run {}", currentSegment.getId(), repairRun.getId());
}
else if (currentSegment.getState() == RepairSegment.State.ERROR) {
LOG.error("current segment {} in ERROR status for run {}",
currentSegment.getId(), repairRun.getId());
changeCurrentRepairRunState(RepairRun.RunState.ERROR);
return;
}
else if (currentSegment.getState() == RepairSegment.State.NOT_STARTED) {
LOG.warn("segment {} repair not started, even triggered for run {}, re-triggering now",
currentSegment.getId(), repairRun.getId());
newRepairCommandId = jmxProxy.triggerRepair(currentSegment);
}
else if (currentSegment.getState() == RepairSegment.State.DONE) {
LOG.warn("segment {} repair completed for run {}",
currentSegment.getId(), repairRun.getId());
currentSegment = storage.getNextFreeSegment(repairRun.getId());
if (null == currentSegment) {
LOG.info("no new free segment found for repair run {}", repairRun.getId());
changeCurrentRepairRunState(RepairRun.RunState.DONE);
return;
}

LOG.info("triggering repair on segment {} with start token {} on run id {}",
currentSegment.getId(), currentSegment.getStartToken(), repairRun.getId());
newRepairCommandId = jmxProxy.triggerRepair(currentSegment);
assert repairRun.getState() == RepairRun.RunState.RUNNING : "logical error in run state";
}
}

if (newRepairCommandId > 0) {
RepairSegment updatedSegment = RepairSegment.getCopy(currentSegment,
currentSegment.getState())
.repairCommandId(newRepairCommandId).build(currentSegment.getId());
storage.updateRepairSegment(updatedSegment);
LOG.debug("updated segment {} repair command id to {}",
currentSegment.getId(), currentSegment.getRepairCommandId());
}

// TODO: should sleep time be relative to past performance?
startNextSegmentEarliest = DateTime.now().plusSeconds(5);
}

private void changeCurrentRepairRunState(RepairRun.RunState runState) {
LOG.info("repair run with id {} state change from {} to {}",
repairRun.getId(), repairRun.getState().toString(), runState.toString());
// TODO:
}

private void changeCurrentSegmentState(RepairSegment.State running) {
private void changeCurrentSegmentState(RepairSegment.State state) {
// TODO:
}

Expand Down

0 comments on commit 6ecc29c

Please sign in to comment.