Thread: [Kai-commits] SF.net SVN: kai:[115] branches/cooldaemon_count_current_tcp_connections
Kai is a distributed key-value datastore
Status: Beta
Brought to you by:
takemaru
From: <coo...@us...> - 2009-03-06 15:38:59
|
Revision: 115 http://kai.svn.sourceforge.net/kai/?rev=115&view=rev Author: cooldaemon Date: 2009-03-06 15:38:51 +0000 (Fri, 06 Mar 2009) Log Message: ----------- Merge branch 'master' into count_current_tcp_connections Modified Paths: -------------- branches/cooldaemon_count_current_tcp_connections/Makefile branches/cooldaemon_count_current_tcp_connections/src/Makefile branches/cooldaemon_count_current_tcp_connections/src/kai.app.src branches/cooldaemon_count_current_tcp_connections/src/kai_hash.erl branches/cooldaemon_count_current_tcp_connections/src/kai_memcache.erl branches/cooldaemon_count_current_tcp_connections/src/kai_store.erl branches/cooldaemon_count_current_tcp_connections/src/kai_store_dets.erl branches/cooldaemon_count_current_tcp_connections/src/kai_store_ets.erl branches/cooldaemon_count_current_tcp_connections/src/kai_sup.erl branches/cooldaemon_count_current_tcp_connections/test/Makefile branches/cooldaemon_count_current_tcp_connections/test/kai.coverspec branches/cooldaemon_count_current_tcp_connections/test/kai_hash_SUITE.erl branches/cooldaemon_count_current_tcp_connections/test/kai_memcache_SUITE.erl branches/cooldaemon_count_current_tcp_connections/test/kai_store_SUITE.erl Added Paths: ----------- branches/cooldaemon_count_current_tcp_connections/src/kai_stat.erl branches/cooldaemon_count_current_tcp_connections/test/kai_stat_SUITE.erl Modified: branches/cooldaemon_count_current_tcp_connections/Makefile =================================================================== --- branches/cooldaemon_count_current_tcp_connections/Makefile 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/Makefile 2009-03-06 15:38:51 UTC (rev 115) @@ -9,7 +9,7 @@ ## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the ## License for the specific language governing permissions and limitations ## under the License. 
-KAI_VSN=0.2.0 +KAI_VSN=0.3.0 ifndef ROOT ROOT=$(shell pwd) Modified: branches/cooldaemon_count_current_tcp_connections/src/Makefile =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/Makefile 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/src/Makefile 2009-03-06 15:38:51 UTC (rev 115) @@ -21,8 +21,8 @@ SOURCES = \ kai_config kai_log kai_hash kai_store kai_store_ets kai_store_dets \ - kai_version kai_connection kai_sync kai_membership kai_coordinator \ - kai_tcp_server kai_rpc kai_memcache kai_sup kai vclock + kai_stat kai_version kai_connection kai_sync kai_membership \ + kai_coordinator kai_tcp_server kai_rpc kai_memcache kai_sup kai vclock MODS = ${SOURCES:%=$(EBIN)/%.$(EMULATOR)} $(APP_TARGET) Modified: branches/cooldaemon_count_current_tcp_connections/src/kai.app.src =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai.app.src 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/src/kai.app.src 2009-03-06 15:38:51 UTC (rev 115) @@ -3,12 +3,12 @@ {vsn, "%VSN%"}, {modules, [ kai, kai_sup, kai_memcache, kai_rpc, kai_tcp_server, kai_coordinator, - kai_membership, kai_sync, kai_connection, kai_version, kai_store, - kai_hash, kai_log, kai_config + kai_membership, kai_sync, kai_connection, kai_version, kai_stat, + kai_store, kai_hash, kai_log, kai_config ]}, {registered, [ - kai_sup, kai_membership, kai_sync, kai_connection, kai_version, kai_store, - kai_hash, kai_log, kai_config + kai_sup, kai_membership, kai_sync, kai_connection, kai_version, + kai_stat, kai_store, kai_hash, kai_log, kai_config ]}, {applications, [kernel, stdlib]}, {mod, {kai, []}}, Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_hash.erl =================================================================== --- 
branches/cooldaemon_count_current_tcp_connections/src/kai_hash.erl 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_hash.erl 2009-03-06 15:38:51 UTC (rev 115) @@ -17,7 +17,8 @@ -export([ update_nodes/2, find_bucket/1, find_replica/1, find_nodes/1, choose_node_randomly/0, choose_bucket_randomly/0, - node_info/1, node_info/0, node_list/0, virtual_node_list/0, buckets/0 + node_info/1, node_info/0, node_list/0, virtual_node_list/0, + bucket_list/0, buckets/0 ]). -export([ init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -261,10 +262,20 @@ VirtualNodeList = ets:tab2list(virtual_node_list), {reply, {virtual_node_list, VirtualNodeList}, State}. -buckets(State) -> +bucket_list(State) -> Buckets = ets:tab2list(buckets), - {reply, {buckets, Buckets}, State}. + {reply, {bucket_list, lists:sort(Buckets)}, State}. +buckets(State) -> + LocalNode = kai_config:get(node), + Buckets = + lists:filter( + fun(B) -> lists:member(LocalNode, element(2, B)) end, + ets:tab2list(buckets) + ), + Buckets2 = [element(1, B) || B <- Buckets], + {reply, {buckets, lists:sort(Buckets2)}, State}. + handle_call(stop, _From, State) -> {stop, normal, stopped, State}; handle_call({update_nodes, NodesToAdd, NodesToRemove}, _From, State) -> @@ -287,6 +298,8 @@ node_list(State); handle_call(virtual_node_list, _From, State) -> virtual_node_list(State); +handle_call(bucket_list, _From, State) -> + bucket_list(State); handle_call(buckets, _From, State) -> buckets(State). handle_cast(_Msg, State) -> @@ -318,5 +331,7 @@ gen_server:call(?SERVER, node_list). virtual_node_list() -> gen_server:call(?SERVER, virtual_node_list). +bucket_list() -> + gen_server:call(?SERVER, bucket_list). buckets() -> gen_server:call(?SERVER, buckets). 
Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_memcache.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_memcache.erl 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_memcache.erl 2009-03-06 15:38:51 UTC (rev 115) @@ -68,6 +68,16 @@ dispatch(_Socket, ["delete", _Key, _Time, "noreply"], State) -> {reply, <<"CLIENT_ERROR noreply not supported.\r\n">>, State}; +dispatch(_Socket, ["stats"], State) -> + Response = + lists:map( + fun({Name, Value}) -> + ["STAT " ++ atom_to_list(Name) ++ " " ++ Value ++ "\r\n"] + end, + kai_stat:all() + ), + {reply, [Response|"END\r\n"], State}; + dispatch(_Socket, ["quit"], _State) -> quit; dispatch(_Socket, _Unknown, State) -> @@ -78,6 +88,8 @@ Data when is_list(Data) -> {ok, CasUniqueInBinary} = kai_version:cas_unique(Data), Response = get_response(Data, WithCasUnique, CasUniqueInBinary), + kai_stat:incr_cmd_get(), + kai_stat:add_bytes_read(Data), {reply, [Response|"END\r\n"], State}; undefined -> {reply, <<"END\r\n">>, State}; @@ -110,6 +122,8 @@ ) of ok -> gen_tcp:send(Socket, <<"STORED\r\n">>), + kai_stat:incr_cmd_set(), + kai_stat:add_bytes_write(#data{value=Value}), {noreply, State}; _Other -> send_error_and_close("Failed to write.", State) Added: branches/cooldaemon_count_current_tcp_connections/src/kai_stat.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_stat.erl (rev 0) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_stat.erl 2009-03-06 15:38:51 UTC (rev 115) @@ -0,0 +1,189 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(kai_stat). +-behaviour(gen_server). + +-export([start_link/0, stop/0]). +-export([all/0, incr_cmd_get/0, incr_cmd_set/0, add_bytes_read/1, + add_bytes_write/1]). +-export([ + init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3 + ]). + +-include("kai.hrl"). +-record(state, { + boot_time, cmd_get, cmd_set, bytes_read, bytes_write, + node, quorum, number_of_buckets, number_of_virtual_nodes, store +}). + +-define(SERVER, ?MODULE). + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []). + +init(_Args) -> + {Msec, Sec, _Usec} = now(), + BootTime = 1000000 * Msec + Sec, + [LocalNode, N, R, W, NumberOfBuckets, NumberOfVirtualNodes, Store] = + kai_config:get([ + node, n, r, w, number_of_buckets, number_of_virtual_nodes, store + ]), + Quorum = join(",", [integer_to_list(X) || X <- [N, R, W]]), + {ok, #state{ + boot_time = BootTime, + cmd_get = 0, + cmd_set = 0, + bytes_read = 0, + bytes_write = 0, + node = LocalNode, + quorum = Quorum, + number_of_buckets = NumberOfBuckets, + number_of_virtual_nodes = NumberOfVirtualNodes, + store = Store + }}. + +terminate(_Reason, _State) -> + ok. + +join(_Delimiter, [], Acc) -> + Acc; +join(_Delimiter, [Token], Acc) -> + Acc ++ Token; +join(Delimiter, [Token|Rest], Acc) -> + join(Delimiter, Rest, Acc ++ Token ++ Delimiter). +join(Delimiter, List) -> + join(Delimiter, List, []). + +node_to_list({{A1,A2,A3,A4}, Port}) -> + Addr = join(".", [integer_to_list(X) || X <- [A1,A2,A3,A4]]), + Addr ++ ":" ++ integer_to_list(Port). 
+ +stat(Name, State) -> + case Name of + uptime -> + {Msec, Sec, _Usec} = now(), + Uptime = 1000000 * Msec + Sec - State#state.boot_time, + {uptime, integer_to_list(Uptime)}; + time -> + {Msec, Sec, _Usec} = now(), + Time = 1000000 * Msec + Sec, + {time, integer_to_list(Time)}; + version -> + Version = + case application:get_key(kai, vsn) of + {ok, V} -> V; + _ -> "0" + end, + {version, Version}; + bytes -> + Bytes = kai_store:info(bytes), + {bytes, integer_to_list(Bytes)}; + curr_items -> + Size = kai_store:info(size), + {curr_items, integer_to_list(Size)}; + cmd_get -> + {cmd_get, integer_to_list(State#state.cmd_get)}; + cmd_set -> + {cmd_set, integer_to_list(State#state.cmd_set)}; + bytes_read -> + {bytes_read, integer_to_list(State#state.bytes_read)}; + bytes_write -> + {bytes_write, integer_to_list(State#state.bytes_write)}; + kai_node -> + {kai_node, node_to_list(State#state.node)}; + kai_quorum -> + {kai_quorum, State#state.quorum}; + kai_number_of_buckets -> + NumberOfBuckets = State#state.number_of_buckets, + {kai_number_of_buckets, integer_to_list(NumberOfBuckets)}; + kai_number_of_virtual_nodes -> + NumberOfVirtualNodes = State#state.number_of_virtual_nodes, + {kai_number_of_virtual_nodes, + integer_to_list(NumberOfVirtualNodes)}; + kai_store -> + {kai_store, atom_to_list(State#state.store)}; + kai_curr_nodes -> + {node_list, Nodes} = kai_hash:node_list(), + NodesInList = lists:sort([node_to_list(N) || N <- Nodes]), + {kai_curr_nodes, join(" ", NodesInList)}; +% kai_curr_buckets -> +% {buckets, Buckets} = kai_hash:buckets(), +% {kai_curr_buckets, join(" ", [integer_to_list(B) || B <- Buckets])}; + erlang_procs -> + ErlangProcs = erlang:system_info(process_count), + {erlang_procs, integer_to_list(ErlangProcs)}; + erlang_version -> + {erlang_version, erlang:system_info(version)} + end. 
+ +all(State) -> + Stats = + lists:map( + fun(Name) -> stat(Name, State) end, + [uptime, time, version, bytes, curr_items, cmd_get, cmd_set, + bytes_read, bytes_write, + kai_node, kai_quorum, kai_number_of_buckets, + kai_number_of_virtual_nodes, kai_store, kai_curr_nodes, + %kai_curr_buckets, + erlang_procs, erlang_version] + ), + {reply, Stats, State}. + +incr_cmd_get(State) -> + State2 = State#state{cmd_get = State#state.cmd_get + 1}, + {noreply, State2}. + +incr_cmd_set(State) -> + State2 = State#state{cmd_set = State#state.cmd_set + 1}, + {noreply, State2}. + +add_bytes_read(Data, State) -> + Bytes = lists:sum([byte_size(D#data.value) || D <- Data]), + State2 = State#state{bytes_read = State#state.bytes_read + Bytes}, + {noreply, State2}. + +add_bytes_write(Data, State) -> + Bytes = byte_size(Data#data.value), + State2 = State#state{bytes_write = State#state.bytes_write + Bytes}, + {noreply, State2}. + +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}; +handle_call(all, _From, State) -> + all(State). +handle_cast(incr_cmd_get, State) -> + incr_cmd_get(State); +handle_cast(incr_cmd_set, State) -> + incr_cmd_set(State); +handle_cast({add_bytes_read, Data}, State) -> + add_bytes_read(Data, State); +handle_cast({add_bytes_write, Data}, State) -> + add_bytes_write(Data, State). +handle_info(_Info, State) -> + {noreply, State}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +stop() -> + gen_server:call(?SERVER, stop). +all() -> + gen_server:call(?SERVER, all). +incr_cmd_get() -> + gen_server:cast(?SERVER, incr_cmd_get). +incr_cmd_set() -> + gen_server:cast(?SERVER, incr_cmd_set). +add_bytes_read(Data) -> + gen_server:cast(?SERVER, {add_bytes_read, Data}). +add_bytes_write(Data) -> + gen_server:cast(?SERVER, {add_bytes_write, Data}). 
Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_store.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_store.erl 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_store.erl 2009-03-06 15:38:51 UTC (rev 115) @@ -13,7 +13,7 @@ -module(kai_store). -export([start_link/0, stop/0]). --export([list/1, get/1, put/1, delete/1]). +-export([list/1, get/1, put/1, delete/1, info/1]). -include("kai.hrl"). @@ -34,3 +34,5 @@ gen_server:call(?SERVER, {put, Data}). delete(Data) -> gen_server:call(?SERVER, {delete, Data}). +info(Name) -> + gen_server:call(?SERVER, {info, Name}). Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_store_dets.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_store_dets.erl 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_store_dets.erl 2009-03-06 15:38:51 UTC (rev 115) @@ -20,6 +20,7 @@ ]). -include("kai.hrl"). +-record(state, {number_of_tables, tables}). start_link(Server) -> gen_server:start_link({local, Server}, ?MODULE, [], _Opts = []). @@ -40,22 +41,21 @@ end, lists:seq(1, NumberOfTables) ), - {ok, Tables}. + {ok, #state{number_of_tables = NumberOfTables, tables = Tables}}. -terminate(_Reason, Tables) -> +terminate(_Reason, State) -> lists:foreach( fun({_I, Table}) -> dets:close(Table) end, - Tables + State#state.tables ), ok. -bucket_to_table(Bucket, Tables) -> - NumberOfTables = kai_config:get(number_of_tables), - I = Bucket rem NumberOfTables + 1, - proplists:get_value(I, Tables). +bucket_to_table(Bucket, State) -> + I = Bucket rem State#state.number_of_tables + 1, + proplists:get_value(I, State#state.tables). 
-do_list(Bucket, Tables) -> - Table = bucket_to_table(Bucket, Tables), +do_list(Bucket, State) -> + Table = bucket_to_table(Bucket, State), Head = #data{ key = '$1', bucket = Bucket, @@ -74,42 +74,56 @@ checksum = '$4' }}], ListOfData = dets:select(Table, [{Head, Cond, Body}]), - {reply, {list_of_data, ListOfData}, Tables}. + {reply, {list_of_data, ListOfData}, State}. -do_get(#data{key=Key, bucket=Bucket} = _Data, Tables) -> - Table = bucket_to_table(Bucket, Tables), +do_get(#data{key=Key, bucket=Bucket} = _Data, State) -> + Table = bucket_to_table(Bucket, State), case dets:lookup(Table, Key) of - [Data] -> {reply, Data, Tables}; - _ -> {reply, undefined, Tables} + [Data] -> {reply, Data, State}; + _ -> {reply, undefined, State} end. -do_put(Data, Tables) when is_record(Data, data) -> - Table = bucket_to_table(Data#data.bucket, Tables), +do_put(Data, State) when is_record(Data, data) -> + Table = bucket_to_table(Data#data.bucket, State), case dets:lookup(Table, Data#data.key) of [StoredData] -> case vclock:descends(Data#data.vector_clocks, StoredData#data.vector_clocks) of - true -> insert_and_reply(Data, Table, Tables); - _ -> {reply, {error, "stale or concurrent state found in kai_store"}, Tables} + true -> insert_and_reply(Data, Table, State); + _ -> {reply, {error, "stale or concurrent state found in kai_store"}, State} end; - _ -> insert_and_reply(Data, Table, Tables) + _ -> insert_and_reply(Data, Table, State) end. -insert_and_reply(Data, Table, Tables) -> +insert_and_reply(Data, Table, State) -> dets:insert(Table, Data), dets:sync(Table), - {reply, ok, Tables}. + {reply, ok, State}. 
-do_delete(#data{key=Key, bucket=Bucket} = _Data, Tables) -> - Table = bucket_to_table(Bucket, Tables), +do_delete(#data{key=Key, bucket=Bucket} = _Data, State) -> + Table = bucket_to_table(Bucket, State), case dets:lookup(Table, Key) of [_Data2] -> dets:delete(Table, Key), dets:sync(Table), - {reply, ok, Tables}; + {reply, ok, State}; _ -> - {reply, undefined, Tables} + {reply, undefined, State} end. +info(Name, State) -> + Values = + lists:map( + fun(I) -> + T = proplists:get_value(I, State#state.tables), + case Name of + bytes -> dets:info(T, file_size); + size -> dets:info(T, size) + end + end, + lists:seq(1, State#state.number_of_tables) + ), + {reply, lists:sum(Values), State}. + handle_call(stop, _From, State) -> {stop, normal, stopped, State}; handle_call({list, Bucket}, _From, State) -> @@ -119,7 +133,9 @@ handle_call({put, Data}, _From, State) -> do_put(Data, State); handle_call({delete, Data}, _From, State) -> - do_delete(Data, State). + do_delete(Data, State); +handle_call({info, Name}, _From, State) -> + info(Name, State). handle_cast(_Msg, State) -> {noreply, State}. handle_info(_Info, State) -> Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_store_ets.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_store_ets.erl 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_store_ets.erl 2009-03-06 15:38:51 UTC (rev 115) @@ -83,6 +83,20 @@ {reply, undefined, State} end. +info(Name, State) -> + Value = + case Name of + bytes -> + % this code roughly estimates the size of stored objects, + % since ets only store a reference to the binary + Ets = erlang:system_info(wordsize) + ets:info(?MODULE, memory), + Bin = erlang:memory(binary), + Ets + Bin; + size -> + ets:info(?MODULE, size) + end, + {reply, Value, State}. 
+ handle_call(stop, _From, State) -> {stop, normal, stopped, State}; handle_call({list, Bucket}, _From, State) -> @@ -92,7 +106,9 @@ handle_call({put, Data}, _From, State) -> do_put(Data, State); handle_call({delete, Data}, _From, State) -> - do_delete(Data, State). + do_delete(Data, State); +handle_call({info, Name}, _From, State) -> + info(Name, State). handle_cast(_Msg, State) -> {noreply, State}. handle_info(_Info, State) -> Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_sup.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_sup.erl 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_sup.erl 2009-03-06 15:38:51 UTC (rev 115) @@ -46,6 +46,12 @@ permanent, 1000, worker, [kai_store] }, + Stat = { + kai_stat, + {kai_stat, start_link, []}, + permanent, 1000, worker, + [kai_stat] + }, Version = { kai_version, {kai_version, start_link, []}, @@ -83,6 +89,6 @@ [kai_memcache] }, {ok, {{one_for_one, 3, 10}, [ - Config, Log, Hash, Store, Version, Connection, Sync, Membership, Rpc, - Memcache + Config, Log, Hash, Store, Stat, Version, Connection, Sync, Membership, + Rpc, Memcache ]}}. 
Modified: branches/cooldaemon_count_current_tcp_connections/test/Makefile =================================================================== --- branches/cooldaemon_count_current_tcp_connections/test/Makefile 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/test/Makefile 2009-03-06 15:38:51 UTC (rev 115) @@ -26,7 +26,7 @@ erlc -W $(ERL_COMPILE_FLAGS) -I$(INCLUDE) $< MODS = kai_config_SUITE kai_log_SUITE kai_hash_SUITE kai_store_SUITE \ - kai_version_SUITE kai_connection_SUITE kai_sync_SUITE \ + kai_stat_SUITE kai_version_SUITE kai_connection_SUITE kai_sync_SUITE \ kai_membership_SUITE kai_coordinator_SUITE kai_tcp_server_SUITE \ kai_rpc_SUITE kai_memcache_SUITE vclock_SUITE Modified: branches/cooldaemon_count_current_tcp_connections/test/kai.coverspec =================================================================== --- branches/cooldaemon_count_current_tcp_connections/test/kai.coverspec 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/test/kai.coverspec 2009-03-06 15:38:51 UTC (rev 115) @@ -1,6 +1,6 @@ {level, details}. {incl_mods, [ - kai_config, kai_log, kai_hash, kai_store, kai_version, + kai_config, kai_log, kai_hash, kai_store, kai_stat, kai_version, kai_connection, kai_sync, kai_membership, kai_coordinator, kai_tcp_server, kai_rpc, kai_memcache ]}. 
Modified: branches/cooldaemon_count_current_tcp_connections/test/kai_hash_SUITE.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/test/kai_hash_SUITE.erl 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/test/kai_hash_SUITE.erl 2009-03-06 15:38:51 UTC (rev 115) @@ -74,9 +74,21 @@ VirtualNodeList1 ), + {bucket_list, BucketList1} = kai_hash:bucket_list(), + ?assertEqual( + [{0, [?NODE1]}, + {1, [?NODE1]}, + {2, [?NODE1]}, + {3, [?NODE1]}, + {4, [?NODE1]}, + {5, [?NODE1]}, + {6, [?NODE1]}, + {7, [?NODE1]}], + BucketList1 + ), + {buckets, Buckets1} = kai_hash:buckets(), - ?assertEqual(8, length(Buckets1)), - ?assertEqual([?NODE1], proplists:get_value(0, Buckets1)), + ?assertEqual([0,1,2,3,4,5,6,7], Buckets1), {replaced_buckets, ReplacedBuckets2} = kai_hash:update_nodes([{?NODE2, ?INFO}, {?NODE3, ?INFO}, {?NODE4, ?INFO}], @@ -106,16 +118,21 @@ VirtualNodeList2 ), + {bucket_list, BucketList2} = kai_hash:bucket_list(), + ?assertEqual( + [{0, [?NODE3, ?NODE2, ?NODE1]}, + {1, [?NODE3, ?NODE2, ?NODE1]}, + {2, [?NODE3, ?NODE2, ?NODE1]}, + {3, [?NODE2, ?NODE1, ?NODE4]}, + {4, [?NODE2, ?NODE1, ?NODE4]}, + {5, [?NODE2, ?NODE4, ?NODE1]}, + {6, [?NODE1, ?NODE3, ?NODE2]}, + {7, [?NODE3, ?NODE2, ?NODE1]}], + BucketList2 + ), + {buckets, Buckets2} = kai_hash:buckets(), - ?assertEqual(8, length(Buckets2)), - ?assertEqual([?NODE3, ?NODE2, ?NODE1], proplists:get_value(0, Buckets2)), - ?assertEqual([?NODE3, ?NODE2, ?NODE1], proplists:get_value(1, Buckets2)), - ?assertEqual([?NODE3, ?NODE2, ?NODE1], proplists:get_value(2, Buckets2)), - ?assertEqual([?NODE2, ?NODE1, ?NODE4], proplists:get_value(3, Buckets2)), - ?assertEqual([?NODE2, ?NODE1, ?NODE4], proplists:get_value(4, Buckets2)), - ?assertEqual([?NODE2, ?NODE4, ?NODE1], proplists:get_value(5, Buckets2)), - ?assertEqual([?NODE1, ?NODE3, ?NODE2], proplists:get_value(6, Buckets2)), - ?assertEqual([?NODE3, ?NODE2, ?NODE1], 
proplists:get_value(7, Buckets2)), + ?assertEqual([0,1,2,3,4,5,6,7], Buckets2), {bucket, Bucket1} = kai_hash:find_bucket("item-1"), ?assertEqual(3, Bucket1), @@ -160,16 +177,21 @@ VirtualNodeList3 ), + {bucket_list, BucketList3} = kai_hash:bucket_list(), + ?assertEqual( + [{0, [?NODE3, ?NODE1, ?NODE4]}, + {1, [?NODE3, ?NODE1, ?NODE4]}, + {2, [?NODE3, ?NODE1, ?NODE4]}, + {3, [?NODE1, ?NODE4, ?NODE3]}, + {4, [?NODE1, ?NODE4, ?NODE3]}, + {5, [?NODE4, ?NODE1, ?NODE3]}, + {6, [?NODE1, ?NODE3, ?NODE4]}, + {7, [?NODE3, ?NODE1, ?NODE4]}], + BucketList3 + ), + {buckets, Buckets3} = kai_hash:buckets(), - ?assertEqual(8, length(Buckets3)), - ?assertEqual([?NODE3, ?NODE1, ?NODE4], proplists:get_value(0, Buckets3)), - ?assertEqual([?NODE3, ?NODE1, ?NODE4], proplists:get_value(1, Buckets3)), - ?assertEqual([?NODE3, ?NODE1, ?NODE4], proplists:get_value(2, Buckets3)), - ?assertEqual([?NODE1, ?NODE4, ?NODE3], proplists:get_value(3, Buckets3)), - ?assertEqual([?NODE1, ?NODE4, ?NODE3], proplists:get_value(4, Buckets3)), - ?assertEqual([?NODE4, ?NODE1, ?NODE3], proplists:get_value(5, Buckets3)), - ?assertEqual([?NODE1, ?NODE3, ?NODE4], proplists:get_value(6, Buckets3)), - ?assertEqual([?NODE3, ?NODE1, ?NODE4], proplists:get_value(7, Buckets3)), + ?assertEqual([0,1,2,3,4,5,6,7], Buckets3), {nodes, Nodes4} = kai_hash:find_nodes("item-1"), ?assertEqual([?NODE1, ?NODE4, ?NODE3], Nodes4), Modified: branches/cooldaemon_count_current_tcp_connections/test/kai_memcache_SUITE.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/test/kai_memcache_SUITE.erl 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/test/kai_memcache_SUITE.erl 2009-03-06 15:38:51 UTC (rev 115) @@ -35,6 +35,7 @@ ]), kai_hash:start_link(), kai_store:start_link(), + kai_stat:start_link(), kai_version:start_link(), kai_connection:start_link(), kai_rpc:start_link(), @@ -65,37 +66,82 @@ {ok, <<"value-1">>}, 
gen_tcp:recv(MemcacheSocket, byte_size(<<"value-1">>)) ), - gen_tcp:recv(MemcacheSocket, byte_size(<<"\r\nEND\r\n">>)), - inet:setopts(MemcacheSocket, [{packet, raw}]), + inet:setopts(MemcacheSocket, [{packet, line}]), + ?assertEqual( + {ok, <<"\r\n">>}, + gen_tcp:recv(MemcacheSocket, 0) + ), + ?assertEqual( + {ok, <<"END\r\n">>}, + gen_tcp:recv(MemcacheSocket, 0) + ), gen_tcp:send(MemcacheSocket, "delete item-1\r\n"), - ?assertEqual( {ok, <<"DELETED\r\n">>}, gen_tcp:recv(MemcacheSocket, 0) ), gen_tcp:send(MemcacheSocket, "delete item-2 0\r\n"), - ?assertEqual( {ok, <<"NOT_FOUND\r\n">>}, gen_tcp:recv(MemcacheSocket, 0) ), gen_tcp:send(MemcacheSocket, "get item-1\r\n"), - ?assertEqual( {ok, <<"END\r\n">>}, gen_tcp:recv(MemcacheSocket, 0) ), gen_tcp:send(MemcacheSocket, "no_such_command\r\n"), - ?assertEqual( {ok, <<"ERROR\r\n">>}, gen_tcp:recv(MemcacheSocket, 0) ), + gen_tcp:send(MemcacheSocket, "stats\r\n"), + {ok, Uptime } = gen_tcp:recv(MemcacheSocket, 0), + {ok, Time } = gen_tcp:recv(MemcacheSocket, 0), + {ok, Version } = gen_tcp:recv(MemcacheSocket, 0), + {ok, Bytes } = gen_tcp:recv(MemcacheSocket, 0), + {ok, CurrItems } = gen_tcp:recv(MemcacheSocket, 0), + {ok, CmdGet } = gen_tcp:recv(MemcacheSocket, 0), + {ok, CmdSet } = gen_tcp:recv(MemcacheSocket, 0), + {ok, BytesRead } = gen_tcp:recv(MemcacheSocket, 0), + {ok, BytesWrite } = gen_tcp:recv(MemcacheSocket, 0), + {ok, LocalNode } = gen_tcp:recv(MemcacheSocket, 0), + {ok, Quorum } = gen_tcp:recv(MemcacheSocket, 0), + {ok, NumberOfBuckets } = gen_tcp:recv(MemcacheSocket, 0), + {ok, NumberOfVirtualNodes} = gen_tcp:recv(MemcacheSocket, 0), + {ok, Store } = gen_tcp:recv(MemcacheSocket, 0), + {ok, Nodes } = gen_tcp:recv(MemcacheSocket, 0), +% {ok, Buckets } = gen_tcp:recv(MemcacheSocket, 0), + {ok, ErlangProcs } = gen_tcp:recv(MemcacheSocket, 0), + {ok, ErlangVersion } = gen_tcp:recv(MemcacheSocket, 0), + {match, _S1, _L1} = regexp:match(binary_to_list(Uptime), "uptime"), + {match, _S2, _L2} = 
regexp:match(binary_to_list(Time), "time"), + {match, _S3, _L3} = regexp:match(binary_to_list(Version), "version"), + {match, _S4, _L4} = regexp:match(binary_to_list(Bytes), "bytes"), + {match, _S5, _L5} = regexp:match(binary_to_list(CurrItems), "curr_items"), + {match, _S6, _L6} = regexp:match(binary_to_list(CmdGet), "cmd_get"), + {match, _S7, _L7} = regexp:match(binary_to_list(CmdSet), "cmd_set"), + {match, _S8, _L8} = regexp:match(binary_to_list(BytesRead), "bytes_read"), + {match, _S9, _L9} = regexp:match(binary_to_list(BytesWrite), "bytes_write"), + {match, _S0, _L0} = regexp:match(binary_to_list(LocalNode), "kai_node"), + {match, _Sa, _La} = regexp:match(binary_to_list(Quorum), "kai_quorum"), + {match, _Sb, _Lb} = regexp:match(binary_to_list(NumberOfBuckets), "kai_number_of_buckets"), + {match, _Sc, _Lc} = regexp:match(binary_to_list(NumberOfVirtualNodes), "kai_number_of_virtual_nodes"), + {match, _Sd, _Ld} = regexp:match(binary_to_list(Store), "kai_store"), + {match, _Se, _Le} = regexp:match(binary_to_list(Nodes), "kai_curr_nodes"), +% {match, _Sf, _Lf} = regexp:match(binary_to_list(Buckets), "kai_curr_buckets"), + {match, _Sg, _Lg} = regexp:match(binary_to_list(ErlangProcs), "erlang_procs"), + {match, _Sh, _Lh} = regexp:match(binary_to_list(ErlangVersion), "erlang_version"), + ?assertEqual( + {ok, <<"END\r\n">>}, + gen_tcp:recv(MemcacheSocket, 0) + ), + gen_tcp:send(MemcacheSocket, "quit\r\n"), gen_tcp:close(MemcacheSocket), @@ -104,6 +150,7 @@ kai_rpc:stop(), kai_connection:stop(), kai_version:stop(), + kai_stat:stop(), kai_store:stop(), kai_hash:stop(), kai_config:stop(). 
Added: branches/cooldaemon_count_current_tcp_connections/test/kai_stat_SUITE.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/test/kai_stat_SUITE.erl (rev 0) +++ branches/cooldaemon_count_current_tcp_connections/test/kai_stat_SUITE.erl 2009-03-06 15:38:51 UTC (rev 115) @@ -0,0 +1,82 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(kai_stat_SUITE). +-compile(export_all). + +-include("kai.hrl"). +-include("kai_test.hrl"). + +all() -> [test_all]. + +test_all() -> []. 
+test_all(_Conf) -> + kai_config:start_link([ + {hostname, "localhost"}, + {rpc_port, 11011}, + {n, 3}, {r, 2}, {w, 2}, + {number_of_buckets, 8}, + {number_of_virtual_nodes, 2}, + {store, ets} + ]), + kai_hash:start_link(), + kai_store:start_link(), + kai_stat:start_link(), + + kai_stat:incr_cmd_get(), + kai_stat:incr_cmd_set(), + + Data = #data{value = (<<"value-1">>)}, + kai_stat:add_bytes_read([Data, Data]), + kai_stat:add_bytes_write(Data), + + [{uptime, Uptime }, + {time, Time }, + {version, Version }, + {bytes, Bytes }, + {curr_items, CurrItems }, + {cmd_get, CmdGet }, + {cmd_set, CmdSet }, + {bytes_read, BytesRead }, + {bytes_write, BytesWrite }, + {kai_node, LocalNode }, + {kai_quorum, Quorum }, + {kai_number_of_buckets, NumberOfBuckets }, + {kai_number_of_virtual_nodes, NumberOfVirtualNodes}, + {kai_store, Store }, + {kai_curr_nodes, Nodes }, +% {kai_curr_buckets, Buckets }, + {erlang_procs, ErlangProcs }, + {erlang_version, ErlangVersion }] = kai_stat:all(), + {match, _S1, _L1} = regexp:match(Uptime, "[0-9]+"), + {match, _S2, _L2} = regexp:match(Time, "[0-9]+"), + {match, _S3, _L3} = regexp:match(Version, "[.0-9]+"), + {match, _S4, _L4} = regexp:match(Bytes, "[0-9]+"), + ?assertEqual("0", CurrItems ), + ?assertEqual("1", CmdGet ), + ?assertEqual("1", CmdSet ), + ?assertEqual("14", BytesRead ), + ?assertEqual("7", BytesWrite ), + ?assertEqual("127.0.0.1:11011", LocalNode ), + ?assertEqual("3,2,2", Quorum ), + ?assertEqual("8", NumberOfBuckets ), + ?assertEqual("2", NumberOfVirtualNodes), + ?assertEqual("ets", Store ), + ?assertEqual("127.0.0.1:11011", Nodes ), +% ?assertEqual("0 1 2 3 4 5 6 7", Buckets ), + {match, _S5, _L5} = regexp:match(ErlangProcs, "[0-9]+"), + {match, _S6, _L6} = regexp:match(ErlangVersion, "[.0-9]+"), + + kai_stat:stop(), + kai_store:stop(), + kai_hash:stop(), + kai_config:stop(). 
Modified: branches/cooldaemon_count_current_tcp_connections/test/kai_store_SUITE.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/test/kai_store_SUITE.erl 2009-02-28 08:39:29 UTC (rev 114) +++ branches/cooldaemon_count_current_tcp_connections/test/kai_store_SUITE.erl 2009-03-06 15:38:51 UTC (rev 115) @@ -124,6 +124,9 @@ ?assertEqual(1, length(ListOfData4)), ?assert(lists:keymember("item-3", 2, ListOfData4)), + ?assert(is_integer(kai_store:info(bytes))), + ?assertEqual(2, kai_store:info(size)), + kai_store:stop(), kai_version:stop(), kai_config:stop(), This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <coo...@us...> - 2009-03-06 15:39:35
|
Revision: 116 http://kai.svn.sourceforge.net/kai/?rev=116&view=rev Author: cooldaemon Date: 2009-03-06 15:39:34 +0000 (Fri, 06 Mar 2009) Log Message: ----------- Fixed the kai_tcp_server. Divided the source file of it. Added the monitor process. Modified Paths: -------------- branches/cooldaemon_count_current_tcp_connections/src/Makefile branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server.erl branches/cooldaemon_count_current_tcp_connections/test/kai.coverspec Added Paths: ----------- branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_acceptor.erl branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_monitor.erl branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_sup.erl Modified: branches/cooldaemon_count_current_tcp_connections/src/Makefile =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/Makefile 2009-03-06 15:38:51 UTC (rev 115) +++ branches/cooldaemon_count_current_tcp_connections/src/Makefile 2009-03-06 15:39:34 UTC (rev 116) @@ -22,7 +22,9 @@ SOURCES = \ kai_config kai_log kai_hash kai_store kai_store_ets kai_store_dets \ kai_stat kai_version kai_connection kai_sync kai_membership \ - kai_coordinator kai_tcp_server kai_rpc kai_memcache kai_sup kai vclock + kai_coordinator kai_tcp_server \ + kai_tcp_server_sup kai_tcp_server_acceptor kai_tcp_server_monitor \ + kai_rpc kai_memcache kai_sup kai vclock MODS = ${SOURCES:%=$(EBIN)/%.$(EMULATOR)} $(APP_TARGET) Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server.erl 2009-03-06 15:38:51 UTC (rev 115) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server.erl 2009-03-06 15:39:34 UTC (rev 116) @@ -11,13 +11,10 @@ % the License. -module(kai_tcp_server). --behaviour(supervisor). 
-export([behaviour_info/1]). -export([start_link/1, start_link/2, start_link/3, start_link/4]). -export([stop/0, stop/1]). --export([init/1, acceptor_init/5]). --export([acceptor_start_link/5]). -include("kai.hrl"). @@ -25,146 +22,15 @@ behaviour_info(callbacks) -> [{init, 1}, {handle_call, 3}]; behaviour_info(_Other) -> undefined. -% Supervisor - tcp_server -%% External APIs +% External APIs start_link(Mod) -> start_link(Mod, []). start_link(Mod, Args) -> start_link(Mod, Args, #tcp_server_option{}). start_link(Mod, Args, Option) -> start_link({local, ?MODULE}, Mod, Args, Option). start_link(Name, Mod, Args, Option) -> - supervisor:start_link(Name, ?MODULE, [Name, Mod, Args, Option]). + kai_tcp_server_sup:start_link(Name, Mod, Args, Option). stop() -> stop(?MODULE). stop(Name) -> - case whereis(Name) of - Pid when is_pid(Pid) -> - exit(Pid, normal), - ok; - _ -> not_started - end. + kai_tcp_server_sup:stop(Name). -%% Callbacks -init([Name, Mod, Args, Option]) -> - case Mod:init(Args) of - {ok, State} -> listen(State, Name, Mod, Option); - {stop, Reason} -> Reason; - Other -> Other % 'ignore' is contained. - end. - -%% Internal Functions -listen(State, Name, Mod, Option) -> - case gen_tcp:listen( - Option#tcp_server_option.port, - Option#tcp_server_option.listen - ) of - {ok, ListenSocket} -> - init_acceptors(ListenSocket, State, Name, Mod, Option); - {error, Reason} -> - ?warning(io_lib:format("listen(~p) ~p", [Mod, {error, Reason}])), - {stop, Reason} - end. 
- -init_acceptors(ListenSocket, State, {Dest, Name}, Mod, Option) -> - #tcp_server_option{ - max_processes = MaxProcesses, - max_restarts = MaxRestarts, - time = Time, - shutdown = Shutdown - } = Option, - {ok, {{one_for_one, MaxRestarts, Time}, lists:map( - fun (N) -> - AcceptorName = list_to_atom( - atom_to_list(Name) ++ "_acceptor_" ++ integer_to_list(N) - ), - { - AcceptorName, - { - ?MODULE, - acceptor_start_link, - [{Dest, AcceptorName}, ListenSocket, State, Mod, Option] - }, - permanent, - Shutdown, - worker, - [] - } - end, - lists:seq(1, MaxProcesses) - )}}. - -% ProcLib - tcp_acceptor_N -%% External APIs -acceptor_start_link({Dest, Name}, ListenSocket, State, Mod, Option) -> - {ok, Pid} = proc_lib:start_link( - ?MODULE, acceptor_init, [self(), ListenSocket, State, Mod, Option] - ), - case Dest of - local -> register(Name, Pid); - _Global -> global:register_name(Name, Pid) - end, - {ok, Pid}. - -%% Callbacks -acceptor_init(Parent, ListenSocket, State, Mod, Option) -> - proc_lib:init_ack(Parent, {ok, self()}), - acceptor_accept(ListenSocket, State, Mod, Option). - -acceptor_accept(ListenSocket, State, Mod, Option) -> - case gen_tcp:accept( - ListenSocket, Option#tcp_server_option.accept_timeout - ) of - {ok, Socket} -> - acceptor_loop( - proplists:get_value(active, Option#tcp_server_option.listen), - Socket, State, Mod, Option - ), - gen_tcp:close(Socket); - {error, Reason} -> - ?warning(io_lib:format("acceptor_accept(~p) ~p", [Mod, {error, Reason}])), - timer:sleep(Option#tcp_server_option.accept_error_sleep_time) - end, - acceptor_accept(ListenSocket, State, Mod, Option). 
- -acceptor_loop(false, Socket, State, Mod, Option) -> - case gen_tcp:recv( - Socket, - Option#tcp_server_option.recv_length, - Option#tcp_server_option.recv_timeout - ) of - {ok, Data} -> - call_mod(false, Socket, Data, State, Mod, Option); - {error, closed} -> - tcp_closed; - {error, Reason} -> - ?warning(io_lib:format("acceptor_loop(~p) ~p", [Mod, {error, Reason}])), - error - end; - -acceptor_loop(true, _DummySocket, State, Mod, Option) -> - receive - {tcp, Socket, Data} -> - call_mod(true, Socket, Data, State, Mod, Option); - {tcp_closed, _Socket} -> - tcp_closed; - Error -> - ?warning(io_lib:format("acceptor_loop(~p) ~p", [Mod, {error, Error}])), - error - after Option#tcp_server_option.recv_timeout -> - tcp_timeout - end. - -call_mod(Active, Socket, Data, State, Mod, Option) -> - case Mod:handle_call(Socket, Data, State) of - {reply, DataToSend, State} -> - gen_tcp:send(Socket, DataToSend), - acceptor_loop(Active, Socket, State, Mod, Option); - {noreply, State} -> - acceptor_loop(Active, Socket, State, Mod, Option); - {close, State} -> - tcp_closed; - {close, DataToSend, State} -> - gen_tcp:send(Socket, DataToSend); - Other -> - ?warning(io_lib:format("call_mod(~p) ~p", [Mod, {unexpected_result, Other}])) - end. - Added: branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_acceptor.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_acceptor.erl (rev 0) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_acceptor.erl 2009-03-06 15:39:34 UTC (rev 116) @@ -0,0 +1,98 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(kai_tcp_server_acceptor). + +-export([start_link/6]). +-export([init/6]). + +-include("kai.hrl"). + +% External APIs +start_link({Dest, Name}, ListenSocket, State, MonitorName, Mod, Option) -> + {ok, Pid} = proc_lib:start_link( + ?MODULE, init, + [self(), ListenSocket, State, MonitorName, Mod, Option] + ), + case Dest of + local -> register(Name, Pid); + _Global -> global:register_name(Name, Pid) + end, + {ok, Pid}. + +% Callbacks +init(Parent, ListenSocket, State, MonitorName, Mod, Option) -> + proc_lib:init_ack(Parent, {ok, self()}), + kai_tcp_server_monitor:regist(MonitorName, self()), + accept(ListenSocket, State, MonitorName, Mod, Option). + +accept(ListenSocket, State, MonitorName, Mod, Option) -> + case gen_tcp:accept( + ListenSocket, Option#tcp_server_option.accept_timeout + ) of + {ok, Socket} -> + kai_tcp_server_monitor:increment(MonitorName, self()), + recv( + proplists:get_value(active, Option#tcp_server_option.listen), + Socket, State, Mod, Option + ), + kai_tcp_server_monitor:decrement(MonitorName, self()), + gen_tcp:close(Socket); + {error, Reason} -> + ?warning(io_lib:format("acceptor_accept(~p) ~p", [Mod, {error, Reason}])), + timer:sleep(Option#tcp_server_option.accept_error_sleep_time) + end, + accept(ListenSocket, State, MonitorName, Mod, Option). 
+ +recv(false, Socket, State, Mod, Option) -> + case gen_tcp:recv( + Socket, + Option#tcp_server_option.recv_length, + Option#tcp_server_option.recv_timeout + ) of + {ok, Data} -> + call_mod(false, Socket, Data, State, Mod, Option); + {error, closed} -> + tcp_closed; + {error, Reason} -> + ?warning(io_lib:format("acceptor_loop(~p) ~p", [Mod, {error, Reason}])), + error + end; + +recv(true, _DummySocket, State, Mod, Option) -> + receive + {tcp, Socket, Data} -> + call_mod(true, Socket, Data, State, Mod, Option); + {tcp_closed, _Socket} -> + tcp_closed; + Error -> + ?warning(io_lib:format("acceptor_loop(~p) ~p", [Mod, {error, Error}])), + error + after Option#tcp_server_option.recv_timeout -> + tcp_timeout + end. + +call_mod(Active, Socket, Data, State, Mod, Option) -> + case Mod:handle_call(Socket, Data, State) of + {reply, DataToSend, State} -> + gen_tcp:send(Socket, DataToSend), + recv(Active, Socket, State, Mod, Option); + {noreply, State} -> + recv(Active, Socket, State, Mod, Option); + {close, State} -> + tcp_closed; + {close, DataToSend, State} -> + gen_tcp:send(Socket, DataToSend); + Other -> + ?warning(io_lib:format("call_mod(~p) ~p", [Mod, {unexpected_result, Other}])) + end. + Added: branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_monitor.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_monitor.erl (rev 0) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_monitor.erl 2009-03-06 15:39:34 UTC (rev 116) @@ -0,0 +1,92 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(kai_tcp_server_monitor). +-behaviour(gen_server). + +-export([start_link/1, stop/1]). +-export([ + regist/2, + increment/2, decrement/2, + state/1 +]). +-export([ + init/1, + handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3 +]). + +-include("kai.hrl"). + +% External APIs +start_link(Name) -> + gen_server:start_link(Name, ?MODULE, [], []). + +stop(ServerRef) -> + gen_server:call(ServerRef, stop). + +regist(ServerRef, Pid) -> + gen_server:call(ServerRef, {regist, Pid}). + +increment(ServerRef, Pid) -> + gen_server:cast(ServerRef, {increment, Pid}). + +decrement(ServerRef, Pid) -> + gen_server:cast(ServerRef, {decrement, Pid}). + +state(ServerRef) -> + gen_server:call(ServerRef, state). + +% Callbacks +init(_Args) -> + {ok, {_MonitorRefs = [], _Pids = []}}. + +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}; + +handle_call({regist, Pid}, _From, {MonitorRefs, Pids}) -> + {reply, ok, { + [erlang:monitor(process, Pid) | MonitorRefs], + Pids + }}; + +handle_call(state, _From, State) -> + {reply, State, State}; + +handle_call(_Message, _From, State) -> + {reply, ok, State}. + +handle_cast({increment, Pid}, {MonitorRefs, Pids}) -> + {noreply, {MonitorRefs, [Pid | Pids]}}; + +handle_cast({decrement, Pid}, {MonitorRefs, Pids}) -> + {noreply, {MonitorRefs, lists:delete(Pid, Pids)}}; + +handle_cast(_Message, State) -> + {noreply, State}. 
+ +handle_info({'DOWN', MonitorRef, _Type, Pid, _Info}, {MonitorRefs, Pids}) -> + erlang:demonitor(MonitorRef), + {noreply, { + lists:delete(MonitorRef, MonitorRefs), + lists:delete(Pid, Pids) + }}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + Added: branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_sup.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_sup.erl (rev 0) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_sup.erl 2009-03-06 15:39:34 UTC (rev 116) @@ -0,0 +1,128 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(kai_tcp_server_sup). +-behaviour(supervisor). + +-export([start_link/4, stop/1]). +-export([init/1]). + +-include("kai.hrl"). + +% External APIs +start_link(Name, Mod, Args, Option) -> + supervisor:start_link(Name, ?MODULE, [Name, Mod, Args, Option]). + +stop(Name) -> + case whereis(Name) of + Pid when is_pid(Pid) -> + exit(Pid, normal), + ok; + _ -> not_started + end. + +% Callbacks +init([Name, Mod, Args, Option]) -> + case Mod:init(Args) of + {ok, State} -> listen(State, Name, Mod, Option); + {stop, Reason} -> Reason; + Other -> Other % 'ignore' is contained. + end. 
+ +% Internal Functions +listen(State, Name, Mod, Option) -> + case gen_tcp:listen( + Option#tcp_server_option.port, + Option#tcp_server_option.listen + ) of + {ok, ListenSocket} -> + build_result(ListenSocket, State, Name, Mod, Option); + {error, Reason} -> + ?warning(io_lib:format("listen(~p) ~p", [Mod, {error, Reason}])), + {stop, Reason} + end. + +build_result(ListenSocket, State, {Dest, Name}, Mod, Option) -> + #tcp_server_option{ + max_restarts = MaxRestarts, + time = Time + } = Option, + MonitorName = build_monitor_name(Name), + {ok, { + {one_for_one, MaxRestarts, Time}, + [ + monitor_spec({Dest, MonitorName}) | + acceptor_specs( + ListenSocket, State, {Dest, Name}, MonitorName, Mod, Option + ) + ] + }}. + +monitor_spec({Dest, MonitorName}) -> + { + MonitorName, + { + kai_tcp_server_monitor, + start_link, + [{Dest, MonitorName}] + }, + permanent, + brutal_kill, + worker, + [] + }. + +acceptor_specs( + ListenSocket, State, {Dest, Name}, MonitorBaseName, Mod, Option +) -> + #tcp_server_option{ + max_processes = MaxProcesses, + shutdown = Shutdown + } = Option, + MonitorName = case Dest of + local -> MonitorBaseName; + _Global -> {Dest, MonitorBaseName} + end, + lists:map( + fun (N) -> + AcceptorName = build_acceptor_name(Name, N), + { + AcceptorName, + { + kai_tcp_server_acceptor, + start_link, + [ + {Dest, AcceptorName}, + ListenSocket, + State, + MonitorName, + Mod, + Option + ] + }, + permanent, + Shutdown, + worker, + [] + } + end, + lists:seq(1, MaxProcesses) + ). + +build_monitor_name(Prefix) -> + list_to_atom(atom_to_list(Prefix) ++ "_monitor"). + +build_acceptor_name(Prefix, Number) -> + list_to_atom( + atom_to_list(Prefix) ++ "_acceptor_" ++ integer_to_list(Number) + ). 
+ Modified: branches/cooldaemon_count_current_tcp_connections/test/kai.coverspec =================================================================== --- branches/cooldaemon_count_current_tcp_connections/test/kai.coverspec 2009-03-06 15:38:51 UTC (rev 115) +++ branches/cooldaemon_count_current_tcp_connections/test/kai.coverspec 2009-03-06 15:39:34 UTC (rev 116) @@ -2,6 +2,8 @@ {incl_mods, [ kai_config, kai_log, kai_hash, kai_store, kai_stat, kai_version, kai_connection, kai_sync, kai_membership, kai_coordinator, - kai_tcp_server, kai_rpc, kai_memcache + kai_tcp_server, + kai_tcp_server_sup, kai_tcp_server_acceptor, kai_tcp_server_monitor, + kai_rpc, kai_memcache ]}. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <coo...@us...> - 2009-03-15 07:21:24
|
Revision: 120 http://kai.svn.sourceforge.net/kai/?rev=120&view=rev Author: cooldaemon Date: 2009-03-15 07:21:09 +0000 (Sun, 15 Mar 2009) Log Message: ----------- Merge branch 'master' into count_current_tcp_connections Modified Paths: -------------- branches/cooldaemon_count_current_tcp_connections/src/kai_memcache.erl branches/cooldaemon_count_current_tcp_connections/test/kai_coordinator_SUITE.erl branches/cooldaemon_count_current_tcp_connections/test/kai_memcache_SUITE.erl Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_memcache.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_memcache.erl 2009-03-13 15:05:25 UTC (rev 119) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_memcache.erl 2009-03-15 07:21:09 UTC (rev 120) @@ -78,6 +78,14 @@ ), {reply, [Response|"END\r\n"], State}; +dispatch(_Socket, ["version"], State) -> + Version = + case application:get_key(kai, vsn) of + {ok, V} -> V; + _ -> "0" + end, + {reply, "VERSION " ++ Version ++ "\r\n", State}; + dispatch(_Socket, ["quit"], _State) -> quit; dispatch(_Socket, _Unknown, State) -> Modified: branches/cooldaemon_count_current_tcp_connections/test/kai_coordinator_SUITE.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/test/kai_coordinator_SUITE.erl 2009-03-13 15:05:25 UTC (rev 119) +++ branches/cooldaemon_count_current_tcp_connections/test/kai_coordinator_SUITE.erl 2009-03-15 07:21:09 UTC (rev 120) @@ -17,7 +17,8 @@ -include("kai_test.hrl"). -include("ct.hrl"). -all() -> [test1, get_concurrent_data, put_concurrent_data]. +all() -> [test1, get_concurrent_data, put_concurrent_data, + put_and_overwrite_stale_data, put_and_fail_overwrite_stale_data]. init_per_suite(Config) -> net_kernel:start([hoge, shortnames]), @@ -143,6 +144,46 @@ p("put error reason:", Reason), ok. +put_and_overwrite_stale_data() -> []. 
+put_and_overwrite_stale_data(Config) ->
+    Key = "key3",
+    Node1 = ?config(node1, Config),
+    _Node2 = ?config(node2, Config),
+    ok = rpc:call(Node1, kai_coordinator, route, [{put, #data{key=Key, value="value1"}}]),
+    [Data] = rpc:call(Node1, kai_coordinator, route, [{get, #data{key=Key}}]),
+    IntentionalAhreadVCAtNode1 = vclock:increment(rpc:call(Node1, kai_config, get, [node]),
+                                                  Data#data.vector_clocks),
+    ok = rpc:call(Node1, kai_store, put, [Data#data{vector_clocks=IntentionalAhreadVCAtNode1}]),
+    ok = rpc:call(Node1,
+                  kai_coordinator, route,
+                  [{put, #data{key=Key,
+                               flags = "0",
+                               value = (<<"value-1">>)}
+                   }]),
+    ok.
+
+put_and_fail_overwrite_stale_data() -> [].
+put_and_fail_overwrite_stale_data(Config) ->
+    Key = "key3",
+    Node1 = ?config(node1, Config),
+    Node2 = ?config(node2, Config),
+    ok = rpc:call(Node1, kai_coordinator, route, [{put, #data{key=Key, value="value1"}}]),
+    [Data] = rpc:call(Node1, kai_coordinator, route, [{get, #data{key=Key}}]),
+    IntentionalAhreadVCAtNode1 = vclock:increment(rpc:call(Node1, kai_config, get, [node]),
+                                                  Data#data.vector_clocks),
+    ok = rpc:call(Node1, kai_store, put, [Data#data{vector_clocks=IntentionalAhreadVCAtNode1}]),
+    %% FIXME: this rpc:call should succeed. Correct pattern matching is as below:
+    %% ok = rpc:call(Node2,
+    {error, Reason} = rpc:call(Node2,
+                  kai_coordinator, route,
+                  [{put, #data{key=Key,
+                               flags = "0",
+                               value = (<<"value-1">>)}
+                   }]),
+    ?assertEqual(ebusy, Reason),
+    p("put error reason:", Reason),
+    ok.
+ start_kai_node(MemcachePort, N, R, W) -> {ok, Node} = slave:start(net_adm:localhost(), list_to_atom("kai_test_" ++ integer_to_list(MemcachePort)), Modified: branches/cooldaemon_count_current_tcp_connections/test/kai_memcache_SUITE.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/test/kai_memcache_SUITE.erl 2009-03-13 15:05:25 UTC (rev 119) +++ branches/cooldaemon_count_current_tcp_connections/test/kai_memcache_SUITE.erl 2009-03-15 07:21:09 UTC (rev 120) @@ -142,6 +142,10 @@ gen_tcp:recv(MemcacheSocket, 0) ), + gen_tcp:send(MemcacheSocket, "version\r\n"), + {ok, Version2} = gen_tcp:recv(MemcacheSocket, 0), + {match, _Si, _Li} = regexp:match(binary_to_list(Version2), "VERSION "), + gen_tcp:send(MemcacheSocket, "quit\r\n"), gen_tcp:close(MemcacheSocket), This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <coo...@us...> - 2009-03-15 07:21:57
|
Revision: 122 http://kai.svn.sourceforge.net/kai/?rev=122&view=rev Author: cooldaemon Date: 2009-03-15 07:21:50 +0000 (Sun, 15 Mar 2009) Log Message: ----------- Caught all the errors of the kai_tcp_server callback module. Modified Paths: -------------- branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_acceptor.erl branches/cooldaemon_count_current_tcp_connections/test/kai_tcp_server_SUITE.erl Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_acceptor.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_acceptor.erl 2009-03-15 07:21:28 UTC (rev 121) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_acceptor.erl 2009-03-15 07:21:50 UTC (rev 122) @@ -40,15 +40,27 @@ ListenSocket, Option#tcp_server_option.accept_timeout ) of {ok, Socket} -> - kai_tcp_server_monitor:increment(MonitorName, self()), - recv( - proplists:get_value(active, Option#tcp_server_option.listen), - Socket, State, Mod, Option - ), - kai_tcp_server_monitor:decrement(MonitorName, self()), - gen_tcp:close(Socket); + try + kai_tcp_server_monitor:increment(MonitorName, self()), + recv( + proplists:get_value( + active, Option#tcp_server_option.listen + ), + Socket, State, Mod, Option + ) + catch + Type:Reason -> + ?warning(io_lib:format( + "accept(~p) ~p", [Mod, {Type, Reason}] + )) + after + kai_tcp_server_monitor:decrement(MonitorName, self()), + gen_tcp:close(Socket) + end; {error, Reason} -> - ?warning(io_lib:format("acceptor_accept(~p) ~p", [Mod, {error, Reason}])), + ?warning(io_lib:format( + "accept(~p) ~p", [Mod, {error, Reason}] + )), timer:sleep(Option#tcp_server_option.accept_error_sleep_time) end, accept(ListenSocket, State, MonitorName, Mod, Option). 
@@ -64,7 +76,7 @@ {error, closed} -> tcp_closed; {error, Reason} -> - ?warning(io_lib:format("acceptor_loop(~p) ~p", [Mod, {error, Reason}])), + ?warning(io_lib:format("recv(~p) ~p", [Mod, {error, Reason}])), error end; @@ -75,7 +87,7 @@ {tcp_closed, _Socket} -> tcp_closed; Error -> - ?warning(io_lib:format("acceptor_loop(~p) ~p", [Mod, {error, Error}])), + ?warning(io_lib:format("recv(~p) ~p", [Mod, {error, Error}])), error after Option#tcp_server_option.recv_timeout -> tcp_timeout @@ -93,6 +105,8 @@ {close, DataToSend, State} -> gen_tcp:send(Socket, DataToSend); Other -> - ?warning(io_lib:format("call_mod(~p) ~p", [Mod, {unexpected_result, Other}])) + ?warning(io_lib:format( + "call_mod(~p) ~p", [Mod, {unexpected_result, Other}] + )) end. Modified: branches/cooldaemon_count_current_tcp_connections/test/kai_tcp_server_SUITE.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/test/kai_tcp_server_SUITE.erl 2009-03-15 07:21:28 UTC (rev 121) +++ branches/cooldaemon_count_current_tcp_connections/test/kai_tcp_server_SUITE.erl 2009-03-15 07:21:50 UTC (rev 122) @@ -18,30 +18,52 @@ -include("kai.hrl"). -include("kai_test.hrl"). -%sequences() -> [{sequences1, [testcase1, testcase2]}]. -sequences() -> [{sequences1, [testcase1]}]. +sequences() -> [{sequences1, [testcase1, testcase2, testcase3]}]. all() -> [{sequence, sequences1}]. init_per_testcase(testcase1, Config) -> + start_server(), + Config; + +init_per_testcase(testcase2, Config) -> + start_server(), + Config; + +init_per_testcase(testcase3, Config) -> + start_server(), + Config; + +init_per_testcase(_TestCase, Config) -> + Config. + +start_server() -> kai_tcp_server:start_link( ?MODULE, [], #tcp_server_option{max_processes=1} - ), - Config; + ). -init_per_testcase(_TestCase, Config) -> Config. - end_per_testcase(testcase1, _Config) -> kai_tcp_server:stop(), ok; -end_per_testcase(_TestCase, _Config) -> ok. 
+end_per_testcase(testcase2, _Config) -> + kai_tcp_server:stop(), + ok; +end_per_testcase(testcase3, _Config) -> + kai_tcp_server:stop(), + ok; + +end_per_testcase(_TestCase, _Config) -> + ok. + testcase1() -> []. testcase1(_Conf) -> - {ok, Socket} = gen_tcp:connect( - {127,0,0,1}, 11211, [binary, {packet, line}, {active, false}] - ), + normal_test(), + ok. + +normal_test() -> + {ok, Socket} = connect_to_echo_server(), gen_tcp:send(Socket, <<"hello\r\n">>), case gen_tcp:recv(Socket, 0) of {ok, <<"hello\r\n">>} -> ok; @@ -55,11 +77,37 @@ gen_tcp:close(Socket), ok. +testcase2() -> []. +testcase2(_Conf) -> + {ok, Socket} = connect_to_echo_server(), + gen_tcp:send(Socket, <<"error\r\n">>), + {error, closed} = gen_tcp:recv(Socket, 0), + gen_tcp:close(Socket), + + normal_test(), % check the echo server rebooted. + ok. + +testcase3() -> []. +testcase3(_Conf) -> + lists:foreach(fun (_N) -> + {ok, Socket} = connect_to_echo_server(), + gen_tcp:close(Socket) + end, lists:seq(1, 10000)), + ok. + +connect_to_echo_server() -> + gen_tcp:connect( + {127,0,0,1}, 11211, [binary, {packet, line}, {active, false}] + ). + % echo server init(_Args) -> {ok, {}}. handle_call(_Socket, <<"bye\r\n">>, State) -> {close, <<"cya\r\n">>, State}; +handle_call(_Socket, <<"error\r\n">>, State) -> + BadArith = 1/0, + {close, <<"error\r\n">>, State}; handle_call(_Socket, Data, State) -> {reply, Data, State}. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <coo...@us...> - 2009-03-15 07:22:16
|
Revision: 123 http://kai.svn.sourceforge.net/kai/?rev=123&view=rev Author: cooldaemon Date: 2009-03-15 07:22:15 +0000 (Sun, 15 Mar 2009) Log Message: ----------- Added the function for getting the curr_connections. Modified Paths: -------------- branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server.erl branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_acceptor.erl branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_monitor.erl branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_sup.erl branches/cooldaemon_count_current_tcp_connections/test/kai_tcp_server_SUITE.erl Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server.erl 2009-03-15 07:21:50 UTC (rev 122) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server.erl 2009-03-15 07:22:15 UTC (rev 123) @@ -15,6 +15,7 @@ -export([behaviour_info/1]). -export([start_link/1, start_link/2, start_link/3, start_link/4]). -export([stop/0, stop/1]). +-export([info/1, info/2]). -include("kai.hrl"). @@ -34,3 +35,9 @@ stop(Name) -> kai_tcp_server_sup:stop(Name). +info(Key) -> info(?MODULE, Key). +info(Name, Key) -> + kai_tcp_server_monitor:info( + kai_tcp_server_sup:build_monitor_name(Name), Key + ). + Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_acceptor.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_acceptor.erl 2009-03-15 07:21:50 UTC (rev 122) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_acceptor.erl 2009-03-15 07:22:15 UTC (rev 123) @@ -35,6 +35,7 @@ kai_tcp_server_monitor:register(MonitorName, self()), accept(ListenSocket, State, MonitorName, Mod, Option). 
+% Internal Functions accept(ListenSocket, State, MonitorName, Mod, Option) -> case gen_tcp:accept( ListenSocket, Option#tcp_server_option.accept_timeout Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_monitor.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_monitor.erl 2009-03-15 07:21:50 UTC (rev 122) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_monitor.erl 2009-03-15 07:22:15 UTC (rev 123) @@ -17,7 +17,7 @@ -export([ register/2, increment/2, decrement/2, - state/1 + info/2 ]). -export([ init/1, @@ -43,8 +43,8 @@ decrement(ServerRef, Pid) -> gen_server:cast(ServerRef, {decrement, Pid}). -state(ServerRef) -> - gen_server:call(ServerRef, state). +info(ServerRef, Key) -> + gen_server:call(ServerRef, {info, Key}). % Callbacks init(_Args) -> @@ -59,8 +59,8 @@ Pids }}; -handle_call(state, _From, State) -> - {reply, State, State}; +handle_call({info, Key}, _From, State) -> + {reply, state_to_info(State, Key), State}; handle_call(_Message, _From, State) -> {reply, ok, State}. @@ -90,3 +90,10 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. +% Internal Functions +state_to_info({_MonitorRefs, Pids}, curr_connections) -> + length(Pids); + +state_to_info(_State, _Key) -> + undefined. + Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_sup.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_sup.erl 2009-03-15 07:21:50 UTC (rev 122) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_tcp_server_sup.erl 2009-03-15 07:22:15 UTC (rev 123) @@ -15,6 +15,7 @@ -export([start_link/4, stop/1]). -export([init/1]). +-export([build_monitor_name/1]). -include("kai.hrl"). 
Modified: branches/cooldaemon_count_current_tcp_connections/test/kai_tcp_server_SUITE.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/test/kai_tcp_server_SUITE.erl 2009-03-15 07:21:50 UTC (rev 122) +++ branches/cooldaemon_count_current_tcp_connections/test/kai_tcp_server_SUITE.erl 2009-03-15 07:22:15 UTC (rev 123) @@ -18,43 +18,27 @@ -include("kai.hrl"). -include("kai_test.hrl"). -sequences() -> [{sequences1, [testcase1, testcase2, testcase3]}]. +sequences() -> + [{sequences1, [testcase1, testcase2, testcase3, testcase4]}]. -all() -> [{sequence, sequences1}]. +all() -> + [{sequence, sequences1}]. -init_per_testcase(testcase1, Config) -> - start_server(), +init_per_testcase(testcase4, Config) -> + start_server(10), Config; -init_per_testcase(testcase2, Config) -> - start_server(), - Config; - -init_per_testcase(testcase3, Config) -> - start_server(), - Config; - init_per_testcase(_TestCase, Config) -> + start_server(1), Config. -start_server() -> +start_server(MaxProcesses) -> kai_tcp_server:start_link( - ?MODULE, [], #tcp_server_option{max_processes=1} + ?MODULE, [], #tcp_server_option{max_processes=MaxProcesses} ). -end_per_testcase(testcase1, _Config) -> - kai_tcp_server:stop(), - ok; - -end_per_testcase(testcase2, _Config) -> - kai_tcp_server:stop(), - ok; - -end_per_testcase(testcase3, _Config) -> - kai_tcp_server:stop(), - ok; - end_per_testcase(_TestCase, _Config) -> + kai_tcp_server:stop(), ok. testcase1() -> []. @@ -83,7 +67,6 @@ gen_tcp:send(Socket, <<"error\r\n">>), {error, closed} = gen_tcp:recv(Socket, 0), gen_tcp:close(Socket), - normal_test(), % check the echo server rebooted. ok. @@ -95,6 +78,21 @@ end, lists:seq(1, 10000)), ok. +testcase4() -> []. 
+testcase4(_Conf) -> + Sockets = lists:map(fun (_N) -> + {ok, Socket} = connect_to_echo_server(), + Socket + end, lists:seq(1, 5)), + case kai_tcp_server:info(curr_connections) of + 5 -> ok; + Error -> + ct:comment(io:format("bad_info:~p", [Error])), + ct:fail(bad_info) + end, + lists:foreach(fun (Socket) -> gen_tcp:close(Socket) end, Sockets), + ok. + connect_to_echo_server() -> gen_tcp:connect( {127,0,0,1}, 11211, [binary, {packet, line}, {active, false}] This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <coo...@us...> - 2009-03-19 10:02:02
|
Revision: 127 http://kai.svn.sourceforge.net/kai/?rev=127&view=rev Author: cooldaemon Date: 2009-03-19 10:01:53 +0000 (Thu, 19 Mar 2009) Log Message: ----------- curr_connections was added to the status. Modified Paths: -------------- branches/cooldaemon_count_current_tcp_connections/src/kai_stat.erl branches/cooldaemon_count_current_tcp_connections/test/kai_memcache_SUITE.erl branches/cooldaemon_count_current_tcp_connections/test/kai_stat_SUITE.erl Modified: branches/cooldaemon_count_current_tcp_connections/src/kai_stat.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/src/kai_stat.erl 2009-03-19 10:01:05 UTC (rev 126) +++ branches/cooldaemon_count_current_tcp_connections/src/kai_stat.erl 2009-03-19 10:01:53 UTC (rev 127) @@ -92,6 +92,9 @@ curr_items -> Size = kai_store:info(size), {curr_items, integer_to_list(Size)}; + curr_connections -> + Connections = kai_tcp_server:info(kai_memcache, curr_connections), + {curr_connections, integer_to_list(Connections)}; cmd_get -> {cmd_get, integer_to_list(State#state.cmd_get)}; cmd_set -> @@ -131,7 +134,8 @@ Stats = lists:map( fun(Name) -> stat(Name, State) end, - [uptime, time, version, bytes, curr_items, cmd_get, cmd_set, + [uptime, time, version, bytes, + curr_items, curr_connections, cmd_get, cmd_set, bytes_read, bytes_write, kai_node, kai_quorum, kai_number_of_buckets, kai_number_of_virtual_nodes, kai_store, kai_curr_nodes, Modified: branches/cooldaemon_count_current_tcp_connections/test/kai_memcache_SUITE.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/test/kai_memcache_SUITE.erl 2009-03-19 10:01:05 UTC (rev 126) +++ branches/cooldaemon_count_current_tcp_connections/test/kai_memcache_SUITE.erl 2009-03-19 10:01:53 UTC (rev 127) @@ -106,6 +106,7 @@ {ok, Version } = gen_tcp:recv(MemcacheSocket, 0), {ok, Bytes } = gen_tcp:recv(MemcacheSocket, 0), {ok, CurrItems } = 
gen_tcp:recv(MemcacheSocket, 0), + {ok, CurrConnections } = gen_tcp:recv(MemcacheSocket, 0), {ok, CmdGet } = gen_tcp:recv(MemcacheSocket, 0), {ok, CmdSet } = gen_tcp:recv(MemcacheSocket, 0), {ok, BytesRead } = gen_tcp:recv(MemcacheSocket, 0), @@ -124,19 +125,20 @@ {match, _S3, _L3} = regexp:match(binary_to_list(Version), "version"), {match, _S4, _L4} = regexp:match(binary_to_list(Bytes), "bytes"), {match, _S5, _L5} = regexp:match(binary_to_list(CurrItems), "curr_items"), - {match, _S6, _L6} = regexp:match(binary_to_list(CmdGet), "cmd_get"), - {match, _S7, _L7} = regexp:match(binary_to_list(CmdSet), "cmd_set"), - {match, _S8, _L8} = regexp:match(binary_to_list(BytesRead), "bytes_read"), - {match, _S9, _L9} = regexp:match(binary_to_list(BytesWrite), "bytes_write"), - {match, _S0, _L0} = regexp:match(binary_to_list(LocalNode), "kai_node"), - {match, _Sa, _La} = regexp:match(binary_to_list(Quorum), "kai_quorum"), - {match, _Sb, _Lb} = regexp:match(binary_to_list(NumberOfBuckets), "kai_number_of_buckets"), - {match, _Sc, _Lc} = regexp:match(binary_to_list(NumberOfVirtualNodes), "kai_number_of_virtual_nodes"), - {match, _Sd, _Ld} = regexp:match(binary_to_list(Store), "kai_store"), - {match, _Se, _Le} = regexp:match(binary_to_list(Nodes), "kai_curr_nodes"), -% {match, _Sf, _Lf} = regexp:match(binary_to_list(Buckets), "kai_curr_buckets"), - {match, _Sg, _Lg} = regexp:match(binary_to_list(ErlangProcs), "erlang_procs"), - {match, _Sh, _Lh} = regexp:match(binary_to_list(ErlangVersion), "erlang_version"), + {match, _S6, _L6} = regexp:match(binary_to_list(CurrConnections), "curr_connections"), + {match, _S7, _L7} = regexp:match(binary_to_list(CmdGet), "cmd_get"), + {match, _S8, _L8} = regexp:match(binary_to_list(CmdSet), "cmd_set"), + {match, _S9, _L9} = regexp:match(binary_to_list(BytesRead), "bytes_read"), + {match, _S0, _L0} = regexp:match(binary_to_list(BytesWrite), "bytes_write"), + {match, _Sa, _La} = regexp:match(binary_to_list(LocalNode), "kai_node"), + {match, _Sb, 
_Lb} = regexp:match(binary_to_list(Quorum), "kai_quorum"), + {match, _Sc, _Lc} = regexp:match(binary_to_list(NumberOfBuckets), "kai_number_of_buckets"), + {match, _Sd, _Ld} = regexp:match(binary_to_list(NumberOfVirtualNodes), "kai_number_of_virtual_nodes"), + {match, _Se, _Le} = regexp:match(binary_to_list(Store), "kai_store"), + {match, _Sf, _Lf} = regexp:match(binary_to_list(Nodes), "kai_curr_nodes"), +% {match, _Sg, _Lg} = regexp:match(binary_to_list(Buckets), "kai_curr_buckets"), + {match, _Sh, _Lh} = regexp:match(binary_to_list(ErlangProcs), "erlang_procs"), + {match, _Si, _Li} = regexp:match(binary_to_list(ErlangVersion), "erlang_version"), ?assertEqual( {ok, <<"END\r\n">>}, gen_tcp:recv(MemcacheSocket, 0) @@ -144,7 +146,7 @@ gen_tcp:send(MemcacheSocket, "version\r\n"), {ok, Version2} = gen_tcp:recv(MemcacheSocket, 0), - {match, _Si, _Li} = regexp:match(binary_to_list(Version2), "VERSION "), + {match, _Sj, _Lj} = regexp:match(binary_to_list(Version2), "VERSION "), gen_tcp:send(MemcacheSocket, "quit\r\n"), Modified: branches/cooldaemon_count_current_tcp_connections/test/kai_stat_SUITE.erl =================================================================== --- branches/cooldaemon_count_current_tcp_connections/test/kai_stat_SUITE.erl 2009-03-19 10:01:05 UTC (rev 126) +++ branches/cooldaemon_count_current_tcp_connections/test/kai_stat_SUITE.erl 2009-03-19 10:01:53 UTC (rev 127) @@ -23,6 +23,8 @@ kai_config:start_link([ {hostname, "localhost"}, {rpc_port, 11011}, + {memcache_port, 11211}, + {memcache_max_processes, 1}, {n, 3}, {r, 2}, {w, 2}, {number_of_buckets, 8}, {number_of_virtual_nodes, 2}, @@ -31,6 +33,7 @@ kai_hash:start_link(), kai_store:start_link(), kai_stat:start_link(), + kai_memcache:start_link(), kai_stat:incr_cmd_get(), kai_stat:incr_cmd_set(), @@ -44,6 +47,7 @@ {version, Version }, {bytes, Bytes }, {curr_items, CurrItems }, + {curr_connections, CurrConnections }, {cmd_get, CmdGet }, {cmd_set, CmdSet }, {bytes_read, BytesRead }, @@ -62,6 +66,7 @@ 
{match, _S3, _L3} = regexp:match(Version, "[.0-9]+"), {match, _S4, _L4} = regexp:match(Bytes, "[0-9]+"), ?assertEqual("0", CurrItems ), + ?assertEqual("0", CurrConnections ), ?assertEqual("1", CmdGet ), ?assertEqual("1", CmdSet ), ?assertEqual("14", BytesRead ), @@ -76,6 +81,7 @@ {match, _S5, _L5} = regexp:match(ErlangProcs, "[0-9]+"), {match, _S6, _L6} = regexp:match(ErlangVersion, "[.0-9]+"), + kai_memcache:stop(), kai_stat:stop(), kai_store:stop(), kai_hash:stop(), This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |