assorted-commits Mailing List for Assorted projects (Page 31)
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-01-31 08:11:34
Revision: 1152 http://assorted.svn.sourceforge.net/assorted/?rev=1152&view=rev
Author: yangzhang
Date: 2009-01-31 08:11:29 +0000 (Sat, 31 Jan 2009)

Log Message:
-----------
added tocdepth demo

Added Paths:
-----------
    sandbox/trunk/src/tex/tocdepth.tex

Added: sandbox/trunk/src/tex/tocdepth.tex
===================================================================
--- sandbox/trunk/src/tex/tocdepth.tex	(rev 0)
+++ sandbox/trunk/src/tex/tocdepth.tex	2009-01-31 08:11:29 UTC (rev 1152)
@@ -0,0 +1,20 @@
+\documentclass{article}
+\begin{document}
+\tableofcontents
+\appendix
+\section{alpha}
+\section{beta}
+\section{gamma}
+\subsection{alfa}
+\subsection{bravo}
+\subsubsection{first}
+\subsubsection{second}
+\subsubsection{third}
+\subsubsection{billionth}
+\setcounter{tocdepth}{2} % doesn't matter where you set this; can't be "selective"
+\subsection{charlie}
+\subsubsection{first}
+\subsubsection{second}
+\subsubsection{third}
+\subsubsection{billionth}
+\end{document}

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
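The comment in tocdepth.tex says the counter change "can't be selective", and that is true of a plain \setcounter, which the .toc machinery only sees once. Writing the change into the .toc file itself with \addtocontents does make it take effect from a chosen point in the table of contents. A hedged sketch of that variant (not part of the commit; document body invented for illustration):

```latex
\documentclass{article}
\begin{document}
\tableofcontents
\section{alpha}
\subsection{alfa}
\subsubsection{first}% listed: tocdepth is still 3 (the article default) here
% Write the counter change into the .toc file at this point, so it applies
% only to entries that come after it:
\addtocontents{toc}{\protect\setcounter{tocdepth}{2}}
\subsection{bravo}
\subsubsection{second}% no longer listed in the table of contents
\end{document}
```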
From: <yan...@us...> - 2009-01-28 23:39:04
Revision: 1151 http://assorted.svn.sourceforge.net/assorted/?rev=1151&view=rev
Author: yangzhang
Date: 2009-01-28 23:38:55 +0000 (Wed, 28 Jan 2009)

Log Message:
-----------
fixed up a bit to require gcc/build-essential and wget/GET; runs on ubuntu server

Modified Paths:
--------------
    configs/trunk/bootstrap.bash

Modified: configs/trunk/bootstrap.bash
===================================================================
--- configs/trunk/bootstrap.bash	2009-01-28 20:45:15 UTC (rev 1150)
+++ configs/trunk/bootstrap.bash	2009-01-28 23:38:55 UTC (rev 1151)
@@ -74,11 +74,17 @@
 # TODO
 # check that we have requirements
+! use_sudo || type sudo
 type cat
 type svn
 type perl
-type wget
-! use_sudo || type sudo
+type gcc || use_sudo && pkg_install build-essential
+type python
+if type wget ; then
+fetch() { wget -O- "$@" ; }
+elif type GET ; then
+fetch() { GET "$@" ; }
+fi

 if use_sudo ; then
   python -c "
@@ -115,7 +121,7 @@
 # setup toast
 if ! has toast ; then
-  wget -O- http://toastball.net/toast/toast | dosu perl -x - arm toast
+  fetch http://toastball.net/toast/toast | perl -x - arm toast
   export PATH="$HOME/.toast/armed/bin:$PATH"
 fi
From: <yan...@us...> - 2009-01-28 20:45:22
Revision: 1150 http://assorted.svn.sourceforge.net/assorted/?rev=1150&view=rev
Author: yangzhang
Date: 2009-01-28 20:45:15 +0000 (Wed, 28 Jan 2009)

Log Message:
-----------
added dyndns-allocator

Added Paths:
-----------
    sandbox/trunk/src/one-off-scripts/dyndns-allocator/
    sandbox/trunk/src/one-off-scripts/dyndns-allocator/README
    sandbox/trunk/src/one-off-scripts/dyndns-allocator/find.bash
    sandbox/trunk/src/one-off-scripts/dyndns-allocator/results.bz2

Added: sandbox/trunk/src/one-off-scripts/dyndns-allocator/README
===================================================================
--- sandbox/trunk/src/one-off-scripts/dyndns-allocator/README	(rev 0)
+++ sandbox/trunk/src/one-off-scripts/dyndns-allocator/README	2009-01-28 20:45:15 UTC (rev 1150)
@@ -0,0 +1 @@
+A simple tool for finding available two-letter names under ath.cx.

Added: sandbox/trunk/src/one-off-scripts/dyndns-allocator/find.bash
===================================================================
--- sandbox/trunk/src/one-off-scripts/dyndns-allocator/find.bash	(rev 0)
+++ sandbox/trunk/src/one-off-scripts/dyndns-allocator/find.bash	2009-01-28 20:45:15 UTC (rev 1150)
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+
+for i in {a..z} {0..9} ; do
+  for j in {a..z} {0..9} ; do
+    host $i$j.ath.cx
+  done
+done

Property changes on: sandbox/trunk/src/one-off-scripts/dyndns-allocator/find.bash
___________________________________________________________________
Added: svn:executable
   + *

Added: sandbox/trunk/src/one-off-scripts/dyndns-allocator/results.bz2
===================================================================
(Binary files differ)

Property changes on: sandbox/trunk/src/one-off-scripts/dyndns-allocator/results.bz2
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
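find.bash enumerates every two-character alphanumeric label (36 characters in each position, 1296 candidates) and resolves each one with `host`. The same enumeration can be sketched in Python; the `candidate_names` helper and its `domain` default are illustrative, not part of the commit, and the actual DNS lookup is deliberately omitted so the sketch runs offline:

```python
import itertools
import string

def candidate_names(domain="ath.cx"):
    """All two-character names under the given domain, mirroring find.bash:
    {a..z} {0..9} in each position gives 36 * 36 = 1296 candidates."""
    chars = string.ascii_lowercase + string.digits
    return [a + b + "." + domain for a, b in itertools.product(chars, chars)]

names = candidate_names()
# A real availability check would resolve each name, e.g. with
# socket.gethostbyname(name), treating a lookup failure as "available" --
# which is what piping through `host` accomplishes in find.bash.
```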
From: <yan...@us...> - 2009-01-28 07:19:58
Revision: 1149 http://assorted.svn.sourceforge.net/assorted/?rev=1149&view=rev
Author: yangzhang
Date: 2009-01-28 07:19:46 +0000 (Wed, 28 Jan 2009)

Log Message:
-----------
added gtk event quirks exploration

Added Paths:
-----------
    sandbox/trunk/src/py/gtk/
    sandbox/trunk/src/py/gtk/eventquirks.py

Added: sandbox/trunk/src/py/gtk/eventquirks.py
===================================================================
--- sandbox/trunk/src/py/gtk/eventquirks.py	(rev 0)
+++ sandbox/trunk/src/py/gtk/eventquirks.py	2009-01-28 07:19:46 UTC (rev 1149)
@@ -0,0 +1,21 @@
+#!/usr/bin/env python
+
+"""
+http://www.mail-archive.com/py...@da.../msg17197.html
+"""
+
+from gtk import *
+
+e = Entry()
+def handler(*args): print e.get_window().get_geometry()
+#e.connect('focus-in-event', handler)
+e.connect('configure-event', handler)
+
+h = HBox()
+h.pack_start(e)
+w = Window()
+
+w.add(h)
+w.show_all()
+
+main()

Property changes on: sandbox/trunk/src/py/gtk/eventquirks.py
___________________________________________________________________
Added: svn:executable
   + *
From: <yan...@us...> - 2009-01-28 07:07:57
Revision: 1148 http://assorted.svn.sourceforge.net/assorted/?rev=1148&view=rev
Author: yangzhang
Date: 2009-01-28 07:07:48 +0000 (Wed, 28 Jan 2009)

Log Message:
-----------
added c++ container benchmark

Added Paths:
-----------
    container-bench/
    container-bench/trunk/
    container-bench/trunk/README
    container-bench/trunk/src/
    container-bench/trunk/src/Makefile
    container-bench/trunk/src/bench.cc
    container-bench/trunk/tools/
    container-bench/trunk/tools/analysis.py

Added: container-bench/trunk/README
===================================================================
--- container-bench/trunk/README	(rev 0)
+++ container-bench/trunk/README	2009-01-28 07:07:48 UTC (rev 1148)
@@ -0,0 +1,24 @@
+Overview
+--------
+
+Benchmark of various associative containers for C++.
+
+Part of the [H-Store] project, to determine the performance behavior of various
+readily available options.
+
+Setup
+-----
+
+Requirements:
+
+- [Boost] 1.37
+- [C++ Commons]
+
+[Boost]: http://boost.org/
+[C++ Commons]: http://assorted.sf.net/cpp-commons/
+
+Analysis tools requirements:
+
+- [Matplotlib] 0.98.3
+
+[Matplotlib]: http://matplotlib.sf.net/

Added: container-bench/trunk/src/Makefile
===================================================================
--- container-bench/trunk/src/Makefile	(rev 0)
+++ container-bench/trunk/src/Makefile	2009-01-28 07:07:48 UTC (rev 1148)
@@ -0,0 +1,9 @@
+CXXFLAGS += -O3 -Wall
+BINS := bench
+
+all: $(BINS)
+
+clean:
+	rm -f $(BINS)
+
+.PHONY: clean

Added: container-bench/trunk/src/bench.cc
===================================================================
--- container-bench/trunk/src/bench.cc	(rev 0)
+++ container-bench/trunk/src/bench.cc	2009-01-28 07:07:48 UTC (rev 1148)
@@ -0,0 +1,96 @@
+#include <map>
+#include <iostream>
+#include <vector>
+#include <commons/rand.h>
+#include <commons/time.h>
+#include <stx/btree_map.h>
+#include <boost/foreach.hpp>
+#include <tr1/unordered_map>
+#define foreach BOOST_FOREACH
+using namespace std;
+using namespace commons;
+using namespace stx;
+using namespace tr1;
+
+enum { len = 1000000 };
+int *xs;
+
+template<typename T>
+inline void
+load(T &m, const string &label)
+{
+  long long start = current_time_millis();
+  for (int i = 0; i < len; i++) {
+    m[i] = xs[i];
+  }
+  long long end = current_time_millis();
+  cout << label << ": " << end - start << " ms" << endl;
+}
+
+template<typename T>
+inline void
+scan(T &m, const string &label)
+{
+  long long start = current_time_millis();
+  typedef pair<int, int> pii;
+  foreach (pii p, m) {
+    xs[p.first] = p.second;
+  }
+  long long end = current_time_millis();
+  cout << label << ": " << end - start << " ms" << endl;
+}
+
+template<typename T>
+inline void
+index(T &m, const string &label)
+{
+  long long start = current_time_millis();
+  typedef pair<int, int> pii;
+  for (int i = 0; i < len; i++) {
+    xs[i] = m[i];
+  }
+  long long end = current_time_millis();
+  cout << label << ": " << end - start << " ms" << endl;
+}
+
+int main(int argc, char **argv) {
+  int mode = atoi(argv[1]);
+  int nreps = 2;
+
+  xs = new int[len];
+  for (int i = 0; i < len; i++) {
+    xs[i] = rand();
+  }
+
+  for (int r = 0; r < nreps; r++) {
+    if (mode & 0x1) {
+      map<int, int> m;
+      load(m, "map init");
+      load(m, "map reload");
+      scan(m, "map scan");
+      index(m, "map index");
+    }
+    if (mode & 0x2) {
+      // default trait: 256B/node -> 32pairs/node
+      btree_map<int, int> m;
+      load(m, "btree init");
+      load(m, "btree reload");
+      scan(m, "btree scan");
+      index(m, "btree index");
+    }
+    if (mode & 0x4) {
+      unordered_map<int, int> m;
+      load(m, "hash init");
+      load(m, "hash reload");
+      scan(m, "hash scan");
+      index(m, "hash index");
+    }
+    if (mode & 0x8) {
+      int *m = new int[len];
+      load(m, "arr init");
+      load(m, "arr reload");
+      index(m, "arr index");
+    }
+  }
+  return 0;
+}

Added: container-bench/trunk/tools/analysis.py
===================================================================
--- container-bench/trunk/tools/analysis.py	(rev 0)
+++ container-bench/trunk/tools/analysis.py	2009-01-28 07:07:48 UTC (rev 1148)
@@ -0,0 +1,22 @@
+#!/usr/bin/env python
+
+from __future__ import with_statement
+import sys
+from pylab import *
+
+pairs = ( line.split(': ') for line in sys.stdin if ': ' in line )
+pairs = [ (a, int(b.split()[0])) for (a,b) in pairs ]
+
+labels, data = zip(*pairs)
+xs = arange(len(data))+.5
+width = .5
+bar(xs, data, width = width)
+
+xlim(0, xs[-1] + width*2)
+xticks(xs + width/2, labels, rotation = 90)
+title("Performance of associative containers")
+# show only bottom/left ticks
+gca().get_xaxis().tick_bottom()
+gca().get_yaxis().tick_left()
+
+savefig('results.png')

Property changes on: container-bench/trunk/tools/analysis.py
___________________________________________________________________
Added: svn:executable
   + *
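The load/scan/index pattern in bench.cc (time one operation over the whole container, print the label and elapsed milliseconds) can be mirrored in a small Python sketch against a plain dict. The `timed` helper, `LEN` constant, and labels are hypothetical stand-ins; this illustrates the benchmark's shape only and is not a substitute for the C++ measurements:

```python
import time

LEN = 100_000  # smaller than bench.cc's len = 1000000, to keep the sketch quick

def timed(label, fn):
    """Run fn once and report wall-clock time, like the C++ load/scan/index helpers."""
    start = time.perf_counter()
    result = fn()
    ms = (time.perf_counter() - start) * 1000
    print("%s: %.1f ms" % (label, ms))
    return result

m = {}

def load():
    # sequential-key insert, analogous to load() in bench.cc
    for i in range(LEN):
        m[i] = i

timed("dict load", load)
# full traversal, analogous to scan(); point lookups, analogous to index()
scan_total = timed("dict scan", lambda: sum(m.values()))
index_total = timed("dict index", lambda: sum(m[i] for i in range(LEN)))
```

Its output feeds naturally into the same `label: N ms` format that analysis.py parses from stdin.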
From: <yan...@us...> - 2009-01-27 23:25:29
Revision: 1147 http://assorted.svn.sourceforge.net/assorted/?rev=1147&view=rev Author: yangzhang Date: 2009-01-27 23:25:16 +0000 (Tue, 27 Jan 2009) Log Message: ----------- - added --exit-on-seqno - refactored analysis.py, settled in the sloppy regex extraction approach - brought back scaling analysis - improved the colors and shapes - pretty print raw data tables - cleaned up range/hosts configuration in test.bash (incl. propagation to subscripts) - added len-plotting to analysis - fixed erratic behavior by lowering chkpt to 1K from 10K - added recovery-generation timing - fixed always-multirecover bug - fixed set_yourhost omission for the joiner - added somewhat-off analysis for multirecover Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/tools/analysis.py ydb/trunk/tools/test.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-01-26 05:35:42 UTC (rev 1146) +++ ydb/trunk/README 2009-01-27 23:25:16 UTC (rev 1147) @@ -231,32 +231,45 @@ Period: 1/20-1/27 - DONE implement multihost -- TODO add simple, proper timestamped logging -- TODO see how much multihost recovery affects perf -- TODO look again at how much yielding affects perf -- TODO monitor memory usage -- TODO switch to btree -- TODO break down the red bar some more -- TODO see how much time difference there is -- TODO red bar: why are/aren't we saturating bandwidth? -- TODO understand the rest of the perf (eg stl map) -- TODO try scaling up + - not much, it only decreases the xfer time (which orig was thought to be the bottleneck) +- DONE see how much multihost recovery affects perf + - quite a bit! +- DONE look again at how much yielding affects perf + - not much +- DONE break down the red bar some more + - most of the time is spent in the dumping +- DONE understand the rest of the perf (eg stl map) + - DONE why the big jump in 400,000 ops? why all the unexpected ups & downs? 
+ - due to the 10,000-txn quantum; lowering this to 1,000 made everything much + saner + - DONE how does the recovery state xfer time compare to what's expected? + - msgs smaller than expected, eg 300,000 txns * 2*4 bytes per txn = 2.4MB, + but msgs are ~2MB (compression, some random overwrites) + - xfer takes much longer than the theoretical time; 2MB on 1GbE = 16 ms, but + takes more around 50 ms +- DONE start building infrastructure for disk IO + +Period: 1/27- + +- TODO fix up analysis of multihost recovery - TODO implement checkpointing disk-based scheme - TODO implement log-based recovery; show that it sucks - TODO implement group (batch) commit for log-based recovery -- TODO talk - - motivation: log-based sucks, look into alternatives +- TODO try scaling up - TODO serialize outputs from the various clients to a single merger to (1) have ordering over the (timestamped) messages, and (2) avoid interleaved lines - -Period: 1/27- - +- TODO add simple, proper timestamped logging +- TODO see how much clock difference there is among the hosts +- TODO monitor memory usage +- TODO try improving map perf; switch to btree; try bulk loading - TODO detailed view of tps during recovery over time (should see various phases) - TODO later: runtime overhead of logging/tps under normal operation (scaled with # nodes?) - TODO later: timestamped logging? +- TODO talk + - motivation: log-based sucks, look into alternatives Longer term Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-01-26 05:35:42 UTC (rev 1146) +++ ydb/trunk/src/main.lzz.clamp 2009-01-27 23:25:16 UTC (rev 1147) @@ -37,7 +37,8 @@ // Configuration. 
st_utime_t timeout; -int chkpt, accept_joiner_seqno, issuing_interval, min_ops, max_ops; +int chkpt, accept_joiner_seqno, issuing_interval, min_ops, max_ops, + stop_on_seqno; size_t accept_joiner_size; bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, @@ -283,34 +284,47 @@ /** * Read a message. This is done in two steps: first by reading the length - * prefix, then by reading the actual body. + * prefix, then by reading the actual body. This function also provides a way + * to measure how much time is spent actually reading the message from the + * network. Such measurement only makes sense for large messages which take a + * long time to receive. * * \param[in] src The socket from which to read. * * \param[in] msg The protobuf to read into. * - * \param[in] timed Whether to make a note of the time at which the first piece of the - * message (the length) was received. Such measurement only makes sense for - * large messages which take a long time to receive. + * \param[out] start_time If not null, record the time at which we start to + * receive the message (after the length is received). * + * \param[out] stop_time If not null, record the time at which we finish + * receiving the message (before we deserialize the protobuf). + * + * \param[out] len If not null, record the size of the serialized message + * in bytes. + * * \param[in] timeout on each of the two read operations (first one is on * length, second one is on the rest). + * + * \return The length of the serialized message. */ template <typename T> -long long -readmsg(st_netfd_t src, T & msg, bool timed = false, st_utime_t timeout = - ST_UTIME_NO_TIMEOUT) +size_t +readmsg(st_netfd_t src, T & msg, long long *start_time = nullptr, long long + *stop_time = nullptr, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) { // Read the message length. 
uint32_t len; checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len, timeout), static_cast<ssize_t>(sizeof len)); - long long start_receive = timed ? current_time_millis() : -1; + if (start_time != nullptr) + *start_time = current_time_millis(); len = ntohl(len); #define GETMSG(buf) \ checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ + if (stop_time != nullptr) \ + *stop_time = current_time_millis(); \ check(msg.ParseFromArray(buf, len)); // Parse the message body. @@ -323,7 +337,7 @@ GETMSG(buf.get()); } - return start_receive; + return len; } /** @@ -336,7 +350,7 @@ readmsg(st_netfd_t src, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) { T msg; - readmsg(src, msg, false, timeout); + readmsg(src, msg, nullptr, nullptr, timeout); return msg; } @@ -407,6 +421,11 @@ if (txn.seqno() == accept_joiner_seqno) { accept_joiner.set(); } + + if (txn.seqno() == stop_on_seqno) { + cout << "stopping on issue of seqno " << txn.seqno() << endl; + stop_hub.set(); + } } Txn txn; @@ -600,14 +619,18 @@ mii::const_iterator end = multirecover && mypos < nnodes - 1 ? map.lower_bound(interp(RAND_MAX, mypos + 1, nnodes)) : map.end(); cout << "generating recovery over " << begin->first << ".." - << (end == map.end() ? "end" : lexical_cast<string>(end->first)) - << " (node " << mypos << " of " << nnodes << ")" - << endl; + << (end == map.end() ? "end" : lexical_cast<string>(end->first)); + if (multirecover) + cout << " (node " << mypos << " of " << nnodes << ")"; + cout << endl; + long long start_snap = current_time_millis(); foreach (const pii &p, make_iterator_range(begin, end)) { Recovery_Pair *pair = recovery->add_pair(); pair->set_key(p.first); pair->set_value(p.second); } + cout << "generating recovery took " + << current_time_millis() - start_snap << " ms" << endl; recovery->set_seqno(seqno); send_states.push(recovery); } @@ -799,7 +822,7 @@ // Construct the initialization message. 
Init init; init.set_txnseqno(0); - init.set_multirecover(true); + init.set_multirecover(multirecover); foreach (replica_info r, replicas) { SockAddr *psa = init.add_node(); psa->set_host(r.host()); @@ -841,14 +864,15 @@ accept_joiner.waitset(); } Join join = readmsg<Join>(joiner); + replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); cout << "setting seqno to " << seqno << endl; init.set_txnseqno(seqno); + init.set_yourhost(replicas.back().host()); sendmsg(joiner, init); recover_signals.push(current_time_millis()); // Start streaming txns to joiner. cout << "start streaming txns to joiner" << endl; - replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); newreps.push(replicas.back()); handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), rid++, ref(recover_signals), false), @@ -959,15 +983,18 @@ recovery_builders.push_back(my_spawn(lambda() { // Read the recovery message. Recovery recovery; - long long receive_start = -1; + long long receive_start = 0, receive_end = 0; + size_t len = 0; { st_intr intr(stop_hub); - receive_start = readmsg(__ref(replicas)[__ctx(i)], recovery, true); + len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, + &receive_end); } long long build_start = current_time_millis(); - cout << "got recovery message in " - << build_start - __ref(before_recv) << " ms (xfer took " - << build_start - receive_start << " ms)" << endl; + cout << "got recovery message of " << len << " bytes in " + << build_start - __ref(before_recv) << " ms: xfer took " + << receive_end - receive_start << " ms, deserialization took " + << build_start - receive_end << " ms" << endl; for (int i = 0; i < recovery.pair_size(); i++) { const Recovery_Pair &p = recovery.pair(i); __ref(map)[p.key()] = p.value(); @@ -1116,6 +1143,8 @@ "run the leader (run replica by default)") ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), "exit after the joiner fully recovers (for leader only)") + 
("exit-on-seqno,X", po::value<int>(&stop_on_seqno)->default_value(-1), + "exit after txn seqno is issued (for leader only)") ("accept-joiner-size,s", po::value<size_t>(&accept_joiner_size)->default_value(0), "accept recovering joiner (start recovery) after DB grows to this size " @@ -1139,7 +1168,7 @@ ("leader-port,P", po::value<uint16_t>(&leader_port)->default_value(7654), "port the leader listens on") - ("chkpt,c", po::value<int>(&chkpt)->default_value(10000), + ("chkpt,c", po::value<int>(&chkpt)->default_value(1000), "number of txns before yielding/verbose printing") ("timelim,T", po::value<long long>(&timelim)->default_value(0), "general network IO time limit in milliseconds, or 0 for none") Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-01-26 05:35:42 UTC (rev 1146) +++ ydb/trunk/tools/analysis.py 2009-01-27 23:25:16 UTC (rev 1147) @@ -1,10 +1,12 @@ #!/usr/bin/env python from __future__ import with_statement -import re, sys, itertools +import re, sys, itertools, colorsys from os.path import basename, realpath from pylab import * +class struct(object): pass + def getname(path): return basename(realpath(path)) def check(path): @@ -12,106 +14,137 @@ if 'got timeout' in f.read(): print 'warning: timeout occurred' -def agg(src): +def show_table(pairs): + def fmt(x): + s = str(x) + if s.endswith('.0'): return s[:-2] + p = s.index('.') + return s if p < 0 else s[:p+4] + cols = [ [heading] + map(fmt, col) for (heading, col) in pairs ] + widths = [ max(map(len, col)) for col in cols ] + return '\n'.join( + '|'.join( ('%%%ds' % width) % val for width, val in zip(widths, row) ) + for row in zip(*cols) ) + +def show_table1(dicts): + keys = dicts[0].keys() + return show_table([(k, [d[k] for d in dicts]) for k in keys]) + +def logextract(path, indexkey, pats): + check(path) + # Capture values from log using regex pats. 
+ def getcaps(): + with file(path) as f: + caps = {} # captures: name -> int/float + sats = [ False for pat in pats ] + for line in f: +# if line == '\n': print '===', caps.keys(), ''.join('1' if s else '0' for s in sats) + for i, pat in enumerate(pats): + m = re.search(pat, line) + if m: + for k in m.groupdict(): + if k in caps: + caps[k + '0'] = caps[k] + caps.update((k, float(v)) for k,v in m.groupdict().iteritems()) + sats[i] = True + break + if all(sats): + sats = [ False for pat in pats ] +# print '!!!' + yield caps.copy() # [ caps[k] for k in keys ] + caps.clear() + # Aggregate the captured values. + caps = list(getcaps()) +# print show_table1(caps) + keys = [indexkey] + filter(lambda x: x != indexkey, caps[0].keys()) def gen(): - for index, tups in itertools.groupby(src, lambda x: x[0]): - yield list(tups) - a = array(list(gen())) + for index, ds in itertools.groupby(caps, lambda d: d[indexkey]): + ds = list(ds) + print [d['len'] for d in ds] + yield [ [d[k] for k in keys] for d in ds ] + a = array(list(gen())) # raw results indexes = a[:,0,0] - means = median(a,1) #a.mean(1) - stds = a.std(1) - tup = (indexes,) - for i in range(1, len(a[0,0])): - tup += (means[:,i], stds[:,i]) - stacked = hstack(map(lambda x: x.reshape((len(indexes),1)), tup)) - return tup + (stacked, a) + means = median(a,1) # or a.mean(1) + sds = a.std(1) + # Build result dict. 
+ stacks = [ (indexkey, indexes) ] # no need to agg the index + for i,k in list(enumerate(keys))[1:]: # everything but index + stacks.append((k + ' mean', means[:,i])) + stacks.append((k + ' sd', sds[:,i])) + res = dict(stacks) + res['stacked'] = hstack(map(lambda (_,x): x.reshape((len(indexes), 1)), stacks)) + res['raw'] = a + print show_table(stacks) + print + return res def scaling(path): print '=== scaling ===' print 'file:', getname(path) - check(path) - def getpairs(): - with file(path) as f: - for line in f: - m = re.match( r'=== n=(?P<n>\d+) ', line ) - if m: - n = int(m.group('n')) - m = re.match( r'.*: issued .*[^.\d](?P<tps>[.\d]+) ?tps', line ) - if m: - tps = float(m.group('tps')) - yield (n, tps) - tups = agg(getpairs()) - ns, tpsmeans, tpssds, stacked, a = agg(getpairs()) - print 'n, tps mean, tps sd' - print stacked - print + res = logextract(path, 'n', [ + r'=== n=(?P<n>\d+) ', + r'issued .*\((?P<tps>[.\d]+) tps\)' ]) - errorbar(ns, tpsmeans, tpssds) + errorbar(res['n'], res['tps mean'], res['tps sd']) title('Scaling of baseline throughput with number of nodes') xlabel('Node count') ylabel('Mean TPS (stdev error bars)') - xlim(ns.min() - .5, ns.max() + .5) + xlim(res['n'].min() - .5, res['n'].max() + .5) ylim(ymin = 0) savefig('scaling.png') def run(blockpath, yieldpath): - for path, label in [#(blockpath, 'blocking scheme'), - (yieldpath, 'yielding scheme')]: - print '===', label, '===' + for path, titlestr, name in [#(blockpath, 'blocking scheme', 'block'), + (yieldpath, 'yielding scheme', 'yield')]: + print '===', titlestr, '===' print 'file:', getname(path) - check(path) - def getpairs(): - with file(path) as f: - seqno = dump = recv = buildup = catchup = total = None - for line in f: - m = re.match( r'=== seqno=(?P<seqno>\d+) ', line ) - if m: seqno = int(m.group('seqno')) - m = re.search( r'got recovery message in (?P<dump>\d+) ms \(xfer took (?P<recv>\d+) ms\)', line ) - if m: dump, recv = float(m.group('dump')), float(m.group('recv')) - m = 
re.search( r'built up .* (?P<time>\d+) ms', line ) - if m: buildup = float(m.group('time')) - m = re.search( r'replayer caught up; from backlog replayed \d+ txns .* in (?P<time>\d+) ms', line ) - if m: catchup = float(m.group('time')) - m = re.match( r'.*: recovering node caught up; took (?P<time>\d+) ?ms', line ) - if m: total = float(m.group('time')) - tup = (seqno, dump, recv, buildup, catchup, total) - if all(tup): - yield tup - seqno = dump = recv = buildup = catchup = total = None - seqnos, dumpmeans, dumpsds, recvmeans, recvsds, buildmeans, buildsds, \ - catchmeans, catchsds, totalmeans, totalsds, stacked, a = \ - agg(getpairs()) + res = logextract(path, 'seqno', + [ r'=== seqno=(?P<seqno>\d+) ', + r'got recovery message of (?P<len>\d+) bytes in (?P<dump>\d+) ms: xfer took (?P<recv>\d+) ms, deserialization took (?P<deser>\d+)', + r'built up .* (?P<buildup>\d+) ms', + r'generating recovery took (?P<gen>\d+) ms', + r'replayer caught up; from backlog replayed \d+ txns .* in (?P<catchup>\d+) ms', + r'.*: recovering node caught up; took (?P<total>\d+) ?ms' ] ) - print 'max seqno, dump mean, dump sd, recv mean, recv sd, build mean, build sd, catch mean, catch sd, total mean, total sd' - print stacked - print - + # Colors and positioning width = 5e4 - # From "zen and tea" on kuler.adobe.com - hue = lambda i: tuple(map(lambda x: float(x)/255, - [( 16, 34, 43), - (149,171, 99), - (189,214,132), - (226,240,214), - (246,255,224)][i+1])) - ehue = lambda i: hue(-1) # tuple(map(lambda x: min(1, x + .3), hue(i))) - bar(seqnos, dumpmeans, yerr = dumpsds, width = width, color = hue(0), - ecolor = ehue(0), label = 'State serialization') - bar(seqnos, recvmeans, yerr = recvsds, width = width, color = hue(0), - ecolor = ehue(0), label = 'State receive', bottom = dumpmeans) - bar(seqnos, buildmeans, yerr = buildsds, width = width, color = hue(1), - ecolor = ehue(1), label = 'Build-up', - bottom = dumpmeans + recvmeans) - bar(seqnos, catchmeans, yerr = catchsds, width = width, 
color = hue(2), - ecolor = ehue(2), label = 'Catch-up', - bottom = dumpmeans + recvmeans + buildmeans) + step = 1.0 / 5 + hues = ( colorsys.hls_to_rgb(step * i, .7, .5) for i in itertools.count() ) + ehues = ( colorsys.hls_to_rgb(step * i, .3, .5) for i in itertools.count() ) + widths = ( 2 * width - 2 * width / 5 * i for i in itertools.count() ) + offsets = ( width - 2 * width / 5 * i for i in itertools.count() ) + self = struct() + self.bottom = 0 - title('Recovery time over number of transactions') - xlabel('Transaction count (corresponds roughly to data size)') - ylabel('Mean time in ms (SD error bars)') - legend(loc = 'upper left') - savefig('run.png') + clf() + def mybar(yskey, eskey, label): + bar(res['seqno'] - offsets.next(), res[yskey], yerr = res[eskey], width = + widths.next(), color = hues.next(), edgecolor = (1,1,1), ecolor = + ehues.next(), label = label, bottom = self.bottom) + self.bottom += res[yskey] + mybar('dump mean', 'dump sd', 'State dump') + mybar('recv mean', 'recv sd', 'State receive') + mybar('deser mean', 'deser sd', 'State deserialization') + mybar('buildup mean', 'buildup sd', 'Build-up') + mybar('catchup mean', 'catchup sd', 'Catch-up') + + title('Recovery time of ' + titlestr + ' over data size') + xlabel('Transaction count (corresponds roughly to data size)') + ylabel('Mean time in ms (SD error bars)') + legend(loc = 'upper left') + + ax2 = twinx() + col = colorsys.hls_to_rgb(.6, .4, .4) + ax2.errorbar(res['seqno'], res['len mean'] / 1024, res['len sd'] / 1024, marker = 'o', + color = col) + ax2.set_ylabel('Size of serialized state (KB)', color = col) + ax2.set_ylim(ymin = 0) + for tl in ax2.get_yticklabels(): tl.set_color(col) + + xlim(xmin = min(res['seqno']) - width, xmax = max(res['seqno']) + width) + savefig(name + '.png') + def main(argv): if len(argv) <= 1: print >> sys.stderr, 'Must specify a command' Modified: ydb/trunk/tools/test.bash =================================================================== --- 
ydb/trunk/tools/test.bash 2009-01-26 05:35:42 UTC (rev 1146) +++ ydb/trunk/tools/test.bash 2009-01-27 23:25:16 UTC (rev 1147) @@ -9,17 +9,22 @@ tagssh() { ssh "$@" 2>&1 | python -u -c ' -import time, sys +import time, sys, socket +# def fmt(*xs): return " ".join(map(str, xs)) + "\n" +# s = socket.socket() +# s.connect(("localhost", 9876)) +# f = s.makefile() +f = sys.stdout while True: line = sys.stdin.readline() if line == "": break - print sys.argv[1], time.time(), ":\t", line, + print >> f, sys.argv[1], time.time(), ":\t", line, ' $1 } check-remote() { - if [[ ${force:-asdf} != asdf && `hostname` == yang-xps410 ]] - then echo 'running a remote command on your pc!' 1>&2 && exit 1 + if [[ ! ${remote:-} ]] + then 'running a remote command on your pc!' fi } @@ -129,36 +134,11 @@ local host="$1" shift scp -q "$(dirname "$0")/$script" "$host:" - tagssh "$host" "./$script" "$@" + tagssh "$host" "remote=1 ./$script" "$@" } -hosts() { - if [[ ${host:-} ]] ; then - echo $host - elif [[ ${range:-} ]] ; then - seq $range | sed 's/^/farm/; s/$/.csail/' - else - cat << EOF -farm1.csail -farm2.csail -farm3.csail -farm4.csail -farm5.csail -farm6.csail -farm7.csail -farm8.csail -farm9.csail -farm10.csail -farm11.csail -farm12.csail -farm13.csail -farm14.csail -EOF - fi -} - parhosts() { - hosts | xargs ${xargs--P9} -I^ "$@" + echo -n $hosts | xargs ${xargs--P9} -d' ' -I^ "$@" } parssh() { @@ -170,6 +150,7 @@ } parremote() { + export hosts range parhosts "./$script" remote ^ "$@" } @@ -235,7 +216,7 @@ " } -hosttops() { +tops() { xargs= parssh " echo hostname @@ -245,22 +226,17 @@ } hostargs() { - if [[ $range ]] - then "$@" $(seq $range | sed 's/^/farm/; s/$/.csail/') - else "$@" ${hosts[@]} - fi + "$@" $hosts } scaling-helper() { local leader=$1 shift - tagssh $leader "ydb/src/ydb -l -n $#" & + tagssh $leader "ydb/src/ydb -l -n $# -X 100000" & sleep .1 for rep in "$@" do tagssh $rep "ydb/src/ydb -n $# -H $leader" & done - sleep ${wait1:-10} - tagssh $leader 'pkill -sigint ydb' 
wait } @@ -274,21 +250,23 @@ # TODO: fix this to work also with `hosts`; move into repeat-helper that's run # via hostargs, and change the range= to hosts= full-scaling() { - local base=$1 out=scaling-log-$(date +%Y-%m-%d-%H:%M:%S-%N) - shift + local out=scaling-log-$(date +%Y-%m-%d-%H:%M:%S-%N) + local orighosts="$hosts" maxn=$(( $(echo $hosts | wc -w) - 1 )) ln -sf $out scaling-log - for n in {1..5} ; do # configurations - export range="$base $((base + n))" + for n in `seq $maxn -1 1` ; do # configurations stop for i in {1..5} ; do # trials echo === n=$n i=$i === + echo === n=$n i=$i === > `tty` scaling sleep 1 stop sleep .1 echo done + hosts="${hosts% *}" done >& $out + hosts="$orighosts" } run-helper() { @@ -324,6 +302,7 @@ stop for i in {1..5} ; do # trials echo === seqno=$seqno i=$i === + echo === seqno=$seqno i=$i === > `tty` run sleep 1 stop @@ -342,15 +321,9 @@ full-yield() { local out=yield-log-$(date +%Y-%m-%d-%H:%M:%S) ln -sf $out yield-log - extraargs='--yield-catch-up' full-run >& $out + extraargs="--yield-catch-up ${extraargs:-}" full-run >& $out } -full() { - full-block - full-yield - full-scaling -} - stop-helper() { tagssh $1 'pkill -sigint ydb' } @@ -375,8 +348,32 @@ # Use mssh to log in with password as root to each machine. mssh-root() { - : "${hosts:="$(hosts)"}" mssh -l root "$@" } -"$@" +# Set up hosts. +confighosts() { + if [[ ! ${remote:-} ]] ; then + if [[ ! "${hosts:-}" && ! "${range:-}" ]] + then range='1 14'; echo "warning: running with farms 1..14" 1>&2 + fi + if [[ "${range:-}" ]] + then hosts="$( seq $range | sed 's/^/farm/' )" + fi + hosts="$( echo -n $hosts )" + fi +} + +# Set up logger. +configlogger() { + if [[ ! ${remote:-} ]] ; then + ( + flock -n /tmp/ydbtest.socket + ) > /tmp/y + fi +} + +confighosts +#configlogger + +eval "$@" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
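The refactored plotting code in r1147 above stacks five bar series by threading a running `bottom` through successive `bar()` calls (`self.bottom += res[yskey]` inside `mybar`). A minimal, matplotlib-free sketch of that accumulation — the segment names and heights below are illustrative, not real measurements:

```python
# Sketch of the running-`bottom` accumulation that mybar() uses to stack
# bar segments: each series starts where the previous series ended.

def stack_segments(series):
    """series: list of (label, heights) pairs (equal-length height lists).
    Returns (label -> list of segment bottoms, final stack totals)."""
    bottoms = {}
    bottom = [0.0] * len(series[0][1])
    for label, heights in series:
        bottoms[label] = list(bottom)                # where this segment starts
        bottom = [b + h for b, h in zip(bottom, heights)]
    return bottoms, bottom                           # final bottom = totals

segments = [
    ('State dump',    [1.0, 2.0]),
    ('State receive', [0.5, 0.5]),
    ('Build-up',      [2.0, 1.0]),
]
bottoms, totals = stack_segments(segments)
```

Passing each label's bottoms as the `bottom=` argument of `bar()` reproduces the stacked layout in `run.png`.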
From: <yan...@us...> - 2009-01-26 05:35:47
Revision: 1146 http://assorted.svn.sourceforge.net/assorted/?rev=1146&view=rev Author: yangzhang Date: 2009-01-26 05:35:42 +0000 (Mon, 26 Jan 2009) Log Message: ----------- added gstdemo, ranges Added Paths: ----------- sandbox/trunk/src/py/gstdemo.py sandbox/trunk/src/py/ranges.py Added: sandbox/trunk/src/py/gstdemo.py =================================================================== --- sandbox/trunk/src/py/gstdemo.py (rev 0) +++ sandbox/trunk/src/py/gstdemo.py 2009-01-26 05:35:42 UTC (rev 1146) @@ -0,0 +1,48 @@ +#!/usr/bin/env python + +# Demo of gst, from following the tutorial at +# <http://pygstdocs.berlios.de/pygst-tutorial/>. Pass in full path to an audio +# file. + +# You can also do: +# gst-launch-0.10 playbin uri=file://<PATH> + +import gst, urllib, sys, time +player = gst.element_factory_make('playbin') +player.set_property('uri', 'file://' + urllib.quote(sys.argv[1])) +print 'play' +player.set_state(gst.STATE_PLAYING) +time.sleep(1) # running in another thread +print 'still playing' +player.set_state(gst.STATE_PLAYING) +time.sleep(1) +print 'pause' +player.set_state(gst.STATE_PAUSED) +time.sleep(.5) +print 'play' +player.set_state(gst.STATE_PLAYING) +time.sleep(.5) +print 'stop' +player.set_state(gst.STATE_NULL) +time.sleep(.5) +print 'play; note that we have been reset' +player.set_state(gst.STATE_PLAYING) +time.sleep(2) +timefmt = gst.Format(gst.FORMAT_TIME) +# if you call this in STATE_NULL then you'll get QueryError: query failed +posns = player.query_position(timefmt, None)[0] +durns = player.query_duration(timefmt, None)[0] +print timefmt, posns / 1e9, durns / 1e9 +# "A non flushing seek might take some time to perform as the currently playing +# data in the pipeline will not be cleared. An accurate seek might be slower +# for formats that don't have any indexes or timestamp markers in the stream. +# Specifying this flag might require a complete scan of the file in those +# cases." 
From +# <http://pygstdocs.berlios.de/pygst-reference/gst-constants.html#gst-state-constants> +print 'seek end' +player.seek_simple(timefmt, gst.SEEK_FLAG_FLUSH, durns) +time.sleep(.5) +print 'seek 50%' +player.seek_simple(timefmt, gst.SEEK_FLAG_FLUSH, durns / 2) +time.sleep(1) +print 'done' Property changes on: sandbox/trunk/src/py/gstdemo.py ___________________________________________________________________ Added: svn:executable + * Added: sandbox/trunk/src/py/ranges.py =================================================================== --- sandbox/trunk/src/py/ranges.py (rev 0) +++ sandbox/trunk/src/py/ranges.py 2009-01-26 05:35:42 UTC (rev 1146) @@ -0,0 +1,6 @@ +# Sanity-checking my own grasp on range partitioning and int division. +for i in range(10): + for j in range(10): + for x in range(j): + print 'max=%d n=%d pos=%d: %d..%d' % (i,j,x,i*x/j,i*(x+1)/j if x<j-1 else i) + print This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
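The `ranges.py` script in r1146 above sanity-checks the standard integer-division scheme for range partitioning: bucket `x` of `n` items split across `j` buckets spans `[n*x//j, n*(x+1)//j)`. A standalone sketch of the invariant it is exercising:

```python
def bucket_bounds(n, j, x):
    """Half-open bounds of bucket x when n items are range-partitioned into
    j buckets by integer division (the scheme ranges.py prints)."""
    return n * x // j, n * (x + 1) // j

# Buckets tile [0, n) exactly: the first starts at 0, the last ends at n,
# and each bucket begins where the previous one ends.
for n in range(1, 10):
    for j in range(1, 10):
        edges = [bucket_bounds(n, j, x) for x in range(j)]
        assert edges[0][0] == 0 and edges[-1][1] == n
        assert all(hi == lo for (_, hi), (lo, _) in zip(edges, edges[1:]))
```

The `if x<j-1 else i` special case in the original is not actually needed: for the last bucket, `n*(x+1)//j` is `n*j//j`, which is exactly `n`.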
From: <yan...@us...> - 2009-01-25 09:31:06
Revision: 1145 http://assorted.svn.sourceforge.net/assorted/?rev=1145&view=rev Author: yangzhang Date: 2009-01-25 09:30:59 +0000 (Sun, 25 Jan 2009) Log Message: ----------- updated reminder Modified Paths: -------------- music-labeler/trunk/publish.bash Modified: music-labeler/trunk/publish.bash =================================================================== --- music-labeler/trunk/publish.bash 2009-01-25 09:30:18 UTC (rev 1144) +++ music-labeler/trunk/publish.bash 2009-01-25 09:30:59 UTC (rev 1145) @@ -1,7 +1,7 @@ #!/usr/bin/env bash echo 'Remember to keep versions in sync in all three locations:' -echo '__init__.py, README (Changes), setup.bash, and setup.py' +echo 'README (Changes), setup.bash, and setup.py' fullname='Music Labeler' version=0.1 This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-01-25 09:30:21
Revision: 1144 http://assorted.svn.sourceforge.net/assorted/?rev=1144&view=rev Author: yangzhang Date: 2009-01-25 09:30:18 +0000 (Sun, 25 Jan 2009) Log Message: ----------- added some meta files for the project Added Paths: ----------- music-labeler/trunk/publish.bash music-labeler/trunk/setup.bash music-labeler/trunk/setup.py Copied: music-labeler/trunk/publish.bash (from rev 1123, python-commons/trunk/publish.bash) =================================================================== --- music-labeler/trunk/publish.bash (rev 0) +++ music-labeler/trunk/publish.bash 2009-01-25 09:30:18 UTC (rev 1144) @@ -0,0 +1,11 @@ +#!/usr/bin/env bash + +echo 'Remember to keep versions in sync in all three locations:' +echo '__init__.py, README (Changes), setup.bash, and setup.py' + +fullname='Music Labeler' +version=0.1 +license=gpl3 +websrcs=( README ) +rels=( pypi: ) +. assorted.bash "$@" Property changes on: music-labeler/trunk/publish.bash ___________________________________________________________________ Added: svn:executable + * Added: svn:mergeinfo + Copied: music-labeler/trunk/setup.bash (from rev 1123, python-commons/trunk/setup.bash) =================================================================== --- music-labeler/trunk/setup.bash (rev 0) +++ music-labeler/trunk/setup.bash 2009-01-25 09:30:18 UTC (rev 1144) @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +pkg=music-labeler +. 
simple-setup.bash + +install_strip bin/ src/ml.py Property changes on: music-labeler/trunk/setup.bash ___________________________________________________________________ Added: svn:executable + * Added: svn:mergeinfo + Copied: music-labeler/trunk/setup.py (from rev 1123, python-commons/trunk/setup.py) =================================================================== --- music-labeler/trunk/setup.py (rev 0) +++ music-labeler/trunk/setup.py 2009-01-25 09:30:18 UTC (rev 1144) @@ -0,0 +1,42 @@ +#!/usr/bin/env python +# -*- mode: python; tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4; -*- +# vim:ft=python:et:sw=4:ts=4 + +from commons import setup + +# XXX: adjust the details here! + +pkg_info_text = """ +Metadata-Version: 1.1 +Name: music-labeler +Version: 0.1 +Author: Yang Zhang +Author-email: yaaang NOSPAM at REMOVECAPS gmail +Home-page: http://assorted.sourceforge.net/python-commons +Summary: Python Commons +License: Python Software Foundation License +Description: Music labeling (categorization) tool. 
+Keywords: Python,common,commons,utility,utilities,library,libraries +Platform: any +Provides: commons +Classifier: Development Status :: 4 - Beta +Classifier: Environment :: No Input/Output (Daemon) +Classifier: Intended Audience :: Developers +Classifier: License :: OSI Approved :: Python Software Foundation License +Classifier: Operating System :: OS Independent +Classifier: Programming Language :: Python +Classifier: Topic :: Communications +Classifier: Topic :: Database +Classifier: Topic :: Internet +Classifier: Topic :: Software Development :: Libraries :: Python Modules +Classifier: Topic :: System +Classifier: Topic :: System :: Filesystems +Classifier: Topic :: System :: Logging +Classifier: Topic :: System :: Networking +Classifier: Topic :: Text Processing +Classifier: Topic :: Utilities +""" + +setup.run_setup( pkg_info_text, + #scripts = ['frontend/py_hotshot.py'], + ) Property changes on: music-labeler/trunk/setup.py ___________________________________________________________________ Added: svn:executable + * Added: svn:mergeinfo + This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-01-25 09:24:24
Revision: 1143 http://assorted.svn.sourceforge.net/assorted/?rev=1143&view=rev Author: yangzhang Date: 2009-01-25 09:24:15 +0000 (Sun, 25 Jan 2009) Log Message: ----------- added music playback and control using GST! Modified Paths: -------------- music-labeler/trunk/README music-labeler/trunk/src/ml.py Modified: music-labeler/trunk/README =================================================================== --- music-labeler/trunk/README 2009-01-25 06:26:15 UTC (rev 1142) +++ music-labeler/trunk/README 2009-01-25 09:24:15 UTC (rev 1143) @@ -10,14 +10,11 @@ Requirements: - [pygtk] 2.13 -- [Mutagen] 1.14 - -Requirements for [Quodlibet] frontend: - +- [GST] 0.10 - [Quodlibet] 1.0 [pygtk]: http://www.pygtk.org/ -[Mutagen]: http://code.google.com/p/quodlibet/wiki/Development/Mutagen +[GST]: http://gstreamer.freedesktop.org/modules/gst-python.html [Quodlibet]: http://code.google.com/p/quodlibet/ Related Modified: music-labeler/trunk/src/ml.py =================================================================== --- music-labeler/trunk/src/ml.py 2009-01-25 06:26:15 UTC (rev 1142) +++ music-labeler/trunk/src/ml.py 2009-01-25 09:24:15 UTC (rev 1143) @@ -4,7 +4,7 @@ from gtk import * from cgi import escape from cStringIO import StringIO -import itertools +import itertools, gst, urllib import gtk.keysyms as k, gtk.gdk as gdk from os.path import expanduser from path import path @@ -50,6 +50,9 @@ def labeler(pls, labels, track_paths): sep = ';' + player = gst.element_factory_make('playbin') + timefmt = gst.Format(gst.FORMAT_TIME) + w = Window() w.set_title('Music Labeler') w.connect('delete-event', lambda *args: False) @@ -192,6 +195,20 @@ sel.select_path(pos) t.scroll_to_cell(pos, None, True, .5, .5) return True + elif ev.keyval in [k._1, k._2, k._3, k._4, k._5] and \ + ev.state & gdk.CONTROL_MASK: + # Seek to 0%, 25%, 50%, 75%, or 100% positions. 
+ pos = float(ev.keyval - k._1) / 4 * player.query_duration(timefmt)[0] + player.seek_simple(timefmt, gst.SEEK_FLAG_FLUSH, pos) + return True + elif ev.keyval in [k.period, k.comma] and \ + ev.state & gdk.CONTROL_MASK: + # Seek forward/backward 5 seconds. + pos = player.query_position(timefmt)[0] + dir = 1 if ev.keyval == k.comma else 1 + debug('seek from', pos / 1e9, 'to', pos + dir * 5e9) + player.seek_simple(timefmt, gst.SEEK_FLAG_FLUSH, pos + dir * 5e9) + return True elif ev.keyval in [k.Tab]: # Tab completes. complete() @@ -234,6 +251,14 @@ @connecting_after(e, 'focus-in-event') # TODO hack; too early on resizes def show_popup(*args): + # Play the track if not already playing. + uri = 'file://' + urllib.quote(track_path) + if player.get_property('uri') != uri: + player.set_state(gst.STATE_NULL) + player.set_property('uri', uri) + player.set_state(gst.STATE_PLAYING) + + # Show the popup. tw.show_all() win = e.get_window() _,_,ew,eh,_ = win.get_geometry() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
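The new key bindings in r1143 above map Ctrl+1..Ctrl+5 to absolute seeks (0%, 25%, 50%, 75%, 100% of the duration) and Ctrl+period/comma to 5-second relative seeks. The position arithmetic, stripped of gst (all times in nanoseconds; note that the diff's relative branch computes `dir = 1` in both cases, which looks like a typo — backward presumably wants `-1`, as sketched here):

```python
def absolute_seek(key_index, duration_ns):
    """Ctrl+1..Ctrl+5 (key_index 0..4) -> 0%, 25%, 50%, 75%, 100% of the track."""
    return float(key_index) / 4 * duration_ns

def relative_seek(pos_ns, backward, step_ns=5 * 10**9):
    """Seek 5 s from pos_ns; backward=True corresponds to comma.
    (The diff sets dir to 1 in both branches; -1 is used here for backward.)"""
    return pos_ns + (-step_ns if backward else step_ns)
```

Either result would then be handed to `player.seek_simple(timefmt, gst.SEEK_FLAG_FLUSH, pos)` as in the diff.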
From: <yan...@us...> - 2009-01-25 06:26:22
Revision: 1142 http://assorted.svn.sourceforge.net/assorted/?rev=1142&view=rev Author: yangzhang Date: 2009-01-25 06:26:15 +0000 (Sun, 25 Jan 2009) Log Message: ----------- - proper parsing of metadata - fixed modifier mask handling - don't write if no change Modified Paths: -------------- music-labeler/trunk/src/ml.py Modified: music-labeler/trunk/src/ml.py =================================================================== --- music-labeler/trunk/src/ml.py 2009-01-25 03:12:07 UTC (rev 1141) +++ music-labeler/trunk/src/ml.py 2009-01-25 06:26:15 UTC (rev 1142) @@ -3,9 +3,9 @@ from __future__ import with_statement from gtk import * from cgi import escape +from cStringIO import StringIO import itertools import gtk.keysyms as k, gtk.gdk as gdk -from mutagen import File from os.path import expanduser from path import path from collections import defaultdict @@ -13,6 +13,7 @@ # Quod Libet messiness. sys.path.append('/usr/share/quodlibet/') import util +import formats util.gettext_install() from browsers.playlists import Playlist quote, unquote = Playlist.quote, Playlist.unquote @@ -69,7 +70,7 @@ def make_e(track_path): - meta = File(track_path) + meta = formats.MusicFile(track_path) def refresh(): 'Called when the list is changed.' @@ -174,14 +175,14 @@ refresh() if ev.keyval in [k.Down, k.Up] or \ - (ev.keyval in [k.n, k.p] and ev.state == gdk.CONTROL_MASK): + (ev.keyval in [k.n, k.p] and ev.state & gdk.CONTROL_MASK): # Up/down selects prev/next row in the list, if possible, and scroll to # center that row. sel = t.get_selection() mdl, itr = sel.get_selected() assert mdl is l dir = 1 if ev.keyval == k.Down or \ - (ev.keyval == k.n and ev.state == gdk.CONTROL_MASK) else -1 + (ev.keyval == k.n and ev.state & gdk.CONTROL_MASK) else -1 if itr is not None: (pos,) = l.get_path(itr) pos += dir @@ -202,7 +203,7 @@ txt = get_mid() if txt == '': # Move on to next track. 
- dir,inc = ('prev',-1) if ev.state == gdk.SHIFT_MASK else ('next',1) + dir,inc = ('prev',-1) if ev.state & gdk.SHIFT_MASK else ('next',1) debug('moving to %s track' % dir) nexte = es[(es.index(e) + inc) % len(es)] nexte.grab_focus() @@ -244,11 +245,12 @@ debug('win.orig:', win.get_origin()) debug('win.pos:', win.get_position()) tw.set_size_request(ew,-1) - tw.move(ex,ey+eh) + # Position the popup to the right. + tw.move(ex+ew,ey) row = rows.next() - caption = '%s - %s' % (trunc(meta.get('TPE1','Unknown Artist')), - trunc(meta.get('TIT2','Untitled'))) + caption = '%s - %s' % (trunc(meta.get('artist', '[Unknown Artist]')), + trunc(meta.get('title', '[Untitled]'))) tab.attach(Label(str = caption), 0, 1, row, row+1) tab.attach(e, 1, 2, row, row+1) es.append(e) @@ -301,11 +303,21 @@ # Run the labeler. labeler(pls, labels, track_paths) - # Output the new playlists. + # Output the new playlists, but only if there's any difference. for lbl, pl in pls.iteritems(): - with file(pldir / quote(lbl), 'w') as f: - for track_path in sorted(pl, key = str.lower): - print >> f, track_path + f = StringIO() + for track_path in sorted(pl, key = str.lower): + print >> f, track_path + new = f.getvalue() + if not (pldir / quote(lbl)).exists(): + old = '' + else: + with file(pldir / quote(lbl)) as f: + old = f.read() + if old != new != '': + print 'updating', lbl + with file(pldir / quote(lbl), 'w') as f: + f.write(new) if __name__ == '__main__': mainwin() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
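r1142 above changes the playlist writer to render each playlist into a buffer first and rewrite the file only when the rendered text is non-empty and differs from what is on disk. The same render-compare-write pattern as a standalone sketch (one track path per line, sorted case-insensitively, as in the diff):

```python
import os
from io import StringIO

def write_if_changed(path, tracks):
    """Render `tracks` to text and rewrite `path` only when the result is
    non-empty and differs from the current file contents."""
    buf = StringIO()
    for t in sorted(tracks, key=str.lower):
        buf.write(t + '\n')
    new = buf.getvalue()
    old = open(path).read() if os.path.exists(path) else ''
    if old != new != '':
        with open(path, 'w') as f:
            f.write(new)
        return True   # file updated
    return False      # unchanged (or empty); nothing written
```

This avoids dirtying file modification times when the labeler exits without changes.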
From: <yan...@us...> - 2009-01-25 03:12:13
Revision: 1141 http://assorted.svn.sourceforge.net/assorted/?rev=1141&view=rev Author: yangzhang Date: 2009-01-25 03:12:07 +0000 (Sun, 25 Jan 2009) Log Message: ----------- - added output back to playlists - refactored to separate the core gui from the quodlibet ETL - added tools.bash Modified Paths: -------------- music-labeler/trunk/README music-labeler/trunk/src/ml.py Added Paths: ----------- music-labeler/trunk/tools.bash Modified: music-labeler/trunk/README =================================================================== --- music-labeler/trunk/README 2009-01-24 23:57:46 UTC (rev 1140) +++ music-labeler/trunk/README 2009-01-25 03:12:07 UTC (rev 1141) @@ -12,8 +12,13 @@ - [pygtk] 2.13 - [Mutagen] 1.14 +Requirements for [Quodlibet] frontend: + +- [Quodlibet] 1.0 + [pygtk]: http://www.pygtk.org/ [Mutagen]: http://code.google.com/p/quodlibet/wiki/Development/Mutagen +[Quodlibet]: http://code.google.com/p/quodlibet/ Related ------- @@ -21,3 +26,9 @@ Another application that demonstrates GTK autocompletion is [drun] [drun]: http://sourceforge.net/projects/drun/ + +Todo +---- + +- Integrate into quodlibet. +- Move to quodlibet 2.0. Modified: music-labeler/trunk/src/ml.py =================================================================== --- music-labeler/trunk/src/ml.py 2009-01-24 23:57:46 UTC (rev 1140) +++ music-labeler/trunk/src/ml.py 2009-01-25 03:12:07 UTC (rev 1141) @@ -7,7 +7,22 @@ import gtk.keysyms as k, gtk.gdk as gdk from mutagen import File from os.path import expanduser +from path import path +from collections import defaultdict +# Quod Libet messiness. +sys.path.append('/usr/share/quodlibet/') +import util +util.gettext_install() +from browsers.playlists import Playlist +quote, unquote = Playlist.quote, Playlist.unquote + +# General stuff. 
+ +do_debug = False +def debug(*args): + if do_debug: print ' '.join(map(str,args)) + cap = 50 def trunc(s): s = str(s) @@ -15,8 +30,7 @@ class struct(object): pass -def debug(*args): - if do_debug: print ' '.join(map(str,args)) +# GTK+ helpers. def connecting(widget, signal): def wrapper(handler): @@ -30,7 +44,9 @@ return handler return wrapper -def mainwin(): +# Main GUI layer. + +def labeler(pls, labels, track_paths): sep = ';' w = Window() @@ -51,14 +67,18 @@ es = [] rows = itertools.count() - def make_e(track): + def make_e(track_path): + meta = File(track_path) + def refresh(): 'Called when the list is changed.' labels.sort(key = str.lower) on_change() e = Entry() + e.set_text('; '.join(sorted(lbl for lbl, paths in pls.iteritems() + if track_path in paths) + [''])) self = struct() def get_pieces(do_strip = False, truncate = False): @@ -123,7 +143,7 @@ l.append([m]) # Something is always selected in the list. t.get_selection().select_path(0) - t.scroll_to_cell(0) + if len(l) > 0: t.scroll_to_cell(0) @connecting(e, 'key-press-event') def on_key_press(src, ev): @@ -227,25 +247,65 @@ tw.move(ex,ey+eh) row = rows.next() - caption = '%s - %s' % (trunc(track.get('TPE1','Unknown Artist')), - trunc(track.get('TIT2','Untitled'))) + caption = '%s - %s' % (trunc(meta.get('TPE1','Unknown Artist')), + trunc(meta.get('TIT2','Untitled'))) tab.attach(Label(str = caption), 0, 1, row, row+1) tab.attach(e, 1, 2, row, row+1) es.append(e) + return e + # Build row per track in New playlist. + path2e = {} + for track_path in track_paths: + path2e[track_path] = make_e(track_path) + # Final steps. - with file(expanduser('~/.quodlibet/playlists/New')) as f: - for line in f: - make_e(File(line.rstrip())) w.set_position(WIN_POS_CENTER) debug('size req:', tab.size_request()) w.set_default_size(800,600) w.show_all() + es[0].set_position(-1) main() + # Update playlists in memory. + newpls = defaultdict(set) + for track_path, e in path2e.iteritems(): + # Remove from all playlists. 
+ for pl in pls.itervalues(): + pl.discard(track_path) + # Add to just the specified playlists. + for lbl in filter(lambda x: x, map(str.strip, e.get_text().split(';'))): + if lbl not in pls: pls[lbl] = set() + pls[lbl].add(track_path) + +# Code specific to quodlibet. + +def mainwin(): + "These are the actual tracks to use." + from os import listdir + pldir = path('~/.quodlibet/playlists/').expanduser() + + # Build dict mapping playlist to set of track paths. + pls = {} + specpl = 'New' + for pl in pldir.files(): + if pl.basename() != specpl: + with file(pl) as f: + pls[unquote(pl.basename())] = set(track.rstrip() for track in f) + labels = pls.keys() + + # The list of tracks to label. + with file(pldir / specpl) as f: + track_paths = map(str.rstrip, f) + + # Run the labeler. + labeler(pls, labels, track_paths) + + # Output the new playlists. + for lbl, pl in pls.iteritems(): + with file(pldir / quote(lbl), 'w') as f: + for track_path in sorted(pl, key = str.lower): + print >> f, track_path + if __name__ == '__main__': - from os import listdir - from urllib import quote, unquote - labels = map(unquote, listdir(expanduser('~/.quodlibet/playlists/'))) - do_debug = False mainwin() Added: music-labeler/trunk/tools.bash =================================================================== --- music-labeler/trunk/tools.bash (rev 0) +++ music-labeler/trunk/tools.bash 2009-01-25 03:12:07 UTC (rev 1141) @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +set -o errexit -o nounset + +for i in $1/* ; do + python -c " +s = '\n'.join(sorted(filter(lambda x: x!='', set(file('$i').read().split('\n'))), key=str.lower)) +print >> file('$i', 'w'), s" +done Property changes on: music-labeler/trunk/tools.bash ___________________________________________________________________ Added: svn:executable + * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
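The labeler exit path in r1141 above re-derives each track's playlist membership from its entry text: split on `;`, strip, drop empties, remove the track from every playlist, then add it back to just the named ones. As a standalone function:

```python
def apply_labels(pls, track_path, label_text, sep=';'):
    """pls: label -> set of track paths. Reassign track_path to exactly the
    playlists named in label_text (mirrors the update loop in r1141)."""
    labels = [p for p in (s.strip() for s in label_text.split(sep)) if p]
    for pl in pls.values():
        pl.discard(track_path)
    for lbl in labels:
        pls.setdefault(lbl, set()).add(track_path)
    return pls
```

Removing before re-adding makes the entry text authoritative: clearing a label from the entry also removes the track from that playlist.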
From: <yan...@us...> - 2009-01-24 23:57:54
Revision: 1140 http://assorted.svn.sourceforge.net/assorted/?rev=1140&view=rev Author: yangzhang Date: 2009-01-24 23:57:46 +0000 (Sat, 24 Jan 2009) Log Message: ----------- - added multi-track tagging support with song labels in scrolled, tabular layout - added track loading from the "New" playlist - fixed up the pop-up window drawing - added semi-colon/return/shift-return handling Modified Paths: -------------- music-labeler/trunk/README music-labeler/trunk/src/ml.py Modified: music-labeler/trunk/README =================================================================== --- music-labeler/trunk/README 2009-01-24 11:01:40 UTC (rev 1139) +++ music-labeler/trunk/README 2009-01-24 23:57:46 UTC (rev 1140) @@ -10,8 +10,11 @@ Requirements: - [pygtk] 2.13 -- [Python Commons] +- [Mutagen] 1.14 +[pygtk]: http://www.pygtk.org/ +[Mutagen]: http://code.google.com/p/quodlibet/wiki/Development/Mutagen + Related ------- Modified: music-labeler/trunk/src/ml.py =================================================================== --- music-labeler/trunk/src/ml.py 2009-01-24 11:01:40 UTC (rev 1139) +++ music-labeler/trunk/src/ml.py 2009-01-24 23:57:46 UTC (rev 1140) @@ -1,9 +1,18 @@ #!/usr/bin/env python +from __future__ import with_statement from gtk import * from cgi import escape +import itertools import gtk.keysyms as k, gtk.gdk as gdk +from mutagen import File +from os.path import expanduser +cap = 50 +def trunc(s): + s = str(s) + return s[:cap] + '...' if len(s) > cap else s + class struct(object): pass def debug(*args): @@ -29,180 +38,214 @@ w.connect('delete-event', lambda *args: False) w.connect('destroy', lambda *args: main_quit()) - h = HBox() - w.add(h) + tab = Table(rows=1, columns=2) + # TODO: make main window scrollable + if 1: + s = ScrolledWindow() + s.add_with_viewport(tab) + s.set_policy(POLICY_AUTOMATIC, POLICY_AUTOMATIC) + w.add(s) + else: + w.add(tab) es = [] + rows = itertools.count() - def refresh(): - 'Called when the list is changed.' 
- labels.sort(key = str.lower) - on_change() + def make_e(track): - e = Entry() - self = struct() + def refresh(): + 'Called when the list is changed.' + labels.sort(key = str.lower) + on_change() - def get_pieces(do_strip = False, truncate = False): - """Return left, mid, right, where mid is the part of the Entry that the - cursor is currently positioned over, and left and right is all the text - to the left and right, respectively. If do_strip is True, also strip off - whitespace around mid.""" - pos = e.get_position() - cur = e.get_text() - if truncate: cur = cur[:pos] - start = max(cur.rfind(sep, 0, pos) + 1, 0) - end = cur.find(sep, pos) % (len(cur) + 1) - mid = cur[start:end] - res = (cur[:start], (mid.strip() if do_strip else mid), cur[end:]) - debug(pos, start, end, mid, res) - return res + e = Entry() + self = struct() - def get_mid(): return get_pieces(True)[1] - def get_prefix(): return get_pieces(True, True)[1] + def get_pieces(do_strip = False, truncate = False): + """Return left, mid, right, where mid is the part of the Entry that the + cursor is currently positioned over, and left and right is all the text + to the left and right, respectively. If do_strip is True, also strip off + whitespace around mid.""" + pos = e.get_position() + cur = e.get_text() + if truncate: cur = cur[:pos] + start = max(cur.rfind(sep, 0, pos) + 1, 0) + end = cur.find(sep, pos) % (len(cur) + 1) + mid = cur[start:end] + res = (cur[:start], (mid.strip() if do_strip else mid), cur[end:]) + debug(pos, start, end, mid, res) + return res - @connecting(e, 'focus-out-event') - def on_focus_out(*args): - 'Destroy the pop-up completion window.' - debug('focus-out') + def get_mid(): return get_pieces(True)[1] + def get_prefix(): return get_pieces(True, True)[1] - @connecting(e, 'focus') - def on_focus(*args): - debug('focus') + @connecting(e, 'focus-out-event') + def on_focus_out(*args): + 'Destroy the pop-up completion window.' 
+ debug('focus-out') + tw.hide() - @connecting(e, 'focus-in-event') - def on_focus_in(*args): - debug('focus-in') + @connecting(e, 'focus') + def on_focus(*args): + debug('focus') - @connecting_after(e, 'notify::cursor-position') - def on_change(*args): - 'Filter and highlight the list based on entered text.' - pat = get_prefix().lower() - # TODO: make the matching match up with the highlighting (use - # word-boundaries) - self.matches = matches = [lbl for lbl in labels if pat in lbl.lower()] - l.clear() - for m in matches: - pieces = [] - lasti = i = 0 - lp = len(pat) - if pat != '': - while i <= len(m) - lp: - if (i == 0 or not m[i-1].isalnum()) and m[i:i+lp].lower() == pat: - pieces += [m[lasti:i], (m[i:i+lp],)] - i += lp - lasti = i - else: - i += 1 - pieces += [m[lasti:]] - m = ''.join(('<b>' + escape(p[0]) + '</b>' - if type(p) == tuple - else escape(p)) - for p in pieces) - else: - m = escape(m) - l.append([m]) - # Something is always selected in the list. - t.get_selection().select_path(0) - t.scroll_to_cell(0) + @connecting(e, 'focus-in-event') + def on_focus_in(*args): + debug('focus-in') - @connecting(e, 'key-press-event') - def on_key_press(src, ev): - 'Handle special keys.' + @connecting_after(e, 'notify::cursor-position') + def on_change(*args): + 'Filter and highlight the list based on entered text.' 
+ pat = get_prefix().lower() + # TODO: make the matching match up with the highlighting (use + # word-boundaries) + self.matches = matches = [lbl for lbl in labels if pat in lbl.lower()] + l.clear() + for m in matches: + pieces = [] + lasti = i = 0 + lp = len(pat) + if pat != '': + while i <= len(m) - lp: + if (i == 0 or not m[i-1].isalnum()) and m[i:i+lp].lower() == pat: + pieces += [m[lasti:i], (m[i:i+lp],)] + i += lp + lasti = i + else: + i += 1 + pieces += [m[lasti:]] + m = ''.join(('<b>' + escape(p[0]) + '</b>' + if type(p) == tuple + else escape(p)) + for p in pieces) + else: + m = escape(m) + l.append([m]) + # Something is always selected in the list. + t.get_selection().select_path(0) + t.scroll_to_cell(0) - def complete(): - '''Complete the entered text using currently highlighted list item and - advance focus to the next label (creating one if necessary).''' - # Complete. - mdl, itr = t.get_selection().get_selected() - assert mdl is l - (pos,) = l.get_path(itr) - left, mid, right = get_pieces(True) - debug(repr(left), repr(mid), repr(right)) - if left != '' and not left[-1:].isspace(): left += ' ' - before_pos = left + self.matches[pos] + sep - if not right[:0].isspace(): before_pos += ' ' - e.set_text(before_pos + right) - e.set_position(len(before_pos)) + @connecting(e, 'key-press-event') + def on_key_press(src, ev): + 'Handle special keys.' - if ev.keyval in [k.Down, k.Up] or \ - (ev.keyval in [k.n, k.p] and ev.state == gdk.CONTROL_MASK): - # Up/down selects prev/next row in the list, if possible, and scroll to - # center that row. - sel = t.get_selection() - mdl, itr = sel.get_selected() - assert mdl is l - dir = 1 if ev.keyval == k.Down or \ - (ev.keyval == k.n and ev.state == gdk.CONTROL_MASK) else -1 - if itr is not None: + def complete(): + '''Complete the entered text using currently highlighted list item and + advance focus to the next label (creating one if necessary).''' + # Complete. 
+ mdl, itr = t.get_selection().get_selected() + assert mdl is l (pos,) = l.get_path(itr) - pos += dir - else: - pos = 0 if ev.keyval == k.Down else len(l) - 1 - if 0 <= pos < len(l): - sel.select_path(pos) - t.scroll_to_cell(pos, None, True, .5, .5) - return True - elif ev.keyval in [k.Tab]: - # Tab completes. - complete() - return True - elif ev.keyval in [k.Return]: - # If a new label is entered, add that to the list. If the label - # exists, behave as Tab. If nothing is entered, finish with this song - # and move on. - txt = get_mid() - if txt == '': - pass # TODO: move on to next song - elif txt not in labels: - labels.append(txt) - refresh() - elif txt in labels: + left, mid, right = get_pieces(True) + debug(repr(left), repr(mid), repr(right)) + if left != '' and not left[-1:].isspace(): left += ' ' + before_pos = left + self.matches[pos] + sep + if not right[:0].isspace(): before_pos += ' ' + e.set_text(before_pos + right) + e.set_position(len(before_pos)) + + def try_add(txt = None): + if txt is None: txt = get_mid() + if txt != '': + if txt in labels: + complete() + else: + labels.append(txt) + refresh() + + if ev.keyval in [k.Down, k.Up] or \ + (ev.keyval in [k.n, k.p] and ev.state == gdk.CONTROL_MASK): + # Up/down selects prev/next row in the list, if possible, and scroll to + # center that row. + sel = t.get_selection() + mdl, itr = sel.get_selected() + assert mdl is l + dir = 1 if ev.keyval == k.Down or \ + (ev.keyval == k.n and ev.state == gdk.CONTROL_MASK) else -1 + if itr is not None: + (pos,) = l.get_path(itr) + pos += dir + else: + pos = 0 if ev.keyval == k.Down else len(l) - 1 + if 0 <= pos < len(l): + sel.select_path(pos) + t.scroll_to_cell(pos, None, True, .5, .5) + return True + elif ev.keyval in [k.Tab]: + # Tab completes. complete() - return True + return True + elif ev.keyval in [k.Return]: + # If a new label is entered, add that to the list. If the label + # exists, behave as Tab. 
If nothing is entered, finish with this track + # and move on. + txt = get_mid() + if txt == '': + # Move on to next track. + dir,inc = ('prev',-1) if ev.state == gdk.SHIFT_MASK else ('next',1) + debug('moving to %s track' % dir) + nexte = es[(es.index(e) + inc) % len(es)] + nexte.grab_focus() + nexte.set_position(-1) + else: + try_add(txt) + return True + elif ev.keyval == k.semicolon: + # If a new label is entered, add that to the list. If the label + # exists, behave as Tab. + try_add() + return False - # The pop-up list box window. - tw = Window(WINDOW_POPUP) - tw.set_transient_for(w) - tw.set_default_size(100, 200) - tw.set_accept_focus(False) - l = ListStore(str) - t = TreeView(l) - t.set_headers_visible(False) - t.append_column(TreeViewColumn(None, CellRendererText(), markup = 0)) - s = ScrolledWindow() - s.add(t) - s.set_policy(POLICY_AUTOMATIC, POLICY_AUTOMATIC) - tw.add(s) - refresh() - tw.show_all() + # The pop-up list box window. + tw = Window(WINDOW_POPUP) + tw.set_transient_for(w) + tw.set_default_size(100, 200) + tw.set_accept_focus(False) + l = ListStore(str) + t = TreeView(l) + t.set_headers_visible(False) + t.append_column(TreeViewColumn(None, CellRendererText(), markup = 0)) + s = ScrolledWindow() + s.add(t) + s.set_policy(POLICY_AUTOMATIC, POLICY_AUTOMATIC) + tw.add(s) + refresh() + @connecting_after(e, 'focus-in-event') # TODO hack; too early on resizes + def show_popup(*args): + tw.show_all() + win = e.get_window() + _,_,ew,eh,_ = win.get_geometry() + ex,ey = win.get_origin() + if do_debug: + debug('size_request:', e.size_request()) + debug('alloc:', (e.get_allocation().x, e.get_allocation().y)) + debug('win.geom:', win.get_geometry()) + debug('win.orig:', win.get_origin()) + debug('win.pos:', win.get_position()) + tw.set_size_request(ew,-1) + tw.move(ex,ey+eh) + + row = rows.next() + caption = '%s - %s' % (trunc(track.get('TPE1','Unknown Artist')), + trunc(track.get('TIT2','Untitled'))) + tab.attach(Label(str = caption), 0, 1, row, row+1) + 
tab.attach(e, 1, 2, row, row+1) + es.append(e) + # Final steps. - h.pack_start(e) - @connecting_after(w, 'show') # initial, but doesn't work (too early) - @connecting_after(w, 'drag-end') # want to detect movement - @connecting_after(e, 'notify::cursor-position') - @connecting_after(e, 'focus-in-event') # hack to handle initial - def shown(*args): - ew,eh = e.size_request() - gdkwin = e.get_parent_window() - x,y = gdkwin.get_origin() - if do_debug: - a = e.get_allocation() - debug(ew, eh, a.x, a.y, a.width, a.height, gdkwin, (x,y), - gdkwin.get_position(), gdkwin.get_geometry()) - y += eh - # TODO: figure out how to position the pop-up window correctly. - tw.set_size_request(e.size_request()[0],-1) - tw.move(x,y) - w.move(0,0) + with file(expanduser('~/.quodlibet/playlists/New')) as f: + for line in f: + make_e(File(line.rstrip())) + w.set_position(WIN_POS_CENTER) + debug('size req:', tab.size_request()) + w.set_default_size(800,600) w.show_all() - es.append(e) main() if __name__ == '__main__': from os import listdir - from os.path import expanduser from urllib import quote, unquote labels = map(unquote, listdir(expanduser('~/.quodlibet/playlists/'))) - do_debug = True + do_debug = False mainwin() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-01-24 11:01:43
Revision: 1139
http://assorted.svn.sourceforge.net/assorted/?rev=1139&view=rev
Author: yangzhang
Date: 2009-01-24 11:01:40 +0000 (Sat, 24 Jan 2009)

Log Message:
-----------
taped together a "working" positioning

Modified Paths:
--------------
    music-labeler/trunk/src/ml.py

Modified: music-labeler/trunk/src/ml.py
===================================================================
--- music-labeler/trunk/src/ml.py	2009-01-24 10:15:03 UTC (rev 1138)
+++ music-labeler/trunk/src/ml.py	2009-01-24 11:01:40 UTC (rev 1139)
@@ -103,6 +103,7 @@
       l.append([m])
     # Something is always selected in the list.
     t.get_selection().select_path(0)
+    t.scroll_to_cell(0)
 
   @connecting(e, 'key-press-event')
   def on_key_press(src, ev):
@@ -173,13 +174,27 @@
   s.set_policy(POLICY_AUTOMATIC, POLICY_AUTOMATIC)
   tw.add(s)
   refresh()
-  ew,eh = e.size_request()
-  # TODO: figure out how to position the pop-up window correctly.
-  #tw.move()
   tw.show_all()
 
   # Final steps.
   h.pack_start(e)
+  @connecting_after(w, 'show') # initial, but doesn't work (too early)
+  @connecting_after(w, 'drag-end') # want to detect movement
+  @connecting_after(e, 'notify::cursor-position')
+  @connecting_after(e, 'focus-in-event') # hack to handle initial
+  def shown(*args):
+    ew,eh = e.size_request()
+    gdkwin = e.get_parent_window()
+    x,y = gdkwin.get_origin()
+    if do_debug:
+      a = e.get_allocation()
+      debug(ew, eh, a.x, a.y, a.width, a.height, gdkwin, (x,y),
+            gdkwin.get_position(), gdkwin.get_geometry())
+    y += eh
+    # TODO: figure out how to position the pop-up window correctly.
+    tw.set_size_request(e.size_request()[0],-1)
+    tw.move(x,y)
+    w.move(0,0)
   w.show_all()
   es.append(e)
   main()
@@ -189,5 +204,5 @@
   from os import listdir
   from os.path import expanduser
   from urllib import quote, unquote
   labels = map(unquote, listdir(expanduser('~/.quodlibet/playlists/')))
-  do_debug = False
+  do_debug = True
   mainwin()
From: <yan...@us...> - 2009-01-24 10:15:06
Revision: 1138 http://assorted.svn.sourceforge.net/assorted/?rev=1138&view=rev Author: yangzhang Date: 2009-01-24 10:15:03 +0000 (Sat, 24 Jan 2009) Log Message: ----------- added more c sandbox items Modified Paths: -------------- sandbox/trunk/src/c/switches.c Added Paths: ----------- sandbox/trunk/src/c/arrayinit.c sandbox/trunk/src/c/coercion.mk sandbox/trunk/src/c/overflow/ sandbox/trunk/src/c/rand.c sandbox/trunk/src/c/switchsyntax.c Added: sandbox/trunk/src/c/arrayinit.c =================================================================== --- sandbox/trunk/src/c/arrayinit.c (rev 0) +++ sandbox/trunk/src/c/arrayinit.c 2009-01-24 10:15:03 UTC (rev 1138) @@ -0,0 +1,141 @@ +#include <stdio.h> + +// Array initialization syntax. + +int main() { + // Can be any order. + int xs[] = { [0] 0, [4] -4, [1] -1, [2] -2 }; + printf("%d\n", xs[3]); // Garbage. + printf("%d\n", xs[4]); // As expected. + return 0; +} + +#if 0 +/* + * Warning -- this file must be compiled with GCC + * to show the goto dispatching. In particular, + * the compiler must be compatible with gnu99 + * (i.e. -std=gnu99). + */ + +/* + * Reminder -- disassemble with: + * objdump 4-jumptable -d > jumptable.asm + * Consider the generated asm. 
+ */ + +//enum simplifies unique numbers for opcodes +enum { + OP_ADD = 0, + OP_SUB, + OP_MUL, + OP_DIV, + OP_MAX +}; + +int add(int x, int y); +int sub(int x, int y); +int mul(int x, int y); +int div(int x, int y); +int goto_dispatch(int op, int x, int y); + +int +main(int argc, char **argv) +{ + void print_sw(int op, int r, int s) { + printf("SWCH: op %d (%d, %d) = %d\n", op, r, s, switch_dispatch(op, r, s)); + } + print_sw(OP_ADD, 4, 5); + print_sw(OP_DIV, 8, 2); + switch_dispatch(7, 0, 0); + switch_dispatch(10, 0, 0); + switch_dispatch(15, 0, 0); + switch_dispatch(20, 0, 0); + + //nested convenience function prints the op + void print_func(int op, int r, int s) { + printf("FUNC: op %d (%d, %d) = %d\n", op, r, s, func_dispatch(op, r, s)); + }; + //test a few dispatches + print_func(OP_ADD, 4, 5); + print_func(OP_DIV, 8, 2); + + //what if we don't want to make a new stack frame? + void print_goto(int op, int r, int s) { + printf("GOTO: op %d (%d, %d) = %d\n", op, r, s, goto_dispatch(op, r, s)); + } + print_goto(OP_ADD, 4, 5); + print_goto(OP_DIV, 8, 2); + + return 0; +} + +int +switch_dispatch(int op, int x, int y) +{ + switch (op) { + case OP_ADD: + return x + y; + case OP_MUL: + return x * y; + case OP_DIV: + return x / y; + case OP_SUB: + return x - y; + case 7 ... 10: + printf("switch_dispatch invoked with opcode in range 7 to 10\n"); + break; + case 15 ... 
20: + printf("switch_dispatch invoked with opcode in range 15 to 20\n"); + break; + } + return -1; +} + +int +func_dispatch(int op, int x, int y) +{ + //let's make a jumptable with function pointers + //compare with and without static + static int (*jumptable[OP_MAX])(int a, int b) = { + //named index notation lets us specify things out-of-order + [OP_ADD] add, + [OP_MUL] mul, + [OP_SUB] sub, + [OP_DIV] div + }; + return jumptable[op](x, y); + //or + //return (*jumptable[op])(x, y); +} + +int add(int x, int y) { return x + y; } +int sub(int x, int y) { return x - y; } +int mul(int x, int y) { return x * y; } +int div(int x, int y) { return x / y; } + +int +goto_dispatch(int op, int x, int y) +{ + //now, lets make a jumptable using gotos + static void *labeltable[OP_MAX] = { + //here we see the address-of-label syntax + [OP_ADD] &&l_add, + [OP_MUL] &&l_mul, + [OP_SUB] &&l_sub, + [OP_DIV] &&l_div + }; + + //we must dereference a label's address if we want to goto it + goto *labeltable[op]; + +l_add: + return x+y; +l_sub: + return x-y; +l_mul: + return x*y; +l_div: + return x/y; +} +#endif Added: sandbox/trunk/src/c/coercion.mk =================================================================== --- sandbox/trunk/src/c/coercion.mk (rev 0) +++ sandbox/trunk/src/c/coercion.mk 2009-01-24 10:15:03 UTC (rev 1138) @@ -0,0 +1,2 @@ +coercion: coercion.c + gcc -Wconversion coercion.c Added: sandbox/trunk/src/c/rand.c =================================================================== --- sandbox/trunk/src/c/rand.c (rev 0) +++ sandbox/trunk/src/c/rand.c 2009-01-24 10:15:03 UTC (rev 1138) @@ -0,0 +1,8 @@ +#include <limits.h> +#include <stdlib.h> +#include <stdio.h> +int main() { + printf("RAND_MAX = %d\n", RAND_MAX); + printf("INT_MAX = %d\n", INT_MAX); + return 0; +} Modified: sandbox/trunk/src/c/switches.c =================================================================== --- sandbox/trunk/src/c/switches.c 2009-01-23 21:44:55 UTC (rev 1137) +++ sandbox/trunk/src/c/switches.c 
2009-01-24 10:15:03 UTC (rev 1138)
@@ -1,4 +1,4 @@
-
+// Explore the generated asm code.
 int main() {
   int x = 3;
   switch (x) {

Added: sandbox/trunk/src/c/switchsyntax.c
===================================================================
--- sandbox/trunk/src/c/switchsyntax.c	(rev 0)
+++ sandbox/trunk/src/c/switchsyntax.c	2009-01-24 10:15:03 UTC (rev 1138)
@@ -0,0 +1,9 @@
+// Demo fancy switch range syntax.
+
+int main(int argc, char **argv) {
+  switch (argc) {
+    case 0 ... 3: return 0;
+    case 4 ... 8: return 1;
+    default: return 2;
+  }
+}
From: <yan...@us...> - 2009-01-23 22:44:14
Revision: 1137 http://assorted.svn.sourceforge.net/assorted/?rev=1137&view=rev Author: yangzhang Date: 2009-01-23 21:44:55 +0000 (Fri, 23 Jan 2009) Log Message: ----------- - added further breakdown of the xfer time - upgraded readmsg - mean -> median in analysis plot Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/tools/analysis.py ydb/trunk/tools/test.bash Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-01-23 09:34:16 UTC (rev 1136) +++ ydb/trunk/src/Makefile 2009-01-23 21:44:55 UTC (rev 1137) @@ -27,6 +27,7 @@ endif LDFLAGS := -pthread -lstx -lst -lresolv -lprotobuf -lgtest \ -lboost_program_options-gcc43-mt -lboost_thread-gcc43-mt $(GPROF) +# The -Wno- warnings are for boost. CXXFLAGS := -g3 -pthread $(GPROF) -Wall -Werror -Wextra -Woverloaded-virtual \ -Wconversion -Wno-conversion -Wno-ignored-qualifiers \ -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \ Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-01-23 09:34:16 UTC (rev 1136) +++ ydb/trunk/src/main.lzz.clamp 2009-01-23 21:44:55 UTC (rev 1137) @@ -282,17 +282,31 @@ } /** - * Read a message. + * Read a message. This is done in two steps: first by reading the length + * prefix, then by reading the actual body. + * + * \param[in] src The socket from which to read. + * + * \param[in] msg The protobuf to read into. + * + * \param[in] timed Whether to make a note of the time at which the first piece of the + * message (the length) was received. Such measurement only makes sense for + * large messages which take a long time to receive. + * + * \param[in] timeout on each of the two read operations (first one is on + * length, second one is on the rest). 
*/ template <typename T> -void -readmsg(st_netfd_t src, T & msg, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) +long long +readmsg(st_netfd_t src, T & msg, bool timed = false, st_utime_t timeout = + ST_UTIME_NO_TIMEOUT) { // Read the message length. uint32_t len; checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len, timeout), static_cast<ssize_t>(sizeof len)); + long long start_receive = timed ? current_time_millis() : -1; len = ntohl(len); #define GETMSG(buf) \ @@ -308,6 +322,8 @@ scoped_array<char> buf(new char[len]); GETMSG(buf.get()); } + + return start_receive; } /** @@ -320,7 +336,7 @@ readmsg(st_netfd_t src, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) { T msg; - readmsg(src, msg, timeout); + readmsg(src, msg, false, timeout); return msg; } @@ -943,13 +959,15 @@ recovery_builders.push_back(my_spawn(lambda() { // Read the recovery message. Recovery recovery; + long long receive_start = -1; { st_intr intr(stop_hub); - readmsg(__ref(replicas)[__ctx(i)], recovery); + receive_start = readmsg(__ref(replicas)[__ctx(i)], recovery, true); } long long build_start = current_time_millis(); cout << "got recovery message in " - << build_start - __ref(before_recv) << " ms" << endl; + << build_start - __ref(before_recv) << " ms (xfer took " + << build_start - receive_start << " ms)" << endl; for (int i = 0; i < recovery.pair_size(); i++) { const Recovery_Pair &p = recovery.pair(i); __ref(map)[p.key()] = p.value(); Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-01-23 09:34:16 UTC (rev 1136) +++ ydb/trunk/tools/analysis.py 2009-01-23 21:44:55 UTC (rev 1137) @@ -18,7 +18,7 @@ yield list(tups) a = array(list(gen())) indexes = a[:,0,0] - means = a.mean(1) + means = median(a,1) #a.mean(1) stds = a.std(1) tup = (indexes,) for i in range(1, len(a[0,0])): @@ -62,25 +62,27 @@ check(path) def getpairs(): with file(path) as f: - seqno = recv = buildup = catchup = total = None + seqno 
= dump = recv = buildup = catchup = total = None for line in f: m = re.match( r'=== seqno=(?P<seqno>\d+) ', line ) if m: seqno = int(m.group('seqno')) - m = re.search( r'got recovery message in (?P<time>\d+) ms', line ) - if m: recv = float(m.group('time')) + m = re.search( r'got recovery message in (?P<dump>\d+) ms \(xfer took (?P<recv>\d+) ms\)', line ) + if m: dump, recv = float(m.group('dump')), float(m.group('recv')) m = re.search( r'built up .* (?P<time>\d+) ms', line ) if m: buildup = float(m.group('time')) - m = re.search( r'replayer caught up; from backlog replayed \d+ txns in (?P<time>\d+) ms', line ) + m = re.search( r'replayer caught up; from backlog replayed \d+ txns .* in (?P<time>\d+) ms', line ) if m: catchup = float(m.group('time')) m = re.match( r'.*: recovering node caught up; took (?P<time>\d+) ?ms', line ) if m: total = float(m.group('time')) - tup = (seqno, recv, buildup, catchup, total) + tup = (seqno, dump, recv, buildup, catchup, total) if all(tup): yield tup - seqno = recv = buildup = catchup = total = None - seqnos, recvmeans, recvsds, buildmeans, buildsds, catchmeans, catchsds, totalmeans, totalsds, stacked, a = agg(getpairs()) + seqno = dump = recv = buildup = catchup = total = None + seqnos, dumpmeans, dumpsds, recvmeans, recvsds, buildmeans, buildsds, \ + catchmeans, catchsds, totalmeans, totalsds, stacked, a = \ + agg(getpairs()) - print 'max seqno, recv mean, recv sd, build mean, build sd, catch mean, catch sd, total mean, total sd' + print 'max seqno, dump mean, dump sd, recv mean, recv sd, build mean, build sd, catch mean, catch sd, total mean, total sd' print stacked print @@ -93,13 +95,16 @@ (226,240,214), (246,255,224)][i+1])) ehue = lambda i: hue(-1) # tuple(map(lambda x: min(1, x + .3), hue(i))) - a = bar(seqnos, recvmeans, yerr = recvsds, width = width, color = hue(0), - ecolor = ehue(0), label = 'State receive') - b = bar(seqnos, buildmeans, yerr = buildsds, width = width, color = hue(1), - ecolor = ehue(1), label = 
'Build-up time', bottom = recvmeans)
-    c = bar(seqnos, catchmeans, yerr = catchsds, width = width, color = hue(2),
-            ecolor = ehue(2), label = 'Catch-up',
-            bottom = recvmeans + buildmeans)
+    bar(seqnos, dumpmeans, yerr = dumpsds, width = width, color = hue(0),
+        ecolor = ehue(0), label = 'State serialization')
+    bar(seqnos, recvmeans, yerr = recvsds, width = width, color = hue(0),
+        ecolor = ehue(0), label = 'State receive', bottom = dumpmeans)
+    bar(seqnos, buildmeans, yerr = buildsds, width = width, color = hue(1),
+        ecolor = ehue(1), label = 'Build-up',
+        bottom = dumpmeans + recvmeans)
+    bar(seqnos, catchmeans, yerr = catchsds, width = width, color = hue(2),
+        ecolor = ehue(2), label = 'Catch-up',
+        bottom = dumpmeans + recvmeans + buildmeans)
 
     title('Recovery time over number of transactions')
     xlabel('Transaction count (corresponds roughly to data size)')

Modified: ydb/trunk/tools/test.bash
===================================================================
--- ydb/trunk/tools/test.bash	2009-01-23 09:34:16 UTC (rev 1136)
+++ ydb/trunk/tools/test.bash	2009-01-23 21:44:55 UTC (rev 1137)
@@ -369,6 +369,10 @@
   hostargs kill-helper
 }
 
+times() {
+  parssh date +%s.%N
+}
+
 # Use mssh to log in with password as root to each machine.
 mssh-root() {
   : "${hosts:="$(hosts)"}"
From: <yan...@us...> - 2009-01-23 09:34:27
Revision: 1136 http://assorted.svn.sourceforge.net/assorted/?rev=1136&view=rev Author: yangzhang Date: 2009-01-23 09:34:16 +0000 (Fri, 23 Jan 2009) Log Message: ----------- - added multi-host recovery - added start of googletest tests - added --debug-memory, memory monitor - added place-holder for disk IO threads - fixed some bugs in test.bash - added gtest setup to test.bash - improved colors on plots - updated reqs, todo Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ydb.proto ydb/trunk/tools/analysis.py ydb/trunk/tools/test.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-01-19 04:52:00 UTC (rev 1135) +++ ydb/trunk/README 2009-01-23 09:34:16 UTC (rev 1136) @@ -30,6 +30,7 @@ - [C++ Commons] svn r1082 - [clamp] 153 - [GCC] 4.3.2 +- [googletest] 1.2.1 - [Lazy C++] 2.8.0 - [Protocol Buffers] 2.0.0 - [State Threads] 1.8 @@ -38,6 +39,7 @@ [C++ Commons]: http://assorted.sourceforge.net/cpp-commons/ [clamp]: http://home.clara.net/raoulgough/clamp/ [GCC]: http://gcc.gnu.org/ +[googletest]: http://code.google.com/p/googletest/ [Lazy C++]: http://www.lazycplusplus.com/ [Protocol Buffers]: http://code.google.com/p/protobuf/ [State Threads]: http://state-threads.sourceforge.net/ @@ -203,6 +205,8 @@ Todo ---- +Period: -1/20 + - DONE add benchmarking/testing hooks - start the recovering joiner at a well-defined time (after a certain # txns or after the DB reaches a certain size) @@ -223,9 +227,31 @@ above restricting of ops to writes - 5 is a lot; db grows large; experiments take much longer - DONE break down into various phases using bar graph of segmented bars + +Period: 1/20-1/27 + +- DONE implement multihost +- TODO add simple, proper timestamped logging +- TODO see how much multihost recovery affects perf +- TODO look again at how much yielding affects perf +- TODO monitor memory usage +- TODO switch to btree +- TODO break down 
the red bar some more +- TODO see how much time difference there is +- TODO red bar: why are/aren't we saturating bandwidth? +- TODO understand the rest of the perf (eg stl map) +- TODO try scaling up +- TODO implement checkpointing disk-based scheme +- TODO implement log-based recovery; show that it sucks +- TODO implement group (batch) commit for log-based recovery +- TODO talk + - motivation: log-based sucks, look into alternatives - TODO serialize outputs from the various clients to a single merger to (1) have ordering over the (timestamped) messages, and (2) avoid interleaved lines + +Period: 1/27- + - TODO detailed view of tps during recovery over time (should see various phases) - TODO later: runtime overhead of logging/tps under normal operation (scaled Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-01-19 04:52:00 UTC (rev 1135) +++ ydb/trunk/src/Makefile 2009-01-23 09:34:16 UTC (rev 1136) @@ -25,10 +25,10 @@ ifneq ($(GCOV),) GCOV := -fprofile-arcs -ftest-coverage endif -LDFLAGS := -lstx -lst -lresolv -lpthread -lprotobuf \ - -lboost_program_options-gcc43-mt $(GPROF) -CXXFLAGS := -g3 $(GPROF) -Wall -Werror -Wextra -Woverloaded-virtual -Wconversion \ - -Wno-conversion -Wno-ignored-qualifiers \ +LDFLAGS := -pthread -lstx -lst -lresolv -lprotobuf -lgtest \ + -lboost_program_options-gcc43-mt -lboost_thread-gcc43-mt $(GPROF) +CXXFLAGS := -g3 -pthread $(GPROF) -Wall -Werror -Wextra -Woverloaded-virtual \ + -Wconversion -Wno-conversion -Wno-ignored-qualifiers \ -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \ -Winit-self -Wsign-promo -Wno-unused-parameter -Wc++0x-compat \ -Wparentheses -Wmissing-format-attribute -Wfloat-equal \ Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-01-19 04:52:00 UTC (rev 1135) +++ ydb/trunk/src/main.lzz.clamp 2009-01-23 09:34:16 UTC (rev 1136) @@ 
-2,8 +2,10 @@ #include <boost/bind.hpp> #include <boost/foreach.hpp> #include <boost/program_options.hpp> +#include <boost/range/iterator_range.hpp> #include <boost/scoped_array.hpp> #include <boost/shared_ptr.hpp> +#include <boost/thread.hpp> #include <commons/nullptr.h> #include <commons/rand.h> #include <commons/st/st.h> @@ -13,6 +15,8 @@ #include <cstring> // strsignal #include <iostream> #include <fstream> +#include <gtest/gtest.h> +#include <malloc.h> #include <map> #include <netinet/in.h> // in_addr etc. #include <set> @@ -25,9 +29,11 @@ using namespace boost; using namespace commons; using namespace std; +using namespace testing; #end typedef pair<int, int> pii; +typedef map<int, int> mii; // Configuration. st_utime_t timeout; @@ -35,7 +41,7 @@ size_t accept_joiner_size; bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, - debug_threads; + debug_threads, multirecover, disk, debug_memory; long long timelim, read_thresh, write_thresh; // Control. @@ -339,7 +345,11 @@ // one) to prepare to send recovery information (by sending an // empty/default Txn). if (!newreps.empty() && seqno > 0) { - sendmsg(fds[0], Txn()); + if (multirecover) { + bcastmsg(fds, Txn()); + } else { + sendmsg(fds[0], Txn()); + } } // Bring in any new members. while (!newreps.empty()) { @@ -392,7 +402,7 @@ * Process a transaction: update DB state (incl. seqno) and send response to * leader. 
*/ - void +void process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno, bool caught_up) { @@ -427,18 +437,44 @@ } } - void +void showtput(const string &action, long long stop_time, long long start_time, int stop_count, int start_count) { long long time_diff = stop_time - start_time; int count_diff = stop_count - start_count; double rate = double(count_diff) * 1000 / time_diff; - cout << action << " " << count_diff << " txns in " << time_diff << " ms (" - << rate << " tps)" << endl; + cout << action << " " << count_diff << " txns [" + << start_count << ".." << stop_count + << "] in " << time_diff << " ms [" + << start_time << ".." << stop_time + << "] (" + << rate << " tps)" << endl; } /** + * Return range * part / nparts, but with proper casting. Assumes that part < + * nparts. + */ +inline int +interp(int range, int part, int nparts) { + return static_cast<int>(static_cast<long long>(range) * part / nparts); +} + +#src +TEST(interp_test, basics) { + EXPECT_EQ(0, interp(3, 0, 3)); + EXPECT_EQ(1, interp(3, 1, 3)); + EXPECT_EQ(2, interp(3, 2, 3)); + EXPECT_EQ(3, interp(3, 3, 3)); + + EXPECT_EQ(0, interp(RAND_MAX, 0, 2)); + EXPECT_EQ(RAND_MAX / 2, interp(RAND_MAX, 1, 2)); + EXPECT_EQ(RAND_MAX, interp(RAND_MAX, 2, 2)); +} +#end + +/** * Actually do the work of executing a transaction and sending back the reply. * * \param[in] leader The connection to the leader. @@ -454,13 +490,18 @@ * \param[in] backlog The backlog of txns that need to be processed. * * \param[in] init_seqno The seqno that was sent in the Init message from the - * leader. Not entirely clear that this is necessary; could probably just go - * with seqno. + * leader. The first expected seqno. + * + * \param[in] mypos This host's position in the Init message list. Used for + * calculating the sub-range of the map for which this node is responsible. + * + * \param[in] nnodes The total number nodes in the Init message list. 
*/ void process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, st_channel<shared_ptr<Recovery> > &send_states, - st_channel<shared_ptr<Txn> > &backlog, int init_seqno) + st_channel<shared_ptr<Txn> > &backlog, int init_seqno, + int mypos, int nnodes) { bool caught_up = init_seqno == 0; long long start_time = current_time_millis(), @@ -503,6 +544,7 @@ } } if (txn.has_seqno()) { + // Regular transaction. const char *action; if (txn.seqno() < 0) { break; @@ -512,7 +554,7 @@ seqno_caught_up = seqno; showtput("process_txns caught up; backlogged", time_caught_up, start_time, seqno_caught_up, - first_seqno == -1 ? init_seqno : first_seqno); + first_seqno == -1 ? init_seqno - 1 : first_seqno); caught_up = true; } process_txn(leader, map, txn, seqno, true); @@ -535,10 +577,17 @@ st_sleep(0); } } else { - // Generate a snapshot. + // Empty (default) Txn means "generate a snapshot." shared_ptr<Recovery> recovery(new Recovery); - cout << "generating recovery of " << map.size() << " records" << endl; - foreach (const pii &p, map) { + mii::const_iterator begin = + map.lower_bound(multirecover ? interp(RAND_MAX, mypos, nnodes) : 0); + mii::const_iterator end = multirecover && mypos < nnodes - 1 ? + map.lower_bound(interp(RAND_MAX, mypos + 1, nnodes)) : map.end(); + cout << "generating recovery over " << begin->first << ".." + << (end == map.end() ? "end" : lexical_cast<string>(end->first)) + << " (node " << mypos << " of " << nnodes << ")" + << endl; + foreach (const pii &p, make_iterator_range(begin, end)) { Recovery_Pair *pair = recovery->add_pair(); pair->set_key(p.first); pair->set_value(p.second); @@ -553,7 +602,7 @@ /** * Swallow replica responses. 
*/ - void +void handle_responses(st_netfd_t replica, const int &seqno, int rid, st_multichannel<long long> &recover_signals, bool caught_up) { @@ -695,6 +744,15 @@ cout << "sent recovery" << endl; } +void +threadfunc() +{ + while (true) { + sleep(3); + cout << "AAAAAAAAAAAAAAAAAAAAAA" << endl; + } +} + /** * Run the leader. */ @@ -725,6 +783,7 @@ // Construct the initialization message. Init init; init.set_txnseqno(0); + init.set_multirecover(true); foreach (replica_info r, replicas) { SockAddr *psa = init.add_node(); psa->set_host(r.host()); @@ -791,6 +850,13 @@ void run_replica(string leader_host, uint16_t leader_port, uint16_t listen_port) { + if (disk) { + // Disk IO threads. + for (int i = 0; i < 5; i++) { + thread somethread(threadfunc); + } + } + // Initialize database state. map<int, int> map; int seqno = -1; @@ -829,13 +895,15 @@ readmsg(leader, init); } uint32_t listen_host = init.yourhost(); + multirecover = init.multirecover(); // Display the info. cout << "got init msg with txn seqno " << init.txnseqno() << " and hosts:" << endl; vector<st_netfd_t> replicas; st_closing_all close_replicas(replicas); - for (uint16_t i = 0; i < init.node_size(); i++) { + int mypos = -1; + for (int i = 0; i < init.node_size(); i++) { const SockAddr &sa = init.node(i); char buf[INET_ADDRSTRLEN]; in_addr host = { sa.host() }; @@ -843,6 +911,7 @@ cout << "- " << checkerr(inet_ntop(AF_INET, &host, buf, INET_ADDRSTRLEN)) << ':' << sa.port() << (is_self ? 
" (self)" : "") << endl; + if (is_self) mypos = i; if (!is_self && init.txnseqno() > 0) { replicas.push_back(st_tcp_connect(host, static_cast<uint16_t>(sa.port()), @@ -854,7 +923,8 @@ st_channel<shared_ptr<Txn> > backlog; st_joining join_proc(my_spawn(bind(process_txns, leader, ref(map), ref(seqno), ref(send_states), - ref(backlog), init.txnseqno()), + ref(backlog), init.txnseqno(), + mypos, init.node_size()), "process_txns")); st_joining join_rec(my_spawn(bind(recover_joiner, listener, ref(map), ref(seqno), ref(send_states)), @@ -863,33 +933,46 @@ try { // If there's anything to recover. if (init.txnseqno() > 0) { - cout << "waiting for recovery from " << replicas[0] << endl; + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; long long before_recv = current_time_millis(); - // Read the recovery message. - Recovery recovery; - { - st_intr intr(stop_hub); - readmsg(replicas[0], recovery); + vector<st_thread_t> recovery_builders; + assert(seqno == -1); + for (int i = 0; i < (multirecover ? init.node_size() : 1); i++) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message. 
+ Recovery recovery; + { + st_intr intr(stop_hub); + readmsg(__ref(replicas)[__ctx(i)], recovery); + } + long long build_start = current_time_millis(); + cout << "got recovery message in " + << build_start - __ref(before_recv) << " ms" << endl; + for (int i = 0; i < recovery.pair_size(); i++) { + const Recovery_Pair &p = recovery.pair(i); + __ref(map)[p.key()] = p.value(); + if (i % chkpt == 0) { + if (yield_during_build_up) st_sleep(0); + } + } + check(recovery.seqno() >= 0); + int seqno = __ref(seqno) = recovery.seqno(); + long long build_end = current_time_millis(); + cout << "receive and build-up took " + << build_end - __ref(before_recv) + << " ms; built up map of " << recovery.pair_size() + << " records in " << build_end - build_start + << " ms; now at seqno " << seqno << endl; + }, "recovery_builder" + lexical_cast<string>(i))); } - long long build_start = current_time_millis(); - cout << "got recovery message in " << build_start - before_recv - << " ms" << endl; - for (int i = 0; i < recovery.pair_size(); i++) { - const Recovery_Pair &p = recovery.pair(i); - map[p.key()] = p.value(); - if (i % chkpt == 0) { - if (yield_during_build_up) st_sleep(0); - } + foreach (st_thread_t t, recovery_builders) { + st_join(t); } - assert(seqno == -1 && - static_cast<typeof(seqno)>(recovery.seqno()) > seqno); - int mid_seqno = seqno = recovery.seqno(); long long mid_time = current_time_millis(); - cout << "receive and build-up took " << mid_time - before_recv - << " ms; built up map of " << recovery.pair_size() << " records in " - << mid_time - build_start << " ms; now at seqno " << seqno << endl; + int mid_seqno = seqno; while (!backlog.empty()) { shared_ptr<Txn> p = backlog.take(); process_txn(leader, map, *p, seqno, false); @@ -951,6 +1034,21 @@ } /** + * Memory monitor. + */ +void +memmon() +{ + while (!stop_hub) { + { + st_intr intr(stop_hub); + st_sleep(1); + } + malloc_stats(); + } +} + +/** * Initialization and command-line parsing. 
*/ int @@ -969,6 +1067,8 @@ po::options_description desc("Allowed options"); desc.add_options() ("help,h", "show this help message") + ("debug-memory,M", po::bool_switch(&debug_memory), + "enable memory monitoring/debug outputs") ("debug-threads,d",po::bool_switch(&debug_threads), "enable context switch debug outputs") ("profile-threads,q",po::bool_switch(&profile_threads), @@ -978,9 +1078,13 @@ ("epoll,e", po::bool_switch(&use_epoll), "use epoll (select is used by default)") ("yield-build-up", po::bool_switch(&yield_during_build_up), - "yield periodically during build-up phase of recovery") + "yield periodically during build-up phase of recovery (for recoverer only)") ("yield-catch-up", po::bool_switch(&yield_during_catch_up), - "yield periodically during catch-up phase of recovery") + "yield periodically during catch-up phase of recovery (for recoverer only)") + ("multirecover,m", po::bool_switch(&multirecover), + "recover from multiple hosts, instead of just one (specified via leader only)") + ("disk,k", po::bool_switch(&disk), + "use disk-based recovery") ("dump,D", po::bool_switch(&dump), "replicas should finally dump their state to a tmp file for " "inspection/diffing") @@ -1029,6 +1133,7 @@ "port to listen on (replicas only)") ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(200000), "timeout for IO operations (in microseconds)") + ("test", "execute unit tests instead of running the normal system") ("minreps,n", po::value<int>(&minreps)->default_value(2), "minimum number of replicas the system is willing to process txns on"); @@ -1051,6 +1156,12 @@ return 1; } + // Run unit-tests. + if (vm.count("test")) { + InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); + } + // Initialize support for ST working with asynchronous signals. check0x(pipe(sig_pipe)); struct sigaction sa; @@ -1075,6 +1186,12 @@ threads.insert(st_thread_self()); threadnames[st_thread_self()] = "main"; + // Print memory debugging information. 
+ if (debug_memory) { + my_spawn(memmon, "memmon"); + } + + // At the end, print thread profiling information. finally f(lambda() { if (profile_threads) { cout << "thread profiling results:" << endl; Modified: ydb/trunk/src/ydb.proto =================================================================== --- ydb/trunk/src/ydb.proto 2009-01-19 04:52:00 UTC (rev 1135) +++ ydb/trunk/src/ydb.proto 2009-01-23 09:34:16 UTC (rev 1136) @@ -14,13 +14,15 @@ // Initialization message sent to a nodes when it joins. message Init { - // The current seqno that the server is on. + // The next seqno that the server is going to send. required int32 txnseqno = 1; // What the leader perceives to be the joining replica's IP address. required uint32 yourhost = 2; + // Which recovery scheme we're using. + required bool multirecover = 3; // The nodes that have joined (including the joining node); the ports here // are the ports on which the nodes are listening. - repeated SockAddr node = 3; + repeated SockAddr node = 4; } // Sent to already-joined nodes to inform them of a newly joining node. 
Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-01-19 04:52:00 UTC (rev 1135) +++ ydb/trunk/tools/analysis.py 2009-01-23 09:34:16 UTC (rev 1136) @@ -85,12 +85,21 @@ print width = 5e4 - a = bar(seqnos, recvmeans, yerr = recvsds, width = width, color = 'r', - label = 'State receive') - b = bar(seqnos, buildmeans, yerr = buildsds, width = width, color = 'g', - label = 'Build-up time', bottom = recvmeans) - c = bar(seqnos, catchmeans, yerr = catchsds, width = width, color = 'b', - label = 'Catch-up', bottom = recvmeans + buildmeans) + # From "zen and tea" on kuler.adobe.com + hue = lambda i: tuple(map(lambda x: float(x)/255, + [( 16, 34, 43), + (149,171, 99), + (189,214,132), + (226,240,214), + (246,255,224)][i+1])) + ehue = lambda i: hue(-1) # tuple(map(lambda x: min(1, x + .3), hue(i))) + a = bar(seqnos, recvmeans, yerr = recvsds, width = width, color = hue(0), + ecolor = ehue(0), label = 'State receive') + b = bar(seqnos, buildmeans, yerr = buildsds, width = width, color = hue(1), + ecolor = ehue(1), label = 'Build-up time', bottom = recvmeans) + c = bar(seqnos, catchmeans, yerr = catchsds, width = width, color = hue(2), + ecolor = ehue(2), label = 'Catch-up', + bottom = recvmeans + buildmeans) title('Recovery time over number of transactions') xlabel('Transaction count (corresponds roughly to data size)') Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-01-19 04:52:00 UTC (rev 1135) +++ ydb/trunk/tools/test.bash 2009-01-23 09:34:16 UTC (rev 1136) @@ -99,6 +99,12 @@ refresh-local } +node-setup-gtest() { + check-remote + cd /tmp/ + toast --quiet arm googletest +} + node-setup-ydb-1() { check-remote if [[ ! 
-L ~/ydb ]] @@ -200,6 +206,7 @@ parremote node-setup-m4 parremote node-setup-bison parremote node-setup-clamp + parremote node-setup-gtest } setup-ydb() { @@ -287,6 +294,7 @@ run-helper() { local leader=$1 shift + : ${seqno:=100000} tagssh $leader "ydb/src/ydb -l -x --accept-joiner-seqno $seqno -n $(( $# - 1 )) -o 1 -O 1 ${extraargs:-}" & # -v --debug-threads sleep .1 # pexpect 'waiting for at least' # Run initial replicas. @@ -296,7 +304,7 @@ done sleep .1 # pexpect 'got all \d+ replicas' leader # Run joiner. - tagssh $1 "ydb/src/ydb -H $leader" & # -v --debug-threads -t 200000" & + tagssh $1 "ydb/src/ydb -H $leader ${extraargs:-}" & # -v --debug-threads -t 200000" & if false ; then if [[ ${wait2:-} ]] then sleep $wait2
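The segmented bars in the analysis.py change above consume, for each accept-joiner seqno, a mean and standard deviation per recovery phase (state receive, build-up, catch-up). As a rough sketch of what that aggregation step does, here is a pure-Python version; the real script uses numpy arrays in its `agg()` helper, and the sample timings below are invented purely for illustration:

```python
import itertools
import statistics

def agg(rows):
    """Group (seqno, recv_ms, build_ms, catch_ms) trial tuples by seqno
    and compute (mean, population stdev) for each phase column."""
    out = {}
    for seqno, tups in itertools.groupby(sorted(rows), key=lambda t: t[0]):
        cols = list(zip(*tups))[1:]  # drop the seqno column itself
        out[seqno] = [(statistics.mean(c), statistics.pstdev(c)) for c in cols]
    return out

# Two trials each at two accept-joiner seqnos (numbers are made up).
trials = [
    (100000, 120.0, 300.0,  80.0),
    (100000, 140.0, 320.0, 100.0),
    (200000, 260.0, 610.0, 190.0),
    (200000, 240.0, 590.0, 210.0),
]
stats = agg(trials)
print(stats[100000][0])  # → (130.0, 10.0): state-receive mean/sd at seqno 100000
```

Each phase's per-seqno means then become one bar segment, stacked via the `bottom=` argument, with the stdevs feeding `yerr`.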
From: <yan...@us...> - 2009-01-19 04:52:07
Revision: 1135 http://assorted.svn.sourceforge.net/assorted/?rev=1135&view=rev Author: yangzhang Date: 2009-01-19 04:52:00 +0000 (Mon, 19 Jan 2009) Log Message: ----------- added some elisp functions after taking phil's emacs iap class; new fx.vim Modified Paths: -------------- configs/trunk/src/emacs/yang.el configs/trunk/src/vim/syntax/fx.vim Modified: configs/trunk/src/emacs/yang.el =================================================================== --- configs/trunk/src/emacs/yang.el 2009-01-19 04:50:37 UTC (rev 1134) +++ configs/trunk/src/emacs/yang.el 2009-01-19 04:52:00 UTC (rev 1135) @@ -115,8 +115,8 @@ ;; The following lines are always needed. Choose your own keys. (add-to-list 'auto-mode-alist '("\\.org$" . org-mode)) -(define-key global-map "\C-cl" 'org-store-link) -(define-key global-map "\C-ca" 'org-agenda) +(global-set-key "\C-cl" 'org-store-link) +(global-set-key "\C-ca" 'org-agenda) (add-hook 'org-mode-hook 'turn-on-font-lock) ; org-mode buffers only @@ -256,9 +256,9 @@ (require 'jtags) (require 'zoom-frm) -(define-key global-map (read-kbd-macro "C--") 'zoom-frm-out) -(define-key global-map (read-kbd-macro "C-=") 'zoom-frm-in) -(define-key global-map (read-kbd-macro "C-0") 'zoom-frm-unzoom) +(global-set-key (read-kbd-macro "C--") 'zoom-frm-out) +(global-set-key (read-kbd-macro "C-=") 'zoom-frm-in) +(global-set-key (read-kbd-macro "C-0") 'zoom-frm-unzoom) ;; }}} @@ -290,6 +290,20 @@ (global-set-key [f12] 'search-word-under-cursor-forward) (global-set-key [f11] 'search-word-under-cursor-backward) +;; This was written in Phil's Emacs IAP class! +(defun count-words (&optional start end) + "Print the number of words in the region or buffer." 
+ (interactive "r") + (let ((count 0) + (start (if (use-region-p) start (point-min))) + (end (if (use-region-p) end (point-max)))) + (save-excursion + (goto-char start) + (while (and (< (point) end) + (re-search-forward "\\w+" end t)) + (setq count (+ 1 count)))) + (message "Region contains %d words" count))) + ;; TODO make this work in XEmacs (defun copy-line (n) "Copy N lines at point to the kill-ring." @@ -504,9 +518,9 @@ (add-hook 'write-file-hook 'time-stamp) ;; Flyspell is an in-line spell checker. It checks the spelling as you type. -;(dolist (hook '(text-mode-hook)) -;(add-hook hook (lambda () (refill-mode 1)))) -;(add-hook hook (lambda () (flyspell-mode 1)))) +(dolist (hook '(text-mode-hook)) +(add-hook hook (lambda () (refill-mode 1)))) +(add-hook hook (lambda () (flyspell-mode 1)))) ;; }}} Modified: configs/trunk/src/vim/syntax/fx.vim =================================================================== --- configs/trunk/src/vim/syntax/fx.vim 2009-01-19 04:50:37 UTC (rev 1134) +++ configs/trunk/src/vim/syntax/fx.vim 2009-01-19 04:52:00 UTC (rev 1135) @@ -2,7 +2,7 @@ " Language: JavaFX Script " Maintainer: Yang Zhang <com.gmail@yaaang> " URL: http://assorted.sf.net/jfx-vim -" Last Change: 2007 Jun 03 +" Last Change: 2008 Jan 18 " Quit when a syntax file was already loaded if version < 600 @@ -15,7 +15,7 @@ syn sync minlines=50 " most JFX keywords -syn keyword jfxKeyword after as before bind class else extends first for foreach from if in into last new on private protected return select switch super then trigger var where while insert delete +syn keyword jfxKeyword abstract attribute bind break class continue delete false for function if import init insert new not null package private public return super sizeof static this throw try true var while after and as before catch dur else exclusive extends finally in bound indexof into inverse lazy on or replace step with where instanceof override at then tween assert by do first from last let protected readonly 
typeof lazy syn keyword jfxImport import nextgroup=scalaFqn skipwhite syn match jfxFqn "\<[._$a-zA-Z0-9,*]*" contained nextgroup=jfxFqnSet
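The `count-words` defun added in this commit walks the region with `re-search-forward` on `\w+`, bumping a counter once per match. For comparison, the same counting logic fits in a few lines of Python (the function name, keyword arguments, and sample string here are illustrative only, not part of the commit):

```python
import re

def count_words(text, start=0, end=None):
    """Count \\w+ word tokens between start and end, mirroring the
    re-search-forward loop in the elisp count-words above."""
    end = len(text) if end is None else end
    return len(re.findall(r"\w+", text[start:end]))

print(count_words("Region contains %d words"))  # → 4 ("%d" still contains the token "d")
```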
From: <yan...@us...> - 2009-01-19 04:50:45
Revision: 1134 http://assorted.svn.sourceforge.net/assorted/?rev=1134&view=rev Author: yangzhang Date: 2009-01-19 04:50:37 +0000 (Mon, 19 Jan 2009) Log Message: ----------- tagged versions; released new version Modified Paths: -------------- jfx-vim/tags/0.2/src/fx.vim jfx-vim/trunk/src/fx.vim Added Paths: ----------- jfx-vim/tags/ jfx-vim/tags/0.1/ jfx-vim/tags/0.2/ Property changes on: jfx-vim/tags/0.1 ___________________________________________________________________ Added: svn:mergeinfo + Property changes on: jfx-vim/tags/0.2 ___________________________________________________________________ Added: svn:mergeinfo + Modified: jfx-vim/tags/0.2/src/fx.vim =================================================================== --- jfx-vim/trunk/src/fx.vim 2009-01-09 06:27:26 UTC (rev 1123) +++ jfx-vim/tags/0.2/src/fx.vim 2009-01-19 04:50:37 UTC (rev 1134) @@ -2,7 +2,7 @@ " Language: JavaFX Script " Maintainer: Yang Zhang <com.gmail@yaaang> " URL: http://assorted.sf.net/jfx-vim -" Last Change: 2007 Jun 03 +" Last Change: 2008 Jan 18 " Quit when a syntax file was already loaded if version < 600 @@ -15,7 +15,7 @@ syn sync minlines=50 " most JFX keywords -syn keyword jfxKeyword after as before bind class else extends first for foreach from if in into last new on private protected return select switch super then trigger var where while insert delete +syn keyword jfxKeyword abstract attribute bind break class continue delete false for function if import init insert new not null package private public return super sizeof static this throw try true var while after and as before catch dur else exclusive extends finally in bound indexof into inverse lazy on or replace step with where instanceof override at then tween assert by do first from last let protected readonly typeof lazy syn keyword jfxImport import nextgroup=scalaFqn skipwhite syn match jfxFqn "\<[._$a-zA-Z0-9,*]*" contained nextgroup=jfxFqnSet Modified: jfx-vim/trunk/src/fx.vim 
=================================================================== --- jfx-vim/trunk/src/fx.vim 2009-01-17 18:56:53 UTC (rev 1133) +++ jfx-vim/trunk/src/fx.vim 2009-01-19 04:50:37 UTC (rev 1134) @@ -2,7 +2,7 @@ " Language: JavaFX Script " Maintainer: Yang Zhang <com.gmail@yaaang> " URL: http://assorted.sf.net/jfx-vim -" Last Change: 2007 Jun 03 +" Last Change: 2008 Jan 18 " Quit when a syntax file was already loaded if version < 600 @@ -15,7 +15,7 @@ syn sync minlines=50 " most JFX keywords -syn keyword jfxKeyword after as before bind class else extends first for foreach from if in into last new on private protected return select switch super then trigger var where while insert delete +syn keyword jfxKeyword abstract attribute bind break class continue delete false for function if import init insert new not null package private public return super sizeof static this throw try true var while after and as before catch dur else exclusive extends finally in bound indexof into inverse lazy on or replace step with where instanceof override at then tween assert by do first from last let protected readonly typeof lazy syn keyword jfxImport import nextgroup=scalaFqn skipwhite syn match jfxFqn "\<[._$a-zA-Z0-9,*]*" contained nextgroup=jfxFqnSet
From: <yan...@us...> - 2009-01-17 18:57:00
Revision: 1133 http://assorted.svn.sourceforge.net/assorted/?rev=1133&view=rev Author: yangzhang Date: 2009-01-17 18:56:53 +0000 (Sat, 17 Jan 2009) Log Message: ----------- - filled in more measurement documentation and rewrote parts of the overview - added -pg to the Makefile for profiling - added --profile-threads - added --write-thresh - improved detail of and tweaked some output messages - more clearly distinguished the different phases of recovery on the joiner - changed the joiner behavior to not send responses when catching up - added clean shutdown messages from leader to replicas - decreased the variance in the txns - added --max-ops, --min-ops - added some lineage to the analysis.py output - generalized analysis code to support plotting multi-segmented bar charts Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/tools/analysis.py ydb/trunk/tools/test.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-01-14 18:29:46 UTC (rev 1132) +++ ydb/trunk/README 2009-01-17 18:56:53 UTC (rev 1133) @@ -3,24 +3,23 @@ ydb (Yang's Database) is a simple replicated memory store, developed for the purpose of researching various approaches to recovery in such OLTP-optimized -databases as [VOLTDB] (formerly H-Store/Horizontica). +databases as [H-Store] (or VOLTDB). -[VOLTDB]: http://db.cs.yale.edu/hstore/ +[H-Store]: http://db.cs.yale.edu/hstore/ -Currently, the only implemented recovery mechanism is to have the first-joining +Currently, the only implemented recovery mechanism is to have an already-joined replica serialize its entire database state and send that to the joining node. -If you start a system with a minimum of $n$ replicas, then the leader will wait -for that many of them to join before it starts issuing transactions. 
Then when -replica $n+1$ joins, it will need to catch up to the current state of the -system; it will do so by contacting the first-joining replica and receiving a -complete dump of its DB state. +If you start a system with a minimum of $n$ replicas, then the leader waits for +that many of them to join before it starts issuing transactions. Once replica +$n+1$ joins, it needs to catch up to the current state of the system; it does +so by contacting the first-joining replica and receiving a complete dump of its +DB state. -The leader will report the current txn seqno to the joiner, and start streaming -txns beyond that seqno to the joiner, which the joiner will push onto its -backlog. It will also instruct that first replica to snapshot its DB state at -this txn seqno and prepare to send it to the recovering node as soon as it -connects. +The leader reports the current txn seqno to the joiner, and starts streaming +txns beyond that seqno to the joiner, which the joiner pushes onto its backlog. +It also instructs that first replica to snapshot its DB state at this txn seqno +and prepare to send it to the recovering node as soon as it connects. Setup ----- @@ -123,23 +122,28 @@ ### Recovery experiments To run a leader on `farm10`, initial replicas on `farm11` and `farm12`, and a -recovering replica on `farm13` after 5 seconds: +recovering replica on `farm13` once the 100,000th txn has been issued: - range='10 13' ./test.bash run 1000 # Command requires exactly 4 nodes. + range='10 13' seqno=100000 ./test.bash run -To run this experiment TODO -trials: +The above experiment uses the `block` recovery scheme. 
To use the `yield` +recovery scheme: - range='10 13' + range='10 13' seqno=100000 extraargs=--yield-catch-up ./test.bash run +To run `block` and `yield`, respectively, for varying values of `seqno` and for +some number of trials: + + range='10 13' ./test.bash full-block + range='10 13' ./test.bash full-yield + ### Scaling experiments To run a leader on `farm10` with initial replicas on the rest: range='10 15' ./test.bash scaling -To run for 1 through 3 initial replicas, repeating each configuration for 3 -trials: +To run for varying numbers of replicas and for some number of trials: range='10 13' ./test.bash full-scaling @@ -199,12 +203,44 @@ Todo ---- +- DONE add benchmarking/testing hooks + - start the recovering joiner at a well-defined time (after a certain # + txns or after the DB reaches a certain size) + - stop the system once recovery finishes +- DONE find out how often prng yields same number + - not very often +- DONE baseline scaling (tps with number of nodes) + - inversely proportional to number of nodes, so bottlenecked at leader +- DONE recovery time as a function of amount of data +- DONE use only insert (and update) txns + - db size blows up much faster +- DONE try gprof profiling + - output quirky; waiting on list response to question +- DONE optimize acks from joiner + - much faster, and much less variance +- DONE use more careful txn counting/data size + - added lower/upper bounds on the rand # ops per txn (5), combined with + above restricting of ops to writes + - 5 is a lot; db grows large; experiments take much longer +- DONE break down into various phases using bar graph of segmented bars +- TODO serialize outputs from the various clients to a single merger to (1) + have ordering over the (timestamped) messages, and (2) avoid interleaved + lines +- TODO detailed view of tps during recovery over time (should see various + phases) +- TODO later: runtime overhead of logging/tps under normal operation (scaled + with # nodes?) 
+- TODO later: timestamped logging? + +Longer term + +- Testing + - unit/regression/mock + - performance tests + - valgrind + - Add a way to reliably obtain ST stack traces -- Add benchmarking/testing hooks, e.g.: - - start the recovering joiner at a well-defined time (after a certain # txns - or after the DB reaches a certain size) - - Add benchmarking information, e.g.: - txns/second normally - txns during recovery @@ -238,24 +274,8 @@ - Add disk-based recovery methods. -Plan/Notes ----------- +Presentation Notes +------------------ -Measurements - -- DONE find out how often prng yields same number - - not very often -- DONE baseline scaling (tps with number of nodes) - - inversely proportional to number of nodes, so bottlenecked at leader -- DONE recovery time as a function of amount of data - - TODO break down into various phases using bar graph of segmented bars -- DONE use only insert (and update) txns -- TODO try profiling -- TODO detailed view of tps during recovery over time (should see various phases) -- TODO later: runtime overhead of logging/tps under normal operation (scaled - with # nodes?) - -Presentation - - TODO differences from: harbor, harp, aries - TODO understand 2pc, paxos, etc. 
Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-01-14 18:29:46 UTC (rev 1132) +++ ydb/trunk/src/Makefile 2009-01-17 18:56:53 UTC (rev 1133) @@ -19,14 +19,21 @@ SRCS := $(GENSRCS) OBJS := $(GENOBJS) +ifneq ($(GPROF),) + GPROF := -pg +endif +ifneq ($(GCOV),) + GCOV := -fprofile-arcs -ftest-coverage +endif LDFLAGS := -lstx -lst -lresolv -lpthread -lprotobuf \ - -lboost_program_options-gcc43-mt -CXXFLAGS := -g3 -Wall -Werror -Wextra -Woverloaded-virtual -Wconversion -Wno-conversion -Wno-ignored-qualifiers \ + -lboost_program_options-gcc43-mt $(GPROF) +CXXFLAGS := -g3 $(GPROF) -Wall -Werror -Wextra -Woverloaded-virtual -Wconversion \ + -Wno-conversion -Wno-ignored-qualifiers \ -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \ -Winit-self -Wsign-promo -Wno-unused-parameter -Wc++0x-compat \ -Wparentheses -Wmissing-format-attribute -Wfloat-equal \ -Winline -Wsynth -PBCXXFLAGS := -g3 -Wall -Werror +PBCXXFLAGS := -g3 -Wall -Werror $(GPROF) all: $(TARGET) Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-01-14 18:29:46 UTC (rev 1132) +++ ydb/trunk/src/main.lzz.clamp 2009-01-17 18:56:53 UTC (rev 1133) @@ -31,11 +31,12 @@ // Configuration. st_utime_t timeout; -int chkpt, accept_joiner_seqno, issuing_interval; +int chkpt, accept_joiner_seqno, issuing_interval, min_ops, max_ops; size_t accept_joiner_size; bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, - count_updates, stop_on_recovery, general_txns; -long long timelim, read_thresh; + count_updates, stop_on_recovery, general_txns, profile_threads, + debug_threads; +long long timelim, read_thresh, write_thresh; // Control. st_intr_bool stop_hub, kill_hub; @@ -60,10 +61,19 @@ ~thread_eraser() { threads.erase(st_thread_self()); } }; +/** + * For debug/error-printing purposes. 
+ */ map<st_thread_t, string> threadnames; st_thread_t last_thread; /** + * For profiling. + */ +map<st_thread_t, long long> threadtimes; +long long thread_start_time; + +/** * Look up thread name, or just show thread ID. */ string @@ -81,7 +91,9 @@ void switch_out_cb() { - last_thread = st_thread_self(); + if (debug_threads) last_thread = st_thread_self(); + if (profile_threads) + threadtimes[st_thread_self()] += current_time_millis() - thread_start_time; } /** @@ -89,11 +101,13 @@ */ void switch_in_cb() { - if (last_thread != st_thread_self()) { + if (debug_threads && last_thread != st_thread_self()) { cout << "switching"; if (last_thread != 0) cout << " from " << threadname(last_thread); cout << " to " << threadname() << endl; } + if (profile_threads) + thread_start_time = current_time_millis(); } /** @@ -223,9 +237,11 @@ size_t resid = sizeof len; #define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y)) int res = st_write_resid(dst, static_cast<void*>(&len), &resid, timeout); + long long before_write; + if (write_thresh > 0) { + before_write = current_time_millis(); + } if (res == -1 && errno == ETIME) { - cerr << "got timeout! " << resid << " of " << sizeof len - << " remaining, for dst #" << dstno << endl; checksize(st_write(dst, reinterpret_cast<char*>(&len) + sizeof len - resid, resid, @@ -234,6 +250,14 @@ } else { check0x(res); } + if (write_thresh > 0) { + long long write_time = current_time_millis() - before_write; + if (write_time > write_thresh) { + cout << "thread " << threadname() + << ": write to dst #" << dstno + << " took " << write_time << " ms" << endl; + } + } checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), s.size()); dstno++; @@ -325,7 +349,7 @@ // Generate a random transaction. Txn txn; txn.set_seqno(seqno++); - int count = randint(5) + 1; + int count = randint(min_ops, max_ops + 1); for (int o = 0; o < count; o++) { Op *op = txn.add_op(); int rtype = general_txns ? 
randint(3) : 1, rkey = randint(), rvalue = randint(); @@ -358,6 +382,10 @@ accept_joiner.set(); } } + + Txn txn; + txn.set_seqno(-1); + bcastmsg(fds, txn); } /** @@ -394,7 +422,9 @@ break; } } - sendmsg(leader, res); + if (caught_up) { + sendmsg(leader, res); + } } void @@ -405,7 +435,7 @@ int count_diff = stop_count - start_count; double rate = double(count_diff) * 1000 / time_diff; cout << action << " " << count_diff << " txns in " << time_diff << " ms (" - << rate << "tps)" << endl; + << rate << " tps)" << endl; } /** @@ -427,7 +457,7 @@ * leader. Not entirely clear that this is necessary; could probably just go * with seqno. */ - void +void process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, st_channel<shared_ptr<Recovery> > &send_states, st_channel<shared_ptr<Txn> > &backlog, int init_seqno) @@ -436,20 +466,24 @@ long long start_time = current_time_millis(), time_caught_up = caught_up ? start_time : -1; int seqno_caught_up = caught_up ? seqno : -1; + // Used by joiner only to tell where we actually started (init_seqno is just + // the seqno reported by the leader in the Init message, but it may have + // issued more since the Init message). 
+ int first_seqno = -1; finally f(lambda () { - long long now = current_time_millis(); - showtput("processed", now, __ref(start_time), __ref(seqno), - __ref(init_seqno)); - if (!__ref(caught_up)) { - cout << "live-processing: never entered this phase (never caught up)" << - endl; - } else { - showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), - __ref(seqno_caught_up)); - } - __ref(send_states).push(shared_ptr<Recovery>()); - }); + long long now = current_time_millis(); + showtput("processed", now, __ref(start_time), __ref(seqno), + __ref(init_seqno)); + if (!__ref(caught_up)) { + cout << "live-processing: never entered this phase (never caught up)" << + endl; + } else { + showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), + __ref(seqno_caught_up)); + } + __ref(send_states).push(shared_ptr<Recovery>()); + }); while (true) { Txn txn; @@ -464,23 +498,28 @@ if (read_thresh > 0) { long long read_time = current_time_millis() - before_read; if (read_time > read_thresh) { - cout << "current_time_millis() - before_read = " << read_time << " > " - << read_thresh << endl; + cout << "thread " << threadname() + << ": read took " << read_time << " ms" << endl; } } if (txn.has_seqno()) { const char *action; - if (txn.seqno() == seqno + 1) { + if (txn.seqno() < 0) { + break; + } else if (txn.seqno() == seqno + 1) { if (!caught_up) { time_caught_up = current_time_millis(); seqno_caught_up = seqno; - showtput("backlogged", time_caught_up, start_time, seqno_caught_up, - init_seqno); + showtput("process_txns caught up; backlogged", + time_caught_up, start_time, seqno_caught_up, + first_seqno == -1 ? init_seqno : first_seqno); caught_up = true; } process_txn(leader, map, txn, seqno, true); action = "processed"; } else { + if (first_seqno == -1) + first_seqno = txn.seqno(); // Queue up for later processing once a snapshot has been received. 
backlog.push(shared_ptr<Txn>(new Txn(txn))); action = "backlogged"; @@ -576,7 +615,8 @@ // never grow again if stop_hub is set. if (last_seqno + 1 == seqno) { cout << rid << ": "; - cout << "stopping seqno = " << res.seqno() << endl; + cout << "clean stop; next expected seqno is " << seqno + << " (last seqno was " << last_seqno << ")" << endl; break; } else { continue; @@ -597,7 +637,7 @@ recover_signals.push(now); cout << rid << ": "; cout << "recovering node caught up; took " - << timediff << "ms" << endl; + << timediff << " ms" << endl; // This will cause the program to exit eventually, but cleanly, such that // the recovery time will be set first, before the eventual exit (which // may not even happen in the current iteration). @@ -756,12 +796,12 @@ int seqno = -1; finally f(lambda () { cout << "REPLICA SUMMARY" << endl; - cout << "total updates = " << updates << endl; - cout << "final DB state: seqno = " << __ref(seqno) << ", size = " + cout << "- total updates = " << updates << endl; + cout << "- final DB state: seqno = " << __ref(seqno) << ", size = " << __ref(map).size() << endl; string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); if (dump) { - cout << "dumping to " << fname << endl; + cout << "- dumping to " << fname << endl; ofstream of(fname.c_str()); of << "seqno: " << __ref(seqno) << endl; foreach (const pii &p, __ref(map)) { @@ -824,6 +864,7 @@ // If there's anything to recover. if (init.txnseqno() > 0) { cout << "waiting for recovery from " << replicas[0] << endl; + long long before_recv = current_time_millis(); // Read the recovery message. 
Recovery recovery; @@ -831,6 +872,9 @@ st_intr intr(stop_hub); readmsg(replicas[0], recovery); } + long long build_start = current_time_millis(); + cout << "got recovery message in " << build_start - before_recv + << " ms" << endl; for (int i = 0; i < recovery.pair_size(); i++) { const Recovery_Pair &p = recovery.pair(i); map[p.key()] = p.value(); @@ -840,8 +884,11 @@ } assert(seqno == -1 && static_cast<typeof(seqno)>(recovery.seqno()) > seqno); - seqno = recovery.seqno(); - cout << "recovered " << recovery.pair_size() << " records." << endl; + int mid_seqno = seqno = recovery.seqno(); + long long mid_time = current_time_millis(); + cout << "receive and build-up took " << mid_time - before_recv + << " ms; built up map of " << recovery.pair_size() << " records in " + << mid_time - build_start << " ms; now at seqno " << seqno << endl; while (!backlog.empty()) { shared_ptr<Txn> p = backlog.take(); @@ -849,13 +896,14 @@ if (p->seqno() % chkpt == 0) { if (verbose) cout << "processed txn " << p->seqno() << " off the backlog; " - << "backlog.size = " << backlog.queue().size() << endl; + << "backlog.size = " << backlog.queue().size() << endl; // Explicitly yield. (Note that yielding does still effectively // happen anyway because process_txn is a yield point.) st_sleep(0); } } - cout << "caught up." 
<< endl; + showtput("replayer caught up; from backlog replayed", + current_time_millis(), mid_time, seqno, mid_seqno); } } catch (std::exception &ex) { cerr_thread_ex(ex) << endl; @@ -912,7 +960,7 @@ try { GOOGLE_PROTOBUF_VERIFY_VERSION; - bool is_leader, use_epoll, debug_threads; + bool is_leader, use_epoll; int minreps; uint16_t leader_port, listen_port; string leader_host; @@ -923,6 +971,8 @@ ("help,h", "show this help message") ("debug-threads,d",po::bool_switch(&debug_threads), "enable context switch debug outputs") + ("profile-threads,q",po::bool_switch(&profile_threads), + "enable profiling of threads") ("verbose,v", po::bool_switch(&verbose), "enable periodic printing of txn processing progress") ("epoll,e", po::bool_switch(&use_epoll), @@ -951,6 +1001,12 @@ ("issuing-interval,i", po::value<int>(&issuing_interval)->default_value(0), "seconds to sleep between issuing txns (for leader only)") + ("min-ops,o", + po::value<int>(&min_ops)->default_value(5), + "lower bound on randomly generated number of operations per txn (for leader only)") + ("max-ops,O", + po::value<int>(&max_ops)->default_value(5), + "upper bound on randomly generated number of operations per txn (for leader only)") ("accept-joiner-seqno,j", po::value<int>(&accept_joiner_seqno)->default_value(0), "accept recovering joiner (start recovery) after this seqno (for leader " @@ -965,9 +1021,10 @@ "number of txns before yielding/verbose printing") ("timelim,T", po::value<long long>(&timelim)->default_value(0), "general network IO time limit in milliseconds, or 0 for none") + ("write-thresh,w", po::value<long long>(&write_thresh)->default_value(200), + "if positive and any txn write exceeds this, then print a message (for replicas only)") ("read-thresh,r", po::value<long long>(&read_thresh)->default_value(0), - "if positive and any txn read exceeds this, then print a message " - "(for replicas only)") + "if positive and any txn read exceeds this, then print a message (for replicas only)") 
("listen-port,p", po::value<uint16_t>(&listen_port)->default_value(7654), "port to listen on (replicas only)") ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(200000), @@ -984,6 +1041,11 @@ cout << desc << endl; return 0; } + + // Validate arguments. + check(min_ops > 0); + check(max_ops > 0); + check(max_ops >= min_ops); } catch (std::exception &ex) { cerr << ex.what() << endl << endl << desc << endl; return 1; @@ -1002,8 +1064,8 @@ // Initialize ST. if (use_epoll) check0x(st_set_eventsys(ST_EVENTSYS_ALT)); check0x(st_init()); - st_spawn(bind(handle_sig_sync)); - if (debug_threads) { + my_spawn(bind(handle_sig_sync), "handle_sig_sync"); + if (debug_threads || profile_threads) { st_set_switch_out_cb(switch_out_cb); st_set_switch_in_cb(switch_in_cb); } @@ -1013,6 +1075,26 @@ threads.insert(st_thread_self()); threadnames[st_thread_self()] = "main"; + finally f(lambda() { + if (profile_threads) { + cout << "thread profiling results:" << endl; + long long total; + typedef pair<st_thread_t, long long> entry; + foreach (entry p, threadtimes) { + const string &name = threadname(p.first); + if (name != "main" && name != "handle_sig_sync") + total += p.second; + } + foreach (entry p, threadtimes) { + const string &name = threadname(p.first); + if (name != "main" && name != "handle_sig_sync") + cout << "- " << threadname(p.first) << ": " << p.second + << " (" << (static_cast<double>(p.second) / total) << "%)" + << endl; + } + } + }); + // Which role are we? 
if (is_leader) { run_leader(minreps, leader_port); Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-01-14 18:29:46 UTC (rev 1132) +++ ydb/trunk/tools/analysis.py 2009-01-17 18:56:53 UTC (rev 1133) @@ -1,9 +1,12 @@ #!/usr/bin/env python from __future__ import with_statement -import re, sys, itertools, numpy +import re, sys, itertools +from os.path import basename, realpath from pylab import * +def getname(path): return basename(realpath(path)) + def check(path): with file(path) as f: if 'got timeout' in f.read(): @@ -11,12 +14,21 @@ def agg(src): def gen(): - for seqno, pairs in itertools.groupby(src, lambda (a,b): a): - ts = numpy.array([t for seqno, t in pairs]) - yield seqno, ts.mean(), ts.std(), ts - return list(gen()) + for index, tups in itertools.groupby(src, lambda x: x[0]): + yield list(tups) + a = array(list(gen())) + indexes = a[:,0,0] + means = a.mean(1) + stds = a.std(1) + tup = (indexes,) + for i in range(1, len(a[0,0])): + tup += (means[:,i], stds[:,i]) + stacked = hstack(map(lambda x: x.reshape((len(indexes),1)), tup)) + return tup + (stacked, a) def scaling(path): + print '=== scaling ===' + print 'file:', getname(path) check(path) def getpairs(): with file(path) as f: @@ -24,62 +36,77 @@ m = re.match( r'=== n=(?P<n>\d+) ', line ) if m: n = int(m.group('n')) - m = re.match( r'.*: issued .*[^.\d](?P<tps>[.\d]+)tps', line ) + m = re.match( r'.*: issued .*[^.\d](?P<tps>[.\d]+) ?tps', line ) if m: tps = float(m.group('tps')) yield (n, tps) tups = agg(getpairs()) + ns, tpsmeans, tpssds, stacked, a = agg(getpairs()) + print 'n, tps mean, tps sd' + print stacked + print - print 'num nodes, mean tps, stdev tps' - for n, mean, sd, raw in tups: print n, mean, sd, raw - - xs, ys, es, rs = zip(*tups) - errorbar(xs, ys, es) + errorbar(ns, tpsmeans, tpssds) title('Scaling of baseline throughput with number of nodes') xlabel('Node count') ylabel('Mean TPS (stdev error 
bars)') - xlim(.5, n+.5) + xlim(ns.min() - .5, ns.max() + .5) ylim(ymin = 0) savefig('scaling.png') def run(blockpath, yieldpath): - for path, label in [(blockpath, 'blocking scheme'), (yieldpath, 'yielding scheme')]: + for path, label in [#(blockpath, 'blocking scheme'), + (yieldpath, 'yielding scheme')]: + print '===', label, '===' + print 'file:', getname(path) check(path) def getpairs(): with file(path) as f: + seqno = recv = buildup = catchup = total = None for line in f: - m = re.match( r'=== seqno=(?P<n>\d+) ', line ) - if m: - seqno = int(m.group('n')) - m = re.match( r'.*: recovering node caught up; took (?P<time>\d+)ms', line ) - if m: - t = float(m.group('time')) - yield (seqno, t) - tups = agg(getpairs()) + m = re.match( r'=== seqno=(?P<seqno>\d+) ', line ) + if m: seqno = int(m.group('seqno')) + m = re.search( r'got recovery message in (?P<time>\d+) ms', line ) + if m: recv = float(m.group('time')) + m = re.search( r'built up .* (?P<time>\d+) ms', line ) + if m: buildup = float(m.group('time')) + m = re.search( r'replayer caught up; from backlog replayed \d+ txns in (?P<time>\d+) ms', line ) + if m: catchup = float(m.group('time')) + m = re.match( r'.*: recovering node caught up; took (?P<time>\d+) ?ms', line ) + if m: total = float(m.group('time')) + tup = (seqno, recv, buildup, catchup, total) + if all(tup): + yield tup + seqno = recv = buildup = catchup = total = None + seqnos, recvmeans, recvsds, buildmeans, buildsds, catchmeans, catchsds, totalmeans, totalsds, stacked, a = agg(getpairs()) - print '===', label, '===' - print 'max seqno, mean time, stdev time [raw data]' - for seqno, mean, sd, raw in tups: print seqno, mean, sd, raw + print 'max seqno, recv mean, recv sd, build mean, build sd, catch mean, catch sd, total mean, total sd' + print stacked print - xs, ys, es, rs = zip(*tups) - errorbar(xs, ys, es, label = label) + width = 5e4 + a = bar(seqnos, recvmeans, yerr = recvsds, width = width, color = 'r', + label = 'State receive') + b = 
bar(seqnos, buildmeans, yerr = buildsds, width = width, color = 'g', + label = 'Build-up time', bottom = recvmeans) + c = bar(seqnos, catchmeans, yerr = catchsds, width = width, color = 'b', + label = 'Catch-up', bottom = recvmeans + buildmeans) title('Recovery time over number of transactions') xlabel('Transaction count (corresponds roughly to data size)') - ylabel('Mean recovery time in ms (stdev error bars)') - #xlim(.5, n+.5) - #ylim(ymin = 0) + ylabel('Mean time in ms (SD error bars)') + legend(loc = 'upper left') savefig('run.png') def main(argv): if len(argv) <= 1: print >> sys.stderr, 'Must specify a command' - elif sys.argv[1] == 'scaling': - scaling(sys.argv[2] if len(sys.argv) > 2 else 'scaling-log') - elif sys.argv[1] == 'run': - run(*sys.argv[2:] if len(sys.argv) > 2 else ['block-log', 'yield-log']) + elif argv[1] == 'scaling': + scaling(argv[2] if len(argv) > 2 else 'scaling-log') + elif argv[1] == 'run': + run(*argv[2:] if len(argv) > 2 else ['block-log', 'yield-log']) else: - print >> sys.stderr, 'Unknown command:', sys.argv[1] + print >> sys.stderr, 'Unknown command:', argv[1] -sys.exit(main(sys.argv)) +if __name__ == '__main__': + sys.exit(main(sys.argv)) Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-01-14 18:29:46 UTC (rev 1132) +++ ydb/trunk/tools/test.bash 2009-01-17 18:56:53 UTC (rev 1133) @@ -269,6 +269,7 @@ full-scaling() { local base=$1 out=scaling-log-$(date +%Y-%m-%d-%H:%M:%S-%N) shift + ln -sf $out scaling-log for n in {1..5} ; do # configurations export range="$base $((base + n))" stop @@ -281,13 +282,12 @@ echo done done >& $out - ln -sf $out scaling-log } run-helper() { local leader=$1 shift - tagssh $leader "ydb/src/ydb -l -x --accept-joiner-seqno $seqno -n $(( $# - 1 )) ${extraargs:-}" & # -v --debug-threads + tagssh $leader "ydb/src/ydb -l -x --accept-joiner-seqno $seqno -n $(( $# - 1 )) -o 1 -O 1 ${extraargs:-}" & # -v --debug-threads 
sleep .1 # pexpect 'waiting for at least' # Run initial replicas. while (( $# > 1 )) ; do @@ -312,7 +312,7 @@ } full-run() { - for seqno in 100000 300000 500000 700000 900000; do # configurations + for seqno in 500000 400000 300000 200000 100000 ; do # 200000 300000 400000 500000 ; do # 700000 900000; do # configurations stop for i in {1..5} ; do # trials echo === seqno=$seqno i=$i === @@ -327,20 +327,20 @@ full-block() { local out=block-log-$(date +%Y-%m-%d-%H:%M:%S) + ln -sf $out block-log full-run >& $out - ln -sf $out block-log } full-yield() { local out=yield-log-$(date +%Y-%m-%d-%H:%M:%S) + ln -sf $out yield-log extraargs='--yield-catch-up' full-run >& $out - ln -sf $out yield-log } full() { - #full-block + full-block full-yield - #full-scaling + full-scaling } stop-helper() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
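The r1133 rewrite of `agg()` in analysis.py above groups consecutive `(index, value, ...)` tuples by index and reduces over the trial axis with NumPy. A standalone sketch of that pattern (data invented for illustration; like the original, it assumes every index has the same number of trials, so the groups stack into a rectangular array):

```python
import itertools
import numpy as np

def agg(src):
    # Group consecutive (index, v1, v2, ...) tuples by their first field,
    # as analysis.py's agg() does; src must already be sorted by index.
    groups = [list(tups) for _, tups in itertools.groupby(src, lambda x: x[0])]
    a = np.array(groups, dtype=float)  # shape: (n_indexes, n_trials, n_cols)
    indexes = a[:, 0, 0]
    means = a.mean(axis=1)             # per-index column means over trials
    stds = a.std(axis=1)               # population std (np.std default)
    return indexes, means, stds

# Two trials per configuration, columns (n, tps):
idx, m, s = agg([(1, 10.0), (1, 14.0), (2, 20.0), (2, 24.0)])
```

Column 0 of `m` and `s` is the index itself; the real script slices from column 1 onward when printing means and deviations.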
From: <yan...@us...> - 2009-01-14 18:29:56
Revision: 1132
          http://assorted.svn.sourceforge.net/assorted/?rev=1132&view=rev
Author:   yangzhang
Date:     2009-01-14 18:29:46 +0000 (Wed, 14 Jan 2009)

Log Message:
-----------
easier to read

Modified Paths:
--------------
    ydb/trunk/tools/analysis.py

Modified: ydb/trunk/tools/analysis.py
===================================================================
--- ydb/trunk/tools/analysis.py	2009-01-14 18:29:07 UTC (rev 1131)
+++ ydb/trunk/tools/analysis.py	2009-01-14 18:29:46 UTC (rev 1132)
@@ -57,8 +57,10 @@
         yield (seqno, t)
     tups = agg(getpairs())
 
+    print '===', label, '==='
     print 'max seqno, mean time, stdev time [raw data]'
     for seqno, mean, sd, raw in tups: print seqno, mean, sd, raw
+    print
 
     xs, ys, es, rs = zip(*tups)
     errorbar(xs, ys, es, label = label)
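The `getpairs()` scanners in analysis.py pull numbers out of the test logs with regexes like those in the diffs on this page; note the `[.\d]+ ?tps` tweak in r1133 that tolerates an optional space before the unit. A minimal stdlib-only sketch of the same scanning pattern (the log text here is invented):

```python
import re

LOG = """\
=== n=3 i=1 ===
farm10: issued 120000 txns in 30000 ms (4000tps)
=== n=3 i=2 ===
farm10: issued 118000 txns in 30000 ms (3933.3 tps)
"""

def getpairs(lines):
    # Track the current configuration header, then yield (n, tps)
    # for each throughput line seen under it.
    n = None
    for line in lines:
        m = re.match(r'=== n=(?P<n>\d+) ', line)
        if m:
            n = int(m.group('n'))
        m = re.match(r'.*: issued .*[^.\d](?P<tps>[.\d]+) ?tps', line)
        if m:
            yield (n, float(m.group('tps')))

pairs = list(getpairs(LOG.splitlines()))  # [(3, 4000.0), (3, 3933.3)]
```

The `[^.\d]` guard before the capture keeps the greedy `.*` from swallowing leading digits of the throughput figure.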
From: <yan...@us...> - 2009-01-14 18:29:16
Revision: 1131
          http://assorted.svn.sourceforge.net/assorted/?rev=1131&view=rev
Author:   yangzhang
Date:     2009-01-14 18:29:07 +0000 (Wed, 14 Jan 2009)

Log Message:
-----------
oops

Modified Paths:
--------------
    ydb/trunk/tools/analysis.py

Modified: ydb/trunk/tools/analysis.py
===================================================================
--- ydb/trunk/tools/analysis.py	2009-01-14 18:25:52 UTC (rev 1130)
+++ ydb/trunk/tools/analysis.py	2009-01-14 18:29:07 UTC (rev 1131)
@@ -65,7 +65,7 @@
 
     title('Recovery time over number of transactions')
    xlabel('Transaction count (corresponds roughly to data size)')
-    ylabel('Mean TPS (stdev error bars)')
+    ylabel('Mean recovery time in ms (stdev error bars)')
     #xlim(.5, n+.5)
     #ylim(ymin = 0)
     savefig('run.png')
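Elsewhere on this page, ydb's `sendmsg` (main.lzz.clamp) prefixes each serialized protobuf with a four-byte length passed through `htonl` before writing it to the socket. A minimal Python sketch of that framing convention (function names are mine, not ydb's):

```python
import struct

def frame(payload):
    # Prefix a serialized message with a 4-byte big-endian (network
    # byte order) length, as sendmsg does with htonl before st_write.
    return struct.pack('!I', len(payload)) + payload

def unframe(buf):
    # Split one framed message off the front of buf -> (msg, rest).
    (length,) = struct.unpack('!I', buf[:4])
    return buf[4:4 + length], buf[4 + length:]

msg, rest = unframe(frame(b'hello') + frame(b'world'))  # msg == b'hello'
```

The receiver reads exactly four bytes, then exactly `length` bytes, so messages can be pulled off a byte stream without any delimiters.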
From: <yan...@us...> - 2009-01-14 18:26:15
Revision: 1130 http://assorted.svn.sourceforge.net/assorted/?rev=1130&view=rev Author: yangzhang Date: 2009-01-14 18:25:52 +0000 (Wed, 14 Jan 2009) Log Message: ----------- - added analysis.py for aggregating and plotting measurement results (collected from test.bash) - added --dump for finally dumping state - added --exit-on-recovery to make automated runs easier - added --issuing-interval for debugging - fixed bug with response_handler hanging forever if the sequence numbers happen to be caught up; very visible with --issuing-interval - added scaling, full-scaling, full-yield, full-block, much more to test.bash - a bunch of general refactoring of test.bash - documented measurement/analysis tools, tool reqs, more usage notes in general, and personal TODOs/notes - added timestamping to tagssh - fixed bug with process_txn throws a timeout exception trying to sendmsg - added --read-thresh for debugging time spent waiting to read from network socket - added SIGUSR1 pausing - refactored threadnames - improved thread switching callback debugging - refactored/improved thread exception printing - added debugging of large message sending - added sendmsg timeout warnings - added --general-txns so system defaults to insert/update txns - added --count-updates, --show-updates - fixed after-recovery statistics bookkeeping - fixed stop-responsiveness in response_handler - added exception printing in case something goes awry before the RAII thread joins in the main `run_*` functions - removed the breaking in handle_sig_sync Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/tools/test.bash Added Paths: ----------- ydb/trunk/tools/analysis.py Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-01-12 20:04:16 UTC (rev 1129) +++ ydb/trunk/README 2009-01-14 18:25:52 UTC (rev 1130) @@ -1,7 +1,7 @@ Overview -------- -YDB (Yang's Database) is a simple replicated memory store, 
developed for the +ydb (Yang's Database) is a simple replicated memory store, developed for the purpose of researching various approaches to recovery in such OLTP-optimized databases as [VOLTDB] (formerly H-Store/Horizontica). @@ -25,7 +25,7 @@ Setup ----- -Requirements: +Requirements for the ydb system: - [boost] 1.37 - [C++ Commons] svn r1082 @@ -43,6 +43,16 @@ [Protocol Buffers]: http://code.google.com/p/protobuf/ [State Threads]: http://state-threads.sourceforge.net/ +Requirements for tools: + +- [Assorted Shell Tools] (bash-commons, mssh) +- [Pylab] 0.98.3 +- [Python] 2.5 + +[Assorted Shell Tools]: http://assorted.sf.net/shell-tools/ +[Pylab]: http://matplotlib.sf.net/ +[Python]: http://python.org/ + Usage ----- @@ -74,23 +84,72 @@ sigint to try to force all working threads to shut down (any node, including replicas, respond to ctrl-c). -Full System Test ----------------- +To pause/resume the issuing of transactions, send a sigusr1 to the leader. - ./test.bash full +Measurements +------------ +Included is a suite of scripts to run ydb on the PMG farm machines. It is from +this deployment that performance measurements are collected. + + ./test.bash full-setup + will configure all the farm machines to (1) have my proper initial environment, (2) have all the prerequisite software, and (3) build ydb. This may take a -long time (particularly the boost-building phase). +long time (particularly the boost-building phase). Subsequently, - range='10 13' wait=5 ./test.bash run + ./test.bash setup-ydb -will run a leader on farm10, replicas on farm11 and farm12, and a recovering -replica on farm13 after 5 seconds. Pipe several runs of this to some files -(`*.out`), and plot the results with +should be sufficient for pushing out the source from the current working copy +of the source repository and building on each machine. 
- ./test.bash plot *.out +Find out which of the farm machines is free by looking at the top 3 items from +`top` on each machine: + ./test.bash hosttops + +Most commands you pass to `test.bash` accept (and some require) a set of hosts +on which to run. Look at the comment documentation in test.bash to find out +more about each function (command). You must specify the hosts by setting +either the `hosts` environment variable to a string of space-separated +hostnames or an array of hostnames, or you may set `range` to a string of a +start and end number to select all hosts from `farm<START>` to `farm<END>`. +Examples (the following all run in parallel across the specified hosts): + + hosts='farm3 farm5 farm7' ./test.bash full-setup + hosts=(farm2 farm3 farm4) ./test.bash setup-ydb # Arrays are also accepted. + range='2 4' ./test.bash hosttops # Same as last line. + +### Recovery experiments + +To run a leader on `farm10`, initial replicas on `farm11` and `farm12`, and a +recovering replica on `farm13` after 5 seconds: + + range='10 13' ./test.bash run 1000 # Command requires exactly 4 nodes. + +To run this experiment TODO +trials: + + range='10 13' + +### Scaling experiments + +To run a leader on `farm10` with initial replicas on the rest: + + range='10 15' ./test.bash scaling + +To run for 1 through 3 initial replicas, repeating each configuration for 3 +trials: + + range='10 13' ./test.bash full-scaling + +Pipe several runs of this to the file `scaling-log`, and plot the results with + + ./analysis.py scaling + +Hence the name "scaling"---this was a test of the scalability of the base +system (no recovery involved). 
+ Recovery Mechanisms ------------------- @@ -98,8 +157,9 @@ - Network recovery - From a single node - - Interleave the state recovery/catch up with the backlogging of live txns - - Recover/catch up in one swoop, then backlog the live txns + - **block**: Backlog live txns, then recover/catch up in one swoop + - **yield**: Interleave the state recovery/catch up with the backlogging of + live txns Pseudo-code ----------- @@ -133,12 +193,14 @@ foreach replica connect to replica recv recovery msg from replica - apply the state - apply backlog + apply the state (regularly yielding if desired) + apply backlog (regularly yielding if desired) Todo ---- +- Add a way to reliably obtain ST stack traces + - Add benchmarking/testing hooks, e.g.: - start the recovering joiner at a well-defined time (after a certain # txns or after the DB reaches a certain size) @@ -150,6 +212,12 @@ - time to recover - bytes used to recover +- Produce time series graphs of the txn throughput and mark the events on the + x-axis, which also conveys the duration of the various phases + - Overlay onto this the various recovery schemes + - Main benchmark: wait until the state grows to a certain size, then start + the recovery + - Run some benchmarks, esp. on multiple physical hosts. - Figure out why things are running so slowly with >2 replicas. @@ -169,3 +237,25 @@ - Add richer transactions/queries/operations. - Add disk-based recovery methods. 
+ +Plan/Notes +---------- + +Measurements + +- DONE find out how often prng yields same number + - not very often +- DONE baseline scaling (tps with number of nodes) + - inversely proportional to number of nodes, so bottlenecked at leader +- DONE recovery time as a function of amount of data + - TODO break down into various phases using bar graph of segmented bars +- DONE use only insert (and update) txns +- TODO try profiling +- TODO detailed view of tps during recovery over time (should see various phases) +- TODO later: runtime overhead of logging/tps under normal operation (scaled + with # nodes?) + +Presentation + +- TODO differences from: harbor, harp, aries +- TODO understand 2pc, paxos, etc. Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-01-12 20:04:16 UTC (rev 1129) +++ ydb/trunk/src/main.lzz.clamp 2009-01-14 18:25:52 UTC (rev 1130) @@ -28,12 +28,22 @@ #end typedef pair<int, int> pii; + +// Configuration. st_utime_t timeout; -int chkpt, accept_joiner_seqno; -bool verbose, yield_during_build_up, yield_during_catch_up; -long long timelim; +int chkpt, accept_joiner_seqno, issuing_interval; +size_t accept_joiner_size; +bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, + count_updates, stop_on_recovery, general_txns; +long long timelim, read_thresh; + +// Control. st_intr_bool stop_hub, kill_hub; +st_bool do_pause; +// Statistics. +int updates; + /** * The list of all threads. Keep track of these so that we may cleanly shut * down all threads. @@ -50,7 +60,53 @@ ~thread_eraser() { threads.erase(st_thread_self()); } }; +map<st_thread_t, string> threadnames; +st_thread_t last_thread; + /** + * Look up thread name, or just show thread ID. 
+ */ +string +threadname(st_thread_t t = st_thread_self()) { + if (threadnames.find(t) != threadnames.end()) { + return threadnames[t]; + } else { + return lexical_cast<string>(t); + } +} + +/** + * Debug function for thread names. Remember what we're switching from. + */ +void +switch_out_cb() +{ + last_thread = st_thread_self(); +} + +/** + * Debug function for thread names. Show what we're switching from/to. + */ +void switch_in_cb() +{ + if (last_thread != st_thread_self()) { + cout << "switching"; + if (last_thread != 0) cout << " from " << threadname(last_thread); + cout << " to " << threadname() << endl; + } +} + +/** + * Print to cerr a thread exception. + */ +ostream& +cerr_thread_ex(const std::exception &ex) +{ + return cerr << "exception in thread " << threadname() + << ": " << ex.what(); +} + +/** * Delegate for running thread targets. * \param[in] f The function to execute. * \param[in] intr Whether to signal stop_hub on an exception. @@ -61,9 +117,8 @@ thread_eraser eraser; try { f(); - } catch (const std::exception &ex) { - cerr << "thread " << st_thread_self() << ": " << ex.what() - << (intr ? "; interrupting!" : "") << endl; + } catch (std::exception &ex) { + cerr_thread_ex(ex) << (intr ? "; interrupting!" : "") << endl; if (intr) stop_hub.set(); } } @@ -120,6 +175,7 @@ public: st_closing_all_infos(const vector<replica_info>& rs) : rs_(rs) {} ~st_closing_all_infos() { + cout << "closing all conns to replicas (replica_infos)" << endl; foreach (replica_info r, rs_) check0x(st_netfd_close(r.fd())); } @@ -154,15 +210,33 @@ check(msg.SerializeToString(&s)); const char *buf = s.c_str(); + if (s.size() > 1000000) + cout << "sending large message to " << dsts.size() << " dsts, size = " + << s.size() << " bytes" << endl; + // Prefix the message with a four-byte length. uint32_t len = htonl(static_cast<uint32_t>(s.size())); // Broadcast the length-prefixed message to replicas. 
+ int dstno = 0; foreach (st_netfd_t dst, dsts) { - checkeqnneg(st_write(dst, static_cast<void*>(&len), sizeof len, timeout), - static_cast<ssize_t>(sizeof len)); - checkeqnneg(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(s.size())); + size_t resid = sizeof len; +#define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y)) + int res = st_write_resid(dst, static_cast<void*>(&len), &resid, timeout); + if (res == -1 && errno == ETIME) { + cerr << "got timeout! " << resid << " of " << sizeof len + << " remaining, for dst #" << dstno << endl; + checksize(st_write(dst, + reinterpret_cast<char*>(&len) + sizeof len - resid, + resid, + ST_UTIME_NO_TIMEOUT), + resid); + } else { + check0x(res); + } + checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), + s.size()); + dstno++; } } @@ -237,10 +311,13 @@ }); while (!stop_hub) { - // Did we get a new member? + // Did we get a new member? If so, notify an arbitrary member (the first + // one) to prepare to send recovery information (by sending an + // empty/default Txn). if (!newreps.empty() && seqno > 0) { sendmsg(fds[0], Txn()); } + // Bring in any new members. while (!newreps.empty()) { fds.push_back(newreps.take().fd()); } @@ -251,12 +328,14 @@ int count = randint(5) + 1; for (int o = 0; o < count; o++) { Op *op = txn.add_op(); - int rtype = randint(3), rkey = randint(), rvalue = randint(); + int rtype = general_txns ? randint(3) : 1, rkey = randint(), rvalue = randint(); op->set_type(types[rtype]); op->set_key(rkey); op->set_value(rvalue); } + if (do_pause) do_pause.waitreset(); + // Broadcast. 
bcastmsg(fds, txn); @@ -266,11 +345,14 @@ cout << "issued txn " << txn.seqno() << endl; if (timelim > 0 && current_time_millis() - start_time > timelim) { cout << "time's up; issued " << txn.seqno() << " txns in " << timelim - << " ms" << endl; + << " ms" << endl; stop_hub.set(); } st_sleep(0); } + if (issuing_interval > 0) { + st_sleep(issuing_interval); + } if (txn.seqno() == accept_joiner_seqno) { accept_joiner.set(); @@ -282,7 +364,7 @@ * Process a transaction: update DB state (incl. seqno) and send response to * leader. */ -void + void process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno, bool caught_up) { @@ -293,22 +375,29 @@ seqno = txn.seqno(); for (int o = 0; o < txn.op_size(); o++) { const Op &op = txn.op(o); + const int key = op.key(); + if (show_updates || count_updates) { + if (map.find(key) != map.end()) { + if (show_updates) cout << "existing key: " << key << endl; + if (count_updates) updates++; + } + } switch (op.type()) { case Op::read: - res.add_result(map[op.key()]); + res.add_result(map[key]); break; case Op::write: - map[op.key()] = op.value(); + map[key] = op.value(); break; case Op::del: - map.erase(op.key()); + map.erase(key); break; } } sendmsg(leader, res); } -void + void showtput(const string &action, long long stop_time, long long start_time, int stop_count, int start_count) { @@ -316,7 +405,7 @@ int count_diff = stop_count - start_count; double rate = double(count_diff) * 1000 / time_diff; cout << action << " " << count_diff << " txns in " << time_diff << " ms (" - << rate << "tps)" << endl; + << rate << "tps)" << endl; } /** @@ -338,7 +427,7 @@ * leader. Not entirely clear that this is necessary; could probably just go * with seqno. */ -void + void process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, st_channel<shared_ptr<Recovery> > &send_states, st_channel<shared_ptr<Txn> > &backlog, int init_seqno) @@ -349,27 +438,38 @@ int seqno_caught_up = caught_up ? 
seqno : -1; finally f(lambda () { - long long now = current_time_millis(); - showtput("processed", now, __ref(start_time), __ref(seqno), - __ref(init_seqno)); - if (!__ref(caught_up)) { - cout << "live-processing: never entered this phase (never caught up)" << - endl; - } else { - showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), - __ref(seqno_caught_up)); - } - __ref(send_states).push(shared_ptr<Recovery>()); - }); + long long now = current_time_millis(); + showtput("processed", now, __ref(start_time), __ref(seqno), + __ref(init_seqno)); + if (!__ref(caught_up)) { + cout << "live-processing: never entered this phase (never caught up)" << + endl; + } else { + showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), + __ref(seqno_caught_up)); + } + __ref(send_states).push(shared_ptr<Recovery>()); + }); while (true) { Txn txn; + long long before_read; + if (read_thresh > 0) { + before_read = current_time_millis(); + } { st_intr intr(stop_hub); readmsg(leader, txn); } - + if (read_thresh > 0) { + long long read_time = current_time_millis() - before_read; + if (read_time > read_thresh) { + cout << "current_time_millis() - before_read = " << read_time << " > " + << read_thresh << endl; + } + } if (txn.has_seqno()) { + const char *action; if (txn.seqno() == seqno + 1) { if (!caught_up) { time_caught_up = current_time_millis(); @@ -379,20 +479,26 @@ caught_up = true; } process_txn(leader, map, txn, seqno, true); + action = "processed"; } else { // Queue up for later processing once a snapshot has been received. backlog.push(shared_ptr<Txn>(new Txn(txn))); + action = "backlogged"; } if (txn.seqno() % chkpt == 0) { - if (verbose) - cout << "processed txn " << txn.seqno() - << "; db size = " << map.size() << endl; + if (verbose) { + cout << action << " txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } st_sleep(0); } } else { // Generate a snapshot. 
shared_ptr<Recovery> recovery(new Recovery); + cout << "generating recovery of " << map.size() << " records" << endl; foreach (const pii &p, map) { Recovery_Pair *pair = recovery->add_pair(); pair->set_key(p.first); @@ -408,7 +514,7 @@ /** * Swallow replica responses. */ -void + void handle_responses(st_netfd_t replica, const int &seqno, int rid, st_multichannel<long long> &recover_signals, bool caught_up) { @@ -418,38 +524,87 @@ recovery_end_time = -1; int recovery_start_seqno = caught_up ? -1 : seqno, recovery_end_seqno = -1; + int last_seqno = -1; finally f(lambda () { long long end_time = current_time_millis(); - cout << __ref(rid) << ": "; - showtput("after recovery, finished", end_time, __ref(recovery_end_time), - __ref(seqno), __ref(recovery_end_seqno)); + if (__ref(recovery_end_time) > -1) { + cout << __ref(rid) << ": "; + showtput("after recovery, finished", end_time, __ref(recovery_end_time), + __ref(seqno), __ref(recovery_end_seqno)); + } }); while (true) { + finally f(lambda () { + // TODO: convert the whole thing to an object so that we can have "scoped + // globals". + long long &recovery_start_time = __ref(recovery_start_time); + int &recovery_start_seqno = __ref(recovery_start_seqno); + long long &recovery_end_time = __ref(recovery_end_time); + int &recovery_end_seqno = __ref(recovery_end_seqno); + long long &start_time = __ref(start_time); + const int &seqno = __ref(seqno); + int &rid = __ref(rid); + st_channel<long long> &sub = __ref(sub); + // The first timestamp that comes down the subscription pipeline is the + // recovery start time, issued by the main thread. The second one is the + // recovery end time, issued by the response handler associated with the + // joiner. 
+ if (recovery_start_time == -1 && !sub.empty()) { + recovery_start_time = sub.take(); + recovery_start_seqno = seqno; + cout << rid << ": "; + showtput("before recovery, finished", recovery_start_time, start_time, + recovery_start_seqno, 0); + } else if (recovery_end_time == -1 && !sub.empty()) { + recovery_end_time = sub.take(); + recovery_end_seqno = seqno; + cout << rid << ": "; + showtput("during recovery, finished", recovery_end_time, + recovery_start_time, recovery_end_seqno, recovery_start_seqno); + } + }); Response res; - { + // Read the message, but correctly respond to interrupts so that we can + // cleanly exit (slightly tricky). + if (last_seqno + 1 == seqno) { + // Stop-interruptible in case we're already caught up. + try { + st_intr intr(stop_hub); + readmsg(replica, res); + } catch (...) { // TODO: only catch interruptions + // This check on seqnos is OK for termination since the seqno will + // never grow again if stop_hub is set. + if (last_seqno + 1 == seqno) { + cout << rid << ": "; + cout << "stopping seqno = " << res.seqno() << endl; + break; + } else { + continue; + } + } + } else { + // Only kill-interruptible because we want a clean termination (want + // to get all the acks back). st_intr intr(kill_hub); readmsg(replica, res); } - if (recovery_start_time == -1 && !sub.empty()) { - recovery_start_time = sub.take(); - recovery_start_seqno = seqno; - cout << rid << ": "; - showtput("before recovery, finished", recovery_start_time, start_time, - recovery_start_seqno, 0); - } else if (recovery_end_time == -1 && !sub.empty()) { - recovery_end_time = sub.take(); - recovery_end_seqno = seqno; - cout << rid << ": "; - showtput("during recovery, finished", recovery_end_time, - recovery_start_time, recovery_end_seqno, recovery_start_seqno); - } + // Determine if this response handler's host (the only joiner) has finished + // catching up. If it has, then broadcast a signal so that all response + // handlers will know about this event. 
if (!caught_up && res.caught_up()) { - long long t = current_time_millis(), timediff = t - start_time; + long long now = current_time_millis(), timediff = now - start_time; caught_up = true; - recover_signals.push(t); + recover_signals.push(now); cout << rid << ": "; cout << "recovering node caught up; took " << timediff << "ms" << endl; + // This will cause the program to exit eventually, but cleanly, such that + // the recovery time will be set first, before the eventual exit (which + // may not even happen in the current iteration). + if (stop_on_recovery) { + cout << "stopping on recovery" << endl; + stop_hub.set(); + } } if (res.seqno() % chkpt == 0) { if (verbose) { @@ -458,12 +613,7 @@ } st_sleep(0); } - // This is OK since the seqno will never grow again if stop_hub is set. - if (stop_hub && res.seqno() + 1 == seqno) { - cout << rid << ": "; - cout << "stopping seqno = " << res.seqno() << endl; - break; - } + last_seqno = res.seqno(); } } @@ -499,9 +649,10 @@ } st_closing closing(joiner); - cout << "got joiner's connection, sending recovery" << endl; + cout << "got joiner's connection, sending recovery of " + << recovery->pair_size() << " records" << endl; sendmsg(joiner, *recovery); - cout << "sent" << endl; + cout << "sent recovery" << endl; } /** @@ -556,36 +707,42 @@ foreach (const replica_info &r, replicas) newreps.push(r); st_joining join_swallower(swallower); - // Start handling responses. - st_thread_group handlers; - int rid = 0; - foreach (replica_info r, replicas) { - handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), rid++, - ref(recover_signals), true), - "handle_responses")); - } + try { + // Start handling responses. + st_thread_group handlers; + int rid = 0; + foreach (replica_info r, replicas) { + handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), rid++, + ref(recover_signals), true), + "handle_responses")); + } - // Accept the recovering node, and tell it about the online replicas. 
- st_netfd_t joiner; - { - st_intr intr(stop_hub); - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - accept_joiner.waitset(); + // Accept the recovering node, and tell it about the online replicas. + st_netfd_t joiner; + { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + accept_joiner.waitset(); + } + Join join = readmsg<Join>(joiner); + cout << "setting seqno to " << seqno << endl; + init.set_txnseqno(seqno); + sendmsg(joiner, init); + recover_signals.push(current_time_millis()); + + // Start streaming txns to joiner. + cout << "start streaming txns to joiner" << endl; + replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); + newreps.push(replicas.back()); + handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), rid++, + ref(recover_signals), false), + "handle_responses_joiner")); + } catch (std::exception &ex) { + // TODO: maybe there's a cleaner way to do this final step before waiting with the join + cerr_thread_ex(ex) << endl; + throw; } - Join join = readmsg<Join>(joiner); - cout << "setting seqno to " << seqno << endl; - init.set_txnseqno(seqno); - sendmsg(joiner, init); - recover_signals.push(current_time_millis()); - - // Start streaming txns to joiner. 
- cout << "start streaming txns to joiner" << endl; - replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); - newreps.push(replicas.back()); - handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), rid++, - ref(recover_signals), false), - "handle_responses")); } /** @@ -598,13 +755,18 @@ map<int, int> map; int seqno = -1; finally f(lambda () { + cout << "REPLICA SUMMARY" << endl; + cout << "total updates = " << updates << endl; + cout << "final DB state: seqno = " << __ref(seqno) << ", size = " + << __ref(map).size() << endl; string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); - cout << "dumping DB state (seqno = " << __ref(seqno) << ", size = " - << __ref(map).size() << ") to " << fname << endl; - ofstream of(fname.c_str()); - of << "seqno: " << __ref(seqno) << endl; - foreach (const pii &p, __ref(map)) { - of << p.first << ": " << p.second << endl; + if (dump) { + cout << "dumping to " << fname << endl; + ofstream of(fname.c_str()); + of << "seqno: " << __ref(seqno) << endl; + foreach (const pii &p, __ref(map)) { + of << p.first << ": " << p.second << endl; + } } }); st_channel<shared_ptr<Recovery> > send_states; @@ -658,39 +820,46 @@ ref(seqno), ref(send_states)), "recover_joiner")); - // If there's anything to recover. - if (init.txnseqno() > 0) { - cout << "waiting for recovery from " << replicas[0] << endl; + try { + // If there's anything to recover. + if (init.txnseqno() > 0) { + cout << "waiting for recovery from " << replicas[0] << endl; - // Read the recovery message. - Recovery recovery; - { - st_intr intr(stop_hub); - readmsg(replicas[0], recovery); - } - for (int i = 0; i < recovery.pair_size(); i++) { - const Recovery_Pair &p = recovery.pair(i); - map[p.key()] = p.value(); - if (i % chkpt == 0) { - if (yield_during_build_up) st_sleep(0); + // Read the recovery message. 
+ Recovery recovery; + { + st_intr intr(stop_hub); + readmsg(replicas[0], recovery); } - } - assert(seqno == -1 && - static_cast<typeof(seqno)>(recovery.seqno()) > seqno); - seqno = recovery.seqno(); - cout << "recovered." << endl; + for (int i = 0; i < recovery.pair_size(); i++) { + const Recovery_Pair &p = recovery.pair(i); + map[p.key()] = p.value(); + if (i % chkpt == 0) { + if (yield_during_build_up) st_sleep(0); + } + } + assert(seqno == -1 && + static_cast<typeof(seqno)>(recovery.seqno()) > seqno); + seqno = recovery.seqno(); + cout << "recovered " << recovery.pair_size() << " records." << endl; - while (!backlog.empty()) { - shared_ptr<Txn> p = backlog.take(); - process_txn(leader, map, *p, seqno, false); - if (p->seqno() % chkpt == 0) { - if (verbose) - cout << "processed txn " << p->seqno() << " off the backlog" << endl; - if (yield_during_catch_up) + while (!backlog.empty()) { + shared_ptr<Txn> p = backlog.take(); + process_txn(leader, map, *p, seqno, false); + if (p->seqno() % chkpt == 0) { + if (verbose) + cout << "processed txn " << p->seqno() << " off the backlog; " + << "backlog.size = " << backlog.queue().size() << endl; + // Explicitly yield. (Note that yielding does still effectively + // happen anyway because process_txn is a yield point.) st_sleep(0); + } } + cout << "caught up." << endl; } - cout << "caught up." << endl; + } catch (std::exception &ex) { + cerr_thread_ex(ex) << endl; + throw; } stop_hub.insert(st_thread_self()); @@ -726,22 +895,13 @@ foreach (st_thread_t t, threads) { st_thread_interrupt(t); } + } else if (sig == SIGUSR1) { + toggle(do_pause); } - break; + //break; } } -map<st_thread_t, string> threadnames; - -void cb() -{ - if (threadnames.find(st_thread_self()) != threadnames.end()) { - cout << "switched to: " << threadnames[st_thread_self()] << endl; - } else { - cout << "switched to: " << st_thread_self() << endl; - } -} - /** * Initialization and command-line parsing. 
*/ @@ -763,18 +923,38 @@ ("help,h", "show this help message") ("debug-threads,d",po::bool_switch(&debug_threads), "enable context switch debug outputs") - ("verbose,v", "enable periodic printing of txn processing progress") + ("verbose,v", po::bool_switch(&verbose), + "enable periodic printing of txn processing progress") ("epoll,e", po::bool_switch(&use_epoll), "use epoll (select is used by default)") ("yield-build-up", po::bool_switch(&yield_during_build_up), "yield periodically during build-up phase of recovery") ("yield-catch-up", po::bool_switch(&yield_during_catch_up), "yield periodically during catch-up phase of recovery") + ("dump,D", po::bool_switch(&dump), + "replicas should finally dump their state to a tmp file for " + "inspection/diffing") + ("show-updates,U", po::bool_switch(&show_updates), + "log operations that touch (update/read/delete) an existing key") + ("count-updates,u",po::bool_switch(&count_updates), + "count operations that touch (update/read/delete) an existing key") + ("general-txns,g", po::bool_switch(&general_txns), + "issue read and delete transactions as well as the default of (only) insertion/update transactions (for leader only)") ("leader,l", po::bool_switch(&is_leader), "run the leader (run replica by default)") + ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), + "exit after the joiner fully recovers (for leader only)") + ("accept-joiner-size,s", + po::value<size_t>(&accept_joiner_size)->default_value(0), + "accept recovering joiner (start recovery) after DB grows to this size " + "(for leader only)") + ("issuing-interval,i", + po::value<int>(&issuing_interval)->default_value(0), + "seconds to sleep between issuing txns (for leader only)") ("accept-joiner-seqno,j", po::value<int>(&accept_joiner_seqno)->default_value(0), - "accept recovering joiner (start recovery) after this seqno") + "accept recovering joiner (start recovery) after this seqno (for leader " + "only)") ("leader-host,H", 
po::value<string>(&leader_host)->default_value(string("localhost")), "hostname or address of the leader") @@ -784,10 +964,13 @@ ("chkpt,c", po::value<int>(&chkpt)->default_value(10000), "number of txns before yielding/verbose printing") ("timelim,T", po::value<long long>(&timelim)->default_value(0), - "time limit in milliseconds, or 0 for none") + "general network IO time limit in milliseconds, or 0 for none") + ("read-thresh,r", po::value<long long>(&read_thresh)->default_value(0), + "if positive and any txn read exceeds this, then print a message " + "(for replicas only)") ("listen-port,p", po::value<uint16_t>(&listen_port)->default_value(7654), "port to listen on (replicas only)") - ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(1000000), + ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(200000), "timeout for IO operations (in microseconds)") ("minreps,n", po::value<int>(&minreps)->default_value(2), "minimum number of replicas the system is willing to process txns on"); @@ -813,13 +996,16 @@ check0x(sigemptyset(&sa.sa_mask)); sa.sa_flags = 0; check0x(sigaction(SIGINT, &sa, nullptr)); + check0x(sigaction(SIGTERM, &sa, nullptr)); + check0x(sigaction(SIGUSR1, &sa, nullptr)); // Initialize ST. if (use_epoll) check0x(st_set_eventsys(ST_EVENTSYS_ALT)); check0x(st_init()); st_spawn(bind(handle_sig_sync)); if (debug_threads) { - st_set_switch_in_cb(cb); + st_set_switch_out_cb(switch_out_cb); + st_set_switch_in_cb(switch_in_cb); } // Initialize thread manager for clean shutdown of all threads. @@ -835,9 +1021,9 @@ } return 0; - } catch (const std::exception &ex) { + } catch (std::exception &ex) { // Must catch all exceptions at the top to make the stack unwind. 
- cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; + cerr_thread_ex(ex) << endl; return 1; } } Added: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py (rev 0) +++ ydb/trunk/tools/analysis.py 2009-01-14 18:25:52 UTC (rev 1130) @@ -0,0 +1,83 @@ +#!/usr/bin/env python + +from __future__ import with_statement +import re, sys, itertools, numpy +from pylab import * + +def check(path): + with file(path) as f: + if 'got timeout' in f.read(): + print 'warning: timeout occurred' + +def agg(src): + def gen(): + for seqno, pairs in itertools.groupby(src, lambda (a,b): a): + ts = numpy.array([t for seqno, t in pairs]) + yield seqno, ts.mean(), ts.std(), ts + return list(gen()) + +def scaling(path): + check(path) + def getpairs(): + with file(path) as f: + for line in f: + m = re.match( r'=== n=(?P<n>\d+) ', line ) + if m: + n = int(m.group('n')) + m = re.match( r'.*: issued .*[^.\d](?P<tps>[.\d]+)tps', line ) + if m: + tps = float(m.group('tps')) + yield (n, tps) + tups = agg(getpairs()) + + print 'num nodes, mean tps, stdev tps' + for n, mean, sd, raw in tups: print n, mean, sd, raw + + xs, ys, es, rs = zip(*tups) + errorbar(xs, ys, es) + title('Scaling of baseline throughput with number of nodes') + xlabel('Node count') + ylabel('Mean TPS (stdev error bars)') + xlim(.5, n+.5) + ylim(ymin = 0) + savefig('scaling.png') + +def run(blockpath, yieldpath): + for path, label in [(blockpath, 'blocking scheme'), (yieldpath, 'yielding scheme')]: + check(path) + def getpairs(): + with file(path) as f: + for line in f: + m = re.match( r'=== seqno=(?P<n>\d+) ', line ) + if m: + seqno = int(m.group('n')) + m = re.match( r'.*: recovering node caught up; took (?P<time>\d+)ms', line ) + if m: + t = float(m.group('time')) + yield (seqno, t) + tups = agg(getpairs()) + + print 'max seqno, mean time, stdev time [raw data]' + for seqno, mean, sd, raw in tups: print seqno, mean, sd, raw + + xs, ys, es, 
rs = zip(*tups) + errorbar(xs, ys, es, label = label) + + title('Recovery time over number of transactions') + xlabel('Transaction count (corresponds roughly to data size)') + ylabel('Mean TPS (stdev error bars)') + #xlim(.5, n+.5) + #ylim(ymin = 0) + savefig('run.png') + +def main(argv): + if len(argv) <= 1: + print >> sys.stderr, 'Must specify a command' + elif sys.argv[1] == 'scaling': + scaling(sys.argv[2] if len(sys.argv) > 2 else 'scaling-log') + elif sys.argv[1] == 'run': + run(*sys.argv[2:] if len(sys.argv) > 2 else ['block-log', 'yield-log']) + else: + print >> sys.stderr, 'Unknown command:', sys.argv[1] + +sys.exit(main(sys.argv)) Property changes on: ydb/trunk/tools/analysis.py ___________________________________________________________________ Added: svn:executable + * Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-01-12 20:04:16 UTC (rev 1129) +++ ydb/trunk/tools/test.bash 2009-01-14 18:25:52 UTC (rev 1130) @@ -8,7 +8,13 @@ script="$(basename "$0")" tagssh() { - ssh "$@" 2>&1 | sed "s/^/$1: /" + ssh "$@" 2>&1 | python -u -c ' +import time, sys +while True: + line = sys.stdin.readline() + if line == "": break + print sys.argv[1], time.time(), ":\t", line, +' $1 } check-remote() { @@ -120,7 +126,7 @@ tagssh "$host" "./$script" "$@" } -allhosts() { +hosts() { if [[ ${host:-} ]] ; then echo $host elif [[ ${range:-} ]] ; then @@ -142,23 +148,27 @@ farm13.csail farm14.csail EOF - fi | xargs ${xargs--P9} -I^ "$@" + fi } -allssh() { - allhosts ssh ^ "set -o errexit -o nounset; $@" +parhosts() { + hosts | xargs ${xargs--P9} -I^ "$@" } -allscp() { - allhosts scp -q "$@" +parssh() { + parhosts ssh ^ "set -o errexit -o nounset; $@" } -allremote() { - allhosts "./$script" remote ^ "$@" +parscp() { + parhosts scp -q "$@" } +parremote() { + parhosts "./$script" remote ^ "$@" +} + init-setup() { - allremote node-init-setup + parremote node-init-setup } get-deps() { @@ -174,7 
+184,7 @@ } setup-deps() { - allscp \ + parscp \ /usr/share/misc/config.guess \ /tmp/lzz.static \ /tmp/st-1.8.tar.gz \ @@ -183,33 +193,33 @@ clamp.patch \ ^:/tmp/ - allremote node-setup-lzz - allremote node-setup-st - allremote node-setup-pb - allremote node-setup-boost - allremote node-setup-m4 - allremote node-setup-bison - allremote node-setup-clamp + parremote node-setup-lzz + parremote node-setup-st + parremote node-setup-pb + parremote node-setup-boost + parremote node-setup-m4 + parremote node-setup-bison + parremote node-setup-clamp } setup-ydb() { - allremote node-setup-ydb-1 - rm -r /tmp/{ydb,ccom}-src/ + parremote node-setup-ydb-1 + rm -rf /tmp/{ydb,ccom}-src/ svn export ~/ydb/src /tmp/ydb-src/ svn export ~/ccom/src /tmp/ccom-src/ - allscp -r /tmp/ydb-src/* ^:ydb/src/ - allscp -r /tmp/ccom-src/* ^:ccom/src/ - allremote node-setup-ydb-2 + parscp -r /tmp/ydb-src/* ^:ydb/src/ + parscp -r /tmp/ccom-src/* ^:ccom/src/ + parremote node-setup-ydb-2 } -full() { +full-setup() { init-setup setup-deps setup-ydb } hostinfos() { - xargs= allssh " + xargs= parssh " echo hostname echo ===== @@ -219,7 +229,7 @@ } hosttops() { - xargs= allssh " + xargs= parssh " echo hostname echo ===== @@ -227,51 +237,134 @@ " } -range2args() { - "$@" $(seq $range | sed 's/^/farm/; s/$/.csail/') +hostargs() { + if [[ $range ]] + then "$@" $(seq $range | sed 's/^/farm/; s/$/.csail/') + else "$@" ${hosts[@]} + fi } -run-helper() { - tagssh $1 "ydb/src/ydb -l" & +scaling-helper() { + local leader=$1 + shift + tagssh $leader "ydb/src/ydb -l -n $#" & sleep .1 - tagssh $2 "ydb/src/ydb -H $1" & - tagssh $3 "ydb/src/ydb -H $1" & + for rep in "$@" + do tagssh $rep "ydb/src/ydb -n $# -H $leader" & + done sleep ${wait1:-10} - tagssh $4 "ydb/src/ydb -H $1" & - if [[ ${wait2:-} ]] - then sleep $wait2 - else read + tagssh $leader 'pkill -sigint ydb' + wait +} + +# This just tests how the system scales; no recovery involved. 
+scaling() { + hostargs scaling-helper +} + +# Repeat some experiment some number of trials and for some number of range +# configurations; e.g., "repeat scaling". +# TODO: fix this to work also with `hosts`; move into repeat-helper that's run +# via hostargs, and change the range= to hosts= +full-scaling() { + local base=$1 out=scaling-log-$(date +%Y-%m-%d-%H:%M:%S-%N) + shift + for n in {1..5} ; do # configurations + export range="$base $((base + n))" + stop + for i in {1..5} ; do # trials + echo === n=$n i=$i === + scaling + sleep 1 + stop + sleep .1 + echo + done + done >& $out + ln -sf $out scaling-log +} + +run-helper() { + local leader=$1 + shift + tagssh $leader "ydb/src/ydb -l -x --accept-joiner-seqno $seqno -n $(( $# - 1 )) ${extraargs:-}" & # -v --debug-threads + sleep .1 # pexpect 'waiting for at least' + # Run initial replicas. + while (( $# > 1 )) ; do + tagssh $1 "ydb/src/ydb -H $leader" & + shift + done + sleep .1 # pexpect 'got all \d+ replicas' leader + # Run joiner. + tagssh $1 "ydb/src/ydb -H $leader" & # -v --debug-threads -t 200000" & + if false ; then + if [[ ${wait2:-} ]] + then sleep $wait2 + else read + fi + tagssh $leader "pkill -sigint ydb" fi - tagssh $1 "pkill -sigint ydb" + wait } run() { - range2args run-helper + hostargs run-helper } +full-run() { + for seqno in 100000 300000 500000 700000 900000; do # configurations + stop + for i in {1..5} ; do # trials + echo === seqno=$seqno i=$i === + run + sleep 1 + stop + sleep .1 + echo + done + done +} + +full-block() { + local out=block-log-$(date +%Y-%m-%d-%H:%M:%S) + full-run >& $out + ln -sf $out block-log +} + +full-yield() { + local out=yield-log-$(date +%Y-%m-%d-%H:%M:%S) + extraargs='--yield-catch-up' full-run >& $out + ln -sf $out yield-log +} + +full() { + #full-block + full-yield + #full-scaling +} + stop-helper() { - tagssh $1 'pkill ydb' + tagssh $1 'pkill -sigint ydb' } stop() { - range2args stop-helper + hostargs stop-helper } kill-helper() { - tagssh $1 'pkill ydb' - tagssh 
$2 'pkill ydb' - tagssh $3 'pkill ydb' - tagssh $4 'pkill ydb' + for i in "$@" + do tagssh $i 'pkill ydb' + done } kill() { - range2args kill-helper + hostargs kill-helper } -#plot() { -# for i in "$@" ; do -# sed "s/farm$i.csail//" < "$i" -# done -#} +# Use mssh to log in with password as root to each machine. +mssh-root() { + : "${hosts:="$(hosts)"}" + mssh -l root "$@" +} "$@" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
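The `tagssh` change in the test.bash diff above replaces a plain `sed` prefix with a Python filter that also timestamps every line of ssh output. The same labeling idea can be sketched in a few lines of shell; `tagcmd` is a hypothetical name (not from the repo), and `echo` stands in for the real `ssh "$host"` call so the sketch runs anywhere:

```shell
#!/usr/bin/env bash
# Prefix every output line of a command with a tag and a timestamp,
# roughly what the patched tagssh does for ssh output. Here "echo"
# substitutes for "ssh $host", which would need a reachable host.
tagcmd() {
  local tag=$1
  shift
  "$@" 2>&1 | while IFS= read -r line; do
    printf '%s %s:\t%s\n' "$tag" "$(date +%s)" "$line"
  done
}

tagcmd farm1 echo "hello from a fake host"
```

Because the tagging happens in a pipe reading the command's combined stdout/stderr, it works for any long-running remote command, which is why the harness can interleave output from several hosts and still attribute each line.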
From: <yan...@us...> - 2009-01-12 20:04:24
Revision: 1129
          http://assorted.svn.sourceforge.net/assorted/?rev=1129&view=rev
Author:   yangzhang
Date:     2009-01-12 20:04:16 +0000 (Mon, 12 Jan 2009)

Log Message:
-----------
added arrays demo

Added Paths:
-----------
    sandbox/trunk/src/bash/arrays.bash

Added: sandbox/trunk/src/bash/arrays.bash
===================================================================
--- sandbox/trunk/src/bash/arrays.bash	                        (rev 0)
+++ sandbox/trunk/src/bash/arrays.bash	2009-01-12 20:04:16 UTC (rev 1129)
@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+# $@ and $* cannot be treated as arrays!
+
+echo ${BASH_ARGV[0]}
+
+for (( i = 1; i < $# - 1; i++ ))
+do echo ${BASH_ARGV[$i]}
+done
+
+echo ${BASH_ARGV[$(($#-1))]}

Property changes on: sandbox/trunk/src/bash/arrays.bash
___________________________________________________________________
Added: svn:executable
   + *
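The arrays demo above works around the fact that `$@` is not itself an array by indexing `BASH_ARGV`, which holds the arguments in reverse order. A more direct idiom, sketched here with a hypothetical `demo` function, is to copy the positional parameters into a named array, which then supports normal forward indexing:

```shell
#!/usr/bin/env bash
# Copy "$@" into a real array; unlike BASH_ARGV, the elements are in
# the original (forward) order and can be indexed arithmetically.
demo() {
  local args=("$@")
  echo "count: ${#args[@]}"
  echo "first: ${args[0]}"
  echo "last:  ${args[${#args[@]}-1]}"
}

demo alpha beta gamma
```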
From: <yan...@us...> - 2009-01-12 06:35:32
Revision: 1128
          http://assorted.svn.sourceforge.net/assorted/?rev=1128&view=rev
Author:   yangzhang
Date:     2009-01-12 06:35:21 +0000 (Mon, 12 Jan 2009)

Log Message:
-----------
added mssh; tweaked env vars; updated mkcdbs

Modified Paths:
--------------
    shell-tools/trunk/README
    shell-tools/trunk/src/bash-commons/bashrc.bash
    shell-tools/trunk/src/bash-commons/common.bash

Added Paths:
-----------
    shell-tools/trunk/src/mssh.py

Modified: shell-tools/trunk/README
===================================================================
--- shell-tools/trunk/README	2009-01-09 08:43:53 UTC (rev 1127)
+++ shell-tools/trunk/README	2009-01-12 06:35:21 UTC (rev 1128)
@@ -101,10 +101,16 @@
 `bootstrap-    Fetch, build, and install cabal.            bash, ghc6
 cabal`
 
+
+`mssh`         Run ssh commands on multiple hosts          Python,
+               (sequentially), but also be able to enter a [Pexpect]
+               common password (useful when keys aren't set
+               up.
 --------------------------------------------------------------------------------
 
 [HSH]: http://software.complete.org/hsh/
 [Python Commons]: http://assorted.sf.net/python-commons/
+[Pexpect]: http://pexpect.sf.net/
 
 Usage
 -----

Modified: shell-tools/trunk/src/bash-commons/bashrc.bash
===================================================================
--- shell-tools/trunk/src/bash-commons/bashrc.bash	2009-01-09 08:43:53 UTC (rev 1127)
+++ shell-tools/trunk/src/bash-commons/bashrc.bash	2009-01-12 06:35:21 UTC (rev 1128)
@@ -32,7 +32,8 @@
 ################################################################
 # personal environment variables (TODO somehow factor this out)
 
-export EMAIL='Yang Zhang <y_...@mi...>'
+export EMAIL='Yang Zhang <ya...@gm...>'
+export DEBEMAIL='ya...@gm...'
 
 # TODO this is from sht; mv it to a mini-commons
 # TODO is this necessary in light of advanced var expansions?

Modified: shell-tools/trunk/src/bash-commons/common.bash
===================================================================
--- shell-tools/trunk/src/bash-commons/common.bash	2009-01-09 08:43:53 UTC (rev 1127)
+++ shell-tools/trunk/src/bash-commons/common.bash	2009-01-12 06:35:21 UTC (rev 1128)
@@ -513,9 +513,17 @@
   mv "$path" "${path%.*}"
 }
 
+# Set up a new Debian CDBS package but remove all the boilerplate cruft and
+# tweak the files.
 mkcdbs() {
-  echo -e 'b\n\n' | dh_make --createorig
+  local flags= pkg="$(basename "${PWD%-*}")" ver="${PWD##*-}"
+  if [[ ! -d ../$pkg-$ver.orig && ! -f ../${pkg}_$ver.orig.tar.gz ]]
+  then flags=--createorig
+  fi
+  echo | dh_make --cdbs --copyright gpl $flags
   rm debian/{dirs,docs,README*,*.ex,*.EX}
+  sed -i 's/-1) unstable;/-1ubuntu1) intrepid;/' debian/changelog
+  sed -i 's/Standards-Version: .*/Standards-Version: 3.8.0/' debian/control
 }
 
 #if ! is_declared indent ; then

Added: shell-tools/trunk/src/mssh.py
===================================================================
--- shell-tools/trunk/src/mssh.py	                        (rev 0)
+++ shell-tools/trunk/src/mssh.py	2009-01-12 06:35:21 UTC (rev 1128)
@@ -0,0 +1,34 @@
+#!/usr/bin/env python
+
+"""
+Run ssh commands on multiple hosts (sequentially), but also be able to take a
+password and forward it on to each of the hosts. This is useful when SSH keys
+are not set up on the hosts. The hosts that you log into are read from the
+`hosts` environment variable. Examples of invocation:
+
+    hosts='farm2 farm3 farm4' mssh -l root 'usermod -a -G admin yang'
+    hosts='farm2 farm3 farm4' mssh -l root 'echo ... >> .ssh/authorized_keys'
+
+This is also a nice, simple demo of how tty/pty stuff works.
+"""
+
+import getpass, os, pexpect, sys, time
+passwd = getpass.getpass("Password: ", open("/dev/tty", "w", 0))
+try:
+    for host in os.getenv("hosts").split():
+        child = pexpect.spawn("ssh", [host] + sys.argv[1:])
+        child.expect("password: ")
+        child.sendline(passwd)
+        try:
+            child.expect("password: ")
+        except pexpect.EOF: # Label and show the output
+            print "==", host, "==", child.before
+        else: # Actually got prompted for password again
+            child.sendcontrol("c")
+            try: child.read()
+            except pexpect.EOF: pass
+            raise Exception("bad passwd")
+        finally:
+            if child.isalive(): child.wait()
+except Exception, ex:
+    print >> sys.stderr, ex

Property changes on: shell-tools/trunk/src/mssh.py
___________________________________________________________________
Added: svn:executable
   + *