[complement-svn] SF.net SVN: complement:[1973] trunk/complement/explore/include/sockios
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2008-10-13 10:36:27
|
Revision: 1973 http://complement.svn.sourceforge.net/complement/?rev=1973&view=rev Author: complement Date: 2008-10-13 10:36:17 +0000 (Mon, 13 Oct 2008) Log Message: ----------- sock_processor_base should wait for release from sock_mgr. sock_processor_base may be accessed from sock_mgr after its death, so reference counting was added; sock_processor_base will pass its dtor only after sock_mgr has lost its reference to it (i.e. all clients of sock_processor are detached and destroyed); the same reasoning applies to passing a pointer to sock_processor_base via the pipe to sock_mgr. sock_processor may inform sockmgr about termination via two channels: i) via the listener's descriptor (EPOLLRDHUP | EPOLLHUP | EPOLLERR, in sockmgr<charT,traits,_Alloc>::process_listener); ii) via a command on the pipe ('listener_on_exit'). Because i) may never happen (or may occur after ii)), removal of sock_processor from the sockmgr tables is done only when processing ii), and not when processing i). Until the issues above are cleaned up, ~sock_processor_base() does a condition wait with a timeout and prints a line if the timeout occurs (signalling an error); sock_processor_base::release() checks the counter and, if it becomes negative (i.e. an extra release() call), prints the call stack (this is an error, because of possible access to a destroyed object). 
Modified Paths: -------------- trunk/complement/explore/include/sockios/sockmgr.cc trunk/complement/explore/include/sockios/sockmgr.h trunk/complement/explore/include/sockios/socksrv.cc trunk/complement/explore/include/sockios/socksrv.h trunk/complement/explore/include/sockios/sockstream Modified: trunk/complement/explore/include/sockios/sockmgr.cc =================================================================== --- trunk/complement/explore/include/sockios/sockmgr.cc 2008-10-13 10:35:31 UTC (rev 1972) +++ trunk/complement/explore/include/sockios/sockmgr.cc 2008-10-13 10:36:17 UTC (rev 1973) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/07/01 14:40:03 yeti> +// -*- C++ -*- Time-stamp: <08/10/07 01:12:40 ptr> /* * Copyright (c) 2008 @@ -48,7 +48,7 @@ } else { typename fd_container_type::iterator ifd = descr.find( ev[i].data.fd ); if ( ifd == descr.end() ) { - // std::cerr << __FILE__ << ":" << __LINE__ << " " << ev[i].data.fd << std::endl; + std::cerr << __FILE__ << ":" << __LINE__ << " " << ev[i].data.fd << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_DEL, ev[i].data.fd, 0 ) < 0 ) { // throw system_error } @@ -91,17 +91,23 @@ switch ( _ctl.cmd ) { case listener: ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; +#if 1 + ev_add.data.u64 = 0ULL; +#endif ev_add.data.fd = static_cast<socks_processor_t*>(_ctl.data.ptr)->fd(); if ( ev_add.data.fd >= 0 ) { if ( fcntl( ev_add.data.fd, F_SETFL, fcntl( ev_add.data.fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { // std::cerr << "xxx " << errno << " " << std::tr2::getpid() << std::endl; + static_cast<socks_processor_t*>(_ctl.data.ptr)->release(); throw std::runtime_error( "can't establish nonblock mode on listener" ); } if ( descr.find( ev_add.data.fd ) != descr.end() ) { // reuse? 
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev_add.data.fd, &ev_add ) < 0 ) { std::cerr << __FILE__ << ":" << __LINE__ << std::endl; // descr.erase( ev_add.data.fd ); // throw system_error + static_cast<socks_processor_t*>(_ctl.data.ptr)->release(); return; } } else { @@ -109,6 +115,7 @@ if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { std::cerr << __FILE__ << ":" << __LINE__ << std::endl; // throw system_error + static_cast<socks_processor_t*>(_ctl.data.ptr)->release(); return; } } @@ -117,9 +124,13 @@ break; case tcp_buffer: ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; +#if 1 + ev_add.data.u64 = 0ULL; +#endif ev_add.data.fd = static_cast<sockbuf_t*>(_ctl.data.ptr)->fd(); if ( ev_add.data.fd >= 0 ) { if ( descr.find( ev_add.data.fd ) != descr.end() ) { // reuse? + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev_add.data.fd, &ev_add ) < 0 ) { std::cerr << __FILE__ << ":" << __LINE__ << std::endl; // descr.erase( ev_add.data.fd ); @@ -129,9 +140,8 @@ } else { // std::cerr << __FILE__ << ":" << __LINE__ << " " << ev_add.data.fd << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { - std::cerr << __FILE__ << ":" << __LINE__ << std::endl; - // throw system_error - return; + // std::cerr << __FILE__ << ":" << __LINE__ << " " << system_error( posix_error::make_error_code( static_cast<posix_error::posix_errno>(errno) ) ).what() << std::endl; + return; // already closed? 
} } descr[ev_add.data.fd] = fd_info( static_cast<sockbuf_t*>(_ctl.data.ptr) ); @@ -147,6 +157,7 @@ } // dump_descr(); } + static_cast<socks_processor_t*>(_ctl.data.ptr)->release(); break; case rqstop: // std::cerr << "Stop request\n"; @@ -162,8 +173,9 @@ if ( ev.events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR) ) { // std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { - // throw system_error - std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << " " << errno << std::endl; + // std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << " " << system_error( posix_error::make_error_code( static_cast<posix_error::posix_errno>(errno) ) ).what() << std::endl; + + // already closed? } if ( ifd->second.p != 0 ) { @@ -175,6 +187,7 @@ } } +#if 0 // do it once, after 'listener_on_exit' command via pipe listeners_final.insert(static_cast<void *>(ifd->second.p)); // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; @@ -188,6 +201,7 @@ if ( lfd != -1 ) { descr.erase( lfd ); } +#endif // dump_descr(); @@ -245,7 +259,11 @@ errno = 0; epoll_event xev; xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; +#if 1 + xev.data.u64 = 0ULL; +#endif xev.data.fd = ev.data.fd; + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) { std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << " " << errno << std::endl; @@ -262,6 +280,9 @@ try { epoll_event ev_add; ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; +#if 1 + ev_add.data.u64 = 0ULL; +#endif ev_add.data.fd = fd; if ( descr.find( fd ) != descr.end() ) { // reuse? 
@@ -338,6 +359,7 @@ xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP; xev.data.fd = ev.data.fd; info.flags |= fd_info::level_triggered; + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) { std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << " " << errno << std::endl; } @@ -367,7 +389,11 @@ { epoll_event xev; xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; +#if 1 + xev.data.u64 = 0ULL; +#endif xev.data.fd = ev.data.fd; + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) { std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << " " << errno << std::endl; } @@ -386,6 +412,7 @@ xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; xev.data.fd = ev.data.fd; info.flags &= ~static_cast<unsigned>(fd_info::level_triggered); + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) { std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << " " << errno << std::endl; } @@ -411,6 +438,7 @@ // std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl; if ( info.p != 0 ) { + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { // throw system_error std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << " " << errno << std::endl; @@ -471,6 +499,7 @@ { int myfd = -1; + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; if ( !listeners_final.empty() ) { // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; if ( listeners_final.find( static_cast<void*>(p) ) != listeners_final.end() ) { @@ -494,6 +523,7 @@ // if ( myfd != -1 ) { // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; p->stop(); + p->release(); // socks_processor_t* p not under sockmgr control more // } } } Modified: 
trunk/complement/explore/include/sockios/sockmgr.h =================================================================== --- trunk/complement/explore/include/sockios/sockmgr.h 2008-10-13 10:35:31 UTC (rev 1972) +++ trunk/complement/explore/include/sockios/sockmgr.h 2008-10-13 10:36:17 UTC (rev 1973) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/09/08 23:18:28 ptr> +// -*- C++ -*- Time-stamp: <08/10/07 01:11:58 ptr> /* * Copyright (c) 2008 @@ -49,6 +49,8 @@ #include <deque> #include <functional> +// #include <boost/shared_ptr.hpp> + namespace std { template <class charT, class traits, class _Alloc> class basic_sockbuf; @@ -128,7 +130,7 @@ { } unsigned flags; - sockbuf_t* b; + sockbuf_t* b; socks_processor_t* p; }; @@ -148,6 +150,7 @@ sockmgr( int hint = 1024, int ret = 512 ) : n_ret( ret ) { + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; efd = epoll_create( hint ); if ( efd < 0 ) { // throw system_error( errno ) @@ -162,10 +165,14 @@ epoll_event ev_add; ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP; +#if 1 + ev_add.data.u64 = 0ULL; +#endif ev_add.data.fd = pipefd[0]; epoll_ctl( efd, EPOLL_CTL_ADD, pipefd[0], &ev_add ); _worker = new std::tr2::thread( _loop, this ); + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; // ctl _ctl; // _ctl.cmd = rqstart; @@ -192,18 +199,22 @@ void push( socks_processor_t& p ) { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << p.use_count() << std::endl; ctl _ctl; _ctl.cmd = listener; _ctl.data.ptr = static_cast<void *>(&p); + p.addref(); int r = ::write( pipefd[1], &_ctl, sizeof(ctl) ); if ( r < 0 || r != sizeof(ctl) ) { + p.release(); throw std::runtime_error( "can't write to pipe" ); } } void push( sockbuf_t& s ) { + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; ctl _ctl; _ctl.cmd = tcp_buffer; _ctl.data.ptr = static_cast<void *>(&s); @@ -221,14 +232,17 @@ _ctl.cmd = listener_on_exit; _ctl.data.ptr = reinterpret_cast<void *>(&p); + p.addref(); int r = ::write( pipefd[1], &_ctl, 
sizeof(ctl) ); if ( r < 0 || r != sizeof(ctl) ) { + p.release(); throw std::runtime_error( "can't write to pipe" ); } } void exit_notify( sockbuf_t* b, sock_base::socket_type fd ) { + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; try { std::tr2::unique_lock<std::tr2::mutex> lk( dll, std::tr2::defer_lock ); Modified: trunk/complement/explore/include/sockios/socksrv.cc =================================================================== --- trunk/complement/explore/include/sockios/socksrv.cc 2008-10-13 10:35:31 UTC (rev 1972) +++ trunk/complement/explore/include/sockios/socksrv.cc 2008-10-13 10:36:17 UTC (rev 1973) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/07/09 11:09:53 ptr> +// -*- C++ -*- Time-stamp: <08/10/07 00:00:28 ptr> /* * Copyright (c) 2008 @@ -54,6 +54,7 @@ // so don't check return code from listen ::listen( basic_socket_t::_fd, SOMAXCONN ); basic_socket_t::mgr->push( *this ); + // std::cerr << __FILE__ << ":" << __LINE__ << " " << count() << std::endl; } } else if ( prot == sock_base::local ) { return; Modified: trunk/complement/explore/include/sockios/socksrv.h =================================================================== --- trunk/complement/explore/include/sockios/socksrv.h 2008-10-13 10:35:31 UTC (rev 1972) +++ trunk/complement/explore/include/sockios/socksrv.h 2008-10-13 10:36:17 UTC (rev 1973) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/07/09 01:02:12 ptr> +// -*- C++ -*- Time-stamp: <08/10/06 23:50:22 ptr> /* * Copyright (c) 2008 @@ -37,6 +37,10 @@ #include <functional> #include <exception> +// #include <boost/shared_ptr.hpp> + +#include <mt/callstack.h> + namespace std { template <class charT, class traits, class _Alloc> class basic_sockbuf; @@ -59,15 +63,51 @@ sock_processor_base() : _mode( ios_base::in | ios_base::out ), - _state( ios_base::goodbit ) + _state( ios_base::goodbit ), + _chk( *this ), + _rcount( 0 ) { } - explicit sock_processor_base( int port, sock_base::stype t = sock_base::sock_stream ) + explicit 
sock_processor_base( int port, sock_base::stype t = sock_base::sock_stream ) : + _chk( *this ), + _rcount( 0 ) { sock_processor_base::open( port, t, sock_base::inet ); } virtual ~sock_processor_base() - { sock_processor_base::_close(); } + { + sock_processor_base::_close(); + std::tr2::unique_lock<std::tr2::mutex> lk(_cnt_lck); + // _cnt_cnd.wait( lk, _chk ); + if ( !_cnt_cnd.timed_wait( lk, std::tr2::seconds(1), _chk ) ) { // <-- debug + std::cerr << __FILE__ << ":" << __LINE__ << " " << _rcount << std::endl; + } + } + + void addref() + { + std::tr2::lock_guard<std::tr2::mutex> lk(_cnt_lck); + ++_rcount; + } + + void release() + { + std::tr2::lock_guard<std::tr2::mutex> lk(_cnt_lck); + if ( --_rcount == 0 ) { + _cnt_cnd.notify_one(); + } + if ( _rcount < 0 ) { // <-- debug + xmt::callstack( std::cerr ); + } + } + + int count() + { + std::tr2::lock_guard<std::tr2::mutex> lk(_cnt_lck); + int tmp = _rcount; + return tmp; + } + void open( const in_addr& addr, int port, sock_base::stype type, sock_base::protocol prot ); void open( unsigned long addr, int port, sock_base::stype type, sock_base::protocol prot ) @@ -138,6 +178,28 @@ protected: std::tr2::mutex _fd_lck; + + class _cnt_checker + { + public: + _cnt_checker( sock_processor_base<charT,traits,_Alloc>& _p ) : + p( _p ) + { } + + private: + sock_processor_base<charT,traits,_Alloc>& p; + + public: + bool operator ()() const + { return p._rcount == 0; } + }; + + _cnt_checker _chk; + std::tr2::mutex _cnt_lck; + std::tr2::condition_variable _cnt_cnd; + int _rcount; + + friend class _cnt_checker; }; typedef sock_processor_base<char,char_traits<char>,allocator<char> > sock_basic_processor; Modified: trunk/complement/explore/include/sockios/sockstream =================================================================== --- trunk/complement/explore/include/sockios/sockstream 2008-10-13 10:35:31 UTC (rev 1972) +++ trunk/complement/explore/include/sockios/sockstream 2008-10-13 10:36:17 UTC (rev 1973) @@ -1,4 +1,4 @@ -// -*- 
C++ -*- Time-stamp: <08/10/01 00:27:31 ptr> +// -*- C++ -*- Time-stamp: <08/10/07 00:01:51 ptr> /* * Copyright (c) 1997-1999, 2002, 2003, 2005-2008 @@ -306,6 +306,7 @@ if ( direction ) { std::tr2::lock_guard<std::tr2::mutex> lk( _init_lock ); if ( _count++ == 0 ) { + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; basic_socket<charT,traits,_Alloc>::mgr = new std::detail::sockmgr<charT,traits,_Alloc>(); #ifdef __FIT_PTHREADS if ( !_at_fork ) { // call only once This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |