From accb60cb1c6a090c50ca5257e6a438bcaa4f8d40 Mon Sep 17 00:00:00 2001 From: Miles-Garnsey Date: Thu, 27 Oct 2022 12:30:20 +1100 Subject: [PATCH] Ensure that, in an incremental repair, the replica list is updated on every repair run. This addresses cases where the node IPs change. --- .../cassandrareaper/service/RepairRunner.java | 13 +- .../service/RepairRunnerTest.java | 141 +++++++++++++++++- 2 files changed, 149 insertions(+), 5 deletions(-) diff --git a/src/server/src/main/java/io/cassandrareaper/service/RepairRunner.java b/src/server/src/main/java/io/cassandrareaper/service/RepairRunner.java index 73d6695b3..279b1840d 100644 --- a/src/server/src/main/java/io/cassandrareaper/service/RepairRunner.java +++ b/src/server/src/main/java/io/cassandrareaper/service/RepairRunner.java @@ -466,9 +466,16 @@ private void startNextSegment() throws ReaperException, InterruptedException { for (RepairSegment segment : nextRepairSegments) { Map potentialReplicaMap = this.repairRunService.getDCsByNodeForRepairSegment( cluster, segment.getTokenRange(), repairUnit.getKeyspaceName(), repairUnit); - potentialReplicas = repairUnit.getIncrementalRepair() - ? 
Collections.singletonList(segment.getCoordinatorHost()) - : potentialReplicaMap.keySet(); + if (repairUnit.getIncrementalRepair()) { /* incremental: fetch the current replicas of the range */ + potentialReplicas = clusterFacade + .getRangeToEndpointMap(cluster, repairUnit.getKeyspaceName()) + .get(Arrays.asList( /* NOTE(review): get() yields null for an unmapped range - confirm */ + segment.getTokenRange().getBaseRange().getStart().toString(), + segment.getTokenRange().getBaseRange().getEnd().toString()) + ); + } else { + potentialReplicas = potentialReplicaMap.keySet(); + } JmxProxy coordinator = clusterFacade.connect(cluster, potentialReplicas); if (nodesReadyForNewRepair(coordinator, segment, potentialReplicaMap, repairRunId)) { nextRepairSegment = Optional.of(segment); diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java index d0dc6a167..9a332e939 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java @@ -265,7 +265,7 @@ protected JmxProxy connectImpl(Node host) throws ReaperException { } }; context.repairManager.startRepairRun(run); - await().with().atMost(2, TimeUnit.MINUTES).until(() -> { + await().with().atMost(10, TimeUnit.MINUTES).until(() -> { try { mutex.acquire(); LOG.info("MUTEX ACQUIRED"); @@ -817,7 +817,7 @@ private static Map, List> addRangeToMap( } @Test - public void testDontFailRepairAfterTopologyChange() throws InterruptedException, ReaperException, + public void testDontFailRepairAfterTopologyChangeFullRepair() throws InterruptedException, ReaperException, MalformedObjectNameException, ReflectionException, IOException { final String ksName = "reaper"; final Set cfNames = Sets.newHashSet("reaper"); @@ -951,6 +951,143 @@ protected JmxProxy connectImpl(Node host) throws ReaperException { }); } + + /** An incremental run must reach DONE after replica 127.0.0.3 is replaced by 127.0.0.4. */ + @Test + public void testDontFailRepairAfterTopologyChangeIncrementalRepair() throws InterruptedException, ReaperException, + MalformedObjectNameException, 
ReflectionException, IOException { + final String ksName = "reaper"; + final Set cfNames = Sets.newHashSet("reaper"); + final boolean incrementalRepair = true; + final Set nodeSet = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3"); + final List nodeSetAfterTopologyChange = Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.4"); + final Map nodeMap = ImmutableMap.of("127.0.0.1", "dc1", "127.0.0.2", "dc1", "127.0.0.3", "dc1"); + final Map nodeMapAfterTopologyChange = ImmutableMap.of( + "127.0.0.1", "dc1", "127.0.0.2", "dc1", "127.0.0.4", "dc1"); + final Set datacenters = Collections.emptySet(); + final Set blacklistedTables = Collections.emptySet(); + final double intensity = 0.5f; + final int repairThreadCount = 1; + final int segmentTimeout = 30; + final List tokens = THREE_TOKENS; + final IStorage storage = new MemoryStorage(); + AppContext context = new AppContext(); + context.storage = storage; + context.config = new ReaperApplicationConfiguration(); + storage.addCluster(cluster); + UUID cf = storage.addRepairUnit( + RepairUnit.builder() + .clusterName(cluster.getName()) + .keyspaceName(ksName) + .columnFamilies(cfNames) + .incrementalRepair(incrementalRepair) + .nodes(nodeSet) + .datacenters(datacenters) + .blacklistedTables(blacklistedTables) + .repairThreadCount(repairThreadCount) + .timeout(segmentTimeout)) + .getId(); + RepairRun run = addNewRepairRun(nodeMap, intensity, storage, cf); + final UUID runId = run.getId(); + final UUID segmentId = storage.getNextFreeSegments(run.getId()).get(0).getId(); + assertEquals(storage.getRepairSegment(runId, segmentId).get().getState(), RepairSegment.State.NOT_STARTED); + final JmxProxy jmx = JmxProxyTest.mockJmxProxyImpl(); + when(jmx.getClusterName()).thenReturn(cluster.getName()); + when(jmx.isConnectionAlive()).thenReturn(true); + when(jmx.getRangeToEndpointMap(anyString())).thenReturn(RepairRunnerTest.threeNodeClusterWithIps()); + when(jmx.getEndpointToHostId()).thenReturn(nodeMap); + 
when(jmx.getTokens()).thenReturn(tokens); + EndpointSnitchInfoMBean endpointSnitchInfoMBean = mock(EndpointSnitchInfoMBean.class); + when(endpointSnitchInfoMBean.getDatacenter()).thenReturn("dc1"); + try { + when(endpointSnitchInfoMBean.getDatacenter(anyString())).thenReturn("dc1"); + } catch (UnknownHostException ex) { + throw new AssertionError(ex); + } + JmxProxyTest.mockGetEndpointSnitchInfoMBean(jmx, endpointSnitchInfoMBean); + ClusterFacade clusterFacade = mock(ClusterFacade.class); + when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); + when(clusterFacade.nodeIsAccessibleThroughJmx(any(), any())).thenReturn(true); + when(clusterFacade.tokenRangeToEndpoint(any(), anyString(), any())) + .thenReturn(Lists.newArrayList(nodeSet)); + when(clusterFacade.getRangeToEndpointMap(any(), anyString())) + .thenReturn((Map)ImmutableMap.of( + Lists.newArrayList("0", "100"), Lists.newArrayList(nodeSet), + Lists.newArrayList("100", "200"), Lists.newArrayList(nodeSet))); + when(clusterFacade.getEndpointToHostId(any())).thenReturn(nodeMap); + when(clusterFacade.listActiveCompactions(any())).thenReturn( + CompactionStats.builder() + .withActiveCompactions(Collections.emptyList()) + .withPendingCompactions(Optional.of(0)) + .build()); + context.repairManager = RepairManager.create( + context, + clusterFacade, + Executors.newScheduledThreadPool(10), + 1, + TimeUnit.MILLISECONDS, + 1); + AtomicInteger repairNumberCounter = new AtomicInteger(1); + when(jmx.triggerRepair(any(), any(), any(), any(), any(), anyBoolean(), any(), any(), any(), anyInt())) + .then( + (invocation) -> { /* report STARTED, SESSION_SUCCESS, FINISHED from a new thread */ + final int repairNumber = repairNumberCounter.getAndIncrement(); + new Thread() { + @Override + public void run() { + ((RepairStatusHandler)invocation.getArgument(7)) + .handle( + repairNumber, + Optional.of(ActiveRepairService.Status.STARTED), + Optional.empty(), + null, + jmx); + ((RepairStatusHandler)invocation.getArgument(7)) + .handle( + repairNumber, + 
Optional.of(ActiveRepairService.Status.SESSION_SUCCESS), + Optional.empty(), + null, + jmx); + ((RepairStatusHandler)invocation.getArgument(7)) + .handle( + repairNumber, + Optional.of(ActiveRepairService.Status.FINISHED), + Optional.empty(), + null, + jmx); + } + }.start(); + return repairNumber; + }); + context.jmxConnectionFactory = new JmxConnectionFactory(context, new NoopCrypotograph()) { + @Override + protected JmxProxy connectImpl(Node host) throws ReaperException { + return jmx; + } + }; + ClusterFacade clusterProxy = ClusterFacade.create(context); + ClusterFacade clusterProxySpy = Mockito.spy(clusterProxy); /* NOTE(review): spy is never used after stubbing */ + Mockito.doReturn(nodeSetAfterTopologyChange).when(clusterProxySpy).tokenRangeToEndpoint(any(), any(), any()); + assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRun(runId).get().getRunState()); + storage.updateRepairRun( + run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId)); + // We'll now change the list of replicas for any segment, making the stored ones obsolete + when(clusterFacade.getRangeToEndpointMap(any(), anyString())) + .thenReturn((Map)ImmutableMap.of( + Lists.newArrayList("0", "100"), Lists.newArrayList(nodeSetAfterTopologyChange), + Lists.newArrayList("100", "200"), Lists.newArrayList(nodeSetAfterTopologyChange))); + when(clusterFacade.getEndpointToHostId(any())).thenReturn(nodeMapAfterTopologyChange); + when(clusterFacade.tokenRangeToEndpoint(any(), anyString(), any())) + .thenReturn(Lists.newArrayList(nodeSetAfterTopologyChange)); + context.repairManager.resumeRunningRepairRuns(); + + // The repair run should succeed despite the topology change. + await().with().atMost(20, TimeUnit.SECONDS).until(() -> { + return RepairRun.RunState.DONE == storage.getRepairRun(runId).get().getRunState(); + }); + } + private RepairRun addNewRepairRun( final Map nodeMap, final double intensity,