From: <tho...@us...> - 2011-03-08 15:07:05
|
Revision: 4280 http://bigdata.svn.sourceforge.net/bigdata/?rev=4280&view=rev Author: thompsonbry Date: 2011-03-08 15:06:58 +0000 (Tue, 08 Mar 2011) Log Message: ----------- Integrated FutureTaskMon into several places. It provides a means of identifying callers who might have interrupted a FutureTask when the appropriate log level is set. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/FutureTaskMon.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManager.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManagerWithNewDesign.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -50,6 +50,7 @@ import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.fed.FederatedRunningQuery; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; import com.bigdata.relation.accesspath.BufferClosedException; @@ -593,7 +594,7 @@ * responsible for communicating the changes in the query's running state * back to the {@link RunState} object on the query controller. */ - private class ChunkFutureTask extends FutureTask<Void> { + private class ChunkFutureTask extends FutureTaskMon<Void> { private final ChunkTask t; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -58,6 +58,7 @@ import com.bigdata.btree.BTree; import com.bigdata.btree.IndexSegment; import com.bigdata.btree.view.FusedView; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.counters.CAT; import com.bigdata.counters.CounterSet; import com.bigdata.counters.Instrument; @@ -614,7 +615,7 @@ */ public void init() { - final FutureTask<Void> ft = new FutureTask<Void>(new QueryEngineTask( + final FutureTask<Void> ft = new FutureTaskMon<Void>(new QueryEngineTask( priorityQueue), (Void) null); if (engineFuture.compareAndSet(null/* expect */, ft)) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -44,6 +44,7 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.PipelineOp; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.relation.accesspath.BlockingBuffer; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; @@ -345,7 +346,7 @@ final OperatorTask opTask = new OperatorTask(bopId, src); // Wrap task with error handling and handshaking logic. - final FutureTask<Void> ft = new FutureTask<Void>( + final FutureTask<Void> ft = new FutureTaskMon<Void>( new OperatorTaskWrapper(opTask), null/* result */); if (operatorFutures.putIfAbsent(bopId, ft) != null) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -60,6 +60,7 @@ import com.bigdata.bop.engine.BOpStats; import com.bigdata.btree.BytesUtil; import com.bigdata.btree.keys.IKeyBuilder; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.counters.CAT; import com.bigdata.relation.IRelation; import com.bigdata.relation.accesspath.AbstractUnsynchronizedArrayBuffer; @@ -1411,7 +1412,7 @@ for (AccessPathTask task : tasks) { - final FutureTask<Void> ft = new FutureTask<Void>(task); + final FutureTask<Void> ft = new FutureTaskMon<Void>(task); futureTasks.add(ft); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/FutureTaskMon.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/FutureTaskMon.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/FutureTaskMon.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -42,6 +42,7 @@ static private final transient Logger log = Logger .getLogger(FutureTaskMon.class); + private volatile boolean didStart = false; public FutureTaskMon(Callable<T> callable) { super(callable); @@ -50,17 +51,41 @@ public FutureTaskMon(Runnable runnable, T result) { super(runnable, result); } + + /** + * {@inheritDoc} + * <p> + * Hooked to notice when the task has been started. + */ + @Override + public void run() { + didStart = true; + super.run(); + } - public boolean cancel(boolean mayInterruptIfRunning) { - if (mayInterruptIfRunning && log.isDebugEnabled()) { + /** + * {@inheritDoc} + * <p> + * Overridden to conditionally log @ DEBUG if the caller caused the task to + * be interrupted. This can be used to search for sources of interrupts. + */ + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + + final boolean didStart = this.didStart; + + final boolean ret = super.cancel(mayInterruptIfRunning); + + if (didStart && mayInterruptIfRunning && ret && log.isDebugEnabled()) { try { throw new RuntimeException("cancel call trace"); } catch (RuntimeException re) { log.debug("May interrupt running task", re); } } - - return super.cancel(mayInterruptIfRunning); + + return ret; + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManager.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManager.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManager.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -27,10 +27,8 @@ package com.bigdata.concurrent; -import static com.bigdata.concurrent.NonBlockingLockManager.RunState.Halted; import static com.bigdata.concurrent.NonBlockingLockManager.RunState.Running; import static com.bigdata.concurrent.NonBlockingLockManager.RunState.Shutdown; -import static com.bigdata.concurrent.NonBlockingLockManager.RunState.ShutdownNow; import static com.bigdata.concurrent.NonBlockingLockManager.RunState.Starting; import java.lang.ref.WeakReference; @@ -769,7 +767,7 @@ * @param <T> * The generic type of the outcome for the {@link Future}. */ - protected class LockFutureTask<T> extends FutureTask<T> { + protected class LockFutureTask<T> extends FutureTaskMon<T> { private final R[] resource; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManagerWithNewDesign.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManagerWithNewDesign.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManagerWithNewDesign.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -1422,7 +1422,7 @@ * @param <T> * The generic type of the outcome for the {@link Future}. */ - static public class LockFutureTask<R extends Comparable<R>, T> extends FutureTask<T> { + static public class LockFutureTask<R extends Comparable<R>, T> extends FutureTaskMon<T> { /** * The instance of the outer class. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -63,6 +63,7 @@ import com.bigdata.cache.ConcurrentWeakValueCache; import com.bigdata.cache.ConcurrentWeakValueCacheWithTimeout; import com.bigdata.cache.HardReferenceQueue; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.config.Configuration; import com.bigdata.config.IValidator; import com.bigdata.config.IntegerRangeValidator; @@ -3764,7 +3765,7 @@ // true the token is valid and this service is the quorum leader final boolean isLeader = quorum.getMember().isLeader(prepareToken); - final FutureTask<Boolean> ft = new FutureTask<Boolean>(new Runnable() { + final FutureTask<Boolean> ft = new FutureTaskMon<Boolean>(new Runnable() { public void run() { @@ -3842,7 +3843,7 @@ public Future<Void> commit2Phase(final long commitTime) { - final FutureTask<Void> ft = new FutureTask<Void>(new Runnable() { + final FutureTask<Void> ft = new FutureTaskMon<Void>(new Runnable() { public void run() { @@ -3896,7 +3897,7 @@ public Future<Void> abort2Phase(final long token) { - final FutureTask<Void> ft = new FutureTask<Void>(new Runnable() { + final FutureTask<Void> ft = new FutureTaskMon<Void>(new Runnable() { public void run() { getQuorum().assertQuorum(token); @@ -4011,7 +4012,7 @@ /** NOP. */ public Future<Void> bounceZookeeperConnection() { - final FutureTask<Void> ft = new FutureTask<Void>(new Runnable() { + final FutureTask<Void> ft = new FutureTaskMon<Void>(new Runnable() { public void run() { } }, null); @@ -4023,7 +4024,7 @@ * Does pipeline remove/add. */ public Future<Void> moveToEndOfPipeline() { - final FutureTask<Void> ft = new FutureTask<Void>(new Runnable() { + final FutureTask<Void> ft = new FutureTaskMon<Void>(new Runnable() { public void run() { final QuorumActor<?, ?> actor = quorum.getActor(); actor.pipelineRemove(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |