[complement-svn] SF.net SVN: complement:[1973] trunk/complement/explore/include/sockios
Status: Pre-Alpha
Brought to you by:
complement
|
From: <com...@us...> - 2008-10-13 10:36:27
|
Revision: 1973
http://complement.svn.sourceforge.net/complement/?rev=1973&view=rev
Author: complement
Date: 2008-10-13 10:36:17 +0000 (Mon, 13 Oct 2008)
Log Message:
-----------
sock_processor_base should wait for release from sock_mgr
sock_processor_base may be accessed from sock_mgr after its death,
so reference counting was added; the sock_processor_base dtor will complete
only after sock_mgr has lost its reference to it (i.e. all clients of
sock_processor are detached and destroyed); the same reasoning applies to
passing a pointer to sock_processor_base via the pipe to sock_mgr.
sock_processor may inform sockmgr about termination via two channels:
i) via listener's descriptor (EPOLLRDHUP | EPOLLHUP | EPOLLERR, in
sockmgr<charT,traits,_Alloc>::process_listener);
ii) via command on pipe ('listener_on_exit').
Because i) may never happen (or may occur after ii)), removal of sock_processor
from the sockmgr tables is done only when processing ii), and is not done
when processing i).
Until the issues above are resolved, ~sock_processor_base() waits on the
condition with a timeout and prints a line if the timeout occurs (signalling an error);
sock_processor_base::release() checks the counter, and if it becomes
negative (i.e. an extra release() call), prints the call stack (this is an error,
because of possible access to a destroyed object).
Modified Paths:
--------------
trunk/complement/explore/include/sockios/sockmgr.cc
trunk/complement/explore/include/sockios/sockmgr.h
trunk/complement/explore/include/sockios/socksrv.cc
trunk/complement/explore/include/sockios/socksrv.h
trunk/complement/explore/include/sockios/sockstream
Modified: trunk/complement/explore/include/sockios/sockmgr.cc
===================================================================
--- trunk/complement/explore/include/sockios/sockmgr.cc 2008-10-13 10:35:31 UTC (rev 1972)
+++ trunk/complement/explore/include/sockios/sockmgr.cc 2008-10-13 10:36:17 UTC (rev 1973)
@@ -1,4 +1,4 @@
-// -*- C++ -*- Time-stamp: <08/07/01 14:40:03 yeti>
+// -*- C++ -*- Time-stamp: <08/10/07 01:12:40 ptr>
/*
* Copyright (c) 2008
@@ -48,7 +48,7 @@
} else {
typename fd_container_type::iterator ifd = descr.find( ev[i].data.fd );
if ( ifd == descr.end() ) {
- // std::cerr << __FILE__ << ":" << __LINE__ << " " << ev[i].data.fd << std::endl;
+ std::cerr << __FILE__ << ":" << __LINE__ << " " << ev[i].data.fd << std::endl;
if ( epoll_ctl( efd, EPOLL_CTL_DEL, ev[i].data.fd, 0 ) < 0 ) {
// throw system_error
}
@@ -91,17 +91,23 @@
switch ( _ctl.cmd ) {
case listener:
ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT;
+#if 1
+ ev_add.data.u64 = 0ULL;
+#endif
ev_add.data.fd = static_cast<socks_processor_t*>(_ctl.data.ptr)->fd();
if ( ev_add.data.fd >= 0 ) {
if ( fcntl( ev_add.data.fd, F_SETFL, fcntl( ev_add.data.fd, F_GETFL ) | O_NONBLOCK ) != 0 ) {
// std::cerr << "xxx " << errno << " " << std::tr2::getpid() << std::endl;
+ static_cast<socks_processor_t*>(_ctl.data.ptr)->release();
throw std::runtime_error( "can't establish nonblock mode on listener" );
}
if ( descr.find( ev_add.data.fd ) != descr.end() ) { // reuse?
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev_add.data.fd, &ev_add ) < 0 ) {
std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
// descr.erase( ev_add.data.fd );
// throw system_error
+ static_cast<socks_processor_t*>(_ctl.data.ptr)->release();
return;
}
} else {
@@ -109,6 +115,7 @@
if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) {
std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
// throw system_error
+ static_cast<socks_processor_t*>(_ctl.data.ptr)->release();
return;
}
}
@@ -117,9 +124,13 @@
break;
case tcp_buffer:
ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT;
+#if 1
+ ev_add.data.u64 = 0ULL;
+#endif
ev_add.data.fd = static_cast<sockbuf_t*>(_ctl.data.ptr)->fd();
if ( ev_add.data.fd >= 0 ) {
if ( descr.find( ev_add.data.fd ) != descr.end() ) { // reuse?
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev_add.data.fd, &ev_add ) < 0 ) {
std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
// descr.erase( ev_add.data.fd );
@@ -129,9 +140,8 @@
} else {
// std::cerr << __FILE__ << ":" << __LINE__ << " " << ev_add.data.fd << std::endl;
if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) {
- std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
- // throw system_error
- return;
+ // std::cerr << __FILE__ << ":" << __LINE__ << " " << system_error( posix_error::make_error_code( static_cast<posix_error::posix_errno>(errno) ) ).what() << std::endl;
+ return; // already closed?
}
}
descr[ev_add.data.fd] = fd_info( static_cast<sockbuf_t*>(_ctl.data.ptr) );
@@ -147,6 +157,7 @@
}
// dump_descr();
}
+ static_cast<socks_processor_t*>(_ctl.data.ptr)->release();
break;
case rqstop:
// std::cerr << "Stop request\n";
@@ -162,8 +173,9 @@
if ( ev.events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR) ) {
// std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl;
if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) {
- // throw system_error
- std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << " " << errno << std::endl;
+ // std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << " " << system_error( posix_error::make_error_code( static_cast<posix_error::posix_errno>(errno) ) ).what() << std::endl;
+
+ // already closed?
}
if ( ifd->second.p != 0 ) {
@@ -175,6 +187,7 @@
}
}
+#if 0 // do it once, after 'listener_on_exit' command via pipe
listeners_final.insert(static_cast<void *>(ifd->second.p));
// std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
@@ -188,6 +201,7 @@
if ( lfd != -1 ) {
descr.erase( lfd );
}
+#endif
// dump_descr();
@@ -245,7 +259,11 @@
errno = 0;
epoll_event xev;
xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT;
+#if 1
+ xev.data.u64 = 0ULL;
+#endif
xev.data.fd = ev.data.fd;
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) {
std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << " "
<< errno << std::endl;
@@ -262,6 +280,9 @@
try {
epoll_event ev_add;
ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT;
+#if 1
+ ev_add.data.u64 = 0ULL;
+#endif
ev_add.data.fd = fd;
if ( descr.find( fd ) != descr.end() ) { // reuse?
@@ -338,6 +359,7 @@
xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
xev.data.fd = ev.data.fd;
info.flags |= fd_info::level_triggered;
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) {
std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << " " << errno << std::endl;
}
@@ -367,7 +389,11 @@
{
epoll_event xev;
xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT;
+#if 1
+ xev.data.u64 = 0ULL;
+#endif
xev.data.fd = ev.data.fd;
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) {
std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << " " << errno << std::endl;
}
@@ -386,6 +412,7 @@
xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT;
xev.data.fd = ev.data.fd;
info.flags &= ~static_cast<unsigned>(fd_info::level_triggered);
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
if ( epoll_ctl( efd, EPOLL_CTL_MOD, ev.data.fd, &xev ) < 0 ) {
std::cerr << __FILE__ << ":" << __LINE__ << " " << ev.data.fd << " " << errno << std::endl;
}
@@ -411,6 +438,7 @@
// std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << std::endl;
if ( info.p != 0 ) {
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) {
// throw system_error
std::cerr << __FILE__ << ":" << __LINE__ << " " << ifd->first << " " << errno << std::endl;
@@ -471,6 +499,7 @@
{
int myfd = -1;
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
if ( !listeners_final.empty() ) {
// std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
if ( listeners_final.find( static_cast<void*>(p) ) != listeners_final.end() ) {
@@ -494,6 +523,7 @@
// if ( myfd != -1 ) {
// std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
p->stop();
+ p->release(); // socks_processor_t* p not under sockmgr control more
// }
}
}
Modified: trunk/complement/explore/include/sockios/sockmgr.h
===================================================================
--- trunk/complement/explore/include/sockios/sockmgr.h 2008-10-13 10:35:31 UTC (rev 1972)
+++ trunk/complement/explore/include/sockios/sockmgr.h 2008-10-13 10:36:17 UTC (rev 1973)
@@ -1,4 +1,4 @@
-// -*- C++ -*- Time-stamp: <08/09/08 23:18:28 ptr>
+// -*- C++ -*- Time-stamp: <08/10/07 01:11:58 ptr>
/*
* Copyright (c) 2008
@@ -49,6 +49,8 @@
#include <deque>
#include <functional>
+// #include <boost/shared_ptr.hpp>
+
namespace std {
template <class charT, class traits, class _Alloc> class basic_sockbuf;
@@ -128,7 +130,7 @@
{ }
unsigned flags;
- sockbuf_t* b;
+ sockbuf_t* b;
socks_processor_t* p;
};
@@ -148,6 +150,7 @@
sockmgr( int hint = 1024, int ret = 512 ) :
n_ret( ret )
{
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
efd = epoll_create( hint );
if ( efd < 0 ) {
// throw system_error( errno )
@@ -162,10 +165,14 @@
epoll_event ev_add;
ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+#if 1
+ ev_add.data.u64 = 0ULL;
+#endif
ev_add.data.fd = pipefd[0];
epoll_ctl( efd, EPOLL_CTL_ADD, pipefd[0], &ev_add );
_worker = new std::tr2::thread( _loop, this );
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
// ctl _ctl;
// _ctl.cmd = rqstart;
@@ -192,18 +199,22 @@
void push( socks_processor_t& p )
{
+ // std::cerr << __FILE__ << ":" << __LINE__ << " " << p.use_count() << std::endl;
ctl _ctl;
_ctl.cmd = listener;
_ctl.data.ptr = static_cast<void *>(&p);
+ p.addref();
int r = ::write( pipefd[1], &_ctl, sizeof(ctl) );
if ( r < 0 || r != sizeof(ctl) ) {
+ p.release();
throw std::runtime_error( "can't write to pipe" );
}
}
void push( sockbuf_t& s )
{
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
ctl _ctl;
_ctl.cmd = tcp_buffer;
_ctl.data.ptr = static_cast<void *>(&s);
@@ -221,14 +232,17 @@
_ctl.cmd = listener_on_exit;
_ctl.data.ptr = reinterpret_cast<void *>(&p);
+ p.addref();
int r = ::write( pipefd[1], &_ctl, sizeof(ctl) );
if ( r < 0 || r != sizeof(ctl) ) {
+ p.release();
throw std::runtime_error( "can't write to pipe" );
}
}
void exit_notify( sockbuf_t* b, sock_base::socket_type fd )
{
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
try {
std::tr2::unique_lock<std::tr2::mutex> lk( dll, std::tr2::defer_lock );
Modified: trunk/complement/explore/include/sockios/socksrv.cc
===================================================================
--- trunk/complement/explore/include/sockios/socksrv.cc 2008-10-13 10:35:31 UTC (rev 1972)
+++ trunk/complement/explore/include/sockios/socksrv.cc 2008-10-13 10:36:17 UTC (rev 1973)
@@ -1,4 +1,4 @@
-// -*- C++ -*- Time-stamp: <08/07/09 11:09:53 ptr>
+// -*- C++ -*- Time-stamp: <08/10/07 00:00:28 ptr>
/*
* Copyright (c) 2008
@@ -54,6 +54,7 @@
// so don't check return code from listen
::listen( basic_socket_t::_fd, SOMAXCONN );
basic_socket_t::mgr->push( *this );
+ // std::cerr << __FILE__ << ":" << __LINE__ << " " << count() << std::endl;
}
} else if ( prot == sock_base::local ) {
return;
Modified: trunk/complement/explore/include/sockios/socksrv.h
===================================================================
--- trunk/complement/explore/include/sockios/socksrv.h 2008-10-13 10:35:31 UTC (rev 1972)
+++ trunk/complement/explore/include/sockios/socksrv.h 2008-10-13 10:36:17 UTC (rev 1973)
@@ -1,4 +1,4 @@
-// -*- C++ -*- Time-stamp: <08/07/09 01:02:12 ptr>
+// -*- C++ -*- Time-stamp: <08/10/06 23:50:22 ptr>
/*
* Copyright (c) 2008
@@ -37,6 +37,10 @@
#include <functional>
#include <exception>
+// #include <boost/shared_ptr.hpp>
+
+#include <mt/callstack.h>
+
namespace std {
template <class charT, class traits, class _Alloc> class basic_sockbuf;
@@ -59,15 +63,51 @@
sock_processor_base() :
_mode( ios_base::in | ios_base::out ),
- _state( ios_base::goodbit )
+ _state( ios_base::goodbit ),
+ _chk( *this ),
+ _rcount( 0 )
{ }
- explicit sock_processor_base( int port, sock_base::stype t = sock_base::sock_stream )
+ explicit sock_processor_base( int port, sock_base::stype t = sock_base::sock_stream ) :
+ _chk( *this ),
+ _rcount( 0 )
{ sock_processor_base::open( port, t, sock_base::inet ); }
virtual ~sock_processor_base()
- { sock_processor_base::_close(); }
+ {
+ sock_processor_base::_close();
+ std::tr2::unique_lock<std::tr2::mutex> lk(_cnt_lck);
+ // _cnt_cnd.wait( lk, _chk );
+ if ( !_cnt_cnd.timed_wait( lk, std::tr2::seconds(1), _chk ) ) { // <-- debug
+ std::cerr << __FILE__ << ":" << __LINE__ << " " << _rcount << std::endl;
+ }
+ }
+
+ void addref()
+ {
+ std::tr2::lock_guard<std::tr2::mutex> lk(_cnt_lck);
+ ++_rcount;
+ }
+
+ void release()
+ {
+ std::tr2::lock_guard<std::tr2::mutex> lk(_cnt_lck);
+ if ( --_rcount == 0 ) {
+ _cnt_cnd.notify_one();
+ }
+ if ( _rcount < 0 ) { // <-- debug
+ xmt::callstack( std::cerr );
+ }
+ }
+
+ int count()
+ {
+ std::tr2::lock_guard<std::tr2::mutex> lk(_cnt_lck);
+ int tmp = _rcount;
+ return tmp;
+ }
+
void open( const in_addr& addr, int port, sock_base::stype type, sock_base::protocol prot );
void open( unsigned long addr, int port, sock_base::stype type, sock_base::protocol prot )
@@ -138,6 +178,28 @@
protected:
std::tr2::mutex _fd_lck;
+
+ class _cnt_checker
+ {
+ public:
+ _cnt_checker( sock_processor_base<charT,traits,_Alloc>& _p ) :
+ p( _p )
+ { }
+
+ private:
+ sock_processor_base<charT,traits,_Alloc>& p;
+
+ public:
+ bool operator ()() const
+ { return p._rcount == 0; }
+ };
+
+ _cnt_checker _chk;
+ std::tr2::mutex _cnt_lck;
+ std::tr2::condition_variable _cnt_cnd;
+ int _rcount;
+
+ friend class _cnt_checker;
};
typedef sock_processor_base<char,char_traits<char>,allocator<char> > sock_basic_processor;
Modified: trunk/complement/explore/include/sockios/sockstream
===================================================================
--- trunk/complement/explore/include/sockios/sockstream 2008-10-13 10:35:31 UTC (rev 1972)
+++ trunk/complement/explore/include/sockios/sockstream 2008-10-13 10:36:17 UTC (rev 1973)
@@ -1,4 +1,4 @@
-// -*- C++ -*- Time-stamp: <08/10/01 00:27:31 ptr>
+// -*- C++ -*- Time-stamp: <08/10/07 00:01:51 ptr>
/*
* Copyright (c) 1997-1999, 2002, 2003, 2005-2008
@@ -306,6 +306,7 @@
if ( direction ) {
std::tr2::lock_guard<std::tr2::mutex> lk( _init_lock );
if ( _count++ == 0 ) {
+ // std::cerr << __FILE__ << ":" << __LINE__ << std::endl;
basic_socket<charT,traits,_Alloc>::mgr = new std::detail::sockmgr<charT,traits,_Alloc>();
#ifdef __FIT_PTHREADS
if ( !_at_fork ) { // call only once
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|