From 514e01e95154ca50edcf74ac6f05885d663c83da Mon Sep 17 00:00:00 2001 From: mck Date: Thu, 6 Jul 2017 08:19:29 +1000 Subject: [PATCH] Remove `IStorage.getNextFreeSegment(..)` as `getNextFreeSegmentInRange(..)` is always used now. --- .../spotify/reaper/service/RepairRunner.java | 4 +-- .../reaper/storage/CassandraStorage.java | 36 ++++++------------- .../com/spotify/reaper/storage/IStorage.java | 4 +-- .../spotify/reaper/storage/MemoryStorage.java | 18 +++++----- .../reaper/storage/PostgresStorage.java | 31 ++++++++-------- .../reaper/unit/service/RepairRunnerTest.java | 6 ++-- .../unit/service/SegmentRunnerTest.java | 6 ++-- 7 files changed, 47 insertions(+), 58 deletions(-) diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index d33089341..33dc1d3a2 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -254,8 +254,8 @@ private void startNextSegment() throws ReaperException, InterruptedException { // We have an empty slot, so let's start new segment runner if possible. LOG.info("Running segment for range {}", parallelRanges.get(rangeIndex)); - Optional nextRepairSegment = - context.storage.getNextFreeSegmentInRange(repairRunId, parallelRanges.get(rangeIndex)); + Optional nextRepairSegment + = context.storage.getNextFreeSegmentInRange(repairRunId, Optional.of(parallelRanges.get(rangeIndex))); if (!nextRepairSegment.isPresent()) { LOG.debug("No repair segment available for range {}", parallelRanges.get(rangeIndex)); diff --git a/src/main/java/com/spotify/reaper/storage/CassandraStorage.java b/src/main/java/com/spotify/reaper/storage/CassandraStorage.java index feb234e25..7bc96b4c0 100644 --- a/src/main/java/com/spotify/reaper/storage/CassandraStorage.java +++ b/src/main/java/com/spotify/reaper/storage/CassandraStorage.java @@ -453,9 +453,8 @@ public Collection getRepairSegmentsForRunInLocalMode(UUID runId, return segments; } - private boolean segmentIsWithinRange(RepairSegment segment, RingRange range) { + private static boolean segmentIsWithinRange(RepairSegment segment, RingRange range) { return range.encloses(new RingRange(segment.getStartToken(), segment.getEndToken())); - } private static RepairSegment createRepairSegmentFromRow(Row segmentRow){ @@ -473,35 +472,18 @@ private static RepairSegment createRepairSegmentFromRow(Row segmentRow){ .build(segmentRow.getUUID("segment_id")); } - public Optional getSegment(UUID runId, Optional range) { + + @Override + public Optional getNextFreeSegmentInRange(UUID runId, Optional range) { List segments = Lists.newArrayList(getRepairSegmentsForRun(runId)); Collections.shuffle(segments); - RepairSegment segment = null; for(RepairSegment seg:segments){ - if(seg.getState().equals(State.NOT_STARTED) // State condition - && ((range.isPresent() && - (segmentIsWithinRange(seg, range.get())) - ) || !range.isPresent()) // Token range condition - ){ - if(takeLeadOnSegment(seg.getId())) { - segment = seg; - break; - } + if (seg.getState().equals(State.NOT_STARTED) && withinRange(seg, range) && takeLeadOnSegment(seg.getId())) { + return Optional.of(seg); } } - return Optional.fromNullable(segment); - } - - - @Override - public Optional getNextFreeSegment(UUID runId) { - return getSegment(runId, Optional.absent()); - } - - @Override - public Optional getNextFreeSegmentInRange(UUID runId, RingRange range) { - return getSegment(runId, Optional.fromNullable(range)); + return Optional.absent(); } @Override @@ -816,4 +798,8 @@ public void saveHeartbeat() { lastHeartBeat = now; } } + + private static boolean withinRange(RepairSegment segment, Optional range) { + return !range.isPresent() || segmentIsWithinRange(segment, range.get()); + } } diff --git a/src/main/java/com/spotify/reaper/storage/IStorage.java b/src/main/java/com/spotify/reaper/storage/IStorage.java index 74aef3de2..0a25d7eb9 100644 --- a/src/main/java/com/spotify/reaper/storage/IStorage.java +++ b/src/main/java/com/spotify/reaper/storage/IStorage.java @@ -98,8 +98,6 @@ Optional getRepairUnit(String cluster, String keyspace, Collection getRepairSegmentsForRun(UUID runId); - Optional getNextFreeSegment(UUID runId); - /** * @param runId the run id that the segment belongs to. * @param range a ring range. The start of the range may be greater than or equal to the end. @@ -107,7 +105,7 @@ Optional getRepairUnit(String cluster, String keyspace, * that covers the whole ring. * @return a segment enclosed by the range with state NOT_STARTED, or nothing. */ - Optional getNextFreeSegmentInRange(UUID runId, RingRange range); + Optional getNextFreeSegmentInRange(UUID runId, Optional range); Collection getSegmentsWithState(UUID runId, RepairSegment.State segmentState); diff --git a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java index 522c51b0a..5914b9bd9 100644 --- a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java +++ b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java @@ -264,8 +264,7 @@ public Collection getRepairSegmentsForRun(UUID runId) { return repairSegmentsByRunId.get(runId).values(); } - @Override - public Optional getNextFreeSegment(UUID runId) { + private Optional getNextFreeSegment(UUID runId) { for (RepairSegment segment : repairSegmentsByRunId.get(runId).values()) { if (segment.getState() == RepairSegment.State.NOT_STARTED) { return Optional.of(segment); @@ -275,12 +274,15 @@ public Optional getNextFreeSegment(UUID runId) { } @Override - public Optional getNextFreeSegmentInRange(UUID runId, RingRange range) { - for (RepairSegment segment : repairSegmentsByRunId.get(runId).values()) { - if (segment.getState() == RepairSegment.State.NOT_STARTED && - range.encloses(segment.getTokenRange())) { - return Optional.of(segment); - } + public Optional getNextFreeSegmentInRange(UUID runId, Optional range) { + if (range.isPresent()) { + for (RepairSegment segment : repairSegmentsByRunId.get(runId).values()) { + if (segment.getState() == RepairSegment.State.NOT_STARTED && range.get().encloses(segment.getTokenRange())) { + return Optional.of(segment); + } + } + } else { + return getNextFreeSegment(runId); } return Optional.absent(); } diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java index b651550fe..7de283960 100644 --- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java +++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java @@ -316,8 +316,7 @@ public Collection getRepairSegmentsForRun(UUID runId) { } } - @Override - public Optional getNextFreeSegment(UUID runId) { + private Optional getNextFreeSegment(UUID runId) { RepairSegment result; try (Handle h = jdbi.open()) { result = getPostgresStorage(h).getNextFreeRepairSegment(UuidUtil.toSequenceId(runId)); @@ -326,19 +325,23 @@ public Optional getNextFreeSegment(UUID runId) { } @Override - public Optional getNextFreeSegmentInRange(UUID runId, RingRange range) { - RepairSegment result; - try (Handle h = jdbi.open()) { - IStoragePostgreSQL storage = getPostgresStorage(h); - if (!range.isWrapping()) { - result = storage.getNextFreeRepairSegmentInNonWrappingRange(UuidUtil.toSequenceId(runId), range.getStart(), - range.getEnd()); - } else { - result = storage.getNextFreeRepairSegmentInWrappingRange(UuidUtil.toSequenceId(runId), range.getStart(), - range.getEnd()); - } + public Optional getNextFreeSegmentInRange(UUID runId, Optional range) { + if (range.isPresent()) { + RepairSegment result; + try (Handle h = jdbi.open()) { + IStoragePostgreSQL storage = getPostgresStorage(h); + if (!range.get().isWrapping()) { + result = storage.getNextFreeRepairSegmentInNonWrappingRange(UuidUtil.toSequenceId(runId), range.get().getStart(), + range.get().getEnd()); + } else { + result = storage.getNextFreeRepairSegmentInWrappingRange(UuidUtil.toSequenceId(runId), range.get().getStart(), + range.get().getEnd()); + } + } + return Optional.fromNullable(result); + } else { + return getNextFreeSegment(runId); } - return Optional.fromNullable(result); } @Override diff --git a/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java b/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java index 574512f8c..02b5a895f 100644 --- a/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java +++ b/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java @@ -96,7 +96,7 @@ public void testHangingRepair() throws InterruptedException, ReaperException { new RepairRun.Builder(CLUSTER_NAME, cf.getId(), DateTime.now(), INTENSITY, 1, RepairParallelism.PARALLEL), Collections.singleton(new RepairSegment.Builder(new RingRange(BigInteger.ZERO, BigInteger.ONE), cf.getId()))); final UUID RUN_ID = run.getId(); - final UUID SEGMENT_ID = storage.getNextFreeSegment(run.getId()).get().getId(); + final UUID SEGMENT_ID = storage.getNextFreeSegmentInRange(run.getId(), Optional.absent()).get().getId(); assertEquals(storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState(), RepairSegment.State.NOT_STARTED); @@ -209,7 +209,7 @@ public void testHangingRepairNewAPI() throws InterruptedException, ReaperExcepti new RepairRun.Builder(CLUSTER_NAME, cf.getId(), DateTime.now(), INTENSITY, 1, RepairParallelism.PARALLEL), Collections.singleton(new RepairSegment.Builder(new RingRange(BigInteger.ZERO, BigInteger.ONE), cf.getId()))); final UUID RUN_ID = run.getId(); - final UUID SEGMENT_ID = storage.getNextFreeSegment(run.getId()).get().getId(); + final UUID SEGMENT_ID = storage.getNextFreeSegmentInRange(run.getId(), Optional.absent()).get().getId(); assertEquals(storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState(), RepairSegment.State.NOT_STARTED); @@ -331,7 +331,7 @@ public void testResumeRepair() throws InterruptedException, ReaperException { .repairCommandId(1337), new RepairSegment.Builder(new RingRange(BigInteger.ONE, BigInteger.ZERO), cf))); final UUID RUN_ID = run.getId(); - final UUID SEGMENT_ID = storage.getNextFreeSegment(run.getId()).get().getId(); + final UUID SEGMENT_ID = storage.getNextFreeSegmentInRange(run.getId(), Optional.absent()).get().getId(); context.repairManager.initializeThreadPool(1, 500, TimeUnit.MILLISECONDS, 1, TimeUnit.MILLISECONDS); diff --git a/src/test/java/com/spotify/reaper/unit/service/SegmentRunnerTest.java b/src/test/java/com/spotify/reaper/unit/service/SegmentRunnerTest.java index 2d728e468..b662a548a 100644 --- a/src/test/java/com/spotify/reaper/unit/service/SegmentRunnerTest.java +++ b/src/test/java/com/spotify/reaper/unit/service/SegmentRunnerTest.java @@ -79,7 +79,7 @@ public void timeoutTest() throws InterruptedException, ReaperException, Executio Collections.singleton(new RepairSegment.Builder(new RingRange(BigInteger.ONE, BigInteger.ZERO), cf.getId()))); final UUID runId = run.getId(); - final UUID segmentId = context.storage.getNextFreeSegment(run.getId()).get().getId(); + final UUID segmentId = context.storage.getNextFreeSegmentInRange(run.getId(), Optional.absent()).get().getId(); final ExecutorService executor = Executors.newSingleThreadExecutor(); final MutableObject> future = new MutableObject<>(); @@ -138,7 +138,7 @@ public void successTest() throws InterruptedException, ReaperException, Executio new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5, 1, RepairParallelism.PARALLEL), Collections.singleton(new RepairSegment.Builder(new RingRange(BigInteger.ONE, BigInteger.ZERO), cf.getId()))); final UUID runId = run.getId(); - final UUID segmentId = storage.getNextFreeSegment(run.getId()).get().getId(); + final UUID segmentId = storage.getNextFreeSegmentInRange(run.getId(), Optional.absent()).get().getId(); final ExecutorService executor = Executors.newSingleThreadExecutor(); final MutableObject> future = new MutableObject<>(); @@ -212,7 +212,7 @@ public void failureTest() throws InterruptedException, ReaperException, Executio new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5, 1, RepairParallelism.PARALLEL), Collections.singleton(new RepairSegment.Builder(new RingRange(BigInteger.ONE, BigInteger.ZERO), cf.getId()))); final UUID runId = run.getId(); - final UUID segmentId = storage.getNextFreeSegment(run.getId()).get().getId(); + final UUID segmentId = storage.getNextFreeSegmentInRange(run.getId(), Optional.absent()).get().getId(); final ExecutorService executor = Executors.newSingleThreadExecutor(); final MutableObject> future = new MutableObject<>();