diff --git a/include/riak_cs.hrl b/include/riak_cs.hrl index 8e0296fd9..4b535b7ca 100644 --- a/include/riak_cs.hrl +++ b/include/riak_cs.hrl @@ -91,6 +91,18 @@ auth_module :: atom(), response_module :: atom(), policy_module :: atom(), + %% Key for API rate and latency stats. + %% If `stats_prefix' or `stats_key' is `no_stats', no stats + %% will be gathered by riak_cs_wm_common. + %% The prefix is defined by the `stats_prefix()' callback of the sub-module. + %% If a sub-module provides only `stats_prefix' (almost always the case), + %% the stats key is [Prefix, HttpMethod]. Otherwise, a sub-module + %% can set a specific `stats_key' in any callback that returns + %% this context. + stats_prefix = no_stats :: atom(), + stats_key=prefix_and_method :: prefix_and_method | + no_stats | + riak_cs_stats:key(), local_context :: term(), api :: atom() }). diff --git a/riak_test/src/rtcs.erl b/riak_test/src/rtcs.erl index e86f1f61f..1e51ef5d1 100644 --- a/riak_test/src/rtcs.erl +++ b/riak_test/src/rtcs.erl @@ -439,7 +439,7 @@ riakcscmd(Path, N, Cmd) -> lists:flatten(io_lib:format("~s ~s", [riakcs_binpath(Path, N), Cmd])). riakcs_switchcmd(Path, N, Cmd) -> - lists:flatten(io_lib:format("~s-stanchion ~s", [riakcs_binpath(Path, N), Cmd])). + lists:flatten(io_lib:format("~s-admin stanchion ~s", [riakcs_binpath(Path, N), Cmd])). riakcs_gccmd(Path, N, Cmd) -> lists:flatten(io_lib:format("~s-admin gc ~s", [riakcs_binpath(Path, N), Cmd])). diff --git a/riak_test/tests/stats_test.erl b/riak_test/tests/stats_test.erl index 9815d24ce..dad9905df 100644 --- a/riak_test/tests/stats_test.erl +++ b/riak_test/tests/stats_test.erl @@ -72,7 +72,7 @@ query_stats(UserConfig, Port) -> confirm_initial_stats(StatData) -> %% Check for values for all meters to be 0 when system is initially started - ?assertEqual(132, length(StatData)), + ?assertEqual(614, length(StatData)), [?assert(proplists:is_defined(StatType, StatData)) || StatType <- [<<"block_gets">>, <<"block_puts">>, diff --git a/src/riak_cs_bucket.erl b/src/riak_cs_bucket.erl index 811e71d2f..8012df34e 100644 --- a/src/riak_cs_bucket.erl +++ b/src/riak_cs_bucket.erl @@ -78,7 +78,7 @@ create_bucket(User, UserObj, Bucket, BagId, ACL, RcPid) -> User, UserObj, create, - [bucket, create], + [velvet, create_bucket], RcPid); false -> {error, invalid_bucket_name} @@ -183,7 +183,7 @@ delete_bucket(User, UserObj, Bucket, RcPid) -> User, UserObj, delete, - [bucket, delete], + [velvet, delete_bucket], RcPid); false -> LocalError @@ -294,7 +294,7 @@ set_bucket_acl(User, UserObj, Bucket, ACL, RcPid) -> User, UserObj, update_acl, - [bucket, put_acl], + [velvet, set_bucket_acl], RcPid). %% @doc Set the policy for a bucket. Existing policy is only overwritten. @@ -306,7 +306,7 @@ set_bucket_policy(User, UserObj, Bucket, PolicyJson, RcPid) -> User, UserObj, update_policy, - [bucket, put_policy], + [velvet, set_bucket_policy], RcPid). %% @doc Set the policy for a bucket. Existing policy is only overwritten. @@ -318,10 +318,10 @@ delete_bucket_policy(User, UserObj, Bucket, RcPid) -> User, UserObj, delete_policy, - [bucket, put_policy], + [velvet, delete_bucket_policy], RcPid). -% @doc fetch moss.bucket and return acl and policy +%% @doc fetch moss.bucket and return acl and policy -spec get_bucket_acl_policy(binary(), atom(), riak_client()) -> {acl(), policy()} | {error, term()}. get_bucket_acl_policy(Bucket, PolicyMod, RcPid) -> @@ -389,7 +389,7 @@ bucket_empty(Bucket, RcPid) -> -spec bucket_empty_handle_list_keys(riak_client(), binary(), {ok, list()} | {error, term()}) -> - boolean(). + boolean(). 
bucket_empty_handle_list_keys(RcPid, Bucket, {ok, Keys}) -> AnyPred = bucket_empty_any_pred(RcPid, Bucket), %% `lists:any/2' will break out early as soon @@ -399,7 +399,7 @@ bucket_empty_handle_list_keys(_RcPid, _Bucket, _Error) -> false. -spec bucket_empty_any_pred(riak_client(), Bucket :: binary()) -> - fun((Key :: binary()) -> boolean()). + fun((Key :: binary()) -> boolean()). bucket_empty_any_pred(RcPid, Bucket) -> fun (Key) -> riak_cs_utils:key_exists(RcPid, Bucket, Key) @@ -424,7 +424,7 @@ fetch_bucket_object(BucketName, RcPid) -> %% @doc Fetches the bucket object, even it is marked as free -spec fetch_bucket_object_raw(binary(), riak_client()) -> - {ok, riakc_obj:riakc_obj()} | {error, term()}. + {ok, riakc_obj:riakc_obj()} | {error, term()}. fetch_bucket_object_raw(BucketName, RcPid) -> case riak_cs_riak_client:get_bucket(RcPid, BucketName) of {ok, Obj} -> @@ -438,7 +438,7 @@ fetch_bucket_object_raw(BucketName, RcPid) -> -spec maybe_log_sibling_warning(binary(), list(riakc_obj:value())) -> ok. maybe_log_sibling_warning(Bucket, Values) when length(Values) > 1 -> _ = lager:warning("The bucket ~s has ~b siblings that may need resolution.", - [binary_to_list(Bucket), length(Values)]), + [binary_to_list(Bucket), length(Values)]), ok; maybe_log_sibling_warning(_, _) -> ok. @@ -446,8 +446,8 @@ maybe_log_sibling_warning(_, _) -> -spec maybe_log_bucket_owner_error(binary(), list(riakc_obj:value())) -> ok. maybe_log_bucket_owner_error(Bucket, Values) when length(Values) > 1 -> _ = lager:error("The bucket ~s has ~b owners." - " This situation requires administrator intervention.", - [binary_to_list(Bucket), length(Values)]), + " This situation requires administrator intervention.", + [binary_to_list(Bucket), length(Values)]), ok; maybe_log_bucket_owner_error(_, _) -> ok. @@ -550,23 +550,20 @@ bucket_json(Bucket, BagId, ACL, KeyId) -> mochijson2:encode({struct, [{<<"bucket">>, Bucket}, {<<"requester">>, list_to_binary(KeyId)}, stanchion_acl_utils:acl_to_json_term(ACL)] ++ - BagElement}))). + BagElement}))). %% @doc Return a bucket record for the specified bucket name. -spec bucket_record(binary(), bucket_operation()) -> cs_bucket(). bucket_record(Name, Operation) -> - case Operation of - create -> - Action = created; - delete -> - Action = deleted; - _ -> - Action = undefined - end, + Action = case Operation of + create -> created; + delete -> deleted; + _ -> undefined + end, ?RCS_BUCKET{name=binary_to_list(Name), - last_action=Action, - creation_date=riak_cs_wm_utils:iso_8601_datetime(), - modification_time=os:timestamp()}. + last_action=Action, + creation_date=riak_cs_wm_utils:iso_8601_datetime(), + modification_time=os:timestamp()}. %% @doc Check for and resolve any conflict between %% a bucket record from a user record sibling and @@ -608,21 +605,21 @@ bucket_sorter(?RCS_BUCKET{name=Bucket1}, cleanup_bucket(?RCS_BUCKET{last_action=created}) -> false; cleanup_bucket(?RCS_BUCKET{last_action=deleted, - modification_time=ModTime}) -> + modification_time=ModTime}) -> %% the prune-time is specified in seconds, so we must %% convert Erlang timestamps to seconds first NowSeconds = riak_cs_utils:second_resolution_timestamp(os:timestamp()), ModTimeSeconds = riak_cs_utils:second_resolution_timestamp(ModTime), (NowSeconds - ModTimeSeconds) > - riak_cs_config:user_buckets_prune_time(). + riak_cs_config:user_buckets_prune_time(). %% @doc Determine if an existing bucket from the resolution list %% should be kept or replaced when a conflict occurs. 
-spec keep_existing_bucket(cs_bucket(), cs_bucket()) -> boolean(). keep_existing_bucket(?RCS_BUCKET{last_action=LastAction1, - modification_time=ModTime1}, + modification_time=ModTime1}, ?RCS_BUCKET{last_action=LastAction2, - modification_time=ModTime2}) -> + modification_time=ModTime2}) -> if LastAction1 == LastAction2 andalso @@ -661,13 +658,13 @@ resolve_buckets([HeadUserRec | RestUserRecs], Buckets, _KeepDeleted) -> rcs_user(), riakc_obj:riakc_obj(), bucket_operation(), - riak_cs_stats:metric_name(), + riak_cs_stats:key(), riak_client()) -> ok | {error, term()}. -serialized_bucket_op(Bucket, ACL, User, UserObj, BucketOp, StatName, RcPid) -> +serialized_bucket_op(Bucket, ACL, User, UserObj, BucketOp, StatKey, RcPid) -> serialized_bucket_op(Bucket, undefined, ACL, User, UserObj, - BucketOp, StatName, RcPid). + BucketOp, StatKey, RcPid). %% @doc Shared code used when doing a bucket creation or deletion. -spec serialized_bucket_op(binary(), @@ -676,12 +673,13 @@ serialized_bucket_op(Bucket, ACL, User, UserObj, BucketOp, StatName, RcPid) -> rcs_user(), riakc_obj:riakc_obj(), bucket_operation(), - riak_cs_stats:metric_name(), + riak_cs_stats:key(), riak_client()) -> ok | {error, term()}. -serialized_bucket_op(Bucket, BagId, ACL, User, UserObj, BucketOp, StatName, RcPid) -> +serialized_bucket_op(Bucket, BagId, ACL, User, UserObj, BucketOp, StatsKey, RcPid) -> StartTime = os:timestamp(), + _ = riak_cs_stats:inflow(StatsKey), case riak_cs_config:admin_creds() of {ok, AdminCreds} -> BucketFun = bucket_fun(BucketOp, @@ -691,29 +689,22 @@ serialized_bucket_op(Bucket, BagId, ACL, User, UserObj, BucketOp, StatName, RcPi User?RCS_USER.key_id, AdminCreds, riak_cs_utils:stanchion_data()), - %% Make a call to the bucket request - %% serialization service. + %% Make a call to the request serialization service. OpResult = BucketFun(), + _ = riak_cs_stats:update_with_start(StatsKey, StartTime, OpResult), case OpResult of ok -> BucketRecord = bucket_record(Bucket, BucketOp), case update_user_buckets(User, BucketRecord) of {ok, ignore} when BucketOp == update_acl -> - ok = riak_cs_stats:update_with_start(StatName, - StartTime), OpResult; {ok, ignore} -> OpResult; {ok, UpdUser} -> - X = riak_cs_user:save_user(UpdUser, UserObj, RcPid), - ok = riak_cs_stats:update_with_start(StatName, - StartTime), - X + riak_cs_user:save_user(UpdUser, UserObj, RcPid) end; - {error, {error_status, Status, _, ErrorDoc}} -> handle_stanchion_response(Status, ErrorDoc, BucketOp, Bucket); - {error, _} -> OpResult end; @@ -726,8 +717,8 @@ serialized_bucket_op(Bucket, BagId, ACL, User, UserObj, BucketOp, StatName, RcPi %% could come up, add branch here. See tests in %% tests/riak_cs_bucket_test.erl -spec handle_stanchion_response(200..503, string(), delete|create, binary()) -> - {error, remaining_multipart_upload} | - {error, atom()}. + {error, remaining_multipart_upload} | + {error, atom()}. handle_stanchion_response(409, ErrorDoc, Op, Bucket) when Op =:= delete orelse Op =:= create -> diff --git a/src/riak_cs_stats.erl b/src/riak_cs_stats.erl index 997faeb9e..12eacb38b 100644 --- a/src/riak_cs_stats.erl +++ b/src/riak_cs_stats.erl @@ -21,201 +21,107 @@ -module(riak_cs_stats). %% API --export([safe_update/2, - update/2, +-export([update/2, + update_with_start/3, update_with_start/2, + update_error_with_start/2, + inflow/1, report_json/0, report_pretty_json/0, get_stats/0]). +%% Lower level API, mainly for debugging or investigation from shell +-export([report_duration/3, + report_pool/1]). + -export([init/0]). 
--type metric_name() :: list(atom()). --export_type([metric_name/0]). - --define(METRICS, - %% [{metric_name(), exometer:type(), [exometer:option()], Aliases}] - [{[block, get], spiral, [], - [{one, block_gets}, {count, block_gets_total}]}, - {[block, get, retry], spiral, [], - [{one, block_gets_retry}, {count, block_gets_retry_total}]}, - {[block, put], spiral, [], - [{one, block_puts}, {count, block_puts_total}]}, - {[block, delete], spiral, [], - [{one, block_deletes}, {count, block_deletes_total}]}, - - {[block, get, time], histogram, [], - [{mean , block_get_time_mean}, - {median, block_get_time_median}, - {95 , block_get_time_95}, - {99 , block_get_time_99}, - {100 , block_get_time_100}]}, - {[block, get, retry, time], histogram, [], - [{mean , block_get_retry_time_mean}, - {median, block_get_retry_time_median}, - {95 , block_get_retry_time_95}, - {99 , block_get_retry_time_99}, - {100 , block_get_retry_time_100}]}, - {[block, put, time], histogram, [], - [{mean , block_put_time_mean}, - {median, block_put_time_median}, - {95 , block_put_time_95}, - {99 , block_put_time_99}, - {100 , block_put_time_100}]}, - {[block, delete, time], histogram, [], - [{mean , block_delete_time_mean}, - {median, block_delete_time_median}, - {95 , block_delete_time_95}, - {99 , block_delete_time_99}, - {100 , block_delete_time_100}]}, - - {[service, get, buckets], spiral, [], - [{one, service_get_buckets}, - {count, service_get_buckets_total}]}, - {[service, get, buckets, time], histogram, [], - [{mean , service_get_buckets_time_mean}, - {median, service_get_buckets_time_median}, - {95 , service_get_buckets_time_95}, - {99 , service_get_buckets_time_99}, - {100 , service_get_buckets_time_100}]}, - - {[bucket, list_keys], spiral, [], - [{one, bucket_list_keys}, {count, bucket_list_keys_total}]}, - {[bucket, create], spiral, [], - [{one, bucket_creates}, {count, bucket_creates_total}]}, - {[bucket, delete], spiral, [], - [{one, bucket_deletes}, {count, bucket_deletes_total}]}, - {[bucket, get_acl], spiral, [], - [{one, bucket_get_acl}, {count, bucket_get_acl_total}]}, - {[bucket, put_acl], spiral, [], - [{one, bucket_put_acl}, {count, bucket_put_acl_total}]}, - {[bucket, put_policy], spiral, [], - [{one, bucket_put_policy}, {count, bucket_put_policy_total}]}, - - {[bucket, list_keys, time], histogram, [], - [{mean , bucket_list_keys_time_mean}, - {median, bucket_list_keys_time_median}, - {95 , bucket_list_keys_time_95}, - {99 , bucket_list_keys_time_99}, - {100 , bucket_list_keys_time_100}]}, - {[bucket, create, time], histogram, [], - [{mean , bucket_create_time_mean}, - {median, bucket_create_time_median}, - {95 , bucket_create_time_95}, - {99 , bucket_create_time_99}, - {100 , bucket_create_time_100}]}, - {[bucket, delete, time], histogram, [], - [{mean , bucket_delete_time_mean}, - {median, bucket_delete_time_median}, - {95 , bucket_delete_time_95}, - {99 , bucket_delete_time_99}, - {100 , bucket_delete_time_100}]}, - {[bucket, get_acl, time], histogram, [], - [{mean , bucket_get_acl_time_mean}, - {median, bucket_get_acl_time_median}, - {95 , bucket_get_acl_time_95}, - {99 , bucket_get_acl_time_99}, - {100 , bucket_get_acl_time_100}]}, - {[bucket, put_acl, time], histogram, [], - [{mean , bucket_put_acl_time_mean}, - {median, bucket_put_acl_time_median}, - {95 , bucket_put_acl_time_95}, - {99 , bucket_put_acl_time_99}, - {100 , bucket_put_acl_time_100}]}, - {[bucket, put_policy, time], histogram, [], - [{mean , bucket_put_policy_time_mean}, - {median, bucket_put_policy_time_median}, - {95 , 
bucket_put_policy_time_95}, - {99 , bucket_put_policy_time_99}, - {100 , bucket_put_policy_time_100}]}, - - {[object, get], spiral, [], - [{one, object_gets}, {count, object_gets_total}]}, - {[object, put], spiral, [], - [{one, object_puts}, {count, object_puts_total}]}, - {[object, head], spiral, [], - [{one, object_heads}, {count, object_heads_total}]}, - {[object, delete], spiral, [], - [{one, object_deletes}, {count, object_deletes_total}]}, - {[object, get_acl], spiral, [], - [{one, object_get_acl}, {count, object_get_acl_total}]}, - {[object, put_acl], spiral, [], - [{one, object_put_acl}, {count, object_put_acl_total}]}, - - {[object, get, time], histogram, [], - [{mean , object_get_time_mean}, - {median, object_get_time_median}, - {95 , object_get_time_95}, - {99 , object_get_time_99}, - {100 , object_get_time_100}]}, - {[object, put, time], histogram, [], - [{mean , object_put_time_mean}, - {median, object_put_time_median}, - {95 , object_put_time_95}, - {99 , object_put_time_99}, - {100 , object_put_time_100}]}, - {[object, head, time], histogram, [], - [{mean , object_head_time_mean}, - {median, object_head_time_median}, - {95 , object_head_time_95}, - {99 , object_head_time_99}, - {100 , object_head_time_100}]}, - {[object, delete, time], histogram, [], - [{mean , object_delete_time_mean}, - {median, object_delete_time_median}, - {95 , object_delete_time_95}, - {99 , object_delete_time_99}, - {100 , object_delete_time_100}]}, - {[object, get_acl, time], histogram, [], - [{mean , object_get_acl_time_mean}, - {median, object_get_acl_time_median}, - {95 , object_get_acl_time_95}, - {99 , object_get_acl_time_99}, - {100 , object_get_acl_time_100}]}, - {[object, put_acl, time], histogram, [], - [{mean , object_put_acl_time_mean}, - {median, object_put_acl_time_median}, - {95 , object_put_acl_time_95}, - {99 , object_put_acl_time_99}, - {100 , object_put_acl_time_100}]}, - - {[manifest, siblings_bp_sleep], spiral, [], - [{one, manifest_siblings_bp_sleep}, - {count, manifest_siblings_bp_sleep_total}]}, - {[manifest, siblings_bp_sleep, time], histogram, [], - [{mean , manifest_siblings_bp_sleep_time_mean}, - {median, manifest_siblings_bp_sleep_time_median}, - {95 , manifest_siblings_bp_sleep_time_95}, - {99 , manifest_siblings_bp_sleep_time_99}, - {100 , manifest_siblings_bp_sleep_time_100}]} - ]). +-type key() :: [atom()]. +-export_type([key/0]). + +-type ok_error_res() :: ok | {ok, _} | {error, _}. + +-spec duration_metrics() -> [key()]. +duration_metrics() -> + [ + [service, get], + + [bucket, put], + [bucket, head], + [bucket, delete], + [bucket_acl, get], + [bucket_acl, put], + [bucket_policy, get], + [bucket_policy, put], + [bucket_policy, delete], + [bucket_location, get], + [bucket_versioning, get], + [list_uploads, get], + [multiple_delete, post], + [list_objects, get], + + [object, get], + [object, put], + [object, put_copy], + [object, head], + [object, delete], + [object_acl, get], + [object_acl, put], + + [multipart, post], % Initiate + [multipart_upload, put], % Upload Part (Copy) + [multipart_upload, post], % Complete + [multipart_upload, delete], % Abort + [multipart_upload, get], % List Parts + + [velvet, create_user], + [velvet, update_user], + [velvet, create_bucket], + [velvet, delete_bucket], + [velvet, set_bucket_acl], + [velvet, set_bucket_policy], + [velvet, delete_bucket_policy], + + %% TODO: Remove backpresure sleep + [manifest, siblings_bp_sleep], + + [block, get], + [block, get, retry], + [block, put], + [block, delete] + ]. 
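+%% Each key above is combined with every entry of duration_subkeys/0 below,
+%% so a single operation, e.g. [bucket, put], is backed by five exometer
+%% entries (illustrative listing, derived from inflow/1 and update/2):
+%%   [riak_cs, in, bucket, put]           spiral    - requests started
+%%   [riak_cs, out, bucket, put]          spiral    - successful completions
+%%   [riak_cs, time, bucket, put]         histogram - latency (us) of successes
+%%   [riak_cs, out, error, bucket, put]   spiral    - errored completions
+%%   [riak_cs, time, error, bucket, put]  histogram - latency (us) of errors
+%% 38 duration keys x 16 reported datapoints plus 6 pool stats gives the 614
+%% entries now asserted by riak_test/tests/stats_test.erl.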
+ +duration_subkeys() -> + [{[in], spiral}, + {[out], spiral}, + {[time], histogram}, + {[out, error], spiral}, + {[time, error], histogram}]. %% ==================================================================== %% API %% ==================================================================== +-spec inflow(key()) -> ok. +inflow(Key) -> + lager:debug("~p:inflow Key: ~p", [?MODULE, Key]), + ok = exometer:update([riak_cs, in | Key], 1). +-spec update_with_start(key(), erlang:timestamp(), ok_error_res()) -> ok. +update_with_start(Key, StartTime, ok) -> + update_with_start(Key, StartTime); +update_with_start(Key, StartTime, {ok, _}) -> + update_with_start(Key, StartTime); +update_with_start(Key, StartTime, {error, _}) -> + update_error_with_start(Key, StartTime). --spec safe_update(metric_name(), integer()) -> ok | {error, any()}. -safe_update(BaseId, ElapsedUs) -> - %% Just in case those metrics happen to be not registered; should - %% be a bug and also should not interrupt handling requests by - %% crashing. - try - update(BaseId, ElapsedUs) - catch T:E -> - lager:error("Failed on storing some metrics: ~p,~p", [T,E]) - end. - --spec update(metric_name(), integer()) -> ok | {error, any()}. -update(BaseId, ElapsedUs) -> - ok = exometer:update([riak_cs|BaseId], 1), - ok = exometer:update([riak_cs|BaseId]++[time], ElapsedUs). +-spec update_with_start(key(), erlang:timestamp()) -> ok. +update_with_start(Key, StartTime) -> + update(Key, timer:now_diff(os:timestamp(), StartTime)). --spec update_with_start(metric_name(), erlang:timestamp()) -> - ok | {error, any()}. -update_with_start(BaseId, StartTime) -> - update(BaseId, timer:now_diff(os:timestamp(), StartTime)). +-spec update_error_with_start(key(), erlang:timestamp()) -> ok. +update_error_with_start(Key, StartTime) -> + update([error | Key], timer:now_diff(os:timestamp(), StartTime)). -spec report_json() -> string(). report_json() -> @@ -227,88 +133,95 @@ report_pretty_json() -> -spec get_stats() -> proplists:proplist(). get_stats() -> - Stats = [raw_report_item(I) || I <- ?METRICS] - ++ [raw_report_pool(P) || P <- [request_pool, bucket_list_pool]], - lists:flatten(Stats). - -init() -> - _ = [init_item(I) || I <- ?METRICS], - ok. + DurationStats = + [report_duration(Key, SubKey, ExometerType) || + Key <- duration_metrics(), + {SubKey, ExometerType} <- duration_subkeys()], + PoolStats = [report_pool(P) || P <- [request_pool, bucket_list_pool]], + lists:flatten([DurationStats, PoolStats]). %% ==================================================================== %% Internal %% ==================================================================== -init_item({Name, Type, Opts, Aliases}) -> - ok = exometer:re_register([riak_cs|Name], Type, - [{aliases, Aliases}|Opts]). - -raw_report_item({Name, _Type, _Options, Aliases}) -> - - {ok, Values} = exometer:get_value([riak_cs|Name], [D||{D,_Alias}<-Aliases]), - [{Alias, Value} || - {{D, Alias}, {D, Value}} <- lists:zip(Aliases, Values)]. +init() -> + _ = [init_duration_item(I) || I <- duration_metrics()], + ok. -raw_report_pool(Pool) -> +init_duration_item(Key) -> + [ok = exometer:re_register([riak_cs | SubKey ++ Key], ExometerType, []) || + {SubKey, ExometerType} <- duration_subkeys()]. + +-spec update(key(), integer()) -> ok. +update(Key, ElapsedUs) -> + lager:debug("~p:update Key: ~p", [?MODULE, Key]), + ok = exometer:update([riak_cs, out | Key], 1), + ok = exometer:update([riak_cs, time | Key], ElapsedUs). + +-spec report_duration(key(), [atom()], exometer:type()) -> [{atom(), integer()}]. 
+report_duration(Key, SubKey, ExometerType) -> + AtomKeys = [metric_to_atom(Key ++ SubKey, Suffix) || + Suffix <- suffixes(ExometerType)], + {ok, Values} = exometer:get_value([riak_cs | SubKey ++ Key], + datapoints(ExometerType)), + [{AtomKey, Value} || + {AtomKey, {_DP, Value}} <- lists:zip(AtomKeys, Values)]. + +datapoints(histogram) -> + [mean, median, 95, 99, max]; +datapoints(spiral) -> + [one, count]. + +suffixes(histogram) -> + ["_mean", "_median", "_95", "_99", "_100"]; +suffixes(spiral) -> + ["_one", "_count"]. + +-spec report_pool(atom()) -> [{atom(), integer()}]. +report_pool(Pool) -> {_PoolState, PoolWorkers, PoolOverflow, PoolSize} = poolboy:status(Pool), Name = binary_to_list(atom_to_binary(Pool, latin1)), [{list_to_atom(lists:flatten([Name, $_, "workers"])), PoolWorkers}, {list_to_atom(lists:flatten([Name, $_, "overflow"])), PoolOverflow}, {list_to_atom(lists:flatten([Name, $_, "size"])), PoolSize}]. - +metric_to_atom(Key, Suffix) -> + StringKey = string:join([atom_to_list(Token) || Token <- Key], "_"), + list_to_atom(lists:flatten([StringKey, Suffix])). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -stats_metric_test() -> - [begin - ?debugVal(Key), - case lists:last(Key) of - time -> - ?assertEqual(histogram, Type), - [?assert(proplists:is_defined(M, Aliases)) - || M <- [mean, median, 95, 99, 100]]; - _ -> - ?assertNotEqual(false, lists:keyfind(Key, 1, ?METRICS)), - ?assertEqual(spiral, Type), - ?assert(proplists:is_defined(one, Aliases)), - ?assert(proplists:is_defined(count, Aliases)) - end, - ?assertEqual([], Options) - end || {Key, Type, Options, Aliases} <- ?METRICS]. - stats_test_() -> Apps = [setup, compiler, syntax_tools, goldrush, lager, exometer_core], {setup, fun() -> [ok = application:start(App) || App <- Apps], - ok = riak_cs_stats:init() + ok = init() end, fun(_) -> [ok = application:stop(App) || App <- Apps] end, [{inparallel, [fun() -> - %% ?debugVal(Key), - case lists:last(Key) of - time -> ok; - _ -> riak_cs_stats:update(Key, 16#deadbeef) - end - end || {Key, _, _, _} <- ?METRICS]}, - fun() -> - [begin - Items = raw_report_item(I), - %% ?debugVal(Items), - case length(Items) of - 2 -> - ?assertEqual([1, 1], - [N || {_, N} <- Items]); - 5 -> - ?assertEqual([16#deadbeef, 16#deadbeef, 16#deadbeef, 16#deadbeef, 0], - [N || {_, N} <- Items]) - end - end || I <- ?METRICS] - end]}. + inflow(Key), + update(Key, 16#deadbeef), + update([error | Key], 16#deadbeef) + end || Key <- duration_metrics()]}, + fun() -> + [begin + Report = [N || {_, N} <- report_duration(Key, SubKey, ExometerType)], + case ExometerType of + spiral -> + ?assertEqual([1, 1], Report); + histogram -> + ?assertEqual( + [16#deadbeef, 16#deadbeef, 16#deadbeef, + 16#deadbeef, 16#deadbeef], + Report) + end + end || Key <- duration_metrics(), + {SubKey, ExometerType} <- duration_subkeys()] + end]}. -endif. diff --git a/src/riak_cs_user.erl b/src/riak_cs_user.erl index 3dcb69914..ff6b0b6b2 100644 --- a/src/riak_cs_user.erl +++ b/src/riak_cs_user.erl @@ -75,11 +75,15 @@ create_credentialed_user({error, _}=Error, _User) -> create_credentialed_user({ok, AdminCreds}, User) -> {StIp, StPort, StSSL} = riak_cs_utils:stanchion_data(), %% Make a call to the user request serialization service. 
+ StatsKey = [velvet, create_user], + _ = riak_cs_stats:inflow(StatsKey), + StartTime = os:timestamp(), Result = velvet:create_user(StIp, StPort, "application/json", binary_to_list(riak_cs_json:to_json(User)), [{ssl, StSSL}, {auth_creds, AdminCreds}]), + _ = riak_cs_stats:update_with_start(StatsKey, StartTime, Result), handle_create_user(Result, User). handle_create_user(ok, User) -> @@ -115,6 +119,9 @@ update_user(User, UserObj, RcPid) -> case riak_cs_config:admin_creds() of {ok, AdminCreds} -> Options = [{ssl, StSSL}, {auth_creds, AdminCreds}], + StatsKey = [velvet, update_user], + _ = riak_cs_stats:inflow(StatsKey), + StartTime = os:timestamp(), %% Make a call to the user request serialization service. Result = velvet:update_user(StIp, StPort, @@ -122,6 +129,7 @@ update_user(User, UserObj, RcPid) -> User?RCS_USER.key_id, binary_to_list(riak_cs_json:to_json(User)), Options), + _ = riak_cs_stats:update_with_start(StatsKey, StartTime, Result), handle_update_user(Result, User, UserObj, RcPid); {error, _}=Error -> Error diff --git a/src/riak_cs_utils.erl b/src/riak_cs_utils.erl index 77e13e896..ebe3621a2 100644 --- a/src/riak_cs_utils.erl +++ b/src/riak_cs_utils.erl @@ -163,7 +163,6 @@ close_riak_connection(Pool, Pid) -> -spec delete_object(binary(), binary(), riak_client()) -> {ok, [binary()]} | {error, term()}. delete_object(Bucket, Key, RcPid) -> - ok = riak_cs_stats:update_with_start([object, delete], os:timestamp()), riak_cs_gc:gc_active_manifests(Bucket, Key, RcPid). -spec encode_term(term()) -> binary(). @@ -463,18 +462,14 @@ riak_connection(Pool) -> -spec set_object_acl(binary(), binary(), lfs_manifest(), acl(), riak_client()) -> ok | {error, term()}. set_object_acl(Bucket, Key, Manifest, Acl, RcPid) -> - StartTime = os:timestamp(), {ok, ManiPid} = riak_cs_manifest_fsm:start_link(Bucket, Key, RcPid), - _ActiveMfst = riak_cs_manifest_fsm:get_active_manifest(ManiPid), - UpdManifest = Manifest?MANIFEST{acl=Acl}, - Res = riak_cs_manifest_fsm:update_manifest_with_confirmation(ManiPid, UpdManifest), - riak_cs_manifest_fsm:stop(ManiPid), - if Res == ok -> - ok = riak_cs_stats:update_with_start([object, put_acl], StartTime); - true -> - ok - end, - Res. + try + _ActiveMfst = riak_cs_manifest_fsm:get_active_manifest(ManiPid), + UpdManifest = Manifest?MANIFEST{acl=Acl}, + riak_cs_manifest_fsm:update_manifest_with_confirmation(ManiPid, UpdManifest) + after + riak_cs_manifest_fsm:stop(ManiPid) + end. -spec second_resolution_timestamp(erlang:timestamp()) -> non_neg_integer(). %% @doc Return the number of seconds this timestamp represents. Truncated to diff --git a/src/riak_cs_wm_bucket.erl b/src/riak_cs_wm_bucket.erl index 38855205a..76433997e 100644 --- a/src/riak_cs_wm_bucket.erl +++ b/src/riak_cs_wm_bucket.erl @@ -20,7 +20,8 @@ -module(riak_cs_wm_bucket). --export([content_types_provided/2, +-export([stats_prefix/0, + content_types_provided/2, to_xml/2, allowed_methods/0, malformed_request/2, @@ -32,6 +33,9 @@ -include("riak_cs.hrl"). -include_lib("webmachine/include/webmachine.hrl"). +-spec stats_prefix() -> bucket. +stats_prefix() -> bucket. + %% @doc Get the list of methods this resource supports. -spec allowed_methods() -> [atom()]. 
allowed_methods() -> @@ -120,7 +124,7 @@ accept_body(RD, Ctx=#context{user=User, bucket=Bucket, response_module=ResponseMod, riak_client=RcPid}) -> - riak_cs_dtrace:dt_bucket_entry(?MODULE, <<"bucket_create">>, + riak_cs_dtrace:dt_bucket_entry(?MODULE, <<"bucket_put">>, [], [riak_cs_wm_utils:extract_name(User), Bucket]), BagId = riak_cs_mb_helper:choose_bag_id(manifest, Bucket), case riak_cs_bucket:create_bucket(User, @@ -130,12 +134,12 @@ accept_body(RD, Ctx=#context{user=User, ACL, RcPid) of ok -> - riak_cs_dtrace:dt_bucket_return(?MODULE, <<"bucket_create">>, + riak_cs_dtrace:dt_bucket_return(?MODULE, <<"bucket_put">>, [200], [riak_cs_wm_utils:extract_name(User), Bucket]), {{halt, 200}, RD, Ctx}; {error, Reason} -> Code = ResponseMod:status_code(Reason), - riak_cs_dtrace:dt_bucket_return(?MODULE, <<"bucket_create">>, + riak_cs_dtrace:dt_bucket_return(?MODULE, <<"bucket_put">>, [Code], [riak_cs_wm_utils:extract_name(User), Bucket]), ResponseMod:api_error(Reason, RD, Ctx) end. diff --git a/src/riak_cs_wm_bucket_acl.erl b/src/riak_cs_wm_bucket_acl.erl index c01d2672f..2e52d80c1 100644 --- a/src/riak_cs_wm_bucket_acl.erl +++ b/src/riak_cs_wm_bucket_acl.erl @@ -20,7 +20,8 @@ -module(riak_cs_wm_bucket_acl). --export([content_types_provided/2, +-export([stats_prefix/0, + content_types_provided/2, to_xml/2, allowed_methods/0, malformed_request/2, @@ -35,6 +36,9 @@ -include_lib("webmachine/include/webmachine.hrl"). +-spec stats_prefix() -> bucket_acl. +stats_prefix() -> bucket_acl. + %% @doc Get the list of methods this resource supports. -spec allowed_methods() -> [atom()]. allowed_methods() -> @@ -72,8 +76,7 @@ authorize(RD, Ctx) -> -spec to_xml(#wm_reqdata{}, #context{}) -> {binary() | {'halt', non_neg_integer()}, #wm_reqdata{}, #context{}}. -to_xml(RD, Ctx=#context{start_time=StartTime, - user=User, +to_xml(RD, Ctx=#context{user=User, bucket=Bucket, riak_client=RcPid}) -> riak_cs_dtrace:dt_bucket_entry(?MODULE, <<"bucket_get_acl">>, @@ -81,14 +84,13 @@ to_xml(RD, Ctx=#context{start_time=StartTime, case riak_cs_acl:fetch_bucket_acl(Bucket, RcPid) of {ok, Acl} -> X = {riak_cs_xml:to_xml(Acl), RD, Ctx}, - ok = riak_cs_stats:update_with_start([bucket, get_acl], StartTime), - riak_cs_dtrace:dt_bucket_return(?MODULE, <<"bucket_get_acl">>, + riak_cs_dtrace:dt_bucket_return(?MODULE, <<"bucket_acl_get">>, [200], [riak_cs_wm_utils:extract_name(User), Bucket]), X; {error, Reason} -> Code = riak_cs_s3_response:status_code(Reason), X = riak_cs_s3_response:api_error(Reason, RD, Ctx), - riak_cs_dtrace:dt_bucket_return(?MODULE, <<"bucket_get_acl">>, + riak_cs_dtrace:dt_bucket_return(?MODULE, <<"bucket_acl">>, [Code], [riak_cs_wm_utils:extract_name(User), Bucket]), X end. diff --git a/src/riak_cs_wm_bucket_delete.erl b/src/riak_cs_wm_bucket_delete.erl index ee80732ea..2594b86aa 100644 --- a/src/riak_cs_wm_bucket_delete.erl +++ b/src/riak_cs_wm_bucket_delete.erl @@ -23,6 +23,7 @@ -module(riak_cs_wm_bucket_delete). -export([init/1, + stats_prefix/0, allowed_methods/0, post_is_create/2, process_post/2 @@ -44,6 +45,9 @@ init(Ctx) -> {ok, Ctx#context{rc_pool=?RIAKCPOOL}}. +-spec stats_prefix() -> multiple_delete. +stats_prefix() -> multiple_delete. + -spec allowed_methods() -> [atom()]. allowed_methods() -> %% POST is for Delete Multiple Objects diff --git a/src/riak_cs_wm_bucket_location.erl b/src/riak_cs_wm_bucket_location.erl index 830dd2a77..a311348fc 100644 --- a/src/riak_cs_wm_bucket_location.erl +++ b/src/riak_cs_wm_bucket_location.erl @@ -21,7 +21,8 @@ -module(riak_cs_wm_bucket_location). 
% TODO: add PUT --export([content_types_provided/2, +-export([stats_prefix/0, + content_types_provided/2, to_xml/2, allowed_methods/0 ]). @@ -31,6 +32,9 @@ -include("riak_cs.hrl"). -include_lib("webmachine/include/webmachine.hrl"). +-spec stats_prefix() -> bucket_location. +stats_prefix() -> bucket_location. + %% @doc Get the list of methods this resource supports. -spec allowed_methods() -> [atom()]. allowed_methods() -> diff --git a/src/riak_cs_wm_bucket_policy.erl b/src/riak_cs_wm_bucket_policy.erl index ed5f68cc3..d45b24f81 100644 --- a/src/riak_cs_wm_bucket_policy.erl +++ b/src/riak_cs_wm_bucket_policy.erl @@ -20,7 +20,8 @@ -module(riak_cs_wm_bucket_policy). --export([content_types_provided/2, +-export([stats_prefix/0, + content_types_provided/2, to_json/2, allowed_methods/0, content_types_accepted/2, @@ -36,6 +37,9 @@ -include_lib("riak_pb/include/riak_pb_kv_codec.hrl"). +-spec stats_prefix() -> bucket_policy. +stats_prefix() -> bucket_policy. + %% @doc Get the list of methods this resource supports. -spec allowed_methods() -> [atom()]. allowed_methods() -> diff --git a/src/riak_cs_wm_bucket_uploads.erl b/src/riak_cs_wm_bucket_uploads.erl index 05439a9c1..ae7abda16 100644 --- a/src/riak_cs_wm_bucket_uploads.erl +++ b/src/riak_cs_wm_bucket_uploads.erl @@ -28,6 +28,7 @@ -module(riak_cs_wm_bucket_uploads). -export([init/1, + stats_prefix/0, authorize/2, content_types_provided/2, to_xml/2, @@ -46,6 +47,9 @@ init(Ctx) -> {ok, Ctx#context{local_context=#key_context{}}}. +-spec stats_prefix() -> list_uploads. +stats_prefix() -> list_uploads. + -spec malformed_request(#wm_reqdata{}, #context{}) -> {false, #wm_reqdata{}, #context{}}. malformed_request(RD,Ctx=#context{local_context=LocalCtx0}) -> Bucket = list_to_binary(wrq:path_info(bucket, RD)), diff --git a/src/riak_cs_wm_bucket_versioning.erl b/src/riak_cs_wm_bucket_versioning.erl index b73251ce9..7cafe5d09 100644 --- a/src/riak_cs_wm_bucket_versioning.erl +++ b/src/riak_cs_wm_bucket_versioning.erl @@ -20,7 +20,8 @@ -module(riak_cs_wm_bucket_versioning). --export([content_types_provided/2, +-export([stats_prefix/0, + content_types_provided/2, to_xml/2, allowed_methods/0]). @@ -29,6 +30,9 @@ -include("riak_cs.hrl"). -include_lib("webmachine/include/webmachine.hrl"). +-spec stats_prefix() -> bucket_versioning. +stats_prefix() -> bucket_versioning. + %% @doc Get the list of methods this resource supports. -spec allowed_methods() -> [atom()]. allowed_methods() -> diff --git a/src/riak_cs_wm_buckets.erl b/src/riak_cs_wm_buckets.erl index 9f2bd621b..1bc9e2a4a 100644 --- a/src/riak_cs_wm_buckets.erl +++ b/src/riak_cs_wm_buckets.erl @@ -20,7 +20,8 @@ -module(riak_cs_wm_buckets). --export([allowed_methods/0, +-export([stats_prefix/0, + allowed_methods/0, api_request/2, anon_ok/0]). @@ -28,18 +29,19 @@ -include("riak_cs_api.hrl"). -include_lib("webmachine/include/webmachine.hrl"). +-spec stats_prefix() -> service. +stats_prefix() -> service. + %% @doc Get the list of methods this resource supports. -spec allowed_methods() -> [atom()]. allowed_methods() -> ['GET']. -spec api_request(#wm_reqdata{}, #context{}) -> ?LBRESP{}. 
-api_request(_RD, #context{user=User, - start_time=StartTime}) -> +api_request(_RD, #context{user=User}) -> UserName = riak_cs_wm_utils:extract_name(User), riak_cs_dtrace:dt_service_entry(?MODULE, <<"service_get_buckets">>, [], [UserName]), Res = riak_cs_api:list_buckets(User), - ok = riak_cs_stats:update_with_start([service, get, buckets], StartTime), riak_cs_dtrace:dt_service_return(?MODULE, <<"service_get_buckets">>, [], [UserName]), Res. diff --git a/src/riak_cs_wm_common.erl b/src/riak_cs_wm_common.erl index 0900e4ade..4e2336552 100644 --- a/src/riak_cs_wm_common.erl +++ b/src/riak_cs_wm_common.erl @@ -45,6 +45,7 @@ finish_request/2]). -export([default_allowed_methods/0, + default_stats_prefix/0, default_content_types_accepted/2, default_content_types_provided/2, default_generate_etag/2, @@ -80,11 +81,13 @@ init(Config) -> PolicyModule = proplists:get_value(policy_module, Config), Exports = orddict:from_list(Mod:module_info(exports)), ExportsFun = exports_fun(Exports), + StatsPrefix = resource_call(Mod, stats_prefix, [], ExportsFun), Ctx = #context{auth_bypass=AuthBypass, auth_module=AuthModule, response_module=RespModule, policy_module=PolicyModule, exports_fun=ExportsFun, + stats_prefix=StatsPrefix, start_time=os:timestamp(), submodule=Mod, api=Api}, @@ -106,7 +109,11 @@ service_available(RD, Ctx=#context{submodule=Mod, rc_pool=Pool}) -> -spec malformed_request(#wm_reqdata{}, #context{}) -> {boolean(), #wm_reqdata{}, #context{}}. malformed_request(RD, Ctx=#context{submodule=Mod, - exports_fun=ExportsFun}) -> + exports_fun=ExportsFun, + stats_prefix=StatsPrefix}) -> + %% Method is used in stats keys, so updating inflow should happen *after* + %% the allowed_methods assertion. + _ = update_stats_inflow(RD, StatsPrefix), riak_cs_dtrace:dt_wm_entry({?MODULE, Mod}, <<"malformed_request">>), {Malformed, _, _} = R = resource_call(Mod, malformed_request, @@ -366,6 +373,7 @@ finish_request(RD, Ctx=#context{riak_client=RcPid, [RD, Ctx], ExportsFun), riak_cs_dtrace:dt_wm_return({?MODULE, Mod}, <<"finish_request">>, [0], []), + update_stats(RD, Ctx), Res; finish_request(RD, Ctx0=#context{riak_client=RcPid, rc_pool=Pool, @@ -379,6 +387,7 @@ finish_request(RD, Ctx0=#context{riak_client=RcPid, [RD, Ctx], ExportsFun), riak_cs_dtrace:dt_wm_return({?MODULE, Mod}, <<"finish_request">>, [1], []), + update_stats(RD, Ctx), Res. %% =================================================================== @@ -494,6 +503,34 @@ post_authentication({error, Reason}, RD, Ctx, _, _) -> _ = lager:debug("Authentication error: ~p", [Reason]), riak_cs_wm_utils:deny_invalid_key(RD, Ctx). +update_stats_inflow(_RD, undefined = _StatsPrefix) -> + ok; +update_stats_inflow(RD, StatsPrefix) -> + Method = riak_cs_wm_utils:lower_case_method(wrq:method(RD)), + Key = [StatsPrefix, Method], + riak_cs_stats:inflow(Key). + +update_stats(_RD, #context{stats_key=no_stats}) -> + ok; +update_stats(_RD, #context{stats_prefix=no_stats}) -> + ok; +update_stats(RD, #context{start_time=StartTime, + stats_prefix=StatsPrefix, stats_key=StatsKey}) -> + update_stats(StartTime, wrq:response_code(RD), + StatsPrefix, riak_cs_wm_utils:lower_case_method(wrq:method(RD)), + StatsKey). + +update_stats(StartTime, Code, StatsPrefix, Method, StatsKey0) -> + StatsKey = case StatsKey0 of + prefix_and_method -> [StatsPrefix, Method]; + _ -> StatsKey0 + end, + case Code of + Success when is_integer(Success) andalso Success < 400 -> + riak_cs_stats:update_with_start(StatsKey, StartTime); + _Error -> + riak_cs_stats:update_error_with_start(StatsKey, StartTime) + end. 
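+%% Illustrative walk-through (derived from the callbacks in this patch):
+%% a PUT Bucket request is handled by riak_cs_wm_bucket, whose stats_prefix/0
+%% returns `bucket', so malformed_request/2 updates [riak_cs, in, bucket, put].
+%% When finish_request/2 runs, a response code below 400 updates
+%% [riak_cs, out, bucket, put] and [riak_cs, time, bucket, put], while a
+%% 4xx/5xx response updates the `error' variants instead. A sub-module can
+%% replace the default [Prefix, Method] key by setting stats_key in its
+%% context (riak_cs_wm_object uses [object, put_copy] for PUT Copy), or set
+%% `no_stats' to skip the automatic update.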
%% =================================================================== %% Resource function defaults @@ -501,6 +538,8 @@ post_authentication({error, Reason}, RD, Ctx, _, _) -> default(init) -> default_init; +default(stats_prefix) -> + default_stats_prefix; default(allowed_methods) -> default_allowed_methods; default(content_types_accepted) -> @@ -533,6 +572,9 @@ default(_) -> default_init(Ctx) -> {ok, Ctx}. +default_stats_prefix() -> + no_stats. + default_malformed_request(RD, Ctx) -> {false, RD, Ctx}. diff --git a/src/riak_cs_wm_object.erl b/src/riak_cs_wm_object.erl index f484b71e3..d8c32e8ac 100644 --- a/src/riak_cs_wm_object.erl +++ b/src/riak_cs_wm_object.erl @@ -21,6 +21,7 @@ -module(riak_cs_wm_object). -export([init/1, + stats_prefix/0, authorize/2, content_types_provided/2, generate_etag/2, @@ -41,6 +42,9 @@ init(Ctx) -> {ok, Ctx#context{local_context=#key_context{}}}. +-spec stats_prefix() -> object. +stats_prefix() -> object. + -spec malformed_request(#wm_reqdata{}, #context{}) -> {false, #wm_reqdata{}, #context{}}. malformed_request(RD, Ctx) -> ContextWithKey = riak_cs_wm_utils:extract_key(RD, Ctx), @@ -193,7 +197,10 @@ produce_body(RD, Ctx=#context{rc_pool=RcPool, {Ctx, fun() -> {<<>>, done} end}; false -> riak_cs_get_fsm:continue(GetFsmPid, {Start, End}), - {Ctx#context{auto_rc_close=false}, + %% Streaming via `known_length_stream' and the `StreamBody' function + %% is handled *after* WM's `finish_request' callback completes. + %% Use `no_stats' to avoid the automatic stats update by `riak_cs_wm_common'. + {Ctx#context{auto_rc_close=false, stats_key=no_stats}, {<<>>, fun() -> riak_cs_wm_utils:streaming_get( RcPool, RcPid, GetFsmPid, StartTime, UserName, BFile_str) @@ -201,8 +208,7 @@ produce_body(RD, Ctx=#context{rc_pool=RcPool, end, if Method == 'HEAD' -> riak_cs_dtrace:dt_object_return(?MODULE, <<"object_head">>, - [], [UserName, BFile_str]), - ok = riak_cs_stats:update_with_start([object, head], StartTime); + [], [UserName, BFile_str]); true -> ok end, @@ -311,7 +317,8 @@ accept_body(RD, #context{response_module=ResponseMod} = Ctx) -> {error, _} = Err -> ResponseMod:api_error(Err, RD, Ctx); {SrcBucket, SrcKey} -> - handle_copy_put(RD, Ctx, SrcBucket, SrcKey) + handle_copy_put(RD, Ctx#context{stats_key=[object, put_copy]}, + SrcBucket, SrcKey) end. -spec handle_normal_put(#wm_reqdata{}, #context{}) -> @@ -441,7 +448,6 @@ accept_streambody(RD, -spec finalize_request(#wm_reqdata{}, #context{}, pid()) -> {{halt, 200}, #wm_reqdata{}, #context{}}. finalize_request(RD, Ctx=#context{local_context=LocalCtx, - start_time=StartTime, response_module=ResponseMod, user=User}, Pid) -> @@ -465,7 +471,6 @@ finalize_request(RD, {error, Reason} -> ResponseMod:api_error(Reason, RD, Ctx) end, - ok = riak_cs_stats:update_with_start([object, put], StartTime), riak_cs_dtrace:dt_wm_return(?MODULE, <<"finalize_request">>, [S], [UserName, BFile_str]), riak_cs_dtrace:dt_object_return(?MODULE, <<"object_put">>, [S], [UserName, BFile_str]), Response. @@ -482,7 +487,9 @@ check_0length_metadata_update(Length, RD, Ctx=#context{local_context=LocalCtx}) true -> UpdLocalCtx = LocalCtx#key_context{size=Length, update_metadata=true}, - {true, RD, Ctx#context{local_context=UpdLocalCtx}} + {true, RD, Ctx#context{ + stats_key=[object, put_copy], + local_context=UpdLocalCtx}} end. 
zero_length_metadata_update_p(0, RD) -> diff --git a/src/riak_cs_wm_object_acl.erl b/src/riak_cs_wm_object_acl.erl index 8572a2d79..da2f36cd1 100644 --- a/src/riak_cs_wm_object_acl.erl +++ b/src/riak_cs_wm_object_acl.erl @@ -21,6 +21,7 @@ -module(riak_cs_wm_object_acl). -export([init/1, + stats_prefix/0, allowed_methods/0, malformed_request/2, authorize/2, @@ -35,6 +36,9 @@ init(Ctx) -> {ok, Ctx#context{local_context=#key_context{}}}. +-spec stats_prefix() -> object_acl. +stats_prefix() -> object_acl. + -spec malformed_request(#wm_reqdata{}, #context{}) -> {false, #wm_reqdata{}, #context{}}. malformed_request(RD,Ctx) -> case riak_cs_wm_utils:has_acl_header_and_body(RD) of @@ -139,27 +143,22 @@ content_types_accepted(RD, Ctx=#context{local_context=LocalCtx0}) -> -spec produce_body(term(), term()) -> {iolist()|binary(), term(), term()}. produce_body(RD, Ctx=#context{local_context=LocalCtx, requested_perm='READ_ACP', - start_time=StartTime, user=User}) -> #key_context{get_fsm_pid=GetFsmPid, manifest=Mfst} = LocalCtx, {Bucket, File} = Mfst?MANIFEST.bkey, BFile_str = [Bucket, $,, File], UserName = riak_cs_wm_utils:extract_name(User), - riak_cs_dtrace:dt_object_entry(?MODULE, <<"object_get_acl">>, - [], [UserName, BFile_str]), + riak_cs_dtrace:dt_object_entry(?MODULE, <<"object_acl_get">>, + [], [UserName, BFile_str]), riak_cs_get_fsm:stop(GetFsmPid), - ok = riak_cs_stats:update_with_start([object, get_acl], StartTime), Acl = Mfst?MANIFEST.acl, - case Acl of - undefined -> - riak_cs_dtrace:dt_object_return(?MODULE, <<"object_get_acl">>, - [-1], [UserName, BFile_str]), - {riak_cs_acl_utils:empty_acl_xml(), RD, Ctx}; - _ -> - riak_cs_dtrace:dt_object_return(?MODULE, <<"object_get_acl">>, - [-2], [UserName, BFile_str]), - {riak_cs_xml:to_xml(Acl), RD, Ctx} - end. + {AclXml, DtraceTag} = case Acl of + undefined -> {riak_cs_acl_utils:empty_acl_xml(), -1}; + _ -> {riak_cs_xml:to_xml(Acl), -2} + end, + riak_cs_dtrace:dt_object_return(?MODULE, <<"object_acl_get">>, + [DtraceTag], [UserName, BFile_str]), + {AclXml, RD, Ctx}. -spec accept_body(term(), term()) -> {boolean() | {halt, term()}, term(), term()}. @@ -197,18 +196,18 @@ accept_body(RD, Ctx=#context{local_context=#key_context{get_fsm_pid=GetFsmPid, Key = list_to_binary(KeyStr), case riak_cs_utils:set_object_acl(Bucket, Key, Mfst, Acl, RcPid) of ok -> - riak_cs_dtrace:dt_object_return(?MODULE, <<"object_put_acl">>, + riak_cs_dtrace:dt_object_return(?MODULE, <<"object_acl_put">>, [200], [UserName, BFile_str]), {{halt, 200}, RD, Ctx}; {error, Reason} -> Code = riak_cs_s3_response:status_code(Reason), - riak_cs_dtrace:dt_object_return(?MODULE, <<"object_put_acl">>, + riak_cs_dtrace:dt_object_return(?MODULE, <<"object_acl_put">>, [Code], [UserName, BFile_str]), riak_cs_s3_response:api_error(Reason, RD, Ctx) end; {error, Reason2} -> Code = riak_cs_s3_response:status_code(Reason2), - riak_cs_dtrace:dt_object_return(?MODULE, <<"object_put_acl">>, + riak_cs_dtrace:dt_object_return(?MODULE, <<"object_acl_put">>, [Code], [UserName, BFile_str]), riak_cs_s3_response:api_error(Reason2, RD, Ctx) end. diff --git a/src/riak_cs_wm_object_upload.erl b/src/riak_cs_wm_object_upload.erl index 09b7d65c2..90fdd9f11 100644 --- a/src/riak_cs_wm_object_upload.erl +++ b/src/riak_cs_wm_object_upload.erl @@ -21,6 +21,7 @@ -module(riak_cs_wm_object_upload). -export([init/1, + stats_prefix/0, authorize/2, content_types_provided/2, allowed_methods/0, @@ -38,6 +39,9 @@ init(Ctx) -> {ok, Ctx#context{local_context=#key_context{}}}. +-spec stats_prefix() -> multipart. 
+stats_prefix() -> multipart. + -spec malformed_request(#wm_reqdata{}, #context{}) -> {false, #wm_reqdata{}, #context{}}. malformed_request(RD,Ctx) -> ContextWithKey = riak_cs_wm_utils:extract_key(RD, Ctx), diff --git a/src/riak_cs_wm_object_upload_part.erl b/src/riak_cs_wm_object_upload_part.erl index db570da3a..02152d9f2 100644 --- a/src/riak_cs_wm_object_upload_part.erl +++ b/src/riak_cs_wm_object_upload_part.erl @@ -21,6 +21,7 @@ -module(riak_cs_wm_object_upload_part). -export([init/1, + stats_prefix/0, authorize/2, content_types_provided/2, allowed_methods/0, @@ -44,8 +45,11 @@ init(Ctx) -> %% {ok, Ctx#context{local_context=#key_context{}}}. {ok, Ctx#context{local_context=#key_context{}}}. +-spec stats_prefix() -> multipart_upload. +stats_prefix() -> multipart_upload. + -spec malformed_request(#wm_reqdata{}, #context{}) -> - {false, #wm_reqdata{}, #context{}} | {{halt, pos_integer()}, #wm_reqdata{}, #context{}}. + {false, #wm_reqdata{}, #context{}} | {{halt, pos_integer()}, #wm_reqdata{}, #context{}}. malformed_request(RD,Ctx) -> Method = wrq:method(RD), case Method == 'PUT' andalso not valid_part_number(RD) of diff --git a/src/riak_cs_wm_objects.erl b/src/riak_cs_wm_objects.erl index d069a2474..11b4fc666 100644 --- a/src/riak_cs_wm_objects.erl +++ b/src/riak_cs_wm_objects.erl @@ -23,6 +23,7 @@ -module(riak_cs_wm_objects). -export([init/1, + stats_prefix/0, allowed_methods/0, api_request/2 ]). @@ -40,6 +41,9 @@ init(Ctx) -> {ok, Ctx#context{rc_pool=?RIAKCPOOL}}. +-spec stats_prefix() -> list_objects. +stats_prefix() -> list_objects. + -spec allowed_methods() -> [atom()]. allowed_methods() -> %% GET is for object listing @@ -54,8 +58,7 @@ authorize(RD, Ctx) -> -spec api_request(#wm_reqdata{}, #context{}) -> {ok, ?LORESP{}} | {error, term()}. api_request(RD, Ctx=#context{bucket=Bucket, riak_client=RcPid, - user=User, - start_time=StartTime}) -> + user=User}) -> UserName = riak_cs_wm_utils:extract_name(User), riak_cs_dtrace:dt_bucket_entry(?MODULE, <<"list_keys">>, [], [UserName, Bucket]), Res = riak_cs_api:list_objects( @@ -65,7 +68,6 @@ api_request(RD, Ctx=#context{bucket=Bucket, get_max_keys(RD), get_options(RD), RcPid), - ok = riak_cs_stats:update_with_start([bucket, list_keys], StartTime), riak_cs_dtrace:dt_bucket_return(?MODULE, <<"list_keys">>, [200], [UserName, Bucket]), Res. diff --git a/src/riak_cs_wm_utils.erl b/src/riak_cs_wm_utils.erl index 40e638698..f2d378b10 100644 --- a/src/riak_cs_wm_utils.erl +++ b/src/riak_cs_wm_utils.erl @@ -22,6 +22,7 @@ -export([service_available/2, service_available/3, + lower_case_method/1, iso_8601_datetime/0, iso_8601_datetime/1, to_iso_8601/1, @@ -348,6 +349,16 @@ streaming_get(RcPool, RcPid, FsmPid, StartTime, UserName, BFile_str) -> {Chunk, fun() -> streaming_get(RcPool, RcPid, FsmPid, StartTime, UserName, BFile_str) end} end. +-spec lower_case_method(atom()) -> atom(). +lower_case_method('GET') -> get; +lower_case_method('HEAD') -> head; +lower_case_method('POST') -> post; +lower_case_method('PUT') -> put; +lower_case_method('DELETE') -> delete; +lower_case_method('TRACE') -> trace; +lower_case_method('CONNECT') -> connect; +lower_case_method('OPTIONS') -> options. + %% @doc Get an ISO 8601 formatted timestamp representing %% current time. -spec iso_8601_datetime() -> string().
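With this key scheme, the flat names reported by riak_cs_stats:get_stats/0 and report_json/0 are produced by metric_to_atom/2, i.e. the exometer key joined with underscores plus a datapoint suffix. As a rough sketch (values illustrative), the [bucket, put] operation is reported as entries of the form:

    {bucket_put_in_one, 0}, {bucket_put_in_count, 0},
    {bucket_put_out_one, 0}, {bucket_put_out_count, 0},
    {bucket_put_time_mean, 0}, {bucket_put_time_median, 0},
    {bucket_put_time_95, 0}, {bucket_put_time_99, 0}, {bucket_put_time_100, 0},
    {bucket_put_out_error_one, 0}, {bucket_put_out_error_count, 0},
    {bucket_put_time_error_mean, 0}, {bucket_put_time_error_median, 0},
    {bucket_put_time_error_95, 0}, {bucket_put_time_error_99, 0},
    {bucket_put_time_error_100, 0}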