Skip to content

Commit

Permalink
Merge pull request #9 from varjoranta/master
Browse files Browse the repository at this point in the history
fix storage layer issues, readme, and improve cli
  • Loading branch information
Bj0rnen committed Dec 16, 2014
2 parents ac6df77 + 8c6ea31 commit a9375fd
Show file tree
Hide file tree
Showing 21 changed files with 417 additions and 101 deletions.
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ The Reaper service specific configuration values are:
divided by the intensity value. 0.5 means half of the time is spent sleeping, and half running.

* repairRunThreadCount:

The amount of threads to use for handling the Reaper tasks. Have this big enough not to cause
blocking in cause some thread is waiting for I/O, like calling a Cassandra cluster through JMX.

Expand All @@ -76,10 +77,10 @@ REST API

TODO:

GET /ping (com.spotify.reaper.resources.PingResource)
GET /cluster (com.spotify.reaper.resources.ClusterResource)
GET /cluster/{name} (com.spotify.reaper.resources.ClusterResource)
POST /cluster (com.spotify.reaper.resources.ClusterResource)
GET /table/{clusterName}/{keyspace}/{table} (com.spotify.reaper.resources.TableResource)
POST /table (com.spotify.reaper.resources.TableResource)
GET /repair_run/{id} (com.spotify.reaper.resources.RepairRunResource)
GET /ping (com.spotify.reaper.resources.PingResource)
GET /cluster (com.spotify.reaper.resources.ClusterResource)
GET /cluster/{name} (com.spotify.reaper.resources.ClusterResource)
POST /cluster (com.spotify.reaper.resources.ClusterResource)
GET /table/{clusterName}/{keyspace}/{table} (com.spotify.reaper.resources.TableResource)
POST /table (com.spotify.reaper.resources.TableResource)
GET /repair_run/{id} (com.spotify.reaper.resources.RepairRunResource)
93 changes: 60 additions & 33 deletions bin/spreaper
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ import urlparse
USER = getpass.getuser()
DEFAULT_CAUSE = "manual spreaper run"

logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)s %(levelname)s - %(message)s')
log_level = logging.WARN
if "-v" in sys.argv or "--verbose" in sys.argv:
log_level = logging.DEBUG
logging.basicConfig(level=log_level, format='%(asctime)s %(name)s %(levelname)s - %(message)s')

log = logging.getLogger("spreaper")
log.debug("logging initialized, username running this: {0}".format(USER))
log.debug("logging initialized, the user running this: {0}".format(USER))


import json


def _global_arguments(parser):
Expand All @@ -37,6 +42,8 @@ def _global_arguments(parser):
help="port of the Reaper service [8080]")
parser.add_argument("--reaper-use-ssl", default=False, action='store_true',
help="use https to call Reaper [False]")
parser.add_argument("-v", "--verbose", help="increase output verbosity",
action="store_true")


def _triggering_arguments(parser):
Expand Down Expand Up @@ -93,8 +100,13 @@ The available commands are:
'spreaper list [<args>]')
reaper = self.init_reaper(args)
print "listing all available clusters in the Reaper"
reply = reaper.call_list_clusters()
print "got reply:\n{0}".format(reply)
cluster_names = reaper.call_list_clusters()
if cluster_names:
print "found {0} clusters:".format(len(cluster_names))
for cluster_name in cluster_names:
print cluster_name
else:
print "no clusters found"

def register(self):
def arguments_for_register(parser):
Expand Down Expand Up @@ -127,24 +139,29 @@ The available commands are:
if args.cluster_name:
print "triggering a repair on Cassandra cluster "\
"with name: {0}".format(args.cluster_name)
reaper.call_register(args.owner, cluster_name=args.cluster_name,
keyspace=args.keyspace, table=args.table, cause=args.cause,
repair_immediately=True)
reply = reaper.call_register(args.owner, cluster_name=args.cluster_name,
keyspace=args.keyspace, table=args.table,
cause=args.cause, repair_immediately=True)
else:
print "triggering a repair on Cassandra cluster "\
"with seed host: {0}".format(args.seed_host)
reaper.call_register(args.owner, seed_host=args.seed_host,
keyspace=args.keyspace, table=args.table, cause=args.cause,
repair_immediately=True)
print "repair triggering succeeded"
reply = reaper.call_register(args.owner, seed_host=args.seed_host,
keyspace=args.keyspace, table=args.table,
cause=args.cause, repair_immediately=True)
print "repair triggering succeeded. reply:"
print reply

def status(self):
args = _parse_arguments('Show status of a Cassandra cluster or a repair run',
'spreaper status [<args>]')
def arguments_for_status(parser):
parser.add_argument("repair_run_id",
help="the repair run id, which you get from the reply of calling "
"the repair command")
args = _parse_arguments('Show status of a repair run',
'spreaper status repair_run_id [<args>]')
reaper = self.init_reaper(args)
print "not implemented, just calling ping for testing purposes..."
reply = reaper.call_ping()
print "got reply: {0}".format(reply)
print "repair run with id {0}".format(args.repair_run_id)
repair_run = reaper.call_status(args.repair_run_id)
print json.dumps(repair_run, indent=2)


class ReaperCaller(object):
Expand All @@ -154,15 +171,27 @@ class ReaperCaller(object):
self.base_url = "{0}://{1}:{2}".format(use_ssl and 'https' or 'http',
str(host_name), int(host_port))

def call_list_clusters(self):
the_url = urlparse.urljoin(self.base_url, "cluster")
log.info("making HTTP GET to %s", the_url)
r = requests.get(the_url)
log.info("HTTP GET return code %s with content of length %s",
r.status_code, len(str(r.text)))
def _http_req(self, http_method, the_url, params=None):
http_method = http_method.upper()
if params is None:
params = {}
log.info("making HTTP %s to %s", http_method, the_url)
if http_method == 'GET':
r = requests.get(the_url, params=params)
elif http_method == 'POST':
r = requests.post(the_url, params=params)
else:
assert False, "invalid HTTP method: {0}".format(http_method)
log.info("HTTP %s return code %s with content of length %s",
http_method, r.status_code, len(str(r.text)))
r.raise_for_status()
return r.text

def call_list_clusters(self):
the_url = urlparse.urljoin(self.base_url, "cluster")
reply = self._http_req("GET", the_url)
return json.loads(reply)

def call_register(self, owner, seed_host=None, cluster_name=None, keyspace=None, table=None,
cause=None, repair_immediately=False):
if not seed_host and not cluster_name:
Expand Down Expand Up @@ -190,19 +219,17 @@ class ReaperCaller(object):
if repair_immediately:
params['startRepair'] = 'true'

log.info("making HTTP POST to %s with params %s", the_url, params)
r = requests.post(the_url, params=params)
log.info("HTTP POST return code %s with content: %s", r.status_code, r.text)
r.raise_for_status()
return r.text
reply = self._http_req("POST", the_url, params=params)
return json.loads(reply)

def call_status(self, repair_run_id):
the_url = urlparse.urljoin(self.base_url, "repair_run", repair_run_id)
reply = self._http_req("GET", the_url)
return json.loads(reply)

def call_ping(self):
the_url = urlparse.urljoin(self.base_url, "ping")
log.info("making HTTP GET to %s", the_url)
r = requests.get(the_url)
log.info("HTTP GET return code %s with content: %s", r.status_code, r.text)
r.raise_for_status()
return r.text
return self._http_req("GET", the_url)


if __name__ == '__main__':
Expand Down
10 changes: 6 additions & 4 deletions src/main/db/reaper_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ CREATE UNIQUE INDEX column_family_no_duplicates_idx

