From: Christian P. <cp...@us...> - 2005-02-07 18:41:17
|
Update of /cvsroot/pclasses/pclasses2/src/System In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv18685/src/System Modified Files: Makefile.am Added Files: EventQueue.cpp FdListener.h FdListener.posix.cpp Log Message: Added EventQueue, Event. Added FdListener and FdListenerThread. --- NEW FILE: FdListener.posix.cpp --- /*************************************************************************** * Copyright (C) 2005 by Christian Prochnow * * cp...@se... * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU Library General Public License as * * published by the Free Software Foundation; either version 2 of the * * License, or (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU Library General Public * * License along with this program; if not, write to the * * Free Software Foundation, Inc., * * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
* ***************************************************************************/ #include "FdListener.h" #include "timeout.h" #include <sys/select.h> #include <unistd.h> #include <fcntl.h> #include <errno.h> namespace P { namespace System { FdListener::FdListener(int fd, int flags) : _fd(fd), _flags(flags) { } FdListener::~FdListener() { } int FdListener::fd() const throw() { return _fd; } int FdListener::flags() const throw() { return _flags; } void FdListener::setFlags(int flags) throw() { _flags = flags; } void FdListener::onRead() { } void FdListener::onWrite() { } void FdListener::onError() { } // our internal wakeup listener class class WakeupListener: public FdListener { public: WakeupListener(int handle) : FdListener(handle, FdListener::Read) { } protected: void onRead() { // read data from pipe .. we dont want to get signaled forever ... char tmp[256]; ::read(fd(), tmp, sizeof(tmp)); } }; FdListenerThread* FdListenerThread::_theThread = 0; FdListenerThread::FdListenerThread() throw(SystemError) : Thread(true), _shouldExit(false) { // we use a pipe for wakeup signaling ... 
int ret = ::pipe(_wakeupPipe); if(ret == -1) throw SystemError(errno, "Could not create wakeup pipe", P_SOURCEINFO); ::fcntl(_wakeupPipe[0], F_SETFL, O_NONBLOCK); _wakeupListener = new WakeupListener(_wakeupPipe[0]); _listeners.insert(_wakeupListener); } FdListenerThread::~FdListenerThread() throw() { delete _wakeupListener; ::close(_wakeupPipe[0]); ::close(_wakeupPipe[1]); } void FdListenerThread::addListener(FdListener* l) { CriticalSection::ScopedLock lck(_listenersCs); _listeners.insert(l); wakeup(); } void FdListenerThread::removeListener(FdListener* l) { CriticalSection::ScopedLock lck(_listenersCs); _listeners.erase(l); wakeup(); } void FdListenerThread::stop() { _shouldExit = true; wakeup(); } void FdListenerThread::wakeup() { ::write(_wakeupPipe[1], "W", 1); } FdListenerThread& FdListenerThread::instance() { static CriticalSection cs; CriticalSection::ScopedLock lck(cs); if(!_theThread) { _theThread = new FdListenerThread(); _theThread->start(); } return *_theThread; } int populate_fd_sets(fd_set* read_fds, fd_set* write_fds, fd_set* error_fds, FdListenerThread::ListenerSet& l) { int highest_fd = 0; int num_read_fds = 0, num_write_fds = 0, num_error_fds = 0; FD_ZERO(read_fds); FD_ZERO(write_fds); FD_ZERO(error_fds); for(FdListenerThread::ListenerSet::const_iterator i = l.begin(); i != l.end(); ++i) { FdListener* listener = *i; int fd = listener->fd(); int lflags = listener->flags(); if(lflags & FdListener::Read) { FD_SET(fd, read_fds); ++num_read_fds; if(fd > highest_fd) highest_fd = fd; } if(lflags & FdListener::Write) { FD_SET(fd, write_fds); ++num_write_fds; if(fd > highest_fd) highest_fd = fd; } if(lflags & FdListener::Error) { FD_SET(fd, error_fds); ++num_error_fds; if(fd > highest_fd) highest_fd = fd; } } return highest_fd; } int FdListenerThread::main() { fd_set read_fds, write_fds, error_fds; struct timeval timeout; unsigned int timeout_ms = 0; while(!_shouldExit) { int highest_fd; { CriticalSection::ScopedLock lck(_listenersCs); highest_fd = 
populate_fd_sets(&read_fds, &write_fds, &error_fds, _listeners); } Private::get_timeout(&timeout, timeout_ms, Private::TIMEOUT_RELATIVE); int ret = ::select(highest_fd + 1, &read_fds, &write_fds, &error_fds, &timeout); if(ret == -1) { //@@fixme ... we don't want to throw from Thread::main() !!! throw SystemError(errno, "Could not select() for events", P_SOURCEINFO); } else if(ret) { // i think we can hold the lock when dispatching the events ... CriticalSection::ScopedLock lck(_listenersCs); for(ListenerSet::const_iterator i = _listeners.begin(); i != _listeners.end(); ++i) { FdListener* l = *i; int fd = l->fd(); int flags = l->flags(); if(flags & FdListener::Read && FD_ISSET(fd, &read_fds)) { flags &= ~FdListener::Read; l->onRead(); } if(flags & FdListener::Write && FD_ISSET(fd, &write_fds)) { flags &= ~FdListener::Write; l->onWrite(); } if(flags & FdListener::Error && FD_ISSET(fd, &error_fds)) { flags &= ~FdListener::Error; l->onError(); } if(flags != l->flags()) l->setFlags(flags); } } } delete this; return 0; } } // !namespace System } // !namespace P Index: Makefile.am =================================================================== RCS file: /cvsroot/pclasses/pclasses2/src/System/Makefile.am,v retrieving revision 1.12 retrieving revision 1.13 diff -u -d -r1.12 -r1.13 --- Makefile.am 31 Jan 2005 11:19:38 -0000 1.12 +++ Makefile.am 7 Feb 2005 18:41:07 -0000 1.13 @@ -55,12 +55,14 @@ if WITH_POSIX_IO IO_Sources = IOHandle.posix.cpp Pipe.posix.cpp File.posix.cpp \ - FileInfo.posix.cpp Directory.posix.cpp Process.posix.cpp + FileInfo.posix.cpp Directory.posix.cpp Process.posix.cpp \ + FdListener.posix.cpp endif if WITH_WIN32_IO IO_Sources = IOHandle.win32.cpp Pipe.win32.cpp File.win32.cpp \ - FileInfo.win32.cpp Directory.win32.cpp Process.win32.cpp + FileInfo.win32.cpp Directory.win32.cpp Process.win32.cpp \ + FdListener.win32.cpp endif if WITH_POSIX_TIME @@ -73,8 +75,6 @@ System_Sources = CdRomDevice.linux.cpp -#EventLoop_Sources = EventLoop.select.cpp - 
EXTRA_DIST = CriticalSection.generic.cpp CriticalSection.win32.cpp \ Mutex.solaris.cpp Mutex.posix.cpp Mutex.win32.cpp SharedMemory.sysv.cpp \ SharedMemory.posix.cpp SharedMemory.win32.cpp @@ -91,8 +91,7 @@ $(Semaphore_Sources) $(SharedMem_Sources) $(SharedLib_Sources) \ SharedLib.common.cpp FileInfo.common.cpp ProcessIO.cpp Process.common.cpp \ $(IO_Sources) File.common.cpp Pipe.common.cpp StorageDevice.common.cpp \ - $(Time_Sources) PathFinder.cpp $(System_Sources) -# EventLoop.common.cpp $(EventLoop_Sources) + $(Time_Sources) PathFinder.cpp $(System_Sources) EventQueue.cpp libpclasses_system_la_LDFLAGS = -no-undefined --- NEW FILE: FdListener.h --- /*************************************************************************** * Copyright (C) 2005 by Christian Prochnow * * cp...@se... * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU Library General Public License as * * published by the Free Software Foundation; either version 2 of the * * License, or (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU Library General Public * * License along with this program; if not, write to the * * Free Software Foundation, Inc., * * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
* ***************************************************************************/

#ifndef P_System_FdListener_h
#define P_System_FdListener_h

#include "pclasses/System/Thread.h"
#include "pclasses/System/CriticalSection.h"
#include "pclasses/System/SystemError.h"
#include <set>

namespace P {

namespace System {

// Base class for objects that want to be notified about activity on a file
// descriptor. Register an instance with FdListenerThread::addListener() and
// override the onXxx() handlers of interest.
class PSYSTEM_EXPORT FdListener {
  public:
    // FdListenerThread invokes the protected handlers below.
    friend class FdListenerThread;

    // Interest bits; combine with bitwise OR. In the POSIX implementation
    // they select membership in select()'s read, write and except fd_set
    // respectively.
    enum Flags {
      Read = 0x01,
      Write = 0x02,
      Error = 0x04
    };

    // fd:    descriptor to watch (stored, not owned or closed by this class).
    // flags: initial interest mask (bitwise OR of Flags).
    FdListener(int fd, int flags);
    virtual ~FdListener();

    // Returns the watched descriptor.
    int fd() const throw();

    // Returns the current interest mask.
    int flags() const throw();

    // Replaces the interest mask. The dispatch loop in
    // FdListenerThread::main() clears the bit of every event it delivers.
    void setFlags(int flags) throw();

  protected:
    // Event handlers; the default implementations do nothing. They are
    // called from FdListenerThread::main() while its listener lock is held.
    virtual void onRead();
    virtual void onWrite();
    virtual void onError();

  private:
    int _fd;     // watched descriptor
    int _flags;  // current interest mask
};

// Thread that watches the descriptors of all registered FdListener objects
// and calls their handlers. Obtained through instance(); constructor and
// destructor are private.
class PSYSTEM_EXPORT FdListenerThread: private Thread {
  public:
    typedef std::set<FdListener*> ListenerSet;

    // Adds a listener (by pointer, not owned) and wakes the thread.
    void addListener(FdListener* l);

    // Removes a listener and wakes the thread.
    void removeListener(FdListener* l);

    // Interrupts the thread's wait by writing a byte to the wakeup pipe.
    void wakeup();

    // Requests termination of the main loop and wakes the thread.
    void stop();

    // Returns the shared thread object, creating and starting it on first
    // use.
    static FdListenerThread& instance();

  private:
    // Throws SystemError when the wakeup pipe cannot be created.
    FdListenerThread() throw(SystemError);
    ~FdListenerThread() throw();

    // Thread entry point; runs until stop() was called, then deletes this.
    int main();

    volatile bool _shouldExit;       // set by stop(), polled by main()
    int _wakeupPipe[2];              // [0] read end, [1] write end
    FdListener* _wakeupListener;     // internal listener on _wakeupPipe[0]
    ListenerSet _listeners;          // all registered listeners
    CriticalSection _listenersCs;    // guards _listeners
    static FdListenerThread* _theThread;  // object returned by instance()
};

} // !namespace System

} // !namespace P

#endif

--- NEW FILE: EventQueue.cpp --- /*************************************************************************** * Copyright (C) 2005 by Christian Prochnow * * cp...@se... * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU Library General Public License as * * published by the Free Software Foundation; either version 2 of the * * License, or (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. 
* * * * You should have received a copy of the GNU Library General Public * * License along with this program; if not, write to the * * Free Software Foundation, Inc., * * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * ***************************************************************************/ #include "pclasses/System/EventQueue.h" #include <set> namespace P { namespace System { Event::Event(void* sender, unsigned int id) : _sender(sender), _id(id) { } Event::~Event() { } void* Event::sender() const throw() { return _sender; } unsigned int Event::id() const throw() { return _id; } EventListener::EventListener(void* sender) : _eventQueue(EventQueue::instance()), _sender(sender) { _eventQueue.addListener(this); } EventListener::EventListener(EventQueue& evq, void* sender) : _eventQueue(evq), _sender(sender) { _eventQueue.addListener(this); } EventListener::~EventListener() { _eventQueue.removeListener(this); } EventQueue& EventListener::eventQueue() const throw() { return _eventQueue; } void* EventListener::sender() const throw() { return _sender; } ThreadKey<EventQueue> EventQueue::_theQueues; EventQueue::EventQueue() : _eventQMutex(), _eventQCond(_eventQMutex) { } EventQueue::~EventQueue() { } void EventQueue::post(const Event& ev) { Mutex::ScopedLock lck(_eventQMutex); _eventQueue.push(ev); _eventQCond.signal(); } void EventQueue::wait(Event& ev) { Mutex::ScopedLock lck(_eventQMutex); while(_eventQueue.empty()) _eventQCond.wait(lck); ev = _eventQueue.front(); _eventQueue.pop(); } bool EventQueue::tryWait(Event& ev, unsigned int timeout) { Mutex::ScopedLock lck(_eventQMutex); if(_eventQueue.empty()) { if(!_eventQCond.tryWait(lck, timeout) || _eventQueue.empty()) return false; } ev = _eventQueue.front(); _eventQueue.pop(); return true; } void EventQueue::dispatch(const Event& ev) { // we should call the EventListener's without held mutex lock, since a // signaled EventListener may modify our list of listeners. 
std::set<EventListener*> _dispatchers; // we need exclusive access to the multimap ... _listenersMutex.lock(); std::pair<ListenerMap::iterator, ListenerMap::iterator> p = _listeners.equal_range(ev.sender()); // copy the EventListeners that should be notified. while(p.first != p.second) { EventListener* l = (*p.first).second; _dispatchers.insert(l); ++p.first; } // unlock the mutex _listenersMutex.unlock(); // now notify all EventListener's without held lock std::set<EventListener*>::iterator i = _dispatchers.begin(); while(i != _dispatchers.end()) { (*i)->signaled(ev); ++i; } } void EventQueue::addListener(EventListener* l) { Mutex::ScopedLock lck(_listenersMutex); _listeners.insert(std::make_pair(l->sender(), l)); } void EventQueue::removeListener(EventListener* l) { Mutex::ScopedLock lck(_listenersMutex); std::pair<ListenerMap::iterator, ListenerMap::iterator> p = _listeners.equal_range(l->sender()); // the EventListener may have been added multiple times. // only remove the first one ... while(p.first != p.second) { if((*p.first).second == l) { _listeners.erase(p.first); break; } ++p.first; } } EventQueue& EventQueue::instance() { // we have one EventQueue per thread ... EventQueue* q = _theQueues; if(!q) { q = new EventQueue(); _theQueues = q; } return *q; } } // !namespace System } // !namespace P |