Skip to content

Commit

Permalink
Ensure that, in an incremental repair, the replica list is updated on…
Browse files Browse the repository at this point in the history
… every repair run. This addresses cases where the node IPs change.
  • Loading branch information
Miles-Garnsey committed Oct 27, 2022
1 parent 5b2c027 commit 27186ed
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -466,9 +466,16 @@ private void startNextSegment() throws ReaperException, InterruptedException {
for (RepairSegment segment : nextRepairSegments) {
Map<String, String> potentialReplicaMap = this.repairRunService.getDCsByNodeForRepairSegment(
cluster, segment.getTokenRange(), repairUnit.getKeyspaceName(), repairUnit);
potentialReplicas = repairUnit.getIncrementalRepair()
? Collections.singletonList(segment.getCoordinatorHost())
: potentialReplicaMap.keySet();
if (repairUnit.getIncrementalRepair()) {
potentialReplicas = clusterFacade
.getRangeToEndpointMap(cluster, repairUnit.getKeyspaceName())
.get(Arrays.asList(
segment.getTokenRange().getBaseRange().getStart().toString(),
segment.getTokenRange().getBaseRange().getEnd().toString())
);
} else {
potentialReplicas = potentialReplicaMap.keySet();
}
JmxProxy coordinator = clusterFacade.connect(cluster, potentialReplicas);
if (nodesReadyForNewRepair(coordinator, segment, potentialReplicaMap, repairRunId)) {
nextRepairSegment = Optional.of(segment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ protected JmxProxy connectImpl(Node host) throws ReaperException {
}
};
context.repairManager.startRepairRun(run);
await().with().atMost(2, TimeUnit.MINUTES).until(() -> {
await().with().atMost(10, TimeUnit.MINUTES).until(() -> {
try {
mutex.acquire();
LOG.info("MUTEX ACQUIRED");
Expand Down Expand Up @@ -817,7 +817,7 @@ private static Map<List<String>, List<String>> addRangeToMap(
}

@Test
public void testDontFailRepairAfterTopologyChange() throws InterruptedException, ReaperException,
public void testDontFailRepairAfterTopologyChangeFullRepair() throws InterruptedException, ReaperException,
MalformedObjectNameException, ReflectionException, IOException {
final String ksName = "reaper";
final Set<String> cfNames = Sets.newHashSet("reaper");
Expand Down Expand Up @@ -951,6 +951,143 @@ protected JmxProxy connectImpl(Node host) throws ReaperException {
});
}



/**
 * Checks that an incremental repair run still completes when the replicas reported by the
 * cluster change between the creation of the run and its execution.
 *
 * <p>The repair unit and run are created against 127.0.0.1, 127.0.0.2 and 127.0.0.3. Before the
 * run is resumed, the mocked {@code ClusterFacade} is re-stubbed so that every lookup reports
 * 127.0.0.1, 127.0.0.2 and 127.0.0.4 instead. The test passes when the run reaches
 * {@code RepairRun.RunState.DONE} within 20 seconds.
 */
@Test
public void testDontFailRepairAfterTopologyChangeIncrementalRepair() throws InterruptedException, ReaperException,
    MalformedObjectNameException, ReflectionException, IOException {
  final String ksName = "reaper";
  final Set<String> cfNames = Sets.newHashSet("reaper");
  final boolean incrementalRepair = true;
  final Set<String> nodeSet = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3");
  final List<String> nodeSetAfterTopologyChange = Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.4");
  final Map<String, String> nodeMap = ImmutableMap.of("127.0.0.1", "dc1", "127.0.0.2", "dc1", "127.0.0.3", "dc1");
  final Map<String, String> nodeMapAfterTopologyChange = ImmutableMap.of(
      "127.0.0.1", "dc1", "127.0.0.2", "dc1", "127.0.0.4", "dc1");
  // Range -> replicas as reported before and after the simulated topology change.
  final Map<List<String>, List<String>> rangeToEndpoint = ImmutableMap.of(
      Lists.newArrayList("0", "100"), Lists.newArrayList(nodeSet),
      Lists.newArrayList("100", "200"), Lists.newArrayList(nodeSet));
  final Map<List<String>, List<String>> rangeToEndpointAfterTopologyChange = ImmutableMap.of(
      Lists.newArrayList("0", "100"), Lists.newArrayList(nodeSetAfterTopologyChange),
      Lists.newArrayList("100", "200"), Lists.newArrayList(nodeSetAfterTopologyChange));
  final Set<String> datacenters = Collections.emptySet();
  final Set<String> blacklistedTables = Collections.emptySet();
  final double intensity = 0.5;
  final int repairThreadCount = 1;
  final int segmentTimeout = 30;
  final List<BigInteger> tokens = THREE_TOKENS;

  // In-memory storage holding the cluster, the incremental repair unit and its run.
  final IStorage storage = new MemoryStorage();
  AppContext context = new AppContext();
  context.storage = storage;
  context.config = new ReaperApplicationConfiguration();
  storage.addCluster(cluster);
  UUID cf = storage.addRepairUnit(
      RepairUnit.builder()
          .clusterName(cluster.getName())
          .keyspaceName(ksName)
          .columnFamilies(cfNames)
          .incrementalRepair(incrementalRepair)
          .nodes(nodeSet)
          .datacenters(datacenters)
          .blacklistedTables(blacklistedTables)
          .repairThreadCount(repairThreadCount)
          .timeout(segmentTimeout))
      .getId();
  RepairRun run = addNewRepairRun(nodeMap, intensity, storage, cf);
  final UUID runId = run.getId();
  final UUID segmentId = storage.getNextFreeSegments(run.getId()).get(0).getId();
  // JUnit's assertEquals takes (expected, actual).
  assertEquals(RepairSegment.State.NOT_STARTED, storage.getRepairSegment(runId, segmentId).get().getState());

  // JMX proxy mock returned for every connection attempt.
  final JmxProxy jmx = JmxProxyTest.mockJmxProxyImpl();
  when(jmx.getClusterName()).thenReturn(cluster.getName());
  when(jmx.isConnectionAlive()).thenReturn(true);
  when(jmx.getRangeToEndpointMap(anyString())).thenReturn(RepairRunnerTest.threeNodeClusterWithIps());
  when(jmx.getEndpointToHostId()).thenReturn(nodeMap);
  when(jmx.getTokens()).thenReturn(tokens);
  EndpointSnitchInfoMBean endpointSnitchInfoMBean = mock(EndpointSnitchInfoMBean.class);
  when(endpointSnitchInfoMBean.getDatacenter()).thenReturn("dc1");
  try {
    when(endpointSnitchInfoMBean.getDatacenter(anyString())).thenReturn("dc1");
  } catch (UnknownHostException ex) {
    throw new AssertionError(ex);
  }
  JmxProxyTest.mockGetEndpointSnitchInfoMBean(jmx, endpointSnitchInfoMBean);

  // Cluster facade mock, initially reporting the original three nodes.
  ClusterFacade clusterFacade = mock(ClusterFacade.class);
  when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
  when(clusterFacade.nodeIsAccessibleThroughJmx(any(), any())).thenReturn(true);
  when(clusterFacade.tokenRangeToEndpoint(any(), anyString(), any()))
      .thenReturn(Lists.newArrayList(nodeSet));
  when(clusterFacade.getRangeToEndpointMap(any(), anyString())).thenReturn(rangeToEndpoint);
  when(clusterFacade.getEndpointToHostId(any())).thenReturn(nodeMap);
  when(clusterFacade.listActiveCompactions(any())).thenReturn(
      CompactionStats.builder()
          .withActiveCompactions(Collections.emptyList())
          .withPendingCompactions(Optional.of(0))
          .build());
  context.repairManager = RepairManager.create(
      context,
      clusterFacade,
      Executors.newScheduledThreadPool(10),
      1,
      TimeUnit.MILLISECONDS,
      1);

  // Each triggered repair reports STARTED, SESSION_SUCCESS and FINISHED, in that order, from a
  // separate thread to the status handler passed as argument 7 of triggerRepair.
  final ActiveRepairService.Status[] reportedStatuses = {
      ActiveRepairService.Status.STARTED,
      ActiveRepairService.Status.SESSION_SUCCESS,
      ActiveRepairService.Status.FINISHED
  };
  AtomicInteger repairNumberCounter = new AtomicInteger(1);
  when(jmx.triggerRepair(any(), any(), any(), any(), any(), anyBoolean(), any(), any(), any(), anyInt()))
      .then(invocation -> {
        final int repairNumber = repairNumberCounter.getAndIncrement();
        final RepairStatusHandler handler = invocation.getArgument(7);
        new Thread(() -> {
          for (ActiveRepairService.Status status : reportedStatuses) {
            handler.handle(repairNumber, Optional.of(status), Optional.empty(), null, jmx);
          }
        }).start();
        return repairNumber;
      });
  context.jmxConnectionFactory = new JmxConnectionFactory(context, new NoopCrypotograph()) {
    @Override
    protected JmxProxy connectImpl(Node host) throws ReaperException {
      return jmx;
    }
  };

  assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRun(runId).get().getRunState());
  storage.updateRepairRun(
      run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId));

  // We'll now change the list of replicas for any segment, making the stored ones obsolete
  when(clusterFacade.getRangeToEndpointMap(any(), anyString())).thenReturn(rangeToEndpointAfterTopologyChange);
  when(clusterFacade.getEndpointToHostId(any())).thenReturn(nodeMapAfterTopologyChange);
  when(clusterFacade.tokenRangeToEndpoint(any(), anyString(), any()))
      .thenReturn(Lists.newArrayList(nodeSetAfterTopologyChange));
  context.repairManager.resumeRunningRepairRuns();

  // The repair run should succeed despite the topology change.
  await().with().atMost(20, TimeUnit.SECONDS).until(
      () -> RepairRun.RunState.DONE == storage.getRepairRun(runId).get().getRunState());
}

private RepairRun addNewRepairRun(
final Map<String, String> nodeMap,
final double intensity,
Expand Down

0 comments on commit 27186ed

Please sign in to comment.