[Assorted-commits] SF.net SVN: assorted:[1130] ydb/trunk
From: <yan...@us...> - 2009-01-14 18:26:15
Revision: 1130
          http://assorted.svn.sourceforge.net/assorted/?rev=1130&view=rev
Author:   yangzhang
Date:     2009-01-14 18:25:52 +0000 (Wed, 14 Jan 2009)

Log Message:
-----------
- added analysis.py for aggregating and plotting measurement results
  (collected from test.bash)
- added --dump for dumping the final state
- added --exit-on-recovery to make automated runs easier
- added --issuing-interval for debugging
- fixed a bug where response_handler hangs forever if the sequence numbers
  happen to be caught up; very visible with --issuing-interval
- added scaling, full-scaling, full-yield, full-block, and much more to
  test.bash
- general refactoring of test.bash
- documented the measurement/analysis tools, tool requirements, more usage
  notes in general, and personal TODOs/notes
- added timestamping to tagssh
- fixed a bug where process_txn throws a timeout exception while trying to
  sendmsg
- added --read-thresh for debugging time spent waiting to read from the
  network socket
- added SIGUSR1 pausing
- refactored threadnames
- improved thread-switching callback debugging
- refactored/improved thread exception printing
- added debugging of large message sending
- added sendmsg timeout warnings
- added --general-txns; the system now defaults to insert/update txns
- added --count-updates, --show-updates
- fixed after-recovery statistics bookkeeping
- fixed stop responsiveness in response_handler
- added exception printing in case something goes awry before the RAII thread
  joins in the main `run_*` functions
- removed the break in handle_sig_sync

Modified Paths:
--------------
    ydb/trunk/README
    ydb/trunk/src/main.lzz.clamp
    ydb/trunk/tools/test.bash

Added Paths:
-----------
    ydb/trunk/tools/analysis.py

Modified: ydb/trunk/README
===================================================================
--- ydb/trunk/README	2009-01-12 20:04:16 UTC (rev 1129)
+++ ydb/trunk/README	2009-01-14 18:25:52 UTC (rev 1130)
@@ -1,7 +1,7 @@
 Overview
 --------
 
-YDB (Yang's Database) is a simple replicated memory store, developed for the
+ydb (Yang's Database) is a simple replicated memory store, developed for the
 purpose of researching various approaches to recovery in such OLTP-optimized
 databases as [VOLTDB] (formerly H-Store/Horizontica).
 
@@ -25,7 +25,7 @@
 Setup
 -----
 
-Requirements:
+Requirements for the ydb system:
 
 - [boost] 1.37
 - [C++ Commons] svn r1082
@@ -43,6 +43,16 @@
 [Protocol Buffers]: http://code.google.com/p/protobuf/
 [State Threads]: http://state-threads.sourceforge.net/
 
+Requirements for tools:
+
+- [Assorted Shell Tools] (bash-commons, mssh)
+- [Pylab] 0.98.3
+- [Python] 2.5
+
+[Assorted Shell Tools]: http://assorted.sf.net/shell-tools/
+[Pylab]: http://matplotlib.sf.net/
+[Python]: http://python.org/
+
 Usage
 -----
 
@@ -74,23 +84,72 @@
 sigint to try to force all working threads to shut down (any node, including
 replicas, respond to ctrl-c).
 
-Full System Test
-----------------
+To pause/resume the issuing of transactions, send a sigusr1 to the leader.
 
-  ./test.bash full
+Measurements
+------------
 
+Included is a suite of scripts to run ydb on the PMG farm machines. It is from
+this deployment that performance measurements are collected.
+
+  ./test.bash full-setup
+
 will configure all the farm machines to (1) have my proper initial environment,
 (2) have all the prerequisite software, and (3) build ydb. This may take a
-long time (particularly the boost-building phase).
+long time (particularly the boost-building phase).
 Subsequently,
 
-  range='10 13' wait=5 ./test.bash run
+  ./test.bash setup-ydb
 
-will run a leader on farm10, replicas on farm11 and farm12, and a recovering
-replica on farm13 after 5 seconds. Pipe several runs of this to some files
-(`*.out`), and plot the results with
+should be sufficient for pushing out the source from the current working copy
+of the source repository and building on each machine.
 
-  ./test.bash plot *.out
+Find out which of the farm machines is free by looking at the top 3 items from
+`top` on each machine:
 
+  ./test.bash hosttops
+
+Most commands you pass to `test.bash` accept (and some require) a set of hosts
+on which to run. Look at the comment documentation in test.bash to find out
+more about each function (command). You must specify the hosts by setting
+either the `hosts` environment variable to a string of space-separated
+hostnames or an array of hostnames, or you may set `range` to a string of a
+start and end number to select all hosts from `farm<START>` to `farm<END>`.
+Examples (the following all run in parallel across the specified hosts):
+
+  hosts='farm3 farm5 farm7' ./test.bash full-setup
+  hosts=(farm2 farm3 farm4) ./test.bash setup-ydb  # Arrays are also accepted.
+  range='2 4' ./test.bash hosttops                 # Same as last line.
+
+### Recovery experiments
+
+To run a leader on `farm10`, initial replicas on `farm11` and `farm12`, and a
+recovering replica on `farm13` after 5 seconds:
+
+  range='10 13' ./test.bash run 1000  # Command requires exactly 4 nodes.
+
+To run this experiment TODO
+trials:
+
+  range='10 13'
+
+### Scaling experiments
+
+To run a leader on `farm10` with initial replicas on the rest:
+
+  range='10 15' ./test.bash scaling
+
+To run for 1 through 3 initial replicas, repeating each configuration for 3
+trials:
+
+  range='10 13' ./test.bash full-scaling
+
+Pipe several runs of this to the file `scaling-log`, and plot the results with
+
+  ./analysis.py scaling
+
+Hence the name "scaling"---this was a test of the scalability of the base
+system (no recovery involved).
+
 Recovery Mechanisms
 -------------------
@@ -98,8 +157,9 @@
 
 - Network recovery
   - From a single node
-    - Interleave the state recovery/catch up with the backlogging of live txns
-    - Recover/catch up in one swoop, then backlog the live txns
+    - **block**: Backlog live txns, then recover/catch up in one swoop
+    - **yield**: Interleave the state recovery/catch up with the backlogging of
+      live txns
 
 Pseudo-code
 -----------
@@ -133,12 +193,14 @@
     foreach replica
       connect to replica
      recv recovery msg from replica
-      apply the state
-      apply backlog
+      apply the state (regularly yielding if desired)
+      apply backlog (regularly yielding if desired)
 
 Todo
 ----
 
+- Add a way to reliably obtain ST stack traces
+
 - Add benchmarking/testing hooks, e.g.:
   - start the recovering joiner at a well-defined time (after a certain # txns
     or after the DB reaches a certain size)
@@ -150,6 +212,12 @@
   - time to recover
   - bytes used to recover
 
+- Produce time series graphs of the txn throughput and mark the events on the
+  x-axis, which also conveys the duration of the various phases
+  - Overlay onto this the various recovery schemes
+  - Main benchmark: wait until the state grows to a certain size, then start
+    the recovery
+
 - Run some benchmarks, esp. on multiple physical hosts.
 
 - Figure out why things are running so slowly with >2 replicas.
 
@@ -169,3 +237,25 @@
 
 - Add richer transactions/queries/operations.
 - Add disk-based recovery methods.
+ +Plan/Notes +---------- + +Measurements + +- DONE find out how often prng yields same number + - not very often +- DONE baseline scaling (tps with number of nodes) + - inversely proportional to number of nodes, so bottlenecked at leader +- DONE recovery time as a function of amount of data + - TODO break down into various phases using bar graph of segmented bars +- DONE use only insert (and update) txns +- TODO try profiling +- TODO detailed view of tps during recovery over time (should see various phases) +- TODO later: runtime overhead of logging/tps under normal operation (scaled + with # nodes?) + +Presentation + +- TODO differences from: harbor, harp, aries +- TODO understand 2pc, paxos, etc. Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-01-12 20:04:16 UTC (rev 1129) +++ ydb/trunk/src/main.lzz.clamp 2009-01-14 18:25:52 UTC (rev 1130) @@ -28,12 +28,22 @@ #end typedef pair<int, int> pii; + +// Configuration. st_utime_t timeout; -int chkpt, accept_joiner_seqno; -bool verbose, yield_during_build_up, yield_during_catch_up; -long long timelim; +int chkpt, accept_joiner_seqno, issuing_interval; +size_t accept_joiner_size; +bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, + count_updates, stop_on_recovery, general_txns; +long long timelim, read_thresh; + +// Control. st_intr_bool stop_hub, kill_hub; +st_bool do_pause; +// Statistics. +int updates; + /** * The list of all threads. Keep track of these so that we may cleanly shut * down all threads. @@ -50,7 +60,53 @@ ~thread_eraser() { threads.erase(st_thread_self()); } }; +map<st_thread_t, string> threadnames; +st_thread_t last_thread; + /** + * Look up thread name, or just show thread ID. + */ +string +threadname(st_thread_t t = st_thread_self()) { + if (threadnames.find(t) != threadnames.end()) { + return threadnames[t]; + } else { + return lexical_cast<string>(t); + } +} + +/** + * Debug function for thread names. Remember what we're switching from. + */ +void +switch_out_cb() +{ + last_thread = st_thread_self(); +} + +/** + * Debug function for thread names. Show what we're switching from/to. + */ +void switch_in_cb() +{ + if (last_thread != st_thread_self()) { + cout << "switching"; + if (last_thread != 0) cout << " from " << threadname(last_thread); + cout << " to " << threadname() << endl; + } +} + +/** + * Print to cerr a thread exception. + */ +ostream& +cerr_thread_ex(const std::exception &ex) +{ + return cerr << "exception in thread " << threadname() + << ": " << ex.what(); +} + +/** * Delegate for running thread targets. * \param[in] f The function to execute. * \param[in] intr Whether to signal stop_hub on an exception. @@ -61,9 +117,8 @@ thread_eraser eraser; try { f(); - } catch (const std::exception &ex) { - cerr << "thread " << st_thread_self() << ": " << ex.what() - << (intr ? "; interrupting!" : "") << endl; + } catch (std::exception &ex) { + cerr_thread_ex(ex) << (intr ? "; interrupting!" 
: "") << endl; if (intr) stop_hub.set(); } } @@ -120,6 +175,7 @@ public: st_closing_all_infos(const vector<replica_info>& rs) : rs_(rs) {} ~st_closing_all_infos() { + cout << "closing all conns to replicas (replica_infos)" << endl; foreach (replica_info r, rs_) check0x(st_netfd_close(r.fd())); } @@ -154,15 +210,33 @@ check(msg.SerializeToString(&s)); const char *buf = s.c_str(); + if (s.size() > 1000000) + cout << "sending large message to " << dsts.size() << " dsts, size = " + << s.size() << " bytes" << endl; + // Prefix the message with a four-byte length. uint32_t len = htonl(static_cast<uint32_t>(s.size())); // Broadcast the length-prefixed message to replicas. + int dstno = 0; foreach (st_netfd_t dst, dsts) { - checkeqnneg(st_write(dst, static_cast<void*>(&len), sizeof len, timeout), - static_cast<ssize_t>(sizeof len)); - checkeqnneg(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(s.size())); + size_t resid = sizeof len; +#define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y)) + int res = st_write_resid(dst, static_cast<void*>(&len), &resid, timeout); + if (res == -1 && errno == ETIME) { + cerr << "got timeout! " << resid << " of " << sizeof len + << " remaining, for dst #" << dstno << endl; + checksize(st_write(dst, + reinterpret_cast<char*>(&len) + sizeof len - resid, + resid, + ST_UTIME_NO_TIMEOUT), + resid); + } else { + check0x(res); + } + checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), + s.size()); + dstno++; } } @@ -237,10 +311,13 @@ }); while (!stop_hub) { - // Did we get a new member? + // Did we get a new member? If so, notify an arbitrary member (the first + // one) to prepare to send recovery information (by sending an + // empty/default Txn). if (!newreps.empty() && seqno > 0) { sendmsg(fds[0], Txn()); } + // Bring in any new members. while (!newreps.empty()) { fds.push_back(newreps.take().fd()); } @@ -251,12 +328,14 @@ int count = randint(5) + 1; for (int o = 0; o < count; o++) { Op *op = txn.add_op(); - int rtype = randint(3), rkey = randint(), rvalue = randint(); + int rtype = general_txns ? randint(3) : 1, rkey = randint(), rvalue = randint(); op->set_type(types[rtype]); op->set_key(rkey); op->set_value(rvalue); } + if (do_pause) do_pause.waitreset(); + // Broadcast. bcastmsg(fds, txn); @@ -266,11 +345,14 @@ cout << "issued txn " << txn.seqno() << endl; if (timelim > 0 && current_time_millis() - start_time > timelim) { cout << "time's up; issued " << txn.seqno() << " txns in " << timelim - << " ms" << endl; + << " ms" << endl; stop_hub.set(); } st_sleep(0); } + if (issuing_interval > 0) { + st_sleep(issuing_interval); + } if (txn.seqno() == accept_joiner_seqno) { accept_joiner.set(); @@ -282,7 +364,7 @@ * Process a transaction: update DB state (incl. seqno) and send response to * leader. 
*/ -void + void process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno, bool caught_up) { @@ -293,22 +375,29 @@ seqno = txn.seqno(); for (int o = 0; o < txn.op_size(); o++) { const Op &op = txn.op(o); + const int key = op.key(); + if (show_updates || count_updates) { + if (map.find(key) != map.end()) { + if (show_updates) cout << "existing key: " << key << endl; + if (count_updates) updates++; + } + } switch (op.type()) { case Op::read: - res.add_result(map[op.key()]); + res.add_result(map[key]); break; case Op::write: - map[op.key()] = op.value(); + map[key] = op.value(); break; case Op::del: - map.erase(op.key()); + map.erase(key); break; } } sendmsg(leader, res); } -void + void showtput(const string &action, long long stop_time, long long start_time, int stop_count, int start_count) { @@ -316,7 +405,7 @@ int count_diff = stop_count - start_count; double rate = double(count_diff) * 1000 / time_diff; cout << action << " " << count_diff << " txns in " << time_diff << " ms (" - << rate << "tps)" << endl; + << rate << "tps)" << endl; } /** @@ -338,7 +427,7 @@ * leader. Not entirely clear that this is necessary; could probably just go * with seqno. */ -void + void process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, st_channel<shared_ptr<Recovery> > &send_states, st_channel<shared_ptr<Txn> > &backlog, int init_seqno) @@ -349,27 +438,38 @@ int seqno_caught_up = caught_up ? seqno : -1; finally f(lambda () { - long long now = current_time_millis(); - showtput("processed", now, __ref(start_time), __ref(seqno), - __ref(init_seqno)); - if (!__ref(caught_up)) { - cout << "live-processing: never entered this phase (never caught up)" << - endl; - } else { - showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), - __ref(seqno_caught_up)); - } - __ref(send_states).push(shared_ptr<Recovery>()); - }); + long long now = current_time_millis(); + showtput("processed", now, __ref(start_time), __ref(seqno), + __ref(init_seqno)); + if (!__ref(caught_up)) { + cout << "live-processing: never entered this phase (never caught up)" << + endl; + } else { + showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), + __ref(seqno_caught_up)); + } + __ref(send_states).push(shared_ptr<Recovery>()); + }); while (true) { Txn txn; + long long before_read; + if (read_thresh > 0) { + before_read = current_time_millis(); + } { st_intr intr(stop_hub); readmsg(leader, txn); } - + if (read_thresh > 0) { + long long read_time = current_time_millis() - before_read; + if (read_time > read_thresh) { + cout << "current_time_millis() - before_read = " << read_time << " > " + << read_thresh << endl; + } + } if (txn.has_seqno()) { + const char *action; if (txn.seqno() == seqno + 1) { if (!caught_up) { time_caught_up = current_time_millis(); @@ -379,20 +479,26 @@ caught_up = true; } process_txn(leader, map, txn, seqno, true); + action = "processed"; } else { // Queue up for later processing once a snapshot has been received. backlog.push(shared_ptr<Txn>(new Txn(txn))); + action = "backlogged"; } if (txn.seqno() % chkpt == 0) { - if (verbose) - cout << "processed txn " << txn.seqno() - << "; db size = " << map.size() << endl; + if (verbose) { + cout << action << " txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } st_sleep(0); } } else { // Generate a snapshot. 
shared_ptr<Recovery> recovery(new Recovery); + cout << "generating recovery of " << map.size() << " records" << endl; foreach (const pii &p, map) { Recovery_Pair *pair = recovery->add_pair(); pair->set_key(p.first); @@ -408,7 +514,7 @@ /** * Swallow replica responses. */ -void + void handle_responses(st_netfd_t replica, const int &seqno, int rid, st_multichannel<long long> &recover_signals, bool caught_up) { @@ -418,38 +524,87 @@ recovery_end_time = -1; int recovery_start_seqno = caught_up ? -1 : seqno, recovery_end_seqno = -1; + int last_seqno = -1; finally f(lambda () { long long end_time = current_time_millis(); - cout << __ref(rid) << ": "; - showtput("after recovery, finished", end_time, __ref(recovery_end_time), - __ref(seqno), __ref(recovery_end_seqno)); + if (__ref(recovery_end_time) > -1) { + cout << __ref(rid) << ": "; + showtput("after recovery, finished", end_time, __ref(recovery_end_time), + __ref(seqno), __ref(recovery_end_seqno)); + } }); while (true) { + finally f(lambda () { + // TODO: convert the whole thing to an object so that we can have "scoped + // globals". + long long &recovery_start_time = __ref(recovery_start_time); + int &recovery_start_seqno = __ref(recovery_start_seqno); + long long &recovery_end_time = __ref(recovery_end_time); + int &recovery_end_seqno = __ref(recovery_end_seqno); + long long &start_time = __ref(start_time); + const int &seqno = __ref(seqno); + int &rid = __ref(rid); + st_channel<long long> &sub = __ref(sub); + // The first timestamp that comes down the subscription pipeline is the + // recovery start time, issued by the main thread. The second one is the + // recovery end time, issued by the response handler associated with the + // joiner. + if (recovery_start_time == -1 && !sub.empty()) { + recovery_start_time = sub.take(); + recovery_start_seqno = seqno; + cout << rid << ": "; + showtput("before recovery, finished", recovery_start_time, start_time, + recovery_start_seqno, 0); + } else if (recovery_end_time == -1 && !sub.empty()) { + recovery_end_time = sub.take(); + recovery_end_seqno = seqno; + cout << rid << ": "; + showtput("during recovery, finished", recovery_end_time, + recovery_start_time, recovery_end_seqno, recovery_start_seqno); + } + }); Response res; - { + // Read the message, but correctly respond to interrupts so that we can + // cleanly exit (slightly tricky). + if (last_seqno + 1 == seqno) { + // Stop-interruptible in case we're already caught up. + try { + st_intr intr(stop_hub); + readmsg(replica, res); + } catch (...) { // TODO: only catch interruptions + // This check on seqnos is OK for termination since the seqno will + // never grow again if stop_hub is set. + if (last_seqno + 1 == seqno) { + cout << rid << ": "; + cout << "stopping seqno = " << res.seqno() << endl; + break; + } else { + continue; + } + } + } else { + // Only kill-interruptible because we want a clean termination (want + // to get all the acks back). 
st_intr intr(kill_hub); readmsg(replica, res); } - if (recovery_start_time == -1 && !sub.empty()) { - recovery_start_time = sub.take(); - recovery_start_seqno = seqno; - cout << rid << ": "; - showtput("before recovery, finished", recovery_start_time, start_time, - recovery_start_seqno, 0); - } else if (recovery_end_time == -1 && !sub.empty()) { - recovery_end_time = sub.take(); - recovery_end_seqno = seqno; - cout << rid << ": "; - showtput("during recovery, finished", recovery_end_time, - recovery_start_time, recovery_end_seqno, recovery_start_seqno); - } + // Determine if this response handler's host (the only joiner) has finished + // catching up. If it has, then broadcast a signal so that all response + // handlers will know about this event. if (!caught_up && res.caught_up()) { - long long t = current_time_millis(), timediff = t - start_time; + long long now = current_time_millis(), timediff = now - start_time; caught_up = true; - recover_signals.push(t); + recover_signals.push(now); cout << rid << ": "; cout << "recovering node caught up; took " << timediff << "ms" << endl; + // This will cause the program to exit eventually, but cleanly, such that + // the recovery time will be set first, before the eventual exit (which + // may not even happen in the current iteration). + if (stop_on_recovery) { + cout << "stopping on recovery" << endl; + stop_hub.set(); + } } if (res.seqno() % chkpt == 0) { if (verbose) { @@ -458,12 +613,7 @@ } st_sleep(0); } - // This is OK since the seqno will never grow again if stop_hub is set. - if (stop_hub && res.seqno() + 1 == seqno) { - cout << rid << ": "; - cout << "stopping seqno = " << res.seqno() << endl; - break; - } + last_seqno = res.seqno(); } } @@ -499,9 +649,10 @@ } st_closing closing(joiner); - cout << "got joiner's connection, sending recovery" << endl; + cout << "got joiner's connection, sending recovery of " + << recovery->pair_size() << " records" << endl; sendmsg(joiner, *recovery); - cout << "sent" << endl; + cout << "sent recovery" << endl; } /** @@ -556,36 +707,42 @@ foreach (const replica_info &r, replicas) newreps.push(r); st_joining join_swallower(swallower); - // Start handling responses. - st_thread_group handlers; - int rid = 0; - foreach (replica_info r, replicas) { - handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), rid++, - ref(recover_signals), true), - "handle_responses")); - } + try { + // Start handling responses. + st_thread_group handlers; + int rid = 0; + foreach (replica_info r, replicas) { + handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), rid++, + ref(recover_signals), true), + "handle_responses")); + } - // Accept the recovering node, and tell it about the online replicas. - st_netfd_t joiner; - { - st_intr intr(stop_hub); - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - accept_joiner.waitset(); + // Accept the recovering node, and tell it about the online replicas. + st_netfd_t joiner; + { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + accept_joiner.waitset(); + } + Join join = readmsg<Join>(joiner); + cout << "setting seqno to " << seqno << endl; + init.set_txnseqno(seqno); + sendmsg(joiner, init); + recover_signals.push(current_time_millis()); + + // Start streaming txns to joiner. 
+ cout << "start streaming txns to joiner" << endl; + replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); + newreps.push(replicas.back()); + handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), rid++, + ref(recover_signals), false), + "handle_responses_joiner")); + } catch (std::exception &ex) { + // TODO: maybe there's a cleaner way to do this final step before waiting with the join + cerr_thread_ex(ex) << endl; + throw; } - Join join = readmsg<Join>(joiner); - cout << "setting seqno to " << seqno << endl; - init.set_txnseqno(seqno); - sendmsg(joiner, init); - recover_signals.push(current_time_millis()); - - // Start streaming txns to joiner. - cout << "start streaming txns to joiner" << endl; - replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); - newreps.push(replicas.back()); - handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), rid++, - ref(recover_signals), false), - "handle_responses")); } /** @@ -598,13 +755,18 @@ map<int, int> map; int seqno = -1; finally f(lambda () { + cout << "REPLICA SUMMARY" << endl; + cout << "total updates = " << updates << endl; + cout << "final DB state: seqno = " << __ref(seqno) << ", size = " + << __ref(map).size() << endl; string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); - cout << "dumping DB state (seqno = " << __ref(seqno) << ", size = " - << __ref(map).size() << ") to " << fname << endl; - ofstream of(fname.c_str()); - of << "seqno: " << __ref(seqno) << endl; - foreach (const pii &p, __ref(map)) { - of << p.first << ": " << p.second << endl; + if (dump) { + cout << "dumping to " << fname << endl; + ofstream of(fname.c_str()); + of << "seqno: " << __ref(seqno) << endl; + foreach (const pii &p, __ref(map)) { + of << p.first << ": " << p.second << endl; + } } }); st_channel<shared_ptr<Recovery> > send_states; @@ -658,39 +820,46 @@ ref(seqno), ref(send_states)), "recover_joiner")); - // If there's anything to recover. - if (init.txnseqno() > 0) { - cout << "waiting for recovery from " << replicas[0] << endl; + try { + // If there's anything to recover. + if (init.txnseqno() > 0) { + cout << "waiting for recovery from " << replicas[0] << endl; - // Read the recovery message. - Recovery recovery; - { - st_intr intr(stop_hub); - readmsg(replicas[0], recovery); - } - for (int i = 0; i < recovery.pair_size(); i++) { - const Recovery_Pair &p = recovery.pair(i); - map[p.key()] = p.value(); - if (i % chkpt == 0) { - if (yield_during_build_up) st_sleep(0); + // Read the recovery message. + Recovery recovery; + { + st_intr intr(stop_hub); + readmsg(replicas[0], recovery); } - } - assert(seqno == -1 && - static_cast<typeof(seqno)>(recovery.seqno()) > seqno); - seqno = recovery.seqno(); - cout << "recovered." << endl; + for (int i = 0; i < recovery.pair_size(); i++) { + const Recovery_Pair &p = recovery.pair(i); + map[p.key()] = p.value(); + if (i % chkpt == 0) { + if (yield_during_build_up) st_sleep(0); + } + } + assert(seqno == -1 && + static_cast<typeof(seqno)>(recovery.seqno()) > seqno); + seqno = recovery.seqno(); + cout << "recovered " << recovery.pair_size() << " records." 
<< endl; - while (!backlog.empty()) { - shared_ptr<Txn> p = backlog.take(); - process_txn(leader, map, *p, seqno, false); - if (p->seqno() % chkpt == 0) { - if (verbose) - cout << "processed txn " << p->seqno() << " off the backlog" << endl; - if (yield_during_catch_up) + while (!backlog.empty()) { + shared_ptr<Txn> p = backlog.take(); + process_txn(leader, map, *p, seqno, false); + if (p->seqno() % chkpt == 0) { + if (verbose) + cout << "processed txn " << p->seqno() << " off the backlog; " + << "backlog.size = " << backlog.queue().size() << endl; + // Explicitly yield. (Note that yielding does still effectively + // happen anyway because process_txn is a yield point.) st_sleep(0); + } } + cout << "caught up." << endl; } - cout << "caught up." << endl; + } catch (std::exception &ex) { + cerr_thread_ex(ex) << endl; + throw; } stop_hub.insert(st_thread_self()); @@ -726,22 +895,13 @@ foreach (st_thread_t t, threads) { st_thread_interrupt(t); } + } else if (sig == SIGUSR1) { + toggle(do_pause); } - break; + //break; } } -map<st_thread_t, string> threadnames; - -void cb() -{ - if (threadnames.find(st_thread_self()) != threadnames.end()) { - cout << "switched to: " << threadnames[st_thread_self()] << endl; - } else { - cout << "switched to: " << st_thread_self() << endl; - } -} - /** * Initialization and command-line parsing. */ @@ -763,18 +923,38 @@ ("help,h", "show this help message") ("debug-threads,d",po::bool_switch(&debug_threads), "enable context switch debug outputs") - ("verbose,v", "enable periodic printing of txn processing progress") + ("verbose,v", po::bool_switch(&verbose), + "enable periodic printing of txn processing progress") ("epoll,e", po::bool_switch(&use_epoll), "use epoll (select is used by default)") ("yield-build-up", po::bool_switch(&yield_during_build_up), "yield periodically during build-up phase of recovery") ("yield-catch-up", po::bool_switch(&yield_during_catch_up), "yield periodically during catch-up phase of recovery") + ("dump,D", po::bool_switch(&dump), + "replicas should finally dump their state to a tmp file for " + "inspection/diffing") + ("show-updates,U", po::bool_switch(&show_updates), + "log operations that touch (update/read/delete) an existing key") + ("count-updates,u",po::bool_switch(&count_updates), + "count operations that touch (update/read/delete) an existing key") + ("general-txns,g", po::bool_switch(&general_txns), + "issue read and delete transactions as well as the default of (only) insertion/update transactions (for leader only)") ("leader,l", po::bool_switch(&is_leader), "run the leader (run replica by default)") + ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), + "exit after the joiner fully recovers (for leader only)") + ("accept-joiner-size,s", + po::value<size_t>(&accept_joiner_size)->default_value(0), + "accept recovering joiner (start recovery) after DB grows to this size " + "(for leader only)") + ("issuing-interval,i", + po::value<int>(&issuing_interval)->default_value(0), + "seconds to sleep between issuing txns (for leader only)") ("accept-joiner-seqno,j", po::value<int>(&accept_joiner_seqno)->default_value(0), - "accept recovering joiner (start recovery) after this seqno") + "accept recovering joiner (start recovery) after this seqno (for leader " + "only)") ("leader-host,H", po::value<string>(&leader_host)->default_value(string("localhost")), "hostname or address of the leader") @@ -784,10 +964,13 @@ ("chkpt,c", po::value<int>(&chkpt)->default_value(10000), "number of txns before yielding/verbose printing") 
("timelim,T", po::value<long long>(&timelim)->default_value(0), - "time limit in milliseconds, or 0 for none") + "general network IO time limit in milliseconds, or 0 for none") + ("read-thresh,r", po::value<long long>(&read_thresh)->default_value(0), + "if positive and any txn read exceeds this, then print a message " + "(for replicas only)") ("listen-port,p", po::value<uint16_t>(&listen_port)->default_value(7654), "port to listen on (replicas only)") - ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(1000000), + ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(200000), "timeout for IO operations (in microseconds)") ("minreps,n", po::value<int>(&minreps)->default_value(2), "minimum number of replicas the system is willing to process txns on"); @@ -813,13 +996,16 @@ check0x(sigemptyset(&sa.sa_mask)); sa.sa_flags = 0; check0x(sigaction(SIGINT, &sa, nullptr)); + check0x(sigaction(SIGTERM, &sa, nullptr)); + check0x(sigaction(SIGUSR1, &sa, nullptr)); // Initialize ST. if (use_epoll) check0x(st_set_eventsys(ST_EVENTSYS_ALT)); check0x(st_init()); st_spawn(bind(handle_sig_sync)); if (debug_threads) { - st_set_switch_in_cb(cb); + st_set_switch_out_cb(switch_out_cb); + st_set_switch_in_cb(switch_in_cb); } // Initialize thread manager for clean shutdown of all threads. @@ -835,9 +1021,9 @@ } return 0; - } catch (const std::exception &ex) { + } catch (std::exception &ex) { // Must catch all exceptions at the top to make the stack unwind. - cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; + cerr_thread_ex(ex) << endl; return 1; } } Added: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py (rev 0) +++ ydb/trunk/tools/analysis.py 2009-01-14 18:25:52 UTC (rev 1130) @@ -0,0 +1,83 @@ +#!/usr/bin/env python + +from __future__ import with_statement +import re, sys, itertools, numpy +from pylab import * + +def check(path): + with file(path) as f: + if 'got timeout' in f.read(): + print 'warning: timeout occurred' + +def agg(src): + def gen(): + for seqno, pairs in itertools.groupby(src, lambda (a,b): a): + ts = numpy.array([t for seqno, t in pairs]) + yield seqno, ts.mean(), ts.std(), ts + return list(gen()) + +def scaling(path): + check(path) + def getpairs(): + with file(path) as f: + for line in f: + m = re.match( r'=== n=(?P<n>\d+) ', line ) + if m: + n = int(m.group('n')) + m = re.match( r'.*: issued .*[^.\d](?P<tps>[.\d]+)tps', line ) + if m: + tps = float(m.group('tps')) + yield (n, tps) + tups = agg(getpairs()) + + print 'num nodes, mean tps, stdev tps' + for n, mean, sd, raw in tups: print n, mean, sd, raw + + xs, ys, es, rs = zip(*tups) + errorbar(xs, ys, es) + title('Scaling of baseline throughput with number of nodes') + xlabel('Node count') + ylabel('Mean TPS (stdev error bars)') + xlim(.5, n+.5) + ylim(ymin = 0) + savefig('scaling.png') + +def run(blockpath, yieldpath): + for path, label in [(blockpath, 'blocking scheme'), (yieldpath, 'yielding scheme')]: + check(path) + def getpairs(): + with file(path) as f: + for line in f: + m = re.match( r'=== seqno=(?P<n>\d+) ', line ) + if m: + seqno = int(m.group('n')) + m = re.match( r'.*: recovering node caught up; took (?P<time>\d+)ms', line ) + if m: + t = float(m.group('time')) + yield (seqno, t) + tups = agg(getpairs()) + + print 'max seqno, mean time, stdev time [raw data]' + for seqno, mean, sd, raw in tups: print seqno, mean, sd, raw + + xs, ys, es, rs = zip(*tups) + errorbar(xs, ys, es, label = label) + + title('Recovery time 
over number of transactions') + xlabel('Transaction count (corresponds roughly to data size)') + ylabel('Mean TPS (stdev error bars)') + #xlim(.5, n+.5) + #ylim(ymin = 0) + savefig('run.png') + +def main(argv): + if len(argv) <= 1: + print >> sys.stderr, 'Must specify a command' + elif sys.argv[1] == 'scaling': + scaling(sys.argv[2] if len(sys.argv) > 2 else 'scaling-log') + elif sys.argv[1] == 'run': + run(*sys.argv[2:] if len(sys.argv) > 2 else ['block-log', 'yield-log']) + else: + print >> sys.stderr, 'Unknown command:', sys.argv[1] + +sys.exit(main(sys.argv)) Property changes on: ydb/trunk/tools/analysis.py ___________________________________________________________________ Added: svn:executable + * Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-01-12 20:04:16 UTC (rev 1129) +++ ydb/trunk/tools/test.bash 2009-01-14 18:25:52 UTC (rev 1130) @@ -8,7 +8,13 @@ script="$(basename "$0")" tagssh() { - ssh "$@" 2>&1 | sed "s/^/$1: /" + ssh "$@" 2>&1 | python -u -c ' +import time, sys +while True: + line = sys.stdin.readline() + if line == "": break + print sys.argv[1], time.time(), ":\t", line, +' $1 } check-remote() { @@ -120,7 +126,7 @@ tagssh "$host" "./$script" "$@" } -allhosts() { +hosts() { if [[ ${host:-} ]] ; then echo $host elif [[ ${range:-} ]] ; then @@ -142,23 +148,27 @@ farm13.csail farm14.csail EOF - fi | xargs ${xargs--P9} -I^ "$@" + fi } -allssh() { - allhosts ssh ^ "set -o errexit -o nounset; $@" +parhosts() { + hosts | xargs ${xargs--P9} -I^ "$@" } -allscp() { - allhosts scp -q "$@" +parssh() { + parhosts ssh ^ "set -o errexit -o nounset; $@" } -allremote() { - allhosts "./$script" remote ^ "$@" +parscp() { + parhosts scp -q "$@" } +parremote() { + parhosts "./$script" remote ^ "$@" +} + init-setup() { - allremote node-init-setup + parremote node-init-setup } get-deps() { @@ -174,7 +184,7 @@ } setup-deps() { - allscp \ + parscp \ /usr/share/misc/config.guess \ /tmp/lzz.static \ /tmp/st-1.8.tar.gz \ @@ -183,33 +193,33 @@ clamp.patch \ ^:/tmp/ - allremote node-setup-lzz - allremote node-setup-st - allremote node-setup-pb - allremote node-setup-boost - allremote node-setup-m4 - allremote node-setup-bison - allremote node-setup-clamp + parremote node-setup-lzz + parremote node-setup-st + parremote node-setup-pb + parremote node-setup-boost + parremote node-setup-m4 + parremote node-setup-bison + parremote node-setup-clamp } setup-ydb() { - allremote node-setup-ydb-1 - rm -r /tmp/{ydb,ccom}-src/ + parremote node-setup-ydb-1 + rm -rf /tmp/{ydb,ccom}-src/ svn export ~/ydb/src /tmp/ydb-src/ svn export ~/ccom/src /tmp/ccom-src/ - allscp -r /tmp/ydb-src/* ^:ydb/src/ - allscp -r /tmp/ccom-src/* ^:ccom/src/ - allremote node-setup-ydb-2 + parscp -r /tmp/ydb-src/* ^:ydb/src/ + parscp -r /tmp/ccom-src/* ^:ccom/src/ + parremote node-setup-ydb-2 } -full() { +full-setup() { init-setup setup-deps setup-ydb } hostinfos() { - xargs= allssh " + xargs= parssh " echo hostname echo ===== @@ -219,7 +229,7 @@ } hosttops() { - xargs= allssh " + xargs= parssh " echo hostname echo ===== @@ -227,51 +237,134 @@ " } -range2args() { - "$@" $(seq $range | sed 's/^/farm/; s/$/.csail/') +hostargs() { + if [[ $range ]] + then "$@" $(seq $range | sed 's/^/farm/; s/$/.csail/') + else "$@" ${hosts[@]} + fi } -run-helper() { - tagssh $1 "ydb/src/ydb -l" & +scaling-helper() { + local leader=$1 + shift + tagssh $leader "ydb/src/ydb -l -n $#" & sleep .1 - tagssh $2 "ydb/src/ydb -H $1" & - tagssh $3 "ydb/src/ydb -H $1" & + for rep 
in "$@" + do tagssh $rep "ydb/src/ydb -n $# -H $leader" & + done sleep ${wait1:-10} - tagssh $4 "ydb/src/ydb -H $1" & - if [[ ${wait2:-} ]] - then sleep $wait2 - else read + tagssh $leader 'pkill -sigint ydb' + wait +} + +# This just tests how the system scales; no recovery involved. +scaling() { + hostargs scaling-helper +} + +# Repeat some experiment some number of trials and for some number of range +# configurations; e.g., "repeat scaling". +# TODO: fix this to work also with `hosts`; move into repeat-helper that's run +# via hostargs, and change the range= to hosts= +full-scaling() { + local base=$1 out=scaling-log-$(date +%Y-%m-%d-%H:%M:%S-%N) + shift + for n in {1..5} ; do # configurations + export range="$base $((base + n))" + stop + for i in {1..5} ; do # trials + echo === n=$n i=$i === + scaling + sleep 1 + stop + sleep .1 + echo + done + done >& $out + ln -sf $out scaling-log +} + +run-helper() { + local leader=$1 + shift + tagssh $leader "ydb/src/ydb -l -x --accept-joiner-seqno $seqno -n $(( $# - 1 )) ${extraargs:-}" & # -v --debug-threads + sleep .1 # pexpect 'waiting for at least' + # Run initial replicas. + while (( $# > 1 )) ; do + tagssh $1 "ydb/src/ydb -H $leader" & + shift + done + sleep .1 # pexpect 'got all \d+ replicas' leader + # Run joiner. + tagssh $1 "ydb/src/ydb -H $leader" & # -v --debug-threads -t 200000" & + if false ; then + if [[ ${wait2:-} ]] + then sleep $wait2 + else read + fi + tagssh $leader "pkill -sigint ydb" fi - tagssh $1 "pkill -sigint ydb" + wait } run() { - range2args run-helper + hostargs run-helper } +full-run() { + for seqno in 100000 300000 500000 700000 900000; do # configurations + stop + for i in {1..5} ; do # trials + echo === seqno=$seqno i=$i === + run + sleep 1 + stop + sleep .1 + echo + done + done +} + +full-block() { + local out=block-log-$(date +%Y-%m-%d-%H:%M:%S) + full-run >& $out + ln -sf $out block-log +} + +full-yield() { + local out=yield-log-$(date +%Y-%m-%d-%H:%M:%S) + extraargs='--yield-catch-up' full-run >& $out + ln -sf $out yield-log +} + +full() { + #full-block + full-yield + #full-scaling +} + stop-helper() { - tagssh $1 'pkill ydb' + tagssh $1 'pkill -sigint ydb' } stop() { - range2args stop-helper + hostargs stop-helper } kill-helper() { - tagssh $1 'pkill ydb' - tagssh $2 'pkill ydb' - tagssh $3 'pkill ydb' - tagssh $4 'pkill ydb' + for i in "$@" + do tagssh $i 'pkill ydb' + done } kill() { - range2args kill-helper + hostargs kill-helper } -#plot() { -# for i in "$@" ; do -# sed "s/farm$i.csail//" < "$i" -# done -#} +# Use mssh to log in with password as root to each machine. +mssh-root() { + : "${hosts:="$(hosts)"}" + mssh -l root "$@" +} "$@" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |