From: <he...@us...> - 2012-04-09 21:00:13
|
Revision: 304 http://simspark.svn.sourceforge.net/simspark/?rev=304&view=rev Author: hedayat Date: 2012-04-09 21:00:07 +0000 (Mon, 09 Apr 2012) Log Message: ----------- Multi-threaded AgentControl patch by Andreas Seekircher added with some cleanups. It is now enabled by default but might be disabled for the next release if suspected. Modified Paths: -------------- trunk/spark/ChangeLog trunk/spark/lib/oxygen/simulationserver/agentcontrol.cpp trunk/spark/lib/oxygen/simulationserver/agentcontrol.h trunk/spark/lib/oxygen/simulationserver/agentcontrol_c.cpp trunk/spark/spark/spark.rb Modified: trunk/spark/ChangeLog =================================================================== --- trunk/spark/ChangeLog 2012-04-09 20:46:44 UTC (rev 303) +++ trunk/spark/ChangeLog 2012-04-09 21:00:07 UTC (rev 304) @@ -1,3 +1,15 @@ +2012-04-10 Hedayat Vatankhah <hed...@gm...> + + * lib/oxygen/simulationserver/agentcontrol.h: + * lib/oxygen/simulationserver/agentcontrol.cpp: + * lib/oxygen/simulationserver/agentcontrol_c.cpp: + - AgentControl multi-threaded implementation added with some cleanups. + Thanks to Andreas Seekircher for providing the patch. 
+ + * spark/spark.rb: + - sets AgentControls multi-threaded mode according to $threadedAgentControl + variable + 2012-04-07 Hedayat Vatankhah <hed...@gm...> * lib/oxygen/simulationserver/simulationserver.cpp (SimulationServer::Step): Modified: trunk/spark/lib/oxygen/simulationserver/agentcontrol.cpp =================================================================== --- trunk/spark/lib/oxygen/simulationserver/agentcontrol.cpp 2012-04-09 20:46:44 UTC (rev 303) +++ trunk/spark/lib/oxygen/simulationserver/agentcontrol.cpp 2012-04-09 21:00:07 UTC (rev 304) @@ -29,13 +29,16 @@ using namespace boost; using namespace std; -AgentControl::AgentControl() : NetControl(), mSyncMode(false) +AgentControl::AgentControl() : NetControl(), mSyncMode(false), + mMultiThreads(true), mThreadBarrierNew(NULL), nThreads(0) { + mThreadBarrier = new boost::barrier(1); mLocalAddr.setPort(3100); } AgentControl::~AgentControl() { + delete mThreadBarrier; } void AgentControl::OnLink() @@ -62,8 +65,24 @@ } mGameControlServer->AgentConnect(client->id); + + //Create a new thread and new barrier + if(mMultiThreads) + { + /**@todo Make this safe! */ + if(mThreadBarrierNew != NULL) + GetLog()->Error() + << "(AgentControl) ERROR mThreadBarrierNew!=NULL!" 
+ << " Agents connecting/disconnecting in same frame !\n"; + nThreads++; + mThreadBarrierNew = new boost::barrier(nThreads+1); + boost::thread* newThread = + mThreadGroup.create_thread(boost::bind(&AgentControl::AgentThread, + this, client)); + } } + void AgentControl::ClientDisconnect(boost::shared_ptr<Client> client) { mClientSenses[client->id].clear(); @@ -90,52 +109,69 @@ return; } - // pass all received messages on to the GameControlServer - for ( - TBufferMap::iterator iter = mBuffers.begin(); - iter != mBuffers.end(); - ++iter - ) - { - boost::shared_ptr<NetBuffer>& netBuff = (*iter).second; - if ( - (netBuff.get() == 0) || - (netBuff->IsEmpty()) - ) - { - continue; - } + //if(!mMultiThreads) + //{ + // pass all received messages on to the GameControlServer + for ( + TBufferMap::iterator iter = mBuffers.begin(); + iter != mBuffers.end(); + ++iter + ) + { + boost::shared_ptr<NetBuffer>& netBuff = (*iter).second; + if ( + (netBuff.get() == 0) || + (netBuff->IsEmpty()) + ) + { + continue; + } - // lookup the client entry corresponding for the buffer - // entry - TAddrMap::iterator clientIter = mClients.find(netBuff->GetAddr()); - if (clientIter == mClients.end()) - { - continue; - } - boost::shared_ptr<Client>& client = (*clientIter).second; + // lookup the client entry corresponding for the buffer + // entry + TAddrMap::iterator clientIter = mClients.find(netBuff->GetAddr()); + if (clientIter == mClients.end()) + { + continue; + } + boost::shared_ptr<Client>& client = (*clientIter).second; - // lookup the AgentAspect node correspoding to the client - boost::shared_ptr<AgentAspect> agent = - mGameControlServer->GetAgentAspect(client->id); - if (agent.get() == 0) - { - continue; - } - - // parse and immediately realize the action - string message; - while (mNetMessage->Extract(netBuff,message)) - { - agent->RealizeActions - (mGameControlServer->Parse(client->id,message)); - } - } + // start cycle for this client + StartCycle(client, netBuff); + } + /*} + else + 
{ + mThreadAction = STARTCYCLE; + WaitMaster(); //let threads start + WaitMaster(); //wait for threads to finish + }*/ } while (!AgentsAreSynced()); } +void AgentControl::StartCycle(const boost::shared_ptr<Client> &client, + boost::shared_ptr<NetBuffer> &netBuff) +{ + // lookup the AgentAspect node corresponding to the client + boost::shared_ptr<AgentAspect> agent = + mGameControlServer->GetAgentAspect(client->id); + if (agent.get() == 0) + { + return; + } + // parse and immediately realize the action + string message; + while (mNetMessage->Extract(netBuff,message)) + { + agent->RealizeActions + (mGameControlServer->Parse(client->id,message)); + } +} + void AgentControl::SenseAgent() { + //if(!mMultiThreads) + //{ int clientID; for ( TAddrMap::iterator iter = mClients.begin(); @@ -149,6 +185,13 @@ SendClientMessage(iter->second, mClientSenses[clientID]); } } + /*} + else + { + mThreadAction = SENSEAGENT; + WaitMaster(); //let threads start + WaitMaster(); //wait for threads to finish + }*/ } void AgentControl::EndCycle() @@ -164,44 +207,57 @@ return; } + if(!mMultiThreads) + { + // generate senses for all agents + for ( + TAddrMap::iterator iter = mClients.begin(); + iter != mClients.end(); + ++iter + ) + { + const boost::shared_ptr<Client> &client = (*iter).second; + EndCycle(client); + } + } + else + { + mThreadAction = ENDCYCLE; + WaitMaster(); //let threads start + WaitMaster(); //wait for threads to finish + } +} + +void AgentControl::EndCycle(const boost::shared_ptr<Client> &client) +{ boost::shared_ptr<BaseParser> parser = mGameControlServer->GetParser(); if (parser.get() == 0) - { - GetLog()->Error() - << "(AgentControl) ERROR: got no parser from " - << " the GameControlServer" << endl; - return; - } + { + GetLog()->Error() + << "(AgentControl) ERROR: got no parser from " + << " the GameControlServer" << endl; + return; + } - // generate senses for all agents - for ( - TAddrMap::iterator iter = mClients.begin(); - iter != mClients.end(); - ++iter - ) - { - 
const boost::shared_ptr<Client> &client = (*iter).second; + boost::shared_ptr<AgentAspect> agent = + mGameControlServer->GetAgentAspect(client->id); + if (agent.get() == 0) + { + return; + } + if (mSyncMode) + { + agent->SetSynced(false); + } - boost::shared_ptr<AgentAspect> agent = - mGameControlServer->GetAgentAspect(client->id); - if (agent.get() == 0) - { - continue; - } - if (mSyncMode) - { - agent->SetSynced(false); - } + boost::shared_ptr<PredicateList> senseList = agent->QueryPerceptors(); + mClientSenses[client->id] = parser->Generate(senseList); + if (mClientSenses[client->id].empty()) + { + return; + } - boost::shared_ptr<PredicateList> senseList = agent->QueryPerceptors(); - mClientSenses[client->id] = parser->Generate(senseList); - if (mClientSenses[client->id].empty()) - { - continue; - } - - mNetMessage->PrepareToSend(mClientSenses[client->id]); - } + mNetMessage->PrepareToSend(mClientSenses[client->id]); } void AgentControl::SetSyncMode(bool syncMode) @@ -221,6 +277,11 @@ } } +void AgentControl::SetMultiThreaded(bool multiThreaded) +{ + mMultiThreads = multiThreaded; +} + bool AgentControl::AgentsAreSynced() { if (mSyncMode) @@ -247,3 +308,84 @@ } return true; } + + +void AgentControl::AgentThread(const boost::shared_ptr<Client> &client) +{ + boost::barrier *currentBarrier = mThreadBarrierNew; + + while(client->socket->isOpen()) + { + WaitSlave(currentBarrier); + + //StartCycle not parallel: + // parser and agentState::addMessage not thread safe. + // additional synchronization required -> no speed-up ! + if(mThreadAction == STARTCYCLE) + { + + TBufferMap::iterator buf = mBuffers.find(client->addr); + if(buf != mBuffers.end()) + { + boost::shared_ptr<NetBuffer>& netBuff = buf->second; + if (netBuff.get() != 0 && !netBuff->IsEmpty()) + StartCycle(client, netBuff); + } + + } + + // SenseAgent not parallel: not enough computation, no speed-up ! 
+ else if(mThreadAction == SENSEAGENT) + { + + + std::string& senses = mClientSenses[client->id]; + if (!senses.empty()) + SendClientMessage(client, senses); + + } + + // Here we get a speed-up ! + else if(mThreadAction == ENDCYCLE) + { + + EndCycle(client); + + } + + WaitSlave(currentBarrier); + } + + nThreads--; + if(mThreadBarrierNew != NULL) + GetLog()->Error() + << "(AgentControl) ERROR mThreadBarrierNew!=NULL!" + << " Agents connecting/disconnecting in same frame !\n"; + mThreadBarrierNew = new boost::barrier(nThreads+1); + currentBarrier->wait(); +} + +void AgentControl::WaitMaster() +{ + if(mThreadBarrierNew != NULL) + { + boost::barrier *oldBarrier = mThreadBarrier; + mThreadBarrier = mThreadBarrierNew; + oldBarrier->wait(); + mThreadBarrier->wait(); + delete oldBarrier; + mThreadBarrierNew = NULL; + } + else + mThreadBarrier->wait(); +} + +void AgentControl::WaitSlave(boost::barrier* &currentBarrier) +{ + currentBarrier->wait(); + if(currentBarrier != mThreadBarrier) + { + currentBarrier = mThreadBarrier; + currentBarrier->wait(); + } +} Modified: trunk/spark/lib/oxygen/simulationserver/agentcontrol.h =================================================================== --- trunk/spark/lib/oxygen/simulationserver/agentcontrol.h 2012-04-09 20:46:44 UTC (rev 303) +++ trunk/spark/lib/oxygen/simulationserver/agentcontrol.h 2012-04-09 21:00:07 UTC (rev 304) @@ -23,6 +23,7 @@ #include "netcontrol.h" #include <oxygen/oxygen_defines.h> #include <oxygen/gamecontrolserver/gamecontrolserver.h> +#include <boost/thread/barrier.hpp> namespace oxygen { @@ -56,12 +57,34 @@ /** sets the AgentControl's sync mode */ void SetSyncMode(bool syncMode); + /** sets the AgentControl's sync mode */ + void SetMultiThreaded(bool multiThreaded); + protected: virtual void OnLink(); /** returns if the agents are synced with the srever */ bool AgentsAreSynced(); + /** the thread function which does EndCycle for one agent in + * multi-threaded mode. 
*/ + void AgentThread(const boost::shared_ptr<Client> &client); + + /** forwards all pending messages from a specific agent to the + GameControlServer */ + void StartCycle(const boost::shared_ptr<Client> &client, + boost::shared_ptr<NetBuffer> &netBuff); + + /** generates sense updates for a specific agent */ + void EndCycle(const boost::shared_ptr<Client> &client); + + /** waits for all agent threads to catch up */ + void WaitMaster(); + + /** called in an agent thread to wait for the master thread to signal + * a new task */ + void WaitSlave(boost::barrier* &currentBarrier); + protected: /** cached reference to the GameControlServer */ CachedPath<GameControlServer> mGameControlServer; @@ -75,6 +98,20 @@ * proceed to the next cycle */ bool mSyncMode; + + /** indicates if the AgentControl runs in multi-threads */ + bool mMultiThreads; + + /** barrier object for synchronizing threads in multi-threaded mode */ + boost::barrier *mThreadBarrier; + boost::barrier *mThreadBarrierNew; + + /** boost thread group to create a new thread when an agent connects */ + boost::thread_group mThreadGroup; + int nThreads; + + /** indicates what should happen in the agent thread right now */ + enum { STARTCYCLE, SENSEAGENT, ENDCYCLE } mThreadAction; }; DECLARE_CLASS(AgentControl); Modified: trunk/spark/lib/oxygen/simulationserver/agentcontrol_c.cpp =================================================================== --- trunk/spark/lib/oxygen/simulationserver/agentcontrol_c.cpp 2012-04-09 20:46:44 UTC (rev 303) +++ trunk/spark/lib/oxygen/simulationserver/agentcontrol_c.cpp 2012-04-09 21:00:07 UTC (rev 304) @@ -35,8 +35,22 @@ return true; } +FUNCTION(AgentControl, setMultiThreaded) +{ + bool inSet; + + if ((in.GetSize() != 1) || (!in.GetValue(in[0], inSet))) + { + return false; + } + + obj->SetMultiThreaded(inSet); + return true; +} + void CLASS(AgentControl)::DefineClass() { DEFINE_BASECLASS(oxygen/NetControl); DEFINE_FUNCTION(setSyncMode); + DEFINE_FUNCTION(setMultiThreaded); } Modified: 
trunk/spark/spark/spark.rb =================================================================== --- trunk/spark/spark/spark.rb 2012-04-09 20:46:44 UTC (rev 303) +++ trunk/spark/spark/spark.rb 2012-04-09 21:00:07 UTC (rev 304) @@ -44,6 +44,7 @@ $agentType = 'tcp' $agentPort = 3100 $agentSyncMode = false +$threadedAgentControl = true # (MonitorControl) constants # @@ -311,6 +312,7 @@ agentControl.setServerPort($agentPort) agentControl.setStep($agentStep) agentControl.setSyncMode($agentSyncMode) + agentControl.setMultiThreaded($threadedAgentControl) end if ($agentType == 'udp') This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |