Skip to content

Commit

Permalink
Merge branch 'master' into Bj0rnen/sustainJMX
Browse files Browse the repository at this point in the history
Conflicts:
	src/main/java/com/spotify/reaper/resources/TableResource.java
	src/main/java/com/spotify/reaper/service/RepairRunner.java
	src/test/java/com/spotify/reaper/service/RepairRunnerTest.java
  • Loading branch information
Bj0rnen committed Jan 13, 2015
2 parents f82cc0e + 31ded88 commit 7a4e30e
Show file tree
Hide file tree
Showing 20 changed files with 159 additions and 165 deletions.
27 changes: 14 additions & 13 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import java.io.Serializable;
import java.math.BigInteger;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.annotation.Nullable;
import javax.management.InstanceNotFoundException;
Expand Down Expand Up @@ -64,8 +64,7 @@ public class JmxProxy implements NotificationListener, Serializable {
private final String host;

private JmxProxy(Optional<RepairStatusHandler> handler, String host, JMXConnector jmxConnector,
StorageServiceMBean ssProxy, ObjectName mbeanName,
MBeanServerConnection mbeanServer) {
StorageServiceMBean ssProxy, ObjectName mbeanName, MBeanServerConnection mbeanServer) {
this.host = host;
this.jmxConnector = jmxConnector;
this.mbeanName = mbeanName;
Expand Down Expand Up @@ -118,8 +117,8 @@ public static JmxProxy connect(Optional<RepairStatusHandler> handler, String hos
try {
JMXConnector jmxConn = JMXConnectorFactory.connect(jmxUrl);
MBeanServerConnection mbeanServerConn = jmxConn.getMBeanServerConnection();
StorageServiceMBean
ssProxy = JMX.newMBeanProxy(mbeanServerConn, mbeanName, StorageServiceMBean.class);
StorageServiceMBean ssProxy =
JMX.newMBeanProxy(mbeanServerConn, mbeanName, StorageServiceMBean.class);
JmxProxy proxy = new JmxProxy(handler, host, jmxConn, ssProxy, mbeanName, mbeanServerConn);
// registering a listener throws bunch of exceptions, so we do it here rather than in the
// constructor
Expand Down Expand Up @@ -153,10 +152,12 @@ public BigInteger apply(String s) {
@Nullable
public List<String> tokenRangeToEndpoint(String keyspace, RingRange tokenRange) {
checkNotNull(ssProxy, "Looks like the proxy is not connected");
for (Map.Entry<List<String>, List<String>> entry : ssProxy.getRangeToEndpointMap(keyspace)
.entrySet()) {
if (new RingRange(new BigInteger(entry.getKey().get(0)),
new BigInteger(entry.getKey().get(1))).encloses(tokenRange)) {
Set<Map.Entry<List<String>, List<String>>> entries =
ssProxy.getRangeToEndpointMap(keyspace).entrySet();
for (Map.Entry<List<String>, List<String>> entry : entries) {
BigInteger rangeStart = new BigInteger(entry.getKey().get(0));
BigInteger rangeEnd = new BigInteger(entry.getKey().get(1));
if (new RingRange(rangeStart, rangeEnd).encloses(tokenRange)) {
return entry.getValue();
}
}
Expand Down Expand Up @@ -200,11 +201,11 @@ public List<String> getKeyspaces() {
* @return Repair command number, or 0 if nothing to repair
*/
public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keyspace,
String columnFamily) {
String columnFamily) {
checkNotNull(ssProxy, "Looks like the proxy is not connected");
LOG.info(String.format("Triggering repair of range (%s,%s] for %s.%s via host %s",
beginToken.toString(), endToken.toString(), keyspace, columnFamily,
this.host));
String msg = String.format("Triggering repair of range (%s,%s] for %s.%s via host %s",
beginToken.toString(), endToken.toString(), keyspace, columnFamily, this.host);
LOG.info(msg);
return ssProxy.forceRepairRangeAsync(
beginToken.toString(),
endToken.toString(),
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/spotify/reaper/core/ColumnFamily.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static class Builder {
private boolean snapshotRepair;

public Builder(String clusterName, String keyspaceName, String name, int segmentCount,
boolean snapshotRepair) {
boolean snapshotRepair) {
this.clusterName = clusterName;
this.keyspaceName = keyspaceName;
this.name = name;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/spotify/reaper/core/RepairRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public static class Builder {
private DateTime endTime;

public Builder(String clusterName, long columnFamilyId, RunState runState,
DateTime creationTime, double intensity) {
DateTime creationTime, double intensity) {
this.clusterName = clusterName;
this.columnFamilyId = columnFamilyId;
this.runState = runState;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/spotify/reaper/core/RepairSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public enum State {
DONE
}

private RepairSegment(Builder builder,long id) {
private RepairSegment(Builder builder, long id) {
this.id = id;
this.repairCommandId = builder.repairCommandId;
this.columnFamilyId = builder.columnFamilyId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ public Response getCluster(@PathParam("cluster_name") String clusterName) {
}

@POST
public Response addCluster(@Context UriInfo uriInfo,
@QueryParam("seedHost") Optional<String> seedHost) {
public Response addCluster(
@Context UriInfo uriInfo,
@QueryParam("seedHost") Optional<String> seedHost) {
if (!seedHost.isPresent()) {
LOG.error("POST on cluster resource called without seedHost");
return Response.status(400).entity("query parameter \"seedHost\" required").build();
Expand Down Expand Up @@ -129,9 +130,7 @@ public static Cluster createClusterWithSeedHost(String seedHost)
e.printStackTrace();
throw e;
}
Cluster newCluster =
new Cluster(clusterName, partitioner, Collections.singleton(seedHost));
return newCluster;
return new Cluster(clusterName, partitioner, Collections.singleton(seedHost));
}

}
78 changes: 40 additions & 38 deletions src/main/java/com/spotify/reaper/resources/TableResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,25 @@ public TableResource(ReaperApplicationConfiguration config, IStorage storage) {

@GET
@Path("/{clusterName}/{keyspace}/{table}")
public Response getTable(@PathParam("clusterName") String clusterName,
@PathParam("keyspace") String keyspace,
@PathParam("table") String table) {
public Response getTable(
@PathParam("clusterName") String clusterName,
@PathParam("keyspace") String keyspace,
@PathParam("table") String table) {
LOG.info("get table called with: clusterName = {}, keyspace = {}, table = {}",
clusterName, keyspace, table);
return Response.ok().entity("not implemented yet").build();
}

@POST
public Response addTable(@Context UriInfo uriInfo,
@QueryParam("clusterName") Optional<String> clusterName,
@QueryParam("seedHost") Optional<String> seedHost,
@QueryParam("keyspace") Optional<String> keyspace,
@QueryParam("table") Optional<String> table,
@QueryParam("startRepair") Optional<Boolean> startRepair,
@QueryParam("owner") Optional<String> owner,
@QueryParam("cause") Optional<String> cause) {
public Response addTable(
@Context UriInfo uriInfo,
@QueryParam("clusterName") Optional<String> clusterName,
@QueryParam("seedHost") Optional<String> seedHost,
@QueryParam("keyspace") Optional<String> keyspace,
@QueryParam("table") Optional<String> table,
@QueryParam("startRepair") Optional<Boolean> startRepair,
@QueryParam("owner") Optional<String> owner,
@QueryParam("cause") Optional<String> cause) {
LOG.info("add table called with: clusterName = {}, seedHost = {}, keyspace = {}, table = {}, "
+ "owner = {}, cause = {}", clusterName, seedHost, keyspace, table, owner, cause);

Expand All @@ -109,7 +111,8 @@ public Response addTable(@Context UriInfo uriInfo,
} catch (ReaperException e) {
e.printStackTrace();
return Response.status(400)
.entity("failed creating cluster with seed host: " + seedHost.get()).build();
.entity("failed creating cluster with seed host: " + seedHost.get())
.build();
}
Cluster existingCluster = storage.getCluster(targetCluster.getName());
if (existingCluster == null) {
Expand All @@ -122,16 +125,16 @@ public Response addTable(@Context UriInfo uriInfo,
} else if (clusterName.isPresent()) {
targetCluster = storage.getCluster(clusterName.get());
if (null == targetCluster) {
return Response.status(404)
.entity("cluster \"" + clusterName + "\" does not exist").build();
return Response.status(404).entity("cluster \"" + clusterName + "\" does not exist")
.build();
}
} else {
return Response.status(400)
.entity("Query parameter \"clusterName\" or \"seedHost\" required").build();
return Response.status(400).entity("Query parameter \"clusterName\" or \"seedHost\" required")
.build();
}

String newTablePathPart = targetCluster.getName() + "/" + keyspace.get()
+ "/" + table.get();
String newTablePathPart =
String.format("%s/%s/%s", targetCluster.getName(), keyspace.get(), table.get());
URI createdURI;
try {
createdURI = (new URL(uriInfo.getAbsolutePath().toURL(), newTablePathPart)).toURI();
Expand All @@ -148,9 +151,9 @@ public Response addTable(@Context UriInfo uriInfo,
if (existingTable == null) {
LOG.info("storing new table");

existingTable = storage.addColumnFamily(
new ColumnFamily.Builder(targetCluster.getName(), keyspace.get(), table.get(),
config.getSegmentCount(), config.getSnapshotRepair()));
ColumnFamily.Builder newCf = new ColumnFamily.Builder(targetCluster.getName(), keyspace.get(),
table.get(), config.getSegmentCount(), config.getSnapshotRepair());
existingTable = storage.addColumnFamily(newCf);

if (existingTable == null) {
return Response.status(500)
Expand Down Expand Up @@ -184,9 +187,9 @@ public Response addTable(@Context UriInfo uriInfo,
}

if (segments == null || seedHosts.isEmpty()) {
return Response.status(404)
.entity("couldn't connect to any of the seed hosts in cluster \"" + existingTable
.getClusterName() + "\"").build();
String errMsg = String.format("couldn't connect to any of the seed hosts in cluster \"%s\"",
existingTable.getClusterName());
return Response.status(404).entity(errMsg).build();
}
} catch (ReaperException e) {
String errMsg = "failed generating segments for new table: " + existingTable;
Expand All @@ -195,14 +198,12 @@ public Response addTable(@Context UriInfo uriInfo,
return Response.status(400).entity(errMsg).build();
}

RepairRun newRepairRun =
storage.addRepairRun(new RepairRun.Builder(targetCluster.getName(),
existingTable.getId(),
RepairRun.RunState.NOT_STARTED,
DateTime.now(),
config.getRepairIntensity())
.cause(cause.isPresent() ? cause.get() : "no cause specified")
.owner(owner.get()));
RepairRun.Builder runBuilder = new RepairRun.Builder(targetCluster.getName(),
existingTable.getId(), RepairRun.RunState.NOT_STARTED, DateTime.now(),
config.getRepairIntensity());
runBuilder.cause(cause.isPresent() ? cause.get() : "no cause specified");
runBuilder.owner(owner.get());
RepairRun newRepairRun = storage.addRepairRun(runBuilder);
if (newRepairRun == null) {
return Response.status(500)
.entity("failed creating repair run into Reaper storage for owner: " + owner.get())
Expand All @@ -214,11 +215,12 @@ public Response addTable(@Context UriInfo uriInfo,
// RepairSegment has a pointer to the RepairRun it lives in.
List<RepairSegment.Builder> repairSegments = Lists.newArrayList();
for (RingRange range : segments) {
repairSegments
.add(new RepairSegment.Builder(newRepairRun.getId(), range,
RepairSegment.State.NOT_STARTED).columnFamilyId(existingTable.getId()));
RepairSegment.Builder repairSegment =
new RepairSegment.Builder(newRepairRun.getId(), range, RepairSegment.State.NOT_STARTED);
repairSegment.columnFamilyId(existingTable.getId());
repairSegments.add(repairSegment);
}
storage.addRepairSegments(repairSegments);
storage.addRepairSegments(repairSegments, newRepairRun.getId());

RepairRunner.startNewRepairRun(storage, newRepairRun.getId(), new JmxConnectionFactory());

Expand All @@ -233,8 +235,8 @@ public Response addTable(@Context UriInfo uriInfo,
return Response.status(400).entity(errMsg).build();
}

return Response.created(createdRepairRunURI)
.entity(new ColumnFamilyStatus(existingTable)).build();
return Response.created(createdRepairRunURI).entity(new ColumnFamilyStatus(existingTable))
.build();
}

}
Loading

0 comments on commit 7a4e30e

Please sign in to comment.