From: <arn...@us...> - 2007-12-09 18:44:51
|
Revision: 99 http://adchpp.svn.sourceforge.net/adchpp/?rev=99&view=rev Author: arnetheduck Date: 2007-12-09 10:44:45 -0800 (Sun, 09 Dec 2007) Log Message: ----------- Simplified writers some Modified Paths: -------------- adchpp/trunk/SConstruct adchpp/trunk/adchpp/ManagedSocket.cpp adchpp/trunk/adchpp/ManagedSocket.h adchpp/trunk/adchpp/Socket.cpp adchpp/trunk/adchpp/SocketManager.cpp adchpp/trunk/adchpp/SocketManager.h adchpp/trunk/adchpp/Util.cpp adchpp/trunk/plugins/Script/examples/access.lua adchpp/trunk/readme.txt adchpp/trunk/swig/adchpp.i adchpp/trunk/swig/lua.i adchpp/trunk/swig/python.i adchpp/trunk/test/PyClient.py adchpp/trunk/unix/po/adchppd.pot Modified: adchpp/trunk/SConstruct =================================================================== --- adchpp/trunk/SConstruct 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/SConstruct 2007-12-09 18:44:45 UTC (rev 99) @@ -213,6 +213,8 @@ conf = Configure(env) +if conf.CheckCHeader('poll.h'): + conf.env.Append(CPPDEFINES='HAVE_POLL_H') if conf.CheckCHeader('sys/epoll.h'): conf.env.Append(CPPDEFINES=['HAVE_SYS_EPOLL_H']) if conf.CheckLib('dl', 'dlopen'): Modified: adchpp/trunk/adchpp/ManagedSocket.cpp =================================================================== --- adchpp/trunk/adchpp/ManagedSocket.cpp 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/adchpp/ManagedSocket.cpp 2007-12-09 18:44:45 UTC (rev 99) @@ -34,6 +34,8 @@ ManagedSocket::ManagedSocket() throw() : outBuf(0), overFlow(0), disc(0) #ifdef _WIN32 , writeBuf(0) +#else +, blocked(false) #endif { } @@ -92,8 +94,12 @@ } ByteVector* ManagedSocket::prepareWrite() { - ByteVector* buffer = 0; + if(isBlocked()) { + return 0; + } + ByteVector* buffer = 0; + { FastMutex::Lock l(writeMutex); @@ -153,11 +159,11 @@ } void ManagedSocket::completeAccept() throw() { - SocketManager::getInstance()->addJob(std::tr1::bind(&ManagedSocket::processIncoming, this)); + SocketManager::getInstance()->addJob(connectedHandler); } -void ManagedSocket::failSocket() throw() { - SocketManager::getInstance()->addJob(std::tr1::bind(&ManagedSocket::processFail, this)); +void ManagedSocket::failSocket(int) throw() { + SocketManager::getInstance()->addJob(failedHandler); } void ManagedSocket::disconnect(Util::Reason reason) throw() { @@ -165,22 +171,14 @@ return; } - disc = GET_TICK(); + disc = GET_TICK() + SETTING(DISCONNECT_TIMEOUT); Util::reasons[reason]++; SocketManager::getInstance()->addDisconnect(this); } -void ManagedSocket::processIncoming() throw() { - connectedHandler(); -} - void ManagedSocket::processData(ByteVector* buf) throw() { dataHandler(*buf); Util::freeBuf = buf; } -void ManagedSocket::processFail() throw() { - failedHandler(); } - -} Modified: adchpp/trunk/adchpp/ManagedSocket.h =================================================================== --- adchpp/trunk/adchpp/ManagedSocket.h 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/adchpp/ManagedSocket.h 2007-12-09 18:44:45 UTC (rev 99) @@ -73,15 +73,13 @@ void completeAccept() throw(); bool completeWrite(ByteVector* buf, size_t written) throw(); bool completeRead(ByteVector* buf) throw(); - void failSocket() throw(); + void failSocket(int error) throw(); void shutdown() { sock.shutdown(); } void close() { sock.disconnect(); } // Functions processing events - void processIncoming() throw(); void processData(ByteVector* buf) throw(); - void processFail() throw(); // No copies ManagedSocket(const ManagedSocket&); @@ -103,6 +101,12 @@ ByteVector* writeBuf; /** WSABUF for data being sent */ WSABUF wsabuf; + + bool isBlocked() { return writeBuf != 0; } +#else + bool blocked; + bool isBlocked() { return blocked; } + void setBlocked(bool blocked_) { blocked = blocked_; } #endif ConnectedHandler connectedHandler; Modified: adchpp/trunk/adchpp/Socket.cpp =================================================================== --- adchpp/trunk/adchpp/Socket.cpp 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/adchpp/Socket.cpp 2007-12-09 18:44:45 UTC (rev 99) @@ -20,6 +20,10 @@ #include "Socket.h" +#ifdef HAVE_POLL_H +#include <poll.h> +#endif + namespace adchpp { using namespace std; @@ -37,13 +41,12 @@ { #ifdef _WIN32 checksocket(sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, 0, 0, WSA_FLAG_OVERLAPPED)); - setBlocking(false); DWORD x = 0; setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char*)&x, sizeof(x)); #else checksocket(sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)); - setBlocking(false); #endif + setBlocking(false); } break; case TYPE_UDP: @@ -75,13 +78,14 @@ #else sock = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); #endif - int x = 1; - ::setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)&x, sizeof(int)); if(sock == (socket_t)-1) { throw SocketException(socket_errno); } + int x = 1; + ::setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)&x, sizeof(int)); + tcpaddr.sin_family = AF_INET; tcpaddr.sin_port = htons(aPort); tcpaddr.sin_addr.s_addr = htonl(INADDR_ANY); @@ -254,25 +258,21 @@ if(waitFor & WAIT_READ || waitFor & WAIT_CONNECT) { fd.events |= POLLIN; } - if(waifFor & WAIT_WRITE) { + if(waitFor & WAIT_WRITE) { fd.events |= POLLOUT; } - int result = poll(&fd, 1, millis)); + int result = poll(&fd, 1, millis); if(result == 1) { if(fd.revents & POLLERR) { int y = 0; socklen_t z = sizeof(y); checksockerr(getsockopt(sock, SOL_SOCKET, SO_ERROR, (char*)&y, &z)); - if(y != 0) { - throw SocketException(y); - } - // Should never happen - throw SocketException("Unknown socket error"); + throw SocketException(y); } int ret = 0; - if(fr.revents & POLLIN) { + if(fd.revents & POLLIN) { ret |= waitFor & (WAIT_READ | WAIT_CONNECT); } if(fd.revents & POLLOUT) { @@ -384,9 +384,8 @@ void Socket::disconnect() throw() { if(sock != INVALID_SOCKET) { closesocket(sock); + sock = INVALID_SOCKET; } - - sock = INVALID_SOCKET; } } Modified: adchpp/trunk/adchpp/SocketManager.cpp =================================================================== --- adchpp/trunk/adchpp/SocketManager.cpp 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/adchpp/SocketManager.cpp 2007-12-09 18:44:45 UTC (rev 99) @@ -48,12 +48,9 @@ struct MSOverlapped : OVERLAPPED { enum Types { - ACCEPT, + ACCEPT_DONE, READ_DONE, WRITE_DONE, - WRITE_WAITING, - WRITE_ALL, - DISCONNECT, SHUTDOWN } type; ManagedSocketPtr ms; @@ -69,17 +66,17 @@ } }; -class CompletionPort { +class Poller { public: - CompletionPort() : handle(INVALID_HANDLE_VALUE) { + Poller() : handle(INVALID_HANDLE_VALUE) { } - ~CompletionPort() { + ~Poller() { if(handle != INVALID_HANDLE_VALUE) ::CloseHandle(handle); } - bool create() { + bool init() { handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); return handle != NULL; } @@ -102,89 +99,124 @@ HANDLE handle; }; -class Writer : public Thread { -public: - static const size_t PREPARED_SOCKETS = 32; - - Writer() : stop(false) { + +#elif defined(HAVE_SYS_EPOLL_H) + +struct Poller { + Poller() : poll_fd(-1) { } - void addWriter(ManagedSocketPtr /*ms */) { - if(stop) - return; -#if 0 - MSOverlapped* overlapped = pool.get(); - *overlapped = MSOverlapped(MSOverlapped::WRITE_WAITING, ms); - - if(!port.post(overlapped)) { - LOGDT(SocketManager::className, "Fatal error while posting write to completion port: " + Util::translateError(::GetLastError())); + ~Poller() { + if(poll_fd != -1) { + close(poll_fd); } -#endif } - void addAllWriters() { - if(stop) - return; -#if 0 - MSOverlapped* overlapped = pool.get(); - *overlapped = MSOverlapped(MSOverlapped::WRITE_ALL); + bool init() { + poll_fd = epoll_create(1024); + if(poll_fd == -1) + return false; - if(!port.post(overlapped)) { - LOGDT(SocketManager::className, "Fatal error while posting writeAll to completion port: " + Util::translateError(::GetLastError())); + return true; + } + + bool associate(const ManagedSocketPtr& ms) { + struct epoll_event ev; + ev.data.ptr = reinterpret_cast<void*>(ms.get()); + ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + return epoll_ctl(poll_fd, EPOLL_CTL_ADD, ms->getSocket(), &ev) == 0; + } + + bool associate(int fd) { + struct epoll_event ev; + ev.data.fd = fd; + ev.events = EPOLLIN; + return epoll_ctl(poll_fd, EPOLL_CTL_ADD, fd, &ev) == 0; + } + + bool get(vector<epoll_event>& events) { + events.clear(); + events.resize(1024); + while(true) { + int n = epoll_wait(poll_fd, &events[0], events.size(), WRITE_TIMEOUT); + if(n == -1) { + if(errno != EINTR) { + return false; + } + // Keep looping + } else { + events.resize(n); + return true; + } } -#endif } - void addDisconnect(ManagedSocketPtr ms) { - if(stop) - return; - - MSOverlapped* overlapped = pool.get(); - *overlapped = MSOverlapped(MSOverlapped::DISCONNECT, ms); - - if(!port.post(overlapped)) { - LOGDT(SocketManager::className, "Fatal error while posting disconnect to completion port: " + Util::translateError(::GetLastError())); - } - } + int poll_fd; +}; + +#else +#error No socket implementation for your platform +#endif // _WIN32 + +class Writer : public Thread { +public: + Writer() : stop(false) { + } + +#ifdef _WIN32 void shutdown() { stop = true; MSOverlapped* overlapped = pool.get(); *overlapped = MSOverlapped(MSOverlapped::SHUTDOWN); - if(!port.post(overlapped)) { + if(!poller.post(overlapped)) { LOGDT(SocketManager::className, "Fatal error while posting shutdown to completion port: " + Util::translateError(::GetLastError())); } join(); } +#else + void shutdown() { + stop = true; - void getErrors(SocketManager::ErrorMap& acceptErrors_, SocketManager::ErrorMap& readErrors_, SocketManager::ErrorMap& writeErrors_) { - FastMutex::Lock l(errorMutex); - acceptErrors_ = acceptErrors; - readErrors_ = readErrors; - writeErrors_ = writeErrors; + char ev = 0; + ::write(event[0], &ev, sizeof(ev)); + + join(); } - +#endif private: bool init() { - if(!port.create()) { - LOGDT(SocketManager::className, "Unable to create IO Completion port: " + Util::translateError(::GetLastError())); + if(!poller.init()) { + LOGDT(SocketManager::className, "Unable to start poller: " + Util::translateError(socket_errno)); return false; } try { - srv.listen(static_cast<short>(SETTING(SERVER_PORT))); + srv.listen(SETTING(SERVER_PORT)); + srv.setBlocking(false); } catch(const SocketException& e) { LOGDT(SocketManager::className, "Unable to create server socket: " + e.getError()); return false; } - if(!port.associate(srv.getSocket())) { - LOGDT(SocketManager::className, "Unable to associate IO Completion port: " + Util::translateError(::GetLastError())); + if(!poller.associate(srv.getSocket())) { + LOGDT(SocketManager::className, "Unable to associate server socket with poller: " + Util::translateError(socket_errno)); return false; } - + +#ifndef _WIN32 + if(socketpair(AF_UNIX, SOCK_STREAM, 0, event) == -1) { + LOGDT(SocketManager::className, "Unable to create event socketpair: " + Util::translateError(errno)); + return false; + } + + if(!poller.associate(event[1])) { + LOGDT(SocketManager::className, "Unable to associate event: " + Util::translateError(errno)); + return false; + } +#endif return true; } @@ -194,77 +226,18 @@ return 0; } - prepareAccept(); - - DWORD bytes = 0; - MSOverlapped* overlapped = 0; - uint32_t lastWrite = 0; - while(!stop || !accepting.empty() || !active.empty()) { - bool ret = port.get(&bytes, &overlapped); - //dcdebug("Event: %x, %x, %x, %x, %x, %x\n", (unsigned int)ret, (unsigned int)bytes, (unsigned int)ms, (unsigned int)overlapped, (unsigned int)overlapped->ms, (unsigned int)overlapped->type); +#ifdef _WIN32 + prepareAccept(); +#endif + while(!stop || !active.empty()) { + handleEvents(); - if(!ret) { - int error = ::GetLastError(); - if(overlapped == 0) { - if(error != WAIT_TIMEOUT) { - LOGDT(SocketManager::className, "Fatal error while getting status from completion port: " + Util::translateError(error)); - return error; - } - } else if(overlapped->type == MSOverlapped::ACCEPT) { - dcdebug("Error accepting: %s\n", Util::translateError(error).c_str()); - failAccept(overlapped->ms, error); - } else if(overlapped->type == MSOverlapped::READ_DONE) { - dcdebug("Error reading: %s\n", Util::translateError(error).c_str()); - failRead(overlapped->ms, error); - } else if(overlapped->type == MSOverlapped::WRITE_DONE) { - dcdebug("Error writing: %s\n", Util::translateError(error).c_str()); - failWrite(overlapped->ms, error); - } else { - dcdebug("Unknown error %d when waiting\n", overlapped->type); - } - } else { - switch(overlapped->type) { - case MSOverlapped::ACCEPT: { - checkDisconnects(); - handleAccept(overlapped->ms); - break; - } - case MSOverlapped::READ_DONE: { - handleReadDone(overlapped->ms); - break; - } - case MSOverlapped::WRITE_DONE: { - handleWriteDone(overlapped->ms, bytes); - break; - } - case MSOverlapped::WRITE_WAITING: { - prepareWrite(overlapped->ms); - break; - } - case MSOverlapped::WRITE_ALL: { - writeAll(); - break; - } - case MSOverlapped::DISCONNECT: { - handleDisconnect(overlapped->ms); - break; - } - case MSOverlapped::SHUTDOWN: { - handleShutdown(); - break; - } - } - } - if(overlapped != 0) { - pool.put(overlapped); - } - uint32_t now = GET_TICK(); if(now > lastWrite + WRITE_TIMEOUT) { - checkDisconnects(); writeAll(); + removeDisconnected(); lastWrite = now; } @@ -273,6 +246,57 @@ return 0; } +#ifdef _WIN32 + void handleEvents() { + DWORD bytes = 0; + MSOverlapped* overlapped = 0; + bool ret = poller.get(&bytes, &overlapped); + //dcdebug("Event: %x, %x, %x, %x, %x, %x\n", (unsigned int)ret, (unsigned int)bytes, (unsigned int)ms, (unsigned int)overlapped, (unsigned int)overlapped->ms, (unsigned int)overlapped->type); + + if(!ret) { + int error = ::GetLastError(); + if(overlapped == 0) { + if(error != WAIT_TIMEOUT) { + LOGDT(SocketManager::className, "Fatal error while getting status from completion port: " + Util::translateError(error)); + return; + } + } else if(overlapped->type == MSOverlapped::ACCEPT_DONE) { + dcdebug("Error accepting: %s\n", Util::translateError(error).c_str()); + failAccept(overlapped->ms, error); + } else if(overlapped->type == MSOverlapped::READ_DONE) { + dcdebug("Error reading: %s\n", Util::translateError(error).c_str()); + disconnect(overlapped->ms, error); + } else if(overlapped->type == MSOverlapped::WRITE_DONE) { + dcdebug("Error writing: %s\n", Util::translateError(error).c_str()); + failWrite(overlapped->ms, error); + } else { + dcdebug("Unknown error %d when waiting\n", overlapped->type); + } + } else { + switch(overlapped->type) { + case MSOverlapped::ACCEPT_DONE: { + handleAccept(overlapped->ms); + break; + } + case MSOverlapped::READ_DONE: { + handleReadDone(overlapped->ms); + break; + } + case MSOverlapped::WRITE_DONE: { + handleWriteDone(overlapped->ms, bytes); + break; + } + case MSOverlapped::SHUTDOWN: { + handleShutdown(); + break; + } + } + } + if(overlapped != 0) { + pool.put(overlapped); + } + } + void prepareAccept() throw() { if(stop) return; @@ -290,8 +314,8 @@ return; } - if(!port.associate(ms->getSocket())) { - LOGDT(SocketManager::className, "Unable to associate IO Completion port: " + Util::translateError(::GetLastError())); + if(!poller.associate(ms->getSocket())) { + LOGDT(SocketManager::className, "Unable to associate poller: " + Util::translateError(::GetLastError())); return; } @@ -301,7 +325,7 @@ ms->writeBuf->resize(ACCEPT_BUF_SIZE); MSOverlapped* overlapped = pool.get(); - *overlapped = MSOverlapped(MSOverlapped::ACCEPT, ms); + *overlapped = MSOverlapped(MSOverlapped::ACCEPT_DONE, ms); if(!::AcceptEx(srv.getSocket(), ms->getSocket(), &(*ms->writeBuf)[0], 0, ACCEPT_BUF_SIZE/2, ACCEPT_BUF_SIZE/2, &x, overlapped)) { int error = ::WSAGetLastError(); @@ -312,9 +336,6 @@ pool.put(overlapped); - FastMutex::Lock l(errorMutex); - acceptErrors[error]++; - return; } } @@ -340,7 +361,7 @@ SocketManager::getInstance()->incomingHandler(ms); ms->completeAccept(); - prepareRead(ms); + read(ms); // Prepare a new socket to replace this one... prepareAccept(); } @@ -348,14 +369,10 @@ void failAccept(ManagedSocketPtr& ms, int error) throw() { accepting.erase(ms); - prepareAccept(); - - FastMutex::Lock l(errorMutex); - acceptErrors[error]++; } - - void prepareRead(const ManagedSocketPtr& ms) throw() { + + void read(const ManagedSocketPtr& ms) throw() { if(stop) return; @@ -370,7 +387,7 @@ int error = ::WSAGetLastError(); if(error != WSA_IO_PENDING) { dcdebug("Error preparing read: %s\n", Util::translateError(error).c_str()); - failRead(ms, error); + disconnect(ms, error); } } } @@ -391,58 +408,38 @@ int error = ::WSAGetLastError(); if(error != WSAEWOULDBLOCK) { // Socket failed... - failRead(ms, error); + disconnect(ms, error); return; } - prepareRead(ms); + read(ms); return; } if(bytes == 0) { Util::freeBuf = readBuf; - failRead(ms, 0); + disconnect(ms, 0); return; } readBuf->resize(bytes); ms->completeRead(readBuf); - prepareRead(ms); + read(ms); } - void failRead(const ManagedSocketPtr& ms, int error) throw() { - if(active.find(ms) == active.end()) { + void write(const ManagedSocketPtr& ms) throw() { + if(stop || !(*ms)) { return; } - if(error != 0) { - FastMutex::Lock l(errorMutex); - readErrors[error]++; - } - - ms->close(); - - SocketSet::iterator i = disconnecting.find(ms); - if(i == disconnecting.end()) { - ms->failSocket(); - } else { - disconnecting.erase(i); - } - - active.erase(ms); - } - - void prepareWrite(const ManagedSocketPtr& ms) throw() { - if(stop || ms->writeBuf) { - return; - } - ms->writeBuf = ms->prepareWrite(); if(!ms->writeBuf) { - if(ms->disc) { - ms->close(); + uint32_t now = GET_TICK(); + + if(ms->disc || (ms->isBlocked() && ms->disc < now)) { + disconnect(ms, 0); } return; } @@ -457,11 +454,10 @@ if(::WSASend(ms->getSocket(), &ms->wsabuf, 1, &x, 0, reinterpret_cast<LPWSAOVERLAPPED>(overlapped), 0) != 0) { int error = ::WSAGetLastError(); if(error != WSA_IO_PENDING) { - failWrite(ms, error); pool.put(overlapped); + disconnect(ms, error); } } - return; } void handleWriteDone(const ManagedSocketPtr& ms, DWORD bytes) throw() { @@ -472,328 +468,51 @@ dcdebug("No buffer in handleWriteDone??\n"); return; } - if(ms->completeWrite(buf, bytes)) { - prepareWrite(ms); - } + ms->completeWrite(buf, bytes); } void failWrite(const ManagedSocketPtr& ms, int error) throw() { Util::freeBuf = ms->writeBuf; ms->writeBuf = 0; - - FastMutex::Lock l(errorMutex); - writeErrors[error]++; + disconnect(ms, error); } - - void writeAll() throw() { - for(SocketSet::iterator i = active.begin(); i != active.end(); ++i) { - prepareWrite(*i); - } - } - - void handleDisconnect(const ManagedSocketPtr& ms) throw() { - if(active.find(ms) == active.end()) { - return; - } - - if(disconnecting.find(ms) != disconnecting.end()) { - return; - } - - prepareWrite(ms); - disconnecting.insert(ms); - ms->failSocket(); - } - - void checkDisconnects() throw() { - uint32_t now = GET_TICK(); - for(SocketSet::iterator i = disconnecting.begin(); i != disconnecting.end(); ++i) { - const ManagedSocketPtr& ms = *i; - if(ms->disc + (uint32_t)SETTING(DISCONNECT_TIMEOUT) < now) { - ms->close(); - } - } - } - + void handleShutdown() throw() { for(SocketSet::iterator i = accepting.begin(); i != accepting.end(); ++i) { (*i)->close(); } for(SocketSet::iterator i = active.begin(); i != active.end(); ++i) { - (*i)->close(); + disconnect(*i, 0); } } - - FastMutex errorMutex; - SocketManager::ErrorMap acceptErrors; - SocketManager::ErrorMap readErrors; - SocketManager::ErrorMap writeErrors; - - CompletionPort port; - Socket srv; - - bool stop; - - Pool<MSOverlapped, ClearOverlapped> pool; - - typedef unordered_set<ManagedSocketPtr, PointerHash<ManagedSocket> > SocketSet; - /** Sockets that have a pending read */ - SocketSet active; - /** Sockets that have a pending accept */ - SocketSet accepting; - /** Sockets that are being written to but should be disconnected if timeout it reached */ - SocketSet disconnecting; -}; -#elif defined(HAVE_SYS_EPOLL_H) - -struct EPoll { - EPoll() : poll_fd(-1) { - } +#else - ~EPoll() { - if(poll_fd != -1) { - close(poll_fd); + void handleEvents() { + vector<epoll_event> events; + if(!poller.get(events)) { + LOGDT(SocketManager::className, "Poller failed: " + Util::translateError(errno)); } - } - - bool init() { - poll_fd = epoll_create(1024); - if(poll_fd == -1) - return false; - - return true; - } - - bool associate(const ManagedSocketPtr& ms) { - struct epoll_event ev; - ev.data.ptr = reinterpret_cast<void*>(ms.get()); - ev.events = EPOLLIN | EPOLLOUT | EPOLLET; - return epoll_ctl(poll_fd, EPOLL_CTL_ADD, ms->getSocket(), &ev) == 0; - } - - bool associate(int fd) { - struct epoll_event ev; - ev.data.fd = fd; - ev.events = EPOLLIN; - return epoll_ctl(poll_fd, EPOLL_CTL_ADD, fd, &ev) == 0; - } - - bool get(vector<epoll_event>& events) { - events.resize(1024); - while(true) { - int n = epoll_wait(poll_fd, &events[0], events.size(), WRITE_TIMEOUT); - if(n == -1) { - if(errno != EINTR) { - return false; - } - // Keep looping + for(vector<epoll_event>::iterator i = events.begin(); i != events.end(); ++i) { + epoll_event& ev = *i; + if(ev.data.fd == srv.getSocket()) { + accept(); + } else if(ev.data.fd == event[1]) { + handleShutdown(); } else { - events.resize(n); - return true; - } - } - } - - void remove(const ManagedSocketPtr& ms) { - // Buggy pre-2.6.9 kernels require non-null for last param - epoll_event e; - epoll_ctl(poll_fd, EPOLL_CTL_DEL, ms->getSocket(), &e); - } - - int poll_fd; -}; - -struct Event { - enum Type { - WRITE, - WRITE_ALL, - DISCONNECT, - REMOVE, - SHUTDOWN - } event; - ManagedSocketPtr ms; - - Event(Type event_, const ManagedSocketPtr& ms_) : event(event_), ms(ms_) { } - Event() : event(WRITE), ms(0) { } -}; - -struct ClearEvent { - void operator()(Event& evt) { - evt.ms = 0; - } -}; - -class Writer : public Thread { -public: - Writer() : stop(false) { - } - - void addWriter(const ManagedSocketPtr& ms) { - if(stop) - return; -/* - Event* ev = pool.get(); - *ev = Event(Event::WRITE, ms); - ::write(event[0], &ev, sizeof(ev));*/ - } - - void addAllWriters() { - if(stop) - return; -/* Event* ev = pool.get(); - *ev = Event(Event::WRITE_ALL, 0); - ::write(event[0], &ev, sizeof(ev));*/ - } - - void addDisconnect(const ManagedSocketPtr& ms) { - if(stop) - return; - - Event* ev = pool.get(); - *ev = Event(Event::DISCONNECT, ms); - ::write(event[0], &ev, sizeof(ev)); - } - - void shutdown() { - stop = true; - - Event* ev = pool.get(); - *ev = Event(Event::SHUTDOWN, 0); - ::write(event[0], &ev, sizeof(ev)); - - join(); - } - - void getErrors(SocketManager::ErrorMap& acceptErrors_, SocketManager::ErrorMap& readErrors_, SocketManager::ErrorMap& writeErrors_) { - FastMutex::Lock l(errorMutex); - acceptErrors_ = acceptErrors; - readErrors_ = readErrors; - writeErrors_ = writeErrors; - } - -private: - bool init() { - if(!poller.init()) { - LOGDT(SocketManager::className, "Unable to create initialize epoll: " + Util::translateError(errno)); - return false; - } - - try { - srv.listen(SETTING(SERVER_PORT)); - srv.setBlocking(false); - } catch(const SocketException& e) { - LOGDT(SocketManager::className, "Unable to create server socket: " + e.getError()); - return false; - } - - if(!poller.associate(srv.getSocket())) { - LOGDT(SocketManager::className, "Unable to set epoll: " + Util::translateError(errno)); - return false; - } - - if(socketpair(AF_UNIX, SOCK_STREAM, 0, event) == -1) { - LOGDT(SocketManager::className, "Unable to create event socketpair: " + Util::translateError(errno)); - return false; - } - if(!poller.associate(event[1])) { - LOGDT(SocketManager::className, "Unable to associate event: " + Util::translateError(errno)); - return false; - } - return true; - } - - virtual int run() { - LOGDT(SocketManager::className, "Writer starting"); - if(!init()) { - return 0; - } - - uint32_t lastWrite = 0; - std::vector<epoll_event> events; - while(!stop || !active.empty()) { - events.clear(); - - if(!poller.get(events)) { - LOGDT(SocketManager::className, "Poller failed: " + Util::translateError(errno)); - } - bool doevents = false; - for(std::vector<epoll_event>::iterator i = events.begin(); i != events.end(); ++i) { - epoll_event& ev = *i; - if(ev.data.fd == srv.getSocket()) { - accept(); - } else if(ev.data.fd == event[1]) { - doevents = true; - } else { - ManagedSocketPtr ms(reinterpret_cast<ManagedSocket*>(ev.data.ptr)); - if(ev.events & (EPOLLIN | EPOLLHUP | EPOLLERR)) { - if(!read(ms)) - continue; - } - if(ev.events & EPOLLOUT) { - write(ms); - } + ManagedSocketPtr ms(reinterpret_cast<ManagedSocket*>(ev.data.ptr)); + if(ev.events & (EPOLLIN | EPOLLHUP | EPOLLERR)) { + if(!read(ms)) + continue; } + if(ev.events & EPOLLOUT) { + ms->setBlocked(false); + } } - - if(doevents) { - handleEvents(); - } - - uint32_t now = GET_TICK(); - if(now > lastWrite + WRITE_TIMEOUT) { - checkDisconnects(); - writeAll(); - lastWrite = now; - } } - LOGDT(SocketManager::className, "Writer shutting down"); - return 0; } - - void handleEvents() { - while(true) { - size_t start = ev.size(); - ev.resize(64 * sizeof(Event*)); - int bytes = ::recv(event[1], &ev[0] + start, ev.size() - start, MSG_DONTWAIT); - if(bytes == -1) { - ev.resize(start); - int err = errno; - if(err == EAGAIN) { - return; - } - LOGDT(SocketManager::className, "Error reading from event[1]: " + Util::translateError(err)); - return; - } - ev.resize(bytes); - size_t events = bytes / sizeof(Event*); - for(size_t i = 0; i < events; ++i) { - Event** ee = reinterpret_cast<Event**>(&ev[i*sizeof(Event*)]); - Event* e = *ee; - switch(e->event) { - case Event::WRITE: { - write(e->ms); - } break; - case Event::WRITE_ALL: { - writeAll(); - } break; - case Event::DISCONNECT: { - disconnect(e->ms); - } break; - case Event::REMOVE: { - failRead(e->ms, 0); - } break; - case Event::SHUTDOWN: { - handleShutdown(); - } break; - } - pool.put(e); - } - ev.erase(ev.begin(), ev.begin() + events*sizeof(Event*)); - } - } - - void accept() throw() { + + void accept() { ManagedSocketPtr ms(new ManagedSocket()); try { ms->setIp(ms->sock.accept(srv)); @@ -812,15 +531,11 @@ read(ms); } catch (const SocketException& e) { LOGDT(SocketManager::className, "Unable to create socket: " + e.getError()); - if(e.getErrorCode() != 0) { - FastMutex::Lock l(errorMutex); - acceptErrors[e.getErrorCode()]++; - } return; } } - bool read(const ManagedSocketPtr& ms) throw() { + bool read(const ManagedSocketPtr& ms) { if(stop || !(*ms)) return false; @@ -834,14 +549,16 @@ Util::freeBuf = readBuf; int error = errno; - if(error != EAGAIN) { - failRead(ms, error); + if(error != EAGAIN && error != EINTR) { + ms->close(); + disconnect(ms, error); return false; } break; } else if(bytes == 0) { Util::freeBuf = readBuf; - failRead(ms, 0); + ms->close(); + disconnect(ms, 0); return false; } @@ -851,28 +568,7 @@ return true; } - void failRead(const ManagedSocketPtr& ms, int error) throw() { - if(!(*ms)) { - return; - } - - SocketSet::iterator i = disconnecting.find(ms); - if(i == disconnecting.end()) { - ms->failSocket(); - } else { - disconnecting.erase(i); - } - - poller.remove(ms); - ms->close(); - active.erase(ms); - if(error != 0) { - FastMutex::Lock l(errorMutex); - readErrors[error]++; - } - } - - void write(const ManagedSocketPtr& ms) throw() { + void write(const ManagedSocketPtr& ms) { if(stop || !(*ms)) { return; } @@ -881,8 +577,9 @@ ByteVector* writeBuf = ms->prepareWrite(); if(!writeBuf) { - if(ms->disc) { - addRemove(ms); + uint32_t now = GET_TICK(); + if(ms->disc || (ms->isBlocked() && ms->disc < now)) { + disconnect(ms, 0); } return; } @@ -895,7 +592,7 @@ return; } Util::freeBuf = writeBuf; - failWrite(ms, error); + disconnect(ms, error); return; } if(!ms->completeWrite(writeBuf, bytes)) { @@ -903,87 +600,88 @@ } } } - - void failWrite(const ManagedSocketPtr& ms, int error) throw() { - addRemove(ms); - if(error != 0) { - FastMutex::Lock l(errorMutex); - writeErrors[error]++; + + void handleShutdown() { + char buf; + int bytes = ::recv(event[1], &buf, 1, MSG_DONTWAIT); + if(bytes == -1) { + + int err = errno; + if(err == EAGAIN || err == EINTR) { + return; + } + LOGDT(SocketManager::className, "Error reading from event[1]: " + Util::translateError(err)); + return; } + + srv.disconnect(); + + for(SocketSet::iterator i = active.begin(); i != active.end(); ++i) { + disconnect(*i, 0); + } } + +#endif void writeAll() throw() { - for(SocketSet::iterator i = active.begin(); i != active.end(); ++i) { + if(active.empty()) { + return; + } + SocketSet::iterator start = active.begin(); + SocketSet::iterator end = active.end(); + SocketSet::iterator mid = start; + // Start at a random position each time in order not to favorise the first sockets... + std::advance(mid, Util::rand(active.size())); + + for(SocketSet::iterator i = mid; i != end; ++i) { write(*i); } + for(SocketSet::iterator i = start; i != mid; ++i) { + write(*i); + } } - void disconnect(const ManagedSocketPtr& ms) throw() { - if(!(*ms)) - return; - + void disconnect(const ManagedSocketPtr& ms, int error) { if(disconnecting.find(ms) != disconnecting.end()) { return; } - + disconnecting.insert(ms); - ms->failSocket(); - write(ms); + + ms->failSocket(error); } - void checkDisconnects() throw() { - uint32_t now = GET_TICK(); + void removeDisconnected() { for(SocketSet::iterator i = disconnecting.begin(); i != disconnecting.end(); ++i) { - const ManagedSocketPtr& ms = *i; - if(ms->disc + (uint32_t)SETTING(DISCONNECT_TIMEOUT) < now) { - ms->shutdown(); - } + (*i)->close(); + active.erase(*i); } } - void handleShutdown() throw() { - srv.disconnect(); - - for(SocketSet::iterator i = active.begin(); i != active.end(); ++i) { - addRemove(*i); - } - } - - // This is needed because calling close() on the socket - // will remove it from the epoll set (so the main loop won't - // be notified) - void addRemove(const ManagedSocketPtr& ms) { - Event* ev = pool.get(); - *ev = Event(Event::REMOVE, ms); - ::write(event[0], &ev, sizeof(ev)); - } - - EPoll poller; + Poller poller; Socket srv; - FastMutex errorMutex; - SocketManager::ErrorMap acceptErrors; - SocketManager::ErrorMap readErrors; - SocketManager::ErrorMap writeErrors; - bool stop; - - int event[2]; - std::vector<uint8_t> ev; - Pool<Event, ClearEvent> pool; - - typedef std::tr1::unordered_set<ManagedSocketPtr, PointerHash<ManagedSocket> > SocketSet; + typedef unordered_set<ManagedSocketPtr, PointerHash<ManagedSocket> > SocketSet; /** Sockets that have a pending read */ SocketSet active; /** Sockets that are being written to but should be disconnected if timeout it reached */ SocketSet disconnecting; -}; - + +#ifdef _WIN32 + Pool<MSOverlapped, ClearOverlapped> pool; + static const size_t PREPARED_SOCKETS = 32; + + /** Sockets that have a pending accept */ + SocketSet accepting; #else -#error No socket implementation for your platform -#endif // _WIN32 + int event[2]; + +#endif +}; + SocketManager::SocketManager() : writer(new Writer()) { } @@ -998,10 +696,12 @@ writer->start(); writer->setThreadPriority(Thread::HIGH); + ProcessQueue workQueue; + while(true) { processSem.wait(); { - FastMutex::Lock l(processCS); + FastMutex::Lock l(processMutex); workQueue.swap(processQueue); } for(ProcessQueue::iterator i = workQueue.begin(); i != workQueue.end(); ++i) { @@ -1018,19 +718,16 @@ } void SocketManager::addWriter(const ManagedSocketPtr& ms) throw() { - writer->addWriter(ms); } void SocketManager::addAllWriters() throw() { - writer->addAllWriters(); } void SocketManager::addDisconnect(const ManagedSocketPtr& ms) throw() { - writer->addDisconnect(ms); } void SocketManager::addJob(const Callback& callback) throw() { - FastMutex::Lock l(processCS); + FastMutex::Lock l(processMutex); processQueue.push_back(callback); processSem.signal(); @@ -1045,8 +742,4 @@ writer.release(); } -void SocketManager::getErrors(ErrorMap& acceptErrors_, ErrorMap& readErrors_, ErrorMap& writeErrors_) { - writer->getErrors(acceptErrors_, readErrors_, writeErrors_); } - -} Modified: adchpp/trunk/adchpp/SocketManager.h =================================================================== --- adchpp/trunk/adchpp/SocketManager.h 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/adchpp/SocketManager.h 2007-12-09 18:44:45 UTC (rev 99) @@ -23,8 +23,8 @@ #include "forward.h" #include "Thread.h" -#include "Semaphores.h" #include "Mutex.h" +#include "Semaphores.h" #include "Singleton.h" namespace adchpp { @@ -37,15 +37,13 @@ void startup() throw(ThreadException) { start(); } void shutdown(); - void addWriter(const boost::intrusive_ptr<ManagedSocket>& ms) throw(); - void addDisconnect(const boost::intrusive_ptr<ManagedSocket>& ms) throw(); + void addWriter(const ManagedSocketPtr& ms) throw(); + void addDisconnect(const ManagedSocketPtr& ms) throw(); void addAllWriters() throw(); typedef std::tr1::function<void (const ManagedSocketPtr&)> IncomingHandler; void setIncomingHandler(const IncomingHandler& handler) { incomingHandler = handler; } - typedef std::tr1::unordered_map<int, size_t> ErrorMap; - ADCHPP_DLL void getErrors(ErrorMap& acceptErrors_, ErrorMap& readErrors_, ErrorMap& writeErrors_); private: friend class ManagedSocket; friend class Writer; @@ -54,10 +52,9 @@ typedef std::vector<Callback> ProcessQueue; - FastMutex processCS; + FastMutex processMutex; ProcessQueue processQueue; - ProcessQueue workQueue; Semaphore processSem; Modified: adchpp/trunk/adchpp/Util.cpp =================================================================== --- adchpp/trunk/adchpp/Util.cpp 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/adchpp/Util.cpp 2007-12-09 18:44:45 UTC (rev 99) @@ -123,6 +123,7 @@ // Only the server should be left now... aServer = url.substr(i, k-i); } + string Util::toAcp(const wstring& wString) { if(wString.empty()) return emptyString; Modified: adchpp/trunk/plugins/Script/examples/access.lua =================================================================== --- adchpp/trunk/plugins/Script/examples/access.lua 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/plugins/Script/examples/access.lua 2007-12-09 18:44:45 UTC (rev 99) @@ -483,27 +483,6 @@ end end - sm = adchpp.getSocketManager() - a = sm:getAcceptErrors() - r = sm:getReadErrors() - w = sm:getWriteErrors() - str = str .. "\nSocket errors: \n" - str = str .. " - Accept errors: \n" - for i = 0, a:size()-1 do - e = a[i] - str = str .. e.first .. "\t" .. e.second .. "\t" .. adchpp.Util_translateError(e.first) .. "\n" - end - str = str .. " - Read errors: \n" - for i = 0, r:size()-1 do - e = r[i] - str = str .. e.first .. "\t" .. e.second .. "\t" .. adchpp.Util_translateError(e.first) .. "\n" - end - str = str .. " - Write errors: \n" - for i = 0, w:size()-1 do - e = w[i] - str = str .. e.first .. "\t" .. e.second .. "\t" .. adchpp.Util_translateError(e.first) .. "\n" - end - local queued = cm:getQueuedBytes() local queueBytes = adchpp.Stats_queueBytes local queueCalls = adchpp.Stats_queueCalls Modified: adchpp/trunk/readme.txt =================================================================== --- adchpp/trunk/readme.txt 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/readme.txt 2007-12-09 18:44:45 UTC (rev 99) @@ -18,7 +18,7 @@ gcc 4.2+ (linux or mingw) boost (http://www.boost.org) scons (http://www.scons.org) -swig 1.3.31 +swig 1.3.33 ** Important!! The hub will _NOT_ run on Win9x/ME. ** Modified: adchpp/trunk/swig/adchpp.i =================================================================== --- adchpp/trunk/swig/adchpp.i 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/swig/adchpp.i 2007-12-09 18:44:45 UTC (rev 99) @@ -516,35 +516,6 @@ class SocketManager { public: - %extend { - std::vector<std::pair<int, size_t> > getAcceptErrors() { - std::vector<std::pair<int, size_t> > tmp; - adchpp::SocketManager::ErrorMap a, r, w; - self->getErrors(a, r, w); - for(SocketManager::ErrorMap::iterator i = a.begin(); i != a.end(); ++i) { - tmp.push_back(std::make_pair(i->first, i->second)); - } - return tmp; - } - std::vector<std::pair<int, size_t> > getReadErrors() { - std::vector<std::pair<int, size_t> > tmp; - adchpp::SocketManager::ErrorMap a, r, w; - self->getErrors(a, r, w); - for(SocketManager::ErrorMap::iterator i = r.begin(); i != r.end(); ++i) { - tmp.push_back(std::make_pair(i->first, i->second)); - } - return tmp; - } - std::vector<std::pair<int, size_t> > getWriteErrors() { - std::vector<std::pair<int, size_t> > tmp; - adchpp::SocketManager::ErrorMap a, r, w; - self->getErrors(a, r, w); - for(SocketManager::ErrorMap::iterator i = w.begin(); i != w.end(); ++i) { - tmp.push_back(std::make_pair(i->first, i->second)); - } - return tmp; - } - } }; class ClientManager Modified: adchpp/trunk/swig/lua.i =================================================================== --- adchpp/trunk/swig/lua.i 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/swig/lua.i 2007-12-09 18:44:45 UTC (rev 99) @@ -1,5 +1,6 @@ %module luadchpp + typedef unsigned int size_t; %wrapper %{ Modified: adchpp/trunk/swig/python.i =================================================================== --- adchpp/trunk/swig/python.i 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/swig/python.i 2007-12-09 18:44:45 UTC (rev 99) @@ -1,5 +1,6 @@ %module pyadchpp + %{ // Python pollution #undef socklen_t Modified: adchpp/trunk/test/PyClient.py =================================================================== --- adchpp/trunk/test/PyClient.py 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/test/PyClient.py 2007-12-09 18:44:45 UTC (rev 99) @@ -1,7 +1,7 @@ #!/usr/bin/python import sys -sys.path.append('../build/debug-mingw/bin') +sys.path.append('../build/debug-default/bin') CLIENTS = 100 @@ -60,7 +60,7 @@ def login(self, ipport): self.connect(ipport) cmd = AdcCommand(AdcCommand.CMD_SUP, AdcCommand.TYPE_HUB, 0) - cmd.addParam("ADBASE") + cmd.addParam("ADBASE").addParam("ADTIGR") self.command(cmd) self.expect(AdcCommand.CMD_SUP) sid = self.expect(AdcCommand.CMD_SID) Modified: adchpp/trunk/unix/po/adchppd.pot =================================================================== --- adchpp/trunk/unix/po/adchppd.pot 2007-12-04 22:04:21 UTC (rev 98) +++ adchpp/trunk/unix/po/adchppd.pot 2007-12-09 18:44:45 UTC (rev 99) @@ -7,7 +7,7 @@ msgstr "" "Project-Id-Version: \"adchpp\"--copyright-holder=\"Jacek Sieka\"\n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2007-11-25 22:39+0100\n" +"POT-Creation-Date: 2007-12-09 18:46+0100\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n" "Language-Team: LANGUAGE <LL...@li...>\n" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |