[complement-svn] SF.net SVN: complement: [1902] branches/complement-sockios/explore
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2008-06-26 05:40:33
|
Revision: 1902 http://complement.svn.sourceforge.net/complement/?rev=1902&view=rev Author: complement Date: 2008-06-25 22:40:30 -0700 (Wed, 25 Jun 2008) Log Message: ----------- Sockets service with central control for incoming data. server's part moved into socksrv.* files. sockmgr (control incoming data for _all_ sockets) delegate sockstream creation to server part (derived from sock_processor_base), but require pointer to sockbuf from it; destruction of streams and Connect objects delegated to server part (derived from sock_processor_base); Current unit tests pass. Modified Paths: -------------- branches/complement-sockios/explore/include/sockios/sockstream branches/complement-sockios/explore/include/sockios/sockstream.cc branches/complement-sockios/explore/include/sockios/sp.cc branches/complement-sockios/explore/include/sockios/sp.h branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc branches/complement-sockios/explore/lib/sockios/ut/sockios_test_suite.cc Added Paths: ----------- branches/complement-sockios/explore/include/sockios/socksrv.cc branches/complement-sockios/explore/include/sockios/socksrv.h Added: branches/complement-sockios/explore/include/sockios/socksrv.cc =================================================================== --- branches/complement-sockios/explore/include/sockios/socksrv.cc (rev 0) +++ branches/complement-sockios/explore/include/sockios/socksrv.cc 2008-06-26 05:40:30 UTC (rev 1902) @@ -0,0 +1,316 @@ +// -*- C++ -*- Time-stamp: <08/06/11 21:41:27 yeti> + +/* + * Copyright (c) 2008 + * Petr Ovtchenkov + * + * Licensed under the Academic Free License Version 3.0 + * + */ + +namespace std { + +template<class charT, class traits, class _Alloc> +void sock_processor_base<charT,traits,_Alloc>::open( const in_addr& addr, int port, sock_base::stype type, sock_base::protocol prot ) +{ + std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); + if ( basic_socket_t::is_open_unsafe() ) { + return; + } + _mode = ios_base::in | 
ios_base::out; + _state = ios_base::goodbit; +#ifdef WIN32 + ::WSASetLastError( 0 ); +#endif + if ( prot == sock_base::inet ) { + basic_socket_t::_fd = socket( PF_INET, type, 0 ); + if ( basic_socket_t::_fd == -1 ) { + _state |= ios_base::failbit | ios_base::badbit; + return; + } + // _open = true; + basic_socket_t::_address.inet.sin_family = AF_INET; + basic_socket_t::_address.inet.sin_port = htons( port ); + basic_socket_t::_address.inet.sin_addr.s_addr = addr.s_addr; + + if ( type == sock_base::sock_stream || type == sock_base::sock_seqpacket ) { + // let's try reuse local address + setoptions_unsafe( sock_base::so_reuseaddr, true ); + } + + if ( ::bind( basic_socket_t::_fd, &basic_socket_t::_address.any, sizeof(basic_socket_t::_address) ) == -1 ) { + _state |= ios_base::failbit; +#ifdef WIN32 + ::closesocket( basic_socket_t::_fd ); +#else + ::close( basic_socket_t::_fd ); +#endif + basic_socket_t::_fd = -1; + return; + } + + if ( type == sock_base::sock_stream || type == sock_base::sock_seqpacket ) { + // I am shure, this is socket of type SOCK_STREAM | SOCK_SEQPACKET, + // so don't check return code from listen + ::listen( basic_socket_t::_fd, SOMAXCONN ); + basic_socket_t::mgr->push( *this ); + } + } else if ( prot == sock_base::local ) { + return; + } else { + return; + } + _state = ios_base::goodbit; + + return; +} + +template<class charT, class traits, class _Alloc> +void sock_processor_base<charT,traits,_Alloc>::close() +{ + std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); + if ( !basic_socket_t::is_open_unsafe() ) { + return; + } + basic_socket<charT,traits,_Alloc>::mgr->pop( *this, basic_socket_t::_fd ); +#ifdef WIN32 + ::closesocket( basic_socket_t::_fd ); +#else + ::shutdown( basic_socket_t::_fd, 2 ); + ::close( basic_socket_t::_fd ); +#endif + basic_socket_t::_fd = -1; +} + +template<class charT, class traits, class _Alloc> +void sock_processor_base<charT,traits,_Alloc>::shutdown( sock_base::shutdownflg dir ) +{ + 
std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); + if ( basic_socket_t::is_open_unsafe() ) { + if ( (dir & (sock_base::stop_in | sock_base::stop_out)) == + (sock_base::stop_in | sock_base::stop_out) ) { + ::shutdown( basic_socket_t::_fd, 2 ); + } else if ( dir & sock_base::stop_in ) { + ::shutdown( basic_socket_t::_fd, 0 ); + } else if ( dir & sock_base::stop_out ) { + ::shutdown( basic_socket_t::_fd, 1 ); + } + } +} + +template<class charT, class traits, class _Alloc> +void sock_processor_base<charT,traits,_Alloc>::setoptions_unsafe( sock_base::so_t optname, bool on_off, int __v ) +{ +#ifdef __unix + if ( basic_socket_t::is_open_unsafe() ) { + if ( optname != sock_base::so_linger ) { + int turn = on_off ? 1 : 0; + if ( setsockopt( basic_socket_t::_fd, SOL_SOCKET, (int)optname, (const void *)&turn, + (socklen_t)sizeof(int) ) != 0 ) { + _state |= ios_base::failbit; + } + } else { + linger l; + l.l_onoff = on_off ? 1 : 0; + l.l_linger = __v; + if ( setsockopt( basic_socket_t::_fd, SOL_SOCKET, (int)optname, (const void *)&l, + (socklen_t)sizeof(linger) ) != 0 ) { + _state |= ios_base::failbit; + } + + } + } else { + _state |= ios_base::failbit; + } +#endif // __unix +} + + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +int connect_processor<Connect, charT, traits, _Alloc, C>::Init::_count = 0; + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +bool connect_processor<Connect, charT, traits, _Alloc, C>::Init::_at_fork = false; + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +std::tr2::mutex connect_processor<Connect, charT, traits, _Alloc, C>::Init::_init_lock; + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +void 
connect_processor<Connect, charT, traits, _Alloc, C>::Init::_guard( int direction ) +{ + if ( direction ) { + std::tr2::lock_guard<std::tr2::mutex> lk( _init_lock ); + if ( _count++ == 0 ) { +#ifdef __FIT_PTHREADS + if ( !_at_fork ) { // call only once + if ( pthread_atfork( __at_fork_prepare, __at_fork_parent, __at_fork_child ) ) { + // throw system_error + } + _at_fork = true; + } +#endif +// _sock_processor_base::_idx = std::tr2::this_thread::xalloc(); + } + } else { + std::tr2::lock_guard<std::tr2::mutex> lk( _init_lock ); + if ( --_count == 0 ) { + + } + } +} + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::Init::__at_fork_prepare() +{ _init_lock.lock(); } + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::Init::__at_fork_child() +{ + _init_lock.unlock(); + + if ( _count != 0 ) { + // std::cerr << "SHOULD NEVER HAPPEN!!!!\n"; + throw std::logic_error( "Fork while connect_processor working may has unexpected behaviour in child process" ); + } + // _sock_processor_base::_idx = std::tr2::this_thread::xalloc(); +} + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::Init::__at_fork_parent() +{ _init_lock.unlock(); } + +template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +char connect_processor<Connect, charT, traits, _Alloc, C>::Init_buf[128]; + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::close() +{ + 
base_t::close(); + + { + std::tr2::lock_guard<std::tr2::mutex> lk(inwlock); + _in_work = false; // <--- set before cnd.notify_one(); (below in this func) + } + + std::tr2::lock_guard<std::tr2::mutex> lk2( rdlock ); + ready_pool.push_back( processor() ); // make ready_pool not empty + // std::cerr << "=== " << ready_pool.size() << std::endl; + cnd.notify_one(); +} + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +typename connect_processor<Connect, charT, traits, _Alloc, C>::base_t::sockbuf_t* connect_processor<Connect, charT, traits, _Alloc, C>::operator ()( sock_base::socket_type fd, const sockaddr& addr ) +{ + typename base_t::sockstream_t* s = base_t::create_stream( fd, addr ); + + Connect* c = new Connect( *s ); // bad point! I can't read from s in ctor indeed! + + if ( s->rdbuf()->in_avail() ) { + std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); + ready_pool.push_back( processor( c, s ) ); + cnd.notify_one(); + } else { + std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); + worker_pool.insert( std::make_pair( fd, processor( c, s ) ) ); + } + + return s->rdbuf(); +} + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::operator ()( sock_base::socket_type fd, const typename connect_processor<Connect, charT, traits, _Alloc, C>::base_t::adopt_close_t& ) +{ + { + std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); + typename worker_pool_t::iterator i = worker_pool.find( fd ); + if ( i != worker_pool.end() ) { + delete i->second.c; + delete i->second.s; + // std::cerr << "oops\n"; + worker_pool.erase( i ); + return; + } + } + + processor p; + { + std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); + typename ready_pool_t::iterator j = std::find( ready_pool.begin(), ready_pool.end(), /* std::bind2nd( typename processor::equal_to(), &s ) 
*/ fd ); + if ( j != ready_pool.end() ) { + // std::cerr << "oops 2\n"; + p = *j; + ready_pool.erase( j ); + } + } + if ( p.c != 0 ) { + // (*p.c)( *p.s ); + delete p.c; + delete p.s; + } +} + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::operator ()( sock_base::socket_type fd ) +{ + processor p; + + { + std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); + typename worker_pool_t::const_iterator i = worker_pool.find( fd ); + if ( i == worker_pool.end() ) { + return; + } + p = i->second; + worker_pool.erase( i ); + } + + std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); + ready_pool.push_back( p ); + cnd.notify_one(); + // std::cerr << "notify data " << (void *)c << " " << ready_pool.size() << std::endl; +} + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +bool connect_processor<Connect, charT, traits, _Alloc, C>::pop_ready( processor& p ) +{ + { + std::tr2::unique_lock<std::tr2::mutex> lk( rdlock ); + + cnd.wait( lk, not_empty ); + p = ready_pool.front(); // it may contain p.c == 0, p.s == 0, if !in_work() + ready_pool.pop_front(); + // std::cerr << "pop 1\n"; + if ( p.c == 0 ) { // wake up, but _in_work may be still true here (in processor pipe?), + return false; // even I know that _in_work <- false before notification... + } // so, check twice + } + + // std::cerr << "pop 2\n"; + + std::tr2::lock_guard<std::tr2::mutex> lk(inwlock); + return _in_work ? 
true : false; +} + +template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> +void connect_processor<Connect, charT, traits, _Alloc, C>::worker() +{ + _in_work = true; + + processor p; + + while ( pop_ready( p ) ) { + // std::cerr << "worker 1\n"; + (p.c->*C)( *p.s ); + // std::cerr << "worker 2\n"; + if ( p.s->rdbuf()->in_avail() ) { + std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); + ready_pool.push_back( p ); + } else { + std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); + worker_pool[p.s->rdbuf()->fd()] = p; + } + // std::cerr << "worker 3\n"; + } +} + +} // namespace std Added: branches/complement-sockios/explore/include/sockios/socksrv.h =================================================================== --- branches/complement-sockios/explore/include/sockios/socksrv.h (rev 0) +++ branches/complement-sockios/explore/include/sockios/socksrv.h 2008-06-26 05:40:30 UTC (rev 1902) @@ -0,0 +1,317 @@ +// -*- C++ -*- Time-stamp: <08/06/11 21:40:56 yeti> + +/* + * Copyright (c) 2008 + * Petr Ovtchenkov + * + * Licensed under the Academic Free License Version 3.0 + * + */ + +#ifndef __SOCKIOS_SOCKSRV_H +#define __SOCKIOS_SOCKSRV_H + +#include <cerrno> +#include <mt/thread> +#include <mt/mutex> +#include <mt/condition_variable> + +#ifdef STLPORT +# include <unordered_map> +# include <unordered_set> +# define __USE_STLPORT_TR1 +#else +# if defined(__GNUC__) && (__GNUC__ < 4) +# include <ext/hash_map> +# include <ext/hash_set> +# define __USE_STD_HASH +# else +# include <tr1/unordered_map> +# include <tr1/unordered_set> +# define __USE_STD_TR1 +# endif +#endif + +#include <sockios/sockstream> +#include <deque> +#include <functional> + +namespace std { + +template <class charT, class traits, class _Alloc> class basic_sockbuf; +template <class charT, class traits, class _Alloc> class basic_sockstream; +template <class charT, class traits, class _Alloc> class sock_processor_base; + +template 
<class charT, class traits, class _Alloc> +class sock_processor_base : + public sock_base, + public basic_socket<charT,traits,_Alloc> +{ + private: + typedef basic_socket<charT,traits,_Alloc> basic_socket_t; + + public: + typedef basic_sockstream<charT,traits,_Alloc> sockstream_t; + typedef basic_sockbuf<charT,traits,_Alloc> sockbuf_t; + + struct adopt_close_t { }; + + sock_processor_base() : + _mode( ios_base::in | ios_base::out ), + _state( ios_base::goodbit ) + { } + + explicit sock_processor_base( int port, sock_base::stype t = sock_base::sock_stream ) + { sock_processor_base::open( port, t, sock_base::inet ); } + + virtual ~sock_processor_base() + { + sock_processor_base::close(); + + std::cerr << __FILE__ << ":" << __LINE__ << " " << (void*)this << " " << std::tr2::getpid() << std::endl; + // Never uncomment next line: + // basic_socket<charT,traits,_Alloc>::mgr->final( *this ); + // this lead to virtual fuction call, that is already pure here. + } + + void open( const in_addr& addr, int port, sock_base::stype type, sock_base::protocol prot ); + + void open( unsigned long addr, int port, sock_base::stype type, sock_base::protocol prot ) + { + in_addr _addr; + _addr.s_addr = htonl( addr ); + sock_processor_base::open( _addr, port, type, prot ); + } + + void open( int port, sock_base::stype type, sock_base::protocol prot ) + { sock_processor_base::open(INADDR_ANY, port, type, prot); } + + virtual void close(); + + virtual sockbuf_t* operator ()( sock_base::socket_type fd, const sockaddr& ) = 0; + virtual void operator ()( sock_base::socket_type fd, const adopt_close_t& ) = 0; + virtual void operator ()( sock_base::socket_type fd ) = 0; + + private: + sock_processor_base( const sock_processor_base& ); + sock_processor_base& operator =( const sock_processor_base& ); + + protected: + void setoptions_unsafe( sock_base::so_t optname, bool on_off = true, int __v = 0 ); + sockstream_t* create_stream( int fd, const sockaddr& addr ) + { + sockstream_t* s = new 
sockstream_t(); + s->rdbuf()->_open_sockmgr( fd, addr ); + return s; + } + + public: + bool is_open() const + { std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); return basic_socket_t::is_open_unsafe(); } + bool good() const + { return _state == ios_base::goodbit; } + + sock_base::socket_type fd() const + { std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); sock_base::socket_type tmp = basic_socket_t::fd_unsafe(); return tmp; } + + void shutdown( sock_base::shutdownflg dir ); + void setoptions( sock_base::so_t optname, bool on_off = true, int __v = 0 ) + { + std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); + setoptions_unsafe( optname, on_off, __v ); + } + + private: + unsigned long _mode; // open mode + unsigned long _state; // state flags + + protected: + std::tr2::mutex _fd_lck; +}; + +typedef sock_processor_base<char,char_traits<char>,allocator<char> > sock_basic_processor; + +template <class Connect, class charT = char, class traits = std::char_traits<charT>, class _Alloc = std::allocator<charT>, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& ) = &Connect::connect > +class connect_processor : + public sock_processor_base<charT,traits,_Alloc> +{ + private: + typedef sock_processor_base<charT,traits,_Alloc> base_t; + + class Init + { + public: + Init() + { _guard( 1 ); } + ~Init() + { _guard( 0 ); } + + private: + static void _guard( int direction ); + static void __at_fork_prepare(); + static void __at_fork_child(); + static void __at_fork_parent(); + static std::tr2::mutex _init_lock; + static int _count; + static bool _at_fork; + }; + + static char Init_buf[]; + + public: + connect_processor() : + not_empty( *this ), + _in_work( false ), + ploop( loop, this ) + { new( Init_buf ) Init(); } + + explicit connect_processor( int port ) : + base_t( port, sock_base::sock_stream ), + not_empty( *this ), + _in_work( false ), + ploop( loop, this ) + { new( Init_buf ) Init(); } + + virtual ~connect_processor() + { + connect_processor::close(); + if ( 
ploop.joinable() ) { + ploop.join(); + } + // { + // std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); + for ( typename worker_pool_t::iterator i = worker_pool.begin(); i != worker_pool.end(); ++i ) { + // delete i->second; + delete i->second.s; + delete i->second.c; + } + worker_pool.clear(); + // } + for ( typename ready_pool_t::iterator j = ready_pool.begin(); j != ready_pool.end(); ++j ) { + delete j->c; + } + ready_pool.clear(); + + std::cerr << __FILE__ << ":" << __LINE__ << " " << (void*)this << " " << std::tr2::getpid() << std::endl; + basic_socket<charT,traits,_Alloc>::mgr->final( *this ); + + ((Init *)Init_buf)->~Init(); + } + + virtual void close(); + + void wait() + { if ( ploop.joinable() ) { ploop.join(); } } + + private: + virtual typename base_t::sockbuf_t* operator ()( sock_base::socket_type fd, const sockaddr& ); + virtual void operator ()( sock_base::socket_type fd, const typename base_t::adopt_close_t& ); + virtual void operator ()( sock_base::socket_type fd ); + + static void loop( connect_processor* me ) + { me->worker(); } + + void worker(); + + private: + connect_processor( const connect_processor& ) + { } + + connect_processor& operator =( const connect_processor& ) + { return *this; } + + + struct processor + { + processor() : + c(0), + s(0) + { } + processor( Connect* __c, typename sock_processor_base<charT,traits,_Alloc>::sockstream_t* __s ) : + c(__c), + s(__s) + { } + processor( const processor& p ) : + c( p.c ), + s( p.s ) + { } + + processor& operator =( const processor& p ) + { c = p.c; s = p.s; return *this; } + + Connect* c; + typename sock_processor_base<charT,traits,_Alloc>::sockstream_t* s; + + bool operator ==( const processor& p ) const + { return s == p.s; } + bool operator ==( const typename sock_processor_base<charT,traits,_Alloc>::sockstream_t* st ) const + { return const_cast<const typename sock_processor_base<charT,traits,_Alloc>::sockstream_t*>(s) == st; } + bool operator ==( sock_base::socket_type fd ) const + { 
return s == 0 ? (fd == -1) : (s->rdbuf()->fd() == fd); } + +/* + struct equal_to : + public std::binary_function<processor, typename sock_processor_base<charT,traits,_Alloc>::sockstream_t*, bool> + { + bool operator()(const processor& __x, const typename sock_processor_base<charT,traits,_Alloc>::sockstream_t* __y) const + { return __x == __y; } + }; +*/ + }; + + bool pop_ready( processor& ); + +#ifdef __USE_STLPORT_HASH + typedef std::hash_map<sock_base::socket_type, processor> worker_pool_t; +#endif +#ifdef __USE_STD_HASH + typedef __gnu_cxx::hash_map<sock_base::socket_type, processor> worker_pool_t; +#endif +#if defined(__USE_STLPORT_TR1) || defined(__USE_STD_TR1) + typedef std::tr1::unordered_map<sock_base::socket_type, processor> worker_pool_t; +#endif + typedef std::deque<processor> ready_pool_t; + + struct _not_empty + { + _not_empty( connect_processor& p ) : + me( p ) + { } + + bool operator()() const + { return !me.ready_pool.empty(); } + + connect_processor& me; + } not_empty; + + worker_pool_t worker_pool; + ready_pool_t ready_pool; + bool _in_work; + std::tr2::mutex wklock; + std::tr2::mutex rdlock; + std::tr2::condition_variable cnd; + std::tr2::mutex inwlock; + std::tr2::condition_variable cnd_inwk; + std::tr2::thread ploop; + + friend struct _not_empty; +}; + +} // namesapce std + +#ifdef __USE_STLPORT_HASH +# undef __USE_STLPORT_HASH +#endif +#ifdef __USE_STD_HASH +# undef __USE_STD_HASH +#endif +#ifdef __USE_STLPORT_TR1 +# undef __USE_STLPORT_TR1 +#endif +#ifdef __USE_STD_TR1 +# undef __USE_STD_TR1 +#endif + +#include <sockios/socksrv.cc> + +#endif /* __SOCKIOS_SOCKSRV_H */ Modified: branches/complement-sockios/explore/include/sockios/sockstream =================================================================== --- branches/complement-sockios/explore/include/sockios/sockstream 2008-06-16 14:59:26 UTC (rev 1901) +++ branches/complement-sockios/explore/include/sockios/sockstream 2008-06-26 05:40:30 UTC (rev 1902) @@ -1,4 +1,4 @@ -// -*- C++ -*- 
Time-stamp: <08/06/09 22:22:05 yeti> +// -*- C++ -*- Time-stamp: <08/06/11 14:45:16 yeti> /* * Copyright (c) 1997-1999, 2002, 2003, 2005-2008 @@ -252,7 +252,8 @@ void wrtimeout() // infinite { _use_wrtimeout = false; } - sock_base::socket_type fd() const { return _fd;} + sock_base::socket_type fd() const + { return _fd; } bool is_open() const { return _fd != -1; } @@ -373,8 +374,8 @@ template<class charT, class traits, class _Alloc> class basic_sockbuf : - public basic_streambuf<charT, traits> - // public basic_socket<charT,traits,_Alloc> + public basic_streambuf<charT, traits>, + public basic_socket<charT,traits,_Alloc> { private: typedef basic_socket<charT,traits,_Alloc> basic_socket_t; @@ -383,7 +384,6 @@ typedef basic_ios<charT, traits> ios_type; typedef basic_sockbuf<charT, traits, _Alloc> sockbuf_type; typedef typename traits::state_type state_t; - typedef typename basic_socket_t::family_type family_type; public: /* Inherited from basic_streambuf : */ @@ -395,23 +395,62 @@ /* */ basic_sockbuf() : - impl( 0 ) + rdready( *this ), +#if !defined(STLPORT) && defined(__GNUC__) +#if ((__GNUC__ < 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ < 4))) // hmm, 3.3.6 + _mode( ios_base::openmode(__ios_flags::_S_in | __ios_flags::_S_out) ), +#else // 4.1.1 + _mode( _S_in | _S_out ), +#endif // __GNUC__ +#else // STLPORT + _mode( 0 ), +#endif // STLPORT + _bbuf(0), + _ebuf(0), + _allocated( true ) { } basic_sockbuf( const char *hostname, int port, sock_base::stype type = sock_base::sock_stream, sock_base::protocol prot = sock_base::inet ) : - impl( 0 ) + rdready( *this ), +#if !defined(STLPORT) && defined(__GNUC__) +#if ((__GNUC__ < 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ < 4))) + _mode( ios_base::openmode(__ios_flags::_S_in | __ios_flags::_S_out) ), +#else // 4.1.1 + _mode( _S_in | _S_out ), +#endif // __GNUC__ +#else // STLPORT + _mode( 0 ), +#endif // STLPORT + _bbuf(0), + _ebuf(0), + _allocated( true ) { open( hostname, port, type, prot ); } basic_sockbuf( const in_addr& 
addr, int port, sock_base::stype type = sock_base::sock_stream, sock_base::protocol prot = sock_base::inet ) : - impl( 0 ) + rdready( *this ), +#if !defined(STLPORT) && defined(__GNUC__) +#if ((__GNUC__ < 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ < 4))) + _mode( ios_base::openmode(__ios_flags::_S_in | __ios_flags::_S_out) ), +#else // 4.1.1 + _mode( _S_in | _S_out ), +#endif // __GNUC__ +#else // STLPORT + _mode( 0 ), +#endif // STLPORT + _bbuf(0), + _ebuf(0), + _allocated( true ) { open( addr, type, prot ); } virtual ~basic_sockbuf() - { close(); } + { + close(); + _M_deallocate_block(); + } sockbuf_type *open( const char *hostname, int port, sock_base::stype type = sock_base::sock_stream, @@ -419,15 +458,13 @@ sockbuf_type *open( const in_addr& addr, int port, sock_base::stype type = sock_base::sock_stream, - sock_base::protocol prot = sock_base::inet ) - { return impl->open( addr, port, type, prot ) ? this : 0; } + sock_base::protocol prot = sock_base::inet ); sockbuf_type *open( sock_base::socket_type s, sock_base::stype t = sock_base::sock_stream ); sockbuf_type *open( sock_base::socket_type s, const sockaddr& addr, - sock_base::stype t = sock_base::sock_stream ) - { return impl->open( s, addr, t ) ? this : 0; } + sock_base::stype t = sock_base::sock_stream ); sockbuf_type *attach( sock_base::socket_type s, sock_base::stype t = sock_base::sock_stream ); @@ -435,65 +472,156 @@ sockbuf_type *attach( sock_base::socket_type s, const sockaddr& addr, sock_base::stype t = sock_base::sock_stream ); - sockbuf_type *close() - { return impl->close() ? 
this : 0; } + sockbuf_type *close(); + void shutdown( sock_base::shutdownflg dir ); - void shutdown( sock_base::shutdownflg dir ) - { impl->shutdown( dir ); } - sock_base::stype stype() const - { return impl->stype(); } + { return _type; } - template <class Duration> - void rdtimeout( const Duration& d ) - { impl->rdtimeout( d ); } - void rdtimeout() // infinite - { impl->rdtimeout(); } + protected: + virtual streamsize showmanyc() + { return this->egptr() - this->gptr(); } - template <class Duration> - void wrtimeout( const Duration& d ) - { impl->wrtimeout( d ); } - void wrtimeout() // infinite - { impl->wrtimeout(); } + virtual int_type underflow(); + virtual int_type overflow( int_type c = traits::eof() ); + virtual int_type pbackfail( int_type c = traits::eof() ) + { + if ( !basic_socket_t::is_open() ) + return traits::eof(); - sock_base::socket_type fd() const { return impl->fd();} - bool is_open() const - { return impl->is_open(); } + if ( this->gptr() <= this->eback() ) { + return traits::eof(); + } - family_type family() const - { return impl->family(); } + this->gbump(-1); + if ( !traits::eq_int_type(c,traits::eof()) ) { + *this->gptr() = traits::to_char_type(c); + return c; + } - int port() const - { return impl->port(); } + return traits::not_eof(c); + } - unsigned long inet_addr() const - { return impl->inet_addr(); } + // Buffer managment and positioning: + virtual basic_streambuf<charT, traits> *setbuf(char_type *s, streamsize n ) + { + if ( s != 0 && n != 0 ) { + _M_deallocate_block(); + _allocated = false; + _bbuf = s; + _ebuf = s + n; + } + return this; + } - const sockaddr_in& inet_sockaddr() const throw( std::domain_error ) - { return impl->inet_sockaddr(); } + virtual int sync(); + virtual streamsize xsputn(const char_type *s, streamsize n); + private: // Helper functions + charT* _bbuf; + charT* _ebuf; + bool _allocated; // true, if _bbuf should be deallocated - protected: - virtual streamsize showmanyc() - { return impl->showmanyc(); } + // 
Precondition: 0 < __n <= max_size(). + charT* _M_allocate( size_t __n ) { return _M_data_allocator.allocate(__n); } + void _M_deallocate( charT* __p, size_t __n ) + { if (__p) _M_data_allocator.deallocate(__p, __n); } - virtual int_type underflow() - { return impl->underflow(); } - virtual int_type overflow( int_type c = traits::eof() ) - { return impl->overflow(c); } - virtual int_type pbackfail( int_type c = traits::eof() ) - { return impl->pbackfail( c ); } - // Buffer managment and positioning: - virtual basic_streambuf<charT, traits> *setbuf(char_type *s, streamsize n ) - { impl->setbuf( s, n ); return this; } + void _M_allocate_block(size_t __n) + { + if ( _allocated ) { + if ( __n <= max_size() ) { + _bbuf = _M_allocate(__n); + _ebuf = _bbuf + __n; + // _STLP_ASSERT( __n > 0 ? _bbuf != 0 : _bbuf == 0 ); + } else + this->_M_throw_length_error(); + } + } - virtual int sync() - { return impl->sync(); } - virtual streamsize xsputn(const char_type *s, streamsize n) - { return impl->xsputn( s, n ); } + void _M_deallocate_block() + { if ( _allocated ) _M_deallocate(_bbuf, _ebuf - _bbuf); } + + size_t max_size() const { return (size_t(-1) / sizeof(charT)) - 1; } +#ifdef STLPORT + void _M_throw_length_error() const + { _STLP_THROW(length_error("basic_sockbuf")); } +#else + void _M_throw_length_error() const + { throw length_error("basic_sockbuf"); } +#endif + +#ifdef STLPORT + typedef typename _Alloc_traits<charT, _Alloc>::allocator_type allocator_type; +#else + typedef _Alloc allocator_type; +#endif + /* typedef __allocator<charT, _Alloc> _Alloc_type; */ + + /* _Alloc_type */ allocator_type _M_data_allocator; + + class rdready_t + { + public: + rdready_t( sockbuf_type& self ) : + b( self ) + { } + bool operator ()() const + { return b.showmanyc() != 0; } + private: + sockbuf_type& b; + } rdready; + + sockbuf_type *_open_sockmgr( sock_base::socket_type s, const sockaddr& addr, + sock_base::stype t = sock_base::sock_stream ); + private: - 
detail::basic_sockbuf_aux<charT,traits,_Alloc> *impl; + typedef basic_sockbuf<charT,traits,_Alloc> _Self_type; + int (basic_sockbuf<charT,traits,_Alloc>::*_xwrite)( const void *, size_t ); + int (basic_sockbuf<charT,traits,_Alloc>::*_xread)( void *, size_t ); + int write( const void *buf, size_t n ) +#ifndef WIN32 + { return ::write( basic_socket_t::_fd, buf, n ); } +#else + { return ::send( basic_socket_t::_fd, (const char *)buf, n, 0 ); } +#endif + int send( const void *buf, size_t n ) +#ifdef WIN32 + { return ::send( basic_socket_t::_fd, (const char *)buf, n, 0 ); } +#else + { return ::send( basic_socket_t::_fd, buf, n, 0 ); } +#endif + int sendto( const void *buf, size_t n ) +#ifdef WIN32 + { return ::sendto( basic_socket_t::_fd, (const char *)buf, n, 0, &basic_socket_t::_address.any, sizeof( sockaddr_in ) ); } +#else + { return ::sendto( basic_socket_t::_fd, buf, n, 0, &basic_socket_t::_address.any, sizeof( sockaddr_in ) ); } +#endif + + int read( void *buf, size_t n ) +#ifdef WIN32 + { return ::recv( basic_socket_t::_fd, (char *)buf, n, 0 ); } +#else + { return ::read( basic_socket_t::_fd, buf, n ); } +#endif + int recv( void *buf, size_t n ) +#ifdef WIN32 + { return ::recv( basic_socket_t::_fd, (char *)buf, n, 0 ); } +#else + { return ::recv( basic_socket_t::_fd, buf, n, 0 ); } +#endif + int recvfrom( void *buf, size_t n ); + void __hostname(); + + ios_base::openmode _mode; + sock_base::stype _type; + + std::tr2::mutex ulck; + std::tr2::condition_variable ucnd; + + friend class detail::sockmgr<charT,traits,_Alloc>; + friend class sock_processor_base<charT,traits,_Alloc>; }; template <class charT, class traits, class _Alloc> Modified: branches/complement-sockios/explore/include/sockios/sockstream.cc =================================================================== --- branches/complement-sockios/explore/include/sockios/sockstream.cc 2008-06-16 14:59:26 UTC (rev 1901) +++ branches/complement-sockios/explore/include/sockios/sockstream.cc 2008-06-26 05:40:30 
UTC (rev 1902) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/06/09 21:59:13 yeti> +// -*- C++ -*- Time-stamp: <08/06/11 14:47:58 yeti> /* * Copyright (c) 1997-1999, 2002, 2003, 2005-2008 @@ -32,7 +32,111 @@ sock_base::protocol prot ) { return basic_sockbuf<charT, traits, _Alloc>::open( std::findhost( name ), port, type, prot ); } +template<class charT, class traits, class _Alloc> +basic_sockbuf<charT, traits, _Alloc> * +basic_sockbuf<charT, traits, _Alloc>::open( const in_addr& addr, int port, + sock_base::stype type, + sock_base::protocol prot ) +{ + if ( basic_socket_t::is_open() ) { + return 0; + } + try { + _mode = ios_base::in | ios_base::out; + _type = type; +#ifdef WIN32 + WSASetLastError( 0 ); +#endif + if ( prot == sock_base::inet ) { + basic_socket_t::_fd = socket( PF_INET, type, 0 ); + if ( basic_socket_t::_fd == -1 ) { + throw std::runtime_error( "can't open socket" ); + } + basic_socket_t::_address.inet.sin_family = AF_INET; + // htons is a define at least in Linux 2.2.5-15, and it's expantion fail + // for gcc 2.95.3 +#if defined(linux) && defined(htons) && defined(__bswap_16) + basic_socket_t::_address.inet.sin_port = ((((port) >> 8) & 0xff) | (((port) & 0xff) << 8)); +#else + basic_socket_t::_address.inet.sin_port = htons( port ); +#endif // linux && htons + basic_socket_t::_address.inet.sin_addr = addr; + + // Generally, stream sockets may successfully connect() only once + if ( connect( basic_socket_t::_fd, &basic_socket_t::_address.any, sizeof( basic_socket_t::_address ) ) == -1 ) { + throw std::domain_error( "connect fail" ); + } + if ( type == sock_base::sock_stream ) { + _xwrite = &_Self_type::write; + _xread = &_Self_type::read; + } else if ( type == sock_base::sock_dgram ) { + _xwrite = &_Self_type::send; + _xread = &_Self_type::recv; + } + } else if ( prot == sock_base::local ) { + basic_socket_t::_fd = socket( PF_UNIX, type, 0 ); + if ( basic_socket_t::_fd == -1 ) { + throw std::runtime_error( "can't open socket" ); + } + } else { // 
other protocols not implemented yet + throw std::invalid_argument( "protocol not implemented" ); + } + if ( _bbuf == 0 ) { + struct ifconf ifc; + struct ifreq ifr; + ifc.ifc_len = sizeof(ifreq); + ifc.ifc_req = &ifr; + int mtu = ((ioctl(basic_socket_t::_fd, SIOCGIFMTU, &ifc) < 0 ? 1500 : ifr.ifr_mtu) - 20 - (type == sock_base::sock_stream ? 20 : 8 )) / sizeof(charT); + int qlen = ioctl(basic_socket_t::_fd, SIOCGIFTXQLEN, &ifc) < 0 ? 2 : ifr.ifr_qlen; + _M_allocate_block( type == sock_base::sock_stream ? mtu * qlen * 2 : mtu * 2 ); + } + + if ( _bbuf == 0 ) { + throw std::length_error( "can't allocate block" ); + } + + if ( fcntl( basic_socket_t::_fd, F_SETFL, fcntl( basic_socket_t::_fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { + throw std::runtime_error( "can't establish nonblock mode" ); + } + setp( _bbuf, _bbuf + ((_ebuf - _bbuf)>>1) ); + setg( this->epptr(), this->epptr(), this->epptr() ); + basic_socket_t::_notify_close = true; + basic_socket_t::mgr->push( *this ); + } + catch ( std::domain_error& ) { +#ifdef WIN32 + // _errno = WSAGetLastError(); + ::closesocket( basic_socket_t::_fd ); +#else + ::close( basic_socket_t::_fd ); +#endif + basic_socket_t::_fd = -1; + return 0; + } + catch ( std::length_error& ) { +#ifdef WIN32 + ::closesocket( basic_socket_t::_fd ); +#else + ::close( basic_socket_t::_fd ); +#endif + basic_socket_t::_fd = -1; + return 0; + } + catch ( std::runtime_error& ) { +#ifdef WIN32 + // _errno = WSAGetLastError(); +#else +#endif + return 0; + } + catch ( std::invalid_argument& ) { + return 0; + } + + return this; +} + template<class charT, class traits, class _Alloc> basic_sockbuf<charT, traits, _Alloc> * basic_sockbuf<charT, traits, _Alloc>::open( sock_base::socket_type s, sock_base::stype t ) @@ -50,6 +154,20 @@ template<class charT, class traits, class _Alloc> basic_sockbuf<charT, traits, _Alloc> * +basic_sockbuf<charT, traits, _Alloc>::open( sock_base::socket_type s, + const sockaddr& addr, + sock_base::stype t ) +{ + basic_sockbuf<charT, traits, 
_Alloc>* ret = _open_sockmgr( s, addr, t ); + if ( ret != 0 ) { + basic_socket_t::_notify_close = true; + basic_socket_t::mgr->push( *this ); + } + return ret; +} + +template<class charT, class traits, class _Alloc> +basic_sockbuf<charT, traits, _Alloc> * basic_sockbuf<charT, traits, _Alloc>::attach( sock_base::socket_type s, sock_base::stype t ) { @@ -79,6 +197,316 @@ } template<class charT, class traits, class _Alloc> +basic_sockbuf<charT, traits, _Alloc> * +basic_sockbuf<charT, traits, _Alloc>::_open_sockmgr( sock_base::socket_type s, + const sockaddr& addr, + sock_base::stype t ) +{ + if ( basic_socket_t::is_open() || s == -1 ) { + return 0; + } + basic_socket_t::_fd = s; + memcpy( (void *)&basic_socket_t::_address.any, (const void *)&addr, sizeof(sockaddr) ); + _mode = ios_base::in | ios_base::out; + _type = t; +#ifdef WIN32 + WSASetLastError( 0 ); +#endif + if ( t == sock_base::sock_stream ) { + _xwrite = &_Self_type::write; + _xread = &_Self_type::read; + } else if ( t == sock_base::sock_dgram ) { + _xwrite = &_Self_type::sendto; + _xread = &_Self_type::recvfrom; + } else { + basic_socket_t::_fd = -1; + return 0; // unsupported type + } + + if ( _bbuf == 0 ) { + struct ifconf ifc; + struct ifreq ifr; + ifc.ifc_len = sizeof(ifreq); + ifc.ifc_req = &ifr; + int mtu = ((ioctl(basic_socket_t::_fd, SIOCGIFMTU, &ifc) < 0 ? 1500 : ifr.ifr_mtu) - 20 - (t == sock_base::sock_stream ? 20 : 8 )) / sizeof(charT); + int qlen = ioctl(basic_socket_t::_fd, SIOCGIFTXQLEN, &ifc) < 0 ? 2 : ifr.ifr_qlen; + _M_allocate_block( t == sock_base::sock_stream ? 
mtu * qlen * 2 : mtu * 2); + } + + if ( _bbuf == 0 ) { +#ifdef WIN32 + ::closesocket( basic_socket_t::_fd ); +#else + ::close( basic_socket_t::_fd ); +#endif + basic_socket_t::_fd = -1; + return 0; + } + + if ( fcntl( basic_socket_t::_fd, F_SETFL, fcntl( basic_socket_t::_fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { + throw std::runtime_error( "can't establish nonblock mode" ); + } + setp( _bbuf, _bbuf + ((_ebuf - _bbuf)>>1) ); + setg( this->epptr(), this->epptr(), this->epptr() ); + + return this; +} + +template<class charT, class traits, class _Alloc> +basic_sockbuf<charT, traits, _Alloc> * +basic_sockbuf<charT, traits, _Alloc>::close() +{ + if ( !basic_socket_t::is_open() ) + return 0; + + // if ( _doclose ) { +#ifdef WIN32 + ::closesocket( basic_socket_t::_fd ); +#else + ::close( basic_socket_t::_fd ); +#endif + // } + + // _STLP_ASSERT( _bbuf != 0 ); + // put area before get area + setp( _bbuf, _bbuf + ((_ebuf - _bbuf)>>1) ); + setg( this->epptr(), this->epptr(), this->epptr() ); + + if ( basic_socket_t::_notify_close ) { + basic_socket_t::mgr->exit_notify( this, basic_socket_t::_fd ); + basic_socket_t::_notify_close = false; + } + + basic_socket_t::_fd = -1; + + return this; +} + +template<class charT, class traits, class _Alloc> +void basic_sockbuf<charT, traits, _Alloc>::shutdown( sock_base::shutdownflg dir ) +{ + if ( basic_socket_t::is_open_unsafe() ) { + if ( (dir & (sock_base::stop_in | sock_base::stop_out)) == + (sock_base::stop_in | sock_base::stop_out) ) { + ::shutdown( basic_socket_t::_fd, 2 ); + } else if ( dir & sock_base::stop_in ) { + ::shutdown( basic_socket_t::_fd, 0 ); + } else if ( dir & sock_base::stop_out ) { + ::shutdown( basic_socket_t::_fd, 1 ); + } + } +} + +template<class charT, class traits, class _Alloc> +__FIT_TYPENAME basic_sockbuf<charT, traits, _Alloc>::int_type +basic_sockbuf<charT, traits, _Alloc>::underflow() +{ + if( !basic_socket_t::is_open() ) + return traits::eof(); + + std::tr2::unique_lock<std::tr2::mutex> lk( ulck ); + + if ( 
this->gptr() < this->egptr() ) + return traits::to_int_type(*this->gptr()); + + if ( this->egptr() == this->gptr() ) { // fullfilled: _ebuf == gptr() + setg( this->eback(), this->eback(), this->eback() ); + } + + // setg( this->eback(), this->eback(), this->eback() + offset ); + // wait on condition + if ( basic_socket_t::_use_rdtimeout ) { + ucnd.timed_wait( lk, basic_socket_t::_rdtimeout, rdready ); + } else { + ucnd.wait( lk, rdready ); + } + + return traits::to_int_type(*this->gptr()); +} + +template<class charT, class traits, class _Alloc> +__FIT_TYPENAME basic_sockbuf<charT, traits, _Alloc>::int_type +basic_sockbuf<charT, traits, _Alloc>::overflow( int_type c ) +{ + if ( !basic_socket_t::is_open() ) + return traits::eof(); + + if ( !traits::eq_int_type( c, traits::eof() ) && this->pptr() < this->epptr() ) { + sputc( traits::to_char_type(c) ); + return c; + } + + long count = this->pptr() - this->pbase(); + + if ( count ) { + count *= sizeof(charT); + long offset = (this->*_xwrite)( this->pbase(), count ); + if ( offset < 0 ) { + if ( errno == EAGAIN ) { + pollfd wpfd; + wpfd.fd = basic_socket_t::_fd; + wpfd.events = POLLOUT | POLLHUP | POLLWRNORM; + wpfd.revents = 0; + while ( poll( &wpfd, 1, basic_socket_t::_use_wrtimeout ? basic_socket_t::_wrtimeout.count() : -1 ) <= 0 ) { // wait infinite + if ( errno == EINTR ) { // may be interrupted, check and ignore + errno = 0; + continue; + } + return traits::eof(); + } + if ( (wpfd.revents & POLLERR) != 0 ) { + return traits::eof(); + } + offset = (this->*_xwrite)( this->pbase(), count ); + if ( offset < 0 ) { + return traits::eof(); + } + } else { + return traits::eof(); + } + } + if ( offset < count ) { + // MUST BE: (offset % sizeof(char_traits)) == 0 ! 
+ offset /= sizeof(charT); + count /= sizeof(charT); + traits::move( this->pbase(), this->pbase() + offset, count - offset ); + // std::copy_backword( this->pbase() + offset, this->pbase() + count, this->pbase() ); + setp( this->pbase(), this->epptr() ); // require: set pptr + this->pbump( count - offset ); + if( !traits::eq_int_type(c,traits::eof()) ) { + sputc( traits::to_char_type(c) ); + } + + return traits::not_eof(c); + } + } + + setp( this->pbase(), this->epptr() ); // require: set pptr + if( !traits::eq_int_type(c,traits::eof()) ) { + sputc( traits::to_char_type(c) ); + } + + return traits::not_eof(c); +} + +template<class charT, class traits, class _Alloc> +int basic_sockbuf<charT, traits, _Alloc>::sync() +{ + if ( !basic_socket_t::is_open() ) { + return -1; + } + + long count = this->pptr() - this->pbase(); + if ( count ) { + // _STLP_ASSERT( this->pbase() != 0 ); + count *= sizeof(charT); + long start = 0; + while ( count > 0 ) { + long offset = (this->*_xwrite)( this->pbase() + start, count ); + if ( offset < 0 ) { + if ( errno == EAGAIN ) { + pollfd wpfd; + wpfd.fd = basic_socket_t::_fd; + wpfd.events = POLLOUT | POLLHUP | POLLWRNORM; + wpfd.revents = 0; + while ( poll( &wpfd, 1, basic_socket_t::_use_wrtimeout ? 
basic_socket_t::_wrtimeout.count() : -1 ) <= 0 ) { // wait infinite + if ( errno == EINTR ) { // may be interrupted, check and ignore + errno = 0; + continue; + } + return -1; + } + if ( (wpfd.revents & POLLERR) != 0 ) { + return -1; + } + offset = (this->*_xwrite)( this->pbase() + start, count ); + if ( offset < 0 ) { + return -1; + } + } else { + return -1; + } + } + count -= offset; + start += offset; + } + setp( this->pbase(), this->epptr() ); // require: set pptr + } + + return 0; +} + +template<class charT, class traits, class _Alloc> +streamsize basic_sockbuf<charT, traits, _Alloc>::xsputn( const char_type *s, streamsize n ) +{ + if ( !basic_socket_t::is_open() || s == 0 || n == 0 ) { + return 0; + } + + if ( this->epptr() - this->pptr() > n ) { + traits::copy( this->pptr(), s, n ); + this->pbump( n ); + } else { + streamsize __n_put = this->epptr() - this->pptr(); + traits::copy( this->pptr(), s, __n_put ); + this->pbump( __n_put ); + + if ( traits::eq_int_type(overflow(),traits::eof()) ) + return 0; + + setp( (char_type *)(s + __n_put), (char_type *)(s + n) ); + this->pbump( n - __n_put ); + + if ( traits::eq_int_type(overflow(),traits::eof()) ) { + setp( _bbuf, _bbuf + ((_ebuf - _bbuf) >> 1) ); + return 0; + } + setp( _bbuf, _bbuf + ((_ebuf - _bbuf) >> 1) ); + } + return n; +} + +template<class charT, class traits, class _Alloc> +int basic_sockbuf<charT, traits, _Alloc>::recvfrom( void *buf, size_t n ) +{ +#if defined(_WIN32) || (defined(__hpux) && !defined(_INCLUDE_POSIX1C_SOURCE)) + int sz = sizeof( sockaddr_in ); +#else + socklen_t sz = sizeof( sockaddr_in ); +#endif + + typename basic_socket_t::sockaddr_t addr; + +#ifdef __FIT_POLL + pollfd pfd; + pfd.fd = basic_socket_t::_fd; + pfd.events = POLLIN; +#endif // __FIT_POLL + do { +#ifdef __FIT_POLL + pfd.revents = 0; + if ( poll( &pfd, 1, /* _timeout */ -1 ) > 0 ) { // wait infinite + // get address of caller only + char buff[32]; + ::recvfrom( basic_socket_t::_fd, buff, 32, MSG_PEEK, &addr.any, &sz ); 
+ } else { + return 0; // poll wait infinite, so it can't return 0 (timeout), so it return -1. + } +#endif // __FIT_POLL + if ( memcmp( &basic_socket_t::_address.inet, &addr.inet, sizeof(sockaddr_in) ) == 0 ) { +#ifdef WIN32 + return ::recvfrom( basic_socket_t::_fd, (char *)buf, n, 0, &basic_socket_t::_address.any, &sz ); +#else + return ::recvfrom( basic_socket_t::_fd, buf, n, 0, &basic_socket_t::_address.any, &sz ); +#endif + } + // xmt::Thread::yield(); + } while ( true ); + + return 0; // never +} + +template<class charT, class traits, class _Alloc> void basic_sockstream<charT, traits, _Alloc>::setoptions( sock_base::so_t optname, bool on_off, int __v ) { #ifdef __unix Modified: branches/complement-sockios/explore/include/sockios/sp.cc =================================================================== --- branches/complement-sockios/explore/include/sockios/sp.cc 2008-06-16 14:59:26 UTC (rev 1901) +++ branches/complement-sockios/explore/include/sockios/sp.cc 2008-06-26 05:40:30 UTC (rev 1902) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <08/06/09 23:51:34 ptr> +// -*- C++ -*- Time-stamp: <08/06/11 21:42:37 yeti> /* * Copyright (c) 2008 @@ -10,741 +10,9 @@ namespace std { -template<class charT, class traits, class _Alloc> -void sock_processor_base<charT,traits,_Alloc>::open( const in_addr& addr, int port, sock_base::stype type, sock_base::protocol prot ) -{ - std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); - if ( basic_socket_t::is_open_unsafe() ) { - return; - } - _mode = ios_base::in | ios_base::out; - _state = ios_base::goodbit; -#ifdef WIN32 - ::WSASetLastError( 0 ); -#endif - if ( prot == sock_base::inet ) { - basic_socket_t::_fd = socket( PF_INET, type, 0 ); - if ( basic_socket_t::_fd == -1 ) { - _state |= ios_base::failbit | ios_base::badbit; - return; - } - // _open = true; - basic_socket_t::_address.inet.sin_family = AF_INET; - basic_socket_t::_address.inet.sin_port = htons( port ); - basic_socket_t::_address.inet.sin_addr.s_addr = addr.s_addr; - - if 
( type == sock_base::sock_stream || type == sock_base::sock_seqpacket ) { - // let's try reuse local address - setoptions_unsafe( sock_base::so_reuseaddr, true ); - } - - if ( ::bind( basic_socket_t::_fd, &basic_socket_t::_address.any, sizeof(basic_socket_t::_address) ) == -1 ) { - _state |= ios_base::failbit; -#ifdef WIN32 - ::closesocket( basic_socket_t::_fd ); -#else - ::close( basic_socket_t::_fd ); -#endif - basic_socket_t::_fd = -1; - return; - } - - if ( type == sock_base::sock_stream || type == sock_base::sock_seqpacket ) { - // I am shure, this is socket of type SOCK_STREAM | SOCK_SEQPACKET, - // so don't check return code from listen - ::listen( basic_socket_t::_fd, SOMAXCONN ); - basic_socket_t::mgr->push( *this ); - } - } else if ( prot == sock_base::local ) { - return; - } else { - return; - } - _state = ios_base::goodbit; - - return; -} - -template<class charT, class traits, class _Alloc> -void sock_processor_base<charT,traits,_Alloc>::close() -{ - std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); - if ( !basic_socket_t::is_open_unsafe() ) { - return; - } - basic_socket<charT,traits,_Alloc>::mgr->pop( *this, basic_socket_t::_fd ); -#ifdef WIN32 - ::closesocket( basic_socket_t::_fd ); -#else - ::shutdown( basic_socket_t::_fd, 2 ); - ::close( basic_socket_t::_fd ); -#endif - basic_socket_t::_fd = -1; -} - -template<class charT, class traits, class _Alloc> -void sock_processor_base<charT,traits,_Alloc>::shutdown( sock_base::shutdownflg dir ) -{ - std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); - if ( basic_socket_t::is_open_unsafe() ) { - if ( (dir & (sock_base::stop_in | sock_base::stop_out)) == - (sock_base::stop_in | sock_base::stop_out) ) { - ::shutdown( basic_socket_t::_fd, 2 ); - } else if ( dir & sock_base::stop_in ) { - ::shutdown( basic_socket_t::_fd, 0 ); - } else if ( dir & sock_base::stop_out ) { - ::shutdown( basic_socket_t::_fd, 1 ); - } - } -} - -template<class charT, class traits, class _Alloc> -void 
sock_processor_base<charT,traits,_Alloc>::setoptions_unsafe( sock_base::so_t optname, bool on_off, int __v ) -{ -#ifdef __unix - if ( basic_socket_t::is_open_unsafe() ) { - if ( optname != sock_base::so_linger ) { - int turn = on_off ? 1 : 0; - if ( setsockopt( basic_socket_t::_fd, SOL_SOCKET, (int)optname, (const void *)&turn, - (socklen_t)sizeof(int) ) != 0 ) { - _state |= ios_base::failbit; - } - } else { - linger l; - l.l_onoff = on_off ? 1 : 0; - l.l_linger = __v; - if ( setsockopt( basic_socket_t::_fd, SOL_SOCKET, (int)optname, (const void *)&l, - (socklen_t)sizeof(linger) ) != 0 ) { - _state |= ios_base::failbit; - } - - } - } else { - _state |= ios_base::failbit; - } -#endif // __unix -} - - -template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -int connect_processor<Connect, charT, traits, _Alloc, C>::Init::_count = 0; - -template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -bool connect_processor<Connect, charT, traits, _Alloc, C>::Init::_at_fork = false; - -template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -std::tr2::mutex connect_processor<Connect, charT, traits, _Alloc, C>::Init::_init_lock; - -template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -void connect_processor<Connect, charT, traits, _Alloc, C>::Init::_guard( int direction ) -{ - if ( direction ) { - std::tr2::lock_guard<std::tr2::mutex> lk( _init_lock ); - if ( _count++ == 0 ) { -#ifdef __FIT_PTHREADS - if ( !_at_fork ) { // call only once - if ( pthread_atfork( __at_fork_prepare, __at_fork_parent, __at_fork_child ) ) { - // throw system_error - } - _at_fork = true; - } -#endif -// _sock_processor_base::_idx = std::tr2::this_thread::xalloc(); - } - } else { - 
std::tr2::lock_guard<std::tr2::mutex> lk( _init_lock ); - if ( --_count == 0 ) { - - } - } -} - -template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -void connect_processor<Connect, charT, traits, _Alloc, C>::Init::__at_fork_prepare() -{ _init_lock.lock(); } - -template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -void connect_processor<Connect, charT, traits, _Alloc, C>::Init::__at_fork_child() -{ - _init_lock.unlock(); - - if ( _count != 0 ) { - // std::cerr << "SHOULD NEVER HAPPEN!!!!\n"; - throw std::logic_error( "Fork while connect_processor working may has unexpected behaviour in child process" ); - } - // _sock_processor_base::_idx = std::tr2::this_thread::xalloc(); -} - -template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -void connect_processor<Connect, charT, traits, _Alloc, C>::Init::__at_fork_parent() -{ _init_lock.unlock(); } - -template<class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -char connect_processor<Connect, charT, traits, _Alloc, C>::Init_buf[128]; - -template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -void connect_processor<Connect, charT, traits, _Alloc, C>::close() -{ - base_t::close(); - - { - std::tr2::lock_guard<std::tr2::mutex> lk(inwlock); - _in_work = false; // <--- set before cnd.notify_one(); (below in this func) - } - - std::tr2::lock_guard<std::tr2::mutex> lk2( rdlock ); - ready_pool.push_back( processor() ); // make ready_pool not empty - // std::cerr << "=== " << ready_pool.size() << std::endl; - cnd.notify_one(); -} - -template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( 
std::basic_sockstream<charT,traits,_Alloc>& )> -void connect_processor<Connect, charT, traits, _Alloc, C>::operator ()( int fd, const sockaddr& addr ) -{ - typename base_t::sockstream_t* s = base_t::create_stream( fd, addr ); - - Connect* c = new Connect( *s ); // bad point! I can't read from s in ctor indeed! - - if ( s->rdbuf()->in_avail() ) { - std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); - ready_pool.push_back( processor( c, s ) ); - cnd.notify_one(); - } else { - std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); - worker_pool.insert( std::make_pair( fd, processor( c, s ) ) ); - } -} - -template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -void connect_processor<Connect, charT, traits, _Alloc, C>::operator ()( int fd, const typename connect_processor<Connect, charT, traits, _Alloc, C>::base_t::adopt_close_t& ) -{ - { - std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); - typename worker_pool_t::iterator i = worker_pool.find( fd ); - if ( i != worker_pool.end() ) { - delete i->second.s; - delete i->second.c; - // std::cerr << "oops\n"; - worker_pool.erase( i ); - return; - } - } - - Connect* c = 0; - { - std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); - typename ready_pool_t::iterator j = std::find( ready_pool.begin(), ready_pool.end(), /* std::bind2nd( typename processor::equal_to(), &s ) */ fd ); - if ( j != ready_pool.end() ) { - // std::cerr << "oops 2\n"; - c = j->c; - ready_pool.erase( j ); - } - } - if ( c != 0 ) { -//using unknown variable s -// (c->*C)( s ); - delete c; - } -} - -template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -void connect_processor<Connect, charT, traits, _Alloc, C>::operator ()( int fd, const typename connect_processor<Connect, charT, traits, _Alloc, C>::base_t::adopt_data_t& ) -{ - processor p; - - { - std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); - 
typename worker_pool_t::const_iterator i = worker_pool.find( fd ); - if ( i == worker_pool.end() ) { - return; - } - p = i->second; - worker_pool.erase( i ); - } - - std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); - ready_pool.push_back( p ); - cnd.notify_one(); - // std::cerr << "notify data " << (void *)c << " " << ready_pool.size() << std::endl; -} - -template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -bool connect_processor<Connect, charT, traits, _Alloc, C>::pop_ready( processor& p ) -{ - { - std::tr2::unique_lock<std::tr2::mutex> lk( rdlock ); - - cnd.wait( lk, not_empty ); - p = ready_pool.front(); // it may contain p.c == 0, p.s == 0, if !in_work() - ready_pool.pop_front(); - // std::cerr << "pop 1\n"; - if ( p.c == 0 ) { // wake up, but _in_work may be still true here (in processor pipe?), - return false; // even I know that _in_work <- false before notification... - } // so, check twice - } - - // std::cerr << "pop 2\n"; - - std::tr2::lock_guard<std::tr2::mutex> lk(inwlock); - return _in_work ? true : false; -} - - -template <class Connect, class charT, class traits, class _Alloc, void (Connect::*C)( std::basic_sockstream<charT,traits,_Alloc>& )> -void connect_processor<Connect, charT, traits, _Alloc, C>::worker() -{ - _in_work = true; - - processor p; - - while ( pop_ready( p ) ) { - // std::cerr << "worker 1\n"; - (p.c->*C)( *p.s ); - // std::cerr << "worker 2\n"; - if ( p.s->rdbuf()->in_avail() ) { - std::tr2::lock_guard<std::tr2::mutex> lk( rdlock ); - ready_pool.push_back( p ); - } else { - std::tr2::lock_guard<std::tr2::mutex> lk( wklock ); - worker_pool[p.s->rdbuf()->fd()] = p; - ... [truncated message content] |