From: <tho...@us...> - 2010-09-23 20:11:07
|
Revision: 3617 http://bigdata.svn.sourceforge.net/bigdata/?rev=3617&view=rev Author: thompsonbry Date: 2010-09-23 20:10:58 +0000 (Thu, 23 Sep 2010) Log Message: ----------- Finally chased down one bug which I had introduced in QueryResultIterator. I've added a bunch of unit tests and the PipelineType annotation, which will eventually support both vectored and operator at a time evaluation. I am still chasing the bug with multiple chunk messages flowing through the query controller. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/DataSetJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/PipelineDelayOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestPipelineUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/CancelQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestRunState.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -167,7 +167,7 @@ * override this method. */ BOpEvaluationContext getEvaluationContext(); - + /** * Return <code>true</code> iff this operator is an access path which writes * on the database. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -344,7 +344,8 @@ public String toString() { final StringBuilder sb = new StringBuilder(); - sb.append(getClass().getName()); +// sb.append(getClass().getName()); + sb.append(super.toString()); sb.append("("); for (int i = 0; i < args.length; i++) { final BOp t = args[i]; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -27,31 +27,18 @@ */ package com.bigdata.bop; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; - import org.apache.log4j.Logger; import com.bigdata.bop.engine.BOpStats; import 
com.bigdata.bop.engine.IChunkMessage; import com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.RunningQuery; -import com.bigdata.btree.IIndex; import com.bigdata.btree.ILocalBTreeView; -import com.bigdata.btree.IRangeQuery; import com.bigdata.journal.IIndexManager; -import com.bigdata.journal.TimestampUtility; -import com.bigdata.relation.IRelation; -import com.bigdata.relation.accesspath.AccessPath; import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; -import com.bigdata.relation.locator.IResourceLocator; -import com.bigdata.relation.rule.IRule; -import com.bigdata.relation.rule.eval.IJoinNexus; -import com.bigdata.service.DataService; import com.bigdata.service.IBigdataFederation; -import com.bigdata.striterator.IKeyOrder; import com.ibm.icu.impl.ByteBuffer; /** @@ -62,18 +49,10 @@ */ public class BOpContext<E> extends BOpContextBase { - static private final Logger log = Logger.getLogger(BOpContext.class); + static private final transient Logger log = Logger.getLogger(BOpContext.class); private final IRunningQuery runningQuery; -// private final IBigdataFederation<?> fed; -// -// private final IIndexManager indexManager; -// -// private final long readTimestamp; -// -// private final long writeTimestamp; - private final int partitionId; private final BOpStats stats; @@ -95,60 +74,8 @@ public IRunningQuery getRunningQuery() { return runningQuery; } - + /** - * The {@link IBigdataFederation} IFF the operator is being evaluated on an - * {@link IBigdataFederation}. When evaluating operations against an - * {@link IBigdataFederation}, this reference provides access to the - * scale-out view of the indices and to other bigdata services. - */ - @Override - public IBigdataFederation<?> getFederation() { - return runningQuery.getFederation(); - } - - /** - * The <strong>local</strong> {@link IIndexManager}. 
Query evaluation occurs - * against the local indices. In scale-out, query evaluation proceeds shard - * wise and this {@link IIndexManager} MUST be able to read on the - * {@link ILocalBTreeView}. - */ - @Override - public IIndexManager getIndexManager() { - return runningQuery.getIndexManager(); - } - - /** - * Return the {@link Executor} on to which the operator may submit tasks. - * <p> - * Note: The is the {@link ExecutorService} associated with the - * <em>local</em> {@link #getIndexManager() index manager}. - */ - public final Executor getExecutorService() { - return runningQuery.getIndexManager().getExecutorService(); - } - -// /** -// * The timestamp or transaction identifier against which the query is -// * reading. -// * -// * @deprecated by {@link BOp.Annotations#TIMESTAMP} -// */ -// public final long getReadTimestamp() { -// return runningQuery.getReadTimestamp(); -// } -// -// /** -// * The timestamp or transaction identifier against which the query is -// * writing. -// * -// * @deprecated by {@link BOp.Annotations#TIMESTAMP} -// */ -// public final long getWriteTimestamp() { -// return runningQuery.getWriteTimestamp(); -// } - - /** * The index partition identifier -or- <code>-1</code> if the index is not * sharded. 
*/ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -27,7 +27,11 @@ */ package com.bigdata.bop; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + import org.apache.log4j.Logger; + import com.bigdata.bop.engine.QueryEngine; import com.bigdata.btree.IIndex; import com.bigdata.btree.ILocalBTreeView; @@ -53,7 +57,7 @@ */ public class BOpContextBase { - static private final Logger log = Logger.getLogger(BOpContextBase.class); + static private final transient Logger log = Logger.getLogger(BOpContextBase.class); private final QueryEngine queryEngine; @@ -63,7 +67,7 @@ * wise and this {@link IIndexManager} MUST be able to read on the * {@link ILocalBTreeView}. */ - public IIndexManager getIndexManager() { + final public IIndexManager getIndexManager() { return queryEngine.getIndexManager(); } @@ -73,11 +77,21 @@ * {@link IBigdataFederation}, this reference provides access to the * scale-out view of the indices and to other bigdata services. */ - public IBigdataFederation<?> getFederation() { + final public IBigdataFederation<?> getFederation() { return queryEngine.getFederation(); } /** + * Return the {@link Executor} on to which the operator may submit tasks. + * <p> + * Note: The is the {@link ExecutorService} associated with the + * <em>local</em> {@link #getIndexManager() index manager}. + */ + public final Executor getExecutorService() { + return getIndexManager().getExecutorService(); + } + + /** * * @param indexManager * The <strong>local</strong> {@link IIndexManager}. 
Query Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -27,6 +27,7 @@ package com.bigdata.bop; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -374,7 +375,7 @@ * @param op * A {@link BOp}. * - * @return The index. + * @return The index, which is immutable and thread-safe. * * @throws DuplicateBOpIdException * if there are two or more {@link BOp}s having the same @@ -412,7 +413,8 @@ throw new DuplicateBOpException(t.toString()); } } - return map; + // wrap to ensure immutable and thread-safe. + return Collections.unmodifiableMap(map); } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -209,6 +209,16 @@ protected static transient final TimeUnit chunkTimeoutUnit = TimeUnit.MILLISECONDS; /** + * Return the {@link PipelineType} of the operator (default + * {@link PipelineType#Vectored}). + */ + public PipelineType getPipelineType() { + + return PipelineType.Vectored; + + } + + /** * Return <code>true</code> iff {@link #newStats()} must be shared across * all invocations of {@link #eval(BOpContext)} for this operator for a * given query (default <code>false</code>). 
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -0,0 +1,68 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 21, 2010 + */ + +package com.bigdata.bop; + +/** + * Return the type of pipelining supported by an operator. + * <p> + * Note: bigdata does not support tuple-at-a-time processing. Only vectored and + * operator-at-a-time processing. Tuple at a time processing is generally very + * inefficient. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public enum PipelineType { + + /** + * Vectored operators stream chunks of intermediate results from one + * operator to the next using producer / consumer pattern. Each time a set + * of intermediate results is available for a vectored operator, it is + * evaluated against those inputs producing another set of intermediate + * results for its target operator(s). 
Vectored operators may be evaluated + * many times during a given query and often have excellent parallelism due + * to the concurrent evaluation of the different operators on different sets + * of intermediate results. + */ + Vectored, + + /** + * The operator will run exactly once and must wait for all of its inputs to + * be assembled before it runs. + * <p> + * There are some operations for which this is always true, such as SORT. + * Other operations MAY use operator-at-once evaluation in order to benefit + * from a combination of more efficient IO patterns and simpler design. + * However, pipelined operators using large memory blocks have many of the + * benefits of operator-at-once evaluation. By deferring their evaluation + * until some minimum number of source data blocks are available, they may + * be evaluated once or more than once, depending on the data scale. + */ + OneShot; + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -30,6 +30,8 @@ import java.util.Map; +import cern.colt.Arrays; + import com.bigdata.bop.AbstractChunkedOrderedIteratorOp; import com.bigdata.bop.BOp; import com.bigdata.bop.Constant; @@ -414,7 +416,15 @@ for (Map.Entry<String, Object> e : annotations.entrySet()) { if (!first) sb.append(", "); - sb.append(e.getKey() + "=" + e.getValue()); + // @todo remove relation name hack when making relation name a scalar. 
+ if (Annotations.RELATION_NAME.equals(e.getKey()) + && e.getValue() != null + && e.getValue().getClass().isArray()) { + sb.append(e.getKey() + "=" + + Arrays.toString((String[]) e.getValue())); + } else { + sb.append(e.getKey() + "=" + e.getValue()); + } first = false; } sb.append("]"); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -35,6 +35,7 @@ import com.bigdata.bop.BOpContext; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstraint; import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.IChunkAccessor; import com.bigdata.relation.accesspath.IAsynchronousIterator; @@ -56,6 +57,16 @@ */ private static final long serialVersionUID = 1L; + public interface Annotations extends BindingSetPipelineOp.Annotations { + + /** + * An optional {@link IConstraint}[] which places restrictions on the + * legal patterns in the variable bindings. + */ + String CONSTRAINTS = CopyBindingSetOp.class.getName() + ".constraints"; + + } + /** * Deep copy constructor. 
* @@ -75,10 +86,19 @@ super(args, annotations); } + /** + * @see Annotations#CONSTRAINTS + */ + public IConstraint[] constraints() { + + return getProperty(Annotations.CONSTRAINTS, null/* defaultValue */); + + } + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - return new FutureTask<Void>(new CopyTask(context)); - + return new FutureTask<Void>(new CopyTask(this, context)); + } /** @@ -90,11 +110,19 @@ static private class CopyTask implements Callable<Void> { private final BOpContext<IBindingSet> context; - - CopyTask(final BOpContext<IBindingSet> context) { - + + /** + * The constraint (if any) specified for the join operator. + */ + final private IConstraint[] constraints; + + CopyTask(final CopyBindingSetOp op, + final BOpContext<IBindingSet> context) { + this.context = context; - + + this.constraints = op.constraints(); + } public Void call() throws Exception { @@ -108,9 +136,10 @@ final IBindingSet[] chunk = source.next(); stats.chunksIn.increment(); stats.unitsIn.add(chunk.length); - sink.add(chunk); + final IBindingSet[] tmp = applyConstraints(chunk); + sink.add(tmp); if (sink2 != null) - sink2.add(chunk); + sink2.add(tmp); } sink.flush(); if (sink2 != null) @@ -124,6 +153,56 @@ } } - } + private IBindingSet[] applyConstraints(final IBindingSet[] chunk) { + + if (constraints == null) { + /* + * No constraints, copy all binding sets. + */ + + return chunk; + + } + + /* + * Copy binding sets which satisfy the constraint(s). + */ + + IBindingSet[] t = new IBindingSet[chunk.length]; + + int j = 0; + + for (int i = 0; i < chunk.length; i++) { + + final IBindingSet bindingSet = chunk[i]; + + if (context.isConsistent(constraints, bindingSet)) { + + t[j++] = bindingSet; + + } + + } + + if (j != chunk.length) { + + // allocate exact size array. + final IBindingSet[] tmp = (IBindingSet[]) java.lang.reflect.Array + .newInstance(chunk[0].getClass(), j); + + // make a dense copy. 
+ System.arraycopy(t/* src */, 0/* srcPos */, tmp/* dst */, + 0/* dstPos */, j/* len */); + + t = tmp; + + } + + return t; + + } + + } // class CopyTask + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -54,7 +54,7 @@ * be mapped across shards or nodes as appropriate for the parent. UNION runs on * the query controller. In order to avoid routing intermediate results through * the controller, the {@link BindingSetPipelineOp.Annotations#SINK_REF} of each - * child operand should be overriden to specify the parent of the UNION + * child operand should be overridden to specify the parent of the UNION * operator. * <p> * UNION can not be used when the intermediate results must be routed into the Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -67,21 +67,26 @@ * #of chunks in. */ final public CAT chunksIn = new CAT(); +// final public AtomicLong chunksIn = new AtomicLong(); /** * #of units sets in (tuples, elements, binding sets, etc). */ final public CAT unitsIn = new CAT(); +// final public AtomicLong unitsIn = new AtomicLong(); /** * #of chunks out. */ final public CAT chunksOut = new CAT(); +// final public AtomicLong chunksOut = new AtomicLong(); + /** * #of units sets in (tuples, elements, binding sets, etc). 
*/ final public CAT unitsOut = new CAT(); +// final public AtomicLong unitsOut = new AtomicLong(); /** * Constructor. @@ -105,15 +110,20 @@ unitsIn.add(o.unitsIn.get()); unitsOut.add(o.unitsOut.get()); chunksOut.add(o.chunksOut.get()); +// chunksIn.addAndGet(o.chunksIn.get()); +// unitsIn.addAndGet(o.unitsIn.get()); +// unitsOut.addAndGet(o.unitsOut.get()); +// chunksOut.addAndGet(o.chunksOut.get()); } + public String toString() { final StringBuilder sb = new StringBuilder(); sb.append(getClass().getName()); - sb.append("{chunksIn=" + chunksIn.estimate_get()); - sb.append(",unitsIn=" + unitsIn.estimate_get()); - sb.append(",chunksOut=" + chunksOut.estimate_get()); - sb.append(",unitsOut=" + unitsOut.estimate_get()); + sb.append("{chunksIn=" + chunksIn.get()); + sb.append(",unitsIn=" + unitsIn.get()); + sb.append(",chunksOut=" + chunksOut.get()); + sb.append(",unitsOut=" + unitsOut.get()); toString(sb); // extension hook sb.append("}"); return sb.toString(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -3,6 +3,8 @@ import java.io.Serializable; import java.util.UUID; +import com.bigdata.bop.BOp; + /** * A message sent to the {@link IQueryClient} when an operator is done executing * for some chunk of inputs. @@ -53,7 +55,7 @@ * scale-out, this is one per index partition over which the intermediate * results were mapped. */ - final public int sinkChunksOut; + final public int sinkMessagesOut; /** * The operator identifier for the alternative sink -or- <code>null</code> @@ -71,7 +73,7 @@ * results were mapped. It is zero if there was no alternative sink for the * operator. 
*/ - final public int altSinkChunksOut; + final public int altSinkMessagesOut; /** * The statistics for the execution of the bop against the partition on the @@ -91,10 +93,19 @@ * The node which executed the operator. * @param cause * <code>null</code> unless execution halted abnormally. - * @param chunksOut - * A map reporting the #of binding set chunks which were output - * for each downstream operator for which at least one chunk of - * output was produced. + * @param sinkId + * The {@link BOp.Annotations#BOP_ID} of the default sink and + * <code>null</code> if there is no sink (for example, if this is + * the last operator in the pipeline). + * @param sinkMessagesOut + * The number of {@link IChunkMessage} which were sent to the + * operator for the default sink. + * @param altSinkId + * The {@link BOp.Annotations#BOP_ID} of the alternative sink and + * <code>null</code> if there is no alternative sink. + * @param altSinkMessagesOut + * The number of {@link IChunkMessage} which were sent to the + * operator for the alternative sink. * @param taskStats * The statistics for the execution of that bop on that shard and * service. 
@@ -103,8 +114,8 @@ // final UUID queryId, final int bopId, final int partitionId, final UUID serviceId, Throwable cause, // - final Integer sinkId, final int sinkChunksOut,// - final Integer altSinkId, final int altSinkChunksOut,// + final Integer sinkId, final int sinkMessagesOut,// + final Integer altSinkId, final int altSinkMessagesOut,// final BOpStats taskStats) { this.queryId = queryId; @@ -113,9 +124,9 @@ this.serviceId = serviceId; this.cause = cause; this.sinkId = sinkId; - this.sinkChunksOut = sinkChunksOut; + this.sinkMessagesOut = sinkMessagesOut; this.altSinkId = altSinkId; - this.altSinkChunksOut = altSinkChunksOut; + this.altSinkMessagesOut = altSinkMessagesOut; this.taskStats = taskStats; } @@ -128,9 +139,9 @@ if (cause != null) sb.append(",cause=" + cause); sb.append(",sinkId=" + sinkId); - sb.append(",sinkChunksOut=" + sinkChunksOut); + sb.append(",sinkChunksOut=" + sinkMessagesOut); sb.append(",altSinkId=" + altSinkId); - sb.append(",altSinkChunksOut=" + altSinkChunksOut); + sb.append(",altSinkChunksOut=" + altSinkMessagesOut); sb.append(",stats=" + taskStats); sb.append("}"); return sb.toString(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -49,4 +49,18 @@ */ void bufferReady(IChunkMessage<IBindingSet> msg) throws RemoteException; + /** + * Notify a service that the query has been terminated. The peer MUST NOT + * cancel the query synchronously as that can lead to a deadlock with the + * query controller. Instead, the peer should queue a task to cancel the + * query and then return. + * + * @param queryId + * The query identifier. + * @param cause + * The cause. 
When <code>null</code>, this is presumed to be + * normal query termination. + */ + void cancelQuery(UUID queryId, Throwable cause) throws RemoteException; + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -56,7 +56,14 @@ IIndexManager getIndexManager(); /** - * Terminate query evaluation + * Cancel the running query (normal termination). + * <p> + * Note: This method provides a means for an operator to indicate that the + * query should halt immediately for reasons other than abnormal + * termination. + * <p> + * Note: For abnormal termination of a query, just throw an exception out of + * the query operator implementation. */ void halt(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -48,18 +48,16 @@ private static final Logger log = Logger.getLogger(PipelineUtility.class); /** - * Return <code>true</code> iff the <i>runningCountMap</i> AND - * <i>availableChunkMap</i> map are ZERO (0) for both the given operator and - * for all operators which proceed the given operator in the tree structure - * of its operands. 
+ * Return <code>true</code> iff <i>availableChunkMap</i> map is ZERO (0) for + * the given operator and its descendants AND the <i>runningCountMap</i> is + * ZERO (0) for the operator and all descendants of the operator. For the + * purposes of this method, only {@link BOp#args() operands} are considered + * as descendants. * <p> - * Note: The movement of the intermediate binding set chunks forms an - * acyclic directed graph. We can decide whether or not a {@link BOp} in the - * query plan can be triggered by the current activity pattern by inspecting - * the {@link BOp} and its operands recursively. If neither the {@link BOp} - * nor any of its operands (recursively) has non-zero activity then the - * {@link BOp} can not be triggered and this method will return - * <code>true</code>. + * Note: The movement of the intermediate binding set chunks during query + * processing forms an acyclic directed graph. We can decide whether or not + * a {@link BOp} in the query plan can be triggered by the current activity + * pattern by inspecting the {@link BOp} and its operands recursively. * * @param bopId * The identifier for an operator which appears in the query @@ -92,8 +90,10 @@ if (queryPlan == null) throw new IllegalArgumentException(); + if (queryIndex == null) throw new IllegalArgumentException(); + if (availableChunkCountMap == null) throw new IllegalArgumentException(); @@ -103,7 +103,7 @@ throw new NoSuchBOpException(bopId); final Iterator<BOp> itr = BOpUtility.preOrderIterator(op); - + while (itr.hasNext()) { final BOp t = itr.next(); @@ -112,8 +112,17 @@ if (id == null) continue; + { + /* + * If the operator is running then it is, defacto, "not done." + * + * If any descendants of the operator are running, then they + * could cause the operator to be re-triggered and it is "not + * done." 
+ */ + final AtomicLong runningCount = runningCountMap.get(id); if (runningCount != null && runningCount.get() != 0) { @@ -125,11 +134,16 @@ return false; } - + } { - + + /* + * Any chunks available for the operator in question or any of + * its descendants could cause that operator to be triggered. + */ + final AtomicLong availableChunkCount = availableChunkCountMap .get(id); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -41,31 +41,16 @@ import org.apache.log4j.Logger; -import alice.tuprolog.Prolog; - import com.bigdata.bop.BOp; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.IPredicate; -import com.bigdata.bop.bset.Union; import com.bigdata.btree.BTree; import com.bigdata.btree.IndexSegment; import com.bigdata.btree.view.FusedView; import com.bigdata.journal.IIndexManager; -import com.bigdata.rdf.internal.IV; -import com.bigdata.rdf.spo.SPORelation; -import com.bigdata.relation.IMutableRelation; -import com.bigdata.relation.IRelation; -import com.bigdata.relation.accesspath.IElementFilter; -import com.bigdata.relation.rule.IRule; -import com.bigdata.relation.rule.Program; -import com.bigdata.relation.rule.eval.pipeline.DistributedJoinTask; import com.bigdata.resources.IndexManager; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.IDataService; -import com.bigdata.service.ndx.IAsynchronousWriteBufferFactory; -import com.bigdata.striterator.ChunkedArrayIterator; -import com.bigdata.striterator.IChunkedOrderedIterator; /** * A class managing execution of concurrent queries against a local @@ -185,132 +170,6 @@ * @author <a 
href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * - * - * FIXME Unit tests for non-distinct {@link IElementFilter}s on an - * {@link IPredicate}, unit tests for distinct element filter on an - * {@link IPredicate} which is capable of distributed operations. Do not use - * distinct where not required (SPOC, only one graph, etc). - * <p> - * It seems like the right way to approach this is by unifying the stackable CTC - * striterator pattern with the chunked iterator pattern and passing the query - * engine (or the bop context) into the iterator construction process (or simply - * requesting that the query engine construct the iterator stack). - * <p> - * In terms of harmonization, it is difficult to say which way would work - * better. In the short term we could simply allow both and mask the differences - * in how we construct the filters, but the conversion to/from striterators and - * chunked iterators seems to waste a bit of effort. - * <p> - * The trickiest part of all of this is to allow a distributed filter pattern - * where the filter gets created on a set of nodes identified by the operator - * and the elements move among those nodes using the query engine's buffers. - * <p> - * To actually implement the distributed distinct filter we need to stack the - * following: - * - * <pre> - * - ITupleIterator - * - Resolve ITuple to Element (e.g., SPOC). - * - Layer on optional IElementFilter associated with the IPredicate. - * - Layer on SameVariableConstraint iff required (done by AccessPath) - * - Resolve SPO to SPO, stripping off the context position. - * - Chunk SPOs (SPO[], IKeyOrder), where the key order is from the access path. - * - Filter SPO[] using DHT constructed on specified nodes of the cluster. - * The SPO[] chunks should be packaged into NIO buffers and shipped to those - * nodes. The results should be shipped back as a bit vectors packaged into - * a NIO buffers. 
- * - Dechunk SPO[] to SPO since that is the current expectation for the filter - * stack. - * - The result then gets wrapped as a {@link IChunkedOrderedIterator} by - * the AccessPath using a {@link ChunkedArrayIterator}. - * </pre> - * - * This stack is a bit complex(!). But it is certainly easy enough to generate - * the necessary bits programmatically. - * - * FIXME Handling the {@link Union} of binding sets. Consider whether the chunk - * combiner logic from the {@link DistributedJoinTask} could be reused. - * - * FIXME INSERT and DELETE which will construct elements using - * {@link IRelation#newElement(java.util.List, IBindingSet)} from a binding set - * and then use {@link IMutableRelation#insert(IChunkedOrderedIterator)} and - * {@link IMutableRelation#delete(IChunkedOrderedIterator)}. For s/o, we first - * need to move the bits into the right places so it makes sense to unpack the - * processing of the loop over the elements and move the data around, writing on - * each index as necessary. There could be eventually consistent approaches to - * this as well. For justifications we need to update some additional indices, - * in which case we are stuck going through {@link IRelation} rather than - * routing data directly or using the {@link IAsynchronousWriteBufferFactory}. - * For example, we could handle routing and writing in s/o as follows: - * - * <pre> - * INSERT(relation,bindingSets) - * - * expands to - * - * SEQUENCE( - * SELECT(s,p,o), // drop bindings that we do not need - * PARALLEL( - * INSERT_INDEX(spo), // construct (s,p,o) elements and insert - * INSERT_INDEX(pos), // construct (p,o,s) elements and insert - * INSERT_INDEX(osp), // construct (o,s,p) elements and insert - * )) - * - * </pre> - * - * The output of the SELECT operator would be automatically mapped against the - * shards on which the next operators need to write. Since there is a nested - * PARALLEL operator, the mapping will be against the shards of each of the - * given indices. 
(A simpler operator would invoke - * {@link SPORelation#insert(IChunkedOrderedIterator)}. Handling justifications - * requires that we also formulate the justification chain from the pattern of - * variable bindings in the rule). - * - * FIXME Handle {@link Program}s. There are three flavors, which should probably - * be broken into three operators: sequence(ops), set(ops), and closure(op). The - * 'set' version would be parallelized, or at least have an annotation for - * parallel evaluation. These things belong in the same broad category as the - * join graph since they are operators which control the evaluation of other - * operators (the current pipeline join also has that characteristic which it - * uses to do the nested index subqueries). - * - * FIXME SPARQL to BOP translation - * <p> - * The initial pass should translate from {@link IRule} to {@link BOp}s so we - * can immediately begin running SPARQL queries against the {@link QueryEngine}. - * A second pass should explore a rules base translation from the openrdf SPARQL - * operator tree into {@link BOp}s, perhaps using an embedded {@link Prolog} - * engine. What follows is a partial list of special considerations for that - * translation: - * <ul> - * <li>Distinct can be trivially enforced for default graph queries against the - * SPOC index.</li> - * <li>Local distinct should wait until there is more than one tuple from the - * index since a single tuple does not need to be made distinct using a hash - * map.</li> - * <li>Low volume distributed queries should use solution modifiers which - * evaluate on the query controller node rather than using distributed sort, - * distinct, slice, or aggregation operators.</li> - * <li></li> - * <li></li> - * <li></li> - * <li>High volume queries should use special operators (different - * implementations of joins, use an external merge sort, etc).</li> - * </ul> - * - * FIXME SPARQL Coverage: Add native support for all SPARQL operators. 
A lot of - * this can be picked up from Sesame. Some things, such as isIRI() can be done - * natively against the {@link IV}. Likewise, there is already a set of - * comparison methods for {@link IV}s which are inlined values. Add support for - * <ul> - * <li></li> - * <li></li> - * <li></li> - * <li></li> - * <li></li> - * <li></li> - * </ul> - * * @todo Expander patterns will continue to exist until we handle the standalone * backchainers in a different manner for scale-out so add support for * those for now. @@ -536,6 +395,8 @@ if (q.isCancelled()) continue; final IChunkMessage<IBindingSet> chunk = q.chunksIn.poll(); + if (chunk == null) + continue; if (log.isTraceEnabled()) log.trace("Accepted chunk: " + chunk); try { @@ -820,6 +681,9 @@ */ protected RunningQuery getRunningQuery(final UUID queryId) { + if(queryId == null) + throw new IllegalArgumentException(); + return runningQueries.get(queryId); } @@ -868,4 +732,13 @@ } + /** + * {@inheritDoc} + * <p> + * The default implementation is a NOP. + */ + public void cancelQuery(UUID queryId, Throwable cause) { + // NOP + } + } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -0,0 +1,69 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 21, 2010 + */ + +package com.bigdata.bop.engine; + +import com.bigdata.bop.PipelineOp; + +/** + * Annotations understood by the {@link QueryEngine} which are used for some + * unit tests but which should not be used for real queries. + * <p> + * Note: This class is in the main source tree because {@link QueryEngine} + * references it, but the annotations defined here should only be specified from + * within a unit test. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public interface QueryEngineTestAnnotations { + + /** + * When <code>true</code>, each chunk will be sent out using its own + * {@link IChunkMessage}. Otherwise the {@link QueryEngine} MAY (and + * generally does) combine the chunks in the output of a given operator + * evaluation pass into a single {@link IChunkMessage} per target query + * peer. + * <p> + * Note: This annotation was introduced to make it easier to control the #of + * {@link IChunkMessage}s output from a given operator and thereby diagnose + * {@link RunState} termination conditions linked to having multiple + * {@link IChunkMessage}s. + * <p> + * Note: Just controlling the {@link PipelineOp.Annotations#CHUNK_CAPACITY} + * and {@link PipelineOp.Annotations#CHUNK_OF_CHUNKS_CAPACITY} is not enough + * to force the {@link QueryEngine} to run an operator once per source + * chunk. The {@link QueryEngine} normally combines chunks together.
You + * MUST also specify this annotation in order for the query engine to send + * multiple {@link IChunkMessage} rather than just one. + */ + String ONE_MESSAGE_PER_CHUNK = QueryEngineTestAnnotations.class.getName() + + ".oneMessagePerChunk"; + + boolean DEFAULT_ONE_MESSAGE_PER_CHUNK = false; + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -0,0 +1,98 @@ +package com.bigdata.bop.engine; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.bigdata.relation.accesspath.IAsynchronousIterator; + +/** + * Delegate pattern cancels the {@link RunningQuery} when the iterator is + * {@link #close() closed} and signals normal completion of the query once the + * iterator is exhausted. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +class QueryResultIterator<E> implements IAsynchronousIterator<E> { + + private final RunningQuery runningQuery; + + private final IAsynchronousIterator<E> src; + + private final AtomicBoolean open = new AtomicBoolean(true); + + public QueryResultIterator(final RunningQuery runningQuery, + final IAsynchronousIterator<E> src) { + + if (runningQuery == null) + throw new IllegalArgumentException(); + + if (src == null) + throw new IllegalArgumentException(); + + this.runningQuery = runningQuery; + + this.src = src; + + } + + public void close() { + if (open.compareAndSet(true/* expect */, false/* update */)) { + try { + runningQuery.cancel(true/* mayInterruptIfRunning */); + } finally { + src.close(); + } + } + } + + private void normalCompletion() { + if (open.compareAndSet(true/* expect */, false/* update */)) { + /* + * Note: DO NOT halt the query here!!!! That will cause it to not + * accept any more messages. Just close the source iterator. 
+ */ +// try { +// runningQuery.halt(); +// } finally { + src.close(); +// } + } + } + + public boolean isExhausted() { +// return src.isExhausted(); + if (src.isExhausted()) { + normalCompletion(); + return true; + } + return false; + } + + public boolean hasNext() { +// return src.hasNext(); + if (!src.hasNext()) { + normalCompletion(); + return false; + } + return true; + } + + public boolean hasNext(long timeout, TimeUnit unit) + throws InterruptedException { + return src.hasNext(timeout, unit); + } + + public E next(long timeout, TimeUnit unit) throws InterruptedException { + return src.next(timeout, unit); + } + + public E next() { + return src.next(); + } + + public void remove() { + src.remove(); + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-09-23 16:09:13 UTC (rev 3616) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-09-23 20:10:58 UTC (rev 3617) @@ -29,11 +29,12 @@ import java.rmi.RemoteException; import java.util.Arrays; -import java.util.LinkedHashMap; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,12 +43,10 @@ import org.apache.log4j.Logger; import com.bigdata.bop.BOp; -import com.bigdata.util.InnerCause; +import com.bigdata.relation.accesspath.IBlockingBuffer; /** - * The run state for a 
{@link RunningQuery}. This class is NOT thread-safe. - * {@link RunningQuery} uses an internal lock to serialize requests against the - * public methods of this class. + * The run state for a {@link RunningQuery}. This class is thread-safe. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ @@ -66,19 +65,63 @@ } /** - * Note: Due to concurrency, it is possible for an {@link IChunkMessage} to - * be accepted and the corresponding chunk task started, before a - * {@link RunState#startOp(StartOpMessage)} transition has been fully - * processed. This means that the {@link RunState#totalAvailableChunkCount} - * can become transiently negative. This flag disables asserts which would - * otherwise fail on legal transient negatives. + * Message if the query has already started evaluation. */ - static private boolean availableChunkCountMayBeNegative = true; + static private final transient String ERR_QUERY_STARTED = "Query already running."; + + /** + * Message if query evaluation has already halted. + */ + static private final transient String ERR_QUERY_HALTED = "Query already halted."; + + /** + * Message if an operator addressed by a {@link HaltOpMessage} was never started. + */ + static private final transient String ERR_OP_NOT_STARTED = "Operator never ran."; + + /** + * Message if an operator addressed by a message has been halted. + */ + static private final transient String ERR_OP_HALTED = "Operator is not running."; + + /** + * Message if a query deadline has been exceeded. + */ + static private final transient String ERR_DEADLINE = "Query deadline is expired."; + + /** + * {@link RunningQuery#handleOutputChunk(BOp, int, IBlockingBuffer)} drops + * {@link IChunkMessage}s onto {@link RunningQuery#chunksIn} and drops the + * {@link RunningQuery} on {@link QueryEngine#runningQueries} as soon as + * output {@link IChunkMessage}s are generated. A {@link IChunkMessage} MAY + * be taken for evaluation as soon as it is published. 
This means that the + * operator which will consume that {@link IChunkMessage} can begin to + * execute <em>before</em> {@link RunningQuery#haltOp(HaltOpMessage)} is + * invoked to indicate the end of the operator which produced that + * {@link IChunkMessage}. + * <p> + * This is all fine. However, due to the potential overlap in these + * schedules {@link RunState#totalAvailableCount} can become transiently + * negative. This flag disables asserts which would otherwise fail on legal + * transient negatives. + */ + static private final boolean availableMessageCountMayBeNegative = true; /** + * Flag may be used to turn on stderr output. + */ + static private final boolean debug = true; + + /** * The query. */ - private final RunningQuery query; + private final BOp query; + + /** + * An index from {@link BOp.Annotations#BOP_ID} to {@link BOp} for the + * {@link #query}. + */ + private final Map<Integer,BOp> bopIndex; /** * The query identifier. @@ -94,36 +137,42 @@ private final long deadline; /** + * Set to <code>true</code> iff the query evaluation has begun. + * + * @see #startQuery(IChunkMessage) + */ + private final AtomicBoolean started = new AtomicBoolean(false); + + /** * Set to <code>true</code> iff the query evaluation is complete due to * normal termination. - * <p> - * Note: This is package private to expose it to {@link RunningQuery}. * * @see #haltOp(HaltOpMessage) */ - /*private*/ final AtomicBoolean allDone = new AtomicBoolean(false); + private final AtomicBoolean allDone = new AtomicBoolean(false); /** * The #of run state transitions which have occurred for this query. */ - private long nsteps = 0; + private final AtomicLong nsteps = new AtomicLong(); /** * The #of tasks for this query which have started but not yet halted. 
+ */ - private long totalRunningTaskCount = 0; + private final AtomicLong totalRunningCount = new AtomicLong(); /** - * The #of chunks for this query of which a running task has made available - * but which have not yet been accepted for processing by another task. + * The #of {@link IChunkMessage}s for the query which a running task has made + * available but which have not yet been accepted for processing by another + * task. */ - private long totalAvailableChunkCount = 0; + private final AtomicLong totalAvailableCount = new AtomicLong(); /** - * A map reporting the #of chunks available for each operator in the - * pipeline (we only report chunks for pipeline operators). The total #of - * chunks available across all operators in the pipeline is reported by - * {@link #totalAvailableChunkCount}. + * A map reporting the #of {@link IChunkMessage} available for each operator + * in the pipeline. The total #of {@link IChunkMessage}s available across + * all operators in the pipeline is reported by {@link #totalAvailableCount} + * . * <p> * The movement of the intermediate binding set chunks forms an acyclic * directed graph. This map is used to track the #of chunks available for @@ -132,62 +181,166 @@ * {@link BOp} had executed informing the {@link QueryEngine} on that node * that it should immediately release all resources associated with that * {@link BOp}. + * <p> + * Note: This collection is package private in order to expose its state to + * the unit tests. Since the map contains {@link AtomicLong}s it can not be + * readily exposed as a {@link Map} object. If we were to expose the map, it + * would have to be via a get(key) style interface.
+ */ - private final Map<Integer/* bopId */, AtomicLong/* availableChunkCount */> availableChunkCountMap = new LinkedHashMap<Integer, AtomicLong>(); + /* private */final Map<Integer/* bopId */, AtomicLong/* availableChunkCount */> availableMap = new ConcurrentHashMap<Integer, AtomicLong>(); /** * A collection reporting on the #of instances of a given {@link BOp} which * are concurrently executing. + * <p> + * Note: This collection is package private in order to expose its state to + * the unit tests. Since the map contains {@link AtomicLong}s it can not be + * readily exposed as a {@link Map} object. If we were to expose the map, it + * would have to be via a get(key) style interface. */ - private final Map<Integer/* bopId */, AtomicLong/* runningCount */> runningTaskCountMap = new LinkedHashMap<Integer, AtomicLong>(); + /* private */final Map<Integer/* bopId */, AtomicLong/* runningCount */> runningMap = new ConcurrentHashMap<Integer, AtomicLong>(); /** * A collection of the operators which have executed at least once. */ private final Set<Integer/* bopId */> startedSet = new LinkedHashSet<Integer>(); - public RunState(final RunningQuery query) { + /** + * Return the query identifier specified to the constructor. + */ + final public UUID getQueryId() { + return queryId; + } - this.query = query; + /** + * Return the deadline specified to the constructor. + */ + final public long getDeadline() { + return deadline; + } - this.queryId = query.getQueryId(); + /** + * Return <code>true</code> if evaluation of the query has been initiated + * using {@link #startQuery(IChunkMessage)}. + */ + final public boolean isStarted() { + return started.get(); + } - this.deadline = query.getDeadline(); - - // this.nops = query.bopIndex.size(); + /** + * Return <code>true</code> if the query is known to be completed based on + * the {@link #haltOp(HaltOpMessage)}.
+ */ + final public boolean isAllDone() { + return allDone.get(); + } + /** + * The #of run state transitions which have occurred for this query. + */ + final public long getStepCount() { + return nsteps.get(); } - public void startQuery(final IChunkMessage<?> msg) { + /** + * The #... [truncated message content] |