Skip to content

Commit

Permalink
Add endpoints for starting, pausing and resuming a repair run.
Browse files Browse the repository at this point in the history
Because we're just modifying an object, eshr said this should be done
using PUT method. This caused a bit of rework in RepairRunResource.
  • Loading branch information
Radovan Zvoncek committed Jan 20, 2015
1 parent d366f72 commit 02bbf29
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 30 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,8 @@ REST API
* POST /repair_run/{id} (com.spotify.reaper.resources.RepairRunResource)
* Expected query parameters: *None*
* Triggers a repair run identified by the "id" path parameter.

* PUT /repair_run/{id} (com.spotify.reaper.resources.RepairRunResource)
* Expected query parameters:
* *state*: new value for the state of the repair run, e.g. "PAUSED"
* Pauses a repair run identified by the "id" path parameter.
40 changes: 36 additions & 4 deletions bin/spreaper
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class ReaperCaller(object):
r = requests.get(the_url, params=params)
elif http_method == 'POST':
r = requests.post(the_url, params=params)
elif http_method == 'PUT':
r = requests.put(the_url, params=params)
else:
assert False, "invalid HTTP method: {0}".format(http_method)
log.info("HTTP %s return code %s with content of length %s",
Expand All @@ -67,6 +69,10 @@ class ReaperCaller(object):
the_url = urlparse.urljoin(self.base_url, endpoint)
return self._http_req("POST", the_url, params)

def put(self, endpoint, **params):
the_url = urlparse.urljoin(self.base_url, endpoint)
return self._http_req("PUT", the_url, params)

# === Arguments for commands ============================================================


Expand Down Expand Up @@ -139,6 +145,12 @@ def _arguments_for_trigger(parser):
parser.add_argument("run_id", help="ID of the run to trigger")


def _arguments_for_pause(parser):
"""Arguments needed for pausing a repair
"""
parser.add_argument("run_id", help="ID of the run to pause")


def _parse_arguments(command, description, usage=None, extra_arguments=None):
"""Generic argument parsing done by every command
"""
Expand Down Expand Up @@ -166,6 +178,8 @@ Usage: spreaper [<global_args>] <command> [<command_args>]
add-table Register a table for a previously added cluster
repair Create a repair run, optionally triggering it
trigger Start a repair run
pause Pause a repair run
resume Resume a previously paused run
ping Test connectivity to to the Reaper service
"""

Expand Down Expand Up @@ -270,18 +284,36 @@ class ReaperCLI(object):
run_id = json.loads(reply)['id']
print "# run with id={0} created".format(run_id)
if args.trigger:
self._trigger_run(reaper, run_id, args)
self._trigger_run(reaper, run_id, args, "RUNNING")

def trigger(self):
args = _parse_arguments(command='trigger',
description='trigger a new or paused repair run',
extra_arguments=_arguments_for_trigger)
reaper = self.init_reaper(args)
self._trigger_run(reaper, args.id, args)
self._trigger_run(reaper, args.run_id, args, "RUNNING")

def pause(self):
args = _parse_arguments(command='pause',
description='pause a running repair run',
extra_arguments=_arguments_for_pause)
reaper = self.init_reaper(args)
print "# pausing run with id: {0}".format(args.run_id)
reaper.put("repair_run/{0}".format(args.run_id), state="PAUSED")
print "# run paused"

def resume(self):
args = _parse_arguments(command='resume',
description='resume a repair run',
extra_arguments=_arguments_for_pause)
reaper = self.init_reaper(args)
print "# resuming a run with id: {0}".format(args.run_id)
reaper.put("repair_run/{0}".format(args.run_id), state="RUNNING")
print "# run resumed"

def _trigger_run(self, reaper, run_id, args):
def _trigger_run(self, reaper, run_id, args, st):
print "# triggering run with id: {0}".format(run_id)
reaper.post("repair_run/{0}".format(run_id), owner=args.owner, cause=args.cause)
reaper.put("repair_run/{0}".format(run_id), owner=args.owner, cause=args.cause, state=st)
print "# run triggered"


Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/spotify/reaper/core/RepairRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public Builder(String clusterName, long columnFamilyId, DateTime creationTime,
this.intensity = intensity;
}

private Builder(RepairRun original) {
public Builder(RepairRun original) {
clusterName = original.clusterName;
columnFamilyId = original.columnFamilyId;
runState = original.runState;
Expand Down
98 changes: 82 additions & 16 deletions src/main/java/com/spotify/reaper/resources/RepairRunResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.spotify.reaper.service.SegmentGenerator;
import com.spotify.reaper.storage.IStorage;

import org.apache.cassandra.db.Column;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,6 +47,7 @@

import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
Expand Down Expand Up @@ -126,35 +128,99 @@ public Response addRepairRun(
}

/**
* Triggers an orchestration of a repair run.
* @return 201 if all goes well, 500 in case of errors.
* Modifies a state of the repair run.
*
* Currently supports NOT_STARTED|PAUSED -> RUNNING and RUNNING -> PAUSED.
* @return OK if all goes well NOT_MODIFIED if new state is the same as the old one,
* and 501 (NOT_IMPLEMENTED) if transition is not supported.
*/
@POST
@PUT
@Path("/{id}")
public Response triggerRepairRun(
@Context UriInfo uriInfo,
@PathParam("id") Long repairRunId) {
public Response modifyRunState(
@Context UriInfo uriInfo,
@PathParam("id") Long repairRunId,
@QueryParam("state") Optional<String> state) {

LOG.info("trigger repair run called with: runId = {}", repairRunId);
LOG.info("pause repair run called with: runId = {}", repairRunId);

if (!state.isPresent()) {
return Response.status(Response.Status.BAD_REQUEST.getStatusCode())
.entity("New state not specified")
.build();
}

try {
RepairRun repairRun = fetchRepairRun(repairRunId);
// TODO(zvo): this prevents PAUSED runs to resume, will fix later
if (repairRun.getRunState() != RepairRun.RunState.NOT_STARTED) {
throw new ReaperException(String.format("Repair run \"%d\" already running", repairRunId));
ColumnFamily table = storage.getColumnFamily(repairRun.getColumnFamilyId());
RepairRun.RunState newState = RepairRun.RunState.valueOf(state.get());
RepairRun.RunState oldState = repairRun.getRunState();

if (oldState == newState) {
return Response.status(Response.Status.NOT_MODIFIED).build();
}
ColumnFamily table = getTable(repairRun.getColumnFamilyId());
RepairRunner.startNewRepairRun(storage, repairRun.getId(), jmxFactory);
return Response.created(buildRepairRunURI(uriInfo, repairRun))
.entity(new RepairRunStatus(repairRun, table))
.build();

if (isStarting(oldState, newState)) {
return startRun(repairRun, table);
}
if (isPausing(oldState, newState)) {
return pauseRun(repairRun, table);
}
if (isResuming(oldState, newState)) {
return resumeRun(repairRun, table);
}
String errMsg = String.format("Transition %s->%s not supported.", newState.toString(),
oldState.toString());
LOG.error(errMsg);
return Response.status(501).entity(errMsg).build();
} catch (ReaperException e) {
LOG.error(e.getMessage());
e.printStackTrace();
return Response.status(500).entity(e.getMessage()).build();
return Response.status(Response.Status.NOT_FOUND).entity(e.getMessage()).build();
}
}

private boolean isStarting(RepairRun.RunState oldState, RepairRun.RunState newState) {
return oldState == RepairRun.RunState.NOT_STARTED && newState == RepairRun.RunState.RUNNING;
}

private boolean isPausing(RepairRun.RunState oldState, RepairRun.RunState newState) {
return oldState == RepairRun.RunState.RUNNING && newState == RepairRun.RunState.PAUSED;
}

private boolean isResuming(RepairRun.RunState oldState, RepairRun.RunState newState) {
return oldState == RepairRun.RunState.PAUSED && newState == RepairRun.RunState.RUNNING;
}

private Response startRun(RepairRun repairRun, ColumnFamily table) {
LOG.info("Starting run {}", repairRun.getId());
RepairRun updatedRun = new RepairRun.Builder(repairRun)
.runState(RepairRun.RunState.RUNNING)
.startTime(DateTime.now())
.build(repairRun.getId());
storage.updateRepairRun(updatedRun);
RepairRunner.startNewRepairRun(storage, repairRun.getId(), jmxFactory);
return Response.status(Response.Status.OK).entity(new RepairRunStatus(repairRun, table))
.build();
}

private Response pauseRun(RepairRun repairRun, ColumnFamily table) {
LOG.info("Pausing run {}", repairRun.getId());
RepairRun updatedRun = new RepairRun.Builder(repairRun)
.runState(RepairRun.RunState.PAUSED)
.build(repairRun.getId());
storage.updateRepairRun(updatedRun);
return Response.ok().entity(new RepairRunStatus(repairRun, table)).build();
}

private Response resumeRun(RepairRun repairRun, ColumnFamily table) {
LOG.info("Resuming run {}", repairRun.getId());
RepairRun updatedRun = new RepairRun.Builder(repairRun)
.runState(RepairRun.RunState.RUNNING)
.build(repairRun.getId());
storage.updateRepairRun(updatedRun);
return Response.ok().entity(new RepairRunStatus(repairRun, table)).build();
}

/**
* @return detailed information about a repair run.
*/
Expand Down
1 change: 0 additions & 1 deletion src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public static void startNewRepairRun(IStorage storage, long repairRunID,
}
}


private final IStorage storage;
private final long repairRunId;
private final JmxConnectionFactory jmxConnectionFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,10 @@ public void testTriggerRepairRun() throws Exception {
long runId = repairRunStatus.getId();

DateTimeUtils.setCurrentMillisFixed(TIME_START);
response = resource.triggerRepairRun(uriInfo, runId);
Optional<String> newState = Optional.of(RepairRun.RunState.RUNNING.toString());
response = resource.modifyRunState(uriInfo, runId, newState);

assertEquals(201, response.getStatus());
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
assertTrue(response.getEntity() instanceof RepairRunStatus);
// the thing we get as a reply from the endpoint is a not started run. This is because the
// executor didn't have time to start the run
Expand All @@ -176,8 +177,9 @@ public void testTriggerRepairRun() throws Exception {
@Test
public void testTriggerNotExistingRun() {
RepairRunResource resource = new RepairRunResource(config, storage, factory);
Response response = resource.triggerRepairRun(uriInfo, 42l);
assertEquals(500, response.getStatus());
Optional<String> newState = Optional.of(RepairRun.RunState.RUNNING.toString());
Response response = resource.modifyRunState(uriInfo, 42l, newState);
assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus());
assertTrue(response.getEntity() instanceof String);
assertTrue(response.getEntity().toString().contains("not found"));
}
Expand All @@ -193,11 +195,11 @@ public void testTriggerAlreadyRunningRun() throws InterruptedException {
long runId = repairRunStatus.getId();

DateTimeUtils.setCurrentMillisFixed(TIME_START);
resource.triggerRepairRun(uriInfo, runId);
Optional<String> newState = Optional.of(RepairRun.RunState.RUNNING.toString());
resource.modifyRunState(uriInfo, runId, newState);
Thread.sleep(1000);
response = resource.triggerRepairRun(uriInfo, runId);
assertEquals(500, response.getStatus());
assertTrue(response.getEntity().toString().contains("already running"));
response = resource.modifyRunState(uriInfo, runId, newState);
assertEquals(Response.Status.NOT_MODIFIED.getStatusCode(), response.getStatus());
}

@Test
Expand Down Expand Up @@ -232,4 +234,65 @@ public void testTriggerRunMissingArgument() {
assertTrue(response.getEntity() instanceof String);
assertTrue(response.getEntity().toString().contains("argument missing"));
}

@Test
public void testPauseRunningRun() throws InterruptedException {
// first trigger a run
DateTimeUtils.setCurrentMillisFixed(TIME_CREATE);
RepairRunner.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S, TimeUnit.SECONDS, RETRY_DELAY_S, TimeUnit.SECONDS);
RepairRunResource resource = new RepairRunResource(config, storage, factory);
Response response = resource.addRepairRun(uriInfo, CLUSTER_NAME, KEYSPACE, TABLE, OWNER,
Optional.<String>absent());
RepairRunStatus repairRunStatus = (RepairRunStatus) response.getEntity();
long runId = repairRunStatus.getId();
DateTimeUtils.setCurrentMillisFixed(TIME_START);
Optional<String> newState = Optional.of(RepairRun.RunState.RUNNING.toString());
resource.modifyRunState(uriInfo, runId, newState);

Thread.sleep(200);

// now pause it
response = resource.modifyRunState(uriInfo, runId,
Optional.of(RepairRun.RunState.PAUSED.toString()));
Thread.sleep(200);

assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
RepairRun repairRun = storage.getRepairRun(runId);
// the run should be paused
assertEquals(RepairRun.RunState.PAUSED, repairRun.getRunState());
// but the running segment should be untouched
assertEquals(1, storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.RUNNING));
}

@Test
public void testPauseNotRunningRun() throws InterruptedException {
DateTimeUtils.setCurrentMillisFixed(TIME_CREATE);
RepairRunner.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S, TimeUnit.SECONDS, RETRY_DELAY_S, TimeUnit.SECONDS);
RepairRunResource resource = new RepairRunResource(config, storage, factory);
Response response = resource.addRepairRun(uriInfo, CLUSTER_NAME, KEYSPACE, TABLE, OWNER,
Optional.<String>absent());
RepairRunStatus repairRunStatus = (RepairRunStatus) response.getEntity();
long runId = repairRunStatus.getId();

response = resource.modifyRunState(uriInfo, runId,
Optional.of(RepairRun.RunState.PAUSED.toString()));
Thread.sleep(200);

assertEquals(501, response.getStatus());
RepairRun repairRun = storage.getRepairRun(runId);
// the run should be paused
assertEquals(RepairRun.RunState.NOT_STARTED, repairRun.getRunState());
// but the running segment should be untouched
assertEquals(0, storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.RUNNING));
}

@Test
public void testPauseNotExistingRun() throws InterruptedException {
RepairRunResource resource = new RepairRunResource(config, storage, factory);
Response response = resource.modifyRunState(uriInfo, 42l,
Optional.of(RepairRun.RunState.PAUSED.toString()));
assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus());
assertEquals(0, storage.getAllRunningRepairRuns().size());
}

}

0 comments on commit 02bbf29

Please sign in to comment.