Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix number of parallel repair computation #108

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<dropwizard.version>1.1.0</dropwizard.version>
<dropwizard.version>1.0.7</dropwizard.version>
<dropwizard.cassandra.version>4.1.0</dropwizard.cassandra.version>
<cassandra.version>2.2.7</cassandra.version>
<cucumber.version>1.2.5</cucumber.version>
Expand Down
1 change: 1 addition & 0 deletions resource/cassandra-reaper-cassandra-ssl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7198
Expand Down
1 change: 1 addition & 0 deletions resource/cassandra-reaper-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
Expand Down
1 change: 1 addition & 0 deletions resource/cassandra-reaper-h2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
Expand Down
1 change: 1 addition & 0 deletions resource/cassandra-reaper-memory.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
Expand Down
1 change: 1 addition & 0 deletions resource/cassandra-reaper-postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
Expand Down
1 change: 1 addition & 0 deletions resource/cassandra-reaper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/spotify/reaper/ReaperApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void run(ReaperApplicationConfiguration config,
context.repairManager.initializeThreadPool(
config.getRepairRunThreadCount(),
config.getHangingRepairTimeoutMins(), TimeUnit.MINUTES,
30, TimeUnit.SECONDS);
config.getRepairManagerSchedulingIntervalSeconds(), TimeUnit.SECONDS);

if (context.storage == null) {
LOG.info("initializing storage of type: {}", config.getStorageType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public class ReaperApplicationConfiguration extends Configuration {
@JsonProperty
@DefaultValue("true")
private Boolean enableDynamicSeedList;

@JsonProperty
private Integer repairManagerSchedulingIntervalSeconds;

public int getSegmentCount() {
return segmentCount;
Expand Down Expand Up @@ -170,13 +173,13 @@ public void setDataSourceFactory(DataSourceFactory database) {
this.database = database;
}

public int getHangingRepairTimeoutMins() {
return hangingRepairTimeoutMins;
public int getRepairManagerSchedulingIntervalSeconds() {
return this.repairManagerSchedulingIntervalSeconds==null?30:this.repairManagerSchedulingIntervalSeconds;
}

@JsonProperty
public void setHangingRepairTimeoutMins(int hangingRepairTimeoutMins) {
this.hangingRepairTimeoutMins = hangingRepairTimeoutMins;
public void setRepairManagerSchedulingIntervalSeconds(int repairManagerSchedulingIntervalSeconds) {
this.repairManagerSchedulingIntervalSeconds = repairManagerSchedulingIntervalSeconds;
}

public Map<String, Integer> getJmxPorts() {
Expand Down Expand Up @@ -261,6 +264,15 @@ public void setAllowUnreachableNodes(Boolean allow) {
this.allowUnreachableNodes = allow;
}

/**
 * Returns the hanging-repair timeout held by this configuration.
 *
 * @return the current value of {@code hangingRepairTimeoutMins} (minutes, per the field name)
 */
public int getHangingRepairTimeoutMins() {
return hangingRepairTimeoutMins;
}

/**
 * Stores the hanging-repair timeout on this configuration. The setter is
 * annotated with {@code @JsonProperty}, which marks it as a Jackson property.
 *
 * @param hangingRepairTimeoutMins the timeout value to store (minutes, per the name)
 */
@JsonProperty
public void setHangingRepairTimeoutMins(int hangingRepairTimeoutMins) {
this.hangingRepairTimeoutMins = hangingRepairTimeoutMins;
}

public static class AutoSchedulingConfiguration {

@JsonProperty
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,18 @@ public List<String> tokenRangeToEndpoint(String keyspace, RingRange tokenRange)
return Lists.newArrayList();
}

/**
* @return all hosts in the ring with their host id
*/
@NotNull
public Map<String, String> getEndpointToHostId() {
checkNotNull(ssProxy, "Looks like the proxy is not connected");
Map<String, String> hosts =
((StorageServiceMBean) ssProxy).getEndpointToHostId();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MINOR Immediately return this expression instead of assigning it to the temporary variable "hosts". rule


return hosts;
}

/**
* @return full class name of Cassandra's partitioner.
*/
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public RepairRunner(AppContext context, long repairRunId)
JmxProxy jmx = this.context.jmxConnectionFactory.connectAny(cluster.get());

String keyspace = repairUnitOpt.get().getKeyspaceName();
int parallelRepairs = getPossibleParallelRepairsCount(jmx.getRangeToEndpointMap(keyspace));
int parallelRepairs = getPossibleParallelRepairsCount(jmx.getRangeToEndpointMap(keyspace), jmx.getEndpointToHostId());
if(repairUnitOpt.isPresent() && repairUnitOpt.get().getIncrementalRepair()) {
// with incremental repair, can't have more parallel repairs than nodes
parallelRepairs = 1;
Expand All @@ -96,14 +96,15 @@ public long getRepairRunId() {
}

@VisibleForTesting
public static int getPossibleParallelRepairsCount(Map<List<String>, List<String>> ranges)
public static int getPossibleParallelRepairsCount(Map<List<String>, List<String>> ranges, Map<String, String> hostsInRing)
throws ReaperException {
if (ranges.isEmpty()) {
String msg = "Repairing 0-sized cluster.";
LOG.error(msg);
throw new ReaperException(msg);
}
return ranges.size() / ranges.values().iterator().next().size();

return Math.min(ranges.size() / ranges.values().iterator().next().size(), Math.max(1, hostsInRing.keySet().size()/ranges.values().iterator().next().size()));
}

@VisibleForTesting
Expand Down
47 changes: 38 additions & 9 deletions src/main/java/com/spotify/reaper/storage/CassandraStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MINOR Remove this unused import 'java.util.concurrent.TimeUnit'. rule

import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand Down Expand Up @@ -61,7 +62,7 @@ public class CassandraStorage implements IStorage {
/** simple cache of repair_id.
* not accurate, only provides a floor value to shortcut looking for next appropriate id */
private final ConcurrentMap<String,Long> repairIds = new ConcurrentHashMap<>();

/* Simple statements */
private final String getClustersStmt = "SELECT * FROM cluster";

Expand Down Expand Up @@ -426,7 +427,26 @@ public Collection<RepairSegment> getRepairSegmentsForRun(long runId) {
// Then get segments by id
segmentsFuture.add(session.executeAsync(getRepairSegmentPrepStmt.bind(segmentIdResult.getLong("segment_id"))));
i++;
if(i%100==0 || segmentsIdResultSet.isFullyFetched()) {
if(i%100==0 || segmentsIdResultSet.isExhausted()) {
segments.addAll(fetchRepairSegmentFromFutures(segmentsFuture));
segmentsFuture = Lists.newArrayList();
}
}

return segments;
}

public Collection<RepairSegment> getRepairSegmentsForRunWithCache(long runId) {
List<ResultSetFuture> segmentsFuture = Lists.newArrayList();
Collection<RepairSegment> segments = Lists.newArrayList();

// First gather segments ids
ResultSet segmentsIdResultSet = session.execute(getRepairSegmentByRunIdPrepStmt.bind(runId));
int i=0;
for(Row segmentIdResult:segmentsIdResultSet) {
segmentsFuture.add(session.executeAsync(getRepairSegmentPrepStmt.bind(segmentIdResult.getLong("segment_id"))));
i++;
if(i%100==0 || segmentsIdResultSet.isExhausted()) {
segments.addAll(fetchRepairSegmentFromFutures(segmentsFuture));
segmentsFuture = Lists.newArrayList();
}
Expand All @@ -441,32 +461,41 @@ private Collection<RepairSegment> fetchRepairSegmentFromFutures(List<ResultSetFu
for(ResultSetFuture segmentResult:segmentsFuture) {
Row segmentRow = segmentResult.getUninterruptibly().one();
if(segmentRow!=null){
segments.add(createRepairSegmentFromRow(segmentRow));
RepairSegment segment = createRepairSegmentFromRow(segmentRow);
segments.add(segment);
}
}

return segments;

}

/**
 * Tells whether {@code range} encloses the token span of {@code segment}.
 *
 * @param segment the segment whose start and end tokens define the span to test
 * @param range the candidate enclosing range
 * @return the result of {@code range.encloses(...)} for the segment's span
 */
private boolean segmentIsWithinRange(RepairSegment segment, RingRange range) {
  RingRange segmentSpan = new RingRange(segment.getStartToken(), segment.getEndToken());
  return range.encloses(segmentSpan);
}

/**
 * Builds a {@code RepairSegment} from a row, reading the segment id from the
 * row's {@code id} column and delegating to the two-argument overload.
 *
 * @param segmentRow the row holding the segment's columns
 * @return the segment produced by the two-argument overload
 */
private RepairSegment createRepairSegmentFromRow(Row segmentRow){
  long idFromRow = segmentRow.getLong("id");
  return createRepairSegmentFromRow(segmentRow, idFromRow);
}

private RepairSegment createRepairSegmentFromRow(Row segmentRow, long segmentId){
return new RepairSegment.Builder(segmentRow.getLong("run_id"), new RingRange(new BigInteger(segmentRow.getVarint("start_token") +""), new BigInteger(segmentRow.getVarint("end_token")+"")), segmentRow.getLong("repair_unit_id"))
RepairSegment segment = new RepairSegment.Builder(segmentRow.getLong("run_id"), new RingRange(new BigInteger(segmentRow.getVarint("start_token") +""), new BigInteger(segmentRow.getVarint("end_token")+"")), segmentRow.getLong("repair_unit_id"))
Copy link
Collaborator

@tlpsonarqube tlpsonarqube May 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MINOR Immediately return this expression instead of assigning it to the temporary variable "segment". rule

.coordinatorHost(segmentRow.getString("coordinator_host"))
.endTime(new DateTime(segmentRow.getTimestamp("end_time")))
.failCount(segmentRow.getInt("fail_count"))
.startTime(new DateTime(segmentRow.getTimestamp("start_time")))
.state(State.values()[segmentRow.getInt("state")])
.build(segmentRow.getLong("id"));

return segment;
}


public Optional<RepairSegment> getSegment(long runId, Optional<RingRange> range){
RepairSegment segment = null;
List<RepairSegment> segments = Lists.<RepairSegment>newArrayList();
segments.addAll(getRepairSegmentsForRun(runId));
segments.addAll(getRepairSegmentsForRunWithCache(runId));

// Sort segments by fail count and start token (in order to try those who haven't failed first, in start token order)
Collections.sort( segments, new Comparator<RepairSegment>(){
Expand All @@ -481,16 +510,16 @@ public int compare(RepairSegment seg1, RepairSegment seg2) {
for(RepairSegment seg:segments){
if(seg.getState().equals(State.NOT_STARTED) // State condition
&& ((range.isPresent() &&
(range.get().getStart().compareTo(seg.getStartToken())>=0 || range.get().getEnd().compareTo(seg.getEndToken())<=0)
(segmentIsWithinRange(seg, range.get()))
) || !range.isPresent()) // Token range condition
){
segment = seg;
break;
segment = seg;
break;
}
}
return Optional.fromNullable(segment);
}

@Override
public Optional<RepairSegment> getNextFreeSegment(long runId) {
return getSegment(runId, Optional.<RingRange>absent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,12 @@ public void run() {
@Test
public void getPossibleParallelRepairsTest() throws Exception {
Map<List<String>, List<String>> map = RepairRunnerTest.threeNodeCluster();
assertEquals(1, RepairRunner.getPossibleParallelRepairsCount(map));
Map<String, String> endpointsThreeNodes = RepairRunnerTest.threeNodeClusterEndpoint();
assertEquals(1, RepairRunner.getPossibleParallelRepairsCount(map, endpointsThreeNodes));

map = RepairRunnerTest.sixNodeCluster();
assertEquals(2, RepairRunner.getPossibleParallelRepairsCount(map));
Map<String, String> endpointsSixNodes = RepairRunnerTest.sixNodeClusterEndpoint();
assertEquals(2, RepairRunner.getPossibleParallelRepairsCount(map, endpointsSixNodes));
}

@Test
Expand All @@ -384,8 +386,37 @@ public BigInteger apply(String s) {
List<RingRange> segments = generator.generateSegments(32, tokens, Boolean.FALSE);

Map<List<String>, List<String>> map = RepairRunnerTest.sixNodeCluster();
Map<String, String> endpointsSixNodes = RepairRunnerTest.sixNodeClusterEndpoint();
List<RingRange> ranges = RepairRunner.getParallelRanges(
RepairRunner.getPossibleParallelRepairsCount(map),
RepairRunner.getPossibleParallelRepairsCount(map, endpointsSixNodes),
segments
);
assertEquals(2, ranges.size());
assertEquals( "0", ranges.get(0).getStart().toString());
assertEquals("150", ranges.get(0).getEnd().toString());
assertEquals("150", ranges.get(1).getStart().toString());
assertEquals( "0", ranges.get(1).getEnd().toString());
}

@Test
public void getParallelSegmentsTest2() throws ReaperException {
List<BigInteger> tokens = Lists.transform(
Lists.newArrayList("0", "25", "50", "75", "100", "125", "150", "175", "200", "225", "250"),
new Function<String, BigInteger>() {
@Nullable
@Override
public BigInteger apply(String s) {
return new BigInteger(s);
}
}
);
SegmentGenerator generator = new SegmentGenerator(new BigInteger("0"), new BigInteger("299"));
List<RingRange> segments = generator.generateSegments(32, tokens, Boolean.FALSE);

Map<List<String>, List<String>> map = RepairRunnerTest.sixNodeCluster();
Map<String, String> endpointsSixNodes = RepairRunnerTest.sixNodeClusterEndpoint();
List<RingRange> ranges = RepairRunner.getParallelRanges(
RepairRunner.getPossibleParallelRepairsCount(map, endpointsSixNodes),
segments
);
assertEquals(2, ranges.size());
Expand Down Expand Up @@ -413,6 +444,26 @@ public static Map<List<String>, List<String>> sixNodeCluster() {
map = addRangeToMap(map, "250", "0", "a6", "a1", "a2");
return map;
}

/**
 * Test fixture: an endpoint-to-host-id map with three entries
 * (host1..host3 mapped to hostId1..hostId3).
 *
 * @return a new mutable map containing the three entries
 */
public static Map<String, String> threeNodeClusterEndpoint() {
  String[][] entries = {
    {"host1", "hostId1"},
    {"host2", "hostId2"},
    {"host3", "hostId3"},
  };
  Map<String, String> endpoints = Maps.newHashMap();
  for (String[] entry : entries) {
    endpoints.put(entry[0], entry[1]);
  }
  return endpoints;
}

/**
 * Test fixture: an endpoint-to-host-id map with six entries
 * (host1..host6 mapped to hostId1..hostId6).
 *
 * @return a new mutable map containing the six entries
 */
public static Map<String, String> sixNodeClusterEndpoint() {
  String[][] entries = {
    {"host1", "hostId1"},
    {"host2", "hostId2"},
    {"host3", "hostId3"},
    {"host4", "hostId4"},
    {"host5", "hostId5"},
    {"host6", "hostId6"},
  };
  Map<String, String> endpoints = Maps.newHashMap();
  for (String[] entry : entries) {
    endpoints.put(entry[0], entry[1]);
  }
  return endpoints;
}


private static Map<List<String>, List<String>> addRangeToMap(Map<List<String>, List<String>> map,
String rStart, String rEnd, String... hosts) {
Expand Down