Thread: [Assorted-commits] SF.net SVN: assorted:[1100] ydb/trunk/src/main.lzz.clamp
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-12-11 21:21:55
|
Revision: 1100 http://assorted.svn.sourceforge.net/assorted/?rev=1100&view=rev Author: yangzhang Date: 2008-12-11 21:21:42 +0000 (Thu, 11 Dec 2008) Log Message: ----------- - added --debug-threads - added leader throughput measurements for before, during, and after recovery Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2008-12-11 21:21:13 UTC (rev 1099) +++ ydb/trunk/src/main.lzz.clamp 2008-12-11 21:21:42 UTC (rev 1100) @@ -76,10 +76,11 @@ * used anywhere. */ st_thread_t -my_spawn(const function0<void> &f, bool intr = false) +my_spawn(const function0<void> &f, string name, bool intr = false) { st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr)); threads.insert(t); + threadnames[t] = name; return t; } @@ -425,18 +426,41 @@ * Keep swallowing replica responses. */ void -handle_responses(st_netfd_t replica, const int &seqno, bool caught_up) +handle_responses(st_netfd_t replica, const int &seqno, + st_multichannel<long long> &recover_signals, bool caught_up) { - long long start_time = current_time_millis(); + st_channel<long long> &sub = recover_signals.subscribe(); + long long start_time = current_time_millis(), + recovery_start_time = caught_up ? -1 : start_time, + recovery_end_time = -1; + int recovery_start_seqno = caught_up ? 
-1 : seqno, + recovery_end_seqno = -1; + finally f(lambda () { + long long end_time = current_time_millis(); + showtput("after recovery, finished", end_time, __ref(recovery_end_time), + __ref(seqno), __ref(recovery_end_seqno)); + }); while (true) { Response res; { st_intr intr(kill_hub); readmsg(replica, res); } + if (recovery_start_time == -1 && !sub.empty()) { + recovery_start_time = sub.take(); + recovery_start_seqno = seqno; + showtput("before recovery, finished", recovery_start_time, start_time, + recovery_start_seqno, 0); + } else if (recovery_end_time == -1 && !sub.empty()) { + recovery_end_time = sub.take(); + recovery_end_seqno = seqno; + showtput("during recovery, finished", recovery_end_time, + recovery_start_time, recovery_end_seqno, recovery_start_seqno); + } if (!caught_up && res.caught_up()) { + long long t = current_time_millis(), timediff = t - start_time; caught_up = true; - long long timediff = current_time_millis() - start_time; + recover_signals.push(t); cout << "recovering node caught up; took " << timediff << "ms" << endl; } @@ -445,8 +469,9 @@ cout << "got response " << res.seqno() << " from " << replica << endl; st_sleep(0); } + // This is OK since the seqno will never grow again if stop_hub is set. if (stop_hub && res.seqno() + 1 == seqno) { - cout << "seqno = " << res.seqno() << endl; + cout << "stopping seqno = " << res.seqno() << endl; break; } } @@ -496,6 +521,7 @@ run_leader(int minreps, uint16_t leader_port) { cout << "starting as leader" << endl; + st_multichannel<long long> recover_signals; // Wait until all replicas have joined. 
st_netfd_t listener = st_tcp_listen(leader_port); @@ -534,14 +560,16 @@ int seqno = 0; st_channel<replica_info> newreps; const function0<void> f = bind(issue_txns, ref(newreps), ref(seqno)); - st_thread_t swallower = my_spawn(bind(swallow, f)); + st_thread_t swallower = my_spawn(bind(swallow, f), "issue_txns"); foreach (const replica_info &r, replicas) newreps.push(r); st_joining join_swallower(swallower); // Start handling responses. st_thread_group handlers; foreach (replica_info r, replicas) { - handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), true))); + handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), + ref(recover_signals), true), + "handle_responses")); } // Accept the recovering node, and tell it about the online replicas. @@ -555,12 +583,15 @@ cout << "setting seqno to " << seqno << endl; init.set_txnseqno(seqno); sendmsg(joiner, init); + recover_signals.push(current_time_millis()); // Start streaming txns to joiner. cout << "start streaming txns to joiner" << endl; replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); newreps.push(replicas.back()); - handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), false))); + handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), + ref(recover_signals), false), + "handle_responses")); } /** @@ -618,9 +649,11 @@ st_channel<shared_ptr<Txn> > backlog; st_joining join_proc(my_spawn(bind(process_txns, leader, ref(map), ref(seqno), ref(send_states), - ref(backlog), init.txnseqno()))); + ref(backlog), init.txnseqno()), + "process_txns")); st_joining join_rec(my_spawn(bind(recover_joiner, listener, ref(map), - ref(seqno), ref(send_states)))); + ref(seqno), ref(send_states)), + "recover_joiner")); // If there's anything to recover. 
if (init.txnseqno() > 0) { @@ -695,6 +728,17 @@ } } +map<st_thread_t, string> threadnames; + +void cb() +{ + if (threadnames.find(st_thread_self()) != threadnames.end()) { + cout << "switched to: " << threadnames[st_thread_self()] << endl; + } else { + cout << "switched to: " << st_thread_self() << endl; + } +} + /** * Initialization and command-line parsing. */ @@ -705,7 +749,7 @@ try { GOOGLE_PROTOBUF_VERIFY_VERSION; - bool is_leader, use_epoll; + bool is_leader, use_epoll, debug_threads; int minreps; uint16_t leader_port, listen_port; string leader_host; @@ -714,6 +758,8 @@ po::options_description desc("Allowed options"); desc.add_options() ("help,h", "show this help message") + ("debug-threads,d",po::bool_switch(&debug_threads), + "enable context switch debug outputs") ("verbose,v", "enable periodic printing of txn processing progress") ("epoll,e", po::bool_switch(&use_epoll), "use epoll (select is used by default)") @@ -766,10 +812,14 @@ if (use_epoll) check0x(st_set_eventsys(ST_EVENTSYS_ALT)); check0x(st_init()); st_spawn(bind(handle_sig_sync)); + if (debug_threads) { + st_set_switch_in_cb(cb); + } // Initialize thread manager for clean shutdown of all threads. thread_eraser eraser; threads.insert(st_thread_self()); + threadnames[st_thread_self()] = "main"; // Which role are we? if (is_leader) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-11 19:40:23
|
Revision: 1174 http://assorted.svn.sourceforge.net/assorted/?rev=1174&view=rev Author: yangzhang Date: 2009-02-11 19:40:15 +0000 (Wed, 11 Feb 2009) Log Message: ----------- - added txn batching and --batch-size (still need to do response batching) - cleaned up thread profiling - added bcastmsg_fake to do everything but the actual st_write calls - added and switched to bcastmsg_async to delegate bcasting to separate (bcaster) thread - replaced map with unordered_map; using mii in more places - changed response_handler into an object to reduce the amount of copying overhead in constructing the `finally` functor in the loop - fixed some warnings that were only exposed on optimization (uninitialized values for before_read/before_write) - replaced all i++ with ++i - disabling boost::thread for now - added --suppress-txn-msgs Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-11 19:26:20 UTC (rev 1173) +++ ydb/trunk/src/main.lzz.clamp 2009-02-11 19:40:15 UTC (rev 1174) @@ -7,7 +7,7 @@ #include <boost/range/iterator_range.hpp> #include <boost/scoped_array.hpp> #include <boost/shared_ptr.hpp> -#include <boost/thread.hpp> +//#include <boost/thread.hpp> #include <commons/nullptr.h> #include <commons/rand.h> #include <commons/st/st.h> @@ -24,6 +24,7 @@ #include <set> #include <sys/socket.h> // getpeername #include <sys/types.h> // ssize_t +#include <tr1/unordered_map> #include <unistd.h> // pipe, write #include <vector> #include "ydb.pb.h" @@ -33,19 +34,28 @@ using namespace commons; using namespace std; using namespace testing; +using namespace tr1; + +#define GETMSG(buf) \ +checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ +if (stop_time != nullptr) \ + *stop_time = current_time_millis(); \ +check(msg.ParseFromArray(buf, len)); #end +#define map_t unordered_map typedef pair<int, int> pii; -typedef 
map<int, int> mii; +typedef map_t<int, int> mii; // Configuration. st_utime_t timeout; int chkpt, accept_joiner_seqno, issuing_interval, min_ops, max_ops, - stop_on_seqno; + stop_on_seqno, batch_size; size_t accept_joiner_size; bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, - debug_threads, multirecover, disk, debug_memory, use_wal; + debug_threads, multirecover, disk, debug_memory, use_wal, + suppress_txn_msgs; long long timelim, read_thresh, write_thresh; // Control. @@ -56,6 +66,15 @@ int updates; /** + * Convenience function for calculating percentages. + */ +double +pct(double sub, double tot) +{ + return 100 * sub / tot; +} + +/** * The list of all threads. Keep track of these so that we may cleanly shut * down all threads. */ @@ -223,6 +242,134 @@ }; /** + * XXX + */ +template<typename T> +void +bcastmsg_fake(const vector<st_netfd_t> &dsts, const T & msg) +{ + // Serialize message to a buffer. + string s; + check(msg.SerializeToString(&s)); + const char *buf = s.c_str(); + + if (s.size() > 1000000) + cout << "sending large message to " << dsts.size() << " dsts, size = " + << s.size() << " bytes" << endl; + + // Prefix the message with a four-byte length. + uint32_t len = htonl(static_cast<uint32_t>(s.size())); + + // Broadcast the length-prefixed message to replicas. + int dstno = 0; + foreach (st_netfd_t dst, dsts) { + size_t resid = sizeof len; +#define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y)) + int res = true ? 
0 : st_write_resid(dst, static_cast<void*>(&len), &resid, timeout); + long long before_write = -1; + if (write_thresh > 0) { + before_write = current_time_millis(); + } + if (res == -1 && errno == ETIME) { + checksize(st_write(dst, + reinterpret_cast<char*>(&len) + sizeof len - resid, + resid, + ST_UTIME_NO_TIMEOUT), + resid); + } else { + check0x(res); + } + if (write_thresh > 0) { + long long write_time = current_time_millis() - before_write; + if (write_time > write_thresh) { + cout << "thread " << threadname() + << ": write to dst #" << dstno + << " took " << write_time << " ms" << endl; + } + } + if (false) + checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), + s.size()); + ++dstno; + } +} + +st_channel<shared_ptr<string> > msgs; + +const vector<st_netfd_t> *gdsts; + +/** + * XXX + */ +void +bcaster() +{ + int counter = 0; + while (!kill_hub) { + shared_ptr<string> p; + { + st_intr intr(kill_hub); + p = msgs.take(); + } + if (p.get() == nullptr) break; + string &s = *p.get(); + + int dstno = 0; + foreach (st_netfd_t dst, *gdsts) { + long long before_write = -1; + if (write_thresh > 0) { + before_write = current_time_millis(); + } + + checksize(st_write(dst, s.data(), s.size(), ST_UTIME_NO_TIMEOUT), + s.size()); + + if (write_thresh > 0) { + long long write_time = current_time_millis() - before_write; + if (write_time > write_thresh) { + cout << "thread " << threadname() + << ": write #" << counter + << " of size " << s.size() + << " bytes to dst #" << dstno + << " took " << write_time << " ms" << endl; + } + } + ++dstno; + } + ++counter; + } +} + +/** + * XXX + */ +template<typename T> +void +bcastmsg_async(const vector<st_netfd_t> &dsts, const T & msg) +{ + gdsts = &dsts; + + // Serialize message to a buffer. 
+ uint32_t len; + shared_ptr<string> p(new string(sizeof len, '\0')); + string &s = *p.get(); + check(msg.AppendToString(&s)); + + if (s.size() > 1000000) + cout << "sending large message to " << dsts.size() << " dsts, size = " + << s.size() << " bytes" << endl; + + // Prefix the message with a four-byte length. + len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); + char *plen = reinterpret_cast<char*>(&len); + for (size_t i = 0; i < sizeof len; ++i) + s[i] = plen[i]; + + msgs.push(p); +} + + +/** * Send a message to some destinations (sequentially). */ template<typename T> @@ -247,7 +394,7 @@ size_t resid = sizeof len; #define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y)) int res = st_write_resid(dst, static_cast<void*>(&len), &resid, timeout); - long long before_write; + long long before_write = -1; if (write_thresh > 0) { before_write = current_time_millis(); } @@ -270,7 +417,7 @@ } checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), s.size()); - dstno++; + ++dstno; } } @@ -324,12 +471,6 @@ *start_time = current_time_millis(); len = ntohl(len); -#define GETMSG(buf) \ - checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ - if (stop_time != nullptr) \ - *stop_time = current_time_millis(); \ - check(msg.ParseFromArray(buf, len)); - // Parse the message body. if (len < 4096) { char buf[len]; @@ -383,7 +524,7 @@ }; // Globals -map<int, int> g_map; +mii g_map; wal *g_wal; /** @@ -393,10 +534,15 @@ issue_txns(st_channel<replica_info> &newreps, int &seqno, st_bool &accept_joiner) { +#define bcastmsg bcastmsg_async Op_OpType types[] = {Op::read, Op::write, Op::del}; vector<st_netfd_t> fds; long long start_time = current_time_millis(); +#if bcastmsg == bcastmsg_async + st_joining join_bcaster(my_spawn(bcaster, "bcaster")); +#endif + finally f(lambda () { showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno), 0); @@ -408,9 +554,9 @@ // empty/default Txn). 
if (!newreps.empty() && seqno > 0) { if (multirecover) { - bcastmsg(fds, Txn()); + bcastmsg(fds, TxnBatch()); } else { - sendmsg(fds[0], Txn()); + sendmsg(fds[0], TxnBatch()); } } // Bring in any new members. @@ -418,58 +564,77 @@ fds.push_back(newreps.take().fd()); } - // Generate a random transaction. - Txn txn; - txn.set_seqno(seqno); - int count = randint(min_ops, max_ops + 1); - for (int o = 0; o < count; o++) { - Op *op = txn.add_op(); - int rtype = general_txns ? randint(3) : 1, rkey = randint(), rvalue = randint(); - op->set_type(types[rtype]); - op->set_key(rkey); - op->set_value(rvalue); - } + // Generate some random transactions. + TxnBatch batch; + for (int t = 0; t < batch_size; ++t) { + Txn &txn = *batch.add_txn(); + txn.set_seqno(seqno); + int count = randint(min_ops, max_ops + 1); + for (int o = 0; o < count; ++o) { + Op *op = txn.add_op(); + int rtype = general_txns ? randint(3) : 1, + rkey = randint(), + rvalue = randint(); + op->set_type(types[rtype]); + op->set_key(rkey); + op->set_value(rvalue); + } - if (do_pause) do_pause.waitreset(); + // Process immediately if not bcasting. + if (fds.empty()) { + --seqno; + process_txn(nullptr, g_map, txn, seqno, true); + } + ++seqno; - // Process, or broadcast and increment seqno. - if (fds.empty()) { - int dummy_seqno = seqno - 1; - process_txn(nullptr, g_map, txn, dummy_seqno, true); - } else { - bcastmsg(fds, txn); - } + // Checkpoint. + if (txn.seqno() % chkpt == 0) { + if (verbose) + cout << "issued txn " << txn.seqno() << endl; + if (timelim > 0 && current_time_millis() - start_time > timelim) { + cout << "time's up; issued " << txn.seqno() << " txns in " << timelim + << " ms" << endl; + stop_hub.set(); + } + st_sleep(0); + } - // Checkpoint. 
- if (txn.seqno() % chkpt == 0) { - if (verbose) - cout << "issued txn " << txn.seqno() << endl; - if (timelim > 0 && current_time_millis() - start_time > timelim) { - cout << "time's up; issued " << txn.seqno() << " txns in " << timelim - << " ms" << endl; + // For debugging purposes. + if (issuing_interval > 0) { + st_sleep(issuing_interval); + } + + // Are we to accept a new joiner? + if (txn.seqno() == accept_joiner_seqno) { + accept_joiner.set(); + } + + // Set the stopping seqno. + if (txn.seqno() == stop_on_seqno) { + cout << "stopping on issue of seqno " << txn.seqno() << endl; stop_hub.set(); + break; } - st_sleep(0); } - if (issuing_interval > 0) { - st_sleep(issuing_interval); - } - if (txn.seqno() == accept_joiner_seqno) { - accept_joiner.set(); - } + // Broadcast. + if (!fds.empty() && !suppress_txn_msgs) + bcastmsg(fds, batch); - if (txn.seqno() == stop_on_seqno) { - cout << "stopping on issue of seqno " << txn.seqno() << endl; - stop_hub.set(); - } - - ++seqno; + // Pause? + if (do_pause) + do_pause.waitreset(); } - Txn txn; + // This means "The End." + TxnBatch batch; + Txn &txn = *batch.add_txn(); txn.set_seqno(-1); - bcastmsg(fds, txn); + bcastmsg(fds, batch); +#if bcastmsg == bcastmsg_any + msgs.push(shared_ptr<string>()); +#endif +#undef bcastmsg } /** @@ -477,7 +642,7 @@ * leader. 
*/ void -process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno, +process_txn(st_netfd_t leader, mii &map, const Txn &txn, int &seqno, bool caught_up) { wal &wal = *g_wal; @@ -486,14 +651,14 @@ res.set_seqno(txn.seqno()); res.set_caught_up(caught_up); seqno = txn.seqno(); - for (int o = 0; o < txn.op_size(); o++) { + for (int o = 0; o < txn.op_size(); ++o) { const Op &op = txn.op(o); const int key = op.key(); - ::map<int, int>::iterator it = map.find(key); + mii::iterator it = map.find(key); if (show_updates || count_updates) { if (it != map.end()) { if (show_updates) cout << "existing key: " << key << endl; - if (count_updates) updates++; + if (count_updates) ++updates; } } switch (op.type()) { @@ -581,7 +746,7 @@ * \param[in] wal The WAL. */ void -process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, +process_txns(st_netfd_t leader, mii &map, int &seqno, st_channel<shared_ptr<Recovery> > &send_states, st_channel<shared_ptr<Txn> > &backlog, int init_seqno, int mypos, int nnodes) @@ -600,8 +765,7 @@ showtput("processed", now, __ref(start_time), __ref(seqno), __ref(init_seqno)); if (!__ref(caught_up)) { - cout << "live-processing: never entered this phase (never caught up)" << - endl; + cout << "live-processing: never entered this phase (never caught up)" << endl; } else { showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), __ref(seqno_caught_up)); @@ -609,188 +773,229 @@ __ref(send_states).push(shared_ptr<Recovery>()); }); - while (true) { - Txn txn; - long long before_read; - if (read_thresh > 0) { - before_read = current_time_millis(); - } - { - st_intr intr(stop_hub); - readmsg(leader, txn); - } - if (read_thresh > 0) { - long long read_time = current_time_millis() - before_read; - if (read_time > read_thresh) { - cout << "thread " << threadname() - << ": read took " << read_time << " ms" << endl; + class break_exception : public std::exception {}; + + try { + while (true) { + TxnBatch batch; + long long 
before_read = -1; + if (read_thresh > 0) { + before_read = current_time_millis(); } - } - if (txn.has_seqno()) { - // Regular transaction. - const char *action; - if (txn.seqno() < 0) { - break; - } else if (txn.seqno() == seqno + 1) { - if (!caught_up) { - time_caught_up = current_time_millis(); - seqno_caught_up = seqno; - showtput("process_txns caught up; backlogged", - time_caught_up, start_time, seqno_caught_up, - first_seqno == -1 ? init_seqno - 1 : first_seqno); - caught_up = true; + { + st_intr intr(stop_hub); + readmsg(leader, batch); + } + if (read_thresh > 0) { + long long read_time = current_time_millis() - before_read; + if (read_time > read_thresh) { + cout << "thread " << threadname() + << ": read took " << read_time << " ms" << endl; } - process_txn(leader, map, txn, seqno, true); - action = "processed"; - } else { - if (first_seqno == -1) - first_seqno = txn.seqno(); - // Queue up for later processing once a snapshot has been received. - backlog.push(shared_ptr<Txn>(new Txn(txn))); - action = "backlogged"; } + if (batch.txn_size() > 0) { + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = batch.txn(t); + // Regular transaction. + const char *action; + if (txn.seqno() < 0) { + throw break_exception(); + } else if (txn.seqno() == seqno + 1) { + if (!caught_up) { + time_caught_up = current_time_millis(); + seqno_caught_up = seqno; + showtput("process_txns caught up; backlogged", + time_caught_up, start_time, seqno_caught_up, + first_seqno == -1 ? init_seqno - 1 : first_seqno); + caught_up = true; + } + process_txn(leader, map, txn, seqno, true); + action = "processed"; + } else { + if (first_seqno == -1) + first_seqno = txn.seqno(); + // Queue up for later processing once a snapshot has been received. 
+ backlog.push(shared_ptr<Txn>(new Txn(txn))); + action = "backlogged"; + } - if (txn.seqno() % chkpt == 0) { - if (verbose) { - cout << action << " txn " << txn.seqno() - << "; db size = " << map.size() - << "; seqno = " << seqno - << "; backlog.size = " << backlog.queue().size() << endl; + if (txn.seqno() % chkpt == 0) { + if (verbose) { + cout << action << " txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } + st_sleep(0); + } } - st_sleep(0); + } else { + // Empty (default) Txn means "generate a snapshot." + // TODO make this faster + shared_ptr<Recovery> recovery(new Recovery); + typedef ::map<int, int> mii_; + mii_ map_(map.begin(), map.end()); + mii_::const_iterator begin = + map_.lower_bound(multirecover ? interp(RAND_MAX, mypos, nnodes) : 0); + mii_::const_iterator end = multirecover && mypos < nnodes - 1 ? + map_.lower_bound(interp(RAND_MAX, mypos + 1, nnodes)) : map_.end(); + cout << "generating recovery over " << begin->first << ".." + << (end == map_.end() ? "end" : lexical_cast<string>(end->first)); + if (multirecover) + cout << " (node " << mypos << " of " << nnodes << ")"; + cout << endl; + long long start_snap = current_time_millis(); + foreach (const pii &p, make_iterator_range(begin, end)) { + Recovery_Pair *pair = recovery->add_pair(); + pair->set_key(p.first); + pair->set_value(p.second); + } + cout << "generating recovery took " + << current_time_millis() - start_snap << " ms" << endl; + recovery->set_seqno(seqno); + send_states.push(recovery); } - } else { - // Empty (default) Txn means "generate a snapshot." - shared_ptr<Recovery> recovery(new Recovery); - mii::const_iterator begin = - map.lower_bound(multirecover ? interp(RAND_MAX, mypos, nnodes) : 0); - mii::const_iterator end = multirecover && mypos < nnodes - 1 ? - map.lower_bound(interp(RAND_MAX, mypos + 1, nnodes)) : map.end(); - cout << "generating recovery over " << begin->first << ".." 
- << (end == map.end() ? "end" : lexical_cast<string>(end->first)); - if (multirecover) - cout << " (node " << mypos << " of " << nnodes << ")"; - cout << endl; - long long start_snap = current_time_millis(); - foreach (const pii &p, make_iterator_range(begin, end)) { - Recovery_Pair *pair = recovery->add_pair(); - pair->set_key(p.first); - pair->set_value(p.second); - } - cout << "generating recovery took " - << current_time_millis() - start_snap << " ms" << endl; - recovery->set_seqno(seqno); - send_states.push(recovery); } + } catch (break_exception &ex) { } } -/** - * Swallow replica responses. - */ -void -handle_responses(st_netfd_t replica, const int &seqno, int rid, - st_multichannel<long long> &recover_signals, bool caught_up) +class response_handler { - st_channel<long long> &sub = recover_signals.subscribe(); - long long start_time = current_time_millis(), - recovery_start_time = caught_up ? -1 : start_time, - recovery_end_time = -1; - int recovery_start_seqno = caught_up ? -1 : seqno, - recovery_end_seqno = -1; - int last_seqno = -1; - finally f(lambda () { - long long end_time = current_time_millis(); - if (__ref(recovery_end_time) > -1) { - cout << __ref(rid) << ": "; - showtput("after recovery, finished", end_time, __ref(recovery_end_time), - __ref(seqno), __ref(recovery_end_seqno)); - } - }); - while (true) { +public: + response_handler(st_netfd_t replica, const int &seqno, int rid, + st_multichannel<long long> &recover_signals, bool caught_up) + : + replica(replica), + seqno(seqno), + rid(rid), + recover_signals(recover_signals), + caught_up(caught_up), + sub(recover_signals.subscribe()), + start_time(current_time_millis()), + recovery_start_time(caught_up ? -1 : start_time), + recovery_end_time(-1), + recovery_start_seqno(caught_up ? -1 : seqno), + recovery_end_seqno(-1), + last_seqno(-1) + {} + + void run() { + //start_time = current_time_millis(); + //recovery_start_time = caught_up ? 
-1 : start_time; + //recovery_end_time = -1; + //recovery_start_seqno = caught_up ? -1 : seqno; + //recovery_end_seqno = -1; + //last_seqno = -1; + finally f(lambda () { - // TODO: convert the whole thing to an object so that we can have "scoped - // globals". - long long &recovery_start_time = __ref(recovery_start_time); - int &recovery_start_seqno = __ref(recovery_start_seqno); - long long &recovery_end_time = __ref(recovery_end_time); - int &recovery_end_seqno = __ref(recovery_end_seqno); - long long &start_time = __ref(start_time); - const int &seqno = __ref(seqno); - int &rid = __ref(rid); - st_channel<long long> &sub = __ref(sub); - // The first timestamp that comes down the subscription pipeline is the - // recovery start time, issued by the main thread. The second one is the - // recovery end time, issued by the response handler associated with the - // joiner. - if (recovery_start_time == -1 && !sub.empty()) { - recovery_start_time = sub.take(); - recovery_start_seqno = seqno; - cout << rid << ": "; - showtput("before recovery, finished", recovery_start_time, start_time, - recovery_start_seqno, 0); - } else if (recovery_end_time == -1 && !sub.empty()) { - recovery_end_time = sub.take(); - recovery_end_seqno = seqno; - cout << rid << ": "; - showtput("during recovery, finished", recovery_end_time, - recovery_start_time, recovery_end_seqno, recovery_start_seqno); + long long end_time = current_time_millis(); + if (__ref(recovery_end_time) > -1) { + cout << __ref(rid) << ": "; + showtput("after recovery, finished", end_time, __ref(recovery_end_time), + __ref(seqno), __ref(recovery_end_seqno)); } }); - Response res; - // Read the message, but correctly respond to interrupts so that we can - // cleanly exit (slightly tricky). - if (last_seqno + 1 == seqno) { - // Stop-interruptible in case we're already caught up. 
- try { - st_intr intr(stop_hub); + + while (true) { + finally f(boost::bind(&response_handler::loop_cleanup, this)); + + Response res; + // Read the message, but correctly respond to interrupts so that we can + // cleanly exit (slightly tricky). + if (last_seqno + 1 == seqno) { + // Stop-interruptible in case we're already caught up. + try { + st_intr intr(stop_hub); + readmsg(replica, res); + } catch (...) { // TODO: only catch interruptions + // This check on seqnos is OK for termination since the seqno will + // never grow again if stop_hub is set. + if (last_seqno + 1 == seqno) { + cout << rid << ": "; + cout << "clean stop; next expected seqno is " << seqno + << " (last seqno was " << last_seqno << ")" << endl; + break; + } else { + continue; + } + } + } else { + // Only kill-interruptible because we want a clean termination (want + // to get all the acks back). + st_intr intr(kill_hub); readmsg(replica, res); - } catch (...) { // TODO: only catch interruptions - // This check on seqnos is OK for termination since the seqno will - // never grow again if stop_hub is set. - if (last_seqno + 1 == seqno) { + } + // Determine if this response handler's host (the only joiner) has finished + // catching up. If it has, then broadcast a signal so that all response + // handlers will know about this event. + if (!caught_up && res.caught_up()) { + long long now = current_time_millis(), timediff = now - start_time; + caught_up = true; + recover_signals.push(now); + cout << rid << ": "; + cout << "recovering node caught up; took " + << timediff << " ms" << endl; + // This will cause the program to exit eventually, but cleanly, such that + // the recovery time will be set first, before the eventual exit (which + // may not even happen in the current iteration). 
+ if (stop_on_recovery) { + cout << "stopping on recovery" << endl; + stop_hub.set(); + } + } + if (res.seqno() % chkpt == 0) { + if (verbose) { cout << rid << ": "; - cout << "clean stop; next expected seqno is " << seqno - << " (last seqno was " << last_seqno << ")" << endl; - break; - } else { - continue; + cout << "got response " << res.seqno() << " from " << replica << endl; } + st_sleep(0); } - } else { - // Only kill-interruptible because we want a clean termination (want - // to get all the acks back). - st_intr intr(kill_hub); - readmsg(replica, res); + last_seqno = res.seqno(); } - // Determine if this response handler's host (the only joiner) has finished - // catching up. If it has, then broadcast a signal so that all response - // handlers will know about this event. - if (!caught_up && res.caught_up()) { - long long now = current_time_millis(), timediff = now - start_time; - caught_up = true; - recover_signals.push(now); + } + +private: + void loop_cleanup() { + // The first timestamp that comes down the subscription pipeline is the + // recovery start time, issued by the main thread. The second one is the + // recovery end time, issued by the response handler associated with the + // joiner. + if (recovery_start_time == -1 && !sub.empty()) { + recovery_start_time = sub.take(); + recovery_start_seqno = seqno; cout << rid << ": "; - cout << "recovering node caught up; took " - << timediff << " ms" << endl; - // This will cause the program to exit eventually, but cleanly, such that - // the recovery time will be set first, before the eventual exit (which - // may not even happen in the current iteration). 
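+ if (stop_on_recovery) { + cout << "stopping on recovery" << endl; + stop_hub.set(); + } + } + if (res.seqno() % chkpt == 0) { + if (verbose) { cout << rid << ": "; - cout << "clean stop; next expected seqno is " << seqno - << " (last seqno was " << last_seqno << ")" << endl; - break; - } else { - continue; + cout << "got response " << res.seqno() << " from " << replica << endl; } + st_sleep(0); } - } else { - // Only kill-interruptible because we want a clean termination (want - // to get all the acks back). - st_intr intr(kill_hub); - readmsg(replica, res); + last_seqno = res.seqno(); } - // Determine if this response handler's host (the only joiner) has finished - // catching up. If it has, then broadcast a signal so that all response - // handlers will know about this event. - if (!caught_up && res.caught_up()) { - long long now = current_time_millis(), timediff = now - start_time; - caught_up = true; - recover_signals.push(now); + } + +private: + void loop_cleanup() { + // The first timestamp that comes down the subscription pipeline is the + // recovery start time, issued by the main thread. The second one is the + // recovery end time, issued by the response handler associated with the + // joiner. + if (recovery_start_time == -1 && !sub.empty()) { + recovery_start_time = sub.take(); + recovery_start_seqno = seqno; cout << rid << ": "; - cout << "recovering node caught up; took " - << timediff << " ms" << endl; - // This will cause the program to exit eventually, but cleanly, such that - // the recovery time will be set first, before the eventual exit (which - // may not even happen in the current iteration). - if (stop_on_recovery) { - cout << "stopping on recovery" << endl; - stop_hub.set(); - } + showtput("before recovery, finished", recovery_start_time, start_time, + recovery_start_seqno, 0); + } else if (recovery_end_time == -1 && !sub.empty()) { + recovery_end_time = sub.take(); + recovery_end_seqno = seqno; + cout << rid << ": "; + showtput("during recovery, finished", recovery_end_time, + recovery_start_time, recovery_end_seqno, recovery_start_seqno); } - if (res.seqno() % chkpt == 0) { - if (verbose) { - cout << rid << ": "; - cout << "got response " << res.seqno() << " from " << replica << endl; - } - st_sleep(0); - } - last_seqno = res.seqno(); } + + st_netfd_t replica; + const int &seqno; + int rid; + st_multichannel<long long> &recover_signals; + bool caught_up; + st_channel<long long> &sub; + long long start_time, recovery_start_time, recovery_end_time; + int recovery_start_seqno, recovery_end_seqno, last_seqno; +}; + +/** + * Swallow replica responses. + */ +void +handle_responses(st_netfd_t replica, const int &seqno, int rid, + st_multichannel<long long> &recover_signals, bool caught_up) +{ + response_handler h(replica, seqno, rid, recover_signals, caught_up); + h.run(); } /** @@ -807,7 +1012,7 @@ * from process_txns. */ void -recover_joiner(st_netfd_t listener, const map<int, int> &map, const int &seqno, +recover_joiner(st_netfd_t listener, const mii &map, const int &seqno, st_channel<shared_ptr<Recovery> > &send_states) { st_netfd_t joiner; @@ -858,7 +1063,7 @@ vector<replica_info> replicas; st_closing_all_infos close_replicas(replicas); cout << "waiting for at least " << minreps << " replicas to join" << endl; - for (int i = 0; i < minreps; i++) { + for (int i = 0; i < minreps; ++i) { st_netfd_t fd; { st_intr intr(stop_hub); @@ -943,13 +1148,13 @@ { if (disk) { // Disk IO threads. - for (int i = 0; i < 5; i++) { - thread somethread(threadfunc); + for (int i = 0; i < 5; ++i) { + //thread somethread(threadfunc); } } // Initialize database state.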
- map<int, int> map; + mii map; int seqno = -1; finally f(lambda () { cout << "REPLICA SUMMARY" << endl; @@ -994,7 +1199,7 @@ vector<st_netfd_t> replicas; st_closing_all close_replicas(replicas); int mypos = -1; - for (int i = 0; i < init.node_size(); i++) { + for (int i = 0; i < init.node_size(); ++i) { const SockAddr &sa = init.node(i); char buf[INET_ADDRSTRLEN]; in_addr host = { sa.host() }; @@ -1030,7 +1235,7 @@ vector<st_thread_t> recovery_builders; assert(seqno == -1); - for (int i = 0; i < (multirecover ? init.node_size() : 1); i++) { + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { recovery_builders.push_back(my_spawn(lambda() { // Read the recovery message. Recovery recovery; @@ -1046,7 +1251,7 @@ << build_start - __ref(before_recv) << " ms: xfer took " << receive_end - receive_start << " ms, deserialization took " << build_start - receive_end << " ms" << endl; - for (int i = 0; i < recovery.pair_size(); i++) { + for (int i = 0; i < recovery.pair_size(); ++i) { const Recovery_Pair &p = recovery.pair(i); __ref(map)[p.key()] = p.value(); if (i % chkpt == 0) { @@ -1183,6 +1388,8 @@ ("dump,D", po::bool_switch(&dump), "replicas should finally dump their state to a tmp file for " "inspection/diffing") + ("suppress-txn-msgs", po::bool_switch(&suppress_txn_msgs), + "suppress txn msgs") ("show-updates,U", po::bool_switch(&show_updates), "log operations that touch (update/read/delete) an existing key") ("count-updates,u",po::bool_switch(&count_updates), @@ -1195,7 +1402,9 @@ "run the leader (run replica by default)") ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), "exit after the joiner fully recovers (for leader only)") - ("exit-on-seqno,X", po::value<int>(&stop_on_seqno)->default_value(-1), + ("batch-size,b", po::value<int>(&batch_size)->default_value(10), + "number of txns to batch up in each msg (for leader only)") + ("exit-on-seqno,X",po::value<int>(&stop_on_seqno)->default_value(-1), "exit after txn seqno is issued (for leader 
only)") ("accept-joiner-size,s", po::value<size_t>(&accept_joiner_size)->default_value(0), @@ -1302,16 +1511,15 @@ foreach (entry p, threadtimes) { total += p.second; } - cout << "total " << total << " all " << all << endl; foreach (entry p, threadtimes) { cout << "- " << threadname(p.first) << ": " << p.second << " ms (" - << (static_cast<double>(p.second) / total) << "% of total, " - << (static_cast<double>(p.second) / all) << "% of all)" << endl; + << pct(p.second, total) << "% of total, " + << pct(p.second, all) << "% of all)" << endl; } - cout << "- total: " << total << " ms (" << double(total) / all + cout << "- total: " << total << " ms (" << pct(total, all) << "% of all)" << endl; cout << "- unaccounted: " << all - total << " ms (" - << double(all - total) / all << "% of all)" << endl; + << pct(all - total, all) << "% of all)" << endl; cout << "- all: " << all << " ms" << endl; } }); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-05 23:16:57
|
Revision: 1255 http://assorted.svn.sourceforge.net/assorted/?rev=1255&view=rev Author: yangzhang Date: 2009-03-05 23:16:46 +0000 (Thu, 05 Mar 2009) Log Message: ----------- added fast rb backlogging/catch-up Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-05 23:15:25 UTC (rev 1254) +++ ydb/trunk/src/main.lzz.clamp 2009-03-05 23:16:46 UTC (rev 1255) @@ -7,6 +7,7 @@ #include <boost/range/iterator_range.hpp> #include <boost/scoped_array.hpp> #include <boost/shared_ptr.hpp> +#include <boost/tuple/tuple.hpp> #include <commons/nullptr.h> #include <commons/rand.h> #include <commons/st/st.h> @@ -27,13 +28,15 @@ #include <tr1/unordered_map> #include <unistd.h> // pipe, write #include <vector> +#include "ser.h" #include "ydb.pb.h" -#include "ser.h" #define function boost::function #define foreach BOOST_FOREACH #define shared_ptr boost::shared_ptr #define ref boost::ref +#define tuple boost::tuple +#define make_tuple boost::make_tuple using namespace boost; using namespace boost::archive; @@ -53,6 +56,8 @@ typedef pair<int, int> pii; typedef map_t<int, int> mii; +typedef tuple<sized_array<char>, char*, char*> chunk; + template<typename T> void init_map(T &map) {} template<> void init_map(dense_hash_map<int, int> &map) { map.set_empty_key(-1); @@ -69,7 +74,7 @@ bool yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_pwal, use_twal, - use_pb, use_pb_res, + use_pb, use_pb_res, g_caught_up, suppress_txn_msgs, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; @@ -862,7 +867,8 @@ void process_txns(st_netfd_t leader, mii &map, int &seqno, st_channel<shared_ptr<Recovery> > &send_states, - st_channel<shared_ptr<pb::Txn> > &backlog, int init_seqno, + /* XXX 
st_channel<shared_ptr<pb::Txn> > &backlog */ + st_channel<chunk> &backlog, int init_seqno, int mypos, int nnodes) { typedef typename Types::TxnBatch TxnBatch; @@ -880,7 +886,8 @@ // issued more since the Init message). int first_seqno = -1; - commons::array<char> rbuf(read_buf_size), wbuf(buf_size); + sized_array<char> rbuf(new char[read_buf_size], read_buf_size); + commons::array<char> wbuf(buf_size); st_reader reader(leader, rbuf.get(), rbuf.size()); vector<st_netfd_t> leader_v(1, leader); writer w(lambda(const void *buf, size_t len) { @@ -910,85 +917,125 @@ scoped_ptr<ResponseBatch> presbatch(new_ResponseBatch<ResponseBatch>(s)); ResponseBatch &resbatch = *presbatch; ser_t serbuf; + char *first_start = reader.start(); + assert(first_start == rbuf.get()); + const size_t headerlen = sizeof(uint32_t) + sizeof(short) + sizeof(int); while (true) { uint32_t prefix = 0; - long long before_read = -1; - if (read_thresh > 0) { - before_read = current_time_millis(); + char *start = reader.start(); + + // Will overflow on next few reads ("header")? 
+ if (reader.unread() + reader.rem() < headerlen) { + sized_array<char> buf(new char[read_buf_size], read_buf_size); + memcpy(buf.get(), reader.start(), reader.unread()); + swap(buf, reader.buf()); + reader.reset_range(reader.buf().get(), reader.buf().get() + reader.unread()); + backlog.push(make_tuple(buf, first_start, start)); + first_start = reader.start(); } - { + + if (Types::is_pb()) { + long long before_read = -1; + if (read_thresh > 0) { + before_read = current_time_millis(); + } + { + st_intr intr(stop_hub); + readmsg(reader, batch); + } + if (read_thresh > 0) { + long long read_time = current_time_millis() - before_read; + if (read_time > read_thresh) { + cout << "thread " << threadname() + << ": read took " << read_time << " ms" << endl; + } + } + } else { st_intr intr(stop_hub); - if (Types::is_pb()) readmsg(reader, batch); - else { prefix = reader.read<uint32_t>(); batch.Clear(); } + prefix = reader.read<uint32_t>(); + check(prefix < 10000); + batch.Clear(); } - if (read_thresh > 0) { - long long read_time = current_time_millis() - before_read; - if (read_time > read_thresh) { - cout << "thread " << threadname() - << ": read took " << read_time << " ms" << endl; - } - } + if (batch.txn_size() > 0) { - w.mark(); - resbatch.Clear(); - start_res(resbatch); - // XXX - //char *start = reader.start(); - //const Txn &first_txn = batch.txn(0); - //if (txn.seqno() < 0) { - //} else if (txn.seqno() == seqno + 1) { - //} else { - // // Skip entire message. - // reader. - //} - for (int t = 0; t < batch.txn_size(); ++t) { - // XXX const Txn &txn = t == 0 ? first_txn : batch.txn(t); - const Txn &txn = batch.txn(t); - // Regular transaction. - const char *action; - if (txn.seqno() < 0) { - throw break_exception(); - } else if (txn.seqno() == seqno + 1) { - if (!caught_up) { - time_caught_up = current_time_millis(); - seqno_caught_up = seqno; - showtput("process_txns caught up; backlogged", - time_caught_up, start_time, seqno_caught_up, - first_seqno == -1 ? 
init_seqno - 1 : first_seqno); - caught_up = true; - } + const Txn &first_txn = batch.txn(0); + if (first_txn.seqno() < 0) { + break; + } else if (first_txn.seqno() > seqno + 1) { + // In backlogging mode? + + // Skip entire message, pushing it to the thread that's handling + // recovery for later processing once snapshot is received. + // TODO: implement and use anchors instead? + if (first_seqno == -1) + cout << "first seqno: " << (first_seqno = first_txn.seqno()) << endl; + + // Caught up? + if (first_seqno == seqno + 1) { + // Rewind so we process accumulated messages. + reader.reset_range(first_start, reader.end()); + continue; + } + + // About to overflow? + if (reader.unread() + reader.rem() < prefix + sizeof(uint32_t) - headerlen) { + // Move current partial message to new buffer. + sized_array<char> tmp(new char[read_buf_size], read_buf_size); + *reinterpret_cast<uint32_t*>(tmp.get()) = prefix; + *reinterpret_cast<short*>(tmp.get() + sizeof(uint32_t)) = short(batch.txn_size()); + *reinterpret_cast<int*>(tmp.get() + sizeof(uint32_t) + sizeof(short)) = first_txn.seqno(); + memcpy(tmp.get() + headerlen, reader.start(), reader.unread()); + + // Swap the buffers. + swap(tmp, reader.buf()); + reader.reset_range(reader.buf().get() + headerlen, reader.buf().get() + headerlen + reader.unread()); + assert(tmp.get() <= first_start && first_start < tmp.end()); + assert(tmp.get() < start && start < tmp.end()); + assert(first_start < start); + backlog.push(make_tuple(tmp, first_start, start)); + first_start = reader.buf().get(); + first_seqno = first_txn.seqno(); + } + + // Fill up rest of the message + assert(reader.unread() + reader.rem() >= prefix + sizeof(uint32_t) - headerlen); + check0x(reader.accum(prefix + sizeof(uint32_t) - headerlen)); + } else { + // Regular transaction batch. 
+ if (!caught_up) { + time_caught_up = current_time_millis(); + seqno_caught_up = seqno; + showtput("process_txns caught up; backlogged", + time_caught_up, start_time, seqno_caught_up, + first_seqno == -1 ? init_seqno - 1 : first_seqno); + caught_up = true; + } + w.mark(); + resbatch.Clear(); + start_res(resbatch); + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = t == 0 ? first_txn : batch.txn(t); Response *res = resbatch.add_res(); process_txn<Types, RTypes>(map, txn, seqno, res); if (fake_exec && !Types::is_pb()) { reader.skip(txn.op_size() * Op_Size); } - action = "processed"; - } else { - if (first_seqno == -1) - first_seqno = txn.seqno(); - // Queue up entire buffer for later processing once a snapshot has - // been received. - // XXX backlog.push(array()); - // Stop the loop. - // XXX t = batch.txn_size(); - backlog.push(to_pb_Txn(txn)); - action = "backlogged"; - } - if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); - if (check_interval(txn.seqno(), process_display)) { - cout << action << " txn " << txn.seqno() - << "; db size = " << map.size() - << "; seqno = " << seqno - << "; backlog.size = " << backlog.queue().size() << endl; + if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); + if (check_interval(txn.seqno(), process_display)) { + cout << "processed txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } } + fin_res(resbatch); + if (RTypes::is_pb() && resbatch.res_size() > 0) { + serbuf.clear(); + ser(serbuf, resbatch); + sendbuf(leader, serbuf); + } } - fin_res(resbatch); - if (RTypes::is_pb() && resbatch.res_size() > 0) { - serbuf.clear(); - ser(serbuf, resbatch); - sendbuf(leader, serbuf); - } } else if (multirecover || mypos == 0) { // Empty (default) TxnBatch means "generate a snapshot." // TODO make this faster @@ -1439,7 +1486,8 @@ } // Process txns. 
- st_channel<shared_ptr<pb::Txn> > backlog; + // XXX st_channel<shared_ptr<pb::Txn> > backlog; + st_channel<chunk> backlog; const function<void()> process_fn = bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno), ref(send_states), ref(backlog), init.txnseqno(), mypos, @@ -1497,7 +1545,51 @@ long long mid_time = current_time_millis(); int mid_seqno = seqno; + // XXX + using msg::TxnBatch; + using msg::Txn; + commons::array<char> rbuf(0), wbuf(buf_size); + reader reader(nullptr, rbuf.get(), rbuf.size()); + writer writer(lambda(const void*, size_t) { + throw operation_not_supported("should not be writing responses during catch-up phase"); + }, wbuf.get(), wbuf.size()); + stream s(reader, writer); + TxnBatch batch(s); while (!backlog.empty()) { + chunk chunk = backlog.take(); + sized_array<char> &buf = chunk.get<0>(); + assert(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); + assert(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); + assert(chunk.get<1>() < chunk.get<2>()); + swap(buf, reader.buf()); + reader.reset_range(chunk.get<1>(), chunk.get<2>()); + while (reader.start() < reader.end()) { + char *start = reader.start(); + uint32_t prefix = reader.read<uint32_t>(); + assert(prefix < 10000); + assert(start + sizeof(uint32_t) + prefix <= reader.end()); + batch.Clear(); + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = batch.txn(t); + process_txn<rb_types, rb_types>(map, txn, seqno, nullptr); + if (fake_exec && !Types::is_pb()) { + reader.skip(txn.op_size() * Op_Size); + } + + if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); + if (check_interval(txn.seqno(), process_display)) { + cout << "caught up txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } + } + assert(start + sizeof(uint32_t) + prefix == reader.start()); + } + } + g_caught_up = true; +#if 0 + while (!backlog.empty()) { using pb::Txn; shared_ptr<Txn> p = 
backlog.take(); process_txn<pb_types, pb_types>(map, *p, seqno, nullptr); @@ -1511,6 +1603,7 @@ st_sleep(0); } } +#endif showtput("replayer caught up; from backlog replayed", current_time_millis(), mid_time, seqno, mid_seqno); } @@ -1786,6 +1879,8 @@ // Initialize the map. init_map(g_map); + cout << "pid " << getpid() << endl; + // Which role are we? if (is_leader) { if (use_pb) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-12 02:13:38
|
Revision: 1282 http://assorted.svn.sourceforge.net/assorted/?rev=1282&view=rev Author: yangzhang Date: 2009-03-12 02:13:21 +0000 (Thu, 12 Mar 2009) Log Message: ----------- - added pwal recovery (--rec-pwal) - added show_sockaddr for more useful debug outputs in timed writes - changed behavior of logging; by default does not clobber the log file, but writes instead to /dev/null - using timed writes in process_txns - added showdatarate - fixed recovery_start_seqno/recovery_end_seqno setting in response_handler Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-12 02:08:04 UTC (rev 1281) +++ ydb/trunk/src/main.lzz.clamp 2009-03-12 02:13:21 UTC (rev 1282) @@ -86,7 +86,7 @@ bool yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_pwal, use_twal, - use_pb, use_pb_res, g_caught_up, + use_pb, use_pb_res, g_caught_up, rec_pwal, suppress_txn_msgs, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; @@ -219,12 +219,32 @@ return t; } +char * +show_sockaddr(st_netfd_t fd) +{ + sockaddr_in sa; + socklen_t salen = sizeof sa; + check0x(getpeername(st_netfd_fileno(fd), + reinterpret_cast<sockaddr*>(&sa), + &salen)); + return inet_ntoa(sa.sin_addr); +} + +map<st_netfd_t, string> nfdnames; + +inline const string& +nfd2name(st_netfd_t fd) +{ + return nfdnames[fd]; +} + /** * Used by the leader to bookkeep information about replicas. */ class replica_info { public: + /** port is the replica's listen port, not the port bound to the fd socket. */ replica_info(st_netfd_t fd, uint16_t port) : fd_(fd), port_(port) {} st_netfd_t fd() const { return fd_; } /** The port on which the replica is listening. 
*/ @@ -404,7 +424,8 @@ long long write_time = current_time_millis() - before_write; if (write_time > write_thresh) { cout << "thread " << threadname() << " write of " << len - << " bytes took " << write_time << " ms" << endl; + << " bytes to dst " << show_sockaddr(dst) << " blocked for " + << write_time << " ms" << endl; } } } @@ -536,13 +557,15 @@ check(msg.ParseFromArray(src.read(len), len)); } +enum { op_del, op_write, op_commit }; + /** * ARIES write-ahead log. No undo logging necessary (no steal). */ class wal { public: - wal() : of("wal"), out(of) {} + wal(const string &fname) : of(fname.c_str()), out(of) {} template <typename T> void log(const T &msg) { ser(of, msg); } void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } @@ -562,7 +585,6 @@ out & op; } private: - enum { op_del, op_write, op_commit }; ofstream of; binary_oarchive out; }; @@ -600,8 +622,7 @@ else fn = lambda(const void *buf, size_t len) { foreach (st_netfd_t dst, __ref(fds)) - checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(len)); + st_timed_write(dst, buf, len); }; char *real_wbuf = newreps.empty() ? rbuf.get() : wbuf.get(); @@ -622,7 +643,8 @@ // Did we get a new member? If so, notify an arbitrary member (the first // one) to prepare to send recovery information (by sending an // empty/default Txn). 
- if (!newreps.empty() && seqno > 0) { + // XXX rec_pwal + if (!newreps.empty() && seqno > 0 && !rec_pwal) { start_txn(batch); fin_txn(batch); // TODO: verify that this made the catch-up stream more efficient, @@ -796,7 +818,21 @@ } void -showtput(const string &action, long long stop_time, long long start_time, +showdatarate(const char *action, streamoff len, long long time) +{ + cout << action << " of " << len << " bytes in " << time << " ms (" + << len / time / 1000 << " MB/s)" << endl; +} + +void +showdatarate(const char *action, size_t len, long long time) +{ + cout << action << " of " << len << " bytes in " << time << " ms (" + << len / time / 1000 << " MB/s)" << endl; +} + +void +showtput(const char *action, long long stop_time, long long start_time, int stop_count, int start_count) { long long time_diff = stop_time - start_time; @@ -918,7 +954,7 @@ char *start = reader.start(); // Will overflow on next few reads ("header")? - if (reader.unread() + reader.rem() < headerlen) { + if (!caught_up && reader.unread() + reader.rem() < headerlen) { sized_array<char> buf(new char[read_buf_size], read_buf_size); memcpy(buf.get(), reader.start(), reader.unread()); swap(buf, reader.buf()); @@ -1108,11 +1144,14 @@ << hdr.count << " slots (" << bodylen << " bytes); range is [" << begin << ".." 
<< end << "]; seqno is " << hdr.seqno << endl; + long long start_time = current_time_millis(); commons::array<char> recovery(sizeof(size_t) + sizeof hdr + bodylen); raw_writer ser(recovery.begin()); ser.write(recovery.size()); ser.write(hdr); memcpy(ser.ptr(), src.begin() + begin, bodylen); + showdatarate("serialized recovery", recovery.size(), + current_time_millis() - start_time); return recovery; } @@ -1206,12 +1245,12 @@ cout << rseqno << last_seqno << res.result_size() << " " << r << " " << res.result(r) << endl; } if (!caught_up && rcaught_up) { - long long now = current_time_millis(), timediff = now - start_time; + long long now = current_time_millis(), time_diff = now - start_time; caught_up = true; recover_signals.push(now); cout << rid << ": "; cout << "recovering node caught up; took " - << timediff << " ms" << endl; + << time_diff << " ms" << endl; // This will cause the program to exit eventually, but cleanly, such that // the recovery time will be set first, before the eventual exit (which // may not even happen in the current iteration). @@ -1244,13 +1283,13 @@ // joiner. 
if (recovery_start_time == -1 && !sub.empty()) { recovery_start_time = sub.take(); - recovery_start_seqno = seqno; + recovery_start_seqno = last_seqno; cout << rid << ": "; showtput("before recovery, finished", recovery_start_time, start_time, recovery_start_seqno, 0); } else if (recovery_end_time == -1 && !sub.empty()) { recovery_end_time = sub.take(); - recovery_end_seqno = seqno; + recovery_end_seqno = last_seqno; cout << rid << ": "; showtput("during recovery, finished roughly", recovery_end_time, recovery_start_time, recovery_end_seqno, recovery_start_seqno); @@ -1332,7 +1371,7 @@ #if 0 sendmsg(joiner, *recovery); #endif - cout << "sent recovery in " << diff << " ms" << endl; + showdatarate("sent recovery", recovery.size(), diff); } void @@ -1354,8 +1393,8 @@ cout << "starting as leader" << endl; st_multichannel<long long> recover_signals; - scoped_ptr<wal> pwal(new wal); - g_wal = pwal.get(); + scoped_ptr<wal> twal(new wal(use_twal ? "twal" : "/dev/null")); + g_wal = twal.get(); // Wait until all replicas have joined. st_netfd_t listener = st_tcp_listen(leader_port); @@ -1539,8 +1578,11 @@ } } + // Initialize physical log. + scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); + g_wal = pwal.get(); + // Process txns. - // XXX st_channel<shared_ptr<pb::Txn> > backlog; st_channel<chunk> backlog; const function<void()> process_fn = bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno), @@ -1554,103 +1596,140 @@ try { // If there's anything to recover. if (init.txnseqno() > 0) { - cout << "waiting for recovery message" << (multirecover ? "s" : "") - << endl; - long long before_recv = current_time_millis(); + if (rec_pwal) { + // Recover from physical log. 
+ cout << "recovering from pwal" << endl; + long long start_time = current_time_millis(); + ifstream inf("pwal"); + binary_iarchive in(inf); + int rseqno = -1; + while (inf.peek() != ifstream::traits_type::eof()) { + int op; + in & op; + switch (op) { + case op_del: + { + int key; + in & key; + mii::iterator it = map.find(key); + map.erase(it); + break; + } + case op_write: + { + int key, val; + in & key & val; + map[key] = val; + break; + } + case op_commit: + ++rseqno; + break; + } + if (check_interval(rseqno, yield_interval)) st_sleep(0); + } + seqno = init.txnseqno() - 1; + showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); + cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; + } else { + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); - vector<st_thread_t> recovery_builders; - assert(seqno == -1); - bool first = true; - for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { - recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message length and header. - size_t len; - recovery_header hdr; - char buf[sizeof len + sizeof hdr]; - //try { - checkeqnneg(st_read_fully(__ref(replicas[i]), - buf, sizeof len + sizeof hdr, - ST_UTIME_NO_TIMEOUT), - ssize_t(sizeof len + sizeof hdr)); - //} catch (...) { // TODO just catch "Connection reset by peer" - //return; - //} - raw_reader rdr(buf); - rdr.read(len); - rdr.read(hdr); - check(hdr.seqno >= 0); + vector<st_thread_t> recovery_builders; + assert(seqno == -1); + bool first = true; + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. 
+ size_t len; + recovery_header hdr; + char buf[sizeof len + sizeof hdr]; + //try { + checkeqnneg(st_read_fully(__ref(replicas[i]), + buf, sizeof len + sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof len + sizeof hdr)); + //} catch (...) { // TODO just catch "Connection reset by peer" + //return; + //} + raw_reader rdr(buf); + rdr.read(len); + rdr.read(hdr); + check(hdr.seqno >= 0); - // Resize the table if necessary. - commons::array<entry> &table = __ref(map).get_table(); - if (!__ref(first)) { - checkeq(table.size(), hdr.total); - checkeq(__ref(map).size(), hdr.size); - } else { - __ref(map).set_size(hdr.size); - if (table.size() != hdr.total) { - table.reset(new entry[hdr.total], hdr.total); + // Resize the table if necessary. + commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(map).set_size(hdr.size); + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } } - } - // Receive straight into the table. - pair<size_t, size_t> range = - recovery_range(table.size(), __ctx(i), __ref(init).node_size()); - // Check that we agree on the number of entries. - checkeq(range.second - range.first, hdr.count); - // Check that the count is a power of two. - checkeq(hdr.count & (hdr.count - 1), size_t(0)); - size_t rangelen = sizeof(entry) * hdr.count; - // Read an extra char to ensure that we're at the EOF. - checkeqnneg(st_read_fully(__ref(replicas[i]), - table.begin() + range.first, rangelen + 1, - ST_UTIME_NO_TIMEOUT), - ssize_t(rangelen)); + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. 
+ checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. + long long start_time = current_time_millis(); + checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); + long long end_time = current_time_millis(); - long long tm = current_time_millis(); - if (__ref(seqno) != -1) - checkeq(__ref(seqno), hdr.seqno); - __ref(seqno) = hdr.seqno; - cout << "got recovery message of " << len << " bytes (" - << hdr.size << " records in " << hdr.count << " slots) in " - << tm - __ref(before_recv) << " ms; now at seqno " - << hdr.seqno << endl; + if (__ref(seqno) != -1) + checkeq(__ref(seqno), hdr.seqno); + __ref(seqno) = hdr.seqno; + showdatarate("got recovery message", len, end_time - start_time); + cout << "receive took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; #if 0 - Recovery recovery; - long long receive_start = 0, receive_end = 0; - size_t len = 0; - { - st_intr intr(stop_hub); - len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, - &receive_end); - } - long long build_start = current_time_millis(); - cout << "got recovery message of " << len << " bytes in " - << build_start - __ref(before_recv) << " ms: xfer took " - << receive_end - receive_start << " ms, deserialization took " - << build_start - receive_end << " ms" << endl; - for (int i = 0; i < recovery.pair_size(); ++i) { - const Recovery_Pair &p = recovery.pair(i); - __ref(map)[p.key()] = p.value(); - if (i % yield_interval == 0) { - if (yield_during_build_up) st_sleep(0); + Recovery recovery; + long long receive_start = 0, receive_end = 0; + size_t len = 0; + { + st_intr intr(stop_hub); + len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, + &receive_end); } - } - check(recovery.seqno() >= 0); - int seqno = __ref(seqno) = recovery.seqno(); - long long build_end = 
current_time_millis(); - cout << "receive and build-up took " - << build_end - __ref(before_recv) - << " ms; built up map of " << recovery.pair_size() - << " records in " << build_end - build_start - << " ms; now at seqno " << seqno << endl; + long long build_start = current_time_millis(); + cout << "got recovery message of " << len << " bytes in " + << build_start - __ref(before_recv) << " ms: xfer took " + << receive_end - receive_start << " ms, deserialization took " + << build_start - receive_end << " ms" << endl; + for (int i = 0; i < recovery.pair_size(); ++i) { + const Recovery_Pair &p = recovery.pair(i); + __ref(map)[p.key()] = p.value(); + if (i % yield_interval == 0) { + if (yield_during_build_up) st_sleep(0); + } + } + check(recovery.seqno() >= 0); + int seqno = __ref(seqno) = recovery.seqno(); + long long build_end = current_time_millis(); + cout << "receive and build-up took " + << build_end - __ref(before_recv) + << " ms; built up map of " << recovery.pair_size() + << " records in " << build_end - build_start + << " ms; now at seqno " << seqno << endl; #endif - }, "recovery_builder" + lexical_cast<string>(i))); + }, "recovery_builder" + lexical_cast<string>(i))); + } + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } } - foreach (st_thread_t t, recovery_builders) { - st_join(t); - } + long long mid_time = current_time_millis(); - int mid_seqno = seqno; // XXX using msg::TxnBatch; @@ -1678,6 +1757,7 @@ batch.Clear(); for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = batch.txn(t); + if (rec_pwal) seqno = txn.seqno() - 1; process_txn<rb_traits, rb_traits>(map, txn, seqno, nullptr); if (fake_exec && !Types::is_pb()) { reader.skip(txn.op_size() * Op_Size); @@ -1812,6 +1892,8 @@ "yield periodically during catch-up phase of recovery (for recoverer)") ("multirecover,m", po::bool_switch(&multirecover), "recover from multiple hosts, instead of just one (specified via leader)") + ("rec-pwal", po::bool_switch(&rec_pwal), + "recover from 
pwal") ("disk,k", po::bool_switch(&disk), "use disk-based recovery") ("dump,D", po::bool_switch(&dump), @@ -2026,9 +2108,3 @@ return 1; } } - -/* - * Compile-time options: - * - * - map, unordered_map, dense_hash_map - */ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-13 19:27:05
|
Revision: 1293 http://assorted.svn.sourceforge.net/assorted/?rev=1293&view=rev Author: yangzhang Date: 2009-03-13 19:26:56 +0000 (Fri, 13 Mar 2009) Log Message: ----------- integrated tpcc into ydb Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-13 19:26:26 UTC (rev 1292) +++ ydb/trunk/src/main.lzz.clamp 2009-03-13 19:26:56 UTC (rev 1293) @@ -1,4 +1,5 @@ #hdr +#define __STDC_FORMAT_MACROS #include <boost/archive/binary_iarchive.hpp> #include <boost/archive/binary_oarchive.hpp> #include <boost/bind.hpp> @@ -19,10 +20,11 @@ #include <csignal> // sigaction etc. #include <cstdio> #include <cstring> // strsignal -#include <iostream> #include <fstream> // ofstream #include <google/dense_hash_map> #include <gtest/gtest.h> +#include <inttypes.h> // PRId64 +#include <iostream> #include <malloc.h> #include <map> #include <netinet/in.h> // in_addr etc. @@ -33,6 +35,11 @@ #include <unistd.h> // pipe, write #include <vector> #include "ser.h" +#include "tpcc/clock.h" +#include "tpcc/randomgenerator.h" +#include "tpcc/tpccclient.h" +#include "tpcc/tpccgenerator.h" +#include "tpcc/tpcctables.h" #include "ydb.pb.h" #define function boost::function @@ -80,7 +87,7 @@ st_utime_t timeout; int yield_interval, accept_joiner_seqno, issuing_interval, min_ops, max_ops, stop_on_seqno, batch_size, handle_responses_display, - catch_up_display, issue_display, + catch_up_display, issue_display, nwarehouses, process_display; size_t accept_joiner_size, buf_size, read_buf_size; bool yield_during_build_up, yield_during_catch_up, dump, show_updates, @@ -1876,6 +1883,51 @@ } } +void +run_tpcc() +{ + TPCCTables* tables = new TPCCTables(); + SystemClock* clock = new SystemClock(); + + // Create a generator for filling the database. 
+ RealRandomGenerator* random = new RealRandomGenerator(); + NURandC cLoad = NURandC::makeRandom(random); + random->setC(cLoad); + + // Generate the data + printf("Loading %d warehouses... ", nwarehouses); + fflush(stdout); + char now[Clock::DATETIME_SIZE+1]; + clock->getDateTimestamp(now); + TPCCGenerator generator(random, now, Item::NUM_ITEMS, + District::NUM_PER_WAREHOUSE, + Customer::NUM_PER_DISTRICT, + NewOrder::INITIAL_NUM_PER_DISTRICT); + int64_t begin = clock->getMicroseconds(); + generator.makeItemsTable(tables); + for (int i = 0; i < nwarehouses; ++i) { + generator.makeWarehouse(tables, i+1); + } + int64_t end = clock->getMicroseconds(); + printf("%"PRId64" ms\n", (end-begin)/1000); + + // Change the constants for run + random = new RealRandomGenerator(); + random->setC(NURandC::makeRandomForRun(random, cLoad)); + + // Client owns all the parameters + TPCCClient client(clock, random, tables, Item::NUM_ITEMS, static_cast<int>(nwarehouses), + District::NUM_PER_WAREHOUSE, Customer::NUM_PER_DISTRICT); + printf("Running... "); + fflush(stdout); + begin = clock->getMicroseconds(); + for (int i = 0; i < 200000; ++i) { + client.doOne(); + } + end = clock->getMicroseconds(); + printf("%"PRId64" ms\n", (end-begin)/1000); +} + /** * Initialization and command-line parsing. */ @@ -1971,6 +2023,9 @@ ("max-ops,O", po::value<int>(&max_ops)->default_value(5), "upper bound on randomly generated number of operations per txn (for leader)") + ("warehouses,w", + po::value<int>(&nwarehouses)->default_value(1), + "number of warehouses to model") ("accept-joiner-seqno,j", po::value<int>(&accept_joiner_seqno)->default_value(0), "accept recovering joiner (start recovery) after this seqno (for leader " This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |