|
From: <he...@us...> - 2012-04-09 21:00:13
|
Revision: 304
http://simspark.svn.sourceforge.net/simspark/?rev=304&view=rev
Author: hedayat
Date: 2012-04-09 21:00:07 +0000 (Mon, 09 Apr 2012)
Log Message:
-----------
Multi-threaded AgentControl patch by Andreas Seekircher added with some cleanups. It is now enabled by default but might be disabled for the next release if it is suspected to cause problems.
Modified Paths:
--------------
trunk/spark/ChangeLog
trunk/spark/lib/oxygen/simulationserver/agentcontrol.cpp
trunk/spark/lib/oxygen/simulationserver/agentcontrol.h
trunk/spark/lib/oxygen/simulationserver/agentcontrol_c.cpp
trunk/spark/spark/spark.rb
Modified: trunk/spark/ChangeLog
===================================================================
--- trunk/spark/ChangeLog 2012-04-09 20:46:44 UTC (rev 303)
+++ trunk/spark/ChangeLog 2012-04-09 21:00:07 UTC (rev 304)
@@ -1,3 +1,15 @@
+2012-04-10 Hedayat Vatankhah <hed...@gm...>
+
+ * lib/oxygen/simulationserver/agentcontrol.h:
+ * lib/oxygen/simulationserver/agentcontrol.cpp:
+ * lib/oxygen/simulationserver/agentcontrol_c.cpp:
+ - AgentControl multi-threaded implementation added with some cleanups.
+ Thanks to Andreas Seekircher for providing the patch.
+
+ * spark/spark.rb:
+ - sets AgentControl's multi-threaded mode according to $threadedAgentControl
+ variable
+
2012-04-07 Hedayat Vatankhah <hed...@gm...>
* lib/oxygen/simulationserver/simulationserver.cpp (SimulationServer::Step):
Modified: trunk/spark/lib/oxygen/simulationserver/agentcontrol.cpp
===================================================================
--- trunk/spark/lib/oxygen/simulationserver/agentcontrol.cpp 2012-04-09 20:46:44 UTC (rev 303)
+++ trunk/spark/lib/oxygen/simulationserver/agentcontrol.cpp 2012-04-09 21:00:07 UTC (rev 304)
@@ -29,13 +29,16 @@
using namespace boost;
using namespace std;
-AgentControl::AgentControl() : NetControl(), mSyncMode(false)
+AgentControl::AgentControl() : NetControl(), mSyncMode(false),
+ mMultiThreads(true), mThreadBarrierNew(NULL), nThreads(0)
{
+ mThreadBarrier = new boost::barrier(1);
mLocalAddr.setPort(3100);
}
AgentControl::~AgentControl()
{
+ delete mThreadBarrier;
}
void AgentControl::OnLink()
@@ -62,8 +65,24 @@
}
mGameControlServer->AgentConnect(client->id);
+
+ //Create a new thread and new barrier
+ if(mMultiThreads)
+ {
+ /**@todo Make this safe! */
+ if(mThreadBarrierNew != NULL)
+ GetLog()->Error()
+ << "(AgentControl) ERROR mThreadBarrierNew!=NULL!"
+ << " Agents connecting/disconnecting in same frame !\n";
+ nThreads++;
+ mThreadBarrierNew = new boost::barrier(nThreads+1);
+ boost::thread* newThread =
+ mThreadGroup.create_thread(boost::bind(&AgentControl::AgentThread,
+ this, client));
+ }
}
+
void AgentControl::ClientDisconnect(boost::shared_ptr<Client> client)
{
mClientSenses[client->id].clear();
@@ -90,52 +109,69 @@
return;
}
- // pass all received messages on to the GameControlServer
- for (
- TBufferMap::iterator iter = mBuffers.begin();
- iter != mBuffers.end();
- ++iter
- )
- {
- boost::shared_ptr<NetBuffer>& netBuff = (*iter).second;
- if (
- (netBuff.get() == 0) ||
- (netBuff->IsEmpty())
- )
- {
- continue;
- }
+ //if(!mMultiThreads)
+ //{
+ // pass all received messages on to the GameControlServer
+ for (
+ TBufferMap::iterator iter = mBuffers.begin();
+ iter != mBuffers.end();
+ ++iter
+ )
+ {
+ boost::shared_ptr<NetBuffer>& netBuff = (*iter).second;
+ if (
+ (netBuff.get() == 0) ||
+ (netBuff->IsEmpty())
+ )
+ {
+ continue;
+ }
- // lookup the client entry corresponding for the buffer
- // entry
- TAddrMap::iterator clientIter = mClients.find(netBuff->GetAddr());
- if (clientIter == mClients.end())
- {
- continue;
- }
- boost::shared_ptr<Client>& client = (*clientIter).second;
+ // lookup the client entry corresponding for the buffer
+ // entry
+ TAddrMap::iterator clientIter = mClients.find(netBuff->GetAddr());
+ if (clientIter == mClients.end())
+ {
+ continue;
+ }
+ boost::shared_ptr<Client>& client = (*clientIter).second;
- // lookup the AgentAspect node correspoding to the client
- boost::shared_ptr<AgentAspect> agent =
- mGameControlServer->GetAgentAspect(client->id);
- if (agent.get() == 0)
- {
- continue;
- }
-
- // parse and immediately realize the action
- string message;
- while (mNetMessage->Extract(netBuff,message))
- {
- agent->RealizeActions
- (mGameControlServer->Parse(client->id,message));
- }
- }
+ // start cycle for this client
+ StartCycle(client, netBuff);
+ }
+ /*}
+ else
+ {
+ mThreadAction = STARTCYCLE;
+ WaitMaster(); //let threads start
+ WaitMaster(); //wait for threads to finish
+ }*/
} while (!AgentsAreSynced());
}
+void AgentControl::StartCycle(const boost::shared_ptr<Client> &client,
+ boost::shared_ptr<NetBuffer> &netBuff)
+{
+ // lookup the AgentAspect node corresponding to the client
+ boost::shared_ptr<AgentAspect> agent =
+ mGameControlServer->GetAgentAspect(client->id);
+ if (agent.get() == 0)
+ {
+ return;
+ }
+ // parse and immediately realize the action
+ string message;
+ while (mNetMessage->Extract(netBuff,message))
+ {
+ agent->RealizeActions
+ (mGameControlServer->Parse(client->id,message));
+ }
+}
+
void AgentControl::SenseAgent()
{
+ //if(!mMultiThreads)
+ //{
int clientID;
for (
TAddrMap::iterator iter = mClients.begin();
@@ -149,6 +185,13 @@
SendClientMessage(iter->second, mClientSenses[clientID]);
}
}
+ /*}
+ else
+ {
+ mThreadAction = SENSEAGENT;
+ WaitMaster(); //let threads start
+ WaitMaster(); //wait for threads to finish
+ }*/
}
void AgentControl::EndCycle()
@@ -164,44 +207,57 @@
return;
}
+ if(!mMultiThreads)
+ {
+ // generate senses for all agents
+ for (
+ TAddrMap::iterator iter = mClients.begin();
+ iter != mClients.end();
+ ++iter
+ )
+ {
+ const boost::shared_ptr<Client> &client = (*iter).second;
+ EndCycle(client);
+ }
+ }
+ else
+ {
+ mThreadAction = ENDCYCLE;
+ WaitMaster(); //let threads start
+ WaitMaster(); //wait for threads to finish
+ }
+}
+
+void AgentControl::EndCycle(const boost::shared_ptr<Client> &client)
+{
boost::shared_ptr<BaseParser> parser = mGameControlServer->GetParser();
if (parser.get() == 0)
- {
- GetLog()->Error()
- << "(AgentControl) ERROR: got no parser from "
- << " the GameControlServer" << endl;
- return;
- }
+ {
+ GetLog()->Error()
+ << "(AgentControl) ERROR: got no parser from "
+ << " the GameControlServer" << endl;
+ return;
+ }
- // generate senses for all agents
- for (
- TAddrMap::iterator iter = mClients.begin();
- iter != mClients.end();
- ++iter
- )
- {
- const boost::shared_ptr<Client> &client = (*iter).second;
+ boost::shared_ptr<AgentAspect> agent =
+ mGameControlServer->GetAgentAspect(client->id);
+ if (agent.get() == 0)
+ {
+ return;
+ }
+ if (mSyncMode)
+ {
+ agent->SetSynced(false);
+ }
- boost::shared_ptr<AgentAspect> agent =
- mGameControlServer->GetAgentAspect(client->id);
- if (agent.get() == 0)
- {
- continue;
- }
- if (mSyncMode)
- {
- agent->SetSynced(false);
- }
+ boost::shared_ptr<PredicateList> senseList = agent->QueryPerceptors();
+ mClientSenses[client->id] = parser->Generate(senseList);
+ if (mClientSenses[client->id].empty())
+ {
+ return;
+ }
- boost::shared_ptr<PredicateList> senseList = agent->QueryPerceptors();
- mClientSenses[client->id] = parser->Generate(senseList);
- if (mClientSenses[client->id].empty())
- {
- continue;
- }
-
- mNetMessage->PrepareToSend(mClientSenses[client->id]);
- }
+ mNetMessage->PrepareToSend(mClientSenses[client->id]);
}
void AgentControl::SetSyncMode(bool syncMode)
@@ -221,6 +277,11 @@
}
}
+void AgentControl::SetMultiThreaded(bool multiThreaded)
+{
+ mMultiThreads = multiThreaded;
+}
+
bool AgentControl::AgentsAreSynced()
{
if (mSyncMode)
@@ -247,3 +308,84 @@
}
return true;
}
+
+
+void AgentControl::AgentThread(const boost::shared_ptr<Client> &client)
+{
+ boost::barrier *currentBarrier = mThreadBarrierNew;
+
+ while(client->socket->isOpen())
+ {
+ WaitSlave(currentBarrier);
+
+ //StartCycle not parallel:
+ // parser and agentState::addMessage not thread safe.
+ // additional synchronization required -> no speed-up !
+ if(mThreadAction == STARTCYCLE)
+ {
+
+ TBufferMap::iterator buf = mBuffers.find(client->addr);
+ if(buf != mBuffers.end())
+ {
+ boost::shared_ptr<NetBuffer>& netBuff = buf->second;
+ if (netBuff.get() != 0 && !netBuff->IsEmpty())
+ StartCycle(client, netBuff);
+ }
+
+ }
+
+ // SenseAgent not parallel: not enough computation, no speed-up !
+ else if(mThreadAction == SENSEAGENT)
+ {
+
+
+ std::string& senses = mClientSenses[client->id];
+ if (!senses.empty())
+ SendClientMessage(client, senses);
+
+ }
+
+ // Here we get a speed-up !
+ else if(mThreadAction == ENDCYCLE)
+ {
+
+ EndCycle(client);
+
+ }
+
+ WaitSlave(currentBarrier);
+ }
+
+ nThreads--;
+ if(mThreadBarrierNew != NULL)
+ GetLog()->Error()
+ << "(AgentControl) ERROR mThreadBarrierNew!=NULL!"
+ << " Agents connecting/disconnecting in same frame !\n";
+ mThreadBarrierNew = new boost::barrier(nThreads+1);
+ currentBarrier->wait();
+}
+
+void AgentControl::WaitMaster()
+{
+ if(mThreadBarrierNew != NULL)
+ {
+ boost::barrier *oldBarrier = mThreadBarrier;
+ mThreadBarrier = mThreadBarrierNew;
+ oldBarrier->wait();
+ mThreadBarrier->wait();
+ delete oldBarrier;
+ mThreadBarrierNew = NULL;
+ }
+ else
+ mThreadBarrier->wait();
+}
+
+void AgentControl::WaitSlave(boost::barrier* &currentBarrier)
+{
+ currentBarrier->wait();
+ if(currentBarrier != mThreadBarrier)
+ {
+ currentBarrier = mThreadBarrier;
+ currentBarrier->wait();
+ }
+}
Modified: trunk/spark/lib/oxygen/simulationserver/agentcontrol.h
===================================================================
--- trunk/spark/lib/oxygen/simulationserver/agentcontrol.h 2012-04-09 20:46:44 UTC (rev 303)
+++ trunk/spark/lib/oxygen/simulationserver/agentcontrol.h 2012-04-09 21:00:07 UTC (rev 304)
@@ -23,6 +23,7 @@
#include "netcontrol.h"
#include <oxygen/oxygen_defines.h>
#include <oxygen/gamecontrolserver/gamecontrolserver.h>
+#include <boost/thread/barrier.hpp>
namespace oxygen
{
@@ -56,12 +57,34 @@
/** sets the AgentControl's sync mode */
void SetSyncMode(bool syncMode);
+ /** enables or disables the AgentControl's multi-threaded mode */
+ void SetMultiThreaded(bool multiThreaded);
+
protected:
virtual void OnLink();
/** returns if the agents are synced with the srever */
bool AgentsAreSynced();
+ /** the thread function which does EndCycle for one agent in
+ * multi-threaded mode. */
+ void AgentThread(const boost::shared_ptr<Client> &client);
+
+ /** forwards all pending messages from a specific agent to the
+ GameControlServer */
+ void StartCycle(const boost::shared_ptr<Client> &client,
+ boost::shared_ptr<NetBuffer> &netBuff);
+
+ /** generates sense updates for a specific agent */
+ void EndCycle(const boost::shared_ptr<Client> &client);
+
+ /** waits for all agent threads to catch up */
+ void WaitMaster();
+
+ /** called in an agent thread to wait for the master thread to signal
+ * a new task */
+ void WaitSlave(boost::barrier* &currentBarrier);
+
protected:
/** cached reference to the GameControlServer */
CachedPath<GameControlServer> mGameControlServer;
@@ -75,6 +98,20 @@
* proceed to the next cycle
*/
bool mSyncMode;
+
+ /** indicates if the AgentControl runs in multi-threaded mode */
+ bool mMultiThreads;
+
+ /** barrier object for synchronizing threads in multi-threaded mode */
+ boost::barrier *mThreadBarrier;
+ boost::barrier *mThreadBarrierNew;
+
+ /** boost thread group to create a new thread when an agent connects */
+ boost::thread_group mThreadGroup;
+ int nThreads;
+
+ /** indicates what should happen in the agent thread right now */
+ enum { STARTCYCLE, SENSEAGENT, ENDCYCLE } mThreadAction;
};
DECLARE_CLASS(AgentControl);
Modified: trunk/spark/lib/oxygen/simulationserver/agentcontrol_c.cpp
===================================================================
--- trunk/spark/lib/oxygen/simulationserver/agentcontrol_c.cpp 2012-04-09 20:46:44 UTC (rev 303)
+++ trunk/spark/lib/oxygen/simulationserver/agentcontrol_c.cpp 2012-04-09 21:00:07 UTC (rev 304)
@@ -35,8 +35,22 @@
return true;
}
+FUNCTION(AgentControl, setMultiThreaded)
+{
+ bool inSet;
+
+ if ((in.GetSize() != 1) || (!in.GetValue(in[0], inSet)))
+ {
+ return false;
+ }
+
+ obj->SetMultiThreaded(inSet);
+ return true;
+}
+
void CLASS(AgentControl)::DefineClass()
{
DEFINE_BASECLASS(oxygen/NetControl);
DEFINE_FUNCTION(setSyncMode);
+ DEFINE_FUNCTION(setMultiThreaded);
}
Modified: trunk/spark/spark/spark.rb
===================================================================
--- trunk/spark/spark/spark.rb 2012-04-09 20:46:44 UTC (rev 303)
+++ trunk/spark/spark/spark.rb 2012-04-09 21:00:07 UTC (rev 304)
@@ -44,6 +44,7 @@
$agentType = 'tcp'
$agentPort = 3100
$agentSyncMode = false
+$threadedAgentControl = true
# (MonitorControl) constants
#
@@ -311,6 +312,7 @@
agentControl.setServerPort($agentPort)
agentControl.setStep($agentStep)
agentControl.setSyncMode($agentSyncMode)
+ agentControl.setMultiThreaded($threadedAgentControl)
end
if ($agentType == 'udp')
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|