[complement-svn] SF.net SVN: complement: [1924] trunk/complement/explore
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2008-06-27 11:20:51
|
Revision: 1924 http://complement.svn.sourceforge.net/complement/?rev=1924&view=rev Author: complement Date: 2008-06-27 04:20:41 -0700 (Fri, 27 Jun 2008) Log Message: ----------- Merge branch 'master' of /export/hostel/pub/scm/complement Modified Paths: -------------- trunk/complement/explore/include/sockios/sockmgr.cc trunk/complement/explore/include/sockios/sockmgr.h trunk/complement/explore/include/sockios/sockstream trunk/complement/explore/include/sockios/sockstream.cc trunk/complement/explore/include/stem/Cron.h trunk/complement/explore/include/stem/EvManager.h trunk/complement/explore/include/stem/EventHandler.h trunk/complement/explore/include/stem/NetTransport.h trunk/complement/explore/lib/sockios/ChangeLog trunk/complement/explore/lib/sockios/Makefile.inc trunk/complement/explore/lib/sockios/_sockmgr.cc trunk/complement/explore/lib/sockios/ut/Makefile.inc trunk/complement/explore/lib/sockios/ut/names.cc trunk/complement/explore/lib/sockios/ut/sockios_test.cc trunk/complement/explore/lib/sockios/ut/sockios_test.h trunk/complement/explore/lib/sockios/ut/sockios_test_suite.cc trunk/complement/explore/lib/stem/ChangeLog trunk/complement/explore/lib/stem/Cron.cc trunk/complement/explore/lib/stem/EvManager.cc trunk/complement/explore/lib/stem/Makefile.inc trunk/complement/explore/lib/stem/NetTransport.cc trunk/complement/explore/lib/stem/_EventHandler.cc trunk/complement/explore/lib/stem/ut/unit_test.cc Added Paths: ----------- trunk/complement/explore/include/sockios/socksrv.cc trunk/complement/explore/include/sockios/socksrv.h trunk/complement/explore/lib/sockios/ut/sockios2_test.cc trunk/complement/explore/lib/sockios/ut/sockios2_test.h Modified: trunk/complement/explore/include/sockios/sockmgr.cc =================================================================== --- trunk/complement/explore/include/sockios/sockmgr.cc 2008-06-27 10:28:54 UTC (rev 1923) +++ trunk/complement/explore/include/sockios/sockmgr.cc 2008-06-27 11:20:41 UTC (rev 1924) @@ -1,595 +1,515 @@ -// -*- C++ -*- Time-stamp: <07/09/19 11:43:21 ptr> +// -*- C++ -*- Time-stamp: <08/06/26 09:10:47 ptr> /* - * Copyright (c) 1997-1999, 2002, 2003, 2005-2007 + * Copyright (c) 2008 * Petr Ovtchenkov * - * Portion Copyright (c) 1999-2001 - * Parallel Graphics Ltd. - * * Licensed under the Academic Free License Version 3.0 * */ -#include <algorithm> -#include <functional> - -// #ifdef __unix -// extern "C" int nanosleep(const struct timespec *, struct timespec *); -// #endif - -#ifdef STLPORT -_STLP_BEGIN_NAMESPACE -#else namespace std { -#endif -#ifndef __FIT_NO_POLL +namespace detail { -template <class Connect, void (Connect::*C)( std::sockstream& ), void (Connect::*T)() > -void sockmgr_stream_MP<Connect,C,T>::_open( sock_base::stype t ) +template<class charT, class traits, class _Alloc> +void sockmgr<charT,traits,_Alloc>::io_worker() { - xmt::scoped_lock lk(_fd_lck); - if ( is_open_unsafe() ) { - if ( t == sock_base::sock_stream ) { - _accept = &_Self_type::accept_tcp; - _pfd.reserve( 32 ); - if ( _pfd.size() == 0 ) { - int pipefd[2]; - pipe( pipefd ); // check err - _cfd = pipefd[1]; + epoll_event ev[/*n_ret*/ 512 ]; - _pfd.resize( 2 ); - _pfd[0].fd = fd_unsafe(); - _pfd[0].events = POLLIN; - _pfd[1].fd = pipefd[0]; - _pfd[1].events = POLLIN; - } - } else if ( t == sock_base::sock_dgram ) { - _accept = &_Self_type::accept_udp; - if ( _pfd.size() == 0 ) { - int pipefd[2]; - pipe( pipefd ); // check err - _cfd = pipefd[1]; +/* + ctl _xctl; + int r = read( pipefd[0], &_xctl, sizeof(ctl) ); - _pfd.resize( 2 ); - _pfd[0].fd = fd_unsafe(); - _pfd[0].events = POLLIN; - _pfd[1].fd = pipefd[0]; - _pfd[1].events = POLLIN; - } - } else { - throw invalid_argument( "sockmgr_stream_MP" ); - } - - _loop_cnd.set( false ); - loop_thr.launch( loop, this /* , 0, PTHREAD_STACK_MIN * 2 */ ); - _loop_cnd.try_wait(); + if ( _xctl.cmd == rqstart ) { + std::cerr << "io_worker fine" << std::endl; + } else { + std::cerr << "io_worker not fine, " << r << ", " << errno << std::endl; } -} +*/ -template <class Connect, void (Connect::*C)( std::sockstream& ), void (Connect::*T)() > -void sockmgr_stream_MP<Connect,C,T>::open( const in_addr& addr, int port, sock_base::stype t ) -{ - basic_sockmgr::open( addr, port, t, sock_base::inet ); - sockmgr_stream_MP<Connect,C,T>::_open( t ); -} + try { + for ( ; ; ) { + int n = epoll_wait( efd, &ev[0], /* n_ret */ 512, -1 ); -template <class Connect, void (Connect::*C)( std::sockstream& ), void (Connect::*T)() > -void sockmgr_stream_MP<Connect,C,T>::open( unsigned long addr, int port, sock_base::stype t ) -{ - basic_sockmgr::open( addr, port, t, sock_base::inet ); - sockmgr_stream_MP<Connect,C,T>::_open( t ); -} + if ( n < 0 ) { + if ( errno == EINTR ) { + errno = 0; + continue; + } + // throw system_error + } -template <class Connect, void (Connect::*C)( std::sockstream& ), void (Connect::*T)() > -void sockmgr_stream_MP<Connect,C,T>::open( int port, sock_base::stype t ) -{ - basic_sockmgr::open( port, t, sock_base::inet ); - sockmgr_stream_MP<Connect,C,T>::_open( t ); -} + std::tr2::lock_guard<std::tr2::mutex> lk( dll ); -template <class Connect, void (Connect::*C)( std::sockstream& ), void (Connect::*T)() > -bool sockmgr_stream_MP<Connect,C,T>::_shift_fd() -{ - bool ret = false; - typename iterator_traits<typename _fd_sequence::iterator>::difference_type d; + for ( int i = 0; i < n; ++i ) { + if ( ev[i].data.fd == pipefd[0] ) { + cmd_from_pipe(); + } else { + typename fd_container_type::iterator ifd = descr.find( ev[i].data.fd ); + if ( ifd == descr.end() ) { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << ev[i].data.fd << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_DEL, ev[i].data.fd, 0 ) < 0 ) { + // throw system_error + } + continue; + // throw std::logic_error( "file descriptor in epoll, but not in descr[]" ); + } - for ( _fd_sequence::iterator j = _pfd.begin() + 2; j != _pfd.end(); ++j ) { // _pfd at least 2 in size - // if ( j->fd == -1 ) { - // cerr << __FILE__ << ":" << __LINE__ << endl; - // } - if ( j->revents != 0 ) { - xmt::scoped_lock _l( _c_lock ); - // We should distinguish closed socket from income message - typename container_type::iterator i = - find_if( _M_c.begin(), _M_c.end(), bind2nd( _M_comp, j->fd ) ); - // Solaris return ERROR on poll, before close socket - if ( i == _M_c.end() ) { - // Socket already closed (may be after read/write failure) - // this way may not notify poll (like in HP-UX 11.00) via POLLERR flag - // as made in Solaris - d = j - _pfd.begin(); - _pfd.erase( j ); - j = _pfd.begin() + (d - 1); - i = _M_c.begin(); - while ( (i = find_if( i, _M_c.end(), bind2nd( _M_comp, -1 ) )) != _M_c.end() ) { - _dlock.lock(); - _conn_pool.erase( std::remove( _conn_pool.begin(), _conn_pool.end(), i ), _conn_pool.end() ); - _dlock.unlock(); - _M_c.erase( i++ ); + fd_info& info = ifd->second; + if ( info.flags & fd_info::listener ) { + process_listener( ev[i], ifd ); + } else { + process_regular( ev[i], ifd ); + } } - } else if ( j->revents & POLLERR /* | POLLHUP | POLLNVAL */ ) { - // poll first see closed socket - d = j - _pfd.begin(); - _pfd.erase( j ); - j = _pfd.begin() + (d - 1); - _dlock.lock(); - _conn_pool.erase( std::remove( _conn_pool.begin(), _conn_pool.end(), i ), _conn_pool.end() ); - _dlock.unlock(); - _M_c.erase( i ); - } else { - // Check that other side close socket: - // on Linux and (?) Solaris I see normal POLLIN event, and see error - // only after attempt to read something. - // Due to this fd isn't stream (it's upper than stream), - // I can't use ioctl with I_PEEK command here. - char x; - int nr = recv( j->fd, reinterpret_cast<void *>(&x), 1, MSG_PEEK ); - if ( nr <= 0 ) { // I can't read even one byte: this designate closed - // socket operation - d = j - _pfd.begin(); - _pfd.erase( j ); - j = _pfd.begin() + (d - 1); - _dlock.lock(); - _conn_pool.erase( std::remove( _conn_pool.begin(), _conn_pool.end(), i ), _conn_pool.end() ); - _dlock.unlock(); - _M_c.erase( i ); - } else { // normal data available for reading - _dlock.lock(); - _conn_pool.push_back( i ); - // xmt::Thread::gettime( &_tpush ); - _pool_cnd.set( true ); - _observer_cnd.set( true ); - _dlock.unlock(); - - /* erase: I don't want to listen this socket - * (it may be polled elsewhere, during connection - * processing). - * This help to avoid unwanted processing of the same socket - * in different threads: the socket in process can't - * come here before it will be re-added after processing - */ - d = j - _pfd.begin(); - _pfd.erase( j ); - j = _pfd.begin() + (d - 1); - - ret = true; // mark that I add somthing in connection queue - } } } } - - return ret; + catch ( std::detail::stop_request& ) { + // this is possible, normal flow of operation + } + catch ( std::exception& e ) { + std::cerr << e.what() << std::endl; + } } -template <class Connect, void (Connect::*C)( std::sockstream& ), void (Connect::*T)() > -bool sockmgr_stream_MP<Connect,C,T>::accept_tcp() +template<class charT, class traits, class _Alloc> +void sockmgr<charT,traits,_Alloc>::cmd_from_pipe() { - if ( !is_open() ) { - return false; + epoll_event ev_add; + ctl _ctl; + + int r = read( pipefd[0], &_ctl, sizeof(ctl) ); + if ( r < 0 ) { + // throw system_error + throw std::detail::stop_request(); // runtime_error( "Stop request (normal flow)" ); + } else if ( r == 0 ) { + throw runtime_error( "Read pipe return 0" ); } - _xsockaddr addr; - socklen_t sz = sizeof( sockaddr_in ); - bool _in_buf; - - do { - _in_buf = false; - _pfd[0].revents = 0; - _pfd[1].revents = 0; - while ( poll( &_pfd[0], _pfd.size(), -1 ) < 0 ) { // wait infinite - if ( errno == EINTR ) { // may be interrupted, check and ignore - errno = 0; - continue; + switch ( _ctl.cmd ) { + case listener: + ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + ev_add.data.fd = static_cast<socks_processor_t*>(_ctl.data.ptr)->fd(); + if ( ev_add.data.fd >= 0 ) { + if ( fcntl( ev_add.data.fd, F_SETFL, fcntl( ev_add.data.fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { + // std::cerr << "xxx " << errno << " " << std::tr2::getpid() << std::endl; + throw std::runtime_error( "can't establish nonblock mode on listener" ); + } + if ( descr.find( ev_add.data.fd ) != descr.end() ) { // reuse? + if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev_add.data.fd, &ev_add ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + // descr.erase( ev_add.data.fd ); + // throw system_error + return; + } + } else { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << ev_add.data.fd << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + // throw system_error + return; + } + } + descr[ev_add.data.fd] = fd_info( static_cast<socks_processor_t*>(_ctl.data.ptr) ); } - return false; // poll wait infinite, so it can't return 0 (timeout), so it return -1. - } + break; + case tcp_buffer: + ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + ev_add.data.fd = static_cast<sockbuf_t*>(_ctl.data.ptr)->fd(); + if ( ev_add.data.fd >= 0 ) { + if ( descr.find( ev_add.data.fd ) != descr.end() ) { // reuse? + if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev_add.data.fd, &ev_add ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + // descr.erase( ev_add.data.fd ); + // throw system_error + return; + } + } else { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << ev_add.data.fd << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + // throw system_error + return; + } + } + descr[ev_add.data.fd] = fd_info( static_cast<sockbuf_t*>(_ctl.data.ptr) ); + } + break; + case listener_on_exit: + listeners_final.insert( _ctl.data.ptr ); + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + { + int lfd = check_closed_listener( reinterpret_cast<socks_processor_t*>(_ctl.data.ptr) ); + if ( lfd != -1 ) { + descr.erase( lfd ); + } + // dump_descr(); + } + break; + case rqstop: + // std::cerr << "Stop request\n"; + throw std::detail::stop_request(); // runtime_error( "Stop request (normal flow)" ); + // break; + } +} - // New connction open before data read from opened sockets. - // This policy may be worth to revise. - - if ( (_pfd[0].revents & POLLERR) != 0 || (_pfd[1].revents & POLLERR) != 0 ) { - return false; // return 0 only for binded socket, or control socket +template<class charT, class traits, class _Alloc> +void sockmgr<charT,traits,_Alloc>::process_listener( epoll_event& ev, typename sockmgr<charT,traits,_Alloc>::fd_container_type::iterator ifd ) +{ + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + if ( ev.events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR) ) { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { + // throw system_error + std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << " " << errno << std::endl; } - if ( _pfd[0].revents != 0 ) { - xmt::scoped_lock lk(_fd_lck); - if ( !is_open_unsafe() ) { // may be already closed - return false; + if ( ifd->second.p != 0 ) { + ifd->second.p->close(); + for ( typename fd_container_type::iterator i = descr.begin(); i != descr.end(); ++i ) { + if ( (i->second.p == ifd->second.p) && (i->second.b != 0) ) { + i->second.b->shutdown( sock_base::stop_in | sock_base::stop_out ); + } } - // poll found event on binded socket - sock_base::socket_type _sd = ::accept( fd_unsafe(), &addr.any, &sz ); - if ( _sd == -1 ) { - // check and set errno - // _STLP_ASSERT( _sd == -1 ); - return false; - } + } - try { - xmt::scoped_lock _l( _c_lock ); - _M_c.push_back( _Connect() ); - _M_c.back().open( _sd, addr.any ); - _Connect *cl_new = &_M_c.back(); - if ( cl_new->s.rdbuf()->in_avail() > 0 ) { - // this is the case when user read from sockstream - // in ctor above; push processing of this stream - xmt::scoped_lock lk(_dlock); - _conn_pool.push_back( --_M_c.end() ); - _pool_cnd.set( true ); - _observer_cnd.set( true ); - _in_buf = true; - } + listeners_final.insert(static_cast<void *>(ifd->second.p)); - pollfd newfd; + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; - newfd.fd = _sd; - newfd.events = POLLIN; - newfd.revents = 0; + socks_processor_t* p = ifd->second.p; - _pfd.push_back( newfd ); - } - catch ( ... ) { - } - } - if ( _pfd[1].revents != 0 ) { // fd come back for poll - pollfd rfd; - ::read( _pfd[1].fd, reinterpret_cast<char *>(&rfd.fd), sizeof(sock_base::socket_type) ); - rfd.events = POLLIN; - rfd.revents = 0; + descr.erase( ifd ); - _pfd.push_back( rfd ); + int lfd = check_closed_listener( p ); + + if ( lfd != -1 ) { + descr.erase( lfd ); } - } while ( !_shift_fd() && !_in_buf ); - - return true; // something was pushed in connection queue (by _shift_fd) -} -template <class Connect, void (Connect::*C)( std::sockstream& ), void (Connect::*T)() > -bool sockmgr_stream_MP<Connect,C,T>::accept_udp() -{ - if ( !is_open() ) { - return false; + // dump_descr(); + + return; } - _xsockaddr addr; + if ( (ev.events & EPOLLIN) == 0 ) { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << std::hex << ev.events << std::dec << std::endl; + return; // I don't know what to do this case... + } + + sockaddr addr; socklen_t sz = sizeof( sockaddr_in ); - bool _in_buf; - // Problem here: - // if I see event on pfd[1], I should set fd_in_work = 1 and process it below in loop; - // but if no event on pfd[1], I don't really know wether pfd[1] polling in - // connect_processor or not; size of _conn_pool don't help here too ... + fd_info info = ifd->second; - // Hmm, but not all so bad: if I will see event on pfd[0] here, I just - // add SAME iterator to _conn_pool, and hope that observer process it accurate... - - int pret = poll( &_pfd[1], 1, 1 ); // timeout as short as possible - int fd_in_work = pret == 0 ? 0 : 1; - // int fd_in_work = 0; - - do { - _in_buf = false; - _pfd[0].revents = 0; - _pfd[1].revents = 0; - while ( poll( &_pfd[0 + fd_in_work], /* _pfd.size() */ 2 - fd_in_work, -1 ) < 0 ) { // wait infinite - if ( errno == EINTR ) { // may be interrupted, check and ignore + for ( ; ; ) { + int fd = accept( ev.data.fd, &addr, &sz ); + if ( fd < 0 ) { + // std::cerr << __FILE__ << ":" << __LINE__ /* << " " << std::tr2::getpid() */ << std::endl; + if ( (errno == EINTR) || (errno == ECONNABORTED) /* || (errno == ERESTARTSYS) */ ) { + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; errno = 0; continue; } - return false; // poll wait infinite, so it can't return 0 (timeout), so it return -1. - } + if ( !(errno == EAGAIN /* || errno == EWOULDBLOCK */ ) ) { // EWOULDBLOCK == EAGAIN + // std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << " " + << errno << std::endl; + // throw system_error + } - // New connction open before data read from opened sockets. - // This policy may be worth to revise. - - if ( (_pfd[0].revents & POLLERR) != 0 /* || (_pfd[1].revents & POLLERR) != 0 */ ) { - return false; // return 0 only for binded socket, or control socket - } - - if ( _pfd[0].revents != 0 ) { - xmt::scoped_lock lk(_fd_lck); - if ( !is_open_unsafe() ) { // may be already closed - return false; - } - try { - xmt::scoped_lock _l( _c_lock ); - // - if ( _M_c.empty() ) { - _M_c.push_back( _Connect() ); - // poll found event on binded socket - // to fill addr.any only, for _M_c.back().open() call - char buff[1]; - ::recvfrom( fd_unsafe(), buff, 1, MSG_PEEK, &addr.any, &sz ); - _M_c.back().open( fd_unsafe(), addr.any, sock_base::sock_dgram ); - _Connect *cl_new = &_M_c.back(); - if ( cl_new->s.rdbuf()->in_avail() > 0 ) { - // this is the case when user read from sockstream - // in ctor above; push processing of this stream - xmt::scoped_lock lk(_dlock); - _conn_pool.push_back( _M_c.begin() ); - _pool_cnd.set( true ); - _observer_cnd.set( true ); - _in_buf = true; - fd_in_work = 1; + if ( ifd->second.p != 0 ) { + ifd->second.p->close(); + for ( typename fd_container_type::iterator i = descr.begin(); i != descr.end(); ++i ) { + if ( (i->second.p == ifd->second.p) && (i->second.b != 0) ) { + i->second.b->shutdown( sock_base::stop_in | sock_base::stop_out ); + } } - } else { // normal data available for reading - xmt::scoped_lock lk(_dlock); - _conn_pool.push_back( _M_c.begin() ); - // xmt::Thread::gettime( &_tpush ); - _pool_cnd.set( true ); - _observer_cnd.set( true ); - _in_buf = true; - fd_in_work = 1; } - // if addr.any pesent in _M_c - // typename container_type::iterator i = - // find_if( _M_c.begin(), _M_c.end(), bind2nd( _M_comp_inet, addr.any ) ); - // if ( i == _M_c.end() ) { - // _M_c.push_back( _Connect() ); - // _M_c.back().open( fd(), addr.any, sock_base::sock_dgram ); - // _Connect *cl_new = &_M_c.back(); - // } - // - // ... - // - } - catch ( ... ) { - } - } - if ( _pfd[1].revents != 0 ) { // fd come back for poll - // really not used (i.e. this is fd()), but we need to read it from pipe - sock_base::socket_type _xfd; - ::read( _pfd[1].fd, reinterpret_cast<char *>(&_xfd), sizeof(sock_base::socket_type) ); - fd_in_work = 0; - } - } while ( /* !_shift_fd() && */ !_in_buf ); - return true; -} + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; -template <class Connect, void (Connect::*C)( std::sockstream& ), void (Connect::*T)() > -void sockmgr_stream_MP<Connect,C,T>::_close_by_signal( int ) -{ -#ifdef _PTHREADS - void *_uw_save = *((void **)pthread_getspecific( xmt::Thread::mtkey() ) + _idx ); - _Self_type *me = static_cast<_Self_type *>( _uw_save ); + socks_processor_t* p = ifd->second.p; + listeners_final.insert( static_cast<void *>(p) ); - me->close(); -#else -#error "Fix me!" -#endif -} + descr.erase( ifd ); -template <class Connect, void (Connect::*C)( std::sockstream& ), void (Connect::*T)() > -xmt::Thread::ret_t sockmgr_stream_MP<Connect,C,T>::loop( void *p ) -{ - _Self_type *me = static_cast<_Self_type *>(p); - me->loop_thr.pword( _idx ) = me; // push pointer to self for signal processing - xmt::Thread::ret_t rtc = 0; - xmt::Thread::ret_t rtc_observer = 0; + check_closed_listener( p ); - xmt::Thread thr_observer; - - try { - me->_loop_cnd.set( true ); - - me->_follow = true; - - while ( (me->*me->_accept)() /* && me->_is_follow() */ ) { - if ( thr_observer.bad() ) { - if ( thr_observer.is_join_req() ) { - rtc_observer = thr_observer.join(); - if ( rtc_observer != 0 ) { - rtc = reinterpret_cast<xmt::Thread::ret_t>(-2); // there was connect_processor that was killed - } + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + // dump_descr(); + } else { // back to listen + errno = 0; + epoll_event xev; + xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + xev.data.fd = ev.data.fd; + if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << " " + << errno << std::endl; } - thr_observer.launch( observer, me, 0, PTHREAD_STACK_MIN * 2 ); } + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + return; } - } - catch ( ... ) { - rtc = reinterpret_cast<xmt::Thread::ret_t>(-1); - } + if ( fcntl( fd, F_SETFL, fcntl( fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + throw std::runtime_error( "can't establish nonblock mode" ); + } + + try { + epoll_event ev_add; + ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + ev_add.data.fd = fd; - xmt::block_signal( SIGINT ); - xmt::block_signal( SIGPIPE ); - xmt::block_signal( SIGCHLD ); - xmt::block_signal( SIGPOLL ); + if ( descr.find( fd ) != descr.end() ) { // reuse? + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_MOD, fd, &ev_add ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << " " << errno << std::endl; + descr.erase( fd ); + // throw system_error + return; // throw + } + } else { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_ADD, fd, &ev_add ) < 0 ) { + // throw system_error + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << " " << errno << std::endl; + return; // throw + } + } - me->_dlock.lock(); - me->_follow = false; - me->_pool_cnd.set( true, true ); - me->_observer_cnd.set( true ); - me->_dlock.unlock(); + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + sockbuf_t* b = (*info.p)( fd, addr ); + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; - // me->_c_lock.lock(); - ::close( me->_cfd ); - ::close( me->_pfd[1].fd ); - me->close(); - // me->_c_lock.unlock(); - rtc_observer = thr_observer.join(); - - xmt::scoped_lock _l( me->_c_lock ); - me->_M_c.clear(); // FIN still may not come yet; force close - if ( rtc_observer != 0 && rtc == 0 ) { - rtc = reinterpret_cast<xmt::Thread::ret_t>(-2); // there was connect_processor that was killed + descr[fd] = fd_info( b, info.p ); + } + catch ( const std::bad_alloc& ) { + // nothing + descr.erase( fd ); + } + catch ( ... ) { + descr.erase( fd ); + } } - - return rtc; } -template <class Connect, void (Connect::*C)( std::sockstream& ), void (Connect::*T)() > -xmt::Thread::ret_t sockmgr_stream_MP<Connect,C,T>::connect_processor( void *p ) +template<class charT, class traits, class _Alloc> +void sockmgr<charT,traits,_Alloc>::process_regular( epoll_event& ev, typename sockmgr<charT,traits,_Alloc>::fd_container_type::iterator ifd ) { - _Self_type *me = static_cast<_Self_type *>(p); + fd_info& info = ifd->second; - try { - timespec idle( me->_idle ); - typename _Sequence::iterator c; + sockbuf_t* b = info.b; + if ( b == 0 ) { // marginal case: sockbuf wasn't created by processor... + // std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << " " << errno << std::endl; + // throw system_error + } + if ( info.p != 0 ) { // ... but controlled by processor + (*info.p)( ifd->first, typename socks_processor_t::adopt_close_t() ); - { - xmt::scoped_lock lk(me->_dlock); - if ( me->_conn_pool.empty() ) { - me->_pool_cnd.set( false ); - me->_observer_cnd.set( false ); - return 0; + socks_processor_t* p = info.p; + descr.erase( ifd ); + int lfd = check_closed_listener( p ); + if ( lfd != -1 ) { + descr.erase( lfd ); } - c = me->_conn_pool.front(); - me->_conn_pool.pop_front(); - xmt::gettime( &me->_tpop ); + } else { + descr.erase( ifd ); } + return; + } - do { - sockstream& stream = c->s; - if ( stream.is_open() ) { - (c->_proc->*C)( stream ); - if ( stream.is_open() && stream.good() ) { - if ( stream.rdbuf()->in_avail() > 0 ) { - // socket has buffered data, push it back to queue - xmt::scoped_lock lk(me->_dlock); - me->_conn_pool.push_back( c ); - me->_observer_cnd.set( true ); - me->_pool_cnd.set( true ); - if ( !me->_follow ) { - break; - } - c = me->_conn_pool.front(); - me->_conn_pool.pop_front(); - xmt::gettime( &me->_tpop ); - // xmt::Thread::gettime( &me->_tpush ); - continue; - } else { // no buffered data, return socket to poll - sock_base::socket_type rfd = stream.rdbuf()->fd(); - ::write( me->_cfd, reinterpret_cast<const char *>(&rfd), sizeof(sock_base::socket_type) ); - } - } else { - me->_dlock.lock(); - me->_conn_pool.erase( std::remove( me->_conn_pool.begin(), me->_conn_pool.end(), c ), me->_conn_pool.end() ); - me->_dlock.unlock(); + errno = 0; - xmt::scoped_lock _l( me->_c_lock ); - me->_M_c.erase( c ); + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + if ( ev.events & EPOLLIN ) { + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + for ( ; ; ) { + if ( b->_ebuf == b->egptr() ) { + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + // process extract data from buffer too slow for us! + if ( (info.flags & fd_info::level_triggered) == 0 ) { + epoll_event xev; + xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP; + xev.data.fd = ev.data.fd; + info.flags |= fd_info::level_triggered; + if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << " " << errno << std::endl; + } } + if ( info.p != 0 ) { + (*info.p)( ev.data.fd ); + } + break; } - { - xmt::scoped_lock lk(me->_dlock); - if ( me->_conn_pool.empty() ) { - lk.unlock(); - if ( me->_pool_cnd.try_wait_delay( &idle ) != 0 ) { - lk.lock(); - me->_pool_cnd.set( false ); - me->_observer_cnd.set( false ); - return 0; + long offset = read( ev.data.fd, b->egptr(), sizeof(charT) * (b->_ebuf - b->egptr()) ); + + if ( offset < 0 ) { + switch ( errno ) { + case EINTR: // read was interrupted + errno = 0; + continue; + break; + case EFAULT: // Bad address + case ECONNRESET: // Connection reset by peer + errno = 0; + ev.events |= EPOLLRDHUP; // will be processed below + break; + case EAGAIN: + // case EWOULDBLOCK: + errno = 0; + { + epoll_event xev; + xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + xev.data.fd = ev.data.fd; + if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << " " << errno << std::endl; + } + } + break; + default: + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + break; + } + break; + } else if ( offset > 0 ) { + offset /= sizeof(charT); // if offset % sizeof(charT) != 0, rest will be lost! + + if ( info.flags & fd_info::level_triggered ) { + epoll_event xev; + xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + xev.data.fd = ev.data.fd; + info.flags &= ~static_cast<unsigned>(fd_info::level_triggered); + if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << " " << errno << std::endl; } - if ( !me->_is_follow() ) { // before _conn_pool.front() - return 0; - } - lk.lock(); - if ( me->_conn_pool.empty() ) { - return 0; - } } - c = me->_conn_pool.front(); - me->_conn_pool.pop_front(); - xmt::gettime( &me->_tpop ); + std::tr2::lock_guard<std::tr2::mutex> lk( b->ulck ); + b->setg( b->eback(), b->gptr(), b->egptr() + offset ); + b->ucnd.notify_one(); + if ( info.p != 0 ) { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << std::endl; + (*info.p)( ev.data.fd ); + } + } else { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << std::endl; + // EPOLLRDHUP may be missed in kernel, but offset 0 is the same + ev.events |= EPOLLRDHUP; // will be processed below + break; } - } while ( me->_is_follow() ); + } } - catch ( ... ) { - return reinterpret_cast<xmt::Thread::ret_t>(-1); - } - return 0; -} + if ( (ev.events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR) ) != 0 ) { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { + // throw system_error + std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << " " << errno << std::endl; + } -template <class Connect, void (Connect::*C)( std::sockstream& ), void (Connect::*T)() > -xmt::Thread::ret_t sockmgr_stream_MP<Connect,C,T>::observer( void *p ) -{ - _Self_type *me = static_cast<_Self_type *>(p); - xmt::Thread::ret_t rtc = 0; - int pool_size[3]; - // size_t thr_pool_size[3]; - timespec tpop; - timespec delta( me->_busylimit ); - timespec alarm( me->_alarm ); + if ( info.p != 0 ) { + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + (*info.p)( ifd->first, typename socks_processor_t::adopt_close_t() ); + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; - timespec idle( me->_idle ); + socks_processor_t* p = info.p; - timespec now; - std::fill( pool_size, pool_size + 3, 0 ); + descr.erase( ifd ); - try { - xmt::ThreadMgr mgr; + int lfd = check_closed_listener( p ); + if ( lfd != -1 ) { + descr.erase( lfd ); + } + } else { + b->_notify_close = false; // avoid deadlock + // std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { + // throw system_error + std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << " " << errno << std::endl; + } + descr.erase( ifd ); + b->close(); + } + // dump_descr(); + } + // if ( ev.events & EPOLLHUP ) { + // std::cerr << "Poll HUP" << std::endl; + // } + // if ( ev.events & EPOLLERR ) { + // std::cerr << "Poll ERR" << std::endl; + // } + if ( ev.events & EPOLLPRI ) { + std::cerr << "Poll PRI" << std::endl; + } + if ( ev.events & EPOLLRDNORM ) { + std::cerr << "Poll RDNORM" << std::endl; + } + if ( ev.events & EPOLLRDBAND ) { + std::cerr << "Poll RDBAND" << std::endl; + } + if ( ev.events & EPOLLMSG ) { + std::cerr << "Poll MSG" << std::endl; + } +} - mgr.launch( connect_processor, me /* , 0, 0, PTHREAD_STACK_MIN * 2 */ ); +template<class charT, class traits, class _Alloc> +int sockmgr<charT,traits,_Alloc>::check_closed_listener( socks_processor_t* p ) +{ + int myfd = -1; - do { - // std::swap( pool_size[0], pool_size[1] ); - std::rotate( pool_size, pool_size, pool_size + 3 ); - { - xmt::scoped_lock lk(me->_dlock); - pool_size[2] = static_cast<int>(me->_conn_pool.size()); - tpop = me->_tpop; - } - if ( pool_size[2] != 0 ) { - if ( me->_thr_limit > mgr.size() ) { - if ( (pool_size[0] - 2 * pool_size[1] + pool_size[2]) > 0 || - pool_size[2] > me->_thr_limit - /* pool_size[1] > 3 && pool_size[0] <= pool_size[1] */ ) { - // queue not empty and not decrease - mgr.launch( connect_processor, me /* , 0, 0, PTHREAD_STACK_MIN * 2 */ ); - } else { - xmt::gettime( &now ); - if ( (tpop + delta) < now ) { - // a long time was since last pop from queue - mgr.launch( connect_processor, me /* , 0, 0, PTHREAD_STACK_MIN * 2 */ ); - } + if ( !listeners_final.empty() ) { + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + if ( listeners_final.find( static_cast<void*>(p) ) != listeners_final.end() ) { + for ( typename fd_container_type::iterator i = descr.begin(); i != descr.end(); ++i ) { + if ( i->second.p == p ) { + if ( (i->second.flags & fd_info::listener) == 0 ) { // it's not me! + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + return -1; } + myfd = i->first; + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; } - mgr.garbage_collector(); - xmt::delay( &alarm ); - } else { - if ( /* me->_is_follow() && */ me->_observer_cnd.try_wait_delay( &idle ) != 0 && mgr.size() == 0 ) { - return rtc; - } } - } while ( me->_is_follow() ); + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; - int count = 24; - while ( mgr.size() > 0 && count > 0 ) { - me->_pool_cnd.set( true, true ); - xmt::delay( &alarm ); - alarm *= 1.2; - --count; + // no more connection with this listener + listeners_final.erase( static_cast<void*>(p) ); + + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + + // if ( myfd != -1 ) { + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + p->stop(); + // } } - if ( mgr.size() > 0 ) { - mgr.signal( SIGTERM ); - rtc = reinterpret_cast<xmt::Thread::ret_t>(-1); - } } - catch ( ... ) { - rtc = reinterpret_cast<xmt::Thread::ret_t>(-1); - } - return rtc; + return myfd; } -#endif // !__FIT_NO_POLL +template<class charT, class traits, class _Alloc> +void sockmgr<charT,traits,_Alloc>::dump_descr() +{ + for ( typename fd_container_type::iterator i = descr.begin(); i != descr.end(); ++i ) { + std::cerr << i->first << " " + << std::hex + << i->second.flags + << " " + << (void*)i->second.b + << " " + << (void*)i->second.p + << std::dec + << endl; + } +} -#ifdef STLPORT -_STLP_END_NAMESPACE -#else + +} // namespace detail + } // namespace std -#endif Modified: trunk/complement/explore/include/sockios/sockmgr.h =================================================================== --- trunk/complement/explore/include/sockios/sockmgr.h 2008-06-27 10:28:54 UTC (rev 1923) +++ trunk/complement/explore/include/sockios/sockmgr.h 2008-06-27 11:20:41 UTC (rev 1924) @@ -1,351 +1,350 @@ -// -*- C++ -*- Time-stamp: <07/09/06 21:23:52 ptr> +// -*- C++ -*- Time-stamp: <08/06/27 00:50:33 ptr> /* - * Copyright (c) 1997-1999, 2002, 2003, 2005-2007 + * Copyright (c) 2008 * Petr Ovtchenkov * - * Portion Copyright (c) 1999-2001 - * Parallel Graphics Ltd. - * * Licensed under the Academic Free License Version 3.0 * */ -#ifndef __SOCKMGR_H -#define __SOCKMGR_H +#ifndef __SOCKIOS_SOCKMGR_H +#define __SOCKIOS_SOCKMGR_H -#ifndef __config_feature_h -#include <config/feature.h> -#endif +#include <sys/epoll.h> -#ifndef __SOCKSTREAM__ -#include <sockios/sockstream> +#ifndef EPOLLRDHUP +# define EPOLLRDHUP 0x2000 #endif -#include <vector> -#include <deque> +#include <fcntl.h> + #include <cerrno> +#include <mt/thread> +#include <mt/mutex> +#include <mt/condition_variable> -#ifndef __XMT_H -#include <mt/xmt.h> -#endif +#include <sockios/socksrv.h> -#ifndef __THR_MGR_H -#include <mt/thr_mgr.h> +#ifdef STLPORT +# include <unordered_map> +# include <unordered_set> +// # include <hash_map> +// # include <hash_set> +// # define __USE_STLPORT_HASH +# define __USE_STLPORT_TR1 +#else +# if defined(__GNUC__) && (__GNUC__ < 4) +# include <ext/hash_map> +# include <ext/hash_set> +# define __USE_STD_HASH +# else +# include <tr1/unordered_map> +# include <tr1/unordered_set> +# define __USE_STD_TR1 +# endif #endif -#ifdef __unix -#include <poll.h> -#endif +#include <sockios/sockstream> +#include <deque> +#include <functional> -#ifdef STLPORT -_STLP_BEGIN_NAMESPACE -#else namespace std { -#endif -union _xsockaddr { - sockaddr_in inet; - sockaddr any; +template <class charT, class traits, class _Alloc> class basic_sockbuf; +template <class charT, class traits, class _Alloc> class basic_sockstream; +template <class charT, class traits, class _Alloc> class sock_processor_base; + + +template <class charT, class traits, class _Alloc> class basic_sockbuf; + +namespace detail { + +class stop_request : + public std::exception +{ + public: + virtual const char* what() throw() + { return "sockmgr receive stop reqest"; } }; -class basic_sockmgr : - public sock_base +template<class charT, class traits, class _Alloc> +class sockmgr { private: - class Init - { - public: - Init(); - ~Init(); - private: - static void _guard( int ); - static void __at_fork_prepare(); - static void __at_fork_child(); - static void __at_fork_parent(); + typedef basic_sockstream<charT,traits,_Alloc> sockstream_t; + typedef basic_sockbuf<charT,traits,_Alloc> sockbuf_t; + typedef sock_processor_base<charT,traits,_Alloc> socks_processor_t; + + enum { + listener, + tcp_buffer, + rqstop, + rqstart, + listener_on_exit }; - public: - basic_sockmgr(); - virtual ~basic_sockmgr(); + struct fd_info + { + enum { + listener = 0x1, + level_triggered = 0x2 + }; - protected: - __FIT_DECLSPEC void open( const in_addr& addr, int port, sock_base::stype type, sock_base::protocol prot ); - __FIT_DECLSPEC void open( unsigned long addr, int port, sock_base::stype type, sock_base::protocol prot ); - __FIT_DECLSPEC void open( int port, sock_base::stype type, sock_base::protocol prot ); + fd_info() : + flags(0U), + b(0), + p(0) + { } - virtual __FIT_DECLSPEC void close(); - bool is_open_unsafe() const - { return _fd != -1; } - sock_base::socket_type fd_unsafe() const - { return _fd; } - __FIT_DECLSPEC - void setoptions_unsafe( sock_base::so_t optname, bool on_off = true, - int __v = 0 ); + fd_info( unsigned f, sockbuf_t* buf, socks_processor_t* proc ) : + flags(f), + b(buf), + p(proc) + { } - public: - bool is_open() const - { xmt::scoped_lock lk(_fd_lck); return is_open_unsafe(); } - bool good() const - { return _state == ios_base::goodbit; } + fd_info( sockbuf_t* buf, socks_processor_t* proc ) : + flags(0U), + b(buf), + p(proc) + { } - sock_base::socket_type fd() const - { xmt::scoped_lock lk(_fd_lck); sock_base::socket_type tmp = fd_unsafe(); return tmp; } + fd_info( sockbuf_t* buf ) : + flags(0U), + b(buf), + p(0) + { } - __FIT_DECLSPEC - void shutdown( sock_base::shutdownflg dir ); - void setoptions( sock_base::so_t optname, bool on_off = true, - int __v = 0 ) - { - xmt::scoped_lock lk(_fd_lck); - setoptions_unsafe( optname, on_off, __v ); - } + fd_info( socks_processor_t* proc ) : + flags(listener), + b(0), + p(proc) + { } - private: - basic_sockmgr( const basic_sockmgr& ); - basic_sockmgr& operator =( const basic_sockmgr& ); + fd_info( const fd_info& info ) : + flags( info.flags ), + b( info.b ), + p( info.p ) + { } - private: - sock_base::socket_type _fd; // master socket - unsigned long _mode; // open mode - unsigned long _state; // state flags - int _errno; // error state - _xsockaddr _address; + unsigned flags; + sockbuf_t* b; + socks_processor_t* p; + }; - protected: - static int _idx; - friend class Init; + struct ctl + { + int cmd; + union { + sock_base::socket_type fd; + void *ptr; + } data; + }; - protected: - xmt::mutex _fd_lck; - xmt::condition _loop_cnd; -}; + static void _loop( sockmgr *me ) + { me->io_worker(); } -class ConnectionProcessorTemplate_MP // As reference -{ public: - ConnectionProcessorTemplate_MP( std::sockstream& ) - { } + sockmgr( int hint = 1024, int ret = 512 ) : + n_ret( ret ) + { + efd = epoll_create( hint ); + if ( efd < 0 ) { + // throw system_error( errno ) + throw std::runtime_error( "epoll_create" ); + } + if ( pipe( pipefd ) < 0 ) { // check err + ::close( efd ); + // throw system_error; + throw std::runtime_error( "pipe" ); + } + // cfd = pipefd[1]; - void connect( std::sockstream& ) - { } - void close() - { } -}; + epoll_event ev_add; + ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP; + ev_add.data.fd = pipefd[0]; + epoll_ctl( efd, EPOLL_CTL_ADD, pipefd[0], &ev_add ); -#ifndef __FIT_NO_POLL + _worker = new std::tr2::thread( _loop, this ); -template <class Connect, void (Connect::*C)( std::sockstream& ) = &Connect::connect, void (Connect::*T)() = &Connect::close > -class sockmgr_stream_MP : - public basic_sockmgr -{ - public: - sockmgr_stream_MP() : - basic_sockmgr(), - _thr_limit( 31 ) - { - _busylimit.tv_sec = 0; - _busylimit.tv_nsec = 90000000; // i.e 0.09 sec - _alarm.tv_sec = 0; - _alarm.tv_nsec = 50000000; // i.e 0.05 sec - _idle.tv_sec = 10; - _idle.tv_nsec = 0; // i.e 10 sec + // ctl _ctl; + // _ctl.cmd = rqstart; + + // write( pipefd[1], &_ctl, sizeof(ctl) ); } - explicit sockmgr_stream_MP( const in_addr& addr, int port, sock_base::stype t = sock_base::sock_stream ) : - basic_sockmgr(), - _thr_limit( 31 ) + ~sockmgr() { - open( addr, port, t ); - _busylimit.tv_sec = 0; - _busylimit.tv_nsec = 90000000; // i.e 0.09 sec - _alarm.tv_sec = 0; - _alarm.tv_nsec = 50000000; // i.e 0.05 sec - _idle.tv_sec = 10; - _idle.tv_nsec = 0; // i.e 10 sec + if ( _worker->joinable() ) { + ctl _ctl; + _ctl.cmd = rqstop; + _ctl.data.ptr = 0; + + ::write( pipefd[1], &_ctl, sizeof(ctl) ); + + _worker->join(); + } + ::close( pipefd[1] ); + ::close( pipefd[0] ); + ::close( efd ); + delete _worker; } - explicit sockmgr_stream_MP( unsigned long addr, int port, sock_base::stype t = sock_base::sock_stream ) : - basic_sockmgr(), - _thr_limit( 31 ) + void push( socks_processor_t& p ) { - open( addr, port, t ); - _busylimit.tv_sec = 0; - _busylimit.tv_nsec = 90000000; // i.e 0.09 sec - _alarm.tv_sec = 0; - _alarm.tv_nsec = 50000000; // i.e 0.05 sec - _idle.tv_sec = 10; - _idle.tv_nsec = 0; // i.e 10 sec + ctl _ctl; + _ctl.cmd = listener; + _ctl.data.ptr = static_cast<void *>(&p); + + int r = ::write( pipefd[1], &_ctl, sizeof(ctl) ); + if ( r < 0 || r != sizeof(ctl) ) { + throw std::runtime_error( "can't write to pipe" ); + } } - explicit sockmgr_stream_MP( int port, sock_base::stype t = sock_base::sock_stream ) : - basic_sockmgr(), - _thr_limit( 31 ) + void push( sockbuf_t& s ) { - open( port, t ); - _busylimit.tv_sec = 0; - _busylimit.tv_nsec = 50000000; // i.e 0.05 sec - _alarm.tv_sec = 0; - _alarm.tv_nsec = 50000000; // i.e 0.05 sec - _idle.tv_sec = 10; - _idle.tv_nsec = 0; // i.e 10 sec + ctl _ctl; + _ctl.cmd = tcp_buffer; + _ctl.data.ptr = static_cast<void *>(&s); + + errno = 0; + int r = ::write( pipefd[1], &_ctl, sizeof(ctl) ); + if ( r < 0 || r != sizeof(ctl) ) { + throw std::runtime_error( "can't write to pipe" ); + } } - ~sockmgr_stream_MP() - { loop_thr.join(); } + void pop( socks_processor_t& p, sock_base::socket_type _fd ) + { + ctl _ctl; + _ctl.cmd = listener_on_exit; + _ctl.data.ptr = reinterpret_cast<void *>(&p); - private: - sockmgr_stream_MP( const sockmgr_stream_MP<Connect,C,T>& ); - sockmgr_stream_MP<Connect,C,T>& operator =( const sockmgr_stream_MP<Connect,C,T>& ); + int r = ::write( pipefd[1], &_ctl, sizeof(ctl) ); + if ( r < 0 || r != sizeof(ctl) ) { + throw std::runtime_error( "can't write to pipe" ); + } + } - public: - void open( const in_addr& addr, int port, sock_base::stype t = sock_base::sock_stream ); - void open( unsigned long addr, int port, sock_base::stype t = sock_base::sock_stream ); - void open( int port, sock_base::stype t = sock_base::sock_stream ); + void exit_notify( sockbuf_t* b, sock_base::socket_type fd ) + { + try { + std::tr2::unique_lock<std::tr2::mutex> lk( dll, std::tr2::defer_lock ); - virtual void close() - { basic_sockmgr::close(); } + if ( lk.try_lock() ) { + if ( b->_notify_close ) { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + typename fd_container_type::iterator i = descr.find( fd ); + if ( (i != descr.end()) && (i->second.b == b) && (i->second.p == 0) ) { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_DEL, fd, 0 ) < 0 ) { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + // throw system_error + } + // std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + descr.erase( i ); + } + b->_notify_close = false; + } + } + } + catch ( const std::tr2::lock_error& ) { + // std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + } + } - void wait() - { loop_thr.join(); } - - void detach( sockstream& ) // remove sockstream from polling in manager + private: + sockmgr( const sockmgr& ) { } + sockmgr& operator =( const sockmgr& ) + { return *this; } - void setbusytime( const timespec& t ) - { _busylimit = t; } + int check_closed_listener( socks_processor_t* p ); + void dump_descr(); - void setobservertime( const timespec& t ) - { _alarm = t; } +#ifdef __USE_STLPORT_HASH + typedef std::hash_map<sock_base::socket_type,fd_info> fd_container_type; + typedef std::hash_set<void *> listener_container_type; +#endif +#ifdef __USE_STD_HASH + typedef __gnu_cxx::hash_map<sock_base::socket_type, fd_info> fd_container_type; + typedef __gnu_cxx::hash_set<void *> listener_container_type; +#endif +#if defined(__USE_STLPORT_TR1) || defined(__USE_STD_TR1) + typedef std::tr1::unordered_map<sock_base::socket_type, fd_info> fd_container_type; + typedef std::tr1::unordered_set<void *> listener_container_type; +#endif - void setidletime( const timespec& t ) - { _idle = t; } + void io_worker(); + void cmd_from_pipe(); + void process_listener( epoll_event&, typename fd_container_type::iterator ); + void process_regular( epoll_event&, typename fd_container_type::iterator ); - void setthreadlimit( unsigned lim ) - { _thr_limit = std::max( 3U, lim ) - 1; } + int efd; + int pipefd[2]; + std::tr2::thread *_worker; + const int n_ret; - protected: + fd_container_type descr; + listener_container_type listeners_final; + std::tr2::mutex dll; +}; - struct _Connect - { - _Connect() : - _proc( 0 ) - { } +} //detail - _Connect( const _Connect& ) : - _proc( 0 ) - { } +} // namesapce std - ~_Connect() - { if ( _proc ) { s.close(); (_proc->*T)(); delete _proc; _proc = 0; } } +#if defined(__USE_STLPORT_HASH) || defined(__USE_STLPORT_TR1) || defined(__USE_STD_TR1) +# define __HASH_NAMESPACE std +#endif +#if defined(__USE_STD_HASH) +# define __HASH_NAMESPACE __gnu_cxx +#endif - void open( sock_base::socket_type st, const sockaddr& addr, sock_base::stype t = sock_base::sock_stream ) - { s.open( st, addr, t ); _proc = new Connect( s ); } +namespace __HASH_NAMESPACE { - sockstream s; - Connect *_proc; - }; +#ifdef __USE_STD_TR1 +namespace tr1 { +#endif - typedef std::vector<pollfd> _fd_sequence; - typedef std::list<_Connect> _Sequence; - typedef std::deque<typename _Sequence::iterator> _connect_pool_sequence; +template <class charT, class traits, class _Alloc> +struct hash<std::basic_sockstream<charT, traits, _Alloc>* > +{ + size_t operator()(const std::basic_sockstream<charT, traits, _Alloc>* __x) const + { return reinterpret_cast<size_t>(__x); } +}; - void _open( sock_base::stype t = sock_base::sock_stream ); - static xmt::Thread::ret_t loop( void * ); - static xmt::Thread::ret_t connect_processor( void * ); - static xmt::Thread::ret_t observer( void * ); - - struct fd_equal : - public std::binary_function<_Connect,int,bool> - { - bool operator()(const _Connect& __x, int __y) const - { return __x.s.rdbuf()->fd() == __y; } - }; - - struct iaddr_equal : - public std::binary_function<_Connect,sockaddr,bool> - { - bool operator()(const _Connect& __x, const sockaddr& __y) const - { return memcmp( &(__x.s.rdbuf()->inet_sockaddr()), reinterpret_cast<const sockaddr_in *>(&__y), sizeof(sockaddr_in) ) == 0; } - }; - - struct pfd_equal : - public std::binary_function<typename _fd_sequence::value_type,int,bool> - { - bool operator()(const typename _fd_sequence::value_type& __x, int __y) const - { return __x.fd == __y; } - }; - - typedef bool (sockmgr_stream_MP<Connect,C,T>::*accept_type)(); - -#if 0 - accept_type _accept; - _Connect *accept() // workaround for CC - { return (this->*_accept)(); } -#else - accept_type _accept; +#ifdef __USE_STD_TR1 +} #endif - bool accept_tcp(); - bool accept_udp(); - private: - xmt::Thread loop_thr; - - protected: - typedef sockmgr_stream_MP<Connect,C,T> _Self_type; - typedef fd_equal _Compare; - typedef iaddr_equal _Compare_inet; - typedef typename _Sequence::value_type value_type; - typedef typename _Sequence::size_type size_type; - typedef _Sequence container_type; - - typedef typename _Sequence::reference reference; - typedef typename _Sequence::const_reference const_reference; - - _Sequence _M_c; - _Compare _M_comp; - _Compare_inet _M_comp_inet; - pfd_equal _pfdcomp; - xmt::mutex _c_lock; - - _fd_sequence _pfd; - int _cfd; // sock_base::socket_type - _connect_pool_sequence _conn_pool; - xmt::condition _pool_cnd; - xmt::mutex _dlock; - timespec _tpop; - - xmt::mutex _flock; - bool _follow; - - xmt::condition _observer_cnd; - timespec _busylimit; // start new thread to process incoming - // requests, if processing thread busy - // more then _busylimit - timespec _alarm; // check and make decision about start - // new thread with _alarm interval - timespec _idle; // do nothing _idle time before thread - // terminate - - unsigned _thr_limit; - - private: - bool _shift_fd(); - static void _close_by_signal( int ); - bool _is_follow() const - { xmt::scoped_lock lk( _flock ); bool tmp = _follow; return tmp; } +#if defined(__GNUC__) && (__GNUC__ < 4) +template<> +struct hash<void *> +{ + size_t operator()(const void *__x) const + { return reinterpret_cast<size_t>(__x); } }; +#endif // __GNUC__ < 4 -#endif // !__FIT_NO_POLL +} // namespace __HASH_NAMESPACE -#ifdef STLPORT -_STLP_END_NAMESPACE -#else -} // namespace std +#undef __HASH_NAMESPACE + +#ifdef __USE_STLPORT_HASH +# undef __USE_STLPORT_HASH #endif +#ifdef __USE_STD_HASH +# undef __USE_STD_HASH +#endif +#ifdef __USE_STLPORT_TR1 +# undef __USE_STLPORT_TR1 +#endif +#ifdef __USE_STD_TR1 +# undef __USE_STD_TR1 +#endif -#ifndef __STL_LINK_TIME_INSTANTIATION #include <sockios/sockmgr.cc> -#endif -#endif // __SOCKMGR_H +#endif /* __SOCKIOS_SOCKMGR_H */ Added: trunk/complement/explore/include/sockios/socksrv.cc =================================================================== --- trunk/complement/explore/include/sockios/socksrv.cc (rev 0) +++ trunk/complement/explore/include/sockios/socksrv.cc 2008-06-27 11:20:41 UTC (rev 1924) @@ -0,0 +1,341 @@ +// -*- C++ -*- Time-stamp: <08/06/26 09:00:54 ptr> + +/* + * Copyright (c) 2008 + * Petr Ovtchenkov + * + * Licensed under the Academic Free License Version 3.0 + * + */ + +namespace std { + +template<class charT, class traits, class _Alloc> +void sock_processor_base<charT,traits,_Alloc>::open( const in_addr& addr, int port, sock_base::stype type, sock_base::protocol prot ) +{ + std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); + if ( basic_socket_t::is_open_unsafe() ) { + return; + } + _mode = ios_base::in | ios_base::out; + _state = ios_base::goodbit; +#ifdef WIN32 + ::WSASetLastError( 0 ); +#endif + if ( prot == sock_base::inet ) { + basic_socket_t::_fd = socket( PF_INET, type, 0 ); + if ( basic_socket_t::_fd == -1 ) { + _state |= ios_base::failbit | ios_base::badbit; + return; + } + // _open = true; + basic_socket_t::_address.inet.sin_family = AF_INET; + basic_socket_t::_address.inet.sin_port = htons( port ); + basic_socket_t::_address.inet.sin_addr.s_addr = addr.s_addr; + + if ( type == sock_base::sock_stream || type == sock_base::sock_seqpacket ) { + // let's try reuse local address + setoptions_unsafe( sock_base::so_reuseaddr, true ); + } + + if ( ::bind( basic_socket_t::_fd, &basic_socket_t::_address.any, sizeof(basic_socket_t::_address) ) == -1 ) { + _state |= ios_base::failbit; +#ifdef WIN32 + ::closesocket( basic_socket_t::_fd ); +#else + ::close( basic_socket_t::_fd ); +#endif + basic_socket_t::_fd = -1; + return; + } + + if ( type == sock_base::sock_stream || type == sock_base::sock_seqpacket ) { + // I am shure, this is socket of type SOCK_STREAM | SOCK_SEQPACKET, + // so don't check return code from listen + ::listen( basic_socket_t::_fd, SOMAXCONN ); + basic_socket_t::mgr->push( *this ); + } + } else if ( prot == sock_base::local ) { + return; + } else { + return; + } + _state = ios_base::goodbit; + + return; +} + +template<class charT, class traits, class _Alloc> +void sock_processor_base<charT,traits,_Alloc>::_close() +{ + std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); + if ( !basic_socket_t::is_open_unsafe() ) { + return; + } + // std::cerr << __FILE__ << ":" << __LINE__ << " " << basic_socket_t::_fd << std::endl; + +#ifdef WIN32 + ::closesocket( basic_socket_t::_fd ); +#else + ::shutdown( basic_socket_t::_fd, 2 ); + ::close( basic_socket_t::_fd ); +#endif + basic_socket<charT,traits,_Alloc>::mgr->pop( *this, basic_socket_t::_fd ); + basic_socket_t::_fd = -1; +} + +template<class charT, class traits, class _Alloc> +void sock_processor_base<charT,traits,_Alloc>::shutdown( sock_base::shutdownflg dir ) +{ + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + + std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); + if ( basic_socket_t::is_open_unsafe() ) { + if ( (dir & ... [truncated message content] |