[Kai-commits] SF.net SVN: kai:[129] trunk
Kai is a distributed key-value datastore
Status: Beta
Brought to you by:
takemaru
From: <coo...@us...> - 2009-03-22 05:48:12
|
Revision: 129 http://kai.svn.sourceforge.net/kai/?rev=129&view=rev Author: cooldaemon Date: 2009-03-22 05:48:10 +0000 (Sun, 22 Mar 2009) Log Message: ----------- Fixed the kai_tcp_server. Divided the source file of it. Added the monitor process. Modified Paths: -------------- trunk/src/Makefile trunk/src/kai_tcp_server.erl trunk/test/kai.coverspec Added Paths: ----------- trunk/src/kai_tcp_server_acceptor.erl trunk/src/kai_tcp_server_monitor.erl trunk/src/kai_tcp_server_sup.erl Modified: trunk/src/Makefile =================================================================== --- trunk/src/Makefile 2009-03-21 14:13:51 UTC (rev 128) +++ trunk/src/Makefile 2009-03-22 05:48:10 UTC (rev 129) @@ -22,7 +22,9 @@ SOURCES = \ kai_config kai_log kai_hash kai_store kai_store_ets kai_store_dets \ kai_stat kai_version kai_connection kai_sync kai_membership \ - kai_coordinator kai_tcp_server kai_rpc kai_memcache kai_sup kai vclock + kai_coordinator kai_tcp_server \ + kai_tcp_server_sup kai_tcp_server_acceptor kai_tcp_server_monitor \ + kai_rpc kai_memcache kai_sup kai vclock MODS = ${SOURCES:%=$(EBIN)/%.$(EMULATOR)} $(APP_TARGET) Modified: trunk/src/kai_tcp_server.erl =================================================================== --- trunk/src/kai_tcp_server.erl 2009-03-21 14:13:51 UTC (rev 128) +++ trunk/src/kai_tcp_server.erl 2009-03-22 05:48:10 UTC (rev 129) @@ -11,13 +11,10 @@ % the License. -module(kai_tcp_server). --behaviour(supervisor). -export([behaviour_info/1]). -export([start_link/1, start_link/2, start_link/3, start_link/4]). -export([stop/0, stop/1]). --export([init/1, acceptor_init/5]). --export([acceptor_start_link/5]). -include("kai.hrl"). @@ -25,146 +22,15 @@ behaviour_info(callbacks) -> [{init, 1}, {handle_call, 3}]; behaviour_info(_Other) -> undefined. -% Supervisor - tcp_server -%% External APIs +% External APIs start_link(Mod) -> start_link(Mod, []). start_link(Mod, Args) -> start_link(Mod, Args, #tcp_server_option{}). start_link(Mod, Args, Option) -> start_link({local, ?MODULE}, Mod, Args, Option). start_link(Name, Mod, Args, Option) -> - supervisor:start_link(Name, ?MODULE, [Name, Mod, Args, Option]). + kai_tcp_server_sup:start_link(Name, Mod, Args, Option). stop() -> stop(?MODULE). stop(Name) -> - case whereis(Name) of - Pid when is_pid(Pid) -> - exit(Pid, normal), - ok; - _ -> not_started - end. + kai_tcp_server_sup:stop(Name). -%% Callbacks -init([Name, Mod, Args, Option]) -> - case Mod:init(Args) of - {ok, State} -> listen(State, Name, Mod, Option); - {stop, Reason} -> Reason; - Other -> Other % 'ignore' is contained. - end. - -%% Internal Functions -listen(State, Name, Mod, Option) -> - case gen_tcp:listen( - Option#tcp_server_option.port, - Option#tcp_server_option.listen - ) of - {ok, ListenSocket} -> - init_acceptors(ListenSocket, State, Name, Mod, Option); - {error, Reason} -> - ?warning(io_lib:format("listen(~p) ~p", [Mod, {error, Reason}])), - {stop, Reason} - end. - -init_acceptors(ListenSocket, State, {Dest, Name}, Mod, Option) -> - #tcp_server_option{ - max_processes = MaxProcesses, - max_restarts = MaxRestarts, - time = Time, - shutdown = Shutdown - } = Option, - {ok, {{one_for_one, MaxRestarts, Time}, lists:map( - fun (N) -> - AcceptorName = list_to_atom( - atom_to_list(Name) ++ "_acceptor_" ++ integer_to_list(N) - ), - { - AcceptorName, - { - ?MODULE, - acceptor_start_link, - [{Dest, AcceptorName}, ListenSocket, State, Mod, Option] - }, - permanent, - Shutdown, - worker, - [] - } - end, - lists:seq(1, MaxProcesses) - )}}. - -% ProcLib - tcp_acceptor_N -%% External APIs -acceptor_start_link({Dest, Name}, ListenSocket, State, Mod, Option) -> - {ok, Pid} = proc_lib:start_link( - ?MODULE, acceptor_init, [self(), ListenSocket, State, Mod, Option] - ), - case Dest of - local -> register(Name, Pid); - _Global -> global:register_name(Name, Pid) - end, - {ok, Pid}. - -%% Callbacks -acceptor_init(Parent, ListenSocket, State, Mod, Option) -> - proc_lib:init_ack(Parent, {ok, self()}), - acceptor_accept(ListenSocket, State, Mod, Option). - -acceptor_accept(ListenSocket, State, Mod, Option) -> - case gen_tcp:accept( - ListenSocket, Option#tcp_server_option.accept_timeout - ) of - {ok, Socket} -> - acceptor_loop( - proplists:get_value(active, Option#tcp_server_option.listen), - Socket, State, Mod, Option - ), - gen_tcp:close(Socket); - {error, Reason} -> - ?warning(io_lib:format("acceptor_accept(~p) ~p", [Mod, {error, Reason}])), - timer:sleep(Option#tcp_server_option.accept_error_sleep_time) - end, - acceptor_accept(ListenSocket, State, Mod, Option). - -acceptor_loop(false, Socket, State, Mod, Option) -> - case gen_tcp:recv( - Socket, - Option#tcp_server_option.recv_length, - Option#tcp_server_option.recv_timeout - ) of - {ok, Data} -> - call_mod(false, Socket, Data, State, Mod, Option); - {error, closed} -> - tcp_closed; - {error, Reason} -> - ?warning(io_lib:format("acceptor_loop(~p) ~p", [Mod, {error, Reason}])), - error - end; - -acceptor_loop(true, _DummySocket, State, Mod, Option) -> - receive - {tcp, Socket, Data} -> - call_mod(true, Socket, Data, State, Mod, Option); - {tcp_closed, _Socket} -> - tcp_closed; - Error -> - ?warning(io_lib:format("acceptor_loop(~p) ~p", [Mod, {error, Error}])), - error - after Option#tcp_server_option.recv_timeout -> - tcp_timeout - end. - -call_mod(Active, Socket, Data, State, Mod, Option) -> - case Mod:handle_call(Socket, Data, State) of - {reply, DataToSend, State} -> - gen_tcp:send(Socket, DataToSend), - acceptor_loop(Active, Socket, State, Mod, Option); - {noreply, State} -> - acceptor_loop(Active, Socket, State, Mod, Option); - {close, State} -> - tcp_closed; - {close, DataToSend, State} -> - gen_tcp:send(Socket, DataToSend); - Other -> - ?warning(io_lib:format("call_mod(~p) ~p", [Mod, {unexpected_result, Other}])) - end. - Added: trunk/src/kai_tcp_server_acceptor.erl =================================================================== --- trunk/src/kai_tcp_server_acceptor.erl (rev 0) +++ trunk/src/kai_tcp_server_acceptor.erl 2009-03-22 05:48:10 UTC (rev 129) @@ -0,0 +1,98 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(kai_tcp_server_acceptor). + +-export([start_link/6]). +-export([init/6]). + +-include("kai.hrl"). + +% External APIs +start_link({Dest, Name}, ListenSocket, State, MonitorName, Mod, Option) -> + {ok, Pid} = proc_lib:start_link( + ?MODULE, init, + [self(), ListenSocket, State, MonitorName, Mod, Option] + ), + case Dest of + local -> register(Name, Pid); + _Global -> global:register_name(Name, Pid) + end, + {ok, Pid}. + +% Callbacks +init(Parent, ListenSocket, State, MonitorName, Mod, Option) -> + proc_lib:init_ack(Parent, {ok, self()}), + kai_tcp_server_monitor:regist(MonitorName, self()), + accept(ListenSocket, State, MonitorName, Mod, Option). + +accept(ListenSocket, State, MonitorName, Mod, Option) -> + case gen_tcp:accept( + ListenSocket, Option#tcp_server_option.accept_timeout + ) of + {ok, Socket} -> + kai_tcp_server_monitor:increment(MonitorName, self()), + recv( + proplists:get_value(active, Option#tcp_server_option.listen), + Socket, State, Mod, Option + ), + kai_tcp_server_monitor:decrement(MonitorName, self()), + gen_tcp:close(Socket); + {error, Reason} -> + ?warning(io_lib:format("acceptor_accept(~p) ~p", [Mod, {error, Reason}])), + timer:sleep(Option#tcp_server_option.accept_error_sleep_time) + end, + accept(ListenSocket, State, MonitorName, Mod, Option). + +recv(false, Socket, State, Mod, Option) -> + case gen_tcp:recv( + Socket, + Option#tcp_server_option.recv_length, + Option#tcp_server_option.recv_timeout + ) of + {ok, Data} -> + call_mod(false, Socket, Data, State, Mod, Option); + {error, closed} -> + tcp_closed; + {error, Reason} -> + ?warning(io_lib:format("acceptor_loop(~p) ~p", [Mod, {error, Reason}])), + error + end; + +recv(true, _DummySocket, State, Mod, Option) -> + receive + {tcp, Socket, Data} -> + call_mod(true, Socket, Data, State, Mod, Option); + {tcp_closed, _Socket} -> + tcp_closed; + Error -> + ?warning(io_lib:format("acceptor_loop(~p) ~p", [Mod, {error, Error}])), + error + after Option#tcp_server_option.recv_timeout -> + tcp_timeout + end. + +call_mod(Active, Socket, Data, State, Mod, Option) -> + case Mod:handle_call(Socket, Data, State) of + {reply, DataToSend, State} -> + gen_tcp:send(Socket, DataToSend), + recv(Active, Socket, State, Mod, Option); + {noreply, State} -> + recv(Active, Socket, State, Mod, Option); + {close, State} -> + tcp_closed; + {close, DataToSend, State} -> + gen_tcp:send(Socket, DataToSend); + Other -> + ?warning(io_lib:format("call_mod(~p) ~p", [Mod, {unexpected_result, Other}])) + end. + Added: trunk/src/kai_tcp_server_monitor.erl =================================================================== --- trunk/src/kai_tcp_server_monitor.erl (rev 0) +++ trunk/src/kai_tcp_server_monitor.erl 2009-03-22 05:48:10 UTC (rev 129) @@ -0,0 +1,92 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(kai_tcp_server_monitor). +-behaviour(gen_server). + +-export([start_link/1, stop/1]). +-export([ + regist/2, + increment/2, decrement/2, + state/1 +]). +-export([ + init/1, + handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3 +]). + +-include("kai.hrl"). + +% External APIs +start_link(Name) -> + gen_server:start_link(Name, ?MODULE, [], []). + +stop(ServerRef) -> + gen_server:call(ServerRef, stop). + +regist(ServerRef, Pid) -> + gen_server:call(ServerRef, {regist, Pid}). + +increment(ServerRef, Pid) -> + gen_server:cast(ServerRef, {increment, Pid}). + +decrement(ServerRef, Pid) -> + gen_server:cast(ServerRef, {decrement, Pid}). + +state(ServerRef) -> + gen_server:call(ServerRef, state). + +% Callbacks +init(_Args) -> + {ok, {_MonitorRefs = [], _Pids = []}}. + +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}; + +handle_call({regist, Pid}, _From, {MonitorRefs, Pids}) -> + {reply, ok, { + [erlang:monitor(process, Pid) | MonitorRefs], + Pids + }}; + +handle_call(state, _From, State) -> + {reply, State, State}; + +handle_call(_Message, _From, State) -> + {reply, ok, State}. + +handle_cast({increment, Pid}, {MonitorRefs, Pids}) -> + {noreply, {MonitorRefs, [Pid | Pids]}}; + +handle_cast({decrement, Pid}, {MonitorRefs, Pids}) -> + {noreply, {MonitorRefs, lists:delete(Pid, Pids)}}; + +handle_cast(_Message, State) -> + {noreply, State}. + +handle_info({'DOWN', MonitorRef, _Type, Pid, _Info}, {MonitorRefs, Pids}) -> + erlang:demonitor(MonitorRef), + {noreply, { + lists:delete(MonitorRef, MonitorRefs), + lists:delete(Pid, Pids) + }}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + Added: trunk/src/kai_tcp_server_sup.erl =================================================================== --- trunk/src/kai_tcp_server_sup.erl (rev 0) +++ trunk/src/kai_tcp_server_sup.erl 2009-03-22 05:48:10 UTC (rev 129) @@ -0,0 +1,128 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(kai_tcp_server_sup). +-behaviour(supervisor). + +-export([start_link/4, stop/1]). +-export([init/1]). + +-include("kai.hrl"). + +% External APIs +start_link(Name, Mod, Args, Option) -> + supervisor:start_link(Name, ?MODULE, [Name, Mod, Args, Option]). + +stop(Name) -> + case whereis(Name) of + Pid when is_pid(Pid) -> + exit(Pid, normal), + ok; + _ -> not_started + end. + +% Callbacks +init([Name, Mod, Args, Option]) -> + case Mod:init(Args) of + {ok, State} -> listen(State, Name, Mod, Option); + {stop, Reason} -> Reason; + Other -> Other % 'ignore' is contained. + end. + +% Internal Functions +listen(State, Name, Mod, Option) -> + case gen_tcp:listen( + Option#tcp_server_option.port, + Option#tcp_server_option.listen + ) of + {ok, ListenSocket} -> + build_result(ListenSocket, State, Name, Mod, Option); + {error, Reason} -> + ?warning(io_lib:format("listen(~p) ~p", [Mod, {error, Reason}])), + {stop, Reason} + end. + +build_result(ListenSocket, State, {Dest, Name}, Mod, Option) -> + #tcp_server_option{ + max_restarts = MaxRestarts, + time = Time + } = Option, + MonitorName = build_monitor_name(Name), + {ok, { + {one_for_one, MaxRestarts, Time}, + [ + monitor_spec({Dest, MonitorName}) | + acceptor_specs( + ListenSocket, State, {Dest, Name}, MonitorName, Mod, Option + ) + ] + }}. + +monitor_spec({Dest, MonitorName}) -> + { + MonitorName, + { + kai_tcp_server_monitor, + start_link, + [{Dest, MonitorName}] + }, + permanent, + brutal_kill, + worker, + [] + }. + +acceptor_specs( + ListenSocket, State, {Dest, Name}, MonitorBaseName, Mod, Option +) -> + #tcp_server_option{ + max_processes = MaxProcesses, + shutdown = Shutdown + } = Option, + MonitorName = case Dest of + local -> MonitorBaseName; + _Global -> {Dest, MonitorBaseName} + end, + lists:map( + fun (N) -> + AcceptorName = build_acceptor_name(Name, N), + { + AcceptorName, + { + kai_tcp_server_acceptor, + start_link, + [ + {Dest, AcceptorName}, + ListenSocket, + State, + MonitorName, + Mod, + Option + ] + }, + permanent, + Shutdown, + worker, + [] + } + end, + lists:seq(1, MaxProcesses) + ). + +build_monitor_name(Prefix) -> + list_to_atom(atom_to_list(Prefix) ++ "_monitor"). + +build_acceptor_name(Prefix, Number) -> + list_to_atom( + atom_to_list(Prefix) ++ "_acceptor_" ++ integer_to_list(Number) + ). + Modified: trunk/test/kai.coverspec =================================================================== --- trunk/test/kai.coverspec 2009-03-21 14:13:51 UTC (rev 128) +++ trunk/test/kai.coverspec 2009-03-22 05:48:10 UTC (rev 129) @@ -2,6 +2,8 @@ {incl_mods, [ kai_config, kai_log, kai_hash, kai_store, kai_stat, kai_version, kai_connection, kai_sync, kai_membership, kai_coordinator, - kai_tcp_server, kai_rpc, kai_memcache + kai_tcp_server, + kai_tcp_server_sup, kai_tcp_server_acceptor, kai_tcp_server_monitor, + kai_rpc, kai_memcache ]}. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |