[Assorted-commits] SF.net SVN: assorted:[1156] ydb/trunk
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-02-03 00:00:31
|
Revision: 1156 http://assorted.svn.sourceforge.net/assorted/?rev=1156&view=rev Author: yangzhang Date: 2009-02-03 00:00:26 +0000 (Tue, 03 Feb 2009) Log Message: ----------- - added simple "WAL" and leader-only mode - changed experiments from yield vs block to single vs multi host Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/tools/analysis.py ydb/trunk/tools/test.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-02 22:42:05 UTC (rev 1155) +++ ydb/trunk/README 2009-02-03 00:00:26 UTC (rev 1156) @@ -249,8 +249,13 @@ takes more around 50 ms - DONE start building infrastructure for disk IO -Period: 1/27- +Period: 1/27-2/3 +- DONE simple wal + +Period: 2/3- + +- DONE better wal - TODO fix up analysis of multihost recovery - TODO implement checkpointing disk-based scheme - TODO implement log-based recovery; show that it sucks Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-02 22:42:05 UTC (rev 1155) +++ ydb/trunk/src/Makefile 2009-02-03 00:00:26 UTC (rev 1156) @@ -26,7 +26,8 @@ GCOV := -fprofile-arcs -ftest-coverage endif LDFLAGS := -pthread -lstx -lst -lresolv -lprotobuf -lgtest \ - -lboost_program_options-gcc43-mt -lboost_thread-gcc43-mt $(GPROF) + -lboost_program_options-gcc43-mt -lboost_thread-gcc43-mt \ + -lboost_serialization-gcc43-mt $(GPROF) # The -Wno- warnings are for boost. 
CXXFLAGS := -g3 -pthread $(GPROF) -Wall -Werror -Wextra -Woverloaded-virtual \ -Wconversion -Wno-conversion -Wno-ignored-qualifiers \ Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-02 22:42:05 UTC (rev 1155) +++ ydb/trunk/src/main.lzz.clamp 2009-02-03 00:00:26 UTC (rev 1156) @@ -1,4 +1,6 @@ #hdr +#include <boost/archive/binary_iarchive.hpp> +#include <boost/archive/binary_oarchive.hpp> #include <boost/bind.hpp> #include <boost/foreach.hpp> #include <boost/program_options.hpp> @@ -14,7 +16,7 @@ #include <cstdio> #include <cstring> // strsignal #include <iostream> -#include <fstream> +#include <fstream> // ofstream #include <gtest/gtest.h> #include <malloc.h> #include <map> @@ -27,6 +29,7 @@ #include "ydb.pb.h" #define foreach BOOST_FOREACH using namespace boost; +using namespace boost::archive; using namespace commons; using namespace std; using namespace testing; @@ -42,7 +45,7 @@ size_t accept_joiner_size; bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, - debug_threads, multirecover, disk, debug_memory; + debug_threads, multirecover, disk, debug_memory, use_wal; long long timelim, read_thresh, write_thresh; // Control. @@ -355,6 +358,35 @@ } /** + * ARIES write-ahead log. No undo logging necessary (no steal). + */ +class wal +{ +public: + wal() : of("wal"), out(of) {} + void del(int key) { + int op = op_del; // TODO: is this really necessary? + out & op & key; + } + void write(int key, int val) { + int op = op_write; + out & op & key & val; + } + void commit() { + int op = op_commit; + out & op; + } +private: + enum { op_del, op_write, op_commit }; + ofstream of; + binary_oarchive out; +}; + +// Globals +map<int, int> g_map; +wal *g_wal; + +/** * Keep issuing transactions to the replicas. */ void @@ -388,7 +420,7 @@ // Generate a random transaction. 
Txn txn; - txn.set_seqno(seqno++); + txn.set_seqno(seqno); int count = randint(min_ops, max_ops + 1); for (int o = 0; o < count; o++) { Op *op = txn.add_op(); @@ -400,8 +432,13 @@ if (do_pause) do_pause.waitreset(); - // Broadcast. - bcastmsg(fds, txn); + // Process, or broadcast and increment seqno. + if (fds.empty()) { + int dummy_seqno = seqno - 1; + process_txn(nullptr, g_map, txn, dummy_seqno, true); + } else { + bcastmsg(fds, txn); + } // Checkpoint. if (txn.seqno() % chkpt == 0) { @@ -426,6 +463,8 @@ cout << "stopping on issue of seqno " << txn.seqno() << endl; stop_hub.set(); } + + ++seqno; } Txn txn; @@ -441,6 +480,7 @@ process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno, bool caught_up) { + wal &wal = *g_wal; checkeq(txn.seqno(), seqno + 1); Response res; res.set_seqno(txn.seqno()); @@ -449,27 +489,33 @@ for (int o = 0; o < txn.op_size(); o++) { const Op &op = txn.op(o); const int key = op.key(); + ::map<int, int>::iterator it = map.find(key); if (show_updates || count_updates) { - if (map.find(key) != map.end()) { + if (it != map.end()) { if (show_updates) cout << "existing key: " << key << endl; if (count_updates) updates++; } } switch (op.type()) { case Op::read: - res.add_result(map[key]); + if (it == map.end()) res.add_result(0); + else res.add_result(it->second); break; case Op::write: - map[key] = op.value(); + if (use_wal) wal.write(key, op.value()); + if (it == map.end()) map[key] = op.value(); + else it->second = op.value(); break; case Op::del: - map.erase(key); + if (it != map.end()) { + if (use_wal) wal.del(key); + map.erase(it); + } break; } } - if (caught_up) { - sendmsg(leader, res); - } + if (use_wal) wal.commit(); + if (caught_up && leader != nullptr) sendmsg(leader, res); } void @@ -531,6 +577,8 @@ * calculating the sub-range of the map for which this node is responsible. * * \param[in] nnodes The total number nodes in the Init message list. + * + * \param[in] wal The WAL. 
*/ void process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, @@ -801,6 +849,9 @@ cout << "starting as leader" << endl; st_multichannel<long long> recover_signals; + scoped_ptr<wal> pwal(new wal); + g_wal = pwal.get(); + // Wait until all replicas have joined. st_netfd_t listener = st_tcp_listen(leader_port); st_closing close_listener(listener); @@ -1139,6 +1190,8 @@ "count operations that touch (update/read/delete) an existing key") ("general-txns,g", po::bool_switch(&general_txns), "issue read and delete transactions as well as the default of (only) insertion/update transactions (for leader only)") + ("wal", po::bool_switch(&use_wal), + "enable ARIES write-ahead logging") ("leader,l", po::bool_switch(&is_leader), "run the leader (run replica by default)") ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-02-02 22:42:05 UTC (rev 1155) +++ ydb/trunk/tools/analysis.py 2009-02-03 00:00:26 UTC (rev 1156) @@ -93,9 +93,9 @@ ylim(ymin = 0) savefig('scaling.png') -def run(blockpath, yieldpath): - for path, titlestr, name in [#(blockpath, 'blocking scheme', 'block'), - (yieldpath, 'yielding scheme', 'yield')]: +def run(singlepath, multipath): + for path, titlestr, name in [(singlepath, 'single recoverer', 'single'), + (multipath, 'multi recoverer', 'multi')]: print '===', titlestr, '===' print 'file:', getname(path) res = logextract(path, 'seqno', @@ -151,7 +151,7 @@ elif argv[1] == 'scaling': scaling(argv[2] if len(argv) > 2 else 'scaling-log') elif argv[1] == 'run': - run(*argv[2:] if len(argv) > 2 else ['block-log', 'yield-log']) + run(*argv[2:] if len(argv) > 2 else ['single-log', 'multi-log']) else: print >> sys.stderr, 'Unknown command:', argv[1] Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-02-02 22:42:05 UTC (rev 
1155) +++ ydb/trunk/tools/test.bash 2009-02-03 00:00:26 UTC (rev 1156) @@ -282,7 +282,7 @@ done sleep .1 # pexpect 'got all \d+ replicas' leader # Run joiner. - tagssh $1 "ydb/src/ydb -H $leader ${extraargs:-}" & # -v --debug-threads -t 200000" & + tagssh $1 "ydb/src/ydb -H $leader --yield-catch-up ${extraargs:-}" & # -v --debug-threads -t 200000" & if false ; then if [[ ${wait2:-} ]] then sleep $wait2 @@ -297,8 +297,9 @@ hostargs run-helper } -full-run() { - for seqno in 500000 400000 300000 200000 100000 ; do # 200000 300000 400000 500000 ; do # 700000 900000; do # configurations +# Recovery experiment. +exp() { + for seqno in 500000 400000 300000 200000 100000 ; do # configurations stop for i in {1..5} ; do # trials echo === seqno=$seqno i=$i === @@ -312,16 +313,18 @@ done } -full-block() { - local out=block-log-$(date +%Y-%m-%d-%H:%M:%S) - ln -sf $out block-log - full-run >& $out +# Single-host recovery experiment. +exp-single() { + local out=single-log-$(date +%Y-%m-%d-%H:%M:%S) + ln -sf $out single-log + exp >& $out } -full-yield() { - local out=yield-log-$(date +%Y-%m-%d-%H:%M:%S) - ln -sf $out yield-log - extraargs="--yield-catch-up ${extraargs:-}" full-run >& $out +# Multi-host recovery experiment. +exp-multi() { + local out=multi-log-$(date +%Y-%m-%d-%H:%M:%S) + ln -sf $out multi-log + extraargs="-m ${extraargs:-}" exp >& $out } stop-helper() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |