Skip to content

Commit

Permalink
Don't handle success callbacks after timeout (C* <= 2.1) (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bj0rnen authored and michaelsembwever committed Jun 27, 2017
1 parent 7136e65 commit 5012b60
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -68,6 +69,7 @@ public final class SegmentRunner implements RepairStatusHandler, Runnable {
private final RepairRunner repairRunner;
private final RepairUnit repairUnit;
private int commandId;
private AtomicBoolean timedOut;

// Caching all active SegmentRunners.
@VisibleForTesting
Expand All @@ -85,6 +87,7 @@ public SegmentRunner(AppContext context, long segmentId, Collection<String> pote
this.clusterName = clusterName;
this.repairUnit = repairUnit;
this.repairRunner = repairRunner;
this.timedOut = new AtomicBoolean(false);
}

@Override
Expand Down Expand Up @@ -222,6 +225,7 @@ protected Set<String> initialize() {
if (resultingSegment.getState() == RepairSegment.State.RUNNING) {
LOG.info("Repair command {} on segment {} has been cancelled while running", commandId,
segmentId);
timedOut.set(true);
abort(resultingSegment, coordinator);
} else if (resultingSegment.getState() == RepairSegment.State.DONE) {
LOG.debug("Repair segment with id '{}' was repaired in {} seconds",
Expand Down Expand Up @@ -406,12 +410,18 @@ public void handle(int repairNumber, Optional<ActiveRepairService.Status> status
break;

case SESSION_SUCCESS:
LOG.debug("repair session succeeded for segment with id '{}' and repair number '{}'",
segmentId, repairNumber);
context.storage.updateRepairSegment(currentSegment.with()
.state(RepairSegment.State.DONE)
.endTime(DateTime.now())
.build(segmentId));
if (timedOut.get()) {
LOG.debug("Got SESSION_SUCCESS for segment with id '{}' and repair number '{}', " +
"but it had already timed out",
segmentId, repairNumber);
} else {
LOG.debug("repair session succeeded for segment with id '{}' and repair number '{}'",
segmentId, repairNumber);
context.storage.updateRepairSegment(currentSegment.with()
.state(RepairSegment.State.DONE)
.endTime(DateTime.now())
.build(segmentId));
}
break;

case SESSION_FAILED:
Expand Down

0 comments on commit 5012b60

Please sign in to comment.