From: <tho...@us...> - 2011-01-16 16:38:44
|
Revision: 4108 http://bigdata.svn.sourceforge.net/bigdata/?rev=4108&view=rev Author: thompsonbry Date: 2011-01-16 16:38:37 +0000 (Sun, 16 Jan 2011) Log Message: ----------- Working on https://sourceforge.net/apps/trac/bigdata/ticket/230 (occasional errors reported through to the SPARQL client from normal termination rooted in an interrupt of a query when a LIMIT is satisfied). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/CancelQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsLiteral.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/MathBOp.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/RunningQueryCloseableIteration.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/IHaltable.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -305,17 +305,17 @@ public IRunningQuery call() throws Exception { + IRunningQuery runningSubquery = null; IAsynchronousIterator<IBindingSet[]> subquerySolutionItr = null; try { final QueryEngine queryEngine = parentContext.getRunningQuery() .getQueryEngine(); - final IRunningQuery runningQuery = queryEngine - .eval(subQueryOp); + runningSubquery = queryEngine.eval(subQueryOp); // Iterator visiting the subquery solutions. - subquerySolutionItr = runningQuery.iterator(); + subquerySolutionItr = runningSubquery.iterator(); // Copy solutions from the subquery to the query. BOpUtility.copy(subquerySolutionItr, parentContext @@ -323,20 +323,31 @@ null/* stats */); // wait for the subquery. - runningQuery.get(); + runningSubquery.get(); // done. - return runningQuery; + return runningSubquery; } catch (Throwable t) { - /* - * If a subquery fails, then propagate the error to the - * parent and rethrow the first cause error out of the - * subquery. - */ - throw new RuntimeException(ControllerTask.this.context - .getRunningQuery().halt(t)); + if (runningSubquery == null + || runningSubquery.getCause() != null) { + /* + * If things fail before we start the subquery, or if a + * subquery fails (due to abnormal termination), then + * propagate the error to the parent and rethrow the + * first cause error out of the subquery. + * + * Note: IHaltable#getCause() considers exceptions + * triggered by an interrupt to be normal termination. + * Such exceptions are NOT propagated here and WILL NOT + * cause the parent query to terminate. + */ + throw new RuntimeException(ControllerTask.this.context + .getRunningQuery().halt(runningSubquery.getCause())); + } + + return runningSubquery; } finally { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -27,7 +27,6 @@ package com.bigdata.bop.controller; -import java.nio.channels.ClosedByInterruptException; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; @@ -43,10 +42,8 @@ import com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.LocalChunkMessage; import com.bigdata.bop.engine.QueryEngine; -import com.bigdata.relation.accesspath.BufferClosedException; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; -import com.bigdata.util.InnerCause; /** * For each binding set presented, this operator executes a subquery. Any @@ -425,7 +422,7 @@ throw ex; } - + if (ncopied == 0L && optional) { /* @@ -442,28 +439,47 @@ } catch (Throwable t) { - /* - * Note: SliceOp will cause other operators to be - * interrupted during normal evaluation. Therefore, while - * these exceptions should cause the subquery to terminate, - * they should not be reported as errors to the parent - * query. - */ - if (!InnerCause.isInnerCause(t, InterruptedException.class) - && !InnerCause.isInnerCause(t, BufferClosedException.class) - && !InnerCause.isInnerCause(t, ClosedByInterruptException.class)) { - - /* - * If a subquery fails, then propagate the error to the - * parent and rethrow the first cause error out of the - * subquery. - */ - throw new RuntimeException(ControllerTask.this.context - .getRunningQuery().halt(t)); +// /* +// * Note: SliceOp will cause other operators to be +// * interrupted during normal evaluation. Therefore, while +// * these exceptions should cause the subquery to terminate, +// * they should not be reported as errors to the parent +// * query. +// */ +// if (!InnerCause.isInnerCause(t, InterruptedException.class) +// && !InnerCause.isInnerCause(t, BufferClosedException.class) +// && !InnerCause.isInnerCause(t, ClosedByInterruptException.class)) { +// +// /* +// * If a subquery fails, then propagate the error to the +// * parent and rethrow the first cause error out of the +// * subquery. +// */ +// throw new RuntimeException(ControllerTask.this.context +// .getRunningQuery().halt(t)); +// +// } +// +// return runningSubquery; - } - - return runningSubquery; + if (runningSubquery == null + || runningSubquery.getCause() != null) { + /* + * If things fail before we start the subquery, or if a + * subquery fails (due to abnormal termination), then + * propagate the error to the parent and rethrow the + * first cause error out of the subquery. + * + * Note: IHaltable#getCause() considers exceptions + * triggered by an interrupt to be normal termination. + * Such exceptions are NOT propagated here and WILL NOT + * cause the parent query to terminate. + */ + throw new RuntimeException(ControllerTask.this.context + .getRunningQuery().halt(runningSubquery.getCause())); + } + + return runningSubquery; } finally { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -28,7 +28,6 @@ package com.bigdata.bop.engine; import java.nio.ByteBuffer; -import java.nio.channels.ClosedByInterruptException; import java.util.Collections; import java.util.Map; import java.util.UUID; @@ -51,12 +50,11 @@ import com.bigdata.bop.solutions.SliceOp; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.ITx; -import com.bigdata.relation.accesspath.BufferClosedException; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.service.IBigdataFederation; -import com.bigdata.util.InnerCause; import com.bigdata.util.concurrent.Haltable; +import com.bigdata.util.concurrent.IHaltable; /** * Abstract base class for various {@link IRunningQuery} implementations. The @@ -184,7 +182,7 @@ * Note: This is exposed to the {@link QueryEngine} to let it cache the * {@link Future} for recently finished queries. */ - final protected Future<Void> getFuture() { + final protected IHaltable<Void> getFuture() { return future; @@ -665,7 +663,7 @@ if (runState.isAllDone()) { // Normal termination. - halt(); + halt((Void)null); } @@ -771,14 +769,14 @@ } - public void halt() { + final public void halt(final Void v) { - lock.lock(); + lock.lock(); try { // signal normal completion. - future.halt((Void) null); + future.halt((Void) v); // interrupt anything which is running. cancel(true/* mayInterruptIfRunning */); @@ -791,7 +789,7 @@ } - public Throwable halt(final Throwable t) { + final public <T extends Throwable> T halt(final T t) { if (t == null) throw new IllegalArgumentException(); @@ -802,23 +800,8 @@ try { - /* - * Note: SliceOp will cause other operators to be interrupted - * during normal evaluation so it is not useful to log an - * InterruptedException @ ERROR. - */ - if (!InnerCause.isInnerCause(t, InterruptedException.class) - && !InnerCause.isInnerCause(t, BufferClosedException.class) - && !InnerCause.isInnerCause(t, ClosedByInterruptException.class)) { - log.error(toString(), t); - // signal error condition. - return future.halt(t); - } else { - // normal termination. - future.halt((Void)null/* result */); - // the caller's cause. - return t; - } + // halt the query, return [t]. + return future.halt(t); } finally { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -27,7 +27,6 @@ */ package com.bigdata.bop.engine; -import java.nio.channels.ClosedByInterruptException; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -861,20 +860,12 @@ try { t.call(); } catch(Throwable t) { - /* - * Note: SliceOp will cause other operators to be - * interrupted during normal evaluation. Therefore, while - * these exceptions should cause the query to terminate, - * they should not be reported as errors to the query - * controller. - */ - if (!InnerCause.isInnerCause(t, InterruptedException.class) - && !InnerCause.isInnerCause(t, BufferClosedException.class) - && !InnerCause.isInnerCause(t, ClosedByInterruptException.class) - ) { - // Not an error that we should ignore. - throw t; - } + halt(t); + if (getCause() != null) { + // Abnormal termination. + throw new RuntimeException(getCause()); + } + // normal termination - swallow the exception. } finally { t.context.getStats().elapsed.add(System.currentTimeMillis() - begin); @@ -911,10 +902,12 @@ * error message is necessary in order to catch errors in * clientProxy.haltOp() (above and below). */ - final Throwable firstCause = halt(ex1); + // ensure halted. + halt(ex1); + final HaltOpMessage msg = new HaltOpMessage(getQueryId(), t.bopId, - t.partitionId, serviceId, firstCause, t.sinkId, + t.partitionId, serviceId, getCause()/*firstCauseIfError*/, t.sinkId, t.sinkMessagesOut.get(), t.altSinkId, t.altSinkMessagesOut.get(), t.context.getStats()); try { @@ -1484,7 +1477,7 @@ // return sink.flush(); } - public void abort(Throwable cause) { + public void abort(final Throwable cause) { open = false; q.halt(cause); // sink.abort(cause); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -29,7 +29,6 @@ import java.util.Map; import java.util.UUID; -import java.util.concurrent.Future; import com.bigdata.bop.BOp; import com.bigdata.bop.IBindingSet; @@ -40,6 +39,7 @@ import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.service.IBigdataFederation; import com.bigdata.striterator.ICloseableIterator; +import com.bigdata.util.concurrent.IHaltable; /** * Non-Remote interface exposing a limited set of the state of an executing @@ -48,7 +48,7 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ -public interface IRunningQuery extends Future<Void>{ +public interface IRunningQuery extends IHaltable<Void> { /** * The query. @@ -134,35 +134,35 @@ public long getElapsed(); /** - * Cancel the running query (normal termination). - * <p> - * Note: This method provides a means for an operator to indicate that the - * query should halt immediately for reasons other than abnormal - * termination. - * <p> - * Note: For abnormal termination of a query, just throw an exception out of - * the query operator implementation. - */ - void halt(); - - /** - * Cancel the query (abnormal termination). - * - * @param t - * The cause. - * - * @return The first cause. - * - * @throws IllegalArgumentException - * if the argument is <code>null</code>. - */ - Throwable halt(final Throwable t); - - /** - * Return the cause if the query was terminated by an exception. - * @return - */ - Throwable getCause(); +// * Cancel the running query (normal termination). +// * <p> +// * Note: This method provides a means for an operator to indicate that the +// * query should halt immediately for reasons other than abnormal +// * termination. +// * <p> +// * Note: For abnormal termination of a query, just throw an exception out of +// * the query operator implementation. +// */ +// void halt(); +// +// /** +// * Cancel the query (abnormal termination). +// * +// * @param t +// * The cause. +// * +// * @return The argument. +// * +// * @throws IllegalArgumentException +// * if the argument is <code>null</code>. +// */ +// Throwable halt(final Throwable t); +// +// /** +// * Return the cause if the query was terminated by an exception. +// * @return +// */ +// Throwable getCause(); /** * Return an iterator which will drain the solutions from the query. The Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -58,13 +58,14 @@ import com.bigdata.btree.IndexSegment; import com.bigdata.btree.view.FusedView; import com.bigdata.journal.IIndexManager; -import com.bigdata.rdf.sail.bench.NanoSparqlClient; +import com.bigdata.rdf.sail.bench.NanoSparqlServer; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; import com.bigdata.resources.IndexManager; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.IDataService; import com.bigdata.util.concurrent.DaemonThreadFactory; +import com.bigdata.util.concurrent.IHaltable; /** * A class managing execution of concurrent queries against a local @@ -367,13 +368,13 @@ * enough that we can not have a false cache miss on a system which is * heavily loaded by a bunch of light queries. */ - private LinkedHashMap<UUID, Future<Void>> doneQueries = new LinkedHashMap<UUID,Future<Void>>( + private LinkedHashMap<UUID, IHaltable<Void>> doneQueries = new LinkedHashMap<UUID,IHaltable<Void>>( 16/* initialCapacity */, .75f/* loadFactor */, true/* accessOrder */) { private static final long serialVersionUID = 1L; @Override - protected boolean removeEldestEntry(Map.Entry<UUID, Future<Void>> eldest) { + protected boolean removeEldestEntry(Map.Entry<UUID, IHaltable<Void>> eldest) { return size() > 100/* maximumCacheCapacity */; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -442,20 +442,12 @@ try { t.call(); } catch(Throwable t) { - /* - * Note: SliceOp will cause other operators to be - * interrupted during normal evaluation. Therefore, while - * these exceptions should cause the query to terminate, - * they should not be reported as errors to the query - * controller. - */ - if (!InnerCause.isInnerCause(t, InterruptedException.class) - && !InnerCause.isInnerCause(t, BufferClosedException.class) - && !InnerCause.isInnerCause(t, ClosedByInterruptException.class) - ) { - // Not an error that we should ignore. - throw t; - } + halt(t); + if (getCause() != null) { + // Abnormal termination. + throw getCause(); + } + // normal termination - swallow the exception. } finally { t.context.getStats().elapsed.add(System.currentTimeMillis() - begin); @@ -486,10 +478,12 @@ * error message is necessary in order to catch errors in * clientProxy.haltOp() (above and below). */ - final Throwable firstCause = halt(ex1); + + // ensure halted. + halt(ex1); final HaltOpMessage msg = new HaltOpMessage(getQueryId(), t.bopId, - -1/*partitionId*/, serviceId, firstCause, t.sinkId, + -1/*partitionId*/, serviceId, getCause()/*firstCauseIfError*/, t.sinkId, 0/*t.sinkMessagesOut.get()*/, t.altSinkId, 0/*t.altSinkMessagesOut.get()*/, t.context.getStats()); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/CancelQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/CancelQuery.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/CancelQuery.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -34,7 +34,7 @@ public void run() { if (cause == null) - q.halt(); + q.halt((Void)null); else q.halt(cause); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -27,6 +27,7 @@ package com.bigdata.bop.join; +import java.nio.channels.ClosedByInterruptException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -77,6 +78,7 @@ import com.bigdata.service.DataService; import com.bigdata.striterator.IChunkedOrderedIterator; import com.bigdata.striterator.IKeyOrder; +import com.bigdata.util.InnerCause; import com.bigdata.util.concurrent.Haltable; import com.bigdata.util.concurrent.LatchedExecutor; @@ -743,9 +745,8 @@ * This is used for processing errors and also if this task is * interrupted (because the sink has been closed). */ - halt(t); - + // reset the unsync buffers. try { // resetUnsyncBuffers(); @@ -1056,11 +1057,9 @@ return null; } catch (Throwable t) { + + throw new RuntimeException(halt(t)); - halt(t); - - throw new RuntimeException(t); - } } @@ -1692,10 +1691,8 @@ } catch (Throwable t) { - halt(t); + throw new RuntimeException(halt(t)); - throw new RuntimeException(t); - } finally { itr.close(); @@ -1900,10 +1897,8 @@ } catch (Throwable t) { - halt(t); + throw new RuntimeException(halt(t)); - throw new RuntimeException(t); - } finally { itr.close(); @@ -2092,10 +2087,8 @@ } catch (Throwable t) { - halt(t); + throw new RuntimeException(halt(t)); - throw new RuntimeException(t); - } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -336,7 +336,7 @@ if (log.isInfoEnabled()) log.info("Slice will interrupt query."); - context.getRunningQuery().halt(); + context.getRunningQuery().halt((Void) null); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -58,13 +58,15 @@ * <p> * This class embeds certain knowledge about which exceptions may be observed * during normal termination of asynchronous processes using I/O, thread pools, - * and {@link IBlockingBuffer}s. + * and {@link IBlockingBuffer}s. See + * {@link #isNormalTerminationCause(Throwable)} for a list of the + * {@link Throwable} causes which are treated as normal termination. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id: AbstractHaltableProcess.java 2265 2009-10-26 12:51:06Z * thompsonbry $ */ -public class Haltable<V> implements Future<V> { +public class Haltable<V> implements IHaltable<V> { private final transient static Logger log = Logger .getLogger(Haltable.class); @@ -140,25 +142,26 @@ * the cause out of their own context.</strong> * * @param cause - * The cause. + * The cause (required). * * @return The argument. - * - * @throws IllegalArgumentException - * if the cause is <code>null</code>. */ final public <T extends Throwable> T halt(final T cause) { final boolean didHalt; lock.lock(); try { if (didHalt = !halt) { + /* + * This is the first cause. + */ + // note the first cause (and handle an illegal null if found). firstCause = (cause != null ? cause : new IllegalArgumentException()); + // note if abnormal termination (firstCause only) + error = !isNormalTerminationCause(firstCause); try { // signal *all* listeners. halted.signalAll(); - // note if abnormal termination (firstCause only) - error = !isNormalTerminationCause(cause); } finally { halt = true; // volatile write. } @@ -178,18 +181,20 @@ return cause; } - /** - * Return unless processing has been halted. The method should be invoked - * from within the execution of the process itself so that it may notice - * asynchronous termination. It will throw out the wrapper first cause if - * the process is halted. External processes waiting on the {@link Future} - * interface should use {@link #isDone()} which does not have the semantics - * of asserting that the process should still be running. - * - * @throws RuntimeException - * wrapping the {@link #firstCause} iff processing has been - * halted. - */ + /** + * Return unless processing has been halted. The method should be invoked + * from within the execution of the process itself so that it may notice + * asynchronous termination. It will throw out the wrapped first cause if + * the process is halted. + * <p> + * Note: External processes waiting on the {@link Future} interface should + * use {@link #isDone()} which does not have the semantics of asserting that + * the process should still be running. + * + * @throws RuntimeException + * wrapping the {@link #firstCause} iff processing has been + * halted. + */ final public void halted() { if (halt) { @@ -313,24 +318,25 @@ } - /** - * Return the first {@link Throwable} which caused this process to halt, but - * only for abnormal termination. - * - * @return The first {@link Throwable} which caused this process to halt and - * <code>null</code> if the process has not halted or if it halted - * through normal termination. - */ - final public Throwable getCause() { + final public Throwable getCause() { - if (!halt) - return null; + lock.lock(); + try { + + if (!halt) + return null; - if (!error) - return null; + if (!error) + return null; - return firstCause; + return firstCause; + } finally { + + lock.unlock(); + + } + } /** @@ -344,50 +350,51 @@ } - /** - * Return <code>true</code> if the {@link Throwable} is a known normal - * termination cause for the process. The method inspects the stack trace, - * examining both the outer and {@link InnerCause}s. The following causes - * are interpreted as normal termination: - * <dl> - * <dt>{@link InterruptedException}</dt> - * <dd>The process was terminated by an interrupt. Interrupts are typically - * used to terminate asynchronous processes when their production limit has - * been satisfied or the consumer otherwise chooses to - * {@link IAsynchronousIterator#close()} the iterator through which they are - * consuming results from the process.</dd> - * <dt>{@link CancellationException}</dt> - * <dd>A process has been canceled using its {@link Future}.</dd> - * <dt>{@link ClosedByInterruptException}</dt> - * <dd>A process was interrupted during an IO operation.</dd> - * <dt>{@link RejectedExecutionException}</dt> - * <dd>A process was not executed because the pool against which it was - * submitted had been shutdown (this of course implies that the work queue - * was unbounded).</dd> - * <dt>{@link BufferClosedException}</dt> - * <dd>The {@link IBlockingBuffer} on which the process was writing was - * asynchronously closed.</dd> - * </dl> - * - * @param cause - * The {@link Throwable}. - * - * @return <code>true</code> if the {@link Throwable} indicates normal - * termination. - */ + /** + * Return <code>true</code> if the {@link Throwable} is a known normal + * termination cause for the process. The method inspects the stack trace, + * examining both the outer and {@link InnerCause}s. The following causes + * are interpreted as normal termination: + * <dl> + * <dt>{@link InterruptedException}</dt> + * <dd>The process was terminated by an interrupt. Interrupts are typically + * used to terminate asynchronous processes when their production limit has + * been satisfied or the consumer otherwise chooses to + * {@link IAsynchronousIterator#close()} the iterator through which they are + * consuming results from the process.</dd> + * <dt>{@link CancellationException}</dt> + * <dd>A process has been canceled using its {@link Future}.</dd> + * <dt>{@link ClosedByInterruptException}</dt> + * <dd>A process was interrupted during an IO operation.</dd> + * <dt>{@link BufferClosedException}</dt> + * <dd>The {@link IBlockingBuffer} on which the process was writing was + * asynchronously closed.</dd> + * <dt>{@link RejectedExecutionException}</dt> + * <dd>A process was not executed because the pool against which it was + * submitted had been shutdown (this of course implies that the work queue + * was unbounded as a bounded pool will throw this exception if the work + * queue is full).</dd> + * </dl> + * + * @param cause + * The {@link Throwable}. + * + * @return <code>true</code> if the {@link Throwable} indicates normal + * termination. + * + * @see #getCause() + */ protected boolean isNormalTerminationCause(final Throwable cause) { -// if (InnerCause.isInnerCause(cause, CancelledException.class)) -// return true; if (InnerCause.isInnerCause(cause, InterruptedException.class)) return true; if (InnerCause.isInnerCause(cause, CancellationException.class)) return true; if (InnerCause.isInnerCause(cause, ClosedByInterruptException.class)) return true; + if (InnerCause.isInnerCause(cause, BufferClosedException.class)) + return true; if (InnerCause.isInnerCause(cause, RejectedExecutionException.class)) return true; - if (InnerCause.isInnerCause(cause, BufferClosedException.class)) - return true; return false; } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/IHaltable.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/IHaltable.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/IHaltable.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -0,0 +1,46 @@ +package com.bigdata.util.concurrent; + +import java.util.concurrent.Future; + +/** + * Interface extends {@link Future} and provides an interface for managing the + * termination of a process from within that process. + * + * @param <V> + * The generic type of the computation to which the {@link Future} + * evaluates. + */ +public interface IHaltable<V> extends Future<V> { + + /** + * Halt (normal termination). + */ + void halt(V v); + + /** + * Halt (exception thrown). <strong>The caller is responsible for throwing + * their given <i>cause</i> out of their own context.</strong> As a + * convenience, this method returns the given <i>cause</>. + * + * @param cause + * The cause (required). + * + * @return The argument. + */ + <T extends Throwable> T halt(T cause); + + /** + * Return the first {@link Throwable} which caused this process to halt, but + * only for abnormal termination. + * <p> + * {@link IHaltable} considers exceptions triggered by an interrupt to be + * normal termination of the process and will return <code>null</code> for + * such exceptions. + * + * @return The first {@link Throwable} which caused this process to halt and + * <code>null</code> if the process has not halted or if it halted + * through normal termination. + */ + Throwable getCause(); + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -80,17 +80,16 @@ return indexManager; } - /** - * NOP (you have to test things like slices with a full integration). - */ - public void halt() { - log.warn("Mock object does not implement halt()"); - } + @Override + public void halt(Void v) { + log.warn("Mock object does not implement halt(Void)"); + } - public Throwable halt(Throwable t) { + @Override + public <T extends Throwable> T halt(T cause) { log.warn("Mock object does not implement halt(Throwable)"); - return t; - } + return cause; + } public QueryEngine getQueryEngine() { throw new UnsupportedOperationException(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -814,7 +814,7 @@ * Overridden to close the sink so the slice will terminate. */ @Override - public void halt() { + public void halt(Void v) { sink.close(); } Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsLiteral.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsLiteral.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsLiteral.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -55,7 +55,7 @@ /** * Required deep copy constructor. */ - public IsLiteral(final IsInline op) { + public IsLiteral(final IsLiteral op) { super(op); } Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/MathBOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/MathBOp.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/MathBOp.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -23,12 +23,13 @@ */ package com.bigdata.rdf.internal.constraints; +import java.util.Map; + import org.openrdf.query.algebra.MathExpr.MathOp; import com.bigdata.bop.BOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IValueExpression; -import com.bigdata.bop.IVariable; import com.bigdata.bop.ImmutableBOp; import com.bigdata.bop.NV; import com.bigdata.rdf.internal.IV; @@ -71,6 +72,27 @@ } + /** + * Required shallow copy constructor. + * + * @param args + * The operands. + * @param op + * The operation. + */ + public MathBOp(final BOp[] args, Map<String,Object> anns) { + + super(args,anns); + + if (args.length != 2 || args[0] == null || args[1] == null + || getProperty(Annotations.OP) == null) { + + throw new IllegalArgumentException(); + + } + + } + /** * * @param left @@ -84,11 +106,8 @@ public MathBOp(final IValueExpression<IV> left, final IValueExpression<IV> right, final MathOp op) { - super(new BOp[] { left, right }, NV.asMap(new NV(Annotations.OP, op))); + this(new BOp[] { left, right }, NV.asMap(new NV(Annotations.OP, op))); - if (left == null || right == null || op == null) - throw new IllegalArgumentException(); - } // /** Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -24,13 +24,11 @@ import org.openrdf.query.algebra.And; import org.openrdf.query.algebra.Bound; import org.openrdf.query.algebra.Compare; -import org.openrdf.query.algebra.Compare.CompareOp; import org.openrdf.query.algebra.Filter; import org.openrdf.query.algebra.Group; import org.openrdf.query.algebra.Join; import org.openrdf.query.algebra.LeftJoin; import org.openrdf.query.algebra.MathExpr; -import org.openrdf.query.algebra.MathExpr.MathOp; import org.openrdf.query.algebra.MultiProjection; import org.openrdf.query.algebra.Not; import org.openrdf.query.algebra.Or; @@ -43,17 +41,18 @@ import org.openrdf.query.algebra.Regex; import org.openrdf.query.algebra.SameTerm; import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.algebra.StatementPattern.Scope; import org.openrdf.query.algebra.TupleExpr; import org.openrdf.query.algebra.UnaryTupleOperator; import org.openrdf.query.algebra.Union; import org.openrdf.query.algebra.ValueConstant; import org.openrdf.query.algebra.ValueExpr; import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.Compare.CompareOp; +import org.openrdf.query.algebra.MathExpr.MathOp; +import org.openrdf.query.algebra.StatementPattern.Scope; import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl; import org.openrdf.query.algebra.evaluation.iterator.FilterIterator; import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; -import org.openrdf.query.parser.serql.AnonymousVarGenerator; import com.bigdata.bop.BOp; import com.bigdata.bop.BOpUtility; @@ -62,12 +61,12 @@ import com.bigdata.bop.IConstant; import com.bigdata.bop.IConstraint; import com.bigdata.bop.IPredicate; -import com.bigdata.bop.IPredicate.Annotations; import com.bigdata.bop.IValueExpression; import com.bigdata.bop.IVariable; import com.bigdata.bop.IVariableOrConstant; import com.bigdata.bop.NV; import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.IPredicate.Annotations; import com.bigdata.bop.ap.Predicate; import com.bigdata.bop.constraint.AND; import com.bigdata.bop.constraint.BOUND; @@ -91,9 +90,9 @@ import com.bigdata.rdf.sail.sop.SOp; import com.bigdata.rdf.sail.sop.SOp2BOpUtility; import com.bigdata.rdf.sail.sop.SOpTree; -import com.bigdata.rdf.sail.sop.SOpTree.SOpGroup; import com.bigdata.rdf.sail.sop.SOpTreeBuilder; import com.bigdata.rdf.sail.sop.UnsupportedOperatorException; +import com.bigdata.rdf.sail.sop.SOpTree.SOpGroup; import com.bigdata.rdf.spo.DefaultGraphSolutionExpander; import com.bigdata.rdf.spo.ExplicitSPOFilter; import com.bigdata.rdf.spo.ISPO; @@ -526,9 +525,8 @@ log.info("unrecognized value in query: " + ex.getValue()); } return new EmptyIteration<BindingSet, QueryEvaluationException>(); - } catch (QueryEvaluationException ex) { - throw ex; - } catch (Exception ex) { + } catch (Throwable ex) { +// log.error("Remove log stmt:"+ex,ex);// FIXME remove this - I am just looking for the root cause of something in the SAIL. throw new QueryEvaluationException(ex); } } @@ -716,50 +714,74 @@ // Submit query for evaluation. runningQuery = queryEngine.eval(query); - - // Iterator draining the query results. - final IAsynchronousIterator<IBindingSet[]> it1 = - runningQuery.iterator(); - - // De-chunk the IBindingSet[] visited by that iterator. - final IChunkedOrderedIterator<IBindingSet> it2 = - new ChunkedWrappedIterator<IBindingSet>( - new Dechunkerator<IBindingSet>(it1)); - // Materialize IVs as RDF Values. - CloseableIteration<BindingSet, QueryEvaluationException> result = - // Monitor IRunningQuery and cancel if Sesame iterator is closed. - new RunningQueryCloseableIteration<BindingSet, QueryEvaluationException>(runningQuery, - // Convert bigdata binding sets to Sesame binding sets. - new Bigdata2Sesame2BindingSetIterator<QueryEvaluationException>( - // Materialize IVs as RDF Values. - new BigdataBindingSetResolverator(database, it2).start( - database.getExecutorService()))); - -// No - will deadlock if buffer fills up -// // Wait for the Future (checks for errors). -// runningQuery.get(); - - // use the basic filter iterator for remaining filters - if (sesameFilters != null) { - for (Filter f : sesameFilters) { - if (log.isDebugEnabled()) { - log.debug("attaching sesame filter: " + f); - } - result = new FilterIterator(f, result, this); - } - } + /* + * Wrap up the native bigdata query solution iterator as Sesame + * compatible iteration w/ any filters to be interpreted by Sesame. + */ + return wrapQuery(runningQuery, sesameFilters); - return result; - } catch (Throwable t) { - if (runningQuery != null) + if (runningQuery != null) { + // ensure query is halted. runningQuery.cancel(true/* mayInterruptIfRunning */); + } +// log.error("Remove log stmt"+t,t);// FIXME remove this - I am just looking for the root cause of something in the SAIL. throw new QueryEvaluationException(t); } } + /** + * Wrap the {@link IRunningQuery#iterator()}, returning a Sesame compatible + * iteration which will visit the materialized binding sets. + * + * @param runningQuery + * The query. + * @param sesameFilters + * Any filters to be applied by Sesame. + * + * @return The iterator. + * + * @throws QueryEvaluationException + */ + private CloseableIteration<BindingSet, QueryEvaluationException> wrapQuery( + final IRunningQuery runningQuery, + final Collection<Filter> sesameFilters) throws QueryEvaluationException { + + // The iterator draining the query solutions. + final IAsynchronousIterator<IBindingSet[]> it1 = runningQuery + .iterator(); + + // De-chunk the IBindingSet[] visited by that iterator. + final IChunkedOrderedIterator<IBindingSet> it2 = + new ChunkedWrappedIterator<IBindingSet>( + new Dechunkerator<IBindingSet>(it1)); + + // Materialize IVs as RDF Values. + CloseableIteration<BindingSet, QueryEvaluationException> result = + // Monitor IRunningQuery and cancel if Sesame iterator is closed. + new RunningQueryCloseableIteration<BindingSet, QueryEvaluationException>(runningQuery, + // Convert bigdata binding sets to Sesame binding sets. + new Bigdata2Sesame2BindingSetIterator<QueryEvaluationException>( + // Materialize IVs as RDF Values. + new BigdataBindingSetResolverator(database, it2).start( + database.getExecutorService()))); + + // use the basic filter iterator for remaining filters + if (sesameFilters != null) { + for (Filter f : sesameFilters) { + if (log.isDebugEnabled()) { + log.debug("attaching sesame filter: " + f); + } + result = new FilterIterator(f, result, this); + } + } + + return result; + + } + // /** // * This is the method that will attempt to take a top-level join or left // * join and turn it into a native bigdata rule. The Sesame operators Join Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -3437,6 +3437,8 @@ } catch (QueryEvaluationException e) { +// log.error("Remove log stmt"+e,e);// FIXME remove this - I am just looking for the root cause of something in the SAIL. + throw new SailException(e); } Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/RunningQueryCloseableIteration.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/RunningQueryCloseableIteration.java 2011-01-16 00:59:18 UTC (rev 4107) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/RunningQueryCloseableIteration.java 2011-01-16 16:38:37 UTC (rev 4108) @@ -2,7 +2,7 @@ import info.aduna.iteration.CloseableIteration; -import java.util.concurrent.ExecutionException; +import java.util.NoSuchElementException; import org.openrdf.query.BindingSet; import org.openrdf.query.QueryEvaluationException; @@ -24,6 +24,12 @@ private final IRunningQuery runningQuery; private final CloseableIteration<E, X> src; private boolean checkedFuture = false; + /** + * The next element is buffered so we can always return it if the + * {@link #runningQuery} was not aborted at the time that {@link #hasNext()} + * return <code>true</code>. + */ + private E current = null; public RunningQueryCloseableIteration(final IRunningQuery runningQuery, final CloseableIteration<E, X> src) { @@ -39,25 +45,69 @@ } public boolean hasNext() throws X { - return src.hasNext(); - } - public E next() throws X { + if (current != null) { + // Already buffered. + return true; + } + + if (!src.hasNext()) { + // Source is exhausted. + return false; + } + + // buffer the next element. + current = src.next(); + + // test for abnormal completion of the runningQuery. if (!checkedFuture && runningQuery.isDone()) { try { runningQuery.get(); } catch (InterruptedException e) { + /* + * Interrupted while waiting on the Future (should not happen + * since the Future is already done). + */ throw (X) new QueryEvaluationException(e); - } catch (ExecutionException e) { - throw (X) new QueryEvaluationException(e); + } catch (Throwable e) { + /* + * Exception thrown by the runningQuery. + */ + if (runningQuery.getCause() != null) { + // abnormal termination. + throw (X) new QueryEvaluationException(runningQuery.getCause()); + } + // otherwise this is normal termination. } checkedFuture = true; } - return src.next(); + + // the next element is now buffered. + return true; + } + public E next() throws X { + + if (!hasNext()) + throw new NoSuchElementException(); + + final E tmp = current; + + current = null; + + return tmp; + + } + + /** + * Operation is not supported. + */ public void remove() throws X { - src.remove(); + + // Not supported since we are buffering ahead. + throw new UnsupportedOperationException(); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |