diff --git a/src/main/java/com/spotify/reaper/storage/IStorage.java b/src/main/java/com/spotify/reaper/storage/IStorage.java index 27a52cf7c..36f9689f9 100644 --- a/src/main/java/com/spotify/reaper/storage/IStorage.java +++ b/src/main/java/com/spotify/reaper/storage/IStorage.java @@ -22,6 +22,7 @@ import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.resources.view.RepairRunStatus; import com.spotify.reaper.resources.view.RepairScheduleStatus; +import com.spotify.reaper.service.RepairParameters; import com.spotify.reaper.service.RingRange; import java.util.Collection; @@ -110,6 +111,8 @@ Optional getRepairUnit(String cluster, String keyspace, Collection getSegmentsWithState(long runId, RepairSegment.State segmentState); + Collection getOngoingRepairsInCluster(String clusterName); + Collection getRepairRunIdsForCluster(String clusterName); int getSegmentAmountForRepairRun(long runId); diff --git a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java index 07ea122f4..ce29c6c46 100644 --- a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java +++ b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java @@ -13,7 +13,10 @@ */ package com.spotify.reaper.storage; +import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.collect.Collections2; +import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -25,6 +28,7 @@ import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.resources.view.RepairRunStatus; import com.spotify.reaper.resources.view.RepairScheduleStatus; +import com.spotify.reaper.service.RepairParameters; import com.spotify.reaper.service.RingRange; import java.util.ArrayList; @@ -299,6 +303,31 @@ public Collection getSegmentsWithState(long runId, return segments; } + @Override + public Collection getOngoingRepairsInCluster(String clusterName) { + Collection runningRuns = getRepairRunsWithState(RepairRun.RunState.RUNNING); + FluentIterable ongoingRepairs = FluentIterable.from(runningRuns).transformAndConcat( + new Function>() { + @Override + public Iterable apply(final RepairRun run) { + return Collections2.transform( + getSegmentsWithState(run.getId(), RepairSegment.State.RUNNING), + new Function() { + @Override + public RepairParameters apply(RepairSegment segment) { + RepairUnit unit = getRepairUnit(segment.getRepairUnitId()).get(); + return new RepairParameters( + segment.getTokenRange(), + unit.getKeyspaceName(), + unit.getColumnFamilies(), + run.getRepairParallelism()); + } + }); + } + }); + return ongoingRepairs.toList(); + } + @Override public Collection getRepairRunIdsForCluster(String clusterName) { Collection repairRunIds = new HashSet<>(); diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java index c5984aaa9..14dd0906a 100644 --- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java +++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java @@ -25,6 +25,7 @@ import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.resources.view.RepairRunStatus; import com.spotify.reaper.resources.view.RepairScheduleStatus; +import com.spotify.reaper.service.RepairParameters; import com.spotify.reaper.service.RingRange; import com.spotify.reaper.storage.postgresql.BigIntegerArgumentFactory; import com.spotify.reaper.storage.postgresql.IStoragePostgreSQL; @@ -367,6 +368,13 @@ public Collection getSegmentsWithState(long runId, return result; } + @Override + public Collection getOngoingRepairsInCluster(String clusterName) { + try (Handle h = jdbi.open()) { + return getPostgresStorage(h).getRunningRepairsForCluster(clusterName); + } + } + @Override public Collection getRepairRunIdsForCluster(String clusterName) { Collection result; diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java index a32b453ba..77d3bda49 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java @@ -20,6 +20,7 @@ import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.resources.view.RepairRunStatus; import com.spotify.reaper.resources.view.RepairScheduleStatus; +import com.spotify.reaper.service.RepairParameters; import org.skife.jdbi.v2.sqlobject.Bind; import org.skife.jdbi.v2.sqlobject.BindBean; @@ -120,6 +121,12 @@ public interface IStoragePostgreSQL { String SQL_GET_REPAIR_SEGMENTS_FOR_RUN_WITH_STATE = "SELECT " + SQL_REPAIR_SEGMENT_ALL_FIELDS + " FROM repair_segment WHERE " + "run_id = :runId AND state = :state"; + String SQL_GET_RUNNING_REPAIRS_FOR_CLUSTER = + "SELECT start_token, end_token, keyspace_name, column_families, repair_parallelism " + + "FROM repair_segment " + + "JOIN repair_run ON run_id = repair_run.id " + + "JOIN repair_unit ON repair_run.repair_unit_id = repair_unit.id " + + "WHERE repair_segment.state = 1 AND repair_unit.cluster_name = :clusterName"; String SQL_GET_NEXT_FREE_REPAIR_SEGMENT = "SELECT " + SQL_REPAIR_SEGMENT_ALL_FIELDS + " FROM repair_segment WHERE run_id = :runId " + "AND state = 0 ORDER BY fail_count ASC, start_token ASC LIMIT 1"; @@ -289,6 +296,11 @@ Collection getRepairSegmentsForRunWithState( @Bind("runId") long runId, @Bind("state") RepairSegment.State state); + @SqlQuery(SQL_GET_RUNNING_REPAIRS_FOR_CLUSTER) + @Mapper(RepairParametersMapper.class) + Collection getRunningRepairsForCluster( + @Bind("clusterName") String clusterName); + @SqlQuery(SQL_GET_NEXT_FREE_REPAIR_SEGMENT) @Mapper(RepairSegmentMapper.class) RepairSegment getNextFreeRepairSegment(@Bind("runId") long runId); diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/RepairParametersMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/RepairParametersMapper.java new file mode 100644 index 000000000..8dc685aee --- /dev/null +++ b/src/main/java/com/spotify/reaper/storage/postgresql/RepairParametersMapper.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.spotify.reaper.storage.postgresql; + +import com.google.common.collect.Sets; + +import com.spotify.reaper.service.RepairParameters; +import com.spotify.reaper.service.RingRange; + +import org.apache.cassandra.repair.RepairParallelism; +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.ResultSetMapper; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class RepairParametersMapper implements ResultSetMapper { + @Override + public RepairParameters map(int index, ResultSet r, StatementContext ctx) throws SQLException { + RingRange range = new RingRange(r.getBigDecimal("start_token").toBigInteger(), + r.getBigDecimal("end_token").toBigInteger()); + String[] columnFamilies = (String[]) r.getArray("column_families").getArray(); + RepairParallelism repairParallelism = + RepairParallelism.valueOf(r.getString("repair_parallelism")); + return new RepairParameters(range, + r.getString("keyspace_name"), + Sets.newHashSet(columnFamilies), + repairParallelism); + } +}