[Assorted-commits] SF.net SVN: assorted:[1250] cpp-commons/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-03-04 23:31:12
|
Revision: 1250 http://assorted.svn.sourceforge.net/assorted/?rev=1250&view=rev Author: yangzhang Date: 2009-03-04 23:31:02 +0000 (Wed, 04 Mar 2009) Log Message: ----------- - refactored st_reader out of st.h into streamreader.h as stream_reader - added stream_writer from ydb (writer) - added unit tests for stream_reader, stream_writer - cleaned up stream_reader a bit - Modified Paths: -------------- cpp-commons/trunk/src/commons/exceptions.h cpp-commons/trunk/src/commons/st/st.h Added Paths: ----------- cpp-commons/trunk/src/commons/streamreader.h cpp-commons/trunk/src/commons/streamwriter.h cpp-commons/trunk/src/test/streamreader.cc cpp-commons/trunk/src/test/streamwriter.cc Modified: cpp-commons/trunk/src/commons/exceptions.h =================================================================== --- cpp-commons/trunk/src/commons/exceptions.h 2009-03-04 23:30:45 UTC (rev 1249) +++ cpp-commons/trunk/src/commons/exceptions.h 2009-03-04 23:31:02 UTC (rev 1250) @@ -19,6 +19,16 @@ const string msg; }; + class msg_exception : public std::exception + { + public: + msg_exception(const string &msg) : msg_(msg) {} + ~msg_exception() throw() {} + const char *what() const throw() { return msg_.c_str(); } + private: + const string msg_; + }; + #define throw_operation_not_supported() throw operation_not_supported(__PRETTY_FUNCTION__) } Modified: cpp-commons/trunk/src/commons/st/st.h =================================================================== --- cpp-commons/trunk/src/commons/st/st.h 2009-03-04 23:30:45 UTC (rev 1249) +++ cpp-commons/trunk/src/commons/st/st.h 2009-03-04 23:31:02 UTC (rev 1250) @@ -8,6 +8,7 @@ #include <commons/array.h> #include <commons/delegates.h> #include <commons/nullptr.h> +#include <commons/streamreader.h> #include <commons/sockets.h> #include <commons/utility.h> #include <exception> @@ -372,172 +373,49 @@ std::set<st_thread_t> ts; }; - class eof_exception : public std::exception { - const char *what() const throw() { return "EOF"; } + class st_read_fn + { + private: + st_netfd_t fd_; + st_utime_t to_; + public: + st_read_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) + : fd_(fd), to_(to) {} + size_t operator()(char *buf, size_t len) { + return size_t(checknnegerr(st_read(fd_, buf, len, to_))); + } }; - /** - * Convenience class for reading from sockets. - */ - class st_reader + class st_read_fully_fn { - NONCOPYABLE(st_reader) - public: - st_reader(st_netfd_t fd, char *buf, size_t bufsize) : - fd_(fd), - buf_(buf, bufsize), - start_(buf_.get()), - end_(buf_.get()) - {} + private: + st_netfd_t fd_; + st_utime_t to_; + public: + st_read_fully_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) + : fd_(fd), to_(to) {} + void operator()(char *buf, size_t len) { + checkeqnneg(st_read_fully(fd_, buf, len, to_), ssize_t(len)); + } + }; - /** - * The size of the unconsumed range of bytes. - */ - size_t unread() { return end_ - start_; } - - /** - * The remaining number of bytes in the buffer - */ - size_t rem() { return buf_.end() - end_; } - - /** - * The entire read buffer. - */ - sized_array<char> &buf() { return buf_; } - - /** - * Manually update/adjust the pointers; useful after changing buf_. - */ - void reset_range(char *start, char *end) { - start_ = start; - end_ = end; - } - - /** - * Discard the requested number of bytes. - */ - void skip(size_t req, st_utime_t to = ST_UTIME_NO_TIMEOUT) { - while (true) { - if (unread() >= req) { - // We have more unconsumed bytes than requested, so we're done. - start_ += req; - break; - } - - // We have more requested bytes than unconsumed, so need to keep - // reading. Skip over bytes... - req -= unread(); - // ...and reset pointers to discard current buffer. - start_ = end_ = buf_.get(); - - ssize_t res = checknnegerr(st_read(fd_, end_, rem(), to)); - end_ += res; - - // If we got a premature EOF. - if (res == 0 && unread() < req) throw eof_exception(); - } - } - - /** - * Returns a char array that contains the requested number of bytes. If - * we hit an error or EOF, then an exception is thrown. - */ - managed_array<char> read(size_t req, st_utime_t to = ST_UTIME_NO_TIMEOUT) { - // Do we already have the requested data? - if (unread() >= req) { - managed_array<char> p(start_, false); - start_ += req; - return p; - } - - // Handle large arrays specially. - if (req > buf_.size()) { - managed_array<char> p(new char[req], true); - memcpy(p.get(), start_, unread()); - checkeqnneg(st_read_fully(fd_, p + unread(), req - unread(), to), static_cast<ssize_t>(req - unread())); - start_ = end_ = buf_.get(); - return p; - } - - // Shift things down if necessary. - if (req > static_cast<size_t>(buf_.end() - end_)) { - memmove(buf_.get(), start_, unread()); - size_t diff = start_ - buf_.get(); - start_ -= diff; - end_ -= diff; - } - - // Keep reading until we have enough. - while (unread() < req) { - ssize_t res = checknnegerr(st_read(fd_, end_, rem(), to)); - if (res == 0) break; - else end_ += res; - } - - // If we got a premature EOF. - if (unread() < req) - throw eof_exception(); - - managed_array<char> p(start_, false); - start_ += req; - return p; - } - - template<typename T> - T read(st_utime_t to = ST_UTIME_NO_TIMEOUT) - { - size_t req = sizeof(T); - - // Do we already have the requested data? - if (unread() >= req) { - T x = *reinterpret_cast<const T*>(start_); - start_ += req; - return x; - } - - assert(req <= buf_.size()); - - // Shift things down if necessary. - if (req > static_cast<size_t>(buf_.end() - end_)) { - memmove(buf_.get(), start_, unread()); - size_t diff = start_ - buf_.get(); - start_ -= diff; - end_ -= diff; - } - - // Keep reading until we have enough. - while (unread() < req) { - ssize_t res = checknnegerr(st_read(fd_, end_, rem(), to)); - if (res == 0) break; - else end_ += res; - } - - // If we got a premature EOF. - if (unread() < req) - throw eof_exception(); - - T x = *reinterpret_cast<const T*>(start_); - start_ += req; - return x; - } - - private: - st_netfd_t fd_; - - /** - * The temporary storage buffer. - */ - sized_array<char> buf_; - - /** - * The start of the unconsumed range of bytes. - */ - char *start_; - - /** - * The end of the unconsumed range of bytes. - */ - char *end_; + class st_reader + { + EXPAND(stream_reader) + private: + stream_reader r_; + public: + st_reader(st_netfd_t fd, char *buf, size_t len) : + r_(st_read_fn(fd), st_read_fully_fn(fd), buf, len) {} + sized_array<char> &buf() { return r_.buf(); } + void reset_range(char *start, char *end) { r_.reset_range(start, end); } + char *start() { return r_.start(); } + char *end() { return r_.end(); } + bool accum(size_t req) { return r_.accum(req); } + void skip(size_t req) { r_.skip(req); } + managed_array<char> read(size_t req) { return r_.read(req); } + template<typename T> T read() { return r_.read<T>(); } + void shift() { r_.shift(); } }; } Added: cpp-commons/trunk/src/commons/streamreader.h =================================================================== --- cpp-commons/trunk/src/commons/streamreader.h (rev 0) +++ cpp-commons/trunk/src/commons/streamreader.h 2009-03-04 23:31:02 UTC (rev 1250) @@ -0,0 +1,247 @@ +#ifndef COMMONS_STREAMREADER_H +#define COMMONS_STREAMREADER_H + +#include <boost/function.hpp> +#include <commons/array.h> +#include <cstring> + +namespace commons { + + using namespace boost; + using namespace commons; + using namespace std; + + class eof_exception : public std::exception { + const char *what() const throw() { return "EOF"; } + }; + + /** + * Convenience wrapper for full-reading from data source streams. + */ + class repeat_reader + { + private: + boost::function<size_t(char*, size_t)> &reader_; + public: + repeat_reader(boost::function<size_t(char*, size_t)> &reader) + : reader_(reader) {} + void operator()(char *buf, size_t len) { + size_t res = 0; + while (res < len) { + res += reader_(buf + res, len - res); + if (res == 0) throw eof_exception(); + } + assert(res == len); + } + }; + + /** + * General-purpose stream reader for arbitrary data source streams. + */ + class stream_reader + { + NONCOPYABLE(stream_reader) + public: + stream_reader(boost::function<size_t(char*, size_t)> reader, + char *buf, size_t bufsize) : + reader_(reader), + full_reader_(repeat_reader(reader_)), + buf_(buf, bufsize), + start_(buf), + end_(buf) + {} + + stream_reader(boost::function<size_t(char*, size_t)> reader, + boost::function<void(char*, size_t)> full_reader, + char *buf, size_t bufsize) : + reader_(reader), + full_reader_(full_reader), + buf_(buf, bufsize), + start_(buf), + end_(buf) + {} + + /** + * The size of the unconsumed range of bytes. + */ + size_t unread() { return end_ - start_; } + + /** + * The remaining number of bytes in the buffer + */ + size_t rem() { return buf_.end() - end_; } + + /** + * The entire read buffer. + */ + sized_array<char> &buf() { return buf_; } + + /** + * Manually update/adjust the pointers; useful after changing buf_. + */ + void reset_range(char *start, char *end) { + start_ = start; + end_ = end; + } + + char *start() { return start_; } + char *end() { return end_; } + + /** + * Accumulate (but don't read/copy) the requested number of bytes. + */ + bool accum(size_t req) { + while (unread() < req && rem() > 0) { + size_t res = reader_(end_, rem()); + if (res == 0) throw eof_exception(); + end_ += res; + } + if (unread() < req) { + start_ = end_; + return false; + } else { + start_ += req; + return true; + } + } + + /** + * Discard the requested number of bytes. + */ + void skip(size_t req) { + if (unread() >= req) { + // We have more unconsumed bytes than requested, so we're done. + start_ += req; + return; + } + + // We have more requested bytes than unconsumed, so need to keep + // reading. Skip over bytes that are immediately available... + req -= unread(); + // ...and reset pointers to discard current buffer. + start_ = end_ = buf_.get(); + + // Keep reading until we have enough. + while (true) { + size_t res = reader_(end_, rem()); + if (res == 0) throw eof_exception(); + if (res == req) break; + if (res > req) { + // Now skip over the rest of the bytes, which weren't available. + start_ += req; + end_ += res; + break; + } + req -= res; + } + } + + /** + * Returns a char array that contains the requested number of bytes. If + * we hit an error or EOF, then an exception is thrown. + */ + managed_array<char> read(size_t req) { + // Do we already have the requested data? + if (unread() >= req) { + managed_array<char> p(start_, false); + start_ += req; + return p; + } + + // Handle large arrays specially. + if (req > buf_.size()) { + managed_array<char> p(new char[req], true); + memcpy(p.get(), start_, unread()); + full_reader_(p + unread(), req - unread()); + start_ = end_ = buf_.get(); + return p; + } + + // Shift things down if necessary. + if (req > static_cast<size_t>(buf_.end() - end_)) + shift(); + + // Keep reading until we have enough. + while (unread() < req) { + size_t res = reader_(end_, rem()); + if (res == 0) break; + else end_ += res; + } + + // If we got a premature EOF. + if (unread() < req) + throw eof_exception(); + + managed_array<char> p(start_, false); + start_ += req; + return p; + } + + template<typename T> + T read() + { + size_t req = sizeof(T); + + // Do we already have the requested data? + if (unread() >= req) { + T x = *reinterpret_cast<const T*>(start_); + start_ += req; + return x; + } + + assert(req <= buf_.size()); + + // Shift things down if necessary. + if (req > static_cast<size_t>(buf_.end() - end_)) + shift(); + + // Keep reading until we have enough. + while (unread() < req) { + size_t res = reader_(end_, rem()); + if (res == 0) break; + else end_ += res; + } + + // If we got a premature EOF. + if (unread() < req) + throw eof_exception(); + + T x = *reinterpret_cast<const T*>(start_); + start_ += req; + return x; + } + + /** + * Shift the unread bytes down to the start of the buffer. + */ + void shift() { + memmove(buf_.get(), start_, unread()); + size_t diff = start_ - buf_.get(); + start_ -= diff; + end_ -= diff; + } + + private: + boost::function<size_t(char*, size_t)> reader_; + + boost::function<void(char*, size_t)> full_reader_; + + /** + * The temporary storage buffer. + */ + sized_array<char> buf_; + + /** + * The start of the unconsumed range of bytes. + */ + char *start_; + + /** + * The end of the unconsumed range of bytes. + */ + char *end_; + }; + +} + +#endif Added: cpp-commons/trunk/src/commons/streamwriter.h =================================================================== --- cpp-commons/trunk/src/commons/streamwriter.h (rev 0) +++ cpp-commons/trunk/src/commons/streamwriter.h 2009-03-04 23:31:02 UTC (rev 1250) @@ -0,0 +1,92 @@ +#ifndef COMMONS_STREAMWRITER_H +#define COMMONS_STREAMWRITER_H + +#include <boost/function.hpp> +#include <commons/array.h> +#include <cstring> +#include <iostream> +#include <iomanip> + +namespace commons { + +using namespace boost; +using namespace commons; +using namespace std; + +class stream_writer +{ + NONCOPYABLE(stream_writer) +private: + sized_array<char> a_; + char *unsent_; + char *mark_; + char *p_; + boost::function<void(void*, size_t)> flushcb; + char *reserve(int n, char *p) { + if (p + n > a_.end()) { + // check that the reserved space will fit + assert(size_t(p - mark_ + n + sizeof(uint32_t)) <= a_.size()); + // get rid of what we have + flush(); + size_t diff = mark_ - (a_.get() + sizeof(uint32_t)); + memmove(a_.get() + sizeof(uint32_t), mark_, p_ - mark_); + mark_ = (unsent_ = a_.get()) + sizeof(uint32_t); + p_ -= diff; + p -= diff; + } + return p; + } + char *prefix() { return mark_ - sizeof(uint32_t); } + template<typename T> + void write_(T x, char *p) { + *reinterpret_cast<T*>(reserve(sizeof x, p)) = x; + } +public: + stream_writer(boost::function<void(void*, size_t)> flushcb, char *a, size_t buf_size) : + a_(a, buf_size), unsent_(a_.get()), mark_(unsent_ + sizeof(uint32_t)), + p_(mark_), flushcb(flushcb) {} + sized_array<char> &buf() { return a_; } + char *cur() { return p_; } + size_t pos() { return p_ - mark_; } + size_t size() { return a_.size(); } + void mark() { + if (p_ > mark_) { + // prefix last segment with its length + *reinterpret_cast<uint32_t*>(prefix()) = uint32_t(p_ - mark_); + // start new segment + mark_ = (p_ += sizeof(uint32_t)); + } + } + void reset() { p_ = mark_; } + void reserve(int n) { reserve(n, p_); } + void mark_and_flush() { + mark(); + flush(); + mark_ = p_ = (unsent_ = a_.get()) + sizeof(uint32_t); + } + void flush() { + if (prefix() > unsent_) { + flushcb(unsent_, prefix() - unsent_); + unsent_ = prefix(); + } + } + template<typename T> void skip() { reserve(sizeof(T)); p_ += sizeof(T); } + template<typename T> void write(T x) { write_(x, p_); p_ += sizeof x; } + template<typename T> void write(T x, size_t off) { write_(x, mark_ + off); } + void show() { + cout << static_cast<void*>(p_); + for (size_t i = 0; i < a_.size(); ++i) + cout << " " << hex << setfill('0') << setw(2) + << int(static_cast<unsigned char>(a_.get()[i])); + cout << endl; + cout << static_cast<void*>(p_); + for (size_t i = 0; i < a_.size(); ++i) + cout << " " << setfill(' ') << setw(2) << (i == pos() ? "^^" : ""); + cout << endl; + } +}; + + +} + +#endif Added: cpp-commons/trunk/src/test/streamreader.cc =================================================================== --- cpp-commons/trunk/src/test/streamreader.cc (rev 0) +++ cpp-commons/trunk/src/test/streamreader.cc 2009-03-04 23:31:02 UTC (rev 1250) @@ -0,0 +1,69 @@ +#include <commons/array.h> +#include <commons/streamreader.h> +#include <gtest/gtest.h> +using namespace commons; +using namespace testing; + +struct rfn { + size_t chunklen_; + array<char> &src_; + char *cur_; + rfn(size_t chunklen, array<char> &src) + : chunklen_(chunklen), src_(src), cur_(src.get()) {} + size_t operator()(char *buf, size_t len) { + len = min(chunklen_, len); + memcpy(buf, cur_, len); + cur_ += len; + return len; + } +}; + +TEST(reader, read) { + array<char> src(1024); + for (int i = 0; i < 10; ++i) + *(reinterpret_cast<int*>(src.get()) + i) = i; + + for (size_t chunklen = 1; chunklen < 50; ++chunklen) { + { + array<char> rbuf(9); + stream_reader r(rfn(chunklen, src), rbuf.get(), rbuf.size()); + for (int i = 0; i < 10; ++i) + EXPECT_EQ(i, r.read<int>()); + } + { + array<char> rbuf(9); + stream_reader r(rfn(chunklen, src), rbuf.get(), rbuf.size()); + managed_array<char> ma = r.read(5 * sizeof(int)); + array<char> src2(1024); + memcpy(src2.get(), ma.get(), 5 * sizeof(int)); + array<char> rbuf2(1024); + stream_reader r2(rfn(chunklen, src2), rbuf2.get(), rbuf2.size()); + for (int i = 0; i < 5; ++i) + EXPECT_EQ(i, r2.read<int>()); + for (int i = 5; i < 10; ++i) + EXPECT_EQ(i, r.read<int>()); + } + { + array<char> rbuf(9); + stream_reader r(rfn(chunklen, src), rbuf.get(), rbuf.size()); + r.skip(9 * sizeof(int)); + EXPECT_EQ(9, r.read<int>()); + } + { + array<char> rbuf(9); + stream_reader r(rfn(chunklen, src), rbuf.get(), rbuf.size()); + EXPECT_TRUE(r.accum(sizeof(int))); + EXPECT_TRUE(r.accum(sizeof(int))); + EXPECT_FALSE(r.accum(sizeof(int))); + r.shift(); + EXPECT_TRUE(r.accum(sizeof(int))); + EXPECT_TRUE(r.accum(sizeof(int))); + EXPECT_FALSE(r.accum(sizeof(int))); + } + } +} + +int main(int argc, char **argv) { + InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} Added: cpp-commons/trunk/src/test/streamwriter.cc =================================================================== --- cpp-commons/trunk/src/test/streamwriter.cc (rev 0) +++ cpp-commons/trunk/src/test/streamwriter.cc 2009-03-04 23:31:02 UTC (rev 1250) @@ -0,0 +1,34 @@ +#include <commons/array.h> +#include <commons/streamwriter.h> +#include <gtest/gtest.h> +using namespace commons; +using namespace testing; + +struct wfn { + array<char> &dst_; + char *cur_; + wfn(array<char> &dst) + : dst_(dst), cur_(dst.get()) {} + void operator()(void *buf, size_t len) { + memcpy(cur_, buf, len); + cur_ += len; + } +}; + +TEST(writer, write) { + array<char> dst(1024); + for (size_t wbufsize = 100; wbufsize < 101; ++wbufsize) { + array<char> wbuf(wbufsize); + stream_writer w(wfn(dst), wbuf.get(), wbuf.size()); + for (int i = 0; i < 10; ++i) + w.write(i); + w.mark_and_flush(); + for (int i = 0; i < 10; ++i) + EXPECT_EQ(i, *(reinterpret_cast<int*>(dst.get()) + i + 1)); + } +} + +int main(int argc, char **argv) { + InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |