[complement-svn] SF.net SVN: complement: [1840] branches/complement-sockios/explore
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2008-04-02 07:23:17
|
Revision: 1840 http://complement.svn.sourceforge.net/complement/?rev=1840&view=rev Author: complement Date: 2008-04-02 00:23:05 -0700 (Wed, 02 Apr 2008) Log Message: ----------- simplify sock_processor_base; partially fix the missing destruction of Connect elements when the server dies; still under construction Modified Paths: -------------- branches/complement-sockios/explore/include/sockios/sp.h branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc Modified: branches/complement-sockios/explore/include/sockios/sp.h =================================================================== --- branches/complement-sockios/explore/include/sockios/sp.h 2008-04-01 14:44:59 UTC (rev 1839) +++ branches/complement-sockios/explore/include/sockios/sp.h 2008-04-02 07:23:05 UTC (rev 1840) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/04/01 18:40:57 yeti> +// -*- C++ -*- Time-stamp: <08/04/02 10:25:57 ptr> /* * Copyright (c) 2008 @@ -53,34 +53,54 @@ template <class charT, class traits, class _Alloc> class basic_sockstream2; template <class charT, class traits, class _Alloc> class sock_processor_base; -namespace detail { - -template<class charT, class traits, class _Alloc> -class _sock_processor_base : +template <class charT, class traits, class _Alloc> +class sock_processor_base : public sock_base2, public basic_socket<charT,traits,_Alloc> { private: typedef basic_socket<charT,traits,_Alloc> basic_socket_t; - protected: - _sock_processor_base() : + public: + typedef basic_sockstream2<charT,traits,_Alloc> sockstream_t; + + struct adopt_new_t { }; + struct adopt_close_t { }; + struct adopt_data_t { }; + + sock_processor_base() : _mode( ios_base::in | ios_base::out ), _state( ios_base::goodbit ) { } - virtual ~_sock_processor_base() + explicit sock_processor_base( int port, sock_base2::stype t = sock_base2::sock_stream ) + { sock_processor_base::open( port, t, sock_base2::inet ); } + + virtual ~sock_processor_base() + { sock_processor_base::close(); } + + void open( const 
in_addr& addr, int port, sock_base2::stype type, sock_base2::protocol prot ); + + void open( unsigned long addr, int port, sock_base2::stype type, sock_base2::protocol prot ) { - _sock_processor_base::close(); + in_addr _addr; + _addr.s_addr = htonl( addr ); + sock_processor_base::open( _addr, port, type, prot ); } - public: - void open( const in_addr& addr, int port, sock_base2::stype type, sock_base2::protocol prot ); - void open( unsigned long addr, int port, sock_base2::stype type, sock_base2::protocol prot ); - void open( int port, sock_base2::stype type, sock_base2::protocol prot ); + void open( int port, sock_base2::stype type, sock_base2::protocol prot ) + { sock_processor_base::open(INADDR_ANY, port, type, prot); } virtual void close(); + virtual void operator ()( sockstream_t& s, const adopt_new_t& ) = 0; + virtual void operator ()( sockstream_t& s, const adopt_close_t& ) = 0; + virtual void operator ()( sockstream_t& s, const adopt_data_t& ) = 0; + + private: + sock_processor_base( const sock_processor_base& ); + sock_processor_base& operator =( const sock_processor_base& ); + protected: void setoptions_unsafe( sock_base2::so_t optname, bool on_off = true, int __v = 0 ); @@ -101,20 +121,15 @@ } private: - _sock_processor_base( const _sock_processor_base& ); - _sock_processor_base& operator =( const _sock_processor_base& ); - - private: unsigned long _mode; // open mode unsigned long _state; // state flags protected: std::tr2::mutex _fd_lck; - // xmt::condition _loop_cnd; }; template<class charT, class traits, class _Alloc> -void _sock_processor_base<charT,traits,_Alloc>::open( const in_addr& addr, int port, sock_base2::stype type, sock_base2::protocol prot ) +void sock_processor_base<charT,traits,_Alloc>::open( const in_addr& addr, int port, sock_base2::stype type, sock_base2::protocol prot ) { std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); if ( basic_socket_t::is_open_unsafe() ) { @@ -156,7 +171,7 @@ // I am shure, this is socket of type SOCK_STREAM 
| SOCK_SEQPACKET, // so don't check return code from listen ::listen( basic_socket_t::_fd, SOMAXCONN ); - basic_socket_t::mgr->push( dynamic_cast<sock_processor_base<charT,traits,_Alloc>&>(*this) ); + basic_socket_t::mgr->push( *this ); } } else if ( prot == sock_base2::local ) { return; @@ -169,27 +184,13 @@ } template<class charT, class traits, class _Alloc> -void _sock_processor_base<charT,traits,_Alloc>::open( unsigned long addr, int port, sock_base2::stype type, sock_base2::protocol prot ) +void sock_processor_base<charT,traits,_Alloc>::close() { - in_addr _addr; - _addr.s_addr = htonl( addr ); - _sock_processor_base::open( _addr, port, type, prot ); -} - -template<class charT, class traits, class _Alloc> -void _sock_processor_base<charT,traits,_Alloc>::open( int port, sock_base2::stype type, sock_base2::protocol prot ) -{ - _sock_processor_base::open(INADDR_ANY, port, type, prot); -} - -template<class charT, class traits, class _Alloc> -void _sock_processor_base<charT,traits,_Alloc>::close() -{ std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); if ( !basic_socket_t::is_open_unsafe() ) { return; } - basic_socket<charT,traits,_Alloc>::mgr->pop( dynamic_cast<sock_processor_base<charT,traits,_Alloc>&>(*this), basic_socket_t::_fd ); + basic_socket<charT,traits,_Alloc>::mgr->pop( *this, basic_socket_t::_fd ); #ifdef WIN32 ::closesocket( basic_socket_t::_fd ); #else @@ -200,7 +201,7 @@ } template<class charT, class traits, class _Alloc> -void _sock_processor_base<charT,traits,_Alloc>::shutdown( sock_base2::shutdownflg dir ) +void sock_processor_base<charT,traits,_Alloc>::shutdown( sock_base2::shutdownflg dir ) { std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); if ( basic_socket_t::is_open_unsafe() ) { @@ -216,7 +217,7 @@ } template<class charT, class traits, class _Alloc> -void _sock_processor_base<charT,traits,_Alloc>::setoptions_unsafe( sock_base2::so_t optname, bool on_off, int __v ) +void sock_processor_base<charT,traits,_Alloc>::setoptions_unsafe( 
sock_base2::so_t optname, bool on_off, int __v ) { #ifdef __unix if ( basic_socket_t::is_open_unsafe() ) { @@ -242,36 +243,6 @@ #endif // __unix } -} // namespace detail - -template <class charT, class traits, class _Alloc> -class sock_processor_base : - public detail::_sock_processor_base<charT,traits,_Alloc> -{ - private: - typedef detail::_sock_processor_base<charT,traits,_Alloc> sp_base_t; - - public: - typedef basic_sockstream2<charT,traits,_Alloc> sockstream_t; - - struct adopt_new_t { }; - struct adopt_close_t { }; - struct adopt_data_t { }; - - sock_processor_base() - { } - - explicit sock_processor_base( int port, sock_base2::stype t = sock_base2::sock_stream ) - { - sp_base_t::open( port, t, sock_base2::inet ); - } - - - virtual void operator ()( sockstream_t& s, const adopt_new_t& ) = 0; - virtual void operator ()( sockstream_t& s, const adopt_close_t& ) = 0; - virtual void operator ()( sockstream_t& s, const adopt_data_t& ) = 0; -}; - typedef sock_processor_base<char,char_traits<char>,allocator<char> > sock_basic_processor; template <class Connect, class charT = char, class traits = std::char_traits<charT>, class _Alloc = std::allocator<charT>, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& ) = &Connect::connect > @@ -294,6 +265,7 @@ static void __at_fork_prepare(); static void __at_fork_child(); static void __at_fork_parent(); + static std::tr2::mutex _init_lock; static int _count; static bool _at_fork; }; @@ -320,6 +292,18 @@ if ( ploop.joinable() ) { ploop.join(); } + // { + // std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); + for ( typename worker_pool_t::iterator i = worker_pool.begin(); i != worker_pool.end(); ++i ) { + delete i->second; + } + worker_pool.clear(); + // } + for ( typename ready_pool_t::iterator j = ready_pool.begin(); j != ready_pool.end(); ++j ) { + delete j->c; + } + ready_pool.clear(); + ((Init *)Init_buf)->~Init(); } @@ -428,10 +412,11 @@ bool connect_processor<Connect, charT, traits, _Alloc, 
C>::Init::_at_fork = false; template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> +std::tr2::mutex connect_processor<Connect, charT, traits, _Alloc, C>::Init::_init_lock; + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> void connect_processor<Connect, charT, traits, _Alloc, C>::Init::_guard( int direction ) { - static std::tr2::mutex _init_lock; - if ( direction ) { std::tr2::lock_guard<std::tr2::mutex> lk( _init_lock ); if ( _count++ == 0 ) { @@ -455,22 +440,23 @@ template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> void connect_processor<Connect, charT, traits, _Alloc, C>::Init::__at_fork_prepare() -{ } +{ _init_lock.lock(); } template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> void connect_processor<Connect, charT, traits, _Alloc, C>::Init::__at_fork_child() { - std::cerr << "SHOULD NEVER HAPPEN!!!!\n"; + _init_lock.unlock(); if ( _count != 0 ) { - + // std::cerr << "SHOULD NEVER HAPPEN!!!!\n"; + throw std::logic_error( "Fork while connect_processor working may has unexpected behaviour in child process" ); } // _sock_processor_base::_idx = std::tr2::this_thread::xalloc(); } template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> void connect_processor<Connect, charT, traits, _Alloc, C>::Init::__at_fork_parent() -{ } +{ _init_lock.unlock(); } template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream2<charT,traits,_Alloc>& )> char connect_processor<Connect, charT, traits, _Alloc, C>::Init_buf[128]; @@ -586,8 +572,9 @@ processor p; while ( pop_ready( p ) ) { - // std::cerr << "worker 1\n"; + std::cerr << "worker 1\n"; 
(p.c->*C)( *p.s ); + std::cerr << "worker 2\n"; if ( p.s->rdbuf()->in_avail() ) { std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); ready_pool.push_back( p ); @@ -595,6 +582,7 @@ std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); worker_pool[p.s] = p.c; } + std::cerr << "worker 3\n"; } } @@ -875,7 +863,7 @@ epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ); // walk through descr and detach every .p ? descr.erase( ifd ); - // std::cerr << "Remove listener EPOLLRDHUP\n"; + std::cerr << "Remove listener EPOLLRDHUP\n"; } else if ( ev[i].events & EPOLLIN ) { sockaddr addr; socklen_t sz = sizeof( sockaddr_in ); @@ -891,6 +879,24 @@ // std::cerr << "Accept, listener " << ev[i].data.fd << ", errno " << errno << std::endl; // throw system_error ? } +#if 0 + { + std::tr2::lock_guard<std::tr2::mutex> lck( cll ); + typename fd_container_type::iterator closed_ifd = closed_queue.find( ev[i].data.fd ); + if ( closed_ifd != closed_queue.end() ) { + typename fd_container_type::iterator ifd = descr.begin(); + for ( ; ifd != descr.end(); ) { + if ( ifd->second.p == closed_ifd->second.p ) { + descr.erase( ifd++ ); + } else { + ++ifd + } + } + closed_queue.erase( closed_ifd ); + } + } + +#endif break; } // std::cerr << "listener accept " << fd << std::endl; @@ -923,6 +929,8 @@ } if ( closed_ifd == closed_queue.end() ) { (*info.p)( *s, typename socks_processor_t::adopt_new_t() ); + } else { + std::cerr << "@@@ 1\n" << std::endl; } } else { std::cerr << "Accept, delete " << fd << std::endl; @@ -985,6 +993,8 @@ } if ( closed_ifd == closed_queue.end() ) { (*info.p)( *info.s.s, typename socks_processor_t::adopt_data_t() ); + } else { + std::cerr << "@@@ 2\n" << std::endl; } } break; @@ -1041,6 +1051,8 @@ } if ( closed_ifd == closed_queue.end() ) { (*info.p)( *info.s.s, typename socks_processor_t::adopt_data_t() ); + } else { + std::cerr << "@@@ 3\n" << std::endl; } } } else { @@ -1068,6 +1080,8 @@ } if ( closed_ifd == closed_queue.end() ) { (*info.p)( *info.s.s, typename 
socks_processor_t::adopt_close_t() ); + } else { + std::cerr << "@@@ 4\n" << std::endl; } } if ( (info.flags & fd_info::owner) != 0 ) { Modified: branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc =================================================================== --- branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc 2008-04-01 14:44:59 UTC (rev 1839) +++ branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc 2008-04-02 07:23:05 UTC (rev 1840) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/03/27 07:22:24 ptr> +// -*- C++ -*- Time-stamp: <08/04/02 01:48:59 ptr> /* * @@ -42,6 +42,9 @@ sock_basic_processor( port, t ) { } + ~simple_mgr() + { cerr << "In destructor\n"; } + protected: virtual void operator ()( sockstream_t& s, const adopt_new_t& ) { lock_guard<mutex> lk(lock); b.wait(); ++n_cnt; } @@ -301,7 +304,7 @@ // } unique_lock<mutex> lk( worker::lock ); - worker::cnd.timed_wait( lk, milliseconds( 100 ), visits_counter2 ); + worker::cnd.timed_wait( lk, milliseconds( 500 ), visits_counter2 ); EXAM_CHECK( worker::visits == 2 ); worker::visits = 0; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |