Topology change resilience for incremental repair #1235

Merged (22 commits), Nov 16, 2022
Commits (22)
0593f56
Migration file to add host_id to repair_run table.
Miles-Garnsey Nov 8, 2022
52f1261
Changes to CassandraStorage to read/write new hostID field.
Miles-Garnsey Nov 8, 2022
95f0eae
Changes to RepairSegment class to include new hostID field.
Miles-Garnsey Nov 8, 2022
72a53d3
RepairRunService now adds in hostID for every repair RepairSegment it…
Miles-Garnsey Nov 8, 2022
92643af
RepairRunner now consults the hostID field for the nominated segment …
Miles-Garnsey Nov 8, 2022
f4f931c
Test for topology change when running incremental repairs.
Miles-Garnsey Nov 8, 2022
f5ae5bd
Make checkstyle happy.
Miles-Garnsey Nov 8, 2022
91ef907
Tweaks to the way hostID is instantiated on RepairSegment.
Miles-Garnsey Nov 8, 2022
7d2a410
Better mocks for the test.
Miles-Garnsey Nov 8, 2022
d25a9e4
Update mocks in failIncrRepairRunCreationTest to cater to new use of …
Miles-Garnsey Nov 8, 2022
8f74d82
HostID is only added where the repair run is incremental.
Miles-Garnsey Nov 9, 2022
6fa57f7
Tweak error messages in RepairRunner.
Miles-Garnsey Nov 9, 2022
0431d5b
Remove extra if statement from createRepairSegmentsForIncrementalRepair.
Miles-Garnsey Nov 10, 2022
16f0052
Test method `addNewRepairRun` needs to set the hostID for incremental…
Miles-Garnsey Nov 10, 2022
c186b9a
Fix up `testDontFailRepairAfterTopologyChangeIncrementalRepair` so th…
Miles-Garnsey Nov 10, 2022
12a15f6
`addNewRepairRun` was still missing a hostID on its second returned e…
Miles-Garnsey Nov 10, 2022
248c9ae
Don't throw exceptions when there are no potential replicas.
Miles-Garnsey Nov 10, 2022
0989743
Comments and optimisations.
Miles-Garnsey Nov 10, 2022
04f2d5d
Checkstyle...
Miles-Garnsey Nov 11, 2022
e8a2d04
Ensure createRepairSegmentFromRow storage function hydrates the hostI…
Miles-Garnsey Nov 13, 2022
aff7669
(Courtesy of Alexander Dejanovski) ensure that the segmentReplicas ar…
Miles-Garnsey Nov 16, 2022
4a0d51a
Change some info level logs to debug.
Miles-Garnsey Nov 16, 2022
@@ -44,6 +44,9 @@ public final class RepairSegment {
private final DateTime startTime;
private final DateTime endTime;
private final Map<String, String> replicas;
// hostID field is only ever populated for incremental repairs. For full repairs it is always null.
private final UUID hostID;


private RepairSegment(Builder builder, @Nullable UUID id) {
this.id = id;
@@ -58,6 +61,7 @@ private RepairSegment(Builder builder, @Nullable UUID id) {
this.replicas = builder.replicas != null
? ImmutableMap.copyOf(builder.replicas)
: null;
this.hostID = builder.hostID;
}

public static Builder builder(Segment tokenRange, UUID repairUnitId) {
@@ -129,6 +133,10 @@ public boolean hasEndTime() {
return null != endTime;
}

public UUID getHostID() {
return hostID;
}

/** Reset to NOT_STARTED state, with nulled startTime and endTime. */
public Builder reset() {
Builder builder = new Builder(this);
@@ -159,6 +167,7 @@ public static final class Builder {
private DateTime startTime;
private DateTime endTime;
private Map<String, String> replicas;
private UUID hostID;

private Builder() {}

@@ -170,6 +179,7 @@ private Builder(Segment tokenRange, UUID repairUnitId) {
this.failCount = 0;
this.state = State.NOT_STARTED;
this.replicas = tokenRange.getReplicas();
this.hostID = null;
}

private Builder(RepairSegment original) {
@@ -183,6 +193,7 @@ private Builder(RepairSegment original) {
startTime = original.startTime;
endTime = original.endTime;
replicas = original.replicas;
hostID = original.hostID;
}

public Builder withRunId(UUID runId) {
@@ -242,6 +253,12 @@ public Builder withReplicas(Map<String, String> replicas) {
return this;
}

public Builder withHostID(UUID hostID) {
this.hostID = hostID;
return this;
}


public RepairSegment build() {
// a null segmentId is a special case where the storage uses a sequence for it
Preconditions.checkNotNull(runId);
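For orientation, here is a minimal sketch of how the new `withHostID`/`getHostID` pair on `RepairSegment` might be exercised. This is illustrative only: the sketch class name, the package imports, the token values, and the coordinator address are assumptions, and in the PR the builders are actually driven by `RepairRunService` (next file).

```java
import io.cassandrareaper.core.RepairSegment;
import io.cassandrareaper.core.Segment;
import io.cassandrareaper.service.RingRange; // package path assumed

import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;

final class HostIdBuilderSketch {

  // Builds an incremental-repair segment pinned to a specific host ID.
  static RepairSegment.Builder incrementalSegment(UUID repairUnitId, UUID coordinatorHostId) {
    return RepairSegment.builder(
            Segment.builder()
                .withTokenRanges(Arrays.asList(new RingRange("1", "2"))) // placeholder tokens
                .build(),
            repairUnitId)
        .withReplicas(Collections.emptyMap())
        .withCoordinatorHost("127.0.0.1") // placeholder endpoint
        // Only incremental repairs pin a host ID; full repairs leave it null.
        .withHostID(coordinatorHostId);
  }
}
```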
@@ -30,14 +30,17 @@
import io.cassandrareaper.jmx.JmxProxy;

import java.math.BigInteger;

import java.util.Arrays;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@@ -124,7 +127,7 @@ public RepairRun registerRepairRun(

// the last preparation step is to generate actual repair segments
List<RepairSegment.Builder> segmentBuilders = repairUnit.getIncrementalRepair()
? createRepairSegmentsForIncrementalRepair(nodes, repairUnit)
? createRepairSegmentsForIncrementalRepair(nodes, repairUnit, cluster, clusterFacade)
: createRepairSegments(tokenSegments, repairUnit);

RepairRun repairRun = context.storage.addRepairRun(runBuilder, segmentBuilders);
@@ -333,23 +336,27 @@ private static List<RepairSegment.Builder> createRepairSegments(
@VisibleForTesting
static List<RepairSegment.Builder> createRepairSegmentsForIncrementalRepair(
Map<String, RingRange> nodes,
RepairUnit repairUnit) {
RepairUnit repairUnit,
Cluster cluster,
ClusterFacade clusterFacade) throws ReaperException {

Map<String, String> endpointHostIdMap = clusterFacade.getEndpointToHostId(cluster);

List<RepairSegment.Builder> repairSegmentBuilders = Lists.newArrayList();

nodes
.entrySet()
.forEach(
range ->
repairSegmentBuilders.add(
RepairSegment.builder(
Segment.builder()
.withTokenRanges(Arrays.asList(range.getValue()))
.build(),
repairUnit.getId())
.withReplicas(Collections.emptyMap())
.withCoordinatorHost(range.getKey())));

.forEach(range -> {
RepairSegment.Builder segment = RepairSegment.builder(
Segment.builder()
.withTokenRanges(Arrays.asList(range.getValue()))
.build(),
repairUnit.getId())
.withReplicas(Collections.emptyMap())
.withCoordinatorHost(range.getKey())
.withHostID(UUID.fromString(endpointHostIdMap.get(range.getKey())));
repairSegmentBuilders.add(segment);
});
return repairSegmentBuilders;
}

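One detail worth flagging in the hunk above: `UUID.fromString(endpointHostIdMap.get(range.getKey()))` throws a NullPointerException if a coordinator endpoint is missing from the map returned by `getEndpointToHostId`. A hedged sketch of a guarded lookup is shown below; it is not part of this PR, and the helper name is invented.

```java
import io.cassandrareaper.ReaperException; // package path assumed

import java.util.Map;
import java.util.UUID;

final class HostIdLookupSketch {

  /** Resolves the host ID for an endpoint, failing with a descriptive error if it is unknown. */
  static UUID hostIdForEndpoint(Map<String, String> endpointHostIdMap, String endpoint)
      throws ReaperException {
    String hostId = endpointHostIdMap.get(endpoint);
    if (hostId == null) {
      // Fail loudly instead of letting UUID.fromString(null) throw a NullPointerException.
      throw new ReaperException("No host ID known for endpoint " + endpoint);
    }
    return UUID.fromString(hostId);
  }
}
```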
@@ -35,7 +35,6 @@
import io.cassandrareaper.storage.IDistributedStorage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -462,13 +461,24 @@ private void startNextSegment() throws ReaperException, InterruptedException {
repairRunId);

Optional<RepairSegment> nextRepairSegment = Optional.empty();
Collection<String> potentialReplicas = new HashSet<>();
final Collection<String> potentialReplicas = new HashSet<>();
for (RepairSegment segment : nextRepairSegments) {
Map<String, String> potentialReplicaMap = this.repairRunService.getDCsByNodeForRepairSegment(
cluster, segment.getTokenRange(), repairUnit.getKeyspaceName(), repairUnit);
potentialReplicas = repairUnit.getIncrementalRepair()
? Collections.singletonList(segment.getCoordinatorHost())
: potentialReplicaMap.keySet();
if (repairUnit.getIncrementalRepair()) {
Map<String, String> endpointHostIdMap = clusterFacade.getEndpointToHostId(cluster);
if (segment.getHostID() == null) {
throw new ReaperException(
String.format("No host ID for repair segment %s", segment.getId().toString())
);
}
endpointHostIdMap.entrySet().stream()
.filter(entry -> entry.getValue().equals(segment.getHostID().toString()))
.forEach(entry -> potentialReplicas.add(entry.getKey()));
} else {
potentialReplicas.addAll(potentialReplicaMap.keySet());
}
LOG.debug("Potential replicas for segment {}: {}", segment.getId(), potentialReplicas);
JmxProxy coordinator = clusterFacade.connect(cluster, potentialReplicas);
if (nodesReadyForNewRepair(coordinator, segment, potentialReplicaMap, repairRunId)) {
nextRepairSegment = Optional.of(segment);
@@ -533,6 +543,7 @@ private boolean nodesReadyForNewRepair(
UUID segmentId) {

Collection<String> nodes = getNodesInvolvedInSegment(dcByNode);
LOG.debug("Nodes involved in segment {}: {}", segmentId, nodes);
String dc = EndpointSnitchInfoProxy.create(coordinator).getDataCenter();
boolean requireAllHostMetrics = DatacenterAvailability.LOCAL != context.config.getDatacenterAvailability();
boolean allLocalDcHostsChecked = true;
@@ -577,6 +588,7 @@ private boolean nodesReadyForNewRepair(
LOG.debug("Ok to repair segment '{}' on repair run with id '{}'", segment.getId(), segment.getRunId());
return true;
} else {
LOG.debug("Couldn't get metrics for hosts {}, will retry later", unreachableNodes);
String msg = String.format(
"Postponed repair segment %s on repair run with id %s because we couldn't get %shosts metrics on %s",
segment.getId(),
@@ -635,7 +647,7 @@ private boolean repairSegment(final UUID segmentId, Segment segment, Collection<
String keyspace = repairUnit.getKeyspaceName();
LOG.debug("preparing to repair segment {} on run with id {}", segmentId, repairRunId);

List<String> potentialCoordinators;
List<String> potentialCoordinators = Lists.newArrayList();
if (!repairUnit.getIncrementalRepair()) {
// full repair
try {
@@ -675,7 +687,7 @@ private boolean repairSegment(final UUID segmentId, Segment segment, Collection<
Thread.sleep(ThreadLocalRandom.current().nextInt(10, 100) * 100);
Optional<RepairSegment> rs = context.storage.getRepairSegment(repairRunId, segmentId);
if (rs.isPresent()) {
potentialCoordinators = Arrays.asList(rs.get().getCoordinatorHost());
potentialCoordinators.addAll(segmentReplicas);
} else {
// the segment has been removed. should only happen in tests on backends that delete repair segments.
return false;
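To summarise the replica-resolution change above: for incremental repairs the runner no longer trusts the stored coordinator address and instead reverse-maps the segment's host ID onto whichever endpoint currently owns it. A standalone sketch of that lookup with plain collections follows (the class and method names are assumed, not taken from the PR):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

final class ReplicaLookupSketch {

  /** Returns every endpoint whose host ID matches the one stored on the segment. */
  static Set<String> replicasForHostId(Map<String, String> endpointToHostId, UUID segmentHostId) {
    Set<String> replicas = new HashSet<>();
    for (Map.Entry<String, String> entry : endpointToHostId.entrySet()) {
      if (entry.getValue().equals(segmentHostId.toString())) {
        replicas.add(entry.getKey());
      }
    }
    return replicas;
  }
}
```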
@@ -444,34 +444,34 @@ private void prepareStatements() {
.prepare(
"INSERT INTO repair_run"
+ "(id,segment_id,repair_unit_id,start_token,end_token,"
+ " segment_state,fail_count, token_ranges, replicas)"
+ " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)")
+ " segment_state,fail_count, token_ranges, replicas,host_id)"
+ " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
insertRepairSegmentIncrementalPrepStmt = session
.prepare(
"INSERT INTO repair_run"
+ "(id,segment_id,repair_unit_id,start_token,end_token,"
+ "segment_state,coordinator_host,fail_count,replicas)"
+ " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)")
+ "segment_state,coordinator_host,fail_count,replicas,host_id)"
+ " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
updateRepairSegmentPrepStmt = session
.prepare(
"INSERT INTO repair_run"
+ "(id,segment_id,segment_state,coordinator_host,segment_start_time,fail_count)"
+ " VALUES(?, ?, ?, ?, ?, ?)")
+ "(id,segment_id,segment_state,coordinator_host,segment_start_time,fail_count,host_id)"
+ " VALUES(?, ?, ?, ?, ?, ?, ?)")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
insertRepairSegmentEndTimePrepStmt = session
.prepare("INSERT INTO repair_run(id, segment_id, segment_end_time) VALUES(?, ?, ?)")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
getRepairSegmentPrepStmt = session
.prepare(
"SELECT id,repair_unit_id,segment_id,start_token,end_token,segment_state,coordinator_host,"
+ "segment_start_time,segment_end_time,fail_count, token_ranges, replicas"
+ "segment_start_time,segment_end_time,fail_count, token_ranges, replicas, host_id"
+ " FROM repair_run WHERE id = ? and segment_id = ?")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
getRepairSegmentsByRunIdPrepStmt = session.prepare(
"SELECT id,repair_unit_id,segment_id,start_token,end_token,segment_state,coordinator_host,segment_start_time,"
+ "segment_end_time,fail_count, token_ranges, replicas FROM repair_run WHERE id = ?");
+ "segment_end_time,fail_count, token_ranges, replicas, host_id FROM repair_run WHERE id = ?");
getRepairSegmentCountByRunIdPrepStmt = session.prepare("SELECT count(*) FROM repair_run WHERE id = ?");
prepareScheduleStatements();
prepareLeaderElectionStatements(timeUdf);
@@ -504,7 +504,7 @@ private void prepareStatements() {
try {
getRepairSegmentsByRunIdAndStatePrepStmt = session.prepare(
"SELECT id,repair_unit_id,segment_id,start_token,end_token,segment_state,coordinator_host,"
+ "segment_start_time,segment_end_time,fail_count, token_ranges, replicas FROM repair_run "
+ "segment_start_time,segment_end_time,fail_count, token_ranges, replicas, host_id FROM repair_run "
+ "WHERE id = ? AND segment_state = ? ALLOW FILTERING");
getRepairSegmentCountByRunIdAndStatePrepStmt = session.prepare(
"SELECT count(segment_id) FROM repair_run WHERE id = ? AND segment_state = ? ALLOW FILTERING");
@@ -817,7 +817,10 @@ public RepairRun addRepairRun(Builder repairRun, Collection<RepairSegment.Builde
segment.getState().ordinal(),
segment.getCoordinatorHost(),
segment.getFailCount(),
segment.getReplicas()));
segment.getReplicas(),
segment.getHostID()
)
);
} else {
try {
repairRunBatch.add(
@@ -830,7 +833,10 @@
segment.getState().ordinal(),
segment.getFailCount(),
objectMapper.writeValueAsString(segment.getTokenRange().getTokenRanges()),
segment.getReplicas()));
segment.getReplicas(),
segment.getHostID()
)
);
} catch (JsonProcessingException e) {
throw new IllegalStateException(e);
}
@@ -1134,7 +1140,10 @@ public boolean updateRepairSegmentUnsafe(RepairSegment segment) {
segment.getState().ordinal(),
segment.getCoordinatorHost(),
segment.hasStartTime() ? segment.getStartTime().toDate() : null,
segment.getFailCount()));
segment.getFailCount(),
segment.getHostID()
)
);

if (null != segment.getEndTime() || State.NOT_STARTED == segment.getState()) {

@@ -1228,6 +1237,10 @@ private static RepairSegment createRepairSegmentFromRow(Row segmentRow) {
builder = builder.withReplicas(segmentRow.getMap("replicas", String.class, String.class));
}

if (null != segmentRow.getUUID("host_id")) {
builder = builder.withHostID(segmentRow.getUUID("host_id"));
}

return builder.withId(segmentRow.getUUID("segment_id")).build();
}

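The hydration change above relies on the Datastax 3.x driver returning `null` from `Row.getUUID` when the `host_id` column was never written, which is the case for all pre-existing rows and for full-repair segments. A small sketch of that guard as a reusable helper (illustrative only; this helper does not exist in the PR):

```java
import io.cassandrareaper.core.RepairSegment;

import com.datastax.driver.core.Row;

import java.util.UUID;

final class HostIdHydrationSketch {

  /** Applies host_id to the builder only when the column actually holds a value. */
  static RepairSegment.Builder withOptionalHostId(RepairSegment.Builder builder, Row segmentRow) {
    UUID hostId = segmentRow.getUUID("host_id"); // null when the column was never written
    return hostId != null ? builder.withHostID(hostId) : builder;
  }
}
```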
18 changes: 18 additions & 0 deletions src/server/src/main/resources/db/cassandra/031_add_hostID.cql
@@ -0,0 +1,18 @@
--
-- Copyright 2021-2021 Datastax inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
-- Add the host_id column to repair_run so incremental repair segments can track their target replica across topology changes

ALTER TABLE repair_run ADD host_id uuid;
@@ -38,11 +38,13 @@
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Semaphore;

import com.datastax.driver.core.utils.UUIDs;
@@ -493,6 +495,12 @@ public void failIncrRepairRunCreationTest() throws ReaperException, UnknownHostE
.thenReturn((Map)ImmutableMap.of(Lists.newArrayList("0", "100"), Lists.newArrayList(NODES)));
when(clusterFacade.getCassandraVersion(any())).thenReturn("3.11.6");
when(clusterFacade.getTokens(any())).thenReturn(TOKENS);
when(clusterFacade.getEndpointToHostId(any(Cluster.class))).thenReturn(Collections.emptyMap());
Map<String, String> endpointToHostIDMap = new HashMap<String, String>();
endpointToHostIDMap.put("127.0.0.1", UUID.randomUUID().toString());
endpointToHostIDMap.put("127.0.0.2", UUID.randomUUID().toString());
endpointToHostIDMap.put("127.0.0.3", UUID.randomUUID().toString());
when(clusterFacade.getEndpointToHostId(any(Cluster.class))).thenReturn(endpointToHostIDMap);

RepairRunService repairRunService = RepairRunService.create(context, () -> clusterFacade);

@@ -634,7 +642,7 @@ public JmxProxy connectImpl(Node host) throws ReaperException {
}

@Test
public void createRepairSegmentsForIncrementalRepairTest() {
public void createRepairSegmentsForIncrementalRepairTest() throws ReaperException {
final String KS_NAME = "reaper";
final Set<String> CF_NAMES = Sets.newHashSet("reaper");
final boolean INCREMENTAL_REPAIR = false;
@@ -643,7 +651,7 @@ public void createRepairSegmentsForIncrementalRepairTest() {
final Set<String> BLACKLISTED_TABLES = Collections.emptySet();
final int REPAIR_THREAD_COUNT = 1;
final int segmentTimeout = 30;
Cluster cluster = Cluster.builder()
final Cluster cluster = Cluster.builder()
.withName("test_" + RandomStringUtils.randomAlphabetic(12))
.withSeedHosts(ImmutableSet.of("127.0.0.1", "127.0.0.2", "127.0.0.3"))
.withState(Cluster.State.ACTIVE)
@@ -654,6 +662,12 @@ public void createRepairSegmentsForIncrementalRepairTest() {
nodes.put("127.0.0.1", new RingRange("1", "2"));
nodes.put("127.0.0.2", new RingRange("3", "4"));

Map<String, String> endpointToHostIDMap = new HashMap<String, String>();
endpointToHostIDMap.put("127.0.0.1", UUID.randomUUID().toString());
endpointToHostIDMap.put("127.0.0.2", UUID.randomUUID().toString());
endpointToHostIDMap.put("127.0.0.3", UUID.randomUUID().toString());
ClusterFacade clusterFacade = mock(ClusterFacade.class);
when(clusterFacade.getEndpointToHostId(any(Cluster.class))).thenReturn(endpointToHostIDMap);
RepairUnit repairUnit = RepairUnit.builder()
.clusterName(cluster.getName())
.keyspaceName(KS_NAME)
@@ -665,9 +679,8 @@ public void createRepairSegmentsForIncrementalRepairTest() {
.repairThreadCount(REPAIR_THREAD_COUNT)
.incrementalRepair(true)
.timeout(segmentTimeout).build(UUIDs.timeBased());

List<RepairSegment.Builder> segmentBuilders
= RepairRunService.createRepairSegmentsForIncrementalRepair(nodes, repairUnit);
= RepairRunService.createRepairSegmentsForIncrementalRepair(nodes, repairUnit, cluster, clusterFacade);
assertEquals("Not enough segment builders were created", 2, segmentBuilders.size());
}
