From: <tho...@us...> - 2010-10-22 19:45:42
Revision: 3838
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3838&view=rev
Author:   thompsonbry
Date:     2010-10-22 19:45:33 +0000 (Fri, 22 Oct 2010)

Log Message:
-----------
Reorganized RunningQuery in order to (a) have operator tasks generate chunks
incrementally (this avoids deadlock when the operator writes onto a bounded
queue and reduces the latency required to produce each solution); and (b) have
operator tasks drain their work queue, which gives better efficiency when the
producer is leading. These changes should benefit scale-out as well as
scale-up. Scale-up will also benefit from chaining the operators together
(rather than passing around IChunkMessages), but I have not yet made that
optimization.

I have tested this change set against:

- TestBigdataSailWithQuads
- LUBM U10
- BSBM 1M

No obvious lock contention was visible with BSBM 1M. No obvious hotspots were
revealed by a sampling profiler. I am going to test at larger scale on a
workstation next so that I can compare performance against the trunk baseline.

Modified Paths:
--------------
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/NIOChunkMessage.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ThickChunkMessage.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestConditionalRoutingOp.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java

Added Paths:
-----------
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BlockingBufferWithStats.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChunkHandler.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederationChunkHandler.java

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java	2010-10-22 17:53:21 UTC (rev 3837)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java	2010-10-22 19:45:33 UTC (rev 3838)
@@ -37,8 +37,6 @@
 import com.bigdata.relation.accesspath.IAccessPath;
 import
com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; -import com.bigdata.relation.accesspath.IMultiSourceAsynchronousIterator; -import com.bigdata.relation.accesspath.MultiSourceSequentialAsynchronousIterator; import com.bigdata.service.IBigdataFederation; /** @@ -57,7 +55,8 @@ private final BOpStats stats; - private final IMultiSourceAsynchronousIterator<E[]> source; +// private final IMultiSourceAsynchronousIterator<E[]> source; + private final IAsynchronousIterator<E[]> source; private final IBlockingBuffer<E[]> sink; @@ -98,25 +97,25 @@ return source; } - /** - * Attach another source. The decision to attach the source is mutex with - * respect to the decision that the source reported by {@link #getSource()} - * is exhausted. - * - * @param source - * The source. - * - * @return <code>true</code> iff the source was attached. - */ - public boolean addSource(IAsynchronousIterator<E[]> source) { +// /** +// * Attach another source. The decision to attach the source is mutex with +// * respect to the decision that the source reported by {@link #getSource()} +// * is exhausted. +// * +// * @param source +// * The source. +// * +// * @return <code>true</code> iff the source was attached. +// */ +// public boolean addSource(IAsynchronousIterator<E[]> source) { +// +// if (source == null) +// throw new IllegalArgumentException(); +// +// return this.source.add(source); +// +// } - if (source == null) - throw new IllegalArgumentException(); - - return this.source.add(source); - - } - /** * Where to write the output of the operator. * @@ -202,7 +201,8 @@ throw new IllegalArgumentException(); this.partitionId = partitionId; this.stats = stats; - this.source = new MultiSourceSequentialAsynchronousIterator<E[]>(source); + this.source = source; +// this.source = new MultiSourceSequentialAsynchronousIterator<E[]>(source); this.sink = sink; this.sink2 = sink2; // may be null } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-10-22 17:53:21 UTC (rev 3837) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-10-22 19:45:33 UTC (rev 3838) @@ -161,31 +161,31 @@ } - /** - * Instantiate a buffer suitable as a sink for this operator. The buffer - * will be provisioned based on the operator annotations. - * <p> - * Note: if the operation swallows binding sets from the pipeline (such as - * operators which write on the database) then the operator MAY return an - * immutable empty buffer. - * - * @param stats - * The statistics on this object will automatically be updated as - * elements and chunks are output onto the returned buffer. - * - * @return The buffer. - */ - public IBlockingBuffer<IBindingSet[]> newBuffer(final BOpStats stats) { +// /** +// * Instantiate a buffer suitable as a sink for this operator. The buffer +// * will be provisioned based on the operator annotations. +// * <p> +// * Note: if the operation swallows binding sets from the pipeline (such as +// * operators which write on the database) then the operator MAY return an +// * immutable empty buffer. +// * +// * @param stats +// * The statistics on this object will automatically be updated as +// * elements and chunks are output onto the returned buffer. +// * +// * @return The buffer. 
+// */ +// public IBlockingBuffer<IBindingSet[]> newBuffer(final BOpStats stats) { +// +// if (stats == null) +// throw new IllegalArgumentException(); +// +// return new BlockingBufferWithStats<IBindingSet[]>( +// getChunkOfChunksCapacity(), getChunkCapacity(), +// getChunkTimeout(), Annotations.chunkTimeoutUnit, stats); +// +// } - if (stats == null) - throw new IllegalArgumentException(); - - return new BlockingBufferWithStats<IBindingSet[]>( - getChunkOfChunksCapacity(), getChunkCapacity(), - getChunkTimeout(), Annotations.chunkTimeoutUnit, stats); - - } - /** * Return a {@link FutureTask} which computes the operator against the * evaluation context. The caller is responsible for executing the @@ -205,77 +205,4 @@ */ abstract public FutureTask<Void> eval(BOpContext<IBindingSet> context); - private static class BlockingBufferWithStats<E> extends BlockingBuffer<E> { - - private final BOpStats stats; - - /** - * @param chunkOfChunksCapacity - * @param chunkCapacity - * @param chunkTimeout - * @param chunkTimeoutUnit - * @param stats - */ - public BlockingBufferWithStats(int chunkOfChunksCapacity, - int chunkCapacity, long chunkTimeout, - TimeUnit chunkTimeoutUnit, final BOpStats stats) { - - super(chunkOfChunksCapacity, chunkCapacity, chunkTimeout, - chunkTimeoutUnit); - - this.stats = stats; - - } - - /** - * Overridden to track {@link BOpStats#unitsOut} and - * {@link BOpStats#chunksOut}. - * <p> - * Note: {@link BOpStats#chunksOut} will report the #of chunks added to - * this buffer. However, the buffer MAY combine chunks either on add() - * or when drained by the iterator so the actual #of chunks read back - * from the iterator MAY differ. - * <p> - * {@inheritDoc} - */ - @Override - public boolean add(final E e, final long timeout, final TimeUnit unit) - throws InterruptedException { - - final boolean ret = super.add(e, timeout, unit); - - if (e.getClass().getComponentType() != null) { - - stats.unitsOut.add(((Object[]) e).length); - - } else { - - stats.unitsOut.increment(); - - } - - stats.chunksOut.increment(); - - return ret; - - } - - /** - * You can uncomment a line in this method to see who is closing the - * buffer. - * <p> - * {@inheritDoc} - */ - @Override - public void close() { - -// if (isOpen()) -// log.error(toString(), new RuntimeException("STACK TRACE")); - - super.close(); - - } - - } - } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BlockingBufferWithStats.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BlockingBufferWithStats.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BlockingBufferWithStats.java 2010-10-22 19:45:33 UTC (rev 3838) @@ -0,0 +1,90 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Oct 22, 2010 + */ + +package com.bigdata.bop.engine; + +import java.util.concurrent.TimeUnit; + +import com.bigdata.bop.BufferAnnotations; +import com.bigdata.bop.PipelineOp; +import com.bigdata.relation.accesspath.BlockingBuffer; + +/** + * Extended to use the {@link BufferAnnotations} to provision the + * {@link BlockingBuffer} and to track the {@link BOpStats} as chunks are added + * to the buffer. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class BlockingBufferWithStats<E> extends BlockingBuffer<E> { + + private final BOpStats stats; + + public BlockingBufferWithStats(final PipelineOp op, final BOpStats stats) { + + super(op.getChunkOfChunksCapacity(), op.getChunkCapacity(), op + .getChunkTimeout(), BufferAnnotations.chunkTimeoutUnit); + + this.stats = stats; + + } + + /** + * Overridden to track {@link BOpStats#unitsOut} and + * {@link BOpStats#chunksOut}. + * <p> + * Note: {@link BOpStats#chunksOut} will report the #of chunks added to this + * buffer. However, the buffer MAY combine chunks either on add() or when + * drained by the iterator so the actual #of chunks read back from the + * iterator MAY differ. + * <p> + * {@inheritDoc} + */ + @Override + public boolean add(final E e, final long timeout, final TimeUnit unit) + throws InterruptedException { + + final boolean ret = super.add(e, timeout, unit); + + if (e.getClass().getComponentType() != null) { + + stats.unitsOut.add(((Object[]) e).length); + + } else { + + stats.unitsOut.increment(); + + } + + stats.chunksOut.increment(); + + return ret; + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BlockingBufferWithStats.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java 2010-10-22 19:45:33 UTC (rev 3838) @@ -0,0 +1,76 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Oct 22, 2010 + */ + +package com.bigdata.bop.engine; + +import com.bigdata.bop.IBindingSet; + +/** + * Interface dispatches an {@link IBindingSet}[] chunk generated by a running + * operator task. Each task may produce zero or more such chunks. 
The chunks may
+ * be combined together by the caller in order to have "chunkier" processing by
+ * this interface. The interface is responsible for generating the appropriate
+ * {@link IChunkMessage}(s) for each {@link IBindingSet}[] chunk. In standalone
+ * there is a one-to-one relationship between input chunks and output messages.
+ * In scale-out, we map each {@link IBindingSet} over the shard(s) for the next
+ * operator, which is a many-to-one mapping.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+public interface IChunkHandler {
+
+    /**
+     * Take an {@link IBindingSet}[] chunk generated by some pass over an
+     * operator and make it available to the target operator. How this is done
+     * depends on whether the query is running against a standalone database or
+     * the scale-out database.
+     * <p>
+     * Note: The return value is used as part of the termination criteria for
+     * the query which depends on (a) the #of running operator tasks and (b) the
+     * #of {@link IChunkMessage}s generated (available) and consumed. The return
+     * value of this method increases the #of {@link IChunkMessage} available to
+     * the query.
+     *
+     * @param query
+     *            The query.
+     * @param bopId
+     *            The operator which wrote on the sink.
+     * @param sinkId
+     *            The identifier of the target operator.
+     * @param chunk
+     *            The intermediate results to be passed to that target operator.
+     *
+     * @return The #of {@link IChunkMessage} sent. This will always be ONE (1)
+     *         for scale-up. For scale-out, there will be at least one
+     *         {@link IChunkMessage} per index partition over which the
+     *         intermediate results were mapped.
+     */
+    int handleChunk(RunningQuery query, int bopId, int sinkId,
+            IBindingSet[] chunk);
+
+}

Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java
___________________________________________________________________
Added: svn:keywords
   + Id Date Revision Author HeadURL

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java	2010-10-22 17:53:21 UTC (rev 3837)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java	2010-10-22 19:45:33 UTC (rev 3838)
@@ -54,13 +54,15 @@
      * {@link RunState} termination conditions linked to having multiple
      * {@link IChunkMessage}s.
      * <p>
-     * Note: Just controlling the
-     * {@link PipelineOp.Annotations#CHUNK_CAPACITY} and
-     * {@link PipelineOp.Annotations#CHUNK_OF_CHUNKS_CAPACITY} is not
-     * enough to force the {@link QueryEngine} to run the an operator once per
-     * source chunk. The {@link QueryEngine} normally combines chunks together.
-     * You MUST also specify this annotation in order for the query engine to
-     * send multiple {@link IChunkMessage} rather than just one.
+     * Note: Just controlling the {@link PipelineOp.Annotations#CHUNK_CAPACITY}
+     * and {@link PipelineOp.Annotations#CHUNK_OF_CHUNKS_CAPACITY} is not enough
+     * to force the {@link QueryEngine} to run an operator once per source
+     * chunk. The {@link QueryEngine} normally combines chunks together. You
+     * MUST also specify this annotation in order for the query engine to send
+     * multiple {@link IChunkMessage} rather than just one.
+     *
+     * @deprecated Support for this is no longer present.
It was lost when the + * {@link StandaloneChunkHandler} was written. */ String ONE_MESSAGE_PER_CHUNK = QueryEngineTestAnnotations.class.getName() + ".oneMessagePerChunk"; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-10-22 17:53:21 UTC (rev 3837) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-10-22 19:45:33 UTC (rev 3838) @@ -185,7 +185,7 @@ * readily exposed as {@link Map} object. If we were to expose the map, it * would have to be via a get(key) style interface. */ - /* private */final Map<Integer/* bopId */, AtomicLong/* availableChunkCount */> availableMap = new ConcurrentHashMap<Integer, AtomicLong>(); + /* private */final Map<Integer/* bopId */, AtomicLong/* availableChunkCount */> availableMap = new LinkedHashMap<Integer, AtomicLong>(); /** * A collection reporting on the #of instances of a given {@link BOp} which @@ -412,71 +412,71 @@ } - /** - * Update the {@link RunState} to indicate that the data in the - * {@link IChunkMessage} was attached to an already running task for the - * target operator. - * - * @param msg - * @param runningOnServiceId - * @return <code>true</code> if this is the first time we will evaluate the - * op. - * - * @throws IllegalArgumentException - * if the argument is <code>null</code>. - * @throws TimeoutException - * if the deadline for the query has passed. - */ - synchronized - public void addSource(final IChunkMessage<?> msg, - final UUID runningOnServiceId) throws TimeoutException { - - if (msg == null) - throw new IllegalArgumentException(); - - if (allDone.get()) - throw new IllegalStateException(ERR_QUERY_HALTED); - - if (deadline < System.currentTimeMillis()) - throw new TimeoutException(ERR_DEADLINE); - - nsteps.incrementAndGet(); - - final int bopId = msg.getBOpId(); - final int nmessages = 1; - - if (runningMap.get(bopId) == null) { - /* - * Note: There is a race condition in RunningQuery such that it is - * possible to add a 2nd source to an operator task before the task - * has begun to execute. Since the task calls startOp() once it - * begins to execute, this means that addSource() can be ordered - * before startOp() for the same task. This code block explicitly - * allows this condition and sets a 0L in the runningMap for the - * [bopId]. - */ - AtomicLong n = runningMap.get(bopId); - if (n == null) - runningMap.put(bopId, n = new AtomicLong()); -// throw new AssertionError(ERR_OP_NOT_STARTED + " msg=" + msg -// + ", this=" + this); - } - - messagesConsumed(bopId, nmessages); - - if (TableLog.tableLog.isInfoEnabled()) { - TableLog.tableLog.info(getTableRow("addSrc", runningOnServiceId, - bopId, msg.getPartitionId(), nmessages/* fanIn */, - null/* cause */, null/* stats */)); - } - - if (log.isInfoEnabled()) - log.info("startOp: " + toString() + " : bop=" + bopId); - - if (log.isTraceEnabled()) - log.trace(msg.toString()); - - } +// /** +// * Update the {@link RunState} to indicate that the data in the +// * {@link IChunkMessage} was attached to an already running task for the +// * target operator. +// * +// * @param msg +// * @param runningOnServiceId +// * @return <code>true</code> if this is the first time we will evaluate the +// * op. +// * +// * @throws IllegalArgumentException +// * if the argument is <code>null</code>. 
+// * @throws TimeoutException +// * if the deadline for the query has passed. +// */ +// synchronized +// public void addSource(final IChunkMessage<?> msg, +// final UUID runningOnServiceId) throws TimeoutException { +// +// if (msg == null) +// throw new IllegalArgumentException(); +// +// if (allDone.get()) +// throw new IllegalStateException(ERR_QUERY_HALTED); +// +// if (deadline < System.currentTimeMillis()) +// throw new TimeoutException(ERR_DEADLINE); +// +// nsteps.incrementAndGet(); +// +// final int bopId = msg.getBOpId(); +// final int nmessages = 1; +// +// if (runningMap.get(bopId) == null) { +// /* +// * Note: There is a race condition in RunningQuery such that it is +// * possible to add a 2nd source to an operator task before the task +// * has begun to execute. Since the task calls startOp() once it +// * begins to execute, this means that addSource() can be ordered +// * before startOp() for the same task. This code block explicitly +// * allows this condition and sets a 0L in the runningMap for the +// * [bopId]. +// */ +// AtomicLong n = runningMap.get(bopId); +// if (n == null) +// runningMap.put(bopId, n = new AtomicLong()); +//// throw new AssertionError(ERR_OP_NOT_STARTED + " msg=" + msg +//// + ", this=" + this); +// } +// +// messagesConsumed(bopId, nmessages); +// +// if (TableLog.tableLog.isInfoEnabled()) { +// TableLog.tableLog.info(getTableRow("addSrc", runningOnServiceId, +// bopId, msg.getPartitionId(), nmessages/* fanIn */, +// null/* cause */, null/* stats */)); +// } +// +// if (log.isInfoEnabled()) +// log.info("startOp: " + toString() + " : bop=" + bopId); +// +// if (log.isTraceEnabled()) +// log.trace(msg.toString()); +// +// } /** * Update the {@link RunState} to reflect the post-condition of the Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-22 17:53:21 UTC (rev 3837) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-22 19:45:33 UTC (rev 3838) @@ -28,6 +28,9 @@ package com.bigdata.bop.engine; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; @@ -36,7 +39,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -55,13 +58,12 @@ import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.solutions.SliceOp; -import com.bigdata.io.DirectBufferPoolAllocator.IAllocationContext; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.ITx; -import com.bigdata.relation.accesspath.BlockingBuffer; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; -import com.bigdata.relation.accesspath.MultiplexBlockingBuffer; +import com.bigdata.relation.accesspath.IMultiSourceAsynchronousIterator; +import com.bigdata.relation.accesspath.MultiSourceSequentialAsynchronousIterator; import com.bigdata.service.IBigdataFederation; import 
com.bigdata.striterator.ICloseableIterator; import com.bigdata.util.concurrent.Haltable; @@ -85,6 +87,12 @@ * controller is attempted on some other {@link IQueryPeer}. */ static protected final String ERR_NOT_CONTROLLER = "Operator only permitted on the query controller"; + + /** + * Error message used when a request is made after the query has stopped + * executing. + */ + static protected final String ERR_QUERY_DONE = "Query is no longer running"; /** * The class executing the query on this node. @@ -141,67 +149,66 @@ * A collection of (bopId,partitionId) keys mapped onto a collection of * operator task evaluation contexts for currently executing operators for * this query. - * - * @todo Futures are not being cleared from this collection as operators - * complete. This should be done systematically in order to ensure - * that any allocations associated with an operator task execution are - * released in a timely manner for long-running operators. (In fact, - * the {@link IAllocationContext} should take care of most of the - * issues here but we could still wind up with a lot of entries in - * this map in scale-out where there can be up to one per bop per - * shard in a given query.) */ private final ConcurrentHashMap<BSBundle, ChunkFutureTask> operatorFutures; /** + * A map of unbounded work queues for each (bopId,partitionId). Empty queues + * are removed from the map. + * <p> + * The map is guarded by the {@link #lock}. + */ + private final Map<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>> operatorQueues; + + /** * The runtime statistics for each {@link BOp} in the query and * <code>null</code> unless this is the query controller. */ final private ConcurrentHashMap<Integer/* bopId */, BOpStats> statsMap; - /** - * When running in stand alone, we can chain together the operators and have - * much higher throughput. Each operator has an {@link BlockingBuffer} which - * is essentially its input queue. The operator will drain its input queue - * using {@link BlockingBuffer#iterator()}. - * <p> - * Each operator closes its {@link IBlockingBuffer} sink(s) once its own - * source has been closed and it has finished processing that source. Since - * multiple producers can target the same operator, we need a means to - * ensure that the source for the target operator is not closed until each - * producer which targets that operator has closed its corresponding sink. - * <p> - * In order to support this many-to-one producer/consumer pattern, we wrap - * the input queue (a {@link BlockingBuffer}) for each operator having - * multiple sources with a {@link MultiplexBlockingBuffer}. This class gives - * each producer their own view on the underlying {@link BlockingBuffer}. - * The underlying {@link BlockingBuffer} will not be closed until all - * source(s) have closed their view of that buffer. This collection keeps - * track of the {@link MultiplexBlockingBuffer} wrapping the - * {@link BlockingBuffer} which is the input queue for each operator. - * <p> - * The input queues themselves are {@link BlockingBuffer} objects. Those - * objects are available from this map using - * {@link MultiplexBlockingBuffer#getBackingBuffer()}. These buffers are - * pre-allocated by {@link #populateInputBufferMap(BOp)}. - * {@link #startTasks(BOp)} is responsible for starting the operator tasks - * in a "back-to-front" order. 
{@link #startQuery(IChunkMessage)} kicks off - * the query and invokes {@link #startTasks(BOp)} to chain the input queues - * and output queues together (when so chained, the output queues are skins - * over the input queues obtained from {@link MultiplexBlockingBuffer}). - * - * FIXME The inputBufferMap will let us construct consumer producer chains - * where the consumer _waits_ for all producer(s) which target the consumer - * to close the sink associated with that consumer. Unlike when attaching an - * {@link IChunkMessage} to an already running operator, the consumer will - * NOT terminate (due to lack up input) until each running producer - * terminating that consumer terminates. This will improve concurrency, - * result in fewer task instances, and have better throughput than attaching - * a chunk to an already running task. However, in scale-out we will have - * tasks running on different nodes so we can not always chain together the - * producer and consumer in this tightly integrated manner. - */ - final private ConcurrentHashMap<Integer/*operator*/, MultiplexBlockingBuffer<IBindingSet[]>/*inputQueue*/> inputBufferMap; +// /** +// * When running in stand alone, we can chain together the operators and have +// * much higher throughput. Each operator has an {@link BlockingBuffer} which +// * is essentially its input queue. The operator will drain its input queue +// * using {@link BlockingBuffer#iterator()}. +// * <p> +// * Each operator closes its {@link IBlockingBuffer} sink(s) once its own +// * source has been closed and it has finished processing that source. Since +// * multiple producers can target the same operator, we need a means to +// * ensure that the source for the target operator is not closed until each +// * producer which targets that operator has closed its corresponding sink. +// * <p> +// * In order to support this many-to-one producer/consumer pattern, we wrap +// * the input queue (a {@link BlockingBuffer}) for each operator having +// * multiple sources with a {@link MultiplexBlockingBuffer}. This class gives +// * each producer their own view on the underlying {@link BlockingBuffer}. +// * The underlying {@link BlockingBuffer} will not be closed until all +// * source(s) have closed their view of that buffer. This collection keeps +// * track of the {@link MultiplexBlockingBuffer} wrapping the +// * {@link BlockingBuffer} which is the input queue for each operator. +// * <p> +// * The input queues themselves are {@link BlockingBuffer} objects. Those +// * objects are available from this map using +// * {@link MultiplexBlockingBuffer#getBackingBuffer()}. These buffers are +// * pre-allocated by {@link #populateInputBufferMap(BOp)}. +// * {@link #startTasks(BOp)} is responsible for starting the operator tasks +// * in a "back-to-front" order. {@link #startQuery(IChunkMessage)} kicks off +// * the query and invokes {@link #startTasks(BOp)} to chain the input queues +// * and output queues together (when so chained, the output queues are skins +// * over the input queues obtained from {@link MultiplexBlockingBuffer}). +// * +// * FIXME The inputBufferMap will let us construct consumer producer chains +// * where the consumer _waits_ for all producer(s) which target the consumer +// * to close the sink associated with that consumer. Unlike when attaching an +// * {@link IChunkMessage} to an already running operator, the consumer will +// * NOT terminate (due to lack up input) until each running producer +// * terminating that consumer terminates. 
This will improve concurrency, +// * result in fewer task instances, and have better throughput than attaching +// * a chunk to an already running task. However, in scale-out we will have +// * tasks running on different nodes so we can not always chain together the +// * producer and consumer in this tightly integrated manner. +// */ +// final private ConcurrentHashMap<Integer/*operator*/, MultiplexBlockingBuffer<IBindingSet[]>/*inputQueue*/> inputBufferMap; /** * The buffer used for the overall output of the query pipeline. @@ -244,14 +251,14 @@ */ final AtomicBoolean didQueryTearDown = new AtomicBoolean(false); - /** - * The chunks available for immediate processing (they must have been - * materialized). - * <p> - * Note: This is package private so it will be visible to the - * {@link QueryEngine}. - */ - final/* private */BlockingQueue<IChunkMessage<IBindingSet>> chunksIn = new LinkedBlockingDeque<IChunkMessage<IBindingSet>>(); +// /** +// * The chunks available for immediate processing (they must have been +// * materialized). +// * <p> +// * Note: This is package private so it will be visible to the +// * {@link QueryEngine}. +// */ +// final/* private */BlockingQueue<IChunkMessage<IBindingSet>> chunksIn = new LinkedBlockingDeque<IChunkMessage<IBindingSet>>(); /** * Set the query deadline. The query will be cancelled when the deadline is @@ -368,6 +375,21 @@ } /** + * Lookup and return the {@link BOp} with that identifier using an index. + * + * @param bopId + * The identifier. + * + * @return The {@link BOp} -or- <code>null</code> if no {@link BOp} was + * found in the query with for that identifier. + */ + public BOp getBOp(final int bopId) { + + return bopIndex.get(bopId); + + } + + /** * @param queryEngine * The {@link QueryEngine} on which the query is running. In * scale-out, a query is typically instantiated on many @@ -430,6 +452,8 @@ this.operatorFutures = new ConcurrentHashMap<BSBundle, ChunkFutureTask>(); + this.operatorQueues = new LinkedHashMap<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>>(); + /* * Setup the BOpStats object for each pipeline operator in the query. */ @@ -445,7 +469,8 @@ final BOpStats queryStats = statsMap.get(query.getId()); - queryBuffer = query.newBuffer(queryStats); + queryBuffer = new BlockingBufferWithStats<IBindingSet[]>(query, + queryStats); queryIterator = new QueryResultIterator<IBindingSet[]>(this, queryBuffer.iterator()); @@ -467,31 +492,31 @@ } - if(!queryEngine.isScaleOut()) { - /* - * Since the query engine is using the stand alone database mode we - * will now setup the input queues for each operator. Those queues - * will be used by each operator which targets a given operator. - * Each operator will start once and will run until all of its - * source(s) are closed. - * - * This allocates the buffers in a top-down manner (this is the - * reverse of the pipeline evaluation order). Allocation halts at if - * we reach an operator without children (e.g., StartOp) or an - * operator which is a CONTROLLER (Union). (If allocation does not - * halt at those boundaries then we can allocate buffers which will - * not be used. On the one hand, the StartOp receives a message - * containing the chunk to be evaluated. On the other hand, the - * buffers are not shared between the parent and a subquery so - * allocation within the subquery is wasted. This is also true for - * the [statsMap].) 
- */ - inputBufferMap = null; -// inputBufferMap = new ConcurrentHashMap<Integer, MultiplexBlockingBuffer<IBindingSet[]>>(); -// populateInputBufferMap(query); - } else { - inputBufferMap = null; - } +// if(!queryEngine.isScaleOut()) { +// /* +// * Since the query engine is using the stand alone database mode we +// * will now setup the input queues for each operator. Those queues +// * will be used by each operator which targets a given operator. +// * Each operator will start once and will run until all of its +// * source(s) are closed. +// * +// * This allocates the buffers in a top-down manner (this is the +// * reverse of the pipeline evaluation order). Allocation halts at if +// * we reach an operator without children (e.g., StartOp) or an +// * operator which is a CONTROLLER (Union). (If allocation does not +// * halt at those boundaries then we can allocate buffers which will +// * not be used. On the one hand, the StartOp receives a message +// * containing the chunk to be evaluated. On the other hand, the +// * buffers are not shared between the parent and a subquery so +// * allocation within the subquery is wasted. This is also true for +// * the [statsMap].) +// */ +// inputBufferMap = null; +//// inputBufferMap = new ConcurrentHashMap<Integer, MultiplexBlockingBuffer<IBindingSet[]>>(); +//// populateInputBufferMap(query); +// } else { +// inputBufferMap = null; +// } } @@ -562,140 +587,97 @@ // // } - /** - * Take a chunk generated by some pass over an operator and make it - * available to the target operator. How this is done depends on whether the - * query is running against a standalone database or the scale-out database. - * <p> - * Note: The return value is used as part of the termination criteria for - * the query. - * <p> - * The default implementation supports a standalone database. The generated - * chunk is left on the Java heap and handed off synchronously using - * {@link QueryEngine#acceptChunk(IChunkMessage)}. That method will queue - * the chunk for asynchronous processing. - * - * @param bop - * The operator which wrote on the sink. - * @param sinkId - * The identifier of the target operator. - * @param sink - * The intermediate results to be passed to that target operator. - * - * @return The #of {@link IChunkMessage} sent. This will always be ONE (1) - * for scale-up. For scale-out, there will be at least one - * {@link IChunkMessage} per index partition over which the - * intermediate results were mapped. - */ - protected <E> int handleOutputChunk(final BOp bop, final int sinkId, - final IBlockingBuffer<IBindingSet[]> sink) { +// /** +// * Take a chunk generated by some pass over an operator and make it +// * available to the target operator. How this is done depends on whether the +// * query is running against a standalone database or the scale-out database. +// * <p> +// * Note: The return value is used as part of the termination criteria for +// * the query. +// * <p> +// * The default implementation supports a standalone database. The generated +// * chunk is left on the Java heap and handed off synchronously using +// * {@link QueryEngine#acceptChunk(IChunkMessage)}. That method will queue +// * the chunk for asynchronous processing. +// * +// * @param bop +// * The operator which wrote on the sink. +// * @param sinkId +// * The identifier of the target operator. +// * @param sink +// * The intermediate results to be passed to that target operator. +// * +// * @return The #of {@link IChunkMessage} sent. 
This will always be ONE (1) +// * for scale-up. For scale-out, there will be at least one +// * {@link IChunkMessage} per index partition over which the +// * intermediate results were mapped. +// */ +// protected <E> int handleOutputChunk(final BOp bop, final int sinkId, +// final IBlockingBuffer<IBindingSet[]> sink) { +// +// if (bop == null) +// throw new IllegalArgumentException(); +// +// if (sink == null) +// throw new IllegalArgumentException(); +// +// if (inputBufferMap != null && inputBufferMap.get(sinkId) != null) { +// /* +// * FIXME The sink is just a wrapper for the input buffer so we do +// * not need to do anything to propagate the data from one operator +// * to the next. +// */ +// return 0; +// } +// +// /* +// * Note: The partitionId will always be -1 in scale-up. +// */ +// final int partitionId = -1; +// +// final boolean oneMessagePerChunk = bop.getProperty( +// QueryEngineTestAnnotations.ONE_MESSAGE_PER_CHUNK, +// QueryEngineTestAnnotations.DEFAULT_ONE_MESSAGE_PER_CHUNK); +// +// if (oneMessagePerChunk) { +// +// final IAsynchronousIterator<IBindingSet[]> itr = sink.iterator(); +// +// int nchunks = 0; +// +// while (itr.hasNext()) { +// +// final IBlockingBuffer<IBindingSet[]> tmp = new BlockingBuffer<IBindingSet[]>( +// 1); +// +// tmp.add(itr.next()); +// +// tmp.close(); +// +// final LocalChunkMessage<IBindingSet> chunk = new LocalChunkMessage<IBindingSet>( +// clientProxy, queryId, sinkId, partitionId, tmp +// .iterator()); +// +// queryEngine.acceptChunk(chunk); +// +// nchunks++; +// +// } +// +// return nchunks; +// +// } +// +// final LocalChunkMessage<IBindingSet> chunk = new LocalChunkMessage<IBindingSet>( +// clientProxy, queryId, sinkId, partitionId, sink.iterator()); +// +// queryEngine.acceptChunk(chunk); +// +// return 1; +// +// } - if (bop == null) - throw new IllegalArgumentException(); - - if (sink == null) - throw new IllegalArgumentException(); - - if (inputBufferMap != null && inputBufferMap.get(sinkId) != null) { - /* - * FIXME The sink is just a wrapper for the input buffer so we do - * not need to do anything to propagate the data from one operator - * to the next. - */ - return 0; - } - - /* - * Note: The partitionId will always be -1 in scale-up. - */ - final int partitionId = -1; - - final boolean oneMessagePerChunk = bop.getProperty( - QueryEngineTestAnnotations.ONE_MESSAGE_PER_CHUNK, - QueryEngineTestAnnotations.DEFAULT_ONE_MESSAGE_PER_CHUNK); - - if (oneMessagePerChunk) { - - final IAsynchronousIterator<IBindingSet[]> itr = sink.iterator(); - - int nchunks = 0; - - while (itr.hasNext()) { - - final IBlockingBuffer<IBindingSet[]> tmp = new BlockingBuffer<IBindingSet[]>( - 1); - - tmp.add(itr.next()); - - tmp.close(); - - final LocalChunkMessage<IBindingSet> chunk = new LocalChunkMessage<IBindingSet>( - clientProxy, queryId, sinkId, partitionId, tmp - .iterator()); - - queryEngine.acceptChunk(chunk); - - nchunks++; - - } - - return nchunks; - - } - - final LocalChunkMessage<IBindingSet> chunk = new LocalChunkMessage<IBindingSet>( - clientProxy, queryId, sinkId, partitionId, sink.iterator()); - - queryEngine.acceptChunk(chunk); - - return 1; - - } - /** - * Make a chunk of binding sets available for consumption by the query. - * <p> - * Note: this is invoked by {@link QueryEngine#acceptChunk(IChunkMessage)} - * - * @param msg - * The chunk. - * - * @todo Does this method really need the {@link #lock}? I doubt it since - * {@link #chunksIn} is thread-safe. 
- */ - protected void acceptChunk(final IChunkMessage<IBindingSet> msg) { - - if (msg == null) - throw new IllegalArgumentException(); - - if (!msg.isMaterialized()) - throw new IllegalStateException(); - - lock.lock(); - - try { - - // verify still running. - if (future.isDone()) { - throw new RuntimeException("Query is done", future.getCause()); - } - - // add chunk to be consumed. - chunksIn.add(msg); - - if (log.isDebugEnabled()) - log - .debug("chunksIn.size()=" + chunksIn.size() + ", msg=" - + msg); - } finally { - - lock.unlock(); - - } - - } - - /** * Invoked once by the query controller with the initial * {@link IChunkMessage} which gets the query moving. */ @@ -949,155 +931,254 @@ } +// /** +// * Consume zero or more chunks in the input queue for this query. The +// * chunk(s) will either be assigned to an already running task for the +// * target operator or they will be assigned to new tasks. +// * +// * FIXME Drain the input queue, assigning any chunk waiting to a task. If +// * the task is already running, then add the chunk to that task. Otherwise +// * start a new task. +// */ +// protected void consumeChunk() { +// final IChunkMessage<IBindingSet> msg = chunksIn.poll(); +// if (msg == null) +// return; +// try { +// if (!msg.isMaterialized()) +// throw new IllegalStateException(); +// if (log.isTraceEnabled()) +// log.trace("Accepted chunk: " + msg); +// final BSBundle bundle = new BSBundle(msg.getBOpId(), msg +// .getPartitionId()); +//// /* +//// * Look for instance of this task which is already running. +//// */ +//// final ChunkFutureTask chunkFutureTask = operatorFutures.get(bundle); +//// if (!queryEngine.isScaleOut() && chunkFutureTask != null) { +//// /* +//// * Attempt to atomically attach the message as another src. +//// */ +//// if (chunkFutureTask.chunkTask.context.addSource(msg +//// .getChunkAccessor().iterator())) { +//// /* +//// * @todo I've commented this out for now. I am not convinced +//// * that we need to update the RunState when accepting +//// * another message into a running task. This would only +//// * matter if haltOp() reported the #of consumed messages, +//// * but RunState.haltOp() just decrements the #of available +//// * messages by one which balances startOp(). Just because we +//// * attach more messages dynamically does not mean that we +//// * need to report that back to the query controller as long +//// * as haltOp() balances startOp(). +//// */ +////// lock.lock(); +////// try { +////// /* +////// * message was added to a running task. +////// * +////// * FIXME This needs to be an RMI in scale-out back to +////// * the query controller so it can update the #of +////// * messages which are being consumed by this task. +////// * However, doing RMI here will add latency into the +////// * thread submitting tasks for evaluation and the +////// * coordination overhead of addSource() in scale-out may +////// * be too high. However, if we do not combine sources in +////// * scale-out then we may have too much overhead in terms +////// * of the #of running tasks with few tuples per task. +////// * Another approach is the remote async iterator with +////// * multiple sources (parallel multi source iterator). +////// * +////// * FIXME This code path is NOT being taken in scale-out +////// * right now since it would not get the message to the +////// * query controller. We will need to add addSource() to +////// * IQueryClient parallel to startOp() and haltOp() for +////// * this to work. 
+////// */ +////// runState.addSource(msg, queryEngine.getServiceUUID()); +////// return; +////// } finally { +////// lock.unlock(); +////// } +//// } +//// } +// // wrap runnable. +// final ChunkFutureTask ft = new ChunkFutureTask(new ChunkTask(msg)); +// /* +// * FIXME Rather than queue up a bunch of operator tasks for the same +// * (bopId,partitionId), this blocks until the current operator task +// * is done and then submits the new one. This prevents us from +// * allocating 100s of threads for complex queries and prevents us +// * from losing track of the Futures of those tasks. However, since +// * this is happening in the caller's thread the QueryEngine is not +// * making any progress while we are blocked. A pattern which hooks +// * the Future and then submits the next task (such as the +// * LatchedExecutor) would fix this. This might have to be one +// * LatchedExecutor per pipeline operator. +// */ +// FutureTask<Void> existing = operatorFutures.putIfAbsent(bundle, ft); +// if (existing != null) { +// existing.get(); +// if (!operatorFutures.remove(bundle, existing)) +// throw new AssertionError(); +// if (operatorFutures.put(bundle, ft) != null) +// throw new AssertionError(); +// } +//// // add to list of active futures for this query. +//// if (operatorFutures.put(bundle, ft) != null) { +//// /* +//// * Note: This can cause the FutureTask to be accessible (above) +//// * before startOp() has been called for that ChunkTask (the +//// * latter occurs when the chunk task actually runs.) This a race +//// * condition has been resolved in RunState by allowing +//// * addSource() even when there is no registered task running for +//// * that [bopId]. +//// * +//// * FIXME This indicates that we have more than one future for +//// * the same (bopId,shardId). When this is true we are losing +//// * track of Futures with the consequence that we can not +//// * properly cancel them. Instead of losing track like this, we +//// * should be targeting the running operator instance with the +//// * new chunk. This needs to be done atomically, e.g., using the +//// * [lock]. +//// * +//// * Even if we only have one task per operator in standalone and +//// * we attach chunks to an already running task in scale-out, +//// * there is still the possibility in scale-out that a task may +//// * have closed its source but still be running, in which case we +//// * would lose the Future for the already running task when we +//// * start a new task for the new chunk for the target operator. +//// */ +//// // throw new AssertionError(); +//// } +// // submit task for execution (asynchronous). +// queryEngine.execute(ft); +// } catch (Throwable ex) { +// // halt query. +// throw new RuntimeException(halt(ex)); +// } +// } + /** - * Consume zero or more chunks in the input queue for this query. The - * chunk(s) will either be assigned to an already running task for the - * target operator or they will be assigned to new tasks. + * Make a chunk of binding sets available for consumption by the query. + * <p> + * Note: this is invoked by {@link QueryEngine#acceptChunk(IChunkMessage)} * - * FIXME Drain the input queue, assigning any chunk waiting to a task. If - * the task is already running, then add the chunk to that task. Otherwise - * start a new task. + * @param msg + * The chunk. 
*/ - protected void consumeChunk() { - final IChunkMessage<IBindingSet> msg = chunksIn.poll(); + protected void acceptChunk(final IChunkMessage<IBindingSet> msg) { + if (msg == null) - return; + throw new IllegalArgumentException(); + + if (!msg.isMaterialized()) + throw new IllegalStateException(); + + final BSBundle bundle = new BSBundle(msg.getBOpId(), msg + .getPartitionId()); + + lock.lock(); + try { - if (!msg.isMaterialized()) - throw new IllegalStateException(); - if (log.isTraceEnabled()) - log.trace("Accepted chunk: " + msg); - final BSBundle bundle = new BSBundle(msg.getBOpId(), msg - .getPartitionId()); + + // verify still running. + if (future.isDone()) + throw new RuntimeException(ERR_QUERY_DONE, future.getCause()); + + BlockingQueue<IChunkMessage<IBindingSet>> queue = operatorQueues + .get(bundle); + + if (queue == null) { + + queue = new LinkedBlockingQueue<IChunkMessage<IBindingSet>>(/* unbounded */); + + operatorQueues.put(bundle, queue); + + } + + queue.add(msg); + + } finally { + + lock.unlock(); + + } + + } + + /** + * Examines the input queue for each (bopId,partitionId). If there is work + * available and no task is currently running, then drain the work queue and + * submit a task to consume that work. + */ + protected void consumeChunk() { + lock.lock(); + try { + for(BSBundle bundle : operatorQueues.keySet()) { + scheduleNext(bundle); + } + } finally { + lock.unlock(); + } + } + + /** + * Examine the input queue for the (bopId,partitionId). If there is work + * available and no task is currently running, then drain the work queue and + * submit a task to consume that work. + * + * @param bundle + * The (bopId,partitionId). + * + * @return <code>true</code> if a new task was started. + */ + private boolean scheduleNext(final BSBundle bundle) { + if (bundle == null) + throw new IllegalArgumentException(); + lock.lock(); + try { + // Make sure the query is still running. + future.halted(); + // Is there a Future for this (bopId,partitionId)? + final ChunkFutureTask cft = operatorFutures.get(bundle); + if (cft != null && !cft.isDone()) { + // already running. + return false; + } + // Remove the work queue for that (bopId,partitionId). + final BlockingQueue<IChunkMessage<IBindingSet>> queue = operatorQueues + .remove(bundle); + if (queue == null || queue.isEmpty()) { + // no work + return false; + } + // Drain the work queue. + final List<IChunkMessage<IBindingSet>> messages = new LinkedList<IChunkMessage<IBindingSet>>(); + queue.drainTo(messages); + final int nmessages = messages.size(); /* - * Look for instance of this task which is already running. + * Combine the messages into a single source to be consumed by a + * task. */ - final ChunkFutureTask chunkFutureTask = operatorFutures.get(bundle); - if (!queryEngine.isScaleOut() && chunkFutureTask != null) { - /* - * Attempt to atomically attach the message as another src. - */ - if (chunkFutureTask.chunkTask.context.addSource(msg - ... [truncated message content] |
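
To make the scheduling change above easier to follow outside the diff context,
here is a minimal, self-contained sketch of the pattern: one unbounded work
queue per operator instance, at most one task running per instance, and each
task draining the whole queue so a leading producer is consumed in a few
"chunky" passes. This is an illustration, not the bigdata code: the class name
WorkQueueScheduler, the integer operator key (standing in for BSBundle), and
the consume() hook are invented for the example; the real RunningQuery combines
the drained IChunkMessages into a single source for a ChunkTask rather than
consuming them one by one.

    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.locks.ReentrantLock;

    public class WorkQueueScheduler<M> {

        private final ReentrantLock lock = new ReentrantLock();

        // Work queue per operator key, e.g. (bopId,partitionId).
        private final Map<Integer, Queue<M>> queues = new HashMap<Integer, Queue<M>>();

        // The running task, if any, per operator key.
        private final Map<Integer, Future<?>> running = new HashMap<Integer, Future<?>>();

        private final ExecutorService service = Executors.newCachedThreadPool();

        // Enqueue a message; never blocks the producer (the queue is unbounded).
        public void accept(final int key, final M msg) {
            lock.lock();
            try {
                Queue<M> queue = queues.get(key);
                if (queue == null)
                    queues.put(key, queue = new LinkedList<M>());
                queue.add(msg);
                scheduleNext(key);
            } finally {
                lock.unlock();
            }
        }

        // If work is queued and no task is running for [key], drain the queue
        // and submit one task to consume everything that was drained.
        private void scheduleNext(final int key) {
            lock.lock();
            try {
                final Future<?> f = running.get(key);
                if (f != null && !f.isDone())
                    return; // a task is already consuming this operator's input.
                final Queue<M> queue = queues.remove(key);
                if (queue == null || queue.isEmpty())
                    return; // no work available.
                final List<M> drained = new LinkedList<M>(queue);
                running.put(key, service.submit(new Runnable() {
                    public void run() {
                        try {
                            for (M msg : drained)
                                consume(key, msg); // operator evaluation goes here.
                        } finally {
                            lock.lock();
                            try {
                                running.remove(key); // this task is done.
                            } finally {
                                lock.unlock();
                            }
                            scheduleNext(key); // work may have arrived while we ran.
                        }
                    }
                }));
            } finally {
                lock.unlock();
            }
        }

        // Hypothetical consumer hook; the real code runs the operator task.
        protected void consume(final int key, final M msg) {
            System.out.println("operator " + key + " consumed " + msg);
        }
    }

A production version still has to decide how failures propagate (here an
exception would be trapped in the discarded Future; the commit uses Haltable to
halt the query) and how the drained messages are presented to the operator (the
commit wraps them with MultiSourceSequentialAsynchronousIterator).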