From eca1eaa509a602773ec3cb687d70c8b8bc8bf715 Mon Sep 17 00:00:00 2001 From: Hannu Varjoranta Date: Wed, 21 Jan 2015 17:38:11 +0100 Subject: [PATCH] WIP removing column family concept --- src/main/db/reaper_db.sql | 26 +-- .../com/spotify/reaper/ReaperApplication.java | 5 +- .../JmxConnectionFactory.java | 11 +- .../spotify/reaper/cassandra/JmxProxy.java | 102 ++++++--- .../com/spotify/reaper/core/RepairRun.java | 16 +- .../spotify/reaper/core/RepairSegment.java | 16 +- .../{ColumnFamily.java => RepairUnit.java} | 30 +-- .../reaper/resources/ClusterResource.java | 59 ++--- .../reaper/resources/RepairRunResource.java | 135 ++++++----- .../reaper/resources/ResourceUtils.java | 37 +++ .../reaper/resources/TableResource.java | 213 ------------------ .../reaper/resources/view/ClusterStatus.java | 7 + .../resources/view/ColumnFamilyStatus.java | 47 ---- .../resources/view/RepairRunStatus.java | 26 ++- .../spotify/reaper/service/RepairRunner.java | 13 +- .../spotify/reaper/service/SegmentRunner.java | 11 +- .../com/spotify/reaper/storage/IStorage.java | 8 +- .../spotify/reaper/storage/MemoryStorage.java | 26 +-- .../reaper/storage/PostgresStorage.java | 14 +- .../postgresql/ColumnFamilyMapper.java | 8 +- .../postgresql/IStoragePostgreSQL.java | 10 +- .../reaper/resources/ClusterResourceTest.java | 3 +- .../resources/RepairRunResourceTest.java | 6 +- .../reaper/resources/TableResourceTest.java | 10 +- .../reaper/service/RepairRunnerTest.java | 9 +- .../reaper/service/SegmentRunnerTest.java | 15 +- 26 files changed, 344 insertions(+), 519 deletions(-) rename src/main/java/com/spotify/reaper/{service => cassandra}/JmxConnectionFactory.java (82%) rename src/main/java/com/spotify/reaper/core/{ColumnFamily.java => RepairUnit.java} (76%) create mode 100644 src/main/java/com/spotify/reaper/resources/ResourceUtils.java delete mode 100644 src/main/java/com/spotify/reaper/resources/TableResource.java delete mode 100644 src/main/java/com/spotify/reaper/resources/view/ColumnFamilyStatus.java diff --git a/src/main/db/reaper_db.sql b/src/main/db/reaper_db.sql index 24a9d8c1e..aaf441420 100644 --- a/src/main/db/reaper_db.sql +++ b/src/main/db/reaper_db.sql @@ -10,7 +10,7 @@ -- For cleaning up the database, just do first in the following order: -- DROP TABLE "repair_segment"; -- DROP TABLE "repair_run"; --- DROP TABLE "column_family"; +-- DROP TABLE "repair_unit"; -- DROP TABLE "cluster"; CREATE TABLE IF NOT EXISTS "cluster" ( @@ -19,24 +19,22 @@ CREATE TABLE IF NOT EXISTS "cluster" ( "seed_hosts" TEXT[] NOT NULL ); -CREATE TABLE IF NOT EXISTS "column_family" ( +-- Repair unit is basically a keyspace with a set of column families. +-- Cassandra supports repairing multiple column families in one go. +-- +CREATE TABLE IF NOT EXISTS "repair_unit" ( "id" SERIAL PRIMARY KEY, "cluster_name" TEXT NOT NULL REFERENCES "cluster" ("name"), "keyspace_name" TEXT NOT NULL, - "name" TEXT NOT NULL, + "column_families" TEXT[] NOT NULL, "segment_count" INT NOT NULL, "snapshot_repair" BOOLEAN NOT NULL ); --- Preventing duplicate column families within a same cluster and keyspace --- with the following index: -CREATE UNIQUE INDEX column_family_no_duplicates_idx - ON "column_family" ("cluster_name", "keyspace_name", "name"); - CREATE TABLE IF NOT EXISTS "repair_run" ( "id" SERIAL PRIMARY KEY, "cluster_name" TEXT NOT NULL REFERENCES "cluster" ("name"), - "column_family_id" INT NOT NULL REFERENCES "column_family" ("id"), + "repair_unit_id" INT NOT NULL REFERENCES "repair_unit" ("id"), "cause" TEXT NOT NULL, "owner" TEXT NOT NULL, -- see (Java) RepairRun.RunState for state values @@ -49,7 +47,7 @@ CREATE TABLE IF NOT EXISTS "repair_run" ( CREATE TABLE IF NOT EXISTS "repair_segment" ( "id" SERIAL PRIMARY KEY, - "column_family_id" INT NOT NULL REFERENCES "column_family" ("id"), + "repair_unit_id" INT NOT NULL REFERENCES "repair_unit" ("id"), "run_id" INT NOT NULL REFERENCES "repair_run" ("id"), "start_token" NUMERIC(50) NOT NULL, "end_token" NUMERIC(50) NOT NULL, @@ -59,14 +57,14 @@ CREATE TABLE IF NOT EXISTS "repair_segment" ( "end_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL, "fail_count" INT NOT NULL DEFAULT 0 ); -CREATE INDEX "repair_segment_run_id_fail_count_start_token_idx" - ON "repair_segment" USING BTREE ("run_id" DESC, "fail_count" ASC, "start_token" ASC); +CREATE INDEX "repair_segment_run_id_fail_count_idx" + ON "repair_segment" USING BTREE ("run_id" ASC, "fail_count" ASC); CREATE INDEX "repair_segment_state_idx" ON "repair_segment" USING BTREE ("state"); GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE cluster TO reaper; -GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE column_family TO reaper; -GRANT USAGE, SELECT ON SEQUENCE column_family_id_seq TO reaper; +GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE repair_unit TO reaper; +GRANT USAGE, SELECT ON SEQUENCE repair_unit_id_seq TO reaper; GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE repair_run TO reaper; GRANT USAGE, SELECT ON SEQUENCE repair_run_id_seq TO reaper; GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE repair_segment TO reaper; diff --git a/src/main/java/com/spotify/reaper/ReaperApplication.java b/src/main/java/com/spotify/reaper/ReaperApplication.java index 81e4bb7de..ad1ab6b1d 100644 --- a/src/main/java/com/spotify/reaper/ReaperApplication.java +++ b/src/main/java/com/spotify/reaper/ReaperApplication.java @@ -13,16 +13,13 @@ */ package com.spotify.reaper; -import com.spotify.reaper.core.RepairRun; -import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.resources.ClusterResource; import com.spotify.reaper.resources.PingResource; import com.spotify.reaper.resources.ReaperHealthCheck; import com.spotify.reaper.resources.RepairRunResource; import com.spotify.reaper.resources.TableResource; -import com.spotify.reaper.service.JmxConnectionFactory; +import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.service.RepairRunner; -import com.spotify.reaper.service.SegmentRunner; import com.spotify.reaper.storage.IStorage; import com.spotify.reaper.storage.MemoryStorage; import com.spotify.reaper.storage.PostgresStorage; diff --git a/src/main/java/com/spotify/reaper/service/JmxConnectionFactory.java b/src/main/java/com/spotify/reaper/cassandra/JmxConnectionFactory.java similarity index 82% rename from src/main/java/com/spotify/reaper/service/JmxConnectionFactory.java rename to src/main/java/com/spotify/reaper/cassandra/JmxConnectionFactory.java index 42766a1e0..dad9876b6 100644 --- a/src/main/java/com/spotify/reaper/service/JmxConnectionFactory.java +++ b/src/main/java/com/spotify/reaper/cassandra/JmxConnectionFactory.java @@ -11,13 +11,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.spotify.reaper.service; +package com.spotify.reaper.cassandra; import com.google.common.base.Optional; - import com.spotify.reaper.ReaperException; -import com.spotify.reaper.cassandra.JmxProxy; -import com.spotify.reaper.cassandra.RepairStatusHandler; +import com.spotify.reaper.core.Cluster; import java.util.Collection; @@ -36,4 +34,9 @@ public final JmxProxy connectAny(Optional handler, Collecti throws ReaperException { return create(handler, hosts.iterator().next()); } + + public final JmxProxy connectAny(Cluster cluster) + throws ReaperException { + return connectAny(Optional.absent(), cluster.getSeedHosts()); + } } diff --git a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java index 68660b734..bddde8339 100644 --- a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java +++ b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java @@ -16,10 +16,8 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.Lists; - import com.spotify.reaper.ReaperException; import com.spotify.reaper.service.RingRange; - import org.apache.cassandra.db.ColumnFamilyStoreMBean; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.CompactionManagerMBean; @@ -28,26 +26,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.Serializable; -import java.math.BigInteger; -import java.net.MalformedURLException; -import java.util.List; -import java.util.Map; -import java.util.Set; - import javax.annotation.Nullable; -import javax.management.InstanceNotFoundException; -import javax.management.JMX; -import javax.management.ListenerNotFoundException; -import javax.management.MBeanServerConnection; -import javax.management.MalformedObjectNameException; -import javax.management.Notification; -import javax.management.NotificationListener; -import javax.management.ObjectName; +import javax.management.*; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; +import java.io.IOException; +import java.io.Serializable; +import java.math.BigInteger; +import java.net.MalformedURLException; +import java.util.*; import static com.google.common.base.Preconditions.checkNotNull; @@ -62,10 +50,10 @@ public class JmxProxy implements NotificationListener, Serializable, AutoCloseab private final JMXConnector jmxConnector; private final ObjectName ssMbeanName; private final MBeanServerConnection mbeanServer; + private final CompactionManagerMBean cmProxy; private final StorageServiceMBean ssProxy; private final Optional repairStatusHandler; private final String host; - private final CompactionManagerMBean cmProxy; private JmxProxy(Optional handler, String host, JMXConnector jmxConnector, StorageServiceMBean ssProxy, ObjectName ssMbeanName, MBeanServerConnection mbeanServer, @@ -79,13 +67,6 @@ private JmxProxy(Optional handler, String host, JMXConnecto this.cmProxy = cmProxy; } - /** - * Connect to JMX interface on the given host and default JMX port without RepairStatusHandler. - */ - public static JmxProxy connect(String host) throws ReaperException { - return connect(Optional.absent(), host); - } - /** * Connect to JMX interface on the given host and default JMX port. * @@ -128,14 +109,14 @@ public static JmxProxy connect(Optional handler, String hos JMX.newMBeanProxy(mbeanServerConn, ssMbeanName, StorageServiceMBean.class); CompactionManagerMBean cmProxy = JMX.newMBeanProxy(mbeanServerConn, cmMbeanName, CompactionManagerMBean.class); - JmxProxy proxy = - new JmxProxy(handler, host, jmxConn, ssProxy, ssMbeanName, mbeanServerConn, cmProxy); + JmxProxy proxy = new JmxProxy(handler, host, jmxConn, ssProxy, ssMbeanName, + mbeanServerConn, cmProxy); // registering a listener throws bunch of exceptions, so we do it here rather than in the // constructor mbeanServerConn.addNotificationListener(ssMbeanName, proxy, null, null); LOG.info(String.format("JMX connection to %s properly connected.", host)); return proxy; - } catch (IOException | InstanceNotFoundException e) { + } catch (IOException | InstanceNotFoundException | MalformedObjectNameException e) { LOG.error("Failed to establish JMX connection"); throw new ReaperException("Failure when establishing JMX connection", e); } @@ -202,6 +183,26 @@ public List getKeyspaces() { return ssProxy.getKeyspaces(); } + public List getTableNamesForKeyspace(String keyspace) throws ReaperException { + List tableNames = new ArrayList<>(); + Iterator> proxies = null; + try { + proxies = ColumnFamilyStoreMBeanIterator.getColumnFamilyStoreMBeanProxies(mbeanServer); + } catch (IOException | MalformedObjectNameException e) { + e.printStackTrace(); + throw new ReaperException("failed to get ColumnFamilyStoreMBean instances from JMX"); + } + while (proxies.hasNext()) { + Map.Entry proxyEntry = proxies.next(); + String keyspaceName = proxyEntry.getKey(); + if (keyspace.equalsIgnoreCase(keyspaceName)) { + ColumnFamilyStoreMBean columnFamilyMBean = proxyEntry.getValue(); + tableNames.add(columnFamilyMBean.getColumnFamilyName()); + } + } + return tableNames; + } + /** * @return number of pending compactions on the node this proxy is connected to */ @@ -245,7 +246,7 @@ public boolean tableExists(String ks, String cf) throws ReaperException { /** * Triggers a repair of range (beginToken, endToken] for given keyspace and column family. - * + *

* The repair is triggered by {@link org.apache.cassandra.service.StorageServiceMBean#forceRepairRangeAsync} * For time being, we don't allow local nor snapshot repairs. * @@ -268,7 +269,7 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys /** * Invoked when the MBean this class listens to publishes an event. - * + *

* We're only interested in repair-related events. Their format is explained at * {@link org.apache.cassandra.service.StorageServiceMBean#forceRepairAsync} * The format is: notification type: "repair" notification @@ -320,3 +321,42 @@ public void close() throws ReaperException { } } } + +/** + * This code is copied and adjusted from from NodeProbe.java from Cassandra source. + */ +class ColumnFamilyStoreMBeanIterator + implements Iterator> { + + static Iterator> getColumnFamilyStoreMBeanProxies( + MBeanServerConnection mbeanServerConn) + throws IOException, MalformedObjectNameException { + return new ColumnFamilyStoreMBeanIterator(mbeanServerConn); + } + + private Iterator resIter; + private MBeanServerConnection mbeanServerConn; + + public ColumnFamilyStoreMBeanIterator(MBeanServerConnection mbeanServerConn) + throws MalformedObjectNameException, NullPointerException, IOException { + ObjectName query = new ObjectName("org.apache.cassandra.db:type=ColumnFamilies,*"); + resIter = mbeanServerConn.queryNames(query, null).iterator(); + this.mbeanServerConn = mbeanServerConn; + } + + public boolean hasNext() { + return resIter.hasNext(); + } + + public Map.Entry next() { + ObjectName objectName = resIter.next(); + String keyspaceName = objectName.getKeyProperty("keyspace"); + ColumnFamilyStoreMBean cfsProxy = + JMX.newMBeanProxy(mbeanServerConn, objectName, ColumnFamilyStoreMBean.class); + return new AbstractMap.SimpleImmutableEntry<>(keyspaceName, cfsProxy); + } + + public void remove() { + throw new UnsupportedOperationException(); + } +} diff --git a/src/main/java/com/spotify/reaper/core/RepairRun.java b/src/main/java/com/spotify/reaper/core/RepairRun.java index 7ef7d0cdf..0532877e3 100644 --- a/src/main/java/com/spotify/reaper/core/RepairRun.java +++ b/src/main/java/com/spotify/reaper/core/RepairRun.java @@ -26,7 +26,7 @@ public class RepairRun { private final String cause; private final String owner; private final String clusterName; - private final long columnFamilyId; + private final long repairUnitId; private final RunState runState; private final DateTime creationTime; private final DateTime startTime; @@ -37,8 +37,8 @@ public long getId() { return id; } - public long getColumnFamilyId() { - return columnFamilyId; + public long getRepairUnitId() { + return repairUnitId; } public String getClusterName() { @@ -84,7 +84,7 @@ public enum RunState { private RepairRun(Builder builder, long id) { this.id = id; this.clusterName = builder.clusterName; - this.columnFamilyId = builder.columnFamilyId; + this.repairUnitId = builder.repairUnitId; this.cause = builder.cause; this.owner = builder.owner; this.runState = builder.runState; @@ -101,7 +101,7 @@ public Builder with() { public static class Builder { public final String clusterName; - public final long columnFamilyId; + public final long repairUnitId; private RunState runState; private DateTime creationTime; private double intensity; @@ -110,10 +110,10 @@ public static class Builder { private DateTime startTime; private DateTime endTime; - public Builder(String clusterName, long columnFamilyId, DateTime creationTime, + public Builder(String clusterName, long repairUnitId, DateTime creationTime, double intensity) { this.clusterName = clusterName; - this.columnFamilyId = columnFamilyId; + this.repairUnitId = repairUnitId; this.runState = RunState.NOT_STARTED; this.creationTime = creationTime; this.intensity = intensity; @@ -121,7 +121,7 @@ public Builder(String clusterName, long columnFamilyId, DateTime creationTime, private Builder(RepairRun original) { clusterName = original.clusterName; - columnFamilyId = original.columnFamilyId; + repairUnitId = original.repairUnitId; runState = original.runState; creationTime = original.creationTime; intensity = original.intensity; diff --git a/src/main/java/com/spotify/reaper/core/RepairSegment.java b/src/main/java/com/spotify/reaper/core/RepairSegment.java index 936673f68..723dbce10 100644 --- a/src/main/java/com/spotify/reaper/core/RepairSegment.java +++ b/src/main/java/com/spotify/reaper/core/RepairSegment.java @@ -23,7 +23,7 @@ public class RepairSegment { private final long id; private final Integer repairCommandId; // received when triggering repair in Cassandra - private final long columnFamilyId; + private final long repairUnitId; private final long runId; private final RingRange tokenRange; private final State state; @@ -39,8 +39,8 @@ public Integer getRepairCommandId() { return repairCommandId; } - public long getColumnFamilyId() { - return columnFamilyId; + public long getRepairUnitId() { + return repairUnitId; } public long getRunId() { @@ -84,7 +84,7 @@ public enum State { private RepairSegment(Builder builder, long id) { this.id = id; this.repairCommandId = builder.repairCommandId; - this.columnFamilyId = builder.columnFamilyId; + this.repairUnitId = builder.repairUnitId; this.runId = builder.runId; this.tokenRange = builder.tokenRange; this.state = builder.state; @@ -101,17 +101,17 @@ public static class Builder { public final long runId; public final RingRange tokenRange; - private final long columnFamilyId; + private final long repairUnitId; private State state; private int failCount; private Integer repairCommandId; private DateTime startTime; private DateTime endTime; - public Builder(long runId, RingRange tokenRange, long columnFamilyId) { + public Builder(long runId, RingRange tokenRange, long repairUnitId) { this.runId = runId; this.tokenRange = tokenRange; - this.columnFamilyId = columnFamilyId; + this.repairUnitId = repairUnitId; this.state = State.NOT_STARTED; this.failCount = 0; } @@ -121,7 +121,7 @@ private Builder(RepairSegment original) { tokenRange = original.tokenRange; state = original.state; failCount = original.failCount; - columnFamilyId = original.columnFamilyId; + repairUnitId = original.repairUnitId; repairCommandId = original.repairCommandId; startTime = original.startTime; endTime = original.endTime; diff --git a/src/main/java/com/spotify/reaper/core/ColumnFamily.java b/src/main/java/com/spotify/reaper/core/RepairUnit.java similarity index 76% rename from src/main/java/com/spotify/reaper/core/ColumnFamily.java rename to src/main/java/com/spotify/reaper/core/RepairUnit.java index c91191656..ef3a32365 100644 --- a/src/main/java/com/spotify/reaper/core/ColumnFamily.java +++ b/src/main/java/com/spotify/reaper/core/RepairUnit.java @@ -13,12 +13,14 @@ */ package com.spotify.reaper.core; -public class ColumnFamily { +import java.util.Set; + +public class RepairUnit { private final long id; private final String clusterName; private final String keyspaceName; - private final String name; + private final Set columnFamilies; private final int segmentCount; private final boolean snapshotRepair; @@ -34,8 +36,8 @@ public String getKeyspaceName() { return keyspaceName; } - public String getName() { - return name; + public Set getColumnFamilies() { + return columnFamilies; } public int getSegmentCount() { @@ -46,11 +48,11 @@ public boolean isSnapshotRepair() { return snapshotRepair; } - private ColumnFamily(Builder builder, long id) { + private RepairUnit(Builder builder, long id) { this.id = id; this.clusterName = builder.clusterName; this.keyspaceName = builder.keyspaceName; - this.name = builder.name; + this.columnFamilies = builder.columnFamilies; this.segmentCount = builder.segmentCount; this.snapshotRepair = builder.snapshotRepair; } @@ -63,23 +65,23 @@ public static class Builder { public final String clusterName; public final String keyspaceName; - public final String name; + public final Set columnFamilies; private int segmentCount; private boolean snapshotRepair; - public Builder(String clusterName, String keyspaceName, String name, int segmentCount, - boolean snapshotRepair) { + public Builder(String clusterName, String keyspaceName, Set columnFamilies, + int segmentCount, boolean snapshotRepair) { this.clusterName = clusterName; this.keyspaceName = keyspaceName; - this.name = name; + this.columnFamilies = columnFamilies; this.segmentCount = segmentCount; this.snapshotRepair = snapshotRepair; } - private Builder(ColumnFamily original) { + private Builder(RepairUnit original) { clusterName = original.clusterName; keyspaceName = original.keyspaceName; - name = original.name; + columnFamilies = original.columnFamilies; segmentCount = original.segmentCount; snapshotRepair = original.snapshotRepair; } @@ -94,8 +96,8 @@ public Builder snapshotRepair(boolean snapshotRepair) { return this; } - public ColumnFamily build(long id) { - return new ColumnFamily(this, id); + public RepairUnit build(long id) { + return new RepairUnit(this, id); } } } diff --git a/src/main/java/com/spotify/reaper/resources/ClusterResource.java b/src/main/java/com/spotify/reaper/resources/ClusterResource.java index e08d7924e..2bd9985cd 100644 --- a/src/main/java/com/spotify/reaper/resources/ClusterResource.java +++ b/src/main/java/com/spotify/reaper/resources/ClusterResource.java @@ -14,35 +14,26 @@ package com.spotify.reaper.resources; import com.google.common.base.Optional; - import com.spotify.reaper.ReaperException; +import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.core.Cluster; import com.spotify.reaper.resources.view.ClusterStatus; -import com.spotify.reaper.service.JmxConnectionFactory; import com.spotify.reaper.storage.IStorage; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.*; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; import java.net.URI; import java.net.URL; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriInfo; - @Path("/cluster") @Produces(MediaType.APPLICATION_JSON) public class ClusterResource { @@ -52,9 +43,9 @@ public class ClusterResource { private final JmxConnectionFactory jmxFactory; private final IStorage storage; - public ClusterResource(IStorage storage, JmxConnectionFactory factory) { + public ClusterResource(IStorage storage, JmxConnectionFactory jmxFactory) { this.storage = storage; - this.jmxFactory = factory; + this.jmxFactory = jmxFactory; } @GET @@ -73,9 +64,7 @@ public Response getClusterList() { public Response getCluster(@PathParam("cluster_name") String clusterName) { LOG.info("get cluster called with cluster_name: {}", clusterName); Cluster cluster = storage.getCluster(clusterName); - ClusterStatus view = new ClusterStatus(cluster); - view.setRepairRunIds(storage.getRepairRunIdsForCluster(cluster.getName())); - return Response.ok().entity(view).build(); + return viewCluster(cluster, Optional.absent()); } @POST @@ -90,7 +79,7 @@ public Response addCluster( Cluster newCluster; try { - newCluster = createClusterWithSeedHost(seedHost.get(), jmxFactory); + newCluster = ResourceUtils.createClusterWithSeedHost(seedHost.get(), jmxFactory); } catch (ReaperException e) { return Response.status(400) .entity("failed to create cluster with seed host: " + seedHost.get()).build(); @@ -106,7 +95,7 @@ public Response addCluster( .build(); } - URI createdURI = null; + URI createdURI; try { createdURI = (new URL(uriInfo.getAbsolutePath().toURL(), newCluster.getName())).toURI(); } catch (Exception e) { @@ -116,24 +105,26 @@ public Response addCluster( return Response.status(400).entity(errMsg).build(); } - return Response.created(createdURI).entity(new ClusterStatus(newCluster)).build(); + return viewCluster(newCluster, Optional.of(createdURI)); } - public static Cluster createClusterWithSeedHost(String seedHost, JmxConnectionFactory factory) - throws ReaperException { - String clusterName; - String partitioner; + private Response viewCluster(Cluster cluster, Optional createdURI) { + ClusterStatus view = new ClusterStatus(cluster); + view.setRepairRunIds(storage.getRepairRunIdsForCluster(cluster.getName())); try { - JmxProxy jmxProxy = factory.create(seedHost); - clusterName = jmxProxy.getClusterName(); - partitioner = jmxProxy.getPartitioner(); - jmxProxy.close(); + JmxProxy jmx = this.jmxFactory.connectAny(cluster); + view.setKeyspaces(jmx.getKeyspaces()); + jmx.close(); } catch (ReaperException e) { - LOG.error("failed to create cluster with seed host: " + seedHost); e.printStackTrace(); - throw e; + LOG.error("failed connecting JMX", e); + return Response.status(500).entity("failed connecting given clusters JMX endpoint").build(); + } + if (createdURI.isPresent()) { + return Response.created(createdURI.get()).entity(view).build(); + } else { + return Response.ok().entity(view).build(); } - return new Cluster(clusterName, partitioner, Collections.singleton(seedHost)); } } diff --git a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java index 029c9390b..99bfd61aa 100644 --- a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java +++ b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java @@ -20,17 +20,16 @@ import com.spotify.reaper.ReaperException; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.core.ColumnFamily; +import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.resources.view.RepairRunStatus; -import com.spotify.reaper.service.JmxConnectionFactory; +import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.service.RepairRunner; import com.spotify.reaper.service.RingRange; import com.spotify.reaper.service.SegmentGenerator; import com.spotify.reaper.storage.IStorage; -import org.apache.cassandra.db.Column; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,19 +86,24 @@ public RepairRunResource(ReaperApplicationConfiguration config, IStorage storage * must be called to initiate the repair. Creating a repair run includes generating repair * segments. * - * @return repair run ID in case of everything going well, 500 in case of errors. + * Notice that query parameter "tables" can be a single String, or a comma-separated list + * of table names. If the "tables" parameter is omitted, and only the keyspace is defined, + * then created repair run will target all the tables in the keyspace. + * + * @return repair run ID in case of everything going well, + * and a status code 500 in case of errors. */ @POST public Response addRepairRun( @Context UriInfo uriInfo, @QueryParam("clusterName") Optional clusterName, @QueryParam("keyspace") Optional keyspace, - @QueryParam("table") Optional tableName, + @QueryParam("tables") Optional tableNames, @QueryParam("owner") Optional owner, @QueryParam("cause") Optional cause) { - LOG.info("add repair run called with: clusterName = {}, keyspace = {}, table = {}, owner = {}," - + " cause = {}", clusterName, keyspace, tableName, owner, cause); + LOG.info("add repair run called with: clusterName = {}, keyspace = {}, tables = {}, owner = {}," + + " cause = {}", clusterName, keyspace, tableNames, owner, cause); try { if (!clusterName.isPresent()) { @@ -108,17 +112,26 @@ public Response addRepairRun( if (!keyspace.isPresent()) { throw new ReaperException("\"keyspace\" argument missing"); } - if (!tableName.isPresent()) { - throw new ReaperException("\"tableName\" argument missing"); - } if (!owner.isPresent()) { throw new ReaperException("\"owner\" argument missing"); } + Cluster cluster = getCluster(clusterName.get()); - ColumnFamily table = getTable(clusterName.get(), keyspace.get(), tableName.get()); - RepairRun newRepairRun = registerRepairRun(cluster, table, cause, owner.get()); + JmxProxy jmxProxy = jmxFactory.create(cluster.getSeedHosts().iterator().next()); + List knownTables = jmxProxy.getTableNamesForKeyspace(keyspace.get()); + if (knownTables.size() == 0) { + LOG.debug("no known tables for keyspace {} in cluster {}", keyspace.get(), + clusterName.get()); + return Response.status(Response.Status.NOT_FOUND).entity( + "no column families found for keyspace").build(); + } + + jmxProxy.close(); + + RepairUnit repairUnit = getRepairUnit(clusterName.get(), keyspace.get(), tableNames.get()); + RepairRun newRepairRun = registerRepairRun(cluster, repairUnit, cause, owner.get()); return Response.created(buildRepairRunURI(uriInfo, newRepairRun)) - .entity(new RepairRunStatus(newRepairRun, table)) + .entity(new RepairRunStatus(newRepairRun, repairUnit)) .build(); } catch (ReaperException e) { LOG.error(e.getMessage()); @@ -141,32 +154,32 @@ public Response modifyRunState( @PathParam("id") Long repairRunId, @QueryParam("state") Optional state) { - LOG.info("pause repair run called with: runId = {}", repairRunId); + LOG.info("pause repair run called with: id = {}, state = {}", repairRunId, state); if (!state.isPresent()) { return Response.status(Response.Status.BAD_REQUEST.getStatusCode()) - .entity("New state not specified") + .entity("\"state\" argument missing") .build(); } try { - RepairRun repairRun = fetchRepairRun(repairRunId); - ColumnFamily table = storage.getColumnFamily(repairRun.getColumnFamilyId()); + RepairRun repairRun = getRepairRun(repairRunId); + RepairUnit repairUnit = storage.getRepairUnit(repairRun.getRepairUnitId()); RepairRun.RunState newState = RepairRun.RunState.valueOf(state.get()); RepairRun.RunState oldState = repairRun.getRunState(); if (oldState == newState) { - return Response.status(Response.Status.NOT_MODIFIED).build(); + return Response.ok("given \"state\" is same as the current run state").build(); } if (isStarting(oldState, newState)) { - return startRun(repairRun, table); + return startRun(repairRun, repairUnit); } if (isPausing(oldState, newState)) { - return pauseRun(repairRun, table); + return pauseRun(repairRun, repairUnit); } if (isResuming(oldState, newState)) { - return resumeRun(repairRun, table); + return resumeRun(repairRun, repairUnit); } String errMsg = String.format("Transition %s->%s not supported.", newState.toString(), oldState.toString()); @@ -191,7 +204,7 @@ private boolean isResuming(RepairRun.RunState oldState, RepairRun.RunState newSt return oldState == RepairRun.RunState.PAUSED && newState == RepairRun.RunState.RUNNING; } - private Response startRun(RepairRun repairRun, ColumnFamily table) { + private Response startRun(RepairRun repairRun, RepairUnit repairUnit) { LOG.info("Starting run {}", repairRun.getId()); RepairRun updatedRun = repairRun.with() .runState(RepairRun.RunState.RUNNING) @@ -199,26 +212,26 @@ private Response startRun(RepairRun repairRun, ColumnFamily table) { .build(repairRun.getId()); storage.updateRepairRun(updatedRun); RepairRunner.startNewRepairRun(storage, repairRun.getId(), jmxFactory); - return Response.status(Response.Status.OK).entity(new RepairRunStatus(repairRun, table)) + return Response.status(Response.Status.OK).entity(new RepairRunStatus(repairRun, repairUnit)) .build(); } - private Response pauseRun(RepairRun repairRun, ColumnFamily table) { + private Response pauseRun(RepairRun repairRun, RepairUnit repairUnit) { LOG.info("Pausing run {}", repairRun.getId()); RepairRun updatedRun = repairRun.with() .runState(RepairRun.RunState.PAUSED) .build(repairRun.getId()); storage.updateRepairRun(updatedRun); - return Response.ok().entity(new RepairRunStatus(repairRun, table)).build(); + return Response.ok().entity(new RepairRunStatus(repairRun, repairUnit)).build(); } - private Response resumeRun(RepairRun repairRun, ColumnFamily table) { + private Response resumeRun(RepairRun repairRun, RepairUnit repairUnit) { LOG.info("Resuming run {}", repairRun.getId()); RepairRun updatedRun = repairRun.with() .runState(RepairRun.RunState.RUNNING) .build(repairRun.getId()); storage.updateRepairRun(updatedRun); - return Response.ok().entity(new RepairRunStatus(repairRun, table)).build(); + return Response.ok().entity(new RepairRunStatus(repairRun, repairUnit)).build(); } /** @@ -228,14 +241,13 @@ private Response resumeRun(RepairRun repairRun, ColumnFamily table) { @Path("/{id}") public Response getRepairRun(@PathParam("id") Long repairRunId) { LOG.info("get repair_run called with: id = {}", repairRunId); - try { - RepairRun repairRun = fetchRepairRun(repairRunId); + RepairRun repairRun = storage.getRepairRun(repairRunId); + if (null != repairRun) { return Response.ok().entity(getRepairRunStatus(repairRun)).build(); - } catch (ReaperException e) { - e.printStackTrace(); - LOG.error(e.getMessage()); - return Response.status(404).entity("repair run \"" + repairRunId + "\" does not exist") - .build(); + } + else { + return Response.status(404).entity( + "repair run with id " + repairRunId + " doesn't exist").build(); } } @@ -267,43 +279,30 @@ private Cluster getCluster(String clusterName) throws ReaperException { } /** - * @return table information for given cluster, keyspace and table name + * @return repair unit information for given cluster, keyspace and table name * @throws ReaperException if such table is not found in Reaper's storage */ - private ColumnFamily getTable(String clusterName, String keyspace, - String tableName) throws ReaperException { - ColumnFamily cf = storage.getColumnFamily(clusterName, keyspace, tableName); - if (cf == null) { + private RepairUnit getRepairUnit(String clusterName, String keyspace, + Collection tableNames) throws ReaperException { + RepairUnit repairUnit = storage.getRepairUnit(clusterName, keyspace, tableName); + if (repairUnit == null) { throw new ReaperException(String.format("Column family \"%s/%s/%s\" not found", clusterName, keyspace, tableName)); } - return cf; + return repairUnit; } /** * @return table information for given table id * @throws ReaperException if such table is not found in Reaper's storage */ - private ColumnFamily getTable(long columnFamilyId) throws ReaperException { - ColumnFamily cf = storage.getColumnFamily(columnFamilyId); - if (cf == null) { + private RepairUnit getRepairUnit(long repairUnitId) throws ReaperException { + RepairUnit repairUnit = storage.getRepairUnit(repairUnitId); + if (repairUnit == null) { throw new ReaperException(String.format("Column family with id \"%d\" not found", - columnFamilyId)); - } - return cf; - } - - /** - * @return repair run given its ID - * @throws ReaperException if the run is not found - * @param repairRunId - */ - private RepairRun fetchRepairRun(Long repairRunId) throws ReaperException { - RepairRun repairRun = storage.getRepairRun(repairRunId); - if (repairRun == null) { - throw new ReaperException(String.format("Repair run with id = %s not found", repairRunId)); + repairUnitId)); } - return repairRun; + return repairUnit; } /** @@ -315,17 +314,17 @@ private RepairRun fetchRepairRun(Long repairRunId) throws ReaperException { * 3) create RepairSegment instances linked to RepairRun. these are directly stored in storage * @throws ReaperException if repair run fails to be stored in Reaper's storage */ - private RepairRun registerRepairRun(Cluster cluster, ColumnFamily table, Optional cause, - String owner) throws ReaperException { + private RepairRun registerRepairRun(Cluster cluster, RepairUnit repairUnit, + Optional cause, String owner) throws ReaperException { // preparing a repair run involves several steps // the first step is to generate token segments - List tokenSegments = generateSegments(cluster, table); + List tokenSegments = generateSegments(cluster, repairUnit); checkNotNull(tokenSegments, "failed generating repair segments"); // the next step is to prepare a repair run object - RepairRun repairRun = storeNewRepairRun(cluster, table, cause, owner); + RepairRun repairRun = storeNewRepairRun(cluster, repairUnit, cause, owner); checkNotNull(repairRun, "failed preparing repair run"); // Notice that our RepairRun core object doesn't contain pointer to @@ -333,7 +332,7 @@ private RepairRun registerRepairRun(Cluster cluster, ColumnFamily table, Optiona // However, RepairSegment has a pointer to the RepairRun it lives in // the last preparation step is to generate actual repair segments - storeNewRepairSegments(tokenSegments, repairRun, table); + storeNewRepairSegments(tokenSegments, repairRun, repairUnit); // now we're done and can return return repairRun; @@ -345,7 +344,7 @@ private RepairRun registerRepairRun(Cluster cluster, ColumnFamily table, Optiona * @throws ReaperException when fails to discover seeds for the cluster or fails to connect to * any of the nodes in the Cluster. */ - private List generateSegments(Cluster targetCluster, ColumnFamily existingTable) + private List generateSegments(Cluster targetCluster, RepairUnit existingTable) throws ReaperException { List segments = null; SegmentGenerator sg = new SegmentGenerator(targetCluster.getPartitioner()); @@ -381,7 +380,7 @@ private List generateSegments(Cluster targetCluster, ColumnFamily exi * @return * @throws ReaperException when fails to store the RepairRun. */ - private RepairRun storeNewRepairRun(Cluster cluster, ColumnFamily table, Optional cause, + private RepairRun storeNewRepairRun(Cluster cluster, RepairUnit table, Optional cause, String owner) throws ReaperException { RepairRun.Builder runBuilder = new RepairRun.Builder(cluster.getName(), table.getId(), DateTime.now(), config.getRepairIntensity()); @@ -402,7 +401,7 @@ private RepairRun storeNewRepairRun(Cluster cluster, ColumnFamily table, Optiona * storage backend. */ private void storeNewRepairSegments(List tokenSegments, RepairRun repairRun, - ColumnFamily table) { + RepairUnit table) { List repairSegmentBuilders = Lists.newArrayList(); for (RingRange range : tokenSegments) { RepairSegment.Builder repairSegment = new RepairSegment.Builder(repairRun.getId(), range, @@ -417,8 +416,8 @@ private void storeNewRepairSegments(List tokenSegments, RepairRun rep * @return only a status of a repair run, not the entire repair run info. */ private RepairRunStatus getRepairRunStatus(RepairRun repairRun) { - ColumnFamily columnFamily = storage.getColumnFamily(repairRun.getColumnFamilyId()); - RepairRunStatus repairRunStatus = new RepairRunStatus(repairRun, columnFamily); + RepairUnit repairUnit = storage.getColumnFamily(repairRun.getRepairUnitId()); + RepairRunStatus repairRunStatus = new RepairRunStatus(repairRun, repairUnit); if (repairRun.getRunState() != RepairRun.RunState.NOT_STARTED) { int segmentsRepaired = storage.getSegmentAmountForRepairRun(repairRun.getId(), RepairSegment.State.DONE); diff --git a/src/main/java/com/spotify/reaper/resources/ResourceUtils.java b/src/main/java/com/spotify/reaper/resources/ResourceUtils.java new file mode 100644 index 000000000..91d854043 --- /dev/null +++ b/src/main/java/com/spotify/reaper/resources/ResourceUtils.java @@ -0,0 +1,37 @@ +package com.spotify.reaper.resources; + +import com.spotify.reaper.ReaperException; +import com.spotify.reaper.cassandra.JmxProxy; +import com.spotify.reaper.core.Cluster; +import com.spotify.reaper.cassandra.JmxConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; + +/** + * Generic utilities for actions within the resources. + * Handles JMX and Storage layer access. + */ +public class ResourceUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class); + + public static Cluster createClusterWithSeedHost(String seedHost, JmxConnectionFactory factory) + throws ReaperException { + String clusterName; + String partitioner; + try { + JmxProxy jmxProxy = factory.create(seedHost); + clusterName = jmxProxy.getClusterName(); + partitioner = jmxProxy.getPartitioner(); + jmxProxy.close(); + } catch (ReaperException e) { + LOG.error("failed to create cluster with seed host: " + seedHost); + e.printStackTrace(); + throw e; + } + return new Cluster(clusterName, partitioner, Collections.singleton(seedHost)); + } + +} diff --git a/src/main/java/com/spotify/reaper/resources/TableResource.java b/src/main/java/com/spotify/reaper/resources/TableResource.java deleted file mode 100644 index 6f2fd237c..000000000 --- a/src/main/java/com/spotify/reaper/resources/TableResource.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.spotify.reaper.resources; - -import com.google.common.base.Optional; - -import com.spotify.reaper.ReaperApplicationConfiguration; -import com.spotify.reaper.ReaperException; -import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.core.ColumnFamily; -import com.spotify.reaper.resources.view.ColumnFamilyStatus; -import com.spotify.reaper.service.JmxConnectionFactory; -import com.spotify.reaper.storage.IStorage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; - -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriInfo; - -@Path("/table") -@Produces(MediaType.APPLICATION_JSON) -public class TableResource { - - private static final Logger LOG = LoggerFactory.getLogger(TableResource.class); - - private final IStorage storage; - private final ReaperApplicationConfiguration config; - private final JmxConnectionFactory jmxFactory; - - public TableResource(ReaperApplicationConfiguration config, IStorage storage) { - this.storage = storage; - this.config = config; - this.jmxFactory = new JmxConnectionFactory(); - } - - public TableResource(ReaperApplicationConfiguration config, IStorage storage, - JmxConnectionFactory jmxFactory) { - this.storage = storage; - this.config = config; - this.jmxFactory = jmxFactory; - } - - - /** - * Will return repair status of a table. - * @return - */ - @GET - @Path("/{clusterName}/{keyspace}/{table}") - public Response getTable( - @PathParam("clusterName") String clusterName, - @PathParam("keyspace") String keyspace, - @PathParam("table") String table) { - LOG.info("get table called with: clusterName = {}, keyspace = {}, table = {}", - clusterName, keyspace, table); - return Response.ok().entity("not implemented yet").build(); - } - - /** - * Very beastly endpoint to register a new table, with the option to immediately trigger - * a repair. - * - * @return 500 if something goes wrong or args are missing, 200+ if requested operation - * is successful. - */ - @POST - public Response addTable( - @Context UriInfo uriInfo, - @QueryParam("clusterName") Optional clusterName, - @QueryParam("keyspace") Optional keyspace, - @QueryParam("table") Optional tableName) { - - LOG.info("add table called with: clusterName = {}, keyspace = {}, table = {}", clusterName, - keyspace, tableName); - - try { - if (!clusterName.isPresent()) { - throw new ReaperException("\"clusterName\" argument missing"); - } - if (!keyspace.isPresent()) { - throw new ReaperException("\"keyspace\" argument missing"); - } - if (!tableName.isPresent()) { - throw new ReaperException("\"tableName\" argument missing"); - } - String clusterNameStr = clusterName.get(); - String keyspaceStr = keyspace.get(); - String tableNameStr = tableName.get(); - URI tableUri = buildTableUri(uriInfo, clusterNameStr, keyspaceStr, tableNameStr); - ColumnFamily newTable = registerNewTable(tableUri, clusterNameStr, keyspaceStr, tableNameStr); - return Response.created(buildTableUri(uriInfo, clusterNameStr, keyspaceStr, tableNameStr)) - .entity(new ColumnFamilyStatus(newTable)) - .build(); - } catch (ReaperException e) { - LOG.error(e.getMessage()); - e.printStackTrace(); - return Response.status(500).entity(e.getMessage()).build(); - } - } - - /** - * Builds an URI used to describe a table. - * @throws ReaperException - */ - private URI buildTableUri(UriInfo uriInfo, String clusterName, String keyspace, String table) - throws ReaperException { - String tablePath = String.format("%s/%s/%s", clusterName, keyspace, table); - try { - return new URL(uriInfo.getAbsolutePath().toURL(), tablePath).toURI(); - } catch (MalformedURLException | URISyntaxException e) { - e.printStackTrace(); - throw new ReaperException(e); - } - } - - /** - * Registers a new table by first fetching a cluster info from the storage backend, and - * consequently storing the table. - * - * Cluster is keyed by seedHost if present, otherwise clusterName is used. - * - * @return - * @throws ReaperException from below - */ - private ColumnFamily registerNewTable(URI tableUri, String clusterName, String keyspace, - String table) throws ReaperException { - // fetch information about the cluster the table is added to - Cluster targetCluster = storage.getCluster(clusterName); - if (targetCluster == null) { - String errMsg = String.format("Failed to fetch cluster for table \"%s\"", table); - throw new ReaperException(errMsg); - } - - // store the new table - return storeNewTable(tableUri, targetCluster, keyspace, table); - } - - /** - * Stores a table information into the storage. - * @return - * @throws ReaperException if table already exists in the storage or in Cassandra cluster. - */ - private ColumnFamily storeNewTable(URI createdUri, Cluster cluster, String keyspace, String table) - throws ReaperException { - String clusterName = cluster.getName(); - - // check if the table doesn't already exists in Reaper's storage - ColumnFamily existingTable = storage.getColumnFamily(clusterName, keyspace, table); - if (existingTable != null) { - String errMsg = String.format("table \"%s\" already exists", createdUri.toString()); - throw new ReaperException(errMsg); - } - - // check if the table actually exists in the Cassandra cluster - if (!existsInCluster(cluster, keyspace, table)) { - String errMsg = String.format("table \"%s\" doesn't exist in Cassandra", - createdUri.toString()); - throw new ReaperException(errMsg); - } - - // actually store the new table - LOG.info(String.format("storing new table \"%s\"", createdUri.toString())); - ColumnFamily.Builder newCf = new ColumnFamily.Builder(clusterName, keyspace, table, - config.getSegmentCount(), config.getSnapshotRepair()); - existingTable = storage.addColumnFamily(newCf); - if (existingTable == null) { - String errMsg = String.format("failed storing new table \"%s\"", createdUri.toString()); - throw new ReaperException(errMsg); - } - return existingTable; - } - - /** - * Checks if given table actually exists in the Cassandra cluster. - * @return - */ - private boolean existsInCluster(Cluster cluster, String keyspace, String table) { - String seedHost = cluster.getSeedHosts().iterator().next(); - try { - return jmxFactory.create(seedHost).tableExists(keyspace, table); - } catch (ReaperException e) { - LOG.error(e.getMessage()); - e.printStackTrace(); - return false; - } - } - -} diff --git a/src/main/java/com/spotify/reaper/resources/view/ClusterStatus.java b/src/main/java/com/spotify/reaper/resources/view/ClusterStatus.java index 4de8358e5..fadb77c72 100644 --- a/src/main/java/com/spotify/reaper/resources/view/ClusterStatus.java +++ b/src/main/java/com/spotify/reaper/resources/view/ClusterStatus.java @@ -35,6 +35,9 @@ public class ClusterStatus { @JsonProperty("repair_run_ids") private Collection repairRunIds; + @JsonProperty() + private Collection keyspaces; + public ClusterStatus(Cluster cluster) { this.clusterName = cluster.getName(); this.partitioner = cluster.getPartitioner(); @@ -60,4 +63,8 @@ public Collection getRepairRunIds() { public void setRepairRunIds(Collection repairRunIds) { this.repairRunIds = repairRunIds; } + + public void setKeyspaces(Collection keyspaces) { + this.keyspaces = keyspaces; + } } diff --git a/src/main/java/com/spotify/reaper/resources/view/ColumnFamilyStatus.java b/src/main/java/com/spotify/reaper/resources/view/ColumnFamilyStatus.java deleted file mode 100644 index 91dd7770d..000000000 --- a/src/main/java/com/spotify/reaper/resources/view/ColumnFamilyStatus.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.spotify.reaper.resources.view; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.spotify.reaper.core.ColumnFamily; - -/** - * Contains the data to be shown when replying with column family (table) data. - */ -public class ColumnFamilyStatus { - - @JsonProperty("cluster_name") - private final String clusterName; - - @JsonProperty("keyspace_name") - private final String keyspaceName; - - @JsonProperty("table_name") - private final String tableName; - - @JsonProperty("segment_count") - private final int segmentCount; - - @JsonProperty("snapshot_repair") - private final boolean snapshotRepair; - - public ColumnFamilyStatus(ColumnFamily columnFamily) { - this.clusterName = columnFamily.getClusterName(); - this.keyspaceName = columnFamily.getKeyspaceName(); - this.tableName = columnFamily.getName(); - this.segmentCount = columnFamily.getSegmentCount(); - this.snapshotRepair = columnFamily.isSnapshotRepair(); - } - -} diff --git a/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java b/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java index 89a724b04..638c10560 100644 --- a/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java +++ b/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java @@ -15,12 +15,13 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.spotify.reaper.core.ColumnFamily; import com.spotify.reaper.core.RepairRun; - +import com.spotify.reaper.core.RepairUnit; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import java.util.Collection; + /** * Contains the data to be shown when querying repair run status. */ @@ -28,18 +29,20 @@ public class RepairRunStatus { public static final String TIMESTAMP_ISO8601_YODA_TEMPLATE = "YYYY-MM-dd'T'HH:mm:ss'Z'"; + @JsonProperty private final String cause; + @JsonProperty private final String owner; - @JsonProperty("id") + @JsonProperty private final long id; @JsonProperty("cluster_name") private final String clusterName; - @JsonProperty("table_name") - private final String columnFamilyName; + @JsonProperty("column_families") + private final Collection columnFamilies; @JsonProperty("keyspace_name") private final String keyspaceName; @@ -56,6 +59,7 @@ public class RepairRunStatus { @JsonIgnore private final DateTime endTime; + @JsonProperty private final double intensity; @JsonProperty("segment_count") @@ -88,6 +92,9 @@ public String getEndTimeISO8601() { return endTime.toDateTime(DateTimeZone.UTC).toString(TIMESTAMP_ISO8601_YODA_TEMPLATE); } + @JsonProperty("snapshot_repair") + private final boolean snapshotRepair; + public void setSegmentsRepaired(int segmentsRepaired) { this.segmentsRepaired = segmentsRepaired; } @@ -100,18 +107,19 @@ public String getRunState() { return this.runState; } - public RepairRunStatus(RepairRun repairRun, ColumnFamily columnFamily) { + public RepairRunStatus(RepairRun repairRun, RepairUnit repairUnit) { this.id = repairRun.getId(); this.cause = repairRun.getCause(); this.owner = repairRun.getOwner(); this.clusterName = repairRun.getClusterName(); - this.columnFamilyName = columnFamily.getName(); - this.keyspaceName = columnFamily.getKeyspaceName(); + this.columnFamilies = repairUnit.getColumnFamilies(); + this.keyspaceName = repairUnit.getKeyspaceName(); this.runState = repairRun.getRunState().name(); this.creationTime = repairRun.getCreationTime(); this.startTime = repairRun.getStartTime(); this.endTime = repairRun.getEndTime(); this.intensity = repairRun.getIntensity(); - this.segmentCount = columnFamily.getSegmentCount(); + this.segmentCount = repairUnit.getSegmentCount(); + this.snapshotRepair = repairUnit.isSnapshotRepair(); } } diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index fdfaf5c90..8466beed1 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -17,9 +17,10 @@ import com.google.common.base.Optional; import com.spotify.reaper.ReaperException; +import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.cassandra.RepairStatusHandler; -import com.spotify.reaper.core.ColumnFamily; +import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.storage.IStorage; @@ -91,8 +92,8 @@ public static void startNewRepairRun(IStorage storage, long repairRunID, this.storage = storage; this.repairRunId = repairRunId; this.jmxConnectionFactory = jmxConnectionFactory; - jmxConnection = this.jmxConnectionFactory.connectAny(Optional.absent(), - storage.getCluster(storage.getRepairRun(repairRunId).getClusterName()).getSeedHosts()); + jmxConnection = this.jmxConnectionFactory.connectAny( + storage.getCluster(storage.getRepairRun(repairRunId).getClusterName())); } /** @@ -167,9 +168,9 @@ private void startNextSegment() { * @param tokenRange token range of the segment to repair. */ private void repairSegment(long segmentId, RingRange tokenRange) { - ColumnFamily columnFamily = - storage.getColumnFamily(storage.getRepairRun(repairRunId).getColumnFamilyId()); - String keyspace = columnFamily.getKeyspaceName(); + RepairUnit repairUnit = + storage.getColumnFamily(storage.getRepairRun(repairRunId).getRepairUnitId()); + String keyspace = repairUnit.getKeyspaceName(); if (!jmxConnection.isConnectionAlive()) { try { diff --git a/src/main/java/com/spotify/reaper/service/SegmentRunner.java b/src/main/java/com/spotify/reaper/service/SegmentRunner.java index 8d3cb9c82..3c8b87673 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentRunner.java +++ b/src/main/java/com/spotify/reaper/service/SegmentRunner.java @@ -16,9 +16,10 @@ import com.google.common.base.Optional; import com.spotify.reaper.ReaperException; +import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.cassandra.RepairStatusHandler; -import com.spotify.reaper.core.ColumnFamily; +import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.storage.IStorage; @@ -77,9 +78,9 @@ private void runRepair(Collection potentialCoordinators, final RepairSegment segment = storage.getRepairSegment(segmentId); try (JmxProxy jmxConnection = jmxConnectionFactory .connectAny(Optional.of(this), potentialCoordinators)) { - ColumnFamily columnFamily = - storage.getColumnFamily(segment.getColumnFamilyId()); - String keyspace = columnFamily.getKeyspaceName(); + RepairUnit repairUnit = + storage.getColumnFamily(segment.getRepairUnitId()); + String keyspace = repairUnit.getKeyspaceName(); if (!canRepair(jmxConnection, segment)) { postpone(segment); @@ -89,7 +90,7 @@ private void runRepair(Collection potentialCoordinators, synchronized (condition) { commandId = jmxConnection .triggerRepair(segment.getStartToken(), segment.getEndToken(), keyspace, - columnFamily.getName()); + repairUnit.getName()); LOG.debug("Triggered repair with command id {}", commandId); storage.updateRepairSegment(segment.with() .state(RepairSegment.State.RUNNING) diff --git a/src/main/java/com/spotify/reaper/storage/IStorage.java b/src/main/java/com/spotify/reaper/storage/IStorage.java index bbafa6f7c..af3f138c9 100644 --- a/src/main/java/com/spotify/reaper/storage/IStorage.java +++ b/src/main/java/com/spotify/reaper/storage/IStorage.java @@ -14,7 +14,7 @@ package com.spotify.reaper.storage; import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.core.ColumnFamily; +import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.service.RingRange; @@ -48,11 +48,11 @@ public interface IStorage { Collection getAllRunningRepairRuns(); - ColumnFamily addColumnFamily(ColumnFamily.Builder newTable); + RepairUnit addRepairUnit(RepairUnit.Builder newRepairUnit); - ColumnFamily getColumnFamily(long id); + RepairUnit getRepairUnit(long id); - ColumnFamily getColumnFamily(String cluster, String keyspace, String table); + RepairUnit getRepairUnit(String cluster, String keyspace, String table); void addRepairSegments(Collection newSegments, long runId); diff --git a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java index 9cb012cd7..f2efab8b2 100644 --- a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java +++ b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java @@ -16,7 +16,7 @@ import com.google.common.collect.Maps; import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.core.ColumnFamily; +import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.service.RingRange; @@ -43,8 +43,8 @@ public class MemoryStorage implements IStorage { private ConcurrentMap clusters = Maps.newConcurrentMap(); private ConcurrentMap repairRuns = Maps.newConcurrentMap(); - private ConcurrentMap columnFamilies = Maps.newConcurrentMap(); - private ConcurrentMap columnFamiliesByName = Maps.newConcurrentMap(); + private ConcurrentMap columnFamilies = Maps.newConcurrentMap(); + private ConcurrentMap columnFamiliesByName = Maps.newConcurrentMap(); private ConcurrentMap repairSegments = Maps.newConcurrentMap(); private ConcurrentMap> repairSegmentsByRunId = Maps.newConcurrentMap(); @@ -152,28 +152,28 @@ public Collection getAllRunningRepairRuns() { } @Override - public ColumnFamily addColumnFamily(ColumnFamily.Builder columnFamily) { - ColumnFamily existing = + public RepairUnit addColumnFamily(RepairUnit.Builder columnFamily) { + RepairUnit existing = getColumnFamily(columnFamily.clusterName, columnFamily.keyspaceName, columnFamily.name); if (existing == null) { - ColumnFamily newColumnFamily = columnFamily.build(COLUMN_FAMILY_ID.incrementAndGet()); - columnFamilies.put(newColumnFamily.getId(), newColumnFamily); - TableName tableName = new TableName(newColumnFamily.getClusterName(), - newColumnFamily.getKeyspaceName(), newColumnFamily.getName()); - columnFamiliesByName.put(tableName, newColumnFamily); - return newColumnFamily; + RepairUnit newRepairUnit = columnFamily.build(COLUMN_FAMILY_ID.incrementAndGet()); + columnFamilies.put(newRepairUnit.getId(), newRepairUnit); + TableName tableName = new TableName(newRepairUnit.getClusterName(), + newRepairUnit.getKeyspaceName(), newRepairUnit.getName()); + columnFamiliesByName.put(tableName, newRepairUnit); + return newRepairUnit; } else { return null; } } @Override - public ColumnFamily getColumnFamily(long id) { + public RepairUnit getColumnFamily(long id) { return columnFamilies.get(id); } @Override - public ColumnFamily getColumnFamily(String cluster, String keyspace, String table) { + public RepairUnit getColumnFamily(String cluster, String keyspace, String table) { return columnFamiliesByName.get(new TableName(cluster, keyspace, table)); } diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java index 4a9ccedde..efb80276b 100644 --- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java +++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java @@ -16,7 +16,7 @@ import com.spotify.reaper.ReaperApplicationConfiguration; import com.spotify.reaper.ReaperException; import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.core.ColumnFamily; +import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.service.RingRange; @@ -178,8 +178,8 @@ public boolean updateRepairRun(RepairRun repairRun) { } @Override - public ColumnFamily addColumnFamily(ColumnFamily.Builder newColumnFamily) { - ColumnFamily result; + public RepairUnit addColumnFamily(RepairUnit.Builder newColumnFamily) { + RepairUnit result; try (Handle h = jdbi.open()) { long insertedId = getPostgresStorage(h).insertColumnFamily(newColumnFamily.build(-1)); result = newColumnFamily.build(insertedId); @@ -188,8 +188,8 @@ public ColumnFamily addColumnFamily(ColumnFamily.Builder newColumnFamily) { } @Override - public ColumnFamily getColumnFamily(long id) { - ColumnFamily result; + public RepairUnit getColumnFamily(long id) { + RepairUnit result; try (Handle h = jdbi.open()) { result = getPostgresStorage(h).getColumnFamily(id); } @@ -197,8 +197,8 @@ public ColumnFamily getColumnFamily(long id) { } @Override - public ColumnFamily getColumnFamily(String clusterName, String keyspaceName, String tableName) { - ColumnFamily result; + public RepairUnit getColumnFamily(String clusterName, String keyspaceName, String tableName) { + RepairUnit result; try (Handle h = jdbi.open()) { IStoragePostgreSQL storage = getPostgresStorage(h); result = storage.getColumnFamilyByClusterAndName(clusterName, keyspaceName, tableName); diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java index e41aa4d57..1fdcee194 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java @@ -13,7 +13,7 @@ */ package com.spotify.reaper.storage.postgresql; -import com.spotify.reaper.core.ColumnFamily; +import com.spotify.reaper.core.RepairUnit; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.tweak.ResultSetMapper; @@ -21,10 +21,10 @@ import java.sql.ResultSet; import java.sql.SQLException; -public class ColumnFamilyMapper implements ResultSetMapper { +public class ColumnFamilyMapper implements ResultSetMapper { - public ColumnFamily map(int index, ResultSet r, StatementContext ctx) throws SQLException { - ColumnFamily.Builder builder = new ColumnFamily.Builder(r.getString("cluster_name"), + public RepairUnit map(int index, ResultSet r, StatementContext ctx) throws SQLException { + RepairUnit.Builder builder = new RepairUnit.Builder(r.getString("cluster_name"), r.getString("keyspace_name"), r.getString("name"), r.getInt("segment_count"), r.getBoolean("snapshot_repair")); return builder.build(r.getLong("id")); diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java index 33161396e..c3febb4fa 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java @@ -14,7 +14,7 @@ package com.spotify.reaper.storage.postgresql; import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.core.ColumnFamily; +import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; @@ -115,7 +115,7 @@ public interface IStoragePostgreSQL { @SqlUpdate(SQL_UPDATE_REPAIR_RUN) public int updateRepairRun(@BindBean RepairRun newRepairRun); - // ColumnFamily + // RepairUnit // static final String SQL_COLUMN_FAMILY_ALL_FIELDS_NO_ID = "cluster_name, keyspace_name, name, segment_count, snapshot_repair"; @@ -135,17 +135,17 @@ public interface IStoragePostgreSQL { @SqlQuery(SQL_GET_COLUMN_FAMILY) @Mapper(ColumnFamilyMapper.class) - public ColumnFamily getColumnFamily(@Bind("id") long columnFamilyId); + public RepairUnit getColumnFamily(@Bind("id") long columnFamilyId); @SqlQuery(SQL_GET_COLUMN_FAMILY_BY_CLUSTER_AND_NAME) @Mapper(ColumnFamilyMapper.class) - public ColumnFamily getColumnFamilyByClusterAndName(@Bind("clusterName") String clusterName, + public RepairUnit getColumnFamilyByClusterAndName(@Bind("clusterName") String clusterName, @Bind("keyspaceName") String keyspaceName, @Bind("name") String tableName); @SqlUpdate(SQL_INSERT_COLUMN_FAMILY) @GetGeneratedKeys - public long insertColumnFamily(@BindBean ColumnFamily newColumnFamily); + public long insertColumnFamily(@BindBean RepairUnit newRepairUnit); // RepairSegment // diff --git a/src/test/java/com/spotify/reaper/resources/ClusterResourceTest.java b/src/test/java/com/spotify/reaper/resources/ClusterResourceTest.java index 635b128be..4af8ef4b0 100644 --- a/src/test/java/com/spotify/reaper/resources/ClusterResourceTest.java +++ b/src/test/java/com/spotify/reaper/resources/ClusterResourceTest.java @@ -7,14 +7,13 @@ import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.cassandra.RepairStatusHandler; import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.service.JmxConnectionFactory; +import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.storage.IStorage; import com.spotify.reaper.storage.MemoryStorage; import org.junit.Before; import org.junit.Test; import java.net.URI; -import java.util.Collection; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; diff --git a/src/test/java/com/spotify/reaper/resources/RepairRunResourceTest.java b/src/test/java/com/spotify/reaper/resources/RepairRunResourceTest.java index 78a07cab5..9034ed724 100644 --- a/src/test/java/com/spotify/reaper/resources/RepairRunResourceTest.java +++ b/src/test/java/com/spotify/reaper/resources/RepairRunResourceTest.java @@ -9,11 +9,11 @@ import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.cassandra.RepairStatusHandler; import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.core.ColumnFamily; +import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.resources.view.RepairRunStatus; -import com.spotify.reaper.service.JmxConnectionFactory; +import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.service.RepairRunner; import com.spotify.reaper.service.RingRange; import com.spotify.reaper.storage.IStorage; @@ -99,7 +99,7 @@ public JmxProxy create(Optional handler, String host) } }; - ColumnFamily.Builder cfBuilder = new ColumnFamily.Builder(CLUSTER_NAME.get(), KEYSPACE.get(), + RepairUnit.Builder cfBuilder = new RepairUnit.Builder(CLUSTER_NAME.get(), KEYSPACE.get(), TABLE.get(), SEGMENT_CNT, IS_SNAPSHOT_REPAIR); storage.addColumnFamily(cfBuilder); } diff --git a/src/test/java/com/spotify/reaper/resources/TableResourceTest.java b/src/test/java/com/spotify/reaper/resources/TableResourceTest.java index b18ed1c56..7ecb45587 100644 --- a/src/test/java/com/spotify/reaper/resources/TableResourceTest.java +++ b/src/test/java/com/spotify/reaper/resources/TableResourceTest.java @@ -9,9 +9,9 @@ import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.cassandra.RepairStatusHandler; import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.core.ColumnFamily; -import com.spotify.reaper.resources.view.ColumnFamilyStatus; -import com.spotify.reaper.service.JmxConnectionFactory; +import com.spotify.reaper.core.RepairUnit; +import com.spotify.reaper.resources.view.RepairUnitStatus; +import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.service.RingRange; import com.spotify.reaper.storage.IStorage; import com.spotify.reaper.storage.MemoryStorage; @@ -93,13 +93,13 @@ public void testAddTable() throws Exception { Response response = resource.addTable(uriInfo, CLUSTER_NAME, KEYSPACE, TABLE); assertEquals(201, response.getStatus()); - assertTrue(response.getEntity() instanceof ColumnFamilyStatus); + assertTrue(response.getEntity() instanceof RepairUnitStatus); assertEquals(1, storage.getClusters().size()); assertEquals(0, storage.getRepairRunsForCluster(CLUSTER_NAME.get()).size()); assertEquals(0, storage.getRepairRunIdsForCluster(CLUSTER_NAME.get()).size()); - ColumnFamily cf = storage.getColumnFamily(CLUSTER_NAME.get(), KEYSPACE.get(), TABLE.get()); + RepairUnit cf = storage.getColumnFamily(CLUSTER_NAME.get(), KEYSPACE.get(), TABLE.get()); assertNotNull("Failed fetch table info from storage", cf); assertEquals(SEGMENT_CNT, cf.getSegmentCount()); assertFalse(cf.isSnapshotRepair()); diff --git a/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java b/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java index 99070942c..ee6555032 100644 --- a/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java +++ b/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java @@ -17,10 +17,11 @@ import com.google.common.collect.Lists; import com.spotify.reaper.ReaperException; +import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.cassandra.RepairStatusHandler; import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.core.ColumnFamily; +import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.storage.IStorage; @@ -114,8 +115,8 @@ public void testHangingRepair() throws ReaperException, InterruptedException { final IStorage storage = new MemoryStorage(); storage.addCluster(new Cluster(CLUSTER_NAME, null, Collections.singleton(null))); - ColumnFamily cf = - storage.addColumnFamily(new ColumnFamily.Builder(CLUSTER_NAME, KS_NAME, CF_NAME, 1, false)); + RepairUnit cf = + storage.addColumnFamily(new RepairUnit.Builder(CLUSTER_NAME, KS_NAME, CF_NAME, 1, false)); DateTimeUtils.setCurrentMillisFixed(TIME_RUN); RepairRun run = storage.addRepairRun( new RepairRun.Builder(CLUSTER_NAME, cf.getId(), DateTime.now(), INTENSITY)); @@ -206,7 +207,7 @@ public void testResumeRepair() throws InterruptedException { storage.addCluster(new Cluster(CLUSTER_NAME, null, Collections.singleton(null))); long cf = - storage.addColumnFamily(new ColumnFamily.Builder(CLUSTER_NAME, KS_NAME, CF_NAME, 1, false)) + storage.addColumnFamily(new RepairUnit.Builder(CLUSTER_NAME, KS_NAME, CF_NAME, 1, false)) .getId(); DateTimeUtils.setCurrentMillisFixed(TIME_RUN); RepairRun run = storage.addRepairRun( diff --git a/src/test/java/com/spotify/reaper/service/SegmentRunnerTest.java b/src/test/java/com/spotify/reaper/service/SegmentRunnerTest.java index 90544a71a..d8685b4cc 100644 --- a/src/test/java/com/spotify/reaper/service/SegmentRunnerTest.java +++ b/src/test/java/com/spotify/reaper/service/SegmentRunnerTest.java @@ -17,9 +17,10 @@ import com.google.common.collect.Lists; import com.spotify.reaper.ReaperException; +import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.cassandra.RepairStatusHandler; -import com.spotify.reaper.core.ColumnFamily; +import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.storage.IStorage; @@ -51,8 +52,8 @@ public class SegmentRunnerTest { @Test public void timeoutTest() throws InterruptedException, ReaperException, ExecutionException { final IStorage storage = new MemoryStorage(); - ColumnFamily cf = - storage.addColumnFamily(new ColumnFamily.Builder("reaper", "reaper", "reaper", 1, false)); + RepairUnit cf = + storage.addColumnFamily(new RepairUnit.Builder("reaper", "reaper", "reaper", 1, false)); RepairRun run = storage.addRepairRun( new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5)); @@ -108,8 +109,8 @@ public void run() { @Test public void successTest() throws InterruptedException, ReaperException, ExecutionException { final IStorage storage = new MemoryStorage(); - ColumnFamily cf = - storage.addColumnFamily(new ColumnFamily.Builder("reaper", "reaper", "reaper", 1, false)); + RepairUnit cf = + storage.addColumnFamily(new RepairUnit.Builder("reaper", "reaper", "reaper", 1, false)); RepairRun run = storage.addRepairRun( new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5)); storage.addRepairSegments(Collections.singleton( @@ -175,8 +176,8 @@ public void run() { @Test public void failureTest() throws InterruptedException, ReaperException, ExecutionException { final IStorage storage = new MemoryStorage(); - ColumnFamily cf = - storage.addColumnFamily(new ColumnFamily.Builder("reaper", "reaper", "reaper", 1, false)); + RepairUnit cf = + storage.addColumnFamily(new RepairUnit.Builder("reaper", "reaper", "reaper", 1, false)); RepairRun run = storage.addRepairRun( new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5)); storage.addRepairSegments(Collections.singleton(