Skip to content

Commit

Permalink
Add ability to list and abort segments
Browse files Browse the repository at this point in the history
  • Loading branch information
adejanovski committed Feb 8, 2018
1 parent 42784f8 commit 263f4ec
Show file tree
Hide file tree
Showing 34 changed files with 936 additions and 130 deletions.
34 changes: 32 additions & 2 deletions src/packaging/bin/spreaper
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ def _arguments_for_status_keyspace(parser):
parser.add_argument("cluster_name", help="the cluster name")
parser.add_argument("keyspace_name", help="the keyspace name")

def _arguments_for_list_segments(parser):
"""Arguments relevant for querying a repair status"""
parser.add_argument("run_id", help="identifier of the run to fetch segments from")

def _arguments_for_status_repair(parser):
"""Arguments relevant for querying a repair status"""
Expand Down Expand Up @@ -234,6 +237,10 @@ def _arguments_for_abort_repair(parser):
"""Arguments needed for aborting a repair"""
parser.add_argument("run_id", help="ID of the repair run to abort")

def _arguments_for_abort_segment(parser):
"""Arguments needed for aborting a repair"""
parser.add_argument("run_id", help="ID of the repair run to abort")
parser.add_argument("segment_id", help="ID of the segment to abort")

def _arguments_for_start_schedule(parser):
"""Arguments relevant for resuming a repair schedule"""
Expand Down Expand Up @@ -286,6 +293,7 @@ Usage: spreaper [<global_args>] <command> [<command_args>]
list-clusters List all registered Cassandra clusters.
list-runs List registered repair runs.
list-schedules List registered repair schedules.
list-segments List all segments for a given repair run.
status-cluster Show status of a Cassandra cluster,
and any existing repair runs for the cluster.
status-keyspace Show status of a keyspace in a cluster.
Expand All @@ -300,6 +308,7 @@ Usage: spreaper [<global_args>] <command> [<command_args>]
resume-repair Resume a paused, start a not started or reattempt a failed repair run.
pause-repair Pause a repair run.
abort-repair Abort a repair run.
abort-segment Abort a segment.
start-schedule Resume a paused repair schedule.
pause-schedule Pause a repair schedule.
delete-schedule Delete a repair schedule.
Expand Down Expand Up @@ -377,6 +386,17 @@ class ReaperCLI(object):
print "# Found {0} repair runs".format(len(repair_runs))
print json.dumps(repair_runs, indent=2, sort_keys=True)

def list_segments(self):
reaper, args = ReaperCLI.prepare_reaper(
"list-segments",
"List segments for a given repair run",
extra_arguments=_arguments_for_list_segments
)
print "# Listing segments for repair run '{0}'".format(args.run_id)
segments = json.loads(reaper.get("repair_run/{0}/segments".format(args.run_id)))
print "# Found {0} segments".format(len(segments))
print json.dumps(segments, indent=2, sort_keys=True)

def list_schedules(self):
reaper, args = ReaperCLI.prepare_reaper(
"list-schedules",
Expand Down Expand Up @@ -577,6 +597,16 @@ class ReaperCLI(object):
reaper.put("repair_run/{0}".format(args.run_id), state="ABORTED")
print "# Repair run '{0}' aborted".format(args.run_id)

def abort_segment(self):
reaper, args = ReaperCLI.prepare_reaper(
"abort-segment",
"Abort a segment.",
extra_arguments=_arguments_for_abort_segment
)
print "# Aborting a segment with run id: {0} and segment id {1}".format(args.run_id, args.segment_id)
reaper.get("repair_run/{0}/segments/abort/{1}".format(args.run_id, args.segment_id))
print "# Segment '{0}' aborted".format(args.segment_id)

def start_schedule(self):
reaper, args = ReaperCLI.prepare_reaper(
"start-schedule",
Expand Down Expand Up @@ -609,6 +639,6 @@ class ReaperCLI(object):


if __name__ == '__main__':
print("# Report improvements/bugs at https://github.com/spotify/cassandra-reaper/issues")
print("# ------------------------------------------------------------------------------")
print("# Report improvements/bugs at https://github.com/thelastpickle/cassandra-reaper/issues")
print("# ------------------------------------------------------------------------------------")
ReaperCLI()
46 changes: 36 additions & 10 deletions src/server/src/main/java/io/cassandrareaper/core/RepairSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
import java.util.UUID;
import javax.annotation.Nullable;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.common.base.Preconditions;
import org.joda.time.DateTime;

@JsonDeserialize(builder = RepairSegment.Builder.class)
public final class RepairSegment {

private static final boolean STRICT = !Boolean.getBoolean("reaper.disableSegmentChecks");
Expand Down Expand Up @@ -69,10 +73,12 @@ public RingRange getTokenRange() {
return tokenRange;
}

/**
 * Returns the start of this segment's token range.
 *
 * <p>Annotated with {@code @JsonIgnore}, so Jackson does not treat it as a JSON property.
 */
@JsonIgnore
public BigInteger getStartToken() {
return tokenRange.getStart();
}

/**
 * Returns the end of this segment's token range.
 *
 * <p>Annotated with {@code @JsonIgnore}, so Jackson does not treat it as a JSON property.
 */
@JsonIgnore
public BigInteger getEndToken() {
return tokenRange.getEnd();
}
Expand Down Expand Up @@ -127,17 +133,21 @@ public enum State {
DONE
}

@JsonPOJOBuilder(buildMethodName = "build", withPrefix = "with")
public static final class Builder {

public final RingRange tokenRange;
private final UUID repairUnitId;
private UUID repairUnitId;
private RingRange tokenRange;
private UUID id;
private UUID runId;
private int failCount;
private State state;
private String coordinatorHost;
private DateTime startTime;
private DateTime endTime;

private Builder() {}

private Builder(RingRange tokenRange, UUID repairUnitId) {
Preconditions.checkNotNull(tokenRange);
Preconditions.checkNotNull(repairUnitId);
Expand All @@ -149,6 +159,7 @@ private Builder(RingRange tokenRange, UUID repairUnitId) {

private Builder(RepairSegment original) {
runId = original.runId;
id = original.id;
repairUnitId = original.repairUnitId;
tokenRange = original.tokenRange;
failCount = original.failCount;
Expand All @@ -164,23 +175,34 @@ public Builder withRunId(UUID runId) {
return this;
}

public Builder failCount(int failCount) {
/**
 * Sets the repair unit id of the segment being built.
 *
 * @param repairUnitId the repair unit id; a null value is rejected by
 *     {@code Preconditions.checkNotNull}
 * @return this builder, to allow call chaining
 */
public Builder withRepairUnitId(UUID repairUnitId) {
Preconditions.checkNotNull(repairUnitId);
this.repairUnitId = repairUnitId;
return this;
}

/**
 * Sets the token range of the segment being built.
 *
 * @param tokenRange the token range; must not be null
 * @return this builder, to allow call chaining
 * @throws NullPointerException if {@code tokenRange} is null
 */
public Builder withTokenRange(RingRange tokenRange) {
  // Same null check as the (tokenRange, repairUnitId) constructor and withRepairUnitId(..):
  // the segment's getStartToken()/getEndToken() dereference the range unconditionally.
  Preconditions.checkNotNull(tokenRange);
  this.tokenRange = tokenRange;
  return this;
}

/**
 * Sets the fail count of the segment being built.
 *
 * @param failCount the value to store; it is not validated here
 * @return this builder, to allow call chaining
 */
public Builder withFailCount(int failCount) {
this.failCount = failCount;
return this;
}

public Builder state(State state) {
/**
 * Sets the state of the segment being built.
 *
 * @param state the segment state; a null value is rejected by {@code Preconditions.checkNotNull}
 * @return this builder, to allow call chaining
 */
public Builder withState(State state) {
Preconditions.checkNotNull(state);
this.state = state;
return this;
}

public Builder coordinatorHost(@Nullable String coordinatorHost) {
/**
 * Sets the coordinator host of the segment being built.
 *
 * @param coordinatorHost the coordinator host, or null (the parameter is {@code @Nullable})
 * @return this builder, to allow call chaining
 */
public Builder withCoordinatorHost(@Nullable String coordinatorHost) {
this.coordinatorHost = coordinatorHost;
return this;
}

public Builder startTime(DateTime startTime) {
public Builder withStartTime(DateTime startTime) {
Preconditions.checkState(
null != startTime || null == endTime,
"unsetting startTime only permitted if endTime unset");
Expand All @@ -189,13 +211,17 @@ public Builder startTime(DateTime startTime) {
return this;
}

public Builder endTime(DateTime endTime) {
Preconditions.checkNotNull(endTime);
/**
 * Sets the end time of the segment being built.
 *
 * @param endTime the end time; no null check is applied in this method
 * @return this builder, to allow call chaining
 */
public Builder withEndTime(DateTime endTime) {
this.endTime = endTime;
return this;
}

public RepairSegment build(@Nullable UUID segmentId) {
/**
 * Sets the id of the segment being built.
 *
 * @param segmentId the segment id, or null (the parameter is {@code @Nullable})
 * @return this builder, to allow call chaining
 */
public Builder withId(@Nullable UUID segmentId) {
this.id = segmentId;
return this;
}

public RepairSegment build() {
// a null segmentId is a special case where the storage uses a sequence for it
Preconditions.checkNotNull(runId);
if (STRICT) {
Expand All @@ -214,7 +240,7 @@ public RepairSegment build(@Nullable UUID segmentId) {
}
}

return new RepairSegment(this, segmentId);
return new RepairSegment(this, this.id);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,53 @@ public Response getRepairRun(
}
}

/**
 * Lists the segments of a repair run.
 *
 * @param repairRunId id of the repair run whose segments are requested
 * @return 200 with the run's segments as entity, or 404 when no repair run has that id
 */
@GET
@Path("/{id}/segments")
public Response getRepairRunSegments(@PathParam("id") UUID repairRunId) {

  LOG.debug("get repair_run segments called with: id = {}", repairRunId);
  final Optional<RepairRun> repairRun = context.storage.getRepairRun(repairRunId);
  if (!repairRun.isPresent()) {
    return Response.status(Response.Status.NOT_FOUND)
        .entity("repair run with id " + repairRunId + " doesn't exist")
        .build();
  }
  Collection<RepairSegment> segments = context.storage.getRepairSegmentsForRun(repairRunId);
  return Response.ok().entity(segments).build();
}

/**
 * Aborts a single segment of a repair run.
 *
 * <p>The abort is only attempted while the run is RUNNING or PAUSED.
 *
 * <p>NOTE(review): this endpoint changes state but is exposed through GET, and the method name
 * is shared with the segment listing endpoint. Both are kept as-is because the spreaper CLI
 * calls this path with a GET; switching to PUT/POST needs a coordinated client change.
 *
 * @param repairRunId id of the repair run that owns the segment
 * @param segmentId id of the segment to abort
 * @return 200 with the segment returned by the repair manager, 403 when the run is neither
 *     RUNNING nor PAUSED, or 404 when the run or the segment doesn't exist
 */
@GET
@Path("/{id}/segments/abort/{segment_id}")
public Response getRepairRunSegments(
    @PathParam("id") UUID repairRunId, @PathParam("segment_id") UUID segmentId) {

  LOG.debug("abort segment called with: run id = {} and segment id = {}", repairRunId, segmentId);
  final Optional<RepairRun> repairRun = context.storage.getRepairRun(repairRunId);
  if (!repairRun.isPresent()) {
    return Response.status(Response.Status.NOT_FOUND)
        .entity("repair run with id " + repairRunId + " doesn't exist")
        .build();
  }

  final RepairRun.RunState runState = repairRun.get().getRunState();
  if (RepairRun.RunState.RUNNING != runState && RepairRun.RunState.PAUSED != runState) {
    return Response.status(Response.Status.FORBIDDEN)
        .entity("Cannot abort segment on repair run with status " + runState)
        .build();
  }

  // RepairManager.abortSegment(..) unwraps the stored segment without a presence check,
  // so an unknown segment id is answered here with a 404 instead of a server error.
  if (!context.storage.getRepairSegment(repairRunId, segmentId).isPresent()) {
    return Response.status(Response.Status.NOT_FOUND)
        .entity(
            "repair segment with id " + segmentId + " doesn't exist in repair run " + repairRunId)
        .build();
  }

  RepairSegment segment = context.repairManager.abortSegment(repairRunId, segmentId);
  return Response.ok().entity(segment).build();
}

/**
* @return all known repair runs for a cluster.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.cassandrareaper.jmx.JmxProxy;
import io.cassandrareaper.storage.IDistributedStorage;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -34,7 +35,6 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -118,7 +118,7 @@ public void resumeRunningRepairRuns() throws ReaperException {
context.storage.getSegmentsWithState(
pausedRepairRun.getId(), RepairSegment.State.RUNNING);

abortSegments(runningSegments, pausedRepairRun, false);
abortSegments(runningSegments, pausedRepairRun, false, false);
}

if (!repairRunners.containsKey(pausedRepairRun.getId())) {
Expand Down Expand Up @@ -157,31 +157,42 @@ private void abortSegmentsWithNoLeader(RepairRun repairRun, Collection<RepairSeg
.filter(segment -> !activeLeaders.contains(segment.getId()))
.collect(Collectors.toSet()),
repairRun,
false,
true);
}
}

void abortSegments(
Collection<RepairSegment> runningSegments,
RepairRun repairRun) {
public RepairSegment abortSegment(UUID repairRunId, UUID segmentId) {
RepairSegment segment = context.storage.getRepairSegment(repairRunId, segmentId).get();
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
if (context.storage instanceof IDistributedStorage) {
((IDistributedStorage) context.storage).forceReleaseLead(segmentId);
((IDistributedStorage) context.storage).takeLead(segmentId);
}
if (null == segment.getCoordinatorHost() || RepairSegment.State.DONE == segment.getState()) {
SegmentRunner.postponeSegment(context, segment);
} else {
abortSegments(Arrays.asList(segment), repairRun, true, false);
}

abortSegments(
runningSegments,
repairRun,
false);
return context.storage.getRepairSegment(repairRunId, segmentId).get();
}

void abortSegments(
/**
 * Convenience overload: delegates to the four-argument {@code abortSegments} with both
 * {@code forced} and {@code postponeWithoutAborting} set to false.
 */
void abortSegments(Collection<RepairSegment> runningSegments, RepairRun repairRun) {
abortSegments(runningSegments, repairRun, false, false);
}

public void abortSegments(
Collection<RepairSegment> runningSegments,
RepairRun repairRun,
boolean forced,
boolean postponeWithoutAborting) {

RepairUnit repairUnit = context.storage.getRepairUnit(repairRun.getRepairUnitId()).get();
for (RepairSegment segment : runningSegments) {
LOG.debug(
"Trying to abort stuck segment {} in repair run {}", segment.getId(), repairRun.getId());
UUID leaderElectionId = repairUnit.getIncrementalRepair() ? repairRun.getId() : segment.getId();
if (takeLead(context, leaderElectionId) || renewLead(context, leaderElectionId)) {
if (forced || takeLead(context, leaderElectionId) || renewLead(context, leaderElectionId)) {
// refresh segment once we're inside leader-election
segment = context.storage.getRepairSegment(repairRun.getId(), segment.getId()).get();
if (RepairSegment.State.RUNNING == segment.getState()) {
Expand All @@ -199,7 +210,7 @@ void abortSegments(
+ "Postponing the segment.",
segment.getId(),
e);
SegmentRunner.postpone(context, segment, Optional.fromNullable(repairUnit));
SegmentRunner.postponeSegment(context, segment);
} finally {
// if someone else does hold the lease, ie renewLead(..) was true,
// then their writes to repair_run table and any call to releaseLead(..) will throw an exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ private static List<RepairSegment.Builder> createRepairSegmentsForIncrementalRep
.forEach(
range
-> repairSegmentBuilders.add(
RepairSegment.builder(range.getValue(), repairUnit.getId()).coordinatorHost(range.getKey())));
RepairSegment.builder(range.getValue(), repairUnit.getId()).withCoordinatorHost(range.getKey())));

return repairSegmentBuilders;
}
Expand Down
Loading

0 comments on commit 263f4ec

Please sign in to comment.