Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable proxy get in multibag environment [rebase1] #1171

Merged
merged 16 commits into from
Jun 25, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dialyzer.ignore-warnings.ee
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Errors
riak_cs_block_server.erl:312: The pattern Success = {'ok', _} can never match the type {'error',_}
riak_cs_block_server.erl:347: The pattern {'ok', RiakObject} can never match the type {'error',_}
riak_cs_config.erl:248: The pattern {'ok', ClusterID} can never match the type {'error',_}
riak_cs_pbc.erl:100: The pattern {'ok', ClusterID} can never match the type {'error',_}
# Warnings
Unknown functions:
app_helper:get_prop_or_env/3
Expand Down
Binary file modified rebar
Binary file not shown.
4 changes: 2 additions & 2 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{require_otp_vsn, "R16"}.

{cover_enabled, true}.
{cover_enabled, false}.

%% EDoc options
{edoc_opts, [preprocess]}.
Expand Down Expand Up @@ -54,5 +54,5 @@

{deps_ee, [
{riak_repl_pb_api,".*",{git,"git@github.com:basho/riak_repl_pb_api.git", {tag, "2.1.0"}}},
{riak_cs_multibag,".*",{git,"git@github.com:basho/riak_cs_multibag.git", {tag, "2.0.0"}}}
{riak_cs_multibag,".*",{git,"git@github.com:basho/riak_cs_multibag.git", {tag, "2.1.0-pre1"}}}
]}.
9 changes: 9 additions & 0 deletions rel/files/riak_cs.schema
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,15 @@
{datatype, integer}
]}.

%% == Multi-Datacenter Replication ==
%% @doc Switch to use proxy_get feature of
%% Multi-Datacenter Replication. Make sure
%% to set proxy_get on also in riak side.
{mapping, "proxy_get", "riak_cs.proxy_get", [
{default, off},
{datatype, flag}
]}.

%% == DTrace ==

