Skip to content

Commit

Permalink
WIP removing column family concept
Browse files Browse the repository at this point in the history
  • Loading branch information
varjoranta committed Jan 21, 2015
1 parent 774a3be commit eca1eaa
Show file tree
Hide file tree
Showing 26 changed files with 344 additions and 519 deletions.
26 changes: 12 additions & 14 deletions src/main/db/reaper_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
-- For cleaning up the database, just do first in the following order:
-- DROP TABLE "repair_segment";
-- DROP TABLE "repair_run";
-- DROP TABLE "column_family";
-- DROP TABLE "repair_unit";
-- DROP TABLE "cluster";

CREATE TABLE IF NOT EXISTS "cluster" (
Expand All @@ -19,24 +19,22 @@ CREATE TABLE IF NOT EXISTS "cluster" (
"seed_hosts" TEXT[] NOT NULL
);

CREATE TABLE IF NOT EXISTS "column_family" (
-- Repair unit is basically a keyspace with a set of column families.
-- Cassandra supports repairing multiple column families in one go.
--
CREATE TABLE IF NOT EXISTS "repair_unit" (
"id" SERIAL PRIMARY KEY,
"cluster_name" TEXT NOT NULL REFERENCES "cluster" ("name"),
"keyspace_name" TEXT NOT NULL,
"name" TEXT NOT NULL,
"column_families" TEXT[] NOT NULL,
"segment_count" INT NOT NULL,
"snapshot_repair" BOOLEAN NOT NULL
);

-- Preventing duplicate column families within a same cluster and keyspace
-- with the following index:
CREATE UNIQUE INDEX column_family_no_duplicates_idx
ON "column_family" ("cluster_name", "keyspace_name", "name");

CREATE TABLE IF NOT EXISTS "repair_run" (
"id" SERIAL PRIMARY KEY,
"cluster_name" TEXT NOT NULL REFERENCES "cluster" ("name"),
"column_family_id" INT NOT NULL REFERENCES "column_family" ("id"),
"repair_unit_id" INT NOT NULL REFERENCES "repair_unit" ("id"),
"cause" TEXT NOT NULL,
"owner" TEXT NOT NULL,
-- see (Java) RepairRun.RunState for state values
Expand All @@ -49,7 +47,7 @@ CREATE TABLE IF NOT EXISTS "repair_run" (

CREATE TABLE IF NOT EXISTS "repair_segment" (
"id" SERIAL PRIMARY KEY,
"column_family_id" INT NOT NULL REFERENCES "column_family" ("id"),
"repair_unit_id" INT NOT NULL REFERENCES "repair_unit" ("id"),
"run_id" INT NOT NULL REFERENCES "repair_run" ("id"),
"start_token" NUMERIC(50) NOT NULL,
"end_token" NUMERIC(50) NOT NULL,
Expand All @@ -59,14 +57,14 @@ CREATE TABLE IF NOT EXISTS "repair_segment" (
"end_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
"fail_count" INT NOT NULL DEFAULT 0
);
CREATE INDEX "repair_segment_run_id_fail_count_start_token_idx"
ON "repair_segment" USING BTREE ("run_id" DESC, "fail_count" ASC, "start_token" ASC);
CREATE INDEX "repair_segment_run_id_fail_count_idx"
ON "repair_segment" USING BTREE ("run_id" ASC, "fail_count" ASC);
CREATE INDEX "repair_segment_state_idx"
ON "repair_segment" USING BTREE ("state");

GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE cluster TO reaper;
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE column_family TO reaper;
GRANT USAGE, SELECT ON SEQUENCE column_family_id_seq TO reaper;
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE repair_unit TO reaper;
GRANT USAGE, SELECT ON SEQUENCE repair_unit_id_seq TO reaper;
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE repair_run TO reaper;
GRANT USAGE, SELECT ON SEQUENCE repair_run_id_seq TO reaper;
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE repair_segment TO reaper;
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/com/spotify/reaper/ReaperApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@
*/
package com.spotify.reaper;

import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;
import com.spotify.reaper.resources.ClusterResource;
import com.spotify.reaper.resources.PingResource;
import com.spotify.reaper.resources.ReaperHealthCheck;
import com.spotify.reaper.resources.RepairRunResource;
import com.spotify.reaper.resources.TableResource;
import com.spotify.reaper.service.JmxConnectionFactory;
import com.spotify.reaper.cassandra.JmxConnectionFactory;
import com.spotify.reaper.service.RepairRunner;
import com.spotify.reaper.service.SegmentRunner;
import com.spotify.reaper.storage.IStorage;
import com.spotify.reaper.storage.MemoryStorage;
import com.spotify.reaper.storage.PostgresStorage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.spotify.reaper.service;
package com.spotify.reaper.cassandra;

import com.google.common.base.Optional;

import com.spotify.reaper.ReaperException;
import com.spotify.reaper.cassandra.JmxProxy;
import com.spotify.reaper.cassandra.RepairStatusHandler;
import com.spotify.reaper.core.Cluster;

import java.util.Collection;

Expand All @@ -36,4 +34,9 @@ public final JmxProxy connectAny(Optional<RepairStatusHandler> handler, Collecti
throws ReaperException {
return create(handler, hosts.iterator().next());
}

public final JmxProxy connectAny(Cluster cluster)
throws ReaperException {
return connectAny(Optional.<RepairStatusHandler>absent(), cluster.getSeedHosts());
}
}
102 changes: 71 additions & 31 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import com.spotify.reaper.ReaperException;
import com.spotify.reaper.service.RingRange;

import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.CompactionManagerMBean;
Expand All @@ -28,26 +26,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.net.MalformedURLException;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.annotation.Nullable;
import javax.management.InstanceNotFoundException;
import javax.management.JMX;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.net.MalformedURLException;
import java.util.*;

import static com.google.common.base.Preconditions.checkNotNull;

Expand All @@ -62,10 +50,10 @@ public class JmxProxy implements NotificationListener, Serializable, AutoCloseab
private final JMXConnector jmxConnector;
private final ObjectName ssMbeanName;
private final MBeanServerConnection mbeanServer;
private final CompactionManagerMBean cmProxy;
private final StorageServiceMBean ssProxy;
private final Optional<RepairStatusHandler> repairStatusHandler;
private final String host;
private final CompactionManagerMBean cmProxy;

private JmxProxy(Optional<RepairStatusHandler> handler, String host, JMXConnector jmxConnector,
StorageServiceMBean ssProxy, ObjectName ssMbeanName, MBeanServerConnection mbeanServer,
Expand All @@ -79,13 +67,6 @@ private JmxProxy(Optional<RepairStatusHandler> handler, String host, JMXConnecto
this.cmProxy = cmProxy;
}

/**
* Connect to JMX interface on the given host and default JMX port without RepairStatusHandler.
*/
public static JmxProxy connect(String host) throws ReaperException {
return connect(Optional.<RepairStatusHandler>absent(), host);
}

/**
* Connect to JMX interface on the given host and default JMX port.
*
Expand Down Expand Up @@ -128,14 +109,14 @@ public static JmxProxy connect(Optional<RepairStatusHandler> handler, String hos
JMX.newMBeanProxy(mbeanServerConn, ssMbeanName, StorageServiceMBean.class);
CompactionManagerMBean cmProxy =
JMX.newMBeanProxy(mbeanServerConn, cmMbeanName, CompactionManagerMBean.class);
JmxProxy proxy =
new JmxProxy(handler, host, jmxConn, ssProxy, ssMbeanName, mbeanServerConn, cmProxy);
JmxProxy proxy = new JmxProxy(handler, host, jmxConn, ssProxy, ssMbeanName,
mbeanServerConn, cmProxy);
// registering a listener throws bunch of exceptions, so we do it here rather than in the
// constructor
mbeanServerConn.addNotificationListener(ssMbeanName, proxy, null, null);
LOG.info(String.format("JMX connection to %s properly connected.", host));
return proxy;
} catch (IOException | InstanceNotFoundException e) {
} catch (IOException | InstanceNotFoundException | MalformedObjectNameException e) {
LOG.error("Failed to establish JMX connection");
throw new ReaperException("Failure when establishing JMX connection", e);
}
Expand Down Expand Up @@ -202,6 +183,26 @@ public List<String> getKeyspaces() {
return ssProxy.getKeyspaces();
}

public List<String> getTableNamesForKeyspace(String keyspace) throws ReaperException {
List<String> tableNames = new ArrayList<>();
Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> proxies = null;
try {
proxies = ColumnFamilyStoreMBeanIterator.getColumnFamilyStoreMBeanProxies(mbeanServer);
} catch (IOException | MalformedObjectNameException e) {
e.printStackTrace();
throw new ReaperException("failed to get ColumnFamilyStoreMBean instances from JMX");
}
while (proxies.hasNext()) {
Map.Entry<String, ColumnFamilyStoreMBean> proxyEntry = proxies.next();
String keyspaceName = proxyEntry.getKey();
if (keyspace.equalsIgnoreCase(keyspaceName)) {
ColumnFamilyStoreMBean columnFamilyMBean = proxyEntry.getValue();
tableNames.add(columnFamilyMBean.getColumnFamilyName());
}
}
return tableNames;
}

/**
* @return number of pending compactions on the node this proxy is connected to
*/
Expand Down Expand Up @@ -245,7 +246,7 @@ public boolean tableExists(String ks, String cf) throws ReaperException {

/**
* Triggers a repair of range (beginToken, endToken] for given keyspace and column family.
*
* <p/>
* The repair is triggered by {@link org.apache.cassandra.service.StorageServiceMBean#forceRepairRangeAsync}
* For time being, we don't allow local nor snapshot repairs.
*
Expand All @@ -268,7 +269,7 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys

/**
* Invoked when the MBean this class listens to publishes an event.
*
* <p/>
* We're only interested in repair-related events. Their format is explained at
* {@link org.apache.cassandra.service.StorageServiceMBean#forceRepairAsync}
* The format is: notification type: "repair" notification
Expand Down Expand Up @@ -320,3 +321,42 @@ public void close() throws ReaperException {
}
}
}

/**
* This code is copied and adjusted from from NodeProbe.java from Cassandra source.
*/
class ColumnFamilyStoreMBeanIterator
implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> {

static Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> getColumnFamilyStoreMBeanProxies(
MBeanServerConnection mbeanServerConn)
throws IOException, MalformedObjectNameException {
return new ColumnFamilyStoreMBeanIterator(mbeanServerConn);
}

private Iterator<ObjectName> resIter;
private MBeanServerConnection mbeanServerConn;

public ColumnFamilyStoreMBeanIterator(MBeanServerConnection mbeanServerConn)
throws MalformedObjectNameException, NullPointerException, IOException {
ObjectName query = new ObjectName("org.apache.cassandra.db:type=ColumnFamilies,*");
resIter = mbeanServerConn.queryNames(query, null).iterator();
this.mbeanServerConn = mbeanServerConn;
}

public boolean hasNext() {
return resIter.hasNext();
}

public Map.Entry<String, ColumnFamilyStoreMBean> next() {
ObjectName objectName = resIter.next();
String keyspaceName = objectName.getKeyProperty("keyspace");
ColumnFamilyStoreMBean cfsProxy =
JMX.newMBeanProxy(mbeanServerConn, objectName, ColumnFamilyStoreMBean.class);
return new AbstractMap.SimpleImmutableEntry<>(keyspaceName, cfsProxy);
}

public void remove() {
throw new UnsupportedOperationException();
}
}
16 changes: 8 additions & 8 deletions src/main/java/com/spotify/reaper/core/RepairRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class RepairRun {
private final String cause;
private final String owner;
private final String clusterName;
private final long columnFamilyId;
private final long repairUnitId;
private final RunState runState;
private final DateTime creationTime;
private final DateTime startTime;
Expand All @@ -37,8 +37,8 @@ public long getId() {
return id;
}

public long getColumnFamilyId() {
return columnFamilyId;
public long getRepairUnitId() {
return repairUnitId;
}

public String getClusterName() {
Expand Down Expand Up @@ -84,7 +84,7 @@ public enum RunState {
private RepairRun(Builder builder, long id) {
this.id = id;
this.clusterName = builder.clusterName;
this.columnFamilyId = builder.columnFamilyId;
this.repairUnitId = builder.repairUnitId;
this.cause = builder.cause;
this.owner = builder.owner;
this.runState = builder.runState;
Expand All @@ -101,7 +101,7 @@ public Builder with() {
public static class Builder {

public final String clusterName;
public final long columnFamilyId;
public final long repairUnitId;
private RunState runState;
private DateTime creationTime;
private double intensity;
Expand All @@ -110,18 +110,18 @@ public static class Builder {
private DateTime startTime;
private DateTime endTime;

public Builder(String clusterName, long columnFamilyId, DateTime creationTime,
public Builder(String clusterName, long repairUnitId, DateTime creationTime,
double intensity) {
this.clusterName = clusterName;
this.columnFamilyId = columnFamilyId;
this.repairUnitId = repairUnitId;
this.runState = RunState.NOT_STARTED;
this.creationTime = creationTime;
this.intensity = intensity;
}

private Builder(RepairRun original) {
clusterName = original.clusterName;
columnFamilyId = original.columnFamilyId;
repairUnitId = original.repairUnitId;
runState = original.runState;
creationTime = original.creationTime;
intensity = original.intensity;
Expand Down
Loading

0 comments on commit eca1eaa

Please sign in to comment.