[CJ-dev] commonjava-projects/commonjava-util/src/java/org/commonjava/util ThreadThrottle.java,1.1,1.
Brought to you by:
johnqueso
From: <joh...@co...> - 2004-02-19 14:30:29
|
Update of /cvsroot/commonjava/commonjava-projects/commonjava-util/src/java/org/commonjava/util In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv2943/src/java/org/commonjava/util Modified Files: ThreadThrottle.java Log Message: flushed out most of the problems with ThreadThrottle. It should work properly now. Index: ThreadThrottle.java =================================================================== RCS file: /cvsroot/commonjava/commonjava-projects/commonjava-util/src/java/org/commonjava/util/ThreadThrottle.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -d -r1.1 -r1.2 --- ThreadThrottle.java 18 Sep 2003 06:28:59 -0000 1.1 +++ ThreadThrottle.java 19 Feb 2004 14:20:06 -0000 1.2 @@ -53,6 +53,9 @@ this.maxPendingSize = maxPendingSize; this.maxRunningSize = maxRunningSize; this.defaultThreadWaitMillis = defaultThreadWaitMillis; + if(LOG.isDebugEnabled()){ + LOG.debug("maxPendingSize {" + maxPendingSize + "}; maxRunningSize {" + maxRunningSize + "}"); + } } /** Adjust the Pending thread limit. @@ -116,43 +119,15 @@ /** Wait for all threads throttled by this throttle are finished. * @param wait Per-thread timeout for the join operation. */ - public void joinAll(long wait){ - if(LOG.isDebugEnabled()){ - LOG.debug("Entering joinAll() with wait"); + public void joinAll(long wait) throws InterruptedException{ + if(LOG.isTraceEnabled()){ + LOG.trace("Entering joinAll() with wait"); } - while(pending.size() > 0 || running.size() > 0){ - Thread[] runningThreads = null; - - synchronized(runningLock){ - runningThreads = (Thread[])running.toArray(new Thread[running.size()]); - } - - if(runningThreads != null){ - for(int i=0, len=runningThreads.length; i<len; i++){ - if(runningThreads[i] != null){ - if(LOG.isDebugEnabled()){ - LOG.debug("joining thread: " + runningThreads[i].getName()); - } - - try{ - runningThreads[i].join(wait); - } - catch(InterruptedException ex){} - } - } - } - + if(pending.size() > 0 || running.size() > 0){ synchronized(runningLock){ - if(pending.size() > 0 && running.size() < 1){ - if(LOG.isDebugEnabled()){ - LOG.debug("waiting for idle threads to run."); - } - - try{ - runningLock.wait(wait); - } - catch(InterruptedException ex){} + while(running.size() > 0){ + runningLock.wait(wait); } } } @@ -160,43 +135,15 @@ /** Wait for all threads throttled by this throttle are finished. */ - public void joinAll(){ - if(LOG.isDebugEnabled()){ - LOG.debug("Entering joinAll()"); + public void joinAll() throws InterruptedException{ + if(LOG.isTraceEnabled()){ + LOG.trace("Entering joinAll()"); } - while(pending.size() > 0 || running.size() > 0){ - Thread[] runningThreads = null; - - synchronized(runningLock){ - runningThreads = (Thread[])running.toArray(new Thread[running.size()]); - } - - if(runningThreads != null){ - for(int i=0, len=runningThreads.length; i<len; i++){ - if(runningThreads[i] != null){ - if(LOG.isDebugEnabled()){ - LOG.debug("joining thread: " + runningThreads[i].getName()); - } - - try{ - runningThreads[i].join(); - } - catch(InterruptedException ex){} - } - } - } - + if(pending.size() > 0 || running.size() > 0){ synchronized(runningLock){ - if(pending.size() > 0 && running.size() < 1){ - if(LOG.isDebugEnabled()){ - LOG.debug("waiting for idle threads to run."); - } - - try{ - runningLock.wait(); - } - catch(InterruptedException ex){} + while(running.size() > 0){ + runningLock.wait(); } } } @@ -275,24 +222,31 @@ private Thread _getThread(Runnable runnable, String threadName, long threadWaitMillis){ Thread t = null; synchronized(pendingLock){ - long startTime = System.currentTimeMillis(); - while((pending.size() >= maxPendingSize) && - ((System.currentTimeMillis() - startTime) < threadWaitMillis)) + long start = System.currentTimeMillis(); + while(pending.size() >= maxPendingSize && + (threadWaitMillis < 1 || + System.currentTimeMillis() - start < threadWaitMillis)) { - if (LOG.isDebugEnabled()) { - LOG.debug("waiting for pending thread count to drop."); + if (LOG.isDebugEnabled()) {LOG.debug("Pending queue size: " + pending.size());} + if (LOG.isTraceEnabled()) { + LOG.trace("waiting for pending thread count to drop."); } try{ - pendingLock.wait(threadWaitMillis); + if(threadWaitMillis > 0){ + pendingLock.wait(threadWaitMillis); + } + else{ + pendingLock.wait(); + } } catch(InterruptedException ex){ } } if(pending.size() >= maxPendingSize){ - if(LOG.isDebugEnabled()){ - LOG.debug("pending.size() {" + pending.size() + "} is greater than or equal to maxPendingSize {" + maxPendingSize + "}"); + if(LOG.isTraceEnabled()){ + LOG.trace("pending.size() {" + pending.size() + "} is greater than or equal to maxPendingSize {" + maxPendingSize + "}"); } throw new EmptyStackException(); @@ -308,41 +262,26 @@ pending.add(t); } - if(LOG.isDebugEnabled()){ - LOG.debug("returning thread from " + this); + if(LOG.isTraceEnabled()){ + LOG.trace("returning thread from " + this); } return t; } - /** Callback for NotifierRunnable to adjust the counts of running and pending - * in response to the starting of the runnable's run() method. - */ - private void threadStarted(){ - synchronized(pendingLock){ - pending.remove(Thread.currentThread()); - pendingLock.notify(); - } - - synchronized(runningLock){ - running.add(Thread.currentThread()); - runningLock.notify(); - } - - synchronized(this){ - notifyAll(); - } - } - /** Callback for NotifierRunnable to adjust the count of running * in response to the finish of the runnable's run() method. */ private void threadFinished(){ + if (LOG.isTraceEnabled()) {LOG.trace("Acquiring lock on running queue to remove finished thread.");} synchronized(runningLock){ + if (LOG.isDebugEnabled()) {LOG.debug("Running queue size: " + running.size());} running.remove(Thread.currentThread()); - runningLock.notify(); + runningLock.notifyAll(); + if (LOG.isDebugEnabled()) {LOG.debug("Running queue size: " + running.size());} } + if (LOG.isTraceEnabled()) {LOG.trace("Notifying object \'this\' of finished thread.");} synchronized(this){ notifyAll(); } @@ -353,28 +292,45 @@ * of milliseconds, approximately, and fail if it is not woken before then. * @param waitMillis The number of milliseconds to wait before failing. */ - private void maybeWaitForThreshold(long waitMillis){ + private void maybeWaitAndStart() throws InterruptedException{ + boolean added = false; synchronized(runningLock){ - long startTime = System.currentTimeMillis(); - while((running.size() >= maxRunningSize) && - ((System.currentTimeMillis() - startTime) < waitMillis)) + while(running.size() >= maxRunningSize) { - if (LOG.isDebugEnabled()) { - LOG.debug("waiting for running thread count to drop."); + if (LOG.isDebugEnabled()) {LOG.debug("BEGIN WHILE: Running queue size: " + running.size());} + + if (LOG.isTraceEnabled()) { + LOG.trace("waiting for running thread count to drop."); } - try{ - runningLock.wait(waitMillis); + runningLock.wait(); + } + + // In case another thread's successful add to the running queue triggers this thread's wait() to end... + if(running.size() < maxRunningSize){ + if (LOG.isDebugEnabled()) {LOG.debug("PRE-PUSH: Running queue size: " + running.size());} + + if (LOG.isTraceEnabled()) { + LOG.trace("launching next thread"); } - catch(InterruptedException ex){} - if (LOG.isDebugEnabled()) { - LOG.debug("launching next thread"); - } + running.add(Thread.currentThread()); + added = true; + + if (LOG.isDebugEnabled()) {LOG.debug("POST-PUSH: Running queue size: " + running.size());} + } + } + + if(added){ + synchronized(pendingLock){ + if (LOG.isDebugEnabled()) {LOG.debug("POST-WAIT&START: Pending queue size: " + pending.size());} + pending.remove(Thread.currentThread()); + pendingLock.notifyAll(); + if (LOG.isDebugEnabled()) {LOG.debug("POST-WAIT&START_POST-PUSH: Pending queue size: " + pending.size());} } - if(running.size() >= maxRunningSize){ - throw new EmptyStackException(); + synchronized(this){ + notifyAll(); } } } @@ -406,18 +362,17 @@ * */ public void run() { - maybeWaitForThreshold(waitMillis); - - if(runnable instanceof EnhancedControlRunnable){ - ((EnhancedControlRunnable)runnable).preStart(); + try { + maybeWaitAndStart(); + runnable.run(); } - - threadStarted(); - runnable.run(); - threadFinished(); - - if(runnable instanceof EnhancedControlRunnable){ - ((EnhancedControlRunnable)runnable).postFinish(); + catch (InterruptedException e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Thread caught InterruptedException while awaiting execution slot.", e); + } + } + finally{ + threadFinished(); } } |