Revision: 85
http://kai.svn.sourceforge.net/kai/?rev=85&view=rev
Author: takemaru
Date: 2008-08-28 13:03:48 +0000 (Thu, 28 Aug 2008)
Log Message:
-----------
* 0.2.0 released:
- Better stability under high load
- Introducing connection pooling
- Avoidance of deadlock
- Fixing a node list bug in the consistent hash
- Preparation for vector clocks
- Routing requests to the coordinator
- Operability issues
- EDoc and dialyzer
Modified Paths:
--------------
tags/0.2.0/Makefile
trunk/Makefile
Added Paths:
-----------
tags/0.2.0/
tags/0.2.0/include/kai.hrl
tags/0.2.0/kai.config
tags/0.2.0/src/Makefile
tags/0.2.0/src/kai.app.src
tags/0.2.0/src/kai.erl
tags/0.2.0/src/kai_api.erl
tags/0.2.0/src/kai_config.erl
tags/0.2.0/src/kai_connection.erl
tags/0.2.0/src/kai_hash.erl
tags/0.2.0/src/kai_membership.erl
tags/0.2.0/src/kai_memcache.erl
tags/0.2.0/src/kai_sup.erl
tags/0.2.0/src/kai_tcp_server.erl
tags/0.2.0/test/Makefile
tags/0.2.0/test/kai.coverspec
tags/0.2.0/test/kai_api_SUITE.erl
tags/0.2.0/test/kai_config_SUITE.erl
tags/0.2.0/test/kai_connection_SUITE.erl
tags/0.2.0/test/kai_coordinator_SUITE.erl
tags/0.2.0/test/kai_hash_SUITE.erl
tags/0.2.0/test/kai_log_SUITE.erl
tags/0.2.0/test/kai_membership_SUITE.erl
tags/0.2.0/test/kai_memcache_SUITE.erl
tags/0.2.0/test/kai_sync_SUITE.erl
tags/0.2.0/test/kai_tcp_server_SUITE.erl
Removed Paths:
-------------
tags/0.2.0/include/kai.hrl
tags/0.2.0/kai.config
tags/0.2.0/src/Makefile
tags/0.2.0/src/kai.app.src
tags/0.2.0/src/kai.erl
tags/0.2.0/src/kai_api.erl
tags/0.2.0/src/kai_config.erl
tags/0.2.0/src/kai_hash.erl
tags/0.2.0/src/kai_membership.erl
tags/0.2.0/src/kai_memcache.erl
tags/0.2.0/src/kai_sup.erl
tags/0.2.0/src/kai_tcp_server.erl
tags/0.2.0/test/Makefile
tags/0.2.0/test/kai.coverspec
tags/0.2.0/test/kai_api_SUITE.erl
tags/0.2.0/test/kai_config_SUITE.erl
tags/0.2.0/test/kai_coordinator_SUITE.erl
tags/0.2.0/test/kai_hash_SUITE.erl
tags/0.2.0/test/kai_log_SUITE.erl
tags/0.2.0/test/kai_membership_SUITE.erl
tags/0.2.0/test/kai_memcache_SUITE.erl
tags/0.2.0/test/kai_sync_SUITE.erl
tags/0.2.0/test/kai_tcp_server_SUITE.erl
Modified: tags/0.2.0/Makefile
===================================================================
--- trunk/Makefile 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/Makefile 2008-08-28 13:03:48 UTC (rev 85)
@@ -9,7 +9,7 @@
## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
## License for the specific language governing permissions and limitations
## under the License.
-KAI_VSN=0.1.0
+KAI_VSN=0.2.0
ifndef ROOT
ROOT=$(shell pwd)
Deleted: tags/0.2.0/include/kai.hrl
===================================================================
--- trunk/include/kai.hrl 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/include/kai.hrl 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,34 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--record(data, {
- key, bucket, last_modified, vector_clocks, checksum, flags, value
-}).
-
--record(tcp_server_option, {
- listen = [{active, false}, binary, {packet, line}, {reuseaddr, true}],
- port = 11211,
- max_connections = 8,
- max_restarts = 3,
- time = 60,
- shutdown = 2000,
- accept_timeout = infinity,
- recv_length = 0,
- recv_timeout = infinity
-}).
-
--define(error (Data), kai_log:log(error, self(), ?FILE, ?LINE, Data)).
--define(warning(Data), kai_log:log(warning, self(), ?FILE, ?LINE, Data)).
--define(info (Data), kai_log:log(info, self(), ?FILE, ?LINE, Data)).
-
-%-define(debug(Data), kai_log:log(debug, self(), ?FILE, ?LINE, Data)).
--define(debug(_Data), ok).
Copied: tags/0.2.0/include/kai.hrl (from rev 84, trunk/include/kai.hrl)
===================================================================
--- tags/0.2.0/include/kai.hrl (rev 0)
+++ tags/0.2.0/include/kai.hrl 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,35 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-record(data, {
+ key, bucket, last_modified, vector_clocks, checksum, flags, value
+}).
+
+-record(tcp_server_option, {
+ listen = [{active, false}, binary, {packet, line}, {reuseaddr, true}],
+ port = 11211,
+ max_processes = 8,
+ max_restarts = 3,
+ time = 60,
+ shutdown = 2000,
+ accept_timeout = infinity,
+ accept_error_sleep_time = 3000,
+ recv_length = 0,
+ recv_timeout = infinity
+}).
+
+-define(error (Data), kai_log:log(error, self(), ?FILE, ?LINE, Data)).
+-define(warning(Data), kai_log:log(warning, self(), ?FILE, ?LINE, Data)).
+-define(info (Data), kai_log:log(info, self(), ?FILE, ?LINE, Data)).
+
+%-define(debug(Data), kai_log:log(debug, self(), ?FILE, ?LINE, Data)).
+-define(debug(_Data), ok).
Deleted: tags/0.2.0/kai.config
===================================================================
--- trunk/kai.config 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/kai.config 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,11 +0,0 @@
-[{kai, [%{logfile, "kai.log"},
- %{hostname, "localhost"},
- {port, 11011},
- {max_connections, 30},
- {memcache_port, 11211},
- {memcache_max_connections, 10},
- {n, 3},
- {r, 2},
- {w, 2},
- {number_of_buckets, 1024},
- {number_of_virtual_nodes, 128}]}].
Copied: tags/0.2.0/kai.config (from rev 84, trunk/kai.config)
===================================================================
--- tags/0.2.0/kai.config (rev 0)
+++ tags/0.2.0/kai.config 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,12 @@
+[{kai, [%{logfile, "kai.log"},
+ %{hostname, "localhost"},
+ {api_port, 11011},
+ {api_max_processes, 30},
+ {memcache_port, 11211},
+ {memcache_max_processes, 10},
+ {max_connections, 32},
+ {n, 3},
+ {r, 2},
+ {w, 2},
+ {number_of_buckets, 1024},
+ {number_of_virtual_nodes, 128}]}].
Deleted: tags/0.2.0/src/Makefile
===================================================================
--- trunk/src/Makefile 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/src/Makefile 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,43 +0,0 @@
-## Licensed under the Apache License, Version 2.0 (the "License"); you may not
-## use this file except in compliance with the License. You may obtain a copy
-## of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-## License for the specific language governing permissions and limitations
-## under the License.
-EMULATOR=beam
-EBIN = $(ROOT)/ebin
-INCLUDE = $(ROOT)/include
-
-APP_TARGET = $(EBIN)/kai.app
-
-ERL_COMPILE_FLAGS += \
- +warn_unused_vars +nowarn_shadow_vars +warn_unused_import \
- +debug_info
-
-SOURCES = \
- kai_config kai_log kai_hash kai_store kai_version \
- kai_coordinator kai_sync kai_membership kai_sup kai \
- kai_tcp_server kai_api kai_memcache vclock
-
-MODS = ${SOURCES:%=$(EBIN)/%.$(EMULATOR)} $(APP_TARGET)
-
-$(EBIN)/%.$(EMULATOR):%.erl
- erlc -pa $(EBIN) -W $(ERL_COMPILE_FLAGS) -I$(INCLUDE) -o$(EBIN) $<
-
-all: $(MODS)
-
-$(MODS): $(INCLUDE)/kai.hrl
-
-dialyze:
- dialyzer --succ_typings -c ${SOURCES:%=%.erl}
-
-clean:
- rm -rf $(EBIN)/*.$(EMULATOR) $(APP_TARGET) $(EBIN)/erl_crash.dump *~
-
-$(APP_TARGET): kai.app.src Makefile
- sed -e 's;%VSN%;$(KAI_VSN);' $< > $@
Copied: tags/0.2.0/src/Makefile (from rev 84, trunk/src/Makefile)
===================================================================
--- tags/0.2.0/src/Makefile (rev 0)
+++ tags/0.2.0/src/Makefile 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,43 @@
+## Licensed under the Apache License, Version 2.0 (the "License"); you may not
+## use this file except in compliance with the License. You may obtain a copy
+## of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+## License for the specific language governing permissions and limitations
+## under the License.
+EMULATOR=beam
+EBIN = $(ROOT)/ebin
+INCLUDE = $(ROOT)/include
+
+APP_TARGET = $(EBIN)/kai.app
+
+ERL_COMPILE_FLAGS += \
+ +warn_unused_vars +nowarn_shadow_vars +warn_unused_import \
+ +debug_info
+
+SOURCES = \
+ kai_config kai_log kai_hash kai_store kai_version kai_connection \
+ kai_sync kai_membership kai_coordinator kai_tcp_server kai_api \
+ kai_memcache kai_sup kai vclock
+
+MODS = ${SOURCES:%=$(EBIN)/%.$(EMULATOR)} $(APP_TARGET)
+
+$(EBIN)/%.$(EMULATOR):%.erl
+ erlc -pa $(EBIN) -W $(ERL_COMPILE_FLAGS) -I$(INCLUDE) -o$(EBIN) $<
+
+all: $(MODS)
+
+$(MODS): $(INCLUDE)/kai.hrl
+
+dialyze:
+ dialyzer --succ_typings -c ${SOURCES:%=%.erl}
+
+clean:
+ rm -rf $(EBIN)/*.$(EMULATOR) $(APP_TARGET) $(EBIN)/erl_crash.dump *~
+
+$(APP_TARGET): kai.app.src Makefile
+ sed -e 's;%VSN%;$(KAI_VSN);' $< > $@
Deleted: tags/0.2.0/src/kai.app.src
===================================================================
--- trunk/src/kai.app.src 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/src/kai.app.src 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,24 +0,0 @@
-{application, kai,
- [{description, "Kai - A distributed hashtable like Amazon's Dynamo"},
- {vsn, "%VSN%"},
- {modules, [
- kai, kai_sup, kai_memcache, kai_api, kai_membership, kai_sync,
- kai_coordinator, kai_version, kai_store, kai_hash, kai_config
- ]},
- {registered, [
- kai_sup, kai_membership, kai_sync, kai_version, kai_store, kai_hash,
- kai_log, kai_config
- ]},
- {applications, [kernel, stdlib]},
- {mod, {kai, []}},
- {start_phases, []},
- {env, [
- {port, 11011},
- {max_connections, 30},
- {memcache_port, 11211},
- {memcache_max_connections, 10},
- {n, 3}, {r, 2}, {w, 2},
- {number_of_buckets, 1024},
- {number_of_virtual_nodes, 128}
- ]}
- ]}.
Copied: tags/0.2.0/src/kai.app.src (from rev 84, trunk/src/kai.app.src)
===================================================================
--- tags/0.2.0/src/kai.app.src (rev 0)
+++ tags/0.2.0/src/kai.app.src 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,26 @@
+{application, kai,
+ [{description, "Kai - A distributed hashtable like Amazon's Dynamo"},
+ {vsn, "%VSN%"},
+ {modules, [
+ kai, kai_sup, kai_memcache, kai_api, kai_tcp_server, kai_coordinator,
+ kai_membership, kai_sync, kai_connection, kai_version, kai_store,
+ kai_hash, kai_log, kai_config
+ ]},
+ {registered, [
+ kai_sup, kai_membership, kai_sync, kai_connection, kai_version, kai_store,
+ kai_hash, kai_log, kai_config
+ ]},
+ {applications, [kernel, stdlib]},
+ {mod, {kai, []}},
+ {start_phases, []},
+ {env, [
+ {api_port, 11011},
+ {api_max_processes, 30},
+ {memcache_port, 11211},
+ {memcache_max_processes, 10},
+ {max_connections, 32},
+ {n, 3}, {r, 2}, {w, 2},
+ {number_of_buckets, 1024},
+ {number_of_virtual_nodes, 128}
+ ]}
+ ]}.
Deleted: tags/0.2.0/src/kai.erl
===================================================================
--- trunk/src/kai.erl 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/src/kai.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,37 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(kai).
--behaviour(application).
-
--export([start/2, stop/1]).
-
-config([], Acc) ->
- Acc;
-config([Key|Rest], Acc) ->
- case application:get_env(kai, Key) of
- undefined -> config(Rest, Acc);
- {ok, Value} -> config(Rest, [{Key, Value}|Acc])
- end.
-
-start(_Type, _Args) ->
- Args = config([
- logfile, hostname,
- port, max_connections,
- memcache_port, memcache_max_connections,
- n, r, w,
- number_of_buckets, number_of_virtual_nodes
- ], []),
- kai_sup:start_link(Args).
-
-stop(_State) ->
- ok.
Copied: tags/0.2.0/src/kai.erl (from rev 84, trunk/src/kai.erl)
===================================================================
--- tags/0.2.0/src/kai.erl (rev 0)
+++ tags/0.2.0/src/kai.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,38 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(kai).
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+config([], Acc) ->
+ Acc;
+config([Key|Rest], Acc) ->
+ case application:get_env(kai, Key) of
+ undefined -> config(Rest, Acc);
+ {ok, Value} -> config(Rest, [{Key, Value}|Acc])
+ end.
+
+start(_Type, _Args) ->
+ Args = config([
+ logfile, hostname,
+ api_port, api_max_processes,
+ memcache_port, memcache_max_processes,
+ max_connections,
+ n, r, w,
+ number_of_buckets, number_of_virtual_nodes
+ ], []),
+ kai_sup:start_link(Args).
+
+stop(_State) ->
+ ok.
Deleted: tags/0.2.0/src/kai_api.erl
===================================================================
--- trunk/src/kai_api.erl 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/src/kai_api.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,165 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(kai_api).
--behaviour(kai_tcp_server).
-
--export([start_link/0, stop/0]).
--export([init/1, handle_call/3]).
--export([
- node_info/1, node_list/1,
- list/2, get/2, put/2, delete/2,
- check_node/2, route/2
-]).
-
--include("kai.hrl").
-
--define(TIMEOUT, 3000).
-
-start_link() ->
- kai_tcp_server:start_link(
- {local, ?MODULE},
- ?MODULE,
- [],
- #tcp_server_option{
- listen = [binary, {packet, 4}, {active, true}, {reuseaddr, true}],
- port = kai_config:get(port),
- max_connections = kai_config:get(max_connections)
- }
- ).
-
-stop() -> kai_tcp_server:stop(?MODULE).
-
-init(_Args) -> {ok, {}}.
-
-handle_call(Socket, Data, State) ->
- dispatch(Socket, binary_to_term(Data), State).
-
-dispatch(_Socket, node_info, State) ->
- reply(kai_config:node_info(), State);
-
-dispatch(_Socket, node_list, State) ->
- reply(kai_hash:node_list(), State);
-
-dispatch(_Socket, {list, Bucket}, State) ->
- reply(kai_store:list(Bucket), State);
-
-dispatch(_Socket, {get, Key}, State) ->
- reply(kai_store:get(Key), State);
-
-dispatch(_Socket, {put, Data}, State) when is_record(Data, data)->
- reply(kai_store:put(Data), State);
-
-dispatch(_Socket, {delete, Key}, State) ->
- reply(kai_store:delete(Key), State);
-
-dispatch(_Socket, {check_node, Node}, State) ->
- reply(kai_membership:check_node(Node), State);
-
-dispatch(_Socket, {route, Request}, State) ->
- reply(kai_coordinator:route(Request), State);
-
-dispatch(_Socket, _Unknown, State) ->
- reply({error, enotsup}, State).
-
-reply(Data, State) ->
- {reply, term_to_binary(Data), State}.
-
-recv_response(ApiSocket, Node, Message) ->
- receive
- {tcp, ApiSocket, Bin} ->
- binary_to_term(Bin);
- {tcp_closed, ApiSocket} ->
- ?warning(io_lib:format("recv_response(~p, ~p): ~p",
- [Node, Message, {error, econnreset}])),
- {error, econnreset};
- {error, Reason} ->
- ?warning(io_lib:format("recv_response(~p, ~p): ~p",
- [Node, Message, {error, Reason}])),
- {error, Reason}
-
- % Don't place Other alternative here. This is to avoid to catch event
- % messages, '$gen_event' or something like that. Remember that this
- % function can be called from gen_fsm/gen_event.
-
- after ?TIMEOUT ->
- ?warning(io_lib:format("recv_response(~p, ~p): ~p",
- [Node, Message, {error, etimedout}])),
- {error, etimedout}
- end.
-
-send_request({Address, Port} = Node, Message) ->
- case gen_tcp:connect(
- Address, Port, [binary, {packet, 4}, {reuseaddr, true}], ?TIMEOUT
- ) of
- {ok, ApiSocket} ->
- gen_tcp:send(ApiSocket, term_to_binary(Message)),
- Response = recv_response(ApiSocket, Node, Message),
- gen_tcp:close(ApiSocket),
- Response;
- {error, Reason} ->
- ?warning(io_lib:format("send_request(~p, ~p): ~p",
- [Node, Message, {error, Reason}])),
- {error, Reason}
- end.
-
-is_local_node(Node) ->
- LocalNode = kai_config:get(node),
- Node =:= LocalNode.
-
-node_info(Node) ->
- case is_local_node(Node) of
- true -> kai_config:node_info();
- _ -> send_request(Node, node_info)
- end.
-
-node_list(Node) ->
- case is_local_node(Node) of
- true -> kai_hash:node_list();
- _ -> send_request(Node, node_list)
- end.
-
-list(Node, Bucket) ->
- case is_local_node(Node) of
- true -> kai_store:list(Bucket);
- _ -> send_request(Node, {list, Bucket})
- end.
-
-get(Node, Key) ->
- case is_local_node(Node) of
- true -> kai_store:get(Key);
- _ -> send_request(Node, {get, Key})
- end.
-
-put(Node, Data) ->
- case is_local_node(Node) of
- true -> kai_store:put(Data);
- _ -> send_request(Node, {put, Data})
- end.
-
-delete(Node, Key) ->
- case is_local_node(Node) of
- true -> kai_store:delete(Key);
- _ -> send_request(Node, {delete, Key})
- end.
-
-check_node(Node, Node2) ->
- case is_local_node(Node) of
- true -> kai_membership:check_node(Node2);
- _ -> send_request(Node, {check_node, Node2})
- end.
-
-route(Node, Request) ->
- case is_local_node(Node) of
- true -> {error, ewouldblock};
- _ -> send_request(Node, {route, Request})
- end.
Copied: tags/0.2.0/src/kai_api.erl (from rev 84, trunk/src/kai_api.erl)
===================================================================
--- tags/0.2.0/src/kai_api.erl (rev 0)
+++ tags/0.2.0/src/kai_api.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,176 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(kai_api).
+-behaviour(kai_tcp_server).
+
+-export([start_link/0, stop/0]).
+-export([init/1, handle_call/3]).
+-export([
+ node_info/1, node_list/1,
+ list/2, get/2, put/2, delete/2,
+ check_node/2, route/2
+]).
+
+-include("kai.hrl").
+
+-define(TIMEOUT, 3000).
+
+start_link() ->
+ kai_tcp_server:start_link(
+ {local, ?MODULE},
+ ?MODULE,
+ [],
+ #tcp_server_option{
+ listen = [binary, {packet, 4}, {active, true}, {reuseaddr, true}],
+ port = kai_config:get(api_port),
+ max_processes = kai_config:get(api_max_processes)
+ }
+ ).
+
+stop() -> kai_tcp_server:stop(?MODULE).
+
+init(_Args) -> {ok, {}}.
+
+handle_call(Socket, Data, State) ->
+ dispatch(Socket, binary_to_term(Data), State).
+
+dispatch(_Socket, node_info, State) ->
+ reply(kai_config:node_info(), State);
+
+dispatch(_Socket, node_list, State) ->
+ reply(kai_hash:node_list(), State);
+
+dispatch(_Socket, {list, Bucket}, State) ->
+ reply(kai_store:list(Bucket), State);
+
+dispatch(_Socket, {get, Key}, State) ->
+ reply(kai_store:get(Key), State);
+
+dispatch(_Socket, {put, Data}, State) when is_record(Data, data)->
+ reply(kai_store:put(Data), State);
+
+dispatch(_Socket, {delete, Key}, State) ->
+ reply(kai_store:delete(Key), State);
+
+dispatch(_Socket, {check_node, Node}, State) ->
+ reply(kai_membership:check_node(Node), State);
+
+dispatch(_Socket, {route, Request}, State) ->
+ reply(kai_coordinator:route(Request), State);
+
+dispatch(_Socket, _Unknown, State) ->
+ reply({error, enotsup}, State).
+
+reply(Data, State) ->
+ {reply, term_to_binary(Data), State}.
+
+recv_response(ApiSocket) ->
+ receive
+ {tcp, ApiSocket, Bin} ->
+ {ok, binary_to_term(Bin)};
+ {tcp_closed, ApiSocket} ->
+ {error, econnreset};
+ {error, Reason} ->
+ {error, Reason}
+
+ % Don't place a catch-all `Other` clause here. This avoids consuming event
+ % messages such as '$gen_event'. Remember that this function can be
+ % called from gen_fsm/gen_event.
+
+ after ?TIMEOUT ->
+ {error, timeout}
+ end.
+
+do_request(Node, Message) ->
+ case kai_connection:lease(Node, self()) of
+ {ok, ApiSocket} ->
+ case gen_tcp:send(ApiSocket, term_to_binary(Message)) of
+ ok ->
+ case recv_response(ApiSocket) of
+ {ok, Result} ->
+ kai_connection:return(ApiSocket),
+ {ok, Result};
+ {error, Reason} ->
+ kai_connection:close(ApiSocket),
+ {error, Reason}
+ end;
+ {error, Reason} ->
+ kai_connection:close(ApiSocket),
+ {error, Reason}
+ end;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+request(Node, Message) ->
+ case do_request(Node, Message) of
+ {ok, Result} ->
+ Result;
+ {error, Reason} ->
+ ?warning(io_lib:format("request(~p, ~p): ~p",
+ [Node, Message, {error, Reason}])),
+% kai_membership:check_node(Node),
+ {error, Reason}
+ end.
+
+is_local_node(Node) ->
+ LocalNode = kai_config:get(node),
+ Node =:= LocalNode.
+
+node_info(Node) ->
+ case is_local_node(Node) of
+ true -> kai_config:node_info();
+ _ -> request(Node, node_info)
+ end.
+
+node_list(Node) ->
+ case is_local_node(Node) of
+ true -> kai_hash:node_list();
+ _ -> request(Node, node_list)
+ end.
+
+list(Node, Bucket) ->
+ case is_local_node(Node) of
+ true -> kai_store:list(Bucket);
+ _ -> request(Node, {list, Bucket})
+ end.
+
+get(Node, Key) ->
+ case is_local_node(Node) of
+ true -> kai_store:get(Key);
+ _ -> request(Node, {get, Key})
+ end.
+
+put(Node, Data) ->
+ case is_local_node(Node) of
+ true -> kai_store:put(Data);
+ _ -> request(Node, {put, Data})
+ end.
+
+delete(Node, Key) ->
+ case is_local_node(Node) of
+ true -> kai_store:delete(Key);
+ _ -> request(Node, {delete, Key})
+ end.
+
+check_node(Node, Node2) ->
+ case is_local_node(Node) of
+ true -> kai_membership:check_node(Node2);
+ _ -> request(Node, {check_node, Node2})
+ end.
+
+route(Node, Request) ->
+ case is_local_node(Node) of
+ true -> {error, ewouldblock};
+ _ -> request(Node, {route, Request})
+ end.
Deleted: tags/0.2.0/src/kai_config.erl
===================================================================
--- trunk/src/kai_config.erl 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/src/kai_config.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,97 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(kai_config).
--behaviour(gen_server).
-
--export([start_link/1, stop/0]).
--export([get/1, node_info/0]).
--export([
- init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3
-]).
-
--include("kai.hrl").
-
--define(SERVER, ?MODULE).
-
-start_link(Args) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, Args, _Opts = []).
-
-init(Args) ->
- ets:new(config, [set, private, named_table]),
-
- lists:foreach(
- fun({Key, Value}) -> ets:insert(config, {Key, Value}) end,
- Args
- ),
-
- Hostname =
- case proplists:get_value(hostname, Args) of
- undefined -> {ok, H} = inet:gethostname(), H;
- H -> H
- end,
- {ok, Address} = inet:getaddr(Hostname, inet),
- Port = proplists:get_value(port, Args),
- ets:insert(config, {node, {Address, Port}}),
-
- NumberOfBuckets = proplists:get_value(number_of_buckets, Args),
- Exponent = round( math:log(NumberOfBuckets) / math:log(2) ),
- ets:insert(config, {number_of_buckets, trunc( math:pow(2, Exponent) )}),
-
- {ok, []}.
-
-terminate(_Reason, _State) ->
- ets:delete(config),
- ok.
-
-do_get(Key) ->
- case ets:lookup(config, Key) of
- [{Key, Value}|_] -> Value;
- _ -> undefined
- end.
-
-do_get([], ListOfValues) ->
- lists:reverse(ListOfValues);
-do_get([Key|Rest], ListOfValues) ->
- do_get(Rest, [do_get(Key)|ListOfValues]).
-
-get(ListOfKeys, State) when is_list(ListOfKeys)->
- {reply, do_get(ListOfKeys, []), State};
-get(Key, State) ->
- {reply, do_get(Key), State}.
-
-node_info(State) ->
- [LocalNode, NumberOfVirtualNode] =
- do_get([node, number_of_virtual_nodes], []),
- Info = [{number_of_virtual_nodes, NumberOfVirtualNode}],
- {reply, {node_info, LocalNode, Info}, State}.
-
-handle_call(stop, _From, State) ->
- {stop, normal, stopped, State};
-handle_call({get, Key}, _From, State) ->
- get(Key, State);
-handle_call(node_info, _From, State) ->
- node_info(State).
-handle_cast(_Msg, State) ->
- {noreply, State}.
-handle_info(_Info, State) ->
- {noreply, State}.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-stop() ->
- gen_server:call(?SERVER, stop).
-get(Key) ->
- gen_server:call(?SERVER, {get, Key}).
-node_info() ->
- gen_server:call(?SERVER, node_info).
Copied: tags/0.2.0/src/kai_config.erl (from rev 84, trunk/src/kai_config.erl)
===================================================================
--- tags/0.2.0/src/kai_config.erl (rev 0)
+++ tags/0.2.0/src/kai_config.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,97 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(kai_config).
+-behaviour(gen_server).
+
+-export([start_link/1, stop/0]).
+-export([get/1, node_info/0]).
+-export([
+ init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3
+]).
+
+-include("kai.hrl").
+
+-define(SERVER, ?MODULE).
+
+start_link(Args) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, Args, _Opts = []).
+
+init(Args) ->
+ ets:new(config, [set, private, named_table]),
+
+ lists:foreach(
+ fun({Key, Value}) -> ets:insert(config, {Key, Value}) end,
+ Args
+ ),
+
+ Hostname =
+ case proplists:get_value(hostname, Args) of
+ undefined -> {ok, H} = inet:gethostname(), H;
+ H -> H
+ end,
+ {ok, Address} = inet:getaddr(Hostname, inet),
+ Port = proplists:get_value(api_port, Args),
+ ets:insert(config, {node, {Address, Port}}),
+
+ NumberOfBuckets = proplists:get_value(number_of_buckets, Args),
+ Exponent = round( math:log(NumberOfBuckets) / math:log(2) ),
+ ets:insert(config, {number_of_buckets, trunc( math:pow(2, Exponent) )}),
+
+ {ok, []}.
+
+terminate(_Reason, _State) ->
+ ets:delete(config),
+ ok.
+
+do_get(Key) ->
+ case ets:lookup(config, Key) of
+ [{Key, Value}|_] -> Value;
+ _ -> undefined
+ end.
+
+do_get([], ListOfValues) ->
+ lists:reverse(ListOfValues);
+do_get([Key|Rest], ListOfValues) ->
+ do_get(Rest, [do_get(Key)|ListOfValues]).
+
+get(ListOfKeys, State) when is_list(ListOfKeys)->
+ {reply, do_get(ListOfKeys, []), State};
+get(Key, State) ->
+ {reply, do_get(Key), State}.
+
+node_info(State) ->
+ [LocalNode, NumberOfVirtualNode] =
+ do_get([node, number_of_virtual_nodes], []),
+ Info = [{number_of_virtual_nodes, NumberOfVirtualNode}],
+ {reply, {node_info, LocalNode, Info}, State}.
+
+handle_call(stop, _From, State) ->
+ {stop, normal, stopped, State};
+handle_call({get, Key}, _From, State) ->
+ get(Key, State);
+handle_call(node_info, _From, State) ->
+ node_info(State).
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+handle_info(_Info, State) ->
+ {noreply, State}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+stop() ->
+ gen_server:call(?SERVER, stop).
+get(Key) ->
+ gen_server:call(?SERVER, {get, Key}).
+node_info() ->
+ gen_server:call(?SERVER, node_info).
Copied: tags/0.2.0/src/kai_connection.erl (from rev 84, trunk/src/kai_connection.erl)
===================================================================
--- tags/0.2.0/src/kai_connection.erl (rev 0)
+++ tags/0.2.0/src/kai_connection.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,203 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(kai_connection).
+-behaviour(gen_server).
+
+-export([start_link/0, stop/0]).
+-export([lease/2, lease/3, return/1, close/1, connections/0]).
+-export([
+ init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3
+]).
+
+-include("kai.hrl").
+
+-define(SERVER, ?MODULE).
+-define(TIMEOUT, 3000).
+
+-record(connection, {node, available, socket}).
+
+% Starts the connection pool server, linked to the caller and registered
+% locally under ?SERVER.
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []).
+
+% gen_server callback: the server state is the pool itself, a list of
+% #connection{} records, initially empty.
+init(_Args) ->
+ {ok, []}.
+
+% gen_server callback: nothing is cleaned up explicitly on termination.
+terminate(_Reason, _State) ->
+ ok.
+
+% Finds an idle pooled connection to Node, or opens a new one, and hands its
+% socket over to Pid.
+%
+% The fourth argument is the part of the pool still to be scanned; Acc holds
+% the entries already skipped, in reverse order.
+%
+% Returns {ok, Socket, Connections} with the leased entry at the head of the
+% pool (most recently used first), or {error, Reason, Connections}.
+%
+% On every failure path the affected socket is closed rather than leaked,
+% and the untouched entries are returned in their original order so that
+% the LRU eviction done by lru/1 keeps seeing a correctly ordered pool.
+do_lease({Address, Port} = Node, Pid, Opts, [], Acc) ->
+    % No idle connection to Node is pooled: open a new one.
+    case gen_tcp:connect(
+        Address, Port,
+        [binary, {active, true}, {packet, 4}, {reuseaddr, true}],
+        ?TIMEOUT
+    ) of
+        {ok, Socket} ->
+            case do_hand_over(Socket, Pid, Opts) of
+                ok ->
+                    Connection = #connection{
+                        node = Node,
+                        available = false,
+                        socket = Socket
+                    },
+                    Connections = [Connection|lists:reverse(Acc)], % LRU
+                    {ok, Socket, Connections};
+                {error, Reason} ->
+                    {error, Reason, lists:reverse(Acc)}
+            end;
+        {error, Reason} ->
+            {error, Reason, lists:reverse(Acc)}
+    end;
+do_lease(Node, Pid, Opts, [#connection{node=Node, available=true, socket=Socket}|Rest], Acc) ->
+    case do_hand_over(Socket, Pid, Opts) of
+        ok ->
+            Connection = #connection{
+                node = Node,
+                available = false,
+                socket = Socket
+            },
+            Connections = [Connection|lists:reverse(Acc)] ++ Rest, % LRU
+            flush(Socket),
+            {ok, Socket, Connections};
+        {error, Reason} ->
+            % The pooled socket is unusable (already closed by
+            % do_hand_over/3); drop its entry and keep the others in order.
+            {error, Reason, lists:reverse(Acc) ++ Rest}
+    end;
+do_lease(Node, Pid, Opts, [C|Rest], Acc) ->
+    do_lease(Node, Pid, Opts, Rest, [C|Acc]).
+
+% Makes Pid the controlling process of Socket and applies the caller's
+% socket options.  Returns ok, or {error, Reason} after closing the socket
+% so that a failed handover never leaves an open, untracked socket behind.
+do_hand_over(Socket, Pid, Opts) ->
+    case gen_tcp:controlling_process(Socket, Pid) of
+        ok ->
+            case inet:setopts(Socket, Opts) of
+                ok ->
+                    ok;
+                {error, Reason} ->
+                    gen_tcp:close(Socket),
+                    {error, Reason}
+            end;
+        {error, Reason} ->
+            gen_tcp:close(Socket),
+            {error, Reason}
+    end.
+
+% Builds the gen_server reply for a lease request.
+% On success the pool is trimmed with lru/1 and the socket is returned; on
+% failure a warning is logged and {error, Reason} is returned.
+lease(Node, Pid, Opts, Connections) ->
+    case do_lease(Node, Pid, Opts, Connections, []) of
+        {error, Why, Pool} ->
+            ?warning(io_lib:format("lease(~p, ~p) ~p",
+                                   [Node, Pid, {error, Why}])),
+            {reply, {error, Why}, Pool};
+        {ok, Socket, Pool} ->
+            {reply, {ok, Socket}, lru(Pool)}
+    end.
+
+% Discards every {tcp, Socket, Data} message currently queued in the calling
+% process' mailbox.  Never blocks: returns ok as soon as none is left.
+flush(Socket) ->
+    receive
+        {tcp, Socket, _Data} ->
+            flush(Socket)
+    after 0 ->
+        ok
+    end.
+
+% Evicts up to Excess idle connections.
+% The second argument is the pool in oldest-first order (lru/1 reverses it
+% before calling); Kept collects the in-use entries that were skipped.
+% Idle entries are closed and dropped until Excess reaches 0 or the pool is
+% exhausted.  The result is back in the pool's normal order.
+lru(0, Newer, Kept) ->
+    lists:reverse(Newer) ++ Kept;
+lru(_Excess, [], Kept) ->
+    Kept;
+lru(Excess, [#connection{available=true, socket=Idle}|Newer], Kept) ->
+    gen_tcp:close(Idle),
+    lru(Excess-1, Newer, Kept);
+lru(Excess, [#connection{available=false} = Busy|Newer], Kept) ->
+    lru(Excess, Newer, [Busy|Kept]).
+
+% Trims the pool to the configured `max_connections' by closing idle
+% connections, starting from the tail of the list.  Connections that are
+% currently leased are never closed, so the pool may stay above the limit.
+lru(Connections) ->
+    Limit = kai_config:get(max_connections),
+    case length(Connections) of
+        Size when Size > Limit ->
+            lru(Size - Limit, lists:reverse(Connections), []);
+        _WithinLimit ->
+            Connections
+    end.
+
+% Marks the pool entry that owns Socket as available again and moves it to
+% the head of the pool (most recently used first).
+%
+% The second argument is the part of the pool still to be scanned; Acc holds
+% the entries already skipped, in reverse order.
+%
+% Returns {ok, Connections}, or {error, enoent, Connections} when Socket is
+% not pooled.
+do_return(_Socket, [], Acc) ->
+    % Acc was built in reverse while scanning; restore the original order so
+    % that returning an unknown socket does not flip the LRU order.
+    {error, enoent, lists:reverse(Acc)};
+do_return(Socket, [#connection{socket=Socket} = Leased|Rest], Acc) ->
+    Connection = Leased#connection{available = true},
+    Connections = [Connection|lists:reverse(Acc)] ++ Rest, % LRU
+    {ok, Connections};
+do_return(Socket, [C|Rest], Acc) ->
+    do_return(Socket, Rest, [C|Acc]).
+
+% Builds the gen_server reply for a {return, Socket} request.
+% On success the pool is trimmed with lru/1; an unknown socket is logged as
+% a warning and reported as {error, Reason}.
+return(Socket, Connections) ->
+    case do_return(Socket, Connections, []) of
+        {error, Why, Pool} ->
+            ?warning(io_lib:format("return(~p) ~p",
+                                   [Socket, {error, Why}])),
+            {reply, {error, Why}, Pool};
+        {ok, Pool} ->
+            {reply, ok, lru(Pool)}
+    end.
+
+
+% Closes Socket and removes its entry from the pool.
+%
+% The second argument is the part of the pool still to be scanned; Acc holds
+% the entries already skipped, in reverse order.
+%
+% Returns {ok, Connections}, or {error, enoent, Connections} when Socket is
+% not pooled (in which case nothing is closed).
+do_close(_Socket, [], Acc) ->
+    % Acc was built in reverse while scanning; restore the original order so
+    % that closing an unknown socket does not flip the LRU order.
+    {error, enoent, lists:reverse(Acc)};
+do_close(Socket, [#connection{socket=Socket}|Rest], Acc) ->
+    gen_tcp:close(Socket),
+    {ok, lists:reverse(Acc) ++ Rest};
+do_close(Socket, [C|Rest], Acc) ->
+    do_close(Socket, Rest, [C|Acc]).
+
+% Builds the gen_server reply for a {close, Socket} request: `ok' when the
+% socket was pooled and has been closed, {error, Reason} otherwise.
+close(Socket, Connections) ->
+    case do_close(Socket, Connections, []) of
+        {ok, Pool} -> {reply, ok, Pool};
+        {error, Why, Pool} -> {reply, {error, Why}, Pool}
+    end.
+
+% Builds the gen_server reply exposing the whole pool, unchanged, as
+% {ok, Connections}.
+connections(Connections) ->
+ {reply, {ok, Connections}, Connections}.
+
+% gen_server callback dispatching synchronous requests; State is the pool.
+%   stop                     - stops the server, replying `stopped'
+%   {lease, Node, Pid, Opts} - see lease/4
+%   {return, Socket}         - see return/2
+%   {close, Socket}          - see close/2
+%   connections              - see connections/1
+% Any other request has no matching clause and crashes the server.
+handle_call(stop, _From, State) ->
+ {stop, normal, stopped, State};
+handle_call({lease, Node, Pid, Opts}, _From, State) ->
+ lease(Node, Pid, Opts, State);
+handle_call({return, Socket}, _From, State) ->
+ return(Socket, State);
+handle_call({close, Socket}, _From, State) ->
+ close(Socket, State);
+handle_call(connections, _From, State) ->
+ connections(State).
+% Casts are not part of this server's protocol; they are ignored.
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+% Handles raw messages delivered to the pool process.
+%
+% Returned sockets are handed back to this process (return/1 calls
+% reset_controlling_process/1), so when such a socket is in active mode the
+% peer closing it, or an error on it, is reported here.  The socket is then
+% closed and its entry removed from the pool; otherwise a later lease would
+% be handed a dead socket.  All other messages are ignored.
+handle_info({tcp_closed, Socket}, State) ->
+    gen_tcp:close(Socket),
+    {noreply, [C || C <- State, C#connection.socket =/= Socket]};
+handle_info({tcp_error, Socket, _Reason}, State) ->
+    gen_tcp:close(Socket),
+    {noreply, [C || C <- State, C#connection.socket =/= Socket]};
+handle_info(_Info, State) ->
+    {noreply, State}.
+% No state conversion is performed on code upgrade.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+% Public API: synchronous calls to the registered pool server.
+% stop/0 asks the server to stop.
+stop() ->
+ gen_server:call(?SERVER, stop).
+% lease/2 requests a socket connected to Node on behalf of Pid, with no
+% extra socket options.  The reply is {ok, Socket} or {error, Reason}.
+lease(Node, Pid) when is_pid(Pid)->
+ gen_server:call(?SERVER, {lease, Node, Pid, []}).
+% lease/3 is like lease/2 but also passes Opts, which the server applies to
+% the socket with inet:setopts/2.
+lease(Node, Pid, Opts) when is_pid(Pid)->
+ gen_server:call(?SERVER, {lease, Node, Pid, Opts}).
+% Gives a leased socket back to the pool.
+% Ownership of the socket is first transferred back to the pool server.  If
+% that fails, a warning is logged, the server is asked to close the socket,
+% and the transfer error is returned to the caller.
+return(Socket) when is_port(Socket) ->
+    case reset_controlling_process(Socket) of
+        {error, Reason} = Failure ->
+            ?warning(io_lib:format("return(~p) ~p", [Socket, {error, Reason}])),
+            gen_server:call(?SERVER, {close, Socket}),
+            Failure;
+        ok ->
+            gen_server:call(?SERVER, {return, Socket})
+    end.
+% close/1 asks the pool server to close Socket and forget it.
+close(Socket) ->
+ gen_server:call(?SERVER, {close, Socket}).
+% connections/0 returns {ok, Connections}, the server's current pool.
+connections() ->
+ gen_server:call(?SERVER, connections).
+
+% Makes the registered pool server the controlling process of Socket.
+% Returns {error, esrch} when no process is registered as ?SERVER, otherwise
+% whatever gen_tcp:controlling_process/2 returns.
+reset_controlling_process(Socket) ->
+    Server = whereis(?SERVER),
+    if
+        is_pid(Server) -> gen_tcp:controlling_process(Socket, Server);
+        true -> {error, esrch}
+    end.
Deleted: tags/0.2.0/src/kai_hash.erl
===================================================================
--- trunk/src/kai_hash.erl 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/src/kai_hash.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,312 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(kai_hash).
--behaviour(gen_server).
-
--export([start_link/0, stop/0]).
--export([
- update_nodes/2, find_bucket/1, find_nodes/1,
- choose_node_randomly/0, choose_bucket_randomly/0,
- node_info/1, node_info/0, node_list/0, virtual_node_list/0, buckets/0
-]).
--export([
- init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3
-]).
-
--include("kai.hrl").
-
--define(SERVER, ?MODULE).
--define(HASH_LEN, 32).
-
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []).
-
-init(_Args) ->
- ets:new(node_list, [set, private, named_table]),
- ets:new(virtual_node_list, [ordered_set, private, named_table]),
- ets:new(buckets, [set, private, named_table]),
-
- {node_info, LocalNode, Info} = kai_config:node_info(),
- update_nodes([{LocalNode, Info}], [], _State = []),
-
- {ok, _State = []}.
-
-terminate(_Reason, _State) ->
- ets:delete(node_list),
- ets:delete(virtual_node_list),
- ets:delete(buckets),
- ok.
-
-hash(Key) ->
- <<HashedKey:?HASH_LEN/integer, _/binary>> = erlang:md5(Key),
- HashedKey.
-hash({{N1,N2,N3,N4}, Port}, VirtualNode) ->
- <<HashedKey:?HASH_LEN/integer, _/binary>> =
- erlang:md5(<<N1,N2,N3,N4,Port:16,VirtualNode:16>>),
- HashedKey.
-
-bucket_range(NumberOfBuckets) ->
- trunc( math:pow(2, ?HASH_LEN) / NumberOfBuckets ).
-
-search_bucket_nodes(_HashedKey, _N, 0, Nodes) ->
- {nodes, lists:reverse(Nodes)};
-search_bucket_nodes(HashedKey, N, I, Nodes) ->
- HashedNode =
- case ets:next(virtual_node_list, HashedKey) of
- '$end_of_table' -> ets:first(virtual_node_list);
- Other -> Other
- end,
- [{_HashedNode, Node}|_] = ets:lookup(virtual_node_list, HashedNode),
- Nodes2 =
- case lists:member(Node, Nodes) of
- true -> Nodes;
- _ -> [Node|Nodes]
- end,
- case length(Nodes2) of
- N -> {nodes, lists:reverse(Nodes2)};
- _ -> search_bucket_nodes(HashedNode, N, I-1, Nodes2)
- end.
-
-lists_index(_Elem, [], _I) ->
- 0;
-lists_index(Elem, [Head|Rest], I) ->
- case Elem =:= Head of
- true -> I;
- _ -> lists_index(Elem, Rest, I+1)
- end.
-lists_index(Elem, List) ->
- lists_index(Elem, List, 1).
-
-update_buckets(-1 = _Bucket, _LocalNode, _BucketRange, _N, _MaxSearch,
- ReplacedBuckets) ->
- {replaced_buckets, ReplacedBuckets};
-update_buckets(Bucket, LocalNode, BucketRange, N, MaxSearch,
- ReplacedBuckets) ->
- {nodes, NewNodes} =
- search_bucket_nodes(Bucket * BucketRange, N, MaxSearch, []),
- case ets:lookup(buckets, Bucket) of
- [{Bucket, NewNodes}] ->
- update_buckets(Bucket-1, LocalNode, BucketRange, N, MaxSearch,
- ReplacedBuckets);
- OldBucket ->
- ets:insert(buckets, {Bucket, NewNodes}),
- NewReplica = lists_index(LocalNode, NewNodes),
- OldReplica =
- case OldBucket of
- [{Bucket, OldNodes}] -> lists_index(LocalNode, OldNodes);
- [] -> 0
- end,
- ReplacedBuckets2 =
- case {NewReplica, OldReplica} of
- {Replica, Replica} ->
- ReplacedBuckets;
- _ ->
- [{Bucket, NewReplica, OldReplica}|ReplacedBuckets]
- end,
- update_buckets(Bucket-1, LocalNode, BucketRange, N, MaxSearch,
- ReplacedBuckets2)
- end.
-
-update_buckets() ->
- [LocalNode, N, NumberOfBuckets] =
- kai_config:get([node, n, number_of_buckets]),
- BucketRange = bucket_range(NumberOfBuckets),
- NumberOfNodes = proplists:get_value(size, ets:info(node_list)),
-
- % Don't search other nodes to fill a bucket when NumberOfNodes is 1, since
- % they are never found.
- MaxSearch =
- case NumberOfNodes of
- 1 -> 1;
- _ -> proplists:get_value(size, ets:info(virtual_node_list))
- end,
-
- update_buckets(NumberOfBuckets-1, LocalNode, BucketRange, N, MaxSearch, []).
-
-add_nodes([]) ->
- ok;
-add_nodes([{Node, Info}|Rest]) ->
- case ets:lookup(node_list, Node) of
- [{Node, _Info}|_] -> ok;
- [] ->
- ets:insert(node_list, {Node, Info}),
- NumberOfVirtualNodes =
- proplists:get_value(number_of_virtual_nodes, Info),
- lists:foreach(
- fun(VirtualNode) ->
- HashedKey = hash(Node, VirtualNode),
- ets:insert(virtual_node_list, {HashedKey, Node})
- end,
- lists:seq(1, NumberOfVirtualNodes)
- )
- end,
- add_nodes(Rest).
-
-remove_nodes([]) ->
- ok;
-remove_nodes([Node|Rest]) ->
- case ets:lookup(node_list, Node) of
- [{Node, Info}|_] ->
- ets:delete(node_list, Node),
- NumberOfVirtualNodes =
- proplists:get_value(number_of_virtual_nodes, Info),
- lists:foreach(
- fun(VirtualNode) ->
- HashedKey = hash(Node, VirtualNode),
- ets:delete(virtual_node_list, HashedKey)
- end,
- lists:seq(1, NumberOfVirtualNodes)
- );
- [] -> ok
- end,
- remove_nodes(Rest).
-
-update_nodes(NodesToAdd, NodesToRemove, State) ->
- LocalNode = kai_config:get(node),
- Reply =
- case {NodesToAdd, NodesToRemove -- [LocalNode]} of
- {[], []} ->
- {replaced_buckets, []};
- _ ->
- ?info({update, NodesToAdd, NodesToRemove}),
- add_nodes(NodesToAdd),
- remove_nodes(NodesToRemove),
- update_buckets()
- end,
- {reply, Reply, State}.
-
-do_find_bucket(Bucket, NumberOfBuckets) when is_integer(Bucket) ->
- Bucket rem NumberOfBuckets;
-do_find_bucket(Key, NumberOfBuckets) ->
- hash(Key) div bucket_range(NumberOfBuckets).
-
-find_bucket(KeyOrBucket, State) ->
- NumberOfBuckets = kai_config:get(number_of_buckets),
- {reply, {bucket, do_find_bucket(KeyOrBucket, NumberOfBuckets)}, State}.
-
-find_nodes(KeyOrBucket, State) ->
- NumberOfBuckets = kai_config:get(number_of_buckets),
- Bucket = do_find_bucket(KeyOrBucket, NumberOfBuckets),
- [{Bucket, Nodes}|_] = ets:lookup(buckets, Bucket),
- {reply, {nodes, Nodes}, State}.
-
-choose_node_randomly(State) ->
- {{N1,N2,N3,N4}, Port} = kai_config:get(node),
- Head = {'$1', '_'},
- Cond = [{'=/=', '$1', {{{{N1,N2,N3,N4}}, Port}}}], % double tuple paranthesis
- Body = ['$1'],
- Nodes = ets:select(node_list, [{Head, Cond, Body}]),
- Len = length(Nodes),
- case Len of
- 0 -> {reply, undefined, State};
- _ -> {reply, {node, lists:nth(random:uniform(Len), Nodes)}, State}
- end.
-
-inversed_buckets(_Node, -1 = _Bucket, Buckets) ->
- Buckets;
-inversed_buckets(Node, Bucket, Buckets) ->
- [{Bucket, Nodes}|_] = ets:lookup(buckets, Bucket),
- case lists:member(Node, Nodes) of
- true -> inversed_buckets(Node, Bucket-1, [Bucket|Buckets]);
- _ -> inversed_buckets(Node, Bucket-1, Buckets)
- end.
-
-inversed_buckets(Node) ->
- NumberOfBuckets = kai_config:get(number_of_buckets),
- inversed_buckets(Node, NumberOfBuckets-1, []).
-
-choose_bucket_randomly(State) ->
- LocalNode = kai_config:get(node),
- Buckets = inversed_buckets(LocalNode),
- Len = length(Buckets),
- case Len of
- 0 -> {reply, undefined, State};
- _ -> {reply, {bucket, lists:nth(random:uniform(Len), Buckets)}, State}
- end.
-
-do_node_info(Node, State) ->
- Head = {Node, '$2'},
- Cond = [],
- Body = ['$2'],
- [Info] = ets:select(node_list, [{Head, Cond, Body}]),
- {reply, {node_info, Node, Info}, State}.
-
-do_node_info(State) ->
- LocalNode = kai_config:get(node),
- do_node_info(LocalNode, State).
-
-node_list(State) ->
- NodeList = ets:tab2list(node_list),
- NodeList2 = lists:map(fun({Node, _Info}) -> Node end, NodeList),
- {reply, {node_list, NodeList2}, State}.
-
-virtual_node_list(State) ->
- VirtualNodeList = ets:tab2list(virtual_node_list),
- {reply, {virtual_node_list, VirtualNodeList}, State}.
-
-buckets(State) ->
- Buckets = ets:tab2list(buckets),
- {reply, {buckets, Buckets}, State}.
-
-handle_call(stop, _From, State) ->
- {stop, normal, stopped, State};
-handle_call({update_nodes, NodesToAdd, NodesToRemove}, _From, State) ->
- update_nodes(NodesToAdd, NodesToRemove, State);
-handle_call({find_bucket, KeyOrBucket}, _From, State) ->
- find_bucket(KeyOrBucket, State);
-handle_call({find_nodes, KeyOrBucket}, _From, State) ->
- find_nodes(KeyOrBucket, State);
-handle_call(choose_node_randomly, _From, State) ->
- choose_node_randomly(State);
-handle_call(choose_bucket_randomly, _From, State) ->
- choose_bucket_randomly(State);
-handle_call({node_info, Node}, _From, State) ->
- do_node_info(Node, State);
-handle_call(node_info, _From, State) ->
- do_node_info(State);
-handle_call(node_list, _From, State) ->
- node_list(State);
-handle_call(virtual_node_list, _From, State) ->
- virtual_node_list(State);
-handle_call(buckets, _From, State) ->
- buckets(State).
-handle_cast(_Msg, State) ->
- {noreply, State}.
-handle_info(_Info, State) ->
- {noreply, State}.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-stop() ->
- gen_server:call(?SERVER, stop).
-update_nodes(NodesToAdd, NodesToRemove) ->
- gen_server:call(?SERVER, {update_nodes, NodesToAdd, NodesToRemove}).
-find_bucket(KeyOrBucket) ->
- gen_server:call(?SERVER, {find_bucket, KeyOrBucket}).
-find_nodes(KeyOrBucket) ->
- gen_server:call(?SERVER, {find_nodes, KeyOrBucket}).
-choose_node_randomly() ->
- gen_server:call(?SERVER, choose_node_randomly).
-choose_bucket_randomly() ->
- gen_server:call(?SERVER, choose_bucket_randomly).
-node_info() ->
- gen_server:call(?SERVER, node_info).
-node_info(Node) ->
- gen_server:call(?SERVER, {node_info, Node}).
-node_list() ->
- gen_server:call(?SERVER, node_list).
-virtual_node_list() ->
- gen_server:call(?SERVER, virtual_node_list).
-buckets() ->
- gen_server:call(?SERVER, buckets).
Copied: tags/0.2.0/src/kai_hash.erl (from rev 84, trunk/src/kai_hash.erl)
===================================================================
--- tags/0.2.0/src/kai_hash.erl (rev 0)
+++ tags/0.2.0/src/kai_hash.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,322 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(kai_hash).
+-behaviour(gen_server).
+
+-export([start_link/0, stop/0]).
+-export([
+ update_nodes/2, find_bucket/1, find_replica/1, find_nodes/1,
+ choose_node_randomly/0, choose_bucket_randomly/0,
+ node_info/1, node_info/0, node_list/0, virtual_node_list/0, buckets/0
+]).
+-export([
+ init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3
+]).
+
+-include("kai.hrl").
+
+-define(SERVER, ?MODULE).
+-define(HASH_LEN, 32).
+
+% Starts the consistent-hash server, linked to the caller and registered
+% locally under ?SERVER.
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []).
+
+% gen_server callback.  Creates the three private, named ETS tables used by
+% this module (node_list, virtual_node_list, buckets) and registers the
+% local node, as reported by kai_config:node_info/0, in them.
+% The server state itself is an empty list.
+init(_Args) ->
+    lists:foreach(
+        fun({Table, Type}) -> ets:new(Table, [Type, private, named_table]) end,
+        [{node_list, set}, {virtual_node_list, ordered_set}, {buckets, set}]
+    ),
+    {node_info, LocalNode, Info} = kai_config:node_info(),
+    update_nodes([{LocalNode, Info}], [], []),
+    {ok, []}.
+
+% gen_server callback: deletes the ETS tables created by init/1.
+terminate(_Reason, _State) ->
+    lists:foreach(fun ets:delete/1, [node_list, virtual_node_list, buckets]),
+    ok.
+
+% hash/1: the first ?HASH_LEN bits of the MD5 digest of Key, as an integer.
+hash(Key) ->
+    Digest = erlang:md5(Key),
+    <<Prefix:?HASH_LEN/integer, _/binary>> = Digest,
+    Prefix.
+% hash/2: hash of one virtual node of an IPv4 node.  The digest input is the
+% four address bytes, the port as 16 bits and the virtual node index as 16
+% bits.
+hash({{N1,N2,N3,N4}, Port}, VirtualNode) ->
+    hash(<<N1,N2,N3,N4,Port:16,VirtualNode:16>>).
+
+% Width of one bucket: the size of the hash space (2^?HASH_LEN) divided by
+% the number of buckets, truncated to an integer.
+bucket_range(NumberOfBuckets) ->
+    HashSpace = math:pow(2, ?HASH_LEN),
+    trunc(HashSpace / NumberOfBuckets).
+
+% Walks the virtual node ring clockwise from HashedKey and collects distinct
+% physical nodes.
+%   N     - number of distinct nodes wanted
+%   I     - remaining number of ring steps allowed
+%   Nodes - nodes found so far, most recent first
+% Returns {nodes, List} in discovery order as soon as N nodes are found or
+% the step budget is used up.
+search_bucket_nodes(_HashedKey, _N, 0, Nodes) ->
+    {nodes, lists:reverse(Nodes)};
+search_bucket_nodes(HashedKey, N, I, Nodes) ->
+    Next = ets:next(virtual_node_list, HashedKey),
+    % Wrap around to the start of the ring when the end is reached.
+    Position =
+        if
+            Next =:= '$end_of_table' -> ets:first(virtual_node_list);
+            true -> Next
+        end,
+    [{_Position, Owner}|_] = ets:lookup(virtual_node_list, Position),
+    Found =
+        case lists:member(Owner, Nodes) of
+            false -> [Owner|Nodes];
+            true -> Nodes
+        end,
+    if
+        length(Found) =:= N -> {nodes, lists:reverse(Found)};
+        true -> search_bucket_nodes(Position, N, I-1, Found)
+    end.
+
+% lists_index/3: position of the first element of the list that is exactly
+% equal to Elem, counting from I, or `undefined' when there is none.
+lists_index(_Elem, [], _I) ->
+    undefined;
+lists_index(Elem, [Elem|_Rest], I) ->
+    I;
+lists_index(Elem, [_Other|Rest], I) ->
+    lists_index(Elem, Rest, I+1).
+% lists_index/2: 1-based position of Elem in List, or `undefined'.
+lists_index(Elem, List) ->
+    lists_index(Elem, List, 1).
+
+% Recomputes the node list of every bucket from Bucket down to 0 and stores
+% it in the `buckets' table when it changed.
+% ReplacedBuckets accumulates {Bucket, NewReplica, OldReplica} for each
+% bucket in which the local node's replica position changed; a position of
+% `undefined' means the local node does not hold the bucket.
+% Returns {replaced_buckets, ReplacedBuckets}.
+update_buckets(-1 = _Bucket, _LocalNode, _BucketRange, _N, _MaxSearch,
+               ReplacedBuckets) ->
+    {replaced_buckets, ReplacedBuckets};
+update_buckets(Bucket, LocalNode, BucketRange, N, MaxSearch,
+               ReplacedBuckets) ->
+    {nodes, Owners} =
+        search_bucket_nodes(Bucket * BucketRange, N, MaxSearch, []),
+    Changes =
+        case ets:lookup(buckets, Bucket) of
+            [{Bucket, Owners}] ->
+                % Same owners as before: nothing to record.
+                ReplacedBuckets;
+            Previous ->
+                ets:insert(buckets, {Bucket, Owners}),
+                NewReplica = lists_index(LocalNode, Owners),
+                OldReplica =
+                    case Previous of
+                        [{Bucket, Former}] -> lists_index(LocalNode, Former);
+                        [] -> undefined
+                    end,
+                if
+                    NewReplica =:= OldReplica ->
+                        ReplacedBuckets;
+                    true ->
+                        [{Bucket, NewReplica, OldReplica}|ReplacedBuckets]
+                end
+        end,
+    update_buckets(Bucket-1, LocalNode, BucketRange, N, MaxSearch, Changes).
+
+% Recomputes all buckets from the current ring using the configured local
+% node, replication degree `n' and number of buckets.
+% Returns {replaced_buckets, List} as produced by update_buckets/6.
+update_buckets() ->
+    [LocalNode, N, NumberOfBuckets] =
+        kai_config:get([node, n, number_of_buckets]),
+    % Don't search other nodes to fill a bucket when there is only one node,
+    % since they are never found.
+    MaxSearch =
+        case ets:info(node_list, size) of
+            1 -> 1;
+            _ -> ets:info(virtual_node_list, size)
+        end,
+    update_buckets(NumberOfBuckets-1, LocalNode,
+                   bucket_range(NumberOfBuckets), N, MaxSearch, []).
+
+% Registers each {Node, Info} pair that is not yet in `node_list' and
+% inserts its virtual nodes (1..number_of_virtual_nodes from Info) into
+% `virtual_node_list'.  Nodes that are already known are left untouched.
+add_nodes([]) ->
+    ok;
+add_nodes([{Node, Info}|Rest]) ->
+    case ets:lookup(node_list, Node) of
+        [] ->
+            ets:insert(node_list, {Node, Info}),
+            Count = proplists:get_value(number_of_virtual_nodes, Info),
+            Insert =
+                fun(VirtualNode) ->
+                    ets:insert(virtual_node_list,
+                               {hash(Node, VirtualNode), Node})
+                end,
+            lists:foreach(Insert, lists:seq(1, Count));
+        [{Node, _Known}|_] ->
+            ok
+    end,
+    add_nodes(Rest).
+
+% Unregisters each listed node that is present in `node_list' and deletes
+% its virtual nodes from `virtual_node_list'.  Unknown nodes are skipped.
+remove_nodes([]) ->
+    ok;
+remove_nodes([Node|Rest]) ->
+    case ets:lookup(node_list, Node) of
+        [] ->
+            ok;
+        [{Node, Info}|_] ->
+            ets:delete(node_list, Node),
+            Count = proplists:get_value(number_of_virtual_nodes, Info),
+            Delete =
+                fun(VirtualNode) ->
+                    ets:delete(virtual_node_list, hash(Node, VirtualNode))
+                end,
+            lists:foreach(Delete, lists:seq(1, Count))
+    end,
+    remove_nodes(Rest).
+
+% Applies a membership change to the ring and recomputes the buckets.
+%   NodesToAdd    - list of {Node, Info} pairs to register
+%   NodesToRemove - list of nodes to unregister
+% The local node is never removed: it is filtered out of NodesToRemove both
+% when deciding whether there is anything to do and when actually removing
+% nodes (previously only the former, so a request naming the local node
+% together with another change dropped the local node from its own ring).
+% Replies {replaced_buckets, List}; the list is empty when nothing changed.
+update_nodes(NodesToAdd, NodesToRemove, State) ->
+    LocalNode = kai_config:get(node),
+    Removable = [Node || Node <- NodesToRemove, Node =/= LocalNode],
+    Reply =
+        case {NodesToAdd, Removable} of
+            {[], []} ->
+                {replaced_buckets, []};
+            _ ->
+                ?info({update, NodesToAdd, NodesToRemove}),
+                add_nodes(NodesToAdd),
+                remove_nodes(Removable),
+                update_buckets()
+        end,
+    {reply, Reply, State}.
+
+% Maps a key or a bucket number to a bucket number.
+% An integer is taken as a bucket number and reduced modulo the number of
+% buckets; anything else is hashed and mapped onto its bucket.
+do_find_bucket(Bucket, NumberOfBuckets) when is_integer(Bucket) ->
+    Bucket rem NumberOfBuckets;
+do_find_bucket(Key, NumberOfBuckets) ->
+    Hashed = hash(Key),
+    Hashed div bucket_range(NumberOfBuckets).
+
+% Replies {bucket, Bucket} for the given key or bucket number.
+find_bucket(KeyOrBucket, State) ->
+    Total = kai_config:get(number_of_buckets),
+    Bucket = do_find_bucket(KeyOrBucket, Total),
+    {reply, {bucket, Bucket}, State}.
+
+% Replies {replica, Position}, the 1-based position of the local node in the
+% node list of the bucket, or {replica, undefined} when the local node does
+% not hold that bucket.
+find_replica(KeyOrBucket, State) ->
+    LocalNode = kai_config:get(node),
+    {reply, {nodes, Owners}, NextState} = find_nodes(KeyOrBucket, State),
+    {reply, {replica, lists_index(LocalNode, Owners)}, NextState}.
+
+% Replies {nodes, Nodes}, the node list stored in the `buckets' table for
+% the bucket of the given key or bucket number.
+find_nodes(KeyOrBucket, State) ->
+    Total = kai_config:get(number_of_buckets),
+    Bucket = do_find_bucket(KeyOrBucket, Total),
+    [{Bucket, Owners}|_] = ets:lookup(buckets, Bucket),
+    {reply, {nodes, Owners}, State}.
+
+% Replies {node, Node} with a randomly chosen known node other than the
+% local one, or `undefined' when no other node is known.
+choose_node_randomly(State) ->
+    {{N1,N2,N3,N4}, Port} = kai_config:get(node),
+    % Tuples inside a match specification guard need double tuple
+    % parentheses.
+    NotLocal = {'=/=', '$1', {{{{N1,N2,N3,N4}}, Port}}},
+    Others = ets:select(node_list, [{{'$1', '_'}, [NotLocal], ['$1']}]),
+    case Others of
+        [] ->
+            {reply, undefined, State};
+        _ ->
+            Pick = random:uniform(length(Others)),
+            {reply, {node, lists:nth(Pick, Others)}, State}
+    end.
+
+% Scans the buckets from Bucket down to 0 and prepends each one whose node
+% list contains Node to Buckets.  The result is in ascending bucket order.
+inversed_buckets(_Node, -1 = _Bucket, Buckets) ->
+    Buckets;
+inversed_buckets(Node, Bucket, Buckets) ->
+    [{Bucket, Owners}|_] = ets:lookup(buckets, Bucket),
+    Collected =
+        case lists:member(Node, Owners) of
+            true -> [Bucket|Buckets];
+            false -> Buckets
+        end,
+    inversed_buckets(Node, Bucket-1, Collected).
+
+% Returns every bucket number whose node list contains Node.
+inversed_buckets(Node) ->
+    LastBucket = kai_config:get(number_of_buckets) - 1,
+    inversed_buckets(Node, LastBucket, []).
+
+% Replies {bucket, Bucket} with a randomly chosen bucket held by the local
+% node, or `undefined' when the local node holds none.
+choose_bucket_randomly(State) ->
+    case inversed_buckets(kai_config:get(node)) of
+        [] ->
+            {reply, undefined, State};
+        Owned ->
+            Pick = random:uniform(length(Owned)),
+            {reply, {bucket, lists:nth(Pick, Owned)}, State}
+    end.
+
+% Replies {node_info, Node, Info} with the Info stored for Node in the
+% `node_list' table.  Crashes with badmatch unless exactly one entry
+% matches.
+do_node_info(Node, State) ->
+    MatchSpec = [{{Node, '$2'}, [], ['$2']}],
+    [Info] = ets:select(node_list, MatchSpec),
+    {reply, {node_info, Node, Info}, State}.
+
+% Same as do_node_info/2 for the configured local node.
+do_node_info(State) ->
+    do_node_info(kai_config:get(node), State).
+
+% Replies {node_list, Nodes} with every node registered in `node_list',
+% without the per-node info.
+node_list(State) ->
+    {Nodes, _Infos} = lists:unzip(ets:tab2list(node_list)),
+    {reply, {node_list, Nodes}, State}.
+
+% Replies {virtual_node_list, Entries} with the full content of the
+% `virtual_node_list' table as {HashedKey, Node} pairs.
+virtual_node_list(State) ->
+    {reply, {virtual_node_list, ets:tab2list(virtual_node_list)}, State}.
+
+% Replies {buckets, Entries} with the full content of the `buckets' table as
+% {Bucket, Nodes} pairs.
+buckets(State) ->
+    {reply, {buckets, ets:tab2list(buckets)}, State}.
+
+% gen_server callback dispatching synchronous requests.  `stop' stops the
+% server with reply `stopped'; every other request is forwarded to the
+% internal function that builds its reply.  A request with no matching
+% clause crashes the server.
+handle_call(stop, _From, State) ->
+ {stop, normal, stopped, State};
+handle_call({update_nodes, NodesToAdd, NodesToRemove}, _From, State) ->
+ update_nodes(NodesToAdd, NodesToRemove, State);
+handle_call({find_bucket, KeyOrBucket}, _From, State) ->
+ find_bucket(KeyOrBucket, State);
+handle_call({find_replica, KeyOrBucket}, _From, State) ->
+ find_replica(KeyOrBucket, State);
+handle_call({find_nodes, KeyOrBucket}, _From, State) ->
+ find_nodes(KeyOrBucket, State);
+handle_call(choose_node_randomly, _From, State) ->
+ choose_node_randomly(State);
+handle_call(choose_bucket_randomly, _From, State) ->
+ choose_bucket_randomly(State);
+handle_call({node_info, Node}, _From, State) ->
+ do_node_info(Node, State);
+handle_call(node_info, _From, State) ->
+ do_node_info(State);
+handle_call(node_list, _From, State) ->
+ node_list(State);
+handle_call(virtual_node_list, _From, State) ->
+ virtual_node_list(State);
+handle_call(buckets, _From, State) ->
+ buckets(State).
+% Casts are not part of this server's protocol; they are ignored.
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+% Raw messages are ignored as well.
+handle_info(_Info, State) ->
+ {noreply, State}.
+% No state conversion is performed on code upgrade.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+% Public API.  Every function below is a synchronous gen_server call to the
+% registered server; the reply shapes are those built by the internal
+% functions of the same name.
+% stop/0 asks the server to stop.
+stop() ->
+ gen_server:call(?SERVER, stop).
+% Adds and removes ring members; returns {replaced_buckets, List}.
+update_nodes(NodesToAdd, NodesToRemove) ->
+ gen_server:call(?SERVER, {update_nodes, NodesToAdd, NodesToRemove}).
+% Returns {bucket, Bucket} for a key or bucket number.
+find_bucket(KeyOrBucket) ->
+ gen_server:call(?SERVER, {find_bucket, KeyOrBucket}).
+% Returns {replica, Position | undefined} for the local node.
+find_replica(KeyOrBucket) ->
+ gen_server:call(?SERVER, {find_replica, KeyOrBucket}).
+% Returns {nodes, Nodes} holding the bucket of a key or bucket number.
+find_nodes(KeyOrBucket) ->
+ gen_server:call(?SERVER, {find_nodes, KeyOrBucket}).
+% Returns {node, Node} for a random remote node, or `undefined'.
+choose_node_randomly() ->
+ gen_server:call(?SERVER, choose_node_randomly).
+% Returns {bucket, Bucket} for a random local bucket, or `undefined'.
+choose_bucket_randomly() ->
+ gen_server:call(?SERVER, choose_bucket_randomly).
+% Returns {node_info, LocalNode, Info}.
+node_info() ->
+ gen_server:call(?SERVER, node_info).
+% Returns {node_info, Node, Info} for the given node.
+node_info(Node) ->
+ gen_server:call(?SERVER, {node_info, Node}).
+% Returns {node_list, Nodes}.
+node_list() ->
+ gen_server:call(?SERVER, node_list).
+% Returns {virtual_node_list, Entries}.
+virtual_node_list() ->
+ gen_server:call(?SERVER, virtual_node_list).
+% Returns {buckets, Entries}.
+buckets() ->
+ gen_server:call(?SERVER, buckets).
Deleted: tags/0.2.0/src/kai_membership.erl
===================================================================
--- trunk/src/kai_membership.erl 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/src/kai_membership.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,105 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(kai_membership).
--behaviour(gen_fsm).
-
--export([start_link/0, stop/0]).
--export([check_node/1]).
--export([
- init/1, ready/2, handle_event/3, handle_sync_event/4, handle_info/3,
- terminate/3, code_change/4
-]).
-
--include("kai.hrl").
-
--define(SERVER, ?MODULE).
--define(TIMEOUT, 3000).
--define(TIMER, 1000).
-
-start_link() ->
- gen_fsm:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []).
-
-init(_Args) ->
- {ok, ready, [], ?TIMER}.
-
-terminate(_Reason, _StateName, _StateData) ->
- ok.
-
-ping_nodes([], AvailableNodes, DownNodes) ->
- {AvailableNodes, DownNodes};
-ping_nodes([Node|Nodes], AvailableNodes, DownNodes) ->
- case kai_api:node_info(Node) of
- {node_info, Node2, Info} ->
- ping_nodes(Nodes, [{Node2, Info}|AvailableNodes], DownNodes);
- {error, Reason} ->
- ?warning(io_lib:format("ping_nodes/3: ~p", [{error, Reason}])),
- ping_nodes(Nodes, AvailableNodes, [Node|DownNodes])
- end.
-
-retrieve_node_list(Node) ->
- case kai_api:node_list(Node) of
- {node_list, RemoteNodeList} ->
- {node_list, LocalNodeList} = kai_hash:node_list(),
- NewNodes = RemoteNodeList -- LocalNodeList,
- OldNodes = LocalNodeList -- RemoteNodeList,
- Nodes = NewNodes ++ OldNodes,
- LocalNode = kai_config:get(node),
- ping_nodes(Nodes -- [LocalNode], [], []);
- {error, Reason} ->
- ?warning(io_lib:format("retrieve_node_list/1: ~p", [{error, Reason}])),
- {[], [Node]}
- end.
-
-sync_buckets([], _LocalNode) ->
- ok;
-sync_buckets([{Bucket, NewReplica, OldReplica}|ReplacedBuckets], LocalNode) ->
- case {NewReplica, OldReplica} of
- {NewReplica, 0} -> kai_sync:update_bucket(Bucket);
- {0, OldReplica} -> kai_sync:delete_bucket(Bucket);
- _ -> nop
- end,
- sync_buckets(ReplacedBuckets, LocalNode).
-
-sync_buckets(ReplacedBuckets) ->
- LocalNode = kai_config:get(node),
- sync_buckets(ReplacedBuckets, LocalNode).
-
-do_check_node({Address, Port}) ->
- {AvailableNodes, DownNodes} = retrieve_node_list({Address, Port}),
- {replaced_buckets, ReplacedBuckets} =
- kai_hash:update_nodes(AvailableNodes, DownNodes),
- sync_buckets(ReplacedBuckets).
-
-ready({check_node, Node}, State) ->
- do_check_node(Node),
- {next_state, ready, State, ?TIMER};
-ready(timeout, State) ->
- case kai_hash:choose_node_randomly() of
- {node, Node} -> do_check_node(Node);
- _ -> nop
- end,
- {next_state, ready, State, ?TIMER}.
-
-handle_event(stop, _StateName, StateData) ->
- {stop, normal, StateData}.
-handle_sync_event(_Event, _From, _StateName, StateData) ->
- {next_state, ready, StateData, 3000}.
-handle_info(_Info, _StateName, StateData) ->
- {next_state, ready, StateData, 3000}.
-code_change(_OldVsn, _StateName, StateData, _Extra) ->
- {ok, ready, StateData}.
-
-stop() ->
- gen_fsm:send_all_state_event(?SERVER, stop).
-check_node(Node) ->
- gen_fsm:send_event(?SERVER, {check_node, Node}).
Copied: tags/0.2.0/src/kai_membership.erl (from rev 84, trunk/src/kai_membership.erl)
===================================================================
--- tags/0.2.0/src/kai_membership.erl (rev 0)
+++ tags/0.2.0/src/kai_membership.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,105 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(kai_membership).
+-behaviour(gen_fsm).
+
+-export([start_link/0, stop/0]).
+-export([check_node/1]).
+-export([
+ init/1, ready/2, handle_event/3, handle_sync_event/4, handle_info/3,
+ terminate/3, code_change/4
+]).
+
+-include("kai.hrl").
+
+-define(SERVER, ?MODULE).
+-define(TIMEOUT, 3000).
+-define(TIMER, 1000).
+
+% Starts the membership state machine, linked to the caller and registered
+% locally under ?SERVER.
+start_link() ->
+ gen_fsm:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []).
+
+% gen_fsm callback: starts in state `ready' with empty state data and a
+% ?TIMER millisecond timeout, after which ready(timeout, _) fires.
+init(_Args) ->
+ {ok, ready, [], ?TIMER}.
+
+% gen_fsm callback: nothing is cleaned up on termination.
+terminate(_Reason, _StateName, _StateData) ->
+ ok.
+
+% Asks every listed node for its node info through kai_api:node_info/1.
+% Nodes that answer are prepended to AvailableNodes as {Node, Info}; nodes
+% that answer {error, Reason} are logged and prepended to DownNodes.
+% Returns {AvailableNodes, DownNodes}.
+ping_nodes([], AvailableNodes, DownNodes) ->
+    {AvailableNodes, DownNodes};
+ping_nodes([Node|Nodes], AvailableNodes, DownNodes) ->
+    case kai_api:node_info(Node) of
+        {error, Reason} ->
+            ?warning(io_lib:format("ping_nodes/3: ~p", [{error, Reason}])),
+            ping_nodes(Nodes, AvailableNodes, [Node|DownNodes]);
+        {node_info, Reported, Info} ->
+            ping_nodes(Nodes, [{Reported, Info}|AvailableNodes], DownNodes)
+    end.
+
+% Compares Node's view of the cluster with the local one.
+% Every node known to only one side (the local node excluded) is pinged;
+% the result is {AvailableNodes, DownNodes} as built by ping_nodes/3.
+% When Node itself cannot be queried it is reported as the only down node.
+retrieve_node_list(Node) ->
+    case kai_api:node_list(Node) of
+        {error, Reason} ->
+            ?warning(io_lib:format("retrieve_node_list/1: ~p", [{error, Reason}])),
+            {[], [Node]};
+        {node_list, RemoteNodeList} ->
+            {node_list, LocalNodeList} = kai_hash:node_list(),
+            OnlyRemote = RemoteNodeList -- LocalNodeList,
+            OnlyLocal = LocalNodeList -- RemoteNodeList,
+            LocalNode = kai_config:get(node),
+            ping_nodes((OnlyRemote ++ OnlyLocal) -- [LocalNode], [], [])
+    end.
+
+% Reacts to each {Bucket, NewReplica, OldReplica} change:
+%   - the local node did not hold the bucket before: kai_sync:update_bucket/1
+%   - the local node no longer holds the bucket: kai_sync:delete_bucket/1
+%   - the replica position merely moved: nothing is done
+sync_buckets([], _LocalNode) ->
+    ok;
+sync_buckets([{Bucket, NewReplica, OldReplica}|ReplacedBuckets], LocalNode) ->
+    if
+        OldReplica =:= undefined -> kai_sync:update_bucket(Bucket);
+        NewReplica =:= undefined -> kai_sync:delete_bucket(Bucket);
+        true -> nop
+    end,
+    sync_buckets(ReplacedBuckets, LocalNode).
+
+% Runs sync_buckets/2 with the configured local node.
+sync_buckets(ReplacedBuckets) ->
+    sync_buckets(ReplacedBuckets, kai_config:get(node)).
+
+% One membership round against the given node: compares node lists, applies
+% the resulting additions and removals to the ring, then synchronises the
+% buckets whose local replica position changed.
+do_check_node({_Address, _Port} = Node) ->
+    {AvailableNodes, DownNodes} = retrieve_node_list(Node),
+    {replaced_buckets, Changed} =
+        kai_hash:update_nodes(AvailableNodes, DownNodes),
+    sync_buckets(Changed).
+
+% gen_fsm state `ready'.
+%   {check_node, Node} - run a membership round against Node
+%   timeout            - run a round against a randomly chosen remote node,
+%                        if any is known
+% In both cases the machine stays in `ready' and re-arms the ?TIMER timeout.
+ready({check_node, Node}, State) ->
+    do_check_node(Node),
+    {next_state, ready, State, ?TIMER};
+ready(timeout, State) ->
+    Chosen = kai_hash:choose_node_randomly(),
+    case Chosen of
+        {node, Node} -> do_check_node(Node);
+        _NoPeer -> nop
+    end,
+    {next_state, ready, State, ?TIMER}.
+
+% gen_fsm callback for all-state events.  Only `stop' is understood; it
+% stops the machine with reason `normal'.  Any other event has no matching
+% clause and crashes the process.
+handle_event(stop, _StateName, StateData) ->
+ {stop, normal, StateData}.
+% gen_fsm callback for synchronous all-state events.  No such event is part
+% of this module's protocol, so the caller is answered immediately with
+% {error, unknown_event}.  Returning `next_state' without a reply, as
+% before, left the caller blocked until its own call timed out.
+% The machine goes to `ready' with a 3000 ms timeout, as before.
+handle_sync_event(_Event, _From, _StateName, StateData) ->
+    {reply, {error, unknown_event}, ready, StateData, 3000}.
+% Raw messages are ignored; the machine goes to `ready' with a 3000 ms
+% timeout.
+handle_info(_Info, _StateName, StateData) ->
+ {next_state, ready, StateData, 3000}.
+% No state conversion is performed on code upgrade; the machine resumes in
+% `ready'.
+code_change(_OldVsn, _StateName, StateData, _Extra) ->
+ {ok, ready, StateData}.
+
+% Public API.
+% stop/0 asynchronously asks the state machine to stop.
+stop() ->
+ gen_fsm:send_all_state_event(?SERVER, stop).
+% check_node/1 asynchronously requests a membership round against Node.
+check_node(Node) ->
+ gen_fsm:send_event(?SERVER, {check_node, Node}).
Deleted: tags/0.2.0/src/kai_memcache.erl
===================================================================
--- trunk/src/kai_memcache.erl 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/src/kai_memcache.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,104 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(kai_memcache).
--behaviour(kai_tcp_server).
-
--export([start_link/0, stop/0]).
--export([init/1, handle_call/3]).
-
--include("kai.hrl").
-
--define(TIMEOUT_CLIENT, 3000).
-
-start_link() ->
- kai_tcp_server:start_link(
- {local, ?MODULE},
- ?MODULE,
- [],
- #tcp_server_option{
- port = kai_config:get(memcache_port),
- max_connections = kai_config:get(memcache_max_connections)
- }
- ).
-
-stop() -> kai_tcp_server:stop(?MODULE).
-
-init(_Args) -> {ok, {}}.
-
-handle_call(Socket, Data, State) ->
- dispatch(Socket, string:tokens(binary_to_list(Data), " \r\n"), State).
-
-dispatch(_Socket, ["get", Key], State) ->
- case kai_coordinator:route({get, #data{key=Key}}) of
- Data when is_list(Data) ->
- Response = get_response(Data),
- {reply, [Response|"END\r\n"], State};
- undefined ->
- {reply, <<"END\r\n">>, State};
- _Other ->
- send_error_and_close("Failed to read.", State)
- end;
-
-dispatch(Socket, ["set", _Key, _Flags, "0", _Bytes] = Data, State) ->
- inet:setopts(Socket, [{packet, raw}]),
- Result = recv_set_data(Socket, Data, State),
- inet:setopts(Socket, [{packet, line}]),
- Result;
-
-dispatch(_Socket, ["set", _Key, _Flags, _Exptime, _Bytes], State) ->
- {reply, <<"CLIENT_ERROR Exptime must be zero.\r\n">>, State};
-
-dispatch(_Socket, ["delete", Key], State) ->
- case kai_coordinator:route({delete, #data{key=Key}}) of
- ok -> {reply, <<"DELETED\r\n">>, State};
- undefined -> {reply, <<"NOT_FOUND\r\n">>, State};
- _Other ->
- send_error_and_close(State, "Failed to delete.")
- end;
-
-dispatch(_Socket, ["quit"], _State) -> quit;
-
-dispatch(_Socket, _Unknown, _State) -> {reply, <<"ERROR\r\n">>}.
-
-get_response(Data) ->
- lists:map(fun(Elem) ->
- Key = Elem#data.key,
- Flags = Elem#data.flags,
- Value = Elem#data.value,
- [
- io_lib:format("VALUE ~s ~s ~w", [Key, Flags, byte_size(Value)]),
- "\r\n", Value, "\r\n"
- ]
- end, Data).
-
-recv_set_data(Socket, ["set", Key, Flags, "0", Bytes], State) ->
- case gen_tcp:recv(Socket, list_to_integer(Bytes), ?TIMEOUT_CLIENT) of
- {ok, Value} ->
- gen_tcp:recv(Socket, 2, ?TIMEOUT_CLIENT),
- case kai_coordinator:route(
- {put, #data{key=Key, flags=Flags, value=Value}}
- ) of
- ok ->
- gen_tcp:send(Socket, <<"STORED\r\n">>),
- {noreply, State};
- _Other ->
- send_error_and_close("Failed to write.", State)
- end;
- _Other ->
- {noreply, State}
- end.
-
-send_error_and_close(Message, State) ->
- ?warning(io_lib:format("send_error_and_close/2: ~p", [Message])),
- {close, ["SERVER_ERROR ", Message, "\r\n"], State}.
-
Copied: tags/0.2.0/src/kai_memcache.erl (from rev 84, trunk/src/kai_memcache.erl)
===================================================================
--- tags/0.2.0/src/kai_memcache.erl (rev 0)
+++ tags/0.2.0/src/kai_memcache.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,104 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(kai_memcache).
+-behaviour(kai_tcp_server).
+
+-export([start_link/0, stop/0]).
+-export([init/1, handle_call/3]).
+
+-include("kai.hrl").
+
+-define(TIMEOUT_CLIENT, 3000).
+
+start_link() ->
+ kai_tcp_server:start_link(
+ {local, ?MODULE},
+ ?MODULE,
+ [],
+ #tcp_server_option{
+ port = kai_config:get(memcache_port),
+ max_processes = kai_config:get(memcache_max_processes)
+ }
+ ).
+
+stop() -> kai_tcp_server:stop(?MODULE).
+
+init(_Args) -> {ok, {}}.
+
+handle_call(Socket, Data, State) ->
+ dispatch(Socket, string:tokens(binary_to_list(Data), " \r\n"), State).
+
+% Execute one tokenized memcache command and tell kai_tcp_server what to do
+% with the connection.
+%
+%   Socket - client socket; only the "set" clause touches it.
+%   Tokens - the command line as split on spaces and "\r\n" by handle_call/3.
+%   State  - callback state, always handed back unchanged.
+%
+% Returns {reply, IoData, State}, {noreply, State} or {close, IoData, State}.
+% "quit" returns the bare atom 'quit'.
+dispatch(_Socket, ["get", Key], State) ->
+    case kai_coordinator:route({get, #data{key=Key}}) of
+        Data when is_list(Data) ->
+            Response = get_response(Data),
+            {reply, [Response|"END\r\n"], State};
+        undefined ->
+            % No value stored: an empty result is just the terminator.
+            {reply, <<"END\r\n">>, State};
+        _Other ->
+            send_error_and_close("Failed to read.", State)
+    end;
+
+dispatch(Socket, ["set", _Key, _Flags, "0", _Bytes] = Data, State) ->
+    % The data block is read by length, not by line, so switch the socket
+    % to raw mode for recv_set_data/3 and back to line mode afterwards.
+    inet:setopts(Socket, [{packet, raw}]),
+    Result = recv_set_data(Socket, Data, State),
+    inet:setopts(Socket, [{packet, line}]),
+    Result;
+
+dispatch(_Socket, ["set", _Key, _Flags, _Exptime, _Bytes], State) ->
+    % Only an expiration time of "0" is accepted (see the clause above).
+    {reply, <<"CLIENT_ERROR Exptime must be zero.\r\n">>, State};
+
+dispatch(_Socket, ["delete", Key], State) ->
+    case kai_coordinator:route({delete, #data{key=Key}}) of
+        ok -> {reply, <<"DELETED\r\n">>, State};
+        undefined -> {reply, <<"NOT_FOUND\r\n">>, State};
+        _Other ->
+            % send_error_and_close/2 takes (Message, State), in that order.
+            send_error_and_close("Failed to delete.", State)
+    end;
+
+dispatch(_Socket, ["quit"], _State) -> quit;
+
+% Unknown command: answer "ERROR" and keep the connection open. The reply
+% must be the 3-tuple {reply, Data, State}; a 2-tuple is not recognised by
+% kai_tcp_server and would close the socket without sending anything.
+dispatch(_Socket, _Unknown, State) -> {reply, <<"ERROR\r\n">>, State}.
+
+% Render a list of #data{} records as memcache "VALUE" blocks.
+%
+% Each record becomes the iolist
+%   "VALUE <key> <flags> <size>" "\r\n" <value> "\r\n"
+% where <size> is byte_size/1 of the record's value. The result is a list
+% with one such iolist per record, in input order; the closing "END" line
+% is not added here.
+get_response([]) ->
+    [];
+get_response([Item|Rest]) ->
+    Header = io_lib:format(
+        "VALUE ~s ~s ~w",
+        [Item#data.key, Item#data.flags, byte_size(Item#data.value)]
+    ),
+    [[Header, "\r\n", Item#data.value, "\r\n"] | get_response(Rest)].
+
+% Receive the data block that follows a "set" command line and store it.
+%
+%   Socket - client socket; dispatch/3 puts it in raw packet mode around
+%            this call.
+%   second argument - the tokenized "set" command line (exptime must be "0").
+%   State  - callback state, returned unchanged.
+%
+% Returns {noreply, State} after writing "STORED" to the socket itself, the
+% result of send_error_and_close/2 when the put is not answered with 'ok',
+% or {noreply, State} without sending anything when the value could not be
+% received.
+recv_set_data(Socket, ["set", Key, Flags, "0", Bytes], State) ->
+ % Bytes is the length token taken from the command line; a non-numeric
+ % token makes list_to_integer/1 raise badarg.
+ case gen_tcp:recv(Socket, list_to_integer(Bytes), ?TIMEOUT_CLIENT) of
+ {ok, Value} ->
+ % Read two more bytes after the value, presumably the terminating
+ % "\r\n"; the result of this read is ignored.
+ gen_tcp:recv(Socket, 2, ?TIMEOUT_CLIENT),
+ case kai_coordinator:route(
+ {put, #data{key=Key, flags=Flags, value=Value}}
+ ) of
+ ok ->
+ % The reply is sent here directly, hence 'noreply' below.
+ gen_tcp:send(Socket, <<"STORED\r\n">>),
+ {noreply, State};
+ _Other ->
+ send_error_and_close("Failed to write.", State)
+ end;
+ _Other ->
+ % Any result other than {ok, Value} (error or timeout) ends up
+ % here; the client gets no response for this command.
+ {noreply, State}
+ end.
+
+send_error_and_close(Message, State) ->
+ ?warning(io_lib:format("send_error_and_close/2: ~p", [Message])),
+ {close, ["SERVER_ERROR ", Message, "\r\n"], State}.
+
Deleted: tags/0.2.0/src/kai_sup.erl
===================================================================
--- trunk/src/kai_sup.erl 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/src/kai_sup.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,81 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(kai_sup).
--behaviour(supervisor).
-
--export([start_link/1]).
--export([init/1]).
-
--define(SERVER, ?MODULE).
-
-start_link(Args) ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, Args).
-
-init(Args) ->
- Config = {
- kai_config,
- {kai_config, start_link, [Args]},
- permanent, 1000, worker,
- [kai_config]
- },
- Log = {
- kai_log,
- {kai_log, start_link, []},
- permanent, 1000, worker,
- [kai_log]
- },
- Hash = {
- kai_hash,
- {kai_hash, start_link, []},
- permanent, 1000, worker,
- [kai_hash]
- },
- Store = {
- kai_store,
- {kai_store, start_link, []},
- permanent, 1000, worker,
- [kai_store]
- },
- Version = {
- kai_version,
- {kai_version, start_link, []},
- permanent, 1000, worker,
- [kai_version]
- },
- Sync = {
- kai_sync,
- {kai_sync, start_link, []},
- permanent, 1000, worker,
- [kai_sync]
- },
- Membership = {
- kai_membership,
- {kai_membership, start_link, []},
- permanent, 1000, worker,
- [kai_membership]
- },
- Api = {
- kai_api,
- {kai_api, start_link, []},
- permanent, 1000, worker,
- [kai_api]
- },
- Memcache = {
- kai_memcache,
- {kai_memcache, start_link, []},
- permanent, 1000, worker,
- [kai_memcache]
- },
- {ok, {{one_for_one, 3, 10}, [
- Config, Log, Hash, Store, Version, Sync, Membership, Api, Memcache
- ]}}.
Copied: tags/0.2.0/src/kai_sup.erl (from rev 84, trunk/src/kai_sup.erl)
===================================================================
--- tags/0.2.0/src/kai_sup.erl (rev 0)
+++ tags/0.2.0/src/kai_sup.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,88 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(kai_sup).
+-behaviour(supervisor).
+
+-export([start_link/1]).
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+start_link(Args) ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, Args).
+
+% supervisor callback: describes the Kai process tree.
+%
+% Args is forwarded to kai_config:start_link/1; every other child is started
+% with Module:start_link(). Each child is a permanent worker with a shutdown
+% timeout of 1000 ms. The children are supervised one_for_one with a restart
+% intensity of 3 restarts within 10 seconds, and are listed in this order:
+% config, log, hash, store, version, connection, sync, membership, api,
+% memcache.
+init(Args) ->
+    % All child specs share one shape, so derive each from its module name
+    % and the argument list for start_link.
+    Worker = fun({Module, StartArgs}) ->
+        {
+            Module,
+            {Module, start_link, StartArgs},
+            permanent, 1000, worker,
+            [Module]
+        }
+    end,
+    Services = [
+        {kai_config, [Args]},
+        {kai_log, []},
+        {kai_hash, []},
+        {kai_store, []},
+        {kai_version, []},
+        {kai_connection, []},
+        {kai_sync, []},
+        {kai_membership, []},
+        {kai_api, []},
+        {kai_memcache, []}
+    ],
+    {ok, {{one_for_one, 3, 10}, [Worker(Service) || Service <- Services]}}.
Deleted: tags/0.2.0/src/kai_tcp_server.erl
===================================================================
--- trunk/src/kai_tcp_server.erl 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/src/kai_tcp_server.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,162 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(kai_tcp_server).
--behaviour(supervisor).
-
--export([behaviour_info/1]).
--export([start_link/1, start_link/2, start_link/3, start_link/4]).
--export([stop/0, stop/1]).
--export([init/1, acceptor_init/5]).
--export([acceptor_start_link/5]).
-
--include("kai.hrl").
-
-% Behaviour Callbacks
-behaviour_info(callbacks) -> [{init, 1}, {handle_call, 3}];
-behaviour_info(_Other) -> undefined.
-
-% Supervisor - tcp_server
-%% External APIs
-start_link(Mod) -> start_link(Mod, []).
-start_link(Mod, Args) -> start_link(Mod, Args, #tcp_server_option{}).
-start_link(Mod, Args, Option) ->
- start_link({local, ?MODULE}, Mod, Args, Option).
-start_link(Name, Mod, Args, Option) ->
- supervisor:start_link(Name, ?MODULE, [Name, Mod, Args, Option]).
-
-stop() -> stop(?MODULE).
-stop(Name) ->
- case whereis(Name) of
- Pid when is_pid(Pid) ->
- exit(Pid, normal),
- ok;
- _ -> not_started
- end.
-
-%% Callbacks
-init([Name, Mod, Args, Option]) ->
- case Mod:init(Args) of
- {ok, State} -> listen(State, Name, Mod, Option);
- {stop, Reason} -> Reason;
- Other -> Other % 'ignore' is contained.
- end.
-
-%% Internal Functions
-listen(State, Name, Mod, Option) ->
- case gen_tcp:listen(
- Option#tcp_server_option.port,
- Option#tcp_server_option.listen
- ) of
- {ok, ListenSocket} ->
- init_acceptors(ListenSocket, State, Name, Mod, Option);
- {error, Reason} ->
- {stop, Reason}
- end.
-
-init_acceptors(ListenSocket, State, {Dest, Name}, Mod, Option) ->
- #tcp_server_option{
- max_connections = MaxConn,
- max_restarts = MaxRestarts,
- time = Time,
- shutdown = Shutdown
- } = Option,
- {ok, {{one_for_one, MaxRestarts, Time}, lists:map(
- fun (N) ->
- AcceptorName = list_to_atom(
- atom_to_list(Name) ++ "_acceptor_" ++ integer_to_list(N)
- ),
- {
- AcceptorName,
- {
- ?MODULE,
- acceptor_start_link,
- [{Dest, AcceptorName}, ListenSocket, State, Mod, Option]
- },
- permanent,
- Shutdown,
- worker,
- []
- }
- end,
- lists:seq(1, MaxConn)
- )}}.
-
-% ProcLib - tcp_acceptor_N
-%% External APIs
-acceptor_start_link({Dest, Name}, ListenSocket, State, Mod, Option) ->
- {ok, Pid} = proc_lib:start_link(
- ?MODULE, acceptor_init, [self(), ListenSocket, State, Mod, Option]
- ),
- case Dest of
- local -> register(Name, Pid);
- _Global -> global:register_name(Name, Pid)
- end,
- {ok, Pid}.
-
-%% Callbacks
-acceptor_init(Parent, ListenSocket, State, Mod, Option) ->
- proc_lib:init_ack(Parent, {ok, self()}),
- acceptor_accept(ListenSocket, State, Mod, Option).
-
-acceptor_accept(ListenSocket, State, Mod, Option) ->
- {ok, Socket} = gen_tcp:accept(
- ListenSocket, Option#tcp_server_option.accept_timeout
- ),
- acceptor_loop(
- proplists:get_value(active, Option#tcp_server_option.listen),
- Socket, State, Mod, Option
- ),
- acceptor_accept(ListenSocket, State, Mod, Option).
-
-acceptor_loop(false, Socket, State, Mod, Option) ->
- case gen_tcp:recv(
- Socket,
- Option#tcp_server_option.recv_length,
- Option#tcp_server_option.recv_timeout
- ) of
- {ok, Data} ->
- call_mod(false, Socket, Data, State, Mod, Option);
- {error, closed} ->
- tcp_closed;
- {error, Reason} ->
- exit({error, Reason})
- end;
-
-acceptor_loop(true, _DummySocket, State, Mod, Option) ->
- receive
- {tcp, Socket, Data} ->
- call_mod(true, Socket, Data, State, Mod, Option);
- {tcp_closed, _Socket} ->
- tcp_closed;
- Error ->
- exit({error, Error})
- after Option#tcp_server_option.recv_timeout ->
- exit({error, tcp_timeout})
- end.
-
-call_mod(Active, Socket, Data, State, Mod, Option) ->
- case Mod:handle_call(Socket, Data, State) of
- {reply, DataToSend, State} ->
- gen_tcp:send(Socket, DataToSend),
- acceptor_loop(Active, Socket, State, Mod, Option);
- {noreply, State} ->
- acceptor_loop(Active, Socket, State, Mod, Option);
- {close, State} ->
- gen_tcp:close(Socket);
- {close, DataToSend, State} ->
- gen_tcp:send(Socket, DataToSend),
- gen_tcp:close(Socket);
- _Error ->
- gen_tcp:close(Socket)
- end.
-
Copied: tags/0.2.0/src/kai_tcp_server.erl (from rev 84, trunk/src/kai_tcp_server.erl)
===================================================================
--- tags/0.2.0/src/kai_tcp_server.erl (rev 0)
+++ tags/0.2.0/src/kai_tcp_server.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,169 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(kai_tcp_server).
+-behaviour(supervisor).
+
+-export([behaviour_info/1]).
+-export([start_link/1, start_link/2, start_link/3, start_link/4]).
+-export([stop/0, stop/1]).
+-export([init/1, acceptor_init/5]).
+-export([acceptor_start_link/5]).
+
+-include("kai.hrl").
+
+% Behaviour Callbacks
+behaviour_info(callbacks) -> [{init, 1}, {handle_call, 3}];
+behaviour_info(_Other) -> undefined.
+
+% Supervisor - tcp_server
+%% External APIs
+start_link(Mod) -> start_link(Mod, []).
+start_link(Mod, Args) -> start_link(Mod, Args, #tcp_server_option{}).
+start_link(Mod, Args, Option) ->
+ start_link({local, ?MODULE}, Mod, Args, Option).
+start_link(Name, Mod, Args, Option) ->
+ supervisor:start_link(Name, ?MODULE, [Name, Mod, Args, Option]).
+
+stop() -> stop(?MODULE).
+% Stop the server registered locally under Name.
+% Returns ok when a process is registered under Name (after sending it an
+% exit signal with reason 'normal') and not_started otherwise.
+% NOTE(review): exit(Pid, normal) does not terminate a process that is not
+% trapping exits, and a trapping process only receives an 'EXIT' message --
+% confirm that this really shuts the server down.
+stop(Name) ->
+ case whereis(Name) of
+ Pid when is_pid(Pid) ->
+ exit(Pid, normal),
+ ok;
+ _ -> not_started
+ end.
+
+%% Callbacks
+% supervisor callback. Runs Mod:init(Args) and, when that yields
+% {ok, State}, continues with listen/4, whose result becomes the return
+% value. Any other result of Mod:init/1 is returned as is, except that
+% {stop, Reason} is unwrapped to the bare Reason.
+% NOTE(review): a bare Reason is not one of the documented supervisor
+% init/1 return values ({ok, ...} | ignore) -- confirm this is intended.
+init([Name, Mod, Args, Option]) ->
+ case Mod:init(Args) of
+ {ok, State} -> listen(State, Name, Mod, Option);
+ {stop, Reason} -> Reason;
+ Other -> Other % 'ignore' is contained.
+ end.
+
+%% Internal Functions
+listen(State, Name, Mod, Option) ->
+ case gen_tcp:listen(
+ Option#tcp_server_option.port,
+ Option#tcp_server_option.listen
+ ) of
+ {ok, ListenSocket} ->
+ init_acceptors(ListenSocket, State, Name, Mod, Option);
+ {error, Reason} ->
+ ?warning(io_lib:format("listen(~p) ~p", [Mod, {error, Reason}])),
+ {stop, Reason}
+ end.
+
+init_acceptors(ListenSocket, State, {Dest, Name}, Mod, Option) ->
+ #tcp_server_option{
+ max_processes = MaxProcesses,
+ max_restarts = MaxRestarts,
+ time = Time,
+ shutdown = Shutdown
+ } = Option,
+ {ok, {{one_for_one, MaxRestarts, Time}, lists:map(
+ fun (N) ->
+ AcceptorName = list_to_atom(
+ atom_to_list(Name) ++ "_acceptor_" ++ integer_to_list(N)
+ ),
+ {
+ AcceptorName,
+ {
+ ?MODULE,
+ acceptor_start_link,
+ [{Dest, AcceptorName}, ListenSocket, State, Mod, Option]
+ },
+ permanent,
+ Shutdown,
+ worker,
+ []
+ }
+ end,
+ lists:seq(1, MaxProcesses)
+ )}}.
+
+% ProcLib - tcp_acceptor_N
+%% External APIs
+acceptor_start_link({Dest, Name}, ListenSocket, State, Mod, Option) ->
+ {ok, Pid} = proc_lib:start_link(
+ ?MODULE, acceptor_init, [self(), ListenSocket, State, Mod, Option]
+ ),
+ case Dest of
+ local -> register(Name, Pid);
+ _Global -> global:register_name(Name, Pid)
+ end,
+ {ok, Pid}.
+
+%% Callbacks
+acceptor_init(Parent, ListenSocket, State, Mod, Option) ->
+ proc_lib:init_ack(Parent, {ok, self()}),
+ acceptor_accept(ListenSocket, State, Mod, Option).
+
+% Accept loop of one acceptor process: waits for a connection on
+% ListenSocket (for at most accept_timeout), serves it with acceptor_loop/5
+% and then accepts again. The function calls itself unconditionally, so it
+% only ends when the process exits.
+acceptor_accept(ListenSocket, State, Mod, Option) ->
+ case gen_tcp:accept(
+ ListenSocket, Option#tcp_server_option.accept_timeout
+ ) of
+ {ok, Socket} ->
+ % The 'active' entry of the listen options selects which
+ % acceptor_loop/5 clause serves this connection.
+ acceptor_loop(
+ proplists:get_value(active, Option#tcp_server_option.listen),
+ Socket, State, Mod, Option
+ );
+ {error, Reason} ->
+ % Log the failure and sleep for accept_error_sleep_time before
+ % trying to accept again.
+ ?warning(io_lib:format("acceptor_accept(~p) ~p", [Mod, {error, Reason}])),
+ timer:sleep(Option#tcp_server_option.accept_error_sleep_time)
+ end,
+ % Every new connection starts from the State passed in here.
+ acceptor_accept(ListenSocket, State, Mod, Option).
+
+% Wait for the next packet on an accepted connection and pass it to
+% call_mod/6. The first argument is the 'active' setting taken from the
+% listen options and selects how data is obtained.
+%
+% Passive sockets (false): data is pulled with gen_tcp:recv/3 using
+% recv_length and recv_timeout. Returns tcp_closed when the peer has closed
+% the connection; any other receive error is logged and the process exits
+% with {error, Reason}.
+acceptor_loop(false, Socket, State, Mod, Option) ->
+ case gen_tcp:recv(
+ Socket,
+ Option#tcp_server_option.recv_length,
+ Option#tcp_server_option.recv_timeout
+ ) of
+ {ok, Data} ->
+ call_mod(false, Socket, Data, State, Mod, Option);
+ {error, closed} ->
+ tcp_closed;
+ {error, Reason} ->
+ ?warning(io_lib:format("acceptor_loop(~p) ~p", [Mod, {error, Reason}])),
+ exit({error, Reason})
+ end;
+
+% Active sockets (true): data arrives as {tcp, Socket, Data} messages. The
+% socket argument is not used; the socket carried in the message is passed
+% on instead. Returns tcp_closed on {tcp_closed, _}. Any other message makes
+% the process exit with {error, Message}, and receiving nothing within
+% recv_timeout makes it exit with {error, tcp_timeout}.
+acceptor_loop(true, _DummySocket, State, Mod, Option) ->
+ receive
+ {tcp, Socket, Data} ->
+ call_mod(true, Socket, Data, State, Mod, Option);
+ {tcp_closed, _Socket} ->
+ tcp_closed;
+ Error ->
+ exit({error, Error})
+ after Option#tcp_server_option.recv_timeout ->
+ exit({error, tcp_timeout})
+ end.
+
+% Hand one received packet to the callback module and act on its answer.
+%
+%   Active - 'active' setting of the socket, passed on to acceptor_loop/5.
+%   Socket - the connection the packet arrived on.
+%   Data   - the received packet.
+%   State  - callback state for this connection.
+%   Mod    - callback module implementing handle_call/3.
+%   Option - #tcp_server_option{} record, passed on to acceptor_loop/5.
+%
+% Accepted results of Mod:handle_call/3:
+%   {reply, DataToSend, NewState} - send DataToSend, keep serving
+%   {noreply, NewState}           - keep serving
+%   {close, NewState}             - close the connection
+%   {close, DataToSend, NewState} - send DataToSend, then close
+% Anything else closes the connection.
+%
+% The state in the result is bound to a fresh variable on purpose: matching
+% it against the already-bound State would only accept an unchanged state
+% and would silently close the connection whenever the callback updated it.
+call_mod(Active, Socket, Data, State, Mod, Option) ->
+    case Mod:handle_call(Socket, Data, State) of
+        {reply, DataToSend, NewState} ->
+            gen_tcp:send(Socket, DataToSend),
+            acceptor_loop(Active, Socket, NewState, Mod, Option);
+        {noreply, NewState} ->
+            acceptor_loop(Active, Socket, NewState, Mod, Option);
+        {close, _NewState} ->
+            gen_tcp:close(Socket);
+        {close, DataToSend, _NewState} ->
+            gen_tcp:send(Socket, DataToSend),
+            gen_tcp:close(Socket);
+        _Error ->
+            gen_tcp:close(Socket)
+    end.
+
Deleted: tags/0.2.0/test/Makefile
===================================================================
--- trunk/test/Makefile 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/test/Makefile 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,38 +0,0 @@
-## Licensed under the Apache License, Version 2.0 (the "License"); you may not
-## use this file except in compliance with the License. You may obtain a copy
-## of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-## License for the specific language governing permissions and limitations
-## under the License.
-
-INCLUDE = $(ROOT)/include
-TEST_SERVER_INCLUDE = $(TEST_SERVER)/include
-RUN_TEST_INCLUDE = $(RUN_TEST)/include
-ERL_COMPILE_FLAGS += +warn_unused_vars +nowarn_shadow_vars +warn_unused_import
-
-.SUFFIXES: .erl .beam .yrl
-
-.erl.beam:
- erlc -W $(ERL_COMPILE_FLAGS) \
- -I$(INCLUDE) -I$(RUN_TEST_INCLUDE) -I$(TEST_SERVER_INCLUDE) \
- $<
-
-.yrl.erl:
- erlc -W $(ERL_COMPILE_FLAGS) -I$(INCLUDE) $<
-
-MODS = kai_config_SUITE kai_log_SUITE kai_hash_SUITE kai_store_SUITE \
- kai_version_SUITE kai_coordinator_SUITE kai_sync_SUITE \
- kai_membership_SUITE \
- kai_tcp_server_SUITE kai_api_SUITE kai_memcache_SUITE
-
-all: compile
-
-compile: ${MODS:%=%.beam}
-
-clean:
- rm -rf *.beam erl_crash.dump *~
Copied: tags/0.2.0/test/Makefile (from rev 84, trunk/test/Makefile)
===================================================================
--- tags/0.2.0/test/Makefile (rev 0)
+++ tags/0.2.0/test/Makefile 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,38 @@
+## Licensed under the Apache License, Version 2.0 (the "License"); you may not
+## use this file except in compliance with the License. You may obtain a copy
+## of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+## License for the specific language governing permissions and limitations
+## under the License.
+
+INCLUDE = $(ROOT)/include
+TEST_SERVER_INCLUDE = $(TEST_SERVER)/include
+RUN_TEST_INCLUDE = $(RUN_TEST)/include
+ERL_COMPILE_FLAGS += +warn_unused_vars +nowarn_shadow_vars +warn_unused_import
+
+.SUFFIXES: .erl .beam .yrl
+
+.erl.beam:
+ erlc -W $(ERL_COMPILE_FLAGS) \
+ -I$(INCLUDE) -I$(RUN_TEST_INCLUDE) -I$(TEST_SERVER_INCLUDE) \
+ $<
+
+.yrl.erl:
+ erlc -W $(ERL_COMPILE_FLAGS) -I$(INCLUDE) $<
+
+MODS = kai_config_SUITE kai_log_SUITE kai_hash_SUITE kai_store_SUITE \
+ kai_version_SUITE kai_connection_SUITE kai_sync_SUITE \
+ kai_membership_SUITE kai_coordinator_SUITE kai_tcp_server_SUITE \
+ kai_api_SUITE kai_memcache_SUITE
+
+all: compile
+
+compile: ${MODS:%=%.beam}
+
+clean:
+ rm -rf *.beam erl_crash.dump *~
Deleted: tags/0.2.0/test/kai.coverspec
===================================================================
--- trunk/test/kai.coverspec 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/test/kai.coverspec 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,7 +0,0 @@
-{level, details}.
-{incl_mods, [
- kai_config, kai_log, kai_hash, kai_store, kai_version,
- kai_coordinator, kai_sync, kai_membership, kai_sup, kai,
- kai_tcp_server, kai_api, kai_memcache
-]}.
-
Copied: tags/0.2.0/test/kai.coverspec (from rev 84, trunk/test/kai.coverspec)
===================================================================
--- tags/0.2.0/test/kai.coverspec (rev 0)
+++ tags/0.2.0/test/kai.coverspec 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,7 @@
+{level, details}.
+{incl_mods, [
+ kai_config, kai_log, kai_hash, kai_store, kai_version,
+ kai_connection, kai_sync, kai_membership, kai_coordinator,
+ kai_tcp_server, kai_api, kai_memcache
+]}.
+
Deleted: tags/0.2.0/test/kai_api_SUITE.erl
===================================================================
--- trunk/test/kai_api_SUITE.erl 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/test/kai_api_SUITE.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,68 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(kai_api_SUITE).
--compile(export_all).
-
--include("kai.hrl").
--include("kai_test.hrl").
-
-all() -> [test1].
-
-test1() -> [].
-test1(_Conf) ->
- kai_config:start_link([
- {hostname, "localhost"},
- {port, 11011},
- {max_connections, 2},
- {n, 3},
- {number_of_buckets, 8},
- {number_of_virtual_nodes, 2}]),
- kai_hash:start_link(),
- kai_store:start_link(),
- kai_api:start_link(),
-
- timer:sleep(100), % wait for starting kai_api
-
- {node_info, ?NODE1, ?INFO} = kai_api:node_info(?NODE1),
-
- {node_list, [?NODE1]} = kai_api:node_list(?NODE1),
-
- Data = #data{
- key = "item-1",
- bucket = 3,
- last_modified = now(),
- checksum = erlang:md5(<<"value-1">>),
- flags = "0",
- value = (<<"value-1">>)
- },
- ok = kai_api:put(?NODE1, Data),
- ?assertEqual(Data, kai_store:get("item-1")),
-
- ListOfData = #data{
- key = "item-1",
- bucket = 3,
- last_modified = Data#data.last_modified,
- checksum = erlang:md5(<<"value-1">>)
- },
- {list_of_data, [ListOfData]} = kai_api:list(?NODE1, 3),
-
- Data = kai_api:get(?NODE1, "item-1"),
-
- ok = kai_api:delete(?NODE1, "item-1"),
-
- undefined = kai_api:get(?NODE1, "item-1"),
-
- kai_api:stop(),
- kai_store:stop(),
- kai_hash:stop(),
- kai_config:stop().
Copied: tags/0.2.0/test/kai_api_SUITE.erl (from rev 84, trunk/test/kai_api_SUITE.erl)
===================================================================
--- tags/0.2.0/test/kai_api_SUITE.erl (rev 0)
+++ tags/0.2.0/test/kai_api_SUITE.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -0,0 +1,70 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(kai_api_SUITE).
+-compile(export_all).
+
+-include("kai.hrl").
+-include("kai_test.hrl").
+
+all() -> [test1].
+
+test1() -> [].
+test1(_Conf) ->
+ kai_config:start_link([
+ {hostname, "localhost"},
+ {api_port, 11011},
+ {api_max_processes, 2},
+ {n, 3},
+ {number_of_buckets, 8},
+ {number_of_virtual_nodes, 2}]),
+ kai_hash:start_link(),
+ kai_store:start_link(),
+ kai_connection:start_link(),
+ kai_api:start_link(),
+
+ timer:sleep(100), % wait for starting kai_api
+
+ {node_info, ?NODE1, ?INFO} = kai_api:node_info(?NODE1),
+
+ {node_list, [?NODE1]} = kai_api:node_list(?NODE1),
+
+ Data = #data{
+ key = "item-1",
+ bucket = 3,
+ last_modified = now(),
+ checksum = erlang:md5(<<"value-1">>),
+ flags = "0",
+ value = (<<"value-1">>)
+ },
+ ok = kai_api:put(?NODE1, Data),
+ ?assertEqual(Data, kai_store:get("item-1")),
+
+ ListOfData = #data{
+ key = "item-1",
+ bucket = 3,
+ last_modified = Data#data.last_modified,
+ checksum = erlang:md5(<<"value-1">>)
+ },
+ {list_of_data, [ListOfData]} = kai_api:list(?NODE1, 3),
+
+ Data = kai_api:get(?NODE1, "item-1"),
+
+ ok = kai_api:delete(?NODE1, "item-1"),
+
+ undefined = kai_api:get(?NODE1, "item-1"),
+
+ kai_api:stop(),
+ kai_connection:stop(),
+ kai_store:stop(),
+ kai_hash:stop(),
+ kai_config:stop().
Deleted: tags/0.2.0/test/kai_config_SUITE.erl
===================================================================
--- trunk/test/kai_config_SUITE.erl 2008-08-28 12:33:36 UTC (rev 83)
+++ tags/0.2.0/test/kai_config_SUITE.erl 2008-08-28 13:03:48 UTC (rev 85)
@@ -1,54 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(kai_config_SUITE).
--compile(export_all).
-
--include("kai.hrl").
--include("kai_test.hrl").
-
-all() -> [test1].
-
-test1() -> [].
-test1(_Conf) ->
- kai_config:start_link([
- {hostname, "localhost"},
- {port, 11011},
- {n, 2},
- {number_of_buckets, 16384},
- {number_of_virtual_nodes, 128}
- ]),
-
- ?assertEqual(
- ?NODE1,
- kai_config:get(node)
- ),
- ?assertEqual(
- 16384,
- kai_config:get(number_of_buckets)
- ),
- ?assertEqual(
- 128,
- kai_config:get(number_of_virtual_nodes)
- ),
-
- ?assertEqual(
- [?NODE1, 16384, 128],
- kai_config:get([node, number_of_buckets, number_of_virtual_nodes])
- ),
-
- ?assertEqual(
- {node_info, ?NODE1, [{number_of_virtual_nodes, 128}]},
- kai_config:node_info()
- ),
-
- kai_config:stop().
Copied: tags/0.2.0/test/kai_config_SUITE.erl (from rev 84, trunk/test/kai_config_SUITE.erl)
===================================================================
--- tags/0.2.0/test/kai_config_SUITE.erl (rev 0)
@@ Diff output truncated at 100000 characters. @@
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|