Merge pull request #1174 from basho/feature/active-delete
Synchronous deletion of small objects

Reviewed-by: shino
borshop committed Jul 7, 2015
2 parents c116e17 + dae708d commit 055b843
Showing 12 changed files with 226 additions and 56 deletions.
9 changes: 7 additions & 2 deletions priv/tools/internal/riak_cs_inspector.erl
@@ -872,8 +872,13 @@ get_manifest(RiakcPid, Bucket, Key)->
{UUID, M} <- case MD of
tombstone ->
[{tombstone, {tombstone, {Bucket, Key}}}];
_ ->
binary_to_term(Value)
_V ->
case dict:find(<<"X-Riak-Deleted">>, MD) of
{ok, true} ->
[{tombstone, {tombstone, {Bucket, Key}}}];
_ ->
binary_to_term(Value)
end
end]).

get_gc_manifest(RiakcPid, Bucket, Key)->
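
Note on the hunk above: the two tombstone cases can be read as one small predicate. A minimal sketch (hypothetical helper; MD is the riakc_obj sibling metadata, which may already have been replaced by the atom tombstone; the {ok, true} match mirrors the hunk above):

%% Sketch: a sibling counts as a tombstone either when it has already
%% been tagged as such, or when Riak's X-Riak-Deleted metadata flag is
%% set on it.
is_tombstone(tombstone) ->
    true;
is_tombstone(MD) ->
    case dict:find(<<"X-Riak-Deleted">>, MD) of
        {ok, true} -> true;
        _ -> false
    end.
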
2 changes: 1 addition & 1 deletion rebar.config
@@ -54,5 +54,5 @@

{deps_ee, [
{riak_repl_pb_api,".*",{git,"git@github.com:basho/riak_repl_pb_api.git", {tag, "2.1.0"}}},
{riak_cs_multibag,".*",{git,"git@github.com:basho/riak_cs_multibag.git", {tag, "2.1.0-pre1"}}}
{riak_cs_multibag,".*",{git,"git@github.com:basho/riak_cs_multibag.git", {tag, "2.1.0-pre2"}}}
]}.
11 changes: 11 additions & 0 deletions rel/files/riak_cs.schema
@@ -200,6 +200,17 @@
hidden
]}.

%% @doc Size threshold of deletion optimization for small objects.
%% Blocks of objects smaller than this size will be synchronously
%% deleted on the fly. If this is used in combination with MDC
%% replication, cluster configuration must be carefully
%% designed. Please consult our documentation for further
%% information. To turn off synchronous deletion, set this to 0.
{mapping, "active_delete_threshold", "riak_cs.active_delete_threshold", [
{default, "0"},
{datatype, bytesize}
]}.
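
For illustration, a riak-cs.conf line that would enable the optimization for objects smaller than 1 MB (a sketch; the bytesize datatype accepts human-readable suffixes such as MB, though exact parsing depends on the cuttlefish version):

active_delete_threshold = 1MB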

%% == Access statistics ==

%% @doc How often to flush the access stats; integer factor of
38 changes: 38 additions & 0 deletions riak_test/tests/active_delete_test.erl
@@ -0,0 +1,38 @@
%% ---------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% ---------------------------------------------------------------------

-module(active_delete_test).

%% @doc `riak_test' module for testing active delete situation behavior.

-export([confirm/0]).

confirm() ->
%% 10MB threshold, since 3MB objects are used in cs_suites:run/2
CSConfig = rtcs:cs_config([{active_delete_threshold, 10000000}]),
Setup = rtcs:setup(1, [{cs, CSConfig}]),

%% Just verify the typical normal case
History = [{cs_suites, run, ["run-1"]}],
{ok, InitialState} = cs_suites:new(Setup),
{ok, EvolvedState} = cs_suites:fold_with_state(InitialState, History),
{ok, _FinalState} = cs_suites:cleanup(EvolvedState),

rtcs:pass().
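
A hypothetical counterpart sketch, built only from the rtcs/cs_suites calls used above: leaving the threshold at 0 (the default) would drive the same suite through the ordinary GC path instead of synchronous deletion.

confirm_gc_path() ->
    %% Threshold 0 disables active deletion, so the 3MB objects go to GC.
    CSConfig = rtcs:cs_config([{active_delete_threshold, 0}]),
    Setup = rtcs:setup(1, [{cs, CSConfig}]),
    {ok, State0} = cs_suites:new(Setup),
    {ok, State1} = cs_suites:fold_with_state(State0, [{cs_suites, run, ["run-1"]}]),
    {ok, _FinalState} = cs_suites:cleanup(State1),
    rtcs:pass().
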
9 changes: 8 additions & 1 deletion src/riak_cs_config.erl
@@ -62,7 +62,8 @@
stanchion/0,
use_2i_for_storage_calc/0,
detailed_storage_calc/0,
quota_modules/0
quota_modules/0,
active_delete_threshold/0
]).

%% Timeouts hitting Riak
@@ -416,6 +417,12 @@ detailed_storage_calc() ->
quota_modules() ->
get_env(riak_cs, quota_modules, []).

%% @doc A threshold smaller than the block size is recommended, to avoid
%% multiple DELETE calls to Riak per single manifest deletion.
-spec active_delete_threshold() -> non_neg_integer().
active_delete_threshold() ->
get_env(riak_cs, active_delete_threshold, 0).
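
Because the accessor falls back on the riak_cs application environment, the value can also be set without cuttlefish. A hypothetical advanced.config fragment (1 MB, expressed in bytes) that this accessor would pick up:

[
 {riak_cs, [
     {active_delete_threshold, 1048576}
 ]}
].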

%% ===================================================================
%% ALL Timeouts hitting Riak
%% ===================================================================
57 changes: 34 additions & 23 deletions src/riak_cs_delete_fsm.erl
@@ -50,16 +50,18 @@
uuid :: binary(),
manifest :: lfs_manifest(),
riak_client :: riak_client(),
gc_worker_pid :: pid(),
%% Key in GC bucket which this manifest belongs to.
finished_callback :: fun(),
%% For GC, Key in GC bucket which this manifest belongs to.
%% For active deletion, not used.
%% Set only once at init and unchanged. Used only for logs.
gc_key :: binary(),
delete_blocks_remaining :: ordsets:ordset({binary(), integer()}),
unacked_deletes=ordsets:new() :: ordsets:ordset(integer()),
all_delete_workers=[] :: list(pid()),
free_deleters = ordsets:new() :: ordsets:ordset(pid()),
deleted_blocks = 0 :: non_neg_integer(),
total_blocks = 0 :: non_neg_integer()}).
total_blocks = 0 :: non_neg_integer(),
cleanup_manifests = true :: boolean()}).

-type state() :: #state{}.

@@ -80,18 +82,21 @@ block_deleted(Pid, Response) ->
%% gen_fsm callbacks
%% ====================================================================

init([BagId, {UUID, Manifest}, GCWorkerPid, GCKey, _Options]) ->
init([BagId, {UUID, Manifest}, FinishedCallback, GCKey, Options]) ->
{Bucket, Key} = Manifest?MANIFEST.bkey,
{ok, RcPid} = riak_cs_riak_client:checkout(),
ok = riak_cs_riak_client:set_manifest_bag(RcPid, BagId),
ok = riak_cs_riak_client:set_manifest(RcPid, Manifest),
CleanupManifests = proplists:get_value(cleanup_manifests,
Options, true),
State = #state{bucket=Bucket,
key=Key,
manifest=Manifest,
uuid=UUID,
riak_client=RcPid,
gc_worker_pid=GCWorkerPid,
gc_key=GCKey},
finished_callback=FinishedCallback,
gc_key=GCKey,
cleanup_manifests=CleanupManifests},
{ok, prepare, State, 0}.

%% @TODO Make sure we avoid any race conditions here
@@ -121,15 +126,22 @@ handle_sync_event(_Event, _From, StateName, State) ->
handle_info(_Info, StateName, State) ->
{next_state, StateName, State}.

terminate(Reason, _StateName, #state{all_delete_workers=AllDeleteWorkers,
manifest=?MANIFEST{state=ManifestState},
bucket=Bucket,
key=Key,
uuid=UUID,
riak_client=RcPid} = State) ->
manifest_cleanup(ManifestState, Bucket, Key, UUID, RcPid),
terminate(Reason, _StateName,
#state{all_delete_workers=AllDeleteWorkers,
manifest=?MANIFEST{state=ManifestState},
bucket=Bucket,
key=Key,
uuid=UUID,
riak_client=RcPid,
finished_callback=FinishedCallback,
cleanup_manifests=CleanupManifests} = State) ->
if CleanupManifests ->
manifest_cleanup(ManifestState, Bucket, Key, UUID, RcPid);
true ->
noop
end,
_ = [riak_cs_block_server:stop(P) || P <- AllDeleteWorkers],
notify_gc_worker(Reason, State),
FinishedCallback(notification_msg(Reason, State)),
ok.
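
With gc_worker_pid generalized into finished_callback, terminate/3 now hands its notification message to an arbitrary fun. A minimal sketch of the two callback styles wired in elsewhere in this PR (synchronous for the GC worker, fire-and-forget for active deletion):

make_callbacks() ->
    Self = self(),
    %% GC worker style: block until the worker FSM consumes the reply.
    SyncCb = fun(Msg) -> gen_fsm:sync_send_event(Self, Msg, infinity) end,
    %% Active-delete style: just mail the result; the caller receives it selectively.
    AsyncCb = fun(Msg) -> Self ! Msg end,
    {SyncCb, AsyncCb}.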

code_change(_OldVsn, StateName, State, _Extra) ->
Expand Down Expand Up @@ -240,18 +252,17 @@ maybe_delete_blocks(State=#state{bucket=Bucket,
free_deleters=NewFreeDeleters,
delete_blocks_remaining=NewDeleteBlocksRemaining}).

-spec notify_gc_worker(term(), state()) -> term().
notify_gc_worker(Reason, State) ->
gen_fsm:sync_send_event(State#state.gc_worker_pid,
notification_msg(Reason, State),
infinity).

-spec notification_msg(term(), state()) -> {pid(),
{ok, {non_neg_integer(), non_neg_integer()}} |
{error, term()}}.
notification_msg(normal, #state{deleted_blocks = DeletedBlocks,
total_blocks = TotalBlocks}) ->
{self(), {ok, {DeletedBlocks, TotalBlocks}}};
notification_msg(normal, #state{
bucket=Bucket,
key=Key,
uuid=UUID,
deleted_blocks = DeletedBlocks,
total_blocks = TotalBlocks}) ->
Reply = {ok, {Bucket, Key, UUID, DeletedBlocks, TotalBlocks}},
{self(), Reply};
notification_msg(Reason, _State) ->
{self(), {error, Reason}}.

122 changes: 103 additions & 19 deletions src/riak_cs_gc.erl
@@ -1,6 +1,6 @@
%% ---------------------------------------------------------------------
%%
%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
@@ -181,26 +181,46 @@ gc_specific_manifests(UUIDsToMark, RiakObject, Bucket, Key, RcPid) ->
{error, term()} | {ok, riakc_obj:riakc_obj()}.
handle_mark_as_pending_delete({ok, RiakObject}, Bucket, Key, UUIDsToMark, RcPid) ->
Manifests = riak_cs_manifest:manifests_from_riak_object(RiakObject),
PDManifests = riak_cs_manifest_utils:manifests_to_gc(UUIDsToMark, Manifests),
MoveResult = move_manifests_to_gc_bucket(PDManifests, RcPid),
PDUUIDs = [UUID || {UUID, _} <- PDManifests],
handle_move_result(MoveResult, RiakObject, Bucket, Key, PDUUIDs, RcPid);
PDManifests0 = riak_cs_manifest_utils:manifests_to_gc(UUIDsToMark, Manifests),
{ToGC, DeletedUUIDs} =
case riak_cs_config:active_delete_threshold() of
Threshold when is_integer(Threshold) andalso Threshold > 0 ->
%% We do the synchronous delete after the manifest is marked
%% pending_delete, to reduce the chance that a concurrent
%% request finds an active manifest (UUID) and then goes
%% looking for already-deleted blocks, resulting in notfound
%% errors. However, there are still corner cases where a
%% concurrent request interleaves between marking
%% pending_delete here and deleting blocks, like:
%%
%% 1. Request A refers to a manifest finding active UUID x
%% 2. Request B deletes an object marking active UUID x as pending_delete
%% 3. Request B deletes blocks of UUID x according to this synchronous delete -> ok
%% 4. Request A refers to blocks pointed by UUID x -> notfound
%%
%% Manifests whose blocks are deleted here are given the
%% `scheduled_delete' state. They won't be collected by the
%% garbage collector, as they are not stored in the GC
%% bucket. Instead they will be collected by
%% `riak_cs_manifest_utils:prune/1', invoked via GET
%% object, after the leeway period has passed.
maybe_delete_small_objects(PDManifests0, RcPid, Threshold);
_ ->
{PDManifests0, []}
end,

case move_manifests_to_gc_bucket(ToGC, RcPid) of
ok ->
PDUUIDs = [UUID || {UUID, _} <- ToGC],
mark_as_scheduled_delete(PDUUIDs ++ DeletedUUIDs, RiakObject, Bucket, Key, RcPid);
{error, _} = Error ->
Error
end;

handle_mark_as_pending_delete({error, _Error}=Error, _Bucket, _Key, _UUIDsToMark, _RcPid) ->
_ = lager:warning("Failed to mark as pending_delete, reason: ~p", [Error]),
Error.
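
In effect, the branch above splits pending_delete manifests across two paths. A condensed decision sketch (hypothetical helper; ContentLength in bytes):

%% Synchronous deletion applies only when a positive threshold is
%% configured and the object is smaller than it; everything else
%% takes the ordinary GC-bucket route.
delete_path(ContentLength) ->
    case riak_cs_config:active_delete_threshold() of
        Threshold when is_integer(Threshold),
                       Threshold > 0,
                       ContentLength < Threshold ->
            active_delete;
        _ ->
            gc_bucket
    end.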

%% @private
-spec handle_move_result(ok | {error, term()},
riakc_obj:riakc_obj(),
binary(), binary(),
[binary()],
riak_client()) ->
{ok, riakc_obj:riakc_obj()} | {error, term()}.
handle_move_result(ok, RiakObject, Bucket, Key, PDUUIDs, RcPid) ->
mark_as_scheduled_delete(PDUUIDs, RiakObject, Bucket, Key, RcPid);
handle_move_result({error, _Reason}=Error, _RiakObject, _Bucket, _Key, _PDUUIDs, _RcPid) ->
Error.

%% @doc Return the number of seconds to wait after finishing garbage
%% collection of a set of files before starting the next.
-spec gc_interval() -> non_neg_integer() | infinity.
@@ -334,7 +354,7 @@ mark_as_pending_delete(UUIDsToMark, RiakObject, Bucket, Key, RcPid)

%% @doc Mark a list of manifests as `scheduled_delete' based upon the
%% UUIDs specified.
-spec mark_as_scheduled_delete([binary()], riakc_obj:riakc_obj(), binary(), binary(), riak_client()) ->
-spec mark_as_scheduled_delete([cs_uuid()], riakc_obj:riakc_obj(), binary(), binary(), riak_client()) ->
{ok, riakc_obj:riakc_obj()} | {error, term()}.
mark_as_scheduled_delete(UUIDsToMark, RiakObject, Bucket, Key, RcPid) ->
mark_manifests(RiakObject, Bucket, Key, UUIDsToMark,
@@ -359,7 +379,71 @@ mark_manifests(RiakObject, Bucket, Key, UUIDsToMark, ManiFunction, RcPid)
%% with vector clock. This allows us to do a PUT
%% again without having to re-retrieve the object
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
riak_cs_pbc:put(ManifestPbc, UpdObj, [return_body], riak_cs_config:put_gckey_timeout()).
riak_cs_pbc:put(ManifestPbc, UpdObj, [return_body],
riak_cs_config:put_gckey_timeout()). %% <= bug: put_manifest_timeout should be used here

-spec maybe_delete_small_objects([cs_uuid_and_manifest()], riak_client(), non_neg_integer()) ->
{[cs_uuid_and_manifest()], [cs_uuid()]}.
maybe_delete_small_objects(Manifests, RcPid, Threshold) ->
{ok, BagId} = riak_cs_riak_client:get_manifest_bag(RcPid),
DelFun = fun({UUID, Manifest = ?MANIFEST{state=pending_delete,
content_length=ContentLength}},
{Survivors, UUIDsToDelete})
when ContentLength < Threshold ->
%% actually this won't be scheduled :P
UUIDManifest = {UUID, Manifest?MANIFEST{state=scheduled_delete}},
_ = lager:debug("trying to delete ~p at ~p", [UUIDManifest, BagId]),
case try_delete_blocks(BagId, UUIDManifest) of
ok ->
{Survivors, [UUID|UUIDsToDelete]};
{error, _} ->
%% Error logs were handled in the function
{[{UUID, Manifest}|Survivors], UUIDsToDelete}
end;
({UUID, M}, {Survivors, UUIDsToDelete}) ->
ContentLength = M?MANIFEST.content_length,
_ = lager:debug("~p is not being deleted: (CL, threshold)=(~p, ~p)",
[UUID, ContentLength, Threshold]),
{[{UUID, M}|Survivors], UUIDsToDelete}
end,
%% Obtain a new history!
lists:foldl(DelFun, {[], []}, Manifests).
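
Usage sketch (hypothetical wrapper over functions in this module, mirroring handle_mark_as_pending_delete above): the fold yields the surviving {UUID, Manifest} pairs destined for the GC bucket, plus the UUIDs whose blocks were deleted inline.

gc_or_delete(PDManifests, RcPid, Threshold) ->
    %% Small objects are deleted on the spot; survivors go to the GC bucket.
    {ToGC, DeletedUUIDs} = maybe_delete_small_objects(PDManifests, RcPid, Threshold),
    {move_manifests_to_gc_bucket(ToGC, RcPid), DeletedUUIDs}.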

-spec try_delete_blocks(binary(), cs_uuid_and_manifest()) -> ok | {error, term()}.
try_delete_blocks(BagId, {UUID, _} = UUIDManifest) ->
Self = self(),
FinishedFun = fun(Msg) -> Self ! Msg end,
Args = [BagId, UUIDManifest, FinishedFun,
dummy_gc_key_in_sync_delete,
[{cleanup_manifests, false}]],
{ok, Pid} = riak_cs_delete_fsm_sup:start_delete_fsm(node(), Args),
Ref = erlang:monitor(process, Pid),
receive
{Pid, {ok, {_, _, _, TotalBlocks, TotalBlocks}}} ->
%% successfully deleted
erlang:demonitor(Ref, [flush]),
_ = lager:debug("Active deletion of ~p succeeded", [UUID]),
ok;
{Pid, {ok, {_, _, _, NumDeleted, TotalBlocks}}} ->
erlang:demonitor(Ref, [flush]),
_ = lager:debug("Only ~p/~p blocks of ~p deleted",
[NumDeleted, TotalBlocks, UUID]),
{error, partially_deleted};
{Pid, {error, _} = E} ->
erlang:demonitor(Ref, [flush]),
_ = lager:warning("Active deletion of ~p failed. Reason: ~p",
[UUID, E]),
E;
{'DOWN', Ref, _Type, Pid, Reason} ->
_ = lager:warning("Delete FSM for ~p crashed. Reason: ~p", [Reason]),
{error, Reason};
Other ->
%% Handling unknown error, or died unexpectedly
erlang:demonitor(Ref, [flush]),
_ = lager:error("Active deletion failed. Reason: ~p", [Other]),
{error, Other}
end.


%% @doc Copy data for a list of manifests to the
%% `riak-cs-gc' bucket to schedule them for deletion.
9 changes: 6 additions & 3 deletions src/riak_cs_gc_worker.erl
@@ -140,7 +140,10 @@ initiating_file_delete(continue, ?STATE{batch=[CurrentFileSetKey | _],
%% This is because one riak client process is assumed to be used for
%% blocks in a single bag. It's possible to use one riak client process
%% for multiple block bags, but it would introduce the complexity of mutation.
Args = [BagId, Manifest, self(), CurrentFileSetKey, []],
Self = self(),
Args = [BagId, Manifest,
fun(Msg) -> gen_fsm:sync_send_event(Self, Msg, infinity) end,
CurrentFileSetKey, []],
%% The delete FSM is hard-coded to send a sync event to our registered
%% name upon terminate(), so we do not have to pass our pid to it
%% in order to get a reply.
@@ -270,7 +273,7 @@ ok_reply(NextState, NextStateData) ->
%% Refactor TODO:
%% 1. delete_fsm_pid=undefined is desirable in both ok & error cases?
%% 2. It's correct to *not* change pause_state?
handle_delete_fsm_reply({ok, {TotalBlocks, TotalBlocks}},
handle_delete_fsm_reply({ok, {_, _, _, TotalBlocks, TotalBlocks}},
?STATE{current_files=[CurrentManifest | RestManifests],
current_fileset=FileSet,
block_count=BlockCount} = State) ->
Expand All @@ -280,7 +283,7 @@ handle_delete_fsm_reply({ok, {TotalBlocks, TotalBlocks}},
current_fileset=UpdFileSet,
current_files=RestManifests,
block_count=BlockCount+TotalBlocks};
handle_delete_fsm_reply({ok, {NumDeleted, _TotalBlocks}},
handle_delete_fsm_reply({ok, {_, _, _, NumDeleted, _TotalBlocks}},
?STATE{current_files=[_CurrentManifest | RestManifests],
block_count=BlockCount} = State) ->
ok = continue(),