diff --git a/pom.xml b/pom.xml
index 93f35d9e3..7c5367fb6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,6 +31,12 @@
org.apache.cassandra
cassandra-all
${cassandra.version}
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
junit
@@ -41,26 +47,26 @@
-
+
org.apache.maven.plugins
maven-compiler-plugin
diff --git a/src/main/java/com/spotify/reaper/ReaperApplication.java b/src/main/java/com/spotify/reaper/ReaperApplication.java
index dffbdff58..4bbcdb58f 100644
--- a/src/main/java/com/spotify/reaper/ReaperApplication.java
+++ b/src/main/java/com/spotify/reaper/ReaperApplication.java
@@ -1,6 +1,6 @@
package com.spotify.reaper;
-import com.spotify.reaper.resources.AddClusterResource;
+import com.spotify.reaper.resources.ClusterResource;
import com.spotify.reaper.resources.AddTableResource;
import com.spotify.reaper.resources.PingResource;
import com.spotify.reaper.resources.RepairTableResource;
@@ -40,7 +40,7 @@ public void run(ReaperApplicationConfiguration config,
IStorage storage = initializeStorage(config, environment);
final PingResource pingResource = new PingResource();
- final AddClusterResource addClusterResource = new AddClusterResource(storage);
+ final ClusterResource addClusterResource = new ClusterResource(storage);
final AddTableResource addTableResource = new AddTableResource();
final RepairTableResource repairTableResource = new RepairTableResource();
diff --git a/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java b/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java
index 5a4852d81..64210168d 100644
--- a/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java
+++ b/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java
@@ -26,6 +26,7 @@ public class ReaperApplicationConfiguration extends Configuration {
@Valid
@NotNull
+ @JsonProperty
private DataSourceFactory database = new DataSourceFactory();
@JsonProperty
diff --git a/src/main/java/com/spotify/reaper/cassandra/ClusterInfo.java b/src/main/java/com/spotify/reaper/cassandra/ClusterInfo.java
index 6737e3bca..0e61c2326 100644
--- a/src/main/java/com/spotify/reaper/cassandra/ClusterInfo.java
+++ b/src/main/java/com/spotify/reaper/cassandra/ClusterInfo.java
@@ -8,18 +8,26 @@ public class ClusterInfo implements IClusterInfo {
private String seedHost;
private int seedPort = 0;
+ private String clusterName;
private List tokens;
+ public static ClusterInfo getInstance(String seedHost) throws ReaperException {
+ return new ClusterInfo(seedHost, 0).init();
+ }
+
public ClusterInfo(String seedHost, int seedPort) {
this.seedHost = seedHost;
this.seedPort = seedPort;
}
- public void init() throws ReaperException {
- JMXProxy jmx = seedPort == 0 ? JMXProxy.connect(seedHost) : JMXProxy.connect(seedHost, seedPort);
+ public ClusterInfo init() throws ReaperException {
+ JMXProxy jmx =
+ seedPort == 0 ? JMXProxy.connect(seedHost) : JMXProxy.connect(seedHost, seedPort);
tokens = jmx.getTokens();
+ clusterName = jmx.getClusterName();
jmx.close();
+ return this;
}
@Override
@@ -27,4 +35,9 @@ public List getTokens() {
return tokens;
}
+ @Override
+ public String getClusterName() {
+ return clusterName;
+ }
+
}
diff --git a/src/main/java/com/spotify/reaper/cassandra/IClusterInfo.java b/src/main/java/com/spotify/reaper/cassandra/IClusterInfo.java
index 192b53c48..8299586e0 100644
--- a/src/main/java/com/spotify/reaper/cassandra/IClusterInfo.java
+++ b/src/main/java/com/spotify/reaper/cassandra/IClusterInfo.java
@@ -6,4 +6,5 @@ public interface IClusterInfo {
public List getTokens();
+ public String getClusterName();
}
diff --git a/src/main/java/com/spotify/reaper/cassandra/JMXProxy.java b/src/main/java/com/spotify/reaper/cassandra/JMXProxy.java
index 259b0674e..6e7438ede 100644
--- a/src/main/java/com/spotify/reaper/cassandra/JMXProxy.java
+++ b/src/main/java/com/spotify/reaper/cassandra/JMXProxy.java
@@ -1,7 +1,9 @@
package com.spotify.reaper.cassandra;
import com.google.common.collect.Lists;
+
import com.spotify.reaper.ReaperException;
+
import org.apache.cassandra.service.StorageServiceMBean;
import java.io.IOException;
@@ -20,11 +22,11 @@ public class JMXProxy {
private static final int DEFAULT_JMX_PORT = 7199;
- private JMXConnector jmxc = null;
+ private JMXConnector jmxConnector = null;
private StorageServiceMBean ssProxy;
- public JMXProxy(JMXConnector jmxc, StorageServiceMBean ssProxy) {
- this.jmxc = jmxc;
+ public JMXProxy(JMXConnector jmxConnector, StorageServiceMBean ssProxy) {
+ this.jmxConnector = jmxConnector;
this.ssProxy = ssProxy;
}
@@ -37,17 +39,17 @@ public static JMXProxy connect(String host, int port) throws ReaperException {
ObjectName name;
try {
jmxUrl = new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi",
- host, port));
+ host, port));
name = new ObjectName("org.apache.cassandra.db:type=StorageService");
} catch (MalformedURLException | MalformedObjectNameException e) {
throw new ReaperException("Failure during preparations for JMX connection", e);
}
try {
- JMXConnector jmxc = JMXConnectorFactory.connect(jmxUrl);
- MBeanServerConnection mbeanServerConn = jmxc.getMBeanServerConnection();
+ JMXConnector jmxConnector = JMXConnectorFactory.connect(jmxUrl);
+ MBeanServerConnection mbeanServerConn = jmxConnector.getMBeanServerConnection();
StorageServiceMBean
ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
- return new JMXProxy(jmxc, ssProxy);
+ return new JMXProxy(jmxConnector, ssProxy);
} catch (IOException e) {
throw new ReaperException("Failure when establishing JMX connection", e);
}
@@ -57,9 +59,13 @@ public List getTokens() {
return Lists.newArrayList(ssProxy.getTokenToEndpointMap().keySet());
}
+ public String getClusterName() {
+ return ssProxy.getClusterName();
+ }
+
public void close() throws ReaperException {
try {
- jmxc.close();
+ jmxConnector.close();
} catch (IOException e) {
throw new ReaperException(e);
}
diff --git a/src/main/java/com/spotify/reaper/resources/AddClusterResource.java b/src/main/java/com/spotify/reaper/resources/AddClusterResource.java
deleted file mode 100644
index 94627791c..000000000
--- a/src/main/java/com/spotify/reaper/resources/AddClusterResource.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.spotify.reaper.resources;
-
-import com.google.common.base.Optional;
-
-import com.spotify.reaper.storage.IStorage;
-
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.MediaType;
-
-@Path("/add_cluster")
-@Produces(MediaType.TEXT_PLAIN)
-public class AddClusterResource {
-
- private static final Logger LOG = LoggerFactory.getLogger(AddClusterResource.class);
-
- private final IStorage storage;
-
- public AddClusterResource(IStorage storage) {
- this.storage = storage;
- }
-
- @POST
- public String addCluster(@QueryParam("host") Optional host) {
- LOG.info("add_cluster called with host: {}", host);
- // TODO: should call the storage here
- return String.format("Not implemented yet");
- }
-
-}
diff --git a/src/main/java/com/spotify/reaper/resources/ClusterResource.java b/src/main/java/com/spotify/reaper/resources/ClusterResource.java
new file mode 100644
index 000000000..c36b22697
--- /dev/null
+++ b/src/main/java/com/spotify/reaper/resources/ClusterResource.java
@@ -0,0 +1,75 @@
+package com.spotify.reaper.resources;
+
+import com.google.common.base.Optional;
+
+import com.spotify.reaper.ReaperException;
+import com.spotify.reaper.cassandra.ClusterInfo;
+import com.spotify.reaper.core.Cluster;
+import com.spotify.reaper.storage.IStorage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URL;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+
+@Path("/cluster/{name}")
+@Produces(MediaType.APPLICATION_JSON)
+public class ClusterResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClusterResource.class);
+
+ private final IStorage storage;
+
+ public ClusterResource(IStorage storage) {
+ this.storage = storage;
+ }
+
+ @GET
+ public Response getCluster(@PathParam("name") String name) {
+ Cluster cluster = storage.getCluster(name);
+ return Response.ok().entity(cluster).build();
+ }
+
+ @POST
+ public Response addCluster(@Context UriInfo uriInfo, @QueryParam("host") Optional host) {
+ if (!host.isPresent()) {
+ LOG.error("POST on cluster resource called without host");
+ return Response.status(400).entity("query parameter \"host\" required").build();
+ }
+ LOG.info("add cluster called with host: {}", host);
+
+ ClusterInfo clusterInfo;
+ try {
+ clusterInfo = ClusterInfo.getInstance(host.get());
+ } catch (ReaperException e) {
+ String errMsg = "failed to get cluster info from seed host: " + host.get();
+ LOG.error(errMsg);
+ e.printStackTrace();
+ return Response.status(400).entity(errMsg).build();
+ }
+
+ URI createdURI = null;
+ try {
+ createdURI = (new URL(uriInfo.getAbsolutePath().toURL(), "1")).toURI();
+ } catch (Exception e) {
+ LOG.error("failed creating target URI: " + uriInfo.getAbsolutePath());
+ e.printStackTrace();
+ }
+
+ String replyMsg = "cluster with name \"" + clusterInfo.getClusterName() + "\" created";
+ return Response.created(createdURI).entity(replyMsg).build();
+ }
+
+}
diff --git a/src/main/java/com/spotify/reaper/storage/IStorage.java b/src/main/java/com/spotify/reaper/storage/IStorage.java
index 44761f07d..a6190575a 100644
--- a/src/main/java/com/spotify/reaper/storage/IStorage.java
+++ b/src/main/java/com/spotify/reaper/storage/IStorage.java
@@ -10,6 +10,8 @@ public interface IStorage {
public Cluster getCluster(String clusterName);
+ public Cluster insertCluster(Cluster newCluster);
+
public RepairRun addRepairRun(RepairRun repairRun);
public RepairRun getRepairRun(long id);
diff --git a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java
index 455202198..08a6f7c5c 100644
--- a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java
+++ b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java
@@ -13,6 +13,11 @@ public Cluster getCluster(String clusterName) {
return null;
}
+ @Override
+ public Cluster insertCluster(Cluster newCluster) {
+ return null;
+ }
+
@Override
public RepairRun addRepairRun(RepairRun repairRun) {
return null;
diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java
index ac38a87b0..ce86a62da 100644
--- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java
+++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java
@@ -4,8 +4,10 @@
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.core.Cluster;
import com.spotify.reaper.core.RepairRun;
+import com.spotify.reaper.storage.postgresql.IStoragePostgreSQL;
import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,8 +28,7 @@ public PostgresStorage(ReaperApplicationConfiguration config, Environment enviro
try {
final DBIFactory factory = new DBIFactory();
jdbi = factory.build(environment, config.getDataSourceFactory(), "postgresql");
- }
- catch (ClassNotFoundException ex) {
+ } catch (ClassNotFoundException ex) {
LOG.error("failed creating database connection: {}", ex);
throw new ReaperException(ex);
}
@@ -35,7 +36,28 @@ public PostgresStorage(ReaperApplicationConfiguration config, Environment enviro
@Override
public Cluster getCluster(String clusterName) {
- return null;
+ Handle h = jdbi.open();
+ IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class);
+ Cluster result = postgres.getCluster(clusterName);
+ h.close();
+ return result;
+ }
+
+ @Override
+ public Cluster insertCluster(Cluster newCluster) {
+ Handle h = jdbi.open();
+ IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class);
+ int rowsAdded = postgres.insertCluster(newCluster);
+ Cluster result;
+ if (rowsAdded < 1) {
+ LOG.warn("failed inserting cluster with name: {}", newCluster.getName());
+ result = null;
+ }
+ else {
+ result = postgres.getCluster(newCluster.getName());
+ }
+ h.close();
+ return result;
}
@Override
diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java
new file mode 100644
index 000000000..1ee8b28cf
--- /dev/null
+++ b/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java
@@ -0,0 +1,25 @@
+package com.spotify.reaper.storage.postgresql;
+
+import com.spotify.reaper.core.Cluster;
+
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+
+public class ClusterMapper implements ResultSetMapper {
+
+ public Cluster map(int index, ResultSet r, StatementContext ctx) throws SQLException {
+ String[] seedHosts = (String[]) r.getArray("seed_hosts").getArray();
+ return new Cluster.ClusterBuilder()
+ .id(r.getLong("id"))
+ .partitioner(r.getString("partitioner"))
+ .name(r.getString("name"))
+ .seedHosts(new HashSet(Arrays.asList(seedHosts)))
+ .build();
+ }
+
+}
diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java
new file mode 100644
index 000000000..771657991
--- /dev/null
+++ b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java
@@ -0,0 +1,40 @@
+package com.spotify.reaper.storage.postgresql;
+
+import com.spotify.reaper.core.Cluster;
+
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.BindBean;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
+
+/**
+ * JDBI based PostgreSQL interface.
+ *
+ * See following specification for more info: http://jdbi.org/sql_object_api_dml/
+ */
+public interface IStoragePostgreSQL {
+
+ static final String SQL_GET_CLUSTER = "SELECT id, partitioner, name, seed_hosts FROM cluster ";
+
+ static final String SQL_INSERT_CLUSTER = "INSERT INTO cluster (partitioner, name, seed_hosts) "
+ + "VALUES (:partitioner, :name, :seedHosts)";
+
+ static final String SQL_UPDATE_CLUSTER = "UPDATE cluster SET partitioner = :partitioner, "
+ + "name = :name, seed_hosts = :seedHosts WHERE id = :id";
+
+ @SqlQuery(SQL_GET_CLUSTER + "WHERE name = :name")
+ @Mapper(ClusterMapper.class)
+ public Cluster getCluster(@Bind("name") String clusterName);
+
+ @SqlQuery(SQL_GET_CLUSTER + "WHERE id = :id")
+ @Mapper(ClusterMapper.class)
+ public Cluster getCluster(@Bind("id") long id);
+
+ @SqlUpdate(SQL_INSERT_CLUSTER)
+ public int insertCluster(@BindBean Cluster newCluster);
+
+ @SqlUpdate(SQL_UPDATE_CLUSTER)
+ public int updateCluster(@BindBean Cluster newCluster);
+
+}