%% @doc If your Erlang virtual machine supports DTrace (or
Expand Down
89 changes: 54 additions & 35 deletions riak_test/src/rtcs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ flavored_setup(NumNodes, {multibag, _} = Flavor, Configs, Vsn)
rtcs_bag:flavored_setup(NumNodes, Flavor, Configs, Vsn).

setup_clusters(Configs, JoinFun, NumNodes, Vsn) ->
ConfigFun = fun(_Type, Config, _Node) -> Config end,
setup_clusters(Configs, ConfigFun, JoinFun, NumNodes, Vsn).

setup_clusters(Configs, ConfigFun, JoinFun, NumNodes, Vsn) ->
%% Start the erlcloud app
erlcloud:start(),

Expand All @@ -113,14 +117,15 @@ setup_clusters(Configs, JoinFun, NumNodes, Vsn) ->

Cfgs = configs(Configs),
lager:info("Configs = ~p", [ Cfgs]),
{RiakNodes, _CSNodes, _Stanchion} = Nodes = deploy_nodes(NumNodes, Cfgs, Vsn),
{RiakNodes, _CSNodes, _Stanchion} = Nodes =
deploy_nodes(NumNodes, Cfgs, ConfigFun, Vsn),
rt:wait_until_nodes_ready(RiakNodes),
lager:info("Make cluster"),
JoinFun(RiakNodes),
?assertEqual(ok, wait_until_nodes_ready(RiakNodes)),
?assertEqual(ok, wait_until_no_pending_changes(RiakNodes)),
rt:wait_until_ring_converged(RiakNodes),
{AdminKeyId, AdminSecretKey} = setup_admin_user(NumNodes, Cfgs, Vsn),
{AdminKeyId, AdminSecretKey} = setup_admin_user(NumNodes, Cfgs, ConfigFun, Vsn),
AdminConfig = rtcs:config(AdminKeyId,
AdminSecretKey,
rtcs:cs_port(hd(RiakNodes))),
Expand Down Expand Up @@ -178,7 +183,8 @@ riak_id_per_cluster(NumNodes) ->

deploy_stanchion(Config) ->
%% Set initial config
update_stanchion_config(rt_config:get(?STANCHION_CURRENT), Config),
ConfigFun = fun(_, Config0, _) -> Config0 end,
update_stanchion_config(rt_config:get(?STANCHION_CURRENT), Config, ConfigFun),

start_stanchion(),
lager:info("Stanchion started").
Expand Down Expand Up @@ -461,11 +467,9 @@ riak_root_and_vsn(previous, ee) -> {?EE_ROOT, ee_previous}.
cs_current() ->
?CS_CURRENT.

deploy_nodes(NumNodes, InitialConfig) ->
deploy_nodes(NumNodes, InitialConfig, current).

-spec deploy_nodes(list(), list(), current|previous) -> any().
deploy_nodes(NumNodes, InitialConfig, Vsn) when Vsn =:= current orelse Vsn =:= previous ->
-spec deploy_nodes(list(), list(), fun(), current|previous) -> any().
deploy_nodes(NumNodes, InitialConfig, ConfigFun, Vsn)
when Vsn =:= current orelse Vsn =:= previous ->
lager:info("Initial Config: ~p", [InitialConfig]),
NodeConfig = [{current, InitialConfig} || _ <- lists:seq(1,NumNodes)],
RiakNodes = [?DEV(N) || N <- lists:seq(1, NumNodes)],
Expand Down Expand Up @@ -504,7 +508,7 @@ deploy_nodes(NumNodes, InitialConfig, Vsn) when Vsn =:= current orelse Vsn =:= p
{_Versions, Configs} = lists:unzip(NodeConfig),

%% Set initial config
set_configs(NodeList, Configs, Vsn),
set_configs(NodeList, Configs, ConfigFun, Vsn),
start_all_nodes(NodeList, Vsn),

Nodes = {RiakNodes, CSNodes, StanchionNode},
Expand All @@ -519,10 +523,8 @@ node_id(Node) ->
NodeMap = rt_config:get(rt_cs_nodes),
orddict:fetch(Node, NodeMap).

setup_admin_user(NumNodes, InitialConfig) ->
setup_admin_user(NumNodes, InitialConfig, current).

setup_admin_user(NumNodes, InitialConfig, Vsn) when Vsn =:= current orelse Vsn =:= previous ->
setup_admin_user(NumNodes, InitialConfig, ConfigFun, Vsn)
when Vsn =:= current orelse Vsn =:= previous ->
lager:info("Initial Config: ~p", [InitialConfig]),
NodeConfig = [{current, InitialConfig} || _ <- lists:seq(1,NumNodes)],
RiakNodes = [?DEV(N) || N <- lists:seq(1, NumNodes)],
Expand All @@ -549,10 +551,7 @@ setup_admin_user(NumNodes, InitialConfig, Vsn) when Vsn =:= current orelse Vsn =
%% Create admin user and set in cs and stanchion configs
{KeyID, KeySecret} = AdminCreds = create_admin_user(hd(RiakNodes)),

%% Restart cs and stanchion nodes so admin user takes effect
%% stop_cs_and_stanchion_nodes(NodeList),

set_admin_creds_in_configs(NodeList, Configs, AdminCreds, Vsn),
set_admin_creds_in_configs(NodeList, Configs, ConfigFun, AdminCreds, Vsn),

UpdateFun = fun({Node, App}) ->
ok = rpc:call(Node, application, set_env,
Expand All @@ -564,9 +563,6 @@ setup_admin_user(NumNodes, InitialConfig, Vsn) when Vsn =:= current orelse Vsn =
[ {CSNode, riak_cs} || CSNode <- CSNodes ]],
lists:foreach(UpdateFun, ZippedNodes),

%% start_cs_and_stanchion_nodes(NodeList),
%% [ok = rt:wait_until_pingable(N) || N <- CSNodes ++ [StanchionNode]],

lager:info("NodeConfig: ~p", [ NodeConfig ]),
lager:info("RiakNodes: ~p", [RiakNodes]),
lager:info("CSNodes: ~p", [CSNodes]),
Expand Down Expand Up @@ -653,44 +649,47 @@ get_rt_config(cs, previous) -> rt_config:get(?CS_PREVIOUS);
get_rt_config(stanchion, current) -> rt_config:get(?STANCHION_CURRENT);
get_rt_config(stanchion, previous) -> rt_config:get(?STANCHION_PREVIOUS).

set_configs(NodeList, Configs, Vsn) ->
set_configs(NodeList, Configs, ConfigFun, Vsn) ->
rt:pmap(fun({_, default}) ->
ok;
({{_CSNode, RiakNode, _Stanchion}, Config}) ->
N = rt_cs_dev:node_id(RiakNode),
rt_cs_dev:update_app_config(RiakNode, proplists:get_value(riak,
Config)),
update_cs_config(get_rt_config(cs, Vsn), N,
proplists:get_value(cs, Config)),
proplists:get_value(cs, Config), ConfigFun),
update_stanchion_config(get_rt_config(stanchion, Vsn),
proplists:get_value(stanchion, Config));
proplists:get_value(stanchion, Config),
ConfigFun);
({{_CSNode, RiakNode}, Config}) ->
N = rt_cs_dev:node_id(RiakNode),
rt_cs_dev:update_app_config(RiakNode,
proplists:get_value(riak, Config)),
update_cs_config(get_rt_config(cs, Vsn), N,
proplists:get_value(cs, Config))
proplists:get_value(cs, Config), ConfigFun)
end,
lists:zip(NodeList, Configs)),
enable_zdbbl(Vsn).

