From cb027024e92b8775499baaf818627967f17fe977 Mon Sep 17 00:00:00 2001 From: Hannu Varjoranta Date: Tue, 2 Dec 2014 15:44:57 +0100 Subject: [PATCH] wip, continue implementation of cluster resource --- pom.xml | 46 +++++++----- .../com/spotify/reaper/ReaperApplication.java | 4 +- .../ReaperApplicationConfiguration.java | 1 + .../spotify/reaper/cassandra/ClusterInfo.java | 17 ++++- .../reaper/cassandra/IClusterInfo.java | 1 + .../spotify/reaper/cassandra/JMXProxy.java | 22 ++++-- .../reaper/resources/AddClusterResource.java | 35 --------- .../reaper/resources/ClusterResource.java | 75 +++++++++++++++++++ .../com/spotify/reaper/storage/IStorage.java | 2 + .../spotify/reaper/storage/MemoryStorage.java | 5 ++ .../reaper/storage/PostgresStorage.java | 28 ++++++- .../storage/postgresql/ClusterMapper.java | 25 +++++++ .../postgresql/IStoragePostgreSQL.java | 40 ++++++++++ 13 files changed, 231 insertions(+), 70 deletions(-) delete mode 100644 src/main/java/com/spotify/reaper/resources/AddClusterResource.java create mode 100644 src/main/java/com/spotify/reaper/resources/ClusterResource.java create mode 100644 src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java create mode 100644 src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java diff --git a/pom.xml b/pom.xml index 93f35d9e3..7c5367fb6 100644 --- a/pom.xml +++ b/pom.xml @@ -31,6 +31,12 @@ org.apache.cassandra cassandra-all ${cassandra.version} + + + org.slf4j + slf4j-log4j12 + + junit @@ -41,26 +47,26 @@ - + org.apache.maven.plugins maven-compiler-plugin diff --git a/src/main/java/com/spotify/reaper/ReaperApplication.java b/src/main/java/com/spotify/reaper/ReaperApplication.java index dffbdff58..4bbcdb58f 100644 --- a/src/main/java/com/spotify/reaper/ReaperApplication.java +++ b/src/main/java/com/spotify/reaper/ReaperApplication.java @@ -1,6 +1,6 @@ package com.spotify.reaper; -import com.spotify.reaper.resources.AddClusterResource; +import com.spotify.reaper.resources.ClusterResource; import com.spotify.reaper.resources.AddTableResource; import com.spotify.reaper.resources.PingResource; import com.spotify.reaper.resources.RepairTableResource; @@ -40,7 +40,7 @@ public void run(ReaperApplicationConfiguration config, IStorage storage = initializeStorage(config, environment); final PingResource pingResource = new PingResource(); - final AddClusterResource addClusterResource = new AddClusterResource(storage); + final ClusterResource addClusterResource = new ClusterResource(storage); final AddTableResource addTableResource = new AddTableResource(); final RepairTableResource repairTableResource = new RepairTableResource(); diff --git a/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java b/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java index 5a4852d81..64210168d 100644 --- a/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java +++ b/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java @@ -26,6 +26,7 @@ public class ReaperApplicationConfiguration extends Configuration { @Valid @NotNull + @JsonProperty private DataSourceFactory database = new DataSourceFactory(); @JsonProperty diff --git a/src/main/java/com/spotify/reaper/cassandra/ClusterInfo.java b/src/main/java/com/spotify/reaper/cassandra/ClusterInfo.java index 6737e3bca..0e61c2326 100644 --- a/src/main/java/com/spotify/reaper/cassandra/ClusterInfo.java +++ b/src/main/java/com/spotify/reaper/cassandra/ClusterInfo.java @@ -8,18 +8,26 @@ public class ClusterInfo implements IClusterInfo { private String seedHost; private int seedPort = 0; + private String clusterName; private List tokens; + public static ClusterInfo getInstance(String seedHost) throws ReaperException { + return new ClusterInfo(seedHost, 0).init(); + } + public ClusterInfo(String seedHost, int seedPort) { this.seedHost = seedHost; this.seedPort = seedPort; } - public void init() throws ReaperException { - JMXProxy jmx = seedPort == 0 ? JMXProxy.connect(seedHost) : JMXProxy.connect(seedHost, seedPort); + public ClusterInfo init() throws ReaperException { + JMXProxy jmx = + seedPort == 0 ? JMXProxy.connect(seedHost) : JMXProxy.connect(seedHost, seedPort); tokens = jmx.getTokens(); + clusterName = jmx.getClusterName(); jmx.close(); + return this; } @Override @@ -27,4 +35,9 @@ public List getTokens() { return tokens; } + @Override + public String getClusterName() { + return clusterName; + } + } diff --git a/src/main/java/com/spotify/reaper/cassandra/IClusterInfo.java b/src/main/java/com/spotify/reaper/cassandra/IClusterInfo.java index 192b53c48..8299586e0 100644 --- a/src/main/java/com/spotify/reaper/cassandra/IClusterInfo.java +++ b/src/main/java/com/spotify/reaper/cassandra/IClusterInfo.java @@ -6,4 +6,5 @@ public interface IClusterInfo { public List getTokens(); + public String getClusterName(); } diff --git a/src/main/java/com/spotify/reaper/cassandra/JMXProxy.java b/src/main/java/com/spotify/reaper/cassandra/JMXProxy.java index 259b0674e..6e7438ede 100644 --- a/src/main/java/com/spotify/reaper/cassandra/JMXProxy.java +++ b/src/main/java/com/spotify/reaper/cassandra/JMXProxy.java @@ -1,7 +1,9 @@ package com.spotify.reaper.cassandra; import com.google.common.collect.Lists; + import com.spotify.reaper.ReaperException; + import org.apache.cassandra.service.StorageServiceMBean; import java.io.IOException; @@ -20,11 +22,11 @@ public class JMXProxy { private static final int DEFAULT_JMX_PORT = 7199; - private JMXConnector jmxc = null; + private JMXConnector jmxConnector = null; private StorageServiceMBean ssProxy; - public JMXProxy(JMXConnector jmxc, StorageServiceMBean ssProxy) { - this.jmxc = jmxc; + public JMXProxy(JMXConnector jmxConnector, StorageServiceMBean ssProxy) { + this.jmxConnector = jmxConnector; this.ssProxy = ssProxy; } @@ -37,17 +39,17 @@ public static JMXProxy connect(String host, int port) throws ReaperException { ObjectName name; try { jmxUrl = new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", - host, port)); + host, port)); name = new ObjectName("org.apache.cassandra.db:type=StorageService"); } catch (MalformedURLException | MalformedObjectNameException e) { throw new ReaperException("Failure during preparations for JMX connection", e); } try { - JMXConnector jmxc = JMXConnectorFactory.connect(jmxUrl); - MBeanServerConnection mbeanServerConn = jmxc.getMBeanServerConnection(); + JMXConnector jmxConnector = JMXConnectorFactory.connect(jmxUrl); + MBeanServerConnection mbeanServerConn = jmxConnector.getMBeanServerConnection(); StorageServiceMBean ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class); - return new JMXProxy(jmxc, ssProxy); + return new JMXProxy(jmxConnector, ssProxy); } catch (IOException e) { throw new ReaperException("Failure when establishing JMX connection", e); } @@ -57,9 +59,13 @@ public List getTokens() { return Lists.newArrayList(ssProxy.getTokenToEndpointMap().keySet()); } + public String getClusterName() { + return ssProxy.getClusterName(); + } + public void close() throws ReaperException { try { - jmxc.close(); + jmxConnector.close(); } catch (IOException e) { throw new ReaperException(e); } diff --git a/src/main/java/com/spotify/reaper/resources/AddClusterResource.java b/src/main/java/com/spotify/reaper/resources/AddClusterResource.java deleted file mode 100644 index 94627791c..000000000 --- a/src/main/java/com/spotify/reaper/resources/AddClusterResource.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.spotify.reaper.resources; - -import com.google.common.base.Optional; - -import com.spotify.reaper.storage.IStorage; - -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; - -@Path("/add_cluster") -@Produces(MediaType.TEXT_PLAIN) -public class AddClusterResource { - - private static final Logger LOG = LoggerFactory.getLogger(AddClusterResource.class); - - private final IStorage storage; - - public AddClusterResource(IStorage storage) { - this.storage = storage; - } - - @POST - public String addCluster(@QueryParam("host") Optional host) { - LOG.info("add_cluster called with host: {}", host); - // TODO: should call the storage here - return String.format("Not implemented yet"); - } - -} diff --git a/src/main/java/com/spotify/reaper/resources/ClusterResource.java b/src/main/java/com/spotify/reaper/resources/ClusterResource.java new file mode 100644 index 000000000..c36b22697 --- /dev/null +++ b/src/main/java/com/spotify/reaper/resources/ClusterResource.java @@ -0,0 +1,75 @@ +package com.spotify.reaper.resources; + +import com.google.common.base.Optional; + +import com.spotify.reaper.ReaperException; +import com.spotify.reaper.cassandra.ClusterInfo; +import com.spotify.reaper.core.Cluster; +import com.spotify.reaper.storage.IStorage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.net.URL; + +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; + +@Path("/cluster/{name}") +@Produces(MediaType.APPLICATION_JSON) +public class ClusterResource { + + private static final Logger LOG = LoggerFactory.getLogger(ClusterResource.class); + + private final IStorage storage; + + public ClusterResource(IStorage storage) { + this.storage = storage; + } + + @GET + public Response getCluster(@PathParam("name") String name) { + Cluster cluster = storage.getCluster(name); + return Response.ok().entity(cluster).build(); + } + + @POST + public Response addCluster(@Context UriInfo uriInfo, @QueryParam("host") Optional host) { + if (!host.isPresent()) { + LOG.error("POST on cluster resource called without host"); + return Response.status(400).entity("query parameter \"host\" required").build(); + } + LOG.info("add cluster called with host: {}", host); + + ClusterInfo clusterInfo; + try { + clusterInfo = ClusterInfo.getInstance(host.get()); + } catch (ReaperException e) { + String errMsg = "failed to get cluster info from seed host: " + host.get(); + LOG.error(errMsg); + e.printStackTrace(); + return Response.status(400).entity(errMsg).build(); + } + + URI createdURI = null; + try { + createdURI = (new URL(uriInfo.getAbsolutePath().toURL(), "1")).toURI(); + } catch (Exception e) { + LOG.error("failed creating target URI: " + uriInfo.getAbsolutePath()); + e.printStackTrace(); + } + + String replyMsg = "cluster with name \"" + clusterInfo.getClusterName() + "\" created"; + return Response.created(createdURI).entity(replyMsg).build(); + } + +} diff --git a/src/main/java/com/spotify/reaper/storage/IStorage.java b/src/main/java/com/spotify/reaper/storage/IStorage.java index 44761f07d..a6190575a 100644 --- a/src/main/java/com/spotify/reaper/storage/IStorage.java +++ b/src/main/java/com/spotify/reaper/storage/IStorage.java @@ -10,6 +10,8 @@ public interface IStorage { public Cluster getCluster(String clusterName); + public Cluster insertCluster(Cluster newCluster); + public RepairRun addRepairRun(RepairRun repairRun); public RepairRun getRepairRun(long id); diff --git a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java index 455202198..08a6f7c5c 100644 --- a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java +++ b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java @@ -13,6 +13,11 @@ public Cluster getCluster(String clusterName) { return null; } + @Override + public Cluster insertCluster(Cluster newCluster) { + return null; + } + @Override public RepairRun addRepairRun(RepairRun repairRun) { return null; diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java index ac38a87b0..ce86a62da 100644 --- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java +++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java @@ -4,8 +4,10 @@ import com.spotify.reaper.ReaperException; import com.spotify.reaper.core.Cluster; import com.spotify.reaper.core.RepairRun; +import com.spotify.reaper.storage.postgresql.IStoragePostgreSQL; import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.Handle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,8 +28,7 @@ public PostgresStorage(ReaperApplicationConfiguration config, Environment enviro try { final DBIFactory factory = new DBIFactory(); jdbi = factory.build(environment, config.getDataSourceFactory(), "postgresql"); - } - catch (ClassNotFoundException ex) { + } catch (ClassNotFoundException ex) { LOG.error("failed creating database connection: {}", ex); throw new ReaperException(ex); } @@ -35,7 +36,28 @@ public PostgresStorage(ReaperApplicationConfiguration config, Environment enviro @Override public Cluster getCluster(String clusterName) { - return null; + Handle h = jdbi.open(); + IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + Cluster result = postgres.getCluster(clusterName); + h.close(); + return result; + } + + @Override + public Cluster insertCluster(Cluster newCluster) { + Handle h = jdbi.open(); + IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + int rowsAdded = postgres.insertCluster(newCluster); + Cluster result; + if (rowsAdded < 1) { + LOG.warn("failed inserting cluster with name: {}", newCluster.getName()); + result = null; + } + else { + result = postgres.getCluster(newCluster.getName()); + } + h.close(); + return result; } @Override diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java new file mode 100644 index 000000000..1ee8b28cf --- /dev/null +++ b/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java @@ -0,0 +1,25 @@ +package com.spotify.reaper.storage.postgresql; + +import com.spotify.reaper.core.Cluster; + +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.ResultSetMapper; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.HashSet; + +public class ClusterMapper implements ResultSetMapper { + + public Cluster map(int index, ResultSet r, StatementContext ctx) throws SQLException { + String[] seedHosts = (String[]) r.getArray("seed_hosts").getArray(); + return new Cluster.ClusterBuilder() + .id(r.getLong("id")) + .partitioner(r.getString("partitioner")) + .name(r.getString("name")) + .seedHosts(new HashSet(Arrays.asList(seedHosts))) + .build(); + } + +} diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java new file mode 100644 index 000000000..771657991 --- /dev/null +++ b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java @@ -0,0 +1,40 @@ +package com.spotify.reaper.storage.postgresql; + +import com.spotify.reaper.core.Cluster; + +import org.skife.jdbi.v2.sqlobject.Bind; +import org.skife.jdbi.v2.sqlobject.BindBean; +import org.skife.jdbi.v2.sqlobject.SqlQuery; +import org.skife.jdbi.v2.sqlobject.SqlUpdate; +import org.skife.jdbi.v2.sqlobject.customizers.Mapper; + +/** + * JDBI based PostgreSQL interface. + * + * See following specification for more info: http://jdbi.org/sql_object_api_dml/ + */ +public interface IStoragePostgreSQL { + + static final String SQL_GET_CLUSTER = "SELECT id, partitioner, name, seed_hosts FROM cluster "; + + static final String SQL_INSERT_CLUSTER = "INSERT INTO cluster (partitioner, name, seed_hosts) " + + "VALUES (:partitioner, :name, :seedHosts)"; + + static final String SQL_UPDATE_CLUSTER = "UPDATE cluster SET partitioner = :partitioner, " + + "name = :name, seed_hosts = :seedHosts WHERE id = :id"; + + @SqlQuery(SQL_GET_CLUSTER + "WHERE name = :name") + @Mapper(ClusterMapper.class) + public Cluster getCluster(@Bind("name") String clusterName); + + @SqlQuery(SQL_GET_CLUSTER + "WHERE id = :id") + @Mapper(ClusterMapper.class) + public Cluster getCluster(@Bind("id") long id); + + @SqlUpdate(SQL_INSERT_CLUSTER) + public int insertCluster(@BindBean Cluster newCluster); + + @SqlUpdate(SQL_UPDATE_CLUSTER) + public int updateCluster(@BindBean Cluster newCluster); + +}