|
From: <axl...@us...> - 2009-10-17 02:53:55
|
Revision: 574
http://hgengine.svn.sourceforge.net/hgengine/?rev=574&view=rev
Author: axlecrusher
Date: 2009-10-17 02:53:41 +0000 (Sat, 17 Oct 2009)
Log Message:
-----------
Make message manager thread safe. Used semaphores as spin locks to avoid OS rescheduling.
Modified Paths:
--------------
Mercury2/src/MercuryMessageManager.cpp
Mercury2/src/MercuryMessageManager.h
Modified: Mercury2/src/MercuryMessageManager.cpp
===================================================================
--- Mercury2/src/MercuryMessageManager.cpp 2009-10-17 01:34:21 UTC (rev 573)
+++ Mercury2/src/MercuryMessageManager.cpp 2009-10-17 02:53:41 UTC (rev 574)
@@ -3,6 +3,9 @@
MercuryCTA HolderAllocator( sizeof(MessageHolder), 8 );
+/* MessageManager needs to be thread safe. Semaphores are used as locks (spin locks) to avoid OS rescheduling.
+Locks need to be held for as little time as possible. Scoping is used to put locking classes on the stack
+so that they are ALWAYS released when out of scope.*/
MessageHolder::MessageHolder()
:data(NULL),when(0)
@@ -19,31 +22,40 @@
MessageHolder * m = new(HolderAllocator.Malloc()) MessageHolder();
m->message = message;
m->data = data;
- m->when = m_currTime + uint64_t(delay*1000000);
- m_messageQueue.Push( m );
+
+ {
+ //scope the lock to a very small portion of code
+ MSemaphoreLock lock(&m_queueLock);
+ m->when = m_currTime + uint64_t(delay*1000000);
+ m_messageQueue.Push( m );
+ }
}
void MercuryMessageManager::PumpMessages(const uint64_t& currTime)
{
- m_currTime = currTime;
- while ( !m_messageQueue.Empty() )
{
- if ( ((MessageHolder *)m_messageQueue.Peek())->when > m_currTime ) return;
-
- MessageHolder * message = (MessageHolder *)m_messageQueue.Pop();
- FireOffMessage( *message );
- SAFE_DELETE( message->data );
- HolderAllocator.Free(message);
+ MSemaphoreLock lock(&m_queueLock);
+ m_currTime = currTime;
}
+
+ for (MessageHolder* mh = GetNextMessageFromQueue(); mh; mh = GetNextMessageFromQueue())
+ {
+ FireOffMessage( *mh );
+ SAFE_DELETE( mh->data );
+ HolderAllocator.Free(mh);
+ }
}
void MercuryMessageManager::RegisterForMessage(const MString& message, MessageHandler* ptr)
{
+ MSemaphoreLock lock(&m_recipientLock);
m_messageRecipients[message].push_back(ptr);
}
void MercuryMessageManager::UnRegisterForMessage(const MString& message, MessageHandler* ptr)
{
+ MSemaphoreLock lock(&m_recipientLock);
+
std::list< MessageHandler* >& subscriptions = m_messageRecipients[message];
std::list< MessageHandler* >::iterator i = subscriptions.begin();
@@ -61,15 +73,38 @@
void MercuryMessageManager::FireOffMessage( const MessageHolder & message )
{
-// std::map< MString, std::list< MessageHandler* > >::iterator i = m_messageRecipients.find(message.message);
- std::list< MessageHandler* > * ref = m_messageRecipients.get( message.message );
- if ( ref )
+ std::list< MessageHandler* > recipients;
{
- std::list< MessageHandler* >::iterator recipients = ref->begin();
+ //copy list first (quick lock)
+ MSemaphoreLock lock(&m_recipientLock);
+ std::list< MessageHandler* > * r = m_messageRecipients.get( message.message );
+ if ( r ) recipients = *r;
+ }
+
+ if ( !recipients.empty() )
+ {
+ std::list< MessageHandler* >::iterator recipient = recipients.begin();
+ for (; recipient != recipients.end(); ++recipient)
+ {
+ (*recipient)->HandleMessage(message.message, *(message.data) );
+ }
+ }
+}
+
+MessageHolder* MercuryMessageManager::GetNextMessageFromQueue()
+{
+ /* We need to ensure that viewing the queue and retrieving the message
+ happens without the queue changing. */
+ MSemaphoreLock lock(&m_queueLock);
- for (; recipients != ref->end(); ++recipients)
- (*recipients)->HandleMessage(message.message, *(message.data) );
+ MessageHolder* mh = NULL;
+ if ( !m_messageQueue.Empty() )
+ {
+ if ( ((MessageHolder *)m_messageQueue.Peek())->when > m_currTime ) return NULL;
+ mh = (MessageHolder *)m_messageQueue.Pop();
}
+
+ return mh;
}
MercuryMessageManager& MercuryMessageManager::GetInstance()
Modified: Mercury2/src/MercuryMessageManager.h
===================================================================
--- Mercury2/src/MercuryMessageManager.h 2009-10-17 01:34:21 UTC (rev 573)
+++ Mercury2/src/MercuryMessageManager.h 2009-10-17 02:53:41 UTC (rev 574)
@@ -12,6 +12,8 @@
#include <Mint.h>
#include <MAutoPtr.h>
+#include <MSemaphore.h>
+
class MessageHolder : public RefBase
{
public:
@@ -40,11 +42,16 @@
static MercuryMessageManager& GetInstance();
private:
void FireOffMessage( const MessageHolder & message );
+ MessageHolder* GetNextMessageFromQueue();
PriorityQueue m_messageQueue;
uint64_t m_currTime; //microseconds
MHash< std::list< MessageHandler* > > m_messageRecipients;
+
+// MercuryMutex m_lock;
+ MSemaphore m_queueLock;
+ MSemaphore m_recipientLock;
};
static InstanceCounter<MercuryMessageManager> MMcounter("MessageManager");
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|