assorted-commits Mailing List for Assorted projects (Page 23)
Brought to you by:
yangzhang
You can subscribe to this list here.
2007 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
(9) |
Dec
(12) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
2008 |
Jan
(86) |
Feb
(265) |
Mar
(96) |
Apr
(47) |
May
(136) |
Jun
(28) |
Jul
(57) |
Aug
(42) |
Sep
(20) |
Oct
(67) |
Nov
(37) |
Dec
(34) |
2009 |
Jan
(39) |
Feb
(85) |
Mar
(96) |
Apr
(24) |
May
(82) |
Jun
(13) |
Jul
(10) |
Aug
(8) |
Sep
(2) |
Oct
(20) |
Nov
(31) |
Dec
(17) |
2010 |
Jan
(16) |
Feb
(11) |
Mar
(17) |
Apr
(53) |
May
(31) |
Jun
(13) |
Jul
(3) |
Aug
(6) |
Sep
(11) |
Oct
(4) |
Nov
(17) |
Dec
(17) |
2011 |
Jan
(3) |
Feb
(19) |
Mar
(5) |
Apr
(17) |
May
(3) |
Jun
(4) |
Jul
(14) |
Aug
(3) |
Sep
(2) |
Oct
(1) |
Nov
(3) |
Dec
(2) |
2012 |
Jan
(3) |
Feb
(7) |
Mar
(1) |
Apr
|
May
(1) |
Jun
|
Jul
(4) |
Aug
(5) |
Sep
(2) |
Oct
(3) |
Nov
|
Dec
|
2013 |
Jan
|
Feb
|
Mar
(9) |
Apr
(5) |
May
|
Jun
(2) |
Jul
(1) |
Aug
(10) |
Sep
(1) |
Oct
(2) |
Nov
|
Dec
|
2014 |
Jan
(1) |
Feb
(3) |
Mar
(3) |
Apr
(1) |
May
(4) |
Jun
|
Jul
|
Aug
|
Sep
(2) |
Oct
|
Nov
|
Dec
|
2015 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
(1) |
Nov
|
Dec
|
2016 |
Jan
(1) |
Feb
|
Mar
(2) |
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
(1) |
Oct
|
Nov
|
Dec
|
2017 |
Jan
|
Feb
|
Mar
(1) |
Apr
|
May
(5) |
Jun
(1) |
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
(2) |
2018 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
(1) |
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
From: <yan...@us...> - 2009-04-29 04:28:29
|
Revision: 1352 http://assorted.svn.sourceforge.net/assorted/?rev=1352&view=rev Author: yangzhang Date: 2009-04-29 04:28:27 +0000 (Wed, 29 Apr 2009) Log Message: ----------- added file_size, read_file_as_array; tweaks to use other commons abstractions Modified Paths: -------------- cpp-commons/trunk/src/commons/files.h Modified: cpp-commons/trunk/src/commons/files.h =================================================================== --- cpp-commons/trunk/src/commons/files.h 2009-04-29 02:13:34 UTC (rev 1351) +++ cpp-commons/trunk/src/commons/files.h 2009-04-29 04:28:27 UTC (rev 1352) @@ -12,6 +12,7 @@ #include <unistd.h> #include <fcntl.h> +#include <commons/array.h> #include <commons/check.h> #include <commons/closing.h> @@ -29,6 +30,26 @@ }; /** + * Get the size of a file in bytes. + */ + off_t file_size(const char *path) + { + struct stat s; + check0x(stat(path, &s)); + return s.st_size; + } + + /** + * Get the size of a file in bytes. + */ + off_t file_size(int fd) + { + struct stat s; + check0x(fstat(fd, &s)); + return s.st_size; + } + + /** * Read in a whole file as a string. */ void read_file_as_string ( const string & name, string & out ) { @@ -46,20 +67,9 @@ out = vector<char> ( istreambuf_iterator<char> ( in ), istreambuf_iterator<char>() ); } - off_t file_size(const char *path) - { - struct stat s; - check0x(stat(path, &s)); - return s.st_size; - } - - off_t file_size(int fd) - { - struct stat s; - check0x(fstat(fd, &s)); - return s.st_size; - } - + /** + * Read in a whole file as an array. + */ array<char> read_file_as_array(const char *path) { closingfd fd(checknnegerr(open(path, O_RDONLY))); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-29 02:13:42
|
Revision: 1351 http://assorted.svn.sourceforge.net/assorted/?rev=1351&view=rev Author: yangzhang Date: 2009-04-29 02:13:34 +0000 (Wed, 29 Apr 2009) Log Message: ----------- using target-specific variable values Modified Paths: -------------- cpp-commons/trunk/src/test/Makefile Modified: cpp-commons/trunk/src/test/Makefile =================================================================== --- cpp-commons/trunk/src/test/Makefile 2009-04-29 02:09:13 UTC (rev 1350) +++ cpp-commons/trunk/src/test/Makefile 2009-04-29 02:13:34 UTC (rev 1351) @@ -42,7 +42,7 @@ clean: rm -f $(BINS) -st: st.cc - $(LINK.cc) $^ $(LOADLIBES) $(LDLIBS) -lst -lstx -lresolv -o $@ +st: LDLIBS += -lst -lstx -lresolv +squeue: LDLIBS += -lboost_thread-gcc43-mt .PHONY: all build clean This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-29 02:09:22
|
Revision: 1350 http://assorted.svn.sourceforge.net/assorted/?rev=1350&view=rev Author: yangzhang Date: 2009-04-29 02:09:13 +0000 (Wed, 29 Apr 2009) Log Message: ----------- added checkneq Modified Paths: -------------- cpp-commons/trunk/src/commons/check.h Modified: cpp-commons/trunk/src/commons/check.h =================================================================== --- cpp-commons/trunk/src/commons/check.h 2009-04-29 02:08:54 UTC (rev 1349) +++ cpp-commons/trunk/src/commons/check.h 2009-04-29 02:09:13 UTC (rev 1350) @@ -137,6 +137,16 @@ } template<typename T, typename U> inline void + _checkneq(T l, U r, const char *file, int line) + { + if (l == r) { + stringstream ss; + ss << "expecting " << l << " != " << r; + _check(false, file, line, "%s", ss.str().c_str()); + } + } + + template<typename T, typename U> inline void _checkeq(T l, U r, const char *file, int line) { if (l != r) { @@ -202,6 +212,12 @@ #define check0x(expr, msg...) _check0(expr, __FILE__, __LINE__, ## msg) /** + * Checks that the values are not equal. The exception will include both + * values, as formatted by ostream <<. + */ +#define checkneq(l, r, msg...) _checkneq(l, r, __FILE__, __LINE__, ## msg) + +/** * Checks that the values are equal. The exception will include both values, * as formatted by ostream <<. */ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-29 02:09:06
|
Revision: 1349 http://assorted.svn.sourceforge.net/assorted/?rev=1349&view=rev Author: yangzhang Date: 2009-04-29 02:08:54 +0000 (Wed, 29 Apr 2009) Log Message: ----------- removed note Modified Paths: -------------- cpp-commons/trunk/src/commons/threads.h Modified: cpp-commons/trunk/src/commons/threads.h =================================================================== --- cpp-commons/trunk/src/commons/threads.h 2009-04-29 02:08:29 UTC (rev 1348) +++ cpp-commons/trunk/src/commons/threads.h 2009-04-29 02:08:54 UTC (rev 1349) @@ -61,7 +61,6 @@ /** * Run a function in pthread. * \return The new pthread_t on success, 0 on failure. - * TODO: Is it safe to treat the pthread_t as an integral type? */ pthread_t spawn(const boost::function<void()>& f) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-29 02:08:37
|
Revision: 1348 http://assorted.svn.sourceforge.net/assorted/?rev=1348&view=rev Author: yangzhang Date: 2009-04-29 02:08:29 +0000 (Wed, 29 Apr 2009) Log Message: ----------- added file_size, read_file_as_array; tweaks to use other commons abstractions Modified Paths: -------------- cpp-commons/trunk/src/commons/files.h Modified: cpp-commons/trunk/src/commons/files.h =================================================================== --- cpp-commons/trunk/src/commons/files.h 2009-04-29 02:07:36 UTC (rev 1347) +++ cpp-commons/trunk/src/commons/files.h 2009-04-29 02:08:29 UTC (rev 1348) @@ -13,6 +13,7 @@ #include <fcntl.h> #include <commons/check.h> +#include <commons/closing.h> namespace commons { @@ -45,6 +46,28 @@ out = vector<char> ( istreambuf_iterator<char> ( in ), istreambuf_iterator<char>() ); } + off_t file_size(const char *path) + { + struct stat s; + check0x(stat(path, &s)); + return s.st_size; + } + + off_t file_size(int fd) + { + struct stat s; + check0x(fstat(fd, &s)); + return s.st_size; + } + + array<char> read_file_as_array(const char *path) + { + closingfd fd(checknnegerr(open(path, O_RDONLY))); + array<char> buf(file_size(fd)); + checkeqnneg(read(fd, buf, buf.size()), static_cast<ssize_t>(buf.size())); + return buf; + } + /** * Load an entire file directly into buf and also give us the length of the * buffer (size of the file). @@ -55,7 +78,7 @@ load_file(const char *path, size_t & len, unsigned int ncpus) { struct stat sb; - int fd = checkpass(open(path, 0)); + closingfd fd(checknnegerr(open(path, O_RDONLY))); check0x(fstat(fd, &sb)); check(sb.st_size <= 0xffffffff); @@ -78,8 +101,6 @@ } } - check0x(close(fd)); - buf[len] = '\0'; // don't let strcmp() run off the end return buf; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-29 02:07:43
|
Revision: 1347 http://assorted.svn.sourceforge.net/assorted/?rev=1347&view=rev Author: yangzhang Date: 2009-04-29 02:07:36 +0000 (Wed, 29 Apr 2009) Log Message: ----------- const tweaks Modified Paths: -------------- cpp-commons/trunk/src/commons/closing.h Modified: cpp-commons/trunk/src/commons/closing.h =================================================================== --- cpp-commons/trunk/src/commons/closing.h 2009-04-29 02:07:17 UTC (rev 1346) +++ cpp-commons/trunk/src/commons/closing.h 2009-04-29 02:07:36 UTC (rev 1347) @@ -21,8 +21,8 @@ public: closing(T x) : x(x), scoped(true) {} ~closing() { if (scoped) Closer::apply(x); } - T get() { return x; } - operator T() { return x; } + T get() const { return x; } + operator T() const { return x; } T release() { scoped = false; return x; } private: T x; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-29 02:07:23
|
Revision: 1345 http://assorted.svn.sourceforge.net/assorted/?rev=1345&view=rev Author: yangzhang Date: 2009-04-29 02:07:06 +0000 (Wed, 29 Apr 2009) Log Message: ----------- added synchronized queue Added Paths: ----------- cpp-commons/trunk/src/commons/squeue.h Added: cpp-commons/trunk/src/commons/squeue.h =================================================================== --- cpp-commons/trunk/src/commons/squeue.h (rev 0) +++ cpp-commons/trunk/src/commons/squeue.h 2009-04-29 02:07:06 UTC (rev 1345) @@ -0,0 +1,63 @@ +#ifndef COMMONS_SQUEUE_H +#define COMMONS_SQUEUE_H + +#include <queue> +#include <boost/thread/mutex.hpp> +#include <boost/thread/condition_variable.hpp> + +namespace commons +{ + using namespace std; + using namespace boost; + + template<typename T> + class concurrent_queue + { + private: + queue<T> q_; + mutable mutex m_; + condition_variable c_; + + public: + template<typename U> void push(U &&x) + { + mutex::scoped_lock lock(m_); + q_.push(forward<U>(x)); + lock.unlock(); + c_.notify_one(); + } + + bool empty() const + { + mutex::scoped_lock lock(m_); + return q_.empty(); + } + + bool try_pop(T& x) + { + mutex::scoped_lock lock(m_); + if (q_.empty()) return false; + x = q_.front(); + q_.pop(); + return true; + } + + void pop(T& x) + { + mutex::scoped_lock lock(m_); + while (q_.empty()) c_.wait(lock); + x = move(q_.front()); + q_.pop(); + } + + T take() { + mutex::scoped_lock lock(m_); + while (q_.empty()) c_.wait(lock); + T x = move(q_.front()); + q_.pop(); + return x; + } + }; +} + +#endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-29 02:07:20
|
Revision: 1346 http://assorted.svn.sourceforge.net/assorted/?rev=1346&view=rev Author: yangzhang Date: 2009-04-29 02:07:17 +0000 (Wed, 29 Apr 2009) Log Message: ----------- added versioned heap Added Paths: ----------- cpp-commons/trunk/src/commons/versioned_heap.h Added: cpp-commons/trunk/src/commons/versioned_heap.h =================================================================== --- cpp-commons/trunk/src/commons/versioned_heap.h (rev 0) +++ cpp-commons/trunk/src/commons/versioned_heap.h 2009-04-29 02:07:17 UTC (rev 1346) @@ -0,0 +1,86 @@ +#ifndef VERSIONED_HEAP_H +#define VERSIONED_HEAP_H + +#define _POSIX_C_SOURCE 200112L +#include <cstdlib> +#include <vector> +#include <boost/foreach.hpp> +#define foreach BOOST_FOREACH + +namespace commons +{ + + using namespace std; + + /** + * A simple object memory pool that places things into pages that are tagged + * with a version number. + */ + template<typename T, typename VersionType = int> + class versioned_heap + { + public: + typedef VersionType version_type; + typedef T value_type; + struct hdr { + version_type version; + uint32_t index; + }; + versioned_heap(size_t pgsz = 131072) : + pgsz_(pgsz), pgcnt_((pgsz - sizeof(hdr)) / sizeof(T)) {} + ~versioned_heap() { + foreach (char *page, pages_) ::free(page); + } + T &construct(version_type version = 0) { + for (size_t i = 0; i < freemap_.size(); ++i) { + if (freemap_[i]) { + freemap_[i] = false; + char *pos = pages_[i / pgcnt_] + sizeof(hdr) + + (i % pgcnt_) * sizeof(T); + if (version == 0) ++hdrof(pos).version; + else hdrof(pos).version = version; + return *new (pos) T; + } + } + char *page; + posix_memalign(reinterpret_cast<void**>(&page), pgsz_, pgsz_); + hdr &h = *reinterpret_cast<hdr*>(page); + h.version = version; + h.index = uint32_t(pages_.size()); + pages_.push_back(page); + freemap_.resize(freemap_.size() + pgcnt_, true); + freemap_[freemap_.size() - pgcnt_] = false; + return *new (page + sizeof(hdr)) T; + } + void free(T &x, version_type version = 0) { + x.~T(); + hdr &h = hdrof(&x); + if (version == 0) ++h.version; + else h.version = version; + char *px = reinterpret_cast<char*>(&x); + char *p0 = reinterpret_cast<char*>(&h + 1); + freemap_[h.index * pgcnt_ + (px - p0) / sizeof(T)] = true; + } + void touch(T &p) { ++hdrof(&p).version; } + void touch(T &p, version_type version) { hdrof(&p).version = version; } + size_t pgcnt() const { return pgcnt_; } + size_t pgsz() const { return pgsz_; } + const vector<char*> &pages() const { return pages_; } + /** Pronounced "header of". */ + const hdr &hdrof(void *p) const { + return *reinterpret_cast<hdr*>(uintptr_t(p) & ~(pgsz_ - 1)); + } + hdr &hdrof(void *p) { + return *reinterpret_cast<hdr*>(uintptr_t(p) & ~(pgsz_ - 1)); + } + private: + size_t pgsz_; + size_t pgcnt_; + vector<bool> freemap_; + vector<char*> pages_; + }; + +} + +#undef foreach +#endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-29 02:06:33
|
Revision: 1344 http://assorted.svn.sourceforge.net/assorted/?rev=1344&view=rev Author: yangzhang Date: 2009-04-29 02:06:23 +0000 (Wed, 29 Apr 2009) Log Message: ----------- using boost::function instead of std::function Modified Paths: -------------- cpp-commons/trunk/src/commons/delegates.h Modified: cpp-commons/trunk/src/commons/delegates.h =================================================================== --- cpp-commons/trunk/src/commons/delegates.h 2009-04-27 21:23:05 UTC (rev 1343) +++ cpp-commons/trunk/src/commons/delegates.h 2009-04-29 02:06:23 UTC (rev 1344) @@ -12,7 +12,7 @@ using namespace boost; using namespace std; - typedef std::function<void()> fn; + typedef boost::function<void()> fn; UNUSED static void swallow(const fn f) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-27 21:23:14
|
Revision: 1343 http://assorted.svn.sourceforge.net/assorted/?rev=1343&view=rev Author: yangzhang Date: 2009-04-27 21:23:05 +0000 (Mon, 27 Apr 2009) Log Message: ----------- added demo that posix_memalign doesn't call malloc Modified Paths: -------------- sandbox/trunk/src/nix/preload/interposer.c sandbox/trunk/src/nix/preload/run.bash Added Paths: ----------- sandbox/trunk/src/nix/preload/memalign.c Modified: sandbox/trunk/src/nix/preload/interposer.c =================================================================== --- sandbox/trunk/src/nix/preload/interposer.c 2009-04-27 21:16:07 UTC (rev 1342) +++ sandbox/trunk/src/nix/preload/interposer.c 2009-04-27 21:23:05 UTC (rev 1343) @@ -1,30 +1,45 @@ #define _GNU_SOURCE #include <stdio.h> +#include <stdlib.h> #include <dlfcn.h> +#ifdef COUNT +int mallocs = 0; +void dump() { printf("%d mallocs\n", mallocs); } +#endif + void * malloc(size_t sz) { // Find and cache the next malloc (in our example it should be the "real" // malloc). static void * (*func)(); - if (!func) + if (!func) { func = (void *(*)()) dlsym(RTLD_NEXT, "malloc"); +#ifdef COUNT + atexit(dump); +#endif + } - // Do our thang. #ifdef VERBOSE - printf("malloc(%u) is called\n", sz); + printf("malloc(%zu) is called\n", sz); #endif // Call real malloc. void *p = func(sz); - // Do our thang. +#ifdef COUNT + ++mallocs; +#endif + #ifdef VERBOSE printf("malloc returned %p\n", p); #endif + +#ifdef FAILURES if (!p) printf("malloc failed!\n"); +#endif return p; } Added: sandbox/trunk/src/nix/preload/memalign.c =================================================================== --- sandbox/trunk/src/nix/preload/memalign.c (rev 0) +++ sandbox/trunk/src/nix/preload/memalign.c 2009-04-27 21:23:05 UTC (rev 1343) @@ -0,0 +1,9 @@ +// malloc doesn't get called at all! + +#include <stdlib.h> + +int main() { + void *p; + posix_memalign(&p, 131072, 131072); + return 0; +} Modified: sandbox/trunk/src/nix/preload/run.bash =================================================================== --- sandbox/trunk/src/nix/preload/run.bash 2009-04-27 21:16:07 UTC (rev 1342) +++ sandbox/trunk/src/nix/preload/run.bash 2009-04-27 21:23:05 UTC (rev 1343) @@ -4,6 +4,9 @@ gcc -Wall -o testc test.c g++ -Wall -o testcc test.cc -gcc -Wall -shared -fPIC -ldl -o interposer.so interposer.c -LD_PRELOAD=./interposer.so ./testc -LD_PRELOAD=./interposer.so ./testcc || true +g++ -Wall -o memalign memalign.c +gcc -Wall -DCOUNT -shared -fPIC -ldl -o interposer_c.so interposer.c +gcc -Wall -DVERBOSE -shared -fPIC -ldl -o interposer_v.so interposer.c +LD_PRELOAD=./interposer_v.so ./memalign +LD_PRELOAD=./interposer_c.so ./testc +LD_PRELOAD=./interposer_c.so ./testcc || true This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-27 21:16:15
|
Revision: 1342 http://assorted.svn.sourceforge.net/assorted/?rev=1342&view=rev Author: yangzhang Date: 2009-04-27 21:16:07 +0000 (Mon, 27 Apr 2009) Log Message: ----------- added gdbinit for stl structures Modified Paths: -------------- configs/trunk/setup-yang.bash Added Paths: ----------- configs/trunk/src/gdbinit Modified: configs/trunk/setup-yang.bash =================================================================== --- configs/trunk/setup-yang.bash 2009-04-17 19:52:31 UTC (rev 1341) +++ configs/trunk/setup-yang.bash 2009-04-27 21:16:07 UTC (rev 1342) @@ -17,9 +17,10 @@ install .darcs/ darcs/boring install .devscripts devscripts install .dput.cf dput.cf -install .inputrc inputrc +install .gdbinit gdbinit install .gitconfig gitconfig install .gitignore gitignore +install .inputrc inputrc install .owl owl install .pythonrc.py pythonrc.py install .quiltrc quiltrc Added: configs/trunk/src/gdbinit =================================================================== --- configs/trunk/src/gdbinit (rev 0) +++ configs/trunk/src/gdbinit 2009-04-27 21:16:07 UTC (rev 1342) @@ -0,0 +1,695 @@ +# From http://www.yolinux.com/TUTORIALS/src/dbinit_stl_views-1.03.txt +# +# STL GDB evaluators/views/utilities - 1.03 +# +# The new GDB commands: +# are entirely non instrumental +# do not depend on any "inline"(s) - e.g. size(), [], etc +# are extremely tolerant to debugger settings +# +# This file should be "included" in .gdbinit as following: +# source stl-views.gdb or just paste it into your .gdbinit file +# +# The following STL containers are currently supported: +# +# std::vector<T> -- via pvector command +# std::list<T> -- via plist or plist_member command +# std::map<T,T> -- via pmap or pmap_member command +# std::multimap<T,T> -- via pmap or pmap_member command +# std::set<T> -- via pset command +# std::multiset<T> -- via pset command +# std::deque<T> -- via pdequeue command +# std::stack<T> -- via pstack command +# std::queue<T> -- via pqueue command +# std::priority_queue<T> -- via ppqueue command +# std::bitset<n> -- via pbitset command +# std::string -- via pstring command +# std::widestring -- via pwstring command +# +# The end of this file contains (optional) C++ beautifiers +# Make sure your debugger supports $argc +# +# Simple GDB Macros writen by Dan Marinescu (H-PhD) - License GPL +# Inspired by intial work of Tom Malnar, +# Tony Novac (PhD) / Cornell / Stanford, +# Gilad Mishne (PhD) and Many Many Others. +# Contact: dan...@ya... (Subject: STL) +# +# Modified to work with g++ 4.3 by Anders Elton +# Also added _member functions, that instead of printing the entire class in map, prints a member. + + + +# +# std::vector<> +# + +define pvector + if $argc == 0 + help pvector + else + set $size = $arg0._M_impl._M_finish - $arg0._M_impl._M_start + set $capacity = $arg0._M_impl._M_end_of_storage - $arg0._M_impl._M_start + set $size_max = $size - 1 + end + if $argc == 1 + set $i = 0 + while $i < $size + printf "elem[%u]: ", $i + p *($arg0._M_impl._M_start + $i) + set $i++ + end + end + if $argc == 2 + set $idx = $arg1 + if $idx < 0 || $idx > $size_max + printf "idx1, idx2 are not in acceptable range: [0..%u].\n", $size_max + else + printf "elem[%u]: ", $idx + p *($arg0._M_impl._M_start + $idx) + end + end + if $argc == 3 + set $start_idx = $arg1 + set $stop_idx = $arg2 + if $start_idx > $stop_idx + set $tmp_idx = $start_idx + set $start_idx = $stop_idx + set $stop_idx = $tmp_idx + end + if $start_idx < 0 || $stop_idx < 0 || $start_idx > $size_max || $stop_idx > $size_max + printf "idx1, idx2 are not in acceptable range: [0..%u].\n", $size_max + else + set $i = $start_idx + while $i <= $stop_idx + printf "elem[%u]: ", $i + p *($arg0._M_impl._M_start + $i) + set $i++ + end + end + end + if $argc > 0 + printf "Vector size = %u\n", $size + printf "Vector capacity = %u\n", $capacity + printf "Element " + whatis $arg0._M_impl._M_start + end +end + +document pvector + Prints std::vector<T> information. + Syntax: pvector <vector> <idx1> <idx2> + Note: idx, idx1 and idx2 must be in acceptable range [0..<vector>.size()-1]. + Examples: + pvector v - Prints vector content, size, capacity and T typedef + pvector v 0 - Prints element[idx] from vector + pvector v 1 2 - Prints elements in range [idx1..idx2] from vector +end + +# +# std::list<> +# + +define plist + if $argc == 0 + help plist + else + set $head = &$arg0._M_impl._M_node + set $current = $arg0._M_impl._M_node._M_next + set $size = 0 + while $current != $head + if $argc == 2 + printf "elem[%u]: ", $size + p *($arg1*)($current + 1) + end + if $argc == 3 + if $size == $arg2 + printf "elem[%u]: ", $size + p *($arg1*)($current + 1) + end + end + set $current = $current._M_next + set $size++ + end + printf "List size = %u \n", $size + if $argc == 1 + printf "List " + whatis $arg0 + printf "Use plist <variable_name> <element_type> to see the elements in the list.\n" + end + end +end + +document plist + Prints std::list<T> information. + Syntax: plist <list> <T> <idx>: Prints list size, if T defined all elements or just element at idx + Examples: + plist l - prints list size and definition + plist l int - prints all elements and list size + plist l int 2 - prints the third element in the list (if exists) and list size +end + +define plist_member + if $argc == 0 + help plist_member + else + set $head = &$arg0._M_impl._M_node + set $current = $arg0._M_impl._M_node._M_next + set $size = 0 + while $current != $head + if $argc == 3 + printf "elem[%u]: ", $size + p (*($arg1*)($current + 1)).$arg2 + end + if $argc == 4 + if $size == $arg3 + printf "elem[%u]: ", $size + p (*($arg1*)($current + 1)).$arg2 + end + end + set $current = $current._M_next + set $size++ + end + printf "List size = %u \n", $size + if $argc == 1 + printf "List " + whatis $arg0 + printf "Use plist_member <variable_name> <element_type> <member> to see the elements in the list.\n" + end + end +end + +document plist_member + Prints std::list<T> information. + Syntax: plist <list> <T> <idx>: Prints list size, if T defined all elements or just element at idx + Examples: + plist_member l int member - prints all elements and list size + plist_member l int member 2 - prints the third element in the list (if exists) and list size +end + + +# +# std::map and std::multimap +# + +define pmap + if $argc == 0 + help pmap + else + set $tree = $arg0 + set $i = 0 + set $node = $tree._M_t._M_impl._M_header._M_left + set $end = $tree._M_t._M_impl._M_header + set $tree_size = $tree._M_t._M_impl._M_node_count + if $argc == 1 + printf "Map " + whatis $tree + printf "Use pmap <variable_name> <left_element_type> <right_element_type> to see the elements in the map.\n" + end + if $argc == 3 + while $i < $tree_size + set $value = (void *)($node + 1) + printf "elem[%u].left: ", $i + p *($arg1*)$value + set $value = $value + sizeof($arg1) + printf "elem[%u].right: ", $i + p *($arg2*)$value + if $node._M_right != 0 + set $node = $node._M_right + while $node._M_left != 0 + set $node = $node._M_left + end + else + set $tmp_node = $node._M_parent + while $node == $tmp_node._M_right + set $node = $tmp_node + set $tmp_node = $tmp_node._M_parent + end + if $node._M_right != $tmp_node + set $node = $tmp_node + end + end + set $i++ + end + end + if $argc == 4 + set $idx = $arg3 + set $ElementsFound = 0 + while $i < $tree_size + set $value = (void *)($node + 1) + if *($arg1*)$value == $idx + printf "elem[%u].left: ", $i + p *($arg1*)$value + set $value = $value + sizeof($arg1) + printf "elem[%u].right: ", $i + p *($arg2*)$value + set $ElementsFound++ + end + if $node._M_right != 0 + set $node = $node._M_right + while $node._M_left != 0 + set $node = $node._M_left + end + else + set $tmp_node = $node._M_parent + while $node == $tmp_node._M_right + set $node = $tmp_node + set $tmp_node = $tmp_node._M_parent + end + if $node._M_right != $tmp_node + set $node = $tmp_node + end + end + set $i++ + end + printf "Number of elements found = %u\n", $ElementsFound + end + if $argc == 5 + set $idx1 = $arg3 + set $idx2 = $arg4 + set $ElementsFound = 0 + while $i < $tree_size + set $value = (void *)($node + 1) + set $valueLeft = *($arg1*)$value + set $valueRight = *($arg2*)($value + sizeof($arg1)) + if $valueLeft == $idx1 && $valueRight == $idx2 + printf "elem[%u].left: ", $i + p $valueLeft + printf "elem[%u].right: ", $i + p $valueRight + set $ElementsFound++ + end + if $node._M_right != 0 + set $node = $node._M_right + while $node._M_left != 0 + set $node = $node._M_left + end + else + set $tmp_node = $node._M_parent + while $node == $tmp_node._M_right + set $node = $tmp_node + set $tmp_node = $tmp_node._M_parent + end + if $node._M_right != $tmp_node + set $node = $tmp_node + end + end + set $i++ + end + printf "Number of elements found = %u\n", $ElementsFound + end + printf "Map size = %u\n", $tree_size + end +end + +document pmap + Prints std::map<TLeft and TRight> or std::multimap<TLeft and TRight> information. Works for std::multimap as well. + Syntax: pmap <map> <TtypeLeft> <TypeRight> <valLeft> <valRight>: Prints map size, if T defined all elements or just element(s) with val(s) + Examples: + pmap m - prints map size and definition + pmap m int int - prints all elements and map size + pmap m int int 20 - prints the element(s) with left-value = 20 (if any) and map size + pmap m int int 20 200 - prints the element(s) with left-value = 20 and right-value = 200 (if any) and map size +end + + +define pmap_member + if $argc == 0 + help pmap_member + else + set $tree = $arg0 + set $i = 0 + set $node = $tree._M_t._M_impl._M_header._M_left + set $end = $tree._M_t._M_impl._M_header + set $tree_size = $tree._M_t._M_impl._M_node_count + if $argc == 1 + printf "Map " + whatis $tree + printf "Use pmap <variable_name> <left_element_type> <right_element_type> to see the elements in the map.\n" + end + if $argc == 5 + while $i < $tree_size + set $value = (void *)($node + 1) + printf "elem[%u].left: ", $i + p (*($arg1*)$value).$arg2 + set $value = $value + sizeof($arg1) + printf "elem[%u].right: ", $i + p (*($arg3*)$value).$arg4 + if $node._M_right != 0 + set $node = $node._M_right + while $node._M_left != 0 + set $node = $node._M_left + end + else + set $tmp_node = $node._M_parent + while $node == $tmp_node._M_right + set $node = $tmp_node + set $tmp_node = $tmp_node._M_parent + end + if $node._M_right != $tmp_node + set $node = $tmp_node + end + end + set $i++ + end + end + if $argc == 6 + set $idx = $arg5 + set $ElementsFound = 0 + while $i < $tree_size + set $value = (void *)($node + 1) + if *($arg1*)$value == $idx + printf "elem[%u].left: ", $i + p (*($arg1*)$value).$arg2 + set $value = $value + sizeof($arg1) + printf "elem[%u].right: ", $i + p (*($arg3*)$value).$arg4 + set $ElementsFound++ + end + if $node._M_right != 0 + set $node = $node._M_right + while $node._M_left != 0 + set $node = $node._M_left + end + else + set $tmp_node = $node._M_parent + while $node == $tmp_node._M_right + set $node = $tmp_node + set $tmp_node = $tmp_node._M_parent + end + if $node._M_right != $tmp_node + set $node = $tmp_node + end + end + set $i++ + end + printf "Number of elements found = %u\n", $ElementsFound + end + printf "Map size = %u\n", $tree_size + end +end + +document pmap_member + Prints std::map<TLeft and TRight> or std::multimap<TLeft and TRight> information. Works for std::multimap as well. + Syntax: pmap <map> <TtypeLeft> <TypeRight> <valLeft> <valRight>: Prints map size, if T defined all elements or just element(s) with val(s) + Examples: + pmap_member m class1 member1 class2 member2 - prints class1.member1 : class2.member2 + pmap_member m class1 member1 class2 member2 lvalue - prints class1.member1 : class2.member2 where class1 == lvalue +end + + +# +# std::set and std::multiset +# + +define pset + if $argc == 0 + help pset + else + set $tree = $arg0 + set $i = 0 + set $node = $tree._M_t._M_impl._M_header._M_left + set $end = $tree._M_t._M_impl._M_header + set $tree_size = $tree._M_t._M_impl._M_node_count + if $argc == 1 + printf "Set " + whatis $tree + printf "Use pset <variable_name> <element_type> to see the elements in the set.\n" + end + if $argc == 2 + while $i < $tree_size + set $value = (void *)($node + 1) + printf "elem[%u]: ", $i + p *($arg1*)$value + if $node._M_right != 0 + set $node = $node._M_right + while $node._M_left != 0 + set $node = $node._M_left + end + else + set $tmp_node = $node._M_parent + while $node == $tmp_node._M_right + set $node = $tmp_node + set $tmp_node = $tmp_node._M_parent + end + if $node._M_right != $tmp_node + set $node = $tmp_node + end + end + set $i++ + end + end + if $argc == 3 + set $idx = $arg2 + set $ElementsFound = 0 + while $i < $tree_size + set $value = (void *)($node + 1) + if *($arg1*)$value == $idx + printf "elem[%u]: ", $i + p *($arg1*)$value + set $ElementsFound++ + end + if $node._M_right != 0 + set $node = $node._M_right + while $node._M_left != 0 + set $node = $node._M_left + end + else + set $tmp_node = $node._M_parent + while $node == $tmp_node._M_right + set $node = $tmp_node + set $tmp_node = $tmp_node._M_parent + end + if $node._M_right != $tmp_node + set $node = $tmp_node + end + end + set $i++ + end + printf "Number of elements found = %u\n", $ElementsFound + end + printf "Set size = %u\n", $tree_size + end +end + +document pset + Prints std::set<T> or std::multiset<T> information. Works for std::multiset as well. + Syntax: pset <set> <T> <val>: Prints set size, if T defined all elements or just element(s) having val + Examples: + pset s - prints set size and definition + pset s int - prints all elements and the size of s + pset s int 20 - prints the element(s) with value = 20 (if any) and the size of s +end + + + +# +# std::dequeue +# + +define pdequeue + if $argc == 0 + help pdequeue + else + set $size = 0 + set $start_cur = $arg0._M_impl._M_start._M_cur + set $start_last = $arg0._M_impl._M_start._M_last + set $start_stop = $start_last + while $start_cur != $start_stop + p *$start_cur + set $start_cur++ + set $size++ + end + set $finish_first = $arg0._M_impl._M_finish._M_first + set $finish_cur = $arg0._M_impl._M_finish._M_cur + set $finish_last = $arg0._M_impl._M_finish._M_last + if $finish_cur < $finish_last + set $finish_stop = $finish_cur + else + set $finish_stop = $finish_last + end + while $finish_first != $finish_stop + p *$finish_first + set $finish_first++ + set $size++ + end + printf "Dequeue size = %u\n", $size + end +end + +document pdequeue + Prints std::dequeue<T> information. + Syntax: pdequeue <dequeue>: Prints dequeue size, if T defined all elements + Deque elements are listed "left to right" (left-most stands for front and right-most stands for back) + Example: + pdequeue d - prints all elements and size of d +end + + + +# +# std::stack +# + +define pstack + if $argc == 0 + help pstack + else + set $start_cur = $arg0.c._M_impl._M_start._M_cur + set $finish_cur = $arg0.c._M_impl._M_finish._M_cur + set $size = $finish_cur - $start_cur + set $i = $size - 1 + while $i >= 0 + p *($start_cur + $i) + set $i-- + end + printf "Stack size = %u\n", $size + end +end + +document pstack + Prints std::stack<T> information. + Syntax: pstack <stack>: Prints all elements and size of the stack + Stack elements are listed "top to buttom" (top-most element is the first to come on pop) + Example: + pstack s - prints all elements and the size of s +end + + + +# +# std::queue +# + +define pqueue + if $argc == 0 + help pqueue + else + set $start_cur = $arg0.c._M_impl._M_start._M_cur + set $finish_cur = $arg0.c._M_impl._M_finish._M_cur + set $size = $finish_cur - $start_cur + set $i = 0 + while $i < $size + p *($start_cur + $i) + set $i++ + end + printf "Queue size = %u\n", $size + end +end + +document pqueue + Prints std::queue<T> information. + Syntax: pqueue <queue>: Prints all elements and the size of the queue + Queue elements are listed "top to bottom" (top-most element is the first to come on pop) + Example: + pqueue q - prints all elements and the size of q +end + + + +# +# std::priority_queue +# + +define ppqueue + if $argc == 0 + help ppqueue + else + set $size = $arg0.c._M_impl._M_finish - $arg0.c._M_impl._M_start + set $capacity = $arg0.c._M_impl._M_end_of_storage - $arg0.c._M_impl._M_start + set $i = $size - 1 + while $i >= 0 + p *($arg0.c._M_impl._M_start + $i) + set $i-- + end + printf "Priority queue size = %u\n", $size + printf "Priority queue capacity = %u\n", $capacity + end +end + +document ppqueue + Prints std::priority_queue<T> information. + Syntax: ppqueue <priority_queue>: Prints all elements, size and capacity of the priority_queue + Priority_queue elements are listed "top to buttom" (top-most element is the first to come on pop) + Example: + ppqueue pq - prints all elements, size and capacity of pq +end + + + +# +# std::bitset +# + +define pbitset + if $argc == 0 + help pbitset + else + p /t $arg0._M_w + end +end + +document pbitset + Prints std::bitset<n> information. + Syntax: pbitset <bitset>: Prints all bits in bitset + Example: + pbitset b - prints all bits in b +end + + + +# +# std::string +# + +define pstring + if $argc == 0 + help pstring + else + printf "String \t\t\t= \"%s\"\n", $arg0._M_data() + printf "String size/length \t= %u\n", $arg0._M_rep()._M_length + printf "String capacity \t= %u\n", $arg0._M_rep()._M_capacity + printf "String ref-count \t= %d\n", $arg0._M_rep()._M_refcount + end +end + +document pstring + Prints std::string information. + Syntax: pstring <string> + Example: + pstring s - Prints content, size/length, capacity and ref-count of string s +end + +# +# std::wstring +# + +define pwstring + if $argc == 0 + help pwstring + else + call printf("WString \t\t= \"%ls\"\n", $arg0._M_data()) + printf "WString size/length \t= %u\n", $arg0._M_rep()._M_length + printf "WString capacity \t= %u\n", $arg0._M_rep()._M_capacity + printf "WString ref-count \t= %d\n", $arg0._M_rep()._M_refcount + end +end + +document pwstring + Prints std::wstring information. + Syntax: pwstring <wstring> + Example: + pwstring s - Prints content, size/length, capacity and ref-count of wstring s +end + +# +# C++ related beautifiers (optional) +# + +set print pretty on +set print object on +set print static-members on +set print vtbl on +set print demangle on +set demangle-style gnu-v3 +set print sevenbit-strings off This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-17 19:52:50
|
Revision: 1341 http://assorted.svn.sourceforge.net/assorted/?rev=1341&view=rev Author: yangzhang Date: 2009-04-17 19:52:31 +0000 (Fri, 17 Apr 2009) Log Message: ----------- added note Modified Paths: -------------- sandbox/trunk/src/cc/boost_thread.cc Modified: sandbox/trunk/src/cc/boost_thread.cc =================================================================== --- sandbox/trunk/src/cc/boost_thread.cc 2009-04-16 20:25:22 UTC (rev 1340) +++ sandbox/trunk/src/cc/boost_thread.cc 2009-04-17 19:52:31 UTC (rev 1341) @@ -19,6 +19,8 @@ int main() { + // This doesn't work: + //thread t(f); thread t(bind(f)); t.join(); return 0; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-16 20:25:33
|
Revision: 1340 http://assorted.svn.sourceforge.net/assorted/?rev=1340&view=rev Author: yangzhang Date: 2009-04-16 20:25:22 +0000 (Thu, 16 Apr 2009) Log Message: ----------- added demo of tbb incompat with c++0x move Added Paths: ----------- sandbox/trunk/src/cc/tbbmove.cc Added: sandbox/trunk/src/cc/tbbmove.cc =================================================================== --- sandbox/trunk/src/cc/tbbmove.cc (rev 0) +++ sandbox/trunk/src/cc/tbbmove.cc 2009-04-16 20:25:22 UTC (rev 1340) @@ -0,0 +1,9 @@ +#include <tbb/concurrent_queue.h> +#include <commons/unique_ptr.h> +using namespace std; +using namespace tbb; +int main() { + concurrent_queue<unique_ptr<int> > q; + return 0; +} + This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-07 03:07:24
|
Revision: 1339 http://assorted.svn.sourceforge.net/assorted/?rev=1339&view=rev Author: yangzhang Date: 2009-04-07 03:07:15 +0000 (Tue, 07 Apr 2009) Log Message: ----------- added distcc-accelerated builds; added setups for path.py, unzip Modified Paths: -------------- ydb/trunk/tools/test.bash Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-04-07 03:06:11 UTC (rev 1338) +++ ydb/trunk/tools/test.bash 2009-04-07 03:07:15 UTC (rev 1339) @@ -229,7 +229,7 @@ refresh-local cd ~/ydb/src make clean - PPROF=1 OPT=1 make WTF= ydb DISTCC= + PPROF=1 OPT=1 make -j16 ydb WTF= DISTCC= } node-setup-cog() { @@ -237,6 +237,49 @@ toast --quiet arm 'http://nedbatchelder.com/code/cog/cog-2.1.tar.gz' } +node-setup-pathpy() { + check-remote + toast --quiet arm 'http://pypi.python.org/packages/source/p/path.py/path-2.2.zip' +} + +node-setup-unzip() { + check-remote + sudo aptitude -y install unzip +} + +node-setup-distcc() { + check-remote + mkdir -p ~/.distcc/ + cat > ~/.distcc/hosts << "EOF" +@farm1 +@farm2 +@farm3 +@farm4 +@farm5 +@farm6 +@farm7 +@farm8 +@farm9 +@farm10 +@farm11 +@farm12 +@farm13 +@farm14 +EOF + + # Add all hosts to the known hosts file to avoid being prompted. + for i in $( grep ^@ ~/.distcc/hosts ) + do ssh -o StrictHostKeyChecking=no ${i#@} true + done +} + +setup-distcc() { + parscp id_dsa ^:.ssh/id_dsa + parscp id_dsa.pub ^:.ssh/id_dsa.pub + parhosts ssh-copy-id -i id_dsa ^ + parremote node-setup-distcc +} + init-setup() { parremote node-init-setup } @@ -274,6 +317,10 @@ parremote node-setup-gtest parremote node-setup-ghash parremote node-setup-cog + parremote node-setup-pathpy + parremote node-setup-unzip + + setup-distcc } setup-ydb() { @@ -283,7 +330,10 @@ svn export ~/work/assorted/cpp-commons/trunk/ /tmp/ccom-export/ parscp -r /tmp/ydb-export/* ^:ydb/ parscp -r /tmp/ccom-export/* ^:ccom/ - parremote node-setup-ydb + local head="${hosts%% *}" tail="${hosts#* }" + remote $head node-setup-ydb + scp $head:ydb/src/ydb /tmp/ + hosts="$tail" parscp /tmp/ydb ^:ydb/src/ } setup-stperf() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-07 03:06:25
|
Revision: 1338 http://assorted.svn.sourceforge.net/assorted/?rev=1338&view=rev Author: yangzhang Date: 2009-04-07 03:06:11 +0000 (Tue, 07 Apr 2009) Log Message: ----------- added DISTCC control; some rearrangement Modified Paths: -------------- ydb/trunk/src/Makefile Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-04-07 03:03:54 UTC (rev 1337) +++ ydb/trunk/src/Makefile 2009-04-07 03:06:11 UTC (rev 1338) @@ -4,7 +4,10 @@ SHELL := bash CCACHE := ccache -export CCACHE_PREFIX := distcc +DISTCC := distcc +ifneq ($(DISTCC),) +export CCACHE_PREFIX := $(DISTCC) +endif ifneq ($(WTF),) WTF := wtf endif @@ -76,13 +79,6 @@ CXXFLAGS0 = $(OPT) -MD -pthread $(GPROF) $(WARNINGS) -std=gnu++0x \ $(ORIGCXXFLAGS) -ifneq ($(PCH),) - CXXFLAGS = $(CXXFLAGS0) -include pch.h -$(LZZOBJS): pch.h.gch -else - CXXFLAGS = $(CXXFLAGS0) -endif - LDFLAGS := -pthread $(GPROF) $(LDFLAGS) LDLIBS := -lstx -lst -lresolv -lprotobuf -lgtest \ @@ -112,7 +108,6 @@ COGS := $(wildcard tpcc/*.cog) COGOUTS := $(foreach cog,$(COGS),$(patsubst %.cog,%,$(cog))) -GENOBJS := $(LZZOBJS) $(PBOBJS) $(COGOBJS) GENOUTS := $(CLAMPOUTS) $(PBOUTS) $(COGOUTS) TPCCOBJS := clock randomgenerator tpccclient tpccdb tpccgenerator tpcctables @@ -140,7 +135,7 @@ protoc --cpp_out=. $< %.clamp: %.clamp.lzz - ln -sf $< $@ + cp $< $@ chmod -w $@ %.cc.clamp %.hh.clamp: %.clamp @@ -171,6 +166,15 @@ %.h.gch: %.h $(LINK.cc) $(OUTPUT_OPTION) $< +# Be careful with PCH and distcc! It actually makes compilation *slower* for +# me when used with distcc (anecdotal: 12s vs 15s). +ifneq ($(PCH),) +CXXFLAGS = $(CXXFLAGS0) -include pch.h +$(OBJS): pch.h.gch pch.h +else +CXXFLAGS = $(CXXFLAGS0) +endif + # # Project-specific rules # @@ -193,7 +197,7 @@ -Wno-unused-parameter clean: - rm -rf $(GENOUTS) $(OBJS) $(TARGET) $(TARGETS) $(CLAMPS) $(LZZOUTS) pch.h pch.h.gch .clamp/ *.d tpcc/*.d deps.mk + rm -rf $(GENOUTS) $(OBJS) $(TARGET) $(TARGETS) $(CLAMPS) $(LZZOUTS) .clamp/ *.d tpcc/*.d deps.mk # pch.h pch.h.gch .SECONDARY: $(GENOUTS) $(OBJS) $(CLAMPS) $(LZZOUTS) pch.h pch.h.gch This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-07 03:04:06
|
Revision: 1337 http://assorted.svn.sourceforge.net/assorted/?rev=1337&view=rev Author: yangzhang Date: 2009-04-07 03:03:54 +0000 (Tue, 07 Apr 2009) Log Message: ----------- vim line Modified Paths: -------------- ydb/trunk/src/tpcc/tpcctables.cc.cog Modified: ydb/trunk/src/tpcc/tpcctables.cc.cog =================================================================== --- ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-04-03 18:12:18 UTC (rev 1336) +++ ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-04-07 03:03:54 UTC (rev 1337) @@ -740,3 +740,5 @@ serbuf_.reset(arr.get(), arr.size()); } + +// vim:ft=cpp This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-03 18:12:45
|
Revision: 1336 http://assorted.svn.sourceforge.net/assorted/?rev=1336&view=rev Author: yangzhang Date: 2009-04-03 18:12:18 +0000 (Fri, 03 Apr 2009) Log Message: ----------- added first thunderbird extension (despam) Added Paths: ----------- despam/ despam/trunk/ despam/trunk/setup.bash despam/trunk/src/ despam/trunk/src/chrome/ despam/trunk/src/chrome/content/ despam/trunk/src/chrome/content/despam.xul despam/trunk/src/chrome.manifest despam/trunk/src/install.rdf Copied: despam/trunk/setup.bash (from rev 1320, python-commons/trunk/setup.bash) =================================================================== --- despam/trunk/setup.bash (rev 0) +++ despam/trunk/setup.bash 2009-04-03 18:12:18 UTC (rev 1336) @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +# Install to e.g. ~/.mozilla-thunderbird/tunr7hm0.dev/extensions/ + +pkg=despam +. simple-setup.bash + +install 'de...@as...'/ src/ Property changes on: despam/trunk/setup.bash ___________________________________________________________________ Added: svn:executable + * Added: svn:mergeinfo + Added: despam/trunk/src/chrome/content/despam.xul =================================================================== --- despam/trunk/src/chrome/content/despam.xul (rev 0) +++ despam/trunk/src/chrome/content/despam.xul 2009-04-03 18:12:18 UTC (rev 1336) @@ -0,0 +1,7 @@ +<?xml version="1.0"?> +<overlay id="despam" +xmlns="http://www.mozilla.org/keymaster/gatekeeper/there.is.only.xul"> + <statusbar id="status-bar"> + <statusbarpanel id="my-panel" label="Hello, World"/> + </statusbar> +</overlay> Added: despam/trunk/src/chrome.manifest =================================================================== --- despam/trunk/src/chrome.manifest (rev 0) +++ despam/trunk/src/chrome.manifest 2009-04-03 18:12:18 UTC (rev 1336) @@ -0,0 +1,2 @@ +content despam chrome/content/ +overlay chrome://messenger/content/messenger.xul chrome://despam/content/despam.xul Added: despam/trunk/src/install.rdf =================================================================== --- despam/trunk/src/install.rdf (rev 0) +++ despam/trunk/src/install.rdf 2009-04-03 18:12:18 UTC (rev 1336) @@ -0,0 +1,27 @@ +<?xml version="1.0"?> + +<RDF xmlns="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:em="http://www.mozilla.org/2004/em-rdf#"> + + <Description about="urn:mozilla:install-manifest"> + <em:id>de...@as...</em:id> + <em:version>1.0</em:version> + + + <!-- Target Application this extension can install into, + with minimum and maximum supported versions. --> + <em:targetApplication> + <Description> + <em:id>{3550f703-e582-4d05-9a08-453d09bdfdc6}</em:id> + <em:minVersion>2.0</em:minVersion> + <em:maxVersion>2.0.0.*</em:maxVersion> + </Description> + </em:targetApplication> + + <!-- Front End MetaData --> + <em:name>Sample!</em:name> + <em:description>A test extension</em:description> + <em:creator>Your Name Here</em:creator> + <em:homepageURL>http://www.example.net/</em:homepageURL> + </Description> +</RDF> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-28 07:47:01
|
Revision: 1335 http://assorted.svn.sourceforge.net/assorted/?rev=1335&view=rev Author: yangzhang Date: 2009-03-28 07:46:53 +0000 (Sat, 28 Mar 2009) Log Message: ----------- added gcc-perf-compare Added Paths: ----------- sandbox/trunk/src/one-off-scripts/gcc-perf-compare/ sandbox/trunk/src/one-off-scripts/gcc-perf-compare/compare.py Added: sandbox/trunk/src/one-off-scripts/gcc-perf-compare/compare.py =================================================================== --- sandbox/trunk/src/one-off-scripts/gcc-perf-compare/compare.py (rev 0) +++ sandbox/trunk/src/one-off-scripts/gcc-perf-compare/compare.py 2009-03-28 07:46:53 UTC (rev 1335) @@ -0,0 +1,24 @@ +#!/usr/bin/env python + +from __future__ import with_statement +import re, sys +from collections import defaultdict + +pat = re.compile(r'^ (?P<stage>[^:]+): .* \d+\.\d+ \( *(?P<pct>\d+)%\) wall ') + +times = defaultdict(lambda: defaultdict(list)) +for fname in sys.argv[1:]: + with file(fname) as f: + for line in f: + if line.startswith('g++') and '-ftime-report' in line: + src = line.split()[-1] + else: + m = pat.match(line) + if m: times[src][m.group('stage').strip()].append(int(m.group('pct'))) + +def mean(xs): + return float(sum(xs)) / len(xs) + +for src in times: + print src, sum( mean(times[src].get(stage, [0])) + for stage in ['parser', 'name lookup']) Property changes on: sandbox/trunk/src/one-off-scripts/gcc-perf-compare/compare.py ___________________________________________________________________ Added: svn:executable + * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-26 07:54:41
|
Revision: 1334 http://assorted.svn.sourceforge.net/assorted/?rev=1334&view=rev Author: yangzhang Date: 2009-03-26 07:54:36 +0000 (Thu, 26 Mar 2009) Log Message: ----------- more notes Modified Paths: -------------- ydb/trunk/README Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-24 09:26:20 UTC (rev 1333) +++ ydb/trunk/README 2009-03-26 07:54:36 UTC (rev 1334) @@ -46,9 +46,9 @@ [Protocol Buffers]: http://code.google.com/p/protobuf/ [State Threads]: http://state-threads.sourceforge.net/ -Requirements for tools: +Requirements for tools (building, deployment, analysis): -- [Assorted Shell Tools] (bash-commons, mssh) +- [Assorted Shell Tools] (bash-commons, gcc-config, mssh) - [Pylab] 0.98.3 - [Python] 2.5 @@ -763,35 +763,59 @@ - if not, why did we set down this road a year ago? - cut our losses? -- TODO PAPER!!! +Period 3/24/09-3/26/09 -- TODO related work +recovery schemes - [HST+91] Svein-Olaf Hvasshovd, Tore Saeter, Oystein Torboørnsen, Petter - Moe, and Oddvar Risnes. A continously available and highly scalable - transaction server: Design experience from the HypRa project. In - Proceedings of the 4th International Workshop on High Performance - Transaction Systems, September 1991. +- all approaches assume network replication +- on recovery, xfer physical log + - snapshots can be piecewise, don't have to be transactional +- on recovery, xfer transaction log + - snapshots must be transactional +- on recovery, xfer full state + - normally, do nothing + - mark state COW so that it may be incrementally transferred without pausing + the ongoing work + - worst case: 2x size; try to discard copied state immediately + - distribute the xfer among all the standing replicas [IMPLEMENTED] + - catch-up: either: + - have leader stream txns to joiner; this is better if the txns touch a + lot of data, such that it's cheaper to xfer and execute txns than to + xfer state [IMPLEMENTED] + - re-xfer state; this is better if the states don't touch much data, such + that it's cheaper to xfer state than to xfer and execute txns - Ricardo Jimenez-Peris , M. Patino-Martinez , Gustavo Alonso , Bettina - Kemme, Are quorums an alternative for data replication?, ACM Transactions - on Database Systems (TODS), v.28 n.3, p.257-294, September 2003 +another logical view - ~\cite{peris-srds202}R. Jimenez-Peris , M. Patino-Martinez , G. Alonso, - An algorithm for non-intrusive, parallel recovery of replicated data and - its correctness, Proceedings of the 21st IEEE Symposium on Reliable - Distributed Systems (SRDS'02), p.150, October 13-16, 2002 +- normally: + - either: + - phys log: probably more expensive [implemented] + - txn log: cheaper, already serialized + - nothing + - either: + - incremental snapshots: write pages & seqno to disk + - full snapshot: useful for replaying txn log + - nothing +- recovery: + - either: + - xfer (out-of-date) state: small optimizations like COW [simple version + implemented] + - xfer (tail of) phys log; requires all replicas to do phys ops in same + order + - xfer (tail of) txn log; requires full snapshot and no multi-partition +- catch-up: + - either: + - stream txns from leader and replay them [implemented] + - stream phys log from replica and replay them + - iteratively re-xfer out-of-date state +- multi-partition woes + - how to solve multi-partition recovery? + - should we not be looking at txn based methods at all? + - phys logging requires physical ops to be carried out in the same order at + all replicas? - ~\cite{bartoli-dsn01}B. Kemme, A. Bartoli, and O. Babaoglu. Online - Reconfiguration in Replicated Databases Based on Group Communication. In - Proc. of the Int. Conf. on Dependable Systems and Networks (DSN 2001), - Goteborg, Sweden, June 2001. +Period 3/26/09-3/ - ~\cite{amir-thesis} Y. Amir. Replication Using Group Communication Over a - Dynamic Network. PhD thesis, Institute of Computer Science, The Hebrew - University of Jerusalem, Israel. Also available at - http://www.dsn.jhu.edu/~yairamir/Yair_phd.pdf, 1995. - - TODO faster disk logging using separate threads - TODO show aries-write This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-24 09:26:28
|
Revision: 1333 http://assorted.svn.sourceforge.net/assorted/?rev=1333&view=rev Author: yangzhang Date: 2009-03-24 09:26:20 +0000 (Tue, 24 Mar 2009) Log Message: ----------- more clean-up Modified Paths: -------------- ydb/trunk/src/Makefile Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-03-24 09:11:22 UTC (rev 1332) +++ ydb/trunk/src/Makefile 2009-03-24 09:26:20 UTC (rev 1333) @@ -97,11 +97,13 @@ SVNURL := https://assorted.svn.sourceforge.net/svnroot/assorted/ydb/trunk/src TARGET := ydb +TARGETS := ser p2 serperf CLAMPLZZS:= $(wildcard *.clamp.lzz) -CLAMPS := $(foreach lzz,$(CLAMPLZZS),$(patsubst %.clamp.lzz,%.hh.clamp,$(lzz))) \ - $(foreach lzz,$(CLAMPLZZS),$(patsubst %.clamp.lzz,%.cc.clamp,$(lzz))) -CLAMPOUTS:= $(foreach clamp,$(CLAMPS),$(patsubst %.clamp,%,$(clamp))) +CLAMPS := $(foreach lzz,$(CLAMPLZZS),$(patsubst %.lzz,%,$(lzz))) +LZZOUTS := $(foreach lzz,$(CLAMPS),$(patsubst %.clamp,%.hh.clamp,$(lzz))) \ + $(foreach lzz,$(CLAMPS),$(patsubst %.clamp,%.cc.clamp,$(lzz))) +CLAMPOUTS:= $(foreach clamp,$(LZZOUTS),$(patsubst %.clamp,%,$(clamp))) PBS := $(wildcard *.proto) PBOUTS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.h,$(pb))) \ @@ -137,12 +139,15 @@ %.pb.h %.pb.cc: %.proto protoc --cpp_out=. $< -%.cc.clamp %.hh.clamp: %.clamp.lzz - ln -sf $< $(basename $<) - rm -f $(basename $(basename $<)).{hh,cc}.clamp - lzz -hx hh.clamp -sx cc.clamp -hd -sd $(basename $<) - chmod -w $(basename $(basename $<)).{hh,cc}.clamp +%.clamp: %.clamp.lzz + ln -sf $< $@ + chmod -w $@ +%.cc.clamp %.hh.clamp: %.clamp + rm -f $(basename $<).{hh,cc}.clamp + lzz -hx hh.clamp -sx cc.clamp -hd -sd $< + chmod -w $(basename $<).{hh,cc}.clamp + %.cc: %.cc.clamp rm -f $@ mkdir -p .clamp/ @@ -188,13 +193,10 @@ -Wno-unused-parameter clean: - rm -rf $(GENOUTS) $(OBJS) $(TARGET) $(CLAMPLZZS) .clamp/ *.d + rm -rf $(GENOUTS) $(OBJS) $(TARGET) $(TARGETS) $(CLAMPS) $(LZZOUTS) pch.h pch.h.gch .clamp/ *.d tpcc/*.d deps.mk -distclean: clean - rm -f pch.h pch.h.gch +.SECONDARY: $(GENOUTS) $(OBJS) $(CLAMPS) $(LZZOUTS) pch.h pch.h.gch -.SECONDARY: $(GENOUTS) $(OBJS) pch.h.gch - serperf: ydb.pb.o ser: ydb.pb.o This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-24 09:11:34
|
Revision: 1332 http://assorted.svn.sourceforge.net/assorted/?rev=1332&view=rev Author: yangzhang Date: 2009-03-24 09:11:22 +0000 (Tue, 24 Mar 2009) Log Message: ----------- .lzz.clamp -> .clamp.lzz; cleaned up makefile Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/mkdeps.py Added Paths: ----------- ydb/trunk/src/leader.clamp.lzz ydb/trunk/src/main.clamp.lzz ydb/trunk/src/rectpcc.clamp.lzz ydb/trunk/src/replica.clamp.lzz ydb/trunk/src/run.clamp.lzz ydb/trunk/src/stxn.clamp.lzz ydb/trunk/src/tpcc.clamp.lzz ydb/trunk/src/util.clamp.lzz ydb/trunk/src/ydb.clamp.lzz Removed Paths: ------------- ydb/trunk/src/leader.lzz.clamp ydb/trunk/src/main.lzz.clamp ydb/trunk/src/rectpcc.lzz.clamp ydb/trunk/src/replica.lzz.clamp ydb/trunk/src/run.lzz.clamp ydb/trunk/src/stxn.lzz.clamp ydb/trunk/src/tpcc.lzz.clamp ydb/trunk/src/util.lzz.clamp ydb/trunk/src/ydb.lzz.clamp Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-03-24 07:54:47 UTC (rev 1331) +++ ydb/trunk/src/Makefile 2009-03-24 09:11:22 UTC (rev 1332) @@ -3,10 +3,11 @@ # SHELL := bash -WTF := wtf -ORIGCXX := $(CXX) CCACHE := ccache export CCACHE_PREFIX := distcc +ifneq ($(WTF),) +WTF := wtf +endif CXX := $(WTF) $(CCACHE) $(CXX) -pipe TARGET_ARCH := $(shell [[ "$$(uname -m)" == x86_64 ]] && echo -m64 || echo -m32 ) \ @@ -95,35 +96,28 @@ # SVNURL := https://assorted.svn.sourceforge.net/svnroot/assorted/ydb/trunk/src - TARGET := ydb -CLAMPS := $(wildcard *.lzz.clamp) -CLAMPLZZS:= $(patsubst %.clamp,%,$(CLAMPS)) -PURELZZS := $(foreach lzz,$(wildcard *.lzz),$(if $(wildcard $(lzz).clamp),,$(lzz))) -LZZS := $(CLAMPLZZS) $(PURELZZS) -LZZHDRS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.hh,$(lzz))) -LZZSRCS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.cc,$(lzz))) -LZZOBJS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.o,$(lzz))) +CLAMPLZZS:= $(wildcard *.clamp.lzz) +CLAMPS := $(foreach lzz,$(CLAMPLZZS),$(patsubst %.clamp.lzz,%.hh.clamp,$(lzz))) \ + $(foreach lzz,$(CLAMPLZZS),$(patsubst %.clamp.lzz,%.cc.clamp,$(lzz))) +CLAMPOUTS:= $(foreach clamp,$(CLAMPS),$(patsubst %.clamp,%,$(clamp))) PBS := $(wildcard *.proto) -PBHDRS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.h,$(pb))) -PBSRCS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.cc,$(pb))) -PBOBJS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.o,$(pb))) +PBOUTS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.h,$(pb))) \ + $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.cc,$(pb))) COGS := $(wildcard tpcc/*.cog) -COGSRCS := $(foreach cog,$(COGS),$(patsubst %.cog,%,$(cog))) +COGOUTS := $(foreach cog,$(COGS),$(patsubst %.cog,%,$(cog))) -GENHDRS := $(LZZHDRS) $(PBHDRS) $(COGHDRS) -GENSRCS := $(LZZSRCS) $(PBSRCS) $(COGSRCS) GENOBJS := $(LZZOBJS) $(PBOBJS) $(COGOBJS) +GENOUTS := $(CLAMPOUTS) $(PBOUTS) $(COGOUTS) -TPCC_OBJS:= clock randomgenerator tpccclient tpccdb tpccgenerator tpcctables -TPCC_OBJS:= $(foreach i,$(TPCC_OBJS),tpcc/$(i).o) +TPCCOBJS := clock randomgenerator tpccclient tpccdb tpccgenerator tpcctables +TPCCOBJS := $(foreach i,$(TPCCOBJS),tpcc/$(i).o) -HDRS := $(GENHDRS) -SRCS := $(GENSRCS) -OBJS := $(GENOBJS) $(TPCC_OBJS) +SRCS := $(GENOUTS) msg.h +OBJS := $(patsubst %.cc,%.o,$(filter %.cc,$(SRCS))) $(TPCCOBJS) # # Rules @@ -131,7 +125,7 @@ all: $(TARGET) -doc: $(SRCS) $(HDRS) +doc: $(SRCS) doxygen %.pb.o: WARNINGS = -Wall -Werror @@ -143,24 +137,11 @@ %.pb.h %.pb.cc: %.proto protoc --cpp_out=. $< -# ORIG -# -#%.cc %.hh: %.lzz -# lzz -hx hh -sx cc -hl -sl -hd -sd $< -# -#%.lzz: %.lzz.clamp -# rm -f $@ -# mkdir -p .clamp/ -# clamp --outdir .clamp/ --prefix $(basename $@) < $< | \ -# sed "$$( echo -e '1i\\\n\#hdr\n1a\\\n\#end' )" | \ -# sed "$$( echo -e '$$i\\\n\#hdr\n$$a\\\n\#end' )" > $@ -# chmod -w $@ - -%.cc.clamp %.hh.clamp: %.lzz.clamp +%.cc.clamp %.hh.clamp: %.clamp.lzz ln -sf $< $(basename $<) rm -f $(basename $(basename $<)).{hh,cc}.clamp lzz -hx hh.clamp -sx cc.clamp -hd -sd $(basename $<) - chmod -w $(basename $(basename $<)).{hh.clamp,cc.clamp} + chmod -w $(basename $(basename $<)).{hh,cc}.clamp %.cc: %.cc.clamp rm -f $@ @@ -207,12 +188,12 @@ -Wno-unused-parameter clean: - rm -rf .clamp/ $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) $(CLAMPLZZS) *.d *.hh.clamp *.cc.clamp + rm -rf $(GENOUTS) $(OBJS) $(TARGET) $(CLAMPLZZS) .clamp/ *.d distclean: clean rm -f pch.h pch.h.gch -.SECONDARY: $(GENSRCS) $(GENHDRS) $(OBJS) main.lzz pch.h.gch +.SECONDARY: $(GENOUTS) $(OBJS) pch.h.gch serperf: ydb.pb.o ser: ydb.pb.o Copied: ydb/trunk/src/leader.clamp.lzz (from rev 1331, ydb/trunk/src/leader.lzz.clamp) =================================================================== --- ydb/trunk/src/leader.clamp.lzz (rev 0) +++ ydb/trunk/src/leader.clamp.lzz 2009-03-24 09:11:22 UTC (rev 1332) @@ -0,0 +1,137 @@ +#hdr +#include <stdint.h> +#end + +#src +#include "unsetprefs.h" +#include <commons/st/sockets.h> +#include <commons/st/threads.h> +#include "run.hh" +#include "stxn.hh" +#include "tpcc.hh" +#include "setprefs.h" +#end + +/** + * Run the leader. + */ +void +run_leader(int minreps, uint16_t leader_port) +{ + cout << "starting as leader" << endl; + st_multichannel<long long> recover_signals; + + scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); + g_twal = twal.get(); + scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); + g_wal = pwal.get(); + + // Wait until all replicas have joined. + st_netfd_t listener = st_tcp_listen(leader_port); + st_closing close_listener(listener); + vector<replica_info> replicas; + st_closing_all_infos close_replicas(replicas); + cout << "waiting for at least " << minreps << " replicas to join" << endl; + Join join; + for (int i = 0; i < minreps; ++i) { + st_netfd_t fd; + { + st_intr intr(stop_hub); + fd = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + readmsg(fd, join); + replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); + } + cout << "got all " << minreps << " replicas" << endl; + + // Construct the initialization message. + Init init; + init.set_txnseqno(0); + init.set_multirecover(multirecover); + foreach (replica_info r, replicas) { + SockAddr *psa = init.add_node(); + psa->set_host(r.host()); + psa->set_port(r.port()); + } + + // Send init to each initial replica. + foreach (replica_info r, replicas) { + init.set_yourhost(r.host()); + sendmsg(r.fd(), init); + } + + // Start dispatching queries. + st_bool accept_joiner; + int seqno = 0; + st_channel<replica_info> newreps; + st_channel<st_netfd_t> delreps; + foreach (const replica_info &r, replicas) newreps.push(r); + function<void()> f; + if (do_tpcc) + f = bind(issue_tpcc, ref(newreps), ref(delreps), ref(seqno), ref(accept_joiner)); + else + f = bind(issue_txns, ref(newreps), ref(seqno), ref(accept_joiner)); + st_joining join_issue_txns(my_spawn(f, "issue_txns")); + + finally fin(bind(summarize, "LEADER", ref(seqno))); + + try { + // Start handling responses. + st_thread_group handlers; + int rid = 0; + foreach (replica_info r, replicas) { + function<void()> fn; + if (do_tpcc) + fn = bind(handle_tpcc_responses, r.fd(), ref(seqno), rid++, + ref(recover_signals), ref(delreps), true); + else + fn = bind(handle_responses, r.fd(), ref(seqno), rid++, + ref(recover_signals), true); + handlers.insert(my_spawn(fn, "handle_responses")); + } + + // Accept the recovering node, and tell it about the online replicas. + st_netfd_t joiner; + try { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + accept_joiner.waitset(); + } catch (std::exception &ex) { + string s(ex.what()); + if (s.find("Interrupted system call") == s.npos) + throw; + else + throw break_exception(); + } + Join join; + readmsg(joiner, join); + replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); + cout << "setting seqno to " << seqno << endl; + init.set_txnseqno(seqno); + init.set_yourhost(replicas.back().host()); + sendmsg(joiner, init); + recover_signals.push(current_time_millis()); + + // Start streaming txns to joiner. + cout << "start streaming txns to joiner" << endl; + function<void()> handle_responses_joiner_fn; + if (do_tpcc) + handle_responses_joiner_fn = + bind(handle_tpcc_responses, joiner, ref(seqno), rid++, + ref(recover_signals), ref(delreps), false); + else + handle_responses_joiner_fn = + bind(handle_responses, joiner, ref(seqno), rid++, + ref(recover_signals), false); + newreps.push(replicas.back()); + handlers.insert(my_spawn(handle_responses_joiner_fn, + "handle_responses_joiner")); + } catch (break_exception &ex) { + } catch (std::exception &ex) { + // TODO: maybe there's a cleaner way to do this final step before waiting with the join + cerr_thread_ex(ex) << endl; + throw; + } +} Property changes on: ydb/trunk/src/leader.clamp.lzz ___________________________________________________________________ Added: svn:mergeinfo + Deleted: ydb/trunk/src/leader.lzz.clamp =================================================================== --- ydb/trunk/src/leader.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) +++ ydb/trunk/src/leader.lzz.clamp 2009-03-24 09:11:22 UTC (rev 1332) @@ -1,137 +0,0 @@ -#hdr -#include <stdint.h> -#end - -#src -#include "unsetprefs.h" -#include <commons/st/sockets.h> -#include <commons/st/threads.h> -#include "run.hh" -#include "stxn.hh" -#include "tpcc.hh" -#include "setprefs.h" -#end - -/** - * Run the leader. - */ -void -run_leader(int minreps, uint16_t leader_port) -{ - cout << "starting as leader" << endl; - st_multichannel<long long> recover_signals; - - scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); - g_twal = twal.get(); - scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); - g_wal = pwal.get(); - - // Wait until all replicas have joined. - st_netfd_t listener = st_tcp_listen(leader_port); - st_closing close_listener(listener); - vector<replica_info> replicas; - st_closing_all_infos close_replicas(replicas); - cout << "waiting for at least " << minreps << " replicas to join" << endl; - Join join; - for (int i = 0; i < minreps; ++i) { - st_netfd_t fd; - { - st_intr intr(stop_hub); - fd = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - } - readmsg(fd, join); - replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); - } - cout << "got all " << minreps << " replicas" << endl; - - // Construct the initialization message. - Init init; - init.set_txnseqno(0); - init.set_multirecover(multirecover); - foreach (replica_info r, replicas) { - SockAddr *psa = init.add_node(); - psa->set_host(r.host()); - psa->set_port(r.port()); - } - - // Send init to each initial replica. - foreach (replica_info r, replicas) { - init.set_yourhost(r.host()); - sendmsg(r.fd(), init); - } - - // Start dispatching queries. - st_bool accept_joiner; - int seqno = 0; - st_channel<replica_info> newreps; - st_channel<st_netfd_t> delreps; - foreach (const replica_info &r, replicas) newreps.push(r); - function<void()> f; - if (do_tpcc) - f = bind(issue_tpcc, ref(newreps), ref(delreps), ref(seqno), ref(accept_joiner)); - else - f = bind(issue_txns, ref(newreps), ref(seqno), ref(accept_joiner)); - st_joining join_issue_txns(my_spawn(f, "issue_txns")); - - finally fin(bind(summarize, "LEADER", ref(seqno))); - - try { - // Start handling responses. - st_thread_group handlers; - int rid = 0; - foreach (replica_info r, replicas) { - function<void()> fn; - if (do_tpcc) - fn = bind(handle_tpcc_responses, r.fd(), ref(seqno), rid++, - ref(recover_signals), ref(delreps), true); - else - fn = bind(handle_responses, r.fd(), ref(seqno), rid++, - ref(recover_signals), true); - handlers.insert(my_spawn(fn, "handle_responses")); - } - - // Accept the recovering node, and tell it about the online replicas. - st_netfd_t joiner; - try { - st_intr intr(stop_hub); - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - accept_joiner.waitset(); - } catch (std::exception &ex) { - string s(ex.what()); - if (s.find("Interrupted system call") == s.npos) - throw; - else - throw break_exception(); - } - Join join; - readmsg(joiner, join); - replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); - cout << "setting seqno to " << seqno << endl; - init.set_txnseqno(seqno); - init.set_yourhost(replicas.back().host()); - sendmsg(joiner, init); - recover_signals.push(current_time_millis()); - - // Start streaming txns to joiner. - cout << "start streaming txns to joiner" << endl; - function<void()> handle_responses_joiner_fn; - if (do_tpcc) - handle_responses_joiner_fn = - bind(handle_tpcc_responses, joiner, ref(seqno), rid++, - ref(recover_signals), ref(delreps), false); - else - handle_responses_joiner_fn = - bind(handle_responses, joiner, ref(seqno), rid++, - ref(recover_signals), false); - newreps.push(replicas.back()); - handlers.insert(my_spawn(handle_responses_joiner_fn, - "handle_responses_joiner")); - } catch (break_exception &ex) { - } catch (std::exception &ex) { - // TODO: maybe there's a cleaner way to do this final step before waiting with the join - cerr_thread_ex(ex) << endl; - throw; - } -} Copied: ydb/trunk/src/main.clamp.lzz (from rev 1331, ydb/trunk/src/main.lzz.clamp) =================================================================== --- ydb/trunk/src/main.clamp.lzz (rev 0) +++ ydb/trunk/src/main.clamp.lzz 2009-03-24 09:11:22 UTC (rev 1332) @@ -0,0 +1,313 @@ +#hdr +#include "unsetprefs.h" +#include <boost/tuple/tuple.hpp> +#include <commons/st/intr.h> +#include <commons/st/sync.h> +#include <commons/st/channel.h> +#include <fstream> // ofstream +#include <vector> +#include "util.hh" +#include "setprefs.h" + +namespace boost { namespace archive { class binary_oarchive; } } + +using namespace boost; +using namespace boost::archive; +using namespace commons; +using namespace std; +using namespace ydb; +using namespace ydb::msg; +#end + +#src +#include "unsetprefs.h" +#include <boost/foreach.hpp> +#include <boost/archive/binary_oarchive.hpp> +#include <commons/assert.h> +#include <commons/time.h> +#include <commons/st/io.h> +#include <commons/st/sockets.h> +#include <iostream> +#include <unistd.h> // pipe, write, sync +#include "tpcc/tpcctables.h" +#include "msg.h" +#include "setprefs.h" +#end + +typedef tuple<sized_array<char>, char*, char*> chunk; + +typedef commons::array<char> recovery_t; + + +// Configuration. +st_utime_t timeout; +int yield_interval, accept_joiner_seqno, issuing_interval, min_ops, max_ops, + stop_on_seqno, batch_size, handle_responses_display, fail_seqno, + catch_up_display, issue_display, nwarehouses, + process_display; +size_t accept_joiner_size, read_buf_size; +bool yield_during_build_up, yield_during_catch_up, dump, show_updates, + count_updates, stop_on_recovery, general_txns, + disk, debug_memory, use_pwal, use_twal, + use_pb, use_pb_res, g_caught_up, rec_pwal, rec_twal, do_tpcc, + suppress_txn_msgs, force_ser, fake_exec, ship_log; +long long timelim, read_thresh; + +// Control. +st_intr_bool stop_hub, kill_hub; +st_bool do_pause; +// On leader, signifies that a node is in fail mode. On replica, signifies that a node is in fail mode/recovering from the twal. +st_bool failed; +// The seqno on which we should resume. +st_channel<int> resume; +bool stopped_issuing; + +// Statistics. +int updates; + +/** + * Used by the leader to bookkeep information about replicas. + */ +class replica_info +{ + public: + /** port is the replica's listen port, not the port bound to the fd socket. */ + replica_info(st_netfd_t fd, uint16_t port) : fd_(fd), port_(port) {} + st_netfd_t fd() const { return fd_; } + /** The port on which the replica is listening. */ + uint16_t port() const { return port_; } + /** The port on which the replica connected to us. */ + uint16_t local_port() const { return sockaddr().sin_port; } + uint32_t host() const { return sockaddr().sin_addr.s_addr; } + sockaddr_in sockaddr() const { sockaddr_in sa; sockaddr(sa); return sa; } + void sockaddr(sockaddr_in &sa) const { + socklen_t salen = sizeof sa; + check0x(getpeername(st_netfd_fileno(fd_), + reinterpret_cast<struct sockaddr*>(&sa), + &salen)); + } + private: + st_netfd_t fd_; + uint16_t port_; +}; + +/** + * RAII to close all contained netfds. + */ +class st_closing_all_infos +{ + public: + st_closing_all_infos(const vector<replica_info>& rs) : rs_(rs) {} + ~st_closing_all_infos() { + cout << "closing all conns to replicas (replica_infos)" << endl; + foreach (replica_info r, rs_) + check0x(st_netfd_close(r.fd())); + } + private: + const vector<replica_info> &rs_; +}; + +/** + * RAII to close all contained netfds. + */ +class st_closing_all +{ + public: + st_closing_all(const vector<st_netfd_t>& rs) : rs_(rs) {} + ~st_closing_all() { + foreach (st_netfd_t r, rs_) + check0x(st_netfd_close(r)); + } + private: + const vector<st_netfd_t> &rs_; +}; + +#if 0 +st_channel<pair<st_netfd_t, shared_ptr<string> > > msgs; + +/** + * The worker that performs the actual broadcasting. + */ +void +bcaster() +{ + while (!kill_hub) { + pair<st_netfd_t, shared_ptr<string> > pr; + { + st_intr intr(kill_hub); + pr = msgs.take(); + } + st_netfd_t dst = pr.first; + shared_ptr<string> &p = pr.second; + if (p.get() == nullptr) break; + string &s = *p.get(); + + if (!fake_bcast) + st_timed_write(dst, s.data(), s.size()); + } +} + +/** + * Asynchronous version of the broadcaster. + */ +void +bcastbuf_async(const vector<st_netfd_t> &dsts, const ser_t &msg) +{ + shared_ptr<string> p(new string); + ser(*p.get(), msg); + foreach (st_netfd_t dst, dsts) msgs.push(make_pair(dst, p)); +} +#endif + +enum { op_del, op_write, op_commit }; + +/** + * ARIES write-ahead log. No undo logging necessary (no steal). + */ +class wal +{ +public: + wal(const string &fname) : + of_(fname.c_str()), + ar_(new binary_oarchive(of())) + {} + ~wal() { delete ar_; } + template <typename T> + void log(const T &msg) { ser(of(), msg); } + void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } + void logbuf(const void *buf, size_t len) { + of().write(reinterpret_cast<const char*>(buf), len); + } + void logdel(int key) { + int op = op_del; // TODO: is this really necessary? + ar() & op & key; + } + void logwrite(int key, int val) { + int op = op_write; + ar() & op & key & val; + } + void logcommit() { + int op = op_commit; + ar() & op; + } + void flush() { of().flush(); } +private: + ofstream of_; + //unique_ptr<binary_oarchive> ar_; + binary_oarchive *ar_; + ofstream &of() { return of_; } + binary_oarchive &ar() { return *ar_; }; +}; + +// TODO? +class txn_wal { +public: + txn_wal(const string &fname) : of(fname.c_str()) {} + void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } + void logbuf(const void *buf, size_t len) { + of.write(reinterpret_cast<const char*>(buf), len); + } + void flush() { of.flush(); } +private: + ofstream of; +}; + +// Globals +wal *g_wal; +txn_wal *g_twal; +//tpcc_wal *g_tpcc_wal; + +struct recreq { + int start_seqno, end_seqno; +}; + +/** + * Help the recovering node. + * + * \param[in] listener The connection on which we're listening for connections + * from recovering joiners. + * + * \param[in] map The database state. + * + * \param[in] seqno The sequence number. Always starts at 0. + * + * \param[in] send_states Channel of snapshots of the database state to receive + * from process_txns. + */ +void +recover_joiner(st_netfd_t listener, + st_channel<recovery_t> &send_states) +{ + cout << "waiting for joiner" << endl; + recovery_t recovery; + st_netfd_t joiner; + if (ship_log) { + { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + st_closing closing(joiner); + recreq r; + st_read(joiner, r); + commons::array<char> wbuf(buf_size); + writer writer(lambda(const void *buf, size_t len) { + st_write(__ref(joiner), buf, len); + }, wbuf.get(), wbuf.size()); + cout << "got joiner's connection, sending log from seqnos " + << r.start_seqno << " to " << r.end_seqno << endl; + + g_twal->flush(); + sync(); + ifstream inf("twal"); + long long start_time = current_time_millis(); + for (int seqno = 0; seqno < r.start_seqno; ++seqno) { + ASSERT(inf.good()); + inf.seekg(readlen(inf), ios::cur); + } + long long mid_time = current_time_millis(); + streamoff mid_off = inf.tellg(); + showdatarate("scanned log", mid_off, mid_time - start_time); + for (int seqno = r.start_seqno; seqno < r.end_seqno; ++seqno) { + ASSERT(inf.good()); + uint32_t len = readlen(inf); + inf.read(writer.reserve(len), len); + writer.mark(); + cout << seqno << ' ' << len << endl; + if (check_interval(seqno, yield_interval)) st_sleep(0); + } + writer.mark_and_flush(); + long long end_time = current_time_millis(); + streamoff end_off = inf.tellg(); + showdatarate("shipped log", end_off - mid_off, end_time - mid_time); + } else { + { + st_intr intr(stop_hub); + // Wait for the snapshot. + recovery = send_states.take(); + if (recovery == nullptr) { + return; + } + // Wait for the new joiner. + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + + st_closing closing(joiner); + cout << "got joiner's connection, sending recovery of " + << recovery.size() << " bytes" << endl; + long long start_time = current_time_millis(); + st_write(joiner, recovery.get(), recovery.size()); + long long diff = current_time_millis() - start_time; + showdatarate("sent recovery", recovery.size(), diff); + } +} + +void +threadfunc() +{ + while (true) { + sleep(3); + cout << "AAAAAAAAAAAAAAAAAAAAAA" << endl; + } +} Deleted: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) +++ ydb/trunk/src/main.lzz.clamp 2009-03-24 09:11:22 UTC (rev 1332) @@ -1,313 +0,0 @@ -#hdr -#include "unsetprefs.h" -#include <boost/tuple/tuple.hpp> -#include <commons/st/intr.h> -#include <commons/st/sync.h> -#include <commons/st/channel.h> -#include <fstream> // ofstream -#include <vector> -#include "util.hh" -#include "setprefs.h" - -namespace boost { namespace archive { class binary_oarchive; } } - -using namespace boost; -using namespace boost::archive; -using namespace commons; -using namespace std; -using namespace ydb; -using namespace ydb::msg; -#end - -#src -#include "unsetprefs.h" -#include <boost/foreach.hpp> -#include <boost/archive/binary_oarchive.hpp> -#include <commons/assert.h> -#include <commons/time.h> -#include <commons/st/io.h> -#include <commons/st/sockets.h> -#include <iostream> -#include <unistd.h> // pipe, write, sync -#include "tpcc/tpcctables.h" -#include "msg.h" -#include "setprefs.h" -#end - -typedef tuple<sized_array<char>, char*, char*> chunk; - -typedef commons::array<char> recovery_t; - - -// Configuration. -st_utime_t timeout; -int yield_interval, accept_joiner_seqno, issuing_interval, min_ops, max_ops, - stop_on_seqno, batch_size, handle_responses_display, fail_seqno, - catch_up_display, issue_display, nwarehouses, - process_display; -size_t accept_joiner_size, read_buf_size; -bool yield_during_build_up, yield_during_catch_up, dump, show_updates, - count_updates, stop_on_recovery, general_txns, - disk, debug_memory, use_pwal, use_twal, - use_pb, use_pb_res, g_caught_up, rec_pwal, rec_twal, do_tpcc, - suppress_txn_msgs, force_ser, fake_exec, ship_log; -long long timelim, read_thresh; - -// Control. -st_intr_bool stop_hub, kill_hub; -st_bool do_pause; -// On leader, signifies that a node is in fail mode. On replica, signifies that a node is in fail mode/recovering from the twal. -st_bool failed; -// The seqno on which we should resume. -st_channel<int> resume; -bool stopped_issuing; - -// Statistics. -int updates; - -/** - * Used by the leader to bookkeep information about replicas. - */ -class replica_info -{ - public: - /** port is the replica's listen port, not the port bound to the fd socket. */ - replica_info(st_netfd_t fd, uint16_t port) : fd_(fd), port_(port) {} - st_netfd_t fd() const { return fd_; } - /** The port on which the replica is listening. */ - uint16_t port() const { return port_; } - /** The port on which the replica connected to us. */ - uint16_t local_port() const { return sockaddr().sin_port; } - uint32_t host() const { return sockaddr().sin_addr.s_addr; } - sockaddr_in sockaddr() const { sockaddr_in sa; sockaddr(sa); return sa; } - void sockaddr(sockaddr_in &sa) const { - socklen_t salen = sizeof sa; - check0x(getpeername(st_netfd_fileno(fd_), - reinterpret_cast<struct sockaddr*>(&sa), - &salen)); - } - private: - st_netfd_t fd_; - uint16_t port_; -}; - -/** - * RAII to close all contained netfds. - */ -class st_closing_all_infos -{ - public: - st_closing_all_infos(const vector<replica_info>& rs) : rs_(rs) {} - ~st_closing_all_infos() { - cout << "closing all conns to replicas (replica_infos)" << endl; - foreach (replica_info r, rs_) - check0x(st_netfd_close(r.fd())); - } - private: - const vector<replica_info> &rs_; -}; - -/** - * RAII to close all contained netfds. - */ -class st_closing_all -{ - public: - st_closing_all(const vector<st_netfd_t>& rs) : rs_(rs) {} - ~st_closing_all() { - foreach (st_netfd_t r, rs_) - check0x(st_netfd_close(r)); - } - private: - const vector<st_netfd_t> &rs_; -}; - -#if 0 -st_channel<pair<st_netfd_t, shared_ptr<string> > > msgs; - -/** - * The worker that performs the actual broadcasting. - */ -void -bcaster() -{ - while (!kill_hub) { - pair<st_netfd_t, shared_ptr<string> > pr; - { - st_intr intr(kill_hub); - pr = msgs.take(); - } - st_netfd_t dst = pr.first; - shared_ptr<string> &p = pr.second; - if (p.get() == nullptr) break; - string &s = *p.get(); - - if (!fake_bcast) - st_timed_write(dst, s.data(), s.size()); - } -} - -/** - * Asynchronous version of the broadcaster. - */ -void -bcastbuf_async(const vector<st_netfd_t> &dsts, const ser_t &msg) -{ - shared_ptr<string> p(new string); - ser(*p.get(), msg); - foreach (st_netfd_t dst, dsts) msgs.push(make_pair(dst, p)); -} -#endif - -enum { op_del, op_write, op_commit }; - -/** - * ARIES write-ahead log. No undo logging necessary (no steal). - */ -class wal -{ -public: - wal(const string &fname) : - of_(fname.c_str()), - ar_(new binary_oarchive(of())) - {} - ~wal() { delete ar_; } - template <typename T> - void log(const T &msg) { ser(of(), msg); } - void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } - void logbuf(const void *buf, size_t len) { - of().write(reinterpret_cast<const char*>(buf), len); - } - void logdel(int key) { - int op = op_del; // TODO: is this really necessary? - ar() & op & key; - } - void logwrite(int key, int val) { - int op = op_write; - ar() & op & key & val; - } - void logcommit() { - int op = op_commit; - ar() & op; - } - void flush() { of().flush(); } -private: - ofstream of_; - //unique_ptr<binary_oarchive> ar_; - binary_oarchive *ar_; - ofstream &of() { return of_; } - binary_oarchive &ar() { return *ar_; }; -}; - -// TODO? -class txn_wal { -public: - txn_wal(const string &fname) : of(fname.c_str()) {} - void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } - void logbuf(const void *buf, size_t len) { - of.write(reinterpret_cast<const char*>(buf), len); - } - void flush() { of.flush(); } -private: - ofstream of; -}; - -// Globals -wal *g_wal; -txn_wal *g_twal; -//tpcc_wal *g_tpcc_wal; - -struct recreq { - int start_seqno, end_seqno; -}; - -/** - * Help the recovering node. - * - * \param[in] listener The connection on which we're listening for connections - * from recovering joiners. - * - * \param[in] map The database state. - * - * \param[in] seqno The sequence number. Always starts at 0. - * - * \param[in] send_states Channel of snapshots of the database state to receive - * from process_txns. - */ -void -recover_joiner(st_netfd_t listener, - st_channel<recovery_t> &send_states) -{ - cout << "waiting for joiner" << endl; - recovery_t recovery; - st_netfd_t joiner; - if (ship_log) { - { - st_intr intr(stop_hub); - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - } - st_closing closing(joiner); - recreq r; - st_read(joiner, r); - commons::array<char> wbuf(buf_size); - writer writer(lambda(const void *buf, size_t len) { - st_write(__ref(joiner), buf, len); - }, wbuf.get(), wbuf.size()); - cout << "got joiner's connection, sending log from seqnos " - << r.start_seqno << " to " << r.end_seqno << endl; - - g_twal->flush(); - sync(); - ifstream inf("twal"); - long long start_time = current_time_millis(); - for (int seqno = 0; seqno < r.start_seqno; ++seqno) { - ASSERT(inf.good()); - inf.seekg(readlen(inf), ios::cur); - } - long long mid_time = current_time_millis(); - streamoff mid_off = inf.tellg(); - showdatarate("scanned log", mid_off, mid_time - start_time); - for (int seqno = r.start_seqno; seqno < r.end_seqno; ++seqno) { - ASSERT(inf.good()); - uint32_t len = readlen(inf); - inf.read(writer.reserve(len), len); - writer.mark(); - cout << seqno << ' ' << len << endl; - if (check_interval(seqno, yield_interval)) st_sleep(0); - } - writer.mark_and_flush(); - long long end_time = current_time_millis(); - streamoff end_off = inf.tellg(); - showdatarate("shipped log", end_off - mid_off, end_time - mid_time); - } else { - { - st_intr intr(stop_hub); - // Wait for the snapshot. - recovery = send_states.take(); - if (recovery == nullptr) { - return; - } - // Wait for the new joiner. - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - } - - st_closing closing(joiner); - cout << "got joiner's connection, sending recovery of " - << recovery.size() << " bytes" << endl; - long long start_time = current_time_millis(); - st_write(joiner, recovery.get(), recovery.size()); - long long diff = current_time_millis() - start_time; - showdatarate("sent recovery", recovery.size(), diff); - } -} - -void -threadfunc() -{ - while (true) { - sleep(3); - cout << "AAAAAAAAAAAAAAAAAAAAAA" << endl; - } -} Modified: ydb/trunk/src/mkdeps.py =================================================================== --- ydb/trunk/src/mkdeps.py 2009-03-24 07:54:47 UTC (rev 1331) +++ ydb/trunk/src/mkdeps.py 2009-03-24 09:11:22 UTC (rev 1332) @@ -24,7 +24,7 @@ @memoized def src(i): if i.endswith('.hh'): - clamp = path(i[:-3] + '.lzz.clamp') + clamp = path(i[:-3] + '.clamp.lzz') lzz = path(i[:-2] + '.lzz') if clamp.isfile(): return clamp if lzz.isfile(): return lzz @@ -39,8 +39,8 @@ for dep in deps(src(hdr)): yield dep -for i in pwd.glob('*.lzz.clamp'): - print sub(r'\.lzz\.clamp', '.o', i), ':', sub(r'\.lzz\.clamp', '.hh', i), ' '.join(deps(i)) +for i in pwd.glob('*.clamp.lzz'): + print sub(r'\.clamp\.lzz', '.o', i), ':', sub(r'\.clamp\.lzz', '.hh', i), ' '.join(deps(i)) for i in pwd.glob('*.d'): with file(i) as f: @@ -52,4 +52,4 @@ elif '_cc_lambda_' in word: print sub(r'(\.clamp/(.+)_cc_lambda_.+\.clamp_h)', r'\1: \2.cc.clamp', word) else: - print sub(r'(\.clamp/(.+)_lambda_.+\.clamp_h)', r'\1: \2.lzz.clamp', word) + print sub(r'(\.clamp/(.+)_lambda_.+\.clamp_h)', r'\1: \2.clamp.lzz', word) Copied: ydb/trunk/src/rectpcc.clamp.lzz (from rev 1331, ydb/trunk/src/rectpcc.lzz.clamp) =================================================================== --- ydb/trunk/src/rectpcc.clamp.lzz (rev 0) +++ ydb/trunk/src/rectpcc.clamp.lzz 2009-03-24 09:11:22 UTC (rev 1332) @@ -0,0 +1,175 @@ +#hdr +#include "tpcc.hh" +namespace ydb { namespace pb { class Init; } } +using namespace ydb::pb; +#end + +#src +#include "unsetprefs.h" +#include <commons/time.h> +#include <commons/st/io.h> +#include <commons/st/threads.h> +#include <commons/st/reader.h> +#include "tpcc/tpcctables.h" +#include "ydb.pb.h" +#include "setprefs.h" +#end + +void +rec_tpcc(int &seqno, int mypos, const Init &init, + const vector<st_netfd_t> &replicas, recovery_t &orig, + st_channel<chunk> &backlog) +{ + commons::array<char> recarr(0); + + function<void()> rec_twal_fn = lambda() { + int &seqno = __ref(seqno); + cout << "recovering from twal" << endl; + long long start_time = current_time_millis(); + g_twal->flush(); + sync(); + ifstream inf("twal"); + TpccReq req; + while (inf.peek() != ifstream::traits_type::eof()) { + ASSERT(inf.good()); + readmsg(inf, req); + process_tpcc(req, seqno, nullptr); + if (check_interval(seqno, yield_interval)) st_sleep(0); + } + showdatarate("recovered from twal", inf.tellg(), + current_time_millis() - start_time); + cout << "now at seqno " << seqno << endl; + }; + + function<void()> recv_log_fn = lambda() { + st_netfd_t src = __ref(replicas[0]); + int &seqno = __ref(seqno); + ASSERT(fail_seqno == seqno); + recreq r = { fail_seqno + 1, resume.take() }; + st_write(src, r); + sized_array<char> rbuf(new char[read_buf_size], read_buf_size); + function<void(anchored_stream_reader &reader)> overflow_fn = + lambda(anchored_stream_reader &reader) { + shift_reader(reader); + }; + anchored_stream_reader reader(st_read_fn(src), + st_read_fully_fn(src), + overflow_fn, rbuf.get(), rbuf.size()); + TpccReq req; + while (seqno < r.end_seqno) { + { st_intr intr(stop_hub); readmsg(reader, req); } + process_tpcc(req, seqno, nullptr); + reader.set_anchor(); + if (check_interval(seqno, yield_interval)) st_sleep(0); + } + }; + + if (rec_twal) { + failed.waitset(); + g_tables.reset(new TPCCTables); + tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(orig.begin()); + commons::array<char> body(orig.begin() + sizeof(tpcc_recovery_header), + orig.size() - sizeof(tpcc_recovery_header)); + g_tables->deser(mypos, init.node_size(), hdr, body); + body.release(); + rec_twal_fn(); + failed.reset(); + recv_log_fn(); + } + +#if 0 + st_thread_t rec_twal_thread = my_spawn(rec_twal_fn, "rec_twal"); + st_thread_t recv_log_thread = my_spawn(recv_log_fn, "recv_log"); + + st_join(rec_twal_thread); + st_join(recv_log_thread); +#endif + + if (rec_pwal) { + // Recover from phy log. + } else if (rec_twal) { + // Recover from txn log. + } else { + + g_tables.reset(new TPCCTables); + + // + // Build-up + // + + if (ship_log) { + } else { + // XXX indent + + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); + + vector<st_thread_t> recovery_builders; + ASSERT(seqno == -1); + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. + tpcc_recovery_header hdr; + checkeqnneg(st_read_fully(__ref(replicas[i]), + &hdr, sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof hdr)); + check(hdr.seqno >= 0); + + cout << "receiving recovery of " << hdr.len << " bytes" << endl; + + long long start_time = current_time_millis(); + __ref(recarr).reset(new char[hdr.len], hdr.len); + checkeqnneg(st_read_fully(__ref(replicas[i]), + __ref(recarr).get(), hdr.len, + ST_UTIME_NO_TIMEOUT), + ssize_t(hdr.len)); + + long long before_deser = current_time_millis(); + showdatarate("received recovery message", size_t(hdr.len), before_deser - start_time); + + if (__ref(seqno) == -1) + __ref(seqno) = hdr.seqno; + else + checkeq(__ref(seqno), hdr.seqno); + + g_tables->deser(__ctx(i), __ref(init).node_size(), hdr, __ref(recarr)); + + long long end_time = current_time_millis(); + showdatarate("deserialized recovery message", size_t(hdr.len), end_time - before_deser); + cout << "receive & deserialize took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; + cout << "after deserialize, db state is now at seqno " + << hdr.seqno << ":" << endl; + g_tables->show(); + + }, "recovery_builder" + lexical_cast<string>(i))); + } + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } + + } + } + + // + // Catch-up + // + + long long mid_time = current_time_millis(); + int mid_seqno = seqno; + TpccReq req; + while (!backlog.empty()) { + chunk chunk = backlog.take(); + cout << "took from backlog, now has " << backlog.queue().size() + << " chunks" << endl; + sized_array<char> &buf = chunk.get<0>(); + char *begin = chunk.get<1>(), *end = chunk.get<2>(); + ASSERT(buf.get() <= begin && begin < buf.end()); + ASSERT(buf.get() < end && end < buf.end()); + process_buf(begin, end, req, seqno); + } + showtput("replayer caught up; from backlog replayed", + current_time_millis(), mid_time, seqno, mid_seqno); +} Property changes on: ydb/trunk/src/rectpcc.clamp.lzz ___________________________________________________________________ Added: svn:mergeinfo + Deleted: ydb/trunk/src/rectpcc.lzz.clamp =================================================================== --- ydb/trunk/src/rectpcc.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) +++ ydb/trunk/src/rectpcc.lzz.clamp 2009-03-24 09:11:22 UTC (rev 1332) @@ -1,175 +0,0 @@ -#hdr -#include "tpcc.hh" -namespace ydb { namespace pb { class Init; } } -using namespace ydb::pb; -#end - -#src -#include "unsetprefs.h" -#include <commons/time.h> -#include <commons/st/io.h> -#include <commons/st/threads.h> -#include <commons/st/reader.h> -#include "tpcc/tpcctables.h" -#include "ydb.pb.h" -#include "setprefs.h" -#end - -void -rec_tpcc(int &seqno, int mypos, const Init &init, - const vector<st_netfd_t> &replicas, recovery_t &orig, - st_channel<chunk> &backlog) -{ - commons::array<char> recarr(0); - - function<void()> rec_twal_fn = lambda() { - int &seqno = __ref(seqno); - cout << "recovering from twal" << endl; - long long start_time = current_time_millis(); - g_twal->flush(); - sync(); - ifstream inf("twal"); - TpccReq req; - while (inf.peek() != ifstream::traits_type::eof()) { - ASSERT(inf.good()); - readmsg(inf, req); - process_tpcc(req, seqno, nullptr); - if (check_interval(seqno, yield_interval)) st_sleep(0); - } - showdatarate("recovered from twal", inf.tellg(), - current_time_millis() - start_time); - cout << "now at seqno " << seqno << endl; - }; - - function<void()> recv_log_fn = lambda() { - st_netfd_t src = __ref(replicas[0]); - int &seqno = __ref(seqno); - ASSERT(fail_seqno == seqno); - recreq r = { fail_seqno + 1, resume.take() }; - st_write(src, r); - sized_array<char> rbuf(new char[read_buf_size], read_buf_size); - function<void(anchored_stream_reader &reader)> overflow_fn = - lambda(anchored_stream_reader &reader) { - shift_reader(reader); - }; - anchored_stream_reader reader(st_read_fn(src), - st_read_fully_fn(src), - overflow_fn, rbuf.get(), rbuf.size()); - TpccReq req; - while (seqno < r.end_seqno) { - { st_intr intr(stop_hub); readmsg(reader, req); } - process_tpcc(req, seqno, nullptr); - reader.set_anchor(); - if (check_interval(seqno, yield_interval)) st_sleep(0); - } - }; - - if (rec_twal) { - failed.waitset(); - g_tables.reset(new TPCCTables); - tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(orig.begin()); - commons::array<char> body(orig.begin() + sizeof(tpcc_recovery_header), - orig.size() - sizeof(tpcc_recovery_header)); - g_tables->deser(mypos, init.node_size(), hdr, body); - body.release(); - rec_twal_fn(); - failed.reset(); - recv_log_fn(); - } - -#if 0 - st_thread_t rec_twal_thread = my_spawn(rec_twal_fn, "rec_twal"); - st_thread_t recv_log_thread = my_spawn(recv_log_fn, "recv_log"); - - st_join(rec_twal_thread); - st_join(recv_log_thread); -#endif - - if (rec_pwal) { - // Recover from phy log. - } else if (rec_twal) { - // Recover from txn log. - } else { - - g_tables.reset(new TPCCTables); - - // - // Build-up - // - - if (ship_log) { - } else { - // XXX indent - - cout << "waiting for recovery message" << (multirecover ? "s" : "") - << endl; - long long before_recv = current_time_millis(); - - vector<st_thread_t> recovery_builders; - ASSERT(seqno == -1); - for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { - recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message length and header. - tpcc_recovery_header hdr; - checkeqnneg(st_read_fully(__ref(replicas[i]), - &hdr, sizeof hdr, - ST_UTIME_NO_TIMEOUT), - ssize_t(sizeof hdr)); - check(hdr.seqno >= 0); - - cout << "receiving recovery of " << hdr.len << " bytes" << endl; - - long long start_time = current_time_millis(); - __ref(recarr).reset(new char[hdr.len], hdr.len); - checkeqnneg(st_read_fully(__ref(replicas[i]), - __ref(recarr).get(), hdr.len, - ST_UTIME_NO_TIMEOUT), - ssize_t(hdr.len)); - - long long before_deser = current_time_millis(); - showdatarate("received recovery message", size_t(hdr.len), before_deser - start_time); - - if (__ref(seqno) == -1) - __ref(seqno) = hdr.seqno; - else - checkeq(__ref(seqno), hdr.seqno); - - g_tables->deser(__ctx(i), __ref(init).node_size(), hdr, __ref(recarr)); - - long long end_time = current_time_millis(); - showdatarate("deserialized recovery message", size_t(hdr.len), end_time - before_deser); - cout << "receive & deserialize took " << end_time - __ref(before_recv) - << " ms total; now at seqno " << hdr.seqno << endl; - cout << "after deserialize, db state is now at seqno " - << hdr.seqno << ":" << endl; - g_tables->show(); - - }, "recovery_builder" + lexical_cast<string>(i))); - } - foreach (st_thread_t t, recovery_builders) { - st_join(t); - } - - } - } - - // - // Catch-up - // - - long long mid_time = current_time_millis(); - int mid_seqno = seqno; - TpccReq req; - while (!backlog.empty()) { - chunk chunk = backlog.take(); - cout << "took from backlog, now has " << backlog.queue().size() - << " chunks" << endl; - sized_array<char> &buf = chunk.get<0>(); - char *begin = chunk.get<1>(), *end = chunk.get<2>(); - ASSERT(buf.get() <= begin && begin < buf.end()); - ASSERT(buf.get() < end && end < buf.end()); - process_buf(begin, end, req, seqno); - } - showtput("replayer caught up; from backlog replayed", - current_time_millis(), mid_time, seqno, mid_seqno); -} Copied: ydb/trunk/src/replica.clamp.lzz (from rev 1331, ydb/trunk/src/replica.lzz.clamp) =================================================================== --- ydb/trunk/src/replica.clamp.lzz (rev 0) +++ ydb/trunk/src/replica.clamp.lzz 2009-03-24 09:11:22 UTC (rev 1332) @@ -0,0 +1,362 @@ +#hdr +#include "unsetprefs.h" +#include <string> +#end + +#src +#include "unsetprefs.h" +#include <boost/archive/binary_iarchive.hpp> +#include <commons/st/sockets.h> +#include <commons/st/threads.h> +#include "tpcc/clock.h" +#include "tpcc/randomgenerator.h" +#include "tpcc/tpccclient.h" +#include "tpcc/tpccgenerator.h" +#include "tpcc/tpcctables.h" +#include "rectpcc.hh" +#include "run.hh" +#include "stxn.hh" +#include "tpcc.hh" +#end + +/** + * Run a replica. + */ +void +run_replica(std::string leader_host, uint16_t leader_port, uint16_t listen_port) +{ + if (disk) { + // Disk IO threads. + for (int i = 0; i < 5; ++i) { + //thread somethread(threadfunc); + } + } + + // Initialize database state. + int seqno = -1; + mii &map = g_map; + if (do_tpcc) { + TPCCTables *tables = new TPCCTables(); + g_tables.reset(tables); + SystemClock* clock = new SystemClock(); + + // Create a generator for filling the database. + RealRandomGenerator* random = new RealRandomGenerator(); + NURandC cLoad = NURandC::makeRandom(random); + random->setC(cLoad); + + // Generate the data + cout << "loading " << nwarehouses << " warehouses" << endl; + char now[Clock::DATETIME_SIZE+1]; + clock->getDateTimestamp(now); + TPCCGenerator generator(random, now, Item::NUM_ITEMS, + District::NUM_PER_WAREHOUSE, + Customer::NUM_PER_DISTRICT, + NewOrder::INITIAL_NUM_PER_DISTRICT); + long long start_time = current_time_millis(); + generator.makeItemsTable(tables); + for (int i = 0; i < nwarehouses; ++i) { + generator.makeWarehouse(tables, i+1); + } + cout << "loaded " << nwarehouses << " warehouses in " + << current_time_millis() - start_time << " ms" << endl; + tables->show(); + } + recovery_t orig = rec_twal ? g_tables->ser(0, 0, seqno) : recovery_t(); + + finally f(bind(summarize, "REPLICA", ref(seqno))); + st_channel<recovery_t> send_states; + + cout << "starting as replica on port " << listen_port << endl; + + // Listen for connections from other replicas. + st_netfd_t listener = st_tcp_listen(listen_port); + + // Connect to the leader and join the system. + st_netfd_t leader = st_tcp_connect(leader_host.c_str(), leader_port, + timeout); + st_closing closing(leader); + Join join; + join.set_port(listen_port); + sendmsg(leader, join); + Init init; + { + st_intr intr(stop_hub); + readmsg(leader, init); + } + uint32_t listen_host = init.yourhost(); + multirecover = init.multirecover(); + + // Display the info. + cout << "got init msg with txn seqno " << init.txnseqno() + << " and hosts:" << endl; + vector<st_netfd_t> replicas; + st_closing_all close_replicas(replicas); + int mypos = -1; + for (int i = 0; i < init.node_size(); ++i) { + const SockAddr &sa = init.node(i); + char buf[INET_ADDRSTRLEN]; + in_addr host = { sa.host() }; + bool is_self = sa.host() == listen_host && sa.port() == listen_port; + cout << "- " << checkerr(inet_ntop(AF_INET, &host, buf, + INET_ADDRSTRLEN)) + << ':' << sa.port() << (is_self ? " (self)" : "") << endl; + if (is_self) mypos = i; + if (!is_self && (init.txnseqno() > 0 || rec_twal)) { + replicas.push_back(st_tcp_connect(host, + static_cast<uint16_t>(sa.port()), + timeout)); + } + } + + // Initialize physical or txn log. + scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); + g_twal = twal.get(); + scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); + g_wal = pwal.get(); + + // Process txns. + st_channel<chunk> backlog; + function<void()> process_fn; + if (do_tpcc) + process_fn = bind(process_tpccs, leader, ref(seqno), ref(send_states), + ref(backlog), init.txnseqno(), mypos, init.node_size()); + else + process_fn = bind(process_txns, leader, ref(map), ref(seqno), + ref(send_states), ref(backlog), init.txnseqno(), mypos, + init.node_size()); + st_joining join_proc(my_spawn(process_fn, "process_txns")); + st_joining join_rec(init.txnseqno() == 0 && (multirecover || mypos == 0) ? + my_spawn(bind(recover_joiner, listener, ref(send_states)), + "recover_joiner") : + nullptr); + + try { + // If there's anything to recover. + if (init.txnseqno() > 0 || fail_seqno > 0) { + if (do_tpcc) { + + rec_tpcc(seqno, mypos, init, replicas, orig, backlog); + + } else { + + // + // Simple txns + // + + if (rec_pwal) { + // Recover from physical log. + cout << "recovering from pwal" << endl; + long long start_time = current_time_millis(); + ifstream inf("pwal"); + binary_iarchive in(inf); + int rseqno = -1; + while (inf.peek() != ifstream::traits_type::eof()) { + int op; + in & op; + switch (op) { + case op_del: + { + int key; + in & key; + mii::iterator it = map.find(key); + map.erase(it); + break; + } + case op_write: + { + int key, val; + in & key & val; + map[key] = val; + break; + } + case op_commit: + ++rseqno; + break; + } + if (check_interval(rseqno, yield_interval)) st_sleep(0); + } + seqno = init.txnseqno() - 1; + showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); + cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; + } else { + + // + // Build-up + // + + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); + + vector<st_thread_t> recovery_builders; + ASSERT(seqno == -1); + bool first = true; + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. + size_t len; + recovery_header hdr; + char buf[sizeof len + sizeof hdr]; + //try { + checkeqnneg(st_read_fully(__ref(replicas[i]), + buf, sizeof len + sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof len + sizeof hdr)); + //} catch (...) { // TODO just catch "Connection reset by peer" + //return; + //} + raw_reader rdr(buf); + rdr.read(len); + rdr.read(hdr); + check(hdr.seqno >= 0); + + // Resize the table if necessary. + commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(first) = false; + __ref(map).set_size(hdr.size); + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } + } + + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. + checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. + long long start_time = current_time_millis(); + checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); + long long end_time = current_time_millis(); + + if (__ref(seqno) != -1) + checkeq(__ref(seqno), hdr.seqno); + __ref(seqno) = hdr.seqno; + showdatarate("got recovery message", len, end_time - start_time); + cout << "receive took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; +#if 0 + Recovery recovery; + long long receive_start = 0, receive_end = 0; + size_t len = 0; + { + st_intr intr(stop_hub); + len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, + &receive_end); + } + long long build_start = current_time_millis(); + cout << "got recovery message of " << len << " bytes in " + << build_start - __ref(before_recv) << " ms: xfer took " + << receive_end - receive_start << " ms, deserialization took " + << build_start - receive_end << " ms" << endl; + for (int i = 0; i < recovery.pair_size(); ++i) { + const Recovery_Pair &p = recovery.pair(i); + __ref(map)[p.key()] = p.value(); + if (i % yield_interval == 0) { + if (yield_during_build_up) st_sleep(0); + } + } + check(recovery.seqno() >= 0); + int seqno = __ref(seqno) = recovery.seqno(); + long long build_end = current_time_millis(); + cout << "receive and build-up took " + << build_end - __ref(before_recv) + << " ms; built up map of " << recovery.pair_size() + << " records in " << build_end - build_start + << " ms; now at seqno " << seqno << endl; +#endif + }, "recovery_builder" + lexical_cast<string>(i))); + } + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } + } + + // + // Catch-up + // + + long long mid_time = current_time_millis(); + int mid_seqno = seqno; + // XXX + using msg::TxnBatch; + using msg::Txn; + commons::array<char> rbuf(0), wbuf(buf_size); + reader reader(nullptr, rbuf.get(), rbuf.size()); + writer writer(lambda(const void*, size_t) { + throw not_supported_exception("should not be writing responses during catch-up phase"); + }, wbuf.get(), wbuf.size()); + stream s(reader, writer); + TxnBatch batch(s); + while (!backlog.empty()) { + chunk chunk = backlog.take(); + sized_array<char> &buf = chunk.get<0>(); + ASSERT(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); + ASSERT(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); + ASSERT(chunk.get<1>() < chunk.get<2>()); + swap(buf, reader.buf()); + reader.reset_range(chunk.get<1>(), chunk.get<2>()); + while (reader.start() < reader.end()) { + char *start = reader.start(); + uint32_t prefix = ntohl(reader.read<uint32_t>()); + ASSERT(prefix < 10000); + ASSERT(start + sizeof(uint32_t) + prefix <= reader.end()); + batch.Clear(); + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = batch.txn(t); + if (rec_pwal) seqno = txn.seqno() - 1; + process_txn(map, txn, seqno); + if (fake_exec && !use_pb) { + reader.skip(txn.op_size() * Op_Size); + } + + if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); + if (check_interval(txn.seqno(), process_display)) { + cout << "caught up txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } + } + ASSERT(start + sizeof(uint32_t) + prefix == reader.start()); + } + } + g_caught_up = true; +#if 0 + while (!backlog.empty()) { + using pb::Txn; + shared_ptr<Txn> p = backlog.take(); + process_txn<pb_traits, pb_traits>(map, *p, seqno, nullptr); + if (check_interval(p->seqno(), catch_up_display)) { + cout << "processed txn " << p->seqno() << " off the backlog; " + << "backlog.size = " << backlog.queue().size() << endl; + } + if (check_interval(p->seqno(), yield_interval)) { + // Explicitly yield. (Note that yielding does still effectively + // happen anyway because process_txn is a yield point.) + st_sleep(0); + } + } +#endif + showtput("replayer caught up; from backlog replayed", + current_time_millis(), mid_time, seqno, mid_seqno); + } + } + } catch (std::exception &ex) { + cerr_thread_ex(ex) << endl; + throw; + } + + stop_hub.insert(st_thread_self()); +} Property changes on: ydb/trunk/src/replica.clamp.lzz ___________________________________________________________________ Added: svn:mergeinfo + Deleted: ydb/trunk/src/replica.lzz.clamp =================================================================== --- ydb/trunk/src/replica.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) +++ ydb/trunk/src/replica.lzz.clamp 2009-03-24 09:11:22 UTC (rev 1332) @@ -1,362 +0,0 @@ -#hdr -#include "unsetprefs.h" -#include <string> -#end - -#src -#include "unsetprefs.h" -#include <boost/archive/binary_iarchive.hpp> -#include <commons/st/sockets.h> -#include <commons/st/threads.h> -#include "tpcc/clock.h" -#include "tpcc/randomgenerator.h" -#include "tpcc/tpccclient.h" -#include "tpcc/tpccgenerator.h" -#include "tpcc/tpcctables.h" -#include "rectpcc.hh" -#include "run.hh" -#include "stxn.hh" -#include "tpcc.hh" -#end - -/** - * Run a replica. - */ -void -run_replica(std::string leader_host, uint16_t leader_port, uint16_t listen_port) -{ - if (disk) { - // Disk IO threads. - ... [truncated message content] |
From: <yan...@us...> - 2009-03-24 07:54:52
|
Revision: 1331 http://assorted.svn.sourceforge.net/assorted/?rev=1331&view=rev Author: yangzhang Date: 2009-03-24 07:54:47 +0000 (Tue, 24 Mar 2009) Log Message: ----------- major physical refactoring; substantially reduced critical-path build times by breaking down files into smaller files and introducing decoupling, particularly with templated entities Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp ydb/trunk/src/stxn.lzz.clamp ydb/trunk/src/tpcc.lzz.clamp ydb/trunk/src/ydb.lzz.clamp Added Paths: ----------- ydb/trunk/src/leader.lzz.clamp ydb/trunk/src/rectpcc.lzz.clamp ydb/trunk/src/replica.lzz.clamp ydb/trunk/src/run.lzz.clamp Added: ydb/trunk/src/leader.lzz.clamp =================================================================== --- ydb/trunk/src/leader.lzz.clamp (rev 0) +++ ydb/trunk/src/leader.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) @@ -0,0 +1,137 @@ +#hdr +#include <stdint.h> +#end + +#src +#include "unsetprefs.h" +#include <commons/st/sockets.h> +#include <commons/st/threads.h> +#include "run.hh" +#include "stxn.hh" +#include "tpcc.hh" +#include "setprefs.h" +#end + +/** + * Run the leader. + */ +void +run_leader(int minreps, uint16_t leader_port) +{ + cout << "starting as leader" << endl; + st_multichannel<long long> recover_signals; + + scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); + g_twal = twal.get(); + scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); + g_wal = pwal.get(); + + // Wait until all replicas have joined. + st_netfd_t listener = st_tcp_listen(leader_port); + st_closing close_listener(listener); + vector<replica_info> replicas; + st_closing_all_infos close_replicas(replicas); + cout << "waiting for at least " << minreps << " replicas to join" << endl; + Join join; + for (int i = 0; i < minreps; ++i) { + st_netfd_t fd; + { + st_intr intr(stop_hub); + fd = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + readmsg(fd, join); + replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); + } + cout << "got all " << minreps << " replicas" << endl; + + // Construct the initialization message. + Init init; + init.set_txnseqno(0); + init.set_multirecover(multirecover); + foreach (replica_info r, replicas) { + SockAddr *psa = init.add_node(); + psa->set_host(r.host()); + psa->set_port(r.port()); + } + + // Send init to each initial replica. + foreach (replica_info r, replicas) { + init.set_yourhost(r.host()); + sendmsg(r.fd(), init); + } + + // Start dispatching queries. + st_bool accept_joiner; + int seqno = 0; + st_channel<replica_info> newreps; + st_channel<st_netfd_t> delreps; + foreach (const replica_info &r, replicas) newreps.push(r); + function<void()> f; + if (do_tpcc) + f = bind(issue_tpcc, ref(newreps), ref(delreps), ref(seqno), ref(accept_joiner)); + else + f = bind(issue_txns, ref(newreps), ref(seqno), ref(accept_joiner)); + st_joining join_issue_txns(my_spawn(f, "issue_txns")); + + finally fin(bind(summarize, "LEADER", ref(seqno))); + + try { + // Start handling responses. + st_thread_group handlers; + int rid = 0; + foreach (replica_info r, replicas) { + function<void()> fn; + if (do_tpcc) + fn = bind(handle_tpcc_responses, r.fd(), ref(seqno), rid++, + ref(recover_signals), ref(delreps), true); + else + fn = bind(handle_responses, r.fd(), ref(seqno), rid++, + ref(recover_signals), true); + handlers.insert(my_spawn(fn, "handle_responses")); + } + + // Accept the recovering node, and tell it about the online replicas. + st_netfd_t joiner; + try { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + accept_joiner.waitset(); + } catch (std::exception &ex) { + string s(ex.what()); + if (s.find("Interrupted system call") == s.npos) + throw; + else + throw break_exception(); + } + Join join; + readmsg(joiner, join); + replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); + cout << "setting seqno to " << seqno << endl; + init.set_txnseqno(seqno); + init.set_yourhost(replicas.back().host()); + sendmsg(joiner, init); + recover_signals.push(current_time_millis()); + + // Start streaming txns to joiner. + cout << "start streaming txns to joiner" << endl; + function<void()> handle_responses_joiner_fn; + if (do_tpcc) + handle_responses_joiner_fn = + bind(handle_tpcc_responses, joiner, ref(seqno), rid++, + ref(recover_signals), ref(delreps), false); + else + handle_responses_joiner_fn = + bind(handle_responses, joiner, ref(seqno), rid++, + ref(recover_signals), false); + newreps.push(replicas.back()); + handlers.insert(my_spawn(handle_responses_joiner_fn, + "handle_responses_joiner")); + } catch (break_exception &ex) { + } catch (std::exception &ex) { + // TODO: maybe there's a cleaner way to do this final step before waiting with the join + cerr_thread_ex(ex) << endl; + throw; + } +} Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) +++ ydb/trunk/src/main.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) @@ -29,6 +29,7 @@ #include <commons/st/sockets.h> #include <iostream> #include <unistd.h> // pipe, write, sync +#include "tpcc/tpcctables.h" #include "msg.h" #include "setprefs.h" #end Added: ydb/trunk/src/rectpcc.lzz.clamp =================================================================== --- ydb/trunk/src/rectpcc.lzz.clamp (rev 0) +++ ydb/trunk/src/rectpcc.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) @@ -0,0 +1,175 @@ +#hdr +#include "tpcc.hh" +namespace ydb { namespace pb { class Init; } } +using namespace ydb::pb; +#end + +#src +#include "unsetprefs.h" +#include <commons/time.h> +#include <commons/st/io.h> +#include <commons/st/threads.h> +#include <commons/st/reader.h> +#include "tpcc/tpcctables.h" +#include "ydb.pb.h" +#include "setprefs.h" +#end + +void +rec_tpcc(int &seqno, int mypos, const Init &init, + const vector<st_netfd_t> &replicas, recovery_t &orig, + st_channel<chunk> &backlog) +{ + commons::array<char> recarr(0); + + function<void()> rec_twal_fn = lambda() { + int &seqno = __ref(seqno); + cout << "recovering from twal" << endl; + long long start_time = current_time_millis(); + g_twal->flush(); + sync(); + ifstream inf("twal"); + TpccReq req; + while (inf.peek() != ifstream::traits_type::eof()) { + ASSERT(inf.good()); + readmsg(inf, req); + process_tpcc(req, seqno, nullptr); + if (check_interval(seqno, yield_interval)) st_sleep(0); + } + showdatarate("recovered from twal", inf.tellg(), + current_time_millis() - start_time); + cout << "now at seqno " << seqno << endl; + }; + + function<void()> recv_log_fn = lambda() { + st_netfd_t src = __ref(replicas[0]); + int &seqno = __ref(seqno); + ASSERT(fail_seqno == seqno); + recreq r = { fail_seqno + 1, resume.take() }; + st_write(src, r); + sized_array<char> rbuf(new char[read_buf_size], read_buf_size); + function<void(anchored_stream_reader &reader)> overflow_fn = + lambda(anchored_stream_reader &reader) { + shift_reader(reader); + }; + anchored_stream_reader reader(st_read_fn(src), + st_read_fully_fn(src), + overflow_fn, rbuf.get(), rbuf.size()); + TpccReq req; + while (seqno < r.end_seqno) { + { st_intr intr(stop_hub); readmsg(reader, req); } + process_tpcc(req, seqno, nullptr); + reader.set_anchor(); + if (check_interval(seqno, yield_interval)) st_sleep(0); + } + }; + + if (rec_twal) { + failed.waitset(); + g_tables.reset(new TPCCTables); + tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(orig.begin()); + commons::array<char> body(orig.begin() + sizeof(tpcc_recovery_header), + orig.size() - sizeof(tpcc_recovery_header)); + g_tables->deser(mypos, init.node_size(), hdr, body); + body.release(); + rec_twal_fn(); + failed.reset(); + recv_log_fn(); + } + +#if 0 + st_thread_t rec_twal_thread = my_spawn(rec_twal_fn, "rec_twal"); + st_thread_t recv_log_thread = my_spawn(recv_log_fn, "recv_log"); + + st_join(rec_twal_thread); + st_join(recv_log_thread); +#endif + + if (rec_pwal) { + // Recover from phy log. + } else if (rec_twal) { + // Recover from txn log. + } else { + + g_tables.reset(new TPCCTables); + + // + // Build-up + // + + if (ship_log) { + } else { + // XXX indent + + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); + + vector<st_thread_t> recovery_builders; + ASSERT(seqno == -1); + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. + tpcc_recovery_header hdr; + checkeqnneg(st_read_fully(__ref(replicas[i]), + &hdr, sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof hdr)); + check(hdr.seqno >= 0); + + cout << "receiving recovery of " << hdr.len << " bytes" << endl; + + long long start_time = current_time_millis(); + __ref(recarr).reset(new char[hdr.len], hdr.len); + checkeqnneg(st_read_fully(__ref(replicas[i]), + __ref(recarr).get(), hdr.len, + ST_UTIME_NO_TIMEOUT), + ssize_t(hdr.len)); + + long long before_deser = current_time_millis(); + showdatarate("received recovery message", size_t(hdr.len), before_deser - start_time); + + if (__ref(seqno) == -1) + __ref(seqno) = hdr.seqno; + else + checkeq(__ref(seqno), hdr.seqno); + + g_tables->deser(__ctx(i), __ref(init).node_size(), hdr, __ref(recarr)); + + long long end_time = current_time_millis(); + showdatarate("deserialized recovery message", size_t(hdr.len), end_time - before_deser); + cout << "receive & deserialize took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; + cout << "after deserialize, db state is now at seqno " + << hdr.seqno << ":" << endl; + g_tables->show(); + + }, "recovery_builder" + lexical_cast<string>(i))); + } + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } + + } + } + + // + // Catch-up + // + + long long mid_time = current_time_millis(); + int mid_seqno = seqno; + TpccReq req; + while (!backlog.empty()) { + chunk chunk = backlog.take(); + cout << "took from backlog, now has " << backlog.queue().size() + << " chunks" << endl; + sized_array<char> &buf = chunk.get<0>(); + char *begin = chunk.get<1>(), *end = chunk.get<2>(); + ASSERT(buf.get() <= begin && begin < buf.end()); + ASSERT(buf.get() < end && end < buf.end()); + process_buf(begin, end, req, seqno); + } + showtput("replayer caught up; from backlog replayed", + current_time_millis(), mid_time, seqno, mid_seqno); +} Added: ydb/trunk/src/replica.lzz.clamp =================================================================== --- ydb/trunk/src/replica.lzz.clamp (rev 0) +++ ydb/trunk/src/replica.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) @@ -0,0 +1,362 @@ +#hdr +#include "unsetprefs.h" +#include <string> +#end + +#src +#include "unsetprefs.h" +#include <boost/archive/binary_iarchive.hpp> +#include <commons/st/sockets.h> +#include <commons/st/threads.h> +#include "tpcc/clock.h" +#include "tpcc/randomgenerator.h" +#include "tpcc/tpccclient.h" +#include "tpcc/tpccgenerator.h" +#include "tpcc/tpcctables.h" +#include "rectpcc.hh" +#include "run.hh" +#include "stxn.hh" +#include "tpcc.hh" +#end + +/** + * Run a replica. + */ +void +run_replica(std::string leader_host, uint16_t leader_port, uint16_t listen_port) +{ + if (disk) { + // Disk IO threads. + for (int i = 0; i < 5; ++i) { + //thread somethread(threadfunc); + } + } + + // Initialize database state. + int seqno = -1; + mii &map = g_map; + if (do_tpcc) { + TPCCTables *tables = new TPCCTables(); + g_tables.reset(tables); + SystemClock* clock = new SystemClock(); + + // Create a generator for filling the database. + RealRandomGenerator* random = new RealRandomGenerator(); + NURandC cLoad = NURandC::makeRandom(random); + random->setC(cLoad); + + // Generate the data + cout << "loading " << nwarehouses << " warehouses" << endl; + char now[Clock::DATETIME_SIZE+1]; + clock->getDateTimestamp(now); + TPCCGenerator generator(random, now, Item::NUM_ITEMS, + District::NUM_PER_WAREHOUSE, + Customer::NUM_PER_DISTRICT, + NewOrder::INITIAL_NUM_PER_DISTRICT); + long long start_time = current_time_millis(); + generator.makeItemsTable(tables); + for (int i = 0; i < nwarehouses; ++i) { + generator.makeWarehouse(tables, i+1); + } + cout << "loaded " << nwarehouses << " warehouses in " + << current_time_millis() - start_time << " ms" << endl; + tables->show(); + } + recovery_t orig = rec_twal ? g_tables->ser(0, 0, seqno) : recovery_t(); + + finally f(bind(summarize, "REPLICA", ref(seqno))); + st_channel<recovery_t> send_states; + + cout << "starting as replica on port " << listen_port << endl; + + // Listen for connections from other replicas. + st_netfd_t listener = st_tcp_listen(listen_port); + + // Connect to the leader and join the system. + st_netfd_t leader = st_tcp_connect(leader_host.c_str(), leader_port, + timeout); + st_closing closing(leader); + Join join; + join.set_port(listen_port); + sendmsg(leader, join); + Init init; + { + st_intr intr(stop_hub); + readmsg(leader, init); + } + uint32_t listen_host = init.yourhost(); + multirecover = init.multirecover(); + + // Display the info. + cout << "got init msg with txn seqno " << init.txnseqno() + << " and hosts:" << endl; + vector<st_netfd_t> replicas; + st_closing_all close_replicas(replicas); + int mypos = -1; + for (int i = 0; i < init.node_size(); ++i) { + const SockAddr &sa = init.node(i); + char buf[INET_ADDRSTRLEN]; + in_addr host = { sa.host() }; + bool is_self = sa.host() == listen_host && sa.port() == listen_port; + cout << "- " << checkerr(inet_ntop(AF_INET, &host, buf, + INET_ADDRSTRLEN)) + << ':' << sa.port() << (is_self ? " (self)" : "") << endl; + if (is_self) mypos = i; + if (!is_self && (init.txnseqno() > 0 || rec_twal)) { + replicas.push_back(st_tcp_connect(host, + static_cast<uint16_t>(sa.port()), + timeout)); + } + } + + // Initialize physical or txn log. + scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); + g_twal = twal.get(); + scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); + g_wal = pwal.get(); + + // Process txns. + st_channel<chunk> backlog; + function<void()> process_fn; + if (do_tpcc) + process_fn = bind(process_tpccs, leader, ref(seqno), ref(send_states), + ref(backlog), init.txnseqno(), mypos, init.node_size()); + else + process_fn = bind(process_txns, leader, ref(map), ref(seqno), + ref(send_states), ref(backlog), init.txnseqno(), mypos, + init.node_size()); + st_joining join_proc(my_spawn(process_fn, "process_txns")); + st_joining join_rec(init.txnseqno() == 0 && (multirecover || mypos == 0) ? + my_spawn(bind(recover_joiner, listener, ref(send_states)), + "recover_joiner") : + nullptr); + + try { + // If there's anything to recover. + if (init.txnseqno() > 0 || fail_seqno > 0) { + if (do_tpcc) { + + rec_tpcc(seqno, mypos, init, replicas, orig, backlog); + + } else { + + // + // Simple txns + // + + if (rec_pwal) { + // Recover from physical log. + cout << "recovering from pwal" << endl; + long long start_time = current_time_millis(); + ifstream inf("pwal"); + binary_iarchive in(inf); + int rseqno = -1; + while (inf.peek() != ifstream::traits_type::eof()) { + int op; + in & op; + switch (op) { + case op_del: + { + int key; + in & key; + mii::iterator it = map.find(key); + map.erase(it); + break; + } + case op_write: + { + int key, val; + in & key & val; + map[key] = val; + break; + } + case op_commit: + ++rseqno; + break; + } + if (check_interval(rseqno, yield_interval)) st_sleep(0); + } + seqno = init.txnseqno() - 1; + showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); + cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; + } else { + + // + // Build-up + // + + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); + + vector<st_thread_t> recovery_builders; + ASSERT(seqno == -1); + bool first = true; + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. + size_t len; + recovery_header hdr; + char buf[sizeof len + sizeof hdr]; + //try { + checkeqnneg(st_read_fully(__ref(replicas[i]), + buf, sizeof len + sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof len + sizeof hdr)); + //} catch (...) { // TODO just catch "Connection reset by peer" + //return; + //} + raw_reader rdr(buf); + rdr.read(len); + rdr.read(hdr); + check(hdr.seqno >= 0); + + // Resize the table if necessary. + commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(first) = false; + __ref(map).set_size(hdr.size); + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } + } + + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. + checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. + long long start_time = current_time_millis(); + checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); + long long end_time = current_time_millis(); + + if (__ref(seqno) != -1) + checkeq(__ref(seqno), hdr.seqno); + __ref(seqno) = hdr.seqno; + showdatarate("got recovery message", len, end_time - start_time); + cout << "receive took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; +#if 0 + Recovery recovery; + long long receive_start = 0, receive_end = 0; + size_t len = 0; + { + st_intr intr(stop_hub); + len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, + &receive_end); + } + long long build_start = current_time_millis(); + cout << "got recovery message of " << len << " bytes in " + << build_start - __ref(before_recv) << " ms: xfer took " + << receive_end - receive_start << " ms, deserialization took " + << build_start - receive_end << " ms" << endl; + for (int i = 0; i < recovery.pair_size(); ++i) { + const Recovery_Pair &p = recovery.pair(i); + __ref(map)[p.key()] = p.value(); + if (i % yield_interval == 0) { + if (yield_during_build_up) st_sleep(0); + } + } + check(recovery.seqno() >= 0); + int seqno = __ref(seqno) = recovery.seqno(); + long long build_end = current_time_millis(); + cout << "receive and build-up took " + << build_end - __ref(before_recv) + << " ms; built up map of " << recovery.pair_size() + << " records in " << build_end - build_start + << " ms; now at seqno " << seqno << endl; +#endif + }, "recovery_builder" + lexical_cast<string>(i))); + } + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } + } + + // + // Catch-up + // + + long long mid_time = current_time_millis(); + int mid_seqno = seqno; + // XXX + using msg::TxnBatch; + using msg::Txn; + commons::array<char> rbuf(0), wbuf(buf_size); + reader reader(nullptr, rbuf.get(), rbuf.size()); + writer writer(lambda(const void*, size_t) { + throw not_supported_exception("should not be writing responses during catch-up phase"); + }, wbuf.get(), wbuf.size()); + stream s(reader, writer); + TxnBatch batch(s); + while (!backlog.empty()) { + chunk chunk = backlog.take(); + sized_array<char> &buf = chunk.get<0>(); + ASSERT(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); + ASSERT(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); + ASSERT(chunk.get<1>() < chunk.get<2>()); + swap(buf, reader.buf()); + reader.reset_range(chunk.get<1>(), chunk.get<2>()); + while (reader.start() < reader.end()) { + char *start = reader.start(); + uint32_t prefix = ntohl(reader.read<uint32_t>()); + ASSERT(prefix < 10000); + ASSERT(start + sizeof(uint32_t) + prefix <= reader.end()); + batch.Clear(); + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = batch.txn(t); + if (rec_pwal) seqno = txn.seqno() - 1; + process_txn(map, txn, seqno); + if (fake_exec && !use_pb) { + reader.skip(txn.op_size() * Op_Size); + } + + if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); + if (check_interval(txn.seqno(), process_display)) { + cout << "caught up txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } + } + ASSERT(start + sizeof(uint32_t) + prefix == reader.start()); + } + } + g_caught_up = true; +#if 0 + while (!backlog.empty()) { + using pb::Txn; + shared_ptr<Txn> p = backlog.take(); + process_txn<pb_traits, pb_traits>(map, *p, seqno, nullptr); + if (check_interval(p->seqno(), catch_up_display)) { + cout << "processed txn " << p->seqno() << " off the backlog; " + << "backlog.size = " << backlog.queue().size() << endl; + } + if (check_interval(p->seqno(), yield_interval)) { + // Explicitly yield. (Note that yielding does still effectively + // happen anyway because process_txn is a yield point.) + st_sleep(0); + } + } +#endif + showtput("replayer caught up; from backlog replayed", + current_time_millis(), mid_time, seqno, mid_seqno); + } + } + } catch (std::exception &ex) { + cerr_thread_ex(ex) << endl; + throw; + } + + stop_hub.insert(st_thread_self()); +} Added: ydb/trunk/src/run.lzz.clamp =================================================================== --- ydb/trunk/src/run.lzz.clamp (rev 0) +++ ydb/trunk/src/run.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) @@ -0,0 +1,12 @@ +#src +#include "tpcc.hh" +#include "stxn.hh" +#end + +void +summarize(const char *role, int seqno) +{ + cout << role << " SUMMARY\n"; + if (do_tpcc) summarize_tpcc(seqno); + else summarize_stxn(seqno); +} Modified: ydb/trunk/src/stxn.lzz.clamp =================================================================== --- ydb/trunk/src/stxn.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) +++ ydb/trunk/src/stxn.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) @@ -56,6 +56,8 @@ map.set_deleted_key(-2); } +namespace { + template<typename T> recovery_t make_recovery(const T &map, int mypos, int nnodes, int seqno); template<> @@ -88,8 +90,8 @@ */ template<typename Types> void -issue_txns(st_channel<replica_info> &newreps, int &seqno, - st_bool &accept_joiner) +issue_txns0(st_channel<replica_info> &newreps, int &seqno, + st_bool &accept_joiner) { USE(newreps); USE(seqno); @@ -341,11 +343,11 @@ */ template<typename Types, typename RTypes> void -process_txns(st_netfd_t leader, mii &map, int &seqno, - st_channel<recovery_t> &send_states, - /* XXX st_channel<shared_ptr<pb::Txn> > &backlog */ - st_channel<chunk> &backlog, int init_seqno, - int mypos, int nnodes) +process_txns0(st_netfd_t leader, mii &map, int &seqno, + st_channel<recovery_t> &send_states, + /* XXX st_channel<shared_ptr<pb::Txn> > &backlog */ + st_channel<chunk> &backlog, int init_seqno, + int mypos, int nnodes) { USE(leader); USE(map); @@ -737,15 +739,113 @@ int start_seqno, recovery_start_seqno, recovery_end_seqno, last_seqno; }; +template<typename T> +void +ser(ser_array &s, const T &msg) +{ + int len = msg.ByteSize(); + + // Grow the array as needed. + s.stretch(len + sizeof(uint32_t)); + + // Serialize message to a buffer with four-byte length prefix. + check(msg.SerializeToArray(s.data() + sizeof(uint32_t), len)); + *reinterpret_cast<uint32_t*>(s.data()) = htonl(uint32_t(len)); +} + +template<typename T> +void +bcastmsg(const vector<st_netfd_t> &dsts, const T &msg) +{ + ser_t s; + ser(s, msg); + bcastbuf(dsts, s); +} + +template<typename T> +void +sendmsg(st_netfd_t dst, const T &msg) +{ + ser_t s; + ser(s, msg); + sendbuf(dst, s); +} + +template<typename T> +void +readmsg(st_reader &src, T &msg) +{ + managed_array<char> a = src.read(sizeof(uint32_t)); + uint32_t len = ntohl(*reinterpret_cast<const uint32_t*>(a.get())); + check(msg.ParseFromArray(src.read(len), len)); +} + +} + /** * Swallow replica responses. */ -template<typename Types> void handle_responses(st_netfd_t replica, const int &seqno, int rid, st_multichannel<long long> &recover_signals, bool caught_up) { response_handler h(replica, seqno, rid, recover_signals, caught_up); - h.run<Types>(); + if (use_pb_res) + h.run<pb_traits>(); + else + h.run<rb_traits>(); } +void +issue_txns(st_channel<replica_info> &newreps, int &seqno, + st_bool &accept_joiner) +{ + if (use_pb) + issue_txns0<pb_traits>(newreps, seqno, accept_joiner); + else + issue_txns0<rb_traits>(newreps, seqno, accept_joiner); +} + +void +process_txns(st_netfd_t leader, mii &map, int &seqno, + st_channel<recovery_t> &send_states, + /* XXX st_channel<shared_ptr<pb::Txn> > &backlog */ + st_channel<chunk> &backlog, int init_seqno, + int mypos, int nnodes) +{ + if (use_pb && use_pb_res) + process_txns0<pb_traits, pb_traits> + (leader, map, seqno, send_states, backlog, init_seqno, mypos, nnodes); + else if (use_pb && !use_pb_res) + process_txns0<pb_traits, rb_traits> + (leader, map, seqno, send_states, backlog, init_seqno, mypos, nnodes); + else if (!use_pb && use_pb_res) + process_txns0<rb_traits, pb_traits> + (leader, map, seqno, send_states, backlog, init_seqno, mypos, nnodes); + else if (!use_pb && !use_pb_res) + process_txns0<rb_traits, rb_traits> + (leader, map, seqno, send_states, backlog, init_seqno, mypos, nnodes); +} + +void +process_txn(mii &map, const ydb::msg::Txn &txn, int &seqno) +{ + process_txn<rb_traits, rb_traits>(map, txn, seqno, nullptr); +} + +void +summarize_stxn(int seqno) +{ + cout << "- total updates = " << updates << "\n" + << "- final DB state: seqno = " << seqno << ", size = " + << g_map.size() << endl; + string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); + if (dump) { + cout << "- dumping to " << fname << endl; + ofstream of(fname.c_str()); + of << "seqno: " << seqno << endl; + foreach (const entry &p, g_map) { + of << p.first << ": " << p.second << endl; + } + } +} Modified: ydb/trunk/src/tpcc.lzz.clamp =================================================================== --- ydb/trunk/src/tpcc.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) +++ ydb/trunk/src/tpcc.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) @@ -831,3 +831,16 @@ ASSERT(false); } } + +void summarize_tpcc(int seqno) +{ + cout << "seqno: " << seqno << endl; + if (g_tables != nullptr) { + cout << "state:\n"; + g_tables->show(); + string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); + if (dump) { + // XXX iterate & dump + } + } +} Modified: ydb/trunk/src/ydb.lzz.clamp =================================================================== --- ydb/trunk/src/ydb.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) +++ ydb/trunk/src/ydb.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) @@ -1,38 +1,17 @@ -#hdr -#include "unsetprefs.h" -#include <boost/bind.hpp> -#include <boost/function.hpp> -#include <boost/scoped_ptr.hpp> -#include <string> -#include <iostream> -#include <st.h> -#include "tpcc/clock.h" -#include "tpcc/randomgenerator.h" -#include "tpcc/tpccclient.h" -#include "tpcc/tpccgenerator.h" -#include "tpcc/tpcctables.h" -#include "util.hh" -#include "tpcc.hh" -#include "stxn.hh" -#include "main.hh" - -using namespace boost; -using namespace std; -using namespace commons; -#end - #src #include "unsetprefs.h" +#include <boost/program_options.hpp> +#include <commons/st/sockets.h> +#include <commons/st/threads.h> #include <csignal> // sigaction, etc. #include <cstring> // strsignal, size_t -#include <boost/archive/binary_iarchive.hpp> -#include <boost/program_options.hpp> #include <gtest/gtest.h> #include <malloc.h> #include <string> -#include <commons/st/io.h> -#include <commons/st/sockets.h> -#include <commons/st/threads.h> +#include "tpcc.hh" +#include "stxn.hh" +#include "replica.hh" +#include "leader.hh" #include "setprefs.h" #end @@ -41,6 +20,7 @@ using namespace boost::archive; namespace { + /** * Memory monitor. */ @@ -95,6 +75,7 @@ } } } + } // @@ -325,35 +306,10 @@ cout << "pid " << getpid() << endl; // Which role are we? - if (is_leader) { - if (use_pb) { - if (use_pb_res) { - run_leader<pb_traits, pb_traits>(minreps, leader_port); - } else { - //run_leader<pb_traits, rb_traits>(minreps, leader_port); - } - } else { - if (use_pb_res) { - //run_leader<rb_traits, pb_traits>(minreps, leader_port); - } else { - //run_leader<rb_traits, rb_traits>(minreps, leader_port); - } - } - } else { - if (use_pb) { - if (use_pb_res) { - run_replica<pb_traits, pb_traits>(leader_host, leader_port, listen_port); - } else { - //run_replica<pb_traits, rb_traits>(leader_host, leader_port, listen_port); - } - } else { - if (use_pb_res) { - //run_replica<rb_traits, pb_traits>(leader_host, leader_port, listen_port); - } else { - //run_replica<rb_traits, rb_traits>(leader_host, leader_port, listen_port); - } - } - } + if (is_leader) + run_leader(minreps, leader_port); + else + run_replica(leader_host, leader_port, listen_port); return 0; } catch (std::exception &ex) { @@ -362,697 +318,3 @@ return 1; } } - -#if 0 -template<typename Types, typename RTypes> -void -run_leader(int minreps, uint16_t leader_port); -template<typename Types, typename RTypes> -void -run_replica(string leader_host, uint16_t leader_port, uint16_t listen_port); -#endif - -#if 1 -namespace { -/** - * Run the leader. - */ -template<typename Types, typename RTypes> -void -run_leader(int minreps, uint16_t leader_port) -{ - cout << "starting as leader" << endl; - st_multichannel<long long> recover_signals; - - scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); - g_twal = twal.get(); - scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); - g_wal = pwal.get(); - - // Wait until all replicas have joined. - st_netfd_t listener = st_tcp_listen(leader_port); - st_closing close_listener(listener); - vector<replica_info> replicas; - st_closing_all_infos close_replicas(replicas); - cout << "waiting for at least " << minreps << " replicas to join" << endl; - Join join; - for (int i = 0; i < minreps; ++i) { - st_netfd_t fd; - { - st_intr intr(stop_hub); - fd = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - } - readmsg(fd, join); - replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); - } - cout << "got all " << minreps << " replicas" << endl; - - // Construct the initialization message. - Init init; - init.set_txnseqno(0); - init.set_multirecover(multirecover); - foreach (replica_info r, replicas) { - SockAddr *psa = init.add_node(); - psa->set_host(r.host()); - psa->set_port(r.port()); - } - - // Send init to each initial replica. - foreach (replica_info r, replicas) { - init.set_yourhost(r.host()); - sendmsg(r.fd(), init); - } - - // Start dispatching queries. - st_bool accept_joiner; - int seqno = 0; - st_channel<replica_info> newreps; - st_channel<st_netfd_t> delreps; - foreach (const replica_info &r, replicas) newreps.push(r); - function<void()> f; - if (do_tpcc) - f = bind(issue_tpcc, ref(newreps), ref(delreps), ref(seqno), ref(accept_joiner)); - else - f = bind(issue_txns<Types>, ref(newreps), ref(seqno), ref(accept_joiner)); - st_joining join_issue_txns(my_spawn(f, "issue_txns")); - - finally fin(bind(summarize, "LEADER", ref(seqno))); - - try { - // Start handling responses. - st_thread_group handlers; - int rid = 0; - foreach (replica_info r, replicas) { - function<void()> fn; - if (do_tpcc) - fn = bind(handle_tpcc_responses, r.fd(), ref(seqno), rid++, - ref(recover_signals), ref(delreps), true); - else - fn = bind(handle_responses<RTypes>, r.fd(), ref(seqno), rid++, - ref(recover_signals), true); - handlers.insert(my_spawn(fn, "handle_responses")); - } - - // Accept the recovering node, and tell it about the online replicas. - st_netfd_t joiner; - try { - st_intr intr(stop_hub); - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - accept_joiner.waitset(); - } catch (std::exception &ex) { - string s(ex.what()); - if (s.find("Interrupted system call") == s.npos) - throw; - else - throw break_exception(); - } - Join join; - readmsg(joiner, join); - replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); - cout << "setting seqno to " << seqno << endl; - init.set_txnseqno(seqno); - init.set_yourhost(replicas.back().host()); - sendmsg(joiner, init); - recover_signals.push(current_time_millis()); - - // Start streaming txns to joiner. - cout << "start streaming txns to joiner" << endl; - function<void()> handle_responses_joiner_fn; - if (do_tpcc) - handle_responses_joiner_fn = - bind(handle_tpcc_responses, joiner, ref(seqno), rid++, - ref(recover_signals), ref(delreps), false); - else - handle_responses_joiner_fn = - bind(handle_responses<RTypes>, joiner, ref(seqno), rid++, - ref(recover_signals), false); - newreps.push(replicas.back()); - handlers.insert(my_spawn(handle_responses_joiner_fn, - "handle_responses_joiner")); - } catch (break_exception &ex) { - } catch (std::exception &ex) { - // TODO: maybe there's a cleaner way to do this final step before waiting with the join - cerr_thread_ex(ex) << endl; - throw; - } -} - -void -summarize(const char *role, int seqno) -{ - cout << role << " SUMMARY\n"; - if (do_tpcc) { - cout << "seqno: " << seqno << endl; - if (g_tables != nullptr) { - cout << "state:\n"; - g_tables->show(); - string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); - if (dump) { - // XXX iterate & dump - } - } - } else { - cout << "- total updates = " << updates << "\n" - << "- final DB state: seqno = " << seqno << ", size = " - << g_map.size() << endl; - string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); - if (dump) { - cout << "- dumping to " << fname << endl; - ofstream of(fname.c_str()); - of << "seqno: " << seqno << endl; - foreach (const entry &p, g_map) { - of << p.first << ": " << p.second << endl; - } - } - } -} - -/** - * Run a replica. - */ -template<typename Types, typename RTypes> -void -run_replica(string leader_host, uint16_t leader_port, uint16_t listen_port) -{ - if (disk) { - // Disk IO threads. - for (int i = 0; i < 5; ++i) { - //thread somethread(threadfunc); - } - } - - // Initialize database state. - int seqno = -1; - mii &map = g_map; - commons::array<char> recarr(0); - if (do_tpcc) { - TPCCTables *tables = new TPCCTables(); - g_tables.reset(tables); - SystemClock* clock = new SystemClock(); - - // Create a generator for filling the database. - RealRandomGenerator* random = new RealRandomGenerator(); - NURandC cLoad = NURandC::makeRandom(random); - random->setC(cLoad); - - // Generate the data - cout << "loading " << nwarehouses << " warehouses" << endl; - char now[Clock::DATETIME_SIZE+1]; - clock->getDateTimestamp(now); - TPCCGenerator generator(random, now, Item::NUM_ITEMS, - District::NUM_PER_WAREHOUSE, - Customer::NUM_PER_DISTRICT, - NewOrder::INITIAL_NUM_PER_DISTRICT); - long long start_time = current_time_millis(); - generator.makeItemsTable(tables); - for (int i = 0; i < nwarehouses; ++i) { - generator.makeWarehouse(tables, i+1); - } - cout << "loaded " << nwarehouses << " warehouses in " - << current_time_millis() - start_time << " ms" << endl; - tables->show(); - } - recovery_t orig = rec_twal ? g_tables->ser(0, 0, seqno) : recovery_t(); - - finally f(bind(summarize, "REPLICA", ref(seqno))); - st_channel<recovery_t> send_states; - - cout << "starting as replica on port " << listen_port << endl; - - // Listen for connections from other replicas. - st_netfd_t listener = st_tcp_listen(listen_port); - - // Connect to the leader and join the system. - st_netfd_t leader = st_tcp_connect(leader_host.c_str(), leader_port, - timeout); - st_closing closing(leader); - Join join; - join.set_port(listen_port); - sendmsg(leader, join); - Init init; - { - st_intr intr(stop_hub); - readmsg(leader, init); - } - uint32_t listen_host = init.yourhost(); - multirecover = init.multirecover(); - - // Display the info. - cout << "got init msg with txn seqno " << init.txnseqno() - << " and hosts:" << endl; - vector<st_netfd_t> replicas; - st_closing_all close_replicas(replicas); - int mypos = -1; - for (int i = 0; i < init.node_size(); ++i) { - const SockAddr &sa = init.node(i); - char buf[INET_ADDRSTRLEN]; - in_addr host = { sa.host() }; - bool is_self = sa.host() == listen_host && sa.port() == listen_port; - cout << "- " << checkerr(inet_ntop(AF_INET, &host, buf, - INET_ADDRSTRLEN)) - << ':' << sa.port() << (is_self ? " (self)" : "") << endl; - if (is_self) mypos = i; - if (!is_self && (init.txnseqno() > 0 || rec_twal)) { - replicas.push_back(st_tcp_connect(host, - static_cast<uint16_t>(sa.port()), - timeout)); - } - } - - // Initialize physical or txn log. - scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); - g_twal = twal.get(); - scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); - g_wal = pwal.get(); - - // Process txns. - st_channel<chunk> backlog; - function<void()> process_fn; - if (do_tpcc) - process_fn = bind(process_tpccs, leader, ref(seqno), ref(send_states), - ref(backlog), init.txnseqno(), mypos, init.node_size()); - else - process_fn = bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno), - ref(send_states), ref(backlog), init.txnseqno(), mypos, - init.node_size()); - st_joining join_proc(my_spawn(process_fn, "process_txns")); - st_joining join_rec(init.txnseqno() == 0 && (multirecover || mypos == 0) ? - my_spawn(bind(recover_joiner, listener, ref(send_states)), - "recover_joiner") : - nullptr); - - try { - // If there's anything to recover. - if (init.txnseqno() > 0 || fail_seqno > 0) { - if (do_tpcc) { - - // - // TPCC txns - // - - function<void()> rec_twal_fn = lambda() { - int &seqno = __ref(seqno); - cout << "recovering from twal" << endl; - long long start_time = current_time_millis(); - g_twal->flush(); - sync(); - ifstream inf("twal"); - TpccReq req; - while (inf.peek() != ifstream::traits_type::eof()) { - ASSERT(inf.good()); - readmsg(inf, req); - process_tpcc(req, seqno, nullptr); - if (check_interval(seqno, yield_interval)) st_sleep(0); - } - showdatarate("recovered from twal", inf.tellg(), - current_time_millis() - start_time); - cout << "now at seqno " << seqno << endl; - }; - - function<void()> recv_log_fn = lambda() { - st_netfd_t src = __ref(replicas[0]); - int &seqno = __ref(seqno); - ASSERT(fail_seqno == seqno); - recreq r = { fail_seqno + 1, resume.take() }; - st_write(src, r); - sized_array<char> rbuf(new char[read_buf_size], read_buf_size); - function<void(anchored_stream_reader &reader)> overflow_fn = - lambda(anchored_stream_reader &reader) { - shift_reader(reader); - }; - anchored_stream_reader reader(st_read_fn(src), - st_read_fully_fn(src), - overflow_fn, rbuf.get(), rbuf.size()); - TpccReq req; - while (seqno < r.end_seqno) { - { st_intr intr(stop_hub); readmsg(reader, req); } - process_tpcc(req, seqno, nullptr); - reader.set_anchor(); - if (check_interval(seqno, yield_interval)) st_sleep(0); - } - }; - - if (rec_twal) { - failed.waitset(); - g_tables.reset(new TPCCTables); - tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(orig.begin()); - commons::array<char> body(orig.begin() + sizeof(tpcc_recovery_header), - orig.size() - sizeof(tpcc_recovery_header)); - g_tables->deser(mypos, init.node_size(), hdr, body); - body.release(); - rec_twal_fn(); - failed.reset(); - recv_log_fn(); - } - -#if 0 - st_thread_t rec_twal_thread = my_spawn(rec_twal_fn, "rec_twal"); - st_thread_t recv_log_thread = my_spawn(recv_log_fn, "recv_log"); - - st_join(rec_twal_thread); - st_join(recv_log_thread); -#endif - - if (rec_pwal) { - // Recover from phy log. - } else if (rec_twal) { - // Recover from txn log. - } else { - - g_tables.reset(new TPCCTables); - - // - // Build-up - // - - if (ship_log) { - } else { - // XXX indent - - cout << "waiting for recovery message" << (multirecover ? "s" : "") - << endl; - long long before_recv = current_time_millis(); - - vector<st_thread_t> recovery_builders; - ASSERT(seqno == -1); - bool first = true; - for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { - recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message length and header. - tpcc_recovery_header hdr; - checkeqnneg(st_read_fully(__ref(replicas[i]), - &hdr, sizeof hdr, - ST_UTIME_NO_TIMEOUT), - ssize_t(sizeof hdr)); - check(hdr.seqno >= 0); - - cout << "receiving recovery of " << hdr.len << " bytes" << endl; - - long long start_time = current_time_millis(); - __ref(recarr).reset(new char[hdr.len], hdr.len); - checkeqnneg(st_read_fully(__ref(replicas[i]), - __ref(recarr).get(), hdr.len, - ST_UTIME_NO_TIMEOUT), - ssize_t(hdr.len)); - - long long before_deser = current_time_millis(); - showdatarate("received recovery message", size_t(hdr.len), before_deser - start_time); - - if (__ref(seqno) == -1) - __ref(seqno) = hdr.seqno; - else - checkeq(__ref(seqno), hdr.seqno); - - g_tables->deser(__ctx(i), __ref(init).node_size(), hdr, __ref(recarr)); - - long long end_time = current_time_millis(); - showdatarate("deserialized recovery message", size_t(hdr.len), end_time - before_deser); - cout << "receive & deserialize took " << end_time - __ref(before_recv) - << " ms total; now at seqno " << hdr.seqno << endl; - cout << "after deserialize, db state is now at seqno " - << hdr.seqno << ":" << endl; - g_tables->show(); - -#if 0 - // Resize the table if necessary. - - commons::array<entry> &table = __ref(map).get_table(); - if (!__ref(first)) { - checkeq(table.size(), hdr.total); - checkeq(__ref(map).size(), hdr.size); - } else { - __ref(first) = false; - if (table.size() != hdr.total) { - table.reset(new entry[hdr.total], hdr.total); - } - } - - // Receive straight into the table. - pair<size_t, size_t> range = - recovery_range(table.size(), __ctx(i), __ref(init).node_size()); - // Check that we agree on the number of entries. - checkeq(range.second - range.first, hdr.count); - // Check that the count is a power of two. - checkeq(hdr.count & (hdr.count - 1), size_t(0)); - size_t rangelen = sizeof(entry) * hdr.count; - // Read an extra char to ensure that we're at the EOF. - checkeqnneg(st_read_fully(__ref(replicas[i]), - table.begin() + range.first, rangelen + 1, - ST_UTIME_NO_TIMEOUT), - ssize_t(rangelen)); -#endif - }, "recovery_builder" + lexical_cast<string>(i))); - } - foreach (st_thread_t t, recovery_builders) { - st_join(t); - } - - } - } - - // - // Catch-up - // - - long long mid_time = current_time_millis(); - int mid_seqno = seqno; - TpccReq req; - while (!backlog.empty()) { - chunk chunk = backlog.take(); - cout << "took from backlog, now has " << backlog.queue().size() - << " chunks" << endl; - sized_array<char> &buf = chunk.get<0>(); - char *begin = chunk.get<1>(), *end = chunk.get<2>(); - ASSERT(buf.get() <= begin && begin < buf.end()); - ASSERT(buf.get() < end && end < buf.end()); - process_buf(begin, end, req, seqno); - } - showtput("replayer caught up; from backlog replayed", - current_time_millis(), mid_time, seqno, mid_seqno); - - } else { - - // - // Simple txns - // - - if (rec_pwal) { - // Recover from physical log. - cout << "recovering from pwal" << endl; - long long start_time = current_time_millis(); - ifstream inf("pwal"); - binary_iarchive in(inf); - int rseqno = -1; - while (inf.peek() != ifstream::traits_type::eof()) { - int op; - in & op; - switch (op) { - case op_del: - { - int key; - in & key; - mii::iterator it = map.find(key); - map.erase(it); - break; - } - case op_write: - { - int key, val; - in & key & val; - map[key] = val; - break; - } - case op_commit: - ++rseqno; - break; - } - if (check_interval(rseqno, yield_interval)) st_sleep(0); - } - seqno = init.txnseqno() - 1; - showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); - cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; - } else { - - // - // Build-up - // - - cout << "waiting for recovery message" << (multirecover ? "s" : "") - << endl; - long long before_recv = current_time_millis(); - - vector<st_thread_t> recovery_builders; - ASSERT(seqno == -1); - bool first = true; - for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { - recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message length and header. - size_t len; - recovery_header hdr; - char buf[sizeof len + sizeof hdr]; - //try { - checkeqnneg(st_read_fully(__ref(replicas[i]), - buf, sizeof len + sizeof hdr, - ST_UTIME_NO_TIMEOUT), - ssize_t(sizeof len + sizeof hdr)); - //} catch (...) { // TODO just catch "Connection reset by peer" - //return; - //} - raw_reader rdr(buf); - rdr.read(len); - rdr.read(hdr); - check(hdr.seqno >= 0); - - // Resize the table if necessary. - commons::array<entry> &table = __ref(map).get_table(); - if (!__ref(first)) { - checkeq(table.size(), hdr.total); - checkeq(__ref(map).size(), hdr.size); - } else { - __ref(first) = false; - __ref(map).set_size(hdr.size); - if (table.size() != hdr.total) { - table.reset(new entry[hdr.total], hdr.total); - } - } - - // Receive straight into the table. - pair<size_t, size_t> range = - recovery_range(table.size(), __ctx(i), __ref(init).node_size()); - // Check that we agree on the number of entries. - checkeq(range.second - range.first, hdr.count); - // Check that the count is a power of two. - checkeq(hdr.count & (hdr.count - 1), size_t(0)); - size_t rangelen = sizeof(entry) * hdr.count; - // Read an extra char to ensure that we're at the EOF. - long long start_time = current_time_millis(); - checkeqnneg(st_read_fully(__ref(replicas[i]), - table.begin() + range.first, rangelen + 1, - ST_UTIME_NO_TIMEOUT), - ssize_t(rangelen)); - long long end_time = current_time_millis(); - - if (__ref(seqno) != -1) - checkeq(__ref(seqno), hdr.seqno); - __ref(seqno) = hdr.seqno; - showdatarate("got recovery message", len, end_time - start_time); - cout << "receive took " << end_time - __ref(before_recv) - << " ms total; now at seqno " << hdr.seqno << endl; -#if 0 - Recovery recovery; - long long receive_start = 0, receive_end = 0; - size_t len = 0; - { - st_intr intr(stop_hub); - len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, - &receive_end); - } - long long build_start = current_time_millis(); - cout << "got recovery message of " << len << " bytes in " - << build_start - __ref(before_recv) << " ms: xfer took " - << receive_end - receive_start << " ms, deserialization took " - << build_start - receive_end << " ms" << endl; - for (int i = 0; i < recovery.pair_size(); ++i) { - const Recovery_Pair &p = recovery.pair(i); - __ref(map)[p.key()] = p.value(); - if (i % yield_interval == 0) { - if (yield_during_build_up) st_sleep(0); - } - } - check(recovery.seqno() >= 0); - int seqno = __ref(seqno) = recovery.seqno(); - long long build_end = current_time_millis(); - cout << "receive and build-up took " - << build_end - __ref(before_recv) - << " ms; built up map of " << recovery.pair_size() - << " records in " << build_end - build_start - << " ms; now at seqno " << seqno << endl; -#endif - }, "recovery_builder" + lexical_cast<string>(i))); - } - foreach (st_thread_t t, recovery_builders) { - st_join(t); - } - } - - // - // Catch-up - // - - long long mid_time = current_time_millis(); - int mid_seqno = seqno; - // XXX - using msg::TxnBatch; - using msg::Txn; - commons::array<char> rbuf(0), wbuf(buf_size); - reader reader(nullptr, rbuf.get(), rbuf.size()); - writer writer(lambda(const void*, size_t) { - throw not_supported_exception("should not be writing responses during catch-up phase"); - }, wbuf.get(), wbuf.size()); - stream s(reader, writer); - TxnBatch batch(s); - while (!backlog.empty()) { - chunk chunk = backlog.take(); - sized_array<char> &buf = chunk.get<0>(); - ASSERT(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); - ASSERT(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); - ASSERT(chunk.get<1>() < chunk.get<2>()); - swap(buf, reader.buf()); - reader.reset_range(chunk.get<1>(), chunk.get<2>()); - while (reader.start() < reader.end()) { - char *start = reader.start(); - uint32_t prefix = ntohl(reader.read<uint32_t>()); - ASSERT(prefix < 10000); - ASSERT(start + sizeof(uint32_t) + prefix <= reader.end()); - batch.Clear(); - for (int t = 0; t < batch.txn_size(); ++t) { - const Txn &txn = batch.txn(t); - if (rec_pwal) seqno = txn.seqno() - 1; - process_txn<rb_traits, rb_traits>(map, txn, seqno, nullptr); - if (fake_exec && !Types::is_pb()) { - reader.skip(txn.op_size() * Op_Size); - } - - if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); - if (check_interval(txn.seqno(), process_display)) { - cout << "caught up txn " << txn.seqno() - << "; db size = " << map.size() - << "; seqno = " << seqno - << "; backlog.size = " << backlog.queue().size() << endl; - } - } - ASSERT(start + sizeof(uint32_t) + prefix == reader.start()); - } - } - g_caught_up = true; -#if 0 - while (!backlog.empty()) { - using pb::Txn; - shared_ptr<Txn> p = backlog.take(); - process_txn<pb_traits, pb_traits>(map, *p, seqno, nullptr); - if (check_interval(p->seqno(), catch_up_display)) { - cout << "processed txn " << p->seqno() << " off the backlog; " - << "backlog.size = " << backlog.queue().size() << endl; - } - if (check_interval(p->seqno(), yield_interval)) { - // Explicitly yield. (Note that yielding does still effectively - // happen anyway because process_txn is a yield point.) - st_sleep(0); - } - } -#endif - showtput("replayer caught up; from backlog replayed", - current_time_millis(), mid_time, seqno, mid_seqno); - } - } - } catch (std::exception &ex) { - cerr_thread_ex(ex) << endl; - throw; - } - - stop_hub.insert(st_thread_self()); -} -} -#endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-24 06:11:05
|
Revision: 1330 http://assorted.svn.sourceforge.net/assorted/?rev=1330&view=rev Author: yangzhang Date: 2009-03-24 06:10:53 +0000 (Tue, 24 Mar 2009) Log Message: ----------- - updated build system to first lzz then clamp, fixing the issue of where to place lambdas (header or source) - bunch of physical refactoring, particularly trying to reduce the number of includes in headers Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/mkdeps.py ydb/trunk/src/msg.h ydb/trunk/src/stxn.lzz.clamp ydb/trunk/src/tpcc.lzz.clamp ydb/trunk/src/ydb.lzz.clamp Added Paths: ----------- ydb/trunk/src/util.lzz.clamp Removed Paths: ------------- ydb/trunk/src/util.lzz Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/Makefile 2009-03-24 06:10:53 UTC (rev 1330) @@ -140,20 +140,41 @@ %.cc: %.cc.cog cog.py $< > $@ -%.cc %.hh: %.lzz - lzz -hx hh -sx cc -hl -sl -hd -sd $< - %.pb.h %.pb.cc: %.proto protoc --cpp_out=. $< -%.lzz: %.lzz.clamp +# ORIG +# +#%.cc %.hh: %.lzz +# lzz -hx hh -sx cc -hl -sl -hd -sd $< +# +#%.lzz: %.lzz.clamp +# rm -f $@ +# mkdir -p .clamp/ +# clamp --outdir .clamp/ --prefix $(basename $@) < $< | \ +# sed "$$( echo -e '1i\\\n\#hdr\n1a\\\n\#end' )" | \ +# sed "$$( echo -e '$$i\\\n\#hdr\n$$a\\\n\#end' )" > $@ +# chmod -w $@ + +%.cc.clamp %.hh.clamp: %.lzz.clamp + ln -sf $< $(basename $<) + rm -f $(basename $(basename $<)).{hh,cc}.clamp + lzz -hx hh.clamp -sx cc.clamp -hd -sd $(basename $<) + chmod -w $(basename $(basename $<)).{hh.clamp,cc.clamp} + +%.cc: %.cc.clamp rm -f $@ mkdir -p .clamp/ - clamp --outdir .clamp/ --prefix $(basename $@) < $< | \ - sed "$$( echo -e '1i\\\n\#hdr\n1a\\\n\#end' )" | \ - sed "$$( echo -e '$$i\\\n\#hdr\n$$a\\\n\#end' )" > $@ + clamp --outdir .clamp/ --prefix $(basename $@)_cc < $< | \ + sed 's/"$(basename $@).hh.clamp"/"$(basename $@).hh"/' > $@ chmod -w $@ +%.hh: %.hh.clamp + rm -f $@ + mkdir -p .clamp/ + clamp --outdir .clamp/ --prefix $(basename $@)_hh < $< > $@ + chmod -w $@ + pch.h: svn ls -rHEAD -R $(SVNURL) | \ egrep -v '/$$|Makefile' | \ @@ -186,7 +207,7 @@ -Wno-unused-parameter clean: - rm -rf .clamp/ $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) $(CLAMPLZZS) + rm -rf .clamp/ $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) $(CLAMPLZZS) *.d *.hh.clamp *.cc.clamp distclean: clean rm -f pch.h pch.h.gch Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/main.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) @@ -1,15 +1,16 @@ #hdr #include "unsetprefs.h" -#include <boost/archive/binary_iarchive.hpp> -#include <boost/archive/binary_oarchive.hpp> #include <boost/tuple/tuple.hpp> -#include <commons/st/st.h> +#include <commons/st/intr.h> +#include <commons/st/sync.h> +#include <commons/st/channel.h> #include <fstream> // ofstream #include <vector> -#include "msg.h" #include "util.hh" #include "setprefs.h" +namespace boost { namespace archive { class binary_oarchive; } } + using namespace boost; using namespace boost::archive; using namespace commons; @@ -21,11 +22,14 @@ #src #include "unsetprefs.h" #include <boost/foreach.hpp> +#include <boost/archive/binary_oarchive.hpp> #include <commons/assert.h> -#include <commons/nullptr.h> #include <commons/time.h> +#include <commons/st/io.h> +#include <commons/st/sockets.h> #include <iostream> #include <unistd.h> // pipe, write, sync +#include "msg.h" #include "setprefs.h" #end @@ -33,6 +37,7 @@ typedef commons::array<char> recovery_t; + // Configuration. st_utime_t timeout; int yield_interval, accept_joiner_seqno, issuing_interval, min_ops, max_ops, @@ -161,29 +166,36 @@ class wal { public: - wal(const string &fname) : of(fname.c_str()), out(of) {} + wal(const string &fname) : + of_(fname.c_str()), + ar_(new binary_oarchive(of())) + {} + ~wal() { delete ar_; } template <typename T> - void log(const T &msg) { ser(of, msg); } + void log(const T &msg) { ser(of(), msg); } void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } void logbuf(const void *buf, size_t len) { - of.write(reinterpret_cast<const char*>(buf), len); + of().write(reinterpret_cast<const char*>(buf), len); } void logdel(int key) { int op = op_del; // TODO: is this really necessary? - out & op & key; + ar() & op & key; } void logwrite(int key, int val) { int op = op_write; - out & op & key & val; + ar() & op & key & val; } void logcommit() { int op = op_commit; - out & op; + ar() & op; } - void flush() { of.flush(); } + void flush() { of().flush(); } private: - ofstream of; - binary_oarchive out; + ofstream of_; + //unique_ptr<binary_oarchive> ar_; + binary_oarchive *ar_; + ofstream &of() { return of_; } + binary_oarchive &ar() { return *ar_; }; }; // TODO? Modified: ydb/trunk/src/mkdeps.py =================================================================== --- ydb/trunk/src/mkdeps.py 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/mkdeps.py 2009-03-24 06:10:53 UTC (rev 1330) @@ -39,12 +39,17 @@ for dep in deps(src(hdr)): yield dep -for i in pwd.glob('*.lzz') + pwd.glob('*.lzz.clamp'): - print sub(r'\.lzz(\.clamp)?', '.o', i), ':', ' '.join(deps(i)) +for i in pwd.glob('*.lzz.clamp'): + print sub(r'\.lzz\.clamp', '.o', i), ':', sub(r'\.lzz\.clamp', '.hh', i), ' '.join(deps(i)) for i in pwd.glob('*.d'): with file(i) as f: for line in f: for word in line.split(): - if '.clamp' in word: - print sub(r'(\.clamp/(.+)_lambda_.+\.clamp_h)', r'\1: \2.lzz.clamp', word) + if '.clamp/' in word: + if '_hh_lambda_' in word: + print sub(r'(\.clamp/(.+)_hh_lambda_.+\.clamp_h)', r'\1: \2.hh.clamp', word) + elif '_cc_lambda_' in word: + print sub(r'(\.clamp/(.+)_cc_lambda_.+\.clamp_h)', r'\1: \2.cc.clamp', word) + else: + print sub(r'(\.clamp/(.+)_lambda_.+\.clamp_h)', r'\1: \2.lzz.clamp', word) Modified: ydb/trunk/src/msg.h =================================================================== --- ydb/trunk/src/msg.h 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/msg.h 2009-03-24 06:10:53 UTC (rev 1330) @@ -3,7 +3,7 @@ #include <commons/array.h> #include <commons/exceptions.h> -#include <commons/st/st.h> +#include <commons/st/reader.h> #include <commons/streamwriter.h> #include <commons/utility.h> #include <iomanip> Modified: ydb/trunk/src/stxn.lzz.clamp =================================================================== --- ydb/trunk/src/stxn.lzz.clamp 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/stxn.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) @@ -1,13 +1,15 @@ #hdr #include "unsetprefs.h" #include <boost/bind.hpp> +#include <boost/foreach.hpp> +#include <boost/tuple/tuple.hpp> +#include <commons/array.h> #include <commons/memory.h> -#include <boost/foreach.hpp> +#include <commons/rand.h> #include <commons/snap_map.h> -#include <commons/rand.h> -#include <commons/array.h> +#include <commons/time.h> #include <google/dense_hash_map> -#include <boost/tuple/tuple.hpp> +#include "msg.h" #include "util.hh" #include "main.hh" #include "setprefs.h" @@ -89,6 +91,9 @@ issue_txns(st_channel<replica_info> &newreps, int &seqno, st_bool &accept_joiner) { + USE(newreps); + USE(seqno); + USE(accept_joiner); typedef typename Types::TxnBatch TxnBatch; typedef typename Types::Txn Txn; typedef typename Types::Op Op; @@ -255,6 +260,10 @@ process_txn(mii &map, const typename Types::Txn &txn, int &seqno, typename RTypes::Response *res) { + USE(map); + USE(txn); + USE(seqno); + USE(res); typedef typename Types::Txn Txn; typedef typename Types::Op Op; checkeq(txn.seqno(), seqno + 1); @@ -338,6 +347,14 @@ st_channel<chunk> &backlog, int init_seqno, int mypos, int nnodes) { + USE(leader); + USE(map); + USE(seqno); + USE(send_states); + USE(backlog); + USE(init_seqno); + USE(mypos); + USE(nnodes); typedef typename Types::TxnBatch TxnBatch; typedef typename Types::Txn Txn; typedef typename Types::Op Op; Modified: ydb/trunk/src/tpcc.lzz.clamp =================================================================== --- ydb/trunk/src/tpcc.lzz.clamp 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/tpcc.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) @@ -11,6 +11,8 @@ #src #include "unsetprefs.h" #include <commons/memory.h> +#include <commons/st/io.h> +#include <commons/time.h> #include <string> #include "tpcc/clock.h" #include "tpcc/randomgenerator.h" @@ -18,6 +20,7 @@ #include "tpcc/tpccdb.h" #include "tpcc/tpccgenerator.h" #include "tpcc/tpcctables.h" +#include "msg.h" #include "setprefs.h" #end @@ -32,6 +35,16 @@ unique_ptr<TPCCTables> g_tables; namespace { +class st_bcast { + const vector<st_netfd_t> &fds_; +public: + st_bcast(const vector<st_netfd_t> &fds) : fds_(fds) {} + void operator()(const void *buf, size_t len) { + foreach (st_netfd_t dst, fds_) + st_timed_write(dst, buf, len); + } +}; + class st_tpcc : public TPCCDB { private: @@ -43,10 +56,7 @@ public: st_tpcc(const vector<st_netfd_t> &fds) : a_(buf_size), - writer_(lambda(const void *buf, size_t len) { - foreach (st_netfd_t dst, __ref(fds)) - st_timed_write(dst, buf, len); - }, a_, buf_size) {} + writer_(st_bcast(fds), a_, buf_size) {} void flush() { writer_.mark_and_flush(); } void set_seqno(int seqno) { seqno_ = seqno; } @@ -821,4 +831,3 @@ ASSERT(false); } } - Deleted: ydb/trunk/src/util.lzz =================================================================== --- ydb/trunk/src/util.lzz 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/util.lzz 2009-03-24 06:10:53 UTC (rev 1330) @@ -1,503 +0,0 @@ -#hdr -#include "unsetprefs.h" -#include <cstring> // size_t -#include <ios> // streamoff -#include <st.h> -#include <string> -#include <map> -#include <set> -#include <utility> -#include <boost/scoped_array.hpp> -#include <commons/array.h> -#include <commons/nullptr.h> -#include <commons/time.h> -#include <google/protobuf/io/zero_copy_stream_impl.h> -#include "msg.h" -using namespace std; -using namespace boost; -using namespace ydb::msg; -using namespace google::protobuf::io; -#end -#src -#include <sys/socket.h> // getpeername -#include <gtest/gtest.h> -#include <netinet/in.h> // in_addr etc. -#end - -using namespace testing; - -// -// Globals -// - -bool fake_bcast, profile_threads, multirecover, debug_threads; -size_t buf_size; -long long write_thresh; - -// -// Display -// - -void -showdatarate(const char *action, streamoff len, long long time) -{ - cout << action << " of " << len << " bytes in " << time << " ms (" - << double(len) / double(time) / 1000 << " MB/s)" << endl; -} - -void -showdatarate(const char *action, size_t len, long long time) -{ - cout << action << " of " << len << " bytes in " << time << " ms (" - << double(len) / double(time) / 1000 << " MB/s)" << endl; -} - -void -showtput(const char *action, long long stop_time, long long start_time, - int stop_count, int start_count) -{ - long long time_diff = stop_time - start_time; - int count_diff = stop_count - start_count; - double rate = double(count_diff) * 1000. / double(time_diff); - cout << action << " " << count_diff << " txns [" - << start_count << ".." << stop_count - << "] in " << time_diff << " ms [" - << start_time << ".." << stop_time - << "] (" - << rate << " tps)" << endl; -} - -// -// Calculations -// - -pair<size_t, size_t> -recovery_range(size_t size, int mypos, int nnodes) -{ - return make_pair(multirecover ? size * mypos / size_t(nnodes) : 0, - multirecover ? size * (mypos + 1) / size_t(nnodes) : size); -} - -inline bool -check_interval(int seqno, int interval) -{ - return interval > 0 && seqno % interval == interval - 1; -} - -/** - * Return range * part / nparts, but with proper casting. Assumes that part < - * nparts. - */ -inline int -interp(int range, int part, int nparts) { - return static_cast<int>(static_cast<long long>(range) * part / nparts); -} - -#src -TEST(interp_test, basics) { - EXPECT_EQ(0, interp(3, 0, 3)); - EXPECT_EQ(1, interp(3, 1, 3)); - EXPECT_EQ(2, interp(3, 2, 3)); - EXPECT_EQ(3, interp(3, 3, 3)); - - EXPECT_EQ(0, interp(RAND_MAX, 0, 2)); - EXPECT_EQ(RAND_MAX / 2, interp(RAND_MAX, 1, 2)); - EXPECT_EQ(RAND_MAX, interp(RAND_MAX, 2, 2)); -} -#end - -/** - * Convenience function for calculating percentages. - */ -template<typename T> -inline double pct(T sub, T tot) -{ - return 100 * double(sub) / double(tot); -} - -// -// ST IO -// - -/** - * Perform an st_write but warn if it took over write_thresh ms. - */ -void -st_timed_write(st_netfd_t dst, const void *buf, size_t len) -{ - long long before_write = -1; - if (write_thresh > 0) { - before_write = current_time_millis(); - } - - checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(len)); - - if (write_thresh > 0) { - long long write_time = current_time_millis() - before_write; - if (write_time > write_thresh) { - cout << "thread " << threadname() << " write of " << len - << " bytes to dst " << show_sockaddr(dst) << " blocked for " - << write_time << " ms" << endl; - } - } -} - -// -// ST Sockets -// - -char * -show_sockaddr(st_netfd_t fd) -{ - sockaddr_in sa; - socklen_t salen = sizeof sa; - check0x(getpeername(st_netfd_fileno(fd), - reinterpret_cast<sockaddr*>(&sa), - &salen)); - return inet_ntoa(sa.sin_addr); -} - -inline const string& -nfd2name(st_netfd_t fd) -{ - return nfdnames[fd]; -} - -map<st_netfd_t, string> nfdnames; - -// -// ST Threads -// - -/** - * The list of all threads. Keep track of these so that we may cleanly shut - * down all threads. - */ -set<st_thread_t> threads; - -/** - * RAII for adding/removing the current thread from the global threads set. - */ -class thread_eraser -{ - public: - thread_eraser() { threads.insert(st_thread_self()); } - ~thread_eraser() { threads.erase(st_thread_self()); } -}; - -/** - * For debug/error-printing purposes. - */ -map<st_thread_t, string> threadnames; -st_thread_t last_thread; - -/** - * For profiling. - */ -map<st_thread_t, long long> threadtimes; -long long thread_start_time; - -/** - * Look up thread name, or just show thread ID. - */ -inline string -threadname(st_thread_t t = st_thread_self()) { - if (threadnames.find(t) != threadnames.end()) { - return threadnames[t]; - } else { - return lexical_cast<string>(t); - } -} - -/** - * Debug function for thread names. Remember what we're switching from. - */ -inline void -switch_out_cb() -{ - if (debug_threads) last_thread = st_thread_self(); - if (profile_threads) - threadtimes[st_thread_self()] += current_time_millis() - thread_start_time; -} - -/** - * Debug function for thread names. Show what we're switching from/to. - */ -inline void switch_in_cb() -{ - if (debug_threads && last_thread != st_thread_self()) { - cout << "switching"; - if (last_thread != 0) cout << " from " << threadname(last_thread); - cout << " to " << threadname() << endl; - } - if (profile_threads) - thread_start_time = current_time_millis(); -} - -/** - * Print to cerr a thread exception. - */ -ostream& -cerr_thread_ex(const std::exception &ex) -{ - return cerr << "exception in thread " << threadname() - << ": " << ex.what(); -} - -// -// Serialization -// - -/** - * Adapter for arrays to look like strings (for PB serialization). - */ -class ser_array -{ - commons::array<char> a_; - size_t size_; -public: - ser_array(size_t size = buf_size) : a_(size), size_(0) {} - char *data() const { return a_.get(); } - size_t size() const { return size_; } - void clear() { size_ = 0; } - void stretch(size_t size) { - if (size > a_.size()) - a_.reset(new char[size], size); - size_ = size; - } -}; - -//typedef string ser_t; -typedef ser_array ser_t; - -template<typename T> -void -ser(writer &w, const T &msg) -{ - uint32_t len = msg.ByteSize(); - w.mark(); - w.reserve(len); - check(msg.SerializeToArray(w.cur(), len)); - w.skip(len); -} - -/** - * Serialization. - * - * TODO: experiment with which method is the fastest: using a string as shown - * here or computing the bytesize then allocating (or grabbing/reserving) the - * array. - */ -template<typename T> -void -ser(string &s, const T &msg) -{ - // Serialize message to a buffer. - uint32_t len; - s.append(sizeof len, '\0'); - check(msg.AppendToString(&s)); - - // Warn if the message is large. - if (s.size() > 1000000) - cout << "serializing large message of " << s.size() << " bytes" << endl; - - // Prefix the message with a four-byte length. - len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); - char *plen = reinterpret_cast<char*>(&len); - copy(plen, plen + sizeof len, s.begin()); -} - -template<typename T> -inline void -ser(ser_array &s, const T &msg) -{ - int len = msg.ByteSize(); - - // Grow the array as needed. - s.stretch(len + sizeof(uint32_t)); - - // Serialize message to a buffer with four-byte length prefix. - check(msg.SerializeToArray(s.data() + sizeof(uint32_t), len)); - *reinterpret_cast<uint32_t*>(s.data()) = htonl(uint32_t(len)); -} - -/** - * Serialization. - */ -template<typename T> -inline void -ser(ostream &s, const T &msg) -{ - uint32_t len = htonl(uint32_t(msg.ByteSize())); - s.write(reinterpret_cast<const char*>(&len), sizeof len); - check(msg.SerializeToOstream(&s)); -} - -// -// Messaging -// - -/** - * Send a message to some destinations. - */ -inline void -bcastbuf(const vector<st_netfd_t> &dsts, const ser_t &msg) -{ - if (!fake_bcast) { - foreach (st_netfd_t dst, dsts) { - st_timed_write(dst, msg.data(), msg.size()); - } - } -} - -/** - * Send a message to some destinations, using whichever method of network IO - * was chosen (sync or async). - */ -template<typename T> -inline void -bcastmsg(const vector<st_netfd_t> &dsts, const T &msg) -{ - ser_t s; - ser(s, msg); - bcastbuf(dsts, s); -} - -/** - * Send a message to a single recipient. - */ -inline void -sendbuf(st_netfd_t dst, const ser_t &msg) -{ - if (!fake_bcast) - st_timed_write(dst, msg.data(), msg.size()); -} - -/** - * Send a message to a single recipient. - */ -template<typename T> -inline void -sendmsg(st_netfd_t dst, const T &msg) -{ - ser_t s; - ser(s, msg); - sendbuf(dst, s); -} - -/** - * Read a message. This is done in two steps: first by reading the length - * prefix, then by reading the actual body. This function also provides a way - * to measure how much time is spent actually reading the message from the - * network. Such measurement only makes sense for large messages which take a - * long time to receive. - * - * \param[in] src The socket from which to read. - * - * \param[in] msg The protobuf to read into. - * - * \param[out] start_time If not null, record the time at which we start to - * receive the message (after the length is received). - * - * \param[out] stop_time If not null, record the time at which we finish - * receiving the message (before we deserialize the protobuf). - * - * \param[out] len If not null, record the size of the serialized message - * in bytes. - * - * \param[in] timeout on each of the two read operations (first one is on - * length, second one is on the rest). - * - * \return The length of the serialized message. - */ -template <typename T> -size_t -readmsg(st_netfd_t src, T & msg, long long *start_time = nullptr, long long - *stop_time = nullptr, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) -{ - // Read the message length. - uint32_t len; - checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len, - timeout), - static_cast<ssize_t>(sizeof len)); - if (start_time != nullptr) - *start_time = current_time_millis(); - len = ntohl(len); - - // Parse the message body. Try stack-allocation if possible. - scoped_array<char> sbuf; - char *buf; - if (len <= 4096) buf = reinterpret_cast<char*>(alloca(len)); - else sbuf.reset(buf = new char[len]); - checkeqnneg(st_read_fully(src, buf, len, timeout), int(len)); - if (stop_time != nullptr) - *stop_time = current_time_millis(); - check(msg.ParseFromArray(buf, len)); - - return len; -} - -/** - * Same as the above readmsg(), but returns an internally constructed message. - * This is a "higher-level" readmsg() that relies on return-value optimization - * for avoiding unnecessary copies. - */ -template <typename T> -inline T -readmsg(st_netfd_t src, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) -{ - T msg; - readmsg(src, msg, nullptr, nullptr, timeout); - return msg; -} - -/** - * Same as the above readmsg() but uses an st_reader instead of a raw - * st_netfd_t. - */ -template <typename T> -inline void -readmsg(st_reader &src, T & msg) -{ - managed_array<char> a = src.read(sizeof(uint32_t)); - uint32_t len = ntohl(*reinterpret_cast<const uint32_t*>(a.get())); - check(msg.ParseFromArray(src.read(len), len)); -} - -template<typename T> -inline void -readmsg(anchored_stream_reader &src, T &msg) -{ - uint32_t len = ntohl(src.read<uint32_t>()); - check(msg.ParseFromArray(checkpass(src.read(len)), len)); -} - -template<typename T> -inline void -readmsg(istream &src, T &msg) -{ - uint32_t len; - src.read(reinterpret_cast<char*>(&len), sizeof len); - len = ntohl(len); -#if 0 - IstreamInputStream iis(&src); - LimitingInputStream lis(&iis, len); - check(msg.ParseFromZeroCopyStream(&lis)); -#else - char buf[len]; - src.read(buf, len); - check(msg.ParseFromArray(buf, len)); -#endif -} - -inline uint32_t -readlen(istream &src) -{ - uint32_t len; - src.read(reinterpret_cast<char*>(&len), sizeof len); - len = ntohl(len); - ASSERT(len < 10000); - return len; -} - Added: ydb/trunk/src/util.lzz.clamp =================================================================== --- ydb/trunk/src/util.lzz.clamp (rev 0) +++ ydb/trunk/src/util.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) @@ -0,0 +1,534 @@ +#hdr +#include "unsetprefs.h" +#include <cstring> // size_t +#include <iosfwd> // streamoff +#include <st.h> +#include <string> +#include <map> +#include <set> +#include <utility> // pair +#include <vector> +#include <commons/array.h> +#include <commons/delegates.h> +#include <commons/nullptr.h> +#include <arpa/inet.h> // htonl, ntohl +//#include <commons/st/st.h> + +using namespace std; +using namespace boost; +using namespace commons; + +namespace commons { + class st_reader; + class stream_writer; + class anchored_stream_reader; +} +namespace ydb { namespace msg { typedef stream_writer writer; } } +using namespace ydb::msg; +namespace google { namespace protobuf { class Message; } } +using google::protobuf::Message; +#end + +#src +#include "unsetprefs.h" +#include <boost/foreach.hpp> +#include <boost/lexical_cast.hpp> +#include <boost/scoped_array.hpp> +#include <commons/st/reader.h> +#include <commons/st/threads.h> +#include <commons/streamreader.h> +#include <commons/streamwriter.h> +#include <commons/time.h> +#include <sys/socket.h> // getpeername +#include <gtest/gtest.h> +#include <netinet/in.h> // in_addr etc. +#include <google/protobuf/message.h> +//#include <google/protobuf/io/zero_copy_stream_impl.h> +//using namespace google::protobuf::io; +#include "setprefs.h" +#end + +#if 1 + +// +// Globals +// + +bool fake_bcast, profile_threads, multirecover, debug_threads; +size_t buf_size; +long long write_thresh; + +// +// Display +// + +void +showdatarate(const char *action, streamoff len, long long time) +{ + cout << action << " of " << len << " bytes in " << time << " ms (" + << double(len) / double(time) / 1000 << " MB/s)" << endl; +} + +void +showdatarate(const char *action, size_t len, long long time) +{ + cout << action << " of " << len << " bytes in " << time << " ms (" + << double(len) / double(time) / 1000 << " MB/s)" << endl; +} + +void +showtput(const char *action, long long stop_time, long long start_time, + int stop_count, int start_count) +{ + long long time_diff = stop_time - start_time; + int count_diff = stop_count - start_count; + double rate = double(count_diff) * 1000. / double(time_diff); + cout << action << " " << count_diff << " txns [" + << start_count << ".." << stop_count + << "] in " << time_diff << " ms [" + << start_time << ".." << stop_time + << "] (" + << rate << " tps)" << endl; +} + +// +// Calculations +// + +pair<size_t, size_t> +recovery_range(size_t size, int mypos, int nnodes) +{ + return make_pair(multirecover ? size * mypos / size_t(nnodes) : 0, + multirecover ? size * (mypos + 1) / size_t(nnodes) : size); +} + +inline bool +check_interval(int seqno, int interval) +{ + return interval > 0 && seqno % interval == interval - 1; +} + +/** + * Return range * part / nparts, but with proper casting. Assumes that part < + * nparts. + */ +inline int +interp(int range, int part, int nparts) { + return static_cast<int>(static_cast<long long>(range) * part / nparts); +} + +#src +TEST(interp_test, basics) { + EXPECT_EQ(0, interp(3, 0, 3)); + EXPECT_EQ(1, interp(3, 1, 3)); + EXPECT_EQ(2, interp(3, 2, 3)); + EXPECT_EQ(3, interp(3, 3, 3)); + + EXPECT_EQ(0, interp(RAND_MAX, 0, 2)); + EXPECT_EQ(RAND_MAX / 2, interp(RAND_MAX, 1, 2)); + EXPECT_EQ(RAND_MAX, interp(RAND_MAX, 2, 2)); +} +#end + +/** + * Convenience function for calculating percentages. + */ +template<typename T> +inline double pct(T sub, T tot) +{ + return 100 * double(sub) / double(tot); +} + +// +// ST IO +// + +/** + * Perform an st_write but warn if it took over write_thresh ms. + */ +void +st_timed_write(st_netfd_t dst, const void *buf, size_t len) +{ + long long before_write = -1; + if (write_thresh > 0) { + before_write = current_time_millis(); + } + + checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(len)); + + if (write_thresh > 0) { + long long write_time = current_time_millis() - before_write; + if (write_time > write_thresh) { + cout << "thread " << threadname() << " write of " << len + << " bytes to dst " << show_sockaddr(dst) << " blocked for " + << write_time << " ms" << endl; + } + } +} + +// +// ST Sockets +// + +char * +show_sockaddr(st_netfd_t fd) +{ + sockaddr_in sa; + socklen_t salen = sizeof sa; + check0x(getpeername(st_netfd_fileno(fd), + reinterpret_cast<sockaddr*>(&sa), + &salen)); + return inet_ntoa(sa.sin_addr); +} + +inline const string& +nfd2name(st_netfd_t fd) +{ + return nfdnames[fd]; +} + +map<st_netfd_t, string> nfdnames; + +// +// ST Threads +// + +/** + * The list of all threads. Keep track of these so that we may cleanly shut + * down all threads. + */ +set<st_thread_t> threads; + +/** + * RAII for adding/removing the current thread from the global threads set. + */ +class thread_eraser +{ + public: + thread_eraser() { threads.insert(st_thread_self()); } + ~thread_eraser() { threads.erase(st_thread_self()); } +}; + +/** + * For debug/error-printing purposes. + */ +typedef map<st_thread_t, string> threadnames_t; +threadnames_t threadnames; +st_thread_t last_thread; + +/** + * For profiling. + */ +map<st_thread_t, long long> threadtimes; +long long thread_start_time; + +/** + * Look up thread name, or just show thread ID. + */ +const string & +threadname(st_thread_t t = st_thread_self()) { + threadnames_t::iterator it = threadnames.find(t); + if (it == threadnames.end()) { + return threadnames[t] = lexical_cast<string>(t); + } else { + return it->second; + } +} + +/** + * Debug function for thread names. Remember what we're switching from. + */ +void +switch_out_cb() +{ + if (debug_threads) last_thread = st_thread_self(); + if (profile_threads) + threadtimes[st_thread_self()] += current_time_millis() - thread_start_time; +} + +/** + * Debug function for thread names. Show what we're switching from/to. + */ +void +switch_in_cb() +{ + if (debug_threads && last_thread != st_thread_self()) { + cout << "switching"; + if (last_thread != 0) cout << " from " << threadname(last_thread); + cout << " to " << threadname() << endl; + } + if (profile_threads) + thread_start_time = current_time_millis(); +} + +/** + * Print to cerr a thread exception. + */ +ostream& +cerr_thread_ex(const std::exception &ex) +{ + return cerr << "exception in thread " << threadname() + << ": " << ex.what(); +} + +/** + * Delegate for running thread targets. + * \param[in] f The function to execute. + */ +void +my_spawn_helper(const fn f) +{ + thread_eraser eraser; + try { f(); } + catch (std::exception &ex) { cerr_thread_ex(ex) << endl; } +} + +/** + * Spawn a thread using ST but wrap it in an exception handler that interrupts + * all other threads (hopefully causing them to unwind). + * \param[in] f The function to execute. + */ +st_thread_t +my_spawn(const fn &f, string name) +{ + st_thread_t t = st_spawn(bind(my_spawn_helper, f)); + threads.insert(t); + threadnames[t] = name; + return t; +} + + +// +// Serialization +// + +/** + * Adapter for arrays to look like strings (for PB serialization). + */ +class ser_array +{ + commons::array<char> a_; + size_t size_; +public: + ser_array(size_t size = buf_size) : a_(size), size_(0) {} + char *data() const { return a_.get(); } + size_t size() const { return size_; } + void clear() { size_ = 0; } + void stretch(size_t size) { + if (size > a_.size()) + a_.reset(new char[size], size); + size_ = size; + } +}; + +//typedef string ser_t; +typedef ser_array ser_t; + +void +ser(writer &w, const Message &msg) +{ + uint32_t len = msg.ByteSize(); + w.mark(); + w.reserve(len); + check(msg.SerializeToArray(w.cur(), len)); + w.skip(len); +} + +/** + * Serialization. + * + * TODO: experiment with which method is the fastest: using a string as shown + * here or computing the bytesize then allocating (or grabbing/reserving) the + * array. + */ +void +ser(string &s, const Message &msg) +{ + // Serialize message to a buffer. + uint32_t len; + s.append(sizeof len, '\0'); + check(msg.AppendToString(&s)); + + // Warn if the message is large. + if (s.size() > 1000000) + cout << "serializing large message of " << s.size() << " bytes" << endl; + + // Prefix the message with a four-byte length. + len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); + char *plen = reinterpret_cast<char*>(&len); + copy(plen, plen + sizeof len, s.begin()); +} + +void +ser(ser_array &s, const Message &msg) +{ + int len = msg.ByteSize(); + + // Grow the array as needed. + s.stretch(len + sizeof(uint32_t)); + + // Serialize message to a buffer with four-byte length prefix. + check(msg.SerializeToArray(s.data() + sizeof(uint32_t), len)); + *reinterpret_cast<uint32_t*>(s.data()) = htonl(uint32_t(len)); +} + +/** + * Serialization. + */ +void +ser(ostream &s, const Message &msg) +{ + uint32_t len = htonl(uint32_t(msg.ByteSize())); + s.write(reinterpret_cast<const char*>(&len), sizeof len); + check(msg.SerializeToOstream(&s)); +} + +// +// Messaging +// + +/** + * Send a message to some destinations. + */ +void +bcastbuf(const vector<st_netfd_t> &dsts, const ser_t &msg) +{ + if (!fake_bcast) { + foreach (st_netfd_t dst, dsts) { + st_timed_write(dst, msg.data(), msg.size()); + } + } +} + +/** + * Send a message to some destinations, using whichever method of network IO + * was chosen (sync or async). + */ +void +bcastmsg(const vector<st_netfd_t> &dsts, const Message &msg) +{ + ser_t s; + ser(s, msg); + bcastbuf(dsts, s); +} + +/** + * Send a message to a single recipient. + */ +void +sendbuf(st_netfd_t dst, const ser_t &msg) +{ + if (!fake_bcast) + st_timed_write(dst, msg.data(), msg.size()); +} + +/** + * Send a message to a single recipient. + */ +void +sendmsg(st_netfd_t dst, const Message &msg) +{ + ser_t s; + ser(s, msg); + sendbuf(dst, s); +} + +/** + * Read a message. This is done in two steps: first by reading the length + * prefix, then by reading the actual body. This function also provides a way + * to measure how much time is spent actually reading the message from the + * network. Such measurement only makes sense for large messages which take a + * long time to receive. + * + * \param[in] src The socket from which to read. + * + * \param[in] msg The protobuf to read into. + * + * \param[out] start_time If not null, record the time at which we start to + * receive the message (after the length is received). + * + * \param[out] stop_time If not null, record the time at which we finish + * receiving the message (before we deserialize the protobuf). + * + * \param[out] len If not null, record the size of the serialized message + * in bytes. + * + * \param[in] timeout on each of the two read operations (first one is on + * length, second one is on the rest). + * + * \return The length of the serialized message. + */ +size_t +readmsg(st_netfd_t src, Message &msg, long long *start_time = nullptr, long long + *stop_time = nullptr, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) +{ + // Read the message length. + uint32_t len; + checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len, + timeout), + static_cast<ssize_t>(sizeof len)); + if (start_time != nullptr) + *start_time = current_time_millis(); + len = ntohl(len); + + // Parse the message body. Try stack-allocation if possible. + scoped_array<char> sbuf; + char *buf; + if (len <= 4096) buf = reinterpret_cast<char*>(alloca(len)); + else sbuf.reset(buf = new char[len]); + checkeqnneg(st_read_fully(src, buf, len, timeout), int(len)); + if (stop_time != nullptr) + *stop_time = current_time_millis(); + check(msg.ParseFromArray(buf, len)); + + return len; +} + +/** + * Same as the above readmsg() but uses an st_reader instead of a raw + * st_netfd_t. + */ +void +readmsg(st_reader &src, Message &msg) +{ + managed_array<char> a = src.read(sizeof(uint32_t)); + uint32_t len = ntohl(*reinterpret_cast<const uint32_t*>(a.get())); + check(msg.ParseFromArray(src.read(len), len)); +} + +void +readmsg(anchored_stream_reader &src, Message &msg) +{ + uint32_t len = ntohl(src.read<uint32_t>()); + check(msg.ParseFromArray(checkpass(src.read(len)), len)); +} + +void +readmsg(istream &src, Message &msg) +{ + uint32_t len; + src.read(reinterpret_cast<char*>(&len), sizeof len); + len = ntohl(len); +#if 0 + IstreamInputStream iis(&src); + LimitingInputStream lis(&iis, len); + check(msg.ParseFromZeroCopyStream(&lis)); +#else + char buf[len]; + src.read(buf, len); + check(msg.ParseFromArray(buf, len)); +#endif +} + +inline uint32_t +readlen(istream &src) +{ + uint32_t len; + src.read(reinterpret_cast<char*>(&len), sizeof len); + len = ntohl(len); + ASSERT(len < 10000); + return len; +} + +#endif Modified: ydb/trunk/src/ydb.lzz.clamp =================================================================== --- ydb/trunk/src/ydb.lzz.clamp 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/ydb.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) @@ -6,7 +6,6 @@ #include <string> #include <iostream> #include <st.h> -#include <commons/st/st.h> #include "tpcc/clock.h" #include "tpcc/randomgenerator.h" #include "tpcc/tpccclient.h" @@ -26,54 +25,23 @@ #include "unsetprefs.h" #include <csignal> // sigaction, etc. #include <cstring> // strsignal, size_t +#include <boost/archive/binary_iarchive.hpp> #include <boost/program_options.hpp> #include <gtest/gtest.h> #include <malloc.h> #include <string> +#include <commons/st/io.h> +#include <commons/st/sockets.h> +#include <commons/st/threads.h> #include "setprefs.h" #end using namespace google; using namespace testing; +using namespace boost::archive; -// -// Utilities/system -// - +namespace { /** - * Delegate for running thread targets. - * \param[in] f The function to execute. - * \param[in] intr Whether to signal stop_hub on an exception. - */ -void -my_spawn_helper(const function0<void> f, bool intr) -{ - thread_eraser eraser; - try { - f(); - } catch (std::exception &ex) { - cerr_thread_ex(ex) << (intr ? "; interrupting!" : "") << endl; - if (intr) stop_hub.set(); - } -} - -/** - * Spawn a thread using ST but wrap it in an exception handler that interrupts - * all other threads (hopefully causing them to unwind). - * \param[in] f The function to execute. - * \param[in] intr Whether to signal stop_hub on an exception. Not actually - * used anywhere. - */ -st_thread_t -my_spawn(const function0<void> &f, string name, bool intr = false) -{ - st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr)); - threads.insert(t); - threadnames[t] = name; - return t; -} - -/** * Memory monitor. */ void @@ -127,6 +95,7 @@ } } } +} // // Main @@ -361,13 +330,13 @@ if (use_pb_res) { run_leader<pb_traits, pb_traits>(minreps, leader_port); } else { - run_leader<pb_traits, rb_traits>(minreps, leader_port); + //run_leader<pb_traits, rb_traits>(minreps, leader_port); } } else { if (use_pb_res) { - run_leader<rb_traits, pb_traits>(minreps, leader_port); + //run_leader<rb_traits, pb_traits>(minreps, leader_port); } else { - run_leader<rb_traits, rb_traits>(minreps, leader_port); + //run_leader<rb_traits, rb_traits>(minreps, leader_port); } } } else { @@ -375,13 +344,13 @@ if (use_pb_res) { run_replica<pb_traits, pb_traits>(leader_host, leader_port, listen_port); } else { - run_replica<pb_traits, rb_traits>(leader_host, leader_port, listen_port); + //run_replica<pb_traits, rb_traits>(leader_host, leader_port, listen_port); } } else { if (use_pb_res) { - run_replica<rb_traits, pb_traits>(leader_host, leader_port, listen_port); + //run_replica<rb_traits, pb_traits>(leader_host, leader_port, listen_port); } else { - run_replica<rb_traits, rb_traits>(leader_host, leader_port, listen_port); + //run_replica<rb_traits, rb_traits>(leader_host, leader_port, listen_port); } } } @@ -394,6 +363,17 @@ } } +#if 0 +template<typename Types, typename RTypes> +void +run_leader(int minreps, uint16_t leader_port); +template<typename Types, typename RTypes> +void +run_replica(string leader_host, uint16_t leader_port, uint16_t listen_port); +#endif + +#if 1 +namespace { /** * Run the leader. */ @@ -415,6 +395,7 @@ vector<replica_info> replicas; st_closing_all_infos close_replicas(replicas); cout << "waiting for at least " << minreps << " replicas to join" << endl; + Join join; for (int i = 0; i < minreps; ++i) { st_netfd_t fd; { @@ -422,7 +403,7 @@ fd = checkerr(st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT)); } - Join join = readmsg<Join>(fd); + readmsg(fd, join); replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); } cout << "got all " << minreps << " replicas" << endl; @@ -487,7 +468,8 @@ else throw break_exception(); } - Join join = readmsg<Join>(joiner); + Join join; + readmsg(joiner, join); replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); cout << "setting seqno to " << seqno << endl; init.set_txnseqno(seqno); @@ -1072,3 +1054,5 @@ stop_hub.insert(st_thread_self()); } +} +#endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-24 06:10:52
|
Revision: 1329 http://assorted.svn.sourceforge.net/assorted/?rev=1329&view=rev Author: yangzhang Date: 2009-03-24 06:10:43 +0000 (Tue, 24 Mar 2009) Log Message: ----------- updated st test Modified Paths: -------------- cpp-commons/trunk/src/test/st.cc Modified: cpp-commons/trunk/src/test/st.cc =================================================================== --- cpp-commons/trunk/src/test/st.cc 2009-03-24 06:10:14 UTC (rev 1328) +++ cpp-commons/trunk/src/test/st.cc 2009-03-24 06:10:43 UTC (rev 1329) @@ -1,4 +1,4 @@ -#include <commons/st/st.h> +#include <commons/st/channel.h> #include <commons/unique_ptr.h> #include "test.h" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-24 06:10:27
|
Revision: 1328 http://assorted.svn.sourceforge.net/assorted/?rev=1328&view=rev Author: yangzhang Date: 2009-03-24 06:10:14 +0000 (Tue, 24 Mar 2009) Log Message: ----------- broke up st.h; added USE(); tweaked ASSERT() Modified Paths: -------------- cpp-commons/trunk/src/commons/assert.h cpp-commons/trunk/src/commons/streamwriter.h cpp-commons/trunk/src/commons/utility.h Added Paths: ----------- cpp-commons/trunk/src/commons/st/channel.h cpp-commons/trunk/src/commons/st/intr.h cpp-commons/trunk/src/commons/st/io.h cpp-commons/trunk/src/commons/st/reader.h cpp-commons/trunk/src/commons/st/sockets.h cpp-commons/trunk/src/commons/st/sync.h cpp-commons/trunk/src/commons/st/threads.h Removed Paths: ------------- cpp-commons/trunk/src/commons/st/st.h Modified: cpp-commons/trunk/src/commons/assert.h =================================================================== --- cpp-commons/trunk/src/commons/assert.h 2009-03-24 00:10:13 UTC (rev 1327) +++ cpp-commons/trunk/src/commons/assert.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -8,7 +8,7 @@ // This is not the "default" because it does not conform to the requirements of the C standard, // which requires that the NDEBUG version be ((void) 0). #ifdef NDEBUG -#define ASSERT(x) do { (void)sizeof(x); } while(0) +#define ASSERT(x) do { static_cast<void>(sizeof(x)); } while(0) #else #define ASSERT(x) assert(x) #endif Added: cpp-commons/trunk/src/commons/st/channel.h =================================================================== --- cpp-commons/trunk/src/commons/st/channel.h (rev 0) +++ cpp-commons/trunk/src/commons/st/channel.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,70 @@ +#ifndef COMMONS_ST_CHANNEL_H +#define COMMONS_ST_CHANNEL_H + +#include <commons/st/sync.h> +#include <boost/foreach.hpp> +#include <boost/shared_ptr.hpp> +#include <queue> +#include <vector> +#define foreach BOOST_FOREACH +#define shared_ptr boost::shared_ptr + +BEGIN_NAMESPACE(commons) + +/** + * An unbounded FIFO queue. ST threads can wait on this until elements have + * been pushed in (resulting in a waking signal). + */ +template <typename T> +class st_channel +{ + public: + template<typename U> void push(U &&x) { + q_.push(forward<U>(x)); + empty_.signal(); + } + T take() { + while (q_.empty()) { + empty_.wait(); + } + T x = move(front()); + q_.pop(); + return x; + } + const T& front() const { return q_.front(); } + T& front() { return q_.front(); } + bool empty() const { return q_.empty(); } + void pop() { q_.pop(); } + void clear() { while (!q_.empty()) q_.pop(); } + const std::queue<T> &queue() const { return q_; } + private: + std::queue<T> q_; + st_cond empty_; +}; + +/** + * An unbounded FIFO multi-cast channel, i.e. publish-subscribe. + */ +template <typename T> +class st_multichannel +{ + public: + void push(const T &x) { + foreach (shared_ptr<st_channel<T> > q, qs) { + q->push(x); + } + } + st_channel<T> &subscribe() { + shared_ptr<st_channel<T> > q(new st_channel<T>); + qs.push_back(q); + return *q; + } + private: + vector<shared_ptr<st_channel<T> > > qs; +}; + +END_NAMESPACE + +#undef shared_ptr + +#endif Added: cpp-commons/trunk/src/commons/st/intr.h =================================================================== --- cpp-commons/trunk/src/commons/st/intr.h (rev 0) +++ cpp-commons/trunk/src/commons/st/intr.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,92 @@ +#ifndef COMMONS_ST_INTR_H +#define COMMONS_ST_INTR_H + +#include <boost/foreach.hpp> +#include <commons/assert.h> +#include <commons/utility.h> +#include <set> +#include <st.h> +#define foreach BOOST_FOREACH + +BEGIN_NAMESPACE(commons) + + /** + * A hub is a single point to signal to wake up a set of threads. Threads + * join the hub before calling a blocking operation if they want to make + * themselves interruptible on this hub. + */ + class st_intr_hub + { + public: + virtual void insert(st_thread_t t) = 0; + virtual void erase(st_thread_t t) = 0; + virtual ~st_intr_hub() {}; + }; + + /** + * The simplest hub, which only interrupts those who are currently joined in + * the hub (like a condition variable broadcast). + */ + class st_intr_cond : public st_intr_hub + { + public: + virtual ~st_intr_cond() {} + void insert(st_thread_t t) { threads.insert(t); } + void erase(st_thread_t t) { threads.erase(t); } + void signal() { + foreach (st_thread_t t, threads) { + st_thread_interrupt(t); + } + threads.clear(); + } + private: + std::set<st_thread_t> threads; + }; + + /** + * Like st_intr_cond, but a bool instead, so there's state; newly joining + * threads may immediately be interrupted. Interruption occurs when this is + * set to true. + */ + class st_intr_bool : public st_intr_hub + { + public: + void insert(st_thread_t t) { + if (b) st_thread_interrupt(t); + else threads.insert(t); + } + void erase(st_thread_t t) { threads.erase(t); } + void set() { + b = true; + foreach (st_thread_t t, threads) { + st_thread_interrupt(t); + } + threads.clear(); + } + void reset() { + // If b is true, then any threads that join are immediately + // interrupted, so the set must be empty. + ASSERT(!b || threads.empty()); + b = false; + } + operator bool() const { return b; } + private: + std::set<st_thread_t> threads; + bool b; + }; + + /** + * RAII for making the current thread interruptible on a certain hub. + */ + class st_intr + { + public: + st_intr(st_intr_hub &hub) : hub_(hub) { hub.insert(st_thread_self()); } + ~st_intr() { hub_.erase(st_thread_self()); } + private: + st_intr_hub &hub_; + }; + +END_NAMESPACE + +#endif Added: cpp-commons/trunk/src/commons/st/io.h =================================================================== --- cpp-commons/trunk/src/commons/st/io.h (rev 0) +++ cpp-commons/trunk/src/commons/st/io.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,40 @@ +#ifndef COMMONS_ST_IO_H +#define COMMONS_ST_IO_H + +#include <commons/check.h> +#include <commons/utility.h> +#include <st.h> + +BEGIN_NAMESPACE(commons) + +using namespace std; + +UNUSED static void +st_read(st_netfd_t fd, void *buf, size_t len) +{ + checkeqnneg(st_read_fully(fd, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); +} + +template<typename T> +void +st_read(st_netfd_t fd, T &x) +{ + st_read(fd, &x, sizeof x); +} + +UNUSED static void +st_write(st_netfd_t fd, const void *buf, size_t len) +{ + checkeqnneg(st_write(fd, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); +} + +template<typename T> +void +st_write(st_netfd_t fd, const T &x) +{ + st_write(fd, &x, sizeof x); +} + +END_NAMESPACE + +#endif Added: cpp-commons/trunk/src/commons/st/reader.h =================================================================== --- cpp-commons/trunk/src/commons/st/reader.h (rev 0) +++ cpp-commons/trunk/src/commons/st/reader.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,64 @@ +#ifndef COMMONS_ST_READER_H +#define COMMONS_ST_READER_H + +#include <commons/check.h> +#include <commons/streamreader.h> +#include <commons/utility.h> +#include <st.h> + +BEGIN_NAMESPACE(commons) + +using namespace boost; +using namespace std; + +class st_read_fn +{ +private: + st_netfd_t fd_; + st_utime_t to_; +public: + st_read_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) + : fd_(fd), to_(to) {} + size_t operator()(char *buf, size_t len) { + return size_t(checknnegerr(st_read(fd_, buf, len, to_))); + } +}; + +class st_read_fully_fn +{ +private: + st_netfd_t fd_; + st_utime_t to_; +public: + st_read_fully_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) + : fd_(fd), to_(to) {} + void operator()(char *buf, size_t len) { + checkeqnneg(st_read_fully(fd_, buf, len, to_), ssize_t(len)); + } +}; + +class st_reader +{ + EXPAND(stream_reader) +private: + stream_reader r_; +public: + st_reader(st_netfd_t fd, char *buf, size_t len) : + r_(st_read_fn(fd), st_read_fully_fn(fd), buf, len) {} + size_t unread() { return r_.unread(); } + size_t rem() { return r_.rem(); } + sized_array<char> &buf() { return r_.buf(); } + void reset_range(char *start, char *end) { r_.reset_range(start, end); } + void reset() { r_.reset(); } + char *start() { return r_.start(); } + char *end() { return r_.end(); } + bool accum(size_t req) { return r_.accum(req); } + void skip(size_t req) { r_.skip(req); } + managed_array<char> read(size_t req) { return r_.read(req); } + template<typename T> T read() { return r_.read<T>(); } + void shift() { r_.shift(); } +}; + +END_NAMESPACE + +#endif Added: cpp-commons/trunk/src/commons/st/sockets.h =================================================================== --- cpp-commons/trunk/src/commons/st/sockets.h (rev 0) +++ cpp-commons/trunk/src/commons/st/sockets.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,81 @@ +#ifndef COMMONS_ST_SOCKETS_H +#define COMMONS_ST_SOCKETS_H + +#include <commons/closing.h> +#include <commons/sockets.h> +#include <commons/utility.h> +#include <st.h> +#include <stx.h> + +BEGIN_NAMESPACE(commons) + +struct stfd_closer { + static void apply(st_netfd_t fd) { check0x(st_netfd_close(fd)); } +}; + +typedef closing<st_netfd_t, stfd_closer> st_closing; + +/** + * Connect to a TCP socket address. + * \param[in] host An IP address. + * \param[in] port The destination port. + * \param[in] timeout The timeout for the connect operation. + */ +UNUSED static st_netfd_t +st_tcp_connect(in_addr host, uint16_t port, st_utime_t timeout) +{ + // Create remote socket address. + sockaddr_in sa = make_sockaddr(host, port); + + // Create the socket. + st_closing s(checkpass(st_netfd_open_socket(tcp_socket(true)))); + + // Connect. + check0x(st_connect(s.get(), reinterpret_cast<sockaddr*>(&sa), sizeof sa, timeout)); + return s.release(); +} + +/** + * Connect to a TCP socket address. + * \param[in] host Either an IP address or hostname. + * \param[in] port The destination port. + * \param[in] timeout The timeout for each of the DNS lookup and the connect + * operation. + * \todo Create variants that take and/or return sockaddr_in's. + */ +UNUSED static st_netfd_t +st_tcp_connect(const char *host, uint16_t port, st_utime_t timeout) +{ + in_addr ipaddr; + + // First try to parse as IP address. Note: inet_addr() is obsolete. Note: + // inet_aton returns 0 if address is invalid. + if (inet_aton(host, &ipaddr) == 0) { + // Then look up by hostname. + check0x(stx_dns_getaddr(host, &ipaddr, timeout)); + } + + return st_tcp_connect(ipaddr, port, timeout); +} + +/** + * Create a listener st_netfd_t. + * \param[in] port The port to listen on. + * \return The st_netfd_t. + */ +UNUSED static st_netfd_t +st_tcp_listen(uint16_t port) +{ + int sfd = tcp_listen(port); + try { + // Create a net file descriptor around a listener socket. + return checkpass(st_netfd_open_socket(sfd)); + } catch (...) { + close(sfd); + throw; + } +} + +END_NAMESPACE + +#endif Deleted: cpp-commons/trunk/src/commons/st/st.h =================================================================== --- cpp-commons/trunk/src/commons/st/st.h 2009-03-24 00:10:13 UTC (rev 1327) +++ cpp-commons/trunk/src/commons/st/st.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -1,480 +0,0 @@ -#ifndef COMMONS_ST_ST_H -#define COMMONS_ST_ST_H - -#include <algorithm> -#include <boost/foreach.hpp> -#include <boost/function.hpp> -#include <boost/shared_ptr.hpp> -#include <commons/array.h> -#include <commons/assert.h> -#include <commons/delegates.h> -#include <commons/nullptr.h> -#include <commons/streamreader.h> -#include <commons/sockets.h> -#include <commons/utility.h> -#include <exception> -#include <map> -#include <queue> -#include <set> -#include <sstream> -#include <st.h> -#include <stx.h> -#include <utility> - -#define foreach BOOST_FOREACH -#define shared_ptr boost::shared_ptr - -namespace commons -{ - using namespace boost; - using namespace std; - - enum { default_stack_size = 65536 }; - - struct stfd_closer { - static void apply(st_netfd_t fd) { check0x(st_netfd_close(fd)); } - }; - - typedef closing<st_netfd_t, stfd_closer> st_closing; - - /** - * RAII to acquire and release a st_mutex_t. Non-copyable. - */ - class st_lock - { - NONCOPYABLE(st_lock) - public: - st_lock(st_mutex_t mx) : mx_(mx) { check0x(st_mutex_lock(mx)); } - ~st_lock() { check0x(st_mutex_unlock(mx_)); } - private: - st_mutex_t mx_; - }; - - UNUSED static void - st_read(st_netfd_t fd, void *buf, size_t len) - { - checkeqnneg(st_read_fully(fd, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); - } - - template<typename T> - void - st_read(st_netfd_t fd, T &x) - { - st_read(fd, &x, sizeof x); - } - - UNUSED static void - st_write(st_netfd_t fd, const void *buf, size_t len) - { - checkeqnneg(st_write(fd, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); - } - - template<typename T> - void - st_write(st_netfd_t fd, const T &x) - { - st_write(fd, &x, sizeof x); - } - - - /** - * Run a function in pthread. - * \return The new pthread_t on success, 0 on failure. - * \todo Is it safe to treat the pthread_t as a pointer? - */ - UNUSED static st_thread_t - st_spawn(const fn& f) - { - return st_thread_create(&run_function0_null, - new fn(f), - true, - default_stack_size); - } - - UNUSED static void - st_join(st_thread_t t) - { - check0x(st_thread_join(t, nullptr)); - } - - /** - * Connect to a TCP socket address. - * \param[in] host An IP address. - * \param[in] port The destination port. - * \param[in] timeout The timeout for the connect operation. - */ - UNUSED static st_netfd_t - st_tcp_connect(in_addr host, uint16_t port, st_utime_t timeout) - { - // Create remote socket address. - sockaddr_in sa = make_sockaddr(host, port); - - // Create the socket. - st_closing s(checkpass(st_netfd_open_socket(tcp_socket(true)))); - - // Connect. - check0x(st_connect(s.get(), reinterpret_cast<sockaddr*>(&sa), sizeof sa, timeout)); - return s.release(); - } - - /** - * Connect to a TCP socket address. - * \param[in] host Either an IP address or hostname. - * \param[in] port The destination port. - * \param[in] timeout The timeout for each of the DNS lookup and the connect - * operation. - * \todo Create variants that take and/or return sockaddr_in's. - */ - UNUSED static st_netfd_t - st_tcp_connect(const char *host, uint16_t port, st_utime_t timeout) - { - in_addr ipaddr; - - // First try to parse as IP address. Note: inet_addr() is obsolete. Note: - // inet_aton returns 0 if address is invalid. - if (inet_aton(host, &ipaddr) == 0) { - // Then look up by hostname. - check0x(stx_dns_getaddr(host, &ipaddr, timeout)); - } - - return st_tcp_connect(ipaddr, port, timeout); - } - - /** - * Create a listener st_netfd_t. - * \param[in] port The port to listen on. - * \return The st_netfd_t. - */ - UNUSED static st_netfd_t - st_tcp_listen(uint16_t port) - { - int sfd = tcp_listen(port); - try { - // Create a net file descriptor around a listener socket. - return checkpass(st_netfd_open_socket(sfd)); - } catch (...) { - close(sfd); - throw; - } - } - - /** - * Wraps st_cond_* errno-functions with exceptions and cleans up on - * destruction. - */ - class st_cond - { - NONCOPYABLE(st_cond) - public: - st_cond() : c(checkerr(st_cond_new())) {} - ~st_cond() { check0x(st_cond_destroy(c)); } - void wait() { check0x(st_cond_wait(c)); } - void wait(st_utime_t t) { check0x(st_cond_timedwait(c, t)); } - void signal() { st_cond_signal(c); } - void bcast() { st_cond_broadcast(c); } - private: - st_cond_t c; - }; - - /** - * Synchronized boolean. - */ - class st_bool - { - public: - st_bool(bool init = false) : c(), b(init) {} - void set() { b = true; c.bcast(); } - void reset() { b = false; c.bcast(); } - void waitset() { if (!b) c.wait(); } - void waitreset() { if (b) c.wait(); } - operator bool() { return b; } - private: - st_cond c; - bool b; - }; - - UNUSED static void toggle(st_bool& b) { if (b) b.reset(); else b.set(); } - - /** - * Wraps st_mutex_* errno-functions with exceptions and cleans up on - * destruction. - */ - class st_mutex - { - NONCOPYABLE(st_mutex) - public: - st_mutex() : m(checkerr(st_mutex_new())) {} - ~st_mutex() { check0x(st_mutex_destroy(m)); } - void lock() { check0x(st_mutex_lock(m)); } - bool trylock() { - int res = st_mutex_trylock(m); - if (res == 0) return true; - else if (errno == EBUSY) return false; - else check0x(res); - } - void unlock() { check0x(st_mutex_unlock(m)); } - private: - st_mutex_t m; - }; - - /** - * An unbounded FIFO queue. ST threads can wait on this until elements have - * been pushed in (resulting in a waking signal). - */ - template <typename T> - class st_channel - { - public: - template<typename U> void push(U &&x) { - q_.push(forward<U>(x)); - empty_.signal(); - } - T take() { - while (q_.empty()) { - empty_.wait(); - } - T x = move(front()); - q_.pop(); - return x; - } - const T& front() const { return q_.front(); } - T& front() { return q_.front(); } - bool empty() const { return q_.empty(); } - void pop() { q_.pop(); } - void clear() { while (!q_.empty()) q_.pop(); } - const std::queue<T> &queue() const { return q_; } - private: - std::queue<T> q_; - st_cond empty_; - }; - - /** - * An unbounded FIFO multi-cast channel, i.e. publish-subscribe. - */ - template <typename T> - class st_multichannel - { - public: - void push(const T &x) { - foreach (shared_ptr<st_channel<T> > q, qs) { - q->push(x); - } - } - st_channel<T> &subscribe() { - shared_ptr<st_channel<T> > q(new st_channel<T>); - qs.push_back(q); - return *q; - } - private: - vector<shared_ptr<st_channel<T> > > qs; - }; - - /** - * A hub is a single point to signal to wake up a set of threads. Threads - * join the hub before calling a blocking operation if they want to make - * themselves interruptible on this hub. - */ - class st_intr_hub - { - public: - virtual void insert(st_thread_t t) = 0; - virtual void erase(st_thread_t t) = 0; - virtual ~st_intr_hub() {}; - }; - - /** - * The simplest hub, which only interrupts those who are currently joined in - * the hub (like a condition variable broadcast). - */ - class st_intr_cond : public st_intr_hub - { - public: - virtual ~st_intr_cond() {} - void insert(st_thread_t t) { threads.insert(t); } - void erase(st_thread_t t) { threads.erase(t); } - void signal() { - foreach (st_thread_t t, threads) { - st_thread_interrupt(t); - } - threads.clear(); - } - private: - std::set<st_thread_t> threads; - }; - - /** - * Like st_intr_cond, but a bool instead, so there's state; newly joining - * threads may immediately be interrupted. Interruption occurs when this is - * set to true. - */ - class st_intr_bool : public st_intr_hub - { - public: - void insert(st_thread_t t) { - if (b) st_thread_interrupt(t); - else threads.insert(t); - } - void erase(st_thread_t t) { threads.erase(t); } - void set() { - b = true; - foreach (st_thread_t t, threads) { - st_thread_interrupt(t); - } - threads.clear(); - } - void reset() { - // If b is true, then any threads that join are immediately - // interrupted, so the set must be empty. - ASSERT(!b || threads.empty()); - b = false; - } - operator bool() const { return b; } - private: - std::set<st_thread_t> threads; - bool b; - }; - - /** - * RAII for making the current thread interruptible on a certain hub. - */ - class st_intr - { - public: - st_intr(st_intr_hub &hub) : hub_(hub) { hub.insert(st_thread_self()); } - ~st_intr() { hub_.erase(st_thread_self()); } - private: - st_intr_hub &hub_; - }; - - class st_group_join_exception : public std::exception - { - public: - st_group_join_exception(const map<st_thread_t, std::exception> &th2ex) : - th2ex_(th2ex) {} - virtual ~st_group_join_exception() throw() {} - virtual const char *what() const throw() { - if (!th2ex_.empty() && s == "") { - bool first = true; - stringstream ss; - typedef pair<st_thread_t, std::exception> p; - foreach (p p, th2ex_) { - ss << (first ? "" : ", ") << p.first << " -> " << p.second.what(); - first = false; - } - s = ss.str(); - } - return s.c_str(); - } - private: - map<st_thread_t, std::exception> th2ex_; - mutable string s; - }; - - /** - * RAII for joining on a single thread. - */ - class st_joining - { - NONCOPYABLE(st_joining) - public: - st_joining(st_thread_t t) : t_(t) {} - ~st_joining() { if (t_ != nullptr) st_join(t_); } - private: - st_thread_t t_; - }; - - /** - * RAII for joining on all contained threads. Warning: st_join may throw - * exceptions. - */ - class st_thread_group - { - public: - ~st_thread_group() { - map<st_thread_t, std::exception> th2ex; - foreach (st_thread_t t, ts) { - try { st_join(t); } - catch (std::exception &ex) { th2ex[t] = ex; } - } - if (!th2ex.empty()) throw st_group_join_exception(th2ex); - } - void insert(st_thread_t t) { ts.insert(t); } - private: - std::set<st_thread_t> ts; - }; - -#if 0 -/// XXX - int count = 0; - size_t glen = 0; - - class st_read_fn - { - private: - st_netfd_t fd_; - st_utime_t to_; - public: - st_read_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) - : fd_(fd), to_(to) {} - size_t operator()(char *buf, size_t len) { - size_t x = size_t(checknnegerr(st_read(fd_, buf, len, to_))); - glen += x; - if ((++count & 0xf) == 0xf) - cout << "count " << count << " len " << len << " read " << x << " glen " << glen << endl; - return x; - // return size_t(checknnegerr(st_read(fd_, buf, len, to_))); - } - }; -#else - class st_read_fn - { - private: - st_netfd_t fd_; - st_utime_t to_; - public: - st_read_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) - : fd_(fd), to_(to) {} - size_t operator()(char *buf, size_t len) { - return size_t(checknnegerr(st_read(fd_, buf, len, to_))); - } - }; -#endif - - class st_read_fully_fn - { - private: - st_netfd_t fd_; - st_utime_t to_; - public: - st_read_fully_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) - : fd_(fd), to_(to) {} - void operator()(char *buf, size_t len) { - checkeqnneg(st_read_fully(fd_, buf, len, to_), ssize_t(len)); - } - }; - - class st_reader - { - EXPAND(stream_reader) - private: - stream_reader r_; - public: - st_reader(st_netfd_t fd, char *buf, size_t len) : - r_(st_read_fn(fd), st_read_fully_fn(fd), buf, len) {} - size_t unread() { return r_.unread(); } - size_t rem() { return r_.rem(); } - sized_array<char> &buf() { return r_.buf(); } - void reset_range(char *start, char *end) { r_.reset_range(start, end); } - void reset() { r_.reset(); } - char *start() { return r_.start(); } - char *end() { return r_.end(); } - bool accum(size_t req) { return r_.accum(req); } - void skip(size_t req) { r_.skip(req); } - managed_array<char> read(size_t req) { return r_.read(req); } - template<typename T> T read() { return r_.read<T>(); } - void shift() { r_.shift(); } - }; - -} - -#endif Added: cpp-commons/trunk/src/commons/st/sync.h =================================================================== --- cpp-commons/trunk/src/commons/st/sync.h (rev 0) +++ cpp-commons/trunk/src/commons/st/sync.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,84 @@ +#ifndef COMMONS_ST_SYNC_H +#define COMMONS_ST_SYNC_H + +#include <commons/check.h> +#include <commons/utility.h> +#include <st.h> + +BEGIN_NAMESPACE(commons) + +/** + * RAII to acquire and release a st_mutex_t. Non-copyable. + */ +class st_lock +{ + NONCOPYABLE(st_lock) + public: + st_lock(st_mutex_t mx) : mx_(mx) { check0x(st_mutex_lock(mx)); } + ~st_lock() { check0x(st_mutex_unlock(mx_)); } + private: + st_mutex_t mx_; +}; + +/** + * Wraps st_cond_* errno-functions with exceptions and cleans up on + * destruction. + */ +class st_cond +{ + NONCOPYABLE(st_cond) + public: + st_cond() : c(checkerr(st_cond_new())) {} + ~st_cond() { check0x(st_cond_destroy(c)); } + void wait() { check0x(st_cond_wait(c)); } + void wait(st_utime_t t) { check0x(st_cond_timedwait(c, t)); } + void signal() { st_cond_signal(c); } + void bcast() { st_cond_broadcast(c); } + private: + st_cond_t c; +}; + +/** + * Synchronized boolean. + */ +class st_bool +{ + public: + st_bool(bool init = false) : c(), b(init) {} + void set() { b = true; c.bcast(); } + void reset() { b = false; c.bcast(); } + void waitset() { if (!b) c.wait(); } + void waitreset() { if (b) c.wait(); } + operator bool() { return b; } + private: + st_cond c; + bool b; +}; + +UNUSED static void toggle(st_bool& b) { if (b) b.reset(); else b.set(); } + +/** + * Wraps st_mutex_* errno-functions with exceptions and cleans up on + * destruction. + */ +class st_mutex +{ + NONCOPYABLE(st_mutex) + public: + st_mutex() : m(checkerr(st_mutex_new())) {} + ~st_mutex() { check0x(st_mutex_destroy(m)); } + void lock() { check0x(st_mutex_lock(m)); } + bool trylock() { + int res = st_mutex_trylock(m); + if (res == 0) return true; + else if (errno == EBUSY) return false; + else check0x(res); + } + void unlock() { check0x(st_mutex_unlock(m)); } + private: + st_mutex_t m; +}; + +END_NAMESPACE + +#endif Added: cpp-commons/trunk/src/commons/st/threads.h =================================================================== --- cpp-commons/trunk/src/commons/st/threads.h (rev 0) +++ cpp-commons/trunk/src/commons/st/threads.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,98 @@ +#ifndef COMMONS_ST_THREADS_H +#define COMMONS_ST_THREADS_H + +#include <commons/utility.h> +#include <commons/delegates.h> +#include <boost/foreach.hpp> +#include <map> +#include <set> +#include <st.h> + +#define foreach BOOST_FOREACH + +BEGIN_NAMESPACE(commons) + +using namespace std; + +enum { default_stack_size = 65536 }; + +/** + * Run a function in pthread. + * \return The new pthread_t on success, 0 on failure. + * \todo Is it safe to treat the pthread_t as a pointer? + */ +UNUSED static st_thread_t +st_spawn(const fn& f) +{ + return st_thread_create(&run_function0_null, + new fn(f), + true, + default_stack_size); +} + +UNUSED static void +st_join(st_thread_t t) +{ + check0x(st_thread_join(t, nullptr)); +} + +class st_group_join_exception : public std::exception +{ + public: + st_group_join_exception(const map<st_thread_t, std::exception> &th2ex) : + th2ex_(th2ex) {} + virtual ~st_group_join_exception() throw() {} + virtual const char *what() const throw() { + if (!th2ex_.empty() && s == "") { + bool first = true; + stringstream ss; + typedef pair<st_thread_t, std::exception> p; + foreach (p p, th2ex_) { + ss << (first ? "" : ", ") << p.first << " -> " << p.second.what(); + first = false; + } + s = ss.str(); + } + return s.c_str(); + } + private: + map<st_thread_t, std::exception> th2ex_; + mutable string s; +}; + +/** + * RAII for joining on a single thread. + */ +class st_joining +{ + NONCOPYABLE(st_joining) + public: + st_joining(st_thread_t t) : t_(t) {} + ~st_joining() { if (t_ != nullptr) st_join(t_); } + private: + st_thread_t t_; +}; + +/** + * RAII for joining on all contained threads. Warning: st_join may throw + * exceptions. + */ +class st_thread_group +{ + public: + ~st_thread_group() { + map<st_thread_t, std::exception> th2ex; + foreach (st_thread_t t, ts) { + try { st_join(t); } + catch (std::exception &ex) { th2ex[t] = ex; } + } + if (!th2ex.empty()) throw st_group_join_exception(th2ex); + } + void insert(st_thread_t t) { ts.insert(t); } + private: + set<st_thread_t> ts; +}; + +END_NAMESPACE + +#endif Modified: cpp-commons/trunk/src/commons/streamwriter.h =================================================================== --- cpp-commons/trunk/src/commons/streamwriter.h 2009-03-24 00:10:13 UTC (rev 1327) +++ cpp-commons/trunk/src/commons/streamwriter.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -7,6 +7,7 @@ #include <cstring> #include <iostream> #include <iomanip> +#include <arpa/inet.h> namespace commons { @@ -22,7 +23,7 @@ char *unsent_; char *mark_; char *p_; - boost::function<void(void*, size_t)> flushcb; + boost::function<void(const void*, size_t)> flushcb; char *reserve(size_t n, char *p) { if (p + n > a_.end()) { // check that the reserved space will fit @@ -43,7 +44,7 @@ *reinterpret_cast<T*>(reserve(sizeof x, p)) = x; } public: - stream_writer(boost::function<void(void*, size_t)> flushcb, char *a, size_t buf_size) : + stream_writer(boost::function<void(const void*, size_t)> flushcb, char *a, size_t buf_size) : a_(a, buf_size), unsent_(a_.get()), mark_(unsent_ + sizeof(uint32_t)), p_(mark_), flushcb(flushcb) {} sized_array<char> &buf() { return a_; } Modified: cpp-commons/trunk/src/commons/utility.h =================================================================== --- cpp-commons/trunk/src/commons/utility.h 2009-03-24 00:10:13 UTC (rev 1327) +++ cpp-commons/trunk/src/commons/utility.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -17,4 +17,7 @@ #define UNUSED __attribute__((unused)) +/** Useful for temporarily causing a variable to be "used." */ +#define USE(x) static_cast<void>(x) + #endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |