kai-commits Mailing List for kai
Kai is a distributed key-value datastore
Status: Beta
Brought to you by:
takemaru
You can subscribe to this list here.
| 2008 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
(16) |
Aug
(30) |
Sep
(4) |
Oct
(3) |
Nov
(8) |
Dec
(4) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2009 |
Jan
|
Feb
(10) |
Mar
(24) |
Apr
|
May
(8) |
Jun
(11) |
Jul
(4) |
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
|
From: <tak...@us...> - 2009-07-29 13:59:16
|
Revision: 166
http://kai.svn.sourceforge.net/kai/?rev=166&view=rev
Author: takemaru
Date: 2009-07-29 13:59:03 +0000 (Wed, 29 Jul 2009)
Log Message:
-----------
* src/kai_store_dets.erl
- Added {repair, force} option to dets:open_file.
Modified Paths:
--------------
branches/0.4.1rc/src/kai_store_dets.erl
Modified: branches/0.4.1rc/src/kai_store_dets.erl
===================================================================
--- branches/0.4.1rc/src/kai_store_dets.erl 2009-07-27 14:20:53 UTC (rev 165)
+++ branches/0.4.1rc/src/kai_store_dets.erl 2009-07-29 13:59:03 UTC (rev 166)
@@ -33,7 +33,7 @@
fun(I) ->
Name = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ integer_to_list(I)),
File = Dir ++ "/" ++ integer_to_list(I),
- case dets:open_file(Name, [{type, set}, {keypos, 2}, {file, File}]) of
+ case dets:open_file(Name, [{type, set}, {keypos, 2}, {file, File}, {repair, force}]) of
{ok, Table} -> {I, Table};
{error, Reason} -> ?info(Reason),
exit(Reason)
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <tak...@us...> - 2009-07-27 14:21:02
|
Revision: 165
http://kai.svn.sourceforge.net/kai/?rev=165&view=rev
Author: takemaru
Date: 2009-07-27 14:20:53 +0000 (Mon, 27 Jul 2009)
Log Message:
-----------
* src/kai_membership.erl
- Retry pinging suspicious nodes until 1 min has elapsed before removing them.
Modified Paths:
--------------
branches/0.4.1rc/src/kai_membership.erl
branches/0.4.1rc/test/kai_membership_SUITE.erl
Modified: branches/0.4.1rc/src/kai_membership.erl
===================================================================
--- branches/0.4.1rc/src/kai_membership.erl 2009-07-27 13:04:05 UTC (rev 164)
+++ branches/0.4.1rc/src/kai_membership.erl 2009-07-27 14:20:53 UTC (rev 165)
@@ -28,20 +28,50 @@
gen_fsm:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []).
init(_Args) ->
+ ets:new(?MODULE, [set, public, named_table]), %% XXX public for tests
{ok, ready, [], ?TIMER}.
terminate(_Reason, _StateName, _StateData) ->
+ ets:delete(?MODULE),
ok.
+has_time_elapsed(Time) ->
+ {Msec1, Sec1, _} = Time,
+ {Msec2, Sec2, _} = now(),
+ 1000000 * Msec1 + Sec1 + 60 < 1000000 * Msec2 + Sec2. %% XXX 60 sec. is a magic number
+
+is_really_dead(AvailableNodes, DownNodes, Suspect) ->
+ case ets:lookup(?MODULE, Suspect) of
+ [{Suspect, Since}] ->
+ case has_time_elapsed(Since) of
+ true ->
+ ets:delete(?MODULE, Suspect),
+ {AvailableNodes, [Suspect|DownNodes]};
+ _ ->
+ {AvailableNodes, DownNodes}
+ end;
+ _ ->
+ ets:insert(?MODULE, {Suspect, now()}),
+ {AvailableNodes, DownNodes}
+ end.
+
+actually_alive(AvailableNodes, DownNodes, Suspect) ->
+ ets:delete(?MODULE, Suspect),
+ {[Suspect|AvailableNodes], DownNodes}.
+
ping_nodes([], AvailableNodes, DownNodes) ->
{AvailableNodes, DownNodes};
ping_nodes([Node|Nodes], AvailableNodes, DownNodes) ->
case kai_rpc:node_info(Node) of
{node_info, Node2, Info} ->
- ping_nodes(Nodes, [{Node2, Info}|AvailableNodes], DownNodes);
+% ping_nodes(Nodes, [{Node2, Info}|AvailableNodes], DownNodes);
+ {Avail, Down} = actually_alive(AvailableNodes, DownNodes, {Node2, Info}),
+ ping_nodes(Nodes, Avail, Down);
{error, Reason} ->
?warning(io_lib:format("ping_nodes/3: ~p", [{error, Reason}])),
- ping_nodes(Nodes, AvailableNodes, [Node|DownNodes])
+% ping_nodes(Nodes, AvailableNodes, [Node|DownNodes])
+ {Avail, Down} = is_really_dead(AvailableNodes, DownNodes, Node),
+ ping_nodes(Nodes, Avail, Down)
end.
retrieve_node_list(Node) ->
@@ -55,7 +85,8 @@
ping_nodes(Nodes -- [LocalNode], [], []);
{error, Reason} ->
?warning(io_lib:format("retrieve_node_list/1: ~p", [{error, Reason}])),
- {[], [Node]}
+% {[], [Node]}
+ is_really_dead([], [], Node)
end.
sync_buckets([], _LocalNode) ->
Modified: branches/0.4.1rc/test/kai_membership_SUITE.erl
===================================================================
--- branches/0.4.1rc/test/kai_membership_SUITE.erl 2009-07-27 13:04:05 UTC (rev 164)
+++ branches/0.4.1rc/test/kai_membership_SUITE.erl 2009-07-27 14:20:53 UTC (rev 165)
@@ -13,6 +13,7 @@
-module(kai_membership_SUITE).
-compile(export_all).
+-include("ct.hrl").
-include("kai.hrl").
-include("kai_test.hrl").
@@ -109,19 +110,35 @@
timer:sleep(100),
{node_list, NodeList2} = kai_hash:node_list(),
- ?assertEqual(3, length(NodeList2)),
- ?assertNot(lists:member(?NODE1, NodeList2)),
+% ?assertEqual(3, length(NodeList2)),
+% ?assertNot(lists:member(?NODE1, NodeList2)),
+ ?assertEqual(4, length(NodeList2)),
{list_of_data, ListOfData2} = kai_store:list(3),
- ?assertEqual(1, length(ListOfData2)),
- ?assert(lists:keymember("item-1", 2, ListOfData2)),
+% ?assertEqual(1, length(ListOfData2)),
+% ?assert(lists:keymember("item-1", 2, ListOfData2)),
+ ?assertEqual(0, length(ListOfData2)),
timer:sleep(2100), % timeout and check ?NODE4 by kai_hash:choose_node_randomly/0
{node_list, NodeList3} = kai_hash:node_list(),
- ?assertEqual(2, length(NodeList3)),
- ?assertNot(lists:member(?NODE4, NodeList3)),
+% ?assertEqual(2, length(NodeList3)),
+% ?assertNot(lists:member(?NODE4, NodeList3)),
+ ?assertEqual(4, length(NodeList3)),
+ct:pal(debug, "~p", [ets:tab2list(kai_membership)]),
+ timer:sleep(57000), %% Less than 60 - 2 sec.
+ct:pal(debug, "~p", [ets:tab2list(kai_membership)]),
+
+ {node_list, NodeList4} = kai_hash:node_list(),
+ ?assertEqual(4, length(NodeList4)),
+
+ timer:sleep(16000),
+ct:pal(debug, "~p", [ets:tab2list(kai_membership)]),
+
+ {node_list, NodeList5} = kai_hash:node_list(),
+ ?assert(length(NodeList5) < 4),
+
kai_membership:stop(),
kai_sync:stop(),
kai_connection:stop(),
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <tak...@us...> - 2009-07-27 13:04:15
|
Revision: 164
http://kai.svn.sourceforge.net/kai/?rev=164&view=rev
Author: takemaru
Date: 2009-07-27 13:04:05 +0000 (Mon, 27 Jul 2009)
Log Message:
-----------
* branches/0.4.1rc
- Created a new branch from tags/0.4.0 for sticky membership management.
Added Paths:
-----------
branches/0.4.1rc/
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <shi...@us...> - 2009-07-19 09:26:43
|
Revision: 163
http://kai.svn.sourceforge.net/kai/?rev=163&view=rev
Author: shino_shun
Date: 2009-07-19 09:26:41 +0000 (Sun, 19 Jul 2009)
Log Message:
-----------
Merge branch 'shino_data_in_bag_local' into trunk_local
Modified Paths:
--------------
trunk/Makefile
trunk/ebin/.gitignore
trunk/src/kai_coordinator.erl
trunk/src/kai_store_dets.erl
trunk/src/kai_store_ets.erl
trunk/src/kai_sync.erl
trunk/test/kai_config_SUITE.erl
trunk/test/kai_coordinator_SUITE.erl
trunk/test/kai_membership_SUITE.erl
trunk/test/kai_rpc_SUITE.erl
trunk/test/kai_store_SUITE.erl
Modified: trunk/Makefile
===================================================================
--- trunk/Makefile 2009-06-23 03:29:28 UTC (rev 162)
+++ trunk/Makefile 2009-07-19 09:26:41 UTC (rev 163)
@@ -42,6 +42,12 @@
-logdir test/log -cover test/kai.coverspec \
-I$(ROOT)/include -pa $(ROOT)/ebin
+test_single_case: test_compile
+ mkdir -p test/log
+ ${RUN_TEST_CMD} -suite $(SUITE) -case ${CASE}\
+ -logdir test/log -cover test/kai.coverspec \
+ -I$(ROOT)/include -pa $(ROOT)/ebin
+
docs:
erl -noshell -run edoc_run application "'kai'" \
'"."' '[{def,{vsn, "$(KAI_VSN)"}}]'
Modified: trunk/ebin/.gitignore
===================================================================
--- trunk/ebin/.gitignore 2009-06-23 03:29:28 UTC (rev 162)
+++ trunk/ebin/.gitignore 2009-07-19 09:26:41 UTC (rev 163)
@@ -1 +1,2 @@
*.beam
+kai.app
Modified: trunk/src/kai_coordinator.erl
===================================================================
--- trunk/src/kai_coordinator.erl 2009-06-23 03:29:28 UTC (rev 162)
+++ trunk/src/kai_coordinator.erl 2009-07-19 09:26:41 UTC (rev 163)
@@ -102,13 +102,13 @@
end.
gather_in_get(_Ref, _N, 0, Results) ->
- Results;
+ lists:flatten(Results);
gather_in_get(_Ref, 0, _R, _Results) ->
{error, enodata};
gather_in_get(Ref, N, R, Results) ->
receive
- {Ref, Data} when is_record(Data, data) ->
- gather_in_get(Ref, N-1, R-1, [Data|Results]);
+ {Ref, ListOfData} when is_list(ListOfData) ->
+ gather_in_get(Ref, N-1, R-1, [ListOfData|Results]);
{Ref, undefined} ->
gather_in_get(Ref, N-1, R-1, Results);
{Ref, _Other} ->
@@ -125,14 +125,18 @@
{ok, Bucket} = kai_hash:find_bucket(Key),
{ok, DstNodes} = kai_hash:find_nodes(Bucket),
Ref = make_ref(),
- Data1 =
+ VcList =
case kai_store:get(Data#data{bucket=Bucket}) of
- PreviousData when is_record(PreviousData, data) ->
- PreviousData;
+ PreviousDataList when is_list(PreviousDataList) ->
+ lists:map(
+ fun(PreviousData) ->
+ PreviousData#data.vector_clocks end,
+ PreviousDataList);
undefined ->
- #data{key=Key, vector_clocks=vclock:fresh()}
+ [vclock:fresh()]
end,
- {ok, Data2} = kai_version:update(Data1),
+ {ok, Data2} = kai_version:update(
+ Data#data{vector_clocks = vclock:merge(VcList)}),
Data3 = Data2#data{
bucket = Bucket,
checksum = erlang:md5(Value),
Modified: trunk/src/kai_store_dets.erl
===================================================================
--- trunk/src/kai_store_dets.erl 2009-06-23 03:29:28 UTC (rev 162)
+++ trunk/src/kai_store_dets.erl 2009-07-19 09:26:41 UTC (rev 163)
@@ -33,7 +33,7 @@
fun(I) ->
Name = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ integer_to_list(I)),
File = Dir ++ "/" ++ integer_to_list(I),
- case dets:open_file(Name, [{type, set}, {keypos, 2}, {file, File}]) of
+ case dets:open_file(Name, [{type, bag}, {keypos, 2}, {file, File}]) of
{ok, Table} -> {I, Table};
{error, Reason} -> ?info(Reason),
exit(Reason)
@@ -79,35 +79,38 @@
do_get(#data{key=Key, bucket=Bucket} = _Data, State) ->
Table = bucket_to_table(Bucket, State),
case dets:lookup(Table, Key) of
- [Data] -> {reply, Data, State};
- _ -> {reply, undefined, State}
+ [] -> {reply, undefined, State};
+ StoredDataList -> {reply, StoredDataList, State}
end.
do_put(Data, State) when is_record(Data, data) ->
Table = bucket_to_table(Data#data.bucket, State),
- case dets:lookup(Table, Data#data.key) of
- [StoredData] ->
- case vclock:descends(Data#data.vector_clocks, StoredData#data.vector_clocks) of
- true -> insert_and_reply(Data, Table, State);
- _ -> {reply, {error, "stale or concurrent state found in kai_store"}, State}
- end;
- _ -> insert_and_reply(Data, Table, State)
- end.
+ insert_and_remove(Table, Data, dets:lookup(Table, Data#data.key)),
+ {reply, ok, State}.
-insert_and_reply(Data, Table, State) ->
+insert_and_remove(Table, Data, StoredDataList) ->
dets:insert(Table, Data),
- dets:sync(Table),
- {reply, ok, State}.
+ remove_descend_data(Table, Data#data.vector_clocks, StoredDataList),
+ dets:sync(Table).
+remove_descend_data(_Table, _Vc, []) ->
+ ok;
+remove_descend_data(Table, Vc, [StoredData|Rest]) ->
+ case vclock:descends(Vc, StoredData#data.vector_clocks) of
+ true -> dets:delete_object(Table, StoredData);
+ _ -> nop
+ end,
+ remove_descend_data(Table, Vc, Rest).
+
do_delete(#data{key=Key, bucket=Bucket} = _Data, State) ->
Table = bucket_to_table(Bucket, State),
case dets:lookup(Table, Key) of
- [_Data2] ->
+ [] ->
+ {reply, undefined, State};
+ _StoredDataList ->
dets:delete(Table, Key),
dets:sync(Table),
- {reply, ok, State};
- _ ->
- {reply, undefined, State}
+ {reply, ok, State}
end.
info(Name, State) ->
Modified: trunk/src/kai_store_ets.erl
===================================================================
--- trunk/src/kai_store_ets.erl 2009-06-23 03:29:28 UTC (rev 162)
+++ trunk/src/kai_store_ets.erl 2009-07-19 09:26:41 UTC (rev 163)
@@ -25,7 +25,7 @@
gen_server:start_link({local, Server}, ?MODULE, [], _Opts = []).
init(_Args) ->
- ets:new(?MODULE, [set, private, named_table, {keypos, 2}]),
+ ets:new(?MODULE, [bag, private, named_table, {keypos, 2}]),
{ok, []}.
terminate(_Reason, _State) ->
@@ -37,7 +37,7 @@
key = '$1',
bucket = Bucket,
last_modified = '$2',
- vector_clocks = '$3',
+ vector_clocks = '$3',
checksum = '$4',
flags = '_',
value = '_'
@@ -55,33 +55,37 @@
do_get(#data{key=Key} = _Data, State) ->
case ets:lookup(?MODULE, Key) of
- [Data] -> {reply, Data, State};
- _ -> {reply, undefined, State}
+ [] -> {reply, undefined, State};
+ StoredDataList -> {reply, StoredDataList, State}
end.
do_put(Data, State) when is_record(Data, data) ->
- case ets:lookup(?MODULE, Data#data.key) of
- [StoredData] ->
- case vclock:descends(Data#data.vector_clocks, StoredData#data.vector_clocks) of
- true -> insert_and_reply(Data, State);
- _ -> {reply, {error, "stale or concurrent state found in kai_store"}, State}
- end;
- _ -> insert_and_reply(Data, State)
- end.
+ insert_and_remove(Data, ets:lookup(?MODULE, Data#data.key)),
+ {reply, ok, State}.
-insert_and_reply(Data, State) ->
+insert_and_remove(Data, StoredDataList) ->
ets:insert(?MODULE, Data),
- {reply, ok, State}.
+ remove_descend_data(Data#data.vector_clocks, StoredDataList).
+remove_descend_data(_Vc, []) ->
+ ok;
+remove_descend_data(Vc, [StoredData|Rest]) ->
+ case vclock:descends(Vc, StoredData#data.vector_clocks) of
+ true -> ets:delete_object(?MODULE, StoredData);
+ _ -> nop
+ end,
+ remove_descend_data(Vc, Rest).
+
do_delete(#data{key=Key} = _Data, State) ->
case ets:lookup(?MODULE, Key) of
- [_Data2] ->
+ [] ->
+ {reply, undefined, State};
+ _StoredDataList ->
ets:delete(?MODULE, Key),
- {reply, ok, State};
- _ ->
- {reply, undefined, State}
+ {reply, ok, State}
end.
+
info(Name, State) ->
Value =
case Name of
Modified: trunk/src/kai_sync.erl
===================================================================
--- trunk/src/kai_sync.erl 2009-06-23 03:29:28 UTC (rev 162)
+++ trunk/src/kai_sync.erl 2009-07-19 09:26:41 UTC (rev 163)
@@ -58,12 +58,14 @@
ok;
retrieve_data(Node, [Metadata|Rest], State) ->
case kai_store:get(Metadata) of
- Data when is_record(Data, data) ->
+ DataList when is_list(DataList) ->
retrieve_data(Node, Rest, State);
undefined ->
case kai_rpc:get(Node, State#state.node, Metadata) of
- Data when is_record(Data, data) ->
- kai_store:put(Data),
+ RemoteDataList when is_list(RemoteDataList) ->
+ lists:map(fun(RemoteData) ->
+ kai_store:put(RemoteData) end,
+ RemoteDataList),
retrieve_data(Node, Rest, State);
undefined ->
retrieve_data(Node, Rest, State);
Modified: trunk/test/kai_config_SUITE.erl
===================================================================
--- trunk/test/kai_config_SUITE.erl 2009-06-23 03:29:28 UTC (rev 162)
+++ trunk/test/kai_config_SUITE.erl 2009-07-19 09:26:41 UTC (rev 163)
@@ -48,6 +48,8 @@
kai_config:start_link(Args),
Conf.
+end_per_testcase(quorum_error, _Conf) ->
+ ok;
end_per_testcase(_TestCase, _Conf) ->
kai_config:stop().
Modified: trunk/test/kai_coordinator_SUITE.erl
===================================================================
--- trunk/test/kai_coordinator_SUITE.erl 2009-06-23 03:29:28 UTC (rev 162)
+++ trunk/test/kai_coordinator_SUITE.erl 2009-07-19 09:26:41 UTC (rev 163)
@@ -77,7 +77,7 @@
value = <<"value">>
},
ok = rpc:call(Node, kai_coordinator, route, [?NODE1, {put, Data, Quorum}]),
- [_] = rpc:call(Node, kai_coordinator, route, [?NODE1, {get, #data{key=Key}, Quorum}]),
+ [_Data] = rpc:call(Node, kai_coordinator, route, [?NODE1, {get, #data{key=Key}, Quorum}]),
N = replica_counts(Key, Nodes),
ok = rpc:call(Node, kai_coordinator, route, [?NODE1, {delete, #data{key=Key}, Quorum}]),
@@ -95,10 +95,10 @@
replica_counts(Key, [Node|Rest], Acc) ->
{ok, Bucket} = rpc:call(Node, kai_hash, find_bucket, [Key]),
case rpc:call(Node, kai_store, get, [#data{key=Key, bucket=Bucket}]) of
- Data when is_record(Data, data) ->
- replica_counts(Key, Rest, Acc + 1);
undefined ->
- replica_counts(Key, Rest, Acc)
+ replica_counts(Key, Rest, Acc);
+ ListOfData when is_list(ListOfData) ->
+ replica_counts(Key, Rest, Acc + length(ListOfData))
end.
get_concurrent_data(Conf) ->
@@ -125,8 +125,14 @@
flags = "0",
value = <<"value">>
},
- {error, ebusy} = rpc:call(Node1, kai_coordinator, route, [?NODE1, {put, Data, {2,2,2}}]).
+ ok = rpc:call(Node1, kai_coordinator, route, [?NODE1, {put, Data, {2,2,2}}]),
+ %% Node1 has one version, Node2 has two version,
+ %% one from prep_concurrent_data, another from kai_coordinator:put
+ [_,_] = rpc:call(Node1, kai_coordinator, route, [?NODE1, {get, #data{key=Key}, {2,2,2}}]),
+ ?assertEqual(3, replica_counts(Key, [Node1, Node2])),
+ ok.
+
prep_concurrent_data(Key, Nodes) when is_list(Key) ->
Data = #data{
key = Key,
@@ -175,9 +181,15 @@
set_clock_ahead(Key, Node1),
- %% FIXME: This operation must be succeeded
- {error, ebusy} = rpc:call(Node2, kai_coordinator, route, [?NODE2, {put, Data, {2,2,2}}]).
+ ok = rpc:call(Node2, kai_coordinator, route, [?NODE2, {put, Data, {2,2,2}}]),
+ %% Node2 has one version, Node1 has two version,
+ %% one from prep_concurrent_data, another from kai_coordinator:put
+ [_,_] = rpc:call(Node1, kai_coordinator, route, [?NODE1, {get, #data{key=Key}, {2,2,2}}]),
+ ?assertEqual(3, replica_counts(Key, [Node1, Node2])),
+
+ ok.
+
set_clock_ahead(Key, Node) ->
SrcNode = rpc:call(Node, kai_config, get, [node]),
[Data] = rpc:call(Node, kai_coordinator, route, [SrcNode, {get, #data{key=Key}, {2,2,2}}]),
Modified: trunk/test/kai_membership_SUITE.erl
===================================================================
--- trunk/test/kai_membership_SUITE.erl 2009-06-23 03:29:28 UTC (rev 162)
+++ trunk/test/kai_membership_SUITE.erl 2009-07-19 09:26:41 UTC (rev 163)
@@ -20,7 +20,7 @@
sequences() ->
[{seq, [add_directly, add_indirectly,
remove_directly, remove_indirectly,
- update_by_timeout, sync_and_delete, sync_and_collect]}].
+ update_by_timeout, sync_and_move_out, sync_and_collect]}].
all() -> [{sequence, seq}].
@@ -164,7 +164,7 @@
?assertEqual(3, length(NodeList2)),
?assert(lists:member(?NODE3, NodeList2)).
-sync_and_delete(Conf) ->
+sync_and_move_out(Conf) ->
Node1 = ?config(node1, Conf),
%% 16 data are put at Node1
@@ -199,18 +199,17 @@
rpc:call(Node1, kai_membership, check_node, [?NODE3]),
rpc:call(Node3, kai_membership, check_node, [?NODE1]),
- %% Some data are moved to Node3
+ %% Some data are moved to Node3.
wait(),
- %% Node3 is down
+ %% Node3 is down, and Node 1 checks that.
ok = slave:stop(Node3),
-
rpc:call(Node1, kai_membership, check_node, [?NODE3]),
wait(),
%% All data must come back to Node1, since Node3 was down
- DataNum = count(Node1, 4, 0).
+ ?assertEqual(DataNum, count(Node1, 4, 0)).
prep(_Node, 0) ->
ok;
@@ -223,7 +222,7 @@
last_modified = now(),
checksum = erlang:md5(<<"value">>),
flags = "0",
- vector_clocks = vclock:fresh(),
+ vector_clocks = vclock:increment(Node, vclock:fresh()),
value = <<"value">>
}]),
prep(Node, I-1).
@@ -232,6 +231,7 @@
DataNum;
count(Node, Bucket, DataNum) ->
{ok, KeyList} = rpc:call(Node, kai_store, list, [Bucket-1]),
+ ct:log("~p 's Bucket #~p: ~p", [Node, Bucket-1, KeyList]),
count(Node, Bucket-1, DataNum + length(KeyList)).
%% TODO: Check whether data is synchronized
Modified: trunk/test/kai_rpc_SUITE.erl
===================================================================
--- trunk/test/kai_rpc_SUITE.erl 2009-06-23 03:29:28 UTC (rev 162)
+++ trunk/test/kai_rpc_SUITE.erl 2009-07-19 09:26:41 UTC (rev 163)
@@ -75,7 +75,7 @@
},
ok = rpc:call(Node1, kai_rpc, put, [?NODE2, ?NODE1, Data]),
- Data = rpc:call(Node1, kai_rpc, get, [?NODE2, ?NODE1, #data{key="key1", bucket=3}]),
+ [Data] = rpc:call(Node1, kai_rpc, get, [?NODE2, ?NODE1, #data{key="key1", bucket=3}]),
{ok, [Key]} = rpc:call(Node1, kai_rpc, list, [?NODE2, ?NODE1, 3]),
"key1" = Key#data.key,
Modified: trunk/test/kai_store_SUITE.erl
===================================================================
--- trunk/test/kai_store_SUITE.erl 2009-06-23 03:29:28 UTC (rev 162)
+++ trunk/test/kai_store_SUITE.erl 2009-07-19 09:26:41 UTC (rev 163)
@@ -31,22 +31,22 @@
{dets_tables, 2}]).
sequences() ->
- [{ ets, [ ets_crud, ets_conflict, ets_info, ets_perf]},
- {dets, [dets_crud, dets_conflict, dets_info, dets_perf]}].
+ [{ ets, [ ets_crud, ets_all_delete, ets_conflict, ets_info, ets_perf]},
+ {dets, [dets_crud, dets_all_delete,dets_conflict, dets_info, dets_perf]}].
all() -> [{sequence, ets}, {sequence, dets}].
init_per_testcase(TestCase, Conf) ->
case atom_to_list(TestCase) of
- ["dets"|_] -> init(Conf, ?DETS_ARGS);
- _ -> init(Conf, ?ETS_ARGS)
+ "dets" ++ _ -> init(Conf, ?DETS_ARGS);
+ _ -> init(Conf, ?ETS_ARGS)
end.
init(Conf, Args) ->
kai_config:start_link(Args),
kai_store:start_link(),
Conf.
-
+
end_per_testcase(_TestCase, _Conf) ->
kai_store:stop(),
kai_config:stop(),
@@ -67,7 +67,7 @@
},
ok = kai_store:put(Data),
- Data = kai_store:get(#data{key="key1", bucket=3}),
+ [Data] = kai_store:get(#data{key="key1", bucket=3}),
undefined = kai_store:get(#data{key="key2", bucket=1}),
{ok, [Key]} = kai_store:list(3),
@@ -84,7 +84,7 @@
},
ok = kai_store:put(Data2),
- Data2 = kai_store:get(#data{key="key1", bucket=3}),
+ [Data2] = kai_store:get(#data{key="key1", bucket=3}),
ok = kai_store:delete(#data{key="key1", bucket=3}),
@@ -95,23 +95,54 @@
dets_conflict(_Conf) -> conflict().
conflict() ->
+ InitialVc = vclock:increment(node1, vclock:fresh()),
Data = #data{
key = "key1",
bucket = 3,
last_modified = now(),
checksum = erlang:md5(<<"value1">>),
flags = "0",
+ vector_clocks = InitialVc,
+ value = <<"value1">>
+ },
+ ok = kai_store:put(Data),
+
+ ConflictingVc = vclock:increment(node2, vclock:fresh()),
+ ok = kai_store:put(Data#data{ vector_clocks = ConflictingVc }),
+ ?assertEqual(2, length(kai_store:get(Data))),
+
+ AscendingVc = vclock:increment(node2, ConflictingVc),
+ ok = kai_store:put(Data#data{ vector_clocks = AscendingVc }),
+ ?assertEqual(2, length(kai_store:get(Data))),
+
+ MergedVc = vclock:merge([InitialVc, AscendingVc]),
+ ok = kai_store:put(Data#data{ vector_clocks = MergedVc }),
+ ?assertEqual(1, length(kai_store:get(Data))),
+ ok.
+
+ets_all_delete(_Conf) -> all_delete().
+dets_all_delete(_Conf) -> all_delete().
+
+all_delete() ->
+ Data = #data{
+ key = "key1",
+ bucket = 3,
+ last_modified = now(),
+ checksum = erlang:md5(<<"value1">>),
+ flags = "0",
vector_clocks = vclock:increment(node1, vclock:fresh()),
value = <<"value1">>
},
ok = kai_store:put(Data),
- {error, _Reason} =
- kai_store:put(Data#data{
- %% Conflicting VectorClocks
- vector_clocks = vclock:increment(node2, vclock:fresh())
- }).
+ ok = kai_store:put(
+ Data#data{vector_clocks = vclock:increment(node2, vclock:fresh())}),
+ ?assertEqual(2, length(kai_store:get(Data))),
+ ok = kai_store:delete(Data),
+ undefined = kai_store:get(Data),
+ ok.
+
ets_info(_Conf) -> info().
dets_info(_Conf) -> info().
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <shi...@us...> - 2009-06-23 03:29:29
|
Revision: 162
http://kai.svn.sourceforge.net/kai/?rev=162&view=rev
Author: shino_shun
Date: 2009-06-23 03:29:28 +0000 (Tue, 23 Jun 2009)
Log Message:
-----------
patch set 4/4 (N = 4 now)
Change kai_store type from set to bag, in order to store
multiple conflicting data.
Consequently, the return type of their 'get' becomes [data].
Change: kai_sync, misc changes to follow kai_store's interface change
Change: kai_membership_SUITE, misc changes e.g. comments and logs
At this point, all test cases pass.
Modified Paths:
--------------
branches/shino_data_in_bag/src/kai_sync.erl
branches/shino_data_in_bag/test/kai_membership_SUITE.erl
Modified: branches/shino_data_in_bag/src/kai_sync.erl
===================================================================
--- branches/shino_data_in_bag/src/kai_sync.erl 2009-06-23 03:29:03 UTC (rev 161)
+++ branches/shino_data_in_bag/src/kai_sync.erl 2009-06-23 03:29:28 UTC (rev 162)
@@ -58,12 +58,14 @@
ok;
retrieve_data(Node, [Metadata|Rest], State) ->
case kai_store:get(Metadata) of
- Data when is_record(Data, data) ->
+ DataList when is_list(DataList) ->
retrieve_data(Node, Rest, State);
undefined ->
case kai_rpc:get(Node, State#state.node, Metadata) of
- Data when is_record(Data, data) ->
- kai_store:put(Data),
+ RemoteDataList when is_list(RemoteDataList) ->
+ lists:map(fun(RemoteData) ->
+ kai_store:put(RemoteData) end,
+ RemoteDataList),
retrieve_data(Node, Rest, State);
undefined ->
retrieve_data(Node, Rest, State);
Modified: branches/shino_data_in_bag/test/kai_membership_SUITE.erl
===================================================================
--- branches/shino_data_in_bag/test/kai_membership_SUITE.erl 2009-06-23 03:29:03 UTC (rev 161)
+++ branches/shino_data_in_bag/test/kai_membership_SUITE.erl 2009-06-23 03:29:28 UTC (rev 162)
@@ -20,7 +20,7 @@
sequences() ->
[{seq, [add_directly, add_indirectly,
remove_directly, remove_indirectly,
- update_by_timeout, sync_and_delete, sync_and_collect]}].
+ update_by_timeout, sync_and_move_out, sync_and_collect]}].
all() -> [{sequence, seq}].
@@ -164,7 +164,7 @@
?assertEqual(3, length(NodeList2)),
?assert(lists:member(?NODE3, NodeList2)).
-sync_and_delete(Conf) ->
+sync_and_move_out(Conf) ->
Node1 = ?config(node1, Conf),
%% 16 data are put at Node1
@@ -199,18 +199,17 @@
rpc:call(Node1, kai_membership, check_node, [?NODE3]),
rpc:call(Node3, kai_membership, check_node, [?NODE1]),
- %% Some data are moved to Node3
+ %% Some data are moved to Node3.
wait(),
- %% Node3 is down
+ %% Node3 is down, and Node 1 checks that.
ok = slave:stop(Node3),
-
rpc:call(Node1, kai_membership, check_node, [?NODE3]),
wait(),
%% All data must come back to Node1, since Node3 was down
- DataNum = count(Node1, 4, 0).
+ ?assertEqual(DataNum, count(Node1, 4, 0)).
prep(_Node, 0) ->
ok;
@@ -223,7 +222,7 @@
last_modified = now(),
checksum = erlang:md5(<<"value">>),
flags = "0",
- vector_clocks = vclock:fresh(),
+ vector_clocks = vclock:increment(Node, vclock:fresh()),
value = <<"value">>
}]),
prep(Node, I-1).
@@ -232,6 +231,7 @@
DataNum;
count(Node, Bucket, DataNum) ->
{ok, KeyList} = rpc:call(Node, kai_store, list, [Bucket-1]),
+ ct:log("~p 's Bucket #~p: ~p", [Node, Bucket-1, KeyList]),
count(Node, Bucket-1, DataNum + length(KeyList)).
%% TODO: Check whether data is synchronized
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <shi...@us...> - 2009-06-23 03:29:04
|
Revision: 161
http://kai.svn.sourceforge.net/kai/?rev=161&view=rev
Author: shino_shun
Date: 2009-06-23 03:29:03 +0000 (Tue, 23 Jun 2009)
Log Message:
-----------
patch set 3/N (N is yet unknown)
Change kai_store type from set to bag, in order to store
multiple conflicting data.
Consequently, the return type of their 'get' becomes [data].
Change: in get, kai_coordinator gathers lists of #data
Change: in put, kai_coordinator merges lists of vector clocks of local data
Change: kai_coordinator_SUITE, misc changes to follow kai_store's interface change
Modified Paths:
--------------
branches/shino_data_in_bag/src/kai_coordinator.erl
branches/shino_data_in_bag/test/kai_coordinator_SUITE.erl
Modified: branches/shino_data_in_bag/src/kai_coordinator.erl
===================================================================
--- branches/shino_data_in_bag/src/kai_coordinator.erl 2009-06-23 03:28:38 UTC (rev 160)
+++ branches/shino_data_in_bag/src/kai_coordinator.erl 2009-06-23 03:29:03 UTC (rev 161)
@@ -102,13 +102,13 @@
end.
gather_in_get(_Ref, _N, 0, Results) ->
- Results;
+ lists:flatten(Results);
gather_in_get(_Ref, 0, _R, _Results) ->
{error, enodata};
gather_in_get(Ref, N, R, Results) ->
receive
- {Ref, Data} when is_record(Data, data) ->
- gather_in_get(Ref, N-1, R-1, [Data|Results]);
+ {Ref, ListOfData} when is_list(ListOfData) ->
+ gather_in_get(Ref, N-1, R-1, [ListOfData|Results]);
{Ref, undefined} ->
gather_in_get(Ref, N-1, R-1, Results);
{Ref, _Other} ->
@@ -125,14 +125,18 @@
{ok, Bucket} = kai_hash:find_bucket(Key),
{ok, DstNodes} = kai_hash:find_nodes(Bucket),
Ref = make_ref(),
- Data1 =
+ VcList =
case kai_store:get(Data#data{bucket=Bucket}) of
- PreviousData when is_record(PreviousData, data) ->
- PreviousData;
+ PreviousDataList when is_list(PreviousDataList) ->
+ lists:map(
+ fun(PreviousData) ->
+ PreviousData#data.vector_clocks end,
+ PreviousDataList);
undefined ->
- #data{key=Key, vector_clocks=vclock:fresh()}
+ [vclock:fresh()]
end,
- {ok, Data2} = kai_version:update(Data1),
+ {ok, Data2} = kai_version:update(
+ Data#data{vector_clocks = vclock:merge(VcList)}),
Data3 = Data2#data{
bucket = Bucket,
checksum = erlang:md5(Value),
Modified: branches/shino_data_in_bag/test/kai_coordinator_SUITE.erl
===================================================================
--- branches/shino_data_in_bag/test/kai_coordinator_SUITE.erl 2009-06-23 03:28:38 UTC (rev 160)
+++ branches/shino_data_in_bag/test/kai_coordinator_SUITE.erl 2009-06-23 03:29:03 UTC (rev 161)
@@ -77,7 +77,7 @@
value = <<"value">>
},
ok = rpc:call(Node, kai_coordinator, route, [?NODE1, {put, Data, Quorum}]),
- [_] = rpc:call(Node, kai_coordinator, route, [?NODE1, {get, #data{key=Key}, Quorum}]),
+ [_Data] = rpc:call(Node, kai_coordinator, route, [?NODE1, {get, #data{key=Key}, Quorum}]),
N = replica_counts(Key, Nodes),
ok = rpc:call(Node, kai_coordinator, route, [?NODE1, {delete, #data{key=Key}, Quorum}]),
@@ -95,10 +95,10 @@
replica_counts(Key, [Node|Rest], Acc) ->
{ok, Bucket} = rpc:call(Node, kai_hash, find_bucket, [Key]),
case rpc:call(Node, kai_store, get, [#data{key=Key, bucket=Bucket}]) of
- Data when is_record(Data, data) ->
- replica_counts(Key, Rest, Acc + 1);
undefined ->
- replica_counts(Key, Rest, Acc)
+ replica_counts(Key, Rest, Acc);
+ ListOfData when is_list(ListOfData) ->
+ replica_counts(Key, Rest, Acc + length(ListOfData))
end.
get_concurrent_data(Conf) ->
@@ -125,8 +125,14 @@
flags = "0",
value = <<"value">>
},
- {error, ebusy} = rpc:call(Node1, kai_coordinator, route, [?NODE1, {put, Data, {2,2,2}}]).
+ ok = rpc:call(Node1, kai_coordinator, route, [?NODE1, {put, Data, {2,2,2}}]),
+ %% Node1 has one version, Node2 has two version,
+ %% one from prep_concurrent_data, another from kai_coordinator:put
+ [_,_] = rpc:call(Node1, kai_coordinator, route, [?NODE1, {get, #data{key=Key}, {2,2,2}}]),
+ ?assertEqual(3, replica_counts(Key, [Node1, Node2])),
+ ok.
+
prep_concurrent_data(Key, Nodes) when is_list(Key) ->
Data = #data{
key = Key,
@@ -175,9 +181,15 @@
set_clock_ahead(Key, Node1),
- %% FIXME: This operation must be succeeded
- {error, ebusy} = rpc:call(Node2, kai_coordinator, route, [?NODE2, {put, Data, {2,2,2}}]).
+ ok = rpc:call(Node2, kai_coordinator, route, [?NODE2, {put, Data, {2,2,2}}]),
+ %% Node2 has one version, Node1 has two version,
+ %% one from prep_concurrent_data, another from kai_coordinator:put
+ [_,_] = rpc:call(Node1, kai_coordinator, route, [?NODE1, {get, #data{key=Key}, {2,2,2}}]),
+ ?assertEqual(3, replica_counts(Key, [Node1, Node2])),
+
+ ok.
+
set_clock_ahead(Key, Node) ->
SrcNode = rpc:call(Node, kai_config, get, [node]),
[Data] = rpc:call(Node, kai_coordinator, route, [SrcNode, {get, #data{key=Key}, {2,2,2}}]),
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <shi...@us...> - 2009-06-23 03:28:40
|
Revision: 160
http://kai.svn.sourceforge.net/kai/?rev=160&view=rev
Author: shino_shun
Date: 2009-06-23 03:28:38 +0000 (Tue, 23 Jun 2009)
Log Message:
-----------
Add: execute just a single test case
Modified Paths:
--------------
branches/shino_data_in_bag/Makefile
Modified: branches/shino_data_in_bag/Makefile
===================================================================
--- branches/shino_data_in_bag/Makefile 2009-06-23 03:28:20 UTC (rev 159)
+++ branches/shino_data_in_bag/Makefile 2009-06-23 03:28:38 UTC (rev 160)
@@ -42,6 +42,12 @@
-logdir test/log -cover test/kai.coverspec \
-I$(ROOT)/include -pa $(ROOT)/ebin
+test_single_case: test_compile
+ mkdir -p test/log
+ ${RUN_TEST_CMD} -suite $(SUITE) -case ${CASE}\
+ -logdir test/log -cover test/kai.coverspec \
+ -I$(ROOT)/include -pa $(ROOT)/ebin
+
docs:
erl -noshell -run edoc_run application "'kai'" \
'"."' '[{def,{vsn, "$(KAI_VSN)"}}]'
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <shi...@us...> - 2009-06-23 03:28:22
|
Revision: 159
http://kai.svn.sourceforge.net/kai/?rev=159&view=rev
Author: shino_shun
Date: 2009-06-23 03:28:20 +0000 (Tue, 23 Jun 2009)
Log Message:
-----------
patch set 2/N (N is yet unknown)
Change kai_store type from set to bag, in order to store
multiple conflicting data.
Consequently, the return type of their 'get' becomes [data].
Change: kai_rpc_SUITE to follow kai_store:get's interface change
Modified Paths:
--------------
branches/shino_data_in_bag/test/kai_rpc_SUITE.erl
Modified: branches/shino_data_in_bag/test/kai_rpc_SUITE.erl
===================================================================
--- branches/shino_data_in_bag/test/kai_rpc_SUITE.erl 2009-06-23 03:28:00 UTC (rev 158)
+++ branches/shino_data_in_bag/test/kai_rpc_SUITE.erl 2009-06-23 03:28:20 UTC (rev 159)
@@ -75,7 +75,7 @@
},
ok = rpc:call(Node1, kai_rpc, put, [?NODE2, ?NODE1, Data]),
- Data = rpc:call(Node1, kai_rpc, get, [?NODE2, ?NODE1, #data{key="key1", bucket=3}]),
+ [Data] = rpc:call(Node1, kai_rpc, get, [?NODE2, ?NODE1, #data{key="key1", bucket=3}]),
{ok, [Key]} = rpc:call(Node1, kai_rpc, list, [?NODE2, ?NODE1, 3]),
"key1" = Key#data.key,
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <shi...@us...> - 2009-06-23 03:28:00
|
Revision: 158
http://kai.svn.sourceforge.net/kai/?rev=158&view=rev
Author: shino_shun
Date: 2009-06-23 03:28:00 +0000 (Tue, 23 Jun 2009)
Log Message:
-----------
Fix: warning happened in stopping a not-started gen_server process
Modified Paths:
--------------
branches/shino_data_in_bag/test/kai_config_SUITE.erl
Modified: branches/shino_data_in_bag/test/kai_config_SUITE.erl
===================================================================
--- branches/shino_data_in_bag/test/kai_config_SUITE.erl 2009-06-23 03:27:39 UTC (rev 157)
+++ branches/shino_data_in_bag/test/kai_config_SUITE.erl 2009-06-23 03:28:00 UTC (rev 158)
@@ -48,6 +48,8 @@
kai_config:start_link(Args),
Conf.
+end_per_testcase(quorum_error, _Conf) ->
+ ok;
end_per_testcase(_TestCase, _Conf) ->
kai_config:stop().
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <shi...@us...> - 2009-06-23 03:27:41
|
Revision: 157
http://kai.svn.sourceforge.net/kai/?rev=157&view=rev
Author: shino_shun
Date: 2009-06-23 03:27:39 +0000 (Tue, 23 Jun 2009)
Log Message:
-----------
patch set 1/N (N is yet unknown)
Change kai_store type from set to bag, in order to store
multiple conflicting data.
Consequently, the return type of their 'get' becomes [data].
Change: make kai_store_(ets|dets) bags
Fix: kai_store_SUITE always ran with ets
Add: kai_store_SUITE test cases to get conflicting data and delete all of them
Modified Paths:
--------------
branches/shino_data_in_bag/src/kai_store_dets.erl
branches/shino_data_in_bag/src/kai_store_ets.erl
branches/shino_data_in_bag/test/kai_store_SUITE.erl
Modified: branches/shino_data_in_bag/src/kai_store_dets.erl
===================================================================
--- branches/shino_data_in_bag/src/kai_store_dets.erl 2009-06-22 02:23:28 UTC (rev 156)
+++ branches/shino_data_in_bag/src/kai_store_dets.erl 2009-06-23 03:27:39 UTC (rev 157)
@@ -33,7 +33,7 @@
fun(I) ->
Name = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ integer_to_list(I)),
File = Dir ++ "/" ++ integer_to_list(I),
- case dets:open_file(Name, [{type, set}, {keypos, 2}, {file, File}]) of
+ case dets:open_file(Name, [{type, bag}, {keypos, 2}, {file, File}]) of
{ok, Table} -> {I, Table};
{error, Reason} -> ?info(Reason),
exit(Reason)
@@ -79,35 +79,38 @@
do_get(#data{key=Key, bucket=Bucket} = _Data, State) ->
Table = bucket_to_table(Bucket, State),
case dets:lookup(Table, Key) of
- [Data] -> {reply, Data, State};
- _ -> {reply, undefined, State}
+ [] -> {reply, undefined, State};
+ StoredDataList -> {reply, StoredDataList, State}
end.
do_put(Data, State) when is_record(Data, data) ->
Table = bucket_to_table(Data#data.bucket, State),
- case dets:lookup(Table, Data#data.key) of
- [StoredData] ->
- case vclock:descends(Data#data.vector_clocks, StoredData#data.vector_clocks) of
- true -> insert_and_reply(Data, Table, State);
- _ -> {reply, {error, "stale or concurrent state found in kai_store"}, State}
- end;
- _ -> insert_and_reply(Data, Table, State)
- end.
+ insert_and_remove(Table, Data, dets:lookup(Table, Data#data.key)),
+ {reply, ok, State}.
-insert_and_reply(Data, Table, State) ->
+insert_and_remove(Table, Data, StoredDataList) ->
dets:insert(Table, Data),
- dets:sync(Table),
- {reply, ok, State}.
+ remove_descend_data(Table, Data#data.vector_clocks, StoredDataList),
+ dets:sync(Table).
+remove_descend_data(_Table, _Vc, []) ->
+ ok;
+remove_descend_data(Table, Vc, [StoredData|Rest]) ->
+ case vclock:descends(Vc, StoredData#data.vector_clocks) of
+ true -> dets:delete_object(Table, StoredData);
+ _ -> nop
+ end,
+ remove_descend_data(Table, Vc, Rest).
+
do_delete(#data{key=Key, bucket=Bucket} = _Data, State) ->
Table = bucket_to_table(Bucket, State),
case dets:lookup(Table, Key) of
- [_Data2] ->
+ [] ->
+ {reply, undefined, State};
+ _StoredDataList ->
dets:delete(Table, Key),
dets:sync(Table),
- {reply, ok, State};
- _ ->
- {reply, undefined, State}
+ {reply, ok, State}
end.
info(Name, State) ->
Modified: branches/shino_data_in_bag/src/kai_store_ets.erl
===================================================================
--- branches/shino_data_in_bag/src/kai_store_ets.erl 2009-06-22 02:23:28 UTC (rev 156)
+++ branches/shino_data_in_bag/src/kai_store_ets.erl 2009-06-23 03:27:39 UTC (rev 157)
@@ -25,7 +25,7 @@
gen_server:start_link({local, Server}, ?MODULE, [], _Opts = []).
init(_Args) ->
- ets:new(?MODULE, [set, private, named_table, {keypos, 2}]),
+ ets:new(?MODULE, [bag, private, named_table, {keypos, 2}]),
{ok, []}.
terminate(_Reason, _State) ->
@@ -37,7 +37,7 @@
key = '$1',
bucket = Bucket,
last_modified = '$2',
- vector_clocks = '$3',
+ vector_clocks = '$3',
checksum = '$4',
flags = '_',
value = '_'
@@ -55,33 +55,37 @@
do_get(#data{key=Key} = _Data, State) ->
case ets:lookup(?MODULE, Key) of
- [Data] -> {reply, Data, State};
- _ -> {reply, undefined, State}
+ [] -> {reply, undefined, State};
+ StoredDataList -> {reply, StoredDataList, State}
end.
do_put(Data, State) when is_record(Data, data) ->
- case ets:lookup(?MODULE, Data#data.key) of
- [StoredData] ->
- case vclock:descends(Data#data.vector_clocks, StoredData#data.vector_clocks) of
- true -> insert_and_reply(Data, State);
- _ -> {reply, {error, "stale or concurrent state found in kai_store"}, State}
- end;
- _ -> insert_and_reply(Data, State)
- end.
+ insert_and_remove(Data, ets:lookup(?MODULE, Data#data.key)),
+ {reply, ok, State}.
-insert_and_reply(Data, State) ->
+insert_and_remove(Data, StoredDataList) ->
ets:insert(?MODULE, Data),
- {reply, ok, State}.
+ remove_descend_data(Data#data.vector_clocks, StoredDataList).
+remove_descend_data(_Vc, []) ->
+ ok;
+remove_descend_data(Vc, [StoredData|Rest]) ->
+ case vclock:descends(Vc, StoredData#data.vector_clocks) of
+ true -> ets:delete_object(?MODULE, StoredData);
+ _ -> nop
+ end,
+ remove_descend_data(Vc, Rest).
+
do_delete(#data{key=Key} = _Data, State) ->
case ets:lookup(?MODULE, Key) of
- [_Data2] ->
+ [] ->
+ {reply, undefined, State};
+ _StoredDataList ->
ets:delete(?MODULE, Key),
- {reply, ok, State};
- _ ->
- {reply, undefined, State}
+ {reply, ok, State}
end.
+
info(Name, State) ->
Value =
case Name of
Modified: branches/shino_data_in_bag/test/kai_store_SUITE.erl
===================================================================
--- branches/shino_data_in_bag/test/kai_store_SUITE.erl 2009-06-22 02:23:28 UTC (rev 156)
+++ branches/shino_data_in_bag/test/kai_store_SUITE.erl 2009-06-23 03:27:39 UTC (rev 157)
@@ -31,22 +31,22 @@
{dets_tables, 2}]).
sequences() ->
- [{ ets, [ ets_crud, ets_conflict, ets_info, ets_perf]},
- {dets, [dets_crud, dets_conflict, dets_info, dets_perf]}].
+ [{ ets, [ ets_crud, ets_all_delete, ets_conflict, ets_info, ets_perf]},
+ {dets, [dets_crud, dets_all_delete,dets_conflict, dets_info, dets_perf]}].
all() -> [{sequence, ets}, {sequence, dets}].
init_per_testcase(TestCase, Conf) ->
case atom_to_list(TestCase) of
- ["dets"|_] -> init(Conf, ?DETS_ARGS);
- _ -> init(Conf, ?ETS_ARGS)
+ "dets" ++ _ -> init(Conf, ?DETS_ARGS);
+ _ -> init(Conf, ?ETS_ARGS)
end.
init(Conf, Args) ->
kai_config:start_link(Args),
kai_store:start_link(),
Conf.
-
+
end_per_testcase(_TestCase, _Conf) ->
kai_store:stop(),
kai_config:stop(),
@@ -67,7 +67,7 @@
},
ok = kai_store:put(Data),
- Data = kai_store:get(#data{key="key1", bucket=3}),
+ [Data] = kai_store:get(#data{key="key1", bucket=3}),
undefined = kai_store:get(#data{key="key2", bucket=1}),
{ok, [Key]} = kai_store:list(3),
@@ -84,7 +84,7 @@
},
ok = kai_store:put(Data2),
- Data2 = kai_store:get(#data{key="key1", bucket=3}),
+ [Data2] = kai_store:get(#data{key="key1", bucket=3}),
ok = kai_store:delete(#data{key="key1", bucket=3}),
@@ -95,23 +95,54 @@
dets_conflict(_Conf) -> conflict().
conflict() ->
+ InitialVc = vclock:increment(node1, vclock:fresh()),
Data = #data{
key = "key1",
bucket = 3,
last_modified = now(),
checksum = erlang:md5(<<"value1">>),
flags = "0",
+ vector_clocks = InitialVc,
+ value = <<"value1">>
+ },
+ ok = kai_store:put(Data),
+
+ ConflictingVc = vclock:increment(node2, vclock:fresh()),
+ ok = kai_store:put(Data#data{ vector_clocks = ConflictingVc }),
+ ?assertEqual(2, length(kai_store:get(Data))),
+
+ AscendingVc = vclock:increment(node2, ConflictingVc),
+ ok = kai_store:put(Data#data{ vector_clocks = AscendingVc }),
+ ?assertEqual(2, length(kai_store:get(Data))),
+
+ MergedVc = vclock:merge([InitialVc, AscendingVc]),
+ ok = kai_store:put(Data#data{ vector_clocks = MergedVc }),
+ ?assertEqual(1, length(kai_store:get(Data))),
+ ok.
+
+ets_all_delete(_Conf) -> all_delete().
+dets_all_delete(_Conf) -> all_delete().
+
+all_delete() ->
+ Data = #data{
+ key = "key1",
+ bucket = 3,
+ last_modified = now(),
+ checksum = erlang:md5(<<"value1">>),
+ flags = "0",
vector_clocks = vclock:increment(node1, vclock:fresh()),
value = <<"value1">>
},
ok = kai_store:put(Data),
- {error, _Reason} =
- kai_store:put(Data#data{
- %% Conflicting VectorClocks
- vector_clocks = vclock:increment(node2, vclock:fresh())
- }).
+ ok = kai_store:put(
+ Data#data{vector_clocks = vclock:increment(node2, vclock:fresh())}),
+ ?assertEqual(2, length(kai_store:get(Data))),
+ ok = kai_store:delete(Data),
+ undefined = kai_store:get(Data),
+ ok.
+
ets_info(_Conf) -> info().
dets_info(_Conf) -> info().
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <shi...@us...> - 2009-06-22 03:22:34
|
Revision: 155
http://kai.svn.sourceforge.net/kai/?rev=155&view=rev
Author: shino_shun
Date: 2009-06-22 02:02:46 +0000 (Mon, 22 Jun 2009)
Log Message:
-----------
Create branch shino_data_in_bag
Added Paths:
-----------
branches/shino_data_in_bag/
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <shi...@us...> - 2009-06-22 02:23:35
|
Revision: 156
http://kai.svn.sourceforge.net/kai/?rev=156&view=rev
Author: shino_shun
Date: 2009-06-22 02:23:28 +0000 (Mon, 22 Jun 2009)
Log Message:
-----------
add ebin/kai.app to git ignore list
Modified Paths:
--------------
branches/shino_data_in_bag/ebin/.gitignore
Modified: branches/shino_data_in_bag/ebin/.gitignore
===================================================================
--- branches/shino_data_in_bag/ebin/.gitignore 2009-06-22 02:02:46 UTC (rev 155)
+++ branches/shino_data_in_bag/ebin/.gitignore 2009-06-22 02:23:28 UTC (rev 156)
@@ -1 +1,2 @@
*.beam
+kai.app
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <tak...@us...> - 2009-06-08 14:23:37
|
Revision: 154
http://kai.svn.sourceforge.net/kai/?rev=154&view=rev
Author: takemaru
Date: 2009-06-08 14:23:35 +0000 (Mon, 08 Jun 2009)
Log Message:
-----------
* branches/takemaru_bulk_transport
- Made a new branch for fast data synchronization.
* branches/takemaru_*
- Deleted old branches.
Added Paths:
-----------
branches/takemaru_bulk_transport/
Removed Paths:
-------------
branches/takemaru_connection_pooling/
branches/takemaru_refactoring/
branches/takemaru_stats/
branches/takemaru_store_dets/
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <tak...@us...> - 2009-06-03 14:05:16
|
Revision: 152
http://kai.svn.sourceforge.net/kai/?rev=152&view=rev
Author: takemaru
Date: 2009-06-03 14:05:15 +0000 (Wed, 03 Jun 2009)
Log Message:
-----------
* src/kai_store_ets.erl
- Removed a meaningless function.
Modified Paths:
--------------
branches/takemaru_refactoring/src/kai_store_ets.erl
Modified: branches/takemaru_refactoring/src/kai_store_ets.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_store_ets.erl 2009-06-03 13:27:39 UTC (rev 151)
+++ branches/takemaru_refactoring/src/kai_store_ets.erl 2009-06-03 14:05:15 UTC (rev 152)
@@ -67,8 +67,7 @@
_ -> {reply, {error, "stale or concurrent state found in kai_store"}, State}
end;
_ -> insert_and_reply(Data, State)
- end;
-do_put(a, b) -> ok.
+ end.
insert_and_reply(Data, State) ->
ets:insert(?MODULE, Data),
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <tak...@us...> - 2009-06-02 15:06:35
|
Revision: 150
http://kai.svn.sourceforge.net/kai/?rev=150&view=rev
Author: takemaru
Date: 2009-06-02 15:06:32 +0000 (Tue, 02 Jun 2009)
Log Message:
-----------
* test/kai_connection_SUITE.erl
- Added tests for the LRU algorithm.
* test/kai_tcp_server_SUITE.erl
- Decreased the number of connections used in multiple_connections test,
because it would be short of TCP ports in some environments.
* src/kai_store_ets.erl, src/kai_store_dets.erl
- Replaced successful tags like {T, V} by {ok, V}
* src/kai_sync.erl
- Uses record for server status.
Modified Paths:
--------------
branches/takemaru_refactoring/src/kai_coordinator.erl
branches/takemaru_refactoring/src/kai_membership.erl
branches/takemaru_refactoring/src/kai_rpc.erl
branches/takemaru_refactoring/src/kai_store_dets.erl
branches/takemaru_refactoring/src/kai_store_ets.erl
branches/takemaru_refactoring/src/kai_sync.erl
branches/takemaru_refactoring/src/kai_tcp_server.erl
branches/takemaru_refactoring/src/kai_tcp_server_acceptor.erl
branches/takemaru_refactoring/src/kai_tcp_server_monitor.erl
branches/takemaru_refactoring/src/kai_tcp_server_sup.erl
branches/takemaru_refactoring/src/kai_version.erl
branches/takemaru_refactoring/test/kai_connection_SUITE.erl
branches/takemaru_refactoring/test/kai_membership_SUITE.erl
branches/takemaru_refactoring/test/kai_rpc_SUITE.erl
branches/takemaru_refactoring/test/kai_store_SUITE.erl
branches/takemaru_refactoring/test/kai_sync_SUITE.erl
branches/takemaru_refactoring/test/kai_tcp_server_SUITE.erl
branches/takemaru_refactoring/test/kai_version_SUITE.erl
Modified: branches/takemaru_refactoring/src/kai_coordinator.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_coordinator.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/src/kai_coordinator.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -1,14 +1,14 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
-module(kai_coordinator).
@@ -19,6 +19,28 @@
-define(SERVER, ?MODULE).
+route({_Type, Data} = Request) ->
+ Ref = make_ref(),
+ %% Though don't know the reason, application exits abnormally if it doesn't
+ %% spawn the process
+ spawn(?MODULE, start_route, [Request, self(), Ref]),
+ receive
+ {Ref, Result} -> Result
+ after ?TIMEOUT ->
+ ?warning(io_lib:format("route(~p): timeout", [Data#data.key])),
+ []
+ end.
+
+start_route({_Type, Data} = Request, Pid, Ref) ->
+ LocalNode = kai_config:get(node),
+ {ok, Nodes} = kai_hash:find_nodes(Data#data.key),
+ Results =
+ case lists:member(LocalNode, Nodes) of
+ true -> dispatch(Request);
+ _ -> do_route(Request, Nodes)
+ end,
+ Pid ! {Ref, Results}.
+
dispatch({Type, Data} = _Request) ->
case Type of
get -> coordinate_get(Data);
@@ -30,7 +52,7 @@
do_route(_Request, []) ->
{error, ebusy};
do_route({_Type, Data} = Request, [Node|RestNodes]) ->
- % TODO: introduce TTL, in order to avoid infinite loop
+ %% TODO: introduce TTL, in order to avoid infinite loop
case kai_rpc:route(Node, Request) of
{error, Reason} ->
?warning(io_lib:format("do_route(~p, ~p): ~p",
@@ -40,35 +62,13 @@
Results
end.
-start_route({_Type, Data} = Request, Pid, Ref) ->
- LocalNode = kai_config:get(node),
- {ok, Nodes} = kai_hash:find_nodes(Data#data.key),
- Results =
- case lists:member(LocalNode, Nodes) of
- true -> dispatch(Request);
- _ -> do_route(Request, Nodes)
- end,
- Pid ! {Ref, Results}.
-
-route({_Type, Data} = Request) ->
- Ref = make_ref(),
- % Though don't know the reason, application exits abnormally if it doesn't
- % spawn the process
- spawn(?MODULE, start_route, [Request, self(), Ref]),
- receive
- {Ref, Result} -> Result
- after ?TIMEOUT ->
- ?warning(io_lib:format("route(~p): timeout", [Data#data.key])),
- []
- end.
-
coordinate_get(Data) ->
{ok, Bucket} = kai_hash:find_bucket(Data#data.key),
{ok, Nodes } = kai_hash:find_nodes(Bucket),
Data2 = Data#data{bucket=Bucket},
Ref = make_ref(),
lists:foreach(
- fun(Node) -> spawn(?MODULE, map_in_get, [Node, Data2, Ref, self()]) end, % Don't link
+ fun(Node) -> spawn(?MODULE, map_in_get, [Node, Data2, Ref, self()]) end, %% Don't link
Nodes
),
{N,R,_W} = kai_config:get(quorum),
Modified: branches/takemaru_refactoring/src/kai_membership.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_membership.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/src/kai_membership.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -1,14 +1,14 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
-module(kai_membership).
-behaviour(gen_fsm).
@@ -22,13 +22,15 @@
-include("kai.hrl").
+-record(state, {node}).
+
-define(SERVER, ?MODULE).
start_link() ->
gen_fsm:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []).
init(_Args) ->
- {ok, ready, [], ?TIMER}.
+ {ok, ready, #state{node = kai_config:get(node)}, ?TIMER}.
terminate(_Reason, _StateName, _StateData) ->
ok.
@@ -44,45 +46,40 @@
ping_nodes(Nodes, AvailableNodes, [Node|DownNodes])
end.
-retrieve_node_list(Node) ->
+retrieve_node_list(Node, State) ->
case kai_rpc:node_list(Node) of
{ok, RemoteNodeList} ->
{ok, LocalNodeList} = kai_hash:node_list(),
NewNodes = RemoteNodeList -- LocalNodeList,
OldNodes = LocalNodeList -- RemoteNodeList,
Nodes = NewNodes ++ OldNodes,
- LocalNode = kai_config:get(node),
- ping_nodes(Nodes -- [LocalNode], [], []);
+ ping_nodes(Nodes -- [State#state.node], [], []);
{error, Reason} ->
?warning(io_lib:format("retrieve_node_list/1: ~p", [{error, Reason}])),
{[], [Node]}
end.
-sync_buckets([], _LocalNode) ->
+sync_buckets([]) ->
ok;
-sync_buckets([{Bucket, NewReplica, OldReplica}|ReplacedBuckets], LocalNode) ->
+sync_buckets([{Bucket, NewReplica, OldReplica}|ReplacedBuckets]) ->
case {NewReplica, OldReplica} of
{NewReplica, undefined } -> kai_sync:update_bucket(Bucket);
{undefined, OldReplica} -> kai_sync:delete_bucket(Bucket);
_ -> nop
end,
- sync_buckets(ReplacedBuckets, LocalNode).
+ sync_buckets(ReplacedBuckets).
-sync_buckets(ReplacedBuckets) ->
- LocalNode = kai_config:get(node),
- sync_buckets(ReplacedBuckets, LocalNode).
-
-do_check_node({Address, Port}) ->
- {AvailableNodes, DownNodes} = retrieve_node_list({Address, Port}),
+do_check_node({IpAddr, Port}, State) ->
+ {AvailableNodes, DownNodes} = retrieve_node_list({IpAddr, Port}, State),
{ok, ReplacedBuckets} = kai_hash:update_nodes(AvailableNodes, DownNodes),
sync_buckets(ReplacedBuckets).
ready({check_node, Node}, State) ->
- do_check_node(Node),
+ do_check_node(Node, State),
{next_state, ready, State, ?TIMER};
ready(timeout, State) ->
case kai_hash:choose_node_randomly() of
- {node, Node} -> do_check_node(Node);
+ {node, Node} -> do_check_node(Node, State);
_ -> nop
end,
{next_state, ready, State, ?TIMER}.
Modified: branches/takemaru_refactoring/src/kai_rpc.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_rpc.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/src/kai_rpc.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -1,14 +1,14 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
-module(kai_rpc).
-behaviour(kai_tcp_server).
@@ -24,6 +24,8 @@
-include("kai.hrl").
+-record(state, {node_info}).
+
start_link() ->
kai_tcp_server:start_link(
{local, ?MODULE},
@@ -38,7 +40,8 @@
stop() -> kai_tcp_server:stop(?MODULE).
-init(_Args) -> {ok, {}}.
+init(_Args) ->
+ {ok, #state{node_info = kai_config:node_info()}}.
handle_call(Socket, Data, State) ->
dispatch(Socket, binary_to_term(Data), State).
@@ -47,7 +50,7 @@
reply(ok, State);
dispatch(_Socket, node_info, State) ->
- reply(kai_config:node_info(), State);
+ reply(State#state.node_info, State);
dispatch(_Socket, node_list, State) ->
reply(kai_hash:node_list(), State);
@@ -85,9 +88,9 @@
{error, Reason} ->
{error, Reason}
- % Don't place Other alternative here. This is to avoid to catch event
- % messages, '$gen_event' or something like that. Remember that this
- % function can be called from gen_fsm/gen_event.
+ %% Don't place Other alternative here. This is to avoid to catch event
+ %% messages, '$gen_event' or something like that. Remember that this
+ %% function can be called from gen_fsm/gen_event.
after ?TIMEOUT ->
{error, timeout}
@@ -126,7 +129,7 @@
end.
is_local_node(Node) ->
- LocalNode = kai_config:get(node),
+ LocalNode = kai_config:get(node), %% TODO: Don't call kai_config:get/1 every time.
Node =:= LocalNode.
ok(Node) ->
Modified: branches/takemaru_refactoring/src/kai_store_dets.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_store_dets.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/src/kai_store_dets.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -73,8 +73,8 @@
vector_clocks = '$3',
checksum = '$4'
}}],
- ListOfData = dets:select(Table, [{Head, Cond, Body}]),
- {reply, {list_of_data, ListOfData}, State}.
+ KeyList = dets:select(Table, [{Head, Cond, Body}]),
+ {reply, {ok, KeyList}, State}.
do_get(#data{key=Key, bucket=Bucket} = _Data, State) ->
Table = bucket_to_table(Bucket, State),
Modified: branches/takemaru_refactoring/src/kai_store_ets.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_store_ets.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/src/kai_store_ets.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -50,8 +50,8 @@
vector_clocks = '$3',
checksum = '$4'
}}],
- ListOfData = ets:select(?MODULE, [{Head, Cond, Body}]),
- {reply, {list_of_data, ListOfData}, State}.
+ KeyList = ets:select(?MODULE, [{Head, Cond, Body}]),
+ {reply, {ok, KeyList}, State}.
do_get(#data{key=Key} = _Data, State) ->
case ets:lookup(?MODULE, Key) of
Modified: branches/takemaru_refactoring/src/kai_sync.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_sync.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/src/kai_sync.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -1,14 +1,14 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
-module(kai_sync).
-behaviour(gen_fsm).
@@ -22,13 +22,15 @@
-include("kai.hrl").
+-record(state, {node}).
+
-define(SERVER, ?MODULE).
start_link() ->
gen_fsm:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []).
init(_Args) ->
- {ok, ready, [], ?TIMER}.
+ {ok, ready, #state{node = kai_config:get(node)}, ?TIMER}.
terminate(_Reason, _StateName, _StateData) ->
ok.
@@ -56,17 +58,15 @@
{error, enodata};
do_update_bucket(Bucket, [Node|Rest]) ->
case kai_rpc:list(Node, Bucket) of
- {list_of_data, ListOfData} ->
- retrieve_data(Node, ListOfData);
+ {ok, KeyList} ->
+ retrieve_data(Node, KeyList);
{error, Reason} ->
?warning(io_lib:format("do_update_bucket/2: ~p", [{error, Reason}])),
do_update_bucket(Bucket, Rest)
- end.
-
-do_update_bucket(Bucket) ->
+ end;
+do_update_bucket(Bucket, State) when is_record(State, state) ->
{ok, Nodes} = kai_hash:find_nodes(Bucket),
- LocalNode = kai_config:get(node),
- do_update_bucket(Bucket, Nodes -- [LocalNode]).
+ do_update_bucket(Bucket, Nodes -- [State#state.node]).
do_delete_bucket([]) ->
ok;
@@ -74,19 +74,19 @@
kai_store:delete(Metadata),
do_delete_bucket(Rest);
do_delete_bucket(Bucket) ->
- {list_of_data, ListOfData} = kai_store:list(Bucket),
- do_delete_bucket(ListOfData).
+ {ok, KeyList} = kai_store:list(Bucket),
+ do_delete_bucket(KeyList).
ready({update_bucket, Bucket}, State) ->
- do_update_bucket(Bucket),
+ do_update_bucket(Bucket, State),
{next_state, ready, State, ?TIMER};
ready({delete_bucket, Bucket}, State) ->
do_delete_bucket(Bucket),
{next_state, ready, State, ?TIMER};
ready(timeout, State) ->
case kai_hash:choose_bucket_randomly() of
- {ok, Bucket} -> do_update_bucket(Bucket);
- _ -> nop
+ {ok, Bucket} -> do_update_bucket(Bucket, State);
+ _ -> nop
end,
{next_state, ready, State, ?TIMER}.
Modified: branches/takemaru_refactoring/src/kai_tcp_server.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_tcp_server.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/src/kai_tcp_server.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -1,14 +1,14 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
-module(kai_tcp_server).
@@ -19,11 +19,11 @@
-include("kai.hrl").
-% Behaviour Callbacks
+%% Behaviour Callbacks
behaviour_info(callbacks) -> [{init, 1}, {handle_call, 3}];
behaviour_info(_Other) -> undefined.
-% External APIs
+%% External APIs
start_link(Mod) -> start_link(Mod, []).
start_link(Mod, Args) -> start_link(Mod, Args, #tcp_server_option{}).
start_link(Mod, Args, Option) ->
@@ -40,4 +40,3 @@
kai_tcp_server_monitor:info(
kai_tcp_server_sup:build_monitor_name(Name), Key
).
-
Modified: branches/takemaru_refactoring/src/kai_tcp_server_acceptor.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_tcp_server_acceptor.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/src/kai_tcp_server_acceptor.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -1,14 +1,14 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
-module(kai_tcp_server_acceptor).
@@ -17,7 +17,7 @@
-include("kai.hrl").
-% External APIs
+%% External APIs
start_link({Dest, Name}, ListenSocket, State, MonitorName, Mod, Option) ->
{ok, Pid} = proc_lib:start_link(
?MODULE, init,
@@ -29,13 +29,13 @@
end,
{ok, Pid}.
-% Callbacks
+%% Callbacks
init(Parent, ListenSocket, State, MonitorName, Mod, Option) ->
proc_lib:init_ack(Parent, {ok, self()}),
kai_tcp_server_monitor:register(MonitorName, self()),
accept(ListenSocket, State, MonitorName, Mod, Option).
-% Internal Functions
+%% Internal Functions
accept(ListenSocket, State, MonitorName, Mod, Option) ->
case gen_tcp:accept(
ListenSocket, Option#tcp_server_option.accept_timeout
Modified: branches/takemaru_refactoring/src/kai_tcp_server_monitor.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_tcp_server_monitor.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/src/kai_tcp_server_monitor.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -1,14 +1,14 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
-module(kai_tcp_server_monitor).
-behaviour(gen_server).
@@ -27,7 +27,7 @@
-include("kai.hrl").
-% External APIs
+%% External APIs
start_link(Name) ->
gen_server:start_link(Name, ?MODULE, [], []).
@@ -46,7 +46,7 @@
info(ServerRef, Key) ->
gen_server:call(ServerRef, {info, Key}).
-% Callbacks
+%% Callbacks
init(_Args) ->
{ok, {_MonitorRefs = [], _Pids = []}}.
@@ -90,7 +90,7 @@
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-% Internal Functions
+%% Internal Functions
state_to_info({_MonitorRefs, Pids}, curr_connections) ->
length(Pids);
Modified: branches/takemaru_refactoring/src/kai_tcp_server_sup.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_tcp_server_sup.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/src/kai_tcp_server_sup.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -1,14 +1,14 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
-module(kai_tcp_server_sup).
-behaviour(supervisor).
@@ -19,7 +19,7 @@
-include("kai.hrl").
-% External APIs
+%% External APIs
start_link(Name, Mod, Args, Option) ->
supervisor:start_link(Name, ?MODULE, [Name, Mod, Args, Option]).
@@ -31,15 +31,15 @@
_ -> not_started
end.
-% Callbacks
+%% Callbacks
init([Name, Mod, Args, Option]) ->
case Mod:init(Args) of
{ok, State} -> listen(State, Name, Mod, Option);
{stop, Reason} -> Reason;
- Other -> Other % 'ignore' is contained.
+ Other -> Other %% Includes 'ignore'.
end.
-% Internal Functions
+%% Internal Functions
listen(State, Name, Mod, Option) ->
case gen_tcp:listen(
Option#tcp_server_option.port,
Modified: branches/takemaru_refactoring/src/kai_version.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_version.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/src/kai_version.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -59,25 +59,25 @@
end
end.
-order(ListOfData, State) when is_list(ListOfData) ->
- OrderedData = do_order(ListOfData, []),
+order(DataList, State) when is_list(DataList) ->
+ OrderedData = do_order(DataList, []),
{reply, OrderedData, State};
order(_Other, State) ->
{reply, undefined, State}.
%% TODO: raise error if length > 15(=2#1111)
-cas_unique(ListOfData) when length(ListOfData) > 2#1111 ->
- {error, lists:flatten(io_lib:format("data list is too long (~p)", [length(ListOfData)]))};
-cas_unique(ListOfData) ->
- Length = length(ListOfData),
+cas_unique(DataList) when length(DataList) > 2#1111 ->
+ {error, lists:flatten(io_lib:format("data list is too long (~p)", [length(DataList)]))};
+cas_unique(DataList) ->
+ Length = length(DataList),
EachBits = trunc(60/Length),
%% TODO: make 128 contant 2008/11/06 by shino
RestBits = 128- EachBits,
cas_unique(lists:map(fun (Data) ->
<<CheckSum:EachBits, _:RestBits>> = Data#data.checksum,
CheckSum
- end, ListOfData),
+ end, DataList),
EachBits,
Length,
4).
@@ -94,8 +94,8 @@
{stop, normal, stopped, State};
handle_call({update, Data}, _From, State) ->
update(Data, State);
-handle_call({order, ListOfData}, _From, State) ->
- order(ListOfData, State).
+handle_call({order, DataList}, _From, State) ->
+ order(DataList, State).
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
@@ -107,5 +107,5 @@
gen_server:call(?SERVER, stop).
update(Data) ->
gen_server:call(?SERVER, {update, Data}).
-order(ListOfData) ->
- gen_server:call(?SERVER, {order, ListOfData}).
+order(DataList) ->
+ gen_server:call(?SERVER, {order, DataList}).
Modified: branches/takemaru_refactoring/test/kai_connection_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_connection_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/test/kai_connection_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -17,11 +17,14 @@
-include("kai.hrl").
-include("kai_test.hrl").
+-record(connection, {node, available, socket}). %% Defined in src/kai_connection.erl
+
-define(UNKNOWN, {{127,0,0,1}, 1}).
-define(MAX_CONNS, 4).
sequences() ->
- [{seq, [lease, return, close, unknown, multiple_processes, max_connections]}].
+ [{seq, [lease, return, close, unknown, multiple_processes, max_connections,
+ lru]}].
all() -> [{sequence, seq}].
@@ -107,17 +110,31 @@
),
%% The number of connections can be greater than MAX_CONNS when all
- %% connections are in use
+ %% connections are in use.
{ok, Conns} = kai_connection:connections(),
?assertEqual(?MAX_CONNS + 1, length(Conns)),
%% The number of connections equals to MAX_CONNS, because a connection has
- %% been returned
+ %% been returned.
[Socket|_] = Sockets,
ok = kai_connection:return(Socket),
{ok, Conns2} = kai_connection:connections(),
?assertEqual(?MAX_CONNS, length(Conns2)).
+lru(_Conf) ->
+ {ok, _Socket} = kai_connection:lease(?NODE2, self()),
+ {ok, Socket2} = kai_connection:lease(?NODE2, self()),
+ {ok, Socket3} = kai_connection:lease(?NODE2, self()),
+
+ %% Here, the connections are ordered like 3, 2, and 1.
+ {ok, [Conn|_]} = kai_connection:connections(),
+ Socket3 = Conn#connection.socket,
+
+ %% Socket2 is moved to the head.
+ ok = kai_connection:return(Socket2),
+ {ok, [Conn2|_]} = kai_connection:connections(),
+ Socket2 = Conn2#connection.socket.
+
echo_start(Port) ->
{ok, ListenSocket} =
gen_tcp:listen(Port, [binary, {packet, 4}, {reuseaddr, true}]),
Modified: branches/takemaru_refactoring/test/kai_membership_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_membership_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/test/kai_membership_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -230,7 +230,7 @@
count(_Node, 0, DataNum) ->
DataNum;
count(Node, Bucket, DataNum) ->
- {list_of_data, KeyList} = rpc:call(Node, kai_store, list, [Bucket-1]),
+ {ok, KeyList} = rpc:call(Node, kai_store, list, [Bucket-1]),
count(Node, Bucket-1, DataNum + length(KeyList)).
%% TODO: Check whether data is synchronized
Modified: branches/takemaru_refactoring/test/kai_rpc_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_rpc_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/test/kai_rpc_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -77,11 +77,8 @@
Data = rpc:call(Node1, kai_rpc, get, [?NODE2, #data{key="key1", bucket=3}]),
- {list_of_data, [Data2]} = rpc:call(Node1, kai_rpc, list, [?NODE2, 3]),
- ?assertEqual(
- "key1",
- Data2#data.key
- ),
+ {ok, [Key]} = rpc:call(Node1, kai_rpc, list, [?NODE2, 3]),
+ "key1" = Key#data.key,
ok = rpc:call(Node1, kai_rpc, delete, [?NODE2, #data{key="key1", bucket=3}]).
Modified: branches/takemaru_refactoring/test/kai_store_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_store_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/test/kai_store_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -70,9 +70,8 @@
Data = kai_store:get(#data{key="key1", bucket=3}),
undefined = kai_store:get(#data{key="key2", bucket=1}),
- {list_of_data, KeyList} = kai_store:list(3),
- ?assertEqual(1, length(KeyList)),
- ?assert(lists:keymember("key1", 2, KeyList)),
+ {ok, [Key]} = kai_store:list(3),
+ "key1" = Key#data.key,
Data2 = #data{
key = "key1",
Modified: branches/takemaru_refactoring/test/kai_sync_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_sync_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/test/kai_sync_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -46,16 +46,14 @@
{ok, _ReplacedBuckets} =
rpc:call(Node1, kai_hash, update_nodes, [[{?NODE2, ?INFO}], []]),
- {list_of_data, KeyList} = rpc:call(Node1, kai_store, list, [3]),
- ?assertEqual([], KeyList),
+ {ok, []} = rpc:call(Node1, kai_store, list, [3]),
%% Node1 tries to synchronize Bucket3 with Node2
ok = rpc:call(Node1, kai_sync, update_bucket, [3]),
wait(),
- {list_of_data, KeyList2} = rpc:call(Node1, kai_store, list, [3]),
- ?assertEqual(1, length(KeyList2)).
+ {ok, [_]} = rpc:call(Node1, kai_store, list, [3]).
sync_by_timeout(Conf) ->
Node1 = ?config(node1, Conf),
@@ -74,8 +72,7 @@
{ok, _ReplacedBuckets} =
rpc:call(Node1, kai_hash, update_nodes, [[{?NODE2, ?INFO}], []]),
- {list_of_data, KeyList} = rpc:call(Node1, kai_store, list, [3]),
- ?assertEqual([], KeyList),
+ {ok, []} = rpc:call(Node1, kai_store, list, [3]),
%% Node1 tries to synchronize Bucket3 every TIMER sec.
check(Node1, 16).
@@ -84,7 +81,7 @@
ct:fail(never_synchronized);
check(Node, I) ->
timer:sleep(?TIMER),
- {list_of_data, KeyList} = rpc:call(Node, kai_store, list, [3]),
+ {ok, KeyList} = rpc:call(Node, kai_store, list, [3]),
case KeyList of
[_] -> ok;
[] -> check(Node, I-1)
Modified: branches/takemaru_refactoring/test/kai_tcp_server_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_tcp_server_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/test/kai_tcp_server_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -67,7 +67,7 @@
lists:foreach(fun (_N) ->
{ok, Socket} = connect_to_echo_server(),
gen_tcp:close(Socket)
- end, lists:seq(1, 10000)).
+ end, lists:seq(1, 1024)).
connection_counter(_Conf) ->
Sockets = lists:map(fun (_N) ->
Modified: branches/takemaru_refactoring/test/kai_version_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_version_SUITE.erl 2009-06-01 12:35:46 UTC (rev 149)
+++ branches/takemaru_refactoring/test/kai_version_SUITE.erl 2009-06-02 15:06:32 UTC (rev 150)
@@ -50,22 +50,20 @@
vector_clocks = VClock1
},
- %% trivial case
+ %% Trivial case
?assertEqual(1, length(kai_version:order([Data1]))),
- %% two concurrent data
+ %% Two concurrent data
Data2 = Data1#data{
vector_clocks = vclock:increment(otherNode, vclock:fresh())
},
- ListOfData12 = kai_version:order([Data1, Data2]),
- ?assertEqual(2, length(ListOfData12)),
+ [_,_] = kai_version:order([Data1, Data2]),
- %% one data is dropped
+ %% One data is dropped
Data3 = Data1#data{
vector_clocks = vclock:increment(otherNode2, Data1#data.vector_clocks)
},
- ListOfData23 = kai_version:order([Data1, Data2, Data3]),
- ?assertEqual(2, length(ListOfData23)).
+ [_,_] = kai_version:order([Data1, Data2, Data3]).
cas_unique1(_Conf) ->
Data1 = #data{
@@ -88,21 +86,21 @@
cas_unique7(_Conf) ->
%% trunc(60/7) = 8
- ListOfData = lists:map(fun (I) ->
+ DataList = lists:map(fun (I) ->
#data{checksum = <<I:8, 0:120>>}
end,
lists:seq(1,7)),
- {ok, CasUnique} = kai_version:cas_unique(ListOfData),
+ {ok, CasUnique} = kai_version:cas_unique(DataList),
Expected = <<7:4, 1:8, 2:8, 3:8, 4:8, 5:8, 6:8, 7:8, 0:4>>,
?assertEqual(Expected, CasUnique).
cas_unique16(_Conf) ->
%% 16 exceeds 4bit range (2#1111 = 15)
- ListOfData = lists:map(fun (I) ->
+ DataList = lists:map(fun (I) ->
#data{checksum = <<I:4, 0:60>>}
end,
lists:seq(1,16)),
- {error, Reason} = kai_version:cas_unique(ListOfData),
+ {error, Reason} = kai_version:cas_unique(DataList),
?assert(string:str(Reason, "16") > 0).
all_bit_on(Bytes) ->
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <tak...@us...> - 2009-05-29 14:22:47
|
Revision: 148
http://kai.svn.sourceforge.net/kai/?rev=148&view=rev
Author: takemaru
Date: 2009-05-29 13:44:48 +0000 (Fri, 29 May 2009)
Log Message:
-----------
* src/kai_config.erl etc.
- Replaced the parameters n, r, w and number_of_tables with quorum and
dets_number_of_tables, respectively. The old parameters are still valid,
but will be removed in a future release.
- Added tests for the quorum conditions.
* kai.config and src/kai.app.src
- Changed default values of rpc_max_processes, memcache_max_processes,
max_connections, and dets_number_of_tables.
Modified Paths:
--------------
branches/takemaru_refactoring/include/kai_test.hrl
branches/takemaru_refactoring/kai.config
branches/takemaru_refactoring/src/kai.app.src
branches/takemaru_refactoring/src/kai.erl
branches/takemaru_refactoring/src/kai_config.erl
branches/takemaru_refactoring/src/kai_coordinator.erl
branches/takemaru_refactoring/src/kai_hash.erl
branches/takemaru_refactoring/src/kai_stat.erl
branches/takemaru_refactoring/src/kai_store_dets.erl
branches/takemaru_refactoring/test/kai.coverspec
branches/takemaru_refactoring/test/kai_config_SUITE.erl
branches/takemaru_refactoring/test/kai_connection_SUITE.erl
branches/takemaru_refactoring/test/kai_coordinator_SUITE.erl
branches/takemaru_refactoring/test/kai_hash_SUITE.erl
branches/takemaru_refactoring/test/kai_log_SUITE.erl
branches/takemaru_refactoring/test/kai_membership_SUITE.erl
branches/takemaru_refactoring/test/kai_memcache_SUITE.erl
branches/takemaru_refactoring/test/kai_rpc_SUITE.erl
branches/takemaru_refactoring/test/kai_stat_SUITE.erl
branches/takemaru_refactoring/test/kai_store_SUITE.erl
branches/takemaru_refactoring/test/kai_version_SUITE.erl
Modified: branches/takemaru_refactoring/include/kai_test.hrl
===================================================================
--- branches/takemaru_refactoring/include/kai_test.hrl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/include/kai_test.hrl 2009-05-29 13:44:48 UTC (rev 148)
@@ -63,6 +63,7 @@
end, [{hostname, "127.0.0.1"},
{rpc_port, ?PORT1 + I - 1},
{memcache_port, ?PORT1 + I + 199},
+ {quorum, {3,2,2}},
{number_of_buckets, 4},
{number_of_virtual_nodes, 2}|Options]),
ok = rpc:call(Node, application, start, [kai]),
Modified: branches/takemaru_refactoring/kai.config
===================================================================
--- branches/takemaru_refactoring/kai.config 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/kai.config 2009-05-29 13:44:48 UTC (rev 148)
@@ -1,17 +1,15 @@
[{kai, [
- % {logfile, "kai.log"},
- % {hostname, "localhost"},
+ %{logfile, "kai.log"},
+ %{hostname, "localhost"},
{rpc_port, 11011},
- {rpc_max_processes, 30},
+ {rpc_max_processes, 60},
{memcache_port, 11211},
- {memcache_max_processes, 10},
- {max_connections, 32},
- {n, 3},
- {r, 2},
- {w, 2},
+ {memcache_max_processes, 20},
+ {max_connections, 64},
+ {quorum, {3,2,2}},
{number_of_buckets, 1024},
{number_of_virtual_nodes, 128},
- {store, ets},
+ {store, ets}
%{dets_dir, "/path/to/dir"},
- {number_of_tables, 256}
+ %{dets_number_of_tables, 128}
]}].
Modified: branches/takemaru_refactoring/src/kai.app.src
===================================================================
--- branches/takemaru_refactoring/src/kai.app.src 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/src/kai.app.src 2009-05-29 13:44:48 UTC (rev 148)
@@ -15,14 +15,13 @@
{start_phases, []},
{env, [
{rpc_port, 11011},
- {rpc_max_processes, 30},
+ {rpc_max_processes, 60},
{memcache_port, 11211},
- {memcache_max_processes, 10},
- {max_connections, 32},
- {n, 3}, {r, 2}, {w, 2},
+ {memcache_max_processes, 20},
+ {max_connections, 64},
+ {quorum, {3,2,2}},
{number_of_buckets, 1024},
{number_of_virtual_nodes, 128},
- {store, ets},
- {number_of_tables, 256}
+ {store, ets}
]}
]}.
Modified: branches/takemaru_refactoring/src/kai.erl
===================================================================
--- branches/takemaru_refactoring/src/kai.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/src/kai.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -30,9 +30,9 @@
rpc_port, rpc_max_processes,
memcache_port, memcache_max_processes,
max_connections,
- n, r, w,
+ quorum,
number_of_buckets, number_of_virtual_nodes,
- store, dets_dir, number_of_tables
+ store, dets_dir, dets_number_of_tables
], []),
kai_sup:start_link(Args).
Modified: branches/takemaru_refactoring/src/kai_config.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_config.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/src/kai_config.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -35,6 +35,8 @@
Args
),
+ backward_compatibility(Args),
+
Hostname =
case proplists:get_value(hostname, Args) of
undefined -> {ok, H} = inet:gethostname(), H;
@@ -44,6 +46,19 @@
Port = proplists:get_value(rpc_port, Args),
ets:insert(config, {node, {Address, Port}}),
+ %% Check quorum conditions.
+ {N,R,W} = proplists:get_value(quorum, Args),
+ if
+ R + W > N -> ok;
+ true -> exit("Quorum condition must be R + W > N")
+ end,
+ if
+ W > N/2 -> ok;
+ true -> exit("Quorum condition must be W > N/2")
+ end,
+
+ %% The number of buckets is upgraded to 2^n that is greater than or equal
+ %% to the specified number.
NumberOfBuckets = proplists:get_value(number_of_buckets, Args),
Exponent = round( math:log(NumberOfBuckets) / math:log(2) ),
ets:insert(config, {number_of_buckets, trunc( math:pow(2, Exponent) )}),
@@ -54,6 +69,25 @@
ets:delete(config),
ok.
+backward_compatibility(Args) ->
+ case proplists:get_value(n, Args) of
+ undefined -> ok;
+ N ->
+ R = proplists:get_value(r, Args),
+ W = proplists:get_value(w, Args),
+ ets:insert(config, {quorum, {N, R, W}}),
+ ?warning("The parameters 'n', 'r', 'w' are now obsoleted by "
+ "'{quorum, {N,R,W}}'.")
+ end,
+
+ case proplists:get_value(number_of_tables, Args) of
+ undefined -> ok;
+ TableNum ->
+ ets:insert(config, {dets_number_of_tables, TableNum}),
+ ?warning("The parameter 'number_of_tables' is now obsoleted by "
+ "'dets_number_of_tables'.")
+ end.
+
do_get(Key) ->
case ets:lookup(config, Key) of
[{Key, Value}|_] -> Value;
Modified: branches/takemaru_refactoring/src/kai_coordinator.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_coordinator.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/src/kai_coordinator.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -71,7 +71,7 @@
fun(Node) -> spawn(?MODULE, map_in_get, [Node, Data2, Ref, self()]) end, % Don't link
Nodes
),
- [N, R] = kai_config:get([n, r]),
+ {N,R,_W} = kai_config:get(quorum),
case gather_in_get(Ref, N, R, []) of
ListOfData when is_list(ListOfData) ->
%% TODO: write back recent if multiple versions are found and they can be resolved
@@ -142,7 +142,7 @@
fun(Node) -> spawn(?MODULE, map_in_put, [Node, Data3, Ref, self()]) end,
Nodes
),
- [N, W] = kai_config:get([n, w]),
+ {N,_R,W} = kai_config:get(quorum),
gather_in_put(Ref, N, W).
map_in_put(Node, Data, Ref, Pid) ->
@@ -176,7 +176,7 @@
fun(Node) -> spawn(?MODULE, map_in_delete, [Node, Data2, Ref, self()]) end,
Nodes
),
- [N, W] = kai_config:get([n, w]),
+ {N,_R,W} = kai_config:get(quorum),
gather_in_delete(Ref, N, W, []).
map_in_delete(Node, Data, Ref, Pid) ->
Modified: branches/takemaru_refactoring/src/kai_hash.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_hash.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/src/kai_hash.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -120,8 +120,8 @@
end.
update_buckets() ->
- [LocalNode, N, NumberOfBuckets] =
- kai_config:get([node, n, number_of_buckets]),
+ [LocalNode, {N,_R,_W}, NumberOfBuckets] =
+ kai_config:get([node, quorum, number_of_buckets]),
BucketRange = bucket_range(NumberOfBuckets),
NumberOfNodes = proplists:get_value(size, ets:info(node_list)),
Modified: branches/takemaru_refactoring/src/kai_stat.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_stat.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/src/kai_stat.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -36,9 +36,9 @@
init(_Args) ->
{Msec, Sec, _Usec} = now(),
BootTime = 1000000 * Msec + Sec,
- [LocalNode, N, R, W, NumberOfBuckets, NumberOfVirtualNodes, Store] =
+ [LocalNode, {N,R,W}, NumberOfBuckets, NumberOfVirtualNodes, Store] =
kai_config:get([
- node, n, r, w, number_of_buckets, number_of_virtual_nodes, store
+ node, quorum, number_of_buckets, number_of_virtual_nodes, store
]),
Quorum = join(",", [integer_to_list(X) || X <- [N, R, W]]),
{ok, #state{
Modified: branches/takemaru_refactoring/src/kai_store_dets.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_store_dets.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/src/kai_store_dets.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -26,8 +26,7 @@
gen_server:start_link({local, Server}, ?MODULE, [], _Opts = []).
init(_Args) ->
- Dir = kai_config:get(dets_dir),
- NumberOfTables = kai_config:get(number_of_tables),
+ [Dir, TableNum] =kai_config:get([dets_dir, dets_number_of_tables]),
Tables =
lists:map(
fun(I) ->
@@ -39,9 +38,9 @@
exit(Reason)
end
end,
- lists:seq(1, NumberOfTables)
+ lists:seq(1, TableNum)
),
- {ok, #state{number_of_tables = NumberOfTables, tables = Tables}}.
+ {ok, #state{number_of_tables = TableNum, tables = Tables}}.
terminate(_Reason, State) ->
lists:foreach(
Modified: branches/takemaru_refactoring/test/kai.coverspec
===================================================================
--- branches/takemaru_refactoring/test/kai.coverspec 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/test/kai.coverspec 2009-05-29 13:44:48 UTC (rev 148)
@@ -1,7 +1,7 @@
{level, details}.
{incl_mods, [
vclock, kai_config, kai_log, kai_hash, kai_version, kai_store,
- kai_store_etc, kai_store_dets, kai_stat, kai_connection, kai_tcp_server,
+ kai_store_ets, kai_store_dets, kai_stat, kai_connection, kai_tcp_server,
kai_tcp_server_sup, kai_tcp_server_acceptor, kai_tcp_server_monitor,
kai_rpc, kai_sync, kai_membership, kai_coordinator, kai_memcache
]}.
Modified: branches/takemaru_refactoring/test/kai_config_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_config_SUITE.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/test/kai_config_SUITE.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -18,12 +18,13 @@
-include("kai_test.hrl").
-define(ARGS, [{rpc_port, ?PORT1},
- {number_of_buckets, 15000}, %% 15000 should be upgraded to 2^14 = 16384
+ {quorum, {3,2,2}},
+ {number_of_buckets, 15000}, %% 15000 is upgraded to 2^14 = 16384
{number_of_virtual_nodes, 128}]).
sequences() ->
[{seq, [node, node_by_ipaddr, node_without_hostname, number_of_buckets,
- no_such_parameter, parameter_list, node_info]}].
+ no_such_parameter, parameter_list, node_info, backward_compatibility]}].
all() -> [{sequence, seq}].
@@ -31,6 +32,8 @@
init(Conf, [{hostname, "127.0.0.1"}|?ARGS]);
init_per_testcase(node_without_hostname, Conf) ->
init(Conf, ?ARGS);
+init_per_testcase(backward_compatibility, Conf) ->
+ init(Conf, [{n,2}, {r,2}, {w,2}, {number_of_tables, 128}|?ARGS]);
init_per_testcase(_TestCase, Conf) ->
init(Conf, [{hostname, "localhost"}|?ARGS]).
@@ -42,40 +45,28 @@
kai_config:stop().
node(_Conf) ->
- ?assertEqual(
- ?NODE1,
- kai_config:get(node)
- ).
+ ?NODE1 = kai_config:get(node).
node_by_ipaddr(_Conf) ->
- ?assertEqual(
- ?NODE1,
- kai_config:get(node)
- ).
+ ?NODE1 = kai_config:get(node).
node_without_hostname(_Conf) ->
{_Addr, ?PORT1} = kai_config:get(node).
number_of_buckets(_Conf) ->
- ?assertEqual(
- 16384,
- kai_config:get(number_of_buckets)
- ).
+ 16384 = kai_config:get(number_of_buckets).
no_such_parameter(_Conf) ->
- ?assertEqual(
- undefined,
- kai_config:get(no_such_parameter)
- ).
+ undefined = kai_config:get(no_such_parameter).
parameter_list(_Conf) ->
- ?assertEqual(
- [?NODE1, 16384, 128],
- kai_config:get([node, number_of_buckets, number_of_virtual_nodes])
- ).
+ [?NODE1, 16384, 128] =
+ kai_config:get([node, number_of_buckets, number_of_virtual_nodes]).
node_info(_Conf) ->
- ?assertEqual(
- {node_info, ?NODE1, [{number_of_virtual_nodes, 128}]},
- kai_config:node_info()
- ).
+ {node_info, ?NODE1, [{number_of_virtual_nodes, 128}]} =
+ kai_config:node_info().
+
+backward_compatibility(_Conf) ->
+ {2,2,2} = kai_config:get(quorum),
+ 128 = kai_config:get(dets_number_of_tables).
Modified: branches/takemaru_refactoring/test/kai_connection_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_connection_SUITE.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/test/kai_connection_SUITE.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -27,6 +27,7 @@
init_per_testcase(_TestCase, Conf) ->
kai_config:start_link([{rpc_port, ?PORT1},
+ {quorum, {3,2,2}},
{max_connections, ?MAX_CONN},
{number_of_buckets, 4},
{number_of_virtual_nodes, 2}]),
Modified: branches/takemaru_refactoring/test/kai_coordinator_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_coordinator_SUITE.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/test/kai_coordinator_SUITE.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -24,11 +24,11 @@
all() -> [{sequence, seq}].
init_per_testcase(standalone, Conf) ->
- init_node(Conf, 1, [{n,1}, {r,1}, {w,1}]);
+ init_node(Conf, 1, [{quorum, {1,1,1}}]);
init_per_testcase(cluster, Conf) ->
- Conf2 = init_node(Conf, 1, [{n,2}, {r,2}, {w,2}]),
- Conf3 = init_node(Conf2, 2, [{n,2}, {r,2}, {w,2}]),
- Conf4 = init_node(Conf3, 3, [{n,2}, {r,2}, {w,2}]),
+ Conf2 = init_node(Conf, 1, [{quorum, {2,2,2}}]),
+ Conf3 = init_node(Conf2, 2, [{quorum, {2,2,2}}]),
+ Conf4 = init_node(Conf3, 3, [{quorum, {2,2,2}}]),
rpc:call(?config(node1, Conf4), kai_membership, check_node, [?NODE2]),
rpc:call(?config(node2, Conf4), kai_membership, check_node, [?NODE3]),
rpc:call(?config(node3, Conf4), kai_membership, check_node, [?NODE1]),
@@ -36,8 +36,8 @@
wait(),
Conf4;
init_per_testcase(_TestCase, Conf) ->
- Conf2 = init_node(Conf, 1, [{n,2}, {r,2}, {w,2}]),
- Conf3 = init_node(Conf2, 2, [{n,2}, {r,2}, {w,2}]),
+ Conf2 = init_node(Conf, 1, [{quorum, {2,2,2}}]),
+ Conf3 = init_node(Conf2, 2, [{quorum, {2,2,2}}]),
rpc:call(?config(node1, Conf3), kai_membership, check_node, [?NODE2]),
rpc:call(?config(node2, Conf3), kai_membership, check_node, [?NODE1]),
wait(),
Modified: branches/takemaru_refactoring/test/kai_hash_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_hash_SUITE.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/test/kai_hash_SUITE.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -51,7 +51,7 @@
init_per_testcase(perf, Conf) ->
kai_config:start_link([{rpc_port, 1},
- {n, 3},
+ {quorum, {3,2,2}},
{number_of_buckets, 15000},
{number_of_virtual_nodes, 128}]),
kai_hash:start_link(),
@@ -59,7 +59,7 @@
init_per_testcase(_TestCase, Conf) ->
kai_config:start_link([{hostname, "127.0.0.1"},
{rpc_port, 11011},
- {n, 2},
+ {quorum, {2,2,2}},
{number_of_buckets, 4},
{number_of_virtual_nodes, 2}]),
kai_hash:start_link(),
Modified: branches/takemaru_refactoring/test/kai_log_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_log_SUITE.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/test/kai_log_SUITE.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -24,6 +24,7 @@
init_per_testcase(_TestCase, Conf) ->
kai_config:start_link([{rpc_port, 11011},
+ {quorum, {3,2,2}},
{number_of_buckets, 15000},
{number_of_virtual_nodes, 128}]),
kai_log:start_link(),
Modified: branches/takemaru_refactoring/test/kai_membership_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_membership_SUITE.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/test/kai_membership_SUITE.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -28,9 +28,9 @@
Conf2 = init_node(Conf, 1),
init_node(Conf2, 2);
init_per_testcase(_TestCase, Conf) ->
- Conf2 = init_node(Conf, 1, [{n,2}, {r,2}, {w,2}]),
- Conf3 = init_node(Conf2, 2, [{n,2}, {r,2}, {w,2}]),
- init_node(Conf3, 3, [{n,2}, {r,2}, {w,2}]).
+ Conf2 = init_node(Conf, 1, [{quorum, {2,2,2}}]),
+ Conf3 = init_node(Conf2, 2, [{quorum, {2,2,2}}]),
+ init_node(Conf3, 3, [{quorum, {2,2,2}}]).
end_per_testcase(add_directly, Conf) ->
end_node(Conf, 1),
Modified: branches/takemaru_refactoring/test/kai_memcache_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_memcache_SUITE.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/test/kai_memcache_SUITE.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -22,11 +22,11 @@
all() -> [{sequence, seq}].
init_per_testcase(standalone, Conf) ->
- init_node(Conf, 1, [{n,1}, {r,1}, {w,1}]);
+ init_node(Conf, 1, [{quorum, {1,1,1}}]);
init_per_testcase(_TestCase, Conf) ->
- Conf2 = init_node(Conf, 1, [{n,2}, {r,2}, {w,2}]),
- Conf3 = init_node(Conf2, 2, [{n,2}, {r,2}, {w,2}]),
- Conf4 = init_node(Conf3, 3, [{n,2}, {r,2}, {w,2}]),
+ Conf2 = init_node(Conf, 1, [{quorum, {2,2,2}}]),
+ Conf3 = init_node(Conf2, 2, [{quorum, {2,2,2}}]),
+ Conf4 = init_node(Conf3, 3, [{quorum, {2,2,2}}]),
rpc:call(?config(node1, Conf4), kai_membership, check_node, [?NODE2]),
rpc:call(?config(node2, Conf4), kai_membership, check_node, [?NODE3]),
rpc:call(?config(node3, Conf4), kai_membership, check_node, [?NODE1]),
Modified: branches/takemaru_refactoring/test/kai_rpc_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_rpc_SUITE.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/test/kai_rpc_SUITE.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -32,6 +32,7 @@
{rpc_port, ?PORT1},
{rpc_max_processes, 2},
{max_connections, 4},
+ {quorum, {3,2,2}},
{number_of_buckets, 4},
{number_of_virtual_nodes, 2}]),
Conf.
Modified: branches/takemaru_refactoring/test/kai_stat_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_stat_SUITE.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/test/kai_stat_SUITE.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -24,7 +24,7 @@
init_per_testcase(_TestCase, Conf) ->
kai_config:start_link([{hostname, "127.0.0.1"},
{rpc_port, 11011},
- {n, 3}, {r, 2}, {w, 2},
+ {quorum, {3,2,2}},
{number_of_buckets, 4},
{number_of_virtual_nodes, 2},
{store, ets}]),
Modified: branches/takemaru_refactoring/test/kai_store_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_store_SUITE.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/test/kai_store_SUITE.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -18,15 +18,17 @@
-include("kai_test.hrl").
-define(ETS_ARGS, [{rpc_port, 11011},
+ {quorum, {3,2,2}},
{number_of_buckets, 4},
{number_of_virtual_nodes, 2},
{store, ets}]).
-define(DETS_ARGS, [{rpc_port, 11011},
+ {quorum, {3,2,2}},
{number_of_buckets, 4},
{number_of_virtual_nodes, 2},
{store, dets},
{dets_dir, "."},
- {number_of_tables, 2}]).
+ {dets_number_of_tables, 2}]).
sequences() ->
[{ ets, [ ets_crud, ets_conflict, ets_info, ets_perf]},
Modified: branches/takemaru_refactoring/test/kai_version_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/kai_version_SUITE.erl 2009-05-28 14:24:37 UTC (rev 147)
+++ branches/takemaru_refactoring/test/kai_version_SUITE.erl 2009-05-29 13:44:48 UTC (rev 148)
@@ -24,6 +24,7 @@
init_per_testcase(_TestCase, Conf) ->
kai_config:start_link([{rpc_port, 11011},
+ {quorum, {3,2,2}},
{number_of_buckets, 4},
{number_of_virtual_nodes, 2}]),
kai_version:start_link(),
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <tak...@us...> - 2009-05-24 14:17:56
|
Revision: 146
http://kai.svn.sourceforge.net/kai/?rev=146&view=rev
Author: takemaru
Date: 2009-05-24 13:42:43 +0000 (Sun, 24 May 2009)
Log Message:
-----------
* Merges -r 143:145 from trunk/
Modified Paths:
--------------
branches/takemaru_refactoring/src/kai_version.erl
branches/takemaru_refactoring/test/vclock_SUITE.erl
Modified: branches/takemaru_refactoring/src/kai_version.erl
===================================================================
--- branches/takemaru_refactoring/src/kai_version.erl 2009-05-24 01:20:52 UTC (rev 145)
+++ branches/takemaru_refactoring/src/kai_version.erl 2009-05-24 13:42:43 UTC (rev 146)
@@ -21,7 +21,6 @@
]).
-include("kai.hrl").
--record(state, {vector_clocks}).
-define(SERVER, ?MODULE).
-define(CAS_UNIQUE_BITS, 64).
@@ -29,19 +28,15 @@
gen_server:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []).
init(_Args) ->
- NodeVClock = vclock:increment(kai_config:get(node), vclock:fresh()),
- {ok, #state{vector_clocks = NodeVClock}}.
+ {ok, []}.
terminate(_Reason, _State) ->
ok.
update(Data, State) ->
- NodeVClock =State#state.vector_clocks,
- NewNodeVClock = vclock:increment(kai_config:get(node), NodeVClock),
NewDataVClock = vclock:increment(kai_config:get(node), Data#data.vector_clocks),
- NewState = State#state{vector_clocks = NewNodeVClock},
{reply,
- {ok, Data#data{last_modified=now(), vector_clocks=NewDataVClock}}, NewState }.
+ {ok, Data#data{last_modified=now(), vector_clocks=NewDataVClock}}, State }.
do_order([], []) ->
undefined;
Modified: branches/takemaru_refactoring/test/vclock_SUITE.erl
===================================================================
--- branches/takemaru_refactoring/test/vclock_SUITE.erl 2009-05-24 01:20:52 UTC (rev 145)
+++ branches/takemaru_refactoring/test/vclock_SUITE.erl 2009-05-24 13:42:43 UTC (rev 146)
@@ -16,7 +16,9 @@
-include("kai.hrl").
-include("kai_test.hrl").
-all() -> [test_desceds_in_single_node].
+all() -> [test_desceds_in_single_node,
+ test_concurrent,
+ test_merge].
test_desceds_in_single_node() -> [].
@@ -51,8 +53,15 @@
C1 = vclock:merge([A2, B2]),
?assert(vclock:descends(C1, A2)),
?assert(vclock:descends(C1, B2)),
+ ?assertNot(vclock:descends(A2, C1)),
+ ?assertNot(vclock:descends(B2, C1)),
+
C2 = vclock:increment(c, C1),
- ?assertNot(vclock:descends(C2, A2)),
- ?assertNot(vclock:descends(C2, B2)),
- ?assertNot(vclock:descends(C2, C1)),
+
+ ?assert(vclock:descends(C2, A2)),
+ ?assert(vclock:descends(C2, B2)),
+ ?assert(vclock:descends(C2, C1)),
+
+ ?assertNot(vclock:descends(A2, C2)),
+ ?assertNot(vclock:descends(B2, C2)),
ok.
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <shi...@us...> - 2009-05-24 01:21:06
|
Revision: 145
http://kai.svn.sourceforge.net/kai/?rev=145&view=rev
Author: shino_shun
Date: 2009-05-24 01:20:52 +0000 (Sun, 24 May 2009)
Log Message:
-----------
add: test functions to all()
fix: invert wrong assertions
add: some assertions
Modified Paths:
--------------
trunk/test/vclock_SUITE.erl
Modified: trunk/test/vclock_SUITE.erl
===================================================================
--- trunk/test/vclock_SUITE.erl 2009-05-24 01:02:13 UTC (rev 144)
+++ trunk/test/vclock_SUITE.erl 2009-05-24 01:20:52 UTC (rev 145)
@@ -16,7 +16,9 @@
-include("kai.hrl").
-include("kai_test.hrl").
-all() -> [test_desceds_in_single_node].
+all() -> [test_desceds_in_single_node,
+ test_concurrent,
+ test_merge].
test_desceds_in_single_node() -> [].
@@ -51,8 +53,15 @@
C1 = vclock:merge([A2, B2]),
?assert(vclock:descends(C1, A2)),
?assert(vclock:descends(C1, B2)),
+ ?assertNot(vclock:descends(A2, C1)),
+ ?assertNot(vclock:descends(B2, C1)),
+
C2 = vclock:increment(c, C1),
- ?assertNot(vclock:descends(C2, A2)),
- ?assertNot(vclock:descends(C2, B2)),
- ?assertNot(vclock:descends(C2, C1)),
+
+ ?assert(vclock:descends(C2, A2)),
+ ?assert(vclock:descends(C2, B2)),
+ ?assert(vclock:descends(C2, C1)),
+
+ ?assertNot(vclock:descends(A2, C2)),
+ ?assertNot(vclock:descends(B2, C2)),
ok.
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <shi...@us...> - 2009-05-24 01:02:20
|
Revision: 144
http://kai.svn.sourceforge.net/kai/?rev=144&view=rev
Author: shino_shun
Date: 2009-05-24 01:02:13 +0000 (Sun, 24 May 2009)
Log Message:
-----------
fix: remove state record, which is not used
Modified Paths:
--------------
trunk/src/kai_version.erl
Modified: trunk/src/kai_version.erl
===================================================================
--- trunk/src/kai_version.erl 2009-05-23 13:46:58 UTC (rev 143)
+++ trunk/src/kai_version.erl 2009-05-24 01:02:13 UTC (rev 144)
@@ -21,7 +21,6 @@
]).
-include("kai.hrl").
--record(state, {vector_clocks}).
-define(SERVER, ?MODULE).
-define(CAS_UNIQUE_BITS, 64).
@@ -29,19 +28,15 @@
gen_server:start_link({local, ?SERVER}, ?MODULE, [], _Opts = []).
init(_Args) ->
- NodeVClock = vclock:increment(kai_config:get(node), vclock:fresh()),
- {ok, #state{vector_clocks = NodeVClock}}.
+ {ok, []}.
terminate(_Reason, _State) ->
ok.
update(Data, State) ->
- NodeVClock =State#state.vector_clocks,
- NewNodeVClock = vclock:increment(kai_config:get(node), NodeVClock),
NewDataVClock = vclock:increment(kai_config:get(node), Data#data.vector_clocks),
- NewState = State#state{vector_clocks = NewNodeVClock},
{reply,
- {ok, Data#data{last_modified=now(), vector_clocks=NewDataVClock}}, NewState }.
+ {ok, Data#data{last_modified=now(), vector_clocks=NewDataVClock}}, State }.
do_order([], []) ->
undefined;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <tak...@us...> - 2009-05-23 14:22:13
|
Revision: 143
http://kai.svn.sourceforge.net/kai/?rev=143&view=rev
Author: takemaru
Date: 2009-05-23 13:46:58 +0000 (Sat, 23 May 2009)
Log Message:
-----------
* Makefile
- Updates the outdated version number in Makefile.
Modified Paths:
--------------
branches/takemaru_refactoring/Makefile
tags/0.4.0/Makefile
Modified: branches/takemaru_refactoring/Makefile
===================================================================
--- branches/takemaru_refactoring/Makefile 2009-05-23 12:19:15 UTC (rev 142)
+++ branches/takemaru_refactoring/Makefile 2009-05-23 13:46:58 UTC (rev 143)
@@ -9,7 +9,7 @@
## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
## License for the specific language governing permissions and limitations
## under the License.
-KAI_VSN=0.3.0
+KAI_VSN=0.4.0
ifndef ROOT
ROOT=$(shell pwd)
Modified: tags/0.4.0/Makefile
===================================================================
--- tags/0.4.0/Makefile 2009-05-23 12:19:15 UTC (rev 142)
+++ tags/0.4.0/Makefile 2009-05-23 13:46:58 UTC (rev 143)
@@ -9,7 +9,7 @@
## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
## License for the specific language governing permissions and limitations
## under the License.
-KAI_VSN=0.3.0
+KAI_VSN=0.4.0
ifndef ROOT
ROOT=$(shell pwd)
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <coo...@us...> - 2009-05-23 13:22:28
|
Revision: 142
http://kai.svn.sourceforge.net/kai/?rev=142&view=rev
Author: cooldaemon
Date: 2009-05-23 12:19:15 +0000 (Sat, 23 May 2009)
Log Message:
-----------
Fixed a version number for an app file.
Modified Paths:
--------------
trunk/Makefile
Modified: trunk/Makefile
===================================================================
--- trunk/Makefile 2009-05-22 12:08:06 UTC (rev 141)
+++ trunk/Makefile 2009-05-23 12:19:15 UTC (rev 142)
@@ -9,7 +9,7 @@
## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
## License for the specific language governing permissions and limitations
## under the License.
-KAI_VSN=0.3.0
+KAI_VSN=0.4.0
ifndef ROOT
ROOT=$(shell pwd)
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <tak...@us...> - 2009-05-22 12:08:25
|
Revision: 141
http://kai.svn.sourceforge.net/kai/?rev=141&view=rev
Author: takemaru
Date: 2009-05-22 12:08:06 +0000 (Fri, 22 May 2009)
Log Message:
-----------
* branches/takemaru_refactoring
- Makes a new branch for radical refactoring.
Added Paths:
-----------
branches/takemaru_refactoring/
branches/takemaru_refactoring/contrib/
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <tak...@us...> - 2009-05-21 14:01:48
|
Revision: 140
http://kai.svn.sourceforge.net/kai/?rev=140&view=rev
Author: takemaru
Date: 2009-05-21 13:44:14 +0000 (Thu, 21 May 2009)
Log Message:
-----------
* v0.4 released.
- Operational features:
- stats and version commands of the memcache API, and
- Cacti templates.
- Fixes for some bugs in handling the memcache API.
Added Paths:
-----------
tags/0.4.0/
tags/0.4.0/contrib/
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <shi...@us...> - 2009-03-25 23:29:06
|
Revision: 138
http://kai.svn.sourceforge.net/kai/?rev=138&view=rev
Author: shino_shun
Date: 2009-03-25 23:28:51 +0000 (Wed, 25 Mar 2009)
Log Message:
-----------
fix: initial length of unreconciled_get in kai_stat
Modified Paths:
--------------
trunk/src/kai_stat.erl
trunk/test/kai_coordinator_SUITE.erl
Modified: trunk/src/kai_stat.erl
===================================================================
--- trunk/src/kai_stat.erl 2009-03-24 02:40:08 UTC (rev 137)
+++ trunk/src/kai_stat.erl 2009-03-25 23:28:51 UTC (rev 138)
@@ -47,7 +47,7 @@
cmd_set = 0,
bytes_read = 0,
bytes_write = 0,
- unreconciled_get = array:new([{size,2}, {fixed, false},
+ unreconciled_get = array:new([{size, N-1}, {fixed, false},
{default, [0,0]}]),
node = LocalNode,
quorum = Quorum,
Modified: trunk/test/kai_coordinator_SUITE.erl
===================================================================
--- trunk/test/kai_coordinator_SUITE.erl 2009-03-24 02:40:08 UTC (rev 137)
+++ trunk/test/kai_coordinator_SUITE.erl 2009-03-25 23:28:51 UTC (rev 138)
@@ -125,7 +125,8 @@
StatOfUnreconciledGet =
proplists:get_value(kai_unreconciled_get,
rpc:call(Node1, kai_stat, all, [])),
- ?assertEqual("1(1) 0(0)", lists:flatten(StatOfUnreconciledGet)),
+ ?assertEqual("1(1)",
+ lists:sublist(lists:flatten(StatOfUnreconciledGet), 4)),
ok.
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <shi...@us...> - 2009-03-24 02:40:17
|
Revision: 137
http://kai.svn.sourceforge.net/kai/?rev=137&view=rev
Author: shino_shun
Date: 2009-03-24 02:40:08 +0000 (Tue, 24 Mar 2009)
Log Message:
-----------
add: coordinator updates kai_unreconciled_get statistics
Modified Paths:
--------------
trunk/src/kai_coordinator.erl
trunk/test/kai_coordinator_SUITE.erl
Modified: trunk/src/kai_coordinator.erl
===================================================================
--- trunk/src/kai_coordinator.erl 2009-03-24 02:38:28 UTC (rev 136)
+++ trunk/src/kai_coordinator.erl 2009-03-24 02:40:08 UTC (rev 137)
@@ -74,8 +74,19 @@
[N, R] = kai_config:get([n, r]),
case gather_in_get(Ref, N, R, []) of
ListOfData when is_list(ListOfData) ->
- % TODO: write back recent if multiple versions are found and they can be resolved
- kai_version:order(ListOfData);
+ %% TODO: write back recent if multiple versions are found and they can be resolved
+ InternalNum = sets:size(
+ sets:from_list(
+ lists:map(fun(E) -> E#data.vector_clocks end,
+ ListOfData))),
+ ReconciledList = kai_version:order(ListOfData),
+ if
+ InternalNum > 1 ->
+ kai_stat:incr_unreconciled_get(
+ {InternalNum, length(ReconciledList) =:= 1});
+ true -> ok
+ end,
+ ReconciledList;
_NoData ->
undefined
end.
Modified: trunk/test/kai_coordinator_SUITE.erl
===================================================================
--- trunk/test/kai_coordinator_SUITE.erl 2009-03-24 02:38:28 UTC (rev 136)
+++ trunk/test/kai_coordinator_SUITE.erl 2009-03-24 02:40:08 UTC (rev 137)
@@ -55,6 +55,7 @@
]),
kai_hash:start_link(),
kai_store:start_link(),
+ kai_stat:start_link(),
kai_version:start_link(),
kai_connection:start_link(),
kai_rpc:start_link(),
@@ -97,6 +98,7 @@
kai_rpc:stop(),
kai_connection:stop(),
kai_version:stop(),
+ kai_stat:stop(),
kai_store:stop(),
kai_hash:stop(),
kai_config:stop().
@@ -107,6 +109,7 @@
Key = "key1",
Node1 = ?config(node1, Config),
Node2 = ?config(node2, Config),
+
ok = rpc:call(Node1, kai_coordinator, route, [{put, #data{key=Key, value="value1"}}]),
[Data] = rpc:call(Node1, kai_coordinator, route, [{get, #data{key=Key}}]),
IntentionalConcurrentVCAtNode1 = vclock:increment(rpc:call(Node1, kai_config, get, [node]),
@@ -115,9 +118,14 @@
Data#data.vector_clocks),
ok = rpc:call(Node1, kai_store, put, [Data#data{vector_clocks=IntentionalConcurrentVCAtNode1}]),
ok = rpc:call(Node2, kai_store, put, [Data#data{vector_clocks=IntentionalConcurrentVCAtNode2}]),
+
GetResult = rpc:call(Node1, kai_coordinator, route, [{get, #data{key=Key}}]),
p("get result:", GetResult),
?assertEqual(2, length(GetResult)),
+ StatOfUnreconciledGet =
+ proplists:get_value(kai_unreconciled_get,
+ rpc:call(Node1, kai_stat, all, [])),
+ ?assertEqual("1(1) 0(0)", lists:flatten(StatOfUnreconciledGet)),
ok.
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|