simplify creating repair run and segments (#101)
* Cassandra performance: Replace sequence ids with time-based UUIDs (a minimal sketch follows the refs below)

ref:
 - #99
 - #94
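
A minimal sketch of the id change above, assuming the Datastax Java driver is on the classpath: UUIDs.timeBased() generates version-1 (time-based) UUIDs client-side, so ids are roughly time-ordered without consulting a stored sequence counter. The TimeBasedIds helper below is hypothetical and only illustrates the idea; it is not part of the commit.

import java.util.UUID;

import com.datastax.driver.core.utils.UUIDs;

// Hypothetical helper, not part of the commit: shows why time-based UUIDs can
// replace sequence ids. No storage round-trip is needed to allocate an id.
final class TimeBasedIds {

    private TimeBasedIds() {}

    // Version-1 UUID encoding the current timestamp.
    static UUID newId() {
        return UUIDs.timeBased();
    }

    // The embedded timestamp can be recovered, e.g. for ordering or debugging.
    static long creationTimeMillis(UUID id) {
        return UUIDs.unixTimestamp(id);
    }
}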

* Simplify the creation of repair runs and their segments.
 Repair runs and their segments are conceptually one unit of work, and the persistence layer should be designed accordingly.
 Previously they were kept separate because the concern of generating sequence ids was exposed in the code; this is now encapsulated within the storage implementations.
 This work allows CassandraStorage to implement segments as clustering keys within the repair_run table (a usage sketch follows the refs below).

ref:
 - #94
 - #101
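
A rough usage sketch of the simplified flow described above, using the method names from the diff below; error handling and builder setup are elided:

// Sketch only: assemble the run and segment builders, then hand both to the
// storage in a single call. The storage implementation assigns the time-based
// ids and, in CassandraStorage, persists the segments alongside the run.
RepairRun.Builder runBuilder
    = createNewRepairRun(cluster, repairUnit, cause, owner, segments, repairParallelism, intensity);

List<RepairSegment.Builder> segmentBuilders = repairUnit.getIncrementalRepair()
    ? createRepairSegmentsForIncrementalRepair(nodes, repairUnit)
    : createRepairSegments(tokenSegments, repairUnit);

// One unit of work: run and segments are stored together.
RepairRun repairRun = context.storage.addRepairRun(runBuilder, segmentBuilders);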

* SQUASH ME
 Makes the schema changes in a separate migration step, so that data in the repair_unit and repair_schedule tables can be migrated over.

ref: #99 (comment)

* Fix file names and 002 migration file
michaelsembwever authored and adejanovski committed May 29, 2017
1 parent 1e37a09 commit e303141
Showing 9 changed files with 131 additions and 154 deletions.
11 changes: 8 additions & 3 deletions src/main/java/com/spotify/reaper/core/RepairSegment.java
@@ -106,18 +106,18 @@ public enum State {

public static class Builder {

public final UUID runId;

public final RingRange tokenRange;
private final UUID repairUnitId;
private UUID runId;
private int failCount;
private State state;
private String coordinatorHost;
private Integer repairCommandId;
private DateTime startTime;
private DateTime endTime;

public Builder(UUID runId, RingRange tokenRange, UUID repairUnitId) {
this.runId = runId;
public Builder(RingRange tokenRange, UUID repairUnitId) {
this.repairUnitId = repairUnitId;
this.tokenRange = tokenRange;
this.failCount = 0;
@@ -136,6 +136,11 @@ private Builder(RepairSegment original) {
endTime = original.endTime;
}

public Builder withRunId(UUID runId){
this.runId = runId;
return this;
}

public Builder failCount(int failCount) {
this.failCount = failCount;
return this;
154 changes: 67 additions & 87 deletions src/main/java/com/spotify/reaper/resources/CommonTools.java
@@ -41,7 +41,7 @@
import com.spotify.reaper.service.SegmentGenerator;
import java.util.UUID;

public class CommonTools {
public final class CommonTools {

private static final Logger LOG = LoggerFactory.getLogger(CommonTools.class);

@@ -70,25 +70,34 @@ public static RepairRun registerRepairRun(AppContext context, Cluster cluster,

Map<String, RingRange> nodes = getClusterNodes(context, cluster, repairUnit);
// the next step is to prepare a repair run object
RepairRun repairRun = storeNewRepairRun(context, cluster, repairUnit, cause, owner, nodes.keySet().size(),
repairParallelism, intensity);
checkNotNull(repairRun, "failed preparing repair run");
segments = repairUnit.getIncrementalRepair() ? nodes.keySet().size() : tokenSegments.size();

// Notice that our RepairRun core object doesn't contain pointer to
// the set of RepairSegments in the run, as they are accessed separately.
// However, RepairSegment has a pointer to the RepairRun it lives in
RepairRun.Builder runBuilder
= createNewRepairRun(cluster, repairUnit, cause, owner, segments, repairParallelism, intensity);

// the last preparation step is to generate actual repair segments
if(!repairUnit.getIncrementalRepair()) {
return storeNewRepairSegments(context, tokenSegments, repairRun, repairUnit);
} else {
return storeNewRepairSegmentsForIncrementalRepair(context, nodes, repairRun, repairUnit);
List<RepairSegment.Builder> segmentBuilders = repairUnit.getIncrementalRepair()
? createRepairSegmentsForIncrementalRepair(nodes, repairUnit)
: createRepairSegments(tokenSegments, repairUnit);

RepairRun repairRun = context.storage.addRepairRun(runBuilder, segmentBuilders);

if (null == repairRun){
String errMsg = String.format(
"failed storing repair run for cluster \"%s\", keyspace \"%s\", and column families: %s",
cluster.getName(),
repairUnit.getKeyspaceName(),
repairUnit.getColumnFamilies());

LOG.error(errMsg);
throw new ReaperException(errMsg);
}
return repairRun;
}

/**
* Splits a token range for given table into segments
* @param incrementalRepair
* @param incrementalRepair
*
* @return the created segments
* @throws ReaperException when fails to discover seeds for the cluster or fails to connect to
@@ -98,7 +107,7 @@ private static List<RingRange> generateSegments(AppContext context, Cluster targ
int segmentCount, Boolean incrementalRepair)
throws ReaperException {
List<RingRange> segments = null;
Preconditions.checkState(targetCluster.getPartitioner() != null,
Preconditions.checkState(targetCluster.getPartitioner() != null,
"no partitioner for cluster: " + targetCluster.getName());
SegmentGenerator sg = new SegmentGenerator(targetCluster.getPartitioner());
Set<String> seedHosts = targetCluster.getSeedHosts();
@@ -117,7 +126,7 @@ private static List<RingRange> generateSegments(AppContext context, Cluster targ
LOG.warn("couldn't connect to host: {}, will try next one", host, e);
}
}

if (segments == null) {
String errMsg = String.format("failed to generate repair segments for cluster \"%s\"",
targetCluster.getName());
@@ -133,80 +142,49 @@ private static List<RingRange> generateSegments(AppContext context, Cluster targ
* @return the new, just stored RepairRun instance
* @throws ReaperException when fails to store the RepairRun.
*/
private static RepairRun storeNewRepairRun(AppContext context, Cluster cluster,
RepairUnit repairUnit, Optional<String> cause,
String owner, int segments,
RepairParallelism repairParallelism, Double intensity)
throws ReaperException {
RepairRun.Builder runBuilder = new RepairRun.Builder(cluster.getName(), repairUnit.getId(),
DateTime.now(), intensity,
segments, repairParallelism);
runBuilder.cause(cause.isPresent() ? cause.get() : "no cause specified");
runBuilder.owner(owner);
RepairRun newRepairRun = context.storage.addRepairRun(runBuilder);
if (newRepairRun == null) {
String errMsg = String.format("failed storing repair run for cluster \"%s\", "
+ "keyspace \"%s\", and column families: %s",
cluster.getName(), repairUnit.getKeyspaceName(),
repairUnit.getColumnFamilies());
LOG.error(errMsg);
throw new ReaperException(errMsg);
}
return newRepairRun;
private static RepairRun.Builder createNewRepairRun(
Cluster cluster,
RepairUnit repairUnit,
Optional<String> cause,
String owner,
int segments,
RepairParallelism repairParallelism,
Double intensity) throws ReaperException {

return new RepairRun.Builder(cluster.getName(), repairUnit.getId(), DateTime.now(), intensity, segments, repairParallelism)
.cause(cause.isPresent() ? cause.get() : "no cause specified")
.owner(owner);
}

/**
* Creates the repair runs linked to given RepairRun and stores them directly in the storage
* backend.
*/
private static RepairRun storeNewRepairSegments(AppContext context, List<RingRange> tokenSegments,
RepairRun repairRun, RepairUnit repairUnit) {
private static List<RepairSegment.Builder> createRepairSegments(List<RingRange> tokenSegments, RepairUnit repairUnit){

List<RepairSegment.Builder> repairSegmentBuilders = Lists.newArrayList();
for (RingRange range : tokenSegments) {
RepairSegment.Builder repairSegment = new RepairSegment.Builder(repairRun.getId(), range,
repairUnit.getId());
repairSegmentBuilders.add(repairSegment);
}
context.storage.addRepairSegments(repairSegmentBuilders, repairRun.getId());
if (repairRun.getSegmentCount() != tokenSegments.size()) {
LOG.debug("created segment amount differs from expected default {} != {}",
repairRun.getSegmentCount(), tokenSegments.size());
RepairRun newRepairRun = repairRun.with().segmentCount(tokenSegments.size()).build(repairRun.getId());
context.storage.updateRepairRun(newRepairRun);

return newRepairRun;
}

return repairRun;
tokenSegments.forEach(range -> repairSegmentBuilders.add(new RepairSegment.Builder(range, repairUnit.getId())));
return repairSegmentBuilders;
}


/**
* Creates the repair runs linked to given RepairRun and stores them directly in the storage
* backend in case of incrementalRepair
*/
private static RepairRun storeNewRepairSegmentsForIncrementalRepair(AppContext context, Map<String, RingRange> nodes,
RepairRun repairRun, RepairUnit repairUnit) {
private static List<RepairSegment.Builder> createRepairSegmentsForIncrementalRepair(
Map<String, RingRange> nodes,
RepairUnit repairUnit) {

List<RepairSegment.Builder> repairSegmentBuilders = Lists.newArrayList();
for (Entry<String, RingRange> range : nodes.entrySet()) {
RepairSegment.Builder repairSegment = new RepairSegment.Builder(repairRun.getId(), range.getValue(),
repairUnit.getId());
repairSegment.coordinatorHost(range.getKey());
repairSegmentBuilders.add(repairSegment);
}
context.storage.addRepairSegments(repairSegmentBuilders, repairRun.getId());
if (repairRun.getSegmentCount() != nodes.keySet().size()) {
LOG.debug("created segment amount differs from expected default {} != {}",
repairRun.getSegmentCount(), nodes.keySet().size());
RepairRun newRepairRun = repairRun.with().segmentCount(nodes.keySet().size()).build(repairRun.getId());
context.storage.updateRepairRun(newRepairRun);

return newRepairRun;
}

return repairRun;

nodes.entrySet().forEach(range
-> repairSegmentBuilders.add(
new RepairSegment.Builder(range.getValue(), repairUnit.getId()).coordinatorHost(range.getKey())));

return repairSegmentBuilders;
}

private static Map<String, RingRange> getClusterNodes(AppContext context, Cluster targetCluster, RepairUnit repairUnit) throws ReaperException {
Set<String> nodes = Sets.newHashSet();
ConcurrentHashMap<String, RingRange> nodesWithRanges = new ConcurrentHashMap<String, RingRange>();
@@ -217,27 +195,27 @@ private static Map<String, RingRange> getClusterNodes(AppContext context, Clust
LOG.error(errMsg);
throw new ReaperException(errMsg);
}


Map<List<String>, List<String>> rangeToEndpoint = Maps.newHashMap();
for (String host : seedHosts) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(host)) {
rangeToEndpoint = jmxProxy.getRangeToEndpointMap(repairUnit.getKeyspaceName());
rangeToEndpoint = jmxProxy.getRangeToEndpointMap(repairUnit.getKeyspaceName());
break;
} catch (ReaperException e) {
LOG.warn("couldn't connect to host: {}, will try next one", host, e);
}
}

for(Entry<List<String>, List<String>> tokenRangeToEndpoint:rangeToEndpoint.entrySet()) {
String node = tokenRangeToEndpoint.getValue().get(0);
RingRange range = new RingRange(tokenRangeToEndpoint.getKey().get(0), tokenRangeToEndpoint.getKey().get(1));
RingRange added = nodesWithRanges.putIfAbsent(node, range);
RingRange added = nodesWithRanges.putIfAbsent(node, range);
}

return nodesWithRanges;
}


/**
* Instantiates a RepairSchedule and stores it in the storage backend.
@@ -262,7 +240,7 @@ public static RepairSchedule storeNewRepairSchedule(
repairParallelism, intensity,
DateTime.now());
scheduleBuilder.owner(owner);

Collection<RepairSchedule> repairSchedules = context.storage.getRepairSchedulesForClusterAndKeyspace(repairUnit.getClusterName(), repairUnit.getKeyspaceName());
for(RepairSchedule sched:repairSchedules){
Optional<RepairUnit> repairUnitForSched = context.storage.getRepairUnit(sched.getRepairUnitId());
@@ -277,7 +255,7 @@ public static RepairSchedule storeNewRepairSchedule(
}
}
}

RepairSchedule newRepairSchedule = context.storage.addRepairSchedule(scheduleBuilder);
if (newRepairSchedule == null) {
String errMsg = String.format("failed storing repair schedule for cluster \"%s\", "
@@ -289,13 +267,13 @@ public static RepairSchedule storeNewRepairSchedule(
}
return newRepairSchedule;
}

private static final boolean aConflictingScheduleAlreadyExists(RepairUnit newRepairUnit, RepairUnit existingRepairUnit){
return (newRepairUnit.getColumnFamilies().isEmpty() && existingRepairUnit.getColumnFamilies().isEmpty())
|| newRepairUnit.getColumnFamilies().isEmpty() && !existingRepairUnit.getColumnFamilies().isEmpty()
|| !newRepairUnit.getColumnFamilies().isEmpty() && existingRepairUnit.getColumnFamilies().isEmpty()
|| !Sets.intersection(existingRepairUnit.getColumnFamilies(),newRepairUnit.getColumnFamilies()).isEmpty();

}

public static final Splitter COMMA_SEPARATED_LIST_SPLITTER =
@@ -332,7 +310,7 @@ public static RepairUnit getNewOrExistingRepairUnit(AppContext context, Cluster
Optional<RepairUnit> storedRepairUnit =
context.storage.getRepairUnit(cluster.getName(), keyspace, tableNames);
RepairUnit theRepairUnit;

Optional<String> cassandraVersion = Optional.absent();
for (String host : cluster.getSeedHosts()) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(host)) {
@@ -348,7 +326,7 @@ public static RepairUnit getNewOrExistingRepairUnit(AppContext context, Cluster
LOG.error(errMsg);
throw new ReaperException(errMsg);
}

if (storedRepairUnit.isPresent() && storedRepairUnit.get().getIncrementalRepair().equals(incrementalRepair)) {
LOG.info("use existing repair unit for cluster '{}', keyspace '{}', and column families: {}",
cluster.getName(), keyspace, tableNames);
@@ -378,4 +356,6 @@ public static Set<String> parseSeedHosts(String seedHost) {
return Arrays.stream(seedHost.split(",")).map(String::trim).collect(Collectors.toSet());
}

private CommonTools(){}

}
30 changes: 20 additions & 10 deletions src/main/java/com/spotify/reaper/storage/CassandraStorage.java
@@ -1,7 +1,6 @@
package com.spotify.reaper.storage;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -24,7 +23,6 @@
import com.datastax.driver.core.Session;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -53,7 +51,7 @@

import io.dropwizard.setup.Environment;

public class CassandraStorage implements IStorage {
public final class CassandraStorage implements IStorage {
private static final Logger LOG = LoggerFactory.getLogger(CassandraStorage.class);
com.datastax.driver.core.Cluster cassandra = null;
Session session;
@@ -182,7 +180,7 @@ public Optional<Cluster> deleteCluster(String clusterName) {
}

@Override
public RepairRun addRepairRun(Builder repairRun) {
public RepairRun addRepairRun(Builder repairRun, Collection<RepairSegment.Builder> newSegments) {
RepairRun newRepairRun = repairRun.build(UUIDs.timeBased());
BatchStatement batch = new BatchStatement();
batch.add(insertRepairRunPrepStmt.bind(newRepairRun.getId(),
@@ -203,6 +201,7 @@ public RepairRun addRepairRun(Builder repairRun) {
batch.add(insertRepairRunClusterIndexPrepStmt.bind(newRepairRun.getClusterName(), newRepairRun.getId()));
batch.add(insertRepairRunUnitIndexPrepStmt.bind(newRepairRun.getRepairUnitId(), newRepairRun.getId()));
session.execute(batch);
addRepairSegments(newSegments, newRepairRun.getId());
return newRepairRun;
}

@@ -355,13 +354,24 @@ public Optional<RepairUnit> getRepairUnit(String cluster, String keyspace, Set<S
return Optional.fromNullable(repairUnit);
}

@Override
public void addRepairSegments(Collection<RepairSegment.Builder> newSegments, UUID runId) {
private void addRepairSegments(Collection<RepairSegment.Builder> newSegments, UUID runId) {
List<ResultSetFuture> insertFutures = Lists.<ResultSetFuture>newArrayList();
BatchStatement batch = new BatchStatement();
for(com.spotify.reaper.core.RepairSegment.Builder builder:newSegments){
RepairSegment segment = builder.build(UUIDs.timeBased());
insertFutures.add(session.executeAsync(insertRepairSegmentPrepStmt.bind(segment.getId(), segment.getRepairUnitId(), segment.getRunId(), segment.getStartToken(), segment.getEndToken(), segment.getState().ordinal(), segment.getCoordinatorHost(), segment.getStartTime(), segment.getEndTime(), segment.getFailCount())));
for(RepairSegment.Builder builder:newSegments){
RepairSegment segment = builder.withRunId(runId).build(UUIDs.timeBased());
insertFutures.add(session.executeAsync(
insertRepairSegmentPrepStmt.bind(
segment.getId(),
segment.getRepairUnitId(),
segment.getRunId(),
segment.getStartToken(),
segment.getEndToken(),
segment.getState().ordinal(),
segment.getCoordinatorHost(),
segment.getStartTime(),
segment.getEndTime(),
segment.getFailCount())));

batch.add(insertRepairSegmentByRunPrepStmt.bind(segment.getRunId(), segment.getId()));
if(insertFutures.size()%100==0){
// cluster ddos protection
@@ -445,9 +455,9 @@ private RepairSegment createRepairSegmentFromRow(Row segmentRow){
}
private RepairSegment createRepairSegmentFromRow(Row segmentRow, UUID segmentId){
return new RepairSegment.Builder(
segmentRow.getUUID("run_id"),
new RingRange(new BigInteger(segmentRow.getVarint("start_token") +""), new BigInteger(segmentRow.getVarint("end_token")+"")),
segmentRow.getUUID("repair_unit_id"))
.withRunId(segmentRow.getUUID("run_id"))
.coordinatorHost(segmentRow.getString("coordinator_host"))
.endTime(new DateTime(segmentRow.getTimestamp("end_time")))
.failCount(segmentRow.getInt("fail_count"))