CREATE TABLE IF NOT EXISTS "repair_run" (
"id" SERIAL PRIMARY KEY,
"cluster_name" TEXT NOT NULL REFERENCES "cluster" ("name"),
"column_family_id" INT NOT NULL REFERENCES "column_family" ("id"),
"cause" TEXT NOT NULL,
"owner" TEXT NOT NULL,
-- see RepairRun.RunState for state values
-- see (Java) RepairRun.RunState for state values
"state" TEXT NOT NULL,
"creation_time" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
"start_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
Expand All @@ -43,9 +45,9 @@ CREATE TABLE IF NOT EXISTS "repair_segment" (
"id" SERIAL PRIMARY KEY,
"column_family_id" INT NOT NULL REFERENCES "column_family" ("id"),
"run_id" INT NOT NULL REFERENCES "repair_run" ("id"),
"start_token" BIGINT NOT NULL,
"end_token" BIGINT NOT NULL,
-- see RepairSegment.State for state values
"start_token" NUMERIC(50) NOT NULL,
"end_token" NUMERIC(50) NOT NULL,
-- see (Java) RepairSegment.State for state values
"state" SMALLINT NOT NULL,
"start_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
"end_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/com/spotify/reaper/ReaperApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ public String getName() {
return "cassandra-reaper";
}

/**
* Before a Dropwizard application can provide the command-line interface, parse a configuration
* file, or run as a server, it must first go through a bootstrapping phase. You can add Bundles,
* Commands, or register Jackson modules to allow you to include custom types as part of your
* configuration class.
*/
@Override
public void initialize(Bootstrap<ReaperApplicationConfiguration> bootstrap) {
LOG.debug("ReaperApplication.initialize called");
// nothing to initialize
}

@Override
public void run(ReaperApplicationConfiguration config,
Environment environment) throws ReaperException {
checkConfiguration(config);

LOG.info("initializing runner thread pool with {} threads", config.getRepairRunThreadCount());
RepairRunner.initializeThreadPool(config.getRepairRunThreadCount());
Expand Down Expand Up @@ -95,4 +102,10 @@ private IStorage initializeStorage(ReaperApplicationConfiguration config,
return storage;
}

private void checkConfiguration(ReaperApplicationConfiguration config) throws ReaperException {
LOG.debug("repairIntensity: " + config.getRepairIntensity());
LOG.debug("repairRunThreadCount: " + config.getRepairRunThreadCount());
LOG.debug("segmentCount: " + config.getSegmentCount());
LOG.debug("snapshotRepair: " + config.getSnapshotRepair());
}
}
39 changes: 31 additions & 8 deletions src/main/java/com/spotify/reaper/core/RepairRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class RepairRun {

private final String cause;
private final String owner;
private final String clusterName;
private final long columnFamilyId;
private final RunState runState;
private final DateTime creationTime;
private final DateTime startTime;
Expand All @@ -41,6 +43,14 @@ public long getId() {
return id;
}

public long getColumnFamilyId() {
return columnFamilyId;
}

public String getClusterName() {
return clusterName;
}

public String getCause() {
return cause;
}
Expand All @@ -49,7 +59,7 @@ public String getOwner() {
return owner;
}

public RunState getState() {
public RunState getRunState() {
return runState;
}

Expand All @@ -60,7 +70,9 @@ public DateTime getCreationTime() {

@JsonProperty("creationTime")
public String getCreationTimeISO8601() {
if (creationTime == null) return null;
if (creationTime == null) {
return null;
}
return creationTime.toDateTime(DateTimeZone.UTC).toString(
"YYYY-MM-dd'T'HH:mm:ss'Z'");
}
Expand All @@ -72,7 +84,9 @@ public DateTime getStartTime() {

@JsonProperty("startTime")
public String getStartTimeISO8601() {
if (startTime == null) return null;
if (startTime == null) {
return null;
}
return startTime.toDateTime(DateTimeZone.UTC).toString(
"YYYY-MM-dd'T'HH:mm:ss'Z'");
}
Expand All @@ -84,7 +98,9 @@ public DateTime getEndTime() {

@JsonProperty("endTime")
public String getEndTimeISO8601() {
if (endTime == null) return null;
if (endTime == null) {
return null;
}
return endTime.toDateTime(DateTimeZone.UTC).toString(
"YYYY-MM-dd'T'HH:mm:ss'Z'");
}
Expand All @@ -103,8 +119,8 @@ public int getCompletedSegments() {

public static RepairRun getCopy(RepairRun repairRun, RunState newState,
DateTime startTime, DateTime endTime, int completedSegments) {
return new RepairRun.Builder(newState,
repairRun.getCreationTime(), repairRun.getIntensity(),
return new RepairRun.Builder(repairRun.getClusterName(), repairRun.getColumnFamilyId(),
newState, repairRun.getCreationTime(), repairRun.getIntensity(),
repairRun.getTotalSegments(), completedSegments)
.cause(repairRun.getCause())
.owner(repairRun.getOwner())
Expand All @@ -123,6 +139,8 @@ public enum RunState {

private RepairRun(Builder builder, long id) {
this.id = id;
this.clusterName = builder.clusterName;
this.columnFamilyId = builder.columnFamilyId;
this.cause = builder.cause;
this.owner = builder.owner;
this.runState = builder.runState;
Expand All @@ -136,6 +154,8 @@ private RepairRun(Builder builder, long id) {

public static class Builder {

public final String clusterName;
public final long columnFamilyId;
public final RunState runState;
public final DateTime creationTime;
public final double intensity;
Expand All @@ -146,8 +166,11 @@ public static class Builder {
private DateTime startTime;
private DateTime endTime;

public Builder(RunState runState, DateTime creationTime,
double intensity, int totalSegments, int completedSegments) {
public Builder(String clusterName, long columnFamilyId, RunState runState,
DateTime creationTime, double intensity, int totalSegments,
int completedSegments) {
this.clusterName = clusterName;
this.columnFamilyId = columnFamilyId;
this.runState = runState;
this.creationTime = creationTime;
this.intensity = intensity;
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/spotify/reaper/core/RepairSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ public RingRange getTokenRange() {
return tokenRange;
}

public BigInteger getStartToken() {
return tokenRange.getStart();
}

public BigInteger getEndToken() {
return tokenRange.getEnd();
}

public State getState() {
return state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.math.BigInteger;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -57,7 +58,11 @@ public ClusterResource(IStorage storage) {
public Response getClusterList() {
LOG.info("get cluster list called");
Collection<Cluster> clusters = storage.getClusters();
return Response.ok().entity(clusters).build();
List<String> clusterNames = new ArrayList<>();
for (Cluster cluster : clusters) {
clusterNames.add(cluster.getName());
}
return Response.ok().entity(clusterNames).build();
}

@GET
Expand Down
Loading

0 comments on commit a9375fd

Please sign in to comment.