From: <tho...@us...> - 2011-01-02 00:41:41
|
Revision: 4044 http://bigdata.svn.sourceforge.net/bigdata/?rev=4044&view=rev Author: thompsonbry Date: 2011-01-02 00:41:31 +0000 (Sun, 02 Jan 2011) Log Message: ----------- Added a new IRunningQuery implementation based on chaining together operators using a blocking queue in front of each operator. The new implementation is conditionally enabled by an annotation. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BlockingBufferWithStats.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChunkHandler.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/CancelQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederationChunkHandler.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/MultiplexBlockingBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/config/LogUtil.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestUnion.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngineOptionalJoins.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_Slice.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/TestJoinGraphOnBarData.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/TestJoinGraphOnLubm.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/OutputStatsBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/SinkTransitionBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/SinkTransitionMetadata.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-12-22 17:32:36 UTC (rev 4043) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2011-01-02 00:41:31 UTC (rev 4044) @@ -413,40 +413,4 @@ // // } -// /** -// * Copy data from the source to the sink. The sink will be flushed and -// * closed. The source will be closed. -// */ -// public void copySourceToSink() { -// -// // source. -// final IAsynchronousIterator<IBindingSet[]> source = (IAsynchronousIterator) getSource(); -// -// // default sink -// final IBlockingBuffer<IBindingSet[]> sink = (IBlockingBuffer) getSink(); -// -// final BOpStats stats = getStats(); -// -// try { -// -// // copy binding sets from the source. -// BOpUtility.copy(source, sink, null/* sink2 */, -// null/* constraints */, stats); -// -// // flush the sink. -// sink.flush(); -// -// } finally { -// -// sink.close(); -// -// if (sink2 != null) -// sink2.close(); -// -// source.close(); -// -// } -// -// } - } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-12-22 17:32:36 UTC (rev 4043) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2011-01-02 00:41:31 UTC (rev 4044) @@ -233,13 +233,8 @@ * @todo replaces * {@link IJoinNexus#getTailAccessPath(IRelation, IPredicate)}. * - * @todo Reconcile with IRelation#getAccessPath(IPredicate) once the bop - * conversion is done. It has much of the same logic (this also - * handles remote access paths now). - * * @todo Support mutable relation views (no - just fix truth maintenance). 
*/ -// @SuppressWarnings("unchecked") public <E> IAccessPath<E> getAccessPath(final IRelation<E> relation, final IPredicate<E> predicate) { @@ -252,146 +247,6 @@ return relation.getAccessPath(indexManager/* localIndexManager */, relation.getKeyOrder(predicate), predicate); -// /* -// * Note: ALWAYS use the "perfect" index. -// */ -// final IKeyOrder<E> keyOrder = relation.getKeyOrder(predicate); -//// { -//// final IKeyOrder<E> tmp = predicate.getKeyOrder(); -//// if (tmp != null) { -//// // use the specified index. -//// keyOrder = tmp; -//// } else { -//// // ask the relation for the best index. -//// keyOrder = relation.getKeyOrder(predicate); -//// } -//// } -//// -//// if (keyOrder == null) -//// throw new RuntimeException("No access path: " + predicate); -// -// final int partitionId = predicate.getPartitionId(); -// -// final long timestamp = (Long) predicate -// .getRequiredProperty(BOp.Annotations.TIMESTAMP); -// -// final int flags = predicate.getProperty( -// IPredicate.Annotations.FLAGS, -// IPredicate.Annotations.DEFAULT_FLAGS) -// | (TimestampUtility.isReadOnly(timestamp) ? IRangeQuery.READONLY -// : 0); -// -// final int chunkOfChunksCapacity = predicate.getProperty( -// BufferAnnotations.CHUNK_OF_CHUNKS_CAPACITY, -// BufferAnnotations.DEFAULT_CHUNK_OF_CHUNKS_CAPACITY); -// -// final int chunkCapacity = predicate.getProperty( -// BufferAnnotations.CHUNK_CAPACITY, -// BufferAnnotations.DEFAULT_CHUNK_CAPACITY); -// -// final int fullyBufferedReadThreshold = predicate.getProperty( -// IPredicate.Annotations.FULLY_BUFFERED_READ_THRESHOLD, -// IPredicate.Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD); -// -// if (partitionId != -1) { -// -// /* -// * Note: This handles a read against a local index partition. For -// * scale-out, the [indexManager] will be the data service's local -// * index manager. -// * -// * Note: Expanders ARE NOT applied in this code path. 
Expanders -// * require a total view of the relation, which is not available -// * during scale-out pipeline joins. Likewise, the [backchain] -// * property will be ignored since it is handled by an expander. -// * -// * @todo Replace this with IRelation#getAccessPathForIndexPartition() -// */ -//// return ((AbstractRelation<?>) relation) -//// .getAccessPathForIndexPartition(indexManager, -//// (IPredicate) predicate); -// -// /* -// * @todo This is an error since expanders are currently ignored on -// * shard-wise access paths. While it is possible to enable expanders -// * for shard-wise access paths. -// */ -// if (predicate.getSolutionExpander() != null) -// throw new IllegalArgumentException(); -// -// final String namespace = relation.getNamespace();//predicate.getOnlyRelationName(); -// -// // The name of the desired index partition. -// final String name = DataService.getIndexPartitionName(namespace -// + "." + keyOrder.getIndexName(), partitionId); -// -// // MUST be a local index view. -// final ILocalBTreeView ndx = (ILocalBTreeView) indexManager -// .getIndex(name, timestamp); -// -// return new AccessPath<E>(relation, indexManager, timestamp, -// predicate, keyOrder, ndx, flags, chunkOfChunksCapacity, -// chunkCapacity, fullyBufferedReadThreshold).init(); -// -// } -// -//// accessPath = relation.getAccessPath((IPredicate) predicate); -// -// // Decide on a local or remote view of the index. -// final IIndexManager indexManager; -// if (predicate.isRemoteAccessPath()) { -// // use federation in scale-out for a remote access path. -// indexManager = fed != null ? fed : this.indexManager; -// } else { -// indexManager = this.indexManager; -// } -// -// // Obtain the index. -// final String fqn = AbstractRelation.getFQN(relation, keyOrder); -// final IIndex ndx = AbstractRelation.getIndex(indexManager, fqn, timestamp); -// -// if (ndx == null) { -// -// throw new IllegalArgumentException("no index? 
relation=" -// + relation.getNamespace() + ", timestamp=" + timestamp -// + ", keyOrder=" + keyOrder + ", pred=" + predicate -// + ", indexManager=" + getIndexManager()); -// -// } -// -// // Obtain the access path for that relation and index. -// final IAccessPath<E> accessPath = ((AbstractRelation<E>) relation) -// .newAccessPath(relation, indexManager, timestamp, predicate, -// keyOrder, ndx, flags, chunkOfChunksCapacity, -// chunkCapacity, fullyBufferedReadThreshold); -// -// // optionally wrap with an expander pattern. -// return expander(predicate, accessPath); - } -// /** -// * Optionally wrap with an expander pattern. -// * -// * @param predicate -// * @param accessPath -// * @return -// * @param <E> -// */ -// private <E> IAccessPath<E> expander(final IPredicate<E> predicate, -// final IAccessPath<E> accessPath) { -// -// final ISolutionExpander<E> expander = predicate.getSolutionExpander(); -// -// if (expander != null) { -// -// // allow the predicate to wrap the access path -// return expander.getAccessPath(accessPath); -// -// } -// -// return accessPath; -// -// } - } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2010-12-22 17:32:36 UTC (rev 4043) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2011-01-02 00:41:31 UTC (rev 4044) @@ -42,8 +42,8 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.bset.Tee; +import com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.QueryEngine; -import com.bigdata.bop.engine.RunningQuery; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.util.concurrent.LatchedExecutor; @@ -167,7 +167,7 @@ private final AbstractSubqueryOp controllerOp; private final 
BOpContext<IBindingSet> context; - private final List<FutureTask<RunningQuery>> tasks = new LinkedList<FutureTask<RunningQuery>>(); + private final List<FutureTask<IRunningQuery>> tasks = new LinkedList<FutureTask<IRunningQuery>>(); private final CountDownLatch latch; private final int nparallel; private final Executor executor; @@ -204,7 +204,7 @@ * Task runs subquery and cancels all subqueries in [tasks] if * it fails. */ - tasks.add(new FutureTask<RunningQuery>(new SubqueryTask(op, + tasks.add(new FutureTask<IRunningQuery>(new SubqueryTask(op, context)) { /* * Hook future to count down the latch when the task is @@ -233,7 +233,7 @@ /* * Run subqueries with limited parallelism. */ - for (FutureTask<RunningQuery> ft : tasks) { + for (FutureTask<IRunningQuery> ft : tasks) { executor.execute(ft); } @@ -251,7 +251,7 @@ /* * Get the futures, throwing out any errors. */ - for (FutureTask<RunningQuery> ft : tasks) + for (FutureTask<IRunningQuery> ft : tasks) ft.get(); // Now that we know the subqueries ran Ok, flush the sink. @@ -263,7 +263,7 @@ } finally { // Cancel any tasks which are still running. - for (FutureTask<RunningQuery> ft : tasks) + for (FutureTask<IRunningQuery> ft : tasks) ft.cancel(true/* mayInterruptIfRunning */); context.getSink().close(); @@ -281,7 +281,7 @@ * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> */ - private class SubqueryTask implements Callable<RunningQuery> { + private class SubqueryTask implements Callable<IRunningQuery> { /** * The evaluation context for the parent query. @@ -302,7 +302,7 @@ } - public RunningQuery call() throws Exception { + public IRunningQuery call() throws Exception { IAsynchronousIterator<IBindingSet[]> subquerySolutionItr = null; try { @@ -310,7 +310,7 @@ final QueryEngine queryEngine = parentContext.getRunningQuery() .getQueryEngine(); - final RunningQuery runningQuery = queryEngine + final IRunningQuery runningQuery = queryEngine .eval(subQueryOp); // Iterator visiting the subquery solutions. 
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-12-22 17:32:36 UTC (rev 4043) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2011-01-02 00:41:31 UTC (rev 4044) @@ -61,9 +61,9 @@ import com.bigdata.bop.PipelineOp; import com.bigdata.bop.ap.SampleIndex; import com.bigdata.bop.bindingSet.HashBindingSet; +import com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.LocalChunkMessage; import com.bigdata.bop.engine.QueryEngine; -import com.bigdata.bop.engine.RunningQuery; import com.bigdata.bop.join.PipelineJoin; import com.bigdata.bop.join.PipelineJoin.PipelineJoinStats; import com.bigdata.bop.rdf.join.DataSetJoin; @@ -1061,7 +1061,7 @@ // run the cutoff sampling of the edge. final UUID queryId = UUID.randomUUID(); - final RunningQuery runningQuery = queryEngine.eval(queryId, + final IRunningQuery runningQuery = queryEngine.eval(queryId, queryOp, new LocalChunkMessage<IBindingSet>(queryEngine, queryId, joinOp.getId()/* startId */, -1 /* partitionId */, @@ -2834,7 +2834,7 @@ final QueryEngine queryEngine = parentContext.getRunningQuery() .getQueryEngine(); - final RunningQuery runningQuery = queryEngine + final IRunningQuery runningQuery = queryEngine .eval( queryId, queryOp, Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2011-01-02 00:41:31 UTC (rev 4044) @@ -0,0 +1,1000 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. 
+ +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +/* + * Created on Dec 30, 2010 + */ + +package com.bigdata.bop.engine; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.journal.IIndexManager; +import com.bigdata.journal.ITx; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.service.IBigdataFederation; +import com.bigdata.util.InnerCause; +import com.bigdata.util.concurrent.Haltable; + +/** + * Abstract base class for various {@link IRunningQuery} implementations. 
The + * purpose of this class is to isolate aspects common to different designs for + * managing resources for a running query and make it easier to realize + * different strategies for managing the resources allocated to a running query. + * <p> + * There are common requirements for the {@link IRunningQuery}, but a variety of + * ways in which those requirements can be met. Among the common requirements + * is a means to manage tradeoffs in the allocation of various resources to the + * operators in each query. Some of the more important tradeoffs are the #of + * threads to allocate to each operator (threads bound IO for Java 6 since we + * are using a synchronous IO model) and the amount of RAM allocated to each + * operator (including RAM on the JVM heap and RAM on the native Java process + * heap). If the #of threads is too restrictive, then queries will progress + * slowly due to insufficient IO level parallelism. If the query buffers too + * much data on the JVM heap, then it can cause GC overhead problems that can + * drastically reduce the responsiveness and throughput of the JVM. Data can be + * moved off of the JVM heap onto the Java process heap by serializing it into + * <em>direct</em> {@link ByteBuffer}s. This can be very efficient in + * combination with hash joins at the expense of increasing the latency to the + * first result when compared with pipelined evaluation. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +abstract public class AbstractRunningQuery implements IRunningQuery { + + /** + * Error message used when an operation which must be performed on the query + * controller is attempted on some other {@link IQueryPeer}. + */ + protected static final String ERR_NOT_CONTROLLER = "Operator only permitted on the query controller"; + + /** + * Error message used when a request is made after the query has stopped + * executing.
+ */ + protected static final String ERR_QUERY_DONE = "Query is no longer running"; + + /** + * Error message used when a request is addressed to an operator other than + * the head of the pipeline in a context where the request must be addressed + * to the operator at the head of the pipeline (e.g., when presenting the + * initial binding sets to get the query moving.) + */ + protected static final String ERR_NOT_PIPELINE_START = "Not pipeline start"; + + /** + * Error message used when no operator can be found for a given + * {@link BOp.Annotations#BOP_ID}. + */ + protected static final String ERR_NO_SUCH_BOP = "No such bop: id="; + + /** + * Error message used when two operators have the same + * {@link BOp.Annotations#BOP_ID}. + */ + protected static final String ERR_DUPLICATE_IDENTIFIER = "Duplicate identifier: id="; + + private final static transient Logger log = Logger + .getLogger(AbstractRunningQuery.class); + + /** + * The class executing the query on this node. + */ + final private QueryEngine queryEngine; + + /** The unique identifier for this query. */ + final private UUID queryId; + + /** + * The query deadline. The value is the system clock time in milliseconds + * when the query is due and {@link Long#MAX_VALUE} if there is no deadline. + * In order to have a guarantee of a consistent clock, the deadline is + * interpreted by the query controller. + */ + final private AtomicLong deadline = new AtomicLong(Long.MAX_VALUE); + + /** + * The timestamp (ms) when the query begins to execute. + */ + final private AtomicLong startTime = new AtomicLong(System + .currentTimeMillis()); + + /** + * The timestamp (ms) when the query is done executing and ZERO (0L) if the + * query is not done. + */ + final private AtomicLong doneTime = new AtomicLong(0L); + + /** + * <code>true</code> iff the outer {@link QueryEngine} is the controller for + * this query. 
+ */ + final private boolean controller; + + /** + * The client executing this query (aka the query controller). + * <p> + * Note: The proxy is primarily for lightweight RMI messages used to + * coordinate the distributed query evaluation. Ideally, all large objects + * will be transferred among the nodes of the cluster using NIO buffers. + */ + final private IQueryClient clientProxy; + + /** The query. */ + final private PipelineOp query; + + /** + * An index from the {@link BOp.Annotations#BOP_ID} to the {@link BOp}. This + * index is generated by the constructor. It is immutable and thread-safe. + */ + private final Map<Integer, BOp> bopIndex; + + /** + * The run state of the query and the result of the computation iff it + * completes execution normally (without being interrupted, cancelled, etc). + */ + final private Haltable<Void> future = new Haltable<Void>(); + + /** + * The {@link Future} of this query. + * <p> + * Note: This is exposed to the {@link QueryEngine} to let it cache the + * {@link Future} for recently finished queries. + */ + final protected Future<Void> getFuture() { + + return future; + + } + + /** + * The runtime statistics for each {@link BOp} in the query and + * <code>null</code> unless this is the query controller. + */ + final private ConcurrentHashMap<Integer/* bopId */, BOpStats> statsMap; + + /** + * The buffer used for the overall output of the query pipeline. + * <p> + * Note: This only exists on the query controller, and then only when the + * top-level operator is not a mutation. In order to ensure that the results + * are transferred to the query controller in scale-out, the top-level + * operator in the query plan must specify + * {@link BOpEvaluationContext#CONTROLLER}. For example, {@link SliceOp} + * uses this {@link BOpEvaluationContext}.
+ */ + final private IBlockingBuffer<IBindingSet[]> queryBuffer; + + /** + * The iterator draining the {@link #queryBuffer} and <code>null</code> iff + * the {@link #queryBuffer} is <code>null</code>. + */ + final private IAsynchronousIterator<IBindingSet[]> queryIterator; + + /** + * A lock guarding various state changes. This guards changes to the + * internal state of the {@link #runState} object. It is also used to + * serialize requests to {@link #acceptChunk(IChunkMessage)} and + * {@link #cancel(boolean)} and to make atomic decisions concerning whether to + * attach a new {@link IChunkMessage} to an operator task which is already + * running or to start a new task for that message. + * + * @see RunState + */ + protected final ReentrantLock lock = new ReentrantLock(); + + /** + * The run state of this query and <code>null</code> unless this is the + * query controller. + */ + final private RunState runState; + + /** + * Flag used to prevent retriggering of {@link #lifeCycleTearDownQuery()}. + */ + private final AtomicBoolean didQueryTearDown = new AtomicBoolean(false); + + /** + * Set the query deadline. The query will be cancelled when the deadline is + * passed. If the deadline has already passed, the query is immediately cancelled. + * + * @param deadline + * The deadline. + * @throws IllegalArgumentException + * if the deadline is non-positive. + * @throws IllegalStateException + * if the deadline was already set. + * @throws UnsupportedOperationException + * unless node is the query controller. + */ + final public void setDeadline(final long deadline) { + + if (!controller) + throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); + + if (deadline <= 0) + throw new IllegalArgumentException(); + + // set the deadline. + if (!this.deadline + .compareAndSet(Long.MAX_VALUE/* expect */, deadline/* update */)) { + + // the deadline is already set.
+ throw new IllegalStateException(); + + } + + if (deadline < System.currentTimeMillis()) { + + // deadline has already expired. + halt(new TimeoutException()); + + } + + } + + final public long getDeadline() { + + return deadline.get(); + + } + + final public long getStartTime() { + + return startTime.get(); + + } + + final public long getDoneTime() { + + return doneTime.get(); + + } + + final public long getElapsed() { + + long mark = doneTime.get(); + + if (mark == 0L) + mark = System.currentTimeMillis(); + + return mark - startTime.get(); + + } + + /** + * Return the buffer used for the overall output of the query pipeline and + * <code>null</code> if this is not the query controller. + */ + final protected IBlockingBuffer<IBindingSet[]> getQueryBuffer() { + + return queryBuffer; + + } + + public QueryEngine getQueryEngine() { + + return queryEngine; + + } + + /** + * The client executing this query (aka the query controller). + * <p> + * Note: The proxy is primarily for light weight RMI messages used to + * coordinate the distributed query evaluation. Ideally, all large objects + * will be transfered among the nodes of the cluster using NIO buffers. + */ + final public IQueryClient getQueryController() { + + return clientProxy; + + } + + /** + * The unique identifier for this query. + */ + final public UUID getQueryId() { + + return queryId; + + } + + /** + * Return the operator tree for this query. + */ + final public PipelineOp getQuery() { + + return query; + + } + + /** + * Return <code>true</code> iff this is the query controller. + */ + final public boolean isController() { + + return controller; + + } + + final public Map<Integer/* bopId */, BOpStats> getStats() { + + return Collections.unmodifiableMap(statsMap); + + } + + /** + * Return the {@link BOpStats} instance associated with the given + * {@link BOp} identifier. + * + * @param bopId + * The {@link BOp} identifier. 
+ * + * @return The associated {@link BOpStats} object -or- <code>null</code> if + * there is no entry for that {@link BOp} identifier. + * + * @throws IllegalArgumentException + * if the argument is <code>null</code>. + */ + final public BOpStats getStats(final Integer bopId) { + + if (bopId == null) + throw new IllegalArgumentException(); + + return statsMap.get(bopId); + + } + + final public Map<Integer, BOp> getBOpIndex() { + + return bopIndex; + + } + + /** + * @param queryEngine + * The {@link QueryEngine} on which the query is running. In + * scale-out, a query is typically instantiated on many + * {@link QueryEngine}s. + * @param queryId + * The identifier for that query. + * @param controller + * <code>true</code> iff the {@link QueryEngine} is the query + * controller for this query (the {@link QueryEngine} which will + * coordinate the query evaluation). + * @param clientProxy + * The query controller. In standalone, this is the same as the + * <i>queryEngine</i>. In scale-out, this is an RMI proxy for the + * query controller whenever the query is instantiated on a node + * other than the query controller itself. + * @param query + * The query. + * + * @throws IllegalArgumentException + * if any argument is <code>null</code>. + * @throws IllegalArgumentException + * if the <i>readTimestamp</i> is {@link ITx#UNISOLATED} + * (queries may not read on the unisolated indices). + * @throws IllegalArgumentException + * if the <i>writeTimestamp</i> is neither + * {@link ITx#UNISOLATED} nor a read-write transaction + * identifier. 
+ */ + public AbstractRunningQuery(final QueryEngine queryEngine, + final UUID queryId, final boolean controller, + final IQueryClient clientProxy, final PipelineOp query) { + + if (queryEngine == null) + throw new IllegalArgumentException(); + + if (queryId == null) + throw new IllegalArgumentException(); + + if (clientProxy == null) + throw new IllegalArgumentException(); + + if (query == null) + throw new IllegalArgumentException(); + + this.queryEngine = queryEngine; + + this.queryId = queryId; + + this.controller = controller; + + this.clientProxy = clientProxy; + + this.query = query; + + this.bopIndex = BOpUtility.getIndex(query); + + /* + * Setup the BOpStats object for each pipeline operator in the query. + */ + if (controller) { + + runState = new RunState(this); + + statsMap = new ConcurrentHashMap<Integer, BOpStats>(); + + populateStatsMap(query); + + /* + * FIXME Review the concept of mutation queries. It used to be that + * queries could only either read or write. Now we have access paths + * which either read or write and each query could use zero or more + * such access paths. + */ + if (true/* !query.isMutation() */) { + + // read-only query. + + final BOpStats queryStats = statsMap.get(query.getId()); + + queryBuffer = new BlockingBufferWithStats<IBindingSet[]>(query, + queryStats); + + queryIterator = new QueryResultIterator<IBindingSet[]>(this, + queryBuffer.iterator()); + + // } else { + // + // // Note: Not used for mutation queries. + // queryBuffer = null; + // queryIterator = null; + + } + + } else { + + runState = null; // Note: only on the query controller. + statsMap = null; // Note: only on the query controller. + queryBuffer = null; // Note: only on the query controller. + queryIterator = null; // Note: only when queryBuffer is defined. + + } + + } + + /** + * Pre-populate a map with {@link BOpStats} objects for the query. Only the + * child operands are visited. 
Operators in subqueries are not visited since + * they will be assigned {@link BOpStats} objects when they are run as a + * subquery. + * + * @see BOp.Annotations#CONTROLLER + */ + private void populateStatsMap(final BOp op) { + + if (!(op instanceof PipelineOp)) + return; + + final PipelineOp bop = (PipelineOp) op; + + final int bopId = bop.getId(); + + statsMap.put(bopId, bop.newStats()); + + if (!op.getProperty(BOp.Annotations.CONTROLLER, + BOp.Annotations.DEFAULT_CONTROLLER)) { + /* + * Visit children, but not if this is a CONTROLLER operator since + * its children belong to a subquery. + */ + for (BOp t : op.args()) { + // visit children (recursion) + populateStatsMap(t); + } + } + + } + + /** + * Message provides notice that the query has started execution and will + * consume some specific number of binding set chunks. + * + * @param msg + * The initial message presented to the query. The message is + * used to update the query {@link RunState}. However, the + * message will not be consumed until it is presented to + * {@link #acceptChunk(IChunkMessage)} by the {@link QueryEngine} + * . + * + * @throws UnsupportedOperationException + * If this node is not the query coordinator. + */ + final protected void startQuery(final IChunkMessage<IBindingSet> msg) { + + if (!controller) + throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); + + if (msg == null) + throw new IllegalArgumentException(); + + if (!queryId.equals(msg.getQueryId())) + throw new IllegalArgumentException(); + + lock.lock(); + + try { + + runState.startQuery(msg); + + lifeCycleSetUpQuery(); + + } catch (TimeoutException ex) { + + halt(ex); + + } finally { + + lock.unlock(); + + } + + } + + /** + * Message provides notice that the operator has started execution and will + * consume some specific number of binding set chunks. + * + * @param msg + * The {@link StartOpMessage}. + * + * @throws UnsupportedOperationException + * If this node is not the query coordinator. 
+ */ + final protected void startOp(final StartOpMessage msg) { + + if (!controller) + throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); + + if (msg == null) + throw new IllegalArgumentException(); + + if (!queryId.equals(msg.queryId)) + throw new IllegalArgumentException(); + + lock.lock(); + + try { + + if (runState.startOp(msg)) + lifeCycleSetUpOperator(msg.bopId); + + } catch (TimeoutException ex) { + + halt(ex); + + } finally { + + lock.unlock(); + + } + + } + + /** + * Message provides notice that the operator has ended execution. The + * termination conditions for the query are checked. (For scale-out, the + * node node controlling the query needs to be involved for each operator + * start/stop in order to make the termination decision atomic). + * + * @param msg + * The {@link HaltOpMessage} + * + * @throws UnsupportedOperationException + * If this node is not the query coordinator. + */ + final protected void haltOp(final HaltOpMessage msg) { + + if (!controller) + throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); + + if (msg == null) + throw new IllegalArgumentException(); + + if (!queryId.equals(msg.queryId)) + throw new IllegalArgumentException(); + + lock.lock(); + + try { + + // update per-operator statistics. + final BOpStats tmp = statsMap.putIfAbsent(msg.bopId, msg.taskStats); + + // combine stats, but do not combine a stats object with itself. + if (tmp != null && tmp != msg.taskStats) { + tmp.add(msg.taskStats); + } + + if (runState.haltOp(msg)) { + + /* + * No more chunks can appear for this operator so invoke its end + * of life cycle hook. + */ + + lifeCycleTearDownOperator(msg.bopId); + + if (runState.isAllDone()) { + + // Normal termination. + halt(); + + } + + } + + } catch (Throwable t) { + + halt(t); + + } finally { + + lock.unlock(); + + } + + } + + /** + * Hook invoked the first time the given operator is evaluated for the + * query. 
This may be used to set up life cycle resources for the operator, + * such as a distributed hash table on a set of nodes identified by + * annotations of the operator. + * + * @param bopId + * The operator identifier. + */ + protected void lifeCycleSetUpOperator(final int bopId) { + + if (log.isTraceEnabled()) + log.trace("queryId=" + queryId + ", bopId=" + bopId); + + } + + /** + * Hook invoked the after the given operator has been evaluated for the + * query for what is known to be the last time. This may be used to tear + * down life cycle resources for the operator, such as a distributed hash + * table on a set of nodes identified by annotations of the operator. + * + * @param bopId + * The operator identifier. + */ + protected void lifeCycleTearDownOperator(final int bopId) { + + if (log.isTraceEnabled()) + log.trace("queryId=" + queryId + ", bopId=" + bopId); + + } + + /** + * Hook invoked the before any operator is evaluated for the query. This may + * be used to set up life cycle resources for the query. + */ + protected void lifeCycleSetUpQuery() { + + if (log.isTraceEnabled()) + log.trace("queryId=" + queryId); + + } + + /** + * Hook invoked when the query terminates. This may be used to tear down + * life cycle resources for the query. + */ + protected void lifeCycleTearDownQuery() { + + if (log.isTraceEnabled()) + log.trace("queryId=" + queryId); + + } + + /** + * Make a chunk of binding sets available for consumption by the query. + * <p> + * Note: this is invoked by {@link QueryEngine#acceptChunk(IChunkMessage)} + * + * @param msg + * The chunk. + * + * @return <code>true</code> if the message was accepted. + * + * @todo Reconcile {@link #acceptChunk(IChunkMessage)} and + * {@link #consumeChunk()}. Why {@link #consumeChunk()} is also used + * by the {@link QueryEngine}. 
+ */ + abstract protected boolean acceptChunk(final IChunkMessage<IBindingSet> msg); + + /** + * Instruct the {@link IRunningQuery} to consume an {@link IChunkMessage} + * already on its input queue. + */ + abstract protected void consumeChunk(); + + final public IAsynchronousIterator<IBindingSet[]> iterator() { + + if (!controller) + throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); + + if (queryIterator == null) + throw new UnsupportedOperationException(); + + return queryIterator; + + } + + public void halt() { + + lock.lock(); + + try { + + // signal normal completion. + future.halt((Void) null); + + // interrupt anything which is running. + cancel(true/* mayInterruptIfRunning */); + + } finally { + + lock.unlock(); + + } + + } + + public Throwable halt(final Throwable t) { + + if (t == null) + throw new IllegalArgumentException(); + + lock.lock(); + + try { + + if (!InnerCause.isInnerCause(t, InterruptedException.class)) + log.error(toString(), t); + + try { + + // signal error condition. + return future.halt(t); + + } finally { + + // interrupt anything which is running. + cancel(true/* mayInterruptIfRunning */); + + } + + } finally { + + lock.unlock(); + + } + + } + + /** + * {@inheritDoc} + * <p> + * Cancelled queries : + * <ul> + * <li>must reject new chunks</li> + * <li>must cancel any running operators</li> + * <li>must not begin to evaluate operators</li> + * <li>must release all of their resources</li> + * <li>must not cause the solutions to be discarded before the client can + * consume them.</li> + * </ul> + */ + final public boolean cancel(final boolean mayInterruptIfRunning) { + lock.lock(); + try { + // halt the query. + boolean cancelled = future.cancel(mayInterruptIfRunning); + if (didQueryTearDown + .compareAndSet(false/* expect */, true/* update */)) { + /* + * Do additional cleanup exactly once. + */ + // cancel any running operators for this query on this node. 
+ cancelled |= cancelRunningOperators(mayInterruptIfRunning); + if (controller) { + // cancel query on other peers. + cancelled |= cancelQueryOnPeers(future.getCause()); + } + if (queryBuffer != null) { + /* + * Close the query buffer so the iterator draining the query + * results will recognize that no new results will become + * available. + */ + queryBuffer.close(); + } + // life cycle hook for the end of the query. + lifeCycleTearDownQuery(); + // mark done time. + doneTime.set(System.currentTimeMillis()); + // log summary statistics for the query. + if (isController()) + QueryLog.log(this); + } + // remove from the collection of running queries. + queryEngine.halt(this); + // true iff we cancelled something. + return cancelled; + } finally { + lock.unlock(); + } + } + + /** + * Cancel any running operators for this query on this node (internal API). + * <p> + * Note: This will wind up invoking the tear down methods for each operator + * which was running or which could have been re-triggered. + * + * @return <code>true</code> if any operators were cancelled. + */ + abstract protected boolean cancelRunningOperators( + final boolean mayInterruptIfRunning); + + // { + // boolean cancelled = false; + // + // final Iterator<ConcurrentHashMap<ChunkFutureTask,ChunkFutureTask>> fitr = + // operatorFutures.values().iterator(); + // + // while (fitr.hasNext()) { + // + // final ConcurrentHashMap<ChunkFutureTask,ChunkFutureTask> set = + // fitr.next(); + // + // for(ChunkFutureTask f : set.keySet()) { + // + // if (f.cancel(mayInterruptIfRunning)) + // cancelled = true; + // + // } + // + // } + // + // return cancelled; + // + // } + + /** + * Cancel the query on each node where it is known to be running. + * <p> + * Note: The default implementation verifies that the caller is holding the + * {@link #lock} but is otherwise a NOP. This is overridden for scale-out. + * + * @param cause + * When non-<code>null</code>, the cause. 
+ * + * @return <code>true</code> iff something was cancelled. + * + * @throws IllegalMonitorStateException + * unless the {@link #lock} is held by the current thread. + * @throws UnsupportedOperationException + * unless this is the query controller. + */ + protected boolean cancelQueryOnPeers(final Throwable cause) { + + if (!controller) + throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); + + if (!lock.isHeldByCurrentThread()) + throw new IllegalMonitorStateException(); + + return false; + + } + + final public Void get() throws InterruptedException, ExecutionException { + + return future.get(); + + } + + final public Void get(long arg0, TimeUnit arg1) + throws InterruptedException, ExecutionException, TimeoutException { + + return future.get(arg0, arg1); + + } + + final public boolean isCancelled() { + + return future.isCancelled(); + + } + + final public boolean isDone() { + + return future.isDone(); + + } + + final public Throwable getCause() { + + return future.getCause(); + + } + + public IBigdataFederation<?> getFederation() { + + return queryEngine.getFederation(); + + } + + public IIndexManager getIndexManager() { + + return queryEngine.getIndexManager(); + + } + + public String toString() { + final StringBuilder sb = new StringBuilder(getClass().getName()); + sb.append("{queryId=" + queryId); + sb.append(",deadline=" + deadline.get()); + sb.append(",isDone=" + isDone()); + sb.append(",isCancelled=" + isCancelled()); + sb.append(",runState=" + runState); + sb.append(",controller=" + controller); + sb.append(",clientProxy=" + clientProxy); + sb.append(",query=" + query); + sb.append("}"); + return sb.toString(); + } + + // abstract protected IChunkHandler getChunkHandler(); + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BlockingBufferWithStats.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BlockingBufferWithStats.java 2010-12-22 17:32:36 UTC (rev 4043) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BlockingBufferWithStats.java 2011-01-02 00:41:31 UTC (rev 4044) @@ -39,7 +39,11 @@ * to the buffer. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ + * @version $Id: BlockingBufferWithStats.java 3838 2010-10-22 19:45:33Z + * thompsonbry $ + * + * @todo replace with {@link OutputStatsBuffer}? (It is still used by the + * {@link ChunkedRunningQuery} and by the query output buffer.) */ public class BlockingBufferWithStats<E> extends BlockingBuffer<E> { Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java (from rev 4039, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-01-02 00:41:31 UTC (rev 4044) @@ -0,0 +1,1592 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + */ +/* + * Created on Aug 31, 2010 + */ +package com.bigdata.bop.engine; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.log4j.Logger; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.NoSuchBOpException; +import com.bigdata.bop.PipelineOp; +import com.bigdata.journal.ITx; +import com.bigdata.journal.Journal; +import com.bigdata.relation.accesspath.BufferClosedException; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.relation.accesspath.IMultiSourceAsynchronousIterator; +import com.bigdata.relation.accesspath.MultiSourceSequentialAsynchronousIterator; +import com.bigdata.service.IBigdataFederation; +import com.bigdata.util.InnerCause; +import com.bigdata.util.concurrent.Memoizer; + +/** + * {@link IRunningQuery} implementation based on the assignment of + * {@link IChunkMessage}(s) to an operator task. Operators (other than those + * with "operator-at-once" evaluation semantics) will typically executed + * multiple times, consuming at least one {@link IChunkMessage} each time they + * are evaluated. {@link IChunkMessage}s target a specific operator (bopId) and + * shard (shardId). 
In scale-out, binding sets will be mapped across the target + * access path and may be replicated to one or more nodes depending on the + * distribution of the shards. This evaluation strategy is compatible with both + * the {@link Journal} (aka standalone) and the {@link IBigdataFederation} (aka + * clustered or scale-out). + * + * @todo The challenge with this implementation is managing the amount of data + * buffered on the JVM heap without introducing control structures which + * can result in deadlock or starvation. One way to manage this is to move + * the data off of the JVM heap onto direct ByteBuffers and then + * potentially spilling blocks to disk, e.g., using an RWStore based cache + * pattern. + */ +public class ChunkedRunningQuery extends AbstractRunningQuery { + + private final static transient Logger log = Logger + .getLogger(ChunkedRunningQuery.class); + + /** + * Logger for the {@link ChunkTask}. + */ + private final static Logger chunkTaskLog = Logger + .getLogger(ChunkTask.class); + +// /** +// * The maximum number of operator tasks which may be concurrently executed +// * for a given (bopId,shardId). +// * +// * @see QueryEngineTestAnnotations#MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD +// */ +// final private int maxConcurrentTasksPerOperatorAndShard; + +// /** +// * The maximum #of concurrent tasks for this query across all operators and +// * shards. +// * +// * Note: This is not a safe option and MUST be removed. It is possible for +// * N-1 tasks to backup with the Nth task not running due to concurrent +// * execution of some of the N-t tasks. +// */ +// final private int maxConcurrentTasks = 10; + + /* + * FIXME Explore the use of this semaphore to limit the maximum #of messages + * further. (Note that placing a limit on messages would allow us to buffer + * potentially many chunks. 
That could be solved by making LocalChunkMessage + * transparent in terms of the #of chunks or _binding_sets_ which it is + * carrying, but let's take this one step at a time). + * + * The first issue is ensuring that the query continue to make progress when + * a semaphore with a limited #of permits is introduced. This is because the + * ChunkFutureTask only attempts to schedule the next task for a given + * (bopId,shardId) but we could have failed to accept outstanding work for + * any of a number of operator/shard combinations. Likewise, the QueryEngine + * tells the RunningQuery to schedule work each time a message is dropped + * onto the QueryEngine, but the signal to execute more work is lost if the + * permits were not available immediately. + * + * One possibility would be to have a delayed retry. Another would be to + * have ChunkTaskFuture try to run *any* messages, not just messages for the + * same (bopId,shardId). + * + * Also, when scheduling work, there needs to be some bias towards the + * downstream operators in the query plan in order to ensure that they get a + * chance to clear work from upstream operators. This suggests that we might + * carry an order[] and use it to scan the work queue -- or make the work + * queue a priority heap using the order[] to place a primary sort over the + * bopIds in terms of the evaluation order and letting the shardIds fall in + * increasing shard order so we have a total order for the priority heap (a + * total order may also require a tie breaker, but I think that the priority + * heap allows ties). + * + * This concept of memory overhead and permits would be associated with the + * workload waiting on a given node for processing. (In scale-out, we do not + * care how much data is moving in the cluster, only how much data is + * challenging an individual machine). + * + * This emphasize again why we need to get the data off of the Java heap. + * + * The same concept should apply for chained buffers. 
Maybe one way to do + * this is to allocate a fixed budget to each query for the Java heap and + * the C heap and then the query blocks or goes to disk. + */ +// /** +// * The maximum number of binding sets which may be outstanding before a task +// * which is producing binding sets will block. This value may be used to +// * limit the memory demand of a query in which some operators produce +// * binding sets faster than other operators can consume them. +// * +// * @todo This could be generalized to consider the Java heap separately from +// * the native heap as we get into the use of native ByteBuffers to +// * buffer intermediate results. +// * +// * @todo This is expressed in terms of messages and not {@link IBindingSet}s +// * because the {@link LocalChunkMessage} does not self-report the #of +// * {@link IBindingSet}s (or chunks). [It should really be bytes on the +// * heap even if we can count binding sets and #s of bindings, but we +// * do not serialize all binding sets so we have to have one measure +// * for serialized and one measure for live objects.] +// */ +// final private int maxOutstandingMessageCount = 100; +// +// /** +// * A counting semaphore used to limit the #of outstanding binding set chunks +// * which may be buffered before a producer will block when trying to emit +// * another chunk. +// * +// * @see HandleChunkBuffer#outputChunk(IBindingSet[]) +// * @see #scheduleNext(BSBundle) +// * +// * @see #maxOutstandingMessageCount +// */ +// final private Semaphore outstandingMessageSemaphore = new Semaphore(maxOutstandingMessageCount); + + /** + * A collection of (bopId,partitionId) keys mapped onto a collection of + * operator task evaluation contexts for currently executing operators for + * this query. + */ + private final ConcurrentHashMap<BSBundle, ConcurrentHashMap<ChunkFutureTask,ChunkFutureTask>> operatorFutures; + + /** + * A map of unbounded work queues for each (bopId,partitionId). Empty queues + * are removed from the map. 
+ * <p> + * The map is guarded by the {@link #lock}. + */ + private final Map<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>> operatorQueues; + +// /** +// * When running in stand alone, we can chain together the operators and have +// * much higher throughput. Each operator has an {@link BlockingBuffer} which +// * is essentially its input queue. The operator will drain its input queue +// * using {@link BlockingBuffer#iterator()}. +// * <p> +// * Each operator closes its {@link IBlockingBuffer} sink(s) once its own +// * source has been closed and it has finished processing that source. Since +// * multiple producers can target the same operator, we need a means to +// * ensure that the source for the target operator is not closed until each +// * producer which targets that operator has closed its corresponding sink. +// * <p> +// * In order to support this many-to-one producer/consumer pattern, we wrap +// * the input queue (a {@link BlockingBuffer}) for each operator having +// * multiple sources wi... [truncated message content] |