set_admin_creds_in_configs(NodeList, Configs, AdminCreds, Vsn) ->
set_admin_creds_in_configs(NodeList, Configs, ConfigFun, AdminCreds, Vsn) ->
rt:pmap(fun({_, default}) ->
ok;
({{_CSNode, RiakNode, _Stanchion}, Config}) ->
N = rt_cs_dev:node_id(RiakNode),
update_cs_config(get_rt_config(cs, Vsn),
N,
proplists:get_value(cs, Config),
ConfigFun,
AdminCreds),
update_stanchion_config(get_rt_config(stanchion, Vsn),
proplists:get_value(stanchion, Config),
AdminCreds);
ConfigFun, AdminCreds);
({{_CSNode, RiakNode}, Config}) ->
N = rt_cs_dev:node_id(RiakNode),
update_cs_config(get_rt_config(cs, Vsn),
N,
proplists:get_value(cs, Config),
ConfigFun,
AdminCreds)
end,
lists:zip(NodeList, Configs)).
Expand Down Expand Up @@ -801,6 +800,14 @@ calculate_storage(N, Vsn) ->
lager:info("Running ~p", [Cmd]),
os:cmd(Cmd).

enable_proxy_get(SrcN, Vsn, SinkCluster) ->
rtdev:run_riak_repl(SrcN, get_rt_config(riak, Vsn),
"proxy_get enable " ++ SinkCluster).

disable_proxy_get(SrcN, Vsn, SinkCluster) ->
rtdev:run_riak_repl(SrcN, get_rt_config(riak, Vsn),
"proxy_get disable " ++ SinkCluster).

read_config(Vsn, N, Who) ->
Prefix = get_rt_config(Who, Vsn),
EtcPath = case Who of
Expand All @@ -815,16 +822,22 @@ read_config(Vsn, N, Who) ->
Config
end.

update_cs_config(Prefix, N, Config, {AdminKey, AdminSecret}) ->
update_cs_config(Prefix, N, Config, {_,_} = AdminCred) ->
update_cs_config(Prefix, N, Config, fun(_,Config0,_) -> Config0 end, AdminCred);
update_cs_config(Prefix, N, Config, ConfigUpdateFun) when is_function(ConfigUpdateFun) ->
update_cs_config1(Prefix, N, Config, ConfigUpdateFun).

update_cs_config(Prefix, N, Config, ConfigUpdateFun, {AdminKey, AdminSecret}) ->
CSSection = proplists:get_value(riak_cs, Config),
UpdConfig = [{riak_cs, update_admin_creds(CSSection, AdminKey, AdminSecret)} |
proplists:delete(riak_cs, Config)],
update_cs_config(Prefix, N, UpdConfig).
update_cs_config1(Prefix, N, UpdConfig, ConfigUpdateFun).

update_cs_config(Prefix, N, Config) ->
update_cs_config1(Prefix, N, Config, ConfigUpdateFun) ->
CSSection = proplists:get_value(riak_cs, Config),
UpdConfig = [{riak_cs, update_cs_port(CSSection, N)} |
proplists:delete(riak_cs, Config)],
UpdConfig0 = [{riak_cs, update_cs_port(CSSection, N)} |
proplists:delete(riak_cs, Config)],
UpdConfig = ConfigUpdateFun(cs, UpdConfig0, N),
update_app_config(riakcs_etcpath(Prefix, N), UpdConfig).

update_admin_creds(Config, AdminKey, AdminSecret) ->
Expand All @@ -836,16 +849,22 @@ update_cs_port(Config, N) ->
Config2 = [{riak_host, {"127.0.0.1", pb_port(N)}} | proplists:delete(riak_host, Config)],
[{listener, {"127.0.0.1", cs_port(N)}} | proplists:delete(listener, Config2)].

update_stanchion_config(Prefix, Config, {AdminKey, AdminSecret}) ->
update_stanchion_config(Prefix, Config, {_,_} = AdminCreds) ->
update_stanchion_config(Prefix, Config, fun(_,Config0,_) -> Config0 end, AdminCreds);
update_stanchion_config(Prefix, Config, ConfigUpdateFun) when is_function(ConfigUpdateFun) ->
update_stanchion_config1(Prefix, Config, ConfigUpdateFun).

update_stanchion_config(Prefix, Config, ConfigUpdateFun, {AdminKey, AdminSecret}) ->
StanchionSection = proplists:get_value(stanchion, Config),
UpdConfig = [{stanchion, update_admin_creds(StanchionSection, AdminKey, AdminSecret)} |
proplists:delete(stanchion, Config)],
update_stanchion_config(Prefix, UpdConfig).
update_stanchion_config1(Prefix, UpdConfig, ConfigUpdateFun).

update_stanchion_config(Prefix, Config) ->
update_stanchion_config1(Prefix, Config0, ConfigUpdateFun) when is_function(ConfigUpdateFun) ->
Config = ConfigUpdateFun(stanchion, Config0, undefined),
update_app_config(stanchion_etcpath(Prefix), Config).

update_app_config(Path, Config) ->
update_app_config(Path, Config) ->
lager:debug("rtcs:update_app_config(~s,~p)", [Path, Config]),
FileFormatString = "~s/~s.config",
AppConfigFile = io_lib:format(FileFormatString, [Path, "app"]),
Expand Down
12 changes: 10 additions & 2 deletions riak_test/src/rtcs_multipart.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,24 @@ upload_parts(Bucket, Key, UploadId, Config, PartCount, [Size | Sizes], Contents,
Sizes, [Content | Contents], [{PartCount, PartEtag} | Parts]).

upload_part_copy(BucketName, Key, UploadId, PartNum, SrcBucket, SrcKey, Config) ->

upload_part_copy(BucketName, Key, UploadId, PartNum, SrcBucket, SrcKey, undefined, Config).

upload_part_copy(BucketName, Key, UploadId, PartNum, SrcBucket, SrcKey, SrcRange, Config) ->
Url = "/" ++ Key,
Source = filename:join([SrcBucket, SrcKey]),
Subresources = [{"partNumber", integer_to_list(PartNum)},
{"uploadId", UploadId}],
Headers = [%%{"content-length", byte_size(PartData)},
{"x-amz-copy-source", Source}],
{"x-amz-copy-source", Source} |
source_range(SrcRange)],
erlcloud_s3:s3_request(Config, put, BucketName, Url,
Subresources, [], {<<>>, []}, Headers).

source_range(undefined) -> [];
source_range({First, Last}) ->
[{"x-amz-copy-source-range",
lists:flatten(io_lib:format("bytes=~b-~b", [First, Last]))}].

upload_and_assert_part(Bucket, Key, UploadId, PartNum, PartData, Config) ->
{RespHeaders, _UploadRes} = erlcloud_s3_multipart:upload_part(Bucket, Key, UploadId, PartNum, PartData, Config),
assert_part(Bucket, Key, UploadId, PartNum, Config, RespHeaders).
Expand Down
64 changes: 64 additions & 0 deletions riak_test/src/rtcs_object.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
%% ---------------------------------------------------------------------
%%
%% Copyright (c) 2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% ---------------------------------------------------------------------

-module(rtcs_object).

-compile(export_all).
-include_lib("eunit/include/eunit.hrl").

upload(UserConfig, normal, B, K) ->
Content = crypto:rand_bytes(mb(4)),
erlcloud_s3:put_object(B, K, Content, UserConfig),
{B, K, Content};
upload(UserConfig, multipart, B, K) ->
Content = rtcs_multipart:multipart_upload(B, K, [mb(10), 400], UserConfig),
{B, K, Content}.

upload(UserConfig, normal_copy, B, DstK, SrcK) ->
?assertEqual([{copy_source_version_id, "false"}, {version_id, "null"}],
erlcloud_s3:copy_object(B, DstK, B, SrcK, UserConfig));
upload(UserConfig, multipart_copy, B, DstK, SrcK) ->
InitUploadRes = erlcloud_s3_multipart:initiate_upload(B, DstK, "text/plain", [], UserConfig),
UploadId = erlcloud_s3_multipart:upload_id(InitUploadRes),

{RespHeaders1, _} = rtcs_multipart:upload_part_copy(
B, DstK, UploadId, 1, B, SrcK, {0, mb(5)-1}, UserConfig),
Etag1 = rtcs_multipart:assert_part(B, DstK, UploadId, 1, UserConfig, RespHeaders1),
{RespHeaders2, _} = rtcs_multipart:upload_part_copy(
B, DstK, UploadId, 2, B, SrcK, {mb(5), mb(10)+400-1}, UserConfig),
Etag2 = rtcs_multipart:assert_part(B, DstK, UploadId, 2, UserConfig, RespHeaders2),

EtagList = [ {1, Etag1}, {2, Etag2} ],
?assertEqual(ok, erlcloud_s3_multipart:complete_upload(
B, DstK, UploadId, EtagList, UserConfig)).

mb(MegaBytes) ->
MegaBytes * 1024 * 1024.

assert_whole_content(UserConfig, Bucket, Key, ExpectedContent) ->
Obj = erlcloud_s3:get_object(Bucket, Key, UserConfig),
assert_whole_content(ExpectedContent, Obj).

assert_whole_content(ExpectedContent, ResultObj) ->
Content = proplists:get_value(content, ResultObj),
ContentLength = proplists:get_value(content_length, ResultObj),
?assertEqual(byte_size(ExpectedContent), list_to_integer(ContentLength)),
?assertEqual(byte_size(ExpectedContent), byte_size(Content)),
?assertEqual(ExpectedContent, Content).
Loading