From: <axl...@us...> - 2009-10-17 02:53:55
|
Revision: 574 http://hgengine.svn.sourceforge.net/hgengine/?rev=574&view=rev Author: axlecrusher Date: 2009-10-17 02:53:41 +0000 (Sat, 17 Oct 2009) Log Message: ----------- Make message manager thread safe. Used semaphores to avoid OS scheduling. Modified Paths: -------------- Mercury2/src/MercuryMessageManager.cpp Mercury2/src/MercuryMessageManager.h Modified: Mercury2/src/MercuryMessageManager.cpp =================================================================== --- Mercury2/src/MercuryMessageManager.cpp 2009-10-17 01:34:21 UTC (rev 573) +++ Mercury2/src/MercuryMessageManager.cpp 2009-10-17 02:53:41 UTC (rev 574) @@ -3,6 +3,9 @@ MercuryCTA HolderAllocator( sizeof(MessageHolder), 8 ); +/* MessageManager needs to be thread safe. Semaphores are used as locks (spin locks) to avoid OS rescheduling. +Locks need to be held for as little time as possible. Scoping is used to put locking classes on the stack +so that are ALWAYS released when out of scope.*/ MessageHolder::MessageHolder() :data(NULL),when(0) @@ -19,31 +22,40 @@ MessageHolder * m = new(HolderAllocator.Malloc()) MessageHolder(); m->message = message; m->data = data; - m->when = m_currTime + uint64_t(delay*1000000); - m_messageQueue.Push( m ); + + { + //scope the lock to a very small portion of code + MSemaphoreLock lock(&m_queueLock); + m->when = m_currTime + uint64_t(delay*1000000); + m_messageQueue.Push( m ); + } } void MercuryMessageManager::PumpMessages(const uint64_t& currTime) { - m_currTime = currTime; - while ( !m_messageQueue.Empty() ) { - if ( ((MessageHolder *)m_messageQueue.Peek())->when > m_currTime ) return; - - MessageHolder * message = (MessageHolder *)m_messageQueue.Pop(); - FireOffMessage( *message ); - SAFE_DELETE( message->data ); - HolderAllocator.Free(message); + MSemaphoreLock lock(&m_queueLock); + m_currTime = currTime; } + + for (MessageHolder* mh = GetNextMessageFromQueue(); mh; mh = GetNextMessageFromQueue()) + { + FireOffMessage( *mh ); + SAFE_DELETE( mh->data ); + HolderAllocator.Free(mh); + } } void MercuryMessageManager::RegisterForMessage(const MString& message, MessageHandler* ptr) { + MSemaphoreLock lock(&m_recipientLock); m_messageRecipients[message].push_back(ptr); } void MercuryMessageManager::UnRegisterForMessage(const MString& message, MessageHandler* ptr) { + MSemaphoreLock lock(&m_recipientLock); + std::list< MessageHandler* >& subscriptions = m_messageRecipients[message]; std::list< MessageHandler* >::iterator i = subscriptions.begin(); @@ -61,15 +73,38 @@ void MercuryMessageManager::FireOffMessage( const MessageHolder & message ) { -// std::map< MString, std::list< MessageHandler* > >::iterator i = m_messageRecipients.find(message.message); - std::list< MessageHandler* > * ref = m_messageRecipients.get( message.message ); - if ( ref ) + std::list< MessageHandler* > recipients; { - std::list< MessageHandler* >::iterator recipients = ref->begin(); + //copy list first (quick lock) + MSemaphoreLock lock(&m_recipientLock); + std::list< MessageHandler* > * r = m_messageRecipients.get( message.message ); + if ( r ) recipients = *r; + } + + if ( !recipients.empty() ) + { + std::list< MessageHandler* >::iterator recipient = recipients.begin(); + for (; recipient != recipients.end(); ++recipient) + { + (*recipient)->HandleMessage(message.message, *(message.data) ); + } + } +} + +MessageHolder* MercuryMessageManager::GetNextMessageFromQueue() +{ + /* We need to ensure that viewing the queue and retrieving the message + happens without the queue changing. */ + MSemaphoreLock lock(&m_queueLock); - for (; recipients != ref->end(); ++recipients) - (*recipients)->HandleMessage(message.message, *(message.data) ); + MessageHolder* mh = NULL; + if ( !m_messageQueue.Empty() ) + { + if ( ((MessageHolder *)m_messageQueue.Peek())->when > m_currTime ) return NULL; + mh = (MessageHolder *)m_messageQueue.Pop(); } + + return mh; } MercuryMessageManager& MercuryMessageManager::GetInstance() Modified: Mercury2/src/MercuryMessageManager.h =================================================================== --- Mercury2/src/MercuryMessageManager.h 2009-10-17 01:34:21 UTC (rev 573) +++ Mercury2/src/MercuryMessageManager.h 2009-10-17 02:53:41 UTC (rev 574) @@ -12,6 +12,8 @@ #include <Mint.h> #include <MAutoPtr.h> +#include <MSemaphore.h> + class MessageHolder : public RefBase { public: @@ -40,11 +42,16 @@ static MercuryMessageManager& GetInstance(); private: void FireOffMessage( const MessageHolder & message ); + MessageHolder* GetNextMessageFromQueue(); PriorityQueue m_messageQueue; uint64_t m_currTime; //microseconds MHash< std::list< MessageHandler* > > m_messageRecipients; + +// MercuryMutex m_lock; + MSemaphore m_queueLock; + MSemaphore m_recipientLock; }; static InstanceCounter<MercuryMessageManager> MMcounter("MessageManager"); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |