[Assorted-commits] SF.net SVN: assorted:[1239] ydb/trunk
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-02-26 18:32:56
Revision: 1239 http://assorted.svn.sourceforge.net/assorted/?rev=1239&view=rev Author: yangzhang Date: 2009-02-26 18:32:43 +0000 (Thu, 26 Feb 2009) Log Message: ----------- - fixed (single-node) recovery - removed verbose and added separate --*-display flags - refactored interval-checking - updated notes/todos Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-26 18:28:10 UTC (rev 1238) +++ ydb/trunk/README 2009-02-26 18:32:43 UTC (rev 1239) @@ -454,7 +454,6 @@ - tiny improvement - 1: 366K - 2: 360K -- TODO fix pb recovery - DONE figure out why there's such a dramatic slowdown as the DB grows - ydb @@ -470,6 +469,16 @@ - 5e7: 495K - there was an int overflow bug +- DONE fix pb recovery + - abysmal perf; long wait at the map dump + almost never catch up, but at + least it works + +- TODO speed up backlogging; don't create pb objects, just take buffers + +- TODO fix multi-recovery if necessary + +- TODO speed up map dump; don't use range partitioning, but hash partitioning + - TODO refactor st_reader, etc. to be generic opportunistic buffered readers - TODO see how streambuf read/write is actually implemented (whether it's too slow) Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-26 18:28:10 UTC (rev 1238) +++ ydb/trunk/src/main.lzz.clamp 2009-02-26 18:32:43 UTC (rev 1239) @@ -61,10 +61,12 @@ // Configuration. 
st_utime_t timeout; -int chkpt, accept_joiner_seqno, issuing_interval, min_ops, max_ops, - stop_on_seqno, batch_size, display_interval; +int yield_interval, accept_joiner_seqno, issuing_interval, min_ops, max_ops, + stop_on_seqno, batch_size, handle_responses_display, + catch_up_display, issue_display, + process_display; size_t accept_joiner_size, buf_size, read_buf_size; -bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, +bool yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_wal, use_pb, use_pb_res, suppress_txn_msgs, fake_bcast, force_ser, fake_exec; @@ -81,7 +83,10 @@ * Convenience function for calculating percentages. */ template<typename T> -double pct(T sub, T tot) { return 100 * double(sub) / double(tot); } +inline double pct(T sub, T tot) +{ + return 100 * double(sub) / double(tot); +} /** * Convenience class for performing long-jumping break. @@ -119,7 +124,7 @@ /** * Look up thread name, or just show thread ID. */ -string +inline string threadname(st_thread_t t = st_thread_self()) { if (threadnames.find(t) != threadnames.end()) { return threadnames[t]; @@ -131,7 +136,7 @@ /** * Debug function for thread names. Remember what we're switching from. */ -void +inline void switch_out_cb() { if (debug_threads) last_thread = st_thread_self(); @@ -142,7 +147,7 @@ /** * Debug function for thread names. Show what we're switching from/to. */ -void switch_in_cb() +inline void switch_in_cb() { if (debug_threads && last_thread != st_thread_self()) { cout << "switching"; @@ -303,7 +308,7 @@ } template<typename T> -void +inline void ser(ser_array &s, const T &msg) { int len = msg.ByteSize(); @@ -320,7 +325,7 @@ * Serialization. */ template<typename T> -void +inline void ser(ostream &s, const T &msg) { uint32_t len = htonl(uint32_t(msg.ByteSize())); @@ -492,7 +497,7 @@ * for avoiding unnecessary copies. 
*/ template <typename T> -T +inline T readmsg(st_netfd_t src, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) { T msg; @@ -505,7 +510,7 @@ * st_netfd_t. */ template <typename T> -void +inline void readmsg(st_reader &src, T & msg) { managed_array<char> a = src.read(sizeof(uint32_t)); @@ -602,11 +607,14 @@ // one) to prepare to send recovery information (by sending an // empty/default Txn). if (!newreps.empty() && seqno > 0) { - if (multirecover) { - bcastmsg(fds, batch); - } else { - sendmsg(fds[0], batch); + start_txn(batch); + fin_txn(batch); + w.mark(); + if (Types::is_pb()) { + if (multirecover) bcastmsg(fds, batch); + else sendmsg(fds[0], batch); } + batch.Clear(); } // Bring in any new members. // TODO more efficient: copy/extend/append @@ -642,15 +650,14 @@ } // Checkpoint. - if (seqno % chkpt == 0) { - if (verbose) - cout << "issued txn " << seqno << endl; + if (check_interval(seqno, yield_interval)) st_sleep(0); + if (check_interval(seqno, issue_display)) { + cout << "issued txn " << seqno << endl; if (timelim > 0 && current_time_millis() - start_time > timelim) { cout << "time's up; issued " << seqno << " txns in " << timelim << " ms" << endl; stop_hub.set(); } - st_sleep(0); } // For debugging purposes. 
@@ -809,14 +816,20 @@ } #end -template<typename Txn> shared_ptr<pb::Txn> to_pb_Txn(Txn txn); -template<> shared_ptr<pb::Txn> to_pb_Txn(pb::Txn txn) { +template<typename Txn> inline shared_ptr<pb::Txn> to_pb_Txn(Txn txn); +template<> inline shared_ptr<pb::Txn> to_pb_Txn(pb::Txn txn) { return shared_ptr<pb::Txn>(new pb::Txn(txn)); } -template<> shared_ptr<pb::Txn> to_pb_Txn(msg::Txn txn) { +template<> inline shared_ptr<pb::Txn> to_pb_Txn(msg::Txn txn) { shared_ptr<pb::Txn> ptxn(new pb::Txn()); ptxn->set_seqno(txn.seqno()); - // XXX FIXME + for (int o = 0; o < txn.op_size(); ++o) { + pb::Op *pop = ptxn->add_op(); + const msg::Op &op = txn.op(o); + pop->set_type(static_cast<Op_OpType>(op.type())); + pop->set_key(op.key()); + pop->set_value(op.value()); + } return ptxn; } @@ -946,14 +959,12 @@ action = "backlogged"; } - if (txn.seqno() % chkpt == 0) { - if (verbose) { - cout << action << " txn " << txn.seqno() - << "; db size = " << map.size() - << "; seqno = " << seqno - << "; backlog.size = " << backlog.queue().size() << endl; - } - st_sleep(0); + if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); + if (check_interval(txn.seqno(), process_display)) { + cout << action << " txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; } } fin_res(resbatch); @@ -962,9 +973,10 @@ ser(serbuf, batch); sendbuf(leader, serbuf); } - } else { + } else if (multirecover || mypos == 0) { // Empty (default) TxnBatch means "generate a snapshot." // TODO make this faster + cout << "generating recovery..." 
<< endl; shared_ptr<Recovery> recovery(new Recovery); typedef ::map<int, int> mii_; mii_ map_(map.begin(), map.end()); @@ -1090,13 +1102,15 @@ stop_hub.set(); } } - if (display_interval > 0 && rseqno % display_interval == 0 && rseqno > 0) { + if (check_interval(rseqno, handle_responses_display)) { cout << rid << ": " << "got response " << rseqno << " from " << replica << "; "; long long display_time = current_time_millis(); showtput("handling", display_time, last_display_time, rseqno, - rseqno - display_interval); + rseqno - handle_responses_display); last_display_time = display_time; + } + if (check_interval(rseqno, yield_interval)) { st_sleep(0); } last_seqno = rseqno; @@ -1439,7 +1453,7 @@ for (int i = 0; i < recovery.pair_size(); ++i) { const Recovery_Pair &p = recovery.pair(i); __ref(map)[p.key()] = p.value(); - if (i % chkpt == 0) { + if (i % yield_interval == 0) { if (yield_during_build_up) st_sleep(0); } } @@ -1462,14 +1476,17 @@ while (!backlog.empty()) { using pb::Txn; shared_ptr<Txn> p = backlog.take(); - process_txn<pb_types, pb_types>(map, *p, seqno, nullptr); - if (p->seqno() % chkpt == 0) { - if (verbose) + if (p->seqno() > seqno) { + process_txn<pb_types, pb_types>(map, *p, seqno, nullptr); + if (check_interval(p->seqno(), catch_up_display)) { cout << "processed txn " << p->seqno() << " off the backlog; " << "backlog.size = " << backlog.queue().size() << endl; - // Explicitly yield. (Note that yielding does still effectively - // happen anyway because process_txn is a yield point.) - st_sleep(0); + } + if (check_interval(p->seqno(), yield_interval)) { + // Explicitly yield. (Note that yielding does still effectively + // happen anyway because process_txn is a yield point.) 
+ st_sleep(0); + } } } showtput("replayer caught up; from backlog replayed", @@ -1483,6 +1500,12 @@ stop_hub.insert(st_thread_self()); } +inline bool +check_interval(int seqno, int interval) +{ + return interval > 0 && seqno % interval == interval - 1; +} + int sig_pipe[2]; /** @@ -1559,16 +1582,14 @@ "enable context switch debug outputs") ("profile-threads,q",po::bool_switch(&profile_threads), "enable profiling of threads") - ("verbose,v", po::bool_switch(&verbose), - "enable periodic printing of txn processing progress") ("epoll,e", po::bool_switch(&use_epoll), "use epoll (select is used by default)") ("yield-build-up", po::bool_switch(&yield_during_build_up), - "yield periodically during build-up phase of recovery (for recoverer only)") + "yield periodically during build-up phase of recovery (for recoverer)") ("yield-catch-up", po::bool_switch(&yield_during_catch_up), - "yield periodically during catch-up phase of recovery (for recoverer only)") + "yield periodically during catch-up phase of recovery (for recoverer)") ("multirecover,m", po::bool_switch(&multirecover), - "recover from multiple hosts, instead of just one (specified via leader only)") + "recover from multiple hosts, instead of just one (specified via leader)") ("disk,k", po::bool_switch(&disk), "use disk-based recovery") ("dump,D", po::bool_switch(&dump), @@ -1585,7 +1606,7 @@ ("count-updates,u",po::bool_switch(&count_updates), "count operations that touch (update/read/delete) an existing key") ("general-txns,g", po::bool_switch(&general_txns), - "issue read and delete transactions as well as the default of (only) insertion/update transactions (for leader only)") + "issue read and delete transactions as well as the default of (only) insertion/update transactions (for leader)") ("use-pb", po::bool_switch(&use_pb), "use protocol buffers instead of raw buffers for txns") ("use-pb-res", po::bool_switch(&use_pb_res), @@ -1597,26 +1618,36 @@ ("leader,l", po::bool_switch(&is_leader), "run the leader (run 
replica by default)") ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), - "exit after the joiner fully recovers (for leader only)") + "exit after the joiner fully recovers (for leader)") ("batch-size,b", po::value<int>(&batch_size)->default_value(100), - "number of txns to batch up in each msg (for leader only)") + "number of txns to batch up in each msg (for leader)") ("exit-on-seqno,X",po::value<int>(&stop_on_seqno)->default_value(-1), - "exit after txn seqno is issued (for leader only)") + "exit after txn seqno is issued (for leader)") ("accept-joiner-size,s", po::value<size_t>(&accept_joiner_size)->default_value(0), "accept recovering joiner (start recovery) after DB grows to this size " - "(for leader only)") - ("disp-interval", po::value<int>(&display_interval)->default_value(0), - "after this many txns, print current handling rate") - ("issuing-interval,i", + "(for leader)") + ("handle-responses-display", + po::value<int>(&handle_responses_display)->default_value(0), + "number of responses before printing current handling rate (for leader)") + ("catch-up-display", + po::value<int>(&catch_up_display)->default_value(0), + "number of catch-up txns before printing current recovery rate and queue length (for recoverer)") + ("issue-display", + po::value<int>(&issue_display)->default_value(0), + "number of txns before showing the current issue rate (for leader)") + ("process-display", + po::value<int>(&process_display)->default_value(0), + "number of txns before showing the current issue rate (for worker)") + ("issuing-interval", po::value<int>(&issuing_interval)->default_value(0), - "seconds to sleep between issuing txns (for leader only)") + "seconds to sleep between issuing txns (for leader)") ("min-ops,o", po::value<int>(&min_ops)->default_value(5), - "lower bound on randomly generated number of operations per txn (for leader only)") + "lower bound on randomly generated number of operations per txn (for leader)") ("max-ops,O", 
po::value<int>(&max_ops)->default_value(5), - "upper bound on randomly generated number of operations per txn (for leader only)") + "upper bound on randomly generated number of operations per txn (for leader)") ("accept-joiner-seqno,j", po::value<int>(&accept_joiner_seqno)->default_value(0), "accept recovering joiner (start recovery) after this seqno (for leader " @@ -1631,18 +1662,18 @@ "size of the incoming (read) buffer in bytes") ("write-buf", po::value<size_t>(&buf_size)->default_value(1e5), "size of the outgoing (write) buffer in bytes") - ("chkpt,c", po::value<int>(&chkpt)->default_value(1000), - "number of txns before yielding/verbose printing") + ("yield_interval,y", po::value<int>(&yield_interval)->default_value(10000), + "number of txns before yielding") ("timelim,T", po::value<long long>(&timelim)->default_value(0), "general network IO time limit in milliseconds, or 0 for none") ("write-thresh,w", po::value<long long>(&write_thresh)->default_value(200), - "if positive and any txn write exceeds this, then print a message (for replicas only)") + "if positive and any txn write exceeds this, then print a message") ("read-thresh,r", po::value<long long>(&read_thresh)->default_value(0), - "if positive and any txn read exceeds this, then print a message (for replicas only)") + "if positive and any txn read exceeds this, then print a message") ("listen-port,p", po::value<uint16_t>(&listen_port)->default_value(7654), - "port to listen on (replicas only)") + "port to listen on (for worker)") ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(200000), - "timeout for IO operations (in microseconds)") + "timeout for some IO operations that should actually time out (in microseconds)") ("test", "execute unit tests instead of running the normal system") ("minreps,n", po::value<int>(&minreps)->default_value(2), "minimum number of replicas the system is willing to process txns on"); This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.