diff --git a/pom.xml b/pom.xml index 3c2faa29b..4758f8034 100755 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ UTF-8 - 1.1.0 + 1.0.7 4.1.0 2.2.7 1.2.5 diff --git a/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java b/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java index 0fdeec709..8b36178fe 100644 --- a/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java +++ b/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java @@ -254,7 +254,7 @@ public void setCassandraFactory(CassandraFactory cassandra) { } public Boolean getAllowUnreachableNodes() { - return allowUnreachableNodes; + return allowUnreachableNodes != null ? allowUnreachableNodes : false; } public void setAllowUnreachableNodes(Boolean allow) { diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index b94736fc3..98503e87c 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -229,7 +229,7 @@ private void startNextSegment() throws ReaperException { // Just checking that no currently running segment runner is stuck. RepairSegment supposedlyRunningSegment = - context.storage.getRepairSegment(currentlyRunningSegments.get(rangeIndex)).get(); + context.storage.getRepairSegment(repairRunId, currentlyRunningSegments.get(rangeIndex)).get(); DateTime startTime = supposedlyRunningSegment.getStartTime(); if (startTime != null && startTime.isBefore(DateTime.now().minusDays(1))) { LOG.warn("Looks like segment #{} has been running more than a day. Start time: {}", @@ -343,7 +343,8 @@ private boolean repairSegment(final int rangeIndex, final UUID segmentId, RingRa } } else { - potentialCoordinators = Arrays.asList(context.storage.getRepairSegment(segmentId).get().getCoordinatorHost()); + potentialCoordinators + = Arrays.asList(context.storage.getRepairSegment(repairRunId, segmentId).get().getCoordinatorHost()); } SegmentRunner segmentRunner = new SegmentRunner(context, segmentId, potentialCoordinators, @@ -369,7 +370,7 @@ public void onFailure(Throwable t) { } private void handleResult(UUID segmentId) { - RepairSegment segment = context.storage.getRepairSegment(segmentId).get(); + RepairSegment segment = context.storage.getRepairSegment(repairRunId, segmentId).get(); RepairSegment.State segmentState = segment.getState(); LOG.debug("In repair run #{}, triggerRepair on segment {} ended with state {}", repairRunId, segmentId, segmentState); diff --git a/src/main/java/com/spotify/reaper/service/SegmentRunner.java b/src/main/java/com/spotify/reaper/service/SegmentRunner.java index 14ad5d1b0..e642a6021 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentRunner.java +++ b/src/main/java/com/spotify/reaper/service/SegmentRunner.java @@ -79,6 +79,8 @@ public final class SegmentRunner implements RepairStatusHandler, Runnable { public SegmentRunner(AppContext context, UUID segmentId, Collection potentialCoordinators, long timeoutMillis, double intensity, RepairParallelism validationParallelism, String clusterName, RepairUnit repairUnit, RepairRunner repairRunner) { + + assert !segmentRunners.containsKey(segmentId) : "SegmentRunner already exists for segment with ID: " + segmentId; this.context = context; this.segmentId = segmentId; this.potentialCoordinators = potentialCoordinators; @@ -93,7 +95,7 @@ public SegmentRunner(AppContext context, UUID segmentId, Collection pote @Override public void run() { - final RepairSegment segment = context.storage.getRepairSegment(segmentId).get(); + final RepairSegment segment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get(); Thread.currentThread().setName(clusterName + ":" + segment.getRunId() + ":" + segmentId); runRepair(); @@ -129,7 +131,7 @@ public static void abort(AppContext context, RepairSegment segment, JmxProxy jmx */ public void postponeCurrentSegment() { synchronized (condition) { - RepairSegment segment = context.storage.getRepairSegment(segmentId).get(); + RepairSegment segment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get(); postpone(context, segment, context.storage.getRepairUnit(segment.getRepairUnitId())); } } @@ -149,7 +151,7 @@ private long getOpenFilesAmount() { private void runRepair() { LOG.debug("Run repair for segment #{}", segmentId); - final RepairSegment segment = context.storage.getRepairSegment(segmentId).get(); + final RepairSegment segment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get(); try (JmxProxy coordinator = context.jmxConnectionFactory .connectAny(Optional.fromNullable(this), potentialCoordinators)) { @@ -220,7 +222,7 @@ protected Set initialize() { } catch (InterruptedException e) { LOG.warn("Repair command {} on segment {} interrupted", commandId, segmentId, e); } finally { - RepairSegment resultingSegment = context.storage.getRepairSegment(segmentId).get(); + RepairSegment resultingSegment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get(); LOG.info("Repair command {} on segment {} returned with state {}", commandId, segmentId, resultingSegment.getState()); if (resultingSegment.getState() == RepairSegment.State.RUNNING) { @@ -382,7 +384,7 @@ private void abort(RepairSegment segment, JmxProxy jmxConnection) { */ @Override public void handle(int repairNumber, Optional status, Optional progress, String message) { - final RepairSegment segment = context.storage.getRepairSegment(segmentId).get(); + final RepairSegment segment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get(); Thread.currentThread().setName(clusterName + ":" + segment.getRunId() + ":" + segmentId); LOG.debug( "handle called for repairCommandId {}, outcome {} / {} and message: {}", @@ -396,7 +398,7 @@ public void handle(int repairNumber, Optional status boolean failOutsideSynchronizedBlock = false; // DO NOT ADD EXTERNAL CALLS INSIDE THIS SYNCHRONIZED BLOCK (JMX PROXY ETC) synchronized (condition) { - RepairSegment currentSegment = context.storage.getRepairSegment(segmentId).get(); + RepairSegment currentSegment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get(); // See status explanations at: https://wiki.apache.org/cassandra/RepairAsyncAPI // Old repair API if(status.isPresent()) { @@ -524,7 +526,7 @@ public static String parseRepairId(String message) { * @return the delay in milliseconds. */ long intensityBasedDelayMillis(double intensity) { - RepairSegment repairSegment = context.storage.getRepairSegment(segmentId).get(); + RepairSegment repairSegment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get(); if (repairSegment.getEndTime() == null && repairSegment.getStartTime() == null) { return 0; } else if (repairSegment.getEndTime() != null && repairSegment.getStartTime() != null) { diff --git a/src/main/java/com/spotify/reaper/storage/CassandraStorage.java b/src/main/java/com/spotify/reaper/storage/CassandraStorage.java index 8c6e59977..2f3358410 100644 --- a/src/main/java/com/spotify/reaper/storage/CassandraStorage.java +++ b/src/main/java/com/spotify/reaper/storage/CassandraStorage.java @@ -75,16 +75,13 @@ public final class CassandraStorage implements IStorage { private PreparedStatement getRepairUnitPrepStmt; private PreparedStatement insertRepairSegmentPrepStmt; private PreparedStatement getRepairSegmentPrepStmt; - private PreparedStatement insertRepairSegmentByRunPrepStmt; - private PreparedStatement getRepairSegmentByRunIdPrepStmt; + private PreparedStatement getRepairSegmentsByRunIdPrepStmt; private PreparedStatement insertRepairSchedulePrepStmt; private PreparedStatement getRepairSchedulePrepStmt; private PreparedStatement getRepairScheduleByClusterAndKsPrepStmt; private PreparedStatement insertRepairScheduleByClusterAndKsPrepStmt; private PreparedStatement deleteRepairSchedulePrepStmt; private PreparedStatement deleteRepairScheduleByClusterAndKsPrepStmt; - private PreparedStatement deleteRepairSegmentPrepStmt; - private PreparedStatement deleteRepairSegmentByRunId; public CassandraStorage(ReaperApplicationConfiguration config, Environment environment) { cassandra = config.getCassandraFactory().build(environment).register(QueryLogger.builder().build()); @@ -96,7 +93,7 @@ public CassandraStorage(ReaperApplicationConfiguration config, Environment envir Database database = new Database(cassandra, config.getCassandraFactory().getKeyspace()); MigrationTask migration = new MigrationTask(database, new MigrationRepository("db/cassandra")); migration.migrate(); - + prepareStatements(); } @@ -107,23 +104,20 @@ private void prepareStatements(){ insertRepairRunPrepStmt = session.prepare("INSERT INTO repair_run(id, cluster_name, repair_unit_id, cause, owner, state, creation_time, start_time, end_time, pause_time, intensity, last_event, segment_count, repair_parallelism) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); insertRepairRunClusterIndexPrepStmt = session.prepare("INSERT INTO repair_run_by_cluster(cluster_name, id) values(?, ?)"); insertRepairRunUnitIndexPrepStmt = session.prepare("INSERT INTO repair_run_by_unit(repair_unit_id, id) values(?, ?)"); - getRepairRunPrepStmt = session.prepare("SELECT * FROM repair_run WHERE id = ?"); + getRepairRunPrepStmt = session.prepare("SELECT id,cluster_name,repair_unit_id,cause,owner,state,creation_time,start_time,end_time,pause_time,intensity,last_event,segment_count,repair_parallelism FROM repair_run WHERE id = ? LIMIT 1"); getRepairRunForClusterPrepStmt = session.prepare("SELECT * FROM repair_run_by_cluster WHERE cluster_name = ?"); getRepairRunForUnitPrepStmt = session.prepare("SELECT * FROM repair_run_by_unit WHERE repair_unit_id = ?"); deleteRepairRunPrepStmt = session.prepare("DELETE FROM repair_run WHERE id = ?"); deleteRepairRunByClusterPrepStmt = session.prepare("DELETE FROM repair_run_by_cluster WHERE id = ? and cluster_name = ?"); deleteRepairRunByUnitPrepStmt = session.prepare("DELETE FROM repair_run_by_unit WHERE id = ? and repair_unit_id= ?"); - deleteRepairSegmentPrepStmt = session.prepare("DELETE FROM repair_segment WHERE id = ?"); - deleteRepairSegmentByRunId = session.prepare("DELETE FROM repair_segment_by_run_id WHERE run_id = ?"); insertRepairUnitPrepStmt = session.prepare("INSERT INTO repair_unit(id, cluster_name, keyspace_name, column_families, incremental_repair) VALUES(?, ?, ?, ?, ?)"); getRepairUnitPrepStmt = session.prepare("SELECT * FROM repair_unit WHERE id = ?"); - insertRepairSegmentPrepStmt = session.prepare("INSERT INTO repair_segment(id, repair_unit_id, run_id, start_token, end_token, state, coordinator_host, start_time, end_time, fail_count) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); - getRepairSegmentPrepStmt = session.prepare("SELECT * FROM repair_segment WHERE id = ?"); - insertRepairSegmentByRunPrepStmt = session.prepare("INSERT INTO repair_segment_by_run_id(run_id, segment_id) VALUES(?, ?)"); - getRepairSegmentByRunIdPrepStmt = session.prepare("SELECT * FROM repair_segment_by_run_id WHERE run_id = ?"); + insertRepairSegmentPrepStmt = session.prepare("INSERT INTO repair_run(id, segment_id, repair_unit_id, start_token, end_token, segment_state, coordinator_host, segment_start_time, segment_end_time, fail_count) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); + getRepairSegmentPrepStmt = session.prepare("SELECT id,repair_unit_id,segment_id,start_token,end_token,segment_state,coordinator_host,segment_start_time,segment_end_time,fail_count FROM repair_run WHERE id = ? and segment_id = ?"); + getRepairSegmentsByRunIdPrepStmt = session.prepare("SELECT id,repair_unit_id,segment_id,start_token,end_token,segment_state,coordinator_host,segment_start_time,segment_end_time,fail_count FROM repair_run WHERE id = ?"); insertRepairSchedulePrepStmt = session.prepare("INSERT INTO repair_schedule(id, repair_unit_id, state, days_between, next_activation, run_history, segment_count, repair_parallelism, intensity, creation_time, owner, pause_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); getRepairSchedulePrepStmt = session.prepare("SELECT * FROM repair_schedule WHERE id = ?"); - insertRepairScheduleByClusterAndKsPrepStmt = session.prepare("INSERT INTO repair_schedule_by_cluster_and_keyspace(cluster_name, keyspace_name, repair_schedule_id) VALUES(?, ?, ?)"); + insertRepairScheduleByClusterAndKsPrepStmt = session.prepare("INSERT INTO repair_schedule_by_cluster_and_keyspace(cluster_name, keyspace_name, repair_schedule_id) VALUES(?, ?, ?)"); getRepairScheduleByClusterAndKsPrepStmt = session.prepare("SELECT repair_schedule_id FROM repair_schedule_by_cluster_and_keyspace WHERE cluster_name = ? and keyspace_name = ?"); deleteRepairSchedulePrepStmt = session.prepare("DELETE FROM repair_schedule WHERE id = ?"); deleteRepairScheduleByClusterAndKsPrepStmt = session.prepare("DELETE FROM repair_schedule_by_cluster_and_keyspace WHERE cluster_name = ? and keyspace_name = ? and repair_schedule_id = ?"); @@ -181,25 +175,49 @@ public Optional deleteCluster(String clusterName) { public RepairRun addRepairRun(Builder repairRun, Collection newSegments) { RepairRun newRepairRun = repairRun.build(UUIDs.timeBased()); BatchStatement batch = new BatchStatement(); - batch.add(insertRepairRunPrepStmt.bind(newRepairRun.getId(), - newRepairRun.getClusterName(), - newRepairRun.getRepairUnitId(), - newRepairRun.getCause(), - newRepairRun.getOwner(), - newRepairRun.getRunState().toString(), - newRepairRun.getCreationTime()==null?null:newRepairRun.getCreationTime(), - newRepairRun.getStartTime()==null?null:newRepairRun.getStartTime(), - newRepairRun.getEndTime()==null?null:newRepairRun.getEndTime(), - newRepairRun.getPauseTime()==null?null:newRepairRun.getPauseTime(), - newRepairRun.getIntensity(), - newRepairRun.getLastEvent(), - newRepairRun.getSegmentCount(), - newRepairRun.getRepairParallelism().toString()) - ); + BatchStatement repairRunBatch = new BatchStatement(BatchStatement.Type.UNLOGGED); + + repairRunBatch.add(insertRepairRunPrepStmt.bind( + newRepairRun.getId(), + newRepairRun.getClusterName(), + newRepairRun.getRepairUnitId(), + newRepairRun.getCause(), + newRepairRun.getOwner(), + newRepairRun.getRunState().toString(), + newRepairRun.getCreationTime()==null?null:newRepairRun.getCreationTime(), + newRepairRun.getStartTime()==null?null:newRepairRun.getStartTime(), + newRepairRun.getEndTime()==null?null:newRepairRun.getEndTime(), + newRepairRun.getPauseTime()==null?null:newRepairRun.getPauseTime(), + newRepairRun.getIntensity(), + newRepairRun.getLastEvent(), + newRepairRun.getSegmentCount(), + newRepairRun.getRepairParallelism().toString())); + batch.add(insertRepairRunClusterIndexPrepStmt.bind(newRepairRun.getClusterName(), newRepairRun.getId())); batch.add(insertRepairRunUnitIndexPrepStmt.bind(newRepairRun.getRepairUnitId(), newRepairRun.getId())); session.execute(batch); - addRepairSegments(newSegments, newRepairRun.getId()); + + for(RepairSegment.Builder builder:newSegments){ + RepairSegment segment = builder.withRunId(newRepairRun.getId()).build(UUIDs.timeBased()); + + repairRunBatch.add(insertRepairSegmentPrepStmt.bind( + segment.getRunId(), + segment.getId(), + segment.getRepairUnitId(), + segment.getStartToken(), + segment.getEndToken(), + segment.getState().ordinal(), + segment.getCoordinatorHost(), + segment.getStartTime(), + segment.getEndTime(), + segment.getFailCount())); + + if(100 == repairRunBatch.size()){ + session.execute(repairRunBatch); + repairRunBatch = new BatchStatement(BatchStatement.Type.UNLOGGED); + } + } + session.execute(repairRunBatch); return newRepairRun; } @@ -256,7 +274,7 @@ public Collection getRepairRunsForUnit(UUID repairUnitId) { /** * Create a collection of RepairRun objects out of a list of ResultSetFuture. * Used to handle async queries on the repair_run table with a list of ids. - * + * * @param repairRunFutures * @return */ @@ -299,21 +317,6 @@ public Optional deleteRepairRun(UUID id) { batch.add(deleteRepairRunByUnitPrepStmt.bind(id, repairRun.get().getRepairUnitId())); session.execute(batch); } - - // Delete all segments for the run we've deleted - List futures= Lists.newArrayList(); - Collection segments = getRepairSegmentsForRun(id); - int i=0; - final int nbSegments = segments.size(); - futures.add(session.executeAsync(deleteRepairSegmentByRunId.bind(id))); - for(RepairSegment segment:segments){ - futures.add(session.executeAsync(deleteRepairSegmentPrepStmt.bind(segment.getId()))); - i++; - if(i%100==0 || i==nbSegments-1){ - futures.stream().forEach(f -> f.getUninterruptibly()); - } - } - return repairRun; } @@ -338,7 +341,7 @@ public Optional getRepairUnit(UUID id) { public Optional getRepairUnit(String cluster, String keyspace, Set columnFamilyNames) { // brute force again RepairUnit repairUnit=null; - ResultSet results = session.execute("SELECT * FROM repair_unit"); + ResultSet results = session.execute("SELECT * FROM repair_unit"); for(Row repairUnitRow:results){ if(repairUnitRow.getString("cluster_name").equals(cluster) && repairUnitRow.getString("keyspace_name").equals(keyspace) @@ -352,60 +355,31 @@ public Optional getRepairUnit(String cluster, String keyspace, Set newSegments, UUID runId) { - List insertFutures = Lists.newArrayList(); - BatchStatement batch = new BatchStatement(); - for(RepairSegment.Builder builder:newSegments){ - RepairSegment segment = builder.withRunId(runId).build(UUIDs.timeBased()); - insertFutures.add(session.executeAsync( - insertRepairSegmentPrepStmt.bind( - segment.getId(), - segment.getRepairUnitId(), - segment.getRunId(), - segment.getStartToken(), - segment.getEndToken(), - segment.getState().ordinal(), - segment.getCoordinatorHost(), - segment.getStartTime(), - segment.getEndTime(), - segment.getFailCount()))); - - batch.add(insertRepairSegmentByRunPrepStmt.bind(segment.getRunId(), segment.getId())); - if(insertFutures.size()%100==0){ - // cluster ddos protection - session.execute(batch); - batch.clear(); - for(ResultSetFuture insertFuture:insertFutures){ - insertFuture.getUninterruptibly(); - } - insertFutures = Lists.newArrayList(); - } - } - - // Wait for last queries to ack - if(batch.size()>0) { - session.execute(batch); - } - - for(ResultSetFuture insertFuture:insertFutures){ - insertFuture.getUninterruptibly(); - } - } - @Override public boolean updateRepairSegment(RepairSegment newRepairSegment) { Date startTime = null; if (newRepairSegment.getStartTime() != null) { startTime = newRepairSegment.getStartTime().toDate(); } - session.executeAsync(insertRepairSegmentPrepStmt.bind(newRepairSegment.getId(), newRepairSegment.getRepairUnitId(), newRepairSegment.getRunId(), newRepairSegment.getStartToken(), newRepairSegment.getEndToken(), newRepairSegment.getState().ordinal(), newRepairSegment.getCoordinatorHost(), startTime, newRepairSegment.getEndTime().toDate(), newRepairSegment.getFailCount())); + session.executeAsync(insertRepairSegmentPrepStmt.bind( + newRepairSegment.getRunId(), + newRepairSegment.getId(), + newRepairSegment.getRepairUnitId(), + newRepairSegment.getStartToken(), + newRepairSegment.getEndToken(), + newRepairSegment.getState().ordinal(), + newRepairSegment.getCoordinatorHost(), + startTime, + newRepairSegment.getEndTime().toDate(), + newRepairSegment.getFailCount())); + return true; } @Override - public Optional getRepairSegment(UUID id) { + public Optional getRepairSegment(UUID runId, UUID segmentId) { RepairSegment segment = null; - Row segmentRow = session.execute(getRepairSegmentPrepStmt.bind(id)).one(); + Row segmentRow = session.execute(getRepairSegmentPrepStmt.bind(runId, segmentId)).one(); if(segmentRow != null){ segment = createRepairSegmentFromRow(segmentRow); } @@ -415,53 +389,28 @@ public Optional getRepairSegment(UUID id) { @Override public Collection getRepairSegmentsForRun(UUID runId) { - List segmentsFuture = Lists.newArrayList(); Collection segments = Lists.newArrayList(); - // First gather segments ids - ResultSet segmentsIdResultSet = session.execute(getRepairSegmentByRunIdPrepStmt.bind(runId)); - int i=0; - for(Row segmentIdResult:segmentsIdResultSet) { - // Then get segments by id - segmentsFuture.add(session.executeAsync(getRepairSegmentPrepStmt.bind(segmentIdResult.getUUID("segment_id")))); - i++; - if(i%100==0 || segmentsIdResultSet.isFullyFetched()) { - segments.addAll(fetchRepairSegmentFromFutures(segmentsFuture)); - segmentsFuture = Lists.newArrayList(); - } - } - - return segments; - } - - private Collection fetchRepairSegmentFromFutures(List segmentsFuture){ - Collection segments = Lists.newArrayList(); - - for(ResultSetFuture segmentResult:segmentsFuture) { - Row segmentRow = segmentResult.getUninterruptibly().one(); - if(segmentRow!=null){ + ResultSet segmentsIdResultSet = session.execute(getRepairSegmentsByRunIdPrepStmt.bind(runId)); + for(Row segmentRow : segmentsIdResultSet) { segments.add(createRepairSegmentFromRow(segmentRow)); - } - } - + } return segments; - } - private RepairSegment createRepairSegmentFromRow(Row segmentRow){ - return createRepairSegmentFromRow(segmentRow, segmentRow.getUUID("id")); - } - private RepairSegment createRepairSegmentFromRow(Row segmentRow, UUID segmentId){ + private static RepairSegment createRepairSegmentFromRow(Row segmentRow){ return new RepairSegment.Builder( - new RingRange(new BigInteger(segmentRow.getVarint("start_token") +""), new BigInteger(segmentRow.getVarint("end_token")+"")), + new RingRange( + new BigInteger(segmentRow.getVarint("start_token") +""), + new BigInteger(segmentRow.getVarint("end_token")+"")), segmentRow.getUUID("repair_unit_id")) - .withRunId(segmentRow.getUUID("run_id")) + .withRunId(segmentRow.getUUID("id")) .coordinatorHost(segmentRow.getString("coordinator_host")) - .endTime(new DateTime(segmentRow.getTimestamp("end_time"))) + .endTime(new DateTime(segmentRow.getTimestamp("segment_end_time"))) .failCount(segmentRow.getInt("fail_count")) - .startTime(new DateTime(segmentRow.getTimestamp("start_time"))) - .state(State.values()[segmentRow.getInt("state")]) - .build(segmentRow.getUUID("id")); + .startTime(new DateTime(segmentRow.getTimestamp("segment_start_time"))) + .state(State.values()[segmentRow.getInt("segment_state")]) + .build(segmentRow.getUUID("segment_id")); } @@ -482,7 +431,7 @@ public int compare(RepairSegment seg1, RepairSegment seg2) { for(RepairSegment seg:segments){ if(seg.getState().equals(State.NOT_STARTED) // State condition - && ((range.isPresent() && + && ((range.isPresent() && (range.get().getStart().compareTo(seg.getStartToken())>=0 || range.get().getEnd().compareTo(seg.getEndToken())<=0) ) || !range.isPresent()) // Token range condition ){ @@ -581,13 +530,13 @@ public Optional getRepairSchedule(UUID repairScheduleId) { private RepairSchedule createRepairScheduleFromRow(Row repairScheduleRow){ return new RepairSchedule.Builder(repairScheduleRow.getUUID("repair_unit_id"), - RepairSchedule.State.valueOf(repairScheduleRow.getString("state")), - repairScheduleRow.getInt("days_between"), - new DateTime(repairScheduleRow.getTimestamp("next_activation")), + RepairSchedule.State.valueOf(repairScheduleRow.getString("state")), + repairScheduleRow.getInt("days_between"), + new DateTime(repairScheduleRow.getTimestamp("next_activation")), ImmutableList.copyOf(repairScheduleRow.getSet("run_history", UUID.class)), - repairScheduleRow.getInt("segment_count"), - RepairParallelism.fromName(repairScheduleRow.getString("repair_parallelism")), - repairScheduleRow.getDouble("intensity"), + repairScheduleRow.getInt("segment_count"), + RepairParallelism.fromName(repairScheduleRow.getString("repair_parallelism")), + repairScheduleRow.getDouble("intensity"), new DateTime(repairScheduleRow.getTimestamp("creation_time"))) .owner(repairScheduleRow.getString("owner")) .pauseTime(new DateTime(repairScheduleRow.getTimestamp("pause_time"))).build(repairScheduleRow.getUUID("id")); @@ -655,17 +604,17 @@ public boolean updateRepairSchedule(RepairSchedule newRepairSchedule) { final Set repairHistory = Sets.newHashSet(); repairHistory.addAll(newRepairSchedule.getRunHistory()); - batch.add(insertRepairSchedulePrepStmt.bind(newRepairSchedule.getId(), - newRepairSchedule.getRepairUnitId(), - newRepairSchedule.getState().toString(), - newRepairSchedule.getDaysBetween(), - newRepairSchedule.getNextActivation(), - repairHistory, + batch.add(insertRepairSchedulePrepStmt.bind(newRepairSchedule.getId(), + newRepairSchedule.getRepairUnitId(), + newRepairSchedule.getState().toString(), + newRepairSchedule.getDaysBetween(), + newRepairSchedule.getNextActivation(), + repairHistory, newRepairSchedule.getSegmentCount(), - newRepairSchedule.getRepairParallelism().toString(), - newRepairSchedule.getIntensity(), - newRepairSchedule.getCreationTime(), - newRepairSchedule.getOwner(), + newRepairSchedule.getRepairParallelism().toString(), + newRepairSchedule.getIntensity(), + newRepairSchedule.getCreationTime(), + newRepairSchedule.getOwner(), newRepairSchedule.getPauseTime()) ); RepairUnit repairUnit = getRepairUnit(newRepairSchedule.getRepairUnitId()).get(); @@ -700,33 +649,39 @@ public Collection getClusterRunStatuses(String clusterName, int for (RepairRun repairRun:repairRuns){ Collection segments = getRepairSegmentsForRun(repairRun.getId()); Optional repairUnit = getRepairUnit(repairRun.getRepairUnitId()); - + int segmentsRepaired = (int) segments.stream() .filter(seg -> seg.getState().equals(RepairSegment.State.DONE)) .count(); - + repairRunStatuses.add(new RepairRunStatus(repairRun, repairUnit.get(), segmentsRepaired)); } - + return repairRunStatuses; } @Override public Collection getClusterScheduleStatuses(String clusterName) { Collection repairSchedules = getRepairSchedulesForCluster(clusterName); - + Collection repairScheduleStatuses = repairSchedules .stream() .map(sched -> new RepairScheduleStatus(sched, getRepairUnit(sched.getRepairUnitId()).get())) .collect(Collectors.toList()); - + return repairScheduleStatuses; } private RepairRun buildRepairRunFromRow(Row repairRunResult, UUID id){ - return new RepairRun.Builder(repairRunResult.getString("cluster_name"), repairRunResult.getUUID("repair_unit_id"), new DateTime(repairRunResult.getTimestamp("creation_time")), repairRunResult.getDouble("intensity"), repairRunResult.getInt("segment_count"), RepairParallelism.fromName(repairRunResult.getString("repair_parallelism"))) + return new RepairRun.Builder( + repairRunResult.getString("cluster_name"), + repairRunResult.getUUID("repair_unit_id"), + new DateTime(repairRunResult.getTimestamp("creation_time")), + repairRunResult.getDouble("intensity"), + repairRunResult.getInt("segment_count"), + RepairParallelism.fromName(repairRunResult.getString("repair_parallelism"))) .cause(repairRunResult.getString("cause")) .owner(repairRunResult.getString("owner")) .endTime(new DateTime(repairRunResult.getTimestamp("end_time"))) diff --git a/src/main/java/com/spotify/reaper/storage/IStorage.java b/src/main/java/com/spotify/reaper/storage/IStorage.java index 52cb8db33..f5c74fb18 100644 --- a/src/main/java/com/spotify/reaper/storage/IStorage.java +++ b/src/main/java/com/spotify/reaper/storage/IStorage.java @@ -94,7 +94,7 @@ Optional getRepairUnit(String cluster, String keyspace, boolean updateRepairSegment(RepairSegment newRepairSegment); - Optional getRepairSegment(UUID id); + Optional getRepairSegment(UUID runId, UUID segmentId); Collection getRepairSegmentsForRun(UUID runId); diff --git a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java index d5ede9b90..e72718917 100644 --- a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java +++ b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java @@ -242,7 +242,7 @@ private void addRepairSegments(Collection segments, UUID @Override public boolean updateRepairSegment(RepairSegment newRepairSegment) { - if (getRepairSegment(newRepairSegment.getId()) == null) { + if (getRepairSegment(newRepairSegment.getRunId(), newRepairSegment.getId()) == null) { return false; } else { repairSegments.put(newRepairSegment.getId(), newRepairSegment); @@ -254,8 +254,8 @@ public boolean updateRepairSegment(RepairSegment newRepairSegment) { } @Override - public Optional getRepairSegment(UUID id) { - return Optional.fromNullable(repairSegments.get(id)); + public Optional getRepairSegment(UUID runId, UUID segmentId) { + return Optional.fromNullable(repairSegments.get(segmentId)); } @Override diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java index a953c949d..c3187c50e 100644 --- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java +++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java @@ -310,10 +310,10 @@ public boolean updateRepairSegment(RepairSegment repairSegment) { } @Override - public Optional getRepairSegment(UUID id) { + public Optional getRepairSegment(UUID runId, UUID segmentId) { RepairSegment result; try (Handle h = jdbi.open()) { - result = getPostgresStorage(h).getRepairSegment(toSequenceId(id)); + result = getPostgresStorage(h).getRepairSegment(toSequenceId(segmentId)); } return Optional.fromNullable(result); } diff --git a/src/main/resources/db/cassandra/001_Initialize_db.cql b/src/main/resources/db/cassandra/001_Initialize_db.cql index 0830ffdd1..06e67f952 100644 --- a/src/main/resources/db/cassandra/001_Initialize_db.cql +++ b/src/main/resources/db/cassandra/001_Initialize_db.cql @@ -23,24 +23,32 @@ CREATE TABLE IF NOT EXISTS repair_unit ( column_families set, incremental_repair boolean ) - WITH compaction = {'class': 'LeveledCompactionStrategy'} - AND caching = {'rows_per_partition': 10}; + WITH compaction = {'class': 'LeveledCompactionStrategy'}; CREATE TABLE IF NOT EXISTS repair_run ( - id timeuuid PRIMARY KEY, - cluster_name text, - repair_unit_id timeuuid, - cause text, - owner text, - state text, - creation_time timestamp, - start_time timestamp, - end_time timestamp, - pause_time timestamp, - intensity double , - last_event text , - segment_count int , - repair_parallelism text + id timeuuid, + cluster_name text STATIC, + repair_unit_id timeuuid STATIC, + cause text STATIC, + owner text STATIC, + state text STATIC, + creation_time timestamp STATIC, + start_time timestamp STATIC, + end_time timestamp STATIC, + pause_time timestamp STATIC, + intensity double STATIC, + last_event text STATIC, + segment_count int STATIC, + repair_parallelism text STATIC, + segment_id timeuuid, + start_token varint, + end_token varint, + segment_state int, + coordinator_host text, + segment_start_time timestamp, + segment_end_time timestamp, + fail_count int, + PRIMARY KEY (id, segment_id) ) WITH compaction = {'class': 'LeveledCompactionStrategy'} AND caching = {'rows_per_partition': 10}; @@ -61,29 +69,6 @@ CREATE TABLE IF NOT EXISTS repair_run_by_unit( WITH compaction = {'class': 'LeveledCompactionStrategy'} AND caching = {'rows_per_partition': 10}; -CREATE TABLE IF NOT EXISTS repair_segment ( - id timeuuid PRIMARY KEY, - repair_unit_id timeuuid, - run_id timeuuid, - start_token varint, - end_token varint, - state int , - coordinator_host text, - start_time timestamp, - end_time timestamp, - fail_count INT -) - WITH compaction = {'class': 'LeveledCompactionStrategy'} - AND caching = {'rows_per_partition': 10}; - -CREATE TABLE IF NOT EXISTS repair_segment_by_run_id ( - run_id timeuuid, - segment_id timeuuid, - PRIMARY KEY(run_id, segment_id) -) - WITH compaction = {'class': 'LeveledCompactionStrategy'} - AND caching = {'rows_per_partition': 10}; - CREATE TABLE IF NOT EXISTS repair_schedule ( id timeuuid PRIMARY KEY, @@ -99,8 +84,7 @@ CREATE TABLE IF NOT EXISTS repair_schedule ( owner text , pause_time timestamp ) - WITH compaction = {'class': 'LeveledCompactionStrategy'} - AND caching = {'rows_per_partition': 10}; + WITH compaction = {'class': 'LeveledCompactionStrategy'}; CREATE TABLE IF NOT EXISTS repair_schedule_by_cluster_and_keyspace( diff --git a/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java b/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java index 81c5e71c7..0aee74c43 100644 --- a/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java +++ b/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java @@ -93,7 +93,7 @@ public void testHangingRepair() throws InterruptedException, ReaperException { final UUID RUN_ID = run.getId(); final UUID SEGMENT_ID = storage.getNextFreeSegment(run.getId()).get().getId(); - assertEquals(storage.getRepairSegment(SEGMENT_ID).get().getState(), + assertEquals(storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState(), RepairSegment.State.NOT_STARTED); AppContext context = new AppContext(); context.storage = storage; @@ -121,7 +121,7 @@ public JmxProxy connect(final Optional handler, String host @Override public Integer answer(InvocationOnMock invocation) throws Throwable { assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(SEGMENT_ID).get().getState()); + storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState()); final int repairNumber = repairAttempts.getAndIncrement(); switch (repairNumber) { @@ -132,7 +132,7 @@ public void run() { handler.get() .handle(repairNumber, Optional.of(ActiveRepairService.Status.STARTED), Optional.absent(), null); assertEquals(RepairSegment.State.RUNNING, - storage.getRepairSegment(SEGMENT_ID).get().getState()); + storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState()); } }.start(); break; @@ -143,11 +143,11 @@ public void run() { handler.get() .handle(repairNumber, Optional.of(ActiveRepairService.Status.STARTED), Optional.absent(), null); assertEquals(RepairSegment.State.RUNNING, - storage.getRepairSegment(SEGMENT_ID).get().getState()); + storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState()); handler.get() .handle(repairNumber, Optional.of(ActiveRepairService.Status.SESSION_SUCCESS), Optional.absent(), null); assertEquals(RepairSegment.State.DONE, - storage.getRepairSegment(SEGMENT_ID).get().getState()); + storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState()); handler.get() .handle(repairNumber, Optional.of(ActiveRepairService.Status.FINISHED), Optional.absent(), null); mutex.release(); @@ -196,7 +196,7 @@ public void testHangingRepairNewAPI() throws InterruptedException, ReaperExcepti final UUID RUN_ID = run.getId(); final UUID SEGMENT_ID = storage.getNextFreeSegment(run.getId()).get().getId(); - assertEquals(storage.getRepairSegment(SEGMENT_ID).get().getState(), + assertEquals(storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState(), RepairSegment.State.NOT_STARTED); AppContext context = new AppContext(); context.storage = storage; @@ -225,7 +225,7 @@ public JmxProxy connect(final Optional handler, String host @Override public Integer answer(InvocationOnMock invocation) throws Throwable { assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(SEGMENT_ID).get().getState()); + storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState()); final int repairNumber = repairAttempts.getAndIncrement(); switch (repairNumber) { @@ -236,7 +236,7 @@ public void run() { handler.get() .handle(repairNumber, Optional.absent(), Optional.of(ProgressEventType.START), null); assertEquals(RepairSegment.State.RUNNING, - storage.getRepairSegment(SEGMENT_ID).get().getState()); + storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState()); } }.start(); break; @@ -247,11 +247,11 @@ public void run() { handler.get() .handle(repairNumber, Optional.absent(), Optional.of(ProgressEventType.START), null); assertEquals(RepairSegment.State.RUNNING, - storage.getRepairSegment(SEGMENT_ID).get().getState()); + storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState()); handler.get() .handle(repairNumber, Optional.absent(), Optional.of(ProgressEventType.SUCCESS), null); assertEquals(RepairSegment.State.DONE, - storage.getRepairSegment(SEGMENT_ID).get().getState()); + storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState()); handler.get() .handle(repairNumber, Optional.absent(), Optional.of(ProgressEventType.COMPLETE), null); mutex.release(); @@ -308,7 +308,7 @@ public void testResumeRepair() throws InterruptedException, ReaperException { context.repairManager.initializeThreadPool(1, 500, TimeUnit.MILLISECONDS, 1, TimeUnit.MILLISECONDS); - assertEquals(storage.getRepairSegment(SEGMENT_ID).get().getState(), + assertEquals(storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState(), RepairSegment.State.NOT_STARTED); context.jmxConnectionFactory = new JmxConnectionFactory() { @Override @@ -327,7 +327,7 @@ public JmxProxy connect(final Optional handler, String host @Override public Integer answer(InvocationOnMock invocation) throws Throwable { assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(SEGMENT_ID).get().getState()); + storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState()); new Thread() { @Override public void run() { diff --git a/src/test/java/com/spotify/reaper/unit/service/SegmentRunnerTest.java b/src/test/java/com/spotify/reaper/unit/service/SegmentRunnerTest.java index 683e03d41..e5bf8d10c 100644 --- a/src/test/java/com/spotify/reaper/unit/service/SegmentRunnerTest.java +++ b/src/test/java/com/spotify/reaper/unit/service/SegmentRunnerTest.java @@ -73,6 +73,8 @@ public void timeoutTest() throws InterruptedException, ReaperException, Executio RepairRun run = context.storage.addRepairRun( new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5, 1, RepairParallelism.PARALLEL), Collections.singleton(new RepairSegment.Builder(new RingRange(BigInteger.ONE, BigInteger.ZERO), cf.getId()))); + + final UUID runId = run.getId(); final UUID segmentId = context.storage.getNextFreeSegment(run.getId()).get().getId(); final ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -92,14 +94,14 @@ public JmxProxy connect(final Optional handler, String host @Override public Integer answer(InvocationOnMock invocation) { assertEquals(RepairSegment.State.NOT_STARTED, - context.storage.getRepairSegment(segmentId).get().getState()); + context.storage.getRepairSegment(runId, segmentId).get().getState()); future.setValue(executor.submit(new Thread() { @Override public void run() { handler.get().handle(1, Optional.of(ActiveRepairService.Status.STARTED), Optional.absent(), "Repair command 1 has started"); assertEquals(RepairSegment.State.RUNNING, - context.storage.getRepairSegment(segmentId).get().getState()); + context.storage.getRepairSegment(runId, segmentId).get().getState()); } })); return 1; @@ -119,8 +121,8 @@ public void run() { executor.shutdown(); assertEquals(RepairSegment.State.NOT_STARTED, - context.storage.getRepairSegment(segmentId).get().getState()); - assertEquals(1, context.storage.getRepairSegment(segmentId).get().getFailCount()); + context.storage.getRepairSegment(runId, segmentId).get().getState()); + assertEquals(1, context.storage.getRepairSegment(runId, segmentId).get().getFailCount()); } @Test @@ -131,6 +133,7 @@ public void successTest() throws InterruptedException, ReaperException, Executio RepairRun run = storage.addRepairRun( new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5, 1, RepairParallelism.PARALLEL), Collections.singleton(new RepairSegment.Builder(new RingRange(BigInteger.ONE, BigInteger.ZERO), cf.getId()))); + final UUID runId = run.getId(); final UUID segmentId = storage.getNextFreeSegment(run.getId()).get().getId(); final ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -152,25 +155,25 @@ public JmxProxy connect(final Optional handler, String host @Override public Integer answer(InvocationOnMock invocation) { assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(segmentId).get().getState()); + storage.getRepairSegment(runId, segmentId).get().getState()); future.setValue(executor.submit(new Runnable() { @Override public void run() { handler.get().handle(1, Optional.of(ActiveRepairService.Status.STARTED), Optional.absent(), "Repair command 1 has started"); assertEquals(RepairSegment.State.RUNNING, - storage.getRepairSegment(segmentId).get().getState()); + storage.getRepairSegment(runId, segmentId).get().getState()); // report about an unrelated repair. Shouldn't affect anything. handler.get().handle(2, Optional.of(ActiveRepairService.Status.SESSION_FAILED), Optional.absent(), "Repair command 2 has failed"); handler.get().handle(1, Optional.of(ActiveRepairService.Status.SESSION_SUCCESS), Optional.absent(), "Repair session succeeded in command 1"); assertEquals(RepairSegment.State.DONE, - storage.getRepairSegment(segmentId).get().getState()); + storage.getRepairSegment(runId, segmentId).get().getState()); handler.get().handle(1, Optional.of(ActiveRepairService.Status.FINISHED), Optional.absent(), "Repair command 1 has finished"); assertEquals(RepairSegment.State.DONE, - storage.getRepairSegment(segmentId).get().getState()); + storage.getRepairSegment(runId, segmentId).get().getState()); } })); return 1; @@ -189,8 +192,8 @@ public void run() { future.getValue().get(); executor.shutdown(); - assertEquals(RepairSegment.State.DONE, storage.getRepairSegment(segmentId).get().getState()); - assertEquals(0, storage.getRepairSegment(segmentId).get().getFailCount()); + assertEquals(RepairSegment.State.DONE, storage.getRepairSegment(runId, segmentId).get().getState()); + assertEquals(0, storage.getRepairSegment(runId, segmentId).get().getFailCount()); } @Test @@ -202,6 +205,7 @@ public void failureTest() throws InterruptedException, ReaperException, Executio RepairRun run = storage.addRepairRun( new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5, 1, RepairParallelism.PARALLEL), Collections.singleton(new RepairSegment.Builder(new RingRange(BigInteger.ONE, BigInteger.ZERO), cf.getId()))); + final UUID runId = run.getId(); final UUID segmentId = storage.getNextFreeSegment(run.getId()).get().getId(); final ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -223,22 +227,22 @@ public JmxProxy connect(final Optional handler, String host @Override public Integer answer(InvocationOnMock invocation) { assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(segmentId).get().getState()); + storage.getRepairSegment(runId, segmentId).get().getState()); future.setValue(executor.submit(new Runnable() { @Override public void run() { handler.get().handle(1, Optional.of(ActiveRepairService.Status.STARTED), Optional.absent(), "Repair command 1 has started"); assertEquals(RepairSegment.State.RUNNING, - storage.getRepairSegment(segmentId).get().getState()); + storage.getRepairSegment(runId, segmentId).get().getState()); handler.get().handle(1, Optional.of(ActiveRepairService.Status.SESSION_FAILED), Optional.absent(), "Repair command 1 has failed"); assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(segmentId).get().getState()); + storage.getRepairSegment(runId, segmentId).get().getState()); handler.get().handle(1, Optional.of(ActiveRepairService.Status.FINISHED), Optional.absent(), "Repair command 1 has finished"); assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(segmentId).get().getState()); + storage.getRepairSegment(runId, segmentId).get().getState()); } })); @@ -259,8 +263,8 @@ public void run() { executor.shutdown(); assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(segmentId).get().getState()); - assertEquals(1, storage.getRepairSegment(segmentId).get().getFailCount()); + storage.getRepairSegment(runId, segmentId).get().getState()); + assertEquals(1, storage.getRepairSegment(runId, segmentId).get().getFailCount()); } @Test diff --git a/src/test/resources/cassandra-reaper-cassandra-at.yaml b/src/test/resources/cassandra-reaper-cassandra-at.yaml index bb04f9c8c..ccaff49ef 100644 --- a/src/test/resources/cassandra-reaper-cassandra-at.yaml +++ b/src/test/resources/cassandra-reaper-cassandra-at.yaml @@ -8,6 +8,7 @@ repairRunThreadCount: 15 hangingRepairTimeoutMins: 1 storageType: cassandra incrementalRepair: false +allowUnreachableNodes: true logging: level: DEBUG