[Assorted-commits] SF.net SVN: assorted:[1133] ydb/trunk
From: <yan...@us...> - 2009-01-17 18:57:00
Revision: 1133
          http://assorted.svn.sourceforge.net/assorted/?rev=1133&view=rev
Author:   yangzhang
Date:     2009-01-17 18:56:53 +0000 (Sat, 17 Jan 2009)

Log Message:
-----------
- filled in more measurement documentation and rewrote parts of the overview
- added -pg to the Makefile for profiling
- added --profile-threads
- added --write-thresh
- improved the detail of, and tweaked, some output messages
- more clearly distinguished the different phases of recovery on the joiner
- changed the joiner behavior to not send responses when catching up
- added clean shutdown messages from leader to replicas
- decreased the variance in the txns
- added --max-ops, --min-ops
- added some lineage to the analysis.py output
- generalized analysis code to support plotting multi-segmented bar charts

Modified Paths:
--------------
    ydb/trunk/README
    ydb/trunk/src/Makefile
    ydb/trunk/src/main.lzz.clamp
    ydb/trunk/tools/analysis.py
    ydb/trunk/tools/test.bash

Modified: ydb/trunk/README
===================================================================
--- ydb/trunk/README	2009-01-14 18:29:46 UTC (rev 1132)
+++ ydb/trunk/README	2009-01-17 18:56:53 UTC (rev 1133)
@@ -3,24 +3,23 @@
 ydb (Yang's Database) is a simple replicated memory store, developed for the
 purpose of researching various approaches to recovery in such OLTP-optimized
-databases as [VOLTDB] (formerly H-Store/Horizontica).
+databases as [H-Store] (or VOLTDB).
 
-[VOLTDB]: http://db.cs.yale.edu/hstore/
+[H-Store]: http://db.cs.yale.edu/hstore/
 
-Currently, the only recovery implemented mechanism is to have the first-joining
+Currently, the only recovery mechanism implemented is to have an already-joined
 replica serialize its entire database state and send that to the joining node.
 
-If you start a system with a minimum of $n$ replicas, then the leader will wait
-for that many to them to join before it starts issuing transactions. Then when
-replica $n+1$ joins, it will need to catch up to the current state of the
-system; it will do so by contacting the first-joining replica and receiving a
-complete dump of its DB state.
+If you start a system with a minimum of $n$ replicas, then the leader waits for
+that many of them to join before it starts issuing transactions. Once replica
+$n+1$ joins, it needs to catch up to the current state of the system; it does
+so by contacting the first-joining replica and receiving a complete dump of its
+DB state.
 
-The leader will report the current txn seqno to the joiner, and start streaming
-txns beyond that seqno to the joiner, which the joiner will push onto its
-backlog. It will also instruct that first replica to snapshot its DB state at
-this txn seqno and prepare to send it to the recovering node as soon as it
-connects.
+The leader reports the current txn seqno to the joiner, and starts streaming
+txns beyond that seqno to the joiner, which the joiner pushes onto its backlog.
+It also instructs that first replica to snapshot its DB state at this txn seqno
+and prepare to send it to the recovering node as soon as it connects.
 
 Setup
 -----
@@ -123,23 +122,28 @@
 ### Recovery experiments
 
 To run a leader on `farm10`, initial replicas on `farm11` and `farm12`, and a
-recovering replica on `farm13` after 5 seconds:
+recovering replica on `farm13` once the 100,000th txn has been issued:
 
-    range='10 13' ./test.bash run 1000 # Command requires exactly 4 nodes.
+    range='10 13' seqno=100000 ./test.bash run
 
-To run this experiment TODO
-trials:
+The above experiment uses the `block` recovery scheme. To use the `yield`
+recovery scheme:
 
-    range='10 13'
+    range='10 13' seqno=100000 extraargs=--yield-catch-up ./test.bash run
 
+To run `block` and `yield`, respectively, for varying values of `seqno` and for
+some number of trials:
+
+    range='10 13' ./test.bash full-block
+    range='10 13' ./test.bash full-yield
+
 ### Scaling experiments
 
 To run a leader on `farm10` with initial replicas on the rest:
 
     range='10 15' ./test.bash scaling
 
-To run for 1 through 3 initial replicas, repeating each configuration for 3
-trials:
+To run for varying numbers of replicas and for some number of trials:
 
     range='10 13' ./test.bash full-scaling
 
@@ -199,12 +203,44 @@
 Todo
 ----
 
+- DONE add benchmarking/testing hooks
+  - start the recovering joiner at a well-defined time (after a certain #
+    txns or after the DB reaches a certain size)
+  - stop the system once recovery finishes
+- DONE find out how often prng yields same number
+  - not very often
+- DONE baseline scaling (tps with number of nodes)
+  - inversely proportional to number of nodes, so bottlenecked at leader
+- DONE recovery time as a function of amount of data
+- DONE use only insert (and update) txns
+  - db size blows up much faster
+- DONE try gprof profiling
+  - output quirky; waiting on list response to question
+- DONE optimize acks from joiner
+  - much faster, and much less variance
+- DONE use more careful txn counting/data size
+  - added lower/upper bounds on the rand # ops per txn (5), combined with
+    above restricting of ops to writes
+  - 5 is a lot; db grows large; experiments take much longer
+- DONE break down into various phases using bar graph of segmented bars
+- TODO serialize outputs from the various clients to a single merger to (1)
+  have ordering over the (timestamped) messages, and (2) avoid interleaved
+  lines
+- TODO detailed view of tps during recovery over time (should see various
+  phases)
+- TODO later: runtime overhead of logging/tps under normal operation (scaled
+  with # nodes?)
+- TODO later: timestamped logging?
+
+Longer term
+
+- Testing
+  - unit/regression/mock
+  - performance tests
+  - valgrind
+
 - Add a way to reliably obtain ST stack traces
-- Add benchmarking/testing hooks, e.g.:
-  - start the recovering joiner at a well-defined time (after a certain # txns
-    or after the DB reaches a certain size)
-
 - Add benchmarking information, e.g.:
   - txns/second normally
   - txns during recovery
@@ -238,24 +274,8 @@
 
 - Add disk-based recovery methods.
 
-Plan/Notes
-----------
+Presentation Notes
+------------------
 
-Measurements
-
-- DONE find out how often prng yields same number
-  - not very often
-- DONE baseline scaling (tps with number of nodes)
-  - inversely proportional to number of nodes, so bottlenecked at leader
-- DONE recovery time as a function of amount of data
-  - TODO break down into various phases using bar graph of segmented bars
-- DONE use only insert (and update) txns
-- TODO try profiling
-- TODO detailed view of tps during recovery over time (should see various phases)
-- TODO later: runtime overhead of logging/tps under normal operation (scaled
-  with # nodes?)
-
-Presentation
-
 - TODO differences from: harbor, harp, aries
 - TODO understand 2pc, paxos, etc.
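[Editorial aside, not part of the commit: the README above describes a snapshot-plus-backlog catch-up protocol. The sketch below illustrates that idea in standalone Python with hypothetical names (`catch_up`, dict-based txns); it is not the ydb implementation.]

```python
from collections import deque

def catch_up(snapshot, snapshot_seqno, stream):
    """Sketch of the joiner's recovery: txns streamed during snapshot
    transfer are backlogged, the snapshot (taken at snapshot_seqno) is
    installed, and then the backlog is replayed in seqno order."""
    db = {}
    backlog = deque()
    # Build-up phase: txns arriving while the snapshot is in flight are
    # queued, not applied.
    for txn in stream:
        backlog.append(txn)
    # Install the snapshot; the DB is now consistent as of snapshot_seqno.
    db.update(snapshot)
    seqno = snapshot_seqno
    # Catch-up phase: replay backlogged txns beyond the snapshot seqno.
    while backlog:
        txn = backlog.popleft()
        if txn["seqno"] <= seqno:
            continue  # already covered by the snapshot
        assert txn["seqno"] == seqno + 1, "gap in the txn stream"
        for key, value in txn["ops"]:
            db[key] = value
        seqno = txn["seqno"]
    return db, seqno
```

For example, a joiner given a snapshot at seqno 2 plus streamed txns 3 and 4 ends up at seqno 4 with both updates applied.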
Modified: ydb/trunk/src/Makefile
===================================================================
--- ydb/trunk/src/Makefile	2009-01-14 18:29:46 UTC (rev 1132)
+++ ydb/trunk/src/Makefile	2009-01-17 18:56:53 UTC (rev 1133)
@@ -19,14 +19,21 @@
 SRCS := $(GENSRCS)
 OBJS := $(GENOBJS)
 
+ifneq ($(GPROF),)
+  GPROF := -pg
+endif
+ifneq ($(GCOV),)
+  GCOV := -fprofile-arcs -ftest-coverage
+endif
 LDFLAGS := -lstx -lst -lresolv -lpthread -lprotobuf \
-           -lboost_program_options-gcc43-mt
-CXXFLAGS := -g3 -Wall -Werror -Wextra -Woverloaded-virtual -Wconversion -Wno-conversion -Wno-ignored-qualifiers \
+           -lboost_program_options-gcc43-mt $(GPROF)
+CXXFLAGS := -g3 $(GPROF) -Wall -Werror -Wextra -Woverloaded-virtual -Wconversion \
+           -Wno-conversion -Wno-ignored-qualifiers \
            -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \
            -Winit-self -Wsign-promo -Wno-unused-parameter -Wc++0x-compat \
            -Wparentheses -Wmissing-format-attribute -Wfloat-equal \
            -Winline -Wsynth
-PBCXXFLAGS := -g3 -Wall -Werror
+PBCXXFLAGS := -g3 -Wall -Werror $(GPROF)
 
 all: $(TARGET)

Modified: ydb/trunk/src/main.lzz.clamp
===================================================================
--- ydb/trunk/src/main.lzz.clamp	2009-01-14 18:29:46 UTC (rev 1132)
+++ ydb/trunk/src/main.lzz.clamp	2009-01-17 18:56:53 UTC (rev 1133)
@@ -31,11 +31,12 @@
 
 // Configuration.
 st_utime_t timeout;
-int chkpt, accept_joiner_seqno, issuing_interval;
+int chkpt, accept_joiner_seqno, issuing_interval, min_ops, max_ops;
 size_t accept_joiner_size;
 bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates,
-     count_updates, stop_on_recovery, general_txns;
-long long timelim, read_thresh;
+     count_updates, stop_on_recovery, general_txns, profile_threads,
+     debug_threads;
+long long timelim, read_thresh, write_thresh;
 
 // Control.
 st_intr_bool stop_hub, kill_hub;
@@ -60,10 +61,19 @@
   ~thread_eraser() { threads.erase(st_thread_self()); }
 };
 
+/**
+ * For debug/error-printing purposes.
+ */
 map<st_thread_t, string> threadnames;
 st_thread_t last_thread;
 
 /**
+ * For profiling.
+ */
+map<st_thread_t, long long> threadtimes;
+long long thread_start_time;
+
+/**
  * Look up thread name, or just show thread ID.
  */
 string
@@ -81,7 +91,9 @@
 void
 switch_out_cb()
 {
-  last_thread = st_thread_self();
+  if (debug_threads) last_thread = st_thread_self();
+  if (profile_threads)
+    threadtimes[st_thread_self()] += current_time_millis() - thread_start_time;
 }
 
 /**
@@ -89,11 +101,13 @@
  */
 void
 switch_in_cb()
 {
-  if (last_thread != st_thread_self()) {
+  if (debug_threads && last_thread != st_thread_self()) {
     cout << "switching";
     if (last_thread != 0) cout << " from " << threadname(last_thread);
     cout << " to " << threadname() << endl;
   }
+  if (profile_threads)
+    thread_start_time = current_time_millis();
 }
 
 /**
@@ -223,9 +237,11 @@
   size_t resid = sizeof len;
 #define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y))
   int res = st_write_resid(dst, static_cast<void*>(&len), &resid, timeout);
+  long long before_write;
+  if (write_thresh > 0) {
+    before_write = current_time_millis();
+  }
   if (res == -1 && errno == ETIME) {
-    cerr << "got timeout! " << resid << " of " << sizeof len
-         << " remaining, for dst #" << dstno << endl;
     checksize(st_write(dst,
                        reinterpret_cast<char*>(&len) + sizeof len - resid,
                        resid,
@@ -234,6 +250,14 @@
   } else {
     check0x(res);
   }
+  if (write_thresh > 0) {
+    long long write_time = current_time_millis() - before_write;
+    if (write_time > write_thresh) {
+      cout << "thread " << threadname()
+           << ": write to dst #" << dstno
+           << " took " << write_time << " ms" << endl;
+    }
+  }
   checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), s.size());
   dstno++;
@@ -325,7 +349,7 @@
       // Generate a random transaction.
       Txn txn;
       txn.set_seqno(seqno++);
-      int count = randint(5) + 1;
+      int count = randint(min_ops, max_ops + 1);
       for (int o = 0; o < count; o++) {
         Op *op = txn.add_op();
         int rtype = general_txns ? randint(3) : 1, rkey = randint(),
             rvalue = randint();
@@ -358,6 +382,10 @@
       accept_joiner.set();
     }
   }
+
+  Txn txn;
+  txn.set_seqno(-1);
+  bcastmsg(fds, txn);
 }
 
 /**
@@ -394,7 +422,9 @@
       break;
     }
   }
-  sendmsg(leader, res);
+  if (caught_up) {
+    sendmsg(leader, res);
+  }
 }
 
 void
@@ -405,7 +435,7 @@
   int count_diff = stop_count - start_count;
   double rate = double(count_diff) * 1000 / time_diff;
   cout << action << " " << count_diff << " txns in " << time_diff << " ms ("
-       << rate << "tps)" << endl;
+       << rate << " tps)" << endl;
 }
 
 /**
@@ -427,7 +457,7 @@
  * leader. Not entirely clear that this is necessary; could probably just go
  * with seqno.
  */
- void
+void
 process_txns(st_netfd_t leader, map<int, int> &map, int &seqno,
              st_channel<shared_ptr<Recovery> > &send_states,
              st_channel<shared_ptr<Txn> > &backlog, int init_seqno)
@@ -436,20 +466,24 @@
   long long start_time = current_time_millis(),
             time_caught_up = caught_up ? start_time : -1;
   int seqno_caught_up = caught_up ? seqno : -1;
+  // Used by joiner only to tell where we actually started (init_seqno is just
+  // the seqno reported by the leader in the Init message, but it may have
+  // issued more since the Init message).
+  int first_seqno = -1;
   finally f(lambda () {
-    long long now = current_time_millis();
-    showtput("processed", now, __ref(start_time), __ref(seqno),
-             __ref(init_seqno));
-    if (!__ref(caught_up)) {
-      cout << "live-processing: never entered this phase (never caught up)" <<
-        endl;
-    } else {
-      showtput("live-processed", now, __ref(time_caught_up), __ref(seqno),
-               __ref(seqno_caught_up));
-    }
-    __ref(send_states).push(shared_ptr<Recovery>());
-  });
+      long long now = current_time_millis();
+      showtput("processed", now, __ref(start_time), __ref(seqno),
+               __ref(init_seqno));
+      if (!__ref(caught_up)) {
+        cout << "live-processing: never entered this phase (never caught up)" <<
+          endl;
+      } else {
+        showtput("live-processed", now, __ref(time_caught_up), __ref(seqno),
+                 __ref(seqno_caught_up));
+      }
+      __ref(send_states).push(shared_ptr<Recovery>());
+    });
 
   while (true) {
     Txn txn;
@@ -464,23 +498,28 @@
     if (read_thresh > 0) {
       long long read_time = current_time_millis() - before_read;
       if (read_time > read_thresh) {
-        cout << "current_time_millis() - before_read = " << read_time << " > "
-             << read_thresh << endl;
+        cout << "thread " << threadname()
+             << ": read took " << read_time << " ms" << endl;
      }
    }
    if (txn.has_seqno()) {
      const char *action;
-      if (txn.seqno() == seqno + 1) {
+      if (txn.seqno() < 0) {
+        break;
+      } else if (txn.seqno() == seqno + 1) {
        if (!caught_up) {
          time_caught_up = current_time_millis();
          seqno_caught_up = seqno;
-          showtput("backlogged", time_caught_up, start_time, seqno_caught_up,
-                   init_seqno);
+          showtput("process_txns caught up; backlogged",
+                   time_caught_up, start_time, seqno_caught_up,
+                   first_seqno == -1 ? init_seqno : first_seqno);
          caught_up = true;
        }
        process_txn(leader, map, txn, seqno, true);
        action = "processed";
      } else {
+        if (first_seqno == -1)
+          first_seqno = txn.seqno();
        // Queue up for later processing once a snapshot has been received.
        backlog.push(shared_ptr<Txn>(new Txn(txn)));
        action = "backlogged";
@@ -576,7 +615,8 @@
        // never grow again if stop_hub is set.
        if (last_seqno + 1 == seqno) {
          cout << rid << ": ";
-          cout << "stopping seqno = " << res.seqno() << endl;
+          cout << "clean stop; next expected seqno is " << seqno
+               << " (last seqno was " << last_seqno << ")" << endl;
          break;
        } else {
          continue;
@@ -597,7 +637,7 @@
        recover_signals.push(now);
        cout << rid << ": ";
        cout << "recovering node caught up; took "
-             << timediff << "ms" << endl;
+             << timediff << " ms" << endl;
        // This will cause the program to exit eventually, but cleanly, such that
        // the recovery time will be set first, before the eventual exit (which
        // may not even happen in the current iteration).
@@ -756,12 +796,12 @@
  int seqno = -1;
  finally f(lambda () {
    cout << "REPLICA SUMMARY" << endl;
-    cout << "total updates = " << updates << endl;
-    cout << "final DB state: seqno = " << __ref(seqno) << ", size = "
+    cout << "- total updates = " << updates << endl;
+    cout << "- final DB state: seqno = " << __ref(seqno) << ", size = "
         << __ref(map).size() << endl;
    string fname = string("/tmp/ydb") + lexical_cast<string>(getpid());
    if (dump) {
-      cout << "dumping to " << fname << endl;
+      cout << "- dumping to " << fname << endl;
      ofstream of(fname.c_str());
      of << "seqno: " << __ref(seqno) << endl;
      foreach (const pii &p, __ref(map)) {
@@ -824,6 +864,7 @@
      // If there's anything to recover.
      if (init.txnseqno() > 0) {
        cout << "waiting for recovery from " << replicas[0] << endl;
+        long long before_recv = current_time_millis();
 
        // Read the recovery message.
        Recovery recovery;
@@ -831,6 +872,9 @@
          st_intr intr(stop_hub);
          readmsg(replicas[0], recovery);
        }
+        long long build_start = current_time_millis();
+        cout << "got recovery message in " << build_start - before_recv
+             << " ms" << endl;
 
        for (int i = 0; i < recovery.pair_size(); i++) {
          const Recovery_Pair &p = recovery.pair(i);
          map[p.key()] = p.value();
@@ -840,8 +884,11 @@
        }
        assert(seqno == -1 &&
               static_cast<typeof(seqno)>(recovery.seqno()) > seqno);
-        seqno = recovery.seqno();
-        cout << "recovered " << recovery.pair_size() << " records." << endl;
+        int mid_seqno = seqno = recovery.seqno();
+        long long mid_time = current_time_millis();
+        cout << "receive and build-up took " << mid_time - before_recv
+             << " ms; built up map of " << recovery.pair_size() << " records in "
+             << mid_time - build_start << " ms; now at seqno " << seqno << endl;
 
        while (!backlog.empty()) {
          shared_ptr<Txn> p = backlog.take();
@@ -849,13 +896,14 @@
          if (p->seqno() % chkpt == 0) {
            if (verbose)
              cout << "processed txn " << p->seqno() << " off the backlog; "
-                   << "backlog.size = " << backlog.queue().size() << endl;
+                   << "backlog.size = " << backlog.queue().size() << endl;
            // Explicitly yield. (Note that yielding does still effectively
            // happen anyway because process_txn is a yield point.)
            st_sleep(0);
          }
        }
-        cout << "caught up." << endl;
+        showtput("replayer caught up; from backlog replayed",
+                 current_time_millis(), mid_time, seqno, mid_seqno);
      }
    } catch (std::exception &ex) {
      cerr_thread_ex(ex) << endl;
@@ -912,7 +960,7 @@
  try {
    GOOGLE_PROTOBUF_VERIFY_VERSION;
 
-    bool is_leader, use_epoll, debug_threads;
+    bool is_leader, use_epoll;
    int minreps;
    uint16_t leader_port, listen_port;
    string leader_host;
@@ -923,6 +971,8 @@
      ("help,h", "show this help message")
      ("debug-threads,d",po::bool_switch(&debug_threads),
       "enable context switch debug outputs")
+      ("profile-threads,q",po::bool_switch(&profile_threads),
+       "enable profiling of threads")
      ("verbose,v", po::bool_switch(&verbose),
       "enable periodic printing of txn processing progress")
      ("epoll,e", po::bool_switch(&use_epoll),
@@ -951,6 +1001,12 @@
      ("issuing-interval,i",
       po::value<int>(&issuing_interval)->default_value(0),
       "seconds to sleep between issuing txns (for leader only)")
+      ("min-ops,o",
+       po::value<int>(&min_ops)->default_value(5),
+       "lower bound on randomly generated number of operations per txn (for leader only)")
+      ("max-ops,O",
+       po::value<int>(&max_ops)->default_value(5),
+       "upper bound on randomly generated number of operations per txn (for leader only)")
      ("accept-joiner-seqno,j",
       po::value<int>(&accept_joiner_seqno)->default_value(0),
       "accept recovering joiner (start recovery) after this seqno (for leader "
@@ -965,9 +1021,10 @@
       "number of txns before yielding/verbose printing")
      ("timelim,T", po::value<long long>(&timelim)->default_value(0),
       "general network IO time limit in milliseconds, or 0 for none")
+      ("write-thresh,w", po::value<long long>(&write_thresh)->default_value(200),
+       "if positive and any txn write exceeds this, then print a message (for replicas only)")
      ("read-thresh,r", po::value<long long>(&read_thresh)->default_value(0),
-       "if positive and any txn read exceeds this, then print a message "
-       "(for replicas only)")
+       "if positive and any txn read exceeds this, then print a message (for replicas only)")
      ("listen-port,p", po::value<uint16_t>(&listen_port)->default_value(7654),
       "port to listen on (replicas only)")
      ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(200000),
@@ -984,6 +1041,11 @@
      cout << desc << endl;
      return 0;
    }
+
+    // Validate arguments.
+    check(min_ops > 0);
+    check(max_ops > 0);
+    check(max_ops >= min_ops);
  } catch (std::exception &ex) {
    cerr << ex.what() << endl << endl << desc << endl;
    return 1;
@@ -1002,8 +1064,8 @@
  // Initialize ST.
  if (use_epoll) check0x(st_set_eventsys(ST_EVENTSYS_ALT));
  check0x(st_init());
-  st_spawn(bind(handle_sig_sync));
-  if (debug_threads) {
+  my_spawn(bind(handle_sig_sync), "handle_sig_sync");
+  if (debug_threads || profile_threads) {
    st_set_switch_out_cb(switch_out_cb);
    st_set_switch_in_cb(switch_in_cb);
  }
@@ -1013,6 +1075,26 @@
  threads.insert(st_thread_self());
  threadnames[st_thread_self()] = "main";
 
+  finally f(lambda() {
+    if (profile_threads) {
+      cout << "thread profiling results:" << endl;
+      long long total = 0;
+      typedef pair<st_thread_t, long long> entry;
+      foreach (entry p, threadtimes) {
+        const string &name = threadname(p.first);
+        if (name != "main" && name != "handle_sig_sync")
+          total += p.second;
+      }
+      foreach (entry p, threadtimes) {
+        const string &name = threadname(p.first);
+        if (name != "main" && name != "handle_sig_sync")
+          cout << "- " << threadname(p.first) << ": " << p.second
+               << " (" << (static_cast<double>(p.second) / total) << "%)"
+               << endl;
+      }
+    }
+  });
+
  // Which role are we?
  if (is_leader) {
    run_leader(minreps, leader_port);

Modified: ydb/trunk/tools/analysis.py
===================================================================
--- ydb/trunk/tools/analysis.py	2009-01-14 18:29:46 UTC (rev 1132)
+++ ydb/trunk/tools/analysis.py	2009-01-17 18:56:53 UTC (rev 1133)
@@ -1,9 +1,12 @@
 #!/usr/bin/env python
 from __future__ import with_statement
-import re, sys, itertools, numpy
+import re, sys, itertools
+from os.path import basename, realpath
 from pylab import *
 
+def getname(path): return basename(realpath(path))
+
 def check(path):
     with file(path) as f:
         if 'got timeout' in f.read():
@@ -11,12 +14,21 @@
 
 def agg(src):
     def gen():
-        for seqno, pairs in itertools.groupby(src, lambda (a,b): a):
-            ts = numpy.array([t for seqno, t in pairs])
-            yield seqno, ts.mean(), ts.std(), ts
-    return list(gen())
+        for index, tups in itertools.groupby(src, lambda x: x[0]):
+            yield list(tups)
+    a = array(list(gen()))
+    indexes = a[:,0,0]
+    means = a.mean(1)
+    stds = a.std(1)
+    tup = (indexes,)
+    for i in range(1, len(a[0,0])):
+        tup += (means[:,i], stds[:,i])
+    stacked = hstack(map(lambda x: x.reshape((len(indexes),1)), tup))
+    return tup + (stacked, a)
 
 def scaling(path):
+    print '=== scaling ==='
+    print 'file:', getname(path)
     check(path)
     def getpairs():
         with file(path) as f:
@@ -24,62 +36,77 @@
                 m = re.match( r'=== n=(?P<n>\d+) ', line )
                 if m:
                     n = int(m.group('n'))
-                m = re.match( r'.*: issued .*[^.\d](?P<tps>[.\d]+)tps', line )
+                m = re.match( r'.*: issued .*[^.\d](?P<tps>[.\d]+) ?tps', line )
                 if m:
                     tps = float(m.group('tps'))
                     yield (n, tps)
-    tups = agg(getpairs())
+    ns, tpsmeans, tpssds, stacked, a = agg(getpairs())
+    print 'n, tps mean, tps sd'
+    print stacked
+    print
 
-    print 'num nodes, mean tps, stdev tps'
-    for n, mean, sd, raw in tups: print n, mean, sd, raw
-
-    xs, ys, es, rs = zip(*tups)
-    errorbar(xs, ys, es)
+    errorbar(ns, tpsmeans, tpssds)
     title('Scaling of baseline throughput with number of nodes')
     xlabel('Node count')
     ylabel('Mean TPS (stdev error bars)')
-    xlim(.5, n+.5)
+    xlim(ns.min() - .5, ns.max() + .5)
     ylim(ymin = 0)
     savefig('scaling.png')
 
 def run(blockpath, yieldpath):
-    for path, label in [(blockpath, 'blocking scheme'), (yieldpath, 'yielding scheme')]:
+    for path, label in [#(blockpath, 'blocking scheme'),
+                        (yieldpath, 'yielding scheme')]:
+        print '===', label, '==='
+        print 'file:', getname(path)
         check(path)
         def getpairs():
             with file(path) as f:
+                seqno = recv = buildup = catchup = total = None
                 for line in f:
-                    m = re.match( r'=== seqno=(?P<n>\d+) ', line )
-                    if m:
-                        seqno = int(m.group('n'))
-                    m = re.match( r'.*: recovering node caught up; took (?P<time>\d+)ms', line )
-                    if m:
-                        t = float(m.group('time'))
-                        yield (seqno, t)
-        tups = agg(getpairs())
+                    m = re.match( r'=== seqno=(?P<seqno>\d+) ', line )
+                    if m: seqno = int(m.group('seqno'))
+                    m = re.search( r'got recovery message in (?P<time>\d+) ms', line )
+                    if m: recv = float(m.group('time'))
+                    m = re.search( r'built up .* (?P<time>\d+) ms', line )
+                    if m: buildup = float(m.group('time'))
+                    m = re.search( r'replayer caught up; from backlog replayed \d+ txns in (?P<time>\d+) ms', line )
+                    if m: catchup = float(m.group('time'))
+                    m = re.match( r'.*: recovering node caught up; took (?P<time>\d+) ?ms', line )
+                    if m: total = float(m.group('time'))
+                    tup = (seqno, recv, buildup, catchup, total)
+                    if all(tup):
+                        yield tup
+                        seqno = recv = buildup = catchup = total = None
+        seqnos, recvmeans, recvsds, buildmeans, buildsds, catchmeans, catchsds, totalmeans, totalsds, stacked, a = agg(getpairs())
 
-        print '===', label, '==='
-        print 'max seqno, mean time, stdev time [raw data]'
-        for seqno, mean, sd, raw in tups: print seqno, mean, sd, raw
+        print 'max seqno, recv mean, recv sd, build mean, build sd, catch mean, catch sd, total mean, total sd'
+        print stacked
        print
 
-        xs, ys, es, rs = zip(*tups)
-        errorbar(xs, ys, es, label = label)
+        width = 5e4
+        a = bar(seqnos, recvmeans, yerr = recvsds, width = width, color = 'r',
+                label = 'State receive')
+        b = bar(seqnos, buildmeans, yerr = buildsds, width = width, color = 'g',
+                label = 'Build-up time', bottom = recvmeans)
+        c = bar(seqnos, catchmeans, yerr = catchsds, width = width, color = 'b',
+                label = 'Catch-up', bottom = recvmeans + buildmeans)
     title('Recovery time over number of transactions')
     xlabel('Transaction count (corresponds roughly to data size)')
-    ylabel('Mean recovery time in ms (stdev error bars)')
-    #xlim(.5, n+.5)
-    #ylim(ymin = 0)
+    ylabel('Mean time in ms (SD error bars)')
+    legend(loc = 'upper left')
     savefig('run.png')
 
 def main(argv):
     if len(argv) <= 1:
         print >> sys.stderr, 'Must specify a command'
-    elif sys.argv[1] == 'scaling':
-        scaling(sys.argv[2] if len(sys.argv) > 2 else 'scaling-log')
-    elif sys.argv[1] == 'run':
-        run(*sys.argv[2:] if len(sys.argv) > 2 else ['block-log', 'yield-log'])
+    elif argv[1] == 'scaling':
+        scaling(argv[2] if len(argv) > 2 else 'scaling-log')
+    elif argv[1] == 'run':
+        run(*argv[2:] if len(argv) > 2 else ['block-log', 'yield-log'])
     else:
-        print >> sys.stderr, 'Unknown command:', sys.argv[1]
+        print >> sys.stderr, 'Unknown command:', argv[1]
 
-sys.exit(main(sys.argv))
+if __name__ == '__main__':
+    sys.exit(main(sys.argv))

Modified: ydb/trunk/tools/test.bash
===================================================================
--- ydb/trunk/tools/test.bash	2009-01-14 18:29:46 UTC (rev 1132)
+++ ydb/trunk/tools/test.bash	2009-01-17 18:56:53 UTC (rev 1133)
@@ -269,6 +269,7 @@
 full-scaling() {
   local base=$1 out=scaling-log-$(date +%Y-%m-%d-%H:%M:%S-%N)
   shift
+  ln -sf $out scaling-log
   for n in {1..5} ; do # configurations
     export range="$base $((base + n))"
     stop
@@ -281,13 +282,12 @@
       echo
     done
   done >& $out
-  ln -sf $out scaling-log
 }
 
 run-helper() {
   local leader=$1
   shift
-  tagssh $leader "ydb/src/ydb -l -x --accept-joiner-seqno $seqno -n $(( $# - 1 )) ${extraargs:-}" & # -v --debug-threads
+  tagssh $leader "ydb/src/ydb -l -x --accept-joiner-seqno $seqno -n $(( $# - 1 )) -o 1 -O 1 ${extraargs:-}" & # -v --debug-threads
   sleep .1 # pexpect 'waiting for at least'
   # Run initial replicas.
   while (( $# > 1 )) ; do
@@ -312,7 +312,7 @@
 }
 
 full-run() {
-  for seqno in 100000 300000 500000 700000 900000; do # configurations
+  for seqno in 500000 400000 300000 200000 100000 ; do # 200000 300000 400000 500000 ; do # 700000 900000; do # configurations
     stop
     for i in {1..5} ; do # trials
       echo === seqno=$seqno i=$i ===
@@ -327,20 +327,20 @@
 
 full-block() {
   local out=block-log-$(date +%Y-%m-%d-%H:%M:%S)
+  ln -sf $out block-log
   full-run >& $out
-  ln -sf $out block-log
 }
 
 full-yield() {
   local out=yield-log-$(date +%Y-%m-%d-%H:%M:%S)
+  ln -sf $out yield-log
   extraargs='--yield-catch-up' full-run >& $out
-  ln -sf $out yield-log
 }
 
 full() {
-  #full-block
+  full-block
   full-yield
-  #full-scaling
+  full-scaling
 }
 
 stop-helper() {
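[Editorial aside, not part of the commit: the --profile-threads option added above charges wall-clock time to each user-level thread from ST's switch-out/switch-in callbacks, then reports per-thread shares at exit. The standalone Python sketch below illustrates that accounting idea only; the class and method names are hypothetical, not the committed C++.]

```python
import time
from collections import defaultdict

class SwitchProfiler:
    """Accumulate per-thread wall-clock time from context-switch callbacks,
    mirroring the switch_out_cb/switch_in_cb pattern."""
    def __init__(self, clock=time.monotonic):
        self.clock = clock
        self.times = defaultdict(float)  # thread name -> accumulated seconds
        self.current = None
        self.start = None

    def switch_in(self, thread):
        # Called when `thread` is scheduled: remember when its slice began.
        self.current = thread
        self.start = self.clock()

    def switch_out(self):
        # Called when the running thread yields: charge it the elapsed slice.
        self.times[self.current] += self.clock() - self.start

    def report(self):
        # Fraction of measured time spent in each thread.
        total = sum(self.times.values()) or 1.0
        return dict((t, dt / total) for t, dt in self.times.items())
```

With an injected deterministic clock, a thread that ran 1s and one that ran 2s report shares of one third and two thirds, respectively.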