Skip to content

Commit

Permalink
Fix number of parallel repair computation
Browse files Browse the repository at this point in the history
Downgrade to Dropwizard 1.0.7 and Guava 19.0 to fix dependency issues
Make repair manager schedule cycle configurable (was 30s hardcoded)

ref: #108
  • Loading branch information
adejanovski authored and michaelsembwever committed May 31, 2017
1 parent 6e8fd29 commit 01b56e6
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 52 deletions.
1 change: 1 addition & 0 deletions resource/cassandra-reaper-cassandra-ssl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7198
Expand Down
1 change: 1 addition & 0 deletions resource/cassandra-reaper-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
Expand Down
1 change: 1 addition & 0 deletions resource/cassandra-reaper-h2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
Expand Down
1 change: 1 addition & 0 deletions resource/cassandra-reaper-memory.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
Expand Down
1 change: 1 addition & 0 deletions resource/cassandra-reaper-postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
Expand Down
1 change: 1 addition & 0 deletions resource/cassandra-reaper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true
repairManagerSchedulingIntervalSeconds: 30

jmxPorts:
127.0.0.1: 7100
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/spotify/reaper/ReaperApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void run(ReaperApplicationConfiguration config,
context.repairManager.initializeThreadPool(
config.getRepairRunThreadCount(),
config.getHangingRepairTimeoutMins(), TimeUnit.MINUTES,
30, TimeUnit.SECONDS);
config.getRepairManagerSchedulingIntervalSeconds(), TimeUnit.SECONDS);

if (context.storage == null) {
LOG.info("initializing storage of type: {}", config.getStorageType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class ReaperApplicationConfiguration extends Configuration {

private String enableCrossOrigin;


@JsonProperty
private DataSourceFactory database = new DataSourceFactory();

Expand All @@ -86,14 +86,17 @@ public class ReaperApplicationConfiguration extends Configuration {
@JsonProperty
@DefaultValue("false")
private Boolean allowUnreachableNodes;

@JsonProperty
private AutoSchedulingConfiguration autoScheduling;

@JsonProperty
@DefaultValue("true")
private Boolean enableDynamicSeedList;

@JsonProperty
private Integer repairManagerSchedulingIntervalSeconds;

/**
 * Returns the value currently held in the {@code segmentCount} field.
 *
 * @return the stored segment count
 */
public int getSegmentCount() {
  return this.segmentCount;
}
Expand All @@ -117,15 +120,15 @@ public double getRepairIntensity() {
public void setRepairIntensity(double repairIntensity) {
// Stored as given; this setter performs no range validation.
this.repairIntensity = repairIntensity;
}

/**
 * Returns the {@code incrementalRepair} field unchanged.
 *
 * @return the stored flag; no default is substituted by this getter
 */
public Boolean getIncrementalRepair() {
  return this.incrementalRepair;
}

/**
 * Replaces the stored {@code incrementalRepair} flag.
 *
 * @param incrementalRepair the new value; stored as-is, without a null check
 */
public void setIncrementalRepair(Boolean incrementalRepair) {
this.incrementalRepair = incrementalRepair;
}

/**
 * Returns the {@code scheduleDaysBetween} field unchanged.
 *
 * @return the stored value; no default is substituted by this getter
 */
public Integer getScheduleDaysBetween() {
  return this.scheduleDaysBetween;
}
Expand Down Expand Up @@ -170,13 +173,13 @@ public void setDataSourceFactory(DataSourceFactory database) {
this.database = database;
}

public int getHangingRepairTimeoutMins() {
return hangingRepairTimeoutMins;
public int getRepairManagerSchedulingIntervalSeconds() {
  // Fall back to 30 seconds when the setting was never supplied.
  if (this.repairManagerSchedulingIntervalSeconds == null) {
    return 30;
  }
  return this.repairManagerSchedulingIntervalSeconds;
}

@JsonProperty
public void setHangingRepairTimeoutMins(int hangingRepairTimeoutMins) {
this.hangingRepairTimeoutMins = hangingRepairTimeoutMins;
public void setRepairManagerSchedulingIntervalSeconds(int repairManagerSchedulingIntervalSeconds) {
// Stored as given (boxed into the Integer field); no check for zero or negative values is made here.
this.repairManagerSchedulingIntervalSeconds = repairManagerSchedulingIntervalSeconds;
}

public Map<String, Integer> getJmxPorts() {
Expand Down Expand Up @@ -206,11 +209,11 @@ public AutoSchedulingConfiguration getAutoScheduling() {
public void setAutoScheduling(AutoSchedulingConfiguration autoRepairScheduling) {
// The parameter is named autoRepairScheduling but it backs the autoScheduling field.
this.autoScheduling = autoRepairScheduling;
}

/**
 * Replaces the stored {@code enableDynamicSeedList} flag.
 *
 * @param enableDynamicSeedList the new value; stored as-is, without a null check
 */
public void setEnableDynamicSeedList(Boolean enableDynamicSeedList) {
this.enableDynamicSeedList = enableDynamicSeedList;
}

/**
 * Returns the {@code enableDynamicSeedList} flag, treating an unset value as enabled.
 *
 * @return {@link Boolean#TRUE} when the field is {@code null}, otherwise the stored value
 */
public Boolean getEnableDynamicSeedList() {
  if (this.enableDynamicSeedList == null) {
    return Boolean.TRUE;
  }
  return this.enableDynamicSeedList;
}
Expand Down Expand Up @@ -239,8 +242,8 @@ public String getPassword() {
}

}


private CassandraFactory cassandra = new CassandraFactory();

@JsonProperty("cassandra")
Expand All @@ -252,15 +255,24 @@ public CassandraFactory getCassandraFactory() {
public void setCassandraFactory(CassandraFactory cassandra) {
// Replaces the factory held in the cassandra field; the argument is stored as-is.
this.cassandra = cassandra;
}

/**
 * Returns the {@code allowUnreachableNodes} flag, treating an unset value as disabled.
 *
 * @return {@code false} when the field is {@code null}, otherwise the stored value
 */
public Boolean getAllowUnreachableNodes() {
  boolean allowed = false;
  if (this.allowUnreachableNodes != null) {
    allowed = this.allowUnreachableNodes;
  }
  return allowed;
}

/**
 * Replaces the stored {@code allowUnreachableNodes} flag.
 *
 * @param allow the new value; stored as-is, without a null check
 */
public void setAllowUnreachableNodes(Boolean allow) {
this.allowUnreachableNodes = allow;
}


/**
 * Returns the value currently held in the {@code hangingRepairTimeoutMins} field.
 *
 * @return the stored timeout
 */
public int getHangingRepairTimeoutMins() {
  return this.hangingRepairTimeoutMins;
}

/**
 * Replaces the stored {@code hangingRepairTimeoutMins} value.
 *
 * <p>NOTE(review): the {@code @JsonProperty} annotation presumably lets the configuration
 * loader bind this setting through the setter; confirm against the field declaration.
 *
 * @param hangingRepairTimeoutMins the new timeout; stored as-is, without validation
 */
@JsonProperty
public void setHangingRepairTimeoutMins(int hangingRepairTimeoutMins) {
this.hangingRepairTimeoutMins = hangingRepairTimeoutMins;
}

public static class AutoSchedulingConfiguration {

@JsonProperty
Expand All @@ -277,7 +289,7 @@ public static class AutoSchedulingConfiguration {

@JsonProperty
private Duration scheduleSpreadPeriod;

@JsonProperty
private List<String> excludedKeyspaces = Collections.emptyList();

Expand Down Expand Up @@ -324,7 +336,7 @@ public void setScheduleSpreadPeriod(Duration scheduleSpreadPeriod) {
public boolean hasScheduleSpreadPeriod() {
  // A spread period counts as configured as soon as the field is non-null.
  return !(this.scheduleSpreadPeriod == null);
}

/**
 * Replaces the list of excluded keyspaces.
 *
 * @param excludedKeyspaces the new list; the reference is stored directly (no copy, no null check)
 */
public void setExcludedKeyspaces(List<String> excludedKeyspaces) {
this.excludedKeyspaces = excludedKeyspaces;
}
Expand Down
64 changes: 38 additions & 26 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int
if(cassandraVersion.startsWith("2.0") || cassandraVersion.startsWith("1.")){
ssProxy = JMX.newMBeanProxy(mbeanServerConn, ssMbeanName, StorageServiceMBean20.class);
}

CompactionManagerMBean cmProxy =
JMX.newMBeanProxy(mbeanServerConn, cmMbeanName, CompactionManagerMBean.class);
JmxProxy proxy = new JmxProxy(handler, host, jmxUrl, jmxConn, ssProxy, ssMbeanName,
Expand Down Expand Up @@ -235,6 +235,18 @@ public List<String> tokenRangeToEndpoint(String keyspace, RingRange tokenRange)
return Lists.newArrayList();
}

/**
 * Asks the storage service MBean for its endpoint-to-host-id mapping.
 *
 * @return all hosts in the ring with their host id
 */
@NotNull
public Map<String, String> getEndpointToHostId() {
  checkNotNull(ssProxy, "Looks like the proxy is not connected");
  return ((StorageServiceMBean) ssProxy).getEndpointToHostId();
}

/**
* @return full class name of Cassandra's partitioner.
*/
Expand Down Expand Up @@ -309,8 +321,8 @@ public int getPendingCompactions() {
public boolean isRepairRunning() {
  // The probes run in this order and stop at the first one that reports activity.
  if (isRepairRunningPre22()) {
    return true;
  }
  if (isRepairRunningPost22()) {
    return true;
  }
  return isValidationCompactionRunning();
}


/**
* @return true if any repairs are running on the node.
*/
Expand All @@ -336,7 +348,7 @@ public boolean isRepairRunningPre22() {
// If uncertain, assume it's running
return true;
}

/**
* @return true if any validation compaction is active or pending on the node.
*/
Expand All @@ -345,7 +357,7 @@ public boolean isValidationCompactionRunning() {
try {
int activeCount = (Integer) mbeanServer.getAttribute(new ObjectName(VALIDATION_ACTIVE_OBJECT_NAME), VALUE_ATTRIBUTE);
long pendingCount = (Long) mbeanServer.getAttribute(new ObjectName(VALIDATION_PENDING_OBJECT_NAME), VALUE_ATTRIBUTE);

return activeCount + pendingCount != 0;
} catch (IOException ignored) {
LOG.warn(FAILED_TO_CONNECT_TO_USING_JMX, host, ignored);
Expand All @@ -360,15 +372,15 @@ public boolean isValidationCompactionRunning() {
// If uncertain, assume it's not running
return false;
}

/**
* New way of determining if a repair is running after C* 2.2
*
*
* @return true if any repairs are running on the node.
*/
public boolean isRepairRunningPost22() {
try {
// list all mbeans in search of one with the name Repair#??
// list all mbeans in search of one with the name Repair#??
// This is the replacement for AntiEntropySessions since Cassandra 2.2
Set beanSet = mbeanServer.queryNames(new ObjectName("org.apache.cassandra.internal:*"), null);
for(Object bean:beanSet) {
Expand Down Expand Up @@ -425,7 +437,7 @@ public boolean tableExists(String ks, String cf) {
}
return true;
}

/**
 * Returns the release version string reported by the storage service MBean.
 *
 * @return the value of {@code getReleaseVersion()} on the connected proxy
 */
public String getCassandraVersion() {
  final StorageServiceMBean storageService = (StorageServiceMBean) ssProxy;
  return storageService.getReleaseVersion();
}
Expand All @@ -436,7 +448,7 @@ public String getCassandraVersion(){
* For the time being, we allow neither local nor snapshot repairs.
*
* @return Repair command number, or 0 if nothing to repair
* @throws ReaperException
* @throws ReaperException
*/
public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keyspace,
RepairParallelism repairParallelism, Collection<String> columnFamilies, boolean fullRepair) throws ReaperException {
Expand Down Expand Up @@ -474,11 +486,11 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys
throw new ReaperException(e);
}
}


public int triggerRepairPost2dot2(boolean fullRepair, RepairParallelism repairParallelism, String keyspace, Collection<String> columnFamilies, BigInteger beginToken, BigInteger endToken, String cassandraVersion) {
Map<String, String> options = new HashMap<>();

options.put(RepairOption.PARALLELISM_KEY, repairParallelism.getName());
//options.put(RepairOption.PRIMARY_RANGE_KEY, Boolean.toString(primaryRange));
options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!fullRepair));
Expand All @@ -489,47 +501,47 @@ public int triggerRepairPost2dot2(boolean fullRepair, RepairParallelism repairPa
if (fullRepair) {
options.put(RepairOption.RANGES_KEY, beginToken.toString() + ":" + endToken.toString());
}

//options.put(RepairOption.DATACENTERS_KEY, StringUtils.join(specificDataCenters, ","));
//options.put(RepairOption.HOSTS_KEY, StringUtils.join(specificHosts, ","));

return ((StorageServiceMBean) ssProxy).repairAsync(keyspace, options);
}

/**
 * Starts a repair through the {@code forceRepairRangeAsync} / {@code forceRepairAsync}
 * operations of the storage service MBean.
 *
 * <p>When {@code fullRepair} is true, the token range {@code beginToken..endToken} is repaired
 * and the parallelism is passed as an enum ordinal. When it is false, {@code forceRepairAsync}
 * is called for the whole keyspace and the token range and {@code repairParallelism} are not
 * used.
 *
 * <p>NOTE(review): judging by the name this targets Cassandra 2.1, yet the body also
 * special-cases versions starting with "2.2"; confirm against the caller.
 *
 * @param fullRepair true for a full range repair, false for the incremental code path
 * @param repairParallelism requested parallelism; only read on the full-repair path
 * @param keyspace keyspace to repair
 * @param columnFamilies tables to repair, passed to the MBean as a String array
 * @param beginToken start of the token range (full repair only)
 * @param endToken end of the token range (full repair only)
 * @param cassandraVersion version string; only tested with {@code startsWith("2.2")}
 * @return the int returned by the MBean call (presumably the repair command number; confirm)
 */
public int triggerRepair2dot1(boolean fullRepair, RepairParallelism repairParallelism, String keyspace, Collection<String> columnFamilies, BigInteger beginToken, BigInteger endToken, String cassandraVersion) {
if (fullRepair) {
// full repair
// DATACENTER_AWARE is forwarded using its own ordinal.
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
// The two set arguments are empty sets for "2.2" versions and null otherwise
// (presumably the data center and host filters; confirm against the MBean signature).
return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, repairParallelism.ordinal(), cassandraVersion.startsWith("2.2")?new HashSet<String>():null, cassandraVersion.startsWith("2.2")?new HashSet<String>():null, fullRepair,
columnFamilies.toArray(new String[columnFamilies.size()]));
columnFamilies.toArray(new String[columnFamilies.size()]));
}

// Anything other than SEQUENTIAL is sent as PARALLEL on this path.
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);

return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, snapshotRepair ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(),
cassandraVersion.startsWith("2.2")?new HashSet<String>():null, cassandraVersion.startsWith("2.2")?new HashSet<String>():null, fullRepair,
columnFamilies.toArray(new String[columnFamilies.size()]));

}
}

// incremental repair
// Reached only when fullRepair is false, so the fullRepair argument below is always false here.
return ((StorageServiceMBean) ssProxy).forceRepairAsync(keyspace, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
fullRepair, columnFamilies.toArray(new String[columnFamilies.size()]));
}

/**
 * Starts a range repair through the {@code StorageServiceMBean20} view of the storage
 * service proxy.
 *
 * <p>For {@code DATACENTER_AWARE} the parallelism ordinal is passed along with two null
 * arguments. For every other parallelism the boolean overload is used, with the first flag
 * true only for {@code SEQUENTIAL} and the second flag always false.
 *
 * @param repairParallelism requested parallelism
 * @param keyspace keyspace to repair
 * @param columnFamilies tables to repair, passed to the MBean as a String array
 * @param beginToken start of the token range
 * @param endToken end of the token range
 * @return the int returned by the MBean call (presumably the repair command number; confirm)
 */
public int triggerRepairPre2dot1(RepairParallelism repairParallelism, String keyspace, Collection<String> columnFamilies, BigInteger beginToken, BigInteger endToken) {
// Cassandra 1.2 and 2.0 compatibility
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, repairParallelism.ordinal(), null, null,
columnFamilies.toArray(new String[columnFamilies.size()]));
columnFamilies.toArray(new String[columnFamilies.size()]));
}
// True only for SEQUENTIAL; passed as the first boolean of the overload below.
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, snapshotRepair, false, columnFamilies.toArray(new String[columnFamilies.size()]));

}


Expand All @@ -549,12 +561,12 @@ public void handleNotification(Notification notification, Object handback) {
if (repairStatusHandler.isPresent() && ("repair").equals(type)) {
processOldApiNotification(notification);
}

if (repairStatusHandler.isPresent() && ("progress").equals(type)) {
processNewApiNotification(notification);
}
}

/**
* Handles notifications from the old repair API (forceRepairAsync)
*/
Expand All @@ -574,7 +586,7 @@ private void processOldApiNotification(Notification notification) {
LOG.error("Error while processing JMX notification", e);
}
}

/**
* Handles notifications from the new repair API (repairAsync)
*/
Expand Down Expand Up @@ -696,7 +708,7 @@ public void clearSnapshot(String repairId, String keyspaceName) throws ReaperExc
throw new ReaperException(e);
}
}

public List<String> getLiveNodes()
throws ReaperException {
checkNotNull(ssProxy, "Looks like the proxy is not connected");
Expand Down
Loading

0 comments on commit 01b56e6

Please sign in to comment.