assorted-commits Mailing List for Assorted projects (Page 27)
Brought to you by:
yangzhang
You can subscribe to this list here.
2007 | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov (9) | Dec (12) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
2008 | Jan (86) | Feb (265) | Mar (96) | Apr (47) | May (136) | Jun (28) | Jul (57) | Aug (42) | Sep (20) | Oct (67) | Nov (37) | Dec (34) |
2009 | Jan (39) | Feb (85) | Mar (96) | Apr (24) | May (82) | Jun (13) | Jul (10) | Aug (8) | Sep (2) | Oct (20) | Nov (31) | Dec (17) |
2010 | Jan (16) | Feb (11) | Mar (17) | Apr (53) | May (31) | Jun (13) | Jul (3) | Aug (6) | Sep (11) | Oct (4) | Nov (17) | Dec (17) |
2011 | Jan (3) | Feb (19) | Mar (5) | Apr (17) | May (3) | Jun (4) | Jul (14) | Aug (3) | Sep (2) | Oct (1) | Nov (3) | Dec (2) |
2012 | Jan (3) | Feb (7) | Mar (1) | Apr | May (1) | Jun | Jul (4) | Aug (5) | Sep (2) | Oct (3) | Nov | Dec |
2013 | Jan | Feb | Mar (9) | Apr (5) | May | Jun (2) | Jul (1) | Aug (10) | Sep (1) | Oct (2) | Nov | Dec |
2014 | Jan (1) | Feb (3) | Mar (3) | Apr (1) | May (4) | Jun | Jul | Aug | Sep (2) | Oct | Nov | Dec |
2015 | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct (1) | Nov | Dec |
2016 | Jan (1) | Feb | Mar (2) | Apr | May | Jun | Jul | Aug | Sep (1) | Oct | Nov | Dec |
2017 | Jan | Feb | Mar (1) | Apr | May (5) | Jun (1) | Jul | Aug | Sep | Oct | Nov | Dec (2) |
2018 | Jan | Feb | Mar | Apr | May | Jun (1) | Jul | Aug | Sep | Oct | Nov | Dec |
From: <yan...@us...> - 2009-03-05 23:14:58
|
Revision: 1252 http://assorted.svn.sourceforge.net/assorted/?rev=1252&view=rev Author: yangzhang Date: 2009-03-05 23:14:45 +0000 (Thu, 05 Mar 2009) Log Message: ----------- - accum returns residual count instead of bool - added anchor, reset Modified Paths: -------------- cpp-commons/trunk/src/commons/streamreader.h cpp-commons/trunk/src/test/streamreader.cc Modified: cpp-commons/trunk/src/commons/streamreader.h =================================================================== --- cpp-commons/trunk/src/commons/streamreader.h 2009-03-04 23:31:58 UTC (rev 1251) +++ cpp-commons/trunk/src/commons/streamreader.h 2009-03-05 23:14:45 UTC (rev 1252) @@ -48,7 +48,8 @@ full_reader_(repeat_reader(reader_)), buf_(buf, bufsize), start_(buf), - end_(buf) + end_(buf), + anchor_(buf) {} stream_reader(boost::function<size_t(char*, size_t)> reader, @@ -58,7 +59,8 @@ full_reader_(full_reader), buf_(buf, bufsize), start_(buf), - end_(buf) + end_(buf), + anchor_(buf) {} /** @@ -80,33 +82,47 @@ * Manually update/adjust the pointers; useful after changing buf_. */ void reset_range(char *start, char *end) { - start_ = start; + anchor_ = start_ = start; end_ = end; } + void set_anchor() { anchor_ = start_; } + + /** + * Reset the pointers, emptying the buffer. + */ + void reset() { + start_ = end_ = anchor_ = buf_.get(); + } + char *start() { return start_; } char *end() { return end_; } + char *anchor() { return anchor_; } /** - * Accumulate (but don't read/copy) the requested number of bytes. + * Accumulate (treat as read, but don't discard, and don't actually + * copy for reading) the requested number of bytes. Returns the + * remaining number of requested bytes that couldn't be accumulated + * because we ran out of buffer space. 
*/ - bool accum(size_t req) { + size_t accum(size_t req) { while (unread() < req && rem() > 0) { size_t res = reader_(end_, rem()); if (res == 0) throw eof_exception(); end_ += res; } if (unread() < req) { + size_t ret = req - unread(); start_ = end_; - return false; + return ret; } else { start_ += req; - return true; + return 0; } } /** - * Discard the requested number of bytes. + * Discard the requested number of bytes. Disregards anchor. */ void skip(size_t req) { if (unread() >= req) { @@ -119,7 +135,7 @@ // reading. Skip over bytes that are immediately available... req -= unread(); // ...and reset pointers to discard current buffer. - start_ = end_ = buf_.get(); + reset(); // Keep reading until we have enough. while (true) { @@ -153,7 +169,7 @@ managed_array<char> p(new char[req], true); memcpy(p.get(), start_, unread()); full_reader_(p + unread(), req - unread()); - start_ = end_ = buf_.get(); + reset(); return p; } @@ -240,6 +256,11 @@ * The end of the unconsumed range of bytes. */ char *end_; + + /** + * Do not discard bytes >= this. 
+ */ + char *anchor_; }; } Modified: cpp-commons/trunk/src/test/streamreader.cc =================================================================== --- cpp-commons/trunk/src/test/streamreader.cc 2009-03-04 23:31:58 UTC (rev 1251) +++ cpp-commons/trunk/src/test/streamreader.cc 2009-03-05 23:14:45 UTC (rev 1252) @@ -52,13 +52,13 @@ { array<char> rbuf(9); stream_reader r(rfn(chunklen, src), rbuf.get(), rbuf.size()); - EXPECT_TRUE(r.accum(sizeof(int))); - EXPECT_TRUE(r.accum(sizeof(int))); - EXPECT_FALSE(r.accum(sizeof(int))); + EXPECT_EQ(0, r.accum(sizeof(int))); + EXPECT_EQ(0, r.accum(sizeof(int))); + EXPECT_EQ(3, r.accum(sizeof(int))); r.shift(); - EXPECT_TRUE(r.accum(sizeof(int))); - EXPECT_TRUE(r.accum(sizeof(int))); - EXPECT_FALSE(r.accum(sizeof(int))); + EXPECT_EQ(0, r.accum(sizeof(int))); + EXPECT_EQ(0, r.accum(sizeof(int))); + EXPECT_EQ(3, r.accum(sizeof(int))); } } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-04 23:32:07
|
Revision: 1251 http://assorted.svn.sourceforge.net/assorted/?rev=1251&view=rev Author: yangzhang Date: 2009-03-04 23:31:58 +0000 (Wed, 04 Mar 2009) Log Message: ----------- moved writer out of ser.h and into commons Modified Paths: -------------- ydb/trunk/src/ser.h Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-03-04 23:31:02 UTC (rev 1250) +++ ydb/trunk/src/ser.h 2009-03-04 23:31:58 UTC (rev 1251) @@ -4,6 +4,7 @@ #include <commons/array.h> #include <commons/exceptions.h> #include <commons/st/st.h> +#include <commons/streamwriter.h> #include <commons/utility.h> #include <iomanip> #include <iostream> @@ -64,79 +65,7 @@ // TODO try to make all of the following conform to the std interfaces, if // amenable -class writer -{ - NONCOPYABLE(writer) - private: - sized_array<char> a_; - char *unsent_; - char *mark_; - char *p_; - boost::function<void(void*, size_t)> flushcb; - char *reserve(int n, char *p) { - if (p + n > a_.end()) { - // check that the reserved space will fit - assert(size_t(p - mark_ + n + sizeof(uint32_t)) <= a_.size()); - // get rid of what we have - flush(); - size_t diff = mark_ - (a_.get() + sizeof(uint32_t)); - memmove(a_.get() + sizeof(uint32_t), mark_, p_ - mark_); - mark_ = (unsent_ = a_.get()) + sizeof(uint32_t); - p_ -= diff; - p -= diff; - } - return p; - } - char *prefix() { return mark_ - sizeof(uint32_t); } - template<typename T> - void write_(T x, char *p) { - *reinterpret_cast<T*>(reserve(sizeof x, p)) = x; - } - public: - writer(boost::function<void(void*, size_t)> flushcb, char *a, size_t buf_size) : - a_(a, buf_size), unsent_(a_.get()), mark_(unsent_ + sizeof(uint32_t)), - p_(mark_), flushcb(flushcb) {} - sized_array<char> &buf() { return a_; } - char *cur() { return p_; } - size_t pos() { return p_ - mark_; } - size_t size() { return a_.size(); } - void mark() { - if (p_ > mark_) { - // prefix last segment with its length - 
*reinterpret_cast<uint32_t*>(prefix()) = uint32_t(p_ - mark_); - // start new segment - mark_ = (p_ += sizeof(uint32_t)); - } - } - void reset() { p_ = mark_; } - void reserve(int n) { reserve(n, p_); } - void mark_and_flush() { - mark(); - flush(); - mark_ = p_ = (unsent_ = a_.get()) + sizeof(uint32_t); - } - void flush() { - if (prefix() > unsent_) { - flushcb(unsent_, prefix() - unsent_); - unsent_ = prefix(); - } - } - void show() { - cout << static_cast<void*>(p_); - for (size_t i = 0; i < a_.size(); ++i) - cout << " " << hex << setfill('0') << setw(2) - << int(static_cast<unsigned char>(a_.get()[i])); - cout << endl; - cout << static_cast<void*>(p_); - for (size_t i = 0; i < a_.size(); ++i) - cout << " " << setfill(' ') << setw(2) << (i == pos() ? "^^" : ""); - cout << endl; - } - template<typename T> void skip() { reserve(sizeof(T)); p_ += sizeof(T); } - template<typename T> void write(T x) { write_(x, p_); p_ += sizeof x; } - template<typename T> void write(T x, size_t off) { write_(x, mark_ + off); } -}; - +typedef stream_writer writer; typedef st_reader reader; class stream This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-04 23:31:12
|
Revision: 1250 http://assorted.svn.sourceforge.net/assorted/?rev=1250&view=rev Author: yangzhang Date: 2009-03-04 23:31:02 +0000 (Wed, 04 Mar 2009) Log Message: ----------- - refactored st_reader out of st.h into streamreader.h as stream_reader - added stream_writer from ydb (writer) - added unit tests for stream_reader, stream_writer - cleaned up stream_reader a bit - Modified Paths: -------------- cpp-commons/trunk/src/commons/exceptions.h cpp-commons/trunk/src/commons/st/st.h Added Paths: ----------- cpp-commons/trunk/src/commons/streamreader.h cpp-commons/trunk/src/commons/streamwriter.h cpp-commons/trunk/src/test/streamreader.cc cpp-commons/trunk/src/test/streamwriter.cc Modified: cpp-commons/trunk/src/commons/exceptions.h =================================================================== --- cpp-commons/trunk/src/commons/exceptions.h 2009-03-04 23:30:45 UTC (rev 1249) +++ cpp-commons/trunk/src/commons/exceptions.h 2009-03-04 23:31:02 UTC (rev 1250) @@ -19,6 +19,16 @@ const string msg; }; + class msg_exception : public std::exception + { + public: + msg_exception(const string &msg) : msg_(msg) {} + ~msg_exception() throw() {} + const char *what() const throw() { return msg_.c_str(); } + private: + const string msg_; + }; + #define throw_operation_not_supported() throw operation_not_supported(__PRETTY_FUNCTION__) } Modified: cpp-commons/trunk/src/commons/st/st.h =================================================================== --- cpp-commons/trunk/src/commons/st/st.h 2009-03-04 23:30:45 UTC (rev 1249) +++ cpp-commons/trunk/src/commons/st/st.h 2009-03-04 23:31:02 UTC (rev 1250) @@ -8,6 +8,7 @@ #include <commons/array.h> #include <commons/delegates.h> #include <commons/nullptr.h> +#include <commons/streamreader.h> #include <commons/sockets.h> #include <commons/utility.h> #include <exception> @@ -372,172 +373,49 @@ std::set<st_thread_t> ts; }; - class eof_exception : public std::exception { - const char *what() const throw() { return "EOF"; } + class 
st_read_fn + { + private: + st_netfd_t fd_; + st_utime_t to_; + public: + st_read_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) + : fd_(fd), to_(to) {} + size_t operator()(char *buf, size_t len) { + return size_t(checknnegerr(st_read(fd_, buf, len, to_))); + } }; - /** - * Convenience class for reading from sockets. - */ - class st_reader + class st_read_fully_fn { - NONCOPYABLE(st_reader) - public: - st_reader(st_netfd_t fd, char *buf, size_t bufsize) : - fd_(fd), - buf_(buf, bufsize), - start_(buf_.get()), - end_(buf_.get()) - {} + private: + st_netfd_t fd_; + st_utime_t to_; + public: + st_read_fully_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) + : fd_(fd), to_(to) {} + void operator()(char *buf, size_t len) { + checkeqnneg(st_read_fully(fd_, buf, len, to_), ssize_t(len)); + } + }; - /** - * The size of the unconsumed range of bytes. - */ - size_t unread() { return end_ - start_; } - - /** - * The remaining number of bytes in the buffer - */ - size_t rem() { return buf_.end() - end_; } - - /** - * The entire read buffer. - */ - sized_array<char> &buf() { return buf_; } - - /** - * Manually update/adjust the pointers; useful after changing buf_. - */ - void reset_range(char *start, char *end) { - start_ = start; - end_ = end; - } - - /** - * Discard the requested number of bytes. - */ - void skip(size_t req, st_utime_t to = ST_UTIME_NO_TIMEOUT) { - while (true) { - if (unread() >= req) { - // We have more unconsumed bytes than requested, so we're done. - start_ += req; - break; - } - - // We have more requested bytes than unconsumed, so need to keep - // reading. Skip over bytes... - req -= unread(); - // ...and reset pointers to discard current buffer. - start_ = end_ = buf_.get(); - - ssize_t res = checknnegerr(st_read(fd_, end_, rem(), to)); - end_ += res; - - // If we got a premature EOF. - if (res == 0 && unread() < req) throw eof_exception(); - } - } - - /** - * Returns a char array that contains the requested number of bytes. 
If - * we hit an error or EOF, then an exception is thrown. - */ - managed_array<char> read(size_t req, st_utime_t to = ST_UTIME_NO_TIMEOUT) { - // Do we already have the requested data? - if (unread() >= req) { - managed_array<char> p(start_, false); - start_ += req; - return p; - } - - // Handle large arrays specially. - if (req > buf_.size()) { - managed_array<char> p(new char[req], true); - memcpy(p.get(), start_, unread()); - checkeqnneg(st_read_fully(fd_, p + unread(), req - unread(), to), static_cast<ssize_t>(req - unread())); - start_ = end_ = buf_.get(); - return p; - } - - // Shift things down if necessary. - if (req > static_cast<size_t>(buf_.end() - end_)) { - memmove(buf_.get(), start_, unread()); - size_t diff = start_ - buf_.get(); - start_ -= diff; - end_ -= diff; - } - - // Keep reading until we have enough. - while (unread() < req) { - ssize_t res = checknnegerr(st_read(fd_, end_, rem(), to)); - if (res == 0) break; - else end_ += res; - } - - // If we got a premature EOF. - if (unread() < req) - throw eof_exception(); - - managed_array<char> p(start_, false); - start_ += req; - return p; - } - - template<typename T> - T read(st_utime_t to = ST_UTIME_NO_TIMEOUT) - { - size_t req = sizeof(T); - - // Do we already have the requested data? - if (unread() >= req) { - T x = *reinterpret_cast<const T*>(start_); - start_ += req; - return x; - } - - assert(req <= buf_.size()); - - // Shift things down if necessary. - if (req > static_cast<size_t>(buf_.end() - end_)) { - memmove(buf_.get(), start_, unread()); - size_t diff = start_ - buf_.get(); - start_ -= diff; - end_ -= diff; - } - - // Keep reading until we have enough. - while (unread() < req) { - ssize_t res = checknnegerr(st_read(fd_, end_, rem(), to)); - if (res == 0) break; - else end_ += res; - } - - // If we got a premature EOF. 
- if (unread() < req) - throw eof_exception(); - - T x = *reinterpret_cast<const T*>(start_); - start_ += req; - return x; - } - - private: - st_netfd_t fd_; - - /** - * The temporary storage buffer. - */ - sized_array<char> buf_; - - /** - * The start of the unconsumed range of bytes. - */ - char *start_; - - /** - * The end of the unconsumed range of bytes. - */ - char *end_; + class st_reader + { + EXPAND(stream_reader) + private: + stream_reader r_; + public: + st_reader(st_netfd_t fd, char *buf, size_t len) : + r_(st_read_fn(fd), st_read_fully_fn(fd), buf, len) {} + sized_array<char> &buf() { return r_.buf(); } + void reset_range(char *start, char *end) { r_.reset_range(start, end); } + char *start() { return r_.start(); } + char *end() { return r_.end(); } + bool accum(size_t req) { return r_.accum(req); } + void skip(size_t req) { r_.skip(req); } + managed_array<char> read(size_t req) { return r_.read(req); } + template<typename T> T read() { return r_.read<T>(); } + void shift() { r_.shift(); } }; } Added: cpp-commons/trunk/src/commons/streamreader.h =================================================================== --- cpp-commons/trunk/src/commons/streamreader.h (rev 0) +++ cpp-commons/trunk/src/commons/streamreader.h 2009-03-04 23:31:02 UTC (rev 1250) @@ -0,0 +1,247 @@ +#ifndef COMMONS_STREAMREADER_H +#define COMMONS_STREAMREADER_H + +#include <boost/function.hpp> +#include <commons/array.h> +#include <cstring> + +namespace commons { + + using namespace boost; + using namespace commons; + using namespace std; + + class eof_exception : public std::exception { + const char *what() const throw() { return "EOF"; } + }; + + /** + * Convenience wrapper for full-reading from data source streams. 
+ */ + class repeat_reader + { + private: + boost::function<size_t(char*, size_t)> &reader_; + public: + repeat_reader(boost::function<size_t(char*, size_t)> &reader) + : reader_(reader) {} + void operator()(char *buf, size_t len) { + size_t res = 0; + while (res < len) { + res += reader_(buf + res, len - res); + if (res == 0) throw eof_exception(); + } + assert(res == len); + } + }; + + /** + * General-purpose stream reader for arbitrary data source streams. + */ + class stream_reader + { + NONCOPYABLE(stream_reader) + public: + stream_reader(boost::function<size_t(char*, size_t)> reader, + char *buf, size_t bufsize) : + reader_(reader), + full_reader_(repeat_reader(reader_)), + buf_(buf, bufsize), + start_(buf), + end_(buf) + {} + + stream_reader(boost::function<size_t(char*, size_t)> reader, + boost::function<void(char*, size_t)> full_reader, + char *buf, size_t bufsize) : + reader_(reader), + full_reader_(full_reader), + buf_(buf, bufsize), + start_(buf), + end_(buf) + {} + + /** + * The size of the unconsumed range of bytes. + */ + size_t unread() { return end_ - start_; } + + /** + * The remaining number of bytes in the buffer + */ + size_t rem() { return buf_.end() - end_; } + + /** + * The entire read buffer. + */ + sized_array<char> &buf() { return buf_; } + + /** + * Manually update/adjust the pointers; useful after changing buf_. + */ + void reset_range(char *start, char *end) { + start_ = start; + end_ = end; + } + + char *start() { return start_; } + char *end() { return end_; } + + /** + * Accumulate (but don't read/copy) the requested number of bytes. + */ + bool accum(size_t req) { + while (unread() < req && rem() > 0) { + size_t res = reader_(end_, rem()); + if (res == 0) throw eof_exception(); + end_ += res; + } + if (unread() < req) { + start_ = end_; + return false; + } else { + start_ += req; + return true; + } + } + + /** + * Discard the requested number of bytes. 
+ */ + void skip(size_t req) { + if (unread() >= req) { + // We have more unconsumed bytes than requested, so we're done. + start_ += req; + return; + } + + // We have more requested bytes than unconsumed, so need to keep + // reading. Skip over bytes that are immediately available... + req -= unread(); + // ...and reset pointers to discard current buffer. + start_ = end_ = buf_.get(); + + // Keep reading until we have enough. + while (true) { + size_t res = reader_(end_, rem()); + if (res == 0) throw eof_exception(); + if (res == req) break; + if (res > req) { + // Now skip over the rest of the bytes, which weren't available. + start_ += req; + end_ += res; + break; + } + req -= res; + } + } + + /** + * Returns a char array that contains the requested number of bytes. If + * we hit an error or EOF, then an exception is thrown. + */ + managed_array<char> read(size_t req) { + // Do we already have the requested data? + if (unread() >= req) { + managed_array<char> p(start_, false); + start_ += req; + return p; + } + + // Handle large arrays specially. + if (req > buf_.size()) { + managed_array<char> p(new char[req], true); + memcpy(p.get(), start_, unread()); + full_reader_(p + unread(), req - unread()); + start_ = end_ = buf_.get(); + return p; + } + + // Shift things down if necessary. + if (req > static_cast<size_t>(buf_.end() - end_)) + shift(); + + // Keep reading until we have enough. + while (unread() < req) { + size_t res = reader_(end_, rem()); + if (res == 0) break; + else end_ += res; + } + + // If we got a premature EOF. + if (unread() < req) + throw eof_exception(); + + managed_array<char> p(start_, false); + start_ += req; + return p; + } + + template<typename T> + T read() + { + size_t req = sizeof(T); + + // Do we already have the requested data? + if (unread() >= req) { + T x = *reinterpret_cast<const T*>(start_); + start_ += req; + return x; + } + + assert(req <= buf_.size()); + + // Shift things down if necessary. 
+ if (req > static_cast<size_t>(buf_.end() - end_)) + shift(); + + // Keep reading until we have enough. + while (unread() < req) { + size_t res = reader_(end_, rem()); + if (res == 0) break; + else end_ += res; + } + + // If we got a premature EOF. + if (unread() < req) + throw eof_exception(); + + T x = *reinterpret_cast<const T*>(start_); + start_ += req; + return x; + } + + /** + * Shift the unread bytes down to the start of the buffer. + */ + void shift() { + memmove(buf_.get(), start_, unread()); + size_t diff = start_ - buf_.get(); + start_ -= diff; + end_ -= diff; + } + + private: + boost::function<size_t(char*, size_t)> reader_; + + boost::function<void(char*, size_t)> full_reader_; + + /** + * The temporary storage buffer. + */ + sized_array<char> buf_; + + /** + * The start of the unconsumed range of bytes. + */ + char *start_; + + /** + * The end of the unconsumed range of bytes. + */ + char *end_; + }; + +} + +#endif Added: cpp-commons/trunk/src/commons/streamwriter.h =================================================================== --- cpp-commons/trunk/src/commons/streamwriter.h (rev 0) +++ cpp-commons/trunk/src/commons/streamwriter.h 2009-03-04 23:31:02 UTC (rev 1250) @@ -0,0 +1,92 @@ +#ifndef COMMONS_STREAMWRITER_H +#define COMMONS_STREAMWRITER_H + +#include <boost/function.hpp> +#include <commons/array.h> +#include <cstring> +#include <iostream> +#include <iomanip> + +namespace commons { + +using namespace boost; +using namespace commons; +using namespace std; + +class stream_writer +{ + NONCOPYABLE(stream_writer) +private: + sized_array<char> a_; + char *unsent_; + char *mark_; + char *p_; + boost::function<void(void*, size_t)> flushcb; + char *reserve(int n, char *p) { + if (p + n > a_.end()) { + // check that the reserved space will fit + assert(size_t(p - mark_ + n + sizeof(uint32_t)) <= a_.size()); + // get rid of what we have + flush(); + size_t diff = mark_ - (a_.get() + sizeof(uint32_t)); + memmove(a_.get() + sizeof(uint32_t), mark_, p_ 
- mark_); + mark_ = (unsent_ = a_.get()) + sizeof(uint32_t); + p_ -= diff; + p -= diff; + } + return p; + } + char *prefix() { return mark_ - sizeof(uint32_t); } + template<typename T> + void write_(T x, char *p) { + *reinterpret_cast<T*>(reserve(sizeof x, p)) = x; + } +public: + stream_writer(boost::function<void(void*, size_t)> flushcb, char *a, size_t buf_size) : + a_(a, buf_size), unsent_(a_.get()), mark_(unsent_ + sizeof(uint32_t)), + p_(mark_), flushcb(flushcb) {} + sized_array<char> &buf() { return a_; } + char *cur() { return p_; } + size_t pos() { return p_ - mark_; } + size_t size() { return a_.size(); } + void mark() { + if (p_ > mark_) { + // prefix last segment with its length + *reinterpret_cast<uint32_t*>(prefix()) = uint32_t(p_ - mark_); + // start new segment + mark_ = (p_ += sizeof(uint32_t)); + } + } + void reset() { p_ = mark_; } + void reserve(int n) { reserve(n, p_); } + void mark_and_flush() { + mark(); + flush(); + mark_ = p_ = (unsent_ = a_.get()) + sizeof(uint32_t); + } + void flush() { + if (prefix() > unsent_) { + flushcb(unsent_, prefix() - unsent_); + unsent_ = prefix(); + } + } + template<typename T> void skip() { reserve(sizeof(T)); p_ += sizeof(T); } + template<typename T> void write(T x) { write_(x, p_); p_ += sizeof x; } + template<typename T> void write(T x, size_t off) { write_(x, mark_ + off); } + void show() { + cout << static_cast<void*>(p_); + for (size_t i = 0; i < a_.size(); ++i) + cout << " " << hex << setfill('0') << setw(2) + << int(static_cast<unsigned char>(a_.get()[i])); + cout << endl; + cout << static_cast<void*>(p_); + for (size_t i = 0; i < a_.size(); ++i) + cout << " " << setfill(' ') << setw(2) << (i == pos() ? 
"^^" : ""); + cout << endl; + } +}; + + +} + +#endif Added: cpp-commons/trunk/src/test/streamreader.cc =================================================================== --- cpp-commons/trunk/src/test/streamreader.cc (rev 0) +++ cpp-commons/trunk/src/test/streamreader.cc 2009-03-04 23:31:02 UTC (rev 1250) @@ -0,0 +1,69 @@ +#include <commons/array.h> +#include <commons/streamreader.h> +#include <gtest/gtest.h> +using namespace commons; +using namespace testing; + +struct rfn { + size_t chunklen_; + array<char> &src_; + char *cur_; + rfn(size_t chunklen, array<char> &src) + : chunklen_(chunklen), src_(src), cur_(src.get()) {} + size_t operator()(char *buf, size_t len) { + len = min(chunklen_, len); + memcpy(buf, cur_, len); + cur_ += len; + return len; + } +}; + +TEST(reader, read) { + array<char> src(1024); + for (int i = 0; i < 10; ++i) + *(reinterpret_cast<int*>(src.get()) + i) = i; + + for (size_t chunklen = 1; chunklen < 50; ++chunklen) { + { + array<char> rbuf(9); + stream_reader r(rfn(chunklen, src), rbuf.get(), rbuf.size()); + for (int i = 0; i < 10; ++i) + EXPECT_EQ(i, r.read<int>()); + } + { + array<char> rbuf(9); + stream_reader r(rfn(chunklen, src), rbuf.get(), rbuf.size()); + managed_array<char> ma = r.read(5 * sizeof(int)); + array<char> src2(1024); + memcpy(src2.get(), ma.get(), 5 * sizeof(int)); + array<char> rbuf2(1024); + stream_reader r2(rfn(chunklen, src2), rbuf2.get(), rbuf2.size()); + for (int i = 0; i < 5; ++i) + EXPECT_EQ(i, r2.read<int>()); + for (int i = 5; i < 10; ++i) + EXPECT_EQ(i, r.read<int>()); + } + { + array<char> rbuf(9); + stream_reader r(rfn(chunklen, src), rbuf.get(), rbuf.size()); + r.skip(9 * sizeof(int)); + EXPECT_EQ(9, r.read<int>()); + } + { + array<char> rbuf(9); + stream_reader r(rfn(chunklen, src), rbuf.get(), rbuf.size()); + EXPECT_TRUE(r.accum(sizeof(int))); + EXPECT_TRUE(r.accum(sizeof(int))); + EXPECT_FALSE(r.accum(sizeof(int))); + r.shift(); + EXPECT_TRUE(r.accum(sizeof(int))); + EXPECT_TRUE(r.accum(sizeof(int))); + 
EXPECT_FALSE(r.accum(sizeof(int))); + } + } +} + +int main(int argc, char **argv) { + InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} Added: cpp-commons/trunk/src/test/streamwriter.cc =================================================================== --- cpp-commons/trunk/src/test/streamwriter.cc (rev 0) +++ cpp-commons/trunk/src/test/streamwriter.cc 2009-03-04 23:31:02 UTC (rev 1250) @@ -0,0 +1,34 @@ +#include <commons/array.h> +#include <commons/streamwriter.h> +#include <gtest/gtest.h> +using namespace commons; +using namespace testing; + +struct wfn { + array<char> &dst_; + char *cur_; + wfn(array<char> &dst) + : dst_(dst), cur_(dst.get()) {} + void operator()(void *buf, size_t len) { + memcpy(cur_, buf, len); + cur_ += len; + } +}; + +TEST(writer, write) { + array<char> dst(1024); + for (size_t wbufsize = 100; wbufsize < 101; ++wbufsize) { + array<char> wbuf(wbufsize); + stream_writer w(wfn(dst), wbuf.get(), wbuf.size()); + for (int i = 0; i < 10; ++i) + w.write(i); + w.mark_and_flush(); + for (int i = 0; i < 10; ++i) + EXPECT_EQ(i, *(reinterpret_cast<int*>(dst.get()) + i + 1)); + } +} + +int main(int argc, char **argv) { + InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-04 23:30:56
|
Revision: 1249 http://assorted.svn.sourceforge.net/assorted/?rev=1249&view=rev Author: yangzhang Date: 2009-03-04 23:30:45 +0000 (Wed, 04 Mar 2009) Log Message: ----------- added debug info; moved build tests to test/build/; making way for unit tests Modified Paths: -------------- cpp-commons/trunk/src/test/Makefile Modified: cpp-commons/trunk/src/test/Makefile =================================================================== --- cpp-commons/trunk/src/test/Makefile 2009-03-04 23:30:14 UTC (rev 1248) +++ cpp-commons/trunk/src/test/Makefile 2009-03-04 23:30:45 UTC (rev 1249) @@ -1,4 +1,5 @@ CXXFLAGS = \ + -g3 \ -Wall \ -Werror \ -Wextra \ @@ -30,6 +31,15 @@ -Wvolatile-register-var \ -std=gnu++0x \ -all: $(patsubst %.cc,%.o,$(wildcard *.cc)) +LDLIBS = -lgtest +BUILD_OBJS = $(patsubst build/%.cc,build/%.o,$(wildcard build/*.cc)) +BINS = $(patsubst %.cc,%,$(wildcard *.cc)) -.PHONY: all +all: build $(BINS) + +build: $(BUILD_OBJS) + +clean: + rm -f $(BINS) + +.PHONY: all build clean This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-04 23:30:29
|
Revision: 1248 http://assorted.svn.sourceforge.net/assorted/?rev=1248&view=rev Author: yangzhang Date: 2009-03-04 23:30:14 +0000 (Wed, 04 Mar 2009) Log Message: ----------- moved build tests to test/build/ Modified Paths: -------------- cpp-commons/trunk/tools/check.bash Modified: cpp-commons/trunk/tools/check.bash =================================================================== --- cpp-commons/trunk/tools/check.bash 2009-03-04 19:37:48 UTC (rev 1247) +++ cpp-commons/trunk/tools/check.bash 2009-03-04 23:30:14 UTC (rev 1248) @@ -25,11 +25,12 @@ } build-tests() { - my-srcs | sed 's/^/#include </; s/$/>/' > test/all.cc + mkdir -p test/build/ + my-srcs | sed 's/^/#include </; s/$/>/' > test/build/all.cc for i in $(my-srcs) - do echo "#include <$i>" | clobber-if-diff test/$(basename ${i%.h}).cc + do echo "#include <$i>" | clobber-if-diff test/build/$(basename ${i%.h}).cc done - make -C test/ + make -C test/ build } check-include-guards() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-04 19:37:50
|
Revision: 1247 http://assorted.svn.sourceforge.net/assorted/?rev=1247&view=rev Author: yangzhang Date: 2009-03-04 19:37:48 +0000 (Wed, 04 Mar 2009) Log Message: ----------- added program to merge xchat logs Added Paths: ----------- sandbox/trunk/src/one-off-scripts/merge-xchatlogs/ sandbox/trunk/src/one-off-scripts/merge-xchatlogs/merge.py Added: sandbox/trunk/src/one-off-scripts/merge-xchatlogs/merge.py =================================================================== --- sandbox/trunk/src/one-off-scripts/merge-xchatlogs/merge.py (rev 0) +++ sandbox/trunk/src/one-off-scripts/merge-xchatlogs/merge.py 2009-03-04 19:37:48 UTC (rev 1247) @@ -0,0 +1,41 @@ +#!/usr/bin/env python + +from __future__ import with_statement +from path import path +import os.path as pth +import sys, time, re + +src, dst, stage = map(path, sys.argv[1:]) +print 'staging' +dst.copytree(stage) + +print 'new' +new = set(map(path.basename, src.files())) - set(map(path.basename, dst.files())) +news = 0 +for p in new: + path(src / p).copy(stage / p) + news += 1 +print news + +print 'updates' +upd = set(map(path.basename, src.files())) & set(map(path.basename, dst.files())) +pat = re.compile(r' LOGGING AT (.*)\n$') +def findtime(lines): + for line in lines: + m = pat.search(line) + if m: + try: return time.strptime(m.group(1).rstrip()) + except: print >> sys.stderr, repr(line), repr(m.group(1)); raise + return None +upds = 0 +for p in upd: + with file(src / p) as f: tsrc = findtime(reversed(f.readlines())) + with file(dst / p) as f: tdst = findtime(f) + if tsrc is None or tdst is None: tsrc = tdst = None + if tsrc < tdst: + (src / p).copy(stage / p) + with file(dst / p) as g: cnt = g.read() + with file(stage / p, 'a') as f: f.write(cnt) + upds += 1 + else: print ' ', p +print upds Property changes on: sandbox/trunk/src/one-off-scripts/merge-xchatlogs/merge.py ___________________________________________________________________ Added: svn:executable + * This was sent by the SourceForge.net 
collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-04 06:55:47
|
Revision: 1246 http://assorted.svn.sourceforge.net/assorted/?rev=1246&view=rev Author: yangzhang Date: 2009-03-04 06:55:42 +0000 (Wed, 04 Mar 2009) Log Message: ----------- - added byte length prefixes to marked segments in writer - updated ser Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ser.cc ydb/trunk/src/ser.h Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-03 18:50:38 UTC (rev 1245) +++ ydb/trunk/src/main.lzz.clamp 2009-03-04 06:55:42 UTC (rev 1246) @@ -911,6 +911,7 @@ ResponseBatch &resbatch = *presbatch; ser_t serbuf; while (true) { + uint32_t prefix = 0; long long before_read = -1; if (read_thresh > 0) { before_read = current_time_millis(); @@ -918,7 +919,7 @@ { st_intr intr(stop_hub); if (Types::is_pb()) readmsg(reader, batch); - else batch.Clear(); + else { prefix = reader.read<uint32_t>(); batch.Clear(); } } if (read_thresh > 0) { long long read_time = current_time_millis() - before_read; @@ -931,7 +932,17 @@ w.mark(); resbatch.Clear(); start_res(resbatch); + // XXX + //char *start = reader.start(); + //const Txn &first_txn = batch.txn(0); + //if (txn.seqno() < 0) { + //} else if (txn.seqno() == seqno + 1) { + //} else { + // // Skip entire message. + // reader. + //} for (int t = 0; t < batch.txn_size(); ++t) { + // XXX const Txn &txn = t == 0 ? first_txn : batch.txn(t); const Txn &txn = batch.txn(t); // Regular transaction. const char *action; @@ -955,8 +966,11 @@ } else { if (first_seqno == -1) first_seqno = txn.seqno(); - // Queue up for later processing once a snapshot has been received. - // XXX speed up + // Queue up entire buffer for later processing once a snapshot has + // been received. + // XXX backlog.push(array()); + // Stop the loop. 
+ // XXX t = batch.txn_size(); backlog.push(to_pb_Txn(txn)); action = "backlogged"; } @@ -1053,6 +1067,7 @@ while (true) { finally f(loop_cleanup); + uint32_t prefix = 0; // Read the message, but correctly respond to interrupts so that we can // cleanly exit (slightly tricky). @@ -1061,7 +1076,7 @@ try { st_intr intr(stop_hub); if (Types::is_pb()) readmsg(reader, batch); - else batch.Clear(); + else { prefix = reader.read<uint32_t>(); batch.Clear(); } } catch (...) { // TODO: only catch interruptions // This check on seqnos is OK for termination since the seqno will // never grow again if stop_hub is set. @@ -1079,7 +1094,7 @@ // to get all the acks back). st_intr intr(kill_hub); if (Types::is_pb()) readmsg(reader, batch); - else batch.Clear(); + else { prefix = reader.read<uint32_t>(); batch.Clear(); } } for (int i = 0; i < batch.res_size(); ++i) { Modified: ydb/trunk/src/ser.cc =================================================================== --- ydb/trunk/src/ser.cc 2009-03-03 18:50:38 UTC (rev 1245) +++ ydb/trunk/src/ser.cc 2009-03-04 06:55:42 UTC (rev 1246) @@ -42,8 +42,9 @@ typedef typename types::Op Op; vector<st_netfd_t> dsts(1, dst); outstream os(dsts); - writer w(os, 90); - reader r(dst); + char *buf = new char[90]; + writer w(os, buf, 90); + reader r(dst, buf, 90); stream s(r,w); string str; const bool show = true; @@ -51,6 +52,7 @@ TxnBatch &batch = *p; for (int i = 0; i < nreps; ++i) { w.mark(); + w.show(); batch.Clear(); start_txn(batch); for (int t = 0; t < 2; ++t) { @@ -69,12 +71,13 @@ if (show) cout << w.pos() << '/' << w.size() << endl; if (types::is_pb()) push(batch, str, os); } + w.mark(); + w.show(); batch.Clear(); start_txn(batch); fin_txn(batch); - w.mark(); + w.mark_and_flush(); w.show(); - w.flush(); if (types::is_pb()) push(batch, str, os); } @@ -86,22 +89,26 @@ typedef typename types::Op Op; vector<st_netfd_t> v; outstream os(v); - writer w(os, 90); - reader r(src); + char *buf = new char[90]; + writer w(os, buf, 90); + reader r(src, 
buf, 90); stream s(r,w); string str; // XXX const bool show = true; scoped_ptr<TxnBatch> p(new_TxnBatch<TxnBatch>(s)); TxnBatch &batch = *p; while (true) { + uint32_t len; if (types::is_pb()) { - uint32_t len = r.read<uint32_t>(); + len = r.read<uint32_t>(); managed_array<char> a = r.read(len); check(batch.ParseFromArray(a.get(), len)); } else { + len = r.read<uint32_t>(); batch.Clear(); } - if (show) cout << "ntxn " << batch.txn_size() << endl; + if (show) w.show(); + if (show) cout << "len " << len << " ntxn " << batch.txn_size() << endl; if (batch.txn_size() == 0) break; for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = batch.txn(t); Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-03-03 18:50:38 UTC (rev 1245) +++ ydb/trunk/src/ser.h 2009-03-04 06:55:42 UTC (rev 1246) @@ -69,45 +69,56 @@ NONCOPYABLE(writer) private: sized_array<char> a_; + char *unsent_; + char *mark_; char *p_; - char *mark_; - char *unsent_; boost::function<void(void*, size_t)> flushcb; char *reserve(int n, char *p) { if (p + n > a_.end()) { - assert(size_t(p - mark_ + n) <= a_.size()); + // check that the reserved space will fit + assert(size_t(p - mark_ + n + sizeof(uint32_t)) <= a_.size()); + // get rid of what we have flush(); - size_t diff = mark_ - a_.get(); - memmove(a_.get(), mark_, p_ - mark_); - unsent_ = mark_ = a_.get(); + size_t diff = mark_ - (a_.get() + sizeof(uint32_t)); + memmove(a_.get() + sizeof(uint32_t), mark_, p_ - mark_); + mark_ = (unsent_ = a_.get()) + sizeof(uint32_t); p_ -= diff; p -= diff; } return p; } + char *prefix() { return mark_ - sizeof(uint32_t); } template<typename T> void write_(T x, char *p) { *reinterpret_cast<T*>(reserve(sizeof x, p)) = x; } public: writer(boost::function<void(void*, size_t)> flushcb, char *a, size_t buf_size) : - a_(a, buf_size), p_(a_.get()), mark_(p_), unsent_(p_), flushcb(flushcb) {} + a_(a, buf_size), unsent_(a_.get()), mark_(unsent_ + 
sizeof(uint32_t)), + p_(mark_), flushcb(flushcb) {} sized_array<char> &buf() { return a_; } char *cur() { return p_; } size_t pos() { return p_ - mark_; } size_t size() { return a_.size(); } - void mark() { mark_ = p_; } + void mark() { + if (p_ > mark_) { + // prefix last segment with its length + *reinterpret_cast<uint32_t*>(prefix()) = uint32_t(p_ - mark_); + // start new segment + mark_ = (p_ += sizeof(uint32_t)); + } + } void reset() { p_ = mark_; } void reserve(int n) { reserve(n, p_); } void mark_and_flush() { mark(); flush(); - unsent_ = mark_ = p_ = a_.get(); + mark_ = p_ = (unsent_ = a_.get()) + sizeof(uint32_t); } void flush() { - if (mark_ - unsent_ > 0) { - flushcb(unsent_, mark_ - unsent_); - unsent_ = mark_; + if (prefix() > unsent_) { + flushcb(unsent_, prefix() - unsent_); + unsent_ = prefix(); } } void show() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-03 18:50:44
|
Revision: 1245 http://assorted.svn.sourceforge.net/assorted/?rev=1245&view=rev Author: yangzhang Date: 2009-03-03 18:50:38 +0000 (Tue, 03 Mar 2009) Log Message: ----------- - fixed bug: was reading caught_up as int instead of char - fixed bug: was not clearing Response in ResponseBatch::add_res() - fixed bug: was serializing resbatch instead of batch in process_txns - add mark_and_flush() so that the buffer can be cleared out - replaced --wal with --twal and --pwal; reintroduced physical logging Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ser.h Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-03 04:44:03 UTC (rev 1244) +++ ydb/trunk/src/main.lzz.clamp 2009-03-03 18:50:38 UTC (rev 1245) @@ -68,7 +68,8 @@ size_t accept_joiner_size, buf_size, read_buf_size; bool yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, - debug_threads, multirecover, disk, debug_memory, use_wal, use_pb, use_pb_res, + debug_threads, multirecover, disk, debug_memory, use_pwal, use_twal, + use_pb, use_pb_res, suppress_txn_msgs, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; @@ -531,7 +532,6 @@ void logbuf(const void *buf, size_t len) { of.write(reinterpret_cast<const char*>(buf), len); } -#if 0 void del(int key) { int op = op_del; // TODO: is this really necessary? 
out & op & key; @@ -544,7 +544,6 @@ int op = op_commit; out & op; } -#endif private: enum { op_del, op_write, op_commit }; ofstream of; @@ -579,7 +578,7 @@ commons::array<char> rbuf(read_buf_size), wbuf(buf_size); reader r(nullptr, rbuf.get(), rbuf.size()); function<void(const void*, size_t)> fn; - if (use_wal) + if (use_twal) fn = bind(&wal::logbuf, g_wal, _1, _2); else fn = lambda(const void *buf, size_t len) { @@ -609,7 +608,9 @@ if (!newreps.empty() && seqno > 0) { start_txn(batch); fin_txn(batch); - w.mark(); + // TODO: verify that this made the catch-up stream more efficient, + // starting it only at the point necessary + w.mark_and_flush(); if (Types::is_pb()) { if (multirecover) bcastmsg(fds, batch); else sendmsg(fds[0], batch); @@ -684,17 +685,17 @@ bool do_bcast = !fds.empty() && !suppress_txn_msgs; if (Types::is_pb()) { // Broadcast/log/serialize. - if (force_ser || do_bcast || use_wal) { + if (force_ser || do_bcast || use_twal) { serbuf.clear(); ser(serbuf, batch); if (do_bcast) bcastbuf(fds, serbuf); - if (use_wal) g_wal->logbuf(serbuf); + if (use_twal) g_wal->logbuf(serbuf); } } else { // Reset if we have nobody to send to (incl. disk) or if we actually have // no txns (possible due to loop structure; want to avoid to avoid // confusing with the 0-txn message signifying "prepare a recovery msg"). 
- if (!do_bcast && !use_wal) { + if (!do_bcast && !use_twal) { w.reset(); } } @@ -715,8 +716,7 @@ fin_op(txn); fin_txn(batch); if (Types::is_pb()) bcastmsg(fds, batch); - w.mark(); - w.flush(); + w.mark_and_flush(); } } @@ -760,14 +760,14 @@ case Op::write: { int value = op.value(); - //if (use_wal) wal.write(key, value); + if (use_pwal) g_wal->write(key, value); if (it == map.end()) map[key] = value; else it->second = value; break; } case Op::del: if (it != map.end()) { - //if (use_wal) wal.del(key); + if (use_pwal) g_wal->del(key); map.erase(it); } break; @@ -776,7 +776,7 @@ } if (res != nullptr) fin_result(*res); - //if (use_wal) wal.commit(); + if (use_pwal) g_wal->commit(); } void @@ -880,6 +880,15 @@ // issued more since the Init message). int first_seqno = -1; + commons::array<char> rbuf(read_buf_size), wbuf(buf_size); + st_reader reader(leader, rbuf.get(), rbuf.size()); + vector<st_netfd_t> leader_v(1, leader); + writer w(lambda(const void *buf, size_t len) { + checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(len)); + }, wbuf.get(), wbuf.size()); + stream s(reader, w); + finally f(lambda () { long long now = current_time_millis(); showtput("processed", now, __ref(start_time), __ref(seqno), @@ -891,17 +900,10 @@ __ref(seqno_caught_up)); } __ref(send_states).push(shared_ptr<Recovery>()); + __ref(w).mark_and_flush(); + st_sleep(1); }); - commons::array<char> rbuf(read_buf_size), wbuf(buf_size); - st_reader reader(leader, rbuf.get(), rbuf.size()); - vector<st_netfd_t> leader_v(1, leader); - writer w(lambda(const void *buf, size_t len) { - checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(len)); - }, wbuf.get(), wbuf.size()); - stream s(reader, w); - try { scoped_ptr<TxnBatch> pbatch(new_TxnBatch<TxnBatch>(s)); TxnBatch &batch = *pbatch; @@ -954,7 +956,7 @@ if (first_seqno == -1) first_seqno = txn.seqno(); // Queue up for later processing once a snapshot has been received. 
- // XXX + // XXX speed up backlog.push(to_pb_Txn(txn)); action = "backlogged"; } @@ -970,7 +972,7 @@ fin_res(resbatch); if (RTypes::is_pb() && resbatch.res_size() > 0) { serbuf.clear(); - ser(serbuf, batch); + ser(serbuf, resbatch); sendbuf(leader, serbuf); } } else if (multirecover || mypos == 0) { @@ -1086,7 +1088,14 @@ // catching up. If it has, then broadcast a signal so that all response // handlers will know about this event. int rseqno = res.seqno(); + if (rseqno <= last_seqno) + throw msg_exception(string("response seqno decreased from ") + + lexical_cast<string>(last_seqno) + " to " + + lexical_cast<string>(rseqno)); bool rcaught_up = res.caught_up(); + for (int r = 0; r < res.result_size(); ++r) { + cout << rseqno << last_seqno << res.result_size() << " " << r << " " << res.result(r) << endl; + } if (!caught_up && rcaught_up) { long long now = current_time_millis(), timediff = now - start_time; caught_up = true; @@ -1476,18 +1485,16 @@ while (!backlog.empty()) { using pb::Txn; shared_ptr<Txn> p = backlog.take(); - if (p->seqno() > seqno) { - process_txn<pb_types, pb_types>(map, *p, seqno, nullptr); - if (check_interval(p->seqno(), catch_up_display)) { - cout << "processed txn " << p->seqno() << " off the backlog; " - << "backlog.size = " << backlog.queue().size() << endl; - } - if (check_interval(p->seqno(), yield_interval)) { - // Explicitly yield. (Note that yielding does still effectively - // happen anyway because process_txn is a yield point.) - st_sleep(0); - } + process_txn<pb_types, pb_types>(map, *p, seqno, nullptr); + if (check_interval(p->seqno(), catch_up_display)) { + cout << "processed txn " << p->seqno() << " off the backlog; " + << "backlog.size = " << backlog.queue().size() << endl; } + if (check_interval(p->seqno(), yield_interval)) { + // Explicitly yield. (Note that yielding does still effectively + // happen anyway because process_txn is a yield point.) 
+ st_sleep(0); + } } showtput("replayer caught up; from backlog replayed", current_time_millis(), mid_time, seqno, mid_seqno); @@ -1611,8 +1618,10 @@ "use protocol buffers instead of raw buffers for txns") ("use-pb-res", po::bool_switch(&use_pb_res), "use protocol buffers instead of raw buffers for responses") - ("wal", po::bool_switch(&use_wal), - "enable ARIES write-ahead logging") + ("twal", po::bool_switch(&use_twal), + "enable transactional write-ahead logging") + ("pwal", po::bool_switch(&use_pwal), + "enable physical write-ahead logging") ("force-ser", po::bool_switch(&force_ser), "force issue_txns to serialize its Txns") ("leader,l", po::bool_switch(&is_leader), Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-03-03 04:44:03 UTC (rev 1244) +++ ydb/trunk/src/ser.h 2009-03-03 18:50:38 UTC (rev 1245) @@ -91,7 +91,7 @@ } public: writer(boost::function<void(void*, size_t)> flushcb, char *a, size_t buf_size) : - a_(a, buf_size), p_(a_.get()), mark_(p_), unsent_(a_.get()), flushcb(flushcb) {} + a_(a, buf_size), p_(a_.get()), mark_(p_), unsent_(p_), flushcb(flushcb) {} sized_array<char> &buf() { return a_; } char *cur() { return p_; } size_t pos() { return p_ - mark_; } @@ -99,6 +99,11 @@ void mark() { mark_ = p_; } void reset() { p_ = mark_; } void reserve(int n) { reserve(n, p_); } + void mark_and_flush() { + mark(); + flush(); + unsent_ = mark_ = p_ = a_.get(); + } void flush() { if (mark_ - unsent_ > 0) { flushcb(unsent_, mark_ - unsent_); @@ -220,7 +225,7 @@ void set_seqno(int x) { w_.write(x); } void set_caught_up(char x) { w_.write(x); } int seqno() const { return r_.read<int>(); } - bool caught_up() const { return r_.read<int>(); } + bool caught_up() const { return r_.read<char>(); } void start_result() { if (nres_ == unset) nres_ = 0; w_.skip<typeof(nres_)>(); } void add_result(int x) { w_.write(x); } void fin_result() { w_.write(nres_, off_ + sizeof(int) + sizeof(char)); } @@ 
-244,7 +249,7 @@ ResponseBatch(stream &s) : s_(s), r_(s.get_reader()), w_(s.get_writer()), off_(w_.pos()), res_(s), nres_(unset) {} void Clear() { res_.Clear(); nres_ = unset; off_ = w_.pos(); } void start_res() { if (nres_ == unset) nres_ = 0; w_.skip<typeof(nres_)>(); } - Response *add_res() { ++nres_; return &res_; } + Response *add_res() { ++nres_; res_.Clear(); return &res_; } void fin_res() { w_.write(nres_, off_); } int res_size() const { if (nres_ == unset) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-03 04:44:09
|
Revision: 1244 http://assorted.svn.sourceforge.net/assorted/?rev=1244&view=rev Author: yangzhang Date: 2009-03-03 04:44:03 +0000 (Tue, 03 Mar 2009) Log Message: ----------- added quick demo of funky while-defined-my syntax Added Paths: ----------- sandbox/trunk/src/pl/while-defined.pl Added: sandbox/trunk/src/pl/while-defined.pl =================================================================== --- sandbox/trunk/src/pl/while-defined.pl (rev 0) +++ sandbox/trunk/src/pl/while-defined.pl 2009-03-03 04:44:03 UTC (rev 1244) @@ -0,0 +1,7 @@ +#!/usr/bin/env perl +use strict; +while (defined(my $infile = glob("*.pl"))) { + open INPUT, "< $infile" or die "can't open $infile!"; + print "$infile\n"; + close INPUT; +} Property changes on: sandbox/trunk/src/pl/while-defined.pl ___________________________________________________________________ Added: svn:executable + * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-03 04:33:04
|
Revision: 1243 http://assorted.svn.sourceforge.net/assorted/?rev=1243&view=rev Author: yangzhang Date: 2009-03-03 04:33:00 +0000 (Tue, 03 Mar 2009) Log Message: ----------- added simple demo of arguments Added Paths: ----------- sandbox/trunk/src/pl/funargs.pl Added: sandbox/trunk/src/pl/funargs.pl =================================================================== --- sandbox/trunk/src/pl/funargs.pl (rev 0) +++ sandbox/trunk/src/pl/funargs.pl 2009-03-03 04:33:00 UTC (rev 1243) @@ -0,0 +1,13 @@ +#!/usr/bin/env perl + +# @ARGV: for command-line args +# @_: for function args + +sub f { + ($a,$b,$c) = @ARGV; + print "$a $b $c\n"; + print "\@ARGV: @ARGV\n"; + print "\@_: @_\n"; +} + +f(1,2,3); Property changes on: sandbox/trunk/src/pl/funargs.pl ___________________________________________________________________ Added: svn:executable + * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-02 23:52:44
|
Revision: 1242 http://assorted.svn.sourceforge.net/assorted/?rev=1242&view=rev Author: yangzhang Date: 2009-03-02 23:52:28 +0000 (Mon, 02 Mar 2009) Log Message: ----------- updated and published contact info, projects, links, software recs Modified Paths: -------------- personal-site/trunk/src/index.txt personal-site/trunk/src/recommendations.txt personal-site/trunk/static/plain.css Modified: personal-site/trunk/src/index.txt =================================================================== --- personal-site/trunk/src/index.txt 2009-03-02 23:32:25 UTC (rev 1241) +++ personal-site/trunk/src/index.txt 2009-03-02 23:52:28 UTC (rev 1242) @@ -38,11 +38,11 @@ //item( 'AIM', 'aim:goim?screenname=', 'sorta lame' ); //item( 'MSN', null, 'hotmail.com', 'noneofthegoodnameswereleft' ); //item( 'Yahoo', 'ymsgr:sendIM?', 'overbored' ); - //item( 'Jabber (Google)', 'xmpp:', 'gmail.com', 'yaaang' ); + item( 'Jabber (Google)', 'xmpp:', 'gmail.com', 'yaaang' ); //item( 'Skype', 'callto://', 'yaaang' ); //item( 'FreeNode', 'irc://irc.freenode.org/', 'zeeee' ); - //item( '"Office"', 'MIT/CSAIL <a href="http://whereis.mit.edu/map-jpg?selection=32&Buildings=go">32</a>-<a href="http://www.csail.mit.edu/resources/maps/9G/G908.gif">G908</a>, <a href="http://www.eecs.mit.edu/stata-link.html">Stata Center</a>, <a href="http://maps.google.com/maps?f=q&hl=en&geocode=&q=32+Vassar+St,+Cambridge,+MA+02139&sll=42.357926,-71.093616&sspn=0.00842,0.01575&ie=UTF8&ll=42.362682,-71.093495&spn=0.008419,0.01575&z=16&iwloc=addr">32 Vassar St., Cambridge, MA 02139</a>' ); - //item( 'GPG public key', '<a href="yang.gpg.asc">yang.gpg.asc</a> (<a href="http://pgp.mit.edu:11371/pks/lookup?op=get&search=0xB1E65B60">MIT PKS entry</a>)' ); + item( '"Office"', 'MIT/CSAIL <a href="http://whereis.mit.edu/map-jpg?selection=32&Buildings=go">32</a>-<a href="http://www.csail.mit.edu/resources/maps/9G/G908.gif">G908</a>, <a href="http://www.eecs.mit.edu/stata-link.html">Stata Center</a>, <a 
href="http://maps.google.com/maps?f=q&hl=en&geocode=&q=32+Vassar+St,+Cambridge,+MA+02139&sll=42.357926,-71.093616&sspn=0.00842,0.01575&ie=UTF8&ll=42.362682,-71.093495&spn=0.008419,0.01575&z=16&iwloc=addr">32 Vassar St., Cambridge, MA 02139</a>' ); + item( 'GPG public key', '<a href="yang.gpg.asc">yang.gpg.asc</a> (<a href="http://pgp.mit.edu:11371/pks/lookup?op=get&search=0xB1E65B60">MIT PKS entry</a>)' ); document.write( '</ul>' ); // ]]> </script> @@ -53,7 +53,7 @@ Projects -------- -Here are the non-confidential projects in which I've been involved. More +Here are the (non-confidential) projects in which I've been involved. More details when I have time! - [H-Store]: a distributed main-memory relational database management system @@ -208,14 +208,19 @@ - 6.431 Probabilistic Systems Analysis (spring 2006) - 6.875 Cryptography and Cryptanalysis (spring 2006) - 6.827 Implicit Parallel Programming (fall 2006) -- 6.033 Computer Systems Engineering (TA, spring 2007) - 6.867 Machine Learning (fall 2007) +Teaching + +- 6.033 Computer Systems Engineering (TA, spring 2007) +- 6.00 Introduction to Computer Science (TA, fall 2008) + <a name="stuff"></a> Stuff ----- +- [My blog](http://y_z.scripts.mit.edu/wp/) - [Links](links.html) - [Software recommendations](recommendations.html) <!-- - [Notes](notes/) --> Modified: personal-site/trunk/src/recommendations.txt =================================================================== --- personal-site/trunk/src/recommendations.txt 2009-03-02 23:32:25 UTC (rev 1241) +++ personal-site/trunk/src/recommendations.txt 2009-03-02 23:52:28 UTC (rev 1242) @@ -1,9 +1,8 @@ % Software Recommendations % Yang Zhang -<!-- -Software recommendations ---> +Underdogs +--------- Some underdogs that more people should know about. 
@@ -52,16 +51,8 @@ [Gobby]: http://gobby.0x539.de/trac/ [screen]: http://www.gnu.org/software/screen/ -- __[Opera]__: The fastest, smallest browser I've used is also the most usable - and comes with the most features out of the box. Its M2 mail client is - awesome too, but alas, it's too buggy. [KHTML] is probably the - closest-performing engine, but I've found it to be too crash-prone. - -[Opera]: http://www.opera.com/ -[KHTML]: http://en.wikipedia.org/wiki/KHTML - - __[gprof2dot]__: A handy tool for visualizing the callgraph results of gprof, - the Google CPU profiler, python cProfile, and more. + the Google CPU profiler, Python cProfile, and more. [gprof2dot]: http://code.google.com/p/jrfonseca/wiki/Gprof2Dot @@ -71,13 +62,41 @@ [wtf]: http://nmstl.sourceforge.net/doc/nmstl-guide.html [gstlfilt]: http://www.bdsoft.com/tools/stlfilt.html -<!-- +- __[clamp]__: A C++ preprocessor that provides lambda. Boost ScopeExit + provides something similar but less general. + +[clamp]: http://home.clara.net/raoulgough/clamp/ + +- __[Lazy C++]__: A C++ preprocessor that allows you to (mostly) ignore + separation of headers and source files. + +[Lazy C++]: http://www.lazycplusplus.com/ + +- __[Scala]__: An awesome JVM language. Give this a look if you're working on + the JVM and have some freedom to choose your tools. + +[Scala]: http://www.scala-lang.org/ + Preferred Applications +---------------------- -- web browser: [Opera] -- mail client: [Thunderbird] ---> +- Web browser: [Opera]. The fastest, smallest browser I've used is also the + most usable and comes with the most features out of the box. [Chrome] is + making great strides on the Windows-only front, and [Firefox] has a ton of + useful extensions. +[Opera]: http://www.opera.com/ +[Chrome]: http://www.google.com/chrome +[Firefox]: http://www.firefox.com/ + +- Mail: [Thunderbird]. Fast, usable, and generally "good enough" (sorry, + nothing astounding). 
I've tried various others, from Outlook to Horde to + Pine to Sup, and I think this one annoys me the least. I still prefer the + [Gmail] experience, but my main mailbox is on an IMAP server. + +[Thunderbird]: http://www.mozilla.com/en-US/thunderbird/ +[Gmail]: http://www.gmail.com/ + <!-- vim:ft=mkd:et:sw=2:ts=2:nocin --> Modified: personal-site/trunk/static/plain.css =================================================================== --- personal-site/trunk/static/plain.css 2009-03-02 23:32:25 UTC (rev 1241) +++ personal-site/trunk/static/plain.css 2009-03-02 23:52:28 UTC (rev 1242) @@ -42,8 +42,6 @@ } h1, h2, h3, h4, h5, h6, h1 a, h2 a { - color: gray; /* #527bbd; */ - color: gray; font-size: 18pt; font-weight: normal; font-family: arial; @@ -56,10 +54,11 @@ h1 { border-bottom: 2px solid silver; - letter-spacing: -3px; + color: gray; + letter-spacing: -2px; } h2 { - border-bottom: 2px solid silver; + border-bottom: 2px solid; padding-top: 0.5em; letter-spacing: -2px; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-02 23:32:31
|
Revision: 1241 http://assorted.svn.sourceforge.net/assorted/?rev=1241&view=rev Author: yangzhang Date: 2009-03-02 23:32:25 +0000 (Mon, 02 Mar 2009) Log Message: ----------- published new site Modified Paths: -------------- assorted-site/trunk/index.txt Modified: assorted-site/trunk/index.txt =================================================================== --- assorted-site/trunk/index.txt 2009-03-01 19:45:28 UTC (rev 1240) +++ assorted-site/trunk/index.txt 2009-03-02 23:32:25 UTC (rev 1241) @@ -30,9 +30,9 @@ - JFX Table: an editable table (spreadsheet) widget in [JavaFX] (done) - LZXGrid: an editable table (spreadsheet) widget in [OpenLaszlo] (done) - System utilities - - [GDB Visual Stack Debugger](gdb-stack-viz): a [Python-GDB] extension that + - GDB Visual Stack Debugger: a [Python-GDB] extension that helps visualize the stack; originally designed to aid instruction on stack - smashing (active) + smashing (hiatus) - [UDP Prober](udp-prober): small program that logs the RTTs of periodic UDP pings, and an exercise in using [`boost::asio`] (hiatus) - Throttled Repeater: small program that sends a fixed number of lines at a @@ -43,6 +43,7 @@ - [Cygwin Tools](cygwin-tools): tools for making Cygwin more pleasant to use (done) - Meta programming + - [clamp](clamp): the C++ lambda preprocessor; don't wait for C++0x! 
(done) - [Simple-Build](simple-build): YAML-based meta-build system for generating Makefiles; you do not want to use this (passive) - [Object Code Generator](object-codegen): currently targets Java @@ -54,7 +55,7 @@ - Tools for various websites/web applications - [Facebook Tools](facebook-tools): scrape a photo album, utilities for pyfacebook, and monitor adds/removes in your friend list (done) - - [Google Tools](google-tools): Google Reader archiver (active) + - [Google Tools](google-tools): Google Reader archiver (hiatus) - Myspace: crawl [MySpace] profiles within $n$ degrees of you for fast searches (done) - O'Reilly Safari: cache text from the [O'Reilly Safari] online bookshelf for @@ -78,13 +79,13 @@ - Wallpaper Tools: tools for managing wallpapers as they are being rotated through (done) - Exploration, experimentation, research - - [YDB](ydb): simple memory store that serves as a research testbed for approaches - to recovery in [VOLTDB] (H-Store) (active) - - [C++ Serialization Benchmark](serialization-bench): simple comparison of the - speed and verbosity of various readily-available approaches/libraries for - serializing lots of small ints (active) - - [C++ Containers Benchmark](container-bench): simple analysis of performance - behavior of various readily-available associative containers (active) + - [YDB](ydb): simple memory store that serves as a research testbed for + approaches to recovery in [H-Store] (active) + - C++ Serialization Benchmark: simple comparison of the speed and verbosity + of various readily-available approaches/libraries for serializing lots of + small ints (active) + - C++ Containers Benchmark: simple analysis of performance behavior of + various readily-available associative containers (active) - TCQ Wavelets: wavelet domain stream query processing for the Data Triage project in TelegraphCQ (done) - [Hash distribution](hash-dist): for observing the distribution of hash @@ -98,7 +99,7 @@ details, bugs, corner cases, features, etc.) 
(passive) - Miscellaneous - [Music Labeler](music-labeler): a slick GUI for quickly - labeling/categorizing music + labeling/categorizing music (passive) - [Mailing List Filter](mailing-list-filter): deal with high-volume mailing lists by filtering your mailbox for threads in which you were a participant (done) @@ -147,7 +148,7 @@ [js_beautify]: http://elfz.laacz.lv/beautify/ [Rotten Tomatoes]: http://www.rottentomatoes.com/ [Picard Tagger]: http://wiki.musicbrainz.org/PicardTagger -[VOLTDB]: http://db.cs.yale.edu/hstore/ +[H-Store]: http://db.cs.yale.edu/hstore/ What the statuses mean: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-01 19:45:42
|
Revision: 1240 http://assorted.svn.sourceforge.net/assorted/?rev=1240&view=rev Author: yangzhang Date: 2009-03-01 19:45:28 +0000 (Sun, 01 Mar 2009) Log Message: ----------- added demo of clamp and threads Added Paths: ----------- sandbox/trunk/src/cc/clamp2.cc.clamp sandbox/trunk/src/cc/clamp2.mk Added: sandbox/trunk/src/cc/clamp2.cc.clamp =================================================================== --- sandbox/trunk/src/cc/clamp2.cc.clamp (rev 0) +++ sandbox/trunk/src/cc/clamp2.cc.clamp 2009-03-01 19:45:28 UTC (rev 1240) @@ -0,0 +1,24 @@ +#include <boost/thread.hpp> +#include <iostream> +using namespace boost; +using namespace std; +#include "lambda_impl.clamp_h" + +int main() { + const string msgs[3] = {"first", "second", "third"}; + mutex m; + for (int i = 0; i < 3; ++i) { + boost::thread(lambda() { + mutex::scoped_lock l(__ref(m)); + cout << "message from thread " << __ctx(i) + << ": " << __ref(msgs[i]) << endl; + }); + } + return 0; +} + +#if 0 +clamp < clamp2.cc.clamp | sed '1d' > clamp2.cc +g++ clamp2.cc -lboost_thread-gcc43-mt -o clamp2 +./clamp2 +#endif Added: sandbox/trunk/src/cc/clamp2.mk =================================================================== --- sandbox/trunk/src/cc/clamp2.mk (rev 0) +++ sandbox/trunk/src/cc/clamp2.mk 2009-03-01 19:45:28 UTC (rev 1240) @@ -0,0 +1,4 @@ +LDLIBS = -lboost_thread-gcc43-mt +all: clamp2 +clamp2.cc: clamp2.cc.clamp + clamp < $< | sed '1d' > $@ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-26 18:32:56
|
Revision: 1239 http://assorted.svn.sourceforge.net/assorted/?rev=1239&view=rev Author: yangzhang Date: 2009-02-26 18:32:43 +0000 (Thu, 26 Feb 2009) Log Message: ----------- - fixed (single-node) recovery - removed verbose and added separate --*-display flags - refactored interval-checking - updated notes/todos Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-26 18:28:10 UTC (rev 1238) +++ ydb/trunk/README 2009-02-26 18:32:43 UTC (rev 1239) @@ -454,7 +454,6 @@ - tiny improvement - 1: 366K - 2: 360K -- TODO fix pb recovery - DONE figure out why there's such a dramatic slowdown as the DB grows - ydb @@ -470,6 +469,16 @@ - 5e7: 495K - there was an int overflow bug +- DONE fix pb recovery + - abysmal perf; long wait at the map dump + almost never catch up, but at + least it works + +- TODO speed up backlogging; don't create pb objects, just take buffers + +- TODO fix multi-recovery if necessary + +- TODO speed up map dump; don't use range partitioning, but hash partitioning + - TODO refactor st_reader, etc. to be generic opportunistic buffered readers - TODO see how streambuf read/write is actually implemented (whether it's too slow) Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-26 18:28:10 UTC (rev 1238) +++ ydb/trunk/src/main.lzz.clamp 2009-02-26 18:32:43 UTC (rev 1239) @@ -61,10 +61,12 @@ // Configuration. 
st_utime_t timeout; -int chkpt, accept_joiner_seqno, issuing_interval, min_ops, max_ops, - stop_on_seqno, batch_size, display_interval; +int yield_interval, accept_joiner_seqno, issuing_interval, min_ops, max_ops, + stop_on_seqno, batch_size, handle_responses_display, + catch_up_display, issue_display, + process_display; size_t accept_joiner_size, buf_size, read_buf_size; -bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, +bool yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_wal, use_pb, use_pb_res, suppress_txn_msgs, fake_bcast, force_ser, fake_exec; @@ -81,7 +83,10 @@ * Convenience function for calculating percentages. */ template<typename T> -double pct(T sub, T tot) { return 100 * double(sub) / double(tot); } +inline double pct(T sub, T tot) +{ + return 100 * double(sub) / double(tot); +} /** * Convenience class for performing long-jumping break. @@ -119,7 +124,7 @@ /** * Look up thread name, or just show thread ID. */ -string +inline string threadname(st_thread_t t = st_thread_self()) { if (threadnames.find(t) != threadnames.end()) { return threadnames[t]; @@ -131,7 +136,7 @@ /** * Debug function for thread names. Remember what we're switching from. */ -void +inline void switch_out_cb() { if (debug_threads) last_thread = st_thread_self(); @@ -142,7 +147,7 @@ /** * Debug function for thread names. Show what we're switching from/to. */ -void switch_in_cb() +inline void switch_in_cb() { if (debug_threads && last_thread != st_thread_self()) { cout << "switching"; @@ -303,7 +308,7 @@ } template<typename T> -void +inline void ser(ser_array &s, const T &msg) { int len = msg.ByteSize(); @@ -320,7 +325,7 @@ * Serialization. */ template<typename T> -void +inline void ser(ostream &s, const T &msg) { uint32_t len = htonl(uint32_t(msg.ByteSize())); @@ -492,7 +497,7 @@ * for avoiding unnecessary copies. 
*/ template <typename T> -T +inline T readmsg(st_netfd_t src, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) { T msg; @@ -505,7 +510,7 @@ * st_netfd_t. */ template <typename T> -void +inline void readmsg(st_reader &src, T & msg) { managed_array<char> a = src.read(sizeof(uint32_t)); @@ -602,11 +607,14 @@ // one) to prepare to send recovery information (by sending an // empty/default Txn). if (!newreps.empty() && seqno > 0) { - if (multirecover) { - bcastmsg(fds, batch); - } else { - sendmsg(fds[0], batch); + start_txn(batch); + fin_txn(batch); + w.mark(); + if (Types::is_pb()) { + if (multirecover) bcastmsg(fds, batch); + else sendmsg(fds[0], batch); } + batch.Clear(); } // Bring in any new members. // TODO more efficient: copy/extend/append @@ -642,15 +650,14 @@ } // Checkpoint. - if (seqno % chkpt == 0) { - if (verbose) - cout << "issued txn " << seqno << endl; + if (check_interval(seqno, yield_interval)) st_sleep(0); + if (check_interval(seqno, issue_display)) { + cout << "issued txn " << seqno << endl; if (timelim > 0 && current_time_millis() - start_time > timelim) { cout << "time's up; issued " << seqno << " txns in " << timelim << " ms" << endl; stop_hub.set(); } - st_sleep(0); } // For debugging purposes. 
@@ -809,14 +816,20 @@ } #end -template<typename Txn> shared_ptr<pb::Txn> to_pb_Txn(Txn txn); -template<> shared_ptr<pb::Txn> to_pb_Txn(pb::Txn txn) { +template<typename Txn> inline shared_ptr<pb::Txn> to_pb_Txn(Txn txn); +template<> inline shared_ptr<pb::Txn> to_pb_Txn(pb::Txn txn) { return shared_ptr<pb::Txn>(new pb::Txn(txn)); } -template<> shared_ptr<pb::Txn> to_pb_Txn(msg::Txn txn) { +template<> inline shared_ptr<pb::Txn> to_pb_Txn(msg::Txn txn) { shared_ptr<pb::Txn> ptxn(new pb::Txn()); ptxn->set_seqno(txn.seqno()); - // XXX FIXME + for (int o = 0; o < txn.op_size(); ++o) { + pb::Op *pop = ptxn->add_op(); + const msg::Op &op = txn.op(o); + pop->set_type(static_cast<Op_OpType>(op.type())); + pop->set_key(op.key()); + pop->set_value(op.value()); + } return ptxn; } @@ -946,14 +959,12 @@ action = "backlogged"; } - if (txn.seqno() % chkpt == 0) { - if (verbose) { - cout << action << " txn " << txn.seqno() - << "; db size = " << map.size() - << "; seqno = " << seqno - << "; backlog.size = " << backlog.queue().size() << endl; - } - st_sleep(0); + if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); + if (check_interval(txn.seqno(), process_display)) { + cout << action << " txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; } } fin_res(resbatch); @@ -962,9 +973,10 @@ ser(serbuf, batch); sendbuf(leader, serbuf); } - } else { + } else if (multirecover || mypos == 0) { // Empty (default) TxnBatch means "generate a snapshot." // TODO make this faster + cout << "generating recovery..." 
<< endl; shared_ptr<Recovery> recovery(new Recovery); typedef ::map<int, int> mii_; mii_ map_(map.begin(), map.end()); @@ -1090,13 +1102,15 @@ stop_hub.set(); } } - if (display_interval > 0 && rseqno % display_interval == 0 && rseqno > 0) { + if (check_interval(rseqno, handle_responses_display)) { cout << rid << ": " << "got response " << rseqno << " from " << replica << "; "; long long display_time = current_time_millis(); showtput("handling", display_time, last_display_time, rseqno, - rseqno - display_interval); + rseqno - handle_responses_display); last_display_time = display_time; + } + if (check_interval(rseqno, yield_interval)) { st_sleep(0); } last_seqno = rseqno; @@ -1439,7 +1453,7 @@ for (int i = 0; i < recovery.pair_size(); ++i) { const Recovery_Pair &p = recovery.pair(i); __ref(map)[p.key()] = p.value(); - if (i % chkpt == 0) { + if (i % yield_interval == 0) { if (yield_during_build_up) st_sleep(0); } } @@ -1462,14 +1476,17 @@ while (!backlog.empty()) { using pb::Txn; shared_ptr<Txn> p = backlog.take(); - process_txn<pb_types, pb_types>(map, *p, seqno, nullptr); - if (p->seqno() % chkpt == 0) { - if (verbose) + if (p->seqno() > seqno) { + process_txn<pb_types, pb_types>(map, *p, seqno, nullptr); + if (check_interval(p->seqno(), catch_up_display)) { cout << "processed txn " << p->seqno() << " off the backlog; " << "backlog.size = " << backlog.queue().size() << endl; - // Explicitly yield. (Note that yielding does still effectively - // happen anyway because process_txn is a yield point.) - st_sleep(0); + } + if (check_interval(p->seqno(), yield_interval)) { + // Explicitly yield. (Note that yielding does still effectively + // happen anyway because process_txn is a yield point.) 
+ st_sleep(0); + } } } showtput("replayer caught up; from backlog replayed", @@ -1483,6 +1500,12 @@ stop_hub.insert(st_thread_self()); } +inline bool +check_interval(int seqno, int interval) +{ + return interval > 0 && seqno % interval == interval - 1; +} + int sig_pipe[2]; /** @@ -1559,16 +1582,14 @@ "enable context switch debug outputs") ("profile-threads,q",po::bool_switch(&profile_threads), "enable profiling of threads") - ("verbose,v", po::bool_switch(&verbose), - "enable periodic printing of txn processing progress") ("epoll,e", po::bool_switch(&use_epoll), "use epoll (select is used by default)") ("yield-build-up", po::bool_switch(&yield_during_build_up), - "yield periodically during build-up phase of recovery (for recoverer only)") + "yield periodically during build-up phase of recovery (for recoverer)") ("yield-catch-up", po::bool_switch(&yield_during_catch_up), - "yield periodically during catch-up phase of recovery (for recoverer only)") + "yield periodically during catch-up phase of recovery (for recoverer)") ("multirecover,m", po::bool_switch(&multirecover), - "recover from multiple hosts, instead of just one (specified via leader only)") + "recover from multiple hosts, instead of just one (specified via leader)") ("disk,k", po::bool_switch(&disk), "use disk-based recovery") ("dump,D", po::bool_switch(&dump), @@ -1585,7 +1606,7 @@ ("count-updates,u",po::bool_switch(&count_updates), "count operations that touch (update/read/delete) an existing key") ("general-txns,g", po::bool_switch(&general_txns), - "issue read and delete transactions as well as the default of (only) insertion/update transactions (for leader only)") + "issue read and delete transactions as well as the default of (only) insertion/update transactions (for leader)") ("use-pb", po::bool_switch(&use_pb), "use protocol buffers instead of raw buffers for txns") ("use-pb-res", po::bool_switch(&use_pb_res), @@ -1597,26 +1618,36 @@ ("leader,l", po::bool_switch(&is_leader), "run the leader (run 
replica by default)") ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), - "exit after the joiner fully recovers (for leader only)") + "exit after the joiner fully recovers (for leader)") ("batch-size,b", po::value<int>(&batch_size)->default_value(100), - "number of txns to batch up in each msg (for leader only)") + "number of txns to batch up in each msg (for leader)") ("exit-on-seqno,X",po::value<int>(&stop_on_seqno)->default_value(-1), - "exit after txn seqno is issued (for leader only)") + "exit after txn seqno is issued (for leader)") ("accept-joiner-size,s", po::value<size_t>(&accept_joiner_size)->default_value(0), "accept recovering joiner (start recovery) after DB grows to this size " - "(for leader only)") - ("disp-interval", po::value<int>(&display_interval)->default_value(0), - "after this many txns, print current handling rate") - ("issuing-interval,i", + "(for leader)") + ("handle-responses-display", + po::value<int>(&handle_responses_display)->default_value(0), + "number of responses before printing current handling rate (for leader)") + ("catch-up-display", + po::value<int>(&catch_up_display)->default_value(0), + "number of catch-up txns before printing current recovery rate and queue length (for recoverer)") + ("issue-display", + po::value<int>(&issue_display)->default_value(0), + "number of txns before showing the current issue rate (for leader)") + ("process-display", + po::value<int>(&process_display)->default_value(0), + "number of txns before showing the current issue rate (for worker)") + ("issuing-interval", po::value<int>(&issuing_interval)->default_value(0), - "seconds to sleep between issuing txns (for leader only)") + "seconds to sleep between issuing txns (for leader)") ("min-ops,o", po::value<int>(&min_ops)->default_value(5), - "lower bound on randomly generated number of operations per txn (for leader only)") + "lower bound on randomly generated number of operations per txn (for leader)") ("max-ops,O", 
po::value<int>(&max_ops)->default_value(5), - "upper bound on randomly generated number of operations per txn (for leader only)") + "upper bound on randomly generated number of operations per txn (for leader)") ("accept-joiner-seqno,j", po::value<int>(&accept_joiner_seqno)->default_value(0), "accept recovering joiner (start recovery) after this seqno (for leader " @@ -1631,18 +1662,18 @@ "size of the incoming (read) buffer in bytes") ("write-buf", po::value<size_t>(&buf_size)->default_value(1e5), "size of the outgoing (write) buffer in bytes") - ("chkpt,c", po::value<int>(&chkpt)->default_value(1000), - "number of txns before yielding/verbose printing") + ("yield_interval,y", po::value<int>(&yield_interval)->default_value(10000), + "number of txns before yielding") ("timelim,T", po::value<long long>(&timelim)->default_value(0), "general network IO time limit in milliseconds, or 0 for none") ("write-thresh,w", po::value<long long>(&write_thresh)->default_value(200), - "if positive and any txn write exceeds this, then print a message (for replicas only)") + "if positive and any txn write exceeds this, then print a message") ("read-thresh,r", po::value<long long>(&read_thresh)->default_value(0), - "if positive and any txn read exceeds this, then print a message (for replicas only)") + "if positive and any txn read exceeds this, then print a message") ("listen-port,p", po::value<uint16_t>(&listen_port)->default_value(7654), - "port to listen on (replicas only)") + "port to listen on (for worker)") ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(200000), - "timeout for IO operations (in microseconds)") + "timeout for some IO operations that should actually time out (in microseconds)") ("test", "execute unit tests instead of running the normal system") ("minreps,n", po::value<int>(&minreps)->default_value(2), "minimum number of replicas the system is willing to process txns on"); This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-26 18:28:17
|
Revision: 1238 http://assorted.svn.sourceforge.net/assorted/?rev=1238&view=rev Author: yangzhang Date: 2009-02-26 18:28:10 +0000 (Thu, 26 Feb 2009) Log Message: ----------- adjusted default numbers Modified Paths: -------------- ydb/trunk/tools/test.bash Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-02-26 06:41:25 UTC (rev 1237) +++ ydb/trunk/tools/test.bash 2009-02-26 18:28:10 UTC (rev 1238) @@ -287,7 +287,7 @@ rec-helper() { local leader=$1 shift - : ${seqno:=100000} + : ${seqno:=1000000} tagssh $leader "ydb/src/ydb -l -x --accept-joiner-seqno $seqno -n $(( $# - 1 )) -o 1 -O 1 ${extraargs:-}" & sleep .1 # Run initial replicas. @@ -387,7 +387,7 @@ scaling-helper() { local leader=$1 shift - tagssh $leader "CPUPROFILE=ydb.prof ydb/src/ydb -q -l -n $# -X 10000000 ${extraargs:-}" & + tagssh $leader "CPUPROFILE=ydb.prof ydb/src/ydb -q -l -n $# -X 1000000 ${extraargs:-}" & sleep .1 for rep in "$@" do tagssh $rep "CPUPROFILE=ydb.prof ydb/src/ydb -q -n $# -H $leader ${extraargs:-}" & This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-26 06:41:33
|
Revision: 1237 http://assorted.svn.sourceforge.net/assorted/?rev=1237&view=rev Author: yangzhang Date: 2009-02-26 06:41:25 +0000 (Thu, 26 Feb 2009) Log Message: ----------- - responses now reuse serialization buffer as well - added --disp-interval for response_handler - fixed int overflow in showtput - increased default -X param - added -pipe -march=native Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/tools/test.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-26 03:17:16 UTC (rev 1236) +++ ydb/trunk/README 2009-02-26 06:41:25 UTC (rev 1237) @@ -450,9 +450,26 @@ - almost no diff - 1: 362K - 2: 355K -- TODO make same changes for sending responses +- DONE make same changes for sending responses + - tiny improvement + - 1: 366K + - 2: 360K - TODO fix pb recovery +- DONE figure out why there's such a dramatic slowdown as the DB grows + - ydb + - 1e5: 530K + - 1e6: 428K + - 2e6: 417K + - 3e6: before bug fix: -200K! this is because 3e6*1000 > INT_MAX + - 1e7: after bug fix: 412K + - p2 + - 1e5: 700K + - 1e6: 655K + - 1e7: 501K + - 5e7: 495K + - there was an int overflow bug + - TODO refactor st_reader, etc. 
to be generic opportunistic buffered readers - TODO see how streambuf read/write is actually implemented (whether it's too slow) @@ -462,7 +479,7 @@ - TODO async (threaded) wal - TODO 0-node 0-copy (don't need to use threads, just process each batch immed) -- TODO reuse the serialization buffer in the pb path of ydb +- TODO batch up the responses until they make large-enough buffer in pb mode - TODO show aries-write - TODO checkpointing + replaying log from replicas (not from disk) Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-26 03:17:16 UTC (rev 1236) +++ ydb/trunk/src/Makefile 2009-02-26 06:41:25 UTC (rev 1237) @@ -34,7 +34,7 @@ OPT := -g3 endif # CXX := $(WTF) ag++ -k --Xcompiler # $(CXX) -CXX := $(WTF) $(CXX) +CXX := $(WTF) $(CXX) -pipe LDFLAGS := -pthread $(GPROF) LDLIBS := -lstx -lst -lresolv -lprotobuf -lgtest \ -lboost_program_options-gcc43-mt -lboost_thread-gcc43-mt \ @@ -71,6 +71,7 @@ -Wlong-long \ -Wvolatile-register-var \ -std=gnu++0x \ + -march=native \ $(CXXFLAGS) # \ Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-26 03:17:16 UTC (rev 1236) +++ ydb/trunk/src/main.lzz.clamp 2009-02-26 06:41:25 UTC (rev 1237) @@ -62,7 +62,7 @@ // Configuration. st_utime_t timeout; int chkpt, accept_joiner_seqno, issuing_interval, min_ops, max_ops, - stop_on_seqno, batch_size; + stop_on_seqno, batch_size, display_interval; size_t accept_joiner_size, buf_size, read_buf_size; bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, @@ -778,7 +778,7 @@ { long long time_diff = stop_time - start_time; int count_diff = stop_count - start_count; - double rate = count_diff * 1000 / double(time_diff); + double rate = double(count_diff) * 1000. 
/ double(time_diff); cout << action << " " << count_diff << " txns [" << start_count << ".." << stop_count << "] in " << time_diff << " ms [" @@ -894,6 +894,7 @@ TxnBatch &batch = *pbatch; scoped_ptr<ResponseBatch> presbatch(new_ResponseBatch<ResponseBatch>(s)); ResponseBatch &resbatch = *presbatch; + ser_t serbuf; while (true) { long long before_read = -1; if (read_thresh > 0) { @@ -956,8 +957,11 @@ } } fin_res(resbatch); - if (resbatch.res_size() > 0 && RTypes::is_pb()) - sendmsg(leader, resbatch); + if (RTypes::is_pb() && resbatch.res_size() > 0) { + serbuf.clear(); + ser(serbuf, batch); + sendbuf(leader, serbuf); + } } else { // Empty (default) TxnBatch means "generate a snapshot." // TODO make this faster @@ -1028,6 +1032,8 @@ scoped_ptr<ResponseBatch> pbatch(new_ResponseBatch<ResponseBatch>(s)); ResponseBatch &batch = *pbatch; + long long last_display_time = current_time_millis(); + function<void()> loop_cleanup = bind(&response_handler::loop_cleanup, this); @@ -1084,11 +1090,13 @@ stop_hub.set(); } } - if (rseqno % chkpt == 0) { - if (verbose) { - cout << rid << ": "; - cout << "got response " << rseqno << " from " << replica << endl; - } + if (display_interval > 0 && rseqno % display_interval == 0 && rseqno > 0) { + cout << rid << ": " << "got response " << rseqno << " from " + << replica << "; "; + long long display_time = current_time_millis(); + showtput("handling", display_time, last_display_time, rseqno, + rseqno - display_interval); + last_display_time = display_time; st_sleep(0); } last_seqno = rseqno; @@ -1598,6 +1606,8 @@ po::value<size_t>(&accept_joiner_size)->default_value(0), "accept recovering joiner (start recovery) after DB grows to this size " "(for leader only)") + ("disp-interval", po::value<int>(&display_interval)->default_value(0), + "after this many txns, print current handling rate") ("issuing-interval,i", po::value<int>(&issuing_interval)->default_value(0), "seconds to sleep between issuing txns (for leader only)") Modified: 
ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-02-26 03:17:16 UTC (rev 1236) +++ ydb/trunk/tools/test.bash 2009-02-26 06:41:25 UTC (rev 1237) @@ -387,7 +387,7 @@ scaling-helper() { local leader=$1 shift - tagssh $leader "CPUPROFILE=ydb.prof ydb/src/ydb -q -l -n $# -X 100000 ${extraargs:-}" & + tagssh $leader "CPUPROFILE=ydb.prof ydb/src/ydb -q -l -n $# -X 10000000 ${extraargs:-}" & sleep .1 for rep in "$@" do tagssh $rep "CPUPROFILE=ydb.prof ydb/src/ydb -q -n $# -H $leader ${extraargs:-}" & This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-26 03:17:23
|
Revision: 1236 http://assorted.svn.sourceforge.net/assorted/?rev=1236&view=rev Author: yangzhang Date: 2009-02-26 03:17:16 +0000 (Thu, 26 Feb 2009) Log Message: ----------- - use ser_array adapter for arrays to stand in as the serialization type (instead of strings) - fixed the types on the rb pb-expander methods - added ser for ser_arrays - removed pb_size; unreliable - added some more notes Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ser.h Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-26 03:15:58 UTC (rev 1235) +++ ydb/trunk/README 2009-02-26 03:17:16 UTC (rev 1236) @@ -446,7 +446,11 @@ - almost no diff - 1: 362K - 2: 350K -- TODO use arrays instead of strings for pb and avoid dyn alloc +- DONE use arrays instead of strings for pb and avoid dyn alloc + - almost no diff + - 1: 362K + - 2: 355K +- TODO make same changes for sending responses - TODO fix pb recovery - TODO refactor st_reader, etc. to be generic opportunistic buffered readers Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-26 03:15:58 UTC (rev 1235) +++ ydb/trunk/src/main.lzz.clamp 2009-02-26 03:17:16 UTC (rev 1236) @@ -52,7 +52,6 @@ #define map_t dense_hash_map typedef pair<int, int> pii; typedef map_t<int, int> mii; -typedef string ser_t; template<typename T> void init_map(T &map) {} template<> void init_map(dense_hash_map<int, int> &map) { @@ -256,6 +255,28 @@ st_channel<pair<st_netfd_t, shared_ptr<string> > > msgs; /** + * Adapter for arrays to look like strings (for PB serialization). 
+ */ +class ser_array +{ + commons::array<char> a_; + size_t size_; +public: + ser_array(size_t size = buf_size) : a_(size), size_(0) {} + char *data() const { return a_.get(); } + size_t size() const { return size_; } + void clear() { size_ = 0; } + void stretch(size_t size) { + if (size > a_.size()) + a_.reset(new char[size], size); + size_ = size; + } +}; + +//typedef string ser_t; +typedef ser_array ser_t; + +/** * Serialization. * * TODO: experiment with which method is the fastest: using a string as shown @@ -281,15 +302,18 @@ copy(plen, plen + sizeof len, s.begin()); } -/** - * Helper for getting the cached ByteSize of a message. - */ -template <typename T> -size_t -pb_size(const T &msg) { - // GetCachedSize returns 0 if no cached size. - size_t len = msg.GetCachedSize(); - return len == 0 ? msg.ByteSize() : len; +template<typename T> +void +ser(ser_array &s, const T &msg) +{ + int len = msg.ByteSize(); + + // Grow the array as needed. + s.stretch(len + sizeof(uint32_t)); + + // Serialize message to a buffer with four-byte length prefix. 
+ check(msg.SerializeToArray(s.data() + sizeof(uint32_t), len)); + *reinterpret_cast<uint32_t*>(s.data()) = htonl(uint32_t(len)); } /** @@ -299,7 +323,7 @@ void ser(ostream &s, const T &msg) { - uint32_t len = htonl(uint32_t(pb_size(msg))); + uint32_t len = htonl(uint32_t(msg.ByteSize())); s.write(reinterpret_cast<const char*>(&len), sizeof len); check(msg.SerializeToOstream(&s)); } Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-02-26 03:15:58 UTC (rev 1235) +++ ydb/trunk/src/ser.h 2009-02-26 03:17:16 UTC (rev 1236) @@ -34,11 +34,12 @@ #define EXPAND_PB \ bool AppendToString(string*) const { throw_operation_not_supported(); } \ + bool SerializeToArray(void*, int) const { throw_operation_not_supported(); } \ bool SerializeToString(string*) const { throw_operation_not_supported(); } \ bool SerializeToOstream(ostream*) const { throw_operation_not_supported(); } \ - bool ParseFromArray(void*, size_t) { throw_operation_not_supported(); } \ - size_t GetCachedSize() const { throw_operation_not_supported(); } \ - size_t ByteSize() const { throw_operation_not_supported(); } \ + bool ParseFromArray(void*, int) { throw_operation_not_supported(); } \ + int GetCachedSize() const { throw_operation_not_supported(); } \ + int ByteSize() const { throw_operation_not_supported(); } \ #define MAKE_TYPE_BATCH(name, ns, b) \ struct name##_types { \ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-26 03:16:07
|
Revision: 1235 http://assorted.svn.sourceforge.net/assorted/?rev=1235&view=rev Author: yangzhang Date: 2009-02-26 03:15:58 +0000 (Thu, 26 Feb 2009) Log Message: ----------- - removed checkpass wrappers around new operator - changed array::reset() to take size Modified Paths: -------------- cpp-commons/trunk/src/commons/array.h cpp-commons/trunk/src/commons/files.h cpp-commons/trunk/src/commons/st/st.h Modified: cpp-commons/trunk/src/commons/array.h =================================================================== --- cpp-commons/trunk/src/commons/array.h 2009-02-25 21:13:24 UTC (rev 1234) +++ cpp-commons/trunk/src/commons/array.h 2009-02-26 03:15:58 UTC (rev 1235) @@ -46,13 +46,13 @@ EXPAND(unique_ptr<T[]>) friend void swap<>(array<T> &a, array<T> &b); public: - explicit array(size_t n) : p_(checkpass(new T[n])), n_(n) {} + explicit array(size_t n) : p_(new T[n]), n_(n) {} size_t size() const { return n_; } T *get() const { return p_.get(); } T *release() { return p_.release(); } T *end() const { return this->get() + n_; } T operator[](size_t i) const { return p_[i]; } - void reset(T *p) { p_.reset(p); } + void reset(T *p, size_t n) { p_.reset(p); n_ = n; } private: unique_ptr<T[]> p_; size_t n_; Modified: cpp-commons/trunk/src/commons/files.h =================================================================== --- cpp-commons/trunk/src/commons/files.h 2009-02-25 21:13:24 UTC (rev 1234) +++ cpp-commons/trunk/src/commons/files.h 2009-02-26 03:15:58 UTC (rev 1235) @@ -63,7 +63,7 @@ // TODO Why don't we need (static) cast here? Isn't this a lossy cast? len = sb.st_size; - char *buf = checkpass(new char[len + 1]); + char *buf = new char[len + 1]; checkeqnneg(pread(fd, buf, len, 0), static_cast<ssize_t>(len)); // TODO Use threads to pull data to the correct initial locations? 
Modified: cpp-commons/trunk/src/commons/st/st.h =================================================================== --- cpp-commons/trunk/src/commons/st/st.h 2009-02-25 21:13:24 UTC (rev 1234) +++ cpp-commons/trunk/src/commons/st/st.h 2009-02-26 03:15:58 UTC (rev 1235) @@ -452,7 +452,7 @@ // Handle large arrays specially. if (req > buf_.size()) { - managed_array<char> p(checkpass(new char[req]), true); + managed_array<char> p(new char[req], true); memcpy(p.get(), start_, unread()); checkeqnneg(st_read_fully(fd_, p + unread(), req - unread(), to), static_cast<ssize_t>(req - unread())); start_ = end_ = buf_.get(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-25 21:13:36
|
Revision: 1234 http://assorted.svn.sourceforge.net/assorted/?rev=1234&view=rev Author: yangzhang Date: 2009-02-25 21:13:24 +0000 (Wed, 25 Feb 2009) Log Message: ----------- - removed (commented out) async sending - sendmsg no longer uses bcastmsg - reuse serialized msg in issue_txns - reuse serialization buffer in issue_txns - cleaned up sending code in issue_txns - added more notes Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-25 19:25:53 UTC (rev 1233) +++ ydb/trunk/README 2009-02-25 21:13:24 UTC (rev 1234) @@ -437,13 +437,23 @@ - 1: 518K 467K 359K 333K - 2: 505K 470K 350K 333K - 3: 485K 465K 335K 333K -- TODO get raw-buffer working in wal, 0-node +- DONE get raw-buffer working in wal, 0-node + - 0: 520K (vs 550K using pb) + - rb is a bit slower than pb + - -1: 495K (vs 350K using pb) + +- DONE reuse serialization buffer for pb + - almost no diff + - 1: 362K + - 2: 350K +- TODO use arrays instead of strings for pb and avoid dyn alloc +- TODO fix pb recovery + - TODO refactor st_reader, etc. 
to be generic opportunistic buffered readers - TODO see how streambuf read/write is actually implemented (whether it's too slow) - TODO try making a streambuf for st_write, then try it in conj with struct-less pb -- TODO fix pb recovery - TODO implement new recovery (add buffer swapping, add buffers to a list) - TODO async (threaded) wal - TODO 0-node 0-copy (don't need to use threads, just process each batch immed) @@ -491,6 +501,8 @@ Longer term +- Dynamically switch between 0-node and n-node modes + - Testing - unit/regression/mock - performance tests Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-25 19:25:53 UTC (rev 1233) +++ ydb/trunk/src/main.lzz.clamp 2009-02-25 21:13:24 UTC (rev 1234) @@ -68,7 +68,7 @@ bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_wal, use_pb, use_pb_res, - suppress_txn_msgs, use_bcast_async, fake_bcast, force_ser, fake_exec; + suppress_txn_msgs, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; // Control. @@ -304,6 +304,7 @@ check(msg.SerializeToOstream(&s)); } +#if 0 /** * The worker that performs the actual broadcasting. */ @@ -329,14 +330,14 @@ /** * Asynchronous version of the broadcaster. */ -template<typename T> void -bcastmsg_async(const vector<st_netfd_t> &dsts, const T &msg) +bcastbuf_async(const vector<st_netfd_t> &dsts, const ser_t &msg) { shared_ptr<string> p(new string); ser(*p.get(), msg); foreach (st_netfd_t dst, dsts) msgs.push(make_pair(dst, p)); } +#endif /** * Perform an st_write but warn if it took over write_thresh ms. @@ -362,17 +363,14 @@ } /** - * Send a message to some destinations (sequentially). + * Send a message to some destinations. 
*/ -template<typename T> -void -bcastmsg_sync(const vector<st_netfd_t> &dsts, const T &msg /*, ser_t &s */) +inline void +bcastbuf(const vector<st_netfd_t> &dsts, const ser_t &msg) { - ser_t s; - ser(s, msg); if (!fake_bcast) { foreach (st_netfd_t dst, dsts) { - st_timed_write(dst, s.data(), s.size()); + st_timed_write(dst, msg.data(), msg.size()); } } } @@ -383,22 +381,33 @@ */ template<typename T> inline void -bcastmsg(const vector<st_netfd_t> &dsts, const T &msg /* XXX optimize this , ser_t &s */) +bcastmsg(const vector<st_netfd_t> &dsts, const T &msg) { - if (use_bcast_async) bcastmsg_async(dsts, msg); - else bcastmsg_sync(dsts, msg); + ser_t s; + ser(s, msg); + bcastbuf(dsts, s); } /** * Send a message to a single recipient. */ +inline void +sendbuf(st_netfd_t dst, const ser_t &msg) +{ + if (!fake_bcast) + st_timed_write(dst, msg.data(), msg.size()); +} + +/** + * Send a message to a single recipient. + */ template<typename T> inline void sendmsg(st_netfd_t dst, const T &msg) { - // XXX optimize this - vector<st_netfd_t> dsts(1, dst); - bcastmsg(dsts, msg); + ser_t s; + ser(s, msg); + sendbuf(dst, s); } /** @@ -489,6 +498,7 @@ wal() : of("wal"), out(of) {} template <typename T> void log(const T &msg) { ser(of, msg); } + void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } void logbuf(const void *buf, size_t len) { of.write(reinterpret_cast<const char*>(buf), len); } @@ -542,16 +552,6 @@ function<void(const void*, size_t)> fn; if (use_wal) fn = bind(&wal::logbuf, g_wal, _1, _2); - //else if (newreps.empty()) - // fn = lambda(const void *buf, size_t len) { - // // Prepare a new buffer to swap with the writer's current working buffer. - // new buffer; - // // Copy data past the end of the current buffer into the new buffer, so - // // that it's not lost. - // copy(); - // // Swap the current buffer with the new buffer. 
- // swap(); - // }; else fn = lambda(const void *buf, size_t len) { foreach (st_netfd_t dst, __ref(fds)) @@ -569,6 +569,7 @@ for (int t = 0; t < batch_size; ++t) batch.add_txn(); + ser_t serbuf; while (!stop_hub) { w.mark(); batch.Clear(); @@ -649,25 +650,20 @@ } fin_txn(batch); + bool do_bcast = !fds.empty() && !suppress_txn_msgs; if (Types::is_pb()) { // Broadcast/log/serialize. - // TODO optimize: reuse serialization (have these functions take - // serialized buffers instead of message structures) - if (!fds.empty() && !suppress_txn_msgs) { - bcastmsg(fds, batch); + if (force_ser || do_bcast || use_wal) { + serbuf.clear(); + ser(serbuf, batch); + if (do_bcast) bcastbuf(fds, serbuf); + if (use_wal) g_wal->logbuf(serbuf); } - if (use_wal) { - g_wal->log(batch); - } - if (fds.empty() && suppress_txn_msgs && !use_wal && force_ser) { - string s; - ser(s, batch); - } } else { // Reset if we have nobody to send to (incl. disk) or if we actually have // no txns (possible due to loop structure; want to avoid to avoid // confusing with the 0-txn message signifying "prepare a recovery msg"). - if ((fds.empty() && !use_wal) || batch.txn_size() == 0) { + if (!do_bcast && !use_wal) { w.reset(); } } @@ -1554,8 +1550,6 @@ "when using --bcast-async, don't actually perform the socket write") ("show-updates,U", po::bool_switch(&show_updates), "log operations that touch (update/read/delete) an existing key") - ("bcast-async", po::bool_switch(&use_bcast_async), - "broadcast messages asynchronously") ("count-updates,u",po::bool_switch(&count_updates), "count operations that touch (update/read/delete) an existing key") ("general-txns,g", po::bool_switch(&general_txns), @@ -1673,20 +1667,11 @@ my_spawn(memmon, "memmon"); } - // Start the message broadcaster thread, if requested. - st_thread_t bcaster_thread = use_bcast_async ? 
- my_spawn(bcaster, "bcaster") : nullptr; - long long start = thread_start_time = current_time_millis(); // At the end, cleanly stop the bcaster thread and print thread profiling // information. finally f(lambda() { - if (use_bcast_async) { - msgs.push(make_pair(nullptr, shared_ptr<string>())); - st_join(__ref(bcaster_thread)); - } - if (profile_threads) { long long end = current_time_millis(); long long all = end - __ref(start); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-25 19:26:01
|
Revision: 1233 http://assorted.svn.sourceforge.net/assorted/?rev=1233&view=rev Author: yangzhang Date: 2009-02-25 19:25:53 +0000 (Wed, 25 Feb 2009) Log Message: ----------- - fixed raw-buffer in 0-node mode and --wal modes: added buffer-sharing between reader and writer - added --read-buf-size - make main.lzz not writeable Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ser.h Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-25 19:25:50 UTC (rev 1232) +++ ydb/trunk/src/Makefile 2009-02-25 19:25:53 UTC (rev 1233) @@ -109,7 +109,9 @@ protoc --cpp_out=. $< %.lzz: %.lzz.clamp + rm -f $@ clamp < $< | sed '1d' > $@ + chmod -w $@ main.o: ser.h Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-25 19:25:50 UTC (rev 1232) +++ ydb/trunk/src/main.lzz.clamp 2009-02-25 19:25:53 UTC (rev 1233) @@ -64,7 +64,7 @@ st_utime_t timeout; int chkpt, accept_joiner_seqno, issuing_interval, min_ops, max_ops, stop_on_seqno, batch_size; -size_t accept_joiner_size, buf_size; +size_t accept_joiner_size, buf_size, read_buf_size; bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_wal, use_pb, use_pb_res, @@ -537,10 +537,21 @@ 0); }); - reader r(nullptr); + commons::array<char> rbuf(read_buf_size), wbuf(buf_size); + reader r(nullptr, rbuf.get(), rbuf.size()); function<void(const void*, size_t)> fn; if (use_wal) fn = bind(&wal::logbuf, g_wal, _1, _2); + //else if (newreps.empty()) + // fn = lambda(const void *buf, size_t len) { + // // Prepare a new buffer to swap with the writer's current working buffer. + // new buffer; + // // Copy data past the end of the current buffer into the new buffer, so + // // that it's not lost. 
+ // copy(); + // // Swap the current buffer with the new buffer. + // swap(); + // }; else fn = lambda(const void *buf, size_t len) { foreach (st_netfd_t dst, __ref(fds)) @@ -548,11 +559,15 @@ static_cast<ssize_t>(len)); }; - writer w(fn, buf_size); + char *real_wbuf = newreps.empty() ? rbuf.get() : wbuf.get(); + size_t real_wbuf_size = newreps.empty() ? rbuf.size() : wbuf.size(); + writer w(fn, real_wbuf, real_wbuf_size); stream s(r,w); scoped_ptr<TxnBatch> pbatch(new_TxnBatch<TxnBatch>(s)); TxnBatch batch = *pbatch; - for (int t = 0; t < batch_size; ++t) batch.add_txn(); + if (Types::is_pb()) + for (int t = 0; t < batch_size; ++t) + batch.add_txn(); while (!stop_hub) { w.mark(); @@ -569,6 +584,7 @@ } } // Bring in any new members. + // TODO more efficient: copy/extend/append while (!newreps.empty()) { fds.push_back(newreps.take().fd()); } @@ -576,6 +592,7 @@ // Generate some random transactions. start_txn(batch); for (int t = 0; t < batch_size; ++t) { + char *txn_start = w.cur(); Txn &txn = *batch.add_txn(); txn.set_seqno(seqno); int count = randint(min_ops, max_ops + 1); @@ -594,8 +611,9 @@ // Process immediately if not bcasting. if (fds.empty()) { --seqno; + r.reset_range(txn_start, w.cur()); + if (!Types::is_pb()) txn.Clear(); process_txn<Types, pb_types>(g_map, txn, seqno, nullptr); - w.reset(); } // Checkpoint. @@ -630,38 +648,49 @@ ++seqno; } fin_txn(batch); - if (batch.txn_size() == 0) w.reset(); - // Broadcast. - if (Types::is_pb() && !fds.empty() && !suppress_txn_msgs) { - bcastmsg(fds, batch); - } else if (use_wal) { - g_wal->log(batch); - } else if (force_ser) { - string s; - ser(s, batch); + if (Types::is_pb()) { + // Broadcast/log/serialize. 
+ // TODO optimize: reuse serialization (have these functions take + // serialized buffers instead of message structures) + if (!fds.empty() && !suppress_txn_msgs) { + bcastmsg(fds, batch); + } + if (use_wal) { + g_wal->log(batch); + } + if (fds.empty() && suppress_txn_msgs && !use_wal && force_ser) { + string s; + ser(s, batch); + } + } else { + // Reset if we have nobody to send to (incl. disk) or if we actually have + // no txns (possible due to loop structure; want to avoid to avoid + // confusing with the 0-txn message signifying "prepare a recovery msg"). + if ((fds.empty() && !use_wal) || batch.txn_size() == 0) { + w.reset(); + } } - if (fds.empty()) - w.reset(); - // Pause? if (do_pause) do_pause.waitreset(); } // This means "The End." - w.mark(); - batch.Clear(); - start_txn(batch); - Txn &txn = *batch.add_txn(); - txn.set_seqno(-1); - start_op(txn); - fin_op(txn); - fin_txn(batch); - if (Types::is_pb()) bcastmsg(fds, batch); - w.mark(); - w.flush(); + if (!fds.empty()) { + w.mark(); + batch.Clear(); + start_txn(batch); + Txn &txn = *batch.add_txn(); + txn.set_seqno(-1); + start_op(txn); + fin_op(txn); + fin_txn(batch); + if (Types::is_pb()) bcastmsg(fds, batch); + w.mark(); + w.flush(); + } } /** @@ -831,12 +860,13 @@ __ref(send_states).push(shared_ptr<Recovery>()); }); - st_reader reader(leader); + commons::array<char> rbuf(read_buf_size), wbuf(buf_size); + st_reader reader(leader, rbuf.get(), rbuf.size()); vector<st_netfd_t> leader_v(1, leader); writer w(lambda(const void *buf, size_t len) { checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT), static_cast<ssize_t>(len)); - }, buf_size); + }, wbuf.get(), wbuf.size()); stream s(reader, w); try { @@ -968,10 +998,11 @@ finally f(bind(&response_handler::cleanup, this)); - st_reader reader(replica); + commons::array<char> rbuf(read_buf_size), wbuf(buf_size); + st_reader reader(replica, rbuf.get(), rbuf.size()); writer w(lambda(const void*, size_t) { throw operation_not_supported("response 
handler should not be writing"); - }, buf_size); + }, wbuf.get(), wbuf.size()); stream s(reader,w); scoped_ptr<ResponseBatch> pbatch(new_ResponseBatch<ResponseBatch>(s)); @@ -1568,6 +1599,8 @@ ("leader-port,P", po::value<uint16_t>(&leader_port)->default_value(7654), "port the leader listens on") + ("read-buf", po::value<size_t>(&read_buf_size)->default_value(1e7), + "size of the incoming (read) buffer in bytes") ("write-buf", po::value<size_t>(&buf_size)->default_value(1e5), "size of the outgoing (write) buffer in bytes") ("chkpt,c", po::value<int>(&chkpt)->default_value(1000), @@ -1600,8 +1633,6 @@ check(min_ops > 0); check(max_ops > 0); check(max_ops >= min_ops); - - if (minreps == 0 && !use_wal) use_pb = true; // XXX } catch (std::exception &ex) { cerr << ex.what() << endl << endl << desc << endl; return 1; Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-02-25 19:25:50 UTC (rev 1232) +++ ydb/trunk/src/ser.h 2009-02-25 19:25:53 UTC (rev 1233) @@ -13,13 +13,25 @@ #define END_NAMESPACE } #define MAKE_START_FIN_HELPER(MsgType, field, action) \ - template<typename T> void action##_##field(T &msg); \ - template<> void action##_##field(ydb::pb::MsgType&) {} \ - template<> void action##_##field(ydb::msg::MsgType& msg) { msg.action##_##field(); } + template<typename T> inline void action##_##field(T &msg); \ + template<> inline void action##_##field(ydb::pb::MsgType&) {} \ + template<> inline void action##_##field(ydb::msg::MsgType& msg) { \ + msg.action##_##field(); \ + } + #define MAKE_START_FIN(MsgType, field) \ MAKE_START_FIN_HELPER(MsgType, field, start) \ MAKE_START_FIN_HELPER(MsgType, field, fin) +#define MAKE_TOP_MSG(MsgType) \ + template<typename T> inline T *new_##MsgType(stream &s); \ + template<> inline ydb::pb::MsgType *new_##MsgType(stream &) { \ + return new ydb::pb::MsgType(); \ + } \ + template<> inline ydb::msg::MsgType *new_##MsgType(stream &s) { \ + return new 
ydb::msg::MsgType(s); \ + } + #define EXPAND_PB \ bool AppendToString(string*) const { throw_operation_not_supported(); } \ bool SerializeToString(string*) const { throw_operation_not_supported(); } \ @@ -28,6 +40,16 @@ size_t GetCachedSize() const { throw_operation_not_supported(); } \ size_t ByteSize() const { throw_operation_not_supported(); } \ +#define MAKE_TYPE_BATCH(name, ns, b) \ + struct name##_types { \ + typedef ydb::ns::TxnBatch TxnBatch; \ + typedef ydb::ns::Txn Txn; \ + typedef ydb::ns::Op Op; \ + typedef ydb::ns::Response Response; \ + typedef ydb::ns::ResponseBatch ResponseBatch; \ + static bool is_pb() { return b; } \ + }; + BEGIN_NAMESPACE(ydb) BEGIN_NAMESPACE(msg) @@ -45,7 +67,7 @@ { NONCOPYABLE(writer) private: - commons::array<char> a_; + sized_array<char> a_; char *p_; char *mark_; char *unsent_; @@ -67,9 +89,10 @@ *reinterpret_cast<T*>(reserve(sizeof x, p)) = x; } public: - writer(boost::function<void(void*, size_t)> flushcb, size_t buf_size) : - a_(buf_size), p_(a_.get()), mark_(p_), unsent_(a_.get()), flushcb(flushcb) {} - commons::array<char> &buf() { return a_; } + writer(boost::function<void(void*, size_t)> flushcb, char *a, size_t buf_size) : + a_(a, buf_size), p_(a_.get()), mark_(p_), unsent_(a_.get()), flushcb(flushcb) {} + sized_array<char> &buf() { return a_; } + char *cur() { return p_; } size_t pos() { return p_ - mark_; } size_t size() { return a_.size(); } void mark() { mark_ = p_; } @@ -179,12 +202,9 @@ EXPAND_PB }; -template<typename T> T *new_TxnBatch(stream &s); -template<> ydb::pb::TxnBatch *new_TxnBatch(stream &) { return new ydb::pb::TxnBatch(); } -template<> ydb::msg::TxnBatch *new_TxnBatch(stream &s) { return new ydb::msg::TxnBatch(s); } - MAKE_START_FIN(Txn, op) MAKE_START_FIN(TxnBatch, txn) +MAKE_TOP_MSG(TxnBatch) class Response { @@ -234,32 +254,13 @@ EXPAND_PB }; -template<typename T> T *new_ResponseBatch(stream &s); -template<> ydb::pb::ResponseBatch *new_ResponseBatch(stream &) { return new 
ydb::pb::ResponseBatch(); } -template<> ydb::msg::ResponseBatch *new_ResponseBatch(stream &s) { return new ydb::msg::ResponseBatch(s); } - MAKE_START_FIN(Response, result) MAKE_START_FIN(ResponseBatch, res) +MAKE_TOP_MSG(ResponseBatch) -struct pb_types { - typedef ydb::pb::TxnBatch TxnBatch; - typedef ydb::pb::Txn Txn; - typedef ydb::pb::Op Op; - typedef ydb::pb::Response Response; - typedef ydb::pb::ResponseBatch ResponseBatch; - static bool is_pb() { return true; } -}; +MAKE_TYPE_BATCH(pb, pb, true) +MAKE_TYPE_BATCH(rb, msg, false) -// rb = raw buffer -struct rb_types { - typedef ydb::msg::TxnBatch TxnBatch; - typedef ydb::msg::Txn Txn; - typedef ydb::msg::Op Op; - typedef ydb::msg::Response Response; - typedef ydb::msg::ResponseBatch ResponseBatch; - static bool is_pb() { return false; } -}; - END_NAMESPACE END_NAMESPACE This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-25 19:25:57
|
Revision: 1232 http://assorted.svn.sourceforge.net/assorted/?rev=1232&view=rev Author: yangzhang Date: 2009-02-25 19:25:50 +0000 (Wed, 25 Feb 2009) Log Message: ----------- - st_reader takes an externally managed buffer and allows for manual pointer adjustment via reset_range() - reset_range - added sized_array and associated swap() - updated setup.bash for the reorganized files Modified Paths: -------------- cpp-commons/trunk/setup.bash cpp-commons/trunk/src/commons/array.h cpp-commons/trunk/src/commons/st/st.h Modified: cpp-commons/trunk/setup.bash =================================================================== --- cpp-commons/trunk/setup.bash 2009-02-25 07:45:26 UTC (rev 1231) +++ cpp-commons/trunk/setup.bash 2009-02-25 19:25:50 UTC (rev 1232) @@ -3,4 +3,4 @@ pkg=cpp-commons . simple-setup.bash version=0.1 -install include/ src/commons +install include/ src/{boost,commons,yonat} Modified: cpp-commons/trunk/src/commons/array.h =================================================================== --- cpp-commons/trunk/src/commons/array.h 2009-02-25 07:45:26 UTC (rev 1231) +++ cpp-commons/trunk/src/commons/array.h 2009-02-25 19:25:50 UTC (rev 1232) @@ -12,13 +12,36 @@ using namespace boost; template<typename T> class array; + template<typename T> class sized_array; template<typename T> void swap(array<T> &a, array<T> &b); + template<typename T> void swap(sized_array<T> &a, sized_array<T> &b); + // TODO: rename /** * A thin wrapper around arrays. Like a fixed-size vector. Unlike array * since the size can be dynamically determined. 
*/ template<typename T> + class sized_array { + friend void swap<>(sized_array<T> &a, sized_array<T> &b); + public: + explicit sized_array(char *p, size_t n) : p_(p), n_(n) {} + size_t size() const { return n_; } + T *get() const { return p_; } + T *end() const { return p_ + n_; } + T operator[](size_t i) const { return p_[i]; } + void reset(T *p, size_t n) { p_ = p; n_ = n; } + private: + T *p_; + size_t n_; + }; + + // TODO: rename + /** + * A thin wrapper around arrays. Like a fixed-size vector. Unlike array + * since the size can be dynamically determined. + */ + template<typename T> class array { EXPAND(unique_ptr<T[]>) friend void swap<>(array<T> &a, array<T> &b); @@ -26,9 +49,10 @@ explicit array(size_t n) : p_(checkpass(new T[n])), n_(n) {} size_t size() const { return n_; } T *get() const { return p_.get(); } - void release() { p_.release(); } + T *release() { return p_.release(); } T *end() const { return this->get() + n_; } T operator[](size_t i) const { return p_[i]; } + void reset(T *p) { p_.reset(p); } private: unique_ptr<T[]> p_; size_t n_; @@ -45,7 +69,19 @@ swap(a.n_, b.n_); } + // TODO: try just specifying a single templated function /** + * Swap two arrays. + */ + template<typename T> + void + swap(sized_array<T> &a, sized_array<T> &b) + { + swap(a.p_, b.p_); + swap(a.n_, b.n_); + } + + /** * Conditionally-scoped, move-able, release-able, un-sized array. 
*/ template<typename T> @@ -63,6 +99,7 @@ operator T*() { return p_; } operator const T*() const { return p_; } bool scoped() const { return scoped_; } + bool &scoped() { return scoped_; } private: T *p_; bool scoped_; Modified: cpp-commons/trunk/src/commons/st/st.h =================================================================== --- cpp-commons/trunk/src/commons/st/st.h 2009-02-25 07:45:26 UTC (rev 1231) +++ cpp-commons/trunk/src/commons/st/st.h 2009-02-25 19:25:50 UTC (rev 1232) @@ -383,9 +383,9 @@ { NONCOPYABLE(st_reader) public: - st_reader(st_netfd_t fd, size_t bufsize = 1e7) : + st_reader(st_netfd_t fd, char *buf, size_t bufsize) : fd_(fd), - buf_(bufsize), + buf_(buf, bufsize), start_(buf_.get()), end_(buf_.get()) {} @@ -403,9 +403,17 @@ /** * The entire read buffer. */ - array<char> &buf() { return buf_; } + sized_array<char> &buf() { return buf_; } /** + * Manually update/adjust the pointers; useful after changing buf_. + */ + void reset_range(char *start, char *end) { + start_ = start; + end_ = end; + } + + /** * Discard the requested number of bytes. */ void skip(size_t req, st_utime_t to = ST_UTIME_NO_TIMEOUT) { @@ -519,7 +527,7 @@ /** * The temporary storage buffer. */ - array<char> buf_; + sized_array<char> buf_; /** * The start of the unconsumed range of bytes. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-25 07:45:31
|
Revision: 1231 http://assorted.svn.sourceforge.net/assorted/?rev=1231&view=rev Author: yangzhang Date: 2009-02-25 07:45:26 +0000 (Wed, 25 Feb 2009) Log Message: ----------- lots of general cleanup - fixed warnings on remaining files - added swap for arrays, size_t - in st_reader, using (the faster) memcpy instead of (the slower) copy, after some experiments - using new check functions in load_file() - added algo.h, with just swap() - fixed missing includes - fixed include guards - added tools for generating build tests, checking include guard names, listing sources - added test Makefile with lots of warnings - moved files around so that third-party sources are not in commons/ - updated the README Modified Paths: -------------- cpp-commons/trunk/README cpp-commons/trunk/src/commons/array.h cpp-commons/trunk/src/commons/cpuid.h cpp-commons/trunk/src/commons/delegates.h cpp-commons/trunk/src/commons/exceptions.h cpp-commons/trunk/src/commons/files.h cpp-commons/trunk/src/commons/pthread/barrier.h cpp-commons/trunk/src/commons/rand.h cpp-commons/trunk/src/commons/region.h cpp-commons/trunk/src/commons/st/st.h cpp-commons/trunk/src/commons/strings.h cpp-commons/trunk/src/commons/threads.h cpp-commons/trunk/src/commons/x86asm.h Added Paths: ----------- cpp-commons/trunk/src/boost/ cpp-commons/trunk/src/boost/unique_ptr.hpp cpp-commons/trunk/src/commons/algo.h cpp-commons/trunk/src/test/Makefile cpp-commons/trunk/src/yonat/ cpp-commons/trunk/tools/check.bash Removed Paths: ------------- cpp-commons/trunk/src/commons/unique_ptr.hpp cpp-commons/trunk/src/commons/yonat/ cpp-commons/trunk/src/test/all.cc Modified: cpp-commons/trunk/README =================================================================== --- cpp-commons/trunk/README 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/README 2009-02-25 07:45:26 UTC (rev 1231) @@ -14,7 +14,6 @@ - RAII utilities, such as for closing file descriptors and `finally` objects - smart arrays: sized arrays, "managed" (moveable, 
conditionally scope-destroyed) arrays -- Howard Hinnant's C++03-emulated TR1 `unique_ptr.hpp` - `pool`: fixed-size object pools - bit manipulation - bundles of header includes @@ -40,6 +39,20 @@ - micro-utilities: noncopyable, expander annotations - x86 architecture-specific tools +Third-party code: + +- Howard Hinnant's [C++03-emulated TR1 `unique_ptr.hpp`] +- [Yonat's STL extensions] + - pointainer: auto-cleaning STL container of pointers. Many times this is a + better alternative than using smart pointers (no overhead, no + multi-threading problems). + - pointerator: iterator to T* that behaves like iterator to T. Useful for + iterating pointainers and containers of smart pointers. + - stringizer: turns an object into a std::string. + +[C++03-emulated TR1 `unique_ptr.hpp`]: http://home.roadrunner.com/~hinnant/unique_ptr03.html +[Yonat's STL extensions]: http://ootips.org/yonat/4dev/ + Setup ----- Copied: cpp-commons/trunk/src/boost/unique_ptr.hpp (from rev 1219, cpp-commons/trunk/src/commons/unique_ptr.hpp) =================================================================== --- cpp-commons/trunk/src/boost/unique_ptr.hpp (rev 0) +++ cpp-commons/trunk/src/boost/unique_ptr.hpp 2009-02-25 07:45:26 UTC (rev 1231) @@ -0,0 +1,535 @@ +/////////////////////////////////////////////////////////////////////////////// +// unique_ptr.hpp header file +// +// Copyright 2009 Howard Hinnant, Ion Gaztañaga. +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// See http://www.boost.org/libs/foreach for documentation + +// This is a C++03 emulation of std::unique_ptr placed in namespace boost. +// Reference http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2008/n2800.pdf +// for the latest unique_ptr specification, and +// reference http://www.open-std.org/jtc1/sc22/wg21/docs/lwg-active.html +// for any pending issues against this specification. 
+ +#ifndef UNIQUE_PTR_HPP +#define UNIQUE_PTR_HPP + +#include <boost/utility/enable_if.hpp> +#include <boost/type_traits.hpp> +#include <boost/static_assert.hpp> +#include <boost/mpl/if.hpp> + +namespace boost +{ + +namespace detail_unique_ptr +{ + +typedef char one; +struct two {one _[2];}; + +// An is_convertible<From, To> that considers From an rvalue (consistent with C++0X). +// This is a simplified version neglecting the types function, array, void and abstract types +// I had to make a special case out of is_convertible<T,T> to make move-only +// types happy. + +namespace is_conv_imp +{ +template <class T> one test1(const T&); +template <class T> two test1(...); +template <class T> one test2(T); +template <class T> two test2(...); +template <class T> T source(); +} + +template <class T1, class T2> +struct is_convertible +{ + static const bool value = sizeof(is_conv_imp::test1<T2>(is_conv_imp::source<T1>())) == 1; +}; + +template <class T> +struct is_convertible<T, T> +{ + static const bool value = sizeof(is_conv_imp::test2<T>(is_conv_imp::source<T>())) == 1; +}; + +template <class T> +class rv +{ + T& r_; + +public: + explicit rv(T& r) : r_(r) {} + T* operator->() {return &r_;} + T& operator*() {return r_;} +}; + +template <class T> +struct identity +{ + typedef T type; +}; + +} // detail_unique_ptr + +template <class T> +inline +typename enable_if_c +< + !detail_unique_ptr::is_convertible<T, detail_unique_ptr::rv<T> >::value, + T& +>::type +move(T& t) +{ + return t; +} + +template <class T> +inline +typename enable_if_c +< + !detail_unique_ptr::is_convertible<T, detail_unique_ptr::rv<T> >::value, + const T& +>::type +move(const T& t) +{ + return t; +} + +template <class T> +inline +typename enable_if_c +< + detail_unique_ptr::is_convertible<T, detail_unique_ptr::rv<T> >::value, + T +>::type +move(T& t) +{ + return T(detail_unique_ptr::rv<T>(t)); +} + +template <class T> +inline +typename enable_if_c +< + is_reference<T>::value, + T +>::type +forward(typename 
detail_unique_ptr::identity<T>::type t) +{ + return t; +} + +template <class T> +inline +typename enable_if_c +< + !is_reference<T>::value, + T +>::type +forward(typename detail_unique_ptr::identity<T>::type& t) +{ + return move(t); +} + +template <class T> +inline +typename enable_if_c +< + !is_reference<T>::value, + T +>::type +forward(const typename detail_unique_ptr::identity<T>::type& t) +{ + return move(const_cast<T&>(t)); +} + +namespace detail_unique_ptr { + +// A move-aware but stripped-down compressed_pair which only optimizes storage for T2 +template <class T1, class T2, bool = is_empty<T2>::value> +class unique_ptr_storage +{ + T1 t1_; + T2 t2_; + + typedef typename add_reference<T2>::type T2_reference; + typedef typename add_reference<const T2>::type T2_const_reference; + + unique_ptr_storage(const unique_ptr_storage&); + unique_ptr_storage& operator=(const unique_ptr_storage&); +public: + operator rv<unique_ptr_storage>() {return rv<unique_ptr_storage>(*this);} + + unique_ptr_storage() : t1_(), t2_() {} + + explicit unique_ptr_storage(T1 t1) + : t1_(move(t1)), t2_() {} + + unique_ptr_storage(T1 t1, T2 t2) + : t1_(move(t1)), t2_(forward<T2>(t2)) {} + + T1& first() {return t1_;} + const T1& first() const {return t1_;} + + T2_reference second() {return t2_;} + T2_const_reference second() const {return t2_;} +}; + +template <class T1, class T2> +class unique_ptr_storage<T1, T2, true> + : private T2 +{ + T1 t1_; + typedef T2 t2_; + + unique_ptr_storage(const unique_ptr_storage&); + unique_ptr_storage& operator=(const unique_ptr_storage&); +public: + operator rv<unique_ptr_storage>() {return rv<unique_ptr_storage>(*this);} + + unique_ptr_storage() : t1_() {} + + explicit unique_ptr_storage(T1 t1) + : t1_(move(t1)) {} + + unique_ptr_storage(T1 t1, T2 t2) + : t2_(move(t2)), t1_(move(t1)) {} + + T1& first() {return t1_;} + const T1& first() const {return t1_;} + + T2& second() {return *this;} + const T2& second() const {return *this;} +}; + +template <class 
T1, class T2, bool b> +inline +void +swap(unique_ptr_storage<T1, T2, b>& x, unique_ptr_storage<T1, T2, b>& y) +{ + using std::swap; + swap(x.first(), y.first()); + swap(x.second(), y.second()); +} + +} // detail_unique_ptr + +template <class T> +struct default_delete +{ + default_delete() {} + template <class U> + default_delete(const default_delete<U>&, + typename enable_if_c<detail_unique_ptr::is_convertible<U*, T*>::value>::type* = 0) + {} + + void operator()(T* ptr) const + { + BOOST_STATIC_ASSERT(sizeof(T) > 0); + delete ptr; + } +}; + +template <class T> +struct default_delete<T[]> +{ + void operator()(T* ptr) const + { + BOOST_STATIC_ASSERT(sizeof(T) > 0); + delete [] ptr; + } + +private: + + template <class U> void operator()(U*) const; +}; + +namespace detail_unique_ptr +{ + +namespace pointer_type_imp +{ + +template <class U> static two test(...); +template <class U> static one test(typename U::pointer* = 0); + +} // pointer_type_imp + +template <class T> +struct has_pointer_type +{ + static const bool value = sizeof(pointer_type_imp::test<T>(0)) == 1; +}; + +namespace pointer_type_imp +{ + +template <class T, class D, bool = has_pointer_type<D>::value> +struct pointer_type +{ + typedef typename D::pointer type; +}; + +template <class T, class D> +struct pointer_type<T, D, false> +{ + typedef T* type; +}; + +} // pointer_type_imp + +template <class T, class D> +struct pointer_type +{ + typedef typename pointer_type_imp::pointer_type<T, + typename boost::remove_reference<D>::type>::type type; +}; + +} // detail_unique_ptr + +template <class T, class D = default_delete<T> > +class unique_ptr +{ +public: + typedef T element_type; + typedef D deleter_type; + typedef typename detail_unique_ptr::pointer_type<element_type, deleter_type>::type pointer; + +private: + detail_unique_ptr::unique_ptr_storage<pointer, deleter_type> ptr_; + + typedef typename add_reference<deleter_type>::type deleter_reference; + typedef typename add_reference<const deleter_type>::type 
deleter_const_reference; + + struct nat {int for_bool_;}; + + unique_ptr(unique_ptr&); + unique_ptr& operator=(unique_ptr&); + +public: + operator detail_unique_ptr::rv<unique_ptr>() {return detail_unique_ptr::rv<unique_ptr>(*this);} + unique_ptr(detail_unique_ptr::rv<unique_ptr> r) : ptr_(r->release(), forward<deleter_type>(r->get_deleter())) {} + unique_ptr& operator=(detail_unique_ptr::rv<unique_ptr> r) + { + reset(r->release()); + ptr_.second() = move(r->get_deleter()); + return *this; + } + + unique_ptr() + { + BOOST_STATIC_ASSERT(!is_reference<deleter_type>::value); + BOOST_STATIC_ASSERT(!is_pointer<deleter_type>::value); + } + + explicit unique_ptr(pointer p) + : ptr_(p) + { + BOOST_STATIC_ASSERT(!is_reference<deleter_type>::value); + BOOST_STATIC_ASSERT(!is_pointer<deleter_type>::value); + } + + unique_ptr(pointer p, typename mpl::if_<is_reference<D>, + volatile typename remove_reference<D>::type&, D>::type d) + : ptr_(move(p), forward<D>(const_cast<typename add_reference<D>::type>(d))) {} + + template <class U, class E> + unique_ptr(unique_ptr<U, E> u, + typename enable_if_c + < + !boost::is_array<U>::value && + detail_unique_ptr::is_convertible<typename unique_ptr<U>::pointer, pointer>::value && + detail_unique_ptr::is_convertible<E, deleter_type>::value && + ( + !is_reference<deleter_type>::value || + is_same<deleter_type, E>::value + ) + >::type* = 0) + : ptr_(u.release(), forward<D>(forward<E>(u.get_deleter()))) {} + + ~unique_ptr() {reset();} + + unique_ptr& operator=(int nat::*) + { + reset(); + return *this; + } + + template <class U, class E> + unique_ptr& + operator=(unique_ptr<U, E> u) + { + reset(u.release()); + ptr_.second() = move(u.get_deleter()); + return *this; + } + + typename add_reference<T>::type operator*() const {return *get();} + pointer operator->() const {return get();} + pointer get() const {return ptr_.first();} + deleter_reference get_deleter() {return ptr_.second();} + deleter_const_reference get_deleter() const {return 
ptr_.second();} + operator int nat::*() const {return get() ? &nat::for_bool_ : 0;} + + void reset(pointer p = pointer()) + { + pointer t = get(); + if (t != pointer()) + get_deleter()(t); + ptr_.first() = p; + } + + pointer release() + { + pointer tmp = get(); + ptr_.first() = pointer(); + return tmp; + } + + void swap(unique_ptr& u) {detail_unique_ptr::swap(ptr_, u.ptr_);} +}; + +template <class T, class D> +class unique_ptr<T[], D> +{ +public: + typedef T element_type; + typedef D deleter_type; + typedef typename detail_unique_ptr::pointer_type<element_type, deleter_type>::type pointer; + +private: + detail_unique_ptr::unique_ptr_storage<pointer, deleter_type> ptr_; + + typedef typename add_reference<deleter_type>::type deleter_reference; + typedef typename add_reference<const deleter_type>::type deleter_const_reference; + + struct nat {int for_bool_;}; + + unique_ptr(unique_ptr&); + unique_ptr& operator=(unique_ptr&); + +public: + operator detail_unique_ptr::rv<unique_ptr>() {return detail_unique_ptr::rv<unique_ptr>(*this);} + unique_ptr(detail_unique_ptr::rv<unique_ptr> r) : ptr_(r->release(), forward<deleter_type>(r->get_deleter())) {} + unique_ptr& operator=(detail_unique_ptr::rv<unique_ptr> r) + { + reset(r->release()); + ptr_.second() = move(r->get_deleter()); + return *this; + } + + unique_ptr() + { + BOOST_STATIC_ASSERT(!is_reference<deleter_type>::value); + BOOST_STATIC_ASSERT(!is_pointer<deleter_type>::value); + } + + explicit unique_ptr(pointer p) + : ptr_(p) + { + BOOST_STATIC_ASSERT(!is_reference<deleter_type>::value); + BOOST_STATIC_ASSERT(!is_pointer<deleter_type>::value); + } + + unique_ptr(pointer p, typename mpl::if_<is_reference<D>, + volatile typename remove_reference<D>::type&, D>::type d) + : ptr_(move(p), forward<D>(const_cast<typename add_reference<D>::type>(d))) {} + + ~unique_ptr() {reset();} + + T& operator[](size_t i) const {return get()[i];} + pointer get() const {return ptr_.first();} + deleter_reference get_deleter() {return 
ptr_.second();} + deleter_const_reference get_deleter() const {return ptr_.second();} + operator int nat::*() const {return get() ? &nat::for_bool_ : 0;} + + void reset(pointer p = pointer()) + { + pointer t = get(); + if (t != pointer()) + get_deleter()(t); + ptr_.first() = p; + } + + pointer release() + { + pointer tmp = get(); + ptr_.first() = pointer(); + return tmp; + } + + void swap(unique_ptr& u) {detail_unique_ptr::swap(ptr_, u.ptr_);} +private: + template <class U> + explicit unique_ptr(U, + typename enable_if_c<detail_unique_ptr::is_convertible<U, pointer>::value>::type* = 0); + + template <class U> + unique_ptr(U, typename mpl::if_<is_reference<D>, + volatile typename remove_reference<D>::type&, D>::type, + typename enable_if_c<detail_unique_ptr::is_convertible<U, pointer>::value>::type* = 0); +}; + +template<class T, class D> +inline +void +swap(unique_ptr<T, D>& x, unique_ptr<T, D>& y) +{ + x.swap(y); +} + +template<class T1, class D1, class T2, class D2> +inline +bool +operator==(const unique_ptr<T1, D1>& x, const unique_ptr<T2, D2>& y) +{ + return x.get() == y.get(); +} + +template<class T1, class D1, class T2, class D2> +inline +bool +operator!=(const unique_ptr<T1, D1>& x, const unique_ptr<T2, D2>& y) +{ + return !(x == y); +} + +template<class T1, class D1, class T2, class D2> +inline +bool +operator<(const unique_ptr<T1, D1>& x, const unique_ptr<T2, D2>& y) +{ + return x.get() < y.get(); +} + +template<class T1, class D1, class T2, class D2> +inline +bool +operator<=(const unique_ptr<T1, D1>& x, const unique_ptr<T2, D2>& y) +{ + return !(y < x); +} + +template<class T1, class D1, class T2, class D2> +inline +bool +operator>(const unique_ptr<T1, D1>& x, const unique_ptr<T2, D2>& y) +{ + return y < x; +} + +template<class T1, class D1, class T2, class D2> +inline +bool +operator>=(const unique_ptr<T1, D1>& x, const unique_ptr<T2, D2>& y) +{ + return !(x < y); +} + +} // boost + +#endif // UNIQUE_PTR_HPP Added: cpp-commons/trunk/src/commons/algo.h 
=================================================================== --- cpp-commons/trunk/src/commons/algo.h (rev 0) +++ cpp-commons/trunk/src/commons/algo.h 2009-02-25 07:45:26 UTC (rev 1231) @@ -0,0 +1,16 @@ +#ifndef COMMONS_ALGO_H +#define COMMONS_ALGO_H + +#include <sys/types.h> + +namespace commons { + + void swap(size_t &x, size_t &y) { + x = x ^ y; + y = x ^ y; + x = x ^ y; + } + +} + +#endif Modified: cpp-commons/trunk/src/commons/array.h =================================================================== --- cpp-commons/trunk/src/commons/array.h 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/commons/array.h 2009-02-25 07:45:26 UTC (rev 1231) @@ -1,15 +1,19 @@ #ifndef COMMONS_ARRAY_H #define COMMONS_ARRAY_H +#include <boost/unique_ptr.hpp> +#include <commons/algo.h> #include <commons/check.h> #include <commons/nullptr.h> -#include <commons/unique_ptr.hpp> #include <commons/utility.h> namespace commons { using namespace boost; + template<typename T> class array; + template<typename T> void swap(array<T> &a, array<T> &b); + /** * A thin wrapper around arrays. Like a fixed-size vector. Unlike array * since the size can be dynamically determined. @@ -17,6 +21,7 @@ template<typename T> class array { EXPAND(unique_ptr<T[]>) + friend void swap<>(array<T> &a, array<T> &b); public: explicit array(size_t n) : p_(checkpass(new T[n])), n_(n) {} size_t size() const { return n_; } @@ -30,6 +35,17 @@ }; /** + * Swap two arrays. + */ + template<typename T> + void + swap(array<T> &a, array<T> &b) + { + swap(a.p_, b.p_); + swap(a.n_, b.n_); + } + + /** * Conditionally-scoped, move-able, release-able, un-sized array. 
*/ template<typename T> Modified: cpp-commons/trunk/src/commons/cpuid.h =================================================================== --- cpp-commons/trunk/src/commons/cpuid.h 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/commons/cpuid.h 2009-02-25 07:45:26 UTC (rev 1231) @@ -31,7 +31,7 @@ { unsigned int a, b, c, d; cpuid(1, a, b, c, d); - return (unsigned short) (high(b) * 8); + return static_cast<unsigned short>(high(b) * 8); } /** Modified: cpp-commons/trunk/src/commons/delegates.h =================================================================== --- cpp-commons/trunk/src/commons/delegates.h 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/commons/delegates.h 2009-02-25 07:45:26 UTC (rev 1231) @@ -1,5 +1,5 @@ -#ifndef COMMONS_BOOST_DELEGATES_H -#define COMMONS_BOOST_DELEGATES_H +#ifndef COMMONS_DELEGATES_H +#define COMMONS_DELEGATES_H #include <boost/function.hpp> #include <boost/scoped_ptr.hpp> Modified: cpp-commons/trunk/src/commons/exceptions.h =================================================================== --- cpp-commons/trunk/src/commons/exceptions.h 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/commons/exceptions.h 2009-02-25 07:45:26 UTC (rev 1231) @@ -2,6 +2,7 @@ #define COMMONS_EXCEPTIONS_H #include <exception> +#include <string> namespace commons { Modified: cpp-commons/trunk/src/commons/files.h =================================================================== --- cpp-commons/trunk/src/commons/files.h 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/commons/files.h 2009-02-25 07:45:26 UTC (rev 1231) @@ -19,7 +19,7 @@ using namespace std; - class file_not_found_exception : exception { + class file_not_found_exception : std::exception { public: file_not_found_exception(const string & name) : name(name) {} virtual ~file_not_found_exception() throw() {} @@ -50,40 +50,35 @@ * buffer (size of the file). * TODO this probably isn't very safe, since we're demoting an off_t to a * size_t. 
Is there a healthier approach? - * TODO move to C99 commons */ char * - load_file(const char *path, size_t & len, unsigned int ncpus) { + load_file(const char *path, size_t & len, unsigned int ncpus) + { struct stat sb; - int fd; + int fd = checkpass(open(path, 0)); - fd = open(path, 0); - check(fd >= 0); - - check(fstat(fd, &sb) == 0); + check0x(fstat(fd, &sb)); check(sb.st_size <= 0xffffffff); // TODO Why don't we need (static) cast here? Isn't this a lossy cast? len = sb.st_size; - char *buf = new char[len + 1]; - check(buf); + char *buf = checkpass(new char[len + 1]); + checkeqnneg(pread(fd, buf, len, 0), static_cast<ssize_t>(len)); - ssize_t status = pread(fd, buf, len, 0); - size_t nread = static_cast<size_t>(status); - check(status != -1 && nread == len); - // TODO Use threads to pull data to the correct initial locations? - // size_t chunk_len = len / ncpus; - // for (unsigned int i = 0; i < ncpus; i++) { - // int off = i *chunk_len; - // ssize_t status = pread(fd, buf + off, chunk_len, off); - // // We read the whole chunk or hit the end. - // size_t nread = static_cast<size_t>(status); - // check(status != -1 && (nread == chunk_len || off + nread == len)); - // } + if (false) { + size_t chunk_len = len / ncpus; + for (unsigned int i = 0; i < ncpus; i++) { + size_t off = i * chunk_len; + size_t nread = static_cast<size_t> + (checknnegerr(pread(fd, buf + off, chunk_len, off))); + // We read the whole chunk or hit the end. 
+ check(nread == chunk_len || off + nread == len); + } + } - check(close(fd) == 0); + check0x(close(fd)); buf[len] = '\0'; // don't let strcmp() run off the end return buf; Modified: cpp-commons/trunk/src/commons/pthread/barrier.h =================================================================== --- cpp-commons/trunk/src/commons/pthread/barrier.h 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/commons/pthread/barrier.h 2009-02-25 07:45:26 UTC (rev 1231) @@ -21,7 +21,8 @@ }; int -pthread_barrier_init(pthread_barrier_t* b, pthread_barrierattr_t* a, unsigned count) +pthread_barrier_init(pthread_barrier_t* b, pthread_barrierattr_t*, + unsigned count) { check0(pthread_mutex_init(&b->lock, NULL)); check0(pthread_cond_init (&b->cond, NULL)); Modified: cpp-commons/trunk/src/commons/rand.h =================================================================== --- cpp-commons/trunk/src/commons/rand.h 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/commons/rand.h 2009-02-25 07:45:26 UTC (rev 1231) @@ -1,5 +1,5 @@ -#ifndef COMMONS_RAND_H_ -#define COMMONS_RAND_H_ +#ifndef COMMONS_RAND_H +#define COMMONS_RAND_H #include <cassert> #include <cstdlib> // random, RAND_MAX Modified: cpp-commons/trunk/src/commons/region.h =================================================================== --- cpp-commons/trunk/src/commons/region.h 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/commons/region.h 2009-02-25 07:45:26 UTC (rev 1231) @@ -173,14 +173,16 @@ */ pointer allocate(size_type num, const void* = 0) { - return (pointer) region->alloc_mem(num * sizeof(T)); + return reinterpret_cast<pointer>(region->alloc_mem(num * sizeof(T))); } /** * Initialize elements of allocated storage p with value `value` using * placement new. 
*/ - void construct(pointer p, const T& value) { new((void*)p)T(value); } + void construct(pointer p, const T& value) { + new (static_cast<void*>(p)) T(value); + } /** * Destroy elements of initialized storage p by calling their destructor. Modified: cpp-commons/trunk/src/commons/st/st.h =================================================================== --- cpp-commons/trunk/src/commons/st/st.h 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/commons/st/st.h 2009-02-25 07:45:26 UTC (rev 1231) @@ -401,6 +401,11 @@ size_t rem() { return buf_.end() - end_; } /** + * The entire read buffer. + */ + array<char> &buf() { return buf_; } + + /** * Discard the requested number of bytes. */ void skip(size_t req, st_utime_t to = ST_UTIME_NO_TIMEOUT) { @@ -440,7 +445,7 @@ // Handle large arrays specially. if (req > buf_.size()) { managed_array<char> p(checkpass(new char[req]), true); - copy(start_, end_, p.get()); + memcpy(p.get(), start_, unread()); checkeqnneg(st_read_fully(fd_, p + unread(), req - unread(), to), static_cast<ssize_t>(req - unread())); start_ = end_ = buf_.get(); return p; @@ -448,7 +453,7 @@ // Shift things down if necessary. if (req > static_cast<size_t>(buf_.end() - end_)) { - copy(start_, end_, buf_.get()); + memmove(buf_.get(), start_, unread()); size_t diff = start_ - buf_.get(); start_ -= diff; end_ -= diff; @@ -486,7 +491,7 @@ // Shift things down if necessary. if (req > static_cast<size_t>(buf_.end() - end_)) { - copy(start_, end_, buf_.get()); + memmove(buf_.get(), start_, unread()); size_t diff = start_ - buf_.get(); start_ -= diff; end_ -= diff; Modified: cpp-commons/trunk/src/commons/strings.h =================================================================== --- cpp-commons/trunk/src/commons/strings.h 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/commons/strings.h 2009-02-25 07:45:26 UTC (rev 1231) @@ -49,19 +49,21 @@ /** * Look for a substring, but without null-termination conventions. 
+ * + * TODO: remove or fix */ inline char * - unsafe_strstr(char *p, const char *q, const char *lim) + unsafe_strstr(char *p, const char *, const char *lim) { if (lim == 0) { while (true) { - for (; !(*p == '\0' && *(p+1) == '\0'); p++); + for (; !(*p == '\0' && *(p+1) == '\0'); p++) {} return p; } } else { check(p < lim); while (true) { - for (; !(*p == '\0' && *(p+1) == '\0') && p < lim; p++); + for (; !(*p == '\0' && *(p+1) == '\0') && p < lim; p++) {} if (p == lim) return NULL; return p; } @@ -74,7 +76,7 @@ inline const char* unsafe_strstr(const char *p, const char *q, const char *lim) { - return unsafe_strstr((char*) p, q, lim); + return unsafe_strstr(const_cast<char*>(p), q, lim); } /** @@ -87,9 +89,9 @@ scan(const void* buf, size_t len) { char sum = 0; - const char* start = (const char*) buf; + const char* start = reinterpret_cast<const char*>(buf); for (const char* p = start; p < start + len; p++) { - sum += *p; + sum = static_cast<char>(sum | *p); } return sum; } @@ -105,7 +107,7 @@ { const size_t sz = 1024; char tmp[sz]; - const char* p = (const char*) buf; + const char* p = reinterpret_cast<const char*>(buf); const char* end = p + len; for (; p + sz < end; p += 1024) { memcpy(tmp, p, sz); Modified: cpp-commons/trunk/src/commons/threads.h =================================================================== --- cpp-commons/trunk/src/commons/threads.h 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/commons/threads.h 2009-02-25 07:45:26 UTC (rev 1231) @@ -2,6 +2,7 @@ #define COMMONS_THREADS_H #include <pthread.h> +#include <sched.h> #include <sys/syscall.h> #include <sys/types.h> #include <unistd.h> @@ -18,13 +19,19 @@ using namespace boost; /** + * Work-around to make CPU_SET etc. -Wold-style-cast-safe. + */ +#undef __CPUMASK +#define __CPUMASK(cpu) (static_cast<__cpu_mask>(1) << ((cpu) % __NCPUBITS)) + + /** * Get the current thread ID. Glibc does not provide a wrapper for this * system call. 
*/ inline pid_t gettid() { - return (pid_t) syscall(SYS_gettid); + return static_cast<pid_t>(syscall(SYS_gettid)); } /** @@ -60,7 +67,8 @@ spawn(const boost::function<void()>& f) { pthread_t t; - return pthread_create(&t, NULL, &run_function0_null, (void*) new boost::function<void()>(f)) == 0 ? t : 0; + return pthread_create(&t, NULL, &run_function0_null, + new boost::function<void()>(f)) == 0 ? t : 0; } } Deleted: cpp-commons/trunk/src/commons/unique_ptr.hpp =================================================================== --- cpp-commons/trunk/src/commons/unique_ptr.hpp 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/commons/unique_ptr.hpp 2009-02-25 07:45:26 UTC (rev 1231) @@ -1,535 +0,0 @@ -/////////////////////////////////////////////////////////////////////////////// -// unique_ptr.hpp header file -// -// Copyright 2009 Howard Hinnant, Ion Gaztañaga. -// Distributed under the Boost Software License, Version 1.0. (See -// accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) -// See http://www.boost.org/libs/foreach for documentation - -// This is a C++03 emulation of std::unique_ptr placed in namespace boost. -// Reference http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2008/n2800.pdf -// for the latest unique_ptr specification, and -// reference http://www.open-std.org/jtc1/sc22/wg21/docs/lwg-active.html -// for any pending issues against this specification. - -#ifndef UNIQUE_PTR_HPP -#define UNIQUE_PTR_HPP - -#include <boost/utility/enable_if.hpp> -#include <boost/type_traits.hpp> -#include <boost/static_assert.hpp> -#include <boost/mpl/if.hpp> - -namespace boost -{ - -namespace detail_unique_ptr -{ - -typedef char one; -struct two {one _[2];}; - -// An is_convertible<From, To> that considers From an rvalue (consistent with C++0X). 
-// This is a simplified version neglecting the types function, array, void and abstract types -// I had to make a special case out of is_convertible<T,T> to make move-only -// types happy. - -namespace is_conv_imp -{ -template <class T> one test1(const T&); -template <class T> two test1(...); -template <class T> one test2(T); -template <class T> two test2(...); -template <class T> T source(); -} - -template <class T1, class T2> -struct is_convertible -{ - static const bool value = sizeof(is_conv_imp::test1<T2>(is_conv_imp::source<T1>())) == 1; -}; - -template <class T> -struct is_convertible<T, T> -{ - static const bool value = sizeof(is_conv_imp::test2<T>(is_conv_imp::source<T>())) == 1; -}; - -template <class T> -class rv -{ - T& r_; - -public: - explicit rv(T& r) : r_(r) {} - T* operator->() {return &r_;} - T& operator*() {return r_;} -}; - -template <class T> -struct identity -{ - typedef T type; -}; - -} // detail_unique_ptr - -template <class T> -inline -typename enable_if_c -< - !detail_unique_ptr::is_convertible<T, detail_unique_ptr::rv<T> >::value, - T& ->::type -move(T& t) -{ - return t; -} - -template <class T> -inline -typename enable_if_c -< - !detail_unique_ptr::is_convertible<T, detail_unique_ptr::rv<T> >::value, - const T& ->::type -move(const T& t) -{ - return t; -} - -template <class T> -inline -typename enable_if_c -< - detail_unique_ptr::is_convertible<T, detail_unique_ptr::rv<T> >::value, - T ->::type -move(T& t) -{ - return T(detail_unique_ptr::rv<T>(t)); -} - -template <class T> -inline -typename enable_if_c -< - is_reference<T>::value, - T ->::type -forward(typename detail_unique_ptr::identity<T>::type t) -{ - return t; -} - -template <class T> -inline -typename enable_if_c -< - !is_reference<T>::value, - T ->::type -forward(typename detail_unique_ptr::identity<T>::type& t) -{ - return move(t); -} - -template <class T> -inline -typename enable_if_c -< - !is_reference<T>::value, - T ->::type -forward(const typename 
detail_unique_ptr::identity<T>::type& t) -{ - return move(const_cast<T&>(t)); -} - -namespace detail_unique_ptr { - -// A move-aware but stripped-down compressed_pair which only optimizes storage for T2 -template <class T1, class T2, bool = is_empty<T2>::value> -class unique_ptr_storage -{ - T1 t1_; - T2 t2_; - - typedef typename add_reference<T2>::type T2_reference; - typedef typename add_reference<const T2>::type T2_const_reference; - - unique_ptr_storage(const unique_ptr_storage&); - unique_ptr_storage& operator=(const unique_ptr_storage&); -public: - operator rv<unique_ptr_storage>() {return rv<unique_ptr_storage>(*this);} - - unique_ptr_storage() : t1_(), t2_() {} - - explicit unique_ptr_storage(T1 t1) - : t1_(move(t1)), t2_() {} - - unique_ptr_storage(T1 t1, T2 t2) - : t1_(move(t1)), t2_(forward<T2>(t2)) {} - - T1& first() {return t1_;} - const T1& first() const {return t1_;} - - T2_reference second() {return t2_;} - T2_const_reference second() const {return t2_;} -}; - -template <class T1, class T2> -class unique_ptr_storage<T1, T2, true> - : private T2 -{ - T1 t1_; - typedef T2 t2_; - - unique_ptr_storage(const unique_ptr_storage&); - unique_ptr_storage& operator=(const unique_ptr_storage&); -public: - operator rv<unique_ptr_storage>() {return rv<unique_ptr_storage>(*this);} - - unique_ptr_storage() : t1_() {} - - explicit unique_ptr_storage(T1 t1) - : t1_(move(t1)) {} - - unique_ptr_storage(T1 t1, T2 t2) - : t2_(move(t2)), t1_(move(t1)) {} - - T1& first() {return t1_;} - const T1& first() const {return t1_;} - - T2& second() {return *this;} - const T2& second() const {return *this;} -}; - -template <class T1, class T2, bool b> -inline -void -swap(unique_ptr_storage<T1, T2, b>& x, unique_ptr_storage<T1, T2, b>& y) -{ - using std::swap; - swap(x.first(), y.first()); - swap(x.second(), y.second()); -} - -} // detail_unique_ptr - -template <class T> -struct default_delete -{ - default_delete() {} - template <class U> - default_delete(const default_delete<U>&, 
- typename enable_if_c<detail_unique_ptr::is_convertible<U*, T*>::value>::type* = 0) - {} - - void operator()(T* ptr) const - { - BOOST_STATIC_ASSERT(sizeof(T) > 0); - delete ptr; - } -}; - -template <class T> -struct default_delete<T[]> -{ - void operator()(T* ptr) const - { - BOOST_STATIC_ASSERT(sizeof(T) > 0); - delete [] ptr; - } - -private: - - template <class U> void operator()(U*) const; -}; - -namespace detail_unique_ptr -{ - -namespace pointer_type_imp -{ - -template <class U> static two test(...); -template <class U> static one test(typename U::pointer* = 0); - -} // pointer_type_imp - -template <class T> -struct has_pointer_type -{ - static const bool value = sizeof(pointer_type_imp::test<T>(0)) == 1; -}; - -namespace pointer_type_imp -{ - -template <class T, class D, bool = has_pointer_type<D>::value> -struct pointer_type -{ - typedef typename D::pointer type; -}; - -template <class T, class D> -struct pointer_type<T, D, false> -{ - typedef T* type; -}; - -} // pointer_type_imp - -template <class T, class D> -struct pointer_type -{ - typedef typename pointer_type_imp::pointer_type<T, - typename boost::remove_reference<D>::type>::type type; -}; - -} // detail_unique_ptr - -template <class T, class D = default_delete<T> > -class unique_ptr -{ -public: - typedef T element_type; - typedef D deleter_type; - typedef typename detail_unique_ptr::pointer_type<element_type, deleter_type>::type pointer; - -private: - detail_unique_ptr::unique_ptr_storage<pointer, deleter_type> ptr_; - - typedef typename add_reference<deleter_type>::type deleter_reference; - typedef typename add_reference<const deleter_type>::type deleter_const_reference; - - struct nat {int for_bool_;}; - - unique_ptr(unique_ptr&); - unique_ptr& operator=(unique_ptr&); - -public: - operator detail_unique_ptr::rv<unique_ptr>() {return detail_unique_ptr::rv<unique_ptr>(*this);} - unique_ptr(detail_unique_ptr::rv<unique_ptr> r) : ptr_(r->release(), forward<deleter_type>(r->get_deleter())) {} - 
unique_ptr& operator=(detail_unique_ptr::rv<unique_ptr> r) - { - reset(r->release()); - ptr_.second() = move(r->get_deleter()); - return *this; - } - - unique_ptr() - { - BOOST_STATIC_ASSERT(!is_reference<deleter_type>::value); - BOOST_STATIC_ASSERT(!is_pointer<deleter_type>::value); - } - - explicit unique_ptr(pointer p) - : ptr_(p) - { - BOOST_STATIC_ASSERT(!is_reference<deleter_type>::value); - BOOST_STATIC_ASSERT(!is_pointer<deleter_type>::value); - } - - unique_ptr(pointer p, typename mpl::if_<is_reference<D>, - volatile typename remove_reference<D>::type&, D>::type d) - : ptr_(move(p), forward<D>(const_cast<typename add_reference<D>::type>(d))) {} - - template <class U, class E> - unique_ptr(unique_ptr<U, E> u, - typename enable_if_c - < - !boost::is_array<U>::value && - detail_unique_ptr::is_convertible<typename unique_ptr<U>::pointer, pointer>::value && - detail_unique_ptr::is_convertible<E, deleter_type>::value && - ( - !is_reference<deleter_type>::value || - is_same<deleter_type, E>::value - ) - >::type* = 0) - : ptr_(u.release(), forward<D>(forward<E>(u.get_deleter()))) {} - - ~unique_ptr() {reset();} - - unique_ptr& operator=(int nat::*) - { - reset(); - return *this; - } - - template <class U, class E> - unique_ptr& - operator=(unique_ptr<U, E> u) - { - reset(u.release()); - ptr_.second() = move(u.get_deleter()); - return *this; - } - - typename add_reference<T>::type operator*() const {return *get();} - pointer operator->() const {return get();} - pointer get() const {return ptr_.first();} - deleter_reference get_deleter() {return ptr_.second();} - deleter_const_reference get_deleter() const {return ptr_.second();} - operator int nat::*() const {return get() ? 
&nat::for_bool_ : 0;} - - void reset(pointer p = pointer()) - { - pointer t = get(); - if (t != pointer()) - get_deleter()(t); - ptr_.first() = p; - } - - pointer release() - { - pointer tmp = get(); - ptr_.first() = pointer(); - return tmp; - } - - void swap(unique_ptr& u) {detail_unique_ptr::swap(ptr_, u.ptr_);} -}; - -template <class T, class D> -class unique_ptr<T[], D> -{ -public: - typedef T element_type; - typedef D deleter_type; - typedef typename detail_unique_ptr::pointer_type<element_type, deleter_type>::type pointer; - -private: - detail_unique_ptr::unique_ptr_storage<pointer, deleter_type> ptr_; - - typedef typename add_reference<deleter_type>::type deleter_reference; - typedef typename add_reference<const deleter_type>::type deleter_const_reference; - - struct nat {int for_bool_;}; - - unique_ptr(unique_ptr&); - unique_ptr& operator=(unique_ptr&); - -public: - operator detail_unique_ptr::rv<unique_ptr>() {return detail_unique_ptr::rv<unique_ptr>(*this);} - unique_ptr(detail_unique_ptr::rv<unique_ptr> r) : ptr_(r->release(), forward<deleter_type>(r->get_deleter())) {} - unique_ptr& operator=(detail_unique_ptr::rv<unique_ptr> r) - { - reset(r->release()); - ptr_.second() = move(r->get_deleter()); - return *this; - } - - unique_ptr() - { - BOOST_STATIC_ASSERT(!is_reference<deleter_type>::value); - BOOST_STATIC_ASSERT(!is_pointer<deleter_type>::value); - } - - explicit unique_ptr(pointer p) - : ptr_(p) - { - BOOST_STATIC_ASSERT(!is_reference<deleter_type>::value); - BOOST_STATIC_ASSERT(!is_pointer<deleter_type>::value); - } - - unique_ptr(pointer p, typename mpl::if_<is_reference<D>, - volatile typename remove_reference<D>::type&, D>::type d) - : ptr_(move(p), forward<D>(const_cast<typename add_reference<D>::type>(d))) {} - - ~unique_ptr() {reset();} - - T& operator[](size_t i) const {return get()[i];} - pointer get() const {return ptr_.first();} - deleter_reference get_deleter() {return ptr_.second();} - deleter_const_reference get_deleter() const 
{return ptr_.second();} - operator int nat::*() const {return get() ? &nat::for_bool_ : 0;} - - void reset(pointer p = pointer()) - { - pointer t = get(); - if (t != pointer()) - get_deleter()(t); - ptr_.first() = p; - } - - pointer release() - { - pointer tmp = get(); - ptr_.first() = pointer(); - return tmp; - } - - void swap(unique_ptr& u) {detail_unique_ptr::swap(ptr_, u.ptr_);} -private: - template <class U> - explicit unique_ptr(U, - typename enable_if_c<detail_unique_ptr::is_convertible<U, pointer>::value>::type* = 0); - - template <class U> - unique_ptr(U, typename mpl::if_<is_reference<D>, - volatile typename remove_reference<D>::type&, D>::type, - typename enable_if_c<detail_unique_ptr::is_convertible<U, pointer>::value>::type* = 0); -}; - -template<class T, class D> -inline -void -swap(unique_ptr<T, D>& x, unique_ptr<T, D>& y) -{ - x.swap(y); -} - -template<class T1, class D1, class T2, class D2> -inline -bool -operator==(const unique_ptr<T1, D1>& x, const unique_ptr<T2, D2>& y) -{ - return x.get() == y.get(); -} - -template<class T1, class D1, class T2, class D2> -inline -bool -operator!=(const unique_ptr<T1, D1>& x, const unique_ptr<T2, D2>& y) -{ - return !(x == y); -} - -template<class T1, class D1, class T2, class D2> -inline -bool -operator<(const unique_ptr<T1, D1>& x, const unique_ptr<T2, D2>& y) -{ - return x.get() < y.get(); -} - -template<class T1, class D1, class T2, class D2> -inline -bool -operator<=(const unique_ptr<T1, D1>& x, const unique_ptr<T2, D2>& y) -{ - return !(y < x); -} - -template<class T1, class D1, class T2, class D2> -inline -bool -operator>(const unique_ptr<T1, D1>& x, const unique_ptr<T2, D2>& y) -{ - return y < x; -} - -template<class T1, class D1, class T2, class D2> -inline -bool -operator>=(const unique_ptr<T1, D1>& x, const unique_ptr<T2, D2>& y) -{ - return !(x < y); -} - -} // boost - -#endif // UNIQUE_PTR_HPP Modified: cpp-commons/trunk/src/commons/x86asm.h 
=================================================================== --- cpp-commons/trunk/src/commons/x86asm.h 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/commons/x86asm.h 2009-02-25 07:45:26 UTC (rev 1231) @@ -11,7 +11,7 @@ inline unsigned char high(unsigned int r) { - return ((r >> 8) & 0xffU); + return static_cast<unsigned char>((r >> 8) & 0xffU); } /** @@ -21,7 +21,7 @@ inline unsigned char low(unsigned int r) { - return (r & 0xffU); + return static_cast<unsigned char>(r & 0xffU); } } Added: cpp-commons/trunk/src/test/Makefile =================================================================== --- cpp-commons/trunk/src/test/Makefile (rev 0) +++ cpp-commons/trunk/src/test/Makefile 2009-02-25 07:45:26 UTC (rev 1231) @@ -0,0 +1,35 @@ +CXXFLAGS = \ + -Wall \ + -Werror \ + -Wextra \ + -Wstrict-null-sentinel \ + -Wold-style-cast \ + -Woverloaded-virtual \ + -Wsign-promo \ + -Wformat=2 \ + -Winit-self \ + -Wswitch-enum \ + -Wunused \ + -Wfloat-equal \ + -Wundef \ + -Wunsafe-loop-optimizations \ + -Wpointer-arith \ + -Wcast-qual \ + -Wcast-align \ + -Wwrite-strings \ + -Wconversion \ + -Wlogical-op \ + -Wno-aggregate-return \ + -Wno-missing-declarations \ + -Wno-missing-field-initializers \ + -Wmissing-format-attribute \ + -Wpacked \ + -Wredundant-decls \ + -Winvalid-pch \ + -Wlong-long \ + -Wvolatile-register-var \ + -std=gnu++0x \ + +all: $(patsubst %.cc,%.o,$(wildcard *.cc)) + +.PHONY: all Deleted: cpp-commons/trunk/src/test/all.cc =================================================================== --- cpp-commons/trunk/src/test/all.cc 2009-02-24 20:03:15 UTC (rev 1230) +++ cpp-commons/trunk/src/test/all.cc 2009-02-25 07:45:26 UTC (rev 1231) @@ -1,35 +0,0 @@ -/*************************************************************************** - * Copyright (C) 2007 by Yang Zhang * - * gmail:yaaang * - * * - * This program is free software; you can redistribute it and/or modify * - * it under the terms of the GNU Library General Public License as * - * 
published by the Free Software Foundation; either version 2 of the * - * License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU Library General Public * - * License along with this program; if not, write to the * - * Free Software Foundation, Inc., * - * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * - ***************************************************************************/ - - -#ifdef HAVE_CONFIG_H -#include <config.h> -#endif - -#include <commons/boost/threads.h> -#include <commons/check.h> -#include <commons/cpuid.h> -#include <commons/files.h> -#include <commons/hash.h> -#include <commons/strings.h> -#include <commons/time.h> -#include <commons/threads.h> -#include <commons/x86asm.h> - Property changes on: cpp-commons/trunk/src/yonat ___________________________________________________________________ Added: svn:mergeinfo + Added: cpp-commons/trunk/tools/check.bash =================================================================== --- cpp-commons/trunk/tools/check.bash (rev 0) +++ cpp-commons/trunk/tools/check.bash 2009-02-25 07:45:26 UTC (rev 1231) @@ -0,0 +1,53 @@ +#!/usr/bin/env bash + +. 
common.bash || exit 1 + +cd "$(dirname $0)/../src/" + +srcs() { + find boost/ commons/ yonat/ -name '*.h' -o -name '*.hpp' +} + +my-srcs() { + find commons/ -name '*.h' +} + +clobber-if-diff() { + if [[ -f "$1" ]] ; then + cat > "$1.tmp" + if diff "$1" "$1.tmp" > /dev/null + then rm "$1.tmp" + else mv "$1.tmp" "$1" + fi + else + cat > "$1" + fi +} + +build-tests() { + my-srcs | sed 's/^/#include </; s/$/>/' > test/all.cc + for i in $(my-srcs) + do echo "#include <$i>" | clobber-if-diff test/$(basename ${i%.h}).cc + done + make -C test/ +} + +check-include-guards() { + python -c " +from __future__ import with_statement +import re, sys +def check(x, y): + if not x == y: + raise Exception('failed: %s != %s' % (x,y)) +for path in sys.argv[1:]: + expected = '#ifndef ' + re.sub(r'[/\\.]', '_', path).upper() + with file(path) as f: + for line in f: + if line.strip() == expected: + break + else: + raise Exception('line not found: ' + expected) +" $(my-srcs) +} + +eval "$@" Property changes on: cpp-commons/trunk/tools/check.bash ___________________________________________________________________ Added: svn:executable + * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-24 20:03:20
|
Revision: 1230 http://assorted.svn.sourceforge.net/assorted/?rev=1230&view=rev Author: yangzhang Date: 2009-02-24 20:03:15 +0000 (Tue, 24 Feb 2009) Log Message: ----------- added trial at beating memcpy Added Paths: ----------- sandbox/trunk/src/cc/copy.cc Added: sandbox/trunk/src/cc/copy.cc =================================================================== --- sandbox/trunk/src/cc/copy.cc (rev 0) +++ sandbox/trunk/src/cc/copy.cc 2009-02-24 20:03:15 UTC (rev 1230) @@ -0,0 +1,100 @@ +// Just can't beat memcpy! +#include <cstring> +#include <algorithm> +#include <commons/time.h> +using namespace commons; +using namespace std; +enum { size = (int)200e6, reps = 5 }; +void mydumbmemcpy(char *q, char *p, size_t size) { + for (size_t i = 0; i < size; ++i) + q[i] = p[i]; +} +void mycopy(char *p, char *end, char *q) { + for (; p < end; ++p, ++q) + *q = *p; +} +void mymemcpy(char *q, char *p, size_t size) { + long long *qq = reinterpret_cast<long long*>(q), + *pp = reinterpret_cast<long long*>(p); + size /= sizeof(long long); + for (size_t i = 0; i < size; ++i) + qq[i] = pp[i]; +} +void unrolled(char *q, char *p, size_t size) { + long long *qq = reinterpret_cast<long long*>(q), + *pp = reinterpret_cast<long long*>(p), + *end = qq + size / sizeof(long long); + while (qq < end) { + *qq++ = *pp++; + *qq++ = *pp++; + *qq++ = *pp++; + *qq++ = *pp++; + *qq++ = *pp++; + *qq++ = *pp++; + *qq++ = *pp++; + *qq++ = *pp++; + } +} +//void mycopy(char *p, char *end, char *q) { +// for (; p < end; ++p, ++q) +// *q = *p; +//} +int main() { + char *p = new char[size], *q = new char[size]; + cout << "memcpy" << endl; + for (int i = 0; i < reps; ++i) { + long long start = current_time_millis(); + memcpy(q, p, size); + long long diff = current_time_millis() - start; + cout << diff << " ms or " << size / double(diff) / 1000 << " MB/s" << endl; + } + cout << "copy" << endl; + for (int i = 0; i < reps; ++i) { + long long start = current_time_millis(); + copy(p, p + size, q); + long long diff = 
current_time_millis() - start; + cout << diff << " ms or " << size / double(diff) / 1000 << " MB/s" << endl; + } + cout << "unrolled" << endl; + for (int i = 0; i < reps; ++i) { + long long start = current_time_millis(); + unrolled(q, p, size); + long long diff = current_time_millis() - start; + cout << diff << " ms or " << size / double(diff) / 1000 << " MB/s" << endl; + } + cout << "mydumbmemcpy" << endl; + for (int i = 0; i < reps; ++i) { + long long start = current_time_millis(); + mydumbmemcpy(q, p, size); + long long diff = current_time_millis() - start; + cout << diff << " ms or " << size / double(diff) / 1000 << " MB/s" << endl; + } + cout << "mycopy" << endl; + for (int i = 0; i < reps; ++i) { + long long start = current_time_millis(); + mycopy(p, p + size, q); + long long diff = current_time_millis() - start; + cout << diff << " ms or " << size / double(diff) / 1000 << " MB/s" << endl; + } + cout << "mymemcpy" << endl; + for (int i = 0; i < reps; ++i) { + long long start = current_time_millis(); + mymemcpy(q, p, size); + long long diff = current_time_millis() - start; + cout << diff << " ms or " << size / double(diff) / 1000 << " MB/s" << endl; + } + return 0; +} +#if 0 +482 ms or 414.938 MB/s +122 ms or 1639.34 MB/s +117 ms or 1709.4 MB/s +120 ms or 1666.67 MB/s +118 ms or 1694.92 MB/s +-- +165 ms or 1212.12 MB/s +162 ms or 1234.57 MB/s +152 ms or 1315.79 MB/s +153 ms or 1307.19 MB/s +153 ms or 1307.19 MB/s +#endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-24 08:40:23
|
Revision: 1229 http://assorted.svn.sourceforge.net/assorted/?rev=1229&view=rev Author: yangzhang Date: 2009-02-24 08:40:14 +0000 (Tue, 24 Feb 2009) Log Message: ----------- - added raw-buf counterparts to the response msg types, with empty-only Responses - removed macros GETMSG, GETSA; cleanup - fixed always-fake-execing bug - fixed response sending bugs: sendmsg, marking, start_res/fin_res - refactored some more macros in ser.h - added --use-pb-res to distinguish between serialization methods of txns and responses - test.bash: just build ydb - added notes/todos Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ser.cc ydb/trunk/src/ser.h ydb/trunk/tools/test.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-23 23:55:26 UTC (rev 1228) +++ ydb/trunk/README 2009-02-24 08:40:14 UTC (rev 1229) @@ -427,8 +427,17 @@ - DONE see how fast p2 runs with fake exec - back up, 2.75M +- DONE add raw-buffer versions of the response message classes as well + - slight increase in speed + - 1: 518K + - 2: 505K + - 3: 485K +- DONE recap + - n: rb-rb rb-pb pb-rb pb-pb + - 1: 518K 467K 359K 333K + - 2: 505K 470K 350K 333K + - 3: 485K 465K 335K 333K - TODO get raw-buffer working in wal, 0-node -- TODO add raw-buffer versions of the response message classes as well - TODO refactor st_reader, etc. 
to be generic opportunistic buffered readers - TODO see how streambuf read/write is actually implemented (whether it's too slow) Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-23 23:55:26 UTC (rev 1228) +++ ydb/trunk/src/main.lzz.clamp 2009-02-24 08:40:14 UTC (rev 1229) @@ -42,25 +42,9 @@ using namespace std; using namespace std::tr1; using namespace testing; - -using ydb::msg::reader; -using ydb::msg::writer; -using ydb::msg::stream; -using ydb::pb::ResponseBatch; -using ydb::pb::Response; -using ydb::pb::Recovery; -using ydb::pb::Recovery_Pair; -using ydb::pb::Init; -using ydb::pb::Join; -using ydb::pb::SockAddr; +using namespace ydb; using namespace ydb::pb; using namespace ydb::msg; - -#define GETMSG(buf) \ -checkeqnneg(st_read_fully(src, buf, len, timeout), int(len)); \ -if (stop_time != nullptr) \ - *stop_time = current_time_millis(); \ -check(msg.ParseFromArray(buf, len)); #end //#define map_t unordered_map @@ -71,7 +55,10 @@ typedef string ser_t; template<typename T> void init_map(T &map) {} -template<> void init_map(dense_hash_map<int, int> &map) { map.set_empty_key(-1); map.set_deleted_key(-2); } +template<> void init_map(dense_hash_map<int, int> &map) { + map.set_empty_key(-1); + map.set_deleted_key(-2); +} // Configuration. st_utime_t timeout; @@ -80,7 +67,7 @@ size_t accept_joiner_size, buf_size; bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, - debug_threads, multirecover, disk, debug_memory, use_wal, use_pb, + debug_threads, multirecover, disk, debug_memory, use_wal, use_pb, use_pb_res, suppress_txn_msgs, use_bcast_async, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; @@ -220,13 +207,10 @@ st_netfd_t fd() const { return fd_; } /** The port on which the replica is listening. 
*/ uint16_t port() const { return port_; } -#hdr -#define GETSA sockaddr_in sa; sockaddr(sa); return sa -#end /** The port on which the replica connected to us. */ - uint16_t local_port() const { GETSA.sin_port; } - uint32_t host() const { GETSA.sin_addr.s_addr; } - sockaddr_in sockaddr() const { GETSA; } + uint16_t local_port() const { return sockaddr().sin_port; } + uint32_t host() const { return sockaddr().sin_addr.s_addr; } + sockaddr_in sockaddr() const { sockaddr_in sa; sockaddr(sa); return sa; } void sockaddr(sockaddr_in &sa) const { socklen_t salen = sizeof sa; check0x(getpeername(st_netfd_fileno(fd_), @@ -398,7 +382,7 @@ * was chosen (sync or async). */ template<typename T> -void +inline void bcastmsg(const vector<st_netfd_t> &dsts, const T &msg /* XXX optimize this , ser_t &s */) { if (use_bcast_async) bcastmsg_async(dsts, msg); @@ -409,7 +393,7 @@ * Send a message to a single recipient. */ template<typename T> -void +inline void sendmsg(st_netfd_t dst, const T &msg) { // XXX optimize this @@ -456,15 +440,15 @@ *start_time = current_time_millis(); len = ntohl(len); - // Parse the message body. - if (len <= 4096) { - char buf[4096]; - GETMSG(buf); - } else { - //cout << "receiving large msg; heap-allocating " << len << " bytes" << endl; - scoped_array<char> buf(new char[len]); - GETMSG(buf.get()); - } + // Parse the message body. Try stack-allocation if possible. 
+ scoped_array<char> sbuf; + char *buf; + if (len <= 4096) buf = reinterpret_cast<char*>(alloca(len)); + else sbuf.reset(buf = new char[len]); + checkeqnneg(st_read_fully(src, buf, len, timeout), int(len)); + if (stop_time != nullptr) + *stop_time = current_time_millis(); + check(msg.ParseFromArray(buf, len)); return len; } @@ -556,7 +540,7 @@ reader r(nullptr); function<void(const void*, size_t)> fn; if (use_wal) - fn = boost::bind(&wal::logbuf, g_wal, _1, _2); + fn = bind(&wal::logbuf, g_wal, _1, _2); else fn = lambda(const void *buf, size_t len) { foreach (st_netfd_t dst, __ref(fds)) @@ -610,7 +594,7 @@ // Process immediately if not bcasting. if (fds.empty()) { --seqno; - process_txn<Types>(g_map, txn, seqno, nullptr); + process_txn<Types, pb_types>(g_map, txn, seqno, nullptr); w.reset(); } @@ -684,18 +668,19 @@ * Process a transaction: update DB state (incl. seqno) and send response to * leader. */ -template<typename Types> +template<typename Types, typename RTypes> void -process_txn(mii &map, const typename Types::Txn &txn, int &seqno, Response *res) +process_txn(mii&map, const typename Types::Txn &txn, int &seqno, + typename RTypes::Response *res) { typedef typename Types::Txn Txn; typedef typename Types::Op Op; - //wal &wal = *g_wal; checkeq(txn.seqno(), seqno + 1); seqno = txn.seqno(); if (res != nullptr) { res->set_seqno(seqno); res->set_caught_up(true); + start_result(*res); } if (!fake_exec) { for (int o = 0; o < txn.op_size(); ++o) { @@ -733,6 +718,8 @@ } } } + if (res != nullptr) + fin_result(*res); //if (use_wal) wal.commit(); } @@ -773,12 +760,12 @@ } #end -template<typename Txn> shared_ptr<ydb::pb::Txn> to_pb_Txn(Txn txn); -template<> shared_ptr<ydb::pb::Txn> to_pb_Txn(ydb::pb::Txn txn) { - return shared_ptr<ydb::pb::Txn>(new ydb::pb::Txn(txn)); +template<typename Txn> shared_ptr<pb::Txn> to_pb_Txn(Txn txn); +template<> shared_ptr<pb::Txn> to_pb_Txn(pb::Txn txn) { + return shared_ptr<pb::Txn>(new pb::Txn(txn)); } -template<> shared_ptr<ydb::pb::Txn> 
to_pb_Txn(ydb::msg::Txn txn) { - shared_ptr<ydb::pb::Txn> ptxn(new ydb::pb::Txn()); +template<> shared_ptr<pb::Txn> to_pb_Txn(msg::Txn txn) { + shared_ptr<pb::Txn> ptxn(new pb::Txn()); ptxn->set_seqno(txn.seqno()); // XXX FIXME return ptxn; @@ -809,16 +796,18 @@ * * \param[in] wal The WAL. */ -template<typename Types> +template<typename Types, typename RTypes> void process_txns(st_netfd_t leader, mii &map, int &seqno, st_channel<shared_ptr<Recovery> > &send_states, - st_channel<shared_ptr<ydb::pb::Txn> > &backlog, int init_seqno, + st_channel<shared_ptr<pb::Txn> > &backlog, int init_seqno, int mypos, int nnodes) { typedef typename Types::TxnBatch TxnBatch; typedef typename Types::Txn Txn; typedef typename Types::Op Op; + typedef typename RTypes::Response Response; + typedef typename RTypes::ResponseBatch ResponseBatch; bool caught_up = init_seqno == 0; long long start_time = current_time_millis(), @@ -844,15 +833,17 @@ st_reader reader(leader); vector<st_netfd_t> leader_v(1, leader); - writer w(lambda(const void*, size_t) { - throw operation_not_supported("process_txns should not be writing"); - }, buf_size); + writer w(lambda(const void *buf, size_t len) { + checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(len)); + }, buf_size); stream s(reader, w); try { scoped_ptr<TxnBatch> pbatch(new_TxnBatch<TxnBatch>(s)); - TxnBatch batch = *pbatch; - ResponseBatch resbatch; + TxnBatch &batch = *pbatch; + scoped_ptr<ResponseBatch> presbatch(new_ResponseBatch<ResponseBatch>(s)); + ResponseBatch &resbatch = *presbatch; while (true) { long long before_read = -1; if (read_thresh > 0) { @@ -871,7 +862,9 @@ } } if (batch.txn_size() > 0) { + w.mark(); resbatch.Clear(); + start_res(resbatch); for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = batch.txn(t); // Regular transaction. 
@@ -888,8 +881,8 @@ caught_up = true; } Response *res = resbatch.add_res(); - process_txn<Types>(map, txn, seqno, res); - if (!Types::is_pb()) { + process_txn<Types, RTypes>(map, txn, seqno, res); + if (fake_exec && !Types::is_pb()) { reader.skip(txn.op_size() * Op_Size); } action = "processed"; @@ -912,7 +905,8 @@ st_sleep(0); } } - if (resbatch.res_size() > 0) + fin_res(resbatch); + if (resbatch.res_size() > 0 && RTypes::is_pb()) sendmsg(leader, resbatch); } else { // Empty (default) TxnBatch means "generate a snapshot." @@ -967,14 +961,27 @@ last_seqno(-1) {} + template<typename Types> void run() { - finally f(boost::bind(&response_handler::cleanup, this)); + typedef typename Types::Response Response; + typedef typename Types::ResponseBatch ResponseBatch; + finally f(bind(&response_handler::cleanup, this)); + st_reader reader(replica); - ResponseBatch batch; + writer w(lambda(const void*, size_t) { + throw operation_not_supported("response handler should not be writing"); + }, buf_size); + stream s(reader,w); + scoped_ptr<ResponseBatch> pbatch(new_ResponseBatch<ResponseBatch>(s)); + ResponseBatch &batch = *pbatch; + + function<void()> loop_cleanup = + bind(&response_handler::loop_cleanup, this); + while (true) { - finally f(boost::bind(&response_handler::loop_cleanup, this)); + finally f(loop_cleanup); // Read the message, but correctly respond to interrupts so that we can // cleanly exit (slightly tricky). @@ -982,7 +989,8 @@ // Stop-interruptible in case we're already caught up. try { st_intr intr(stop_hub); - readmsg(reader, batch); + if (Types::is_pb()) readmsg(reader, batch); + else batch.Clear(); } catch (...) { // TODO: only catch interruptions // This check on seqnos is OK for termination since the seqno will // never grow again if stop_hub is set. @@ -999,7 +1007,8 @@ // Only kill-interruptible because we want a clean termination (want // to get all the acks back). 
st_intr intr(kill_hub); - readmsg(reader, batch); + if (Types::is_pb()) readmsg(reader, batch); + else batch.Clear(); } for (int i = 0; i < batch.res_size(); ++i) { @@ -1007,7 +1016,9 @@ // Determine if this response handler's host (the only joiner) has finished // catching up. If it has, then broadcast a signal so that all response // handlers will know about this event. - if (!caught_up && res.caught_up()) { + int rseqno = res.seqno(); + bool rcaught_up = res.caught_up(); + if (!caught_up && rcaught_up) { long long now = current_time_millis(), timediff = now - start_time; caught_up = true; recover_signals.push(now); @@ -1022,14 +1033,14 @@ stop_hub.set(); } } - if (res.seqno() % chkpt == 0) { + if (rseqno % chkpt == 0) { if (verbose) { cout << rid << ": "; - cout << "got response " << res.seqno() << " from " << replica << endl; + cout << "got response " << rseqno << " from " << replica << endl; } st_sleep(0); } - last_seqno = res.seqno(); + last_seqno = rseqno; } } } @@ -1079,12 +1090,13 @@ /** * Swallow replica responses. */ +template<typename Types> void handle_responses(st_netfd_t replica, const int &seqno, int rid, st_multichannel<long long> &recover_signals, bool caught_up) { response_handler h(replica, seqno, rid, recover_signals, caught_up); - h.run(); + h.run<Types>(); } /** @@ -1137,6 +1149,7 @@ /** * Run the leader. */ +template<typename Types, typename RTypes> void run_leader(int minreps, uint16_t leader_port) { @@ -1184,9 +1197,8 @@ st_bool accept_joiner; int seqno = 0; st_channel<replica_info> newreps; - const function<void()> f = use_pb ? 
- bind(issue_txns<pb_types>, ref(newreps), ref(seqno), ref(accept_joiner)) : - bind(issue_txns<rb_types>, ref(newreps), ref(seqno), ref(accept_joiner)); + const function<void()> f = + bind(issue_txns<Types>, ref(newreps), ref(seqno), ref(accept_joiner)); st_thread_t issue_txns_thread = my_spawn(f, "issue_txns"); foreach (const replica_info &r, replicas) newreps.push(r); st_joining join_issue_txns(issue_txns_thread); @@ -1212,7 +1224,7 @@ st_thread_group handlers; int rid = 0; foreach (replica_info r, replicas) { - handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), rid++, + handlers.insert(my_spawn(bind(handle_responses<RTypes>, r.fd(), ref(seqno), rid++, ref(recover_signals), true), "handle_responses")); } @@ -1242,7 +1254,7 @@ // Start streaming txns to joiner. cout << "start streaming txns to joiner" << endl; newreps.push(replicas.back()); - handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), rid++, + handlers.insert(my_spawn(bind(handle_responses<RTypes>, joiner, ref(seqno), rid++, ref(recover_signals), false), "handle_responses_joiner")); } catch (break_exception &ex) { @@ -1256,6 +1268,7 @@ /** * Run a replica. */ +template<typename Types, typename RTypes> void run_replica(string leader_host, uint16_t leader_port, uint16_t listen_port) { @@ -1329,13 +1342,10 @@ } // Process txns. - st_channel<shared_ptr<ydb::pb::Txn> > backlog; - const function<void()> process_fn = use_pb ? 
- bind(process_txns<pb_types>, leader, ref(map), ref(seqno), + st_channel<shared_ptr<pb::Txn> > backlog; + const function<void()> process_fn = + bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno), ref(send_states), ref(backlog), init.txnseqno(), mypos, - init.node_size()) : - bind(process_txns<rb_types>, leader, ref(map), ref(seqno), - ref(send_states), ref(backlog), init.txnseqno(), mypos, init.node_size()); st_joining join_proc(my_spawn(process_fn, "process_txns")); st_joining join_rec(my_spawn(bind(recover_joiner, listener, @@ -1391,9 +1401,9 @@ int mid_seqno = seqno; while (!backlog.empty()) { - using ydb::pb::Txn; + using pb::Txn; shared_ptr<Txn> p = backlog.take(); - process_txn<pb_types>(map, *p, seqno, nullptr); + process_txn<pb_types, pb_types>(map, *p, seqno, nullptr); if (p->seqno() % chkpt == 0) { if (verbose) cout << "processed txn " << p->seqno() << " off the backlog; " @@ -1520,7 +1530,9 @@ ("general-txns,g", po::bool_switch(&general_txns), "issue read and delete transactions as well as the default of (only) insertion/update transactions (for leader only)") ("use-pb", po::bool_switch(&use_pb), - "use protocol buffers instead of raw buffers") + "use protocol buffers instead of raw buffers for txns") + ("use-pb-res", po::bool_switch(&use_pb_res), + "use protocol buffers instead of raw buffers for responses") ("wal", po::bool_switch(&use_wal), "enable ARIES write-ahead logging") ("force-ser", po::bool_switch(&force_ser), @@ -1671,9 +1683,33 @@ // Which role are we? 
if (is_leader) { - run_leader(minreps, leader_port); + if (use_pb) { + if (use_pb_res) { + run_leader<pb_types, pb_types>(minreps, leader_port); + } else { + run_leader<pb_types, rb_types>(minreps, leader_port); + } + } else { + if (use_pb_res) { + run_leader<rb_types, pb_types>(minreps, leader_port); + } else { + run_leader<rb_types, rb_types>(minreps, leader_port); + } + } } else { - run_replica(leader_host, leader_port, listen_port); + if (use_pb) { + if (use_pb_res) { + run_replica<pb_types, pb_types>(leader_host, leader_port, listen_port); + } else { + run_replica<pb_types, rb_types>(leader_host, leader_port, listen_port); + } + } else { + if (use_pb_res) { + run_replica<rb_types, pb_types>(leader_host, leader_port, listen_port); + } else { + run_replica<rb_types, rb_types>(leader_host, leader_port, listen_port); + } + } } return 0; Modified: ydb/trunk/src/ser.cc =================================================================== --- ydb/trunk/src/ser.cc 2009-02-23 23:55:26 UTC (rev 1228) +++ ydb/trunk/src/ser.cc 2009-02-24 08:40:14 UTC (rev 1229) @@ -29,7 +29,8 @@ str.append(sizeof len, '\0'); check(batch.AppendToString(&str)); len = uint32_t(str.size() - sizeof len); - copy((char*) &len, (char*) &len + sizeof len, str.begin()); + char *p = reinterpret_cast<char*>(&len); + copy(p, p + sizeof len, str.begin()); os(str.data(), str.size()); } Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-02-23 23:55:26 UTC (rev 1228) +++ ydb/trunk/src/ser.h 2009-02-24 08:40:14 UTC (rev 1229) @@ -12,6 +12,22 @@ #define BEGIN_NAMESPACE(ns) namespace ns { #define END_NAMESPACE } +#define MAKE_START_FIN_HELPER(MsgType, field, action) \ + template<typename T> void action##_##field(T &msg); \ + template<> void action##_##field(ydb::pb::MsgType&) {} \ + template<> void action##_##field(ydb::msg::MsgType& msg) { msg.action##_##field(); } +#define MAKE_START_FIN(MsgType, field) \ + 
MAKE_START_FIN_HELPER(MsgType, field, start) \ + MAKE_START_FIN_HELPER(MsgType, field, fin) + +#define EXPAND_PB \ + bool AppendToString(string*) const { throw_operation_not_supported(); } \ + bool SerializeToString(string*) const { throw_operation_not_supported(); } \ + bool SerializeToOstream(ostream*) const { throw_operation_not_supported(); } \ + bool ParseFromArray(void*, size_t) { throw_operation_not_supported(); } \ + size_t GetCachedSize() const { throw_operation_not_supported(); } \ + size_t ByteSize() const { throw_operation_not_supported(); } \ + BEGIN_NAMESPACE(ydb) BEGIN_NAMESPACE(msg) @@ -160,38 +176,77 @@ return ntxn_; } const Txn &txn(int) const { txn_.Clear(); return txn_; } - bool AppendToString(string*) const { throw_operation_not_supported(); } - bool SerializeToString(string*) const { throw_operation_not_supported(); } - bool SerializeToOstream(ostream*) const { throw_operation_not_supported(); } - bool ParseFromArray(void*, size_t) { throw_operation_not_supported(); } - size_t GetCachedSize() const { throw_operation_not_supported(); } - size_t ByteSize() const { throw_operation_not_supported(); } + EXPAND_PB }; -template<typename T> void start_txn(T &batch); -template<> void start_txn(ydb::pb::TxnBatch &) {} -template<> void start_txn(ydb::msg::TxnBatch &batch) { batch.start_txn(); } +template<typename T> T *new_TxnBatch(stream &s); +template<> ydb::pb::TxnBatch *new_TxnBatch(stream &) { return new ydb::pb::TxnBatch(); } +template<> ydb::msg::TxnBatch *new_TxnBatch(stream &s) { return new ydb::msg::TxnBatch(s); } -template<typename T> void fin_txn(T &batch); -template<> void fin_txn(ydb::pb::TxnBatch &) {} -template<> void fin_txn(ydb::msg::TxnBatch &batch) { batch.fin_txn(); } +MAKE_START_FIN(Txn, op) +MAKE_START_FIN(TxnBatch, txn) -template<typename T> void start_op(T &txn); -template<> void start_op(ydb::pb::Txn &) {} -template<> void start_op(ydb::msg::Txn &txn) { txn.start_op(); } +class Response +{ + stream &s_; + reader &r_; + writer 
&w_; + size_t off_; + mutable short nres_; +public: + Response(stream &s) : s_(s), r_(s.get_reader()), w_(s.get_writer()), off_(w_.pos()), nres_(unset) {} + void Clear() { nres_ = unset; off_ = w_.pos(); } + void set_seqno(int x) { w_.write(x); } + void set_caught_up(char x) { w_.write(x); } + int seqno() const { return r_.read<int>(); } + bool caught_up() const { return r_.read<int>(); } + void start_result() { if (nres_ == unset) nres_ = 0; w_.skip<typeof(nres_)>(); } + void add_result(int x) { w_.write(x); } + void fin_result() { w_.write(nres_, off_ + sizeof(int) + sizeof(char)); } + int result_size() const { + if (nres_ == unset) + nres_ = r_.read<typeof(nres_)>(); + return nres_; + } + int result(int) const { return r_.read<int>(); } +}; -template<typename T> void fin_op(T &txn); -template<> void fin_op(ydb::pb::Txn &) {} -template<> void fin_op(ydb::msg::Txn &txn) { txn.fin_op(); } +class ResponseBatch +{ + stream &s_; + reader &r_; + writer &w_; + size_t off_; + mutable Response res_; + mutable short nres_; +public: + ResponseBatch(stream &s) : s_(s), r_(s.get_reader()), w_(s.get_writer()), off_(w_.pos()), res_(s), nres_(unset) {} + void Clear() { res_.Clear(); nres_ = unset; off_ = w_.pos(); } + void start_res() { if (nres_ == unset) nres_ = 0; w_.skip<typeof(nres_)>(); } + Response *add_res() { ++nres_; return &res_; } + void fin_res() { w_.write(nres_, off_); } + int res_size() const { + if (nres_ == unset) + nres_ = r_.read<typeof(nres_)>(); + return nres_; + } + const Response &res(int) { res_.Clear(); return res_; } + EXPAND_PB +}; -template<typename T> T *new_TxnBatch(stream &s); -template<> ydb::pb::TxnBatch *new_TxnBatch(stream &) { return new ydb::pb::TxnBatch(); } -template<> ydb::msg::TxnBatch *new_TxnBatch(stream &s) { return new ydb::msg::TxnBatch(s); } +template<typename T> T *new_ResponseBatch(stream &s); +template<> ydb::pb::ResponseBatch *new_ResponseBatch(stream &) { return new ydb::pb::ResponseBatch(); } +template<> 
ydb::msg::ResponseBatch *new_ResponseBatch(stream &s) { return new ydb::msg::ResponseBatch(s); } +MAKE_START_FIN(Response, result) +MAKE_START_FIN(ResponseBatch, res) + struct pb_types { typedef ydb::pb::TxnBatch TxnBatch; typedef ydb::pb::Txn Txn; typedef ydb::pb::Op Op; + typedef ydb::pb::Response Response; + typedef ydb::pb::ResponseBatch ResponseBatch; static bool is_pb() { return true; } }; @@ -200,6 +255,8 @@ typedef ydb::msg::TxnBatch TxnBatch; typedef ydb::msg::Txn Txn; typedef ydb::msg::Op Op; + typedef ydb::msg::Response Response; + typedef ydb::msg::ResponseBatch ResponseBatch; static bool is_pb() { return false; } }; Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-02-23 23:55:26 UTC (rev 1228) +++ ydb/trunk/tools/test.bash 2009-02-24 08:40:14 UTC (rev 1229) @@ -188,8 +188,7 @@ refresh-local cd ~/ydb/src make clean - PPROF=1 OPT=1 make WTF= - PPROF=1 OPT=1 make WTF= p2 + PPROF=1 OPT=1 make WTF= ydb } init-setup() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-23 23:55:33
|
Revision: 1228 http://assorted.svn.sourceforge.net/assorted/?rev=1228&view=rev Author: yangzhang Date: 2009-02-23 23:55:26 +0000 (Mon, 23 Feb 2009) Log Message: ----------- - fixed fake-exec bug with raw-buf (skipping op_size * Op_Size) - use dense_hash_map in p2 - added --fake-exec, --thresh to p2 (with thresh < 0 ==> no thresh) - added random keys/values to p2 to really build up a map - reintroduced -Wold-style-cast - added more notes/todos - added extraargs to p2() in test.bash Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/p2.cc ydb/trunk/src/ser.h ydb/trunk/tools/test.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-23 23:54:23 UTC (rev 1227) +++ ydb/trunk/README 2009-02-23 23:55:26 UTC (rev 1228) @@ -397,6 +397,36 @@ Period 2/17- - DONE removed class outstream +- DONE dynamic switch between pb and zero-copy + +- DONE google dense hash map + - big improvement, again not in the direction we'd like + - 0: 550K + - 1: 490K + - 2: 485K + - 3: 475K +- DONE try again fake-exec + - WHOA! major gains + - 0: 1.9M + - 1: 1.5M + - 2: 1M + - 3: 657K + +- DONE see how p2 compares with ydb + - as before, 2.6M +- DONE try adding dense hash map to p2 + - some benefit, 2.9M +- DONE try adding randint to p2 + - huge negative impact! down to 505K + - almost slow as ydb??? sign that i should stop trying to opt ydb :) +- DONE see whether the rand inefficiency is coming from rand or from the random + map manip + - definitely the random map manip + - randspeed in rand-dist shows we get 8.8M rand/s for commons::posix_rand or + 9.9M rand/s for random() +- DONE see how fast p2 runs with fake exec + - back up, 2.75M + - TODO get raw-buffer working in wal, 0-node - TODO add raw-buffer versions of the response message classes as well - TODO refactor st_reader, etc. 
to be generic opportunistic buffered readers @@ -404,20 +434,11 @@ slow) - TODO try making a streambuf for st_write, then try it in conj with struct-less pb -- DONE dynamic switch between pb and zero-copy - TODO fix pb recovery - TODO implement new recovery (add buffer swapping, add buffers to a list) - TODO async (threaded) wal - TODO 0-node 0-copy (don't need to use threads, just process each batch immed) -- TODO see how p2 compares with ydb - -- DONE google dense hash map - - big improvement, again not in the direction we'd like - - 0: 550K - - 1: 490K - - 2: 485K - - 3: 475K - TODO reuse the serialization buffer in the pb path of ydb - TODO show aries-write Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-23 23:54:23 UTC (rev 1227) +++ ydb/trunk/src/Makefile 2009-02-23 23:55:26 UTC (rev 1228) @@ -45,7 +45,7 @@ -Werror \ -Wextra \ -Wstrict-null-sentinel \ - -Wno-old-style-cast \ + -Wold-style-cast \ -Woverloaded-virtual \ -Wsign-promo \ -Wformat=2 \ Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-23 23:54:23 UTC (rev 1227) +++ ydb/trunk/src/main.lzz.clamp 2009-02-23 23:55:26 UTC (rev 1228) @@ -889,6 +889,9 @@ } Response *res = resbatch.add_res(); process_txn<Types>(map, txn, seqno, res); + if (!Types::is_pb()) { + reader.skip(txn.op_size() * Op_Size); + } action = "processed"; } else { if (first_seqno == -1) Modified: ydb/trunk/src/p2.cc =================================================================== --- ydb/trunk/src/p2.cc 2009-02-23 23:54:23 UTC (rev 1227) +++ ydb/trunk/src/p2.cc 2009-02-23 23:55:26 UTC (rev 1228) @@ -7,6 +7,7 @@ #include <commons/sockets.h> #include <commons/time.h> #include <exception> +#include <google/dense_hash_map> #include <iostream> #include <set> #include <string> @@ -14,6 +15,7 @@ #include <tr1/unordered_map> #include <vector> using namespace 
commons; +using namespace google; using namespace std; using namespace tr1; #define foreach BOOST_FOREACH @@ -24,15 +26,16 @@ ++c; \ t += current_time_millis() - start_time; -int bufsize = 1e8, chkpt = 1e4, batch_size = 1e4, thresh = 1e6; -bool verbose = true; +int bufsize = 1e8, chkpt = 1e4, batch_size, thresh; +bool fake_exec, verbose; long long start = 0, seltime = 0, readtime = 0, writetime = 0; int selcnt = 0, readcnt = 0, writecnt = 0; typedef managed_array<char> arr; arr mkarr(char *p = nullptr) { return arr(p, false); } -typedef unordered_map<int, int> map_t; +//typedef unordered_map<int, int> map_t; +typedef dense_hash_map<int, int> map_t; fd_set rfds, wfds, efds; @@ -187,8 +190,8 @@ buf_ = a; if (buf_ == nullptr) return; for (uint32_t i = 0; i < npairs; ++i) { - writeint(1); - writeint(2); + writeint(randint()); + writeint(randint()); } w_.write(); } @@ -207,7 +210,9 @@ long long start_; public: - replica(int fd) : fd_(fd), r_(fd), counter_(0), readcount_(0), start_(current_time_millis()) {} + replica(int fd) : fd_(fd), r_(fd), counter_(0), readcount_(0), start_(current_time_millis()) { + map_.set_empty_key(-1); + } int fd() { return fd_; } uint32_t readint() { @@ -228,11 +233,11 @@ for (uint32_t i = 0; i < npairs; ++i) { uint32_t k = readint(); uint32_t v = readint(); - map_[k] = v; + if (!fake_exec) map_[k] = v; ++counter_; if (counter_ % chkpt == 0) { //if (verbose) cout << current_time_millis() << ": count " << counter_ << endl; - if (counter_ > thresh) { + if (counter_ > thresh || thresh < 0) { long long end = current_time_millis(); double rate = counter_ / double(end - start_) * 1000; cout << "rate " << rate << " pairs/s " << rate / 5 << " tps; readcount " << readcount_ << endl; @@ -261,10 +266,12 @@ ("help,h", "show this help message") ("leader,l", po::bool_switch(&is_leader), "leader") ("verbose,v",po::bool_switch(&verbose), "verbose") + ("fake-exec",po::bool_switch(&fake_exec), "fake-exec") ("host,H", 
po::value<string>(&host)->default_value(string("localhost")), "hostname or address of the leader") - ("batch,b", po::value<int>(&batch_size)->default_value(1e4), "batch size"); + ("batch,b", po::value<int>(&batch_size)->default_value(1e4), "batch size") + ("thresh,X", po::value<int>(&thresh)->default_value(1e7), "thresh"); po::variables_map vm; try { po::store(po::parse_command_line(argc, argv, desc), vm); @@ -287,8 +294,8 @@ FD_ZERO(&efds); int srv = is_leader ? tcp_listen(7654, true) : -1; - int cli = is_leader ? -1 : tcp_connect(host.c_str(), 7654); - if (cli >= 0) checknnegerr(fcntl(cli, F_SETFL, O_NONBLOCK | fcntl(cli, F_GETFL, 0))); + int cli = is_leader ? + -1 : set_non_blocking(tcp_connect(host.c_str(), 7654)); int nfds = max(srv, cli); if (srv >= 0) FD_SET(srv, &rfds); if (cli >= 0) FD_SET(cli, &rfds); @@ -305,10 +312,7 @@ if (srv >= 0 && FD_ISSET(srv, &rfds)) { if (start == 0) { start = current_time_millis(); seltime = 0; } cout << "accept" << endl; - int r = checknnegerr(accept(srv, nullptr, nullptr)); - cout << fcntl(r, F_GETFL, 0) << ' '; - checknnegerr(fcntl(r, F_SETFL, O_NONBLOCK | fcntl(r, F_GETFL, 0))); - cout << fcntl(r, F_GETFL, 0) << endl; + int r = set_non_blocking(checknnegerr(accept(srv, nullptr, nullptr))); rs.push_back(new replica_channel(r)); nfds = max(nfds, r); FD_SET(r, &wfds); Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-02-23 23:54:23 UTC (rev 1227) +++ ydb/trunk/src/ser.h 2009-02-23 23:55:26 UTC (rev 1228) @@ -113,6 +113,8 @@ int value() const { return r_.read<int>(); } }; +const size_t Op_Size = sizeof(char) + sizeof(int) + sizeof(int); + class Txn { private: Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-02-23 23:54:23 UTC (rev 1227) +++ ydb/trunk/tools/test.bash 2009-02-23 23:55:26 UTC (rev 1228) @@ -512,15 +512,15 @@ p2-helper() { local leader="$1" shift - 
tagssh "$leader" "ydb/src/p2 -l | tail" & + tagssh "$leader" "ydb/src/p2 -l ${extraargs:-}" & sleep .1 { while (( $# > 0 )) ; do - tagssh "$1" "ydb/src/p2 -H $leader | tail" & + tagssh "$1" "ydb/src/p2 -H $leader ${extraargs:-}" & shift done time wait - } 2>&1 | fgrep real + } 2>&1 } p2() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |