[complement-svn] SF.net SVN: complement: [1835] branches/complement-sockios/explore
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2008-03-26 11:10:48
|
Revision: 1835 http://complement.svn.sourceforge.net/complement/?rev=1835&view=rev Author: complement Date: 2008-03-26 04:10:43 -0700 (Wed, 26 Mar 2008) Log Message: ----------- connection processor implemented Modified Paths: -------------- branches/complement-sockios/explore/include/sockios/sockstream2 branches/complement-sockios/explore/include/sockios/sockstream2.cc branches/complement-sockios/explore/include/sockios/sp.h branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.h branches/complement-sockios/explore/lib/sockios/ut/sockios_test_suite.cc Modified: branches/complement-sockios/explore/include/sockios/sockstream2 =================================================================== --- branches/complement-sockios/explore/include/sockios/sockstream2 2008-03-26 10:57:21 UTC (rev 1834) +++ branches/complement-sockios/explore/include/sockios/sockstream2 2008-03-26 11:10:43 UTC (rev 1835) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/03/07 01:16:27 ptr> +// -*- C++ -*- Time-stamp: <08/03/26 09:54:59 ptr> /* * Copyright (c) 1997-1999, 2002, 2003, 2005-2008 @@ -222,6 +222,7 @@ static void __at_fork_child(); static void __at_fork_parent(); static int _count; + static bool _at_fork; }; static char Init_buf[]; @@ -290,6 +291,9 @@ int basic_socket<charT,traits,_Alloc>::Init::_count = 0; template<class charT, class traits, class _Alloc> +bool basic_socket<charT,traits,_Alloc>::Init::_at_fork = false; + +template<class charT, class traits, class _Alloc> void basic_socket<charT,traits,_Alloc>::Init::_guard( int direction ) { static std::tr2::mutex _init_lock; @@ -298,9 +302,14 @@ std::tr2::lock_guard<std::tr2::mutex> lk( _init_lock ); if ( _count++ == 0 ) { basic_socket<charT,traits,_Alloc>::mgr = new detail::sockmgr<charT,traits,_Alloc>(); -// #ifdef __FIT__PTHREADS -// pthread_atfork( __at_fork_prepare, __at_fork_parent, __at_fork_child ); -// #endif +#ifdef __FIT_PTHREADS + if ( !_at_fork ) { // call only once + if ( pthread_atfork( __at_fork_prepare, __at_fork_parent, __at_fork_child ) ) { + // throw system_error; + } + _at_fork = true; + } +#endif // _sock_processor_base::_idx = std::tr2::this_thread::xalloc(); } } else { @@ -313,7 +322,8 @@ template<class charT, class traits, class _Alloc> void basic_socket<charT,traits,_Alloc>::Init::__at_fork_prepare() -{ } +{ +} template<class charT, class traits, class _Alloc> void basic_socket<charT,traits,_Alloc>::Init::__at_fork_child() Modified: branches/complement-sockios/explore/include/sockios/sockstream2.cc =================================================================== --- branches/complement-sockios/explore/include/sockios/sockstream2.cc 2008-03-26 10:57:21 UTC (rev 1834) +++ branches/complement-sockios/explore/include/sockios/sockstream2.cc 2008-03-26 11:10:43 UTC (rev 1835) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/03/07 01:16:27 ptr> +// -*- C++ -*- Time-stamp: <08/03/18 09:51:28 ptr> /* * Copyright (c) 1997-1999, 2002, 2003, 2005-2008 @@ -285,7 +285,7 @@ template<class charT, class traits, class _Alloc> void basic_sockbuf2<charT, traits, _Alloc>::shutdown( sock_base2::shutdownflg dir ) { - if ( basic_socket_t::is_open() ) { + if ( basic_socket_t::is_open_unsafe() ) { if ( (dir & (sock_base2::stop_in | sock_base2::stop_out)) == (sock_base2::stop_in | sock_base2::stop_out) ) { ::shutdown( basic_socket_t::_fd, 2 ); Modified: branches/complement-sockios/explore/include/sockios/sp.h =================================================================== --- branches/complement-sockios/explore/include/sockios/sp.h 2008-03-26 10:57:21 UTC (rev 1834) +++ branches/complement-sockios/explore/include/sockios/sp.h 2008-03-26 11:10:43 UTC (rev 1835) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/03/07 01:40:59 ptr> +// -*- C++ -*- Time-stamp: <08/03/26 09:55:19 ptr> /* * Copyright (c) 2008 @@ -22,6 +22,7 @@ #include <cerrno> #include <mt/thread> #include <mt/mutex> +#include <mt/condition_variable> #ifdef STLPORT # include <unordered_map> @@ -43,6 +44,8 @@ #endif #include <sockios/sockstream> +#include <deque> +#include <functional> namespace std { @@ -270,6 +273,313 @@ typedef sock_processor_base<char,char_traits<char>,allocator<char> > sock_basic_processor; +template <class Connect, class charT = char, class traits = std::char_traits<charT>, class _Alloc = std::allocator<charT>, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& ) = &Connect::connect > +class connect_processor : + public sock_processor_base<charT,traits,_Alloc> +{ + private: + typedef sock_processor_base<charT,traits,_Alloc> base_t; + + class Init + { + public: + Init() + { _guard( 1 ); } + ~Init() + { _guard( 0 ); } + + private: + static void _guard( int direction ); + static void __at_fork_prepare(); + static void __at_fork_child(); + static void __at_fork_parent(); + static int _count; + }; + + static char Init_buf[]; + + public: + connect_processor() : + not_empty( *this ), + _in_work( false ), + ploop( loop, this ) + { new( Init_buf ) Init(); } + + explicit connect_processor( int port ) : + base_t( port, sock_base2::sock_stream ), + not_empty( *this ), + _in_work( false ), + ploop( loop, this ) + { new( Init_buf ) Init(); } + + ~connect_processor() + { + connect_processor::close(); + if ( ploop.joinable() ) { + ploop.join(); + } + ((Init *)Init_buf)->~Init(); + } + + virtual void close(); + + void wait() + { if ( ploop.joinable() ) { ploop.join(); } } + + private: + virtual void operator ()( typename base_t::sockstream_t& s, const typename base_t::adopt_new_t& ); + virtual void operator ()( typename base_t::sockstream_t& s, const typename base_t::adopt_close_t& ); + virtual void operator ()( typename base_t::sockstream_t& s, const typename base_t::adopt_data_t& ); + + + static void loop( connect_processor* me ) + { me->worker(); } + + void worker(); + + private: + struct processor + { + processor() : + c(0), + s(0) + { } + processor( Connect* __c, typename sock_processor_base<charT,traits,_Alloc>::sockstream_t* __s ) : + c(__c), + s(__s) + { } + processor( const processor& p ) : + c( p.c ), + s( p.s ) + { } + + processor& operator =( const processor& p ) + { c = p.c; s = p.s; return *this; } + + Connect* c; + typename sock_processor_base<charT,traits,_Alloc>::sockstream_t* s; + + bool operator ==( const processor& p ) const + { return s == p.s; } + bool operator ==( const typename sock_processor_base<charT,traits,_Alloc>::sockstream_t* st ) const + { return const_cast<const typename sock_processor_base<charT,traits,_Alloc>::sockstream_t*>(s) == st; } + +/* + struct equal_to : + public std::binary_function<processor, typename sock_processor_base<charT,traits,_Alloc>::sockstream_t*, bool> + { + bool operator()(const processor& __x, const typename sock_processor_base<charT,traits,_Alloc>::sockstream_t* __y) const + { return __x == __y; } + }; +*/ + }; + + bool pop_ready( processor& ); + +#ifdef __USE_STLPORT_HASH + typedef std::hash_map<typename base_t::sockstream_t*,Connect*> worker_pool_t; +#endif +#ifdef __USE_STD_HASH + typedef __gnu_cxx::hash_map<typename base_t::sockstream_t*,Connect*> worker_pool_t; +#endif +#if defined(__USE_STLPORT_TR1) || defined(__USE_STD_TR1) + typedef std::tr1::unordered_map<typename base_t::sockstream_t*,Connect*> worker_pool_t; +#endif + typedef std::deque<processor> ready_pool_t; + + struct _not_empty + { + _not_empty( connect_processor& p ) : + me( p ) + { } + + bool operator()() const + { return !me.ready_pool.empty(); } + + connect_processor& me; + } not_empty; + + worker_pool_t worker_pool; + ready_pool_t ready_pool; + volatile bool _in_work; + std::tr2::mutex wklock; + std::tr2::mutex rdlock; + std::tr2::condition_variable cnd; + std::tr2::mutex inwlock; + std::tr2::condition_variable cnd_inwk; + std::tr2::thread ploop; + + friend struct _not_empty; +}; + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> +int connect_processor<Connect, charT, traits, _Alloc, C>::Init::_count = 0; + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::Init::_guard( int direction ) +{ + static std::tr2::mutex _init_lock; + + if ( direction ) { + std::tr2::lock_guard<std::tr2::mutex> lk( _init_lock ); + if ( _count++ == 0 ) { + +// #ifdef __FIT__PTHREADS +// pthread_atfork( __at_fork_prepare, __at_fork_parent, __at_fork_child ); +// #endif +// _sock_processor_base::_idx = std::tr2::this_thread::xalloc(); + } + } else { + std::tr2::lock_guard<std::tr2::mutex> lk( _init_lock ); + if ( --_count == 0 ) { + + } + } +} + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::Init::__at_fork_prepare() +{ } + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::Init::__at_fork_child() +{ + if ( _count != 0 ) { + + } + // _sock_processor_base::_idx = std::tr2::this_thread::xalloc(); +} + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::Init::__at_fork_parent() +{ } + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> +char connect_processor<Connect, charT, traits, _Alloc, C>::Init_buf[128]; + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::close() +{ + base_t::close(); + + { + std::tr2::lock_guard<std::tr2::mutex> lk(inwlock); + _in_work = false; // <--- set before cnd.notify_one(); (below in this func) + } + + std::tr2::lock_guard<std::tr2::mutex> lk2( rdlock ); + ready_pool.push_back( processor() ); // make ready_pool not empty + // std::cerr << "=== " << ready_pool.size() << std::endl; + cnd.notify_one(); +} + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::operator ()( typename connect_processor<Connect, charT, traits, _Alloc, C>::base_t::sockstream_t& s, const typename connect_processor<Connect, charT, traits, _Alloc, C>::base_t::adopt_new_t& ) +{ + Connect* c = new Connect( s ); + if ( s.rdbuf()->in_avail() ) { + std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); + ready_pool.push_back( processor( c, &s ) ); + cnd.notify_one(); + } else { + std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); + worker_pool[&s] = c; + } +} + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::operator ()( typename connect_processor<Connect, charT, traits, _Alloc, C>::base_t::sockstream_t& s, const typename connect_processor<Connect, charT, traits, _Alloc, C>::base_t::adopt_close_t& ) +{ + { + std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); + typename worker_pool_t::iterator i = worker_pool.find( &s ); + if ( i != worker_pool.end() ) { + delete i->second; + // std::cerr << "oops\n"; + worker_pool.erase( i ); + return; + } + } + + Connect* c = 0; + { + std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); + typename ready_pool_t::iterator j = std::find( ready_pool.begin(), ready_pool.end(), /* std::bind2nd( typename processor::equal_to(), &s ) */ &s ); + if ( j != ready_pool.end() ) { + // std::cerr << "oops 2\n"; + c = j->c; + ready_pool.erase( j ); + } + } + if ( c != 0 ) { + (c->*C)( s ); + delete c; + } +} + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::operator ()( typename connect_processor<Connect, charT, traits, _Alloc, C>::base_t::sockstream_t& s, const typename connect_processor<Connect, charT, traits, _Alloc, C>::base_t::adopt_data_t& ) +{ + Connect* c; + + { + std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); + typename worker_pool_t::const_iterator i = worker_pool.find( &s ); + if ( i == worker_pool.end() ) { + return; + } + c = i->second; + worker_pool.erase( i ); + } + + std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); + ready_pool.push_back( processor( c, &s ) ); + cnd.notify_one(); + // std::cerr << "notify data " << (void *)c << " " << ready_pool.size() << std::endl; +} + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> +bool connect_processor<Connect, charT, traits, _Alloc, C>::pop_ready( processor& p ) +{ + { + std::tr2::unique_lock<std::tr2::mutex> lk( rdlock ); + + cnd.wait( lk, not_empty ); + p = ready_pool.front(); // it may contain p.c == 0, p.s == 0, if !in_work() + ready_pool.pop_front(); + // std::cerr << "pop 1\n"; + if ( p.c == 0 ) { // wake up, but _in_work may be still true here (in processor pipe?), + return false; // even I know that _in_work <- false before notification... + } // so, check twice + } + + // std::cerr << "pop 2\n"; + + std::tr2::lock_guard<std::tr2::mutex> lk(inwlock); + return _in_work ? true : false; +} + + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::worker() +{ + _in_work = true; + + processor p; + + while ( pop_ready( p ) ) { + // std::cerr << "worker 1\n"; + (p.c->*C)( *p.s ); + if ( p.s->rdbuf()->in_avail() ) { + std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); + ready_pool.push_back( p ); + } else { + std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); + worker_pool[p.s] = p.c; + } + } +} + namespace detail { template<class charT, class traits, class _Alloc> @@ -424,7 +734,7 @@ template<class charT, class traits, class _Alloc> void sockmgr<charT,traits,_Alloc>::io_worker() { - epoll_event ev[n_ret]; + epoll_event ev[/*n_ret*/ 512 ]; /* ctl _xctl; @@ -437,22 +747,28 @@ } */ + try { for ( ; ; ) { - int n = epoll_wait( efd, &ev[0], n_ret, -1 ); + int n = epoll_wait( efd, &ev[0], /* n_ret */ 512, -1 ); if ( n < 0 ) { if ( errno == EINTR ) { continue; } // throw system_error } + // std::cerr << "epoll see " << n << std::endl; for ( int i = 0; i < n; ++i ) { + // std::cerr << "epoll i = " << i << std::endl; if ( ev[i].data.fd == pipefd[0] ) { + // std::cerr << "on pipe\n"; epoll_event ev_add; ctl _ctl; int r = read( pipefd[0], &_ctl, sizeof(ctl) ); if ( r < 0 ) { // throw system_error + // std::cerr << "Read pipe\n"; } else if ( r == 0 ) { + // std::cerr << "Read pipe 0\n"; return; } @@ -461,6 +777,10 @@ ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; ev_add.data.fd = static_cast<socks_processor_t*>(_ctl.data.ptr)->fd(); if ( ev_add.data.fd >= 0 ) { + if ( fcntl( ev_add.data.fd, F_SETFL, fcntl( ev_add.data.fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { + // std::cerr << "xxx " << errno << " " << std::tr2::getpid() << std::endl; + throw std::runtime_error( "can't establish nonblock mode on listener" ); + } fd_info new_info = { fd_info::listener, 0, static_cast<socks_processor_t*>(_ctl.data.ptr) }; descr[ev_add.data.fd] = new_info; if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { @@ -496,12 +816,14 @@ } break; case rqstop: + // std::cerr << "Stop request\n"; return; break; } continue; } + // std::cerr << "#\n"; typename fd_container_type::iterator ifd = descr.find( ev[i].data.fd ); if ( ifd == descr.end() ) { @@ -510,49 +832,67 @@ fd_info& info = ifd->second; if ( info.flags & fd_info::listener ) { + // std::cerr << "%\n"; if ( ev[i].events & EPOLLRDHUP ) { epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ); // walk through descr and detach every .p ? descr.erase( ifd ); + // std::cerr << "Remove listener EPOLLRDHUP\n"; } else if ( ev[i].events & EPOLLIN ) { sockaddr addr; socklen_t sz = sizeof( sockaddr_in ); - int fd = accept( ev[i].data.fd, &addr, &sz ); - if ( fcntl( fd, F_SETFL, fcntl( fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { - throw std::runtime_error( "can't establish nonblock mode" ); - } - sockstream_t* s; + for ( ; ; ) { + int fd = accept( ev[i].data.fd, &addr, &sz ); + if ( fd < 0 ) { + if ( (errno == EINTR) || (errno == ECONNABORTED) /* || (errno == ERESTARTSYS) */ ) { + continue; + } + if ( !(errno == EAGAIN || errno == EWOULDBLOCK) ) { + // std::cerr << "Accept, listener " << ev[i].data.fd << ", errno " << errno << std::endl; + // throw system_error ? + } + break; + } + // std::cerr << "listener accept " << fd << std::endl; + if ( fcntl( fd, F_SETFL, fcntl( fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { + throw std::runtime_error( "can't establish nonblock mode" ); + } + sockstream_t* s; - try { - s = new sockstream_t(); - if ( s->rdbuf()->_open_sockmgr( fd, addr ) ) { - epoll_event ev_add; - ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; - ev_add.data.fd = fd; - fd_info new_info = { fd_info::owner, s, info.p }; - descr[fd] = new_info; + try { + s = new sockstream_t(); + if ( s->rdbuf()->_open_sockmgr( fd, addr ) ) { + epoll_event ev_add; + ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + ev_add.data.fd = fd; + fd_info new_info = { fd_info::owner, s, info.p }; + descr[fd] = new_info; - if ( epoll_ctl( efd, EPOLL_CTL_ADD, fd, &ev_add ) < 0 ) { - std::cerr << "Accept, add " << fd << ", errno " << errno << std::endl; - descr.erase( fd ); - // throw system_error + if ( epoll_ctl( efd, EPOLL_CTL_ADD, fd, &ev_add ) < 0 ) { + // std::cerr << "Accept, add " << fd << ", errno " << errno << std::endl; + descr.erase( fd ); + // throw system_error + } + (*info.p)( *s, typename socks_processor_t::adopt_new_t() ); + } else { + // std::cerr << "Accept, delete " << fd << std::endl; + delete s; } - (*info.p)( *s, typename socks_processor_t::adopt_new_t() ); - } else { - std::cerr << "Accept, delete " << fd << std::endl; + } + catch ( const std::bad_alloc& ) { + // nothing + } + catch ( ... ) { + descr.erase( fd ); delete s; } } - catch ( const std::bad_alloc& ) { - // nothing - } - catch ( ... ) { - descr.erase( fd ); - delete s; - } + } else { + // std::cerr << "listener: " << std::hex << ev[i].events << std::dec << std::endl; } } else { + // std::cerr << "not listener\n"; if ( ev[i].events & EPOLLIN ) { if ( (info.flags & fd_info::owner) == 0 ) { // marginal case: me not owner (registerd via push(), @@ -620,6 +960,7 @@ b->setg( b->eback(), b->gptr(), b->egptr() + offset ); b->ucnd.notify_one(); if ( info.p != 0 ) { + // std::cerr << "data here" << std::endl; (*info.p)( *info.s.s, typename socks_processor_t::adopt_data_t() ); } } else { @@ -675,12 +1016,53 @@ } } } + } + catch ( std::exception& e ) { + std::cerr << e.what() << std::endl; + } } } //detail } // namesapce std +#if defined(__USE_STLPORT_HASH) || defined(__USE_STLPORT_TR1) || defined(__USE_STD_TR1) +# define __HASH_NAMESPACE std +#endif +#if defined(__USE_STD_HASH) +# define __HASH_NAMESPACE __gnu_cxx +#endif + +namespace __HASH_NAMESPACE { + +#ifdef __USE_STD_TR1 +namespace tr1 { +#endif + +template <class charT, class traits, class _Alloc> +struct hash<std::basic_sockstream2<charT, traits, _Alloc>* > +{ + size_t operator()(const std::basic_sockstream2<charT, traits, _Alloc>* __x) const + { return reinterpret_cast<size_t>(__x); } +}; + +#ifdef __USE_STD_TR1 +} +#endif + +#if defined(__GNUC__) && (__GNUC__ < 4) +template<> +struct hash<void *> +{ + size_t operator()(const void *__x) const + { return reinterpret_cast<size_t>(__x); } +}; +#endif // __GNUC__ < 4 + +} // namespace __HASH_NAMESPACE + +#undef __HASH_NAMESPACE + #ifdef __USE_STLPORT_HASH # undef __USE_STLPORT_HASH #endif Modified: branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc =================================================================== --- branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc 2008-03-26 10:57:21 UTC (rev 1834) +++ branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc 2008-03-26 11:10:43 UTC (rev 1835) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/03/06 23:31:13 ptr> +// -*- C++ -*- Time-stamp: <08/03/26 11:48:40 ptr> /* * @@ -14,6 +14,8 @@ #include <sockios/sockstream2> #include <mt/mutex> +#include <sys/wait.h> +#include <mt/shm.h> using namespace std; using namespace std::tr2; @@ -28,33 +30,6 @@ /* ************************************************************ */ -class worker -{ - public: - worker( sockstream& ) - { lock_guard<mutex> lk(lock); ++cnt; ++visits; } - - ~worker() - { lock_guard<mutex> lk(lock); --cnt; } - - void connect( sockstream& ) - { } - - void close() - { } - - static int get_visits() - { lock_guard<mutex> lk(lock); return visits; } - - static mutex lock; - static int cnt; - static int visits; -}; - -mutex worker::lock; -int worker::cnt = 0; -int worker::visits = 0; - class simple_mgr : public sock_basic_processor { @@ -145,7 +120,7 @@ return EXAM_RESULT; } -int EXAM_IMPL(sockios2_test::ctor_dtor) +int EXAM_IMPL(sockios2_test::connect_disconnect) { { simple_mgr srv( 2008 ); @@ -211,82 +186,261 @@ } } -#if 0 - // Check, that number of ctors of Cnt is the same as number of called dtors - // i.e. all created Cnt was released. + return EXAM_RESULT; +} + +class worker +{ + public: + worker( sockstream2& ) + { lock_guard<mutex> lk(lock); ++cnt; ++visits; cnd.notify_one(); } + + ~worker() + { lock_guard<mutex> lk(lock); --cnt; } + + void connect( sockstream2& s ) + { lock_guard<mutex> lk(lock); getline( s, line ); ++rd; line_cnd.notify_one(); } + +// void close() +// { } + + static int get_visits() + { lock_guard<mutex> lk(lock); return visits; } + + static mutex lock; + static int cnt; + static /* volatile */ int visits; + static condition_variable cnd; + static string line; + static condition_variable line_cnd; + static int rd; + // static barrier b; +}; + +mutex worker::lock; +int worker::cnt = 0; +/* volatile */ int worker::visits = 0; +condition_variable worker::cnd; +string worker::line; +condition_variable worker::line_cnd; +int worker::rd = 0; + +// barrier worker::b; + +// void stopper( connect_processor<worker>* prss ) +// { +// b.wait(); +// prss->close(); +// } + +bool visits_counter1() +{ + return worker::visits == 1; +} + +bool visits_counter2() +{ + return worker::visits == 2; +} + +bool rd_counter1() +{ + return worker::rd == 1; +} + +int EXAM_IMPL(sockios2_test::processor_core) +{ { - sockmgr_stream_MP<Cnt> srv( port ); + connect_processor<worker> prss( 2008 ); + EXAM_CHECK( prss.good() ); + EXAM_CHECK( prss.is_open() ); + { - sockstream s1( "localhost", port ); + sockstream2 s( "localhost", 2008 ); - EXAM_CHECK( s1.good() ); - EXAM_CHECK( s1.is_open() ); + EXAM_CHECK( s.good() ); + EXAM_CHECK( s.is_open() ); - s1 << "1234" << endl; +// for ( int i = 0; i < 64; ++i ) { // give chance to process it +// std::tr2::this_thread::yield(); +// } + unique_lock<mutex> lk( worker::lock ); + + worker::cnd.timed_wait( lk, milliseconds( 100 ), visits_counter1 ); + + EXAM_CHECK( worker::visits == 1 ); + worker::visits = 0; + } + } + { + lock_guard<mutex> lk( worker::lock ); + EXAM_CHECK( worker::cnt == 0 ); + } + + { + connect_processor<worker> prss( 2008 ); + + EXAM_CHECK( prss.good() ); + EXAM_CHECK( prss.is_open() ); + + { + sockstream2 s1( "localhost", 2008 ); + EXAM_CHECK( s1.good() ); EXAM_CHECK( s1.is_open() ); - while ( Cnt::get_visits() == 0 ) { - xmt::Thread::yield(); - } - Cnt::lock.lock(); - EXAM_CHECK( Cnt::cnt == 1 ); - Cnt::lock.unlock(); - } - srv.close(); - srv.wait(); + sockstream2 s2( "localhost", 2008 ); - Cnt::lock.lock(); - EXAM_CHECK( Cnt::cnt == 0 ); - Cnt::visits = 0; - Cnt::lock.unlock(); + EXAM_CHECK( s2.good() ); + EXAM_CHECK( s2.is_open() ); + +// for ( int i = 0; i < 1024; ++i ) { // give chance to process it +// std::tr2::this_thread::yield(); +// } + unique_lock<mutex> lk( worker::lock ); + + worker::cnd.timed_wait( lk, milliseconds( 100 ), visits_counter2 ); + + EXAM_CHECK( worker::visits == 2 ); + worker::visits = 0; + } } + { + lock_guard<mutex> lk( worker::lock ); + EXAM_CHECK( worker::cnt == 0 ); + } - Cnt::lock.lock(); - EXAM_CHECK( Cnt::cnt == 0 ); - Cnt::lock.unlock(); + // check before sockstream2 was closed { - sockmgr_stream_MP<Cnt> srv( port ); + connect_processor<worker> prss( 2008 ); + EXAM_CHECK( prss.good() ); + EXAM_CHECK( prss.is_open() ); + { - sockstream s1( "localhost", port ); - sockstream s2( "localhost", port ); + sockstream2 s1( "localhost", 2008 ); EXAM_CHECK( s1.good() ); EXAM_CHECK( s1.is_open() ); - EXAM_CHECK( s2.good() ); - EXAM_CHECK( s2.is_open() ); - s1 << "1234" << endl; - s2 << "1234" << endl; + s1 << "Hello, world!" << endl; + unique_lock<mutex> lk( worker::lock ); + worker::cnd.timed_wait( lk, milliseconds( 100 ), rd_counter1 ); + + // cerr << worker::line << endl; + EXAM_CHECK( worker::line == "Hello, world!" ); + worker::line = ""; + worker::rd = 0; + } + } + + + EXAM_CHECK( worker::line == "" ); + + // check after sockstream2 was closed, i.e. ensure, that all data available read before close + { + connect_processor<worker> prss( 2008 ); + + EXAM_CHECK( prss.good() ); + EXAM_CHECK( prss.is_open() ); + + { + sockstream2 s1( "localhost", 2008 ); + EXAM_CHECK( s1.good() ); EXAM_CHECK( s1.is_open() ); - EXAM_CHECK( s2.good() ); - EXAM_CHECK( s2.is_open() ); - while ( Cnt::get_visits() < 2 ) { - xmt::Thread::yield(); - } - Cnt::lock.lock(); - EXAM_CHECK( Cnt::cnt == 2 ); - Cnt::lock.unlock(); + + s1 << "Hello, world!" << endl; } - srv.close(); - srv.wait(); + unique_lock<mutex> lk( worker::lock ); + worker::cnd.timed_wait( lk, milliseconds( 100 ), rd_counter1 ); - Cnt::lock.lock(); - EXAM_CHECK( Cnt::cnt == 0 ); - Cnt::lock.unlock(); + // cerr << worker::line << endl; + EXAM_CHECK( worker::line == "Hello, world!" ); + worker::line = ""; + worker::rd = 0; } - Cnt::lock.lock(); - EXAM_CHECK( Cnt::cnt == 0 ); - Cnt::lock.unlock(); -#endif + EXAM_CHECK( worker::line == "" ); return EXAM_RESULT; } + +int EXAM_IMPL(sockios2_test::fork) +{ + const char fname[] = "/tmp/sockios2_test.shm"; + + /* You must work very carefully with sockets, theads and fork: it unsafe in principle + and no way to make it safe. Never try to pass _opened_ connection via fork. + Here I create sockstream, but without connection (it check that io processing + loop in underlying sockmgr finish and restart smoothly in child process). + */ + sockstream2 s; + + // worker::lock.lock(); + worker::visits = 0; + // worker::lock.unlock(); + + try { + xmt::shm_alloc<0> seg; + seg.allocate( fname, 4096, xmt::shm_base::create | xmt::shm_base::exclusive, 0660 ); + + xmt::allocator_shm<barrier_ip,0> shm; + barrier_ip& b = *new ( shm.allocate( 1 ) ) barrier_ip(); + + try { + + EXAM_CHECK( worker::visits == 0 ); + + this_thread::fork(); + { + connect_processor<worker> prss( 2008 ); + + EXAM_CHECK_ASYNC( worker::visits == 0 ); + + b.wait(); // -- align here + + EXAM_CHECK_ASYNC( prss.good() ); + EXAM_CHECK_ASYNC( prss.is_open() ); + + unique_lock<mutex> lk( worker::lock ); + worker::cnd.timed_wait( lk, milliseconds( 100 ), visits_counter1 ); + + EXAM_CHECK_ASYNC( worker::visits == 1 ); + } + + exit( 0 ); + } + catch ( std::tr2::fork_in_parent& child ) { + b.wait(); // -- align here + + s.open( "localhost", 2008 ); + + EXAM_CHECK( s.good() ); + EXAM_CHECK( s.is_open() ); + + int stat = -1; + EXAM_CHECK( waitpid( child.pid(), &stat, 0 ) == child.pid() ); + if ( WIFEXITED(stat) ) { + EXAM_CHECK( WEXITSTATUS(stat) == 0 ); + } else { + EXAM_ERROR( "child interrupted" ); + } + + EXAM_CHECK( worker::visits == 0 ); + } + shm.deallocate( &b ); + seg.deallocate(); + unlink( fname ); + } + catch ( xmt::shm_bad_alloc& err ) { + EXAM_ERROR( err.what() ); + } + + return EXAM_RESULT; +} Modified: branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.h =================================================================== --- branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.h 2008-03-26 10:57:21 UTC (rev 1834) +++ branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.h 2008-03-26 11:10:43 UTC (rev 1835) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/07/18 08:52:26 ptr> +// -*- C++ -*- Time-stamp: <08/03/25 06:22:29 ptr> /* * @@ -22,7 +22,9 @@ ~sockios2_test(); int EXAM_DECL(srv_core); - int EXAM_DECL(ctor_dtor); + int EXAM_DECL(connect_disconnect); + int EXAM_DECL(processor_core); + int EXAM_DECL(fork); }; #endif // __sockios2_test_h Modified: branches/complement-sockios/explore/lib/sockios/ut/sockios_test_suite.cc =================================================================== --- branches/complement-sockios/explore/lib/sockios/ut/sockios_test_suite.cc 2008-03-26 10:57:21 UTC (rev 1834) +++ branches/complement-sockios/explore/lib/sockios/ut/sockios_test_suite.cc 2008-03-26 11:10:43 UTC (rev 1835) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/07/19 00:19:19 ptr> +// -*- C++ -*- Time-stamp: <08/03/25 07:48:17 ptr> /* * @@ -80,8 +80,10 @@ sockios2_test test2; - t.add( &sockios2_test::ctor_dtor, test2, "sockios2_test::ctor_dtor", - t.add( &sockios2_test::srv_core, test2, "sockios2_test::srv_core" ) ); + t.add( &sockios2_test::fork, test2, "sockios2_test::fork", + t.add( &sockios2_test::processor_core, test2, "sockios2_test::processor_core", + t.add( &sockios2_test::connect_disconnect, test2, "sockios2_test::connect_disconnect", + t.add( &sockios2_test::srv_core, test2, "sockios2_test::srv_core" ) ) ) ); return t.girdle(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |