Skip to content

Commit

Permalink
Merge pull request #25 from varjoranta/master
Browse files Browse the repository at this point in the history
add fail count for repair segment
  • Loading branch information
Bj0rnen committed Jan 20, 2015
2 parents f1fbd59 + 88bb14b commit 0749e6f
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 38 deletions.
2 changes: 1 addition & 1 deletion bin/spreaper
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class ReaperCLI(object):
print 'Unrecognized command: {0}'.format(command)
print REAPER_USAGE
exit(1)
# use dispatch pattern to invoke method with same)
# use dispatch pattern to invoke method with same name as given command
getattr(self, command)()

def init_reaper(self, args):
Expand Down
2 changes: 1 addition & 1 deletion debian/cassandra-reaper.upstart
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ script
exec /usr/bin/cassandra-reaper
end script

# prevent respawning more than once every 5 seconds
# prevent re-spawning more than once every 5 seconds
post-stop exec sleep 5
5 changes: 3 additions & 2 deletions src/main/db/reaper_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ CREATE TABLE IF NOT EXISTS "repair_segment" (
-- see (Java) RepairSegment.State for state values
"state" SMALLINT NOT NULL,
"start_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
"end_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL
"end_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
"fail_count" INT NOT NULL DEFAULT 0
);
CREATE INDEX "repair_segment_run_id_start_token_idx"
ON "repair_segment" USING BTREE ("run_id" ASC, "start_token" ASC);
ON "repair_segment" USING BTREE ("run_id" DESC, "fail_count" ASC, "start_token" ASC);
CREATE INDEX "repair_segment_state_idx"
ON "repair_segment" USING BTREE ("state");

Expand Down
9 changes: 5 additions & 4 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,11 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys
/**
* Invoked when the MBean this class listens to publishes an event.
*
* We're only interested in repair-related events. Their format is explained at {@link
* org.apache.cassandra.service.StorageServiceMBean#forceRepairAsync} The format is: notification
* type: "repair" notification userData: int array of length 2 where [0] = command number [1] =
* ordinal of AntiEntropyService.Status
* We're only interested in repair-related events. Their format is explained at
* {@link org.apache.cassandra.service.StorageServiceMBean#forceRepairAsync}
* The format is: notification type: "repair" notification
* userData: int array of length 2 where [0] = command number
* [1] = ordinal of AntiEntropyService.Status
*/
@Override
public void handleNotification(Notification notification, Object handback) {
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/com/spotify/reaper/core/RepairSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class RepairSegment {
private final long runId;
private final RingRange tokenRange;
private final State state;
private final int failCount;
private final DateTime startTime;
private final DateTime endTime;

Expand Down Expand Up @@ -62,6 +63,10 @@ public State getState() {
return state;
}

public int getFailCount() {
return failCount;
}

public DateTime getStartTime() {
return startTime;
}
Expand All @@ -86,6 +91,7 @@ private RepairSegment(Builder builder, long id) {
this.state = builder.state;
this.startTime = builder.startTime;
this.endTime = builder.endTime;
this.failCount = builder.failCount;
}

public Builder with() {
Expand All @@ -98,6 +104,7 @@ public static class Builder {
public final RingRange tokenRange;
private final long columnFamilyId;
private State state;
private int failCount;
private Integer repairCommandId;
private DateTime startTime;
private DateTime endTime;
Expand All @@ -107,12 +114,14 @@ public Builder(long runId, RingRange tokenRange, long columnFamilyId) {
this.tokenRange = tokenRange;
this.columnFamilyId = columnFamilyId;
this.state = State.NOT_STARTED;
this.failCount = 0;
}

private Builder(RepairSegment original) {
runId = original.runId;
tokenRange = original.tokenRange;
state = original.state;
failCount = original.failCount;
columnFamilyId = original.columnFamilyId;
repairCommandId = original.repairCommandId;
startTime = original.startTime;
Expand All @@ -124,6 +133,11 @@ public Builder state(State state) {
return this;
}

public Builder failCount(int failCount) {
this.failCount = failCount;
return this;
}

public Builder repairCommandId(Integer repairCommandId) {
this.repairCommandId = repairCommandId;
return this;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/spotify/reaper/resources/PingResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public class PingResource {
private static final Logger LOG = LoggerFactory.getLogger(ClusterResource.class);

@GET
public String answerPing(@QueryParam("name") Optional<String> name) {
LOG.info("ping called with name: {}", name);
return String.format("Ping %s", name.or("stranger"));
public String answerPing() {
LOG.info("ping called");
return String.format("Cassandra Reaper ping resource replies to you in plain text: PING");
}

}
49 changes: 27 additions & 22 deletions src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,25 @@ public final class SegmentRunner implements RepairStatusHandler {
private final JmxProxy jmxConnection;


public static void triggerRepair(IStorage storage, long segmentId, Collection<String> potentialCoordinators, long timeoutMillis, JmxConnectionFactory jmxConnectionFactory)
public static void triggerRepair(IStorage storage, long segmentId,
Collection<String> potentialCoordinators, long timeoutMillis,
JmxConnectionFactory jmxConnectionFactory)
throws ReaperException, InterruptedException {
new SegmentRunner(storage, segmentId, potentialCoordinators, jmxConnectionFactory).awaitOutcome(
timeoutMillis);
new SegmentRunner(storage, segmentId, potentialCoordinators, jmxConnectionFactory)
.awaitOutcome(timeoutMillis);
}

private SegmentRunner(IStorage storage, long segmentId, Collection<String> potentialCoordinators, JmxConnectionFactory jmxConnectionFactory)
private SegmentRunner(IStorage storage, long segmentId, Collection<String> potentialCoordinators,
JmxConnectionFactory jmxConnectionFactory)
throws ReaperException {
this.storage = storage;
this.segmentId = segmentId;

// TODO: don't trigger the repair in the constructor. The change will force commandId to be
// TODO: mutable, but that's better than this.
synchronized (this) {
jmxConnection = jmxConnectionFactory.connectAny(Optional.<RepairStatusHandler>of(this), potentialCoordinators);
jmxConnection = jmxConnectionFactory
.connectAny(Optional.<RepairStatusHandler>of(this), potentialCoordinators);

RepairSegment segment = storage.getRepairSegment(segmentId);
ColumnFamily columnFamily =
Expand All @@ -63,13 +67,13 @@ private SegmentRunner(IStorage storage, long segmentId, Collection<String> poten
assert !segment.getState().equals(RepairSegment.State.RUNNING);
commandId = jmxConnection
.triggerRepair(segment.getStartToken(), segment.getEndToken(), keyspace,
columnFamily.getName());
columnFamily.getName());
LOG.debug("Triggered repair with command id {}", commandId);
LOG.info("Repair for segment {} started", segmentId);
storage.updateRepairSegment(segment.with()
.state(RepairSegment.State.RUNNING)
.repairCommandId(commandId)
.build(segmentId));
.state(RepairSegment.State.RUNNING)
.repairCommandId(commandId)
.build(segmentId));
}
}

Expand Down Expand Up @@ -97,10 +101,10 @@ private synchronized void awaitOutcome(long timeoutMillis)
private synchronized void abort(RepairSegment segment) {
LOG.warn("Aborting command {} on segment {}", commandId, segmentId);
storage.updateRepairSegment(segment.with()
.startTime(null)
.repairCommandId(null)
.state(RepairSegment.State.NOT_STARTED)
.build(segmentId));
.startTime(null)
.repairCommandId(null)
.state(RepairSegment.State.NOT_STARTED)
.build(segmentId));
}


Expand All @@ -113,7 +117,8 @@ private synchronized void abort(RepairSegment segment) {
* @param message additional information about the repair
*/
@Override
public synchronized void handle(int repairNumber, ActiveRepairService.Status status, String message) {
public synchronized void handle(int repairNumber, ActiveRepairService.Status status,
String message) {
LOG.debug(
"handleRepairOutcome called for repairCommandId {}, outcome {} and message: {}",
repairNumber, status, message);
Expand All @@ -129,26 +134,26 @@ public synchronized void handle(int repairNumber, ActiveRepairService.Status sta
case STARTED:
DateTime now = DateTime.now();
storage.updateRepairSegment(currentSegment.with()
.startTime(now)
.build(segmentId));
.startTime(now)
.build(segmentId));
// We already set the state of the segment to RUNNING.
break;
case SESSION_FAILED:
// TODO: Bj0rn: How should we handle this? Here, it's almost treated like a success.
storage.updateRepairSegment(currentSegment.with()
.state(RepairSegment.State.ERROR)
.endTime(DateTime.now())
.build(segmentId));
.state(RepairSegment.State.ERROR)
.endTime(DateTime.now())
.build(segmentId));
notify();
break;
case SESSION_SUCCESS:
// Do nothing, wait for FINISHED.
break;
case FINISHED:
storage.updateRepairSegment(currentSegment.with()
.state(RepairSegment.State.DONE)
.endTime(DateTime.now())
.build(segmentId));
.state(RepairSegment.State.DONE)
.endTime(DateTime.now())
.build(segmentId));
notify();
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,19 @@ public ColumnFamily getColumnFamilyByClusterAndName(@Bind("clusterName") String
// RepairSegment
//
static final String SQL_REPAIR_SEGMENT_ALL_FIELDS_NO_ID =
"column_family_id, run_id, start_token, end_token, state, start_time, end_time";
"column_family_id, run_id, start_token, end_token, state, start_time, end_time, fail_count";

static final String SQL_REPAIR_SEGMENT_ALL_FIELDS = "id, " + SQL_REPAIR_SEGMENT_ALL_FIELDS_NO_ID;

static final String SQL_INSERT_REPAIR_SEGMENT =
"INSERT INTO repair_segment (" + SQL_REPAIR_SEGMENT_ALL_FIELDS_NO_ID + ") VALUES "
+ "(:columnFamilyId, :runId, :startToken, :endToken, :state, :startTime, :endTime)";
+ "(:columnFamilyId, :runId, :startToken, :endToken, :state, :startTime, :endTime, "
+ ":failCount)";

static final String SQL_UPDATE_REPAIR_SEGMENT =
"UPDATE repair_segment SET column_family_id = :columnFamilyId, run_id = :runId, "
+ "start_token = :startToken, end_token = :endToken, state = :state, "
+ "start_time = :startTime, end_time = :endTime WHERE id = :id";
+ "start_time = :startTime, end_time = :endTime, fail_count = :failCount WHERE id = :id";

static final String SQL_GET_REPAIR_SEGMENT =
"SELECT " + SQL_REPAIR_SEGMENT_ALL_FIELDS + " FROM repair_segment WHERE id = :id";
Expand All @@ -172,12 +173,12 @@ public ColumnFamily getColumnFamilyByClusterAndName(@Bind("clusterName") String

static final String SQL_GET_NEXT_FREE_REPAIR_SEGMENT =
"SELECT " + SQL_REPAIR_SEGMENT_ALL_FIELDS + " FROM repair_segment WHERE run_id = :runId "
+ "AND state = 0 ORDER BY start_token ASC LIMIT 1";
+ "AND state = 0 ORDER BY fail_count ASC, start_token ASC LIMIT 1";

static final String SQL_GET_NEXT_FREE_REPAIR_SEGMENT_ON_RANGE =
"SELECT " + SQL_REPAIR_SEGMENT_ALL_FIELDS + " FROM repair_segment WHERE "
+ "run_id = :runId AND state = 0 AND start_token >= :startToken "
+ "AND end_token < :endToken ORDER BY start_token ASC LIMIT 1";
+ "AND end_token < :endToken ORDER BY fail_count ASC, start_token ASC LIMIT 1";

@SqlBatch(SQL_INSERT_REPAIR_SEGMENT)
@BatchChunkSize(500)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public RepairSegment map(int index, ResultSet r, StatementContext ctx) throws SQ
.state(RepairSegment.State.values()[r.getInt("state")])
.startTime(RepairRunMapper.getDateTimeOrNull(r, "start_time"))
.endTime(RepairRunMapper.getDateTimeOrNull(r, "end_time"))
.failCount(r.getInt("fail_count"))
.build(r.getLong("id"));
}
}

0 comments on commit 0749e6f

Please sign in to comment.