From: <tho...@us...> - 2014-01-13 14:59:00
|
Revision: 7780 http://bigdata.svn.sourceforge.net/bigdata/?rev=7780&view=rev Author: thompsonbry Date: 2014-01-13 14:58:53 +0000 (Mon, 13 Jan 2014) Log Message: ----------- Added option (set via static final boolean) to use an ordered map over the (bopId,shardId) pairs to associate them with the operator input queues rather than a hash map. Both maps are concurrent. This option is currently set to false, which is equivalent to the historical behavior. There is a possibility that setting this to true would cause solutions to have less dwell time on the JVM heap since it would bias the QueryEngine to run chunks associated with operators having bopId values that are smaller first. I also modified BSBundle so that it implements Comparable. Its default order should order BSBundles with lower bopIds first. (Note: as committed, BSBundle.compareTo() negates the bopId comparison, so its natural order actually places BSBundles with higher bopIds first, as its Javadoc states.) This is the other half of how we can create this front bias over the input queues for the query plan. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java 2014-01-13 14:32:13 UTC (rev 7779) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java 2014-01-13 14:58:53 UTC (rev 7780) @@ -27,6 +27,8 @@ package com.bigdata.bop.engine; +import java.util.Comparator; + /** * An immutable class capturing the evaluation context of an operator against a * shard. 
@@ -34,12 +36,13 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ -public class BSBundle { +public class BSBundle implements Comparable<BSBundle> { public final int bopId; public final int shardId; + @Override public String toString() { return super.toString() + "{bopId=" + bopId + ",shardId=" + shardId @@ -55,15 +58,14 @@ } - /** - * {@inheritDoc} - */ + @Override public int hashCode() { return (bopId * 31) + shardId; } + @Override public boolean equals(final Object o) { if (this == o) @@ -78,4 +80,39 @@ } + /** + * {@inheritDoc} + * <p> + * This orders the {@link BSBundle}s by reverse {@link #bopId} and by + * reverse {@link #shardId} if the {@link #bopId} is the same. This order imposes a + * bias to draw entries with higher {@link #bopId}s from an ordered + * collection. + * <p> + * Note: Query plans are assigned bopIds from 0 through N where higher + * bopIds are assigned to operators that occur later in the query plan. This + * is not a strict rule, but it is a strong bias. Given that bias and an + * ordered map, this {@link Comparator} will tend to draw from operators + * that are further along in the query plan. This emphasizes getting results + * through the pipeline quickly. Whether or not this {@link Comparator} has + * any effect depends on the {@link ChunkedRunningQuery#consumeChunk()} + * method and the backing map over the operator queues. If a hash map is + * used, then the {@link Comparator} is ignored. If a skip list map is used, + * then the {@link Comparator} will influence the manner in which the + * operator queues are drained. 
+ */ + @Override + public int compareTo(final BSBundle o) { + + int ret = -Integer.compare(bopId, o.bopId); + + if (ret == 0) { + + ret = -Integer.compare(shardId, o.shardId); + + } + + return ret; + + } + } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2014-01-13 14:32:13 UTC (rev 7779) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2014-01-13 14:58:53 UTC (rev 7780) @@ -37,13 +37,12 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; -import junit.framework.AssertionFailedError; - import org.apache.log4j.Logger; import com.bigdata.bop.BOp; @@ -112,20 +111,35 @@ * are removed from the map. * <p> * The map is guarded by the {@link #lock}. + * <p> + * Note: Using a hash map here means that {@link #consumeChunks()} will draw + * from operator queues more or less at random. Using an ordered map will + * impose a bias, depending on the natural ordering of the map keys. * - * FIXME Either this and/or {@link #operatorFutures} must be a weak value - * map in order to ensure that entries are eventually cleared in scale-out - * where the #of entries can potentially be very large since they are per - * (bopId,shardId). While these maps were initially declared as - * {@link ConcurrentHashMap} instances, if we remove entries once the - * map/queue entry is empty, this appears to open a concurrency hole which - * does not exist if we leave entries with empty map/queue values in the - * map. 
Changing to a weak value map should provide the necessary pruning of - * unused entries without opening up this concurrency hole. + * @see BSBundle#compareTo(BSBundle) + * + * FIXME Either this and/or {@link #operatorFutures} must be a weak + * value map in order to ensure that entries are eventually cleared in + * scale-out where the #of entries can potentially be very large since + * they are per (bopId,shardId). While these maps were initially + * declared as {@link ConcurrentHashMap} instances, if we remove + * entries once the map/queue entry is empty, this appears to open a + * concurrency hole which does not exist if we leave entries with empty + * map/queue values in the map. Changing to a weak value map should + * provide the necessary pruning of unused entries without opening up + * this concurrency hole. */ private final ConcurrentMap<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>> operatorQueues; /** + * Set to <code>true</code> to make {@link #operatorQueues} and ordered map. + * When <code>true</code>, {@link #consumeChunk()} will have an ordered bias + * in how it schedules work. [The historical behavior is present when this + * is <code>false</code>.] 
+ */ + private static final boolean orderedOperatorQueueMap = false; + + /** * FIXME It appears that this is Ok based on a single unit test known to * fail when {@link #removeMapOperatorQueueEntries} is <code>true</code>, * but I expect that a similar concurrency problem could also exist for the @@ -204,8 +218,16 @@ this.operatorFutures = new ConcurrentHashMap<BSBundle, ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask>>(); - this.operatorQueues = new ConcurrentHashMap<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>>(); + if (orderedOperatorQueueMap) { + this.operatorQueues = new ConcurrentSkipListMap<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>>(); + + } else { + + this.operatorQueues = new ConcurrentHashMap<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>>(); + + } + } /** @@ -250,12 +272,14 @@ if (queue == null) { /* - * If the target is a pipelined operator, then we impose a limit - * on the #of messages which may be buffered for that operator. - * If the operator is NOT pipelined, e.g., ORDER_BY, then we use - * an unbounded queue. + * There is no input queue for this operator, so we create one + * now while we are holding the lock. If the target is a + * pipelined operator, then we impose a limit on the #of + * messages which may be buffered for that operator. If the + * operator is NOT pipelined, e.g., ORDER_BY, then we use an + * unbounded queue. * - * TODO Unit/stress tests with capacity set to 1. + * TODO Unit/stress tests with capacity set to 1. */ // The target operator for this message. @@ -265,12 +289,24 @@ PipelineOp.Annotations.PIPELINE_QUEUE_CAPACITY, PipelineOp.Annotations.DEFAULT_PIPELINE_QUEUE_CAPACITY) : Integer.MAX_VALUE; - + + // Create a new queue using [lock]. queue = new com.bigdata.jsr166.LinkedBlockingDeque<IChunkMessage<IBindingSet>>(// capacity, lock); - operatorQueues.put(bundle, queue); + // Add to the collection of operator input queues. 
+ if (operatorQueues.put(bundle, queue) != null) { + + /* + * There must not be an entry for this operator. We checked + * for this above. Nobody else should be adding entries into + * the [operatorQueues] map. + */ + + throw new AssertionError(bundle.toString()); + + } } @@ -396,7 +432,7 @@ if (nrunning == 0) { // No tasks running for this operator. if(removeMapOperatorFutureEntries) - if(map!=operatorFutures.remove(bundle)) throw new AssertionError(); + if (map != operatorFutures.remove(bundle)) throw new AssertionError(); } } if (nrunning >= maxParallel) { @@ -526,14 +562,14 @@ if (queue.isEmpty()) { // No work, so remove work queue for (bopId,partitionId). if(removeMapOperatorQueueEntries) - if(queue!=operatorQueues.remove(bundle)) throw new AssertionError(); + if (queue != operatorQueues.remove(bundle)) throw new AssertionError(); return false; } /* * true iff operator requires at once evaluation and all solutions * are now available for that operator. */ - boolean atOnceReady = false; + boolean atOnceReady = false; if (!pipelined) { if (!isAtOnceReady(bundle.bopId)) { /* This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |