From: <kha...@us...> - 2008-04-19 18:09:58
|
Revision: 1361
          http://planeshift.svn.sourceforge.net/planeshift/?rev=1361&view=rev
Author: khakilord
Date: 2008-04-19 11:10:04 -0700 (Sat, 19 Apr 2008)

Log Message:
-----------
Rethought and reverted approach to full message queue as old implementation had bugs in acking. The network thread now blocks on a full queue, the most elegant solution. Not dealing with acks reduces potential bugs and reduces cpu spinning.

Modified Paths:
--------------
    trunk/src/common/util/genrefqueue.h

Modified: trunk/src/common/util/genrefqueue.h
===================================================================
--- trunk/src/common/util/genrefqueue.h 2008-04-19 14:40:49 UTC (rev 1360)
+++ trunk/src/common/util/genrefqueue.h 2008-04-19 18:10:04 UTC (rev 1361)
@@ -48,8 +48,33 @@
     {
         delete[] qbuffer;
     }
-
-    /** This adds a message to the queue */
+
+    /** like above, but waits to add the next message, if the queue is full
+     * be careful with this. It's easy to deadlock! */
+    bool AddWait(queuetype* msg, csTicks timeout = 0)
+    {
+        // is there's a space in the queue left just add it
+        CS::Threading::RecursiveMutexScopedLock lock(mutex);
+        while(true)
+        {
+            bool added = Add(msg);
+            if (added)
+            {
+                return true;
+            }
+            Error1("Queue full! Waiting.\n");
+
+            // Wait release mutex before waiting so that it is possible to
+            // add new messages.
+            if (!datacondition.Wait(mutex, timeout))
+            {
+                // Timed out waiting for new message
+                return false;
+            }
+        }
+    }
+
+    /** This adds a message to the queue and waits if it is full */
     bool Add(queuetype* msg)
     {
         unsigned int tqend;
@@ -60,9 +85,9 @@
         {
             tqend = (qend + 1) % qsize;
             // check if queue is full
-            if (tqend == qstart)
+            while (tqend == qstart)
             {
-                return false;
+                datacondition.Wait(mutex);
             }
             // check are we having a refcount race (in which msg would already be destroyed)
             CS_ASSERT(msg->GetRefCount() > 0);
@@ -103,6 +128,7 @@
         }
 
         ptr->SetPending(false);
+        Interrupt();
 
         return csPtr<queuetype>(ptr);
     }
@@ -112,38 +138,24 @@
     {
         // is there's a message in the queue left just return it
         CS::Threading::RecursiveMutexScopedLock lock(mutex);
-        csRef<queuetype> temp = Get();
-        if (temp)
+        while(true)
         {
-            return csPtr<queuetype> (temp);
-        }
+            csRef<queuetype> temp = Get();
+            if (temp)
+            {
+                return csPtr<queuetype> (temp);
+            }
 
-        // Wait release mutex before waiting so that it is possible to
-        // add new messages.
-        if (!datacondition.Wait(mutex, timeout))
-        {
-            // Timed out waiting for new message
-            return 0;
-        }
-
-        csRef<queuetype> ptr;
-        // if this is a weakref queue we should skip over null entries
-        while(!ptr.IsValid())
-        {
-            // check if queue is empty
-            if (qstart == qend)
+            // Wait release mutex before waiting so that it is possible to
+            // add new messages.
+            if (!datacondition.Wait(mutex, timeout))
+            {
+                // Timed out waiting for new message
                 return 0;
-
-            // removes Message from queue
-            ptr = qbuffer[qstart];
-            qbuffer[qstart] = 0;
-
-            qstart = (qstart + 1) % qsize;
+            }
         }
-
-        ptr->SetPending(false);
-
-        return csPtr<queuetype>(ptr);
+        CS_ASSERT(false);
+        return 0;
     }
 
     /**

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|