[complement-svn] SF.net SVN: complement: [1833] branches/complement-sockios/explore
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2008-03-11 15:03:58
|
Revision: 1833 http://complement.svn.sourceforge.net/complement/?rev=1833&view=rev Author: complement Date: 2008-03-11 08:03:22 -0700 (Tue, 11 Mar 2008) Log Message: ----------- core implementation of new sockstream, that based on epoll and has uniform sockets management Modified Paths: -------------- branches/complement-sockios/explore/lib/sockios/Makefile.inc branches/complement-sockios/explore/lib/sockios/ut/Makefile.inc branches/complement-sockios/explore/lib/sockios/ut/sockios_test_suite.cc Added Paths: ----------- branches/complement-sockios/explore/include/sockios/sockstream2 branches/complement-sockios/explore/include/sockios/sockstream2.cc branches/complement-sockios/explore/include/sockios/sp.h branches/complement-sockios/explore/lib/sockios/_sp.cc branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.cc branches/complement-sockios/explore/lib/sockios/ut/sockios2_test.h Copied: branches/complement-sockios/explore/include/sockios/sockstream2 (from rev 1828, branches/complement-sockios/explore/include/sockios/sockstream) =================================================================== --- branches/complement-sockios/explore/include/sockios/sockstream2 (rev 0) +++ branches/complement-sockios/explore/include/sockios/sockstream2 2008-03-11 15:03:22 UTC (rev 1833) @@ -0,0 +1,766 @@ +// -*- C++ -*- Time-stamp: <08/03/07 01:16:27 ptr> + +/* + * Copyright (c) 1997-1999, 2002, 2003, 2005-2008 + * Petr Ovtchenkov + * + * Portion Copyright (c) 1999-2001 + * Parallel Graphics Ltd. + * + * Licensed under the Academic Free License version 3.0 + * + */ + +#ifndef __SOCKIOS_SOCKSTREAM2 +#define __SOCKIOS_SOCKSTREAM2 + +#ifndef __config_feature_h +#include <config/feature.h> +#endif + +#if !defined(__sun) && !defined(_WIN32) // i.e. 
__linux and __hpux +#include <sys/poll.h> // pollfd +#endif + +#include <mt/mutex> +#include <mt/condition_variable> +#include <mt/date_time> + +#include <netdb.h> +#include <netinet/in.h> + +#include <iosfwd> +#include <ios> +#include <streambuf> +#include <iostream> +#include <string> +#include <stdexcept> + +#ifdef WIN32 +# include <winsock2.h> +#else // WIN32 +# include <unistd.h> +# include <sys/types.h> +# if defined(__hpux) && !defined(_INCLUDE_XOPEN_SOURCE_EXTENDED) +# define _INCLUDE_XOPEN_SOURCE_EXTENDED +# endif +# include <sys/socket.h> +# if !defined(__UCLIBC__) && !defined(__FreeBSD__) && !defined(__OpenBSD__) && !defined(__NetBSD__) +# include <stropts.h> +# endif +# ifdef __sun +# include <sys/conf.h> +# endif +# include <netinet/in.h> +# include <arpa/inet.h> +# include <netdb.h> +# ifdef __hpux +// # ifndef socklen_t // HP-UX 10.01 +// typedef int socklen_t; +// # endif +# endif +# include <cerrno> +#endif // !WIN32 + +#include <sockios/netinfo.h> + +#ifdef STLPORT +_STLP_BEGIN_NAMESPACE +#else +namespace std { +#endif + +class sock_base2 +{ + public: + typedef unsigned long shutdownflg; +#ifdef __unix + typedef int socket_type; +#endif +#ifdef WIN32 + typedef SOCKET socket_type; +#endif + + enum stype { + sock_stream = SOCK_STREAM, // stream socket + sock_dgram = SOCK_DGRAM, // datagram socket + sock_raw = SOCK_RAW, // raw-protocol interface + sock_rdm = SOCK_RDM, // reliably-delivered message + sock_seqpacket = SOCK_SEQPACKET // sequenced packet stream + }; + + enum protocol { + local, // local to host (pipes, portals) + inet // internetwork: UDP, TCP, etc. + }; + + // Option flags per-socket. 
+ enum so_t { + so_debug = SO_DEBUG, // turn on debugging info recording +#ifndef __linux + so_acceptconn = SO_ACCEPTCONN, // socket has had listen() +#endif + so_reuseaddr = SO_REUSEADDR, // allow local address reuse + so_keepalive = SO_KEEPALIVE, // keep connections alive + so_dontroute = SO_DONTROUTE, // just use interface addresses + so_broadcast = SO_BROADCAST, // permit sending of broadcast msgs +#ifndef __linux + so_useloopback = SO_USELOOPBACK, // bypass hardware when possible +#endif + so_linger = SO_LINGER, // linger on close if data present + so_oobinline = SO_OOBINLINE, // leave received OOB data in line + // Additional options, not kept in so_options. + so_sndbuf = SO_SNDBUF, // send buffer size + so_rcvbuf = SO_RCVBUF, // receive buffer size + so_sndlowat = SO_SNDLOWAT, // send low-water mark + so_rcvlowat = SO_RCVLOWAT, // receive low-water mark + so_sndtimeo = SO_SNDTIMEO, // send timeout + so_rcvtimeo = SO_RCVTIMEO, // receive timeout + so_error = SO_ERROR, // get error status and clear + so_type = SO_TYPE // get socket type +#ifdef __sun // indeed HP-UX 11.00 also has it, but 10.01 not + , + so_prototype = SO_PROTOTYPE // get/set protocol type +#endif + }; + + // Level number for (get/set)sockopt() to apply to socket itself. +// enum _level { +// sol_socket = SOL_SOCKET +// }; + + enum shutdownflags { + stop_in = 0x1, + stop_out = 0x2 + }; + +#ifdef WIN32 + class Init + { + // sometimes I need Init outside sock_base... 
+ // private: + public: + __FIT_DECLSPEC Init(); + __FIT_DECLSPEC ~Init(); + + friend class sock_base; + }; + + protected: + __FIT_DECLSPEC sock_base(); + __FIT_DECLSPEC ~sock_base(); +#endif +}; + +class socket_timeout : + public std::exception +{ + public: + socket_timeout() + { } + + virtual char const *what() throw() + { return "socket timeout"; } +}; + +class socket_read_timeout : + public socket_timeout +{ + public: + socket_read_timeout() + { } + + virtual char const *what() throw() + { return "socket read timeout"; } +}; + +class socket_write_timeout : + public socket_timeout +{ + public: + socket_write_timeout() + { } + + virtual char const *what() throw() + { return "socket write timeout"; } +}; + +namespace detail { +template <class charT, class traits, class _Alloc> class sockmgr; +} // namespace detail + +template<class charT, class traits, class _Alloc> +class basic_socket +{ + protected: + basic_socket() : + _fd( -1 ), + _use_rdtimeout( false ), + _use_wrtimeout( false ), + _notify_close( false ) + { new( Init_buf ) Init(); } + + ~basic_socket() + { ((Init *)Init_buf)->~Init(); } + + bool is_open_unsafe() const + { return _fd != -1; } + sock_base2::socket_type fd_unsafe() const + { return _fd; } + + class Init + { + public: + Init() + { _guard( 1 ); } + ~Init() + { _guard( 0 ); } + + private: + static void _guard( int direction ); + static void __at_fork_prepare(); + static void __at_fork_child(); + static void __at_fork_parent(); + static int _count; + }; + + static char Init_buf[]; + + public: +#ifdef WIN32 + typedef u_short family_type; +#else +# ifdef sa_family_t + typedef sa_family_t family_type; +# else // HP-UX 10.01 + typedef unsigned short family_type; +# endif +#endif + + template <class Duration> + void rdtimeout( const Duration& ); + void rdtimeout() // infinite + { _use_rdtimeout = false; } + + template <class Duration> + void wrtimeout( const Duration& ); + void wrtimeout() // infinite + { _use_wrtimeout = false; } + + 
sock_base2::socket_type fd() const { return _fd;} + bool is_open() const + { return _fd != -1; } + + family_type family() const + { return /* is_open() ? */ _address.any.sa_family /* : 0 */; } + + int port() const + { return /* is_open() && */ _address.any.sa_family == AF_INET ? _address.inet.sin_port : 0; } + + unsigned long inet_addr() const + { return /* is_open() && */ _address.any.sa_family == AF_INET ? _address.inet.sin_addr.s_addr : 0; } + + const sockaddr_in& inet_sockaddr() const throw( std::domain_error ) + { + if ( _address.any.sa_family != AF_INET ) { + throw domain_error( "socket not belongs to inet type" ); + } + return /* is_open() && */ _address.inet; + } + + protected: + sock_base2::socket_type _fd; + + union sockaddr_t { + sockaddr_in inet; + sockaddr any; + } _address; + + std::tr2::milliseconds _rdtimeout; + std::tr2::milliseconds _wrtimeout; + bool _use_rdtimeout; + bool _use_wrtimeout; + bool _notify_close; + + static detail::sockmgr<charT,traits,_Alloc>* mgr; + friend class Init; +}; + +template<class charT, class traits, class _Alloc> +int basic_socket<charT,traits,_Alloc>::Init::_count = 0; + +template<class charT, class traits, class _Alloc> +void basic_socket<charT,traits,_Alloc>::Init::_guard( int direction ) +{ + static std::tr2::mutex _init_lock; + + if ( direction ) { + std::tr2::lock_guard<std::tr2::mutex> lk( _init_lock ); + if ( _count++ == 0 ) { + basic_socket<charT,traits,_Alloc>::mgr = new detail::sockmgr<charT,traits,_Alloc>(); +// #ifdef __FIT__PTHREADS +// pthread_atfork( __at_fork_prepare, __at_fork_parent, __at_fork_child ); +// #endif +// _sock_processor_base::_idx = std::tr2::this_thread::xalloc(); + } + } else { + std::tr2::lock_guard<std::tr2::mutex> lk( _init_lock ); + if ( --_count == 0 ) { + delete basic_socket<charT,traits,_Alloc>::mgr; + } + } +} + +template<class charT, class traits, class _Alloc> +void basic_socket<charT,traits,_Alloc>::Init::__at_fork_prepare() +{ } + +template<class charT, class traits, class 
_Alloc> +void basic_socket<charT,traits,_Alloc>::Init::__at_fork_child() +{ + if ( _count != 0 ) { + // stop mgr + delete basic_socket<charT,traits,_Alloc>::mgr; + basic_socket<charT,traits,_Alloc>::mgr = new detail::sockmgr<charT,traits,_Alloc>(); + } + // _sock_processor_base::_idx = std::tr2::this_thread::xalloc(); +} + +template<class charT, class traits, class _Alloc> +void basic_socket<charT,traits,_Alloc>::Init::__at_fork_parent() +{ } + +template<class charT, class traits, class _Alloc> +char basic_socket<charT,traits,_Alloc>::Init_buf[128]; + +template <class charT, class traits, class _Alloc> +detail::sockmgr<charT,traits,_Alloc>* basic_socket<charT,traits,_Alloc>::mgr; + +#ifdef STLPORT +_STLP_END_NAMESPACE +#else +} // namespace std +#endif + +#include <sockios/sp.h> + +#ifdef STLPORT +_STLP_BEGIN_NAMESPACE +#else +namespace std { +#endif + +template<class charT, class traits, class _Alloc> +class basic_sockbuf2 : + public basic_streambuf<charT, traits>, + public basic_socket<charT,traits,_Alloc> +{ + private: + typedef basic_socket<charT,traits,_Alloc> basic_socket_t; + + public: + typedef basic_ios<charT, traits> ios_type; + typedef basic_sockbuf2<charT, traits, _Alloc> sockbuf_type; + typedef typename traits::state_type state_t; + + public: + /* Inherited from basic_streambuf : */ + typedef charT char_type; + typedef typename traits::int_type int_type; + typedef typename traits::pos_type pos_type; + typedef typename traits::off_type off_type; + typedef traits traits_type; + /* */ + + basic_sockbuf2() : + rdready( *this ), +#if !defined(STLPORT) && defined(__GNUC__) +#if ((__GNUC__ < 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ < 4))) // hmm, 3.3.6 + _mode( ios_base::openmode(__ios_flags::_S_in | __ios_flags::_S_out) ), +#else // 4.1.1 + _mode( _S_in | _S_out ), +#endif // __GNUC__ +#else // STLPORT + _mode( 0 ), +#endif // STLPORT + _bbuf(0), + _ebuf(0), + _allocated( true ) + { } + + basic_sockbuf2( const char *hostname, int port, + sock_base2::stype 
type = sock_base2::sock_stream, + sock_base2::protocol prot = sock_base2::inet ) : + rdready( *this ), +#if !defined(STLPORT) && defined(__GNUC__) +#if ((__GNUC__ < 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ < 4))) + _mode( ios_base::openmode(__ios_flags::_S_in | __ios_flags::_S_out) ), +#else // 4.1.1 + _mode( _S_in | _S_out ), +#endif // __GNUC__ +#else // STLPORT + _mode( 0 ), +#endif // STLPORT + _bbuf(0), + _ebuf(0), + _allocated( true ) + { open( hostname, port, type, prot ); } + + basic_sockbuf2( const in_addr& addr, int port, + sock_base2::stype type = sock_base2::sock_stream, + sock_base2::protocol prot = sock_base2::inet ) : + rdready( *this ), +#if !defined(STLPORT) && defined(__GNUC__) +#if ((__GNUC__ < 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ < 4))) + _mode( ios_base::openmode(__ios_flags::_S_in | __ios_flags::_S_out) ), +#else // 4.1.1 + _mode( _S_in | _S_out ), +#endif // __GNUC__ +#else // STLPORT + _mode( 0 ), +#endif // STLPORT + _bbuf(0), + _ebuf(0), + _allocated( true ) + { open( addr, type, prot ); } + + virtual ~basic_sockbuf2() + { + close(); + _M_deallocate_block(); + } + + sockbuf_type *open( const char *hostname, int port, + sock_base2::stype type = sock_base2::sock_stream, + sock_base2::protocol prot = sock_base2::inet ); + + sockbuf_type *open( const in_addr& addr, int port, + sock_base2::stype type = sock_base2::sock_stream, + sock_base2::protocol prot = sock_base2::inet ); + + sockbuf_type *open( sock_base2::socket_type s, + sock_base2::stype t = sock_base2::sock_stream ); + + sockbuf_type *open( sock_base2::socket_type s, const sockaddr& addr, + sock_base2::stype t = sock_base2::sock_stream ); + + sockbuf_type *attach( sock_base2::socket_type s, + sock_base2::stype t = sock_base2::sock_stream ); + + sockbuf_type *attach( sock_base2::socket_type s, const sockaddr& addr, + sock_base2::stype t = sock_base2::sock_stream ); + + sockbuf_type *close(); + void shutdown( sock_base2::shutdownflg dir ); + + sock_base2::stype stype() const + { 
return _type; } + + protected: + virtual streamsize showmanyc() + { return this->egptr() - this->gptr(); } + + virtual int_type underflow(); + virtual int_type overflow( int_type c = traits::eof() ); + virtual int_type pbackfail( int_type c = traits::eof() ) + { + if ( !basic_socket_t::is_open() ) + return traits::eof(); + + if ( this->gptr() <= this->eback() ) { + return traits::eof(); + } + + this->gbump(-1); + if ( !traits::eq_int_type(c,traits::eof()) ) { + *this->gptr() = traits::to_char_type(c); + return c; + } + + return traits::not_eof(c); + } + + // Buffer managment and positioning: + virtual basic_streambuf<charT, traits> *setbuf(char_type *s, streamsize n ) + { + if ( s != 0 && n != 0 ) { + _M_deallocate_block(); + _allocated = false; + _bbuf = s; + _ebuf = s + n; + } + return this; + } + + virtual int sync(); + virtual streamsize xsputn(const char_type *s, streamsize n); + + private: // Helper functions + charT* _bbuf; + charT* _ebuf; + bool _allocated; // true, if _bbuf should be deallocated + + // Precondition: 0 < __n <= max_size(). + charT* _M_allocate( size_t __n ) { return _M_data_allocator.allocate(__n); } + void _M_deallocate( charT* __p, size_t __n ) + { if (__p) _M_data_allocator.deallocate(__p, __n); } + + void _M_allocate_block(size_t __n) + { + if ( _allocated ) { + if ( __n <= max_size() ) { + _bbuf = _M_allocate(__n); + _ebuf = _bbuf + __n; + // _STLP_ASSERT( __n > 0 ? 
_bbuf != 0 : _bbuf == 0 ); + } else + this->_M_throw_length_error(); + } + } + + void _M_deallocate_block() + { if ( _allocated ) _M_deallocate(_bbuf, _ebuf - _bbuf); } + + size_t max_size() const { return (size_t(-1) / sizeof(charT)) - 1; } + +#ifdef STLPORT + void _M_throw_length_error() const + { _STLP_THROW(length_error("basic_sockbuf2")); } +#else + void _M_throw_length_error() const + { throw length_error("basic_sockbuf2"); } +#endif + +#ifdef STLPORT + typedef typename _Alloc_traits<charT, _Alloc>::allocator_type allocator_type; +#else + typedef _Alloc allocator_type; +#endif + /* typedef __allocator<charT, _Alloc> _Alloc_type; */ + + /* _Alloc_type */ allocator_type _M_data_allocator; + + class rdready_t + { + public: + rdready_t( sockbuf_type& self ) : + b( self ) + { } + bool operator ()() const + { return b.showmanyc() != 0; } + private: + sockbuf_type& b; + } rdready; + + sockbuf_type *_open_sockmgr( sock_base2::socket_type s, const sockaddr& addr, + sock_base2::stype t = sock_base2::sock_stream ); + + private: + typedef basic_sockbuf2<charT,traits,_Alloc> _Self_type; + int (basic_sockbuf2<charT,traits,_Alloc>::*_xwrite)( const void *, size_t ); + int (basic_sockbuf2<charT,traits,_Alloc>::*_xread)( void *, size_t ); + int write( const void *buf, size_t n ) +#ifndef WIN32 + { return ::write( basic_socket_t::_fd, buf, n ); } +#else + { return ::send( basic_socket_t::_fd, (const char *)buf, n, 0 ); } +#endif + int send( const void *buf, size_t n ) +#ifdef WIN32 + { return ::send( basic_socket_t::_fd, (const char *)buf, n, 0 ); } +#else + { return ::send( basic_socket_t::_fd, buf, n, 0 ); } +#endif + int sendto( const void *buf, size_t n ) +#ifdef WIN32 + { return ::sendto( basic_socket_t::_fd, (const char *)buf, n, 0, &basic_socket_t::_address.any, sizeof( sockaddr_in ) ); } +#else + { return ::sendto( basic_socket_t::_fd, buf, n, 0, &basic_socket_t::_address.any, sizeof( sockaddr_in ) ); } +#endif + + int read( void *buf, size_t n ) +#ifdef WIN32 + { 
return ::recv( basic_socket_t::_fd, (char *)buf, n, 0 ); } +#else + { return ::read( basic_socket_t::_fd, buf, n ); } +#endif + int recv( void *buf, size_t n ) +#ifdef WIN32 + { return ::recv( basic_socket_t::_fd, (char *)buf, n, 0 ); } +#else + { return ::recv( basic_socket_t::_fd, buf, n, 0 ); } +#endif + int recvfrom( void *buf, size_t n ); + void __hostname(); + + ios_base::openmode _mode; + sock_base2::stype _type; + + std::tr2::mutex ulck; + std::tr2::condition_variable ucnd; + + friend class detail::sockmgr<charT,traits,_Alloc>; +}; + +template <class charT, class traits, class _Alloc> +class basic_sockstream2 : + public sock_base2, + public basic_iostream<charT,traits> +{ + public: + /* Inherited from basic_iostream + typedef charT char_type; + typedef typename traits::int_type int_type; + typedef typename traits::pos_type pos_type; + typedef typename traits::off_type off_type; + */ + basic_sockstream2() : + sock_base2(), + basic_iostream<charT,traits>( 0 ) + { basic_ios<charT,traits>::init(&_sb); } + basic_sockstream2( const char *hostname, int port, + sock_base2::stype type = sock_base2::sock_stream, + sock_base2::protocol pro = sock_base2::inet ) : + sock_base2(), + basic_iostream<charT,traits>( 0 ) + { + basic_ios<charT,traits>::init(&_sb); + basic_iostream<charT,traits>::clear(); + if ( _sb.open( hostname, port, type, pro ) == 0 ) { + basic_ios<charT,traits>::setstate( ios_base::failbit | ios_base::badbit ); + } + } + + basic_sockstream2( const in_addr& addr, int port, + sock_base2::stype type = sock_base2::sock_stream, + sock_base2::protocol pro = sock_base2::inet ) : + sock_base2(), + basic_iostream<charT,traits>( 0 ) + { + basic_ios<charT,traits>::init(&_sb); + basic_iostream<charT,traits>::clear(); + if ( _sb.open( addr, port, type, pro ) == 0 ) { + basic_ios<charT,traits>::setstate( ios_base::failbit | ios_base::badbit ); + } + } + + basic_sockstream2( sock_base2::socket_type s, const sockaddr& addr, + sock_base2::stype type = 
sock_base2::sock_stream ) : + sock_base2(), + basic_iostream<charT,traits>( 0 ) + { + basic_ios<charT,traits>::init(&_sb); + basic_iostream<charT,traits>::clear(); + if ( _sb.open( s, addr, type ) == 0 ) { + basic_ios<charT,traits>::setstate( ios_base::failbit | ios_base::badbit ); + } + } + + basic_sockstream2( sock_base2::socket_type s, + sock_base2::stype type = sock_base2::sock_stream ) : + sock_base2(), + basic_iostream<charT,traits>( 0 ) + { + basic_ios<charT,traits>::init(&_sb); + basic_iostream<charT,traits>::clear(); + if ( _sb.open( s, type ) == 0 ) { + basic_ios<charT,traits>::setstate( ios_base::failbit | ios_base::badbit ); + } + } + + basic_sockbuf2<charT,traits,_Alloc>* rdbuf() const + { return const_cast<basic_sockbuf2<charT,traits,_Alloc>*>(&_sb); } + + bool is_open() const + { return _sb.is_open(); } + + void open( const char *hostname, int port, + sock_base2::stype type = sock_base2::sock_stream, + sock_base2::protocol pro = sock_base2::inet ) + { + basic_iostream<charT,traits>::clear(); + if ( _sb.open( hostname, port, type, pro ) == 0 ) { + basic_ios<charT,traits>::setstate( ios_base::failbit | ios_base::badbit ); + } + } + + void open( const in_addr& addr, int port, + sock_base2::stype type = sock_base2::sock_stream, + sock_base2::protocol pro = sock_base2::inet ) + { + basic_iostream<charT,traits>::clear(); + if ( _sb.open( addr, port, type, pro ) == 0 ) { + basic_ios<charT,traits>::setstate( ios_base::failbit | ios_base::badbit ); + } + } + + // only for sock_stream : inet now! 
+ void open( sock_base2::socket_type s, const sockaddr& addr, + sock_base2::stype type = sock_base2::sock_stream ) + { + basic_iostream<charT,traits>::clear(); + if ( _sb.open( s, addr, type ) == 0 ) { + basic_ios<charT,traits>::setstate( ios_base::failbit | ios_base::badbit ); + } + } + + void open( sock_base2::socket_type s, + sock_base2::stype type = sock_base2::sock_stream ) + { + basic_iostream<charT,traits>::clear(); + if ( _sb.open( s, type ) == 0 ) { + basic_ios<charT,traits>::setstate( ios_base::failbit | ios_base::badbit ); + } + } + + void attach( sock_base2::socket_type s, const sockaddr& addr, + sock_base2::stype type = sock_base2::sock_stream ) + { + basic_iostream<charT,traits>::clear(); + if ( _sb.attach( s, addr, type ) == 0 ) { + basic_ios<charT,traits>::setstate( ios_base::failbit | ios_base::badbit ); + } + } + + void attach( sock_base2::socket_type s, + sock_base2::stype type = sock_base2::sock_stream ) + { + basic_iostream<charT,traits>::clear(); + if ( _sb.attach( s, type ) == 0 ) { + basic_ios<charT,traits>::setstate( ios_base::failbit | ios_base::badbit ); + } + } + + void close() + { + if ( _sb.is_open() ) { + if ( _sb.close() == 0 ) { + basic_ios<charT,traits>::setstate( ios_base::failbit ); + } else { + basic_iostream<charT,traits>::clear(); + } + } + } + + void setoptions( sock_base2::so_t optname, bool on_off = true, + int __v = 0 ); + + private: + basic_sockbuf2<charT,traits,_Alloc> _sb; +}; + +typedef basic_sockbuf2<char,char_traits<char>,allocator<char> > sockbuf2; +// typedef basic_sockbuf2<wchar_t,char_traits<wchar_t>,allocator<wchar_t> > wsockbuf; +typedef basic_sockstream2<char,char_traits<char>,allocator<char> > sockstream2; +// typedef basic_sockstream<wchar_t,char_traits<wchar_t>,allocator<wchar_t> > wsockstream; + +#ifdef STLPORT +_STLP_END_NAMESPACE +#else +} // namespace std +#endif + +#ifndef __STL_LINK_TIME_INSTANTIATION +#include <sockios/sockstream2.cc> +#endif + +#endif // __SOCKIOS_SOCKSTREAM2 Copied: 
branches/complement-sockios/explore/include/sockios/sockstream2.cc (from rev 1828, branches/complement-sockios/explore/include/sockios/sockstream.cc) =================================================================== --- branches/complement-sockios/explore/include/sockios/sockstream2.cc (rev 0) +++ branches/complement-sockios/explore/include/sockios/sockstream2.cc 2008-03-11 15:03:22 UTC (rev 1833) @@ -0,0 +1,541 @@ +// -*- C++ -*- Time-stamp: <08/03/07 01:16:27 ptr> + +/* + * Copyright (c) 1997-1999, 2002, 2003, 2005-2008 + * Petr Ovtchenkov + * + * Portion Copyright (c) 1999-2001 + * Parallel Graphics Ltd. + * + * Licensed under the Academic Free License version 3.0 + * + */ + +#include <sockios/netinfo.h> + +#if defined(__unix) && !defined(__UCLIBC__) && !defined(__FreeBSD__) && !defined(__OpenBSD__) && !defined(__NetBSD__) +# include <stropts.h> // for ioctl() call +#endif + +#include <fcntl.h> + +#ifdef STLPORT +_STLP_BEGIN_NAMESPACE +#else +namespace std { +#endif + +template<class charT, class traits, class _Alloc> +basic_sockbuf2<charT, traits, _Alloc> * +basic_sockbuf2<charT, traits, _Alloc>::open( const char *name, int port, + sock_base2::stype type, + sock_base2::protocol prot ) +{ return basic_sockbuf2<charT, traits, _Alloc>::open( std::findhost( name ), port, type, prot ); } + +template<class charT, class traits, class _Alloc> +basic_sockbuf2<charT, traits, _Alloc> * +basic_sockbuf2<charT, traits, _Alloc>::open( const in_addr& addr, int port, + sock_base2::stype type, + sock_base2::protocol prot ) +{ + if ( basic_socket_t::is_open() ) { + return 0; + } + try { + _mode = ios_base::in | ios_base::out; + _type = type; +#ifdef WIN32 + WSASetLastError( 0 ); +#endif + if ( prot == sock_base2::inet ) { + basic_socket_t::_fd = socket( PF_INET, type, 0 ); + if ( basic_socket_t::_fd == -1 ) { + throw std::runtime_error( "can't open socket" ); + } + basic_socket_t::_address.inet.sin_family = AF_INET; + // htons is a define at least in Linux 2.2.5-15, and it's 
expantion fail + // for gcc 2.95.3 +#if defined(linux) && defined(htons) && defined(__bswap_16) + basic_socket_t::_address.inet.sin_port = ((((port) >> 8) & 0xff) | (((port) & 0xff) << 8)); +#else + basic_socket_t::_address.inet.sin_port = htons( port ); +#endif // linux && htons + basic_socket_t::_address.inet.sin_addr = addr; + + // Generally, stream sockets may successfully connect() only once + if ( connect( basic_socket_t::_fd, &basic_socket_t::_address.any, sizeof( basic_socket_t::_address ) ) == -1 ) { + throw std::domain_error( "connect fail" ); + } + if ( type == sock_base2::sock_stream ) { + _xwrite = &_Self_type::write; + _xread = &_Self_type::read; + } else if ( type == sock_base2::sock_dgram ) { + _xwrite = &_Self_type::send; + _xread = &_Self_type::recv; + } + } else if ( prot == sock_base2::local ) { + basic_socket_t::_fd = socket( PF_UNIX, type, 0 ); + if ( basic_socket_t::_fd == -1 ) { + throw std::runtime_error( "can't open socket" ); + } + } else { // other protocols not implemented yet + throw std::invalid_argument( "protocol not implemented" ); + } + + if ( _bbuf == 0 ) { + struct ifconf ifc; + struct ifreq ifr; + ifc.ifc_len = sizeof(ifreq); + ifc.ifc_req = &ifr; + int mtu = ((ioctl(basic_socket_t::_fd, SIOCGIFMTU, &ifc) < 0 ? 1500 : ifr.ifr_mtu) - 20 - (type == sock_base2::sock_stream ? 20 : 8 )) / sizeof(charT); + int qlen = ioctl(basic_socket_t::_fd, SIOCGIFTXQLEN, &ifc) < 0 ? 2 : ifr.ifr_qlen; + _M_allocate_block( type == sock_base2::sock_stream ? 
mtu * qlen * 2 : mtu * 2 ); + } + + if ( _bbuf == 0 ) { + throw std::length_error( "can't allocate block" ); + } + + if ( fcntl( basic_socket_t::_fd, F_SETFL, fcntl( basic_socket_t::_fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { + throw std::runtime_error( "can't establish nonblock mode" ); + } + setp( _bbuf, _bbuf + ((_ebuf - _bbuf)>>1) ); + setg( this->epptr(), this->epptr(), this->epptr() ); + basic_socket_t::_notify_close = true; + basic_socket_t::mgr->push( *this ); + } + catch ( std::domain_error& ) { +#ifdef WIN32 + // _errno = WSAGetLastError(); + ::closesocket( basic_socket_t::_fd ); +#else + ::close( basic_socket_t::_fd ); +#endif + basic_socket_t::_fd = -1; + return 0; + } + catch ( std::length_error& ) { +#ifdef WIN32 + ::closesocket( basic_socket_t::_fd ); +#else + ::close( basic_socket_t::_fd ); +#endif + basic_socket_t::_fd = -1; + return 0; + } + catch ( std::runtime_error& ) { +#ifdef WIN32 + // _errno = WSAGetLastError(); +#else +#endif + return 0; + } + catch ( std::invalid_argument& ) { + return 0; + } + + return this; +} + +template<class charT, class traits, class _Alloc> +basic_sockbuf2<charT, traits, _Alloc> * +basic_sockbuf2<charT, traits, _Alloc>::open( sock_base2::socket_type s, sock_base2::stype t ) +{ + if ( basic_socket_t::is_open() || s == -1 ) { + return 0; + } + + sockaddr sa; + socklen_t sl = sizeof(sa); + getsockname( s, &sa, &sl ); + + return basic_sockbuf<charT, traits, _Alloc>::open( s, sa, t ); +} + +template<class charT, class traits, class _Alloc> +basic_sockbuf2<charT, traits, _Alloc> * +basic_sockbuf2<charT, traits, _Alloc>::open( sock_base2::socket_type s, + const sockaddr& addr, + sock_base2::stype t ) +{ + basic_sockbuf2<charT, traits, _Alloc>* ret = _open_sockmgr( s, addr, t ); + if ( ret != 0 ) { + basic_socket_t::_notify_close = true; + basic_socket_t::mgr->push( *this ); + } + return ret; +} + +template<class charT, class traits, class _Alloc> +basic_sockbuf2<charT, traits, _Alloc> * +basic_sockbuf2<charT, traits, 
_Alloc>::_open_sockmgr( sock_base2::socket_type s, + const sockaddr& addr, + sock_base2::stype t ) +{ + if ( basic_socket_t::is_open() || s == -1 ) { + return 0; + } + basic_socket_t::_fd = s; + memcpy( (void *)&basic_socket_t::_address.any, (const void *)&addr, sizeof(sockaddr) ); + _mode = ios_base::in | ios_base::out; + _type = t; +#ifdef WIN32 + WSASetLastError( 0 ); +#endif + if ( t == sock_base2::sock_stream ) { + _xwrite = &_Self_type::write; + _xread = &_Self_type::read; + } else if ( t == sock_base2::sock_dgram ) { + _xwrite = &_Self_type::sendto; + _xread = &_Self_type::recvfrom; + } else { + basic_socket_t::_fd = -1; + return 0; // unsupported type + } + + if ( _bbuf == 0 ) { + struct ifconf ifc; + struct ifreq ifr; + ifc.ifc_len = sizeof(ifreq); + ifc.ifc_req = &ifr; + int mtu = ((ioctl(basic_socket_t::_fd, SIOCGIFMTU, &ifc) < 0 ? 1500 : ifr.ifr_mtu) - 20 - (t == sock_base2::sock_stream ? 20 : 8 )) / sizeof(charT); + int qlen = ioctl(basic_socket_t::_fd, SIOCGIFTXQLEN, &ifc) < 0 ? 2 : ifr.ifr_qlen; + _M_allocate_block( t == sock_base2::sock_stream ? 
mtu * qlen * 2 : mtu * 2); + } + + if ( _bbuf == 0 ) { +#ifdef WIN32 + ::closesocket( basic_socket_t::_fd ); +#else + ::close( basic_socket_t::_fd ); +#endif + basic_socket_t::_fd = -1; + return 0; + } + + if ( fcntl( basic_socket_t::_fd, F_SETFL, fcntl( basic_socket_t::_fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { + throw std::runtime_error( "can't establish nonblock mode" ); + } + setp( _bbuf, _bbuf + ((_ebuf - _bbuf)>>1) ); + setg( this->epptr(), this->epptr(), this->epptr() ); + + return this; +} + +template<class charT, class traits, class _Alloc> +basic_sockbuf2<charT, traits, _Alloc> * +basic_sockbuf2<charT, traits, _Alloc>::attach( sock_base2::socket_type s, + sock_base2::stype t ) +{ + if ( basic_socket_t::is_open() || s == -1 ) { + return 0; + } + + sockaddr sa; + socklen_t sl = sizeof(sa); + getsockname( s, &sa, &sl ); + + return basic_sockbuf<charT, traits, _Alloc>::attach( s, sa, t ); +} + +template<class charT, class traits, class _Alloc> +basic_sockbuf2<charT, traits, _Alloc> * +basic_sockbuf2<charT, traits, _Alloc>::attach( sock_base2::socket_type s, + const sockaddr& addr, + sock_base2::stype t ) +{ + if ( basic_socket_t::is_open() || s == -1 ) { + return 0; + } + + // _doclose = false; + return basic_sockbuf2<charT, traits, _Alloc>::open( dup(s), addr, t ); +} + +template<class charT, class traits, class _Alloc> +basic_sockbuf2<charT, traits, _Alloc> * +basic_sockbuf2<charT, traits, _Alloc>::close() +{ + if ( !basic_socket_t::is_open() ) + return 0; + + // if ( _doclose ) { +#ifdef WIN32 + ::closesocket( basic_socket_t::_fd ); +#else + ::close( basic_socket_t::_fd ); +#endif + // } + + // _STLP_ASSERT( _bbuf != 0 ); + // put area before get area + setp( _bbuf, _bbuf + ((_ebuf - _bbuf)>>1) ); + setg( this->epptr(), this->epptr(), this->epptr() ); + + if ( basic_socket_t::_notify_close ) { + basic_socket_t::mgr->exit_notify( this, basic_socket_t::_fd ); + basic_socket_t::_notify_close = false; + } + + basic_socket_t::_fd = -1; + + return this; +} + 
+/*
+ * Shut down one or both directions of an open socket; the descriptor
+ * itself is left open.
+ *
+ * dir is a bit mask built from sock_base2::stop_in and
+ * sock_base2::stop_out. Nothing happens when the socket is not open
+ * or when neither bit is set.
+ */
+template<class charT, class traits, class _Alloc>
+void basic_sockbuf2<charT, traits, _Alloc>::shutdown( sock_base2::shutdownflg dir )
+{
+  if ( basic_socket_t::is_open() ) {
+    if ( (dir & (sock_base2::stop_in | sock_base2::stop_out)) ==
+         (sock_base2::stop_in | sock_base2::stop_out) ) {
+      ::shutdown( basic_socket_t::_fd, 2 );
+    } else if ( dir & sock_base2::stop_in ) {
+      ::shutdown( basic_socket_t::_fd, 0 );
+    } else if ( dir & sock_base2::stop_out ) {
+      ::shutdown( basic_socket_t::_fd, 1 );
+    }
+  }
+}
+
+/*
+ * Called when the get area is exhausted. This function does not read
+ * from the socket itself: it waits on ucnd (predicate rdready) until
+ * it is notified, using a timed wait when _use_rdtimeout is set.
+ * NOTE(review): presumably the notifier is the sockmgr I/O thread
+ * that fills the get area -- confirm.
+ *
+ * Returns the next character without consuming it, or traits::eof()
+ * if the socket is not open or if no character is available after
+ * the wait (timeout, or wake-up without data).
+ */
+template<class charT, class traits, class _Alloc>
+__FIT_TYPENAME basic_sockbuf2<charT, traits, _Alloc>::int_type
+basic_sockbuf2<charT, traits, _Alloc>::underflow()
+{
+  if ( !basic_socket_t::is_open() )
+    return traits::eof();
+
+  std::tr2::unique_lock<std::tr2::mutex> lk( ulck );
+
+  if ( this->gptr() < this->egptr() )
+    return traits::to_int_type(*this->gptr());
+
+  if ( this->egptr() == this->gptr() ) { // everything consumed: rewind get area
+    this->setg( this->eback(), this->eback(), this->eback() );
+  }
+
+  // wait on condition
+  if ( basic_socket_t::_use_rdtimeout ) {
+    ucnd.timed_wait( lk, basic_socket_t::_rdtimeout, rdready );
+  } else {
+    ucnd.wait( lk, rdready );
+  }
+
+  // The wait may end without data (timeout); gptr() must not be
+  // dereferenced in this case.
+  if ( this->gptr() < this->egptr() )
+    return traits::to_int_type(*this->gptr());
+
+  return traits::eof();
+}
+
+/*
+ * Make room in the put area and store c (unless c is eof).
+ *
+ * If c is not eof and the put area still has room, c is just appended.
+ * Otherwise the put area is written to the socket with a single
+ * _xwrite call; on EAGAIN the function polls for writability (bounded
+ * by _wrtimeout when _use_wrtimeout is set, otherwise without limit)
+ * and retries once. After a short write the unsent tail is moved to
+ * the beginning of the put area.
+ *
+ * Returns traits::eof() on failure; c on the fast path;
+ * traits::not_eof(c) otherwise.
+ */
+template<class charT, class traits, class _Alloc>
+__FIT_TYPENAME basic_sockbuf2<charT, traits, _Alloc>::int_type
+basic_sockbuf2<charT, traits, _Alloc>::overflow( int_type c )
+{
+  if ( !basic_socket_t::is_open() )
+    return traits::eof();
+
+  if ( !traits::eq_int_type( c, traits::eof() ) && this->pptr() < this->epptr() ) {
+    this->sputc( traits::to_char_type(c) );
+    return c;
+  }
+
+  long count = this->pptr() - this->pbase();
+
+  if ( count ) {
+    count *= sizeof(charT); // _xwrite counts bytes
+    long offset = (this->*_xwrite)( this->pbase(), count );
+    if ( offset < 0 ) {
+      if ( errno == EAGAIN ) {
+        pollfd wpfd;
+        wpfd.fd = basic_socket_t::_fd;
+        wpfd.events = POLLOUT | POLLHUP | POLLWRNORM;
+        wpfd.revents = 0;
+        while ( poll( &wpfd, 1, basic_socket_t::_use_wrtimeout ? basic_socket_t::_wrtimeout.count() : -1 ) <= 0 ) { // wait infinite
+          if ( errno == EINTR ) { // may be interrupted, check and ignore
+            errno = 0;
+            continue;
+          }
+          return traits::eof();
+        }
+        if ( (wpfd.revents & POLLERR) != 0 ) {
+          return traits::eof();
+        }
+        offset = (this->*_xwrite)( this->pbase(), count );
+        if ( offset < 0 ) {
+          return traits::eof();
+        }
+      } else {
+        return traits::eof();
+      }
+    }
+    if ( offset < count ) {
+      // MUST BE: (offset % sizeof(charT)) == 0 !
+      offset /= sizeof(charT);
+      count /= sizeof(charT);
+      traits::move( this->pbase(), this->pbase() + offset, count - offset );
+      this->setp( this->pbase(), this->epptr() ); // require: set pptr
+      this->pbump( count - offset );
+      if( !traits::eq_int_type(c,traits::eof()) ) {
+        this->sputc( traits::to_char_type(c) );
+      }
+
+      return traits::not_eof(c);
+    }
+  }
+
+  this->setp( this->pbase(), this->epptr() ); // require: set pptr
+  if( !traits::eq_int_type(c,traits::eof()) ) {
+    this->sputc( traits::to_char_type(c) );
+  }
+
+  return traits::not_eof(c);
+}
+
+/*
+ * Write the whole put area to the socket.
+ *
+ * Unlike overflow() this loops until every byte is sent, polling for
+ * writability after EAGAIN (bounded by _wrtimeout when _use_wrtimeout
+ * is set). The put area content is only read, never modified.
+ *
+ * Returns 0 on success (put area is empty afterwards), -1 if the
+ * socket is not open or the write failed.
+ */
+template<class charT, class traits, class _Alloc>
+int basic_sockbuf2<charT, traits, _Alloc>::sync()
+{
+  if ( !basic_socket_t::is_open() ) {
+    return -1;
+  }
+
+  long count = this->pptr() - this->pbase();
+  if ( count ) {
+    // _STLP_ASSERT( this->pbase() != 0 );
+    count *= sizeof(charT); // bytes still to send
+    long start = 0;         // bytes already sent
+    while ( count > 0 ) {
+      // start is in bytes but pbase() is a charT pointer: convert
+      // before the pointer arithmetic. Writes are expected to cover
+      // whole characters (same requirement as in overflow()).
+      const charT *from = this->pbase() + start / static_cast<long>(sizeof(charT));
+      long offset = (this->*_xwrite)( from, count );
+      if ( offset < 0 ) {
+        if ( errno == EAGAIN ) {
+          pollfd wpfd;
+          wpfd.fd = basic_socket_t::_fd;
+          wpfd.events = POLLOUT | POLLHUP | POLLWRNORM;
+          wpfd.revents = 0;
+          while ( poll( &wpfd, 1, basic_socket_t::_use_wrtimeout ? basic_socket_t::_wrtimeout.count() : -1 ) <= 0 ) { // wait infinite
+            if ( errno == EINTR ) { // may be interrupted, check and ignore
+              errno = 0;
+              continue;
+            }
+            return -1;
+          }
+          if ( (wpfd.revents & POLLERR) != 0 ) {
+            return -1;
+          }
+          offset = (this->*_xwrite)( from, count );
+          if ( offset < 0 ) {
+            return -1;
+          }
+        } else {
+          return -1;
+        }
+      }
+      count -= offset;
+      start += offset;
+    }
+    this->setp( this->pbase(), this->epptr() ); // require: set pptr
+  }
+
+  return 0;
+}
+
+/*
+ * Put n characters starting at s.
+ *
+ * If the characters fit into the free part of the put area they are
+ * only buffered. Otherwise the put area is filled up and flushed, and
+ * the rest is sent directly from the caller's memory: the put area is
+ * temporarily pointed at [s + copied, s + n) and flushed with sync(),
+ * which only reads from it. Afterwards the put area is restored to
+ * the first half of the internal buffer [_bbuf, _ebuf).
+ *
+ * Returns n on success; 0 if the socket is not open, s is null,
+ * n <= 0, or a flush failed.
+ */
+template<class charT, class traits, class _Alloc>
+streamsize basic_sockbuf2<charT, traits, _Alloc>::
+xsputn( const char_type *s, streamsize n )
+{
+  if ( !basic_socket_t::is_open() || s == 0 || n <= 0 ) {
+    return 0;
+  }
+
+  if ( this->epptr() - this->pptr() > n ) {
+    traits::copy( this->pptr(), s, n );
+    this->pbump( n );
+  } else {
+    streamsize __n_put = this->epptr() - this->pptr();
+    traits::copy( this->pptr(), s, __n_put );
+    this->pbump( __n_put );
+
+    // sync() and not overflow(): overflow() may leave an unsent tail
+    // in the put area after a short write, and that tail would be
+    // lost by the setp() below.
+    if ( sync() != 0 )
+      return 0;
+
+    this->setp( const_cast<char_type *>(s + __n_put), const_cast<char_type *>(s + n) );
+    this->pbump( n - __n_put );
+
+    // sync() never writes into the put area, so the caller's (const)
+    // memory is not touched even if the socket accepts data by parts.
+    const int __r = sync();
+
+    this->setp( _bbuf, _bbuf + ((_ebuf - _bbuf) >> 1) );
+    if ( __r != 0 ) {
+      return 0;
+    }
+  }
+  return n;
+}
+
+/*
+ * Receive a datagram, but only from the peer stored in _address.
+ *
+ * The sender of the datagram at the head of the queue is looked up
+ * with MSG_PEEK; if it matches _address the datagram is really read
+ * into buf (at most n bytes), otherwise the thread yields and tries
+ * again. NOTE(review): the foreign datagram is not removed here, so
+ * presumably another reader of the same descriptor consumes it --
+ * confirm.
+ *
+ * Returns the result of ::recvfrom(); 0 if poll() failed; -1 if the
+ * sender address could not be obtained.
+ */
+template<class charT, class traits, class _Alloc>
+int basic_sockbuf2<charT, traits, _Alloc>::recvfrom( void *buf, size_t n )
+{
+#if defined(_WIN32) || (defined(__hpux) && !defined(_INCLUDE_POSIX1C_SOURCE))
+  int sz = sizeof( sockaddr_in );
+#else
+  socklen_t sz = sizeof( sockaddr_in );
+#endif
+
+  typename basic_socket_t::sockaddr_t addr;
+
+#ifdef __FIT_POLL
+  pollfd pfd;
+  pfd.fd = basic_socket_t::_fd;
+  pfd.events = POLLIN;
+#endif // __FIT_POLL
+  do {
+    sz = sizeof( sockaddr_in ); // value-result argument, restore before every call
+#ifdef __FIT_POLL
+    pfd.revents = 0;
+    if ( poll( &pfd, 1, /* _timeout */ -1 ) > 0 ) { // wait infinite
+      // get address of caller only
+      char buff[32];
+      if ( ::recvfrom( basic_socket_t::_fd, buff, 32, MSG_PEEK, &addr.any, &sz ) < 0 ) {
+        if ( errno == EINTR || errno == EAGAIN ) {
+          // interrupted, or datagram already taken by someone else
+          xmt::Thread::yield();
+          continue;
+        }
+        return -1; // addr was not filled in, nothing to compare with
+      }
+    } else {
+      if ( errno == EINTR ) { // may be interrupted, check and ignore
+        errno = 0;
+        continue;
+      }
+      return 0; // poll waits infinitely, so it can't time out: it failed
+    }
+#endif // __FIT_POLL
+    if ( memcmp( &basic_socket_t::_address.inet, &addr.inet, sizeof(sockaddr_in) ) == 0 ) {
+#ifdef WIN32
+      return ::recvfrom( basic_socket_t::_fd, (char *)buf, n, 0, &basic_socket_t::_address.any, &sz );
+#else
+      return ::recvfrom( basic_socket_t::_fd, buf, n, 0, &basic_socket_t::_address.any, &sz );
+#endif
+    }
+    xmt::Thread::yield();
+  } while ( true );
+
+  return 0; // never
+}
+
+/*
+ * Set a SOL_SOCKET level option on the stream's socket.
+ *
+ * For every option except sock_base2::so_linger the option value is
+ * the integer 1 or 0 taken from on_off, and __v is ignored. For
+ * so_linger a struct linger is passed with l_onoff taken from on_off
+ * and l_linger from __v.
+ *
+ * ios_base::failbit is set if the socket is not open or setsockopt()
+ * fails. Without __unix the function does nothing.
+ */
+template<class charT, class traits, class _Alloc>
+void basic_sockstream2<charT, traits, _Alloc>::setoptions( sock_base2::so_t optname, bool on_off, int __v )
+{
+#ifdef __unix
+  if ( _sb.is_open() ) {
+    if ( optname != sock_base2::so_linger ) {
+      int turn = on_off ? 1 : 0;
+      if ( setsockopt( _sb.fd(), SOL_SOCKET, (int)optname, (const void *)&turn,
+                       (socklen_t)sizeof(int) ) != 0 ) {
+        this->setstate( ios_base::failbit );
+      }
+    } else {
+      linger l;
+      l.l_onoff = on_off ? 1 : 0;
+      l.l_linger = __v;
+      if ( setsockopt( _sb.fd(), SOL_SOCKET, (int)optname, (const void *)&l,
+                       (socklen_t)sizeof(linger) ) != 0 ) {
+        this->setstate( ios_base::failbit );
+      }
+    }
+  } else {
+    this->setstate( ios_base::failbit );
+  }
+#endif // __unix
+}
+
+#ifdef STLPORT
+_STLP_END_NAMESPACE
+#else
+} // namespace std
+#endif
+
Added: branches/complement-sockios/explore/include/sockios/sp.h
===================================================================
--- branches/complement-sockios/explore/include/sockios/sp.h (rev 0)
+++ branches/complement-sockios/explore/include/sockios/sp.h 2008-03-11 15:03:22 UTC (rev 1833)
@@ -0,0 +1,697 @@
+// -*- C++ -*- Time-stamp: <08/03/07 01:40:59 ptr>
+
+/*
+ * Copyright (c) 2008
+ * Petr Ovtchenkov
+ *
+ * Licensed under the Academic Free License Version 3.0
+ *
+ */
+
+#ifndef __SOCKIOS_SP_H
+#define __SOCKIOS_SP_H
+
+#include <sys/epoll.h>
+
+#ifndef EPOLLRDHUP
+#  define EPOLLRDHUP 0x2000
+#endif
+
+#include <fcntl.h>
+
+#include <cerrno>
+#include 
<mt/thread>
+#include <mt/mutex>
+
+#ifdef STLPORT
+#  include <unordered_map>
+#  include <unordered_set>
+// #  include <hash_map>
+// #  include <hash_set>
+// #  define __USE_STLPORT_HASH
+#  define __USE_STLPORT_TR1
+#else
+#  if defined(__GNUC__) && (__GNUC__ < 4)
+#    include <ext/hash_map>
+#    include <ext/hash_set>
+#    define __USE_STD_HASH
+#  else
+#    include <tr1/unordered_map>
+#    include <tr1/unordered_set>
+#    define __USE_STD_TR1
+#  endif
+#endif
+
+#include <sockios/sockstream>
+
+namespace std {
+
+template <class charT, class traits, class _Alloc> class basic_sockbuf2;
+template <class charT, class traits, class _Alloc> class basic_sockstream2;
+template <class charT, class traits, class _Alloc> class sock_processor_base;
+
+namespace detail {
+
+/*
+ * Listening (server side) socket: owns the descriptor, binds it and,
+ * for stream and seqpacket sockets, puts it into listening state and
+ * registers itself in the socket manager (basic_socket_t::mgr).
+ *
+ * Access to the descriptor is serialized by _fd_lck; the *_unsafe
+ * functions expect that the caller already holds this lock.
+ */
+template<class charT, class traits, class _Alloc>
+class _sock_processor_base :
+    public sock_base2,
+    public basic_socket<charT,traits,_Alloc>
+{
+  private:
+    typedef basic_socket<charT,traits,_Alloc> basic_socket_t;
+
+  protected:
+    _sock_processor_base() :
+        _mode( ios_base::in | ios_base::out ),
+        _state( ios_base::goodbit )
+      { }
+
+    virtual ~_sock_processor_base()
+      {
+        // qualified call: no virtual dispatch from destructor
+        _sock_processor_base::close();
+      }
+
+  public:
+    void open( const in_addr& addr, int port, sock_base2::stype type, sock_base2::protocol prot );
+    void open( unsigned long addr, int port, sock_base2::stype type, sock_base2::protocol prot );
+    void open( int port, sock_base2::stype type, sock_base2::protocol prot );
+
+    virtual void close();
+
+  protected:
+    void setoptions_unsafe( sock_base2::so_t optname, bool on_off = true, int __v = 0 );
+
+  public:
+    bool is_open() const
+      { std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); return basic_socket_t::is_open_unsafe(); }
+    bool good() const
+      { return _state == ios_base::goodbit; }
+
+    sock_base2::socket_type fd() const
+      { std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck); sock_base2::socket_type tmp = basic_socket_t::fd_unsafe(); return tmp; }
+
+    void shutdown( sock_base2::shutdownflg dir );
+    void setoptions( sock_base2::so_t optname, bool on_off = true, int __v = 0 )
+      {
+        std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck);
+        setoptions_unsafe( optname, on_off, __v );
+      }
+
+  private:
+    // not copyable
+    _sock_processor_base( const _sock_processor_base& );
+    _sock_processor_base& operator =( const _sock_processor_base& );
+
+  private:
+    unsigned long _mode;  // open mode
+    unsigned long _state; // state flags
+
+  protected:
+    // mutable: locked in const members is_open() and fd()
+    mutable std::tr2::mutex _fd_lck;
+    // xmt::condition _loop_cnd;
+};
+
+/*
+ * Create the socket, bind it to addr:port and, for sock_stream and
+ * sock_seqpacket, start listening and register this object in the
+ * socket manager.
+ *
+ * addr is expected in network byte order, port in host byte order.
+ * Only sock_base2::inet is implemented; for other protocols nothing
+ * is done. Nothing is done either if the socket is already open.
+ *
+ * On failure of socket() failbit and badbit are set; on failure of
+ * bind() failbit is set and the descriptor is closed. On success the
+ * state is reset to goodbit (this also clears a failbit raised by
+ * setoptions_unsafe() for so_reuseaddr).
+ */
+template<class charT, class traits, class _Alloc>
+void _sock_processor_base<charT,traits,_Alloc>::open( const in_addr& addr, int port, sock_base2::stype type, sock_base2::protocol prot )
+{
+  std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck);
+  if ( basic_socket_t::is_open_unsafe() ) {
+    return;
+  }
+  _mode = ios_base::in | ios_base::out;
+  _state = ios_base::goodbit;
+#ifdef WIN32
+  ::WSASetLastError( 0 );
+#endif
+  if ( prot == sock_base2::inet ) {
+    basic_socket_t::_fd = socket( PF_INET, type, 0 );
+    if ( basic_socket_t::_fd == -1 ) {
+      _state |= ios_base::failbit | ios_base::badbit;
+      return;
+    }
+    // _open = true;
+    basic_socket_t::_address.inet.sin_family = AF_INET;
+    basic_socket_t::_address.inet.sin_port = htons( port );
+    basic_socket_t::_address.inet.sin_addr.s_addr = addr.s_addr;
+
+    if ( type == sock_base2::sock_stream || type == sock_base2::sock_seqpacket ) {
+      // let's try reuse local address
+      setoptions_unsafe( sock_base2::so_reuseaddr, true );
+    }
+
+    if ( ::bind( basic_socket_t::_fd, &basic_socket_t::_address.any, sizeof(basic_socket_t::_address) ) == -1 ) {
+      _state |= ios_base::failbit;
+#ifdef WIN32
+      ::closesocket( basic_socket_t::_fd );
+#else
+      ::close( basic_socket_t::_fd );
+#endif
+      basic_socket_t::_fd = -1;
+      return;
+    }
+
+    if ( type == sock_base2::sock_stream || type == sock_base2::sock_seqpacket ) {
+      // I am sure, this is socket of type SOCK_STREAM | SOCK_SEQPACKET,
+      // so don't check return code from listen
+      ::listen( basic_socket_t::_fd, SOMAXCONN );
+      basic_socket_t::mgr->push( dynamic_cast<sock_processor_base<charT,traits,_Alloc>&>(*this) );
+    }
+  } else if ( prot == sock_base2::local ) {
+    return;
+  } else {
+    return;
+  }
+  _state = ios_base::goodbit;
+
+  return;
+}
+
+/*
+ * Same as open( const in_addr&, ... ), but addr is an IPv4 address in
+ * host byte order.
+ */
+template<class charT, class traits, class _Alloc>
+void _sock_processor_base<charT,traits,_Alloc>::open( unsigned long addr, int port, sock_base2::stype type, sock_base2::protocol prot )
+{
+  in_addr _addr;
+  _addr.s_addr = htonl( addr );
+  _sock_processor_base::open( _addr, port, type, prot );
+}
+
+/*
+ * Open on the given port on any local address (INADDR_ANY).
+ */
+template<class charT, class traits, class _Alloc>
+void _sock_processor_base<charT,traits,_Alloc>::open( int port, sock_base2::stype type, sock_base2::protocol prot )
+{
+  _sock_processor_base::open(INADDR_ANY, port, type, prot);
+}
+
+/*
+ * Close the descriptor (after shutdown of both directions, except on
+ * WIN32) and mark the socket as closed. Does nothing if it is not
+ * open.
+ */
+template<class charT, class traits, class _Alloc>
+void _sock_processor_base<charT,traits,_Alloc>::close()
+{
+  std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck);
+  if ( !basic_socket_t::is_open_unsafe() ) {
+    return;
+  }
+#ifdef WIN32
+  ::closesocket( basic_socket_t::_fd );
+#else
+  ::shutdown( basic_socket_t::_fd, 2 );
+  ::close( basic_socket_t::_fd );
+#endif
+  basic_socket_t::_fd = -1;
+}
+
+/*
+ * Shut down one or both directions (bit mask of sock_base2::stop_in
+ * and sock_base2::stop_out) without closing the descriptor. Does
+ * nothing if the socket is not open or neither bit is set.
+ */
+template<class charT, class traits, class _Alloc>
+void _sock_processor_base<charT,traits,_Alloc>::shutdown( sock_base2::shutdownflg dir )
+{
+  std::tr2::lock_guard<std::tr2::mutex> lk(_fd_lck);
+  if ( basic_socket_t::is_open_unsafe() ) {
+    if ( (dir & (sock_base2::stop_in | sock_base2::stop_out)) ==
+         (sock_base2::stop_in | sock_base2::stop_out) ) {
+      ::shutdown( basic_socket_t::_fd, 2 );
+    } else if ( dir & sock_base2::stop_in ) {
+      ::shutdown( basic_socket_t::_fd, 0 );
+    } else if ( dir & sock_base2::stop_out ) {
+      ::shutdown( basic_socket_t::_fd, 1 );
+    }
+  }
+}
+
+/*
+ * Set a SOL_SOCKET level option; the caller must hold _fd_lck.
+ *
+ * For every option except sock_base2::so_linger the value is the
+ * integer 1 or 0 taken from on_off; for so_linger a struct linger is
+ * passed (l_onoff from on_off, l_linger from __v). failbit is set if
+ * the socket is not open or setsockopt() fails. Without __unix the
+ * function does nothing.
+ */
+template<class charT, class traits, class _Alloc>
+void _sock_processor_base<charT,traits,_Alloc>::setoptions_unsafe( sock_base2::so_t optname, bool on_off, int __v )
+{
+#ifdef __unix
+  if ( basic_socket_t::is_open_unsafe() ) {
+    if ( optname != sock_base2::so_linger ) {
+      int turn = on_off ? 1 : 0;
+      if ( setsockopt( basic_socket_t::_fd, SOL_SOCKET, (int)optname, (const void *)&turn,
+                       (socklen_t)sizeof(int) ) != 0 ) {
+        _state |= ios_base::failbit;
+      }
+    } else {
+      linger l;
+      l.l_onoff = on_off ? 1 : 0;
+      l.l_linger = __v;
+      if ( setsockopt( basic_socket_t::_fd, SOL_SOCKET, (int)optname, (const void *)&l,
+                       (socklen_t)sizeof(linger) ) != 0 ) {
+        _state |= ios_base::failbit;
+      }
+
+    }
+  } else {
+    _state |= ios_base::failbit;
+  }
+#endif // __unix
+}
+
+} // namespace detail
+
+/*
+ * Interface of a connection processor. The socket manager calls one
+ * of the operator () overloads, selected by the tag argument, for
+ * events on a stream that belongs to this listener (the adopt_new_t
+ * overload is called right after a connection is accepted).
+ */
+template <class charT, class traits, class _Alloc>
+class sock_processor_base :
+    public detail::_sock_processor_base<charT,traits,_Alloc>
+{
+  private:
+    typedef detail::_sock_processor_base<charT,traits,_Alloc> sp_base_t;
+
+  public:
+    typedef basic_sockstream2<charT,traits,_Alloc> sockstream_t;
+
+    // tags that select operator () overload
+    struct adopt_new_t { };
+    struct adopt_close_t { };
+    struct adopt_data_t { };
+
+    sock_processor_base()
+      { }
+
+    explicit sock_processor_base( int port, sock_base2::stype t = sock_base2::sock_stream )
+      {
+        sp_base_t::open( port, t, sock_base2::inet );
+      }
+
+
+    virtual void operator ()( sockstream_t& s, const adopt_new_t& ) = 0;
+    virtual void operator ()( sockstream_t& s, const adopt_close_t& ) = 0;
+    virtual void operator ()( sockstream_t& s, const adopt_data_t& ) = 0;
+};
+
+typedef sock_processor_base<char,char_traits<char>,allocator<char> > sock_basic_processor;
+
+namespace detail {
+
+/*
+ * Socket manager: one worker thread that waits in epoll for events on
+ * all registered descriptors.
+ *
+ * Other threads talk to the worker through a pipe: push() writes a
+ * ctl record into pipefd[1], the worker reads it from pipefd[0]
+ * (which is registered in epoll too) and adds the descriptor to the
+ * epoll set and to descr.
+ */
+template<class charT, class traits, class _Alloc>
+class sockmgr
+{
+  private:
+    typedef basic_sockstream2<charT,traits,_Alloc> sockstream_t;
+    typedef basic_sockbuf2<charT,traits,_Alloc> sockbuf_t;
+    typedef sock_processor_base<charT,traits,_Alloc> socks_processor_t;
+
+    // commands passed to the worker via pipe (ctl::cmd)
+    enum {
+      listener,
+      // tcp_stream,
+      tcp_buffer,
+      rqstop,
+      rqstart
+    };
+
+    // what the worker knows about a registered descriptor
+    struct fd_info
+    {
+        enum {
+          listener = 0x1,        // listening socket, see push( socks_processor_t& )
+          level_triggered = 0x2,
+          owner = 0x4,           // stream was created by the worker on accept
+          buffer = 0x8           // s.b is valid, see push( sockbuf_t& )
+        };
+
+        unsigned flags;
+        union {
+            sockstream_t* s;
+            sockbuf_t*    b;
+        } s;
+        socks_processor_t *p;
+    };
+
+    // record written into the control pipe
+    struct ctl {
+        int cmd;
+        union {
+            int fd;
+            void *ptr;
+        } data;
+    };
+
+    static void _loop( sockmgr *me )
+      { me->io_worker(); }
+
+  public:
+    /*
+     * hint is passed to epoll_create(); ret is the maximum number of
+     * events taken from one epoll_wait() call.
+     *
+     * Throws std::runtime_error if epoll descriptor or control pipe
+     * can't be set up: the worker is useless without them.
+     */
+    sockmgr( int hint = 1024, int ret = 512 ) :
+         n_ret( ret )
+      {
+        efd = epoll_create( hint );
+        if ( efd < 0 ) {
+          throw std::runtime_error( "sockmgr: epoll_create failed" );
+        }
+        if ( pipe( pipefd ) < 0 ) {
+          ::close( efd );
+          throw std::runtime_error( "sockmgr: can't create control pipe" );
+        }
+        // cfd = pipefd[1];
+
+        epoll_event ev_add;
+        ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+        ev_add.data.fd = pipefd[0];
+        if ( epoll_ctl( efd, EPOLL_CTL_ADD, pipefd[0], &ev_add ) < 0 ) {
+          ::close( pipefd[1] );
+          ::close( pipefd[0] );
+          ::close( efd );
+          throw std::runtime_error( "sockmgr: can't add control pipe to epoll" );
+        }
+
+        try {
+          _worker = new std::tr2::thread( _loop, this );
+        }
+        catch ( ... ) {
+          // destructor will not run, release descriptors here
+          ::close( pipefd[1] );
+          ::close( pipefd[0] );
+          ::close( efd );
+          throw;
+        }
+
+        // ctl _ctl;
+        // _ctl.cmd = rqstart;
+
+        // write( pipefd[1], &_ctl, sizeof(ctl) );
+      }
+
+    /*
+     * Ask the worker to stop (rqstop), wait for it and release
+     * descriptors.
+     */
+    ~sockmgr()
+      {
+        if ( _worker->joinable() ) {
+          ctl _ctl;
+          _ctl.cmd = rqstop;
+
+          write( pipefd[1], &_ctl, sizeof(ctl) );
+
+          _worker->join();
+        }
+        close( pipefd[1] );
+        close( pipefd[0] );
+        close( efd );
+        delete _worker;
+      }
+
+    // register listening socket
+    void push( socks_processor_t& p )
+      {
+        ctl _ctl;
+        _ctl.cmd = listener;
+        _ctl.data.ptr = static_cast<void *>(&p);
+
+        write( pipefd[1], &_ctl, sizeof(ctl) );
+      }
+
+#if 0
+    void push( sockstream_t& s )
+      {
+        ctl _ctl;
+        _ctl.cmd = tcp_stream;
+        _ctl.data.ptr = static_cast<void *>(&s);
+
+        write( pipefd[1], &_ctl, sizeof(ctl) );
+      }
+#endif
+
+    // register socket buffer that isn't owned by the manager
+    void push( sockbuf_t& s )
+      {
+        ctl _ctl;
+        _ctl.cmd = tcp_buffer;
+        _ctl.data.ptr = static_cast<void *>(&s);
+
+        errno = 0;
+        int r = write( pipefd[1], &_ctl, sizeof(ctl) );
+      }
+
+    // record in closed_queue (under cll) that the buffer b, which used
+    // descriptor fd, is gone
+    void exit_notify( sockbuf_t* b, int fd )
+      {
+        fd_info info = { 0, reinterpret_cast<sockstream_t*>(b), 0 };
+        std::tr2::lock_guard<std::tr2::mutex> lk( cll );
+        closed_queue[fd] = info;
+      }
+
+  private:
+    // not copyable: declared, but not defined
+    sockmgr( const sockmgr& );
+    sockmgr& operator =( const sockmgr& );
+
+    void io_worker();
+
+    int efd;                   // epoll descriptor
+    int pipefd[2];             // control pipe: [0] is read by worker, [1] is written by push()
+    std::tr2::thread *_worker;
+    const int n_ret;           // max events per epoll_wait()
+
+#ifdef __USE_STLPORT_HASH
+    typedef std::hash_map<int,fd_info> fd_container_type;
+#endif
+#ifdef __USE_STD_HASH
+    typedef __gnu_cxx::hash_map<int, fd_info> fd_container_type;
+#endif
+#if defined(__USE_STLPORT_TR1) || defined(__USE_STD_TR1)
+    typedef std::tr1::unordered_map<int, fd_info> fd_container_type;
+#endif
+
+    fd_container_type descr;        // registered descriptors
+    fd_container_type closed_queue; // filled by exit_notify()
+    std::tr2::mutex cll;            // protects closed_queue
+};
+
+template<class charT, class traits, class _Alloc>
+void sockmgr<charT,traits,_Alloc>::io_worker()
+{
+  epoll_event ev[n_ret];
+
+/*
+  ctl _xctl;
+  int r = read( pipefd[0], &_xctl, sizeof(ctl) );
+
+  if ( _xctl.cmd == rqstart ) {
+    std::cerr << "io_worker fine" << std::endl;
+  } else {
+    std::cerr << "io_worker not fine, " << r << ", " << errno << std::endl;
+  }
+*/
+
+  for ( ; ; ) {
+    int n = epoll_wait( efd, &ev[0], n_ret, -1 );
+    if ( n < 0 ) {
+      if ( errno == EINTR ) {
+        continue;
+      }
+      // throw system_error
+    }
+    for ( int i = 0; i < n; ++i ) {
+      if ( ev[i].data.fd == pipefd[0] ) {
+        epoll_event ev_add;
+        ctl _ctl;
+        int r = read( pipefd[0], &_ctl, sizeof(ctl) );
+        if ( r < 0 ) {
+          // throw system_error
+        } else if ( r == 0 ) {
+          return;
+        }
+
+        switch ( _ctl.cmd ) {
+          case listener:
+            ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT;
+            ev_add.data.fd = static_cast<socks_processor_t*>(_ctl.data.ptr)->fd();
+            if ( ev_add.data.fd >= 0 ) {
+              fd_info new_info = { fd_info::listener, 0, static_cast<socks_processor_t*>(_ctl.data.ptr) };
+              descr[ev_add.data.fd] = new_info;
+              if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) {
+                descr.erase( ev_add.data.fd );
+                // throw system_error
+              }
+            }
+            break;
+#if 0
+          case tcp_stream:
+            ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT;
+            ev_add.data.fd = static_cast<sockstream_t*>(_ctl.data.ptr)->rdbuf()->fd();
+            if ( ev_add.data.fd >= 0 ) {
+              fd_info new_info = { 0, static_cast<sockstream_t*>(_ctl.data.ptr), 0 };
+              descr[ev_add.data.fd] = new_info;
+              if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) {
+                descr.erase( ev_add.data.fd );
+                // throw system_error
+              }
+            }
+            
break; +#endif + case tcp_buffer: + ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + ev_add.data.fd = static_cast<sockbuf_t*>(_ctl.data.ptr)->fd(); + if ( ev_add.data.fd >= 0 ) { + fd_info new_info = { fd_info::buffer, static_cast<sockstream_t* /* sockbuf_t* */ >(_ctl.data.ptr), 0 }; + descr[ev_add.data.fd] = new_info; + if ( epoll_ctl( efd, EPOLL_CTL_ADD, ev_add.data.fd, &ev_add ) < 0 ) { + descr.erase( ev_add.data.fd ); + // throw system_error + } + } + break; + case rqstop: + return; + break; + } + + continue; + } + + typename fd_container_type::iterator ifd = descr.find( ev[i].data.fd ); + if ( ifd == descr.end() ) { + throw std::logic_error( "file descriptor in epoll, but not in descr[]" ); + } + + fd_info& info = ifd->second; + if ( info.flags & fd_info::listener ) { + if ( ev[i].events & EPOLLRDHUP ) { + epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ); + // walk through descr and detach every .p ? + descr.erase( ifd ); + } else if ( ev[i].events & EPOLLIN ) { + sockaddr addr; + socklen_t sz = sizeof( sockaddr_in ); + + int fd = accept( ev[i].data.fd, &addr, &sz ); + if ( fcntl( fd, F_SETFL, fcntl( fd, F_GETFL ) | O_NONBLOCK ) != 0 ) { + throw std::runtime_error( "can't establish nonblock mode" ); + } + sockstream_t* s; + + try { + s = new sockstream_t(); + if ( s->rdbuf()->_open_sockmgr( fd, addr ) ) { + epoll_event ev_add; + ev_add.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT; + ev_add.data.fd = fd; + fd_info new_info = { fd_info::owner, s, info.p }; + descr[fd] = new_info; + + if ( epoll_ctl( efd, EPOLL_CTL_ADD, fd, &ev_add ) < 0 ) { + std::cerr << "Accept, add " << fd << ", errno " << errno << std::endl; + descr.erase( fd ); + // throw system_error + } + (*info.p)( *s, typename socks_processor_t::adopt_new_t() ); + } else { + std::cerr << "Accept, delete " << fd << std::endl; + delete s; + } + } + catch ( const std::bad_alloc& ) { + // nothing + } + catch ( ... 
) { + descr.erase( fd ); + delete s; + } + } + } else { + if ( ev[i].events & EPOLLIN ) { + if ( (info.flags & fd_info::owner) == 0 ) { + // marginal case: me not owner (registerd via push(), + // when I owner, I know destroy point), + // already closed, but I not see closed event yet; + // object may be deleted already, so I can't + // call b->egptr() etc. here + std::tr2::lock_guard<std::tr2::mutex> lck( cll ); + typename fd_container_type::iterator closed_ifd = closed_queue.find( ev[i].data.fd ); + if ( closed_ifd != closed_queue.end() ) { + closed_queue.erase( closed_ifd ); + if ( epoll_ctl( efd, EPOLL_CTL_DEL, ifd->first, 0 ) < 0 ) { + // throw system_error + } + descr.erase( ifd ); + continue; + } + } + sockbuf_t* b = (info.flags & fd_info::buffer != 0) ? info.s.b : info.s.s->rdbuf(); + errno = 0; + for ( ; ; ) { + if ( b->_ebuf == b->egptr() ) { + // process extract data from buffer too slow for us! + if ( (info.flags & fd_info::level_triggered) == 0 ) { + epoll_event xev; + xev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP; + xev.data.fd = ev[i].data.fd; + in... [truncated message content] |