From d904d1c84c285eb1c9e0339b7b908ce2f56b0216 Mon Sep 17 00:00:00 2001 From: Hannu Varjoranta Date: Tue, 16 Dec 2014 17:03:32 +0100 Subject: [PATCH 1/3] fix some readme formatting issues --- README.md | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 2ce1a9b3f..c89eec0e3 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,7 @@ The Reaper service specific configuration values are: divided by the intensity value. 0.5 means half of the time is spent sleeping, and half running. * repairRunThreadCount: + The amount of threads to use for handling the Reaper tasks. Have this big enough not to cause blocking in cause some thread is waiting for I/O, like calling a Cassandra cluster through JMX. @@ -76,10 +77,10 @@ REST API TODO: -GET /ping (com.spotify.reaper.resources.PingResource) -GET /cluster (com.spotify.reaper.resources.ClusterResource) -GET /cluster/{name} (com.spotify.reaper.resources.ClusterResource) -POST /cluster (com.spotify.reaper.resources.ClusterResource) -GET /table/{clusterName}/{keyspace}/{table} (com.spotify.reaper.resources.TableResource) -POST /table (com.spotify.reaper.resources.TableResource) -GET /repair_run/{id} (com.spotify.reaper.resources.RepairRunResource) + GET /ping (com.spotify.reaper.resources.PingResource) + GET /cluster (com.spotify.reaper.resources.ClusterResource) + GET /cluster/{name} (com.spotify.reaper.resources.ClusterResource) + POST /cluster (com.spotify.reaper.resources.ClusterResource) + GET /table/{clusterName}/{keyspace}/{table} (com.spotify.reaper.resources.TableResource) + POST /table (com.spotify.reaper.resources.TableResource) + GET /repair_run/{id} (com.spotify.reaper.resources.RepairRunResource) From 66ea9d606b59a8cbe95c76d43d7b73e7a4ba261a Mon Sep 17 00:00:00 2001 From: Hannu Varjoranta Date: Tue, 16 Dec 2014 17:03:59 +0100 Subject: [PATCH 2/3] improving the spreaper CLI --- bin/spreaper | 93 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 60 insertions(+), 33 deletions(-) diff --git a/bin/spreaper b/bin/spreaper index a267a73ed..a64da799f 100644 --- a/bin/spreaper +++ b/bin/spreaper @@ -23,11 +23,16 @@ import urlparse USER = getpass.getuser() DEFAULT_CAUSE = "manual spreaper run" -logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s %(name)s %(levelname)s - %(message)s') +log_level = logging.WARN +if "-v" in sys.argv or "--verbose" in sys.argv: + log_level = logging.DEBUG +logging.basicConfig(level=log_level, format='%(asctime)s %(name)s %(levelname)s - %(message)s') log = logging.getLogger("spreaper") -log.debug("logging initialized, username running this: {0}".format(USER)) +log.debug("logging initialized, the user running this: {0}".format(USER)) + + +import json def _global_arguments(parser): @@ -37,6 +42,8 @@ def _global_arguments(parser): help="port of the Reaper service [8080]") parser.add_argument("--reaper-use-ssl", default=False, action='store_true', help="use https to call Reaper [False]") + parser.add_argument("-v", "--verbose", help="increase output verbosity", + action="store_true") def _triggering_arguments(parser): @@ -93,8 +100,13 @@ The available commands are: 'spreaper list []') reaper = self.init_reaper(args) print "listing all available clusters in the Reaper" - reply = reaper.call_list_clusters() - print "got reply:\n{0}".format(reply) + cluster_names = reaper.call_list_clusters() + if cluster_names: + print "found {0} clusters:".format(len(cluster_names)) + for cluster_name in cluster_names: + print cluster_name + else: + print "no clusters found" def register(self): def arguments_for_register(parser): @@ -127,24 +139,29 @@ The available commands are: if args.cluster_name: print "triggering a repair on Cassandra cluster "\ "with name: {0}".format(args.cluster_name) - reaper.call_register(args.owner, cluster_name=args.cluster_name, - keyspace=args.keyspace, table=args.table, cause=args.cause, - repair_immediately=True) + reply = reaper.call_register(args.owner, cluster_name=args.cluster_name, + keyspace=args.keyspace, table=args.table, + cause=args.cause, repair_immediately=True) else: print "triggering a repair on Cassandra cluster "\ "with seed host: {0}".format(args.seed_host) - reaper.call_register(args.owner, seed_host=args.seed_host, - keyspace=args.keyspace, table=args.table, cause=args.cause, - repair_immediately=True) - print "repair triggering succeeded" + reply = reaper.call_register(args.owner, seed_host=args.seed_host, + keyspace=args.keyspace, table=args.table, + cause=args.cause, repair_immediately=True) + print "repair triggering succeeded. reply:" + print reply def status(self): - args = _parse_arguments('Show status of a Cassandra cluster or a repair run', - 'spreaper status []') + def arguments_for_status(parser): + parser.add_argument("repair_run_id", + help="the repair run id, which you get from the reply of calling " + "the repair command") + args = _parse_arguments('Show status of a repair run', + 'spreaper status repair_run_id []') reaper = self.init_reaper(args) - print "not implemented, just calling ping for testing purposes..." - reply = reaper.call_ping() - print "got reply: {0}".format(reply) + print "repair run with id {0}".format(args.repair_run_id) + repair_run = reaper.call_status(args.repair_run_id) + print json.dumps(repair_run, indent=2) class ReaperCaller(object): @@ -154,15 +171,27 @@ class ReaperCaller(object): self.base_url = "{0}://{1}:{2}".format(use_ssl and 'https' or 'http', str(host_name), int(host_port)) - def call_list_clusters(self): - the_url = urlparse.urljoin(self.base_url, "cluster") - log.info("making HTTP GET to %s", the_url) - r = requests.get(the_url) - log.info("HTTP GET return code %s with content of length %s", - r.status_code, len(str(r.text))) + def _http_req(self, http_method, the_url, params=None): + http_method = http_method.upper() + if params is None: + params = {} + log.info("making HTTP %s to %s", http_method, the_url) + if http_method == 'GET': + r = requests.get(the_url, params=params) + elif http_method == 'POST': + r = requests.post(the_url, params=params) + else: + assert False, "invalid HTTP method: {0}".format(http_method) + log.info("HTTP %s return code %s with content of length %s", + http_method, r.status_code, len(str(r.text))) r.raise_for_status() return r.text + def call_list_clusters(self): + the_url = urlparse.urljoin(self.base_url, "cluster") + reply = self._http_req("GET", the_url) + return json.loads(reply) + def call_register(self, owner, seed_host=None, cluster_name=None, keyspace=None, table=None, cause=None, repair_immediately=False): if not seed_host and not cluster_name: @@ -190,19 +219,17 @@ class ReaperCaller(object): if repair_immediately: params['startRepair'] = 'true' - log.info("making HTTP POST to %s with params %s", the_url, params) - r = requests.post(the_url, params=params) - log.info("HTTP POST return code %s with content: %s", r.status_code, r.text) - r.raise_for_status() - return r.text + reply = self._http_req("POST", the_url, params=params) + return json.loads(reply) + + def call_status(self, repair_run_id): + the_url = urlparse.urljoin(self.base_url, "repair_run", repair_run_id) + reply = self._http_req("GET", the_url) + return json.loads(reply) def call_ping(self): the_url = urlparse.urljoin(self.base_url, "ping") - log.info("making HTTP GET to %s", the_url) - r = requests.get(the_url) - log.info("HTTP GET return code %s with content: %s", r.status_code, r.text) - r.raise_for_status() - return r.text + return self._http_req("GET", the_url) if __name__ == '__main__': From 8c6ea319a66f22ed8cc044e77a11d2cbf65c9671 Mon Sep 17 00:00:00 2001 From: Hannu Varjoranta Date: Tue, 16 Dec 2014 17:06:09 +0100 Subject: [PATCH 3/3] fix database storage layer related issues * add method for listing repair runs for a cluster --- src/main/db/reaper_db.sql | 10 ++-- .../com/spotify/reaper/ReaperApplication.java | 15 ++++- .../com/spotify/reaper/core/RepairRun.java | 39 ++++++++++--- .../spotify/reaper/core/RepairSegment.java | 8 +++ .../reaper/resources/ClusterResource.java | 7 ++- .../reaper/resources/RepairRunResource.java | 16 +++++ .../reaper/resources/TableResource.java | 7 ++- .../spotify/reaper/service/RepairRunner.java | 20 +++---- .../com/spotify/reaper/storage/IStorage.java | 5 +- .../spotify/reaper/storage/MemoryStorage.java | 15 ++++- .../reaper/storage/PostgresStorage.java | 58 +++++++++++++------ .../postgresql/BigIntegerArgumentFactory.java | 36 ++++++++++++ .../postgresql/ColumnFamilyMapper.java | 13 +++++ .../postgresql/IStoragePostgreSQL.java | 37 +++++++++--- .../PostgresArrayArgumentFactory.java | 15 ++++- .../storage/postgresql/RepairRunMapper.java | 17 +++++- .../postgresql/RepairSegmentMapper.java | 13 +++++ .../postgresql/RunStateArgumentFactory.java | 46 +++++++++++++++ .../postgresql/StateArgumentFactory.java | 33 +++++++++++ 19 files changed, 349 insertions(+), 61 deletions(-) create mode 100644 src/main/java/com/spotify/reaper/storage/postgresql/BigIntegerArgumentFactory.java create mode 100644 src/main/java/com/spotify/reaper/storage/postgresql/RunStateArgumentFactory.java create mode 100644 src/main/java/com/spotify/reaper/storage/postgresql/StateArgumentFactory.java diff --git a/src/main/db/reaper_db.sql b/src/main/db/reaper_db.sql index 64a94d2d4..78b19bce8 100644 --- a/src/main/db/reaper_db.sql +++ b/src/main/db/reaper_db.sql @@ -29,9 +29,11 @@ CREATE UNIQUE INDEX column_family_no_duplicates_idx CREATE TABLE IF NOT EXISTS "repair_run" ( "id" SERIAL PRIMARY KEY, + "cluster_name" TEXT NOT NULL REFERENCES "cluster" ("name"), + "column_family_id" INT NOT NULL REFERENCES "column_family" ("id"), "cause" TEXT NOT NULL, "owner" TEXT NOT NULL, - -- see RepairRun.RunState for state values + -- see (Java) RepairRun.RunState for state values "state" TEXT NOT NULL, "creation_time" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, "start_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL, @@ -43,9 +45,9 @@ CREATE TABLE IF NOT EXISTS "repair_segment" ( "id" SERIAL PRIMARY KEY, "column_family_id" INT NOT NULL REFERENCES "column_family" ("id"), "run_id" INT NOT NULL REFERENCES "repair_run" ("id"), - "start_token" BIGINT NOT NULL, - "end_token" BIGINT NOT NULL, - -- see RepairSegment.State for state values + "start_token" NUMERIC(50) NOT NULL, + "end_token" NUMERIC(50) NOT NULL, + -- see (Java) RepairSegment.State for state values "state" SMALLINT NOT NULL, "start_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL, "end_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL diff --git a/src/main/java/com/spotify/reaper/ReaperApplication.java b/src/main/java/com/spotify/reaper/ReaperApplication.java index c919b8ec6..40946b543 100644 --- a/src/main/java/com/spotify/reaper/ReaperApplication.java +++ b/src/main/java/com/spotify/reaper/ReaperApplication.java @@ -43,14 +43,21 @@ public String getName() { return "cassandra-reaper"; } + /** + * Before a Dropwizard application can provide the command-line interface, parse a configuration + * file, or run as a server, it must first go through a bootstrapping phase. You can add Bundles, + * Commands, or register Jackson modules to allow you to include custom types as part of your + * configuration class. + */ @Override public void initialize(Bootstrap bootstrap) { - LOG.debug("ReaperApplication.initialize called"); + // nothing to initialize } @Override public void run(ReaperApplicationConfiguration config, Environment environment) throws ReaperException { + checkConfiguration(config); LOG.info("initializing runner thread pool with {} threads", config.getRepairRunThreadCount()); RepairRunner.initializeThreadPool(config.getRepairRunThreadCount()); @@ -95,4 +102,10 @@ private IStorage initializeStorage(ReaperApplicationConfiguration config, return storage; } + private void checkConfiguration(ReaperApplicationConfiguration config) throws ReaperException { + LOG.debug("repairIntensity: " + config.getRepairIntensity()); + LOG.debug("repairRunThreadCount: " + config.getRepairRunThreadCount()); + LOG.debug("segmentCount: " + config.getSegmentCount()); + LOG.debug("snapshotRepair: " + config.getSnapshotRepair()); + } } diff --git a/src/main/java/com/spotify/reaper/core/RepairRun.java b/src/main/java/com/spotify/reaper/core/RepairRun.java index 70e016480..99bbc9967 100644 --- a/src/main/java/com/spotify/reaper/core/RepairRun.java +++ b/src/main/java/com/spotify/reaper/core/RepairRun.java @@ -29,6 +29,8 @@ public class RepairRun { private final String cause; private final String owner; + private final String clusterName; + private final long columnFamilyId; private final RunState runState; private final DateTime creationTime; private final DateTime startTime; @@ -41,6 +43,14 @@ public long getId() { return id; } + public long getColumnFamilyId() { + return columnFamilyId; + } + + public String getClusterName() { + return clusterName; + } + public String getCause() { return cause; } @@ -49,7 +59,7 @@ public String getOwner() { return owner; } - public RunState getState() { + public RunState getRunState() { return runState; } @@ -60,7 +70,9 @@ public DateTime getCreationTime() { @JsonProperty("creationTime") public String getCreationTimeISO8601() { - if (creationTime == null) return null; + if (creationTime == null) { + return null; + } return creationTime.toDateTime(DateTimeZone.UTC).toString( "YYYY-MM-dd'T'HH:mm:ss'Z'"); } @@ -72,7 +84,9 @@ public DateTime getStartTime() { @JsonProperty("startTime") public String getStartTimeISO8601() { - if (startTime == null) return null; + if (startTime == null) { + return null; + } return startTime.toDateTime(DateTimeZone.UTC).toString( "YYYY-MM-dd'T'HH:mm:ss'Z'"); } @@ -84,7 +98,9 @@ public DateTime getEndTime() { @JsonProperty("endTime") public String getEndTimeISO8601() { - if (endTime == null) return null; + if (endTime == null) { + return null; + } return endTime.toDateTime(DateTimeZone.UTC).toString( "YYYY-MM-dd'T'HH:mm:ss'Z'"); } @@ -103,8 +119,8 @@ public int getCompletedSegments() { public static RepairRun getCopy(RepairRun repairRun, RunState newState, DateTime startTime, DateTime endTime, int completedSegments) { - return new RepairRun.Builder(newState, - repairRun.getCreationTime(), repairRun.getIntensity(), + return new RepairRun.Builder(repairRun.getClusterName(), repairRun.getColumnFamilyId(), + newState, repairRun.getCreationTime(), repairRun.getIntensity(), repairRun.getTotalSegments(), completedSegments) .cause(repairRun.getCause()) .owner(repairRun.getOwner()) @@ -123,6 +139,8 @@ public enum RunState { private RepairRun(Builder builder, long id) { this.id = id; + this.clusterName = builder.clusterName; + this.columnFamilyId = builder.columnFamilyId; this.cause = builder.cause; this.owner = builder.owner; this.runState = builder.runState; @@ -136,6 +154,8 @@ private RepairRun(Builder builder, long id) { public static class Builder { + public final String clusterName; + public final long columnFamilyId; public final RunState runState; public final DateTime creationTime; public final double intensity; @@ -146,8 +166,11 @@ public static class Builder { private DateTime startTime; private DateTime endTime; - public Builder(RunState runState, DateTime creationTime, - double intensity, int totalSegments, int completedSegments) { + public Builder(String clusterName, long columnFamilyId, RunState runState, + DateTime creationTime, double intensity, int totalSegments, + int completedSegments) { + this.clusterName = clusterName; + this.columnFamilyId = columnFamilyId; this.runState = runState; this.creationTime = creationTime; this.intensity = intensity; diff --git a/src/main/java/com/spotify/reaper/core/RepairSegment.java b/src/main/java/com/spotify/reaper/core/RepairSegment.java index a8d516147..be277fb9c 100644 --- a/src/main/java/com/spotify/reaper/core/RepairSegment.java +++ b/src/main/java/com/spotify/reaper/core/RepairSegment.java @@ -50,6 +50,14 @@ public RingRange getTokenRange() { return tokenRange; } + public BigInteger getStartToken() { + return tokenRange.getStart(); + } + + public BigInteger getEndToken() { + return tokenRange.getEnd(); + } + public State getState() { return state; } diff --git a/src/main/java/com/spotify/reaper/resources/ClusterResource.java b/src/main/java/com/spotify/reaper/resources/ClusterResource.java index 45b0c05d5..32c782cc1 100644 --- a/src/main/java/com/spotify/reaper/resources/ClusterResource.java +++ b/src/main/java/com/spotify/reaper/resources/ClusterResource.java @@ -26,6 +26,7 @@ import java.math.BigInteger; import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -57,7 +58,11 @@ public ClusterResource(IStorage storage) { public Response getClusterList() { LOG.info("get cluster list called"); Collection clusters = storage.getClusters(); - return Response.ok().entity(clusters).build(); + List clusterNames = new ArrayList<>(); + for (Cluster cluster : clusters) { + clusterNames.add(cluster.getName()); + } + return Response.ok().entity(clusterNames).build(); } @GET diff --git a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java index c576cad8e..b60386c7f 100644 --- a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java +++ b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java @@ -19,6 +19,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -46,6 +50,18 @@ public Response getRepairRun(@PathParam("id") Long repairRunId) { return Response.ok().entity(repairRun).build(); } + @GET + @Path("/cluster/{cluster_name}") + public Response getRepairRunsForCluster(@PathParam("cluster_name") String clusterName) { + LOG.info("get repair run for cluster called with: cluster_name = {}", clusterName); + Collection repairRuns = storage.getRepairRunsForCluster(clusterName); + List repairRunIds = new ArrayList<>(); + for (RepairRun repairRun : repairRuns) { + repairRunIds.add(repairRun.getId()); + } + return Response.ok().entity(repairRunIds).build(); + } + // We probably don't want to create repair runs with this resource, // but actually only by posting the table resource. // Get here is used only for providing visibility to what is going on with the run. diff --git a/src/main/java/com/spotify/reaper/resources/TableResource.java b/src/main/java/com/spotify/reaper/resources/TableResource.java index 885d9573d..8a5d8e51f 100644 --- a/src/main/java/com/spotify/reaper/resources/TableResource.java +++ b/src/main/java/com/spotify/reaper/resources/TableResource.java @@ -194,8 +194,11 @@ public Response addTable(@Context UriInfo uriInfo, } RepairRun newRepairRun = - storage.addRepairRun(new RepairRun.Builder(RepairRun.RunState.NOT_STARTED, - DateTime.now(), config.getRepairIntensity(), + storage.addRepairRun(new RepairRun.Builder(targetCluster.getName(), + existingTable.getId(), + RepairRun.RunState.NOT_STARTED, + DateTime.now(), + config.getRepairIntensity(), segments.size(), 0) .cause(cause.isPresent() ? cause.get() : "no cause specified") .owner(owner.get())); diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index 30276b5c3..ea1e0e850 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -98,7 +98,7 @@ public void run() { // Need to check current status from database every time, if state changed etc. repairRun = storage.getRepairRun(repairRun.getId()); - RepairRun.RunState runState = repairRun.getState(); + RepairRun.RunState runState = repairRun.getRunState(); switch (runState) { case NOT_STARTED: @@ -145,11 +145,11 @@ private void checkIfNeedToStartNextSegmentSync() { } private void checkIfNeedToStartNextSegment() { - if (repairRun.getState() == RepairRun.RunState.PAUSED - || repairRun.getState() == RepairRun.RunState.DONE - || repairRun.getState() == RepairRun.RunState.ERROR) { + if (repairRun.getRunState() == RepairRun.RunState.PAUSED + || repairRun.getRunState() == RepairRun.RunState.DONE + || repairRun.getRunState() == RepairRun.RunState.ERROR) { LOG.debug("not starting new segment if repair run (id {}) is not running: {}", - repairRun.getId(), repairRun.getState()); + repairRun.getId(), repairRun.getRunState()); return; } @@ -161,11 +161,11 @@ private void checkIfNeedToStartNextSegment() { changeCurrentRepairRunState(RepairRun.RunState.ERROR); return; } - if (repairRun.getState() == RepairRun.RunState.NOT_STARTED) { + if (repairRun.getRunState() == RepairRun.RunState.NOT_STARTED) { LOG.info("started new repair run {}", repairRun.getId()); changeCurrentRepairRunState(RepairRun.RunState.RUNNING); } else { - assert repairRun.getState() == RepairRun.RunState.RUNNING : "logical error in run state"; + assert repairRun.getRunState() == RepairRun.RunState.RUNNING : "logical error in run state"; LOG.info("started existing repair run {}", repairRun.getId()); } } else { @@ -245,7 +245,7 @@ private int triggerRepair(RepairSegment segment) throws ReaperException { } private void changeCurrentRepairRunState(RepairRun.RunState newRunState) { - if (repairRun.getState() == newRunState) { + if (repairRun.getRunState() == newRunState) { LOG.info("repair run {} state {} same as before, not changed", repairRun.getId(), newRunState); return; @@ -265,7 +265,7 @@ private void changeCurrentRepairRunState(RepairRun.RunState newRunState) { } LOG.info("repair run with id {} state change from {} to {}", - repairRun.getId(), repairRun.getState().toString(), newRunState.toString()); + repairRun.getId(), repairRun.getRunState().toString(), newRunState.toString()); RepairRun updatedRun = RepairRun.getCopy(repairRun, newRunState, newStartTime, newEndTime, repairRun.getCompletedSegments()); if (!storage.updateRepairRun(updatedRun)) { @@ -323,7 +323,7 @@ public void handle(int repairNumber, ActiveRepairService.Status status, String m currentSegment.getStartTime(), DateTime.now()); storage.updateRepairSegment(currentSegment); repairRun = - RepairRun.getCopy(repairRun, repairRun.getState(), repairRun.getStartTime(), + RepairRun.getCopy(repairRun, repairRun.getRunState(), repairRun.getStartTime(), repairRun.getEndTime(), repairRun.getCompletedSegments() + 1); storage.updateRepairRun(repairRun); checkIfNeedToStartNextSegment(); diff --git a/src/main/java/com/spotify/reaper/storage/IStorage.java b/src/main/java/com/spotify/reaper/storage/IStorage.java index 8c7b919bd..9b8bd02c8 100644 --- a/src/main/java/com/spotify/reaper/storage/IStorage.java +++ b/src/main/java/com/spotify/reaper/storage/IStorage.java @@ -19,7 +19,6 @@ import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.service.RingRange; -import java.math.BigInteger; import java.util.Collection; /** @@ -43,6 +42,8 @@ public interface IStorage { RepairRun getRepairRun(long id); + Collection getRepairRunsForCluster(String clusterName); + ColumnFamily addColumnFamily(ColumnFamily.Builder newTable); ColumnFamily getColumnFamily(long id); @@ -58,6 +59,4 @@ public interface IStorage { RepairSegment getNextFreeSegment(long runId); RepairSegment getNextFreeSegmentInRange(long runId, RingRange range); - - } diff --git a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java index 2f50e862e..aff339dc9 100644 --- a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java +++ b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java @@ -20,9 +20,8 @@ import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.service.RingRange; -import com.spotify.reaper.service.SegmentGenerator; -import java.math.BigInteger; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; @@ -130,6 +129,17 @@ public RepairRun getRepairRun(long id) { return repairRuns.get(id); } + @Override + public List getRepairRunsForCluster(String clusterName) { + List foundRepairRuns = new ArrayList<>(); + for (RepairRun repairRun : repairRuns.values()) { + if (repairRun.getClusterName().equalsIgnoreCase(clusterName)) { + foundRepairRuns.add(repairRun); + } + } + return foundRepairRuns; + } + @Override public ColumnFamily addColumnFamily(ColumnFamily.Builder columnFamily) { ColumnFamily existing = @@ -165,7 +175,6 @@ public int addRepairSegments(Collection segments) { repairSegments.put(newRepairSegment.getId(), newRepairSegment); newSegments.put(newRepairSegment.getId(), newRepairSegment); } - // TODO: (bj0rn) this is very ugly, the function should probably take runId. repairSegmentsByRunId.put(newSegments.values().iterator().next().getRunId(), newSegments); return newSegments.size(); } diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java index 0ac41e8f8..2c86cbd85 100644 --- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java +++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java @@ -20,15 +20,17 @@ import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.service.RingRange; +import com.spotify.reaper.storage.postgresql.BigIntegerArgumentFactory; import com.spotify.reaper.storage.postgresql.IStoragePostgreSQL; import com.spotify.reaper.storage.postgresql.PostgresArrayArgumentFactory; +import com.spotify.reaper.storage.postgresql.RunStateArgumentFactory; +import com.spotify.reaper.storage.postgresql.StateArgumentFactory; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -56,6 +58,14 @@ public PostgresStorage(ReaperApplicationConfiguration config, Environment enviro } } + private static IStoragePostgreSQL getPostgresStorage(Handle h) { + h.registerArgumentFactory(new PostgresArrayArgumentFactory()); + h.registerArgumentFactory(new RunStateArgumentFactory()); + h.registerArgumentFactory(new StateArgumentFactory()); + h.registerArgumentFactory(new BigIntegerArgumentFactory()); + return h.attach(IStoragePostgreSQL.class); + } + @Override public Cluster getCluster(String clusterName) { return (Cluster) getGeneric(Cluster.class, clusterName); @@ -67,7 +77,7 @@ public boolean isStorageConnected() { return false; } Handle h = jdbi.open(); - IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + IStoragePostgreSQL postgres = getPostgresStorage(h); String postgresVersion = postgres.getVersion(); LOG.debug("connected PostgreSQL version: {}", postgresVersion); return null != postgresVersion && postgresVersion.trim().length() > 0; @@ -76,15 +86,14 @@ public boolean isStorageConnected() { @Override public Collection getClusters() { Handle h = jdbi.open(); - IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + IStoragePostgreSQL postgres = getPostgresStorage(h); return postgres.getClusters(); } @Override public Cluster addCluster(Cluster newCluster) { Handle h = jdbi.open(); - h.registerArgumentFactory(new PostgresArrayArgumentFactory()); - IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + IStoragePostgreSQL postgres = getPostgresStorage(h); int rowsAdded = postgres.insertCluster(newCluster); h.close(); if (rowsAdded < 1) { @@ -97,8 +106,7 @@ public Cluster addCluster(Cluster newCluster) { @Override public boolean updateCluster(Cluster cluster) { Handle h = jdbi.open(); - h.registerArgumentFactory(new PostgresArrayArgumentFactory()); - IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + IStoragePostgreSQL postgres = getPostgresStorage(h); int rowsAdded = postgres.updateCluster(cluster); h.close(); if (rowsAdded < 1) { @@ -114,11 +122,17 @@ public RepairRun getRepairRun(long id) { return result; } + @Override + public Collection getRepairRunsForCluster(String clusterName) { + Handle h = jdbi.open(); + IStoragePostgreSQL postgres = getPostgresStorage(h); + return postgres.getRepairRunsForCluster(clusterName); + } + @Override public RepairRun addRepairRun(RepairRun.Builder newRepairRun) { Handle h = jdbi.open(); - h.registerArgumentFactory(new PostgresArrayArgumentFactory()); - IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + IStoragePostgreSQL postgres = getPostgresStorage(h); long insertedId = postgres.insertRepairRun(newRepairRun.build(-1)); h.close(); return newRepairRun.build(insertedId); @@ -127,7 +141,7 @@ public RepairRun addRepairRun(RepairRun.Builder newRepairRun) { @Override public boolean updateRepairRun(RepairRun repairRun) { Handle h = jdbi.open(); - IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + IStoragePostgreSQL postgres = getPostgresStorage(h); int rowsAdded = postgres.updateRepairRun(repairRun); h.close(); if (rowsAdded < 1) { @@ -140,7 +154,7 @@ public boolean updateRepairRun(RepairRun repairRun) { @Override public ColumnFamily addColumnFamily(ColumnFamily.Builder newColumnFamily) { Handle h = jdbi.open(); - IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + IStoragePostgreSQL postgres = getPostgresStorage(h); long insertedId = postgres.insertColumnFamily(newColumnFamily.build(-1)); h.close(); return newColumnFamily.build(insertedId); @@ -152,8 +166,13 @@ public ColumnFamily getColumnFamily(long id) { } @Override - public ColumnFamily getColumnFamily(String cluster, String keyspace, String table) { - return null; + public ColumnFamily getColumnFamily(String clusterName, String keyspaceName, String tableName) { + Handle h = jdbi.open(); + IStoragePostgreSQL postgres = getPostgresStorage(h); + ColumnFamily result = postgres.getColumnFamilyByClusterAndName(clusterName, keyspaceName, + tableName); + h.close(); + return result; } @Override @@ -163,14 +182,14 @@ public int addRepairSegments(Collection newSegments) { insertableSegments.add(segment.build(-1)); } Handle h = jdbi.open(); - IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + IStoragePostgreSQL postgres = getPostgresStorage(h); return postgres.insertRepairSegments(insertableSegments.iterator()); } @Override public boolean updateRepairSegment(RepairSegment repairSegment) { Handle h = jdbi.open(); - IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + IStoragePostgreSQL postgres = getPostgresStorage(h); int rowsAdded = postgres.updateRepairSegment(repairSegment); h.close(); if (rowsAdded < 1) { @@ -188,7 +207,7 @@ public RepairSegment getRepairSegment(long id) { @Override public RepairSegment getNextFreeSegment(long runId) { Handle h = jdbi.open(); - IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + IStoragePostgreSQL postgres = getPostgresStorage(h); RepairSegment result = postgres.getNextFreeRepairSegment(runId); h.close(); return result; @@ -197,8 +216,9 @@ public RepairSegment getNextFreeSegment(long runId) { @Override public RepairSegment getNextFreeSegmentInRange(long runId, RingRange range) { Handle h = jdbi.open(); - IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); - RepairSegment result = postgres.getNextFreeRepairSegmentOnRange(runId, range.getStart(), range.getEnd()); + IStoragePostgreSQL postgres = getPostgresStorage(h); + RepairSegment result = postgres.getNextFreeRepairSegmentOnRange(runId, range.getStart(), + range.getEnd()); h.close(); return result; } @@ -208,7 +228,7 @@ public RepairSegment getNextFreeSegmentInRange(long runId, RingRange range) { */ private Object getGeneric(Class coreObjectType, Object value) { Handle h = jdbi.open(); - IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + IStoragePostgreSQL postgres = getPostgresStorage(h); Object result = null; if (coreObjectType == Cluster.class) { result = postgres.getCluster((String) value); diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/BigIntegerArgumentFactory.java b/src/main/java/com/spotify/reaper/storage/postgresql/BigIntegerArgumentFactory.java new file mode 100644 index 000000000..bdfd30304 --- /dev/null +++ b/src/main/java/com/spotify/reaper/storage/postgresql/BigIntegerArgumentFactory.java @@ -0,0 +1,36 @@ +package com.spotify.reaper.storage.postgresql; + +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.Argument; +import org.skife.jdbi.v2.tweak.ArgumentFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * Provides JDBI a method to map BigInteger value to a BIGINT value in database. + */ +public class BigIntegerArgumentFactory implements ArgumentFactory { + + private static final Logger LOG = LoggerFactory.getLogger(BigIntegerArgumentFactory.class); + + @Override + public boolean accepts(Class expectedType, Object value, StatementContext ctx) { + return value instanceof BigInteger; + } + + @Override + public Argument build(Class expectedType, final BigInteger value, StatementContext ctx) { + return new Argument() { + public void apply(int position, + PreparedStatement statement, + StatementContext ctx) throws SQLException { + statement.setBigDecimal(position, new BigDecimal(value)); + } + }; + } +} diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java index d344838e8..56f06c7d0 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java @@ -1,3 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.spotify.reaper.storage.postgresql; import com.spotify.reaper.core.ColumnFamily; diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java index dab712b4a..917e35639 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java @@ -74,20 +74,30 @@ public interface IStoragePostgreSQL { // RepairRun // static final String SQL_INSERT_REPAIR_RUN = - "INSERT INTO repair_run (cause, owner, state, creation_time, start_time, end_time) " - + "VALUES (:cause, :owner, :state, :creationTime, :startTime, :endTime)"; + "INSERT INTO repair_run (cluster_name, column_family_id, cause, owner, state, " + + "creation_time, start_time, end_time, intensity) " + + "VALUES (:clusterName, :columnFamilyId, :cause, :owner, :runState, :creationTime, " + + ":startTime, :endTime, :intensity)"; static final String SQL_UPDATE_REPAIR_RUN = - "UPDATE repair_run SET cause = :cause, owner = :owner, state = :state, " - + "start_time = :startTime, end_time = :endTime WHERE id = :id"; + "UPDATE repair_run SET cause = :cause, owner = :owner, state = :runState, " + + "start_time = :startTime, end_time = :endTime, intensity = :intensity WHERE id = :id"; static final String SQL_GET_REPAIR_RUN = - "SELECT id, cause, owner, state, creation_time, start_time, end_time " - + "FROM repair_run WHERE id = :id"; + "SELECT id, cluster_name, column_family_id, cause, owner, state, creation_time, " + + "start_time, end_time, intensity FROM repair_run WHERE id = :id"; + + static final String SQL_GET_REPAIR_RUNS_FOR_CLUSTER = + "SELECT id, cluster_name, column_family_id, cause, owner, state, creation_time, " + + "start_time, end_time, intensity FROM repair_run WHERE cluster_name = :clusterName"; @SqlQuery(SQL_GET_REPAIR_RUN) @Mapper(RepairRunMapper.class) - public Cluster getRepairRun(@Bind("id") long repairRunId); + public RepairRun getRepairRun(@Bind("id") long repairRunId); + + @SqlQuery(SQL_GET_REPAIR_RUNS_FOR_CLUSTER) + @Mapper(RepairRunMapper.class) + public Collection getRepairRunsForCluster(@Bind("clusterName") String clusterName); @SqlUpdate(SQL_INSERT_REPAIR_RUN) @GetGeneratedKeys @@ -107,10 +117,21 @@ public interface IStoragePostgreSQL { "SELECT id, cluster_name, keyspace_name, name, segment_count, snapshot_repair " + "FROM column_family WHERE id = :id"; + static final String SQL_GET_COLUMN_FAMILY_BY_CLUSTER_AND_NAME = + "SELECT id, cluster_name, keyspace_name, name, segment_count, snapshot_repair " + + "FROM column_family WHERE cluster_name = :clusterName AND keyspace_name = :keyspaceName " + + "AND name = :name"; + @SqlQuery(SQL_GET_COLUMN_FAMILY) @Mapper(ColumnFamilyMapper.class) public ColumnFamily getColumnFamily(@Bind("id") long columnFamilyId); + @SqlQuery(SQL_GET_COLUMN_FAMILY_BY_CLUSTER_AND_NAME) + @Mapper(ColumnFamilyMapper.class) + public ColumnFamily getColumnFamilyByClusterAndName(@Bind("clusterName") String clusterName, + @Bind("keyspaceName") String keyspaceName, + @Bind("name") String tableName); + @SqlUpdate(SQL_INSERT_COLUMN_FAMILY) @GetGeneratedKeys public long insertColumnFamily(@BindBean ColumnFamily newColumnFamily); @@ -120,7 +141,7 @@ public interface IStoragePostgreSQL { static final String SQL_INSERT_REPAIR_SEGMENT = "INSERT INTO repair_segment (column_family_id, run_id, start_token, end_token, state, " + "start_time, end_time) VALUES (:columnFamilyId, :runId, :startToken, :endToken, " - + ":state, startTime:, endTime)"; + + ":state, :startTime, :endTime)"; static final String SQL_UPDATE_REPAIR_SEGMENT = "UPDATE repair_segment SET column_family_id = :columnFamilyId, run_id = :runId, " diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/PostgresArrayArgumentFactory.java b/src/main/java/com/spotify/reaper/storage/postgresql/PostgresArrayArgumentFactory.java index 23a4142a7..293157532 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/PostgresArrayArgumentFactory.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/PostgresArrayArgumentFactory.java @@ -1,3 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.spotify.reaper.storage.postgresql; import org.skife.jdbi.v2.StatementContext; @@ -10,7 +23,7 @@ import java.util.Collection; /** - * Required as we are using JDBI, and it cannot do Array binding otherwise, duh. + * Provides JDBI a method to map String Collection to an SQL Array type. */ public class PostgresArrayArgumentFactory implements ArgumentFactory> { diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java index 00c5124d6..d34551d6b 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java @@ -1,3 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.spotify.reaper.storage.postgresql; import com.spotify.reaper.core.RepairRun; @@ -14,7 +27,9 @@ public class RepairRunMapper implements ResultSetMapper { public RepairRun map(int index, ResultSet r, StatementContext ctx) throws SQLException { RepairRun.RunState runState = RepairRun.RunState.valueOf(r.getString("state")); - return new RepairRun.Builder(runState, + return new RepairRun.Builder(r.getString("cluster_name"), + r.getLong("column_family_id"), + runState, getDateTimeOrNull(r, "creation_time"), r.getFloat("intensity"), r.getInt("total_segments"), diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/RepairSegmentMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/RepairSegmentMapper.java index f9e717e92..a4759bc8e 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/RepairSegmentMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/RepairSegmentMapper.java @@ -1,3 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.spotify.reaper.storage.postgresql; import com.spotify.reaper.core.RepairSegment; diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/RunStateArgumentFactory.java b/src/main/java/com/spotify/reaper/storage/postgresql/RunStateArgumentFactory.java new file mode 100644 index 000000000..80b5cb528 --- /dev/null +++ b/src/main/java/com/spotify/reaper/storage/postgresql/RunStateArgumentFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.spotify.reaper.storage.postgresql; + +import com.spotify.reaper.core.RepairRun; + +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.Argument; +import org.skife.jdbi.v2.tweak.ArgumentFactory; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * Provides JDBI a method to map RunState value to a TEXT value in database. + */ +public class RunStateArgumentFactory implements ArgumentFactory { + + @Override + public boolean accepts(Class expectedType, Object value, StatementContext ctx) { + return value instanceof RepairRun.RunState; + } + + @Override + public Argument build(Class expectedType, final RepairRun.RunState value, + StatementContext ctx) { + return new Argument() { + public void apply(int position, + PreparedStatement statement, + StatementContext ctx) throws SQLException { + statement.setString(position, value.toString()); + } + }; + } +} diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/StateArgumentFactory.java b/src/main/java/com/spotify/reaper/storage/postgresql/StateArgumentFactory.java new file mode 100644 index 000000000..3d7f3715b --- /dev/null +++ b/src/main/java/com/spotify/reaper/storage/postgresql/StateArgumentFactory.java @@ -0,0 +1,33 @@ +package com.spotify.reaper.storage.postgresql; + +import com.spotify.reaper.core.RepairSegment; + +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.Argument; +import org.skife.jdbi.v2.tweak.ArgumentFactory; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * Provides JDBI a method to map State value to an INT value in database. + */ +public class StateArgumentFactory implements ArgumentFactory { + + @Override + public boolean accepts(Class expectedType, Object value, StatementContext ctx) { + return value instanceof RepairSegment.State; + } + + @Override + public Argument build(Class expectedType, final RepairSegment.State value, + StatementContext ctx) { + return new Argument() { + public void apply(int position, + PreparedStatement statement, + StatementContext ctx) throws SQLException { + statement.setInt(position, value.ordinal()); + } + }; + } +}