[Assorted-commits] SF.net SVN: assorted:[1100] ydb/trunk/src/main.lzz.clamp
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-12-11 21:21:55
Revision: 1100 http://assorted.svn.sourceforge.net/assorted/?rev=1100&view=rev Author: yangzhang Date: 2008-12-11 21:21:42 +0000 (Thu, 11 Dec 2008) Log Message: ----------- - added --debug-threads - added leader throughput measurements for before, during, and after recovery Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2008-12-11 21:21:13 UTC (rev 1099) +++ ydb/trunk/src/main.lzz.clamp 2008-12-11 21:21:42 UTC (rev 1100) @@ -76,10 +76,11 @@ * used anywhere. */ st_thread_t -my_spawn(const function0<void> &f, bool intr = false) +my_spawn(const function0<void> &f, string name, bool intr = false) { st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr)); threads.insert(t); + threadnames[t] = name; return t; } @@ -425,18 +426,41 @@ * Keep swallowing replica responses. */ void -handle_responses(st_netfd_t replica, const int &seqno, bool caught_up) +handle_responses(st_netfd_t replica, const int &seqno, + st_multichannel<long long> &recover_signals, bool caught_up) { - long long start_time = current_time_millis(); + st_channel<long long> &sub = recover_signals.subscribe(); + long long start_time = current_time_millis(), + recovery_start_time = caught_up ? -1 : start_time, + recovery_end_time = -1; + int recovery_start_seqno = caught_up ? 
-1 : seqno, + recovery_end_seqno = -1; + finally f(lambda () { + long long end_time = current_time_millis(); + showtput("after recovery, finished", end_time, __ref(recovery_end_time), + __ref(seqno), __ref(recovery_end_seqno)); + }); while (true) { Response res; { st_intr intr(kill_hub); readmsg(replica, res); } + if (recovery_start_time == -1 && !sub.empty()) { + recovery_start_time = sub.take(); + recovery_start_seqno = seqno; + showtput("before recovery, finished", recovery_start_time, start_time, + recovery_start_seqno, 0); + } else if (recovery_end_time == -1 && !sub.empty()) { + recovery_end_time = sub.take(); + recovery_end_seqno = seqno; + showtput("during recovery, finished", recovery_end_time, + recovery_start_time, recovery_end_seqno, recovery_start_seqno); + } if (!caught_up && res.caught_up()) { + long long t = current_time_millis(), timediff = t - start_time; caught_up = true; - long long timediff = current_time_millis() - start_time; + recover_signals.push(t); cout << "recovering node caught up; took " << timediff << "ms" << endl; } @@ -445,8 +469,9 @@ cout << "got response " << res.seqno() << " from " << replica << endl; st_sleep(0); } + // This is OK since the seqno will never grow again if stop_hub is set. if (stop_hub && res.seqno() + 1 == seqno) { - cout << "seqno = " << res.seqno() << endl; + cout << "stopping seqno = " << res.seqno() << endl; break; } } @@ -496,6 +521,7 @@ run_leader(int minreps, uint16_t leader_port) { cout << "starting as leader" << endl; + st_multichannel<long long> recover_signals; // Wait until all replicas have joined. 
st_netfd_t listener = st_tcp_listen(leader_port); @@ -534,14 +560,16 @@ int seqno = 0; st_channel<replica_info> newreps; const function0<void> f = bind(issue_txns, ref(newreps), ref(seqno)); - st_thread_t swallower = my_spawn(bind(swallow, f)); + st_thread_t swallower = my_spawn(bind(swallow, f), "issue_txns"); foreach (const replica_info &r, replicas) newreps.push(r); st_joining join_swallower(swallower); // Start handling responses. st_thread_group handlers; foreach (replica_info r, replicas) { - handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), true))); + handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), + ref(recover_signals), true), + "handle_responses")); } // Accept the recovering node, and tell it about the online replicas. @@ -555,12 +583,15 @@ cout << "setting seqno to " << seqno << endl; init.set_txnseqno(seqno); sendmsg(joiner, init); + recover_signals.push(current_time_millis()); // Start streaming txns to joiner. cout << "start streaming txns to joiner" << endl; replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); newreps.push(replicas.back()); - handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), false))); + handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), + ref(recover_signals), false), + "handle_responses")); } /** @@ -618,9 +649,11 @@ st_channel<shared_ptr<Txn> > backlog; st_joining join_proc(my_spawn(bind(process_txns, leader, ref(map), ref(seqno), ref(send_states), - ref(backlog), init.txnseqno()))); + ref(backlog), init.txnseqno()), + "process_txns")); st_joining join_rec(my_spawn(bind(recover_joiner, listener, ref(map), - ref(seqno), ref(send_states)))); + ref(seqno), ref(send_states)), + "recover_joiner")); // If there's anything to recover. 
if (init.txnseqno() > 0) { @@ -695,6 +728,17 @@ } } +map<st_thread_t, string> threadnames; + +void cb() +{ + if (threadnames.find(st_thread_self()) != threadnames.end()) { + cout << "switched to: " << threadnames[st_thread_self()] << endl; + } else { + cout << "switched to: " << st_thread_self() << endl; + } +} + /** * Initialization and command-line parsing. */ @@ -705,7 +749,7 @@ try { GOOGLE_PROTOBUF_VERIFY_VERSION; - bool is_leader, use_epoll; + bool is_leader, use_epoll, debug_threads; int minreps; uint16_t leader_port, listen_port; string leader_host; @@ -714,6 +758,8 @@ po::options_description desc("Allowed options"); desc.add_options() ("help,h", "show this help message") + ("debug-threads,d",po::bool_switch(&debug_threads), + "enable context switch debug outputs") ("verbose,v", "enable periodic printing of txn processing progress") ("epoll,e", po::bool_switch(&use_epoll), "use epoll (select is used by default)") @@ -766,10 +812,14 @@ if (use_epoll) check0x(st_set_eventsys(ST_EVENTSYS_ALT)); check0x(st_init()); st_spawn(bind(handle_sig_sync)); + if (debug_threads) { + st_set_switch_in_cb(cb); + } // Initialize thread manager for clean shutdown of all threads. thread_eraser eraser; threads.insert(st_thread_self()); + threadnames[st_thread_self()] = "main"; // Which role are we? if (is_leader) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.