[Assorted-commits] SF.net SVN: assorted:[1111] ydb/trunk
Brought to you by: yangzhang
From: <yan...@us...> - 2008-12-22 22:57:43
Revision: 1111 http://assorted.svn.sourceforge.net/assorted/?rev=1111&view=rev Author: yangzhang Date: 2008-12-22 22:33:22 +0000 (Mon, 22 Dec 2008) Log Message: ----------- - replaced dump_state with a finally lambda - added accept_joiner_seqno - tweaked output - fixed run() in test.bash Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp ydb/trunk/tools/test.bash Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2008-12-19 23:46:26 UTC (rev 1110) +++ ydb/trunk/src/main.lzz.clamp 2008-12-22 22:33:22 UTC (rev 1111) @@ -29,7 +29,7 @@ typedef pair<int, int> pii; st_utime_t timeout; -int chkpt; +int chkpt, accept_joiner_seqno; bool verbose, yield_during_build_up, yield_during_catch_up; long long timelim; st_intr_bool stop_hub, kill_hub; @@ -143,28 +143,6 @@ }; /** - * RAII for dumping the final state of the DB to a file on disk. - */ -class dump_state -{ - public: - dump_state(const map<int, int> &map, const int &seqno) - : map_(map), seqno_(seqno) {} - ~dump_state() { - string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); - cout << "dumping DB state (" << seqno_ << ") to " << fname << endl; - ofstream of(fname.c_str()); - of << "seqno: " << seqno_ << endl; - foreach (const pii &p, map_) { - of << p.first << ": " << p.second << endl; - } - } - private: - const map<int, int> &map_; - const int &seqno_; -}; - -/** * Send a message to some destinations (sequentially). */ template<typename T> @@ -246,7 +224,8 @@ * Keep issuing transactions to the replicas. */ void -issue_txns(st_channel<replica_info> &newreps, int &seqno) +issue_txns(st_channel<replica_info> &newreps, int &seqno, + st_bool &accept_joiner) { Op_OpType types[] = {Op::read, Op::write, Op::del}; vector<st_netfd_t> fds; @@ -292,6 +271,10 @@ } st_sleep(0); } + + if (txn.seqno() == accept_joiner_seqno) { + accept_joiner.set(); + } } } @@ -423,10 +406,10 @@ } /** - * Keep swallowing replica responses. 
+ * Swallow replica responses. */ void -handle_responses(st_netfd_t replica, const int &seqno, +handle_responses(st_netfd_t replica, const int &seqno, int rid, st_multichannel<long long> &recover_signals, bool caught_up) { st_channel<long long> &sub = recover_signals.subscribe(); @@ -437,6 +420,7 @@ recovery_end_seqno = -1; finally f(lambda () { long long end_time = current_time_millis(); + cout << __ref(rid) << ": "; showtput("after recovery, finished", end_time, __ref(recovery_end_time), __ref(seqno), __ref(recovery_end_seqno)); }); @@ -449,11 +433,13 @@ if (recovery_start_time == -1 && !sub.empty()) { recovery_start_time = sub.take(); recovery_start_seqno = seqno; + cout << rid << ": "; showtput("before recovery, finished", recovery_start_time, start_time, recovery_start_seqno, 0); } else if (recovery_end_time == -1 && !sub.empty()) { recovery_end_time = sub.take(); recovery_end_seqno = seqno; + cout << rid << ": "; showtput("during recovery, finished", recovery_end_time, recovery_start_time, recovery_end_seqno, recovery_start_seqno); } @@ -461,16 +447,20 @@ long long t = current_time_millis(), timediff = t - start_time; caught_up = true; recover_signals.push(t); + cout << rid << ": "; cout << "recovering node caught up; took " << timediff << "ms" << endl; } if (res.seqno() % chkpt == 0) { - if (verbose) + if (verbose) { + cout << rid << ": "; cout << "got response " << res.seqno() << " from " << replica << endl; + } st_sleep(0); } // This is OK since the seqno will never grow again if stop_hub is set. if (stop_hub && res.seqno() + 1 == seqno) { + cout << rid << ": "; cout << "stopping seqno = " << res.seqno() << endl; break; } @@ -557,17 +547,20 @@ } // Start dispatching queries. 
+ st_bool accept_joiner; int seqno = 0; st_channel<replica_info> newreps; - const function0<void> f = bind(issue_txns, ref(newreps), ref(seqno)); + const function0<void> f = bind(issue_txns, ref(newreps), ref(seqno), + ref(accept_joiner)); st_thread_t swallower = my_spawn(bind(swallow, f), "issue_txns"); foreach (const replica_info &r, replicas) newreps.push(r); st_joining join_swallower(swallower); // Start handling responses. st_thread_group handlers; + int rid = 0; foreach (replica_info r, replicas) { - handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), + handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), rid++, ref(recover_signals), true), "handle_responses")); } @@ -578,6 +571,7 @@ st_intr intr(stop_hub); joiner = checkerr(st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT)); + accept_joiner.waitset(); } Join join = readmsg<Join>(joiner); cout << "setting seqno to " << seqno << endl; @@ -589,7 +583,7 @@ cout << "start streaming txns to joiner" << endl; replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); newreps.push(replicas.back()); - handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), + handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), rid++, ref(recover_signals), false), "handle_responses")); } @@ -603,7 +597,16 @@ // Initialize database state. 
map<int, int> map; int seqno = -1; - dump_state ds(map, seqno); + finally f(lambda () { + string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); + cout << "dumping DB state (seqno = " << __ref(seqno) << ", size = " + << __ref(map).size() << ") to " << fname << endl; + ofstream of(fname.c_str()); + of << "seqno: " << __ref(seqno) << endl; + foreach (const pii &p, __ref(map)) { + of << p.first << ": " << p.second << endl; + } + }); st_channel<shared_ptr<Recovery> > send_states; cout << "starting as replica on port " << listen_port << endl; @@ -769,6 +772,9 @@ "yield periodically during catch-up phase of recovery") ("leader,l", po::bool_switch(&is_leader), "run the leader (run replica by default)") + ("accept-joiner-seqno,j", + po::value<int>(&accept_joiner_seqno)->default_value(0), + "accept recovering joiner (start recovery) after this seqno") ("leader-host,H", po::value<string>(&leader_host)->default_value(string("localhost")), "hostname or address of the leader") Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2008-12-19 23:46:26 UTC (rev 1110) +++ ydb/trunk/tools/test.bash 2008-12-22 22:33:22 UTC (rev 1111) @@ -227,21 +227,24 @@ " } +range2args() { + "$@" $(seq $range | sed 's/^/farm/; s/$/.csail/') +} + run-helper() { tagssh $1 "ydb/src/ydb -l" & sleep .1 tagssh $2 "ydb/src/ydb -H $1" & tagssh $3 "ydb/src/ydb -H $1" & - sleep ${wait:-10} + sleep ${wait1:-10} tagssh $4 "ydb/src/ydb -H $1" & - read - kill %1 + if [[ ${wait2:-} ]] + then sleep $wait2 + else read + fi + tagssh $1 "pkill -sigint ydb" } -range2args() { - "$@" $(seq $range | sed 's/^/farm/; s/$/.csail/') -} - run() { range2args run-helper } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |