diff --git a/src/main/java/com/spotify/reaper/ReaperApplication.java b/src/main/java/com/spotify/reaper/ReaperApplication.java index 9c6ce0e99..d79653a91 100644 --- a/src/main/java/com/spotify/reaper/ReaperApplication.java +++ b/src/main/java/com/spotify/reaper/ReaperApplication.java @@ -31,6 +31,8 @@ import sun.misc.Signal; import sun.misc.SignalHandler; +import java.util.concurrent.TimeUnit; + import io.dropwizard.Application; import io.dropwizard.setup.Bootstrap; import io.dropwizard.setup.Environment; @@ -68,7 +70,7 @@ public void run(ReaperApplicationConfiguration config, LOG.info("initializing runner thread pool with {} threads", config.getRepairRunThreadCount()); RepairRunner.initializeThreadPool(config.getRepairRunThreadCount(), - config.getHangingRepairTimeoutMins() * 60); + config.getHangingRepairTimeoutMins(), TimeUnit.MINUTES); LOG.info("initializing storage of type: {}", config.getStorageType()); IStorage storage = initializeStorage(config, environment); diff --git a/src/main/java/com/spotify/reaper/service/JmxConnectionFactory.java b/src/main/java/com/spotify/reaper/service/JmxConnectionFactory.java index 7a5dc29b6..42766a1e0 100644 --- a/src/main/java/com/spotify/reaper/service/JmxConnectionFactory.java +++ b/src/main/java/com/spotify/reaper/service/JmxConnectionFactory.java @@ -23,7 +23,7 @@ public class JmxConnectionFactory { - public JmxProxy create(String host) throws ReaperException { + public final JmxProxy create(String host) throws ReaperException { return create(Optional.absent(), host); } @@ -32,7 +32,7 @@ public JmxProxy create(Optional handler, String host) return JmxProxy.connect(handler, host); } - public JmxProxy connectAny(Optional handler, Collection hosts) + public final JmxProxy connectAny(Optional handler, Collection hosts) throws ReaperException { return create(handler, hosts.iterator().next()); } diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index 37aa220f6..307a7e005 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -24,7 +24,6 @@ import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.storage.IStorage; -import org.apache.cassandra.service.ActiveRepairService; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,12 +31,8 @@ import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -/** - * A repair runner that only triggers one segment repair at a time. - */ public class RepairRunner implements Runnable { // TODO: test // TODO: handle failed storage updates @@ -47,11 +42,11 @@ public class RepairRunner implements Runnable { private static final int JMX_FAILURE_SLEEP_DELAY_SECONDS = 30; private static ScheduledExecutorService executor = null; - private static int repairTimeoutSecs; + private static long repairTimeoutMillis; - public static void initializeThreadPool(int threadAmount, int repairTimeoutSecs) { + public static void initializeThreadPool(int threadAmount, long repairTimeout, TimeUnit timeUnit) { executor = Executors.newScheduledThreadPool(threadAmount); - RepairRunner.repairTimeoutSecs = repairTimeoutSecs; + RepairRunner.repairTimeoutMillis = timeUnit.toMillis(repairTimeout); } public static void startNewRepairRun(IStorage storage, long repairRunID, @@ -73,12 +68,6 @@ public static void startNewRepairRun(IStorage storage, long repairRunID, private final JmxConnectionFactory jmxConnectionFactory; private JmxProxy jmxConnection; - // These fields are only set when a segment is being repaired. - // TODO: bundle them into a class? - private ScheduledFuture repairTimeout = null; - private int currentCommandId = -1; - private long currentSegmentId = -1; - @VisibleForTesting RepairRunner(IStorage storage, long repairRunId, JmxConnectionFactory jmxConnectionFactory) throws ReaperException { @@ -94,6 +83,9 @@ public static void startNewRepairRun(IStorage storage, long repairRunID, */ @Override public void run() { + // TODO: just like SegmentRunner, RepairRunner should probably be blocking. + // TODO: the best way to do that is probably to remove the Runnable interface and do everything + // TODO: in a while loop. RepairRun.RunState state = storage.getRepairRun(repairRunId).getRunState(); LOG.debug("run() called for repair run #{} with run state {}", repairRunId, state); switch (state) { @@ -142,47 +134,23 @@ private void end() { * as NOT_STARTED to queue it up for a retry. */ private void startNextSegment() { - RepairSegment running = storage.getTheRunningSegment(repairRunId); - if (running != null) { - abortSegment(running); - startNextSegment(); + // Currently not allowing parallel repairs. + assert storage.getSegmentAmountForRepairRun(repairRunId, RepairSegment.State.RUNNING) == 0; + RepairSegment next = storage.getNextFreeSegment(repairRunId); + if (next != null) { + doRepairSegment(next.getId(), next.getTokenRange()); } else { - assert !repairIsTriggered(); - RepairSegment next = storage.getNextFreeSegment(repairRunId); - if (next != null) { - doRepairSegment(next); - } else { - end(); - } + end(); } } - /** - * Set the running repair segment back to NOT_STARTED. - * - * @param runningSegment the running repair segment. - */ - public void abortSegment(RepairSegment runningSegment) { - // TODO: actually abort the repair. (runningSegment.getRepairCommandId() should be set) - assert runningSegment.getRepairCommandId() != null; - LOG.debug("Aborting repair with and segmentId {} in repair run #{}", - runningSegment.getId(), repairRunId); - storage.updateRepairSegment(runningSegment.with() - .startTime(null) - .repairCommandId(null) - .state(RepairSegment.State.NOT_STARTED) - .build(runningSegment.getId())); - } - /** * Start the repair of a segment. * - * @param next the segment to repair. + * @param segmentId id of the segment to repair. + * @param tokenRange token range of the segment to repair. */ - private synchronized void doRepairSegment(RepairSegment next) { - // TODO: directly store the right host to contact per segment (or per run, if we guarantee that - // TODO: one host can coordinate all repair segments). - + private void doRepairSegment(long segmentId, RingRange tokenRange) { ColumnFamily columnFamily = storage.getColumnFamily(storage.getRepairRun(repairRunId).getColumnFamilyId()); String keyspace = columnFamily.getKeyspaceName(); @@ -202,9 +170,7 @@ private synchronized void doRepairSegment(RepairSegment next) { LOG.info("successfully reestablished JMX proxy for repair runner on run id: {}", repairRunId); } - - List potentialCoordinators = jmxConnection - .tokenRangeToEndpoint(keyspace, storage.getNextFreeSegment(repairRunId).getTokenRange()); + List potentialCoordinators = jmxConnection.tokenRangeToEndpoint(keyspace, tokenRange); if (potentialCoordinators == null) { // This segment has a faulty token range. Abort the entire repair run. RepairRun repairRun = storage.getRepairRun(repairRunId); @@ -214,151 +180,43 @@ private synchronized void doRepairSegment(RepairSegment next) { return; } - // Connect to a node that can act as coordinator for the new repair. try { - jmxConnection.close(); - jmxConnection = jmxConnectionFactory.connectAny(Optional.of( - new RepairStatusHandler() { - @Override - public void handle(int repairNumber, ActiveRepairService.Status status, - String message) { - RepairOutcome outcome; - switch (status) { - case STARTED: - outcome = RepairOutcome.STARTED; - break; - case SESSION_FAILED: - outcome = RepairOutcome.FAILED; - break; - case FINISHED: - outcome = RepairOutcome.FINISHED; - break; - default: - // Do nothing, wait for FINISHED. - return; - } - handleRepairOutcome(repairNumber, outcome, message); - } - }), potentialCoordinators); + SegmentRunner.triggerRepair(storage, segmentId, potentialCoordinators, repairTimeoutMillis, + jmxConnectionFactory); } catch (ReaperException e) { e.printStackTrace(); - LOG.warn("Failed to connect to a coordinator node for next repair in runner #{}, " - + "reattempting in {} seconds", repairRunId, JMX_FAILURE_SLEEP_DELAY_SECONDS); - executor.schedule(this, JMX_FAILURE_SLEEP_DELAY_SECONDS, TimeUnit.SECONDS); - return; + } catch (InterruptedException e) { + e.printStackTrace(); } - currentSegmentId = next.getId(); - // TODO: ensure that no repair is already running (abort all repairs) - currentCommandId = jmxConnection - .triggerRepair(next.getStartToken(), next.getEndToken(), keyspace, columnFamily.getName()); - repairTimeout = executor.schedule(new Runnable() { - @Override - public void run() { - handleRepairOutcome(currentCommandId, RepairOutcome.TIMEOUT, - "[Reaper] Repair command timed out"); - } - }, repairTimeoutSecs, TimeUnit.SECONDS); - LOG.debug("Triggered repair with command id {}", currentCommandId); - LOG.info("Repair for segment {} started in repair run #{}", currentSegmentId, repairRunId); - storage.updateRepairSegment(next.with() - .state(RepairSegment.State.RUNNING) - .repairCommandId(currentCommandId) - .build(currentSegmentId)); + handleResult(segmentId); } - - public static enum RepairOutcome { - STARTED, FINISHED, FAILED, TIMEOUT - } - - /** - * Called when there is an event coming either from JMX or this runner regarding on-going - * repairs. - * - * @param repairCommandId repair sequence number, obtained when triggering a repair - * @param outcome new status of the repair (STARTED, FINISHED, FAILED, TIMEOUT) - * @param message additional information about the repair - */ - public synchronized void handleRepairOutcome(int repairCommandId, RepairOutcome outcome, - String message) { - LOG.debug( - "handleRepairOutcome called for repair run #{}, repairCommandId {}, outcome {} and message: {}", - repairRunId, repairCommandId, outcome, message); - if (repairCommandId != currentCommandId) { - LOG.warn("Repair run id != current command id. {} != {}", repairCommandId, currentCommandId); - // This can be reached if timeout happens while finished repair is being handled, or vice - // versa. Since this method is synchronized, only one will get through. - - // Another cause for getting here is other repairs running on the node than what this runner - // has initiated. - return; - } - - if (repairIsTriggered()) { - RepairSegment currentSegment = storage.getRepairSegment(currentSegmentId); - // See status explanations from: https://wiki.apache.org/cassandra/RepairAsyncAPI - switch (outcome) { - case STARTED: - DateTime now = DateTime.now(); - storage.updateRepairSegment(currentSegment.with() - .startTime(now) - .build(currentSegmentId)); - // We already set the state of the segment to RUNNING. - break; - case FINISHED: { - RepairSegment updatedSegment = currentSegment.with() - .state(RepairSegment.State.DONE) - .endTime(DateTime.now()) - .build(currentSegmentId); - storage.updateRepairSegment(updatedSegment); - closeRepairCommand(); - executor.schedule(this, intensityBasedDelayMillis(updatedSegment), TimeUnit.MILLISECONDS); - } + private void handleResult(long segmentId) { + RepairSegment segment = storage.getRepairSegment(segmentId); + RepairSegment.State state = segment.getState(); + LOG.debug("In repair run #{}, triggerRepair on segment {} terminated with state {}", + repairRunId, segmentId, state); + switch (state) { + case NOT_STARTED: + // Repair timed out + executor.submit(this); break; - case FAILED: { - // TODO: Bj0rn: How should we handle this? Here, it's almost treated like a success. - RepairSegment updatedSegment = currentSegment.with() - .state(RepairSegment.State.ERROR) - .endTime(DateTime.now()) - .build(currentSegmentId); - storage.updateRepairSegment(updatedSegment); - closeRepairCommand(); - executor.schedule(this, intensityBasedDelayMillis(updatedSegment), TimeUnit.MILLISECONDS); - } + case DONE: + // Successful repair + executor.schedule(this, intensityBasedDelayMillis(segment), TimeUnit.MILLISECONDS); break; - case TIMEOUT: { - closeRepairCommand(); - abortSegment(currentSegment); - executor.submit(this); - } + case ERROR: + // Unsuccessful repair + executor.schedule(this, intensityBasedDelayMillis(segment), TimeUnit.MILLISECONDS); break; - } + case RUNNING: + // Another thread has started a new repair on this segment already + // Or maybe the same repair segment id should never be re-run in which case this is an error + executor.submit(this); } } - /** - * @return true if this RepairRunner has triggered a repair and is currently waiting - * for a repair status notification from JMX. - */ - boolean repairIsTriggered() { - return repairTimeout != null; - } - - /** - * Stop countdown for repair, and stop listening for JMX notifications for the current repair. - */ - void closeRepairCommand() { - LOG.debug("Closing repair command with commandId {} and segmentId {} in repair run #{}", - currentCommandId, currentSegmentId, repairRunId); - assert repairTimeout != null; - - repairTimeout.cancel(false); - repairTimeout = null; - currentCommandId = -1; - currentSegmentId = -1; - } - /** * Calculate the delay that should be used before starting the next repair segment. * diff --git a/src/main/java/com/spotify/reaper/service/SegmentRunner.java b/src/main/java/com/spotify/reaper/service/SegmentRunner.java index 054bb41e7..916ea118f 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentRunner.java +++ b/src/main/java/com/spotify/reaper/service/SegmentRunner.java @@ -50,7 +50,8 @@ private SegmentRunner(IStorage storage, long segmentId, Collection poten this.storage = storage; this.segmentId = segmentId; - // TODO: don't trigger the repair in the constructor. Requires commandId to be mutable + // TODO: don't trigger the repair in the constructor. The change will force commandId to be + // TODO: mutable, but that's better than this. synchronized (this) { jmxConnection = jmxConnectionFactory.connectAny(Optional.of(this), potentialCoordinators); @@ -59,6 +60,7 @@ private SegmentRunner(IStorage storage, long segmentId, Collection poten storage.getColumnFamily(segment.getColumnFamilyId()); String keyspace = columnFamily.getKeyspaceName(); + assert !segment.getState().equals(RepairSegment.State.RUNNING); commandId = jmxConnection .triggerRepair(segment.getStartToken(), segment.getEndToken(), keyspace, columnFamily.getName()); diff --git a/src/test/java/com/spotify/reaper/resources/TableResourceTest.java b/src/test/java/com/spotify/reaper/resources/TableResourceTest.java index ff06bf194..b36c06445 100644 --- a/src/test/java/com/spotify/reaper/resources/TableResourceTest.java +++ b/src/test/java/com/spotify/reaper/resources/TableResourceTest.java @@ -12,6 +12,7 @@ import com.spotify.reaper.resources.view.ColumnFamilyStatus; import com.spotify.reaper.service.JmxConnectionFactory; import com.spotify.reaper.service.RepairRunner; +import com.spotify.reaper.service.RingRange; import com.spotify.reaper.storage.IStorage; import com.spotify.reaper.storage.MemoryStorage; import org.joda.time.DateTimeUtils; @@ -21,7 +22,9 @@ import java.math.BigInteger; import java.net.URI; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; @@ -80,13 +83,11 @@ public void setUp() throws Exception { when(proxy.getPartitioner()).thenReturn(PARTITIONER); when(proxy.getTokens()).thenReturn(TOKENS); when(proxy.tableExists(anyString(), anyString())).thenReturn(Boolean.TRUE); + when(proxy.isConnectionAlive()).thenReturn(Boolean.TRUE); + when(proxy.tokenRangeToEndpoint(anyString(), any(RingRange.class))).thenReturn(Collections.singletonList("")); factory = new JmxConnectionFactory() { @Override - public JmxProxy create(String host) throws ReaperException { - return proxy; - } - @Override - public JmxProxy connectAny(Optional handler, Collection hosts) { + public JmxProxy create(Optional handler, String host) throws ReaperException { return proxy; } }; @@ -132,7 +133,7 @@ public void testAddTableWithTrigger() throws Exception { Optional owner = Optional.of("test"); Optional cause = Optional.of("tetsCase"); - RepairRunner.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S); + RepairRunner.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S, TimeUnit.SECONDS); DateTimeUtils.setCurrentMillisFixed(TIME_CREATE); Response response = resource.addTable(uriInfo, clusterName, seedHost, keyspace, table, diff --git a/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java b/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java index f22bc9935..125e71ff7 100644 --- a/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java +++ b/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java @@ -31,19 +31,18 @@ import org.joda.time.DateTimeUtils; import org.junit.Before; import org.junit.Test; -import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.math.BigInteger; import java.util.Collections; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.Collections; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; @@ -82,7 +81,7 @@ public void noSegmentsTest() throws InterruptedException { // start the repair DateTimeUtils.setCurrentMillisFixed(TIME_START); - RepairRunner.initializeThreadPool(1, 180); + RepairRunner.initializeThreadPool(1, 3, TimeUnit.HOURS); RepairRunner.startNewRepairRun(storage, RUN_ID, new JmxConnectionFactory() { @Override public JmxProxy create(Optional handler, String host) @@ -103,7 +102,6 @@ public JmxProxy create(Optional handler, String host) assertEquals(TIME_START, endTime.getMillis()); } - @Test public void testHangingRepair() throws ReaperException, InterruptedException { final String CLUSTER_NAME = "reaper"; @@ -113,74 +111,104 @@ public void testHangingRepair() throws ReaperException, InterruptedException { final long TIME_RERUN = 42l; final double INTENSITY = 0.5f; - IStorage storage = new MemoryStorage(); + final IStorage storage = new MemoryStorage(); storage.addCluster(new Cluster(CLUSTER_NAME, null, Collections.singleton(null))); - ColumnFamily cf = storage.addColumnFamily(new ColumnFamily.Builder(CLUSTER_NAME, KS_NAME, CF_NAME, 1, false)); - DateTimeUtils.setCurrentMillisFixed(TIME_RUN); - RepairRun repairRun = storage.addRepairRun( + RepairRun run = storage.addRepairRun( new RepairRun.Builder(CLUSTER_NAME, cf.getId(), DateTime.now(), INTENSITY)); - storage.addRepairSegments(Collections.singleton( - new RepairSegment.Builder(repairRun.getId(), new RingRange(BigInteger.ZERO, BigInteger.ONE), cf.getId())), repairRun.getId()); - - final JmxProxy jmx = mock(JmxProxy.class); - - when(jmx.getClusterName()).thenReturn(CLUSTER_NAME); - when(jmx.isConnectionAlive()).thenReturn(true); - when(jmx.tokenRangeToEndpoint(anyString(), any(RingRange.class))) - .thenReturn(Lists.newArrayList("")); - - final AtomicInteger repairAttempts = new AtomicInteger(0); - when(jmx.triggerRepair(any(BigInteger.class), any(BigInteger.class), anyString(), anyString())) - .then(new Answer() { - @Override - public Integer answer(InvocationOnMock invocation) throws Throwable { - return repairAttempts.incrementAndGet(); - } - }); - - RepairRunner.initializeThreadPool(1, 1); - final RepairRunner repairRunner = new RepairRunner(storage, 1, new JmxConnectionFactory() { + new RepairSegment.Builder(run.getId(), new RingRange(BigInteger.ZERO, BigInteger.ONE), + cf.getId())), run.getId()); + final long RUN_ID = run.getId(); + final long SEGMENT_ID = storage.getNextFreeSegment(run.getId()).getId(); + + RepairRunner.initializeThreadPool(1, 500, TimeUnit.MILLISECONDS); + + assertEquals(storage.getRepairSegment(SEGMENT_ID).getState(), RepairSegment.State.NOT_STARTED); + RepairRunner.startNewRepairRun(storage, RUN_ID, new JmxConnectionFactory() { + final AtomicInteger repairAttempts = new AtomicInteger(0); + @Override - public JmxProxy create(Optional handler, String host) + public JmxProxy create(final Optional handler, String host) throws ReaperException { + final JmxProxy jmx = mock(JmxProxy.class); + when(jmx.getClusterName()).thenReturn(CLUSTER_NAME); + when(jmx.isConnectionAlive()).thenReturn(true); + when(jmx.tokenRangeToEndpoint(anyString(), any(RingRange.class))) + .thenReturn(Lists.newArrayList("")); + when(jmx.triggerRepair(any(BigInteger.class), any(BigInteger.class), anyString(), + anyString())).then( + new Answer() { + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + assertEquals(RepairSegment.State.NOT_STARTED, + storage.getRepairSegment(SEGMENT_ID).getState()); + + final int repairNumber = repairAttempts.getAndIncrement(); + switch (repairNumber) { + case 0: + new Thread() { + @Override + public void run() { + try { + sleep(10); + + handler.get() + .handle(repairNumber, ActiveRepairService.Status.STARTED, null); + sleep(100); + + assertEquals(RepairSegment.State.RUNNING, + storage.getRepairSegment(SEGMENT_ID).getState()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }.start(); + break; + case 1: + new Thread() { + @Override + public void run() { + try { + sleep(10); + + handler.get() + .handle(repairNumber, ActiveRepairService.Status.STARTED, null); + sleep(100); + + assertEquals(RepairSegment.State.RUNNING, + storage.getRepairSegment(SEGMENT_ID).getState()); + handler.get() + .handle(repairNumber, ActiveRepairService.Status.SESSION_SUCCESS, null); + sleep(10); + + assertEquals(RepairSegment.State.RUNNING, + storage.getRepairSegment(SEGMENT_ID).getState()); + handler.get() + .handle(repairNumber, ActiveRepairService.Status.FINISHED, null); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }.start(); + break; + default: + fail("triggerRepair should only have been called twice"); + } + return repairNumber; + } + }); return jmx; } }); - assertEquals(storage.getRepairSegment(1).getState(), RepairSegment.State.NOT_STARTED); - assertEquals(0, repairAttempts.get()); - repairRunner.run(); - assertEquals(1, repairAttempts.get()); - assertEquals(storage.getRepairSegment(1).getState(), RepairSegment.State.RUNNING); - assertTrue(storage.getRepairSegment(1).getStartTime() == null); - repairRunner.handleRepairOutcome(repairAttempts.get(), RepairRunner.RepairOutcome.STARTED, - "Repair " + repairAttempts + " started"); - assertEquals(DateTime.now(), storage.getRepairSegment(1).getStartTime()); - assertEquals(RepairRun.RunState.RUNNING, storage.getRepairRun(1).getRunState()); - - Thread.sleep(1500); - assertEquals(2, repairAttempts.get()); - assertEquals(storage.getRepairSegment(1).getState(), RepairSegment.State.RUNNING); - - DateTimeUtils.setCurrentMillisFixed(TIME_RERUN); - assertTrue(storage.getRepairSegment(1).getStartTime() == null); - repairRunner.handleRepairOutcome(repairAttempts.get(), RepairRunner.RepairOutcome.STARTED, - "Repair " + repairAttempts + " started"); - assertEquals(DateTime.now(), storage.getRepairSegment(1).getStartTime()); - assertEquals(RepairRun.RunState.RUNNING, storage.getRepairRun(1).getRunState()); - - repairRunner.handleRepairOutcome(repairAttempts.get(), RepairRunner.RepairOutcome.FINISHED, - "Repair " + repairAttempts + " finished"); - Thread.sleep(100); - assertEquals(RepairRun.RunState.DONE, storage.getRepairRun(1).getRunState()); + Thread.sleep(1000); + assertEquals(RepairRun.RunState.DONE, storage.getRepairRun(RUN_ID).getRunState()); } - @Test public void testAlreadyStartedRepair() { // TODO diff --git a/src/test/java/com/spotify/reaper/service/SegmentRunnerTest.java b/src/test/java/com/spotify/reaper/service/SegmentRunnerTest.java index b78562d4d..78411b91f 100644 --- a/src/test/java/com/spotify/reaper/service/SegmentRunnerTest.java +++ b/src/test/java/com/spotify/reaper/service/SegmentRunnerTest.java @@ -69,7 +69,9 @@ public JmxProxy create(final Optional handler, String host) anyString())) .then(new Answer() { @Override - public Integer answer(InvocationOnMock invocation) throws Throwable { + public Integer answer(InvocationOnMock invocation) { + assertEquals(RepairSegment.State.NOT_STARTED, + storage.getRepairSegment(segmentId).getState()); new Thread() { @Override public void run() { @@ -124,6 +126,8 @@ public JmxProxy create(final Optional handler, String host) .then(new Answer() { @Override public Integer answer(InvocationOnMock invocation) throws Throwable { + assertEquals(RepairSegment.State.NOT_STARTED, + storage.getRepairSegment(segmentId).getState()); new Thread() { @Override public void run() { @@ -191,6 +195,8 @@ public JmxProxy create(final Optional handler, String host) .then(new Answer() { @Override public Integer answer(InvocationOnMock invocation) throws Throwable { + assertEquals(RepairSegment.State.NOT_STARTED, + storage.getRepairSegment(segmentId).getState()); new Thread() { @Override public void run() {