Skip to content

Commit

Permalink
Add dynamic discovery of seeds for clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
adejanovski committed Mar 13, 2017
1 parent bec2916 commit b15bc66
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 17 deletions.
4 changes: 3 additions & 1 deletion resource/cassandra-reaper-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ storageType: cassandra
enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true

jmxPorts:
127.0.0.1: 7100
127.0.0.2: 7200
Expand Down Expand Up @@ -56,4 +58,4 @@ autoScheduling:
scheduleSpreadPeriod: PT6H
excludedKeyspaces:
- keyspace1
- keyspace2
- keyspace2
4 changes: 3 additions & 1 deletion resource/cassandra-reaper-h2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ storageType: database
enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true

jmxPorts:
127.0.0.1: 7100
127.0.0.2: 7200
Expand Down Expand Up @@ -58,4 +60,4 @@ autoScheduling:
scheduleSpreadPeriod: PT6H
excludedKeyspaces:
- keyspace1
- keyspace2
- keyspace2
4 changes: 3 additions & 1 deletion resource/cassandra-reaper-memory.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ storageType: memory
enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true

jmxPorts:
127.0.0.1: 7100
127.0.0.2: 7200
Expand Down Expand Up @@ -51,4 +53,4 @@ autoScheduling:
scheduleSpreadPeriod: PT6H
excludedKeyspaces:
- keyspace1
- keyspace2
- keyspace2
4 changes: 3 additions & 1 deletion resource/cassandra-reaper-postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ storageType: database
enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true

jmxPorts:
127.0.0.1: 7100
127.0.0.2: 7200
Expand Down Expand Up @@ -58,4 +60,4 @@ autoScheduling:
scheduleSpreadPeriod: PT6H
excludedKeyspaces:
- keyspace1
- keyspace2
- keyspace2
2 changes: 2 additions & 0 deletions resource/cassandra-reaper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ storageType: memory
enableCrossOrigin: true
incrementalRepair: false
allowUnreachableNodes: false
enableDynamicSeedList: true

jmxPorts:
127.0.0.1: 7100
127.0.0.2: 7200
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,13 @@ public class ReaperApplicationConfiguration extends Configuration {
@JsonProperty
@DefaultValue("false")
private Boolean allowUnreachableNodes;

@JsonProperty
private AutoSchedulingConfiguration autoScheduling;

@JsonProperty
@DefaultValue("true")
private Boolean enableDynamicSeedList;

public int getSegmentCount() {
return segmentCount;
Expand Down Expand Up @@ -198,6 +203,14 @@ public AutoSchedulingConfiguration getAutoScheduling() {
public void setAutoScheduling(AutoSchedulingConfiguration autoRepairScheduling) {
this.autoScheduling = autoRepairScheduling;
}

public void setEnableDynamicSeedList(Boolean enableDynamicSeedList) {
this.enableDynamicSeedList = enableDynamicSeedList;
}

public Boolean getEnableDynamicSeedList() {
return this.enableDynamicSeedList==null?Boolean.TRUE:this.enableDynamicSeedList;
}

public static class JmxCredentials {

Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,17 @@ public void clearSnapshot(String repairId, String keyspaceName) throws ReaperExc
throw new ReaperException(e);
}
}

public List<String> getLiveNodes()
throws ReaperException {
checkNotNull(ssProxy, "Looks like the proxy is not connected");
try {
return ((StorageServiceMBean) ssProxy).getLiveNodes();
} catch (Exception e) {
LOG.error(e.getMessage());
throw new ReaperException(e.getMessage(), e);
}
}
}

/**
Expand Down
66 changes: 54 additions & 12 deletions src/main/java/com/spotify/reaper/resources/ClusterResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
Expand Down Expand Up @@ -167,18 +169,39 @@ public Response addCluster(
return viewCluster(newCluster.getName(), Optional.<Integer>absent(), Optional.of(createdURI));
}

public Cluster createClusterWithSeedHost(String seedHost)
public Cluster createClusterWithSeedHost(String seedHostInput)
throws ReaperException {
String clusterName;
String partitioner;
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(seedHost)) {
clusterName = jmxProxy.getClusterName();
partitioner = jmxProxy.getPartitioner();
} catch (ReaperException e) {
LOG.error("failed to create cluster with seed host: {}", seedHost, e);
throw e;
Optional<String> clusterName = Optional.absent();
Optional<String> partitioner = Optional.absent();
Optional<List<String>> liveNodes = Optional.absent();
Set<String> seedHosts = parseSeedHosts(seedHostInput);
for(String seedHost:seedHosts) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(seedHost)) {
clusterName = Optional.of(jmxProxy.getClusterName());
partitioner = Optional.of(jmxProxy.getPartitioner());
liveNodes = Optional.of(jmxProxy.getLiveNodes());
break;
} catch (ReaperException e) {
LOG.error("failed to create cluster with seed host: {}", seedHost, e);
}
}
return new Cluster(clusterName, partitioner, Collections.singleton(seedHost));

if(!clusterName.isPresent()) {
throw new ReaperException("Could not connect any seed host");
}


Set<String> seedHostsFinal = seedHosts;
if (context.config.getEnableDynamicSeedList()
&& liveNodes.isPresent()) {
seedHostsFinal = liveNodes.get().size()>0
?liveNodes.get().stream().collect(Collectors.toSet())
:seedHosts;
}

LOG.debug("Seeds {}", seedHostsFinal);

return new Cluster(clusterName.get(), partitioner.get(), seedHostsFinal);
}

@PUT
Expand All @@ -201,8 +224,22 @@ public Response modifyClusterSeed(
.build();
}

Set<String> newSeeds = Collections.singleton(seedHost.get());
if (newSeeds.equals(cluster.get().getSeedHosts())) {
Set<String> newSeeds = parseSeedHosts(seedHost.get());
Optional<List<String>> liveNodes = Optional.absent();

if(context.config.getEnableDynamicSeedList()) {
for(String seed:newSeeds) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(seed)) {
liveNodes = Optional.of(jmxProxy.getLiveNodes());
newSeeds = liveNodes.get().stream().collect(Collectors.toSet());
break;
} catch (ReaperException e) {
LOG.error("failed to create cluster with seed host: {}", seedHost, e);
}
}
}

if (newSeeds.equals(cluster.get().getSeedHosts()) || newSeeds.isEmpty()) {
return Response.notModified().build();
}

Expand Down Expand Up @@ -253,5 +290,10 @@ public Response deleteCluster(
return Response.serverError().entity("delete failed for schedule with name \""
+ clusterName + "\"").build();
}

private Set<String> parseSeedHosts(String seedHost) {
return Arrays.stream(seedHost.split(",")).map(seed -> seed.trim()).collect(Collectors.toSet());
}


}
3 changes: 3 additions & 0 deletions src/test/java/com/spotify/reaper/acceptance/BasicSteps.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.slf4j.LoggerFactory;

import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -75,6 +76,8 @@ public static void setupReaperTestRunner() throws Exception {
when(jmx.getPartitioner()).thenReturn("org.apache.cassandra.dht.RandomPartitioner");
when(jmx.getKeyspaces()).thenReturn(Lists.newArrayList(clusterKeyspaces.keySet()));
when(jmx.getTokens()).thenReturn(Lists.newArrayList(new BigInteger("0")));
when(jmx.getLiveNodes()).thenReturn(Arrays.asList(seedHost));

for (String keyspace : clusterKeyspaces.keySet()) {
when(jmx.getTableNamesForKeyspace(keyspace)).thenReturn(clusterKeyspaces.get(keyspace));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.net.URI;
import java.time.Duration;
import java.util.Arrays;

import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
Expand Down Expand Up @@ -51,6 +52,7 @@ public void setUp() throws Exception {
jmxProxy = mock(JmxProxy.class);
when(jmxProxy.getClusterName()).thenReturn(CLUSTER_NAME);
when(jmxProxy.getPartitioner()).thenReturn(PARTITIONER);
when(jmxProxy.getLiveNodes()).thenReturn(Arrays.asList(SEED_HOST));
context.jmxConnectionFactory = new JmxConnectionFactory() {
@Override
public JmxProxy connect(Optional<RepairStatusHandler> handler, String host)
Expand Down Expand Up @@ -109,10 +111,12 @@ public void testGetExistingCluster() {
}

@Test
public void testModifyClusterSeeds() {
public void testModifyClusterSeeds() throws ReaperException {
ClusterResource clusterResource = new ClusterResource(context);
clusterResource.addCluster(uriInfo, Optional.of(SEED_HOST));

when(jmxProxy.getLiveNodes()).thenReturn(Arrays.asList(SEED_HOST+1));

Response response = clusterResource.modifyClusterSeed(uriInfo, CLUSTER_NAME,
Optional.of(SEED_HOST + 1));

Expand All @@ -126,6 +130,7 @@ public void testModifyClusterSeeds() {

response = clusterResource.modifyClusterSeed(uriInfo, CLUSTER_NAME, Optional.of(SEED_HOST + 1));
assertEquals(304, response.getStatus());
when(jmxProxy.getLiveNodes()).thenReturn(Arrays.asList(SEED_HOST));
}

@Test
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/cassandra-reaper-at.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ repairRunThreadCount: 15
hangingRepairTimeoutMins: 1
storageType: memory
incrementalRepair: false
enableDynamicSeedList: false

logging:
level: DEBUG
Expand Down

0 comments on commit b15bc66

Please sign in to comment.