[Kai-commits] SF.net SVN: kai:[150] branches/takemaru_refactoring
Kai is a distributed key-value datastore
Status: Beta
Brought to you by:
takemaru
From: <tak...@us...> - 2009-06-02 15:06:35
|
Revision: 150 http://kai.svn.sourceforge.net/kai/?rev=150&view=rev Author: takemaru Date: 2009-06-02 15:06:32 +0000 (Tue, 02 Jun 2009) Log Message: ----------- * test/kai_connection_SUITE.erl - Added tests for the LRU algorithm. * test/kai_tcp_server_SUITE.erl - Decreased the number of connections used in multiple_connections test, because it would be short of TCP ports in some environments. * src/kai_store_ets.erl, src/kai_store_dets.erl - Replaced successful tags like {T, V} by {ok, V} * src/kai_sync.erl - Uses record for server status. Modified Paths: -------------- branches/takemaru_refactoring/src/kai_coordinator.erl branches/takemaru_refactoring/src/kai_membership.erl branches/takemaru_refactoring/src/kai_rpc.erl branches/takemaru_refactoring/src/kai_store_dets.erl branches/takemaru_refactoring/src/kai_store_ets.erl branches/takemaru_refactoring/src/kai_sync.erl branches/takemaru_refactoring/src/kai_tcp_server.erl branches/takemaru_refactoring/src/kai_tcp_server_acceptor.erl branches/takemaru_refactoring/src/kai_tcp_server_monitor.erl branches/takemaru_refactoring/src/kai_tcp_server_sup.erl branches/takemaru_refactoring/src/kai_version.erl branches/takemaru_refactoring/test/kai_connection_SUITE.erl branches/takemaru_refactoring/test/kai_membership_SUITE.erl branches/takemaru_refactoring/test/kai_rpc_SUITE.erl branches/takemaru_refactoring/test/kai_store_SUITE.erl branches/takemaru_refactoring/test/kai_sync_SUITE.erl branches/takemaru_refactoring/test/kai_tcp_server_SUITE.erl branches/takemaru_refactoring/test/kai_version_SUITE.erl Modified: branches/takemaru_refactoring/src/kai_coordinator.erl =================================================================== --- branches/takemaru_refactoring/src/kai_coordinator.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/src/kai_coordinator.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -1,14 +1,14 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. +%% Licensed under the Apache License, Version 2.0 (the "License"); you may not +%% use this file except in compliance with the License. You may obtain a copy of +%% the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +%% License for the specific language governing permissions and limitations under +%% the License. -module(kai_coordinator). @@ -19,6 +19,28 @@ -define(SERVER, ?MODULE). +route({_Type, Data} = Request) -> + Ref = make_ref(), + %% Though don't know the reason, application exits abnormally if it doesn't + %% spawn the process + spawn(?MODULE, start_route, [Request, self(), Ref]), + receive + {Ref, Result} -> Result + after ?TIMEOUT -> + ?warning(io_lib:format("route(~p): timeout", [Data#data.key])), + [] + end. + +start_route({_Type, Data} = Request, Pid, Ref) -> + LocalNode = kai_config:get(node), + {ok, Nodes} = kai_hash:find_nodes(Data#data.key), + Results = + case lists:member(LocalNode, Nodes) of + true -> dispatch(Request); + _ -> do_route(Request, Nodes) + end, + Pid ! {Ref, Results}. + dispatch({Type, Data} = _Request) -> case Type of get -> coordinate_get(Data); @@ -30,7 +52,7 @@ do_route(_Request, []) -> {error, ebusy}; do_route({_Type, Data} = Request, [Node|RestNodes]) -> - % TODO: introduce TTL, in order to avoid infinite loop + %% TODO: introduce TTL, in order to avoid infinite loop case kai_rpc:route(Node, Request) of {error, Reason} -> ?warning(io_lib:format("do_route(~p, ~p): ~p", @@ -40,35 +62,13 @@ Results end. -start_route({_Type, Data} = Request, Pid, Ref) -> - LocalNode = kai_config:get(node), - {ok, Nodes} = kai_hash:find_nodes(Data#data.key), - Results = - case lists:member(LocalNode, Nodes) of - true -> dispatch(Request); - _ -> do_route(Request, Nodes) - end, - Pid ! {Ref, Results}. - -route({_Type, Data} = Request) -> - Ref = make_ref(), - % Though don't know the reason, application exits abnormally if it doesn't - % spawn the process - spawn(?MODULE, start_route, [Request, self(), Ref]), - receive - {Ref, Result} -> Result - after ?TIMEOUT -> - ?warning(io_lib:format("route(~p): timeout", [Data#data.key])), - [] - end. - coordinate_get(Data) -> {ok, Bucket} = kai_hash:find_bucket(Data#data.key), {ok, Nodes } = kai_hash:find_nodes(Bucket), Data2 = Data#data{bucket=Bucket}, Ref = make_ref(), lists:foreach( - fun(Node) -> spawn(?MODULE, map_in_get, [Node, Data2, Ref, self()]) end, % Don't link + fun(Node) -> spawn(?MODULE, map_in_get, [Node, Data2, Ref, self()]) end, %% Don't link Nodes ), {N,R,_W} = kai_config:get(quorum), Modified: branches/takemaru_refactoring/src/kai_membership.erl =================================================================== --- branches/takemaru_refactoring/src/kai_membership.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/src/kai_membership.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -1,14 +1,14 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. +%% Licensed under the Apache License, Version 2.0 (the "License"); you may not +%% use this file except in compliance with the License. You may obtain a copy of +%% the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +%% License for the specific language governing permissions and limitations under +%% the License. -module(kai_membership). -behaviour(gen_fsm). @@ -22,13 +22,15 @@ -include("kai.hrl"). +-record(state, {node}). + -define(SERVER, ?MODULE). start_link() -> gen_fsm:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []). init(_Args) -> - {ok, ready, [], ?TIMER}. + {ok, ready, #state{node = kai_config:get(node)}, ?TIMER}. terminate(_Reason, _StateName, _StateData) -> ok. @@ -44,45 +46,40 @@ ping_nodes(Nodes, AvailableNodes, [Node|DownNodes]) end. -retrieve_node_list(Node) -> +retrieve_node_list(Node, State) -> case kai_rpc:node_list(Node) of {ok, RemoteNodeList} -> {ok, LocalNodeList} = kai_hash:node_list(), NewNodes = RemoteNodeList -- LocalNodeList, OldNodes = LocalNodeList -- RemoteNodeList, Nodes = NewNodes ++ OldNodes, - LocalNode = kai_config:get(node), - ping_nodes(Nodes -- [LocalNode], [], []); + ping_nodes(Nodes -- [State#state.node], [], []); {error, Reason} -> ?warning(io_lib:format("retrieve_node_list/1: ~p", [{error, Reason}])), {[], [Node]} end. -sync_buckets([], _LocalNode) -> +sync_buckets([]) -> ok; -sync_buckets([{Bucket, NewReplica, OldReplica}|ReplacedBuckets], LocalNode) -> +sync_buckets([{Bucket, NewReplica, OldReplica}|ReplacedBuckets]) -> case {NewReplica, OldReplica} of {NewReplica, undefined } -> kai_sync:update_bucket(Bucket); {undefined, OldReplica} -> kai_sync:delete_bucket(Bucket); _ -> nop end, - sync_buckets(ReplacedBuckets, LocalNode). + sync_buckets(ReplacedBuckets). -sync_buckets(ReplacedBuckets) -> - LocalNode = kai_config:get(node), - sync_buckets(ReplacedBuckets, LocalNode). - -do_check_node({Address, Port}) -> - {AvailableNodes, DownNodes} = retrieve_node_list({Address, Port}), +do_check_node({IpAddr, Port}, State) -> + {AvailableNodes, DownNodes} = retrieve_node_list({IpAddr, Port}, State), {ok, ReplacedBuckets} = kai_hash:update_nodes(AvailableNodes, DownNodes), sync_buckets(ReplacedBuckets). ready({check_node, Node}, State) -> - do_check_node(Node), + do_check_node(Node, State), {next_state, ready, State, ?TIMER}; ready(timeout, State) -> case kai_hash:choose_node_randomly() of - {node, Node} -> do_check_node(Node); + {node, Node} -> do_check_node(Node, State); _ -> nop end, {next_state, ready, State, ?TIMER}. Modified: branches/takemaru_refactoring/src/kai_rpc.erl =================================================================== --- branches/takemaru_refactoring/src/kai_rpc.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/src/kai_rpc.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -1,14 +1,14 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. +%% Licensed under the Apache License, Version 2.0 (the "License"); you may not +%% use this file except in compliance with the License. You may obtain a copy of +%% the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +%% License for the specific language governing permissions and limitations under +%% the License. -module(kai_rpc). -behaviour(kai_tcp_server). @@ -24,6 +24,8 @@ -include("kai.hrl"). +-record(state, {node_info}). + start_link() -> kai_tcp_server:start_link( {local, ?MODULE}, @@ -38,7 +40,8 @@ stop() -> kai_tcp_server:stop(?MODULE). -init(_Args) -> {ok, {}}. +init(_Args) -> + {ok, #state{node_info = kai_config:node_info()}}. handle_call(Socket, Data, State) -> dispatch(Socket, binary_to_term(Data), State). @@ -47,7 +50,7 @@ reply(ok, State); dispatch(_Socket, node_info, State) -> - reply(kai_config:node_info(), State); + reply(State#state.node_info, State); dispatch(_Socket, node_list, State) -> reply(kai_hash:node_list(), State); @@ -85,9 +88,9 @@ {error, Reason} -> {error, Reason} - % Don't place Other alternative here. This is to avoid to catch event - % messages, '$gen_event' or something like that. Remember that this - % function can be called from gen_fsm/gen_event. + %% Don't place Other alternative here. This is to avoid to catch event + %% messages, '$gen_event' or something like that. Remember that this + %% function can be called from gen_fsm/gen_event. after ?TIMEOUT -> {error, timeout} @@ -126,7 +129,7 @@ end. is_local_node(Node) -> - LocalNode = kai_config:get(node), + LocalNode = kai_config:get(node), %% TODO: Don't call kai_config:get/1 every time. Node =:= LocalNode. ok(Node) -> Modified: branches/takemaru_refactoring/src/kai_store_dets.erl =================================================================== --- branches/takemaru_refactoring/src/kai_store_dets.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/src/kai_store_dets.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -73,8 +73,8 @@ vector_clocks = '$3', checksum = '$4' }}], - ListOfData = dets:select(Table, [{Head, Cond, Body}]), - {reply, {list_of_data, ListOfData}, State}. + KeyList = dets:select(Table, [{Head, Cond, Body}]), + {reply, {ok, KeyList}, State}. do_get(#data{key=Key, bucket=Bucket} = _Data, State) -> Table = bucket_to_table(Bucket, State), Modified: branches/takemaru_refactoring/src/kai_store_ets.erl =================================================================== --- branches/takemaru_refactoring/src/kai_store_ets.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/src/kai_store_ets.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -50,8 +50,8 @@ vector_clocks = '$3', checksum = '$4' }}], - ListOfData = ets:select(?MODULE, [{Head, Cond, Body}]), - {reply, {list_of_data, ListOfData}, State}. + KeyList = ets:select(?MODULE, [{Head, Cond, Body}]), + {reply, {ok, KeyList}, State}. do_get(#data{key=Key} = _Data, State) -> case ets:lookup(?MODULE, Key) of Modified: branches/takemaru_refactoring/src/kai_sync.erl =================================================================== --- branches/takemaru_refactoring/src/kai_sync.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/src/kai_sync.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -1,14 +1,14 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. +%% Licensed under the Apache License, Version 2.0 (the "License"); you may not +%% use this file except in compliance with the License. You may obtain a copy of +%% the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +%% License for the specific language governing permissions and limitations under +%% the License. -module(kai_sync). -behaviour(gen_fsm). @@ -22,13 +22,15 @@ -include("kai.hrl"). +-record(state, {node}). + -define(SERVER, ?MODULE). start_link() -> gen_fsm:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []). init(_Args) -> - {ok, ready, [], ?TIMER}. + {ok, ready, #state{node = kai_config:get(node)}, ?TIMER}. terminate(_Reason, _StateName, _StateData) -> ok. @@ -56,17 +58,15 @@ {error, enodata}; do_update_bucket(Bucket, [Node|Rest]) -> case kai_rpc:list(Node, Bucket) of - {list_of_data, ListOfData} -> - retrieve_data(Node, ListOfData); + {ok, KeyList} -> + retrieve_data(Node, KeyList); {error, Reason} -> ?warning(io_lib:format("do_update_bucket/2: ~p", [{error, Reason}])), do_update_bucket(Bucket, Rest) - end. - -do_update_bucket(Bucket) -> + end; +do_update_bucket(Bucket, State) when is_record(State, state) -> {ok, Nodes} = kai_hash:find_nodes(Bucket), - LocalNode = kai_config:get(node), - do_update_bucket(Bucket, Nodes -- [LocalNode]). + do_update_bucket(Bucket, Nodes -- [State#state.node]). do_delete_bucket([]) -> ok; @@ -74,19 +74,19 @@ kai_store:delete(Metadata), do_delete_bucket(Rest); do_delete_bucket(Bucket) -> - {list_of_data, ListOfData} = kai_store:list(Bucket), - do_delete_bucket(ListOfData). + {ok, KeyList} = kai_store:list(Bucket), + do_delete_bucket(KeyList). ready({update_bucket, Bucket}, State) -> - do_update_bucket(Bucket), + do_update_bucket(Bucket, State), {next_state, ready, State, ?TIMER}; ready({delete_bucket, Bucket}, State) -> do_delete_bucket(Bucket), {next_state, ready, State, ?TIMER}; ready(timeout, State) -> case kai_hash:choose_bucket_randomly() of - {ok, Bucket} -> do_update_bucket(Bucket); - _ -> nop + {ok, Bucket} -> do_update_bucket(Bucket, State); + _ -> nop end, {next_state, ready, State, ?TIMER}. Modified: branches/takemaru_refactoring/src/kai_tcp_server.erl =================================================================== --- branches/takemaru_refactoring/src/kai_tcp_server.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/src/kai_tcp_server.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -1,14 +1,14 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. +%% Licensed under the Apache License, Version 2.0 (the "License"); you may not +%% use this file except in compliance with the License. You may obtain a copy of +%% the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +%% License for the specific language governing permissions and limitations under +%% the License. -module(kai_tcp_server). @@ -19,11 +19,11 @@ -include("kai.hrl"). -% Behaviour Callbacks +%% Behaviour Callbacks behaviour_info(callbacks) -> [{init, 1}, {handle_call, 3}]; behaviour_info(_Other) -> undefined. -% External APIs +%% External APIs start_link(Mod) -> start_link(Mod, []). start_link(Mod, Args) -> start_link(Mod, Args, #tcp_server_option{}). start_link(Mod, Args, Option) -> @@ -40,4 +40,3 @@ kai_tcp_server_monitor:info( kai_tcp_server_sup:build_monitor_name(Name), Key ). - Modified: branches/takemaru_refactoring/src/kai_tcp_server_acceptor.erl =================================================================== --- branches/takemaru_refactoring/src/kai_tcp_server_acceptor.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/src/kai_tcp_server_acceptor.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -1,14 +1,14 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. +%% Licensed under the Apache License, Version 2.0 (the "License"); you may not +%% use this file except in compliance with the License. You may obtain a copy of +%% the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +%% License for the specific language governing permissions and limitations under +%% the License. -module(kai_tcp_server_acceptor). @@ -17,7 +17,7 @@ -include("kai.hrl"). -% External APIs +%% External APIs start_link({Dest, Name}, ListenSocket, State, MonitorName, Mod, Option) -> {ok, Pid} = proc_lib:start_link( ?MODULE, init, @@ -29,13 +29,13 @@ end, {ok, Pid}. -% Callbacks +%% Callbacks init(Parent, ListenSocket, State, MonitorName, Mod, Option) -> proc_lib:init_ack(Parent, {ok, self()}), kai_tcp_server_monitor:register(MonitorName, self()), accept(ListenSocket, State, MonitorName, Mod, Option). -% Internal Functions +%% Internal Functions accept(ListenSocket, State, MonitorName, Mod, Option) -> case gen_tcp:accept( ListenSocket, Option#tcp_server_option.accept_timeout Modified: branches/takemaru_refactoring/src/kai_tcp_server_monitor.erl =================================================================== --- branches/takemaru_refactoring/src/kai_tcp_server_monitor.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/src/kai_tcp_server_monitor.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -1,14 +1,14 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. +%% Licensed under the Apache License, Version 2.0 (the "License"); you may not +%% use this file except in compliance with the License. You may obtain a copy of +%% the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +%% License for the specific language governing permissions and limitations under +%% the License. -module(kai_tcp_server_monitor). -behaviour(gen_server). @@ -27,7 +27,7 @@ -include("kai.hrl"). -% External APIs +%% External APIs start_link(Name) -> gen_server:start_link(Name, ?MODULE, [], []). @@ -46,7 +46,7 @@ info(ServerRef, Key) -> gen_server:call(ServerRef, {info, Key}). -% Callbacks +%% Callbacks init(_Args) -> {ok, {_MonitorRefs = [], _Pids = []}}. @@ -90,7 +90,7 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. -% Internal Functions +%% Internal Functions state_to_info({_MonitorRefs, Pids}, curr_connections) -> length(Pids); Modified: branches/takemaru_refactoring/src/kai_tcp_server_sup.erl =================================================================== --- branches/takemaru_refactoring/src/kai_tcp_server_sup.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/src/kai_tcp_server_sup.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -1,14 +1,14 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. +%% Licensed under the Apache License, Version 2.0 (the "License"); you may not +%% use this file except in compliance with the License. You may obtain a copy of +%% the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +%% License for the specific language governing permissions and limitations under +%% the License. -module(kai_tcp_server_sup). -behaviour(supervisor). @@ -19,7 +19,7 @@ -include("kai.hrl"). -% External APIs +%% External APIs start_link(Name, Mod, Args, Option) -> supervisor:start_link(Name, ?MODULE, [Name, Mod, Args, Option]). @@ -31,15 +31,15 @@ _ -> not_started end. -% Callbacks +%% Callbacks init([Name, Mod, Args, Option]) -> case Mod:init(Args) of {ok, State} -> listen(State, Name, Mod, Option); {stop, Reason} -> Reason; - Other -> Other % 'ignore' is contained. + Other -> Other %% Includes 'ignore'. end. -% Internal Functions +%% Internal Functions listen(State, Name, Mod, Option) -> case gen_tcp:listen( Option#tcp_server_option.port, Modified: branches/takemaru_refactoring/src/kai_version.erl =================================================================== --- branches/takemaru_refactoring/src/kai_version.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/src/kai_version.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -59,25 +59,25 @@ end end. -order(ListOfData, State) when is_list(ListOfData) -> - OrderedData = do_order(ListOfData, []), +order(DataList, State) when is_list(DataList) -> + OrderedData = do_order(DataList, []), {reply, OrderedData, State}; order(_Other, State) -> {reply, undefined, State}. %% TODO: raise error if length > 15(=2#1111) -cas_unique(ListOfData) when length(ListOfData) > 2#1111 -> - {error, lists:flatten(io_lib:format("data list is too long (~p)", [length(ListOfData)]))}; -cas_unique(ListOfData) -> - Length = length(ListOfData), +cas_unique(DataList) when length(DataList) > 2#1111 -> + {error, lists:flatten(io_lib:format("data list is too long (~p)", [length(DataList)]))}; +cas_unique(DataList) -> + Length = length(DataList), EachBits = trunc(60/Length), %% TODO: make 128 contant 2008/11/06 by shino RestBits = 128- EachBits, cas_unique(lists:map(fun (Data) -> <<CheckSum:EachBits, _:RestBits>> = Data#data.checksum, CheckSum - end, ListOfData), + end, DataList), EachBits, Length, 4). @@ -94,8 +94,8 @@ {stop, normal, stopped, State}; handle_call({update, Data}, _From, State) -> update(Data, State); -handle_call({order, ListOfData}, _From, State) -> - order(ListOfData, State). +handle_call({order, DataList}, _From, State) -> + order(DataList, State). handle_cast(_Msg, State) -> {noreply, State}. handle_info(_Info, State) -> @@ -107,5 +107,5 @@ gen_server:call(?SERVER, stop). update(Data) -> gen_server:call(?SERVER, {update, Data}). -order(ListOfData) -> - gen_server:call(?SERVER, {order, ListOfData}). +order(DataList) -> + gen_server:call(?SERVER, {order, DataList}). Modified: branches/takemaru_refactoring/test/kai_connection_SUITE.erl =================================================================== --- branches/takemaru_refactoring/test/kai_connection_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/test/kai_connection_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -17,11 +17,14 @@ -include("kai.hrl"). -include("kai_test.hrl"). +-record(connection, {node, available, socket}). %% Defined in src/kai_connection.erl + -define(UNKNOWN, {{127,0,0,1}, 1}). -define(MAX_CONNS, 4). sequences() -> - [{seq, [lease, return, close, unknown, multiple_processes, max_connections]}]. + [{seq, [lease, return, close, unknown, multiple_processes, max_connections, + lru]}]. all() -> [{sequence, seq}]. @@ -107,17 +110,31 @@ ), %% The number of connections can be greater than MAX_CONNS when all - %% connections are in use + %% connections are in use. {ok, Conns} = kai_connection:connections(), ?assertEqual(?MAX_CONNS + 1, length(Conns)), %% The number of connections equals to MAX_CONNS, because a connection has - %% been returned + %% been returned. [Socket|_] = Sockets, ok = kai_connection:return(Socket), {ok, Conns2} = kai_connection:connections(), ?assertEqual(?MAX_CONNS, length(Conns2)). +lru(_Conf) -> + {ok, _Socket} = kai_connection:lease(?NODE2, self()), + {ok, Socket2} = kai_connection:lease(?NODE2, self()), + {ok, Socket3} = kai_connection:lease(?NODE2, self()), + + %% Here, the connections are ordered like 3, 2, and 1. + {ok, [Conn|_]} = kai_connection:connections(), + Socket3 = Conn#connection.socket, + + %% Socket2 is moved to the head. + ok = kai_connection:return(Socket2), + {ok, [Conn2|_]} = kai_connection:connections(), + Socket2 = Conn2#connection.socket. + echo_start(Port) -> {ok, ListenSocket} = gen_tcp:listen(Port, [binary, {packet, 4}, {reuseaddr, true}]), Modified: branches/takemaru_refactoring/test/kai_membership_SUITE.erl =================================================================== --- branches/takemaru_refactoring/test/kai_membership_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/test/kai_membership_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -230,7 +230,7 @@ count(_Node, 0, DataNum) -> DataNum; count(Node, Bucket, DataNum) -> - {list_of_data, KeyList} = rpc:call(Node, kai_store, list, [Bucket-1]), + {ok, KeyList} = rpc:call(Node, kai_store, list, [Bucket-1]), count(Node, Bucket-1, DataNum + length(KeyList)). %% TODO: Check whether data is synchronized Modified: branches/takemaru_refactoring/test/kai_rpc_SUITE.erl =================================================================== --- branches/takemaru_refactoring/test/kai_rpc_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/test/kai_rpc_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -77,11 +77,8 @@ Data = rpc:call(Node1, kai_rpc, get, [?NODE2, #data{key="key1", bucket=3}]), - {list_of_data, [Data2]} = rpc:call(Node1, kai_rpc, list, [?NODE2, 3]), - ?assertEqual( - "key1", - Data2#data.key - ), + {ok, [Key]} = rpc:call(Node1, kai_rpc, list, [?NODE2, 3]), + "key1" = Key#data.key, ok = rpc:call(Node1, kai_rpc, delete, [?NODE2, #data{key="key1", bucket=3}]). Modified: branches/takemaru_refactoring/test/kai_store_SUITE.erl =================================================================== --- branches/takemaru_refactoring/test/kai_store_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/test/kai_store_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -70,9 +70,8 @@ Data = kai_store:get(#data{key="key1", bucket=3}), undefined = kai_store:get(#data{key="key2", bucket=1}), - {list_of_data, KeyList} = kai_store:list(3), - ?assertEqual(1, length(KeyList)), - ?assert(lists:keymember("key1", 2, KeyList)), + {ok, [Key]} = kai_store:list(3), + "key1" = Key#data.key, Data2 = #data{ key = "key1", Modified: branches/takemaru_refactoring/test/kai_sync_SUITE.erl =================================================================== --- branches/takemaru_refactoring/test/kai_sync_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/test/kai_sync_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -46,16 +46,14 @@ {ok, _ReplacedBuckets} = rpc:call(Node1, kai_hash, update_nodes, [[{?NODE2, ?INFO}], []]), - {list_of_data, KeyList} = rpc:call(Node1, kai_store, list, [3]), - ?assertEqual([], KeyList), + {ok, []} = rpc:call(Node1, kai_store, list, [3]), %% Node1 tries to synchronize Bucket3 with Node2 ok = rpc:call(Node1, kai_sync, update_bucket, [3]), wait(), - {list_of_data, KeyList2} = rpc:call(Node1, kai_store, list, [3]), - ?assertEqual(1, length(KeyList2)). + {ok, [_]} = rpc:call(Node1, kai_store, list, [3]). sync_by_timeout(Conf) -> Node1 = ?config(node1, Conf), @@ -74,8 +72,7 @@ {ok, _ReplacedBuckets} = rpc:call(Node1, kai_hash, update_nodes, [[{?NODE2, ?INFO}], []]), - {list_of_data, KeyList} = rpc:call(Node1, kai_store, list, [3]), - ?assertEqual([], KeyList), + {ok, []} = rpc:call(Node1, kai_store, list, [3]), %% Node1 tries to synchronize Bucket3 every TIMER sec. check(Node1, 16). @@ -84,7 +81,7 @@ ct:fail(never_synchronized); check(Node, I) -> timer:sleep(?TIMER), - {list_of_data, KeyList} = rpc:call(Node, kai_store, list, [3]), + {ok, KeyList} = rpc:call(Node, kai_store, list, [3]), case KeyList of [_] -> ok; [] -> check(Node, I-1) Modified: branches/takemaru_refactoring/test/kai_tcp_server_SUITE.erl =================================================================== --- branches/takemaru_refactoring/test/kai_tcp_server_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/test/kai_tcp_server_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -67,7 +67,7 @@ lists:foreach(fun (_N) -> {ok, Socket} = connect_to_echo_server(), gen_tcp:close(Socket) - end, lists:seq(1, 10000)). + end, lists:seq(1, 1024)). connection_counter(_Conf) -> Sockets = lists:map(fun (_N) -> Modified: branches/takemaru_refactoring/test/kai_version_SUITE.erl =================================================================== --- branches/takemaru_refactoring/test/kai_version_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149) +++ branches/takemaru_refactoring/test/kai_version_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150) @@ -50,22 +50,20 @@ vector_clocks = VClock1 }, - %% trivial case + %% Trivial case ?assertEqual(1, length(kai_version:order([Data1]))), - %% two concurrent data + %% Two concurrent data Data2 = Data1#data{ vector_clocks = vclock:increment(otherNode, vclock:fresh()) }, - ListOfData12 = kai_version:order([Data1, Data2]), - ?assertEqual(2, length(ListOfData12)), + [_,_] = kai_version:order([Data1, Data2]), - %% one data is dropped + %% One data is dropped Data3 = Data1#data{ vector_clocks = vclock:increment(otherNode2, Data1#data.vector_clocks) }, - ListOfData23 = kai_version:order([Data1, Data2, Data3]), - ?assertEqual(2, length(ListOfData23)). + [_,_] = kai_version:order([Data1, Data2, Data3]). cas_unique1(_Conf) -> Data1 = #data{ @@ -88,21 +86,21 @@ cas_unique7(_Conf) -> %% trunc(60/7) = 8 - ListOfData = lists:map(fun (I) -> + DataList = lists:map(fun (I) -> #data{checksum = <<I:8, 0:120>>} end, lists:seq(1,7)), - {ok, CasUnique} = kai_version:cas_unique(ListOfData), + {ok, CasUnique} = kai_version:cas_unique(DataList), Expected = <<7:4, 1:8, 2:8, 3:8, 4:8, 5:8, 6:8, 7:8, 0:4>>, ?assertEqual(Expected, CasUnique). cas_unique16(_Conf) -> %% 16 exceeds 4bit range (2#1111 = 15) - ListOfData = lists:map(fun (I) -> + DataList = lists:map(fun (I) -> #data{checksum = <<I:4, 0:60>>} end, lists:seq(1,16)), - {error, Reason} = kai_version:cas_unique(ListOfData), + {error, Reason} = kai_version:cas_unique(DataList), ?assert(string:str(Reason, "16") > 0). all_bit_on(Bytes) -> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |