[complement-svn] SF.net SVN: complement: [1913] branches/complement-sockios/explore/include/sockios
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2008-06-26 05:45:16
|
Revision: 1913 http://complement.svn.sourceforge.net/complement/?rev=1913&view=rev Author: complement Date: 2008-06-25 22:45:14 -0700 (Wed, 25 Jun 2008) Log Message: ----------- Try to resolve problem: I don't see event on closed listener's descriptor Modified Paths: -------------- branches/complement-sockios/explore/include/sockios/socksrv.cc branches/complement-sockios/explore/include/sockios/socksrv.h branches/complement-sockios/explore/include/sockios/sp.cc branches/complement-sockios/explore/include/sockios/sp.h Modified: branches/complement-sockios/explore/include/sockios/socksrv.cc =================================================================== --- branches/complement-sockios/explore/include/sockios/socksrv.cc 2008-06-26 05:44:53 UTC (rev 1912) +++ branches/complement-sockios/explore/include/sockios/socksrv.cc 2008-06-26 05:45:14 UTC (rev 1913) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/06/17 14:54:16 yeti> +// -*- C++ -*- Time-stamp: <08/06/18 22:25:39 yeti> /* * Copyright (c) 2008 @@ -72,19 +72,23 @@ if ( !basic_socket_t::is_open_unsafe() ) { return; } - basic_socket<charT,traits,_Alloc>::mgr->pop( *this, basic_socket_t::_fd ); + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + #ifdef WIN32 ::closesocket( basic_socket_t::_fd ); #else ::shutdown( basic_socket_t::_fd, 2 ); ::close( basic_socket_t::_fd ); #endif + basic_socket<charT,traits,_Alloc>::mgr->pop( *this, basic_socket_t::_fd ); basic_socket_t::_fd = -1; } template<class charT, class traits, class _Alloc> void sock_processor_base<charT,traits,_Alloc>::shutdown( sock_base::shutdownflg dir ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); if ( basic_socket_t::is_open_unsafe() ) { if ( (dir & (sock_base::stop_in | sock_base::stop_out)) == @@ -185,8 +189,10 @@ template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> void connect_processor<Connect, 
charT, traits, _Alloc, C>::close() { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; base_t::close(); +#if 0 { std::tr2::lock_guard<std::tr2::mutex> lk(inwlock); _in_work = false; // <--- set before cnd.notify_one(); (below in this func) @@ -198,6 +204,7 @@ // std::cerr << "=== " << ready_pool.size() << std::endl; cnd.notify_one(); } +#endif } template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> @@ -286,9 +293,11 @@ p = ready_pool.front(); // it may contain p.c == 0, p.s == 0, if !in_work() ready_pool.pop_front(); // std::cerr << "pop 1\n"; +#if 0 if ( p.c == 0 ) { // wake up, but _in_work may be still true here (in processor pipe?), return false; // even I know that _in_work <- false before notification... } // so, check twice +#endif } // std::cerr << "pop 2\n"; @@ -319,4 +328,18 @@ } } + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::stop() +{ + std::tr2::lock_guard<std::tr2::mutex> lk(inwlock); + _in_work = false; // <--- set before cnd.notify_one(); (below in this func) + + std::tr2::lock_guard<std::tr2::mutex> lk2( rdlock ); + ready_pool.push_back( processor() ); // make ready_pool not empty + // std::cerr << "=== " << ready_pool.size() << std::endl; + cnd.notify_one(); + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; +} + } // namespace std Modified: branches/complement-sockios/explore/include/sockios/socksrv.h =================================================================== --- branches/complement-sockios/explore/include/sockios/socksrv.h 2008-06-26 05:44:53 UTC (rev 1912) +++ branches/complement-sockios/explore/include/sockios/socksrv.h 2008-06-26 05:45:14 UTC (rev 1913) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/06/17 15:44:44 yeti> +// -*- C++ -*- Time-stamp: <08/06/18 22:25:25 yeti> /* * Copyright (c) 2008 
@@ -86,6 +86,14 @@ { sock_processor_base::open(INADDR_ANY, port, type, prot); } virtual void close(); +#if 0 + virtual void stop() = 0; +#else + virtual void stop() + { /* abort(); */ } + // void stop() + // { (this->*_real_stop)(); } +#endif #if 0 virtual sockbuf_t* operator ()( sock_base::socket_type fd, const sockaddr& ) = 0; @@ -99,7 +107,6 @@ virtual void operator ()( sock_base::socket_type fd ) { abort(); } #endif - private: sock_processor_base( const sock_processor_base& ); sock_processor_base& operator =( const sock_processor_base& ); @@ -115,6 +122,8 @@ return s; } + void (sock_processor_base::*_real_stop)(); + public: bool is_open() const { std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); return basic_socket_t::is_open_unsafe(); } @@ -173,14 +182,14 @@ not_empty( *this ), _in_work( false ), ploop( loop, this ) - { new( Init_buf ) Init(); } + { new( Init_buf ) Init(); /* base_t::_real_stop = &connect_processor::_xstop; */ } explicit connect_processor( int port ) : base_t( port, sock_base::sock_stream ), not_empty( *this ), _in_work( false ), ploop( loop, this ) - { new( Init_buf ) Init(); } + { new( Init_buf ) Init(); /* base_t::_real_stop = &connect_processor::_xstop; */ } virtual ~connect_processor() { @@ -190,9 +199,24 @@ ploop.join(); } - basic_socket<charT,traits,_Alloc>::mgr->final( *this ); + // basic_socket<charT,traits,_Alloc>::mgr->final( *this ); +#if 0 { + std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); + std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); + if ( worker_pool.empty() && ready_pool.empty() ) { + break; + } + + for ( ; ; ) { + + } + } +#endif + + + { std::tr2::lock_guard<std::tr2::mutex> lk2( rdlock ); cerr << __FILE__ << ":" << __LINE__ << " " << ready_pool.size() << endl; } @@ -200,16 +224,19 @@ { std::tr2::lock_guard<std::tr2::mutex> lk2( wklock ); cerr << __FILE__ << ":" << __LINE__ << " " << worker_pool.size() << endl; +#if 0 for ( typename worker_pool_t::iterator i = worker_pool.begin(); i != worker_pool.end(); ++i ) { 
delete i->second.c; delete i->second.s; } +#endif } ((Init *)Init_buf)->~Init(); } virtual void close(); + virtual void stop(); void wait() { if ( ploop.joinable() ) { ploop.join(); } } Modified: branches/complement-sockios/explore/include/sockios/sp.cc =================================================================== --- branches/complement-sockios/explore/include/sockios/sp.cc 2008-06-26 05:44:53 UTC (rev 1912) +++ branches/complement-sockios/explore/include/sockios/sp.cc 2008-06-26 05:45:14 UTC (rev 1913) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/06/17 17:41:00 yeti> +// -*- C++ -*- Time-stamp: <08/06/18 22:37:15 yeti> /* * Copyright (c) 2008 @@ -45,6 +45,19 @@ for ( int i = 0; i < n; ++i ) { // std::cerr << "epoll i = " << i << std::endl; + std::tr2::lock_guard<std::tr2::mutex> lck( cll ); + for ( typename fd_container_type::iterator closed_ifd = closed_queue.begin(); closed_ifd != closed_queue.end(); ++closed_ifd ) { + if ( epoll_ctl( efd, EPOLL_CTL_DEL, closed_ifd->first, 0 ) < 0 ) { + // ignore + } + if ( closed_ifd->first == ev[i].data.fd ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + } + // descr.erase( closed_ifd->first ); + } + closed_queue.clear(); + // at this point closed queue empty + if ( ev[i].data.fd == pipefd[0] ) { // std::cerr << "on pipe\n"; cmd_from_pipe(); @@ -118,25 +131,22 @@ if ( ev_add.data.fd >= 0 ) { fd_info new_info = { 0, static_cast<sockbuf_t*>(_ctl.data.ptr), 0 }; descr[ev_add.data.fd] = new_info; - - std::tr2::lock_guard<std::tr2::mutex> lck( cll ); - - typename fd_container_type::iterator closed_ifd = closed_queue.find( ev_add.data.fd ); - if ( closed_ifd != closed_queue.end() ) { // reuse same fd? 
- closed_queue.erase( closed_ifd ); - if ( epoll_ctl( efd, EPOLL_CTL_DEL, ev_add.data.fd, 0 ) < 0 ) { - // throw system_error - std::cerr << __FILE__ << ":" << __LINE__ << std::endl; - } - // descr.erase( ifd ); - } - if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { descr.erase( ev_add.data.fd ); // throw system_error } } break; + case listener_on_exit: + listeners_final.insert( _ctl.data.ptr ); + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + { + int lfd = check_closed_listener( reinterpret_cast<socks_processor_t*>(_ctl.data.ptr) ); + if ( lfd != -1 ) { + descr.erase( -1 ); + } + } + break; case rqstop: // std::cerr << "Stop request\n"; throw std::detail::stop_request(); // runtime_error( "Stop request (normal flow)" ); @@ -147,6 +157,7 @@ template<class charT, class traits, class _Alloc> void sockmgr<charT,traits,_Alloc>::process_listener( epoll_event& ev, typename sockmgr<charT,traits,_Alloc>::fd_container_type::iterator ifd ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; if ( ev.events & EPOLLRDHUP ) { if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { // throw system_error @@ -161,36 +172,21 @@ } } + listeners_final.insert(static_cast<void *>(ifd->second.p)); + + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + check_closed_listener( ifd->second.p ); + descr.erase( ifd ); - std::tr2::lock_guard<std::tr2::mutex> lck( cll ); - typename fd_container_type::iterator closed_ifd = closed_queue.find( ev.data.fd ); - if ( closed_ifd != closed_queue.end() && closed_ifd->second.p == ifd->second.p ) { - // listener in process of close - closed_queue.erase( closed_ifd ); - } - return; } if ( (ev.events & EPOLLIN) == 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; return; // I don't know what to do this case... 
} - // { - std::tr2::lock_guard<std::tr2::mutex> lck( cll ); - for ( typename fd_container_type::iterator closed_ifd = closed_queue.begin(); closed_ifd != closed_queue.end(); ++closed_ifd ) { - if ( epoll_ctl( efd, EPOLL_CTL_DEL, closed_ifd->first, 0 ) < 0 ) { - // ignore - } - if ( closed_ifd->first == ifd->first ) { - std::cerr << __FILE__ << ":" << __LINE__ << std::endl; - } - descr.erase( closed_ifd->first ); - } - closed_queue.clear(); - // at this point closed queue empty - sockaddr addr; socklen_t sz = sizeof( sockaddr_in ); @@ -223,16 +219,14 @@ } } + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + + listeners_final.insert(static_cast<void *>(ifd->second.p)); + + check_closed_listener( ifd->second.p ); + descr.erase( ifd ); - // check closed_queue, due to ifd->second.p->close(); add record in it -// std::tr2::lock_guard<std::tr2::mutex> lck( cll ); - typename fd_container_type::iterator closed_ifd = closed_queue.find( ev.data.fd ); - if ( closed_ifd != closed_queue.end() && closed_ifd->second.p == ifd->second.p ) { - // listener in process of close - closed_queue.erase( closed_ifd ); - } - // throw system_error ? std::cerr << __FILE__ << ":" << __LINE__ << std::endl; } else { // back to listen errno = 0; @@ -287,20 +281,6 @@ fd_info& info = ifd->second; - std::tr2::lock_guard<std::tr2::mutex> lck( cll ); - - for ( typename fd_container_type::iterator closed_ifd = closed_queue.begin(); closed_ifd != closed_queue.end(); ++closed_ifd ) { - if ( epoll_ctl( efd, EPOLL_CTL_DEL, closed_ifd->first, 0 ) < 0 ) { - // ignore - } - if ( closed_ifd->first == ifd->first ) { - std::cerr << __FILE__ << ":" << __LINE__ << std::endl; - } - descr.erase( closed_ifd->first ); - } - closed_queue.clear(); - // at this point closed queue empty - sockbuf_t* b = info.b; if ( b == 0 ) { // marginal case: sockbuf wasn't created by processor... if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { @@ -308,6 +288,8 @@ } if ( info.p != 0 ) { // ... 
but controlled by processor (*info.p)( ifd->first, typename socks_processor_t::adopt_close_t() ); + + check_closed_listener( info.p ); } descr.erase( ifd ); std::cerr << __FILE__ << ":" << __LINE__ << std::endl; @@ -390,7 +372,7 @@ (*info.p)( ev.data.fd ); } } else { - std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << std::endl; // std::cerr << "K " << ev.data.fd << ", " << errno << std::endl; // EPOLLRDHUP may be missed in kernel, but offset 0 is the same ev.events |= EPOLLRDHUP; // will be processed below @@ -410,11 +392,12 @@ if ( info.p != 0 ) { std::cerr << __FILE__ << ":" << __LINE__ << std::endl; (*info.p)( ifd->first, typename socks_processor_t::adopt_close_t() ); + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + check_closed_listener( info.p ); } else { std::cerr << __FILE__ << ":" << __LINE__ << std::endl; b->close(); } - // std::tr2::lock_guard<std::tr2::mutex> lck( cll ); closed_queue.erase( ev.data.fd ); descr.erase( ifd ); } @@ -457,7 +440,6 @@ ++ifd; } } -#endif std::tr2::lock_guard<std::tr2::mutex> lk( cll ); @@ -469,8 +451,42 @@ ++closed_ifd; } } +#endif } +template<class charT, class traits, class _Alloc> +int sockmgr<charT,traits,_Alloc>::check_closed_listener( socks_processor_t* p ) +{ + int myfd = -1; + + if ( !listeners_final.empty() ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + if ( listeners_final.find( static_cast<void*>(p) ) != listeners_final.end() ) { + for ( typename fd_container_type::iterator i = descr.begin(); i != descr.end(); ++i ) { + if ( i->second.p == p ) { + if ( (i->second.flags & fd_info::listener) == 0 ) { // it's not me! 
+ std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + return -1; + } + myfd = i->first; + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + } + } + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + + // no more connection with this listener + listeners_final.erase( static_cast<void*>(p) ); + + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + + p->stop(); + } + } + + return myfd; +} + + } // namespace detail } // namespace std Modified: branches/complement-sockios/explore/include/sockios/sp.h =================================================================== --- branches/complement-sockios/explore/include/sockios/sp.h 2008-06-26 05:44:53 UTC (rev 1912) +++ branches/complement-sockios/explore/include/sockios/sp.h 2008-06-26 05:45:14 UTC (rev 1913) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/06/17 15:09:45 yeti> +// -*- C++ -*- Time-stamp: <08/06/18 22:36:24 yeti> /* * Copyright (c) 2008 @@ -80,7 +80,8 @@ listener, tcp_buffer, rqstop, - rqstart + rqstart, + listener_on_exit }; struct fd_info @@ -180,9 +181,18 @@ void pop( socks_processor_t& p, sock_base::socket_type _fd ) { - fd_info info = { fd_info::listener, 0, &p }; - std::tr2::lock_guard<std::tr2::mutex> lk( cll ); - closed_queue[_fd] = info; + ctl _ctl; + _ctl.cmd = listener_on_exit; + _ctl.data.ptr = reinterpret_cast<void *>(&p); + + int r = ::write( pipefd[1], &_ctl, sizeof(ctl) ); + if ( r < 0 || r != sizeof(ctl) ) { + throw std::runtime_error( "can't write to pipe" ); + } + +// fd_info info = { fd_info::listener, 0, &p }; +// std::tr2::lock_guard<std::tr2::mutex> lk( cll ); +// closed_queue[_fd] = info; } void final( socks_processor_t& p ); @@ -200,15 +210,19 @@ sockmgr& operator =( const sockmgr& ) { return *this; } + int check_closed_listener( socks_processor_t* p ); #ifdef __USE_STLPORT_HASH typedef std::hash_map<sock_base::socket_type,fd_info> fd_container_type; + typedef std::hash_set<void *> listener_container_type; #endif #ifdef __USE_STD_HASH typedef 
__gnu_cxx::hash_map<sock_base::socket_type, fd_info> fd_container_type; + typedef __gnu_cxx::hash_set<void *> listener_container_type; #endif #if defined(__USE_STLPORT_TR1) || defined(__USE_STD_TR1) typedef std::tr1::unordered_map<sock_base::socket_type, fd_info> fd_container_type; + typedef std::tr1::unordered_set<void *> listener_container_type; #endif void io_worker(); @@ -223,6 +237,7 @@ fd_container_type descr; fd_container_type closed_queue; + listener_container_type listeners_final; std::tr2::mutex cll; std::tr2::mutex dll; }; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |