Skip to content

Commit

Permalink
Merge pull request #3 from spotify/Bj0rnen/totalSegmentCount
Browse files Browse the repository at this point in the history
Bj0rnen/total segment count
  • Loading branch information
varjoranta committed Dec 2, 2014
2 parents c19968b + cd0c28a commit 0aa5d70
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 78 deletions.
6 changes: 3 additions & 3 deletions src/main/java/com/spotify/reaper/core/RepairSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ public class RepairSegment {
private Long id;
private final ColumnFamily columnFamily;
private final long runID;
private final BigInteger startToken;
private final BigInteger endToken;
private final BigInteger startToken; // closed/inclusive
private final BigInteger endToken; // open/exclusive
private final State state;
private final DateTime startTime;
private final DateTime endTime;
Expand Down Expand Up @@ -128,6 +128,6 @@ public RepairSegment build() {

@Override
public String toString() {
return String.format("(%s,%s)", startToken.toString(), endToken.toString());
return String.format("[%s,%s)", startToken.toString(), endToken.toString());
}
}
104 changes: 70 additions & 34 deletions src/main/java/com/spotify/reaper/service/SegmentGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.core.RepairSegment;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigInteger;
import java.util.List;

Expand All @@ -15,6 +18,8 @@
*/
public class SegmentGenerator {

private static final Logger LOG = LoggerFactory.getLogger(SegmentGenerator.class);

private final String partitioner;
private final BigInteger MIN_SEGMENT_SIZE = new BigInteger("100");
private BigInteger RANGE_MIN;
Expand All @@ -35,50 +40,81 @@ public SegmentGenerator(String partitioner) throws ReaperException {
this.partitioner = partitioner;
}

public List<RepairSegment> generateSegments(int cnt, List<String> ring) throws ReaperException {
/**
* Given a properly ordered list of tokens, compute at least {@code totalSegmentCount} repair
* segments.
* @param totalSegmentCount requested total amount of repair segments. This function may generate
* more segments.
* @param ringTokens list of all start tokens in a cluster. They have to be in ring order.
* @return a list containing at least {@code totalSegmentCount} repair segments.
* @throws ReaperException
*/
public List<RepairSegment> generateSegments(int totalSegmentCount, List<BigInteger> ringTokens)
throws ReaperException {
int tokenRangeCount = ringTokens.size();

List<RepairSegment> repairSegments = Lists.newArrayList();
int ringSize = ring.size();
BigInteger count = new BigInteger(String.valueOf(cnt));
BigInteger start;
BigInteger stop;
BigInteger next;
BigInteger cur;
BigInteger total = BigInteger.ZERO;
for (int i = 0; i < tokenRangeCount; i++) {
BigInteger start = ringTokens.get(i);
BigInteger stop = ringTokens.get((i + 1) % tokenRangeCount);

for (int i = 0; i < ringSize; i++) {
start = new BigInteger(ring.get(i));
stop = new BigInteger(ring.get((i+1) % ringSize));
if (!inRange(start) || !inRange(stop)) {
throw new ReaperException(String.format("Tokens (%s,%s) not in range of %s",
start, stop, partitioner));
start, stop, partitioner));
}
if (lowerThan(stop, start)) {
if (start.equals(stop)) {
throw new ReaperException(String.format("Tokens (%s,%s): two nodes have the same token",
start, stop));
}

BigInteger rangeSize = stop.subtract(start);
if (lowerThan(rangeSize, BigInteger.ZERO)) {
// wrap around case
stop = stop.add(RANGE_SIZE);
rangeSize = rangeSize.add(RANGE_SIZE);
}
BigInteger segmentSize = stop.subtract(start).divide(count).add(BigInteger.ONE);
BigInteger rangeLength = max(MIN_SEGMENT_SIZE, segmentSize);
cur = start;
while (lowerThan(cur, stop)) {
next = min(stop, cur.add(rangeLength));
BigInteger ocur = cur;
BigInteger onext = next;
if (greaterThan(onext, RANGE_MAX)) {
onext = onext.subtract(RANGE_SIZE);
}
if (greaterThan(ocur, RANGE_MAX)) {
ocur = ocur.subtract(RANGE_SIZE);
}
//repairSegments.add(new RepairSegment(ocur, onext));

// the below, in essence, does this:
// segmentCount = ceiling((rangeSize / RANGE_SIZE) * totalSegmentCount)
BigInteger[] segmentCountAndRemainder =
rangeSize.multiply(BigInteger.valueOf(totalSegmentCount)).divideAndRemainder(RANGE_SIZE);
int segmentCount = segmentCountAndRemainder[0].intValue() +
(segmentCountAndRemainder[1].equals(BigInteger.ZERO) ? 0 : 1);

LOG.info("Dividing token range [{},{}) into {} segments", start, stop, segmentCount);

// Make a list of all the endpoints for the repair segments, including both start and stop
List<BigInteger> endpointTokens = Lists.newArrayList();
for (int j = 0; j <= segmentCount; j++) {
BigInteger reaperToken =
start.add(
rangeSize
.multiply(BigInteger.valueOf(j))
.divide(BigInteger.valueOf(segmentCount)));
if (greaterThan(reaperToken, RANGE_MAX))
reaperToken = reaperToken.subtract(RANGE_SIZE);
endpointTokens.add(reaperToken);
}

// Append the segments between the endpoints
for (int j = 0; j < segmentCount; j++)
{
repairSegments.add(new RepairSegment.RepairSegmentBuilder()
.startToken(ocur)
.endToken(onext)
.startToken(endpointTokens.get(j))
.endToken(endpointTokens.get(j + 1))
.build());
total = total.add(next).subtract(cur);
cur = next;
LOG.debug("Segment #{}: [{},{})", j + 1, endpointTokens.get(j),
endpointTokens.get(j + 1));
}
}

// verify that the whole range is repaired
BigInteger total = BigInteger.ZERO;
for (RepairSegment segment : repairSegments) {
BigInteger size = segment.getEndToken().subtract(segment.getStartToken());
if (lowerThan(size, BigInteger.ZERO))
size = size.add(RANGE_SIZE);
total = total.add(size);
}
if (!total.equals(RANGE_SIZE)) {
throw new ReaperException("Not entire ring would get repaired");
}
Expand All @@ -105,12 +141,12 @@ protected static BigInteger min(BigInteger a, BigInteger b) {

@VisibleForTesting
protected static boolean lowerThan(BigInteger a, BigInteger b) {
return a.compareTo(b) == -1;
return a.compareTo(b) < 0;
}

@VisibleForTesting
protected static boolean greaterThan(BigInteger a, BigInteger b) {
return a.compareTo(b) == 1;
return a.compareTo(b) > 0;
}


Expand Down
144 changes: 103 additions & 41 deletions src/test/java/com/spotify/reaper/service/SegmentGeneratorTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.spotify.reaper.service;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.core.RepairSegment;
Expand All @@ -8,6 +9,8 @@
import java.math.BigInteger;
import java.util.List;

import javax.annotation.Nullable;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand All @@ -16,57 +19,116 @@ public class SegmentGeneratorTest {

@Test
public void testGenerateSegments() throws Exception {
List<String> tokens = Lists.newArrayList("0", "1",
"56713727820156410577229101238628035242", "56713727820156410577229101238628035242",
"113427455640312821154458202477256070484", "113427455640312821154458202477256070485");
List<BigInteger> tokens = Lists.transform(
Lists.newArrayList(
"0", "1",
"56713727820156410577229101238628035242", "56713727820156410577229101238628035243",
"113427455640312821154458202477256070484", "113427455640312821154458202477256070485"),
new Function<String, BigInteger>() {
@Nullable
@Override
public BigInteger apply(@Nullable String s) {
return new BigInteger(s);
}
}
);

SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner");
List<RepairSegment> segments = generator.generateSegments(3, tokens);
assertEquals(11, segments.size());
assertEquals("(0,1)", segments.get(0).toString());
assertEquals("(1,18904575940052136859076367079542678415)", segments.get(1).toString());
assertEquals("(18904575940052136859076367079542678415,37809151880104273718152734159085356829)",
segments.get(2).toString());
assertEquals("(37809151880104273718152734159085356829,56713727820156410577229101238628035242)",
segments.get(3).toString());
assertEquals("(56713727820156410577229101238628035242,75618303760208547436305468318170713657)",
segments.get(4).toString());
assertEquals("(75618303760208547436305468318170713657,94522879700260684295381835397713392072)",
List<RepairSegment> segments = generator.generateSegments(10, tokens);
assertEquals(15, segments.size());
assertEquals("[0,1)",
segments.get(0).toString());
assertEquals("[56713727820156410577229101238628035242,56713727820156410577229101238628035243)",
segments.get(5).toString());
assertEquals("(94522879700260684295381835397713392072,113427455640312821154458202477256070484)",
segments.get(6).toString());
assertEquals("(113427455640312821154458202477256070484,113427455640312821154458202477256070485)",
segments.get(7).toString());
assertEquals("(113427455640312821154458202477256070485,132332031580364958013534569556798748900)",
segments.get(8).toString());
assertEquals("(132332031580364958013534569556798748900,151236607520417094872610936636341427315)",
segments.get(9).toString());
assertEquals("(151236607520417094872610936636341427315,0)", segments.get(10).toString());
assertEquals("[113427455640312821154458202477256070484,113427455640312821154458202477256070485)",
segments.get(10).toString());


tokens = Lists.transform(
Lists.newArrayList(
"5", "6",
"56713727820156410577229101238628035242", "56713727820156410577229101238628035243",
"113427455640312821154458202477256070484", "113427455640312821154458202477256070485"),
new Function<String, BigInteger>() {
@Nullable
@Override
public BigInteger apply(@Nullable String s) {
return new BigInteger(s);
}
}
);

segments = generator.generateSegments(10, tokens);
assertEquals(15, segments.size());
assertEquals("[5,6)",
segments.get(0).toString());
assertEquals("[56713727820156410577229101238628035242,56713727820156410577229101238628035243)",
segments.get(5).toString());
assertEquals("[113427455640312821154458202477256070484,113427455640312821154458202477256070485)",
segments.get(10).toString());
}

tokens = Lists.newArrayList("5", "6",
@Test(expected=ReaperException.class)
public void testZeroSizeRange() throws Exception {
List<String> tokenStrings = Lists.newArrayList(
"0", "1",
"56713727820156410577229101238628035242", "56713727820156410577229101238628035242",
"113427455640312821154458202477256070484", "113427455640312821154458202477256070485");
generator = new SegmentGenerator("foo.bar.RandomPartitioner");
segments = generator.generateSegments(3, tokens);
assertEquals(11, segments.size());
assertEquals("(5,6)", segments.get(0).toString());
assertEquals("(6,18904575940052136859076367079542678419)", segments.get(1).toString());
assertEquals("(151236607520417094872610936636341427319,5)", segments.get(10).toString());

tokens = Lists.newArrayList("-9223372036854775808", "-9223372036854775807",
"-3074457345618258603", "-3074457345618258602", "3074457345618258602", "3074457345618258603");
generator = new SegmentGenerator("foo.bar.Murmur3Partitioner");
segments = generator.generateSegments(3, tokens);
assertEquals(12, segments.size());
List<BigInteger> tokens = Lists.transform(tokenStrings, new Function<String, BigInteger>() {
@Nullable
@Override
public BigInteger apply(@Nullable String s) {
return new BigInteger(s);
}
});

SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner");
generator.generateSegments(10, tokens);
}

@Test
public void testRotatedRing() throws Exception {
List<String> tokenStrings = Lists.newArrayList(
"56713727820156410577229101238628035243", "113427455640312821154458202477256070484",
"113427455640312821154458202477256070485", "5",
"6", "56713727820156410577229101238628035242");
List<BigInteger> tokens = Lists.transform(tokenStrings, new Function<String, BigInteger>() {
@Nullable
@Override
public BigInteger apply(@Nullable String s) {
return new BigInteger(s);
}
});

SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner");
List<RepairSegment> segments = generator.generateSegments(10, tokens);
assertEquals(15, segments.size());
assertEquals("[113427455640312821154458202477256070484,113427455640312821154458202477256070485)",
segments.get(4).toString());
assertEquals("[5,6)",
segments.get(9).toString());
assertEquals("[56713727820156410577229101238628035242,56713727820156410577229101238628035243)",
segments.get(14).toString());
}

@Test(expected=ReaperException.class)
public void testGenerateSegmentsFail() throws Exception {
List<String> tokens = Lists.newArrayList("-9223372036854775808", "-9223372036854775807",
"-3074457345618258603", "-3074457345618258602",
"3074457345618258602", "3074457345618258603");
public void testDisorderedRing() throws Exception {
List<String> tokenStrings = Lists.newArrayList(
"0", "113427455640312821154458202477256070485", "1",
"56713727820156410577229101238628035242", "56713727820156410577229101238628035243",
"113427455640312821154458202477256070484");
List<BigInteger> tokens = Lists.transform(tokenStrings, new Function<String, BigInteger>() {
@Nullable
@Override
public BigInteger apply(@Nullable String s) {
return new BigInteger(s);
}
});

SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner");
generator.generateSegments(3, tokens);
generator.generateSegments(10, tokens);
// Will throw an exception when concluding that the repair segments don't add up.
// This is because the tokens were supplied out of order.
}

@Test
Expand Down

0 comments on commit 0aa5d70

Please sign in to comment.