From c01bef21554c912aab50ad936e21eafd6e40120b Mon Sep 17 00:00:00 2001 From: Hannu Varjoranta Date: Thu, 22 Jan 2015 14:30:44 +0100 Subject: [PATCH] WIP still ongoing --- src/main/db/reaper_db.sql | 1 + .../spotify/reaper/cassandra/JmxProxy.java | 8 +- .../reaper/resources/ClusterResource.java | 21 +-- .../reaper/resources/RepairRunResource.java | 125 ++++++++++-------- .../com/spotify/reaper/storage/IStorage.java | 45 ++++--- 5 files changed, 113 insertions(+), 87 deletions(-) diff --git a/src/main/db/reaper_db.sql b/src/main/db/reaper_db.sql index aaf441420..f4a203c0c 100644 --- a/src/main/db/reaper_db.sql +++ b/src/main/db/reaper_db.sql @@ -42,6 +42,7 @@ CREATE TABLE IF NOT EXISTS "repair_run" ( "creation_time" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, "start_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL, "end_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL, + "pause_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL, "intensity" REAL NOT NULL ); diff --git a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java index 746169760..ec1d52b6c 100644 --- a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java +++ b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java @@ -116,7 +116,7 @@ public static JmxProxy connect(Optional handler, String hos mbeanServerConn.addNotificationListener(ssMbeanName, proxy, null, null); LOG.info(String.format("JMX connection to %s properly connected.", host)); return proxy; - } catch (IOException | InstanceNotFoundException | MalformedObjectNameException e) { + } catch (IOException | InstanceNotFoundException e) { LOG.error("Failed to establish JMX connection"); throw new ReaperException("Failure when establishing JMX connection", e); } @@ -187,9 +187,9 @@ public List getKeyspaces() { return ssProxy.getKeyspaces(); } - public List getTableNamesForKeyspace(String keyspace) throws ReaperException { - List tableNames = new ArrayList<>(); - Iterator> proxies = null; + public Set getTableNamesForKeyspace(String keyspace) throws ReaperException { + Set tableNames = new HashSet<>(); + Iterator> proxies; try { proxies = ColumnFamilyStoreMBeanIterator.getColumnFamilyStoreMBeanProxies(mbeanServer); } catch (IOException | MalformedObjectNameException e) { diff --git a/src/main/java/com/spotify/reaper/resources/ClusterResource.java b/src/main/java/com/spotify/reaper/resources/ClusterResource.java index 2bd9985cd..d9612e421 100644 --- a/src/main/java/com/spotify/reaper/resources/ClusterResource.java +++ b/src/main/java/com/spotify/reaper/resources/ClusterResource.java @@ -63,8 +63,13 @@ public Response getClusterList() { @Path("/{cluster_name}") public Response getCluster(@PathParam("cluster_name") String clusterName) { LOG.info("get cluster called with cluster_name: {}", clusterName); - Cluster cluster = storage.getCluster(clusterName); - return viewCluster(cluster, Optional.absent()); + Optional cluster = storage.getCluster(clusterName); + if (cluster.isPresent()) { + return viewCluster(cluster.get(), Optional.absent()); + } else { + return Response.status(Response.Status.NOT_FOUND) + .entity("cluster with name \"" + clusterName + "\" not found").build(); + } } @POST @@ -84,15 +89,15 @@ public Response addCluster( return Response.status(400) .entity("failed to create cluster with seed host: " + seedHost.get()).build(); } - Cluster existingCluster = storage.getCluster(newCluster.getName()); - if (existingCluster == null) { - LOG.info("creating new cluster based on given seed host: {}", newCluster); - storage.addCluster(newCluster); - } else { + Optional existingCluster = storage.getCluster(newCluster.getName()); + if (existingCluster.isPresent()) { LOG.info("cluster already stored with this name: {}", existingCluster); return Response.status(403) - .entity(String.format("cluster \"%s\" already exists", existingCluster.getName())) + .entity(String.format("cluster \"%s\" already exists", existingCluster.get().getName())) .build(); + } else { + LOG.info("creating new cluster based on given seed host: {}", newCluster); + storage.addCluster(newCluster); } URI createdURI; diff --git a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java index 99bfd61aa..33a17773e 100644 --- a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java +++ b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java @@ -13,9 +13,12 @@ */ package com.spotify.reaper.resources; +import com.google.common.base.CharMatcher; import com.google.common.base.Optional; +import com.google.common.base.Splitter; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.spotify.reaper.ReaperApplicationConfiguration; import com.spotify.reaper.ReaperException; import com.spotify.reaper.cassandra.JmxProxy; @@ -68,6 +71,9 @@ public class RepairRunResource { private final ReaperApplicationConfiguration config; private final JmxConnectionFactory jmxFactory; + public static final Splitter COMMA_SEPARATED_LIST_SPLITTER = + Splitter.on(',').trimResults(CharMatcher.anyOf(" ()[]\"'")).omitEmptyStrings(); + public RepairRunResource(ReaperApplicationConfiguration config, IStorage storage) { this.config = config; this.storage = storage; @@ -98,13 +104,13 @@ public Response addRepairRun( @Context UriInfo uriInfo, @QueryParam("clusterName") Optional clusterName, @QueryParam("keyspace") Optional keyspace, - @QueryParam("tables") Optional tableNames, + @QueryParam("tables") Optional tableNamesParam, @QueryParam("owner") Optional owner, - @QueryParam("cause") Optional cause) { - + @QueryParam("cause") Optional cause, + @QueryParam("segmentCount") Optional segmentCount + ) { LOG.info("add repair run called with: clusterName = {}, keyspace = {}, tables = {}, owner = {}," - + " cause = {}", clusterName, keyspace, tableNames, owner, cause); - + + " cause = {}", clusterName, keyspace, tableNamesParam, owner, cause); try { if (!clusterName.isPresent()) { throw new ReaperException("\"clusterName\" argument missing"); @@ -118,21 +124,47 @@ public Response addRepairRun( Cluster cluster = getCluster(clusterName.get()); JmxProxy jmxProxy = jmxFactory.create(cluster.getSeedHosts().iterator().next()); - List knownTables = jmxProxy.getTableNamesForKeyspace(keyspace.get()); + Set knownTables = jmxProxy.getTableNamesForKeyspace(keyspace.get()); if (knownTables.size() == 0) { LOG.debug("no known tables for keyspace {} in cluster {}", keyspace.get(), clusterName.get()); return Response.status(Response.Status.NOT_FOUND).entity( "no column families found for keyspace").build(); } - jmxProxy.close(); - RepairUnit repairUnit = getRepairUnit(clusterName.get(), keyspace.get(), tableNames.get()); - RepairRun newRepairRun = registerRepairRun(cluster, repairUnit, cause, owner.get()); + Set tableNames; + if (tableNamesParam.isPresent()) { + tableNames = Sets.newHashSet(COMMA_SEPARATED_LIST_SPLITTER.split(tableNamesParam.get())); + } else { + tableNames = knownTables; + } + + Optional storedRepairUnit = + storage.getRepairUnit(clusterName.get(), keyspace.get(), tableNames); + RepairUnit theRepairUnit; + if (storedRepairUnit.isPresent()) { + if (segmentCount.isPresent()) { + LOG.warn("stored repair unit already exists, and segment count given, " + + "which is thus ignored"); + } + theRepairUnit = storedRepairUnit.get(); + } else { + int segments = config.getSegmentCount(); + if (segmentCount.isPresent()) { + LOG.debug("using given segment count {} instead of configured value {}", + segmentCount.get(), config.getSegmentCount()); + segments = segmentCount.get(); + } + LOG.info("create new repair unit for cluster '{}', keyspace '{}', and column families: {}", + clusterName.get(), keyspace.get(), tableNames); + theRepairUnit = storage.addRepairUnit(new RepairUnit.Builder(clusterName.get(), + keyspace.get(), tableNames, segments, config.getSnapshotRepair())); + } + RepairRun newRepairRun = registerRepairRun(cluster, theRepairUnit, cause, owner.get()); return Response.created(buildRepairRunURI(uriInfo, newRepairRun)) - .entity(new RepairRunStatus(newRepairRun, repairUnit)) - .build(); + .entity(new RepairRunStatus(newRepairRun, theRepairUnit)).build(); + } catch (ReaperException e) { LOG.error(e.getMessage()); e.printStackTrace(); @@ -154,19 +186,29 @@ public Response modifyRunState( @PathParam("id") Long repairRunId, @QueryParam("state") Optional state) { - LOG.info("pause repair run called with: id = {}, state = {}", repairRunId, state); + LOG.info("modify repair run state called with: id = {}, state = {}", repairRunId, state); if (!state.isPresent()) { return Response.status(Response.Status.BAD_REQUEST.getStatusCode()) - .entity("\"state\" argument missing") - .build(); + .entity("\"state\" argument missing").build(); } try { - RepairRun repairRun = getRepairRun(repairRunId); - RepairUnit repairUnit = storage.getRepairUnit(repairRun.getRepairUnitId()); + Optional repairRun = storage.getRepairRun(repairRunId); + if (!repairRun.isPresent()) { + return Response.status(Response.Status.NOT_FOUND).entity("repair run with id " + + repairRunId + " not found").build(); + } + + Optional repairUnit = storage.getRepairUnit(repairRun.get().getRepairUnitId()); + if (!repairUnit.isPresent()) { + String errMsg = "repair unit with id " + repairRun.get().getRepairUnitId() + " not found"; + LOG.error(errMsg); + return Response.status(Response.Status.NOT_FOUND).entity(errMsg).build(); + } + RepairRun.RunState newState = RepairRun.RunState.valueOf(state.get()); - RepairRun.RunState oldState = repairRun.getRunState(); + RepairRun.RunState oldState = repairRun.get().getRunState(); if (oldState == newState) { return Response.ok("given \"state\" is same as the current run state").build(); @@ -241,9 +283,9 @@ private Response resumeRun(RepairRun repairRun, RepairUnit repairUnit) { @Path("/{id}") public Response getRepairRun(@PathParam("id") Long repairRunId) { LOG.info("get repair_run called with: id = {}", repairRunId); - RepairRun repairRun = storage.getRepairRun(repairRunId); - if (null != repairRun) { - return Response.ok().entity(getRepairRunStatus(repairRun)).build(); + Optional repairRun = storage.getRepairRun(repairRunId); + if (repairRun.isPresent()) { + return Response.ok().entity(getRepairRunStatus(repairRun.get())).build(); } else { return Response.status(404).entity( @@ -278,33 +320,6 @@ private Cluster getCluster(String clusterName) throws ReaperException { return cluster; } - /** - * @return repair unit information for given cluster, keyspace and table name - * @throws ReaperException if such table is not found in Reaper's storage - */ - private RepairUnit getRepairUnit(String clusterName, String keyspace, - Collection tableNames) throws ReaperException { - RepairUnit repairUnit = storage.getRepairUnit(clusterName, keyspace, tableName); - if (repairUnit == null) { - throw new ReaperException(String.format("Column family \"%s/%s/%s\" not found", clusterName, - keyspace, tableName)); - } - return repairUnit; - } - - /** - * @return table information for given table id - * @throws ReaperException if such table is not found in Reaper's storage - */ - private RepairUnit getRepairUnit(long repairUnitId) throws ReaperException { - RepairUnit repairUnit = storage.getRepairUnit(repairUnitId); - if (repairUnit == null) { - throw new ReaperException(String.format("Column family with id \"%d\" not found", - repairUnitId)); - } - return repairUnit; - } - /** * Creates a repair run but does not trigger it. * @@ -344,14 +359,14 @@ private RepairRun registerRepairRun(Cluster cluster, RepairUnit repairUnit, * @throws ReaperException when fails to discover seeds for the cluster or fails to connect to * any of the nodes in the Cluster. */ - private List generateSegments(Cluster targetCluster, RepairUnit existingTable) + private List generateSegments(Cluster targetCluster, RepairUnit repairUnit) throws ReaperException { List segments = null; SegmentGenerator sg = new SegmentGenerator(targetCluster.getPartitioner()); Set seedHosts = targetCluster.getSeedHosts(); if (seedHosts.isEmpty()) { String errMsg = String.format("didn't get any seed hosts for cluster \"%s\"", - existingTable.getClusterName()); + repairUnit.getClusterName()); LOG.error(errMsg); throw new ReaperException(errMsg); } @@ -359,7 +374,7 @@ private List generateSegments(Cluster targetCluster, RepairUnit exist try { JmxProxy jmxProxy = jmxFactory.create(host); List tokens = jmxProxy.getTokens(); - segments = sg.generateSegments(existingTable.getSegmentCount(), tokens); + segments = sg.generateSegments(repairUnit.getSegmentCount(), tokens); jmxProxy.close(); break; } catch (ReaperException e) { @@ -368,7 +383,7 @@ private List generateSegments(Cluster targetCluster, RepairUnit exist } if (segments == null) { String errMsg = String.format("failed to generate repair segments for cluster \"%s\"", - existingTable.getClusterName()); + repairUnit.getClusterName()); LOG.error(errMsg); throw new ReaperException(errMsg); } @@ -401,15 +416,17 @@ private RepairRun storeNewRepairRun(Cluster cluster, RepairUnit table, Optional< * storage backend. */ private void storeNewRepairSegments(List tokenSegments, RepairRun repairRun, - RepairUnit table) { + RepairUnit table) throws ReaperException { List repairSegmentBuilders = Lists.newArrayList(); for (RingRange range : tokenSegments) { RepairSegment.Builder repairSegment = new RepairSegment.Builder(repairRun.getId(), range, table.getId()); repairSegmentBuilders.add(repairSegment); } - // TODO(zvo): I don't like we can't figure out if this suceeds or not - storage.addRepairSegments(repairSegmentBuilders, repairRun.getId()); + boolean success = storage.addRepairSegments(repairSegmentBuilders, repairRun.getId()); + if (!success) { + throw new ReaperException("failed adding repair segments to storage"); + } } /** diff --git a/src/main/java/com/spotify/reaper/storage/IStorage.java b/src/main/java/com/spotify/reaper/storage/IStorage.java index af3f138c9..e69e0e3fa 100644 --- a/src/main/java/com/spotify/reaper/storage/IStorage.java +++ b/src/main/java/com/spotify/reaper/storage/IStorage.java @@ -13,16 +13,15 @@ */ package com.spotify.reaper.storage; +import com.google.common.base.Optional; import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; +import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.service.RingRange; import java.util.Collection; -import javax.annotation.Nullable; - /** * API definition for cassandra-reaper. */ @@ -32,17 +31,17 @@ public interface IStorage { Collection getClusters(); - Cluster addCluster(Cluster cluster); + boolean addCluster(Cluster cluster); boolean updateCluster(Cluster newCluster); - Cluster getCluster(String clusterName); + Optional getCluster(String clusterName); RepairRun addRepairRun(RepairRun.Builder repairRun); boolean updateRepairRun(RepairRun repairRun); - RepairRun getRepairRun(long id); + Optional getRepairRun(long id); Collection getRepairRunsForCluster(String clusterName); @@ -50,29 +49,33 @@ public interface IStorage { RepairUnit addRepairUnit(RepairUnit.Builder newRepairUnit); - RepairUnit getRepairUnit(long id); + Optional getRepairUnit(long id); - RepairUnit getRepairUnit(String cluster, String keyspace, String table); + /** + * Get a stored RepairUnit targeting the given tables in the given keyspace. + * Tables must be always defined, so targeting the whole keyspace requires + * first getting all the column family names from the keyspace. + * + * @param cluster Cluster name for the RepairUnit. + * @param keyspace Keyspace name for the RepairUnit. + * @param columnFamilyNames List of column families targeted by the RepairUnit. + * @return Instance of a RepairUnit matching the parameters, or null if not found. + */ + Optional getRepairUnit(String cluster, String keyspace, + Collection columnFamilyNames); - void addRepairSegments(Collection newSegments, long runId); + boolean addRepairSegments(Collection newSegments, long runId); boolean updateRepairSegment(RepairSegment newRepairSegment); - RepairSegment getRepairSegment(long id); + Optional getRepairSegment(long id); - RepairSegment getNextFreeSegment(long runId); + Optional getNextFreeSegment(long runId); - RepairSegment getNextFreeSegmentInRange(long runId, RingRange range); + Optional getNextFreeSegmentInRange(long runId, RingRange range); - /** - * If RepairRun is running, there should always be only one running segment at a time. - * TODO: what if we have parallel repair run on one ring? - * - * @param runId The RepairRun id that should be running to have one running segment. - * @return The running segment, or null in case there is none. - */ - @Nullable - RepairSegment getTheRunningSegment(long runId); + Collection getSegmentsWithStateForRun(long runId, + RepairSegment.State segmentState); Collection getRepairRunIdsForCluster(String clusterName);