diff --git a/dialyzer.ignore-warnings.ee b/dialyzer.ignore-warnings.ee index 3218a2c0e..cd631631c 100644 --- a/dialyzer.ignore-warnings.ee +++ b/dialyzer.ignore-warnings.ee @@ -1,7 +1,7 @@ # Errors riak_cs_block_server.erl:312: The pattern Success = {'ok', _} can never match the type {'error',_} riak_cs_block_server.erl:347: The pattern {'ok', RiakObject} can never match the type {'error',_} -riak_cs_config.erl:248: The pattern {'ok', ClusterID} can never match the type {'error',_} +riak_cs_pbc.erl:100: The pattern {'ok', ClusterID} can never match the type {'error',_} # Warnings Unknown functions: app_helper:get_prop_or_env/3 diff --git a/rebar b/rebar index b2434fe6b..60410665a 100755 Binary files a/rebar and b/rebar differ diff --git a/rebar.config b/rebar.config index fc2e1ebdc..4528769b2 100644 --- a/rebar.config +++ b/rebar.config @@ -2,7 +2,7 @@ {require_otp_vsn, "R16"}. -{cover_enabled, true}. +{cover_enabled, false}. %% EDoc options {edoc_opts, [preprocess]}. @@ -54,5 +54,5 @@ {deps_ee, [ {riak_repl_pb_api,".*",{git,"git@github.com:basho/riak_repl_pb_api.git", {tag, "2.1.0"}}}, - {riak_cs_multibag,".*",{git,"git@github.com:basho/riak_cs_multibag.git", {tag, "2.0.0"}}} + {riak_cs_multibag,".*",{git,"git@github.com:basho/riak_cs_multibag.git", {tag, "2.1.0-pre1"}}} ]}. diff --git a/rel/files/riak_cs.schema b/rel/files/riak_cs.schema index 9e90375e4..0e5c6e0b6 100644 --- a/rel/files/riak_cs.schema +++ b/rel/files/riak_cs.schema @@ -367,6 +367,15 @@ {datatype, integer} ]}. +%% == Multi-Datacenter Replication == +%% @doc Switch to use proxy_get feature of +%% Multi-Datacenter Replication. Make sure +%% to set proxy_get on also in riak side. +{mapping, "proxy_get", "riak_cs.proxy_get", [ + {default, off}, + {datatype, flag} +]}. + %% == DTrace == %% @doc If your Erlang virtual machine supports DTrace (or diff --git a/riak_test/src/rtcs.erl b/riak_test/src/rtcs.erl index 194493fbb..e86f1f61f 100644 --- a/riak_test/src/rtcs.erl +++ b/riak_test/src/rtcs.erl @@ -104,6 +104,10 @@ flavored_setup(NumNodes, {multibag, _} = Flavor, Configs, Vsn) rtcs_bag:flavored_setup(NumNodes, Flavor, Configs, Vsn). setup_clusters(Configs, JoinFun, NumNodes, Vsn) -> + ConfigFun = fun(_Type, Config, _Node) -> Config end, + setup_clusters(Configs, ConfigFun, JoinFun, NumNodes, Vsn). + +setup_clusters(Configs, ConfigFun, JoinFun, NumNodes, Vsn) -> %% Start the erlcloud app erlcloud:start(), @@ -113,14 +117,15 @@ setup_clusters(Configs, JoinFun, NumNodes, Vsn) -> Cfgs = configs(Configs), lager:info("Configs = ~p", [ Cfgs]), - {RiakNodes, _CSNodes, _Stanchion} = Nodes = deploy_nodes(NumNodes, Cfgs, Vsn), + {RiakNodes, _CSNodes, _Stanchion} = Nodes = + deploy_nodes(NumNodes, Cfgs, ConfigFun, Vsn), rt:wait_until_nodes_ready(RiakNodes), lager:info("Make cluster"), JoinFun(RiakNodes), ?assertEqual(ok, wait_until_nodes_ready(RiakNodes)), ?assertEqual(ok, wait_until_no_pending_changes(RiakNodes)), rt:wait_until_ring_converged(RiakNodes), - {AdminKeyId, AdminSecretKey} = setup_admin_user(NumNodes, Cfgs, Vsn), + {AdminKeyId, AdminSecretKey} = setup_admin_user(NumNodes, Cfgs, ConfigFun, Vsn), AdminConfig = rtcs:config(AdminKeyId, AdminSecretKey, rtcs:cs_port(hd(RiakNodes))), @@ -178,7 +183,8 @@ riak_id_per_cluster(NumNodes) -> deploy_stanchion(Config) -> %% Set initial config - update_stanchion_config(rt_config:get(?STANCHION_CURRENT), Config), + ConfigFun = fun(_, Config0, _) -> Config0 end, + update_stanchion_config(rt_config:get(?STANCHION_CURRENT), Config, ConfigFun), start_stanchion(), lager:info("Stanchion started"). @@ -461,11 +467,9 @@ riak_root_and_vsn(previous, ee) -> {?EE_ROOT, ee_previous}. cs_current() -> ?CS_CURRENT. -deploy_nodes(NumNodes, InitialConfig) -> - deploy_nodes(NumNodes, InitialConfig, current). - --spec deploy_nodes(list(), list(), current|previous) -> any(). -deploy_nodes(NumNodes, InitialConfig, Vsn) when Vsn =:= current orelse Vsn =:= previous -> +-spec deploy_nodes(list(), list(), fun(), current|previous) -> any(). +deploy_nodes(NumNodes, InitialConfig, ConfigFun, Vsn) + when Vsn =:= current orelse Vsn =:= previous -> lager:info("Initial Config: ~p", [InitialConfig]), NodeConfig = [{current, InitialConfig} || _ <- lists:seq(1,NumNodes)], RiakNodes = [?DEV(N) || N <- lists:seq(1, NumNodes)], @@ -504,7 +508,7 @@ deploy_nodes(NumNodes, InitialConfig, Vsn) when Vsn =:= current orelse Vsn =:= p {_Versions, Configs} = lists:unzip(NodeConfig), %% Set initial config - set_configs(NodeList, Configs, Vsn), + set_configs(NodeList, Configs, ConfigFun, Vsn), start_all_nodes(NodeList, Vsn), Nodes = {RiakNodes, CSNodes, StanchionNode}, @@ -519,10 +523,8 @@ node_id(Node) -> NodeMap = rt_config:get(rt_cs_nodes), orddict:fetch(Node, NodeMap). -setup_admin_user(NumNodes, InitialConfig) -> - setup_admin_user(NumNodes, InitialConfig, current). - -setup_admin_user(NumNodes, InitialConfig, Vsn) when Vsn =:= current orelse Vsn =:= previous -> +setup_admin_user(NumNodes, InitialConfig, ConfigFun, Vsn) + when Vsn =:= current orelse Vsn =:= previous -> lager:info("Initial Config: ~p", [InitialConfig]), NodeConfig = [{current, InitialConfig} || _ <- lists:seq(1,NumNodes)], RiakNodes = [?DEV(N) || N <- lists:seq(1, NumNodes)], @@ -549,10 +551,7 @@ setup_admin_user(NumNodes, InitialConfig, Vsn) when Vsn =:= current orelse Vsn = %% Create admin user and set in cs and stanchion configs {KeyID, KeySecret} = AdminCreds = create_admin_user(hd(RiakNodes)), - %% Restart cs and stanchion nodes so admin user takes effect - %% stop_cs_and_stanchion_nodes(NodeList), - - set_admin_creds_in_configs(NodeList, Configs, AdminCreds, Vsn), + set_admin_creds_in_configs(NodeList, Configs, ConfigFun, AdminCreds, Vsn), UpdateFun = fun({Node, App}) -> ok = rpc:call(Node, application, set_env, @@ -564,9 +563,6 @@ setup_admin_user(NumNodes, InitialConfig, Vsn) when Vsn =:= current orelse Vsn = [ {CSNode, riak_cs} || CSNode <- CSNodes ]], lists:foreach(UpdateFun, ZippedNodes), - %% start_cs_and_stanchion_nodes(NodeList), - %% [ok = rt:wait_until_pingable(N) || N <- CSNodes ++ [StanchionNode]], - lager:info("NodeConfig: ~p", [ NodeConfig ]), lager:info("RiakNodes: ~p", [RiakNodes]), lager:info("CSNodes: ~p", [CSNodes]), @@ -653,7 +649,7 @@ get_rt_config(cs, previous) -> rt_config:get(?CS_PREVIOUS); get_rt_config(stanchion, current) -> rt_config:get(?STANCHION_CURRENT); get_rt_config(stanchion, previous) -> rt_config:get(?STANCHION_PREVIOUS). -set_configs(NodeList, Configs, Vsn) -> +set_configs(NodeList, Configs, ConfigFun, Vsn) -> rt:pmap(fun({_, default}) -> ok; ({{_CSNode, RiakNode, _Stanchion}, Config}) -> @@ -661,20 +657,21 @@ set_configs(NodeList, Configs, Vsn) -> rt_cs_dev:update_app_config(RiakNode, proplists:get_value(riak, Config)), update_cs_config(get_rt_config(cs, Vsn), N, - proplists:get_value(cs, Config)), + proplists:get_value(cs, Config), ConfigFun), update_stanchion_config(get_rt_config(stanchion, Vsn), - proplists:get_value(stanchion, Config)); + proplists:get_value(stanchion, Config), + ConfigFun); ({{_CSNode, RiakNode}, Config}) -> N = rt_cs_dev:node_id(RiakNode), rt_cs_dev:update_app_config(RiakNode, proplists:get_value(riak, Config)), update_cs_config(get_rt_config(cs, Vsn), N, - proplists:get_value(cs, Config)) + proplists:get_value(cs, Config), ConfigFun) end, lists:zip(NodeList, Configs)), enable_zdbbl(Vsn). -set_admin_creds_in_configs(NodeList, Configs, AdminCreds, Vsn) -> +set_admin_creds_in_configs(NodeList, Configs, ConfigFun, AdminCreds, Vsn) -> rt:pmap(fun({_, default}) -> ok; ({{_CSNode, RiakNode, _Stanchion}, Config}) -> @@ -682,15 +679,17 @@ set_admin_creds_in_configs(NodeList, Configs, AdminCreds, Vsn) -> update_cs_config(get_rt_config(cs, Vsn), N, proplists:get_value(cs, Config), + ConfigFun, AdminCreds), update_stanchion_config(get_rt_config(stanchion, Vsn), proplists:get_value(stanchion, Config), - AdminCreds); + ConfigFun, AdminCreds); ({{_CSNode, RiakNode}, Config}) -> N = rt_cs_dev:node_id(RiakNode), update_cs_config(get_rt_config(cs, Vsn), N, proplists:get_value(cs, Config), + ConfigFun, AdminCreds) end, lists:zip(NodeList, Configs)). @@ -801,6 +800,14 @@ calculate_storage(N, Vsn) -> lager:info("Running ~p", [Cmd]), os:cmd(Cmd). +enable_proxy_get(SrcN, Vsn, SinkCluster) -> + rtdev:run_riak_repl(SrcN, get_rt_config(riak, Vsn), + "proxy_get enable " ++ SinkCluster). + +disable_proxy_get(SrcN, Vsn, SinkCluster) -> + rtdev:run_riak_repl(SrcN, get_rt_config(riak, Vsn), + "proxy_get disable " ++ SinkCluster). + read_config(Vsn, N, Who) -> Prefix = get_rt_config(Who, Vsn), EtcPath = case Who of @@ -815,16 +822,22 @@ read_config(Vsn, N, Who) -> Config end. -update_cs_config(Prefix, N, Config, {AdminKey, AdminSecret}) -> +update_cs_config(Prefix, N, Config, {_,_} = AdminCred) -> + update_cs_config(Prefix, N, Config, fun(_,Config0,_) -> Config0 end, AdminCred); +update_cs_config(Prefix, N, Config, ConfigUpdateFun) when is_function(ConfigUpdateFun) -> + update_cs_config1(Prefix, N, Config, ConfigUpdateFun). + +update_cs_config(Prefix, N, Config, ConfigUpdateFun, {AdminKey, AdminSecret}) -> CSSection = proplists:get_value(riak_cs, Config), UpdConfig = [{riak_cs, update_admin_creds(CSSection, AdminKey, AdminSecret)} | proplists:delete(riak_cs, Config)], - update_cs_config(Prefix, N, UpdConfig). + update_cs_config1(Prefix, N, UpdConfig, ConfigUpdateFun). -update_cs_config(Prefix, N, Config) -> +update_cs_config1(Prefix, N, Config, ConfigUpdateFun) -> CSSection = proplists:get_value(riak_cs, Config), - UpdConfig = [{riak_cs, update_cs_port(CSSection, N)} | - proplists:delete(riak_cs, Config)], + UpdConfig0 = [{riak_cs, update_cs_port(CSSection, N)} | + proplists:delete(riak_cs, Config)], + UpdConfig = ConfigUpdateFun(cs, UpdConfig0, N), update_app_config(riakcs_etcpath(Prefix, N), UpdConfig). update_admin_creds(Config, AdminKey, AdminSecret) -> @@ -836,16 +849,22 @@ update_cs_port(Config, N) -> Config2 = [{riak_host, {"127.0.0.1", pb_port(N)}} | proplists:delete(riak_host, Config)], [{listener, {"127.0.0.1", cs_port(N)}} | proplists:delete(listener, Config2)]. -update_stanchion_config(Prefix, Config, {AdminKey, AdminSecret}) -> +update_stanchion_config(Prefix, Config, {_,_} = AdminCreds) -> + update_stanchion_config(Prefix, Config, fun(_,Config0,_) -> Config0 end, AdminCreds); +update_stanchion_config(Prefix, Config, ConfigUpdateFun) when is_function(ConfigUpdateFun) -> + update_stanchion_config1(Prefix, Config, ConfigUpdateFun). + +update_stanchion_config(Prefix, Config, ConfigUpdateFun, {AdminKey, AdminSecret}) -> StanchionSection = proplists:get_value(stanchion, Config), UpdConfig = [{stanchion, update_admin_creds(StanchionSection, AdminKey, AdminSecret)} | proplists:delete(stanchion, Config)], - update_stanchion_config(Prefix, UpdConfig). + update_stanchion_config1(Prefix, UpdConfig, ConfigUpdateFun). -update_stanchion_config(Prefix, Config) -> +update_stanchion_config1(Prefix, Config0, ConfigUpdateFun) when is_function(ConfigUpdateFun) -> + Config = ConfigUpdateFun(stanchion, Config0, undefined), update_app_config(stanchion_etcpath(Prefix), Config). -update_app_config(Path, Config) -> +update_app_config(Path, Config) -> lager:debug("rtcs:update_app_config(~s,~p)", [Path, Config]), FileFormatString = "~s/~s.config", AppConfigFile = io_lib:format(FileFormatString, [Path, "app"]), diff --git a/riak_test/src/rtcs_multipart.erl b/riak_test/src/rtcs_multipart.erl index c3209c7ee..d64cd9277 100644 --- a/riak_test/src/rtcs_multipart.erl +++ b/riak_test/src/rtcs_multipart.erl @@ -47,16 +47,24 @@ upload_parts(Bucket, Key, UploadId, Config, PartCount, [Size | Sizes], Contents, Sizes, [Content | Contents], [{PartCount, PartEtag} | Parts]). upload_part_copy(BucketName, Key, UploadId, PartNum, SrcBucket, SrcKey, Config) -> - + upload_part_copy(BucketName, Key, UploadId, PartNum, SrcBucket, SrcKey, undefined, Config). + +upload_part_copy(BucketName, Key, UploadId, PartNum, SrcBucket, SrcKey, SrcRange, Config) -> Url = "/" ++ Key, Source = filename:join([SrcBucket, SrcKey]), Subresources = [{"partNumber", integer_to_list(PartNum)}, {"uploadId", UploadId}], Headers = [%%{"content-length", byte_size(PartData)}, - {"x-amz-copy-source", Source}], + {"x-amz-copy-source", Source} | + source_range(SrcRange)], erlcloud_s3:s3_request(Config, put, BucketName, Url, Subresources, [], {<<>>, []}, Headers). +source_range(undefined) -> []; +source_range({First, Last}) -> + [{"x-amz-copy-source-range", + lists:flatten(io_lib:format("bytes=~b-~b", [First, Last]))}]. + upload_and_assert_part(Bucket, Key, UploadId, PartNum, PartData, Config) -> {RespHeaders, _UploadRes} = erlcloud_s3_multipart:upload_part(Bucket, Key, UploadId, PartNum, PartData, Config), assert_part(Bucket, Key, UploadId, PartNum, Config, RespHeaders). diff --git a/riak_test/src/rtcs_object.erl b/riak_test/src/rtcs_object.erl new file mode 100644 index 000000000..f76bd3e48 --- /dev/null +++ b/riak_test/src/rtcs_object.erl @@ -0,0 +1,64 @@ +%% --------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% --------------------------------------------------------------------- + +-module(rtcs_object). + +-compile(export_all). +-include_lib("eunit/include/eunit.hrl"). + +upload(UserConfig, normal, B, K) -> + Content = crypto:rand_bytes(mb(4)), + erlcloud_s3:put_object(B, K, Content, UserConfig), + {B, K, Content}; +upload(UserConfig, multipart, B, K) -> + Content = rtcs_multipart:multipart_upload(B, K, [mb(10), 400], UserConfig), + {B, K, Content}. + +upload(UserConfig, normal_copy, B, DstK, SrcK) -> + ?assertEqual([{copy_source_version_id, "false"}, {version_id, "null"}], + erlcloud_s3:copy_object(B, DstK, B, SrcK, UserConfig)); +upload(UserConfig, multipart_copy, B, DstK, SrcK) -> + InitUploadRes = erlcloud_s3_multipart:initiate_upload(B, DstK, "text/plain", [], UserConfig), + UploadId = erlcloud_s3_multipart:upload_id(InitUploadRes), + + {RespHeaders1, _} = rtcs_multipart:upload_part_copy( + B, DstK, UploadId, 1, B, SrcK, {0, mb(5)-1}, UserConfig), + Etag1 = rtcs_multipart:assert_part(B, DstK, UploadId, 1, UserConfig, RespHeaders1), + {RespHeaders2, _} = rtcs_multipart:upload_part_copy( + B, DstK, UploadId, 2, B, SrcK, {mb(5), mb(10)+400-1}, UserConfig), + Etag2 = rtcs_multipart:assert_part(B, DstK, UploadId, 2, UserConfig, RespHeaders2), + + EtagList = [ {1, Etag1}, {2, Etag2} ], + ?assertEqual(ok, erlcloud_s3_multipart:complete_upload( + B, DstK, UploadId, EtagList, UserConfig)). + +mb(MegaBytes) -> + MegaBytes * 1024 * 1024. + +assert_whole_content(UserConfig, Bucket, Key, ExpectedContent) -> + Obj = erlcloud_s3:get_object(Bucket, Key, UserConfig), + assert_whole_content(ExpectedContent, Obj). + +assert_whole_content(ExpectedContent, ResultObj) -> + Content = proplists:get_value(content, ResultObj), + ContentLength = proplists:get_value(content_length, ResultObj), + ?assertEqual(byte_size(ExpectedContent), list_to_integer(ContentLength)), + ?assertEqual(byte_size(ExpectedContent), byte_size(Content)), + ?assertEqual(ExpectedContent, Content). diff --git a/src/riak_cs_block_server.erl b/src/riak_cs_block_server.erl index ea32d8bf6..6465bc57b 100644 --- a/src/riak_cs_block_server.erl +++ b/src/riak_cs_block_server.erl @@ -51,7 +51,8 @@ format_status/2]). -record(state, {riak_client :: riak_client(), - close_riak_connection=true :: boolean()}). + close_riak_connection=true :: boolean(), + bag_id :: bag_id()}). %%%=================================================================== %%% API @@ -146,7 +147,8 @@ init([Manifest, RcPid]) -> init(Manifest, RcPid, State) -> process_flag(trap_exit, true), ok = riak_cs_riak_client:set_manifest(RcPid, Manifest), - {ok, State#state{riak_client=RcPid}}. + BagId = riak_cs_mb_helper:bag_id_from_manifest(Manifest), + {ok, State#state{riak_client=RcPid, bag_id=BagId}}. %%-------------------------------------------------------------------- %% @private @@ -177,8 +179,8 @@ handle_call(stop, _From, State) -> %%-------------------------------------------------------------------- handle_cast({get_block, ReplyPid, Bucket, Key, ClusterID, UUID, BlockNumber}, - State=#state{riak_client=RcPid}) -> - get_block(ReplyPid, Bucket, Key, ClusterID, UUID, BlockNumber, RcPid), + State=#state{riak_client=RcPid, bag_id=BagId}) -> + get_block(ReplyPid, Bucket, Key, ClusterID, BagId, UUID, BlockNumber, RcPid), {noreply, State}; handle_cast({put_block, ReplyPid, Bucket, Key, UUID, BlockNumber, Value, BCSum}, State=#state{riak_client=RcPid}) -> @@ -219,18 +221,18 @@ handle_cast({delete_block, ReplyPid, Bucket, Key, UUID, BlockNumber}, State=#sta handle_cast(_Msg, State) -> {noreply, State}. -get_block(ReplyPid, Bucket, Key, ClusterID, UUID, BlockNumber, RcPid) -> +get_block(ReplyPid, Bucket, Key, ClusterId, BagId, UUID, BlockNumber, RcPid) -> %% don't use proxy get if it's a local get %% or proxy get is disabled ProxyActive = riak_cs_config:proxy_get_active(), - UseProxyGet = use_proxy_get(RcPid, ClusterID), + UseProxyGet = use_proxy_get(ClusterId, BagId), case riak_cs_utils:n_val_1_get_requests() of true -> - do_get_block(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, ProxyActive, UUID, + do_get_block(ReplyPid, Bucket, Key, ClusterId, UseProxyGet, ProxyActive, UUID, BlockNumber, RcPid); false -> - normal_nval_block_get(ReplyPid, Bucket, Key, ClusterID, + normal_nval_block_get(ReplyPid, Bucket, Key, ClusterId, UseProxyGet, UUID, BlockNumber, RcPid) end. @@ -551,10 +553,12 @@ n_val_one_options() -> r_one_options() -> [{r, 1}, {notfound_ok, false}, {basic_quorum, false}]. --spec use_proxy_get(riak_client(), term()) -> boolean(). -use_proxy_get(RcPid, ClusterID) -> - LocalClusterID = riak_cs_config:cluster_id(RcPid), - ClusterID /= undefined andalso LocalClusterID /= ClusterID. +-spec use_proxy_get(cluster_id(), bag_id()) -> boolean(). +use_proxy_get(undefined, _BagId) -> + false; +use_proxy_get(SourceClusterId, BagId) when is_binary(SourceClusterId) -> + LocalClusterID = riak_cs_mb_helper:cluster_id(BagId), + LocalClusterID =/= SourceClusterId. dt_entry(Func, Ints, Strings) -> riak_cs_dtrace:dtrace(?DT_BLOCK_OP, 1, Ints, ?MODULE, Func, Strings). diff --git a/src/riak_cs_config.erl b/src/riak_cs_config.erl index 7ca6e18a4..1c1c44857 100644 --- a/src/riak_cs_config.erl +++ b/src/riak_cs_config.erl @@ -90,7 +90,8 @@ fold_objects_timeout/0, %% for cs_bucket_fold get_index_range_gckeys_timeout/0, get_index_range_gckeys_call_timeout/0, - get_index_list_multipart_uploads_timeout/0 + get_index_list_multipart_uploads_timeout/0, + cluster_id_timeout/0 ]). %% OpenStack config @@ -247,45 +248,17 @@ use_t2b_compression() -> %% doc Return the current cluster ID. Used for repl %% After obtaining the clusterid the first time, %% store the value in app:set_env --spec cluster_id(riak_client()) -> binary(). -cluster_id(RcPid) -> +-spec cluster_id(fun()) -> binary(). +cluster_id(GetClusterIdFun) -> case application:get_env(riak_cs, cluster_id) of {ok, ClusterID} -> ClusterID; undefined -> - Timeout = case application:get_env(riak_cs, cluster_id_timeout) of - {ok, Value} -> - Value; - undefined -> - ?DEFAULT_CLUSTER_ID_TIMEOUT - end, - maybe_get_cluster_id(proxy_get_active(), RcPid, Timeout) + ClusterId = GetClusterIdFun(undefined), + application:set_env(riak_cs, cluster_id, ClusterId), + ClusterId end. -%% @doc If `proxy_get' is enabled then attempt to determine the cluster id --spec maybe_get_cluster_id(boolean(), riak_client(), integer()) -> undefined | binary(). -maybe_get_cluster_id(true, RcPid, Timeout) -> - try - %% TODO && FIXME!!: DO NOT support multibag YET!!! - {ok, MasterPbc} = riak_cs_riak_client:master_pbc(RcPid), - case riak_repl_pb_api:get_clusterid(MasterPbc, Timeout) of - {ok, ClusterID} -> - application:set_env(riak_cs, cluster_id, ClusterID), - ClusterID; - _ -> - _ = lager:debug("Unable to obtain cluster ID"), - undefined - end - catch _:_ -> - %% Disable `proxy_get' so we do not repeatedly have to - %% handle this same exception. This would happen if an OSS - %% install has `proxy_get' enabled. - application:set_env(riak_cs, proxy_get, disabled), - undefined - end; -maybe_get_cluster_id(false, _, _) -> - undefined. - %% @doc Return the configured md5 chunk size -spec md5_chunk_size() -> non_neg_integer(). md5_chunk_size() -> @@ -488,6 +461,7 @@ local_get_block_timeout() -> ?TIMEOUT_CONFIG_FUNC(get_index_range_gckeys_timeout). ?TIMEOUT_CONFIG_FUNC(get_index_range_gckeys_call_timeout). ?TIMEOUT_CONFIG_FUNC(get_index_list_multipart_uploads_timeout). +?TIMEOUT_CONFIG_FUNC(cluster_id_timeout). -undef(TIMEOUT_CONFIG_FUNC). diff --git a/src/riak_cs_lfs_utils.erl b/src/riak_cs_lfs_utils.erl index f08281435..722f850d6 100644 --- a/src/riak_cs_lfs_utils.erl +++ b/src/riak_cs_lfs_utils.erl @@ -37,9 +37,8 @@ initial_blocks/2, block_sequences_for_manifest/1, block_sequences_for_manifest/2, - new_manifest/9, - new_manifest/11, new_manifest/12, + set_bag_id/2, remove_write_block/2, remove_delete_block/2]). @@ -269,37 +268,6 @@ get_fsm_buffer_size_factor() -> end. %% @doc Initialize a new file manifest --spec new_manifest(binary(), - binary(), - binary(), - non_neg_integer(), - binary(), - term(), - term(), - pos_integer(), - acl() | no_acl_yet) -> lfs_manifest(). -new_manifest(Bucket, FileName, UUID, ContentLength, ContentType, ContentMd5, - MetaData, BlockSize, Acl) -> - new_manifest(Bucket, FileName, UUID, ContentLength, ContentType, ContentMd5, - MetaData, BlockSize, Acl, [], undefined). - --spec new_manifest(binary(), - binary(), - binary(), - non_neg_integer(), - binary(), - term(), - term(), - pos_integer(), - acl() | no_acl_yet, - proplists:proplist(), - cluster_id()) -> lfs_manifest(). -new_manifest(Bucket, FileName, UUID, ContentLength, ContentType, ContentMd5, - MetaData, BlockSize, Acl, Props, ClusterID) -> - BagId = riak_cs_mb_helper:choose_bag_id(block, {Bucket, FileName, UUID}), - new_manifest(Bucket, FileName, UUID, ContentLength, ContentType, ContentMd5, - MetaData, BlockSize, Acl, Props, ClusterID, BagId). - -spec new_manifest(binary(), binary(), binary(), @@ -311,7 +279,7 @@ new_manifest(Bucket, FileName, UUID, ContentLength, ContentType, ContentMd5, acl() | no_acl_yet, proplists:proplist(), cluster_id(), - BagId::binary()) -> lfs_manifest(). + bag_id()) -> lfs_manifest(). new_manifest(Bucket, FileName, UUID, ContentLength, ContentType, ContentMd5, MetaData, BlockSize, Acl, Props, ClusterID, BagId) -> Blocks = ordsets:from_list(initial_blocks(ContentLength, BlockSize)), @@ -327,6 +295,10 @@ new_manifest(Bucket, FileName, UUID, ContentLength, ContentType, ContentMd5, acl=Acl, props=Props, cluster_id=ClusterID}, + set_bag_id(BagId, Manifest). + +-spec set_bag_id(bag_id(), lfs_manifest()) -> lfs_manifest(). +set_bag_id(BagId, Manifest) -> riak_cs_mb_helper:set_bag_id_to_manifest(BagId, Manifest). %% @doc Remove a chunk from the diff --git a/src/riak_cs_manifest_utils.erl b/src/riak_cs_manifest_utils.erl index 4ff07b27f..35acd1a13 100644 --- a/src/riak_cs_manifest_utils.erl +++ b/src/riak_cs_manifest_utils.erl @@ -467,6 +467,9 @@ new_mani_helper() -> undefined, %% md5 orddict:new(), 10, + undefined, + [], + undefined, undefined). manifest_test_() -> diff --git a/src/riak_cs_mb_helper.erl b/src/riak_cs_mb_helper.erl index 796f7f58f..9d01ad895 100644 --- a/src/riak_cs_mb_helper.erl +++ b/src/riak_cs_mb_helper.erl @@ -23,6 +23,8 @@ -module(riak_cs_mb_helper). -export([process_specs/0, bags/0, + cluster_id/1, + get_cluster_id/1, choose_bag_id/2, set_bag_id_to_manifest/2, bag_id_from_manifest/1]). @@ -64,3 +66,26 @@ bag_id_from_manifest(?MANIFEST{props = Props}) -> {block_bag, BagId} -> BagId end end. + +-spec cluster_id(bag_id()) -> cluster_id(). +cluster_id(BagId) -> + case riak_cs_config:proxy_get_active() of + false -> + undefined; + true -> + Fun = fun get_cluster_id/1, + ?MB_ENABLED(riak_cs_config:cluster_id(Fun), + riak_cs_multibag:cluster_id(Fun, BagId)) + end. + +-spec get_cluster_id(bag_id())-> cluster_id(). +get_cluster_id(BagId) -> + PbcPool = riak_cs_riak_client:pbc_pool_name(BagId), + {ok, Pbc} = riak_cs_utils:riak_connection(PbcPool), + try + ClusterId = riak_cs_pbc:get_cluster_id(Pbc), + lager:info("Cluster ID for bag ~p : ~p", [BagId, ClusterId]), + ClusterId + after + riak_cs_utils:close_riak_connection(PbcPool, Pbc) + end. diff --git a/src/riak_cs_mp_utils.erl b/src/riak_cs_mp_utils.erl index 65b7af075..d0b0b61a9 100644 --- a/src/riak_cs_mp_utils.erl +++ b/src/riak_cs_mp_utils.erl @@ -52,7 +52,6 @@ make_content_types_accepted/3, make_special_error/1, make_special_error/4, - new_manifest/5, upload_part/6, upload_part/7, upload_part_1blob/2, upload_part_finished/7, upload_part_finished/8, @@ -242,7 +241,6 @@ list_parts(Bucket, Key, UploadId, Caller, Opts, RcPidUnW) -> new_manifest(Bucket, Key, ContentType, {_, _, _} = Owner, Opts) -> UUID = druuid:v4(), %% TODO: add object metadata here, e.g. content-disposition et al. - %% TODO: add cluster_id ... which means calling new_manifest/11 not /9. MetaData = case proplists:get_value(meta_data, Opts) of undefined -> []; AsIsHdrs -> AsIsHdrs @@ -257,7 +255,11 @@ new_manifest(Bucket, Key, ContentType, {_, _, _} = Owner, Opts) -> MetaData, riak_cs_lfs_utils:block_size(), %% ACL: needs Riak client pid, so we wait - no_acl_yet), + no_acl_yet, + [], + %% Cluster ID and Bag ID are added later + undefined, + undefined), MpM = ?MULTIPART_MANIFEST{upload_id = UUID, owner = Owner}, M?MANIFEST{props = replace_mp_manifest(MpM, M?MANIFEST.props)}. @@ -294,7 +296,7 @@ upload_part_finished(Bucket, Key, UploadId, _PartNumber, PartUUID, MD5, do_part_common(upload_part_finished, Bucket, Key, UploadId, Caller, [{upload_part_finished, Extra}], RcPidUnW). -write_new_manifest(M, Opts, RcPidUnW) -> +write_new_manifest(?MANIFEST{bkey={Bucket, Key}, uuid=UUID}=M, Opts, RcPidUnW) -> MpM = get_mp_manifest(M), Owner = MpM?MULTIPART_MANIFEST.owner, case wrap_riak_client(RcPidUnW) of @@ -302,21 +304,21 @@ write_new_manifest(M, Opts, RcPidUnW) -> try Acl = case proplists:get_value(acl, Opts) of undefined -> - % 4th arg, pid(), unused but honor the contract riak_cs_acl_utils:canned_acl("private", Owner, undefined); AnAcl -> AnAcl end, - ClusterId = riak_cs_config:cluster_id(?PID(RcPid)), - M2 = M?MANIFEST{acl = Acl, - cluster_id = ClusterId, - write_start_time=os:timestamp()}, - {Bucket, Key} = M?MANIFEST.bkey, + BagId = riak_cs_mb_helper:choose_bag_id(block, {Bucket, Key, UUID}), + M2 = riak_cs_lfs_utils:set_bag_id(BagId, M), + ClusterId = riak_cs_mb_helper:cluster_id(BagId), + M3 = M2?MANIFEST{acl = Acl, + cluster_id=ClusterId, + write_start_time=os:timestamp()}, {ok, ManiPid} = riak_cs_manifest_fsm:start_link(Bucket, Key, ?PID(RcPid)), try - ok = riak_cs_manifest_fsm:add_new_manifest(ManiPid, M2), - {ok, M2?MANIFEST.uuid} + ok = riak_cs_manifest_fsm:add_new_manifest(ManiPid, M3), + {ok, M3?MANIFEST.uuid} after ok = riak_cs_manifest_fsm:stop(ManiPid) end @@ -472,7 +474,7 @@ do_part_common2(upload_part, RcPid, M, _Obj, MpM, Props) -> {Bucket, Key, Size, <<"x-riak/multipart-part">>, orddict:new(), BlockSize, M?MANIFEST.acl, infinity, self(), RcPid}, - {false, BagId}), + false, BagId), try ?MANIFEST{content_length = ContentLength, props = MProps} = M, diff --git a/src/riak_cs_pbc.erl b/src/riak_cs_pbc.erl index b2d819525..a3103e804 100644 --- a/src/riak_cs_pbc.erl +++ b/src/riak_cs_pbc.erl @@ -29,6 +29,7 @@ put_with_no_meta/2, put_with_no_meta/3, list_keys/3, + get_cluster_id/1, check_connection_status/2]). %% @doc Get an object from Riak @@ -87,6 +88,26 @@ list_keys(PbcPid, BucketName, Timeout) -> Error end. +%% @doc Attempt to determine the cluster id +-spec get_cluster_id(pid()) -> undefined | binary(). +get_cluster_id(Pbc) -> + Timeout = riak_cs_config:cluster_id_timeout(), + try + case riak_repl_pb_api:get_clusterid(Pbc, Timeout) of + {ok, ClusterID} -> + ClusterID; + _ -> + _ = lager:debug("Unable to obtain cluster ID"), + undefined + end + catch _:_ -> + %% Disable `proxy_get' so we do not repeatedly have to + %% handle this same exception. This would happen if an OSS + %% install has `proxy_get' enabled. + application:set_env(riak_cs, proxy_get, disabled), + undefined + end. + %% @doc don't reuse return value -spec check_connection_status(pid(), term()) -> any(). check_connection_status(Pbc, Where) -> diff --git a/src/riak_cs_put_fsm.erl b/src/riak_cs_put_fsm.erl index 7e8b308cf..3a99fc35b 100644 --- a/src/riak_cs_put_fsm.erl +++ b/src/riak_cs_put_fsm.erl @@ -27,7 +27,7 @@ -include("riak_cs.hrl"). %% API --export([start_link/1, start_link/2, +-export([start_link/1, start_link/3, get_uuid/1, augment_data/2, block_written/2, @@ -63,7 +63,8 @@ reply_pid :: {pid(), reference()}, riak_client :: riak_client(), mani_pid :: undefined | pid(), - make_new_manifest_p :: true | {false, bag_id()}, + make_new_manifest_p :: boolean(), + bag_id :: bag_id(), timer_ref :: reference(), bucket :: binary(), key :: binary(), @@ -90,11 +91,11 @@ term(), pos_integer(), acl(), timeout(), pid(), riak_client()}) -> {ok, pid()} | {error, term()}. start_link(Tuple) when is_tuple(Tuple) -> - start_link(Tuple, true). + start_link(Tuple, true, undefined). -spec start_link({binary(), binary(), non_neg_integer(), binary(), term(), pos_integer(), acl(), timeout(), pid(), riak_client()}, - true | {false, bag_id()}) -> + boolean(), bag_id()) -> {ok, pid()} | {error, term()}. start_link({_Bucket, _Key, @@ -106,8 +107,9 @@ start_link({_Bucket, _Timeout, _Caller, _RcPid}=Arg1, - MakeNewManifestP) -> - gen_fsm:start_link(?MODULE, {Arg1, MakeNewManifestP}, []). + MakeNewManifestP, + BagId) -> + gen_fsm:start_link(?MODULE, {Arg1, MakeNewManifestP, BagId}, []). %% -spec get_uuid(pid()) -> binary(). get_uuid(Pid) -> @@ -141,11 +143,11 @@ block_written(Pid, BlockID) -> %% make things more confusing? -spec init({{binary(), binary(), non_neg_integer(), binary(), term(), pos_integer(), acl(), timeout(), pid(), riak_client()}, - true | {false, bag_id()}}) -> + boolean(), bag_id()}) -> {ok, prepare, #state{}, timeout()}. init({{Bucket, Key, ContentLength, ContentType, Metadata, BlockSize, Acl, Timeout, Caller, RcPid}, - MakeNewManifestP}) -> + MakeNewManifestP, BagId0}) -> %% We need to do this (the monitor) for two reasons %% 1. We're started through a supervisor, so the %% proc that actually intends to start us isn't @@ -157,6 +159,12 @@ init({{Bucket, Key, ContentLength, ContentType, CallerRef = erlang:monitor(process, Caller), UUID = druuid:v4(), + BagId = case BagId0 of + undefined -> + riak_cs_mb_helper:choose_bag_id(block, {Bucket, Key, UUID}); + _ -> + BagId0 + end, {ok, prepare, #state{bucket=Bucket, key=Key, block_size=BlockSize, @@ -168,6 +176,7 @@ init({{Bucket, Key, ContentLength, ContentType, content_type=ContentType, riak_client=RcPid, make_new_manifest_p=MakeNewManifestP, + bag_id=BagId, timeout=Timeout}, 0}. @@ -390,44 +399,26 @@ prepare(State=#state{bucket=Bucket, metadata=Metadata, acl=Acl, riak_client=RcPid, - make_new_manifest_p=MakeNewManifestP}) + make_new_manifest_p=MakeNewManifestP, + bag_id=BagId}) when is_integer(ContentLength), ContentLength >= 0 -> %% 1. start the manifest_fsm proc {ok, ManiPid} = maybe_riak_cs_manifest_fsm_start_link( MakeNewManifestP, Bucket, Key, RcPid), - %% TODO: - %% this shouldn't be hardcoded. - %% for now, always populate cluster_id - ClusterID = riak_cs_config:cluster_id(RcPid), - Manifest = case MakeNewManifestP of - true -> - riak_cs_lfs_utils:new_manifest(Bucket, - Key, - UUID, - ContentLength, - ContentType, - %% we don't know the md5 yet - undefined, - Metadata, - BlockSize, - Acl, - [], - ClusterID); - {false, BagId} -> - riak_cs_lfs_utils:new_manifest(Bucket, - Key, - UUID, - ContentLength, - ContentType, - %% we don't know the md5 yet - undefined, - Metadata, - BlockSize, - Acl, - [], - ClusterID, - BagId) - end, + ClusterID = riak_cs_mb_helper:cluster_id(BagId), + Manifest = riak_cs_lfs_utils:new_manifest(Bucket, + Key, + UUID, + ContentLength, + ContentType, + %% we don't know the md5 yet + undefined, + Metadata, + BlockSize, + Acl, + [], + ClusterID, + BagId), NewManifest = Manifest?MANIFEST{write_start_time=os:timestamp()}, Md5 = riak_cs_utils:md5_init(), @@ -649,7 +640,7 @@ handle_receiving_last_chunk(NewData, State=#state{buffer_queue=BufferQueue, Reply = ok, {reply, Reply, all_received, NewStateData}. -maybe_riak_cs_manifest_fsm_start_link({false, _BagId}, _Bucket, _Key, _RcPid) -> +maybe_riak_cs_manifest_fsm_start_link(false, _Bucket, _Key, _RcPid) -> {ok, undefined}; maybe_riak_cs_manifest_fsm_start_link(true, Bucket, Key, RcPid) -> riak_cs_manifest_fsm:start_link(Bucket, Key, RcPid). diff --git a/src/riak_cs_riak_client.erl b/src/riak_cs_riak_client.erl index 05d72a641..864416bf3 100644 --- a/src/riak_cs_riak_client.erl +++ b/src/riak_cs_riak_client.erl @@ -107,6 +107,8 @@ checkin(Pool, RcPid) -> -spec pbc_pool_name(master | bag_id()) -> atom(). pbc_pool_name(master) -> pbc_pool_master; +pbc_pool_name(undefined) -> + pbc_pool_master; pbc_pool_name(BagId) when is_binary(BagId) -> list_to_atom(lists:flatten(io_lib:format("pbc_pool_~s", [BagId]))). diff --git a/test/riak_cs_config_test.erl b/test/riak_cs_config_test.erl index acb5b2471..89ec5e3eb 100644 --- a/test/riak_cs_config_test.erl +++ b/test/riak_cs_config_test.erl @@ -17,6 +17,7 @@ default_config_test() -> cuttlefish_unit:assert_not_configured(Config, "riak_cs.admin_port"), cuttlefish_unit:assert_config(Config, "riak_cs.cs_root_host", "s3.amazonaws.com"), cuttlefish_unit:assert_config(Config, "riak_cs.cs_version", 10300), + cuttlefish_unit:assert_config(Config, "riak_cs.proxy_get", false), cuttlefish_unit:assert_not_configured(Config, "riak_cs.rewrite_module"), cuttlefish_unit:assert_not_configured(Config, "riak_cs.auth_module"), cuttlefish_unit:assert_config(Config, "riak_cs.fold_objects_for_list_keys", true), @@ -157,6 +158,12 @@ max_buckets_per_user_test() -> cuttlefish_unit:assert_config(NoConfig, "riak_cs.max_buckets_per_user", 100), ok. +proxy_get_test() -> + DefConf = [{["proxy_get"], "on"}], + DefConfig = cuttlefish_unit:generate_templated_config(schema_files(), DefConf, context()), + cuttlefish_unit:assert_config(DefConfig, "riak_cs.proxy_get", true), + ok. + wm_log_config_test_() -> {setup, fun() -> diff --git a/test/riak_cs_delete_deadlock.erl b/test/riak_cs_delete_deadlock.erl index 5bb0d9cb2..7bb0afa15 100644 --- a/test/riak_cs_delete_deadlock.erl +++ b/test/riak_cs_delete_deadlock.erl @@ -72,17 +72,21 @@ prop_delete_deadlock() -> part_manifests()}, begin BlockSize = riak_cs_lfs_utils:block_size(), - Manifest = riak_cs_lfs_utils:new_manifest(<<"bucket">>, - "test_file", - UUID, - ?CONTENT_LENGTH, - <<"ctype">>, - "md5", - dict:new(), - BlockSize, - riak_cs_acl_utils:default_acl("tester", - "tester_id", - "tester_key_id")), + Manifest = riak_cs_lfs_utils:new_manifest( + <<"bucket">>, + "test_file", + UUID, + ?CONTENT_LENGTH, + <<"ctype">>, + "md5", + dict:new(), + BlockSize, + riak_cs_acl_utils:default_acl("tester", + "tester_id", + "tester_key_id"), + [], + undefined, + undefined), MpM = ?MULTIPART_MANIFEST{parts = Parts}, NewManifest = Manifest?MANIFEST{props = riak_cs_mp_utils:replace_mp_manifest(MpM, Manifest?MANIFEST.props)}, diff --git a/test/riak_cs_dummy_reader.erl b/test/riak_cs_dummy_reader.erl index 52bce99aa..c35668de7 100644 --- a/test/riak_cs_dummy_reader.erl +++ b/test/riak_cs_dummy_reader.erl @@ -97,15 +97,19 @@ init([CallerPid, Bucket, Key, ContentLength, BlockSize]) -> handle_call(get_manifest, _From, #state{bucket=Bucket, key=Key, content_length=ContentLength}=State) -> - Manifest = riak_cs_lfs_utils:new_manifest(Bucket, - Key, - druuid:v4(), - ContentLength, - "application/test", - <<"md5">>, - orddict:new(), - riak_cs_lfs_utils:block_size(), - riak_cs_acl_utils:default_acl("tester", "id123", "keyid123")), + Manifest = riak_cs_lfs_utils:new_manifest( + Bucket, + Key, + druuid:v4(), + ContentLength, + "application/test", + <<"md5">>, + orddict:new(), + riak_cs_lfs_utils:block_size(), + riak_cs_acl_utils:default_acl("tester", "id123", "keyid123"), + [], + undefined, + undefined), {reply, {ok, Manifest}, State}; handle_call(_Msg, _From, State) -> {reply, ok, State}. diff --git a/test/riak_cs_lfs_utils_eqc.erl b/test/riak_cs_lfs_utils_eqc.erl index 92d9c0dd6..23d248b4f 100644 --- a/test/riak_cs_lfs_utils_eqc.erl +++ b/test/riak_cs_lfs_utils_eqc.erl @@ -82,18 +82,21 @@ prop_manifest_manipulation() -> begin application:set_env(riak_cs, lfs_block_size, 1048576), - Manifest = riak_cs_lfs_utils:new_manifest(Bucket, - FileName, - UUID, - CLength, - <<"ctype">>, - Md5, - MD, - riak_cs_lfs_utils:block_size(), - riak_cs_acl_utils:default_acl("tester", - "tester_id", - "tester_key_id")), - + Manifest = riak_cs_lfs_utils:new_manifest( + Bucket, + FileName, + UUID, + CLength, + <<"ctype">>, + Md5, + MD, + riak_cs_lfs_utils:block_size(), + riak_cs_acl_utils:default_acl("tester", + "tester_id", + "tester_key_id"), + [], + undefined, + undefined), Blocks = riak_cs_lfs_utils:initial_blocks(CLength, riak_cs_lfs_utils:block_size()), %% TODO: maybe we should shuffle blocks? FoldFun = fun (Chunk, Mani) -> riak_cs_lfs_utils:remove_write_block(Mani, Chunk) end, diff --git a/tools.mk b/tools.mk index 445dd0cbc..c3d61a822 100644 --- a/tools.mk +++ b/tools.mk @@ -122,7 +122,7 @@ dialyzer-run: | grep -F -f dialyzer.ignore-warnings.tmp -v \ | sed -E 's/^[[:space:]]*[0-9]+[[:space:]]*//' \ | sed -E 's/([]\^:+?|()*.$${}\[])/\\\1/g' \ - | sed -E 's/(\\\.erl\\\:)/\1\\d+:/g' \ + | sed -E 's/(\\\.erl\\\:)/\1[[:digit:]]+:/g' \ | sed -E 's/^(.*)$$/^[[:space:]]*\1$$/g' \ > dialyzer_unhandled_warnings ; \ rm dialyzer.ignore-warnings.tmp; \