From: <tho...@us...> - 2010-10-20 18:39:13
|
Revision: 3834 http://bigdata.svn.sourceforge.net/bigdata/?rev=3834&view=rev Author: thompsonbry Date: 2010-10-20 18:39:05 +0000 (Wed, 20 Oct 2010) Log Message: ----------- Modified BOpContext to support IMultiSourceAsynchronousIterator so we can attach sources to already running tasks. Modified BOpContextBase to hold a hard reference to the Executor to avoid problems with errors reported up from the IIndexManager if it has been closed. Since the caller now has access to the Executor after the IIndexManager is closed, the relevant code in RunningQuery now sees a RejectedExecutionException rather than an IllegalStateException. Modified RunningQuery to attach new chunks to already running tasks, at least in standalone. There is more work that needs to be done here which falls broadly under the category of performance optimizations of the query engine. This optimization is not yet available in scale-out because an RMI is necessary back to the controller and that should not happen in the QueryEngine's run Thread. There is a known problem in which high-volume queries, such as LUBM Q9 on U10 or above, can block. What appears to be happening is that a join is running into a bounded queue (a BlockingBuffer with a limited capacity). The code needs to be modified either to use an unbounded queue (potentially backed by a direct ByteBuffer), or to emit multiple IChunkMessages (this option was historically used in scale-out), or to chain the consumers and producers together as we did historically in the trunk (this option is very efficient in standalone). 
Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/bop/fed/jini/TestJiniFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-10-20 18:32:25 UTC (rev 3833) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-10-20 18:39:05 UTC (rev 3834) @@ -32,14 +32,14 @@ import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.IChunkMessage; import com.bigdata.bop.engine.IRunningQuery; -import com.bigdata.bop.engine.RunningQuery; import com.bigdata.btree.ILocalBTreeView; import com.bigdata.journal.IIndexManager; import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.relation.accesspath.IMultiSourceAsynchronousIterator; +import com.bigdata.relation.accesspath.MultiSourceSequentialAsynchronousIterator; import 
com.bigdata.service.IBigdataFederation; -import com.ibm.icu.impl.ByteBuffer; /** * The evaluation context for the operator (NOT serializable). @@ -57,7 +57,7 @@ private final BOpStats stats; - private final IAsynchronousIterator<E[]> source; + private final IMultiSourceAsynchronousIterator<E[]> source; private final IBlockingBuffer<E[]> sink; @@ -93,28 +93,31 @@ /** * Where to read the data to be consumed by the operator. - * - * @todo Since joins now run from locally materialized data in all cases the - * API could be simplified somewhat given that we know that there will - * be a single "source" chunk of binding sets. Also, the reason for - * the {@link IAsynchronousIterator} here is that a downstream join - * could error (or satisfy a slice) and halt the upstream joins. That - * is being coordinated through the {@link RunningQuery} now. - * <p> - * It is not yet clear what the right API is for the source. The - * iterator model might be just fine, but might not need to be - * asynchronous and does not need to be closeable. - * <p> - * Perhaps the right thing is to expose an object with a richer API - * for obtaining various kinds of iterators or even access to the - * direct {@link ByteBuffer}s backing the data (for high volume joins, - * external merge sorts, etc). */ public final IAsynchronousIterator<E[]> getSource() { return source; } /** + * Attach another source. The decision to attach the source is mutex with + * respect to the decision that the source reported by {@link #getSource()} + * is exhausted. + * + * @param source + * The source. + * + * @return <code>true</code> iff the source was attached. + */ + public boolean addSource(IAsynchronousIterator<E[]> source) { + + if (source == null) + throw new IllegalArgumentException(); + + return this.source.add(source); + + } + + /** * Where to write the output of the operator. 
* * @see PipelineOp.Annotations#SINK_REF @@ -199,7 +202,7 @@ throw new IllegalArgumentException(); this.partitionId = partitionId; this.stats = stats; - this.source = source; + this.source = new MultiSourceSequentialAsynchronousIterator<E[]>(source); this.sink = sink; this.sink2 = sink2; // may be null } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-10-20 18:32:25 UTC (rev 3833) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-10-20 18:39:05 UTC (rev 3834) @@ -60,6 +60,11 @@ */ private final IIndexManager indexManager; + /** + * The executor service. + */ + private final Executor executor; + /** * The <strong>local</strong> {@link IIndexManager}. Query evaluation occurs * against the local indices. In scale-out, query evaluation proceeds shard @@ -88,13 +93,13 @@ * <em>local</em> {@link #getIndexManager() index manager}. */ public final Executor getExecutorService() { - return indexManager.getExecutorService(); + return executor; } public BOpContextBase(final QueryEngine queryEngine) { this(queryEngine.getFederation(), queryEngine.getIndexManager()); - + } /** @@ -119,6 +124,9 @@ this.fed = fed; this.indexManager = indexManager; + + this.executor = indexManager == null ? 
null : indexManager + .getExecutorService(); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-20 18:32:25 UTC (rev 3833) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-20 18:39:05 UTC (rev 3834) @@ -38,7 +38,6 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; @@ -366,6 +365,16 @@ } /** + * {@link QueryEngine}s are using with a singleton pattern. They must be + * torn down automatically once they are no longer reachable. + */ + @Override + protected void finalize() throws Throwable { + shutdownNow(); + super.finalize(); + } + + /** * The service on which we run the query engine. This is started by {@link #init()}. */ private final AtomicReference<ExecutorService> engineService = new AtomicReference<ExecutorService>(); @@ -430,35 +439,12 @@ private class QueryEngineTask implements Runnable { public void run() { if(log.isInfoEnabled()) - log.info("running: " + this); + log.info("Running: " + this); while (true) { try { final RunningQuery q = priorityQueue.take(); - final UUID queryId = q.getQueryId(); - if (q.isCancelled()) - continue; - final IChunkMessage<IBindingSet> chunk = q.chunksIn.poll(); - if (chunk == null) - continue; - if (log.isTraceEnabled()) - log.trace("Accepted chunk: " + chunk); - try { - // create task. - final FutureTask<?> ft = q.newChunkTask(chunk); - if (log.isDebugEnabled()) - log.debug("Running chunk: " + chunk); - // execute task. - execute(ft); - } catch (RejectedExecutionException ex) { - // shutdown of the pool (should be an unbounded - // pool). 
- log.warn("Dropping chunk: queryId=" + queryId); - continue; - } catch (Throwable ex) { - // halt that query. - q.halt(ex); - continue; - } + if (!q.isDone()) + q.consumeChunk(); } catch (InterruptedException e) { /* * Note: Uncomment the stack trace here if you want to find Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-10-20 18:32:25 UTC (rev 3833) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-10-20 18:39:05 UTC (rev 3834) @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Date; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; @@ -195,7 +196,7 @@ * readily exposed as {@link Map} object. If we were to expose the map, it * would have to be via a get(key) style interface. */ - /* private */final Map<Integer/* bopId */, AtomicLong/* runningCount */> runningMap = new ConcurrentHashMap<Integer, AtomicLong>(); + /* private */final Map<Integer/* bopId */, AtomicLong/* runningCount */> runningMap = new LinkedHashMap<Integer, AtomicLong>(); /** * A collection of the operators which have executed at least once. @@ -367,9 +368,7 @@ /** * Update the {@link RunState} to indicate that the operator identified in * the {@link StartOpMessage} will execute and will consume the one or more - * {@link IChunkMessage}s. Both the total #of available messages and the #of - * messages available for that operator are incremented by - * {@link StartOpMessage#nmessages}. + * {@link IChunkMessage}s. * * @return <code>true</code> if this is the first time we will evaluate the * op. 
@@ -414,6 +413,72 @@ } /** + * Update the {@link RunState} to indicate that the data in the + * {@link IChunkMessage} was attached to an already running task for the + * target operator. + * + * @param msg + * @param runningOnServiceId + * @return <code>true</code> if this is the first time we will evaluate the + * op. + * + * @throws IllegalArgumentException + * if the argument is <code>null</code>. + * @throws TimeoutException + * if the deadline for the query has passed. + */ + synchronized + public void addSource(final IChunkMessage<?> msg, + final UUID runningOnServiceId) throws TimeoutException { + + if (msg == null) + throw new IllegalArgumentException(); + + if (allDone.get()) + throw new IllegalStateException(ERR_QUERY_HALTED); + + if (deadline < System.currentTimeMillis()) + throw new TimeoutException(ERR_DEADLINE); + + nsteps.incrementAndGet(); + + final int bopId = msg.getBOpId(); + final int nmessages = 1; + + if (runningMap.get(bopId) == null) { + /* + * Note: There is a race condition in RunningQuery such that it is + * possible to add a 2nd source to an operator task before the task + * has begun to execute. Since the task calls startOp() once it + * begins to execute, this means that addSource() can be ordered + * before startOp() for the same task. This code block explicitly + * allows this condition and sets a 0L in the runningMap for the + * [bopId]. 
+ */ + AtomicLong n = runningMap.get(bopId); + if (n == null) + runningMap.put(bopId, n = new AtomicLong()); +// throw new AssertionError(ERR_OP_NOT_STARTED + " msg=" + msg +// + ", this=" + this); + } + + messagesConsumed(bopId, nmessages); + + if (TableLog.tableLog.isInfoEnabled()) { + TableLog.tableLog.info(getTableRow("addSrc", runningOnServiceId, + bopId, msg.getPartitionId(), nmessages/* fanIn */, + null/* cause */, null/* stats */)); + } + + if (log.isInfoEnabled()) + log.info("startOp: " + toString() + " : bop=" + bopId); + + if (log.isTraceEnabled()) + log.trace(msg.toString()); + + } + + /** * Update the {@link RunState} to reflect the post-condition of the * evaluation of an operator against one or more {@link IChunkMessage}, * adjusting the #of messages available for consumption by the operator Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-20 18:32:25 UTC (rev 3833) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-20 18:39:05 UTC (rev 3834) @@ -31,6 +31,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -40,6 +41,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; @@ -53,11 +55,13 @@ import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.solutions.SliceOp; +import 
com.bigdata.io.DirectBufferPoolAllocator.IAllocationContext; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.ITx; import com.bigdata.relation.accesspath.BlockingBuffer; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.relation.accesspath.MultiplexBlockingBuffer; import com.bigdata.service.IBigdataFederation; import com.bigdata.striterator.ICloseableIterator; import com.bigdata.util.concurrent.Haltable; @@ -116,6 +120,11 @@ /** The query. */ final private PipelineOp query; +// /** +// * @see QueryEngineTestAnnotations#COMBINE_RECEIVED_CHUNKS +// */ +// final protected boolean combineReceivedChunks; + /** * An index from the {@link BOp.Annotations#BOP_ID} to the {@link BOp}. This * index is generated by the constructor. It is immutable and thread-safe. @@ -129,10 +138,20 @@ final private Haltable<Void> future = new Haltable<Void>(); /** - * A collection of {@link Future}s for currently executing operators for + * A collection of (bopId,partitionId) keys mapped onto a collection of + * operator task evaluation contexts for currently executing operators for * this query. + * + * @todo Futures are not being cleared from this collection as operators + * complete. This should be done systematically in order to ensure + * that any allocations associated with an operator task execution are + * released in a timely manner for long-running operators. (In fact, + * the {@link IAllocationContext} should take care of most of the + * issues here but we could still wind up with a lot of entries in + * this map in scale-out where there can be up to one per bop per + * shard in a given query.) 
*/ - private final ConcurrentHashMap<BSBundle, Future<?>> operatorFutures = new ConcurrentHashMap<BSBundle, Future<?>>(); + private final ConcurrentHashMap<BSBundle, ChunkFutureTask> operatorFutures; /** * The runtime statistics for each {@link BOp} in the query and @@ -141,6 +160,50 @@ final private ConcurrentHashMap<Integer/* bopId */, BOpStats> statsMap; /** + * When running in stand alone, we can chain together the operators and have + * much higher throughput. Each operator has an {@link BlockingBuffer} which + * is essentially its input queue. The operator will drain its input queue + * using {@link BlockingBuffer#iterator()}. + * <p> + * Each operator closes its {@link IBlockingBuffer} sink(s) once its own + * source has been closed and it has finished processing that source. Since + * multiple producers can target the same operator, we need a means to + * ensure that the source for the target operator is not closed until each + * producer which targets that operator has closed its corresponding sink. + * <p> + * In order to support this many-to-one producer/consumer pattern, we wrap + * the input queue (a {@link BlockingBuffer}) for each operator having + * multiple sources with a {@link MultiplexBlockingBuffer}. This class gives + * each producer their own view on the underlying {@link BlockingBuffer}. + * The underlying {@link BlockingBuffer} will not be closed until all + * source(s) have closed their view of that buffer. This collection keeps + * track of the {@link MultiplexBlockingBuffer} wrapping the + * {@link BlockingBuffer} which is the input queue for each operator. + * <p> + * The input queues themselves are {@link BlockingBuffer} objects. Those + * objects are available from this map using + * {@link MultiplexBlockingBuffer#getBackingBuffer()}. These buffers are + * pre-allocated by {@link #populateInputBufferMap(BOp)}. + * {@link #startTasks(BOp)} is responsible for starting the operator tasks + * in a "back-to-front" order. 
{@link #startQuery(IChunkMessage)} kicks off + * the query and invokes {@link #startTasks(BOp)} to chain the input queues + * and output queues together (when so chained, the output queues are skins + * over the input queues obtained from {@link MultiplexBlockingBuffer}). + * + * FIXME The inputBufferMap will let us construct consumer producer chains + * where the consumer _waits_ for all producer(s) which target the consumer + * to close the sink associated with that consumer. Unlike when attaching an + * {@link IChunkMessage} to an already running operator, the consumer will + * NOT terminate (due to lack up input) until each running producer + * terminating that consumer terminates. This will improve concurrency, + * result in fewer task instances, and have better throughput than attaching + * a chunk to an already running task. However, in scale-out we will have + * tasks running on different nodes so we can not always chain together the + * producer and consumer in this tightly integrated manner. + */ + final private ConcurrentHashMap<Integer/*operator*/, MultiplexBlockingBuffer<IBindingSet[]>/*inputQueue*/> inputBufferMap; + + /** * The buffer used for the overall output of the query pipeline. * <p> * Note: This only exists on the query controller, and then only when the @@ -162,7 +225,9 @@ * A lock guarding various state changes. This guards changes to the * internal state of the {@link #runState} object. It is also used to * serialize requests to {@link #acceptChunk(IChunkMessage)} and - * {@link #cancel(boolean)}. + * {@link #cancel(boolean)} and make atomic decision concerning whether to + * attach a new {@link IChunkMessage} to an operator task which is already + * running or to start a new task for that message. 
* * @see RunState */ @@ -357,8 +422,14 @@ this.query = query; +// combineReceivedChunks = query.getProperty( +// QueryEngineTestAnnotations.COMBINE_RECEIVED_CHUNKS, +// QueryEngineTestAnnotations.DEFAULT_COMBINE_RECEIVED_CHUNKS); + this.bopIndex = BOpUtility.getIndex(query); + this.operatorFutures = new ConcurrentHashMap<BSBundle, ChunkFutureTask>(); + /* * Setup the BOpStats object for each pipeline operator in the query. */ @@ -366,8 +437,10 @@ runState = new RunState(this); - statsMap = createStatsMap(bopIndex); + statsMap = new ConcurrentHashMap<Integer, BOpStats>(); + populateStatsMap(query); + if (!query.isMutation()) { final BOpStats queryStats = statsMap.get(query.getId()); @@ -394,43 +467,101 @@ } + if(!queryEngine.isScaleOut()) { + /* + * Since the query engine is using the stand alone database mode we + * will now setup the input queues for each operator. Those queues + * will be used by each operator which targets a given operator. + * Each operator will start once and will run until all of its + * source(s) are closed. + * + * This allocates the buffers in a top-down manner (this is the + * reverse of the pipeline evaluation order). Allocation halts at if + * we reach an operator without children (e.g., StartOp) or an + * operator which is a CONTROLLER (Union). (If allocation does not + * halt at those boundaries then we can allocate buffers which will + * not be used. On the one hand, the StartOp receives a message + * containing the chunk to be evaluated. On the other hand, the + * buffers are not shared between the parent and a subquery so + * allocation within the subquery is wasted. This is also true for + * the [statsMap].) + */ + inputBufferMap = null; +// inputBufferMap = new ConcurrentHashMap<Integer, MultiplexBlockingBuffer<IBindingSet[]>>(); +// populateInputBufferMap(query); + } else { + inputBufferMap = null; + } + } /** - * Pre-populate a map with {@link BOpStats} objects for a query. 
- * - * @param bopIndex - * A map of the operators in the query which have assigned - * bopIds. - * - * @return A new map with an entry for each operator with a bopId which - * associates that operator with its {@link BOpStats} object. + * Pre-populate a map with {@link BOpStats} objects for the query. Operators + * in subqueries are not visited since they will be assigned {@link BOpStats} + * objects when they are run as a subquery. */ - static private ConcurrentHashMap<Integer, BOpStats> createStatsMap( - final Map<Integer, BOp> bopIndex) { + private void populateStatsMap(final BOp op) { - ConcurrentHashMap<Integer, BOpStats> statsMap = new ConcurrentHashMap<Integer, BOpStats>(); + if(!(op instanceof PipelineOp)) + return; + + final PipelineOp bop = (PipelineOp) op; - for (Map.Entry<Integer, BOp> e : bopIndex.entrySet()) { + final int bopId = bop.getId(); + + statsMap.put(bopId, bop.newStats()); - final int bopId = e.getKey(); - - final BOp tmp = e.getValue(); - - if ((tmp instanceof PipelineOp)) { - - final PipelineOp bop = (PipelineOp) tmp; - - statsMap.put(bopId, bop.newStats()); - + if (!op.getProperty(BOp.Annotations.CONTROLLER, + BOp.Annotations.DEFAULT_CONTROLLER)) { + /* + * Visit children, but not if this is a CONTROLLER operator since + * its children belong to a subquery. + */ + for (BOp t : op.args()) { + // visit children (recursion) + populateStatsMap(t); } - } - - return statsMap; - + } +// /** +// * Pre-populate a map with {@link MultiplexBlockingBuffer} objects for the +// * query. Operators in subqueries are not visited since they will be +// * assigned buffer objects when they are run as a subquery. Operators +// * without children are not visited since they can not be the targets of +// * some other operator and hence do not need to have an assigned input +// * buffer. 
+// */ +// private void populateInputBufferMap(final BOp op) { +// +// if(!(op instanceof PipelineOp)) +// return; +// +// if (op.arity() == 0) +// return; +// +// final PipelineOp bop = (PipelineOp) op; +// +// final int bopId = bop.getId(); +// +// inputBufferMap.put(bopId, new MultiplexBlockingBuffer<IBindingSet[]>( +// bop.newBuffer(statsMap.get(bopId)))); +// +// if (!op.getProperty(BOp.Annotations.CONTROLLER, +// BOp.Annotations.DEFAULT_CONTROLLER)) { +// /* +// * Visit children, but not if this is a CONTROLLER operator since +// * its children belong to a subquery. +// */ +// for (BOp t : op.args()) { +// // visit children (recursion) +// populateInputBufferMap(t); +// } +// } +// +// } + /** * Take a chunk generated by some pass over an operator and make it * available to the target operator. How this is done depends on whether the @@ -465,6 +596,15 @@ if (sink == null) throw new IllegalArgumentException(); + if (inputBufferMap != null && inputBufferMap.get(sinkId) != null) { + /* + * FIXME The sink is just a wrapper for the input buffer so we do + * not need to do anything to propagate the data from one operator + * to the next. + */ + return 0; + } + /* * Note: The partitionId will always be -1 in scale-up. */ @@ -519,6 +659,9 @@ * * @param msg * The chunk. + * + * @todo Does this method really need the {@link #lock}? I doubt it since + * {@link #chunksIn} is thread-safe. */ protected void acceptChunk(final IChunkMessage<IBindingSet> msg) { @@ -575,6 +718,11 @@ runState.startQuery(msg); +// if (inputBufferMap != null) { +// // Prestart a task for each operator. +// startTasks(query); +// } + } catch (TimeoutException ex) { halt(ex); @@ -587,6 +735,61 @@ } +// /** +// * Prestart a task for each operator. The operators are started in +// * back-to-front order (reverse pipeline evaluation order). The input queues +// * for the operators were created in by {@link #populateInputBufferMap(BOp)} +// * and are found in {@link #inputBufferMap}. 
The output queues for the +// * operators are skins over the output queues obtained from +// * {@link MultiplexBlockingBuffer}. +// * +// * @param op +// * The +// * +// * @see #inputBufferMap +// */ +// private void startTasks(final BOp op) { +// +// if(!(op instanceof PipelineOp)) +// return; +// +// if (op.arity() == 0) +// return; +// +// final PipelineOp bop = (PipelineOp) op; +// +// final int bopId = bop.getId(); +// +// final MultiplexBlockingBuffer<IBindingSet[]> inputBuffer = inputBufferMap +// .get(bopId); +// +// if (inputBuffer == null) +// throw new AssertionError("No input buffer? " + op); +// +// final IAsynchronousIterator<IBindingSet[]> src = inputBuffer +// .getBackingBuffer().iterator(); +// +// final ChunkTask chunkTask = new ChunkTask(bopId, -1/* partitionId */, +// src); +// +// final FutureTask<Void> futureTask = wrapChunkTask(chunkTask); +// +// queryEngine.execute(futureTask); +// +// if (!op.getProperty(BOp.Annotations.CONTROLLER, +// BOp.Annotations.DEFAULT_CONTROLLER)) { +// /* +// * Visit children, but not if this is a CONTROLLER operator since +// * its children belong to a subquery. +// */ +// for (BOp t : op.args()) { +// // visit children (recursion) +// startTasks(t); +// } +// } +// +// } + /** * Message provides notice that the operator has started execution and will * consume some specific number of binding set chunks. @@ -747,85 +950,290 @@ } /** - * Return a {@link FutureTask} which will consume the binding set chunk. The - * caller must run the {@link FutureTask}. + * Consume zero or more chunks in the input queue for this query. The + * chunk(s) will either be assigned to an already running task for the + * target operator or they will be assigned to new tasks. * - * @param chunk - * A chunk to be consumed. + * FIXME Drain the input queue, assigning any chunk waiting to a task. If + * the task is already running, then add the chunk to that task. Otherwise + * start a new task. 
*/ - @SuppressWarnings("unchecked") - protected FutureTask<Void> newChunkTask( - final IChunkMessage<IBindingSet> chunk) { - - // create runnable to evaluate a chunk for an operator and partition. - final ChunkTask chunkTask = new ChunkTask(chunk); - - // wrap runnable. - final FutureTask<Void> f2 = new FutureTask(chunkTask, null/* result */); - - final BSBundle bundle = new BSBundle(chunk.getBOpId(), chunk - .getPartitionId()); - - // add to list of active futures for this query. - if (operatorFutures.put(bundle, f2) != null) { + protected void consumeChunk() { + final IChunkMessage<IBindingSet> msg = chunksIn.poll(); + if (msg == null) + return; + try { + if (!msg.isMaterialized()) + throw new IllegalStateException(); + if (log.isTraceEnabled()) + log.trace("Accepted chunk: " + msg); + final BSBundle bundle = new BSBundle(msg.getBOpId(), msg + .getPartitionId()); /* - * FIXME This indicates that we have more than one future for the - * same (bopId,shardId). When this is true we are losing track of - * with the consequence that we can not properly cancel them. - * Instead of losing track like this, we should be targeting the - * running operator instance with the new chunk. This needs to be - * done atomically. + * Look for instance of this task which is already running. */ -// throw new AssertionError(); + final ChunkFutureTask chunkFutureTask = operatorFutures.get(bundle); + if (!queryEngine.isScaleOut() && chunkFutureTask != null) { + /* + * Attempt to atomically attach the message as another src. + */ + if (chunkFutureTask.chunkTask.context.addSource(msg + .getChunkAccessor().iterator())) { + lock.lock(); + try { + /* + * message was added to a running task. + * + * FIXME This needs to be an RMI in scale-out back to + * the query controller so it can update the #of + * messages which are being consumed by this task. 
+ * However, doing RMI here will add latency into the + * thread submitting tasks for evaluation and the + * coordination overhead of addSource() in scale-out may + * be too high. However, if we do not combine sources in + * scale-out then we may have too much overhead in terms + * of the #of running tasks with few tuples per task. + * Another approach is the remote async iterator with + * multiple sources (parallel multi source iterator). + * + * FIXME This code path is NOT being taken in scale-out + * right now since it would not get the message to the + * query controller. We will need to add addSource() to + * IQueryClient parallel to startOp() and haltOp() for + * this to work. + */ + runState.addSource(msg, queryEngine.getServiceUUID()); + return; + } finally { + lock.unlock(); + } + } + } + // wrap runnable. + final ChunkFutureTask ft = new ChunkFutureTask(new ChunkTask(msg)); + // add to list of active futures for this query. + if (operatorFutures.put(bundle, ft) != null) { + /* + * Note: This can cause the FutureTask to be accessible (above) + * before startOp() has been called for that ChunkTask (the + * latter occurs when the chunk task actually runs.) This a race + * condition has been resolved in RunState by allowing + * addSource() even when there is no registered task running for + * that [bopId]. + * + * FIXME This indicates that we have more than one future for + * the same (bopId,shardId). When this is true we are losing + * track of Futures with the consequence that we can not + * properly cancel them. Instead of losing track like this, we + * should be targeting the running operator instance with the + * new chunk. This needs to be done atomically, e.g., using the + * [lock]. 
+ * + * Even if we only have one task per operator in standalone and + * we attach chunks to an already running task in scale-out, + * there is still the possibility in scale-out that a task may + * have closed its source but still be running, in which case we + * would lose the Future for the already running task when we + * start a new task for the new chunk for the target operator. + */ + // throw new AssertionError(); + } + // submit task for execution (asynchronous). + queryEngine.execute(ft); + } catch (Throwable ex) { + // halt query. + throw new RuntimeException(halt(ex)); } - - // return : caller will execute. - return f2; - } - - /* - * @todo Possible class to give us more information about a running operator - * so we can attach a new chunk to the source for a running instance. An - * alternative is to attach the same sinks to each instance of the operator, - * but then we get into trouble with the operator implementations which will - * close their sinks when they get to the bottom of their processing loop. - */ -// private static class RunningFutureContext { + +// /** +// * Return a {@link FutureTask} which will consume the binding set chunk. The +// * caller must run the {@link FutureTask}. +// * +// * @param chunk +// * A chunk to be consumed. +// */ +// private FutureTask<Void> newChunkTask( +// final IChunkMessage<IBindingSet> chunk) { // -// private final Future<Void> f; -// private final BOpContext<IBindingSet> context; -// private final ChunkTask chunkTask; +// if (!chunk.isMaterialized()) +// throw new IllegalStateException(); // -// public RunningFutureContext(final Future<Void> f, -// final BOpContext<IBindingSet> context, final ChunkTask chunkTask) { -// this.f = f; -// this.context = context; -// this.chunkTask = chunkTask; -// } +// // create runnable to evaluate a chunk for an operator and partition. 
+// final ChunkTask chunkTask = new ChunkTask(chunk); +// +//// return wrapChunkTask(chunkTask); +//// +//// } +//// +//// protected FutureTask<Void> wrapChunkTask(final ChunkTask chunkTask) { +// +// final BSBundle bundle = new BSBundle(chunkTask.bopId, +// chunkTask.partitionId); // -// public void addMessage(final IChunkMessage<IBindingSet> msg) { -// context.getSource(); -// throw new UnsupportedOperationException(); +// // wrap runnable. +// final ChunkFutureTask f2 = new ChunkFutureTask(chunkTask); +// +// // add to list of active futures for this query. +// if (operatorFutures.put(bundle, f2) != null) { +// /* +// * FIXME This indicates that we have more than one future for the +// * same (bopId,shardId). When this is true we are losing track of +// * Futures with the consequence that we can not properly cancel +// * them. Instead of losing track like this, we should be targeting +// * the running operator instance with the new chunk. This needs to +// * be done atomically, e.g., using the [lock]. +// * +// * Even if we only have one task per operator in standalone and we +// * attach chunks to an already running task in scale-out, there is +// * still the possibility in scale-out that a task may have closed +// * its source but still be running, in which case we would lose the +// * Future for the already running task when we start a new task for +// * the new chunk for the target operator. +// */ +//// throw new AssertionError(); // } -// +// +// // return : caller will execute. +// return f2; +// // } + + /** + * A {@link FutureTask} which exposes the {@link ChunkTask} which is being + * evaluated. + */ + private class ChunkFutureTask extends FutureTask<Void> { + + public final ChunkTask chunkTask; + + public ChunkFutureTask(final ChunkTask chunkTask) { + +// super(chunkTask, null/* result */); + + // Note: wraps chunk task to ensure source and sinks get closed. 
+ super(new ChunkTaskWrapper(chunkTask), null/* result */); + + this.chunkTask = chunkTask; + + } + + } + + /** + * Wraps the {@link ChunkTask} and handles various handshaking with the + * {@link RunningQuery} and the {@link RunState}. Since starting and + * stopping a {@link ChunkTask} requires handshaking with the query + * controller, it is important that these actions take place once the task + * has been submitted - otherwise they would be synchronous in the loop + * which consumes available chunks and generates new {@link ChunkTask}s. + */ + private class ChunkTaskWrapper implements Runnable { + + private final ChunkTask t; + + public ChunkTaskWrapper(final ChunkTask chunkTask) { + + if (chunkTask == null) + throw new IllegalArgumentException(); + + this.t = chunkTask; + + } + + public void run() { + final UUID serviceId = queryEngine.getServiceUUID(); + final int messagesIn = 1; // accepted one IChunkMessage. FIXME + // Problem when chaining buffers? + try { + /* + * Note: This is potentially an RMI back to the controller. It + * is invoked from within the running task in order to remove + * the latency for that RMI from the thread which submits tasks + * to consume chunks. + */ + clientProxy.startOp(new StartOpMessage(queryId, t.bopId, + t.partitionId, serviceId, messagesIn)); + t.call(); + // Send message to controller. + final HaltOpMessage msg = new HaltOpMessage(queryId, t.bopId, + t.partitionId, serviceId, null/* cause */, t.sinkId, + t.sinkMessagesOut.get(), t.altSinkId, + t.altSinkMessagesOut.get(), t.context.getStats()); + try { + t.context.getExecutorService().execute( + new SendHaltMessageTask(clientProxy, msg, + RunningQuery.this)); + } catch (RejectedExecutionException ex) { + // e.g., service is shutting down. + log.error("Could not send message: " + msg, ex); + } + } catch (Throwable ex1) { + + // Log an error. 
+ log.error("queryId=" + queryId + ", bopId=" + t.bopId, ex1); + + /* + * Mark the query as halted on this node regardless of whether + * we are able to communicate with the query controller. + * + * Note: Invoking halt(t) here will log an error. This logged + * error message is necessary in order to catch errors in + * clientProxy.haltOp() (above and below). + */ + final Throwable firstCause = halt(ex1); + + final HaltOpMessage msg = new HaltOpMessage(queryId, t.bopId, + t.partitionId, serviceId, firstCause, t.sinkId, + t.sinkMessagesOut.get(), t.altSinkId, + t.altSinkMessagesOut.get(), t.context.getStats()); + try { + /* + * Queue a task to send the halt message to the query + * controller. + */ + t.context.getExecutorService().execute( + new SendHaltMessageTask(clientProxy, msg, + RunningQuery.this)); + } catch (RejectedExecutionException ex) { + // e.g., service is shutting down. + log.warn("Could not send message: " + msg, ex); + } catch (Throwable ex) { + log + .error("Could not send message: " + msg + " : " + + ex, ex); + } + + } + + } + + } +// final BOpContext<?> context = chunkTask.context; +// context.getSource().close(); +// if (context.getSink() != null) { +// context.getSink().close(); +// } +// if (context.getSink2() != null) { +// context.getSink2().close(); +// } + /** * Runnable evaluates an operator for some chunk of inputs. In scale-out, * the operator may be evaluated against some partition of a scale-out * index. */ - private class ChunkTask implements Runnable { + private class ChunkTask implements Callable<Void> { /** Alias for the {@link ChunkTask}'s logger. */ private final Logger log = chunkTaskLog; - /** - * The message with the materialized chunk to be consumed by the - * operator. - */ - final IChunkMessage<IBindingSet> msg; +// /** +// * The message with the materialized chunk to be consumed by the +// * operator. +// */ +// final IChunkMessage<IBindingSet> msg; /** The index of the bop which is being evaluated. 
*/ private final int bopId; @@ -882,13 +1290,27 @@ */ private final FutureTask<Void> ft; + /** #of chunk messages out to sink. */ + final AtomicInteger sinkMessagesOut = new AtomicInteger(0); + + /** #of chunk messages out to altSink. */ + final AtomicInteger altSinkMessagesOut = new AtomicInteger(0); + /** - * Create a task to consume a chunk. This looks up the {@link BOp} which - * is the target for the message in the {@link RunningQuery#bopIndex}, - * creates the sink(s) for the {@link BOp}, creates the - * {@link BOpContext} for that {@link BOp}, and wraps the value returned - * by {@link PipelineOp#eval(BOpContext)} in order to handle - * the outputs written on those sinks. + * A human readable representation of the {@link ChunkTask}'s state. + */ + public String toString() { + return "ChunkTask" + // + "{query=" + queryId + // + ",bopId=" + bopId + // + ",partitionId=" + partitionId + // + ",sinkId=" + sinkId + // + ",altSinkId=" + altSinkId + // + "}"; + } + + /** + * Create a task to consume a chunk. * * @param msg * A message containing the materialized chunk and metadata @@ -900,18 +1322,48 @@ */ public ChunkTask(final IChunkMessage<IBindingSet> msg) { - if (msg == null) - throw new IllegalArgumentException(); + this(msg.getBOpId(), msg.getPartitionId(), msg.getChunkAccessor() + .iterator()); + + } + +// /** +// * Alternative constructor used when chaining the operators together in +// * standalone. The input queue of an operator is wrapped and used as the +// * output queue of each operator which targets that operator as either +// * its default or alternative sink. +// */ +// public ChunkTask(final int bopId) { +// +// this(bopId, -1/* partitionId */, inputBufferMap.get(bopId) +// .getBackingBuffer().iterator()); +// +// } + + /** + * Core implementation. 
+ * <p> + * This looks up the {@link BOp} which is the target for the message in + * the {@link RunningQuery#bopIndex}, creates the sink(s) for the + * {@link BOp}, creates the {@link BOpContext} for that {@link BOp}, and + * wraps the value returned by {@link PipelineOp#eval(BOpContext)} in + * order to handle the outputs written on those sinks. + * + * @param bopId + * The operator to which the message was addressed. + * @param partitionId + * The partition identifier to which the message was + * addressed. + * @param src + * Where the task will read its inputs. + */ + public ChunkTask(final int bopId, final int partitionId, + final IAsynchronousIterator<IBindingSet[]> src) { + + this.bopId = bopId; - if (!msg.isMaterialized()) - throw new IllegalStateException(); + this.partitionId = partitionId; - this.msg = msg; - - bopId = msg.getBOpId(); - - partitionId = msg.getPartitionId(); - bop = bopIndex.get(bopId); if (bop == null) @@ -947,13 +1399,6 @@ + bop); } -// if (sinkId != null && altSinkId != null -// && sinkId.intValue() == altSinkId.intValue()) { -// throw new RuntimeException( -// "The primary and alternative sink may not be the same operator: " -// + bop); -// } - /* * Setup the BOpStats object. For some operators, e.g., SliceOp, * this MUST be the same object across all invocations of that @@ -965,34 +1410,30 @@ * since that would cause double counting when the same object is * used for each invocation of the operator. * - * @todo If we always pass in a shared stats object then we will - * have live reporting on all instances of the task evaluating each - * operator in the query but there could be more contention for the - * counters. However, if we chain the operators together then we are - * likely to run one task instance per operator, at least in - * standalone. Try it w/ always shared and see if there is a hot - * spot? 
+ * Note: By using a shared stats object we have live reporting on + * all instances of the task which are being evaluated on the query + * controller (tasks running on peers always have distinct stats + * objects and those stats are aggregated when the task finishes). */ final BOpStats stats; - if (((PipelineOp) bop).isSharedState()) { -// final BOpStats foo = op.newStats(); -// final BOpStats bar = statsMap.putIfAbsent(bopId, foo); -// stats = (bar == null ? foo : bar); + if (((PipelineOp) bop).isSharedState() || statsMap != null) { + // shared stats object. stats = statsMap.get(bopId); } else { + // distinct stats objects, aggregated as each task finishes. stats = op.newStats(); } assert stats != null; - sink = (p == null ? queryBuffer : op.newBuffer(stats)); + sink = (p == null ? queryBuffer : newBuffer(op, sinkId, stats)); altSink = altSinkId == null ? null - : altSinkId.equals(sinkId) ? sink : op.newBuffer(stats); + : altSinkId.equals(sinkId) ? sink : newBuffer(op, sinkId, + stats); // context : @todo pass in IChunkMessage or IChunkAccessor context = new BOpContext<IBindingSet>(RunningQuery.this, - partitionId, stats, msg.getChunkAccessor().iterator(), - sink, altSink); + partitionId, stats, src, sink, altSink); // FutureTask for operator execution (not running yet). if ((ft = op.eval(context)) == null) @@ -1001,6 +1442,38 @@ } /** + * Factory returns the {@link IBlockingBuffer} on which the operator + * should write its outputs which target the specified <i>sinkId</i>. + * + * @param op + * The operator whose evaluation task is being constructed. + * @param sinkId + * The identifier for an operator to which the task will + * write its solutions (either the primary or alternative + * sink). + * @param stats + * The statistics object for the evaluation of the operator. + * + * @return The buffer on which the operator should write outputs which + * target that sink. 
+ */ + private IBlockingBuffer<IBindingSet[]> newBuffer(final PipelineOp op, + final int sinkId, final BOpStats stats) { + + final MultiplexBlockingBuffer<IBindingSet[]> factory = inputBufferMap == null ? null + : inputBufferMap.get(sinkId); + + if (factory != null) { + + return factory.newInstance(); + + } + + return op.newBuffer(stats); + + } + + /** * Return the effective default sink. * * @param bop @@ -1031,102 +1504,42 @@ return sink; } - + /** * Evaluate the {@link IChunkMessage}. */ - public void run() { - final UUID serviceId = queryEngine.getServiceUUID(); - final int messagesIn = 1; // accepted one IChunkMessage. - int sinkMessagesOut = 0; // #of chunk messages out to sink. - int altSinkMessagesOut = 0; // #of chunk messages out to altSink. - try { - clientProxy.startOp(new StartOpMessage(queryId, bopId, - partitionId, serviceId, messagesIn)); - if (log.isDebugEnabled()) - log.debug("Running chunk: " + msg); - ft.run(); // run - ft.get(); // verify success - if (sink != null && sink != queryBuffer && !sink.isEmpty()) { - if (sinkId == null) - throw new RuntimeException("sinkId not defined: bopId=" - + bopId + ", query=" - + BOpUtility.toString(query)); - /* - * Handle sink output, sending appropriate chunk message(s). - * - * Note: This maps output over shards/nodes in s/o. - */ - sinkMessagesOut += handleOutputChunk(bop, sinkId, sink); - } - if (altSink != null && altSink != queryBuffer - && !altSink.isEmpty()) { - if (altSinkId == null) - throw new RuntimeException( - "altSinkId not defined: bopId=" + bopId - + ", query=" - + BOpUtility.toString(query)); - /* - * Handle alt sink output, sending appropriate chunk - * message(s). - * - * Note: This maps output over shards/nodes in s/o. - */ - altSinkMessagesOut += handleOutputChunk(bop, altSinkId, - altSink); - } - // Send message to controller. 
- try { - final HaltOpMessage msg = new HaltOpMessage(queryId, bopId, - partitionId, serviceId, null/* cause */, sinkId, - sinkMessagesOut, altSinkId, altSinkMessagesOut, - context.getStats()); - context.getExecutorService().execute( - new SendHaltMessageTask(clientProxy, msg, - RunningQuery.this)); - } catch (RejectedExecutionException ex) { - // e.g., service is shutting down. - log.error("Could not send message: " + msg, ex); - } - } catch (Throwable t) { - - // Log an error. - log.error("queryId=" + queryId + ", bopId=" + bopId, t); - + public Void call() throws Exception { + if (log.isDebugEnabled()) + log.debug("Running chunk: " + this); + ft.run(); // run + ft.get(); // verify success + if (sink != null && sink != queryBuffer && !sink.isEmpty()) { + if (sinkId == null) + throw new RuntimeException("sinkId not defined: bopId=" + + bopId + ", query=" + BOpUtility.toString(query)); /* - * Mark the query as halted on this node regardless of whether - * we are able to communicate with the query controller. + * Handle sink output, sending appropriate chunk message(s). * - * Note: Invoking halt(t) here will log an error. This logged - * error message is necessary in order to catch errors in - * clientProxy.haltOp() (above and below). + * Note: This maps output over shards/nodes in s/o. */ - final Throwable firstCause = halt(t); - - try { - /* - * Queue a task to send the halt message to the query - * controller. - */ - final HaltOpMessage msg = new HaltOpMessage(queryId, bopId, - partitionId, serviceId, firstCause, sinkId, - sinkMessagesOut, altSinkId, altSinkMessagesOut, - context.getStats()); - context.getExecutorService().execute( - new SendHaltMessageTask(clientProxy, msg, - RunningQuery.this)); - } catch (RejectedExecutionException ex) { - // e.g., service is shutting down. 
- log.error("Could not send message: " + msg, ex); - } catch (Throwable ex) { - log.error("Could not send message: " + msg + " : " - + ex, ex); - } - + sinkMessagesOut.addAndGet(handleOutputChunk(bop, sinkId, sink)); } + if (altSink != null && altSink != queryBuffer && !altSink.isEmpty()) { + if (altSinkId == null) + throw new RuntimeException("altSinkId not defined: bopId=" + + bopId + ", query=" + BOpUtility.toString(query)); + /* + * Handle alt sink output, sending appropriate chunk message(s). + * + * Note: This maps output over shards/nodes in s/o. + */ + altSinkMessagesOut.addAndGet(handleOutputChunk(bop, altSinkId, + altSink)); + } +... [truncated message content] |