From 51d2f83e6c53d6d75d0220a6c4283a1a5b8cf2b9 Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Thu, 16 Apr 2015 22:16:24 +0900 Subject: [PATCH 1/3] Offline deletion tool that searches and collects stale blocks --- priv/tools/internal/offline_delete.erl | 167 +++++++++++++++++++ priv/tools/internal/select_gc_bucket.erl | 204 +++++++++++++++++++++++ 2 files changed, 371 insertions(+) create mode 100644 priv/tools/internal/offline_delete.erl create mode 100644 priv/tools/internal/select_gc_bucket.erl diff --git a/priv/tools/internal/offline_delete.erl b/priv/tools/internal/offline_delete.erl new file mode 100644 index 000000000..df5c1afbf --- /dev/null +++ b/priv/tools/internal/offline_delete.erl @@ -0,0 +1,167 @@ +#!/usr/bin/env escript + +%% --------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% --------------------------------------------------------------------- + +-module(offline_delete). + +-compile(export_all). +-mode(compile). + +%% @doc This is an offline deletion script that directly opens +%% bitcask files, reads a file listing the keys and partitions that +%% should be deleted, and then deletes them without +%% involving Riak KV. +%% +%% Note: make sure you remove the AAE trees after this script has run, and +%% turn off AAE on the other nodes that are running in the cluster. 

%% @doc escript entry point.
%%
%% Accepts the optional flags `--dry-run' and `--old-format' (in that
%% order when both are given) followed by the bitcask data directory
%% and the file that lists the blocks to delete. Any other argument
%% list prints the usage text to standard error.
main(["--dry-run", "--old-format", BitcaskDir, BlocksListFile]) ->
    offline_delete(BitcaskDir, BlocksListFile, true, true);
main(["--dry-run", BitcaskDir, BlocksListFile]) ->
    offline_delete(BitcaskDir, BlocksListFile, true, false);
main(["--old-format", BitcaskDir, BlocksListFile]) ->
    offline_delete(BitcaskDir, BlocksListFile, false, true);
main([BitcaskDir, BlocksListFile]) ->
    offline_delete(BitcaskDir, BlocksListFile, false, false);
main(_) ->
    io:format(standard_error,
              "options: [--dry-run][--old-format] <bitcask-dir> <blocks-list-file>~n"
              "\033[31m\033[1m[Caution] Make sure Riak is not running!!!\033[0m~n", []).

%% @doc Opens every entry of `BitcaskDir' as a bitcask store in
%% `read_write' mode and returns an orddict that maps the entry name,
%% converted to an integer, to the bitcask handle.
%%
%% Crashes with `error({File, Other})' when `bitcask:open/2' returns
%% anything but a reference, and with `badarg' when an entry name is
%% not a decimal integer.
-spec open_all_bitcask(file:filename()) ->
                              orddict:orddict(non_neg_integer(), reference()).
open_all_bitcask(BitcaskDir) ->
    {ok, List} = file:list_dir(BitcaskDir),
    Result = lists:map(fun(File) ->
                               Filename = filename:join(BitcaskDir, File),
                               case bitcask:open(Filename, [read_write]) of
                                   Ref when is_reference(Ref) ->
                                       {list_to_integer(File), Ref};
                                   Other ->
                                       error({File, Other})
                               end
                       end, List),
    orddict:from_list(Result).

%% @doc Closes every bitcask handle stored in `Bitcasks'.
%% Always returns `ok', as the spec promises; the individual results
%% of `bitcask:close/1' are discarded.
-spec close_all_bitcask(orddict:orddict(non_neg_integer(), reference())) -> ok.
close_all_bitcask(Bitcasks) ->
    lists:foreach(fun({_Idx, Ref}) ->
                          bitcask:close(Ref)
                  end, orddict:to_list(Bitcasks)).

%% New bitcask 1.7 format (Riak 2.0 or later)
-define(VERSION_1, 1).
-define(VERSION_BYTE, ?VERSION_1).

%% @doc Prints a warning about `Dir' to standard error and asks the
%% operator for confirmation on standard input.
%%
%% Only the exact answer "y" followed by a newline lets the script
%% continue; any other input (including end of file) fails the match
%% and aborts the script with a `badmatch' error.
make_sure(Dir) ->
    io:format(standard_error,
              "\033[31m[Warning]\033[0m~n"
              "Make sure any Riak process using '~s' is not running "
              "or your data may corrupt.~n", [Dir]),
    "y\n" = io:get_line("Accept the terms of conditions? [y/N] ").

%% @doc Deletes (or, when `DryRun' is true, only looks up) every block
%% listed in `BlocksListFile' from the bitcask stores under `BitcaskDir'.
%%
%% `OldFormat' selects the bitcask key version: `true' uses version 0,
%% `false' uses `?VERSION_1'. The operator must confirm via
%% `make_sure/1' before anything is opened. The bitcask handles and the
%% list file are closed even when processing a line crashes.
%%
%% Returns `ok'; crashes with `badmatch' if reading the list file fails.
offline_delete(BitcaskDir, BlocksListFile, DryRun, OldFormat) ->
    make_sure(BitcaskDir),
    {ok, Fd} = file:open(BlocksListFile, [read]),
    BC = open_all_bitcask(BitcaskDir),
    try
        io:format(standard_error, "~p bitcask directories at ~s opened.~n",
                  [length(BC), BitcaskDir]),
        BKVersion = case OldFormat of
                        false -> ?VERSION_1;
                        true -> 0
                    end,
        io:format(standard_error, "Using bitcask key version: ~p.~n",
                  [BKVersion]),
        {ok, Deleted} = for_each_line(Fd, BC, DryRun, 0, BKVersion),
        io:format(standard_error, "~p blocks at ~s was deleted"
                  " (dry run: ~p).~n",
                  [Deleted, BitcaskDir, DryRun])
    after
        %% Release the stores and the list file on every exit path.
        close_all_bitcask(BC),
        file:close(Fd)
    end,
    ok.

%% @doc Reads `Fd' line by line and processes one block per line.
%%
%% Each line must split (on tabs, spaces and newlines) into exactly
%% seven tokens: three decimal store indexes, the hex-encoded bucket,
%% the hex-encoded key and two trailing fields that are ignored here.
%% A line of any other shape crashes with `badmatch'.
%%
%% `Count' accumulates the values returned by `maybe_delete/6'.
%% Returns `{ok, Count}' at end of file, or `{error, Reason}' after
%% logging when the read fails.
for_each_line(Fd, BC, DryRun, Count, BKVersion) ->
    %% Progress report; fires only when the running count lands exactly
    %% on 500, 1500, 2500, ...
    case Count rem 1000 of
        500 ->
            io:format(standard_error,
                      "~p blocks has been deleted.~n",
                      [Count]);
        _ ->
            noop
    end,
    case file:read_line(Fd) of
        {ok, Line} ->
            [V1, V2, V3, B, K, _UUIDStr, _SeqNo] =
                string:tokens(Line, "\t \n"),
            Bucket = mochihex:to_bin(B),
            Key = mochihex:to_bin(K),
            %% Try the key on each of the three listed stores, in order.
            Deleted = lists:sum([maybe_delete(BC, list_to_integer(V),
                                              Bucket, Key, DryRun, BKVersion)
                                 || V <- [V1, V2, V3]]),
            for_each_line(Fd, BC, DryRun, Count + Deleted, BKVersion);
        eof ->
            {ok, Count};
        {error, Reason} = Error ->
            %% io:format/3 takes a list of arguments, not a bare term.
            io:format(standard_error, "Error: ~p~n", [Reason]),
            Error
    end.

%% @doc Looks up (dry run) or deletes the object `{Bucket, Key}' in the
%% bitcask store registered under `Idx' in the orddict `BC'.
%%
%% Returns 1 when `bitcask:get/2' answers `{ok, _}' or
%% `bitcask:delete/2' answers `ok'; returns 0 after logging any other
%% answer. Returns 0 silently when no store is registered under `Idx'.
maybe_delete(BC, Idx, Bucket, Key, DryRun, BKVersion) ->
    case orddict:find(Idx, BC) of
        {ok, Bitcask} ->
            BitcaskKey = make_bk(BKVersion, Bucket, Key),
            case probe_or_delete(DryRun, Bitcask, BitcaskKey) of
                {ok, _Value} ->
                    1;
                ok ->
                    1;
                Error ->
                    io:format(standard_error, "error: ~p~n", [Error]),
                    0
            end;
        error ->
            %% No store with this index was opened from the data
            %% directory, so there is nothing to do on this node.
            0
    end.

%% Dry runs only read the key; real runs delete it.
probe_or_delete(true, Bitcask, BitcaskKey) ->
    bitcask:get(Bitcask, BitcaskKey);
probe_or_delete(false, Bitcask, BitcaskKey) ->
    bitcask:delete(Bitcask, BitcaskKey).

%% @doc Builds the bitcask key for `Bucket' and `Key'.
%%
%% Version 0 is the external term format of `{Bucket, Key}'. Version 1
%% is a binary starting with the 7-bit version and a 1-bit flag that is
%% set when the bucket is a `{Type, Bucket}' pair, followed by the
%% 16-bit length-prefixed type (typed buckets only), the 16-bit
%% length-prefixed bucket and finally the key.
%% NOTE(review): the version 1 layout is meant to match what the Riak KV
%% bitcask backend writes - confirm against that backend before use.
%% Old bitcask format (Riak 1.4 or before)
make_bk(0, Bucket, Key) ->
    term_to_binary({Bucket, Key});
%% New bitcask 1.7 format (Riak 2.0 or later)
make_bk(1, {Type, Bucket}, Key) ->
    TypeSz = byte_size(Type),
    BucketSz = byte_size(Bucket),
    <<?VERSION_BYTE:7, 1:1, TypeSz:16/integer, Type/binary,
      BucketSz:16/integer, Bucket/binary, Key/binary>>;
%% New bitcask 1.7 format (Riak 2.0 or later)
make_bk(1, Bucket, Key) ->
    BucketSz = byte_size(Bucket),
    <<?VERSION_BYTE:7, 0:1, BucketSz:16/integer,
      Bucket/binary, Key/binary>>.
diff --git a/priv/tools/internal/select_gc_bucket.erl b/priv/tools/internal/select_gc_bucket.erl
new file mode 100644
index 000000000..376fdb31c
--- /dev/null
+++ b/priv/tools/internal/select_gc_bucket.erl
@@ -0,0 +1,204 @@
%% #!/usr/bin/env escript

%% ---------------------------------------------------------------------
%%
%% Copyright (c) 2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% ---------------------------------------------------------------------

-module(select_gc_bucket).

-compile(export_all).
-mode(compile).

-include_lib("riak_cs/include/riak_cs.hrl").

%% Gregorian seconds (as counted by the calendar module) at
%% 1970-01-01 00:00:00, used to turn a date into Unix epoch seconds.
-define(UNIX_EPOCH_GREGORIAN_SECONDS, 62167219200).

%% Accumulator threaded through the fold over the GC bucket.
-record(state,
        {logger :: file:io_device() | undefined, % output file, set by work/7
         ring_size = 64 :: non_neg_integer(),
         threshold :: non_neg_integer() | undefined,
         keycount = 0 :: non_neg_integer(),
         manifestcount = 0 :: non_neg_integer(),
         total_manifestcount = 0 :: non_neg_integer(),
         blockcount = 0 :: non_neg_integer()
        }).

%% @doc getopt option specification of this script. Every option has a
%% default value.
options() ->
    [{host, $h, "host", {string, "localhost"}, "Address of Riak"},
     {port, $p, "port", {integer, 8087}, "Port number of Riak PB"},
     {leeway, $l, "leeway", {integer, 24*60*60}, "Specify leeway seconds"},
     {ring_size, $r, "ring-size", {integer, 64}, "Ring Size"},
     {threshold, $t, "threshold", {integer, 5*1024*1024}, "Threshold"},
     {start, $s, "start", {string, "19700101"}, "Start of the seek period"},
     {'end', $e, "end", {string, "yesterday"}, "End of the seek period"},
     {output, $o, "output", {string, "/tmp/tmp.txt"},
      "Output file (absolute path)"},
     {timeout, $w, "timeout", {integer, 6}, "Timeout in seconds"}].

%% @doc Returns the value stored under `Key' in `Proplist'.
%% When the key is missing, prints the usage text and terminates the
%% runtime with exit status 1 (`halt/1' rejects negative statuses).
pgv(Key, Proplist) ->
    case proplists:get_value(Key, Proplist) of
        undefined ->
            getopt:usage(options(), "select_gc_bucket.erl"),
            halt(1);
        Value ->
            Value
    end.

%% @doc Converts a date argument into a binary holding a decimal number
%% of seconds.
%%
%% Accepts "today", "yesterday" (24 hours before "today") or an
%% eight-character "YYYYMMDD" string, which is converted to the seconds
%% elapsed since 1970-01-01 at midnight of that day. Any other input
%% crashes with `function_clause'.
%% NOTE(review): riak_cs_gc:timestamp/0 presumably returns Unix epoch
%% seconds - confirm against riak_cs_gc.
maybe_date("today") ->
    list_to_binary(integer_to_list(riak_cs_gc:timestamp()));
maybe_date("yesterday") ->
    list_to_binary(integer_to_list(riak_cs_gc:timestamp() - 86400));
maybe_date([Y0,Y1,Y2,Y3,M0,M1,D0,D1]) ->
    DateTime = {{list_to_integer([Y0,Y1,Y2,Y3]),
                 list_to_integer([M0,M1]),
                 list_to_integer([D0,D1])},
                {0,0,0}},
    Sec = calendar:datetime_to_gregorian_seconds(DateTime)
        - ?UNIX_EPOCH_GREGORIAN_SECONDS,
    list_to_binary(integer_to_list(Sec)).

%% @doc escript entry point: parses the command line and starts the
%% scan. Prints the usage text when the arguments cannot be parsed.
main(Args) ->
    case getopt:parse(options(), Args) of
        {ok, {Options, _NonOptArgs}} ->
            Host = pgv(host, Options),
            Port = pgv(port, Options),
            RingSize = pgv(ring_size, Options),
            Threshold = pgv(threshold, Options),
            %% TODO: make this configurable, take leeway into account
            StartKey = maybe_date(pgv(start, Options)),
            EndKey = maybe_date(pgv('end', Options)),
            OutputFile = pgv(output, Options),
            %% The option is given in seconds; the fold wants milliseconds.
            Timeout = pgv(timeout, Options) * 1000,
            State = #state{ring_size = RingSize, threshold = Threshold},
            work(Host, Port, StartKey, EndKey, Timeout, OutputFile, State);
        _E ->
            getopt:usage(options(), "select_gc_bucket.erl")
    end.

%% @doc Connects to Riak at `Host':`Port', folds over `?GC_BUCKET'
%% between `StartKey' and `EndKey', and lets `handle_fold_results/3'
%% write the block list to `OutputFile'.
%%
%% Returns `ok' after printing the follow-up instructions when the fold
%% finished, or `error' after reporting that the output is incomplete.
%% The connection and the output file are released on every exit path.
work(Host, Port, StartKey, EndKey, Timeout, OutputFile, State0) ->
    io:format(standard_error, "Connecting ~p:~p~n", [Host, Port]),
    Opts = [{start_key, StartKey},
            {end_key, EndKey},
            {timeout, Timeout}],
    {ok, Pid} = riakc_pb_socket:start_link(Host, Port),
    {ok, File} = file:open(OutputFile, [write, delayed_write]),
    State = State0#state{logger=File},
    try
        {ok, ReqID} = riakc_pb_socket:cs_bucket_fold(Pid, ?GC_BUCKET, Opts),
        case handle_fold_results(Pid, ReqID, State) of
            done ->
                io:format(standard_error,
                          "Finished!~n"
                          "Next action is to run offline delete. Use Riak command like this:~n"
                          "$ riak escript /usr/lib/riak-cs/lib/riak-cs_2.0.0/priv/tools/internal/offline_delete.erl --dry-run /var/lib/riak/bitcask ~s~n",
                          [OutputFile]);
            error ->
                %% Do not advertise the next step for a partial list.
                io:format(standard_error,
                          "Fold over the GC bucket failed; ~s is incomplete.~n",
                          [OutputFile]),
                error
        end
    after
        riakc_pb_socket:stop(Pid),
        close_output(File, OutputFile)
    end.

%% Closes the output file. With delayed_write a failed earlier write is
%% only reported by a later operation such as close, so report it
%% instead of dropping it.
close_output(File, OutputFile) ->
    case file:close(File) of
        ok ->
            ok;
        {error, Reason} ->
            io:format(standard_error, "Failed to write ~s: ~p~n",
                      [OutputFile, Reason])
    end.

%% @doc Receive loop that consumes the messages of the bucket fold
%% identified by `ReqID'.
%%
%% Every `{ok, Objs}' batch is tallied and folded into the counters of
%% the state before waiting for the next message. A `{done, _}' message
%% prints the counters and returns `done'. Any other message is logged
%% and ends the loop with `error'. `Pid' is only passed along.
handle_fold_results(Pid, ReqID,
                    #state{total_manifestcount = TotalSoFar,
                           keycount = KeysSoFar,
                           manifestcount = MatchedSoFar,
                           blockcount = BlocksSoFar} = State) ->
    receive
        {ReqID, {ok, Objs}} ->
            {Seen, Matched, Blocks} =
                lists:foldl(
                  fun(Obj, {SeenAcc, MatchedAcc, BlocksAcc}) ->
                          {N, M, B} = tally_gc_object(Obj, State),
                          {SeenAcc + N, MatchedAcc + M, BlocksAcc + B}
                  end, {0, 0, 0}, Objs),
            NextState = State#state{total_manifestcount = Seen + TotalSoFar,
                                    keycount = KeysSoFar + length(Objs),
                                    manifestcount = MatchedSoFar + Matched,
                                    blockcount = BlocksSoFar + Blocks},
            handle_fold_results(Pid, ReqID, NextState);
        {ReqID, {done, _}} ->
            io:format(standard_error,
                      "keycount: ~p, total_manifestcount ~p, manifestcount ~p, blockcount ~p~n",
                      [KeysSoFar, TotalSoFar, MatchedSoFar, BlocksSoFar]),
            done;
        Other ->
            io:format(standard_error, "Boom!!! Other; ~p", [Other]),
            error
    end.

%% Tallies one object of the GC bucket. Returns
%% {ManifestsSeen, MatchedKeys, MatchedBlocks}, where the last two are
%% the sums of what handle_manifest/2 reports for each manifest.
tally_gc_object(Obj, State) ->
    ManifestSet = riak_cs_gc:decode_and_merge_siblings(Obj, twop_set:new()),
    lists:foldl(fun(Manifest, {Seen, Keys, Blocks}) ->
                        {K, B} = handle_manifest(Manifest, State),
                        {Seen + 1, Keys + K, Blocks + B}
                end, {0, 0, 0}, twop_set:to_list(ManifestSet)).

%% @doc Inspects one `{UUID, Manifest}' pair from the GC bucket.
%%
%% Manifests whose content length is below the threshold in the state
%% are skipped. For a `pending_delete' manifest, one line per block is
%% written to the logger file, holding three vnode ids, the hex-encoded
%% block bucket, block key and block UUID, and the sequence number, all
%% tab-separated.
%%
%% NOTE(review): a manifest at or above the threshold in any other
%% state matches no clause and crashes - confirm that only
%% `pending_delete' manifests can appear here.
%% => {matched_keys, matched_blocks}
handle_manifest({_UUID,
                 ?MANIFEST{content_length=ContentLength} = _Manifest},
                #state{threshold=Threshold} = _State)
  when ContentLength < Threshold ->
    {0, 0};
handle_manifest({_UUID, ?MANIFEST{bkey=BKey={Bucket,_},
                                  uuid=UUID,
                                  content_length=ContentLength,
                                  state=pending_delete} = M},
                _State = #state{ring_size=RingSize,
                                logger=File}
               ) ->
    io:format(standard_error, "~p (~p) ~p~n",
              [BKey, mochihex:to_hex(UUID), ContentLength]),
    BlockSequences = riak_cs_lfs_utils:block_sequences_for_manifest(M),
    Count = ordsets:fold(
              fun({UUID1, SeqNo}, Count0) ->
                      BK = {B,K} = full_bkey(Bucket, dummy, UUID1, SeqNo),
                      %% Exactly three vnode columns, each already
                      %% followed by its own tab.
                      VNodes = [[integer_to_list(VNode), $\t]
                                || VNode <- vnode_ids(BK, RingSize, 3)],
                      %% Partitions, bucket, key, UUID, SeqNo.
                      %% A failed write must not go unnoticed.
                      ok = file:write(File, [VNodes,
                                             mochihex:to_hex(B), $\t,
                                             mochihex:to_hex(K), $\t,
                                             mochihex:to_hex(UUID1), $\t,
                                             integer_to_list(SeqNo), $\n]),
                      Count0 + 1
              end, 0, BlockSequences),
    {1, Count}.

%% @doc Returns the `{Bucket, Key}' pair under which a block is stored.
%% From riak_cs
full_bkey(Bucket, Key, UUID, BlockId) ->
    PrefixedBucket = riak_cs_utils:to_bucket_name(blocks, Bucket),
    FullKey = riak_cs_lfs_utils:block_name(Key, UUID, BlockId),
    {PrefixedBucket, FullKey}.

%% The subtraction is done in floating point, where 2^160 - 1 rounds
%% back to 2^160, so this macro evaluates to exactly 2^160. Do not
%% "correct" it to integer arithmetic: that would change `Inc' below
%% and with it every computed vnode id.
-define(RINGTOP, trunc(math:pow(2,160)-1)).  % SHA-1 space

%% (hash({B,K}) div Inc)

%% @doc Returns the first vnode id for `BKey' on a ring with `RingSize'
%% partitions; the same value as the head of `vnode_ids/3'.
vnode_id(BKey, RingSize) ->
    [VNode] = vnode_ids(BKey, RingSize, 1),
    VNode.

%% @doc Returns `NVal' consecutive vnode ids for `BKey' on a ring with
%% `RingSize' partitions, wrapping around at the end of the ring.
%%
%% The SHA-1 digest of the key is read as a 160-bit integer, the
%% partition after the one containing the hash is selected, and each id
%% is the partition number multiplied by the partition width `Inc'.
vnode_ids(BKey, RingSize, NVal) ->
    <<HashKey:160/integer>> = key_of(BKey),
    Inc = ?RINGTOP div RingSize,
    PartitionId = ((HashKey div Inc) + 1) rem RingSize,
    [((PartitionId+N) rem RingSize) * Inc || N <- lists:seq(0, NVal-1)].

%% @doc SHA-1 digest of `Bin'.
%% From riak_core
sha(Bin) ->
    crypto:hash(sha, Bin).

%% @doc Hash of an arbitrary term: the SHA-1 digest of its external
%% term format.
key_of(ObjectName) ->
    sha(term_to_binary(ObjectName)).
From 2453cefec9d112c38decf086143f11e8dd381150 Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Tue, 28 Apr 2015 21:23:43 +0900 Subject: [PATCH 2/3] Remove unused option: leeway in select_gc_bucket.erl --- priv/tools/internal/select_gc_bucket.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/priv/tools/internal/select_gc_bucket.erl b/priv/tools/internal/select_gc_bucket.erl index 376fdb31c..86c78390b 100644 --- a/priv/tools/internal/select_gc_bucket.erl +++ b/priv/tools/internal/select_gc_bucket.erl @@ -40,7 +40,6 @@ options() -> [{host, $h, "host", {string, "localhost"}, "Address of Riak"}, {port, $p, "port", {integer, 8087}, "Port number of Riak PB"}, - {leeway, $l, "leeway", {integer, 24*60*60}, "Specify leeway seconds"}, {ring_size, $r, "ring-size", {integer, 64}, "Ring Size"}, {threshold, $t, "threshold", {integer, 5*1024*1024}, "Threshold"}, {start, $s, "start", {string, "19700101"}, "Start of the seek period"}, From 885f839c4247eb87e478a5e0a3b2cb967d2f559a Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Thu, 30 Apr 2015 12:42:37 +0900 Subject: [PATCH 3/3] Update to the comments --- priv/tools/internal/offline_delete.erl | 7 ++++--- priv/tools/internal/select_gc_bucket.erl | 5 +++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/priv/tools/internal/offline_delete.erl b/priv/tools/internal/offline_delete.erl index df5c1afbf..946abb818 100644 --- a/priv/tools/internal/offline_delete.erl +++ b/priv/tools/internal/offline_delete.erl @@ -43,8 +43,9 @@ main([BitcaskDir, BlocksListFile]) -> offline_delete(BitcaskDir, BlocksListFile, false, false); main(_) -> io:format(standard_error, - "options: [--dry-run][--old-format] ~n" - "\033[31m\033[1m[Caution] Make sure Riak is not running!!!\033[0m~n", []). + "options: [--dry-run] [--old-format] ~n" + "\033[31m\033[1m[Caution] Make sure Riak is not running!!!\033[0m~n" + "It'd be better if all hinted handoff have been finished before stopping Riak.~n", []). 
-spec open_all_bitcask(filename:filename()) -> orddict:orddict(non_neg_integer(), reference()). @@ -75,7 +76,7 @@ make_sure(Dir) -> io:format(standard_error, "\033[31m[Warning]\033\[0m~n" "Make sure any Riak process using '~s' is not running " - "or your data may corrupt.~n", [Dir]), + "or your data may corrupt.~n", [filename:absname(Dir)]), "y\n" = io:get_line("Accept the terms of conditions? [y/N] "). offline_delete(BitcaskDir, BlocksListFile, DryRun, OldFormat) -> diff --git a/priv/tools/internal/select_gc_bucket.erl b/priv/tools/internal/select_gc_bucket.erl index 86c78390b..f68389fc4 100644 --- a/priv/tools/internal/select_gc_bucket.erl +++ b/priv/tools/internal/select_gc_bucket.erl @@ -160,9 +160,10 @@ handle_manifest({_UUID, ?MANIFEST{bkey=BKey={Bucket,_}, BlockSequences = riak_cs_lfs_utils:block_sequences_for_manifest(M), Count = ordsets:fold(fun({UUID1, SeqNo}, Count0) -> BK = {B,K} = full_bkey(Bucket, dummy, UUID1, SeqNo), - VNodes = [[integer_to_list(VNode), $\t] || VNode <- vnode_ids(BK, RingSize, 3)], + VNodes = [[integer_to_list(VNode), $\t] + || VNode <- vnode_ids(BK, RingSize, 3)], %% Partitions, UUID, SeqNo - file:write(File, [VNodes, $\t, + file:write(File, [VNodes, mochihex:to_hex(B), $\t, mochihex:to_hex(K), $\t, mochihex:to_hex(UUID1), $\t,