[complement-svn] SF.net SVN: complement: [1918] branches/complement-sockios/explore
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2008-06-26 05:46:45
|
Revision: 1918 http://complement.svn.sourceforge.net/complement/?rev=1918&view=rev Author: complement Date: 2008-06-25 22:46:44 -0700 (Wed, 25 Jun 2008) Log Message: ----------- Seems problem in accepting connection after processor ready to exit. Problem coupled with fact, that connection_processor and client socket streams are in single process, but in different thread. This may lead to sequence: - connection_processor queue empty, it ready to exit - accept or epoll signal about incoming connection and connection_processor should process it Modified Paths: -------------- branches/complement-sockios/explore/include/sockios/socksrv.cc branches/complement-sockios/explore/include/sockios/socksrv.h branches/complement-sockios/explore/include/sockios/sockstream.cc branches/complement-sockios/explore/include/sockios/sp.cc branches/complement-sockios/explore/include/sockios/sp.h branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc Modified: branches/complement-sockios/explore/include/sockios/socksrv.cc =================================================================== --- branches/complement-sockios/explore/include/sockios/socksrv.cc 2008-06-26 05:46:17 UTC (rev 1917) +++ branches/complement-sockios/explore/include/sockios/socksrv.cc 2008-06-26 05:46:44 UTC (rev 1918) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/06/24 18:34:58 yeti> +// -*- C++ -*- Time-stamp: <08/06/25 21:28:07 yeti> /* * Copyright (c) 2008 @@ -66,13 +66,13 @@ } template<class charT, class traits, class _Alloc> -void sock_processor_base<charT,traits,_Alloc>::close() +void sock_processor_base<charT,traits,_Alloc>::_close() { std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); if ( !basic_socket_t::is_open_unsafe() ) { return; } - std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + std::cerr << __FILE__ << ":" << __LINE__ << " " << basic_socket_t::_fd << std::endl; #ifdef WIN32 ::closesocket( basic_socket_t::_fd ); @@ -187,10 +187,10 @@ char connect_processor<Connect, charT, traits, _Alloc, C>::Init_buf[128]; template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -void connect_processor<Connect, charT, traits, _Alloc, C>::close() +void connect_processor<Connect, charT, traits, _Alloc, C>::_close() { std::cerr << __FILE__ << ":" << __LINE__ << std::endl; - base_t::close(); + base_t::_close(); #if 0 { @@ -218,14 +218,16 @@ Connect* c = new Connect( *s ); // bad point! I can't read from s in ctor indeed! - // if ( s->rdbuf()->in_avail() ) { - // std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); - // ready_pool.push_back( processor( c, s ) ); - // cnd.notify_one(); - // } else { - std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); - worker_pool.insert( std::make_pair( fd, processor( c, s ) ) ); - // } + if ( s->rdbuf()->in_avail() ) { + std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); + ready_pool.push_back( processor( c, s ) ); + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + cnd.notify_one(); + } else { + std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); + worker_pool.insert( std::make_pair( fd, processor( c, s ) ) ); + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + } return s->rdbuf(); } @@ -233,10 +235,12 @@ template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> void connect_processor<Connect, charT, traits, _Alloc, C>::operator ()( sock_base::socket_type fd, const typename connect_processor<Connect, charT, traits, _Alloc, C>::base_t::adopt_close_t& ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; { std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); typename worker_pool_t::iterator i = worker_pool.find( fd ); if ( i != worker_pool.end() ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; delete i->second.c; delete i->second.s; // std::cerr << "oops\n"; @@ -257,6 +261,10 @@ } if ( p.c != 0 ) { // (*p.c)( *p.s ); + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + + (p.c->*C)( *p.s ); + delete p.c; delete p.s; } @@ -265,6 +273,8 @@ template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> void connect_processor<Connect, charT, traits, _Alloc, C>::operator ()( sock_base::socket_type fd ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + processor p; { @@ -331,12 +341,28 @@ std::cerr << __FILE__ << ":" << __LINE__ << std::endl; } } + + { + std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); + std::cerr << __FILE__ << ":" << __LINE__ << " " << worker_pool.size() << std::endl; + } + + { + std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); + std::cerr << __FILE__ << ":" << __LINE__ << " " << ready_pool.size() << std::endl; + } } template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> void connect_processor<Connect, charT, traits, _Alloc, C>::stop() { + _stop(); +} + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::_stop() +{ std::tr2::lock_guard<std::tr2::mutex> lk2( rdlock ); _in_work = false; // <--- set before cnd.notify_one(); (below in this func) @@ -344,6 +370,8 @@ ready_pool.push_back( processor() ); // make ready_pool not empty std::cerr << __FILE__ << ":" << __LINE__ << std::endl; cnd.notify_one(); + } else { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; } } Modified: branches/complement-sockios/explore/include/sockios/socksrv.h =================================================================== --- branches/complement-sockios/explore/include/sockios/socksrv.h 2008-06-26 05:46:17 UTC (rev 1917) +++ branches/complement-sockios/explore/include/sockios/socksrv.h 2008-06-26 05:46:44 UTC (rev 1918) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/06/19 20:14:01 yeti> +// -*- C++ -*- Time-stamp: <08/06/25 21:30:31 yeti> /* * Copyright (c) 2008 @@ -66,7 +66,7 @@ virtual ~sock_processor_base() { - sock_processor_base::close(); + sock_processor_base::_close(); // Never uncomment next line: // basic_socket<charT,traits,_Alloc>::mgr->final( *this ); @@ -85,7 +85,8 @@ void open( int port, sock_base::stype type, sock_base::protocol prot ) { sock_processor_base::open(INADDR_ANY, port, type, prot); } - virtual void close(); + virtual void close() + { _close(); } #if 0 virtual void stop() = 0; #else @@ -122,8 +123,10 @@ return s; } - void (sock_processor_base::*_real_stop)(); + void _close(); + // void (sock_processor_base::*_real_stop)(); + public: bool is_open() const { std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); return basic_socket_t::is_open_unsafe(); } @@ -193,8 +196,10 @@ virtual ~connect_processor() { - connect_processor::close(); + connect_processor::_close(); + // _stop(); + if ( ploop.joinable() ) { ploop.join(); } @@ -235,7 +240,8 @@ ((Init *)Init_buf)->~Init(); } - virtual void close(); + virtual void close() + { connect_processor::_close(); } virtual void stop(); void wait() @@ -298,6 +304,8 @@ }; bool pop_ready( processor& ); + void _close(); + void _stop(); #ifdef __USE_STLPORT_HASH typedef std::hash_map<sock_base::socket_type, processor> worker_pool_t; Modified: branches/complement-sockios/explore/include/sockios/sockstream.cc =================================================================== --- branches/complement-sockios/explore/include/sockios/sockstream.cc 2008-06-26 05:46:17 UTC (rev 1917) +++ branches/complement-sockios/explore/include/sockios/sockstream.cc 2008-06-26 05:46:44 UTC (rev 1918) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/06/17 17:09:02 yeti> +// -*- C++ -*- Time-stamp: <08/06/25 18:48:36 yeti> /* * Copyright (c) 1997-1999, 2002, 2003, 2005-2008 @@ -272,10 +272,11 @@ setp( _bbuf, _bbuf + ((_ebuf - _bbuf)>>1) ); setg( this->epptr(), this->epptr(), this->epptr() ); - if ( basic_socket_t::_notify_close ) { - basic_socket_t::mgr->exit_notify( this, basic_socket_t::_fd ); - basic_socket_t::_notify_close = false; - } + // if ( basic_socket_t::_notify_close ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << basic_socket_t::_fd << std::endl; + basic_socket_t::mgr->exit_notify( this, basic_socket_t::_fd ); + // basic_socket_t::_notify_close = false; + // } basic_socket_t::_fd = -1; Modified: branches/complement-sockios/explore/include/sockios/sp.cc =================================================================== --- branches/complement-sockios/explore/include/sockios/sp.cc 2008-06-26 05:46:17 UTC (rev 1917) +++ branches/complement-sockios/explore/include/sockios/sp.cc 2008-06-26 05:46:44 UTC (rev 1918) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/06/25 12:00:59 ptr> +// -*- C++ -*- Time-stamp: <08/06/25 22:25:48 yeti> /* * Copyright (c) 2008 @@ -45,19 +45,6 @@ for ( int i = 0; i < n; ++i ) { // std::cerr << "epoll i = " << i << std::endl; - std::tr2::lock_guard<std::tr2::mutex> lck( cll ); - for ( typename fd_container_type::iterator closed_ifd = closed_queue.begin(); closed_ifd != closed_queue.end(); ++closed_ifd ) { - if ( epoll_ctl( efd, EPOLL_CTL_DEL, closed_ifd->first, 0 ) < 0 ) { - // ignore - } - if ( closed_ifd->first == ev[i].data.fd ) { - std::cerr << __FILE__ << ":" << __LINE__ << std::endl; - } - // descr.erase( closed_ifd->first ); - } - closed_queue.clear(); - // at this point closed queue empty - if ( ev[i].data.fd == pipefd[0] ) { // std::cerr << "on pipe\n"; cmd_from_pipe(); @@ -66,6 +53,7 @@ typename fd_container_type::iterator ifd = descr.find( ev[i].data.fd ); if ( ifd == descr.end() ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << ev[i].data.fd << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_DEL, ev[i].data.fd, 0 ) < 0 ) { // throw system_error } @@ -103,6 +91,7 @@ if ( r < 0 ) { // throw system_error // std::cerr << "Read pipe\n"; + throw std::detail::stop_request(); // runtime_error( "Stop request (normal flow)" ); } else if ( r == 0 ) { // std::cerr << "Read pipe 0\n"; throw runtime_error( "Read pipe return 0" ); @@ -117,22 +106,44 @@ // std::cerr << "xxx " << errno << " " << std::tr2::getpid() << std::endl; throw std::runtime_error( "can't establish nonblock mode on listener" ); } + if ( descr.find( ev_add.data.fd ) != descr.end() ) { // reuse? + if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev_add.data.fd, &ev_add ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + // descr.erase( ev_add.data.fd ); + // throw system_error + return; + } + } else { + std::cerr << __FILE__ << ":" << __LINE__ << " " << ev_add.data.fd << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + // throw system_error + return; + } + } descr[ev_add.data.fd] = fd_info( static_cast<socks_processor_t*>(_ctl.data.ptr) ); - if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { - descr.erase( ev_add.data.fd ); - // throw system_error - } } break; case tcp_buffer: ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; ev_add.data.fd = static_cast<sockbuf_t*>(_ctl.data.ptr)->fd(); if ( ev_add.data.fd >= 0 ) { + if ( descr.find( ev_add.data.fd ) != descr.end() ) { // reuse? + if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev_add.data.fd, &ev_add ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + // descr.erase( ev_add.data.fd ); + // throw system_error + return; + } + } else { + std::cerr << __FILE__ << ":" << __LINE__ << " " << ev_add.data.fd << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + // throw system_error + return; + } + } descr[ev_add.data.fd] = fd_info( static_cast<sockbuf_t*>(_ctl.data.ptr) ); - if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { - descr.erase( ev_add.data.fd ); - // throw system_error - } } break; case listener_on_exit: @@ -158,8 +169,10 @@ { std::cerr << __FILE__ << ":" << __LINE__ << std::endl; if ( ev.events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR) ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { // throw system_error + std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << " " << errno << std::endl; } if ( ifd->second.p != 0 ) { @@ -212,9 +225,10 @@ } if ( !(errno == EAGAIN /* || errno == EWOULDBLOCK */ ) ) { // EWOULDBLOCK == EAGAIN // std::cerr << "Accept, listener " << ev.data.fd << ", errno " << errno << std::endl; - std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { - std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << " " + << errno << std::endl; // throw system_error } @@ -260,11 +274,21 @@ ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; ev_add.data.fd = fd; - if ( epoll_ctl( efd, EPOLL_CTL_ADD, fd, &ev_add ) < 0 ) { - descr.erase( fd ); - // throw system_error + if ( descr.find( fd ) != descr.end() ) { // reuse? std::cerr << __FILE__ << ":" << __LINE__ << std::endl; - return; // throw + if ( epoll_ctl( efd, EPOLL_CTL_MOD, fd, &ev_add ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << errno << std::endl; + descr.erase( fd ); + // throw system_error + return; // throw + } + } else { + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_ADD, fd, &ev_add ) < 0 ) { + // throw system_error + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + return; // throw + } } std::cerr << __FILE__ << ":" << __LINE__ << std::endl; @@ -292,6 +316,7 @@ sockbuf_t* b = info.b; if ( b == 0 ) { // marginal case: sockbuf wasn't created by processor... + std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { // throw system_error } @@ -399,7 +424,7 @@ if ( (ev.events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR) ) != 0 ) { // std::cerr << "Poll EPOLLRDHUP " << ev.data.fd << ", " << errno << std::endl; - std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl; if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { // throw system_error std::cerr << __FILE__ << ":" << __LINE__ << std::endl; @@ -412,7 +437,6 @@ socks_processor_t* p = info.p; - closed_queue.erase( ev.data.fd ); descr.erase( ifd ); int lfd = check_closed_listener( p ); @@ -420,10 +444,13 @@ descr.erase( lfd ); } } else { - std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + b->_notify_close = false; // avoid deadlock + std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { + // throw system_error + } + descr.erase( ifd ); b->close(); - closed_queue.erase( ev.data.fd ); - descr.erase( ifd ); } dump_descr(); } @@ -452,6 +479,7 @@ { int myfd = -1; + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; if ( !listeners_final.empty() ) { std::cerr << __FILE__ << ":" << __LINE__ << std::endl; if ( listeners_final.find( static_cast<void*>(p) ) != listeners_final.end() ) { @@ -472,7 +500,10 @@ std::cerr << __FILE__ << ":" << __LINE__ << std::endl; - p->stop(); + // if ( myfd != -1 ) { + // std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + p->stop(); + // } } } Modified: branches/complement-sockios/explore/include/sockios/sp.h =================================================================== --- branches/complement-sockios/explore/include/sockios/sp.h 2008-06-26 05:46:17 UTC (rev 1917) +++ branches/complement-sockios/explore/include/sockios/sp.h 2008-06-26 05:46:44 UTC (rev 1918) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/06/25 11:54:39 ptr> +// -*- C++ -*- Time-stamp: <08/06/25 22:10:36 yeti> /* * Copyright (c) 2008 @@ -225,17 +225,36 @@ if ( r < 0 || r != sizeof(ctl) ) { throw std::runtime_error( "can't write to pipe" ); } - -// fd_info info = { fd_info::listener, 0, &p }; -// std::tr2::lock_guard<std::tr2::mutex> lk( cll ); -// closed_queue[_fd] = info; } void exit_notify( sockbuf_t* b, sock_base::socket_type fd ) { // fd_info info = { 0, 0, 0 }; - std::tr2::lock_guard<std::tr2::mutex> lk( cll ); - closed_queue[fd] = fd_info(); + // std::tr2::lock_guard<std::tr2::mutex> lk( dll ); + + try { + std::tr2::unique_lock<std::tr2::mutex> lk( dll, std::tr2::try_to_lock ); + + if ( b->_notify_close ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + typename fd_container_type::iterator i = descr.find( fd ); + if ( i != descr.end() ) { + if ( (i->second.b == b) && (i->second.p == 0) ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_DEL, fd, 0 ) < 0 ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + // throw system_error + } + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + descr.erase( i ); + } + } + b->_notify_close = false; + } + } + catch ( const std::tr2::lock_error& ) { + std::cerr << __FILE__ << ":" << __LINE__ << " " << fd << std::endl; + } } private: @@ -271,9 +290,7 @@ const int n_ret; fd_container_type descr; - fd_container_type closed_queue; listener_container_type listeners_final; - std::tr2::mutex cll; std::tr2::mutex dll; }; Modified: branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc =================================================================== --- branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc 2008-06-26 05:46:17 UTC (rev 1917) +++ branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc 2008-06-26 05:46:44 UTC (rev 1918) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/06/17 16:35:39 yeti> +// -*- C++ -*- Time-stamp: <08/06/25 17:59:36 yeti> /* * @@ -266,7 +266,14 @@ { lock_guard<mutex> lk(lock); --cnt; } void connect( sockstream& s ) - { lock_guard<mutex> lk(lock); getline( s, line ); ++rd; line_cnd.notify_one(); } + { + lock_guard<mutex> lk(lock); + getline( s, line ); + cerr << __FILE__ << ":" << __LINE__ << " " << s.good() << " " + << s.rdbuf()->in_avail() << endl; + ++rd; + line_cnd.notify_one(); + } // void close() // { } @@ -294,7 +301,7 @@ mutex worker::lock; int worker::cnt = 0; -/* volatile */ int worker::visits = 0; +int worker::visits = 0; condition_variable worker::cnd; string worker::line; condition_variable worker::line_cnd; @@ -413,7 +420,7 @@ } unique_lock<mutex> lk( worker::lock ); - EXAM_CHECK( worker::line_cnd.timed_wait( lk, milliseconds( 500 ), worker::rd_counter1 ) ); + EXAM_CHECK( worker::line_cnd.timed_wait( lk, milliseconds( 50000 ), worker::rd_counter1 ) ); // cerr << worker::line << endl; EXAM_CHECK( worker::line == "Hello, world!" ); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |