[Assorted-commits] SF.net SVN: assorted:[1077] cpp-commons/trunk/src/commons
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-11-30 22:22:25
|
Revision: 1077 http://assorted.svn.sourceforge.net/assorted/?rev=1077&view=rev Author: yangzhang Date: 2008-11-30 21:34:08 +0000 (Sun, 30 Nov 2008) Log Message: ----------- - added nullptr - delegates - added swallow - removed exception handling from run_function0 - st - updated stfd - cleaned up st_tcp_listen - split st_tcp_connect into overloads - added st_lock, st_join, st_mutex, st_cond, st_bool, st_channel, st_intr_hub, st_intr_cond, st_intr_bool, st_intr, st_group_join_exception, st_joining, st_thread_group - sockets - cleaned up casts - updated tcp_listen signature - check - added checkerr, checknnegerr, checkeqnneg - added some comments on how to revamp check.h Modified Paths: -------------- cpp-commons/trunk/src/commons/boost/delegates.h cpp-commons/trunk/src/commons/check.h cpp-commons/trunk/src/commons/sockets.h cpp-commons/trunk/src/commons/st/st.h Added Paths: ----------- cpp-commons/trunk/src/commons/nullptr.h Modified: cpp-commons/trunk/src/commons/boost/delegates.h =================================================================== --- cpp-commons/trunk/src/commons/boost/delegates.h 2008-11-29 07:51:27 UTC (rev 1076) +++ cpp-commons/trunk/src/commons/boost/delegates.h 2008-11-30 21:34:08 UTC (rev 1077) @@ -3,12 +3,20 @@ #include <boost/function.hpp> #include <boost/scoped_ptr.hpp> +#include <iostream> // for cerr namespace commons { using namespace boost; + using namespace std; + void + swallow(const function0<void> f) { + try { f(); } + catch (exception &ex) { cerr << ex.what() << endl; } + } + /** * Delegate for running a 0-ary void function. */ @@ -16,8 +24,7 @@ run_function0(void *p) { scoped_ptr< const function0<void> > pf((const function0<void>*) p); - try { (*pf)(); } - catch (exception ex) { cerr << ex.what() << endl; } + (*pf)(); } /** @@ -30,6 +37,17 @@ return NULL; } + // TODO: is there a way to do this? +// /** +// * Exception-returning delegate for running a 0-ary void function. +// */ +// void* +// run_function0_ex(void* p) +// { +// try { run_function0(p); } +// catch (exception &ex) { return exception(ex); } +// } + } #endif Modified: cpp-commons/trunk/src/commons/check.h =================================================================== --- cpp-commons/trunk/src/commons/check.h 2008-11-29 07:51:27 UTC (rev 1076) +++ cpp-commons/trunk/src/commons/check.h 2008-11-30 21:34:08 UTC (rev 1077) @@ -1,16 +1,32 @@ #ifndef COMMONS_CHECK_H #define COMMONS_CHECK_H -#define buflen 4096 - #include <exception> #include <sstream> #include <string> #include <commons/die.h> +// TODO: rename: +// - chk(x): verifies and return x; if not, throw strerror +// - chk(x, msg...): prepend "$msg: " to strerror +// - chk(x, false): don't use errno/strerror +// - chk(x, false, msg...) +// - chknoret(x): doesn't return x and doesn't use strerror +// - chk0(x): verifies !x; if not, throw strerror +// - same overloads as chk +// - chkpos(x): returns x; if not positive, throw strerror +// - same overloads as chk +// - chknneg(x): returns x; if negative, throw strerror +// - same overloads as chk +// - chkeq(x,y): if x!=y, throw; if x<0, throw strerror +// +// - chkr(): same as chk but returns -1 instead of throwing + namespace commons { + enum { buflen = 4096 }; + // TODO: deal with va args // TODO: provide strerror in other functions too // TODO: better way to deal with errno? (rather than resetting it) @@ -22,7 +38,7 @@ public: check_exception(const string & name) : name(name) {} virtual ~check_exception() throw() {} - const char *what() const throw() { return name.c_str(); } + virtual const char *what() const throw() { return name.c_str(); } private: const string name; }; @@ -38,7 +54,6 @@ vsnprintf(buf, buflen, fmt, ap); ss << buf; } - ss << endl; throw check_exception(ss.str()); } } @@ -72,6 +87,28 @@ return x; } + template<typename T> inline T + _checknnegerr(T x, const char *file, int line) + { + if (x < 0) { + int e = errno; + errno = 0; + _check(x, file, line, "expecting >=0, got %d: %s", x, strerror(e)); + } + return x; + } + + template<typename T> inline T + _checkerr(T x, const char *file, int line) + { + if (!x) { + int e = errno; + errno = 0; + _check(x, file, line, "expecting !=0, got %d: %s", x, strerror(e)); + } + return x; + } + template<typename T> inline void _check0(T x, const char *file, int line) { @@ -97,6 +134,19 @@ } } + template<typename T, typename U> inline void + _checkeqnneg(T l, U r, const char *file, int line) + { + if (l < 0) { + int e = errno; + errno = 0; + _check(l == r, file, line, + "expecting %d, got %d: %s", r, l, strerror(e)); + } else { + _checkeq(l, r, file, line); + } + } + } /** @@ -117,11 +167,21 @@ #define checkpass(expr, msg...) _checkpass(expr, __FILE__, __LINE__, ## msg) /** + * Same as checkpass(), but includes the strerror message in the exception. + */ +#define checkerr(expr) _checkerr(expr, __FILE__, __LINE__) + +/** * Checks that the value is non-negative. */ #define checknneg(expr, msg...) _checknneg(expr, __FILE__, __LINE__, ## msg) /** + * Same as checknneg, but include the strerror in the exception. + */ +#define checknnegerr(expr) _checknnegerr(expr, __FILE__, __LINE__) + +/** * Checks that the value is 0. Same as check0(), but throws an error rather * than return. Also clears the errno on error. The resulting exception * includes the strerror(). @@ -134,6 +194,13 @@ */ #define checkeq(l, r, msg...) _checkeq(l, r, __FILE__, __LINE__, ## msg) +/** + * Like checkeq, but if the left value is negative, then interpret that as a + * sign of error, and include the errorstr in the exception message. + */ +#define checkeqnneg(l, r, msg...) \ + _checkeqnneg(l, r, __FILE__, __LINE__, ## msg) + #if 0 #define checkmsg(cond, msg) \ bool b__ = cond; \ Added: cpp-commons/trunk/src/commons/nullptr.h =================================================================== --- cpp-commons/trunk/src/commons/nullptr.h (rev 0) +++ cpp-commons/trunk/src/commons/nullptr.h 2008-11-30 21:34:08 UTC (rev 1077) @@ -0,0 +1,28 @@ +#ifndef COMMONS_NULLPTR_H +#define COMMONS_NULLPTR_H + +// From <http://en.wikibooks.org/wiki/More_C%2B%2B_Idioms/nullptr> + +namespace commons +{ + + const // It is a const object... + class nullptr_t + { + public: + template<class T> + operator T*() const // convertible to any type of null non-member pointer... + { return 0; } + + template<class C, class T> + operator T C::*() const // or any type of null member pointer... + { return 0; } + + private: + void operator&() const; // Can't take address of nullptr + + } nullptr = {}; + +} + +#endif Modified: cpp-commons/trunk/src/commons/sockets.h =================================================================== --- cpp-commons/trunk/src/commons/sockets.h 2008-11-29 07:51:27 UTC (rev 1076) +++ cpp-commons/trunk/src/commons/sockets.h 2008-11-30 21:34:08 UTC (rev 1077) @@ -6,6 +6,8 @@ #include <netinet/in.h> #include <strings.h> #include <sys/socket.h> +#include <sys/types.h> +#include <fcntl.h> #include <unistd.h> #include <commons/check.h> @@ -20,7 +22,7 @@ * \return The server socket. */ int - server_socket(int port, bool nb = false) + server_socket(uint16_t port, bool nb = false) { // Create the socket. int fd = checknneg(socket(PF_INET, SOCK_STREAM, 0)); @@ -28,8 +30,8 @@ try { // Configure the socket. int n = 1; - check0x(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&n, - sizeof(n))); + check0x(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, + reinterpret_cast<char *>(&n), sizeof(n))); // Make our socket non-blocking if desired. if (nb) { @@ -37,14 +39,14 @@ } // Create the local socket address. - struct sockaddr_in sa; + sockaddr_in sa; bzero(&sa, sizeof(sa)); sa.sin_family = AF_INET; sa.sin_port = htons(port); sa.sin_addr.s_addr = htonl(INADDR_ANY); // Bind the socket to the local socket address. - check0x(bind(fd, (struct sockaddr*) &sa, sizeof(struct sockaddr_in))); + check0x(bind(fd, (sockaddr*) &sa, sizeof sa)); return fd; } catch (...) { @@ -60,7 +62,7 @@ * \return The listener socket. */ int - tcp_listen(int port, bool nb = false) + tcp_listen(uint16_t port, bool nb = false) { int fd = server_socket(port, nb); try { Modified: cpp-commons/trunk/src/commons/st/st.h =================================================================== --- cpp-commons/trunk/src/commons/st/st.h 2008-11-29 07:51:27 UTC (rev 1076) +++ cpp-commons/trunk/src/commons/st/st.h 2008-11-30 21:34:08 UTC (rev 1077) @@ -1,12 +1,22 @@ #ifndef COMMONS_ST_ST_H #define COMMONS_ST_ST_H +#include <exception> +#include <map> +#include <queue> +#include <set> +#include <sstream> #include <st.h> #include <stx.h> +#include <commons/nullptr.h> +// delegates.h must be included after sockets.h due to bind() conflicts. #include <commons/sockets.h> #include <commons/boost/delegates.h> +#include <boost/foreach.hpp> #include <boost/function.hpp> +#define foreach BOOST_FOREACH + namespace commons { using namespace boost; @@ -28,20 +38,32 @@ }; /** - * non-copyable. - * \todo remove? this seems to be bad if it closes the file! + * \todo Make non-copyable. */ class stfd { public: stfd(st_netfd_t fd) : fd_(fd), sclose(fd) {} - const st_netfd_t fd() const { return fd_; } + st_netfd_t fd() const { return fd_; } + operator st_netfd_t() const { return fd_; } private: const st_netfd_t fd_; st_closing sclose; }; /** + * RAII to acquire and release a st_mutex_t. Non-copyable. + */ + class st_lock + { + public: + st_lock(st_mutex_t mx) : mx_(mx) { check0x(st_mutex_lock(mx)); } + ~st_lock() { check0x(st_mutex_unlock(mx_)); } + private: + st_mutex_t mx_; + }; + + /** * Run a function in pthread. * \return The new pthread_t on success, 0 on failure. * \todo Is it safe to treat the pthread_t as a pointer? @@ -55,36 +77,35 @@ default_stack_size); } + void + st_join(st_thread_t t) + { + check0x(st_thread_join(t, nullptr)); + } + /** * Connect to a TCP socket address. - * \param[in] host Either an IP address or hostname. + * \param[in] host An IP address. * \param[in] port The destination port. - * \param[in] timeout The timeout for each of the DNS lookup and the connect - * operation. - * \todo Create variants that take or return the sockaddr. + * \param[in] timeout The timeout for the connect operation. */ st_netfd_t - st_tcp_connect(char *host, int port, st_utime_t timeout) + st_tcp_connect(in_addr host, uint16_t port, st_utime_t timeout) { // Create remote socket address. struct sockaddr_in sa; bzero(&sa, sizeof sa); sa.sin_family = AF_INET; sa.sin_port = htons(port); + sa.sin_addr = host; - // First try to parse as IP address. Note: inet_addr() is obsolete. - if (inet_aton(host, &sa.sin_addr) != 0) { - // Then look up by hostname. - check0x(stx_dns_getaddr(host, &sa.sin_addr, timeout)); - } - // Create the socket. int sfd = checknneg(socket(PF_INET, SOCK_STREAM, 0)); st_netfd_t nfd = st_netfd_open_socket(sfd); // Connect. try { - check0x(st_connect(nfd, (struct sockaddr*) &sa, sizeof sa, timeout)); + check0x(st_connect(nfd, (sockaddr*) &sa, sizeof sa, timeout)); return nfd; } catch (...) { st_netfd_close(nfd); @@ -93,26 +114,261 @@ } /** + * Connect to a TCP socket address. + * \param[in] host Either an IP address or hostname. + * \param[in] port The destination port. + * \param[in] timeout The timeout for each of the DNS lookup and the connect + * operation. + * \todo Create variants that take and/or return sockaddr_in's. + */ + st_netfd_t + st_tcp_connect(const char *host, uint16_t port, st_utime_t timeout) + { + in_addr ipaddr; + + // First try to parse as IP address. Note: inet_addr() is obsolete. Note: + // inet_aton returns 0 if address is invalid. + if (inet_aton(host, &ipaddr) == 0) { + // Then look up by hostname. + check0x(stx_dns_getaddr(host, &ipaddr, timeout)); + } + + return st_tcp_connect(ipaddr, port, timeout); + } + + /** * Create a listener st_netfd_t. * \param[in] port The port to listen on. * \return The st_netfd_t. */ st_netfd_t - st_tcp_listen(int port) + st_tcp_listen(uint16_t port) { int sfd = tcp_listen(port); try { // Create a net file descriptor around a listener socket. - st_netfd_t nfd = st_netfd_open_socket(sfd); - checkpass(nfd); - //st_netfd_t nfd = checkpass(st_netfd_open_socket(sfd)); - return nfd; + return checkpass(st_netfd_open_socket(sfd)); } catch (...) { close(sfd); throw; } } + /** + * Wraps st_cond_* errno-functions with exceptions and cleans up on + * destruction. + */ + class st_cond + { + public: + st_cond() : c(checkerr(st_cond_new())) {} + ~st_cond() { check0x(st_cond_destroy(c)); } + void wait() { check0x(st_cond_wait(c)); } + void wait(st_utime_t t) { check0x(st_cond_timedwait(c, t)); } + void signal() { st_cond_signal(c); } + void bcast() { st_cond_broadcast(c); } + private: + st_cond_t c; + }; + + /** + * Synchronized boolean. + */ + class st_bool + { + public: + st_bool(bool init = false) : b(init) {} + void set() { b = true; c.bcast(); } + void reset() { b = false; c.bcast(); } + void waitset() { if (!b) c.wait(); } + void waitreset() { if (b) c.wait(); } + operator bool() { return b; } + private: + st_cond c; + bool b; + }; + + /** + * Wraps st_mutex_* errno-functions with exceptions and cleans up on + * destruction. + */ + class st_mutex + { + public: + st_mutex() : m(checkerr(st_mutex_new())) {} + ~st_mutex() { check0x(st_mutex_destroy(m)); } + void lock() { check0x(st_mutex_lock(m)); } + bool trylock() { + int res = st_mutex_trylock(m); + if (res == 0) return true; + else if (errno == EBUSY) return false; + else check0x(res); + } + void unlock() { check0x(st_mutex_unlock(m)); } + private: + st_mutex_t m; + }; + + /** + * An unbounded FIFO queue. ST threads can wait on this until elements have + * been pushed in (resulting in a waking signal). + */ + template <typename T> + class st_channel + { + public: + void push(const T &x) { + q_.push(x); + empty_.signal(); + } + T take() { + while (q_.empty()) + empty_.wait(); + T x = front(); + q_.pop(); + return x; + } + const T& front() const { return q_.front(); } + bool empty() const { return q_.empty(); } + void pop() { q_.pop(); } + void clear() { while (!q_.empty()) q_.pop(); } + const std::queue<T> &queue() const { return q_; } + private: + std::queue<T> q_; + st_cond empty_; + }; + + /** + * A hub is a single point to signal to wake up a set of threads. Threads + * join the hub before calling a blocking operation if they want to make + * themselves interruptible on this hub. + */ + class st_intr_hub + { + public: + virtual void insert(st_thread_t t) = 0; + virtual void erase(st_thread_t t) = 0; + }; + + /** + * The simplest hub, which only interrupts those who are currently joined in + * the hub (like a condition variable broadcast). + */ + class st_intr_cond : public st_intr_hub + { + public: + void insert(st_thread_t t) { threads.insert(t); } + void erase(st_thread_t t) { threads.erase(t); } + void signal() { + foreach (st_thread_t t, threads) { + st_thread_interrupt(t); + } + threads.clear(); + } + private: + set<st_thread_t> threads; + }; + + /** + * Like st_intr_cond, but a bool instead, so there's state; newly joining + * threads may immediately be interrupted. Interruption occurs when this is + * set to true. + */ + class st_intr_bool : public st_intr_hub + { + public: + void insert(st_thread_t t) { + if (b) st_thread_interrupt(t); + else threads.insert(t); + } + void erase(st_thread_t t) { threads.erase(t); } + void set() { + b = true; + foreach (st_thread_t t, threads) { + st_thread_interrupt(t); + } + threads.clear(); + } + void reset() { + // If b is true, then any threads that join are immediately + // interrupted, so the set must be empty. + assert(!b || threads.empty()); + b = false; + } + operator bool() const { return b; } + private: + std::set<st_thread_t> threads; + bool b; + }; + + /** + * RAII for making the current thread interruptible on a certain hub. + */ + class st_intr + { + public: + st_intr(st_intr_hub &hub) : hub_(hub) { hub.insert(st_thread_self()); } + ~st_intr() { hub_.erase(st_thread_self()); } + private: + st_intr_hub &hub_; + }; + + class st_group_join_exception : public exception + { + public: + st_group_join_exception(const map<st_thread_t, exception> &th2ex) : + th2ex_(th2ex) {} + virtual ~st_group_join_exception() throw() {} + virtual const char *what() const throw() { + if (!th2ex_.empty() && s == "") { + bool first = true; + stringstream ss; + typedef pair<st_thread_t, exception> p; + foreach (p p, th2ex_) { + ss << (first ? "" : ", ") << p.first << " -> " << p.second.what(); + first = false; + } + const_cast<string&>(s) = ss.str(); + } + return s.c_str(); + } + private: + map<st_thread_t, exception> th2ex_; + string s; + }; + + /** + * RAII for joining on a single thread. + */ + class st_joining + { + public: + st_joining(st_thread_t t) : t_(t) {} + ~st_joining() { st_join(t_); } + private: + st_thread_t t_; + }; + + /** + * RAII for joining on all contained threads. Warning: st_join may throw + * exceptions. + */ + class st_thread_group + { + public: + ~st_thread_group() { + map<st_thread_t, exception> th2ex; + foreach (st_thread_t t, ts) { + try { st_join(t); } + catch (exception &ex) { th2ex[t] = ex; } + } + if (!th2ex.empty()) throw st_group_join_exception(th2ex); + } + void insert(st_thread_t t) { ts.insert(t); } + private: + set<st_thread_t> ts; + }; + } #endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |