From: <tho...@us...> - 2011-02-08 17:50:51
|
Revision: 4184 http://bigdata.svn.sourceforge.net/bigdata/?rev=4184&view=rev Author: thompsonbry Date: 2011-02-08 17:50:40 +0000 (Tue, 08 Feb 2011) Log Message: ----------- Added support for "at-once" operator evaluation to the QueryEngine. This is only supported for ChunkedRunningQuery (and its subclass, FederatedRunningQuery), but that is the more efficient evaluation strategy. This is not yet integrated with the NIO buffers used in scale-out so all data is materialized on the Java heap for the moment and "blocked" evaluation of operators based on a memory threshold is not yet supported. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/QuoteOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/AllocationContextKey.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SparqlBindingSetComparatorOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_Slice.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestRunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_SortOp.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine2.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -301,7 +301,15 @@ return a; } - /** deep copy the arguments. */ + /** + * Deep copy the arguments. + * + * @todo As long as we stick to the immutable semantics for bops, we can + * just make a shallow copy of the arguments in the "copy" constructor + * and then modify them within the specific operator constructor + * before returning control to the caller. This would result in less + * heap churn. + */ static protected BOp[] deepCopy(final BOp[] a) { if (a == NOARGS) { // fast path for zero arity operators. 
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -32,12 +32,9 @@ import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.IChunkMessage; import com.bigdata.bop.engine.IRunningQuery; -import com.bigdata.btree.ILocalBTreeView; -import com.bigdata.journal.IIndexManager; import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; -import com.bigdata.service.IBigdataFederation; /** * The evaluation context for the operator (NOT serializable). @@ -137,56 +134,40 @@ return sink2; } - /** - * - * @param fed - * The {@link IBigdataFederation} IFF the operator is being - * evaluated on an {@link IBigdataFederation}. When evaluating - * operations against an {@link IBigdataFederation}, this - * reference provides access to the scale-out view of the indices - * and to other bigdata services. - * @param indexManager - * The <strong>local</strong> {@link IIndexManager}. Query - * evaluation occurs against the local indices. In scale-out, - * query evaluation proceeds shard wise and this - * {@link IIndexManager} MUST be able to read on the - * {@link ILocalBTreeView}. - * @param readTimestamp - * The timestamp or transaction identifier against which the - * query is reading. - * @param writeTimestamp - * The timestamp or transaction identifier against which the - * query is writing. - * @param partitionId - * The index partition identifier -or- <code>-1</code> if the - * index is not sharded. - * @param stats - * The object used to collect statistics about the evaluation of - * this operator. 
- * @param source - * Where to read the data to be consumed by the operator. - * @param sink - * Where to write the output of the operator. - * @param sink2 - * Alternative sink for the output of the operator (optional). - * This is used by things like SPARQL optional joins to route - * failed joins outside of the join group. - * - * @throws IllegalArgumentException - * if the <i>stats</i> is <code>null</code> - * @throws IllegalArgumentException - * if the <i>source</i> is <code>null</code> (use an empty - * source if the source will be ignored). - * @throws IllegalArgumentException - * if the <i>sink</i> is <code>null</code> - * - * @todo modify to accept {@link IChunkMessage} or an interface available - * from getChunk() on {@link IChunkMessage} which provides us with - * flexible mechanisms for accessing the chunk data. - * <p> - * When doing that, modify to automatically track the {@link BOpStats} - * as the <i>source</i> is consumed. - */ + /** + * + * @param runningQuery + * The {@link IRunningQuery}. + * @param partitionId + * The index partition identifier -or- <code>-1</code> if the + * index is not sharded. + * @param stats + * The object used to collect statistics about the evaluation of + * this operator. + * @param source + * Where to read the data to be consumed by the operator. + * @param sink + * Where to write the output of the operator. + * @param sink2 + * Alternative sink for the output of the operator (optional). + * This is used by things like SPARQL optional joins to route + * failed joins outside of the join group. + * + * @throws IllegalArgumentException + * if the <i>stats</i> is <code>null</code> + * @throws IllegalArgumentException + * if the <i>source</i> is <code>null</code> (use an empty + * source if the source will be ignored). 
+ * @throws IllegalArgumentException + * if the <i>sink</i> is <code>null</code> + * + * @todo modify to accept {@link IChunkMessage} or an interface available + * from getChunk() on {@link IChunkMessage} which provides us with + * flexible mechanisms for accessing the chunk data. + * <p> + * When doing that, modify to automatically track the {@link BOpStats} + * as the <i>source</i> is consumed. + */ public BOpContext(final IRunningQuery runningQuery,final int partitionId, final BOpStats stats, final IAsynchronousIterator<E[]> source, final IBlockingBuffer<E[]> sink, final IBlockingBuffer<E[]> sink2) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -42,9 +42,6 @@ /** * Base class for the bigdata operation evaluation context (NOT serializable). - * - * @param <E> - * The generic type of the objects processed by the operator. 
*/ public class BOpContextBase { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -661,7 +661,7 @@ static public IBindingSet[] toArray(final Iterator<IBindingSet[]> itr, final BOpStats stats) { - final List<IBindingSet[]> list = new LinkedList<IBindingSet[]>(); + final List<IBindingSet[]> list = new LinkedList<IBindingSet[]>(); int nchunks = 0, nelements = 0; { @@ -676,8 +676,6 @@ nelements += a.length; - list.add(a); - } stats.chunksIn.add(nchunks); @@ -699,19 +697,27 @@ final IBindingSet[] a = new IBindingSet[nelements]; - final Iterator<IBindingSet[]> itr2 = list.iterator(); - - while (itr2.hasNext()) { - - final IBindingSet[] t = itr2.next(); - - System.arraycopy(t/* src */, 0/* srcPos */, a/* dest */, - n/* destPos */, t.length/* length */); - - n += t.length; - - } + final Iterator<IBindingSet[]> itr2 = list.iterator(); + while (itr2.hasNext()) { + + final IBindingSet[] t = itr2.next(); + try { + System.arraycopy(t/* src */, 0/* srcPos */, a/* dest */, + n/* destPos */, t.length/* length */); + } catch (IndexOutOfBoundsException ex) { + // Provide some more detail in the stack trace. 
+ final IndexOutOfBoundsException ex2 = new IndexOutOfBoundsException( + "t.length=" + t.length + ", a.length=" + a.length + + ", n=" + n); + ex2.initCause(ex); + throw ex2; + } + + n += t.length; + + } + return a; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -27,12 +27,18 @@ package com.bigdata.bop; +import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; +import org.apache.log4j.Logger; + import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.ChunkedRunningQuery; import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.relation.accesspath.IAsynchronousIterator; /** * Abstract base class for pipeline operators where the data moving along the @@ -55,6 +61,9 @@ */ private static final long serialVersionUID = 1L; + private final static transient Logger log = Logger + .getLogger(PipelineOp.class); + public interface Annotations extends BOp.Annotations, BufferAnnotations { /** @@ -91,6 +100,63 @@ boolean DEFAULT_SHARED_STATE = false; + /** + * Annotation may be used to indicate operators which are not thread + * safe (default {@value #DEFAULT_THREAD_SAFE}). Concurrent invocations + * of the evaluation task will not be scheduled for a given shard for an + * operator which is not thread safe. + * + * @todo Unit tests for {@link ChunkedRunningQuery} to verify that it + * eventually schedules operator tasks which were deferred to + * prevent concurrent evaluation. 
+ */ + String THREAD_SAFE = PipelineOp.class.getName() + ".threadSafe"; + + boolean DEFAULT_THREAD_SAFE = true; + + /** + * Annotation used to mark pipelined (aka vectored) operators. When + * <code>false</code> the operator will use either "at-once" or + * "blocked" evaluation depending on how it buffers its data for + * evaluation. + */ + String PIPELINED = PipelineOp.class.getName() + ".pipelined"; + + boolean DEFAULT_PIPELINED = true; + + /** + * For non-{@link #PIPELINED} operators, this non-negative value + * specifies the maximum #of bytes which the operator may buffer on the + * native heap before evaluation of the operator is triggered -or- ZERO + * (0) if the operator buffers the data on the Java heap (default + * {@value #DEFAULT_MAX_MEMORY}). When non-zero, the #of bytes specified + * should be a multiple of 4k. For a shared operation, the value is the + * maximum #of bytes which may be buffered per shard. + * <p> + * Operator "at-once" evaluation will be used if either (a) the operator + * is buffering data on the Java heap; or (b) the operator is buffering + * data on the native heap and the amount of buffered data does not + * exceed the specified value for {@link #MAX_MEMORY}. For convenience, + * the value {@link Integer#MAX_VALUE} may be specified to indicate that + * "at-once" evaluation is required. + * <p> + * When data are buffered on the Java heap, "at-once" evaluation is + * implied and the data will be made available to the operator as a + * single {@link IAsynchronousIterator} when the operator is invoked. + * <p> + * When {@link #MAX_MEMORY} is positive, data are marshaled in + * {@link ByteBuffer}s and the operator will be invoked once either (a) + * its memory threshold for the buffered data has been exceeded; or (b) + * no predecessor of the operator is running (or can be triggered) -and- + * all inputs for the operator have been materialized on this node. 
Note + * that some operators DO NOT support multiple pass evaluation + * semantics. Such operators MUST throw an exception if the value of + * this annotation could result in multiple evaluation passes. + */ + String MAX_MEMORY = PipelineOp.class.getName() + ".maxMemory"; + + int DEFAULT_MAX_MEMORY = 0; + // /** // * Annotation used to mark a set of (non-optional) joins which may be // * freely reordered by the query optimizer in order to minimize the @@ -224,17 +290,52 @@ } + /** + * Return <code>true</code> if the operator is pipelined (versus using + * "at-once" or blocked evaluation as discussed below). + * <dl> + * <dt>Pipelined</dt> + * <dd>Pipelined operators stream chunks of intermediate results from one + * operator to the next using producer / consumer pattern. Each time a set + * of intermediate results is available for a pipelined operator, it is + * evaluated against those inputs producing another set of intermediate + * results for its target operator(s). Pipelined operators may be evaluated + * many times during a given query and often have excellent parallelism due + * to the concurrent evaluation of the different operators on different sets + * of intermediate results.</dd> + * <dt>At-Once</dt> + * <dd> + * An "at-once" operator will run exactly once and must wait for all of its + * inputs to be assembled before it runs. There are some operations for + * which "at-once" evaluation is always required, such as ORDER_BY. Other + * operations MAY use operator-at-once evaluation in order to benefit from a + * combination of more efficient IO patterns and simpler design. 
At-once + * operators may either buffer their data on the Java heap (which is not + * scalable due to the heap pressure exerted on the garbage collector) or + * buffer their data on the native heap (which does scale).</dd> + * <dt>Blocked</dt> + * <dd>Blocked operators buffer large amounts of data on the native heap and + * run each time they exceed some threshold #of bytes of buffered data. A + * blocked operator is basically an "at-once" operator which buffers its + * data on the native heap and which can be evaluated in multiple passes. + * For example, a hash join could use a blocked operator design while an + * ORDER_BY operator can not. By deferring their evaluation until some + * threshold amount of data has been materialized, they may be evaluated + * once or more than once, depending on the data scale, but still retain + * many of the benefits of "at-once" evaluation in terms of IO patterns. + * Whether or not an operator can be used as a "blocked" operator is a + * matter of the underlying operator implementation.</dd> + * </dl> + * + * @see Annotations#PIPELINED + * @see Annotations#MAX_MEMORY + */ + public boolean isPipelined() { + return getProperty(PipelineOp.Annotations.PIPELINED, + PipelineOp.Annotations.DEFAULT_PIPELINED); + } + /** - * Return the {@link PipelineType} of the operator (default - * {@link PipelineType#Vectored}). - */ - public PipelineType getPipelineType() { - - return PipelineType.Vectored; - - } - - /** * Return <code>true</code> iff {@link #newStats()} must be shared across * all invocations of {@link #eval(BOpContext)} for this operator for a * given query. @@ -247,12 +348,21 @@ Annotations.DEFAULT_SHARED_STATE); } - - /** - * Return a new object which can be used to collect statistics on the - * operator evaluation (this may be overridden to return a more specific - * class depending on the operator). - */ + + /** + * Return a new object which can be used to collect statistics on the + * operator evaluation. 
This may be overridden to return a more specific + * class depending on the operator. + * <p> + * Some operators may use this to share state across multiple invocations of + * the operator within a given query (e.g., {@link SliceOp}). Another + * mechanism for sharing state is to use the same named allocation context + * for the memory manager across the operator invocation instances. + * <p> + * Operator life cycle events support pre-/post-operator behaviors. Such + * events can be used to processed buffered solutions accumulated within + * some shared state across multiple operator invocations. + */ public BOpStats newStats() { return new BOpStats(); @@ -283,7 +393,7 @@ // getChunkTimeout(), Annotations.chunkTimeoutUnit, stats); // // } - + /** * Return a {@link FutureTask} which computes the operator against the * evaluation context. The caller is responsible for executing the @@ -302,5 +412,31 @@ * return the ForkJoinTask. */ abstract public FutureTask<Void> eval(BOpContext<IBindingSet> context); - + + /** + * Hook to setup any resources associated with the operator (temporary + * files, memory manager allocation contexts, etc.). This hook is invoked + * exactly once and before any instance task for the operator is evaluated. + */ + public void setUp() throws Exception { + + if (log.isTraceEnabled()) + log.trace("bopId=" + getId()); + + } + + /** + * Hook to tear down any resources associated with the operator (temporary + * files, memory manager allocation contexts, etc.). This hook is invoked + * exactly once no later than when the query is cancelled. If the operator + * is known to be done executing, then this hook will be invoked at that + * time. 
+ */ + public void tearDown() throws Exception { + + if (log.isTraceEnabled()) + log.trace("bopId=" + getId()); + + } + } Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -1,68 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Sep 21, 2010 - */ - -package com.bigdata.bop; - -/** - * Return the type of pipelining supported by an operator. - * <p> - * Note: bigdata does not support tuple-at-a-time processing. Only vectored and - * operator-at-a-time processing. Tuple at a time processing is generally very - * inefficient. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public enum PipelineType { - - /** - * Vectored operators stream chunks of intermediate results from one - * operator to the next using producer / consumer pattern. 
Each time a set - * of intermediate results is available for a vectored operator, it is - * evaluated against those inputs producing another set of intermediate - * results for its target operator(s). Vectored operators may be evaluated - * many times during a given query and often have excellent parallelism due - * to the concurrent evaluation of the different operators on different sets - * of intermediate results. - */ - Vectored, - - /** - * The operator will run exactly once and must wait for all of its inputs to - * be assembled before it runs. - * <p> - * There are some operations for which this is always true, such as SORT. - * Other operations MAY use operator-at-once evaluation in order to benefit - * from a combination of more efficient IO patterns and simpler design. - * However, pipelined operators using large memory blocks have many of the - * benefits of operator-at-once evaluation. By deferring their evaluation - * until some minimum number of source data blocks are available, they may - * be evaluated once or more than once, depending on the data scale. - */ - OneShot; - -} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/QuoteOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/QuoteOp.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/QuoteOp.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -38,10 +38,10 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * - * @todo I think that we can avoid quoting operators by using annotations (for - * some cases) and through explicit interaction between operators for - * others (such as between a join and a predicate). If that proves to be - * true then this class will be dropped. 
+ * @deprecated I think that we can avoid quoting operators by using annotations + * (for some cases) and through explicit interaction between + * operators for others (such as between a join and a predicate). If + * that proves to be true then this class will be dropped. */ public class QuoteOp extends BOpBase { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -29,6 +29,8 @@ import java.nio.ByteBuffer; import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -235,6 +237,16 @@ */ private final AtomicBoolean didQueryTearDown = new AtomicBoolean(false); + /** + * A collection reporting on whether or not a given operator has been torn + * down. This collection is used to provide the guarantee that an operator + * is torn down exactly once, regardless of the #of invocations of the + * operator or the #of errors which might occur during query processing. + * + * @see PipelineOp#tearDown() + */ + private final Map<Integer/* bopId */, AtomicBoolean> tornDown = new LinkedHashMap<Integer, AtomicBoolean>(); + /** * Set the query deadline. The query will be cancelled when the deadline is * passed. If the deadline is passed, the query is immediately cancelled. @@ -601,8 +613,23 @@ try { - if (runState.startOp(msg)) + if (runState.startOp(msg)) { + + /* + * Set a flag in this collection so we will know that this + * operator needs to be torn down (we do not bother to tear down + * operators which have never been setup). 
+ */ + tornDown.put(msg.bopId, new AtomicBoolean(false)); + + /* + * TODO It is a bit dangerous to hold the lock while we do this + * but this needs to be executed before any other thread can + * start an evaluation task for that operator. + */ lifeCycleSetUpOperator(msg.bopId); + + } } catch (TimeoutException ex) { @@ -616,19 +643,19 @@ } - /** - * Message provides notice that the operator has ended execution. The - * termination conditions for the query are checked. (For scale-out, the - * node node controlling the query needs to be involved for each operator - * start/stop in order to make the termination decision atomic). - * - * @param msg - * The {@link HaltOpMessage} - * - * @throws UnsupportedOperationException - * If this node is not the query coordinator. - */ - final protected void haltOp(final HaltOpMessage msg) { + /** + * Message provides notice that the operator has ended execution. The + * termination conditions for the query are checked. (For scale-out, the + * node controlling the query needs to be involved for each operator + * start/stop in order to make the termination decision atomic). + * + * @param msg + * The {@link HaltOpMessage} + * + * @throws UnsupportedOperationException + * If this node is not the query coordinator. + */ + /*final*/ protected void haltOp(final HaltOpMessage msg) { if (!controller) throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); @@ -653,13 +680,20 @@ if (runState.haltOp(msg)) { - /* - * No more chunks can appear for this operator so invoke its end - * of life cycle hook. - */ + /* + * No more chunks can appear for this operator so invoke its end + * of life cycle hook IFF it has not yet been invoked. + */ - lifeCycleTearDownOperator(msg.bopId); + final AtomicBoolean tornDown = AbstractRunningQuery.this.tornDown + .get(msg.bopId); + if (tornDown.compareAndSet(false/* expect */, true/* update */)) { + + lifeCycleTearDownOperator(msg.bopId); + + } + if (runState.isAllDone()) { // Normal termination. 
@@ -681,6 +715,69 @@ } + /** + * Return <code>true</code> iff the preconditions have been satisfied for + * the "at-once" invocation of the specified operator (no predecessors are + * running or could be triggered and the operator has not been evaluated). + * + * @param bopId + * Some operator identifier. + * + * @return <code>true</code> iff the "at-once" evaluation of the operator + * may proceed. + */ + protected boolean isAtOnceReady(final int bopId) { + + lock.lock(); + + try { + +// if (isDone()) { +// // The query has already halted. +// throw new InterruptedException(); +// } + + return runState.isAtOnceReady(bopId); + + } finally { + + lock.unlock(); + + } + + } + +// /** +// * Return <code>true</code> iff there is already an instance of the operator +// * running. +// * +// * @param bopId +// * The bopId of the operator. +// * +// * @return True iff there is at least one instance of the operator running +// * (globally for this query). +// */ +// public boolean isOperatorRunning(final int bopId) { +// +// lock.lock(); +// +// try { +// +// final AtomicLong nrunning = runState.runningMap.get(bopId); +// +// if (nrunning == null) +// return false; +// +// return nrunning.get() > 0; +// +// } finally { +// +// lock.unlock(); +// +// } +// +// } + /** * Hook invoked the first time the given operator is evaluated for the * query. This may be used to set up life cycle resources for the operator, @@ -690,27 +787,53 @@ * @param bopId * The operator identifier. */ - protected void lifeCycleSetUpOperator(final int bopId) { + protected void lifeCycleSetUpOperator(final int bopId) { - if (log.isTraceEnabled()) - log.trace("queryId=" + queryId + ", bopId=" + bopId); + final BOp op = getBOpIndex().get(bopId); - } + if (op instanceof PipelineOp) { - /** - * Hook invoked the after the given operator has been evaluated for the - * query for what is known to be the last time. 
This may be used to tear - * down life cycle resources for the operator, such as a distributed hash - * table on a set of nodes identified by annotations of the operator. - * - * @param bopId - * The operator identifier. - */ - protected void lifeCycleTearDownOperator(final int bopId) { + try { - if (log.isTraceEnabled()) - log.trace("queryId=" + queryId + ", bopId=" + bopId); + ((PipelineOp) op).setUp(); + + } catch (Exception ex) { + + throw new RuntimeException(ex); + + } + } + + } + + /** + * Hook invoked the after the given operator has been evaluated for the + * query for what is known to be the last time. This may be used to tear + * down life cycle resources for the operator, such as a distributed hash + * table on a set of nodes identified by annotations of the operator. + * + * @param bopId + * The operator identifier. + */ + protected void lifeCycleTearDownOperator(final int bopId) { + + final BOp op = getBOpIndex().get(bopId); + + if (op instanceof PipelineOp) { + + try { + + ((PipelineOp) op).tearDown(); + + } catch (Exception ex) { + + throw new RuntimeException(ex); + + } + + } + } /** @@ -730,6 +853,27 @@ */ protected void lifeCycleTearDownQuery() { + final Iterator<Map.Entry<Integer/* bopId */, AtomicBoolean/* tornDown */>> itr = tornDown + .entrySet().iterator(); + + while(itr.hasNext()) { + + final Map.Entry<Integer/* bopId */, AtomicBoolean/* tornDown */> entry = itr + .next(); + + final AtomicBoolean tornDown = entry.getValue(); + + if (tornDown.compareAndSet(false/* expect */, true/* update */)) { + + /* + * Guaranteed one time tear down for this operator. 
+ */ + lifeCycleTearDownOperator(entry.getKey()/* bopId */); + + } + + } + if (log.isTraceEnabled()) log.trace("queryId=" + queryId); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -30,6 +30,7 @@ import java.io.Serializable; import com.bigdata.bop.BOp; +import com.bigdata.bop.PipelineOp; import com.bigdata.counters.CAT; /** @@ -58,6 +59,13 @@ * The #of instances of a given operator which have been created for a given * query. This provides interesting information about the #of task instances * for each operator which were required to execute a query. + * + * TODO Due to the way this is incremented, this is always ONE (1) if + * {@link PipelineOp.Annotations#SHARED_STATE} is <code>true</code> (it + * reflects the #of times {@link #add(BOpStats)} was invoked plus one for + * the ctor rather than the #of times the operator task was invoked). This + * should be changed to reflect the #of operator task instances created + * instead. 
*/ final public CAT opCount = new CAT(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -35,7 +35,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; @@ -50,6 +49,7 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.fed.FederatedRunningQuery; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; import com.bigdata.relation.accesspath.BufferClosedException; @@ -642,19 +642,42 @@ } } - /** - * Examine the input queue for the (bopId,partitionId). If there is work - * available and no task is currently running, then drain the work queue and - * submit a task to consume that work. - * - * @param bundle - * The (bopId,partitionId). - * - * @return <code>true</code> if a new task was started. - */ + /** + * Overridden to attempt to consume another chunk each time an operator + * reports that it has halted evaluation. This is necessary because the + * haltOp() message can arrive asynchronously, so we need to test the work + * queues in case there are "at-once" operators awaiting the termination of + * their predecessor(s) in the pipeline. + */ + @Override + protected void haltOp(final HaltOpMessage msg) { + super.haltOp(msg); + consumeChunk(); + } + + /** + * Examine the input queue for the (bopId,partitionId). 
If there is work + * available, then drain the work queue and submit a task to consume that + * work. This handles {@link PipelineOp.Annotations#THREAD_SAFE}, + * {@link PipelineOp.Annotations#PIPELINED} as special cases. + * + * + * @param bundle + * The (bopId,partitionId). + * + * @return <code>true</code> if a new task was started. + * + * @todo Also handle {@link PipelineOp.Annotations#MAX_MEMORY} here by + * handshaking with the {@link FederatedRunningQuery}. + */ private boolean scheduleNext(final BSBundle bundle) { if (bundle == null) throw new IllegalArgumentException(); + final BOp bop = getBOpIndex().get(bundle.bopId); + final boolean threadSafe = bop.getProperty( + PipelineOp.Annotations.THREAD_SAFE, + PipelineOp.Annotations.DEFAULT_THREAD_SAFE); + final boolean pipelined = ((PipelineOp)bop).isPipelined();; lock.lock(); try { // Make sure the query is still running. @@ -664,14 +687,31 @@ ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask> map = operatorFutures .get(bundle); if (map != null) { - int nrunning = 0; +// // #of instances of the operator already running. +// int nrunning = 0; for (ChunkFutureTask cft : map.keySet()) { - if (cft.isDone()) + if (cft.isDone()) { + // Remove tasks which have already terminated. map.remove(cft); - nrunning++; + } +// nrunning++; } - if (map.isEmpty()) + if (map.isEmpty()) { + // No tasks running for this operator. operatorFutures.remove(bundle); + } else { + // At least one task is running for this operator. + if (!threadSafe) { + /* + * This operator is not thread-safe, so reject + * concurrent execution for the same (bopId,shardId). + */ + if (log.isDebugEnabled()) + log.debug("Rejecting concurrent execution: " + + bundle + ", #running=" + map.size()); + return false; + } + } /* * FIXME If we allow a limit on the concurrency then we need to * manage things in order to guarantee that deadlock can not @@ -709,26 +749,53 @@ // } // // } - // Remove the work queue for that (bopId,partitionId). 
+ // Get the work queue for that (bopId,partitionId). final BlockingQueue<IChunkMessage<IBindingSet>> queue = operatorQueues - .remove(bundle); - if (queue == null || queue.isEmpty()) { + .get(bundle); + if (queue == null) { // no work return false; } + if (!pipelined && !queue.isEmpty() && !isAtOnceReady(bundle.bopId)) { + /* + * This operator is not pipelined, so we need to wait until all + * of its input solutions have been materialized (no prior + * operator in the pipeline is running or has inputs available + * which could cause it to run). + * + * TODO This is where we should examine MAX_MEMORY and the + * buffered data to see whether or not to trigger an evaluation + * pass for the operator based on the data already materialized + * for that operator. + */ + if (log.isDebugEnabled()) + log.debug("Waiting on producer(s): bopId=" + bundle.bopId); + return false; + } + // Remove the work queue for that (bopId,partitionId). + operatorQueues.remove(bundle); + if (queue.isEmpty()) { + // no work + return false; + } // Drain the work queue for that (bopId,partitionId). final List<IChunkMessage<IBindingSet>> messages = new LinkedList<IChunkMessage<IBindingSet>>(); queue.drainTo(messages); final int nmessages = messages.size(); - /* - * Combine the messages into a single source to be consumed by a - * task. - */ - int nchunks = 1; - final IMultiSourceAsynchronousIterator<IBindingSet[]> source = new MultiSourceSequentialAsynchronousIterator<IBindingSet[]>(messages.remove(0).getChunkAccessor().iterator()); + /* + * Combine the messages into a single source to be consumed by a + * task. + * + * @todo We could limit the #of chunks combined here by leaving the + * rest on the work queue. 
+ */ +// int nchunks = 1; + final IMultiSourceAsynchronousIterator<IBindingSet[]> source = new MultiSourceSequentialAsynchronousIterator<IBindingSet[]>(// + messages.remove(0).getChunkAccessor().iterator()// + ); for (IChunkMessage<IBindingSet> msg : messages) { source.add(msg.getChunkAccessor().iterator()); - nchunks++; +// nchunks++; } /* * Create task to consume that source. @@ -748,6 +815,9 @@ /* * Submit task for execution (asynchronous). */ + if (log.isDebugEnabled()) + log.debug("Running task: bop=" + bundle.bopId + ", nmessages=" + + nmessages); getQueryEngine().execute(cft); return true; } finally { @@ -1409,9 +1479,6 @@ */ synchronized (this) { - if (smallChunks == null) - smallChunks = new LinkedList<IBindingSet[]>(); - if (chunkSize + e.length > chunkCapacity) { // flush the buffer first. @@ -1419,6 +1486,9 @@ } + if (smallChunks == null) + smallChunks = new LinkedList<IBindingSet[]>(); + smallChunks.add(e); chunkSize += e.length; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -9,15 +9,15 @@ lic...@bi... This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by +it under the terms of the GNU General License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. +GNU General License for more details. 
-You should have received a copy of the GNU General Public License +You should have received a copy of the GNU General License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ @@ -114,26 +114,45 @@ * @return The query deadline (milliseconds since the epoch) and * {@link Long#MAX_VALUE} if no explicit deadline was specified. */ - public long getDeadline(); + long getDeadline(); /** * The timestamp (ms) when the query began execution. */ - public long getStartTime(); + long getStartTime(); /** * The timestamp (ms) when the query was done and ZERO (0) if the query is * not yet done. */ - public long getDoneTime(); + long getDoneTime(); /** * The elapsed time (ms) for the query. This will be updated for each call * until the query is done executing. */ - public long getElapsed(); - - /** + long getElapsed(); + +// /** +// * Return <code>true</code> if there are no operators which could +// * (re-)trigger the specified operator. +// * <p> +// * Note: This is intended to be invoked synchronously from within the +// * evaluation of the operator in order to determine whether or not the +// * operator can be invoked again for this running query. +// * +// * @param bopId +// * The specified operator. +// * @param nconsumed +// * The #of {@link IChunkMessage} consumed by the operator during +// * its current invocation. +// * +// * @return <code>true</code> iff it is not possible for the specified +// * operator to be retriggered. +// */ +// boolean isLastInvocation(final int bopId,final int nconsumed); + +// /** // * Cancel the running query (normal termination). 
// * <p> // * Note: This method provides a means for an operator to indicate that the Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -47,42 +47,45 @@ private static final Logger log = Logger.getLogger(PipelineUtility.class); - /** - * Return <code>true</code> iff <i>availableChunkMap</i> map is ZERO (0) for - * the given operator and its descendants AND the <i>runningCountMap</i> is - * ZERO (0) for the operator and all descendants of the operator. For the - * purposes of this method, only {@link BOp#args() operands} are considered - * as descendants. - * <p> - * Note: The movement of the intermediate binding set chunks during query - * processing forms an acyclic directed graph. We can decide whether or not - * a {@link BOp} in the query plan can be triggered by the current activity - * pattern by inspecting the {@link BOp} and its operands recursively. - * - * @param bopId - * The identifier for an operator which appears in the query - * plan. - * @param queryPlan - * The query plan. - * @param queryIndex - * An index for the query plan as constructed by - * {@link BOpUtility#getIndex(BOp)}. - * @param runningCountMap - * A map reporting the #of instances of each operator which are - * currently being evaluated (distinct evaluations are performed - * for each chunk and shard). - * @param availableChunkCountMap - * A map reporting the #of chunks available for each operator in - * the pipeline (we only report chunks for pipeline operators). - * - * @return <code>true</code> iff the {@link BOp} can not be triggered given - * the query plan and the activity map. 
- * - * @throws IllegalArgumentException - * if any argument is <code>null</code>. - * @throws NoSuchBOpException - * if <i>bopId</i> is not found in the query index. - */ + /** + * Return <code>true</code> iff the running query state is such that it is + * no longer possible for an operator to run which could cause solutions to + * be propagated to the operator identified by the <i>bopId</i>. + * Specifically, this returns true iff <i>availableChunkMap</i> map is ZERO + * (0) for the given operator and its descendants AND the + * <i>runningCountMap</i> is ZERO (0) for the operator and all descendants + * of the operator. For the purposes of this method, only {@link BOp#args() + * operands} are considered as descendants. + * <p> + * Note: The movement of the intermediate binding set chunks during query + * processing forms an acyclic directed graph. We can decide whether or not + * a {@link BOp} in the query plan can be triggered by the current activity + * pattern by inspecting the {@link BOp} and its operands recursively. + * + * @param bopId + * The identifier for an operator which appears in the query + * plan. + * @param queryPlan + * The query plan. + * @param queryIndex + * An index for the query plan as constructed by + * {@link BOpUtility#getIndex(BOp)}. + * @param runningCountMap + * A map reporting the #of instances of each operator which are + * currently being evaluated (distinct evaluations are performed + * for each chunk and shard). + * @param availableChunkCountMap + * A map reporting the #of chunks available for each operator in + * the pipeline (we only report chunks for pipeline operators). + * + * @return <code>true</code> iff the {@link BOp} can not be triggered given + * the query plan and the activity map. + * + * @throws IllegalArgumentException + * if any argument is <code>null</code>. + * @throws NoSuchBOpException + * if <i>bopId</i> is not found in the query index. 
+ */ static public boolean isDone(final int bopId, final BOp queryPlan, final Map<Integer, BOp> queryIndex, final Map<Integer, AtomicLong> runningCountMap, @@ -127,8 +130,8 @@ if (runningCount != null && runningCount.get() != 0) { - if (log.isInfoEnabled()) - log.info("Operator can be triggered: op=" + op + if (log.isDebugEnabled()) + log.debug("Operator can be triggered: op=" + op + ", possible trigger=" + t + " is running."); return false; @@ -150,8 +153,8 @@ if (availableChunkCount != null && availableChunkCount.get() != 0) { - if (log.isInfoEnabled()) - log.info("Operator can be triggered: op=" + op + if (log.isDebugEnabled()) + log.debug("Operator can be triggered: op=" + op + ", possible trigger=" + t + " has " + availableChunkCount + " chunks available."); @@ -170,4 +173,145 @@ } + /** + * Return <code>true</code> iff the running query state is such that the + * "at-once" evaluation of the specified operator may proceed. The specific + * requirements are: (a) the operator is not running and has not been + * started; (b) no predecessor in the pipeline is running; and (c) no + * predecessor in the pipeline can be triggered. + * + * @param bopId + * The identifier for an operator which appears in the query + * plan. + * @param queryPlan + * The query plan. + * @param queryIndex + * An index for the query plan as constructed by + * {@link BOpUtility#getIndex(BOp)}. + * @param runningCountMap + * A map reporting the #of instances of each operator which are + * currently being evaluated (distinct evaluations are performed + * for each chunk and shard). + * @param availableChunkCountMap + * A map reporting the #of chunks available for each operator in + * the pipeline (we only report chunks for pipeline operators). + * + * @return <code>true</code> iff the "at-once" evaluation of the operator + * may proceed. + * + * @throws IllegalArgumentException + * if any argument is <code>null</code>. 
+ * @throws NoSuchBOpException + * if <i>bopId</i> is not found in the query index. + * + * TODO Unit tests. + */ + static public boolean isAtOnceReady(final int bopId, final BOp queryPlan, + final Map<Integer, BOp> queryIndex, + final Map<Integer, AtomicLong> runningCountMap, + final Map<Integer, AtomicLong> availableChunkCountMap) { + + if (queryPlan == null) + throw new IllegalArgumentException(); + + if (queryIndex == null) + throw new IllegalArgumentException(); + + if (availableChunkCountMap == null) + throw new IllegalArgumentException(); + + final BOp op = queryIndex.get(bopId); + + if (op == null) + throw new NoSuchBOpException(bopId); + + final boolean didStart = runningCountMap.get(bopId) != null; + + if(didStart) { + + // Evaluation has already run (or begun) for this operator. + if (log.isInfoEnabled()) + log.info("Already ran/running: " + bopId); + + return false; + + } + + final Iterator<BOp> itr = BOpUtility.preOrderIterator(op); + + while (itr.hasNext()) { + + final BOp t = itr.next(); + + final Integer id = (Integer) t.getProperty(BOp.Annotations.BOP_ID); + + if (id == null) // TODO Why allow ops w/o bopId here? + continue; + + if(bopId == id.intValue()) { + + // Ignore self. + continue; + + } + + { + + /* + * If any descendants (aka predecessors) of the operator are + * running, then they could produce additional solutions + * so the operator is not ready for "at-once" evaluation. + */ + + final AtomicLong runningCount = runningCountMap.get(id); + + if (runningCount != null && runningCount.get() != 0) { + + if (log.isDebugEnabled()) + log.debug("Predecessor running: predecessorId=" + id + + ", predecessorRunningCount=" + runningCount); + + return false; + + } + + } + + { + + /* + * Any chunks available for a descendant (aka predecessor) of + * the operator could produce additional solutions as inputs to + * the operator so it is not ready for "at-once" evaluation.
+ */ + + final AtomicLong availableChunkCount = availableChunkCountMap + .get(id); + + if (availableChunkCount != null + && availableChunkCount.get() != 0) { + /* + * We are looking at some other predecessor of the specified + * operator. + */ + if (log.isDebugEnabled()) + log.debug("Predecessor can be triggered: predecessorId=" + + id + " has " + availableChunkCount + + " chunks available."); + + return false; + } + + } + + } + + // Success. + if (log.isInfoEnabled()) + log.info("Ready for 'at-once' evaluation: " + bopId); + + return true; + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -541,7 +541,7 @@ * Since it is defacto done when [isAllDone] is satisfied, this tests * for that condition first and then for isOperatorDone(). */ - final boolean isOpDone = isAllDone||isOperatorDone(msg.bopId); + final boolean isOpDone = isAllDone || isOperatorDone(msg.bopId); // if (isAllDone && !isOpDone) // throw new RuntimeException("Whoops!: "+this); @@ -575,23 +575,24 @@ } - /** - * Return <code>true</code> the specified operator can no longer be - * triggered by the query. The specific criteria are that no operators which - * are descendants of the specified operator are running or have chunks - * available against which they could run. Under those conditions it is not - * possible for a chunk to show up which would cause the operator to be - * executed. - * - * @param bopId - * Some operator identifier. - * - * @return <code>true</code> if the operator can not be triggered given the - * current query activity. - * - * @throws IllegalMonitorStateException - * unless the {@link #runStateLock} is held by the caller. 
- */ + /** + * Return <code>true</code> the specified operator can no longer be + * triggered by the query. The specific criteria are that no operators which + * are descendants of the specified operator are running or have chunks + * available against which they could run. Under those conditions it is not + * possible for a chunk to show up which would cause the operator to be + * executed. + * <p> + * Note: The caller MUST hold a lock across this operation in order for it + * to be atomic with respect to the concurrent evaluation of other operators + * for the same query. + * + * @param bopId + * Some operator identifier. + * + * @return <code>true</code> if the operator can not be triggered given the + * current query activity. + */ private boolean isOperatorDone(final int bopId) { return PipelineUtility.isDone(bopId, query, bopIndex, runningMap, @@ -599,6 +600,24 @@ } + /** + * Return <code>true</code> iff the preconditions have been satisfied for + * the "at-once" invocation of the specified operator (no predecessors are + * running or could be triggered and the operator has not been evaluated). + * + * @param bopId + * Some operator identifier. + * + * @return <code>true</code> iff the "at-once" evaluation of the operator + * may proceed. + */ + boolean isAtOnceReady(final int bopId) { + + return PipelineUtility.isAtOnceReady(bopId, query, bopIndex, + runningMap, availableMap); + + } + /** * Update the {@link RunState} to reflect that fact that a new evaluation * phase has begun for an operator. 
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -27,7 +27,6 @@ package com.bigdata.bop.engine; -import java.nio.channels.ClosedByInterruptException; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -46,11 +45,9 @@ import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.PipelineOp; import com.bigdata.relation.accesspath.BlockingBuffer; -import com.bigdata.relation.accesspath.BufferClosedException; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.accesspath.MultiplexBlockingBuffer; -import com.bigdata.util.InnerCause; /** * An {@link IRunningQuery} implementation for a standalone database in which a @@ -68,7 +65,9 @@ * sources for the sink have been closed. * <p> * This implementation does not use {@link IChunkMessage}s, can not be used with - * scale-out, and does not support sharded indices. + * scale-out, and does not support sharded indices. This implementation ONLY + * supports pipelined operators. If "at-once" evaluation semantics are required, + * then use {@link ChunkedRunningQuery}. * * @todo Since each operator task runs exactly once there is less potential * parallelism in the operator task execution when compared to @@ -81,8 +80,16 @@ * @todo Run all unit tests of the query engine against the appropriate * strategies. * + * @todo Since this class does not support "at-once" evaluation, it can not be + * used with ORDER BY operator implementations. That really restricts its + * ... 
[truncated message content] |