[Assorted-commits] SF.net SVN: assorted:[1328] cpp-commons/trunk/src/commons
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-03-24 06:10:27
|
Revision: 1328 http://assorted.svn.sourceforge.net/assorted/?rev=1328&view=rev Author: yangzhang Date: 2009-03-24 06:10:14 +0000 (Tue, 24 Mar 2009) Log Message: ----------- broke up st.h; added USE(); tweaked ASSERT() Modified Paths: -------------- cpp-commons/trunk/src/commons/assert.h cpp-commons/trunk/src/commons/streamwriter.h cpp-commons/trunk/src/commons/utility.h Added Paths: ----------- cpp-commons/trunk/src/commons/st/channel.h cpp-commons/trunk/src/commons/st/intr.h cpp-commons/trunk/src/commons/st/io.h cpp-commons/trunk/src/commons/st/reader.h cpp-commons/trunk/src/commons/st/sockets.h cpp-commons/trunk/src/commons/st/sync.h cpp-commons/trunk/src/commons/st/threads.h Removed Paths: ------------- cpp-commons/trunk/src/commons/st/st.h Modified: cpp-commons/trunk/src/commons/assert.h =================================================================== --- cpp-commons/trunk/src/commons/assert.h 2009-03-24 00:10:13 UTC (rev 1327) +++ cpp-commons/trunk/src/commons/assert.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -8,7 +8,7 @@ // This is not the "default" because it does not conform to the requirements of the C standard, // which requires that the NDEBUG version be ((void) 0). #ifdef NDEBUG -#define ASSERT(x) do { (void)sizeof(x); } while(0) +#define ASSERT(x) do { static_cast<void>(sizeof(x)); } while(0) #else #define ASSERT(x) assert(x) #endif Added: cpp-commons/trunk/src/commons/st/channel.h =================================================================== --- cpp-commons/trunk/src/commons/st/channel.h (rev 0) +++ cpp-commons/trunk/src/commons/st/channel.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,70 @@ +#ifndef COMMONS_ST_CHANNEL_H +#define COMMONS_ST_CHANNEL_H + +#include <commons/st/sync.h> +#include <boost/foreach.hpp> +#include <boost/shared_ptr.hpp> +#include <queue> +#include <vector> +#define foreach BOOST_FOREACH +#define shared_ptr boost::shared_ptr + +BEGIN_NAMESPACE(commons) + +/** + * An unbounded FIFO queue. ST threads can wait on this until elements have + * been pushed in (resulting in a waking signal). + */ +template <typename T> +class st_channel +{ + public: + template<typename U> void push(U &&x) { + q_.push(forward<U>(x)); + empty_.signal(); + } + T take() { + while (q_.empty()) { + empty_.wait(); + } + T x = move(front()); + q_.pop(); + return x; + } + const T& front() const { return q_.front(); } + T& front() { return q_.front(); } + bool empty() const { return q_.empty(); } + void pop() { q_.pop(); } + void clear() { while (!q_.empty()) q_.pop(); } + const std::queue<T> &queue() const { return q_; } + private: + std::queue<T> q_; + st_cond empty_; +}; + +/** + * An unbounded FIFO multi-cast channel, i.e. publish-subscribe. + */ +template <typename T> +class st_multichannel +{ + public: + void push(const T &x) { + foreach (shared_ptr<st_channel<T> > q, qs) { + q->push(x); + } + } + st_channel<T> &subscribe() { + shared_ptr<st_channel<T> > q(new st_channel<T>); + qs.push_back(q); + return *q; + } + private: + vector<shared_ptr<st_channel<T> > > qs; +}; + +END_NAMESPACE + +#undef shared_ptr + +#endif Added: cpp-commons/trunk/src/commons/st/intr.h =================================================================== --- cpp-commons/trunk/src/commons/st/intr.h (rev 0) +++ cpp-commons/trunk/src/commons/st/intr.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,92 @@ +#ifndef COMMONS_ST_INTR_H +#define COMMONS_ST_INTR_H + +#include <boost/foreach.hpp> +#include <commons/assert.h> +#include <commons/utility.h> +#include <set> +#include <st.h> +#define foreach BOOST_FOREACH + +BEGIN_NAMESPACE(commons) + + /** + * A hub is a single point to signal to wake up a set of threads. Threads + * join the hub before calling a blocking operation if they want to make + * themselves interruptible on this hub. + */ + class st_intr_hub + { + public: + virtual void insert(st_thread_t t) = 0; + virtual void erase(st_thread_t t) = 0; + virtual ~st_intr_hub() {}; + }; + + /** + * The simplest hub, which only interrupts those who are currently joined in + * the hub (like a condition variable broadcast). + */ + class st_intr_cond : public st_intr_hub + { + public: + virtual ~st_intr_cond() {} + void insert(st_thread_t t) { threads.insert(t); } + void erase(st_thread_t t) { threads.erase(t); } + void signal() { + foreach (st_thread_t t, threads) { + st_thread_interrupt(t); + } + threads.clear(); + } + private: + std::set<st_thread_t> threads; + }; + + /** + * Like st_intr_cond, but a bool instead, so there's state; newly joining + * threads may immediately be interrupted. Interruption occurs when this is + * set to true. + */ + class st_intr_bool : public st_intr_hub + { + public: + void insert(st_thread_t t) { + if (b) st_thread_interrupt(t); + else threads.insert(t); + } + void erase(st_thread_t t) { threads.erase(t); } + void set() { + b = true; + foreach (st_thread_t t, threads) { + st_thread_interrupt(t); + } + threads.clear(); + } + void reset() { + // If b is true, then any threads that join are immediately + // interrupted, so the set must be empty. + ASSERT(!b || threads.empty()); + b = false; + } + operator bool() const { return b; } + private: + std::set<st_thread_t> threads; + bool b; + }; + + /** + * RAII for making the current thread interruptible on a certain hub. + */ + class st_intr + { + public: + st_intr(st_intr_hub &hub) : hub_(hub) { hub.insert(st_thread_self()); } + ~st_intr() { hub_.erase(st_thread_self()); } + private: + st_intr_hub &hub_; + }; + +END_NAMESPACE + +#endif Added: cpp-commons/trunk/src/commons/st/io.h =================================================================== --- cpp-commons/trunk/src/commons/st/io.h (rev 0) +++ cpp-commons/trunk/src/commons/st/io.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,40 @@ +#ifndef COMMONS_ST_IO_H +#define COMMONS_ST_IO_H + +#include <commons/check.h> +#include <commons/utility.h> +#include <st.h> + +BEGIN_NAMESPACE(commons) + +using namespace std; + +UNUSED static void +st_read(st_netfd_t fd, void *buf, size_t len) +{ + checkeqnneg(st_read_fully(fd, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); +} + +template<typename T> +void +st_read(st_netfd_t fd, T &x) +{ + st_read(fd, &x, sizeof x); +} + +UNUSED static void +st_write(st_netfd_t fd, const void *buf, size_t len) +{ + checkeqnneg(st_write(fd, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); +} + +template<typename T> +void +st_write(st_netfd_t fd, const T &x) +{ + st_write(fd, &x, sizeof x); +} + +END_NAMESPACE + +#endif Added: cpp-commons/trunk/src/commons/st/reader.h =================================================================== --- cpp-commons/trunk/src/commons/st/reader.h (rev 0) +++ cpp-commons/trunk/src/commons/st/reader.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,64 @@ +#ifndef COMMONS_ST_READER_H +#define COMMONS_ST_READER_H + +#include <commons/check.h> +#include <commons/streamreader.h> +#include <commons/utility.h> +#include <st.h> + +BEGIN_NAMESPACE(commons) + +using namespace boost; +using namespace std; + +class st_read_fn +{ +private: + st_netfd_t fd_; + st_utime_t to_; +public: + st_read_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) + : fd_(fd), to_(to) {} + size_t operator()(char *buf, size_t len) { + return size_t(checknnegerr(st_read(fd_, buf, len, to_))); + } +}; + +class st_read_fully_fn +{ +private: + st_netfd_t fd_; + st_utime_t to_; +public: + st_read_fully_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) + : fd_(fd), to_(to) {} + void operator()(char *buf, size_t len) { + checkeqnneg(st_read_fully(fd_, buf, len, to_), ssize_t(len)); + } +}; + +class st_reader +{ + EXPAND(stream_reader) +private: + stream_reader r_; +public: + st_reader(st_netfd_t fd, char *buf, size_t len) : + r_(st_read_fn(fd), st_read_fully_fn(fd), buf, len) {} + size_t unread() { return r_.unread(); } + size_t rem() { return r_.rem(); } + sized_array<char> &buf() { return r_.buf(); } + void reset_range(char *start, char *end) { r_.reset_range(start, end); } + void reset() { r_.reset(); } + char *start() { return r_.start(); } + char *end() { return r_.end(); } + bool accum(size_t req) { return r_.accum(req); } + void skip(size_t req) { r_.skip(req); } + managed_array<char> read(size_t req) { return r_.read(req); } + template<typename T> T read() { return r_.read<T>(); } + void shift() { r_.shift(); } +}; + +END_NAMESPACE + +#endif Added: cpp-commons/trunk/src/commons/st/sockets.h =================================================================== --- cpp-commons/trunk/src/commons/st/sockets.h (rev 0) +++ cpp-commons/trunk/src/commons/st/sockets.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,81 @@ +#ifndef COMMONS_ST_SOCKETS_H +#define COMMONS_ST_SOCKETS_H + +#include <commons/closing.h> +#include <commons/sockets.h> +#include <commons/utility.h> +#include <st.h> +#include <stx.h> + +BEGIN_NAMESPACE(commons) + +struct stfd_closer { + static void apply(st_netfd_t fd) { check0x(st_netfd_close(fd)); } +}; + +typedef closing<st_netfd_t, stfd_closer> st_closing; + +/** + * Connect to a TCP socket address. + * \param[in] host An IP address. + * \param[in] port The destination port. + * \param[in] timeout The timeout for the connect operation. + */ +UNUSED static st_netfd_t +st_tcp_connect(in_addr host, uint16_t port, st_utime_t timeout) +{ + // Create remote socket address. + sockaddr_in sa = make_sockaddr(host, port); + + // Create the socket. + st_closing s(checkpass(st_netfd_open_socket(tcp_socket(true)))); + + // Connect. + check0x(st_connect(s.get(), reinterpret_cast<sockaddr*>(&sa), sizeof sa, timeout)); + return s.release(); +} + +/** + * Connect to a TCP socket address. + * \param[in] host Either an IP address or hostname. + * \param[in] port The destination port. + * \param[in] timeout The timeout for each of the DNS lookup and the connect + * operation. + * \todo Create variants that take and/or return sockaddr_in's. + */ +UNUSED static st_netfd_t +st_tcp_connect(const char *host, uint16_t port, st_utime_t timeout) +{ + in_addr ipaddr; + + // First try to parse as IP address. Note: inet_addr() is obsolete. Note: + // inet_aton returns 0 if address is invalid. + if (inet_aton(host, &ipaddr) == 0) { + // Then look up by hostname. + check0x(stx_dns_getaddr(host, &ipaddr, timeout)); + } + + return st_tcp_connect(ipaddr, port, timeout); +} + +/** + * Create a listener st_netfd_t. + * \param[in] port The port to listen on. + * \return The st_netfd_t. + */ +UNUSED static st_netfd_t +st_tcp_listen(uint16_t port) +{ + int sfd = tcp_listen(port); + try { + // Create a net file descriptor around a listener socket. + return checkpass(st_netfd_open_socket(sfd)); + } catch (...) { + close(sfd); + throw; + } +} + +END_NAMESPACE + +#endif Deleted: cpp-commons/trunk/src/commons/st/st.h =================================================================== --- cpp-commons/trunk/src/commons/st/st.h 2009-03-24 00:10:13 UTC (rev 1327) +++ cpp-commons/trunk/src/commons/st/st.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -1,480 +0,0 @@ -#ifndef COMMONS_ST_ST_H -#define COMMONS_ST_ST_H - -#include <algorithm> -#include <boost/foreach.hpp> -#include <boost/function.hpp> -#include <boost/shared_ptr.hpp> -#include <commons/array.h> -#include <commons/assert.h> -#include <commons/delegates.h> -#include <commons/nullptr.h> -#include <commons/streamreader.h> -#include <commons/sockets.h> -#include <commons/utility.h> -#include <exception> -#include <map> -#include <queue> -#include <set> -#include <sstream> -#include <st.h> -#include <stx.h> -#include <utility> - -#define foreach BOOST_FOREACH -#define shared_ptr boost::shared_ptr - -namespace commons -{ - using namespace boost; - using namespace std; - - enum { default_stack_size = 65536 }; - - struct stfd_closer { - static void apply(st_netfd_t fd) { check0x(st_netfd_close(fd)); } - }; - - typedef closing<st_netfd_t, stfd_closer> st_closing; - - /** - * RAII to acquire and release a st_mutex_t. Non-copyable. - */ - class st_lock - { - NONCOPYABLE(st_lock) - public: - st_lock(st_mutex_t mx) : mx_(mx) { check0x(st_mutex_lock(mx)); } - ~st_lock() { check0x(st_mutex_unlock(mx_)); } - private: - st_mutex_t mx_; - }; - - UNUSED static void - st_read(st_netfd_t fd, void *buf, size_t len) - { - checkeqnneg(st_read_fully(fd, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); - } - - template<typename T> - void - st_read(st_netfd_t fd, T &x) - { - st_read(fd, &x, sizeof x); - } - - UNUSED static void - st_write(st_netfd_t fd, const void *buf, size_t len) - { - checkeqnneg(st_write(fd, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); - } - - template<typename T> - void - st_write(st_netfd_t fd, const T &x) - { - st_write(fd, &x, sizeof x); - } - - - /** - * Run a function in pthread. - * \return The new pthread_t on success, 0 on failure. - * \todo Is it safe to treat the pthread_t as a pointer? - */ - UNUSED static st_thread_t - st_spawn(const fn& f) - { - return st_thread_create(&run_function0_null, - new fn(f), - true, - default_stack_size); - } - - UNUSED static void - st_join(st_thread_t t) - { - check0x(st_thread_join(t, nullptr)); - } - - /** - * Connect to a TCP socket address. - * \param[in] host An IP address. - * \param[in] port The destination port. - * \param[in] timeout The timeout for the connect operation. - */ - UNUSED static st_netfd_t - st_tcp_connect(in_addr host, uint16_t port, st_utime_t timeout) - { - // Create remote socket address. - sockaddr_in sa = make_sockaddr(host, port); - - // Create the socket. - st_closing s(checkpass(st_netfd_open_socket(tcp_socket(true)))); - - // Connect. - check0x(st_connect(s.get(), reinterpret_cast<sockaddr*>(&sa), sizeof sa, timeout)); - return s.release(); - } - - /** - * Connect to a TCP socket address. - * \param[in] host Either an IP address or hostname. - * \param[in] port The destination port. - * \param[in] timeout The timeout for each of the DNS lookup and the connect - * operation. - * \todo Create variants that take and/or return sockaddr_in's. - */ - UNUSED static st_netfd_t - st_tcp_connect(const char *host, uint16_t port, st_utime_t timeout) - { - in_addr ipaddr; - - // First try to parse as IP address. Note: inet_addr() is obsolete. Note: - // inet_aton returns 0 if address is invalid. - if (inet_aton(host, &ipaddr) == 0) { - // Then look up by hostname. - check0x(stx_dns_getaddr(host, &ipaddr, timeout)); - } - - return st_tcp_connect(ipaddr, port, timeout); - } - - /** - * Create a listener st_netfd_t. - * \param[in] port The port to listen on. - * \return The st_netfd_t. - */ - UNUSED static st_netfd_t - st_tcp_listen(uint16_t port) - { - int sfd = tcp_listen(port); - try { - // Create a net file descriptor around a listener socket. - return checkpass(st_netfd_open_socket(sfd)); - } catch (...) { - close(sfd); - throw; - } - } - - /** - * Wraps st_cond_* errno-functions with exceptions and cleans up on - * destruction. - */ - class st_cond - { - NONCOPYABLE(st_cond) - public: - st_cond() : c(checkerr(st_cond_new())) {} - ~st_cond() { check0x(st_cond_destroy(c)); } - void wait() { check0x(st_cond_wait(c)); } - void wait(st_utime_t t) { check0x(st_cond_timedwait(c, t)); } - void signal() { st_cond_signal(c); } - void bcast() { st_cond_broadcast(c); } - private: - st_cond_t c; - }; - - /** - * Synchronized boolean. - */ - class st_bool - { - public: - st_bool(bool init = false) : c(), b(init) {} - void set() { b = true; c.bcast(); } - void reset() { b = false; c.bcast(); } - void waitset() { if (!b) c.wait(); } - void waitreset() { if (b) c.wait(); } - operator bool() { return b; } - private: - st_cond c; - bool b; - }; - - UNUSED static void toggle(st_bool& b) { if (b) b.reset(); else b.set(); } - - /** - * Wraps st_mutex_* errno-functions with exceptions and cleans up on - * destruction. - */ - class st_mutex - { - NONCOPYABLE(st_mutex) - public: - st_mutex() : m(checkerr(st_mutex_new())) {} - ~st_mutex() { check0x(st_mutex_destroy(m)); } - void lock() { check0x(st_mutex_lock(m)); } - bool trylock() { - int res = st_mutex_trylock(m); - if (res == 0) return true; - else if (errno == EBUSY) return false; - else check0x(res); - } - void unlock() { check0x(st_mutex_unlock(m)); } - private: - st_mutex_t m; - }; - - /** - * An unbounded FIFO queue. ST threads can wait on this until elements have - * been pushed in (resulting in a waking signal). - */ - template <typename T> - class st_channel - { - public: - template<typename U> void push(U &&x) { - q_.push(forward<U>(x)); - empty_.signal(); - } - T take() { - while (q_.empty()) { - empty_.wait(); - } - T x = move(front()); - q_.pop(); - return x; - } - const T& front() const { return q_.front(); } - T& front() { return q_.front(); } - bool empty() const { return q_.empty(); } - void pop() { q_.pop(); } - void clear() { while (!q_.empty()) q_.pop(); } - const std::queue<T> &queue() const { return q_; } - private: - std::queue<T> q_; - st_cond empty_; - }; - - /** - * An unbounded FIFO multi-cast channel, i.e. publish-subscribe. - */ - template <typename T> - class st_multichannel - { - public: - void push(const T &x) { - foreach (shared_ptr<st_channel<T> > q, qs) { - q->push(x); - } - } - st_channel<T> &subscribe() { - shared_ptr<st_channel<T> > q(new st_channel<T>); - qs.push_back(q); - return *q; - } - private: - vector<shared_ptr<st_channel<T> > > qs; - }; - - /** - * A hub is a single point to signal to wake up a set of threads. Threads - * join the hub before calling a blocking operation if they want to make - * themselves interruptible on this hub. - */ - class st_intr_hub - { - public: - virtual void insert(st_thread_t t) = 0; - virtual void erase(st_thread_t t) = 0; - virtual ~st_intr_hub() {}; - }; - - /** - * The simplest hub, which only interrupts those who are currently joined in - * the hub (like a condition variable broadcast). - */ - class st_intr_cond : public st_intr_hub - { - public: - virtual ~st_intr_cond() {} - void insert(st_thread_t t) { threads.insert(t); } - void erase(st_thread_t t) { threads.erase(t); } - void signal() { - foreach (st_thread_t t, threads) { - st_thread_interrupt(t); - } - threads.clear(); - } - private: - std::set<st_thread_t> threads; - }; - - /** - * Like st_intr_cond, but a bool instead, so there's state; newly joining - * threads may immediately be interrupted. Interruption occurs when this is - * set to true. - */ - class st_intr_bool : public st_intr_hub - { - public: - void insert(st_thread_t t) { - if (b) st_thread_interrupt(t); - else threads.insert(t); - } - void erase(st_thread_t t) { threads.erase(t); } - void set() { - b = true; - foreach (st_thread_t t, threads) { - st_thread_interrupt(t); - } - threads.clear(); - } - void reset() { - // If b is true, then any threads that join are immediately - // interrupted, so the set must be empty. - ASSERT(!b || threads.empty()); - b = false; - } - operator bool() const { return b; } - private: - std::set<st_thread_t> threads; - bool b; - }; - - /** - * RAII for making the current thread interruptible on a certain hub. - */ - class st_intr - { - public: - st_intr(st_intr_hub &hub) : hub_(hub) { hub.insert(st_thread_self()); } - ~st_intr() { hub_.erase(st_thread_self()); } - private: - st_intr_hub &hub_; - }; - - class st_group_join_exception : public std::exception - { - public: - st_group_join_exception(const map<st_thread_t, std::exception> &th2ex) : - th2ex_(th2ex) {} - virtual ~st_group_join_exception() throw() {} - virtual const char *what() const throw() { - if (!th2ex_.empty() && s == "") { - bool first = true; - stringstream ss; - typedef pair<st_thread_t, std::exception> p; - foreach (p p, th2ex_) { - ss << (first ? "" : ", ") << p.first << " -> " << p.second.what(); - first = false; - } - s = ss.str(); - } - return s.c_str(); - } - private: - map<st_thread_t, std::exception> th2ex_; - mutable string s; - }; - - /** - * RAII for joining on a single thread. - */ - class st_joining - { - NONCOPYABLE(st_joining) - public: - st_joining(st_thread_t t) : t_(t) {} - ~st_joining() { if (t_ != nullptr) st_join(t_); } - private: - st_thread_t t_; - }; - - /** - * RAII for joining on all contained threads. Warning: st_join may throw - * exceptions. - */ - class st_thread_group - { - public: - ~st_thread_group() { - map<st_thread_t, std::exception> th2ex; - foreach (st_thread_t t, ts) { - try { st_join(t); } - catch (std::exception &ex) { th2ex[t] = ex; } - } - if (!th2ex.empty()) throw st_group_join_exception(th2ex); - } - void insert(st_thread_t t) { ts.insert(t); } - private: - std::set<st_thread_t> ts; - }; - -#if 0 -/// XXX - int count = 0; - size_t glen = 0; - - class st_read_fn - { - private: - st_netfd_t fd_; - st_utime_t to_; - public: - st_read_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) - : fd_(fd), to_(to) {} - size_t operator()(char *buf, size_t len) { - size_t x = size_t(checknnegerr(st_read(fd_, buf, len, to_))); - glen += x; - if ((++count & 0xf) == 0xf) - cout << "count " << count << " len " << len << " read " << x << " glen " << glen << endl; - return x; - // return size_t(checknnegerr(st_read(fd_, buf, len, to_))); - } - }; -#else - class st_read_fn - { - private: - st_netfd_t fd_; - st_utime_t to_; - public: - st_read_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) - : fd_(fd), to_(to) {} - size_t operator()(char *buf, size_t len) { - return size_t(checknnegerr(st_read(fd_, buf, len, to_))); - } - }; -#endif - - class st_read_fully_fn - { - private: - st_netfd_t fd_; - st_utime_t to_; - public: - st_read_fully_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) - : fd_(fd), to_(to) {} - void operator()(char *buf, size_t len) { - checkeqnneg(st_read_fully(fd_, buf, len, to_), ssize_t(len)); - } - }; - - class st_reader - { - EXPAND(stream_reader) - private: - stream_reader r_; - public: - st_reader(st_netfd_t fd, char *buf, size_t len) : - r_(st_read_fn(fd), st_read_fully_fn(fd), buf, len) {} - size_t unread() { return r_.unread(); } - size_t rem() { return r_.rem(); } - sized_array<char> &buf() { return r_.buf(); } - void reset_range(char *start, char *end) { r_.reset_range(start, end); } - void reset() { r_.reset(); } - char *start() { return r_.start(); } - char *end() { return r_.end(); } - bool accum(size_t req) { return r_.accum(req); } - void skip(size_t req) { r_.skip(req); } - managed_array<char> read(size_t req) { return r_.read(req); } - template<typename T> T read() { return r_.read<T>(); } - void shift() { r_.shift(); } - }; - -} - -#endif Added: cpp-commons/trunk/src/commons/st/sync.h =================================================================== --- cpp-commons/trunk/src/commons/st/sync.h (rev 0) +++ cpp-commons/trunk/src/commons/st/sync.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,84 @@ +#ifndef COMMONS_ST_SYNC_H +#define COMMONS_ST_SYNC_H + +#include <commons/check.h> +#include <commons/utility.h> +#include <st.h> + +BEGIN_NAMESPACE(commons) + +/** + * RAII to acquire and release a st_mutex_t. Non-copyable. + */ +class st_lock +{ + NONCOPYABLE(st_lock) + public: + st_lock(st_mutex_t mx) : mx_(mx) { check0x(st_mutex_lock(mx)); } + ~st_lock() { check0x(st_mutex_unlock(mx_)); } + private: + st_mutex_t mx_; +}; + +/** + * Wraps st_cond_* errno-functions with exceptions and cleans up on + * destruction. + */ +class st_cond +{ + NONCOPYABLE(st_cond) + public: + st_cond() : c(checkerr(st_cond_new())) {} + ~st_cond() { check0x(st_cond_destroy(c)); } + void wait() { check0x(st_cond_wait(c)); } + void wait(st_utime_t t) { check0x(st_cond_timedwait(c, t)); } + void signal() { st_cond_signal(c); } + void bcast() { st_cond_broadcast(c); } + private: + st_cond_t c; +}; + +/** + * Synchronized boolean. + */ +class st_bool +{ + public: + st_bool(bool init = false) : c(), b(init) {} + void set() { b = true; c.bcast(); } + void reset() { b = false; c.bcast(); } + void waitset() { if (!b) c.wait(); } + void waitreset() { if (b) c.wait(); } + operator bool() { return b; } + private: + st_cond c; + bool b; +}; + +UNUSED static void toggle(st_bool& b) { if (b) b.reset(); else b.set(); } + +/** + * Wraps st_mutex_* errno-functions with exceptions and cleans up on + * destruction. + */ +class st_mutex +{ + NONCOPYABLE(st_mutex) + public: + st_mutex() : m(checkerr(st_mutex_new())) {} + ~st_mutex() { check0x(st_mutex_destroy(m)); } + void lock() { check0x(st_mutex_lock(m)); } + bool trylock() { + int res = st_mutex_trylock(m); + if (res == 0) return true; + else if (errno == EBUSY) return false; + else check0x(res); + } + void unlock() { check0x(st_mutex_unlock(m)); } + private: + st_mutex_t m; +}; + +END_NAMESPACE + +#endif Added: cpp-commons/trunk/src/commons/st/threads.h =================================================================== --- cpp-commons/trunk/src/commons/st/threads.h (rev 0) +++ cpp-commons/trunk/src/commons/st/threads.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -0,0 +1,98 @@ +#ifndef COMMONS_ST_THREADS_H +#define COMMONS_ST_THREADS_H + +#include <commons/utility.h> +#include <commons/delegates.h> +#include <boost/foreach.hpp> +#include <map> +#include <set> +#include <st.h> + +#define foreach BOOST_FOREACH + +BEGIN_NAMESPACE(commons) + +using namespace std; + +enum { default_stack_size = 65536 }; + +/** + * Run a function in pthread. + * \return The new pthread_t on success, 0 on failure. + * \todo Is it safe to treat the pthread_t as a pointer? + */ +UNUSED static st_thread_t +st_spawn(const fn& f) +{ + return st_thread_create(&run_function0_null, + new fn(f), + true, + default_stack_size); +} + +UNUSED static void +st_join(st_thread_t t) +{ + check0x(st_thread_join(t, nullptr)); +} + +class st_group_join_exception : public std::exception +{ + public: + st_group_join_exception(const map<st_thread_t, std::exception> &th2ex) : + th2ex_(th2ex) {} + virtual ~st_group_join_exception() throw() {} + virtual const char *what() const throw() { + if (!th2ex_.empty() && s == "") { + bool first = true; + stringstream ss; + typedef pair<st_thread_t, std::exception> p; + foreach (p p, th2ex_) { + ss << (first ? "" : ", ") << p.first << " -> " << p.second.what(); + first = false; + } + s = ss.str(); + } + return s.c_str(); + } + private: + map<st_thread_t, std::exception> th2ex_; + mutable string s; +}; + +/** + * RAII for joining on a single thread. + */ +class st_joining +{ + NONCOPYABLE(st_joining) + public: + st_joining(st_thread_t t) : t_(t) {} + ~st_joining() { if (t_ != nullptr) st_join(t_); } + private: + st_thread_t t_; +}; + +/** + * RAII for joining on all contained threads. Warning: st_join may throw + * exceptions. + */ +class st_thread_group +{ + public: + ~st_thread_group() { + map<st_thread_t, std::exception> th2ex; + foreach (st_thread_t t, ts) { + try { st_join(t); } + catch (std::exception &ex) { th2ex[t] = ex; } + } + if (!th2ex.empty()) throw st_group_join_exception(th2ex); + } + void insert(st_thread_t t) { ts.insert(t); } + private: + set<st_thread_t> ts; +}; + +END_NAMESPACE + +#endif Modified: cpp-commons/trunk/src/commons/streamwriter.h =================================================================== --- cpp-commons/trunk/src/commons/streamwriter.h 2009-03-24 00:10:13 UTC (rev 1327) +++ cpp-commons/trunk/src/commons/streamwriter.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -7,6 +7,7 @@ #include <cstring> #include <iostream> #include <iomanip> +#include <arpa/inet.h> namespace commons { @@ -22,7 +23,7 @@ char *unsent_; char *mark_; char *p_; - boost::function<void(void*, size_t)> flushcb; + boost::function<void(const void*, size_t)> flushcb; char *reserve(size_t n, char *p) { if (p + n > a_.end()) { // check that the reserved space will fit @@ -43,7 +44,7 @@ *reinterpret_cast<T*>(reserve(sizeof x, p)) = x; } public: - stream_writer(boost::function<void(void*, size_t)> flushcb, char *a, size_t buf_size) : + stream_writer(boost::function<void(const void*, size_t)> flushcb, char *a, size_t buf_size) : a_(a, buf_size), unsent_(a_.get()), mark_(unsent_ + sizeof(uint32_t)), p_(mark_), flushcb(flushcb) {} sized_array<char> &buf() { return a_; } Modified: cpp-commons/trunk/src/commons/utility.h =================================================================== --- cpp-commons/trunk/src/commons/utility.h 2009-03-24 00:10:13 UTC (rev 1327) +++ cpp-commons/trunk/src/commons/utility.h 2009-03-24 06:10:14 UTC (rev 1328) @@ -17,4 +17,7 @@ #define UNUSED __attribute__((unused)) +/** Useful for temporarily causing a variable to be "used." */ +#define USE(x) static_cast<void>(x) + #endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |