From: Christian P. <cp...@us...> - 2005-06-06 12:09:43
|
Update of /cvsroot/pclasses/pclasses2/src/Net In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv28363/src/Net Modified Files: InetSocket.cpp ServerSocketManager.cpp Socket.cpp Log Message: - Fixed Socket::setBlocking() - Added AsyncSocket, AsyncStreamSocket, AsyncInetSocket, AsyncTCPSocket Index: ServerSocketManager.cpp =================================================================== RCS file: /cvsroot/pclasses/pclasses2/src/Net/ServerSocketManager.cpp,v retrieving revision 1.1 retrieving revision 1.2 diff -u -d -r1.1 -r1.2 --- ServerSocketManager.cpp 10 Feb 2005 19:10:37 -0000 1.1 +++ ServerSocketManager.cpp 6 Jun 2005 12:09:31 -0000 1.2 @@ -82,11 +82,11 @@ // left empty... } -void ServerSocketManager::slotConnected(SocketListener& l) +void ServerSocketManager::slotConnected() { - StreamSocketServer& srv = (StreamSocketServer&)l.socket(); - onConnect(srv); - sigConnect.fire(srv); +// StreamSocketServer& srv = (StreamSocketServer&)l.device(); +// onConnect(srv); +// sigConnect.fire(srv); } Index: Socket.cpp =================================================================== RCS file: /cvsroot/pclasses/pclasses2/src/Net/Socket.cpp,v retrieving revision 1.9 retrieving revision 1.10 diff -u -d -r1.9 -r1.10 --- Socket.cpp 10 Feb 2005 19:13:45 -0000 1.9 +++ Socket.cpp 6 Jun 2005 12:09:31 -0000 1.10 @@ -19,6 +19,7 @@ ***************************************************************************/ #include "pclasses/pclasses-config.h" +#include "pclasses/Trace.h" #include "pclasses/Net/Socket.h" #include "SocketOption.h" #include "../System/timeout.h" @@ -472,12 +473,13 @@ P_SOURCEINFO); long flags = ret; - if(enable) - ret |= O_NONBLOCK; + if(!enable) + flags |= O_NONBLOCK; else - ret &= ~O_NONBLOCK; + flags &= ~O_NONBLOCK; ret = ::fcntl(handle(), F_SETFL, flags); + if(ret == -1) throw IO::IOError(errno, "Could not get file-descriptor flags", P_SOURCEINFO); } @@ -488,6 +490,327 @@ } +AsyncSocket::AsyncSocket() throw() +: _eventQueue(System::EventQueue::instance()), + _listener(0), + _connState(NotConnected) +{ +} + +AsyncSocket::AsyncSocket(System::EventQueue& evq) throw() +: _eventQueue(evq), + _listener(0), + _connState(NotConnected) +{ +} + +AsyncSocket::AsyncSocket(Domain domain, Type type, int proto) throw(IO::IOError) +: Socket(), + _eventQueue(System::EventQueue::instance()), + _listener(0), + _connState(NotConnected) +{ + open(domain, type, proto); +} + +AsyncSocket::~AsyncSocket() throw() +{ + close(); +} + +#define BLOCKING_ERROR(x) (x == EWOULDBLOCK || x == EINPROGRESS) + +void AsyncSocket::connect(const NetworkAddress& addr, port_t port) + throw(IO::IOError) +{ + try + { + Socket::connect(addr, port); + } + catch(IO::IOError& err) + { + if(BLOCKING_ERROR(err.errorNo())) + { + _connState = Connecting; + _listener->setEventMask(_listener->eventMask() | + SocketListener::NotifyWrite); + return; + } + + P_TRACE(AsyncSocket) << "connection failed"; + throw; + } + + P_TRACE(AsyncSocket) << "connection succeeded immediatly"; + + _connState = Connected; + onConnect(); +} + +void AsyncSocket::open(Domain domain, Type type, int proto) + throw(IO::IOError) +{ + Socket::open(domain, type, proto); + + _listener = new SocketListener(_eventQueue, *this); + _listener->sigRead.bind(make_method(this, &AsyncSocket::readNotify)); + _listener->sigWrite.bind(make_method(this, &AsyncSocket::writeNotify)); + _listener->sigError.bind(make_method(this, &AsyncSocket::errorNotify)); + + setBlocking(false); +} + +void AsyncSocket::open(int handle, Domain domain, Type type, int proto) + throw(IO::IOError) +{ + Socket::open(handle, domain, type, proto); + + _listener = new SocketListener(_eventQueue, *this); + _listener->sigRead.bind(make_method(this, &AsyncSocket::readNotify)); + _listener->sigWrite.bind(make_method(this, &AsyncSocket::writeNotify)); + _listener->sigError.bind(make_method(this, &AsyncSocket::errorNotify)); + + setBlocking(false); +} + +void AsyncSocket::_close() throw(IO::IOError) +{ + Socket::_close(); + + _connState = NotConnected; + + if(_listener) + { + delete _listener; + _listener = 0; + } + + if(!_writeBuffer.empty()) + _writeBuffer.clear(); +} + +size_t AsyncSocket::_read(char* buffer, size_t count) throw(IO::IOError) +{ + size_t ret; + try + { + ret = Socket::_read(buffer, count); + } + catch(IO::IOError& err) + { + if(BLOCKING_ERROR(err.errorNo())) + return 0; + + throw; + } + + return ret; +} + +size_t AsyncSocket::_peek(char* buffer, size_t count) throw(IO::IOError) +{ + return Socket::_peek(buffer, count); +} + +size_t AsyncSocket::_write(const char* buffer, size_t count) throw(IO::IOError) +{ + size_t ret; + + // we must write the previously buffered data first ... + if(!_writeBuffer.empty()) + { + try + { + ret = Socket::_write(_writeBuffer.data(), _writeBuffer.size()); + } + catch(IO::IOError& err) + { + if(BLOCKING_ERROR(err.errorNo())) + return 0; + + throw; + } + + // dequeue written data ... + if(ret > 0) + { + P_TRACE(AsyncSocket) << "Wrote " << ret << " bytes from buffer"; + _writeBuffer.pop(ret); + } + + // if we did not write everything from the buffer, queue the entire + // buffer that has been passed to us ... + if(!_writeBuffer.empty()) + { + P_TRACE(AsyncSocket) << "Queued " << count << " bytes into buffer"; + _writeBuffer.push(buffer, count); + + // we want to get notified on next write-event ... + _listener->setEventMask(_listener->eventMask() | + SocketListener::NotifyWrite); + + return count; + } + } + + try + { + ret = Socket::_write(buffer, count); + } + catch(IO::IOError& err) + { + if(BLOCKING_ERROR(err.errorNo())) + return 0; + + throw; + } + + // queue the remaining data from the buffer ... + if(ret < count) + { + P_TRACE(AsyncSocket) << "Wrote " << ret << " bytes, queued " << count - ret << " bytes"; + _writeBuffer.push(buffer + ret, count - ret); + + // we want to get notified on next write-event ... + _listener->setEventMask(_listener->eventMask() | + SocketListener::NotifyWrite); + } + else + { + P_TRACE(AsyncSocket) << "Wrote everything, buffer empty"; + + // buffer is empty, we dont want to get notified on next write-event ... + _listener->setEventMask(_listener->eventMask() & + ~SocketListener::NotifyWrite); + } + + return count; +} + +void AsyncSocket::readNotify() +{ + P_TRACE(AsyncSocket) << "readNotify()"; + + char tmp[4096]; + size_t ret; + + try + { + ret = read(tmp, sizeof(tmp)); + } + catch(IO::IOError& err) + { + switch(_connState) + { + case Connecting: + _connState = NotConnected; + onConnectFailed(err); + break; + + case Connected: + _connState = NotConnected; + onClose(); + break; + + default: + onError(err); + break; + } + + // dont get notified on further read-events ... + _listener->setEventMask(_listener->eventMask() & + ~SocketListener::NotifyRead); + + return; + } + + // if we did read nothing, EOF has been reached + if(!ret) + { + if(_connState == Connected) + { + _connState = NotConnected; + onClose(); + } + + // dont get notified on further read-events ... + _listener->setEventMask(_listener->eventMask() & + ~SocketListener::NotifyRead); + } + else + { + onRead(tmp, ret); + } +} + +void AsyncSocket::writeNotify() +{ + P_TRACE(AsyncSocket) << "writeNotify()"; + + // if connect() has been called previously, the connection has succeeded + if(_connState == Connecting) + { + P_TRACE(AsyncSocket) << "connection succeeded"; + _connState = Connected; + onConnect(); + } + + // write buffered data first ... + try + { + _write(0, 0); + } + catch(IO::IOError& err) + { + onError(err); + return; + } + + onWrite(); +} + +void AsyncSocket::errorNotify() +{ + P_TRACE(AsyncSocket) << "errorNotify()"; + //onError(); +} + +void AsyncSocket::onConnect() +{ + P_TRACE(AsyncSocket) << "onConnect()"; +} + +void AsyncSocket::onConnectFailed(const IO::IOError& err) +{ + P_TRACE(AsyncSocket) << "onConnectFailed()"; +} + +void AsyncSocket::onClose() +{ + P_TRACE(AsyncSocket) << "onClose()"; +} + +void AsyncSocket::onError(const IO::IOError& err) +{ + P_TRACE(AsyncSocket) << "onError()"; +} + +void AsyncSocket::onRead(const char* buffer, size_t count) +{ + P_TRACE(AsyncSocket) << "onRead(buffer,count)"; +} + +void AsyncSocket::onWrite() +{ + P_TRACE(AsyncSocket) << "onWrite()"; +} + +AsyncSocket::ConnectState AsyncSocket::connectState() const throw() +{ + return _connState; +} + + DatagramSocket::DatagramSocket() throw() : Socket() { @@ -509,6 +832,7 @@ return opt.get() == 1 ? true : false; } + StreamSocket::StreamSocket() throw() : Socket() { @@ -539,6 +863,36 @@ Socket::open(srv.accept(), srv.domain(), srv.type(), srv.protocol()); } + +AsyncStreamSocket::AsyncStreamSocket() throw() +{ +} + +AsyncStreamSocket::AsyncStreamSocket(Domain d, int proto) throw(IO::IOError) +{ + AsyncSocket::open(d, Socket::Stream, proto); +} + +AsyncStreamSocket::AsyncStreamSocket(StreamSocketServer& srv) throw(IO::IOError) +{ + open(srv); +} + +AsyncStreamSocket::~AsyncStreamSocket() throw() +{ +} + +void AsyncStreamSocket::open(Domain d, int proto) throw(IO::IOError) +{ + AsyncSocket::open(d, Socket::Stream, proto); +} + +void AsyncStreamSocket::open(StreamSocketServer& srv) throw(IO::IOError) +{ + AsyncSocket::open(srv.accept(), srv.domain(), srv.type(), srv.protocol()); +} + + StreamSocketServer::StreamSocketServer(Domain domain, int proto) throw(IO::IOError) : Socket(domain, Socket::Stream, proto) { @@ -607,6 +961,7 @@ return d; } + enum SocketEvent { SocketEventRead = 0, SocketEventWrite = 1, @@ -656,22 +1011,21 @@ }; - SocketListener::SocketListener(Socket& s) -: EventListener(&s) +: IOListener(s), EventListener(&s) { using System::FdListener; - _handle = (unsigned long)new SocketFdListener(this, s.handle(), + _handle = (void*)new SocketFdListener(this, s.handle(), FdListener::Read|FdListener::Write|FdListener::Error); } SocketListener::SocketListener(System::EventQueue& evq, Socket& s) -: EventListener(evq, &s) +: IOListener(s), EventListener(evq, &s) { using System::FdListener; - _handle = (unsigned long)new SocketFdListener(this, s.handle(), + _handle = (void*)new SocketFdListener(this, s.handle(), FdListener::Read|FdListener::Write|FdListener::Error); } @@ -680,18 +1034,16 @@ delete (SocketFdListener*)_handle; } -Socket& SocketListener::socket() const throw() -{ - return *((Socket*)sender()); -} - void SocketListener::setEventMask(int mask) { - ((SocketFdListener*)_handle)->setFlags(mask); - - // Wakeup the thread so it can get the new event flags ... - System::FdListenerList& lst = System::FdListenerList::instance(eventQueue()); - lst.wakeup(); + if(mask != eventMask()) + { + ((SocketFdListener*)_handle)->setFlags(mask); + + // Wakeup the thread so it can get the new event flags ... + System::FdListenerList& lst = System::FdListenerList::instance(eventQueue()); + lst.wakeup(); + } } int SocketListener::eventMask() const throw() @@ -701,35 +1053,25 @@ void SocketListener::signaled(const System::Event& ev) { + P_TRACE(SocketListener) << "got signaled. sender=" << ev.sender() << ", id=" << ev.id(); + switch(ev.id()) { case SocketEventRead: onRead(); - sigRead.fire(*this); + sigRead.fire(); break; case SocketEventWrite: onWrite(); - sigWrite.fire(*this); + sigWrite.fire(); break; case SocketEventError: onError(); - sigError.fire(*this); + sigError.fire(); break; } } -void SocketListener::onRead() -{ -} - -void SocketListener::onWrite() -{ -} - -void SocketListener::onError() -{ -} - } // !namespace Net Index: InetSocket.cpp =================================================================== RCS file: /cvsroot/pclasses/pclasses2/src/Net/InetSocket.cpp,v retrieving revision 1.3 retrieving revision 1.4 diff -u -d -r1.3 -r1.4 --- InetSocket.cpp 26 Jan 2005 10:25:40 -0000 1.3 +++ InetSocket.cpp 6 Jun 2005 12:09:30 -0000 1.4 @@ -73,6 +73,25 @@ return opt.get(); } +AsyncInetSocket::AsyncInetSocket() throw() +{ +} + +AsyncInetSocket::AsyncInetSocket(Type t, int proto) throw(IO::IOError) +{ + AsyncSocket::open(Socket::Inet, t, proto); +} + +AsyncInetSocket::~AsyncInetSocket() throw() +{ +} + +void AsyncInetSocket::open(Type type, int proto) throw(IO::IOError) +{ + AsyncSocket::open(Socket::Inet, type, proto); +} + + UDPSocket::UDPSocket() throw(IO::IOError) : InetSocket(), DatagramSocket() @@ -185,6 +204,31 @@ } +AsyncTCPSocket::AsyncTCPSocket() throw(IO::IOError) +{ + AsyncInetSocket::open(Socket::Stream, IPPROTO_TCP); +} + +AsyncTCPSocket::AsyncTCPSocket(StreamSocketServer& srv) throw(IO::IOError) +{ + open(srv); +} + +AsyncTCPSocket::~AsyncTCPSocket() throw() +{ +} + +void AsyncTCPSocket::open() throw(IO::IOError) +{ + AsyncInetSocket::open(Socket::Stream, IPPROTO_TCP); } +void AsyncTCPSocket::open(StreamSocketServer& srv) throw(IO::IOError) +{ + AsyncStreamSocket::open(srv); } + + +} // !namespace Net + +} // !namespace P |