Skip to content

Commit

Permalink
Simplify the creation of repair runs and their segments.
Browse files Browse the repository at this point in the history
 Repair runs and their segments are one unit of work in concept and the persistence layer should be designed accordingly.
 Previous they were separated because the concern of sequence generation for IDs were exposed in the code. This is now encapsulated within storage implementations.
 This work allows the CassandraStorage to implement segments as clustering keys within the repair_run table.

ref:
 - #94
  • Loading branch information
michaelsembwever committed May 12, 2017
1 parent 58c70c7 commit 4084359
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 134 deletions.
11 changes: 8 additions & 3 deletions src/main/java/com/spotify/reaper/core/RepairSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,18 @@ public enum State {

public static class Builder {

public final UUID runId;

public final RingRange tokenRange;
private final UUID repairUnitId;
private UUID runId;
private int failCount;
private State state;
private String coordinatorHost;
private Integer repairCommandId;
private DateTime startTime;
private DateTime endTime;

public Builder(UUID runId, RingRange tokenRange, UUID repairUnitId) {
this.runId = runId;
public Builder(RingRange tokenRange, UUID repairUnitId) {
this.repairUnitId = repairUnitId;
this.tokenRange = tokenRange;
this.failCount = 0;
Expand All @@ -136,6 +136,11 @@ private Builder(RepairSegment original) {
endTime = original.endTime;
}

public Builder withRunId(UUID runId){
this.runId = runId;
return this;
}

public Builder failCount(int failCount) {
this.failCount = failCount;
return this;
Expand Down
118 changes: 49 additions & 69 deletions src/main/java/com/spotify/reaper/resources/CommonTools.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import com.spotify.reaper.service.SegmentGenerator;
import java.util.UUID;

public class CommonTools {
public final class CommonTools {

private static final Logger LOG = LoggerFactory.getLogger(CommonTools.class);

Expand Down Expand Up @@ -70,20 +70,29 @@ public static RepairRun registerRepairRun(AppContext context, Cluster cluster,

Map<String, RingRange> nodes = getClusterNodes(context, cluster, repairUnit);
// the next step is to prepare a repair run object
RepairRun repairRun = storeNewRepairRun(context, cluster, repairUnit, cause, owner, nodes.keySet().size(),
repairParallelism, intensity);
checkNotNull(repairRun, "failed preparing repair run");

// Notice that our RepairRun core object doesn't contain pointer to
// the set of RepairSegments in the run, as they are accessed separately.
// However, RepairSegment has a pointer to the RepairRun it lives in
segments = repairUnit.getIncrementalRepair() ? nodes.keySet().size() : tokenSegments.size();

RepairRun.Builder runBuilder
= createNewRepairRun(cluster, repairUnit, cause, owner, segments, repairParallelism, intensity);

// the last preparation step is to generate actual repair segments
if(!repairUnit.getIncrementalRepair()) {
return storeNewRepairSegments(context, tokenSegments, repairRun, repairUnit);
} else {
return storeNewRepairSegmentsForIncrementalRepair(context, nodes, repairRun, repairUnit);
List<RepairSegment.Builder> segmentBuilders = repairUnit.getIncrementalRepair()
? createRepairSegmentsForIncrementalRepair(nodes, repairUnit)
: createRepairSegments(tokenSegments, repairUnit);

RepairRun repairRun = context.storage.addRepairRun(runBuilder, segmentBuilders);

if (null == repairRun){
String errMsg = String.format(
"failed storing repair run for cluster \"%s\", keyspace \"%s\", and column families: %s",
cluster.getName(),
repairUnit.getKeyspaceName(),
repairUnit.getColumnFamilies());

LOG.error(errMsg);
throw new ReaperException(errMsg);
}
return repairRun;
}

/**
Expand Down Expand Up @@ -133,78 +142,47 @@ private static List<RingRange> generateSegments(AppContext context, Cluster targ
* @return the new, just stored RepairRun instance
* @throws ReaperException when fails to store the RepairRun.
*/
private static RepairRun storeNewRepairRun(AppContext context, Cluster cluster,
RepairUnit repairUnit, Optional<String> cause,
String owner, int segments,
RepairParallelism repairParallelism, Double intensity)
throws ReaperException {
RepairRun.Builder runBuilder = new RepairRun.Builder(cluster.getName(), repairUnit.getId(),
DateTime.now(), intensity,
segments, repairParallelism);
runBuilder.cause(cause.isPresent() ? cause.get() : "no cause specified");
runBuilder.owner(owner);
RepairRun newRepairRun = context.storage.addRepairRun(runBuilder);
if (newRepairRun == null) {
String errMsg = String.format("failed storing repair run for cluster \"%s\", "
+ "keyspace \"%s\", and column families: %s",
cluster.getName(), repairUnit.getKeyspaceName(),
repairUnit.getColumnFamilies());
LOG.error(errMsg);
throw new ReaperException(errMsg);
}
return newRepairRun;
private static RepairRun.Builder createNewRepairRun(
Cluster cluster,
RepairUnit repairUnit,
Optional<String> cause,
String owner,
int segments,
RepairParallelism repairParallelism,
Double intensity) throws ReaperException {

return new RepairRun.Builder(cluster.getName(), repairUnit.getId(), DateTime.now(), intensity, segments, repairParallelism)
.cause(cause.isPresent() ? cause.get() : "no cause specified")
.owner(owner);
}

/**
* Creates the repair runs linked to given RepairRun and stores them directly in the storage
* backend.
*/
private static RepairRun storeNewRepairSegments(AppContext context, List<RingRange> tokenSegments,
RepairRun repairRun, RepairUnit repairUnit) {
private static List<RepairSegment.Builder> createRepairSegments(List<RingRange> tokenSegments, RepairUnit repairUnit){

List<RepairSegment.Builder> repairSegmentBuilders = Lists.newArrayList();
for (RingRange range : tokenSegments) {
RepairSegment.Builder repairSegment = new RepairSegment.Builder(repairRun.getId(), range,
repairUnit.getId());
repairSegmentBuilders.add(repairSegment);
}
context.storage.addRepairSegments(repairSegmentBuilders, repairRun.getId());
if (repairRun.getSegmentCount() != tokenSegments.size()) {
LOG.debug("created segment amount differs from expected default {} != {}",
repairRun.getSegmentCount(), tokenSegments.size());
RepairRun newRepairRun = repairRun.with().segmentCount(tokenSegments.size()).build(repairRun.getId());
context.storage.updateRepairRun(newRepairRun);

return newRepairRun;
}

return repairRun;
tokenSegments.forEach((range) -> repairSegmentBuilders.add(new RepairSegment.Builder(range, repairUnit.getId())));
return repairSegmentBuilders;
}


/**
* Creates the repair runs linked to given RepairRun and stores them directly in the storage
* backend in case of incrementalRepair
*/
private static RepairRun storeNewRepairSegmentsForIncrementalRepair(AppContext context, Map<String, RingRange> nodes,
RepairRun repairRun, RepairUnit repairUnit) {
private static List<RepairSegment.Builder> createRepairSegmentsForIncrementalRepair(
Map<String, RingRange> nodes,
RepairUnit repairUnit) {

List<RepairSegment.Builder> repairSegmentBuilders = Lists.newArrayList();
for (Entry<String, RingRange> range : nodes.entrySet()) {
RepairSegment.Builder repairSegment = new RepairSegment.Builder(repairRun.getId(), range.getValue(),
repairUnit.getId());
repairSegment.coordinatorHost(range.getKey());
repairSegmentBuilders.add(repairSegment);
}
context.storage.addRepairSegments(repairSegmentBuilders, repairRun.getId());
if (repairRun.getSegmentCount() != nodes.keySet().size()) {
LOG.debug("created segment amount differs from expected default {} != {}",
repairRun.getSegmentCount(), nodes.keySet().size());
RepairRun newRepairRun = repairRun.with().segmentCount(nodes.keySet().size()).build(repairRun.getId());
context.storage.updateRepairRun(newRepairRun);

return newRepairRun;
}

return repairRun;

nodes.entrySet().forEach((range)
-> repairSegmentBuilders.add(
new RepairSegment.Builder(range.getValue(), repairUnit.getId()).coordinatorHost(range.getKey())));

return repairSegmentBuilders;
}

private static Map<String, RingRange> getClusterNodes(AppContext context, Cluster targetCluster, RepairUnit repairUnit) throws ReaperException {
Expand Down Expand Up @@ -378,4 +356,6 @@ public static Set<String> parseSeedHosts(String seedHost) {
return Arrays.stream(seedHost.split(",")).map(String::trim).collect(Collectors.toSet());
}

private CommonTools(){}

}
28 changes: 20 additions & 8 deletions src/main/java/com/spotify/reaper/storage/CassandraStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

import io.dropwizard.setup.Environment;

public class CassandraStorage implements IStorage {
public final class CassandraStorage implements IStorage {
private static final Logger LOG = LoggerFactory.getLogger(CassandraStorage.class);
com.datastax.driver.core.Cluster cassandra = null;
Session session;
Expand Down Expand Up @@ -182,7 +182,7 @@ public Optional<Cluster> deleteCluster(String clusterName) {
}

@Override
public RepairRun addRepairRun(Builder repairRun) {
public RepairRun addRepairRun(Builder repairRun, Collection<RepairSegment.Builder> newSegments) {
RepairRun newRepairRun = repairRun.build(UUIDs.timeBased());
BatchStatement batch = new BatchStatement();
batch.add(insertRepairRunPrepStmt.bind(newRepairRun.getId(),
Expand All @@ -203,6 +203,7 @@ public RepairRun addRepairRun(Builder repairRun) {
batch.add(insertRepairRunClusterIndexPrepStmt.bind(newRepairRun.getClusterName(), newRepairRun.getId()));
batch.add(insertRepairRunUnitIndexPrepStmt.bind(newRepairRun.getRepairUnitId(), newRepairRun.getId()));
session.execute(batch);
addRepairSegments(newSegments, newRepairRun.getId());
return newRepairRun;
}

Expand Down Expand Up @@ -355,13 +356,24 @@ public Optional<RepairUnit> getRepairUnit(String cluster, String keyspace, Set<S
return Optional.fromNullable(repairUnit);
}

@Override
public void addRepairSegments(Collection<RepairSegment.Builder> newSegments, UUID runId) {
private void addRepairSegments(Collection<RepairSegment.Builder> newSegments, UUID runId) {
List<ResultSetFuture> insertFutures = Lists.<ResultSetFuture>newArrayList();
BatchStatement batch = new BatchStatement();
for(com.spotify.reaper.core.RepairSegment.Builder builder:newSegments){
RepairSegment segment = builder.build(UUIDs.timeBased());
insertFutures.add(session.executeAsync(insertRepairSegmentPrepStmt.bind(segment.getId(), segment.getRepairUnitId(), segment.getRunId(), segment.getStartToken(), segment.getEndToken(), segment.getState().ordinal(), segment.getCoordinatorHost(), segment.getStartTime(), segment.getEndTime(), segment.getFailCount())));
for(RepairSegment.Builder builder:newSegments){
RepairSegment segment = builder.withRunId(runId).build(UUIDs.timeBased());
insertFutures.add(session.executeAsync(
insertRepairSegmentPrepStmt.bind(
segment.getId(),
segment.getRepairUnitId(),
segment.getRunId(),
segment.getStartToken(),
segment.getEndToken(),
segment.getState().ordinal(),
segment.getCoordinatorHost(),
segment.getStartTime(),
segment.getEndTime(),
segment.getFailCount())));

batch.add(insertRepairSegmentByRunPrepStmt.bind(segment.getRunId(), segment.getId()));
if(insertFutures.size()%100==0){
// cluster ddos protection
Expand Down Expand Up @@ -445,9 +457,9 @@ private RepairSegment createRepairSegmentFromRow(Row segmentRow){
}
private RepairSegment createRepairSegmentFromRow(Row segmentRow, UUID segmentId){
return new RepairSegment.Builder(
segmentRow.getUUID("run_id"),
new RingRange(new BigInteger(segmentRow.getVarint("start_token") +""), new BigInteger(segmentRow.getVarint("end_token")+"")),
segmentRow.getUUID("repair_unit_id"))
.withRunId(segmentRow.getUUID("run_id"))
.coordinatorHost(segmentRow.getString("coordinator_host"))
.endTime(new DateTime(segmentRow.getTimestamp("end_time")))
.failCount(segmentRow.getInt("fail_count"))
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/spotify/reaper/storage/IStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public interface IStorage {
*/
Optional<Cluster> deleteCluster(String clusterName);

RepairRun addRepairRun(RepairRun.Builder repairRun);
RepairRun addRepairRun(RepairRun.Builder repairRun, Collection<RepairSegment.Builder> newSegments);

boolean updateRepairRun(RepairRun repairRun);

Expand Down Expand Up @@ -91,7 +91,6 @@ public interface IStorage {
Optional<RepairUnit> getRepairUnit(String cluster, String keyspace,
Set<String> columnFamilyNames);

void addRepairSegments(Collection<RepairSegment.Builder> newSegments, UUID runId);

boolean updateRepairSegment(RepairSegment newRepairSegment);

Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/spotify/reaper/storage/MemoryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
/**
* Implements the StorageAPI using transient Java classes.
*/
public class MemoryStorage implements IStorage {
public final class MemoryStorage implements IStorage {

private final ConcurrentMap<String, Cluster> clusters = Maps.newConcurrentMap();
private final ConcurrentMap<UUID, RepairRun> repairRuns = Maps.newConcurrentMap();
Expand Down Expand Up @@ -96,9 +96,10 @@ && getRepairRunsForCluster(clusterName).isEmpty()) {
}

@Override
public RepairRun addRepairRun(RepairRun.Builder repairRun) {
public RepairRun addRepairRun(RepairRun.Builder repairRun, Collection<RepairSegment.Builder> newSegments) {
RepairRun newRepairRun = repairRun.build(UUIDs.timeBased());
repairRuns.put(newRepairRun.getId(), newRepairRun);
addRepairSegments(newSegments, newRepairRun.getId());
return newRepairRun;
}

Expand Down Expand Up @@ -229,11 +230,10 @@ public Optional<RepairUnit> getRepairUnit(String cluster, String keyspace, Set<S
repairUnitsByKey.get(new RepairUnitKey(cluster, keyspace, tables)));
}

@Override
public void addRepairSegments(Collection<RepairSegment.Builder> segments, UUID runId) {
private void addRepairSegments(Collection<RepairSegment.Builder> segments, UUID runId) {
LinkedHashMap<UUID, RepairSegment> newSegments = Maps.newLinkedHashMap();
for (RepairSegment.Builder segment : segments) {
RepairSegment newRepairSegment = segment.build(UUIDs.timeBased());
RepairSegment newRepairSegment = segment.withRunId(runId).build(UUIDs.timeBased());
repairSegments.put(newRepairSegment.getId(), newRepairSegment);
newSegments.put(newRepairSegment.getId(), newRepairSegment);
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/spotify/reaper/storage/PostgresStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
/**
* Implements the StorageAPI using PostgreSQL database.
*/
public class PostgresStorage implements IStorage {
public final class PostgresStorage implements IStorage {

private static final Logger LOG = LoggerFactory.getLogger(PostgresStorage.class);

Expand Down Expand Up @@ -232,12 +232,13 @@ private void tryDeletingRepairUnit(UUID id) {
}

@Override
public RepairRun addRepairRun(RepairRun.Builder newRepairRun) {
public RepairRun addRepairRun(RepairRun.Builder newRepairRun, Collection<RepairSegment.Builder> newSegments) {
RepairRun result;
try (Handle h = jdbi.open()) {
long insertedId = getPostgresStorage(h).insertRepairRun(newRepairRun.build(null));
result = newRepairRun.build(fromSequenceId(insertedId));
}
addRepairSegments(newSegments, result.getId());
return result;
}

Expand Down Expand Up @@ -284,11 +285,10 @@ public Optional<RepairUnit> getRepairUnit(String clusterName, String keyspaceNam
return Optional.fromNullable(result);
}

@Override
public void addRepairSegments(Collection<RepairSegment.Builder> newSegments, UUID runId) {
private void addRepairSegments(Collection<RepairSegment.Builder> newSegments, UUID runId) {
List<RepairSegment> insertableSegments = new ArrayList<>();
for (RepairSegment.Builder segment : newSegments) {
insertableSegments.add(segment.build(null));
insertableSegments.add(segment.withRunId(runId).build(null));
}
try (Handle h = jdbi.open()) {
getPostgresStorage(h).insertRepairSegments(insertableSegments.iterator());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ public class RepairSegmentMapper implements ResultSetMapper<RepairSegment> {
public RepairSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException {
RingRange range = new RingRange(r.getBigDecimal("start_token").toBigInteger(),
r.getBigDecimal("end_token").toBigInteger());
RepairSegment.Builder repairSegmentBuilder =
new RepairSegment.Builder(fromSequenceId(r.getLong("run_id")), range, fromSequenceId(r.getLong("repair_unit_id")));
return repairSegmentBuilder
.state(RepairSegment.State.values()[r.getInt("state")])
.coordinatorHost(r.getString("coordinator_host"))
.startTime(RepairRunMapper.getDateTimeOrNull(r, "start_time"))
.endTime(RepairRunMapper.getDateTimeOrNull(r, "end_time"))
.failCount(r.getInt("fail_count"))
.build(fromSequenceId(r.getLong("id")));
return
new RepairSegment.Builder(range, fromSequenceId(r.getLong("repair_unit_id")))
.withRunId(fromSequenceId(r.getLong("run_id")))
.state(RepairSegment.State.values()[r.getInt("state")])
.coordinatorHost(r.getString("coordinator_host"))
.startTime(RepairRunMapper.getDateTimeOrNull(r, "start_time"))
.endTime(RepairRunMapper.getDateTimeOrNull(r, "end_time"))
.failCount(r.getInt("fail_count"))
.build(fromSequenceId(r.getLong("id")));
}

private static UUID fromSequenceId(long insertedId) {
Expand Down
Loading

0 comments on commit 4084359

Please sign in to comment.