Skip to content

Commit

Permalink
A new type RingRange represents token ranges
Browse files Browse the repository at this point in the history
  • Loading branch information
Bj0rnen committed Dec 12, 2014
1 parent 1f5b7c2 commit 143e457
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 89 deletions.
50 changes: 17 additions & 33 deletions src/main/java/com/spotify/reaper/core/RepairSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.spotify.reaper.core;

import com.spotify.reaper.service.RingRange;

import org.joda.time.DateTime;

import java.math.BigInteger;
Expand All @@ -23,8 +25,7 @@ public class RepairSegment {
private final Integer repairCommandId; // received when triggering repair in Cassandra
private final long columnFamilyId;
private final long runId;
private final BigInteger startToken; // open
private final BigInteger endToken; // closed
private final RingRange tokenRange;
private final State state;
private final DateTime startTime;
private final DateTime endTime;
Expand All @@ -45,12 +46,8 @@ public long getRunId() {
return runId;
}

public BigInteger getStartToken() {
return startToken;
}

public BigInteger getEndToken() {
return endToken;
public RingRange getTokenRange() {
return tokenRange;
}

public State getState() {
Expand All @@ -68,12 +65,11 @@ public DateTime getEndTime() {
public static RepairSegment getCopy(RepairSegment origSegment, State newState,
int newRepairCommandId,
DateTime newStartTime, DateTime newEndTime) {
return new Builder(origSegment.getStartToken(),
origSegment.getEndToken(), newState)
return new Builder(origSegment.getRunId(), origSegment.getTokenRange(), newState)
.columnFamilyId(origSegment.getColumnFamilyId())
.repairCommandId(newRepairCommandId)
.startTime(newStartTime)
.endTime(newEndTime).build(origSegment.getRunId(), origSegment.getId());
.endTime(newEndTime).build(origSegment.getId());
}

public enum State {
Expand All @@ -83,31 +79,30 @@ public enum State {
DONE
}

private RepairSegment(Builder builder, long runId, long id) {
private RepairSegment(Builder builder,long id) {
this.id = id;
this.repairCommandId = builder.repairCommandId;
this.columnFamilyId = builder.columnFamilyId;
this.runId = runId;
this.startToken = builder.startToken;
this.endToken = builder.endToken;
this.runId = builder.runId;
this.tokenRange = builder.tokenRange;
this.state = builder.state;
this.startTime = builder.startTime;
this.endTime = builder.endTime;
}

public static class Builder {

public final BigInteger startToken;
public final BigInteger endToken;
public final long runId;
public final RingRange tokenRange;
public final State state;
private long columnFamilyId;
private int repairCommandId;
private DateTime startTime;
private DateTime endTime;

public Builder(BigInteger startToken, BigInteger endToken, State state) {
this.startToken = startToken;
this.endToken = endToken;
public Builder(long runId, RingRange tokenRange, State state) {
this.runId = runId;
this.tokenRange = tokenRange;
this.state = state;
}

Expand All @@ -131,19 +126,8 @@ public Builder endTime(DateTime endTime) {
return this;
}

public RepairSegment build(long runId, long id) {
return new RepairSegment(this, runId, id);
public RepairSegment build(long id) {
return new RepairSegment(this, id);
}

@Override
public String toString() {
return String.format("(%s,%s]", startToken.toString(), endToken.toString());
}
}

public String toString() {
return String.format("(%s,%s]",
startToken.toString(),
endToken.toString());
}
}
17 changes: 12 additions & 5 deletions src/main/java/com/spotify/reaper/resources/TableResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.spotify.reaper.resources;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import com.spotify.reaper.ReaperApplicationConfiguration;
import com.spotify.reaper.ReaperException;
Expand All @@ -23,6 +24,7 @@
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;
import com.spotify.reaper.service.RepairRunner;
import com.spotify.reaper.service.RingRange;
import com.spotify.reaper.service.SegmentGenerator;
import com.spotify.reaper.storage.IStorage;

Expand Down Expand Up @@ -161,7 +163,7 @@ public Response addTable(@Context UriInfo uriInfo,
}

// create segments
List<RepairSegment.Builder> segments = null;
List<RingRange> segments = null;
String usedSeedHost = null;
try {
SegmentGenerator sg = new SegmentGenerator(targetCluster.getPartitioner());
Expand All @@ -170,9 +172,7 @@ public Response addTable(@Context UriInfo uriInfo,
try {
JmxProxy jmxProxy = JmxProxy.connect(host);
List<BigInteger> tokens = jmxProxy.getTokens();
segments = sg.generateSegments(existingTable.getSegmentCount(),
tokens,
existingTable);
segments = sg.generateSegments(existingTable.getSegmentCount(), tokens);
jmxProxy.close();
usedSeedHost = host;
break;
Expand Down Expand Up @@ -208,7 +208,14 @@ public Response addTable(@Context UriInfo uriInfo,
// Notice that our RepairRun core object doesn't contain pointer to
// the set of RepairSegments in the run, as they are accessed separately.
// RepairSegment has a pointer to the RepairRun it lives in.
storage.addRepairSegments(newRepairRun.getId(), segments);
List<RepairSegment.Builder> repairSegments = Lists.newArrayList();
for (RingRange range : segments) {
repairSegments
.add(new RepairSegment.Builder(newRepairRun.getId(), range,
RepairSegment.State.NOT_STARTED)
.columnFamilyId(existingTable.getId()));
}
storage.addRepairSegments(repairSegments);

RepairRunner.startNewRepairRun(storage, newRepairRun, usedSeedHost);

Expand Down
13 changes: 7 additions & 6 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public static void startNewRepairRun(IStorage storage, RepairRun repairRun,
*/
@Override
public void run() {
LOG.debug("RepairRunner run on RepairRun \"{}\" with start token \"{}\"",
repairRun.getId(), currentSegment == null ? "n/a" : currentSegment.getStartToken());
LOG.debug("RepairRunner run on RepairRun \"{}\" with token range \"{}\"",
repairRun.getId(), currentSegment == null ? "n/a" : currentSegment.getTokenRange());

if (!checkJmxProxyInitialized()) {
LOG.error("failed to initialize JMX proxy, retrying after {} seconds",
Expand Down Expand Up @@ -181,8 +181,8 @@ private void checkIfNeedToStartNextSegment() {
return;
} else if (currentSegment.getState() == RepairSegment.State.NOT_STARTED &&
DateTime.now().isAfter(startNextSegmentEarliest)) {
LOG.info("triggering repair on segment {} with start token {} on run id {}",
currentSegment.getId(), currentSegment.getStartToken(), repairRun.getId());
LOG.info("triggering repair on segment #{} with token range {} on run id {}",
currentSegment.getId(), currentSegment.getTokenRange(), repairRun.getId());
newRepairCommandId = triggerRepair(currentSegment);
} else if (currentSegment.getState() == RepairSegment.State.DONE) {
LOG.warn("segment {} repair completed for run {}",
Expand Down Expand Up @@ -223,8 +223,9 @@ private void checkIfNeedToStartNextSegment() {

private int triggerRepair(RepairSegment segment) {
ColumnFamily columnFamily = this.storage.getColumnFamily(segment.getColumnFamilyId());
return this.jmxProxy.triggerRepair(segment.getStartToken(), segment.getEndToken(),
columnFamily.getKeyspaceName(), columnFamily.getName());
return this.jmxProxy
.triggerRepair(segment.getTokenRange().getStart(), segment.getTokenRange().getEnd(),
columnFamily.getKeyspaceName(), columnFamily.getName());
}

private void changeCurrentRepairRunState(RepairRun.RunState newRunState) {
Expand Down
61 changes: 61 additions & 0 deletions src/main/java/com/spotify/reaper/service/RingRange.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.spotify.reaper.service;

import java.math.BigInteger;

public class RingRange {
private final BigInteger start;
private final BigInteger end;

public RingRange(BigInteger start, BigInteger end) {
this.start = start;
this.end = end;
}

public BigInteger getStart() {
return start;
}

public BigInteger getEnd() {
return end;
}

public BigInteger span(BigInteger ringSize) {
if (SegmentGenerator.greaterThanOrEqual(start, end)) {
return end.subtract(start).add(ringSize);
} else {
return end.subtract(start);
}
}

public boolean encloses(RingRange other) {
// TODO: unit test for this
if (SegmentGenerator.lowerThanOrEqual(start, end)) {
return SegmentGenerator.greaterThanOrEqual(other.start, start) &&
SegmentGenerator.lowerThanOrEqual(other.end, end);
} else if (SegmentGenerator.lowerThanOrEqual(other.start, other.end)) {
return SegmentGenerator.greaterThanOrEqual(other.start, start) ||
SegmentGenerator.lowerThanOrEqual(other.end, end);
} else {
return SegmentGenerator.greaterThanOrEqual(other.start, start) &&
SegmentGenerator.lowerThanOrEqual(other.end, end);
}
}

@Override
public String toString() {
return String.format("(%s,%s]", start.toString(), end.toString());
}
}
17 changes: 6 additions & 11 deletions src/main/java/com/spotify/reaper/service/SegmentGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public class SegmentGenerator {
private static final Logger LOG = LoggerFactory.getLogger(SegmentGenerator.class);

private final String partitioner;
private final BigInteger MIN_SEGMENT_SIZE = new BigInteger("100");
private BigInteger RANGE_MIN;
private BigInteger RANGE_MAX;
private BigInteger RANGE_SIZE;
Expand Down Expand Up @@ -61,13 +60,12 @@ public SegmentGenerator(String partitioner) throws ReaperException {
* @param ringTokens list of all start tokens in a cluster. They have to be in ring order.
* @return a list containing at least {@code totalSegmentCount} repair segments.
*/
public List<RepairSegment.Builder> generateSegments(int totalSegmentCount,
List<BigInteger> ringTokens,
ColumnFamily table)
public List<RingRange> generateSegments(int totalSegmentCount,
List<BigInteger> ringTokens)
throws ReaperException {
int tokenRangeCount = ringTokens.size();

List<RepairSegment.Builder> repairSegments = Lists.newArrayList();
List<RingRange> repairSegments = Lists.newArrayList();
for (int i = 0; i < tokenRangeCount; i++) {
BigInteger start = ringTokens.get(i);
BigInteger stop = ringTokens.get((i + 1) % tokenRangeCount);
Expand Down Expand Up @@ -111,19 +109,16 @@ public List<RepairSegment.Builder> generateSegments(int totalSegmentCount,

// Append the segments between the endpoints
for (int j = 0; j < segmentCount; j++) {
repairSegments.add(new RepairSegment.Builder(endpointTokens.get(j),
endpointTokens.get(j + 1),
RepairSegment.State.NOT_STARTED)
.columnFamilyId(table.getId()));
repairSegments.add(new RingRange(endpointTokens.get(j), endpointTokens.get(j + 1)));
LOG.debug("Segment #{}: [{},{})", j + 1, endpointTokens.get(j),
endpointTokens.get(j + 1));
}
}

// verify that the whole range is repaired
BigInteger total = BigInteger.ZERO;
for (RepairSegment.Builder segment : repairSegments) {
BigInteger size = segment.endToken.subtract(segment.startToken);
for (RingRange segment : repairSegments) {
BigInteger size = segment.span(RANGE_SIZE);
if (lowerThan(size, BigInteger.ZERO))
size = size.add(RANGE_SIZE);
total = total.add(size);
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/com/spotify/reaper/storage/IStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.spotify.reaper.core.ColumnFamily;
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;
import com.spotify.reaper.service.RingRange;

import java.math.BigInteger;
import java.util.Collection;
Expand Down Expand Up @@ -48,15 +49,15 @@ public interface IStorage {

ColumnFamily getColumnFamily(String cluster, String keyspace, String table);

int addRepairSegments(long runId, Collection<RepairSegment.Builder> newSegments);
int addRepairSegments(Collection<RepairSegment.Builder> newSegments);

boolean updateRepairSegment(RepairSegment newRepairSegment);

RepairSegment getRepairSegment(long id);

RepairSegment getNextFreeSegment(long runId);

RepairSegment getNextFreeSegmentInRange(long runId, BigInteger start, BigInteger end);
RepairSegment getNextFreeSegmentInRange(long runId, RingRange range);


}
25 changes: 5 additions & 20 deletions src/main/java/com/spotify/reaper/storage/MemoryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.spotify.reaper.core.ColumnFamily;
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;
import com.spotify.reaper.service.RingRange;
import com.spotify.reaper.service.SegmentGenerator;

import java.math.BigInteger;
Expand Down Expand Up @@ -157,10 +158,10 @@ public ColumnFamily getColumnFamily(String cluster, String keyspace, String tabl
}

@Override
public int addRepairSegments(long runId, Collection<RepairSegment.Builder> segments) {
public int addRepairSegments(Collection<RepairSegment.Builder> segments) {
LinkedHashMap<Long, RepairSegment> newSegments = Maps.newLinkedHashMap();
for (RepairSegment.Builder segment : segments) {
RepairSegment newRepairSegment = segment.build(runId, SEGMENT_ID.incrementAndGet());
RepairSegment newRepairSegment = segment.build(SEGMENT_ID.incrementAndGet());
repairSegments.put(newRepairSegment.getId(), newRepairSegment);
newSegments.put(newRepairSegment.getId(), newRepairSegment);
}
Expand Down Expand Up @@ -196,27 +197,11 @@ public RepairSegment getNextFreeSegment(long runId) {
return null;
}


public static boolean encloses(BigInteger rangeStart, BigInteger rangeEnd,
BigInteger segmentStart, BigInteger segmentEnd) {
// TODO: unit test for this
if (SegmentGenerator.lowerThanOrEqual(rangeStart, rangeEnd)) {
return SegmentGenerator.greaterThanOrEqual(segmentStart, rangeStart) &&
SegmentGenerator.lowerThanOrEqual(segmentEnd, rangeEnd);
} else if (SegmentGenerator.lowerThanOrEqual(segmentStart, segmentEnd)) {
return SegmentGenerator.greaterThanOrEqual(segmentStart, rangeStart) ||
SegmentGenerator.lowerThanOrEqual(segmentEnd, rangeEnd);
} else {
return SegmentGenerator.greaterThanOrEqual(segmentStart, rangeStart) &&
SegmentGenerator.lowerThanOrEqual(segmentEnd, rangeEnd);
}
}

@Override
public RepairSegment getNextFreeSegmentInRange(long runId, BigInteger start, BigInteger end) {
public RepairSegment getNextFreeSegmentInRange(long runId, RingRange range) {
for (RepairSegment segment : repairSegmentsByRunId.get(runId).values()) {
if (segment.getState() == RepairSegment.State.NOT_STARTED &&
encloses(start, end, segment.getStartToken(), segment.getEndToken())) {
range.encloses(segment.getTokenRange())) {
return segment;
}
}
Expand Down
Loading

0 comments on commit 143e457

Please sign in to comment.