From 5012b6067ba2d00282bbe07337269ea0ec7fc486 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Hegerfors?= Date: Tue, 9 May 2017 15:15:42 +0200 Subject: [PATCH] Don't handle success callbacks after timeout (C* <= 2.1) (#97) --- .../spotify/reaper/service/SegmentRunner.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/spotify/reaper/service/SegmentRunner.java b/src/main/java/com/spotify/reaper/service/SegmentRunner.java index e56c7dc9c..a4c2140dd 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentRunner.java +++ b/src/main/java/com/spotify/reaper/service/SegmentRunner.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -68,6 +69,7 @@ public final class SegmentRunner implements RepairStatusHandler, Runnable { private final RepairRunner repairRunner; private final RepairUnit repairUnit; private int commandId; + private AtomicBoolean timedOut; // Caching all active SegmentRunners. @VisibleForTesting @@ -85,6 +87,7 @@ public SegmentRunner(AppContext context, long segmentId, Collection pote this.clusterName = clusterName; this.repairUnit = repairUnit; this.repairRunner = repairRunner; + this.timedOut = new AtomicBoolean(false); } @Override @@ -222,6 +225,7 @@ protected Set initialize() { if (resultingSegment.getState() == RepairSegment.State.RUNNING) { LOG.info("Repair command {} on segment {} has been cancelled while running", commandId, segmentId); + timedOut.set(true); abort(resultingSegment, coordinator); } else if (resultingSegment.getState() == RepairSegment.State.DONE) { LOG.debug("Repair segment with id '{}' was repaired in {} seconds", @@ -406,12 +410,18 @@ public void handle(int repairNumber, Optional status break; case SESSION_SUCCESS: - LOG.debug("repair session succeeded for segment with id '{}' and repair number '{}'", - segmentId, repairNumber); - context.storage.updateRepairSegment(currentSegment.with() - .state(RepairSegment.State.DONE) - .endTime(DateTime.now()) - .build(segmentId)); + if (timedOut.get()) { + LOG.debug("Got SESSION_SUCCESS for segment with id '{}' and repair number '{}', " + + "but it had already timed out", + segmentId, repairNumber); + } else { + LOG.debug("repair session succeeded for segment with id '{}' and repair number '{}'", + segmentId, repairNumber); + context.storage.updateRepairSegment(currentSegment.with() + .state(RepairSegment.State.DONE) + .endTime(DateTime.now()) + .build(segmentId)); + } break; case SESSION_FAILED: