[Kai-commits] SF.net SVN: kai:[93] trunk
Kai is a distributed key-value datastore
Status: Beta
Brought to you by:
takemaru
From: <tak...@us...> - 2008-11-01 09:57:42
|
Revision: 93 http://kai.svn.sourceforge.net/kai/?rev=93&view=rev Author: takemaru Date: 2008-11-01 09:57:39 +0000 (Sat, 01 Nov 2008) Log Message: ----------- Merge -r 87:92 branches/takemaru_store_dets Modified Paths: -------------- trunk/include/kai.hrl trunk/kai.config trunk/src/Makefile trunk/src/kai.app.src trunk/src/kai.erl trunk/src/kai_config.erl trunk/src/kai_connection.erl trunk/src/kai_coordinator.erl trunk/src/kai_membership.erl trunk/src/kai_memcache.erl trunk/src/kai_store.erl trunk/src/kai_sup.erl trunk/src/kai_sync.erl trunk/test/Makefile trunk/test/kai.coverspec trunk/test/kai_config_SUITE.erl trunk/test/kai_connection_SUITE.erl trunk/test/kai_coordinator_SUITE.erl trunk/test/kai_hash_SUITE.erl trunk/test/kai_log_SUITE.erl trunk/test/kai_membership_SUITE.erl trunk/test/kai_memcache_SUITE.erl trunk/test/kai_store_SUITE.erl trunk/test/kai_sync_SUITE.erl Added Paths: ----------- trunk/src/kai_rpc.erl trunk/src/kai_store_dets.erl trunk/src/kai_store_ets.erl trunk/test/kai_rpc_SUITE.erl Removed Paths: ------------- trunk/src/kai_api.erl trunk/test/kai_api_SUITE.erl Modified: trunk/include/kai.hrl =================================================================== --- trunk/include/kai.hrl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/include/kai.hrl 2008-11-01 09:57:39 UTC (rev 93) @@ -33,3 +33,6 @@ %-define(debug(Data), kai_log:log(debug, self(), ?FILE, ?LINE, Data)). -define(debug(_Data), ok). + +-define(TIMEOUT, 5000). +-define(TIMER, 1000). Modified: trunk/kai.config =================================================================== --- trunk/kai.config 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/kai.config 2008-11-01 09:57:39 UTC (rev 93) @@ -1,7 +1,7 @@ [{kai, [%{logfile, "kai.log"}, %{hostname, "localhost"}, - {api_port, 11011}, - {api_max_processes, 30}, + {rpc_port, 11011}, + {rpc_max_processes, 30}, {memcache_port, 11211}, {memcache_max_processes, 10}, {max_connections, 32}, @@ -9,4 +9,8 @@ {r, 2}, {w, 2}, {number_of_buckets, 1024}, - {number_of_virtual_nodes, 128}]}]. + {number_of_virtual_nodes, 128}, + {store, ets}, + %{dets_dir, "/path/to/dir"}, + {number_of_tables, 256} +]}]. Modified: trunk/src/Makefile =================================================================== --- trunk/src/Makefile 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/src/Makefile 2008-11-01 09:57:39 UTC (rev 93) @@ -20,9 +20,9 @@ +debug_info SOURCES = \ - kai_config kai_log kai_hash kai_store kai_version kai_connection \ - kai_sync kai_membership kai_coordinator kai_tcp_server kai_api \ - kai_memcache kai_sup kai vclock + kai_config kai_log kai_hash kai_store kai_store_ets kai_store_dets \ + kai_version kai_connection kai_sync kai_membership kai_coordinator \ + kai_tcp_server kai_rpc kai_memcache kai_sup kai vclock MODS = ${SOURCES:%=$(EBIN)/%.$(EMULATOR)} $(APP_TARGET) Modified: trunk/src/kai.app.src =================================================================== --- trunk/src/kai.app.src 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/src/kai.app.src 2008-11-01 09:57:39 UTC (rev 93) @@ -2,7 +2,7 @@ [{description, "Kai - A distributed hashtable like Amazon's Dynamo"}, {vsn, "%VSN%"}, {modules, [ - kai, kai_sup, kai_memcache, kai_api, kai_tcp_server, kai_coordinator, + kai, kai_sup, kai_memcache, kai_rpc, kai_tcp_server, kai_coordinator, kai_membership, kai_sync, kai_connection, kai_version, kai_store, kai_hash, kai_log, kai_config ]}, @@ -14,13 +14,15 @@ {mod, {kai, []}}, {start_phases, []}, {env, [ - {api_port, 11011}, - {api_max_processes, 30}, + {rpc_port, 11011}, + {rpc_max_processes, 30}, {memcache_port, 11211}, {memcache_max_processes, 10}, {max_connections, 32}, {n, 3}, {r, 2}, {w, 2}, {number_of_buckets, 1024}, - {number_of_virtual_nodes, 128} + {number_of_virtual_nodes, 128}, + {store, ets}, + {number_of_tables, 256} ]} ]}. Modified: trunk/src/kai.erl =================================================================== --- trunk/src/kai.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/src/kai.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -26,11 +26,12 @@ start(_Type, _Args) -> Args = config([ logfile, hostname, - api_port, api_max_processes, + rpc_port, rpc_max_processes, memcache_port, memcache_max_processes, max_connections, n, r, w, - number_of_buckets, number_of_virtual_nodes + number_of_buckets, number_of_virtual_nodes, + store, dets_dir, number_of_tables ], []), kai_sup:start_link(Args). Deleted: trunk/src/kai_api.erl =================================================================== --- trunk/src/kai_api.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/src/kai_api.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -1,176 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(kai_api). --behaviour(kai_tcp_server). - --export([start_link/0, stop/0]). --export([init/1, handle_call/3]). --export([ - node_info/1, node_list/1, - list/2, get/2, put/2, delete/2, - check_node/2, route/2 -]). - --include("kai.hrl"). - --define(TIMEOUT, 3000). - -start_link() -> - kai_tcp_server:start_link( - {local, ?MODULE}, - ?MODULE, - [], - #tcp_server_option{ - listen = [binary, {packet, 4}, {active, true}, {reuseaddr, true}], - port = kai_config:get(api_port), - max_processes = kai_config:get(api_max_processes) - } - ). - -stop() -> kai_tcp_server:stop(?MODULE). - -init(_Args) -> {ok, {}}. - -handle_call(Socket, Data, State) -> - dispatch(Socket, binary_to_term(Data), State). - -dispatch(_Socket, node_info, State) -> - reply(kai_config:node_info(), State); - -dispatch(_Socket, node_list, State) -> - reply(kai_hash:node_list(), State); - -dispatch(_Socket, {list, Bucket}, State) -> - reply(kai_store:list(Bucket), State); - -dispatch(_Socket, {get, Key}, State) -> - reply(kai_store:get(Key), State); - -dispatch(_Socket, {put, Data}, State) when is_record(Data, data)-> - reply(kai_store:put(Data), State); - -dispatch(_Socket, {delete, Key}, State) -> - reply(kai_store:delete(Key), State); - -dispatch(_Socket, {check_node, Node}, State) -> - reply(kai_membership:check_node(Node), State); - -dispatch(_Socket, {route, Request}, State) -> - reply(kai_coordinator:route(Request), State); - -dispatch(_Socket, _Unknown, State) -> - reply({error, enotsup}, State). - -reply(Data, State) -> - {reply, term_to_binary(Data), State}. - -recv_response(ApiSocket) -> - receive - {tcp, ApiSocket, Bin} -> - {ok, binary_to_term(Bin)}; - {tcp_closed, ApiSocket} -> - {error, econnreset}; - {error, Reason} -> - {error, Reason} - - % Don't place Other alternative here. This is to avoid to catch event - % messages, '$gen_event' or something like that. Remember that this - % function can be called from gen_fsm/gen_event. - - after ?TIMEOUT -> - {error, timeout} - end. - -do_request(Node, Message) -> - case kai_connection:lease(Node, self()) of - {ok, ApiSocket} -> - case gen_tcp:send(ApiSocket, term_to_binary(Message)) of - ok -> - case recv_response(ApiSocket) of - {ok, Result} -> - kai_connection:return(ApiSocket), - {ok, Result}; - {error, Reason} -> - kai_connection:close(ApiSocket), - {error, Reason} - end; - {error, Reason} -> - kai_connection:close(ApiSocket), - {error, Reason} - end; - {error, Reason} -> - {error, Reason} - end. - -request(Node, Message) -> - case do_request(Node, Message) of - {ok, Result} -> - Result; - {error, Reason} -> - ?warning(io_lib:format("request(~p, ~p): ~p", - [Node, Message, {error, Reason}])), -% kai_membership:check_node(Node), - {error, Reason} - end. - -is_local_node(Node) -> - LocalNode = kai_config:get(node), - Node =:= LocalNode. - -node_info(Node) -> - case is_local_node(Node) of - true -> kai_config:node_info(); - _ -> request(Node, node_info) - end. - -node_list(Node) -> - case is_local_node(Node) of - true -> kai_hash:node_list(); - _ -> request(Node, node_list) - end. - -list(Node, Bucket) -> - case is_local_node(Node) of - true -> kai_store:list(Bucket); - _ -> request(Node, {list, Bucket}) - end. - -get(Node, Key) -> - case is_local_node(Node) of - true -> kai_store:get(Key); - _ -> request(Node, {get, Key}) - end. - -put(Node, Data) -> - case is_local_node(Node) of - true -> kai_store:put(Data); - _ -> request(Node, {put, Data}) - end. - -delete(Node, Key) -> - case is_local_node(Node) of - true -> kai_store:delete(Key); - _ -> request(Node, {delete, Key}) - end. - -check_node(Node, Node2) -> - case is_local_node(Node) of - true -> kai_membership:check_node(Node2); - _ -> request(Node, {check_node, Node2}) - end. - -route(Node, Request) -> - case is_local_node(Node) of - true -> {error, ewouldblock}; - _ -> request(Node, {route, Request}) - end. Modified: trunk/src/kai_config.erl =================================================================== --- trunk/src/kai_config.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/src/kai_config.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -41,7 +41,7 @@ H -> H end, {ok, Address} = inet:getaddr(Hostname, inet), - Port = proplists:get_value(api_port, Args), + Port = proplists:get_value(rpc_port, Args), ets:insert(config, {node, {Address, Port}}), NumberOfBuckets = proplists:get_value(number_of_buckets, Args), Modified: trunk/src/kai_connection.erl =================================================================== --- trunk/src/kai_connection.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/src/kai_connection.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -23,7 +23,6 @@ -include("kai.hrl"). -define(SERVER, ?MODULE). --define(TIMEOUT, 3000). -record(connection, {node, available, socket}). Modified: trunk/src/kai_coordinator.erl =================================================================== --- trunk/src/kai_coordinator.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/src/kai_coordinator.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -18,7 +18,6 @@ -include("kai.hrl"). -define(SERVER, ?MODULE). --define(TIMEOUT_GATHER, 200). dispatch({Type, Data} = _Request) -> case Type of @@ -32,7 +31,7 @@ {error, ebusy}; do_route({_Type, Data} = Request, [Node|RestNodes]) -> % TODO: introduce TTL, in order to avoid infinite loop - case kai_api:route(Node, Request) of + case kai_rpc:route(Node, Request) of {error, Reason} -> ?warning(io_lib:format("do_route(~p, ~p): ~p", [Data#data.key, Node, {error, Reason}])), @@ -58,16 +57,18 @@ spawn(?MODULE, start_route, [Request, self(), Ref]), receive {Ref, Result} -> Result - after ?TIMEOUT_GATHER -> + after ?TIMEOUT -> ?warning(io_lib:format("route(~p): timeout", [Data#data.key])), [] end. -coordinate_get(#data{key=Key} = _Data) -> - {nodes, Nodes} = kai_hash:find_nodes(Key), +coordinate_get(Data) -> + {bucket, Bucket} = kai_hash:find_bucket(Data#data.key), + {nodes, Nodes } = kai_hash:find_nodes(Bucket), + Data2 = Data#data{bucket=Bucket}, Ref = make_ref(), lists:foreach( - fun(Node) -> spawn(?MODULE, map_in_get, [Node, Key, Ref, self()]) end, % Don't link + fun(Node) -> spawn(?MODULE, map_in_get, [Node, Data2, Ref, self()]) end, % Don't link Nodes ), [N, R] = kai_config:get([n, r]), @@ -79,8 +80,8 @@ undefined end. -map_in_get(Node, Key, Ref, Pid) -> - case kai_api:get(Node, Key) of +map_in_get(Node, Data, Ref, Pid) -> + case kai_rpc:get(Node, Data) of {error, Reason} -> % kai_membership:check_node(Node), Pid ! {Ref, {error, Reason}}; @@ -100,7 +101,7 @@ gather_in_get(Ref, N-1, R-1, Results); {Ref, _Other} -> gather_in_get(Ref, N-1, R, Results) - after ?TIMEOUT_GATHER -> + after ?TIMEOUT -> ?warning("gather_in_get/4: timeout"), Results end. @@ -113,7 +114,7 @@ {nodes, Nodes } = kai_hash:find_nodes(Bucket), Ref = make_ref(), Data1 = - case kai_store:get(Key) of + case kai_store:get(Data#data{bucket=Bucket}) of PreviousData when is_record(PreviousData, data) -> PreviousData; undefined -> @@ -134,7 +135,7 @@ gather_in_put(Ref, N, W). map_in_put(Node, Data, Ref, Pid) -> - case kai_api:put(Node, Data) of + case kai_rpc:put(Node, Data) of {error, Reason} -> % kai_membership:check_node(Node), Pid ! {Ref, {error, Reason}}; @@ -150,23 +151,25 @@ receive {Ref, ok} -> gather_in_put(Ref, N-1, W-1); {Ref, _Other} -> gather_in_put(Ref, N-1, W) - after ?TIMEOUT_GATHER -> + after ?TIMEOUT -> ?warning("gather_in_put/3: timeout"), {error, etimedout} end. -coordinate_delete(#data{key=Key} = _Data) -> - {nodes, Nodes} = kai_hash:find_nodes(Key), +coordinate_delete(Data) -> + {bucket, Bucket} = kai_hash:find_bucket(Data#data.key), + {nodes, Nodes } = kai_hash:find_nodes(Bucket), + Data2 = Data#data{bucket=Bucket}, Ref = make_ref(), lists:foreach( - fun(Node) -> spawn(?MODULE, map_in_delete, [Node, Key, Ref, self()]) end, + fun(Node) -> spawn(?MODULE, map_in_delete, [Node, Data2, Ref, self()]) end, Nodes ), [N, W] = kai_config:get([n, w]), gather_in_delete(Ref, N, W, []). -map_in_delete(Node, Key, Ref, Pid) -> - case kai_api:delete(Node, Key) of +map_in_delete(Node, Data, Ref, Pid) -> + case kai_rpc:delete(Node, Data) of {error, Reason} -> % kai_membership:check_node(Node), Pid ! {Ref, {error, Reason}}; @@ -189,7 +192,7 @@ gather_in_delete(Ref, N-1, W-1, [undefined|Results]); {Ref, _Other} -> gather_in_delete(Ref, N-1, W, Results) - after ?TIMEOUT_GATHER -> + after ?TIMEOUT -> ?warning("gather_in_delete/4: timeout"), {error, etimedout} end. Modified: trunk/src/kai_membership.erl =================================================================== --- trunk/src/kai_membership.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/src/kai_membership.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -23,8 +23,6 @@ -include("kai.hrl"). -define(SERVER, ?MODULE). --define(TIMEOUT, 3000). --define(TIMER, 1000). start_link() -> gen_fsm:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []). @@ -38,7 +36,7 @@ ping_nodes([], AvailableNodes, DownNodes) -> {AvailableNodes, DownNodes}; ping_nodes([Node|Nodes], AvailableNodes, DownNodes) -> - case kai_api:node_info(Node) of + case kai_rpc:node_info(Node) of {node_info, Node2, Info} -> ping_nodes(Nodes, [{Node2, Info}|AvailableNodes], DownNodes); {error, Reason} -> @@ -47,7 +45,7 @@ end. retrieve_node_list(Node) -> - case kai_api:node_list(Node) of + case kai_rpc:node_list(Node) of {node_list, RemoteNodeList} -> {node_list, LocalNodeList} = kai_hash:node_list(), NewNodes = RemoteNodeList -- LocalNodeList, @@ -93,9 +91,9 @@ handle_event(stop, _StateName, StateData) -> {stop, normal, StateData}. handle_sync_event(_Event, _From, _StateName, StateData) -> - {next_state, ready, StateData, 3000}. + {next_state, ready, StateData, ?TIMEOUT}. handle_info(_Info, _StateName, StateData) -> - {next_state, ready, StateData, 3000}. + {next_state, ready, StateData, ?TIMEOUT}. code_change(_OldVsn, _StateName, StateData, _Extra) -> {ok, ready, StateData}. Modified: trunk/src/kai_memcache.erl =================================================================== --- trunk/src/kai_memcache.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/src/kai_memcache.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -18,7 +18,7 @@ -include("kai.hrl"). --define(TIMEOUT_CLIENT, 3000). +-define(MEMCACHE_TIMEOUT, ?TIMEOUT). start_link() -> kai_tcp_server:start_link( @@ -82,9 +82,9 @@ end, Data). recv_set_data(Socket, ["set", Key, Flags, "0", Bytes], State) -> - case gen_tcp:recv(Socket, list_to_integer(Bytes), ?TIMEOUT_CLIENT) of + case gen_tcp:recv(Socket, list_to_integer(Bytes), ?MEMCACHE_TIMEOUT) of {ok, Value} -> - gen_tcp:recv(Socket, 2, ?TIMEOUT_CLIENT), + gen_tcp:recv(Socket, 2, ?MEMCACHE_TIMEOUT), case kai_coordinator:route( {put, #data{key=Key, flags=Flags, value=Value}} ) of Copied: trunk/src/kai_rpc.erl (from rev 92, branches/takemaru_store_dets/src/kai_rpc.erl) =================================================================== --- trunk/src/kai_rpc.erl (rev 0) +++ trunk/src/kai_rpc.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -0,0 +1,174 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(kai_rpc). +-behaviour(kai_tcp_server). + +-export([start_link/0, stop/0]). +-export([init/1, handle_call/3]). +-export([ + node_info/1, node_list/1, + list/2, get/2, put/2, delete/2, + check_node/2, route/2 +]). + +-include("kai.hrl"). + +start_link() -> + kai_tcp_server:start_link( + {local, ?MODULE}, + ?MODULE, + [], + #tcp_server_option{ + listen = [binary, {packet, 4}, {active, true}, {reuseaddr, true}], + port = kai_config:get(rpc_port), + max_processes = kai_config:get(rpc_max_processes) + } + ). + +stop() -> kai_tcp_server:stop(?MODULE). + +init(_Args) -> {ok, {}}. + +handle_call(Socket, Data, State) -> + dispatch(Socket, binary_to_term(Data), State). + +dispatch(_Socket, node_info, State) -> + reply(kai_config:node_info(), State); + +dispatch(_Socket, node_list, State) -> + reply(kai_hash:node_list(), State); + +dispatch(_Socket, {list, Bucket}, State) -> + reply(kai_store:list(Bucket), State); + +dispatch(_Socket, {get, Data}, State) -> + reply(kai_store:get(Data), State); + +dispatch(_Socket, {put, Data}, State) when is_record(Data, data)-> + reply(kai_store:put(Data), State); + +dispatch(_Socket, {delete, Data}, State) -> + reply(kai_store:delete(Data), State); + +dispatch(_Socket, {check_node, Node}, State) -> + reply(kai_membership:check_node(Node), State); + +dispatch(_Socket, {route, Request}, State) -> + reply(kai_coordinator:route(Request), State); + +dispatch(_Socket, _Unknown, State) -> + reply({error, enotsup}, State). + +reply(Data, State) -> + {reply, term_to_binary(Data), State}. + +recv_response(ApiSocket) -> + receive + {tcp, ApiSocket, Bin} -> + {ok, binary_to_term(Bin)}; + {tcp_closed, ApiSocket} -> + {error, econnreset}; + {error, Reason} -> + {error, Reason} + + % Don't place Other alternative here. This is to avoid to catch event + % messages, '$gen_event' or something like that. Remember that this + % function can be called from gen_fsm/gen_event. + + after ?TIMEOUT -> + {error, timeout} + end. + +do_request(Node, Message) -> + case kai_connection:lease(Node, self()) of + {ok, ApiSocket} -> + case gen_tcp:send(ApiSocket, term_to_binary(Message)) of + ok -> + case recv_response(ApiSocket) of + {ok, Result} -> + kai_connection:return(ApiSocket), + {ok, Result}; + {error, Reason} -> + kai_connection:close(ApiSocket), + {error, Reason} + end; + {error, Reason} -> + kai_connection:close(ApiSocket), + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. + +request(Node, Message) -> + case do_request(Node, Message) of + {ok, Result} -> + Result; + {error, Reason} -> + ?warning(io_lib:format("request(~p, ~p): ~p", + [Node, Message, {error, Reason}])), +% kai_membership:check_node(Node), + {error, Reason} + end. + +is_local_node(Node) -> + LocalNode = kai_config:get(node), + Node =:= LocalNode. + +node_info(Node) -> + case is_local_node(Node) of + true -> kai_config:node_info(); + _ -> request(Node, node_info) + end. + +node_list(Node) -> + case is_local_node(Node) of + true -> kai_hash:node_list(); + _ -> request(Node, node_list) + end. + +list(Node, Bucket) -> + case is_local_node(Node) of + true -> kai_store:list(Bucket); + _ -> request(Node, {list, Bucket}) + end. + +get(Node, Data) -> + case is_local_node(Node) of + true -> kai_store:get(Data); + _ -> request(Node, {get, Data}) + end. + +put(Node, Data) -> + case is_local_node(Node) of + true -> kai_store:put(Data); + _ -> request(Node, {put, Data}) + end. + +delete(Node, Data) -> + case is_local_node(Node) of + true -> kai_store:delete(Data); + _ -> request(Node, {delete, Data}) + end. + +check_node(Node, Node2) -> + case is_local_node(Node) of + true -> kai_membership:check_node(Node2); + _ -> request(Node, {check_node, Node2}) + end. + +route(Node, Request) -> + case is_local_node(Node) of + true -> {error, ewouldblock}; + _ -> request(Node, {route, Request}) + end. Modified: trunk/src/kai_store.erl =================================================================== --- trunk/src/kai_store.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/src/kai_store.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -11,94 +11,26 @@ % the License. -module(kai_store). --behaviour(gen_server). -export([start_link/0, stop/0]). -export([list/1, get/1, put/1, delete/1]). --export([ - init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3 -]). -include("kai.hrl"). -define(SERVER, ?MODULE). start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []). + Store = kai_config:get(store), + Module = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(Store)), + apply(Module, start_link, [?SERVER]). -init(_Args) -> - ets:new(data, [set, private, named_table, {keypos, 2}]), - {ok, []}. - -terminate(_Reason, _State) -> - ets:delete(data), - ok. - -do_list(Bucket, State) -> - Head = #data{ - key = '$1', - bucket = Bucket, - last_modified = '$2', - vector_clocks = '$3', - checksum = '$4', - flags = '_', - value = '_' - }, - Cond = [], - Body = [{#data{ - key = '$1', - bucket = Bucket, - last_modified = '$2', - vector_clocks = '$3', - checksum = '$4' - }}], - ListOfData = ets:select(data, [{Head, Cond, Body}]), - {reply, {list_of_data, ListOfData}, State}. - -do_get(Key, State) -> - case ets:lookup(data, Key) of - [Data] -> {reply, Data, State}; - _ -> {reply, undefined, State} - end. - -do_put(Data, State) when is_record(Data, data) -> - ets:insert(data, Data), - {reply, ok, State}. - -do_delete(Key, State) -> - case ets:lookup(data, Key) of - [_Data] -> - ets:delete(data, Key), - {reply, ok, State}; - _ -> - {reply, undefined, State} - end. - -handle_call(stop, _From, State) -> - {stop, normal, stopped, State}; -handle_call({list, Bucket}, _From, State) -> - do_list(Bucket, State); -handle_call({get, Key}, _From, State) -> - do_get(Key, State); -handle_call({put, Data}, _From, State) -> - do_put(Data, State); -handle_call({delete, Key}, _From, State) -> - do_delete(Key, State). -handle_cast(_Msg, State) -> - {noreply, State}. -handle_info(_Info, State) -> - {noreply, State}. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - stop() -> gen_server:call(?SERVER, stop). list(Bucket) -> gen_server:call(?SERVER, {list, Bucket}). -get(Key) -> - gen_server:call(?SERVER, {get, Key}). +get(Data) -> + gen_server:call(?SERVER, {get, Data}). put(Data) -> gen_server:call(?SERVER, {put, Data}). -delete(Key) -> - gen_server:call(?SERVER, {delete, Key}). +delete(Data) -> + gen_server:call(?SERVER, {delete, Data}). Copied: trunk/src/kai_store_dets.erl (from rev 92, branches/takemaru_store_dets/src/kai_store_dets.erl) =================================================================== --- trunk/src/kai_store_dets.erl (rev 0) +++ trunk/src/kai_store_dets.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -0,0 +1,116 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(kai_store_dets). +-behaviour(gen_server). + +-export([start_link/1]). +-export([ + init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3 +]). + +-include("kai.hrl"). + +start_link(Server) -> + gen_server:start_link({local, Server}, ?MODULE, [], _Opts = []). + +init(_Args) -> + Dir = kai_config:get(dets_dir), + NumberOfTables = kai_config:get(number_of_tables), + Tables = + lists:map( + fun(I) -> + Name = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ integer_to_list(I)), + File = Dir ++ "/" ++ integer_to_list(I), + {ok, Table} = + dets:open_file(Name, [{type, set}, {keypos, 2}, {file, File}]), + {I, Table} + end, + lists:seq(1, NumberOfTables) + ), + {ok, Tables}. + +terminate(_Reason, Tables) -> + lists:foreach( + fun({_I, Table}) -> dets:close(Table) end, + Tables + ), + ok. + +bucket_to_table(Bucket, Tables) -> + NumberOfTables = kai_config:get(number_of_tables), + I = Bucket rem NumberOfTables + 1, + proplists:get_value(I, Tables). + +do_list(Bucket, Tables) -> + Table = bucket_to_table(Bucket, Tables), + Head = #data{ + key = '$1', + bucket = Bucket, + last_modified = '$2', + vector_clocks = '$3', + checksum = '$4', + flags = '_', + value = '_' + }, + Cond = [], + Body = [{#data{ + key = '$1', + bucket = Bucket, + last_modified = '$2', + vector_clocks = '$3', + checksum = '$4' + }}], + ListOfData = dets:select(Table, [{Head, Cond, Body}]), + {reply, {list_of_data, ListOfData}, Tables}. + +do_get(#data{key=Key, bucket=Bucket} = _Data, Tables) -> + Table = bucket_to_table(Bucket, Tables), + case dets:lookup(Table, Key) of + [Data] -> {reply, Data, Tables}; + _ -> {reply, undefined, Tables} + end. + +do_put(Data, Tables) when is_record(Data, data) -> + Table = bucket_to_table(Data#data.bucket, Tables), + dets:insert(Table, Data), + dets:sync(Table), + {reply, ok, Tables}. + +do_delete(#data{key=Key, bucket=Bucket} = _Data, Tables) -> + Table = bucket_to_table(Bucket, Tables), + case dets:lookup(Table, Key) of + [_Data2] -> + dets:delete(Table, Key), + dets:sync(Table), + {reply, ok, Tables}; + _ -> + {reply, undefined, Tables} + end. + +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}; +handle_call({list, Bucket}, _From, State) -> + do_list(Bucket, State); +handle_call({get, Data}, _From, State) -> + do_get(Data, State); +handle_call({put, Data}, _From, State) -> + do_put(Data, State); +handle_call({delete, Data}, _From, State) -> + do_delete(Data, State). +handle_cast(_Msg, State) -> + {noreply, State}. +handle_info(_Info, State) -> + {noreply, State}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. Copied: trunk/src/kai_store_ets.erl (from rev 92, branches/takemaru_store_dets/src/kai_store_ets.erl) =================================================================== --- trunk/src/kai_store_ets.erl (rev 0) +++ trunk/src/kai_store_ets.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -0,0 +1,90 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(kai_store_ets). +-behaviour(gen_server). + +-export([start_link/1]). +-export([ + init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3 +]). + +-include("kai.hrl"). + +start_link(Server) -> + gen_server:start_link({local, Server}, ?MODULE, [], _Opts = []). + +init(_Args) -> + ets:new(?MODULE, [set, private, named_table, {keypos, 2}]), + {ok, []}. + +terminate(_Reason, _State) -> + ets:delete(?MODULE), + ok. + +do_list(Bucket, State) -> + Head = #data{ + key = '$1', + bucket = Bucket, + last_modified = '$2', + vector_clocks = '$3', + checksum = '$4', + flags = '_', + value = '_' + }, + Cond = [], + Body = [{#data{ + key = '$1', + bucket = Bucket, + last_modified = '$2', + vector_clocks = '$3', + checksum = '$4' + }}], + ListOfData = ets:select(?MODULE, [{Head, Cond, Body}]), + {reply, {list_of_data, ListOfData}, State}. + +do_get(#data{key=Key} = _Data, State) -> + case ets:lookup(?MODULE, Key) of + [Data] -> {reply, Data, State}; + _ -> {reply, undefined, State} + end. + +do_put(Data, State) when is_record(Data, data) -> + ets:insert(?MODULE, Data), + {reply, ok, State}. + +do_delete(#data{key=Key} = _Data, State) -> + case ets:lookup(?MODULE, Key) of + [_Data2] -> + ets:delete(?MODULE, Key), + {reply, ok, State}; + _ -> + {reply, undefined, State} + end. + +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}; +handle_call({list, Bucket}, _From, State) -> + do_list(Bucket, State); +handle_call({get, Data}, _From, State) -> + do_get(Data, State); +handle_call({put, Data}, _From, State) -> + do_put(Data, State); +handle_call({delete, Data}, _From, State) -> + do_delete(Data, State). +handle_cast(_Msg, State) -> + {noreply, State}. +handle_info(_Info, State) -> + {noreply, State}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. Modified: trunk/src/kai_sup.erl =================================================================== --- trunk/src/kai_sup.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/src/kai_sup.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -70,11 +70,11 @@ permanent, 1000, worker, [kai_membership] }, - Api = { - kai_api, - {kai_api, start_link, []}, + Rpc = { + kai_rpc, + {kai_rpc, start_link, []}, permanent, 1000, worker, - [kai_api] + [kai_rpc] }, Memcache = { kai_memcache, @@ -83,6 +83,6 @@ [kai_memcache] }, {ok, {{one_for_one, 3, 10}, [ - Config, Log, Hash, Store, Version, Connection, Sync, Membership, Api, + Config, Log, Hash, Store, Version, Connection, Sync, Membership, Rpc, Memcache ]}}. Modified: trunk/src/kai_sync.erl =================================================================== --- trunk/src/kai_sync.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/src/kai_sync.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -23,8 +23,6 @@ -include("kai.hrl"). -define(SERVER, ?MODULE). --define(TIMEOUT, 3000). --define(TIMER, 1000). start_link() -> gen_fsm:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []). @@ -38,12 +36,11 @@ retrieve_data(_Node, []) -> ok; retrieve_data(Node, [Metadata|Rest]) -> - Key = Metadata#data.key, - case kai_store:get(Key) of + case kai_store:get(Metadata) of Data when is_record(Data, data) -> retrieve_data(Node, Rest); undefined -> - case kai_api:get(Node, Key) of + case kai_rpc:get(Node, Metadata) of Data when is_record(Data, data) -> kai_store:put(Data), retrieve_data(Node, Rest); @@ -58,7 +55,7 @@ do_update_bucket(_Bucket, []) -> {error, enodata}; do_update_bucket(Bucket, [Node|Rest]) -> - case kai_api:list(Node, Bucket) of + case kai_rpc:list(Node, Bucket) of {list_of_data, ListOfData} -> retrieve_data(Node, ListOfData); {error, Reason} -> @@ -74,7 +71,7 @@ do_delete_bucket([]) -> ok; do_delete_bucket([Metadata|Rest]) -> - kai_store:delete(Metadata#data.key), + kai_store:delete(Metadata), do_delete_bucket(Rest); do_delete_bucket(Bucket) -> {list_of_data, ListOfData} = kai_store:list(Bucket), @@ -96,7 +93,7 @@ handle_event(stop, _StateName, StateData) -> {stop, normal, StateData}. handle_sync_event(_Event, _From, _StateName, StateData) -> - {next_state, wait, StateData, 3000}. + {next_state, wait, StateData, ?TIMEOUT}. handle_info(_Info, _StateName, StateData) -> {next_state, ready, StateData, ?TIMER}. code_change(_OldVsn, _StateName, StateData, _Extra) -> Modified: trunk/test/Makefile =================================================================== --- trunk/test/Makefile 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/test/Makefile 2008-11-01 09:57:39 UTC (rev 93) @@ -28,7 +28,7 @@ MODS = kai_config_SUITE kai_log_SUITE kai_hash_SUITE kai_store_SUITE \ kai_version_SUITE kai_connection_SUITE kai_sync_SUITE \ kai_membership_SUITE kai_coordinator_SUITE kai_tcp_server_SUITE \ - kai_api_SUITE kai_memcache_SUITE + kai_rpc_SUITE kai_memcache_SUITE all: compile Modified: trunk/test/kai.coverspec =================================================================== --- trunk/test/kai.coverspec 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/test/kai.coverspec 2008-11-01 09:57:39 UTC (rev 93) @@ -2,6 +2,6 @@ {incl_mods, [ kai_config, kai_log, kai_hash, kai_store, kai_version, kai_connection, kai_sync, kai_membership, kai_coordinator, - kai_tcp_server, kai_api, kai_memcache + kai_tcp_server, kai_rpc, kai_memcache ]}. Deleted: trunk/test/kai_api_SUITE.erl =================================================================== --- trunk/test/kai_api_SUITE.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/test/kai_api_SUITE.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -1,70 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(kai_api_SUITE). --compile(export_all). - --include("kai.hrl"). --include("kai_test.hrl"). - -all() -> [test1]. - -test1() -> []. -test1(_Conf) -> - kai_config:start_link([ - {hostname, "localhost"}, - {api_port, 11011}, - {api_max_processes, 2}, - {n, 3}, - {number_of_buckets, 8}, - {number_of_virtual_nodes, 2}]), - kai_hash:start_link(), - kai_store:start_link(), - kai_connection:start_link(), - kai_api:start_link(), - - timer:sleep(100), % wait for starting kai_api - - {node_info, ?NODE1, ?INFO} = kai_api:node_info(?NODE1), - - {node_list, [?NODE1]} = kai_api:node_list(?NODE1), - - Data = #data{ - key = "item-1", - bucket = 3, - last_modified = now(), - checksum = erlang:md5(<<"value-1">>), - flags = "0", - value = (<<"value-1">>) - }, - ok = kai_api:put(?NODE1, Data), - ?assertEqual(Data, kai_store:get("item-1")), - - ListOfData = #data{ - key = "item-1", - bucket = 3, - last_modified = Data#data.last_modified, - checksum = erlang:md5(<<"value-1">>) - }, - {list_of_data, [ListOfData]} = kai_api:list(?NODE1, 3), - - Data = kai_api:get(?NODE1, "item-1"), - - ok = kai_api:delete(?NODE1, "item-1"), - - undefined = kai_api:get(?NODE1, "item-1"), - - kai_api:stop(), - kai_connection:stop(), - kai_store:stop(), - kai_hash:stop(), - kai_config:stop(). Modified: trunk/test/kai_config_SUITE.erl =================================================================== --- trunk/test/kai_config_SUITE.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/test/kai_config_SUITE.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -22,7 +22,7 @@ test1(_Conf) -> kai_config:start_link([ {hostname, "localhost"}, - {api_port, 11011}, + {rpc_port, 11011}, {n, 2}, {number_of_buckets, 16384}, {number_of_virtual_nodes, 128} Modified: trunk/test/kai_connection_SUITE.erl =================================================================== --- trunk/test/kai_connection_SUITE.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/test/kai_connection_SUITE.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -44,7 +44,7 @@ test1(_Conf) -> kai_config:start_link([ {hostname, "localhost"}, - {api_port, 11011}, + {rpc_port, 11011}, {max_connections, 32}, {n, 3}, {number_of_buckets, 8}, @@ -95,7 +95,7 @@ kai_config:start_link([ {hostname, "localhost"}, - {api_port, 11011}, + {rpc_port, 11011}, {max_connections, MaxConnections}, {n, 3}, {number_of_buckets, 8}, Modified: trunk/test/kai_coordinator_SUITE.erl =================================================================== --- trunk/test/kai_coordinator_SUITE.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/test/kai_coordinator_SUITE.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -22,18 +22,19 @@ test1(_Conf) -> kai_config:start_link([ {hostname, "localhost"}, - {api_port, 11011}, - {api_max_processes, 2}, + {rpc_port, 11011}, + {rpc_max_processes, 2}, {max_connections, 32}, {n, 1}, {r, 1}, {w, 1}, {number_of_buckets, 8}, - {number_of_virtual_nodes, 2} + {number_of_virtual_nodes, 2}, + {store, ets} ]), kai_hash:start_link(), kai_store:start_link(), kai_version:start_link(), kai_connection:start_link(), - kai_api:start_link(), + kai_rpc:start_link(), ?assertEqual( ok, @@ -70,7 +71,7 @@ kai_coordinator:route({delete, #data{key="item-1"}}) ), - kai_api:stop(), + kai_rpc:stop(), kai_connection:stop(), kai_version:stop(), kai_store:stop(), Modified: trunk/test/kai_hash_SUITE.erl =================================================================== --- trunk/test/kai_hash_SUITE.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/test/kai_hash_SUITE.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -52,7 +52,7 @@ test1(_Conf) -> kai_config:start_link([ {hostname, "localhost"}, - {api_port, 11011}, + {rpc_port, 11011}, {n, 3}, {number_of_buckets, 8}, {number_of_virtual_nodes, 2} @@ -186,7 +186,7 @@ kai_config:start_link([ {hostname, "localhost"}, - {api_port, 1}, + {rpc_port, 1}, {n, 3}, {number_of_buckets, 16384}, % 16,384 = 128*64*2 {number_of_virtual_nodes, 128} Modified: trunk/test/kai_log_SUITE.erl =================================================================== --- trunk/test/kai_log_SUITE.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/test/kai_log_SUITE.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -22,7 +22,7 @@ test1(_Conf) -> kai_config:start_link([ {hostname, "localhost"}, - {api_port, 11011}, + {rpc_port, 11011}, {n, 2}, {number_of_buckets, 16384}, % 16,384 = 128*64*2 {number_of_virtual_nodes, 128} Modified: trunk/test/kai_membership_SUITE.erl =================================================================== --- trunk/test/kai_membership_SUITE.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/test/kai_membership_SUITE.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -52,7 +52,7 @@ gen_tcp:send(ApiSocket, term_to_binary({list_of_data, ListOfData})); test1_api_send(ApiSocket, {list, _Bucket}) -> gen_tcp:send(ApiSocket, term_to_binary({list_of_data, []})); -test1_api_send(ApiSocket, {get, "item-1"}) -> +test1_api_send(ApiSocket, {get, #data{key="item-1", bucket=3}}) -> Data = #data{ key = "item-1", bucket = 3, @@ -68,10 +68,11 @@ % This is NODE3, not NODE1 kai_config:start_link([ {hostname, "localhost"}, - {api_port, 11013}, + {rpc_port, 11013}, {n, 3}, {number_of_buckets, 8}, - {number_of_virtual_nodes, 2} + {number_of_virtual_nodes, 2}, + {store, ets} ]), kai_hash:start_link(), kai_store:start_link(), Modified: trunk/test/kai_memcache_SUITE.erl =================================================================== --- trunk/test/kai_memcache_SUITE.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/test/kai_memcache_SUITE.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -23,20 +23,21 @@ test1(_Conf) -> kai_config:start_link([ {hostname, "localhost"}, - {api_port, 11011}, - {api_max_processes, 2}, + {rpc_port, 11011}, + {rpc_max_processes, 2}, {memcache_port, 11211}, {memcache_max_processes, 2}, {max_connections, 32}, {n, 1}, {r, 1}, {w, 1}, {number_of_buckets, 8}, - {number_of_virtual_nodes, 2} + {number_of_virtual_nodes, 2}, + {store, ets} ]), kai_hash:start_link(), kai_store:start_link(), kai_version:start_link(), kai_connection:start_link(), - kai_api:start_link(), + kai_rpc:start_link(), kai_memcache:start_link(), timer:sleep(100), % wait for starting kai_memcache @@ -93,7 +94,7 @@ gen_tcp:close(MemcacheSocket), kai_memcache:stop(), - kai_api:stop(), + kai_rpc:stop(), kai_connection:stop(), kai_version:stop(), kai_store:stop(), Copied: trunk/test/kai_rpc_SUITE.erl (from rev 92, branches/takemaru_store_dets/test/kai_rpc_SUITE.erl) =================================================================== --- trunk/test/kai_rpc_SUITE.erl (rev 0) +++ trunk/test/kai_rpc_SUITE.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -0,0 +1,72 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(kai_rpc_SUITE). +-compile(export_all). + +-include("kai.hrl"). +-include("kai_test.hrl"). + +all() -> [test1]. + +test1() -> []. +test1(_Conf) -> + kai_config:start_link([ + {hostname, "localhost"}, + {rpc_port, 11011}, + {rpc_max_processes, 2}, + {n, 3}, + {number_of_buckets, 8}, + {number_of_virtual_nodes, 2}, + {store, ets} + ]), + kai_hash:start_link(), + kai_store:start_link(), + kai_connection:start_link(), + kai_rpc:start_link(), + + timer:sleep(100), % wait for starting kai_rpc + + {node_info, ?NODE1, ?INFO} = kai_rpc:node_info(?NODE1), + + {node_list, [?NODE1]} = kai_rpc:node_list(?NODE1), + + Data = #data{ + key = "item-1", + bucket = 3, + last_modified = now(), + checksum = erlang:md5(<<"value-1">>), + flags = "0", + value = (<<"value-1">>) + }, + ok = kai_rpc:put(?NODE1, Data), + ?assertEqual(Data, kai_store:get(#data{key="item-1", bucket=3})), + + ListOfData = #data{ + key = "item-1", + bucket = 3, + last_modified = Data#data.last_modified, + checksum = erlang:md5(<<"value-1">>) + }, + {list_of_data, [ListOfData]} = kai_rpc:list(?NODE1, 3), + + Data = kai_rpc:get(?NODE1, #data{key="item-1", bucket=3}), + + ok = kai_rpc:delete(?NODE1, #data{key="item-1", bucket=3}), + + undefined = kai_rpc:get(?NODE1, #data{key="item-1", bucket=3}), + + kai_rpc:stop(), + kai_connection:stop(), + kai_store:stop(), + kai_hash:stop(), + kai_config:stop(). Modified: trunk/test/kai_store_SUITE.erl =================================================================== --- trunk/test/kai_store_SUITE.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/test/kai_store_SUITE.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -16,10 +16,10 @@ -include("kai.hrl"). -include("kai_test.hrl"). -all() -> [test1, test2]. +all() -> [test_ets, test_dets, test_perf]. -test1() -> []. -test1(_Conf) -> +test(Conf) -> + kai_config:start_link(Conf), kai_store:start_link(), Data1 = #data{ @@ -31,13 +31,14 @@ value = (<<"value-1">>) }, kai_store:put(Data1), + ?assertEqual( Data1, - kai_store:get("item-1") + kai_store:get(#data{key="item-1", bucket=3}) ), ?assertEqual( undefined, - kai_store:get("item-2") + kai_store:get(#data{key="item-2", bucket=1}) ), Data2 = #data{ @@ -51,7 +52,7 @@ kai_store:put(Data2), ?assertEqual( Data2, - kai_store:get("item-2") + kai_store:get(#data{key="item-2", bucket=1}) ), Data3 = #data{ @@ -65,7 +66,7 @@ kai_store:put(Data3), ?assertEqual( Data3, - kai_store:get("item-3") + kai_store:get(#data{key="item-3", bucket=3}) ), {list_of_data, ListOfData1} = kai_store:list(1), @@ -91,22 +92,48 @@ kai_store:put(Data1b), ?assertEqual( Data1b, - kai_store:get("item-1") + kai_store:get(#data{key="item-1", bucket=3}) ), - kai_store:delete("item-1"), + kai_store:delete(#data{key="item-1", bucket=3}), ?assertEqual( undefined, - kai_store:get("item-1") + kai_store:get(#data{key="item-1", bucket=3}) ), {list_of_data, ListOfData4} = kai_store:list(3), ?assertEqual(1, length(ListOfData4)), ?assert(lists:keymember("item-3", 2, ListOfData4)), - kai_store:stop(). + kai_store:stop(), + kai_config:stop(). -test2_put(T) -> +test_ets() -> []. +test_ets(_Conf) -> + test([ + {hostname, "localhost"}, + {rpc_port, 11011}, + {n, 3}, + {number_of_buckets, 8}, + {number_of_virtual_nodes, 2}, + {store, ets} + ]). + +test_dets() -> []. +test_dets(_Conf) -> + test([ + {hostname, "localhost"}, + {rpc_port, 11011}, + {n, 3}, + {number_of_buckets, 8}, + {number_of_virtual_nodes, 2}, + {store, dets}, + {dets_dir, "."}, + {number_of_tables, 2} + ]), + file:delete("./1"), file:delete("./2"). + +test_perf_put(T) -> lists:foreach( fun(I) -> Key = "item-" ++ integer_to_list(I), @@ -124,27 +151,36 @@ lists:seq(1, T) ). -test2_get(T) -> +test_perf_get(T) -> lists:foreach( fun(I) -> Key = "item-" ++ integer_to_list(I), - kai_store:get(Key) + kai_store:get(#data{key=Key, bucket=0}) end, lists:seq(1, T) ). -test2() -> []. -test2(_Conf) -> +test_perf() -> []. +test_perf(_Conf) -> + kai_config:start_link([ + {hostname, "localhost"}, + {rpc_port, 11011}, + {n, 3}, + {number_of_buckets, 8}, + {number_of_virtual_nodes, 2}, + {store, ets} + ]), kai_store:start_link(), T = 10000, - {Usec, _} = timer:tc(?MODULE, test2_put, [T]), + {Usec, _} = timer:tc(?MODULE, test_perf_put, [T]), ?assert(Usec < 100*T), io:format("average time to put data: ~p [usec]", [Usec/T]), - {Usec2, _} = timer:tc(?MODULE, test2_get, [T]), + {Usec2, _} = timer:tc(?MODULE, test_perf_get, [T]), ?assert(Usec2 < 100*T), io:format("average time to get data: ~p [usec]", [Usec2/T]), - kai_store:stop(). + kai_store:stop(), + kai_config:stop(). Modified: trunk/test/kai_sync_SUITE.erl =================================================================== --- trunk/test/kai_sync_SUITE.erl 2008-10-31 14:06:07 UTC (rev 92) +++ trunk/test/kai_sync_SUITE.erl 2008-11-01 09:57:39 UTC (rev 93) @@ -39,7 +39,7 @@ test1_api_send(ApiSocket, {list, 0 = _Bucket}) -> Data4 = #data{ key = ("item-4"), - bucket = 3, + bucket = 0, last_modified = now(), checksum = erlang:md5(<<"item-4">>) }, @@ -58,7 +58,7 @@ checksum = erlang:md5(<<"item-3">>) }, gen_tcp:send(ApiSocket, term_to_binary({list_of_data, [Data1, Data3]})); -test1_api_send(ApiSocket, {get, "item-3"}) -> +test1_api_send(ApiSocket, {get, #data{key="item-3", bucket=3}}) -> Data3 = #data{ key = "item-3", bucket = 3, @@ -68,7 +68,7 @@ value = (<<"value-3">>) }, gen_tcp:send(ApiSocket, term_to_binary(Data3)); -test1_api_send(ApiSocket, {get, "item-4"}) -> +test1_api_send(ApiSocket, {get, #data{key="item-4", bucket=0}}) -> Data4 = #data{ key = "item-4", bucket = 0, @@ -82,10 +82,11 @@ test1(_Conf) -> kai_config:start_link([ {hostname, "localhost"}, - {api_port, 11011}, + {rpc_port, 11011}, {n, 3}, {number_of_buckets, 8}, - {number_of_virtual_nodes, 2} + {number_of_virtual_nodes, 2}, + {store, ets} ]), kai_hash:start_link(), kai_store:start_link(), This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |