From: <tho...@us...> - 2010-09-01 18:27:44
|
Revision: 3489 http://bigdata.svn.sourceforge.net/bigdata/?rev=3489&view=rev Author: thompsonbry Date: 2010-09-01 18:27:35 +0000 (Wed, 01 Sep 2010) Log Message: ----------- Added life cycle hooks for operator evaluation to the query engine. Modified the operator evaluation context to use the bop annotations for the access path. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IQueryClient.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/DuplicateBOpException.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/BOpShard.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/HaltOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/PipelineUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/StartOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestPipelineUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine2.java Modified: 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-01 18:27:08 UTC (rev 3488) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-01 18:27:35 UTC (rev 3489) @@ -152,6 +152,16 @@ * The default timeout for operator evaluation. */ long DEFAULT_TIMEOUT = Long.MAX_VALUE; + + /** + * For hash partitioned operators, this is the set of the member nodes + * for the operator. + * <p> + * This annotation is required for such operators since the set of known + * nodes of a given type (such as all data services) can otherwise + * change at runtime. + */ + String MEMBER_SERVICES = "memberServices"; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-01 18:27:08 UTC (rev 3488) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-01 18:27:35 UTC (rev 3489) @@ -30,29 +30,29 @@ import org.apache.log4j.Logger; import com.bigdata.bop.engine.BOpStats; +import com.bigdata.btree.IIndex; import com.bigdata.btree.ILocalBTreeView; +import com.bigdata.btree.IRangeQuery; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.ITx; import com.bigdata.journal.TimestampUtility; -import com.bigdata.relation.AbstractRelation; import com.bigdata.relation.IRelation; +import com.bigdata.relation.accesspath.AccessPath; import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.locator.IResourceLocator; import com.bigdata.relation.rule.IRule; import com.bigdata.relation.rule.eval.IJoinNexus; +import com.bigdata.service.DataService; 
import com.bigdata.service.IBigdataFederation; +import com.bigdata.striterator.IKeyOrder; /** * The evaluation context for the operator (NOT serializable). * * @param <E> * The generic type of the objects processed by the operator. - * - * @todo Make it easy to obtain another {@link BOpContext} in which the source - * or sink are different? E.g., for the evaluation of the right operand in - * a join. */ public class BOpContext<E> { @@ -189,6 +189,9 @@ * @throws IllegalArgumentException * if the <i>indexManager</i> is <code>null</code> * @throws IllegalArgumentException + * if the <i>indexManager</i> is not a <em>local</em> index + * manager. + * @throws IllegalArgumentException * if the <i>readTimestamp</i> is {@link ITx#UNISOLATED} * (queries may not read on the unisolated indices). * @throws IllegalArgumentException @@ -210,6 +213,16 @@ final IBlockingBuffer<E[]> sink, final IBlockingBuffer<E[]> sink2) { if (indexManager == null) throw new IllegalArgumentException(); + if (indexManager instanceof IBigdataFederation<?>) { + /* + * This is disallowed because the predicate specifies an index + * partition and expects to have access to the local index objects + * for that index partition. + */ + throw new IllegalArgumentException( + "Expecting a local index manager, not: " + + indexManager.getClass().toString()); + } if (readTimestamp == ITx.UNISOLATED) throw new IllegalArgumentException(); if (TimestampUtility.isReadOnly(writeTimestamp)) @@ -263,7 +276,6 @@ } /** - /** * Obtain an access path reading from relation for the specified predicate * (from the tail of some rule). * <p> @@ -282,12 +294,44 @@ * * @return The access path. * - * @todo replaces {@link IJoinNexus#getTailAccessPath(IRelation, IPredicate)}. + * @todo replaces + * {@link IJoinNexus#getTailAccessPath(IRelation, IPredicate)}. 
*/ @SuppressWarnings("unchecked") public IAccessPath<?> getAccessPath(final IRelation<?> relation, final IPredicate<?> predicate) { + if (relation == null) + throw new IllegalArgumentException(); + + if (predicate == null) + throw new IllegalArgumentException(); + + final IKeyOrder keyOrder = relation.getKeyOrder((IPredicate) predicate); + + if (keyOrder == null) + throw new RuntimeException("No access path: " + predicate); + + final int partitionId = predicate.getPartitionId(); + + final int flags = predicate.getProperty( + PipelineOp.Annotations.FLAGS, + PipelineOp.Annotations.DEFAULT_FLAGS) + | (TimestampUtility.isReadOnly(getReadTimestamp()) ? IRangeQuery.READONLY + : 0); + + final int chunkOfChunksCapacity = predicate.getProperty( + PipelineOp.Annotations.CHUNK_OF_CHUNKS_CAPACITY, + PipelineOp.Annotations.DEFAULT_CHUNK_OF_CHUNKS_CAPACITY); + + final int chunkCapacity = predicate.getProperty( + PipelineOp.Annotations.CHUNK_CAPACITY, + PipelineOp.Annotations.DEFAULT_CHUNK_CAPACITY); + + final int fullyBufferedReadThreshold = predicate.getProperty( + PipelineOp.Annotations.FULLY_BUFFERED_READ_THRESHOLD, + PipelineOp.Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD); + if (predicate.getPartitionId() != -1) { /* @@ -299,18 +343,64 @@ * require a total view of the relation, which is not available * during scale-out pipeline joins. Likewise, the [backchain] * property will be ignored since it is handled by an expander. + * + * @todo Replace this with IRelation#getAccessPathForIndexPartition() */ +// return ((AbstractRelation<?>) relation) +// .getAccessPathForIndexPartition(indexManager, +// (IPredicate) predicate); + /* + * @todo This condition should probably be an error since the expander + * will be ignored. 
+ */ +// if (predicate.getSolutionExpander() != null) +// throw new IllegalArgumentException(); + + final String namespace = relation.getNamespace();//predicate.getOnlyRelationName(); - return ((AbstractRelation<?>) relation) - .getAccessPathForIndexPartition(indexManager, - (IPredicate) predicate); + // The name of the desired index partition. + final String name = DataService.getIndexPartitionName(namespace + + "." + keyOrder.getIndexName(), partitionId); + // MUST be a local index view. + final ILocalBTreeView ndx = (ILocalBTreeView) indexManager + .getIndex(name, readTimestamp); + + return new AccessPath(relation, indexManager, readTimestamp, + predicate, keyOrder, ndx, flags, chunkOfChunksCapacity, + chunkCapacity, fullyBufferedReadThreshold).init(); + } - // Find the best access path for the predicate for that relation. - final IAccessPath<?> accessPath = relation - .getAccessPath((IPredicate) predicate); + /* + * Find the best access path for the predicate for that relation. + * + * @todo Replace this with IRelation#getAccessPath(IPredicate) once the + * bop conversion is done. It is the same logic. + */ + IAccessPath accessPath; + { +// accessPath = relation.getAccessPath((IPredicate) predicate); + + final IIndex ndx = relation.getIndex(keyOrder); + + if (ndx == null) { + + throw new IllegalArgumentException("no index? relation=" + + relation.getNamespace() + ", timestamp=" + + readTimestamp + ", keyOrder=" + keyOrder + ", pred=" + + predicate + ", indexManager=" + getIndexManager()); + + } + + accessPath = new AccessPath((IRelation) relation, indexManager, + readTimestamp, (IPredicate) predicate, + (IKeyOrder) keyOrder, ndx, flags, chunkOfChunksCapacity, + chunkCapacity, fullyBufferedReadThreshold).init(); + + } + /* * @todo No expander's for bops, at least not right now. 
They could be * added in easily enough, which would support additional features for Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-09-01 18:27:08 UTC (rev 3488) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-09-01 18:27:35 UTC (rev 3489) @@ -29,8 +29,12 @@ import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.log4j.Logger; + import com.bigdata.bop.BOp.Annotations; import com.bigdata.btree.AbstractNode; @@ -44,12 +48,11 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ - * - * @todo In general recursive traversal iterators do not protect against loops - * in the operator tree, but see {@link #getIndex(BOp)}. */ public class BOpUtility { + private static final Logger log = Logger.getLogger(BOpUtility.class); + /** * Pre-order recursive visitation of the operator tree (arguments only, no * annotations). @@ -361,36 +364,53 @@ * Return an index from the {@link BOp.Annotations#BOP_ID} to the * {@link BOp} for each spanned {@link BOp} (including annotations). * {@link BOp}s without identifiers are not indexed. + * <p> + * {@link BOp}s should form directed acyclic graphs, but this is not + * strictly enforced. The recursive traversal iterators declared by this + * class do not protect against loops in the operator tree. However, + * {@link #getIndex(BOp)} detects and reports loops based on duplicate + * {@link Annotations#BOP_ID}s -or- duplicate {@link BOp} references. * * @param op * A {@link BOp}. * * @return The index. * - * @todo define recursive striterator for {@link BOp}s (as top-level method) - * and then layer on an expander for the {@link BOp} annotations. 
- * Finally, layer in a filter for the presence of the bopId. The - * {@link BOp}s visited by the iterator should be inserted into the - * indexed. [it is an error if there is a duplicate bopId.] + * @throws DuplicateBOpIdException + * if there are two or more {@link BOp}s having the same + * {@link Annotations#BOP_ID}. + * @throws BadBOpIdTypeException + * if the {@link Annotations#BOP_ID} is not an {@link Integer}. + * @throws DuplicateBOpException + * if the same {@link BOp} appears more than once in the operator + * tree and it is neither an {@link IVariable} nor an + * {@link IConstant}. */ static public Map<Integer,BOp> getIndex(final BOp op) { final LinkedHashMap<Integer, BOp> map = new LinkedHashMap<Integer, BOp>(); + final LinkedHashSet<BOp> distinct = new LinkedHashSet<BOp>(); final Iterator<BOp> itr = preOrderIteratorWithAnnotations(op); while (itr.hasNext()) { final BOp t = itr.next(); final Object x = t.getProperty(Annotations.BOP_ID); - if (x == null) { - continue; + if (x != null) { + if (!(x instanceof Integer)) { + throw new BadBOpIdTypeException("Must be Integer, not: " + + x.getClass() + ": " + Annotations.BOP_ID); + } + final Integer id = (Integer) t.getProperty(Annotations.BOP_ID); + final BOp conflict = map.put(id, t); + if (conflict != null) + throw new DuplicateBOpIdException("duplicate id=" + id + + " for " + conflict + " and " + t); } - if (!(x instanceof Integer)) { - throw new BadBOpIdTypeException("Must be Integer, not: " - + x.getClass() + ": " + Annotations.BOP_ID); + if (!distinct.add(t) && !(t instanceof IVariableOrConstant<?>)) { + /* + * BOp appears more than once. This is only allowed for + * constants and variables. 
+ */ + throw new DuplicateBOpException(t.toString()); } - final Integer id = (Integer) t.getProperty(Annotations.BOP_ID); - final BOp conflict = map.put(id, t); - if (conflict != null) - throw new DuplicateBOpIdException("duplicate id=" + id + " for " - + conflict + " and " + t); } return map; } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/DuplicateBOpException.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/DuplicateBOpException.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/DuplicateBOpException.java 2010-09-01 18:27:35 UTC (rev 3489) @@ -0,0 +1,24 @@ +package com.bigdata.bop; + +/** + * Exception thrown when a {@link BOp} appears more than once in an operator + * tree (operator trees must not contain loops and operator instances may not + * appear more than once unless they are an {@link IConstant} or an + * {@link IVariable}). + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: DuplicateBOpIdException.java 3466 2010-08-27 14:28:04Z + * thompsonbry $ + */ +public class DuplicateBOpException extends RuntimeException { + + /** + * @param msg + */ + public DuplicateBOpException(String msg) { + super(msg); + } + + private static final long serialVersionUID = 1L; + +} \ No newline at end of file Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/DuplicateBOpException.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-09-01 18:27:08 UTC (rev 3488) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-09-01 18:27:35 UTC (rev 
3489) @@ -30,6 +30,8 @@ import java.util.concurrent.FutureTask; import com.bigdata.bop.engine.BOpStats; +import com.bigdata.btree.IRangeQuery; +import com.bigdata.relation.accesspath.AccessPath; import com.bigdata.relation.accesspath.BlockingBuffer; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.accesspath.IBuffer; @@ -55,6 +57,8 @@ * would block (default {@value #DEFAULT_CHUNK_OF_CHUNKS_CAPACITY}). * Note that partial chunks may be combined into full chunks whose * nominal capacity is specified by {@link #CHUNK_CAPACITY}. + * + * @see #DEFAULT_CHUNK_OF_CHUNKS_CAPACITY */ String CHUNK_OF_CHUNKS_CAPACITY = PipelineOp.class.getName() + ".chunkOfChunksCapacity"; @@ -69,6 +73,7 @@ * of {@link IBindingSet}s (default {@value #CHUNK_CAPACITY}). Partial * chunks may be automatically combined into full chunks. * + * @see #DEFAULT_CHUNK_CAPACITY * @see #CHUNK_OF_CHUNKS_CAPACITY */ String CHUNK_CAPACITY = PipelineOp.class.getName() + ".chunkCapacity"; @@ -83,6 +88,8 @@ * for another chunk to combine with the current chunk before returning * the current chunk (default {@value #DEFAULT_CHUNK_TIMEOUT}). This may * be ZERO (0) to disable the chunk combiner. + * + * @see #DEFAULT_CHUNK_TIMEOUT */ String CHUNK_TIMEOUT = PipelineOp.class.getName() + ".chunkTimeout"; @@ -93,6 +100,46 @@ */ int DEFAULT_CHUNK_TIMEOUT = 1000; + /** + * If the estimated rangeCount for an {@link AccessPath#iterator()} is + * LTE this threshold then use a fully buffered (synchronous) iterator. + * Otherwise use an asynchronous iterator whose capacity is governed by + * {@link #CHUNK_OF_CHUNKS_CAPACITY}. + * + * @see #DEFAULT_FULLY_BUFFERED_READ_THRESHOLD + */ + String FULLY_BUFFERED_READ_THRESHOLD = PipelineOp.class.getName() + + ".fullyBufferedReadThreshold"; + + /** + * Default for {@link #FULLY_BUFFERED_READ_THRESHOLD}. + * + * @todo try something closer to the branching factor, e.g., 100. 
+ */ + int DEFAULT_FULLY_BUFFERED_READ_THRESHOLD = 1000; + + /** + * Flags for the iterator ({@link IRangeQuery#KEYS}, + * {@link IRangeQuery#VALS}, {@link IRangeQuery#PARALLEL}). + * <p> + * Note: The {@link IRangeQuery#PARALLEL} flag here is an indication + * that the iterator may run in parallel across the index partitions. + * This only effects scale-out and only for simple triple patterns since + * the pipeline join does something different (it runs inside the index + * partition using the local index, not the client's view of a + * distributed index). + * + * @see #DEFAULT_FLAGS + */ + String FLAGS = PipelineOp.class.getName() + ".flags"; + + /** + * The default flags will visit the keys and values of the non-deleted + * tuples and allows parallelism in the iterator (when supported). + */ + final int DEFAULT_FLAGS = IRangeQuery.KEYS | IRangeQuery.VALS + | IRangeQuery.PARALLEL; + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-01 18:27:08 UTC (rev 3488) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-01 18:27:35 UTC (rev 3489) @@ -39,7 +39,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; @@ -59,7 +58,6 @@ import com.bigdata.btree.BytesUtil; import com.bigdata.btree.keys.IKeyBuilder; import com.bigdata.counters.CAT; -import com.bigdata.journal.IIndexManager; import com.bigdata.relation.IRelation; import com.bigdata.relation.accesspath.AbstractUnsynchronizedArrayBuffer; import com.bigdata.relation.accesspath.AccessPath; @@ -73,8 +71,6 @@ import com.bigdata.relation.rule.IStarJoin; import 
com.bigdata.relation.rule.IStarJoin.IStarConstraint; import com.bigdata.relation.rule.eval.ISolution; -import com.bigdata.relation.rule.eval.pipeline.DistributedJoinTask; -import com.bigdata.relation.rule.eval.pipeline.JoinMasterTask; import com.bigdata.service.DataService; import com.bigdata.striterator.IChunkedOrderedIterator; import com.bigdata.striterator.IKeyOrder; @@ -95,30 +91,14 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * - * @todo There is only one source, even if scale-out, and the {@link JoinTask} - * runs only for the duration of that source. The termination conditions - * for query evaluation are handled outside of the operator - * implementation. - * <p> - * The first join dimension always has a single source - the - * initialBindingSet established by the {@link JoinMasterTask}. Downstream - * join dimensions read from {@link IAsynchronousIterator} (s) from the - * upstream join dimension. When the {@link IIndexManager} allows - * key-range partitions, then the fan-in for the sources may be larger - * than one as there will be one {@link JoinTask} for each index partition - * touched by each join dimension. - * - * @todo provide more control over the access path (fully buffered read - * thresholds). - * - * @todo Do we need to hook the source and sink {@link Future}s? - * * @todo Break the star join logic out into its own join operator. * * @todo Implement operator at a time or mega-chunk pipeline operators for high * volume query. These will differ by running across the entire shard on * the right hand operator using multi-block IO each time they process a * (mega-)chunk of bindings from the left hand operator. + * + * @todo Support SLICE via annotations. 
*/ public class PipelineJoin extends AbstractPipelineOp<IBindingSet> implements BindingSetPipelineOp { @@ -334,12 +314,6 @@ */ private static class JoinTask extends Haltable<Void> implements Callable<Void> { -// /** -// * The federation reference is passed along when we evaluate the -// * {@link #left} operand. -// */ -// final protected IBigdataFederation<?> fed; - /** * The join that is being executed. */ @@ -361,15 +335,15 @@ * the failed joined needs to jump out of a join group rather than * routing directly to the ancestor in the operator tree. * - * @todo Support for the {@link #optionalSink} is not finished. When the - * optional target is not simply the direct ancestor in the - * operator tree then we need to have a separate thread local - * buffering in front of the optional sink for the join task. This - * means that we need to use two {@link #threadLocalBufferFactory} - * s, one for the optional path. All of this only matters when the - * binding sets are being routed out of an optional join group. - * When the tails are independent optionals then the target is the - * same as the target for binding sets which do join. + * FIXME Support for the {@link #optionalSink} is not finished. When the + * optional target is not simply the direct ancestor in the operator + * tree then we need to have a separate thread local buffering in front + * of the optional sink for the join task. This means that we need to + * use two {@link #threadLocalBufferFactory} s, one for the optional + * path. All of this only matters when the binding sets are being routed + * out of an optional join group. When the tails are independent + * optionals then the target is the same as the target for binding sets + * which do join. 
*/ final IBlockingBuffer<IBindingSet[]> optionalSink; @@ -406,82 +380,6 @@ */ final protected BOpContext<IBindingSet> context; -// /** -// * Volatile flag is set <code>true</code> if the {@link JoinTask} -// * (including any tasks executing on its behalf) should halt. This flag -// * is monitored by the {@link BindingSetConsumerTask}, the -// * {@link AccessPathTask}, and the {@link ChunkTask}. It is set by any -// * of those tasks if they are interrupted or error out. -// * -// * @todo review handling of this flag. Should an exception always be -// * thrown if the flag is set wrapping the {@link #firstCause}? Are -// * there any cases where the behavior should be different? If not, -// * then replace tests with halt() and encapsulate the logic in -// * that method. -// */ -// volatile protected boolean halt = false; -// -// /** -// * Set by {@link BindingSetConsumerTask}, {@link AccessPathTask}, and -// * {@link ChunkTask} if they throw an error. Tasks are required to use -// * an {@link AtomicReference#compareAndSet(Object, Object)} and must -// * specify <code>null</code> as the expected value. This ensures that -// * only the first cause is recorded by this field. -// */ -// final protected AtomicReference<Throwable> firstCause = new AtomicReference<Throwable>( -// null); -// -// /** -// * Indicate that join processing should halt. This method is written -// * defensively and will not throw anything. -// * -// * @param cause -// * The cause. 
-// */ -// protected void halt(final Throwable cause) { -// -// halt = true; -// -// final boolean isFirstCause = firstCause.compareAndSet( -// null/* expect */, cause); -// -// if (log.isEnabledFor(Level.WARN)) -// -// try { -// -// if (!InnerCause.isInnerCause(cause, -// InterruptedException.class) -// && !InnerCause.isInnerCause(cause, -// CancellationException.class) -// && !InnerCause.isInnerCause(cause, -// ClosedByInterruptException.class) -// && !InnerCause.isInnerCause(cause, -// RejectedExecutionException.class) -// && !InnerCause.isInnerCause(cause, -// BufferClosedException.class)) { -// -// /* -// * This logs all unexpected causes, not just the first -// * one to be reported for this join task. -// * -// * Note: The master will log the firstCause that it -// * receives as an error. -// */ -// -// log.warn("joinOp=" + joinOp + ", isFirstCause=" -// + isFirstCause + " : " -// + cause.getLocalizedMessage(), cause); -// -// } -// -// } catch (Throwable ex) { -// -// // error in logging system - ignore. -// -// } -// -// } - /** * The statistics for this {@link JoinTask}. */ @@ -797,26 +695,9 @@ } catch (Throwable t) { - try { - logCallError(t); - } catch (Throwable t2) { - log.error(t2.getLocalizedMessage(), t2); - } - /* * This is used for processing errors and also if this task is - * interrupted (because a SLICE has been satisfied). - * - * @todo For a SLICE, consider that the query solution buffer - * proxy could return the #of solutions added so far so that we - * can halt each join task on the last join dimension in a - * relatively timely manner producing no more than one chunk too - * many (actually, it might not be that timely since some index - * partitions might not produce any solutions; this suggests - * that the master might need a fatter API than a Future for the - * JoinTask so that it can directly notify the JoinTasks for the - * first predicate and they can propagate that notice downstream - * to their sinks). 
This will be an issue when fanOut GT ONE. + * interrupted (because the sink has been closed). */ halt(t); @@ -836,13 +717,6 @@ log.error(t2.getLocalizedMessage(), t2); } -// // report join stats _before_ we close our source(s). -// try { -// reportOnce(); -// } catch (Throwable t2) { -// log.error(t2.getLocalizedMessage(), t2); -// } - /* * Close source iterators, which will cause any source JoinTasks * that are still executing to throw a CancellationException @@ -857,61 +731,11 @@ throw new RuntimeException(t); - } finally { - -// // report join stats iff they have not already been reported. -// reportOnce(); - } } /** - * Method is used to log the primary exception thrown by {@link #call()} - * . The default implementation does nothing and the exception will be - * logged by the {@link JoinMasterTask}. However, this method is - * overridden by {@link DistributedJoinTask} so that the exception can - * be logged on the host and {@link DataService} where it originates. - * This appears to be necessary in order to trace back the cause of an - * exception which can otherwise be obscured (or even lost?) in a deeply - * nested RMI stack trace. - * - * @param o - * @param t - */ - protected void logCallError(Throwable t) { - - } - -// /** -// * Method reports {@link JoinStats} to the {@link JoinMasterTask}, but -// * only if they have not already been reported. This "report once" -// * constraint is used to make it safe to invoke during error handling -// * before actions which could cause the source {@link JoinTask}s (and -// * hence the {@link JoinMasterTask}) to terminate. -// */ -// protected void reportOnce() { -// -// if (didReport.compareAndSet(false/* expect */, true/* update */)) { -// -//// try { -//// -////// @todo report statistics to the master. 
-//// masterProxy.report(stats); -//// -//// } catch (IOException ex) { -//// -//// log.warn("Could not report statistics to the master", ex); -//// -//// } -// -// } -// -// } -// -// private final AtomicBoolean didReport = new AtomicBoolean(false); - - /** * Consume {@link IBindingSet} chunks from the {@link #sink}. * * @throws Exception Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java 2010-09-01 18:27:08 UTC (rev 3488) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java 2010-09-01 18:27:35 UTC (rev 3489) @@ -416,7 +416,7 @@ } /** - * Unit test for {@link BOpUtility#getIndex(BOp)}. + * Unit test for {@link BOpUtility#getIndex(BOp)} using valid inputs. */ public void test_getIndex() { @@ -489,7 +489,7 @@ /** * Unit test for {@link BOpUtility#getIndex(BOp)} in which we verify that it - * rejects operator trees operator ids which are not {@link Integer}s. + * rejects operator trees with operator ids which are not {@link Integer}s. */ public void test_getIndex_rejectsNonIntegerIds() { @@ -510,6 +510,65 @@ } /** + * Unit test for {@link BOpUtility#getIndex(BOp)} in which we verify that it + * rejects operator trees in which the same {@link BOp} reference appears + * more than once but allows duplicate {@link IVariable}s and + * {@link IConstant}s. + */ + public void test_getIndex_duplicateBOps() { + + final IConstant<Long> c1 = new Constant<Long>(12L); + final IVariable<?> v1 = Var.var("y"); + + /* + * Operator tree with duplicate variable and duplicate constant refs. + */ + { + // root + final BOp root = new BOpBase(new BOp[] { // root args[] + c1, v1 }, NV.asMap(new NV[] {// + new NV(BOp.Annotations.BOP_ID, 4),// + new NV("foo", v1), // duplicate variable. + new NV("bar", c1) // duplicate variable. + })); + + // should be Ok. 
+ final Map<Integer, BOp> map = BOpUtility.getIndex(root); + + assertTrue(root == map.get(4)); + + } + + /* + * Operator tree with duplicate bop which is neither a var nor or a + * constant. + */ + { + + /* + * bop w/o bopId is used to verify correct detection of duplicate + * references. + */ + final BOp op2 = new BOpBase(new BOp[]{}, null/*annotations*/); + + // root + final BOp root = new BOpBase(new BOp[] { // root args[] + op2, op2 }, NV.asMap(new NV[] {// + new NV(BOp.Annotations.BOP_ID, 4),// + })); + + try { + BOpUtility.getIndex(root); + fail("Expecting: " + DuplicateBOpException.class); + } catch (DuplicateBOpException ex) { + if (log.isInfoEnabled()) + log.info("Ignoring expected exception: " + ex); + } + } + + } + + /** * Unit test for {@link BOpUtility#getParent(BOp, BOp)}. */ public void test_getParent() { Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/BOpShard.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/BOpShard.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/BOpShard.java 2010-09-01 18:27:35 UTC (rev 3489) @@ -0,0 +1,75 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 1, 2010 + */ + +package com.bigdata.bop.engine; + +/** + * An immutable class capturing the evaluation context of an operator against a + * shard. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class BOpShard { + + public final int bopId; + + public final int shardId; + + public BOpShard(final int bopId, final int shardId) { + + this.bopId = bopId; + + this.shardId = shardId; + + } + + /** + * {@inheritDoc} + * + * @todo verify that this is a decent hash function. + */ + public int hashCode() { + + return (bopId * 31) + shardId; + + } + + public boolean equals(final Object o) { + + if (this == o) + return true; + + if (!(o instanceof BOpShard)) + return false; + + return bopId == ((BOpShard) o).bopId + && shardId == ((BOpShard) o).shardId; + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/BOpShard.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/HaltOpMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/HaltOpMessage.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-01 18:27:35 UTC (rev 3489) @@ -0,0 +1,135 @@ +package com.bigdata.bop.engine; + +import java.io.Serializable; +import java.util.UUID; + +/** + * A message sent to the {@link IQueryClient} when an operator is done executing + * for some chunk of inputs. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class HaltOpMessage implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** The identifier of the query. */ + final long queryId; + + /** The identifier of the operator. */ + final int bopId; + + /** + * The index partition identifier against which the operator was + * executing. + */ + final int partitionId; + + /** + * The identifier of the service on which the operator was executing. + */ + final UUID serviceId; + + /** + * The cause -or- <code>null</code> if the operator halted normally. + */ + final Throwable cause; + + /** + * The operator identifier for the primary sink -or- <code>null</code> + * if there is no primary sink (for example, if this is the last + * operator in the pipeline). + */ + final Integer sinkId; + + /** + * The number of the {@link BindingSetChunk}s that were output for the + * primary sink. (This information is used for the atomic termination + * decision.) + * <p> + * For a given downstream operator this is ONE (1) for scale-up. For + * scale-out, this is one per index partition over which the + * intermediate results were mapped. + */ + final int sinkChunksOut; + + /** + * The operator identifier for the alternative sink -or- + * <code>null</code> if there is no alternative sink. + */ + final Integer altSinkId; + + /** + * The number of the {@link BindingSetChunk}s that were output for the + * alternative sink. (This information is used for the atomic + * termination decision.) + * <p> + * For a given downstream operator this is ONE (1) for scale-up. For + * scale-out, this is one per index partition over which the + * intermediate results were mapped. It is zero if there was no + * alternative sink for the operator. + */ + final int altSinkChunksOut; + + /** + * The statistics for the execution of the bop against the partition on + * the service. 
+ */ + final BOpStats taskStats; + + /** + * @param queryId + * The query identifier. + * @param bopId + * The operator whose execution phase has terminated for a + * specific index partition and input chunk. + * @param partitionId + * The index partition against which the operator was + * executed. + * @param serviceId + * The node which executed the operator. + * @param cause + * <code>null</code> unless execution halted abnormally. + * @param chunksOut + * A map reporting the #of binding set chunks which were + * output for each downstream operator for which at least one + * chunk of output was produced. + * @param taskStats + * The statistics for the execution of that bop on that shard + * and service. + */ + public HaltOpMessage( + // + final long queryId, final int bopId, final int partitionId, + final UUID serviceId, Throwable cause, // + final Integer sinkId, final int sinkChunksOut,// + final Integer altSinkId, final int altSinkChunksOut,// + final BOpStats taskStats) { + + if (altSinkId != null && sinkId == null) { + // The primary sink must be defined if the altSink is defined. + throw new IllegalArgumentException(); + } + + if (sinkId != null && altSinkId != null + && sinkId.intValue() == altSinkId.intValue()) { + // The primary and alternative sink may not be the same operator. 
+ throw new IllegalArgumentException(); + } + + this.queryId = queryId; + this.bopId = bopId; + this.partitionId = partitionId; + this.serviceId = serviceId; + this.cause = cause; + this.sinkId = sinkId; + this.sinkChunksOut = sinkChunksOut; + this.altSinkId = altSinkId; + this.altSinkChunksOut = altSinkChunksOut; + this.taskStats = taskStats; + } +} \ No newline at end of file Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/HaltOpMessage.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IQueryClient.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IQueryClient.java 2010-09-01 18:27:08 UTC (rev 3488) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IQueryClient.java 2010-09-01 18:27:35 UTC (rev 3489) @@ -2,7 +2,6 @@ import java.rmi.Remote; import java.rmi.RemoteException; -import java.util.UUID; import com.bigdata.bop.BOp; @@ -74,51 +73,17 @@ /** * Notify the client that execution has started for some query, operator, * node, and index partition. - * - * @param queryId - * The query identifier. - * @param opId - * The operator identifier. - * @param partitionId - * The index partition identifier. - * @param serviceId - * The node on which the operator will execute. - * @param nchunks - * The #of chunks which form the input to that operator (for the - * atomic termination condition decision). */ - public void startOp(long queryId, int opId, int partitionId, UUID serviceId, final int nchunks) + public void startOp(StartOpMessage msg) throws RemoteException; /** * Notify the client that execution has halted for some query, operator, - * node and index partition. If execution halted abnormally, then the cause - * is sent as well. 
- * - * @param queryId - * The query identifier. - * @param opId - * The operator whose execution phase has terminated for a - * specific index partition and input chunk. - * @param partitionId - * The index partition against which the operator was executed. - * @param serviceId - * The node which executed the operator. - * @param cause - * <code>null</code> unless execution halted abnormally. - * @param nchunks - * The #of chunks which were output by the operator (for the - * atomic termination decision). This is ONE (1) for scale-up. - * For scale-out, this is one per index partition over which the - * intermediate results were mapped. - * @param taskStats - * The statistics for the execution of that bop on that shard and - * service. + * node, shard, and source binding set chunk(s). If execution halted + * abnormally, then the cause is sent as well. */ - public void haltOp(long queryId, int opId, int partitionId, UUID serviceId, - Throwable cause, int nchunks, BOpStats taskStats) - throws RemoteException; - + public void haltOp(HaltOpMessage msg) throws RemoteException; + // /** // * Notify the query controller that a chunk of intermediate results is // * available for the query. Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/PipelineUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/PipelineUtility.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/PipelineUtility.java 2010-09-01 18:27:35 UTC (rev 3489) @@ -0,0 +1,156 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 1, 2010 + */ + +package com.bigdata.bop.engine; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.log4j.Logger; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.NoSuchBOpException; + +/** + * Utility methods relevant to pipelined operator evaluation. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class PipelineUtility { + + private static final Logger log = Logger.getLogger(PipelineUtility.class); + + /** + * Return <code>true</code> iff the <i>runningCountMap</i> AND + * <i>availableChunkCountMap</i> are ZERO (0) for both the given operator and + * for all operators which precede the given operator in the tree structure + * of its operands. + * <p> + * Note: The movement of the intermediate binding set chunks forms an + * acyclic directed graph. We can decide whether or not a {@link BOp} in the + * query plan can be triggered by the current activity pattern by inspecting + * the {@link BOp} and its operands recursively. If neither the {@link BOp} + * nor any of its operands (recursively) has non-zero activity then the + * {@link BOp} can not be triggered and this method will return + * <code>true</code>. + * + * @param bopId + * The identifier for an operator which appears in the query + * plan. + * @param queryPlan + * The query plan. + * @param queryIndex + * An index for the query plan as constructed by + * {@link BOpUtility#getIndex(BOp)}. 
+ * @param runningCountMap + * A map reporting the #of instances of each operator which are + * currently being evaluated (distinct evaluations are performed + * for each chunk and shard). + * @param availableChunkCountMap + * A map reporting the #of chunks available for each operator in + * the pipeline (we only report chunks for pipeline operators). + * + * @return <code>true</code> iff the {@link BOp} can not be triggered given + * the query plan and the activity map. + * + * @throws IllegalArgumentException + * if any argument is <code>null</code>. + * @throws NoSuchBOpException + * if <i>bopId</i> is not found in the query index. + */ + static public boolean isDone(final int bopId, final BOp queryPlan, + final Map<Integer, BOp> queryIndex, + final Map<Integer, AtomicLong> runningCountMap, + final Map<Integer, AtomicLong> availableChunkCountMap) { + + if (queryPlan == null) + throw new IllegalArgumentException(); + if (queryIndex == null) + throw new IllegalArgumentException(); + if (availableChunkCountMap == null) + throw new IllegalArgumentException(); + + final BOp op = queryIndex.get(bopId); + + if (op == null) + throw new NoSuchBOpException(bopId); + + final Iterator<BOp> itr = BOpUtility.preOrderIterator(op); + + while (itr.hasNext()) { + + final BOp t = itr.next(); + + final Integer id = (Integer) t.getProperty(BOp.Annotations.BOP_ID); + + if (id == null) + continue; + { + + final AtomicLong runningCount = runningCountMap.get(id); + + if (runningCount != null && runningCount.get() != 0) { + + if (log.isInfoEnabled()) + log.info("Operator can be triggered: op=" + op + + ", possible trigger=" + t + " is running."); + + return false; + + } + + } + + { + + final AtomicLong availableChunkCount = availableChunkCountMap + .get(id); + + if (availableChunkCount != null + && availableChunkCount.get() != 0) { + + if (log.isInfoEnabled()) + log.info("Operator can be triggered: op=" + op + + ", possible trigger=" + t + " has " + + availableChunkCount + " chunks 
available."); + + return false; + + } + + } + + } + + return true; + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/PipelineUtility.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java 2010-09-01 18:27:08 UTC (rev 3488) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java 2010-09-01 18:27:35 UTC (rev 3489) @@ -517,7 +517,7 @@ public void bufferReady(IQueryClient clientProxy, InetSocketAddress serviceAddr, long queryId, int bopId) { - // TODO SCALEOUT + // @todo SCALEOUT notify peer when a buffer is ready. } @@ -538,22 +538,28 @@ return null; } - public void startOp(final long queryId, final int opId, - final int partitionId, final UUID serviceId, final int nchunks) - throws RemoteException { - final RunningQuery q = runningQueries.get(queryId); + public void startOp(final StartOpMessage msg) throws RemoteException { + + final RunningQuery q = runningQueries.get(msg.queryId); + if (q != null) { - q.startOp(opId, partitionId, serviceId, nchunks); + + q.startOp(msg); + } + } - public void haltOp(final long queryId, final int opId, - final int partitionId, final UUID serviceId, final Throwable cause, - final int nchunks, final BOpStats taskStats) throws RemoteException { - final RunningQuery q = runningQueries.get(queryId); + public void haltOp(final HaltOpMessage msg) throws RemoteException { + + final RunningQuery q = runningQueries.get(msg.queryId); + if (q != null) { - q.haltOp(opId, partitionId, serviceId, cause, nchunks, taskStats); + + q.haltOp(msg); + } + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java 
=================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java 2010-09-01 18:27:08 UTC (rev 3488) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java 2010-09-01 18:27:35 UTC (rev 3489) @@ -29,7 +29,10 @@ import java.nio.ByteBuffer; import java.rmi.RemoteException; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -39,8 +42,8 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; @@ -50,6 +53,8 @@ import com.bigdata.bop.BOpUtility; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.IPredicate; import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.ap.Predicate; import com.bigdata.relation.accesspath.IAsynchronousIterator; @@ -134,6 +139,86 @@ final IBlockingBuffer<IBindingSet[]> queryBuffer; /** + * An index from the {@link BOp.Annotations#BOP_ID} to the {@link BOp}. + */ + private final Map<Integer, BOp> bopIndex; + + /** + * A collection of the currently executing future for operators for this + * query. + */ + private final ConcurrentHashMap<BOpShard, Future<?>> operatorFutures = new ConcurrentHashMap<BOpShard, Future<?>>(); + + /** + * A lock guarding {@link #runningTaskCount}, {@link #availableChunkCount}, + * {@link #availableChunkCountMap}. 
+ */ + private final ReentrantLock runStateLock = new ReentrantLock(); + + /** + * The #of tasks for this query which have started but not yet halted and + * ZERO (0) if this is not the query coordinator. + * <p> + * This is guarded by the {@link #runStateLock}. + */ + private long runningTaskCount = 0; + + /** + * The #of chunks for this query which a running task has made available + * but which have not yet been accepted for processing by another task and + * ZERO (0) if this is not the query coordinator. + * <p> + * This is guarded by the {@link #runStateLock}. + */ + private long availableChunkCount = 0; + + /** + * A map reporting the #of chunks available for each operator in the + * pipeline (we only report chunks for pipeline operators). The total #of + * chunks available for any given operator in the pipeline is reported by + * {@link #availableChunkCount}. + * <p> + * The movement of the intermediate binding set chunks forms an acyclic + * directed graph. This map is used to track the #of chunks available for + * each bop in the pipeline. When a bop has no more incoming chunks, we send + * an asynchronous message to all nodes on which that bop had executed + * informing the {@link QueryEngine} on that node that it should immediately + * release all resources associated with that bop. + * <p> + * This is guarded by the {@link #runStateLock}. + * + * FIXME {@link IConstraint}s for {@link PipelineJoin}, distinct elements + * and other filters for {@link IPredicate}s, conditional routing for + * binding sets in the pipeline (to route around an optional join group + * based on an {@link IConstraint}), and then buffer management for s/o. 
+ * + * @todo SCALEOUT: Life cycle management of the operators and the query + * implies both a per-query bop:NodeList map on the query coordinator + * identifying the nodes on which the query has been executed and a + * per-query bop:ResourceList map identifying the resources associated + * with the execution of that bop on that node. In fact, this could be + * the same {@link #resourceMap} except that we would lose type + * information about the nature of the resource so it is better to + * have distinct maps for this purpose. + */ + private final Map<Integer/* bopId */, AtomicLong/* availableChunkCount */> availableChunkCountMap = new LinkedHashMap<Integer, AtomicLong>(); + + /** + * A collection reporting on the #of instances of a given {@link BOp} which + * are concurrently executing. + * <p> + * This is guarded by the {@link #runStateLock}. + */ + private final Map<Integer/*bopId*/, AtomicLong/*runningCount*/> runningCountMap = new LinkedHashMap<Integer, AtomicLong>(); + + /** + * A collection of the operators which have executed at least once. ... [truncated message content] |