assorted-commits Mailing List for Assorted projects (Page 30)
Brought to you by:
yangzhang
You can subscribe to this list here.
2007 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
(9) |
Dec
(12) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
2008 |
Jan
(86) |
Feb
(265) |
Mar
(96) |
Apr
(47) |
May
(136) |
Jun
(28) |
Jul
(57) |
Aug
(42) |
Sep
(20) |
Oct
(67) |
Nov
(37) |
Dec
(34) |
2009 |
Jan
(39) |
Feb
(85) |
Mar
(96) |
Apr
(24) |
May
(82) |
Jun
(13) |
Jul
(10) |
Aug
(8) |
Sep
(2) |
Oct
(20) |
Nov
(31) |
Dec
(17) |
2010 |
Jan
(16) |
Feb
(11) |
Mar
(17) |
Apr
(53) |
May
(31) |
Jun
(13) |
Jul
(3) |
Aug
(6) |
Sep
(11) |
Oct
(4) |
Nov
(17) |
Dec
(17) |
2011 |
Jan
(3) |
Feb
(19) |
Mar
(5) |
Apr
(17) |
May
(3) |
Jun
(4) |
Jul
(14) |
Aug
(3) |
Sep
(2) |
Oct
(1) |
Nov
(3) |
Dec
(2) |
2012 |
Jan
(3) |
Feb
(7) |
Mar
(1) |
Apr
|
May
(1) |
Jun
|
Jul
(4) |
Aug
(5) |
Sep
(2) |
Oct
(3) |
Nov
|
Dec
|
2013 |
Jan
|
Feb
|
Mar
(9) |
Apr
(5) |
May
|
Jun
(2) |
Jul
(1) |
Aug
(10) |
Sep
(1) |
Oct
(2) |
Nov
|
Dec
|
2014 |
Jan
(1) |
Feb
(3) |
Mar
(3) |
Apr
(1) |
May
(4) |
Jun
|
Jul
|
Aug
|
Sep
(2) |
Oct
|
Nov
|
Dec
|
2015 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
(1) |
Nov
|
Dec
|
2016 |
Jan
(1) |
Feb
|
Mar
(2) |
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
(1) |
Oct
|
Nov
|
Dec
|
2017 |
Jan
|
Feb
|
Mar
(1) |
Apr
|
May
(5) |
Jun
(1) |
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
(2) |
2018 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
(1) |
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
From: <yan...@us...> - 2009-02-13 20:54:54
|
Revision: 1177 http://assorted.svn.sourceforge.net/assorted/?rev=1177&view=rev Author: yangzhang Date: 2009-02-13 20:54:46 +0000 (Fri, 13 Feb 2009) Log Message: ----------- - added release to closing - made closingfd publically inherit closing - refactored and cleaned up sockets code - added st_reader - fixed formatting bugs in check messages - added auto_array, array_view Modified Paths: -------------- cpp-commons/trunk/src/commons/array.h cpp-commons/trunk/src/commons/check.h cpp-commons/trunk/src/commons/closing.h cpp-commons/trunk/src/commons/sockets.h cpp-commons/trunk/src/commons/st/st.h Modified: cpp-commons/trunk/src/commons/array.h =================================================================== --- cpp-commons/trunk/src/commons/array.h 2009-02-11 19:59:53 UTC (rev 1176) +++ cpp-commons/trunk/src/commons/array.h 2009-02-13 20:54:46 UTC (rev 1177) @@ -2,6 +2,8 @@ #define COMMONS_ARRAY_H #include <boost/scoped_array.hpp> +#include <commons/check.h> +#include <commons/nullptr.h> namespace commons { @@ -14,20 +16,54 @@ template<typename T> class array : public scoped_array<T> { public: - explicit array(size_t n) : scoped_array<T>(new T[n]), n_(n) {} + explicit array(size_t n) : scoped_array<T>(checkpass(new T[n])), n_(n) {} size_t size() { return n_; } + char *end() { return this->get() + n_; } private: size_t n_; }; - //template<typename T> - // class array { - // public: - // explicit array(int n) : a(new T[n]) {} - // private: - // scoped_array<T> a; - // }; + /** + * A release-able array. + */ + template<typename T> + class auto_array { + public: + auto_array(T *p) : p_(p) {} + ~auto_array() { if (p_ != nullptr) delete [] p_; } + T *release() { T *p = p_; p_ = nullptr; return p; } + T *get() { return p_; } + const T *get() const { return p_; } + operator T*() { return p_; } + operator const T*() const { return p_; } + private: + T *p_; + }; + /** + * Move-able, conditionally-scoped array. 
+ * + * TODO: rename to managed_array + */ + template<typename T> + class array_view { + public: + array_view(T *p, bool scoped) : p_(p), scoped_(scoped) {} +#ifdef __GXX_EXPERIMENTAL_CXX0X__ + array_view(array_view<T> &&a) : p_(a.p_), scoped_(a.scoped_) { a.release(); } +#endif + ~array_view() { if (scoped_) delete [] p_; } + T *release() { T *p = p_; p_ = nullptr; scoped_ = false; return p; } + T *get() { return p_; } + const T *get() const { return p_; } + operator T*() { return p_; } + operator const T*() const { return p_; } + bool scoped() const { return scoped_; } + private: + T *p_; + bool scoped_; + }; + } #endif Modified: cpp-commons/trunk/src/commons/check.h =================================================================== --- cpp-commons/trunk/src/commons/check.h 2009-02-11 19:59:53 UTC (rev 1176) +++ cpp-commons/trunk/src/commons/check.h 2009-02-13 20:54:46 UTC (rev 1177) @@ -23,6 +23,8 @@ // // - chkr(): same as chk but returns -1 instead of throwing +// TODO: incorporate __PRETTY_FUNCTION__ + namespace commons { @@ -60,7 +62,7 @@ } } - __attribute__((format(printf, 4, 5))) inline void + __attribute__((format(printf, 4, 5))) void _check(bool cond, const char *file, int line, const char *fmt, ...) 
{ va_list ap; @@ -85,7 +87,11 @@ template<typename T> inline T _checknneg(T x, const char *file, int line) { - _check(x >= 0, file, line, "expecting non-negative, got %d", x); + if (x < 0) { + stringstream ss; + ss << "expecting non-negative, got " << x; + _check(false, file, line, "%s", ss.str().c_str()); + } return x; } @@ -95,7 +101,9 @@ if (x < 0) { int e = errno; errno = 0; - _check(x, file, line, "expecting >=0, got %d: %s", x, strerror(e)); + stringstream ss; + ss << "expecting >=0, got " << x << ": " << strerror(e); + _check(false, file, line, "%s", ss.str().c_str()); } return x; } @@ -106,8 +114,9 @@ if (!x) { int e = errno; errno = 0; - _check(x, file, line, "expecting !=0, got %s: %s", - lexical_cast<string>(x).c_str(), strerror(e)); + stringstream ss; + ss << "expecting !=0, got " << x << ": " << strerror(e); + _check(false, file, line, "%s", ss.str().c_str()); } return x; } Modified: cpp-commons/trunk/src/commons/closing.h =================================================================== --- cpp-commons/trunk/src/commons/closing.h 2009-02-11 19:59:53 UTC (rev 1176) +++ cpp-commons/trunk/src/commons/closing.h 2009-02-13 20:54:46 UTC (rev 1177) @@ -8,13 +8,16 @@ class closing { public: - closing(T x) : x(x) {} - ~closing() { close(x); } + closing(T x) : x(x), scoped(true) {} + ~closing() { if (scoped) close(x); } + T get() { return x; } + T release() { scoped = false; return x; } private: T x; + bool scoped; }; - class closingfd : closing<int> { + class closingfd : public closing<int> { public: closingfd(int x) : closing<int>(x) {} }; Modified: cpp-commons/trunk/src/commons/sockets.h =================================================================== --- cpp-commons/trunk/src/commons/sockets.h 2009-02-11 19:59:53 UTC (rev 1176) +++ cpp-commons/trunk/src/commons/sockets.h 2009-02-13 20:54:46 UTC (rev 1177) @@ -11,11 +11,78 @@ #include <unistd.h> #include <commons/check.h> +#include <commons/closing.h> +#include <commons/nullptr.h> namespace commons { 
/** + * Create a TCP socket. + */ + int + tcp_socket(bool nb) + { + int fd = checknnegerr(socket(PF_INET, SOCK_STREAM, 0)); + // Make our socket non-blocking if desired. + if (nb) + checknnegerr(fcntl(fd, F_SETFL, O_NONBLOCK | fcntl(fd, F_GETFL, 0))); + return fd; + } + + /** + * Initialize an inet address with just the port. + */ + void + sockaddr_init(sockaddr_in &a, uint16_t port) + { + bzero(&a, sizeof a); + a.sin_family = AF_INET; + a.sin_port = htons(port); + } + + /** + * Initialize an inet address. + */ + void + sockaddr_init(sockaddr_in &a, const char *host, uint16_t port) + { + sockaddr_init(a, port); + if (host == nullptr) { + a.sin_addr.s_addr = htonl(INADDR_ANY); + } else { + // First try to interpret host as a dot-notation string. + if (!inet_aton(host, (struct in_addr *) &a.sin_addr.s_addr)) { + // Now try to resolve the hostname. + struct hostent *res = checkpass(gethostbyname(host)); + memcpy(&a.sin_addr, res->h_addr_list[0], res->h_length); + } + } + } + + /** + * Initialize an inet address. + */ + void + sockaddr_init(sockaddr_in &a, in_addr host, uint16_t port) + { + sockaddr_init(a, port); + a.sin_addr = host; + } + + /** + * Construct an inet address. + */ + template <typename T> + sockaddr_in + make_sockaddr(T host, uint16_t port) + { + sockaddr_in a; + sockaddr_init(a, host, port); + return a; + } + + /** * Create a server socket bound to localhost, with SO_REUSEADDR enabled. * \param[in] port The port to listen on. * \param[in] nb Whether the socket should be non-blocking. @@ -25,34 +92,21 @@ server_socket(uint16_t port, bool nb = false) { // Create the socket. - int fd = checknneg(socket(PF_INET, SOCK_STREAM, 0)); + closingfd c(tcp_socket(nb)); + int fd = c.get(); - try { - // Configure the socket. - int n = 1; - check0x(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, - reinterpret_cast<char *>(&n), sizeof(n))); + // Configure the socket. 
+ int n = 1; + check0x(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, + reinterpret_cast<char*>(&n), sizeof(n))); - // Make our socket non-blocking if desired. - if (nb) { - checknneg(fcntl(fd, F_SETFL, O_NONBLOCK | fcntl(fd, F_GETFL, 0))); - } + // Create the local socket address. + sockaddr_in sa = make_sockaddr(nullptr, port); - // Create the local socket address. - sockaddr_in sa; - bzero(&sa, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_port = htons(port); - sa.sin_addr.s_addr = htonl(INADDR_ANY); + // Bind the socket to the local socket address. + check0x(::bind(fd, (sockaddr*) &sa, sizeof sa)); - // Bind the socket to the local socket address. - check0x(::bind(fd, (sockaddr*) &sa, sizeof sa)); - - return fd; - } catch (...) { - close(fd); - throw; - } + return c.release(); } /** @@ -75,6 +129,19 @@ throw; } } + + /** + * Connect to a TCP socket. + */ + int + tcp_connect(const char *host, uint16_t port) + { + closingfd c(tcp_socket(false)); + sockaddr_in a = make_sockaddr(host, port); + check0x(connect(c.get(), reinterpret_cast<sockaddr*>(&a), sizeof a)); + return c.release(); + } + } #endif Modified: cpp-commons/trunk/src/commons/st/st.h =================================================================== --- cpp-commons/trunk/src/commons/st/st.h 2009-02-11 19:59:53 UTC (rev 1176) +++ cpp-commons/trunk/src/commons/st/st.h 2009-02-13 20:54:46 UTC (rev 1177) @@ -1,6 +1,7 @@ #ifndef COMMONS_ST_ST_H #define COMMONS_ST_ST_H +#include <algorithm> #include <exception> #include <map> #include <queue> @@ -8,14 +9,17 @@ #include <sstream> #include <st.h> #include <stx.h> +#include <commons/array.h> #include <commons/nullptr.h> // delegates.h must be included after sockets.h due to bind() conflicts. 
#include <commons/sockets.h> #include <commons/boost/delegates.h> #include <boost/foreach.hpp> #include <boost/function.hpp> +#include <boost/shared_ptr.hpp> #define foreach BOOST_FOREACH +#define shared_ptr boost::shared_ptr namespace commons { @@ -31,7 +35,7 @@ { public: st_closing(st_netfd_t x) : attached(true), x(x) {} - ~st_closing() { if (attached) st_netfd_close(x); } + ~st_closing() { if (attached) check0x(st_netfd_close(x)); } void detach() { attached = false; } private: bool attached; @@ -47,6 +51,7 @@ stfd(st_netfd_t fd) : fd_(fd), sclose(fd) {} st_netfd_t fd() const { return fd_; } operator st_netfd_t() const { return fd_; } + st_netfd_t release() { sclose.detach(); return fd_; } private: const st_netfd_t fd_; st_closing sclose; @@ -94,24 +99,14 @@ st_tcp_connect(in_addr host, uint16_t port, st_utime_t timeout) { // Create remote socket address. - struct sockaddr_in sa; - bzero(&sa, sizeof sa); - sa.sin_family = AF_INET; - sa.sin_port = htons(port); - sa.sin_addr = host; + sockaddr_in sa = make_sockaddr(host, port); // Create the socket. - int sfd = checknnegerr(socket(PF_INET, SOCK_STREAM, 0)); - st_netfd_t nfd = st_netfd_open_socket(sfd); + stfd s(checkpass(st_netfd_open_socket(tcp_socket(true)))); // Connect. - try { - check0x(st_connect(nfd, (sockaddr*) &sa, sizeof sa, timeout)); - return nfd; - } catch (...) { - check0x(st_netfd_close(nfd)); - throw; - } + check0x(st_connect(s.fd(), reinterpret_cast<sockaddr*>(&sa), sizeof sa, timeout)); + return s.release(); } /** @@ -394,6 +389,100 @@ std::set<st_thread_t> ts; }; + class eof_exception : public std::exception { + const char *what() const throw() { return "EOF"; } + }; + + /** + * Convenience class for reading from sockets. + */ + class st_reader + { + public: + st_reader(st_netfd_t fd, size_t bufsize = 1e7) : + fd_(fd), + buf_(bufsize), + start_(buf_.get()), + end_(buf_.get()) + {} + + /** + * The size of the unconsumed range of bytes. 
+ */ + size_t amt() { return end_ - start_; } + + /** + * The remaining number of bytes in the buffer + */ + size_t rem() { return buf_.end() - end_; } + + /** + * Returns a char array that contains the requested number of bytes. If + * we hit an error or EOF, then an exception is thrown. + */ + array_view<char> read(size_t req = 0, st_utime_t to = ST_UTIME_NO_TIMEOUT) { + // Do we already have the requested data? + if (amt() >= req) { + array_view<char> p(start_, false); + start_ += req; + return p; + } + + // Handle large arrays specially. + if (req > buf_.size()) { + array_view<char> p(checkpass(new char[req]), true); + copy(start_, end_, p.get()); + checkeqnneg(st_read_fully(fd_, p + amt(), req, to), static_cast<ssize_t>(req)); + start_ = end_ = buf_.get(); + return p; + } + + // Shift things down if necessary. + if (req > static_cast<size_t>(buf_.end() - end_)) { + copy(start_, end_, buf_.get()); + size_t diff = start_ - buf_.get(); + start_ -= diff; + end_ -= diff; + } + + // Keep reading until we have enough. + while (amt() < req) { + ssize_t res = st_read(fd_, end_, rem(), to); + checknneg(res); + if (res == 0) break; + else end_ += res; + } + + // If we got a premature EOF. + if (amt() < req) + throw eof_exception(); + + array_view<char> p(start_, false); + start_ += req; + return p; + } + + private: + st_reader(const st_reader &); + + st_netfd_t fd_; + + /** + * The temporary storage buffer. + */ + array<char> buf_; + + /** + * The start of the unconsumed range of bytes. + */ + char *start_; + + /** + * The end of the unconsumed range of bytes. + */ + char *end_; + }; + } #endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-11 19:59:56
|
Revision: 1176 http://assorted.svn.sourceforge.net/assorted/?rev=1176&view=rev Author: yangzhang Date: 2009-02-11 19:59:53 +0000 (Wed, 11 Feb 2009) Log Message: ----------- - setup-ydb builds optimized and perftools-enabled - scaling uses perftools profiler - mtcp uses several possible senders/receivers (mult controls whether receiver listens on different ports) - simplified the timing code (last time changed to time the entire operation, not just individual sends, which yielded misleading results before) Modified Paths: -------------- ydb/trunk/tools/test.bash Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-02-11 19:59:47 UTC (rev 1175) +++ ydb/trunk/tools/test.bash 2009-02-11 19:59:53 UTC (rev 1176) @@ -194,7 +194,7 @@ refresh-local cd ~/ydb/src make clean - make WTF= + PPROF=1 OPT=1 make WTF= } init-setup() { @@ -393,33 +393,44 @@ scaling-helper() { local leader=$1 shift - tagssh $leader "ydb/src/ydb -l -n $# -X 100000 ${extraargs:-}" & + tagssh $leader "CPUPROFILE=ydb.prof ydb/src/ydb -q -l -n $# -X 100000 ${extraargs:-}" & sleep .1 for rep in "$@" - do tagssh $rep "ydb/src/ydb -n $# -H $leader ${extraargs:-}" & + do tagssh $rep "CPUPROFILE=ydb.prof ydb/src/ydb -q -n $# -H $leader ${extraargs:-}" & done wait } scaling() { hostargs scaling-helper ; } exp-scaling() { minn=0 exp-var scaling stop ; } -# socat app mtcp-helper() { local leader=$1 n=$(( $# - 1 )) - tagssh $leader 'pkill socat || true' + : ${sender:=socat} ${receiver:=socat} + if [[ $receiver == socat ]] + then local mult=1 + fi shift tagssh $leader " - for i in \`seq $n\` ; do - socat TCP4-LISTEN:\$((9876+i)),reuseaddr - > /dev/null & - done + set -x + pkill socat + pkill epperf + pkill stperf + sleep .5 + pgrep socat + pgrep epperf + if [[ $receiver == socat ]] ; then + for i in \`seq $n\` ; do + socat TCP4-LISTEN:\$((9876+i)),reuseaddr - > /dev/null & + done + elif [[ $receiver == socat ]] ; then + fi wait" & 
sleep .1 { - time { - for i in `seq $n` ; do - tagssh $1 " - if false; then - time python -c ' + for i in `seq $n` ; do + tagssh $1 " + if [[ $use == python ]] ; then + time python -c ' import socket, sys sys.stdout.flush() host, port, i, n = [sys.argv[1]] + map(int, sys.argv[2:]) @@ -427,18 +438,18 @@ s.connect((host, port)) # For baseline benchmarking (how long it takes to generate the msg) # sys.exit(1 == ord(chr(1)*(100000000)[-1])) -s.send(chr(1)*(100000000/n))' $leader $((9876+i)) $((i-1)) $n - elif true ; then - time dd bs=10000 if=/dev/zero count=$((100000000/10000/n)) | - socat - TCP4:$leader:$((9876+i)) - elif false ; then - time /tmp/stperf -s $leader -p $((9876+i)) -i $((i-1)) -n $n - fi" & - shift - done - wait - } +s.send(chr(1)*(100000000/n))' $leader $((9876+i*mult)) $((i-1)) $n + elif [[ $use == socat ]] ; then + time dd bs=10000 if=/dev/zero count=$((100000000/10000/n)) | + socat - TCP4:$leader:$((9876+i*mult)) + elif [[ $use == st ]] ; then + time /tmp/stperf -s $leader -p $((9876+i*mult)) -i $((i-1)) -n $n + fi" & + shift + done + time wait } 2>&1 | fgrep real + wait } mtcp() { hostargs mtcp-helper ; } exp-mtcp() { exp-var mtcp ; } @@ -450,13 +461,13 @@ parssh "pkill stperf || true" tagssh $leader "/tmp/stperf -r -n $n > /dev/null" & sleep .1 - for i in `seq $n` ; do - tagssh $1 " - ( time /tmp/stperf -s $leader -i $((i-1)) -n $n ) 2>&1 | - fgrep real" & - shift - done - wait + { + for i in `seq $n` ; do + tagssh $1 "time /tmp/stperf -s $leader -i $((i-1)) -n $n" & + shift + done + time wait + } 2>&1 | fgrep real } stperf() { hostargs stperf-helper ; } exp-stperf() { exp-var stperf ; } @@ -466,15 +477,16 @@ local leader=$1 n=$(( $# - 1 )) shift parssh "pkill epperf || true" + sleep .1 tagssh $leader "/tmp/epperf $n > /dev/null" & sleep .1 - for i in `seq $n` ; do - tagssh $1 " - ( time /tmp/stperf -s $leader -i $((i-1)) -n $n ) 2>&1 | - fgrep real" & - shift - done - wait + { + for i in `seq $n` ; do + tagssh $1 "time /tmp/stperf -s $leader -i 
$((i-1)) -n $n" & + shift + done + time wait + } 2>&1 | fgrep real } epperf() { hostargs epperf-helper ; } exp-epperf() { exp-var epperf ; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-11 19:59:50
|
Revision: 1175 http://assorted.svn.sourceforge.net/assorted/?rev=1175&view=rev Author: yangzhang Date: 2009-02-11 19:59:47 +0000 (Wed, 11 Feb 2009) Log Message: ----------- - added simple line-counting tool Added Paths: ----------- ydb/trunk/tools/wc.bash Added: ydb/trunk/tools/wc.bash =================================================================== --- ydb/trunk/tools/wc.bash (rev 0) +++ ydb/trunk/tools/wc.bash 2009-02-11 19:59:47 UTC (rev 1175) @@ -0,0 +1,11 @@ +#!/usr/bin/env bash + +cd "$(dirname "$0")/../src/" +wc -l main.lzz.clamp +{ + wc -l main.lzz.clamp + + cat main.lzz.clamp | + perl -n -e 'if (m /#include <commons/s) { s/.*<(.+)>.*/$1/; print }' | + xargs -I_ wc -l /home/yang/ccom/src/_ +} | cut -f1 -d' ' | numsum | xargs -I_ echo _ total Property changes on: ydb/trunk/tools/wc.bash ___________________________________________________________________ Added: svn:executable + * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-11 19:40:23
|
Revision: 1174 http://assorted.svn.sourceforge.net/assorted/?rev=1174&view=rev Author: yangzhang Date: 2009-02-11 19:40:15 +0000 (Wed, 11 Feb 2009) Log Message: ----------- - added txn batching and --batch-size (still need to do response batching) - cleaned up thread profiling - added bcastmsg_fake to do everything but the actual st_write calls - added and switched to bcastmsg_async to delegate bcasting to separate (bcaster) thread - replaced map with unordered_map; using mii in more places - changed response_handler into an object to reduce the amount of copying overhead in constructing the `finally` functor in the loop - fixed some warnings that were only exposed on optimization (uninitialized values for before_read/before_write) - replaced all i++ with ++i - disabling boost::thread for now - added --suppress-txn-msgs Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-11 19:26:20 UTC (rev 1173) +++ ydb/trunk/src/main.lzz.clamp 2009-02-11 19:40:15 UTC (rev 1174) @@ -7,7 +7,7 @@ #include <boost/range/iterator_range.hpp> #include <boost/scoped_array.hpp> #include <boost/shared_ptr.hpp> -#include <boost/thread.hpp> +//#include <boost/thread.hpp> #include <commons/nullptr.h> #include <commons/rand.h> #include <commons/st/st.h> @@ -24,6 +24,7 @@ #include <set> #include <sys/socket.h> // getpeername #include <sys/types.h> // ssize_t +#include <tr1/unordered_map> #include <unistd.h> // pipe, write #include <vector> #include "ydb.pb.h" @@ -33,19 +34,28 @@ using namespace commons; using namespace std; using namespace testing; +using namespace tr1; + +#define GETMSG(buf) \ +checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ +if (stop_time != nullptr) \ + *stop_time = current_time_millis(); \ +check(msg.ParseFromArray(buf, len)); #end +#define map_t unordered_map typedef pair<int, int> pii; -typedef 
map<int, int> mii; +typedef map_t<int, int> mii; // Configuration. st_utime_t timeout; int chkpt, accept_joiner_seqno, issuing_interval, min_ops, max_ops, - stop_on_seqno; + stop_on_seqno, batch_size; size_t accept_joiner_size; bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, - debug_threads, multirecover, disk, debug_memory, use_wal; + debug_threads, multirecover, disk, debug_memory, use_wal, + suppress_txn_msgs; long long timelim, read_thresh, write_thresh; // Control. @@ -56,6 +66,15 @@ int updates; /** + * Convenience function for calculating percentages. + */ +double +pct(double sub, double tot) +{ + return 100 * sub / tot; +} + +/** * The list of all threads. Keep track of these so that we may cleanly shut * down all threads. */ @@ -223,6 +242,134 @@ }; /** + * XXX + */ +template<typename T> +void +bcastmsg_fake(const vector<st_netfd_t> &dsts, const T & msg) +{ + // Serialize message to a buffer. + string s; + check(msg.SerializeToString(&s)); + const char *buf = s.c_str(); + + if (s.size() > 1000000) + cout << "sending large message to " << dsts.size() << " dsts, size = " + << s.size() << " bytes" << endl; + + // Prefix the message with a four-byte length. + uint32_t len = htonl(static_cast<uint32_t>(s.size())); + + // Broadcast the length-prefixed message to replicas. + int dstno = 0; + foreach (st_netfd_t dst, dsts) { + size_t resid = sizeof len; +#define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y)) + int res = true ? 
0 : st_write_resid(dst, static_cast<void*>(&len), &resid, timeout); + long long before_write = -1; + if (write_thresh > 0) { + before_write = current_time_millis(); + } + if (res == -1 && errno == ETIME) { + checksize(st_write(dst, + reinterpret_cast<char*>(&len) + sizeof len - resid, + resid, + ST_UTIME_NO_TIMEOUT), + resid); + } else { + check0x(res); + } + if (write_thresh > 0) { + long long write_time = current_time_millis() - before_write; + if (write_time > write_thresh) { + cout << "thread " << threadname() + << ": write to dst #" << dstno + << " took " << write_time << " ms" << endl; + } + } + if (false) + checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), + s.size()); + ++dstno; + } +} + +st_channel<shared_ptr<string> > msgs; + +const vector<st_netfd_t> *gdsts; + +/** + * XXX + */ +void +bcaster() +{ + int counter = 0; + while (!kill_hub) { + shared_ptr<string> p; + { + st_intr intr(kill_hub); + p = msgs.take(); + } + if (p.get() == nullptr) break; + string &s = *p.get(); + + int dstno = 0; + foreach (st_netfd_t dst, *gdsts) { + long long before_write = -1; + if (write_thresh > 0) { + before_write = current_time_millis(); + } + + checksize(st_write(dst, s.data(), s.size(), ST_UTIME_NO_TIMEOUT), + s.size()); + + if (write_thresh > 0) { + long long write_time = current_time_millis() - before_write; + if (write_time > write_thresh) { + cout << "thread " << threadname() + << ": write #" << counter + << " of size " << s.size() + << " bytes to dst #" << dstno + << " took " << write_time << " ms" << endl; + } + } + ++dstno; + } + ++counter; + } +} + +/** + * XXX + */ +template<typename T> +void +bcastmsg_async(const vector<st_netfd_t> &dsts, const T & msg) +{ + gdsts = &dsts; + + // Serialize message to a buffer. 
+ uint32_t len; + shared_ptr<string> p(new string(sizeof len, '\0')); + string &s = *p.get(); + check(msg.AppendToString(&s)); + + if (s.size() > 1000000) + cout << "sending large message to " << dsts.size() << " dsts, size = " + << s.size() << " bytes" << endl; + + // Prefix the message with a four-byte length. + len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); + char *plen = reinterpret_cast<char*>(&len); + for (size_t i = 0; i < sizeof len; ++i) + s[i] = plen[i]; + + msgs.push(p); +} + + +/** * Send a message to some destinations (sequentially). */ template<typename T> @@ -247,7 +394,7 @@ size_t resid = sizeof len; #define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y)) int res = st_write_resid(dst, static_cast<void*>(&len), &resid, timeout); - long long before_write; + long long before_write = -1; if (write_thresh > 0) { before_write = current_time_millis(); } @@ -270,7 +417,7 @@ } checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), s.size()); - dstno++; + ++dstno; } } @@ -324,12 +471,6 @@ *start_time = current_time_millis(); len = ntohl(len); -#define GETMSG(buf) \ - checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ - if (stop_time != nullptr) \ - *stop_time = current_time_millis(); \ - check(msg.ParseFromArray(buf, len)); - // Parse the message body. if (len < 4096) { char buf[len]; @@ -383,7 +524,7 @@ }; // Globals -map<int, int> g_map; +mii g_map; wal *g_wal; /** @@ -393,10 +534,15 @@ issue_txns(st_channel<replica_info> &newreps, int &seqno, st_bool &accept_joiner) { +#define bcastmsg bcastmsg_async Op_OpType types[] = {Op::read, Op::write, Op::del}; vector<st_netfd_t> fds; long long start_time = current_time_millis(); +#if bcastmsg == bcastmsg_async + st_joining join_bcaster(my_spawn(bcaster, "bcaster")); +#endif + finally f(lambda () { showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno), 0); @@ -408,9 +554,9 @@ // empty/default Txn). 
if (!newreps.empty() && seqno > 0) { if (multirecover) { - bcastmsg(fds, Txn()); + bcastmsg(fds, TxnBatch()); } else { - sendmsg(fds[0], Txn()); + sendmsg(fds[0], TxnBatch()); } } // Bring in any new members. @@ -418,58 +564,77 @@ fds.push_back(newreps.take().fd()); } - // Generate a random transaction. - Txn txn; - txn.set_seqno(seqno); - int count = randint(min_ops, max_ops + 1); - for (int o = 0; o < count; o++) { - Op *op = txn.add_op(); - int rtype = general_txns ? randint(3) : 1, rkey = randint(), rvalue = randint(); - op->set_type(types[rtype]); - op->set_key(rkey); - op->set_value(rvalue); - } + // Generate some random transactions. + TxnBatch batch; + for (int t = 0; t < batch_size; ++t) { + Txn &txn = *batch.add_txn(); + txn.set_seqno(seqno); + int count = randint(min_ops, max_ops + 1); + for (int o = 0; o < count; ++o) { + Op *op = txn.add_op(); + int rtype = general_txns ? randint(3) : 1, + rkey = randint(), + rvalue = randint(); + op->set_type(types[rtype]); + op->set_key(rkey); + op->set_value(rvalue); + } - if (do_pause) do_pause.waitreset(); + // Process immediately if not bcasting. + if (fds.empty()) { + --seqno; + process_txn(nullptr, g_map, txn, seqno, true); + } + ++seqno; - // Process, or broadcast and increment seqno. - if (fds.empty()) { - int dummy_seqno = seqno - 1; - process_txn(nullptr, g_map, txn, dummy_seqno, true); - } else { - bcastmsg(fds, txn); - } + // Checkpoint. + if (txn.seqno() % chkpt == 0) { + if (verbose) + cout << "issued txn " << txn.seqno() << endl; + if (timelim > 0 && current_time_millis() - start_time > timelim) { + cout << "time's up; issued " << txn.seqno() << " txns in " << timelim + << " ms" << endl; + stop_hub.set(); + } + st_sleep(0); + } - // Checkpoint. 
- if (txn.seqno() % chkpt == 0) { - if (verbose) - cout << "issued txn " << txn.seqno() << endl; - if (timelim > 0 && current_time_millis() - start_time > timelim) { - cout << "time's up; issued " << txn.seqno() << " txns in " << timelim - << " ms" << endl; + // For debugging purposes. + if (issuing_interval > 0) { + st_sleep(issuing_interval); + } + + // Are we to accept a new joiner? + if (txn.seqno() == accept_joiner_seqno) { + accept_joiner.set(); + } + + // Set the stopping seqno. + if (txn.seqno() == stop_on_seqno) { + cout << "stopping on issue of seqno " << txn.seqno() << endl; stop_hub.set(); + break; } - st_sleep(0); } - if (issuing_interval > 0) { - st_sleep(issuing_interval); - } - if (txn.seqno() == accept_joiner_seqno) { - accept_joiner.set(); - } + // Broadcast. + if (!fds.empty() && !suppress_txn_msgs) + bcastmsg(fds, batch); - if (txn.seqno() == stop_on_seqno) { - cout << "stopping on issue of seqno " << txn.seqno() << endl; - stop_hub.set(); - } - - ++seqno; + // Pause? + if (do_pause) + do_pause.waitreset(); } - Txn txn; + // This means "The End." + TxnBatch batch; + Txn &txn = *batch.add_txn(); txn.set_seqno(-1); - bcastmsg(fds, txn); + bcastmsg(fds, batch); +#if bcastmsg == bcastmsg_any + msgs.push(shared_ptr<string>()); +#endif +#undef bcastmsg } /** @@ -477,7 +642,7 @@ * leader. 
*/ void -process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno, +process_txn(st_netfd_t leader, mii &map, const Txn &txn, int &seqno, bool caught_up) { wal &wal = *g_wal; @@ -486,14 +651,14 @@ res.set_seqno(txn.seqno()); res.set_caught_up(caught_up); seqno = txn.seqno(); - for (int o = 0; o < txn.op_size(); o++) { + for (int o = 0; o < txn.op_size(); ++o) { const Op &op = txn.op(o); const int key = op.key(); - ::map<int, int>::iterator it = map.find(key); + mii::iterator it = map.find(key); if (show_updates || count_updates) { if (it != map.end()) { if (show_updates) cout << "existing key: " << key << endl; - if (count_updates) updates++; + if (count_updates) ++updates; } } switch (op.type()) { @@ -581,7 +746,7 @@ * \param[in] wal The WAL. */ void -process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, +process_txns(st_netfd_t leader, mii &map, int &seqno, st_channel<shared_ptr<Recovery> > &send_states, st_channel<shared_ptr<Txn> > &backlog, int init_seqno, int mypos, int nnodes) @@ -600,8 +765,7 @@ showtput("processed", now, __ref(start_time), __ref(seqno), __ref(init_seqno)); if (!__ref(caught_up)) { - cout << "live-processing: never entered this phase (never caught up)" << - endl; + cout << "live-processing: never entered this phase (never caught up)" << endl; } else { showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), __ref(seqno_caught_up)); @@ -609,188 +773,229 @@ __ref(send_states).push(shared_ptr<Recovery>()); }); - while (true) { - Txn txn; - long long before_read; - if (read_thresh > 0) { - before_read = current_time_millis(); - } - { - st_intr intr(stop_hub); - readmsg(leader, txn); - } - if (read_thresh > 0) { - long long read_time = current_time_millis() - before_read; - if (read_time > read_thresh) { - cout << "thread " << threadname() - << ": read took " << read_time << " ms" << endl; + class break_exception : public std::exception {}; + + try { + while (true) { + TxnBatch batch; + long long 
before_read = -1; + if (read_thresh > 0) { + before_read = current_time_millis(); } - } - if (txn.has_seqno()) { - // Regular transaction. - const char *action; - if (txn.seqno() < 0) { - break; - } else if (txn.seqno() == seqno + 1) { - if (!caught_up) { - time_caught_up = current_time_millis(); - seqno_caught_up = seqno; - showtput("process_txns caught up; backlogged", - time_caught_up, start_time, seqno_caught_up, - first_seqno == -1 ? init_seqno - 1 : first_seqno); - caught_up = true; + { + st_intr intr(stop_hub); + readmsg(leader, batch); + } + if (read_thresh > 0) { + long long read_time = current_time_millis() - before_read; + if (read_time > read_thresh) { + cout << "thread " << threadname() + << ": read took " << read_time << " ms" << endl; } - process_txn(leader, map, txn, seqno, true); - action = "processed"; - } else { - if (first_seqno == -1) - first_seqno = txn.seqno(); - // Queue up for later processing once a snapshot has been received. - backlog.push(shared_ptr<Txn>(new Txn(txn))); - action = "backlogged"; } + if (batch.txn_size() > 0) { + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = batch.txn(t); + // Regular transaction. + const char *action; + if (txn.seqno() < 0) { + throw break_exception(); + } else if (txn.seqno() == seqno + 1) { + if (!caught_up) { + time_caught_up = current_time_millis(); + seqno_caught_up = seqno; + showtput("process_txns caught up; backlogged", + time_caught_up, start_time, seqno_caught_up, + first_seqno == -1 ? init_seqno - 1 : first_seqno); + caught_up = true; + } + process_txn(leader, map, txn, seqno, true); + action = "processed"; + } else { + if (first_seqno == -1) + first_seqno = txn.seqno(); + // Queue up for later processing once a snapshot has been received. 
+ backlog.push(shared_ptr<Txn>(new Txn(txn))); + action = "backlogged"; + } - if (txn.seqno() % chkpt == 0) { - if (verbose) { - cout << action << " txn " << txn.seqno() - << "; db size = " << map.size() - << "; seqno = " << seqno - << "; backlog.size = " << backlog.queue().size() << endl; + if (txn.seqno() % chkpt == 0) { + if (verbose) { + cout << action << " txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } + st_sleep(0); + } } - st_sleep(0); + } else { + // Empty (default) Txn means "generate a snapshot." + // TODO make this faster + shared_ptr<Recovery> recovery(new Recovery); + typedef ::map<int, int> mii_; + mii_ map_(map.begin(), map.end()); + mii_::const_iterator begin = + map_.lower_bound(multirecover ? interp(RAND_MAX, mypos, nnodes) : 0); + mii_::const_iterator end = multirecover && mypos < nnodes - 1 ? + map_.lower_bound(interp(RAND_MAX, mypos + 1, nnodes)) : map_.end(); + cout << "generating recovery over " << begin->first << ".." + << (end == map_.end() ? "end" : lexical_cast<string>(end->first)); + if (multirecover) + cout << " (node " << mypos << " of " << nnodes << ")"; + cout << endl; + long long start_snap = current_time_millis(); + foreach (const pii &p, make_iterator_range(begin, end)) { + Recovery_Pair *pair = recovery->add_pair(); + pair->set_key(p.first); + pair->set_value(p.second); + } + cout << "generating recovery took " + << current_time_millis() - start_snap << " ms" << endl; + recovery->set_seqno(seqno); + send_states.push(recovery); } - } else { - // Empty (default) Txn means "generate a snapshot." - shared_ptr<Recovery> recovery(new Recovery); - mii::const_iterator begin = - map.lower_bound(multirecover ? interp(RAND_MAX, mypos, nnodes) : 0); - mii::const_iterator end = multirecover && mypos < nnodes - 1 ? - map.lower_bound(interp(RAND_MAX, mypos + 1, nnodes)) : map.end(); - cout << "generating recovery over " << begin->first << ".." 
- << (end == map.end() ? "end" : lexical_cast<string>(end->first)); - if (multirecover) - cout << " (node " << mypos << " of " << nnodes << ")"; - cout << endl; - long long start_snap = current_time_millis(); - foreach (const pii &p, make_iterator_range(begin, end)) { - Recovery_Pair *pair = recovery->add_pair(); - pair->set_key(p.first); - pair->set_value(p.second); - } - cout << "generating recovery took " - << current_time_millis() - start_snap << " ms" << endl; - recovery->set_seqno(seqno); - send_states.push(recovery); } + } catch (break_exception &ex) { } } -/** - * Swallow replica responses. - */ -void -handle_responses(st_netfd_t replica, const int &seqno, int rid, - st_multichannel<long long> &recover_signals, bool caught_up) +class response_handler { - st_channel<long long> &sub = recover_signals.subscribe(); - long long start_time = current_time_millis(), - recovery_start_time = caught_up ? -1 : start_time, - recovery_end_time = -1; - int recovery_start_seqno = caught_up ? -1 : seqno, - recovery_end_seqno = -1; - int last_seqno = -1; - finally f(lambda () { - long long end_time = current_time_millis(); - if (__ref(recovery_end_time) > -1) { - cout << __ref(rid) << ": "; - showtput("after recovery, finished", end_time, __ref(recovery_end_time), - __ref(seqno), __ref(recovery_end_seqno)); - } - }); - while (true) { +public: + response_handler(st_netfd_t replica, const int &seqno, int rid, + st_multichannel<long long> &recover_signals, bool caught_up) + : + replica(replica), + seqno(seqno), + rid(rid), + recover_signals(recover_signals), + caught_up(caught_up), + sub(recover_signals.subscribe()), + start_time(current_time_millis()), + recovery_start_time(caught_up ? -1 : start_time), + recovery_end_time(-1), + recovery_start_seqno(caught_up ? -1 : seqno), + recovery_end_seqno(-1), + last_seqno(-1) + {} + + void run() { + //start_time = current_time_millis(); + //recovery_start_time = caught_up ? 
-1 : start_time; + //recovery_end_time = -1; + //recovery_start_seqno = caught_up ? -1 : seqno; + //recovery_end_seqno = -1; + //last_seqno = -1; + finally f(lambda () { - // TODO: convert the whole thing to an object so that we can have "scoped - // globals". - long long &recovery_start_time = __ref(recovery_start_time); - int &recovery_start_seqno = __ref(recovery_start_seqno); - long long &recovery_end_time = __ref(recovery_end_time); - int &recovery_end_seqno = __ref(recovery_end_seqno); - long long &start_time = __ref(start_time); - const int &seqno = __ref(seqno); - int &rid = __ref(rid); - st_channel<long long> &sub = __ref(sub); - // The first timestamp that comes down the subscription pipeline is the - // recovery start time, issued by the main thread. The second one is the - // recovery end time, issued by the response handler associated with the - // joiner. - if (recovery_start_time == -1 && !sub.empty()) { - recovery_start_time = sub.take(); - recovery_start_seqno = seqno; - cout << rid << ": "; - showtput("before recovery, finished", recovery_start_time, start_time, - recovery_start_seqno, 0); - } else if (recovery_end_time == -1 && !sub.empty()) { - recovery_end_time = sub.take(); - recovery_end_seqno = seqno; - cout << rid << ": "; - showtput("during recovery, finished", recovery_end_time, - recovery_start_time, recovery_end_seqno, recovery_start_seqno); + long long end_time = current_time_millis(); + if (__ref(recovery_end_time) > -1) { + cout << __ref(rid) << ": "; + showtput("after recovery, finished", end_time, __ref(recovery_end_time), + __ref(seqno), __ref(recovery_end_seqno)); } }); - Response res; - // Read the message, but correctly respond to interrupts so that we can - // cleanly exit (slightly tricky). - if (last_seqno + 1 == seqno) { - // Stop-interruptible in case we're already caught up. 
- try { - st_intr intr(stop_hub); + + while (true) { + finally f(boost::bind(&response_handler::loop_cleanup, this)); + + Response res; + // Read the message, but correctly respond to interrupts so that we can + // cleanly exit (slightly tricky). + if (last_seqno + 1 == seqno) { + // Stop-interruptible in case we're already caught up. + try { + st_intr intr(stop_hub); + readmsg(replica, res); + } catch (...) { // TODO: only catch interruptions + // This check on seqnos is OK for termination since the seqno will + // never grow again if stop_hub is set. + if (last_seqno + 1 == seqno) { + cout << rid << ": "; + cout << "clean stop; next expected seqno is " << seqno + << " (last seqno was " << last_seqno << ")" << endl; + break; + } else { + continue; + } + } + } else { + // Only kill-interruptible because we want a clean termination (want + // to get all the acks back). + st_intr intr(kill_hub); readmsg(replica, res); - } catch (...) { // TODO: only catch interruptions - // This check on seqnos is OK for termination since the seqno will - // never grow again if stop_hub is set. - if (last_seqno + 1 == seqno) { + } + // Determine if this response handler's host (the only joiner) has finished + // catching up. If it has, then broadcast a signal so that all response + // handlers will know about this event. + if (!caught_up && res.caught_up()) { + long long now = current_time_millis(), timediff = now - start_time; + caught_up = true; + recover_signals.push(now); + cout << rid << ": "; + cout << "recovering node caught up; took " + << timediff << " ms" << endl; + // This will cause the program to exit eventually, but cleanly, such that + // the recovery time will be set first, before the eventual exit (which + // may not even happen in the current iteration). 
+ if (stop_on_recovery) { + cout << "stopping on recovery" << endl; + stop_hub.set(); + } + } + if (res.seqno() % chkpt == 0) { + if (verbose) { cout << rid << ": "; - cout << "clean stop; next expected seqno is " << seqno - << " (last seqno was " << last_seqno << ")" << endl; - break; - } else { - continue; + cout << "got response " << res.seqno() << " from " << replica << endl; } + st_sleep(0); } - } else { - // Only kill-interruptible because we want a clean termination (want - // to get all the acks back). - st_intr intr(kill_hub); - readmsg(replica, res); + last_seqno = res.seqno(); } - // Determine if this response handler's host (the only joiner) has finished - // catching up. If it has, then broadcast a signal so that all response - // handlers will know about this event. - if (!caught_up && res.caught_up()) { - long long now = current_time_millis(), timediff = now - start_time; - caught_up = true; - recover_signals.push(now); + } + +private: + void loop_cleanup() { + // The first timestamp that comes down the subscription pipeline is the + // recovery start time, issued by the main thread. The second one is the + // recovery end time, issued by the response handler associated with the + // joiner. + if (recovery_start_time == -1 && !sub.empty()) { + recovery_start_time = sub.take(); + recovery_start_seqno = seqno; cout << rid << ": "; - cout << "recovering node caught up; took " - << timediff << " ms" << endl; - // This will cause the program to exit eventually, but cleanly, such that - // the recovery time will be set first, before the eventual exit (which - // may not even happen in the current iteration). 
- if (stop_on_recovery) { - cout << "stopping on recovery" << endl; - stop_hub.set(); - } + showtput("before recovery, finished", recovery_start_time, start_time, + recovery_start_seqno, 0); + } else if (recovery_end_time == -1 && !sub.empty()) { + recovery_end_time = sub.take(); + recovery_end_seqno = seqno; + cout << rid << ": "; + showtput("during recovery, finished", recovery_end_time, + recovery_start_time, recovery_end_seqno, recovery_start_seqno); } - if (res.seqno() % chkpt == 0) { - if (verbose) { - cout << rid << ": "; - cout << "got response " << res.seqno() << " from " << replica << endl; - } - st_sleep(0); - } - last_seqno = res.seqno(); } + + st_netfd_t replica; + const int &seqno; + int rid; + st_multichannel<long long> &recover_signals; + bool caught_up; + st_channel<long long> &sub; + long long start_time, recovery_start_time, recovery_end_time; + int recovery_start_seqno, recovery_end_seqno, last_seqno; +}; + +/** + * Swallow replica responses. + */ +void +handle_responses(st_netfd_t replica, const int &seqno, int rid, + st_multichannel<long long> &recover_signals, bool caught_up) +{ + response_handler h(replica, seqno, rid, recover_signals, caught_up); + h.run(); } /** @@ -807,7 +1012,7 @@ * from process_txns. */ void -recover_joiner(st_netfd_t listener, const map<int, int> &map, const int &seqno, +recover_joiner(st_netfd_t listener, const mii &map, const int &seqno, st_channel<shared_ptr<Recovery> > &send_states) { st_netfd_t joiner; @@ -858,7 +1063,7 @@ vector<replica_info> replicas; st_closing_all_infos close_replicas(replicas); cout << "waiting for at least " << minreps << " replicas to join" << endl; - for (int i = 0; i < minreps; i++) { + for (int i = 0; i < minreps; ++i) { st_netfd_t fd; { st_intr intr(stop_hub); @@ -943,13 +1148,13 @@ { if (disk) { // Disk IO threads. - for (int i = 0; i < 5; i++) { - thread somethread(threadfunc); + for (int i = 0; i < 5; ++i) { + //thread somethread(threadfunc); } } // Initialize database state. 
- map<int, int> map; + mii map; int seqno = -1; finally f(lambda () { cout << "REPLICA SUMMARY" << endl; @@ -994,7 +1199,7 @@ vector<st_netfd_t> replicas; st_closing_all close_replicas(replicas); int mypos = -1; - for (int i = 0; i < init.node_size(); i++) { + for (int i = 0; i < init.node_size(); ++i) { const SockAddr &sa = init.node(i); char buf[INET_ADDRSTRLEN]; in_addr host = { sa.host() }; @@ -1030,7 +1235,7 @@ vector<st_thread_t> recovery_builders; assert(seqno == -1); - for (int i = 0; i < (multirecover ? init.node_size() : 1); i++) { + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { recovery_builders.push_back(my_spawn(lambda() { // Read the recovery message. Recovery recovery; @@ -1046,7 +1251,7 @@ << build_start - __ref(before_recv) << " ms: xfer took " << receive_end - receive_start << " ms, deserialization took " << build_start - receive_end << " ms" << endl; - for (int i = 0; i < recovery.pair_size(); i++) { + for (int i = 0; i < recovery.pair_size(); ++i) { const Recovery_Pair &p = recovery.pair(i); __ref(map)[p.key()] = p.value(); if (i % chkpt == 0) { @@ -1183,6 +1388,8 @@ ("dump,D", po::bool_switch(&dump), "replicas should finally dump their state to a tmp file for " "inspection/diffing") + ("suppress-txn-msgs", po::bool_switch(&suppress_txn_msgs), + "suppress txn msgs") ("show-updates,U", po::bool_switch(&show_updates), "log operations that touch (update/read/delete) an existing key") ("count-updates,u",po::bool_switch(&count_updates), @@ -1195,7 +1402,9 @@ "run the leader (run replica by default)") ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), "exit after the joiner fully recovers (for leader only)") - ("exit-on-seqno,X", po::value<int>(&stop_on_seqno)->default_value(-1), + ("batch-size,b", po::value<int>(&batch_size)->default_value(10), + "number of txns to batch up in each msg (for leader only)") + ("exit-on-seqno,X",po::value<int>(&stop_on_seqno)->default_value(-1), "exit after txn seqno is issued (for leader 
only)") ("accept-joiner-size,s", po::value<size_t>(&accept_joiner_size)->default_value(0), @@ -1302,16 +1511,15 @@ foreach (entry p, threadtimes) { total += p.second; } - cout << "total " << total << " all " << all << endl; foreach (entry p, threadtimes) { cout << "- " << threadname(p.first) << ": " << p.second << " ms (" - << (static_cast<double>(p.second) / total) << "% of total, " - << (static_cast<double>(p.second) / all) << "% of all)" << endl; + << pct(p.second, total) << "% of total, " + << pct(p.second, all) << "% of all)" << endl; } - cout << "- total: " << total << " ms (" << double(total) / all + cout << "- total: " << total << " ms (" << pct(total, all) << "% of all)" << endl; cout << "- unaccounted: " << all - total << " ms (" - << double(all - total) / all << "% of all)" << endl; + << pct(all - total, all) << "% of all)" << endl; cout << "- all: " << all << " ms" << endl; } }); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-11 19:26:27
|
Revision: 1173 http://assorted.svn.sourceforge.net/assorted/?rev=1173&view=rev Author: yangzhang Date: 2009-02-11 19:26:20 +0000 (Wed, 11 Feb 2009) Log Message: ----------- - removed inline warnings due to problems in boost - include serpref in make clean - added "distclean" Modified Paths: -------------- ydb/trunk/src/Makefile Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-06 06:16:46 UTC (rev 1172) +++ ydb/trunk/src/Makefile 2009-02-11 19:26:20 UTC (rev 1173) @@ -44,7 +44,7 @@ -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \ -Winit-self -Wsign-promo -Wno-unused-parameter -Wc++0x-compat \ -Wparentheses -Wmissing-format-attribute -Wfloat-equal \ - -Winline -Wsynth $(CXXFLAGS) + -Wno-inline -Wsynth $(CXXFLAGS) PBCXXFLAGS := $(OPT) -Wall -Werror $(GPROF) all: $(TARGET) @@ -74,8 +74,11 @@ clamp < $< | sed "`echo -e '1i#src\n1a#end'`" > $@ clean: - rm -f $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) main.lzz *.clamp_h + rm -f $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) main.lzz *.clamp_h serperf +distclean: clean + rm -f all.h all.h.gch + doc: $(SRCS) $(HDRS) doxygen @@ -88,4 +91,4 @@ serperf: serperf.o ydb.o $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -o $@ -# serperf.cc ydb.pb.h \ No newline at end of file +# serperf.cc ydb.pb.h This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-06 06:16:50
|
Revision: 1172 http://assorted.svn.sourceforge.net/assorted/?rev=1172&view=rev Author: yangzhang Date: 2009-02-06 06:16:46 +0000 (Fri, 06 Feb 2009) Log Message: ----------- oops...there are 1000ms in a s, duh Modified Paths: -------------- ydb/trunk/src/serperf.cc Modified: ydb/trunk/src/serperf.cc =================================================================== --- ydb/trunk/src/serperf.cc 2009-02-06 05:00:42 UTC (rev 1171) +++ ydb/trunk/src/serperf.cc 2009-02-06 06:16:46 UTC (rev 1172) @@ -30,7 +30,7 @@ batch.SerializeToOstream(&ss); } long long time = current_time_millis() - start; - double tps = 100 * static_cast<double>(count * batchsize) / time; + double tps = 1000 * static_cast<double>(count * batchsize) / time; cout << "protobuf: " << time << " ms, " << tps << " tps" << endl; } @@ -51,7 +51,7 @@ } } long long time = current_time_millis() - start; - double tps = 100 * static_cast<double>(count * batchsize) / time; + double tps = 1000 * static_cast<double>(count * batchsize) / time; cout << "boost: " << time << " ms, " << tps << " tps" << endl; } @@ -72,7 +72,7 @@ } } long long time = current_time_millis() - start; - double tps = 100 * static_cast<double>(count * batchsize) / time; + double tps = 1000 * static_cast<double>(count * batchsize) / time; cout << "streambuf.sputn: " << time << " ms, " << tps << " tps" << endl; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-06 05:00:49
|
Revision: 1171 http://assorted.svn.sourceforge.net/assorted/?rev=1171&view=rev Author: yangzhang Date: 2009-02-06 05:00:42 +0000 (Fri, 06 Feb 2009) Log Message: ----------- rewrote everything to increase flexibility and reuse Modified Paths: -------------- serialization-bench/trunk/src/Makefile serialization-bench/trunk/src/main.cc Modified: serialization-bench/trunk/src/Makefile =================================================================== --- serialization-bench/trunk/src/Makefile 2009-02-06 00:41:10 UTC (rev 1170) +++ serialization-bench/trunk/src/Makefile 2009-02-06 05:00:42 UTC (rev 1171) @@ -1,7 +1,10 @@ +WTF := wtf +CXX := $(WTF) $(CXX) + all: main main: main.cc test.pb.h test.pb.cc - g++ -O3 -I. -g3 -Wall -o main main.cc test.pb.cc \ + $(CXX) -O3 -I. -g3 -Wall -o main main.cc test.pb.cc \ -lboost_serialization-gcc43-mt -lprotobuf test.pb.cc test.pb.h: test.proto Modified: serialization-bench/trunk/src/main.cc =================================================================== --- serialization-bench/trunk/src/main.cc 2009-02-06 00:41:10 UTC (rev 1170) +++ serialization-bench/trunk/src/main.cc 2009-02-06 05:00:42 UTC (rev 1171) @@ -5,80 +5,152 @@ #include <commons/time.h> #include <fstream> #include <iostream> +#include <sstream> #include <sys/types.h> #include <sys/stat.h> +#include <typeinfo> #include <unistd.h> using namespace boost::archive; using namespace commons; using namespace std; -int main() { - int count = 1e6, reps = 3; - cout << "TIMES" << endl; - for (int r = 0; r < reps; r++) { - ofstream o("raw.out", ios::out); +int cnt = 1e6, reps = 3; + +struct exp { + void time() { + build(); long long start = current_time_millis(); - for (int i = 0; i < count; i++) { + run(); + long long end = current_time_millis(); + cout << name << ": " << end - start << " ms, " << size() << " b" << endl; + } + virtual void build() {} + virtual void run() = 0; + virtual size_t size() { return 0; } // XXX + exp &named(const string &n) { this->name = n; return 
*this; } + string name; +}; + +struct streambuf_exp : virtual exp { + stringbuf b; +}; + +struct ostream_exp : virtual exp { + stringstream o; +}; + +struct ostream_write : virtual ostream_exp { + void run() { + for (int i = 0; i < cnt; ++i) { o.write(reinterpret_cast<char*>(&i), sizeof i); } - long long end = current_time_millis(); - cout << "raw: " << end - start << endl; } - for (int r = 0; r < reps; r++) { - ofstream o("htonl.out", ios::out); - long long start = current_time_millis(); - for (int i = 0; i < count; i++) { - int n = htonl(i); - o.write(reinterpret_cast<char*>(&n), sizeof i); +}; + +struct streambuf_sputn : virtual streambuf_exp { + void run() { + for (int i = 0; i < cnt; ++i) { + b.sputn(reinterpret_cast<char*>(&i), sizeof i); } - long long end = current_time_millis(); - cout << "htonl: " << end - start << endl; } - for (int r = 0; r < reps; r++) { - ofstream o("fstream.out", ios::out); - long long start = current_time_millis(); - for (int i = 0; i < count; i++) { +}; + +struct streambuf_sputn_htonl : virtual streambuf_exp { + void run() { + for (int i = 0; i < cnt; ++i) { int n = htonl(i); - o << n << " "; + b.sputn(reinterpret_cast<char*>(&n), sizeof i); } - long long end = current_time_millis(); - cout << "fstream: " << end - start << endl; } - for (int r = 0; r < reps; r++) { - filebuf b; - b.open("boost.out", ios::out); - binary_oarchive o(b); - long long start = current_time_millis(); - for (int i = 0; i < count; i++) { - o & i; +}; + +struct ostream_fmt : virtual ostream_exp { + void run() { + for (int i = 0; i < cnt; ++i) { + o << i << ' '; } - long long end = current_time_millis(); - cout << "boost: " << end - start << endl; } - for (int r = 0; r < reps; r++) { - ofstream o("protobuf.out", ios::out); - varray a; - long long start = current_time_millis(); - for (int i = 0; i < count; i++) { - a.add_v(i); +}; + +struct streambuf_boost : virtual streambuf_exp { + binary_oarchive a; + streambuf_boost() : a(b) {} + void run() { + for (int i = 
0; i < cnt; ++i) { + a << i; } - a.SerializeToOstream(&o); - long long end = current_time_millis(); - cout << "protobuf: " << end - start << endl; } - for (int r = 0; r < reps; r++) { - ofstream o("manyprotos.out", ios::out); - long long start = current_time_millis(); - for (int i = 0; i < count; i++) { - varray a; +}; + +struct protobuf : virtual exp { + varray a; + void build() { + for (int i = 0; i < cnt; ++i) { a.add_v(i); - a.SerializeToOstream(&o); } - long long end = current_time_millis(); - cout << "many protobufs: " << end - start << endl; } +}; +struct ostream_protobuf : virtual ostream_exp, virtual protobuf { + void run() { a.SerializeToOstream(&o); } +}; + +struct protobuf_string : virtual protobuf { + string s; + void run() { a.SerializeToString(&s); } +}; + +struct protobuf_array : virtual protobuf { + char *s; + protobuf_array() : s(new char[2 * cnt * sizeof(int)]) {} + void run() { a.SerializeToArray(s, 2 * cnt * sizeof(int)); } +}; + +struct protobufs_string : virtual exp { + vector<varray> as; + string s; + protobufs_string() : as(cnt) {} + void build() { + for (int i = 0; i < cnt; ++i) { + as[i].add_v(i); + } + } + void run() { + for (int i = 0; i < cnt; ++i) { + as[i].SerializeToString(&s); + } + } +}; + +struct raw_array : virtual exp { + char *a; + raw_array() : a(new char[cnt * sizeof(int)]) {} + void run() { + int *p = (int*) a; + for (int i = 0; i < cnt; ++i) { + p[i] = i; + } + } +}; + +#define rep(x) \ + for (int r = 0; r < reps; ++r) \ + x().named(#x).time(); + +int main() { + rep(ostream_write); + rep(streambuf_sputn); + rep(streambuf_sputn_htonl); + rep(ostream_fmt); + rep(streambuf_boost); + rep(ostream_protobuf); + rep(protobuf_string); + rep(protobuf_array); + rep(protobufs_string); + rep(raw_array); + +#if 0 cout << "SIZES" << endl; vector<string> paths; paths.push_back("raw.out"); @@ -92,5 +164,7 @@ stat(paths[i].c_str(), &s); cout << paths[i] << ": " << s.st_size << endl; } +#endif + return 0; } This was sent by the 
SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-06 00:41:19
|
Revision: 1170 http://assorted.svn.sourceforge.net/assorted/?rev=1170&view=rev Author: yangzhang Date: 2009-02-06 00:41:10 +0000 (Fri, 06 Feb 2009) Log Message: ----------- hadn't committed in a while; added various projects; added rss feed link Modified Paths: -------------- assorted-site/trunk/index.txt Modified: assorted-site/trunk/index.txt =================================================================== --- assorted-site/trunk/index.txt 2009-02-06 00:40:21 UTC (rev 1169) +++ assorted-site/trunk/index.txt 2009-02-06 00:41:10 UTC (rev 1170) @@ -30,6 +30,9 @@ - JFX Table: an editable table (spreadsheet) widget in [JavaFX] (done) - LZXGrid: an editable table (spreadsheet) widget in [OpenLaszlo] (done) - System utilities + - [GDB Visual Stack Debugger](gdb-stack-viz): a [Python-GDB] extension that + helps visualize the stack; originally designed to aid instruction on stack + smashing (active) - [UDP Prober](udp-prober): small program that logs the RTTs of periodic UDP pings, and an exercise in using [`boost::asio`] (hiatus) - Throttled Repeater: small program that sends a fixed number of lines at a @@ -49,8 +52,9 @@ - [Simple Preprocessor](simple-preprocessor): tiny Scala implementation of the C preprocessor's _object-like macros_ (done) - Tools for various websites/web applications - - [Facebook](facebook-tools): monitor changes in your [Facebook] network - (done) + - [Facebook Tools](facebook-tools): scrape a photo album, utilities for + pyfacebook, and monitor adds/removes in your friend list (done) + - [Google Tools](google-tools): Google Reader archiver (active) - Myspace: crawl [MySpace] profiles within $n$ degrees of you for fast searches (done) - O'Reilly Safari: cache text from the [O'Reilly Safari] online bookshelf for @@ -76,6 +80,11 @@ - Exploration, experimentation, research - [YDB](ydb): simple memory store that serves as a research testbed for approaches to recovery in [VOLTDB] (H-Store) (active) + - [C++ Serialization 
Benchmark](serialization-bench): simple comparison of the + speed and verbosity of various readily-available approaches/libraries for + serializing lots of small ints (active) + - [C++ Containers Benchmark](container-bench): simple analysis of performance + behavior of various readily-available associative containers (active) - TCQ Wavelets: wavelet domain stream query processing for the Data Triage project in TelegraphCQ (done) - [Hash distribution](hash-dist): for observing the distribution of hash @@ -88,6 +97,8 @@ - Sandbox: heap of small test cases to explore (mostly programming language details, bugs, corner cases, features, etc.) (passive) - Miscellaneous + - [Music Labeler](music-labeler): a slick GUI for quickly + labeling/categorizing music - [Mailing List Filter](mailing-list-filter): deal with high-volume mailing lists by filtering your mailbox for threads in which you were a participant (done) @@ -146,17 +157,21 @@ - active: development is happening at a faster pace - abandoned: incomplete; no plans to pick it up again - hiatus: incomplete; plan to resume development +- obsolete: done, but no longer needed due to better alternative approaches Project pages: - [SourceForge Project Page]: view summary, [browse the repository] - [Google Code Page]: download file releases, report bugs/request features - [Google Groups Page]: discussions and support +- [RSS Feed]: monitor this page for changes (by [page2rss.com]) [SourceForge Project Page]: http://sf.net/projects/assorted/ [Google Code Page]: http://code.google.com/p/assorted/ [Google Groups Page]: http://groups.google.com/group/assorted-projects/ [browse the repository]: http://assorted.svn.sourceforge.net/viewvc/assorted/ +[RSS Feed]: http://page2rss.com/rss/e7243f1dd131a476baaf0e346528d962 +[page2rss.com]: http://page2rss.com/ Copyright 2008 [Yang Zhang]. All rights reserved. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-06 00:40:28
|
Revision: 1169 http://assorted.svn.sourceforge.net/assorted/?rev=1169&view=rev Author: yangzhang Date: 2009-02-06 00:40:21 +0000 (Fri, 06 Feb 2009) Log Message: ----------- added to docstring an example of group_as_subseqs Modified Paths: -------------- python-commons/trunk/src/commons/seqs.py Modified: python-commons/trunk/src/commons/seqs.py =================================================================== --- python-commons/trunk/src/commons/seqs.py 2009-02-06 00:39:52 UTC (rev 1168) +++ python-commons/trunk/src/commons/seqs.py 2009-02-06 00:40:21 UTC (rev 1169) @@ -404,6 +404,9 @@ """ Takes a sequence and breaks it up into multiple subsequences, which are groups keyed on L{key}. + + >>> map(lis, group_as_subseqs(range(10), lambda x: x/3)) + [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]] """ xs = iter(xs) while True: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-06 00:40:04
|
Revision: 1168 http://assorted.svn.sourceforge.net/assorted/?rev=1168&view=rev Author: yangzhang Date: 2009-02-06 00:39:52 +0000 (Fri, 06 Feb 2009) Log Message: ----------- - use shared ptrs in st_multichannel - added overload randint(min,max) - tweaked README Modified Paths: -------------- cpp-commons/trunk/README cpp-commons/trunk/src/commons/rand.h cpp-commons/trunk/src/commons/st/st.h Modified: cpp-commons/trunk/README =================================================================== --- cpp-commons/trunk/README 2009-02-06 00:36:31 UTC (rev 1167) +++ cpp-commons/trunk/README 2009-02-06 00:39:52 UTC (rev 1168) @@ -11,7 +11,7 @@ Here are some of the features present in the library: - C functions for string manipulation -- RAII utilities, such as for closing file descriptors +- RAII utilities, such as for closing file descriptors and `finally` objects - `array`: thin wrapper around arrays (`scoped_array` + size) - `pool`: fixed-size object pools - bit manipulation Modified: cpp-commons/trunk/src/commons/rand.h =================================================================== --- cpp-commons/trunk/src/commons/rand.h 2009-02-06 00:36:31 UTC (rev 1167) +++ cpp-commons/trunk/src/commons/rand.h 2009-02-06 00:39:52 UTC (rev 1168) @@ -24,7 +24,7 @@ }; /** - * Generate a random int up to but excluding max. + * Generate a random int from 0 up to but excluding max. * \param[in] max Must be greater than one. */ inline int @@ -98,10 +98,20 @@ // // OK, this path isn't taking us anywhere. Let's go back to the original // formula. Sure, it exceeds max. But what if we just wrap it back around - // when it does? We can do so using %max. Success! + // when it does? We can do so using % max. Success! return static_cast<int>( random() / ( RAND_MAX / max ) % max ); } + + /** + * Use randint() to return a number in [min, max) if max > min + 1 (i.e. if + * there is more than one choice), otherwise return min. 
+ */ + inline int + randint(int min, int max) + { + return max > min + 1 ? min + randint(max - min) : min; + } } #endif Modified: cpp-commons/trunk/src/commons/st/st.h =================================================================== --- cpp-commons/trunk/src/commons/st/st.h 2009-02-06 00:36:31 UTC (rev 1167) +++ cpp-commons/trunk/src/commons/st/st.h 2009-02-06 00:39:52 UTC (rev 1168) @@ -189,6 +189,8 @@ bool b; }; + void toggle(st_bool& b) { if (b) b.reset(); else b.set(); } + /** * Wraps st_mutex_* errno-functions with exceptions and cleans up on * destruction. @@ -223,8 +225,9 @@ empty_.signal(); } T take() { - while (q_.empty()) + while (q_.empty()) { empty_.wait(); + } T x = front(); q_.pop(); return x; @@ -241,24 +244,23 @@ /** * An unbounded FIFO multi-cast channel, i.e. publish-subscribe. - * \todo Use shared_ptr. */ template <typename T> class st_multichannel { public: void push(const T &x) { - foreach (st_channel<T> *q, qs) { + foreach (shared_ptr<st_channel<T> > q, qs) { q->push(x); } } st_channel<T> &subscribe() { - st_channel<T>* q = new st_channel<T>; + shared_ptr<st_channel<T> > q(new st_channel<T>); qs.push_back(q); return *q; } private: - vector<st_channel<T>*> qs; + vector<shared_ptr<st_channel<T> > > qs; }; /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-06 00:36:33
|
Revision: 1167 http://assorted.svn.sourceforge.net/assorted/?rev=1167&view=rev Author: yangzhang Date: 2009-02-06 00:36:31 +0000 (Fri, 06 Feb 2009) Log Message: ----------- - Added epperf benchmark because I couldn't trust ST - Made stperf a bit more flexible - Made epperf a better system - Changed the `run` analysis to produce a bar chart comparing multi and single side by side - Added aggregation filter specifiers to the regex group names - Rearranged test.bash to clean it up - Added notes and more TODOs to the README Modified Paths: -------------- ydb/trunk/README ydb/trunk/tools/analysis.py ydb/trunk/tools/stperf.cc ydb/trunk/tools/test.bash Added Paths: ----------- ydb/trunk/tools/epperf.cc ydb/trunk/tools/epperf.mk Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-06 00:32:30 UTC (rev 1166) +++ ydb/trunk/README 2009-02-06 00:36:31 UTC (rev 1167) @@ -255,12 +255,40 @@ - DONE parallel tcp benchmark - DONE simple wal - issues: + - associative containers: hash is strong contender, array is unbeatable, but + most are close enough + - interested to see how the cache-friendly btree programming competition + pans out + - serialization benchmark + - protobufs very cheap for large messages, and always terse + - may want to look into batching up msgs for faster ser/deser - multi vs single + - some microbenchmarks demonstrate that in fact there is no speedup from + parallel transfers; network is already saturated + - used: socat (mtcp), epperf, stperf + - show scaling results + - 7MB / 300ms = 2.3MB/ms = 23MB/s = 184Mb/s + - slower than expected + - share network with incoming & outgoing leader comm + - still need to understand why the build-up is larger for multi + - scalability graphs: bottleneck is...? + - 4 + 4 + 8 * 5 = 48 B per txn + - one: 25,000txn/s * 48B/txn * 8b/B / 1e6b/Mb = 9Mb/s + - two: 9Mb/s * 2 - WAL performs well - - what to do? limit parallelism? how? 
+ - close to no replication in scalability graphs + - what to do? limit parallelism? how? + - include actual separate clients? -Period: 2/3- +Period: 2/5- +- TODO commit!!! +- TODO serialization bench (multiple layers, control batch sizes) +- TODO network throughput bench +- TODO associative container bench +- TODO combine the analyses of the above three; integrate with actual message + formats, etc. +- TODO batching, serialization, disk speed - TODO better wal - TODO better understand multihost recovery - TODO fix up analysis of multihost recovery Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-02-06 00:32:30 UTC (rev 1166) +++ ydb/trunk/tools/analysis.py 2009-02-06 00:36:31 UTC (rev 1167) @@ -5,11 +5,15 @@ from path import path from os.path import basename, realpath from pylab import * +from matplotlib.font_manager import FontProperties class struct(object): pass def getname(path): return basename(realpath(path)) +def mean(xs): return array(xs).mean() +def last(xs): return xs[-1] + def check(path): with file(path) as f: if 'got timeout' in f.read(): @@ -36,6 +40,15 @@ def logextract(path, indexkey, pats, xform = None): if xform is None: xform = lambda x: x check(path) + # Prepare the regex patterns. + filts = {} + def repl(m): + name, filt = m.group(1), m.group(2) + # Duplicate check. + assert name not in filts, 'Capture %r exists more than once.' % name + filts[name] = filt + return '(?P<%s>' % name + pats = [ re.sub(r'\(\?P<(\w+)\|(\w+)>', repl, pat) for pat in pats ] # Capture values from log using regex pats. 
def getcaps(): with file(path) as f: @@ -47,15 +60,23 @@ for i, pat in enumerate(pats): m = re.search(pat, line) if m: - for k in m.groupdict(): + gd = dict( (k, float(v)) for (k,v) in m.groupdict().iteritems() ) + for k, v in gd.iteritems(): if k in caps: - caps[k + '0'] = caps[k] - caps.update((k, float(v)) for k,v in m.groupdict().iteritems()) + if k in filts: + if type(caps[k]) != list: caps[k] = [caps[k]] + caps[k].append(v) + else: + caps[k + '0'] = caps[k] + else: + caps[k] = v sats[i] = True break if all(sats): sats = [ False for pat in pats ] -# print '!!!' + caps = dict( (k, eval(filts.get(k, 'lambda x: x'))(v)) + for k,v in caps.iteritems() ) + assert all( type(v) != list for v in caps.itervalues() ) yield xform(caps) caps = {} # Aggregate the captured values. @@ -109,6 +130,7 @@ def run(singlepath, multipath): singlepath, multipath = map(path, [singlepath, multipath]) + ress = [] for datpath, titlestr, name in [(singlepath, 'single recoverer', 'single'), (multipath, 'multi recoverer', 'multi')]: def xform(d): @@ -118,55 +140,68 @@ print 'file:', getname(datpath) res = logextract(datpath, 'seqno', [ r'=== seqno=(?P<seqno>\d+) ', - r'got recovery message of (?P<len>\d+) bytes in (?P<dump>\d+) ms: xfer took (?P<recv>\d+) ms, deserialization took (?P<deser>\d+)', - r'built up .* (?P<buildup>\d+) ms', + r'got recovery message of (?P<len|mean>\d+) bytes in (?P<dump|mean>\d+) ms: xfer took (?P<recv|mean>\d+) ms, deserialization took (?P<deser|sum>\d+)', + r'built up .* (?P<buildup|mean>\d+) ms', r'generating recovery took (?P<gen>\d+) ms', r'replayer caught up; from backlog replayed \d+ txns .* in (?P<catchup>\d+) ms', r'.*: recovering node caught up; took (?P<total>\d+) ?ms' ], xform ) + ress.append((datpath, titlestr, name, res)) + seqnos = ress[0][-1]['seqno'] + interval = float(seqnos[1] - seqnos[0]) + xmin, xmax = seqnos.min() - interval / 2, seqnos.max() + interval / 2 + gap = interval / 10 # (xmax - xmin) / len(seqnos) + width = (interval - gap) / 
len(ress) + step = 1. / len(seqnos) # For color. + + for pos, (datpath, titlestr, name, res) in enumerate(ress): # Colors and positioning - width = 5e4 - step = 1.0 / 5 - hues = ( colorsys.hls_to_rgb(step * i, .7, .5) for i in itertools.count() ) + hues = ( colorsys.hls_to_rgb(step * i, .7 - pos * .2, .5) for i in itertools.count() ) ehues = ( colorsys.hls_to_rgb(step * i, .3, .5) for i in itertools.count() ) - widths = ( 2 * width - 2 * width / 5 * i for i in itertools.count() ) - offsets = ( width - 2 * width / 5 * i for i in itertools.count() ) + widths = ( width for i in itertools.count() ) + offsets = ( pos * width for i in itertools.count() ) self = struct() self.bottom = 0 - clf() def mybar(yskey, eskey, label): - bar(res['seqno'] - offsets.next(), res[yskey], yerr = res[eskey], width = - widths.next(), color = hues.next(), edgecolor = (1,1,1), ecolor = - ehues.next(), label = label, bottom = self.bottom) + bar(res['seqno'] - (interval - gap) / 2 + offsets.next(), res[yskey], yerr + = res[eskey], width = widths.next(), color = hues.next(), edgecolor = + (1,1,1), ecolor = ehues.next(), label = name + ' ' + label, bottom = + self.bottom) self.bottom += res[yskey] - mybar('realdump mean', 'realdump sd', 'State dump etc.') - mybar('recv mean', 'recv sd', 'State receive') - mybar('deser mean', 'deser sd', 'State deserialization') - mybar('buildup mean', 'buildup sd', 'Build-up') - mybar('catchup mean', 'catchup sd', 'Catch-up') + mybar('realdump mean', 'realdump sd', 'state dump etc.') + mybar('recv mean', 'recv sd', 'state receive') + mybar('deser mean', 'deser sd', 'state deserialization') + mybar('buildup mean', 'buildup sd', 'build-up') + mybar('catchup mean', 'catchup sd', 'catch-up') - title('Recovery time of ' + titlestr + ' over data size') - xlabel('Transaction count (corresponds roughly to data size)') - ylabel('Mean time in ms (SD error bars)') - legend(loc = 'upper left') + title('Recovery time of ' + titlestr + ' over data size') + 
xlabel('Transaction count (corresponds roughly to data size)') + ylabel('Mean time in ms (SD error bars)') + legend(loc = 'upper left', prop = FontProperties(size = 'small')) - ax2 = twinx() - col = colorsys.hls_to_rgb(.6, .4, .4) - ax2.errorbar(res['seqno'], res['len mean'] / 1024, res['len sd'] / 1024, marker = 'o', - color = col) - ax2.set_ylabel('Size of serialized state (KB)', color = col) - ax2.set_ylim(ymin = 0) - for tl in ax2.get_yticklabels(): tl.set_color(col) - xlim(xmin = min(res['seqno']) - width, xmax = max(res['seqno']) + width) + ax2 = twinx() + colors = ( colorsys.hls_to_rgb(.6, .6 - i * .2, .4) for i, _ in enumerate(ress) ) + for _, _, name, res in ress: + ax2.errorbar(res['seqno'], res['len mean'] / 1024, res['len sd'] / 1024, + marker = 'o', color = colors.next(), + label = name + ' msg size') + ax2.set_ylabel('Size of serialized state (KB)') + ax2.set_ylim(ymin = 0) +# for tl in ax2.get_yticklabels(): tl.set_color(col) + xlim(xmin = xmin, xmax = xmax) + legend(loc = 'upper center', prop = FontProperties(size = 'small')) + if False: pngpath = datpath.realpath() + '.png' savefig(pngpath) symlink = path(name + '.png') if symlink.isfile(): symlink.remove() pngpath.symlink(symlink) + else: + savefig('run.png') def mtcp(datpath): def xform(d): @@ -182,6 +217,7 @@ title('Scaling of sending large message using socat (6888896 bytes)') xlabel('Number of parallel senders') ylabel('Speedup') + ylim(ymin = 0) savefig('mtcp.png') def stperf(datpath): @@ -198,6 +234,7 @@ title('Scaling of sending large message using ST (6888896 bytes)') xlabel('Number of parallel senders') ylabel('Speedup') + ylim(ymin = 0) savefig('stperf.png') def main(argv): Added: ydb/trunk/tools/epperf.cc =================================================================== --- ydb/trunk/tools/epperf.cc (rev 0) +++ ydb/trunk/tools/epperf.cc 2009-02-06 00:36:31 UTC (rev 1167) @@ -0,0 +1,188 @@ +#include <fcntl.h> +#include <stdio.h> +#include <sys/epoll.h> +#include <unistd.h> + +#include 
<iostream> +#include <cstdlib> + +#include <commons/check.h> +#include <commons/closing.h> +#include <commons/pool.h> +#include <commons/sockets.h> +#include <boost/scoped_array.hpp> + +using namespace boost; +using namespace commons; +using namespace std; + +enum { size = 100000000 }; +int n, expected; + +class echoer { + public: + echoer() : buf(new char[4096]) {} + + /** + * \return true iff we are not done with the reading/would've blocked + * (EAGAIN), false iff we've gotten the full 40-byte packet or have hit + * EOF/an error. + */ + bool consume() { + while (true) { + int bytes = ::read(fd_, buf.get(), 4096); + if (bytes == -1) { + // We're going to block. + if (errno == EAGAIN) { + return true; + } else { + perror("read"); + return false; + } + } + if (bytes == 0) { + return false; + } + ss_ << string(buf.get(), bytes); + //if (ss_.tellp() >= expected) + //return false; + } + } + + /** + * Read the contents of the buffer as a string. + */ + string read() { return ss_.str(); } + + /** + * The socket file descriptor we're currently associated with. + */ + int & fd() { return fd_; } + int fd() const { return fd_; } + + private: + stringstream ss_; + int fd_; + scoped_array<char> buf; // (new char[4096]); +}; + +int +main(int argc, char* argv[]) { + if (argc < 2) { return 1; } + n = atoi(argv[1]); + expected = size / n; + + // Create a non-blocking server socket. + int server = tcp_listen(9876, true); + + // Make sure the fd is finally closed. + closingfd closer(server); + + // Create our epoll file descriptor. max_events is the maximum number of + // events to process at a time (max number of events that we want a call to + // epoll_wait() to "return"), while max_echoers is the max number of + // connections to make. + const int max_events = 16, max_echoers = 100; + + // This file descriptor isn't actually bound to any socket; it's a special fd + // that is really just used for manipulating the epoll (e.g., registering + // more sockets/connections with it). 
TODO: Figure out the rationale behind + // why this thing is an fd. + int epoll_fd = checknneg(epoll_create(max_events)); + + // Add our server fd to the epoll event loop. The event specifies: + // + // - what fd is + // - what operations we're interested in (connections, hangups, errors) + // (TODO: what are hangups?) + // - arbitrary data to be associated with this fd, in the form of a pointer + // (ptr) or number (u32/u64); this is more useful for connection fd's, of + // which there are multiple, and so it helps to have a direct pointer to + // (say) that connection's handler. + // + // The add operation actually makes a copy of the given epoll_event, so + // that's why we can reuse this `event` later. + struct epoll_event event; + event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; + event.data.fd = server; + checknneg(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server, &event)); + + // Set up a bunch of echo server instances. + pool<echoer> echoers(max_echoers); + + // Execute the epoll event loop. + int ncons = 0; + while (ncons < n) { + struct epoll_event events[max_events]; + int num_fds = epoll_wait(epoll_fd, events, max_events, -1); + + for (int i = 0; i < num_fds; i++) { + // Case 1: Error condition. + if (events[i].events & (EPOLLHUP | EPOLLERR)) { + fputs("epoll: EPOLLERR", stderr); + // epoll will remove the fd from its set automatically when the fd is + // closed. + close(events[i].data.fd); + } else { + check(events[i].events & EPOLLIN); + + // Case 2: Our server is receiving a connection. + if (events[i].data.fd == server) { + struct sockaddr remote_addr; + socklen_t addr_size = sizeof remote_addr; + int connection = accept(server, &remote_addr, &addr_size); + if (connection == -1) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + perror("accept"); + } + continue; + } + + // Make the connection non-blocking. + checknneg(fcntl(connection, F_SETFL, + O_NONBLOCK | fcntl(connection, F_GETFL, 0))); + + // Add the connection to our epoll loop. 
Note we are reusing our + // epoll_event. Now we're actually using the ptr field to point to a + // free handler. event.data is a union of {ptr, fd, ...}, so we can + // only use one of these. event.data is entirely for the user; epoll + // doesn't actually look at this. Note that we're passing the fd + // (connection) separately into epoll_ctl(). + echoer *e = echoers.take(); + cout << "got a connection! " << + echoers.size() << " echoers remaining" << endl; + event.data.ptr = e; + e->fd() = connection; + checknneg(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, connection, + &event)); + } + + // Case 3: One of our connections has read data. + else { + echoer &e = *((echoer*) events[i].data.ptr); + // If we have read the minimum amount (or encountered a dead-end + // situation), then echo the data back. + if (!e.consume()) { + cout << "done!" << endl; +// // Write back! +// string s = e.read(); +// check((size_t) checknneg(write(e.fd(), s.c_str(), s.size())) == s.size()); +// +// // epoll will remove the fd from its set automatically when the fd is +// // closed. + close(e.fd()); +// +// // Release the echoer. 
+// echoers.drop(&e); +// +// cout << "responded with '" << e.read() << "'; " << +// echoers.size() << " echoers remaining" << endl; + ++ncons; + } + } + } + } + } + + return 0; +} Added: ydb/trunk/tools/epperf.mk =================================================================== --- ydb/trunk/tools/epperf.mk (rev 0) +++ ydb/trunk/tools/epperf.mk 2009-02-06 00:36:31 UTC (rev 1167) @@ -0,0 +1,9 @@ +CXXFLAGS += -O3 -Wall +CXX := $(WTF) $(CXX) + +all: epperf + +clean: + rm -f epperf + +.PHONY: clean all Modified: ydb/trunk/tools/stperf.cc =================================================================== --- ydb/trunk/tools/stperf.cc 2009-02-06 00:32:30 UTC (rev 1166) +++ ydb/trunk/tools/stperf.cc 2009-02-06 00:36:31 UTC (rev 1167) @@ -12,8 +12,9 @@ using namespace commons; using namespace std; -enum { port = 9876, size = 100000000 }; // 100mb +enum { size = 100000000 }; // 100mb char *rbuf, *sbuf, *host; +short port; bool do_r, do_s; int n, my_i; @@ -28,6 +29,7 @@ if (do_r) { vector<st_thread_t> ts; st_netfd_t l = st_tcp_listen(port); + // XXX: bug: the ordering here (the value of i) should be specified by the connector. 
for (int i = 0; i < n; i++) { st_netfd_t c = st_accept(l, 0, 0, -1); ts.push_back(st_spawn(boost::bind(rr, i, c))); @@ -47,16 +49,18 @@ int main(int argc, char **argv) { host = strdup("localhost"); n = 1; + port = 9876; int opt; - while ((opt = getopt(argc, argv, "i:n:rs:")) != -1) { + while ((opt = getopt(argc, argv, "i:n:p:rs:")) != -1) { switch (opt) { case 'i': my_i = atoi(optarg); break; case 'n': n = atoi(optarg); break; + case 'p': port = atoi(optarg); break; case 'r': do_r = true; break; case 's': do_s = true; host = strdup(optarg); break; } } - cout << "n=" << n << " i=" << my_i + cout << "n=" << n << " i=" << my_i << " port=" << port << " start=" << bstart(my_i) << " end=" << bend(my_i) << endl; if (!(do_r || do_s)) do_r = do_s = true; rbuf = new char[size]; Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-02-06 00:32:30 UTC (rev 1166) +++ ydb/trunk/tools/test.bash 2009-02-06 00:36:31 UTC (rev 1167) @@ -7,6 +7,41 @@ script="$(basename "$0")" +# +# Configuration +# + +# Set up hosts. +confighosts() { + if [[ ! ${remote:-} ]] ; then + if [[ ! "${hosts:-}" && ! "${range:-}" ]] + then range='1 14'; echo "warning: running with farms 1..14" 1>&2 + fi + if [[ "${range:-}" ]] + then hosts="$( seq $range | sed 's/^/farm/' )" + fi + hosts="$( echo -n $hosts )" + fi +} + +# Set up logger. +configlogger() { + if [[ ! ${remote:-} ]] ; then + ( + flock -n /tmp/ydbtest.socket + ) > /tmp/y + fi +} + +# +# Utilities +# + +# Use mssh to log in with password as root to each machine. 
+mssh-root() { + mssh -l root "$@" +} + tagssh() { ssh "$@" 2>&1 | python -u -c ' import time, sys, socket @@ -28,6 +63,38 @@ fi } +remote() { + local host="$1" + shift + scp -q "$(dirname "$0")/$script" "$host:" + tagssh "$host" "remote=1 ./$script" "$@" +} + +parhosts() { + echo -n $hosts | xargs ${xargs--P9} -d' ' -I^ "$@" +} + +parssh() { + parhosts ssh ^ "set -o errexit -o nounset; $@" +} + +parscp() { + parhosts scp -q "$@" +} + +parremote() { + export hosts range + parhosts "./$script" remote ^ "$@" +} + +hostargs() { + "$@" $hosts +} + +# +# Setup +# + node-init-setup() { check-remote mkdir -p work @@ -130,30 +197,6 @@ make WTF= } -remote() { - local host="$1" - shift - scp -q "$(dirname "$0")/$script" "$host:" - tagssh "$host" "remote=1 ./$script" "$@" -} - -parhosts() { - echo -n $hosts | xargs ${xargs--P9} -d' ' -I^ "$@" -} - -parssh() { - parhosts ssh ^ "set -o errexit -o nounset; $@" -} - -parscp() { - parhosts scp -q "$@" -} - -parremote() { - export hosts range - parhosts "./$script" remote ^ "$@" -} - init-setup() { parremote node-init-setup } @@ -205,12 +248,21 @@ parssh make -C /tmp/ -f stperf.mk } +setup-epperf() { + parscp epperf.{cc,mk} ^:/tmp/ + parssh make -C /tmp/ -f epperf.mk +} + full-setup() { init-setup setup-deps setup-ydb } +# +# Status +# + hostinfos() { xargs= parssh " echo @@ -230,64 +282,28 @@ " } -hostargs() { - "$@" $hosts +times() { + parssh date +%s.%N } -scaling-helper() { - local leader=$1 - shift - tagssh $leader "ydb/src/ydb -l -n $# -X 100000 ${extraargs:-}" & - sleep .1 - for rep in "$@" - do tagssh $rep "ydb/src/ydb -n $# -H $leader ${extraargs:-}" & - done - wait -} +# +# Experiments involving ydb recovery (varying amount of data). +# -# This just tests how the system scales; no recovery involved. -scaling() { - hostargs scaling-helper -} - -# Repeat some experiment some number of trials and for some number of range -# configurations; e.g., "repeat scaling". 
-# TODO: fix this to work also with `hosts`; move into repeat-helper that's run -# via hostargs, and change the range= to hosts= -exp-scaling() { - local out=scaling-log-$(date +%Y-%m-%d-%H:%M:%S-%N) - local orighosts="$hosts" maxn=$(( $(echo $hosts | wc -w) - 1 )) - ln -sf $out scaling-log - for n in `seq $maxn -1 0` ; do # configurations - stop - for i in {1..3} ; do # trials - echo === n=$n i=$i === - echo === n=$n i=$i === > `tty` - scaling - sleep 1 - stop - sleep .1 - echo - done - hosts="${hosts% *}" - done >& $out - hosts="$orighosts" -} - rec-helper() { local leader=$1 shift : ${seqno:=100000} - tagssh $leader "ydb/src/ydb -l -x --accept-joiner-seqno $seqno -n $(( $# - 1 )) -o 1 -O 1 ${extraargs:-}" & # -v --debug-threads - sleep .1 # pexpect 'waiting for at least' + tagssh $leader "ydb/src/ydb -l -x --accept-joiner-seqno $seqno -n $(( $# - 1 )) -o 1 -O 1 ${extraargs:-}" & + sleep .1 # Run initial replicas. while (( $# > 1 )) ; do tagssh $1 "ydb/src/ydb -H $leader" & shift done - sleep .1 # pexpect 'got all \d+ replicas' leader + sleep .1 # Run joiner. - tagssh $1 "ydb/src/ydb -H $leader --yield-build-up --yield-catch-up ${extraargs:-}" & # -v --debug-threads -t 200000" & + tagssh $1 "ydb/src/ydb -H $leader --yield-build-up --yield-catch-up ${extraargs:-}" & if false ; then if [[ ${wait2:-} ]] then sleep $wait2 @@ -297,11 +313,8 @@ fi wait } +rec() { hostargs rec-helper ; } -rec() { - hostargs rec-helper -} - # Recovery experient. exp-rec() { for seqno in 500000 400000 300000 200000 100000 ; do # configurations @@ -332,62 +345,105 @@ extraargs="-m ${extraargs:-}" exp-rec >& $out } -# WAL. 
-aries() { - extraargs='--wal' scaling ${hosts:-} +stop-helper() { + tagssh $1 'pkill -sigint ydb' } -exp-aries() { - local out=aries-log-$(date +%Y-%m-%d-%H:%M:%S) - ln -sf $out aries-log - for i in {1..3} ; do - echo === n=-1 i=$i === - echo === n=-1 i=$i === > `tty` - aries - echo - done >& $out +stop() { + hostargs stop-helper } -mtcp-helper() { - local leader=$1 n=$(( $# - 1 )) - tagssh $leader 'pkill socat' - shift - for i in `seq $n` ; do - tagssh $1 " - sleep .2 - ( time seq $((1000000/n)) | socat - TCP4:$leader:$((9876+i)) ) 2>&1 | - fgrep real" & - shift +kill-helper() { + for i in "$@" + do tagssh $i 'pkill ydb' done - tagssh $leader " - for i in \`seq $n\` ; do - socat TCP4-LISTEN:\$((9876+i)),reuseaddr - > /dev/null & - done - wait" - wait } -mtcp() { - hostargs mtcp-helper +kill() { + hostargs kill-helper } -exp-mtcp() { - local out=mtcp-log-$(date +%Y-%m-%d-%H:%M:%S-%N) +# +# Experiments varying number of nodes (no recovery). +# + +# Repeat some experiment some number of trials and for varying numbers of hosts. +exp-var() { + local name=$1 cmd=$1 stop=${2:-} + local out=$name-log-$(date +%Y-%m-%d-%H:%M:%S-%N) local orighosts="$hosts" maxn=$(( $(echo $hosts | wc -w) - 1 )) - ln -sf $out mtcp-log - for n in `seq $maxn -1 1` ; do # configurations + ln -sf $out $name-log + for n in `seq $maxn -1 ${minn:-1}` ; do # configurations + $stop for i in {1..3} ; do # trials echo === n=$n i=$i === - echo === n=$n i=$i === > `tty` - mtcp + $cmd sleep 1 + if [[ $stop ]] + then $stop; sleep .1 + fi echo done hosts="${hosts% *}" - done >& $out + done 2>&1 | tee $out hosts="$orighosts" } +# ydb scalability test. 
+scaling-helper() { + local leader=$1 + shift + tagssh $leader "ydb/src/ydb -l -n $# -X 100000 ${extraargs:-}" & + sleep .1 + for rep in "$@" + do tagssh $rep "ydb/src/ydb -n $# -H $leader ${extraargs:-}" & + done + wait +} +scaling() { hostargs scaling-helper ; } +exp-scaling() { minn=0 exp-var scaling stop ; } + +# socat app +mtcp-helper() { + local leader=$1 n=$(( $# - 1 )) + tagssh $leader 'pkill socat || true' + shift + tagssh $leader " + for i in \`seq $n\` ; do + socat TCP4-LISTEN:\$((9876+i)),reuseaddr - > /dev/null & + done + wait" & + sleep .1 + { + time { + for i in `seq $n` ; do + tagssh $1 " + if false; then + time python -c ' +import socket, sys +sys.stdout.flush() +host, port, i, n = [sys.argv[1]] + map(int, sys.argv[2:]) +s = socket.socket() +s.connect((host, port)) +# For baseline benchmarking (how long it takes to generate the msg) +# sys.exit(1 == ord(chr(1)*(100000000)[-1])) +s.send(chr(1)*(100000000/n))' $leader $((9876+i)) $((i-1)) $n + elif true ; then + time dd bs=10000 if=/dev/zero count=$((100000000/10000/n)) | + socat - TCP4:$leader:$((9876+i)) + elif false ; then + time /tmp/stperf -s $leader -p $((9876+i)) -i $((i-1)) -n $n + fi" & + shift + done + wait + } + } 2>&1 | fgrep real +} +mtcp() { hostargs mtcp-helper ; } +exp-mtcp() { exp-var mtcp ; } + +# ST app stperf-helper() { local leader=$1 n=$(( $# - 1 )) shift @@ -402,78 +458,49 @@ done wait } +stperf() { hostargs stperf-helper ; } +exp-stperf() { exp-var stperf ; } -stperf() { - hostargs stperf-helper -} - -exp-stperf() { - local out=stperf-log-$(date +%Y-%m-%d-%H:%M:%S-%N) - local orighosts="$hosts" maxn=$(( $(echo $hosts | wc -w) - 1 )) - ln -sf $out stperf-log - for n in `seq $maxn` ; do # configurations - for i in {1..3} ; do # trials - echo === n=$n i=$i === - echo === n=$n i=$i === > `tty` - stperf - sleep 1 - echo - done - hosts="${hosts% *}" - done >& $out - hosts="$orighosts" -} - -stop-helper() { - tagssh $1 'pkill -sigint ydb' -} - -stop() { - hostargs stop-helper -} - 
-kill-helper() { - for i in "$@" - do tagssh $i 'pkill ydb' +# epoll app +epperf-helper() { + local leader=$1 n=$(( $# - 1 )) + shift + parssh "pkill epperf || true" + tagssh $leader "/tmp/epperf $n > /dev/null" & + sleep .1 + for i in `seq $n` ; do + tagssh $1 " + ( time /tmp/stperf -s $leader -i $((i-1)) -n $n ) 2>&1 | + fgrep real" & + shift done + wait } +epperf() { hostargs epperf-helper ; } +exp-epperf() { exp-var epperf ; } -kill() { - hostargs kill-helper -} +# +# WAL experiments. +# -times() { - parssh date +%s.%N +aries() { + extraargs='--wal' scaling ${hosts:-} } -# Use mssh to log in with password as root to each machine. -mssh-root() { - mssh -l root "$@" +exp-aries() { + local out=aries-log-$(date +%Y-%m-%d-%H:%M:%S) + ln -sf $out aries-log + for i in {1..3} ; do + echo === n=-1 i=$i === + echo === n=-1 i=$i === > `tty` + aries + echo + done >& $out } -# Set up hosts. -confighosts() { - if [[ ! ${remote:-} ]] ; then - if [[ ! "${hosts:-}" && ! "${range:-}" ]] - then range='1 14'; echo "warning: running with farms 1..14" 1>&2 - fi - if [[ "${range:-}" ]] - then hosts="$( seq $range | sed 's/^/farm/' )" - fi - hosts="$( echo -n $hosts )" - fi -} +# +# Main +# -# Set up logger. -configlogger() { - if [[ ! ${remote:-} ]] ; then - ( - flock -n /tmp/ydbtest.socket - ) > /tmp/y - fi -} - confighosts -#configlogger - eval "$@" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-06 00:32:37
|
Revision: 1166 http://assorted.svn.sourceforge.net/assorted/?rev=1166&view=rev Author: yangzhang Date: 2009-02-06 00:32:30 +0000 (Fri, 06 Feb 2009) Log Message: ----------- - cleaned up the makefile a bit - cleaned up and added some more info to the thread profiling - added TxnBatch - added serperf benchmark Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ydb.proto Added Paths: ----------- ydb/trunk/src/serperf.cc Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-05 09:41:13 UTC (rev 1165) +++ ydb/trunk/src/Makefile 2009-02-06 00:32:30 UTC (rev 1166) @@ -25,28 +25,38 @@ ifneq ($(GCOV),) GCOV := -fprofile-arcs -ftest-coverage endif -LDFLAGS := -pthread -lstx -lst -lresolv -lprotobuf -lgtest \ +ifneq ($(PPROF),) + PPROF := -lprofiler +endif +ifneq ($(OPT),) + OPT := -O3 +else + OPT := -g3 +endif +CXX := $(WTF) $(CXX) +LDFLAGS := -pthread $(GPROF) +LDLIBS := -lstx -lst -lresolv -lprotobuf -lgtest \ -lboost_program_options-gcc43-mt -lboost_thread-gcc43-mt \ - -lboost_serialization-gcc43-mt $(GPROF) + -lboost_serialization-gcc43-mt $(PPROF) # The -Wno- warnings are for boost. 
-CXXFLAGS := -g3 -pthread $(GPROF) -Wall -Werror -Wextra -Woverloaded-virtual \ +CXXFLAGS := $(OPT) -pthread $(GPROF) -Wall -Werror -Wextra -Woverloaded-virtual \ -Wconversion -Wno-conversion -Wno-ignored-qualifiers \ -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \ -Winit-self -Wsign-promo -Wno-unused-parameter -Wc++0x-compat \ -Wparentheses -Wmissing-format-attribute -Wfloat-equal \ - -Winline -Wsynth -PBCXXFLAGS := -g3 -Wall -Werror $(GPROF) + -Winline -Wsynth $(CXXFLAGS) +PBCXXFLAGS := $(OPT) -Wall -Werror $(GPROF) all: $(TARGET) $(TARGET): $(OBJS) - $(CXX) -o $@ $^ $(LDFLAGS) + $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -o $@ %.o: %.cc $(PBHDRS) - $(WTF) $(CXX) $(CXXFLAGS) -c -o $@ $< + $(COMPILE.cc) $(OUTPUT_OPTION) $< %.o: %.pb.cc %.pb.h - $(WTF) $(CXX) $(PBCXXFLAGS) -c -o $@ $< + $(CXX) -c $(PBCXXFLAGS) $(OUTPUT_OPTION) $< %.cc: %.lzz lzz -hx hh -sx cc -hl -sl -hd -sd $< @@ -72,3 +82,10 @@ .PHONY: clean .SECONDARY: $(SRCS) $(HDRS) $(OBJS) main.lzz + +### + +serperf: serperf.o ydb.o + $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -o $@ + +# serperf.cc ydb.pb.h \ No newline at end of file Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-05 09:41:13 UTC (rev 1165) +++ ydb/trunk/src/main.lzz.clamp 2009-02-06 00:32:30 UTC (rev 1166) @@ -1125,7 +1125,6 @@ } else if (sig == SIGUSR1) { toggle(do_pause); } - //break; } } @@ -1291,24 +1290,29 @@ my_spawn(memmon, "memmon"); } + long long start = thread_start_time = current_time_millis(); // At the end, print thread profiling information. 
finally f(lambda() { if (profile_threads) { + long long end = current_time_millis(); + long long all = end - __ref(start); cout << "thread profiling results:" << endl; - long long total; + long long total = 0; typedef pair<st_thread_t, long long> entry; foreach (entry p, threadtimes) { - const string &name = threadname(p.first); - if (name != "main" && name != "handle_sig_sync") - total += p.second; + total += p.second; } + cout << "total " << total << " all " << all << endl; foreach (entry p, threadtimes) { - const string &name = threadname(p.first); - if (name != "main" && name != "handle_sig_sync") - cout << "- " << threadname(p.first) << ": " << p.second - << " (" << (static_cast<double>(p.second) / total) << "%)" - << endl; + cout << "- " << threadname(p.first) << ": " << p.second << " ms (" + << (static_cast<double>(p.second) / total) << "% of total, " + << (static_cast<double>(p.second) / all) << "% of all)" << endl; } + cout << "- total: " << total << " ms (" << double(total) / all + << "% of all)" << endl; + cout << "- unaccounted: " << all - total << " ms (" + << double(all - total) / all << "% of all)" << endl; + cout << "- all: " << all << " ms" << endl; } }); Added: ydb/trunk/src/serperf.cc =================================================================== --- ydb/trunk/src/serperf.cc (rev 0) +++ ydb/trunk/src/serperf.cc 2009-02-06 00:32:30 UTC (rev 1166) @@ -0,0 +1,80 @@ +#include <iostream> +#include <sstream> +#include <commons/time.h> +#include "ydb.pb.h" +#include <boost/archive/binary_oarchive.hpp> + +using namespace boost::archive; +using namespace std; +using namespace commons; + +int main(int argc, char **argv) { + const int count = atoi(argv[1]), batchsize = atoi(argv[2]); + + TxnBatch batch; + for (int i = 0; i < batchsize; ++i) { + Txn &txn = *batch.add_txn(); + txn.set_seqno(i); + for (int j = 0; j < 5; j++) { + Op *op = txn.add_op(); + op->set_key(j); + op->set_value(-j); + op->set_type(Op::read); + } + } + + { + long long start = 
current_time_millis(); + for (int i = 0; i < count; ++i) { + stringstream ss; + batch.SerializeToOstream(&ss); + } + long long time = current_time_millis() - start; + double tps = 100 * static_cast<double>(count * batchsize) / time; + cout << "protobuf: " << time << " ms, " << tps << " tps" << endl; + } + + { + long long start = current_time_millis(); + for (int i = 0; i < count; ++i) { + stringbuf sb; + binary_oarchive oa(sb); + for (int j = 0; j < batchsize; ++j) { + const Txn &txn = batch.txn(j); + int seqno = txn.seqno(); + oa << seqno; + for (int k = 0; k < 5; ++k) { + const Op &op = txn.op(k); + int key = op.key(), value = op.value(), type = op.value(); + oa << key << value << type; + } + } + } + long long time = current_time_millis() - start; + double tps = 100 * static_cast<double>(count * batchsize) / time; + cout << "boost: " << time << " ms, " << tps << " tps" << endl; + } + + { + long long start = current_time_millis(); + for (int i = 0; i < count; ++i) { + stringbuf sb; + for (int j = 0; j < batchsize; ++j) { + const Txn &txn = batch.txn(j); +#define write(x) { typeof(x) __x = x; sb.sputn((char*)(&__x), sizeof __x); } + write(txn.seqno()); + for (int k = 0; k < 5; ++k) { + const Op &op = txn.op(k); + write(op.key()); + write(op.value()); + write(op.type()); + } + } + } + long long time = current_time_millis() - start; + double tps = 100 * static_cast<double>(count * batchsize) / time; + cout << "streambuf.sputn: " << time << " ms, " << tps << " tps" << endl; + } + + return 0; +} Modified: ydb/trunk/src/ydb.proto =================================================================== --- ydb/trunk/src/ydb.proto 2009-02-05 09:41:13 UTC (rev 1165) +++ ydb/trunk/src/ydb.proto 2009-02-06 00:32:30 UTC (rev 1166) @@ -75,3 +75,7 @@ // into action. message Ready { } + +message TxnBatch { + repeated Txn txn = 1; +} \ No newline at end of file This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-05 09:41:19
|
Revision: 1165 http://assorted.svn.sourceforge.net/assorted/?rev=1165&view=rev Author: yangzhang Date: 2009-02-05 09:41:13 +0000 (Thu, 05 Feb 2009) Log Message: ----------- - added a simple analysis (plotting) tool - added some sample data Added Paths: ----------- serialization-bench/trunk/data/ serialization-bench/trunk/data/log serialization-bench/trunk/tools/ serialization-bench/trunk/tools/analysis.py Added: serialization-bench/trunk/data/log =================================================================== --- serialization-bench/trunk/data/log (rev 0) +++ serialization-bench/trunk/data/log 2009-02-05 09:41:13 UTC (rev 1165) @@ -0,0 +1,26 @@ +TIMES +raw: 50 +raw: 50 +raw: 51 +htonl: 51 +htonl: 52 +htonl: 51 +fstream: 251 +fstream: 250 +fstream: 248 +boost: 42 +boost: 42 +boost: 42 +protobuf: 48 +protobuf: 69 +protobuf: 47 +many protobufs: 334 +many protobufs: 337 +many protobufs: 338 +SIZES +raw.out: 4000000 +htonl.out: 4000000 +fstream.out: 10982555 +boost.out: 4000039 +protobuf.out: 3983488 +manyprotos.out: 3983488 Copied: serialization-bench/trunk/tools/analysis.py (from rev 1160, container-bench/trunk/tools/analysis.py) =================================================================== --- serialization-bench/trunk/tools/analysis.py (rev 0) +++ serialization-bench/trunk/tools/analysis.py 2009-02-05 09:41:13 UTC (rev 1165) @@ -0,0 +1,57 @@ +#!/usr/bin/env python + +from __future__ import with_statement +import sys, itertools, colorsys +from pylab import * + +# Parse the data. +pairs = ( line.split(': ') + for line in itertools.takewhile(lambda line: 'SIZES' not in line, + sys.stdin) + if ': ' in line ) +pairs = sorted( (a, int(b.split()[0])) for (a,b) in pairs ) + +# Aggregate the data. +def f(): + for key, group in itertools.groupby(pairs, lambda (a,b): a): + print key + a = array([b for a,b in group]) + yield key, a.mean(), a.std() + +# Sort the data into groups (based on type of operation). 
+pairs = sorted(f()) +groups = map(lambda (key, group): (key, list(group)), + itertools.groupby(pairs, lambda x: 0)) + +# Prepare plotting data. +labels, means, sds = zip(*pairs) +xs = arange(len(labels))+.5 +width = .5 + +# Prepare plotting parameters. +step = 1.0 / len(groups) +colors = ( colorsys.hls_to_rgb(step * i, .7, .5) for i in itertools.count() ) +ecolors = ( colorsys.hls_to_rgb(step * i, .3, .5) for i in itertools.count() ) + +# Plot! +start = 0 +for key, group in groups: + stop = start + len(group) + col = colors.next() + bar(xs[start:stop], means[start:stop], yerr = sds[start:stop], width = width, + color = col, edgecolor = col, ecolor = ecolors.next()) + start = stop + +# Annotate. +title("Performance of serialization techniques/libraries") +ylabel("Time (ms)") +xlim(0, xs[-1] + width*2) +xticks(xs + width/2, labels, rotation = 90) +# Make space for the vertically rotated labels. +gcf().subplots_adjust(bottom = .3) +# Show only bottom/left ticks. +gca().get_xaxis().tick_bottom() +gca().get_yaxis().tick_left() + +# Output. +savefig('results.png') Property changes on: serialization-bench/trunk/tools/analysis.py ___________________________________________________________________ Added: svn:executable + * Added: svn:mergeinfo + This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-04 10:44:06
|
Revision: 1164 http://assorted.svn.sourceforge.net/assorted/?rev=1164&view=rev Author: yangzhang Date: 2009-02-04 10:44:03 +0000 (Wed, 04 Feb 2009) Log Message: ----------- - Added stperf benchmark, showing no speedup (and in fact some slow-down) as we scale up the number of parallel senders. - Changed nc to the more reliable socat (after encountering problems with bg-ing nc) - Changed the scaling plots to depict speedup rather than time. Modified Paths: -------------- ydb/trunk/tools/analysis.py ydb/trunk/tools/test.bash Added Paths: ----------- ydb/trunk/tools/stperf.cc ydb/trunk/tools/stperf.mk Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-02-04 09:26:12 UTC (rev 1163) +++ ydb/trunk/tools/analysis.py 2009-02-04 10:44:03 UTC (rev 1164) @@ -169,15 +169,37 @@ pngpath.symlink(symlink) def mtcp(datpath): + def xform(d): + d['s'] = 1. / d['t'] + return d res = logextract(datpath, 'n', [ r'=== n=(?P<n>\d+)', - r'real\s+0m(?P<t>[0-9\.]+)s' ]) - errorbar(res['n'], res['t mean'], res['t sd']) - title('Time to send a large message (6888896 bytes)') + r'real\s+0m(?P<t>[0-9\.]+)s' ], + xform) + errorbar(res['n'], + res['s mean']/res['s mean'][0], + res['s sd']/res['s mean'][0]) + title('Scaling of sending large message using socat (6888896 bytes)') xlabel('Number of parallel senders') - ylabel('Time (ms)') + ylabel('Speedup') savefig('mtcp.png') +def stperf(datpath): + def xform(d): + d['s'] = 1. 
/ d['t'] + return d + res = logextract(datpath, 'n', + [ r'=== n=(?P<n>\d+)', + r'real\s+0m(?P<t>[0-9\.]+)s' ], + xform) + errorbar(res['n'], + res['s mean']/res['s mean'][0], + res['s sd']/res['s mean'][0]) + title('Scaling of sending large message using ST (6888896 bytes)') + xlabel('Number of parallel senders') + ylabel('Speedup') + savefig('stperf.png') + def main(argv): if len(argv) <= 1: print >> sys.stderr, 'Must specify a command' @@ -187,6 +209,8 @@ run(*argv[2:] if len(argv) > 2 else ['single-log', 'multi-log']) elif argv[1] == 'mtcp': mtcp('mtcp-log') + elif argv[1] == 'stperf': + stperf('stperf-log') else: print >> sys.stderr, 'Unknown command:', argv[1] Added: ydb/trunk/tools/stperf.cc =================================================================== --- ydb/trunk/tools/stperf.cc (rev 0) +++ ydb/trunk/tools/stperf.cc 2009-02-04 10:44:03 UTC (rev 1164) @@ -0,0 +1,74 @@ +// Simple send/recv client/server demo, transferring a large message. Useful +// for scalability tests. + +#include <boost/bind.hpp> +#include <commons/st/st.h> +#include <cstdlib> +#include <iostream> +#include <st.h> +#include <unistd.h> + +using namespace boost; +using namespace commons; +using namespace std; + +enum { port = 9876, size = 100000000 }; // 100mb +char *rbuf, *sbuf, *host; +bool do_r, do_s; +int n, my_i; + +inline size_t bstart(int i) { return i * size / n; } +inline size_t bend(int i) { return i == n - 1 ? 
size : (i+1) * size / n; } + +void rr(int i, st_netfd_t c) { + st_read_fully(c, rbuf + bstart(i), bend(i) - bstart(i), -1); +} + +void r() { + if (do_r) { + vector<st_thread_t> ts; + st_netfd_t l = st_tcp_listen(port); + for (int i = 0; i < n; i++) { + st_netfd_t c = st_accept(l, 0, 0, -1); + ts.push_back(st_spawn(boost::bind(rr, i, c))); + } + for (int i = 0; i < n; i++) + st_join(ts[i]); + } +} + +void s() { + if (do_s) { + st_netfd_t s = st_tcp_connect(host, port, -1); + st_write(s, sbuf + bstart(my_i), bend(my_i) - bstart(my_i), -1); + } +} + +int main(int argc, char **argv) { + host = strdup("localhost"); + n = 1; + int opt; + while ((opt = getopt(argc, argv, "i:n:rs:")) != -1) { + switch (opt) { + case 'i': my_i = atoi(optarg); break; + case 'n': n = atoi(optarg); break; + case 'r': do_r = true; break; + case 's': do_s = true; host = strdup(optarg); break; + } + } + cout << "n=" << n << " i=" << my_i + << " start=" << bstart(my_i) << " end=" << bend(my_i) << endl; + if (!(do_r || do_s)) do_r = do_s = true; + rbuf = new char[size]; + sbuf = new char[size]; + memset(rbuf, 0, size); + memset(sbuf, 0, size); + + st_init(); + st_thread_t t0 = st_spawn(r); + if (do_r && do_s) st_usleep(10000); + st_thread_t t1 = st_spawn(s); + st_join(t1); + st_join(t0); + return 0; +} Added: ydb/trunk/tools/stperf.mk =================================================================== --- ydb/trunk/tools/stperf.mk (rev 0) +++ ydb/trunk/tools/stperf.mk 2009-02-04 10:44:03 UTC (rev 1164) @@ -0,0 +1,4 @@ +CXXFLAGS += -Wall +LDFLAGS += -static +LDLIBS += -lst -lstx -lresolv +all: stperf Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-02-04 09:26:12 UTC (rev 1163) +++ ydb/trunk/tools/test.bash 2009-02-04 10:44:03 UTC (rev 1164) @@ -200,6 +200,11 @@ parremote node-setup-ydb-2 } +setup-stperf() { + parscp stperf.{cc,mk} ^:/tmp/ + parssh make -C /tmp/ -f stperf.mk +} + full-setup() { init-setup 
setup-deps @@ -345,13 +350,20 @@ mtcp-helper() { local leader=$1 n=$(( $# - 1 )) - tagssh $leader 'pkill nc' + tagssh $leader 'pkill socat' shift - while (( $# > 0 )) ; do - tagssh $1 "sleep .5 ; time seq $((1000000/n)) | nc $leader 9876" & + for i in `seq $n` ; do + tagssh $1 " + sleep .2 + ( time seq $((1000000/n)) | socat - TCP4:$leader:$((9876+i)) ) 2>&1 | + fgrep real" & shift done - tagssh $leader "nc -l 9876 > /dev/null" + tagssh $leader " + for i in \`seq $n\` ; do + socat TCP4-LISTEN:\$((9876+i)),reuseaddr - > /dev/null & + done + wait" wait } @@ -376,6 +388,42 @@ hosts="$orighosts" } +stperf-helper() { + local leader=$1 n=$(( $# - 1 )) + shift + parssh "pkill stperf || true" + tagssh $leader "/tmp/stperf -r -n $n > /dev/null" & + sleep .1 + for i in `seq $n` ; do + tagssh $1 " + ( time /tmp/stperf -s $leader -i $((i-1)) -n $n ) 2>&1 | + fgrep real" & + shift + done + wait +} + +stperf() { + hostargs stperf-helper +} + +exp-stperf() { + local out=stperf-log-$(date +%Y-%m-%d-%H:%M:%S-%N) + local orighosts="$hosts" maxn=$(( $(echo $hosts | wc -w) - 1 )) + ln -sf $out stperf-log + for n in `seq $maxn` ; do # configurations + for i in {1..3} ; do # trials + echo === n=$n i=$i === + echo === n=$n i=$i === > `tty` + stperf + sleep 1 + echo + done + hosts="${hosts% *}" + done >& $out + hosts="$orighosts" +} + stop-helper() { tagssh $1 'pkill -sigint ydb' } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-04 09:26:19
|
Revision: 1163 http://assorted.svn.sourceforge.net/assorted/?rev=1163&view=rev Author: yangzhang Date: 2009-02-04 09:26:12 +0000 (Wed, 04 Feb 2009) Log Message: ----------- simplified longblock.mk Modified Paths: -------------- sandbox/trunk/src/cc/st/longblock.mk Modified: sandbox/trunk/src/cc/st/longblock.mk =================================================================== --- sandbox/trunk/src/cc/st/longblock.mk 2009-02-03 22:29:55 UTC (rev 1162) +++ sandbox/trunk/src/cc/st/longblock.mk 2009-02-04 09:26:12 UTC (rev 1163) @@ -1,2 +1,4 @@ -longblock: - g++ -o longblock longblock.cc -lst -lstx -lresolv +CXXFLAGS += -Wall +LDFLAGS += -static +LDLIBS += -lst -lstx -lresolv +all: longblock This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-03 22:29:59
|
Revision: 1162 http://assorted.svn.sourceforge.net/assorted/?rev=1162&view=rev Author: yangzhang Date: 2009-02-03 22:29:55 +0000 (Tue, 03 Feb 2009) Log Message: ----------- - Print out the raw data tables. - Added default value lookups to deal with unreliably funneled output. - Fixed the parsing loop to understand the significance of === markers. - Graphs are named after the real filenames of the logs they're generated from. - Added WAL benchmark. - Updated the scaling analysis to include the WAL results. - Added a mtcp benchmark. - Renamed run to rec. - Added --yield-build-up to alleviate the large distortion in recv times (though this greatly inflates the build-up times). - Updated README/TODOs. Modified Paths: -------------- ydb/trunk/README ydb/trunk/tools/analysis.py ydb/trunk/tools/test.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-03 22:24:59 UTC (rev 1161) +++ ydb/trunk/README 2009-02-03 22:29:55 UTC (rev 1162) @@ -251,12 +251,20 @@ Period: 1/27-2/3 +- DONE associative containers benchmark +- DONE parallel tcp benchmark - DONE simple wal +- issues: + - multi vs single + - WAL performs well + - what to do? limit parallelism? how? 
Period: 2/3- -- DONE better wal +- TODO better wal +- TODO better understand multihost recovery - TODO fix up analysis of multihost recovery +- TODO data structures benchmark - TODO implement checkpointing disk-based scheme - TODO implement log-based recovery; show that it sucks - TODO implement group (batch) commit for log-based recovery Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-02-03 22:24:59 UTC (rev 1161) +++ ydb/trunk/tools/analysis.py 2009-02-03 22:29:55 UTC (rev 1162) @@ -2,6 +2,7 @@ from __future__ import with_statement import re, sys, itertools, colorsys +from path import path from os.path import basename, realpath from pylab import * @@ -28,9 +29,12 @@ def show_table1(dicts): keys = dicts[0].keys() - return show_table([(k, [d[k] for d in dicts]) for k in keys]) + # TODO: Remove the default arg once we have reliably funneled output. + return show_table([(k, [d.get(k, dicts[0][k]) for d in dicts]) + for k in keys]) -def logextract(path, indexkey, pats): +def logextract(path, indexkey, pats, xform = None): + if xform is None: xform = lambda x: x check(path) # Capture values from log using regex pats. def getcaps(): @@ -38,6 +42,7 @@ caps = {} # captures: name -> int/float sats = [ False for pat in pats ] for line in f: + if line.startswith('=== '): print line,; caps = {}; sats = [False for pat in pats] # if line == '\n': print '===', caps.keys(), ''.join('1' if s else '0' for s in sats) for i, pat in enumerate(pats): m = re.search(pat, line) @@ -51,17 +56,18 @@ if all(sats): sats = [ False for pat in pats ] # print '!!!' - yield caps.copy() # [ caps[k] for k in keys ] - caps.clear() + yield xform(caps) + caps = {} # Aggregate the captured values. 
caps = list(getcaps()) -# print show_table1(caps) + print show_table1(caps) + caps = sorted(caps, key = lambda d: d[indexkey]) keys = [indexkey] + filter(lambda x: x != indexkey, caps[0].keys()) def gen(): for index, ds in itertools.groupby(caps, lambda d: d[indexkey]): ds = list(ds) - print [d['len'] for d in ds] - yield [ [d[k] for k in keys] for d in ds ] + # TODO: Remove the default arg once we have reliably funneled output. + yield [ [d.get(k, ds[0][k]) for k in keys] for d in ds ] a = array(list(gen())) # raw results indexes = a[:,0,0] means = median(a,1) # or a.mean(1) @@ -78,33 +84,46 @@ print return res -def scaling(path): +def scaling(scalingpath, ariespath): print '=== scaling ===' - print 'file:', getname(path) - res = logextract(path, 'n', [ - r'=== n=(?P<n>\d+) ', + print 'file:', getname(scalingpath) + res = logextract(scalingpath, 'n', [ + r'=== n=(?P<n>-?\d+) ', r'issued .*\((?P<tps>[.\d]+) tps\)' ]) - errorbar(res['n'], res['tps mean'], res['tps sd']) + print 'file:', getname(ariespath) + res2 = logextract(ariespath, 'n', [ + r'=== n=(?P<n>-?\d+) ', + r'issued .*\((?P<tps>[.\d]+) tps\)' ]) + + errorbar(hstack([res2['n'], res['n']]), + hstack([res2['tps mean'], res['tps mean']]), + hstack([res2['tps sd'], res['tps sd']])) title('Scaling of baseline throughput with number of nodes') xlabel('Node count') ylabel('Mean TPS (stdev error bars)') - xlim(res['n'].min() - .5, res['n'].max() + .5) + xlim(hstack([res2['n'], res['n']]).min() - .5, + hstack([res2['n'], res['n']]).max() + .5) ylim(ymin = 0) savefig('scaling.png') def run(singlepath, multipath): - for path, titlestr, name in [(singlepath, 'single recoverer', 'single'), - (multipath, 'multi recoverer', 'multi')]: + singlepath, multipath = map(path, [singlepath, multipath]) + for datpath, titlestr, name in [(singlepath, 'single recoverer', 'single'), + (multipath, 'multi recoverer', 'multi')]: + def xform(d): + d['realdump'] = d['dump'] - d['recv'] - d['deser'] + return d print '===', titlestr, 
'===' - print 'file:', getname(path) - res = logextract(path, 'seqno', + print 'file:', getname(datpath) + res = logextract(datpath, 'seqno', [ r'=== seqno=(?P<seqno>\d+) ', r'got recovery message of (?P<len>\d+) bytes in (?P<dump>\d+) ms: xfer took (?P<recv>\d+) ms, deserialization took (?P<deser>\d+)', r'built up .* (?P<buildup>\d+) ms', r'generating recovery took (?P<gen>\d+) ms', r'replayer caught up; from backlog replayed \d+ txns .* in (?P<catchup>\d+) ms', - r'.*: recovering node caught up; took (?P<total>\d+) ?ms' ] ) + r'.*: recovering node caught up; took (?P<total>\d+) ?ms' ], + xform ) # Colors and positioning width = 5e4 @@ -123,7 +142,7 @@ ehues.next(), label = label, bottom = self.bottom) self.bottom += res[yskey] - mybar('dump mean', 'dump sd', 'State dump') + mybar('realdump mean', 'realdump sd', 'State dump etc.') mybar('recv mean', 'recv sd', 'State receive') mybar('deser mean', 'deser sd', 'State deserialization') mybar('buildup mean', 'buildup sd', 'Build-up') @@ -141,17 +160,33 @@ ax2.set_ylabel('Size of serialized state (KB)', color = col) ax2.set_ylim(ymin = 0) for tl in ax2.get_yticklabels(): tl.set_color(col) - xlim(xmin = min(res['seqno']) - width, xmax = max(res['seqno']) + width) - savefig(name + '.png') + pngpath = datpath.realpath() + '.png' + savefig(pngpath) + symlink = path(name + '.png') + if symlink.isfile(): symlink.remove() + pngpath.symlink(symlink) + +def mtcp(datpath): + res = logextract(datpath, 'n', + [ r'=== n=(?P<n>\d+)', + r'real\s+0m(?P<t>[0-9\.]+)s' ]) + errorbar(res['n'], res['t mean'], res['t sd']) + title('Time to send a large message (6888896 bytes)') + xlabel('Number of parallel senders') + ylabel('Time (ms)') + savefig('mtcp.png') + def main(argv): if len(argv) <= 1: print >> sys.stderr, 'Must specify a command' elif argv[1] == 'scaling': - scaling(argv[2] if len(argv) > 2 else 'scaling-log') + scaling(*argv[2:] if len(argv) > 2 else ['scaling-log', 'aries-log']) elif argv[1] == 'run': run(*argv[2:] if len(argv) 
> 2 else ['single-log', 'multi-log']) + elif argv[1] == 'mtcp': + mtcp('mtcp-log') else: print >> sys.stderr, 'Unknown command:', argv[1] Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-02-03 22:24:59 UTC (rev 1161) +++ ydb/trunk/tools/test.bash 2009-02-03 22:29:55 UTC (rev 1162) @@ -232,10 +232,10 @@ scaling-helper() { local leader=$1 shift - tagssh $leader "ydb/src/ydb -l -n $# -X 100000" & + tagssh $leader "ydb/src/ydb -l -n $# -X 100000 ${extraargs:-}" & sleep .1 for rep in "$@" - do tagssh $rep "ydb/src/ydb -n $# -H $leader" & + do tagssh $rep "ydb/src/ydb -n $# -H $leader ${extraargs:-}" & done wait } @@ -249,13 +249,13 @@ # configurations; e.g., "repeat scaling". # TODO: fix this to work also with `hosts`; move into repeat-helper that's run # via hostargs, and change the range= to hosts= -full-scaling() { +exp-scaling() { local out=scaling-log-$(date +%Y-%m-%d-%H:%M:%S-%N) local orighosts="$hosts" maxn=$(( $(echo $hosts | wc -w) - 1 )) ln -sf $out scaling-log - for n in `seq $maxn -1 1` ; do # configurations + for n in `seq $maxn -1 0` ; do # configurations stop - for i in {1..5} ; do # trials + for i in {1..3} ; do # trials echo === n=$n i=$i === echo === n=$n i=$i === > `tty` scaling @@ -269,7 +269,7 @@ hosts="$orighosts" } -run-helper() { +rec-helper() { local leader=$1 shift : ${seqno:=100000} @@ -282,7 +282,7 @@ done sleep .1 # pexpect 'got all \d+ replicas' leader # Run joiner. - tagssh $1 "ydb/src/ydb -H $leader --yield-catch-up ${extraargs:-}" & # -v --debug-threads -t 200000" & + tagssh $1 "ydb/src/ydb -H $leader --yield-build-up --yield-catch-up ${extraargs:-}" & # -v --debug-threads -t 200000" & if false ; then if [[ ${wait2:-} ]] then sleep $wait2 @@ -293,18 +293,18 @@ wait } -run() { - hostargs run-helper +rec() { + hostargs rec-helper } # Recovery experient. 
-exp() { +exp-rec() { for seqno in 500000 400000 300000 200000 100000 ; do # configurations stop - for i in {1..5} ; do # trials + for i in {1..3} ; do # trials echo === seqno=$seqno i=$i === echo === seqno=$seqno i=$i === > `tty` - run + rec sleep 1 stop sleep .1 @@ -314,19 +314,68 @@ } # Single-host recovery experiment. -exp-single() { +exp-rec-single() { local out=single-log-$(date +%Y-%m-%d-%H:%M:%S) ln -sf $out single-log - exp >& $out + exp-rec >& $out } # Multi-host recovery experiment. -exp-multi() { +exp-rec-multi() { local out=multi-log-$(date +%Y-%m-%d-%H:%M:%S) ln -sf $out multi-log - extraargs="-m ${extraargs:-}" exp >& $out + extraargs="-m ${extraargs:-}" exp-rec >& $out } +# WAL. +aries() { + extraargs='--wal' scaling ${hosts:-} +} + +exp-aries() { + local out=aries-log-$(date +%Y-%m-%d-%H:%M:%S) + ln -sf $out aries-log + for i in {1..3} ; do + echo === n=-1 i=$i === + echo === n=-1 i=$i === > `tty` + aries + echo + done >& $out +} + +mtcp-helper() { + local leader=$1 n=$(( $# - 1 )) + tagssh $leader 'pkill nc' + shift + while (( $# > 0 )) ; do + tagssh $1 "sleep .5 ; time seq $((1000000/n)) | nc $leader 9876" & + shift + done + tagssh $leader "nc -l 9876 > /dev/null" + wait +} + +mtcp() { + hostargs mtcp-helper +} + +exp-mtcp() { + local out=mtcp-log-$(date +%Y-%m-%d-%H:%M:%S-%N) + local orighosts="$hosts" maxn=$(( $(echo $hosts | wc -w) - 1 )) + ln -sf $out mtcp-log + for n in `seq $maxn -1 1` ; do # configurations + for i in {1..3} ; do # trials + echo === n=$n i=$i === + echo === n=$n i=$i === > `tty` + mtcp + sleep 1 + echo + done + hosts="${hosts% *}" + done >& $out + hosts="$orighosts" +} + stop-helper() { tagssh $1 'pkill -sigint ydb' } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-03 22:25:08
|
Revision: 1161 http://assorted.svn.sourceforge.net/assorted/?rev=1161&view=rev Author: yangzhang Date: 2009-02-03 22:24:59 +0000 (Tue, 03 Feb 2009) Log Message: ----------- added some data Modified Paths: -------------- container-bench/trunk/README Added Paths: ----------- container-bench/trunk/data/ container-bench/trunk/data/log Modified: container-bench/trunk/README =================================================================== --- container-bench/trunk/README 2009-02-03 22:22:54 UTC (rev 1160) +++ container-bench/trunk/README 2009-02-03 22:24:59 UTC (rev 1161) @@ -31,3 +31,5 @@ tools/runner.bash # Builds, runs, and creates `log` file. tools/analysis.py < log # Create `results.png`. + +The log in `data/` is from my Dell XPS 410 with a 1.8G Core 2 Duo and 2 GB memory. Added: container-bench/trunk/data/log =================================================================== --- container-bench/trunk/data/log (rev 0) +++ container-bench/trunk/data/log 2009-02-03 22:24:59 UTC (rev 1161) @@ -0,0 +1,114 @@ +HERE +map init: 485 ms +map reload: 284 ms +map iter: 34 ms +map index: 287 ms +map init: 427 ms +map reload: 295 ms +map iter: 29 ms +map index: 293 ms +2000009 mallocs +HERE +map init: 508 ms +map reload: 289 ms +map iter: 34 ms +map index: 285 ms +map init: 433 ms +map reload: 292 ms +map iter: 29 ms +map index: 286 ms +2000009 mallocs +HERE +map init: 490 ms +map reload: 292 ms +map iter: 31 ms +map index: 309 ms +map init: 413 ms +map reload: 288 ms +map iter: 30 ms +map index: 285 ms +2000009 mallocs +HERE +btree init: 254 ms +btree reload: 229 ms +btree iter: 26 ms +btree index: 269 ms +btree init: 256 ms +btree reload: 230 ms +btree iter: 25 ms +btree index: 259 ms +137503 mallocs +HERE +btree init: 240 ms +btree reload: 201 ms +btree iter: 22 ms +btree index: 222 ms +btree init: 222 ms +btree reload: 201 ms +btree iter: 23 ms +btree index: 233 ms +137503 mallocs +HERE +btree init: 234 ms +btree reload: 206 ms +btree iter: 24 ms +btree index: 256 ms 
+btree init: 224 ms +btree reload: 202 ms +btree iter: 24 ms +btree index: 250 ms +137503 mallocs +HERE +hash init: 200 ms +hash reload: 29 ms +hash iter: 12 ms +hash index: 32 ms +hash init: 165 ms +hash reload: 25 ms +hash iter: 12 ms +hash index: 23 ms +2000043 mallocs +HERE +hash init: 207 ms +hash reload: 30 ms +hash iter: 20 ms +hash index: 27 ms +hash init: 181 ms +hash reload: 28 ms +hash iter: 11 ms +hash index: 26 ms +2000043 mallocs +HERE +hash init: 178 ms +hash reload: 25 ms +hash iter: 12 ms +hash index: 24 ms +hash init: 160 ms +hash reload: 25 ms +hash iter: 12 ms +hash index: 23 ms +2000043 mallocs +HERE +arr init: 6 ms +arr reload: 2 ms +arr index: 3 ms +arr init: 8 ms +arr reload: 4 ms +arr index: 3 ms +9 mallocs +HERE +arr init: 5 ms +arr reload: 3 ms +arr index: 3 ms +arr init: 5 ms +arr reload: 4 ms +arr index: 3 ms +9 mallocs +HERE +arr init: 5 ms +arr reload: 3 ms +arr index: 5 ms +arr init: 5 ms +arr reload: 3 ms +arr index: 3 ms +9 mallocs This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-03 22:22:58
|
Revision: 1160 http://assorted.svn.sourceforge.net/assorted/?rev=1160&view=rev Author: yangzhang Date: 2009-02-03 22:22:54 +0000 (Tue, 03 Feb 2009) Log Message: ----------- final touch ups and polish Modified Paths: -------------- container-bench/trunk/tools/analysis.py container-bench/trunk/tools/runner.bash Modified: container-bench/trunk/tools/analysis.py =================================================================== --- container-bench/trunk/tools/analysis.py 2009-02-03 22:15:39 UTC (rev 1159) +++ container-bench/trunk/tools/analysis.py 2009-02-03 22:22:54 UTC (rev 1160) @@ -33,8 +33,9 @@ start = 0 for key, group in groups: stop = start + len(group) - bar(xs[start:stop], means[start:stop], yerr = sds[start:stop], width = width) # , -# color = colors.next(), edgecolor = ecolors.next()) + col = colors.next() + bar(xs[start:stop], means[start:stop], yerr = sds[start:stop], width = width, + color = col, edgecolor = col, ecolor = ecolors.next()) start = stop # Annotate. Modified: container-bench/trunk/tools/runner.bash =================================================================== --- container-bench/trunk/tools/runner.bash 2009-02-03 22:15:39 UTC (rev 1159) +++ container-bench/trunk/tools/runner.bash 2009-02-03 22:22:54 UTC (rev 1160) @@ -4,9 +4,11 @@ src="$(dirname "$0")/../src" -make -C "$src/" +make -s -C "$src/" for i in 1 2 4 8 ; do - LD_PRELOAD="$src/../../../sandbox/trunk/src/nix/preload/interposer.so" \ - "$src/bench" $i + for j in {1..3} ; do + LD_PRELOAD="$src/../../../sandbox/trunk/src/nix/preload/interposer.so" \ + "$src/bench" $i + done done > log This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-03 22:15:46
|
Revision: 1159 http://assorted.svn.sourceforge.net/assorted/?rev=1159&view=rev Author: yangzhang Date: 2009-02-03 22:15:39 +0000 (Tue, 03 Feb 2009) Log Message: ----------- - polished up analysis tools - added runner script - renamed scan -> init - updated README Modified Paths: -------------- container-bench/trunk/README container-bench/trunk/src/bench.cc container-bench/trunk/tools/analysis.py Added Paths: ----------- container-bench/trunk/tools/runner.bash Modified: container-bench/trunk/README =================================================================== --- container-bench/trunk/README 2009-02-03 20:18:52 UTC (rev 1158) +++ container-bench/trunk/README 2009-02-03 22:15:39 UTC (rev 1159) @@ -19,6 +19,15 @@ Analysis tools requirements: -- [Matplotlib] 0.98.3 +- [Matplotlib] 0.98.5.2 + - due to [this bug](http://article.gmane.org/gmane.comp.python.matplotlib.general/14474) [Matplotlib]: http://matplotlib.sf.net/ + +Usage +----- + +Run: + + tools/runner.bash # Builds, runs, and creates `log` file. + tools/analysis.py < log # Create `results.png`. 
Modified: container-bench/trunk/src/bench.cc =================================================================== --- container-bench/trunk/src/bench.cc 2009-02-03 20:18:52 UTC (rev 1158) +++ container-bench/trunk/src/bench.cc 2009-02-03 22:15:39 UTC (rev 1159) @@ -29,7 +29,7 @@ template<typename T> inline void -scan(T &m, const string &label) +iter(T &m, const string &label) { long long start = current_time_millis(); typedef pair<int, int> pii; @@ -45,7 +45,6 @@ index(T &m, const string &label) { long long start = current_time_millis(); - typedef pair<int, int> pii; for (int i = 0; i < len; i++) { xs[i] = m[i]; } @@ -67,7 +66,7 @@ map<int, int> m; load(m, "map init"); load(m, "map reload"); - scan(m, "map scan"); + iter(m, "map iter"); index(m, "map index"); } if (mode & 0x2) { @@ -75,14 +74,14 @@ btree_map<int, int> m; load(m, "btree init"); load(m, "btree reload"); - scan(m, "btree scan"); + iter(m, "btree iter"); index(m, "btree index"); } if (mode & 0x4) { unordered_map<int, int> m; load(m, "hash init"); load(m, "hash reload"); - scan(m, "hash scan"); + iter(m, "hash iter"); index(m, "hash index"); } if (mode & 0x8) { Modified: container-bench/trunk/tools/analysis.py =================================================================== --- container-bench/trunk/tools/analysis.py 2009-02-03 20:18:52 UTC (rev 1158) +++ container-bench/trunk/tools/analysis.py 2009-02-03 22:15:39 UTC (rev 1159) @@ -1,22 +1,52 @@ #!/usr/bin/env python from __future__ import with_statement -import sys +import sys, itertools, colorsys from pylab import * +# Parse the data. pairs = ( line.split(': ') for line in sys.stdin if ': ' in line ) -pairs = [ (a, int(b.split()[0])) for (a,b) in pairs ] +pairs = sorted( (a, int(b.split()[0])) for (a,b) in pairs ) -labels, data = zip(*pairs) -xs = arange(len(data))+.5 +# Aggregate the data. 
+def f(): + for key, group in itertools.groupby(pairs, lambda (a,b): a): + print key + a = array([b for a,b in group]) + yield key, a.mean(), a.std() + +# Sort the data into groups (based on type of operation). +pairs = sorted(f(), key = lambda tup: tup[0].split()[-1]) +groups = map(lambda (key, group): (key, list(group)), itertools.groupby(pairs, lambda tup: tup[0].split()[-1])) + +# Prepare plotting data. +labels, means, sds = zip(*pairs) +xs = arange(len(labels))+.5 width = .5 -bar(xs, data, width = width) +# Prepare plotting parameters. +step = 1.0 / len(groups) +colors = ( colorsys.hls_to_rgb(step * i, .7, .5) for i in itertools.count() ) +ecolors = ( colorsys.hls_to_rgb(step * i, .3, .5) for i in itertools.count() ) + +# Plot! +start = 0 +for key, group in groups: + stop = start + len(group) + bar(xs[start:stop], means[start:stop], yerr = sds[start:stop], width = width) # , +# color = colors.next(), edgecolor = ecolors.next()) + start = stop + +# Annotate. +title("Performance of associative containers") +ylabel("Time (ms)") xlim(0, xs[-1] + width*2) xticks(xs + width/2, labels, rotation = 90) -title("Performance of associative containers") -# show only bottom/left ticks +# Make space for the vertically rotated labels. +gcf().subplots_adjust(bottom = .3) +# Show only bottom/left ticks. gca().get_xaxis().tick_bottom() gca().get_yaxis().tick_left() +# Output. 
savefig('results.png') Added: container-bench/trunk/tools/runner.bash =================================================================== --- container-bench/trunk/tools/runner.bash (rev 0) +++ container-bench/trunk/tools/runner.bash 2009-02-03 22:15:39 UTC (rev 1159) @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +set -o errexit -o nounset + +src="$(dirname "$0")/../src" + +make -C "$src/" + +for i in 1 2 4 8 ; do + LD_PRELOAD="$src/../../../sandbox/trunk/src/nix/preload/interposer.so" \ + "$src/bench" $i +done > log Property changes on: container-bench/trunk/tools/runner.bash ___________________________________________________________________ Added: svn:executable + * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-03 20:18:59
|
Revision: 1158 http://assorted.svn.sourceforge.net/assorted/?rev=1158&view=rev Author: yangzhang Date: 2009-02-03 20:18:52 +0000 (Tue, 03 Feb 2009) Log Message: ----------- added longblock, threads Added Paths: ----------- sandbox/trunk/src/cc/st/longblock.cc sandbox/trunk/src/cc/st/longblock.mk sandbox/trunk/src/cc/st/threads.cc Added: sandbox/trunk/src/cc/st/longblock.cc =================================================================== --- sandbox/trunk/src/cc/st/longblock.cc (rev 0) +++ sandbox/trunk/src/cc/st/longblock.cc 2009-02-03 20:18:52 UTC (rev 1158) @@ -0,0 +1,50 @@ +// Demo that the st_read_fully function does in fact re-yield in the middle of +// its operation, which makes sense when reading large amounts of data. + +#include <st.h> +#include <iostream> +#include <commons/st/st.h> + +using namespace commons; +using namespace std; + +enum { port = 9876, size = 100000000 }; // 100mb +char *rbuf, *sbuf; + +void r() { + st_netfd_t l = st_tcp_listen(port); + st_netfd_t c = st_accept(l, 0, 0, -1); + st_read_fully(c, rbuf, size, -1); +} + +void s() { + st_netfd_t s = st_tcp_connect("localhost", port, -1); + st_write(s, sbuf, size, -1); +} + +void f() { + int i = 0; + while (i < size) { + while (rbuf[i] == 0) st_usleep(1); + for (; i < size && rbuf[i] == 1; i++); + cout << i << endl; + } +} + +int main() { + rbuf = new char[size]; + sbuf = new char[size]; + for (int i = 0; i < size; i++) { + rbuf[i] = 0; + sbuf[i] = 1; + } + st_init(); + st_thread_t t = st_spawn(f); + st_thread_t t0 = st_spawn(r); + st_usleep(10000); + st_thread_t t1 = st_spawn(s); + st_join(t1); + st_join(t0); + st_join(t); + return 0; +} Added: sandbox/trunk/src/cc/st/longblock.mk =================================================================== --- sandbox/trunk/src/cc/st/longblock.mk (rev 0) +++ sandbox/trunk/src/cc/st/longblock.mk 2009-02-03 20:18:52 UTC (rev 1158) @@ -0,0 +1,2 @@ +longblock: + g++ -o longblock longblock.cc -lst -lstx -lresolv Added: sandbox/trunk/src/cc/st/threads.cc 
=================================================================== --- sandbox/trunk/src/cc/st/threads.cc (rev 0) +++ sandbox/trunk/src/cc/st/threads.cc 2009-02-03 20:18:52 UTC (rev 1158) @@ -0,0 +1,40 @@ +// Demonstrates ST compatibility with OS threads. + +#include <iostream> +#include <pthread.h> +#include <st.h> +#include <unistd.h> + +using namespace std; + +void *f(void *p) { + cout << "hello" << endl; + st_sleep(1); + cout << "goodbye" << endl; + return 0; +} + +void *F(void *p) { + cout << "HELLO" << endl; + sleep(1); + cout << "GOODBYE" << endl; + return 0; +} + +int main() { + st_init(); + st_thread_t t1 = st_thread_create(f, 0, 1, 0); + st_thread_t t2 = st_thread_create(f, 0, 1, 0); + + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_t T; + pthread_create(&T, &attr, F, 0); + + st_thread_join(t1, 0); + st_thread_join(t2, 0); + + pthread_join(T, 0); + + return 0; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-03 07:39:08
|
Revision: 1157 http://assorted.svn.sourceforge.net/assorted/?rev=1157&view=rev Author: yangzhang Date: 2009-02-03 07:39:02 +0000 (Tue, 03 Feb 2009) Log Message: ----------- removed trash; trash-cli is usable now Removed Paths: ------------- shell-tools/trunk/src/trash.bash Deleted: shell-tools/trunk/src/trash.bash =================================================================== --- shell-tools/trunk/src/trash.bash 2009-02-03 00:00:26 UTC (rev 1156) +++ shell-tools/trunk/src/trash.bash 2009-02-03 07:39:02 UTC (rev 1157) @@ -1,12 +0,0 @@ -#!/usr/bin/env bash -# vim:et:sw=4 - -# Brain-dead trash mechanism - -. common.bash || exit 1 - -: "${TRASHDIR:="$HOME/trash"}" -dir="$TRASHDIR/$(date)" -mkdir -p "$dir" -mv "$@" "$dir" -echo "$PWD: $0 $@" >> "$dir/trash-metadata" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-03 00:00:31
|
Revision: 1156 http://assorted.svn.sourceforge.net/assorted/?rev=1156&view=rev Author: yangzhang Date: 2009-02-03 00:00:26 +0000 (Tue, 03 Feb 2009) Log Message: ----------- - added simple "WAL" and leader-only mode - changed experiments from yield vs block to single vs multi host Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/tools/analysis.py ydb/trunk/tools/test.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-02 22:42:05 UTC (rev 1155) +++ ydb/trunk/README 2009-02-03 00:00:26 UTC (rev 1156) @@ -249,8 +249,13 @@ takes more around 50 ms - DONE start building infrastructure for disk IO -Period: 1/27- +Period: 1/27-2/3 +- DONE simple wal + +Period: 2/3- + +- DONE better wal - TODO fix up analysis of multihost recovery - TODO implement checkpointing disk-based scheme - TODO implement log-based recovery; show that it sucks Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-02 22:42:05 UTC (rev 1155) +++ ydb/trunk/src/Makefile 2009-02-03 00:00:26 UTC (rev 1156) @@ -26,7 +26,8 @@ GCOV := -fprofile-arcs -ftest-coverage endif LDFLAGS := -pthread -lstx -lst -lresolv -lprotobuf -lgtest \ - -lboost_program_options-gcc43-mt -lboost_thread-gcc43-mt $(GPROF) + -lboost_program_options-gcc43-mt -lboost_thread-gcc43-mt \ + -lboost_serialization-gcc43-mt $(GPROF) # The -Wno- warnings are for boost. 
CXXFLAGS := -g3 -pthread $(GPROF) -Wall -Werror -Wextra -Woverloaded-virtual \ -Wconversion -Wno-conversion -Wno-ignored-qualifiers \ Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-02 22:42:05 UTC (rev 1155) +++ ydb/trunk/src/main.lzz.clamp 2009-02-03 00:00:26 UTC (rev 1156) @@ -1,4 +1,6 @@ #hdr +#include <boost/archive/binary_iarchive.hpp> +#include <boost/archive/binary_oarchive.hpp> #include <boost/bind.hpp> #include <boost/foreach.hpp> #include <boost/program_options.hpp> @@ -14,7 +16,7 @@ #include <cstdio> #include <cstring> // strsignal #include <iostream> -#include <fstream> +#include <fstream> // ofstream #include <gtest/gtest.h> #include <malloc.h> #include <map> @@ -27,6 +29,7 @@ #include "ydb.pb.h" #define foreach BOOST_FOREACH using namespace boost; +using namespace boost::archive; using namespace commons; using namespace std; using namespace testing; @@ -42,7 +45,7 @@ size_t accept_joiner_size; bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, - debug_threads, multirecover, disk, debug_memory; + debug_threads, multirecover, disk, debug_memory, use_wal; long long timelim, read_thresh, write_thresh; // Control. @@ -355,6 +358,35 @@ } /** + * ARIES write-ahead log. No undo logging necessary (no steal). + */ +class wal +{ +public: + wal() : of("wal"), out(of) {} + void del(int key) { + int op = op_del; // TODO: is this really necessary? + out & op & key; + } + void write(int key, int val) { + int op = op_write; + out & op & key & val; + } + void commit() { + int op = op_commit; + out & op; + } +private: + enum { op_del, op_write, op_commit }; + ofstream of; + binary_oarchive out; +}; + +// Globals +map<int, int> g_map; +wal *g_wal; + +/** * Keep issuing transactions to the replicas. */ void @@ -388,7 +420,7 @@ // Generate a random transaction. 
Txn txn; - txn.set_seqno(seqno++); + txn.set_seqno(seqno); int count = randint(min_ops, max_ops + 1); for (int o = 0; o < count; o++) { Op *op = txn.add_op(); @@ -400,8 +432,13 @@ if (do_pause) do_pause.waitreset(); - // Broadcast. - bcastmsg(fds, txn); + // Process, or broadcast and increment seqno. + if (fds.empty()) { + int dummy_seqno = seqno - 1; + process_txn(nullptr, g_map, txn, dummy_seqno, true); + } else { + bcastmsg(fds, txn); + } // Checkpoint. if (txn.seqno() % chkpt == 0) { @@ -426,6 +463,8 @@ cout << "stopping on issue of seqno " << txn.seqno() << endl; stop_hub.set(); } + + ++seqno; } Txn txn; @@ -441,6 +480,7 @@ process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno, bool caught_up) { + wal &wal = *g_wal; checkeq(txn.seqno(), seqno + 1); Response res; res.set_seqno(txn.seqno()); @@ -449,27 +489,33 @@ for (int o = 0; o < txn.op_size(); o++) { const Op &op = txn.op(o); const int key = op.key(); + ::map<int, int>::iterator it = map.find(key); if (show_updates || count_updates) { - if (map.find(key) != map.end()) { + if (it != map.end()) { if (show_updates) cout << "existing key: " << key << endl; if (count_updates) updates++; } } switch (op.type()) { case Op::read: - res.add_result(map[key]); + if (it == map.end()) res.add_result(0); + else res.add_result(it->second); break; case Op::write: - map[key] = op.value(); + if (use_wal) wal.write(key, op.value()); + if (it == map.end()) map[key] = op.value(); + else it->second = op.value(); break; case Op::del: - map.erase(key); + if (it != map.end()) { + if (use_wal) wal.del(key); + map.erase(it); + } break; } } - if (caught_up) { - sendmsg(leader, res); - } + if (use_wal) wal.commit(); + if (caught_up && leader != nullptr) sendmsg(leader, res); } void @@ -531,6 +577,8 @@ * calculating the sub-range of the map for which this node is responsible. * * \param[in] nnodes The total number nodes in the Init message list. + * + * \param[in] wal The WAL. 
*/ void process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, @@ -801,6 +849,9 @@ cout << "starting as leader" << endl; st_multichannel<long long> recover_signals; + scoped_ptr<wal> pwal(new wal); + g_wal = pwal.get(); + // Wait until all replicas have joined. st_netfd_t listener = st_tcp_listen(leader_port); st_closing close_listener(listener); @@ -1139,6 +1190,8 @@ "count operations that touch (update/read/delete) an existing key") ("general-txns,g", po::bool_switch(&general_txns), "issue read and delete transactions as well as the default of (only) insertion/update transactions (for leader only)") + ("wal", po::bool_switch(&use_wal), + "enable ARIES write-ahead logging") ("leader,l", po::bool_switch(&is_leader), "run the leader (run replica by default)") ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-02-02 22:42:05 UTC (rev 1155) +++ ydb/trunk/tools/analysis.py 2009-02-03 00:00:26 UTC (rev 1156) @@ -93,9 +93,9 @@ ylim(ymin = 0) savefig('scaling.png') -def run(blockpath, yieldpath): - for path, titlestr, name in [#(blockpath, 'blocking scheme', 'block'), - (yieldpath, 'yielding scheme', 'yield')]: +def run(singlepath, multipath): + for path, titlestr, name in [(singlepath, 'single recoverer', 'single'), + (multipath, 'multi recoverer', 'multi')]: print '===', titlestr, '===' print 'file:', getname(path) res = logextract(path, 'seqno', @@ -151,7 +151,7 @@ elif argv[1] == 'scaling': scaling(argv[2] if len(argv) > 2 else 'scaling-log') elif argv[1] == 'run': - run(*argv[2:] if len(argv) > 2 else ['block-log', 'yield-log']) + run(*argv[2:] if len(argv) > 2 else ['single-log', 'multi-log']) else: print >> sys.stderr, 'Unknown command:', argv[1] Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-02-02 22:42:05 UTC (rev 
1155) +++ ydb/trunk/tools/test.bash 2009-02-03 00:00:26 UTC (rev 1156) @@ -282,7 +282,7 @@ done sleep .1 # pexpect 'got all \d+ replicas' leader # Run joiner. - tagssh $1 "ydb/src/ydb -H $leader ${extraargs:-}" & # -v --debug-threads -t 200000" & + tagssh $1 "ydb/src/ydb -H $leader --yield-catch-up ${extraargs:-}" & # -v --debug-threads -t 200000" & if false ; then if [[ ${wait2:-} ]] then sleep $wait2 @@ -297,8 +297,9 @@ hostargs run-helper } -full-run() { - for seqno in 500000 400000 300000 200000 100000 ; do # 200000 300000 400000 500000 ; do # 700000 900000; do # configurations +# Recovery experient. +exp() { + for seqno in 500000 400000 300000 200000 100000 ; do # configurations stop for i in {1..5} ; do # trials echo === seqno=$seqno i=$i === @@ -312,16 +313,18 @@ done } -full-block() { - local out=block-log-$(date +%Y-%m-%d-%H:%M:%S) - ln -sf $out block-log - full-run >& $out +# Single-host recovery experiment. +exp-single() { + local out=single-log-$(date +%Y-%m-%d-%H:%M:%S) + ln -sf $out single-log + exp >& $out } -full-yield() { - local out=yield-log-$(date +%Y-%m-%d-%H:%M:%S) - ln -sf $out yield-log - extraargs="--yield-catch-up ${extraargs:-}" full-run >& $out +# Multi-host recovery experiment. +exp-multi() { + local out=multi-log-$(date +%Y-%m-%d-%H:%M:%S) + ln -sf $out multi-log + extraargs="-m ${extraargs:-}" exp >& $out } stop-helper() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-02 22:42:18
|
Revision: 1155 http://assorted.svn.sourceforge.net/assorted/?rev=1155&view=rev Author: yangzhang Date: 2009-02-02 22:42:05 +0000 (Mon, 02 Feb 2009) Log Message: ----------- added serialization benchmark Added Paths: ----------- serialization-bench/ serialization-bench/trunk/ serialization-bench/trunk/README serialization-bench/trunk/src/ serialization-bench/trunk/src/Makefile serialization-bench/trunk/src/main.cc serialization-bench/trunk/src/test.proto Added: serialization-bench/trunk/README =================================================================== --- serialization-bench/trunk/README (rev 0) +++ serialization-bench/trunk/README 2009-02-02 22:42:05 UTC (rev 1155) @@ -0,0 +1,5 @@ +Overview +-------- + +This is a benchmark comparing various systems for data serialization. +And under different workloads. Added: serialization-bench/trunk/src/Makefile =================================================================== --- serialization-bench/trunk/src/Makefile (rev 0) +++ serialization-bench/trunk/src/Makefile 2009-02-02 22:42:05 UTC (rev 1155) @@ -0,0 +1,13 @@ +all: main + +main: main.cc test.pb.h test.pb.cc + g++ -O3 -I. -g3 -Wall -o main main.cc test.pb.cc \ + -lboost_serialization-gcc43-mt -lprotobuf + +test.pb.cc test.pb.h: test.proto + protoc --cpp_out=. 
test.proto + +.PHONY: clean + +clean: + rm -f main test.pb.* *.out \ No newline at end of file Added: serialization-bench/trunk/src/main.cc =================================================================== --- serialization-bench/trunk/src/main.cc (rev 0) +++ serialization-bench/trunk/src/main.cc 2009-02-02 22:42:05 UTC (rev 1155) @@ -0,0 +1,96 @@ +#include "test.pb.h" +#include <arpa/inet.h> +#include <boost/archive/binary_iarchive.hpp> +#include <boost/archive/binary_oarchive.hpp> +#include <commons/time.h> +#include <fstream> +#include <iostream> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> + +using namespace boost::archive; +using namespace commons; +using namespace std; + +int main() { + int count = 1e6, reps = 3; + cout << "TIMES" << endl; + for (int r = 0; r < reps; r++) { + ofstream o("raw.out", ios::out); + long long start = current_time_millis(); + for (int i = 0; i < count; i++) { + o.write(reinterpret_cast<char*>(&i), sizeof i); + } + long long end = current_time_millis(); + cout << "raw: " << end - start << endl; + } + for (int r = 0; r < reps; r++) { + ofstream o("htonl.out", ios::out); + long long start = current_time_millis(); + for (int i = 0; i < count; i++) { + int n = htonl(i); + o.write(reinterpret_cast<char*>(&n), sizeof i); + } + long long end = current_time_millis(); + cout << "htonl: " << end - start << endl; + } + for (int r = 0; r < reps; r++) { + ofstream o("fstream.out", ios::out); + long long start = current_time_millis(); + for (int i = 0; i < count; i++) { + int n = htonl(i); + o << n << " "; + } + long long end = current_time_millis(); + cout << "fstream: " << end - start << endl; + } + for (int r = 0; r < reps; r++) { + filebuf b; + b.open("boost.out", ios::out); + binary_oarchive o(b); + long long start = current_time_millis(); + for (int i = 0; i < count; i++) { + o & i; + } + long long end = current_time_millis(); + cout << "boost: " << end - start << endl; + } + for (int r = 0; r < reps; r++) { + 
ofstream o("protobuf.out", ios::out); + varray a; + long long start = current_time_millis(); + for (int i = 0; i < count; i++) { + a.add_v(i); + } + a.SerializeToOstream(&o); + long long end = current_time_millis(); + cout << "protobuf: " << end - start << endl; + } + for (int r = 0; r < reps; r++) { + ofstream o("manyprotos.out", ios::out); + long long start = current_time_millis(); + for (int i = 0; i < count; i++) { + varray a; + a.add_v(i); + a.SerializeToOstream(&o); + } + long long end = current_time_millis(); + cout << "many protobufs: " << end - start << endl; + } + + cout << "SIZES" << endl; + vector<string> paths; + paths.push_back("raw.out"); + paths.push_back("htonl.out"); + paths.push_back("fstream.out"); + paths.push_back("boost.out"); + paths.push_back("protobuf.out"); + paths.push_back("manyprotos.out"); + for (size_t i = 0; i < paths.size(); i++) { + struct stat s; + stat(paths[i].c_str(), &s); + cout << paths[i] << ": " << s.st_size << endl; + } + return 0; +} Added: serialization-bench/trunk/src/test.proto =================================================================== --- serialization-bench/trunk/src/test.proto (rev 0) +++ serialization-bench/trunk/src/test.proto 2009-02-02 22:42:05 UTC (rev 1155) @@ -0,0 +1,4 @@ +option optimize_for = SPEED; +message varray { + repeated int32 v = 1; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-01-31 08:13:38
|
Revision: 1154 http://assorted.svn.sourceforge.net/assorted/?rev=1154&view=rev Author: yangzhang Date: 2009-01-31 08:13:37 +0000 (Sat, 31 Jan 2009) Log Message: ----------- added some notes to operator demo Modified Paths: -------------- sandbox/trunk/src/cc/operators.cc Modified: sandbox/trunk/src/cc/operators.cc =================================================================== --- sandbox/trunk/src/cc/operators.cc 2009-01-31 08:12:50 UTC (rev 1153) +++ sandbox/trunk/src/cc/operators.cc 2009-01-31 08:13:37 UTC (rev 1154) @@ -1,6 +1,9 @@ #include <iostream> using namespace std; +// This is how you overload the increment/decrement operators; distinguish +// between postfix/prefix with an extra dummy argument. + class c { public: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-01-31 08:13:01
|
Revision: 1153 http://assorted.svn.sourceforge.net/assorted/?rev=1153&view=rev Author: yangzhang Date: 2009-01-31 08:12:50 +0000 (Sat, 31 Jan 2009) Log Message: ----------- added operator demo Added Paths: ----------- sandbox/trunk/src/cc/operators.cc Added: sandbox/trunk/src/cc/operators.cc =================================================================== --- sandbox/trunk/src/cc/operators.cc (rev 0) +++ sandbox/trunk/src/cc/operators.cc 2009-01-31 08:12:50 UTC (rev 1153) @@ -0,0 +1,20 @@ +#include <iostream> +using namespace std; + +class c +{ +public: + c& operator++() { cout << "++x" << endl; return *this; } + c operator++(int) { cout << "x++" << endl; return *this; } + c& operator--() { cout << "--x" << endl; return *this; } + c operator--(int) { cout << "x--" << endl; return *this; } +}; + +int main() { + c c; + c++; + ++c; + c--; + --c; + return 0; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |