Thread: [complement-svn] SF.net SVN: complement: [1841] branches/complement-sockios/explore/include/ sockio
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2008-04-02 18:44:25
|
Revision: 1841 http://complement.svn.sourceforge.net/complement/?rev=1841&view=rev Author: complement Date: 2008-04-02 11:44:21 -0700 (Wed, 02 Apr 2008) Log Message: ----------- spit long function into shorter ones; under construction Modified Paths: -------------- branches/complement-sockios/explore/include/sockios/sp.h Modified: branches/complement-sockios/explore/include/sockios/sp.h =================================================================== --- branches/complement-sockios/explore/include/sockios/sp.h 2008-04-02 07:23:05 UTC (rev 1840) +++ branches/complement-sockios/explore/include/sockios/sp.h 2008-04-02 18:44:21 UTC (rev 1841) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/04/02 10:25:57 ptr> +// -*- C++ -*- Time-stamp: <08/04/02 22:22:19 yeti> /* * Copyright (c) 2008 @@ -735,13 +735,7 @@ sockmgr& operator =( const sockmgr& ) { return *this; } - void io_worker(); - int efd; - int pipefd[2]; - std::tr2::thread *_worker; - const int n_ret; - #ifdef __USE_STLPORT_HASH typedef std::hash_map<int,fd_info> fd_container_type; #endif @@ -752,6 +746,16 @@ typedef std::tr1::unordered_map<int, fd_info> fd_container_type; #endif + void io_worker(); + void cmd_from_pipe(); + void process_listener( epoll_event&, typename fd_container_type::iterator ); + void process_regular( epoll_event&, typename fd_container_type::iterator ); + + int efd; + int pipefd[2]; + std::tr2::thread *_worker; + const int n_ret; + fd_container_type descr; fd_container_type closed_queue; std::tr2::mutex cll; @@ -787,341 +791,363 @@ // std::cerr << "epoll i = " << i << std::endl; if ( ev[i].data.fd == pipefd[0] ) { // std::cerr << "on pipe\n"; - epoll_event ev_add; - ctl _ctl; - int r = read( pipefd[0], &_ctl, sizeof(ctl) ); - if ( r < 0 ) { + cmd_from_pipe(); + } else { + // std::cerr << "#\n"; + + typename fd_container_type::iterator ifd = descr.find( ev[i].data.fd ); + if ( ifd == descr.end() ) { + throw std::logic_error( "file descriptor in epoll, but not in descr[]" ); + } + + fd_info& info = ifd->second; + if ( info.flags & fd_info::listener ) { + // std::cerr << "%\n"; + process_listener( ev[i], ifd ); + } else { + // std::cerr << "not listener\n"; + process_regular( ev[i], ifd ); + } + } + } + } + } + catch ( std::exception& e ) { + std::cerr << e.what() << std::endl; + } +} + +template<class charT, class traits, class _Alloc> +void sockmgr<charT,traits,_Alloc>::cmd_from_pipe() +{ + epoll_event ev_add; + ctl _ctl; + + int r = read( pipefd[0], &_ctl, sizeof(ctl) ); + if ( r < 0 ) { + // throw system_error + // std::cerr << "Read pipe\n"; + } else if ( r == 0 ) { + std::cerr << "Read pipe 0\n"; + return; + } + + switch ( _ctl.cmd ) { + case listener: + ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + ev_add.data.fd = static_cast<socks_processor_t*>(_ctl.data.ptr)->fd(); + if ( ev_add.data.fd >= 0 ) { + if ( fcntl( ev_add.data.fd, F_SETFL, fcntl( ev_add.data.fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { + // std::cerr << "xxx " << errno << " " << std::tr2::getpid() << std::endl; + throw std::runtime_error( "can't establish nonblock mode on listener" ); + } + fd_info new_info = { fd_info::listener, 0, static_cast<socks_processor_t*>(_ctl.data.ptr) }; + descr[ev_add.data.fd] = new_info; + if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { + descr.erase( ev_add.data.fd ); // throw system_error - // std::cerr << "Read pipe\n"; - } else if ( r == 0 ) { - std::cerr << "Read pipe 0\n"; - return; } - - switch ( _ctl.cmd ) { - case listener: - ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; - ev_add.data.fd = static_cast<socks_processor_t*>(_ctl.data.ptr)->fd(); - if ( ev_add.data.fd >= 0 ) { - if ( fcntl( ev_add.data.fd, F_SETFL, fcntl( ev_add.data.fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { - // std::cerr << "xxx " << errno << " " << std::tr2::getpid() << std::endl; - throw std::runtime_error( "can't establish nonblock mode on listener" ); - } - fd_info new_info = { fd_info::listener, 0, static_cast<socks_processor_t*>(_ctl.data.ptr) }; - descr[ev_add.data.fd] = new_info; - if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { - descr.erase( ev_add.data.fd ); - // throw system_error - } - } - break; + } + break; #if 0 - case tcp_stream: - ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; - ev_add.data.fd = static_cast<sockstream_t*>(_ctl.data.ptr)->rdbuf()->fd(); - if ( ev_add.data.fd >= 0 ) { - fd_info new_info = { 0, static_cast<sockstream_t*>(_ctl.data.ptr), 0 }; - descr[ev_add.data.fd] = new_info; - if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { - descr.erase( ev_add.data.fd ); - // throw system_error - } - } - break; + case tcp_stream: + ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + ev_add.data.fd = static_cast<sockstream_t*>(_ctl.data.ptr)->rdbuf()->fd(); + if ( ev_add.data.fd >= 0 ) { + fd_info new_info = { 0, static_cast<sockstream_t*>(_ctl.data.ptr), 0 }; + descr[ev_add.data.fd] = new_info; + if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { + descr.erase( ev_add.data.fd ); + // throw system_error + } + } + break; #endif - case tcp_buffer: - ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; - ev_add.data.fd = static_cast<sockbuf_t*>(_ctl.data.ptr)->fd(); - if ( ev_add.data.fd >= 0 ) { - fd_info new_info = { fd_info::buffer, static_cast<sockstream_t* /* sockbuf_t* */ >(_ctl.data.ptr), 0 }; - descr[ev_add.data.fd] = new_info; - if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { - descr.erase( ev_add.data.fd ); - // throw system_error - } - } - break; - case rqstop: - // std::cerr << "Stop request\n"; - return; - break; + case tcp_buffer: + ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + ev_add.data.fd = static_cast<sockbuf_t*>(_ctl.data.ptr)->fd(); + if ( ev_add.data.fd >= 0 ) { + fd_info new_info = { fd_info::buffer, static_cast<sockstream_t* /* sockbuf_t* */ >(_ctl.data.ptr), 0 }; + descr[ev_add.data.fd] = new_info; + if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { + descr.erase( ev_add.data.fd ); + // throw system_error } - - continue; } - // std::cerr << "#\n"; + break; + case rqstop: + // std::cerr << "Stop request\n"; + return; + break; + } +} - typename fd_container_type::iterator ifd = descr.find( ev[i].data.fd ); - if ( ifd == descr.end() ) { - throw std::logic_error( "file descriptor in epoll, but not in descr[]" ); - } +template<class charT, class traits, class _Alloc> +void sockmgr<charT,traits,_Alloc>::process_listener( epoll_event& ev, typename sockmgr<charT,traits,_Alloc>::fd_container_type::iterator ifd ) +{ + if ( ev.events & EPOLLRDHUP ) { + epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ); + // walk through descr and detach every .p ? + descr.erase( ifd ); + std::cerr << "Remove listener EPOLLRDHUP\n"; + } else if ( ev.events & EPOLLIN ) { + sockaddr addr; + socklen_t sz = sizeof( sockaddr_in ); - fd_info& info = ifd->second; - if ( info.flags & fd_info::listener ) { - // std::cerr << "%\n"; - if ( ev[i].events & EPOLLRDHUP ) { - epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ); - // walk through descr and detach every .p ? - descr.erase( ifd ); - std::cerr << "Remove listener EPOLLRDHUP\n"; - } else if ( ev[i].events & EPOLLIN ) { - sockaddr addr; - socklen_t sz = sizeof( sockaddr_in ); + fd_info& info = ifd->second; - for ( ; ; ) { - int fd = accept( ev[i].data.fd, &addr, &sz ); - if ( fd < 0 ) { - std::cerr << "Accept, listener # " << ev[i].data.fd << ", errno " << errno << std::endl; - if ( (errno == EINTR) || (errno == ECONNABORTED) /* || (errno == ERESTARTSYS) */ ) { - continue; - } - if ( !(errno == EAGAIN || errno == EWOULDBLOCK) ) { - // std::cerr << "Accept, listener " << ev[i].data.fd << ", errno " << errno << std::endl; - // throw system_error ? - } + for ( ; ; ) { + int fd = accept( ev.data.fd, &addr, &sz ); + if ( fd < 0 ) { + std::cerr << "Accept, listener # " << ev.data.fd << ", errno " << errno << std::endl; + if ( (errno == EINTR) || (errno == ECONNABORTED) /* || (errno == ERESTARTSYS) */ ) { + continue; + } + if ( !(errno == EAGAIN || errno == EWOULDBLOCK) ) { + // std::cerr << "Accept, listener " << ev[i].data.fd << ", errno " << errno << std::endl; + // throw system_error ? + } #if 0 - { - std::tr2::lock_guard<std::tr2::mutex> lck( cll ); - typename fd_container_type::iterator closed_ifd = closed_queue.find( ev[i].data.fd ); - if ( closed_ifd != closed_queue.end() ) { - typename fd_container_type::iterator ifd = descr.begin(); - for ( ; ifd != descr.end(); ) { - if ( ifd->second.p == closed_ifd->second.p ) { - descr.erase( ifd++ ); - } else { - ++ifd - } - } - closed_queue.erase( closed_ifd ); + { + std::tr2::lock_guard<std::tr2::mutex> lck( cll ); + typename fd_container_type::iterator closed_ifd = closed_queue.find( ev.data.fd ); + if ( closed_ifd != closed_queue.end() ) { + typename fd_container_type::iterator ifd = descr.begin(); + for ( ; ifd != descr.end(); ) { + if ( ifd->second.p == closed_ifd->second.p ) { + descr.erase( ifd++ ); + } else { + ++ifd; } } + closed_queue.erase( closed_ifd ); + } + } +#endif + break; + } + // std::cerr << "listener accept " << fd << std::endl; + if ( fcntl( fd, F_SETFL, fcntl( fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { + throw std::runtime_error( "can't establish nonblock mode" ); + } + sockstream_t* s; + + try { + s = new sockstream_t(); + if ( s->rdbuf()->_open_sockmgr( fd, addr ) ) { + epoll_event ev_add; + ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + ev_add.data.fd = fd; + fd_info new_info = { fd_info::owner, s, info.p }; + descr[fd] = new_info; -#endif + if ( epoll_ctl( efd, EPOLL_CTL_ADD, fd, &ev_add ) < 0 ) { + std::cerr << "Accept, add " << fd << ", errno " << errno << std::endl; + descr.erase( fd ); + // throw system_error + } + std::cerr << "adopt_new_t()\n"; + std::tr2::lock_guard<std::tr2::mutex> lk( cll ); + typename fd_container_type::iterator closed_ifd = closed_queue.begin(); + for ( ; closed_ifd != closed_queue.end(); ++closed_ifd ) { + if ( closed_ifd->second.p == info.p ) { break; } - // std::cerr << "listener accept " << fd << std::endl; - if ( fcntl( fd, F_SETFL, fcntl( fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { - throw std::runtime_error( "can't establish nonblock mode" ); - } - sockstream_t* s; + } + if ( closed_ifd == closed_queue.end() ) { + (*info.p)( *s, typename socks_processor_t::adopt_new_t() ); + } else { + std::cerr << "@@@ 1\n" << std::endl; + } + } else { + std::cerr << "Accept, delete " << fd << std::endl; + delete s; + } + } + catch ( const std::bad_alloc& ) { + // nothing + } + catch ( ... ) { + descr.erase( fd ); + delete s; + } + } + } else { + // std::cerr << "listener: " << std::hex << ev.events << std::dec << std::endl; + } +} - try { - s = new sockstream_t(); - if ( s->rdbuf()->_open_sockmgr( fd, addr ) ) { - epoll_event ev_add; - ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; - ev_add.data.fd = fd; - fd_info new_info = { fd_info::owner, s, info.p }; - descr[fd] = new_info; +template<class charT, class traits, class _Alloc> +void sockmgr<charT,traits,_Alloc>::process_regular( epoll_event& ev, typename sockmgr<charT,traits,_Alloc>::fd_container_type::iterator ifd ) +{ + fd_info& info = ifd->second; - if ( epoll_ctl( efd, EPOLL_CTL_ADD, fd, &ev_add ) < 0 ) { - std::cerr << "Accept, add " << fd << ", errno " << errno << std::endl; - descr.erase( fd ); - // throw system_error - } - std::cerr << "adopt_new_t()\n"; - std::tr2::lock_guard<std::tr2::mutex> lk( cll ); - typename fd_container_type::iterator closed_ifd = closed_queue.begin(); - for ( ; closed_ifd != closed_queue.end(); ++closed_ifd ) { - if ( closed_ifd->second.p == info.p ) { - break; - } - } - if ( closed_ifd == closed_queue.end() ) { - (*info.p)( *s, typename socks_processor_t::adopt_new_t() ); - } else { - std::cerr << "@@@ 1\n" << std::endl; - } - } else { - std::cerr << "Accept, delete " << fd << std::endl; - delete s; - } - } - catch ( const std::bad_alloc& ) { - // nothing - } - catch ( ... ) { - descr.erase( fd ); - delete s; - } + if ( ev.events & EPOLLIN ) { + if ( (info.flags & fd_info::owner) == 0 ) { + // marginal case: me not owner (registerd via push(), + // when I owner, I know destroy point), + // already closed, but I not see closed event yet; + // object may be deleted already, so I can't + // call b->egptr() etc. here + std::tr2::lock_guard<std::tr2::mutex> lck( cll ); + typename fd_container_type::iterator closed_ifd = closed_queue.find( ev.data.fd ); + if ( closed_ifd != closed_queue.end() ) { + closed_queue.erase( closed_ifd ); + if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { + // throw system_error + } + descr.erase( ifd ); + return; + } + } + sockbuf_t* b = (info.flags & fd_info::buffer != 0) ? info.s.b : info.s.s->rdbuf(); + errno = 0; + for ( ; ; ) { + if ( b->_ebuf == b->egptr() ) { + // process extract data from buffer too slow for us! + if ( (info.flags & fd_info::level_triggered) == 0 ) { + epoll_event xev; + xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP; + xev.data.fd = ev.data.fd; + info.flags |= fd_info::level_triggered; + if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) { + std::cerr << "X " << ev.data.fd << ", " << errno << std::endl; } - } else { - // std::cerr << "listener: " << std::hex << ev[i].events << std::dec << std::endl; } - } else { - // std::cerr << "not listener\n"; - if ( ev[i].events & EPOLLIN ) { - if ( (info.flags & fd_info::owner) == 0 ) { - // marginal case: me not owner (registerd via push(), - // when I owner, I know destroy point), - // already closed, but I not see closed event yet; - // object may be deleted already, so I can't - // call b->egptr() etc. here - std::tr2::lock_guard<std::tr2::mutex> lck( cll ); - typename fd_container_type::iterator closed_ifd = closed_queue.find( ev[i].data.fd ); - if ( closed_ifd != closed_queue.end() ) { - closed_queue.erase( closed_ifd ); - if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { - // throw system_error - } - descr.erase( ifd ); - continue; + std::cerr << "Z " << ev.data.fd << ", " << errno << std::endl; + if ( info.p != 0 ) { + std::tr2::lock_guard<std::tr2::mutex> lk( cll ); + typename fd_container_type::iterator closed_ifd = closed_queue.begin(); + for ( ; closed_ifd != closed_queue.end(); ++closed_ifd ) { + if ( closed_ifd->second.p == info.p ) { + break; } } - sockbuf_t* b = (info.flags & fd_info::buffer != 0) ? info.s.b : info.s.s->rdbuf(); + if ( closed_ifd == closed_queue.end() ) { + (*info.p)( *info.s.s, typename socks_processor_t::adopt_data_t() ); + } else { + std::cerr << "@@@ 2\n" << std::endl; + } + } + break; + } + std::cerr << "ptr " << (void *)b->egptr() << ", " << errno << std::endl; + long offset = read( ev.data.fd, b->egptr(), sizeof(charT) * (b->_ebuf - b->egptr()) ); + std::cerr << "offset " << offset << ", " << errno << std::endl; + if ( offset < 0 ) { + if ( (errno == EAGAIN) || (errno == EINTR) ) { errno = 0; - for ( ; ; ) { - if ( b->_ebuf == b->egptr() ) { - // process extract data from buffer too slow for us! - if ( (info.flags & fd_info::level_triggered) == 0 ) { - epoll_event xev; - xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP; - xev.data.fd = ev[i].data.fd; - info.flags |= fd_info::level_triggered; - if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev[i].data.fd, &xev ) < 0 ) { - std::cerr << "X " << ev[i].data.fd << ", " << errno << std::endl; - } - } - std::cerr << "Z " << ev[i].data.fd << ", " << errno << std::endl; - if ( info.p != 0 ) { - std::tr2::lock_guard<std::tr2::mutex> lk( cll ); - typename fd_container_type::iterator closed_ifd = closed_queue.begin(); - for ( ; closed_ifd != closed_queue.end(); ++closed_ifd ) { - if ( closed_ifd->second.p == info.p ) { - break; - } - } - if ( closed_ifd == closed_queue.end() ) { - (*info.p)( *info.s.s, typename socks_processor_t::adopt_data_t() ); - } else { - std::cerr << "@@@ 2\n" << std::endl; - } - } + epoll_event xev; + xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + xev.data.fd = ev.data.fd; + epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ); + break; + } else { + switch ( errno ) { + // case EINTR: // read was interrupted + // continue; + // break; + case EFAULT: // Bad address + case ECONNRESET: // Connection reset by peer + ev.events |= EPOLLRDHUP; // will be processed below break; - } - std::cerr << "ptr " << (void *)b->egptr() << ", " << errno << std::endl; - long offset = read( ev[i].data.fd, b->egptr(), sizeof(charT) * (b->_ebuf - b->egptr()) ); - std::cerr << "offset " << offset << ", " << errno << std::endl; - if ( offset < 0 ) { - if ( (errno == EAGAIN) || (errno == EINTR) ) { - errno = 0; - epoll_event xev; - xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; - xev.data.fd = ev[i].data.fd; - epoll_ctl( efd, EPOLL_CTL_MOD, ev[i].data.fd, &xev ); - break; - } else { - switch ( errno ) { - // case EINTR: // read was interrupted - // continue; - // break; - case EFAULT: // Bad address - case ECONNRESET: // Connection reset by peer - ev[i].events |= EPOLLRDHUP; // will be processed below - break; - default: - std::cerr << "not listener, other " << ev[i].data.fd << std::hex << ev[i].events << std::dec << " : " << errno << std::endl; - break; - } - break; - } - } else if ( offset > 0 ) { - offset /= sizeof(charT); // if offset % sizeof(charT) != 0, rest will be lost! - - if ( info.flags & fd_info::level_triggered ) { - epoll_event xev; - xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; - xev.data.fd = ev[i].data.fd; - info.flags &= ~static_cast<unsigned>(fd_info::level_triggered); - if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev[i].data.fd, &xev ) < 0 ) { - std::cerr << "Y " << ev[i].data.fd << ", " << errno << std::endl; - } - } - std::tr2::lock_guard<std::tr2::mutex> lk( b->ulck ); - b->setg( b->eback(), b->gptr(), b->egptr() + offset ); - b->ucnd.notify_one(); - if ( info.p != 0 ) { - // std::cerr << "data here" << std::endl; - std::tr2::lock_guard<std::tr2::mutex> lk( cll ); - typename fd_container_type::iterator closed_ifd = closed_queue.begin(); - for ( ; closed_ifd != closed_queue.end(); ++closed_ifd ) { - if ( closed_ifd->second.p == info.p ) { - break; - } - } - if ( closed_ifd == closed_queue.end() ) { - (*info.p)( *info.s.s, typename socks_processor_t::adopt_data_t() ); - } else { - std::cerr << "@@@ 3\n" << std::endl; - } - } - } else { - std::cerr << "K " << ev[i].data.fd << ", " << errno << std::endl; - // EPOLLRDHUP may be missed in kernel, but offset 0 is the same - ev[i].events |= EPOLLRDHUP; // will be processed below + default: + std::cerr << "not listener, other " << ev.data.fd << std::hex << ev.events << std::dec << " : " << errno << std::endl; break; - } } - } else { - std::cerr << "Q\n"; + break; } - if ( (ev[i].events & EPOLLRDHUP) || (ev[i].events & EPOLLHUP) || (ev[i].events & EPOLLERR) ) { - // std::cerr << "Poll EPOLLRDHUP " << ev[i].data.fd << ", " << errno << std::endl; - if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { - // throw system_error + } else if ( offset > 0 ) { + offset /= sizeof(charT); // if offset % sizeof(charT) != 0, rest will be lost! + + if ( info.flags & fd_info::level_triggered ) { + epoll_event xev; + xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + xev.data.fd = ev.data.fd; + info.flags &= ~static_cast<unsigned>(fd_info::level_triggered); + if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) { + std::cerr << "Y " << ev.data.fd << ", " << errno << std::endl; } - if ( info.p != 0 ) { - std::tr2::lock_guard<std::tr2::mutex> lk( cll ); - typename fd_container_type::iterator closed_ifd = closed_queue.begin(); - for ( ; closed_ifd != closed_queue.end(); ++closed_ifd ) { - if ( closed_ifd->second.p == info.p ) { - break; - } + } + std::tr2::lock_guard<std::tr2::mutex> lk( b->ulck ); + b->setg( b->eback(), b->gptr(), b->egptr() + offset ); + b->ucnd.notify_one(); + if ( info.p != 0 ) { + // std::cerr << "data here" << std::endl; + std::tr2::lock_guard<std::tr2::mutex> lk( cll ); + typename fd_container_type::iterator closed_ifd = closed_queue.begin(); + for ( ; closed_ifd != closed_queue.end(); ++closed_ifd ) { + if ( closed_ifd->second.p == info.p ) { + break; } - if ( closed_ifd == closed_queue.end() ) { - (*info.p)( *info.s.s, typename socks_processor_t::adopt_close_t() ); - } else { - std::cerr << "@@@ 4\n" << std::endl; - } } - if ( (info.flags & fd_info::owner) != 0 ) { - delete info.s.s; + if ( closed_ifd == closed_queue.end() ) { + (*info.p)( *info.s.s, typename socks_processor_t::adopt_data_t() ); } else { - if ( (info.flags & fd_info::buffer) != 0 ) { - info.s.b->close(); - } else { - info.s.s->close(); - } - std::tr2::lock_guard<std::tr2::mutex> lck( cll ); - closed_queue.erase( ev[i].data.fd ); + std::cerr << "@@@ 3\n" << std::endl; } - descr.erase( ifd ); } - // if ( ev[i].events & EPOLLHUP ) { - // std::cerr << "Poll HUP" << std::endl; - // } - // if ( ev[i].events & EPOLLERR ) { - // std::cerr << "Poll ERR" << std::endl; - // } - if ( ev[i].events & EPOLLPRI ) { - std::cerr << "Poll PRI" << std::endl; + } else { + std::cerr << "K " << ev.data.fd << ", " << errno << std::endl; + // EPOLLRDHUP may be missed in kernel, but offset 0 is the same + ev.events |= EPOLLRDHUP; // will be processed below + break; + } + } + } else { + std::cerr << "Q\n"; + } + + if ( (ev.events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR) ) != 0 ) { + // std::cerr << "Poll EPOLLRDHUP " << ev.data.fd << ", " << errno << std::endl; + if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { + // throw system_error + } + if ( info.p != 0 ) { + std::tr2::lock_guard<std::tr2::mutex> lk( cll ); + typename fd_container_type::iterator closed_ifd = closed_queue.begin(); + for ( ; closed_ifd != closed_queue.end(); ++closed_ifd ) { + if ( closed_ifd->second.p == info.p ) { + break; } - if ( ev[i].events & EPOLLRDNORM ) { - std::cerr << "Poll RDNORM" << std::endl; - } - if ( ev[i].events & EPOLLRDBAND ) { - std::cerr << "Poll RDBAND" << std::endl; - } - if ( ev[i].events & EPOLLMSG ) { - std::cerr << "Poll MSG" << std::endl; - } } + if ( closed_ifd == closed_queue.end() ) { + (*info.p)( *info.s.s, typename socks_processor_t::adopt_close_t() ); + } else { + std::cerr << "@@@ 4\n" << std::endl; + } } + if ( (info.flags & fd_info::owner) != 0 ) { + delete info.s.s; + } else { + if ( (info.flags & fd_info::buffer) != 0 ) { + info.s.b->close(); + } else { + info.s.s->close(); + } + std::tr2::lock_guard<std::tr2::mutex> lck( cll ); + closed_queue.erase( ev.data.fd ); + } + descr.erase( ifd ); } + // if ( ev.events & EPOLLHUP ) { + // std::cerr << "Poll HUP" << std::endl; + // } + // if ( ev.events & EPOLLERR ) { + // std::cerr << "Poll ERR" << std::endl; + // } + if ( ev.events & EPOLLPRI ) { + std::cerr << "Poll PRI" << std::endl; } - catch ( std::exception& e ) { - std::cerr << e.what() << std::endl; + if ( ev.events & EPOLLRDNORM ) { + std::cerr << "Poll RDNORM" << std::endl; } + if ( ev.events & EPOLLRDBAND ) { + std::cerr << "Poll RDBAND" << std::endl; + } + if ( ev.events & EPOLLMSG ) { + std::cerr << "Poll MSG" << std::endl; + } } } //detail This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <com...@us...> - 2008-04-03 06:12:53
|
Revision: 1842 http://complement.svn.sourceforge.net/complement/?rev=1842&view=rev Author: complement Date: 2008-04-02 23:12:50 -0700 (Wed, 02 Apr 2008) Log Message: ----------- throw from loop, if stop cmd detected; under construction Modified Paths: -------------- branches/complement-sockios/explore/include/sockios/sp.h Modified: branches/complement-sockios/explore/include/sockios/sp.h =================================================================== --- branches/complement-sockios/explore/include/sockios/sp.h 2008-04-02 18:44:21 UTC (rev 1841) +++ branches/complement-sockios/explore/include/sockios/sp.h 2008-04-03 06:12:50 UTC (rev 1842) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/04/02 22:22:19 yeti> +// -*- C++ -*- Time-stamp: <08/04/03 01:05:05 ptr> /* * Copyright (c) 2008 @@ -778,40 +778,40 @@ */ try { - for ( ; ; ) { - int n = epoll_wait( efd, &ev[0], /* n_ret */ 512, -1 ); - if ( n < 0 ) { - if ( errno == EINTR ) { - continue; + for ( ; ; ) { + int n = epoll_wait( efd, &ev[0], /* n_ret */ 512, -1 ); + if ( n < 0 ) { + if ( errno == EINTR ) { + continue; + } + // throw system_error } - // throw system_error - } - // std::cerr << "epoll see " << n << std::endl; - for ( int i = 0; i < n; ++i ) { - // std::cerr << "epoll i = " << i << std::endl; - if ( ev[i].data.fd == pipefd[0] ) { - // std::cerr << "on pipe\n"; - cmd_from_pipe(); - } else { - // std::cerr << "#\n"; + // std::cerr << "epoll see " << n << std::endl; + for ( int i = 0; i < n; ++i ) { + // std::cerr << "epoll i = " << i << std::endl; + if ( ev[i].data.fd == pipefd[0] ) { + // std::cerr << "on pipe\n"; + cmd_from_pipe(); + } else { + // std::cerr << "#\n"; - typename fd_container_type::iterator ifd = descr.find( ev[i].data.fd ); - if ( ifd == descr.end() ) { - throw std::logic_error( "file descriptor in epoll, but not in descr[]" ); - } + typename fd_container_type::iterator ifd = descr.find( ev[i].data.fd ); + if ( ifd == descr.end() ) { + throw std::logic_error( "file descriptor in epoll, but not in descr[]" ); + } - fd_info& info = ifd->second; - if ( info.flags & fd_info::listener ) { - // std::cerr << "%\n"; - process_listener( ev[i], ifd ); - } else { - // std::cerr << "not listener\n"; - process_regular( ev[i], ifd ); + fd_info& info = ifd->second; + if ( info.flags & fd_info::listener ) { + // std::cerr << "%\n"; + process_listener( ev[i], ifd ); + } else { + // std::cerr << "not listener\n"; + process_regular( ev[i], ifd ); + } } } } } - } catch ( std::exception& e ) { std::cerr << e.what() << std::endl; } @@ -828,8 +828,8 @@ // throw system_error // std::cerr << "Read pipe\n"; } else if ( r == 0 ) { - std::cerr << "Read pipe 0\n"; - return; + // std::cerr << "Read pipe 0\n"; + throw runtime_error( "Read pipe return 0" ); } switch ( _ctl.cmd ) { @@ -877,7 +877,7 @@ break; case rqstop: // std::cerr << "Stop request\n"; - return; + throw runtime_error( "Stop request (normal flow)" ); break; } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |