Skip to content

Commit

Permalink
Include last event info in a RepairRun(Status)
Browse files Browse the repository at this point in the history
Conflicts:
	src/main/java/com/spotify/reaper/service/RepairRunner.java
	src/main/java/com/spotify/reaper/service/SegmentRunner.java
  • Loading branch information
Radovan Zvoncek authored and varjoranta committed Feb 4, 2015
1 parent 7ca9c14 commit 0308a54
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 8 deletions.
13 changes: 13 additions & 0 deletions src/main/java/com/spotify/reaper/core/RepairRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class RepairRun {
private final DateTime endTime;
private final DateTime pauseTime;
private final double intensity;
private final String lastEvent;

private RepairRun(Builder builder, long id) {
this.id = id;
Expand All @@ -46,6 +47,7 @@ private RepairRun(Builder builder, long id) {
this.endTime = builder.endTime;
this.pauseTime = builder.pauseTime;
this.intensity = builder.intensity;
this.lastEvent = builder.lastEvent;
}

public long getId() {
Expand Down Expand Up @@ -92,6 +94,10 @@ public double getIntensity() {
return intensity;
}

/**
 * Returns the last-event message stored on this repair run.
 *
 * @return the value of the {@code lastEvent} field, exactly as it was supplied by the builder
 */
public String getLastEvent() {
  return this.lastEvent;
}

/**
 * Creates a {@link Builder} seeded from this repair run, so that a modified copy can be
 * produced by overriding individual fields and calling {@code build(id)}.
 *
 * @return a new builder constructed from this instance
 */
public Builder with() {
  final Builder seeded = new Builder(this);
  return seeded;
}
Expand All @@ -116,6 +122,7 @@ public static class Builder {
private DateTime startTime;
private DateTime endTime;
private DateTime pauseTime;
private String lastEvent = "Nothing happened yet";

public Builder(String clusterName, long repairUnitId, DateTime creationTime,
double intensity) {
Expand All @@ -137,6 +144,7 @@ private Builder(RepairRun original) {
startTime = original.startTime;
endTime = original.endTime;
pauseTime = original.pauseTime;
lastEvent = original.lastEvent;
}

public Builder runState(RunState runState) {
Expand Down Expand Up @@ -179,6 +187,11 @@ public Builder pauseTime(DateTime pauseTime) {
return this;
}

/**
 * Sets the last-event message that the built {@code RepairRun} will carry.
 *
 * @param event the message to store; it is kept as given, without validation
 * @return this builder, to allow call chaining
 */
public Builder lastEvent(String event) {
  // The parameter does not shadow the field, so no qualifier is required here.
  lastEvent = event;
  return this;
}

/**
 * Builds a {@code RepairRun} from the values currently held by this builder.
 *
 * @param id the identifier assigned to the created repair run
 * @return a new {@code RepairRun} carrying this builder's values and the given id
 */
public RepairRun build(long id) {
  final RepairRun built = new RepairRun(this, id);
  return built;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public class RepairRunStatus {
@JsonProperty("segments_repaired")
private int segmentsRepaired = 0;

@JsonProperty("last_event")
private final String lastEvent;

public RepairRunStatus(RepairRun repairRun, RepairUnit repairUnit) {
this.id = repairRun.getId();
this.cause = repairRun.getCause();
Expand All @@ -91,6 +94,7 @@ public RepairRunStatus(RepairRun repairRun, RepairUnit repairUnit) {
this.intensity = roundIntensity(repairRun.getIntensity());
this.segmentCount = repairUnit.getSegmentCount();
this.repairParallelism = repairUnit.getRepairParallelism().name().toLowerCase();
this.lastEvent = repairRun.getLastEvent();
}

@VisibleForTesting
Expand Down
15 changes: 11 additions & 4 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,10 @@ private void end() {
LOG.info("Repairs for repair run #{} done", repairRunId);
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
boolean success = context.storage.updateRepairRun(repairRun.with()
.runState(RepairRun.RunState.DONE)
.endTime(DateTime.now())
.build(repairRun.getId()));
.runState(RepairRun.RunState.DONE)
.endTime(DateTime.now())
.lastEvent("All done")
.build(repairRun.getId()));
if (!success) {
LOG.error("failed updating repair run " + repairRun.getId());
}
Expand Down Expand Up @@ -263,7 +264,13 @@ private void handleResult(long segmentId) {
break;
case DONE:
// Successful repair
executor.schedule(this, intensityBasedDelayMillis(segment), TimeUnit.MILLISECONDS);
long delay = intensityBasedDelayMillis(segment);
executor.schedule(this, delay, TimeUnit.MILLISECONDS);
String event = String.format("Waiting %ds because of intensity based delay", delay / 1000);
RepairRun updatedRepairRun =
context.storage.getRepairRun(repairRunId).get().with().lastEvent(event)
.build(repairRunId);
context.storage.updateRepairRun(updatedRepairRun);
break;
default:
// Another thread has started a new repair on this segment already
Expand Down
23 changes: 19 additions & 4 deletions src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.cassandra.JmxProxy;
import com.spotify.reaper.cassandra.RepairStatusHandler;
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;
import com.spotify.reaper.core.RepairUnit;

Expand Down Expand Up @@ -95,6 +96,7 @@ public int getCurrentCommandId() {

private void runRepair(Collection<String> potentialCoordinators, long timeoutMillis) {
final RepairSegment segment = context.storage.getRepairSegment(segmentId).get();
final RepairRun repairRun = context.storage.getRepairRun(segment.getRunId()).get();
try (JmxProxy coordinator = context.jmxConnectionFactory
.connectAny(Optional.<RepairStatusHandler>of(this), potentialCoordinators)) {
RepairUnit repairUnit = context.storage.getRepairUnit(segment.getRepairUnitId()).get();
Expand All @@ -114,9 +116,12 @@ private void runRepair(Collection<String> potentialCoordinators, long timeoutMil
.coordinatorHost(coordinator.getHost())
.repairCommandId(commandId)
.build(segmentId));
LOG.info("Repair for segment {} started, status wait will timeout in {} millis",
segmentId, timeoutMillis);

String eventMsg = String.format("Triggered repair of segment %d via host %s",
segment.getId(), coordinator.getHost());
context.storage.updateRepairRun(
repairRun.with().lastEvent(eventMsg).build(repairRun.getId()));
LOG.info("Repair for segment {} started, status wait will timeout in {} millis", segmentId,
timeoutMillis);
try {
condition.await(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Expand All @@ -140,6 +145,8 @@ private void runRepair(Collection<String> potentialCoordinators, long timeoutMil
}
} catch (ReaperException e) {
LOG.warn("Failed to connect to a coordinator node for segment {}", segmentId);
String msg = String.format("Postponed because couldn't any of the coordinators");
context.storage.updateRepairRun(repairRun.with().lastEvent(msg).build(repairRun.getId()));
postpone(segment);
}
}
Expand All @@ -152,15 +159,23 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator)
try (JmxProxy hostProxy = context.jmxConnectionFactory.connect(hostName)) {
LOG.debug("checking host '{}' for pending compactions and other repairs (can repair?)"
+ " Run id '{}'", hostName, segment.getRunId());
if (hostProxy.getPendingCompactions() > MAX_PENDING_COMPACTIONS) {
int pendingCompactions = hostProxy.getPendingCompactions();
if (pendingCompactions > MAX_PENDING_COMPACTIONS) {
LOG.warn("SegmentRunner declined to repair segment {} because of too many pending "
+ "compactions (> {}) on host \"{}\"", segmentId, MAX_PENDING_COMPACTIONS,
hostProxy.getHost());
String msg = String.format("Postponed due to pending compactions (%d)",
pendingCompactions);
RepairRun repairRun = context.storage.getRepairRun(segment.getRunId()).get();
context.storage.updateRepairRun(repairRun.with().lastEvent(msg).build(repairRun.getId()));
return false;
}
if (hostProxy.isRepairRunning()) {
LOG.warn("SegmentRunner declined to repair segment {} because one of the hosts ({}) was "
+ "already involved in a repair", segmentId, hostProxy.getHost());
String msg = String.format("Postponed due to affected hosts already doing repairs");
RepairRun repairRun = context.storage.getRepairRun(segment.getRunId()).get();
context.storage.updateRepairRun(repairRun.with().lastEvent(msg).build(repairRun.getId()));
return false;
}
}
Expand Down

0 comments on commit 0308a54

Please sign in to comment.