Skip to content

Commit

Permalink
Merge pull request #75 from spotify/bj0rn/InformativeAPI
Browse files Browse the repository at this point in the history
Bj0rn/informative api
  • Loading branch information
Bj0rnen committed Mar 17, 2015
2 parents 34a5f05 + 8b6ff4a commit 6072838
Show file tree
Hide file tree
Showing 14 changed files with 349 additions and 211 deletions.
90 changes: 22 additions & 68 deletions src/main/java/com/spotify/reaper/resources/ClusterResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
package com.spotify.reaper.resources;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import com.spotify.reaper.AppContext;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.cassandra.JmxProxy;
import com.spotify.reaper.core.Cluster;
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.resources.view.ClusterStatus;
import com.spotify.reaper.resources.view.KeyspaceStatus;
import com.spotify.reaper.resources.view.RepairRunStatus;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -71,29 +69,27 @@ public Response getClusterList() {

@GET
@Path("/{cluster_name}")
public Response getCluster(@PathParam("cluster_name") String clusterName) {
public Response getCluster(
@PathParam("cluster_name") String clusterName,
@QueryParam("limit") Optional<Integer> limit) {
LOG.info("get cluster called with cluster_name: {}", clusterName);
Optional<Cluster> cluster = context.storage.getCluster(clusterName);
if (cluster.isPresent()) {
return viewCluster(cluster.get(), Optional.<URI>absent());
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity("cluster with name \"" + clusterName + "\" not found").build();
}
return viewCluster(clusterName, limit, Optional.<URI>absent());
}

@GET
@Path("/{cluster_name}/{keyspace_name}")
public Response getCluster(@PathParam("cluster_name") String clusterName,
@PathParam("keyspace_name") String keyspaceName) {
LOG.info("get cluster/keyspace called with cluster_name: {}, and keyspace_name: {}",
clusterName, keyspaceName);
Optional<Cluster> cluster = context.storage.getCluster(clusterName);
if (cluster.isPresent()) {
return viewKeyspace(cluster.get(), keyspaceName);
} else {
private Response viewCluster(String clusterName, Optional<Integer> limit,
Optional<URI> createdURI) {
ClusterStatus view =
new ClusterStatus(context.storage.getClusterRunStatuses(clusterName, limit.or(10)));

if (view.repairRuns == null) {
return Response.status(Response.Status.NOT_FOUND)
.entity("cluster with name \"" + clusterName + "\" not found").build();
} else if (createdURI.isPresent()) {
return Response.created(createdURI.get())
.entity(view).build();
} else {
return Response.ok()
.entity(view).build();
}
}

Expand Down Expand Up @@ -127,15 +123,15 @@ public Response addCluster(

URI createdURI;
try {
createdURI = (new URL(uriInfo.getAbsolutePath().toURL(), newCluster.getName())).toURI();
createdURI = new URL(uriInfo.getAbsolutePath().toURL(), newCluster.getName()).toURI();
} catch (Exception e) {
String errMsg = "failed creating target URI for cluster: " + newCluster.getName();
LOG.error(errMsg);
e.printStackTrace();
return Response.status(400).entity(errMsg).build();
}

return viewCluster(newCluster, Optional.of(createdURI));
return viewCluster(newCluster.getName(), Optional.<Integer>absent(), Optional.of(createdURI));
}

public Cluster createClusterWithSeedHost(String seedHost)
Expand All @@ -153,49 +149,6 @@ public Cluster createClusterWithSeedHost(String seedHost)
return new Cluster(clusterName, partitioner, Collections.singleton(seedHost));
}

private Response viewCluster(Cluster cluster, Optional<URI> createdURI) {
ClusterStatus view = new ClusterStatus(cluster);
Collection<Collection<Object>> runIdTuples = Lists.newArrayList();
for (Long repairRunId : context.storage.getRepairRunIdsForCluster(cluster.getName())) {
Optional<RepairRun> repairRun = context.storage.getRepairRun(repairRunId);
if (repairRun.isPresent()) {
runIdTuples
.add(Lists.newArrayList(new Object[]{repairRunId, repairRun.get().getRunState()}));
}
}
view.setRepairRunIds(runIdTuples);
try (JmxProxy jmx = context.jmxConnectionFactory.connectAny(cluster)) {
view.setKeyspaces(jmx.getKeyspaces());
} catch (ReaperException e) {
e.printStackTrace();
LOG.error("failed connecting JMX", e);
return Response.status(500).entity("failed connecting given clusters JMX endpoint").build();
}
if (createdURI.isPresent()) {
return Response.created(createdURI.get()).entity(view).build();
} else {
return Response.ok().entity(view).build();
}
}

private Response viewKeyspace(Cluster cluster, String keyspaceName) {
KeyspaceStatus view = new KeyspaceStatus(cluster);
try (JmxProxy jmx = context.jmxConnectionFactory.connectAny(cluster)) {
if (jmx.getKeyspaces().contains(keyspaceName)) {
view.setTables(jmx.getTableNamesForKeyspace(keyspaceName));
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity("cluster with name \"" + cluster.getName() + "\" does not contain keyspace \""
+ keyspaceName + "\"").build();
}
} catch (ReaperException e) {
e.printStackTrace();
LOG.error("failed connecting JMX", e);
return Response.status(500).entity("failed connecting given clusters JMX endpoint").build();
}
return Response.ok().entity(view).build();
}

/**
* Delete a Cluster object with given name.
*
Expand All @@ -207,7 +160,8 @@ private Response viewKeyspace(Cluster cluster, String keyspaceName) {
*/
@DELETE
@Path("/{cluster_name}")
public Response deleteCluster(@PathParam("cluster_name") String clusterName) {
public Response deleteCluster(
@PathParam("cluster_name") String clusterName) {
LOG.info("delete cluster called with clusterName: {}", clusterName);
Optional<Cluster> clusterToDelete = context.storage.getCluster(clusterName);
if (!clusterToDelete.isPresent()) {
Expand All @@ -226,7 +180,7 @@ public Response deleteCluster(@PathParam("cluster_name") String clusterName) {
}
Optional<Cluster> deletedCluster = context.storage.deleteCluster(clusterName);
if (deletedCluster.isPresent()) {
return Response.ok().entity(new ClusterStatus(deletedCluster.get())).build();
return Response.ok(new ClusterStatus(Collections.<RepairRunStatus>emptyList())).build();
}
return Response.serverError().entity("delete failed for schedule with name \""
+ clusterName + "\"").build();
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/com/spotify/reaper/resources/CommonTools.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.List;
import java.util.Set;

import javax.annotation.Nullable;

import static com.google.common.base.Preconditions.checkNotNull;

public class CommonTools {
Expand Down Expand Up @@ -245,7 +247,8 @@ public static RepairUnit getNewOrExistingRepairUnit(AppContext context, Cluster
return theRepairUnit;
}

public static String dateTimeToISO8601(DateTime dateTime) {
@Nullable
public static String dateTimeToISO8601(@Nullable DateTime dateTime) {
if (null == dateTime) {
return null;
}
Expand Down
51 changes: 28 additions & 23 deletions src/main/java/com/spotify/reaper/resources/RepairRunResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public Response addRepairRun(
parallelism, intensity);

return Response.created(buildRepairRunURI(uriInfo, newRepairRun))
.entity(new RepairRunStatus(newRepairRun, theRepairUnit)).build();
.entity(new RepairRunStatus(newRepairRun, theRepairUnit, 0)).build();

} catch (ReaperException e) {
LOG.error(e.getMessage());
Expand Down Expand Up @@ -235,15 +235,17 @@ public Response modifyRunState(
.build();
}

Optional<RepairUnit>
repairUnit =
Optional<RepairUnit> repairUnit =
context.storage.getRepairUnit(repairRun.get().getRepairUnitId());
if (!repairUnit.isPresent()) {
String errMsg = "repair unit with id " + repairRun.get().getRepairUnitId() + " not found";
LOG.error(errMsg);
return Response.status(Response.Status.NOT_FOUND).entity(errMsg).build();
}

int segmentsRepaired =
context.storage.getSegmentAmountForRepairRunWithState(repairRunId, RepairSegment.State.DONE);

RepairRun.RunState newState;
try {
newState = RepairRun.RunState.valueOf(state.get().toUpperCase());
Expand All @@ -258,11 +260,11 @@ public Response modifyRunState(
}

if (isStarting(oldState, newState)) {
return startRun(repairRun.get(), repairUnit.get());
return startRun(repairRun.get(), repairUnit.get(), segmentsRepaired);
} else if (isPausing(oldState, newState)) {
return pauseRun(repairRun.get(), repairUnit.get());
return pauseRun(repairRun.get(), repairUnit.get(), segmentsRepaired);
} else if (isResuming(oldState, newState)) {
return resumeRun(repairRun.get(), repairUnit.get());
return resumeRun(repairRun.get(), repairUnit.get(), segmentsRepaired);
} else {
String errMsg = String.format("Transition %s->%s not supported.", oldState.toString(),
newState.toString());
Expand All @@ -283,23 +285,24 @@ private boolean isResuming(RepairRun.RunState oldState, RepairRun.RunState newSt
return oldState == RepairRun.RunState.PAUSED && newState == RepairRun.RunState.RUNNING;
}

private Response startRun(RepairRun repairRun, RepairUnit repairUnit) {
private Response startRun(RepairRun repairRun, RepairUnit repairUnit, int segmentsRepaired) {
LOG.info("Starting run {}", repairRun.getId());
RepairRun newRun = context.repairManager.startRepairRun(context, repairRun);
return Response.status(Response.Status.OK).entity(new RepairRunStatus(newRun, repairUnit))
return Response.status(Response.Status.OK).entity(
new RepairRunStatus(newRun, repairUnit, segmentsRepaired))
.build();
}

private Response pauseRun(RepairRun repairRun, RepairUnit repairUnit) {
private Response pauseRun(RepairRun repairRun, RepairUnit repairUnit, int segmentsRepaired) {
LOG.info("Pausing run {}", repairRun.getId());
RepairRun newRun = context.repairManager.pauseRepairRun(context, repairRun);
return Response.ok().entity(new RepairRunStatus(newRun, repairUnit)).build();
return Response.ok().entity(new RepairRunStatus(newRun, repairUnit, segmentsRepaired)).build();
}

private Response resumeRun(RepairRun repairRun, RepairUnit repairUnit) {
private Response resumeRun(RepairRun repairRun, RepairUnit repairUnit, int segmentsRepaired) {
LOG.info("Resuming run {}", repairRun.getId());
RepairRun newRun = context.repairManager.startRepairRun(context, repairRun);
return Response.ok().entity(new RepairRunStatus(newRun, repairUnit)).build();
return Response.ok().entity(new RepairRunStatus(newRun, repairUnit, segmentsRepaired)).build();
}

/**
Expand Down Expand Up @@ -339,13 +342,10 @@ public Response getRepairRunsForCluster(@PathParam("cluster_name") String cluste
private RepairRunStatus getRepairRunStatus(RepairRun repairRun) {
Optional<RepairUnit> repairUnit = context.storage.getRepairUnit(repairRun.getRepairUnitId());
assert repairUnit.isPresent() : "no repair unit found with id: " + repairRun.getRepairUnitId();
RepairRunStatus repairRunStatus = new RepairRunStatus(repairRun, repairUnit.get());
if (repairRun.getRunState() != RepairRun.RunState.NOT_STARTED) {
int segmentsRepaired =
context.storage.getSegmentAmountForRepairRun(repairRun.getId(), RepairSegment.State.DONE);
repairRunStatus.setSegmentsRepaired(segmentsRepaired);
}
return repairRunStatus;
int segmentsRepaired =
context.storage.getSegmentAmountForRepairRunWithState(repairRun.getId(),
RepairSegment.State.DONE);
return new RepairRunStatus(repairRun, repairUnit.get(), segmentsRepaired);
}

/**
Expand Down Expand Up @@ -389,8 +389,11 @@ public Response listRepairRuns(@QueryParam("state") Optional<String> state) {
continue;
}
Optional<RepairUnit> runsUnit = context.storage.getRepairUnit(run.getRepairUnitId());
int segmentsRepaired =
context.storage.getSegmentAmountForRepairRunWithState(run.getId(),
RepairSegment.State.DONE);
if (runsUnit.isPresent()) {
runStatuses.add(new RepairRunStatus(run, runsUnit.get()));
runStatuses.add(new RepairRunStatus(run, runsUnit.get(), segmentsRepaired));
} else {
String errMsg =
String.format("Found repair run %d with no associated repair unit", run.getId());
Expand Down Expand Up @@ -454,18 +457,20 @@ public Response deleteRepairRun(@PathParam("id") Long runId,
"Repair run with id \"" + runId + "\" is not owned by the user you defined: "
+ owner.get()).build();
}
if (context.storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.RUNNING) > 0) {
if (context.storage.getSegmentAmountForRepairRunWithState(runId, RepairSegment.State.RUNNING) > 0) {
return Response.status(Response.Status.FORBIDDEN).entity(
"Repair run with id \"" + runId
+ "\" has a running segment, which must be waited to finish before deleting").build();
}
// Need to get the RepairUnit before it's possibly deleted.
Optional<RepairUnit> unitPossiblyDeleted =
context.storage.getRepairUnit(runToDelete.get().getRepairUnitId());
int segmentsRepaired =
context.storage.getSegmentAmountForRepairRunWithState(runId, RepairSegment.State.DONE);
Optional<RepairRun> deletedRun = context.storage.deleteRepairRun(runId);
if (deletedRun.isPresent()) {
RepairRunStatus repairRunStatus = new RepairRunStatus(deletedRun.get(),
unitPossiblyDeleted.get());
RepairRunStatus repairRunStatus =
new RepairRunStatus(deletedRun.get(), unitPossiblyDeleted.get(), segmentsRepaired);
return Response.ok().entity(repairRunStatus).build();
}
return Response.serverError().entity("delete failed for repair run with id \""
Expand Down
46 changes: 2 additions & 44 deletions src/main/java/com/spotify/reaper/resources/view/ClusterStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,15 @@
package com.spotify.reaper.resources.view;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.spotify.reaper.core.Cluster;

import java.util.Collection;

/**
* Contains the data to be shown when querying cluster status.
*/
public class ClusterStatus {

@JsonProperty("cluster_name")
private final String clusterName;

@JsonProperty()
private final String partitioner;

@JsonProperty("seed_hosts")
private final Collection<String> seedHosts;

@JsonProperty("repair_runs")
private Collection<Collection<Object>> repairRuns;

@JsonProperty()
private Collection<String> keyspaces;

public ClusterStatus(Cluster cluster) {
this.clusterName = cluster.getName();
this.partitioner = cluster.getPartitioner();
this.seedHosts = cluster.getSeedHosts();
}

public String getClusterName() {
return clusterName;
}
public final Collection<RepairRunStatus> repairRuns;

public String getPartitioner() {
return partitioner;
}

public Collection<String> getSeedHosts() {
return seedHosts;
}

public Collection<Collection<Object>> getRepairRuns() {
return repairRuns;
}

public void setRepairRunIds(Collection<Collection<Object>> repairRuns) {
public ClusterStatus(Collection<RepairRunStatus> repairRuns) {
this.repairRuns = repairRuns;
}

public void setKeyspaces(Collection<String> keyspaces) {
this.keyspaces = keyspaces;
}
}
Loading

0 comments on commit 6072838

Please sign in to comment.