From a19ddfbdb7673f26dd8ba1928defeb9f8266fe76 Mon Sep 17 00:00:00 2001 From: Radovan Zvoncek Date: Mon, 12 Jan 2015 16:46:57 +0100 Subject: [PATCH 1/3] Code style cleanup --- .../spotify/reaper/cassandra/JmxProxy.java | 27 +++---- .../com/spotify/reaper/core/ColumnFamily.java | 2 +- .../com/spotify/reaper/core/RepairRun.java | 2 +- .../spotify/reaper/core/RepairSegment.java | 2 +- .../reaper/resources/ClusterResource.java | 9 +-- .../reaper/resources/TableResource.java | 77 ++++++++++--------- .../spotify/reaper/service/RepairRunner.java | 76 +++++++++--------- .../com/spotify/reaper/service/RingRange.java | 6 +- .../reaper/service/SegmentGenerator.java | 20 ++--- .../spotify/reaper/storage/MemoryStorage.java | 15 ++-- .../reaper/storage/PostgresStorage.java | 8 +- .../storage/postgresql/ClusterMapper.java | 2 +- .../postgresql/ColumnFamilyMapper.java | 10 +-- .../PostgresArrayArgumentFactory.java | 6 +- .../storage/postgresql/RepairRunMapper.java | 11 ++- .../postgresql/RepairSegmentMapper.java | 11 ++- .../postgresql/RunStateArgumentFactory.java | 7 +- .../postgresql/StateArgumentFactory.java | 7 +- 18 files changed, 148 insertions(+), 150 deletions(-) diff --git a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java index 3b4c76909..1190bbaa7 100644 --- a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java +++ b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java @@ -29,9 +29,9 @@ import java.io.Serializable; import java.math.BigInteger; import java.net.MalformedURLException; -import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; import javax.management.InstanceNotFoundException; @@ -64,8 +64,7 @@ public class JmxProxy implements NotificationListener, Serializable { private final String host; private JmxProxy(Optional handler, String host, JMXConnector jmxConnector, - StorageServiceMBean ssProxy, ObjectName mbeanName, - MBeanServerConnection mbeanServer) { + StorageServiceMBean ssProxy, ObjectName mbeanName, MBeanServerConnection mbeanServer) { this.host = host; this.jmxConnector = jmxConnector; this.mbeanName = mbeanName; @@ -118,8 +117,8 @@ public static JmxProxy connect(Optional handler, String hos try { JMXConnector jmxConn = JMXConnectorFactory.connect(jmxUrl); MBeanServerConnection mbeanServerConn = jmxConn.getMBeanServerConnection(); - StorageServiceMBean - ssProxy = JMX.newMBeanProxy(mbeanServerConn, mbeanName, StorageServiceMBean.class); + StorageServiceMBean ssProxy = + JMX.newMBeanProxy(mbeanServerConn, mbeanName, StorageServiceMBean.class); JmxProxy proxy = new JmxProxy(handler, host, jmxConn, ssProxy, mbeanName, mbeanServerConn); // registering a listener throws bunch of exceptions, so we do it here rather than in the // constructor @@ -153,10 +152,12 @@ public BigInteger apply(String s) { @Nullable public List tokenRangeToEndpoint(String keyspace, RingRange tokenRange) { checkNotNull(ssProxy, "Looks like the proxy is not connected"); - for (Map.Entry, List> entry : ssProxy.getRangeToEndpointMap(keyspace) - .entrySet()) { - if (new RingRange(new BigInteger(entry.getKey().get(0)), - new BigInteger(entry.getKey().get(1))).encloses(tokenRange)) { + Set, List>> entries = + ssProxy.getRangeToEndpointMap(keyspace).entrySet(); + for (Map.Entry, List> entry : entries) { + BigInteger rangeStart = new BigInteger(entry.getKey().get(0)); + BigInteger rangeEnd = new BigInteger(entry.getKey().get(1)); + if (new RingRange(rangeStart, rangeEnd).encloses(tokenRange)) { return entry.getValue(); } } @@ -200,11 +201,11 @@ public List getKeySpaces() { * @return Repair command number, or 0 if nothing to repair */ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keyspace, - String columnFamily) { + String columnFamily) { checkNotNull(ssProxy, "Looks like the proxy is not connected"); - LOG.info(String.format("Triggering repair of range (%s,%s] for %s.%s via host %s", - beginToken.toString(), endToken.toString(), keyspace, columnFamily, - this.host)); + String msg = String.format("Triggering repair of range (%s,%s] for %s.%s via host %s", + beginToken.toString(), endToken.toString(), keyspace, columnFamily, this.host); + LOG.info(msg); return ssProxy.forceRepairRangeAsync( beginToken.toString(), endToken.toString(), diff --git a/src/main/java/com/spotify/reaper/core/ColumnFamily.java b/src/main/java/com/spotify/reaper/core/ColumnFamily.java index ed444a714..0e6c8931b 100644 --- a/src/main/java/com/spotify/reaper/core/ColumnFamily.java +++ b/src/main/java/com/spotify/reaper/core/ColumnFamily.java @@ -68,7 +68,7 @@ public static class Builder { private boolean snapshotRepair; public Builder(String clusterName, String keyspaceName, String name, int segmentCount, - boolean snapshotRepair) { + boolean snapshotRepair) { this.clusterName = clusterName; this.keyspaceName = keyspaceName; this.name = name; diff --git a/src/main/java/com/spotify/reaper/core/RepairRun.java b/src/main/java/com/spotify/reaper/core/RepairRun.java index 161dc6535..ba0c62e7c 100644 --- a/src/main/java/com/spotify/reaper/core/RepairRun.java +++ b/src/main/java/com/spotify/reaper/core/RepairRun.java @@ -111,7 +111,7 @@ public static class Builder { private DateTime endTime; public Builder(String clusterName, long columnFamilyId, RunState runState, - DateTime creationTime, double intensity) { + DateTime creationTime, double intensity) { this.clusterName = clusterName; this.columnFamilyId = columnFamilyId; this.runState = runState; diff --git a/src/main/java/com/spotify/reaper/core/RepairSegment.java b/src/main/java/com/spotify/reaper/core/RepairSegment.java index 490419426..d987af112 100644 --- a/src/main/java/com/spotify/reaper/core/RepairSegment.java +++ b/src/main/java/com/spotify/reaper/core/RepairSegment.java @@ -77,7 +77,7 @@ public enum State { DONE } - private RepairSegment(Builder builder,long id) { + private RepairSegment(Builder builder, long id) { this.id = id; this.repairCommandId = builder.repairCommandId; this.columnFamilyId = builder.columnFamilyId; diff --git a/src/main/java/com/spotify/reaper/resources/ClusterResource.java b/src/main/java/com/spotify/reaper/resources/ClusterResource.java index 394e713c2..3d80b1f23 100644 --- a/src/main/java/com/spotify/reaper/resources/ClusterResource.java +++ b/src/main/java/com/spotify/reaper/resources/ClusterResource.java @@ -76,8 +76,9 @@ public Response getCluster(@PathParam("cluster_name") String clusterName) { } @POST - public Response addCluster(@Context UriInfo uriInfo, - @QueryParam("seedHost") Optional seedHost) { + public Response addCluster( + @Context UriInfo uriInfo, + @QueryParam("seedHost") Optional seedHost) { if (!seedHost.isPresent()) { LOG.error("POST on cluster resource called without seedHost"); return Response.status(400).entity("query parameter \"seedHost\" required").build(); @@ -129,9 +130,7 @@ public static Cluster createClusterWithSeedHost(String seedHost) e.printStackTrace(); throw e; } - Cluster newCluster = - new Cluster(clusterName, partitioner, Collections.singleton(seedHost)); - return newCluster; + return new Cluster(clusterName, partitioner, Collections.singleton(seedHost)); } } diff --git a/src/main/java/com/spotify/reaper/resources/TableResource.java b/src/main/java/com/spotify/reaper/resources/TableResource.java index 058e5962e..f29cec9c6 100644 --- a/src/main/java/com/spotify/reaper/resources/TableResource.java +++ b/src/main/java/com/spotify/reaper/resources/TableResource.java @@ -66,23 +66,25 @@ public TableResource(ReaperApplicationConfiguration config, IStorage storage) { @GET @Path("/{clusterName}/{keyspace}/{table}") - public Response getTable(@PathParam("clusterName") String clusterName, - @PathParam("keyspace") String keyspace, - @PathParam("table") String table) { + public Response getTable( + @PathParam("clusterName") String clusterName, + @PathParam("keyspace") String keyspace, + @PathParam("table") String table) { LOG.info("get table called with: clusterName = {}, keyspace = {}, table = {}", clusterName, keyspace, table); return Response.ok().entity("not implemented yet").build(); } @POST - public Response addTable(@Context UriInfo uriInfo, - @QueryParam("clusterName") Optional clusterName, - @QueryParam("seedHost") Optional seedHost, - @QueryParam("keyspace") Optional keyspace, - @QueryParam("table") Optional table, - @QueryParam("startRepair") Optional startRepair, - @QueryParam("owner") Optional owner, - @QueryParam("cause") Optional cause) { + public Response addTable( + @Context UriInfo uriInfo, + @QueryParam("clusterName") Optional clusterName, + @QueryParam("seedHost") Optional seedHost, + @QueryParam("keyspace") Optional keyspace, + @QueryParam("table") Optional table, + @QueryParam("startRepair") Optional startRepair, + @QueryParam("owner") Optional owner, + @QueryParam("cause") Optional cause) { LOG.info("add table called with: clusterName = {}, seedHost = {}, keyspace = {}, table = {}, " + "owner = {}, cause = {}", clusterName, seedHost, keyspace, table, owner, cause); @@ -108,7 +110,8 @@ public Response addTable(@Context UriInfo uriInfo, } catch (ReaperException e) { e.printStackTrace(); return Response.status(400) - .entity("failed creating cluster with seed host: " + seedHost.get()).build(); + .entity("failed creating cluster with seed host: " + seedHost.get()) + .build(); } Cluster existingCluster = storage.getCluster(targetCluster.getName()); if (existingCluster == null) { @@ -121,16 +124,16 @@ public Response addTable(@Context UriInfo uriInfo, } else if (clusterName.isPresent()) { targetCluster = storage.getCluster(clusterName.get()); if (null == targetCluster) { - return Response.status(404) - .entity("cluster \"" + clusterName + "\" does not exist").build(); + return Response.status(404).entity("cluster \"" + clusterName + "\" does not exist") + .build(); } } else { - return Response.status(400) - .entity("Query parameter \"clusterName\" or \"seedHost\" required").build(); + return Response.status(400).entity("Query parameter \"clusterName\" or \"seedHost\" required") + .build(); } - String newTablePathPart = targetCluster.getName() + "/" + keyspace.get() - + "/" + table.get(); + String newTablePathPart = + String.format("%s/%s/%s", targetCluster.getName(), keyspace.get(), table.get()); URI createdURI; try { createdURI = (new URL(uriInfo.getAbsolutePath().toURL(), newTablePathPart)).toURI(); @@ -147,9 +150,9 @@ public Response addTable(@Context UriInfo uriInfo, if (existingTable == null) { LOG.info("storing new table"); - existingTable = storage.addColumnFamily( - new ColumnFamily.Builder(targetCluster.getName(), keyspace.get(), table.get(), - config.getSegmentCount(), config.getSnapshotRepair())); + ColumnFamily.Builder newCf = new ColumnFamily.Builder(targetCluster.getName(), keyspace.get(), + table.get(), config.getSegmentCount(), config.getSnapshotRepair()); + existingTable = storage.addColumnFamily(newCf); if (existingTable == null) { return Response.status(500) @@ -183,9 +186,9 @@ public Response addTable(@Context UriInfo uriInfo, } if (segments == null || seedHosts.isEmpty()) { - return Response.status(404) - .entity("couldn't connect to any of the seed hosts in cluster \"" - + existingTable.getClusterName() + "\"").build(); + String errMsg = String.format("couldn't connect to any of the seed hosts in cluster \"%s\"", + existingTable.getClusterName()); + return Response.status(404).entity(errMsg).build(); } } catch (ReaperException e) { String errMsg = "failed generating segments for new table: " + existingTable; @@ -194,14 +197,12 @@ public Response addTable(@Context UriInfo uriInfo, return Response.status(400).entity(errMsg).build(); } - RepairRun newRepairRun = - storage.addRepairRun(new RepairRun.Builder(targetCluster.getName(), - existingTable.getId(), - RepairRun.RunState.NOT_STARTED, - DateTime.now(), - config.getRepairIntensity()) - .cause(cause.isPresent() ? cause.get() : "no cause specified") - .owner(owner.get())); + RepairRun.Builder runBuilder = new RepairRun.Builder(targetCluster.getName(), + existingTable.getId(), RepairRun.RunState.NOT_STARTED, DateTime.now(), + config.getRepairIntensity()); + runBuilder.cause(cause.isPresent() ? cause.get() : "no cause specified"); + runBuilder.owner(owner.get()); + RepairRun newRepairRun = storage.addRepairRun(runBuilder); if (newRepairRun == null) { return Response.status(500) .entity("failed creating repair run into Reaper storage for owner: " + owner.get()) @@ -213,10 +214,10 @@ public Response addTable(@Context UriInfo uriInfo, // RepairSegment has a pointer to the RepairRun it lives in. List repairSegments = Lists.newArrayList(); for (RingRange range : segments) { - repairSegments - .add(new RepairSegment.Builder(newRepairRun.getId(), range, - RepairSegment.State.NOT_STARTED) - .columnFamilyId(existingTable.getId())); + RepairSegment.Builder repairSegment = + new RepairSegment.Builder(newRepairRun.getId(), range, RepairSegment.State.NOT_STARTED); + repairSegment.columnFamilyId(existingTable.getId()); + repairSegments.add(repairSegment); } storage.addRepairSegments(repairSegments); @@ -233,8 +234,8 @@ public Response addTable(@Context UriInfo uriInfo, return Response.status(400).entity(errMsg).build(); } - return Response.created(createdRepairRunURI) - .entity(new ColumnFamilyStatus(existingTable)).build(); + return Response.created(createdRepairRunURI).entity(new ColumnFamilyStatus(existingTable)) + .build(); } } diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index 1083a242c..376420686 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -121,11 +121,10 @@ public void run() { private void start() { LOG.info("Repairs for repair run #{} starting", repairRunId); RepairRun repairRun = storage.getRepairRun(repairRunId); - storage.updateRepairRun( - repairRun.with() - .runState(RepairRun.RunState.RUNNING) - .startTime(DateTime.now()) - .build(repairRun.getId())); + storage.updateRepairRun(repairRun.with() + .runState(RepairRun.RunState.RUNNING) + .startTime(DateTime.now()) + .build(repairRun.getId())); startNextSegment(); } @@ -135,11 +134,10 @@ private void start() { private void end() { LOG.info("Repairs for repair run #{} done", repairRunId); RepairRun repairRun = storage.getRepairRun(repairRunId); - storage.updateRepairRun( - repairRun.with() - .runState(RepairRun.RunState.DONE) - .endTime(DateTime.now()) - .build(repairRun.getId())); + storage.updateRepairRun(repairRun.with() + .runState(RepairRun.RunState.DONE) + .endTime(DateTime.now()) + .build(repairRun.getId())); } /** @@ -172,8 +170,9 @@ public void handleRunningRepairSegment(RepairSegment running) { // Implies that repair has timed out. assert repairTimeout.isDone(); repairTimeout = null; - storage.updateRepairSegment( - running.with().state(RepairSegment.State.NOT_STARTED).build(running.getId())); + storage.updateRepairSegment(running.with() + .state(RepairSegment.State.NOT_STARTED) + .build(running.getId())); run(); } else { // The repair might not have finished, so let it timeout before resetting its status. @@ -203,19 +202,18 @@ private synchronized void doRepairSegment(RepairSegment next) { return; } - ColumnFamily - columnFamily = + ColumnFamily columnFamily = storage.getColumnFamily(storage.getRepairRun(repairRunId).getColumnFamilyId()); String keyspace = columnFamily.getKeyspaceName(); - List potentialCoordinators = - jmxProxy.tokenRangeToEndpoint(keyspace, - storage.getNextFreeSegment(repairRunId).getTokenRange()); + RingRange tokenRange = storage.getNextFreeSegment(repairRunId).getTokenRange(); + List potentialCoordinators = jmxProxy.tokenRangeToEndpoint(keyspace, tokenRange); if (potentialCoordinators == null) { // This segment has a faulty token range. Abort the entire repair run. RepairRun repairRun = storage.getRepairRun(repairRunId); - storage.updateRepairRun( - repairRun.with().runState(RepairRun.RunState.ERROR).build(repairRun.getId())); + storage.updateRepairRun(repairRun.with() + .runState(RepairRun.RunState.ERROR) + .build(repairRun.getId())); return; } @@ -223,7 +221,7 @@ private synchronized void doRepairSegment(RepairSegment next) { try { jmxProxy.close(); jmxConnection = JmxProxy.connect(Optional.of(this), - potentialCoordinators.get(0)); + potentialCoordinators.get(0)); } catch (ReaperException e) { e.printStackTrace(); executor.schedule(this, JMX_FAILURE_SLEEP_DELAY_SECONDS, TimeUnit.SECONDS); @@ -233,19 +231,20 @@ private synchronized void doRepairSegment(RepairSegment next) { currentSegmentId = next.getId(); repairTimeout = executor.schedule(this, repairTimeoutMins, TimeUnit.MINUTES); // TODO: ensure that no repair is already running (abort all repairs) - currentCommandId = jmxConnection - .triggerRepair(next.getStartToken(), next.getEndToken(), keyspace, columnFamily.getName()); + currentCommandId = jmxConnection.triggerRepair(next.getStartToken(), next.getEndToken(), + keyspace, columnFamily.getName()); LOG.debug("Triggered repair with command id {}", currentCommandId); - storage.updateRepairSegment( - next.with().state(RepairSegment.State.RUNNING).repairCommandId(currentCommandId) - .build(currentSegmentId)); + storage.updateRepairSegment(next.with() + .state(RepairSegment.State.RUNNING) + .repairCommandId(currentCommandId) + .build(currentSegmentId)); } @Override public synchronized void handle(int repairNumber, ActiveRepairService.Status status, - String message) { + String message) { LOG.debug("handle called with repairRunId {}, repairNumber {} and status {}", repairRunId, - repairNumber, status); + repairNumber, status); if (repairNumber != currentCommandId) { LOG.warn("Repair run id != current command id. {} != {}", repairNumber, currentCommandId); // bj0rn: Should this ever be allowed to happen? Perhaps shut down Reaper, because repairs @@ -270,20 +269,20 @@ public synchronized void handle(int repairNumber, ActiveRepairService.Status sta break; case SESSION_FAILED: { // Bj0rn: How should we handle this? Here, it's almost treated like a success. - RepairSegment - updatedSegment = - currentSegment.with().state(RepairSegment.State.ERROR).endTime(DateTime.now()) - .build(currentSegmentId); + RepairSegment updatedSegment = currentSegment.with() + .state(RepairSegment.State.ERROR) + .endTime(DateTime.now()) + .build(currentSegmentId); storage.updateRepairSegment(updatedSegment); closeRepairCommand(); executor.schedule(this, intensityBasedDelayMillis(updatedSegment), TimeUnit.MILLISECONDS); } break; case FINISHED: { - RepairSegment - updatedSegment = - currentSegment.with().state(RepairSegment.State.DONE).endTime(DateTime.now()) - .build(currentSegmentId); + RepairSegment updatedSegment = currentSegment.with() + .state(RepairSegment.State.DONE) + .endTime(DateTime.now()) + .build(currentSegmentId); storage.updateRepairSegment(updatedSegment); closeRepairCommand(); executor.schedule(this, intensityBasedDelayMillis(updatedSegment), TimeUnit.MILLISECONDS); @@ -306,7 +305,7 @@ boolean repairIsTriggered() { */ void closeRepairCommand() { LOG.debug("Closing repair command with commandId {} and segmentId {} in repair run {}", - currentCommandId, currentSegmentId, repairRunId); + currentCommandId, currentSegmentId, repairRunId); assert repairTimeout != null; repairTimeout.cancel(false); @@ -330,8 +329,9 @@ void closeRepairCommand() { long intensityBasedDelayMillis(RepairSegment repairSegment) { RepairRun repairRun = storage.getRepairRun(repairRunId); assert repairSegment.getEndTime() != null && repairSegment.getStartTime() != null; - long repairDuration = - repairSegment.getEndTime().getMillis() - repairSegment.getStartTime().getMillis(); + long repairEnd = repairSegment.getEndTime().getMillis(); + long repairStart = repairSegment.getStartTime().getMillis(); + long repairDuration = repairEnd - repairStart; long delay = (long) (repairDuration / repairRun.getIntensity() - repairDuration); LOG.debug("Scheduling next runner run() with delay {} ms", delay); return delay; diff --git a/src/main/java/com/spotify/reaper/service/RingRange.java b/src/main/java/com/spotify/reaper/service/RingRange.java index 2184c2e62..2b53cfe90 100644 --- a/src/main/java/com/spotify/reaper/service/RingRange.java +++ b/src/main/java/com/spotify/reaper/service/RingRange.java @@ -45,13 +45,13 @@ public boolean encloses(RingRange other) { // TODO: unit test for this if (SegmentGenerator.lowerThanOrEqual(start, end)) { return SegmentGenerator.greaterThanOrEqual(other.start, start) && - SegmentGenerator.lowerThanOrEqual(other.end, end); + SegmentGenerator.lowerThanOrEqual(other.end, end); } else if (SegmentGenerator.lowerThanOrEqual(other.start, other.end)) { return SegmentGenerator.greaterThanOrEqual(other.start, start) || - SegmentGenerator.lowerThanOrEqual(other.end, end); + SegmentGenerator.lowerThanOrEqual(other.end, end); } else { return SegmentGenerator.greaterThanOrEqual(other.start, start) && - SegmentGenerator.lowerThanOrEqual(other.end, end); + SegmentGenerator.lowerThanOrEqual(other.end, end); } } diff --git a/src/main/java/com/spotify/reaper/service/SegmentGenerator.java b/src/main/java/com/spotify/reaper/service/SegmentGenerator.java index b6ff424b3..b4d68b2ae 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentGenerator.java +++ b/src/main/java/com/spotify/reaper/service/SegmentGenerator.java @@ -69,11 +69,11 @@ public List generateSegments(int totalSegmentCount, List if (!inRange(start) || !inRange(stop)) { throw new ReaperException(String.format("Tokens (%s,%s) not in range of %s", - start, stop, partitioner)); + start, stop, partitioner)); } if (start.equals(stop) && tokenRangeCount != 1) { throw new ReaperException(String.format("Tokens (%s,%s): two nodes have the same token", - start, stop)); + start, stop)); } BigInteger rangeSize = stop.subtract(start); @@ -87,20 +87,20 @@ public List generateSegments(int totalSegmentCount, List BigInteger[] segmentCountAndRemainder = rangeSize.multiply(BigInteger.valueOf(totalSegmentCount)).divideAndRemainder(RANGE_SIZE); int segmentCount = segmentCountAndRemainder[0].intValue() + - (segmentCountAndRemainder[1].equals(BigInteger.ZERO) ? 0 : 1); + (segmentCountAndRemainder[1].equals(BigInteger.ZERO) ? 0 : 1); LOG.info("Dividing token range [{},{}) into {} segments", start, stop, segmentCount); // Make a list of all the endpoints for the repair segments, including both start and stop List endpointTokens = Lists.newArrayList(); for (int j = 0; j <= segmentCount; j++) { - BigInteger reaperToken = - start.add( - rangeSize - .multiply(BigInteger.valueOf(j)) - .divide(BigInteger.valueOf(segmentCount))); - if (greaterThan(reaperToken, RANGE_MAX)) + BigInteger offset = rangeSize + .multiply(BigInteger.valueOf(j)) + .divide(BigInteger.valueOf(segmentCount)); + BigInteger reaperToken = start.add(offset); + if (greaterThan(reaperToken, RANGE_MAX)) { reaperToken = reaperToken.subtract(RANGE_SIZE); + } endpointTokens.add(reaperToken); } @@ -108,7 +108,7 @@ public List generateSegments(int totalSegmentCount, List for (int j = 0; j < segmentCount; j++) { repairSegments.add(new RingRange(endpointTokens.get(j), endpointTokens.get(j + 1))); LOG.debug("Segment #{}: [{},{})", j + 1, endpointTokens.get(j), - endpointTokens.get(j + 1)); + endpointTokens.get(j + 1)); } } diff --git a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java index c76c0965d..5f6d2f1db 100644 --- a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java +++ b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java @@ -63,8 +63,7 @@ public TableName(String cluster, String keyspace, String table) { @Override public boolean equals(Object other) { - return - other instanceof TableName && + return other instanceof TableName && cluster.equals(((TableName) other).cluster) && keyspace.equals(((TableName) other).keyspace) && table.equals(((TableName) other).table); @@ -159,10 +158,9 @@ public ColumnFamily addColumnFamily(ColumnFamily.Builder columnFamily) { if (existing == null) { ColumnFamily newColumnFamily = columnFamily.build(COLUMN_FAMILY_ID.incrementAndGet()); columnFamilies.put(newColumnFamily.getId(), newColumnFamily); - columnFamiliesByName - .put(new TableName(newColumnFamily.getClusterName(), - newColumnFamily.getKeyspaceName(), - newColumnFamily.getName()), newColumnFamily); + TableName tableName = new TableName(newColumnFamily.getClusterName(), + newColumnFamily.getKeyspaceName(), newColumnFamily.getName()); + columnFamiliesByName.put(tableName, newColumnFamily); return newColumnFamily; } else { return null; @@ -196,8 +194,9 @@ public boolean updateRepairSegment(RepairSegment newRepairSegment) { return false; } else { repairSegments.put(newRepairSegment.getId(), newRepairSegment); - repairSegmentsByRunId.get(newRepairSegment.getRunId()) - .put(newRepairSegment.getId(), newRepairSegment); + LinkedHashMap updatedSegment = + repairSegmentsByRunId.get(newRepairSegment.getRunId()); + updatedSegment.put(newRepairSegment.getId(), newRepairSegment); return true; } } diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java index af1a4c588..223723131 100644 --- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java +++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java @@ -253,8 +253,8 @@ public RepairSegment getNextFreeSegment(long runId) { public RepairSegment getNextFreeSegmentInRange(long runId, RingRange range) { RepairSegment result; try (Handle h = jdbi.open()) { - result = getPostgresStorage(h).getNextFreeRepairSegmentOnRange(runId, range.getStart(), - range.getEnd()); + result = getPostgresStorage(h) + .getNextFreeRepairSegmentOnRange(runId, range.getStart(), range.getEnd()); } return result; } @@ -264,8 +264,8 @@ public RepairSegment getNextFreeSegmentInRange(long runId, RingRange range) { public RepairSegment getTheRunningSegment(long runId) { RepairSegment result = null; try (Handle h = jdbi.open()) { - Collection segments = - getPostgresStorage(h).getRepairSegmentForRunWithState(runId, RepairSegment.State.RUNNING); + Collection segments = getPostgresStorage(h) + .getRepairSegmentForRunWithState(runId, RepairSegment.State.RUNNING); if (null != segments) { assert segments.size() < 2 : "there are more than one RUNNING segment on run: " + runId; if (segments.size() == 1) { diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java index c91e657c1..f1f03efd8 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java @@ -29,7 +29,7 @@ public class ClusterMapper implements ResultSetMapper { public Cluster map(int index, ResultSet r, StatementContext ctx) throws SQLException { String[] seedHosts = (String[]) r.getArray("seed_hosts").getArray(); return new Cluster(r.getString("name"), r.getString("partitioner"), - Sets.newHashSet(Arrays.asList(seedHosts))); + Sets.newHashSet(Arrays.asList(seedHosts))); } } diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java index 56f06c7d0..e41aa4d57 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java @@ -24,12 +24,10 @@ public class ColumnFamilyMapper implements ResultSetMapper { public ColumnFamily map(int index, ResultSet r, StatementContext ctx) throws SQLException { - return new ColumnFamily.Builder(r.getString("cluster_name"), - r.getString("keyspace_name"), - r.getString("name"), - r.getInt("segment_count"), - r.getBoolean("snapshot_repair")) - .build(r.getLong("id")); + ColumnFamily.Builder builder = new ColumnFamily.Builder(r.getString("cluster_name"), + r.getString("keyspace_name"), r.getString("name"), r.getInt("segment_count"), + r.getBoolean("snapshot_repair")); + return builder.build(r.getLong("id")); } } diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/PostgresArrayArgumentFactory.java b/src/main/java/com/spotify/reaper/storage/postgresql/PostgresArrayArgumentFactory.java index 293157532..54c93498c 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/PostgresArrayArgumentFactory.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/PostgresArrayArgumentFactory.java @@ -34,11 +34,11 @@ public boolean accepts(Class expectedType, Object value, StatementContext ctx @Override public Argument build(Class expectedType, final Collection value, - StatementContext ctx) { + StatementContext ctx) { return new Argument() { public void apply(int position, - PreparedStatement statement, - StatementContext ctx) throws SQLException { + PreparedStatement statement, + StatementContext ctx) throws SQLException { Array sqlArray = ctx.getConnection().createArrayOf("text", value.toArray()); statement.setArray(position, sqlArray); } diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java index 17c4d4436..1ad93267c 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java @@ -27,11 +27,10 @@ public class RepairRunMapper implements ResultSetMapper { public RepairRun map(int index, ResultSet r, StatementContext ctx) throws SQLException { RepairRun.RunState runState = RepairRun.RunState.valueOf(r.getString("state")); - return new RepairRun.Builder(r.getString("cluster_name"), - r.getLong("column_family_id"), - runState, - getDateTimeOrNull(r, "creation_time"), - r.getFloat("intensity")) + RepairRun.Builder repairRunBuilder = new RepairRun.Builder(r.getString("cluster_name"), + r.getLong("column_family_id"), runState, getDateTimeOrNull(r, "creation_time"), + r.getFloat("intensity")); + return repairRunBuilder .owner(r.getString("owner")) .cause(r.getString("cause")) .startTime(getDateTimeOrNull(r, "start_time")) @@ -39,7 +38,7 @@ public RepairRun map(int index, ResultSet r, StatementContext ctx) throws SQLExc .build(r.getLong("id")); } - static final DateTime getDateTimeOrNull(ResultSet r, String columnName) + static DateTime getDateTimeOrNull(ResultSet r, String columnName) throws SQLException { Timestamp timestamp = r.getTimestamp(columnName); DateTime result = null; diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/RepairSegmentMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/RepairSegmentMapper.java index a4759bc8e..8999e66dd 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/RepairSegmentMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/RepairSegmentMapper.java @@ -25,10 +25,13 @@ public class RepairSegmentMapper implements ResultSetMapper { public RepairSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException { - return new RepairSegment.Builder(r.getLong("run_id"), - new RingRange(r.getBigDecimal("start_token").toBigInteger(), - r.getBigDecimal("end_token").toBigInteger()), - RepairSegment.State.values()[r.getInt("state")]) + RingRange range = new RingRange(r.getBigDecimal("start_token").toBigInteger(), + r.getBigDecimal("end_token").toBigInteger()); + RepairSegment.Builder repairSegmentBuilder = + new RepairSegment.Builder(r.getLong("run_id"), + range, + RepairSegment.State.values()[r.getInt("state")]); + return repairSegmentBuilder .columnFamilyId(r.getLong("column_family_id")) .startTime(RepairRunMapper.getDateTimeOrNull(r, "start_time")) .endTime(RepairRunMapper.getDateTimeOrNull(r, "end_time")) diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/RunStateArgumentFactory.java b/src/main/java/com/spotify/reaper/storage/postgresql/RunStateArgumentFactory.java index 80b5cb528..cc935977b 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/RunStateArgumentFactory.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/RunStateArgumentFactory.java @@ -34,11 +34,10 @@ public boolean accepts(Class expectedType, Object value, StatementContext ctx @Override public Argument build(Class expectedType, final RepairRun.RunState value, - StatementContext ctx) { + StatementContext ctx) { return new Argument() { - public void apply(int position, - PreparedStatement statement, - StatementContext ctx) throws SQLException { + public void apply(int position, PreparedStatement statement, StatementContext ctx) + throws SQLException { statement.setString(position, value.toString()); } }; diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/StateArgumentFactory.java b/src/main/java/com/spotify/reaper/storage/postgresql/StateArgumentFactory.java index 3d7f3715b..1d189798c 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/StateArgumentFactory.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/StateArgumentFactory.java @@ -21,11 +21,10 @@ public boolean accepts(Class expectedType, Object value, StatementContext ctx @Override public Argument build(Class expectedType, final RepairSegment.State value, - StatementContext ctx) { + StatementContext ctx) { return new Argument() { - public void apply(int position, - PreparedStatement statement, - StatementContext ctx) throws SQLException { + public void apply(int position, PreparedStatement statement, StatementContext ctx) + throws SQLException { statement.setInt(position, value.ordinal()); } }; From d632a50b23d1e602145e796651fe21309f0ae830 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Hegerfors?= Date: Tue, 13 Jan 2015 12:38:22 +0100 Subject: [PATCH 2/3] Pass runId to addRepairSegments --- .../java/com/spotify/reaper/resources/TableResource.java | 2 +- src/main/java/com/spotify/reaper/storage/IStorage.java | 2 +- src/main/java/com/spotify/reaper/storage/MemoryStorage.java | 4 ++-- .../java/com/spotify/reaper/storage/PostgresStorage.java | 2 +- .../java/com/spotify/reaper/service/RepairRunnerTest.java | 6 +++++- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/spotify/reaper/resources/TableResource.java b/src/main/java/com/spotify/reaper/resources/TableResource.java index f29cec9c6..50963f03b 100644 --- a/src/main/java/com/spotify/reaper/resources/TableResource.java +++ b/src/main/java/com/spotify/reaper/resources/TableResource.java @@ -219,7 +219,7 @@ public Response addTable( repairSegment.columnFamilyId(existingTable.getId()); repairSegments.add(repairSegment); } - storage.addRepairSegments(repairSegments); + storage.addRepairSegments(repairSegments, newRepairRun.getId()); RepairRunner.startNewRepairRun(storage, newRepairRun.getId()); diff --git a/src/main/java/com/spotify/reaper/storage/IStorage.java b/src/main/java/com/spotify/reaper/storage/IStorage.java index 511dc4108..bbafa6f7c 100644 --- a/src/main/java/com/spotify/reaper/storage/IStorage.java +++ b/src/main/java/com/spotify/reaper/storage/IStorage.java @@ -54,7 +54,7 @@ public interface IStorage { ColumnFamily getColumnFamily(String cluster, String keyspace, String table); - void addRepairSegments(Collection newSegments); + void addRepairSegments(Collection newSegments, long runId); boolean updateRepairSegment(RepairSegment newRepairSegment); diff --git a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java index 5f6d2f1db..d40be7099 100644 --- a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java +++ b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java @@ -178,14 +178,14 @@ public ColumnFamily getColumnFamily(String cluster, String keyspace, String tabl } @Override - public void addRepairSegments(Collection segments) { + public void addRepairSegments(Collection segments, long runId) { LinkedHashMap newSegments = Maps.newLinkedHashMap(); for (RepairSegment.Builder segment : segments) { RepairSegment newRepairSegment = segment.build(SEGMENT_ID.incrementAndGet()); repairSegments.put(newRepairSegment.getId(), newRepairSegment); newSegments.put(newRepairSegment.getId(), newRepairSegment); } - repairSegmentsByRunId.put(newSegments.values().iterator().next().getRunId(), newSegments); + repairSegmentsByRunId.put(runId, newSegments); } @Override diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java index 223723131..1f26729d2 100644 --- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java +++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java @@ -207,7 +207,7 @@ public ColumnFamily getColumnFamily(String clusterName, String keyspaceName, Str } @Override - public void addRepairSegments(Collection newSegments) { + public void addRepairSegments(Collection newSegments, long runId) { List insertableSegments = new ArrayList<>(); for (RepairSegment.Builder segment : newSegments) { insertableSegments.add(segment.build(-1)); diff --git a/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java b/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java index 8697bb328..8bcd61385 100644 --- a/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java +++ b/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java @@ -1,6 +1,7 @@ package com.spotify.reaper.service; import com.spotify.reaper.core.RepairRun; +import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.storage.IStorage; import com.spotify.reaper.storage.MemoryStorage; import org.joda.time.DateTime; @@ -8,6 +9,8 @@ import org.junit.Before; import org.junit.Test; +import java.util.Collections; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -34,6 +37,7 @@ public void runTest() throws InterruptedException { RepairRun.Builder runBuilder = new RepairRun.Builder("TestCluster", CF_ID, RepairRun.RunState.NOT_STARTED, DateTime.now(), INTENSITY); storage.addRepairRun(runBuilder); + storage.addRepairSegments(Collections.emptySet(), RUN_ID); // start the repair DateTimeUtils.setCurrentMillisFixed(TIME_START); @@ -49,7 +53,7 @@ public void runTest() throws InterruptedException { // end the repair DateTimeUtils.setCurrentMillisFixed(TIME_END); RepairRun run = storage.getRepairRun(RUN_ID); - storage.updateRepairRun(run.with().runState(RepairRun.RunState.DONE).build(RUN_ID)); + storage.updateRepairRun(run.with().runState(RepairRun.RunState.RUNNING).build(RUN_ID)); RepairRunner.startNewRepairRun(storage, RUN_ID); Thread.sleep(200); From 31ded88006157b7a10540be11089187f8204845d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Hegerfors?= Date: Tue, 13 Jan 2015 12:52:49 +0100 Subject: [PATCH 3/3] Remove redundancy in a test --- .../spotify/reaper/service/RepairRunnerTest.java | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java b/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java index 8bcd61385..7f1f19139 100644 --- a/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java +++ b/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java @@ -24,13 +24,12 @@ public void setUp() throws Exception { } @Test - public void runTest() throws InterruptedException { + public void noSegmentsTest() throws InterruptedException { final int RUN_ID = 1; final int CF_ID = 1; final double INTENSITY = 0.5f; final long TIME_CREATION = 41l; final long TIME_START = 42l; - final long TIME_END = 43l; // place a dummy repair run into the storage DateTimeUtils.setCurrentMillisFixed(TIME_CREATION); @@ -50,17 +49,10 @@ public void runTest() throws InterruptedException { assertNotNull(startTime); assertEquals(TIME_START, startTime.getMillis()); - // end the repair - DateTimeUtils.setCurrentMillisFixed(TIME_END); - RepairRun run = storage.getRepairRun(RUN_ID); - storage.updateRepairRun(run.with().runState(RepairRun.RunState.RUNNING).build(RUN_ID)); - RepairRunner.startNewRepairRun(storage, RUN_ID); - Thread.sleep(200); - - // check if the end time was properly set + // end time will also be set immediately DateTime endTime = storage.getRepairRun(RUN_ID).getEndTime(); assertNotNull(endTime); - assertEquals(TIME_END, endTime.getMillis()); + assertEquals(TIME_START, endTime.getMillis()); } }