Skip to content

Commit

Permalink
Add storage query for all repair commands currently running in cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
Bj0rnen committed Nov 9, 2015
1 parent 1c7d421 commit 8d89ba8
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/main/java/com/spotify/reaper/storage/IStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.spotify.reaper.core.RepairUnit;
import com.spotify.reaper.resources.view.RepairRunStatus;
import com.spotify.reaper.resources.view.RepairScheduleStatus;
import com.spotify.reaper.service.RepairParameters;
import com.spotify.reaper.service.RingRange;

import java.util.Collection;
Expand Down Expand Up @@ -110,6 +111,8 @@ Optional<RepairUnit> getRepairUnit(String cluster, String keyspace,

Collection<RepairSegment> getSegmentsWithState(long runId, RepairSegment.State segmentState);

Collection<RepairParameters> getOngoingRepairsInCluster(String clusterName);

Collection<Long> getRepairRunIdsForCluster(String clusterName);

int getSegmentAmountForRepairRun(long runId);
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/com/spotify/reaper/storage/MemoryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
*/
package com.spotify.reaper.storage;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Collections2;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand All @@ -25,6 +28,7 @@
import com.spotify.reaper.core.RepairUnit;
import com.spotify.reaper.resources.view.RepairRunStatus;
import com.spotify.reaper.resources.view.RepairScheduleStatus;
import com.spotify.reaper.service.RepairParameters;
import com.spotify.reaper.service.RingRange;

import java.util.ArrayList;
Expand Down Expand Up @@ -299,6 +303,31 @@ public Collection<RepairSegment> getSegmentsWithState(long runId,
return segments;
}

@Override
public Collection<RepairParameters> getOngoingRepairsInCluster(String clusterName) {
Collection<RepairRun> runningRuns = getRepairRunsWithState(RepairRun.RunState.RUNNING);
FluentIterable<RepairParameters> ongoingRepairs = FluentIterable.from(runningRuns).transformAndConcat(
new Function<RepairRun, Iterable<RepairParameters>>() {
@Override
public Iterable<RepairParameters> apply(final RepairRun run) {
return Collections2.transform(
getSegmentsWithState(run.getId(), RepairSegment.State.RUNNING),
new Function<RepairSegment, RepairParameters>() {
@Override
public RepairParameters apply(RepairSegment segment) {
RepairUnit unit = getRepairUnit(segment.getRepairUnitId()).get();
return new RepairParameters(
segment.getTokenRange(),
unit.getKeyspaceName(),
unit.getColumnFamilies(),
run.getRepairParallelism());
}
});
}
});
return ongoingRepairs.toList();
}

@Override
public Collection<Long> getRepairRunIdsForCluster(String clusterName) {
Collection<Long> repairRunIds = new HashSet<>();
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/spotify/reaper/storage/PostgresStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.spotify.reaper.core.RepairUnit;
import com.spotify.reaper.resources.view.RepairRunStatus;
import com.spotify.reaper.resources.view.RepairScheduleStatus;
import com.spotify.reaper.service.RepairParameters;
import com.spotify.reaper.service.RingRange;
import com.spotify.reaper.storage.postgresql.BigIntegerArgumentFactory;
import com.spotify.reaper.storage.postgresql.IStoragePostgreSQL;
Expand Down Expand Up @@ -367,6 +368,13 @@ public Collection<RepairSegment> getSegmentsWithState(long runId,
return result;
}

@Override
public Collection<RepairParameters> getOngoingRepairsInCluster(String clusterName) {
try (Handle h = jdbi.open()) {
return getPostgresStorage(h).getRunningRepairsForCluster(clusterName);
}
}

@Override
public Collection<Long> getRepairRunIdsForCluster(String clusterName) {
Collection<Long> result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.spotify.reaper.core.RepairUnit;
import com.spotify.reaper.resources.view.RepairRunStatus;
import com.spotify.reaper.resources.view.RepairScheduleStatus;
import com.spotify.reaper.service.RepairParameters;

import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.BindBean;
Expand Down Expand Up @@ -120,6 +121,12 @@ public interface IStoragePostgreSQL {
String SQL_GET_REPAIR_SEGMENTS_FOR_RUN_WITH_STATE =
"SELECT " + SQL_REPAIR_SEGMENT_ALL_FIELDS + " FROM repair_segment WHERE "
+ "run_id = :runId AND state = :state";
String SQL_GET_RUNNING_REPAIRS_FOR_CLUSTER =
"SELECT start_token, end_token, keyspace_name, column_families, repair_parallelism "
+ "FROM repair_segment "
+ "JOIN repair_run ON run_id = repair_run.id "
+ "JOIN repair_unit ON repair_run.repair_unit_id = repair_unit.id "
+ "WHERE repair_segment.state = 1 AND repair_unit.cluster_name = :clusterName";
String SQL_GET_NEXT_FREE_REPAIR_SEGMENT =
"SELECT " + SQL_REPAIR_SEGMENT_ALL_FIELDS + " FROM repair_segment WHERE run_id = :runId "
+ "AND state = 0 ORDER BY fail_count ASC, start_token ASC LIMIT 1";
Expand Down Expand Up @@ -289,6 +296,11 @@ Collection<RepairSegment> getRepairSegmentsForRunWithState(
@Bind("runId") long runId,
@Bind("state") RepairSegment.State state);

@SqlQuery(SQL_GET_RUNNING_REPAIRS_FOR_CLUSTER)
@Mapper(RepairParametersMapper.class)
Collection<RepairParameters> getRunningRepairsForCluster(
@Bind("clusterName") String clusterName);

@SqlQuery(SQL_GET_NEXT_FREE_REPAIR_SEGMENT)
@Mapper(RepairSegmentMapper.class)
RepairSegment getNextFreeRepairSegment(@Bind("runId") long runId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.spotify.reaper.storage.postgresql;

import com.google.common.collect.Sets;

import com.spotify.reaper.service.RepairParameters;
import com.spotify.reaper.service.RingRange;

import org.apache.cassandra.repair.RepairParallelism;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.ResultSetMapper;

import java.sql.ResultSet;
import java.sql.SQLException;

public class RepairParametersMapper implements ResultSetMapper<RepairParameters> {
@Override
public RepairParameters map(int index, ResultSet r, StatementContext ctx) throws SQLException {
RingRange range = new RingRange(r.getBigDecimal("start_token").toBigInteger(),
r.getBigDecimal("end_token").toBigInteger());
String[] columnFamilies = (String[]) r.getArray("column_families").getArray();
RepairParallelism repairParallelism =
RepairParallelism.valueOf(r.getString("repair_parallelism"));
return new RepairParameters(range,
r.getString("keyspace_name"),
Sets.newHashSet(columnFamilies),
repairParallelism);
}
}

0 comments on commit 8d89ba8

Please sign in to comment.