[complement-svn] SF.net SVN: complement: [1447] trunk/complement/explore
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2006-12-13 18:29:22
|
Revision: 1447 http://svn.sourceforge.net/complement/?rev=1447&view=rev Author: complement Date: 2006-12-13 10:29:16 -0800 (Wed, 13 Dec 2006) Log Message: ----------- container now single owner of _Connect objects; call close() and destructor of Connect instance incapsulated into _Connect Modified Paths: -------------- trunk/complement/explore/include/sockios/sockmgr.cc trunk/complement/explore/include/sockios/sockmgr.h trunk/complement/explore/lib/sockios/ChangeLog Modified: trunk/complement/explore/include/sockios/sockmgr.cc =================================================================== --- trunk/complement/explore/include/sockios/sockmgr.cc 2006-12-13 18:25:00 UTC (rev 1446) +++ trunk/complement/explore/include/sockios/sockmgr.cc 2006-12-13 18:29:16 UTC (rev 1447) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <06/11/29 18:43:21 ptr> +// -*- C++ -*- Time-stamp: <06/12/13 18:07:06 ptr> /* * Copyright (c) 1997-1999, 2002, 2003, 2005, 2006 @@ -96,6 +96,9 @@ typename iterator_traits<typename _fd_sequence::iterator>::difference_type d; for ( _fd_sequence::iterator j = _pfd.begin() + 2; j != _pfd.end(); ++j ) { // _pfd at least 2 in size + // if ( j->fd == -1 ) { + // cerr << __FILE__ << ":" << __LINE__ << endl; + // } if ( j->revents != 0 ) { // We should distinguish closed socket from income message typename container_type::iterator i = @@ -108,23 +111,22 @@ d = j - _pfd.begin(); _pfd.erase( j ); j = _pfd.begin() + (d - 1); - for ( i = _M_c.begin(); i != _M_c.end(); ++i ) { - if ( (*i)->s->rdbuf()->fd() == -1 ) { - (*i)->s->close(); - (*i)->_proc->close(); - delete (*i)->_proc; - (*i)->_proc = 0; - } + i = _M_c.begin(); + while ( (i = find_if( i, _M_c.end(), bind2nd( _M_comp, -1 ) )) != _M_c.end() ) { + _dlock.lock(); + std::remove( _conn_pool.begin(), _conn_pool.end(), i ); + _dlock.unlock(); + _M_c.erase( i++ ); } } else if ( j->revents & POLLERR /* | POLLHUP | POLLNVAL */ ) { // poll first see closed socket d = j - _pfd.begin(); _pfd.erase( j ); j = _pfd.begin() + (d - 1); - (*i)->s->close(); - (*i)->_proc->close(); - delete (*i)->_proc; - (*i)->_proc = 0; + _dlock.lock(); + std::remove( _conn_pool.begin(), _conn_pool.end(), i ); + _dlock.unlock(); + _M_c.erase( i ); } else { // Check that other side close socket: // on Linux and (?) Solaris I see normal POLLIN event, and see error @@ -138,13 +140,13 @@ d = j - _pfd.begin(); _pfd.erase( j ); j = _pfd.begin() + (d - 1); - (*i)->s->close(); - (*i)->_proc->close(); - delete (*i)->_proc; - (*i)->_proc = 0; + _dlock.lock(); + std::remove( _conn_pool.begin(), _conn_pool.end(), i ); + _dlock.unlock(); + _M_c.erase( i ); } else { // normal data available for reading _dlock.lock(); - _conn_pool.push_back( *i ); + _conn_pool.push_back( i ); // xmt::Thread::gettime( &_tpush ); _pool_cnd.set( true ); _observer_cnd.set( true ); @@ -215,38 +217,17 @@ try { _Connect *cl_new; - typename container_type::iterator i = - find_if( _M_c.begin(), _M_c.end(), bind2nd( _M_comp, -1 ) ); - - if ( i == _M_c.end() ) { // we need new message processor - cl_new = new _Connect(); - cl_new->s = new sockstream(); - cl_new->s->open( _sd, addr.any ); - cl_new->_proc = new Connect( *cl_new->s ); - _M_c.push_back( cl_new ); - if ( cl_new->s->rdbuf()->in_avail() > 0 ) { - // this is the case when user read from sockstream - // in ctor above; push processing of this stream - MT_REENTRANT( _dlock, _1 ); - _conn_pool.push_back( cl_new ); - _pool_cnd.set( true ); - _observer_cnd.set( true ); - _in_buf = true; - } - } else { // we can reuse old - cl_new = *i; - cl_new->s->open( _sd, addr.any ); - // delete cl_new->_proc; // may be new ( cl_new->_proc ) Connect( *cl_new->s ); - cl_new->_proc = new Connect( *cl_new->s ); - if ( cl_new->s->rdbuf()->in_avail() > 0 ) { - // this is the case when user read from sockstream - // in ctor above; push processing of this stream - MT_REENTRANT( _dlock, _1 ); - _conn_pool.push_back( cl_new ); - _pool_cnd.set( true ); - _observer_cnd.set( true ); - _in_buf = true; - } + _M_c.push_back( _Connect() ); + _M_c.back().open( _sd, addr.any ); + cl_new = &_M_c.back(); + if ( cl_new->s.rdbuf()->in_avail() > 0 ) { + // this is the case when user read from sockstream + // in ctor above; push processing of this stream + MT_REENTRANT( _dlock, _1 ); + _conn_pool.push_back( --_M_c.end() ); + _pool_cnd.set( true ); + _observer_cnd.set( true ); + _in_buf = true; } pollfd newfd; @@ -276,6 +257,7 @@ template <class Connect> bool sockmgr_stream_MP<Connect>::accept_udp() { +#if 0 if ( !is_open() ) { return false; } @@ -295,8 +277,8 @@ typename container_type::iterator i = _M_c.begin(); sockbuf *b; while ( i != _M_c.end() ) { - b = (*i)->s->rdbuf(); - if ( (*i)->s->is_open() && b->stype() == sock_base::sock_dgram && + b = (*i).s.rdbuf(); + if ( (*i).s.is_open() && b->stype() == sock_base::sock_dgram && b->port() == addr.inet.sin_port && b->inet_addr() == addr.inet.sin_addr.s_addr ) { _c_lock.unlock(); @@ -305,9 +287,8 @@ ++i; } - cl = new _Connect(); - cl->s = new sockstream(); - _M_c.push_back( cl ); + _M_c.push_back( Connect() ); + cl->s->open( dup( fd() ), addr.any, sock_base::sock_dgram ); cl->_proc = new Connect( *cl->s ); _c_lock.unlock(); @@ -316,6 +297,7 @@ _c_lock.unlock(); cl = 0; } +#endif return true /* cl */; } @@ -361,57 +343,39 @@ } } catch ( ... ) { - me->_c_lock.lock(); - - for ( typename container_type::iterator i = me->_M_c.begin(); i != me->_M_c.end(); ++i ) { - if ( (*i)->s->is_open() ) { // close all not closed yet - (*i)->s->close(); - (*i)->_proc->close(); - } - delete (*i)->s; - (*i)->s = 0; - delete (*i)->_proc; - (*i)->_proc = 0; - } - ::close( me->_cfd ); - ::close( me->_pfd[1].fd ); - me->close(); - me->_c_lock.unlock(); - rtc.iword = -1; - me->_dlock.lock(); me->_follow = false; me->_pool_cnd.set( true, true ); me->_observer_cnd.set( true ); me->_dlock.unlock(); + // me->_c_lock.lock(); + ::close( me->_cfd ); + ::close( me->_pfd[1].fd ); + me->close(); + // me->_c_lock.unlock(); + rtc.iword = -1; + + me->mgr.join(); + return rtc; // throw; } - me->_c_lock.lock(); - - for ( typename container_type::iterator i = me->_M_c.begin(); i != me->_M_c.end(); ++i ) { - if ( (*i)->s->is_open() ) { // close all not closed yet - (*i)->s->close(); - (*i)->_proc->close(); - } - delete (*i)->s; - (*i)->s = 0; - delete (*i)->_proc; - (*i)->_proc = 0; - } - ::close( me->_cfd ); - ::close( me->_pfd[1].fd ); - me->close(); - me->_c_lock.unlock(); - me->_dlock.lock(); me->_follow = false; me->_pool_cnd.set( true, true ); me->_observer_cnd.set( true ); me->_dlock.unlock(); + // me->_c_lock.lock(); + ::close( me->_cfd ); + ::close( me->_pfd[1].fd ); + me->close(); + // me->_c_lock.unlock(); + + me->mgr.join(); + return rtc; } @@ -423,30 +387,27 @@ rtc.iword = 0; try { - sockstream *stream; timespec idle( me->_idle ); int idle_count = 0; me->_dlock.lock(); - _Connect *c = 0; + typename _Sequence::iterator c; + bool _non_empty = false; if ( me->_conn_pool.size() != 0 ) { c = me->_conn_pool.front(); me->_conn_pool.pop_front(); + _non_empty = true; xmt::Thread::gettime( &me->_tpop ); } me->_dlock.unlock(); do { - if ( c != 0 ) { - stream = c->s; - if ( stream->is_open() ) { - c->_proc->connect( *stream ); - if ( c->s == 0 || !stream->good() ) { - if ( c->_proc != 0 ) { - c->_proc->close(); - } - } else if ( stream->is_open() ) { - if ( stream->rdbuf()->in_avail() > 0 ) { + if (_non_empty ) { + sockstream& stream = c->s; + if ( stream.is_open() ) { + c->_proc->connect( stream ); + if ( stream.is_open() && stream.good() ) { + if ( stream.rdbuf()->in_avail() > 0 ) { // socket has buffered data, push it back to queue MT_REENTRANT( me->_dlock, _1 ); me->_conn_pool.push_back( c ); @@ -454,15 +415,15 @@ me->_pool_cnd.set( true ); // xmt::Thread::gettime( &me->_tpush ); } else { // no buffered data, return socket to poll - sock_base::socket_type rfd = stream->rdbuf()->fd(); + sock_base::socket_type rfd = stream.rdbuf()->fd(); ::write( me->_cfd, reinterpret_cast<const char *>(&rfd), sizeof(sock_base::socket_type) ); } - } else if ( c->_proc ) { - c->_proc->close(); } } } + _non_empty = false; + for ( idle_count = 0; idle_count < 2; ++idle_count ) { { MT_REENTRANT( me->_dlock, _1 ); @@ -472,6 +433,7 @@ if ( me->_conn_pool.size() != 0 ) { c = me->_conn_pool.front(); me->_conn_pool.pop_front(); + _non_empty = true; xmt::Thread::gettime( &me->_tpop ); break; } Modified: trunk/complement/explore/include/sockios/sockmgr.h =================================================================== --- trunk/complement/explore/include/sockios/sockmgr.h 2006-12-13 18:25:00 UTC (rev 1446) +++ trunk/complement/explore/include/sockios/sockmgr.h 2006-12-13 18:29:16 UTC (rev 1447) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <06/11/27 17:13:22 ptr> +// -*- C++ -*- Time-stamp: <06/12/13 17:38:44 ptr> /* * Copyright (c) 1997-1999, 2002, 2003, 2005, 2006 @@ -145,8 +145,8 @@ _busylimit.tv_nsec = 90000000; // i.e 0.09 sec _alarm.tv_sec = 0; _alarm.tv_nsec = 50000000; // i.e 0.05 sec - _idle.tv_sec = 300; - _idle.tv_nsec = 0; // i.e 5 min + _idle.tv_sec = 10; + _idle.tv_nsec = 0; // i.e 10 sec } explicit sockmgr_stream_MP( const in_addr& addr, int port, sock_base::stype t = sock_base::sock_stream ) : @@ -158,8 +158,8 @@ _busylimit.tv_nsec = 90000000; // i.e 0.09 sec _alarm.tv_sec = 0; _alarm.tv_nsec = 50000000; // i.e 0.05 sec - _idle.tv_sec = 300; - _idle.tv_nsec = 0; // i.e 5 min + _idle.tv_sec = 10; + _idle.tv_nsec = 0; // i.e 10 sec } explicit sockmgr_stream_MP( unsigned long addr, int port, sock_base::stype t = sock_base::sock_stream ) : @@ -171,8 +171,8 @@ _busylimit.tv_nsec = 90000000; // i.e 0.09 sec _alarm.tv_sec = 0; _alarm.tv_nsec = 50000000; // i.e 0.05 sec - _idle.tv_sec = 300; - _idle.tv_nsec = 0; // i.e 5 min + _idle.tv_sec = 10; + _idle.tv_nsec = 0; // i.e 10 sec } explicit sockmgr_stream_MP( int port, sock_base::stype t = sock_base::sock_stream ) : @@ -184,8 +184,8 @@ _busylimit.tv_nsec = 50000000; // i.e 0.05 sec _alarm.tv_sec = 0; _alarm.tv_nsec = 50000000; // i.e 0.05 sec - _idle.tv_sec = 300; - _idle.tv_nsec = 0; // i.e 5 min + _idle.tv_sec = 10; + _idle.tv_nsec = 0; // i.e 10 sec } ~sockmgr_stream_MP() @@ -218,13 +218,29 @@ protected: - struct _Connect { - sockstream *s; + struct _Connect + { + _Connect() : + _proc( 0 ) + { } + + _Connect( const _Connect& ) : + _proc( 0 ) + { } + + ~_Connect() + { if ( _proc ) { s.close(); _proc->close(); } delete _proc; } + + void open( sock_base::socket_type st, const sockaddr& addr ) + { s.open( st, addr ); _proc = new Connect( s ); } + + sockstream s; Connect *_proc; }; typedef std::vector<pollfd> _fd_sequence; - typedef std::deque<_Connect *> _connect_pool_sequence; + typedef std::list<_Connect> _Sequence; + typedef std::deque<typename _Sequence::iterator> _connect_pool_sequence; void _open( sock_base::stype t = sock_base::sock_stream ); static xmt::Thread::ret_code loop( void * ); @@ -232,10 +248,10 @@ static xmt::Thread::ret_code observer( void * ); struct fd_equal : - public std::binary_function<_Connect *,int,bool> + public std::binary_function<_Connect,int,bool> { - bool operator()(const _Connect *__x, int __y) const - { return __x->s->rdbuf()->fd() == __y; } + bool operator()(const _Connect& __x, int __y) const + { return __x.s.rdbuf()->fd() == __y; } }; struct pfd_equal : @@ -262,7 +278,6 @@ protected: typedef sockmgr_stream_MP<Connect> _Self_type; - typedef std::vector<_Connect *> _Sequence; typedef fd_equal _Compare; typedef typename _Sequence::value_type value_type; typedef typename _Sequence::size_type size_type; @@ -274,7 +289,7 @@ _Sequence _M_c; _Compare _M_comp; pfd_equal _pfdcomp; - xmt::Mutex _c_lock; + // xmt::Mutex _c_lock; _fd_sequence _pfd; int _cfd; // sock_base::socket_type Modified: trunk/complement/explore/lib/sockios/ChangeLog =================================================================== --- trunk/complement/explore/lib/sockios/ChangeLog 2006-12-13 18:25:00 UTC (rev 1446) +++ trunk/complement/explore/lib/sockios/ChangeLog 2006-12-13 18:29:16 UTC (rev 1447) @@ -1,3 +1,9 @@ +2006-12-13 Petr Ovtchenkov <pt...@is...> + + * sockmgr.h, sockmgr.cc: container now single owner + of _Connect objects; call close() and destructor of + Connect instance incapsulated into _Connect; + 2006-11-29 Petr Ovtchenkov <pt...@is...> * sockmgr.cc: don't delete objects via _conn_pool This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |