Skip to content

Commit

Permalink
Reimplement RepairRunner using SegmentRunner
Browse files Browse the repository at this point in the history
A few things are not that great yet:
* The tests will pass even if some assertions fail, because those assertions run on other threads.
* The tests also need better code reuse; they're too big now.
* The RepairRunnerTest.testHangingRepair test is particularly problematic, because the repair run executes on its own thread, which forced me to add a sleep before checking the result. The conclusion is that startNewRepairRun should block.
* Exceptions from the SegmentRunner aren't handled.
  • Loading branch information
Bj0rnen committed Jan 19, 2015
1 parent e0c2bc5 commit 555ffdc
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 251 deletions.
4 changes: 3 additions & 1 deletion src/main/java/com/spotify/reaper/ReaperApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import sun.misc.Signal;
import sun.misc.SignalHandler;

import java.util.concurrent.TimeUnit;

import io.dropwizard.Application;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
Expand Down Expand Up @@ -68,7 +70,7 @@ public void run(ReaperApplicationConfiguration config,

LOG.info("initializing runner thread pool with {} threads", config.getRepairRunThreadCount());
RepairRunner.initializeThreadPool(config.getRepairRunThreadCount(),
config.getHangingRepairTimeoutMins() * 60);
config.getHangingRepairTimeoutMins(), TimeUnit.MINUTES);

