From: Christian P. <cp...@us...> - 2005-02-10 19:14:28
|
Update of /cvsroot/pclasses/pclasses2/src/System In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv2439/src/System Modified Files: FdListener.h FdListener.posix.cpp EventQueue.cpp Log Message: Renamed FdListenerThread -> FdListenerList. Waiting for signaled Fd's is now done by the EventQueue - not by a separate Thread. Added Socket::setBlocking(bool). Index: FdListener.posix.cpp =================================================================== RCS file: /cvsroot/pclasses/pclasses2/src/System/FdListener.posix.cpp,v retrieving revision 1.1 retrieving revision 1.2 diff -u -d -r1.1 -r1.2 --- FdListener.posix.cpp 7 Feb 2005 18:41:07 -0000 1.1 +++ FdListener.posix.cpp 10 Feb 2005 19:13:42 -0000 1.2 @@ -83,11 +83,15 @@ } }; +typedef std::map< + EventQueue*, + FdListenerList* +> InstMap; -FdListenerThread* FdListenerThread::_theThread = 0; +InstMap FdListenerList::_theLists; -FdListenerThread::FdListenerThread() throw(SystemError) -: Thread(true), _shouldExit(false) +FdListenerList::FdListenerList(EventQueue& evq) throw(SystemError) +: _eventQueue(evq) { // we use a pipe for wakeup signaling ... 
int ret = ::pipe(_wakeupPipe); @@ -100,54 +104,50 @@ _listeners.insert(_wakeupListener); } -FdListenerThread::~FdListenerThread() throw() +FdListenerList::~FdListenerList() throw() { delete _wakeupListener; ::close(_wakeupPipe[0]); ::close(_wakeupPipe[1]); } -void FdListenerThread::addListener(FdListener* l) +void FdListenerList::addListener(FdListener* l) { CriticalSection::ScopedLock lck(_listenersCs); _listeners.insert(l); wakeup(); } -void FdListenerThread::removeListener(FdListener* l) +void FdListenerList::removeListener(FdListener* l) { CriticalSection::ScopedLock lck(_listenersCs); _listeners.erase(l); wakeup(); } -void FdListenerThread::stop() -{ - _shouldExit = true; - wakeup(); -} - -void FdListenerThread::wakeup() +void FdListenerList::wakeup() { ::write(_wakeupPipe[1], "W", 1); } -FdListenerThread& FdListenerThread::instance() +FdListenerList& FdListenerList::instance(EventQueue& evq) { static CriticalSection cs; CriticalSection::ScopedLock lck(cs); - if(!_theThread) + InstMap::const_iterator i = _theLists.find(&evq); + if(i == _theLists.end()) { - _theThread = new FdListenerThread(); - _theThread->start(); + FdListenerList* thread = new FdListenerList(evq); + _theLists.insert(std::make_pair(&evq, thread)); + return *thread; } - return *_theThread; + return *i->second; } int populate_fd_sets(fd_set* read_fds, fd_set* write_fds, fd_set* error_fds, - FdListenerThread::ListenerSet& l) + FdListenerList::ListenerSet& l) { int highest_fd = 0; int num_read_fds = 0, num_write_fds = 0, num_error_fds = 0; @@ -156,7 +156,7 @@ FD_ZERO(write_fds); FD_ZERO(error_fds); - for(FdListenerThread::ListenerSet::const_iterator i = l.begin(); + for(FdListenerList::ListenerSet::const_iterator i = l.begin(); i != l.end(); ++i) { FdListener* listener = *i; @@ -194,68 +194,61 @@ return highest_fd; } -int FdListenerThread::main() +void FdListenerList::wait(unsigned int timeout_ms) { fd_set read_fds, write_fds, error_fds; struct timeval timeout; - unsigned int timeout_ms = 0; - 
while(!_shouldExit) + int highest_fd; { - int highest_fd; - { - CriticalSection::ScopedLock lck(_listenersCs); + CriticalSection::ScopedLock lck(_listenersCs); - highest_fd = - populate_fd_sets(&read_fds, &write_fds, &error_fds, _listeners); - } + highest_fd = + populate_fd_sets(&read_fds, &write_fds, &error_fds, _listeners); + } - Private::get_timeout(&timeout, timeout_ms, Private::TIMEOUT_RELATIVE); + Private::get_timeout(&timeout, timeout_ms, Private::TIMEOUT_RELATIVE); - int ret = ::select(highest_fd + 1, &read_fds, &write_fds, &error_fds, - &timeout); + int ret = ::select(highest_fd + 1, &read_fds, &write_fds, &error_fds, + &timeout); - if(ret == -1) - { - //@@fixme ... we don't want to throw from Thread::main() !!! - throw SystemError(errno, "Could not select() for events", P_SOURCEINFO); - } - else if(ret) + if(ret == -1) + { + //@@fixme ... we don't want to throw from Thread::main() !!! + throw SystemError(errno, "Could not select() for events", P_SOURCEINFO); + } + else if(ret) + { + // i think we can hold the lock when dispatching the events ... + CriticalSection::ScopedLock lck(_listenersCs); + + for(ListenerSet::const_iterator i = _listeners.begin(); + i != _listeners.end(); ++i) { - // i think we can hold the lock when dispatching the events ... 
- CriticalSection::ScopedLock lck(_listenersCs); + FdListener* l = *i; + int fd = l->fd(); + int flags = l->flags(); - for(ListenerSet::const_iterator i = _listeners.begin(); - i != _listeners.end(); ++i) + if(flags & FdListener::Read && FD_ISSET(fd, &read_fds)) { - FdListener* l = *i; - int fd = l->fd(); - int flags = l->flags(); - - if(flags & FdListener::Read && FD_ISSET(fd, &read_fds)) - { - flags &= ~FdListener::Read; - l->onRead(); - } - if(flags & FdListener::Write && FD_ISSET(fd, &write_fds)) - { - flags &= ~FdListener::Write; - l->onWrite(); - } - if(flags & FdListener::Error && FD_ISSET(fd, &error_fds)) - { - flags &= ~FdListener::Error; - l->onError(); - } - - if(flags != l->flags()) - l->setFlags(flags); + flags &= ~FdListener::Read; + l->onRead(); } + if(flags & FdListener::Write && FD_ISSET(fd, &write_fds)) + { + flags &= ~FdListener::Write; + l->onWrite(); + } + if(flags & FdListener::Error && FD_ISSET(fd, &error_fds)) + { + flags &= ~FdListener::Error; + l->onError(); + } + + if(flags != l->flags()) + l->setFlags(flags); } } - - delete this; - return 0; } Index: FdListener.h =================================================================== RCS file: /cvsroot/pclasses/pclasses2/src/System/FdListener.h,v retrieving revision 1.1 retrieving revision 1.2 diff -u -d -r1.1 -r1.2 --- FdListener.h 7 Feb 2005 18:41:07 -0000 1.1 +++ FdListener.h 10 Feb 2005 19:13:42 -0000 1.2 @@ -21,10 +21,10 @@ #ifndef P_System_FdListener_h #define P_System_FdListener_h -#include "pclasses/System/Thread.h" #include "pclasses/System/CriticalSection.h" #include "pclasses/System/SystemError.h" #include <set> +#include <map> namespace P { @@ -32,7 +32,7 @@ class PSYSTEM_EXPORT FdListener { public: - friend class FdListenerThread; + friend class FdListenerList; enum Flags { Read = 0x01, @@ -58,7 +58,9 @@ int _flags; }; -class PSYSTEM_EXPORT FdListenerThread: private Thread { +class EventQueue; + +class PSYSTEM_EXPORT FdListenerList { public: typedef std::set<FdListener*> 
ListenerSet; @@ -66,24 +68,26 @@ void removeListener(FdListener* l); void wakeup(); - void stop(); - static FdListenerThread& instance(); + void wait(unsigned int timeout); - private: - FdListenerThread() throw(SystemError); - ~FdListenerThread() throw(); + static FdListenerList& instance(EventQueue& evq); - int main(); + private: + FdListenerList(EventQueue& evq) throw(SystemError); + ~FdListenerList() throw(); - volatile bool _shouldExit; int _wakeupPipe[2]; FdListener* _wakeupListener; + EventQueue& _eventQueue; ListenerSet _listeners; CriticalSection _listenersCs; - static FdListenerThread* _theThread; + static std::map< + EventQueue*, + FdListenerList* + > _theLists; }; Index: EventQueue.cpp =================================================================== RCS file: /cvsroot/pclasses/pclasses2/src/System/EventQueue.cpp,v retrieving revision 1.1 retrieving revision 1.2 diff -u -d -r1.1 -r1.2 --- EventQueue.cpp 7 Feb 2005 18:41:06 -0000 1.1 +++ EventQueue.cpp 10 Feb 2005 19:13:43 -0000 1.2 @@ -21,6 +21,10 @@ #include "pclasses/System/EventQueue.h" #include <set> +#ifndef WIN32 +# include "FdListener.h" +#endif + namespace P { namespace System { @@ -76,55 +80,66 @@ ThreadKey<EventQueue> EventQueue::_theQueues; EventQueue::EventQueue() -: _eventQMutex(), _eventQCond(_eventQMutex) { + _private = (void*)&FdListenerList::instance(*this); } EventQueue::~EventQueue() { } +void EventQueue::send(const Event& ev) +{ + dispatch(ev); +} + void EventQueue::post(const Event& ev) { - Mutex::ScopedLock lck(_eventQMutex); + CriticalSection::ScopedLock lck(_eventQueueCs); _eventQueue.push(ev); - _eventQCond.signal(); + + // wakeup the FdListener + FdListenerList* lst = (FdListenerList*)_private; + lst->wakeup(); } void EventQueue::wait(Event& ev) { - Mutex::ScopedLock lck(_eventQMutex); - while(_eventQueue.empty()) - _eventQCond.wait(lck); - - ev = _eventQueue.front(); - _eventQueue.pop(); + while(!tryWait(ev, 100000)); } bool EventQueue::tryWait(Event& ev, unsigned int 
timeout) { - Mutex::ScopedLock lck(_eventQMutex); - + _eventQueueCs.lock(); if(_eventQueue.empty()) { - if(!_eventQCond.tryWait(lck, timeout) - || _eventQueue.empty()) + FdListenerList* lst = (FdListenerList*)_private; + _eventQueueCs.unlock(); + + lst->wait(100000); + + _eventQueueCs.lock(); + if(_eventQueue.empty()) + { return false; + _eventQueueCs.unlock(); + } } ev = _eventQueue.front(); _eventQueue.pop(); - return true; + _eventQueueCs.unlock(); + return true; } void EventQueue::dispatch(const Event& ev) { // we should call the EventListener's without held mutex lock, since a // signaled EventListener may modify our list of listeners. - std::set<EventListener*> _dispatchers; + std::set<EventListener*> dispatchers; // we need exclusive access to the multimap ... - _listenersMutex.lock(); + _listenersCs.lock(); std::pair<ListenerMap::iterator, ListenerMap::iterator> p = _listeners.equal_range(ev.sender()); @@ -133,16 +148,16 @@ while(p.first != p.second) { EventListener* l = (*p.first).second; - _dispatchers.insert(l); + dispatchers.insert(l); ++p.first; } // unlock the mutex - _listenersMutex.unlock(); + _listenersCs.unlock(); // now notify all EventListener's without held lock - std::set<EventListener*>::iterator i = _dispatchers.begin(); - while(i != _dispatchers.end()) + std::set<EventListener*>::iterator i = dispatchers.begin(); + while(i != dispatchers.end()) { (*i)->signaled(ev); ++i; @@ -151,13 +166,13 @@ void EventQueue::addListener(EventListener* l) { - Mutex::ScopedLock lck(_listenersMutex); + CriticalSection::ScopedLock lck(_listenersCs); _listeners.insert(std::make_pair(l->sender(), l)); } void EventQueue::removeListener(EventListener* l) { - Mutex::ScopedLock lck(_listenersMutex); + CriticalSection::ScopedLock lck(_listenersCs); std::pair<ListenerMap::iterator, ListenerMap::iterator> p = _listeners.equal_range(l->sender()); |