From 143e45777be00eda16e6f946da87f59eabbad59c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Hegerfors?= Date: Fri, 12 Dec 2014 18:55:12 +0100 Subject: [PATCH] A new type RingRange represents token ranges --- .../spotify/reaper/core/RepairSegment.java | 50 ++++++--------- .../reaper/resources/TableResource.java | 17 ++++-- .../spotify/reaper/service/RepairRunner.java | 13 ++-- .../com/spotify/reaper/service/RingRange.java | 61 +++++++++++++++++++ .../reaper/service/SegmentGenerator.java | 17 ++---- .../com/spotify/reaper/storage/IStorage.java | 5 +- .../spotify/reaper/storage/MemoryStorage.java | 25 ++------ .../reaper/storage/PostgresStorage.java | 9 +-- .../postgresql/RepairSegmentMapper.java | 8 ++- .../reaper/service/SegmentGeneratorTest.java | 10 +-- 10 files changed, 126 insertions(+), 89 deletions(-) create mode 100644 src/main/java/com/spotify/reaper/service/RingRange.java diff --git a/src/main/java/com/spotify/reaper/core/RepairSegment.java b/src/main/java/com/spotify/reaper/core/RepairSegment.java index 26d619132..a8d516147 100644 --- a/src/main/java/com/spotify/reaper/core/RepairSegment.java +++ b/src/main/java/com/spotify/reaper/core/RepairSegment.java @@ -13,6 +13,8 @@ */ package com.spotify.reaper.core; +import com.spotify.reaper.service.RingRange; + import org.joda.time.DateTime; import java.math.BigInteger; @@ -23,8 +25,7 @@ public class RepairSegment { private final Integer repairCommandId; // received when triggering repair in Cassandra private final long columnFamilyId; private final long runId; - private final BigInteger startToken; // open - private final BigInteger endToken; // closed + private final RingRange tokenRange; private final State state; private final DateTime startTime; private final DateTime endTime; @@ -45,12 +46,8 @@ public long getRunId() { return runId; } - public BigInteger getStartToken() { - return startToken; - } - - public BigInteger getEndToken() { - return endToken; + public RingRange getTokenRange() { + return tokenRange; } public State getState() { @@ -68,12 +65,11 @@ public DateTime getEndTime() { public static RepairSegment getCopy(RepairSegment origSegment, State newState, int newRepairCommandId, DateTime newStartTime, DateTime newEndTime) { - return new Builder(origSegment.getStartToken(), - origSegment.getEndToken(), newState) + return new Builder(origSegment.getRunId(), origSegment.getTokenRange(), newState) .columnFamilyId(origSegment.getColumnFamilyId()) .repairCommandId(newRepairCommandId) .startTime(newStartTime) - .endTime(newEndTime).build(origSegment.getRunId(), origSegment.getId()); + .endTime(newEndTime).build(origSegment.getId()); } public enum State { @@ -83,13 +79,12 @@ public enum State { DONE } - private RepairSegment(Builder builder, long runId, long id) { + private RepairSegment(Builder builder,long id) { this.id = id; this.repairCommandId = builder.repairCommandId; this.columnFamilyId = builder.columnFamilyId; - this.runId = runId; - this.startToken = builder.startToken; - this.endToken = builder.endToken; + this.runId = builder.runId; + this.tokenRange = builder.tokenRange; this.state = builder.state; this.startTime = builder.startTime; this.endTime = builder.endTime; @@ -97,17 +92,17 @@ private RepairSegment(Builder builder, long runId, long id) { public static class Builder { - public final BigInteger startToken; - public final BigInteger endToken; + public final long runId; + public final RingRange tokenRange; public final State state; private long columnFamilyId; private int repairCommandId; private DateTime startTime; private DateTime endTime; - public Builder(BigInteger startToken, BigInteger endToken, State state) { - this.startToken = startToken; - this.endToken = endToken; + public Builder(long runId, RingRange tokenRange, State state) { + this.runId = runId; + this.tokenRange = tokenRange; this.state = state; } @@ -131,19 +126,8 @@ public Builder endTime(DateTime endTime) { return this; } - public RepairSegment build(long runId, long id) { - return new RepairSegment(this, runId, id); + public RepairSegment build(long id) { + return new RepairSegment(this, id); } - - @Override - public String toString() { - return String.format("(%s,%s]", startToken.toString(), endToken.toString()); - } - } - - public String toString() { - return String.format("(%s,%s]", - startToken.toString(), - endToken.toString()); } } diff --git a/src/main/java/com/spotify/reaper/resources/TableResource.java b/src/main/java/com/spotify/reaper/resources/TableResource.java index b43f88ae0..885d9573d 100644 --- a/src/main/java/com/spotify/reaper/resources/TableResource.java +++ b/src/main/java/com/spotify/reaper/resources/TableResource.java @@ -14,6 +14,7 @@ package com.spotify.reaper.resources; import com.google.common.base.Optional; +import com.google.common.collect.Lists; import com.spotify.reaper.ReaperApplicationConfiguration; import com.spotify.reaper.ReaperException; @@ -23,6 +24,7 @@ import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.service.RepairRunner; +import com.spotify.reaper.service.RingRange; import com.spotify.reaper.service.SegmentGenerator; import com.spotify.reaper.storage.IStorage; @@ -161,7 +163,7 @@ public Response addTable(@Context UriInfo uriInfo, } // create segments - List segments = null; + List segments = null; String usedSeedHost = null; try { SegmentGenerator sg = new SegmentGenerator(targetCluster.getPartitioner()); @@ -170,9 +172,7 @@ public Response addTable(@Context UriInfo uriInfo, try { JmxProxy jmxProxy = JmxProxy.connect(host); List tokens = jmxProxy.getTokens(); - segments = sg.generateSegments(existingTable.getSegmentCount(), - tokens, - existingTable); + segments = sg.generateSegments(existingTable.getSegmentCount(), tokens); jmxProxy.close(); usedSeedHost = host; break; @@ -208,7 +208,14 @@ public Response addTable(@Context UriInfo uriInfo, // Notice that our RepairRun core object doesn't contain pointer to // the set of RepairSegments in the run, as they are accessed separately. // RepairSegment has a pointer to the RepairRun it lives in. - storage.addRepairSegments(newRepairRun.getId(), segments); + List repairSegments = Lists.newArrayList(); + for (RingRange range : segments) { + repairSegments + .add(new RepairSegment.Builder(newRepairRun.getId(), range, + RepairSegment.State.NOT_STARTED) + .columnFamilyId(existingTable.getId())); + } + storage.addRepairSegments(repairSegments); RepairRunner.startNewRepairRun(storage, newRepairRun, usedSeedHost); diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index e527ca188..e7e8641df 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -84,8 +84,8 @@ public static void startNewRepairRun(IStorage storage, RepairRun repairRun, */ @Override public void run() { - LOG.debug("RepairRunner run on RepairRun \"{}\" with start token \"{}\"", - repairRun.getId(), currentSegment == null ? "n/a" : currentSegment.getStartToken()); + LOG.debug("RepairRunner run on RepairRun \"{}\" with token range \"{}\"", + repairRun.getId(), currentSegment == null ? "n/a" : currentSegment.getTokenRange()); if (!checkJmxProxyInitialized()) { LOG.error("failed to initialize JMX proxy, retrying after {} seconds", @@ -181,8 +181,8 @@ private void checkIfNeedToStartNextSegment() { return; } else if (currentSegment.getState() == RepairSegment.State.NOT_STARTED && DateTime.now().isAfter(startNextSegmentEarliest)) { - LOG.info("triggering repair on segment {} with start token {} on run id {}", - currentSegment.getId(), currentSegment.getStartToken(), repairRun.getId()); + LOG.info("triggering repair on segment #{} with token range {} on run id {}", + currentSegment.getId(), currentSegment.getTokenRange(), repairRun.getId()); newRepairCommandId = triggerRepair(currentSegment); } else if (currentSegment.getState() == RepairSegment.State.DONE) { LOG.warn("segment {} repair completed for run {}", @@ -223,8 +223,9 @@ private void checkIfNeedToStartNextSegment() { private int triggerRepair(RepairSegment segment) { ColumnFamily columnFamily = this.storage.getColumnFamily(segment.getColumnFamilyId()); - return this.jmxProxy.triggerRepair(segment.getStartToken(), segment.getEndToken(), - columnFamily.getKeyspaceName(), columnFamily.getName()); + return this.jmxProxy + .triggerRepair(segment.getTokenRange().getStart(), segment.getTokenRange().getEnd(), + columnFamily.getKeyspaceName(), columnFamily.getName()); } private void changeCurrentRepairRunState(RepairRun.RunState newRunState) { diff --git a/src/main/java/com/spotify/reaper/service/RingRange.java b/src/main/java/com/spotify/reaper/service/RingRange.java new file mode 100644 index 000000000..b8ab3a3f7 --- /dev/null +++ b/src/main/java/com/spotify/reaper/service/RingRange.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.spotify.reaper.service; + +import java.math.BigInteger; + +public class RingRange { + private final BigInteger start; + private final BigInteger end; + + public RingRange(BigInteger start, BigInteger end) { + this.start = start; + this.end = end; + } + + public BigInteger getStart() { + return start; + } + + public BigInteger getEnd() { + return end; + } + + public BigInteger span(BigInteger ringSize) { + if (SegmentGenerator.greaterThanOrEqual(start, end)) { + return end.subtract(start).add(ringSize); + } else { + return end.subtract(start); + } + } + + public boolean encloses(RingRange other) { + // TODO: unit test for this + if (SegmentGenerator.lowerThanOrEqual(start, end)) { + return SegmentGenerator.greaterThanOrEqual(other.start, start) && + SegmentGenerator.lowerThanOrEqual(other.end, end); + } else if (SegmentGenerator.lowerThanOrEqual(other.start, other.end)) { + return SegmentGenerator.greaterThanOrEqual(other.start, start) || + SegmentGenerator.lowerThanOrEqual(other.end, end); + } else { + return SegmentGenerator.greaterThanOrEqual(other.start, start) && + SegmentGenerator.lowerThanOrEqual(other.end, end); + } + } + + @Override + public String toString() { + return String.format("(%s,%s]", start.toString(), end.toString()); + } +} diff --git a/src/main/java/com/spotify/reaper/service/SegmentGenerator.java b/src/main/java/com/spotify/reaper/service/SegmentGenerator.java index 69504fb6d..da33b7c36 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentGenerator.java +++ b/src/main/java/com/spotify/reaper/service/SegmentGenerator.java @@ -33,7 +33,6 @@ public class SegmentGenerator { private static final Logger LOG = LoggerFactory.getLogger(SegmentGenerator.class); private final String partitioner; - private final BigInteger MIN_SEGMENT_SIZE = new BigInteger("100"); private BigInteger RANGE_MIN; private BigInteger RANGE_MAX; private BigInteger RANGE_SIZE; @@ -61,13 +60,12 @@ public SegmentGenerator(String partitioner) throws ReaperException { * @param ringTokens list of all start tokens in a cluster. They have to be in ring order. * @return a list containing at least {@code totalSegmentCount} repair segments. */ - public List generateSegments(int totalSegmentCount, - List ringTokens, - ColumnFamily table) + public List generateSegments(int totalSegmentCount, + List ringTokens) throws ReaperException { int tokenRangeCount = ringTokens.size(); - List repairSegments = Lists.newArrayList(); + List repairSegments = Lists.newArrayList(); for (int i = 0; i < tokenRangeCount; i++) { BigInteger start = ringTokens.get(i); BigInteger stop = ringTokens.get((i + 1) % tokenRangeCount); @@ -111,10 +109,7 @@ public List generateSegments(int totalSegmentCount, // Append the segments between the endpoints for (int j = 0; j < segmentCount; j++) { - repairSegments.add(new RepairSegment.Builder(endpointTokens.get(j), - endpointTokens.get(j + 1), - RepairSegment.State.NOT_STARTED) - .columnFamilyId(table.getId())); + repairSegments.add(new RingRange(endpointTokens.get(j), endpointTokens.get(j + 1))); LOG.debug("Segment #{}: [{},{})", j + 1, endpointTokens.get(j), endpointTokens.get(j + 1)); } @@ -122,8 +117,8 @@ public List generateSegments(int totalSegmentCount, // verify that the whole range is repaired BigInteger total = BigInteger.ZERO; - for (RepairSegment.Builder segment : repairSegments) { - BigInteger size = segment.endToken.subtract(segment.startToken); + for (RingRange segment : repairSegments) { + BigInteger size = segment.span(RANGE_SIZE); if (lowerThan(size, BigInteger.ZERO)) size = size.add(RANGE_SIZE); total = total.add(size); diff --git a/src/main/java/com/spotify/reaper/storage/IStorage.java b/src/main/java/com/spotify/reaper/storage/IStorage.java index 3da525dfc..8c7b919bd 100644 --- a/src/main/java/com/spotify/reaper/storage/IStorage.java +++ b/src/main/java/com/spotify/reaper/storage/IStorage.java @@ -17,6 +17,7 @@ import com.spotify.reaper.core.ColumnFamily; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; +import com.spotify.reaper.service.RingRange; import java.math.BigInteger; import java.util.Collection; @@ -48,7 +49,7 @@ public interface IStorage { ColumnFamily getColumnFamily(String cluster, String keyspace, String table); - int addRepairSegments(long runId, Collection newSegments); + int addRepairSegments(Collection newSegments); boolean updateRepairSegment(RepairSegment newRepairSegment); @@ -56,7 +57,7 @@ public interface IStorage { RepairSegment getNextFreeSegment(long runId); - RepairSegment getNextFreeSegmentInRange(long runId, BigInteger start, BigInteger end); + RepairSegment getNextFreeSegmentInRange(long runId, RingRange range); } diff --git a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java index fdcd592b5..2f50e862e 100644 --- a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java +++ b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java @@ -19,6 +19,7 @@ import com.spotify.reaper.core.ColumnFamily; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; +import com.spotify.reaper.service.RingRange; import com.spotify.reaper.service.SegmentGenerator; import java.math.BigInteger; @@ -157,10 +158,10 @@ public ColumnFamily getColumnFamily(String cluster, String keyspace, String tabl } @Override - public int addRepairSegments(long runId, Collection segments) { + public int addRepairSegments(Collection segments) { LinkedHashMap newSegments = Maps.newLinkedHashMap(); for (RepairSegment.Builder segment : segments) { - RepairSegment newRepairSegment = segment.build(runId, SEGMENT_ID.incrementAndGet()); + RepairSegment newRepairSegment = segment.build(SEGMENT_ID.incrementAndGet()); repairSegments.put(newRepairSegment.getId(), newRepairSegment); newSegments.put(newRepairSegment.getId(), newRepairSegment); } @@ -196,27 +197,11 @@ public RepairSegment getNextFreeSegment(long runId) { return null; } - - public static boolean encloses(BigInteger rangeStart, BigInteger rangeEnd, - BigInteger segmentStart, BigInteger segmentEnd) { - // TODO: unit test for this - if (SegmentGenerator.lowerThanOrEqual(rangeStart, rangeEnd)) { - return SegmentGenerator.greaterThanOrEqual(segmentStart, rangeStart) && - SegmentGenerator.lowerThanOrEqual(segmentEnd, rangeEnd); - } else if (SegmentGenerator.lowerThanOrEqual(segmentStart, segmentEnd)) { - return SegmentGenerator.greaterThanOrEqual(segmentStart, rangeStart) || - SegmentGenerator.lowerThanOrEqual(segmentEnd, rangeEnd); - } else { - return SegmentGenerator.greaterThanOrEqual(segmentStart, rangeStart) && - SegmentGenerator.lowerThanOrEqual(segmentEnd, rangeEnd); - } - } - @Override - public RepairSegment getNextFreeSegmentInRange(long runId, BigInteger start, BigInteger end) { + public RepairSegment getNextFreeSegmentInRange(long runId, RingRange range) { for (RepairSegment segment : repairSegmentsByRunId.get(runId).values()) { if (segment.getState() == RepairSegment.State.NOT_STARTED && - encloses(start, end, segment.getStartToken(), segment.getEndToken())) { + range.encloses(segment.getTokenRange())) { return segment; } } diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java index d5c9df698..a3d0cd3a7 100644 --- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java +++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java @@ -19,6 +19,7 @@ import com.spotify.reaper.core.ColumnFamily; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; +import com.spotify.reaper.service.RingRange; import com.spotify.reaper.storage.postgresql.IStoragePostgreSQL; import org.skife.jdbi.v2.DBI; @@ -152,10 +153,10 @@ public ColumnFamily getColumnFamily(String cluster, String keyspace, String tabl } @Override - public int addRepairSegments(long runId, Collection newSegments) { + public int addRepairSegments(Collection newSegments) { List insertableSegments = new ArrayList<>(); for (RepairSegment.Builder segment : newSegments) { - insertableSegments.add(segment.build(runId, -1)); + insertableSegments.add(segment.build(-1)); } Handle h = jdbi.open(); IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); @@ -190,10 +191,10 @@ public RepairSegment getNextFreeSegment(long runId) { } @Override - public RepairSegment getNextFreeSegmentInRange(long runId, BigInteger start, BigInteger end) { + public RepairSegment getNextFreeSegmentInRange(long runId, RingRange range) { Handle h = jdbi.open(); IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); - RepairSegment result = postgres.getNextFreeRepairSegmentOnRange(runId, start, end); + RepairSegment result = postgres.getNextFreeRepairSegmentOnRange(runId, range.getStart(), range.getEnd()); h.close(); return result; } diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/RepairSegmentMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/RepairSegmentMapper.java index 2e3152b6e..f9e717e92 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/RepairSegmentMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/RepairSegmentMapper.java @@ -1,6 +1,7 @@ package com.spotify.reaper.storage.postgresql; import com.spotify.reaper.core.RepairSegment; +import com.spotify.reaper.service.RingRange; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.tweak.ResultSetMapper; @@ -11,13 +12,14 @@ public class RepairSegmentMapper implements ResultSetMapper { public RepairSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException { - return new RepairSegment.Builder(r.getBigDecimal("start_token").toBigInteger(), - r.getBigDecimal("end_token").toBigInteger(), + return new RepairSegment.Builder(r.getLong("run_id"), + new RingRange(r.getBigDecimal("start_token").toBigInteger(), + r.getBigDecimal("end_token").toBigInteger()), RepairSegment.State.values()[r.getInt("state")]) .columnFamilyId(r.getLong("column_family_id")) .startTime(RepairRunMapper.getDateTimeOrNull(r, "start_time")) .endTime(RepairRunMapper.getDateTimeOrNull(r, "end_time")) - .build(r.getLong("run_id"),r.getLong("id")); + .build(r.getLong("id")); } } diff --git a/src/test/java/com/spotify/reaper/service/SegmentGeneratorTest.java b/src/test/java/com/spotify/reaper/service/SegmentGeneratorTest.java index 6a349b333..4d8daac76 100644 --- a/src/test/java/com/spotify/reaper/service/SegmentGeneratorTest.java +++ b/src/test/java/com/spotify/reaper/service/SegmentGeneratorTest.java @@ -47,7 +47,7 @@ public BigInteger apply(@Nullable String s) { ); SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner"); - List segments = generator.generateSegments(10, tokens, null); + List segments = generator.generateSegments(10, tokens); assertEquals(15, segments.size()); assertEquals("(0,1]", segments.get(0).toString()); @@ -71,7 +71,7 @@ public BigInteger apply(@Nullable String s) { } ); - segments = generator.generateSegments(10, tokens, null); + segments = generator.generateSegments(10, tokens); assertEquals(15, segments.size()); assertEquals("(5,6]", segments.get(0).toString()); @@ -96,7 +96,7 @@ public BigInteger apply(@Nullable String s) { }); SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner"); - generator.generateSegments(10, tokens, null); + generator.generateSegments(10, tokens); } @Test @@ -114,7 +114,7 @@ public BigInteger apply(@Nullable String s) { }); SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner"); - List segments = generator.generateSegments(10, tokens, null); + List segments = generator.generateSegments(10, tokens); assertEquals(15, segments.size()); assertEquals("(113427455640312821154458202477256070484,113427455640312821154458202477256070485]", segments.get(4).toString()); @@ -139,7 +139,7 @@ public BigInteger apply(@Nullable String s) { }); SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner"); - generator.generateSegments(10, tokens, null); + generator.generateSegments(10, tokens); // Will throw an exception when concluding that the repair segments don't add up. // This is because the tokens were supplied out of order. }