Skip to content

Commit

Permalink
WIP still ongoing
Browse files Browse the repository at this point in the history
  • Loading branch information
varjoranta authored and Bj0rnen committed Jan 22, 2015
1 parent 90608df commit c01bef2
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 87 deletions.
1 change: 1 addition & 0 deletions src/main/db/reaper_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ CREATE TABLE IF NOT EXISTS "repair_run" (
"creation_time" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
"start_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
"end_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
"pause_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
"intensity" REAL NOT NULL
);

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static JmxProxy connect(Optional<RepairStatusHandler> handler, String hos
mbeanServerConn.addNotificationListener(ssMbeanName, proxy, null, null);
LOG.info(String.format("JMX connection to %s properly connected.", host));
return proxy;
} catch (IOException | InstanceNotFoundException | MalformedObjectNameException e) {
} catch (IOException | InstanceNotFoundException e) {
LOG.error("Failed to establish JMX connection");
throw new ReaperException("Failure when establishing JMX connection", e);
}
Expand Down Expand Up @@ -187,9 +187,9 @@ public List<String> getKeyspaces() {
return ssProxy.getKeyspaces();
}

public List<String> getTableNamesForKeyspace(String keyspace) throws ReaperException {
List<String> tableNames = new ArrayList<>();
Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> proxies = null;
public Set<String> getTableNamesForKeyspace(String keyspace) throws ReaperException {
Set<String> tableNames = new HashSet<>();
Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> proxies;
try {
proxies = ColumnFamilyStoreMBeanIterator.getColumnFamilyStoreMBeanProxies(mbeanServer);
} catch (IOException | MalformedObjectNameException e) {
Expand Down
21 changes: 13 additions & 8 deletions src/main/java/com/spotify/reaper/resources/ClusterResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,13 @@ public Response getClusterList() {
@Path("/{cluster_name}")
public Response getCluster(@PathParam("cluster_name") String clusterName) {
LOG.info("get cluster called with cluster_name: {}", clusterName);
Cluster cluster = storage.getCluster(clusterName);
return viewCluster(cluster, Optional.<URI>absent());
Optional<Cluster> cluster = storage.getCluster(clusterName);
if (cluster.isPresent()) {
return viewCluster(cluster.get(), Optional.<URI>absent());
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity("cluster with name \"" + clusterName + "\" not found").build();
}
}

@POST
Expand All @@ -84,15 +89,15 @@ public Response addCluster(
return Response.status(400)
.entity("failed to create cluster with seed host: " + seedHost.get()).build();
}
Cluster existingCluster = storage.getCluster(newCluster.getName());
if (existingCluster == null) {
LOG.info("creating new cluster based on given seed host: {}", newCluster);
storage.addCluster(newCluster);
} else {
Optional<Cluster> existingCluster = storage.getCluster(newCluster.getName());
if (existingCluster.isPresent()) {
LOG.info("cluster already stored with this name: {}", existingCluster);
return Response.status(403)
.entity(String.format("cluster \"%s\" already exists", existingCluster.getName()))
.entity(String.format("cluster \"%s\" already exists", existingCluster.get().getName()))
.build();
} else {
LOG.info("creating new cluster based on given seed host: {}", newCluster);
storage.addCluster(newCluster);
}

URI createdURI;
Expand Down
125 changes: 71 additions & 54 deletions src/main/java/com/spotify/reaper/resources/RepairRunResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
*/
package com.spotify.reaper.resources;

import com.google.common.base.CharMatcher;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;

import com.google.common.collect.Sets;
import com.spotify.reaper.ReaperApplicationConfiguration;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.cassandra.JmxProxy;
Expand Down Expand Up @@ -68,6 +71,9 @@ public class RepairRunResource {
private final ReaperApplicationConfiguration config;
private final JmxConnectionFactory jmxFactory;

public static final Splitter COMMA_SEPARATED_LIST_SPLITTER =
Splitter.on(',').trimResults(CharMatcher.anyOf(" ()[]\"'")).omitEmptyStrings();

public RepairRunResource(ReaperApplicationConfiguration config, IStorage storage) {
this.config = config;
this.storage = storage;
Expand Down Expand Up @@ -98,13 +104,13 @@ public Response addRepairRun(
@Context UriInfo uriInfo,
@QueryParam("clusterName") Optional<String> clusterName,
@QueryParam("keyspace") Optional<String> keyspace,
@QueryParam("tables") Optional<String> tableNames,
@QueryParam("tables") Optional<String> tableNamesParam,
@QueryParam("owner") Optional<String> owner,
@QueryParam("cause") Optional<String> cause) {

@QueryParam("cause") Optional<String> cause,
@QueryParam("segmentCount") Optional<Integer> segmentCount
) {
LOG.info("add repair run called with: clusterName = {}, keyspace = {}, tables = {}, owner = {},"
+ " cause = {}", clusterName, keyspace, tableNames, owner, cause);

+ " cause = {}", clusterName, keyspace, tableNamesParam, owner, cause);
try {
if (!clusterName.isPresent()) {
throw new ReaperException("\"clusterName\" argument missing");
Expand All @@ -118,21 +124,47 @@ public Response addRepairRun(

Cluster cluster = getCluster(clusterName.get());
JmxProxy jmxProxy = jmxFactory.create(cluster.getSeedHosts().iterator().next());
List<String> knownTables = jmxProxy.getTableNamesForKeyspace(keyspace.get());
Set<String> knownTables = jmxProxy.getTableNamesForKeyspace(keyspace.get());
if (knownTables.size() == 0) {
LOG.debug("no known tables for keyspace {} in cluster {}", keyspace.get(),
clusterName.get());
return Response.status(Response.Status.NOT_FOUND).entity(
"no column families found for keyspace").build();
}

jmxProxy.close();

RepairUnit repairUnit = getRepairUnit(clusterName.get(), keyspace.get(), tableNames.get());
RepairRun newRepairRun = registerRepairRun(cluster, repairUnit, cause, owner.get());
Set<String> tableNames;
if (tableNamesParam.isPresent()) {
tableNames = Sets.newHashSet(COMMA_SEPARATED_LIST_SPLITTER.split(tableNamesParam.get()));
} else {
tableNames = knownTables;
}

Optional<RepairUnit> storedRepairUnit =
storage.getRepairUnit(clusterName.get(), keyspace.get(), tableNames);
RepairUnit theRepairUnit;
if (storedRepairUnit.isPresent()) {
if (segmentCount.isPresent()) {
LOG.warn("stored repair unit already exists, and segment count given, "
+ "which is thus ignored");
}
theRepairUnit = storedRepairUnit.get();
} else {
int segments = config.getSegmentCount();
if (segmentCount.isPresent()) {
LOG.debug("using given segment count {} instead of configured value {}",
segmentCount.get(), config.getSegmentCount());
segments = segmentCount.get();
}
LOG.info("create new repair unit for cluster '{}', keyspace '{}', and column families: {}",
clusterName.get(), keyspace.get(), tableNames);
theRepairUnit = storage.addRepairUnit(new RepairUnit.Builder(clusterName.get(),
keyspace.get(), tableNames, segments, config.getSnapshotRepair()));
}
RepairRun newRepairRun = registerRepairRun(cluster, theRepairUnit, cause, owner.get());
return Response.created(buildRepairRunURI(uriInfo, newRepairRun))
.entity(new RepairRunStatus(newRepairRun, repairUnit))
.build();
.entity(new RepairRunStatus(newRepairRun, theRepairUnit)).build();

} catch (ReaperException e) {
LOG.error(e.getMessage());
e.printStackTrace();
Expand All @@ -154,19 +186,29 @@ public Response modifyRunState(
@PathParam("id") Long repairRunId,
@QueryParam("state") Optional<String> state) {

LOG.info("pause repair run called with: id = {}, state = {}", repairRunId, state);
LOG.info("modify repair run state called with: id = {}, state = {}", repairRunId, state);

if (!state.isPresent()) {
return Response.status(Response.Status.BAD_REQUEST.getStatusCode())
.entity("\"state\" argument missing")
.build();
.entity("\"state\" argument missing").build();
}

try {
RepairRun repairRun = getRepairRun(repairRunId);
RepairUnit repairUnit = storage.getRepairUnit(repairRun.getRepairUnitId());
Optional<RepairRun> repairRun = storage.getRepairRun(repairRunId);
if (!repairRun.isPresent()) {
return Response.status(Response.Status.NOT_FOUND).entity("repair run with id "
+ repairRunId + " not found").build();
}

Optional<RepairUnit> repairUnit = storage.getRepairUnit(repairRun.get().getRepairUnitId());
if (!repairUnit.isPresent()) {
String errMsg = "repair unit with id " + repairRun.get().getRepairUnitId() + " not found";
LOG.error(errMsg);
return Response.status(Response.Status.NOT_FOUND).entity(errMsg).build();
}

RepairRun.RunState newState = RepairRun.RunState.valueOf(state.get());
RepairRun.RunState oldState = repairRun.getRunState();
RepairRun.RunState oldState = repairRun.get().getRunState();

if (oldState == newState) {
return Response.ok("given \"state\" is same as the current run state").build();
Expand Down Expand Up @@ -241,9 +283,9 @@ private Response resumeRun(RepairRun repairRun, RepairUnit repairUnit) {
@Path("/{id}")
public Response getRepairRun(@PathParam("id") Long repairRunId) {
LOG.info("get repair_run called with: id = {}", repairRunId);
RepairRun repairRun = storage.getRepairRun(repairRunId);
if (null != repairRun) {
return Response.ok().entity(getRepairRunStatus(repairRun)).build();
Optional<RepairRun> repairRun = storage.getRepairRun(repairRunId);
if (repairRun.isPresent()) {
return Response.ok().entity(getRepairRunStatus(repairRun.get())).build();
}
else {
return Response.status(404).entity(
Expand Down Expand Up @@ -278,33 +320,6 @@ private Cluster getCluster(String clusterName) throws ReaperException {
return cluster;
}

/**
* @return repair unit information for given cluster, keyspace and table name
* @throws ReaperException if such table is not found in Reaper's storage
*/
private RepairUnit getRepairUnit(String clusterName, String keyspace,
Collection<String> tableNames) throws ReaperException {
RepairUnit repairUnit = storage.getRepairUnit(clusterName, keyspace, tableName);
if (repairUnit == null) {
throw new ReaperException(String.format("Column family \"%s/%s/%s\" not found", clusterName,
keyspace, tableName));
}
return repairUnit;
}

/**
* @return table information for given table id
* @throws ReaperException if such table is not found in Reaper's storage
*/
private RepairUnit getRepairUnit(long repairUnitId) throws ReaperException {
RepairUnit repairUnit = storage.getRepairUnit(repairUnitId);
if (repairUnit == null) {
throw new ReaperException(String.format("Column family with id \"%d\" not found",
repairUnitId));
}
return repairUnit;
}

/**
* Creates a repair run but does not trigger it.
*
Expand Down Expand Up @@ -344,22 +359,22 @@ private RepairRun registerRepairRun(Cluster cluster, RepairUnit repairUnit,
* @throws ReaperException when fails to discover seeds for the cluster or fails to connect to
* any of the nodes in the Cluster.
*/
private List<RingRange> generateSegments(Cluster targetCluster, RepairUnit existingTable)
private List<RingRange> generateSegments(Cluster targetCluster, RepairUnit repairUnit)
throws ReaperException {
List<RingRange> segments = null;
SegmentGenerator sg = new SegmentGenerator(targetCluster.getPartitioner());
Set<String> seedHosts = targetCluster.getSeedHosts();
if (seedHosts.isEmpty()) {
String errMsg = String.format("didn't get any seed hosts for cluster \"%s\"",
existingTable.getClusterName());
repairUnit.getClusterName());
LOG.error(errMsg);
throw new ReaperException(errMsg);
}
for (String host : seedHosts) {
try {
JmxProxy jmxProxy = jmxFactory.create(host);
List<BigInteger> tokens = jmxProxy.getTokens();
segments = sg.generateSegments(existingTable.getSegmentCount(), tokens);
segments = sg.generateSegments(repairUnit.getSegmentCount(), tokens);
jmxProxy.close();
break;
} catch (ReaperException e) {
Expand All @@ -368,7 +383,7 @@ private List<RingRange> generateSegments(Cluster targetCluster, RepairUnit exist
}
if (segments == null) {
String errMsg = String.format("failed to generate repair segments for cluster \"%s\"",
existingTable.getClusterName());
repairUnit.getClusterName());
LOG.error(errMsg);
throw new ReaperException(errMsg);
}
Expand Down Expand Up @@ -401,15 +416,17 @@ private RepairRun storeNewRepairRun(Cluster cluster, RepairUnit table, Optional<
* storage backend.
*/
private void storeNewRepairSegments(List<RingRange> tokenSegments, RepairRun repairRun,
RepairUnit table) {
RepairUnit table) throws ReaperException {
List <RepairSegment.Builder> repairSegmentBuilders = Lists.newArrayList();
for (RingRange range : tokenSegments) {
RepairSegment.Builder repairSegment = new RepairSegment.Builder(repairRun.getId(), range,
table.getId());
repairSegmentBuilders.add(repairSegment);
}
// TODO(zvo): I don't like that we can't figure out if this succeeds or not
storage.addRepairSegments(repairSegmentBuilders, repairRun.getId());
boolean success = storage.addRepairSegments(repairSegmentBuilders, repairRun.getId());
if (!success) {
throw new ReaperException("failed adding repair segments to storage");
}
}

/**
Expand Down
45 changes: 24 additions & 21 deletions src/main/java/com/spotify/reaper/storage/IStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@
*/
package com.spotify.reaper.storage;

import com.google.common.base.Optional;
import com.spotify.reaper.core.Cluster;
import com.spotify.reaper.core.RepairUnit;
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;
import com.spotify.reaper.core.RepairUnit;
import com.spotify.reaper.service.RingRange;

import java.util.Collection;

import javax.annotation.Nullable;

/**
* API definition for cassandra-reaper.
*/
Expand All @@ -32,47 +31,51 @@ public interface IStorage {

Collection<Cluster> getClusters();

Cluster addCluster(Cluster cluster);
boolean addCluster(Cluster cluster);

boolean updateCluster(Cluster newCluster);

Cluster getCluster(String clusterName);
Optional<Cluster> getCluster(String clusterName);

RepairRun addRepairRun(RepairRun.Builder repairRun);

boolean updateRepairRun(RepairRun repairRun);

RepairRun getRepairRun(long id);
Optional<RepairRun> getRepairRun(long id);

Collection<RepairRun> getRepairRunsForCluster(String clusterName);

Collection<RepairRun> getAllRunningRepairRuns();

RepairUnit addRepairUnit(RepairUnit.Builder newRepairUnit);

RepairUnit getRepairUnit(long id);
Optional<RepairUnit> getRepairUnit(long id);

RepairUnit getRepairUnit(String cluster, String keyspace, String table);
/**
* Get a stored RepairUnit targeting the given tables in the given keyspace.
* Tables must always be defined, so targeting the whole keyspace requires
* first getting all the column family names from the keyspace.
*
* @param cluster Cluster name for the RepairUnit.
* @param keyspace Keyspace name for the RepairUnit.
* @param columnFamilyNames Collection of column family names targeted by the RepairUnit.
* @return An Optional holding the RepairUnit matching the parameters, or absent if not found.
*/
Optional<RepairUnit> getRepairUnit(String cluster, String keyspace,
Collection<String> columnFamilyNames);

void addRepairSegments(Collection<RepairSegment.Builder> newSegments, long runId);
boolean addRepairSegments(Collection<RepairSegment.Builder> newSegments, long runId);

boolean updateRepairSegment(RepairSegment newRepairSegment);

RepairSegment getRepairSegment(long id);
Optional<RepairSegment> getRepairSegment(long id);

RepairSegment getNextFreeSegment(long runId);
Optional<RepairSegment> getNextFreeSegment(long runId);

RepairSegment getNextFreeSegmentInRange(long runId, RingRange range);
Optional<RepairSegment> getNextFreeSegmentInRange(long runId, RingRange range);

/**
* If RepairRun is running, there should always be only one running segment at a time.
* TODO: what if we have parallel repair run on one ring?
*
* @param runId The RepairRun id that should be running to have one running segment.
* @return The running segment, or null in case there is none.
*/
@Nullable
RepairSegment getTheRunningSegment(long runId);
Collection<RepairSegment> getSegmentsWithStateForRun(long runId,
RepairSegment.State segmentState);

Collection<Long> getRepairRunIdsForCluster(String clusterName);

Expand Down

0 comments on commit c01bef2

Please sign in to comment.