Skip to content

Commit

Permalink
Merge pull request #1180 from basho/feature/stats-at-wm-common
Browse files Browse the repository at this point in the history
Add latency stats items to S3 API and velvet calls  [JIRA: RCS-220]

Reviewed-by: kuenishi
  • Loading branch information
borshop committed Jul 10, 2015
2 parents 055b843 + 66f4012 commit eff74e5
Show file tree
Hide file tree
Showing 22 changed files with 357 additions and 341 deletions.
12 changes: 12 additions & 0 deletions include/riak_cs.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@
auth_module :: atom(),
response_module :: atom(),
policy_module :: atom(),
%% Key for API rate and latency stats.
%% If `stats_prefix' or `stats_key' is `no_stats', no stats
%% will be gathered by riak_cs_wm_common.
%% The prefix is defined by `stats_prefix()' callback of sub-module.
%% If sub-module provides only `stats_prefix' (almost the case),
%% stats key is [Prefix, HttpMethod]. Otherwise, sum-module
%% can set specific `stats_key' by any callback that returns
%% this context.
stats_prefix = no_stats :: atom(),
stats_key=prefix_and_method :: prefix_and_method |
no_stats |
riak_cs_stats:key(),
local_context :: term(),
api :: atom()
}).
Expand Down
2 changes: 1 addition & 1 deletion riak_test/src/rtcs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ riakcscmd(Path, N, Cmd) ->
lists:flatten(io_lib:format("~s ~s", [riakcs_binpath(Path, N), Cmd])).

riakcs_switchcmd(Path, N, Cmd) ->
lists:flatten(io_lib:format("~s-stanchion ~s", [riakcs_binpath(Path, N), Cmd])).
lists:flatten(io_lib:format("~s-admin stanchion ~s", [riakcs_binpath(Path, N), Cmd])).

riakcs_gccmd(Path, N, Cmd) ->
lists:flatten(io_lib:format("~s-admin gc ~s", [riakcs_binpath(Path, N), Cmd])).
Expand Down
2 changes: 1 addition & 1 deletion riak_test/tests/stats_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ query_stats(UserConfig, Port) ->

confirm_initial_stats(StatData) ->
%% Check for values for all meters to be 0 when system is initially started
?assertEqual(132, length(StatData)),
?assertEqual(614, length(StatData)),
[?assert(proplists:is_defined(StatType, StatData))
|| StatType <- [<<"block_gets">>,
<<"block_puts">>,
Expand Down
81 changes: 36 additions & 45 deletions src/riak_cs_bucket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ create_bucket(User, UserObj, Bucket, BagId, ACL, RcPid) ->
User,
UserObj,
create,
[bucket, create],
[velvet, create_bucket],
RcPid);
false ->
{error, invalid_bucket_name}
Expand Down Expand Up @@ -183,7 +183,7 @@ delete_bucket(User, UserObj, Bucket, RcPid) ->
User,
UserObj,
delete,
[bucket, delete],
[velvet, delete_bucket],
RcPid);
false ->
LocalError
Expand Down Expand Up @@ -294,7 +294,7 @@ set_bucket_acl(User, UserObj, Bucket, ACL, RcPid) ->
User,
UserObj,
update_acl,
[bucket, put_acl],
[velvet, set_bucket_acl],
RcPid).

%% @doc Set the policy for a bucket. Existing policy is only overwritten.
Expand All @@ -306,7 +306,7 @@ set_bucket_policy(User, UserObj, Bucket, PolicyJson, RcPid) ->
User,
UserObj,
update_policy,
[bucket, put_policy],
[velvet, set_bucket_policy],
RcPid).

%% @doc Set the policy for a bucket. Existing policy is only overwritten.
Expand All @@ -318,10 +318,10 @@ delete_bucket_policy(User, UserObj, Bucket, RcPid) ->
User,
UserObj,
delete_policy,
[bucket, put_policy],
[velvet, delete_bucket_policy],
RcPid).

% @doc fetch moss.bucket and return acl and policy
%% @doc fetch moss.bucket and return acl and policy
-spec get_bucket_acl_policy(binary(), atom(), riak_client()) ->
{acl(), policy()} | {error, term()}.
get_bucket_acl_policy(Bucket, PolicyMod, RcPid) ->
Expand Down Expand Up @@ -389,7 +389,7 @@ bucket_empty(Bucket, RcPid) ->
-spec bucket_empty_handle_list_keys(riak_client(), binary(),
{ok, list()} |
{error, term()}) ->
boolean().
boolean().
bucket_empty_handle_list_keys(RcPid, Bucket, {ok, Keys}) ->
AnyPred = bucket_empty_any_pred(RcPid, Bucket),
%% `lists:any/2' will break out early as soon
Expand All @@ -399,7 +399,7 @@ bucket_empty_handle_list_keys(_RcPid, _Bucket, _Error) ->
false.

-spec bucket_empty_any_pred(riak_client(), Bucket :: binary()) ->
fun((Key :: binary()) -> boolean()).
fun((Key :: binary()) -> boolean()).
bucket_empty_any_pred(RcPid, Bucket) ->
fun (Key) ->
riak_cs_utils:key_exists(RcPid, Bucket, Key)
Expand All @@ -424,7 +424,7 @@ fetch_bucket_object(BucketName, RcPid) ->

%% @doc Fetches the bucket object, even it is marked as free
-spec fetch_bucket_object_raw(binary(), riak_client()) ->
{ok, riakc_obj:riakc_obj()} | {error, term()}.
{ok, riakc_obj:riakc_obj()} | {error, term()}.
fetch_bucket_object_raw(BucketName, RcPid) ->
case riak_cs_riak_client:get_bucket(RcPid, BucketName) of
{ok, Obj} ->
Expand All @@ -438,16 +438,16 @@ fetch_bucket_object_raw(BucketName, RcPid) ->
-spec maybe_log_sibling_warning(binary(), list(riakc_obj:value())) -> ok.
maybe_log_sibling_warning(Bucket, Values) when length(Values) > 1 ->
_ = lager:warning("The bucket ~s has ~b siblings that may need resolution.",
[binary_to_list(Bucket), length(Values)]),
[binary_to_list(Bucket), length(Values)]),
ok;
maybe_log_sibling_warning(_, _) ->
ok.

-spec maybe_log_bucket_owner_error(binary(), list(riakc_obj:value())) -> ok.
maybe_log_bucket_owner_error(Bucket, Values) when length(Values) > 1 ->
_ = lager:error("The bucket ~s has ~b owners."
" This situation requires administrator intervention.",
[binary_to_list(Bucket), length(Values)]),
" This situation requires administrator intervention.",
[binary_to_list(Bucket), length(Values)]),
ok;
maybe_log_bucket_owner_error(_, _) ->
ok.
Expand Down Expand Up @@ -550,23 +550,20 @@ bucket_json(Bucket, BagId, ACL, KeyId) ->
mochijson2:encode({struct, [{<<"bucket">>, Bucket},
{<<"requester">>, list_to_binary(KeyId)},
stanchion_acl_utils:acl_to_json_term(ACL)] ++
BagElement}))).
BagElement}))).

%% @doc Return a bucket record for the specified bucket name.
-spec bucket_record(binary(), bucket_operation()) -> cs_bucket().
bucket_record(Name, Operation) ->
case Operation of
create ->
Action = created;
delete ->
Action = deleted;
_ ->
Action = undefined
end,
Action = case Operation of
create -> created;
delete -> deleted;
_ -> undefined
end,
?RCS_BUCKET{name=binary_to_list(Name),
last_action=Action,
creation_date=riak_cs_wm_utils:iso_8601_datetime(),
modification_time=os:timestamp()}.
last_action=Action,
creation_date=riak_cs_wm_utils:iso_8601_datetime(),
modification_time=os:timestamp()}.

%% @doc Check for and resolve any conflict between
%% a bucket record from a user record sibling and
Expand Down Expand Up @@ -608,21 +605,21 @@ bucket_sorter(?RCS_BUCKET{name=Bucket1},
cleanup_bucket(?RCS_BUCKET{last_action=created}) ->
false;
cleanup_bucket(?RCS_BUCKET{last_action=deleted,
modification_time=ModTime}) ->
modification_time=ModTime}) ->
%% the prune-time is specified in seconds, so we must
%% convert Erlang timestamps to seconds first
NowSeconds = riak_cs_utils:second_resolution_timestamp(os:timestamp()),
ModTimeSeconds = riak_cs_utils:second_resolution_timestamp(ModTime),
(NowSeconds - ModTimeSeconds) >
riak_cs_config:user_buckets_prune_time().
riak_cs_config:user_buckets_prune_time().

%% @doc Determine if an existing bucket from the resolution list
%% should be kept or replaced when a conflict occurs.
-spec keep_existing_bucket(cs_bucket(), cs_bucket()) -> boolean().
keep_existing_bucket(?RCS_BUCKET{last_action=LastAction1,
modification_time=ModTime1},
modification_time=ModTime1},
?RCS_BUCKET{last_action=LastAction2,
modification_time=ModTime2}) ->
modification_time=ModTime2}) ->
if
LastAction1 == LastAction2
andalso
Expand Down Expand Up @@ -661,13 +658,13 @@ resolve_buckets([HeadUserRec | RestUserRecs], Buckets, _KeepDeleted) ->
rcs_user(),
riakc_obj:riakc_obj(),
bucket_operation(),
riak_cs_stats:metric_name(),
riak_cs_stats:key(),
riak_client()) ->
ok |
{error, term()}.
serialized_bucket_op(Bucket, ACL, User, UserObj, BucketOp, StatName, RcPid) ->
serialized_bucket_op(Bucket, ACL, User, UserObj, BucketOp, StatKey, RcPid) ->
serialized_bucket_op(Bucket, undefined, ACL, User, UserObj,
BucketOp, StatName, RcPid).
BucketOp, StatKey, RcPid).

%% @doc Shared code used when doing a bucket creation or deletion.
-spec serialized_bucket_op(binary(),
Expand All @@ -676,12 +673,13 @@ serialized_bucket_op(Bucket, ACL, User, UserObj, BucketOp, StatName, RcPid) ->
rcs_user(),
riakc_obj:riakc_obj(),
bucket_operation(),
riak_cs_stats:metric_name(),
riak_cs_stats:key(),
riak_client()) ->
ok |
{error, term()}.
serialized_bucket_op(Bucket, BagId, ACL, User, UserObj, BucketOp, StatName, RcPid) ->
serialized_bucket_op(Bucket, BagId, ACL, User, UserObj, BucketOp, StatsKey, RcPid) ->
StartTime = os:timestamp(),
_ = riak_cs_stats:inflow(StatsKey),
case riak_cs_config:admin_creds() of
{ok, AdminCreds} ->
BucketFun = bucket_fun(BucketOp,
Expand All @@ -691,29 +689,22 @@ serialized_bucket_op(Bucket, BagId, ACL, User, UserObj, BucketOp, StatName, RcPi
User?RCS_USER.key_id,
AdminCreds,
riak_cs_utils:stanchion_data()),
%% Make a call to the bucket request
%% serialization service.
%% Make a call to the request serialization service.
OpResult = BucketFun(),
_ = riak_cs_stats:update_with_start(StatsKey, StartTime, OpResult),
case OpResult of
ok ->
BucketRecord = bucket_record(Bucket, BucketOp),
case update_user_buckets(User, BucketRecord) of
{ok, ignore} when BucketOp == update_acl ->
ok = riak_cs_stats:update_with_start(StatName,
StartTime),
OpResult;
{ok, ignore} ->
OpResult;
{ok, UpdUser} ->
X = riak_cs_user:save_user(UpdUser, UserObj, RcPid),
ok = riak_cs_stats:update_with_start(StatName,
StartTime),
X
riak_cs_user:save_user(UpdUser, UserObj, RcPid)
end;

{error, {error_status, Status, _, ErrorDoc}} ->
handle_stanchion_response(Status, ErrorDoc, BucketOp, Bucket);

{error, _} ->
OpResult
end;
Expand All @@ -726,8 +717,8 @@ serialized_bucket_op(Bucket, BagId, ACL, User, UserObj, BucketOp, StatName, RcPi
%% could come up, add branch here. See tests in
%% tests/riak_cs_bucket_test.erl
-spec handle_stanchion_response(200..503, string(), delete|create, binary()) ->
{error, remaining_multipart_upload} |
{error, atom()}.
{error, remaining_multipart_upload} |
{error, atom()}.
handle_stanchion_response(409, ErrorDoc, Op, Bucket)
when Op =:= delete orelse Op =:= create ->

Expand Down
Loading

0 comments on commit eff74e5

Please sign in to comment.