LOG.info("initializing storage of type: {}", config.getStorageType());
IStorage storage = initializeStorage(config, environment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public class JmxConnectionFactory {

public JmxProxy create(String host) throws ReaperException {
public final JmxProxy create(String host) throws ReaperException {
return create(Optional.<RepairStatusHandler>absent(), host);
}

Expand All @@ -32,7 +32,7 @@ public JmxProxy create(Optional<RepairStatusHandler> handler, String host)
return JmxProxy.connect(handler, host);
}

public JmxProxy connectAny(Optional<RepairStatusHandler> handler, Collection<String> hosts)
public final JmxProxy connectAny(Optional<RepairStatusHandler> handler, Collection<String> hosts)
throws ReaperException {
return create(handler, hosts.iterator().next());
}
Expand Down
222 changes: 40 additions & 182 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,15 @@
import com.spotify.reaper.core.RepairSegment;
import com.spotify.reaper.storage.IStorage;

import org.apache.cassandra.service.ActiveRepairService;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* A repair runner that only triggers one segment repair at a time.
*/
public class RepairRunner implements Runnable {
// TODO: test
// TODO: handle failed storage updates
Expand All @@ -47,11 +42,11 @@ public class RepairRunner implements Runnable {
private static final int JMX_FAILURE_SLEEP_DELAY_SECONDS = 30;

private static ScheduledExecutorService executor = null;
private static int repairTimeoutSecs;
private static long repairTimeoutMillis;

public static void initializeThreadPool(int threadAmount, int repairTimeoutSecs) {
public static void initializeThreadPool(int threadAmount, long repairTimeout, TimeUnit timeUnit) {
executor = Executors.newScheduledThreadPool(threadAmount);
RepairRunner.repairTimeoutSecs = repairTimeoutSecs;
RepairRunner.repairTimeoutMillis = timeUnit.toMillis(repairTimeout);
}

public static void startNewRepairRun(IStorage storage, long repairRunID,
Expand All @@ -73,12 +68,6 @@ public static void startNewRepairRun(IStorage storage, long repairRunID,
private final JmxConnectionFactory jmxConnectionFactory;
private JmxProxy jmxConnection;

// These fields are only set when a segment is being repaired.
// TODO: bundle them into a class?
private ScheduledFuture<?> repairTimeout = null;
private int currentCommandId = -1;
private long currentSegmentId = -1;

@VisibleForTesting
RepairRunner(IStorage storage, long repairRunId, JmxConnectionFactory jmxConnectionFactory)
throws ReaperException {
Expand All @@ -94,6 +83,9 @@ public static void startNewRepairRun(IStorage storage, long repairRunID,
*/
@Override
public void run() {
// TODO: just like SegmentRunner, RepairRunner should probably be blocking.
// TODO: the best way to do that is probably to remove the Runnable interface and do everything
// TODO: in a while loop.
RepairRun.RunState state = storage.getRepairRun(repairRunId).getRunState();
LOG.debug("run() called for repair run #{} with run state {}", repairRunId, state);
switch (state) {
Expand Down Expand Up @@ -142,47 +134,23 @@ private void end() {
* as NOT_STARTED to queue it up for a retry.
*/
private void startNextSegment() {
RepairSegment running = storage.getTheRunningSegment(repairRunId);
if (running != null) {
abortSegment(running);
startNextSegment();
// Currently not allowing parallel repairs.
assert storage.getSegmentAmountForRepairRun(repairRunId, RepairSegment.State.RUNNING) == 0;
RepairSegment next = storage.getNextFreeSegment(repairRunId);
if (next != null) {
doRepairSegment(next.getId(), next.getTokenRange());
} else {
assert !repairIsTriggered();
RepairSegment next = storage.getNextFreeSegment(repairRunId);
if (next != null) {
doRepairSegment(next);
} else {
end();
}
end();
}
}

/**
* Set the running repair segment back to NOT_STARTED.
*
* @param runningSegment the running repair segment.
*/
public void abortSegment(RepairSegment runningSegment) {
// TODO: actually abort the repair. (runningSegment.getRepairCommandId() should be set)
assert runningSegment.getRepairCommandId() != null;
LOG.debug("Aborting repair with and segmentId {} in repair run #{}",
runningSegment.getId(), repairRunId);
storage.updateRepairSegment(runningSegment.with()
.startTime(null)
.repairCommandId(null)
.state(RepairSegment.State.NOT_STARTED)
.build(runningSegment.getId()));
}

/**
* Start the repair of a segment.
*
* @param next the segment to repair.
* @param segmentId id of the segment to repair.
* @param tokenRange token range of the segment to repair.
*/
private synchronized void doRepairSegment(RepairSegment next) {
// TODO: directly store the right host to contact per segment (or per run, if we guarantee that
// TODO: one host can coordinate all repair segments).

private void doRepairSegment(long segmentId, RingRange tokenRange) {
ColumnFamily columnFamily =
storage.getColumnFamily(storage.getRepairRun(repairRunId).getColumnFamilyId());
String keyspace = columnFamily.getKeyspaceName();
Expand All @@ -202,9 +170,7 @@ private synchronized void doRepairSegment(RepairSegment next) {
LOG.info("successfully reestablished JMX proxy for repair runner on run id: {}", repairRunId);
}


List<String> potentialCoordinators = jmxConnection
.tokenRangeToEndpoint(keyspace, storage.getNextFreeSegment(repairRunId).getTokenRange());
List<String> potentialCoordinators = jmxConnection.tokenRangeToEndpoint(keyspace, tokenRange);
if (potentialCoordinators == null) {
// This segment has a faulty token range. Abort the entire repair run.
RepairRun repairRun = storage.getRepairRun(repairRunId);
Expand All @@ -214,151 +180,43 @@ private synchronized void doRepairSegment(RepairSegment next) {
return;
}

// Connect to a node that can act as coordinator for the new repair.
try {
jmxConnection.close();
jmxConnection = jmxConnectionFactory.connectAny(Optional.<RepairStatusHandler>of(
new RepairStatusHandler() {
@Override
public void handle(int repairNumber, ActiveRepairService.Status status,
String message) {
RepairOutcome outcome;
switch (status) {
case STARTED:
outcome = RepairOutcome.STARTED;
break;
case SESSION_FAILED:
outcome = RepairOutcome.FAILED;
break;
case FINISHED:
outcome = RepairOutcome.FINISHED;
break;
default:
// Do nothing, wait for FINISHED.
return;
}
handleRepairOutcome(repairNumber, outcome, message);
}
}), potentialCoordinators);
SegmentRunner.triggerRepair(storage, segmentId, potentialCoordinators, repairTimeoutMillis,
jmxConnectionFactory);
} catch (ReaperException e) {
e.printStackTrace();
LOG.warn("Failed to connect to a coordinator node for next repair in runner #{}, "
+ "reattempting in {} seconds", repairRunId, JMX_FAILURE_SLEEP_DELAY_SECONDS);
executor.schedule(this, JMX_FAILURE_SLEEP_DELAY_SECONDS, TimeUnit.SECONDS);
return;
} catch (InterruptedException e) {
e.printStackTrace();
}

currentSegmentId = next.getId();
// TODO: ensure that no repair is already running (abort all repairs)
currentCommandId = jmxConnection
.triggerRepair(next.getStartToken(), next.getEndToken(), keyspace, columnFamily.getName());
repairTimeout = executor.schedule(new Runnable() {
@Override
public void run() {
handleRepairOutcome(currentCommandId, RepairOutcome.TIMEOUT,
"[Reaper] Repair command timed out");
}
}, repairTimeoutSecs, TimeUnit.SECONDS);
LOG.debug("Triggered repair with command id {}", currentCommandId);
LOG.info("Repair for segment {} started in repair run #{}", currentSegmentId, repairRunId);
storage.updateRepairSegment(next.with()
.state(RepairSegment.State.RUNNING)
.repairCommandId(currentCommandId)
.build(currentSegmentId));
handleResult(segmentId);
}


public static enum RepairOutcome {
STARTED, FINISHED, FAILED, TIMEOUT
}

/**
* Called when there is an event coming either from JMX or this runner regarding on-going
* repairs.
*
* @param repairCommandId repair sequence number, obtained when triggering a repair
* @param outcome new status of the repair (STARTED, FINISHED, FAILED, TIMEOUT)
* @param message additional information about the repair
*/
public synchronized void handleRepairOutcome(int repairCommandId, RepairOutcome outcome,
String message) {
LOG.debug(
"handleRepairOutcome called for repair run #{}, repairCommandId {}, outcome {} and message: {}",
repairRunId, repairCommandId, outcome, message);
if (repairCommandId != currentCommandId) {
LOG.warn("Repair run id != current command id. {} != {}", repairCommandId, currentCommandId);
// This can be reached if timeout happens while finished repair is being handled, or vice
// versa. Since this method is synchronized, only one will get through.

// Another cause for getting here is other repairs running on the node than what this runner
// has initiated.
return;
}

if (repairIsTriggered()) {
RepairSegment currentSegment = storage.getRepairSegment(currentSegmentId);
// See status explanations from: https://wiki.apache.org/cassandra/RepairAsyncAPI
switch (outcome) {
case STARTED:
DateTime now = DateTime.now();
storage.updateRepairSegment(currentSegment.with()
.startTime(now)
.build(currentSegmentId));
// We already set the state of the segment to RUNNING.
break;
case FINISHED: {
RepairSegment updatedSegment = currentSegment.with()
.state(RepairSegment.State.DONE)
.endTime(DateTime.now())
.build(currentSegmentId);
storage.updateRepairSegment(updatedSegment);
closeRepairCommand();
executor.schedule(this, intensityBasedDelayMillis(updatedSegment), TimeUnit.MILLISECONDS);
}
private void handleResult(long segmentId) {
RepairSegment segment = storage.getRepairSegment(segmentId);
RepairSegment.State state = segment.getState();
LOG.debug("In repair run #{}, triggerRepair on segment {} terminated with state {}",
repairRunId, segmentId, state);
switch (state) {
case NOT_STARTED:
// Repair timed out
executor.submit(this);
break;
case FAILED: {
// TODO: Bj0rn: How should we handle this? Here, it's almost treated like a success.
RepairSegment updatedSegment = currentSegment.with()
.state(RepairSegment.State.ERROR)
.endTime(DateTime.now())
.build(currentSegmentId);
storage.updateRepairSegment(updatedSegment);
closeRepairCommand();
executor.schedule(this, intensityBasedDelayMillis(updatedSegment), TimeUnit.MILLISECONDS);
}
case DONE:
// Successful repair
executor.schedule(this, intensityBasedDelayMillis(segment), TimeUnit.MILLISECONDS);
break;
case TIMEOUT: {
closeRepairCommand();
abortSegment(currentSegment);
executor.submit(this);
}
case ERROR:
// Unsuccessful repair
executor.schedule(this, intensityBasedDelayMillis(segment), TimeUnit.MILLISECONDS);
break;
}
case RUNNING:
// Another thread has started a new repair on this segment already
// Or maybe the same repair segment id should never be re-run in which case this is an error
executor.submit(this);
}
}

/**
* @return <code>true</code> if this RepairRunner has triggered a repair and is currently waiting
* for a repair status notification from JMX.
*/
boolean repairIsTriggered() {
return repairTimeout != null;
}

/**
* Stop countdown for repair, and stop listening for JMX notifications for the current repair.
*/
void closeRepairCommand() {
LOG.debug("Closing repair command with commandId {} and segmentId {} in repair run #{}",
currentCommandId, currentSegmentId, repairRunId);
assert repairTimeout != null;

repairTimeout.cancel(false);
repairTimeout = null;
currentCommandId = -1;
currentSegmentId = -1;
}

/**
* Calculate the delay that should be used before starting the next repair segment.
*
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ private SegmentRunner(IStorage storage, long segmentId, Collection<String> poten
this.storage = storage;
this.segmentId = segmentId;

// TODO: don't trigger the repair in the constructor. Requires commandId to be mutable
// TODO: don't trigger the repair in the constructor. The change will force commandId to be
// TODO: mutable, but that's better than this.
synchronized (this) {
jmxConnection = jmxConnectionFactory.connectAny(Optional.<RepairStatusHandler>of(this), potentialCoordinators);

Expand All @@ -59,6 +60,7 @@ private SegmentRunner(IStorage storage, long segmentId, Collection<String> poten
storage.getColumnFamily(segment.getColumnFamilyId());
String keyspace = columnFamily.getKeyspaceName();

assert !segment.getState().equals(RepairSegment.State.RUNNING);
commandId = jmxConnection
.triggerRepair(segment.getStartToken(), segment.getEndToken(), keyspace,
columnFamily.getName());
Expand Down
Loading

0 comments on commit 555ffdc

Please sign in to comment.