From: <jom...@us...> - 2012-07-13 21:26:08
|
Revision: 1701 http://jason.svn.sourceforge.net/jason/?rev=1701&view=rev Author: jomifred Date: 2012-07-13 21:26:01 +0000 (Fri, 13 Jul 2012) Log Message: ----------- improve performance of centralised + pool of threads Modified Paths: -------------- trunk/examples/game-of-life/bin/c-build.xml trunk/src/jason/architecture/AgArch.java trunk/src/jason/asSemantics/TransitionSystem.java trunk/src/jason/environment/Environment.java trunk/src/jason/infra/centralised/RunCentralisedMAS.java Modified: trunk/examples/game-of-life/bin/c-build.xml =================================================================== --- trunk/examples/game-of-life/bin/c-build.xml 2012-07-12 15:01:11 UTC (rev 1700) +++ trunk/examples/game-of-life/bin/c-build.xml 2012-07-13 21:26:01 UTC (rev 1701) @@ -11,7 +11,7 @@ June 24, 2011 - 10:32:24 --> -<project name ="game_of_life" +<project name ="game_of_life-custom" basedir=".." default="run"> Modified: trunk/src/jason/architecture/AgArch.java =================================================================== --- trunk/src/jason/architecture/AgArch.java 2012-07-12 15:01:11 UTC (rev 1700) +++ trunk/src/jason/architecture/AgArch.java 2012-07-13 21:26:01 UTC (rev 1701) @@ -52,7 +52,7 @@ * * Users can customise the architecture by overriding some methods of this class. */ -public class AgArch implements AgArchInfraTier { +public class AgArch implements AgArchInfraTier, Comparable<AgArch> { private TransitionSystem ts = null; @@ -265,5 +265,26 @@ public int getCycleNumber() { return cycleNumber; } + + @Override + public String toString() { + return "arch-"+getAgName(); + } + @Override + public int hashCode() { + return getAgName().hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) return false; + if (obj == this) return true; + if (obj instanceof AgArch) return this.getAgName().equals(((AgArch)obj).getAgName()); + return false; + } + + public int compareTo(AgArch o) { + return getAgName().compareTo(o.getAgName()); + } } Modified: trunk/src/jason/asSemantics/TransitionSystem.java =================================================================== --- trunk/src/jason/asSemantics/TransitionSystem.java 2012-07-12 15:01:11 UTC (rev 1700) +++ trunk/src/jason/asSemantics/TransitionSystem.java 2012-07-13 21:26:01 UTC (rev 1701) @@ -1103,7 +1103,7 @@ /* plus the other parts of the agent architecture besides */ /* the actual transition system of the AS interpreter */ /**********************************************************************/ - public void reasoningCycle() { + public boolean reasoningCycle() { if (logger.isLoggable(Level.FINE)) logger.fine("Start new reasoning cycle"); getUserAgArch().reasoningCycleStarting(); @@ -1130,13 +1130,13 @@ C.addExternalEv(PlanLibrary.TE_IDLE); } else { getUserAgArch().sleep(); - return; + return false; } } step = State.StartRC; do { - if (!getUserAgArch().isRunning()) return; + if (!getUserAgArch().isRunning()) return false; applySemanticRule(); } while (step != State.StartRC); @@ -1151,6 +1151,8 @@ logger.log(Level.SEVERE, "*** ERROR in the transition system. "+conf.C+"\nCreating a new C!", e); conf.C.create(); } + + return true; } // Auxiliary functions Modified: trunk/src/jason/environment/Environment.java =================================================================== --- trunk/src/jason/environment/Environment.java 2012-07-12 15:01:11 UTC (rev 1700) +++ trunk/src/jason/environment/Environment.java 2012-07-13 21:26:01 UTC (rev 1701) @@ -321,9 +321,9 @@ return c; } - @SuppressWarnings("unchecked") public boolean containsPercept(String agName, Literal per) { if (per != null && agName != null) { + @SuppressWarnings("rawtypes") List agl = (List)agPercepts.get(agName); if (agl != null) { return agl.contains(per); Modified: trunk/src/jason/infra/centralised/RunCentralisedMAS.java =================================================================== --- trunk/src/jason/infra/centralised/RunCentralisedMAS.java 2012-07-12 15:01:11 UTC (rev 1700) +++ trunk/src/jason/infra/centralised/RunCentralisedMAS.java 2012-07-13 21:26:01 UTC (rev 1701) @@ -46,12 +46,13 @@ import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; +import java.util.Collections; +import java.util.HashSet; import java.util.Map; -import java.util.concurrent.BlockingQueue; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.ConsoleHandler; import java.util.logging.Handler; import java.util.logging.Level; @@ -438,7 +439,7 @@ } } - /** creates on thread per agent */ + /** creates one thread per agent */ private void createAgsThreads() { for (CentralisedAgArch ag : ags.values()) { ag.setControlInfraTier(control); @@ -450,98 +451,95 @@ } } - private BlockingQueue<Runnable> myAgTasks; - private BlockingQueue<Runnable> mySleepAgs; + private Set<CentralisedAgArch> sleepingAgs; + private ExecutorService executor; /** creates a pool of threads shared by all agents */ private void createThreadPool() { - myAgTasks = new LinkedBlockingQueue<Runnable>(); - mySleepAgs = new LinkedBlockingQueue<Runnable>(); - - // create a thread that - // 1. creates the pool - // 2. feeds the pool with agent reasoning cycles - new Thread("feed-pool") { - public void run() { - // initially, add all agents in the tasks - for (CentralisedAgArch ag : ags.values()) { - myAgTasks.offer(ag); - } - - // get the max number of threads in the pool - int maxthreads = 10; - try { - if (project.getInfrastructure().hasParameters()) { - maxthreads = Integer.parseInt(project.getInfrastructure().getParameter(1)); - logger.info("Creating a thread pool with "+maxthreads+" thread(s)."); - } - } catch (Exception e) { - logger.warning("Error getting the number of thread for the pool."); - } + sleepingAgs = Collections.synchronizedSet(new HashSet<CentralisedAgArch>()); - // define pool size - int poolSize = ags.size(); - if (poolSize > maxthreads) { - poolSize = maxthreads; - } - - // create the pool - ExecutorService executor = Executors.newFixedThreadPool(poolSize); - - // include tasks in the pool - while (runner != null) { - try { - executor.execute(myAgTasks.take()); - // note that the agent, when finished the cycle, - // add themselves in the myAgTasks queue - } catch (InterruptedException e) { } - } - executor.shutdownNow(); + int maxthreads = 10; + try { + if (project.getInfrastructure().hasParameters()) { + maxthreads = Integer.parseInt(project.getInfrastructure().getParameter(1)); + logger.info("Creating a thread pool with "+maxthreads+" thread(s)."); } - }.start(); + } catch (Exception e) { + logger.warning("Error getting the number of thread for the pool."); + } + + // define pool size + int poolSize = ags.size(); + if (poolSize > maxthreads) { + poolSize = maxthreads; + } - // create a thread that wakeup the sleeping agents - new Thread("wake-sleep-ag") { + // create the pool + // executor = Executors.newCachedThreadPool(); + executor = Executors.newFixedThreadPool(poolSize); + + // initially, add all agents in the tasks + for (CentralisedAgArch ag : ags.values()) { + //myAgTasks.offer(ag); + executor.execute(ag); + } + + /*new Thread("monitor") { public void run() { while (runner != null) { try { - Runnable ag = mySleepAgs.poll(); - while (ag != null) { - myAgTasks.offer(ag); - ag = mySleepAgs.poll(); - } - sleep(2000); + System.out.println("#ag:"+ags.size()); + System.out.println("#slepping ags:"+mySleepAgs.size()); + try { + ThreadPoolExecutor tp = (ThreadPoolExecutor)executor; + System.out.println("#queue:"+tp.getQueue().size()); + System.out.println("#active:"+tp.getActiveCount()); + } catch (Exception e) { } + sleep(3000); } catch (InterruptedException e) { } } } - }.start(); + }.start();*/ } /** an agent architecture for the infra based on thread pool */ private final class CentralisedAgArchForPool extends CentralisedAgArch { - boolean inSleep; @Override - public void sleep() { - mySleepAgs.offer(this); - inSleep = true; + public void sleep() { + //if (sleepingAgs.contains(this)) + // System.out.println("*** ops already slepping "+this); + sleepingAgs.add(this); } @Override public void wake() { - if (mySleepAgs.remove(this)) - myAgTasks.offer(this); + if (sleepingAgs.remove(this)) { + /*try { + ThreadPoolExecutor tp = (ThreadPoolExecutor)executor; + if (tp.getQueue().contains(this)) { + System.out.println("ops... ading ag that is already in the pool "+this); + } + } catch (Exception e) { }*/ + executor.execute(this); + } } @Override + //synchronized public void run() { if (isRunning()) { - inSleep = false; - getTS().reasoningCycle(); - if (!inSleep) - myAgTasks.offer(this); + if (getTS().reasoningCycle()) { // the agent run a cycle (did not enter in sleep) + /*try { + ThreadPoolExecutor tp = (ThreadPoolExecutor)executor; + if (tp.getQueue().contains(this)) { + System.out.println("*** ops... ading ag that is already in the pool "+this); + } + } catch (Exception e) { }*/ + executor.execute(this); + } } - } + } } protected void stopAgs() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |