[Aglets-commits] aglets/src/com/ibm/aglets/thread AgletThread.java, 1.1, 1.2 AgletThreadPool.java,
Status: Beta
Brought to you by:
cat4hire
|
From: Luca F. <cat...@us...> - 2009-07-28 07:05:09
|
Update of /cvsroot/aglets/aglets/src/com/ibm/aglets/thread In directory 23jxhf1.ch3.sourceforge.com:/tmp/cvs-serv18746/src/com/ibm/aglets/thread Modified Files: AgletThreadPool.java Added Files: AgletThread.java Log Message: Merge of experimental branch. Index: AgletThreadPool.java =================================================================== RCS file: /cvsroot/aglets/aglets/src/com/ibm/aglets/thread/AgletThreadPool.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -d -r1.2 -r1.3 --- AgletThreadPool.java 27 Jul 2009 10:31:42 -0000 1.2 +++ AgletThreadPool.java 28 Jul 2009 07:04:54 -0000 1.3 @@ -1,281 +1,269 @@ package com.ibm.aglets.thread; +import org.aglets.log.*; - -import com.ibm.aglets.*; +import com.ibm.aglet.AgletException; +import com.ibm.aglet.message.MessageManager; +import com.ibm.aglets.MessageManagerImpl; import java.util.*; -import com.ibm.aglet.AgletProxy; -import com.ibm.aglet.MessageManager; -import com.ibm.aglet.message.*; /** - * This class represents a pool for a set of aglet threads. Agents - * and message managers can pop and push threads from the pool - * in order to handle execution threads. Using a singleton pool - * like this avoid the creation of a lot of threads within the - * platform. - * @author Luca Ferrari cat...@us... - * 28-mag-2005 - * @version 1.0 + * A thread pool that provides pooling base mechanism for aglet threads. The idea behind this pool is + * quite simple: once a new thread is required the pool must be able to provide it. Provide a + * thread means either: + * a) provide an already created and idle thread; + * b) create a new thread if no one idle thread is available. + * Once a thread has finished its work it must be inserted again in the pool, thus it can be + * used in further thread needed conditions. + * @author Luca Ferrari - cat...@us... + * */ public class AgletThreadPool { - - /** - * A self refernce to myself, used to create - * a singleton. 
- */ - private static AgletThreadPool myself = null; - - /** - * The real pool that will contain the threads. - */ - private Stack pool = null; - - - /** - * A pool of special threads, used to deliver a single message to an agent. - */ - private Stack deliveryPool = null; - - /** - * The max number of thread this pool must handle. - */ - private int size; - - /** - * The number of thread handled currenlty. - */ - private int handled; - - - /** - * Base constructor: it creates the pool-stack and initializes - * the size of the pool. - * @param size the max number of threads this pool can create - */ - protected AgletThreadPool(int size){ - super(); - this.size = size; - AgletThreadPool.myself = this; - this.pool = new Stack(); - this.deliveryPool = new Stack(); - this.handled = 0; - } - - - - /** - * Obtains the current instance of the pool. If no instance have been - * created yet, creates an instance for 10 threads. - * @return the reference to the pool - */ - public static AgletThreadPool getInstance(){ - if( AgletThreadPool.myself != null ){ - return AgletThreadPool.myself; - } - else{ - synchronized (AgletThreadPool.class){ - AgletThreadPool.myself = new AgletThreadPool(10); - return AgletThreadPool.myself; - - } - } - - } + // the logger for this pool + private AgletsLogger logger = AgletsLogger.getLogger(AgletThreadPool.class.getName()); + + /** + * The minimum size of the pool, that is the minimum number thread this pool must + * create on startup. + */ + private int minPoolSize = 10; + + /** + * The max size of the pool, that is the max number of thread this pool can contain. + * After such number has been reached, the pool must suspend each request of a new thread + * until one becomes idle (and thus available) again. + */ + private int maxPoolSize = 100; + + /** + * A self reference to this object. This class is supposed to be a singleton, + * thus only one pool can exists in the whole system. 
+ */ + private static AgletThreadPool mySelf = null; + + /** + * The pool will keep the threads into a stack container. + */ + private Stack<AgletThread> threads = null; + + /** + * A thread group that contains all the threads this pool will create. + */ + private ThreadGroup threadGroup = new ThreadGroup("AgletThreadGroup"); + + /** + * A counter that indicates how many threads this pool has created until now. + * It is useful for checking that the pool has not gone over the maxPoolSize value. + */ + private int createdThread = 0; + + /** + * A counter that provides an information about the number of busy threads in the pool. + */ + private int busyThreads = 0; + + /** + * Initializes the structures of the pool and creates the first threads (the minimum + * available threads) that will be used. + * + */ + private AgletThreadPool(){ + super(); + this.threads = new Stack(); + // create the min threads + logger.info("AgletThreadPool starting with " + this.minPoolSize + " min threads"); + for(int i=0; i<this.minPoolSize; i++) + this.createNewThread(); - /** - * A service to get an AgletThread. If possible, a thread is exctracted from the - * queue, otherwise a new thread is created. If the maximum count of the pool is reached, - * the process is suspended (wait) until a new thread becomes available - * in the pool. - * <B>Important</B>: in a shared thread environment like this, it is important - * to set the right message manager for the thread, otherwise messages - * will be delivered to the wrong aglet. 
- * @param group the group of the (in case) new thread - * @param manager the message manager that must be associated to the thread - * @return - */ - protected synchronized final AgletThread getThread(ThreadGroup group, MessageManager manager){ - // as first I need to check if there's an available - // thread in the pool - - AgletThread t = null; - - - if( this.pool.size() >= 1 ){ - // I've got at least one ready thread, get it - t = (AgletThread) this.getWaitingThread(); - } - else{ - // if here there's no thread in the pool, I should - // create a new thread but it depends on the - // number of threads I've already created. - - // If the number of created thread has reached the - // maximun count, then stop the creation and wait - // until a new thread becomes available - if( this.handled == this.size ){ - try{ - while( this.pool.size() == 0){ - this.wait(); - } - - // if here at least one thread is in the pool - t = (AgletThread)this.getWaitingThread(); - - }catch(InterruptedException ex){ - System.err.println("Cannot wait for another thread in the pool"); - ex.printStackTrace(); - return null; - } - } - else{ - // i can create a new thread - t = new AgletThread(group, manager); - t.setMessageManager(manager); - // should I change the group of the thread???? - this.handled++; - - } - } - - - // if here I've got the thread - t.setMessageManager(manager); - return t; + logger.info("AgletThreadPool ready"); + } + + /** + * An utility method that creates a new thread, sets the group of the thread and + * pushes it into the stack, thus the new thread is available to the pool. Moreover, + * this method increases the createdThread value, thus to store the number of threads + * created by this pool. + * + */ + protected void createNewThread(){ + logger.debug("Creating a new thread (thread number " + this.createdThread + ")"); + AgletThread thread = new AgletThread(this.threadGroup); + thread.setName("PooledAgletThread num." 
+ this.createdThread); + thread.setDaemon(true); // all the pooled thread are daemons, they are utility threads + this.threads.push(thread); + this.createdThread++; + } + + /** + * Creates a new instance of the pool and returns it to the caller. + * @return the instance of the thread pool for the running platform. + */ + public synchronized static AgletThreadPool getInstance(){ + // check if the pool is already ready + if( mySelf != null ) + return mySelf; + else{ + mySelf = new AgletThreadPool(); + return mySelf; } + } + + + /** + * Provides a new AgletThread to the caller. The AgletThread will be used combined with + * the specified message manager (that must be not-null). This method checks the pool to see if a + * good thread is contained in it, and thus such thread is returned. Otherwise, if possible (i.e., + * the pool has not yet reached the max size) a new thread is created. In the case there are no + * available threads and no more thread can be created (due to the reach of the max pool size), the + * caller thread is suspended waiting for a new thread to be available. + * @param messageManager the message manager that will be used for the new thread + * @return the AgletThread to use to manage messages. + * @throws AgletException if the specified message manager is null, or a problem occurs while + * waiting for a new thread to be available on the pool. + */ + public synchronized AgletThread pop(MessageManager messageManager) throws AgletException{ + // first of all check if the message manager is valid + if( (messageManager == null) || (! (messageManager instanceof MessageManagerImpl)) ) + throw new AgletException("Cannot get a thread for a null message manager"); + + // the thread to return... + AgletThread thread = null; - - /** - * A service to extract a waiting thread from the pool. It is important - * to note that once placed in the pool the thread could be not yet sleeping, - * that means it is going to call the wait method, but it hasn't done yet. 
- * For this reason this method searches in the pool for a thread that - * is effectively waiting (i.e., it has done the wait call). The loop is - * performed twice on the pool, to avoid infinite loop. - * @return the first waiting thread in the pool - */ - protected final AgletThread getWaitingThread(){ - AgletThread t = null; - boolean find = false; - - if( this.pool != null && this.pool.size() >= 1){ - int looper = this.pool.size()*2; - - while( find == false && looper >= 0){ - t = (AgletThread) this.pool.pop(); - looper--; - - if( t.isWaiting() == false){ - // re-insert the thread in the pool - this.pool.push(t); - } - else - find = true; - } - } - - return t; + // check the size of the stack/pool. If it is zero there are not threads available + // thus a new thread should be created, but this only if the number of created + // threads does not exceed the maxPoolSize, otherwise it is required to wait... + if( (this.threads.size() == 0) && (this.createdThread < this.maxPoolSize) ){ + // create a new thread + this.createNewThread(); } + else + while( (this.threads.size() == 0) && (this.createdThread > this.maxPoolSize) ){ + // I cannot create no more threads, the max size of the pool has already been + // reached, thus the caller must wait until a new thread is available + logger.debug("Waiting for a thread to re-enter the pool and be available..."); + try{ + this.wait(); + }catch(InterruptedException ex){ + logger.error("Exception caught while waiting for a thread to be available in the pool!", ex); + throw new AgletException(); + } + + } // end of while + // now if I'm here there must be at least an available thread in the pool, pop + // it and return it to the caller. + logger.debug("Popping a thread from the pool"); + thread = this.threads.pop(); + this.busyThreads++; + logger.debug("Returning the thread " + thread); - /** - * A method to place a thread in the pool, that means releasing - * a thread thus other agents/message managers can use it. 
The thread - * is inserted in the pool only if it does not already is in the pool - * and if the max number of thread managed from this pool has not been reached - * yet (to avoid continue pushing of new threads). In the case - * a process is waiting for a thread (wiat), it is notified. - * @param thread the thread to release - * @return true if the thread is placed in the pool, false otherwise. - */ - public synchronized boolean push(AgletThread thread){ - // check the argument - if( thread == null ){ - return false; - } - - // check if the thread is already contained in the pool - // or if the maximum size of the pool has been reached - // (i.e., avoid continue pushing of new threads!) - if( this.pool.contains(thread) || this.pool.size() == this.size ){ - return false; - } - - // now insert the thread in the pool - this.pool.push(thread); - - // notify sleeping threads - this.notifyAll(); - - - return true; - } - + // sets the message manager for this thread + thread.setMessageManager((MessageManagerImpl)messageManager); - /** - * A service to get a thread from the pool. - * @param manager the message manager to use with the thread - * @return the thread obtained from the pool (could be a new thread). - */ - public AgletThread pop(MessageManager manager){ - AgletThread t = this.getThread(Thread.currentThread().getThreadGroup(), manager); - //t.notifyAll(); - return t; - } + // all done! + return thread; + } + + /** + * Inserts (or re-inserts) a thread into the pool, making it available for other + * requests. Please note that if the pool already contains a reference to such thread + * or a number of threads greater than the maxPoolSize have been pushed, the pushing is aborted. + * An exception is thrown if it is forced to push a thread that does not belong to the + * thread group of the pooled threads. + * @param thread the thread to insert into the pool. + * @exception if the thread group is different from the one the pool has set at the thread creation. 
+ */ + public synchronized void push(AgletThread thread) throws AgletException{ + // check if the thread is valid, or if it is already contained in the pool + // or if too much threads have been already be pushed in the pool! + if( thread == null || this.contains(thread) || this.threads.size() > this.maxPoolSize) + return; - protected void dumpPool(){ - for(int i=0; i<this.pool.size(); i++){ - System.out.println("Thread in pool: "+this.pool.elementAt(i)); - } + // check if the thread group is strictly the same of the pool thread group + if( this.threadGroup.equals(thread.getThreadGroup()) == false ){ + logger.error("Trying to push a thread with a group different from the pool thread group!"); + throw new AgletException("Trying to push a thread with a group different from the pool thread group!"); } + // simply push the thread to the stack + logger.debug("Pushing back the thread in the pool"); + thread.setMessageManager(null); + this.threads.push(thread); + this.busyThreads--; + } + + /** + * Gets back the maxPoolSize. + * @return the maxPoolSize + */ + public synchronized int getMaxPoolSize() { + return maxPoolSize; + } + + /** + * Sets the maxPoolSize value. + * @param maxPoolSize the maxPoolSize to set + */ + public synchronized void setMaxPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + } + + /** + * Gets back the minPoolSize. + * @return the minPoolSize + */ + public synchronized int getMinPoolSize() { + return minPoolSize; + } + + /** + * Sets the minPoolSize value. + * @param minPoolSize the minPoolSize to set + */ + public synchronized void setMinPoolSize(int minPoolSize) { + this.minPoolSize = minPoolSize; + } + + /** + * Gets back the threadGroup. + * @return the threadGroup + */ + public synchronized final ThreadGroup getThreadGroup() { + return threadGroup; + } + + /** + * Checks if a specific thread is currently owned by the thread pool. 
+ * @param thread the thread to check + * @return true if the thread is currently handled by this pool + */ + public synchronized final boolean contains(AgletThread thread){ + if( thread == null || this.threads.size() == 0 ) + return false; + else + return this.threads.contains(thread); + } + + + /** + * Provides the information about how many threads are busy at the moment + * in the pool. + * @return the number of busy threads + */ + public synchronized final int getBusyThreadsNumber(){ + return this.busyThreads; + } + + /** + * Returns the number of created threads of this pool. + * @return the number of threads that this pool has created up to now + */ + public synchronized int getCreatedThread() { + return this.createdThread; + } + - /** - * Returns a delivery message thread. Please note that this code does not keep into account of the size - * of the thread pool, that means, each time a delivery thread is required and no one is available, it - * will be created. This is due to the fact that the number of delivery thread created should be less - * than the number of thread used to execute agents. - * @param proxy the proxy for the delivery thread. - * @param message the message to deliver. - * @return - */ - public DeliveryMessageThread getDeliveryMessageThread(AgletProxy proxy, Message message){ - // check parameters - if( proxy == null || message == null ) - return null; - - // synchornize - synchronized(this){ - if( this.deliveryPool.size() == 0 ){ - // no delivery thread yet created, create a new one - DeliveryMessageThread dmt = new DeliveryMessageThread(proxy, message); - return dmt; - } - else{ - // we have a thread in the pool, get it - DeliveryMessageThread dmt = (DeliveryMessageThread) this.deliveryPool.pop(); - return dmt; - } - } - } - - - /** - * Re-insert a thread in the pool of the delivery thread. - * @param thread the thread to re-insert. 
- */ - public void pushDeliveryMessageThread(DeliveryMessageThread thread){ - if( thread != null && this.deliveryPool != null && this.deliveryPool.contains(thread) == false ) - synchronized(this){ - this.deliveryPool.push(thread); - } - else - return; - } - - } --- NEW FILE: AgletThread.java --- package com.ibm.aglets.thread; /* * @(#)AgletThread.java * * IBM Confidential-Restricted * * OCO Source Materials * * 03L7246 (c) Copyright IBM Corp. 1996, 1998 * * The source code for this program is not published or otherwise * divested of its trade secrets, irrespective of what has been * deposited with the U.S. Copyright Office. */ import org.aglets.log.AgletsLogger; import com.ibm.aglet.Aglet; import com.ibm.aglet.AgletException; import com.ibm.aglet.InvalidAgletException; import com.ibm.aglet.message.Message; import com.ibm.aglet.message.MessageManager; import com.ibm.aglets.LocalAgletRef; import com.ibm.aglets.MessageImpl; import com.ibm.aglets.MessageManagerImpl; /** * The thread for message handling. * * * Notes from Luca Ferrari - cat...@us...: * * This class is a specific thread used to manage the message delivery. Once an agent must receive/process * a message, a thread is activated and used to process such message. In other words, the handleMessage() method * of an aglet is run on top of one of AgletThread. * * Please note that the thread management involves the message manager and the message itself, since * the steps are the following: * 1) a message is delivered from a sender to the destination agent. The addressee agent stores the message * into a message queue, hold from the message manager. * 2) the message manager, once one or more thread have been stored in the queue, pops a thread and start * processing such threads one by one. 
In particular: * a) the message is passed to the thread; * b) the thread invokes the handle() method on the message itself; * c) the message invokes handleMessage() on the aglet, passing itself as argument; * d) the thread exits the monitor, that is, it informs the message manager that the above message * has been delivered. The message manager then either processes another message or releases the thread. In the first case * a chain is produced, so that the thread is held to process messages until the message queue is empty. * In the second case the thread is free, and the message manager waits until a new message comes. * * * In the 2.0.2 schema each agent has its own threadSpool, that is a stack of threads used to manage * only messages related to the owner agent. Once the thread has delivered the message, it is pushed back into * such stack (that is contained in the message manager). * * In the 2.1.0 schema there is a thread pool that, globally, provides threads for the whole messaging system. Thus * the message manager no longer handles a private stack of threads but requests them from the pool. * Once the thread has delivered the message and no more messages must be processed for this agent, the * message manager pushes it back in the thread pool. This allows a thread * to be used for different agents at different times. Please note that this implies that a thread must know not only * the message it is going to process, but also the message manager that ordered that, for coherence. * The thread also has two ways of locking: * _ processing = it is processing a message, thus it cannot receive changes about the message itself or the message manager; * _ changing = it is changing either the message manager or the message to process and thus cannot process it.
*
* Please note that the message manager will push and pop the thread again when it processes the next message; this can lead to
* situations where the next message is processed by a different thread and, in general, wastes some resources. Maybe this will be
* fixed in the future.
*
* 07/ago/07
*/
public final class AgletThread extends Thread {

    /** Cleared by {@link #invalidate()} or by a failure in {@link #run()}; the run loop stops once it is false. */
    private boolean valid = true;

    /** Bookkeeping flag written by the setters and by {@link #run()}; it is only read by {@link #toString()}. */
    private boolean start = false;

    /** Set on the first call to {@link #run()} so that a second, direct call returns immediately. */
    private boolean loop_started = false;

    /** The message manager this thread is currently working for. */
    private MessageManagerImpl messageManager = null;

    /** The message to process next; reset to null by {@link #run()} after a non re-entrant message was handled. */
    private MessageImpl message = null;

    /**
     * A counter about the number of created threads. Only accessed through
     * {@link #nextThreadNumber()}.
     */
    private static int count = 1;

    /**
     * The logger of this class of thread(s).
     */
    private static AgletsLogger logger = AgletsLogger.getLogger("com.ibm.aglets.thread.AgletThread");

    /**
     * The number of messages this thread has handled, just for a little statistic
     * count.
     */
    private int messageHandled = 0;

    /**
     * Indicates if the thread is processing or not the current message.
     */
    private boolean processing = false;

    /**
     * Indicates if something external to this thread is going to change the message to handle.
     */
    private boolean changing = false;

    /**
     * This flag indicates if the thread is managing a reentrant message.
     */
    private boolean reentrant = false;

    /**
     * Hands out the next thread number. The increment is guarded by the class lock
     * so that threads constructed concurrently never receive the same number.
     * @return the number to use in the name of the thread being constructed
     */
    private static synchronized int nextThreadNumber() {
        return count++;
    }

    /**
     * Builds a thread knowing only the group. Used from the thread factory.
     * The thread has no message manager yet and runs at the maximum priority of its group.
     * @param group the thread group.
     */
    public AgletThread(ThreadGroup group) {
        super(group, "AgletThread num.:" + nextThreadNumber());
        this.messageManager = null;
        this.setPriority(group.getMaxPriority());
    }

    /**
     * Builds a thread bound to a message manager.
     * @param group the thread group.
     * @param m the message manager; it is cast to {@link MessageManagerImpl}, so any other
     *          implementation causes a ClassCastException.
     */
    public AgletThread(ThreadGroup group, MessageManager m) {
        super(group, "No." + nextThreadNumber() + ']');
        this.messageManager = (MessageManagerImpl) m;
        this.setPriority(group.getMaxPriority());
    }

    /**
     * Provides the message attached to the calling thread.
     * @return the message of the current thread if it is an AgletThread, null otherwise
     */
    public static MessageImpl getCurrentMessage() {
        Thread current = Thread.currentThread();
        if (current instanceof AgletThread) {
            return ((AgletThread) current).getMessage();
        }
        return null;
    }

    /**
     * This method is called each time a message must be processed. The idea is that
     * this method should be called thru the proxy chain, and from this method the thread
     * must be started/restarted and should process the message itself. Please note that this
     * method is called within the sender's thread stack, and the message must be processed
     * within the receiver thread stack.
     * @param msg the message to process; a null message is ignored.
     */
    public void handleMessage(MessageImpl msg) {
        // check if the message is valid
        if (msg == null) {
            return;
        }

        logger.debug("Handling a message => " + msg);

        // set the message to use: this also starts the thread if it is not alive
        this.setMessage(msg);
    }

    /**
     * Marks this thread as no longer valid and wakes up everything waiting on it, so that
     * the loop in {@link #run()} can leave its wait and terminate. Calling it again has no effect.
     */
    synchronized public void invalidate() {
        if (valid) {
            valid = false;
            start = true;
            notifyAll();
        }
    }

    /**
     * The execution cycle of this thread. The thread will continue to process
     * messages until it gets no more messages, and then it will suspend itself
     * waiting for a new message to come.
     */
    @Override
    public void run() {
        // if the loop of handling messages is already started return, so that
        // no more than one run call can be done (an aglet cannot call run on this thread).
        if (loop_started) {
            return;
        }

        // set this thread as "started to handle messages"
        loop_started = true;
        start = false;

        // without a message manager there is nothing to work for
        if (this.messageManager == null) {
            return;
        }

        try {
            while (valid) {
                try {
                    logger.debug("AgletThread is starting processing");
                    this.setReentrant(false);
                    this.setProcessing(true);

                    // get the reference to the aglet behind the current message manager.
                    // This must be done each time in the cycle because the message manager
                    // could have been changed if the thread has passed thru the pool.
                    MessageManagerImpl manager = this.getMessageManager();
                    logger.debug("The message manager is " + manager + ", the message is " + message);
                    LocalAgletRef ref = manager.getAgletRef();

                    message.handle(ref);       // handle the message
                    this.messageHandled++;     // increment the number of messages handled by this thread

                    synchronized (this) {
                        if (!this.isReentrant()) {
                            message = null;    // invalidate the message so to not repeat the handling
                            logger.debug("AgletThread has invalidate the message just processed (no reentrant find!)");
                        }
                    }

                    this.setProcessing(false);
                    logger.debug("AgletThread finished processing a message");

                } catch (RuntimeException ex) {
                    logger.error("Exception caught while processing a message", ex);
                    valid = false;
                    throw ex;
                } catch (Error ex) {
                    logger.error("Error caught while processing a message");
                    valid = false;
                    throw ex;
                } catch (InvalidAgletException ex) {
                    logger.error("Exception caught while processing a message", ex);
                    valid = false;
                    start = true;
                } finally {
                    // always leave the monitor of the message manager; when the thread is still
                    // valid and not handling a re-entrant message it is also handed back
                    // (presumably to the thread pool - see MessageManagerImpl).
                    if (valid && (!this.isReentrant())) {
                        logger.debug("The thread is going to be pushed back in the pool...");
                        messageManager.pushThreadAndExitMonitorIfOwner(this);
                    } else {
                        messageManager.exitMonitorIfOwner();
                    }
                }

                // here the message has been processed, thus I can suspend myself
                // waiting for a new message to process
                synchronized (this) {
                    while (valid && this.message == null && (!this.isReentrant())) {
                        try {
                            logger.debug("Thread suspending waiting for a next message...");
                            this.wait();
                        } catch (InterruptedException ex) {
                            // the interrupt status is deliberately not restored here: doing so
                            // would make the next wait() fail immediately and spin this loop.
                            logger.error("Exception caught while waiting for an incoming message", ex);
                        }
                    }
                }
            }
        } finally {
            message = null;
        }
    }

    /**
     * Provides a printable version of this thread and its state.
     * @return the class name, the name of this thread, the message manager and message
     *         (when set) and the valid/messageHandled/start values
     */
    @Override
    public String toString() {
        StringBuilder buffer = new StringBuilder(100);
        buffer.append("[" + this.getClass().getName() + "] ");
        // report the identity of this very thread, not the shared creation counter
        buffer.append("Thread ");
        buffer.append(this.getName());

        if (this.messageManager != null) {
            buffer.append("\n\tMessageManager: ");
            buffer.append(this.messageManager);
        }

        if (this.message != null) {
            buffer.append("\n\tMessage: ");
            buffer.append(this.message);
        }

        buffer.append("\n\t\tvalid: " + this.valid);
        buffer.append("\n\t\tmessageHandled: " + this.messageHandled);
        buffer.append("\n\t\tstart: " + this.start);
        buffer.append("\n");
        return buffer.toString();
    }

    /**
     * Gets back the message.
     * @return the message
     */
    protected synchronized MessageImpl getMessage() {
        return message;
    }

    /**
     * Sets the message value. If the thread is alive and processing a non re-entrant message
     * the caller waits until the processing has finished. The thread is started if it is not
     * alive, and the threads waiting on this object are notified.
     * If the caller is interrupted while waiting, the message is NOT set; the problem is logged
     * and the interrupt status of the caller is restored.
     * @param message the message to set
     */
    protected synchronized void setMessage(MessageImpl message) {
        try {
            // wait until the current message has been processed, then change it
            while (this.isAlive() && this.isProcessing() && (!this.isReentrant())) {
                logger.debug("Thread waiting to set the message to process...");
                this.wait();
            }

            // substitute the message
            this.setChanging(true);
            logger.debug("Setting the message " + message + " for the current thread");
            this.message = message;
            this.start = true;
            // no more changing the message to handle
            this.setChanging(false);

            // is the thread running?
            if (!this.isAlive()) {
                this.start();
            }

            // resume suspended threads
            this.notifyAll();
        } catch (InterruptedException e) {
            logger.error("Exception caught while trying to set a new message to process.", e);
            Thread.currentThread().interrupt();   // let the caller see the interruption
        }
    }

    /**
     * Gets back the messageManager.
     * @return the messageManager
     */
    protected synchronized MessageManagerImpl getMessageManager() {
        return messageManager;
    }

    /**
     * Sets the messageManager value. If the thread is alive and processing a message the caller
     * waits until the processing has finished. If the caller is interrupted while waiting, the
     * message manager is NOT changed; the problem is logged and the interrupt status of the
     * caller is restored.
     * @param messageManager the messageManager to set
     */
    protected synchronized void setMessageManager(MessageManagerImpl messageManager) {
        try {
            // wait until the current message has been processed, then change the manager
            while (this.isAlive() && this.isProcessing()) {
                this.wait();
            }

            // substitute the message manager
            this.setChanging(true);
            this.messageManager = messageManager;
            this.start = true;
            this.setChanging(false);

            // resume suspended threads
            this.notifyAll();
        } catch (InterruptedException e) {
            logger.error("Exception caught while trying to set a new messageManager.", e);
            Thread.currentThread().interrupt();   // let the caller see the interruption
        }
    }

    /**
     * Used to understand if the current thread is processing a message or not, and thus if it is
     * safe to change the message this thread is handling.
     * @return the processing
     */
    protected synchronized boolean isProcessing() {
        return processing;
    }

    /**
     * Sets the processing value, waiting first while the thread is alive and the message
     * (or the message manager) is being changed. If the wait is interrupted the flag is left
     * untouched and the problem is only logged.
     * @param processing the processing to set
     */
    protected synchronized void setProcessing(boolean processing) {
        try {
            while (this.isAlive() && this.isChanging()) {
                logger.debug("Thread waiting to set the processing flag...");
                this.wait();
            }

            this.processing = processing;

            // resume suspended threads
            this.notifyAll();
        } catch (InterruptedException e) {
            // called from run(): the interrupt status is not restored, so the
            // message handler does not start with a pending interruption.
            logger.error("Exception caught while set processing value", e);
        }
    }

    /**
     * Gets back the changing.
     * @return the changing
     */
    protected synchronized boolean isChanging() {
        return changing;
    }

    /**
     * Sets the changing value, waiting first while the thread is alive and processing a
     * non re-entrant message. The flag is set even when the wait is interrupted; in that case
     * the problem is logged and the interrupt status of the caller is restored.
     * @param changing the changing to set
     */
    protected synchronized void setChanging(boolean changing) {
        try {
            while (this.isAlive() && this.isProcessing() && (!this.isReentrant())) {
                logger.debug("Thread waiting to set the changing flag...");
                this.wait();
            }
        } catch (InterruptedException e) {
            logger.error("Exception caught while set the changing status", e);
            Thread.currentThread().interrupt();   // let the caller see the interruption
        }

        this.changing = changing;

        // resume suspended threads
        this.notifyAll();
    }

    /**
     * Gets back the reentrant flag.
     * @return the reentrant
     */
    public synchronized boolean isReentrant() {
        return reentrant;
    }

    /**
     * Sets the reentrant value. A re-entrant thread is a thread that must process another message
     * sent from the same thread itself to the same agent.
     * @param reentrant the reentrant to set
     */
    public synchronized void setReentrant(boolean reentrant) {
        // run() resets the flag to false at every cycle: log only when a re-entrant
        // message is really announced.
        if (reentrant) {
            logger.debug("This aglet thread will process one more re-entrant message!");
        }
        this.reentrant = reentrant;
    }
}