[asycxx-devel] SF.net SVN: asycxx:[38] trunk
Status: Alpha
Brought to you by:
joe_steeve
From: <joe...@us...> - 2009-04-08 07:39:51
|
Revision: 38 http://asycxx.svn.sourceforge.net/asycxx/?rev=38&view=rev Author: joe_steeve Date: 2009-04-08 07:39:49 +0000 (Wed, 08 Apr 2009) Log Message: ----------- removed 'Deferred' code from class:Reactor Reactor now directly manages 'Selectables'. Since both Transports and Listeners are Selectables, we can directly use them instead of callbacks. From: Joe Steeve <js...@hi...> Modified Paths: -------------- trunk/include/asycxx/Reactor.h trunk/src/Reactor.cxx Modified: trunk/include/asycxx/Reactor.h =================================================================== --- trunk/include/asycxx/Reactor.h 2009-04-08 07:39:05 UTC (rev 37) +++ trunk/include/asycxx/Reactor.h 2009-04-08 07:39:49 UTC (rev 38) @@ -14,74 +14,152 @@ #ifndef __HIPRO_ASYCXX__REACTOR_H__ #define __HIPRO_ASYCXX__REACTOR_H__ +#include <list> -#include <deque> - #include "_asycxx.h" -#include "Deferred.h" +#include "Selectable.h" + #define TIME_SENSITIVITY 200LL -class Reactor +namespace asycxx { -public: - Reactor (); - virtual ~Reactor (); + /** + * \class Reactor + * + * \brief The main event dispatching mechanism + * + * \details The reactor provides a mechanism to register + * 'Selectable's waiting for readability or writability. The reactor + * sits in a tight loop waiting for any of the 'Selectable's to + * become readable/writable. The exact mechanism of this depends on + * the Reactor implementation. Eg: SelectReactor uses select(), + * while EPollReactor uses epoll(). + */ + class Reactor + { + public: + Reactor(); + virtual ~Reactor(); - Deferred * OnReadable (int Fd) throw (); - Deferred * OnReadable (int Fd, h_msecs_t timeoutTime) throw (); - Deferred * OnWritable (int Fd) throw (); - Deferred * OnWritable (int Fd, h_msecs_t timeoutTime) throw (); - Deferred * OnTimeElapsed (h_msecs_t timeoutTime) throw (); - void UnRegister (Deferred *deferred) throw (); + /** + * \brief Gets the current time-stamp (millisecond accuracy) + * + * \details This method returns the current time-stamp. This + * time-stamp gets updated every time the Reactor wakes up. + */ + asycxx_timestamp_t CurrentTS (void) { return m_LastAwokenTS; } - h_timestamp_t CurrentTS (void) { return m_LastAwokenTS; } - void Run (void); + /** + * \brief Add a 'Selectable' to be notified when Readable + * + * \param[in] reader A 'Selectable' object + * + * \details This method adds the given 'Selectable' to the list of + * 'readers'. + */ + virtual void addReader (Selectable *reader); + + /** + * \brief Add a 'Selectable' to be notified when Writable + * + * \param[in] writer A 'Selectable' object + * + * \details This method adds the given 'Selectable' to the list of + * 'writers'. + */ + virtual void addWriter (Selectable *writer); + + /** + * \brief Remove a 'Selectable' from 'readers list' + * + * \param[in] reader A 'Selectable' object + * + * \details This method removes the given 'Selectable' from the + * list of 'readers'. The given 'Selectable' wont get 'Readable' + * notification anymore. + */ + virtual void removeReader (Selectable *reader); + + /** + * \brief Remove a 'Selectable' from 'writers list' + * + * \param[in] reader A 'Selectable' object + * + * \details This method removes the given 'Selectable' from the + * list of 'writers'. The given 'Selectable' wont get 'Writable' + * notification anymore. + */ + virtual void removeWriter (Selectable *writer); + + /** + * \brief Clear all the 'readers' and 'writers' + * + * \details This method clears the readers and writers list. All + * objects associated with the selectables in the respective lists + * would also be destroyed. + * + * Note: Unless the application has registered a 'Timer', the + * application would hang after calling this method. + */ + //virtual void resetAll (void); + + /** + * \brief Runs the reactor + * + * \details This method does not return. It blocks waiting for + * events on the registered 'Selectables' and notifies them. + */ + void Run (void); -protected: - class Event - { - public: - Event (); - ~Event (); + protected: + class Event + { + public: + Event (Selectable *selectable) + { + pSelectable = selectable; + Fd = pSelectable->Fd(); + bDie = false; + } + ~Event () {} - bool m_bDie; + int Fd; + bool bDie; + Selectable *pSelectable; + }; - int m_Fd; - h_msecs_t m_TimeoutTime; - h_timestamp_t m_LastTriggerTS; - h_timestamp_t m_NextTriggerTS; - Deferred m_Deferred; - }; - std::deque<Event *> m_readEvents; - std::deque<Event *> m_writeEvents; - std::deque<Event *> m_timeEvents; + std::list<Event *> m_readEvents; + std::list<Event *> m_writeEvents; - int m_PendingFDEvents; - virtual void WaitForEvents (void) = 0; - virtual void ProcessFDEvents (void) = 0; + /** + * \brief Wait until some event occurs + * + * \details This method should be implemented by the specific + * reactor implementations. This method should block till some event + * occurs and is available for processing. This method should return + * the number of pending file-descriptor events. + */ + virtual int WaitForEvents (void) = 0; - /** - * \brief The TimeStamp when latest activity happened - * - * \detail This holds the last time-stamp when the reactor was doing - * something. This should be updated by Reactor implementations - * (Select, Epoll, etc.) everytime they wake up to do something - * (attend to file-descriptor activity or handle timeout actions - */ - h_timestamp_t m_LastAwokenTS; - void DoFDEventNotification (Reactor::Event *ev); - void DoFDTimeoutNotification (Reactor::Event *ev); + /** + * \brief Process the timeouts of FD events + * + * \details This method looks up the list of events for timeouts and + * fires them. + */ + virtual void ProcessEvents (void) = 0; -private: - void ProcessTimeEvents (void); - void ProcessFDTimeouts (void); - void CleanupEventLists (void); - Deferred * AddToReadEvents (int Fd, h_msecs_t timeoutTime); - Deferred * AddToWriteEvents (int Fd, h_msecs_t timeoutTime); + private: + asycxx_timestamp_t m_LastAwokenTS; -}; + void CleanupEventLists (void); + void _CleanupEventList (std::list<Event *>& ev_list); + void addFdEvent (Selectable *s, const char *type, + std::list<Reactor::Event *>& ev_list); + }; +} #endif /* __HIPRO_ASYCXX__REACTOR_H__ */ /* Modified: trunk/src/Reactor.cxx =================================================================== --- trunk/src/Reactor.cxx 2009-04-08 07:39:05 UTC (rev 37) +++ trunk/src/Reactor.cxx 2009-04-08 07:39:49 UTC (rev 38) @@ -11,545 +11,191 @@ * *******************************************************************/ -/** - * \author Joe Steeve, jo...@hi... - * \class Reactor - * \brief Core 'reactor' functionality - * - * \details The reactor is the core. - */ #ifdef HAVE_CONFIG_H #include <asycxx-config.h> #endif #include <unistd.h> -#include <exception> #include "asycxx-common.h" #include <asycxx/Reactor.h> -#include <asycxx/Deferred.h> +#include <asycxx/Selectable.h> +#include <asycxx/Error.h> -////////////////////////////////////////////////////////////////////////////// +using namespace asycxx; -/** - * \author Joe Steeve, jo...@hi... - * \class Reactor::Event - * \brief Defines an event - * - * \details This class defines an 'Event' - */ - -/** - * \brief ctor. - */ -Reactor::Event::Event () -{ - m_Fd = -1; - m_TimeoutTime = 0L; - m_LastTriggerTS = 0L; - m_NextTriggerTS = 0L; - m_bDie = false; -} - -/** - * \brief dtor. - */ -Reactor::Event::~Event () -{ -} - - ////////////////////////////////////////////////////////////////////////////// -/** - * \brief Constructor - * +/* + * CTOR: */ Reactor::Reactor () { - m_PendingFDEvents = 0; - m_LastAwokenTS = GetCurrentTimeStamp (); + m_LastAwokenTS = GetCurrentTimeStamp(); } -/** - * \brief Destructor - * +/* + * DTOR: we clear up all the event lists here */ Reactor::~Reactor () { - size_t i; - Event *ev; + std::list<Event *>::iterator it; /* Destroy all event-infos */ - for (i=0; i<m_readEvents.size(); i++) + for (it = m_readEvents.begin(); it != m_readEvents.end(); it++) { - ev = m_readEvents[0]; - m_readEvents.pop_front(); - delete ev; + delete *it; } - for (i=0; i<m_writeEvents.size(); i++) + for (it = m_writeEvents.begin(); it != m_writeEvents.end(); it++) { - ev = m_writeEvents[0]; - m_writeEvents.pop_front(); - delete ev; + delete *it; } - for (i=0; i<m_timeEvents.size(); i++) - { - ev = m_timeEvents[0]; - m_timeEvents.pop_front(); - delete ev; - } } -/** - * \brief Get a deferred that fires when given 'Fd' becomes readable - * - * \details The deferred returned by this method does not keep track - * of time. It fires only when Fd becomes readable. +/* + * Methods to register the given 'Selectable's */ -Deferred * -Reactor::OnReadable (int Fd) throw () +void +Reactor::addReader (Selectable *reader) { - if (Fd == -1) - { THROW (DevError, "bad fd = -1"); } - - return AddToReadEvents (Fd, 0L); + ASSERT ((reader != NULL), "bad reader"); + ASSERT ((reader->Fd() != -1), "bad fd = -1"); + addFdEvent (reader, "reader", m_readEvents); } -/** - * \brief Get a deferred that fires when given 'Fd' becomes readable - * - * \details The deferred returned by this method fires when given 'Fd' - * becomes readable or when given 'timeoutTime' elapses. - */ -Deferred * -Reactor::OnReadable (int Fd, h_msecs_t timeoutTime) throw () -{ - if (Fd == -1) - { THROW (DevError, "bad fd = -1"); } - if (timeoutTime == 0L) - { THROW (DevError, "timeoutTime = 0 will wait for ever"); } - return AddToReadEvents(Fd, timeoutTime); -} -/** - * \brief Get a deferred that fires when given 'Fd' becomes readable - * - * \details The deferred returned by this method does not keep track - * of time. It fires only when Fd becomes writable. - */ -Deferred * -Reactor::OnWritable (int Fd) throw () +void +Reactor::addWriter (Selectable *writer) { - if (Fd == -1) - { THROW (DevError, "bad fd = -1"); } - - return AddToWriteEvents (Fd, 0L); + ASSERT ((writer != NULL), "bad reader"); + ASSERT ((writer->Fd() != -1), "bad fd = -1"); + addFdEvent (writer, "writer", m_writeEvents); } -/** - * \brief Get a deferred that fires when given 'Fd' becomes readable - * - * \details The deferred returned by this method fires when given 'Fd' - * becomes writable or when given 'timeoutTime' elapses. - */ -Deferred * -Reactor::OnWritable (int Fd, h_msecs_t timeoutTime) throw () -{ - if (Fd == -1) - { THROW (DevError, "bad fd = -1"); } - if (timeoutTime == 0L) - { THROW (DevError, "timeoutTime = 0 will wait for ever"); } - return AddToWriteEvents (Fd, timeoutTime); -} - -/** - * \brief Add the given Fd to the on-readable-event monitoring list - * - * \param[in] Fd A valid file descriptor - * \param[in] timeoutTime Timeout in milliseconds - * - * \details This method adds the given file-descriptor to the list of - * file-descriptors being monitored for 'readability'. +/* + * Method to add a given fd to the given list. Used as a common + * convenience function for the addReader, addWriter methods. */ -Deferred * -Reactor::AddToReadEvents (int Fd, h_msecs_t timeoutTime) +void +Reactor::addFdEvent (Selectable *s, const char *type, + std::list<Reactor::Event *>& ev_list) { - size_t i; + Reactor::Event *ev; + std::list<Event *>::iterator it; /* lets make sure we dont have this file-descriptor in our lists already */ - for (i=0; i < m_readEvents.size(); i++) + for (it = ev_list.begin(); it != ev_list.end(); it++) { - if ((m_readEvents[i]->m_Fd == Fd) && (m_readEvents[i]->m_bDie == false)) + ev = *it; + if ((ev->Fd == s->Fd()) && (ev->bDie == false)) { - THROW (DevError, "fd=%u alreaded registered for On-Readable event", - Fd); + THROW (DevError, "fd=%u '%s' exists", ev->Fd, type); } } - Reactor::Event *ev; - try - { - ev = new Reactor::Event(); - } - catch (std::bad_alloc &e) - { - THROW (OutOfMemoryError, "allocating a read-'Event' object"); - } + /* create a new event and configure it */ + ev = new Reactor::Event (s); - ev->m_Fd = Fd; - ev->m_TimeoutTime = timeoutTime; - ev->m_LastTriggerTS = m_LastAwokenTS; - ev->m_NextTriggerTS = m_LastAwokenTS + timeoutTime; - - try - { - m_readEvents.push_back (ev); - LOG3 ("added Reactor::Event<%p> OnReadable: timeout=%lld msecs, fd=%d", - ev, timeoutTime, Fd); - } - catch (...) - { - delete ev; - THROW (OutOfMemoryError, "adding 'Event' to m_readEvents"); - } - - return &ev->m_Deferred; + /* add the event to our lists */ + ev_list.push_back (ev); + LOG3 ("added Reactor::Event<%p> '%s': fd=%d", ev, type, ev->Fd); } -/** - * \brief Add the given Fd to the on-writable-event monitoring list - * - * \param[in] Fd A valid file descriptor - * \param[in] timeoutTime Timeout in milliseconds - * - * \details This method adds the given file-descriptor to the list of - * file-descriptors being monitored for 'writability'. +/* + * Methods to remove events from readers, writers, and time-waiters + * lists. */ -Deferred * -Reactor::AddToWriteEvents (int Fd, h_msecs_t timeoutTime) +void +Reactor::removeReader (Selectable *reader) { - size_t i; - - /* lets make sure we dont have a similar event already registered */ - for (i=0; i < m_writeEvents.size(); i++) - { - if ((m_writeEvents[i]->m_Fd == Fd) && (m_writeEvents[i]->m_bDie == false)) - { - THROW (DevError, "fd=%u alreaded registered for On-Writable event", - Fd); - } - } - Reactor::Event *ev; - try + std::list<Event *>::iterator it; + for (it = m_readEvents.begin(); it != m_readEvents.end(); it++) { - ev = new Reactor::Event(); + ev = *it; + if (ev->Fd == reader->Fd()) { ev->bDie = true; return; } } - catch (std::bad_alloc &e) - { - THROW (OutOfMemoryError, "allocating a write-'Event' object"); - } - - ev->m_Fd = Fd; - ev->m_TimeoutTime = timeoutTime; - ev->m_LastTriggerTS = m_LastAwokenTS; - ev->m_NextTriggerTS = m_LastAwokenTS + timeoutTime; - - try - { - m_writeEvents.push_back (ev); -// LOG3 ("added Reactor::Event<%p> OnWritable: timeout=%lld msecs, fd=%d", -// ev, timeoutTime, Fd); - } - catch (...) - { - delete ev; - THROW (OutOfMemoryError, "adding 'Event' to m_writeEvents"); - } - - return &ev->m_Deferred; } - -/** - * \brief Creates a time-elapsed event - * - * \param[in] timeoutTime time-out time in milliseconds - * - * \details This method registers a time-elapsed event for the given - * 'timeoutTime'. - */ -Deferred * -Reactor::OnTimeElapsed (h_msecs_t timeoutTime) throw () -{ - Reactor::Event *ev; - - try - { - ev = new Reactor::Event(); - } - catch (std::bad_alloc &e) - { - THROW (OutOfMemoryError, "allocating an 'Event' object"); - } - - ev->m_TimeoutTime = timeoutTime; - ev->m_LastTriggerTS = m_LastAwokenTS; - ev->m_NextTriggerTS = m_LastAwokenTS + timeoutTime; - - try - { - m_timeEvents.push_back (ev); - LOG3 ("added Reactor::Event<%p> for OnTimeElapsed (%lld msecs)", - ev, timeoutTime); - } - catch (...) - { - delete ev; - THROW (OutOfMemoryError, "adding 'Event' to m_timeEvents"); - } - - return &ev->m_Deferred; -} - - -/** - * \brief Unregisters the event associated with the given deferred - * - * \param[in] deferred The Deferred that was provided at the time of - * registering for an event. - * - * \details This method looks up the event-lists for the event that is - * associated with the given deferred object, and marks it for - * removal. - */ -void -Reactor::UnRegister (Deferred *deferred) throw () -{ - size_t i; - for (i=0; i< m_readEvents.size(); i++) - { - if (&m_readEvents[i]->m_Deferred == deferred) - { - m_readEvents[i]->m_bDie = true; - return; - } - } - for (i=0; i< m_writeEvents.size(); i++) - { - if (&m_writeEvents[i]->m_Deferred == deferred) - { - m_writeEvents[i]->m_bDie = true; - return; - } - } - for (i=0; i< m_timeEvents.size(); i++) - { - if (&m_timeEvents[i]->m_Deferred == deferred) - { - m_timeEvents[i]->m_bDie = true; - return; - } - } -} - - -/** - * \brief Cleans up the inactive events - * - * \details This method looks through the event-lists for events that - * can be removed, and removes them permanently. - */ void -Reactor::CleanupEventLists (void) +Reactor::removeWriter (Selectable *writer) { - size_t i; Reactor::Event *ev; - - for (i=0; i< m_readEvents.size(); i++) + std::list<Event *>::iterator it; + for (it = m_writeEvents.begin(); it != m_writeEvents.end(); it++) { - if (m_readEvents[i]->m_bDie == true) - { - ev = m_readEvents[i]; - m_readEvents.erase(m_readEvents.begin()+i); - delete ev; - i--; - } + ev = *it; + if (ev->Fd == writer->Fd()) { ev->bDie = true; return; } } - for (i=0; i< m_writeEvents.size(); i++) - { - if (m_writeEvents[i]->m_bDie == true) - { - ev = m_writeEvents[i]; - m_writeEvents.erase(m_writeEvents.begin()+i); - delete ev; - i--; - } - } - for (i=0; i< m_timeEvents.size(); i++) - { - if (m_timeEvents[i]->m_bDie == true) - { - ev = m_timeEvents[i]; - m_timeEvents.erase(m_timeEvents.begin()+i); - delete ev; - i--; - } - } } -/** - * \brief Process time-events - * - * \details This method looks up the list of time-events for events - * that are ready to be triggered, and triggers them. +/* + * The main run-loop. Simply sits in a tight loop waiting for events. */ void -Reactor::ProcessTimeEvents (void) +Reactor::Run (void) { - size_t i; - Reactor::Event * ev; - h_msecs_t excessTime; - - for (i=0; i < m_timeEvents.size(); i++) + while (1) { - ev = m_timeEvents[i]; - if (m_LastAwokenTS >= ev->m_NextTriggerTS) - { - excessTime = m_LastAwokenTS - ev->m_NextTriggerTS; - ev->m_Deferred.NotifyTimeout (excessTime); - ev->m_LastTriggerTS = m_LastAwokenTS; - ev->m_NextTriggerTS = m_LastAwokenTS + ev->m_TimeoutTime; - } - } -} + /* wait for events */ + WaitForEvents (); + /* update the current timestamp */ + m_LastAwokenTS = GetCurrentTimeStamp(); -/** - * \brief Process the timeouts of FD events - * - * \details This method looks up the list of events for timeouts and - * fires them. - */ -void -Reactor::ProcessFDTimeouts (void) -{ - size_t i; - Reactor::Event * e; + /* process file-descriptor events if we have any */ + ProcessEvents (); - /* check for readable descriptors */ - for (i=0; i<m_readEvents.size(); i++) - { - e = m_readEvents[i]; - DoFDTimeoutNotification (e); - } - /* check for writable descriptors */ - for (i=0; i<m_writeEvents.size(); i++) - { - e = m_writeEvents[i]; - DoFDTimeoutNotification (e); - } -} + /* update the timer so that it can fire timeouts */ + /* TODO: update the timer here */ - -/** - * \brief The main run-loop - * - * \details This is the main run-loop of the reactor. This is a - * continuous while loop which waits for events and dispatches them. - */ -void -Reactor::Run (void) -{ - while (1) - { - WaitForEvents (); - m_LastAwokenTS = GetCurrentTimeStamp (); - if (m_PendingFDEvents > 0) - { - ProcessFDEvents (); - } - else - { - ProcessFDTimeouts (); - } - if (m_PendingFDEvents > 0) - { - LOG3 ("not all pending fd-events were processed. %d remaining", - m_PendingFDEvents); - } - ProcessTimeEvents (); + /* cleanup the event lists of dead 'Selectable's */ CleanupEventLists (); }; - ERR ("FATAL: broken out of Reactor::Run(). should not happen"); + ERR ("FATAL: broken out of Reactor::Run(). should not have happened"); } -/** - * \brief Helper to notify read/write events on a file-descriptor - * - * \param[in] e Pointer to the Reactor::Event whose deferred should be - * fired. - * - * \details This method fires the deferred for the given event. If the - * event-notification failes with a BadFDError exception, then this - * calls the OnError call-back, and unregisters the event from the - * reactor. +/* + * This method looks through the event-lists for events that can be + * removed, and removes them permanently. */ void -Reactor::DoFDEventNotification (Reactor::Event *ev) +Reactor::CleanupEventLists (void) { - try - { - ev->m_Deferred.NotifyEvent (); - } - catch (BadFDError& e) - { - /* We got a bad-file descriptor here */ - ev->m_Deferred.NotifyError (); - UnRegister (&ev->m_Deferred); - } - ev->m_LastTriggerTS = m_LastAwokenTS; - ev->m_NextTriggerTS = m_LastAwokenTS + ev->m_TimeoutTime; -// LOG3 ("EvNo:: updating timeout, fd=%u, ts=%llu, to_ts=%llu", -// ev->m_Fd, ev->m_LastTriggerTS, ev->m_NextTriggerTS); - m_PendingFDEvents --; + _CleanupEventList (m_readEvents); + _CleanupEventList (m_writeEvents); } - -/** - * \brief Helper to notify timeout events on a file-descriptor - * - * \param[in] e Pointer to the Reactor::Event whose deferred should be - * fired in case we hit a timeout. - * - * \details This method fires the 'timeout' method in the deferred for - * the given event. First checks to see if the timeout has elapsed. +/* + * Convenience function to clean up entries in the given event list */ void -Reactor::DoFDTimeoutNotification (Reactor::Event *ev) +Reactor::_CleanupEventList (std::list<Event *>& ev_list) { - h_msecs_t excessTime; + Reactor::Event *ev; + std::list<Event *>::iterator it; - if (ev->m_TimeoutTime != 0) - { - if (m_LastAwokenTS >= ev->m_NextTriggerTS) + it = ev_list.begin(); + while (it != ev_list.end()) + { + ev = *it; + if (ev->bDie == true) { - excessTime = m_LastAwokenTS - ev->m_NextTriggerTS; - ev->m_Deferred.NotifyTimeout (excessTime); - ev->m_LastTriggerTS = m_LastAwokenTS; - ev->m_NextTriggerTS = m_LastAwokenTS + ev->m_TimeoutTime; -// LOG3 ("ToNo:: updating timeout, fd=%u, ts=%llu, to_ts=%llu", -// ev->m_Fd, ev->m_LastTriggerTS, ev->m_NextTriggerTS); + it = ev_list.erase (it); + delete ev; } + else + { + it++; + } } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |