[Assorted-commits] SF.net SVN: assorted:[1174] ydb/trunk/src/main.lzz.clamp
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-02-11 19:40:23
|
Revision: 1174 http://assorted.svn.sourceforge.net/assorted/?rev=1174&view=rev Author: yangzhang Date: 2009-02-11 19:40:15 +0000 (Wed, 11 Feb 2009) Log Message: ----------- - added txn batching and --batch-size (still need to do response batching) - cleaned up thread profiling - added bcastmsg_fake to do everything but the actual st_write calls - added and switched to bcastmsg_async to delegate bcasting to separate (bcaster) thread - replaced map with unordered_map; using mii in more places - changed response_handler into an object to reduce the amount of copying overhead in constructing the `finally` functor in the loop - fixed some warnings that were only exposed on optimization (uninitialized values for before_read/before_write) - replaced all i++ with ++i - disabling boost::thread for now - added --suppress-txn-msgs Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-11 19:26:20 UTC (rev 1173) +++ ydb/trunk/src/main.lzz.clamp 2009-02-11 19:40:15 UTC (rev 1174) @@ -7,7 +7,7 @@ #include <boost/range/iterator_range.hpp> #include <boost/scoped_array.hpp> #include <boost/shared_ptr.hpp> -#include <boost/thread.hpp> +//#include <boost/thread.hpp> #include <commons/nullptr.h> #include <commons/rand.h> #include <commons/st/st.h> @@ -24,6 +24,7 @@ #include <set> #include <sys/socket.h> // getpeername #include <sys/types.h> // ssize_t +#include <tr1/unordered_map> #include <unistd.h> // pipe, write #include <vector> #include "ydb.pb.h" @@ -33,19 +34,28 @@ using namespace commons; using namespace std; using namespace testing; +using namespace tr1; + +#define GETMSG(buf) \ +checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ +if (stop_time != nullptr) \ + *stop_time = current_time_millis(); \ +check(msg.ParseFromArray(buf, len)); #end +#define map_t unordered_map typedef pair<int, int> pii; -typedef 
map<int, int> mii; +typedef map_t<int, int> mii; // Configuration. st_utime_t timeout; int chkpt, accept_joiner_seqno, issuing_interval, min_ops, max_ops, - stop_on_seqno; + stop_on_seqno, batch_size; size_t accept_joiner_size; bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, - debug_threads, multirecover, disk, debug_memory, use_wal; + debug_threads, multirecover, disk, debug_memory, use_wal, + suppress_txn_msgs; long long timelim, read_thresh, write_thresh; // Control. @@ -56,6 +66,15 @@ int updates; /** + * Convenience function for calculating percentages. + */ +double +pct(double sub, double tot) +{ + return 100 * sub / tot; +} + +/** * The list of all threads. Keep track of these so that we may cleanly shut * down all threads. */ @@ -223,6 +242,134 @@ }; /** + * XXX + */ +template<typename T> +void +bcastmsg_fake(const vector<st_netfd_t> &dsts, const T & msg) +{ + // Serialize message to a buffer. + string s; + check(msg.SerializeToString(&s)); + const char *buf = s.c_str(); + + if (s.size() > 1000000) + cout << "sending large message to " << dsts.size() << " dsts, size = " + << s.size() << " bytes" << endl; + + // Prefix the message with a four-byte length. + uint32_t len = htonl(static_cast<uint32_t>(s.size())); + + // Broadcast the length-prefixed message to replicas. + int dstno = 0; + foreach (st_netfd_t dst, dsts) { + size_t resid = sizeof len; +#define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y)) + int res = true ? 
0 : st_write_resid(dst, static_cast<void*>(&len), &resid, timeout); + long long before_write = -1; + if (write_thresh > 0) { + before_write = current_time_millis(); + } + if (res == -1 && errno == ETIME) { + checksize(st_write(dst, + reinterpret_cast<char*>(&len) + sizeof len - resid, + resid, + ST_UTIME_NO_TIMEOUT), + resid); + } else { + check0x(res); + } + if (write_thresh > 0) { + long long write_time = current_time_millis() - before_write; + if (write_time > write_thresh) { + cout << "thread " << threadname() + << ": write to dst #" << dstno + << " took " << write_time << " ms" << endl; + } + } + if (false) + checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), + s.size()); + ++dstno; + } +} + +st_channel<shared_ptr<string> > msgs; + +const vector<st_netfd_t> *gdsts; + +/** + * XXX + */ +void +bcaster() +{ + int counter = 0; + while (!kill_hub) { + shared_ptr<string> p; + { + st_intr intr(kill_hub); + p = msgs.take(); + } + if (p.get() == nullptr) break; + string &s = *p.get(); + + int dstno = 0; + foreach (st_netfd_t dst, *gdsts) { + long long before_write = -1; + if (write_thresh > 0) { + before_write = current_time_millis(); + } + + checksize(st_write(dst, s.data(), s.size(), ST_UTIME_NO_TIMEOUT), + s.size()); + + if (write_thresh > 0) { + long long write_time = current_time_millis() - before_write; + if (write_time > write_thresh) { + cout << "thread " << threadname() + << ": write #" << counter + << " of size " << s.size() + << " bytes to dst #" << dstno + << " took " << write_time << " ms" << endl; + } + } + ++dstno; + } + ++counter; + } +} + +/** + * XXX + */ +template<typename T> +void +bcastmsg_async(const vector<st_netfd_t> &dsts, const T & msg) +{ + gdsts = &dsts; + + // Serialize message to a buffer. 
+ uint32_t len; + shared_ptr<string> p(new string(sizeof len, '\0')); + string &s = *p.get(); + check(msg.AppendToString(&s)); + + if (s.size() > 1000000) + cout << "sending large message to " << dsts.size() << " dsts, size = " + << s.size() << " bytes" << endl; + + // Prefix the message with a four-byte length. + len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); + char *plen = reinterpret_cast<char*>(&len); + for (size_t i = 0; i < sizeof len; ++i) + s[i] = plen[i]; + + msgs.push(p); +} + + +/** * Send a message to some destinations (sequentially). */ template<typename T> @@ -247,7 +394,7 @@ size_t resid = sizeof len; #define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y)) int res = st_write_resid(dst, static_cast<void*>(&len), &resid, timeout); - long long before_write; + long long before_write = -1; if (write_thresh > 0) { before_write = current_time_millis(); } @@ -270,7 +417,7 @@ } checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), s.size()); - dstno++; + ++dstno; } } @@ -324,12 +471,6 @@ *start_time = current_time_millis(); len = ntohl(len); -#define GETMSG(buf) \ - checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ - if (stop_time != nullptr) \ - *stop_time = current_time_millis(); \ - check(msg.ParseFromArray(buf, len)); - // Parse the message body. if (len < 4096) { char buf[len]; @@ -383,7 +524,7 @@ }; // Globals -map<int, int> g_map; +mii g_map; wal *g_wal; /** @@ -393,10 +534,15 @@ issue_txns(st_channel<replica_info> &newreps, int &seqno, st_bool &accept_joiner) { +#define bcastmsg bcastmsg_async Op_OpType types[] = {Op::read, Op::write, Op::del}; vector<st_netfd_t> fds; long long start_time = current_time_millis(); +#if bcastmsg == bcastmsg_async + st_joining join_bcaster(my_spawn(bcaster, "bcaster")); +#endif + finally f(lambda () { showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno), 0); @@ -408,9 +554,9 @@ // empty/default Txn). 
if (!newreps.empty() && seqno > 0) { if (multirecover) { - bcastmsg(fds, Txn()); + bcastmsg(fds, TxnBatch()); } else { - sendmsg(fds[0], Txn()); + sendmsg(fds[0], TxnBatch()); } } // Bring in any new members. @@ -418,58 +564,77 @@ fds.push_back(newreps.take().fd()); } - // Generate a random transaction. - Txn txn; - txn.set_seqno(seqno); - int count = randint(min_ops, max_ops + 1); - for (int o = 0; o < count; o++) { - Op *op = txn.add_op(); - int rtype = general_txns ? randint(3) : 1, rkey = randint(), rvalue = randint(); - op->set_type(types[rtype]); - op->set_key(rkey); - op->set_value(rvalue); - } + // Generate some random transactions. + TxnBatch batch; + for (int t = 0; t < batch_size; ++t) { + Txn &txn = *batch.add_txn(); + txn.set_seqno(seqno); + int count = randint(min_ops, max_ops + 1); + for (int o = 0; o < count; ++o) { + Op *op = txn.add_op(); + int rtype = general_txns ? randint(3) : 1, + rkey = randint(), + rvalue = randint(); + op->set_type(types[rtype]); + op->set_key(rkey); + op->set_value(rvalue); + } - if (do_pause) do_pause.waitreset(); + // Process immediately if not bcasting. + if (fds.empty()) { + --seqno; + process_txn(nullptr, g_map, txn, seqno, true); + } + ++seqno; - // Process, or broadcast and increment seqno. - if (fds.empty()) { - int dummy_seqno = seqno - 1; - process_txn(nullptr, g_map, txn, dummy_seqno, true); - } else { - bcastmsg(fds, txn); - } + // Checkpoint. + if (txn.seqno() % chkpt == 0) { + if (verbose) + cout << "issued txn " << txn.seqno() << endl; + if (timelim > 0 && current_time_millis() - start_time > timelim) { + cout << "time's up; issued " << txn.seqno() << " txns in " << timelim + << " ms" << endl; + stop_hub.set(); + } + st_sleep(0); + } - // Checkpoint. 
- if (txn.seqno() % chkpt == 0) { - if (verbose) - cout << "issued txn " << txn.seqno() << endl; - if (timelim > 0 && current_time_millis() - start_time > timelim) { - cout << "time's up; issued " << txn.seqno() << " txns in " << timelim - << " ms" << endl; + // For debugging purposes. + if (issuing_interval > 0) { + st_sleep(issuing_interval); + } + + // Are we to accept a new joiner? + if (txn.seqno() == accept_joiner_seqno) { + accept_joiner.set(); + } + + // Set the stopping seqno. + if (txn.seqno() == stop_on_seqno) { + cout << "stopping on issue of seqno " << txn.seqno() << endl; stop_hub.set(); + break; } - st_sleep(0); } - if (issuing_interval > 0) { - st_sleep(issuing_interval); - } - if (txn.seqno() == accept_joiner_seqno) { - accept_joiner.set(); - } + // Broadcast. + if (!fds.empty() && !suppress_txn_msgs) + bcastmsg(fds, batch); - if (txn.seqno() == stop_on_seqno) { - cout << "stopping on issue of seqno " << txn.seqno() << endl; - stop_hub.set(); - } - - ++seqno; + // Pause? + if (do_pause) + do_pause.waitreset(); } - Txn txn; + // This means "The End." + TxnBatch batch; + Txn &txn = *batch.add_txn(); txn.set_seqno(-1); - bcastmsg(fds, txn); + bcastmsg(fds, batch); +#if bcastmsg == bcastmsg_any + msgs.push(shared_ptr<string>()); +#endif +#undef bcastmsg } /** @@ -477,7 +642,7 @@ * leader. 
*/ void -process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno, +process_txn(st_netfd_t leader, mii &map, const Txn &txn, int &seqno, bool caught_up) { wal &wal = *g_wal; @@ -486,14 +651,14 @@ res.set_seqno(txn.seqno()); res.set_caught_up(caught_up); seqno = txn.seqno(); - for (int o = 0; o < txn.op_size(); o++) { + for (int o = 0; o < txn.op_size(); ++o) { const Op &op = txn.op(o); const int key = op.key(); - ::map<int, int>::iterator it = map.find(key); + mii::iterator it = map.find(key); if (show_updates || count_updates) { if (it != map.end()) { if (show_updates) cout << "existing key: " << key << endl; - if (count_updates) updates++; + if (count_updates) ++updates; } } switch (op.type()) { @@ -581,7 +746,7 @@ * \param[in] wal The WAL. */ void -process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, +process_txns(st_netfd_t leader, mii &map, int &seqno, st_channel<shared_ptr<Recovery> > &send_states, st_channel<shared_ptr<Txn> > &backlog, int init_seqno, int mypos, int nnodes) @@ -600,8 +765,7 @@ showtput("processed", now, __ref(start_time), __ref(seqno), __ref(init_seqno)); if (!__ref(caught_up)) { - cout << "live-processing: never entered this phase (never caught up)" << - endl; + cout << "live-processing: never entered this phase (never caught up)" << endl; } else { showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), __ref(seqno_caught_up)); @@ -609,188 +773,229 @@ __ref(send_states).push(shared_ptr<Recovery>()); }); - while (true) { - Txn txn; - long long before_read; - if (read_thresh > 0) { - before_read = current_time_millis(); - } - { - st_intr intr(stop_hub); - readmsg(leader, txn); - } - if (read_thresh > 0) { - long long read_time = current_time_millis() - before_read; - if (read_time > read_thresh) { - cout << "thread " << threadname() - << ": read took " << read_time << " ms" << endl; + class break_exception : public std::exception {}; + + try { + while (true) { + TxnBatch batch; + long long 
before_read = -1; + if (read_thresh > 0) { + before_read = current_time_millis(); } - } - if (txn.has_seqno()) { - // Regular transaction. - const char *action; - if (txn.seqno() < 0) { - break; - } else if (txn.seqno() == seqno + 1) { - if (!caught_up) { - time_caught_up = current_time_millis(); - seqno_caught_up = seqno; - showtput("process_txns caught up; backlogged", - time_caught_up, start_time, seqno_caught_up, - first_seqno == -1 ? init_seqno - 1 : first_seqno); - caught_up = true; + { + st_intr intr(stop_hub); + readmsg(leader, batch); + } + if (read_thresh > 0) { + long long read_time = current_time_millis() - before_read; + if (read_time > read_thresh) { + cout << "thread " << threadname() + << ": read took " << read_time << " ms" << endl; } - process_txn(leader, map, txn, seqno, true); - action = "processed"; - } else { - if (first_seqno == -1) - first_seqno = txn.seqno(); - // Queue up for later processing once a snapshot has been received. - backlog.push(shared_ptr<Txn>(new Txn(txn))); - action = "backlogged"; } + if (batch.txn_size() > 0) { + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = batch.txn(t); + // Regular transaction. + const char *action; + if (txn.seqno() < 0) { + throw break_exception(); + } else if (txn.seqno() == seqno + 1) { + if (!caught_up) { + time_caught_up = current_time_millis(); + seqno_caught_up = seqno; + showtput("process_txns caught up; backlogged", + time_caught_up, start_time, seqno_caught_up, + first_seqno == -1 ? init_seqno - 1 : first_seqno); + caught_up = true; + } + process_txn(leader, map, txn, seqno, true); + action = "processed"; + } else { + if (first_seqno == -1) + first_seqno = txn.seqno(); + // Queue up for later processing once a snapshot has been received. 
+ backlog.push(shared_ptr<Txn>(new Txn(txn))); + action = "backlogged"; + } - if (txn.seqno() % chkpt == 0) { - if (verbose) { - cout << action << " txn " << txn.seqno() - << "; db size = " << map.size() - << "; seqno = " << seqno - << "; backlog.size = " << backlog.queue().size() << endl; + if (txn.seqno() % chkpt == 0) { + if (verbose) { + cout << action << " txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } + st_sleep(0); + } } - st_sleep(0); + } else { + // Empty (default) Txn means "generate a snapshot." + // TODO make this faster + shared_ptr<Recovery> recovery(new Recovery); + typedef ::map<int, int> mii_; + mii_ map_(map.begin(), map.end()); + mii_::const_iterator begin = + map_.lower_bound(multirecover ? interp(RAND_MAX, mypos, nnodes) : 0); + mii_::const_iterator end = multirecover && mypos < nnodes - 1 ? + map_.lower_bound(interp(RAND_MAX, mypos + 1, nnodes)) : map_.end(); + cout << "generating recovery over " << begin->first << ".." + << (end == map_.end() ? "end" : lexical_cast<string>(end->first)); + if (multirecover) + cout << " (node " << mypos << " of " << nnodes << ")"; + cout << endl; + long long start_snap = current_time_millis(); + foreach (const pii &p, make_iterator_range(begin, end)) { + Recovery_Pair *pair = recovery->add_pair(); + pair->set_key(p.first); + pair->set_value(p.second); + } + cout << "generating recovery took " + << current_time_millis() - start_snap << " ms" << endl; + recovery->set_seqno(seqno); + send_states.push(recovery); } - } else { - // Empty (default) Txn means "generate a snapshot." - shared_ptr<Recovery> recovery(new Recovery); - mii::const_iterator begin = - map.lower_bound(multirecover ? interp(RAND_MAX, mypos, nnodes) : 0); - mii::const_iterator end = multirecover && mypos < nnodes - 1 ? - map.lower_bound(interp(RAND_MAX, mypos + 1, nnodes)) : map.end(); - cout << "generating recovery over " << begin->first << ".." 
- << (end == map.end() ? "end" : lexical_cast<string>(end->first)); - if (multirecover) - cout << " (node " << mypos << " of " << nnodes << ")"; - cout << endl; - long long start_snap = current_time_millis(); - foreach (const pii &p, make_iterator_range(begin, end)) { - Recovery_Pair *pair = recovery->add_pair(); - pair->set_key(p.first); - pair->set_value(p.second); - } - cout << "generating recovery took " - << current_time_millis() - start_snap << " ms" << endl; - recovery->set_seqno(seqno); - send_states.push(recovery); } + } catch (break_exception &ex) { } } -/** - * Swallow replica responses. - */ -void -handle_responses(st_netfd_t replica, const int &seqno, int rid, - st_multichannel<long long> &recover_signals, bool caught_up) +class response_handler { - st_channel<long long> &sub = recover_signals.subscribe(); - long long start_time = current_time_millis(), - recovery_start_time = caught_up ? -1 : start_time, - recovery_end_time = -1; - int recovery_start_seqno = caught_up ? -1 : seqno, - recovery_end_seqno = -1; - int last_seqno = -1; - finally f(lambda () { - long long end_time = current_time_millis(); - if (__ref(recovery_end_time) > -1) { - cout << __ref(rid) << ": "; - showtput("after recovery, finished", end_time, __ref(recovery_end_time), - __ref(seqno), __ref(recovery_end_seqno)); - } - }); - while (true) { +public: + response_handler(st_netfd_t replica, const int &seqno, int rid, + st_multichannel<long long> &recover_signals, bool caught_up) + : + replica(replica), + seqno(seqno), + rid(rid), + recover_signals(recover_signals), + caught_up(caught_up), + sub(recover_signals.subscribe()), + start_time(current_time_millis()), + recovery_start_time(caught_up ? -1 : start_time), + recovery_end_time(-1), + recovery_start_seqno(caught_up ? -1 : seqno), + recovery_end_seqno(-1), + last_seqno(-1) + {} + + void run() { + //start_time = current_time_millis(); + //recovery_start_time = caught_up ? 
-1 : start_time; + //recovery_end_time = -1; + //recovery_start_seqno = caught_up ? -1 : seqno; + //recovery_end_seqno = -1; + //last_seqno = -1; + finally f(lambda () { - // TODO: convert the whole thing to an object so that we can have "scoped - // globals". - long long &recovery_start_time = __ref(recovery_start_time); - int &recovery_start_seqno = __ref(recovery_start_seqno); - long long &recovery_end_time = __ref(recovery_end_time); - int &recovery_end_seqno = __ref(recovery_end_seqno); - long long &start_time = __ref(start_time); - const int &seqno = __ref(seqno); - int &rid = __ref(rid); - st_channel<long long> &sub = __ref(sub); - // The first timestamp that comes down the subscription pipeline is the - // recovery start time, issued by the main thread. The second one is the - // recovery end time, issued by the response handler associated with the - // joiner. - if (recovery_start_time == -1 && !sub.empty()) { - recovery_start_time = sub.take(); - recovery_start_seqno = seqno; - cout << rid << ": "; - showtput("before recovery, finished", recovery_start_time, start_time, - recovery_start_seqno, 0); - } else if (recovery_end_time == -1 && !sub.empty()) { - recovery_end_time = sub.take(); - recovery_end_seqno = seqno; - cout << rid << ": "; - showtput("during recovery, finished", recovery_end_time, - recovery_start_time, recovery_end_seqno, recovery_start_seqno); + long long end_time = current_time_millis(); + if (__ref(recovery_end_time) > -1) { + cout << __ref(rid) << ": "; + showtput("after recovery, finished", end_time, __ref(recovery_end_time), + __ref(seqno), __ref(recovery_end_seqno)); } }); - Response res; - // Read the message, but correctly respond to interrupts so that we can - // cleanly exit (slightly tricky). - if (last_seqno + 1 == seqno) { - // Stop-interruptible in case we're already caught up. 
- try { - st_intr intr(stop_hub); + + while (true) { + finally f(boost::bind(&response_handler::loop_cleanup, this)); + + Response res; + // Read the message, but correctly respond to interrupts so that we can + // cleanly exit (slightly tricky). + if (last_seqno + 1 == seqno) { + // Stop-interruptible in case we're already caught up. + try { + st_intr intr(stop_hub); + readmsg(replica, res); + } catch (...) { // TODO: only catch interruptions + // This check on seqnos is OK for termination since the seqno will + // never grow again if stop_hub is set. + if (last_seqno + 1 == seqno) { + cout << rid << ": "; + cout << "clean stop; next expected seqno is " << seqno + << " (last seqno was " << last_seqno << ")" << endl; + break; + } else { + continue; + } + } + } else { + // Only kill-interruptible because we want a clean termination (want + // to get all the acks back). + st_intr intr(kill_hub); readmsg(replica, res); - } catch (...) { // TODO: only catch interruptions - // This check on seqnos is OK for termination since the seqno will - // never grow again if stop_hub is set. - if (last_seqno + 1 == seqno) { + } + // Determine if this response handler's host (the only joiner) has finished + // catching up. If it has, then broadcast a signal so that all response + // handlers will know about this event. + if (!caught_up && res.caught_up()) { + long long now = current_time_millis(), timediff = now - start_time; + caught_up = true; + recover_signals.push(now); + cout << rid << ": "; + cout << "recovering node caught up; took " + << timediff << " ms" << endl; + // This will cause the program to exit eventually, but cleanly, such that + // the recovery time will be set first, before the eventual exit (which + // may not even happen in the current iteration). 
+ if (stop_on_recovery) { + cout << "stopping on recovery" << endl; + stop_hub.set(); + } + } + if (res.seqno() % chkpt == 0) { + if (verbose) { cout << rid << ": "; - cout << "clean stop; next expected seqno is " << seqno - << " (last seqno was " << last_seqno << ")" << endl; - break; - } else { - continue; + cout << "got response " << res.seqno() << " from " << replica << endl; } + st_sleep(0); } - } else { - // Only kill-interruptible because we want a clean termination (want - // to get all the acks back). - st_intr intr(kill_hub); - readmsg(replica, res); + last_seqno = res.seqno(); } - // Determine if this response handler's host (the only joiner) has finished - // catching up. If it has, then broadcast a signal so that all response - // handlers will know about this event. - if (!caught_up && res.caught_up()) { - long long now = current_time_millis(), timediff = now - start_time; - caught_up = true; - recover_signals.push(now); + } + +private: + void loop_cleanup() { + // The first timestamp that comes down the subscription pipeline is the + // recovery start time, issued by the main thread. The second one is the + // recovery end time, issued by the response handler associated with the + // joiner. + if (recovery_start_time == -1 && !sub.empty()) { + recovery_start_time = sub.take(); + recovery_start_seqno = seqno; cout << rid << ": "; - cout << "recovering node caught up; took " - << timediff << " ms" << endl; - // This will cause the program to exit eventually, but cleanly, such that - // the recovery time will be set first, before the eventual exit (which - // may not even happen in the current iteration). 
- if (stop_on_recovery) { - cout << "stopping on recovery" << endl; - stop_hub.set(); - } + showtput("before recovery, finished", recovery_start_time, start_time, + recovery_start_seqno, 0); + } else if (recovery_end_time == -1 && !sub.empty()) { + recovery_end_time = sub.take(); + recovery_end_seqno = seqno; + cout << rid << ": "; + showtput("during recovery, finished", recovery_end_time, + recovery_start_time, recovery_end_seqno, recovery_start_seqno); } - if (res.seqno() % chkpt == 0) { - if (verbose) { - cout << rid << ": "; - cout << "got response " << res.seqno() << " from " << replica << endl; - } - st_sleep(0); - } - last_seqno = res.seqno(); } + + st_netfd_t replica; + const int &seqno; + int rid; + st_multichannel<long long> &recover_signals; + bool caught_up; + st_channel<long long> &sub; + long long start_time, recovery_start_time, recovery_end_time; + int recovery_start_seqno, recovery_end_seqno, last_seqno; +}; + +/** + * Swallow replica responses. + */ +void +handle_responses(st_netfd_t replica, const int &seqno, int rid, + st_multichannel<long long> &recover_signals, bool caught_up) +{ + response_handler h(replica, seqno, rid, recover_signals, caught_up); + h.run(); } /** @@ -807,7 +1012,7 @@ * from process_txns. */ void -recover_joiner(st_netfd_t listener, const map<int, int> &map, const int &seqno, +recover_joiner(st_netfd_t listener, const mii &map, const int &seqno, st_channel<shared_ptr<Recovery> > &send_states) { st_netfd_t joiner; @@ -858,7 +1063,7 @@ vector<replica_info> replicas; st_closing_all_infos close_replicas(replicas); cout << "waiting for at least " << minreps << " replicas to join" << endl; - for (int i = 0; i < minreps; i++) { + for (int i = 0; i < minreps; ++i) { st_netfd_t fd; { st_intr intr(stop_hub); @@ -943,13 +1148,13 @@ { if (disk) { // Disk IO threads. - for (int i = 0; i < 5; i++) { - thread somethread(threadfunc); + for (int i = 0; i < 5; ++i) { + //thread somethread(threadfunc); } } // Initialize database state. 
- map<int, int> map; + mii map; int seqno = -1; finally f(lambda () { cout << "REPLICA SUMMARY" << endl; @@ -994,7 +1199,7 @@ vector<st_netfd_t> replicas; st_closing_all close_replicas(replicas); int mypos = -1; - for (int i = 0; i < init.node_size(); i++) { + for (int i = 0; i < init.node_size(); ++i) { const SockAddr &sa = init.node(i); char buf[INET_ADDRSTRLEN]; in_addr host = { sa.host() }; @@ -1030,7 +1235,7 @@ vector<st_thread_t> recovery_builders; assert(seqno == -1); - for (int i = 0; i < (multirecover ? init.node_size() : 1); i++) { + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { recovery_builders.push_back(my_spawn(lambda() { // Read the recovery message. Recovery recovery; @@ -1046,7 +1251,7 @@ << build_start - __ref(before_recv) << " ms: xfer took " << receive_end - receive_start << " ms, deserialization took " << build_start - receive_end << " ms" << endl; - for (int i = 0; i < recovery.pair_size(); i++) { + for (int i = 0; i < recovery.pair_size(); ++i) { const Recovery_Pair &p = recovery.pair(i); __ref(map)[p.key()] = p.value(); if (i % chkpt == 0) { @@ -1183,6 +1388,8 @@ ("dump,D", po::bool_switch(&dump), "replicas should finally dump their state to a tmp file for " "inspection/diffing") + ("suppress-txn-msgs", po::bool_switch(&suppress_txn_msgs), + "suppress txn msgs") ("show-updates,U", po::bool_switch(&show_updates), "log operations that touch (update/read/delete) an existing key") ("count-updates,u",po::bool_switch(&count_updates), @@ -1195,7 +1402,9 @@ "run the leader (run replica by default)") ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), "exit after the joiner fully recovers (for leader only)") - ("exit-on-seqno,X", po::value<int>(&stop_on_seqno)->default_value(-1), + ("batch-size,b", po::value<int>(&batch_size)->default_value(10), + "number of txns to batch up in each msg (for leader only)") + ("exit-on-seqno,X",po::value<int>(&stop_on_seqno)->default_value(-1), "exit after txn seqno is issued (for leader 
only)") ("accept-joiner-size,s", po::value<size_t>(&accept_joiner_size)->default_value(0), @@ -1302,16 +1511,15 @@ foreach (entry p, threadtimes) { total += p.second; } - cout << "total " << total << " all " << all << endl; foreach (entry p, threadtimes) { cout << "- " << threadname(p.first) << ": " << p.second << " ms (" - << (static_cast<double>(p.second) / total) << "% of total, " - << (static_cast<double>(p.second) / all) << "% of all)" << endl; + << pct(p.second, total) << "% of total, " + << pct(p.second, all) << "% of all)" << endl; } - cout << "- total: " << total << " ms (" << double(total) / all + cout << "- total: " << total << " ms (" << pct(total, all) << "% of all)" << endl; cout << "- unaccounted: " << all - total << " ms (" - << double(all - total) / all << "% of all)" << endl; + << pct(all - total, all) << "% of all)" << endl; cout << "- all: " << all << " ms" << endl; } }); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |