From: <tho...@us...> - 2013-11-13 21:50:30
|
Revision: 7548 http://bigdata.svn.sourceforge.net/bigdata/?rev=7548&view=rev Author: thompsonbry Date: 2013-11-13 21:50:23 +0000 (Wed, 13 Nov 2013) Log Message: ----------- Added @Override annotations and final attributes. Added checkDeadline() method on AbstractRunningQuery. This performs a non-blocking test of the RunState to determine whether a deadline (if one exists) has expired. If so, it halts the query. checkDeadline() is intended to provide a hook that can be used to force timely termination of queries that miss their deadline and do not terminate because some operator is compute bound. The current logic only check the deadline in startOp() and haltOp(). We will have to add additional logic to call checkDeadline() at other times, e.g., from a scheduled executor task, in order to ensure timely termination. See #772. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/RunState.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2013-11-13 17:42:46 UTC (rev 7547) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2013-11-13 21:50:23 UTC (rev 7548) @@ -340,24 +340,56 @@ } + /** + * If the query deadline has expired, then halt the query. + * + * @throws QueryTimeoutException + * if the query deadline has expired. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/772"> + * Query timeout only checked at operator start/stop. </a> + */ + final protected void checkDeadline() { + + try { + + runState.checkDeadline(); + + } catch (QueryTimeoutException ex) { + + halt(ex); + + /* + * Note: The exception is not rethrown when the query halts for a + * deadline. See startOp() and haltOp() for the standard behavior. + */ + + } + + } + + @Override final public long getDeadline() { return runState.getDeadline(); } + @Override final public long getStartTime() { return startTime.get(); } + @Override final public long getDoneTime() { return doneTime.get(); } + @Override final public long getElapsed() { long mark = doneTime.get(); @@ -379,37 +411,28 @@ } + @Override public QueryEngine getQueryEngine() { return queryEngine; } - /** - * The client executing this query (aka the query controller). - * <p> - * Note: The proxy is primarily for light weight RMI messages used to - * coordinate the distributed query evaluation. Ideally, all large objects - * will be transfered among the nodes of the cluster using NIO buffers. - */ + @Override final public IQueryClient getQueryController() { return clientProxy; } - /** - * The unique identifier for this query. - */ + @Override final public UUID getQueryId() { return queryId; } - /** - * Return the operator tree for this query. - */ + @Override final public PipelineOp getQuery() { return query; @@ -425,6 +448,7 @@ } + @Override final public Map<Integer/* bopId */, BOpStats> getStats() { return Collections.unmodifiableMap(statsMap); @@ -744,6 +768,11 @@ halt(ex); + /* + * Note: The exception is not rethrown when the query halts for a + * deadline. + */ + } finally { lock.unlock(); @@ -830,6 +859,11 @@ halt(t); + /* + * Note: The exception is not rethrown when the query halts for a + * deadline. + */ + } finally { lock.unlock(); @@ -1149,6 +1183,7 @@ */ abstract protected void consumeChunk(); + @Override final public ICloseableIterator<IBindingSet[]> iterator() { if (!controller) @@ -1161,6 +1196,7 @@ } + @Override final public void halt(final Void v) { lock.lock(); @@ -1181,6 +1217,7 @@ } + @Override final public <T extends Throwable> T halt(final T t) { if (t == null) @@ -1223,6 +1260,7 @@ * consume them.</li> * </ul> */ + @Override final public boolean cancel(final boolean mayInterruptIfRunning) { /* * Set if we notice an interrupt during clean up of the query and then @@ -1397,43 +1435,50 @@ } + @Override final public Void get() throws InterruptedException, ExecutionException { return future.get(); } - final public Void get(long arg0, TimeUnit arg1) + @Override + final public Void get(final long arg0, final TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException { return future.get(arg0, arg1); } + @Override final public boolean isCancelled() { return future.isCancelled(); } + @Override final public boolean isDone() { return future.isDone(); } + @Override final public Throwable getCause() { return future.getCause(); } + @Override public IBigdataFederation<?> getFederation() { return queryEngine.getFederation(); } + @Override public IIndexManager getLocalIndexManager() { return queryEngine.getIndexManager(); @@ -1526,6 +1571,7 @@ * buffered on the native heap) rather than as a limit to the among of * native memory the operator may use while it is running. */ + @Override public IMemoryManager getMemoryManager() { IMemoryManager memoryManager = this.memoryManager.get(); if (memoryManager == null) { @@ -1545,6 +1591,7 @@ private final AtomicReference<IMemoryManager> memoryManager = new AtomicReference<IMemoryManager>(); + @Override final public IQueryAttributes getAttributes() { return queryAttributes; @@ -1618,6 +1665,7 @@ } } + @Override public String toString() { final StringBuilder sb = new StringBuilder(getClass().getName()); sb.append("{queryId=" + queryId); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2013-11-13 17:42:46 UTC (rev 7547) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2013-11-13 21:50:23 UTC (rev 7548) @@ -85,9 +85,13 @@ /** * The client coordinate the evaluation of this query (aka the query * controller). For a standalone database, this will be the - * {@link QueryEngine}. For scale-out, this will be the RMI proxy for the - * {@link QueryEngine} instance to which the query was submitted for - * evaluation by the application. + * {@link QueryEngine}. + * <p> + * For scale-out, this will be the RMI proxy for the {@link QueryEngine} + * instance to which the query was submitted for evaluation by the + * application. The proxy is primarily for light weight RMI messages used to + * coordinate the distributed query evaluation. Ideally, all large objects + * will be transfered among the nodes of the cluster using NIO buffers. */ IQueryClient getQueryController(); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2013-11-13 17:42:46 UTC (rev 7547) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2013-11-13 21:50:23 UTC (rev 7548) @@ -722,6 +722,7 @@ } + @Override public void run() { if(log.isInfoEnabled()) log.info("Running: " + this); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2013-11-13 17:42:46 UTC (rev 7547) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2013-11-13 21:50:23 UTC (rev 7548) @@ -606,8 +606,7 @@ if (innerState.allDone.get()) throw new IllegalStateException(ERR_QUERY_HALTED); - if (innerState.deadline.get() < System.currentTimeMillis()) - throw new QueryTimeoutException(ERR_DEADLINE); + checkDeadline(); if (!innerState.started.compareAndSet(false/* expect */, true/* update */)) throw new IllegalStateException(ERR_QUERY_STARTED); @@ -704,11 +703,9 @@ if (innerState.allDone.get()) throw new IllegalStateException(ERR_QUERY_HALTED); -// + " bopId="+msg.bopId+" : msg="+msg); - if (innerState.deadline.get() < System.currentTimeMillis()) - throw new QueryTimeoutException(ERR_DEADLINE); - + checkDeadline(); + innerState.stepCount.incrementAndGet(); final boolean firstTime = _startOp(msg); @@ -777,6 +774,19 @@ } // RunStateEnum /** + * Check the query to see whether its deadline has expired. + * + * @throws QueryTimeoutException + * if the query deadline has expired. + */ + protected void checkDeadline() throws QueryTimeoutException { + + if (innerState.deadline.get() < System.currentTimeMillis()) + throw new QueryTimeoutException(ERR_DEADLINE); + + } + + /** * Update the {@link RunState} to reflect the post-condition of the * evaluation of an operator against one or more {@link IChunkMessage}s, * adjusting the #of messages available for consumption by the operator @@ -809,8 +819,7 @@ if (innerState.allDone.get()) throw new IllegalStateException(ERR_QUERY_HALTED); - if (innerState.deadline.get() < System.currentTimeMillis()) - throw new QueryTimeoutException(ERR_DEADLINE); + checkDeadline(); innerState.stepCount.incrementAndGet(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-11-15 17:00:22
|
Revision: 7553 http://bigdata.svn.sourceforge.net/bigdata/?rev=7553&view=rev Author: thompsonbry Date: 2013-11-15 17:00:14 +0000 (Fri, 15 Nov 2013) Log Message: ----------- javadoc/@Override annotations Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2013-11-14 20:27:35 UTC (rev 7552) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2013-11-15 17:00:14 UTC (rev 7553) @@ -16,9 +16,14 @@ public interface IQueryPeer extends Remote { /** - * The {@link UUID} of the service within which the {@link IQueryPeer} is + * The {@link UUID} of the service in which this {@link QueryEngine} is * running. * + * @return The {@link UUID} of the service in which this {@link QueryEngine} + * is running -or- a unique and distinct UUID if the + * {@link QueryEngine} is not running against an + * IBigdataFederation. + * * @see IService#getServiceUUID() */ UUID getServiceUUID() throws RemoteException; Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2013-11-14 20:27:35 UTC (rev 7552) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2013-11-15 17:00:14 UTC (rev 7553) @@ -59,6 +59,7 @@ /** * The unique identifier for this query. */ + @Override UUID getQueryId(); /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-01-04 21:59:15
|
Revision: 7726 http://bigdata.svn.sourceforge.net/bigdata/?rev=7726&view=rev Author: thompsonbry Date: 2014-01-04 21:59:08 +0000 (Sat, 04 Jan 2014) Log Message: ----------- The best way to analyze this is to use hint:atOnce to force each operator in the query plan to execute only once. You can then look at the opCount column in the detailed Explain view and see whether the correct number of operator invocations was reported. It should be ONE (1) for each operator where the atOnce hint was correctly applied. If the atOnce hint is applied to an operator, then it sets pipelined:=false as follows: com.bigdata.bop.PipelineOp.pipelined = false I have modified BOPStats to NOT pre-increment opCount. I have modified AbstractRunningQuery.haltOp() to post-increment opCount One consequence is that the opCount will not update until the operator has halted. Thus, it is a "how many times did this operator run successfully counter" rather than a "how many times did this operator start counter". However, it is now reporting the correct values. I cross validated this by logging out the following in ChunkedRunningQuery: {{{ if (log.isInfoEnabled()) log.info("Running task: bop=" + bundle.bopId + (pipelined?"":", atOnceReady=" + atOnceReady) + ", bop=" + bop.toShortString() + ", messages=" + naccepted + ", solutions=" + solutionsAccepted + (log.isDebugEnabled()?", runState=" + runStateString():"")); getQueryEngine().execute(cft); }}} This is the code that actually runs the operator. I then verified that the data on operator invocations was correct. I also cross correlated where pipelined:=false in the query plan with opCount:=1, i.e., if atOnce evaluation was imposed on the operator, then we had only one invocation reported for that operator. See #793. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2014-01-04 21:46:36 UTC (rev 7725) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2014-01-04 21:59:08 UTC (rev 7726) @@ -825,18 +825,38 @@ log.trace(msg.toString()); // update per-operator statistics. - final BOpStats tmp = statsMap.putIfAbsent(msg.getBOpId(), msg.getStats()); + { + // Data race on insert into CHM. + BOpStats tmp = statsMap.putIfAbsent(msg.getBOpId(), + msg.getStats()); - /* - * Combine stats, but do not combine a stats object with itself. - * - * @see https://sourceforge.net/apps/trac/bigdata/ticket/464 (Query - * Statistics do not update correctly on cluster) - */ - if (tmp != null && tmp != msg.getStats()) { - tmp.add(msg.getStats()); + /** + * Combine stats, but do not combine a stats object with itself. + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/464"> + * Query Statistics do not update correctly on cluster</a> + */ + if (tmp == null) { + // won the data race. + tmp = msg.getStats(); + } else { + // lost the data race. + if (tmp != msg.getStats()) { + tmp.add(msg.getStats()); + } + } + /** + * Post-increment now that we know who one the data race. + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/793"> + * Explain reports incorrect value for opCount</a> + */ + tmp.opCount.increment(); + // log.warn("bop=" + getBOp(msg.getBOpId()).toShortString() + // + " : stats=" + tmp); } -// log.warn(msg.toString() + " : stats=" + tmp); switch (runState.haltOp(msg)) { case Running: Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2014-01-04 21:46:36 UTC (rev 7725) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2014-01-04 21:59:08 UTC (rev 7726) @@ -30,7 +30,6 @@ import java.io.Serializable; import com.bigdata.bop.BOp; -import com.bigdata.bop.PipelineOp; import com.bigdata.counters.CAT; /** @@ -55,18 +54,15 @@ */ final public CAT elapsed = new CAT(); - /** - * The #of instances of a given operator which have been created for a given - * query. This provides interesting information about the #of task instances - * for each operator which were required to execute a query. - * - * TODO Due to the way this is incremented, this is always ONE (1) if - * {@link PipelineOp.Annotations#SHARED_STATE} is <code>true</code> (it - * reflects the #of times {@link #add(BOpStats)} was invoked plus one for - * the ctor rather than the #of times the operator task was invoked). This - * should be changed to reflect the #of operator task instances created - * instead. - */ + /** + * The #of instances of a given operator which have been started (and + * successully terminated) for a given query. This provides interesting + * information about the #of task instances for each operator which were + * required to execute a query. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/793"> + * Explain reports incorrect value for opCount</a> + */ final public CAT opCount = new CAT(); /** @@ -127,10 +123,16 @@ /** * Constructor. + * <p> + * Note: Do not pre-increment {@link #opCount}. See {@link #add(BOpStats)} + * and {@link AbstractRunningQuery#haltOp(IHaltOpMessage)}. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/793"> + * Explain reports incorrect value for opCount</a> */ public BOpStats() { - opCount.increment(); +// opCount.increment(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-01-13 14:59:00
|
Revision: 7780 http://bigdata.svn.sourceforge.net/bigdata/?rev=7780&view=rev Author: thompsonbry Date: 2014-01-13 14:58:53 +0000 (Mon, 13 Jan 2014) Log Message: ----------- Added option (set via static final boolean) to use an ordered map over the (bopId,shardId) pairs to associate them with the operator input queues rather than a hash map. Both maps are concurrent. This option is currently set to false which is equivalent to the historical behavior. There is a possibility that setting this to true would cause solutions to have less dwell time on the JVM heap since it would bias the QueryEngine to run chunks associated with operators having bopId values that are smaller first. I also modified BSBundle so that it implements Comparable. It's default order should order BSBundles with lower bopIds first. This is the other half of how we can create this front bias over the input queues for the query plan. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java 2014-01-13 14:32:13 UTC (rev 7779) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java 2014-01-13 14:58:53 UTC (rev 7780) @@ -27,6 +27,8 @@ package com.bigdata.bop.engine; +import java.util.Comparator; + /** * An immutable class capturing the evaluation context of an operator against a * shard. @@ -34,12 +36,13 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ -public class BSBundle { +public class BSBundle implements Comparable<BSBundle> { public final int bopId; public final int shardId; + @Override public String toString() { return super.toString() + "{bopId=" + bopId + ",shardId=" + shardId @@ -55,15 +58,14 @@ } - /** - * {@inheritDoc} - */ + @Override public int hashCode() { return (bopId * 31) + shardId; } + @Override public boolean equals(final Object o) { if (this == o) @@ -78,4 +80,39 @@ } + /** + * {@inheritDoc} + * <p> + * This orders the {@link BSBundle}s by reverse {@link #bopId} and by + * {@link #shardId} if the {@link #bopId} is the same. This order imposes a + * bias to draw entries with higher {@link #bopId}s from an ordered + * collection. + * <p> + * Note: Query plans are assigned bopIds from 0 through N where higher + * bopIds are assigned to operators that occur later in the query plan. This + * is not a strict rule, but it is a strong bias. Given that bias and an + * ordered map, this {@link Comparator} will tend to draw from operators + * that are further along in the query plan. This emphasizes getting results + * through the pipeline quickly. Whether or not this {@link Comparator} has + * any effect depends on the {@link ChunkedRunningQuery#consumeChunk()} + * method and the backing map over the operator queues. If a hash map is + * used, then the {@link Comparator} is ignored. If a skip list map is used, + * then the {@link Comparator} will influence the manner in which the + * operator queues are drained. + */ + @Override + public int compareTo(final BSBundle o) { + + int ret = -Integer.compare(bopId, o.bopId); + + if (ret == 0) { + + ret = -Integer.compare(shardId, o.shardId); + + } + + return ret; + + } + } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2014-01-13 14:32:13 UTC (rev 7779) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2014-01-13 14:58:53 UTC (rev 7780) @@ -37,13 +37,12 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; -import junit.framework.AssertionFailedError; - import org.apache.log4j.Logger; import com.bigdata.bop.BOp; @@ -112,20 +111,35 @@ * are removed from the map. * <p> * The map is guarded by the {@link #lock}. + * <p> + * Note: Using a hash map here means that {@link #consumeChunks()} will draw + * from operator queues more or less at random. Using an ordered map will + * impose a bias, depending on the natural ordering of the map keys. * - * FIXME Either this and/or {@link #operatorFutures} must be a weak value - * map in order to ensure that entries are eventually cleared in scale-out - * where the #of entries can potentially be very large since they are per - * (bopId,shardId). While these maps were initially declared as - * {@link ConcurrentHashMap} instances, if we remove entries once the - * map/queue entry is empty, this appears to open a concurrency hole which - * does not exist if we leave entries with empty map/queue values in the - * map. Changing to a weak value map should provide the necessary pruning of - * unused entries without opening up this concurrency hole. + * @see BSBundle#compareTo(BSBundle) + * + * FIXME Either this and/or {@link #operatorFutures} must be a weak + * value map in order to ensure that entries are eventually cleared in + * scale-out where the #of entries can potentially be very large since + * they are per (bopId,shardId). While these maps were initially + * declared as {@link ConcurrentHashMap} instances, if we remove + * entries once the map/queue entry is empty, this appears to open a + * concurrency hole which does not exist if we leave entries with empty + * map/queue values in the map. Changing to a weak value map should + * provide the necessary pruning of unused entries without opening up + * this concurrency hole. */ private final ConcurrentMap<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>> operatorQueues; /** + * Set to <code>true</code> to make {@link #operatorQueues} and ordered map. + * When <code>true</code>, {@link #consumeChunk()} will have an ordered bias + * in how it schedules work. [The historical behavior is present when this + * is <code>false</code>.] + */ + private static final boolean orderedOperatorQueueMap = false; + + /** * FIXME It appears that this is Ok based on a single unit test known to * fail when {@link #removeMapOperatorQueueEntries} is <code>true</code>, * but I expect that a similar concurrency problem could also exist for the @@ -204,8 +218,16 @@ this.operatorFutures = new ConcurrentHashMap<BSBundle, ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask>>(); - this.operatorQueues = new ConcurrentHashMap<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>>(); + if (orderedOperatorQueueMap) { + this.operatorQueues = new ConcurrentSkipListMap<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>>(); + + } else { + + this.operatorQueues = new ConcurrentHashMap<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>>(); + + } + } /** @@ -250,12 +272,14 @@ if (queue == null) { /* - * If the target is a pipelined operator, then we impose a limit - * on the #of messages which may be buffered for that operator. - * If the operator is NOT pipelined, e.g., ORDER_BY, then we use - * an unbounded queue. + * There is no input queue for this operator, so we create one + * now while we are holding the lock. If the target is a + * pipelined operator, then we impose a limit on the #of + * messages which may be buffered for that operator. If the + * operator is NOT pipelined, e.g., ORDER_BY, then we use an + * unbounded queue. * - * TODO Unit/stress tests with capacity set to 1. + * TODO Unit/stress tests with capacity set to 1. */ // The target operator for this message. @@ -265,12 +289,24 @@ PipelineOp.Annotations.PIPELINE_QUEUE_CAPACITY, PipelineOp.Annotations.DEFAULT_PIPELINE_QUEUE_CAPACITY) : Integer.MAX_VALUE; - + + // Create a new queue using [lock]. queue = new com.bigdata.jsr166.LinkedBlockingDeque<IChunkMessage<IBindingSet>>(// capacity, lock); - operatorQueues.put(bundle, queue); + // Add to the collection of operator input queues. + if (operatorQueues.put(bundle, queue) != null) { + + /* + * There must not be an entry for this operator. We checked + * for this above. Nobody else should be adding entries into + * the [operatorQueues] map. + */ + + throw new AssertionError(bundle.toString()); + + } } @@ -396,7 +432,7 @@ if (nrunning == 0) { // No tasks running for this operator. if(removeMapOperatorFutureEntries) - if(map!=operatorFutures.remove(bundle)) throw new AssertionError(); + if (map != operatorFutures.remove(bundle)) throw new AssertionError(); } } if (nrunning >= maxParallel) { @@ -526,14 +562,14 @@ if (queue.isEmpty()) { // No work, so remove work queue for (bopId,partitionId). if(removeMapOperatorQueueEntries) - if(queue!=operatorQueues.remove(bundle)) throw new AssertionError(); + if (queue != operatorQueues.remove(bundle)) throw new AssertionError(); return false; } /* * true iff operator requires at once evaluation and all solutions * are now available for that operator. */ - boolean atOnceReady = false; + boolean atOnceReady = false; if (!pipelined) { if (!isAtOnceReady(bundle.bopId)) { /* This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |